Kafka and Python: Let's learn together

According to its official Apache page: "Kafka is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies."

A small glimpse, according to StackShare.io, of who is using Kafka

Messages are organized into topics. Producers push, or publish, the messages; consumers pull them. As a consumer, you subscribe to a topic to get its messages. Kafka runs as a cluster, and each node is referred to as a broker.

A topic can have multiple partitions, which are spread across multiple brokers. You can parallelize consumers so that each pulls from a different topic partition.

Each partition is essentially a log file that is written sequentially. You can dictate how long the data is retained. Each broker hosts many partitions, which can be replicated across additional brokers.
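To make this concrete, here is a minimal sketch of creating such a topic with confluent-kafka's AdminClient (the client library introduced later in this post). The broker address, topic name, partition count, replication factor, and seven-day retention value are all placeholder assumptions.

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Three partitions spread across brokers, each replicated twice, with
# messages retained for seven days (retention.ms is in milliseconds)
new_topic = NewTopic('mytopic', num_partitions=3, replication_factor=2,
                     config={'retention.ms': '604800000'})

for topic, future in admin.create_topics([new_topic]).items():
    try:
        future.result()  # raises an exception if creation failed
        print('Created topic {}'.format(topic))
    except Exception as e:
        print('Failed to create topic {}: {}'.format(topic, e))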

Each partition has a leader, which is where writes are sent. The trade-off between consistency and availability can be tuned through configuration, for example via the producer's acks setting and the topic's replication factor.
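As a quick illustration of that trade-off, here is a minimal sketch of a producer configured for stronger consistency. The 'acks' setting is a standard Kafka/librdkafka producer option; the broker address is a placeholder.

from confluent_kafka import Producer

# acks='all' waits for all in-sync replicas to acknowledge each write,
# favoring consistency over latency; acks='1' (leader only) or acks='0'
# favor availability and speed instead.
p = Producer({
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
})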

When a consumer consumes a message from Kafka, it uses the message offset to keep track of which messages have been consumed. If it has consumed the first 50 messages in a topic, then when new messages come in it resumes at offset 50 and consumes the unread messages from there. How offsets are handled and committed can be configured anywhere from fully automatic to fully manual in your application code when you create the consumer.
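For example, here is a minimal sketch of taking over offset commits manually with confluent-kafka; process() is a hypothetical function standing in for your own message handling, and the broker, group, and topic names are placeholders.

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'enable.auto.commit': False,  # disable automatic offset commits
})
c.subscribe(['mytopic'])

msg = c.poll(1.0)
if msg is not None and not msg.error():
    process(msg)  # hypothetical handler; commit only after it succeeds
    c.commit(message=msg, asynchronous=False)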

Consumer Groups are comprised of multiple consumers. Each consumer in the group is assigned its own subset of the topic's partitions, so the group as a whole processes the entire topic in parallel, which allows for horizontal scaling.
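To see the group mechanics at work, you could start several copies of a consumer like the sketch below, all sharing the same group.id; Kafka splits the topic's partitions among them and rebalances whenever a member joins or leaves. The on_assign callback is part of confluent-kafka's subscribe() API; the broker, group, and topic names are placeholders.

from confluent_kafka import Consumer

def print_assignment(consumer, partitions):
    # Called on each rebalance with the partitions this member now owns
    print('Assigned: {}'.format(partitions))

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
})
c.subscribe(['mytopic'], on_assign=print_assignment)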

ZooKeeper (or a replacement such as zetcd), in short, is used to make sure all the moving pieces of a Kafka cluster work together seamlessly. It helps with synchronization across all brokers, keeps track of broker system state (heartbeats) and replication, and manages the broker topic registries.

Interacting with Python

There are a couple of libraries available for Python (pykafka and confluent-kafka) that build on the C client, librdkafka. Additional installation steps may be needed to take full advantage of librdkafka.

Example with confluent-kafka:

pip install confluent-kafka

Producer

from confluent_kafka import Producer

# Change to point to your Kafka installation
p = Producer({'bootstrap.servers': 'localhost'})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

# Placeholder data source; replace with your own iterable of strings
some_data_source = ['hello', 'kafka']

for data in some_data_source:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message. The delivery report callback will
    # be triggered from poll() above, or flush() below, when the message
    # has been successfully delivered or has failed permanently.
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()

Consumer

from confluent_kafka import Consumer, KafkaError


c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    # Start from the beginning of the topic when no committed offset exists
    # (top-level setting; the older 'default.topic.config' form is deprecated)
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)  # wait up to one second for a message

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            # Reaching the end of a partition is informational, not an error
            continue
        else:
            print(msg.error())
            break

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()
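Calling close() commits final offsets (when auto-commit is enabled) and leaves the consumer group cleanly, so its partitions can be rebalanced to the remaining group members.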