Generates a set of software-defined assets that reflect the models in the project. These assets will share the same underlying operation, which will invoke dbt to run the models represented by the loaded assets.
Loading models using load_assets_from_dbt_manifest
For larger dbt projects, the overhead of recompiling the entire project may be a concern. In these cases, you can load dbt models from an existing dbt manifest.json file using the load_assets_from_dbt_manifest function.
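A minimal sketch of loading from a manifest, assuming dbt has already written the file (the path in the usage comment is a placeholder). The helper names read_manifest and build_dbt_assets are hypothetical, and dagster_dbt is imported inside the function only so that the JSON helper stays usable where dagster-dbt is not installed.

```python
import json
from pathlib import Path


def read_manifest(manifest_path):
    """Parse an existing dbt manifest.json file into a dictionary."""
    with Path(manifest_path).open() as f:
        return json.load(f)


def build_dbt_assets(manifest_path):
    """Create Dagster assets from a manifest without recompiling the project."""
    # Imported lazily: this line requires the dagster-dbt package.
    from dagster_dbt import load_assets_from_dbt_manifest

    return load_assets_from_dbt_manifest(read_manifest(manifest_path))


# Hypothetical usage, with the path as a placeholder:
# dbt_assets = build_dbt_assets("path/to/dbt_project/target/manifest.json")
```

Because the manifest is read from disk as-is, the assets reflect the project as it was when the file was last generated.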
If you make any changes to your dbt project that change the structure of the project (such as changing the dependencies of a model or adding a new one), you'll need to regenerate your manifest file for those changes to be reflected in Dagster.
Assets loaded from dbt require a dbt resource, which is responsible for firing off dbt CLI commands. The dagster-dbt integration provides the DbtCli for this purpose. This resource can be configured with CLI flags that are passed into every dbt invocation.
The most important flag to set is the project_dir flag, which points Dagster at the directory of your dbt project. You can also use the target flag to point at an explicit dbt target. For a full list of configuration options, refer to the DbtCli API docs.
You can configure this resource and add it to your dbt assets by doing the following:
import os
from dagster_dbt import DbtCli, load_assets_from_dbt_project
from dagster import Definitions
DBT_PROJECT_PATH = "path/to/dbt_project"
DBT_TARGET = "hive" if os.getenv("EXECUTION_ENV") == "prod" else "duckdb"

defs = Definitions(
    assets=load_assets_from_dbt_project(DBT_PROJECT_PATH),
    resources={"dbt": DbtCli(project_dir=DBT_PROJECT_PATH, target=DBT_TARGET)},
)
In Dagster, each asset has an asset key to identify it. Dagster automatically generates these keys for each dbt node in the project as well as the sources for each node.
For models, seeds, and snapshots, the default asset key will be the configured schema for that node (if any), concatenated with the name of the node.
For example, if you have configured a custom schema for a subdirectory in your dbt_project.yml file:
models:
  my_project:
    marketing:
      +schema: marketing
Then the asset key for a model named some_model will be marketing/some_model. If you haven't configured a custom schema, then the asset key will be some_model.
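The naming rule above can be sketched in plain Python. The function default_asset_key_path is hypothetical and only mirrors the rule as described here; it is not the dagster-dbt implementation.

```python
from typing import List, Optional


def default_asset_key_path(node_name: str, custom_schema: Optional[str] = None) -> List[str]:
    """Return the asset key components for a dbt model, seed, or snapshot."""
    # The configured schema (if any) comes first, followed by the node name.
    if custom_schema:
        return [custom_schema, node_name]
    return [node_name]


print("/".join(default_asset_key_path("some_model", "marketing")))  # marketing/some_model
print("/".join(default_asset_key_path("some_model")))  # some_model
```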
A common pattern is to use the prefix of an asset key to indicate what database an asset is stored in. For example, you might want all of your assets stored in Snowflake to start with the prefix snowflake. To do this, pass a key_prefix argument when loading your dbt assets.
Note: The key_prefix argument only applies to the assets loaded from dbt by that call. If you want to apply a prefix to the source keys that Dagster generates, pass in a source_key_prefix argument.
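A sketch of how both prefixes might be passed, assuming the assets are loaded with load_assets_from_dbt_project and that a prefix is placed in front of the default key. The helpers with_prefix and load_prefixed_dbt_assets are hypothetical, and dagster_dbt is imported inside the function so that the key illustration runs without it.

```python
from typing import List, Sequence


def with_prefix(prefix: Sequence[str], key_path: Sequence[str]) -> List[str]:
    """Illustrate the resulting key: the prefix is placed before the default key."""
    return [*prefix, *key_path]


def load_prefixed_dbt_assets(project_dir: str, prefix: str = "snowflake"):
    """Load dbt assets with the same prefix on asset keys and source keys."""
    # Imported lazily: this line requires the dagster-dbt package.
    from dagster_dbt import load_assets_from_dbt_project

    return load_assets_from_dbt_project(
        project_dir,
        key_prefix=[prefix],  # prefix for the assets loaded from dbt
        source_key_prefix=[prefix],  # prefix for the generated source keys
    )


print("/".join(with_prefix(["snowflake"], ["marketing", "some_model"])))
# snowflake/marketing/some_model
```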
Dagster lets you optionally specify a code_version for each software-defined asset, which is used to track changes. For an asset that comes from a dbt model, the code_version is set automatically to a hash of the SQL that defines the model. This allows the asset graph in the UI to indicate which dbt models have new SQL since they were last materialized.
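To illustrate why hashing the SQL detects changes, here is a small sketch. The choice of SHA-1 and the function name sql_code_version are assumptions made for illustration; the text above only states that the code_version is a hash of the model's SQL.

```python
import hashlib


def sql_code_version(raw_sql: str) -> str:
    """Derive a version string from the SQL text of a model."""
    return hashlib.sha1(raw_sql.encode("utf-8")).hexdigest()


original = sql_code_version("select * from orders where foo = 1")
unchanged = sql_code_version("select * from orders where foo = 1")
edited = sql_code_version("select * from orders where foo = 2")

print(original == unchanged)  # True: identical SQL yields the same version
print(original == edited)  # False: edited SQL yields a different version
```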
Dagster parses information about assets that are upstream of specific dbt models from the dbt project itself. Whenever a model is downstream of a dbt source, that source will be parsed as an upstream asset.
For example, if you defined a source in your sources.yml file like this:
sources:
  - name: jaffle_shop
    tables:
      - name: orders
and use it in a model:
select *
from {{ source("jaffle_shop", "orders") }}
where foo = 1
Then the asset created for that model will be given an upstream asset key of jaffle_shop/orders. In many cases, this upstream asset might also be managed by Dagster.
If you add an asset definition to your repository that produces jaffle_shop/orders, then this asset will be upstream of your dbt model.
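A sketch of such an upstream asset, assuming its key is formed from a key_prefix of jaffle_shop plus the function name orders. The builder function build_orders_asset and the placeholder body are hypothetical; dagster is imported inside the builder only so the snippet can be loaded without it.

```python
def build_orders_asset():
    """Define an asset whose key matches the dbt source jaffle_shop/orders."""
    # Imported lazily: this line requires the dagster package.
    from dagster import asset

    @asset(key_prefix=["jaffle_shop"])
    def orders():
        # Placeholder body: real code would write the orders table
        # that the dbt source points at.
        return None

    return orders
```

In a real project you would normally define orders at module level with a top-level import and include it in your asset list alongside the dbt assets.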
Dagster allows you to define assets that are downstream of specific dbt models. One property of dbt-based assets is that the external tool - in this case, dbt - handles storing each model in the database internally, rather than Dagster directly storing the tables that are updated.
This means that there's a range of ways to load a dbt model as input to a Python function. For example, you might want to load the contents as a Pandas dataframe or into a PySpark session. You can specify this loading behavior on each downstream asset.
For example, if you wanted to consume a dbt model with the asset key my_dbt_model as a Pandas dataframe, that would look something like the following:
from dagster import AssetIn, asset

@asset(
    ins={"my_dbt_model": AssetIn(input_manager_key="pandas_df_manager")},
)
def my_downstream_asset(my_dbt_model):
    # my_dbt_model is a Pandas dataframe; keep only the rows where foo equals "bar"
    return my_dbt_model[my_dbt_model["foo"] == "bar"]
To materialize your dbt assets, you need to tell Dagster how to handle the assets' inputs and outputs. You can do this using an I/O manager.
The implementation of your I/O manager depends on:
The Python object you want to use to represent your table, and
The database that dbt writes tables to
A simple I/O manager implementation that loads data from a dbt-managed table into a Pandas dataframe would look something like the following:
import pandas as pd
from dagster import ConfigurableIOManager
class PandasIOManager(ConfigurableIOManager):
    connection_str: str

    def handle_output(self, context, obj):
        # dbt handles outputs for us
        pass

    def load_input(self, context) -> pd.DataFrame:
        """Load the contents of a table as a pandas DataFrame."""
        table_name = context.asset_key.path[-1]
        return pd.read_sql(f"SELECT * FROM {table_name}", con=self.connection_str)
Once the I/O manager is defined, you can supply it like any other resource when constructing your Definitions:
from dagster_dbt import DbtCli, load_assets_from_dbt_project
from dagster import Definitions
defs = Definitions(
    assets=load_assets_from_dbt_project(...),
    resources={
        "dbt": DbtCli(project_dir="path/to/dbt_project"),
        "pandas_df_manager": PandasIOManager(connection_str=...),
    },
)