Using Airbyte Cloud with Dagster

Using self-hosted Airbyte? Check out the guide on using self-hosted Airbyte with Dagster.

Dagster can orchestrate your Airbyte Cloud connections, making it easy to chain an Airbyte sync with upstream or downstream steps in your workflow.

This guide focuses on how to work with Airbyte Cloud connections using Dagster's software-defined asset (SDA) framework.

Screenshot of the Airbyte UI and Dagster UI in a browser.

Airbyte Cloud connections and Dagster software-defined assets

An Airbyte Cloud connection defines a series of data streams which are synced between a source and a destination. During a sync, a replica of the data from each data stream is written to the destination, typically as one or more tables. Dagster represents each of the replicas generated in the destination as a software-defined asset. This enables you to easily:

  • Visualize the streams involved in an Airbyte Cloud connection and execute a sync from Dagster
  • Define downstream computations which depend on replicas produced by Airbyte
  • Track data lineage through Airbyte and other tools

Prerequisites

To get started, you will need to install the dagster and dagster-airbyte Python packages:

pip install dagster dagster-airbyte

You'll also need an Airbyte Cloud account and an Airbyte API key. For more information, see the Airbyte API docs.


Step 1: Connecting to Airbyte Cloud

The first step in using Airbyte Cloud with Dagster is to tell Dagster how to connect to your Airbyte Cloud account using an Airbyte Cloud resource. This resource handles your Airbyte Cloud credentials and any optional configuration.

from dagster import EnvVar
from dagster_airbyte import AirbyteCloudResource

airbyte_instance = AirbyteCloudResource(
    api_key=EnvVar("AIRBYTE_API_KEY"),
)

Here, the API key is provided using an EnvVar. For more information on setting environment variables in a production setting, see Using environment variables and secrets.
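A small pre-flight check can confirm that the variable is actually set before Dagster starts. The following is a minimal sketch using only the standard library; `require_env` is a hypothetical helper introduced here for illustration and is not part of dagster or dagster-airbyte.

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`.

    Raises RuntimeError if the variable is unset or blank, so a missing
    credential is reported before any Airbyte sync is attempted.
    """
    value = os.environ.get(name, "")
    if not value.strip():
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


# Hypothetical usage in a startup script:
# require_env("AIRBYTE_API_KEY")
```

The helper only validates presence. The resource above still reads the key through EnvVar, so the secret never needs to appear in code.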


Step 2: Building Airbyte Cloud assets using Dagster

To create software-defined assets for your Airbyte Cloud connections, first determine the connection ID of each connection you would like to build assets for. The connection ID appears in the URL of the connection page in the Airbyte Cloud UI, between /connections/ and /status.

For example, the connection ID for the URL https://cloud.airbyte.com/workspaces/11f3741b-0b54-45f8-9886-937f96f2ba88/connections/43908042-8399-4a58-82f1-71a45099fff7/status is 43908042-8399-4a58-82f1-71a45099fff7.

Screenshot of the Airbyte UI in a browser, showing the connection ID in the URL.
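If you have many connections, copying IDs by hand gets tedious. The sketch below extracts the ID from a connection page URL with the standard library. It assumes the URL shape shown above (a UUID directly after /connections/); `connection_id_from_url` is a hypothetical helper, not part of dagster-airbyte.

```python
import re
from urllib.parse import urlparse

# Canonical 8-4-4-4-12 hexadecimal UUID layout.
_UUID = r"[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}"


def connection_id_from_url(url: str) -> str:
    """Return the connection ID found after /connections/ in the URL path."""
    path = urlparse(url).path
    match = re.search(rf"/connections/({_UUID})(?:/|$)", path)
    if match is None:
        raise ValueError(f"No connection ID found in URL: {url}")
    return match.group(1)


url = (
    "https://cloud.airbyte.com/workspaces/11f3741b-0b54-45f8-9886-937f96f2ba88"
    "/connections/43908042-8399-4a58-82f1-71a45099fff7/status"
)
print(connection_id_from_url(url))  # 43908042-8399-4a58-82f1-71a45099fff7
```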

Then, supply the connection ID and the list of tables which the connection creates in the destination to build_airbyte_assets. This utility will generate a set of software-defined assets corresponding to the tables which Airbyte creates in the destination.

from dagster_airbyte import build_airbyte_assets

airbyte_assets = build_airbyte_assets(
    connection_id="43908042-8399-4a58-82f1-71a45099fff7",
    destination_tables=["releases", "tags", "teams"],
)

Adding a resource

The Airbyte assets constructed using build_airbyte_assets require an Airbyte resource which defines how to connect and interact with your Airbyte Cloud instance.

We can add the Airbyte Cloud resource we configured above to our Airbyte assets by providing it to our Definitions.

from dagster_airbyte import build_airbyte_assets, AirbyteCloudResource

from dagster import Definitions, EnvVar

airbyte_instance = AirbyteCloudResource(
    api_key=EnvVar("AIRBYTE_API_KEY"),
)
airbyte_assets = build_airbyte_assets(
    connection_id="43908042-8399-4a58-82f1-71a45099fff7",
    destination_tables=["releases", "tags", "teams"],
)

# build_airbyte_assets returns a list of asset definitions, so pass it directly
defs = Definitions(assets=airbyte_assets, resources={"airbyte": airbyte_instance})

Step 3: Adding downstream assets

Looking to orchestrate Airbyte with dbt? Check out our Modern Data Stack example and our dbt integration docs.

Once you have loaded your Airbyte Cloud assets into Dagster, you can create assets which depend on them. These can be other assets pulled in from external sources such as dbt or assets defined in Python code.

In this case, we have an Airbyte Cloud connection that stores data in our Snowflake warehouse's stargazers table. We specify the output I/O manager to tell downstream assets how to retrieve the data.

import json
from dagster import asset, Definitions, define_asset_job, AssetSelection, EnvVar
from dagster_airbyte import (
    build_airbyte_assets,
    AirbyteCloudResource,
)
from dagster_snowflake_pandas import SnowflakePandasIOManager
import pandas as pd

airbyte_instance = AirbyteCloudResource(
    api_key=EnvVar("AIRBYTE_API_KEY"),
)
airbyte_assets = build_airbyte_assets(
    connection_id="43908042-8399-4a58-82f1-71a45099fff7",
    # include the stargazers table so the downstream asset below can depend on it
    destination_tables=["releases", "tags", "teams", "stargazers"],
)

@asset
def stargazers_file(stargazers: pd.DataFrame):
    with open("stargazers.json", "w", encoding="utf8") as f:
        # to_json() already returns a JSON string; parse it first to avoid double encoding
        f.write(json.dumps(json.loads(stargazers.to_json()), indent=2))

# only run the airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
    "my_upstream_job",
    AssetSelection.keys("stargazers_file")
    .upstream()  # all upstream assets (in this case, just the stargazers Airbyte asset)
    .required_multi_asset_neighbors(),  # all Airbyte assets linked to the same connection
)

defs = Definitions(
    jobs=[my_upstream_job],
    # build_airbyte_assets returns a list of asset definitions, so unpack it
    assets=[*airbyte_assets, stargazers_file],
    resources={
        "snowflake_io_manager": SnowflakePandasIOManager(...),
        # the Airbyte assets look up their resource under the "airbyte" key
        "airbyte": airbyte_instance,
    },
)
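To make the job's asset selection concrete, here is a conceptual model in plain Python of what chaining an upstream selection with a multi-asset-neighbor expansion picks out. It is a sketch of the idea only, not Dagster's implementation. The dependency map and the `upstream` and `required_multi_asset_neighbors` functions below are illustrative stand-ins, and the model assumes the connection syncs a stargazers table alongside releases, tags, and teams.

```python
# Illustrative dependency map: asset name -> names of its direct upstream assets.
deps = {
    "stargazers_file": {"stargazers"},
    "stargazers": set(),
    "releases": set(),
    "tags": set(),
    "teams": set(),
}

# Assets produced together by one Airbyte connection (a single sync writes all of them).
multi_assets = [{"stargazers", "releases", "tags", "teams"}]


def upstream(selected, deps):
    """Return the selected assets plus everything they transitively depend on."""
    result = set(selected)
    frontier = list(selected)
    while frontier:
        key = frontier.pop()
        for parent in deps.get(key, ()):
            if parent not in result:
                result.add(parent)
                frontier.append(parent)
    return result


def required_multi_asset_neighbors(selected, multi_assets):
    """Add every asset that shares a multi-asset group with a selected asset."""
    result = set(selected)
    for group in multi_assets:
        if group & result:
            result |= group
    return result


selection = required_multi_asset_neighbors(
    upstream({"stargazers_file"}, deps), multi_assets
)
print(sorted(selection))
# ['releases', 'stargazers', 'stargazers_file', 'tags', 'teams']
```

The second step matters because one sync writes every table in the connection: selecting only stargazers would still run the whole sync, so the job includes the sibling tables explicitly.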

Step 4: Scheduling Airbyte Cloud syncs

Once you have Airbyte Cloud assets, you can define a job that runs some or all of these assets on a schedule, triggering the underlying Airbyte sync.

from dagster_airbyte import AirbyteCloudResource, build_airbyte_assets

from dagster import (
    EnvVar,
    ScheduleDefinition,
    define_asset_job,
    Definitions,
)

airbyte_instance = AirbyteCloudResource(
    api_key=EnvVar("AIRBYTE_API_KEY"),
)
airbyte_assets = build_airbyte_assets(
    connection_id="43908042-8399-4a58-82f1-71a45099fff7",
    destination_tables=["releases", "tags", "teams"],
)

# materialize all assets
run_everything_job = define_asset_job("run_everything", selection="*")

defs = Definitions(
    # build_airbyte_assets returns a list of asset definitions, so pass it directly
    assets=airbyte_assets,
    schedules=[
        ScheduleDefinition(
            job=run_everything_job,
            cron_schedule="@weekly",
        ),
    ],
    resources={"airbyte": airbyte_instance},
)

Refer to the Schedule documentation for more info on running jobs on a schedule.
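The "@weekly" value used above is a conventional cron shorthand. As a reference, the sketch below lists the five-field expressions that the common shorthands conventionally stand for. `CRON_MACROS` and `expand_cron` are hypothetical names for illustration; Dagster interprets the schedule string itself, so this helper is not needed to run the example.

```python
# Conventional cron shorthands and their five-field equivalents
# (minute, hour, day of month, month, day of week).
CRON_MACROS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",   # midnight every Sunday
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def expand_cron(expression: str) -> str:
    """Return the five-field form of a known shorthand, else the input unchanged."""
    return CRON_MACROS.get(expression.strip().lower(), expression)


print(expand_cron("@weekly"))     # 0 0 * * 0
print(expand_cron("30 6 * * 1"))  # 30 6 * * 1
```

Writing the explicit form, for example "30 6 * * 1" for 06:30 every Monday, is useful when a sync should avoid midnight on Sunday.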


Conclusion

If you find a bug or want to add a feature to the dagster-airbyte library, we invite you to contribute.

If you have questions on using Airbyte with Dagster, we'd love to hear from you:

Join us on Slack.