partition_consumer.py
- # -*- coding: utf-8 -*-
- """
- This module provides a Kafka partition consumer demo example.
- """
- import threading
- from kafka.client import KafkaClient
- from kafka.consumer import SimpleConsumer
class Consumer(threading.Thread):
    """Thread that reads one partition of the ``jiketest`` topic and prints each message.

    The consumer joins group ``test-group`` with auto-commit disabled and
    seeks to offset 0 before reading, so every run re-reads the partition
    from the start instead of resuming from a committed offset.
    """

    # Daemon thread: it does not keep the interpreter alive by itself.
    daemon = True

    def __init__(self, partition_index):
        """Bind the consumer to a single partition.

        Args:
            partition_index: ID of the partition this thread reads.
        """
        threading.Thread.__init__(self)
        # SimpleConsumer takes a list of partition IDs.
        self.part = [partition_index]
        # Offset of the most recently received message.
        self.__offset = 0

    def run(self):
        """Consume the partition forever, printing each message value."""
        client = KafkaClient("10.206.216.13:19092,10.206.212.14:19092,10.206.209.25:19092")
        consumer = SimpleConsumer(
            client,
            "test-group",
            "jiketest",
            auto_commit=False,
            partitions=self.part,
        )
        # seek(offset=0, whence=0): position at the start of the partition.
        consumer.seek(0, 0)
        while True:
            # Block for up to 60 seconds waiting for the next message.
            message = consumer.get_message(True, 60)
            if message is None:
                # Timed out with nothing to read; without this guard the
                # attribute access below would raise and end the thread.
                continue
            self.__offset = message.offset
            # Call form of print works on both Python 2 and Python 3.
            print(message.message.value)
- • 组 (Group) 消费模型的 Python 实现;
- main.py
- # -*- coding: utf-8 -*-
- """
- This module provides the entry point for the Kafka partition and group consumer demo examples.
- """
- import logging, time
- import group_consumer
def main():
    """Start the group consumer thread and keep the process running."""
    worker = group_consumer.Consumer()
    worker.start()
    # Keep the main thread alive: the consumer thread is a daemon
    # (group_consumer.Consumer sets daemon = True), so the process would
    # otherwise exit as soon as main() returns.
    time.sleep(500000)
# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    # Optional: uncomment to enable INFO-level logging with a detailed format.
    # logging.basicConfig(
    #     format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
    #     level=logging.INFO
    # )
    main()
- group_consumer.py
- # -*- coding: utf-8 -*-
- """
- This module provides a Kafka group consumer demo example.
- """
- import threading
- from kafka.client import KafkaClient
- from kafka.consumer import SimpleConsumer
class Consumer(threading.Thread):
    """Thread that consumes the ``jiketest`` topic as group ``test-group``.

    Each received message's value is printed to standard output.
    """

    # Daemon thread: it does not keep the interpreter alive by itself.
    daemon = True

    def run(self):
        """Connect to the brokers and print every message the consumer yields."""
        brokers = "10.206.216.13:19092,10.206.212.14:19092,10.206.209.25:19092"
        kafka_client = KafkaClient(brokers)
        group_reader = SimpleConsumer(kafka_client, "test-group", "jiketest")
        for record in group_reader:
            print(record.message.value)
来源: https://www.cnblogs.com/pony1223/p/9757878.html