Building a Scalable RSS Feed Pipeline with Apache Airflow, Kafka, MongoDB, and a Flask API
In today’s data-driven world, processing large volumes of data in real time has become essential for many organizations. The Extract, Transform, Load (ETL) process is a common way to manage the flow of data between systems. In this article, we’ll walk through how to build a scalable ETL pipeline using Apache Airflow, Kafka, MongoDB, Python, and Flask.
In this pipeline, the RSS feeds are scraped using a Python library called feedparser. This library parses the XML data in the RSS feeds and extracts the relevant information. The parsed data is then transformed into a standardized JSON format using Python's built-in json library. This format includes fields such as title, summary, link, published_date, and language, which make the data easier to analyze and consume. The feeds are grouped by language in the dictionary below, and a short parsing sketch follows it.
NEWS_FEEDS = {
"en": [
"https://www.cnn.com/rss/edition.rss",
"https://www.bbc.com/news/10628494",
"https://www.nbcnews.com/id/303207/device/rss/rss.xml",
"https://www.foxnews.com/about/rss/"
],
"pl": [
"https://www.tvn24.pl/najnowsze.xml",
"https://www.rmf24.pl/fakty/polska/feed",
"https://wiadomosci.wp.pl/rss",
"https://www.money.pl/rss/wszystkie"
],
"es": [
"https://www.elpais.com/rss/feed.html?feedId=1022",
"https://www.abc.es/rss/feeds/abc_EspanaEspana.xml",
"https://www.elconfidencial.com/rss/",
"https://www.elperiodico.com/es/rss/"
],
"de": [
"https://www.tagesschau.de/xml/rss2",
"https://www.faz.net/rss/aktuell/",
"https://www.zeit.de/rss",
"https://www.spiegel.de/schlagzeilen/tops/index.rss"
],
"fr": [
"https://www.lemonde.fr/rss/une.xml",
"https://www.lefigaro.fr/rss/figaro_actualites.xml",
"https://www.liberation.fr/rss/",
"https://www.lci.fr/rss"
]
}
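Before wiring anything into Airflow, here is a minimal, standalone sketch of the parsing step described above: it fetches one of the feeds from the list (the CNN URL is just an example) and reshapes the first entry into the standardized JSON document. The field handling in the pipeline's DAG differs slightly, so treat this as an illustration rather than the production code.

import json
import time

import feedparser

# Parse a single feed and reshape the first entry into the standardized format.
feed = feedparser.parse("https://www.cnn.com/rss/edition.rss")

if feed.entries:
    entry = feed.entries[0]
    published = entry.get("published_parsed")  # a time.struct_time, or None
    article = {
        "title": entry.get("title", ""),
        "summary": entry.get("summary", ""),
        "link": entry.get("link", ""),
        "published_date": time.strftime("%Y-%m-%dT%H:%M:%S", published) if published else None,
        "language": "en",
    }
    print(json.dumps(article, indent=2))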
What is Apache Airflow?
Apache Airflow is a platform used to programmatically author, schedule, and monitor workflows. It allows developers to create complex workflows by defining tasks and their dependencies. Airflow makes it easy to monitor the execution of tasks and provides an intuitive web interface to visualize the workflow.
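As a quick illustration of the idea (separate from the pipeline DAG we build later), a workflow in Airflow is just Python code: tasks are operators, and dependencies are set with the >> operator. The toy DAG below is a hedged sketch using the same Airflow 1.x-style imports as the rest of this article; the task names are arbitrary.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

def extract():
    print("extract step would run here")

def load():
    print("load step would run here")

# A toy DAG with two tasks and one dependency, scheduled daily.
toy_dag = DAG(dag_id="toy-example", start_date=days_ago(1), schedule_interval="@daily")
extract_task = PythonOperator(task_id="extract", python_callable=extract, dag=toy_dag)
load_task = PythonOperator(task_id="load", python_callable=load, dag=toy_dag)
extract_task >> load_task  # load only runs after extract succeeds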
What is Kafka?
Apache Kafka is a distributed event streaming platform that allows you to publish and subscribe to streams of records. Kafka provides high-throughput, low-latency, and fault-tolerant data transport. Kafka can be used for real-time data processing, streaming analytics, and log aggregation.
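To make the publish/subscribe model concrete, here is a minimal kafka-python sketch. It assumes a broker reachable at localhost:9092 and an arbitrary topic name; the pipeline itself talks to the Dockerized broker at broker:29092 and uses the rss_feeds topic.

from kafka import KafkaProducer, KafkaConsumer

# Publish a single record to a topic...
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("demo_topic", value=b"hello kafka")
producer.flush()

# ...and read it back with a consumer subscribed to the same topic.
consumer = KafkaConsumer(
    "demo_topic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # stop iterating if no new messages arrive for 5 s
)
for message in consumer:
    print(message.value)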
Implementing the ETL pipeline
To implement the ETL pipeline, we’ll use Python and the following libraries:
- feedparser: a Python library that parses RSS feeds
- beautifulsoup4: a Python library that extracts data from HTML and XML files
- kafka-python: a Python library that provides a Kafka client
- redis: a Python library that provides a Redis client
First, we’ll define a DAG (Directed Acyclic Graph) in Airflow to run the pipeline on a scheduled basis. The DAG consists of four tasks:
1. Update the proxy pool: This task retrieves a list of proxy servers from Redis or scrapes them from a public proxy list, tests their connectivity, and stores the valid proxies in Redis. We’ll use the proxies to avoid getting blocked by the RSS feed servers.
2. Extract news: This task reads the RSS feeds through the valid proxies, extracts the news articles, and stores them in a list. We’ll use concurrent programming to speed up the extraction process.
3. Validate data: This task checks whether the news articles have all the required fields (title, link, and summary) and stores the valid articles in a separate list.
4. Send to Kafka: This task sends the validated news articles to a Kafka topic as JSON-encoded messages; the sink connector's JsonConverter deserializes them on the way into MongoDB.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import feedparser
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import requests
import random
import redis
import concurrent.futures
import html
NEWS_FEEDS = {
"en": [
"https://www.cnn.com/rss/edition.rss",
"https://www.bbc.com/news/10628494",
"https://www.nbcnews.com/id/303207/device/rss/rss.xml",
"https://www.foxnews.com/about/rss/"
],
"pl": [
"https://www.tvn24.pl/najnowsze.xml",
"https://www.rmf24.pl/fakty/polska/feed",
"https://wiadomosci.wp.pl/rss",
"https://www.money.pl/rss/wszystkie"
],
"es": [
"https://www.elpais.com/rss/feed.html?feedId=1022",
"https://www.abc.es/rss/feeds/abc_EspanaEspana.xml",
"https://www.elconfidencial.com/rss/",
"https://www.elperiodico.com/es/rss/"
],
"de": [
"https://www.tagesschau.de/xml/rss2",
"https://www.faz.net/rss/aktuell/",
"https://www.zeit.de/rss",
"https://www.spiegel.de/schlagzeilen/tops/index.rss"
],
"fr": [
"https://www.lemonde.fr/rss/une.xml",
"https://www.lefigaro.fr/rss/figaro_actualites.xml",
"https://www.liberation.fr/rss/",
"https://www.lci.fr/rss"
]
}
headers_list = [
{
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:77.0) Gecko/20100101 Firefox/77.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.google.com/",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1"
},
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:77.0) Gecko/20100101 Firefox/77.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://www.google.com/",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1"
},
{
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "en-GB,en-US;q=0.9,en;q=0.8",
"Dnt": "1",
"Referer": "https://www.google.com/",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36",
"X-Amzn-Trace-Id": "Root=1-5ee7bae0-82260c065baf5ad7f0b3a3e3"
},
{
"User-Agent": 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0',
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7",
"Referer": "https://www.reddit.com/",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1"
}
]
# Define default_args dictionary to pass to the DAG
ARGS = {
"owner": "stefentaime",
"start_date": days_ago(0),
"retries": 1,
"retry_delay": timedelta(seconds=30)
}
dag = DAG(
    dag_id="ETL-Pipeline",
    default_args=ARGS,
    description="Scrape RSS feeds, validate the articles, and publish them to Kafka",
    schedule_interval="0 0 1 * *",
    tags=["ETL", "kafka", "scraping"]
)
REDIS_CONFIG = {'host': 'redis', 'port': 6379, 'decode_responses': True}
REDIS_KEY = 'proxies'
PROXY_WEBPAGE = 'https://free-proxy-list.net/'
TESTING_URL = 'https://httpbin.org/ip'
MAX_WORKERS = 20
PROXY_EXPIRATION = timedelta(minutes=5)
def get_proxies():
r = redis.Redis(**REDIS_CONFIG)
if r.exists(REDIS_KEY):
proxies = r.lrange(REDIS_KEY, 0, -1)
expiration = r.ttl(REDIS_KEY)
if expiration == -1:
r.expire(REDIS_KEY, PROXY_EXPIRATION)
elif expiration < PROXY_EXPIRATION.total_seconds():
r.delete(REDIS_KEY)
proxies = []
else:
proxies = []
if not proxies:
headers = random.choice(headers_list)
page = requests.get(PROXY_WEBPAGE, headers=headers)
soup = BeautifulSoup(page.content, 'html.parser')
for row in soup.find('tbody').find_all('tr'):
proxy = row.find_all('td')[0].text + ':' + row.find_all('td')[1].text
proxies.append(proxy)
r.rpush(REDIS_KEY, *proxies)
r.expire(REDIS_KEY, PROXY_EXPIRATION)
return proxies
def test_proxy(proxies):
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
results = list(executor.map(test_single_proxy, proxies))
return (proxy for valid, proxy in zip(results, proxies) if valid)
def test_single_proxy(proxy):
    headers = random.choice(headers_list)
    try:
        resp = requests.get(TESTING_URL, headers=headers, proxies={"http": proxy, "https": proxy}, timeout=3)
        if resp.status_code == 200:
            return True
    except requests.RequestException:
        pass
    return False
# Define the task to update the proxypool
def update_proxypool(**kwargs):
proxies = get_proxies()
valid_proxies = list(test_proxy(proxies))
kwargs['ti'].xcom_push(key='valid_proxies', value=valid_proxies)
import datetime
next_id = 1
def extract_website_name(link):
# Extract the website name from the link
website_name = link.split('//')[1].split('/')[0]
# Remove any leading "www." from the website name
website_name = website_name.replace('www.', '')
return website_name
def extract_article_data(entry, language):
global next_id
title = entry.title.encode('ascii', 'ignore').decode()
soup = BeautifulSoup(entry.summary, 'html.parser')
summary = html.unescape(soup.get_text().strip().replace('\xa0', ' '))
link = entry.link
date_published = entry.get('published_parsed', None)
if date_published is not None:
date_published = datetime.datetime(*date_published[:6])
time_since_published = datetime.datetime.utcnow() - date_published
if time_since_published < datetime.timedelta(hours=1):
today = datetime.datetime.utcnow().strftime("%d-%m-%Y")
website_name = extract_website_name(link)
unique_id = f"{language.upper()}{next_id:02d}-{website_name}-01-{today}"
next_id += 1
            return {
                'id': unique_id,
                'title': title,
                'link': link,
                'summary': summary,
                'published_date': date_published.isoformat(),
                'language': language
            }
return None
def extract_news_feed(feed_url, language, proxy):
    # Fetch the feed through the proxy, then hand the raw XML to feedparser
    try:
        resp = requests.get(feed_url, headers=random.choice(headers_list),
                            proxies={"http": proxy, "https": proxy}, timeout=10)
    except requests.RequestException:
        return []
    feed = feedparser.parse(resp.content)
    articles = []
extracted_articles = set()
for entry in feed.entries:
if len(articles) >= 2:
break
link = entry.link
title = entry.title.encode('ascii', 'ignore').decode()
unique_id = f'{language}-{link}-{title}'
if unique_id in extracted_articles:
continue
extracted_articles.add(unique_id)
article_data = extract_article_data(entry, language)
if article_data is not None:
articles.append(article_data)
return articles
def extract_news(**kwargs):
    valid_proxies = list(kwargs['ti'].xcom_pull(key='valid_proxies'))
    articles = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # One future per feed; each feed is fetched through a randomly chosen valid proxy
        futures = [executor.submit(extract_news_feed, feed_url, language, random.choice(valid_proxies))
                   for language in NEWS_FEEDS for feed_url in NEWS_FEEDS[language]]
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result is not None:
articles.extend(result)
kwargs['ti'].xcom_push(key='articles', value=articles)
return articles
# Define the task to validate the quality of the data
def validate_data(**kwargs):
articles = kwargs['ti'].xcom_pull(key='articles', task_ids='extract_news')
validated_articles = [article for article in articles if all(article.get(k) for k in ('title', 'link', 'summary'))]
kwargs['ti'].xcom_push(key='validated_articles', value=validated_articles)
return validated_articles
# Define the task to send data to the Kafka topic
def send_to_kafka(**kwargs):
    validated_articles = kwargs['ti'].xcom_pull(key='validated_articles', task_ids='validate_data')
    producer = KafkaProducer(bootstrap_servers='broker:29092')
    for article in validated_articles:
        try:
            # send() is asynchronous; .get() blocks briefly so delivery errors surface here
            future = producer.send('rss_feeds', key=article['title'].encode(), value=json.dumps(article).encode())
            future.get(timeout=10)
        except KafkaError as e:
            print(f"Failed to send message to Kafka: {e}")
    producer.flush()
    print("Data sent to Kafka successfully.")
# Define the task dependencies
update_proxypool_task = PythonOperator(task_id='update_proxypool', python_callable=update_proxypool, provide_context=True, dag=dag)
extract_news_task = PythonOperator(task_id='extract_news', python_callable=extract_news, provide_context=True, dag=dag)
validate_data_task = PythonOperator(task_id='validate_data', python_callable=validate_data, provide_context=True, dag=dag)
send_to_kafka_task = PythonOperator(task_id='send_to_kafka', python_callable=send_to_kafka, provide_context=True, dag=dag)
# Set the task dependencies
update_proxypool_task >> extract_news_task >> validate_data_task >> send_to_kafka_task
Next, we’ll deploy a Kafka connector to consume the news articles from the Kafka topic and load them into MongoDB. We’ll use the MongoSinkConnector from the mongo-kafka-connect library, which provides an efficient and reliable way to integrate Kafka with MongoDB. The connector is configured to read the news articles from the Kafka topic and write them to a MongoDB collection in the demo database.
{
"name": "mongodb-sink-connector",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"topics": "rss_feeds",
"connection.uri": "mongodb://debezium:dbz@mongo:27017/demo?authSource=admin",
"database": "demo",
"collection": "rss_feeds_collection",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}
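If you prefer Python over curl, the same configuration can be registered and verified through the Kafka Connect REST API. This is a hedged sketch that assumes Connect is exposed on localhost:8083 and that the JSON above is saved as mongo-sink.json, matching the file used in the run instructions later on.

import json
import requests

# Register the MongoDB sink connector with the Kafka Connect REST API.
with open("mongo-sink.json") as f:
    connector_config = json.load(f)

resp = requests.post("http://localhost:8083/connectors", json=connector_config)
print(resp.status_code, resp.json())

# Confirm that the connector and its task are RUNNING.
status = requests.get("http://localhost:8083/connectors/mongodb-sink-connector/status")
print(status.json())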
To run the pipeline, you need to set up the following components:
- Apache Airflow: Use pip to install Airflow, and create a Python script that defines the DAG.
- Redis: Set up a Redis instance to store the proxy servers.
- Kafka: Install and configure a Kafka cluster with a single broker, and create a Kafka topic named rss_feeds (a Python sketch for creating the topic follows this list).
- MongoDB: Install and configure a MongoDB cluster, and create a database named demo.
- Kafka Connector: Deploy the mongo-kafka-connect connector to your Kafka cluster, and configure it to read from the rss_feeds topic and write to the rss_feeds_collection collection in the demo database.
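If you would rather create the topic from Python than with the Kafka CLI tools, kafka-python ships an admin client; the sketch below assumes a single broker on localhost:9092. MongoDB needs no equivalent step, since the demo database and rss_feeds_collection collection are created automatically when the sink connector writes its first document.

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    # A single partition and replication factor 1 are enough for a one-broker setup.
    admin.create_topics([NewTopic(name="rss_feeds", num_partitions=1, replication_factor=1)])
    print("Topic rss_feeds created")
except TopicAlreadyExistsError:
    print("Topic rss_feeds already exists")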
Finally, we’ll build a Flask web application to serve the news articles stored in MongoDB. The full application is shown below, followed by a summary of its endpoints:
from pymongo import MongoClient
from bson.objectid import ObjectId
from flask import Flask, request, jsonify, render_template
client = MongoClient('mongodb://debezium:dbz@localhost:27017/?authSource=admin')
db = client['demo']
collection = db['rss_feeds_collection']
app = Flask(__name__, template_folder='/path/template')
# get all news articles
@app.route('/news', methods=['GET'])
def get_all_news():
cursor = collection.find({}, {"_id": 0})
news = []
for item in cursor:
news.append({'title': item['title'], 'summary': item['summary'], 'link': item['link'], 'language': item['language'], 'id': item['id']})
return jsonify({'news': news})
# get a news article by id
@app.route('/news/<id>', methods=['GET'])
def get_news_by_id(id):
item = collection.find_one({'id': id})
if item:
return jsonify({'_id': str(item['_id']), 'title': item['title'], 'summary': item['summary'], 'link': item['link'], 'language': item['language']})
else:
return jsonify({'error': 'News article not found'})
# update a news article by id
@app.route('/news/<id>', methods=['PUT'])
def update_news_by_id(id):
item = collection.find_one({'id': id})
if item:
data = request.get_json()
collection.update_one({'id': id}, {'$set': data})
return jsonify({'message': 'News article updated successfully'})
else:
return jsonify({'error': 'News article not found'})
# delete a news article by id
@app.route('/news/<id>', methods=['DELETE'])
def delete_news_by_id(id):
item = collection.find_one({'id': id})
if item:
collection.delete_one({'id': id})
return jsonify({'message': 'News article deleted successfully'})
else:
return jsonify({'error': 'News article not found'})
# render a web page with news articles
@app.route('/', methods=['GET'])
def news_page():
page = request.args.get('page', 1, type=int)
language = request.args.get('language')
# build query for language filtering
query = {} if not language else {'language': language}
# retrieve total count and paginated news articles
count = collection.count_documents(query)
    cursor = collection.find(query, {"_id": 0}).skip((page-1)*8).limit(8)
news = []
for item in cursor:
news.append({'title': item['title'], 'summary': item['summary'], 'link': item['link'], 'language': item['language'], 'id': item['id']})
# calculate number of pages for pagination
num_pages = count // 8 + (1 if count % 8 > 0 else 0)
return render_template('index.html', news=news, page=page, language=language, num_pages=num_pages)
if __name__ == '__main__':
app.run(debug=True)
- /news: GET all news articles from the database
- /news/<id>: GET the news article with the specified id
- /news/<id>: PUT updates the news article with the specified id
- /news/<id>: DELETE deletes the news article with the specified id
- /: GET a web page that displays paginated news articles with an optional language filter
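Once the API is running, the endpoints can be exercised with a few requests calls. This sketch assumes the Flask development server on its default port 5000; the article id is a placeholder, so substitute a real id returned by /news.

import requests

BASE = "http://localhost:5000"

# List all articles, then fetch, update, and delete one by its pipeline-generated id.
print(requests.get(f"{BASE}/news").json())

article_id = "EN01-cnn.com-01-01-01-2023"  # placeholder; use an id from the /news response
print(requests.get(f"{BASE}/news/{article_id}").json())
print(requests.put(f"{BASE}/news/{article_id}", json={"summary": "Updated summary"}).json())
print(requests.delete(f"{BASE}/news/{article_id}").json())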
Prerequisites
Before we start, make sure you have the following installed:
- Python 3
- Docker and Docker Compose
- A text editor
Steps To Run:
- Clone the project to your desired location.
Execute the following command that will create the .env file containing the Airflow UID needed by docker-compose:
$ echo -e "AIRFLOW_UID=$(id -u)" > .env
Build Docker:
$ docker-compose build
Initialize Airflow database:
$ docker-compose up airflow-init
Start Containers:
$ docker-compose up -d
When everything is done, you can check all the containers running:
$ docker ps
Now you can access the Airflow web interface by going to http://localhost:8080 with the default user defined in the docker-compose.yml (username/password: airflow). We can now trigger our DAG and watch all the tasks run.
To set up Kafka and MongoDB, navigate to the mongo-kafka directory:
$ cd mongo-kafka
Start Kafka and MongoDB containers:
$ docker-compose up -d
Execute the following command that will create the SinkConnector for MongoDB:
$ curl -X POST -H "Content-Type: application/json" --data @mongo-sink.json http://localhost:8083/connectors
Run the Flask API:
$ python api.py
Conclusion:
This article has covered a variety of topics related to building a scalable RSS feed pipeline. We started by discussing RSS feeds and how to scrape them using Python. We then explored the use of Apache Airflow for orchestrating the pipeline and scheduling tasks.
Next, we looked at how to use Kafka as a message broker to handle the data flow between the different components of the pipeline. We also examined the use of Kafka Connect to integrate Kafka with MongoDB and to enable easy data ingestion.
To visualize the data ingested into MongoDB, we built a simple Flask API with Jinja templates to render a web page with paginated news articles. We used Bootstrap to make the page responsive and added filtering capabilities based on the language of the news articles.