Introducing Deimos: Using Kafka as the Data Backbone for your Architecture

Daniel Orner
Oct 28, 2019 · 7 min read

In the previous article, I detailed the microservice pattern of using Kafka as a data backbone for your architecture — Kafka acts as the source of truth and each microservice consumes data from it. This allows each service to be independent and to own only the data it cares about. We also said that there should be some way to marry traditional relational databases like MySQL and Postgres with the Kafka data backbone with a minimum of fuss.

We left off with a couple of problems:

  1. Ensuring that downstream systems are not tied to your internal data schemas;
  2. Fixing the transaction problem, where due to errors data can be written to Kafka or your database but not both.

Ruby on Rails — the Database Experts

When we started widely using Kafka in our engineering org, we quickly realized we needed a way to:

  • Save data from Kafka to our database
  • Write data to Kafka — either updating state or sending events
  • Encode/decode data using Avro and the Confluent Schema Registry (a free centralized hub for all our Avro schemas)
  • Standardize logging and tracing for all our consumers

Our first attempt to send data across systems was Kafka Connect. We ran into a number of problems with it, some of which were detailed in the previous article, and decided it was more prudent to strike out on our own to solve this problem.

We started off with a shared library we called FlippRubyKafka in late 2017. Over the last two years, this library has grown, adding features and fixing bugs, and now powers over a dozen of our core microservices. We are happy to announce that this project, now called Deimos, is open source!

Deimos logo

Deimos Features

Deimos is built on a previous, smaller library called Phobos (hence the name), which in turn is built on ruby-kafka, a pure Ruby implementation of the Kafka client.

Note that Deimos assumes that you have a copy of the Confluent Schema Registry running. If you’ve already got Kafka and Zookeeper up then all you need to do is deploy the Docker container to the cloud provider of your choice and get a reference to its URL.
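For local development, the registry can also be run next to your existing Kafka and Zookeeper containers. A minimal sketch using Confluent's published image (the container name, port, and the assumption that Zookeeper is reachable as zookeeper:2181 on the same Docker network are ours, not from the article):

```shell
# Run the Confluent Schema Registry against an existing Zookeeper.
docker run -d \
  --name schema-registry \
  -p 8081:8081 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
  confluentinc/cp-schema-registry
```

The registry URL you then hand to Deimos would be http://localhost:8081 (or the equivalent for your cloud provider).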

Configuring Producers and Consumers

Because Avro is a central tenet of Deimos, all producers and consumers must specify which Avro schema and namespace they will use while consuming or producing. In addition, they need to specify how they handle keys for these messages:

  • The default, and recommended, way to handle keys is to Avro-encode them. This allows for any downstream systems that use static types (e.g. those written in Java or Scala, which have a large ecosystem related to Kafka) to easily decode them.
  • You can also indicate that your topic has no keys (e.g. event topics should not have keys as you want to keep all events, not just the last updated state).
  • Finally, you can leave your keys as plaintext, although this is not recommended.

When using Avro-encoded keys, you can provide a separate key schema, or you can have Deimos auto-magically create one for you from your value schema, by specifying a field (usually “id”) to extract from it.
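The three key-handling modes above all map onto the key_config setting. A sketch of the options side by side (class and schema names are placeholders; a real producer picks exactly one):

```ruby
class MyProducer < Deimos::Producer
  # Pick exactly one of the following:
  key_config field: 'id'            # auto-generate a key schema from the 'id' value field (recommended)
  key_config schema: 'MyKeySchema'  # use a separate, hand-written Avro key schema
  key_config none: true             # topic has no keys (e.g. event topics)
  key_config plain: true            # leave keys as plaintext (not recommended)
end
```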

Sample producer:

class MyProducer < Deimos::Producer

  namespace 'com.deimos.my-app-special'
  topic 'MyApp.MyTopic'
  schema 'MySchema' # will search in the configured path
  key_config field: 'key_field'

  def self.send_some_message(an_object)
    payload = {
      'some-key' => an_object.foo,
      'some-key2' => an_object.bar,
      'key_field' => an_object.my_field
    }
    self.publish(payload)
  end
end

Sample consumer:

class MyConsumer < Deimos::Consumer

  schema 'MySchema'
  namespace 'com.my-namespace'
  key_config field: :my_id

  def consume(payload, metadata)
    MyHandlerClass.do_something_with(payload[:my_field], metadata[:key])
  end
end

All schemas are automatically registered in the schema registry using the topic name (so if you produce to MyTopic, your schemas are automatically registered as MyTopic-key and MyTopic-value).

Producing to Kafka

  • You will get an automatically generated timestamp (current time) and message_id (uuid) in your payload, if they are present in the schema. This helps immensely when debugging messages across systems.
  • Types will be coerced before saving, so if your schema expects a string but you give it an int, you don’t need to worry about crashes.
  • Metrics are sent, allowing you to track the messages you send per topic and partition, and tracing is enabled so you can determine whether encoding your message is slow.
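As a sketch of the first two points (the schema fields and values here are hypothetical, not from the article):

```ruby
# Assuming MySchema defines 'timestamp', 'message_id', and a string field 'count':
MyProducer.publish('count' => 123)
# - the integer 123 is coerced to the string '123' to match the schema
# - the encoded payload also gains a current-time 'timestamp' and a UUID
#   'message_id', because both fields are present in the schema
```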

Consuming from Kafka

Consumers will “swallow” errors by default (relying on tracing to handle alerting in this case) and move on, to prevent individual bad messages from blocking your consumer forever. You can change this behavior or define specific errors or situations where you would not want the consumer to continue.
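One way to opt back into failing hard, sketched against the consumer API (the fatal_error? override point matches the library's consumer hooks as we understand them; the error class is a hypothetical application error):

```ruby
class MyConsumer < Deimos::Consumer
  # Return true for errors that should stop the consumer instead of
  # being swallowed and traced. DatabaseOfflineError is a placeholder
  # for an application-specific error class.
  def fatal_error?(exception, payload, metadata)
    exception.is_a?(DatabaseOfflineError)
  end
end
```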

Working with Databases

  • ActiveRecordConsumers essentially act as a “sink” — dumping a topic into a database table based on some business logic.
  • ActiveRecordProducers act as a “source” — taking in an ActiveRecord object and automatically turning it into a payload to be encoded and sent to Kafka.

Sample ActiveRecordConsumer:

class MyConsumer < Deimos::ActiveRecordConsumer

  schema 'MySchema'
  key_config field: 'my_field'
  record_class Widget

  # Optional override to change the attributes of the record before
  # they are saved.
  def record_attributes(payload)
    super.merge(:some_field => 'some_value')
  end
end

Sample ActiveRecordProducer:

class MyProducer < Deimos::ActiveRecordProducer

  topic 'MyApp.MyTopic'
  schema 'MySchema'
  key_config field: 'my_field'
  record_class Widget

  # Optional override to change the default payload calculated from the record.
  def generate_payload(attributes, record)
    super.merge(:assoc_key => record.some_association.assoc_key)
  end
end

MyProducer.send_events([Widget.new(foo: 1), Widget.new(foo: 2)])

In addition to the producer, Deimos also provides a mixin you can add to your ActiveRecord objects which will automatically send messages whenever your records are created, modified or destroyed (as long as you don’t use mass operations like update_all or import).

class Widget < ActiveRecord::Base
  include Deimos::KafkaSource

  def self.kafka_producers
    [MyActiveRecordProducer]
  end
end

Database Backend

The previous article covers this in more detail, but in a nutshell, we want a guarantee that every record written to the database has a corresponding Kafka message, and every Kafka message is written to the database.

Deimos achieves this with the database backend.

This feature transparently replaces the inline sending of Kafka messages to instead save them to a table in the database. There is then a separate process that reads the messages off this table and sends them off in batches.

Top: Naive Kafka. Bottom: DB Backend pattern.

Enabling this feature is incredibly simple:

  1. Generate the migration to add the table.
  2. Set the publish_backend configuration setting to :db.
  3. Run the DB Producer in some way (forked process, thread, rake task) via Deimos.run_db_backend!.
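Steps 2 and 3 might look like this (the initializer path and rake task wrapper are assumptions for illustration; the configuration block follows the library's usual style):

```ruby
# config/initializers/deimos.rb
Deimos.configure do |config|
  config.publish_backend = :db
end

# lib/tasks/deimos.rake -- run the DB producer in its own process
task deimos_db_producer: :environment do
  Deimos.run_db_backend!
end
```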

This is also known as the transactional outbox pattern and achieves full transactional integrity. All messages (whether they represent changes to objects or events) are part of the same transaction and will only be saved if the transaction commits. If it rolls back, all messages will also roll back and not be sent.

Calling code doesn’t even need to know that the DB backend is turned on. You call the publish method on your producer the same way — instead of sending the message to Kafka directly, it will encode it and save it as a message in the database.

This feature is currently powering our largest producer of data; our production metrics show it sending 4,000–6,000 messages per second with a single thread.

Test Helpers

Just include the TestHelpers module in your specs and automatically stub out all loaded producers and consumers:

RSpec.configure do |config|
  config.include Deimos::TestHelpers
  config.before(:each) { stub_producers_and_consumers! }
end

Consumers can be tested by passing a hash and the consumer class into a test method:

expect(Widget.count).to eq(0)
test_consume_message(WidgetConsumer, {id: 5, name: "Darth"}, call_original: true)
expect(Widget.count).to eq(1)

Producers can be tested by just sending messages normally and using the have_sent matcher:

expect('widget-topic').not_to have_sent(anything)
Widget.update(my_attr: 1)
expect('widget-topic').to have_sent(hash_including(:my_attr => 1))

What’s Next
