Reddit Asynchronous Parser (Part 1/2)

Javidan Karimli
9 min read · Jun 23, 2024


In recent years, the importance of human-generated data has grown significantly with the rise of artificial intelligence, and social media platforms are where much of this data resides. Although these platforms provide access to detailed information through their APIs, that access often comes with limitations such as rate limits and pricing. So, in this series of posts, we'll develop a system that collects public posts from specific Reddit subreddits while working around these platform limitations. To store the parsed data effectively, a well-thought-out architecture is necessary to ensure scalability and flexibility.

Services and Overall Architecture

We implemented a streaming data architecture to address the dynamic and unstructured nature of the retrieved post data. Considering how this data evolves and its lack of a fixed schema, MongoDB emerged as the optimal choice for final storage. However, because each subreddit listing only exposes about 1,000 posts at a time, collection has to run over prolonged periods, so we integrated a PostgreSQL database for process management and for informed decision-making based on historical runs.

To orchestrate the streaming process effectively, we employed Apache Kafka and Apache Spark Structured Streaming. While Apache Kafka offers robust stream processing capabilities, it lacks built-in visualization and system-control functionality. To address this, we augmented the architecture with Apache Zookeeper for cluster coordination and Confluent Control Center for visualizing and managing the system. Here is the overall system architecture for efficiently parsing and storing the data.

Apache Kafka Cluster

Apache Kafka serves as a distributed data streaming platform for real-time processing, enabling the publication, subscription, storage, and processing of records. In our setup, Kafka facilitates the transfer of parsed data to Apache Spark for processing.

Deploying an Apache Kafka cluster entails more than just the Kafka broker. Apache Zookeeper manages cluster coordination, ensuring smooth task execution and configuration management.

A schema registry keeps producers and consumers in agreement about the structure of Kafka topic messages, maintaining schema consistency. Additionally, Confluent Control Center offers visual monitoring of Kafka topics and messages, enhancing operational oversight.
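
To make the flow concrete, here is a minimal sketch, not the project's actual producer code, of how a parsed post could be published to the default "Reddit" topic once the cluster defined below is running. It assumes the kafka-python package and the external listener on localhost:9092 that the compose file exposes; the post fields are hypothetical.

# A minimal sketch (not the project's actual producer) of publishing a parsed
# post to the "Reddit" topic, assuming kafka-python and the external listener
# exposed on localhost:9092 by the compose file below.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Hypothetical post fields, for illustration only.
parsed_post = {
    "subreddit": "dataengineering",
    "title": "Example post title",
    "author": "some_user",
    "created_utc": 1719100000,
}

producer.send("Reddit", value=parsed_post)
producer.flush()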

zookeeper:
  image: confluentinc/cp-zookeeper:7.4.0
  hostname: zookeeper
  container_name: zookeeper
  ports:
    - "2181:2181"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
  healthcheck:
    test: ['CMD', 'bash', '-c', "echo 'ruok' | nc localhost 2181"]
    interval: 10s
    timeout: 5s
    retries: 5
  networks:
    - Reddit

broker:
  image: confluentinc/cp-server:7.4.0
  hostname: broker
  container_name: broker
  depends_on:
    zookeeper:
      condition: service_healthy
  ports:
    - "9092:9092"
    - "9101:9101"
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${KAFKA_INSIDE_HOST}:${KAFKA_INSIDE_PORT},PLAINTEXT_HOST://${KAFKA_EXTERNAL_HOST}:${KAFKA_EXTERNAL_PORT}
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_JMX_PORT: 9101
    KAFKA_JMX_HOSTNAME: localhost
    KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: ${KAFKA_INSIDE_HOST}:${KAFKA_INSIDE_PORT}
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'false'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  networks:
    - Reddit
  healthcheck:
    test: ["CMD", "bash", "-c", 'nc -z localhost 9092']
    interval: 10s
    timeout: 5s
    retries: 5

schema-registry:
  image: confluentinc/cp-schema-registry:7.4.0
  hostname: schema-registry
  container_name: schema-registry
  depends_on:
    broker:
      condition: service_healthy
  ports:
    - "8081:8081"
  environment:
    SCHEMA_REGISTRY_HOST_NAME: schema-registry
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: '${KAFKA_INSIDE_HOST}:${KAFKA_INSIDE_PORT}'
    SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  networks:
    - Reddit
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:8081/"]
    interval: 30s
    timeout: 10s
    retries: 5

control-center:
  image: confluentinc/cp-enterprise-control-center:7.4.0
  hostname: control-center
  container_name: control-center
  depends_on:
    broker:
      condition: service_healthy
    schema-registry:
      condition: service_healthy
  ports:
    - "9021:9021"
  environment:
    CONTROL_CENTER_BOOTSTRAP_SERVERS: '${KAFKA_INSIDE_HOST}:${KAFKA_INSIDE_PORT}'
    CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    CONTROL_CENTER_REPLICATION_FACTOR: 1
    CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
    CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
    CONFLUENT_METRICS_TOPIC_REPLICATION: 1
    CONFLUENT_METRICS_ENABLE: 'false'
    PORT: 9021
  networks:
    - Reddit
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:9021/health"]
    interval: 30s
    timeout: 10s
    retries: 5

As previously mentioned, our deployment comprises four services — Zookeeper, the Kafka broker, the schema registry, and Control Center — defined in a docker-compose file. Despite their apparent complexity, these services have straightforward configurations. Notably, for development purposes, our Kafka broker does not use any authentication.

Variables such as ${KAFKA_INSIDE_HOST} are environment variables, providing a single point of configuration for both the application and the deployment. We will cover them in more detail when deploying the whole application.

Following deployment, you can verify that everything is running by checking the operational status of the services, for example through the Control Center UI:

http://localhost:9021
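
If you prefer checking from the command line instead of the browser, a small script like the following mirrors the healthchecks defined in the compose file. It is a sketch that assumes the requests package is installed and that the default ports from the .env file are used.

# Quick sanity check mirroring the compose healthchecks above; assumes the
# requests package and the default ports used throughout this post.
import socket
import requests

def port_open(host: str, port: int) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=5):
            return True
    except OSError:
        return False

print("broker          :", port_open("localhost", 9092))
print("zookeeper       :", port_open("localhost", 2181))
print("schema-registry :", requests.get("http://localhost:8081/").status_code)
print("control-center  :", requests.get("http://localhost:9021/health").status_code)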

Apache Spark

Apache Spark is widely used for both streaming and batch processing in Big Data environments. Our Apache Spark deployment adopts a minimal setup comprising one spark-master node and two spark-worker nodes. The spark-master node is allocated 2 CPU cores and 4 GB of RAM, while each worker node is assigned 1 CPU core and 1 GB of RAM.
Although the volume of parsed data is expected to be relatively low, the simplicity of this setup means you can effortlessly scale the application as needed.

To enable PySpark applications within the Apache Spark instance, the installation of py4j is necessary. Furthermore, our application requires access to the ‘/temp/checkpoint’ path to save Spark Streaming checkpoints, ensuring continuity from the previous state. Therefore, a custom spark image has been created as follows.

FROM bitnami/spark:3.4.1

USER root

# Create the checkpoint directory used by Spark Structured Streaming
RUN mkdir -p /temp/checkpoint

# Install additional Python libraries required by the PySpark application
RUN pip install py4j

To create a custom image, execute the following command, ensuring that the tag name is set to ‘custom_spark_app’. This tag will be referenced within the docker-compose file for deploying Spark instances.

docker build . -f ./infrastructure/Apache_Spark.Dockerfile -t custom_spark_app 

Once the custom image is created, we proceed to deploy the Apache Spark instances. To keep development smooth and avoid excessive resource usage, I've restricted the Spark master to 4 GB of RAM and 2 CPU cores; otherwise it tends to consume all available system capacity when under processing pressure. This prevents potential issues caused by excessive resource consumption, and I strongly advise limiting the resources in advance.

You can access the Spark UI by navigating to http://localhost:9090/ in your web browser. Additionally, we've mounted ../spark_src/scripts from the host to /temp/scripts inside the containers, so PySpark files can be placed there directly and executed via docker exec. This streamlined setup facilitates the efficient execution of Spark jobs and simplifies development tasks. The Spark services are also attached to the 'Reddit' Docker network.

spark-master:
  image: custom_spark_app
  command: bin/spark-class org.apache.spark.deploy.master.Master
  ports:
    - "9090:8080"
    - "7077:7077"
  volumes:
    - ../spark_src/scripts:/temp/scripts
  deploy:
    resources:
      limits:
        cpus: '2'
        memory: '4G'
  environment:
    - SPARK_LOCAL_IP=spark-master
    - SPARK_WORKLOAD=master
  networks:
    - Reddit

spark-worker-a:
  image: custom_spark_app
  command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  ports:
    - "9089:8080"
    - "7000:7000"
  depends_on:
    - spark-master
  environment:
    - SPARK_MASTER=spark://spark-master:7077
    - SPARK_WORKER_CORES=1
    - SPARK_WORKER_MEMORY=1G
    - SPARK_DRIVER_MEMORY=1G
    - SPARK_EXECUTOR_MEMORY=1G
    - SPARK_WORKLOAD=worker
    - SPARK_LOCAL_IP=spark-worker-a
  volumes:
    - ../spark_src/scripts:/temp/scripts
  networks:
    - Reddit

spark-worker-b:
  image: custom_spark_app
  command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  ports:
    - "9088:8080"
    - "7001:7000"
  depends_on:
    - spark-master
  environment:
    - SPARK_MASTER=spark://spark-master:7077
    - SPARK_WORKER_CORES=1
    - SPARK_WORKER_MEMORY=1G
    - SPARK_DRIVER_MEMORY=1G
    - SPARK_EXECUTOR_MEMORY=1G
    - SPARK_WORKLOAD=worker
    - SPARK_LOCAL_IP=spark-worker-b
  volumes:
    - ../spark_src/scripts:/temp/scripts
  networks:
    - Reddit
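
To illustrate how a script placed in /temp/scripts might use the mount and the /temp/checkpoint path described above, here is a minimal PySpark Structured Streaming sketch. It is not the actual application code (that comes in Part 2) and assumes the Spark–Kafka connector plus the topic and internal listener names used elsewhere in this post.

# A minimal sketch (not the actual job from Part 2) of a PySpark Structured
# Streaming script that could be dropped into /temp/scripts and run via
# docker exec; assumes the Spark-Kafka connector is available at submit time.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("reddit-stream-sketch")
    .getOrCreate()
)

# Read raw messages from the "Reddit" topic via the internal listener.
raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "Reddit")
    .load()
)

# Print the message values to the console, checkpointing to /temp/checkpoint
# so the stream can resume from its previous state after a restart.
query = (
    raw.selectExpr("CAST(value AS STRING) AS value")
    .writeStream
    .format("console")
    .option("checkpointLocation", "/temp/checkpoint")
    .start()
)

query.awaitTermination()

Such a script could then be submitted from inside the spark-master container with docker exec and spark-submit, pulling in the Kafka connector with --packages (for example org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 to match this Spark version).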

PostgreSQL

We've deployed a single instance of PostgreSQL to manage and orchestrate the application. It's crucial to mount a storage path in the docker-compose file under the volumes section. We do this by declaring the path as a variable in the .env file, which ensures permanent storage of application data. Without this configuration, restarting the container would result in the loss of all information.

Users also have the option to specify the port within the .env file, although it's highly recommended to use port 5432, as it's the default port for PostgreSQL instances. This simplifies setup and ensures consistency with common configurations. Users can also choose a database to create; it will be used by the application by default.

postgres:
  image: postgres:latest
  environment:
    POSTGRES_USER: ${POSTGRES_REDDIT_USERNAME}
    POSTGRES_PASSWORD: ${POSTGRES_REDDIT_PASSWORD}
    POSTGRES_DB: ${POSTGRES_DATABASE}
  volumes:
    - ${POSTGRES_DATA_STORAGE_PATH}:/var/lib/postgresql/data
  ports:
    - "${POSTGRES_PORT}:5432"
  networks:
    - Reddit
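
As a rough illustration of how the application might talk to this instance for process bookkeeping, here is a short sketch assuming the psycopg2-binary and python-dotenv packages and the variable names from the .env file shown later in this post; the actual orchestration logic is covered in Part 2.

# A minimal sketch of connecting to the PostgreSQL instance above for process
# bookkeeping; assumes psycopg2-binary, python-dotenv, and the variable names
# from the .env file shown later in this post.
import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()  # read credentials from the .env file

conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST", "localhost"),
    port=os.getenv("POSTGRES_PORT", "5432"),
    dbname=os.getenv("POSTGRES_DATABASE"),
    user=os.getenv("POSTGRES_REDDIT_USERNAME"),
    password=os.getenv("POSTGRES_REDDIT_PASSWORD"),
)

with conn, conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])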

MongoDB

Due to the dynamic and changeable nature of the parsed data, we've opted for a NoSQL database to store it efficiently, and MongoDB has been chosen as the document-based solution for this purpose. This deployment is primarily geared towards development, so authentication has been excluded from the current setup; however, it can be seamlessly incorporated in future releases.

A single instance of MongoDB has been deployed to serve as the primary storage option for the application. Just like with PostgreSQL, it’s essential to include a storage path in the .env file to ensure the permanent storage of application data. This ensures that data persists even if the container is restarted or redeployed, maintaining consistency and reliability.

mongodb:
  image: mongo:latest
  container_name: my-mongodb
  environment:
    MONGO_INITDB_ROOT_USERNAME:
    MONGO_INITDB_ROOT_PASSWORD:
  ports:
    - "${MONGO_PORT}:27017"
  volumes:
    - ${MONGO_DATA_STORAGE_PATH}:/data/db
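
As with PostgreSQL, here is a small sketch of how a parsed post could be written to and counted in MongoDB. It assumes the pymongo package, no authentication (as configured above), and a connection from the host through the published port; the document fields are hypothetical.

# A minimal sketch of writing a parsed post into MongoDB; assumes pymongo,
# no authentication, and the MONGO_* variables from the .env file below.
import os
from pymongo import MongoClient

client = MongoClient("localhost", int(os.getenv("MONGO_PORT", "27017")))
db = client[os.getenv("MONGO_DATABASE", "Reddit")]

# Hypothetical document shape, for illustration only.
db.posts.insert_one({
    "subreddit": "dataengineering",
    "title": "Example post title",
    "created_utc": 1719100000,
})

print(db.posts.count_documents({}))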

You can start deploying the application by cloning the repo from GitHub (check out the GitHub repository):

git clone https://github.com/Cavidan-oss/reddit_parser.git

After navigating into the cloned repo, you can start deploying the application.

cd reddit_parser

As emphasized earlier, declaring environment variables is crucial, both to keep sensitive information out of the deployment files and to establish a single point of credential access for the application. This approach enables seamless configuration adjustments, such as modifying ports or the default Kafka topic, to align with your preferences. Below is an example of a .env file:

# Kafka Credentials
DEFAULT_KAFKA_TOPIC=Reddit

KAFKA_EXTERNAL_HOST=localhost
KAFKA_EXTERNAL_PORT=9092

KAFKA_INSIDE_HOST=broker
KAFKA_INSIDE_PORT=29092

# MongoDB Credentials
MONGO_HOST=host.docker.internal
MONGO_DATABASE=Reddit
MONGO_PORT=27017

MONGO_DATA_STORAGE_PATH=/c/path/to/storage # Absolute path which will be mounted

# PostgreSQL Credentials
POSTGRES_HOST=localhost
POSTGRES_PORT=6432
POSTGRES_DATABASE=RedditParser
POSTGRES_REDDIT_USERNAME=Admin
POSTGRES_REDDIT_PASSWORD=admin

POSTGRES_DATA_STORAGE_PATH=/c/path/to/storage

# App Related
WEB_USER_AGENT=

Defining WEB_USER_AGENT is vital for parsing a subreddit in the background without opening a browser instance. From my experience, you can run the application without WEB_USER_AGENT in open-browser mode, but in closed-browser mode it causes a Forbidden error.
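
As a rough illustration of why the user agent matters, the sketch below (assuming the requests package and Reddit's public JSON listing endpoint, not the project's actual parsing code) sends a background request with the configured header; without a descriptive User-Agent, such requests are commonly rejected with 403 Forbidden.

# Rough illustration (not the project's parser) of the effect of WEB_USER_AGENT
# on background requests to Reddit's public JSON listing endpoint.
import os
import requests

headers = {"User-Agent": os.getenv("WEB_USER_AGENT", "")}

resp = requests.get(
    "https://www.reddit.com/r/dataengineering/new.json?limit=5",
    headers=headers,
    timeout=10,
)
print(resp.status_code)  # typically 200 with a proper User-Agent, often 403 without one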

Keep in mind that all instances must connect to the Docker network named “Reddit”. You can achieve this by executing the following command:

docker network create Reddit

In the previous examples, all containers were connected to the Docker network named “Reddit”. This network creation command ensures seamless communication and coordination among the various components of the application.

Here is a sample Docker Compose file that includes 9 different containers. Keep in mind that this operation is resource-intensive and could use a large amount of RAM, leading to high CPU utilization as well. To reduce resource utilization, you can consider removing the control-center and schema-registry containers. However, this could potentially cause some unforeseen issues. Additionally, you may also consider reducing the number of Spark workers to 1 and adjusting the resource allocation limits for the Spark master.

version: '3.3'

services:
  spark-master:
    image: custom_spark_app
    command: bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "9090:8080"
      - "7077:7077"
    volumes:
      - ../spark_src/scripts:/temp/scripts
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: '4G'
    environment:
      - SPARK_LOCAL_IP=spark-master
      - SPARK_WORKLOAD=master
    networks:
      - Reddit

  spark-worker-a:
    image: custom_spark_app
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    ports:
      - "9089:8080"
      - "7000:7000"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=spark-worker-a
    volumes:
      - ../spark_src/scripts:/temp/scripts
    networks:
      - Reddit

  spark-worker-b:
    image: custom_spark_app
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    ports:
      - "9088:8080"
      - "7001:7000"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=spark-worker-b
    volumes:
      - ../spark_src/scripts:/temp/scripts
    networks:
      - Reddit

  postgres:
    image: postgres:latest
    environment:
      POSTGRES_USER: ${POSTGRES_REDDIT_USERNAME}
      POSTGRES_PASSWORD: ${POSTGRES_REDDIT_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DATABASE}
    volumes:
      - ${POSTGRES_DATA_STORAGE_PATH}:/var/lib/postgresql/data
    ports:
      - "${POSTGRES_PORT}:5432"
    networks:
      - Reddit

  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: ['CMD', 'bash', '-c', "echo 'ruok' | nc localhost 2181"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - Reddit

  broker:
    image: confluentinc/cp-server:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${KAFKA_INSIDE_HOST}:${KAFKA_INSIDE_PORT},PLAINTEXT_HOST://${KAFKA_EXTERNAL_HOST}:${KAFKA_EXTERNAL_PORT}
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: ${KAFKA_INSIDE_HOST}:${KAFKA_INSIDE_PORT}
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      - Reddit
    healthcheck:
      test: ["CMD", "bash", "-c", 'nc -z localhost 9092']
      interval: 10s
      timeout: 5s
      retries: 5

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      broker:
        condition: service_healthy
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: '${KAFKA_INSIDE_HOST}:${KAFKA_INSIDE_PORT}'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    networks:
      - Reddit
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8081/"]
      interval: 30s
      timeout: 10s
      retries: 5

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      broker:
        condition: service_healthy
      schema-registry:
        condition: service_healthy
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: '${KAFKA_INSIDE_HOST}:${KAFKA_INSIDE_PORT}'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      PORT: 9021
    networks:
      - Reddit
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9021/health"]
      interval: 30s
      timeout: 10s
      retries: 5

  mongodb:
    image: mongo:latest
    container_name: my-mongodb
    environment:
      MONGO_INITDB_ROOT_USERNAME:
      MONGO_INITDB_ROOT_PASSWORD:
    ports:
      - "${MONGO_PORT}:27017"
    volumes:
      - ${MONGO_DATA_STORAGE_PATH}:/data/db

networks:
  Reddit:
    external: true

Assuming you've created an "infrastructure" folder and a ".env" file inside the parent folder, the project structure looks like this:

Reddit Asynchronous Parser
├── .env
├── infrastructure
│   └── all_docker_compose.yaml
├── src
└── spark_src

Then you can deploy the application using Docker Compose as shown below. One important point is the --env-file flag, which tells Docker Compose to read environment variables from the .env file; these variables are referenced inside the docker-compose file as ${variable_name}. Bear in mind that you need to build the custom Spark image first in order to deploy the whole application successfully.

docker-compose -f infrastructure/all_docker_compose.yaml --env-file .env up -d

Hopefully, if everything goes well, all containers will show as up and healthy, indicating that the deployment works as expected.

After deploying the infrastructure, you are ready to parse some data and store it in MongoDB using Spark streaming. How the application itself works will be explained in the second part of this post. You can check it out here -> Part 2

