Prefer videos? Check out our explainer and demo videos to get a quick look at Software-defined assets.
An asset is an object in persistent storage, such as a table, file, or persisted machine learning model. A software-defined asset is a Dagster object that couples an asset to the function and upstream assets that are used to produce its contents.
Software-defined assets enable a declarative approach to data management, in which code is the source of truth on what data assets should exist and how those assets are computed.
A software-defined asset includes the following:
An AssetKey, which is a handle for referring to the asset.
A set of upstream asset keys, which refer to assets that the contents of the software-defined asset are derived from.
An op, which is a function responsible for computing the contents of the asset from its upstream dependencies.
Note: A crucial distinction between software-defined assets and ops is that software-defined assets know about their dependencies, while ops do not. Ops aren't connected to dependencies until they're placed inside a graph.
Materializing an asset is the act of running its op and saving the results to persistent storage. You can initiate materializations from the Dagster UI or by invoking Python APIs. By default, assets are materialized to pickle files on your local filesystem, but materialization behavior is fully customizable using I/O managers. It's possible to materialize an asset in multiple storage environments, such as production and staging.
SourceAsset: A class that describes an asset, but doesn't define how to compute it. SourceAssets are used to represent assets that other assets or jobs depend on, in settings where they can't be materialized themselves.
The easiest way to create a software-defined asset is with the @asset decorator.
from dagster import asset

@asset
def my_asset():
    return [1, 2, 3]
By default, the name of the decorated function, my_asset, is used as the asset key. The decorated function forms the asset's op: it's responsible for producing the asset's contents. The asset in this example doesn't depend on any other assets.
You can define a dependency between two assets by passing the upstream asset to the deps parameter in the downstream asset's @asset decorator.
In this example, the asset sugary_cereals creates a new table (sugary_cereals) by selecting records from the cereals table. Then the asset shopping_list creates a new table (shopping_list) by selecting records from sugary_cereals:
from dagster import asset

@asset
def sugary_cereals() -> None:
    execute_query("CREATE TABLE sugary_cereals AS SELECT * FROM cereals")

@asset(deps=[sugary_cereals])
def shopping_list() -> None:
    execute_query("CREATE TABLE shopping_list AS SELECT * FROM sugary_cereals")
When using basic dependencies, as above, it's expected that if you need direct access to the contents of the asset, the code you include inside your @asset-decorated function will load the data from the upstream asset. Dagster alternatively allows you to delegate loading data to an I/O manager. To do this, you express the dependency by using the upstream asset name as the name of one of the arguments on the decorated function.
In the following example, downstream_asset depends on upstream_asset. That means that the contents of upstream_asset are provided to the function that computes the contents of downstream_asset.
If defining dependencies by matching argument names to upstream asset names feels too magical for your tastes, you can also define dependencies in a more explicit way:
from dagster import AssetIn, asset

@asset
def upstream_asset():
    return [1, 2, 3]

@asset(ins={"upstream": AssetIn("upstream_asset")})
def downstream_asset(upstream):
    return upstream + [4]
In this case, ins={"upstream": AssetIn("upstream_asset")} declares that the contents of the asset with the key upstream_asset will be provided to the function argument named upstream.
Asset keys can also be provided to AssetIn to explicitly identify the asset. For example:
from dagster import AssetIn, asset

# If the upstream key has a single segment, you can specify it with a string:
@asset(ins={"upstream": AssetIn(key="upstream_asset")})
def downstream_asset(upstream):
    return upstream + [4]

# If it has multiple segments, you can provide a list:
@asset(ins={"upstream": AssetIn(key=["some_db_schema", "upstream_asset"])})
def another_downstream_asset(upstream):
    return upstream + [10]
Software-defined assets frequently depend on assets that are generated elsewhere. Using SourceAsset, you can include these external assets and allow your other assets to depend on them.
For example:
from dagster import AssetKey, SourceAsset, asset

my_source_asset = SourceAsset(key=AssetKey("a_source_asset"))

@asset(deps=[my_source_asset])
def my_derived_asset():
    return execute_query("SELECT * from a_source_asset").as_list() + [4]
You can also define a dependency on a SourceAsset that will load the data of the asset:
Note: The source asset's asset key must be provided as the argument to downstream assets. In the previous example, the asset key is a_source_asset and not my_source_asset.
You can also re-use assets across code locations by including them as source assets. Consider this example for code_location_1:
Using source assets has a few advantages over having the code inside of an asset's op load the data:
The UI can show asset lineage that includes the source assets. If different asset definitions in different code locations have the same asset key as a SourceAsset and both code locations are loaded into the underlying webserver, the UI can represent the asset lineage across the code locations. This can be accomplished using workspace files.
Dagster can use data-loading code factored into an IOManager to load the contents of the source asset.
Asset dependencies can be written in a consistent way, independent of whether they're downstream from a source asset or a derived asset. This makes it easy to swap out a source asset for a derived asset and vice versa.
If you'd like to define more complex assets, Dagster offers augmented software-defined asset abstractions:
Multi-assets: A set of software-defined assets that are all updated by the same op or graph.
Graph-backed assets: An asset whose computations are separated into multiple ops that are combined to build a graph. If the graph outputs multiple assets, the graph-backed asset is a multi-asset.
Like ops, assets in Dagster can specify a config schema. The configuration system is explained in detail in the Config schema documentation.
Asset functions can specify an annotated config parameter for the asset's configuration. The config class, which subclasses Config (itself a subclass of pydantic.BaseModel), specifies the configuration schema for the asset.
For example, the following downstream asset queries an API endpoint defined through configuration:
import requests
from dagster import Config, asset

@asset
def my_upstream_asset() -> int:
    return 5

class MyDownstreamAssetConfig(Config):
    api_endpoint: str

@asset
def my_downstream_asset(config: MyDownstreamAssetConfig, my_upstream_asset: int) -> int:
    data = requests.get(f"{config.api_endpoint}/data").json()
    return data["value"] + my_upstream_asset
When writing an asset, users can optionally provide a first parameter, context. When this parameter is supplied, Dagster will supply a context object to the body of the asset which provides access to system information like loggers and the current run id. Since a software-defined asset contains an op, this is an OpExecutionContext.
For example, to access the logger and log an info message:
from dagster import asset

@asset
def context_asset(context):
    # The f prefix is required for the run ID to be interpolated.
    context.log.info(f"My run ID is {context.run_id}")
    ...
In some cases, an asset may not need to be updated in storage each time the decorated function is executed. You can use the output_required parameter along with yield syntax to implement this behavior.
If the output_required parameter is set to False, and your function does not yield an Output object, then no asset materialization event will be created, the I/O manager will not be invoked, downstream assets will not be materialized, and asset sensors monitoring the asset will not trigger.
import random

from dagster import Output, asset

@asset(output_required=False)
def may_not_materialize():
    # to simulate an asset that may not always materialize.
    if random.randint(1, 10) < 5:
        yield Output([1, 2, 3, 4])

@asset
def downstream(may_not_materialize):
    # will not run when may_not_materialize doesn't materialize the asset
    return may_not_materialize + [5]
Assets may be assigned a code_version. Code versions help Dagster track which assets haven't been re-materialized since their code changed, and help it avoid performing redundant computation.
When an asset with a code version is materialized, the generated AssetMaterialization is tagged with the version. The UI will indicate when an asset has a different code version than the code version used for its most recent materialization.
Multi-assets may assign different code versions for each of their outputs:
Just as with regular assets, these versions are attached to the AssetMaterialization objects for each of the constituent assets and represented in the UI.
To view and materialize assets in the UI, you can point the underlying webserver at a module that contains asset definitions or lists of asset definitions as module-level attributes:
dagster dev -m module_with_assets
If you want the UI to show both assets and jobs that target the assets, you can place the assets and jobs together inside a Definitions object. For example:
A Definitions object defines a code location, which is a collection of assets, jobs, resources, and schedules. Refer to the Code locations documentation for more info.
In the UI, you can launch runs that materialize assets by:
Navigating to the Asset details page for the asset and clicking the Materialize button in the upper right corner.
Navigating to the graph view of the Asset catalog page and clicking the Materialize button in the upper right corner. You can also click on individual assets to collect a subset to materialize.
Jobs that target assets can materialize a fixed selection of assets each time they run and be placed on schedules and sensors. Refer to the Jobs documentation for more info and examples.
To help keep your assets tidy, you can organize them into groups. Grouping assets by project, concept, and so on simplifies keeping track of them in the UI. Each asset is assigned to a single group, which by default is called "default".
The recommended approach is to construct a group of assets from a specified module in your project. Using the load_assets_from_package_module function, you can import all assets in a module and apply a grouping:
from dagster import load_assets_from_package_module

from my_package import cereal

cereal_assets = load_assets_from_package_module(
    cereal,
    group_name="cereal_assets",
)
If any of the assets in the module already has a group_name explicitly set on it, you'll encounter a Group name already exists on assets error.
To view your asset groups in the UI, open the left navigation by clicking the menu icon in the top left corner. As asset groups are grouped in code locations, you may need to open a code location to view its asset groups.
Click the asset group to open a dependency graph for all assets in the group. For example, in the following image, the dependency graph for the activity_analytics asset group is currently displayed:
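If an asset has upstream dependencies, you can manually provide values for those dependencies in your unit test. This allows you to test assets in isolation from one another. The following test invokes an asset named more_complex_asset, whose definition is not shown here: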
def test_more_complex_asset():
    result = more_complex_asset([0])
    assert result == [0, 4, 5, 6]
If you use config or resources in your asset, they will be provided automatically during execution. When writing unit tests, you may provide them directly when invoking the asset function:
from typing import Any, Dict

import requests
from dagster import Config, ConfigurableResource, asset

class MyConfig(Config):
    api_url: str

class MyAPIResource(ConfigurableResource):
    def query(self, url) -> Dict[str, Any]:
        return requests.get(url).json()

@asset
def uses_config_and_resource(config: MyConfig, my_api: MyAPIResource):
    return my_api.query(config.api_url)

def test_uses_resource() -> None:
    result = uses_config_and_resource(
        config=MyConfig(api_url="https://dagster.io"), my_api=MyAPIResource()
    )
    assert result == {"foo": "bar"}
If you use a context object in your function, you can use build_op_context to generate the context object, because under the hood the function decorated by @asset is an op.
Consider the following asset that uses a context object:
It's sometimes useful to load an asset as a Python object outside of a Dagster run, such as performing exploratory analysis on data inside a Jupyter notebook.
Assets are often objects in systems with hierarchical namespaces, like filesystems. Because of this, it often makes sense for an asset key to be a list of strings, instead of just a single string. To define an asset with a multi-part asset key, use the key_prefix argument. This can be either a list of strings or a single string with segments delimited by "/". The full asset key is formed by prepending the key_prefix to the asset name (which defaults to the name of the decorated function).
from dagster import AssetIn, asset

@asset(key_prefix=["one", "two", "three"])
def upstream_asset():
    return [1, 2, 3]

@asset(ins={"upstream_asset": AssetIn(key_prefix="one/two/three")})
def downstream_asset(upstream_asset):
    return upstream_asset + [4]
Dagster supports attaching arbitrary metadata to asset materializations. This metadata will be displayed on the "Activity" tab of the "Asset Details" page in the UI. If it's numeric, it will be plotted. To attach metadata, your asset's op can return an Output object that contains the output value and a dictionary of metadata:
from pandas import DataFrame

from dagster import Output, asset

@asset
def table1() -> Output[DataFrame]:
    df = DataFrame({"col1": [1, 2], "col2": [3, 4]})
    return Output(df, metadata={"num_rows": df.shape[0]})
This works even if you're not returning an object from your decorated function:
from dagster import Output, asset

@asset
def table1() -> Output[None]:
    ...  # write out some data to table1
    return Output(None, metadata={"num_rows": 25})
Recording materialization metadata using I/O managers
Sometimes it's useful to record the same metadata for all assets that are stored in the same way. For example, if you have a set of assets that are all stored on a filesystem, you might want to record the number of bytes they occupy on disk every time one is materialized. You can achieve this by recording metadata from an I/O manager that's shared by the assets.
Dagster supports attaching arbitrary metadata to asset definitions. This metadata will be displayed on the "Definition" tab of the "Asset Details" page in the UI. This is useful for metadata that describes how the asset should be handled, rather than metadata that describes the contents that were produced by a particular run.
To attach metadata, supply a metadata dictionary to the asset:
Interested in learning more about software-defined assets and working through a more complex example? Check out our guide on software-defined assets and our example project that integrates software-defined assets with other Modern Data Stack tools.