Using dbt with Dagster, part three: Create and materialize upstream assets#

This is part three of the Using dbt with Dagster software-defined assets tutorial.

By this point, you've set up a dbt project and loaded dbt models into Dagster as assets.

In this step, you'll:


Step 1: Create upstream Dagster assets#

To fetch the data the dbt models require, we'll write two Dagster assets: one for customers and one for orders.

In /tutorial_template/tutorial_dbt_dagster/assets/__init__.py:

  1. Replace the import section at the top of the file with the following:

    import pandas as pd
    from dagster_dbt import load_assets_from_dbt_project
    
    from dagster import asset, file_relative_path
    

    In this step, we've added the pandas library and Dagster's asset.

  2. Add the following code before load_assets_from_dbt_project:

    @asset(key_prefix=["jaffle_shop"])
    def customers_raw() -> pd.DataFrame:
        data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
        return data
    
    @asset(key_prefix=["jaffle_shop"])
    def orders_raw() -> pd.DataFrame:
        data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
        return data
    

Let's take a closer look at the argument we've provided:

  • key_prefix - When the assets are materialized, Dagster will store them in DuckDB in the schema defined by the last value in key_prefix. In this case, that's jaffle_shop. The tables will have the same names as the assets that produced them, which are customers_raw and orders_raw.

    Because these tables will become the source data for the stg_customers.sql and stg_orders.sql models in the dbt project, the names of the assets must match the table names specified in /tutorial_template/jaffle_shop/models/sources.yml, which you configured in part one of this tutorial.

At this point, the /tutorial_template/tutorial_dbt_dagster/assets/__init__.py file should look like this:

import pandas as pd
from dagster_dbt import load_assets_from_dbt_project

from dagster import asset, file_relative_path


@asset(key_prefix=["jaffle_shop"])
def customers_raw() -> pd.DataFrame:
   data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
   return data


@asset(key_prefix=["jaffle_shop"])
def orders_raw() -> pd.DataFrame:
   data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
   return data


DBT_PROJECT_PATH = file_relative_path(__file__, "../../jaffle_shop")
DBT_PROFILES = file_relative_path(__file__, "../../jaffle_shop/config")

dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES, key_prefix=["jaffle_shop"]
)

Step 2: Supply an I/O manager to the assets#

To materialize the assets, we need to tell Dagster how to handle the assets' inputs and outputs. We can do this using an I/O manager.

In this step, we'll supply the duckdb_io_manager to our assets. This resource is an I/O manager that, when assets are materialized, allows:

  • Upstream assets (customers_raw, orders_raw) to load data into DuckDB. In this example, the duckdb_io_manager uses DuckDBPandasTypeHandler to store the pandas DataFrames used in our assets as CSVs and load them into DuckDB.
  • Downstream assets to read data from DuckDB. We'll add the downstream asset in the next section.

In /tutorial_template/tutorial_dbt_dagster/__init__.py, replace its contents with the following:

import os

from dagster_dbt import DbtCli
from tutorial_dbt_dagster import assets
from tutorial_dbt_dagster.assets import DBT_PROFILES, DBT_PROJECT_PATH

from dagster_duckdb_pandas import duckdb_pandas_io_manager

from dagster import Definitions, load_assets_from_modules


resources = {
    "dbt": DbtCli(
        project_dir=DBT_PROJECT_PATH,
        profiles_dir=DBT_PROFILES,
    ),
    "io_manager": duckdb_pandas_io_manager.configured(
        {"database": os.path.join(DBT_PROJECT_PATH, "tutorial.duckdb")}
    ),
}

defs = Definitions(assets=load_assets_from_modules([assets]), resources=resources)

Step 3: Materialize the assets using the Dagster UI#

Now that you've created assets and resources, it's time to materialize the assets! Materializing an asset runs the op it contains and saves the results to persistent storage. In this tutorial, we're saving asset outputs to DuckDB.

  1. In the UI, click the Reload definitions button. This ensures that the UI picks up the changes you made in the previous steps.

    At this point, the customers_raw and orders_raw assets should display above stg_customers and stg_orders as upstream dependencies:

    Asset graph in the Dagster UI, showing dbt models and unmaterialized assets
  2. Click the Materialize all button near the top right corner of the page, which will launch a run to materialize the assets. When finished, the Materialized and Latest Run attributes in the asset will be populated:

    Asset graph in the Dagster UI, showing materialized assets

After the run completes, you can:

  • Click the asset to open a sidebar containing info about the asset, including its last materialization stats and a link to view the Asset details page
  • Click the ID of the Latest Run - in the above image, that's 651489a2 - in an asset to view the Run details page. This page contains detailed info about the run, including timing information, errors, and logs.

What's next?#

At this point, you've built and materialized two upstream Dagster assets, providing source data to your dbt models. In the last section of the tutorial, we'll show you how to add a downstream asset to the pipeline.