參考文章: https://zhuanlan.zhihu.com/p/279784873
生産者代碼:
import traceback
from kafka import KafkaProducer,KafkaConsumer
from faker import Faker
fake=Faker()
# 生産者
kafka_topic = "test_kafka_demo"
kafka_bootstrap_servers = ['xx:9092','xx:9092','xxx:9092']
# 消費者
kafka_topic_group = "test-group-zeze" #消費者群組
def producer(num:int):
producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)
phones = [fake.name()+"-"+str(i) for i in range(num)]
for p in phones:
msg = bytes(p, encoding='utf-8')
# print("生成消息",msg)
#同一個key的消息會被自動配置設定到同一個分區
future=producer.send(kafka_topic, key=b"test",value=msg)
#加了監聽事件是否成功發送後,執行速度很慢,是以這裡去掉了
# try:
# future.get(timeout=2)
# except Exception as e:
# traceback.print_stack()
print("成功生産{}條消息".format(num))
producer.close()
消費者代碼:
from kafka import KafkaConsumer
# 生産者
kafka_topic = "test_kafka_demo"
kafka_bootstrap_servers = ['xx:9092','xx:9092','xx:9092'] #xx為對應的ip位址
# 消費者
kafka_topic_group = "test-group-zeze" #消費者群組
def consumer():
consumer = KafkaConsumer(kafka_topic,group_id=kafka_topic_group,bootstrap_servers=kafka_bootstrap_servers)
for message in consumer:
print ("開始消費:","%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
# consumer.close()
親測單機生産10w消息耗時20秒内,單線程
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5yNygDMzQTO30SOwgTOxkjMwEjMygDMxIDMy0yN2MzN1MTMvwFOwEjMwIzLcdjNzcTNzEzLcd2bsJ2Lc12bj5ycn9Gbi52YuAjMwIzZtl2Lc9CX6MHc0RHaiojIsJye.png)
消費者沒記錄耗時,但是也非常快,kafka性能确實牛
腦子不夠用當然隻能腳踏實地的做事情!