Multithreading is “the ability of a central processing unit (CPU) to provide multiple threads of execution concurrently, supported by the operating system.”

Imagine you have multiple producers publishing to a kafka topic.

Now, let’s consider the consumer(s) of this topic.

By default, a Kafka consumer fetches and processes messages in a single-threaded fashion.

It uses a poll() operation to fetch data records from a topic.

Generally, this works smoothly, provided the processing we do between polls is fast and efficient.

If this processing becomes inefficient… well then, a few nasty things happen ☠️

The code used in this blog can be found here 🤌


Kafka rebalancing

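When a consumer is a member of a consumer group, it periodically sends a heartbeat 💓 to Kafka (more precisely, to the broker acting as the group's coordinator). This signals to the cluster that the consumer is in a healthy state, thereby extending its lease on consuming from the topic’s partitions. Modern clients, including librdkafka (which confluent_kafka wraps), send heartbeats from a background thread rather than from poll() itself, so a second liveness check also applies: how long it has been since the application last called poll().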

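If no heartbeat arrives within the session timeout (session.timeout.ms), Kafka assumes that this consumer is dead. If the gap between two poll() calls exceeds max.poll.interval.ms, the consumer is considered stuck and leaves the group. Either way, a rebalance is triggered.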
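To make the difference between the two timeouts concrete, here is a toy model. It is hypothetical (ConsumerState and eviction_reason are made-up names, and in practice the session timeout is enforced by the broker while max.poll.interval.ms is enforced by the client), and the timeout values are assumptions: 45 s for session.timeout.ms (the default in recent clients) and 300 s for max.poll.interval.ms (the default).

```python
from dataclasses import dataclass
from typing import Optional

# Toy model (hypothetical): NOT how Kafka or librdkafka implement these checks.
SESSION_TIMEOUT_MS = 45_000      # assumed session.timeout.ms
MAX_POLL_INTERVAL_MS = 300_000   # assumed max.poll.interval.ms


@dataclass
class ConsumerState:
    last_heartbeat_ms: int  # when the last heartbeat was sent (background thread)
    last_poll_ms: int       # when the application last called poll()/consume()


def eviction_reason(state: ConsumerState, now_ms: int) -> Optional[str]:
    """Return why this consumer would be removed from its group, or None if healthy."""
    if now_ms - state.last_heartbeat_ms > SESSION_TIMEOUT_MS:
        return 'session timeout: no heartbeat (process dead or unreachable)'
    if now_ms - state.last_poll_ms > MAX_POLL_INTERVAL_MS:
        return 'max.poll.interval.ms exceeded: processing between polls took too long'
    return None


# Heartbeats are still flowing, but the app has spent 6 minutes on one batch.
busy = ConsumerState(last_heartbeat_ms=357_000, last_poll_ms=0)
print(eviction_reason(busy, now_ms=360_000))
# max.poll.interval.ms exceeded: processing between polls took too long
```

A healthy heartbeat does not save a consumer whose processing loop never gets back to poll(), which is exactly the situation slow processing creates.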

Explaining what happens under the hood during a rebalance is beyond the scope of this blog but, in short, a rebalance occurs whenever the membership of a consumer group changes (e.g. when a new consumer joins the group, or when a consumer is marked as dead).

During a rebalance, Kafka redistributes the partitions among the healthy members of the group. With the classic (eager) rebalance protocol, every consumer instance in the group stops reading from (and committing offsets for) its partitions until the new assignment is in place. The time taken for a rebalance can be significant too.

So as you can imagine, constant rebalances can be problematic… they are essentially periods where we do not process any data!


What can we do about this?

1️⃣ Increase max.poll.interval.ms

A common cause of rebalances is processing that simply takes too long 😴 … the time between two poll() calls exceeds max.poll.interval.ms (300000 ms, i.e. 5 minutes, by default) and therefore triggers a rebalance.

The simple solution is to increase this value.

⚠️ This comes with side effects. A consumer only gives up its partitions during a rebalance the next time it calls poll(). If the processing between two poll() calls takes hours then, you guessed it, a rebalance can take hours too. A consumer that is genuinely stuck will also take longer to be detected.
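For reference, with confluent_kafka the limit is just another key in the consumer's config dict (shown here next to the settings used later in this blog). The 15-minute value is an arbitrary assumption for illustration; in practice it should sit comfortably above your worst-case batch processing time.

```python
# Consumer config sketch for confluent_kafka.Consumer(conf).
# The 15-minute limit below is an illustrative assumption, not a recommendation.
MAX_BATCH_MINUTES = 15

conf = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    # Max gap allowed between poll()/consume() calls before this consumer
    # leaves the group and a rebalance is triggered (default: 300000 ms).
    'max.poll.interval.ms': MAX_BATCH_MINUTES * 60 * 1000,
}

print(conf['max.poll.interval.ms'])  # 900000
```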

2️⃣ Improve processing performance by scaling up the number of consumers

The main way we scale data consumption in kafka is by adding more consumers to a consumer group.

The number of active consumers in a group is bounded by the number of partitions in the topic. Each partition is assigned to at most one consumer in the group, so once all the partitions have been distributed, any extra consumers simply remain idle.
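A toy illustration of why the extra consumers sit idle. This is a simplified round-robin scheme with a hypothetical function name, not Kafka's actual assignors (range, round-robin, sticky and so on differ in detail), but they all share the rule that a partition belongs to at most one consumer in the group.

```python
from typing import Dict, List


def assign_round_robin(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Toy round-robin assignment: every partition goes to exactly one consumer."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# 3 partitions, 4 consumers: one consumer is left with nothing to do.
print(assign_round_robin(3, ['c1', 'c2', 'c3', 'c4']))
# {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```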

poll() processing times can be decreased by reducing the batch size. For example, instead of a single consumer processing a batch of 100 messages in one poll() call, we can have 10 consumers processing 10 messages each. If each message takes ~2 s, the time between polls drops from ~200 s to ~20 s and, because the 10 consumers work in parallel, the group gets through the same 100 messages ~10× faster.

⚠️ Solving slow processing by increasing the number of partitions/consumers = throwing 💵 at the problem.



Multi-threaded consumer model

In this blog, we will explore ways to increase the concurrency of a single Kafka consumer, so that it can achieve more on its own rather than relying on simply increasing the number of partitions.

We will explore a multi-threaded consumer model.

The gist of this approach is as follows:

  • A single main thread will call poll() and fetch data from one or more partitions.
  • Once a batch of messages has been received by the main thread, we will deliver the messages to a pool of threads where the processing will be done in parallel.
  • This pool will contain one thread per message in the batch (a batch of 20 messages means 20 threads).
  • Offset management becomes paramount in this approach

Allowing child threads to commit offsets could lead to synchronisation issues (🏎️ conditions), and in turn to data loss or data duplication.

  • Therefore, we will ensure that the main thread is the only thread that commits an offset. It will wait for all threads to finish processing. This is to ensure that all messages are processed and their offsets are committed in a coordinated and synchronous manner.
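Since only the main thread commits, it has to work out what to commit once every child thread has joined. A minimal sketch of that bookkeeping, assuming a batch may span several partitions: Kafka's committed offset is the offset of the next message to read, so for each partition we commit the highest processed offset + 1. Msg and offsets_to_commit are hypothetical names; with confluent_kafka the result would be turned into TopicPartition(topic, partition, offset) objects and passed to Consumer.commit(offsets=...).

```python
from typing import Dict, List, NamedTuple


class Msg(NamedTuple):
    """Hypothetical stand-in for a consumed record (real code: confluent_kafka.Message)."""
    partition: int
    offset: int


def offsets_to_commit(batch: List[Msg]) -> Dict[int, int]:
    """Map each partition to the offset to commit: highest processed offset + 1."""
    next_offsets: Dict[int, int] = {}
    for m in batch:
        next_offsets[m.partition] = max(next_offsets.get(m.partition, 0), m.offset + 1)
    return next_offsets


# A batch interleaving two partitions: committing only the last message
# (partition 1) would leave partition 0's progress uncommitted.
batch = [Msg(0, 41), Msg(1, 7), Msg(0, 42), Msg(1, 8)]
print(offsets_to_commit(batch))  # {0: 43, 1: 9}
```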

So without further ado, let’s get coding 🧑‍💻 …


The code

First things first, we need a kafka cluster up and running.

All of the processes used in this experiment will be containerised!

Let’s create a simple docker-compose.yml file that will do all this magic for us 🧙‍♂️

version: "3.9"
services:
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=1
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_DEFAULT_REPLICATION_FACTOR=1
    restart: always

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.1
    ports:
      - "2181:2181"
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    restart: always

  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_READONLY=false

  python_producer:
    build: .
    environment:
      - KAFKA_TOPIC=test            # Kafka topic to publish
      - KAFKA_BROKER=kafka:9092     # Kafka broker list
      - PUBLISH_DURATION=60         # The duration the application will produce to kafka
    command: python producer.py
    deploy:
      replicas: 2                   # The number of producer replicas we want

  python_consumer:
    build: .
    environment:
      - KAFKA_TOPIC=test            # Kafka topic to subscribe
      - KAFKA_BROKER=kafka:9092     # Kafka broker list
      - BATCH_SIZE=20               # Number of messages the consumer will poll for
      - MULTI_THREAD=true           # Use threading if true
    command: python consumer.py

To build or pull the docker images and create the services, run:

docker-compose up -d

This should start 6 running containers (the python_producer service runs 2 replicas):

❯ docker ps --format 'table {{.Names}}\t{{.Status}}'
NAMES                                 STATUS
concurrency_kafka_zookeeper_1         Up About a minute
concurrency_kafka_kafka_1             Up About a minute
concurrency_kafka_kafka-ui_1          Up About a minute
concurrency_kafka_python_producer_1   Up About a minute
concurrency_kafka_python_producer_2   Up About a minute
concurrency_kafka_python_consumer_1   Up About a minute

The kafka dashboard can be accessed by going to http://localhost:8080.



ELI5: What’s happening?

The producer will publish a JSON payload to a kafka topic for some set duration.

The payload will look like:

 {
  "sleep_time": 2
 }  

Upon receipt of this message, the consumer will sleep for the value specified in the payload.

The publish rate will be a measly ~360 msgs/minute.
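Where does ~360 msgs/minute come from? Going by docker-compose.yml (2 producer replicas, PUBLISH_DURATION=60, BATCH_SIZE=20) and producer.py (each loop publishes 3 messages, then sleeps ~1 s), and ignoring the small time spent producing:

```latex
\begin{aligned}
\text{rate} &\approx 2\ \text{producers} \times 3\ \tfrac{\text{msgs}}{\text{s}} \times 60\ \tfrac{\text{s}}{\text{min}} = 360\ \tfrac{\text{msgs}}{\text{min}} \\
\text{batches} &\approx \frac{360\ \text{msgs}}{20\ \text{msgs/batch}} = 18
\end{aligned}
```

Since the producers only publish for one minute, that is also roughly the total number of messages, which lines up with the 18 batches (Batch 0 to Batch 17) in the results.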


Threading disabled ❌:

With threading disabled, the consumer will pull down messages in batches of 20 and for each message, a sleep command will be executed. This will be done in a sequential fashion by the main thread:

$ docker logs -f --tail 10 $(docker ps --filter name=consumer -q)
2023-02-03 17:21:07,805 MainThread/274912867520 INFO Received 20 messages in this batch
2023-02-03 17:21:07,809 MainThread/274912867520 INFO Going to sleep for 2 s
2023-02-03 17:21:09,811 MainThread/274912867520 INFO Going to sleep for 3 s
2023-02-03 17:21:12,815 MainThread/274912867520 INFO Going to sleep for 2 s
2023-02-03 17:21:14,818 MainThread/274912867520 INFO Going to sleep for 2 s
2023-02-03 17:21:16,823 MainThread/274912867520 INFO Going to sleep for 2 s
2023-02-03 17:21:18,828 MainThread/274912867520 INFO Going to sleep for 1 s
2023-02-03 17:21:19,834 MainThread/274912867520 INFO Going to sleep for 2 s
2023-02-03 17:21:21,839 MainThread/274912867520 INFO Going to sleep for 3 s
2023-02-03 17:21:24,845 MainThread/274912867520 INFO Going to sleep for 3 s

After the batch has been processed, the main thread commits the batch's offsets (Kafka records the offset of the next message to read, i.e. one past the last message of the batch). This lets Kafka know that the batch has been processed successfully.


Threading enabled 🟢:

With threading enabled, the sleep command will be handed off to multiple child threads for execution.

For example, consider the consumer pulling down a batch of 20 messages. Each sleep command will be executed concurrently within the process via child threads:

$ docker logs -f --tail 10 $(docker ps --filter name=consumer -q)
2023-02-03 17:24:00,102 MainThread/274912867520 INFO Received 20 messages in this batch
2023-02-03 17:24:00,106 Thread-1 (process_msg)/275507214080 INFO Going to sleep for 3 s
2023-02-03 17:24:00,109 Thread-2 (process_msg)/275649840896 INFO Going to sleep for 1 s
2023-02-03 17:24:00,111 Thread-3 (process_msg)/275792467712 INFO Going to sleep for 3 s
2023-02-03 17:24:00,113 Thread-4 (process_msg)/275935094528 INFO Going to sleep for 3 s
2023-02-03 17:24:00,116 Thread-5 (process_msg)/276077721344 INFO Going to sleep for 1 s
2023-02-03 17:24:00,118 Thread-6 (process_msg)/276220348160 INFO Going to sleep for 1 s
2023-02-03 17:24:00,120 Thread-7 (process_msg)/276362974976 INFO Going to sleep for 1 s
2023-02-03 17:24:00,122 Thread-8 (process_msg)/276505601792 INFO Going to sleep for 3 s
...
2023-02-03 17:24:39,511 MainThread/274912867520 INFO Batch 12 processing time: 3.0180017948150635 seconds
2023-02-03 17:24:39,515 MainThread/274912867520 INFO Committing offsets for batch: 12

The main thread waits for all child threads to finish executing before committing the batch's offsets to Kafka.

It’s possible to monitor the number of threads the python container is spawning by running:

$ docker stats $(docker ps --filter name=consumer -q) | awk '{print $NF}'
PIDS
26
PIDS
17
PIDS
17
PIDS
9
PIDS
6
PIDS
26

Now, let’s delve into the Python logic 🕵️…


The producer

👇

$ cat producer.py
import json
import logging
import os
import random
import time
from confluent_kafka import Producer


def delivery_report(err, msg):
    # Delivery callback, served from poll()/flush(): log success or failure
    if err is not None:
        logging.error('Message delivery failed: {}'.format(err))
    else:
        logging.info('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


def produce_loop(producer, topic, interval):
    # Record the start time so we stop publishing after `interval` seconds
    start_time = time.time()

    while (time.time() - start_time) < interval:
        # Publish 3 messages, then pause for ~1 second
        for _ in range(3):
            sleep_time = random.randint(1, 3)
            payload = {'sleep_time': sleep_time}
            producer.produce(topic, value=json.dumps(payload).encode('utf-8'), callback=delivery_report)
            # Serve pending delivery callbacks without blocking
            producer.poll(0)
        time.sleep(1)

    # Block until every outstanding message has been delivered (or has failed)
    producer.flush()


if __name__ == '__main__':

    # Get how long the producer will publish to kafka from an environment variable
    interval = int(os.environ.get('PUBLISH_DURATION', '60'))

    # Get broker details from environment variable
    broker = os.getenv('KAFKA_BROKER', 'localhost:9092')

    # Get topic name from environment variable
    topic = os.getenv('KAFKA_TOPIC', 'test')

    conf = {
        'bootstrap.servers': broker,
        'client.id': 'python_producer'
    }

    logging.basicConfig(
        format='%(asctime)s %(threadName)s/%(thread)d %(levelname)s %(message)s',
        level=logging.INFO
    )

    producer = Producer(conf)

    produce_loop(producer, topic, interval)

In a nutshell:

  • The code creates a Kafka producer which sends data to a Kafka broker.
  • It uses the confluent_kafka library to interact with the broker.
  • The publish interval, broker address and topic are determined by the environment variables PUBLISH_DURATION, KAFKA_BROKER and KAFKA_TOPIC, with default values 60, localhost:9092 and test, respectively.
  • These env vars are set inside docker-compose.yml (you could also keep them in a .env file).
  • The produce_loop function publishes 3 JSON payloads (each with a random sleep_time between 1 and 3) to the given topic roughly every second. It keeps running until the publish interval has elapsed.
  • The delivery_report function is a callback that logs a message upon successful delivery, or logs an error if delivery fails.

The consumer

👇

$ cat consumer.py
import logging
import json
import os
import time
import threading
from confluent_kafka import Consumer, KafkaError


def process_msg(msg):
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            logging.info('Reached end of partition event for %s', msg.topic())
        else:
            logging.error('Error while polling for messages: %s', msg.error())
    else:
        # Decode the JSON payload and read the requested sleep time
        message = json.loads(msg.value().decode('utf-8'))
        sleep_time = message['sleep_time']
        logging.info('Going to sleep for %d s', sleep_time)
        # Simulate work by sleeping for sleep_time seconds
        time.sleep(sleep_time)


def consume_messages(config, topic, size, multi_thread):
    # Create the Kafka consumer instance
    consumer = Consumer(config)

    # Subscribe to the topic
    consumer.subscribe([topic])

    # Set a batch number to track batches
    batch_num = 0

    try:
        # Continuously poll for new messages
        while True:
            logging.info('Number of threads currently running: %d', len(threading.enumerate()))
            msg_batch = consumer.consume(num_messages=size, timeout=1.0)
            # Log the number of messages in the batch
            logging.info('Received %d messages in this batch', len(msg_batch))
            if len(msg_batch) == 0:
                logging.info('No Messages to process')
                continue

            start_time = time.time()

            if multi_thread:
                # Hand each message to its own child thread...
                threads = []
                for msg in msg_batch:
                    t = threading.Thread(target=process_msg, args=(msg,))
                    threads.append(t)
                    t.start()

                # ...and block until every child thread has finished
                for t in threads:
                    t.join()
            else:
                # Process the batch sequentially on the main thread
                for msg in msg_batch:
                    process_msg(msg)

            elapsed_time = time.time() - start_time
            logging.info("Batch %d processing time: %s seconds", batch_num, elapsed_time)
            logging.info('Committing offsets for batch: %d', batch_num)
            # Synchronously commit the stored offsets of every assigned partition
            # (one past the last message consumed from each partition).
            # Committing msg_batch[-1] alone would only cover that message's partition.
            consumer.commit(asynchronous=False)
            # Increment batch number
            batch_num += 1
    finally:
        # Leave the consumer group cleanly so partitions are reassigned promptly
        consumer.close()


if __name__ == '__main__':
    # Get the Kafka broker from an environment variable
    kafka_broker = os.environ.get('KAFKA_BROKER', 'localhost:9092')

    # Get the recommended batch size from an environment variable
    batch_size = int(os.environ.get('BATCH_SIZE', '20'))

    # Get topic name from environment variable
    topic = os.getenv('KAFKA_TOPIC', 'test')

    # Check if multithreading is enabled ('true', case-insensitive)
    multi_thread = os.environ.get('MULTI_THREAD', 'false').lower() == 'true'

    # Set up logging
    logging.basicConfig(
        format='%(asctime)s %(threadName)s/%(thread)d %(levelname)s %(message)s',
        level=logging.INFO
    )

    # Define the Kafka consumer configuration
    conf = {
        'bootstrap.servers': kafka_broker,
        'group.id': 'my-group',
        'auto.offset.reset': 'earliest',
        # Disable background auto-commits so offsets are only committed by the
        # main thread, after the whole batch has been processed
        'enable.auto.commit': False
    }

    consume_messages(conf, topic, batch_size, multi_thread)

In a nutshell:

  • The consume_messages function creates a consumer instance and subscribes to a specified topic.
  • It uses a while loop to continuously poll for messages (a long-lived consumer) and processes them in batches.
  • If multi-threading is enabled, the function starts a new thread for each message in the batch, and each thread processes a single message.
  • If multi-threading is disabled, the function processes all messages in the batch sequentially in the same thread.
  • After processing the messages in a batch, the main thread commits the offsets of the processed messages.
  • The process_msg function processes an individual message: it extracts the sleep time from the JSON payload and sleeps for that many seconds.
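One caveat with bare threading.Thread: if process_msg raises, the exception only ends that child thread (threading.excepthook prints it) and join() still returns normally, so the main thread would go on to commit the batch anyway. Below is a sketch of one alternative that swaps in concurrent.futures.ThreadPoolExecutor: future.result() re-raises a worker's exception in the main thread, so the commit is skipped, and max_workers caps the pool size instead of spawning one thread per message. process_payload and handle_batch are hypothetical names, and plain dicts stand in for Kafka messages.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def process_payload(payload: dict) -> float:
    """Hypothetical stand-in for process_msg: sleep for the requested time."""
    time.sleep(payload['sleep_time'])  # raises KeyError on a malformed payload
    return payload['sleep_time']


def handle_batch(payloads: List[dict], commit: Callable[[], None], max_workers: int = 20) -> None:
    """Process a batch on a bounded thread pool, then commit only if every message succeeded."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_payload, p) for p in payloads]
        for f in futures:
            f.result()  # re-raises any exception raised inside the worker thread
    commit()  # not reached if any worker failed


committed = []
start = time.time()
handle_batch([{'sleep_time': 0.2}] * 5, commit=lambda: committed.append('batch 0'))
# Roughly 0.2 s rather than 1.0 s, because the five sleeps overlap
print(committed, round(time.time() - start, 2))

try:
    # The second payload is missing 'sleep_time', so this batch is never committed
    handle_batch([{'sleep_time': 0.1}, {}], commit=lambda: committed.append('batch 1'))
except KeyError as exc:
    print('batch not committed, worker failed with KeyError:', exc)
```

In the consumer above, commit would wrap the consumer's commit call, so a partially failed batch is never marked as processed.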

The results

Using the single-threaded approach, all messages were processed in ~12 minutes, with each batch taking approximately 39.8 s on average 😅

$ batches=$(docker logs `docker ps --filter name=consumer -q` 2>&1 | grep 'processing time' | awk '{print $2" | "$5" "$6" | " $9}')
$ echo "$batches"
17:35:04,625 | Batch 0 | 39.11198592185974
17:35:45,752 | Batch 1 | 41.103530168533325
17:36:24,858 | Batch 2 | 39.10003614425659
17:37:06,002 | Batch 3 | 41.13357758522034
17:37:46,128 | Batch 4 | 40.11229872703552
17:38:31,231 | Batch 5 | 45.09226679801941
17:39:10,361 | Batch 6 | 39.126227617263794
17:39:52,436 | Batch 7 | 42.07227873802185
17:40:27,602 | Batch 8 | 35.14990282058716
17:41:02,703 | Batch 9 | 35.093626976013184
17:41:40,853 | Batch 10 | 38.14328336715698
17:42:20,943 | Batch 11 | 40.08165717124939
17:42:59,036 | Batch 12 | 38.077364921569824
17:43:45,147 | Batch 13 | 46.1059627532959
17:44:22,259 | Batch 14 | 37.10597634315491
17:45:03,375 | Batch 15 | 41.10637903213501
17:45:46,550 | Batch 16 | 43.16052007675171
17:46:25,646 | Batch 17 | 39.08514332771301


# Get the average time for a batch to be processed
$ echo "$batches" | awk '{sum+=$NF} END {print sum/NR}'
39.8016

Using the multi-threaded approach, all messages were processed in ~55 seconds, with each batch taking approximately 3.03 s on average 🔥

$ batches=$(docker logs `docker ps --filter name=consumer -q` 2>&1 | grep 'processing time' | awk '{print $2" | "$5" "$6" | " $9}')
$ echo "$batches"
17:57:50,587 | Batch 0 | 3.087355852127075
17:57:53,633 | Batch 1 | 3.035799741744995
17:57:56,660 | Batch 2 | 3.024221897125244
17:57:59,701 | Batch 3 | 3.032158374786377
17:58:02,738 | Batch 4 | 3.030973434448242
17:58:05,781 | Batch 5 | 3.036787271499634
17:58:08,813 | Batch 6 | 3.0284106731414795
17:58:11,835 | Batch 7 | 3.017594814300537
17:58:14,854 | Batch 8 | 3.017782688140869
17:58:17,884 | Batch 9 | 3.02677059173584
17:58:20,920 | Batch 10 | 3.0328853130340576
17:58:23,953 | Batch 11 | 3.0289969444274902
17:58:26,973 | Batch 12 | 3.0188632011413574
17:58:30,006 | Batch 13 | 3.0296244621276855
17:58:33,041 | Batch 14 | 3.0310428142547607
17:58:36,089 | Batch 15 | 3.0452497005462646
17:58:39,110 | Batch 16 | 3.0187816619873047
17:58:42,163 | Batch 17 | 3.0502078533172607

$ echo "$batches" | awk '{sum+=$NF} END {print sum/NR}'
3.03332
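These numbers are about what you'd expect. A back-of-the-envelope check, assuming each sleep_time is drawn uniformly from {1, 2, 3} (as random.randint(1, 3) does in producer.py) and ignoring threading overhead: sequentially, a batch takes the sum of its 20 sleeps, whereas with one thread per message it takes roughly as long as the longest one.

```python
from fractions import Fraction

N = 20              # BATCH_SIZE from docker-compose.yml
SLEEPS = (1, 2, 3)  # possible values of random.randint(1, 3)

# Sequential: the batch takes the SUM of its sleeps, so E[sum] = N * E[sleep].
expected_sequential = N * Fraction(sum(SLEEPS), len(SLEEPS))

# One thread per message: the batch takes the MAX of its sleeps.
# E[max] = P(max >= 1) + P(max >= 2) + P(max >= 3)
#        = 1 + (1 - (1/3)^N) + (1 - (2/3)^N)
expected_threaded = 3 - Fraction(1, 3) ** N - Fraction(2, 3) ** N

print(float(expected_sequential))          # 40.0
print(round(float(expected_threaded), 4))  # 2.9997
```

That is ~40 s vs ~3 s, close to the measured averages. Note that the speed-up relies on the work being I/O-bound: time.sleep releases the GIL, so the threads genuinely overlap, whereas CPU-bound processing in CPython would see little benefit from threads.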

What’s better and more important than the improvement in performance?

The offsets were committed in a synchronised way, and only ever by the main thread 🔐

Pretty good, eh? ๐Ÿš€