
How to choose the No. of Partitions for a Kafka Topic? Horizontal Scaling for Kafka Consumer

Knowledge Amplifier 3,842 2 years ago

This video explains how to choose the number of partitions for a Kafka topic. It also shows how to scale a Kafka consumer horizontally by adding partitions to the topic and adding new consumers to the consumer group.

Prerequisite:
-------------------
Consumer & Consumer Group in Kafka
https://youtu.be/QTi_oQhxARs
Kafka Consumer Groups CLI Demo | Kafka-Python
https://youtu.be/yKzdum_vX7k
Manual Offset Commits & Exactly-Once Processing in Kafka Consumer using Python
https://youtu.be/XRRyO6b9Wqg

Start Kafka:
-------------------
F:/kafka_2.12-3.3.1/bin/windows/zookeeper-server-start.bat F:/kafka_2.12-3.3.1/config/zookeeper.properties
F:/kafka_2.12-3.3.1/bin/windows/kafka-server-start.bat F:/kafka_2.12-3.3.1/config/server.properties
F:/kafka_2.12-3.3.1/bin/windows/kafka-topics.bat --create --topic sensor_data_consumer --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

Producer Code:
------------------------
from time import sleep
from json import dumps
from kafka import KafkaProducer


def custom_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """
    print("The key is : {}".format(key))
    print("All partitions : {}".format(all_partitions))
    print("After decoding of the key : {}".format(key.decode('UTF-8')))
    # Index into all_partitions so the return value is an actual partition ID
    return all_partitions[int(key.decode('UTF-8')) % len(all_partitions)]


producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         partitioner=custom_partitioner)

topic_name = 'sensor_data_consumer'

# Send one message every 10 seconds; the key is the number itself
for e in range(0, 100):
    data = {"number": e}
    producer.send(topic_name, key=str(e).encode(), value=data)
    sleep(10)

Consumer Code:
------------------
import json
import kafka
from kafka import KafkaConsumer
from kafka import TopicPartition, OffsetAndMetadata


class MyConsumerRebalanceListener(kafka.ConsumerRebalanceListener):
    # Called when partitions are taken away from this consumer during a rebalance
    def on_partitions_revoked(self, revoked):
        print("Partitions %s revoked" % revoked)

    # Called when partitions are handed to this consumer after a rebalance
    def on_partitions_assigned(self, assigned):
        print("Partitions %s assigned" % assigned)


consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         group_id='demo112215sgtrjwrykvjh',
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)

listener = MyConsumerRebalanceListener()
consumer.subscribe(['sensor_data_consumer'], listener=listener)

for message in consumer:
    print(message)
    print("The value is : {}".format(message.value))
    tp = TopicPartition(message.topic, message.partition)
    # Commit the next offset to read; the second field is stored as commit metadata
    om = OffsetAndMetadata(message.offset + 1, message.timestamp)
    consumer.commit({tp: om})
    print('*' * 100)

Code to change no. of Partitions in a Topic:
----------------------------------------------------------------
F:/kafka_2.12-3.3.1/bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic sensor_data_consumer --partitions 2

Check this playlist for more Data Engineering related videos:
https://youtube.com/playlist?list=PLjfRmoYoxpNopPjdACgS5XTfdjyBcuGku

Apache Kafka from scratch
https://youtube.com/playlist?list=PLjfRmoYoxpNrs0VmIq6mOTqXP52RfZdRf

Snowflake Complete Course from scratch with End-to-End Project with in-depth explanation
https://doc.clickup.com/37466271/d/h/13qc4z-104/d4346819bd8d510

🙏🙏🙏🙏🙏🙏🙏🙏
YOU JUST NEED TO DO 3 THINGS to support my channel:
LIKE, SHARE & SUBSCRIBE TO MY YOUTUBE CHANNEL
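What the demo above does can be previewed without a running broker. The sketch below is only an illustration: it reuses the modulo rule from the producer's custom partitioner, and it replaces Kafka's real group assignment with a hypothetical round-robin helper (assign_partitions) and made-up consumer names. It relies on one fact about consumer groups: a partition is read by at most one consumer in the group at a time.

```python
def pick_partition(key: bytes, all_partitions: list) -> int:
    """Modulo rule from the producer: numeric key -> partition ID."""
    return all_partitions[int(key.decode("utf-8")) % len(all_partitions)]


def spread_keys(keys, all_partitions):
    """Group keys by the partition the modulo rule sends them to."""
    placement = {p: [] for p in all_partitions}
    for k in keys:
        placement[pick_partition(str(k).encode(), all_partitions)].append(k)
    return placement


def assign_partitions(all_partitions, consumers):
    """Hypothetical round-robin stand-in for the group's partition assignment."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(all_partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment


consumers = ["consumer-a", "consumer-b"]  # hypothetical group members

# Topic created with --partitions 1
keys_before = spread_keys(range(6), [0])
owners_before = assign_partitions([0], consumers)

# After --alter ... --partitions 2
keys_after = spread_keys(range(6), [0, 1])
owners_after = assign_partitions([0, 1], consumers)

print(keys_before, owners_before)
print(keys_after, owners_after)
```

With a single partition, every key lands on partition 0 and the second consumer is left with nothing to read, so adding it gives no extra throughput. With two partitions, even and odd keys split across partitions 0 and 1 and each consumer owns one partition, which is the horizontal scaling the video demonstrates.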
