A Clojure library for Apache Kafka (a distributed stream-processing platform).
It uses the Kafka protocol directly and does not rely on ZooKeeper.
It tries to be as lightweight as possible, thus depending only on:

- `org.apache.kafka/kafka_2.12 "3.9.0"`
- `org.apache.kafka/kafka-clients "4.0.0"`

with the jmx* and logging transitive dependencies excluded.
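For illustration, this is roughly how such dependencies with exclusions are declared in a Leiningen `project.clj`. This is a hedged sketch only: the project name and the excluded artifact names below are placeholders showing the `:exclusions` mechanism, not the library's actual exclusion list.

```clojure
;; Hedged sketch: illustrates Leiningen's :exclusions mechanism.
;; The project name and excluded artifacts are placeholders, not the
;; library's actual project.clj contents.
(defproject my-kafka-app "0.1.0"
  :dependencies [[org.apache.kafka/kafka_2.12 "3.9.0"
                  :exclusions [com.sun.jmx/jmxri
                               com.sun.jdmk/jmxtools
                               org.slf4j/slf4j-log4j12]]
                 [org.apache.kafka/kafka-clients "4.0.0"]])
```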
| Note | The ZooKeeper dependency has been removed, as it is not required by modern Kafka clients for standard operations. |
| Note | Some builds (for instance those of the v0.4.x branch) may be partially or even fully incompatible with certain versions of other libraries that also use NIO! If you are experiencing build problems and/or your application unexpectedly crashes on start, check your project dependencies more closely; you may need to correct existing dependency versions or add a current version of `io.netty/netty-all`. |
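If you hit such a conflict, `lein deps :tree` helps locate the clashing artifacts. Below is a hedged sketch of pinning netty explicitly in your own `project.clj`; the project name is hypothetical and the version string is a placeholder, so check Clojars or Maven Central for the current release:

```clojure
;; Placeholder sketch: pin io.netty/netty-all explicitly in your own project.
;; Replace the version string with a current release.
(defproject my-app "0.1.0"
  :dependencies [[net.tbt-post/clj-kafka-x "0.8.0"]
                 [io.netty/netty-all "<current-version>"]])
```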
Add the following dependency to your Leiningen `project.clj`:

```clojure
[net.tbt-post/clj-kafka-x "0.8.0"]
```

Producer example:

```clojure
(require '[clj-kafka-x.producer :as kp])

(with-open [p (kp/producer {"bootstrap.servers" "localhost:9092"}
                           (kp/string-serializer)
                           (kp/string-serializer))]
  @(kp/send p (kp/record "topic-a" "Hi there!")))
```

Consumer example:

```clojure
(require '[clj-kafka-x.consumers.simple :as kc])

(with-open [c (kc/consumer {"bootstrap.servers" "localhost:9092"
                            "group.id" "consumer-id"}
                           (kc/string-deserializer)
                           (kc/string-deserializer))]
  (kc/subscribe c "topic-a")
  (kc/messages c))
```

| Note | When you use multiple partitions per topic, you must specify them explicitly when subscribing, i.e. `(kc/subscribe c [{:topic "topic-a" :partitions #{0 1}} {:topic "topic-b" :partitions #{0 1 2}}])` |
A more complete consumer example:

```clojure
(ns buzz.consumer.kafka
  (:require [clj-kafka-x.consumers.simple :as kc]
            [clojure.tools.logging :as log]))
(defn processor [msg schema] msg)
(def schema nil)
(def config {"bootstrap.servers" "localhost:9092"
             "group.id" "consumer-id"})
(defn process-message [msg]
  (let [{:keys [value topic partition offset]} msg
        processor processor ;; choose one by topic name
        schema schema]      ;; choose one by topic name
    (if (fn? processor) (processor value schema) value)))
(defn consume []
  (with-open [c (kc/consumer config
                             (kc/byte-array-deserializer)
                             (kc/byte-array-deserializer))]
    (kc/subscribe c "topic-a") ;; or a vector of {:topic ... :partitions ...} maps
    (let [pool (kc/messages c)]
      (doseq [message pool]
        (log/warn (process-message message))))))
```

You may also pass an explicit timeout to `kc/messages`:
```clojure
(defn- consume [process-message]
  (with-open [co (kc/consumer config
                              (kc/byte-array-deserializer)
                              (kc/byte-array-deserializer))]
    (kc/subscribe co "topic-a")
    (when-let [messages (kc/messages
                          co
                          ;; assumes config also carries a :request-timeout-ms entry
                          :timeout (:request-timeout-ms config))]
      (doall (map process-message messages)))))
```

The number of messages returned per poll can be limited with the `max.poll.records` key of the consumer configuration.
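For example (a hedged sketch; `max.poll.records` is a standard Kafka consumer property and the value shown is arbitrary):

```clojure
;; Sketch: cap each poll at 100 records via the standard Kafka consumer
;; property "max.poll.records" (the value here is arbitrary).
(def config {"bootstrap.servers" "localhost:9092"
             "group.id"          "consumer-id"
             "max.poll.records"  "100"})
```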
The project includes a test suite to verify functionality:

```shell
$ lein with-profile dev test
```

For test coverage reports:

```shell
$ lein with-profile dev cloverage
```

The project also includes integration tests that require a running Kafka instance. By default, these tests are skipped. To run them:
```clojure
(binding [clj-kafka-x.integration-test/*run-integration-tests* true
          clj-kafka-x.integration-test/*kafka-bootstrap-servers* "localhost:9092"]
  (clj-kafka-x.integration-test/run-integration-tests))
```

Copyright © 2016-2025

Distributed under the Apache License, Version 2.0.