Migrate Geometry and JSONB data from Postgres to Elasticsearch using the Logstash JDBC input plugin

Eresh Gorantla
Published in Geek Culture · Jun 21, 2021

This story explains how to migrate data from a Postgres table to Elasticsearch, including geometry and JSONB columns, using Logstash. I recently started using Elasticsearch for our application to avoid cross-service data joins between microservices.

Logstash helps us seed data from different sources into Elasticsearch indexes. Here, our source is a Postgres DB instance. We have a wide range of use cases; one of them is migrating geometry and JSONB data to Elasticsearch indexes.

Being new to Logstash, I had to spend a few hours cracking this: understanding the JDBC input plugin and the other Logstash filters. I did not find much help online on migrating special types (geometry and JSONB), so I thought I would share my experience.

What is Logstash?

Logstash dynamically ingests, transforms, and ships your data regardless of format or complexity.

As data travels from source to store, Logstash filters parse each event, identify named fields to build structure, and transform them to converge on a common format for more powerful analysis and business value.

Logstash dynamically transforms and prepares your data regardless of format or complexity:

  • Derive structure from unstructured data with grok
  • Decipher geo coordinates from IP addresses
  • Anonymize PII data, exclude sensitive fields completely
  • Ease overall processing, independent of the data source, format, or schema.

This is the use case.

How to install it locally?

To run everything locally, I created a docker-compose file for the Elastic stack (Elasticsearch and Kibana) and then installed Logstash on my machine.

I wrote a docker-compose file for Elasticsearch, with Kibana for the UI.

A total of three nodes were considered for ES.
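A trimmed sketch of such a compose file, along the lines of the three-node example in the Elastic documentation (the image version, heap sizes, and service names below are my own placeholder choices, not the exact file used):

version: '3.7'
services:
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.13.2
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
  es02:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.13.2
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  es03:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.13.2
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  kib01:
    image: docker.elastic.co/kibana/kibana:7.13.2
    ports:
      - "5601:5601"
    environment:
      ELASTICSEARCH_HOSTS: http://es01:9200
    depends_on:
      - es01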

I used this link to install and enable Logstash on my local machine.

Migrate data from Postgres to Elasticsearch

For this, we use Logstash. Before that, let us examine the database.

I created a table named users with a geometry and a JSONB column in it.

CREATE TABLE public.users (
    user_id uuid NOT NULL DEFAULT uuid_generate_v4(),
    first_name text NOT NULL,
    last_name text NULL,
    date_of_birth date NULL,
    city text NULL,
    country text NOT NULL,
    location geometry NOT NULL,
    created_at timestamp NOT NULL DEFAULT now(),
    updated_at timestamp NULL,
    additional_data jsonb NOT NULL DEFAULT '{}'::jsonb,
    CONSTRAINT users_pkey PRIMARY KEY (user_id)
);

I inserted around 500 test records, just to test Logstash and ES.
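For reference, a test row could look like this (the names, coordinates, and JSON payload below are made up for illustration):

-- Illustrative sample row: a POINT geometry plus a small JSONB payload
INSERT INTO public.users (first_name, last_name, date_of_birth, city, country, location, additional_data)
VALUES ('John', 'Doe', '1990-05-14', 'Hyderabad', 'India',
        ST_SetSRID(ST_MakePoint(78.4867, 17.3850), 4326),
        '{"phone": "9999999999", "newsletter": true}'::jsonb);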

A Logstash data pipeline typically contains three parts: input, filter, and output. For our use case:

Input → RDBMS (JDBC input plugin)

Filter → The ruby and mutate filters

Output → Elasticsearch

JDBC Input plugin

Typically, this plugin connects to a database over JDBC, executes a query, and passes the resulting rows to the later stages, i.e., filter and output.

You can find all the options of the JDBC plugin here.

The Logstash pipeline is configured in a conf file. Here is the input section:

input {
  jdbc {
    jdbc_driver_library => "<Path to>/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5434/<schema>"
    jdbc_user => "<username>"
    jdbc_password => "<password>"
    jdbc_paging_enabled => true
    #schedule => "* * * * * *"
    statement => "select user_id as userId, first_name as firstName, last_name as lastName, date_of_birth as dateOfBirth, city as city, country as country, st_asgeojson(location) as location, created_at as createdAt, updated_at as updatedAt, additional_data::text as additionalData from users"
    use_column_value => true
    # the plugin lowercases column aliases by default, so the field is "userid"
    tracking_column => "userid"
  }
}

The tracking_column is the primary key in my case. Note that the JDBC input plugin lowercases column names by default, which is why the value is userid and why the fields are renamed to camelCase later.

First, let us assume there are no special fields like geometry and JSONB, only text, number, boolean, and date fields. In that case there is no need for any filters; you can go straight to the output plugin.

output {
  stdout {
    codec => json_lines
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "users_test"
    # column aliases arrive lowercased when no rename filter is applied
    document_id => "%{userid}"
  }
}

However, if your table has geometry, JSONB, or any other special data type, you will get the exception below unless you add filters.

JDBC Plugin — Missing Converter handling for full class name=org.postgresql.util.PGobject, simple name=PGobject

If you encounter this exception, you have to fetch the column as text (as done with st_asgeojson and ::text in the query above) and parse it according to your requirements.

How to solve this issue for Geometry and JSONB.

Geometry in PostGIS maps to geo_point or geo_shape in Elasticsearch. I am going to choose geo_point because my column holds POINT values.

A geo_point can be stored in five different ways. Please see this link.

As an Object

{
  "point": {
    "lat": 41.12,
    "lon": -71.34
  }
}

As String

{
"point": "41.12,-71.34"
}

As GEOHASH

{
"point": "drm3btev3e86"
}

As an Array

{
"point": [ -71.34, 41.12 ]
}

As WKT POINT primitive

{
"point" : "POINT (-71.34 41.12)"
}

I am going to explain options 4 and 5.

How to store JSONB in ES

JSON data can be stored as an object or as the flattened type.
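In mapping terms the two alternatives look like this (I use additionalData here, the field name we rename to later); object indexes every JSON key as its own sub-field, while flattened indexes the whole payload as a single field and keeps the mapping from growing when the keys vary a lot:

"additionalData": { "type": "object" }

"additionalData": { "type": "flattened" }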

Apply Filter for GEO POINT

There are many ways to apply a filter, but with a ruby filter we can parse the GeoJSON produced by st_asgeojson and extract the coordinates:

filter {
  ruby {
    code => "
      require 'json'
      begin
        # st_asgeojson(location) returns GeoJSON; pull out the lon/lat pair
        point_json = JSON.parse(event.get('location'))
        event.set('lon', point_json['coordinates'][0])
        event.set('lat', point_json['coordinates'][1])
      rescue Exception => e
        event.tag('invalid location json')
      end
    "
  }
}

Apply Filter for JSON Object

Using Ruby's JSON library, the additional_data text is parsed into a JSON object:

filter {
  ruby {
    code => "
      require 'json'
      begin
        # additional_data::text arrives as a string; parse it back into a JSON object
        data_json = JSON.parse(event.get('additionaldata') || '{}')
        event.set('data', data_json)
      rescue Exception => e
        event.tag('invalid additional_data json')
      end
    "
  }
}

We need another filter to rename the fields (the JDBC plugin emits lowercase column names, so they are renamed to the camelCase convention) and to replace location with its WKT form:

filter {
  mutate {
    rename => {
      "data"        => "additionalData"
      "userid"      => "userId"
      "firstname"   => "firstName"
      "lastname"    => "lastName"
      "dateofbirth" => "dateOfBirth"
      "createdat"   => "createdAt"
      "updatedat"   => "updatedAt"
    }

    replace => {
      "location" => "POINT(%{lon} %{lat})"
    }

    remove_field => ["lat", "lon", "@version", "@timestamp", "additionaldata", "data"]
  }
}

Create mapping for the index as per the RDBMS

If you don't create the mapping yourself, Elasticsearch will create the index and infer the mapping from the first document it receives. There may be a few deviations when the mapping is created automatically, so it is always recommended to create the mapping explicitly.
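For this table, a mapping along these lines could be created before running Logstash (the exact types are my own choices based on the columns above, not a requirement; for example, city and country could equally be text):

PUT users_test
{
  "mappings": {
    "properties": {
      "userId":         { "type": "keyword" },
      "firstName":      { "type": "text" },
      "lastName":       { "type": "text" },
      "dateOfBirth":    { "type": "date" },
      "city":           { "type": "keyword" },
      "country":        { "type": "keyword" },
      "location":       { "type": "geo_point" },
      "createdAt":      { "type": "date" },
      "updatedAt":      { "type": "date" },
      "additionalData": { "type": "flattened" }
    }
  }
}

With this mapping, the WKT string produced by the replace filter ("POINT(lon lat)") is indexed directly as a geo_point.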

The complete Logstash config file

Here is the full pipeline config, assembled from the pieces above.
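The driver path, credentials, and connection string remain placeholders, exactly as in the snippets above:

input {
  jdbc {
    jdbc_driver_library => "<Path to>/postgresql-42.2.19.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5434/<schema>"
    jdbc_user => "<username>"
    jdbc_password => "<password>"
    jdbc_paging_enabled => true
    #schedule => "* * * * * *"
    statement => "select user_id as userId, first_name as firstName, last_name as lastName, date_of_birth as dateOfBirth, city as city, country as country, st_asgeojson(location) as location, created_at as createdAt, updated_at as updatedAt, additional_data::text as additionalData from users"
    use_column_value => true
    tracking_column => "userid"
  }
}

filter {
  # extract lon/lat from the GeoJSON returned by st_asgeojson
  ruby {
    code => "
      require 'json'
      begin
        point_json = JSON.parse(event.get('location'))
        event.set('lon', point_json['coordinates'][0])
        event.set('lat', point_json['coordinates'][1])
      rescue Exception => e
        event.tag('invalid location json')
      end
    "
  }
  # parse the JSONB column (fetched as text) back into a JSON object
  ruby {
    code => "
      require 'json'
      begin
        data_json = JSON.parse(event.get('additionaldata') || '{}')
        event.set('data', data_json)
      rescue Exception => e
        event.tag('invalid additional_data json')
      end
    "
  }
  # rename lowercase columns to camelCase and build the WKT point
  mutate {
    rename => {
      "data"        => "additionalData"
      "userid"      => "userId"
      "firstname"   => "firstName"
      "lastname"    => "lastName"
      "dateofbirth" => "dateOfBirth"
      "createdat"   => "createdAt"
      "updatedat"   => "updatedAt"
    }
    replace => {
      "location" => "POINT(%{lon} %{lat})"
    }
    remove_field => ["lat", "lon", "@version", "@timestamp", "additionaldata", "data"]
  }
}

output {
  stdout {
    codec => json_lines
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "users_test"
    document_id => "%{userId}"
  }
}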
