Deploying click-stream data pipelines on AWS

Ranieri Castelar
Published in The Startup
14 min read · Oct 2, 2019

Hello! Today we will talk a little about clickstreams and some of the pipeline architectures that can be implemented on AWS.

Click Streams

Click Streams are created through the capture and aggregation of user activity, resulting from interactions with an app or website. Capturing clicks on the login button, navigation through product pages and other actions can be beneficial not only for the marketing team but also for development. We could detect, for example, if the flow of user actions is interrupted at a specific point during navigation, hinting at possible UX problems, or even bugs. We could also measure the impact of changes: How does a change in the size of a button affect the experience? Was it positive, or negative?

On the marketing side of things, this data can be used to better understand our users. Despite the bad reputation of capturing data for this purpose, it helps us give our clients what they need when they need it, which is beneficial for all parties involved.

Adversities of the implementation

While analyzing clickstreams is useful, it can also be challenging. This type of analysis naturally raises privacy concerns and increases the risks associated with data leaks. It can also be expensive: Imagine that our pipeline is receiving about 1000 actions per second. That means roughly 86 million clicks per day, and about 2.6 billion clicks a month! Also, if the schema of the received data changes, we might need to change parts of the pipeline. If not properly dealt with, the stream can become more of a problem than a benefit.
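To make the volume concrete, here is a small sketch of the arithmetic behind those numbers. The rate of 1000 actions per second comes from the example above; the 30-day month is an assumption made only for illustration.

```python
# Rough volume estimate for a click stream.
# The 30-day month is an assumption; the rate is the one used in the text.
SECONDS_PER_DAY = 24 * 60 * 60


def clicks_per_day(actions_per_second):
    """Number of actions received in one day at a constant rate."""
    return actions_per_second * SECONDS_PER_DAY


def clicks_per_month(actions_per_second, days=30):
    """Number of actions received in a month of `days` days."""
    return clicks_per_day(actions_per_second) * days


daily = clicks_per_day(1000)
monthly = clicks_per_month(1000)
print(f"{daily:,} clicks per day, {monthly:,} clicks per month")
```

Real traffic is rarely constant, so these figures are better read as an order of magnitude than as a forecast.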

Some SaaS (Software as a service) products can help here, dealing with part of the complexity for us. They can, however, also introduce limitations: Maybe the data can only be retrieved a couple of times a day, you can’t easily switch to other vendors, or the cost is not ideal. That aside, the tools offered by AWS might be enough to properly deal with this complexity.

In this article, our focus will be on AWS, and as such, architectures that rely on servers or external offerings to pre-process the data won’t be covered. We will talk about the data itself, look at ways in which we can ingest, process or execute an initial analysis, as well as methods for storing it for later access. Also, while the focus of the article is on click streams, we could use the presented tools and architectures to deal with server logs or time-series data, among others.

Pipeline architectures: What we are looking for

Implementing a click-stream pipeline can be difficult, but we have an advantage: There is no need to immediately respond to user actions. Taking from a few seconds to a few minutes to ingest and execute the initial analysis is not usually a problem. Besides, it allows us to use micro-batches: We can create a buffer and use, for example, only one lambda to deal with many requests.
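The idea of a micro-batch buffer can be sketched in a few lines of Python. This is only an illustration of the concept, not how any AWS service is implemented: the class name, the thresholds and the `flush_fn` callback are all hypothetical.

```python
import time


class MicroBatchBuffer:
    """Collects events and hands them to `flush_fn` in groups.

    A batch is released when it holds `max_items` events or when the
    oldest buffered event is at least `max_age_seconds` old. In this
    sketch the age is only checked when a new event arrives.
    """

    def __init__(self, flush_fn, max_items=500, max_age_seconds=60.0,
                 clock=time.monotonic):
        self._flush_fn = flush_fn
        self._max_items = max_items
        self._max_age = max_age_seconds
        self._clock = clock
        self._items = []
        self._first_item_at = None

    def add(self, event):
        if not self._items:
            self._first_item_at = self._clock()
        self._items.append(event)
        too_many = len(self._items) >= self._max_items
        too_old = self._clock() - self._first_item_at >= self._max_age
        if too_many or too_old:
            self.flush()

    def flush(self):
        """Send whatever is buffered to `flush_fn` as one batch."""
        if not self._items:
            return
        batch, self._items = self._items, []
        self._first_item_at = None
        self._flush_fn(batch)


# Usage: seven clicks with a batch size of three produce two full batches,
# and a final manual flush releases the remaining click.
batches = []
buffer = MicroBatchBuffer(batches.append, max_items=3)
for i in range(7):
    buffer.add({"click": i})
buffer.flush()
```

Instead of seven separate invocations, the downstream function would be called three times, which is where the savings come from.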

As such, we should have three goals in mind while creating a pipeline.

Scalability

We need a solution capable of managing large amounts of data without costs growing in step with the amount of ingested information. This means that simply storing raw data in a more expensive, high-performance location is not ideal. At the same time, having access to the “raw” data can be useful, and we may not want to immediately discard it. The solution should, of course, also be able to keep up with the analysis of thousands of actions per second, if our environment dictates it.

Flexibility

Ideally, changes to the data structure shouldn’t require changes to the pipeline. If we can’t add or remove fields from the JSON sent by the front-end, the pipeline might impact the development process. Also, frequent maintenance can be unsustainable.

Safety

When a failure occurs, parts of the pipeline located upstream should be able to deal with the problems.

Normally, implementing backups and correction mechanisms takes time, generating extra complexity and costs. One of the advantages of AWS, however, is the “overhead free” application of such mechanisms. We can avoid many problems through the use of available tools. Yet, in this particular case, privacy is also an issue. We should implement architectures that avoid the unnecessary storage of sensitive user information.

Pipeline implementation on AWS

AWS offers a large variety of tools, and at first glance, many of these look similar. Should we use Kinesis Streams or SQS? Maybe Firehose? Keeping the goals above in mind will help us filter down the possible pipelines.

Data ingestion tools

The first step of the architecture deals with data ingestion. Remember, we are trying to receive data from the front end. This means that directly storing data in RDS, for example, is not an option (and might not be the ideal solution anyway). For security purposes, imagine that all data is passing through AWS’s API Gateway before being consumed by our pipeline.

Even here, many options could be used. SQS, Lambdas, Kinesis Firehose and Kinesis Streams come to mind.

So what should we use?

Lambdas

Lambdas are powerful, but using them to directly ingest data might not be the best approach. Besides the limited execution time, each request would be processed by a different lambda.

Grouping received actions in small batches is one of the main ways in which we can lower the costs of the pipeline. Also, we can only execute a limited number of functions in parallel.

By keeping each request separate we aren’t able to compare values until the data reaches its final destination. Lambdas also offer limited safety: if they fail, the received data would likely be lost. To prevent these problems, we would need to pair them with other tools anyway.

SQS

As the name implies, SQS offers developers a simple (yet powerful) queuing service. However, it keeps the received data available only until a consumer grabs it, and allows only one consumer at a time.

If we take 10 requests from the queue for processing, we wouldn’t be able to compare the 11th one with these previous requests. This also means that SQS offers fewer reliability features than other tools. It’s not possible to restore the state of the queue if something goes wrong downstream.

The tool is still a valid alternative and should be considered if it meets the requirements of your environment. At $0.40 per million requests, it’s cheaper and easier to manage than Kinesis Streams. It has the additional disadvantage of being able to deal with a smaller number of requests per second, peaking at 1000.

Kinesis Firehose

Firehose’s job is simply receiving data and pouring it at a target destination. The options are limited, as we can choose between S3, RedShift, Lambdas, and Elasticsearch.

Firehose, however, is an excellent fit for our scenario. At $0.029 (or less) per gigabyte, it is almost free for environments with lower throughput. You don’t pay to simply keep the solution online.

Even at 1000 requests/s, our costs stay under $400.00/month. SQS, for example, would be more expensive than that. It’s important to keep the size of each request under 5KB, however, or the price will start to increase, since Firehose bills each record rounded up to the nearest 5KB.
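The comparison above can be reproduced with a short calculation. This is a rough sketch that uses only the prices quoted in this article (which may have changed since), assumes a 30-day month and decimal gigabytes, and ignores free tiers and the extra requests SQS consumers make when receiving and deleting messages.

```python
import math

# Prices quoted in this article; treat them as illustrative.
FIREHOSE_USD_PER_GB = 0.029
FIREHOSE_BILLING_INCREMENT_KB = 5      # records are rounded up to 5KB steps
SQS_USD_PER_MILLION_REQUESTS = 0.40
SECONDS_PER_MONTH = 30 * 24 * 60 * 60  # assumption: 30-day month
KB_PER_GB = 1_000_000                  # assumption: decimal units


def firehose_monthly_cost(requests_per_second, payload_kb):
    """Estimated monthly ingestion cost for Firehose, in USD."""
    increments = math.ceil(payload_kb / FIREHOSE_BILLING_INCREMENT_KB)
    billed_kb = increments * FIREHOSE_BILLING_INCREMENT_KB
    total_gb = requests_per_second * SECONDS_PER_MONTH * billed_kb / KB_PER_GB
    return total_gb * FIREHOSE_USD_PER_GB


def sqs_monthly_cost(requests_per_second):
    """Estimated monthly cost of sending the same requests to SQS, in USD."""
    requests = requests_per_second * SECONDS_PER_MONTH
    return requests / 1_000_000 * SQS_USD_PER_MILLION_REQUESTS


print(f"Firehose, 5KB payloads: ${firehose_monthly_cost(1000, 5):,.2f}")
print(f"Firehose, 6KB payloads: ${firehose_monthly_cost(1000, 6):,.2f}")
print(f"SQS:                    ${sqs_monthly_cost(1000):,.2f}")
```

Note how a 6KB payload is billed as 10KB, doubling the Firehose estimate. That rounding is the reason for keeping requests under 5KB.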

Firehose also doesn’t care about the schema of the received data (unless we are executing a transformation in the data structure, as we will see later). And, in case of failure, data is retained for 24 hours.

Firehose also outputs data in micro-batches. The solution buffers data until a configured interval (60 seconds at the minimum) elapses or the buffer reaches a total size of 64MB, whichever comes first, before dropping it into a destination. Instead of one lambda per request, Firehose can send a pack of requests to each fired function. Alternatively, having S3 as a target also keeps the storage costs of RAW data low (and querying data stored in S3 is still possible).

1. An example of architecture, where Firehose is sending micro-batches of incoming requests to each lambda function.

Additionally, Firehose can use lambdas to pre-process data, and then send it to another destination. This can be used to transform the incoming data into one of the formats supported by the solution or to obfuscate sensitive information. The pre-processing function also receives data in micro-batches.
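As an illustration, a pre-processing function for Firehose could look like the sketch below. Firehose passes the batch in `event["records"]`, with each payload base64-encoded, and expects every record back with the same `recordId`, a `result` and the transformed `data`. The field names in `SENSITIVE_FIELDS` are hypothetical, and hashing is just one possible way to obfuscate values (an unsalted hash is pseudonymisation rather than true anonymisation).

```python
import base64
import hashlib
import json

# Hypothetical names of the fields we do not want to store in clear text.
SENSITIVE_FIELDS = ("email", "ip_address")


def obfuscate(action):
    """Return a copy of the action with sensitive values replaced by a hash."""
    cleaned = dict(action)
    for field in SENSITIVE_FIELDS:
        if field in cleaned:
            value = str(cleaned[field]).encode("utf-8")
            cleaned[field] = hashlib.sha256(value).hexdigest()
    return cleaned


def lambda_handler(event, context):
    """Transform one micro-batch of Firehose records."""
    output = []
    for record in event["records"]:
        try:
            action = json.loads(base64.b64decode(record["data"]))
            # One JSON object per line keeps the delivered files easy to query.
            payload = json.dumps(obfuscate(action)) + "\n"
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(payload.encode("utf-8")).decode("utf-8"),
            })
        except (ValueError, TypeError):
            # Malformed payloads are reported back instead of being dropped silently.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```

Because the function receives many records per invocation, the cost per processed action stays low even at higher volumes.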

As we will see, Firehose has the advantage of being compatible with Kinesis Analytics. The main limitation of the solution is the fact that by default, it can only deal with up to 5000 requests per second. It’s worth noting that in many scenarios, users can work around this by dividing the incoming data into multiple streams.

Kinesis Streams

Similarly to SQS, Streams is also a queue service, requiring a consumer. The main difference is that it stores received data for a set period, allowing this data to be fetched multiple times. The received information is available almost immediately and can be fetched from Kinesis Streams at up to five times a second (per shard).

With enhanced fan-out, this limitation is lessened. By enabling it, each independent consumer can themselves fetch data up to five times a second. The latency might still increase, as each shard has an output limit in MB/s.

Streams is more expensive than the previous solutions. Users pay not only for the number of requests received but also to keep the solution online. Each shard costs $0.015/hour (and is able to ingest up to 1MB/second). An additional $0.014 is charged per million processed PUT actions (heavier payloads count as more PUT actions).
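The two components of the bill can be sketched as follows. The prices are the ones quoted above and may be outdated. The 25KB size of a billed PUT unit and the limit of 1000 records per second per shard are taken from the AWS documentation as I understand it, while the 30-day month and the function name are assumptions for illustration.

```python
import math

# Prices quoted in this article; treat them as illustrative.
USD_PER_SHARD_HOUR = 0.015
USD_PER_MILLION_PUT_UNITS = 0.014
PUT_UNIT_KB = 25                   # each started 25KB chunk is billed as one unit
SHARD_INGEST_KB_PER_SECOND = 1024  # 1MB/second per shard
SHARD_RECORDS_PER_SECOND = 1000
HOURS_PER_MONTH = 30 * 24          # assumption: 30-day month


def streams_monthly_cost(records_per_second, payload_kb):
    """Return (shards needed, estimated monthly cost in USD)."""
    shards_for_bytes = math.ceil(
        records_per_second * payload_kb / SHARD_INGEST_KB_PER_SECOND)
    shards_for_records = math.ceil(records_per_second / SHARD_RECORDS_PER_SECOND)
    shards = max(shards_for_bytes, shards_for_records, 1)

    # Paid even when no data arrives: the cost of keeping shards online.
    shard_cost = shards * USD_PER_SHARD_HOUR * HOURS_PER_MONTH

    # Paid per ingested record, scaled by its size.
    units_per_record = math.ceil(payload_kb / PUT_UNIT_KB)
    put_units = records_per_second * units_per_record * HOURS_PER_MONTH * 3600
    put_cost = put_units / 1_000_000 * USD_PER_MILLION_PUT_UNITS

    return shards, shard_cost + put_cost


shards, cost = streams_monthly_cost(records_per_second=1000, payload_kb=1)
print(f"{shards} shard(s), about ${cost:,.2f} per month")
```

This estimate leaves out the custom consumers, extended retention and enhanced fan-out, all of which add to the final bill.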

Using Streams, users can deploy hundreds of shards. As such, the solution can ingest an impressive amount of information. It should be used when SQS can’t keep up with the amount of data, or when keeping this information on the queue for multiple fetches is a requirement.

Initial analysis, filtering, and transformation

Similarly to the ingestion step, AWS also provides many options for data transformation. In most scenarios, we want to process the received RAW data as soon as possible, making it available for further analysis. Sometimes, however, this is not the case. Let’s take a look at possible options.

Kinesis Analytics

Analytics can be paired with Firehose or Kinesis Streams, allowing users to execute SQL queries directly. It doesn’t function as a consumer, but as an external observer. When paired with Firehose, the tool will act on incoming requests even if the data is still being buffered, and information will still be sent to the original destination.

Analytics doesn’t provide a true real-time process. Usually, it will treat data in small batches, with a delay of about 10 seconds. The captured information is stored in the tool itself, allowing the execution of queries in time windows.

As we can see in the snippet below, Analytics queries can be divided into four pieces: An input stream, an output, a pump, and the query logic. In this example, we are capturing the average price of purchases made by different groups of users, over one minute. We are then sending these results to a second stream, to be stored in RedShift. This process can turn thousands of requests into a few dozen stored lines!

2. Using Kinesis Analytics to pre-process and reduce the amount of data being sent to RedShift
3. Example of a query executed through Kinesis Analytics.
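To show what a one-minute window aggregation does to the data, here is a plain Python stand-in. It is not Kinesis Analytics SQL, and the field names (`timestamp`, `user_group`, `price`) are hypothetical. It only mimics the logic of grouping purchases by minute and by user group, then averaging the price.

```python
from collections import defaultdict

WINDOW_SECONDS = 60


def average_price_per_window(purchases, window_seconds=WINDOW_SECONDS):
    """Average price per (window start, user group).

    `purchases` is an iterable of dicts with a `timestamp` in seconds,
    a `user_group` and a `price`. Windows are fixed and non-overlapping.
    """
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum of prices, count]
    for purchase in purchases:
        window_start = int(purchase["timestamp"] // window_seconds) * window_seconds
        bucket = totals[(window_start, purchase["user_group"])]
        bucket[0] += purchase["price"]
        bucket[1] += 1
    return {key: total / count for key, (total, count) in sorted(totals.items())}


purchases = [
    {"timestamp": 5, "user_group": "A", "price": 10.0},
    {"timestamp": 30, "user_group": "A", "price": 20.0},
    {"timestamp": 59, "user_group": "B", "price": 5.0},
    {"timestamp": 61, "user_group": "A", "price": 40.0},
]
summary = average_price_per_window(purchases)
for (window_start, group), avg in summary.items():
    print(window_start, group, avg)
```

Four purchases become three summary lines here. With thousands of requests per minute and only a handful of user groups, the reduction is far more dramatic.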

Analytics is also able to detect anomalies through the use of unsupervised algorithms, capturing and passing these anomalies to a lambda. The lambda can then act as appropriate for the scenario. In our next architecture, a notification is sent to the user, with relevant data stored in DynamoDB.

4. Example of Architecture employing Kinesis Analytics to detect possible anomalies

If native functionalities are not enough, Analytics allows users to deploy a Java-based application and act upon the captured data through it. This implies additional costs: Analytics is billed in KPUs, and the Java application requires an additional KPU to function. Each KPU is composed of 1 vCPU and 4GB of memory, costing $0.11/hour. Analytics will scale the number of KPUs up or down automatically, depending on the computation power needed to process the queries.

Analytics can output data to Firehose, a Kinesis stream, or a lambda function.

Glue

We can divide Glue into three main tools: Crawlers, the Data Catalog, and Jobs.

The Data Catalog is a support solution, used to store metadata about information located in DynamoDB, JDBC databases, or S3. When used in combination with Firehose, it allows the transformation of the request data structure. We can convert incoming JSON objects into ORC or Parquet, file types optimized for query execution.

It also powers Athena and Spectrum, tools that rely on catalog tables to query semi-structured data in S3.

5. Schema of a simple data catalog table

Crawlers are designed to inspect storage locations, creating catalog tables with relevant metadata. They depend on DPUs, which take some time to initialize.

Crawlers are priced at $0.44 per DPU-Hour.

Glue Jobs is an ETL offering. Based on Python and Spark, Jobs is capable of using catalog tables to help with the deployment process. Yet, this is not required. I couldn’t retrieve data stored in .parquet files by using the catalog information, and to bypass this, the method GlueContext.GetSourceWithFormat was used instead of GlueContext.GetCatalogSource.

6. Architecture employing Glue jobs to extract and enrich raw data, sending the results to RedShift

Glue Jobs is batch-based, and the user needs to wait for the DPUs to come up. It’s also priced at $0.44 per DPU-Hour. It’s worth noting that increasing the number of available DPUs won’t necessarily increase the job’s performance.

Glue’s main purpose is ETL. If more complex operations are required, EMR (Elastic Map Reduce) should also be considered.

Athena and Redshift Spectrum

Athena and Spectrum are similar. The main difference is that Athena works as a stand-alone solution, while Spectrum is dependent on RedShift clusters.

Both are used to execute SQL queries on semi-structured data stored in S3, taking advantage of Glue’s Data Catalog. Both tools also have the same price: $5.00 per terabyte of scanned data, with the user paying for each query.

While this might seem expensive, these tools can help users save money in specific scenarios: Keeping rarely used data in S3 and paying for the occasional query can be much cheaper than storing this data in, for example, RedShift. Constantly searching through a small amount of data is also cheap, even though AWS sets a minimum charge per query, billing any query that scans less than 10MB as if it had scanned 10MB.
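A quick sketch of this pricing model, using the $5.00 per terabyte figure quoted above and the 10MB minimum per query. It assumes binary units (1TB = 1024 × 1024 MB) and ignores any finer rounding AWS applies to the scanned size.

```python
# Price quoted in this article; treat it as illustrative.
USD_PER_TB_SCANNED = 5.00
MIN_BILLED_MB = 10
MB_PER_TB = 1024 * 1024  # assumption: binary units


def query_cost(scanned_mb):
    """Estimated cost in USD of one query that scans `scanned_mb` megabytes."""
    billed_mb = max(scanned_mb, MIN_BILLED_MB)
    return billed_mb / MB_PER_TB * USD_PER_TB_SCANNED


# A thousand small queries versus one scan of a full terabyte.
small_queries = 1000 * query_cost(2)
full_scan = query_cost(MB_PER_TB)
print(f"1000 queries over 2MB each: ${small_queries:.4f}")
print(f"One query over 1TB:         ${full_scan:.2f}")
```

The minimum charge means a 2MB query costs the same as a 10MB one, yet a thousand of them still add up to only a few cents.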

Athena’s results are saved to S3 for later use, and if Spectrum is being used, we can even execute joins between data stored in S3 and data stored in RedShift itself.

As specified above, Athena and Spectrum are dependent on Glue’s Data Catalog, using its metadata to execute the searches. Glue also helps with performance optimization: its Data Catalog is needed to transform the received data into .parquet files. These files are smaller, providing even better compression than formats like gzip. They also store information in a columnar layout.

A query executed on top of them doesn’t look at the entire file, but only at the internal index and necessary columns. This makes the query cheaper and faster: Depending on the structure of the data, queries that would take hundreds of seconds can finish in less than ten.
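The benefit of a columnar layout can be illustrated with plain Python structures. This is a conceptual sketch only: real Parquet files add encoding, compression and metadata on top of this idea, and the sample events below are made up.

```python
# Hypothetical purchase events, stored row by row as they arrive.
rows = [
    {"user_group": "A", "page": "/cart", "price": 10.0},
    {"user_group": "B", "page": "/cart", "price": 20.0},
    {"user_group": "A", "page": "/checkout", "price": 30.0},
]


def to_columns(rows):
    """Pivot same-shaped rows into one list of values per field."""
    columns = {}
    for row in rows:
        for field, value in row.items():
            columns.setdefault(field, []).append(value)
    return columns


def average(values):
    return sum(values) / len(values)


columns = to_columns(rows)

# With rows, answering "what is the average price?" touches every field of
# every event. With columns, only the "price" list needs to be read.
average_price = average(columns["price"])
print(average_price)
```

Since these tools charge by the amount of data scanned, reading one column instead of whole rows is what makes the query both faster and cheaper.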

Taking this structure into account, we can also increase flexibility. After scanning a location containing nested JSON files, a Glue crawler will return a struct that specifies the fields contained by the JSON. We can modify this catalog table, transforming the struct into a map. In this context, the map will specify the type of data contained by the JSON, but not each field. The fields can now change at will, and still be queried by Athena or Spectrum, as shown below.

7. Example of an Athena query, executed on top of a map-based catalog table.

These tools, however, have some limitations. They can’t execute many queries in parallel, and it’s not possible to create indexes on data stored in S3. Also, they still don’t have a callback function implemented on Boto3, so a loop might be necessary to verify the execution status.
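Such a polling loop could look like the sketch below. It relies on the `start_query_execution` and `get_query_execution` calls of the Boto3 Athena client. The function name, the timeout handling, and the database and bucket names in the usage comment are hypothetical. The client is passed in as a parameter, so the function itself does not import Boto3.

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def run_query_and_wait(athena, sql, database, output_location,
                       poll_seconds=1.0, timeout_seconds=300.0,
                       sleep=time.sleep):
    """Start an Athena query and poll until it reaches a terminal state.

    `athena` is expected to be a Boto3 Athena client, for example
    boto3.client("athena"). Returns (query execution id, final state).
    """
    started = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = started["QueryExecutionId"]

    waited = 0.0
    while True:
        response = athena.get_query_execution(QueryExecutionId=query_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return query_id, state
        if waited >= timeout_seconds:
            raise TimeoutError(f"Query {query_id} still in state {state}")
        sleep(poll_seconds)
        waited += poll_seconds


# Usage (names are placeholders):
#   import boto3
#   query_id, state = run_query_and_wait(
#       boto3.client("athena"),
#       "SELECT COUNT(*) FROM clicks",
#       database="clickstream",
#       output_location="s3://my-athena-results/",
#   )
```

The results themselves are not returned by this function. Once the state is SUCCEEDED, they can be read from the output location in S3 or fetched with the client's `get_query_results` call.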

Below, we can see our Firehose/Kinesis Analytics architecture, now employing Spectrum to retrieve data from S3.

8. Example of architecture where Spectrum is being used to query RAW data stored in S3

AWS Lake Formation

AWS recently released a new platform called Lake Formation. While it doesn’t add new functionality, the tool streamlines the configuration and management of pipelines reliant on Glue and Athena/Spectrum.

At no additional cost, Lake Formation centralizes access to these tools and simplifies the management of permissions.

The interface provides shortcuts to Glue jobs, crawlers and S3 buckets registered as data-lakes (The targets of Kinesis Firehose, for example). Data Catalog tables are also accessible from the menu.

The tool also provides blueprints capable of speeding up the deployment process.

These blueprints provide forms used to deploy Glue crawlers and jobs automatically: They can extract data from the specified source, be it an SQL database or S3 itself, and load this data into the target bucket. Users can choose between .csv or .parquet as the output format of the data.

10. Lake formation blueprint form

Through this platform, we can also add users and grant permissions for Glue Catalog tables, including rights to alter, create or drop a table. Additionally, we can define which permissions a new user can grant to others.

9. Permissions management form at AWS Lake formation

The tool also offers an easy way to register data lake administrators, as well as to specify who can create a new Catalog database.

These access control features of Lake Formation seem especially useful, and will likely become commonplace in this type of infrastructure.

Architectures and trade-offs

Knowledge about each tool is what enables us to make decisions based on the project requirements. No pipeline is perfect, and we will need to choose tools based on the need for flexibility, how fast we need to ingest the data, the necessity of a custom consumer, the number of requests per month and if we expect this number to grow, etc.

Some key takeaways:

Regarding the ingestion tools

  • Firehose is cheaper than Streams or SQS and can send data to lambdas, RedShift or S3. It buffers the data, taking a minute or more to release it (which could be an advantage).
  • With Streams, the incoming information is available almost instantly, and data is kept even after a fetch, but it requires (or enables the use of) at least one custom consumer. It is relatively expensive and can increase complexity. It also scales to meet almost any demand.
  • SQS can only send data to one consumer and doesn’t keep information after a fetch, but it is cheaper than Streams and simpler to maintain.

Regarding transformation tools

  • Kinesis Analytics can execute queries on Firehose and Streams even if the data hasn’t yet been retrieved by a consumer or sent to a destination. It can also detect anomalies, and send data to a Java-based application.
  • If batch processing makes sense, Glue can retrieve data from S3, JDBC databases or DynamoDB, and might be an alternative to Lambdas or Analytics.
  • Athena and Spectrum can decrease costs by making it possible to query, and therefore to store, rarely used data on S3. A lambda can also be triggered when these tools finish a query.
  • Lambdas are useful, but have a limited execution time and are not ideal to directly ingest data.

I hope you enjoyed it!
