从kafka和zookeeper中获取生产和消费偏移量
-
特殊说明
- 该命令是使用python进行编译,需要使用centos7系统上进行使用。
-
命令详细
[root@mongodb_1 get_offsets_num]# ./get_offsets_num -h
usage: get_offsets_num [-h] [-k KAFKA_HOST] [-z ZOOKEEPER_HOST]
[-m INTERVAL_MINUTES]
Usage of argparse
optional arguments:
-h, --help show this help message and exit
-k KAFKA_HOST, --kafka_host KAFKA_HOST
需要输入kafka:端口
-z ZOOKEEPER_HOST, --zookeeper_host ZOOKEEPER_HOST
需要输入zookeeper:端口
-m INTERVAL_MINUTES, --Interval_minutes INTERVAL_MINUTES
间隔分钟
- 命令执行
[root@mongodb_1 get_offsets_num]# ./get_offsets_num_v2.py -k 10.130.25.77:9092 -z 10.130.25.79:2181
Interval 1 minutes sleep
=======================================================================================
kafka offsets: agent 2574552 2574552
zookeeper offsets: agent 2574552 2574552
agent kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
kafka offsets: record 89110 89110
zookeeper offsets: record 89110 89110
record kafka offsets num: 0 storm offsets num: 0 Actual consumption: 0
=======================================================================================
- 代码详情
#!/usr/local/python3/bin/python3
import os, time,json,argparse
from kazoo.client import KazooClient
from kafka3 import KafkaConsumer, TopicPartition
def get_zoo_consumer_info(Topology):
Topology_num = 0
zk_cli.start()
path = "/stormOffset/" + Topology + "/partition_0"
if zk_cli.exists(path):
str_data, stat = zk_cli.get(path)
str_data = json.loads(str_data)
Topology_num = str_data.get("offset")
#print("zookeeper now " + path + " offsets: " + str(Topology_num) )
else:
print("Path " + path + " does not exist.")
return Topology_num
def get_kafka_consumer_info(server, topic):
partition = 0
tp = TopicPartition(topic, partition)
end_offset = server.end_offsets([tp])[tp]
#print("kafka topic " + topic + " partition " + str(partition) + " offsets: " + str(end_offset))
return end_offset
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Usage of argparse')
parser.add_argument('-k','--kafka_host', type=str, default="192.168.127.10:9092",help='需要输入kafka:端口')
parser.add_argument('-z','--zookeeper_host', type=str, default="192.168.127.10:2181",help='需要输入zookeeper:端口')
parser.add_argument('-m','--Interval_minutes', type=int, default="1",help='间隔分钟')
args = parser.parse_args()
kafka_host= args.kafka_host
zookeer_host= args.zookeeper_host
Kafka_production_topics = "agent,record"
Zoo_consumption_topics= "agentTopology,recordTopology"
Interval_minutes = args.Interval_minutes
try:
zk_cli = KazooClient(hosts=zookeer_host)
#print("init zookeeper " + zookeer_host + " conn ok")
except Exception as e:
print("init zookeeper conn error: "+ str(e))
try:
#kafka_server = KafkaConsumer(bootstrap_servers=kafka_host)
kafka_server = KafkaConsumer(bootstrap_servers=kafka_host)
#print("init kafka " + kafka_host + " conn ok")
except Exception as e:
print("init kafka conn error: "+ str(e))
zoo_offset = {}
kafka_offset = {}
Kafka_production_topics_list = Kafka_production_topics.split(",")
Kafka_production_topics_list_2 = Kafka_production_topics.split(",")
Zoo_consumption_topics_list = Zoo_consumption_topics.split(",")
Zoo_consumption_topics_list_2 = Zoo_consumption_topics.split(",")
for i in range(0,len(Kafka_production_topics_list)):
kafka_topics = Kafka_production_topics_list.pop()
get_kafka_offset_num = get_kafka_consumer_info(kafka_server,kafka_topics)
kafka_offset[kafka_topics]=get_kafka_offset_num
zoo_topics = Zoo_consumption_topics_list.pop()
get_zoo_offset_num = get_zoo_consumer_info(zoo_topics)
zoo_offset[zoo_topics]= get_zoo_offset_num
print("Interval " + str(Interval_minutes) + " minutes sleep")
print("=======================================================================================")
time.sleep(int(Interval_minutes) * 60)
for i in range(0,len(Kafka_production_topics_list_2)):
kafka_topics = Kafka_production_topics_list_2.pop()
get_kafka_offset_num = get_kafka_consumer_info(kafka_server,kafka_topics)
last_kafka_num = kafka_offset.get(kafka_topics)
minutes_kafka_offset_num = get_kafka_offset_num - last_kafka_num
zoo_topics = Zoo_consumption_topics_list_2.pop()
get_zoo_offset_num = get_zoo_consumer_info(zoo_topics)
last_zoo_num = zoo_offset.get(zoo_topics)
minutes_zoo_offset_num = get_zoo_offset_num - last_zoo_num
Difference = minutes_kafka_offset_num - minutes_zoo_offset_num
print("kafka offsets:",kafka_topics,get_kafka_offset_num,last_kafka_num)
print("zookeeper offsets:",kafka_topics,get_zoo_offset_num,last_zoo_num)
print(kafka_topics + " kafka offsets num: " + str(minutes_kafka_offset_num) + " storm offsets num: " + str(minutes_zoo_offset_num) + " Actual consumption: " + str(Difference))
print("=======================================================================================")
zk_cli.stop()
# 关闭消费者连接
kafka_server.close()
评论区