Here’s how 1mg handles millions of rows of data using sharding
1mg has been growing as an organisation, with an ever-increasing number of orders. This constant growth demands scalable systems, and so does our technology team’s never-ending motivation to make our infrastructure better.
Problem Statement
With tens of thousands of orders every day, we face many scenarios where we have to save multiple records for each order. One such scenario is saving a comment on every update that happens to an order. We see hundreds of thousands of such updates every day.
Initially we stored all these records in a single table. As orders grew, it became difficult to keep the latency of fetching these records under control; at that volume of data, indexes alone no longer helped much.
Things we tried
We tried optimising indexes and queries, but that didn’t scale for long either. Then came the idea of sharding our tables, i.e. bucketing our data into smaller chunks.
Our application runs on an internal Python framework (on Python 3.4) and uses PostgreSQL (9.4) as its database. We needed to create multiple comments for each status update of every order, with each comment identified by a bigserial comment ID.
Some of the key points we considered during implementation of sharding:
- Minimum impact on the application layer: We follow a microservices architecture with multiple dependent services, so we didn’t want the dependent services to face any contract changes or major changes around our comment service.
- Unique ID generation for every insert: When a single table is used, one can rely on PostgreSQL’s auto-increment ID generation, but with multiple tables one has to think about creating an identifier that is unique across all the tables. Also, we wanted ID generation to be independent of any special architecture or ticketing system.
- Routing of queries to the correct shard for CRUD operations: We wanted our queries optimised so that only the intended shards are queried.
We evaluated a couple of solutions with respect to the above points:
Approach 1: Handling sharding on the application layer
Though efficient in terms of CRUD operations, this approach conflicted with our goal of making as few changes as possible at the application level. We would also have needed a way for multiple services to create a unique ID for each entry, which might get complex.
Approach 2: Handling sharding on the database level
Here CRUD operations are handled at the DB level using logical shards, and unique IDs for inserts are generated using PostgreSQL’s auto-increment sequences and custom procedures.
We went ahead with this approach, as it didn’t require many changes on the application layer and it can also handle multiple instances of a service sending requests to create entries in the DB.
Here’s how we implemented sharding using the second approach:
To create a unique ID for each entry, we used a custom-defined procedure, get_comments_table_next_id, which returns the unique ID for an insert.
We went ahead with a 64-bit ID made up of:
- 43 bits for the epoch time in milliseconds.
- 3 bits for the physical shard number: the number of different DB shards; supports up to 8 physical shards.
- 7 bits for the logical shard number: supports up to 128 logical shards.
- 11 bits for an auto-incrementing sequence (a common database feature).
With 11 sequence bits, this function can generate 2^11 = 2048 unique IDs per millisecond.
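To make the bit layout concrete, here is a minimal Python sketch of how the four fields could be packed into one 64-bit integer and unpacked again. It is not the get_comments_table_next_id procedure itself: the field widths come from the list above, but the field order (timestamp in the highest bits, sequence in the lowest) and the helper names make_id and split_id are assumptions made for illustration.

```python
# Field widths from the layout described above. The ordering of the fields
# inside the 64 bits is an assumption of this sketch.
TIMESTAMP_BITS = 43
PHYSICAL_SHARD_BITS = 3
LOGICAL_SHARD_BITS = 7
SEQUENCE_BITS = 11


def make_id(epoch_ms, physical_shard, logical_shard, sequence):
    """Pack the four fields into a single 64-bit integer (hypothetical helper)."""
    if not 0 <= epoch_ms < (1 << TIMESTAMP_BITS):
        raise ValueError("timestamp does not fit in 43 bits")
    if not 0 <= physical_shard < (1 << PHYSICAL_SHARD_BITS):
        raise ValueError("physical shard must be in 0..7")
    if not 0 <= logical_shard < (1 << LOGICAL_SHARD_BITS):
        raise ValueError("logical shard must be in 0..127")
    # Only the low 11 bits of the sequence are kept, so it wraps at 2048.
    seq = sequence % (1 << SEQUENCE_BITS)
    new_id = epoch_ms
    new_id = (new_id << PHYSICAL_SHARD_BITS) | physical_shard
    new_id = (new_id << LOGICAL_SHARD_BITS) | logical_shard
    new_id = (new_id << SEQUENCE_BITS) | seq
    return new_id


def split_id(new_id):
    """Recover (epoch_ms, physical_shard, logical_shard, sequence) from an ID."""
    sequence = new_id & ((1 << SEQUENCE_BITS) - 1)
    new_id >>= SEQUENCE_BITS
    logical_shard = new_id & ((1 << LOGICAL_SHARD_BITS) - 1)
    new_id >>= LOGICAL_SHARD_BITS
    physical_shard = new_id & ((1 << PHYSICAL_SHARD_BITS) - 1)
    new_id >>= PHYSICAL_SHARD_BITS
    return new_id, physical_shard, logical_shard, sequence
```

With the timestamp in the high bits, IDs generated later compare greater than IDs generated earlier, and the shard an ID belongs to can be read back from the ID alone.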
As far as the table schema and insertion of data are concerned, we created 20 logical shards, i.e. child tables of the comments table, using PostgreSQL’s table inheritance. We insert rows in batches of 300000 in round-robin fashion (equal portions, in circular order).
Now the question arises: why did we choose 20 logical shards and batches of 300000 rows in round robin? Let’s discuss that:
We arrived at these numbers by calculating the average number of orders we get daily and the average number of comments associated with each. The reason for using round robin was that we wanted the comments of an order to live in as few shards as possible. Let’s say we make around 100000 entries in 3 days and the average lifetime of an order is 3 days. Since a batch of 300000 is larger than that, round-robin insertion ensures that the records for a particular order are present in 1 shard table, or 2 shard tables in the worst case.
Now, one of the tricky parts was how to insert an entry into a particular logical shard. Let’s discuss that:
In PostgreSQL, the triggers allowed on tables are BEFORE or AFTER triggers on INSERT, UPDATE or DELETE. In our case, we wanted to insert the entry into a particular shard table rather than the master table, and using any of the above triggers would result in two entries: one in the shard table and another in the master table. To solve this problem, we used PostgreSQL views for handling inserts, because views support INSTEAD OF INSERT triggers. So, every time an entry is inserted into the view, the trigger inserts the value into the right shard instead of the view. On the application layer, we just insert the entry into this view and the rest is handled by the procedures. This is what the trigger looks like:
CREATE TRIGGER insert_comments_view_trigger INSTEAD OF INSERT ON insert_comments_view FOR EACH ROW EXECUTE PROCEDURE insert_comments();
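The idea of inserting into a view and letting an INSTEAD OF trigger pick the real table can be tried out without a PostgreSQL server. The sketch below uses Python’s built-in sqlite3 module, since SQLite also supports INSTEAD OF INSERT triggers on views. It is only an analogy for the setup described here: SQLite has no PL/pgSQL, so routing is done with one trigger per shard and a WHEN clause, and the two-shard layout, the batch size of 3 and the body column are made-up toy values.

```python
import sqlite3

# Toy setup: 2 shards and batches of 3 keys instead of 20 shards and 300000.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE comments_shard_0 (referral_id INTEGER, body TEXT);
    CREATE TABLE comments_shard_1 (referral_id INTEGER, body TEXT);

    -- The application only ever inserts into this view.
    CREATE VIEW insert_comments_view AS
        SELECT referral_id, body FROM comments_shard_0
        UNION ALL
        SELECT referral_id, body FROM comments_shard_1;

    -- One trigger per shard; the WHEN clause applies the routing formula.
    CREATE TRIGGER route_to_shard_0 INSTEAD OF INSERT ON insert_comments_view
    WHEN (NEW.referral_id / 3) % 2 = 0
    BEGIN
        INSERT INTO comments_shard_0 VALUES (NEW.referral_id, NEW.body);
    END;

    CREATE TRIGGER route_to_shard_1 INSTEAD OF INSERT ON insert_comments_view
    WHEN (NEW.referral_id / 3) % 2 = 1
    BEGIN
        INSERT INTO comments_shard_1 VALUES (NEW.referral_id, NEW.body);
    END;
""")

for referral_id in range(6):
    conn.execute(
        "INSERT INTO insert_comments_view (referral_id, body) VALUES (?, ?)",
        (referral_id, "status update"),
    )

shard_0 = [row[0] for row in conn.execute(
    "SELECT referral_id FROM comments_shard_0 ORDER BY referral_id")]
shard_1 = [row[0] for row in conn.execute(
    "SELECT referral_id FROM comments_shard_1 ORDER BY referral_id")]
```

After the loop, shard_0 holds referral IDs 0 to 2 and shard_1 holds 3 to 5, and each row exists in exactly one table.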
Here’s how the insert_comments procedure works:
CREATE OR REPLACE FUNCTION public.insert_comments()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
DECLARE
    SHARD_TABLE_NAME_PREFIX text := 'comments_shard_';
    TOTAL_LOGICAL_SHARDS INTEGER := get_total_logical_shards();
    SHARD_NO_FOR_INSERT INTEGER;
BEGIN
    SHARD_NO_FOR_INSERT := get_logical_shard_number_from_key(NEW.referral_id);
    IF (SHARD_NO_FOR_INSERT < TOTAL_LOGICAL_SHARDS) THEN
        IF NEW.created IS NOT NULL THEN
            NEW.id := get_comments_table_next_id(SHARD_NO_FOR_INSERT, NEW.created);
        ELSE
            NEW.id := get_comments_table_next_id(SHARD_NO_FOR_INSERT);
            SELECT now() INTO NEW.created;
            NEW.updated := NEW.created;
        END IF;
        -- INSERT QUERY HERE FOR LOGICAL SHARD
    ELSE
        RAISE EXCEPTION 'Shard number exceeded allowed limit';
    END IF;
    IF TG_TABLE_NAME = 'comments' THEN
        RETURN NULL;
    ELSE
        RETURN NEW;
    END IF;
END;
$function$
get_logical_shard_number_from_key can be custom defined. In our case, we do the following:
SHARD_NO_FOR_INSERT :=(referral::BIGINT/300000)%total_number_of_logical_shards
and get_comments_table_next_id is already explained above.
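The routing formula is easy to check by hand. Below is a small Python model of it, assuming the values mentioned earlier (batches of 300000 and 20 logical shards). The function reuses the name get_logical_shard_number_from_key for readability, but it is only a sketch of the arithmetic, and shard_table_name, with its assumed suffix format, is a hypothetical helper.

```python
BATCH_SIZE = 300000          # keys per batch, as described above
TOTAL_LOGICAL_SHARDS = 20    # number of child tables, as described above


def get_logical_shard_number_from_key(referral_id):
    """Model of (referral::BIGINT / 300000) % total_number_of_logical_shards."""
    # int() mirrors the ::BIGINT cast, so keys may arrive as strings.
    return (int(referral_id) // BATCH_SIZE) % TOTAL_LOGICAL_SHARDS


def shard_table_name(referral_id):
    """Hypothetical helper: prefix from insert_comments plus the shard number."""
    return "comments_shard_" + str(get_logical_shard_number_from_key(referral_id))
```

Keys 0 to 299999 land in shard 0, the next 300000 keys land in shard 1, and so on. After shard 19 the cycle wraps, so key 6000000 maps back to shard 0.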
For fetching comments, we created a stored procedure, get_comments, that takes the comment identifier (referral_id in our case) and gets comments from the relevant shard table using the get_logical_shard_number_from_key procedure. In the absence of an identifier, it scans all the shards.
CREATE OR REPLACE FUNCTION public.get_comments(
    referral_ids text[] DEFAULT NULL::text[],
    comment_ids bigint[] DEFAULT NULL::bigint[],
    where_clause text DEFAULT NULL::text,
    order_by text DEFAULT NULL::text,
    _limit integer DEFAULT NULL::integer,
    _offset integer DEFAULT NULL::integer)
RETURNS SETOF comments_master
LANGUAGE plpgsql
AS $function$
DECLARE
    temp TEXT;
    final_query TEXT := '';
    main_table TEXT := 'comments';
    master_table TEXT := 'comments_master';
    all_tables TEXT[] := ARRAY[]::TEXT[];
    all_queries TEXT[] := ARRAY[]::TEXT[];
BEGIN
    SELECT which_shards(referral_ids, comment_ids) INTO all_tables;
    IF where_clause IS NOT NULL THEN
        where_clause := ' where ' || where_clause;
    ELSE
        where_clause := '';
    END IF;
    FOREACH temp in ARRAY all_tables
    LOOP
        all_queries := array_append(all_queries, 'select * from ' || temp || where_clause);
    END LOOP;
    final_query := array_to_string(all_queries, ' UNION ALL ');
    IF order_by IS NOT NULL THEN
        final_query := final_query || ' order by ' || order_by;
    END IF;
    IF _limit IS NOT NULL THEN
        final_query := final_query || ' limit ' || _limit::TEXT;
    END IF;
    IF _offset IS NOT NULL THEN
        final_query := final_query || ' offset ' || _offset::TEXT;
    END IF;
    RETURN QUERY EXECUTE final_query;
    RETURN;
END;
$function$
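To see what query get_comments ends up executing, the string-building part can be mirrored in a few lines of Python. This is a sketch, not part of the service: build_comments_query is a hypothetical name, and the list of shard tables is passed in directly because which_shards is not shown in this post.

```python
def build_comments_query(tables, where_clause=None, order_by=None,
                         limit=None, offset=None):
    """Mirror the query assembly in get_comments for a given list of shard tables."""
    where_sql = (" where " + where_clause) if where_clause is not None else ""
    # One SELECT per shard table, glued together with UNION ALL.
    queries = ["select * from " + table + where_sql for table in tables]
    final_query = " UNION ALL ".join(queries)
    # ORDER BY, LIMIT and OFFSET are appended once, so they apply to the
    # combined result rather than to each shard separately.
    if order_by is not None:
        final_query += " order by " + order_by
    if limit is not None:
        final_query += " limit " + str(int(limit))
    if offset is not None:
        final_query += " offset " + str(int(offset))
    return final_query


example_query = build_comments_query(
    ["comments_shard_1", "comments_shard_2"],
    where_clause="referral_id = '42'",
    order_by="created desc",
    limit=10,
    offset=0,
)
```

Because the where and order-by fragments are concatenated as-is, in this sketch as in the procedure, they have to come from trusted code rather than from user input.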
With the help of this approach, we were able to bring our database query time back to normal. Moreover, it has been scaling well with our continuous growth in orders.
Hope you liked reading this blog. If you have any queries in mind, feel free to reach out to me on LinkedIn.
Also, special mention to Prashant Mishra who has been a key part of this project.