dagster-dbt integration reference#

Using dbt Cloud? Check out the dbt Cloud with Dagster guide!

This reference provides a high-level look at working with dbt models through Dagster's software-defined assets framework using the dagster-dbt integration library.

For a step-by-step implementation walkthrough, refer to the Using dbt with Dagster software-defined assets tutorial.


Loading dbt models from a dbt project#

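The dagster-dbt library offers two methods of loading dbt models from a project into Dagster: load_assets_from_dbt_project, which compiles the project when the assets are loaded, and load_assets_from_dbt_manifest, which reads an existing manifest.json file.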

Loading models using load_assets_from_dbt_project#

Check out part two of the dbt + Dagster tutorial to see this concept in context.

For smaller dbt projects where compilation time is not a concern, the simplest way to load your dbt assets into Dagster is the following:

from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(project_dir="path/to/dbt/project")

The load_assets_from_dbt_project function:

  1. Compiles your dbt project,
  2. Parses the metadata that dbt provides, and
  3. Generates a set of software-defined assets that reflect the models in the project. These assets will share the same underlying operation, which will invoke dbt to run the models represented by the loaded assets.

Loading models using load_assets_from_dbt_manifest#

For larger dbt projects, the overhead involved with recompiling the entire project may be a concern. In these cases, you can load dbt models from an existing dbt manifest.json file using the load_assets_from_dbt_manifest function:

import json

from dagster_dbt import load_assets_from_dbt_manifest

with open("path/to/dbt/manifest.json") as f:
    manifest_json = json.load(f)

dbt_assets = load_assets_from_dbt_manifest(manifest_json)

If you make any changes to your dbt project that change the structure of the project (such as changing the dependencies of a model or adding a new one), you'll need to regenerate your manifest file for those changes to be reflected in Dagster.
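As a rough way to tell whether a change is structural, you can compare the model dependency graph in two manifests. The sketch below is not part of dagster-dbt: the helper names are hypothetical, and the only manifest.json fields it assumes are nodes, resource_type, and depends_on.nodes.

```python
def model_dependencies(manifest: dict) -> dict:
    """Map each dbt model's unique_id to the sorted list of nodes it depends on."""
    return {
        unique_id: sorted(node.get("depends_on", {}).get("nodes", []))
        for unique_id, node in manifest.get("nodes", {}).items()
        if node.get("resource_type") == "model"
    }


def structure_changed(old_manifest: dict, new_manifest: dict) -> bool:
    """Return True if models were added or removed, or their dependencies changed."""
    return model_dependencies(old_manifest) != model_dependencies(new_manifest)


# Small hand-written dicts standing in for the contents of real manifest.json files.
old = {
    "nodes": {
        "model.my_project.orders": {
            "resource_type": "model",
            "depends_on": {"nodes": []},
        },
    }
}
new = {
    "nodes": {
        "model.my_project.orders": {
            "resource_type": "model",
            "depends_on": {"nodes": []},
        },
        # A newly added model that depends on `orders`.
        "model.my_project.order_stats": {
            "resource_type": "model",
            "depends_on": {"nodes": ["model.my_project.orders"]},
        },
    }
}

print(structure_changed(old, old))  # False
print(structure_changed(old, new))  # True
```

If a check like this returns True, the manifest that Dagster loads is out of date and should be regenerated.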


Using the DbtCli for dbt CLI commands#

Check out part three of the dbt + Dagster tutorial to see this concept in context.

Assets loaded from dbt require a dbt resource, which is responsible for firing off dbt CLI commands. The dagster-dbt integration provides the DbtCli for this purpose. This resource can be configured with CLI flags that are passed into every dbt invocation.

The most important flag to set is the project_dir flag, which points Dagster at the directory of your dbt project. You can also use the target flag to point at an explicit dbt target. For a full list of configuration options, refer to the DbtCli API docs.

You can configure this resource and add it to your dbt assets by doing the following:

import os

from dagster_dbt import DbtCli, load_assets_from_dbt_project

from dagster import Definitions

DBT_PROJECT_PATH = "path/to/dbt_project"
DBT_TARGET = "hive" if os.getenv("EXECUTION_ENV") == "prod" else "duckdb"

defs = Definitions(
    assets=load_assets_from_dbt_project(DBT_PROJECT_PATH),
    resources={"dbt": DbtCli(project_dir=DBT_PROJECT_PATH, target=DBT_TARGET)},
)
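
To see what "flags passed into every dbt invocation" amounts to, the following sketch assembles the argument list for a dbt command from a project directory and an optional target. The build_dbt_command helper is hypothetical and is not how DbtCli is implemented; it only relies on dbt's --project-dir and --target command-line flags.

```python
def build_dbt_command(subcommand, project_dir, target=None):
    """Build an argv list for a dbt invocation with the shared flags applied."""
    args = ["dbt", subcommand, "--project-dir", project_dir]
    if target is not None:
        # Only pass --target when an explicit target was requested.
        args += ["--target", target]
    return args


print(build_dbt_command("run", "path/to/dbt_project", target="duckdb"))
# ['dbt', 'run', '--project-dir', 'path/to/dbt_project', '--target', 'duckdb']
```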

Scheduling dbt jobs#

Once you have your dbt assets, you can define a job that runs some or all of these assets on a schedule:

from dagster import ScheduleDefinition, define_asset_job, Definitions

run_everything_job = define_asset_job("run_everything", selection="*")

# only `order_stats` and all assets downstream of it
run_something_job = define_asset_job("run_something", selection="order_stats*")

defs = Definitions(
    assets=dbt_assets,
    schedules=[
        ScheduleDefinition(
            job=run_something_job,
            cron_schedule="@daily",
        ),
        ScheduleDefinition(
            job=run_everything_job,
            cron_schedule="@weekly",
        ),
    ],
)

Refer to the Schedule documentation for more info on running jobs on a schedule.
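
The selection string "order_stats*" used above picks order_stats plus everything downstream of it. The sketch below illustrates that semantics with a plain breadth-first traversal over a small graph. It is not Dagster's implementation, and every asset name other than order_stats is made up for the example.

```python
from collections import deque


def select_with_downstream(root, downstream):
    """Return `root` and every asset reachable by following downstream edges."""
    selected = {root}
    queue = deque([root])
    while queue:
        current = queue.popleft()
        for child in downstream.get(current, []):
            if child not in selected:
                selected.add(child)
                queue.append(child)
    return selected


# Maps each asset to the assets that directly consume it.
graph = {
    "orders": ["order_stats"],
    "order_stats": ["daily_report"],
    "daily_report": ["dashboard"],
    "customers": [],
}

selected = select_with_downstream("order_stats", graph)
print(sorted(selected))  # ['daily_report', 'dashboard', 'order_stats']
```

Note that orders, which is upstream of order_stats, is not part of the selection.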


Understanding asset keys#

In Dagster, each asset has an asset key to identify it. Dagster automatically generates these keys for each dbt node in the project as well as the sources for each node.

For dbt models, seeds, and snapshots#

For models, seeds, and snapshots, the default asset key will be the configured schema for that node (if any), concatenated with the name of the node.

For example, if you have configured a custom schema for a subdirectory in your dbt_project.yml file:

models:
  my_project:
    marketing:
      +schema: marketing

Then the asset key for a model named some_model will be marketing/some_model. If you haven't configured a custom schema, then the asset key will be some_model.

For dbt sources#

For sources, the default asset key will be the name of the source concatenated with the name of the source table.

For example, the asset key for the source table defined in the following sources.yml will be jaffle_shop/orders:

sources:
  - name: jaffle_shop
    tables:
      - name: orders
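
The default key rules for models, seeds, snapshots, and sources can be summarized in a few lines. The default_asset_key function below is a hypothetical illustration of the rules described in this section, not the code dagster-dbt runs; the dictionaries mimic manifest.json entries using the resource_type, name, source_name, and config.schema fields.

```python
def default_asset_key(node: dict) -> list:
    """Return the default asset key path for a dbt node, per the rules above."""
    if node["resource_type"] == "source":
        # Sources: the source name followed by the table name.
        return [node["source_name"], node["name"]]
    # Models, seeds, and snapshots: the custom schema (if any), then the node name.
    schema = node.get("config", {}).get("schema")
    return [schema, node["name"]] if schema else [node["name"]]


with_schema = {"resource_type": "model", "name": "some_model", "config": {"schema": "marketing"}}
without_schema = {"resource_type": "model", "name": "some_model", "config": {}}
source_table = {"resource_type": "source", "source_name": "jaffle_shop", "name": "orders"}

print("/".join(default_asset_key(with_schema)))     # marketing/some_model
print("/".join(default_asset_key(without_schema)))  # some_model
print("/".join(default_asset_key(source_table)))    # jaffle_shop/orders
```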

Adding a prefix to asset keys#

A common pattern is to use the prefix of an asset key to indicate what database an asset is stored in. For example, you might want all of your assets stored in Snowflake to start with the prefix snowflake.

To add a prefix to the models generated by your dbt project, you can pass in a key_prefix argument to either the load_assets_from_dbt_manifest or load_assets_from_dbt_project function:

dbt_assets = load_assets_from_dbt_project(
    "path/to/dbt_project",
    key_prefix="snowflake",
)

Note: The key_prefix argument applies only to the assets generated from the nodes in your dbt project, not to the keys of the dbt sources they depend on. If you want to apply a prefix to the source keys that Dagster generates, pass in a source_key_prefix argument:

dbt_assets = load_assets_from_dbt_project(
    "path/to/dbt_project",
    source_key_prefix="snowflake",
)
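
The effect of the two prefix arguments on the resulting keys can be sketched as follows. The apply_prefixes helper is hypothetical and exists only to show which keys each argument touches; it is not part of dagster-dbt.

```python
def apply_prefixes(key, is_source, key_prefix=None, source_key_prefix=None):
    """Prepend the relevant prefix to an asset key path, if one was given."""
    prefix = source_key_prefix if is_source else key_prefix
    if prefix is None:
        return list(key)
    # Accept either a single string or a sequence of path components.
    prefix_parts = [prefix] if isinstance(prefix, str) else list(prefix)
    return prefix_parts + list(key)


# key_prefix changes model keys but leaves source keys alone.
print(apply_prefixes(["marketing", "some_model"], is_source=False, key_prefix="snowflake"))
# ['snowflake', 'marketing', 'some_model']
print(apply_prefixes(["jaffle_shop", "orders"], is_source=True, key_prefix="snowflake"))
# ['jaffle_shop', 'orders']

# source_key_prefix is what changes source keys.
print(apply_prefixes(["jaffle_shop", "orders"], is_source=True, source_key_prefix="snowflake"))
# ['snowflake', 'jaffle_shop', 'orders']
```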

dbt models, code versions, and staleness#

Dagster lets you optionally specify a code_version for each software-defined asset, which is used to track changes to the asset's code. For an asset derived from a dbt model, the code_version is set automatically to a hash of the SQL that defines the model. This allows the asset graph in the UI to indicate which dbt models have new SQL since they were last materialized.
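
To make the hashing idea concrete, here is a minimal sketch that derives a version string from a model's SQL. The sql_code_version helper and the choice of SHA-1 are assumptions for illustration; this page only states that the version is a hash of the SQL, so the exact algorithm dagster-dbt uses may differ.

```python
import hashlib


def sql_code_version(sql: str) -> str:
    """Return a hex digest that changes whenever the SQL text changes."""
    return hashlib.sha1(sql.encode("utf-8")).hexdigest()


original_sql = "select * from orders where foo = 1"
edited_sql = "select * from orders where foo = 2"

# Identical SQL yields the same version, so the asset is not flagged as changed.
print(sql_code_version(original_sql) == sql_code_version(original_sql))  # True
# Edited SQL yields a different version, which signals a code change.
print(sql_code_version(original_sql) == sql_code_version(edited_sql))  # False
```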


Defining dependencies#

Upstream dependencies#

Check out parts two and three of the dbt + Dagster tutorial to see this concept in context.

Dagster parses information about assets that are upstream of specific dbt models from the dbt project itself. Whenever a model is downstream of a dbt source, that source will be parsed as an upstream asset.

For example, if you defined a source in your sources.yml file like this:

sources:
  - name: jaffle_shop
    tables:
      - name: orders

and use it in a model:

select *
  from {{ source("jaffle_shop", "orders") }}
 where foo=1

Then the asset created for that model will be given an upstream asset key of jaffle_shop/orders. In many cases, this upstream asset might also be managed by Dagster.

If you add an asset definition to your repository which produces jaffle_shop/orders, then this asset will be upstream of your dbt model:

from dagster import asset


@asset(key_prefix="jaffle_shop")
def orders():
    return ...

Downstream dependencies#

Check out part four of the dbt + Dagster tutorial to see this concept in context.

Dagster allows you to define assets that are downstream of specific dbt models. One property of dbt-based assets is that the external tool (in this case, dbt) handles storing each model in the database internally, rather than Dagster directly storing the tables that are updated.

This means that there's a range of ways to load a dbt model as input to a Python function. For example, you might want to load the contents as a Pandas dataframe or into a PySpark session. You can specify this loading behavior on each downstream asset.

For example, if you wanted to consume a dbt model with the asset key my_dbt_model as a Pandas dataframe, that would look something like the following:

from dagster import AssetIn, asset


@asset(
    ins={"my_dbt_model": AssetIn(input_manager_key="pandas_df_manager")},
)
def my_downstream_asset(my_dbt_model):
    # my_dbt_model is a Pandas dataframe; keep only the rows where foo == "bar"
    return my_dbt_model[my_dbt_model["foo"] == "bar"]

Defining an I/O manager#

To materialize your dbt assets, you need to tell Dagster how to handle the assets' inputs and outputs. You can do this using an I/O manager.

The implementation of your I/O manager depends on:

  • The Python object you want to use to represent your table, and
  • The database that dbt writes tables to

A simple I/O manager implementation that loads data from a dbt-managed table into a Pandas dataframe would look something like the following:

import pandas as pd

from dagster import ConfigurableIOManager

class PandasIOManager(ConfigurableIOManager):
    connection_str: str

    def handle_output(self, context, obj):
        # dbt handles outputs for us
        pass

    def load_input(self, context) -> pd.DataFrame:
        """Load the contents of a table as a pandas DataFrame."""
        table_name = context.asset_key.path[-1]
        return pd.read_sql(f"SELECT * FROM {table_name}", con=self.connection_str)
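
The core of load_input is the mapping from an asset key to a table name. The sketch below reproduces just that step outside Dagster, with the standard-library sqlite3 module standing in for the real database and plain row tuples standing in for a dataframe. The load_table_for_asset helper and the table contents are hypothetical.

```python
import sqlite3


def load_table_for_asset(asset_key_path, connection):
    """Read every row of the table named by the last component of the asset key."""
    table_name = asset_key_path[-1]
    # Quote the identifier so unusual table names do not break the query.
    cursor = connection.execute(f'SELECT * FROM "{table_name}"')
    return cursor.fetchall()


# An in-memory database playing the role of the warehouse that dbt writes to.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE some_model (id INTEGER, foo TEXT)")
conn.executemany("INSERT INTO some_model VALUES (?, ?)", [(1, "bar"), (2, "baz")])

# The asset key marketing/some_model resolves to the table `some_model`.
rows = load_table_for_asset(["marketing", "some_model"], conn)
print(rows)
```

Because only the last key component is used, the schema part of the key (marketing here) is ignored, so the connection needs to resolve the table in the right schema by default.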

Once the I/O manager is defined, you can supply it like any other resource in your Definitions object:

from dagster_dbt import DbtCli, load_assets_from_dbt_project

from dagster import Definitions

defs = Definitions(
    assets=load_assets_from_dbt_project(...),
    resources={
        "dbt": DbtCli(project_dir="path/to/dbt_project"),
        "pandas_df_manager": PandasIOManager(connection_str=...),
    },
)