python消费kafka数据批量插入到es的方法
1、es的批量插入
这是为了方便后期配置的更改,把配置信息放在logging.conf中
用elasticsearch来实现批量操作,先安装依赖包,sudopipinstallElasticsearch2
fromelasticsearchimportElasticsearch
classImportEsData:
logging.config.fileConfig("logging.conf")
logger=logging.getLogger("msg")
def__init__(self,hosts,index,type):
self.es=Elasticsearch(hosts=hosts.strip(',').split(','),timeout=5000)
self.index=index
self.type=type
defset_date(self,data):
#批量处理
#es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()})
self.es.index(index=self.index,doc_type=self.index,body=data)
2、使用pykafka消费kafka
1.因为kafka是0.8,pykafka不支持zk,只能用get_simple_consumer来实现
2.为了实现多个应用同时消费而且不重消费,所以一个应用消费一个partition
3.为是确保消费数据量在不满足10000这个批量值,能在一个时间范围内插入到es中,这里设置consumer_timeout_ms一个超时等待时间,退出等待消费阻塞。
4.退出等待消费阻塞后导致无法再消费数据,因此在获取self.consumer的外层加入了whileTrue一个死循环
#!/usr/bin/python
#-*-coding:UTF-8-*-
frompykafkaimportKafkaClient
importlogging
importlogging.config
fromConfigUtilimportConfigUtil
importdatetime
classKafkaPython:
logging.config.fileConfig("logging.conf")
logger=logging.getLogger("msg")
logger_data=logging.getLogger("data")
def__init__(self):
self.server=ConfigUtil().get("kafka","kafka_server")
self.topic=ConfigUtil().get("kafka","topic")
self.group=ConfigUtil().get("kafka","group")
self.partition_id=int(ConfigUtil().get("kafka","partition"))
self.consumer_timeout_ms=int(ConfigUtil().get("kafka","consumer_timeout_ms"))
self.consumer=None
self.hosts=ConfigUtil().get("es","hosts")
self.index_name=ConfigUtil().get("es","index_name")
self.type_name=ConfigUtil().get("es","type_name")
defgetConnect(self):
client=KafkaClient(self.server)
topic=client.topics[self.topic]
p=topic.partitions
ps={p.get(self.partition_id)}
self.consumer=topic.get_simple_consumer(
consumer_group=self.group,
auto_commit_enable=True,
consumer_timeout_ms=self.consumer_timeout_ms,
#num_consumer_fetchers=1,
#consumer_id='test1',
partitions=ps
)
self.starttime=datetime.datetime.now()
defbeginConsumer(self):
print("beginConsumerkafka-python")
imprtEsData=ImportEsData(self.hosts,self.index_name,self.type_name)
#创建ACTIONS
count=0
ACTIONS=[]
whileTrue:
endtime=datetime.datetime.now()
print(endtime-self.starttime).seconds
formessageinself.consumer:
ifmessageisnotNone:
try:
count=count+1
#print(str(message.partition.id)+","+str(message.offset)+","+str(count))
#self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
action={
"_index":self.index_name,
"_type":self.type_name,
"_source":message.value
}
ACTIONS.append(action)
iflen(ACTIONS)>=10000:
imprtEsData.set_date(ACTIONS)
ACTIONS=[]
self.consumer.commit_offsets()
endtime=datetime.datetime.now()
print(endtime-self.starttime).seconds
#break
except(Exception)ase:
#self.consumer.commit_offsets()
print(e)
self.logger.error(e)
self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
#self.logger_data.error(message.value+"\n")
#self.consumer.commit_offsets()
iflen(ACTIONS)>0:
self.logger.info("等待时间超过,consumer_timeout_ms,把集合数据插入es")
imprtEsData.set_date(ACTIONS)
ACTIONS=[]
self.consumer.commit_offsets()
defdisConnect(self):
self.consumer.close()
fromelasticsearchimportElasticsearch
fromelasticsearch.helpersimportbulk
classImportEsData:
logging.config.fileConfig("logging.conf")
logger=logging.getLogger("msg")
def__init__(self,hosts,index,type):
self.es=Elasticsearch(hosts=hosts.strip(',').split(','),timeout=5000)
self.index=index
self.type=type
defset_date(self,data):
#批量处理
success=bulk(self.es,data,index=self.index,raise_on_error=True)
self.logger.info(success)
3、运行
if__name__=='__main__': kp=KafkaPython() kp.getConnect() kp.beginConsumer() #kp.disConnect()
注:简单的写了一个从kafka中读取数据到一个list里,当数据达到一个阈值时,在批量插入到es的插件
现在还在批量的压测中。。。
以上这篇python消费kafka数据批量插入到es的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。