@asset(key_prefix=["jaffle_shop"])
def customers_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
    return data


@asset(key_prefix=["jaffle_shop"])
def orders_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
    return data
Let's take a closer look at the argument we've provided:
key_prefix - When the assets are materialized, Dagster will store them in DuckDB in the schema defined by the last value in key_prefix. In this case, that's jaffle_shop. The tables will have the same names as the assets that produced them, which are customers_raw and orders_raw.
Because these tables will become the source data for the stg_customers.sql and stg_orders.sql models in the dbt project, the names of the assets must match the table names specified in /tutorial_template/jaffle_shop/models/sources.yml, which you configured in part one of this tutorial.
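The naming rule above can be sketched in plain Python. The snippet below is a minimal illustration, not Dagster or dbt code: the table_location helper is hypothetical, and the sources dictionary is an assumed stand-in for the parsed contents of sources.yml, which may differ in your project. It derives the schema and table for each asset from its key prefix and name, then checks that every table is declared under the matching source.

```python
# Hypothetical helper: mirrors the rule described above, where the schema is
# the last value in key_prefix and the table is named after the asset.
def table_location(key_prefix, asset_name):
    return key_prefix[-1], asset_name


# Assumed stand-in for the parsed sources.yml (source name -> table names).
sources = {"jaffle_shop": {"customers_raw", "orders_raw"}}

# The two assets defined in this tutorial, as (key_prefix, asset name) pairs.
assets = [
    (["jaffle_shop"], "customers_raw"),
    (["jaffle_shop"], "orders_raw"),
]

locations = [table_location(prefix, name) for prefix, name in assets]

# Collect any asset whose table is not declared as a dbt source.
missing = [
    (schema, table)
    for schema, table in locations
    if table not in sources.get(schema, set())
]

for schema, table in locations:
    print(f"{schema}.{table}")
print("missing from sources:", missing)
```

If an asset were renamed without updating sources.yml, it would show up in the missing list, which is the mismatch this step of the tutorial warns about.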
At this point, the /tutorial_template/tutorial_dbt_dagster/assets/__init__.py file should look like this:
import pandas as pd
from dagster_dbt import load_assets_from_dbt_project
from dagster import asset, file_relative_path


@asset(key_prefix=["jaffle_shop"])
def customers_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
    return data


@asset(key_prefix=["jaffle_shop"])
def orders_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
    return data


DBT_PROJECT_PATH = file_relative_path(__file__, "../../jaffle_shop")
DBT_PROFILES = file_relative_path(__file__, "../../jaffle_shop/config")

dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES, key_prefix=["jaffle_shop"]
)
To materialize the assets, we need to tell Dagster how to handle the assets' inputs and outputs. We can do this using an I/O manager.
In this step, we'll supply the duckdb_pandas_io_manager to our assets. This resource is an I/O manager that, when assets are materialized, allows:
Upstream assets (customers_raw, orders_raw) to load data into DuckDB. In this example, the duckdb_pandas_io_manager uses DuckDBPandasTypeHandler to store the pandas DataFrames used in our assets as CSVs and load them into DuckDB.
Downstream assets to read data from DuckDB. We'll add the downstream asset in the next section.
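To make the I/O manager's role concrete, here is a toy sketch in plain Python. It is not Dagster's API or the DuckDB I/O manager: ToyIOManager and its dictionary-backed storage are hypothetical, the method signatures are simplified, and lists of made-up rows stand in for pandas DataFrames. It only illustrates the contract described above, where an upstream asset's return value is stored under a schema and table, and a downstream asset later reads it back.

```python
class ToyIOManager:
    """Hypothetical stand-in for an I/O manager; stores tables in a dict."""

    def __init__(self):
        # Maps (schema, table) -> stored rows; plays the role of the database.
        self._tables = {}

    def handle_output(self, key_path, obj):
        # Called after an upstream asset runs: persist its return value.
        schema, table = key_path[-2], key_path[-1]
        self._tables[(schema, table)] = list(obj)

    def load_input(self, key_path):
        # Called before a downstream asset runs: fetch the stored value.
        schema, table = key_path[-2], key_path[-1]
        return self._tables[(schema, table)]


def customers_raw():
    # Stand-in for the real asset, which reads a CSV over HTTP.
    # These rows are made up for illustration.
    return [{"id": 1, "first_name": "Ada"}, {"id": 2, "first_name": "Lin"}]


def customer_count(customers):
    # A made-up downstream computation that receives the loaded input.
    return len(customers)


io_manager = ToyIOManager()
io_manager.handle_output(["jaffle_shop", "customers_raw"], customers_raw())
loaded = io_manager.load_input(["jaffle_shop", "customers_raw"])
print(customer_count(loaded))
```

The asset functions never touch storage themselves. Keeping reads and writes in one place is what lets the same asset code target a different database by swapping the I/O manager.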
In /tutorial_template/tutorial_dbt_dagster/__init__.py, replace its contents with the following:
import os
from dagster_dbt import DbtCli
from tutorial_dbt_dagster import assets
from tutorial_dbt_dagster.assets import DBT_PROFILES, DBT_PROJECT_PATH
from dagster_duckdb_pandas import duckdb_pandas_io_manager
from dagster import Definitions, load_assets_from_modules

resources = {
    "dbt": DbtCli(
        project_dir=DBT_PROJECT_PATH,
        profiles_dir=DBT_PROFILES,
    ),
    "io_manager": duckdb_pandas_io_manager.configured(
        {"database": os.path.join(DBT_PROJECT_PATH, "tutorial.duckdb")}
    ),
}

defs = Definitions(assets=load_assets_from_modules([assets]), resources=resources)
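
The database path in the configuration above is derived from the location of assets/__init__.py. As a rough illustration, the sketch below re-creates that resolution with the standard library. The relative_to_file function is a hypothetical stand-in for file_relative_path, which joins a relative path onto the directory containing the given file. The sketch additionally normalizes the result for readability, and it assumes the project lives at /tutorial_template, as in the paths used in this tutorial.

```python
import posixpath


def relative_to_file(dunder_file, relative_path):
    # Hypothetical stand-in for dagster's file_relative_path, with the
    # result normalized so the ".." segments are collapsed.
    return posixpath.normpath(
        posixpath.join(posixpath.dirname(dunder_file), relative_path)
    )


# Assumed location of the assets module, taken from the paths in this tutorial.
assets_file = "/tutorial_template/tutorial_dbt_dagster/assets/__init__.py"

dbt_project_path = relative_to_file(assets_file, "../../jaffle_shop")
dbt_profiles = relative_to_file(assets_file, "../../jaffle_shop/config")
database_path = posixpath.join(dbt_project_path, "tutorial.duckdb")

print(dbt_project_path)
print(dbt_profiles)
print(database_path)
```

Under these assumptions, the DuckDB file ends up inside the dbt project directory, next to the models that will read from it.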
Step 3: Materialize the assets using the Dagster UI
Now that you've created assets and resources, it's time to materialize the assets! Materializing an asset runs the op it contains and saves the results to persistent storage. In this tutorial, we're saving asset outputs to DuckDB.
In the UI, click the Reload definitions button. This ensures that the UI picks up the changes you made in the previous steps.
At this point, the customers_raw and orders_raw assets should display above stg_customers and stg_orders as upstream dependencies.
Click the Materialize all button near the top right corner of the page, which will launch a run to materialize the assets. When finished, the Materialized and Latest Run attributes in the asset will be populated.
After the run completes, you can:
Click the asset to open a sidebar containing info about the asset, including its last materialization stats and a link to view the Asset details page.
Click the ID of the Latest Run (for example, 651489a2) in an asset to view the Run details page. This page contains detailed info about the run, including timing information, errors, and logs.
At this point, you've built and materialized two upstream Dagster assets, providing source data to your dbt models. In the last section of the tutorial, we'll show you how to add a downstream asset to the pipeline.