Using dbt Cloud with Dagster

Using the local dbt Core CLI? Check out the dbt Core with Dagster guide!

Dagster allows you to run dbt Cloud alongside other technologies like Spark, Python, etc., and has built-in support for loading dbt Cloud models, seeds, and snapshots as software-defined assets.


Prerequisites

To get started, you will need to install the dagster and dagster-dbt Python packages:

pip install dagster dagster-dbt

You'll also want to have a dbt Cloud instance with an existing project that is deployed with a dbt Cloud job. If you don't have one already, you can set up dbt Cloud with a sample project.

To manage the dbt Cloud job from Dagster, you'll need three values:

  1. An auth_token for connecting with the dbt Cloud API, stored in an environment variable DBT_CLOUD_API_TOKEN,
  2. The account_id of your dbt Cloud account, stored in an environment variable DBT_CLOUD_ACCOUNT_ID, and
  3. The job_id of the dbt Cloud job you want to manage in Dagster

The auth_token can be obtained by generating a service account token in the dbt Cloud console.

The account_id and job_id can be obtained by inspecting the URL of the dbt Cloud job in the dbt Cloud console. For example, in this screenshot, the account_id is 111111 and the job_id is 33333.

Screenshot of the dbt Cloud console on the job page.
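
If you would rather extract the two IDs programmatically than read them off the URL by eye, a small helper along these lines may be useful. It is only a sketch: the URL shape it expects (an accounts or deploy segment followed by the account ID, then projects and jobs segments) is an assumption and may differ in your dbt Cloud deployment, and parse_dbt_cloud_job_url is a hypothetical name, not part of dagster-dbt.

```python
import re
from typing import NamedTuple


class DbtCloudJobRef(NamedTuple):
    account_id: int
    job_id: int


# Assumed URL shape (not confirmed by this guide):
#   .../<accounts|deploy>/<account_id>/projects/<project_id>/jobs/<job_id>
_JOB_URL_PATTERN = re.compile(
    r"/(?:accounts|deploy)/(?P<account_id>\d+)/projects/\d+/jobs/(?P<job_id>\d+)"
)


def parse_dbt_cloud_job_url(url: str) -> DbtCloudJobRef:
    """Extract the account ID and job ID from a dbt Cloud job URL."""
    match = _JOB_URL_PATTERN.search(url)
    if match is None:
        raise ValueError(f"Could not find account and job IDs in URL: {url!r}")
    return DbtCloudJobRef(
        account_id=int(match.group("account_id")),
        job_id=int(match.group("job_id")),
    )


# Hypothetical URL built from the example IDs above; the project ID is made up.
ref = parse_dbt_cloud_job_url(
    "https://cloud.getdbt.com/deploy/111111/projects/222222/jobs/33333"
)
print(ref.account_id, ref.job_id)
```

The helper raises a ValueError when the URL does not match the assumed shape, so a changed URL layout fails loudly instead of producing wrong IDs.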

Step 1: Connecting to dbt Cloud

The first step in using dbt Cloud with Dagster is to tell Dagster how to connect to your dbt Cloud instance using a dbt Cloud resource. The resource holds the connection details for your dbt Cloud instance and the credentials needed to access it, which are read from environment variables.

from dagster_dbt import DbtCloudClientResource
from dagster import EnvVar

dbt_cloud_instance = DbtCloudClientResource(
    auth_token=EnvVar("DBT_CLOUD_API_TOKEN"),
    account_id=EnvVar.int("DBT_CLOUD_ACCOUNT_ID"),
)

Step 2: Loading dbt Cloud models as assets

In this step, you'll load the dbt Cloud models managed by a dbt Cloud job into Dagster as assets. For context, a dbt Cloud job defines a set of commands to run for a dbt Cloud project. The models managed by a dbt Cloud job are the models that the job runs once its filtering options have been applied.

Using our dbt Cloud resource, we can retrieve information about the models that the dbt Cloud job is managing.

from dagster_dbt import load_assets_from_dbt_cloud_job

# Use the dbt_cloud_instance resource we defined in Step 1, and the job_id from Prerequisites
dbt_cloud_assets = load_assets_from_dbt_cloud_job(
    dbt_cloud=dbt_cloud_instance,
    job_id=33333,
)

We support dbt Cloud jobs with multiple commands, but at least one of the commands must be a materialization command. A materialization command is either `dbt run` or `dbt build`, along with any optional command line arguments.
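
To make the rule concrete, here is a minimal sketch of how you might check a job's command list yourself before pointing Dagster at it. It is an illustration only, not the validation that dagster-dbt performs; the function names are hypothetical, and it assumes the subcommand directly follows `dbt` (no global flags in between).

```python
import shlex
from typing import Sequence

# Subcommands that count as materialization commands per the rule above.
MATERIALIZATION_SUBCOMMANDS = {"run", "build"}


def is_materialization_command(command: str) -> bool:
    """Return True for `dbt run ...` or `dbt build ...`."""
    tokens = shlex.split(command)
    return (
        len(tokens) >= 2
        and tokens[0] == "dbt"
        and tokens[1] in MATERIALIZATION_SUBCOMMANDS
    )


def has_materialization_command(commands: Sequence[str]) -> bool:
    """Return True if at least one command in the job materializes models."""
    return any(is_materialization_command(command) for command in commands)


# A job with several commands, one of which is `dbt build`.
print(has_materialization_command(["dbt deps", "dbt build --select staging"]))
# A job that only runs tests has no materialization command.
print(has_materialization_command(["dbt test"]))
```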

The load_assets_from_dbt_cloud_job function loads the dbt Cloud models into Dagster as assets, creating one Dagster asset for each model.

When invoked, the function:

  1. Invokes your dbt Cloud job with command overrides to compile your dbt project,
  2. Parses the metadata provided by dbt Cloud, and
  3. Generates a set of software-defined assets reflecting the models in the project managed by the dbt Cloud job. Materializing these assets will run the dbt Cloud job that is represented by the loaded assets.
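
Steps 2 and 3 can be pictured with a toy example. The sketch below takes a heavily trimmed, made-up dictionary in the shape of a dbt manifest.json (a nodes mapping whose entries carry resource_type and depends_on.nodes) and derives one entry per model, seed, or snapshot together with its upstream dependencies. It is a simplification for illustration and should not be read as how load_assets_from_dbt_cloud_job is implemented; asset_dependencies is a hypothetical helper.

```python
from typing import Dict, List

# Resource types that this guide describes as loadable assets.
ASSET_RESOURCE_TYPES = {"model", "seed", "snapshot"}


def asset_dependencies(manifest: dict) -> Dict[str, List[str]]:
    """Map each model/seed/snapshot node to the asset nodes it depends on."""
    nodes = manifest.get("nodes", {})
    asset_ids = {
        unique_id
        for unique_id, node in nodes.items()
        if node.get("resource_type") in ASSET_RESOURCE_TYPES
    }
    return {
        unique_id: sorted(
            dep
            for dep in nodes[unique_id].get("depends_on", {}).get("nodes", [])
            if dep in asset_ids
        )
        for unique_id in sorted(asset_ids)
    }


# Made-up manifest fragment: one seed, two models, and one test (ignored).
example_manifest = {
    "nodes": {
        "seed.demo.raw_customers": {"resource_type": "seed", "depends_on": {"nodes": []}},
        "model.demo.stg_customers": {
            "resource_type": "model",
            "depends_on": {"nodes": ["seed.demo.raw_customers"]},
        },
        "model.demo.customers": {
            "resource_type": "model",
            "depends_on": {"nodes": ["model.demo.stg_customers"]},
        },
        "test.demo.not_null_customers_id": {
            "resource_type": "test",
            "depends_on": {"nodes": ["model.demo.customers"]},
        },
    }
}

for asset, upstream in asset_dependencies(example_manifest).items():
    print(asset, "<-", upstream)
```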

Step 3: Schedule dbt Cloud job runs

Now that your dbt Cloud assets are loaded, you can define a Dagster job that materializes some or all of these assets, triggering the underlying dbt Cloud job.

You can explicitly define when your assets should be materialized. For example, you can schedule assets based on their upstream or downstream dependencies, external events using a sensor, or a cron schedule.

from dagster import (
    ScheduleDefinition,
    define_asset_job,
    AssetSelection,
    Definitions,
)

# Materialize all assets
run_everything_job = define_asset_job("run_everything_job", AssetSelection.all())

# Materialize only the staging assets
run_staging_job = define_asset_job(
    "run_staging_job", AssetSelection.groups("staging")
)

defs = Definitions(
    # Use the dbt_cloud_assets defined in Step 2
    assets=[dbt_cloud_assets],
    schedules=[
        ScheduleDefinition(
            job=run_everything_job,
            cron_schedule="@daily",
        ),
        ScheduleDefinition(
            job=run_staging_job,
            cron_schedule="@hourly",
        ),
    ],
)
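
The "@daily" and "@hourly" values used above are conventional cron shorthands. If you need a different cadence, cron_schedule also accepts a regular five-field cron expression. The mapping below spells out the conventional meaning of the common shorthands; it is a reference sketch in plain Python, and expand_cron_alias is a hypothetical helper rather than a Dagster API.

```python
# Conventional five-field equivalents of common cron shorthands
# (minute, hour, day of month, month, day of week).
CRON_ALIASES = {
    "@hourly": "0 * * * *",   # at minute 0 of every hour
    "@daily": "0 0 * * *",    # at 00:00 every day
    "@weekly": "0 0 * * 0",   # at 00:00 every Sunday
    "@monthly": "0 0 1 * *",  # at 00:00 on the first day of each month
}


def expand_cron_alias(cron_schedule: str) -> str:
    """Return the five-field form of a shorthand, or the input unchanged."""
    return CRON_ALIASES.get(cron_schedule, cron_schedule)


print(expand_cron_alias("@daily"))
print(expand_cron_alias("*/15 * * * *"))
```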

Step 4: Cache the dbt Cloud job compilation

This step is optional.

You'll need the following secrets in your GitHub repository and local environment:

  • DBT_CLOUD_API_TOKEN - The API token for your dbt Cloud account, as described in the Prerequisites.
  • DBT_CLOUD_ACCOUNT_ID - The id of your dbt Cloud account, as described in the Prerequisites.
  • DBT_CLOUD_PROJECT_ID - The id of the dbt Cloud project that corresponds to the dbt project managed in git.
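
Before running the caching command locally, it can help to confirm that all three variables are actually set. The following is a minimal pre-flight sketch using only the standard library; missing_env_vars is a hypothetical helper, not part of dagster-dbt.

```python
import os
from typing import Iterable, List, Mapping

# The three variables listed above.
REQUIRED_ENV_VARS = (
    "DBT_CLOUD_API_TOKEN",
    "DBT_CLOUD_ACCOUNT_ID",
    "DBT_CLOUD_PROJECT_ID",
)


def missing_env_vars(
    required: Iterable[str], environ: Mapping[str, str] = os.environ
) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in required if not environ.get(name)]


missing = missing_env_vars(REQUIRED_ENV_VARS)
if missing:
    print("Missing environment variables: " + ", ".join(missing))
else:
    print("All dbt Cloud environment variables are set.")
```

Passing the environment as a parameter keeps the check easy to exercise with a plain dictionary.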

Step 4.1: Add DBT_DAGSTER_COMPILE_RUN_ID environment variable to dbt Cloud

To cache the compilation of your dbt project, you'll need to add an environment variable to your dbt Cloud project. Navigate to dbt Cloud's Environment Variables, and set the DBT_DAGSTER_COMPILE_RUN_ID variable with a default value of -1 at both the project and environment level.

Step 4.2: Override DBT_DAGSTER_COMPILE_RUN_ID on dbt Cloud jobs managed by Dagster

Next, in the dbt Cloud jobs that you are managing with Dagster, you'll need to override the DBT_DAGSTER_COMPILE_RUN_ID environment variable. The presence of this override at the job level causes the dbt Cloud job to use the cached compilation of your dbt project. Set the job-level override to -1 as well.

Step 4.3: Test out the cache locally

Run dagster-dbt-cloud cache-compile-references to populate the cache for your dbt Cloud jobs. This command compiles your dbt project and caches the compilation for each dbt Cloud job that you are managing with Dagster. After the command finishes, the DBT_DAGSTER_COMPILE_RUN_ID environment variable for your job is updated to the run id of the cached compilation.

When running dagster dev locally, your dbt Cloud assets will now be loaded from the cached compilation, rather than compiling when the Dagster definitions are loaded.
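
One way to reason about the variable is as a sentinel: a value of -1 before the cache is populated, and a real run id afterwards. The sketch below encodes that reading. Treating -1 as "no cached compilation yet" is an assumption drawn from the steps above, not documented behavior, and cached_compile_run_id is a hypothetical helper.

```python
from typing import Mapping, Optional

COMPILE_RUN_ID_ENV_VAR = "DBT_DAGSTER_COMPILE_RUN_ID"
# Assumption: -1 means the cache has not been populated for this job.
NO_CACHE_SENTINEL = -1


def cached_compile_run_id(job_env_overrides: Mapping[str, str]) -> Optional[int]:
    """Return the cached compile run id, or None if no cache is recorded."""
    raw_value = job_env_overrides.get(COMPILE_RUN_ID_ENV_VAR)
    if raw_value is None:
        return None
    run_id = int(raw_value)
    return None if run_id == NO_CACHE_SENTINEL else run_id


# Before the cache is populated (the override set in Step 4.2).
print(cached_compile_run_id({"DBT_DAGSTER_COMPILE_RUN_ID": "-1"}))
# After the cache is populated; 123456 is a made-up run id.
print(cached_compile_run_id({"DBT_DAGSTER_COMPILE_RUN_ID": "123456"}))
```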

Step 4.4: Set up a GitHub action to automate the cache

You can automate refreshing the cached compilation with a GitHub action on the repository for your dbt Cloud project. For example, the following action caches the compilation of your dbt project whenever you push to your main branch.

name: Compile dbt Cloud project
on:
  push:
    branches:
      - main
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
        with:
          python-version: "3.9"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install dagster_dbt
      - name: Cache dbt Cloud compile references
        run: python -m dagster_dbt.cloud.cli cache-compile-references
        env:
          DBT_CLOUD_API_TOKEN: ${{ secrets.DBT_CLOUD_API_TOKEN }}
          DBT_CLOUD_ACCOUNT_ID: ${{ secrets.DBT_CLOUD_ACCOUNT_ID }}
          DBT_CLOUD_PROJECT_ID: ${{ secrets.DBT_CLOUD_PROJECT_ID }}

What's next?

By now, you should have a working dbt Cloud and Dagster integration and a handful of materialized Dagster assets.

What's next? From here, you can: