Financial Times Data Platform: From zero to hero
The Financial Times, one of the world’s leading business news organisations, has been around for more than 130 years and is famous for its quality journalism.
To stay at the top for this long, you have to be able to adapt as the world changes. For the last decade, that has meant being able to take advantage of the opportunities that technology provides, as the FT undergoes a digital transformation.
This article will take an in-depth look behind the scenes at one part of that transformation: the creation and evolution of the Financial Times’ Data Platform. The Data Platform provides information about how our readers interact with the FT, which allows us to decide how we can continue to deliver the things our readers want and need.
Generation 1: 2008–2014 Early days
At first, the Data Platform focussed on providing recommendations to readers based on what they had read already.
At the time, the majority of our readers still read the FT in print, so a single store and a latency of 24 hours were sufficient. The architecture was clean and simple, and Financial Times employees were able to run queries on top of it to analyse users’ interests.
But then a number of events happened.
- Internet revolution. The internet took off, and day after day the number of readers visiting ft.com rather than reading the print newspaper increased.
- Mobile innovation. Mobile devices became part of people’s lives. Owning a smartphone moved from a luxury to an expectation, and this allowed the Financial Times to release mobile applications for each of the most popular operating systems. These apps brought another stream of readers, who could read articles while travelling to work, resting at home or spending time outdoors without access to their laptops.
Generation 2: 2014–2016 The arrival of our Extract, Transform, Load (ETL) Framework
The second generation of our platform faced two new challenges: firstly, the need to allow our stakeholders to analyse data at scale, asking new types of questions; and secondly, an increasing volume of data.
In order to achieve these goals, we built our own ETL Framework in 2014. This allowed our teams to set up new jobs and models in an automated and scalable way and included features such as:
- Scheduling. Running SQL queries automatically multiple times per day, synchronising the outputs with other teams and, last but not least, letting teams focus on the business cases rather than on the implementation details.
- Python interface. Providing the ability to run Python code in addition to SQL queries, allowing stakeholders to run even more complex data models.
- Configuration over implementation. One of the reasons for introducing an ETL Framework was the ability to define jobs in XML files rather than in code, which enabled even more business capabilities at that time.
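To make "configuration over implementation" concrete, here is a minimal sketch of a declarative job definition and a parser for it. The element and attribute names (`job`, `step`, `schedule`, `module`) and the cron-style schedule are hypothetical. The original framework's XML schema is not described here.

```python
import xml.etree.ElementTree as ET
from dataclasses import dataclass

# Hypothetical job definition: element and attribute names are illustrative,
# not the schema of the FT's original ETL Framework.
JOB_XML = """
<job name="daily_article_views" schedule="0 */6 * * *">
  <step type="sql">SELECT article_id, COUNT(*) AS views FROM page_views GROUP BY article_id</step>
  <step type="python" module="models.engagement"/>
</job>
"""


@dataclass
class Step:
    kind: str  # "sql" or "python"
    body: str  # SQL text for "sql" steps, module path for "python" steps


@dataclass
class Job:
    name: str
    schedule: str  # cron expression: here, every six hours
    steps: list


def parse_job(xml_text: str) -> Job:
    """Turn a declarative XML job definition into an in-memory Job object."""
    root = ET.fromstring(xml_text.strip())
    steps = []
    for el in root.findall("step"):
        kind = el.get("type")
        if kind == "sql":
            steps.append(Step(kind, (el.text or "").strip()))
        elif kind == "python":
            steps.append(Step(kind, el.get("module", "")))
        else:
            raise ValueError(f"unknown step type: {kind!r}")
    return Job(root.get("name", ""), root.get("schedule", ""), steps)
```

A scheduler would then run the steps returned by `parse_job(JOB_XML)` in order at each scheduled time. Adding a new job means adding a file, not writing new code.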
The release of the ETL Framework had a huge positive impact but could not on its own resolve all issues coming with the increased amount of data and number of consumers.
In fact, adding this new component created more performance issues, because the number of consumers of the Data Platform increased, now including the Business Intelligence (BI) Team, the Data Science Team and others. The SQL Server instance started to become a bottleneck for the Data Platform, and therefore for all of its stakeholders. It was time for a change, and we set out to find the best solution to this particular issue.
As the Financial Times was already using some services provided by Amazon Web Services (AWS), we started evaluating Amazon Redshift as a fast, simple and cost-effective Data Warehouse for storing the increasing amount of data. Amazon Redshift is designed for Online Analytical Processing (OLAP) in the cloud, which was exactly what we were looking for. With this approach we were able to improve query performance significantly, without any additional effort from our team to support the new storage service.
Generation 3: 2016–2018 The beginning of Big Data at the Financial Times
With Amazon Redshift as a Data Warehouse and an ETL Framework for deploying extract, transform, load jobs, all FT teams were seeing the benefit of having a Data Platform. However, as a company leading the business news market, the Financial Times cannot be satisfied with its existing achievements. That’s why we started to think about how we could improve this architecture even more.
Our next goal was to reduce data latency. We were ingesting data once per day, so latency was up to 24 hours. Reducing latency would mean the FT could respond more quickly to trends in the data.
In order to reduce the latency, we started working on a new approach — named Next Generation Data Analytics (NGDA) — in 2015 and in early 2016 it was adopted by all teams in Financial Times.
First, we developed our own tracking library, responsible for sending every interaction of our readers to the Data Platform. The existing architecture expected a list of CSV files, transferred once per day by jobs run by the ETL Framework. Sending events one by one therefore meant that we had to change the architecture to support the new event-driven approach.
Then, we created an API service responsible for ingesting readers’ interactions. However, we still needed a way to transfer this data to the Data Warehouse with the lowest possible latency as well as exposing this data to multiple consuming downstream systems. As we were migrating all services to the cloud, and more specifically to AWS, we looked at the managed services provided by Amazon that could fulfil our event processing needs.
After analysing the alternatives, we redesigned our system to send all raw events from ft.com to the Simple Notification Service (SNS). Using this approach, it was possible for many teams in the organisation to subscribe to the SNS topic and unlock new business cases relying on the real time data.
Still, having this raw data in SNS was not enough — we also needed to get the data into the Data Warehouse to support all the existing workflows. We decided to use a Simple Queue Service (SQS) queue as it allowed us to persist all events in a queue immediately when they arrived in the system.
But before moving the data to our Data Warehouse, we had one more requirement from the business: to enrich the raw events with additional data provided by internal services, external services or simple in-memory transformations. To satisfy these needs with minimal latency, we created a NodeJS service that processed all events asynchronously in a loop, making the enrichment step possible at scale. Once an event had been fully enriched, it was sent immediately to Kinesis, the only managed event store provided by AWS at that time. With this architecture, we were able to persist our enriched events in a stream with millisecond latency, which was great news for our stakeholders.
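The original service was written in NodeJS. The sketch below shows the same idea in Python with `asyncio`: many events are enriched concurrently, so I/O-bound lookups overlap instead of running one after another. `lookup_subscription` is a hypothetical stand-in for a call to an internal service. The `sink` list stands in for writes to the Kinesis stream.

```python
import asyncio
from datetime import datetime, timezone


async def lookup_subscription(user_id: str) -> str:
    """Hypothetical stand-in for a call to an internal service."""
    await asyncio.sleep(0)  # yield to the event loop, as real network I/O would
    return "premium" if user_id.startswith("p") else "standard"


def add_processing_time(event: dict) -> dict:
    """Simple in-memory transformation: returns a new dict, no I/O needed."""
    return {**event, "processed_at": datetime.now(timezone.utc).isoformat()}


async def enrich(event: dict) -> dict:
    enriched = add_processing_time(event)
    enriched["subscription"] = await lookup_subscription(enriched["user_id"])
    return enriched


async def enrich_batch(events: list, sink: list) -> None:
    # Enrich all events concurrently; gather() returns results in input order.
    enriched = await asyncio.gather(*(enrich(e) for e in events))
    for e in enriched:
        sink.append(e)  # stand-in for putting the record onto the stream
```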
Once we had the data in a Kinesis stream, we used another AWS managed service, Kinesis Firehose, to consume the enriched event stream. Firehose wrote the events as CSV files to an S3 bucket whenever one of two conditions was met: a predefined time period had passed (which happened rarely) or the file size had reached 100 MB. This new event-driven approach produced CSV files of enriched events within a couple of minutes, depending on the time of day, so the latency in our data lake was reduced to 1–5 minutes.
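Firehose's flush rule, whichever comes first of a size threshold and a time interval, can be sketched as a small buffer class. This models the behaviour described above and is not Firehose's implementation. The thresholds and the `flush` callback (standing in for writing one CSV object to S3) are hypothetical. The injectable clock exists only to make the logic testable.

```python
import time


class SizeOrTimeBuffer:
    """Flush buffered records when a size or an age threshold is reached, whichever comes first."""

    def __init__(self, max_bytes: int, max_age_s: float, flush, clock=time.monotonic):
        self.max_bytes = max_bytes
        self.max_age_s = max_age_s
        self.flush_fn = flush  # e.g. writes one CSV object to S3
        self.clock = clock
        self.records = []
        self.size = 0
        self.opened_at = None  # time the first record of the current file arrived

    def add(self, record: str) -> None:
        if not self.records:
            self.opened_at = self.clock()
        self.records.append(record)
        self.size += len(record.encode("utf-8"))
        if self.size >= self.max_bytes:  # size condition
            self.flush()

    def tick(self) -> None:
        """Call periodically: flushes a non-empty buffer that has become too old."""
        if self.records and self.clock() - self.opened_at >= self.max_age_s:
            self.flush()

    def flush(self) -> None:
        if self.records:
            self.flush_fn("".join(self.records))
            self.records, self.size, self.opened_at = [], 0, None
```

With a large size threshold such as the 100 MB above, busy periods close files by size. The timer then mostly matters at quiet times, which fits the observation that the time condition rarely triggered.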
But the business teams had one more important requirement: clean data in the Data Warehouse. With the Kinesis Firehose approach, we couldn’t guarantee that each event was stored only once, because:
- We could receive duplicate events from our client side applications.
- Kinesis Firehose itself could duplicate data when a Firehose job was retried after a failure.
To deduplicate all events, we created another Amazon Redshift cluster responsible for ingesting and deduplicating each new CSV file. This involved a tradeoff. The process that guarantees uniqueness increased the latency for data to reach the Data Warehouse to approximately 4 hours, but it enabled our business teams to generate insights much more easily.
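The deduplication rule can be sketched in a few lines, assuming each event carries a unique identifier. The `event_id` field used here is an assumption, since the article does not say how events are keyed. Each new file is checked both against itself and against the identifiers already loaded, because a retry can resend events that arrived in an earlier file. In the real pipeline this ran inside Amazon Redshift; the Python below only illustrates the logic.

```python
from typing import Iterable


def deduplicate(new_events: Iterable, loaded_ids: set, key: str = "event_id") -> list:
    """Return the events from a new file that have not been seen before.

    `loaded_ids` holds the IDs already in the warehouse and is updated in place,
    so duplicates inside the same file are dropped as well (first occurrence wins).
    """
    unique = []
    for event in new_events:
        event_id = event[key]
        if event_id not in loaded_ids:
            loaded_ids.add(event_id)
            unique.append(event)
    return unique
```

Checking every new file against everything already loaded is inherently more work than appending blindly. That is consistent with the latency cost described above.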
Generation 4: 2019 Rebuild the platform to allow our team to focus on adding business value
Generation 3 of the platform was complicated to run. Our team spent most of the day supporting the large number of independent services. This increased engineering costs and left far less time for interesting, impactful work.
We wanted to take advantage of new technologies to reduce this complexity, but also to provide far more exciting capabilities to our stakeholders: we wanted to turn the Data Platform into a PaaS (Platform as a Service).
Our initial criteria were that the platform should offer:
- Self service — Enabling stakeholders to independently develop and release new features.
- Support for multiple internal consumers — with different teams having different levels of access.
- Security isolation — so that teams could only access their own data and jobs.
- Code reuse — to avoid duplication for common functionality.
Building a multi-tenant, self service platform is quite challenging, because every service in it has to support both multi-tenancy and self service. Still, putting effort into this approach would be extremely beneficial in the future, with the key benefits being:
- Stakeholder teams can deliver value without having to wait to coordinate with platform teams — this reduces costs, increases velocity, and puts them in charge of their own destiny
- Platform teams can focus on building new functionality for the platform — rather than spending their time unblocking stakeholder teams
The way we chose to deliver this decoupling was through a focus on configuration over implementation, with stakeholder teams able to set up their own management rules based on their internal team structure, roles and permissions, using an admin web interface.
A software system is like a house: you need to build it from the foundations up, not from the roof down. In engineering, the foundation is the infrastructure. Without stable infrastructure, a stable, production-ready system is impossible. That’s why we started with the foundation, discussing the best approach for both the short and long term.
Our existing Data Platform was deployed to AWS ECS. While AWS ECS is a really great container orchestrator, we decided to switch to Kubernetes. Kubernetes offers built-in support for many things we need to host multiple tenants, such as security isolation between tenants and resource limits per tenant. In addition, many Kubernetes Operators are readily available to us, such as spark-on-k8s-operator, prometheus-operator and many more. AWS had been offering a managed Kubernetes service (EKS) for a while, and it was the obvious choice for the foundations of the Data Platform in both the short and long term. Aiming for a self service, multi-tenant Data Platform, we had to apply several requirements to each service and to the Kubernetes cluster itself:
- System namespace — Separate all system components in an isolated Kubernetes namespace responsible for the management of all the services.
- Namespace per team — Group all team resources in a Kubernetes namespace in order to automatically apply team-based configurations and constraints for each of them.
- Security isolation per namespace — Restrict cross namespace access in the Kubernetes cluster to prevent unexpected interactions between different team resources.
- Resource quota per namespace — Prevent one team that reaches its hardware limits from affecting all the others, and make it possible to measure efficiency per team as the ratio between money spent and business value delivered.
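The four requirements above map onto standard Kubernetes objects. The sketch below generates three objects for one team:

- a `Namespace`;
- a `ResourceQuota` capping the team's CPU and memory;
- a `NetworkPolicy` that only admits traffic from pods in the same namespace or from the system namespace.

The naming scheme (`team-<name>`, `data-platform-system`) and the quota values are hypothetical. NetworkPolicies only take effect if the cluster's network plugin enforces them.

```python
import json

SYSTEM_NAMESPACE = "data-platform-system"  # hypothetical name for the system namespace


def team_namespace_manifests(team: str, cpu: str, memory: str) -> list:
    """Build the Kubernetes objects that isolate and limit one team's namespace."""
    ns = f"team-{team}"
    namespace = {
        "apiVersion": "v1",
        "kind": "Namespace",
        "metadata": {"name": ns, "labels": {"team": team}},
    }
    # Caps the total resources that all pods in the namespace may request or use.
    quota = {
        "apiVersion": "v1",
        "kind": "ResourceQuota",
        "metadata": {"name": "team-quota", "namespace": ns},
        "spec": {"hard": {
            "requests.cpu": cpu, "requests.memory": memory,
            "limits.cpu": cpu, "limits.memory": memory,
        }},
    }
    # Selects every pod in the namespace and admits ingress only from pods in the
    # same namespace or in the system namespace (the kubernetes.io/metadata.name
    # label is set automatically on namespaces since Kubernetes 1.21).
    network_policy = {
        "apiVersion": "networking.k8s.io/v1",
        "kind": "NetworkPolicy",
        "metadata": {"name": "isolate-team", "namespace": ns},
        "spec": {
            "podSelector": {},
            "policyTypes": ["Ingress"],
            "ingress": [{"from": [
                {"podSelector": {}},
                {"namespaceSelector": {"matchLabels": {
                    "kubernetes.io/metadata.name": SYSTEM_NAMESPACE}}},
            ]}],
        },
    }
    return [namespace, quota, network_policy]


if __name__ == "__main__":
    # kubectl accepts JSON as well as YAML, e.g. `python manifests.py | kubectl apply -f -`
    print(json.dumps({"apiVersion": "v1", "kind": "List",
                      "items": team_namespace_manifests("bi", "8", "32Gi")}, indent=2))
```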
The ETL Framework was quite stable and had been running for years, but to fully benefit from our adoption of cloud-native technologies, we needed a new one that supported:
- Cloud deployment.
- Horizontal scaling. As the number of workflows and the amounts of data increased, we needed to be able to scale up with minimal effort.
- Multi-tenancy. The whole platform needed to support it.
- Deployment to Kubernetes. Again, for consistency across the whole platform.
Since we built our ETL framework, expectations of ETL tools have moved on. We wanted the ability to support:
- Language agnostic jobs. In order to get the most out of the diverse skill set in all teams using the Data Platform.
- Workflow concept. The need to define a sequence of jobs depending on each other in a workflow is another key business requirement to make data-driven decisions on a daily basis.
- Code reusability. Since the functionality behind some of the workflow steps is repetitive, those steps are good candidates for code reuse.
- Automated distributed backfilling for ETL jobs. Backfilling happens quite often for our new use cases, so automating it increases business velocity.
- Monitoring. We need good monitoring to avoid making data-driven decisions based on low quality, high latency or even missing data.
- Extendability. The ability to extend the batch processing service with new capabilities based on feedback and requirements provided by the stakeholders will make this service flexible enough for the foreseeable future.
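The workflow and backfilling requirements fit together naturally. A workflow is a directed acyclic graph of jobs, and a backfill replays that graph once per historical interval. The sketch below does two things with a hypothetical workflow. It uses Python's standard `graphlib` (3.9+) to order the workflow, then expands a date range into daily runs. Each day's run is independent of the others, so those runs are the units that could be distributed across workers. The sketch only illustrates the concepts; Apache Airflow expresses the same ideas as DAGs and through its own backfill command.

```python
from datetime import date, timedelta
from graphlib import TopologicalSorter

# Hypothetical workflow: each task maps to the set of tasks it depends on.
WORKFLOW = {
    "extract_page_views": set(),
    "extract_subscriptions": set(),
    "join_views_with_subscriptions": {"extract_page_views", "extract_subscriptions"},
    "publish_report": {"join_views_with_subscriptions"},
}


def execution_order(workflow: dict) -> list:
    """Order tasks so that every task runs after all of its dependencies."""
    return list(TopologicalSorter(workflow).static_order())


def backfill_runs(workflow: dict, start: date, end: date) -> list:
    """One (run_date, task) pair per day in [start, end], tasks in dependency order.

    Runs for different days do not depend on each other, so a scheduler could
    hand each day to a different worker.
    """
    order = execution_order(workflow)
    runs = []
    day = start
    while day <= end:
        runs.extend((day, task) for task in order)
        day += timedelta(days=1)
    return runs
```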
The other big change is that fully-featured ETL frameworks now exist, rather than having to be built from scratch.
With all these requirements in mind, we evaluated different options on the market, such as Luigi, Oozie, Azkaban, AWS Step Functions, Cadence and Apache Airflow.
The best fit for our requirements was Apache Airflow.
Great though it is, it still has some limitations, such as a single scheduler and a lack of native multi-tenancy support. The first is not a major concern for us at the moment, given the benchmarks, our estimated load and the expected release of this feature in Apache Airflow 2.0. The second, however, would impact our whole architecture, so we decided to build custom multi-tenant support on top of Apache Airflow.
We considered using a managed Apache Airflow service (there are multiple providers), but in the end decided to continue with a self-managed solution because of requirements including multi-tenancy, language agnostic jobs and monitoring. These could not all be achieved with a managed solution, which highlights how important the extensibility requirement was for us.
Once Apache Airflow had been integrated into our platform, we started by releasing new workflows on top of it to validate its capabilities. Once we knew it met all our criteria, the next step was obvious, and we are currently migrating all of our existing ETL jobs to Apache Airflow. In addition, we have released it as a self service product to all stakeholders in the company, and it already has consumers such as the BI Team, the Data Science team and others.
Generation 5: 2020 It’s time for real time data
Generation 4 was a big step forward. However, there were still some targets for improvement.
Real time data
Our latency was still around 4 hours for significant parts of our data.
Most of these 4 hours of latency were caused by the deduplication procedure, which is quite important for our stakeholders and their needs. For example, the FT cannot make business development decisions based on low quality data. That’s why we must ensure that our Data Warehouse persists clean data for these use cases.
However, as the product, business and technologies evolve, new use cases have emerged. These can benefit from real time data even if a small percentage of it is low quality. A great example is ordering a user’s feed on ft.com and in the mobile application based on the reader’s interests. A couple of duplicated events would not be critical for this use case. The user experience would still be much better than showing the same content to all users regardless of their interests.
We already had a stable stream processing architecture, but it was quite complicated. We started looking into simplifying it by migrating from SNS, SQS and Kinesis to a new architecture using Apache Kafka as the event store. We preferred a managed service for the event store, and we decided to give Amazon MSK a try, as it seemed to have been stable for quite some time.
Ingesting data in Apache Kafka topics was a great starting point to provide real time data to the business. However, the stakeholders still didn’t have access to the data in the Apache Kafka cluster. So, our next goal was to create a stream processing platform that could allow them to deploy models on top of the real time data. We needed something that matched the rest of our architecture — supporting multi-tenancy, self service, multiple languages and deployable to Kubernetes.
With those requirements in mind, Apache Spark seemed to be a very good fit. It is one of the most widely used analytics engines and has one of the biggest open-source communities worldwide.
In order to deploy Apache Spark streaming jobs to Kubernetes, we decided to use the spark-on-k8s-operator.
Moreover, we have built a section in our Data UI that allows our stakeholders to deploy their Apache Spark stream processing jobs to production by filling in a simple form with information about the job. The form covers details such as the Docker image and tag, CPU and memory limits, and credentials for the data sources used in the job.
Another area we needed to optimise was data validation, which we wanted to move to the earliest possible step in the pipeline. We had services validating the data coming into the Data Platform, but these validations were executed at different steps of the pipeline. This led to issues, as the pipeline sometimes broke because of incorrect incoming data. That’s why we wanted to improve this area by providing the following features:
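Behind such a form, the submitted values have to become a `SparkApplication` resource for the spark-on-k8s-operator. The sketch below shows one possible mapping. Several details are hypothetical: the form fields, the `team-<name>` namespace convention, the secret mount path and the default Spark version. The `spec` field names follow the operator's `sparkoperator.k8s.io/v1beta2` API and should be checked against the operator version in use.

```python
from dataclasses import dataclass


@dataclass
class StreamingJobForm:
    """Hypothetical fields collected by the Data UI form."""
    team: str
    name: str
    image: str
    tag: str
    main_file: str           # e.g. "local:///app/job.py" inside the image
    driver_cores: int
    driver_memory: str       # Spark memory notation, e.g. "1g"
    executor_cores: int
    executor_instances: int
    executor_memory: str
    credentials_secret: str  # name of a Kubernetes Secret holding data source credentials


def to_spark_application(form: StreamingJobForm, spark_version: str = "3.0.0") -> dict:
    """Map a submitted form onto a SparkApplication custom resource."""

    def secrets() -> list:
        # Mount the credentials Secret into the pod (hypothetical path).
        return [{"name": form.credentials_secret, "path": "/mnt/secrets",
                 "secretType": "Generic"}]

    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": form.name, "namespace": f"team-{form.team}"},
        "spec": {
            "type": "Python",
            "mode": "cluster",
            "image": f"{form.image}:{form.tag}",
            "mainApplicationFile": form.main_file,
            "sparkVersion": spark_version,
            # A streaming job is long-running, so restart it whenever it stops.
            "restartPolicy": {"type": "Always"},
            "driver": {"cores": form.driver_cores, "memory": form.driver_memory,
                       "secrets": secrets()},
            "executor": {"cores": form.executor_cores,
                         "instances": form.executor_instances,
                         "memory": form.executor_memory, "secrets": secrets()},
        },
    }
```

Deriving the namespace from the team, rather than letting users type it, keeps each job inside its team's quota and network isolation.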
- A Data contract for the event streams in the pipeline
- Moving the validation step to the earliest possible stage
- Adding compression to reduce event size
With all these needs in mind, we found that Apache Avro meets these requirements well. It allows us to define a data contract per topic in Apache Kafka, ensuring the quality of the data in the cluster.
This approach also resolves another issue: the validation step can be moved to the very first step in the pipeline. An Apache Spark streaming job that validates events against the Apache Avro schema keeps broken data out of the pipeline. It moves all incorrect events to other Kafka topics used as Dead Letter Queues.
Another great feature of Apache Avro is its compact binary serialisation and deserialisation, which reduces the size of the data persisted in the Apache Kafka event store.
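The routing rule can be sketched independently of Spark: valid events go to the main topic and everything else goes to a Dead Letter Queue. The validator below is written by hand for illustration and covers only a small subset of Apache Avro: records whose fields are primitive types or unions such as `["null", "string"]`. A real job would rely on an Avro library. The `PageView` schema and the topic names are hypothetical.

```python
# Hypothetical schema for a page view event, in Avro's JSON schema notation.
PAGE_VIEW_SCHEMA = {
    "type": "record",
    "name": "PageView",
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "user_id", "type": ["null", "string"]},
        {"name": "timestamp", "type": "long"},
    ],
}

# Checks for the primitive Avro types this sketch supports.
PRIMITIVE_CHECKS = {
    "null": lambda v: v is None,
    "boolean": lambda v: isinstance(v, bool),
    "int": lambda v: isinstance(v, int) and not isinstance(v, bool) and -2**31 <= v < 2**31,
    "long": lambda v: isinstance(v, int) and not isinstance(v, bool) and -2**63 <= v < 2**63,
    "double": lambda v: isinstance(v, float),
    "string": lambda v: isinstance(v, str),
}


def matches(avro_type, value) -> bool:
    if isinstance(avro_type, list):  # union: the value must match at least one branch
        return any(matches(t, value) for t in avro_type)
    return PRIMITIVE_CHECKS[avro_type](value)


def is_valid(schema: dict, record: dict) -> bool:
    fields = schema["fields"]
    if set(record) != {f["name"] for f in fields}:
        return False  # missing or undeclared fields break the contract
    return all(matches(f["type"], record[f["name"]]) for f in fields)


def route(schema: dict, events: list, topic: str, dlq_topic: str) -> list:
    """Pair each event with its destination: the main topic or the Dead Letter Queue."""
    return [(topic if is_valid(schema, e) else dlq_topic, e) for e in events]
```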
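Much of the size saving comes from Avro's binary encoding. Field names live in the schema rather than in every message, and integers are written as zig-zag variable-length values, so small numbers take a single byte. The sketch below hand-encodes a hypothetical `PageView` record following the encoding rules in the Avro specification. The record has an `event_id` field, a `user_id` field typed as a `["null", "string"]` union, and a `timestamp` field. The result is then compared with the same event as JSON. On top of this, Kafka producers can also compress whole batches of messages.

```python
import json


def encode_long(n: int) -> bytes:
    """Avro int/long encoding: zig-zag mapping, then 7-bit groups, low bits first."""
    z = (n << 1) ^ (n >> 63)  # zig-zag: 0, -1, 1, -2, ... become 0, 1, 2, 3, ...
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # high bit set: more bytes follow
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string: byte length encoded as a long, followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def encode_page_view(event: dict) -> bytes:
    """Encode fields in schema order; no field names appear in the output."""
    out = encode_string(event["event_id"])
    if event["user_id"] is None:
        out += encode_long(0)  # union branch 0 ("null") has no payload
    else:
        out += encode_long(1) + encode_string(event["user_id"])  # branch 1: "string"
    out += encode_long(event["timestamp"])
    return out
```

For `{"event_id": "e1", "user_id": "u42", "timestamp": 1600000000000}` the Avro encoding is 14 bytes. The same event serialised with `json.dumps` is 64 bytes.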
Migrating from CSV to Parquet files in our data lake storage was a great initial choice for most of our needs. However, we still lacked some features on top of it that could make our life much easier, including ACID transactions, schema enforcement and updating events in Parquet files.
After analysing the existing alternatives on the market, including Hudi, Iceberg and Delta Lake, we decided to start using Delta Lake because of its Apache Spark 3.x support. It meets all of our main requirements and fits perfectly into our architecture.
- Efficiency. We decoupled the computation process from the storage allowing our architecture to scale more efficiently.
- Low latency, high quality data. Using the upsert and schema enforcement features provided by Delta Lake, we can continuously deliver low latency, high quality data to all stakeholders at the Financial Times.
- Multiple access points. Persisting all incoming data into Delta Lake allows the stakeholders to query low latency data through multiple systems including Apache Spark and Presto.
- Time travel. Delta Lake allows reprocessing data from a particular point in the past, which automates back-populating data. It also allows analysis over particular date intervals for use cases such as reports or training machine learning models.
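The upsert and time travel features can be pictured with a toy model. Every merge produces a new table version: rows whose key matches are replaced, the rest are inserted, and older versions stay readable. This is only a mental model in plain Python, with a hypothetical `event_id` key. Delta Lake itself expresses the same operation with `DeltaTable.merge(...)` followed by `whenMatchedUpdateAll()` and `whenNotMatchedInsertAll()`. It reads older data with the `versionAsOf` read option.

```python
class VersionedTable:
    """Toy model of a table with upserts and time travel (not Delta Lake itself)."""

    def __init__(self):
        self._versions = [{}]  # version 0 is the empty table

    def merge(self, updates: list, key: str = "event_id") -> int:
        """Upsert rows by key and return the number of the new version."""
        current = dict(self._versions[-1])  # copy, so earlier versions stay unchanged
        for row in updates:
            current[row[key]] = row  # matched: replace all columns; not matched: insert
        self._versions.append(current)
        return len(self._versions) - 1

    def as_of(self, version: int) -> dict:
        """Read the table as it was at a given version."""
        return self._versions[version]

    def latest(self) -> dict:
        return self._versions[-1]
```

In this model, a later row in the same batch silently wins. Delta Lake's MERGE instead fails when several source rows match the same target row, so a batch should be deduplicated before merging.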
At the Financial Times we have different kinds of storage used by teams in the company, including Amazon Redshift, Google BigQuery, Amazon S3, Apache Kafka, VoltDB, etc. However, stakeholders often need to analyse data split across more than one data store in order to make data-driven decisions. In order to satisfy this need, they use Apache Airflow to move data between different data stores.
However, this approach is far from optimal. A batch processing approach adds latency to the data, and in some cases making decisions with low latency data is crucial for a business use case. Moreover, deploying a batch processing job requires a technical background that some stakeholders may not have. With these details in mind, we had some clear requirements about what the stakeholders would need in order to deliver even more value to our readers. They needed support for:
- Ad hoc queries over any storage
- ANSI SQL — syntax they often know well
- Being able to join data between different data stores
And we wanted the ability to deploy to Kubernetes, to fit into our platform architecture.
After analysing different options on the market, we decided to start with Presto as it allows companies to analyse petabytes of data at scale while being able to join data from many data sources, including all of the data sources used at the Financial Times.
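Presto addresses every table as `catalog.schema.table`, and each catalog is backed by a connector to a different data store. A single ANSI SQL query can therefore join across stores. The sketch below imitates that with SQLite's `ATTACH DATABASE`, using two separate in-memory databases as stand-ins for two stores. It is an analogy for the query shape only, and the table names and data are hypothetical. In Presto, the tables would instead be referenced through catalogs configured for, say, Amazon Redshift and Apache Kafka.

```python
import sqlite3


def views_per_plan() -> list:
    """Join tables that live in two separately attached databases with one SQL query."""
    conn = sqlite3.connect(":memory:")
    # Each ATTACH of ':memory:' creates a new, independent database.
    conn.execute("ATTACH DATABASE ':memory:' AS warehouse")
    conn.execute("ATTACH DATABASE ':memory:' AS events")
    conn.execute("CREATE TABLE warehouse.subscriptions (user_id TEXT, plan TEXT)")
    conn.execute("CREATE TABLE events.page_views (user_id TEXT, article_id TEXT)")
    conn.executemany("INSERT INTO warehouse.subscriptions VALUES (?, ?)",
                     [("u1", "premium"), ("u2", "standard")])
    conn.executemany("INSERT INTO events.page_views VALUES (?, ?)",
                     [("u1", "a1"), ("u1", "a2"), ("u2", "a1")])
    # One ANSI SQL query spanning both databases.
    rows = conn.execute("""
        SELECT s.plan, COUNT(*) AS views
        FROM events.page_views AS v
        JOIN warehouse.subscriptions AS s ON s.user_id = v.user_id
        GROUP BY s.plan
        ORDER BY s.plan
    """).fetchall()
    conn.close()
    return rows
```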
Plan for the future
At the Financial Times we are never satisfied with our achievements, and this is one of the reasons why the company has been at the top of this business for more than 130 years. That’s why we already have plans for evolving this architecture even further.
- Ingestion platform. We ingest data using three components: batch processing jobs managed by Apache Airflow, Apache Spark streaming jobs consuming data from Apache Kafka streams, and REST services that receive incoming data for the Data Platform. We aim to replace the existing high latency ingestion services with Change Data Capture (CDC). CDC will let us ingest new data as soon as it arrives in any data source, so the business will be able to deliver an even better experience to our readers.
- Real time data for everyone. One of the main features we have in mind is giving everyone at the Financial Times access to the data, without the need for particular technical skills. To do that, we plan to enhance the Data UI and the stream processing platform to allow building streaming jobs by drag and drop. This would be a massive improvement, because it would enable employees without a technical background to consume, transform, produce and analyse data.
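At its core, CDC turns every insert, update and delete in a source database into an event that downstream consumers apply in order. The sketch below applies such change events to an in-memory replica. The article does not name a CDC tool. The event shape used here is an assumption modelled on Debezium's change event envelope: an `op` code plus `before` and `after` row images. The `id` primary key is also an assumption.

```python
def apply_change(replica: dict, change: dict, key: str = "id") -> None:
    """Apply one Debezium-style change event to a replica keyed by primary key."""
    op = change["op"]
    if op in ("c", "r", "u"):  # create, snapshot read, update: take the new row image
        row = change["after"]
        replica[row[key]] = row
    elif op == "d":  # delete: identify the row from the old row image
        replica.pop(change["before"][key], None)
    else:
        raise ValueError(f"unsupported op: {op!r}")
```

Because each event carries the full new row image, applying events in order is enough to keep the replica in step with the source. That is what makes ingestion as soon as data changes possible.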
If working on challenging Big Data tasks interests you, consider applying for a role in our Data team in our Sofia, Bulgaria office. We are waiting for you!