c#实现几种数据库的大数据批量插入
在之前只知道SqlServer支持数据批量插入,殊不知道Oracle、SQLite和MySQL也是支持的,不过Oracle需要使用Orace.DataAccess驱动,今天就贴出几种数据库的批量插入解决方法。
首先说一下,IProvider里有一个用于实现批量插入的插件服务接口IBatcherProvider,此接口在前一篇文章中已经提到过了。
///<summary>
///提供数据批量处理的方法。
///</summary>
publicinterfaceIBatcherProvider:IProviderService
{
///<summary>
///将<seecref="DataTable"/>的数据批量插入到数据库中。
///</summary>
///<paramname="dataTable">要批量插入的<seecref="DataTable"/>。</param>
///<paramname="batchSize">每批次写入的数据量。</param>
voidInsert(DataTabledataTable,intbatchSize=10000);
}
一、SqlServer数据批量插入
SqlServer的批量插入很简单,使用SqlBulkCopy就可以,以下是该类的实现:
///<summary>
///为System.Data.SqlClient提供的用于批量操作的方法。
///</summary>
publicsealedclassMsSqlBatcher:IBatcherProvider
{
///<summary>
///获取或设置提供者服务的上下文。
///</summary>
publicServiceContextServiceContext{get;set;}
///<summary>
///将<seecref="DataTable"/>的数据批量插入到数据库中。
///</summary>
///<paramname="dataTable">要批量插入的<seecref="DataTable"/>。</param>
///<paramname="batchSize">每批次写入的数据量。</param>
publicvoidInsert(DataTabledataTable,intbatchSize=10000)
{
Checker.ArgumentNull(dataTable,"dataTable");
if(dataTable.Rows.Count==0)
{
return;
}
using(varconnection=(SqlConnection)ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
//给表名加上前后导符
vartableName=DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(),dataTable.TableName);
using(varbulk=newSqlBulkCopy(connection,SqlBulkCopyOptions.KeepIdentity,null)
{
DestinationTableName=tableName,
BatchSize=batchSize
})
{
//循环所有列,为bulk添加映射
dataTable.EachColumn(c=>bulk.ColumnMappings.Add(c.ColumnName,c.ColumnName),c=>!c.AutoIncrement);
bulk.WriteToServer(dataTable);
bulk.Close();
}
}
catch(Exceptionexp)
{
thrownewBatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
}
以上没有使用事务,使用事务在性能上会有一定的影响,如果要使用事务,可以设置SqlBulkCopyOptions.UseInternalTransaction。
二、Oracle数据批量插入
System.Data.OracleClient不支持批量插入,因此只能使用Oracle.DataAccess组件来作为提供者。
///<summary>
///Oracle.Data.Access组件提供的用于批量操作的方法。
///</summary>
publicsealedclassOracleAccessBatcher:IBatcherProvider
{
///<summary>
///获取或设置提供者服务的上下文。
///</summary>
publicServiceContextServiceContext{get;set;}
///<summary>
///将<seecref="DataTable"/>的数据批量插入到数据库中。
///</summary>
///<paramname="dataTable">要批量插入的<seecref="DataTable"/>。</param>
///<paramname="batchSize">每批次写入的数据量。</param>
publicvoidInsert(DataTabledataTable,intbatchSize=10000)
{
Checker.ArgumentNull(dataTable,"dataTable");
if(dataTable.Rows.Count==0)
{
return;
}
using(varconnection=ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
using(varcommand=ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if(command==null)
{
thrownewBatcherException(newArgumentException("command"));
}
command.Connection=connection;
command.CommandText=GenerateInserSql(ServiceContext.Database,command,dataTable);
command.ExecuteNonQuery();
}
}
catch(Exceptionexp)
{
thrownewBatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
///<summary>
///生成插入数据的sql语句。
///</summary>
///<paramname="database"></param>
///<paramname="command"></param>
///<paramname="table"></param>
///<returns></returns>
privatestringGenerateInserSql(IDatabasedatabase,DbCommandcommand,DataTabletable)
{
varnames=newStringBuilder();
varvalues=newStringBuilder();
//将一个DataTable的数据转换为数组的数组
vardata=table.ToArray();
//设置ArrayBindCount属性
command.GetType().GetProperty("ArrayBindCount").SetValue(command,table.Rows.Count,null);
varsyntax=database.Provider.GetService<ISyntaxProvider>();
for(vari=0;i<table.Columns.Count;i++)
{
varcolumn=table.Columns[i];
varparameter=database.Provider.DbProviderFactory.CreateParameter();
if(parameter==null)
{
continue;
}
parameter.ParameterName=column.ColumnName;
parameter.Direction=ParameterDirection.Input;
parameter.DbType=column.DataType.GetDbType();
parameter.Value=data[i];
if(names.Length>0)
{
names.Append(",");
values.Append(",");
}
names.AppendFormat("{0}",DbUtility.FormatByQuote(syntax,column.ColumnName));
values.AppendFormat("{0}{1}",syntax.ParameterPrefix,column.ColumnName);
command.Parameters.Add(parameter);
}
returnstring.Format("INSERTINTO{0}({1})VALUES({2})",DbUtility.FormatByQuote(syntax,table.TableName),names,values);
}
}
以上最重要的一步,就是将DataTable转为数组的数组表示,即object[][],前数组的上标是列的个数,后数组是行的个数,因此循环Columns将后数组作为Parameter的值,也就是说,参数的值是一个数组。而insert语句与一般的插入语句没有什么不一样。
三、SQLite数据批量插入
SQLite的批量插入只需开启事务就可以了,这个具体的原理不得而知。
publicsealedclassSQLiteBatcher:IBatcherProvider
{
///<summary>
///获取或设置提供者服务的上下文。
///</summary>
publicServiceContextServiceContext{get;set;}
///<summary>
///将<seecref="DataTable"/>的数据批量插入到数据库中。
///</summary>
///<paramname="dataTable">要批量插入的<seecref="DataTable"/>。</param>
///<paramname="batchSize">每批次写入的数据量。</param>
publicvoidInsert(DataTabledataTable,intbatchSize=10000)
{
Checker.ArgumentNull(dataTable,"dataTable");
if(dataTable.Rows.Count==0)
{
return;
}
using(varconnection=ServiceContext.Database.CreateConnection())
{
DbTransactiontranscation=null;
try
{
connection.TryOpen();
transcation=connection.BeginTransaction();
using(varcommand=ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if(command==null)
{
thrownewBatcherException(newArgumentException("command"));
}
command.Connection=connection;
command.CommandText=GenerateInserSql(ServiceContext.Database,dataTable);
if(command.CommandText==string.Empty)
{
return;
}
varflag=newAssertFlag();
dataTable.EachRow(row=>
{
varfirst=flag.AssertTrue();
ProcessCommandParameters(dataTable,command,row,first);
command.ExecuteNonQuery();
});
}
transcation.Commit();
}
catch(Exceptionexp)
{
if(transcation!=null)
{
transcation.Rollback();
}
thrownewBatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
privatevoidProcessCommandParameters(DataTabledataTable,DbCommandcommand,DataRowrow,boolfirst)
{
for(varc=0;c<dataTable.Columns.Count;c++)
{
DbParameterparameter;
//首次创建参数,是为了使用缓存
if(first)
{
parameter=ServiceContext.Database.Provider.DbProviderFactory.CreateParameter();
parameter.ParameterName=dataTable.Columns[c].ColumnName;
command.Parameters.Add(parameter);
}
else
{
parameter=command.Parameters[c];
}
parameter.Value=row[c];
}
}
///<summary>
///生成插入数据的sql语句。
///</summary>
///<paramname="database"></param>
///<paramname="table"></param>
///<returns></returns>
privatestringGenerateInserSql(IDatabasedatabase,DataTabletable)
{
varsyntax=database.Provider.GetService<ISyntaxProvider>();
varnames=newStringBuilder();
varvalues=newStringBuilder();
varflag=newAssertFlag();
table.EachColumn(column=>
{
if(!flag.AssertTrue())
{
names.Append(",");
values.Append(",");
}
names.Append(DbUtility.FormatByQuote(syntax,column.ColumnName));
values.AppendFormat("{0}{1}",syntax.ParameterPrefix,column.ColumnName);
});
returnstring.Format("INSERTINTO{0}({1})VALUES({2})",DbUtility.FormatByQuote(syntax,table.TableName),names,values);
}
}
四、MySql数据批量插入
///<summary>
///为MySql.Data组件提供的用于批量操作的方法。
///</summary>
publicsealedclassMySqlBatcher:IBatcherProvider
{
///<summary>
///获取或设置提供者服务的上下文。
///</summary>
publicServiceContextServiceContext{get;set;}
///<summary>
///将<seecref="DataTable"/>的数据批量插入到数据库中。
///</summary>
///<paramname="dataTable">要批量插入的<seecref="DataTable"/>。</param>
///<paramname="batchSize">每批次写入的数据量。</param>
publicvoidInsert(DataTabledataTable,intbatchSize=10000)
{
Checker.ArgumentNull(dataTable,"dataTable");
if(dataTable.Rows.Count==0)
{
return;
}
using(varconnection=ServiceContext.Database.CreateConnection())
{
try
{
connection.TryOpen();
using(varcommand=ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
{
if(command==null)
{
thrownewBatcherException(newArgumentException("command"));
}
command.Connection=connection;
command.CommandText=GenerateInserSql(ServiceContext.Database,command,dataTable);
if(command.CommandText==string.Empty)
{
return;
}
command.ExecuteNonQuery();
}
}
catch(Exceptionexp)
{
thrownewBatcherException(exp);
}
finally
{
connection.TryClose();
}
}
}
///<summary>
///生成插入数据的sql语句。
///</summary>
///<paramname="database"></param>
///<paramname="command"></param>
///<paramname="table"></param>
///<returns></returns>
privatestringGenerateInserSql(IDatabasedatabase,DbCommandcommand,DataTabletable)
{
varnames=newStringBuilder();
varvalues=newStringBuilder();
vartypes=newList<DbType>();
varcount=table.Columns.Count;
varsyntax=database.Provider.GetService<ISyntaxProvider>();
table.EachColumn(c=>
{
if(names.Length>0)
{
names.Append(",");
}
names.AppendFormat("{0}",DbUtility.FormatByQuote(syntax,c.ColumnName));
types.Add(c.DataType.GetDbType());
});
vari=0;
foreach(DataRowrowintable.Rows)
{
if(i>0)
{
values.Append(",");
}
values.Append("(");
for(varj=0;j<count;j++)
{
if(j>0)
{
values.Append(",");
}
varisStrType=IsStringType(types[j]);
varparameter=CreateParameter(database.Provider,isStrType,types[j],row[j],syntax.ParameterPrefix,i,j);
if(parameter!=null)
{
values.Append(parameter.ParameterName);
command.Parameters.Add(parameter);
}
elseif(isStrType)
{
values.AppendFormat("'{0}'",row[j]);
}
else
{
values.Append(row[j]);
}
}
values.Append(")");
i++;
}
returnstring.Format("INSERTINTO{0}({1})VALUES{2}",DbUtility.FormatByQuote(syntax,table.TableName),names,values);
}
///<summary>
///判断是否为字符串类别。
///</summary>
///<paramname="dbType"></param>
///<returns></returns>
privateboolIsStringType(DbTypedbType)
{
returndbType==DbType.AnsiString||dbType==DbType.AnsiStringFixedLength||dbType==DbType.String||dbType==DbType.StringFixedLength;
}
///<summary>
///创建参数。
///</summary>
///<paramname="provider"></param>
///<paramname="isStrType"></param>
///<paramname="dbType"></param>
///<paramname="value"></param>
///<paramname="parPrefix"></param>
///<paramname="row"></param>
///<paramname="col"></param>
///<returns></returns>
privateDbParameterCreateParameter(IProviderprovider,boolisStrType,DbTypedbType,objectvalue,charparPrefix,introw,intcol)
{
//如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含'号)和日期型时才添加参数
if((isStrType&&value.ToString().IndexOf('\'')!=-1)||dbType==DbType.DateTime)
{
varname=string.Format("{0}p_{1}_{2}",parPrefix,row,col);
varparameter=provider.DbProviderFactory.CreateParameter();
parameter.ParameterName=name;
parameter.Direction=ParameterDirection.Input;
parameter.DbType=dbType;
parameter.Value=value;
returnparameter;
}
returnnull;
}
}
MySql的批量插入,是将值全部写在语句的values里,例如,insertbatcher(id,name)values(1,'1',2,'2',3,'3',........10,'10')。
五、测试
接下来写一个测试用例来看一下使用批量插入的效果。
publicvoidTestBatchInsert()
{
Console.WriteLine(TimeWatcher.Watch(()=>
InvokeTest(database=>
{
vartable=newDataTable("Batcher");
table.Columns.Add("Id",typeof(int));
table.Columns.Add("Name1",typeof(string));
table.Columns.Add("Name2",typeof(string));
table.Columns.Add("Name3",typeof(string));
table.Columns.Add("Name4",typeof(string));
//构造100000条数据
for(vari=0;i<100000;i++)
{
table.Rows.Add(i,i.ToString(),i.ToString(),i.ToString(),i.ToString());
}
//获取IBatcherProvider
varbatcher=database.Provider.GetService<IBatcherProvider>();
if(batcher==null)
{
Console.WriteLine("不支持批量插入。");
}
else
{
batcher.Insert(table);
}
//输出batcher表的数据量
varsql=newSqlCommand("SELECTCOUNT(1)FROMBatcher");
Console.WriteLine("当前共有{0}条数据",database.ExecuteScalar(sql));
})));
}
以下表中列出了四种数据库生成10万条数据各耗用的时间
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。