1.可以將kafka中的偏移量自動重置為最新的offset
2.適用於kafka有積壓,但是不想處理積壓,只想直接消費最新資料的場景
3.此版本只支援offset存儲在zk中, 暫未提供offset存儲在kafka中的版本
# -*- coding:utf-8 -*-
import time
import sys
from kafka.client_async import KafkaClient
from kafka.protocol.commit import OffsetCommitRequest_v0
from kafka.protocol.offset import OffsetRequest_v0, OffsetResponse_v0
from kafka.structs import TopicPartition
# Comma-separated list of Kafka bootstrap brokers (host:port).
servers = '192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092'
# Consumer group whose committed offsets are rewritten.
gid = 'group_id'
# Topic whose partitions are reset.
topic = 'topic'
# Only needed for a manual reset: set the target offset here and switch the
# call in the __main__ block from auto_lateset() to manual().
# sys.maxint was removed in Python 3; fall back to sys.maxsize there while
# keeping the exact Python 2 value.
manual_logsize = getattr(sys, 'maxint', sys.maxsize)
def parse_logsize(t, p, responses):
    """
    Extract the log-end offset ("logsize") of one partition from broker responses.

    Scans ``responses`` for ``OffsetResponse_v0`` objects and returns the
    first offset reported for topic ``t`` / partition ``p`` whose error code
    is 0. Every topic and partition entry of each response is inspected,
    not only the first one.

    :param t: topic name to look for
    :param p: partition id to look for
    :param responses: iterable of response objects; anything that is not an
        ``OffsetResponse_v0`` is ignored
    :return: the first offset listed for the matching partition, or ``None``
        when no matching entry with error code 0 and at least one offset
        is present
    """
    for response in responses:
        if not isinstance(response, OffsetResponse_v0):
            continue
        # Layout: topics -> [(topic, [(partition, error_code, [offsets])])]
        for topic_name, partition_entries in response.topics:
            if topic_name != t:
                continue
            for partition_id, error_code, offsets in partition_entries:
                # Skip failed lookups (non-zero error code) and entries that
                # carry no offset at all instead of raising IndexError.
                if partition_id == p and error_code == 0 and offsets:
                    return offsets[0]
    return None
def auto_lateset(g, t):
    """
    Reset the committed offset of every partition of a topic to the latest offset.

    For each partition of topic ``t`` the partition leader is asked for its
    current log-end offset, and that offset is then committed for consumer
    group ``g`` with an ``OffsetCommitRequest_v0``. The result of the poll
    that follows each commit is printed. Partitions for which no offset
    could be obtained are left untouched.

    :param g: consumer group id whose offsets are reset
    :param t: topic name
    :raises ValueError: if the cluster metadata lists no partitions for ``t``
    :return: None
    """
    client = KafkaClient(bootstrap_servers=servers, request_timeout_ms=3000)
    try:
        partitions = client.cluster.partitions_for_topic(t)
        # partitions_for_topic yields None for an unknown topic; fail with a
        # clear message instead of a TypeError from iterating None.
        if not partitions:
            raise ValueError('no partitions found for topic %r' % (t,))
        for partition in partitions:
            node_id = client.cluster.leader_for_partition(
                TopicPartition(topic=t, partition=partition))
            # Wait until the connection to the partition leader is usable.
            while not client.is_ready(node_id):
                client.ready(node_id)
                time.sleep(1)
            # Timestamp -1 asks for the latest offset; at most 1 offset is requested.
            client.send(node_id, OffsetRequest_v0(replica_id=-1,
                                                  topics=[(t, [(partition, -1, 1)])]))
            log_size = parse_logsize(t, partition, client.poll(timeout_ms=3000))
            # Compare with None so that a legitimate offset of 0 is still committed.
            if log_size is not None:
                client.send(node_id, OffsetCommitRequest_v0(consumer_group=g,
                                                            topics=[(t, [(partition, log_size, '')])]))
                # Parenthesised form prints the same on Python 2 and 3.
                print(client.poll())
    finally:
        # Release the broker connections even if a request fails.
        client.close()
def manual(g, t, log_size):
    """
    Set the committed offset of every partition of a topic to a fixed value.

    Note that all partitions of topic ``t`` receive the same offset
    ``log_size``. The commit is sent to each partition leader with an
    ``OffsetCommitRequest_v0`` and the result of the following poll is printed.

    :param g: consumer group id whose offsets are rewritten
    :param t: topic name
    :param log_size: offset to commit for every partition
    :raises ValueError: if the cluster metadata lists no partitions for ``t``
    :return: None
    """
    client = KafkaClient(bootstrap_servers=servers, request_timeout_ms=3000)
    try:
        partitions = client.cluster.partitions_for_topic(t)
        # partitions_for_topic yields None for an unknown topic; fail with a
        # clear message instead of a TypeError from iterating None.
        if not partitions:
            raise ValueError('no partitions found for topic %r' % (t,))
        for partition in partitions:
            node_id = client.cluster.leader_for_partition(
                TopicPartition(topic=t, partition=partition))
            # Wait until the connection to the partition leader is usable.
            while not client.is_ready(node_id):
                client.ready(node_id)
                time.sleep(1)
            client.send(node_id, OffsetCommitRequest_v0(consumer_group=g,
                                                        topics=[(t, [(partition, log_size, '')])]))
            # Parenthesised form prints the same on Python 2 and 3.
            print(client.poll())
    finally:
        # Release the broker connections even if a request fails.
        client.close()
if __name__ == "__main__":
    # Default action: move every partition of `topic` to its latest offset
    # for consumer group `gid`.
    auto_lateset(gid, topic)
    # To commit the fixed offset `manual_logsize` on every partition instead,
    # replace the call above with:
    # manual(gid, topic, manual_logsize)
版權聲明:本文為CSDN部落客「weixin_34419326」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。
原文連結:https://blog.csdn.net/weixin_34419326/article/details/91603279