Kafka Stream Processing — Composing Views By Example

prabath weerasinghe
Apr 22, 2020


The sample code, along with instructions to set it up and run it, is available on GitHub.

An event-driven system designed with CQRS requires composing different views of the system. This involves processing multiple event types coming from different services across multiple bounded contexts. Usually the views are denormalized and custom-tailored to serve specific queries.
For this discussion I'll use a hospital management system. This hypothetical system comprises three bounded contexts.

Figure-1 — Bounded contexts initial breakdown.
  • Patient
  • Staff
  • Appointment

For our example, we'll focus on two services: the Doctor-Management service from the Staff context and the Appointment-Management service from the Appointment context. Each service publishes two types of events.

  1. Notification events —
    These carry notifications about domain aggregate and entity changes, as well as auxiliary services. E.g. AppointmentCreated, DoctorUpdated, PatientAdmitted, PatientReminderMailSent, etc.
  2. Domain entity snapshot events — These events carry snapshots of domain entities. Each entity has its own topic, and the event stream is a change-log stream.

Figure-2 — Composer flow for the "doctor with latest appointments" view.

As shown in Figure-2, our example composes a view of a Doctor entity alongside his/her latest (nearest) ten appointments.
We'll be listening to the snapshot topics of the Doctor and Appointment aggregate services, and the composed, denormalized view can be put into a NoSQL DB of your preference.

Since we are dealing with events from two different topics with different numbers of partitions and different partitioning keys, we need to make sure that Appointment and Doctor events for the same doctor always get processed together.

DSL Version
— — — — —

Event Flow

1. Consuming doctor snapshot topic
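A minimal sketch of this step, assuming a snapshot topic named "doctor" keyed by doctor-id and an existing Serde<Doctor> (the topic name, the Doctor POJO and the serde are illustrative, not necessarily what the repository uses):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// Change-log stream of Doctor snapshots, keyed by doctor-id.
// Reading it as a KTable keeps only the latest snapshot per doctor.
KTable<String, Doctor> doctors = builder.table(
        "doctor",
        Consumed.with(Serdes.String(), doctorSerde));
```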

2. Consuming appointment snapshot topic

Appointments for a Doctor may get deleted, and the Appointment service denotes a deletion with a null-valued event.
Appointments are going to be re-partitioned by the doctor-id. Appointment creation and update events carry the doctor-id in the event payload itself, but how can we attach the doctor-id to a null-valued Appointment event?
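Continuing with the same builder, the appointment snapshot topic is consumed as a KStream keyed by appointment-id (again, the topic name and serde are assumptions):

```java
// Change-log stream of Appointment snapshots, keyed by appointment-id.
// Deleted appointments arrive as <appointment-id, null> tombstone events.
KStream<String, Appointment> appointments = builder.stream(
        "appointment",
        Consumed.with(Serdes.String(), appointmentSerde));
```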

3. Changing event key from <appointment-id> to <appointment-id, doctor-id>

As stated before, we need to keep track of the doctor-id for every Appointment event. This applies to null-valued appointments as well.
To that end, a Transformer maps the appointment-id to a composite key that encapsulates both the appointment-id and the doctor-id.

Once the new composite key is ready, a re-partitioning by the doctor-id is triggered. This makes sure that Appointment events for the same doctor get processed by the same Stream Task.

The custom partitioning logic is the same one the default Kafka partitioner uses, applied only to the doctor-id portion of the key.
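Here is a sketch of such a partitioner. It applies the same murmur2 hash the default Kafka partitioner uses, but only to the doctor-id part of the composite key (AppointmentKey is an assumed composite-key class with getDoctorId()/getAppointmentId() accessors):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class DoctorIdPartitioner implements StreamPartitioner<AppointmentKey, Appointment> {

    @Override
    public Integer partition(String topic, AppointmentKey key, Appointment value, int numPartitions) {
        // Hash only the doctor-id so that all appointments of a doctor land on the same partition.
        byte[] doctorId = key.getDoctorId().getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(doctorId)) % numPartitions;
    }
}
```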

Side-note — How does the Transformer map keys?
When the transformer receives the initial Appointment created/updated event, which always arrives before a deletion, it stores the doctor-id from the event value against the appointment-id in a state store. In the example, this state store is named 'temp_appointment_key_store'.

This way, when the transformer receives an Appointment event with a null value (marking a deletion), it can fetch the corresponding doctor-id from the store and compose the new composite key.

Check the code for the complete example with the transformer implementation.
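For reference, a condensed sketch of such a transformer might look like the following (the class in the repository will differ; the AppointmentKey composite key type is assumed as before):

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class AppointmentKeyTransformer
        implements Transformer<String, Appointment, KeyValue<AppointmentKey, Appointment>> {

    private KeyValueStore<String, String> keyStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        keyStore = (KeyValueStore<String, String>) context.getStateStore("temp_appointment_key_store");
    }

    @Override
    public KeyValue<AppointmentKey, Appointment> transform(String appointmentId, Appointment appointment) {
        String doctorId;
        if (appointment != null) {
            // Create/update event: remember which doctor this appointment belongs to.
            doctorId = appointment.getDoctorId();
            keyStore.put(appointmentId, doctorId);
        } else {
            // Tombstone: the doctor-id was stored when the create/update event was seen.
            doctorId = keyStore.get(appointmentId);
            keyStore.delete(appointmentId);
        }
        return KeyValue.pair(new AppointmentKey(appointmentId, doctorId), appointment);
    }

    @Override
    public void close() { }
}
```

Wiring it in means registering the state store with the builder and then re-partitioning through a topic with the custom partitioner. The through() call below is the pre-2.6 API; newer releases would use repartition() with a Repartitioned configured with the same partitioner:

```java
// The transformer's store must be registered with the builder first.
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("temp_appointment_key_store"),
        Serdes.String(), Serdes.String()));

KStream<AppointmentKey, Appointment> rekeyed = appointments
        .transform(() -> new AppointmentKeyTransformer(), "temp_appointment_key_store")
        .through("appointment_by_doctor",    // pre-created repartition topic (illustrative name)
                 Produced.with(appointmentKeySerde, appointmentSerde, new DoctorIdPartitioner()));
```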

4. Aggregating Appointments into a Sorted Set.

Now that we have a re-partitioned stream of Appointments, we can aggregate them into a set sorted by the appointment-date. But we have an issue: grouped KStream aggregations skip over null values, so we need to somehow sneak the null-valued events through.

The better way is to wrap every Appointment event (null-valued or not) in another structure. But just for the example, I've replaced null-valued events with a dummy Appointment whose appointment-date is set to null.
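One possible shape of the aggregation is sketched below, continuing from the rekeyed stream above. Note that re-keying by doctor-id makes the DSL mark the stream for another internal repartition (part of the internal-topic overhead discussed later); appointmentSetSerde (a Serde<TreeSet<Appointment>>) and the Appointment accessor names are assumptions:

```java
KTable<String, TreeSet<Appointment>> appointmentsByDoctor = rekeyed
        .groupBy((appointmentKey, appointment) -> appointmentKey.getDoctorId(),
                 Grouped.with(Serdes.String(), appointmentSerde))
        .aggregate(
                // Sort by appointment-date, breaking ties on appointment-id.
                () -> new TreeSet<>(Comparator.comparing(Appointment::getAppointmentDate)
                        .thenComparing(Appointment::getAppointmentId)),
                (doctorId, appointment, sortedSet) -> {
                    // Drop any previous version of this appointment first.
                    sortedSet.removeIf(a -> a.getAppointmentId().equals(appointment.getAppointmentId()));
                    if (appointment.getAppointmentDate() != null) {   // a null date marks the dummy deletion event
                        sortedSet.add(appointment);
                    }
                    while (sortedSet.size() > 10) {                   // keep only the nearest ten
                        sortedSet.pollLast();
                    }
                    return sortedSet;
                },
                Materialized.with(Serdes.String(), appointmentSetSerde));
```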

5. Performing an outer join between KTable<doctor_id, Doctor> and KTable<doctor_id, SortedSet<Appointment>>

The outer join creates an object containing both the Doctor entity and the sorted set of latest Appointments. The resulting object is then published to the external view topic with the doctor-id as the key.
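Sketching this last step, with DoctorAppointmentsView as an assumed wrapper class and an assumed output topic name:

```java
KTable<String, DoctorAppointmentsView> composedView = doctors.outerJoin(
        appointmentsByDoctor,
        // Either side may be null in an outer join.
        (doctor, appointmentSet) -> new DoctorAppointmentsView(doctor, appointmentSet));

composedView.toStream()
        .to("doctor_with_appointments_view",
            Produced.with(Serdes.String(), doctorViewSerde));
```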

Processor API Version
— — — — — — — — — —

Event Flow

1. Adding a Source processor to consume Appointment events

This is straightforward. Check the documentation for more details.
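A sketch of the topology bootstrap and the appointment source node (node names, topic names and serdes are illustrative):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;

Topology topology = new Topology();

// Source node reading the Appointment snapshot (change-log) topic, keyed by appointment-id.
// Deletions arrive as null-valued records.
topology.addSource("appointment-source",
        Serdes.String().deserializer(),
        appointmentSerde.deserializer(),
        "appointment");
```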

2. Adding a Processor to change Appointment event key from <appointment-id> to <appointment-id, doctor-id>

This processor does what the transformer did in the DSL version: it converts the original Appointment event key from <appointment-id> to the composite key <appointment-id, doctor-id>. This allows re-partitioning of Appointment events by the doctor-id even when the event value is null (a change-log event marking a deletion).
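A condensed sketch of that processor, written against the pre-2.7 Processor API and mirroring the DSL transformer's state-store bookkeeping (class and accessor names are illustrative):

```java
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class AppointmentRekeyProcessor extends AbstractProcessor<String, Appointment> {

    private KeyValueStore<String, String> keyStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        keyStore = (KeyValueStore<String, String>) context.getStateStore("temp_appointment_key_store");
    }

    @Override
    public void process(String appointmentId, Appointment appointment) {
        String doctorId;
        if (appointment != null) {
            // Create/update event: remember which doctor this appointment belongs to.
            doctorId = appointment.getDoctorId();
            keyStore.put(appointmentId, doctorId);
        } else {
            // Tombstone: look up the doctor-id remembered from the create/update event.
            doctorId = keyStore.get(appointmentId);
            keyStore.delete(appointmentId);
        }
        context().forward(new AppointmentKey(appointmentId, doctorId), appointment);
    }
}
```

Registering it, together with its state store:

```java
topology.addProcessor("appointment-rekey", AppointmentRekeyProcessor::new, "appointment-source");

// The processor's store must be registered and connected to it.
topology.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("temp_appointment_key_store"),
        Serdes.String(), Serdes.String()),
        "appointment-rekey");
```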

3. Adding a Sink Processor to re-partition the Appointment stream and a Source Processor to consume the re-partitioned topic

The Sink Processor uses a custom partitioner and re-partitions the Appointment stream by the doctor-id.
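The sink and source for the repartition hop might look like this, reusing the DoctorIdPartitioner sketched in the DSL section (topic and node names are again illustrative):

```java
// Write the re-keyed stream to a repartition topic, partitioned by doctor-id.
topology.addSink("appointment-repartition-sink",
        "appointment_by_doctor",
        appointmentKeySerde.serializer(),
        appointmentSerde.serializer(),
        new DoctorIdPartitioner(),
        "appointment-rekey");

// Read it back; from here on, all appointments of a doctor are handled by one task.
topology.addSource("appointment-by-doctor-source",
        appointmentKeySerde.deserializer(),
        appointmentSerde.deserializer(),
        "appointment_by_doctor");
```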

4. Adding a Source Processor to consume Doctor entity snapshot events
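Along the same lines, a source node for the Doctor snapshot topic (names assumed as before):

```java
topology.addSource("doctor-source",
        Serdes.String().deserializer(),
        doctorSerde.deserializer(),
        "doctor");
```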

5. Adding a Processor to consume both the re-partitioned Appointment topic and the Doctor topic to prepare the final composed view

As you can see, a single processor does both the aggregation of Appointments and the composition of the final view.

It uses only one state store to keep the composed view against the doctor-id. It updates this composed view object for every Doctor and Appointment event.

Through re-partitioning, we've ensured that Appointment and Doctor events for a given doctor-id get processed by the same Processor instance (the Stream Task controls the Processor).
Thus there’s no need to worry about anomalies like lost updates of the composed view.
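A condensed sketch of that composing processor, assuming a DoctorAppointmentsView wrapper that holds the Doctor plus a sorted set of Appointments, with applyDoctor/applyAppointment as assumed helper methods (names and details are illustrative, not the repository's exact code):

```java
import java.util.function.Consumer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class DoctorViewComposer extends AbstractProcessor<Object, Object> {

    private KeyValueStore<String, DoctorAppointmentsView> viewStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        viewStore = (KeyValueStore<String, DoctorAppointmentsView>) context.getStateStore("doctor_view_store");
    }

    @Override
    public void process(Object key, Object value) {
        if (key instanceof AppointmentKey) {
            // Record from the re-partitioned appointment topic; a null value removes the appointment.
            AppointmentKey appointmentKey = (AppointmentKey) key;
            updateView(appointmentKey.getDoctorId(),
                    view -> view.applyAppointment(appointmentKey.getAppointmentId(), (Appointment) value));
        } else {
            // Record from the doctor snapshot topic, keyed by doctor-id.
            updateView((String) key, view -> view.applyDoctor((Doctor) value));
        }
    }

    private void updateView(String doctorId, Consumer<DoctorAppointmentsView> update) {
        DoctorAppointmentsView view = viewStore.get(doctorId);
        if (view == null) {
            view = new DoctorAppointmentsView();
        }
        update.accept(view);                       // add/remove an appointment or replace the doctor
        viewStore.put(doctorId, view);
        context().forward(doctorId, view);         // emit the updated composed view downstream
    }
}
```

And the wiring that connects it to both sources, its store, and the external view topic:

```java
topology.addProcessor("doctor-view-composer", DoctorViewComposer::new,
        "appointment-by-doctor-source", "doctor-source");

topology.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("doctor_view_store"),
        Serdes.String(), doctorViewSerde),
        "doctor-view-composer");

// Publish the composed view, keyed by doctor-id.
topology.addSink("doctor-view-sink", "doctor_with_appointments_view",
        Serdes.String().serializer(), doctorViewSerde.serializer(),
        "doctor-view-composer");
```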

DSL vs Processor API
— — — — — — — — —

The DSL version of the previous example is really a mix of the Processor API and the DSL, because there's no easy way to create the composite key without using a Transformer.

Both approaches have their own pros and cons. If the logic is quite straightforward, like enriching an event stream with some static data, then the pure DSL is the better choice. Especially for windowed joins, the DSL is the better choice.

But for stateful, complicated functionality, using the pure DSL tends to add more boilerplate Processor nodes and internal topics, in my opinion.
Not just the Processor topology but even the code itself can get cluttered.

On the other hand, though the Processor API makes for an easy-to-read flow and requires fewer nodes and topics, it requires the developer to write Processors for everything. Sometimes we would essentially be reinventing the wheel.

In my opinion, combining the best of both worlds is the way forward.
It's better to stay away from fancy multi-step KTable-KTable joins followed by filtering and aggregations just for the sake of doing it; that makes things quite hard to read when we revisit the code a couple of months later.
It's better to abstract the complicated logical pieces into Transformers or Processors and use the DSL to glue them into the flow.

Hope this was helpful, and please point out anything I've stated wrong or overlooked.
