Building Data Pipeline Step 1: A Stable Connection To Data Source

Dakai Zhou
4 min read · Apr 24, 2022


Data pipeline development is a common task for anyone working as a Data Engineer, an ETL Developer, or a Business Intelligence Developer. In some cases it can be done simply by connecting different tools. In most scenarios, however, developers need to go through the API (application programming interface) documentation of the data source and build the whole pipeline in one of the programming languages.

What Is the Connection?

Most platforms provide an API that lets developers retrieve, insert, delete, or update data programmatically. This is also the interface through which we get the data from them. Data retrieval usually comes in two forms: fetching a single record and listing multiple records. Since our goal is to move data from the data source into our own database, the relevant method is usually listing. The connection, then, is the piece of code that calls data from the data source via its API.

What Is a Good Data Source Connection?

From my perspective, a good data source connection is functional, stable, and scalable.

Functional means the connection can take every parameter a data endpoint accepts. Stable means the connection does not break easily because of an unstable network or short-lived server errors. Scalable means the connection works properly regardless of the data volume.

The Implementation

First of all, check whether there is an official package or library for your programming language that can be used directly. If there is, the API connection part can be skipped. If not, the connection needs to be built from scratch.

In both cases, the API documentation should always be your “Bible”: it contains authentication, pagination, endpoints, and all the other important information. The official forum is also a good place to ask about anything that cannot be found in the documentation, or to browse questions frequently asked by other developers. Here, Pipedrive is used as the example platform for the demonstration. Although it provides a Python API client that developers can use directly, in keeping with the purpose of this blog, a self-developed API client/connection will be implemented.

Token Security

Before starting the implementation, an API token needs to be generated, usually within the platform itself. As a rule of thumb, for security reasons, an API token or key should never be hardcoded in your code or uploaded to a version control system such as GitHub or SVN. Tokens should be stored in a safe place, such as a database, a secrets manager, or an environment variable.
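As a minimal sketch, the token can be read from an environment variable at runtime (the variable name PIPEDRIVE_API_TOKEN is just an example chosen here, not a Pipedrive convention):

```python
import os

# Read the token from the environment instead of hardcoding it.
# Raises a KeyError if the variable is not set.
API_TOKEN = os.environ["PIPEDRIVE_API_TOKEN"]
```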

Establish the Basic API Connection

From the documentation, the request URL is https://api.pipedrive.com/{api_version}/{endpoint}, with the token passed as the parameter api_token. As mentioned in the endpoint documentation, start and limit are two parameters common to all endpoints. They are also important for pagination, so they will be two inputs of our function. start tells the API where to begin, and limit controls how much data the API returns per call. In addition, a params argument will be passed to the request for any other possible parameters.

To keep the connection stable, a retry will be added to the connection function. This lets the connection tolerate temporary network or server issues.
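Below is a sketch of such a connection function, using the requests library together with urllib3’s Retry for exponential back-off. The function name get_data, the use of API version v1, and the default limit of 500 are assumptions made here for illustration, not requirements of Pipedrive:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

BASE_URL = "https://api.pipedrive.com/v1/{endpoint}"

def get_data(endpoint, start=0, limit=500, params=None, api_token=API_TOKEN):
    """Call a Pipedrive list endpoint once and return the parsed JSON."""
    # Retry transient failures (connection errors, 429 and 5xx responses)
    # with an exponential back-off between attempts.
    retries = Retry(total=5, backoff_factor=1,
                    status_forcelist=[429, 500, 502, 503, 504])
    session = requests.Session()
    session.mount("https://", HTTPAdapter(max_retries=retries))

    query = {"api_token": api_token, "start": start, "limit": limit}
    if params:
        query.update(params)

    response = session.get(BASE_URL.format(endpoint=endpoint),
                           params=query, timeout=30)
    response.raise_for_status()
    return response.json()
```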

To adapt it to other platforms, just change the request URL and the parameters.

Most APIs limit the amount of data returned per call. Therefore, the basic API connection above can only fetch up to that limit (500 or 1,000 entries, depending on your platform) at a time.

Data Retrieval Method

With the basic connection above, the number of entries that can be retrieved per call is capped at that limit. To retrieve everything, pagination information is required. An API response usually carries pagination information in two or three flags: one indicates whether the endpoint has more data (more_items_in_collection in Pipedrive), and another gives the start item of the next page (next_start in Pipedrive). The flag names vary across platforms.

The main workflow is as follows: keep requesting pages, collect all data entries, and return them at once. This is the fastest way to get all the data needed.

Data retrieval method workflow

Following the workflow, the code can be:
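Here is one possible implementation of that loop. It reuses the get_data function sketched above and assumes the pagination flags sit under additional_data.pagination in the Pipedrive response; the function name get_all_data is illustrative:

```python
def get_all_data(endpoint, limit=500, params=None):
    """Page through an endpoint until the API reports no more items."""
    all_items = []
    start = 0
    while True:
        response = get_data(endpoint, start=start, limit=limit, params=params)
        all_items.extend(response.get("data") or [])

        pagination = response.get("additional_data", {}).get("pagination", {})
        if not pagination.get("more_items_in_collection"):
            break
        # Continue from where the previous page ended.
        start = pagination["next_start"]
    return all_items
```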

A More Scalable Data Retrieval Method

The data retrieval method shown above fetches all data entries of an endpoint in one go. When the endpoint holds a huge amount of data or the system has little memory, this causes memory issues. To make it scalable, the whole retrieval should be split into smaller sessions. A chunk_size can be defined, which sets how many entries the retrieval step fetches at a time. The pipeline then runs the remaining steps with this amount of data; once it has been processed, the pipeline returns to the retrieval step, gets the next chunk starting from next_start, and repeats. Total run time increases, but the burden on system resources drops and the process becomes scalable.

Scalable data retrieval method workflow

Here is the example code snippet:
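Below is a sketch of one way to implement this: a generator that reuses the get_data function above and yields data in chunks of roughly chunk_size entries, so the rest of the pipeline can process each chunk before the next one is fetched. The names get_data_in_chunks and transform_and_load are illustrative only:

```python
def get_data_in_chunks(endpoint, chunk_size=1000, limit=500, params=None):
    """Yield lists of roughly `chunk_size` entries from the endpoint."""
    chunk = []
    start = 0
    while True:
        response = get_data(endpoint, start=start, limit=limit, params=params)
        chunk.extend(response.get("data") or [])

        pagination = response.get("additional_data", {}).get("pagination", {})
        more = pagination.get("more_items_in_collection", False)

        # Hand over a chunk once it is full, or when no data is left.
        if len(chunk) >= chunk_size or not more:
            yield chunk
            chunk = []
        if not more:
            break
        start = pagination["next_start"]

# The pipeline then processes one chunk at a time:
# for chunk in get_data_in_chunks("deals", chunk_size=1000):
#     transform_and_load(chunk)   # hypothetical downstream step
```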

Hope you enjoyed it. Step 2 of building a data pipeline will come soon, so stay tuned.



Dakai Zhou

A Python developer and data engineer based in Germany.