In this article we will do the following:
- Download Apache Pulsar
- Start a local standalone Pulsar cluster
- Install the Python pulsar-client library
- Start a consumer application and subscribe to a topic using Python
- Start a producer application and publish messages to a topic using Python
Before we dive into the coding, let's have a look at what Apache Pulsar is.
In simple terms, Apache Pulsar is a messaging solution for sending data from one server to another. To build such a system, we have to start a Pulsar cluster. The cluster can be configured to run on multiple machines or locally on a single machine. When we run the cluster on a single machine, we call it a standalone cluster. In this article we are going to run a standalone cluster.
A running cluster contains a broker that allows clients to publish and read messages. The broker manages the writes to and reads from entities called topics, which are where messages are published, stored and read from. In general, a cluster contains a number of brokers, and each broker manages a number of topics.
An application that writes (publishes) messages to a topic is called a producer application, and an application that reads (consumes) messages from a topic is called a consumer application. Any application that interacts with Pulsar to read or write messages is called a Pulsar client. A client that wants to write to or read from a topic has to call the service with the following details:
- host-name and port number where the cluster is running
- name of the topic they want to read from or write to
All writes and reads are facilitated by the broker service running in the cluster. When a producer has to write a message to a topic, it contacts the broker with the above two details. The broker in turn takes the message from the producer application and writes it to the topic named by the producer. The same goes for consumer applications: the broker reads from the named topic on their behalf and hands the messages over.
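To make the two details concrete, here is a small sketch that pulls them apart using only the Python standard library. The helper names parse_service_url and parse_topic are hypothetical and are not part of pulsar-client; the URL and topic name are the ones used later in this article.

```python
from urllib.parse import urlparse


def parse_service_url(service_url):
    """Split a Pulsar service URL into (host, port)."""
    parsed = urlparse(service_url)
    if parsed.scheme != "pulsar":
        raise ValueError("expected a pulsar:// URL, got %r" % service_url)
    return parsed.hostname, parsed.port


def parse_topic(topic):
    """Split a fully qualified topic name into its domain and path segments."""
    domain, _, path = topic.partition("://")
    return domain, path.split("/")


# Detail 1: where the cluster is running
host, port = parse_service_url("pulsar://localhost:6650")
# Detail 2: which topic to read from or write to
domain, segments = parse_topic("persistent://sample/standalone/ns1/my-topic")

print(host, port)        # localhost 6650
print(domain, segments)  # persistent ['sample', 'standalone', 'ns1', 'my-topic']
```

A real client never has to do this parsing itself; the sketch only shows that a service URL and a topic name are all a client needs to carry around.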
Now that we know the nuts and bolts of Apache Pulsar, let's dive in and build one such system.
Note: The hands-on steps were done on Linux.
Downloading Apache Pulsar
Start a shell, make sure you have internet connectivity, and download the Apache Pulsar 2.4.1 binary release (apache-pulsar-2.4.1-bin.tar.gz) from the Apache Pulsar download page.
Extract the downloaded archive and move into the extracted directory:
tar xvfz apache-pulsar-2.4.1-bin.tar.gz
cd apache-pulsar-2.4.1
Start the standalone cluster by running this command from inside the extracted directory:
bin/pulsar standalone
Installing Python Pulsar Library
Once you have started the cluster, keep it running and open a second terminal. We are going to write two Python applications, one producer and one consumer, but first we have to install the Python pulsar-client library. Note that pulsar-client is not limited to Python 2.x: packages are published for Python 3 as well, so use the pip that belongs to the interpreter you will run the applications with (for example pip3, or pip2.7 on a Python 2.7 setup).
pip install pulsar-client
Once pulsar-client is installed, start a Python shell and try importing pulsar to confirm the installation:
python
>>> import pulsar
If you do not see any error, you are good to go ahead.
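The same check can be scripted, which is handy if you want to verify the environment before starting the applications. This is only a sketch; is_installed is a hypothetical helper name, and it relies on nothing beyond the standard importlib module.

```python
import importlib


def is_installed(module_name):
    """Return True if the named module can be imported."""
    try:
        importlib.import_module(module_name)
    except ImportError:
        return False
    return True


if is_installed("pulsar"):
    print("pulsar-client is available")
else:
    print("pulsar-client is missing; install it with pip first")
```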
Writing Consumer Application in Python
We will first write and start the consumer application so that it is ready and waiting when the producer application sends a message.
Open any editor and write the following code:
Note: In the code below, instead of localhost you can use the actual host name or IP address of your machine, like 10.000.00.000
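import pulsar
client = pulsar.Client('pulsar://localhost:6650')  # address and port of the broker
consumer = client.subscribe('persistent://sample/standalone/ns1/my-topic', 'my-subscription')

while True:
    msg = consumer.receive()  # blocks until a message arrives
    print("Received message: '%s'" % msg.data())
    consumer.acknowledge(msg)  # tell the broker the message was read

client.close()  # reached only if the loop above is given a way to exit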
Let's go through the code:
In the first line, we import the pulsar library.
In the second line, we create a 'client' object using the .Client method by giving the address and port of the machine where the Pulsar cluster is running.
In the third line, we use the 'client' object to subscribe to a topic so that we can receive messages posted to it. To do so, we give the name of the topic and our subscription name (more on that later).
We then run a while loop that runs forever and keeps waiting for messages posted to that topic. When we receive a message, we extract its data using the .data() method and print it.
Once we have read the message, we send an acknowledgement (more on that later) to tell the broker that the message has been read.
Finally, we close our connection.
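The receive, process, acknowledge cycle described above can also be wrapped in a function that stops after a fixed number of messages, so the connection really does get closed afterwards. The sketch below is an illustration, not the code from this article: consume_n, FakeMessage and FakeConsumer are hypothetical names, and the fake classes only exist so the example runs without a broker. They mimic the three calls used above (receive, data and acknowledge).

```python
def consume_n(consumer, count):
    """Receive `count` messages, acknowledging each one after it is handled."""
    received = []
    for _ in range(count):
        msg = consumer.receive()     # blocks until a message arrives
        received.append(msg.data())  # raw payload of the message
        consumer.acknowledge(msg)    # tell the broker it was processed
    return received


# Stand-ins so the sketch runs without a broker (not part of pulsar-client).
class FakeMessage(object):
    def __init__(self, payload):
        self._payload = payload

    def data(self):
        return self._payload


class FakeConsumer(object):
    def __init__(self, payloads):
        self._pending = [FakeMessage(p) for p in payloads]
        self.acked = []

    def receive(self):
        return self._pending.pop(0)

    def acknowledge(self, msg):
        self.acked.append(msg.data())


fake = FakeConsumer([b"one", b"two", b"three"])
print(consume_n(fake, 2))
```

With a running cluster you would pass the object returned by client.subscribe(...) instead of the fake consumer, and call client.close() once consume_n returns.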
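Save the above file as consumer.py and run it using the following command:
python consumer.py
You should see some log output from the client as it connects to the broker and subscribes to the topic. The program then waits for messages.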
Writing Producer Application in Python
Your consumer is now ready to consume messages. Let's write a producer application now. Open an editor again and create a file named producer.py:
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('persistent://sample/standalone/ns1/my-topic')
producer.send('Hello Pulsar'.encode('utf-8'))  # the payload is sent as bytes
client.close()
Let's go through the code that we have written:
In the 2nd line, we again create a ‘client’ object by giving the cluster details.
We then use the .create_producer method on the ‘client’ object to tell the broker which topic to push the message to.
We then push the actual message ‘Hello Pulsar’ using the .send method on the ‘producer’ object we created.
We finally close the connection.
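If you want to publish more than one message, the send step can be put in a loop. Here is a minimal sketch, assuming the payloads are text that should be encoded as UTF-8 bytes before sending. publish_all and FakeProducer are hypothetical names; the fake producer only records what it is given so the example runs without a broker.

```python
def publish_all(producer, texts):
    """Encode each text as UTF-8 bytes and send it; return the number sent."""
    sent = 0
    for text in texts:
        producer.send(text.encode("utf-8"))
        sent += 1
    return sent


# Stand-in that records payloads instead of contacting a broker.
class FakeProducer(object):
    def __init__(self):
        self.sent = []

    def send(self, payload):
        self.sent.append(payload)


fake_producer = FakeProducer()
count = publish_all(fake_producer, ["Hello Pulsar", "Hello again"])
print(count)
```

With a running cluster, the object returned by client.create_producer(...) takes the place of the fake producer.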
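Save the above file, open a third terminal and run it:
python producer.py
You should see informational log messages from the client as it connects to the broker, creates the producer and closes the connection.
In the shell where consumer.py is running, you should see the received message, Hello Pulsar, printed.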
Congratulations! You just created a system where you can post a message to a Pulsar topic and read it as well.
We skipped a few things in the process, so let's go through them.
In the consumer.py application, we gave two arguments to the .subscribe method. We already went through the first argument; the second is the name of the subscription that the consumer uses to subscribe to the topic. The subscription name is required because the broker needs to keep track of how many messages have been consumed under each subscription so that it can hand out the next message. Let's consider an example.
Let's say you have a topic to which a producer has sent 5 messages, and there are two consumers on the same topic, consumer A and consumer B, each with its own subscription name. Without a subscription name, the broker would not know which consumer has consumed how many messages. With one, the broker maintains a record of how far each subscription has read. So let's say consumer A has consumed 3 messages and consumer B has consumed 1 message. The next time consumer A tries to consume a message, the broker knows that consumer A has already read 3 messages and gives it the 4th message. Similarly, when consumer B next tries to consume a message, the broker gives it the 2nd message.
Also, in consumer.py we used the .acknowledge method on the 'consumer' object. This is necessary so that the broker knows the message has been consumed and can move the subscription past it. If you don't acknowledge a received message, the broker still treats it as unconsumed and can deliver it to you again, for example the next time the consumer connects.
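The consumer A and consumer B example can be replayed with a toy in-memory model. This is a deliberately simplified sketch and not how the broker is implemented: ToyTopic and the subscription names sub-a and sub-b are hypothetical, and the model keeps just one counter per subscription. It also returns an unacknowledged message again straight away, whereas a real broker redelivers it later.

```python
class ToyTopic(object):
    """In-memory model of one topic with a read position per subscription."""

    def __init__(self):
        self.messages = []
        self.acked = {}  # subscription name -> number of acknowledged messages

    def publish(self, payload):
        self.messages.append(payload)

    def receive(self, subscription):
        """Return the next unacknowledged message for this subscription, or None."""
        position = self.acked.get(subscription, 0)
        if position >= len(self.messages):
            return None
        return self.messages[position]

    def acknowledge(self, subscription):
        """Advance the subscription past the message it last received."""
        self.acked[subscription] = self.acked.get(subscription, 0) + 1


topic = ToyTopic()
for n in range(1, 6):
    topic.publish("message %d" % n)

# Consumer A reads and acknowledges three messages, consumer B one.
for _ in range(3):
    topic.receive("sub-a")
    topic.acknowledge("sub-a")
topic.receive("sub-b")
topic.acknowledge("sub-b")

print(topic.receive("sub-a"))  # message 4
print(topic.receive("sub-b"))  # message 2
# Without an acknowledgement the position does not move:
print(topic.receive("sub-b"))  # message 2
```

Each subscription keeps its own position, which is why consumer A and consumer B can read the same topic at different speeds without affecting each other.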
That’s it for now! If you have any other tutorial requests or clarifications leave a comment and I will get back to you!