Using Kafka in C

Building High-performance Software in C

Kafka has changed the way we build modern, high-performance distributed software. While C developers are mostly accustomed to relying on native tools for building scalable applications, Kafka opens the door to building more robust and reliable distributed systems. This article uses the librdkafka library to present an example of how to use Kafka in C.

Prerequisites

You may skip this section if you already have Kafka up and running. The first and foremost requirement for the rest of this article is a healthy Kafka cluster. The easiest way to start Kafka in a development environment is with Docker Compose:

$ curl -s 'https://raw.githubusercontent.com/conduktor/kafka-stack-docker-compose/master/zk-multiple-kafka-multiple.yml' \
       -o 'docker-compose.yml'
$ docker-compose up -d

The above starts a 3-node cluster on ports 9092, 9093, and 9094. Additionally, if a visual tool is needed for managing and monitoring the cluster, one can use any freely available console, such as UI for Apache Kafka.
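
To confirm that all three brokers came up before moving on, we can check the container status:

$ docker-compose ps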

We also need librdkafka installed. librdkafka is the de facto standard for Kafka clients: originally written in C, it is widely used via numerous language bindings.

# macOS (Homebrew)
$ brew install librdkafka

# Debian & Ubuntu
$ apt install librdkafka-dev

# RedHat, CentOS, Fedora, etc.
$ yum install librdkafka-devel

# Windows
$ vcpkg install librdkafka

Getting Started

After Kafka is started, it is time to start coding. The next step is to include the Kafka client API in our project’s main file (the include path may differ depending on your OS or installation):

#include <stdbool.h> /* bool and true        */
#include <stdio.h>   /* fprintf and snprintf */

#include <librdkafka/rdkafka.h>

rd_kafka_t *rk;           /* Kafka client          */
rd_kafka_conf_t *conf;    /* Configuration object  */
rd_kafka_conf_res_t res;  /* Config result code    */
rd_kafka_resp_err_t err;  /* Response error code   */
char errstr[512];         /* Error message buffer  */
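
To compile the examples, we have to link against librdkafka. Assuming the code lives in main.c and pkg-config knows about the library, something along these lines should work:

$ cc main.c $(pkg-config --cflags --libs rdkafka) -o main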

Then, we have to create a client configuration object, whether we are building a producer or a consumer. Note that although the broker list is hard-coded here, it would typically come from command-line arguments or a configuration file:

conf = rd_kafka_conf_new();

res = rd_kafka_conf_set(conf, "bootstrap.servers",
                        "localhost:9092,localhost:9093,localhost:9094",
                        errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%s\n", errstr);
  rd_kafka_conf_destroy(conf);
  return 1;
}

Producer

To track message deliveries, we must define a delivery report callback function:

static void
dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *m, void *opaque) {
  if (m->err)
    /* Message delivery failed */
    fprintf(stderr, "%s\n", rd_kafka_err2str(m->err));
}

And set the delivery report callback within the configuration object:

rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

Then, we can move on and create the producer:

rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
  fprintf(stderr, "%s\n", errstr);
  return 1;
}

Here, we will create a loop that produces messages periodically, although the loop itself is not strictly necessary:

while (true) {
  char buf[512];
  int len;

  /* Fill buf and len with the message data. */
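
  /* As an illustration only: a simple counter-based payload
   * (the format is our own choosing; Kafka does not mandate any). */
  static unsigned n = 0;
  len = snprintf(buf, sizeof(buf), "message #%u", n++);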

retry:
  err = rd_kafka_producev(
    rk,
    RD_KAFKA_V_TOPIC("my_topic"),
    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
    RD_KAFKA_V_VALUE(buf, len),
    RD_KAFKA_V_OPAQUE(NULL),
    RD_KAFKA_V_END);
  if (err) {
    if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
      /* Wait 1s for message deliveries. */
      rd_kafka_poll(rk, 1000);
      goto retry;
    }
    fprintf(stderr, "%s\n", rd_kafka_err2str(err));
  }

  /* Wait for message deliveries (non-blocking). */
  rd_kafka_poll(rk, 0);
}

And finally, we will flush the remaining messages in the queue and clean up the producer:

rd_kafka_flush(rk, 10 * 1000);
if (rd_kafka_outq_len(rk) > 0)
  fprintf(stderr, "%d message(s) not delivered\n", rd_kafka_outq_len(rk));

rd_kafka_destroy(rk);

Consumer

Each consumer needs to join a consumer group; the partitions of the subscribed topics are assigned to the consumers in the group according to the partition assignment strategy. All consumers sharing the same group ID will join the same group:

res = rd_kafka_conf_set(conf, "group.id", "my_consumers",
                        errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%s\n", errstr);
  rd_kafka_conf_destroy(conf);
  return 1;
}

If there is no previously committed offset for a partition, the offset reset strategy is used to decide where in the partition to start fetching messages. Setting it to earliest makes the consumer read all messages in the partition when no committed offset exists:

res = rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
                        errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%s\n", errstr);
  rd_kafka_conf_destroy(conf);
  return 1;
}

Now, we need to create the consumer object and redirect all messages from the per-partition queues to the main queue, so that messages from all assigned partitions can be consumed with a single call; otherwise, we would have to run a separate consumer loop for each partition:

rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk) {
  fprintf(stderr, "%s\n", errstr);
  return 1;
}

rd_kafka_poll_set_consumer(rk);

Now, let’s subscribe to a topic:

rd_kafka_topic_partition_list_t *sub;

sub = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(sub, "my_topic", RD_KAFKA_PARTITION_UA);

err = rd_kafka_subscribe(rk, sub);
if (err) {
  fprintf(stderr, "%s\n", rd_kafka_err2str(err));
  rd_kafka_topic_partition_list_destroy(sub);
  rd_kafka_destroy(rk);
  return 1;
}

rd_kafka_topic_partition_list_destroy(sub);

And start polling for messages, processing them one by one in the main loop. Each poll call waits up to 100 ms for a message and simply retries when it times out:

while (true) {
  rd_kafka_message_t *m;

  m = rd_kafka_consumer_poll(rk, 100);
  if (!m) continue;

  if (m->err) {
    fprintf(stderr, "%s\n", rd_kafka_message_errstr(m));
    rd_kafka_message_destroy(m);
    continue;
  }

  /* Process the message:
   *
   * - Key:       m->key & m->key_len
   * - Payload:   m->payload & m->len
   * - Topic:     rd_kafka_topic_name(m->rkt)
   * - Partition: m->partition & m->offset
   */
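
  /* As an illustration only: print the payload, assuming it
   * holds printable text (the payload is not NUL-terminated). */
  printf("%.*s\n", (int)m->len, (const char *)m->payload);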

  rd_kafka_message_destroy(m);
}

And finally, when we exit the main loop, we have to clean up the consumer. (As the code stands, we never actually exit that loop; the next section takes care of that.)

rd_kafka_consumer_close(rk);
rd_kafka_destroy(rk);

Graceful Shutdown

To make sure an interrupt (e.g., Ctrl-C) exits the main loop cleanly instead of killing the process before cleanup, we have to handle it gracefully. First, we need to define a signal handler at the top:

#include <signal.h>

static volatile sig_atomic_t running = 1;

static void stop(int sig) {
  running = 0;
}

Then, we have to capture interrupts right before the main loop, using running instead of true as the loop condition:

signal(SIGINT, stop);

while (running) {
  ...
}

Wrap Up

Building a Kafka producer and a consumer with librdkafka is as easy as shown above. These snippets, however, do not showcase the full power of librdkafka; that would call for a more in-depth article.