Python实现 多进程导入CSV数据到 MySQL
前段时间帮同事处理了一个把CSV数据导入到MySQL的需求。两个很大的CSV文件,分别有3GB、2100万条记录和7GB、3500万条记录。对于这个量级的数据,用简单的单进程/单线程导入会耗时很久,最终用了多进程的方式来实现。具体过程不赘述,记录一下几个要点:
- 批量插入而不是逐条插入
- 为了加快插入速度,先不要建索引
- 生产者和消费者模型,主进程读文件,多个worker进程执行插入
- 注意控制worker的数量,避免对MySQL造成太大的压力
- 注意处理脏数据导致的异常
- 原始数据是GBK编码,所以还要注意转换成UTF-8
- 用click封装命令行工具
具体的代码实现如下:
#!/usr/bin/envpython
#-*-coding:utf-8-*-
importcodecs
importcsv
importlogging
importmultiprocessing
importos
importwarnings
importclick
importMySQLdb
importsqlalchemy
warnings.filterwarnings('ignore',category=MySQLdb.Warning)
#批量插入的记录数量
BATCH=5000
DB_URI='mysql://root@localhost:3306/example?charset=utf8'
engine=sqlalchemy.create_engine(DB_URI)
defget_table_cols(table):
sql='SELECT*FROM`{table}`LIMIT0'.format(table=table)
res=engine.execute(sql)
returnres.keys()
definsert_many(table,cols,rows,cursor):
sql='INSERTINTO`{table}`({cols})VALUES({marks})'.format(
table=table,
cols=','.join(cols),
marks=','.join(['%s']*len(cols)))
cursor.execute(sql,*rows)
logging.info('process%sinserted%srowsintotable%s',os.getpid(),len(rows),table)
definsert_worker(table,cols,queue):
rows=[]
#每个子进程创建自己的engine对象
cursor=sqlalchemy.create_engine(DB_URI)
whileTrue:
row=queue.get()
ifrowisNone:
ifrows:
insert_many(table,cols,rows,cursor)
break
rows.append(row)
iflen(rows)==BATCH:
insert_many(table,cols,rows,cursor)
rows=[]
definsert_parallel(table,reader,w=10):
cols=get_table_cols(table)
#数据队列,主进程读文件并往里写数据,worker进程从队列读数据
#注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存
queue=multiprocessing.Queue(maxsize=w*BATCH*2)
workers=[]
foriinrange(w):
p=multiprocessing.Process(target=insert_worker,args=(table,cols,queue))
p.start()
workers.append(p)
logging.info('starting#%sworkerprocess,pid:%s...',i+1,p.pid)
dirty_data_file='./{}_dirty_rows.csv'.format(table)
xf=open(dirty_data_file,'w')
writer=csv.writer(xf,delimiter=reader.dialect.delimiter)
forlineinreader:
#记录并跳过脏数据:键值数量不一致
iflen(line)!=len(cols):
writer.writerow(line)
continue
#把None值替换为'NULL'
clean_line=[Noneifx=='NULL'elsexforxinline]
#往队列里写数据
queue.put(tuple(clean_line))
ifreader.line_num%500000==0:
logging.info('put%stasksintoqueue.',reader.line_num)
xf.close()
#给每个worker发送任务结束的信号
logging.info('sendclosesignaltoworkerprocesses')
foriinrange(w):
queue.put(None)
forpinworkers:
p.join()
defconvert_file_to_utf8(f,rv_file=None):
ifnotrv_file:
name,ext=os.path.splitext(f)
ifisinstance(name,unicode):
name=name.encode('utf8')
rv_file='{}_utf8{}'.format(name,ext)
logging.info('starttoprocessfile%s',f)
withopen(f)asinfd:
withopen(rv_file,'w')asoutfd:
lines=[]
loop=0
chunck=200000
first_line=infd.readline().strip(codecs.BOM_UTF8).strip()+'\n'
lines.append(first_line)
forlineininfd:
clean_line=line.decode('gb18030').encode('utf8')
clean_line=clean_line.rstrip()+'\n'
lines.append(clean_line)
iflen(lines)==chunck:
outfd.writelines(lines)
lines=[]
loop+=1
logging.info('processed%slines.',loop*chunck)
outfd.writelines(lines)
logging.info('processed%slines.',loop*chunck+len(lines))
@click.group()
defcli():
logging.basicConfig(level=logging.INFO,
format='%(asctime)s-%(levelname)s-%(name)s-%(message)s')
@cli.command('gbk_to_utf8')
@click.argument('f')
defconvert_gbk_to_utf8(f):
convert_file_to_utf8(f)
@cli.command('load')
@click.option('-t','--table',required=True,help='表名')
@click.option('-i','--filename',required=True,help='输入文件')
@click.option('-w','--workers',default=10,help='worker数量,默认10')
defload_fac_day_pro_nos_sal_table(table,filename,workers):
withopen(filename)asfd:
fd.readline()#skipheader
reader=csv.reader(fd)
insert_parallel(table,reader,w=workers)
if__name__=='__main__':
cli()
以上就是本文给大家分享的全部没人了,希望大家能够喜欢