Kafka, Java, and Bitcoin.

Distributed Systems Development A-Z Guide.

Dmytro Nasyrov
Pharos Production
6 min read · Feb 26, 2019

--

Drop us a message if you’re interested in Blockchain and FinTech software development, or just say hi to Pharos Production Inc.

Or follow us on YouTube to learn more about Software Architecture, Distributed Systems, Blockchain, High-load Systems, Microservices, and Enterprise Design Patterns.

Pharos Production YouTube channel

In previous articles, we have looked at deploying Kafka locally in a development environment and at basic operations from a Java project. This time we will go further: we will use the Twitter Streaming API to fetch all Bitcoin-related tweets and store them in Kafka. Check out our previous articles if you have missed them.

As usual, we start with an empty Maven project.

New Maven project.

Next, we choose the name of the project.

Artifact and Group IDs.

And the root module name.

Root module name.

After pressing the Finish button, we add the root package for the module and our first (and, for this tutorial, only) class.

Project structure.

Before we start coding, we should add several dependencies and a plugin. We want Java 11 as our target platform.

Java 11.
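
In pom.xml this means configuring the Maven compiler plugin with the release option set to 11; a minimal sketch (the plugin version shown is an assumption):

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.0</version>
      <configuration>
        <!-- compile for Java 11 -->
        <release>11</release>
      </configuration>
    </plugin>
  </plugins>
</build>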

And for our project, we need the Apache Kafka client dependency, the SLF4J logger, and a Twitter library to make our life easier when communicating with the Twitter API.

Dependencies.
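
Roughly, the dependencies section looks like this. The versions are assumptions from early 2019; for the Twitter side we assume Twitter’s Hosebird Client (hbc), which matches the streaming code below:

<dependencies>
  <!-- Kafka producer client -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- simple SLF4J logging binding -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
  </dependency>
  <!-- Twitter Hosebird Client for the Streaming API -->
  <dependency>
    <groupId>com.twitter</groupId>
    <artifactId>hbc-core</artifactId>
    <version>2.2.0</version>
  </dependency>
</dependencies>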

Twitter API.

To start working with Twitter, we need to create a new app in the Twitter developer portal. In the Keys and Tokens section you can find your Consumer API keys and your Access Token & Secret.

Producer.

In the Producer class, create four constants and put your API keys into them.

Twitter API keys.
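
Something like this (the constant names are our choice; never commit real keys to source control):

// placeholders — substitute the keys from your Twitter app
private static final String CONSUMER_KEY = "<your consumer API key>";
private static final String CONSUMER_SECRET = "<your consumer API secret>";
private static final String TOKEN = "<your access token>";
private static final String SECRET = "<your access token secret>";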

Also, we add three more constants:

  • TWITTER_CLIENT — the name of the Twitter client;
  • BOOTSTRAP_SERVER — the Kafka broker address;
  • TOPIC — the topic we will use in Kafka to store tweets.
More constants.
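
For example (the client name is arbitrary; the broker address and topic match the console commands at the end of the article):

private static final String TWITTER_CLIENT = "bitcoin-tweets-client";
private static final String BOOTSTRAP_SERVER = "127.0.0.1:9092";
private static final String TOPIC = "bitcoin_tweets";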

Let’s add two global variables — one for the Logger and another for the list of search terms on Twitter. We will stream all tweets that mention bitcoin or blockchain.

Global variables.
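
A sketch of the two fields (assuming SLF4J’s Logger and java.util’s List and Arrays):

private static final Logger logger = LoggerFactory.getLogger(Producer.class);
// terms to track in the Twitter stream
private static final List<String> TERMS = Arrays.asList("bitcoin", "blockchain");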

We define a run method, sketched after the list. The method does the following:

  • first, we create a queue for tweets with a capacity of 1,000 elements;
  • next, we create a Twitter API client;
  • and a Kafka producer;
  • we add a shutdown hook that runs when you stop the producer;
  • in a while-loop, we read messages from the queue and send them to Kafka.
Run method.
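
Putting it together, run looks roughly like this. The helpers createTwitterClient, createKafkaProducer, pollMessage, sendToKafka, and shutdown are illustrative names, sketched in the sections below:

void run() {
    // 1K-element queue buffering raw tweet JSON from the Twitter client
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);

    Client twitterClient = createTwitterClient(queue);
    twitterClient.connect();

    KafkaProducer<String, String> producer = createKafkaProducer();

    // runs when you stop the producer, e.g. on Ctrl+C
    Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown(twitterClient, producer)));

    while (!twitterClient.isDone()) {
        String message = pollMessage(queue, twitterClient);
        if (message != null) {
            sendToKafka(producer, message);
        }
    }
}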

Let’s go through all of this in detail. Connecting to Twitter is pretty straightforward. We define the hosts to connect to (the stream host of the Twitter API) and a filtering endpoint, a class that takes a list of terms to filter the output Twitter stream. We connect to Twitter via an OAuth client, using the keys defined in the constants section, and attach the queue as the output.

Create a Twitter client.
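
With hbc this looks roughly as follows (createTwitterClient is an illustrative name; the classes come from the com.twitter.hbc packages):

Client createTwitterClient(BlockingQueue<String> queue) {
    // the Streaming API host
    Hosts hosts = new HttpHosts(Constants.STREAM_HOST);

    // filtering endpoint that tracks our search terms
    StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
    endpoint.trackTerms(TERMS);

    // OAuth credentials from the constants section
    Authentication auth = new OAuth1(CONSUMER_KEY, CONSUMER_SECRET, TOKEN, SECRET);

    return new ClientBuilder()
            .name(TWITTER_CLIENT)
            .hosts(hosts)
            .authentication(auth)
            .endpoint(endpoint)
            .processor(new StringDelimitedProcessor(queue))  // the queue is the output
            .build();
}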

To connect to Twitter, we call the connect method.

Connect method.
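
That is a single call on the client we just built (twitterClient is the variable from the sketch above):

twitterClient.connect();  // opens the streaming connection; tweets start flowing into the queue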

Queue polling is simple too. We read messages from the queue with a 5-second timeout. In case of an InterruptedException, we stop polling the queue.

Poll the queue.
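
A sketch of that helper (pollMessage is our name; it also stops the Twitter client if the thread is interrupted):

String pollMessage(BlockingQueue<String> queue, Client twitterClient) {
    try {
        // block for up to 5 seconds waiting for the next tweet
        return queue.poll(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        logger.error("Polling interrupted, stopping the Twitter client", e);
        twitterClient.stop();
        return null;
    }
}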

Enough Twitter; let’s jump into Kafka. We should define a list of properties here:

  • bootstrap server — the address of our Kafka broker;
  • key and value serializers — string serializers for keys and values;
  • enable idempotence config — this setting saves us from duplicated values in Kafka caused by retried acknowledgements. Kafka attaches a unique identifier to every request, so if the same request is sent twice, the data is written only once;
  • acks config — with acks set to all, the leader waits for the full set of in-sync replicas to acknowledge the record;
  • max in-flight requests per connection — the maximum number of unacknowledged requests the client will send on a single connection before blocking. If this setting is greater than 1, the producer pipelines batches to the broker. This improves throughput, but if there are failed sends there is a risk of out-of-order delivery due to retries (if retries are enabled). Note also that excessive pipelining reduces throughput;
  • compression type config — we compress message batches to make them smaller in size. It adds a little latency, but that’s fine;
  • linger ms config — how long the producer waits to collect a batch;
  • batch size config — the maximum size of a batch in bytes.
Kafka configuration.
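
A sketch of the producer setup with those properties (the linger and batch values shown are assumptions to illustrate; tune them for your load):

KafkaProducer<String, String> createKafkaProducer() {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // safe producer: no duplicates, wait for all in-sync replicas
    props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
    props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

    // throughput: compress batches, wait up to 20 ms, 32 KB batches
    props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
    props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));

    return new KafkaProducer<>(props);
}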

Sending to Kafka is straightforward: we create a new record for the selected topic and send it via the producer.

Send a record to Kafka.
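
For example (sendToKafka is an illustrative helper; the callback only logs failures):

void sendToKafka(KafkaProducer<String, String> producer, String message) {
    // null key: tweets are spread across the topic’s partitions
    producer.send(new ProducerRecord<>(TOPIC, null, message), (metadata, e) -> {
        if (e != null) {
            logger.error("Failed to send record to Kafka", e);
        }
    });
}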

We also need a graceful shutdown of the Twitter client and the Kafka producer.

Shutdown.
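
A sketch (the method name shutdown is our choice):

void shutdown(Client twitterClient, KafkaProducer<String, String> producer) {
    logger.info("Shutting down...");
    twitterClient.stop();  // close the Twitter stream
    producer.close();      // flush pending records, then close
}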

The last thing is the main method that launches our producer.

Main method.
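
A minimal version, assuming the class is named Producer with an instance run method as sketched above:

public static void main(String[] args) {
    new Producer().run();
}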

Let’s create a Kafka topic manually. In a console, run the following statement:

kafka-topics --zookeeper 127.0.0.1:2181 --create --topic bitcoin_tweets --partitions 6 --replication-factor 1

Here we point to the ZooKeeper instance and create the topic bitcoin_tweets with 6 partitions and a replication factor of 1.

In a separate console tab, run a Kafka console consumer to check incoming tweets:

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic bitcoin_tweets

and run the Producer. Now you should see both streams: incoming tweets in the IDE log and data from the Kafka broker in the console.

You can find the full source code at our GitHub repo.

Thank you for reading!


Dmytro Nasyrov
Pharos Production founder and CTO. We build high-load software.