A deep dive into reading and writing Kafka from Python
This post shows how to read from and write to Kafka in Python, covering both the producer and the consumer.
The examples below use the kafka-python client.
Producer
Crawlers usually act as the message producer. After a message is sent, it is best to record which partition it landed in and at what offset; in many cases these records help locate problems quickly. So attach callback functions to the send call, one for success and one for failure.
# -*- coding: utf-8 -*-
'''
Callbacks also preserve per-partition ordering: if message a is sent before
message b to the same partition, a's callback is invoked before b's.
'''
import json

from kafka import KafkaProducer

topic = 'demo'


def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


def on_send_error(excp):
    print('I am an errback: {}'.format(excp))


def main():
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092'
    )
    producer.send(topic, value=b'{"test_msg":"hello world"}').add_callback(
        on_send_success).add_errback(on_send_error)
    # close() blocks until all pending send requests complete, then closes the KafkaProducer
    producer.close()


def main2():
    '''
    Send JSON-formatted messages
    :return:
    '''
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda m: json.dumps(m).encode('utf-8')
    )
    producer.send(topic, value={"test_msg": "hello world"}).add_callback(
        on_send_success).add_errback(on_send_error)
    # close() blocks until all pending send requests complete, then closes the KafkaProducer
    producer.close()


if __name__ == '__main__':
    # main()
    main2()
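The docstring above notes that callbacks fire in order within a partition. Ordering is only guaranteed per partition, so messages that must stay in order should be routed to the same partition. Below is a minimal sketch of my own (not from the original article) that uses a message key for this; kafka-python's default partitioner hashes the key, so records sharing a key land in the same partition. It assumes the same local broker and demo topic as above.

# -*- coding: utf-8 -*-
'''
Sketch: keyed sends for per-partition ordering (assumes localhost:9092, topic 'demo')
'''
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
# records with the same key hash to the same partition, so they stay ordered
producer.send('demo', key=b'user-1', value=b'first')
producer.send('demo', key=b'user-1', value=b'second')
producer.flush()  # block until both sends have completed
producer.close()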
Consumer
Kafka's consumption model is fairly complex, so I'll go through it case by case.
1. Without a consumer group (group_id=None)
Without a consumer group you can start any number of consumers; you are no longer limited by the partition count. Even when there are more consumers than partitions, every consumer still receives all messages.
# -*- coding: utf-8 -*-
'''
Consumer: group_id=None
'''
from kafka import KafkaConsumer

topic = 'demo'


def main():
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='latest',
        # auto_offset_reset='earliest',
    )
    for msg in consumer:
        print(msg)
        print(msg.value)
    consumer.close()


if __name__ == '__main__':
    main()
2. With a specified consumer group
The following uses the poll method to pull messages.
Each poll call pulls messages from only one partition; for example, with 2 partitions and 1 consumer, it takes 2 polls to cover both.
poll pulls immediately when messages are available; if no new message arrives within timeout_ms it returns an empty dict. So one call may return a single message while another returns max_records messages.
# -*- coding: utf-8 -*-
'''
Consumer: with a specified group_id
'''
from kafka import KafkaConsumer

topic = 'demo'
group_id = 'test_id'


def main():
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='latest',
        group_id=group_id,
    )
    while True:
        try:
            # poll() returns a dict
            batch_msgs = consumer.poll(timeout_ms=1000, max_records=2)
            if not batch_msgs:
                continue
            '''
            {TopicPartition(topic='demo', partition=0): [ConsumerRecord(topic='demo', partition=0, offset=42, timestamp=1576425111411, timestamp_type=0, key=None, value=b'74', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=2, serialized_header_size=-1)]}
            '''
            for tp, msgs in batch_msgs.items():
                print('topic: {}, partition: {}, receive length: {}'.format(tp.topic, tp.partition, len(msgs)))
                for msg in msgs:
                    print(msg.value)
        except KeyboardInterrupt:
            break
    consumer.close()


if __name__ == '__main__':
    main()
About consumer groups
The behavior breaks down by configuration as follows (a manual-commit sketch follows the list):
- group_id=None
  - auto_offset_reset='latest': every start consumes from the latest offset, so data produced while the consumer was down is lost
  - auto_offset_reset='earliest': every start consumes from the very beginning, regardless of what has already been consumed
- with a specified group_id
  - brand-new group_id
    - auto_offset_reset='latest': consumes only data received after startup; after a restart, consumption resumes from the last committed offset
    - auto_offset_reset='earliest': consumes the full data set from the beginning
  - existing group_id (i.e. the Kafka cluster still holds committed offsets for this group)
    - auto_offset_reset='latest': resumes from the last committed offset
    - auto_offset_reset='earliest': also resumes from the last committed offset
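To make the committed-offset behavior concrete, here is a minimal sketch of my own (not in the original article) that disables auto-commit and commits offsets manually; after a restart with the same group_id, consumption resumes from the last commit. It assumes the same local broker and topic as above.

# -*- coding: utf-8 -*-
'''
Sketch: manual offset commits (assumes localhost:9092, topic 'demo')
'''
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'demo',
    bootstrap_servers='localhost:9092',
    group_id='test_id',            # offsets are tracked per group_id
    auto_offset_reset='earliest',  # only applies when the group has no committed offset
    enable_auto_commit=False,      # commit explicitly instead of on a timer
)
for msg in consumer:
    print(msg.offset, msg.value)
    consumer.commit()  # synchronous commit; a restart resumes after this offset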
Performance test
The tests below were run locally. If you plan to use Kafka in production, run your own performance tests first.
producer
# -*- coding: utf-8 -*-
'''
producer performance
environment:
    mac
    python 3.7
    broker 1
    partition 2
'''
import json
import time

from kafka import KafkaProducer

topic = 'demo'
nums = 1000000


def main():
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda m: json.dumps(m).encode('utf-8')
    )
    st = time.time()
    cnt = 0
    for _ in range(nums):
        producer.send(topic, value=_)
        cnt += 1
        if cnt % 10000 == 0:
            print(cnt)
    producer.flush()
    et = time.time()
    cost_time = et - st
    print('send nums: {}, cost time: {}, rate: {}/s'.format(nums, cost_time, nums // cost_time))


if __name__ == '__main__':
    main()
'''
send nums: 1000000, cost time: 61.89236712455749, rate: 16157.0/s
send nums: 1000000, cost time: 61.29534196853638, rate: 16314.0/s
'''
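Producer throughput is sensitive to batching settings. As an aside of my own (the values below are illustrative, not tuned for the article's environment), kafka-python's KafkaProducer exposes linger_ms, batch_size, and acks, which are worth varying when rerunning this benchmark:

# -*- coding: utf-8 -*-
'''
Sketch: producer batching knobs to vary in the benchmark above (illustrative values)
'''
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    linger_ms=5,       # wait up to 5 ms to fill a batch before sending
    batch_size=32768,  # max bytes buffered per partition batch (default 16384)
    acks=1,            # wait only for the partition leader's acknowledgement
)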
consumer
# -*- coding: utf-8 -*-
'''
consumer performance
'''
import time

from kafka import KafkaConsumer

topic = 'demo'
group_id = 'test_id'


def main1():
    nums = 0
    st = time.time()
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='latest',
        group_id=group_id
    )
    for msg in consumer:
        nums += 1
        if nums >= 500000:
            break
    consumer.close()
    et = time.time()
    cost_time = et - st
    print('one_by_one: consume nums: {}, cost time: {}, rate: {}/s'.format(nums, cost_time, nums // cost_time))


def main2():
    nums = 0
    st = time.time()
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        auto_offset_reset='latest',
        group_id=group_id
    )
    running = True
    batch_pool_nums = 1
    while running:
        batch_msgs = consumer.poll(timeout_ms=1000, max_records=batch_pool_nums)
        if not batch_msgs:
            continue
        for tp, msgs in batch_msgs.items():
            nums += len(msgs)
            if nums >= 500000:
                running = False
                break
    consumer.close()
    et = time.time()
    cost_time = et - st
    print('batch_pool: max_records: {} consume nums: {}, cost time: {}, rate: {}/s'.format(
        batch_pool_nums, nums, cost_time, nums // cost_time))


if __name__ == '__main__':
    # main1()
    main2()
'''
one_by_one: consume nums: 500000, cost time: 8.018627166748047, rate: 62354.0/s
one_by_one: consume nums: 500000, cost time: 7.698841094970703, rate: 64944.0/s
batch_pool: max_records: 1 consume nums: 500000, cost time: 17.975456953048706, rate: 27815.0/s
batch_pool: max_records: 1 consume nums: 500000, cost time: 16.711708784103394, rate: 29919.0/s
batch_pool: max_records: 500 consume nums: 500369, cost time: 6.654940843582153, rate: 75187.0/s
batch_pool: max_records: 500 consume nums: 500183, cost time: 6.854053258895874, rate: 72976.0/s
batch_pool: max_records: 1000 consume nums: 500485, cost time: 6.504687070846558, rate: 76942.0/s
batch_pool: max_records: 1000 consume nums: 500775, cost time: 7.047331809997559, rate: 71058.0/s
'''
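The numbers suggest that poll() with max_records=1 pays a per-call overhead, while larger batches match or beat the plain iterator. If you rerun this, the consumer's fetch settings are also worth varying; here is a sketch of my own (illustrative values, not from the article):

# -*- coding: utf-8 -*-
'''
Sketch: consumer fetch knobs to vary in the benchmark above (illustrative values)
'''
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'demo',
    bootstrap_servers='localhost:9092',
    group_id='test_id',
    fetch_min_bytes=1024,               # wait for at least 1 KiB per fetch (default 1)
    fetch_max_wait_ms=500,              # but no longer than 500 ms
    max_partition_fetch_bytes=1048576,  # cap on bytes returned per partition per fetch
)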