前言
很显然,你应该不至于使用 EntityFramework 直接插入 10W 数据到数据库中,那可能得用上个几分钟。EntityFramework 最被人诟病的地方就是它的性能,处理大量数据时的效率。此种条件下,通常会转回使用 ADO.NET 来完成任务。
但是,如果已经在项目中使用了 EntityFramework,如果碰到需要直接向数据库中插入 10W 的数据的需求,引入 ADO.NET 和 SqlBulkCopy 的组合将打破 EntityFramework 作为 ORM 所带来的优势,我们不得不再次去编写那些 SQL 语句,关注表结构的细节,相应的代码可维护性也在下降。
那么,假设我们将 SqlBulkCopy 的功能封装为 EntityFramework 中的一个扩展方法,通过接口像外暴露 BulkInsert 方法。这样,我们既没有改变使用 EntityFramework 的习惯,同时也隐藏了 SqlBulkCopy 的代码细节,更重要的是,合理的封装演进出复用的可能性,可以在多个 Entity 表中使用。
环境准备
以下测试基于 EntityFramework 6.0.2 版本。
首先定义一个 Customer 类:
1 public class Customer
2 {
3 public long Id { get; set; }
4 public string Name { get; set; }
5 public string Address { get; set; }
6 public string Phone { get; set; }
7 }
通过 CustomerMap 类将 Entity 映射到数据库表结构:
1 public class CustomerMap : EntityTypeConfiguration<Customer>
2 {
3 public CustomerMap()
4 {
5 // Primary Key
6 this.HasKey(t => t.Id);
7
8 // Properties
9 this.Property(t => t.Name)
10 .IsRequired()
11 .HasMaxLength(256);
12
13 this.Property(t => t.Phone)
14 .IsRequired()
15 .HasMaxLength(256);
16
17 // Table & Column Mappings
18 this.ToTable("Customer", "STORE");
19 this.Property(t => t.Id).HasColumnName("Id");
20 this.Property(t => t.Name).HasColumnName("Name");
21 this.Property(t => t.Address).HasColumnName("Address");
22 this.Property(t => t.Phone).HasColumnName("Phone");
23 }
24 }
我们定义数据库的名字为 “Retail”,则使用 RetailEntities 类来实现 DbContext :
1 public class RetailEntities : DbContext
2 {
3 static RetailEntities()
4 {
5 Database.SetInitializer<RetailEntities>(
6 new DropCreateDatabaseAlways<RetailEntities>());
7 }
8
9 public RetailEntities()
10 : base("Name=RetailEntities")
11 {
12 }
13
14 public DbSet<Customer> Customers { get; set; }
15
16 protected override void OnModelCreating(DbModelBuilder modelBuilder)
17 {
18 modelBuilder.Configurations.Add(new CustomerMap());
19 }
20 }
将 DatabaseInitializer 设置为 DropCreateDatabaseAlways,这样我们可以保证每次都针对新表进行测试。
如果需要更复杂的模型,我们将基于如下的模型进行测试:
测试主机
数据库:Microsoft SQL Server 2012 (64-bit)
EntityFramework 插入 10W 数据需要多久
我们先来看下EntityFramework 插入 10W 数据需要多久。
构造 10W 个 Customer 实例:
1 int customerCount = 100000;
2
3 List<Customer> customers = new List<Customer>();
4 for (int i = 0; i < customerCount; i++)
5 {
6 Customer customer = new Customer()
7 {
8 Name = "Dennis Gao" + i,
9 Address = "Beijing" + i,
10 Phone = "18888888888" + i,
11 };
12 customers.Add(customer);
13
14 Console.Write(".");
15 }
使用如下语法来将上面构造的 10W 数据保存到数据库中:
1 using (RetailEntities context = new RetailEntities())
2 {
3 foreach (var entity in customers)
4 {
5 context.Customers.Add(entity);
6 }
7 context.SaveChanges();
8 }
通过 context.SaveChanges() 来保证一次事务提交。
为了计算使用时间,在上面代码的前后加上 Stopwatch 来计算:
1 Stopwatch watch = Stopwatch.StartNew();
2
3 using (RetailEntities context = new RetailEntities())
4 {
5 foreach (var entity in customers)
6 {
7 context.Customers.Add(entity);
8 }
9 context.SaveChanges();
10 }
11
12 watch.Stop();
13 Console.WriteLine(string.Format(
14 "{0} customers are created, cost {1} milliseconds.",
15 customerCount, watch.ElapsedMilliseconds));
然后运行,
好吧,我应该没有耐心等待它运行完。
现在减少数据量进行测试,将数据数量降低到 500 条,
空表插入500 条数据耗时 5652 毫秒。我多测了几遍,这个数据稳定在 5 秒以上。
将数据量改变到 1000 条,
将数据量改变到 1500 条,
将数据量改变到 10000 条,
那么我们估计下 10W 数据大概需要 10W / 500 * 2 = 至少 400 秒 = 至少 6 分钟。
好吧,慢是毋庸置疑的。
SqlBulkCopy 接口描述
Microsoft SQL Server 提供一个称为 bcp 的流行的命令提示符实用工具,用于将数据从一个表移动到另一个表(表既可以在同一个服务器上,也可以在不同服务器上)。 SqlBulkCopy 类允许编写提供类似功能的托管代码解决方案。 还有其他将数据加载到 SQL Server 表的方法(例如 INSERT 语句),但相比之下 SqlBulkCopy 提供明显的性能优势。
使用 SqlBulkCopy 类只能向 SQL Server 表写入数据。 但是,数据源不限于 SQL Server;可以使用任何数据源,只要数据可加载到 DataTable 实例或可使用 IDataReader 实例读取数据。
WriteToServer(DataRow[]) | 将所提供的 DataRow 数组中的所有行复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。 |
WriteToServer(DataTable) | 将所提供的 DataTable 中的所有行复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。 |
WriteToServer(IDataReader) | 将所提供的 IDataReader 中的所有行复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。 |
WriteToServer(DataTable, DataRowState) | 只将与所提供 DataTable 中所提供行状态匹配的行复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。 |
在 .NET 4.5 中还提供了支持 async 语法的接口。
WriteToServerAsync(DataRow[]) | WriteToServer 的异步版本,将 DataRow 数组中提供的所有行都复制到由 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。 |
WriteToServerAsync(DataTable) | WriteToServer 的异步版本,将 DataTable 中提供的所有行都复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。 |
WriteToServerAsync(IDataReader) | WriteToServer 的异步版本,将 IDataReader 中提供的所有行都复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。 |
WriteToServerAsync(DataRow[], CancellationToken) | WriteToServer 的异步版本,将 DataRow 数组中提供的所有行都复制到由 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。 |
WriteToServerAsync(DataTable, DataRowState) | WriteToServer 的异步版本,之间与 DataTable 中所提供行状态匹配的行复制到 SqlBulkCopy 对象的 DestinationTableName 属性中指定的目标表中。 |
WriteToServerAsync(DataTable, CancellationToken) | WriteToServer 的异步版本,将 DataTable 中提供的所有行都复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。 |
WriteToServerAsync(IDataReader, CancellationToken) | WriteToServer 的异步版本,将 IDataReader 中提供的所有行都复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。 |
WriteToServerAsync(DataTable, DataRowState, CancellationToken) | WriteToServer 的异步版本,之间与 DataTable 中所提供行状态匹配的行复制到 SqlBulkCopy 对象的 DestinationTableName 属性中指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。 |
这里,我们选用 DataTable 来构建数据源,将 10W 数据导入 DataTable 中。可以看出,我们需要构建出给定 Entity 类型所对应的数据表的 DataTable,将所有的 entities 数据插入到 DataTable 中。
构建 TableMapping 映射
此时,我并不想手工书写表中的各字段名称,同时,我可能甚至都不想关心 Entity 类到底被映射到了数据库中的哪一张表上。
此处,我们定义一个 TableMapping 类,用于存储一张数据库表的映射信息。
在获取和生成 TableMapping 之前,我们需要先定义和获取 DbMapping 类。
1 internal class DbMapping
2 {
3 public DbMapping(DbContext context)
4 {
5 _context = context;
6
7 var objectContext = ((IObjectContextAdapter)context).ObjectContext;
8 _metadataWorkspace = objectContext.MetadataWorkspace;
9
10 _codeFirstEntityContainer = _metadataWorkspace.GetEntityContainer("CodeFirstDatabase", DataSpace.SSpace);
11
12 MapDb();
13 }
14 }
通过读取 CodeFirstEntityContainer 中的元数据,我们可以获取到指定数据库中的所有表的信息。
1 private void MapDb()
2 {
3 ExtractTableColumnEdmMembers();
4
5 List<EntityType> tables =
6 _metadataWorkspace
7 .GetItems(DataSpace.OCSpace)
8 .Select(x => x.GetPrivateFieldValue("EdmItem") as EntityType)
9 .Where(x => x != null)
10 .ToList();
11
12 foreach (var table in tables)
13 {
14 MapTable(table);
15 }
16 }
进而,根据表映射类型的定义,可以获取到表中字段的映射信息。
1 private void MapTable(EntityType tableEdmType)
2 {
3 string identity = tableEdmType.FullName;
4 EdmType baseEdmType = tableEdmType;
5 EntitySet storageEntitySet = null;
6
7 while (!_codeFirstEntityContainer.TryGetEntitySetByName(baseEdmType.Name, false, out storageEntitySet))
8 {
9 if (baseEdmType.BaseType == null) break;
10 baseEdmType = baseEdmType.BaseType;
11 }
12 if (storageEntitySet == null) return;
13
14 var tableName = (string)storageEntitySet.MetadataProperties["Table"].Value;
15 var schemaName = (string)storageEntitySet.MetadataProperties["Schema"].Value;
16
17 var tableMapping = new TableMapping(identity, schemaName, tableName);
18 _tableMappings.Add(identity, tableMapping);
19 _primaryKeysMapping.Add(identity, storageEntitySet.ElementType.KeyMembers.Select(x => x.Name).ToList());
20
21 foreach (var prop in storageEntitySet.ElementType.Properties)
22 {
23 MapColumn(identity, _tableMappings[identity], prop);
24 }
25 }
然后,可以将表信息和字段信息存放到 TableMapping 和 ColumnMapping 当中。
internal class TableMapping
{
public string TableTypeFullName { get; private set; }
public string SchemaName { get; private set; }
public string TableName { get; private set; } public ColumnMapping[] Columns
{
get { return _columnMappings.Values.ToArray(); }
}
}
构建 DataTable 数据
终于,有了 TableMapping 映射之后,我们可以开始创建 DataTable 了。
1 private static DataTable BuildDataTable<T>(TableMapping tableMapping)
2 {
3 var entityType = typeof(T);
4 string tableName = string.Join(@".", tableMapping.SchemaName, tableMapping.TableName);
5
6 var dataTable = new DataTable(tableName);
7 var primaryKeys = new List<DataColumn>();
8
9 foreach (var columnMapping in tableMapping.Columns)
10 {
11 var propertyInfo = entityType.GetProperty(columnMapping.PropertyName, '.');
12 columnMapping.Type = propertyInfo.PropertyType;
13
14 var dataColumn = new DataColumn(columnMapping.ColumnName);
15
16 Type dataType;
17 if (propertyInfo.PropertyType.IsNullable(out dataType))
18 {
19 dataColumn.DataType = dataType;
20 dataColumn.AllowDBNull = true;
21 }
22 else
23 {
24 dataColumn.DataType = propertyInfo.PropertyType;
25 dataColumn.AllowDBNull = columnMapping.Nullable;
26 }
27
28 if (columnMapping.IsIdentity)
29 {
30 dataColumn.Unique = true;
31 if (propertyInfo.PropertyType == typeof(int)
32 || propertyInfo.PropertyType == typeof(long))
33 {
34 dataColumn.AutoIncrement = true;
35 }
36 else continue;
37 }
38 else
39 {
40 dataColumn.DefaultValue = columnMapping.DefaultValue;
41 }
42
43 if (propertyInfo.PropertyType == typeof(string))
44 {
45 dataColumn.MaxLength = columnMapping.MaxLength;
46 }
47
48 if (columnMapping.IsPk)
49 {
50 primaryKeys.Add(dataColumn);
51 }
52
53 dataTable.Columns.Add(dataColumn);
54 }
55
56 dataTable.PrimaryKey = primaryKeys.ToArray();
57
58 return dataTable;
59 }
通过 Schema 名称和表名称来构建指定 Entity 类型的 DataTable 对象。
然后将,entities 数据列表中的数据导入到 DataTable 对象之中。
1 private static DataTable CreateDataTable<T>(TableMapping tableMapping, IEnumerable<T> entities)
2 {
3 var dataTable = BuildDataTable<T>(tableMapping);
4
5 foreach (var entity in entities)
6 {
7 DataRow row = dataTable.NewRow();
8
9 foreach (var columnMapping in tableMapping.Columns)
10 {
11 var @value = entity.GetPropertyValue(columnMapping.PropertyName);
12
13 if (columnMapping.IsIdentity) continue;
14
15 if (@value == null)
16 {
17 row[columnMapping.ColumnName] = DBNull.Value;
18 }
19 else
20 {
21 row[columnMapping.ColumnName] = @value;
22 }
23 }
24
25 dataTable.Rows.Add(row);
26 }
27
28 return dataTable;
29 }
SqlBulkCopy 导入数据
终于,数据源准备好了。然后使用如下代码结构,调用 WriteToServer 方法,将数据写入数据库。
1 using (DataTable dataTable = CreateDataTable(tableMapping, entities))
2 {
3 using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(transaction.Connection, options, transaction))
4 {
5 sqlBulkCopy.BatchSize = batchSize;
6 sqlBulkCopy.DestinationTableName = dataTable.TableName;
7 sqlBulkCopy.WriteToServer(dataTable);
8 }
9 }
看下保存 500 数据的效果,用时 1.9 秒。
看下保存 1W 数据的效果,用时 2.1 秒。
看下保存 10W 数据的效果,用时 7.5 秒。
再试下 100W 数据的效果,用时 27 秒。
封装 BulkInsert 扩展方法
我们可以为 DbContext 添加一个 BulkInsert 扩展方法。
1 internal static class DbContextBulkOperationExtensions
2 {
3 public const int DefaultBatchSize = 1000;
4
5 public static void BulkInsert<T>(this DbContext context, IEnumerable<T> entities, int batchSize = DefaultBatchSize)
6 {
7 var provider = new BulkOperationProvider(context);
8 provider.Insert(entities, batchSize);
9 }
10 }
IBulkableRepository 接口
在下面两篇文章中,我介绍了精炼的 IRepository 接口。
EntityFramework用法探索(四)Repository和UnitOfWork
EntityFramework中使用Repository装饰器
1 public interface IRepository<T>
2 where T : class
3 {
4 IQueryable<T> Query();
5 void Insert(T entity);
6 void Update(T entity);
7 void Delete(T entity);
8 }
当我们需要扩展 BulkInsert 功能时,可以通过继承来完成功能扩展。
1 public interface IBulkableRepository<T> : IRepository<T>
2 where T : class
3 {
4 void BulkInsert(IEnumberable<T> entities);
5 }
这样,我们就可以使用很自然的方式直接使用 BulkInsert 功能了。
代码在哪里
代码在这里:
在 GitHub 上 右下点了推荐再下载也来的及啊~~亲
参考资料
SqlBulkCopy
Table-Valued Parameters
EntityFramework.BulkInsert
Using SQL bulk copy with your LINQ-to-SQL datacontext