Connecting Elixir to Kafka with Kaffe: A codealong

Meryl Dakin
Apr 10, 2019 · 6 min read
Cross-post from Elixir School!

If we want to use the popular messaging system Kafka with our Elixir projects, we have a few wrappers we can choose from. This blog post covers integrating one of them, Kaffe, which proved difficult to troubleshoot because of results like this:

[Image: “Exactly what I needed, consumers drinking coffee”]

In this codealong, we’ll build a simple Elixir application and use Kaffe to connect it to a locally running Kafka server. Later we’ll cover a couple of variations to connect a dockerized Kafka server or an umbrella Elixir app.

This post assumes basic knowledge of Elixir and no knowledge of Kafka or Kaffe. Here is the repo with the full project: Elixir Kaffe Codealong.

What is Kafka, briefly?

A use case for Kafka:
Say we want to keep an activity log for users. Every time a user triggers an event on our website (logs in, makes a search, clicks a banner, etc.), we want to log that activity. We also want to allow multiple services to access this activity log, such as a marketing tracker, a user data aggregator, and of course our website’s front-end application. Rather than persisting each activity to our own database, we can send them to Kafka and allow all these applications to read only what they need from it.

Here’s a basic idea of how this might look:

The three services reading from Kafka would only take the pieces of data that they require. For example, the first service would only read from the banner_click topic while the last only from search_term. The second service that cares about active users might read from both topics to capture all site activity.
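To make the topic idea concrete, here’s a minimal sketch in plain Elixir with no Kafka involved. The module name, the event atoms, and the service names are all made up for illustration; only the banner_click and search_term topics come from the example above.

```elixir
defmodule ActivityTopics do
  # Hypothetical mapping from a website event to the topic it would be sent to.
  def topic_for(:banner_click), do: "banner_click"
  def topic_for(:search), do: "search_term"

  # Hypothetical services and the topics each one reads from.
  def subscriptions do
    %{
      marketing_tracker: ["banner_click"],
      active_users: ["banner_click", "search_term"],
      search_aggregator: ["search_term"]
    }
  end

  # Names of the services that would see an event of this kind, sorted.
  def readers_of(event) do
    topic = topic_for(event)

    subscriptions()
    |> Enum.filter(fn {_service, topics} -> topic in topics end)
    |> Enum.map(fn {service, _topics} -> service end)
    |> Enum.sort()
  end
end

IO.inspect(ActivityTopics.readers_of(:banner_click))
```

Each service only ever sees the topics it subscribed to, which is the property that lets several applications share one log.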

Basic Kafka terminology in simple terms

Codealong: basic Elixir app & Kafka running locally

SET UP KAFKA SERVER

  1. Download the code
  2. Start the servers

First Zookeeper (a service that handles some coordination and state management for Kafka):

bin/zookeeper-server-start.sh config/zookeeper.properties

Then Kafka:

bin/kafka-server-start.sh config/server.properties
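Before moving on, it can help to confirm that something is actually listening on Kafka’s port. Here’s a small sketch using Erlang’s :gen_tcp from Elixir. KafkaCheck is a made-up module name, and the check only proves that a TCP connection opens, not that the broker is healthy.

```elixir
defmodule KafkaCheck do
  # Returns true if a TCP connection to host:port can be opened within timeout_ms.
  # This only shows that something is listening, not that it is a working Kafka broker.
  def reachable?(host \\ "localhost", port \\ 9092, timeout_ms \\ 1_000) do
    case :gen_tcp.connect(String.to_charlist(host), port, [:binary, active: false], timeout_ms) do
      {:ok, socket} ->
        :gen_tcp.close(socket)
        true

      {:error, _reason} ->
        false
    end
  end
end

IO.puts("Kafka reachable on localhost:9092? #{KafkaCheck.reachable?()}")
```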

SET UP ELIXIR APP

1. Start new Elixir project

mix new elixir_kaffe_codealong

2. Configure Kaffe

2.a: In mix.exs, add :kaffe to the list of extra applications:

def application do
  [
    extra_applications: [:logger, :kaffe]
  ]
end

2.b: Add Kaffe to the list of dependencies:

defp deps do
  [
    {:kaffe, "~> 1.9"}
  ]
end

2.c: Run mix deps.get in the terminal to lock new dependencies.

3. Configure producer in /config/config.exs

config :kaffe,
  producer: [
    endpoints: [localhost: 9092],
    topics: ["our_topic", "another_topic"]
  ]

Note: endpoints takes a list of [hostname: port] pairs. Kafka listens on port 9092 by default. In this example, the hostname is localhost because we’ve started the Kafka server straight from our machine. However, if the server is dockerized, the hostname will be whatever that container specifies (usually “kafka”).
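If the hostname changes between a local and a dockerized setup, one option is to build the endpoints list from values supplied at startup. The sketch below is only an illustration: EndpointSketch, KAFKA_HOST, and KAFKA_PORT are made-up names, and Kaffe does not read those variables itself.

```elixir
defmodule EndpointSketch do
  # Builds a keyword list in the same [hostname: port] shape used above.
  # host and port arrive as strings, the way environment variables would.
  def build(host, port) when is_binary(host) and is_binary(port) do
    [{String.to_atom(host), String.to_integer(port)}]
  end

  # KAFKA_HOST and KAFKA_PORT are hypothetical variable names chosen for this sketch.
  def from_env do
    build(
      System.get_env("KAFKA_HOST") || "localhost",
      System.get_env("KAFKA_PORT") || "9092"
    )
  end
end

IO.inspect(EndpointSketch.build("kafka", "9092"))
```

For the dockerized case described in the note, the hard-coded equivalent in the config would simply be endpoints: [kafka: 9092].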

4. Configure consumer

4.a: Add an Application module that starts the Kaffe consumer under a supervisor:

defmodule ElixirKaffeCodealong.Application do
  use Application

  # args comes from the mod: entry in mix.exs (step 4.b)
  def start(_type, args) do
    import Supervisor.Spec

    children = [
      worker(Kaffe.Consumer, [])
    ]

    opts = [strategy: :one_for_one, name: ExampleConsumer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

4.b: Back in mix.exs, add a new item to the application function:

def application do
  [
    extra_applications: [:logger, :kaffe],
    mod: {ElixirKaffeCodealong.Application, []}
  ]
end

Note: now that we’re using the Application module, this is where we’ll tell it to start. We use the keyword `mod` with applications that start a supervision tree, which we configured when adding our Kaffe.Consumer to Application above.

4.c: Add a consumer module to accept messages from Kafka as /lib/example_consumer.ex with the following code:

defmodule ExampleConsumer do
  def handle_message(%{key: key, value: value} = message) do
    IO.inspect(message)
    IO.puts("#{key}: #{value}")
    :ok
  end
end

Note: the function that accepts Kafka messages MUST be named “handle_message”.
It MUST accept arguments structured as shown here.
It MUST return :ok.
It can do anything else with the incoming message inside the function.
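As a sketch of “anything else”, here’s a handler that treats one topic differently by pattern matching on the message. TopicAwareConsumer is a made-up module. The :topic field is assumed to be present on the message map, as it is in the console output in step 6.

```elixir
defmodule TopicAwareConsumer do
  # Same contract as ExampleConsumer: named handle_message, takes the message map, returns :ok.
  def handle_message(%{topic: "our_topic", key: key, value: value}) do
    IO.puts("[our_topic] #{key}: #{value}")
    :ok
  end

  # Catch-all so a message from any other topic still returns :ok
  # instead of raising a FunctionClauseError.
  def handle_message(%{key: key, value: value} = message) do
    IO.puts("[#{Map.get(message, :topic, "unknown")}] #{key}: #{value}")
    :ok
  end
end

TopicAwareConsumer.handle_message(%{topic: "our_topic", key: "greeting", value: "hello"})
```

To try it in the codealong, it would replace ExampleConsumer as the message_handler in the consumer config.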

4.d: Configure the consumer module in /config/config.exs

config :kaffe,
  consumer: [
    endpoints: [localhost: 9092],
    topics: ["our_topic", "another_topic"],
    consumer_group: "example-consumer-group",
    message_handler: ExampleConsumer
  ]

5. Add a producer module (optional, can also call Kaffe from the console)

defmodule ExampleProducer do
  # Send one {key, value} pair to the given topic.
  def send_my_message({key, value}, topic) do
    Kaffe.Producer.produce_sync(topic, [{key, value}])
  end

  # Send a key and value without naming a topic.
  def send_my_message(key, value) do
    Kaffe.Producer.produce_sync(key, value)
  end

  # Send a value under a placeholder key.
  def send_my_message(value) do
    Kaffe.Producer.produce_sync("sample_key", value)
  end
end
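The first clause of ExampleProducer wraps a single pair in a list before handing it to Kaffe. As a rough sketch, the same list shape could carry several messages at once. BatchSketch is a made-up helper, and the Kaffe call is left as a comment because it only works inside the configured project with Kafka running.

```elixir
defmodule BatchSketch do
  # Turns a list of {key, value} pairs into the list-of-tuples shape that
  # ExampleProducer passes to Kaffe.Producer.produce_sync/2, stringifying both sides.
  def to_messages(pairs) do
    Enum.map(pairs, fn {key, value} -> {to_string(key), to_string(value)} end)
  end
end

messages = BatchSketch.to_messages([{"Metamorphosis", "Franz Kafka"}, {:year, 1915}])
IO.inspect(messages)

# Inside the codealong project the whole list could then go out in one call:
# Kaffe.Producer.produce_sync("another_topic", messages)
```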

6. Send and receive messages in the console!

  1. We’re going to call on our producer to send a message to the Kafka server.
  2. The Kafka server receives the message.
  3. Our consumer, which we configured to subscribe to the topic called “another_topic”, will receive the message we’ve sent and print it to the console.

Start an interactive Elixir shell with iex -S mix and call the following:

ExampleProducer.send_my_message({"Metamorphosis", "Franz Kafka"}, "another_topic")

You should see this result:

iex> [debug] event#produce_list topic=another_topic
...> [debug] event#produce_list_to_topic topic=another_topic partition=0
...> :ok
iex> %{
...>   attributes: 0,
...>   crc: 2125760860, # will vary
...>   key: "Metamorphosis",
...>   magic_byte: 1,
...>   offset: 1, # will vary
...>   partition: 0,
...>   topic: "another_topic",
...>   ts: 1546634470702, # will vary
...>   ts_type: :create,
...>   value: "Franz Kafka"
...> }
...> Metamorphosis: Franz Kafka
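The ts field in that map is easier to read as a date. Here’s a small sketch, assuming ts is milliseconds since the Unix epoch (which is how Kafka timestamps are expressed). MessageTime is a made-up module name.

```elixir
defmodule MessageTime do
  # Assumes ts is milliseconds since the Unix epoch, as in the sample output above.
  def to_datetime(%{ts: ts}) when is_integer(ts) do
    DateTime.from_unix(ts, :millisecond)
  end
end

{:ok, sent_at} = MessageTime.to_datetime(%{ts: 1_546_634_470_702})
IO.puts("Message created at #{DateTime.to_iso8601(sent_at)}")
```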

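Variations: Docker & Umbrella Apps

The children list in start/2 can be chosen by matching on args, so the consumer only starts in the environments that need it:

children =
  case args do
    [env: :prod] -> [worker(Kaffe.Consumer, [])]
    [env: :test] -> []
    [env: :dev] -> [worker(Kaffe.Consumer, [])] # note that it is not a tuple
    _ -> []
  end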
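Supervisor.Spec, which provides worker/2, is deprecated in newer Elixir versions. A possible alternative is to hand the supervisor a child spec map instead. The sketch below shows that shape under the same per-environment idea; ConsumerChildren and its function names are made up for illustration.

```elixir
defmodule ConsumerChildren do
  # Child spec map for the Kaffe consumer, standing in for worker(Kaffe.Consumer, []).
  def consumer_spec do
    %{id: Kaffe.Consumer, start: {Kaffe.Consumer, :start_link, []}}
  end

  # Start the consumer in prod and dev, and nothing anywhere else.
  def for_env(env) when env in [:prod, :dev], do: [consumer_spec()]
  def for_env(_other), do: []
end

IO.inspect(ConsumerChildren.for_env(:prod))
```

The returned list can be passed to Supervisor.start_link/2 in place of the children built with worker/2.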

Troubleshooting Errors

No leader error

** (MatchError) no match of right hand side value: {:error, :LeaderNotAvailable}

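Solution: Try again. It just needed a minute to warm up (this typically shows up right after a topic is created, before Kafka has finished electing a leader for it).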
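“Try again” can also be automated. Here’s a minimal retry sketch that re-runs a function when it raises a MatchError like the one above. RetrySketch, the attempt count, and the delay are all assumptions made for illustration, not part of Kaffe.

```elixir
defmodule RetrySketch do
  # Calls fun.(); if it raises a MatchError, waits delay_ms and tries again.
  # The last attempt is not rescued, so a persistent error still surfaces.
  def with_retries(fun, attempts \\ 3, delay_ms \\ 1_000)

  def with_retries(fun, 1, _delay_ms), do: fun.()

  def with_retries(fun, attempts, delay_ms) when attempts > 1 do
    try do
      fun.()
    rescue
      MatchError ->
        Process.sleep(delay_ms)
        with_retries(fun, attempts - 1, delay_ms)
    end
  end
end

# Inside the codealong project, a produce call could be wrapped like this:
# RetrySketch.with_retries(fn ->
#   ExampleProducer.send_my_message({"Metamorphosis", "Franz Kafka"}, "another_topic")
# end)
```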

Invalid Topic error

** (MatchError) no match of right hand side value: {:error, :InvalidTopicException}

Solution: Does your topic name have spaces in it? It shouldn’t.
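A quick check before producing can catch this early. The sketch below follows Kafka’s rule for topic names: ASCII letters, digits, ".", "_" and "-", up to 249 characters, and not "." or "..". TopicName is a made-up module.

```elixir
defmodule TopicName do
  # Legal characters are ASCII letters, digits, ".", "_" and "-";
  # the name must be 1 to 249 characters long and must not be "." or "..".
  def valid?(name) when is_binary(name) do
    name not in [".", ".."] and Regex.match?(~r/\A[a-zA-Z0-9._-]{1,249}\z/, name)
  end
end

IO.inspect(TopicName.valid?("another_topic"))
IO.inspect(TopicName.valid?("another topic"))
```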

The end

Thanks for reading! Want to work on a mission-driven team that loves illustrated codealongs and working in Elixir? We’re hiring!

Written by Meryl Dakin

Dev at Frame.io. @meryldakin on github, LinkedIn, and twitter.

Flatiron Labs

We're the technology team at The Flatiron School (a WeWork company). Together, we're building a global campus for lifelong learners focused on positive impact.
