c#几种数据库的大数据批量插入(SqlServer、Oracle、SQLite和MySql)
在之前只知道SqlServer支持数据批量插入,殊不知道Oracle、SQLite和MySql也是支持的,不过Oracle需要使用Orace.DataAccess驱动,今天就贴出几种数据库的批量插入解决方法。
首先说一下,IProvider里有一个用于实现批量插入的插件服务接口IBatcherProvider,此接口在前一篇文章中已经提到过了。
///<summary> ///提供数据批量处理的方法。 ///</summary> publicinterfaceIBatcherProvider:IProviderService { ///<summary> ///将<seecref="DataTable"/>的数据批量插入到数据库中。 ///</summary> ///<paramname="dataTable">要批量插入的<seecref="DataTable"/>。</param> ///<paramname="batchSize">每批次写入的数据量。</param> voidInsert(DataTabledataTable,intbatchSize=10000); }
一、SqlServer数据批量插入
SqlServer的批量插入很简单,使用SqlBulkCopy就可以,以下是该类的实现:
///<summary> ///为System.Data.SqlClient提供的用于批量操作的方法。 ///</summary> publicsealedclassMsSqlBatcher:IBatcherProvider { ///<summary> ///获取或设置提供者服务的上下文。 ///</summary> publicServiceContextServiceContext{get;set;} ///<summary> ///将<seecref="DataTable"/>的数据批量插入到数据库中。 ///</summary> ///<paramname="dataTable">要批量插入的<seecref="DataTable"/>。</param> ///<paramname="batchSize">每批次写入的数据量。</param> publicvoidInsert(DataTabledataTable,intbatchSize=10000) { Checker.ArgumentNull(dataTable,"dataTable"); if(dataTable.Rows.Count==0) { return; } using(varconnection=(SqlConnection)ServiceContext.Database.CreateConnection()) { try { connection.TryOpen(); //给表名加上前后导符 vartableName=DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(),dataTable.TableName); using(varbulk=newSqlBulkCopy(connection,SqlBulkCopyOptions.KeepIdentity,null) { DestinationTableName=tableName, BatchSize=batchSize }) { //循环所有列,为bulk添加映射 dataTable.EachColumn(c=>bulk.ColumnMappings.Add(c.ColumnName,c.ColumnName),c=>!c.AutoIncrement); bulk.WriteToServer(dataTable); bulk.Close(); } } catch(Exceptionexp) { thrownewBatcherException(exp); } finally { connection.TryClose(); } } } }
以上没有使用事务,使用事务在性能上会有一定的影响,如果要使用事务,可以设置SqlBulkCopyOptions.UseInternalTransaction。
二、Oracle数据批量插入
System.Data.OracleClient不支持批量插入,因此只能使用Oracle.DataAccess组件来作为提供者。
///<summary> ///Oracle.Data.Access组件提供的用于批量操作的方法。 ///</summary> publicsealedclassOracleAccessBatcher:IBatcherProvider { ///<summary> ///获取或设置提供者服务的上下文。 ///</summary> publicServiceContextServiceContext{get;set;} ///<summary> ///将<seecref="DataTable"/>的数据批量插入到数据库中。 ///</summary> ///<paramname="dataTable">要批量插入的<seecref="DataTable"/>。</param> ///<paramname="batchSize">每批次写入的数据量。</param> publicvoidInsert(DataTabledataTable,intbatchSize=10000) { Checker.ArgumentNull(dataTable,"dataTable"); if(dataTable.Rows.Count==0) { return; } using(varconnection=ServiceContext.Database.CreateConnection()) { try { connection.TryOpen(); using(varcommand=ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) { if(command==null) { thrownewBatcherException(newArgumentException("command")); } command.Connection=connection; command.CommandText=GenerateInserSql(ServiceContext.Database,command,dataTable); command.ExecuteNonQuery(); } } catch(Exceptionexp) { thrownewBatcherException(exp); } finally { connection.TryClose(); } } } ///<summary> ///生成插入数据的sql语句。 ///</summary> ///<paramname="database"></param> ///<paramname="command"></param> ///<paramname="table"></param> ///<returns></returns> privatestringGenerateInserSql(IDatabasedatabase,DbCommandcommand,DataTabletable) { varnames=newStringBuilder(); varvalues=newStringBuilder(); //将一个DataTable的数据转换为数组的数组 vardata=table.ToArray(); //设置ArrayBindCount属性 command.GetType().GetProperty("ArrayBindCount").SetValue(command,table.Rows.Count,null); varsyntax=database.Provider.GetService<ISyntaxProvider>(); for(vari=0;i<table.Columns.Count;i++) { varcolumn=table.Columns[i]; varparameter=database.Provider.DbProviderFactory.CreateParameter(); if(parameter==null) { continue; } parameter.ParameterName=column.ColumnName; parameter.Direction=ParameterDirection.Input; parameter.DbType=column.DataType.GetDbType(); parameter.Value=data[i]; if(names.Length>0) { names.Append(","); values.Append(","); } names.AppendFormat("{0}",DbUtility.FormatByQuote(syntax,column.ColumnName)); values.AppendFormat("{0}{1}",syntax.ParameterPrefix,column.ColumnName); command.Parameters.Add(parameter); } returnstring.Format("INSERTINTO{0}({1})VALUES({2})",DbUtility.FormatByQuote(syntax,table.TableName),names,values); } }
以上最重要的一步,就是将DataTable转为数组的数组表示,即object[][],前数组的上标是列的个数,后数组是行的个数,因此循环Columns将后数组作为Parameter的值,也就是说,参数的值是一个数组。而insert语句与一般的插入语句没有什么不一样。
三、SQLite数据批量插入
SQLite的批量插入只需开启事务就可以了,这个具体的原理不得而知。
publicsealedclassSQLiteBatcher:IBatcherProvider { ///<summary> ///获取或设置提供者服务的上下文。 ///</summary> publicServiceContextServiceContext{get;set;} ///<summary> ///将<seecref="DataTable"/>的数据批量插入到数据库中。 ///</summary> ///<paramname="dataTable">要批量插入的<seecref="DataTable"/>。</param> ///<paramname="batchSize">每批次写入的数据量。</param> publicvoidInsert(DataTabledataTable,intbatchSize=10000) { Checker.ArgumentNull(dataTable,"dataTable"); if(dataTable.Rows.Count==0) { return; } using(varconnection=ServiceContext.Database.CreateConnection()) { DbTransactiontranscation=null; try { connection.TryOpen(); transcation=connection.BeginTransaction(); using(varcommand=ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) { if(command==null) { thrownewBatcherException(newArgumentException("command")); } command.Connection=connection; command.CommandText=GenerateInserSql(ServiceContext.Database,dataTable); if(command.CommandText==string.Empty) { return; } varflag=newAssertFlag(); dataTable.EachRow(row=> { varfirst=flag.AssertTrue(); ProcessCommandParameters(dataTable,command,row,first); command.ExecuteNonQuery(); }); } transcation.Commit(); } catch(Exceptionexp) { if(transcation!=null) { transcation.Rollback(); } thrownewBatcherException(exp); } finally { connection.TryClose(); } } } privatevoidProcessCommandParameters(DataTabledataTable,DbCommandcommand,DataRowrow,boolfirst) { for(varc=0;c<dataTable.Columns.Count;c++) { DbParameterparameter; //首次创建参数,是为了使用缓存 if(first) { parameter=ServiceContext.Database.Provider.DbProviderFactory.CreateParameter(); parameter.ParameterName=dataTable.Columns[c].ColumnName; command.Parameters.Add(parameter); } else { parameter=command.Parameters[c]; } parameter.Value=row[c]; } } ///<summary> ///生成插入数据的sql语句。 ///</summary> ///<paramname="database"></param> ///<paramname="table"></param> ///<returns></returns> privatestringGenerateInserSql(IDatabasedatabase,DataTabletable) { varsyntax=database.Provider.GetService<ISyntaxProvider>(); varnames=newStringBuilder(); varvalues=newStringBuilder(); varflag=newAssertFlag(); table.EachColumn(column=> { if(!flag.AssertTrue()) { names.Append(","); values.Append(","); } names.Append(DbUtility.FormatByQuote(syntax,column.ColumnName)); values.AppendFormat("{0}{1}",syntax.ParameterPrefix,column.ColumnName); }); returnstring.Format("INSERTINTO{0}({1})VALUES({2})",DbUtility.FormatByQuote(syntax,table.TableName),names,values); } }
四、MySql数据批量插入
///<summary> ///为MySql.Data组件提供的用于批量操作的方法。 ///</summary> publicsealedclassMySqlBatcher:IBatcherProvider { ///<summary> ///获取或设置提供者服务的上下文。 ///</summary> publicServiceContextServiceContext{get;set;} ///<summary> ///将<seecref="DataTable"/>的数据批量插入到数据库中。 ///</summary> ///<paramname="dataTable">要批量插入的<seecref="DataTable"/>。</param> ///<paramname="batchSize">每批次写入的数据量。</param> publicvoidInsert(DataTabledataTable,intbatchSize=10000) { Checker.ArgumentNull(dataTable,"dataTable"); if(dataTable.Rows.Count==0) { return; } using(varconnection=ServiceContext.Database.CreateConnection()) { try { connection.TryOpen(); using(varcommand=ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) { if(command==null) { thrownewBatcherException(newArgumentException("command")); } command.Connection=connection; command.CommandText=GenerateInserSql(ServiceContext.Database,command,dataTable); if(command.CommandText==string.Empty) { return; } command.ExecuteNonQuery(); } } catch(Exceptionexp) { thrownewBatcherException(exp); } finally { connection.TryClose(); } } } ///<summary> ///生成插入数据的sql语句。 ///</summary> ///<paramname="database"></param> ///<paramname="command"></param> ///<paramname="table"></param> ///<returns></returns> privatestringGenerateInserSql(IDatabasedatabase,DbCommandcommand,DataTabletable) { varnames=newStringBuilder(); varvalues=newStringBuilder(); vartypes=newList<DbType>(); varcount=table.Columns.Count; varsyntax=database.Provider.GetService<ISyntaxProvider>(); table.EachColumn(c=> { if(names.Length>0) { names.Append(","); } names.AppendFormat("{0}",DbUtility.FormatByQuote(syntax,c.ColumnName)); types.Add(c.DataType.GetDbType()); }); vari=0; foreach(DataRowrowintable.Rows) { if(i>0) { values.Append(","); } values.Append("("); for(varj=0;j<count;j++) { if(j>0) { values.Append(","); } varisStrType=IsStringType(types[j]); varparameter=CreateParameter(database.Provider,isStrType,types[j],row[j],syntax.ParameterPrefix,i,j); if(parameter!=null) { values.Append(parameter.ParameterName); command.Parameters.Add(parameter); } elseif(isStrType) { values.AppendFormat("'{0}'",row[j]); } else { values.Append(row[j]); } } values.Append(")"); i++; } returnstring.Format("INSERTINTO{0}({1})VALUES{2}",DbUtility.FormatByQuote(syntax,table.TableName),names,values); } ///<summary> ///判断是否为字符串类别。 ///</summary> ///<paramname="dbType"></param> ///<returns></returns> privateboolIsStringType(DbTypedbType) { returndbType==DbType.AnsiString||dbType==DbType.AnsiStringFixedLength||dbType==DbType.String||dbType==DbType.StringFixedLength; } ///<summary> ///创建参数。 ///</summary> ///<paramname="provider"></param> ///<paramname="isStrType"></param> ///<paramname="dbType"></param> ///<paramname="value"></param> ///<paramname="parPrefix"></param> ///<paramname="row"></param> ///<paramname="col"></param> ///<returns></returns> privateDbParameterCreateParameter(IProviderprovider,boolisStrType,DbTypedbType,objectvalue,charparPrefix,introw,intcol) { //如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含'号)和日期型时才添加参数 if((isStrType&&value.ToString().IndexOf('\'')!=-1)||dbType==DbType.DateTime) { varname=string.Format("{0}p_{1}_{2}",parPrefix,row,col); varparameter=provider.DbProviderFactory.CreateParameter(); parameter.ParameterName=name; parameter.Direction=ParameterDirection.Input; parameter.DbType=dbType; parameter.Value=value; returnparameter; } returnnull; } }
MySql的批量插入,是将值全部写在语句的values里,例如,insertbatcher(id,name)values(1,'1',2,'2',3,'3',........10,'10')。
五、测试
接下来写一个测试用例来看一下使用批量插入的效果。
publicvoidTestBatchInsert() { Console.WriteLine(TimeWatcher.Watch(()=> InvokeTest(database=> { vartable=newDataTable("Batcher"); table.Columns.Add("Id",typeof(int)); table.Columns.Add("Name1",typeof(string)); table.Columns.Add("Name2",typeof(string)); table.Columns.Add("Name3",typeof(string)); table.Columns.Add("Name4",typeof(string)); //构造100000条数据 for(vari=0;i<100000;i++) { table.Rows.Add(i,i.ToString(),i.ToString(),i.ToString(),i.ToString()); } //获取IBatcherProvider varbatcher=database.Provider.GetService<IBatcherProvider>(); if(batcher==null) { Console.WriteLine("不支持批量插入。"); } else { batcher.Insert(table); } //输出batcher表的数据量 varsql=newSqlCommand("SELECTCOUNT(1)FROMBatcher"); Console.WriteLine("当前共有{0}条数据",database.ExecuteScalar(sql)); }))); }
以下表中列出了四种数据库生成10万条数据各耗用的时间
MsSql
00:00:02.9376300
Oracle
00:00:01.5155959
SQLite
00:00:01.6275634
MySql
00:00:05.4166891