March 15, 2016
By: Paul Bostrom

Kafka in Clojure

Kafka is a "high-throughput distributed messaging system". It has a number of uses in the industry, in particular website activity tracking. Suppose we want to aggregate all hotel search results on roomkey.com to identify trends in hotel prices. Every time a user searches for hotels, we'll record the hotel and price:

{:hotel-id 1234
 :nightly-rate 229.99}

One solution is to store each result in a database, but a synchronous database write incurs some cost, and since this is a user-facing service, we want to minimize latency as much as possible. Kafka provides a nice way to accomplish this: we can embed a Kafka producer (the message-publishing API) in our user-facing service and publish our data to a Kafka topic with very little overhead. Another service, a Kafka consumer, can subscribe to this topic and handle the aggregation and processing of the hotel data.

Kafka producer

Kafka includes a Java API[1], and wrapping the producer API in Clojure is pretty straightforward. We can create a producer with a few lines of Clojure:

(import '(org.apache.kafka.clients.producer KafkaProducer ProducerRecord)
        '(org.apache.kafka.common.serialization ByteArraySerializer))

(def p-cfg {"value.serializer" ByteArraySerializer
            "key.serializer" ByteArraySerializer
            "bootstrap.servers" "mykafkahost1:9092,mykafkahost2:9092"})

(def producer (KafkaProducer. p-cfg))

Note that I'm using a ByteArraySerializer in this configuration, which means that I'll be passing byte arrays to the producer. I prefer to deal with Clojure data structures as much as possible, so I use the Nippy serialization library to convert Clojure data to byte arrays. Kafka supports a number of other serialization formats depending on your data requirements. The only other required configuration[2] is a list of one or more servers in our Kafka cluster. The producer docs state that "the producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances", so we can reuse this producer for any topics we publish to. We may want to wrap the producer in a component (or similar) lifecycle, since it does have a close() cleanup method. Once we've instantiated our producer, we can call the producer's send method to publish data to the topic "hotel-data":

(.send producer (ProducerRecord. "hotel-data" hotel-bytes))

The value for hotel-bytes is a byte array returned by calling the Nippy serialization function freeze on the Clojure data structure shown in the introduction.
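For illustration, a minimal sketch of building hotel-bytes, assuming Nippy is required under the nippy alias:

(require '[taoensso.nippy :as nippy])

;; Serialize the Clojure map to a byte array for the producer.
(def hotel-bytes
  (nippy/freeze {:hotel-id 1234
                 :nightly-rate 229.99}))

Now our hotel search service will produce a continuous stream of hotel data in our Kafka cluster. Next we'll look at how we can consume this stream.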

Kafka consumer

Implementing a Kafka consumer involves a bit more complexity than the producer. The KafkaConsumer Javadocs give a nice overview of some of the concerns. For this example, we'll use a single consumer subscribed to our "hotel-data" topic. We'll look at a few strategies for consuming messages and managing the offset, which lets us track our position while processing the message stream. If our message processing code encounters some error, we can restart message consumption at some known point to avoid losing messages or unnecessarily re-processing them[3].

We'll start with the minimum configuration for the consumer:

(import '(org.apache.kafka.clients.consumer KafkaConsumer)
        '(org.apache.kafka.common.serialization ByteArrayDeserializer))

(def c-cfg
  {"bootstrap.servers" "mykafkahost1:9092,mykafkahost2:9092"
   "group.id" "avg-rate-consumer"
   "auto.offset.reset" "earliest"
   "enable.auto.commit" "true"
   "key.deserializer" ByteArrayDeserializer
   "value.deserializer" ByteArrayDeserializer})

(def consumer (doto (KafkaConsumer. c-cfg)
                (.subscribe ["hotel-data"])))

It looks pretty similar to the producer config, but note two settings: "auto.offset.reset" "earliest" means we initially start consuming messages at the beginning of the stream, and "enable.auto.commit" "true" means that we'll let the consumer handle committing the offset automatically. Of course, we'll want to actually do something with every message we consume. Let's create a few helper functions. The first updates the average nightly rate for each hotel in a database:

(defn update-avg-rate [db hotel-id nightly-rate]
  ;; use your imagination
  )

And another which deserializes a Kafka message (or record) and calls update-avg-rate:

(defn process-record [record]
  ;; db is assumed to be a database connection defined elsewhere.
  (let [m (-> record
              (.value)
              nippy/thaw)]
    (update-avg-rate db (:hotel-id m) (:nightly-rate m))))

We can do a straight port of the while loop[4] code from the consumer Javadocs:

(while true
  (let [records (.poll consumer 100)]
    (doseq [record records]
      (process-record record))))

One drawback to this is that we don't have precise control over when our offset is committed. A call to .poll might return a number of records (on the order of several thousand), at which point the consumer would automatically update the offset, marking the messages as successfully received. Suppose we then get an error updating the database, which causes the processing to halt. When we restart processing after fixing the database, we could lose track of those few thousand messages and never process them. Instead, we can manage the offset ourselves by specifying "enable.auto.commit" "false" in the consumer config, and then calling .commitSync on the consumer after processing the result of a poll:

(while true
  (let [records (.poll consumer 100)]
    (doseq [record records]
      (process-record record)))
  (.commitSync consumer))

Now we have the opposite problem: when we restart after an error, we may end up re-processing records, which would distort our averages. We could commit the offset after every database update, but that would greatly reduce the processing rate of our message stream. A better strategy would be to do the processing in batches:

(def min-batch-size 1000)
(def buffer (atom []))
(while true
  (let [records (.poll consumer 100)]
    (swap! buffer into records)
    (when (>= (count @buffer) min-batch-size)
      (process-batch @buffer)
      (.commitSync consumer)
      (reset! buffer []))))
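The process-batch function hasn't been defined; here's a minimal sketch, assuming it simply applies process-record from earlier to each record in the batch:

(defn process-batch [records]
  ;; A real implementation might batch the database writes as well.
  (doseq [record records]
    (process-record record)))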

The polling loop is again a fairly straight port of the Java code from the Kafka consumer Javadocs. It's no surprise that our code is starting to look a little too Java-y. It would be nice to wrap the consumer polling in a more idiomatic Clojure abstraction. One idea is to have a .poll loop which writes to a core.async channel, and then have a processing loop read from the channel. After going down that path for a bit, I was not content with the implementation[5]. I ultimately settled on a good old-fashioned lazy sequence[6]:

(defn lazy-consumer [consumer]
  (lazy-seq
   (let [records (.poll consumer 100)]
     ;; Stitch this poll's records onto a lazy tail of future polls.
     (concat records (lazy-consumer consumer)))))

Now our batch processing code looks a lot better:

(def min-batch-size 1000)

(loop [lc (lazy-consumer consumer)]
  (let [buffer (take min-batch-size lc)]
    (process-batch buffer)
    (.commitSync consumer (calculate-offsets buffer))
    (recur (drop min-batch-size lc))))

Notice that we are now passing a data structure of offsets to .commitSync. By default, .commitSync commits the offsets of all records returned by the last .poll, but a take from the lazy sequence will usually return only a subset of the records from the last internal call to .poll, so we don't want to mark all of those records as committed yet. The code to maintain this offset structure is as follows:

(import '(org.apache.kafka.common TopicPartition)
        '(org.apache.kafka.clients.consumer OffsetAndMetadata))

(defn commit-tuple [r]
  ;; The committed offset should be the position of the next record to
  ;; consume, hence the increment.
  [(TopicPartition. (.topic r) (.partition r))
   (OffsetAndMetadata. (inc (.offset r)))])

(defn calculate-offsets [buffer]
  ;; The buffer is in offset order, so later records overwrite earlier
  ;; ones, leaving the highest offset for each partition.
  (reduce #(conj %1 (commit-tuple %2)) {} buffer))

Now we have a rather robust strategy for managing our consumer offsets in the face of errors. Even with this, we can't guarantee "exactly-once" semantics, since our process could fail at some point between process-batch and .commitSync. That's not something I'm going to try to solve here, but you can read more on the Kafka wiki.

At this point we've established a solid foundation for using Kafka in Clojure. There are a lot of remaining details, configuration options, caveats, and things that can go wrong that are beyond the scope of this article[7]. This is where Clojure really shines, because you can do a lot of rapid experimentation at the REPL to fine-tune everything. Throw in some good logging and metrics, and you can build a nice distributed messaging system with Kafka.

We're Hiring

Check out our job openings if you're interested in working on this stuff.

Footnotes

  1. This article will cover the producer and consumer APIs in the recent 0.9 release. It helps to have the Javadocs handy since a lot of Java interop is involved.
  2. This minimal configuration provides a "fire and forget" approach to sending messages, though Kafka can be configured to provide certain guarantees of message durability (e.g. via the producer's "acks" setting). It's not a requirement for this use case, since our hotel trends framework is not going to fall apart if we lose some of our hotel search data here or there. If we were dealing with financial transactions we might need it though.
  3. How to handle errors in messaging systems is a concept known as "delivery semantics". Note that "message delivery" in our case means not just receiving the message, but also doing some additional processing (i.e. deduplication, aggregation, persistent storage). It's a rather hard problem (and impossible in some cases) to achieve "exactly-once" semantics, so we will strive for either "at-least-once", meaning some messages may be reprocessed, or "at-most-once", meaning some messages may be lost.
  4. For simplicity, all consumer polling loops are implemented as infinite while loops. In practice you may want to use an atom holding a "shutdown" flag coupled with a JVM shutdown hook to cleanly exit from these loops; see the sketch after these footnotes.
  5. Aesthetically, the code just wasn't enough of an improvement over the original Java to justify the effort. I gave up completely when I started seeing ConcurrentModificationException ("KafkaConsumer is not safe for multi-threaded access") when attempting to manually commit offsets. Since the calls to .poll and .commitSync happen on different threads, I would need to implement some sort of coordination to avoid calling them concurrently.
  6. All the usual conditions on lazy sequences apply of course: don't hold on to the head, don't try to count it, don't try to hold the entire sequence in memory, etc.
  7. Refer to this blog post from Confluent for a deeper look at consumer configuration.
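As promised in footnote 4, here's a minimal, hypothetical sketch of a shutdown flag flipped by a JVM shutdown hook, reusing the consumer and process-record from above:

(def running? (atom true))

;; Flip the flag when the JVM begins shutting down.
(.addShutdownHook (Runtime/getRuntime)
                  (Thread. #(reset! running? false)))

(while @running?
  (let [records (.poll consumer 100)]
    (doseq [record records]
      (process-record record))))

;; Release the consumer's resources once the loop exits.
(.close consumer)

A production version would also want the hook to wait for the polling loop to finish (e.g. by joining its thread) so the JVM doesn't halt mid-batch.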
Tags: consumer Kafka producer lazy-seq Clojure