Multi-Source Processing v2.0

James Gimourginas
Published in Slalom Build
16 min read · May 5, 2020
Photo by NASA on Unsplash

Have you ever worked on a project that, looking back years later, you wish you could redo? I’m sure I’m not alone in reassessing past projects and wondering what the outcomes would have been using the technology offerings and delivery practices of today. For me, there’s a special project, still one of the largest I’ve worked on, that was, by design, ahead of its time.

About ten years ago, I was part of a team that was asked to build a new system to demonstrate how emerging technologies of the time could be combined to solve the space challenges of the future. Our charge was to integrate truly bleeding-edge tech, challenge existing processing paradigms, and break through barriers present in legacy systems. The resulting system was meant to provide more accurate and more timely awareness of space activities in anticipation of increased congestion and contention in the space domain. Our approach involved multi-source data processing and decision support services, which were backed by a set of emerging distributed computing technologies (such as Hadoop and Puppet) and approaches (such as Continuous Delivery and Microservices). I’ll call the system Multi-Source Processing (MSP) for simplicity.

There were many bright spots during those years: engineers who had an opportunity to be challenged by a world-class problem space, a dedicated team who believed in the mission we were supporting, strong leadership and guidance from senior sponsors who valued disrupting the untenable status quo, and interesting distributed computing technologies of the time that provided a glimpse into what the tech landscape would look like in the decade to come. Unfortunately, the bright spots are more like a silver lining than a glorious triumph. The bleeding edge technology of the time wasn’t quite ready for prime time.

What we were missing was a simpler, more scalable, and more reliable way to realize the architecture we had defined. Over the last decade, AWS and other public cloud providers have made remarkable progress in creating services that are foundational to robust distributed computing solutions. With those advancements, teams can now spend more time focused on user needs instead of spending so many cycles (like we did) on infrastructure management, data management, throughput, scale, reliability, security and other non-functional concerns.

Which leads to a fun thought experiment: how might a similar system (let’s call it MSP 2.0) be architected today on AWS?

Data Ingress

In MSP 1.0, there was a wide range of systems creating the input we would be processing. The more modern systems were event-driven (we called them “near-real-time”), but the large majority still relied on batch processing. The amount of data was, for its time, enormous (we referred to it as “big data”). We had millions of measurements to process, hundreds of thousands of calculations to adjust based on those measurements, and a mixture of structured and semi-structured data from different sources to link together.

The approach we took at the time consisted of storing files into a large network file system on receipt. Then our Inbound File Adapter, using Spring Integration’s file inbound channel adapter, would identify the new file and publish its contents as a message to the appropriate ActiveMQ topic, depending on file type. The Inbound File Adapter is also where we’d add metadata and normalize messages to a common (XML-based) messaging schema.

Some of the limitations we hit:

  • Reprocessing data wasn’t possible. Our NFS was large but the data we were processing was larger. It wasn’t possible to keep originals of all inbound files, meaning we weren’t able to recover from failures by reprocessing originals.
  • NFS permission management was a burden. Granular file system permissions weren’t easily managed across environments, where each environment had a different permission set for NFS access. Dealing with cross-environment access differences was possible but took extra work to automate and verify (meaning more cycles not focused on delivering real user value).
  • Single points of failure impacted reliability. The Inbound File Adapter, for example, was its own dedicated service (good) but was a single point of failure (bad). We had tight control of our servers and built custom monitoring to stay aware of our services, but this was all custom work and didn’t add any user value.
  • The integration solution became a bottleneck. The ActiveMQ messaging solution required tuning, updates, and maintenance (if it was overloaded) and (like our system overall) had limited memory and disk space. We beefed up the server hosting ActiveMQ (scaled up) but, again, that was custom engineering work.

What would I do today?

Figure 1 — AWS Data Ingress

First, I would use Amazon S3 for inbound file storage. S3 offers virtually unlimited storage, allowing us to preserve all inbound files for some reasonable amount of time (a month? a year?) before archiving them for historical reference. The initial destination within S3 would be a “Landed” bucket for all the original files.
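As a rough sketch of the "keep for a while, then archive" idea, the rule could be expressed as an S3 lifecycle configuration. The one-year window and the rule ID below are assumptions for illustration (the right retention period is still an open question), and the dictionary follows the shape that boto3's `put_bucket_lifecycle_configuration` accepts as its `LifecycleConfiguration` argument.

```python
import json

# Hypothetical retention window; the right value (a month? a year?) is a policy decision.
ARCHIVE_AFTER_DAYS = 365


def landed_lifecycle_rules(archive_after_days: int = ARCHIVE_AFTER_DAYS) -> dict:
    """Build a lifecycle configuration that archives original files after a retention window."""
    return {
        "Rules": [
            {
                "ID": "archive-landed-originals",  # hypothetical rule name
                "Status": "Enabled",
                "Filter": {"Prefix": ""},  # an empty prefix applies the rule to every object
                "Transitions": [
                    {"Days": archive_after_days, "StorageClass": "GLACIER"}
                ],
            }
        ]
    }


if __name__ == "__main__":
    print(json.dumps(landed_lifecycle_rules(), indent=2))
```

The point is less the specific numbers and more that retention becomes a few lines of configuration instead of a capacity planning exercise.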

Second, I’d distribute files based on type and source, mapping type to buckets and source to folders. This would allow us to manage write access on a per-bucket and per-folder basis. Best of all, S3’s high availability would remove the single point of failure we had at this stage.
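To make the type-to-bucket and source-to-folder mapping concrete, here is a minimal sketch. The bucket names, file types, and the `destination_for` helper are all hypothetical; the only idea taken from the design is that the file type picks the bucket and the source picks the key prefix.

```python
from typing import NamedTuple

# Hypothetical bucket names, one bucket per file type.
BUCKET_BY_TYPE = {
    "measurement": "msp-landed-measurements",
    "state": "msp-landed-states",
}


class Destination(NamedTuple):
    bucket: str
    key: str


def destination_for(file_type: str, source: str, filename: str) -> Destination:
    """Map a file's type to a bucket and its source to a folder (key prefix)."""
    if file_type not in BUCKET_BY_TYPE:
        raise ValueError(f"unknown file type: {file_type!r}")
    # Keep the source name path-safe so one source cannot write into another's prefix.
    safe_source = source.strip().lower().replace("/", "_")
    # Use only the final path component of the incoming file name.
    safe_name = filename.rsplit("/", 1)[-1]
    return Destination(BUCKET_BY_TYPE[file_type], f"{safe_source}/{safe_name}")
```

With a layout like this, write permissions can be granted per bucket or per prefix, so each source only needs access to its own folder.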

Third, I’d use built-in S3 triggers (SNS => Lambda => Step Functions) to process the files as they landed. Because we couldn’t keep copies of all inbound files, our first step after files landed was storing their contents into a centralized relational database (facepalm). With S3 in place upstream, there would be less urgency to store the file contents immediately. Instead, we could afford more up-front processing cycles on validating, cleansing, deduplicating (it was possible to get the same record from multiple sources), and normalizing. The use of SNS prior to Step Functions isn’t necessary for this specific scenario but does help with extensibility for other scenarios. The end result, a file in a “Ready” bucket, would be ready for downstream processing.
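A minimal sketch of the Lambda side of that trigger chain might look like the following. It assumes the standard shape of an S3 event notification delivered through SNS (the S3 event arrives as a JSON string in `Sns.Message`, with URL-encoded object keys). The deduplication fields `object_id` and `observed_at` are hypothetical stand-ins for whatever actually identifies a duplicate record across sources.

```python
from __future__ import annotations

import json
from urllib.parse import unquote_plus


def landed_objects(sns_event: dict) -> list[tuple[str, str]]:
    """Extract (bucket, key) pairs from an S3 notification wrapped in an SNS event."""
    objects = []
    for sns_record in sns_event.get("Records", []):
        # SNS delivers the original S3 event as a JSON string.
        s3_event = json.loads(sns_record["Sns"]["Message"])
        for s3_record in s3_event.get("Records", []):
            bucket = s3_record["s3"]["bucket"]["name"]
            # Object keys in S3 notifications are URL-encoded.
            key = unquote_plus(s3_record["s3"]["object"]["key"])
            objects.append((bucket, key))
    return objects


def deduplicate(records: list[dict], key_fields=("object_id", "observed_at")) -> list[dict]:
    """Keep the first occurrence of each record, identified by hypothetical key fields."""
    seen = set()
    unique = []
    for record in records:
        identity = tuple(record.get(field) for field in key_fields)
        if identity not in seen:
            seen.add(identity)
            unique.append(record)
    return unique
```

In the full design, a handler built on `landed_objects` would start a Step Functions execution per file, and `deduplicate` would be one step in that workflow rather than part of the same function.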

A perfectly acceptable alternative to Step Functions for validating, cleansing, and deduplicating new data would be AWS Glue. I personally like the Step Functions approach (probably thanks to several years I spent developing a workflow platform) for its simplicity and ease of control. However, as AWS Glue becomes more feature-rich in areas like streaming and event-driven processing, it becomes a perfectly viable solution for the type of architecture I’d favor for MSP 2.0.

Whether the ultimate solution leverages Step Functions or Glue, the improved approach relies on two key elements that we struggled with in MSP 1.0: unlimited storage and inherent event-driven processing enabled by the storage solution.

Parallel Processing and Storage

We did have the foresight to not only store inbound measurements in a centralized relational database, but also in several different flavors of distributed storage so that parallel processing with data affinity was possible. Hadoop was a very popular framework to allow parallel processing at the time, so we used it (starting with v0.20.X, showing just how bleeding edge we were comfortable with). The Map/Reduce paradigm wasn’t a great fit for most of our parallel processing needs, but there were very few open-source frameworks available that included distributed task scheduling, error and failover handling, and an out-of-the-box dashboard for operational visibility. While it wasn’t a perfect fit, it allowed us to process an order of magnitude more measurements per day than any system at the time.

Some of the challenges we hit:

  • Hadoop cluster management was a burden. Maintaining the Hadoop clusters across Dev, QA, Staging, and Prod environments was cumbersome. A very early version of Puppet helped us keep the clusters in sync but the overall robustness and tooling integration wasn’t what it is today.
  • Operational visibility was disjointed. Operating the Hadoop clusters in an integrated way alongside the rest of our system required significant additional effort. The Hadoop cluster had its own operational dashboards and metrics but those weren’t integrated with our application-level monitoring and reporting. A single operational picture could be created, but it required custom engineering and integration with yet another product.
  • The eventual consistency approach wasn’t robust. Ensuring eventual consistency across our centralized database (used as our source of truth), the Hadoop cluster’s HDFS, and other storage mechanisms such as EHCache’s replicated cache was difficult. We had to address edge cases as they occurred since the synchronization across stores required custom development.

What would I do today?

With a much more robust set of options to choose from, I’d update both our parallel processing framework selections and supporting storage selections to be a better fit for purpose. I’d also shift away from a centralized database acting as our single source of truth. Instead, I’d fully adopt a microservices architecture. In other words, I’d favor services that own their own processing and their own data. This would allow data store selections that best matched data types and interaction patterns, selecting among DynamoDB, Aurora, and ElastiCache, for example. For integration, I’d use streams as a means of keeping data relevant for processing synced across domains, and use features like the Step Functions Map state to enable dynamic parallel processing.

Figure 2 — AWS Parallel Processing and Data Storage

Starting on the left, the Measurement Service would receive and store Measurements, with the Data Ingress approach outlined above, sending new Measurements as they landed. The Correlation Service would also receive the same Measurements, once they were ready for processing. Correlation Results would be stored in DynamoDB as they were calculated, and DynamoDB Streams would trigger an update to the original Measurement to include a Tag linking the Measurement to the State to which it correlated.

Correlation Results, in the same stream mentioned above, would also trigger State Update processing for any State having a newly correlated Measurement. State Updates would be stored in DynamoDB as they were created, and DynamoDB Streams would trigger an update to the original State with a new set of values (calculated using the newly correlated Measurements and the previous State as input for State Update processing).
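The stream-driven fan-out described above could be sketched as a Lambda handler that reads DynamoDB Streams records. The record layout (`eventName`, `dynamodb.NewImage`, typed attribute values such as `{"S": "..."}`) is the standard stream format and assumes the stream is configured to include new images. The attribute names `measurementId` and `stateId` are assumptions about how a Correlation Result might be modeled.

```python
from __future__ import annotations


def tag_updates(stream_event: dict) -> list[dict]:
    """Turn newly inserted Correlation Results into Tag updates for their Measurements."""
    updates = []
    for record in stream_event.get("Records", []):
        # Only new Correlation Results matter here; skip MODIFY and REMOVE events.
        if record.get("eventName") != "INSERT":
            continue
        image = record["dynamodb"]["NewImage"]
        updates.append(
            {
                "measurement_id": image["measurementId"]["S"],  # hypothetical attribute
                "tag": {"state_id": image["stateId"]["S"]},  # hypothetical attribute
            }
        )
    return updates


def states_to_update(stream_event: dict) -> set[str]:
    """Collect the States that gained a newly correlated Measurement."""
    return {update["tag"]["state_id"] for update in tag_updates(stream_event)}
```

One batch of stream records feeds both paths: `tag_updates` would drive calls to the Measurement API, while `states_to_update` would decide which State Update workflows to start.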

Amazon Aurora would be used for storing the two entities (Measurement and State), since their APIs would need to support querying across attributes. DynamoDB would be used for storing calculated results (Correlation Results and State Updates), since these require high-throughput, low-latency access but do not need to support complex queries (i.e. key/value would be sufficient).

The Correlation and State Update Step Functions would differ in implementation specifics but would follow a similar pattern for execution: Setup, Process, Cleanup. The execution pattern would be performed at scale and in parallel, using the AWS Step Functions Map state.

Figure 3 — Dynamic Parallelism with Step Functions

For those unfamiliar, AWS Step Functions provides the ability to define processing workflows, including steps that can process work across Lambda functions in parallel. The workflow for Correlation Processing would begin by loading the latest States from the State API, storing them in Amazon ElastiCache for easy access in subsequent workflow steps. Next, the workflow would separate measurements into granular chunks that could be processed in parallel, with each chunk traversing the Setup, Process, and Cleanup steps. Ultimately, a single Step Function invocation would produce many Correlation Results, each stored in Amazon DynamoDB.
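As an illustration of the pattern, the chunking step and the Map state could be sketched as below, with the state machine definition written in Amazon States Language as a Python dictionary. The Lambda ARNs, function names, chunk size, and concurrency limit are placeholders I made up. The definition uses the `ItemProcessor` field; older definitions express the same thing with `Iterator`. The steps that load States into the cache and produce `$.chunks` are omitted.

```python
from __future__ import annotations

import json

# Placeholder ARN prefix; the account, region, and function names are hypothetical.
LAMBDA_PREFIX = "arn:aws:lambda:us-east-1:123456789012:function:"


def chunk(items: list, size: int) -> list[list]:
    """Split measurements into granular chunks that can be processed in parallel."""
    if size < 1:
        raise ValueError("chunk size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


def task(function_name: str, next_state: str | None = None) -> dict:
    """Build a Task state that invokes a Lambda function, then moves on or ends."""
    state = {"Type": "Task", "Resource": LAMBDA_PREFIX + function_name}
    if next_state is None:
        state["End"] = True
    else:
        state["Next"] = next_state
    return state


CORRELATION_WORKFLOW = {
    "StartAt": "CorrelateChunks",
    "States": {
        "CorrelateChunks": {
            "Type": "Map",
            "ItemsPath": "$.chunks",  # one iteration per chunk of measurements
            "MaxConcurrency": 10,  # hypothetical limit on parallel iterations
            "ItemProcessor": {
                "ProcessorConfig": {"Mode": "INLINE"},
                "StartAt": "Setup",
                "States": {
                    "Setup": task("correlation-setup", "Process"),
                    "Process": task("correlation-process", "Cleanup"),
                    "Cleanup": task("correlation-cleanup"),
                },
            },
            "End": True,
        }
    },
}

if __name__ == "__main__":
    print(json.dumps(CORRELATION_WORKFLOW, indent=2))
```

The State Update workflow would reuse the same skeleton with different functions behind Setup, Process, and Cleanup.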

What really stands out is the simplicity AWS provides for high-throughput, low-latency processing. It took us literally weeks to get the supporting infrastructure and services in place to support our parallel processing; today it would take hours (the first time) to minutes (every time after).

APIs and User Interfaces

We did provide APIs and user interfaces for access to the data and processing results from the system. These layers would likely not be massively different from what they were, though the implementations and ease of deployment and operations would certainly be different. At the time, we used the Jersey REST API framework for the APIs, ExtJS for the web application, and a WorldWind-based desktop application. Since that time, a lot has changed.

Some of the challenges we hit:

  • Large payloads, leading to slow response times. We had much larger than necessary payloads between APIs and UIs, and slower than ideal response times as a result. Aggregating from APIs across Services required — wait for it — yet another custom API. Further, even with an aggregation API, clients were unable to proactively filter for the elements they actually needed.
  • Database contention, leading to slow response times. Though APIs were decentralized across each Service, the centralized, relational data store backing them caused contention and additional response time challenges.
  • No content delivery network or client-side caching mechanisms, leading to slow response times. We had no good way to improve UI latencies by moving the web application closer to our users. Our on-prem web application deployments were located on the east coast, while most of our target users were on the west coast.
  • Web browsers couldn’t support the UI features our users wanted. Having two user interfaces (web and desktop) using two completely separate code bases added a significant amount to our backlogs. Having two UIs was a good idea at the time, when web browser limitations were severely restrictive (e.g. IE6 support), but it added to an already demanding delivery schedule.

What would I do today?

For APIs, I’d favor a Serverless approach wherever possible. This would simplify our development, deployment, testing, and operations. In addition, I’d build a Backend For Frontend (BFF) using Apollo or AppSync to simplify the aggregation/linking/filtering of data elements from across Service APIs. For UIs, I’d move to a web application (and only a web application) using a leading modern web framework or library (take your pick). For web application deployment, I’d leverage the simplicity of S3 and the content delivery benefits of CloudFront.
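To show what the BFF buys us, here is a plain-Python stand-in for the aggregation, linking, and filtering a GraphQL resolver would perform. It is not Apollo or AppSync code. The `get_measurement` and `get_state` callables represent calls to the Measurement and State Service APIs, and the `state_id` field linking the two is an assumption.

```python
from typing import Callable, Iterable


def select_fields(record: dict, fields: Iterable[str]) -> dict:
    """Return only the fields the client asked for, keeping payloads small."""
    return {field: record[field] for field in fields if field in record}


def measurement_with_state(
    measurement_id: str,
    fields: Iterable[str],
    get_measurement: Callable[[str], dict],
    get_state: Callable[[str], dict],
) -> dict:
    """Aggregate a Measurement with its correlated State, then filter to requested fields."""
    measurement = get_measurement(measurement_id)
    combined = dict(measurement)
    state_id = measurement.get("state_id")  # hypothetical link between the two entities
    if state_id is not None:
        combined["state"] = get_state(state_id)
    return select_fields(combined, fields)
```

This addresses two of the challenges listed earlier: clients no longer need a custom aggregation API per use case, and they can ask for only the elements they actually need.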

Figure 4 — Web Application, GraphQL, and APIs

A few additional notes on the services depicted above:

  • Amazon Cognito provides user store, user management, and authentication services for the web application, though it could be replaced by any solution that offered the same capabilities
  • The web application can still go directly to the Service APIs (protected by an API Gateway) or choose to use the BFF (using the same credentials the API Gateway would accept, a Cognito-issued JWT)

It’s amazing to think about the products and custom solutions our old system could eliminate using the solutions available today. A ton of additional features with a fraction of the complexity.

Data Reporting

Not to sound like a broken record but storing all the data we had at our disposal in a centralized database was a really bad idea. We fortunately had purchased (for a sizable cost) a massive appliance to house our database (i.e. we scaled up), which came with the ability to create a read-only database snapshot that we could use to produce daily reports. This wasn’t an ideal solution, as our reports still required a lot of custom code to extract (from an RDBMS) and transform (using a static report format) into something digestible for analysts. Worst of all, the daily reports didn’t reflect reality by the time they were generated. It took, literally, hours to create the reports and, by that time, thousands of new measurements had been received and processed.

What would I do today?

When storage is no longer a limiting factor and when services are improved to exploit a much larger, more disparate set of data, we would look at the problem differently. Creating a Data Lake, built on S3, is one immediate improvement.

Figure 5 — Reporting Pipelines

Processing the raw data provided from internal and external sources consistently leads to a well-structured, already-validated set of data to use for reporting. From there, we would populate purpose-built reporting stores to power different types of reports: those that look at relationships, historical trends, and real-time aggregations and recommendations. Everything “to the right” of the Reporting Stores would be built based on prioritized personas and user flows, but the foundation would come from those event-driven data processing pipelines.

As mentioned in the Data Ingress section, the ability to store all of our data and trigger downstream processing in an inherently event-driven fashion (whether enabled by Step Functions or AWS Glue) would solve many of the data reporting challenges we faced in MSP 1.0.

Operational Visibility

Because we weren’t using a single, unified platform as the foundation for our system, it took work (a lot of work) to maintain operational visibility. Sadly, we relied heavily on log files (distributed over a large compute cluster) and operational dashboards from each of the integrated products (Hadoop Map/Reduce, Hadoop HDFS, Oracle RDBMS, ActiveMQ, Layer7 API Gateway, etc.) to determine how our system was performing. We had some basic service health endpoints to help identify outages, but ultimately it was a cobbled-together solution that required a lot of care and feeding.

What would I do today?

By providing a single, unified platform, AWS would solve many of the challenges that resulted from our disparate collection of technologies. It would also provide a means to collect metrics from across our Services using a consistent approach and a single aggregation point. This approach would combine built-in Amazon CloudWatch metrics for a baseline of insight, custom CloudWatch metrics for additional Service-specific detail, and AWS X-Ray for traceability across microservices. Building CloudWatch dashboards and alerting would be fairly straightforward once that level of data collection was in place.

Figure 6 — Built-In and Custom Metrics

Supplementing the Services defined in the previous section, we have three new outbound arrows representing the flow of metrics and traces from the Services to Operational Visibility services. CloudWatch gets the built-in metrics directly from AWS. In addition, custom metrics are sent to Kinesis Data Streams for further processing. We can either write those directly to CloudWatch (via a Lambda) or can add some preprocessing using Kinesis Data Analytics beforehand.
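The "directly to CloudWatch (via a Lambda)" path could be sketched as follows. The Kinesis event shape (base64-encoded payloads under `Records[].kinesis.data`) is the standard Lambda integration format, and the output dictionaries match what boto3's `put_metric_data` accepts in its `MetricData` list. The JSON payload format (`name`, `service`, `value`, `unit`) is an assumption about how our Services might publish custom metrics.

```python
from __future__ import annotations

import base64
import json


def metric_data_from_kinesis(event: dict) -> list[dict]:
    """Convert custom metric records from a Kinesis event into CloudWatch metric data."""
    metric_data = []
    for record in event.get("Records", []):
        # Kinesis payloads arrive base64-encoded inside the Lambda event.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        metric_data.append(
            {
                "MetricName": payload["name"],
                "Dimensions": [{"Name": "Service", "Value": payload["service"]}],
                "Value": float(payload["value"]),
                "Unit": payload.get("unit", "Count"),
            }
        )
    return metric_data
```

A handler would pass the result to `put_metric_data` under a namespace of our choosing, batching as needed to stay within the API's per-request limits.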

The simplicity and comprehensiveness of the solution are made possible by adopting a single, unified platform. Built-in metrics from each AWS service, for example, provide an exceptional foundation on which to build additional operational visibility.

Security

Security was at the heart of our system from the beginning. Since we were mixing data from multiple sources, we had to be sensitive to the data ownership and sharing rules of each source. We had multiple levels of protection built-in, including in these key areas:

  • Authentication and Authorization — We needed to know who was accessing the system
  • Row-Level Database Access Control — One of the reasons we needed to know who was accessing the system was to filter data appropriately for different user groups
  • Data Egress — In addition to filtering data requested by users with access to the system, we also needed to verify data leaving our system was authorized to leave the system

There were other security considerations at play as well, but these three were paramount.

For Authentication, we relied on establishing 2-Way SSL connections between all clients (including users) and servers. Given the environment in which we were operating, we could safely assume client certificates were in place for all of our users. However, this did add a cumbersome manual series of steps as we on-boarded new users and user groups.

For Authorization, we relied on a Layer7 API Gateway (later acquired by CA, then Broadcom). The challenge here was that the API Gateway and the client certificate issuance process were completely disconnected. We, the development team, had to bridge that gap through API Gateway configuration changes anytime a new Root Certificate Authority, for example, was introduced.

For Row-Level Access Control, we selected Oracle Label Security (OLS). Part of the reason we did choose to go with a centralized database was to ensure all access was governed through a single OLS policy, applied to a single database. That made access traceability easier to manage and audit, though it did come at a cost of database contention and performance drag.

For Data Egress, we relied on filtered data extraction via the row-level access control provided above and then cross-checked the results during data egress using a leading Guard solution. The challenge was that this resulted in a solution that was binary at the measurement or state level. For example, either a measurement could leave the system, in its entirety, or it could not. This limited downstream visibility, potentially more so than actually required by security policy.

What would I do today?

With services available today from AWS, the Authentication, Authorization, and Row-Level Access control mechanisms would be much more tightly integrated. Improved integration would lend itself to simpler and more comprehensive configuration, automation, and auditability.

Amazon Cognito would act as both the User Store (i.e. a User Pool) and the Authentication Service. There are certainly a slew of Cognito alternatives, including offerings from Okta and Ping, but keeping Authentication and Authorization on the same platform as all of our other services is the simplest and most cohesive option. Amazon API Gateway would serve as the Authorization Service, leveraging the provided Cognito User Pool Authorizer. From there, we could leverage the Cognito-issued User Tokens to limit access to either DynamoDB or Aurora (the PostgreSQL version) based on user attributes (e.g. level, department, group, etc.) captured in the token.
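A minimal sketch of attribute-based filtering from token claims follows. It assumes the token has already been validated upstream by the API Gateway authorizer, so the helper only decodes the payload and deliberately does not verify the signature. The `cognito:groups` claim is what Cognito uses for group membership; the `owner_group` field on each row is hypothetical.

```python
from __future__ import annotations

import base64
import json


def unverified_claims(jwt: str) -> dict:
    """Decode a JWT payload WITHOUT verifying the signature (validation happens upstream)."""
    payload_segment = jwt.split(".")[1]
    # JWT segments are base64url-encoded without padding; restore it before decoding.
    padded = payload_segment + "=" * (-len(payload_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def visible_rows(rows: list[dict], claims: dict) -> list[dict]:
    """Keep only rows owned by a group the caller belongs to (hypothetical owner_group field)."""
    groups = set(claims.get("cognito:groups", []))
    return [row for row in rows if row.get("owner_group") in groups]
```

In a real deployment I would expect this decision to be enforced inside the data store rather than in application code; the sketch only shows how token attributes map to a row-level decision.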

Figure 7 — Coarse-Grained and Fine-Grained Authorization

You’ll notice there’s still one important security aspect missing: Data Egress that provides (static) Data Filtering and (dynamic) Data Masking. A service like Amazon Macie is a step in the right direction, but the data protection mechanisms and timeliness are severely lacking for our (event-driven) needs. Ideally, a data protection service offering from AWS will soon exist to mask data (statically or dynamically) using a set of possible data masking schemes, including tokenizing, redacting, masking, and substitution. The goal would be to identify outbound gateway patterns that would support two combinations:

  • S3 => Data Protection Service => Outbound Adapters — This would be static data masking for pre-created content, such as a daily report
  • Lambda => Data Protection Service => API Client — This would be dynamic data masking for user-requested content, such as an API request

In both cases, the Data Protection Service would act as a proxy between “what we want to leave the system” and “what is permitted to leave the system”, providing necessary masking along the way. What a powerful and marketable new product offering (note to self).
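Since no such service exists, the following is purely a sketch of what the proxy's core could look like, covering the four schemes named above. Every name here (`protect`, the policy format, the sample fields) is hypothetical. Fields without a rule are dropped, on the assumption that an egress guard should deny by default.

```python
from __future__ import annotations

import hashlib
import hmac
from typing import Callable


def redact(value: object) -> str:
    """Replace the value entirely."""
    return "[REDACTED]"


def mask(value: object, visible: int = 4, fill: str = "*") -> str:
    """Hide all but the last few characters; short values are hidden completely."""
    text = str(value)
    if visible <= 0 or len(text) <= visible:
        return fill * len(text)
    return fill * (len(text) - visible) + text[-visible:]


def tokenizer(secret: bytes) -> Callable[[object], str]:
    """Build a rule that replaces a value with a stable keyed token (same input, same token)."""
    def tokenize(value: object) -> str:
        return hmac.new(secret, str(value).encode(), hashlib.sha256).hexdigest()[:16]
    return tokenize


def substitute(table: dict) -> Callable[[object], object]:
    """Build a rule that swaps values using a lookup table, with a generic fallback."""
    return lambda value: table.get(value, "OTHER")


def protect(record: dict, policy: dict[str, Callable]) -> dict:
    """Apply one rule per permitted field; fields without a rule never leave the system."""
    return {field: rule(record[field]) for field, rule in policy.items() if field in record}
```

The same `protect` call would sit behind both patterns: applied once to a stored report for static masking, or per response for dynamic masking. That also gets past the all-or-nothing limitation we had at the measurement level.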

Final Thoughts

Tech has changed over the last decade, and remarkably so. What was at the time a bleeding-edge solution is very clearly, as we saw throughout this article, now squarely a “legacy” system. One key takeaway is to optimize solutions for a low cost of change so that, as advancements in technology continue, it’s easy to take advantage of them.

Another key takeaway is that the services AWS offers, especially in the serverless arena, are absolute game changers when building large, secure, distributed, high-throughput transactional solutions. The ability to provision scalable and resilient foundational services improves the timeliness and quality of the solutions being developed and delivered. This enables teams to recoup cycles from infrastructure management and apply them to creating user value.

While we didn’t have the platform scale, extensibility, and cohesion available today, we took a number of innovative approaches (some by circumstance, some by design) using the best solutions available at the time. If nothing else, it makes me (and probably others) fully appreciate the advancements that have come since. It also highlights the value of continuously exploring new tools and applying them to address lingering challenges. Systems metaphorically held together by duct tape and dental floss are ripe for a refresh.

James Gimourginas
Slalom Build

I build, lead, and manage high-performing development and delivery teams to create world-class technology products.