Tensorflow 多线程与多进程数据加载实例
在项目中遇到需要处理超级大量的数据集,无法载入内存的问题就不用说了,单线程分批读取和处理(虽然这个处理也只是特别简单的首尾相连的操作)也会使瓶颈出现在CPU性能上,所以研究了一下多线程和多进程的数据读取和预处理,都是通过调用datasetapi实现
1.多线程数据读取
第一种方法是可以直接从csv里读取数据,但返回值是tensor,需要在sess里run一下才能返回真实值,无法实现真正的并行处理,但如果直接用csv文件或其他什么文件存了特征值,可以直接读取后进行训练,可使用这种方法.
importtensorflowastf
#这里是返回的数据类型,具体内容无所谓,类型对应就好了,比如我这个,就是一个四维的向量,前三维是字符串类型最后一维是int类型
record_defaults=[[""],[""],[""],[0]]
defdecode_csv(line):
parsed_line=tf.decode_csv(line,record_defaults)
label=parsed_line[-1]#label
delparsed_line[-1]#deletethelastelementfromthelist
features=tf.stack(parsed_line)#Stackfeaturessothatyoucanlatervectorizeforwardprop.,etc.
#label=tf.stack(label)#NOTneeded.Onlyifmorethan1columnmakesthelabel...
batch_to_return=features,label
returnbatch_to_return
filenames=tf.placeholder(tf.string,shape=[None])
dataset5=tf.data.Dataset.from_tensor_slices(filenames)
#在这里设置线程数目
dataset5=dataset5.flat_map(lambdafilename:tf.data.TextLineDataset(filename).skip(1).map(decode_csv,num_parallel_calls=15))
dataset5=dataset5.shuffle(buffer_size=1000)
dataset5=dataset5.batch(32)#batch_size
iterator5=dataset5.make_initializable_iterator()
next_element5=iterator5.get_next()
#这里是需要加载的文件名
training_filenames=["train.csv"]
validation_filenames=["vali.csv"]
withtf.Session()assess:
for_inrange(2):
#通过文件名初始化迭代器
sess.run(iterator5.initializer,feed_dict={filenames:training_filenames})
whileTrue:
try:
#这里获得真实值
features,labels=sess.run(next_element5)
#Train...
#print("(train)features:")
#print(features)
#print("(train)labels:")
#print(labels)
excepttf.errors.OutOfRangeError:
print("Outofrangeerrortriggered(loopedthroughtrainingset1time)")
break
#Validate(cost,accuracy)ontrainset
print("\nDonewiththefirstiterator\n")
sess.run(iterator5.initializer,feed_dict={filenames:validation_filenames})
whileTrue:
try:
features,labels=sess.run(next_element5)
#Validate(cost,accuracy)ondevset
#print("(dev)features:")
#print(features)
#print("(dev)labels:")
#print(labels)
excepttf.errors.OutOfRangeError:
print("Outofrangeerrortriggered(loopedthroughdevset1timeonly)")
break
第二种方法,基于生成器,可以进行预处理操作了,sess里run出来的结果可以直接进行输入训练,但需要自己写一个生成器,我使用的测试代码如下:
importtensorflowastf
importrandom
importthreading
importnumpyasnp
fromdataimportload_image,load_wave
classSequenceData():
def__init__(self,path,batch_size=32):
self.path=path
self.batch_size=batch_size
f=open(path)
self.datas=f.readlines()
self.L=len(self.datas)
self.index=random.sample(range(self.L),self.L)
def__len__(self):
returnself.L-self.batch_size
def__getitem__(self,idx):
batch_indexs=self.index[idx:(idx+self.batch_size)]
batch_datas=[self.datas[k]forkinbatch_indexs]
img1s,img2s,audios,labels=self.data_generation(batch_datas)
returnimg1s,img2s,audios,labels
defgen(self):
foriinrange(100000):
t=self.__getitem__(i)
yieldt
defdata_generation(self,batch_datas):
#预处理操作,数据在参数里
returnimg1s,img2s,audios,labels
#这里的type要和实际返回的数据类型对应,如果在自己的处理代码里已经考虑的batchszie,那这里的batch设为1即可
dataset=tf.data.Dataset().batch(1).from_generator(SequenceData('train.csv').gen,
output_types=(tf.float32,tf.float32,tf.float32,tf.int64))
dataset=dataset.map(lambdax,y,z,w:(x,y,z,w),num_parallel_calls=32).prefetch(buffer_size=1000)
X,y,z,w=dataset.make_one_shot_iterator().get_next()
withtf.Session()assess:
for_inrange(100000):
a,b,c,d=sess.run([X,y,z,w])
print(a.shape)
不过python的多线程并不是真正的多线程,虽然看起来我是启动了32线程,但运行时的CPU占用如下所示:
还剩这么多核心空着,然后就是第三个版本了,使用了queue来缓存数据,训练需要数据时直接从queue中进行读取,是一个到多进程的过度版本(vscode没法debug多进程,坑啊,还以为代码写错了,在vscode里多进程直接就没法运行),在初始化时启动多个线程进行数据的预处理:
importtensorflowastf
importrandom
importthreading
importnumpyasnp
fromdataimportload_image,load_wave
fromqueueimportQueue
classSequenceData():
def__init__(self,path,batch_size=32):
self.path=path
self.batch_size=batch_size
f=open(path)
self.datas=f.readlines()
self.L=len(self.datas)
self.index=random.sample(range(self.L),self.L)
self.queue=Queue(maxsize=20)
foriinrange(32):
threading.Thread(target=self.f).start()
def__len__(self):
returnself.L-self.batch_size
def__getitem__(self,idx):
batch_indexs=self.index[idx:(idx+self.batch_size)]
batch_datas=[self.datas[k]forkinbatch_indexs]
img1s,img2s,audios,labels=self.data_generation(batch_datas)
returnimg1s,img2s,audios,labels
deff(self):
foriinrange(int(self.__len__()/self.batch_size)):
t=self.__getitem__(i)
self.queue.put(t)
defgen(self):
while1:
yieldself.queue.get()
defdata_generation(self,batch_datas):
#数据预处理操作
returnimg1s,img2s,audios,labels
#这里的type要和实际返回的数据类型对应,如果在自己的处理代码里已经考虑的batchszie,那这里的batch设为1即可
dataset=tf.data.Dataset().batch(1).from_generator(SequenceData('train.csv').gen,
output_types=(tf.float32,tf.float32,tf.float32,tf.int64))
dataset=dataset.map(lambdax,y,z,w:(x,y,z,w),num_parallel_calls=1).prefetch(buffer_size=1000)
X,y,z,w=dataset.make_one_shot_iterator().get_next()
withtf.Session()assess:
for_inrange(100000):
a,b,c,d=sess.run([X,y,z,w])
print(a.shape)
2.多进程数据读取
这里的代码和多线程的第三个版本非常类似,修改为启动进程和进程类里的Queue即可,但千万不要在vscode里直接debug!在vscode里直接f5运行进程并不能启动.
from__future__importunicode_literals
fromfunctoolsimportreduce
importtensorflowastf
importnumpyasnp
importwarnings
importargparse
importskimage.io
importskimage.transform
importskimage
importscipy.io.wavfile
frommultiprocessingimportProcess,Queue
classSequenceData():
def__init__(self,path,batch_size=32):
self.path=path
self.batch_size=batch_size
f=open(path)
self.datas=f.readlines()
self.L=len(self.datas)
self.index=random.sample(range(self.L),self.L)
self.queue=Queue(maxsize=30)
self.Process_num=32
foriinrange(self.Process_num):
print(i,'start')
ii=int(self.__len__()/self.Process_num)
t=Process(target=self.f,args=(i*ii,(i+1)*ii))
t.start()
def__len__(self):
returnself.L-self.batch_size
def__getitem__(self,idx):
batch_indexs=self.index[idx:(idx+self.batch_size)]
batch_datas=[self.datas[k]forkinbatch_indexs]
img1s,img2s,audios,labels=self.data_generation(batch_datas)
returnimg1s,img2s,audios,labels
deff(self,i_l,i_h):
foriinrange(i_l,i_h):
t=self.__getitem__(i)
self.queue.put(t)
defgen(self):
while1:
t=self.queue.get()
yieldt[0],t[1],t[2],t[3]
defdata_generation(self,batch_datas):
#数据预处理操作
returnimg1s,img2s,audios,labels
epochs=2
data_g=SequenceData('train_1.csv',batch_size=48)
dataset=tf.data.Dataset().batch(1).from_generator(data_g.gen,
output_types=(tf.float32,tf.float32,tf.float32,tf.float32))
X,y,z,w=dataset.make_one_shot_iterator().get_next()
withtf.Session()assess:
tf.global_variables_initializer().run()
foriinrange(epochs):
forjinrange(int(len(data_g)/(data_g.batch_size))):
face1,face2,voice,labels=sess.run([X,y,z,w])
print(face1.shape)
然后,最后实现的效果
以上这篇Tensorflow多线程与多进程数据加载实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。