Orchestrating Airbyte with Prefect 2.0
Introducing the prefect-airbyte collection
Airbyte is an open-source data integration tool that’s pretty great when you want to move batch data from A to B. You can define connections between sources and destinations and then trigger a sync of those connections, moving data from each source to each destination in streams.
In Prefect 1.0, we had Airbyte tasks to trigger a connection sync and to export all of the defined connections as a gzip file.
Here we’ll introduce prefect-airbyte, the standalone PyPI package and Prefect 2.0 analogue. Let’s look at what we can do with this collection!
Want to try out Airbyte locally?
If you’ve got Docker, this should spin up Airbyte for you on localhost:8000
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up
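Once the containers are up, it can take a little while before the instance actually answers requests. Below is a small stdlib-only sketch for waiting on it before you trigger any syncs. It assumes the instance exposes Airbyte's configuration API health endpoint at /api/v1/health, returning JSON with an "available" field, and that no authentication sits in front of it (newer Airbyte versions may add basic auth). The helper names are hypothetical.

```python
import http.client
import json
import time
import urllib.request


def airbyte_is_up(base_url: str = "http://localhost:8000", timeout: float = 5.0) -> bool:
    """Return True if the (assumed) health endpoint reports the instance as available."""
    url = f"{base_url.rstrip('/')}/api/v1/health"  # assumed endpoint path
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except (OSError, ValueError, http.client.HTTPException):
        # URLError/HTTPError and socket timeouts are OSError subclasses;
        # ValueError covers a body that isn't valid UTF-8 JSON.
        return False
    return isinstance(payload, dict) and bool(payload.get("available", False))


def wait_for_airbyte(
    base_url: str = "http://localhost:8000",
    attempts: int = 60,
    delay_s: float = 5.0,
    timeout: float = 5.0,
) -> bool:
    """Poll airbyte_is_up() until it succeeds or the attempts run out."""
    for attempt in range(attempts):
        if airbyte_is_up(base_url, timeout=timeout):
            return True
        if attempt < attempts - 1:  # don't sleep after the final attempt
            time.sleep(delay_s)
    return False
```

Calling `wait_for_airbyte()` right after `docker-compose up -d` gives you a simple gate (roughly five minutes with the defaults) before running the flows below.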
Triggering an Airbyte connection sync
Go ahead and grab the connection ID from your Airbyte UI; it’s the UUID in the connection page’s URL.
Here’s an example using blocks in prefect-airbyte >= 0.2.0:
- AirbyteServer: a block representing a running Airbyte instance at http://localhost:8000
- AirbyteConnection: a block representing an active connection defined on your Airbyte instance
from prefect import flow
from prefect_airbyte.server import AirbyteServer
from prefect_airbyte.connections import AirbyteConnection
from prefect_airbyte.flows import run_connection_sync

# Airbyte instance running locally (e.g. via docker-compose)
server = AirbyteServer(server_host="localhost", server_port=8000)

# An existing connection on that instance; swap in your own connection ID
connection = AirbyteConnection(
    airbyte_server=server,
    connection_id="e1b2078f-882a-4f50-9942-cfe34b2d825b",
    status_updates=True,
)


@flow
def airbyte_syncs():
    # do some setup
    sync_result = run_connection_sync(
        airbyte_connection=connection,
    )
    # do some other things, like trigger dbt based on the number of records synced
    print(f"Number of Records Synced: {sync_result.records_synced}")


if __name__ == "__main__":
    airbyte_syncs()
Here’s another example using trigger_sync (still available in 0.2.0):
from prefect import flow
from prefect_airbyte.connections import trigger_sync


@flow(name="Airbyte Sync")
def example_trigger_sync_flow():
    connection_id = "e1b2078f-882a-4f50-9942-cfe34b2d825b"
    # Trigger the sync and report the job's status until it finishes
    trigger_sync(
        connection_id=connection_id,
        status_updates=True,
    )


if __name__ == "__main__":
    example_trigger_sync_flow()
Either of these will run your Airbyte sync to completion as a Prefect flow:
❯ python airbyte_syncs.py
03:46:03 | prefect.engine - Created flow run 'thick-seahorse' for flow 'Airbyte Sync'
03:46:03 | Flow run 'thick-seahorse' - Using task runner 'ConcurrentTaskRunner'
03:46:03 | Flow run 'thick-seahorse' - Created task run 'trigger_sync-35f0e9c2-0' for task 'trigger_sync'
03:46:03 | prefect - trigger airbyte connection: e1b2078f-882a-4f50-9942-cfe34b2d825b, poll interval 3 seconds
03:46:03 | prefect - pending
03:46:06 | prefect - running
03:46:09 | prefect - running
03:46:12 | prefect - running
03:46:16 | prefect - running
03:46:19 | prefect - running
03:46:22 | prefect - Job 26 succeeded.
03:46:22 | Task run 'trigger_sync-35f0e9c2-0' - Finished in state Completed(None)
03:46:22 | Flow run 'thick-seahorse' - Finished in state Completed('All states completed.')
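The pending / running / "Job 26 succeeded." lines reflect a trigger-then-poll pattern: after triggering the sync, the job’s status is checked on a fixed interval until it reaches a terminal state. The loop below is a library-independent sketch of that pattern, not prefect-airbyte’s actual implementation. `get_status` is a hypothetical callable you would back with a request to your Airbyte instance, and the terminal status names are assumed to mirror Airbyte’s job statuses.

```python
import time
from typing import Callable

# Assumed terminal job statuses; anything else (pending, running, ...) keeps polling.
TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"}


def poll_until_done(
    get_status: Callable[[], str],
    poll_interval_s: float = 3.0,
    timeout_s: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call get_status() every poll_interval_s seconds until it returns a terminal status."""
    waited = 0.0
    while True:
        status = get_status()
        print(status)  # mirrors the "pending" / "running" lines in the log above
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout_s:
            raise TimeoutError(f"job still {status!r} after {waited:.0f}s")
        sleep(poll_interval_s)
        waited += poll_interval_s
```

Injecting `sleep` keeps the loop easy to test: feed it a fake status source and a no-op sleep, and it returns as soon as a terminal status shows up.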
Exporting your Airbyte configuration archive
(EDIT) Note: As of Airbyte v0.40.7-alpha, the API shipped with open-source Airbyte no longer supports this feature; their docs seem to recommend checking out the Octavia CLI for connector configuration management.
Many self-hosted Airbyte users run on EC2 or Kubernetes, where instances can sometimes die unexpectedly due to OOM errors or provider outages. With that in mind, it can be useful to back up your Airbyte connectors’ configuration somewhere so that you can pick up where you left off on a new instance.
We can write our instance’s gzip archive somewhere safe with the export_configuration task from prefect_airbyte.configuration.
💡 Did you know?
You can write the archive to any valid URI, like s3://**/my_destination.gz, gs://**/my_destination.gz, etc., as long as you’re authenticated with wherever you’re trying to write to!
… and that’s a wrap!
If you encounter any bugs while using prefect-airbyte, feel free to open an issue in the prefect-airbyte repository.
If you have any questions or issues, you can find help in either the Prefect Discourse forum or the Prefect Slack community.
Happy Engineering!