Index Cassandra data in Elasticsearch with Akka Streams and Alpakka
Alpakka Cassandra and Elasticsearch connectors
Effectively handling back-pressure is the most common problem in producer/consumer-based streaming applications. Most of the older streaming platforms handle back-pressure synchronously (e.g. Apache Camel): they slow down the producer until the consumer processes the message, which is inefficient. The Reactive Streams specification provides a solution to this problem: a way to do back-pressured stream processing in a non-blocking way. According to the Reactive Streams documentation, it's an intuitive and safe way to do back-pressured stream processing.
Akka Streams implements the Reactive Streams specification on top of Akka actors. It allows processing data (which comes from various sources) through different pipelines (flows), and all the processing is done in a non-blocking, asynchronous way. Basically, a stream has a source (where data is generated) and a sink (where data ends up). In between the source and the sink there can be various flows where the data is transformed into different forms. Akka Streams defines these terminologies with three built-in types (Source, Sink and Flow).
Source- Where the data originates (the input to the stream). It has a single output channel and no inputs.
Flow- Data originating from a source is transformed into different forms via different flows. A flow has one input and one output.
Sink- The endpoint of the stream. Data originating from the source is transformed in different flow stages and ends up in the sink. A sink has only one input and no outputs.
In Akka Streams, data may come (source) from various existing systems and end up (sink) in various existing systems. For example, data can come from Cassandra (source) and be indexed into Elasticsearch (sink). Alpakka gives a way to connect Akka Streams to these kinds of existing systems. Basically, it provides connectors to source and sink data from different existing components. There are various existing connectors in Alpakka; you can find them here.
In this post I'm gonna show how to source data from Cassandra and sink it to Elasticsearch. All the source code related to this post is in the hakka GitLab repo. Please check out the repository and follow the steps below.
1. Setup Cassandra
First I need to run Cassandra on my local machine. I can simply run Cassandra with Docker.
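For example (the container name and Cassandra version here are my choices, not from the repo):

```shell
# start a single-node Cassandra instance, exposing the CQL port 9042
docker run -d --name cassandra -p 9042:9042 cassandra:3.11
```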
Then I need to create a Cassandra keyspace and table and bootstrap some data into the table. The Alpakka Cassandra connector connects to this table and fetches the data in it.
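Something like the following would do it; the keyspace, table and column names here are illustrative (the repo may use different ones):

```shell
# create a keyspace and an accounts table, then insert a few sample rows
docker exec -i cassandra cqlsh <<'EOF'
CREATE KEYSPACE IF NOT EXISTS bank
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS bank.accounts (
  id uuid PRIMARY KEY,
  name text,
  balance double
);

INSERT INTO bank.accounts (id, name, balance) VALUES (uuid(), 'alice', 1000);
INSERT INTO bank.accounts (id, name, balance) VALUES (uuid(), 'bob', 2500);
EOF
```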
2. Setup Elasticsearch
Next I need to run Elasticsearch. It can also be run with Docker, which is the easiest way.
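For example (the image version is illustrative):

```shell
# single-node Elasticsearch on the default HTTP port 9200
docker run -d --name elasticsearch -p 9200:9200 \
  -e "discovery.type=single-node" \
  docker.elastic.co/elasticsearch/elasticsearch:6.4.2
```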
Now that the infrastructure is ready, the next thing is to build and run the Akka Streams application with sbt.
3. Sbt dependencies
In my sbt application I need to define the akka-streams dependency and the Alpakka connector dependencies (Cassandra and Elasticsearch). Following is the dependency list in my build.sbt.
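A sketch of the relevant part of build.sbt; the version numbers are illustrative, so check the repo for the ones actually used:

```scala
// build.sbt — Akka Streams plus the Alpakka Cassandra and Elasticsearch connectors
libraryDependencies ++= Seq(
  "com.typesafe.akka"  %% "akka-stream"                       % "2.5.23",
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra"     % "1.1.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "1.1.0"
)
```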
4. Cassandra source
The stream application's data comes from Cassandra, so I need to define the Cassandra cluster configuration and the source here.
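A minimal sketch using the Alpakka 1.x Cassandra API; the contact point, keyspace and table names match the illustrative setup above, not necessarily the repo:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.Source
import com.datastax.driver.core.{Cluster, Row, Session, SimpleStatement}

implicit val system: ActorSystem = ActorSystem("cassandra-to-es")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// connect to the local Cassandra node started above
implicit val session: Session = Cluster.builder
  .addContactPoint("127.0.0.1")
  .withPort(9042)
  .build()
  .connect()

// fetch rows in pages of 20 so the driver honours downstream demand
val stmt = new SimpleStatement("SELECT * FROM bank.accounts").setFetchSize(20)

// a Source that emits one Cassandra Row per result-set row
val cassandraSource: Source[Row, NotUsed] = CassandraSource(stmt)
```

The fetch size matters here: it is how the Cassandra driver pages results, which is what keeps the source back-pressured instead of loading the whole table into memory.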
5. Elasticsearch sink
The next thing is to write the Elasticsearch sink with the Alpakka Elasticsearch connector. I need to define the following configurations for it.
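A sketch of the sink, again against the Alpakka 1.x API; the `Account` case class, index name and host are my assumptions:

```scala
import akka.{Done, NotUsed}
import akka.stream.alpakka.elasticsearch.WriteMessage
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
import akka.stream.scaladsl.Sink
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient
import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat
import scala.concurrent.Future

// target document type; fields mirror the Cassandra table columns
case class Account(id: String, name: String, balance: Double)
implicit val accountFormat: JsonFormat[Account] = jsonFormat3(Account)

// low-level REST client pointing at the local Elasticsearch started above
implicit val esClient: RestClient =
  RestClient.builder(new HttpHost("localhost", 9200)).build()

// sink that indexes Account documents into the "accounts" index
val esSink: Sink[WriteMessage[Account, NotUsed], Future[Done]] =
  ElasticsearchSink.create[Account](indexName = "accounts", typeName = "_doc")
```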
6. Connect flows and run stream
Now the Cassandra source and Elasticsearch sink are ready. I need to connect the source and sink through flows. Following are the flows I need.
Cassandra Row to Account- The Cassandra source returns Row objects. I need to map them to Account case class objects.
Account to WriteMessage- The Elasticsearch sink requires WriteMessage-typed data. I need to transform Account objects into WriteMessage objects.
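The two flows above can be sketched as follows, assuming the `Account` case class and the column names from the earlier steps:

```scala
import akka.stream.scaladsl.Flow
import akka.stream.alpakka.elasticsearch.WriteMessage
import com.datastax.driver.core.Row

// map raw Cassandra Rows to the Account case class
val rowToAccount = Flow[Row].map { row =>
  Account(
    row.getUUID("id").toString,
    row.getString("name"),
    row.getDouble("balance")
  )
}

// wrap each Account in an index WriteMessage, using the account id
// as the Elasticsearch document id so re-runs overwrite instead of duplicating
val accountToWriteMessage = Flow[Account].map { account =>
  WriteMessage.createIndexMessage(account.id, account)
}
```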
I can combine the source, flows and sink into a runnable graph and run it. Running the stream returns a Future, which I can use to evaluate the status of the stream's completion.
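Putting it all together, assuming the `cassandraSource`, `rowToAccount`, `accountToWriteMessage` and `esSink` names from the sketches above:

```scala
import akka.Done
import scala.concurrent.Future
import scala.util.{Failure, Success}
import system.dispatcher // execution context for the Future callback

// wire source -> flows -> sink and run the stream
val done: Future[Done] =
  cassandraSource
    .via(rowToAccount)
    .via(accountToWriteMessage)
    .runWith(esSink)

// shut down once every row has been indexed (or the stream has failed)
done.onComplete {
  case Success(_)  => println("indexing completed"); system.terminate()
  case Failure(ex) => println(s"indexing failed: $ex"); system.terminate()
}
```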