Enabling Neo4j with AWS Managed Streaming for Kafka (MSK)

Introduction

Owen Robertson
18 min read · Apr 14, 2023

Adopting and deploying new technology infrastructure in an enterprise can range from mildly difficult to extremely complex. Issues can be technical or operational, involve expertise and cost, and the list goes on. While none of this is new, the pressure on enterprises to adopt new and advanced analytics is only increasing. Managing innovation without disrupting existing services or increasing operational overhead requires a subtle approach in the face of so many conflicting priorities and the pace of new technology.

Large enterprises typically hold a wealth of pre-existing, high-value digital assets and analytics, and with the advent of microservices they have knit together impressive, feature-rich applications. However, this flexibility at the application layer is often burdened by latency in data flow and the resulting inconsistencies between detailed and aggregate analytics. In this way microservices have leapfrogged the service levels of the older, batch-oriented data pipeline models, creating a gap, or at least an obstacle, for deploying new, innovative data-first applications.

Consider a common customer-journey use case, where there is a need to respond to activity occurring across order management (ERP), website, and contact center applications. Individually these applications can generate events or actions, but they lack a consolidated view of the customer interaction and may require adjudication across multiple, potentially conflicting interaction signals, all with sub-second service levels.

Now consider a Neo4j graph and AWS MSK streaming pattern where real-time, heterogeneous application and data integration is the foundational design principle. Looking at this same use case, you'll see how AWS MSK can integrate these disparate data sources and how, once integrated, a single Neo4j query can traverse these events. This also reflects a decentralization of the data integration flows to the departmental level, where the real-time interpretation of these signals is best understood. In addition, these knowledge graphs and analytical stores lay the groundwork for Neo4j Graph Data Science and Graph Neural Network (e.g. DGL) projects, which can examine the cross-domain connectedness of your data and the latent properties that exist in these relationships.

Highlighted features of this solution pattern include:

  • Low-latency data integration across multiple data sources.
  • Graph queries seamlessly traversing multiple sets of domain data via Neo4j Cypher
  • AWS Kafka as a managed service (i.e. MSK) lowering DevOps and human capital costs.
  • In-database Neo4j algorithms and Graph Data Science features
  • AWS ecosystem integration features (e.g. CloudWatch Alarms, Kinesis, Lambda) enabling seamless connectivity with existing systems.

From a product maturity standpoint it's worth noting that Neo4j and graph analytics have been around for some time (2010), and for that matter so has Apache Kafka (2011). However, for this example the enabling technology comes from the Debezium and Neo4j Connect libraries, which now have a history of robust production deployments. These connectors can look for updates in source and target databases ~50 times every second, a concept completely foreign to legacy ETL batch processes. It's also worth noting that while Kafka remains the most robust streaming platform available, there is no shortage of new data integration and pipeline tools taking advantage of these real-time data streaming libraries.

Another conceptual shift in this pattern is the low-code aspect of these implementations: environments can be rapidly configured and deployed, allowing teams to quickly iterate on functionality. Recall that our focus here is to deploy a low-latency, low-friction solution to enable graph analytics. What follows are the details of a solution pattern for enabling Neo4j with AWS MSK that is reliable, performant, and analytically insightful. Expect to see more of these deployments; they should be around for a while.

*For those unfamiliar with Kafka or looking for a detailed description, I'd recommend reviewing the references below; also included is a recent article on Neo4j's new MSK Connect for Neo4j source processing.

High-Level MSK Enabled Neo4j Graph Solution

The component diagram below shows the high-level flow of the CDC replication model, while also highlighting a service layer that exposes the need for consistency across databases. In this case MySQL supports the read/write services and Neo4j focuses on new analytic queries, leaving AWS MSK responsible for keeping the two data stores synchronized.

Change-Data-Capture (CDC) Flow

The flow for the above environment is pretty straightforward and begins with a transactional update to the relational data store, in this case MySQL. This change is identified by the MSK Connect Source Worker (i.e. Debezium), which continually polls the transaction log for updates and then publishes the database change to the identified Kafka topic. These CDC events can be simple reflections of the source RDB tables or can be transformed through stream processing or scripting to fit your requirements. Once published to AWS MSK, the Brokers are responsible for management functions such as partitioning, replication, and offset tracking. This publication processing acts independently of the subscription processing; it can integrate data from many different applications and may post to multiple or common topics.

On the subscription or consumption side of the implementation, the MSK Connect Sink Workers continually poll topics for new events. When changes are found, a batch payload is constructed for the given topic and paired with the Neo4j Cypher logic defined for that topic. This Cypher query is submitted to the Neo4j instance for execution using Worker-provided execution tasks that are pre-allocated and run in parallel. For both the publish and subscribe portions of processing, the Workers poll at intervals and can receive thousands of events in a single batch; tuning these intervals and batch sizes is the primary configuration lever for balancing latency and throughput in your application.

While application specific, you will likely discover something of an impedance mismatch between relational records and graph structure. This is probably best characterized by the parent-child relationship in an RDB, where child records lack the parent's attributes. When joined, this information can be especially useful in the data stream, as child events might be filtered or re-routed based upon a parent record's property value. While this doesn't affect traditional data loading, where source data extract logic can be applied, it is an issue for the raw CDC streaming model. Options for solving this can be pushed to the source database with features such as materialized views or outbox patterns (see here). Alternatively, ksqlDB (built on Kafka Streams) can be used to flatten and denormalize the parent-child relationship to better interpret, filter, or aggregate your topics on the fly. I'll simply note that ksqlDB can be a source of tremendous functionality and will be explored in a follow-up post.
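In the meantime, here is a rough sketch of that flattening in ksqlDB. The stream, table, and column names are hypothetical, the topic names follow the Debezium <server>.<db>.<table> convention used elsewhere in this article, and the values are assumed to be flattened JSON (e.g. via Debezium's unwrap transform) so that columns sit at the top level:

    -- Register the parent topic as a table and the child topic as a stream
    CREATE TABLE customers (
        customer_id INT PRIMARY KEY,
        home_state  VARCHAR
    ) WITH (KAFKA_TOPIC='mskdev_cdc.mskdev.customers', VALUE_FORMAT='JSON');

    CREATE STREAM orders (
        order_id    INT,
        customer_id INT,
        total       DOUBLE
    ) WITH (KAFKA_TOPIC='mskdev_cdc.mskdev.orders', VALUE_FORMAT='JSON');

    -- Denormalize the parent attribute onto each child event and filter on it,
    -- producing a new topic the Neo4j sink can subscribe to directly
    CREATE STREAM orders_enriched WITH (KAFKA_TOPIC='orders_enriched') AS
        SELECT o.order_id, o.total, c.home_state
        FROM orders o
        JOIN customers c ON o.customer_id = c.customer_id
        WHERE c.home_state = 'CO'
        EMIT CHANGES;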

Key processing steps above are:

  • The Source Worker polls the MySQL binlog for changes based upon an offset.
  • The Source Worker transforms each change message into a topic event.
  • Brokers track and manage incoming topics (replicas, partitions, offsets).
  • (Optional) ksqlDB joins parent/child table streams to create, route, aggregate, and filter topics.
  • Sink Workers subscribe to topics and poll for new events.
  • Worker tasks execute Neo4j-specific Cypher in parallel to update the graph.

Component Considerations

Here we will discuss some more of the technical details and considerations for the individual Neo4j and AWS MSK components (fair warning for the tech-weary). Also included are references to the accompanying GitHub repository, which provides example scripting for the configurations. If interested, the full repository can be found here.

MySQL

Running as an AWS/RDS managed service or stand-alone, the MySQL instance represents our source data for this CDC replication example. It's worth noting that these types of operational databases often represent the workhorse in enterprise environments, supporting critical APIs and services. This functionality can be decades old, and the prospect of unwinding it can be a huge effort, which necessitates a more finessed approach to liberating this data.

Another common situation with these source data stores is incomplete or dirty data; and while data quality is a critical and often enterprise-wide project, our graph solution is generally interested in just a subset of the overall attribution. Regardless, graph data discovery activities will need to expose any gaps in the key attributes that define properties or relationships, e.g. bad values for zip codes, employees with an invalid department, or more generally any data without referential or columnar constraints. It should go without saying that it is worth focusing on these issues up front, as it will make your validation and auditing efforts notably more manageable.
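As a concrete illustration (the label, relationship type, and property names here are hypothetical), a quick Cypher audit after an initial load can surface such gaps before they undermine downstream analytics:

    // Count replicated employees whose department reference never resolved,
    // e.g. because the source row carried an invalid or orphaned department id
    MATCH (e:Employee)
    WHERE e.department_id IS NOT NULL
      AND NOT (e)-[:IN_DEPARTMENT]->(:Department)
    RETURN count(e) AS employees_with_invalid_department;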

Some additional considerations: keep an eye out for large bulk processing workloads into your source database. Large file loads of millions of records into a table of concern will generate a corresponding number of Kafka events, and while not overwhelming, testing these larger batch workloads will be important. Additionally, changes in database schema will require impact analysis on any tables/columns of interest, but otherwise DDL changes should not affect existing CDC processing.

Additional MySQL Configuration details on github can be found here.

MSK Connect (Source)

MSK Connect is the AWS implementation of Kafka Connect and executes in the MSK environment between the source or target and the MSK Brokers. For our example, the MSK Connect Debezium connector sits between MySQL and the MSK Brokers. There are three primary components to an MSK Connect configuration, which for our implementation look as follows:

  • The Plugin, which contains the Debezium and additional libraries.
  • The MSK Worker, which defines the runtime environment and parameters.
  • The MSK Connector, which defines the overall operating environment, including which broker, security, encryption, scalability, and logging configurations to use.

Examples of these configurations are present in the repository and include a number of parameter and configuration choices; some will be environment specific, while others can be left at their defaults until you have established your specific workload.
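As a reference point, here is a minimal sketch of the Debezium MySQL connector configuration, entered as key=value pairs when creating the MSK Connector. Endpoints and credentials are placeholders, and the server, database, and table names simply mirror the sample event shown later in this article:

    connector.class=io.debezium.connector.mysql.MySqlConnector
    # Source connectors run a single task against the binlog
    tasks.max=1
    database.hostname=<mysql-rds-endpoint>
    database.port=3306
    database.user=<cdc-user>
    database.password=<cdc-password>
    database.server.id=1691652969
    database.server.name=mskdev_cdc
    database.include.list=mskdev
    table.include.list=mskdev.cdc_msk_test
    # Debezium tracks source schema history in its own topic
    database.history.kafka.bootstrap.servers=<msk-bootstrap-brokers>
    database.history.kafka.topic=schema-changes.mskdev
    include.schema.changes=true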

Here are some additional features and configuration considerations:

  • Debezium production CDC support for Oracle, MySQL, Db2, PostgreSQL, SQL Server, and MongoDB
  • Off the shelf Plugin extensions available for Content-Based-Routing, filtering, Secrets Manager, AWS Glue Schema Registry, et al.
  • Configurable batch and polling parameters that control the balance between latency and throughput.
  • Error Processing and Logging tightly integrated with CloudWatch and S3

As evidence of the adoption of this technology, there are over 200 Kafka Connect plugins on the market as of March 2023, connecting to a variety of applications and data stores; here is just a sample: Elasticsearch, Google BigQuery, Pub/Sub, Snowflake, Splunk, Couchbase, AWS S3, SAP, Salesforce, ServiceNow, et al.

For more details on Debezium see their website or Gitter for IM support, or for our example plugins on GitHub go here.

MSK Workers

Worker configurations are defined within MSK and are instantiated when an MSK Connect connector is started. As a result, Worker configurations are a prerequisite for MSK Connect and contain parameters for both producers and consumers. This is also where AWS Secrets Manager and other configuration providers for secure credential management can be configured (see the AWS doc here).
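A minimal custom Worker configuration for this pattern can be as small as the converter settings below (a sketch; schema-less JSON is assumed so the payloads match the sample Debezium event shown later). Configuration providers for Secrets Manager or other secret stores are layered in here as well via the standard config.providers properties:

    key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false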

For source connectors (e.g. reading MySQL) there can be only one Worker with only one task reading for new events. This is due to the offset/polling model, which requires serialization to avoid overlapping events. Even so, this polling model is very high-performing, with intervals in the milliseconds, enabling dozens of calls every second that collect batches of changes. This is one of the keys to MSK's very low latency and high throughput.

For the sink connectors (e.g. writing to Neo4j) the Workers are configured for multiple Worker JVMs and multiple tasks that map incoming topic events to their associated Neo4j transaction logic. As noted, these events are batched and handed off to the Cypher transaction at varying sizes, which, while configurable, will directly affect commit times and the exposure to locking issues.

Another method for achieving high throughput and low latency is topic partitioning. This approach allows the sink connectors to run in Kafka consumer groups that divide up work by partition. As expected, this increases the number of concurrently running tasks across the system and is one of the main reasons for Kafka's high level of scalability. However, with multiple partitions Kafka can no longer guarantee transaction sequence across all of a topic's partitions; while in some cases this is acceptable, in some graph models it can be problematic, causing failed transactions or, even worse, an undetectable no-op transaction. This is highly dependent on your target graph model, and as such multi-partitioned topics must be considered and tested carefully.
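The scaling knobs that matter most on the sink side are the Connect task count and the plugin's batch settings. As a sketch, with purely illustrative values (the neo4j.batch.* property names follow the Neo4j Kafka Connect sink plugin documentation and should be verified against the plugin version you deploy):

    # One task per topic partition is the practical ceiling for parallelism
    tasks.max=3
    # Events handed to each Cypher transaction; larger batches raise throughput
    # but lengthen commits and widen the window for lock contention
    neo4j.batch.size=1000
    # Flush a partially filled batch after this interval to cap latency
    neo4j.batch.timeout.msecs=5000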

MSK Worker Configuration details on github (here.)

AWS/MSK Cluster (Serverless)

Configuring the AWS MSK cluster in either an existing or new environment is quite straightforward thanks to managed service provisioning. Existing environments will need to take note of the regions, VPCs, subnets, IAM roles, and security groups, which may take a moment (along with the 10+ minute provisioning wait). Regardless, production configurations will typically want a 3-node cluster, mapping to a replication factor of 3 across the AZs, running 2 Kafka Brokers per node.

Our project’s GitHub repository provides AWS CLI and configuration examples for standing up a new MSK environment, though ideally these should be migrated to Terraform or another Infrastructure-as-Code (IaC) tool. Also, the source and target data stores may be pre-existing and will therefore need appropriate security group rules for passing traffic through to their endpoints.
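For reference, here is a sketch of the CLI call for a provisioned three-broker cluster like the one described above (a serverless cluster uses create-cluster-v2 with a --serverless specification instead). The cluster name and the referenced JSON file are placeholders; the broker-node file carries the instance type, client subnets, and security groups for your VPC:

    aws kafka create-cluster \
        --cluster-name msk-neo4j-dev \
        --kafka-version 2.8.1 \
        --number-of-broker-nodes 3 \
        --broker-node-group-info file://broker-node-group.json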

MSK Broker Configuration details on github (here.)

MSK Connect (Sink)

Neo4j’s MSK Connect plugin library provides the methods necessary for extracting events from the Kafka topics into templated Cypher queries responsible for populating the graph. Similar to the source MSK Worker, the sink Worker polls the MSK cluster for changes based upon offsets, but must then transform each event into a specific graph transaction. These events arrive as batch payloads containing hundreds or thousands of events, which are routed to a specific Cypher template that parameterizes and processes each event. Understanding and adjusting these batch sizes has the most direct effect on latency and throughput and should be tested accordingly. It’s also important to consider the structural transformation between a relational record and the associated graph structure, and to appreciate that even though the Cypher transaction is performed in batches, the effective workload of the node and relationship merge functionality is heavier than a simple insert into a relational table (see below).

Below is a simple example for the relational to graph translation.
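Here is a rough sketch of that translation, expressed as the sink connector's topic-to-Cypher mapping. The connector class and the neo4j.topic.cypher.* property follow the Neo4j Kafka Connect sink plugin conventions; the labels, relationship type, and topic name are hypothetical, the field names mirror the sample Debezium event shown later, and the full Debezium envelope is assumed to reach the sink (no unwrap transform). The Cypher value is shown on multiple lines for readability:

    connector.class=streams.kafka.connect.sink.Neo4jSinkConnector
    topics=mskdev_cdc.mskdev.cdc_msk_test
    neo4j.server.uri=neo4j://<cluster-dns-record>:7687
    neo4j.authentication.basic.username=<neo4j-user>
    neo4j.authentication.basic.password=<neo4j-password>
    # Each incoming record is bound as 'event'; one relational row becomes a node
    # MERGE plus a relationship MERGE rather than a single-row insert
    neo4j.topic.cypher.mskdev_cdc.mskdev.cdc_msk_test=
        WITH event.after AS row
        MERGE (e:UserEvent {id: row.id})
          SET e.name = row.ue_name,
              e.count = row.ue_count,
              e.status = row.cdc_status,
              e.src_commit_ts = event.source.ts_ms,
              e.connect_ts = event.ts_ms,
              e.neo4j_updated_ts = timestamp()
        MERGE (u:UserEntity {ue_id: row.ue_id})
        MERGE (e)-[:FOR_ENTITY]->(u)

The timestamp properties written here (src_commit_ts, connect_ts, neo4j_updated_ts) come back into play in the performance budget discussion below.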

Since the sink connect and Cypher transactions represent a good portion of the processing workload (and performance budget), it’s important to mention some key points about the scalability of this component. The first and most effective is the use of topic partitions, which in effect establishes multiple logs for the same topic of data and allows the Kafka sink processes to scale horizontally when updating the graph. While this is highly effective for some data, it is not uncommon that executing larger batches of hundreds or thousands of records can incur deadlocking across competing threads (consider updating relationships between customers and their home state; this could easily have multiple tasks competing for locks on Colorado, for example). As a result, there is value in considering the transaction update scenarios up front during your graph data modeling process.

Sink Kafka Connect for Neo4j details on github (here.)

Neo4j Cluster

While a Neo4j cluster is not required for an AWS MSK implementation, one of Kafka’s main enterprise selling points is its fault tolerance, so we’ll premise our Neo4j configuration on a 3-node cluster.

A basic Neo4j cluster is typically configured with three nodes, which you may set up manually or via AuraDB. In the latter case AuraDB takes care of all the configuration and management and simply allows your application to connect; essentially you’re ready for processing. However, if you have custom requirements and need to manually configure your cluster across EC2 instances (our environment predated AuraDB cluster GA), here are some details on connectivity.

When setting up a Neo4j cluster that will receive updates from any type of Kafka implementation, it is a good idea to use the single-DNS-record model along with Server-Side Routing to ensure that the Kafka connections can automatically be shifted if a node fails or if there is otherwise a change in schema/database leadership. The DNS record approach simply allows the network to route requests to a node that is up and taking network requests; if one of the three nodes is down, the connection request will route to whichever nodes are up and running. For our Kafka example the transactions are all writes, and these transactions will need to run on the respective leader for the schema being updated. When a transaction is received on a non-leader node, Server-Side Routing will identify it as a write transaction and forward it to the elected leader, where the write transaction will be performed.

Write operations are routed to this leader node (see Neo4j Server-Side Routing). As a result, the leader node ensures a consistent view of the data even under heavy workloads. The followers in this model are made consistent via the Raft protocol, which reads the committed transaction logs on the leader node and ships them to the subscribed follower nodes.
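On a self-managed 4.x cluster the relevant settings are small; here is a sketch (the DNS name is a placeholder, and AuraDB handles the equivalent routing for you):

    # neo4j.conf on each cluster member: enable Server-Side Routing so a write
    # that lands on a follower is forwarded to the current leader
    dbms.routing.enabled=true
    dbms.routing.default_router=SERVER

The Kafka Connect sink then connects through the single DNS record with a routing-aware URI, e.g. neo4j.server.uri=neo4j://<cluster-dns-record>:7687, so a connector restart after a failure lands on whichever member is healthy.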

While clustering is not a requirement for this solution, there are some valuable features when implementing Kafka across a Neo4j cluster. One obvious benefit is that both the Kafka and Neo4j clusters are configured for fault tolerance, such that if a given node fails, be it Neo4j or Kafka, operations will continue. The Kafka Connect model also uses a configurable connection-retry model, so that if there is a failure on Kafka’s current connection it will re-establish connectivity and retry the batch payload.

In Neo4j, the cluster will elect a single leader/host (i.e. AWS compute instance) for processing write requests associated with a given schema. When the Kafka Connect sink Worker starts, it will connect to this host via Server-Side Routing, ensuring all write transactions are processed on the leader first and then replicated via the Raft protocol to any followers within the cluster (see Neo4j Clustering). If this host incurs downtime, the cluster will re-elect a leader, during which time write transactions are suspended until the new leader is elected and MSK Connect establishes a write connection. While very fault tolerant, this condition will produce additional latency (or lag, as it’s known within Kafka) on transactions executed during this period. If your application has no tolerance for latency under these fault conditions, there are additional methods for locking resources that have not been fully replicated; that extended discussion will be left for another post.
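The Kafka Connect framework's standard error-handling properties give you one set of knobs for how long the sink keeps reattempting retriable failures and whether anything is tolerated or logged; a sketch with illustrative values, worth reviewing alongside the plugin's own retry settings:

    # How long Connect keeps reattempting a failed (retriable) operation, and the
    # maximum backoff between attempts
    errors.retry.timeout=600000
    errors.retry.delay.max.ms=30000
    # Fail fast on non-retriable errors rather than silently skipping events
    errors.tolerance=none
    errors.log.enable=true
    errors.log.include.messages=true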

A Word on Performance

While the above discussion is based upon production implementations where CDC replication SLAs were under one second across a workload that peaked at ~10k records/sec, we can all appreciate that every application will be different. But generally speaking, for replication from an OLTP database system doing hundreds to several thousands of record transactions a second, the described Kafka CDC solution is more than capable of supporting the load.

Regardless, running basic performance tests is highly advised and not overly complicated. One place to start is to put together a profile test that looks at a given table/topic and measures its movement across the streaming pipeline. Consider the performance budget below along with the accompanying narrative.

Stepwise processing from MySQL through Neo4j update.

  1. A transaction is committed to the source data store; for example, a parent-child relationship with 1,000 detail records and a commit time of 30ms. This commit timestamp is provided as the source.ts_ms property in the sample event below.
  2. The source MSK Connect (Debezium) connector then polls the MySQL binlog every 50ms for new records (i.e. poll.interval.ms) and collects in a batch any transactions of interest, up to a given batch size. Receipt of the message is represented by the event-level ts_ms timestamp.
  3. The event is then formatted and routed to the Kafka Brokers for processing, where it is added to the topic log; when the write is confirmed, the MSK Connect source updates its offset and moves on to the next batch.
  4. Independent of source processing, the MSK Connect sink connector polls for updates to relevant topics (i.e. poll.interval.ms) and, similar to the source connector, returns relevant records in batches to the Cypher transactions.
  5. The Cypher transactions then submit their transaction (Cypher and payload) to the existing session for processing. With query logging enabled (e.g. db.logs.query.parameter_logging_enabled), the query logs will contain the submit time and execution time, and optionally the payload and its size.

Updating the Neo4j node properties with these timestamps, collected from the source record in MySQL, the MSK Connect event, and a timestamp set on the updated node in Neo4j, provides very detailed profile and transaction-variability data for constructing a performance budget.
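Assuming the sink Cypher stamps each node as sketched earlier (the src_commit_ts, connect_ts, and neo4j_updated_ts properties are hypothetical names), a query like the following turns those properties into an end-to-end latency profile:

    // End-to-end latency profile over the last hour of updates
    MATCH (e:UserEvent)
    WHERE e.neo4j_updated_ts > timestamp() - 3600000
    WITH e.neo4j_updated_ts - e.src_commit_ts AS total_ms,
         e.connect_ts - e.src_commit_ts AS capture_ms
    RETURN count(*) AS events,
           percentileCont(total_ms, 0.50) AS p50_total_ms,
           percentileCont(total_ms, 0.95) AS p95_total_ms,
           avg(capture_ms) AS avg_capture_ms;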

Below is a sample Debezium event message showing the set of timestamps used to establish the front end of the performance budget.

{
  "before": {
    "id": 3796,
    "ue_id": 419,
    "ue_name": "test-orig",
    "ue_count": 430,
    "cdc_modified_ts": 1673990000000,
    "cdc_status": "active"
  },
  "after": {
    "id": 3796,
    "ue_id": 419,
    "ue_name": "test",
    "ue_count": 431,
    // The cdc_modified_ts timestamp in MySQL needs more precision, as it is
    // truncating milliseconds; it should be close to the source.ts_ms value
    "cdc_modified_ts": 1673990008000,
    "cdc_status": "pending"
  },
  "source": {
    "version": "1.8.1.Final",
    "connector": "mysql",
    "name": "mskdev_cdc",
    "ts_ms": 1673990008603, // Database commit timestamp
    "snapshot": "false",
    "db": "mskdev",
    "table": "cdc_msk_test",
    "server_id": 1691652969,
    "file": "mysql-bin-changelog.000911",
    "pos": 1847,
    "row": 0
  },
  "op": "c",
  "ts_ms": 1673990008688 // Kafka Connect ingestion timestamp
}

Kafka Lag

One of the key metrics in Kafka monitoring is ‘lag’, which represents the delta between the consumer’s last committed offset and the producer’s end offset. However, this does not take into consideration the Cypher transaction processing time, which in our example is approximately 200ms. Note that while 200ms represents the dominant portion of the performance budget, the payload is a batch of transactions that, as mentioned, can be thousands of records.
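Lag itself is easy to spot-check from any client host with the standard Kafka CLI; the consumer group for a Connect sink defaults to connect-<connector name> (the connector name and client properties file below are placeholders):

    bin/kafka-consumer-groups.sh \
        --bootstrap-server <msk-bootstrap-brokers> \
        --command-config client.properties \
        --describe --group connect-neo4j-sink-connector

MSK also publishes per-consumer-group lag metrics to CloudWatch, which pairs well with the Alarm setup mentioned in the reference architecture below.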

Developing this sort of performance budget is relatively straightforward, as the Debezium messages carry timestamps from MySQL and the Kafka Workers, which, when combined with property timestamps in MySQL and Neo4j, offer the basis for tuning your processing pipeline.

Selecting AWS MSK

At this point it’s worth highlighting some of the selection process in evaluating an initial Kafka solution; for discussion purposes, assume a pre-existing and mature AWS ecosystem. To be fair, for companies that have had years of experience fine-tuning their AWS environments and are implementing a departmental or LOB piece of infrastructure, the DevOps features will weigh heavily. In this case MSK has the advantage due to its familiar user experience around configuring and managing environments. Consider elements such as credentialing (IAM/Secrets Manager, certificates), observability (CloudWatch), scalability, and networking (VPCs, subnets, security groups), where any real and/or perceived friction is enough to sway a first-time implementation choice.

Initially AWS was slow to adopt the latest Kafka versions, but MSK has since caught up quickly and is rapidly implementing integration features with AWS Kinesis, AWS Lambda, and ML pipelines, to name a few.

Alternatively, a key differentiator in the Confluent solution is ksqlDB, which is owned and maintained by Confluent Inc. and licensed under the Confluent Community License. While this licensing doesn’t preclude using ksqlDB alongside an MSK deployment, it does preclude AWS from offering it as a managed product, which constrains its deployment efficiencies. This is unfortunate for MSK, as ksqlDB has tremendous features for enabling modern data flow and establishing a semantic layer early in the data lifecycle. That said, this lag in functionality is likely just that, and one would assume it is somewhere on the MSK product roadmap.

Reference Architecture

Finally, here’s a more detailed reference architecture highlighting some of the mentioned features available to the infrastructure. As briefly mentioned, the environment below is extended with an AWS serverless Lambda function responsible for confirming CDC replication, and AWS CloudWatch, which monitors the Kafka Brokers and Workers for faults as well as resource consumption and load. Any of these metrics may be used to set up Alarms to notify on critical issues; however, there is some latency in CloudWatch reporting if you require sub-second visibility.

Summary

The goal of this article was to make the case for enabling Neo4j and other analytic database environments by deploying a highly reliable CDC data streaming solution. The article was based upon multiple implementations of large-scale knowledge graphs with hundreds of millions of nodes and billions of relationships, supported by an incremental AWS MSK CDC layer. The environment executed low-latency replication of thousands of records per second, enabling the operational and analytical environments to remain consistent even under heavy loads. A follow-up article detailing the performance budgets and tuning of the environment is forthcoming, along with updates to the GitHub environment.

The full GitHub repository for this project can be found here.

Also, thanks to the folks at Neo4j who’ve built a great product and provided all the support. Thanks again for reading, and check out www.dts5280.com if you’re interested in working with us or in our latest projects (at our GitHub, a.k.a. TheGraphFarm).

Some additional references for configuring and working with AWS Managed Streaming for Kafka (MSK)


Owen Robertson

Longtime parallel computing and analytics pro, focused on graph, data pipelines, data science, and connected data (also a dog person).