Connecting Elixir to Kafka with Kaffe: A codealong

Meryl Dakin
Apr 10
Cross-post from Elixir School!
Exactly what I needed, consumers drinking coffee

What is Kafka, briefly?

Kafka is a messaging system. It does essentially three things:

  1. Receives messages from applications
  2. Keeps those messages in the order they were received
  3. Allows other applications to read those messages in order
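To make those three jobs concrete, here is a toy, in-memory sketch in plain Elixir. It is not how Kafka works internally, and the ToyLog module and its function names are made up for this illustration. It only shows the idea of appending messages in order and reading them back from a given position.

```elixir
defmodule ToyLog do
  # A toy stand-in for a single Kafka log (illustration only).
  # Messages are stored oldest-first; a message's position is its offset.

  # Start with an empty log.
  def new, do: []

  # "Receive" a message: append it and return {offset, updated_log}.
  def append(log, message) do
    offset = length(log)
    {offset, log ++ [message]}
  end

  # "Read" every message at or after the given offset, in arrival order.
  def read_from(log, offset) do
    log
    |> Enum.with_index()
    |> Enum.drop(offset)
    |> Enum.map(fn {message, index} -> {index, message} end)
  end
end

log = ToyLog.new()
{0, log} = ToyLog.append(log, "first")
{1, log} = ToyLog.append(log, "second")
{2, log} = ToyLog.append(log, "third")

# A reader that has already seen offset 0 resumes from offset 1.
IO.inspect(ToyLog.read_from(log, 1))
# prints [{1, "second"}, {2, "third"}]
```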

Basic Kafka terminology in simple terms

- consumer: what is receiving messages from Kafka
- producer: what is sending messages to Kafka
- topic: a way to organize messages and allow consumers to only subscribe to the ones they want to receive
- partition: allows a topic to be split among multiple machines and retain the same data so that more than one consumer can read from a single topic at a time
- leader/replica: these are types of partitions. There is one leader and multiple replicas. The leader makes sure the replicas have the same and newest data. If the leader fails, a replica will take over as leader.
- offset: the unique identifier of a message that keeps its order within Kafka
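If topics, partitions and offsets still feel abstract, this small sketch may help. It models a topic as a map of partitions and numbers messages within each partition. Everything here is hypothetical: ToyTopic is invented for this post, and it borrows :erlang.phash2/2 to pick a partition, which is not the scheme real Kafka clients use.

```elixir
defmodule ToyTopic do
  # Toy model of a topic: a map of partition number => list of messages.
  # Illustration only; real Kafka keeps partitions on brokers.

  def new(partition_count) do
    Map.new(0..(partition_count - 1), fn partition -> {partition, []} end)
  end

  # Pick a partition from the key, so equal keys always share a partition.
  def partition_for(topic, key), do: :erlang.phash2(key, map_size(topic))

  # Append a message; its offset is its position within that one partition.
  def produce(topic, key, value) do
    partition = partition_for(topic, key)
    offset = length(topic[partition])
    updated = Map.update!(topic, partition, &(&1 ++ [{key, value}]))
    {partition, offset, updated}
  end
end

topic = ToyTopic.new(3)
{p1, o1, topic} = ToyTopic.produce(topic, "Metamorphosis", "Franz Kafka")
{p2, o2, _topic} = ToyTopic.produce(topic, "Metamorphosis", "second message")

# Same key, same partition; offsets count up within that partition.
IO.inspect({p1 == p2, o1, o2})
# prints {true, 0, 1}
```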

Codealong: basic Elixir app & Kafka running locally

SET UP KAFKA SERVER

Follow the first two steps of the quickstart instructions from Apache Kafka: download Kafka, then start the servers. For convenience, here are the commands for the second step. Run them from the unpacked Kafka directory, each in its own terminal:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
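Before moving on, it can be handy to confirm that something is actually listening. The sketch below is a plain TCP check from Elixir, nothing Kafka-specific. BrokerCheck is a made-up helper, and the ports assume the defaults (9092 for Kafka, as used later in this post, and 2181 for ZooKeeper).

```elixir
defmodule BrokerCheck do
  # Returns true if a TCP connection to host:port opens within one second.
  # This only shows that a process is listening; it does not speak the Kafka protocol.
  def reachable?(host, port) do
    case :gen_tcp.connect(String.to_charlist(host), port, [:binary, active: false], 1_000) do
      {:ok, socket} ->
        :gen_tcp.close(socket)
        true

      {:error, _reason} ->
        false
    end
  end
end

# Assumed default ports; adjust if your properties files say otherwise.
IO.puts("Kafka on localhost:9092 reachable? #{BrokerCheck.reachable?("localhost", 9092)}")
IO.puts("ZooKeeper on localhost:2181 reachable? #{BrokerCheck.reachable?("localhost", 2181)}")
```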

SET UP ELIXIR APP

1. Start new Elixir project

mix new elixir_kaffe_codealong

2. Configure Kaffe

2.a: In mix.exs add :kaffe to the list of extra applications:

def application do
  [
    extra_applications: [:logger, :kaffe]
  ]
end

2.b: Add Kaffe to the list of dependencies, then run mix deps.get to fetch it:

defp deps do
  [
    {:kaffe, "~> 1.9"}
  ]
end

3. Configure producer

In config/config.exs add:

config :kaffe,
  producer: [
    endpoints: [localhost: 9092],
    topics: ["our_topic", "another_topic"]
  ]
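The endpoint does not have to be hard-coded. As a sketch, assuming Elixir 1.9 or later (for import Config and System.get_env/2), the host and port could come from environment variables. KAFKA_HOST and KAFKA_PORT are names invented for this example, not something Kaffe looks for on its own, and the values are read when the config file is evaluated.

```elixir
# config/config.exs (sketch)
import Config

# KAFKA_HOST and KAFKA_PORT are hypothetical variable names chosen for this example.
kafka_host = System.get_env("KAFKA_HOST", "localhost")
kafka_port = String.to_integer(System.get_env("KAFKA_PORT", "9092"))

config :kaffe,
  producer: [
    # Builds the same shape as [localhost: 9092]: a list of {host_atom, port} tuples.
    endpoints: [{String.to_atom(kafka_host), kafka_port}],
    topics: ["our_topic", "another_topic"]
  ]
```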

4. Configure consumer

4.a: Add a new file, /lib/application.ex, with the following code:

defmodule ElixirKaffeCodealong.Application do
  use Application

  def start(_type, _args) do
    # Supervisor.Spec and worker/2 are deprecated in newer Elixir versions but still work
    import Supervisor.Spec

    children = [
      worker(Kaffe.Consumer, [])
    ]

    opts = [strategy: :one_for_one, name: ExampleConsumer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

4.b: In mix.exs, register the application module so it starts with the app:

def application do
  [
    extra_applications: [:logger, :kaffe],
    mod: {ElixirKaffeCodealong.Application, []}
  ]
end

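4.c: Add a consumer module to handle incoming messages, for example in /lib/example_consumer.ex:

defmodule ExampleConsumer do
  # Kaffe calls this function once for each message it receives
  def handle_message(%{key: key, value: value} = message) do
    IO.inspect(message)
    IO.puts("#{key}: #{value}")
    :ok
  end
end
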
4.d: In config/config.exs, add the consumer configuration:

config :kaffe,
  consumer: [
    endpoints: [localhost: 9092],
    topics: ["our_topic", "another_topic"],
    consumer_group: "example-consumer-group",
    message_handler: ExampleConsumer
  ]
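Since the consumer subscribes to two topics, you may want to treat their messages differently. One possible approach, sketched below, is to pattern match on the topic field of the message map (the same field that shows up in the console output later in this post). RoutingConsumer is a hypothetical module; to try it you would set message_handler: RoutingConsumer in the config.

```elixir
defmodule RoutingConsumer do
  # Hypothetical handler: one function clause per topic, matched on the :topic field.
  def handle_message(%{topic: "our_topic", key: key, value: value}) do
    IO.puts("our_topic -> #{key}: #{value}")
    :ok
  end

  def handle_message(%{topic: "another_topic", value: value}) do
    IO.puts("another_topic -> #{String.upcase(value)}")
    :ok
  end

  # Catch-all so a message from an unexpected topic still returns :ok.
  def handle_message(%{topic: topic}) do
    IO.puts("ignoring message from #{topic}")
    :ok
  end
end

# The handler is plain Elixir, so it can be tried without a running broker:
RoutingConsumer.handle_message(%{topic: "another_topic", key: "Metamorphosis", value: "Franz Kafka"})
# prints "another_topic -> FRANZ KAFKA"
```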

5. Add a producer module (optional, can also call Kaffe from the console)

Add /lib/example_producer.ex with the following code:

defmodule ExampleProducer do
  # Send one {key, value} pair to an explicitly named topic.
  def send_my_message({key, value}, topic) do
    Kaffe.Producer.produce_sync(topic, [{key, value}])
  end

  # Send a key and value; no topic is passed here.
  def send_my_message(key, value) do
    Kaffe.Producer.produce_sync(key, value)
  end

  # Send a value under a fixed sample key.
  def send_my_message(value) do
    Kaffe.Producer.produce_sync("sample_key", value)
  end
end
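The first clause above already hands Kaffe a list of {key, value} tuples, so sending several messages at once is mostly a matter of building a longer list. Here is a small sketch of that preparation step. BatchBuilder is a made-up helper, and the actual Kaffe call is left as a comment because it needs a running broker.

```elixir
defmodule BatchBuilder do
  # Turns a map of key => value into the [{key, value}] list shape that
  # ExampleProducer passes to Kaffe.Producer.produce_sync/2.
  # Entries are sorted so the resulting order is predictable.
  def to_messages(entries) when is_map(entries) do
    entries
    |> Enum.sort()
    |> Enum.map(fn {key, value} -> {to_string(key), to_string(value)} end)
  end
end

messages =
  BatchBuilder.to_messages(%{
    "Metamorphosis" => "Franz Kafka",
    "The Trial" => "Franz Kafka"
  })

IO.inspect(messages)
# prints [{"Metamorphosis", "Franz Kafka"}, {"The Trial", "Franz Kafka"}]

# With Kaffe configured and Kafka running, the whole list can go out in one call:
# Kaffe.Producer.produce_sync("another_topic", messages)
```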

6. Send and receive messages in the console!

Now we have everything configured and can use the modules we’ve created to send and read messages through Kafka!

  1. We call our producer to send a message to the Kafka server.
  2. The Kafka server receives the message.
  3. Our consumer, which we configured to subscribe to the topic called “another_topic”, will receive the message we’ve sent and print it to the console.

Start an interactive Elixir shell with iex -S mix and call the producer. The debug lines and :ok come from the producer call; the map and the final line are printed by ExampleConsumer.handle_message:

iex> ExampleProducer.send_my_message({"Metamorphosis", "Franz Kafka"}, "another_topic")
[debug] event#produce_list topic=another_topic
[debug] event#produce_list_to_topic topic=another_topic partition=0
:ok
%{
  attributes: 0,
  crc: 2125760860, # will vary
  key: "Metamorphosis",
  magic_byte: 1,
  offset: 1, # will vary
  partition: 0,
  topic: "another_topic",
  ts: 1546634470702, # will vary
  ts_type: :create,
  value: "Franz Kafka"
}
Metamorphosis: Franz Kafka

Variations: Docker & Umbrella Apps

- If you’re running Kafka from a Docker container (most common in real applications), use the container’s hostname in the config file rather than localhost.
- In an umbrella app, you’ll configure Kaffe in the child application running it. If you have apps separated by environment, you can start the consumer by structuring it as a child like this:

 # inside start(_type, args) in application.ex
 children = case args do
   [env: :prod] -> [worker(Kaffe.Consumer, [])]
   [env: :test] -> []
   [env: :dev] -> [worker(Kaffe.Consumer, [])] # note that it is not a tuple
   _ -> [] # any other args, including the empty list
 end
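For that case to match, the environment has to reach start/2 somehow. One way, and this is an assumption rather than something Kaffe requires, is to pass it from mix.exs with mod: {ElixirKaffeCodealong.Application, [env: Mix.env()]}. The sketch below pulls the decision into a small function so it can be tried on its own. ConsumerChildren is a hypothetical module, and it uses a child spec map that plays the same role as worker(Kaffe.Consumer, []) without needing Supervisor.Spec.

```elixir
defmodule ConsumerChildren do
  # Child spec map that starts Kaffe.Consumer via start_link/0.
  @consumer %{id: Kaffe.Consumer, start: {Kaffe.Consumer, :start_link, []}}

  # args is whatever mix.exs passes in `mod: {SomeApp.Application, args}`.
  def for_args(args) do
    case Keyword.get(args, :env) do
      env when env in [:prod, :dev] -> [@consumer]
      _other -> []
    end
  end
end

IO.inspect(ConsumerChildren.for_args(env: :test))
# prints []
IO.inspect(ConsumerChildren.for_args([]))
# prints []
IO.inspect(length(ConsumerChildren.for_args(env: :dev)))
# prints 1
```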

Troubleshooting Errors

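No leader error

** (MatchError) no match of right hand side value: {:error, :LeaderNotAvailable}

This usually shows up right after the broker starts or a topic is first created, before a partition leader has been elected. Waiting a moment and trying again is normally enough.

Invalid Topic error

** (MatchError) no match of right hand side value: {:error, :InvalidTopicException}

Check the topic name. Kafka only accepts letters, digits, '.', '_' and '-' in topic names, so a name containing a space will trigger this.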
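Both errors can be guarded against with a little plain Elixir. The sketch below is only one possible approach: KafkaTroubleshooting is a made-up module, the topic check is a rough approximation of Kafka's naming rules (legal characters and the 249 character limit), and the retry helper assumes the MatchError is raised in the process that calls the producer.

```elixir
defmodule KafkaTroubleshooting do
  # Rough check: Kafka topic names may only contain ASCII letters, digits,
  # '.', '_' and '-', and may be at most 249 characters long.
  def valid_topic?(name) when is_binary(name) do
    byte_size(name) <= 249 and Regex.match?(~r/\A[a-zA-Z0-9._-]+\z/, name)
  end

  # Calls fun; if it raises a MatchError (as in the messages above), waits
  # delay_ms and tries again, up to `attempts` calls in total.
  def retry(fun, attempts, delay_ms) when is_function(fun, 0) and attempts > 0 do
    fun.()
  rescue
    error in MatchError ->
      if attempts > 1 do
        Process.sleep(delay_ms)
        retry(fun, attempts - 1, delay_ms)
      else
        reraise error, __STACKTRACE__
      end
  end
end

IO.inspect(KafkaTroubleshooting.valid_topic?("another_topic"))
# prints true
IO.inspect(KafkaTroubleshooting.valid_topic?("another topic"))
# prints false

# With Kaffe configured and Kafka running, a send could be wrapped like this:
# KafkaTroubleshooting.retry(
#   fn -> ExampleProducer.send_my_message({"Metamorphosis", "Franz Kafka"}, "another_topic") end,
#   5,
#   1_000
# )
```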

The end

This should have given you the basic setup for you to start exploring more of this on your own, but there’s lots more you can do with Kaffe so check out sending multiple messages, consumer groups, etc. If you come up with any more troubleshooting errors you’ve solved, I’d love to continue adding them to what we’ve started here.


Flatiron Labs

We're the technology team at The Flatiron School (a WeWork company). Together, we're building a global campus for lifelong learners focused on positive impact.

Written by Meryl Dakin, Software Developer at the Flatiron School
https://github.com/meryldakin
http://www.linkedin.com/in/meryldakin/
