How to process big datasets of Points Of Interest using Kafka and Kubernetes
One of the main requirements of managing and working with a large dataset such as the one we have in Geoblink is having an efficient set of procedures to update the Points Of Interest (POIs) of many companies. In such complex processes, many potential problems are hard to spot before the tools are put into operation: loss of collected information due to connection drops, machines struggling to cope with the amount of data they are expected to process, etc.
At Geoblink we have paid attention to these problems and looked for solutions that could improve our former kit across the board. In the end, the differences were so many that we decided to build a new one: the POI-Kollektor, a tool that integrates POIs into the Geoblink database in a fast, efficient and safe way by processing the information in streams with Kafka and managing the execution of applications in containers with Docker and Kubernetes, making more efficient use of our own computing power.
One major use of our application is the visualisation of millions of POIs. They can be stores, restaurants, charging stations for electric cars… any location that may be of interest to a company can be defined as a POI.
Although we had already developed a solid developer kit that allowed us to store our information effectively, several issues weren't covered well enough and reduced the quality of our data: possible duplicates, and unstable scripts that would return different information from one day to the next, forcing us to geolocate the POIs again. We at Geoblink decided we could, and should, create a tool to avoid problems like these.
So, after careful consideration and a quest to find the most appropriate modern technologies to build the successor of our developer kit, we put the development of the POI-Kollektor in motion. As interns in the POI Acquisition team at Geoblink, we wrote this article to share what we have learnt and present the advancements in the POI-Kollektor, our new service to absorb, normalise, geolocate and add POIs to the Geoblink app.
How are the POIs sent to the pipeline?
The POI-Kollektor consists of two major parts: an API (based on Finch, used by Twitter) and a Controller.
The first thing one would want to do with the Kollektor is to send POIs. The API receives POST requests that describe them.
Necessary information about the POIs, such as the country they belong to or their generic name, travels in the metadata of the request, while the body carries a JSON document with all the POIs to be added. Fields such as name and address are required for a POI to be accepted by the POI-Kollektor. Once everything is validated, the POIs are transformed into new objects that Kafka can read in the next step of the process.
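To make the validation step concrete, here is a minimal sketch of what it could look like. The field names (`name`, `address`, `zipcode`, `city`) are illustrative, not the actual POI-Kollektor schema:

```scala
// Minimal sketch of the validation step. Field names are illustrative,
// not the actual POI-Kollektor schema.
case class RawPoi(name: Option[String], address: Option[String],
                  zipcode: Option[String], city: Option[String])

case class ValidPoi(name: String, address: String,
                    zipcode: Option[String], city: Option[String])

// Only POIs with a non-empty name and address make it past validation;
// everything else is rejected with an error message.
def validate(raw: RawPoi): Either[String, ValidPoi] =
  (raw.name, raw.address) match {
    case (Some(n), Some(a)) if n.nonEmpty && a.nonEmpty =>
      Right(ValidPoi(n, a, raw.zipcode, raw.city))
    case _ =>
      Left("name and address are required")
  }
```

A rejected POI never reaches Kafka, so downstream steps can assume the required fields are present.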
Apache Kafka is the distributed streaming platform that the POI-Kollektor makes use of. It has four core APIs:
- Producer API: allows an application to publish a stream of records to one or more Kafka topics.
- Consumer API: allows an application to subscribe to one or more topics and process the stream of records produced to them.
- Streams API: allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
- Connector API: allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
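To make the producer/consumer pattern concrete, here is a toy in-memory model of the first two APIs. This is not the real Kafka client (which talks to a broker over the network); it only shows the shape of the interaction:

```scala
import scala.collection.mutable

// Toy in-memory stand-in for a Kafka broker. The real client talks to a
// remote broker cluster, but the publish/subscribe shape is the same.
class ToyBroker {
  private val topics = mutable.Map.empty[String, mutable.Buffer[String]]

  // Producer API: publish a record to a topic.
  def produce(topic: String, record: String): Unit =
    topics.getOrElseUpdate(topic, mutable.Buffer.empty) += record

  // Consumer API: read the records published to a topic, in order.
  def consume(topic: String): Seq[String] =
    topics.getOrElse(topic, mutable.Buffer.empty).toSeq
}
```

In the POI-Kollektor, the API plays the producer role (publishing validated POIs) and the processing steps play the consumer role.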
These are some of the main capabilities that Kafka gives the POI-Kollektor:
- Processing the information in batches would require storing it, stopping the data collection at some point and finally processing it all at once. Instead, Kafka allows real-time stream processing, which lets us handle a constant flow of POI data and process what has already been collected while the remaining POIs are still being retrieved. Two parts of the process stand out:
- The Address Cleaner… cleans the address to match a defined format;
- The Geocoder takes information from the POI, like the address, zipcode and city, to select the best possible location for it. It gives us the coordinates to place the points in our application.
- Its resource consumption is therefore lower, as the processing is spread over time instead of hitting one big batch all at once. This has another implication: since the data is processed the moment it is collected, there is no risk that, for example, the server loses its connection and all the unprocessed data in a batch is lost and has to be resent.
- It is a distributed system, which makes it possible to split topics into partitions and send them to different servers to be processed individually, map-filter-reduce style. Being able to process the different partitions simultaneously on distributed servers makes the information processing faster and more efficient.
- One of its applications, Kafka MirrorMaker, allows messages to be replicated across multiple datacenters or cloud regions, keeping the information available for backup and recovery.
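To give a flavour of what the Address Cleaner does, here is a toy version. The actual rules in the POI-Kollektor are more elaborate; these abbreviations are made up for illustration:

```scala
// Toy address cleaner: trims, collapses whitespace and expands a few
// common abbreviations. The real POI-Kollektor rules are more elaborate.
val abbreviations = Map("st." -> "street", "ave." -> "avenue", "rd." -> "road")

def cleanAddress(raw: String): String =
  raw.trim
    .split("\\s+")                                      // collapse whitespace
    .map(w => abbreviations.getOrElse(w.toLowerCase, w)) // expand known forms
    .mkString(" ")
```

Normalising addresses like this before geocoding means the Geocoder sees one canonical form per address, which improves match quality and helps avoid duplicates.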
To provide some context, here are some details about the infrastructure we used for the POI-Kollektor and how it runs.
Running applications and processes like the one we are describing here can consume a considerable amount of your own resources. However, this can be reduced significantly through the use of several platforms.
One of them is Docker. Docker is an open-source platform used to run applications and their dependencies inside containers. Docker packages applications into these containers, making them portable to any system running a Linux or Windows OS.
The other part of the job is handled by Kubernetes, an open-source platform created by Google to orchestrate containers. Containers isolate applications, their dependencies and a Linux environment from the host system, so deploying the applications doesn't interfere with the server, machine or system, making a smaller use of your own resources. Compared to plain Docker, Kubernetes is not as simple to use, but it offers far more parametrisation.
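As an example of how simple the Docker side can be, packaging a JVM service like the Kollektor might look like this (the base image and jar path are made up for illustration):

```dockerfile
# Illustrative only: base image and jar path are made up for this example.
FROM openjdk:11-jre-slim
COPY target/poi-kollektor.jar /app/poi-kollektor.jar
CMD ["java", "-jar", "/app/poi-kollektor.jar"]
```

The resulting image bundles the application with its runtime, so it behaves the same on a laptop and on a production node.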
Kubernetes then makes it possible to manage applications or jobs deployed across many containers without having to care about the infrastructure layers needed to run them on a machine. One can run them, coordinate their execution, manage their whole life cycle and monitor the containers. These clusters of containers can be physical, virtual, or hosted on private or public clouds. This is how Kubernetes orchestrates the streams of information:
The master is the server managing the nodes. Nodes are machines hosting the containers, grouped inside pods. Pods are execution environments containing one or more containers, and the master tells the nodes how to proceed depending on how many resources are available. This information comes from the kubelet (the primary "node agent" running on each node), which informs the master whether resources are still available, whether a node has failed, etc. In this scheme, a Docker machine would sit at the same level as a pod.
Globally, the administrator gives tasks to the master, which passes this information on to the nodes. The most suitable node for each task is chosen automatically by the master, which then allocates the necessary resources to the corresponding pods through that node's kubelet; the kubelet continuously tracks the status of the containers and reports it back to the master.
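Declaring resources is how the master knows which node can host a pod. A minimal Deployment for a service like the Kollektor could look like this (the names and numbers are illustrative, not our production configuration):

```yaml
# Illustrative manifest: names and resource figures are made up.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: poi-kollektor
spec:
  replicas: 2
  selector:
    matchLabels:
      app: poi-kollektor
  template:
    metadata:
      labels:
        app: poi-kollektor
    spec:
      containers:
        - name: kollektor
          image: poi-kollektor:latest
          resources:
            requests:          # what the scheduler uses to pick a node
              cpu: "500m"
              memory: 512Mi
            limits:            # hard cap enforced through the kubelet
              cpu: "1"
              memory: 1Gi
```

The `requests` block drives the master's scheduling decision described above, while `limits` is what the kubelet enforces on the running containers.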
Of course there is some code behind all this: Scala has emerged as one of the most powerful alternatives to Java in recent times. Developed by Martin Odersky and launched in 2003, it is an object-oriented programming language with functional programming features that are highly scalable; its "scalability", as you might have imagined, is what the language is named after. It is designed to express common programming patterns in a more elegant, concise and type-safe manner. Scala is a pure object-oriented language (in the sense that every value is an object) which also provides the features of functional languages (in the sense that every function is a value), and it counts on many useful libraries that make processes lighter in resource consumption, such as Cats Effect. Another of its advantages is that it runs on the Java Virtual Machine (JVM), which allows you to work seamlessly with native Java libraries.
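A tiny example of "every function is a value" in practice. The cleaning rules here are simplified placeholders, not the Kollektor's real normalisation:

```scala
// Functions are values: they can be stored in vals, passed around
// and composed, which is what makes pipelines like ours so concise.
val clean: String => String = _.trim.toLowerCase
val dedupe: List[String] => List[String] = _.distinct

// Composing the two gives a tiny normalisation pipeline.
val normalise: List[String] => List[String] =
  names => dedupe(names.map(clean))
```

This style of composing small, pure functions is what libraries like Cats Effect build on to keep larger pipelines both readable and efficient.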
Now, what are the next steps for the POI-Kollektor? We can think about connecting more open databases to make the locations of the POIs even more precise, or linking the POI-Kollektor to other services in our infrastructure to automate processes that would otherwise take a lot of time. Nevertheless, this has already been a huge improvement in the way we manage our POIs, and we hope it stays on this path!
Antoine Fernandes & Pablo Vizán