Outbox Pattern Implementation Using Debezium and Google Protocol Buffers

With the widespread shift towards microservices, there is an increasing need to replicate data between services.
To make this replication near real time, without putting extra load on your database, you can use Change Data Capture (CDC) tools like Debezium or Maxwell.
With the default configuration, these tools push change events into Kafka wrapped in their own envelope schema. This forces downstream services to consume that envelope instead of the raw data, and it also costs us the performance gains we expect from compact formats like Google Protocol Buffers or Avro.
In this POC, I implemented a simple outbox pattern example and configured Debezium to forward the raw protobuf payload. This way, downstream services can consume these events just like any normal protobuf message.
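The core idea of the outbox pattern is that the business entity and its outgoing event are written in the same database transaction, so a CDC reader tailing the log can never see an event without its data (or vice versa). A minimal, framework-free sketch of that idea — using sqlite3 as a stand-in for MySQL, with illustrative table and column names:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE question (id INTEGER PRIMARY KEY, question_text TEXT);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        aggregatetype TEXT, aggregateid TEXT, type TEXT, payload BLOB
    );
""")

def create_question(conn, qid, text):
    # Entity row and outbox row commit atomically: a CDC reader tailing
    # the log sees the outbox insert if and only if the entity exists.
    with conn:
        conn.execute("INSERT INTO question VALUES (?, ?)", (qid, text))
        conn.execute(
            "INSERT INTO outbox (aggregatetype, aggregateid, type, payload) "
            "VALUES (?, ?, ?, ?)",
            ("question", str(qid), "question_created", text.encode()),
        )

create_question(conn, 1, "What's new?")
print(conn.execute("SELECT aggregateid, type FROM outbox").fetchall())
# -> [('1', 'question_created')]
```

In the real POC the payload column holds protobuf bytes rather than UTF-8 text, but the transactional shape is the same.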

  • A dockerized Django web application that stores data in a MySQL database. This application writes the events to an outbox table in a protobuf binary serialized format.
  • An Apache Kafka docker setup with a Kafka Connect container that has the Debezium MySQL connector installed. The Debezium connector listens to the MySQL binlog for a specific outbox table and pushes the changes to a Kafka topic.
  • A dockerized .NET Core React application with a PostgreSQL database that consumes this data.

Setup:

  • Just docker :)

```shell
docker-compose up -d
docker-compose run --rm --no-deps python_app python manage.py migrate
docker-compose run --rm --no-deps python_app python manage.py createsuperuser
```

The MySQL container is reachable with:

Server: mysql
Username: root
Password: pass
Database: djangodb

Debezium needs a database user with replication privileges so it can read the binlog:

```sql
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO django@'%';
```
Register the Debezium connector through the Kafka Connect REST API:

```shell
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  localhost:8083/connectors/ --data-raw '{
  "name": "cdc-python-netcore-connector-outbox",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "django",
    "database.password": "django",
    "database.server.name": "cdc-mysql",
    "database.history.kafka.topic": "cdc-test",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "table.include.list": "djangodb.polls_outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "value.converter": "io.debezium.converters.ByteBufferConverter",
    "value.converter.schemas.enable": "false",
    "value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter"
  }
}'
```
Generate the language bindings from the proto definition:

```shell
brew install protobuf
protoc question.proto --python_out=./output/python/ --csharp_out=./output/csharp/ --descriptor_set_out=question.desc
```
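The contents of question.proto are not shown here; based on the fields used in the code below (`id`, `question_text`, and a `Timestamp` for `pub_date`), it presumably looks roughly like this (a sketch, not the actual file):

```protobuf
syntax = "proto3";

import "google/protobuf/timestamp.proto";

message Question {
  int32 id = 1;
  string question_text = 2;
  google.protobuf.Timestamp pub_date = 3;
}
```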
The outbox table is a plain Django model; the payload column stores the serialized protobuf bytes:

```python
import uuid

from django.db import models


class Outbox(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    aggregatetype = models.CharField(max_length=255)
    aggregateid = models.CharField(max_length=255)
    event_type = models.CharField(max_length=255, db_column='type')
    payload = models.BinaryField()
```
In the Django admin, saving a Question and writing its outbox record happen in a single transaction:

```python
@transaction.atomic
def save_model(self, request, obj, form, change):
    super().save_model(request, obj, form, change)
    self.create_outbox_record(obj)

def create_outbox_record(self, obj):
    ts = Timestamp()
    ts.FromDatetime(obj.pub_date)
    proto = QuestionProto(
        id=obj.id,
        question_text=obj.question_text,
        pub_date=ts,
    )
    outbox = Outbox(
        aggregatetype='question',
        aggregateid=obj.id,
        event_type='question_created',
        payload=proto.SerializeToString(),
    )
    outbox.save()
    # outbox.delete()  # the row can be deleted right away; the binlog still captures the insert
```
The .NET side consumes the topic with a `BackgroundService`:

```csharp
public class QuestionConsumerService : BackgroundService
{
    private readonly string topic;
    private readonly IConsumer<string, QuestionProto> kafkaConsumer;

    public QuestionConsumerService()
    {
        topic = "outbox.event.question";
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "host.docker.internal:9092",
            GroupId = "hus-dotnet-consumer",
            AutoOffsetReset = AutoOffsetReset.Earliest,
        };
        kafkaConsumer = new ConsumerBuilder<string, QuestionProto>(consumerConfig)
            .SetValueDeserializer(new MyDeserializer())
            .SetErrorHandler((_, e) => Console.WriteLine($"Consumer Error at SetErrorHandler: {e.Reason}"))
            .Build();
    }
    ...
}
```
If you instead plug in Confluent's `ProtobufDeserializer`:

```csharp
kafkaConsumer = new ConsumerBuilder<string, QuestionProto>(consumerConfig)
    .SetValueDeserializer(new ProtobufDeserializer<QuestionProto>().AsSyncOverAsync())
    .SetErrorHandler((_, e) => Console.WriteLine($"Consumer Error at SetErrorHandler: {e.Reason}"))
    .Build();
```

it fails, because Debezium forwards the raw bytes produced by `proto.SerializeToString()` without the Schema Registry framing the Confluent deserializers expect:

```
System.IO.InvalidDataException: "Expecting message Value with Confluent Schema Registry framing. Magic byte was 8, expecting 0"
```
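The framing the Confluent deserializer looks for starts with a 5-byte header: magic byte `0x00` followed by a 4-byte big-endian schema id (the protobuf serde adds message-index varints after that as well). Raw protobuf output has no such header, so the first payload byte — here the protobuf field tag `0x08` — is misread as the magic byte, which is exactly the "Magic byte was 8" in the error above. A small illustration (`schema_id` is an arbitrary example value):

```python
import struct

# Raw protobuf bytes for a message whose first field is `int32 id = 1`:
# tag 0x08 = (field_number 1 << 3) | wire_type 0 (varint), then the value.
raw = bytes([0x08, 0x05])          # id = 5, no framing

# Confluent framing prepends magic byte 0 and a 4-byte big-endian schema id.
schema_id = 42
framed = struct.pack(">bI", 0, schema_id) + raw

print(raw[0])     # 8 -> "Magic byte was 8, expecting 0"
print(framed[0])  # 0 -> what the Confluent deserializer expects
```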
The fix is a tiny custom deserializer that hands the raw bytes straight to the generated protobuf parser:

```csharp
public class MyDeserializer : IDeserializer<QuestionProto>
{
    public QuestionProto Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull)
            return null;

        // QuestionProto is the class generated by protoc from question.proto
        return QuestionProto.Parser.ParseFrom(data);
    }
}
```
On the read side, a plain POCO is used for Elasticsearch:

```csharp
public class Question
{
    public int Id { get; set; }
    public string QuestionText { get; set; }
    public DateTime? PubDate { get; set; }
}
```
Each consumed protobuf message is mapped to the POCO and indexed:

```csharp
void SaveMessage(QuestionProto val)
{
    var question = new Question
    {
        Id = val.Id,
        QuestionText = val.QuestionText,
        PubDate = val.PubDate?.ToDateTime()
    };
    var settings = new ConnectionSettings().DefaultIndex("question");
    var client = new ElasticClient(settings);
    client.IndexDocument(question);
}
```
The API exposes the latest questions from Elasticsearch:

```csharp
[HttpGet]
public IEnumerable<Question> Get()
{
    var settings = new ConnectionSettings().DefaultIndex("question");
    var client = new ElasticClient(settings);
    var searchResponse = client.Search<Question>(s => s
        .Query(q => q.MatchAll())
        .Sort(q => q.Descending(question => question.Id))
        .Size(20)
    );
    return searchResponse.Documents;
}
```
And the React frontend renders them:

```jsx
{questions.map(question =>
  <tr key={question.id}>
    <td>{question.id}</td>
    <td>{question.questionText}</td>
    <td>{question.pubDate}</td>
  </tr>
)}
```
