“Real-Time Music Data Streaming Using Apache Kafka: Leveraging Avenged Sevenfold Data from Spotify”

Ahmad Kamiludin
Data Engineering Indonesia
15 min readJul 16, 2024

Introduction

In today’s data-driven world, businesses are always looking for ways to use their data to make better decisions and stay ahead of the competition. Real-Time Data Streaming is crucial in this effort because it lets companies process and analyze data as soon as it’s created, providing instant insights and helping them make quick, informed decisions.

To illustrate the power of Real-Time Data Streaming, let’s delve into the world of Avenged Sevenfold’s music data sourced from Spotify. Using data from the Spotify API, we can uncover valuable insights into the listening patterns and popularity of this iconic band.

To transform raw data into actionable insights, we rely on a powerful trio of tools:

  • Apache Kafka: The backbone for real-time data streaming and processing.
  • Docker: The containerization platform ensures seamless and scalable deployment.
  • Looker Studio: The data visualization tool that brings insights to life.

By integrating these tools, we can create a streamlined data transformation pipeline. Apache Kafka handles task automation and real-time processing, Docker ensures seamless deployment, and Looker Studio provides intuitive visualization. This approach fosters collaboration among data engineers, analysts, and business users, ensuring that everyone works with the same high-quality Avenged Sevenfold data from Spotify to drive informed decisions.

Through this project, you’ll not only gain valuable insights into the music landscape of Avenged Sevenfold but also acquire practical experience in building and managing a real-time data streaming pipeline — a highly coveted skill set in today’s data-driven world. So, are you ready to unlock the secrets of Avenged Sevenfold’s music data? Let’s get started!

Setting the Configuration

Installing Apache Kafka with Docker

In this article, the version of Apache Kafka used is from the Confluent Platform. In this step, you begin by downloading a Docker Compose file. The docker-compose.yml file configures ports and Docker environment variables, such as the replication factor and listener properties for the Confluent Platform and its components. Furthermore, the Confluent Platform image specified in the file utilizes the new Kafka-based KRaft metadata service, which provides numerous benefits.

To get started, follow these steps:

Download or copy the contents of the Confluent Platform KRaft all-in-one Docker Compose file:

wget https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.6.1-post/cp-all-in-one-kraft/docker-compose.yml

Start the Confluent Platform stack in detached mode:

docker-compose up -d

Each Confluent Platform component will start in a separate container. Your output should resemble:

Creating broker ... done
Creating schema-registry ... done
Creating rest-proxy ... done
Creating connect ... done
Creating ksqldb-server ... done
Creating control-center ... done
Creating ksql-datagen ... done
Creating ksqldb-cli ... done

Verify that the services are up and running:

docker-compose ps

Your output should resemble:

Name                    Command               State                               Ports
--------------------------------------------------------------------------------------------------------------
broker /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp,:::9092->9092/tcp,
0.0.0.0:9101->9101/tcp,:::9101->9101/tcp
connect /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp,:::8083->8083/tcp, 9092/tcp
control-center /etc/confluent/docker/run Up 0.0.0.0:9021->9021/tcp,:::9021->9021/tcp
ksql-datagen bash -c echo Waiting for K ... Up
ksqldb-cli /bin/sh Up
ksqldb-server /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp,:::8088->8088/tcp
rest-proxy /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp,:::8082->8082/tcp
schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp,:::8081->8081/tcp

After a few minutes, if any component’s state isn’t “Up”, rerun the docker-compose up -d command, or try restarting the specific component using docker-compose restart <component-name>. For example:

docker-compose restart control-center

Create Kafka Topics

In the Confluent Platform, real-time streaming events are stored in a Kafka topic, an append-only log that serves as Kafka’s primary organizational unit. In this step, you will create three topics using the Control Center for Confluent Platform. Control Center offers features for constructing and monitoring production data pipelines and event streaming applications. The topics you will create are named artist-topic, album-topic, and track-topic.

Confluent Control Center allows you to create topics in the user interface with just a few clicks.

  • Go to the Control Center at http://localhost:9021. It may take a minute or two for Control Center to start and load.
  • Click on the controlcenter.cluster tile.
  • From the navigation menu, select Topics to view the list of topics. Click on + Add topic to begin creating the topics.
  • In the Topic name field, type artist-topic and click Create with defaults. Repeat these steps to create the album-topic and track-topic.

Create Topic Schema

We can use the Schema Registry feature in the Control Center to manage Confluent Platform topic schemas. With it, we can:

  1. Create, edit, and view schemas
  2. Compare schema versions
  3. Download schemas

The Schema Registry performs validations and compatibility checks on schemas. To create a Schema Registry, follow these steps:

  • Select a cluster
  • Click Topics on the menu
  • Select a topic

The topic overview page is displayed

  • Click the Schema tab. You are prompted to set a message value schema.
  • Click Set a Schema. The Schema editor appears pre-populated with the basic structure of an Avro schema to use as a starting point if desired.
  • Select a schema format type: Avro, JSON, or Protobuf. Here, we will use JSON.
  • Repeat the above steps if you want to create a Key Schema and also repeat the schema creation process above for the other topics.

Build Producer and Consumer

Let’s create the Python producer application by pasting the following code into a file producer.py.

import json
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from confluent_kafka import Producer, KafkaException
from datetime import datetime

def read_config():
config = {}
try:
with open("client.properties") as fh:
for line in fh:
line = line.strip()
if len(line) != 0 and line[0] != "#":
parameter, value = line.strip().split('=', 1)
config[parameter] = value.strip()
except FileNotFoundError:
print("Error: Configuration file 'client.properties' not found.")
except Exception as e:
print(f"Error reading configuration file: {e}")
return config

def get_artist_data(sp, artist_name):
try:
results = sp.search(q='artist:' + artist_name, type='artist')
artist = results['artists']['items'][0]
artist_data = {
'id': artist['id'],
'name': artist['name'],
'followers': artist['followers']['total'],
'genres': artist['genres'],
'popularity': artist['popularity'],
'timestamp': datetime.now().isoformat()
}
return artist_data
except Exception as e:
print(f"Error fetching artist data: {e}")
return None

def get_albums_data(sp, artist_id):
try:
albums = sp.artist_albums(artist_id, album_type='album')
albums_data = []
for album in albums['items']:
albums_data.append({
'id': album['id'],
'artist_id': artist_id,
'name': album['name'],
'release_date': album['release_date']
})
return albums_data
except Exception as e:
print(f"Error fetching albums data: {e}")
return []

def get_tracks_data(sp, album_id):
try:
tracks = sp.album_tracks(album_id)
tracks_data = []
for track in tracks['items']:
track_info = sp.track(track['id'])
audio_features = sp.audio_features(track['id'])[0]
tracks_data.append({
'id': track['id'],
'album_id': album_id,
'name': track['name'],
'popularity': track_info['popularity'],
'danceability': audio_features['danceability'],
'energy': audio_features['energy'],
'key': audio_features['key'],
'loudness': audio_features['loudness'],
'mode': audio_features['mode'],
'speechiness': audio_features['speechiness'],
'acousticness': audio_features['acousticness'],
'instrumentalness': audio_features['instrumentalness'],
'liveness': audio_features['liveness'],
'valence': audio_features['valence'],
'tempo': audio_features['tempo']
})
return tracks_data
except Exception as e:
print(f"Error fetching tracks data: {e}")
return []

def produce_to_kafka(data, config, topic):
producer = Producer(config)

def delivery_callback(err, msg):
if err:
print(f'ERROR: Message failed delivery: {err}')
else:
key = msg.key().decode('utf-8') if msg.key() else None
value = msg.value().decode('utf-8') if msg.value() else None
print(f"Produced event to topic {msg.topic()}: key = {key} value = {value}")

try:
producer.produce(topic, value=json.dumps(data).encode('utf-8'), callback=delivery_callback)
producer.flush()
print(f"Produced data to topic {topic}")
except KafkaException as e:
print(f"Error producing to Kafka: {e}")

def main():
config = read_config()

# Check if the configuration was loaded correctly
if not config:
print("Configuration not loaded. Exiting...")
return

# Set up Spotify credentials
client_id = 'your_spotify_client_id'
client_secret = 'your_spotify_client_secret'
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials(client_id=client_id, client_secret=client_secret))

artist_name = 'Avenged Sevenfold'
artist_data = get_artist_data(sp, artist_name)

if artist_data is None:
print("Failed to fetch artist data. Exiting...")
return

artist_id = artist_data['id']
albums_data = get_albums_data(sp, artist_id)
tracks_data = []
for album in albums_data:
tracks_data.extend(get_tracks_data(sp, album['id']))

# Produce artist data
produce_to_kafka(artist_data, config, "artist-topic")

# Produce albums data
for album in albums_data:
produce_to_kafka(album, config, "album-topic")

# Produce tracks data
for track in tracks_data:
produce_to_kafka(track, config, "track-topic")

if __name__ == '__main__':
main()

This Python script fetches data from Spotify about the band Avenged Sevenfold and sends it to Apache Kafka topics for real-time data processing.

  • Configuration Loading: The script starts by reading a configuration file named “client.properties’’. This file contains settings needed to connect to Kafka.
  • Spotify API Setup: The script uses the Spotify API to retrieve data. It sets up Spotify credentials using “client_id’’ and “client_secret”.
  • Fetching Data from Spotify:
    Artist Data: It searches for the artist “Avenged Sevenfold” and collects information such as the artist’s ID, name, number of followers, genres, popularity, and a timestamp of when the data was fetched.
    Albums Data: Using the artist’s ID, it retrieves all albums by the artist, including each album’s ID, name, and release date.
    Tracks Data: For each album, it fetches all tracks, gathering details like track ID, name, popularity, and various audio features (e.g., danceability, energy, tempo).
  • Producing Data to Kafka:
    Kafka Producer: The script sets up a Kafka producer using the configuration loaded earlier.
    Sending Data: It sends the collected artist, album, and track data to different Kafka topics (`artist-topic`, `album-topic`, and `track-topic`). It includes a callback function to confirm whether the data was successfully sent or if an error occurred.
  • Main Function Execution: The script’s main function orchestrates the entire process: loading configuration, setting up Spotify API credentials, fetching data, and producing it to Kafka topics. If the configuration fails to load or the artist data cannot be fetched, the script exits. Otherwise, it proceeds to collect and send the data to the specified Kafka topics.

After that, let’s also create the Python consumer application by pasting the following code into a file consumer.py.

from confluent_kafka import Consumer, KafkaException, KafkaError
import json

def read_config():
config = {}
try:
with open("client.properties") as fh:
for line in fh:
line = line.strip()
if len(line) != 0 and line[0] != "#":
parameter, value = line.strip().split('=', 1)
config[parameter] = value.strip()
except FileNotFoundError:
print("Error: Configuration file 'client.properties' not found.")
except Exception as e:
print(f"Error reading configuration file: {e}")
return config

def consume_from_kafka(config, topics):
# Update configuration for consumer
config.update({
'group.id': 'my-consumer-group',
'auto.offset.reset': 'earliest'
})

consumer = Consumer(config)

def print_assignment(consumer, partitions):
print('Assignment:', partitions)

consumer.subscribe(topics, on_assign=print_assignment)
topic_messages = {topic: False for topic in topics}

try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
raise KafkaException(msg.error())
data = json.loads(msg.value().decode('utf-8'))
print(f"Consumed event from topic {msg.topic()}: {json.dumps(data, indent=2)}")
topic_messages[msg.topic()] = True

# Check if all topics have been consumed
if all(topic_messages.values()):
print("All topics have been successfully consumed.")
break
except KeyboardInterrupt:
pass
finally:
consumer.close()

def main():
config = read_config()

# Check if the configuration was loaded correctly
if not config:
print("Configuration not loaded. Exiting...")
return

# List of topics to consume from
topics = ['artist-topic', 'album-topic', 'track-topic']

consume_from_kafka(config, topics)

if __name__ == '__main__':
main()

This code consists of three primary functions for consuming messages from Kafka: “read_config”, “consume_from_kafka”, and “main”.

  • The “read_config” function reads the “client.properties” configuration file into a dictionary, handling errors if the file is not found or if there are issues reading it.
  • The “consume_from_kafka” function consumes messages from specified Kafka topics. It updates the consumer configuration, creates a “Consumer” object, and sets up a callback for partition assignments. It then enters a loop to poll for messages, handle them, and track which topics have been consumed, stopping when all topics are processed or upon receiving a “KeyboardInterrupt”.
  • The “main” function reads the configuration, checks for successful loading, define the topics to consume, and calls “consume_from_kafka” to start the process. The code executes “main” if the file is run as the main program, ensuring Kafka consumption only starts when not imported as a module.

Start Streaming the Data

Set Up MongoDB Connection with Kafka Connect

Before starting the data streaming process, it is essential to configure the connection between MongoDB and Kafka Connect to ensure that data is directly streamed to MongoDB once processed. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems. It makes it simple to quickly define connectors that move large data sets in and out of Kafka. An export connector can deliver data from Kafka topics into secondary indexes like Elasticsearch, into batch systems such as Hadoop for offline analysis, or into document databases like MongoDB for real-time data storage and querying.

To move Kafka topics to MongoDB, we use specific connectors that facilitate this process. The MongoDB Kafka Connector is designed to stream data from Kafka to MongoDB seamlessly. Here are the steps to set up the connection:

  • Install the MongoDB Kafka Connector using the Confluent Hub Client. You can find the connector and follow the installation steps here: MongoDB Kafka Connector. Follow the provided instructions to complete the installation.
  • Once the MongoDB Kafka Connector is installed, restart your Kafka services. This will ensure that the connector is properly integrated into your Kafka environment.
  • After restarting, navigate to the Control Center, specifically to the Connect tab. You should see the MongoDB Kafka Connector listed as an available connector. In the Connect tab, click connect-default, then click Add connector.
  • After that, you will see the MongoDB Kafka Connector listed in the Control Center. Since we aim to transfer topics from Kafka to MongoDB, select the MongoSinkConnector from the available options.
  • Use the following code to create a connector that moves data from Kafka topics to MongoDB. Run this code in your command-line interface (CLI):
curl -X PUT -H "Content-Type: application/json" --data '{
"name": "(connector_name)",
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"topics": "TOTAL_TRACK",
"connection.uri": "(your_mongodb_connection_uri)",
"database": "av7-db",
"collection": "total_track",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"errors.log.enable": "true",
"errors.tolerance": "all"
}' http://localhost:8083/connectors/(connector_name)/config

In the code above, replace the placeholders (connector_name) with your desired connector name, and (your_mongodb_connection_uri) with your MongoDB connection URI. This command configures a connector to move data from the Kafka topic, namely “TOTAL_TRACK” (this is a new topic created from processing the “track-stream” topic) to the MongoDB database (av7-db) and the collection “total_track”. The configuration includes settings for the MongoDB connection URI, the database and collection names, and the data conversion format. Repeat this step for the other Kafka topics you want to move to MongoDB. Here, I will create connectors for other new topics: “TOTAL_ALBUM” (resulting from processing the “album_stream” topic) and “POPULAR_TRACK” (resulting from processing the “track_stream” topic). These new topics will be created during data processing using ksqlDB.

Produce and Consume Topics to Kafka

Execute the producer and consumer code that was previously developed using the Command Line interface.

python producer.py
python consumer.py

Upon successful consumption of all topics, a notification will appear, similar to the provided image.

Verify in Kafka through the Control Center that all topics have been successfully consumed. If successful, the data will be displayed as shown in the image below.

Stream Processing and Direct Streaming from ksqlDB to MongoDB

ksqlDB is a specialized database designed to assist developers in creating stream processing applications using Apache Kafka. Through the ksqlDB user interface available in the Confluent Control Center, you can build event streaming applications. Control Center allows you to create Kafka topics and develop persistent queries using the ksqlDB query editor. In this step, we will create streams and tables using familiar SQL syntax. By registering a stream or table on a topic, you can utilize the stream or table in SQL statements.

  • In the navigation menu, select ksqlDB.
  • Click on the ksqldb1 cluster to open its page. There are tabs for editing SQL statements and monitoring the streams and tables you create.
  • Copy the following SQL into the editor window. This statement will create streams named “album_stream”.
CREATE STREAM album_stream (
id STRING,
artist_id STRING,
name STRING,
release_date STRING
) WITH (
KAFKA_TOPIC='album-topic',
VALUE_FORMAT='JSON'
);

Repeat the above steps to create “track_stream” and “popular_track” based on this SQL code.

CREATE STREAM track_stream (
id STRING,
album_id STRING,
name STRING,
popularity INT,
danceability DOUBLE,
energy DOUBLE,
key INT,
loudness DOUBLE,
mode INT,
speechiness DOUBLE,
acousticness DOUBLE,
instrumentalness DOUBLE,
liveness DOUBLE,
valence DOUBLE,
tempo DOUBLE
) WITH (
KAFKA_TOPIC='track-topic',
VALUE_FORMAT='JSON'
);
CREATE STREAM popular_track AS
SELECT
name,
popularity
FROM track_stream
WHERE popularity > 60;
  • Click Run query to execute the statement. In the result window, your output should resemble:
  • Use a SELECT query to verify that data is flowing through your stream.
select * from ALBUM_STREAM EMIT CHANGES;
  • Copy the following SQL into the editor and click Run query. Your output should resemble:

Click Stop to terminate the SELECT query.

  • Next, copy the following SQL into the editor window and click Run query. This statement will create tables named “total_album” and “total_track”.
CREATE TABLE total_album AS
SELECT
artist_id,
COUNT(*) AS album_count
FROM album_stream
GROUP BY artist_id;
CREATE TABLE total_track AS
SELECT
'id' AS id,
COUNT(*) AS track_count
FROM track_stream
GROUP BY 'id';
  • Use a SELECT query to verify that data is flowing through your table. Copy the following SQL into the editor and click Run query.
select * from TOTAL_ALBUM EMIT CHANGES;

Click Stop when done. All new topics resulting from the above processing using KsqlDB (TOTAL_TRACK, TOTAL_ALBUM, and POPULAR_TRACK) will automatically be stored in our MongoDB database.

Create a Dashboard using Looker Studio

To create a dashboard using Looker Studio with data from MongoDB, we utilize CData Connect Cloud to bridge the connection between MongoDB and Looker Studio. CData Connect Cloud is a cloud-based data connectivity service that provides real-time access to cloud and on-premises data sources. It enables users to connect their data sources to various BI tools, applications, and services seamlessly. With CData Connect Cloud, you can access data from multiple sources, including MongoDB, and use it in Looker Studio to create comprehensive dashboards and reports. Steps to Connect MongoDB to Looker Studio

Sign up for a CData Connect Cloud account if you don’t already have one. Next, log in to your CData Connect Cloud account.

In the CData Connect Cloud dashboard, navigate to the “Connections” tab. Click on “Add Connection” and select “MongoDB” from the list of available connectors.

Provide the necessary connection details for your MongoDB instance, such as the server, database name, and any required authentication credentials.

Test the connection to ensure it is successful, then save the connection. After successfully connecting to MongoDB, navigate to the “Client Tools” tab in CData Connect Cloud. Select Looker Studio from the list of client tools.

Follow the instructions to authorize and enable the connection between CData Connect Cloud and Looker Studio. After that, create a Dashboard based on the data in MongoDB, which is already connected to CData Connect Cloud. This time, we will not demonstrate how to create the Dashboard. Here is a simple example of a Dashboard we created earlier.

Conclusion

In this article, we explored a comprehensive workflow for real-time music data streaming using Apache Kafka, focusing on data from Avenged Sevenfold sourced from Spotify. We delved into setting up the Confluent Platform, creating and managing Kafka topics, and utilizing ksqlDB to process and transform streaming data with SQL-like queries. Following this, we used Kafka Connect to seamlessly transfer data to MongoDB. Furthermore, we demonstrated how to visualize this data using Looker Studio, connected via CData Connect Cloud. By leveraging these tools, we can efficiently handle and analyze real-time streaming data, providing valuable insights and dynamic visualizations for enhanced data-driven decision-making. Whether for personal projects or professional applications, this approach exemplifies the power and flexibility of modern data streaming technologies.

If you enjoyed this article or found it helpful, please follow my Medium account for more insights and tutorials on data streaming and other tech topics. Your support is greatly appreciated! For access to the complete source code, visit our GitHub repository. Thank you for reading and see you next time!

--

--