How to produce and consume Kafka data streams directly via Cypher with Streams Procedures

Leveraging Neo4j Streams — Part 3

Andrea Santurbano
May 9 · 3 min read

What is a Neo4j Stored Procedure?

What are the streams procedures?

Set Up the Environment

$ docker-compose up


CALL streams.publish('my-topic', 'Hello World from Neo4j!')
{"payload": "Hello World from Neo4j!"}
The streams.publish procedure in action
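
The payload does not have to be a plain string. As a minimal sketch, assuming the procedure also accepts a Cypher map as its second argument (the map content is illustrative and reuses the field names from the consumer example in this article), a structured message could be published like this:

```cypher
// Assumption: streams.publish accepts a map as payload, not only a string.
// Same topic as the example above; the map content is illustrative.
CALL streams.publish('my-topic', {name: 'Andrea', surname: 'Santurbano'})
```

The message would presumably reach the topic wrapped in the same "payload" envelope as the string message shown above.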


CALL streams.consume('my-topic', {<config>}) YIELD event RETURN event
{"name": "Andrea", "surname": "Santurbano"}
CALL streams.consume('my-topic', {<config>}) YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})
The streams.consume procedure in action
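
The {<config>} placeholder is left open in the calls above. The following is a hedged sketch of a filled-in call, assuming that the configuration map supports a timeout key (in milliseconds) and that the message body is exposed as event.data. MERGE is swapped in for CREATE here, so that reading the same message twice does not duplicate the node:

```cypher
// Assumption: 'timeout' (milliseconds) is a supported config key.
// Assumption: the consumed message body is available as event.data.
CALL streams.consume('my-topic', {timeout: 5000}) YIELD event
WITH event.data AS person
// MERGE instead of CREATE: matches an existing Person before creating one.
MERGE (p:Person {firstName: person.name, lastName: person.surname})
RETURN p
```

Whether CREATE or MERGE is the better fit depends on whether every message should produce a new node or only previously unseen people should.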

This is no longer updated. Go to instead

Thanks to Michael Hunger.

