A Deep Dive into Reading and Writing Kafka with Python
This article shows how to read from and write to Kafka with Python, covering both the producer and the consumer.
The examples below use the kafka-python client.
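If the client is not installed yet, it is available from PyPI under the package name kafka-python:

```shell
pip install kafka-python
```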
Producer
A crawler usually acts as the message producer. After a message is sent, it is best to record which partition it went to and at what offset; in many cases these records help you locate problems quickly. So attach callback functions to the send() call, covering both the success and the failure case.
```python
# -*- coding: utf-8 -*-
'''
Callbacks also preserve per-partition ordering: if message a is sent before
message b to the same partition, a's callback fires before b's callback.
'''
import json

from kafka import KafkaProducer

topic = 'demo'


def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


def on_send_error(excp):
    print('I am an errback: {}'.format(excp))


def main():
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092'
    )
    producer.send(topic, value=b'{"test_msg": "hello world"}').add_callback(
        on_send_success).add_errback(on_send_error)
    # close() blocks until all pending send requests complete, then closes the KafkaProducer
    producer.close()


def main2():
    '''
    Send a JSON-formatted message
    :return:
    '''
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda m: json.dumps(m).encode('utf-8')
    )
    producer.send(topic, value={"test_msg": "hello world"}).add_callback(
        on_send_success).add_errback(on_send_error)
    # close() blocks until all pending send requests complete, then closes the KafkaProducer
    producer.close()


if __name__ == '__main__':
    # main()
    main2()
```

Note: the original code registered on_send_error with add_callback; the error handler belongs in add_errback, as shown above.
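The `value_serializer` in `main2` turns a Python object into JSON bytes before sending. A matching `value_deserializer` (my own addition here, not shown in the consumer examples) can reverse it on the receiving side; a quick round-trip check:

```python
import json

# serializer used by the producer above: Python object -> JSON bytes
serialize = lambda m: json.dumps(m).encode('utf-8')
# a matching deserializer for KafkaConsumer(value_deserializer=...): JSON bytes -> Python object
deserialize = lambda v: json.loads(v.decode('utf-8'))

msg = {"test_msg": "hello world"}
encoded = serialize(msg)
print(encoded)                      # b'{"test_msg": "hello world"}'
print(deserialize(encoded) == msg)  # True
```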
Consumer
Kafka's consumption model is relatively complex, so I will walk through the following cases one by one.
1. Without a consumer group (group_id=None)
Without a consumer group you can start any number of consumers; you are no longer limited by the partition count. Even if there are more consumers than partitions, every consumer still receives the messages.
```python
# -*- coding: utf-8 -*-
'''
Consumer: group_id=None
'''
from kafka import KafkaConsumer

topic = 'demo'


def main():
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='latest',
        # auto_offset_reset='earliest',
    )
    for msg in consumer:
        print(msg)
        print(msg.value)
    consumer.close()


if __name__ == '__main__':
    main()
```
2. With a consumer group (specifying group_id)
The example below uses the poll() method to pull messages.

Each poll() call pulls messages from only one partition; for example, with 2 partitions and 1 consumer it takes 2 poll() calls to cover both.

poll() returns immediately when messages are available; if no new message arrives within timeout_ms it returns an empty dict. So one call may return a single message while another returns max_records messages.
```python
# -*- coding: utf-8 -*-
'''
Consumer: with a group_id
'''
from kafka import KafkaConsumer

topic = 'demo'
group_id = 'test_id'


def main():
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='latest',
        group_id=group_id,
    )
    while True:
        try:
            # poll() returns a dict of {TopicPartition: [ConsumerRecord, ...]}
            batch_msgs = consumer.poll(timeout_ms=1000, max_records=2)
            if not batch_msgs:
                continue
            '''
            {TopicPartition(topic='demo', partition=0): [ConsumerRecord(topic='demo', partition=0, offset=42, timestamp=1576425111411, timestamp_type=0, key=None, value=b'74', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=2, serialized_header_size=-1)]}
            '''
            for tp, msgs in batch_msgs.items():
                print('topic: {}, partition: {}, receive length: {}'.format(tp.topic, tp.partition, len(msgs)))
                for msg in msgs:
                    print(msg.value)
        except KeyboardInterrupt:
            break
    consumer.close()


if __name__ == '__main__':
    main()
```

Note: the original print format string had only two placeholders for three arguments; a third placeholder is added above.
About consumer groups
Depending on the configuration parameters, the behavior breaks down into the following cases:
- group_id=None
  - auto_offset_reset='latest': every start consumes from the latest offset, so messages produced while the consumer was down are lost after a restart
  - auto_offset_reset='earliest': every start consumes from the very beginning, regardless of what has already been consumed
- With a group_id
  - Brand-new group_id
    - auto_offset_reset='latest': consumes only data received after startup; after a restart it resumes from the last committed offset
    - auto_offset_reset='earliest': consumes the full data set from the beginning
  - Existing group_id (i.e. the Kafka cluster still holds committed offsets for this group_id)
    - auto_offset_reset='latest': resumes from the last committed offset
    - auto_offset_reset='earliest': resumes from the last committed offset
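The cases above collapse into a single rule: a committed offset for the group always wins, and auto_offset_reset only applies when there is none (and group_id=None never commits). A minimal pure-Python model of that rule, my own sketch rather than part of the kafka-python API:

```python
def effective_start(group_id, has_committed_offset, auto_offset_reset):
    """Model of where consumption starts for a given configuration (simplified)."""
    # group_id=None never commits offsets, so it can never resume
    if group_id is not None and has_committed_offset:
        # an existing group resumes from its committed offset,
        # whatever auto_offset_reset says
        return 'committed offset'
    # no usable committed offset: the reset policy decides
    return 'latest offset' if auto_offset_reset == 'latest' else 'beginning of log'

print(effective_start(None, False, 'latest'))        # latest offset
print(effective_start('new_id', False, 'earliest'))  # beginning of log
print(effective_start('test_id', True, 'earliest'))  # committed offset
```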
Performance testing
The tests below were run locally. If you plan to use Kafka in production, it is recommended to run performance tests in advance.
producer
```python
# -*- coding: utf-8 -*-
'''
producer performance
environment:
    mac
    python 3.7
    broker: 1
    partitions: 2
'''
import json
import time

from kafka import KafkaProducer

topic = 'demo'
nums = 1000000


def main():
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda m: json.dumps(m).encode('utf-8')
    )
    st = time.time()
    cnt = 0
    for _ in range(nums):
        producer.send(topic, value=_)
        cnt += 1
        if cnt % 10000 == 0:
            print(cnt)
    producer.flush()
    et = time.time()
    cost_time = et - st
    print('send nums: {}, cost time: {}, rate: {}/s'.format(nums, cost_time, nums // cost_time))


if __name__ == '__main__':
    main()

'''
send nums: 1000000, cost time: 61.89236712455749, rate: 16157.0/s
send nums: 1000000, cost time: 61.29534196853638, rate: 16314.0/s
'''
```
consumer
```python
# -*- coding: utf-8 -*-
'''
consumer performance
'''
import time

from kafka import KafkaConsumer

topic = 'demo'
group_id = 'test_id'


def main1():
    nums = 0
    st = time.time()
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='latest',
        group_id=group_id
    )
    for msg in consumer:
        nums += 1
        if nums >= 500000:
            break
    consumer.close()
    et = time.time()
    cost_time = et - st
    print('one_by_one: consume nums: {}, cost time: {}, rate: {}/s'.format(nums, cost_time, nums // cost_time))


def main2():
    nums = 0
    st = time.time()
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='latest',
        group_id=group_id
    )
    running = True
    batch_pool_nums = 1
    while running:
        batch_msgs = consumer.poll(timeout_ms=1000, max_records=batch_pool_nums)
        if not batch_msgs:
            continue
        for tp, msgs in batch_msgs.items():
            nums += len(msgs)
            if nums >= 500000:
                running = False
                break
    consumer.close()
    et = time.time()
    cost_time = et - st
    print('batch_pool: max_records: {} consume nums: {}, cost time: {}, rate: {}/s'.format(
        batch_pool_nums, nums, cost_time, nums // cost_time))


if __name__ == '__main__':
    # main1()
    main2()

'''
one_by_one: consume nums: 500000, cost time: 8.018627166748047, rate: 62354.0/s
one_by_one: consume nums: 500000, cost time: 7.698841094970703, rate: 64944.0/s
batch_pool: max_records: 1 consume nums: 500000, cost time: 17.975456953048706, rate: 27815.0/s
batch_pool: max_records: 1 consume nums: 500000, cost time: 16.711708784103394, rate: 29919.0/s
batch_pool: max_records: 500 consume nums: 500369, cost time: 6.654940843582153, rate: 75187.0/s
batch_pool: max_records: 500 consume nums: 500183, cost time: 6.854053258895874, rate: 72976.0/s
batch_pool: max_records: 1000 consume nums: 500485, cost time: 6.504687070846558, rate: 76942.0/s
batch_pool: max_records: 1000 consume nums: 500775, cost time: 7.047331809997559, rate: 71058.0/s
'''
```