C#如何读取Txt大数据并更新到数据库详解
环境
- Sqlserver2016
- .net4.5.2
目前测试数据1300万大约3-4分钟.(限制一次读取条数和线程数是要节省服务器资源,如果调太大服务器其它应用可能就跑不了了),SqlServerDBHelper为数据库帮助类.没有什么特别的处理.配置连接串时记录把连接池开起来
另外.以下代码中每次写都创建了连接.之前试过一个连接反复用.130次大约有20多次数据库会出问题.并且需要的时间是7-8分钟左右.
配置文件:xxx.json
[{
/*连接字符串*/
"ConnStr":"",
"FilePath":"读取的文件地址",
/*数据库表名称*/
"TableName":"写入的数据库表名",
/*导入前执行的语句*/
"ExecBeforeSql":"",
/*导入后执行的语句*/
"ExecAfterSql":"",
/*映射关系*/
"Mapping":[
{
"DBName":"XXX",
"TxtName":"DDD"
}
],
/*过滤数据的正则当前只实现了小数据一次性读完的检查*/
"FilterRegex":[],
/*检查数据合法性(从数据库获取字段属性进行验证)*/
"CheckData":false,
/*列分隔符*/
"Separator":"\t",
/*表头的行数*/
"HeaderRowsNum":1
}
]
读取代码:注意ConfigurationManager.AppSettings["frpage"]和ConfigurationManager.AppSettings["fr"]需要自己配置好
//读取配置文件信息 Listdt=JsonConvert.DeserializeObject >(File.ReadAllText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory,"config\\ImportTxt.json"))); LogUtil.Info("开始读取txt数据,读取配置:"+dt.Count+"条"); if(dt.Count==0) { return; } List
li=newList (); foreach(dynamicrowindt) { LogUtil.Info("开始处理数据:"+JsonConvert.SerializeObject(row)); li.Add(ProcessRow(row)); } Task.WaitAll(li.ToArray()); LogUtil.Info("数据读取完毕");
publicasyncTaskProcessRow(dynamicrow)
{
awaitTask.Run(()=>
{
AutoResetEventAE=newAutoResetEvent(false);
DataTableData=null;
stringerror="",ConnStr,TableName,ExecBeforeSql,ExecAfterSql;
BooleanIsCheck=Convert.ToBoolean(row["CheckData"]);
TableName=Convert.ToString(row.TableName);
ConnStr=Convert.ToString(row.ConnStr);
ExecBeforeSql=Convert.ToString(row.ExecBeforeSql);
ExecAfterSql=Convert.ToString(row.ExecAfterSql);
intHeaderRowsNum=Convert.ToInt32(row.HeaderRowsNum);
stringSeparator=Convert.ToString(row.Separator);
Dictionarydic=newDictionary();
//文件达到多大时就分行读取
intfr=0;
if(!int.TryParse(ConfigurationManager.AppSettings["fr"],outfr))
{
fr=100;
}
fr=fr*1024*1024;
//分行读取一次读取多少
intpage=0;
if(!int.TryParse(ConfigurationManager.AppSettings["frpage"],outpage))
{
page=50000;
}
foreach(vardyninrow.Mapping)
{
dic.Add(Convert.ToString(dyn.TxtName),Convert.ToString(dyn.DBName));
}
Listregex=newList();
foreach(stringiteminrow["FilterRegex"])
{
regex.Add(item);
}
stringfpath="",cpath="";
cpath=Convert.ToString(row["FilePath"]);
stringrootPath=Path.Combine(AppDomain.CurrentDomain.BaseDirectory,"tmp");
if(!Directory.Exists(rootPath))
{
Directory.CreateDirectory(rootPath);
}
fpath=Path.Combine(rootPath,Path.GetFileName(cpath));
File.Copy(cpath,fpath,true);
LogUtil.Info("拷文件到本地已经完成.从本地读取数据操作");
intthreadCount=Environment.ProcessorCount*3;
FileInfofi=newFileInfo(fpath);
//如果文件大于100M就需要分批读取.一次50万条
if(fi.Length>fr)
{
longsumCount=0;
StreamReadersr=newStreamReader(fi.OpenRead());
intheadRow=0;
stringrowstr="";
Listli_th=newList();
boollast=false;
intij=0;
LogUtil.Info("生成StreamReader成功");
#region逐行读取
while(sr.Peek()>-1)
{
rowstr=sr.ReadLine();
#region将行数据写入DataTable
if(headRow1)
{
for(inti=1;i-1;i++)
{
rowstr+=""+sr.ReadLine();
}
}
Data.Rows.Add(rowstr.Split(newstring[]{Separator},StringSplitOptions.RemoveEmptyEntries));
if(Data.Rows.Count-1)
{
continue;
}
}
last=(sr.Peek()==-1);
#endregion
sumCount+=Data.Rows.Count;
ProcessPath(Data,page,sr,refij,TableName,ExecBeforeSql,ExecAfterSql,dic,IsCheck,li_th);
#region检查线程等待
if((ij>0&&(ij%threadCount)==0)||last)
{
LogUtil.Info("完成一批次当前共写数据:"+sumCount);
while(true)
{
boolisok=true;
foreach(variteminli_th)
{
if(item.IsAlive)
{
isok=false;
Application.DoEvents();
Thread.Sleep(1000);
}
}
if(isok)
{
li_th.Clear();
break;
}
}
//最后一页要等所有的执行完才能执行
if(sr.Peek()==-1)
{
WriteTODB(TableName,Data,ExecBeforeSql,ExecAfterSql,dic,false,true);
LogUtil.Info("最后一次写入完成");
}
LogUtil.Info("线程退出开始新的循环...");
}
Data.Clear();
#endregion
}
sr.Dispose();
#endregion
}
else
{
using(SQLServerDBHelpersdb=newSQLServerDBHelper())
{
sdb.OpenConnection();
#region一次性读取处理
Data=LoadDataTableFromTxt(fpath,referror,Separator,HeaderRowsNum,regex,IsCheck,dic,TableName);
if(IsCheck)
{
DataRow[]rows=Data.Select("ErrorMsgisnotnull");
if(rows.Length>0)
{
LogUtil.Info($"读取{TableName}数据出错:{JsonConvert.SerializeObject(rows)}");
return;
}
}
LogUtil.Info($"读取{TableName}的txt数据完成.共读取数据:{Data.Rows.Count}条");
if(Data.Rows.Count==0||!string.IsNullOrWhiteSpace(error))
{
if(!string.IsNullOrWhiteSpace(error))
{
LogUtil.Info("读取数据出错,地址:"+Convert.ToString(row["FilePath"])+"\r\n错误:"+error);
}
return;
}
sdb.BgeinTransaction();
try
{
WriteTODB(TableName,Data,ExecBeforeSql,ExecAfterSql,dic,sdb:sdb);
sdb.CommitTransaction();
LogUtil.Info(TableName+"数据更新完毕!!");
}
catch(Exceptionex)
{
LogUtil.Info(TableName+"更新数据出错,错误:"+ex.Message+"\r\n堆栈:"+ex.StackTrace);
sdb.RollbackTransaction();
}
#endregion
}
}
GC.Collect();
});
}
privatevoidProcessPath(DataTableData,intpage,StreamReadersr,refintij,stringTableName,stringExecBeforeSql,stringExecAfterSql,Dictionarydic,boolIsCheck,Listli_th)
{
intthreadCount=Environment.ProcessorCount*4;
stringerror="";
PoolModelp=newPoolModel{TableName=TableName,ExecBeforeSql=ExecBeforeSql,ExecAfterSql=ExecAfterSql,dic=dic};
p.Data=Data.Copy();
if(IsCheck)
{
using(SQLServerDBHelpersdb=newSQLServerDBHelper())
{
error=CheckData(Data,TableName,dic,sdb);
}
DataRow[]rows=Data.Select("ErrorMsgisnotnull");
if(rows.Length>0||!string.IsNullOrWhiteSpace(error))
{
LogUtil.Info($"读取{TableName}数据出错:{JsonConvert.SerializeObject(rows)}\r\n错误:"+error);
return;
}
}
ij++;
if(ij==1)
{
WriteTODB(p.TableName,p.Data,p.ExecBeforeSql,p.ExecAfterSql,p.dic,true,false);
LogUtil.Info("首次写入完成");
}
elseif(sr.Peek()>-1)
{
Threadt=newThread(d=>
{
PoolModelc=dasPoolModel;
try
{
WriteTODB(c.TableName,c.Data,c.ExecBeforeSql,c.ExecAfterSql,c.dic,false,false);
}
catch(ThreadAbortException)
{
LogUtil.Error("线程退出.................");
}
catch(Exceptionex)
{
LogUtil.Error(c.TableName+"写入数据失败:"+ex.Message+"\r\n堆栈:"+ex.StackTrace+"\r\n数据:"+JsonConvert.SerializeObject(c.Data));
ExitApp();
return;
}
});
t.IsBackground=true;
t.Start(p);
li_th.Add(t);
}
}
publicvoidExitApp()
{
Application.Exit();
}
publicvoidWriteTODB(stringTableName,DataTableData,stringExecBeforeSql,stringExecAfterSql,Dictionarydic,boolfirst=true,boollast=true,SQLServerDBHelpersdb=null)
{
boolhave=false;
if(sdb==null)
{
sdb=newSQLServerDBHelper();
have=true;
}
if(first&&!string.IsNullOrWhiteSpace(ExecBeforeSql))
{
LogUtil.Info(TableName+"执行前Sql:"+ExecBeforeSql);
sdb.ExecuteNonQuery(ExecBeforeSql);
}
sdb.BulkCopy(Data,TableName,dic);
if(last&&!string.IsNullOrWhiteSpace(ExecAfterSql))
{
LogUtil.Info(TableName+"执行后Sql:"+ExecAfterSql);
sdb.ExecuteNonQuery(ExecAfterSql);
}
LogUtil.Info(TableName+"本次执行完成");
if(have)
{
sdb.Dispose();
}
}
publicstringCheckData(DataTabledt,stringdbTableName,Dictionarydic,SQLServerDBHelpersdb)
{
if(string.IsNullOrWhiteSpace(dbTableName))
{
return"表名不能为空!";
}
if(dic.Count==0)
{
return"映射关系数据不存在!";
}
ListerrorMsg=newList();
ListCols=newList();
dic.Foreach(c=>
{
if(!dt.Columns.Contains(c.Key))
{
errorMsg.Add(c.Key);
}
Cols.Add(c.Key);
});
if(errorMsg.Count>0)
{
return"数据列不完整,请与映射表的数据列数量保持一致!列:"+string.Join(",",errorMsg);
}
//如果行数据有错误信息则添加到这一列的值里
dt.Columns.Add(newDataColumn("ErrorMsg",typeof(string)){DefaultValue=""});
stringsql=@"--获取SqlServer中表结构
SELECTsyscolumns.nameasColName,systypes.nameasDBType,syscolumns.isnullable,
syscolumns.length
FROMsyscolumns,systypes
WHEREsyscolumns.xusertype=systypes.xusertype
ANDsyscolumns.id=object_id(@tb);";
DataSetds=sdb.GetDataSet(sql,newSqlParameter[]{newSqlParameter("@tb",dbTableName)});
EnumerableRowCollectionTableDef=ds.Tables[0].AsEnumerable();
//stringcolName="";
Objectobj_val;
//将表结构数据重组成字典.
vardic_Def=TableDef.ToDictionary(c=>Convert.ToString(c["ColName"]),d=>
{
stringDBType="";
stringold=Convert.ToString(d["DBType"]).ToUpper();
DBType=GetCSharpType(old);
returnnew{ColName=Convert.ToString(d["ColName"]),DBType=DBType,SqlType=old,IsNullble=Convert.ToBoolean(d["isnullable"]),Length=Convert.ToInt32(d["length"])};
});
DateTimenow=DateTime.Now;
foreach(DataRowrowindt.Rows)
{
errorMsg.Clear();
foreach(stringcolNameinCols)
{
if(dic.ContainsKey(colName))
{
if(!dic_Def.ContainsKey(dic[colName]))
{
return"Excel列名:"+colName+"映射数据表字段:"+dic[colName]+"在当前数据表中不存在!";
}
//去掉数据两边的空格
row[colName]=obj_val=Convert.ToString(row[colName]).Trim();
varinfo=dic_Def[dic[colName]];
//是否是DBNULL
if(obj_val.Equals(DBNull.Value))
{
if(!info.IsNullble)
{
errorMsg.Add("列"+colName+"不能为空!");
}
}
else
{
if(info.DBType=="String")
{
//time类型不用验证长度(日期的时间部分如17:12:30.0000)
if(info.SqlType=="TIME")
{
if(!DateTime.TryParse(now.ToString("yyyy-MM-dd")+""+obj_val.ToString(),outnow))
{
errorMsg.Add("列"+colName+"填写的数据无效应为日期的时间部分如:17:30:12");
}
}
elseif(Convert.ToString(obj_val).Length>info.Length)
{
errorMsg.Add("列"+colName+"长度超过配置长度:"+info.Length);
}
}
else
{
Typet=Type.GetType("System."+info.DBType);
try
{//如果数字中有千分位在这一步可以处理掉重新给这个列赋上正确的数值
row[colName]=Convert.ChangeType(obj_val,t);;
}
catch(Exceptionex)
{
errorMsg.Add("列"+colName+"填写的数据"+obj_val+"无效应为"+info.SqlType+"类型.");
}
}
}
}
}
row["ErrorMsg"]=string.Join("||",errorMsg);
}
return"";
}
///
///wm2018年11月28日13:37
///将数据库常用类型转为C#中的类名(.Net的类型名)
///
///
///
privatestringGetCSharpType(stringold)
{
stringDBType="";
switch(old)
{
case"INT":
case"BIGINT":
case"SMALLINT":
DBType="Int32";
break;
case"DECIMAL":
case"FLOAT":
case"NUMERIC":
DBType="Decimal";
break;
case"BIT":
DBType="Boolean";
break;
case"TEXT":
case"CHAR":
case"NCHAR":
case"VARCHAR":
case"NVARCHAR":
case"TIME":
DBType="String";
break;
case"DATE":
case"DATETIME":
DBType="DateTime";
break;
default:
thrownewException("GetCSharpType数据类型"+DBType+"无法识别!");
}
returnDBType;
}
publicclassPoolModel
{
publicstringTableName{get;set;}
publicDataTableData{get;set;}
publicstringExecBeforeSql{get;set;}
publicstringExecAfterSql{get;set;}
publicDictionarydic{get;set;}
}
//////wm2018年11月28日13:32 ///获取Txt数据并对数据进行校验返回一个带有ErrorMsg列的DataTable,如果数据校验失败则该字段存放失败的原因 ///注意:在使用该方法前需要数据表应该已经存在 /// ///是否校验数据合法性(数据需要校验则会按传入的dbTableName获取数据库表的结构出来验证) /// 如果需要验证数据则此处需要传映射关系keyExcel列名,Value数据库列名 /// 验证数据合法性的表(即数据会插入到的表) /// 非数据验证上的异常返回 /// 用来过滤数据的正则 /// 读取文件的路径 /// 列分隔符 /// 表头的行数 /// 如果需求验证则返回一个带有ErrorMsg列的DataTable,如果数据校验失败则该字段存放失败的原因,不需要验证则数据读取后直接返回DataTable publicDataTableLoadDataTableFromTxt(stringpath,refstringerror,stringSeparator,intHeaderRowsNum,ListRegexs=null,boolisCheck=false,Dictionary map=null,stringdbTableName="",SQLServerDBHelpersdb=null) { DataTabledt=newDataTable(); error=""; if(isCheck&&(map==null||map.Count==0||string.IsNullOrWhiteSpace(dbTableName))) { error="参数标明需要对表格数据进行校验,但没有指定映射表集合或数据表名."; returndt; } stringtxts=File.ReadAllText(path); #region把读出来的方便数据转成DataTable Regexs?.ForEach(c=> { txts=newRegex(c).Replace(txts,""); }); ////替换掉多表的正则 //Regexmu_re=newRegex(@"\+[-+]{4,}\s+\+[-+\s|\w./]{4,}\+");//FTPnewRegex(@"\+[-+]{4,}\s+\+[-+\s|\w./]{4,}\+");//原来以-分隔的newRegex(@"-{5,}(\s)+-{5,}\s+\|.+(\s)?\|.+(\s)?\|-{5,}"); ////去掉所有横线 //Regexmu_r=newRegex(@"[+-]{4,}");//FTPnewRegex(@"[+-]{4,}");//原newRegex(@"(\|-{5,})|(-{5,})"); //strings1=mu_re.Replace(txts,""); //strings2=mu_r.Replace(s1,""); //string[]tts=s2.Split(newstring[]{"\r\n"},StringSplitOptions.None); string[]tts=txts.Split(newstring[]{"\r\n"},StringSplitOptions.None); string[]vals; strings1; //生成表头默认第一行时表头直到遇到第一个只有一个|的内容为止(有几行表头,下面的内容就会有几行) intheaderNum=-1;//记录表头有几列 DataRowdr; //处理col重复的问题,如果有重复按第几个来命名比如A1A2 Dictionary col_Rep=newDictionary (); stringcolName=""; boolisre=false;//记录当前是否有重复列 intempty_HeaderRow=0; for(inti=0;i c.Value>1).Select(c=>c.Key))+"存在重复"; returndt; } //多行时把多行的数据加在一起处理 if(headerNum>1) { for(intj=1;j 总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对毛票票的支持。