Creating, serving & storing data for discovery

Tao Tao | Pinterest engineer, Discovery

With tens of millions of Pinners, and tens of billions of Pins, there’s a massive supply of data to power discovery projects on Pinterest such as search, recommendations, and interests to provide unlimited exploration opportunities. Here we describe how these data are calculated, stored and eventually served online.

Building the data model for the interest graph

Pinterest gives people a way to organize information on the web in a way that makes sense to them. Every Pin exists because someone thought it was important enough to add to a collection. The discovery part of Pinterest comes in when we can connect people to related Pins and Pinners they may be interested in. The more a person pins, the more connections can be created.

Discovery data leverages basic objects that share connections, aggregate them, and add more complex information. We call these aggregations PinJoin, BoardJoin, and UserJoin. PinJoin is no longer a single Pin. Instead, we use the image signatures to group all pins with the same image. BoardJoin and UserJoin groups are still using board id and user id, respectively.

PinJoin, BoardJoin, and UserJoin each contains three types of information:

  • Raw data: information input from users.
  • Derived features: information we learn from raw data.
  • Other joins: PinJoin, BoardJoin, and UserJoin each contains the other two joins.

The benefit of aggregating all information is two folds. First, it helps us correct inaccurate user input. For example, a board category may be wrongly assigned by its owner. However, its underlying pins will be repinned to many other boards that are accurately assigned. Utilizing aggregated data with connections, we can identify the wrongly assigned board. Second, additional features can be derived from raw data. For example, PinJoin contains all repin chains. It helps us construct a repin graph and run PageRank algorithm on the graph to calculate the importance of each board. All derived features are fed back to PinJoins, BoardJoins, and UserJoins.

Data calculation

Discovery data are calculated offline using a hadoop cluster. This figure illustrates a typical workflow of all jobs, with more than 200 jobs scheduled to run everyday. Job dependency is handled by Pinball , and data are stored in Amazon s3. We use the date and the iteration number to do version control.

Access and storage

Discovery data are loaded into different storages after creation, based on data size and data importance. We support the following four types of access:

Production random access using keys:

  • Random access with very high QPS
  • Response time is less than 20 milliseconds at P99
  • Limited space

Redshift batch access:

  • Fast batch access
  • Response time for simple aggregation operations is around one second
  • Limited space

Hive/cascading batch access:

  • Slow batch access
  • Response time for simple aggregation can be up to multiple minutes
  • Unlimited space

Debugging random access using keys:

  • Random access with very low QPS
  • Response time is multiple seconds for a single key
  • Unlimited space

We use HFile format to store all data, which enables us to randomly access it on s3 without further indexing. (all data are debuggable offline). We create a light version for each set of data, where unimportant yet large fields are unset in the light version so that it can fit into space-limited storage. We use both HBase and in-house developed storage systems to hold production random access data.

Discovery data serving

We design our online high QPS serving system to achieve three high-level objectives:

  • It should be able to support multiple external storages.
  • It should be able to support multiple data sets independently.
  • It should be able to support arbitrary mixing of different data.

The serving system architecture is illustrated in the figure below. It’s a finagle thrift service to respond to given queries. Response is a list of object lists. Thus, different types of objects can be put into different lists inside a single Response object.

struct Response {

1: optional list responses;

}

Queries are handled by a scheduler. Upon receiving a query, the scheduler picks up Processors from a processor pool. Processors share the common API, and return a Future of a response object, which allows the processors be chained together arbitrarily.

public abstract class Processor {

public abstract Future

process(Request request, ResponseData prev);

}

An execution plan is an execution relationship of a set of processors. A valid one should contain no circles (DAG ). Execution plans are either defined by a query or loaded from configurations. After receiving a query, the scheduler verifies the validity of its execution plan, performs a topology sorting, and issues the task one by one. The top level processors fetch data from external storages or services. Data are further fed to other processors, and eventually transformed to a Response.

The finagle service is using async Future call. When issuing tasks, the scheduler doesn’t have to wait for the processing completion of each processor. Instead, it directly operates on Future objects in three ways:

  • Chain one Future object to the next Future object.
  • Merge multiple Future objects into a single Future object.
  • Split a single Future object into multiple ones.

Lessons in data

Data are fundamental units for all discovery projects. It is important to make them fresh and accessible. We learned several lessons, including:

  • Applications often have different needs. Offline analytics and online products have different access patterns. Creating data in multiple tiers is necessary.
  • A flexible serving system can quickly power experiments and products, and largely accelerate product development.

This project was a joint effort of the whole discovery team at Pinterest. If you are interested in being part of it, join our team!

Tao Tao is a software engineer at Pinterest.