Connecting Elixir to Kafka with Kaffe: A codealong

Meryl Dakin
Published in Flatiron Labs
Apr 10, 2019 · 6 min read
Cross-post from Elixir School!

If we want to use the popular messaging system Kafka with our Elixir projects, we have a few wrappers we can choose from. This blog post covers integrating one of them, Kaffe, which proved difficult to troubleshoot because of results like this:

Exactly what I needed, consumers drinking coffee

In this codealong, we’ll build a simple Elixir application and use Kaffe to connect it to a locally running Kafka server. Later we’ll cover a couple of variations: connecting to a dockerized Kafka server and connecting from an umbrella Elixir app.

This post assumes basic knowledge of Elixir and no knowledge of Kafka or Kaffe. Here is the repo with the full project: Elixir Kaffe Codealong.

What is Kafka, briefly?

Kafka is a messaging system. It does essentially three things:
1. Receives messages from applications
2. Keeps those messages in the order they were received
3. Allows other applications to read those messages in order

A use case for Kafka:
Say we want to keep an activity log for users. Every time a user triggers an event on our website (logs in, makes a search, clicks a banner, etc.), we want to log that activity. We also want to allow multiple services to access this activity log, such as a marketing tracker, a user data aggregator, and of course our website’s front-end application. Rather than persisting each activity to our own database, we can send them to Kafka and allow all these applications to read only what they need from it.

Here’s a basic idea of how this might look:

The three services reading from Kafka would only take the pieces of data that they require. For example, the first service would only read from the banner_click topic while the last only from search_term. The second service that cares about active users might read from both topics to capture all site activity.

Basic Kafka terminology in simple terms

- consumer: what is receiving messages from Kafka
- producer: what is sending messages to Kafka
- topic: a way to organize messages and allow consumers to only subscribe to the ones they want to receive
- partition: a slice of a topic. Splitting a topic into partitions lets its messages be spread across multiple machines, so that more than one consumer can read from a single topic at a time
- leader/replica: the roles played by the copies of a partition. Each partition has one leader and, usually, one or more replicas. The replicas copy the leader’s data so that they hold the same, newest messages. If the leader fails, a replica will take over as leader.
- offset: the position of a message within its partition. It uniquely identifies the message there and preserves the order of the messages
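
To make these terms a little more concrete, here is a toy, in-memory model of a single partition in plain Elixir. It is only a sketch of the idea, not how Kafka or Kaffe is implemented, and the names (ToyPartition, append/2, read_from/2) are made up for illustration. It shows that messages are appended in order, that each one gets the next offset, and that a consumer can resume reading from an offset it remembers.

```elixir
defmodule ToyPartition do
  # Hypothetical toy model of one partition: an append-only list of messages.

  # Start with an empty log.
  def new, do: []

  # Append a value; its offset is its position in the log (0, 1, 2, ...).
  def append(log, value) do
    offset = length(log)
    {offset, log ++ [%{offset: offset, value: value}]}
  end

  # Return every message at or after the given offset, oldest first.
  def read_from(log, offset), do: Enum.drop(log, offset)
end

{_offset, log} = ToyPartition.append(ToyPartition.new(), "user 1 logged in")
{_offset, log} = ToyPartition.append(log, "user 1 searched for kafka")
{_offset, log} = ToyPartition.append(log, "user 1 clicked a banner")

# A consumer that has already handled offset 0 resumes from offset 1.
unread = ToyPartition.read_from(log, 1)
IO.inspect(Enum.map(unread, & &1.value))
# prints ["user 1 searched for kafka", "user 1 clicked a banner"]
```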

Codealong: basic Elixir app & Kafka running locally

SET UP KAFKA SERVER

Follow the first two steps of the quickstart instructions from Apache Kafka. For convenience, here they are:

  1. Download the code
  2. Start the servers

First Zookeeper (a service that handles some coordination and state management for Kafka):

bin/zookeeper-server-start.sh config/zookeeper.properties

Then Kafka:

bin/kafka-server-start.sh config/server.properties

SET UP ELIXIR APP

1. Start new Elixir project

mix new elixir_kaffe_codealong

2. Configure Kaffe

2.a: In mix.exs add :kaffe to the list of extra applications:

def application do
  [
    extra_applications: [:logger, :kaffe]
  ]
end

2.b: Add Kaffe to the list of dependencies:

defp deps do
  [
    {:kaffe, "~> 1.9"}
  ]
end

2.c: Run mix deps.get in the terminal to fetch and lock the new dependency.

3. Configure producer

In config/config.exs, below the `import Config` line (`use Mix.Config` on Elixir versions before 1.9), add:

config :kaffe,
  producer: [
    endpoints: [localhost: 9092],
    topics: ["our_topic", "another_topic"]
  ]

Note: endpoints takes a list of [hostname: port] pairs. Kafka runs on port 9092 by default. In this example, the hostname is localhost because we’ve started the Kafka server directly on our machine. However, if the server is dockerized, the hostname will be whatever name that container is given (often “kafka”).
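
If the same project needs to run against both a local and a dockerized Kafka, one option is to build the endpoint from an environment variable. The fragment below is a sketch under some assumptions: KAFKA_HOST is a made-up variable name, and `import Config` assumes Elixir 1.9 or later. Because config/config.exs is evaluated at compile time, the variable is read when the project is built, not when it starts.

```elixir
# config/config.exs (fragment)
import Config

# KAFKA_HOST is a hypothetical environment variable; fall back to localhost.
kafka_host = System.get_env("KAFKA_HOST") || "localhost"

config :kaffe,
  producer: [
    # {:localhost, 9092} is the same term as the keyword pair localhost: 9092.
    endpoints: [{String.to_atom(kafka_host), 9092}],
    topics: ["our_topic", "another_topic"]
  ]
```

With KAFKA_HOST unset this produces the same configuration as the hard-coded version above; with KAFKA_HOST=kafka it points at a container named kafka.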

4. Configure consumer

4.a: add a new file, /lib/application.ex, with the following code:

defmodule ElixirKaffeCodealong.Application do
  use Application

  def start(_type, _args) do
    # Supervisor.Spec is deprecated in newer Elixir versions but still works here
    import Supervisor.Spec

    children = [
      # Start the Kaffe consumer as a supervised worker
      worker(Kaffe.Consumer, [])
    ]

    opts = [strategy: :one_for_one, name: ExampleConsumer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

4.b: back in mix.exs, add a new item to the application function:

def application do
  [
    extra_applications: [:logger, :kaffe],
    mod: {ElixirKaffeCodealong.Application, []}
  ]
end

Note: now that we have an Application module, mix.exs is where we tell our app to start it. We use the `mod` key for applications that start a supervision tree, which is what we set up when we added Kaffe.Consumer as a child in Application above.

4.c: add a consumer module to accept messages from Kafka as /lib/example_consumer.ex with the following code:

defmodule ExampleConsumer do
  def handle_message(%{key: key, value: value} = message) do
    IO.inspect(message)
    IO.puts("#{key}: #{value}")
    :ok
  end
end

Note: the function that accepts Kafka messages MUST be named “handle_message”.
It MUST accept an argument structured as shown here.
It MUST return :ok.
It can do anything else with the incoming message inside the function.
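
Since the message is a map, the “anything else” can include pattern matching on more of its fields. The sketch below routes on the topic field (the message printed later in this post includes a topic key). RoutingConsumer and the banner_click and search_term topics are hypothetical, borrowed from the use case earlier in the post. Plain maps stand in for Kafka messages, so the example runs without a broker.

```elixir
defmodule RoutingConsumer do
  # Hypothetical handler: one clause per topic, each returning :ok.

  def handle_message(%{topic: "banner_click", key: key, value: value}) do
    IO.puts("banner click by #{key}: #{value}")
    :ok
  end

  def handle_message(%{topic: "search_term", value: value}) do
    IO.puts("search term: #{value}")
    :ok
  end

  # Catch-all so a message from an unexpected topic does not crash the consumer.
  def handle_message(%{topic: topic}) do
    IO.puts("ignoring message from #{topic}")
    :ok
  end
end

# Simulated message; a real one would arrive from Kaffe with more fields.
RoutingConsumer.handle_message(%{topic: "search_term", key: "user-1", value: "elixir kafka"})
# prints "search term: elixir kafka"
```

To try it in the codealong, this module would be set as the message_handler in the consumer config, with the matching topics listed there.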

4.d: configure the consumer module in /config/config.exs

config :kaffe,
  consumer: [
    endpoints: [localhost: 9092],
    topics: ["our_topic", "another_topic"],
    consumer_group: "example-consumer-group",
    message_handler: ExampleConsumer
  ]

5. Add a producer module (optional, can also call Kaffe from the console)

add /lib/example_producer.ex with the following code:

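defmodule ExampleProducer do
  # Send one {key, value} message to the given topic
  def send_my_message({key, value}, topic) do
    Kaffe.Producer.produce_sync(topic, [{key, value}])
  end

  # Send a key and value to the first topic configured for the producer
  def send_my_message(key, value) do
    Kaffe.Producer.produce_sync(key, value)
  end

  # Send a value under a default key
  def send_my_message(value) do
    Kaffe.Producer.produce_sync("sample_key", value)
  end
end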
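
The first clause above hands Kaffe a list of {key, value} tuples, which means several messages can go to a topic in one call. Here is a small sketch of preparing such a list. BatchBuilder and the event shape are hypothetical. Keys and values are converted to strings because Kafka stores them as bytes.

```elixir
defmodule BatchBuilder do
  # Hypothetical helper: turn {user_id, action} pairs into the
  # [{key, value}] list shape that produce_sync/2 is given above.
  def to_messages(events) do
    Enum.map(events, fn {user_id, action} ->
      {to_string(user_id), to_string(action)}
    end)
  end
end

messages = BatchBuilder.to_messages([{1, :login}, {2, :search}])
IO.inspect(messages)
# prints [{"1", "login"}, {"2", "search"}]

# Inside the codealong project, with Kafka running, the list could then be sent with:
# Kaffe.Producer.produce_sync("our_topic", messages)
```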

6. Send and receive messages in the console!

Now we have everything configured and can use the modules we’ve created to send and read messages through Kafka!

  1. We’re going to call on our producer to send a message to the Kafka server.
  2. The Kafka server receives the message.
  3. Our consumer, which we configured to subscribe to the topic called “another_topic”, will receive the message we’ve sent and print it to the console.

Start an interactive Elixir shell with iex -S mix and call the following:

ExampleProducer.send_my_message({"Metamorphosis", "Franz Kafka"}, "another_topic")

You should see this result:

iex>[debug] event#produce_list topic=another_topic
...>[debug] event#produce_list_to_topic topic=another_topic partition=0
...>:ok
iex> %{
...>   attributes: 0,
...>   crc: 2125760860, # will vary
...>   key: "Metamorphosis",
...>   magic_byte: 1,
...>   offset: 1, # will vary
...>   partition: 0,
...>   topic: "another_topic",
...>   ts: 1546634470702, # will vary
...>   ts_type: :create,
...>   value: "Franz Kafka"
...> }
...> Metamorphosis: Franz Kafka

Variations: Docker & Umbrella Apps

- If you’re running Kafka from a Docker container (most common in real applications), you will use that container’s hostname in the config file rather than localhost
- In an umbrella app, you’ll configure Kaffe in the child application running it. If you have apps separated by environment, you can start the consumer by structuring it as a child like this:

# args is the second argument passed to start/2
children =
  case args do
    [env: :prod] -> [worker(Kaffe.Consumer, [])]
    [env: :test] -> []
    [env: :dev] -> [worker(Kaffe.Consumer, [])] # note that it is not a tuple
    # anything else, including an empty list, starts no consumer
    _ -> []
  end

Troubleshooting Errors

No leader error

** (MatchError) no match of right hand side value: {:error, :LeaderNotAvailable}

Solution: Try again. Kafka just needed a minute to warm up.
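
If trying again by hand gets tedious, the retry can be wrapped in a small helper. This is a sketch, not part of Kaffe: Retry.with_retries/3 is a made-up name, and it retries on any MatchError, which is broader than this one error. The flaky function below only simulates a broker that is not ready for the first two calls.

```elixir
defmodule Retry do
  # Hypothetical helper: run `fun`, retrying when it raises a MatchError,
  # up to `attempts` times with `delay_ms` milliseconds between tries.
  def with_retries(fun, attempts \\ 3, delay_ms \\ 1_000)

  # Last attempt: let any error propagate to the caller.
  def with_retries(fun, 1, _delay_ms), do: fun.()

  def with_retries(fun, attempts, delay_ms) when attempts > 1 do
    try do
      fun.()
    rescue
      MatchError ->
        Process.sleep(delay_ms)
        with_retries(fun, attempts - 1, delay_ms)
    end
  end
end

# Simulation: count calls and fail the first two, like a leader still warming up.
{:ok, counter} = Agent.start_link(fn -> 0 end)

flaky = fn ->
  call = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
  :ok = if call < 3, do: {:error, :LeaderNotAvailable}, else: :ok
end

IO.inspect(Retry.with_retries(flaky, 5, 10))
# prints :ok
```

In the codealong project, the function passed in could be something like fn -> ExampleProducer.send_my_message({"Metamorphosis", "Franz Kafka"}, "another_topic") end.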

Invalid Topic error

** (MatchError) no match of right hand side value: {:error, :InvalidTopicException}

Solution: Your topic name shouldn’t have spaces in it. Does it?
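
A quick check before producing can catch this early. The sketch below assumes Kafka’s usual naming rules: letters, digits, periods, underscores, and hyphens only, at most 249 characters, and not “.” or “..”. TopicName.valid?/1 is a hypothetical helper, not something Kaffe provides.

```elixir
defmodule TopicName do
  # Hypothetical helper: true when the name uses only characters Kafka
  # accepts in topic names and stays within the length limit.
  def valid?(name) when is_binary(name) do
    name not in [".", ".."] and
      Regex.match?(~r/\A[a-zA-Z0-9._-]{1,249}\z/, name)
  end
end

IO.inspect(TopicName.valid?("another_topic"))
# prints true
IO.inspect(TopicName.valid?("another topic"))
# prints false
```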

The end

This should give you the basic setup to start exploring on your own. There’s a lot more you can do with Kaffe, so check out sending multiple messages, consumer groups, etc. If you run into other errors and solve them, I’d love to keep adding them to the troubleshooting list we’ve started here.

Thanks for reading! Want to work on a mission-driven team that loves illustrated codealongs and working in Elixir? We’re hiring!

Meryl Dakin
Flatiron Labs

Dev at Knock.app. @meryldakin on github, LinkedIn, and twitter.