Increasing the concurrency of a Kafka consumer
Multithreading is "the ability of a central processing unit (CPU) to provide multiple threads of execution concurrently, supported by the operating system."
Imagine you have multiple producers publishing to a Kafka topic.
Now, let’s consider the consumer(s) of this topic.
By default, a Kafka consumer reads and processes messages in a single-threaded fashion.
It uses a poll() operation to fetch data records from the topic.
Generally this works smoothly, provided the processing we do between polls is fast and efficient.
If that processing becomes slow… well then, a few nasty things happen ⚠️
The code used in this blog can be found here.
Kafka rebalancing⌗
When a consumer polls and tries to fetch data from a topic, it also sends a heartbeat to Kafka. This signals to the cluster that the consumer is in a healthy state, thereby extending its lease on consuming from the topic’s partitions.
If the length of time between heartbeats is too long (greater than the session timeout), then Kafka will assume that this consumer is dead and will trigger a rebalance.
Explaining what happens under the hood during a rebalance is beyond the scope of this blog; in short, a rebalance occurs whenever the number of consumers in a consumer group changes (e.g. when a new consumer joins the group, or when an existing consumer is marked as dead).
During this operation, Kafka redistributes the partitions among the healthy members of the group, and all consumer instances in the consumer group stop reading from partitions and committing any pending offsets. The time taken for a rebalance can be significant too.
So as you can imagine, constant rebalances can be problematic… they are essentially periods where we do not process any data!
What can we do about this?⌗
1️⃣ Increase max.poll.interval.ms
The most common cause of rebalances is that the processing time for some applications simply takes too long… it exceeds max.poll.interval.ms and therefore triggers a rebalance.
A simple solution is to increase this value.
⚠️ This can come with some side effects. Kafka will only revoke partitions when a consumer finishes its current poll. If the processing within this poll() takes hours, then you got it, rebalancing will take hours too.
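If you're using confluent_kafka (as we do later in this blog), these are just consumer configuration properties. A minimal sketch, with illustrative values rather than recommendations:

```python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    # Maximum time allowed between poll() calls before the consumer is
    # considered stuck and its partitions are revoked (default: 5 minutes).
    'max.poll.interval.ms': 600000,
    # How long the group coordinator waits without a heartbeat before
    # declaring the consumer dead and triggering a rebalance.
    'session.timeout.ms': 45000,
})
```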
2️⃣ Improve processing performance by scaling up the number of consumers
The main way we scale data consumption in Kafka is by adding more consumers to a consumer group.
The maximum number of active consumers in a group is bounded by the number of partitions in the topic. Any extra consumers will simply remain idle, since all the partitions have already been distributed among the other consumers.
poll() processing times can also be decreased by reducing the batch size. For example, instead of a single subscriber consuming/processing a batch of 100 messages in one poll() call, we could have 10 subscribers consuming 20 messages each. This decreases poll times whilst also increasing throughput.
⚠️ Solving slow processing by increasing the number of partitions/consumers = throwing money at the problem.
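For reference, this is roughly how you could add partitions to an existing topic programmatically via confluent_kafka's AdminClient; the topic name and partition count below are placeholders, and note that partitions can only ever be increased:

```python
from confluent_kafka.admin import AdminClient, NewPartitions

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
# Grow the 'test' topic to a total of 10 partitions.
futures = admin.create_partitions([NewPartitions('test', 10)])
futures['test'].result()  # raises an exception if the operation failed
```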
Multi-threaded consumer model⌗
In this blog, we will explore ways to increase the concurrency of a Kafka consumer so that we can achieve more with a single consumer, rather than simply increasing the number of partitions.
We will explore a multi-threaded consumer model.
The gist of this approach is as follows:
- A single main thread will call poll() and fetch data from one or more partitions.
- Once a batch of messages has been received by the main thread, we will hand the messages off to a pool of threads where the processing will be done in parallel.
- This pool of threads will have length = batch size (if we have a batch of 20 messages, the total number of threads used will be 20).
- Offset management becomes paramount in this approach. If we allowed child threads to commit offsets, it could lead to synchronisation issues (race conditions) and data loss or duplication.
- Therefore, we will ensure that the main thread is the only thread that commits an offset, and that it waits for all threads to finish processing. This ensures that all messages are processed and their offsets are committed in a coordinated and synchronous manner.
So without further ado, let’s get coding…
The code⌗
First things first, we need a Kafka cluster up and running.
All of the processes used in this experiment will be containerised!
Let’s create a simple docker-compose.yml file that will do all this magic for us.
version: "3.9"
services:
kafka:
image: confluentinc/cp-kafka:7.3.0
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_BROKER_ID=1
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_DEFAULT_REPLICATION_FACTOR=1
restart: always
zookeeper:
image: confluentinc/cp-zookeeper:7.3.1
ports:
- "2181:2181"
environment:
- ZOOKEEPER_CLIENT_PORT=2181
restart: always
kafka-ui:
image: provectuslabs/kafka-ui
ports:
- "8080:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
- KAFKA_CLUSTERS_0_READONLY=false
python_producer:
build: .
environment:
- KAFKA_TOPIC=test # Kafka topic to publish
- KAFKA_BROKER=kafka:9092 # Kafka broker list
- PUBLISH_DURATION=60 # The duration the application will produce to kafka
command: python producer.py
deploy:
replicas: 2 # The number of producer replicas we want
python_consumer:
build: .
environment:
- KAFKA_TOPIC=test # Kafka topic to subscribe
- KAFKA_BROKER=kafka:9092 # Kafka broker list
- BATCH_SIZE=20 # Number of messages the consumer will poll for
- MULTI_THREAD=true # Use threading if true
command: python consumer.py
To build or pull the docker images and create the services, run:
docker-compose up -d
This should create 6 running services:
โฏ docker ps --format 'table {{.Names}}\t{{.Status}}'
NAMES STATUS
concurrency_kafka_zookeeper_1 Up About a minute
concurrency_kafka_kafka_1 Up About a minute
concurrency_kafka_kafka-ui_1 Up About a minute
concurrency_kafka_python_producer_1 Up About a minute
concurrency_kafka_python_producer_2 Up About a minute
concurrency_kafka_python_consumer_1 Up About a minute
The Kafka UI dashboard can be accessed by going to http://localhost:8080.
ELI5: What’s happening?⌗
The producer will publish a JSON payload to a Kafka topic for a set duration.
The payload will look like:
{
"sleep_time": 2
}
Upon receipt of this message, the consumer will sleep for the value specified in the payload.
The publish rate will be a measly ~360 msgs/minute (two producer replicas, each publishing three messages per second).
Threading disabled:
With threading disabled, the consumer will pull down messages in batches of 20 and, for each message, execute a sleep command. This is done sequentially by the main thread:
$ docker logs -f --tail 10 $(docker ps --filter name=consumer -q)
2023-02-03 17:21:07,805 MainThread/274912867520 INFO Received 20 messages in this batch
2023-02-03 17:21:07,809 MainThread/274912867520 INFO Going to sleep for 2 s
2023-02-03 17:21:09,811 MainThread/274912867520 INFO Going to sleep for 3 s
2023-02-03 17:21:12,815 MainThread/274912867520 INFO Going to sleep for 2 s
2023-02-03 17:21:14,818 MainThread/274912867520 INFO Going to sleep for 2 s
2023-02-03 17:21:16,823 MainThread/274912867520 INFO Going to sleep for 2 s
2023-02-03 17:21:18,828 MainThread/274912867520 INFO Going to sleep for 1 s
2023-02-03 17:21:19,834 MainThread/274912867520 INFO Going to sleep for 2 s
2023-02-03 17:21:21,839 MainThread/274912867520 INFO Going to sleep for 3 s
2023-02-03 17:21:24,845 MainThread/274912867520 INFO Going to sleep for 3 s
After the batch has been processed, the last offset of the batch is committed by the main thread, letting Kafka know that this batch has been processed successfully.
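A small detail worth knowing about the commit itself: with confluent_kafka, committing a message stores that message's offset + 1 (the position of the next message to consume), so the consumer resumes in the right place after a restart. A synchronous variant of the call used later in consumer.py would look like:

```python
# Block until the commit of the batch's last offset is acknowledged.
consumer.commit(message=msg_batch[-1], asynchronous=False)
```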
Threading enabled:
With threading enabled, the sleep command will be handed off to multiple child threads for execution.
For example, consider the consumer pulling down a batch of 20 messages. Each sleep command will be executed concurrently within the process via child threads:
$ docker logs -f --tail 10 $(docker ps --filter name=consumer -q)
2023-02-03 17:24:00,102 MainThread/274912867520 INFO Received 20 messages in this batch
2023-02-03 17:24:00,106 Thread-1 (process_msg)/275507214080 INFO Going to sleep for 3 s
2023-02-03 17:24:00,109 Thread-2 (process_msg)/275649840896 INFO Going to sleep for 1 s
2023-02-03 17:24:00,111 Thread-3 (process_msg)/275792467712 INFO Going to sleep for 3 s
2023-02-03 17:24:00,113 Thread-4 (process_msg)/275935094528 INFO Going to sleep for 3 s
2023-02-03 17:24:00,116 Thread-5 (process_msg)/276077721344 INFO Going to sleep for 1 s
2023-02-03 17:24:00,118 Thread-6 (process_msg)/276220348160 INFO Going to sleep for 1 s
2023-02-03 17:24:00,120 Thread-7 (process_msg)/276362974976 INFO Going to sleep for 1 s
2023-02-03 17:24:00,122 Thread-8 (process_msg)/276505601792 INFO Going to sleep for 3 s
...
2023-02-03 17:24:39,511 MainThread/274912867520 INFO Batch 12 processing time: 3.0180017948150635 seconds
2023-02-03 17:24:39,515 MainThread/274912867520 INFO Committing offsets for batch: 12
The main thread waits for all threads to finish executing before committing the last offset of the batch to Kafka.
It’s possible to monitor the number of threads the Python container is spawning by running:
$ docker stats $(docker ps --filter name=consumer -q) | awk '{print $NF}'
PIDS
26
PIDS
17
PIDS
17
PIDS
9
PIDS
6
PIDS
26
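As a rough sanity check, the peak of ~26 PIDs lines up with a batch of 20 worker threads plus the main thread and a handful of librdkafka's own background threads; the count drops back down once a batch finishes and its threads are joined.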
Now, let’s delve into the Python logic…
The producer⌗
$ cat producer.py
import json
import logging
import os
import random
import time
from confluent_kafka import Producer
def delivery_report(err, msg):
if err is not None:
logging.error('Message delivery failed: {}'.format(err))
else:
logging.info('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
def produce_loop(producer,topic):
# start time
start_time = time.time()
while (time.time() - start_time) < interval:
for i in range(3):
sleep_time = random.randint(1, 3)
payload = {'sleep_time': sleep_time}
producer.produce(topic, value=json.dumps(payload).encode('utf-8'), callback=delivery_report)
producer.poll(0)
time.sleep(1)
if __name__ == '__main__':
# Get how long the producer will publish to kafka from an environment variable
interval = int(os.environ.get('PUBLISH_DURATION', '60'))
# Get broker details from environment variable
broker = os.getenv('KAFKA_BROKER', 'localhost:9092')
# Get topic name from environment variable
topic = os.getenv('KAFKA_TOPIC', 'test')
conf = {
'bootstrap.servers': broker,
'client.id': 'python_producer'
}
logging.basicConfig(
format='%(asctime)s %(threadName)s/%(thread)d %(levelname)s %(message)s',
level=logging.INFO
)
logger = logging.getLogger('python_producer')
producer = Producer(conf)
produce_loop(producer,topic)
In a nutshell:
- The code creates a Kafka producer which sends data to a Kafka broker.
- It uses the confluent_kafka library to interact with the broker.
- The publish interval, broker address and topic are determined by the environment variables PUBLISH_DURATION, KAFKA_BROKER and KAFKA_TOPIC, with default values 60, localhost:9092 and test, respectively. These env vars are set inside docker-compose.yml (you could also keep them in a .env file).
- The produce_loop function generates a JSON payload which is published to the given topic 3 times every second. This function continues to run until the publish interval has elapsed.
- The delivery_report function is a callback that logs a message upon successful delivery, or logs an error if delivery fails.
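One small addition worth considering (not in the code above): producer.produce() is asynchronous, so any messages still sitting in the internal queue when the script exits could be lost. Calling flush() after the loop blocks until everything has been delivered:

```python
produce_loop(producer, topic)
# Wait for all queued messages to be delivered before exiting.
producer.flush()
```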
The consumer⌗
$ cat consumer.py
import logging
import json
import os
import time
import threading
from confluent_kafka import Consumer, KafkaError
def process_msg(msg):
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
logging.info('Reached end of partition event for %s', msg.topic())
else:
logging.error('Error while polling for messages: %s', msg.error())
else:
# Log the received message
message = json.loads(msg.value().decode('utf-8'))
sleep_time = message['sleep_time']
logging.info('Going to sleep for %d s', sleep_time)
# Sleep for sleep_time seconds
time.sleep(sleep_time)
def consume_messages(config,topic,size,multi_thread):
# Create the Kafka consumer instance
consumer = Consumer(config)
# Subscribe to the topic
consumer.subscribe([topic])
# Set a batch number to track batches
batch_num = 0
# Continuously poll for new messages
while True:
logging.info('Number of threads currently running: %d', len(threading.enumerate()))
msg_batch = consumer.consume(num_messages=size, timeout=1.0)
# Log the number of messages in the batch
logging.info('Received %d messages in this batch', len(msg_batch))
if len(msg_batch) == 0:
logging.info('No Messages to process')
continue
start_time = time.time()
if multi_thread == 'true':
threads = []
for msg in msg_batch:
t = threading.Thread(target=process_msg, args=(msg,))
threads.append(t)
t.start()
for t in threads:
t.join()
else:
for msg in msg_batch:
process_msg(msg)
end_time = time.time()
elapsed_time = end_time - start_time
logging.info("Batch %d processing time: %s seconds", batch_num, elapsed_time)
logging.info('Committing offsets for batch: %d', batch_num)
# Commit the offset of the last message in the batch
consumer.commit(msg_batch[-1])
# Increment batch number
batch_num += 1
if __name__ == '__main__':
# Get the Kafka broker from an environment variable
kafka_broker = os.environ.get('KAFKA_BROKER')
# Get the recommended batch size from an environment variable
batch_size = int(os.environ.get('BATCH_SIZE'))
# Get topic name from environment variable
topic = os.getenv('KAFKA_TOPIC', 'test')
# Check if multithreading is enabled
multi_thread = os.environ.get('MULTI_THREAD')
# Set up logging
logging.basicConfig(
format='%(asctime)s %(threadName)s/%(thread)d %(levelname)s %(message)s',
level=logging.INFO
)
# Define the Kafka consumer configuration
conf = {
'bootstrap.servers': kafka_broker,
'group.id': 'my-group',
'auto.offset.reset': 'earliest'
}
consume_messages(conf,topic,batch_size,multi_thread)
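One caveat with the configuration above (an easy thing to miss): librdkafka's enable.auto.commit defaults to true, so offsets can also be committed automatically in the background. To make the manual commit in consume_messages() the only commit path, you would likely want to disable it explicitly:

```python
conf = {
    'bootstrap.servers': kafka_broker,
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    # Ensure only the explicit consumer.commit() call stores offsets.
    'enable.auto.commit': False,
}
```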
In a nutshell:
- The consume_messages function creates a consumer instance and subscribes to a specified topic.
- It uses a while loop to continuously poll for messages (a long-lived consumer), and processes messages in batches.
- If multi-threading is enabled, the function starts a new thread for each message in the batch, and each thread processes a single message.
- If multi-threading is disabled, the function processes all messages in the batch sequentially in the same thread.
- After processing the messages in a batch, the main thread commits the offset of the last message in the batch.
- The process_msg function is used to process individual messages. It extracts a sleep time from the message and sleeps for that many seconds.
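A side note on the threading model: spawning one thread per message works nicely here because time.sleep() (like most I/O) releases Python's GIL, but the thread count grows with the batch size. If you wanted to cap it, one possible variation (a sketch, not part of the original code) is to hand the batch to a fixed-size thread pool instead:

```python
from concurrent.futures import ThreadPoolExecutor

def process_batch(msg_batch, max_workers=8):
    # Process a batch with a bounded pool, reusing process_msg from consumer.py.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() blocks until every message is processed, so the main thread
        # can still commit the batch's last offset afterwards.
        list(pool.map(process_msg, msg_batch))
```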
The results⌗
Using a single-threaded approach, all messages were processed in ~12 minutes, with each batch taking approximately 39.8 s:
$ batches=$(docker logs `docker ps --filter name=consumer -q` 2>&1 | grep 'processing time' | awk '{print $2" | "$5" "$6" | " $9}')
$ echo $batches
17:35:04,625 | Batch 0 | 39.11198592185974
17:35:45,752 | Batch 1 | 41.103530168533325
17:36:24,858 | Batch 2 | 39.10003614425659
17:37:06,002 | Batch 3 | 41.13357758522034
17:37:46,128 | Batch 4 | 40.11229872703552
17:38:31,231 | Batch 5 | 45.09226679801941
17:39:10,361 | Batch 6 | 39.126227617263794
17:39:52,436 | Batch 7 | 42.07227873802185
17:40:27,602 | Batch 8 | 35.14990282058716
17:41:02,703 | Batch 9 | 35.093626976013184
17:41:40,853 | Batch 10 | 38.14328336715698
17:42:20,943 | Batch 11 | 40.08165717124939
17:42:59,036 | Batch 12 | 38.077364921569824
17:43:45,147 | Batch 13 | 46.1059627532959
17:44:22,259 | Batch 14 | 37.10597634315491
17:45:03,375 | Batch 15 | 41.10637903213501
17:45:46,550 | Batch 16 | 43.16052007675171
17:46:25,646 | Batch 17 | 39.08514332771301
# Get the average time for a batch to be processed
$ echo $batches | awk '{sum+=$NF} END {print sum/NR}'
39.8016
Using a multi-threaded approach, all messages were processed in ~55 seconds, with each batch taking approximately 3.03 s:
$ batches=$(docker logs `docker ps --filter name=consumer -q` 2>&1 | grep 'processing time' | awk '{print $2" | "$5" "$6" | " $9}')
$ echo $batches
17:57:50,587 | Batch 0 | 3.087355852127075
17:57:53,633 | Batch 1 | 3.035799741744995
17:57:56,660 | Batch 2 | 3.024221897125244
17:57:59,701 | Batch 3 | 3.032158374786377
17:58:02,738 | Batch 4 | 3.030973434448242
17:58:05,781 | Batch 5 | 3.036787271499634
17:58:08,813 | Batch 6 | 3.0284106731414795
17:58:11,835 | Batch 7 | 3.017594814300537
17:58:14,854 | Batch 8 | 3.017782688140869
17:58:17,884 | Batch 9 | 3.02677059173584
17:58:20,920 | Batch 10 | 3.0328853130340576
17:58:23,953 | Batch 11 | 3.0289969444274902
17:58:26,973 | Batch 12 | 3.0188632011413574
17:58:30,006 | Batch 13 | 3.0296244621276855
17:58:33,041 | Batch 14 | 3.0310428142547607
17:58:36,089 | Batch 15 | 3.0452497005462646
17:58:39,110 | Batch 16 | 3.0187816619873047
17:58:42,163 | Batch 17 | 3.0502078533172607
$ echo $batches | awk '{sum+=$NF} END {print sum/NR}'
3.03332
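These numbers are exactly what you'd expect: with sleep times drawn uniformly from 1 to 3 seconds (average ~2 s), a sequential batch of 20 messages costs roughly 20 × 2 s ≈ 40 s, whereas a batch processed in parallel only costs as much as its slowest message, i.e. ~3 s.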
What’s better, and more important, than the improvement in performance?
The offset commits were synchronised and performed solely by the main thread.
Pretty good, eh?