Published in Starschema Blog
Extending Airbyte: Creating a Source Connector for Wrike

The data ingestion tool business is anything but new, and there are many solutions on the market, all with different histories and superpowers. Airbyte claims to be one of the most extensible: using its programming language-agnostic open protocol and CDKs, one can build connectors in a matter of hours, if not minutes.

In this post, I will fact-check Airbyte’s claims by building a new connector from scratch for my Wrike data.

Airbyte’s Superpowers

If you start designing a data platform, you'll quickly realize that ingestion is a crowded market. Old players are still in the game (GoldenGate, dating back to 1995, is 27 years old), while newcomers like Fivetran, Stitch, Airbyte and Dataddo emerged with the modern data stack hype as market disruptors.
Let’s see how Airbyte is trying to stand out from this crowd:

  • The core product is open-source, including all source and destination integrations and a web UI (without built-in authentication). This makes Airbyte one of the few usable data integration products that can run on your own infrastructure, whether in your own cloud VPC or on-prem.
    Yes, you can run your production Airbyte on your own Kubernetes cluster for free.
  • Connectors utilize Airbyte’s own, open standards. You can pick any of your favorite programming languages to write a connector — no need to stick to slow languages like Python. A connector written in Rust or C++ could be simply faster than interpreted or JIT-ed languages. On the other hand, there are higher-level APIs (CDKs) in Python and Java for quick development.
  • An interesting design decision I actually like is that all connectors are separate Docker containers. This is great because you do not have to deal with the dependency hell of connectors requiring different libraries. You can isolate your own code from the rest of the system and update it without fear of breaking something else. Plus, containerized connectors help with elasticity: things can run in parallel in different workers on your Kubernetes cluster.
  • And last, but not least: Airbyte claims that they will support your connectors after publishing. While you can always help, the responsibility is theirs after you share your code with them.

Good stuff. But let’s see how it fares for my use case.

The Use Case

I work at a decent-sized consulting tech company, and our presales team uses Wrike to manage sales support requests from the account teams. We’re constantly working on dozens of customer demos and presentations, and Wrike has been a tremendous help in keeping us organized. To visualize our impact, I wanted to collect this information from Wrike, store it in Snowflake and build a cool-looking dashboard on top of it.

Wrike is quite developer-friendly: their API is beautiful and well documented. Everything is provided to help you get started.

Building the Wrike Source Connector

Installing Airbyte couldn’t be easier: just clone their Git repo and spin it up with Docker Compose. To start building a new connector, use their connector generator:

cd airbyte-integrations/connector-templates/generator 
./generate.sh

I selected Python HTTP API Source and named it source-wrike. The generator creates a set of sample files in an airbyte-integrations/connectors/source-wrike folder, so we just need to customize them.

Specifying metadata

All integration components have some sort of input variables defined in source_wrike/spec.yaml. In my case, I needed three inputs: the API key, the Wrike instance name and the start date for replication. My spec.yaml looked like this.
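A spec.yaml along these lines would do the job. The property names, descriptions and documentation URL below are illustrative sketches, not copied from the actual connector:

```yaml
documentationUrl: https://docs.airbyte.io/integrations/sources/wrike
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Wrike Spec
  type: object
  required:
    - access_token
    - wrike_instance
  properties:
    access_token:
      type: string
      description: Permanent access token for the Wrike API
      airbyte_secret: true
    wrike_instance:
      type: string
      description: Wrike instance host name, such as app-us2.wrike.com
    start_date:
      type: string
      format: date-time
      description: UTC date and time from which to start replicating data
```

The airbyte_secret flag tells the UI to mask the field, so tokens never show up in plain text.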

Defining source and check_connection

All sources should implement at least two functions: one for testing the connectivity to the source and another to specify what streams (tables) are available from that source.

For web-based APIs a simple request-based invocation should be sufficient to test if the API key and Wrike instance are properly configured:
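In code, that connectivity test can be a single authenticated GET. Here is a framework-free sketch (the config keys and the version endpoint are my assumptions; the real connector implements check_connection on the SourceWrike class via the CDK):

```python
from __future__ import annotations

import urllib.request


def ping_url(instance: str) -> str:
    # Wrike's REST API lives under /api/v4; "version" is a cheap endpoint to hit
    return f"https://{instance}/api/v4/version"


def check_connection(config: dict) -> tuple[bool, str | None]:
    """Return (True, None) when the API key and instance look valid."""
    request = urllib.request.Request(
        ping_url(config["wrike_instance"]),
        headers={"Authorization": f"Bearer {config['access_token']}"},
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            return response.status == 200, None
    except OSError as error:  # bad token, unknown host, timeout, ...
        return False, f"Ping failed: {error}"
```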

To test the connection, create a config.json with the same properties defined in spec.yaml, then run the following:

python main.py check --config secrets/config.json
{"type": "LOG", "log": {"level": "INFO", "message": "Ping response code: 200"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}

Looks rad so far. Now, let’s define the output streams inside the same SourceWrike class:

Airbyte has two Authenticator classes to handle HTTP API authentications: a TokenAuthenticator to deal with Bearer tokens and one for OAuth 2.0 authentication flows. Theoretically, you can just pass this authenticator object to your stream objects — this will be the next step — and Airbyte will do the rest. You don’t have to worry about setting headers, managing secrets and so on. Pretty convenient.
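The shape of that wiring is roughly the following. To keep the sketch self-contained and runnable, TokenAuthenticator and HttpStream below are minimal stand-ins that mirror the CDK classes; in the real connector they come from airbyte_cdk:

```python
from __future__ import annotations

# Real connector imports (replaced by stand-ins below):
#   from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
#   from airbyte_cdk.sources.streams.http import HttpStream


class TokenAuthenticator:
    """Stand-in mirroring the CDK class: it produces a Bearer header."""

    def __init__(self, token: str):
        self._token = token

    def get_auth_header(self) -> dict:
        return {"Authorization": f"Bearer {self._token}"}


class HttpStream:
    """Stand-in for the CDK base class that accepts an authenticator."""

    def __init__(self, authenticator: TokenAuthenticator | None = None):
        self.authenticator = authenticator


class Tasks(HttpStream):
    pass


class Folders(HttpStream):
    pass


class SourceWrike:
    def streams(self, config: dict) -> list:
        # One authenticator instance shared by every stream; the CDK
        # attaches the header to each outgoing request for us.
        auth = TokenAuthenticator(token=config["access_token"])
        return [Tasks(authenticator=auth), Folders(authenticator=auth)]
```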

Defining streams

In the previous steps, I defined the stream objects (Tasks, Folders, etc.), but now we have to actually implement them. Since the API endpoints are really similar (the pagination and the URL base are the same), I made a neat superclass for the actual streams:

There are a few things going on, so let me explain:

  • In __init__ we set the URL base. This is what Airbyte will use when making the HTTP calls: yes, Airbyte issues the HTTP requests itself, we just tell it what we need. All caching, throttling and error handling are done by the CDK, not by us.
  • In next_page_token, we parse the next page token from the response. This will be passed to all subsequent function calls by Airbyte.
  • The request_params will return the parameters that need to be added to the request. At the very least, the next_page_token should be added in case we have one.
  • The parse_response function is the actual code that parses the response and returns the data as JSON records.
  • The path function (or property) returns the path that will be appended to the url_base. In my case, the default path is the same as the class name: Tasks, for example, uses the tasks API endpoint.

What is important here is to define url_base, path and request_params. Airbyte will make the HTTP request and call parse_response.
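Stripped of the CDK plumbing, the per-response logic boils down to a few small transformations: Wrike wraps records in a data array and signals further pages with a nextPageToken field. A framework-free sketch of that logic (the standalone helper names are mine; in the connector these live as methods on the stream class):

```python
from typing import Optional


def next_page_token(response_json: dict) -> Optional[dict]:
    """Wrike includes nextPageToken only when more pages exist."""
    token = response_json.get("nextPageToken")
    return {"nextPageToken": token} if token else None


def request_params(next_token: Optional[dict] = None) -> dict:
    """Query parameters for the next call; at minimum the page token."""
    return dict(next_token) if next_token else {}


def parse_response(response_json: dict) -> list:
    """Wrike wraps the records of every endpoint in a 'data' array."""
    return response_json.get("data", [])
```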

Now, we can go ahead and define the stream classes:

In the case of Tasks, I added a plus parameter to retrieve more fields. All I had to do was override request_params.
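That override is tiny. Assuming a shared base class like the superclass described above (reproduced here as a stub so the sketch runs), it looks roughly like this; the exact field list is a hypothetical example, check Wrike's docs for valid values:

```python
class WrikeStream:
    """Stub of the shared base class described above."""

    def request_params(self, next_page_token=None, **kwargs) -> dict:
        # Base behavior: forward the page token when we have one
        return dict(next_page_token) if next_page_token else {}


class Tasks(WrikeStream):
    def request_params(self, **kwargs) -> dict:
        params = super().request_params(**kwargs)
        # Hypothetical extra-field list for the plus parameter
        params["plus"] = "[description,parentIds]"
        return params
```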

Defining static schema

The most tedious part is defining the output schema. You need to place one JSON file for each stream at source_wrike/schemas/<stream_name>.json. No magic here: all properties need their data type(s). Later, Airbyte uses this information to build database tables and flatten JSON hierarchies when asked.

For the Comments stream, my comments.json is:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "id": {
      "type": ["null", "string"]
    },
    "authorId": {
      "type": ["null", "string"]
    },
    "text": {
      "type": ["null", "string"]
    },
    "taskId": {
      "type": ["null", "string"]
    },
    "createdDate": {
      "type": ["null", "string"],
      "format": "date-time"
    },
    "updatedDate": {
      "type": ["null", "string"],
      "format": "date-time"
    }
  }
}

To test the schema discovery, run the following command:

python main.py discover --config sample_files/config.json

You should see all of your streams’ schemas. Save this output, as next up is the most annoying part of the development: we need to create a configured_catalog.json file. This combines the stream schema with additional properties describing how to retrieve the data, such as supported_sync_modes or destination_sync_mode.

{
  "streams": [
    {
      "stream": {
        "name": "tasks",
        "json_schema": {
          "$schema": "http://json-schema.org/draft-07/schema#",
          "type": "object",
          "properties": {
            "id": {
              "type": "string"
            }
            // other properties, as defined in the schema
          }
        },
        "supported_sync_modes": ["full_refresh"],
        "source_defined_primary_key": [["id"]]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
    // next stream
  ]
}
Save this file as sample_files/configured_catalog.json and run the following command to actually read records from the API:

python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json

If it looks good — and it should — we should be able to build a nice Docker container from it:

docker build . -t airbyte/source-wrike:dev

More trouble: iterations, slices

During development, I realized that the comments endpoint limits a query to seven days. This means I need to iterate from the initial replication time until today in seven-day chunks and retrieve the data for each period separately. Airbyte has a concept of slices for exactly this: a nifty way to define your own partitioning logic for a stream. Slices can improve performance through parallelization, or handle situations like mine. I ended up implementing stream_slices and request_params with the following logic:
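A minimal sketch of that chunking: a generator walking from the start date to the end in windows of at most seven days. The slice dict's keys are my choice; the real connector's request_params turns each slice into the endpoint's date-range query parameters:

```python
from datetime import date, timedelta
from typing import Iterator


def stream_slices(start_date: date, end_date: date, step_days: int = 7) -> Iterator[dict]:
    """Yield consecutive windows of at most `step_days` days."""
    cursor = start_date
    while cursor < end_date:
        window_end = min(cursor + timedelta(days=step_days), end_date)
        yield {"start": cursor.isoformat(), "end": window_end.isoformat()}
        cursor = window_end
```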

The request_params function will be called once for every slice emitted by stream_slices, receiving the current slice data.

The source code of the connector is here in case you want to have a look.

Using My New Connector from the UI

After local development, I decided to deploy Airbyte in Kubernetes (well, I deploy everything there, so this shouldn’t come as a surprise). To deploy it, I only had to issue the following:

kubectl apply -k kube/overlays/stable

Then, I accessed the UI using the superb kubefwd. Please note that the open-source version does not include authentication; it is advised to put a reverse proxy in front of it that adds the necessary SAML or OAuth 2.0 authentication.

Adding the Connector from the UI

Head to Settings, then hit + New Connector to configure the source connector as a Docker container.

Next, let’s head over to Sources and select +New Source. In the dropdown, we should already see our newly created connector:

After filling out the configuration forms, we are ready to roll.

Results

After configuring a destination — Snowflake, in my case — we can start the sync process and obtain the results on the target side. If you enabled Normalized tabular data in the settings, Airbyte will create flattened tables from your JSON structures, using dbt internally.

With a Snowflake destination, you can directly access the JSON results as VARIANTs in _airbyte_raw_<stream> tables, or query normalized tables like src_folders or src_tasks. If a stream has a nested object, Airbyte also creates a subtable for our convenience:

The “folders_project” table is the “project” nested object in the folders stream.

Everything is in the stage area — job done.

Summary

After an hour or two, I already felt fluent in Airbyte, and in that time I was able to build my first connector using their Python CDK. The documentation is great: there are both quick-start and in-depth guides for all kinds of audiences. I also like the Docker-based approach, as managing connectors centrally through Docker repositories feels convenient and safe.

Tamas Foldi

Tamas is co-founder and CTO of data services firm Starschema, where he leads the Starschema technical team to deliver results for the most innovative enterprises.