Getting started with Druid (Imply)

linda H
May 5, 2019

We have recently been evaluating OLAP systems that support sub-second queries on a reasonable data size. This post covers one option we tried (Druid), what we did, and what we learned.

Another option we evaluated is Citus; I will probably write another post covering it.

Druid, as a high-performance analytics database, has been around for a while, and there are many good articles about it, like:

  • this one talks about how Airbnb uses it.
  • this one covers the challenges Metamarkets ran into using Druid.
  • this one does a great job explaining core concepts in Druid (roll-up, schema design, the architecture diagram, what each component does, etc.). If you don't have time to read the original Druid docs, I highly recommend reading this one instead.

Our experience is a little different from those above, in the sense that we didn't set up the Druid cluster ourselves. There is a third-party vendor called Imply, who builds and runs the Druid cluster on AWS (backed by S3). You might argue that having another company manage Druid is not 100% a good thing. True, but for us this is the perfect situation, for 2 reasons:

  • We don’t want to be the one setting it up, because we don’t want the over-head of maintaining, 24 * 7 oncall (seriouly, who like oncall)
  • The cluster is setup for a different team in a different department before we even start look into using it. The contract they have with Imply is pretty good. So yes, it’s free. (Nothing is really free, but cost center wise, it appears to be).

Ready? Let's get started.

To simplify things, say this is the use case:

We have 2 events coming in. Let's call them EventA: a user gets some assignment (like seeing a green button vs. a red button), and EventB: the user did something (like clicking the button). We would like to associate these 2 events, that is, join them via some assignmentID (which is unique every time we give out an assignment). The result is a table like the one below:

+--------------+-----------------+-----------+---------+---------------+
| AssignmentID | AssignmentValue | EventName | UserId  | Timestamp     |
+--------------+-----------------+-----------+---------+---------------+
| a00001       | Button:Red      | Click     | u000001 | 1557017456613 |
| a00002       | Button:Red      | Null      | u000002 | 1557017456622 |
| a00003       | Button:Blue     | Null      | u000003 | 1557017456655 |
| a00004       | Button:Red      | Click     | u000004 | 1557017456699 |
+--------------+-----------------+-----------+---------+---------------+

Then we can start answering questions like: which assignment value(s) (in real life, a user will get a set of values from different sources) give us the higher rate of the desired outcome (the user clicks the button)?

In real life, the time range of our queries will typically be within 5 days. The volume we ingest every day would be 2-5 million events per event type.

Ingestion.

To make a somewhat fair comparison with Citus, we created a single CSV file (1,000,000 events/rows per day, 6 days of data) and uploaded it into an S3 bucket.

Why an S3 bucket? Because ... Imply Druid is on AWS.

Then we create an ingestion spec: 8 dimensions, 1 metric.

//ingestion_spec.json
{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "my_datasource",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "csv",
          "columns": ["project_id", "assignment_id", "user_id", "ts", "...", "inputs", "...", "...", "...", "name", "..."],
          "dimensionsSpec": {
            "dimensions": [
              "user_id",
              "assignment_id",
              "...",
              "...",
              "...",
              "name",
              "...",
              "inputs"
            ]
          },
          "timestampSpec": {
            "column": "ts",
            "format": "auto"
          }
        }
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": {
          "type": "none"
        },
        "rollup": false,
        "intervals": ["2019-01-01/2019-01-07"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "static-s3",
        "uris": [
          "s3://path/to/our.csv"
        ]
      },
      "appendToExisting": false
    },
    "tuningConfig": {
      "forceExtendableShardSpecs": false,
      "type": "index"
    }
  }
}

If the column names above confuse you, here is the map:

  • inputs → AssignmentValue
  • name → EventName.

We could ingest data via Imply's UI, but hitting the API is closer to what we would do in production.

curl -H "Content-Type: application/json" -X POST -d @ingestion_spec.json -v "our_druid_server:port/druid/indexer/v1/task"

The request above will create a job and return a job ID; we can query its status like below:

curl -X 'GET' -v "our_druid_server:port/druid/indexer/v1/task/{job_id}/status"

Once it finishes, the status will tell us how long it took to ingest the data.

{
  "task": {job_id},
  "status": {
    ...
    "type": "index",
    "statusCode": "SUCCESS",
    "status": "SUCCESS",
    "duration": 445274
  }
}

Take note of the duration above: 445274 (ms).

Query

Druid/Imply comes with a very nice UI. Once your dataset is in, simply go to the SQL tab.

SELECT COUNT(DISTINCT user_id) FROM "my_datasource"
WHERE (.... AND inputs='{"button":"blue"}')

The query comes back in 0.85 sec (2,965,193 distinct users), which is pretty good.

Notice the DISTINCT keyword on user_id? If we only did a plain COUNT, it would be much faster, but DISTINCT is a good way to test query performance, especially on a column with high cardinality.
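
As a reference point, the original question from the use case (which assignment value gives the better click rate) can also be asked as a native groupBy with a filtered aggregator. This is only a sketch, not the exact query we ran; the file name is made up, and the real query carries more filters:

//click_rate_query.json (hypothetical)
{
  "queryType": "groupBy",
  "dataSource": "my_datasource",
  "granularity": "all",
  "dimensions": ["inputs"],
  "aggregations": [
    { "type": "count", "name": "assignments" },
    {
      "type": "filtered",
      "filter": { "type": "selector", "dimension": "name", "value": "Click" },
      "aggregator": { "type": "count", "name": "clicks" }
    }
  ],
  "postAggregations": [
    {
      "type": "arithmetic",
      "name": "click_rate",
      "fn": "/",
      "fields": [
        { "type": "fieldAccess", "fieldName": "clicks" },
        { "type": "fieldAccess", "fieldName": "assignments" }
      ]
    }
  ],
  "intervals": ["2019-01-01/2019-01-07"]
}

It is submitted the same way as the native query shown later in this post (a POST to the /druid/v2 endpoint).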

Limits.

High-cardinality columns

High-cardinality columns are not good for performance (see the explanation in the schema design doc). The short story is: treating a column as a "dimension" speeds up queries, but if the column has too many distinct values, treating it as a dimension might slow things down.

Do we have high-cardinality columns? Yes: user_id and assignment_id.

To mitigate this, Druid offers something called sketches.

Using sketches provides an approximate aggregation on certain metrics. Use them in the ingestion spec:

//ingestion2_spec.json
{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "my_datasource",
      "parser": {
        "type": "string",
        "parseSpec": {
          "format": "csv",
          "columns": ["project_id", "assignment_id", "user_id", "ts", "...", "inputs", "session_id", "...", "...", "name", "..."],
          "dimensionsSpec": {
            "dimensions": [
              "...",
              "project_id",
              "...",
              "...",
              "name",
              "...",
              "inputs"
            ]
          },
          "timestampSpec": {
            "column": "ts",
            "format": "auto"
          }
        }
      },
      "metricsSpec": [
        { "type": "thetaSketch", "name": "user_id_sketch", "fieldName": "user_id" },
        { "type": "thetaSketch", "name": "assignment_id_sketch", "fieldName": "assignment_id" }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": {
          "type": "none"
        },
        "rollup": false,
        "intervals": ["2019-04-01/2019-04-07"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "static-s3",
        "uris": [
          "s3://….csv"
        ]
      },
      "appendToExisting": false
    },
    "tuningConfig": {
      "forceExtendableShardSpecs": false,
      "type": "index"
    }
  }
}

Notice we no longer have user_id and assignment_id in the dimensions; instead, we create sketch metrics for them.

Ingest the same CSV file into Druid. Remember it took 445274 ms to ingest last time; how long does it take this time?

{
  "task": {job_id},
  "status": {
    ...
    "type": "index",
    "statusCode": "SUCCESS",
    "status": "SUCCESS",
    "duration": 261299
  }
}

About 40% less.

When it comes to querying, we can no longer do COUNT(DISTINCT user_id) like before, as user_id is no longer a dimension.

However, we can still query it as an aggregation.

{
  "queryType": "groupBy",
  "dataSource": "my_datasource",
  "granularity": "ALL",
  "dimensions": [],
  "aggregations": [
    { "type": "thetaSketch", "name": "unique_users", "fieldName": "user_id_sketch" }
  ],
  "intervals": ["2019-01-01/2019-01-07"]
}

Simply hit the Druid query endpoint with the above:

curl -X 'POST' -H 'Content-Type:application/json' -d @join2_query.json -v "our_druid_server:port/druid/v2?pretty"
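
Since filtered aggregators also work on sketch columns, we can still get the per-assignment-value numbers, just approximately: unique users vs. unique users who clicked, grouped by inputs. Again, this is only a sketch with a made-up file name, using the name and inputs dimensions from our schema:

//sketch_click_rate_query.json (hypothetical)
{
  "queryType": "groupBy",
  "dataSource": "my_datasource",
  "granularity": "all",
  "dimensions": ["inputs"],
  "aggregations": [
    { "type": "thetaSketch", "name": "unique_users", "fieldName": "user_id_sketch" },
    {
      "type": "filtered",
      "filter": { "type": "selector", "dimension": "name", "value": "Click" },
      "aggregator": { "type": "thetaSketch", "name": "unique_clickers", "fieldName": "user_id_sketch" }
    }
  ],
  "intervals": ["2019-01-01/2019-01-07"]
}

Both numbers come back as estimates, so any rate we derive from them is approximate as well.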

JOIN

Remember, the original use case is that we have 2 events we would like to join via an id, assignment_id.

How come we ended up ingesting a single datasource into Druid? Where do we do the join? And why don't we just create 2 datasources and ask Druid to join them?

The answer is, Druid doesn’t do JOIN on datasource. The closest thing they offer is called “query-time lookups”, which will work if one of your datasource is small, one is large. But for our case, both datasource are large (2–5 millions events per day).

What we would end up doing is joining the 2 event streams before ingesting into Druid (like joining 2 Kafka streams).

This would add extra overhead to the ETL pipeline, which is one of the reasons we are leaning towards Citus now.

That’s it, we are still on the early stage of building ETL pipeline, it’s a lot of fun getting into area we are not familiar with. Let’s learn as we go.
