Building Highly Scalable Data Ingestion Architecture for Machine Learning and Marketing Intelligence

Leveraging AWS Ecosystem and Data Crawling for Scalable and Adaptive Data Pipelines

Rares Istoc
Decoding ML
13 min read · Jun 27, 2024


Machine learning without data is like a chef without ingredients — all the skills but nothing to cook.

These days, everything revolves around data, from the personalized ads we see online to the recommendations on our streaming services. Data drives decisions in business, healthcare, and even sports. Without data, our apps would be clueless, our smart devices would be dumb, and our predictions would be nothing more than guesses. In this digital age, data isn’t just the new oil; it’s the lifeblood of innovation and efficiency.

Ok, but why another article about data ingestion?

There are many ways to build data ingestion pipelines. With all the new toys created over the last decade, it's hard to select the best tools available to create a solution. Most of the time, the answer depends on your project's specific requirements.

In this article, you will explore an end-to-end solution tailored to a specific area: marketing intelligence. Using AWS’s integrated ecosystem of services, you can create a highly scalable data-ingestion pipeline that crawls the web and feeds your various analytical processes: sales analysis, competitor analysis, market analysis, customer insights, you name it.

I also want to present a series of challenges I encountered while building this solution. Most answers are scattered across the Internet, and it's hard to find a complete, working solution. You can access the full solution code on GitHub.

IMPORTANT NOTE: Before diving into this solution, you must be aware of the legal implications of ingesting data from some data sources, like social media pages, so we can make sure nobody goes to jail. Please read the terms and conditions of each major platform; these will restrict you from crawling user profiles and private pages.

Table of Contents

  1. Architecture Overview
  2. Implementation
  3. Challenges & Pitfalls
  4. Local Testing
  5. Deployment

1. Architecture Overview

This is what we are about to build:

[Image by the Author]

Here are some non-functional requirements that I’ve aimed to achieve with this architecture:

  • Scalability: I wanted this solution to process as many pages as possible simultaneously and to let me add new pages without extra effort, so the system can handle any amount of growth at any time.
  • Maintainability & Adaptability: I’ve designed each component of this system so that it can be easily changed and extended without spending a lot of development time.

Components Overview:

Scheduler: Despite its name, it plays multiple roles; the most important one is triggering a crawler lambda for each page link it holds.

Crawler: The name states its purpose. If you’re not familiar with the term crawling, pause this article and look it up before proceeding. This component takes a page link and starts crawling/extracting the posts and various information about them. More details follow in the implementation part.

Database: Our data lake storage instance will house these posts for later. For this, I’ve chosen MongoDB. Most posts are unstructured textual data, but we can extract other useful information from them, and Mongo shines at handling semi-structured data.

So, to trace the complete flow of the solution: the scheduler triggers a crawler lambda instance for each page, sending it the page name and link. The crawler extracts the posts from the last week and stores the raw content, the post's creation date, the link, and the page name, but it doesn’t have to stop there: you can extract more information, depending on what the platform offers.

Then, the scheduler waits for all lambda instances to finish their execution, aggregates the extracted posts from the database, and, using some prompt templates, sends the posts together with those templates to ChatGPT to generate reports.

Tools & Libraries:

  • Selenium: A web-browser automation tool. I don’t use it directly; it’s used by one of the crawling libraries included in this solution.
  • AWS Lambda: I take advantage of its huge fire-and-forget concurrency. The default quota is 1,000 concurrent executions, which can be raised to tens of thousands according to your needs.
  • EventBridge: A serverless event bus that connects applications across the AWS ecosystem; I use it to trigger the scheduler automatically through cron rules.
  • CloudWatch: A log monitoring and alarm service; we use it to follow the crawler logs and detect when each invocation finishes.
  • Instaloader: An open-source tool for extracting metadata from Instagram.
  • Langchain: A powerful framework for building applications around Large Language Models.
  • Pulumi: An Infrastructure as Code (IaC) tool that lets you write infrastructure in multiple programming languages. It's basically CDK combined with Terraform, but on steroids.

2. Implementation

In this section, I’ll provide a detailed overview of the main components, breaking them down with code samples and explanations.

2.1 Scheduler

I’ll not focus much on the reporting part, although you can find it here along with all the code I’ve shared in this article. The leading actor is the scheduler itself: the main entry point of the system, where the whole flow is started and orchestrated:

import json
import os
import time
from datetime import datetime, timedelta

import boto3
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext

from src.constants import PAGE_LINK
from src.db import database
from src.utils import monitor

logger = Logger(service="decodingml/scheduler")

_client = boto3.client("lambda")


def lambda_handler(event, context: LambdaContext):
    correlation_ids = []

    for link in PAGE_LINK:
        # Fire-and-forget invocation of the crawler ("lambda" stands in for
        # the name of the deployed crawler function).
        response = _client.invoke(
            FunctionName="lambda",
            InvocationType="Event",
            Payload=json.dumps({"link": link}),
        )
        logger.info(f"Triggered crawler for: {link}")

        correlation_ids.append(response["ResponseMetadata"]["RequestId"])

    logger.info(f"Monitoring: {len(correlation_ids)} crawler processes")

    # Poll every 15 seconds until every crawler has finished.
    while True:
        time.sleep(15)
        completed = monitor(correlation_ids)

        correlation_ids = [c for c in correlation_ids if c not in completed]

        if not correlation_ids:
            break

        logger.info(f"Still waiting for {len(correlation_ids)} crawlers to complete")

    # Aggregate everything crawled in the last week.
    now = datetime.now()
    posts = list(
        database.profiles.find(
            {
                "date": {"$gte": (now - timedelta(days=7)), "$lte": now},
            }
        )
    )

    logger.info(f"Gathered {len(posts)} posts")

    if not posts:
        logger.info("Cannot generate report, no new posts available")
        return

    # generate_profiles_report comes from the reporting module in the repository.
    reports = generate_profiles_report(posts)

    logger.info("Generated new report!")

As you can see, the scheduler acts as a scatterer. It iterates over a list of page links and invokes a crawler asynchronously by setting the InvocationType parameter to Event; this ensures the scheduler doesn’t block while waiting for any single page.

Then, it stores the correlation ID of each lambda invocation in a list and waits for all of them to finish their execution. I’ve set the polling interval to 15 seconds; you can tune it to the average time your crawler needs to complete its task, so you don’t call CloudWatch too often.

Last, it finds all crawled posts from these pages and sends them to the report generation phase.
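The reporting phase itself lives in the repository, so I won’t reproduce it here. As a rough illustration only, here is a minimal sketch of what a generate_profiles_report helper could look like with LangChain; the prompt text, model name, and module layout are my assumptions, not the actual code from the repo:

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# Hypothetical prompt; the real templates live in the repository.
_REPORT_PROMPT = ChatPromptTemplate.from_template(
    "You are a marketing analyst. Summarize the main themes, tone and "
    "engagement hooks of the following posts:\n\n{posts}"
)


def generate_profiles_report(posts: list) -> str:
    """Send the crawled posts to the LLM and return a plain-text report."""
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # assumed model choice
    chain = _REPORT_PROMPT | llm
    formatted = "\n\n".join(f"[{p['link']}] {p['content'] or ''}" for p in posts)
    return chain.invoke({"posts": formatted}).content

The scheduler only depends on the function’s signature; everything else about prompting is an implementation detail of the reporting module.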

2.2 Crawler

Here I’ll break down the actual crawling process, where you’ll see some good software practices applied so that you can easily extend the system with new crawlers:

import abc
import os
from datetime import datetime, timedelta
from itertools import takewhile, dropwhile
from typing import List, Dict, Any
from urllib.parse import urlparse

import instaloader


# Lives in src/crawlers/base.py in the repository.
class BaseAbstractCrawler(abc.ABC):

    @abc.abstractmethod
    def extract(self, link: str, **kwargs) -> None: ...


# Lives in src/crawlers/instagram.py in the repository.
class InstagramCrawler(BaseAbstractCrawler):

    def __init__(self, link: str, proxy=None):
        self.link = link
        self.loader = instaloader.Instaloader()
        self._until = datetime.now()
        self._since = self._until - timedelta(days=7)
        self._proxy = proxy

    def extract(self, **kwargs) -> List[Dict[str, str | Any]]:
        parsed_url = urlparse(self.link)

        if self._proxy:
            os.environ['https_proxy'] = self._proxy.__dict__().get('http')

        # The profile name is the first path segment of the page link.
        profile = instaloader.Profile.from_username(self.loader.context, parsed_url.path.strip('/').split('/')[0])
        # Keep only the posts published in the last seven days.
        posts = takewhile(lambda p: p.date > self._since, dropwhile(lambda p: p.date > self._until, profile.get_posts()))

        return [
            {'content': post.caption, 'date': post.date, 'link': self.link}
            for post in posts
        ]

Here, I’ve defined the main abstraction point for all types of crawlers. It defines the common interface every derived crawler must implement, and each subclass must provide its own implementation of the extract() method, so building a new crawler comes down to deriving from this class. Besides bringing a lot of reusability and uniformity, another valuable advantage is shown below:

import re

from src.crawlers.base import BaseAbstractCrawler
from src.crawlers.instagram import InstagramCrawler


class CrawlerDispatcher:

    def __init__(self) -> None:
        self._crawlers = {}

    def register(self, domain: str, crawler: type[BaseAbstractCrawler]) -> None:
        # Map a URL pattern for the domain to its crawler class.
        self._crawlers[r"https://(www\.)?{}.com/*".format(re.escape(domain))] = crawler

    def get_crawler(self, url: str) -> BaseAbstractCrawler:
        for pattern, crawler in self._crawlers.items():
            if re.match(pattern, url):
                # Instantiate the matching crawler with the link it should process.
                return crawler(url)
        else:
            raise ValueError("No crawler found for the provided link")


dispatcher = CrawlerDispatcher()
dispatcher.register('instagram', InstagramCrawler)

I wanted each crawler to be easy to register and to be selected automatically. So I’ve built a dispatcher whose job is to pick and instantiate the correct crawler class based on the link you provide for processing. It essentially acts as a registry and a factory for my crawlers and manages them under the unified interface and structure we’ve created for them. This has certain advantages, which I’ll present here:

  • Flexibility & Scalability: This component makes it possible to add new crawlers without modifying the existing codebase. The system can be expanded easily: include more domains and specialized crawlers and just plug them in, as sketched after this list.
  • Encapsulation & Modularity: The dispatcher encapsulates the logic for determining which crawler to use based on the link. This makes the system more modular and allows each crawler to focus on its core business logic without worrying about pattern matching.
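To make the plug-and-play claim concrete, here is a hypothetical example of wiring in a new platform; FacebookCrawler is a placeholder and not part of the repository, but the registration mechanism is exactly the one defined above:

from src.crawlers import dispatcher
from src.crawlers.base import BaseAbstractCrawler


class FacebookCrawler(BaseAbstractCrawler):
    """Hypothetical crawler used only to illustrate the dispatcher."""

    def __init__(self, link: str, proxy=None):
        self.link = link
        self._proxy = proxy

    def extract(self, **kwargs) -> list:
        # Platform-specific extraction logic would go here.
        return []


# One registration call makes the new domain routable.
dispatcher.register('facebook', FacebookCrawler)
crawler = dispatcher.get_crawler("https://www.facebook.com/somepage")

With the dispatcher in place, the crawler’s Lambda entry point only has to ask it for the right implementation: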
from datetime import datetime, timedelta

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext

from src.crawlers import dispatcher
from src.db import database

logger = Logger(service="decodingml/crawler")


def lambda_handler(event, context: LambdaContext):

    link = event.get('link')

    logger.info(f"Start extracting posts for {link}")

    crawler = dispatcher.get_crawler(event.get('link'))

    posts = [{**page, 'correlation_id': context.aws_request_id} for page in crawler.extract()]

    now = datetime.now()
    existing_posts = database.profiles.find({
        "date": {"$gte": (now - timedelta(days=7)), "$lte": now},
        "name": link
    }, projection={'date': 1})

    existing_posts = [post.get('date') for post in list(existing_posts)]

    posts = [post for post in posts if post.get('date') not in existing_posts]

    if not posts:
        logger.info("No new posts on page")
        return

    logger.info(f"Successfully extracted {len(posts)} posts")
    database.profiles.insert_many(posts)
    logger.info("Successfully inserted data in db")

Here, I’ve assembled the main entry point of the crawler, where we get the link from the event body, select the correct crawler, and start the extraction job. Once extraction finishes, I check which posts are already in the database, so we don’t insert unnecessary duplicates, and then store the new ones.

3. Challenges & Pitfalls

3.1 Running a headless browser instance with Selenium in the Lambda runtime environment

I think this caused me the most headaches and nightmares. The Lambda execution environment is read-only, so anything you want to write to disk has to go into a temporary directory under /tmp. This mostly ruins your dream of letting Selenium install the binary driver automatically. Instead, you need to install the driver directly in the Docker image and reference it manually in Selenium’s driver options. The only driver I could get working in this setup was the Google Chrome binary driver.

FROM public.ecr.aws/lambda/python:3.11 as build

# Download chrome driver and browser and manually unpack them in their folders
RUN yum install -y unzip && \
curl -Lo "/tmp/chromedriver-linux64.zip" "https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testing/119.0.6045.105/linux64/chromedriver-linux64.zip" && \
curl -Lo "/tmp/chrome-linux64.zip" "https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testing/119.0.6045.105/linux64/chrome-linux64.zip" && \
unzip /tmp/chromedriver-linux64.zip -d /opt/ && \
unzip /tmp/chrome-linux64.zip -d /opt/


FROM public.ecr.aws/lambda/python:3.11

# Install the function's OS dependencies using yum
RUN yum install -y \
atk \
cups-libs \
gtk3 \
libXcomposite \
alsa-lib \
libXcursor \
libXdamage \
libXext \
libXi \
libXrandr \
libXScrnSaver \
libXtst \
pango \
at-spi2-atk \
libXt \
xorg-x11-server-Xvfb \
xorg-x11-xauth \
dbus-glib \
dbus-glib-devel \
nss \
mesa-libgbm \
ffmpeg \
openssl-devel \
libcurl-devel \
postgresql-devel

COPY --from=build /opt/chrome-linux64 /opt/chrome
COPY --from=build /opt/chromedriver-linux64 /opt/

COPY ./pyproject.toml ./poetry.lock ./

# Install Poetry, export dependencies to requirements.txt, and install dependencies
# in the Lambda task directory, finally cleanup manifest files.
RUN python3 -m pip install --upgrade pip && pip install poetry
RUN poetry export -f requirements.txt > requirements.txt && \
pip3 install --no-cache-dir -r requirements.txt --target "${LAMBDA_TASK_ROOT}" && \
rm requirements.txt pyproject.toml poetry.lock

# Copy function code
COPY ./src ${LAMBDA_TASK_ROOT}/src

The main idea in this Dockerfile is that I manually download the Chrome driver and browser and unpack them in a location where Selenium can access them, something Selenium would normally handle by itself.

This is a mandatory step for the Lambda environment. Since everything is read-only, the next code sample shows how to point Selenium to the correct driver and browser locations:

from tempfile import mkdtemp

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service


def init_driver(self):
    options = Options()
    # Set up the driver binary location manually
    options.binary_location = '/opt/chrome/chrome'
    # Run browser in headless mode
    options.add_argument('--headless=new')
    options.add_argument('--no-sandbox')
    options.add_argument('--single-process')
    options.add_argument('--window-size=1420,1080')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--disable-gpu')
    options.add_argument('--disable-popup-blocking')
    options.add_argument('--disable-notifications')
    options.add_argument('--disable-dev-tools')
    options.add_argument('--log-level=3')
    options.add_argument('--ignore-certificate-errors')
    options.add_argument("--no-zygote")
    # Point Chrome's writable directories at temporary folders under /tmp
    options.add_argument(f"--user-data-dir={mkdtemp()}")
    options.add_argument(f"--data-path={mkdtemp()}")
    options.add_argument(f"--disk-cache-dir={mkdtemp()}")
    options.add_argument('--remote-debugging-port=9222')

    self._driver = webdriver.Chrome(
        service=Service("/opt/chromedriver"),
        options=options,
    )

Here I hardcode the driver and browser locations to the ones installed in the Dockerfile.

You can see that I’ve also pointed a few folders to temporary directory locations: user-data-dir, data-path, and disk-cache-dir. Chrome creates these directories automatically, which fails on Lambda because of the read-only filesystem and its disk limitations, so I redirect them manually to directories created with mkdtemp().

3.2 Aggregate Empty Pages

My first monitoring algorithm was pretty basic: it looped over the correlation IDs of each lambda invocation and checked the database for posts generated by them, if there were any. Then I hit a corner case where no new posts were created for some pages in the time range I searched, and the algorithm ended up in an infinite loop.

Then, I came up with an idea to bring CloudWatch logs into the game:

import datetime
import re
from typing import List

import boto3

_client = boto3.client('logs')


def monitor(correlation_ids: List[str]):
    finished = []

    # Only look at log events from the last day.
    now = int((datetime.datetime.now() - datetime.timedelta(days=1)).timestamp() * 1000)

    response = _client.filter_log_events(
        logGroupName='/aws/lambda/crawler',
        startTime=now,
        filterPattern="REPORT RequestId"
    )

    for event in response['events']:
        # The REPORT line marks the end of a Lambda invocation.
        match = re.search(r'REPORT RequestId: ([^\s]+)', event.get('message'))
        if match:
            correlation_id = match.group(1)
            if correlation_id in correlation_ids:
                finished.append(correlation_id)

    return finished

Here, I search the crawler’s log group for events from the last day and look for messages of the form REPORT RequestId: <correlation_id>. Such a line indicates that the lambda has reached the end of its execution, so I can mark that correlation ID as finished.
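For reference, the end-of-invocation line the Lambda runtime writes to CloudWatch looks roughly like this (the values are illustrative):

REPORT RequestId: 3f0f2a6e-55b1-4f4e-9c2d-example Duration: 8421.55 ms Billed Duration: 8422 ms Memory Size: 1024 MB Max Memory Used: 512 MB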

3.3 Avoid being blocked by social media platforms

This was a nasty issue, the kind you could spend days on, and the solution required looking at it from a different perspective. Popular social media platforms implement many anti-bot protection mechanisms to prevent crawling, from request-header analysis to rate limiting to IP blocking.

And even though we run the browser in headless mode to mimic a realistic user-browser interaction, all our crawlers send requests from the same IP address, to multiple pages, at the same time, repeatedly. This just screams “please block me.”

To address this, I’ve used a proxy to mask my IP address and location:

import os


class ProxyConnection:

    def __init__(
        self,
        host: str = None,
        port: str = None,
        username: str = None,
        password: str = None,
        verify_ssl: bool = False
    ):
        self.host = host or os.getenv('PROXY_HOST')
        self.port = port or os.getenv('PROXY_PORT')
        self.username = username or os.getenv('PROXY_USERNAME')
        self.password = password or os.getenv('PROXY_PASSWORD')
        self.verify_ssl = verify_ssl
        self._url = f"{self.username}:{self.password}@{self.host}:{self.port}"

    def __dict__(self):
        return {
            'https': 'https://{}'.format(self._url.replace(" ", "")),
            'http': 'http://{}'.format(self._url.replace(" ", "")),
            'no_proxy': 'localhost, 127.0.0.1',
            'verify_ssl': self.verify_ssl
        }

There are many good proxy solutions out there, but I recommend a paid one like SmartProxy, which offers a pool of rotating IPs so that each crawler gets assigned a different IP address.

This way, it will look just like any regular user trying to access a page from a different location.

Also, there’s a gotcha here: many social media platforms restrict access even to public pages in many countries for users who are not logged in. With a proxy, you can pick a country that doesn’t enforce this restriction and get IPs from it.
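To show how this plugs into the rest of the pipeline, here is a minimal sketch of handing the proxy to the Instagram crawler; the ProxyConnection import path is assumed, and the credentials are expected in the PROXY_* environment variables read by the class above:

from src.crawlers.instagram import InstagramCrawler
from src.utils import ProxyConnection  # import path assumed

# Reads PROXY_HOST / PROXY_PORT / PROXY_USERNAME / PROXY_PASSWORD from the environment
proxy = ProxyConnection()

crawler = InstagramCrawler("https://www.instagram.com/mcdonalds", proxy=proxy)
posts = crawler.extract()

Selenium-based crawlers can point Chrome at the same endpoint through the --proxy-server flag.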

4. Local Testing

To prove this works, I wrote a Makefile containing some simple commands for the crawler and the scheduler. The catch is that I’ve only managed to test the crawler locally: since the scheduler spins up crawler lambdas, those already have to be deployed on AWS.

local-test-crawler: # Send a test event to the locally running crawler lambda
	curl -X POST "http://localhost:9000/2015-03-31/functions/function/invocations" \
	-d '{"link": "https://www.instagram.com/mcdonalds"}'

local-test-scheduler: # Send a test event to the locally running scheduler lambda
	curl -X POST "http://localhost:9000/2015-03-31/functions/function/invocations" -d '{}'

Most people testing lambda functions locally use AWS Lambda RIE (Runtime Interface Emulator), which lets you test your lambda function package in a container by emulating a Lambda execution environment on your machine. The AWS Lambda base images already bundle the emulator, so I didn’t have to set it up separately, which slightly simplified my environment.
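One thing these commands assume is that the function image is already running locally; otherwise nothing is listening on port 9000. A hypothetical helper target, with the image name and env file as placeholders, could look like this:

local-run-crawler: # Build the image and expose the emulator endpoint on port 9000
	docker build -t crawler-local .
	docker run -p 9000:8080 --env-file .env crawler-local

This assumes the image’s CMD points at the crawler’s lambda_handler and that the required environment variables (Mongo connection, proxy credentials) are provided.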

You can use these commands to test each component. For example, if you would like to test the crawler, go into your terminal and use this command:

> make local-test-crawler

As you can see, the crawling process has started, and for this page, we’ve found three new posts in the last seven days:

5. Deployment

The deployment process is defined in our GitHub repository under the ops folder, where you can explore the whole solution written in Pulumi.

You can play with the Makefile; it contains all the commands needed to get your infrastructure up and running.
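As a taste of what lives in the ops folder, here is a minimal Pulumi sketch of the EventBridge cron trigger described earlier; the resource names, the schedule, and the config keys are assumptions, not the repository’s actual program:

import pulumi
import pulumi_aws as aws

config = pulumi.Config()
# ARN and name of the already-deployed scheduler Lambda (placeholder config keys).
scheduler_arn = config.require("schedulerArn")
scheduler_name = config.require("schedulerName")

# Cron rule that fires the scheduler, e.g. every Monday at 08:00 UTC.
rule = aws.cloudwatch.EventRule(
    "scheduler-cron",
    schedule_expression="cron(0 8 ? * MON *)",
)

# Point the rule at the scheduler Lambda.
aws.cloudwatch.EventTarget(
    "scheduler-target",
    rule=rule.name,
    arn=scheduler_arn,
)

# Allow EventBridge to invoke the function.
aws.lambda_.Permission(
    "allow-eventbridge",
    action="lambda:InvokeFunction",
    function=scheduler_name,
    principal="events.amazonaws.com",
    source_arn=rule.arn,
)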

Conclusion

In this article, we’ve explored a complete, robust, end-to-end solution for building a highly scalable data ingestion pipeline that leverages data from multiple crawlable sources for various processes like ML training and data analysis.

We’ve also gone through the specific challenges you might face along the way and how to overcome them.

| 🔗 Check out the code on GitHub and support us with a ⭐️

I hope you’ve enjoyed this article as much as I’ve enjoyed writing it. If you did, then…

↓↓↓

Join 7.5k+ engineers in the 𝗗𝗲𝗰𝗼𝗱𝗶𝗻𝗴 𝗠𝗟 𝗡𝗲𝘄𝘀𝗹𝗲𝘁𝘁𝗲𝗿 for battle-tested content on production-grade ML. 𝗘𝘃𝗲𝗿𝘆 𝘄𝗲𝗲𝗸:


Images

If not otherwise stated, all images are created by the author.
