Collecting and Distributing High-Resolution Crypto Market Data with AWS ECS, S3, Athena, Lambda, and Data Exchange

Jared Katz
Floating Point Group
14 min readDec 5, 2019

Floating Point Group is on a mission to bring institutional-grade trading services to the world of cryptocurrency. The need and demand for financial infrastructure designed specifically for trading digital assets may not be obvious. There’s a rather pervasive narrative that these coins and tokens are effectively just “natively digital” counterparts to traditional assets like currencies, commodities, equities, and fixed income. This narrative often manifests in the form of pithy one-liners recycled by pundits attempting to communicate the value proposition of various projects in the space (e.g. “bitcoin is just a currency with an algorithmically-controlled, tamper-proof monetary policy” or “ether is just a commodity like gasoline that can be used to pay for computational work on a global computer”). So unsurprisingly, we at FPG are often asked “What’s so special about cryptocurrencies that they warrant dedicated financial services? Why do we need solutions for problems that have already been solved?”

The truth is that these assets and the widespread public interest surrounding them are entirely unprecedented. The decentralized ledger technology that serves as an immutable record of network transactions, the clever use of proof-of-work algorithms to economically incentivize rational actors to help uphold the security of the network¹, the irreversible nature of transactions which poses unique legal challenges in cases such as human error or extortion, the precariousness of self-custody (third-party custody solutions don’t exactly have track records that inspire trust), the regulatory uncertainties that come with the difficulty of both classifying these assets as well as arbitrating their exchange which must ultimately be reconciled by entities like the IRS, SEC, and CFTC… It is all very new, and very weird. With 24hr market volume regularly exceeding $100 billion, we decided to direct our focus towards problems related specifically to trading these assets. Granted, crypto trading has undoubtedly matured since the days of bartering for Bitcoin in web forums and witnessing 10% price spreads between international exchanges. But there is still a long path ahead.

One major pain point we are aiming to address for institutional traders involves liquidity (or more precisely, the lack thereof). Simply put, the buying and selling of cryptocurrencies occurs across many different trading venues (i.e. exchanges), and liquidity (i.e. the offers to buy or sell a certain quantity of an asset at a certain price) only continues to become more fragmented as new exchanges emerge. So say we’re trying to buy 100 bitcoins. We must buy from people who are willing to sell. As we take the best (i.e. cheapest) offers, we’re left with increasingly expensive offers. By the time we fill our order (i.e. buy all 100 bitcoins), we may have paid a much higher average price than, say, the price we paid for the first bitcoin of our order. This phenomenon is referred to as “slippage.” One easy way to minimize slippage is by expanding your search for offers. So rather than looking at the offers on just one exchange, look at the offers across hundreds of exchanges. This process, traditionally referred to as “smart order routing” (SOR), is one of the core services we provide. By actively monitoring liquidity across dozens of exchanges, our smart order routing service allows traders to frictionlessly submit orders that our system can then match against the best offers available across multiple trading venues.

Fanning out large orders in search of the best prices is a rather intuitive and widely applicable concept — roughly 75% of equities are purchased and sold via SOR. But the value of such a service for crypto markets is particularly salient: a perpetual cycle of new exchanges surging in popularity while incumbents falter has resulted in a seemingly incessant fragmentation of liquidity across trading venues — yet traders tend to assume an exchange-agnostic mindset, concerned exclusively with finding the best possible price for a given quantity of an asset.

Access to both realtime and historical market data is essential to the functionality of our SOR service. The highest resolution data we could hope to obtain for a given market would include every trade as well as every change applied to the order book, effectively allowing us to recreate the state of a market at any given point in time². Fortunately, this data is freely available, as many exchanges offer realtime feeds of market data via WebSocket APIs³. This post provides a high-level overview of how we ingest and store realtime market data as well as how we leverage the AWS Data Exchange API to programmatically organize and publish our data sets.

Data ingestion

The WebSocket streams we connect to for data consumption are often the very same APIs responsible for providing realtime updates to an exchange’s trading dashboard.

This is the Binance trading dashboard for the BTC-USDT market. We can see the order book updating on the left and recent trades appearing on the right.

WebSocket connections transmit data as discrete messages. We can inspect the content of individual messages as they stream into the browser. Here we see a message that contains a batch of order book updates:

The updates are expressed as arrays of bids and asks that were either added to the book or removed from it. Client-side code processes each update, resulting in a realtime rendering of the market’s order book. In practice, our data ingestion service (Ingester) does not read a single stream, but rather thousands of different streams, covering various data feeds for all markets across multiple exchanges. All the connections required for such broad coverage as well as the resulting flood of incoming data raise some understandable concerns about data loss. We’ve taken several measures to mitigate such concerns, including a redundant system design that allows us to spin up an arbitrary number of instances of the Ingester service, all of which consume the same data feeds as each other while a downstream mechanism handles deduplication (this is covered in more detail below). We also set up CloudWatch Alerts to notify us when we detect non-contiguous messages, indicating a “gap” in the incoming data. Of course, the alerts don’t directly mitigate data loss, but they do serve the important function of prompting an investigation.

Ingester builds up separate buffers of incoming messages, split out by data-type/exchange/market. Then, after a fixed time interval, each buffer is flushed into S3 as a gzipped JSON file. The buffer-flush cycle repeats.

Files in S3 containing trade data from Binance’s BTC-USDT market from October 25th, 2019

Here’s what the content of one of these files looks like:

Trade data consumed from a WebSocket stream that has been normalized, batched, and flushed to S3. Each row contains data for a single trade.

Ingester handles additional functionality such as applying pre-defined mappings of venue-specific field names to our internal field names. Data normalization is just one of many processes necessary for enabling our systems to build a holistic understanding of market dynamics.

As with most distributed system designs, our services are written with horizontal scalability as a first-order priority. We took the same approach in designing our data ingestion service, but it’s important to note that it has some features that make it a bit different than the archetypical horizontally-scalable microservice. The most common motivations for adjusting the number of instances of a given service are load-balancing and throttling throughput. Either your system is experiencing backpressure and a consumer service scales to alleviate that pressure, or the consumer is over-provisioned and you scale down the number of instances for the sake of parsimony. In the case of our data ingestion service, however, our motivation for running multiple instances is to minimize data loss via redundancy. The CPU usage for each instance is independent of instance count, as each instance does identical work; rather than, say, helping alleviate backpressure by pulling messages from a single queue, each instance of our data ingestion service connects to the same WebSocket streams and performs the same amount of work. Another somewhat unusual and confounding aspect of horizontally scaling our data ingestion service is related to state: we batch records in memory then flush the records to S3 every minute (based on the incoming message’s timestamp, not the system timestamp, as those would be inconsistent). Redundancy is our primary measure for minimizing data loss, but we also need each instance to write the files to S3 in such a way that we don’t end up with duplicate records. Our first thought was that we’d need a mechanism for coordinating activity across the instances such as maintaining a cache that would allow us to check if a record had already been persisted. But then we realized that we could perform this deduplication without any coordination between instances at all. Most of the message streams we consume publish messages with sequence IDs. We can combine the sequence IDs with the incoming message timestamp to achieve our deduplication mechanism: by writing our service code to check that the message being added to the batch has the appropriate sequence ID relative to the previous message in the batch and using the timestamp on the incoming message to determine the exact start and end of each batch (we typically get a UNIX timestamp and just check when we’ve rolled over to the next clock minute), we can deterministically generate the same exact file names containing the exact same data. This allows us to simply rely on a key collision in S3 for deduplication. As an aside, it’s interesting to note that I recently came across this documentation regarding “handling duplicate records” while investigating Kinesis Streams, which suggests a very similar solution to what I’ve described above (for a slightly different problem):

With this scheme, even if records are processed more than one time, the resulting Amazon S3 file has the same name and has the same data. The retries only result in writing the same data to the same file more than one time.

Once we’ve stored the data, we can perform simple analytics queries on the billions of records we’ve stored in S3 using AWS Athena, a query service that requires minimal configuration and zero infrastructure overhead. Athena has a concept of “partitions” (inherited from one of its underlying services, Apache Hive). Partitions are simply mappings between virtual columns (in our case: pair, year, month, and day) and the S3 directories where the corresponding data is stored.

We follow a consistent key prefix structure when writing files to S3 that actually serves as a partitioning scheme for Athena

By pointing Athena directly to a particular subset of data, a well-defined partitioning scheme can drastically reduce query run times and costs. Though the ability to perform ad-hoc business analytics queries is mostly a “nice-to-have” feature for us, taking time to choose a sane multi-level partitioning scheme for Athena based on some of our most common access patterns seemed worthwhile, as a poorly designed partition structure can result in Athena unnecessarily scanning huge swaths of data and ultimately render the service unusable.

Using Athena to fetch all 775,418 trades for Binance’s BTC-USDT market on October 25th, 2019, ordered by the trade timestamps (ascending)

Data publication

Our pipeline for transforming thousands of small gzipped JSON files into clean CSVs and loading them into AWS Data Exchange involves three distinct jobs, each expressed as a Lambda function.

Job 1

Job 1 is initiated shortly after midnight UTC by a cron-scheduled CloudWatch Event. It formats values for the date and data source into an Athena query “template,” then outputs the query results as a CSV to a specified prefix path in S3. This PUT request to S3 triggers an S3 Event Notification.

It’s worth noting here that we run a full replica data ingestion system as an additional layer of redundancy — using the coalesce conditional expression, the Athena query in Job 1 merges data from our primary system with the corresponding data from our replica system, filling in any gaps while deduplicating redundant records.

An Athena query template
CloudWatch Event cron trigger
Output files from Athena query
CloudWatch Logs for a single run
CloudWatch Metrics

Job 2

Job 2 is triggered by the S3 Event Notification from Job 1. Job 2 simply copies the query results CSV file to a different key within the same S3 bucket.

The motivation for this step is twofold. First, we cannot dictate the name of an Athena query results CSV file — it is automatically set to the Athena query ID. Second, when adding an S3 object as an asset to an AWS Data Exchange revision, the asset’s name is automatically set to the S3 object’s key. So if we’d like to dictate how the CSV file name appears in AWS Data Exchange (which we do), we must first rename it, which we accomplish by copying it to a specified S3 key.

S3 trigger (left), CloudWatch Logs for a single run (right)

Job 3

Job 3 handles all work related to AWS Data Exchange and AWS Marketplace Catalog via their respective APIs. We use boto3, AWS’s Python SDK, to interface with these APIs. Note that the AWS Marketplace Catalog API is necessary for adding data set revisions to products which have already been published.

Our code explicitly defines mappings with the following structure:

data source → DataSet → Product(s)

Our data sources are typically represented by a trading venue + data type combination (e.g. “Binance trades”, “CoinbasePro order books”, etc.). Each new file for a given data source is delivered as a single asset within a single new revision for a particular data set.

An S3 trigger kicks off the lambda function. The trigger is scoped to a specified prefix which maps to a single data set. Lambda’s function alias feature allows us to define the unique S3 triggers for each data set while reusing the same underlying lambda function. Then Job 3 carries out the following steps:

AWS Data Exchange API

  1. Submits a request to create a new revision for the corresponding data set via CreateRevision.
  2. Adds the file which was responsible for triggering the lambda to the newly created revision via CreateJob using the IMPORT_ASSETS_FROM_S3 job type. In order to submit this job, we need to supply a few values — the S3 Bucket and Key values for the file are pulled from the lambda event message, while the RevisionID argument comes from the response to the CreateRevision call in the previous step.
  3. Kicks off the job with StartJob, sourcing the JobID argument from the response to the CreateJob call in the previous step.
  4. Polls the job’s status via GetJob (using the job ID from the response to the StartJob call in the previous step) to check that our file (i.e. the “asset”) was successfully added to the revision.
  5. Finalizes the revision via UpdateRevision.

AWS Marketplace Catalog API

  1. Requests a description of the marketplace entity using DescribeEntity, passing in the product ID stored in our hardcoded mappings as the EntityID argument.
  2. Kicks off the entity ChangeSet via StartChangeSet, passing in the entity ID from the previous step, the entity ID from the DescribeEntity response in the previous step as EntityID, the the revision ARN parsed from the response to our earlier call to CreateRevision as RevisionArn, and the data set ARN as DataSetArn, which we fetch at the start of the code’s runtime using AWS Data Exchange API’s GetDataSet.

Here’s a thin wrapper class we wrote to carry out the steps detailed above:

S3 trigger (left), CloudWatch Logs for a single run (center), CloudWatch Alarm for “ERROR” and “CRITICAL” log messages (right)

Finally, we can verify that our revisions have been successfully added to their corresponding data sets and products through the AWS console:

Revision successfully added to the target data set

It’s worth noting that AWS Data Exchange allows sellers to create private offers for their own AWS account IDs, providing a convenient means of sanity-checking that revisions show up in each product as expected:

Revision successfully added to the target product

Conclusion

I hope this post served as a useful demonstration of how AWS Data Exchange can be frictionlessly integrated into an existing data pipeline. We’re pleased to have been invited to participate in the AWS Data Exchange private preview, and even more pleased with the service itself, which has proven to be a sophisticated yet natural extension of our system.

I want to offer special thanks to both Kyle Patsen and Rafic Melhem of the AWS Data Exchange team for generously fielding my questions (and patiently enduring my ramblings) for the better part of the past year. I also want to thank to Lucas Adams for helping me design the system discussed in this post, and, more importantly, for his unwavering vote of confidence.

If you’re interested in our services, don’t hesitate to contact us. Also, we’re hiring! Check out our careers page for open positions.

  1. Granted, the “proof-of-work” concept dates back at least as far as 1993, but it was not until bitcoin that the technology showed potential for widespread adoption.
  2. The updates provided through the WebSocket streams are not sufficient for reconstructing order books. We need to periodically fetch snapshots of the order books and store those as well, which we can do using an exchange’s REST API. Then we can fetch a snapshot and apply the corresponding updates from the streams to “replay” the order book.
  3. We found several third-party vendors selling subscriptions to these data sets, typically in the form of CSV dumps delivered at a weekly or monthly cadence. This presented the question of “build vs buy.” Given that we felt capable of building a robust and reliable system for ingesting realtime market data in a relatively short amount of time and at a fraction of the cost of purchasing the data from a vendor, we were already leaning in favor of “build.”Further investigation made “buy”look like an increasingly unattractive option. Disclaimers issued by multiple vendors about their inability to guarantee data quality and consistency did not inspire confidence. Inspecting sample data sets revealed that some essential fields provided in the original data streams were missing — fields necessary for achieving our goal of being able to recreate the state of a market at an arbitrary point in time. We also recognized that a weekly/monthly delivery schedule would restrict our ability to explore relatively recent market data.
  4. Our system’s functionality extends well beyond data ingestion, normalization, and persistence; we run dedicated services for data validation, caching the most recent trade and order book for every market, computing and storing derivative metrics, and other services which help safeguard data accuracy and minimize the latency of our trading systems.
  5. Like most of our microservices, Ingester is a Dockerized service run on ECS and deployed via Terraform.
  6. S3’s file system is not actually hierarchical. Files are just prepended with long key prefixes that are rendered as directories in the AWS console when browsing a bucket’s contents. This has some non-trivial performance consequences when querying or filtering on large data sets.
  7. As mentioned earlier, our data ingestion service’s batching mechanism flushes each batch to S3 at a regular time interval. The rollover from one interval to the next is determined by a timestamp on the incoming message (applied server-side) as opposed to the ingestion service’s system timestamp, so in the rare case that a non-trivial amount of time elapses between the consumption of the final message of batch n and the first message of batch n+1, we kick off the first lambda function 20 minutes after midnight UTC to minimize the likelihood of omitting data pending write.
  8. Every Athena query produces a .metadata file and a CSV file of the query results (though DDL statements do not output a CSV).
  9. We experimented fairly extensively with AWS Glue + PySpark for the ETLish work performed in Job 1, but upon realizing that we could achieve our goals of merging all the small source files into one, joining the primary and replica data sets, and sorting the results, all with a single Athena query, we decided to stick with this seemingly simpler and more elegant approach.

--

--