Reddit Asynchronous Parser (Part 2/2)

Javidan Karimli
15 min readJun 23, 2024

--

In the first part of this blog, you have already deployed the necessary infrastructure to run the application by accomplishing the steps given. If you don't please follow the instructions in the given post(Link to first part). Our application is designed to parse data from a given subreddit and push the parsed information to defined Kafka topics asynchronously. On the other end of Kafka, Apache Spark Structured Streaming is employed to push data into specific MongoDB instances with a small amount of transformation. So there are 2 parts of the application that need to be triggered in order to fully start the application. The parsing part and spark streaming part are not dependent on each other as they can run independently. They have 2 different command line interfaces to interact.

Architecture of Reddit Scraper
Architecture for Reddit Async Scraper

Common Part

In both components, various elements work together to maintain integrity and centralize access points for the entire process. A key element is the Config component, which manages the distribution of external service credentials throughout the application. The architecture of the Config component is straightforward: it reads environmental variables from the .env file located in the parent folder and provides an interface through Python getters and setters. This design allows other components to easily import the Config class and access all necessary services seamlessly.

Currently, the Config class is implemented with a simple design. It consists of predefined properties that provide interfaces through getters and setters. However, the setter functionality is not used throughout the application and is restricted to prevent any potential issues.

from dotenv import load_dotenv, find_dotenv
import os

load_dotenv(find_dotenv())


class Config:
__conf = {
"POSTGRES_USER" : os.environ.get( 'POSTGRES_REDDIT_USERNAME' ),
"POSTGRES_PASSWORD" : os.environ.get('POSTGRES_REDDIT_PASSWORD'),
"POSTGRES_HOST" : os.environ.get('POSTGRES_HOST'),
"POSTGRES_PORT" : os.environ.get('POSTGRES_PORT'),
"POSTGRES_DATABASE": os.environ.get('POSTGRES_DATABASE') ,
"KAFKA_HOST" : os.environ.get('KAFKA_EXTERNAL_HOST') ,
"KAFKA_PORT" : os.environ.get('KAFKA_EXTERNAL_PORT') ,
"DEFAULT_KAFKA_TOPIC" : os.environ.get('DEFAULT_KAFKA_TOPIC'),
"KAFKA_INSIDE_HOST" : os.environ.get('KAFKA_INSIDE_HOST'),
"KAFKA_INSIDE_PORT" : os.environ.get('KAFKA_INSIDE_PORT'),
"MONGO_HOST" : os.environ.get('MONGO_HOST') ,
"MONGO_PORT" : os.environ.get("MONGO_PORT") ,
"MONGO_USERNAME" : os.environ.get('MONGO_USERNAME') ,
"MONGO_PASSWORD" : os.environ.get('MONGO_PASSWORD') ,
"MONGO_DATABASE" : os.environ.get('MONGO_DATABASE') ,
"WEB_USER_AGENT" : os.environ.get('WEB_USER_AGENT')
}
__setters = []

@staticmethod
def get(name):
return Config.__conf.get(name)

@staticmethod
def set(name, value):
if name in Config.__setters:
Config.__conf[name] = value
else:
raise NameError("Name not accepted in set() method")

As it is seen from the code all the variables are loaded using the env file located inside the parent folder and it is a location where user-defined variables are connected to the application.

Scraper Architecture

Scrapers

In our system, there are two main Scrapers, each serving distinct roles.

RedditScraper

The first, RedditScraper, operates at a lower level, handling the actual scraping processes. It determines which posts need parsing, excludes sponsored posts, and efficiently yields data in asynchronous chunks for optimal performance. Notably, this Scraper solely retrieves data from specified subreddits without storing any information on external services. To add context and purpose to the retrieved data, it relies on higher-level controller services.

RedditKafkaProducer

On the other hand, RedditKafkaProducer operates at a higher level and focuses on interfacing with external services. Derived from RedditScraper, this Scraper functions more like a controller, connecting with services such as PostgreSQL and Kafka Broker. It orchestrates processes based on the provided context, with all activities logged and managed by PostgreSQL. All retrieved data is sent through specified Kafka topics for further use. Additionally, a command-line interface (CLI) facilitates interaction with the system, ensuring a seamless user experience.

Utils

The application utilizes various utility components to automate processes involving external services. Currently, there are four different utilities and three decorators:

External Services Handlers

There are three external service handlers, but the application does not use MongoDBHelper at this stage since MongoDB is the destination for the data and we are not yet dealing with data enhancement and manipulation.

SQL Handler

Before delving into AsyncPostgreSQLHelper, it’s important to mention the SQL Handler. Handling numerous SQL queries can cause visual clutter, especially when dealing with lengthy queries. To manage this, a SQL class was created with a single method named read_sql(). This method reads a given SQL file and returns its content, ensuring all SQL queries are conveniently read through this method.

AsyncPostgreSQLHelper

The AsyncPostgreSQLHelper is embedded within the RedditKafkaProducer to manage processes asynchronously. This helper uses the asyncpg library to maintain asynchronous operations.

The helper class operates in three stages:

  1. Connection Creation and Closing: It provides the ability to create a connection object and close a connection object with the specified PostgreSQL database.
  2. Lower Layer Operations: This layer allows for basic operations such as executing commands, and retrieving data from a given query.
  3. Upper Layer Operations: This layer handles more complex database operations, often involving multiple steps to achieve the desired result. Instead of retrieving data from the database and then manipulating and inserting or updating fields across different tables, this can be done using a single method from this class.

Though database operations might not seem user-friendly at first glance due to their complexity and specificity, they are essential for large operations involving several database interactions and decision-making processes. Frequently used or complex operations are encapsulated within methods in this helper, streamlining the process.

import asyncpg
from .sql.query import QueryStorage
from .sql.base_sql import SQL



class AsyncPostgreSQLHelper:

def __init__(self, dbname, user, password, host="localhost", port=5432):
self.dbname = dbname if dbname else ''
self.user = user
self.password = password
self.host = host
self.port = port
self.connection = None


async def connect(self):

conn_string = f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.dbname}"

self.connection = await asyncpg.connect(
conn_string
)

return self.connection

......

async def execute_query(self, query, **params):
try:
await self.connection.execute(SQL.read_sql( query, **params))

except Exception as e:
print(f"Error executing query: {e}")

async def fetch_data(self, query,**params):
try:
async with self.connection.transaction():
async for row in self.connection.cursor(SQL.read_sql( query, **params)):
yield row

except Exception as e:
print(f"Error fetching data: {e}")

async def fetch_one(self, query, **params):
try:
async with self.connection.transaction():
row = await self.connection.fetchrow(SQL.read_sql( query, **params))
return row

except Exception as e:
print(f"Error fetching one row: {e}")

async def close_connection(self):
await self.connection.close()
print("Postgres Connection closed")

Design of Database Tables

Our database design includes five tables, each serving a specific purpose:

  1. Subreddit: Tracks the paths of the parsed subreddits.
  2. SubredditJob: Logs details of each parsing process, including start and end dates, and specifies the Kafka Topic where the data is pushed.
  3. SubredditSchedule: Stores the date and status of the last parse.
  4. KafkaTopic: Lists all previously used Kafka Topics.
  5. JobType: Indicates whether the job is triggered manually or by schedule.

AsyncKafkaProducer

The AsyncKafkaProducer is designed for interacting with a Kafka broker using the AIOKafkaProducer Python library. It has four primary methods:

  1. __init__(): Initializes the producer with necessary configurations.
  2. start(): Starts the Kafka producer, preparing it to send messages.
  3. stop(): Stops the Kafka producer, ensuring all resources are properly released.
  4. push_to_kafka(): Pushes the retrieved parsing information to a dedicated Kafka broker.

The main purpose of this class is to push the parsed data to a Kafka broker. The retrieved data is converted into JSON format and encoded in “utf-8” to prevent issues on the receiving end of the Kafka broker, which is used by Apache Spark Streaming. This consistent encoding ensures seamless data processing downstream.

import json
from aiokafka import AIOKafkaProducer
import asyncio

class AsyncKafkaProducer:
def __init__(self, port, host ='localhost', loop=None):
self.host = host
self.port = port

self.bootstrap_servers = f"{self.host}:{self.port}"
self.loop = loop or asyncio.get_event_loop()

self.producer = AIOKafkaProducer(
loop=self.loop,
bootstrap_servers=self.bootstrap_servers,
)

async def start(self):

await self.producer.start()
return True

async def stop(self):
await self.producer.stop()
print("Kafka Connection closed")

async def push_to_kafka(self,topic, message):
try:
# Produce message to Kafka topic
await self.producer.send(topic, json.dumps(message).encode('utf-8', errors='ignore'))
# print(f"Message sent to {topic}: {message}")
except Exception as e:
print(f"Error while pushing message to Kafka: {e}")

Decorators

Decorators are another crucial utility in our application, providing easy access to external services. Currently, we have two types and three different decorators.

Lower-level Decorators

The lower-level decorators are used to connect to PostgreSQL and the Kafka broker without adding complexity to the Scraper classes. They are named async_provide_kafka_connection and async_provide_postgres_connection.

These decorators utilize the AsyncKafkaProducer and AsyncPostgreSQLHelper classes, which are designed to manage external services. These helpers create connections and pass them to the decorated function or method. Since the entire application adopts asynchronous methodologies, these decorators are also written in async form.

  • async_provide_kafka_connection: This decorator manages the connection to the Kafka broker, using the AsyncKafkaProducer class to create and provide a Kafka connection to the decorated function or method.
  • async_provide_postgres_connection: This decorator handles the connection to the PostgreSQL database, utilizing the AsyncPostgreSQLHelper class to create and provide a PostgreSQL connection to the decorated function or method.

Upper-level Decorator

The log_to_postgres decorator is built on top of the async_provide_postgres_connection decorator. It not only provides a PostgreSQL connection but also logs specific information to the PostgreSQL database, streamlining the logging process within the application.

This decorator operates with a simple try and except structure, allowing it to catch any exceptions that occur during the execution of the decorated function or method. Based on the nature of the event, it runs several SQL queries to update the "SubredditJob" table with the outcome of the function, effectively logging the process.

By encapsulating the logging process within a decorator, we ensure that exception handling and logging are consistent across the application, enhancing the maintainability and readability of the codebase.

from utils.postgres_helper import AsyncPostgreSQLHelper
from config import Config
from utils.sql.base_sql import SQL

....

def async_provide_postgres_connection():
def wrapper(func):
@wraps(func)
async def wrapped(*args, **kwargs):
session = None
async_postgres_helper = None
try:
async_postgres_helper = AsyncPostgreSQLHelper(
host=Config.get('POSTGRES_HOST'),
port=Config.get('POSTGRES_PORT'),
dbname=Config.get('POSTGRES_DATABASE'),
user=Config.get('POSTGRES_USER'),
password=Config.get('POSTGRES_PASSWORD')
)

session = await async_postgres_helper.connect()
await func(postgres_session = async_postgres_helper , *args, **kwargs)
except Exception as e:
print(f'Error occured while connecting to databaase: {e} ')

finally:
if session and async_postgres_helper:
await async_postgres_helper.close_connection()
return


return wrapped
return wrapper

By using these decorators, we simplify the interaction with external services, ensuring that connections are managed efficiently and transparently within the application.

Command Line Interface

This system is engineered to manage multiple data parsing tasks concurrently, designed to be triggered externally, typically through scheduling tools like crontab. Therefore, a command-line interface (CLI) is a must in this setup. It accommodates nearly all parameters required by the primary trigger function of the RedditKafkaProducer class to initiate parsing operations. In operation, it receives parameters from the command line, instantiates the RedditKafkaProducer object, and commences the parsing process. Below are the currently available tags along with their descriptions.

usage: RedditParserCLI [-h] -n NAME [-t {tophour,day,topweek,topmonth,topyear,topall,new,hot,rising,controversial}] [-mt MAX_TABS] [-a] [-l LIMIT] [-p PERIOD] [-dn] [-ob] [-kt KAFKA_TOPIC]

List of available parametres

options:
-h, --help show this help message and exit
-n NAME, --name NAME Name of Subreddit to parse eg : (r/gradadmissions/)
-t {tophour,day,topweek,topmonth,topyear,topall,new,hot,rising,controversial}, --type {tophour,day,topweek,topmonth,topyear,topall,new,hot,rising,controversial}
In which order type subreddit has to be parsed.
-mt MAX_TABS, --max_tabs MAX_TABS
Maximum amount of tabs that can be opened. More tabs more RAM usage less parsing time
-a, --all Parse the entire subreddit (default: False)
-l LIMIT, --limit LIMIT
Maximum amount of posts that processed. If given time period will be ignored!
-p PERIOD, --period PERIOD
Time span to scrape the posts. Used for scraping the data from /new ordering type. If not given (assuming limit also not given) app will get the last parsed date from database.
-dn, --direct_name Indicating user entered full subreddit path and don't need subreddit modification.
-ob, --open_browser Whether open up browser instance visually to parse or not. (True - > Do not open | False -> Open browser)
-kt KAFKA_TOPIC, --kafka_topic KAFKA_TOPIC
Kafka topic to publish the results into

Thanks for using RedditParserCLI! :)
  • -n: Name of the subreddit path to be parsed, formatted as “r/datascience” to ensure proper handling. This tag is mandatory for user input.
  • -t: Type of subreddit to be parsed. Reddit subreddits often categorize posts differently, such as “Top Post this month” or “new posts”. The default value is “new”.
  • -mt: Determines the number of parallel tabs to be used. More tabs more speed, more RAM, and CPU usage.
  • -l: Limits the number of posts to be parsed.
  • -a: Parses all posts from the given subreddit (limited to 1000 posts). If this flag is used, the limit flag will be ignored.
  • -p: Time span for scraping the posts.
  • -dn: Direct subreddit path to be used for parsing.
  • -ob: Opens a browser instance visually.
  • -kt: Kafka topic to which the retrieved data will be pushed.

The application is engineered to operate over extended periods, prioritizing time-based decisions over retrieving all possible data from a given subreddit, due to Reddit’s 1000 post limitation. Consequently, our system places significant emphasis on time-based decisions. Within the application, we maintain records of the last time a specific subreddit was parsed. Unless users include additional flags that prioritize alternative decision-making models, the default behavior is looking for the posts included within the given timespan which requires time-sorted lists to properly perform. Therefore, the primary focus remains on tracking the “new” category, where new posts regularly emerge. For other categories, parsing occurs less frequently, typically at longer intervals to retrieve old and most interacted posts. It’s essential to update the activity status in MongoDB to prevent duplication for further usage of data.

It’s recommended to utilize either the -a flag or the -l flag with all types of subreddit categories except “new”. This is because the application is engineered to make decisions about posts based on whether they were created before the last parsing, requiring posts to be ordered by time. Keep in mind that the -a flag takes precedence over decision-making models by parsing all possible posts. Conversely, the -l limit flag supersedes time-based decisions. For instance:

If you want to parse the entire “topall” category of the “dataengineering” subreddit, even though you included the -l tag with a value of 10, it will still parse the entire category.

python src/main.py -n r/dataengineering -a -t topall -l 10

However, this will retrieve the first 10 topall categorized posts from “dataengineering” subreddit.

python src/main.py -n r/dataengineering -t topall -l 10

Spark Architecture

Due to the nature of Apache Spark deployment, executing Spark code must be done within the master node. This can be achieved either by using the docker exec method or by creating an interactive bash shell within the container. Assuming we have already mounted /temp/scripts to /spark_src/scripts, it is appropriate to place our code inside /spark_src/scripts and synchronize it.

Spark Structured Streaming

To get started, we need two additional packages: spark-sql-kafka and mongo-spark-connector. These packages are defined when connecting to the Spark instance.

 spark_session = SparkSession \
.builder \
.appName(app_name) \
.master(f"{master_url}:{master_port}") \
.config('spark.jars.packages', "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1," "org.mongodb.spark:mongo-spark-connector_2.12:10.2.1") \
.getOrCreate()

Next, we define the structured schema for streaming data to ensure it is processed in a well-organized manner. This schema provides a clear framework for how the data will be structured and handled during the streaming process.

StructType([
StructField("Type", StringType(), nullable=True),
StructField("Id", StringType(), nullable=True),
StructField("AuthorId", StringType(), nullable=True),
StructField("AuthorName", StringType(), nullable=True),
StructField("SubredditRankingType", StringType(), nullable=True),
StructField("Permalink", StringType(), nullable=True),
StructField("Comment", StringType(), nullable=True),
StructField("CreatedAt", StringType(), nullable=True),
StructField("ParentCommentId", StringType(), nullable=True),
StructField("ParentPostId", StringType(), nullable=True),
StructField("SubredditName", StringType(), nullable=True),
StructField("RelatedCommentId", MapType(StringType(), ArrayType(StringType())), nullable=True),
StructField("IsActive", BooleanType(), nullable=True),
StructField("ParsedTime", StringType(), nullable=False),

])

After defining the structured schema, we read the data in batches from the specified Kafka topic and load it into a Spark DataFrame. This allows us to process and analyze the streaming data efficiently within the Spark environment.

df = df \
.select(from_json(col("value"), schema).alias("data")) \
.select("data.*")

Finally, our application writes the data into MongoDB in batches. During this process, we perform a small transformation to extract the specific set of properties required for each type of data, since the retrieved data varies by type. This transformation is handled using the foreachBatch method. Notably, we define a checkpoint_location, which ensures that the Spark application can resume from the last saved point if it encounters a failure.

df.writeStream \
.foreachBatch(process_spark_batch(url = conn_url, database = database_name)) \
.outputMode("append") \
.option("checkpointLocation", checkpoint_location) \
.start() \
.awaitTermination()

process_spark_batch function is defined as follows. We are dividing batch_df into “Posts” and “Comments” and processed them in

def process_spark_batch(url, database, 
col_table_mapping = {
"Post": "Posts",
"Comment": "Comments",
}):

def foreach_batch_function(batch_df, batch_id):
print(f"Batch Id - {batch_id}")
# Define type collection map


# Filter batch DataFrame for posts and comments
posts_df = batch_df.filter(col("Type") == "Post").select("Id",
"AuthorId",
"AuthorName",
"Permalink",
"Comment",
"CreatedAt",
"SubredditName",
"SubredditRankingType",
"RelatedCommentId",
"IsActive",
"ParsedTime"
)

comments_df = batch_df.filter(col("Type") == "Comment").select(
"Id",
"AuthorId",
"AuthorName",
"Permalink",
"Comment",
"CreatedAt",
"SubredditName",
"SubredditRankingType",
"ParentPostId",
"ParentCommentId",
"IsActive",
"ParsedTime"
)


# Write posts to MongoDB
posts_df.write.format("mongodb") \
.mode("append") \
.option("spark.mongodb.connection.uri", url) \
.option("spark.mongodb.database", database) \
.option("spark.mongodb.collection", col_table_mapping.get("Post", "Unknown")) \
.save()

# Write comments to MongoDB

comments_df.write.format("mongodb") \
.mode("append") \
.option("spark.mongodb.connection.uri", url) \
.option("spark.mongodb.database", database) \
.option("spark.mongodb.collection", col_table_mapping.get("Comment", "Unknown")) \
.save()



batch_df.show()

return foreach_batch_function

Command Line Interface and Docker Automation

On top of the Spark architecture, there is a higher-level Python file exists named streaming_main.py under the scripts folder. This file offers the ability to provide different parameters to the Spark application. Below are the currently available tags along with their descriptions:

usage: SparkStructured Streaming   [-h] [-n NAME] [-ks KAFKA_SERVER] [-kt KAFKA_TOPIC] [-md MONGO_DATABASE] -mu MONGO_URL [-cp CHECKPOINT_PATH] [-ccp]

List of available parametres

options:
-h, --help show this help message and exit
-n NAME, --name NAME Name of Spark application (default -> Reddit)
-ks KAFKA_SERVER, --kafka_server KAFKA_SERVER
Kafka server to connect.
-kt KAFKA_TOPIC, --kafka_topic KAFKA_TOPIC
Kafka topic to connect.
-md MONGO_DATABASE, --mongo_database MONGO_DATABASE
MongoDB Database to write a the data
-mu MONGO_URL, --mongo_url MONGO_URL
MongoDB connection string
-cp CHECKPOINT_PATH, --checkpoint_path CHECKPOINT_PATH
Checkpoint path for streaming
-ccp, --clear_checkpoint_path
Whether clear checkpoint path or not

Spark Streaming between Kafka Topic and MongoDB SparkStructured Streaming ! :)
  • n: Name of the Spark application. The default name is “Reddit”.
  • -ks: The Kafka server address to connect to for streaming data.
  • -kt: The Kafka topic to subscribe to for receiving data.
  • -md: The MongoDB database where the data will be stored.
  • -mu: The MongoDB connection string to connect to the MongoDB instance.
  • -cp: The checkpoint path for saving the state of the streaming application. This allows the application to resume from the last saved point in case of failure.
  • -ccp: A flag to indicate whether the checkpoint path should be cleared. The default is False, meaning the checkpoint path will not be cleared.

While the tags are mostly self-explanatory, I would like to provide additional details regarding the -ccp tag, which manages the clearing of the checkpoint path. Occasionally, the Spark application may fail to start due to fundamental changes such as alterations in the database destination or Kafka topic. In these cases, clearing the checkpoint path can resolve the issue and enable the application to run again. Though a straightforward approach, this method is highly effective in refreshing the application’s state and restarting its cache. Therefore, a small function is created for clearing a given path and it will be triggered if -ccp tag is given.

To initiate the Spark application from an external Python file, we encountered an issue where calling it directly resulted in an error indicating that Spark could not locate a master node. Despite numerous attempts, we were unable to find a method to trigger the Spark instance externally. Therefore, we began triggering the Spark application using the docker exec command. This command-line interface requires a set of parameters such as the MongoDB connection string and the Kafka broker URL to trigger the spark application.

docker exec -it infrastructure-spark-master-1  bash -c "cd  && python temp/scripts/streaming_main.py -mu 'mongodb://host.docker.internal:27017'

Firstly we are going to the main path and then trigger the streaming_main.py file with the required params. It is important to note that this part of triggering the spark application is not integrated with the whole application itself. So we have developed another way of starting of spark application which encapsulates docker exec commands and default parameters of attributes defined in .env file.

Firstly, We navigate to the main directory and then trigger the temp/scripts/streaming_main.py file with the required parameters. It is important to note that this way of triggering the Spark application is not integrated with the whole application itself. Therefore, we have developed another method to start the Spark application. This new approach encapsulates the docker exec commands and uses default parameter values defined in the .env file. This function is located inside of spark_src/spark_main.py

start_spark_process(spark_app_name = 'Medium', clear_checpoint_path=True)

This function constructs the docker exec command using the provided user inputs and default connection parameters. Additionally, there is another function named execute_command, which executes shell commands and displays the output.

def start_spark_process(
spark_app_name = 'Reddit',
kafka_topic = Config.get('DEFAULT_KAFKA_TOPIC'),
check_point_path = '/temp/checkpoint',
clear_checpoint_path = False
):
base_docker_exec_command = 'docker exec -it infrastructure-spark-master-1 bash -c '
pre_bash_command = ' cd '



spark_app_path = "/temp/scripts/streaming_main.py"

mongodb_url = f"mongodb://{Config.get('MONGO_HOST')}:{Config.get('MONGO_PORT')}"

kafka_server = f"{Config.get('KAFKA_INSIDE_HOST')}:{Config.get('KAFKA_INSIDE_PORT')}"

mongo_database = Config.get('MONGO_DATABASE')

clear_checpoint_path_flag = '-ccp' if clear_checpoint_path else ''


bash_command = f""" python {spark_app_path} -n {spark_app_name} -mu '{mongodb_url}' -ks {kafka_server} -kt {kafka_topic} -md {mongo_database} -cp {check_point_path} {clear_checpoint_path_flag} """
full_command = f""" {base_docker_exec_command} "{pre_bash_command} && {bash_command}" """



execute_command(full_command)

Usage

As previously mentioned, the application consists of two parts that need to be triggered. First, the user must start the Spark application, which is responsible for accepting the data. This can be accomplished by executing the following command:

python spark_src\spark_main.py 

Once the Spark application is running, we can start the parser. Next, we need to select a subreddit to parse. It is recommended to begin with the “topall” or “topyear” types to retrieve all the best posts from the given subreddits. After that, we can switch to parsing new posts.

You might wonder what happens if we parse the same post multiple times, especially if we repeatedly parse the “topall” for a subreddit that changes very slowly. The application will indeed parse the same post multiple times and save it in MongoDB. However, each post has IsActive and ParsedTime fields. This allows us to retain only the latest state of posts and comments and potentially delete previous states from MongoDB. Although this functionality is not yet implemented, we are focusing on collecting data in a structured format for now.

# For the first timepython src/main.py  -n r/dataengineering -t topall -a

# In the upcoming sessions
python src/main.py -n r/dataengineering

Reddit provides an opportunity to gather information on specific areas of interest. Therefore, collecting your desired subreddits into a .sh file and manually triggering this file using the command below, or setting up a cron job for scheduling, is likely the most efficient way to use the application. This approach will sequentially trigger the src/main.py file, allowing you to parse numerous subreddits effectively.

python src/main.py  -n r/dataengineering
echo '----------------------------------------------------------------------------------------------------------------------------'
python src/main.py -n r/data

Let’s imagine you store these bash commands inside a file named jobs.sh. The user can trigger the .sh file as follows:

sh jobs.sh

Conclusion

User-generated data about specific areas of interest is more valuable than ever, especially as the volume of AI-generated content reaches an all-time high. Therefore, leveraging a major platform like Reddit, where users freely share information with the public, is essential. The developed application enables users to track specific sets of information on almost any topic and store it in a database for future use.

--

--