Orchestrating Airbyte with Prefect 2.0

Introducing the prefect-airbyte collection

Nate Nowack
The Prefect Blog



Airbyte is an open-source data integration tool that’s pretty great when you want to move batch data from A to B. You can define connections between sources and destinations and then trigger a sync of those connections — moving data from each source to each destination in streams.

In Prefect 1.0, we had Airbyte tasks for triggering a connection sync and for exporting the configuration of all defined connections as a gzip archive.

Here we’ll introduce prefect-airbyte, a standalone PyPI package and the Prefect 2.0 analogue of those tasks. Let’s look at what we can do with this collection!

Want to try out Airbyte locally?

If you’ve got Docker, these commands should spin up Airbyte for you at localhost:8000:

git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up
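The containers can take a while to pull and start, so it can help to confirm the server is answering before you trigger any syncs. Here’s a minimal stdlib sketch that polls the instance, assuming your Airbyte version exposes a health endpoint at /api/v1/health on the same port as the UI and returns JSON with an "available" field (check the API docs for your version). The helpers airbyte_is_healthy and wait_for_airbyte are hypothetical, not part of prefect-airbyte.

```python
import json
import time
import urllib.request


def airbyte_is_healthy(base_url: str = "http://localhost:8000", timeout: float = 2.0) -> bool:
    """Return True if Airbyte's health endpoint reports the server as available."""
    # Assumption: the API is served under /api/v1 on the same host/port as the UI
    url = f"{base_url.rstrip('/')}/api/v1/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            payload = json.load(resp)
    except (OSError, ValueError):
        # Refused connection, timeout, HTTP error, or a non-JSON body: not ready yet
        return False
    return isinstance(payload, dict) and payload.get("available") is True


def wait_for_airbyte(
    base_url: str = "http://localhost:8000",
    deadline_s: float = 300.0,
    interval_s: float = 5.0,
) -> bool:
    """Poll the health endpoint until it reports healthy or the deadline passes."""
    end = time.monotonic() + deadline_s
    while True:
        if airbyte_is_healthy(base_url):
            return True
        if time.monotonic() >= end:
            return False
        time.sleep(interval_s)
```

Calling wait_for_airbyte() at the top of a setup script gives you a clear yes/no instead of a confusing connection error from the first sync.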

Triggering an Airbyte connection sync

Go ahead and grab the connection ID from your Airbyte UI:

The connection ID can be found in the URL when you navigate to your connection in the UI
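If you’d rather not copy the ID by hand, a small helper can pull it out of the URL. This is a sketch assuming the UI path contains a /connections/&lt;uuid&gt; segment (the exact layout may differ between Airbyte versions); connection_id_from_url is a hypothetical helper, and the workspace ID in the usage example is made up.

```python
import re
from typing import Optional

# Connection IDs are UUIDs, e.g. e1b2078f-882a-4f50-9942-cfe34b2d825b
_CONNECTION_ID = re.compile(
    r"/connections/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
)


def connection_id_from_url(url: str) -> Optional[str]:
    """Return the connection UUID from an Airbyte UI URL, or None if absent."""
    # Assumption: the UI path contains a "/connections/<uuid>" segment
    match = _CONNECTION_ID.search(url)
    return match.group(1).lower() if match else None


if __name__ == "__main__":
    # Hypothetical URL copied from the browser's address bar
    url = (
        "http://localhost:8000/workspaces/00000000-0000-0000-0000-000000000000"
        "/connections/e1b2078f-882a-4f50-9942-cfe34b2d825b/status"
    )
    print(connection_id_from_url(url))  # e1b2078f-882a-4f50-9942-cfe34b2d825b
```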

Here’s an example using blocks in prefect-airbyte >= 0.2.0:

  • An AirbyteServer block representing a running Airbyte instance at http://localhost:8000
  • An AirbyteConnection block representing an active connection defined on your Airbyte instance
from prefect import flow
from prefect_airbyte.server import AirbyteServer
from prefect_airbyte.connections import AirbyteConnection
from prefect_airbyte.flows import run_connection_sync

server = AirbyteServer(server_host="localhost", server_port=8000)

connection = AirbyteConnection(
    airbyte_server=server,
    connection_id="e1b2078f-882a-4f50-9942-cfe34b2d825b",
    status_updates=True,
)

@flow
def airbyte_syncs():
    # do some setup

    sync_result = run_connection_sync(
        airbyte_connection=connection,
    )

    # do some other things, like trigger dbt based on number of records synced
    print(f"Number of Records Synced: {sync_result.records_synced}")

if __name__ == "__main__":
    airbyte_syncs()
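The comment about dbt hints at a common pattern: only run transformations when the sync actually moved data. Here’s a minimal sketch of that gate. SyncResult is a hypothetical stand-in for the object returned by run_connection_sync (only its records_synced attribute is read), and run_transformations is a placeholder for whatever kicks off your dbt run.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class SyncResult:
    """Hypothetical stand-in exposing the one field this sketch reads."""
    records_synced: int


def maybe_run_downstream(
    result: SyncResult,
    run_transformations: Callable[[], None],
    min_records: int = 1,
) -> bool:
    """Run downstream work (e.g. dbt) only if the sync moved enough records."""
    if result.records_synced < min_records:
        print(f"Skipping transformations: only {result.records_synced} records synced")
        return False
    run_transformations()
    return True


if __name__ == "__main__":
    calls = []
    maybe_run_downstream(SyncResult(records_synced=0), lambda: calls.append("dbt"))
    maybe_run_downstream(SyncResult(records_synced=42), lambda: calls.append("dbt"))
    print(calls)  # ['dbt']
```

Inside the flow above you could pass sync_result directly, since it carries the same records_synced attribute.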

Here’s another example using trigger_sync (still available in 0.2.0):

from prefect import flow
from prefect_airbyte.connections import trigger_sync

@flow(name="Airbyte Sync")
def example_trigger_sync_flow():
    connection_id = "e1b2078f-882a-4f50-9942-cfe34b2d825b"
    trigger_sync(
        connection_id=connection_id,
        status_updates=True,
    )

if __name__ == "__main__":
    example_trigger_sync_flow()

Either of these will run your Airbyte sync to completion as a Prefect flow. Here’s the output from the trigger_sync example:

❯ python airbyte_syncs.py
03:46:03 | prefect.engine - Created flow run 'thick-seahorse' for flow 'Airbyte Sync'
03:46:03 | Flow run 'thick-seahorse' - Using task runner 'ConcurrentTaskRunner'
03:46:03 | Flow run 'thick-seahorse' - Created task run 'trigger_sync-35f0e9c2-0' for task 'trigger_sync'
03:46:03 | prefect - trigger airbyte connection: e1b2078f-882a-4f50-9942-cfe34b2d825b, poll interval 3 seconds
03:46:03 | prefect - pending
03:46:06 | prefect - running
03:46:09 | prefect - running
03:46:12 | prefect - running
03:46:16 | prefect - running
03:46:19 | prefect - running
03:46:22 | prefect - Job 26 succeeded.
03:46:22 | Task run 'trigger_sync-35f0e9c2-0' - Finished in state Completed(None)
03:46:22 | Flow run 'thick-seahorse' - Finished in state Completed('All states completed.')
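The log shows the pattern behind a sync run: trigger the job, then check its status every few seconds until it reaches a final state. Here’s a library-agnostic sketch of that poll-until-done loop. It is not prefect-airbyte’s implementation; get_status is a hypothetical callable standing in for whatever reads the job status, and the set of terminal states is an assumption based on Airbyte’s job statuses.

```python
import time
from typing import Callable

# Assumption: these are the job statuses after which polling should stop
TERMINAL_STATES = {"succeeded", "failed", "cancelled"}


def wait_for_job(
    get_status: Callable[[], str],
    poll_interval_s: float = 3.0,
    timeout_s: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until the job reaches a terminal state or time runs out."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        print(status)  # one line per poll, like the log above
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout_s} seconds")
        sleep(poll_interval_s)


if __name__ == "__main__":
    # Hypothetical status feed replaying the sequence seen in the log
    statuses = iter(["pending", "running", "running", "succeeded"])
    final = wait_for_job(lambda: next(statuses), sleep=lambda _: None)
    print(f"final status: {final}")
```

Injecting sleep keeps the loop testable without actually waiting; in real use you’d leave the default.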

Exporting your Airbyte configuration archive

(EDIT) Note: As of Airbyte v0.40.7-alpha, the API shipped with open-source Airbyte no longer supports this feature; their docs seem to recommend the Octavia CLI for connector configuration management.

Many self-hosted Airbyte users run on EC2 or Kubernetes, where instances can die unexpectedly due to OOM errors or provider outages. With that in mind, it can be useful to back up your Airbyte connectors’ configuration somewhere so that you can pick up where you left off on a new instance.

We can write our instance’s gzip archive somewhere with the export_configuration task from prefect_airbyte.configuration like this:

an example of how to use the export_configuration task
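When you’re restoring onto a fresh instance, you want to know the backup you saved is actually a gzip archive. Here’s a minimal local-filesystem sketch, assuming the export hands you the archive as raw bytes (check the return type in your prefect-airbyte version). save_config_archive is a hypothetical helper, not part of the collection, and the fake export below is placeholder data.

```python
import gzip
from pathlib import Path

GZIP_MAGIC = b"\x1f\x8b"  # every gzip stream starts with these two bytes


def save_config_archive(archive: bytes, destination: str) -> Path:
    """Write exported Airbyte configuration bytes to a local .gz file."""
    # Fail fast if the export returned something other than a gzip archive
    if not archive.startswith(GZIP_MAGIC):
        raise ValueError("expected gzip-compressed bytes")
    path = Path(destination)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(archive)
    return path


if __name__ == "__main__":
    import tempfile

    # Stand-in for the bytes the export task hands back
    fake_export = gzip.compress(b"placeholder airbyte config")
    with tempfile.TemporaryDirectory() as tmp:
        saved = save_config_archive(fake_export, f"{tmp}/airbyte_config.gz")
        print(saved.name, gzip.decompress(saved.read_bytes()))
```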

💡 Did you know?

You can pass any valid URI prefix into somewhere, like s3://**/my_destination.gz, gs://**/my_destination.gz, etc., as long as you’re authenticated with wherever you’re trying to write to!
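Since a dead instance is exactly when you need the backup, you may prefer to keep a history of archives rather than overwriting one file. Here’s a sketch that turns a URI prefix into a timestamped destination, assuming the writer you pair it with understands the listed schemes. backup_uri, SUPPORTED_SCHEMES, and the bucket name are hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlsplit

# Assumption: schemes your writer can handle ("" means a plain local path)
SUPPORTED_SCHEMES = {"", "file", "s3", "gs"}


def backup_uri(prefix: str, now: Optional[datetime] = None) -> str:
    """Build a timestamped destination for a config archive under a URI prefix."""
    scheme = urlsplit(prefix).scheme
    if scheme not in SUPPORTED_SCHEMES:
        raise ValueError(f"unsupported scheme: {scheme!r}")
    if now is None:
        now = datetime.now(timezone.utc)
    # UTC timestamps sort lexically, so the newest backup is last in a listing
    stamp = now.strftime("%Y%m%dT%H%M%SZ")
    return f"{prefix.rstrip('/')}/airbyte_config_{stamp}.gz"


if __name__ == "__main__":
    when = datetime(2022, 9, 1, 12, 0, tzinfo=timezone.utc)
    print(backup_uri("s3://my-bucket/airbyte-backups", when))
    # s3://my-bucket/airbyte-backups/airbyte_config_20220901T120000Z.gz
```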

… and that’s a wrap!

If you encounter any bugs while using prefect-airbyte, feel free to open an issue in the prefect-airbyte repository.

If you have any questions or issues, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Happy Engineering!

Hi, I’m Nate. I’m a software engineer with Prefect. I’m a lover of music, Mexican food, and automating boring things with Python.