This video explains how to choose the number of partitions for a Kafka topic.
It also shows how to scale a Kafka consumer horizontally by adding extra partitions and a new consumer to the consumer group.
Prerequisite:
-------------------
Consumer & Consumer Group in Kafka
https://youtu.be/QTi_oQhxARs
Kafka Consumer Groups CLI Demo | Kafka-Python
https://youtu.be/yKzdum_vX7k
Manual Offset Commits & Exactly-Once Processing in Kafka Consumer using Python
https://youtu.be/XRRyO6b9Wqg
Start Kafka & Create the Topic:
-------------------
F:/kafka_2.12-3.3.1/bin/windows/zookeeper-server-start.bat F:/kafka_2.12-3.3.1/config/zookeeper.properties
F:/kafka_2.12-3.3.1/bin/windows/kafka-server-start.bat F:/kafka_2.12-3.3.1/config/server.properties
F:/kafka_2.12-3.3.1/bin/windows/kafka-topics.bat --create --topic sensor_data_consumer --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
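You can verify the partition count at any time. A minimal sketch using kafka-python (assumes the broker above on localhost:9092 and the topic just created):

from kafka import KafkaConsumer

# Quick check of which partition IDs the topic currently has
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
print("Partitions of sensor_data_consumer : {}".format(consumer.partitions_for_topic('sensor_data_consumer')))
consumer.close()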
Producer Code:
------------------------
from time import sleep
from json import dumps
from kafka import KafkaProducer

def custom_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to the key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """
    print("The key is : {}".format(key))
    print("All partitions : {}".format(all_partitions))
    print("After decoding of the key : {}".format(key.decode('UTF-8')))
    # Send the record to partition (key modulo total number of partitions)
    return int(key.decode('UTF-8')) % len(all_partitions)

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         partitioner=custom_partitioner)

topic_name = 'sensor_data_consumer'

for e in range(0, 100):
    data = {"number": e}
    # The key drives custom_partitioner, so message e lands in partition e % partition_count
    producer.send(topic_name, key=str(e).encode(), value=data)
    sleep(10)
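To confirm which partition each record actually landed in, the future returned by send() carries RecordMetadata. A small optional sketch (default partitioner here, so the partition comes from a hash of the key rather than the custom partitioner above):

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

# Block on the send future to read back where the record was written
metadata = producer.send('sensor_data_consumer', key=b'7', value={"number": 7}).get(timeout=10)
print("topic={} partition={} offset={}".format(metadata.topic, metadata.partition, metadata.offset))
producer.flush()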
Consumer Code:
------------------
from kafka import KafkaConsumer
from kafka import TopicPartition, OffsetAndMetadata
import kafka
import json

class MyConsumerRebalanceListener(kafka.ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        print("Partitions %s revoked" % revoked)

    def on_partitions_assigned(self, assigned):
        print("Partitions %s assigned" % assigned)

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         group_id='demo112215sgtrjwrykvjh',
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)

listener = MyConsumerRebalanceListener()
consumer.subscribe(['sensor_data_consumer'], listener=listener)

for message in consumer:
    print(message)
    print("The value is : {}".format(message.value))
    # Manually commit offset+1 (the next offset to read) for this message's partition
    tp = TopicPartition(message.topic, message.partition)
    om = OffsetAndMetadata(message.offset + 1, message.timestamp)
    consumer.commit({tp: om})
    print('*' * 100)
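To scale up, start a second consumer process with the same group_id; Kafka rebalances the group and each consumer gets a subset of the partitions (which is why the topic needs at least as many partitions as consumers). A hedged sketch of such a second consumer that only joins the group and prints its assignment (illustrative, not the full processing loop):

import json
from kafka import KafkaConsumer

# Second consumer in the SAME consumer group; starting it triggers a rebalance
consumer2 = KafkaConsumer('sensor_data_consumer',
                          bootstrap_servers=['localhost:9092'],
                          value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                          group_id='demo112215sgtrjwrykvjh',
                          auto_offset_reset='earliest',
                          enable_auto_commit=False)

consumer2.poll(timeout_ms=5000)  # join the group and receive a partition assignment
print("Partitions assigned to this consumer : {}".format(consumer2.assignment()))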
Command to increase the number of partitions of a topic:
----------------------------------------------------------------
F:/kafka_2.12-3.3.1/bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic sensor_data_consumer --partitions 2
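The same change can also be made from Python with kafka-python's admin client. A minimal sketch, assuming the broker on localhost:9092 and the topic used above (partitions can only be increased, never decreased):

from kafka.admin import KafkaAdminClient, NewPartitions

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
# total_count is the new total number of partitions for the topic
admin.create_partitions({'sensor_data_consumer': NewPartitions(total_count=2)})
admin.close()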
Check this playlist for more Data Engineering related videos:
https://youtube.com/playlist?list=PLjfRmoYoxpNopPjdACgS5XTfdjyBcuGku
Apache Kafka from scratch
https://youtube.com/playlist?list=PLjfRmoYoxpNrs0VmIq6mOTqXP52RfZdRf
Snowflake Complete Course from scratch with an end-to-end project and in-depth explanation:
https://doc.clickup.com/37466271/d/h/13qc4z-104/d4346819bd8d510
🙏🙏🙏🙏🙏🙏🙏🙏
YOU JUST NEED TO DO
3 THINGS to support my channel
LIKE
SHARE
&
SUBSCRIBE
TO MY YOUTUBE CHANNEL