Streaming Video Menggunakan Kafka

Alfi Salim
BISA.AI
Published in
5 min readApr 21, 2020
Photo by Kon Karampelas on Unsplash

Dalam bidang Video Analytics ataupun berbagai bidang dimana video harus didistribusikan ke konsumer seperti Youtube, Twitch dan lain sebagainya. Teknik streaming video adalah solusi yang tepat supaya video dapat terdistribusi dengan baik untuk dikonsumsi.

Streaming adalah proses pengiriman data secara terus menerus, biasanya pengiriman data dilakukan dalam ukuran yang kecil (bukan dalam bentuk batch) dalam aliran yang tetap berkelanjutan saat data dihasilkan. Karena data yang dikirim adalah data dalam ukuran kecil, maka jika datanya besar harus dikonversi terlebih dahulu menjadi data yang berukuran kecil (biasanya dalam bentuk byte). Dengan streaming data, ini memungkinkan kita untuk dapat menganalisis data secara real time atau nyata dan memberi berbagai macam pengetahuan seperti pengukuran, aktivitas server, geolokasi perangkat dan lain sebagainya. Salah satu tools yang dapat digunakan untuk streaming data adalah Kafka.

Kafka merupakan platform Apache untuk streaming data yang didistribusikan. Secara implementasi, Kafka merupakan sistem pengiriman pesan yang mempunyai fitur publish-subscribe terdistribusi yang mana pesan dikelola dalam topik, baik partisi ataupun replikasinya.

Secara sederhana terdapat 3 poin utama dalam alur penggunaan Kafka:

  1. Producer, pesan (data) yang akan didistribusikan dihasilkan oleh producer. Producer akan menentukan pesan apa yang akan dikirimkan dan ke topic mana pesan itu akan dikirim. Produsen dapat melampirkan key ke setiap pesan untuk menjamin bahwa semua pesan dengan key yang sama akan sampai pada topik atau partisi topic yang sesuai.
  2. Topic, merupakan sebuah log yang menerima pesan dari Producer dan menyimpan pesan tersebut dalam partisinya.
  3. Consumer, pesan yang telah disimpan dalam topic, dapat dibaca oleh Consumer. Consumer dapat membaca sekumpulan partisi dari pilihan topik yang telah ditentukan. Consumer ini dapat dibuat secara berkelompok (consumer group). Bila Consumer berbentuk grup, ini berarti bahwa Consumer subscribe pada Topic yang sama, maka pesan yang akan didistribusikan ke Consumer akan dibagikan sedemikian rupa agar tidak terjadi rendundansi data, atau data yang sama.

Mungkin dalam tulisan ini kita tidak akan secara rinci membahas mengenai Kafka. Pembahasan Kafka hanya sebatas apa yang nantinya akan diimplementasikan pada tulisan ini.

Implementasi

Dalam tulisan ini saya akan mengimplementasikan streaming video dengan Kafka menggunakan bahasa pemrograman Python. Di sini saya asumsikan bahwa Kafka dan berbagai requirement-nya sudah terinstal di masing-masing komputer teman-teman. Kalau belum, bisa diikuti cara instalnya pada website resmi Kafka di http://kafka.apache.org/quickstart.

Langkah awal adalah dengan membuka Terminal dan ganti direktorinya ke direktori Kafka. Pada tulisan ini saya menggunakan OS Windows, jika kalian menggunakan OS Linux/Mac secara umum sama saja caranya, mungkin nanti tinggal disesuaikan saja sedikit perintahnya.

Change Ke Direktori Kafka

Selanjutnya jalankan ZooKeeper. ZooKeeper berfungsi untuk centralized service for distributed systems, artinya ZooKeeper menyediakan konfigurasi terpusat untuk sistem terdistribusi yang besar. Cara mengaktifkan ZooKeeper adalah dengan menuliskan perintah berikut pada Terminal.

.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Setelah ZooKeeper aktif, selanjutnya aktifkan Kafka dengan cara membuka Terminal baru lalu tuliskan perintah berikut.

.\bin\windows\kafka-server-start.bat config\server.properties

Kita bisa membuat Topic sesuai dengan apa yang kita inginkan, seperti berapa banyak nama, banyak partisi dalam Topic, replikasi pada Topic dan lain sebagainya dengan menuliskan perintah berikut pada Terminal.

.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor <jumlah replikasi> --partition <jumlah partisi> --topic <nama topic>

Namun pada tulisan ini, saya membuat Topic secara default tanpa mengatur partisi dan replikasinya, default dari keduanya adalah 1. Saya langsung membuat Topic melalui kode pada Producer dan Consumer.

Selanjutnya buat file baru dan beri nama producer.py, tuliskan kode berikut pada file tersebut.

from time import sleep
from kafka import KafkaProducer
import cv2producer = KafkaProducer(bootstrap_servers=['localhost:9092'])video = cv2.VideoCapture('test2.mp4') #Direktori & Nama file video
count = 0
if (video.isOpened() == False):
print("Unable to read camera feed")
while(True): success, frame = video.read()
ret, img = cv2.imencode('.jpg', frame)
producer.send('videoStream', img.tobytes())
sleep(5)
video.release()
print('Sukses!')

Kode di atas berperan sebagai Producer, producer mengirimkan video dengan cara dipecah perframe, selanjutnya tiap framenya dikonversi menjadi bytes lalu dikirimkan ke Topic.

  • Import semua library yang dibutuhkan. Pada Producer, kita menggunakan 3 library, yaitu library time, library KafkaProducer pada Python dan library opencv.
  • Inisialisasi Kafka Producer dengan fungsi KafkaProducer(**configs), kita juga bisa mengatur konfigurasi dari Producer melalui fungsi tersebut. Di sini saya hanya mengatur konfigurasi alamat IP nya saja.
  • Read video dengan fungsi dari library opencv, yaitu cv2.VideoCapture()
  • Split video perframe dan lakukan perulangan while. Tiap frame dikonversi menjadi gambar dengan ekstensi file .jpg, kemudian konversi gambar kedalam bytes untuk dikirimkan ke dalam Topic Kafka.
  • Kirim hasil konversi ke Topic dengan fungsi .send(<nama topic>, <data>.

Buat file baru lagi dan beri nama consumer.py. Kemudian tuliskan kode berikut pada file tersebut.

from kafka import KafkaConsumerimport numpy as np
import cv2
consumer = KafkaConsumer('videoStream', bootstrap_servers=['localhost:9092'])def get_video():
for message in consumer:
frame = np.frombuffer(message.value, dtype='uint8')
yield cv2.imdecode(frame, cv2.IMREAD_COLOR)
for value in get_video():
cv2.imshow('frame', value)

if cv2.waitKey(1) == ord('q'):
break
cv2.destroyAllWindows()

Kode di atas berperan sebagai Consumer, Consumer mengambil data dari Topic yang sudah berisi data yang dikirimkan dari Producer. Data yang diambil dikonversi kembali menjadi tipe data yang sebenarnya. Kemudian dilakukan decode kedalam gambar yang berwarna.

  • Import semua library yang dibutuhkan, pada Consumer kita membutuhkan 3 library, yaitu library KafkaConsumer, library numpy, dan library opencv.
  • Inisialisasi Kafka Consumer dengan fungsi KafkaConsumer(**configs), kita juga bisa mengatur configurasi dari consumer melalui fungsi tersebut. Disini saya hanya mengatur config alamat IP dan nama topicnya saja.
  • Buatlah sebuah fungsi sebagai pengambil data dan mengkonversi data yang diambil menjadi gambar berwarna. Di sini fungsi tersebut saya beri nama get_video()
  • Konversi data yang diambil kedalam tipe data yang sebenarnya. Di sini, tipe data yang sebenarnya adalah ndarray, maka data yang diambil dalam bentuk bytes dikonversi terlebih dahulu menjadi ndarray dengan fungsi np.frombuffer(<data>, <tipe data>)
  • Decode data menjadi gambar berwarna menggunakan fungsi cv2.imdecode(), kemudian kembalikan value menggunakan keyword yield, keyword yield berfungsi untuk mengembalikan nilai pada sebuah generator
  • Tampilkan gambar tersebut dengan cara looping value pada fungsi get_video() dan tampilkan menggunakan fungsi dari library opencv, yaitu cv2.imshow()

Jalankan file consumer.py dengan mengikuti cara berikut

cd C:/kafka_2.12-2.4.1

Buka Terminal baru dan ganti direktori ke tempat penyimpanan file consumer.py. Di sini saya menyimpan file tersebut dalam direktori Kafka, kemudian jalankan consumer.py dengan perintah berikut

python consumer.py
Running consumer

Terakhir, buka Terminal baru lagi untuk menjalankan file producer.py. Cara menjalankannya sama dengan cara menjalankan file consumer.py. Pertama change direktori ke tempat file berada, dan jalankan filenya.

python producer.py
Running producer

Jika berhasil, akan muncul video hasil dari streaming menggunakan Kafka. Adapun hasilnya sebagai berikut:

Sekian pembahasan mengenai streaming video di Kafka menggunakan Python kali ini. Semoga bermanfaat, terima kasih.

Referensi

  1. https://github.com/mrxmamun/kafka-python-camera-stream

--

--