天天看點

kafka将offset自動重置為最新

 1.可以将kafka中的偏移量自動重置為最新的

 2.適用于kafka有積壓,但又不想處理積壓、希望直接消費最新資料的場景

 3.此版本隻支援offset存儲在zk中, 暫未提供offset存儲在kafka中的版本

# -*- coding:utf-8 -*-
import time

import sys
from kafka.client_async import KafkaClient
from kafka.protocol.commit import OffsetCommitRequest_v0
from kafka.protocol.offset import OffsetRequest_v0, OffsetResponse_v0
from kafka.structs import TopicPartition

# Comma-separated "host:port" broker list passed to KafkaClient as
# bootstrap_servers.
servers = '192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092'
# Consumer group whose committed offsets are rewritten.
gid = 'group_id'
# Topic whose partitions are reset.
topic = 'topic'

# Only needed for a manual reset: set the target offset here and switch the
# call in the __main__ section to manual().
# sys.maxint exists only on Python 2; fall back to sys.maxsize on Python 3 so
# the module can still be imported there.
manual_logsize = getattr(sys, 'maxint', sys.maxsize)


def parse_logsize(t, p, responses):
    """
    Extract the offset reported for one topic partition from polled responses.

    Responses that are not ``OffsetResponse_v0`` instances are skipped. Every
    (topic, partition) entry of each offset response is examined, so a
    response that carries several topics or partitions is handled, and an
    entry with an empty offsets list is ignored instead of raising IndexError.

    :param t: topic name to look for
    :param p: partition id to look for
    :param responses: iterable of response objects (the caller passes the
        result of ``client.poll()``)
    :return: the first offset listed for (t, p) in an entry whose error code
        is 0, or None when no such entry exists
    """
    for response in responses:
        if not isinstance(response, OffsetResponse_v0):
            continue
        # response.topics is a list of (topic, partitions) pairs, where each
        # partition entry is (partition, error_code, offsets).
        for tpc, partition_list in response.topics:
            if tpc != t:
                continue
            for parti, error_code, offsets in partition_list:
                if parti == p and error_code == 0 and offsets:
                    return offsets[0]
    return None


def auto_lateset(g, t):
    """
    Reset the committed offset of a consumer group to the latest offset.

    For every partition of topic ``t`` the partition leader is asked for its
    latest offset, and that offset is then committed for group ``g`` with an
    ``OffsetCommitRequest_v0``. The result of each commit poll is printed.

    :param g: consumer group id
    :param t: topic name
    :return: None
    """
    client = KafkaClient(bootstrap_servers=servers, request_timeout_ms=3000)
    try:
        partitions = client.cluster.partitions_for_topic(t)
        for partition in partitions:
            node_id = client.cluster.leader_for_partition(
                TopicPartition(topic=t, partition=partition))
            # Block until a connection to the partition leader is usable.
            while not client.is_ready(node_id):
                client.ready(node_id)
                time.sleep(1)
            # Partition entry is (partition, timestamp, max_offsets);
            # timestamp -1 requests the latest offset, and one is enough.
            client.send(node_id, OffsetRequest_v0(
                replica_id=-1, topics=[(t, [(partition, -1, 1)])]))
            log_size = parse_logsize(t, partition, client.poll(timeout_ms=3000))
            # Compare against None: 0 is a valid latest offset (empty
            # partition) and must still be committed.
            if log_size is not None:
                client.send(node_id, OffsetCommitRequest_v0(
                    consumer_group=g,
                    topics=[(t, [(partition, log_size, '')])]))
                print(client.poll())
    finally:
        # Release broker connections even if a request fails.
        client.close()


def manual(g, t, log_size):
    """
    Commit a fixed offset for a consumer group on every partition of a topic.

    Note that all partitions of ``t`` are reset to the same ``log_size``
    value. The result of each commit poll is printed.

    :param g: consumer group id
    :param t: topic name
    :param log_size: offset to commit for each partition
    :return: None
    """
    client = KafkaClient(bootstrap_servers=servers, request_timeout_ms=3000)
    try:
        partitions = client.cluster.partitions_for_topic(t)
        for partition in partitions:
            node_id = client.cluster.leader_for_partition(
                TopicPartition(topic=t, partition=partition))
            # Block until a connection to the partition leader is usable.
            while not client.is_ready(node_id):
                client.ready(node_id)
                time.sleep(1)
            client.send(node_id, OffsetCommitRequest_v0(
                consumer_group=g,
                topics=[(t, [(partition, log_size, '')])]))
            print(client.poll())
    finally:
        # Release broker connections even if a request fails.
        client.close()


if __name__ == "__main__":
    # Default action: move the group's offsets to the latest offset of each
    # partition.
    auto_lateset(gid, topic)
    # Alternative: commit the fixed offset manual_logsize on every partition.
    # manual(gid, topic, manual_logsize)
           

版權聲明:本文為CSDN部落客「weixin_34419326」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。

原文連結:https://blog.csdn.net/weixin_34419326/article/details/91603279