Auto-Materializing Assets
Experimental

You can set up Dagster to automatically materialize assets when criteria are met. This enables a declarative approach to asset scheduling – instead of defining imperative workflows to materialize your assets, you just describe the conditions under which they should be materialized.

At a high level, there are two factors that can be used to determine when an asset is auto-materialized:

  1. Whether upstream data has changed
  2. Any FreshnessPolicys set on the asset or downstream assets

Assets can be auto-materialized "eagerly", i.e. immediately after upstream changes occur, or "lazily", i.e. only when downstream FreshnessPolicys dictate that they need to be fresh. The two approaches can also be mixed within the same asset graph.

Turning on auto-materializing

To enable assets to be automatically materialized, you need to first flip a toggle in the Dagster UI.

  • If you're using an open source Dagster deployment, you can get to this toggle by clicking "Deployment" in the top navigation pane and then clicking on the "Daemons" tab.
  • If you're using Dagster Cloud, you can get to this toggle by clicking "Deployment" in the top navigation pane, then clicking on the "Agents" tab, then looking under "Cloud service statuses".

First time using auto-materialize? Have a large number of partitioned assets? We recommend starting small: add auto-materialize policies only to a subset of your assets and then expand incrementally. Auto-materializing has a startup cost that’s roughly proportional to the number of materialized partitions of assets that are ancestors of assets with auto-materialize policies. Turning on auto-materializing with large numbers of partitioned assets at once can overload the database and make the database unavailable to other Dagster components that rely on it.

Auto-materialize policies

You can set up an asset to be auto-materialized by assigning it an AutoMaterializePolicy. In this example, we use AutoMaterializePolicy.eager to indicate that, any time that asset1 is materialized, asset2 should be automatically materialized right after:

from dagster import AutoMaterializePolicy, asset


@asset
def asset1():
    ...


@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def asset2(asset1):
    ...

This example assumes that asset1 will be materialized in some other way - e.g. manually, via a sensor, or via a schedule.

Adding an auto-materialize policy to multiple assets at once

If you want to apply the same AutoMaterializePolicy to a set of assets, you can pass the auto_materialize_policy argument when loading them with functions like load_assets_from_current_module and load_assets_from_package_module.

from dagster import (
    AutoMaterializePolicy,
    Definitions,
    asset,
    load_assets_from_current_module,
)


@asset
def asset1():
    ...


@asset
def asset2(asset1):
    ...


defs = Definitions(
    assets=load_assets_from_current_module(
        auto_materialize_policy=AutoMaterializePolicy.eager(),
    )
)

Auto-materialize policies and freshness policies

Instead of auto-materializing downstream assets immediately after new upstream data arrives, you can use AutoMaterializePolicy.lazy to materialize assets only when needed to meet an asset's FreshnessPolicy. This avoids unnecessary materializations.

In this example, even if asset1 is materialized every hour, asset2 will only be materialized roughly once per day:

from dagster import AutoMaterializePolicy, FreshnessPolicy, asset


@asset
def asset1():
    ...


@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=24 * 60),
)
def asset2(asset1):
    ...

Freshness policies express how fresh an asset needs to be relative to data at the root of the graph. This means that, for an asset to meet its freshness policy, both it and its upstream assets need to be materialized in time.
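To make the lag requirement concrete, here is a simplified model in plain Python. It is an illustrative sketch, not Dagster's implementation: it assumes each materialization records the timestamp of the root data it incorporates, and the helper name meets_freshness_policy and its parameters are hypothetical.

```python
from datetime import datetime, timedelta


def meets_freshness_policy(
    root_data_time: datetime,
    now: datetime,
    maximum_lag_minutes: float,
) -> bool:
    """Return True if the asset reflects root data no older than the allowed lag.

    Hypothetical helper: `root_data_time` is the time of the root data that the
    asset's latest materialization incorporates.
    """
    return now - root_data_time <= timedelta(minutes=maximum_lag_minutes)


now = datetime(2023, 1, 2, 12, 0)
# Root data from 20 hours ago is within a 24-hour lag.
fresh = meets_freshness_policy(datetime(2023, 1, 1, 16, 0), now, 24 * 60)
# Root data from 30 hours ago is not.
stale = meets_freshness_policy(datetime(2023, 1, 1, 6, 0), now, 24 * 60)
```

In this model, re-materializing only the downstream asset leaves root_data_time unchanged, which is why upstream assets also need to be materialized in time.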

Setting a lazy auto-materialize policy on an asset allows it to be auto-materialized to help downstream assets meet their freshness policies. In this example, both asset2 and asset3 will be auto-materialized up to once per day, to help asset3 meet its freshness policy. Conversely, if asset2 did not have an auto-materialize policy, then asset3 would never become fresh unless asset2 were materialized in some other way.

from dagster import AutoMaterializePolicy, FreshnessPolicy, asset


@asset
def asset1():
    ...


@asset(auto_materialize_policy=AutoMaterializePolicy.lazy())
def asset2(asset1):
    ...


@asset(
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=24 * 60),
)
def asset3(asset2):
    ...

If multiple assets with freshness policies depend on the same upstream asset, Dagster will try to time materializations of the upstream asset so that it runs as few times as possible while still meeting the downstream freshness policies.

Auto-materialize policies and data versions

Observable source assets are assets that your data pipeline doesn't materialize, but for which you provide a function that can tell when they've changed. If you set an AutoMaterializePolicy on an asset that's downstream of an observable source asset, then changes to the source asset will be treated as new upstream data that can cause the downstream asset to be auto-materialized.

In this example, we check every minute to see whether source_file was modified. If it was, then the AutoMaterializePolicy on asset1 will cause it to be materialized.

import os

from dagster import AutoMaterializePolicy, DataVersion, asset, observable_source_asset


@observable_source_asset(auto_observe_interval_minutes=1)
def source_file():
    return DataVersion(str(os.path.getmtime("source_file.csv")))


@asset(
    deps=[source_file],
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def asset1():
    ...
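The change detection that this example relies on can be sketched without Dagster. The snippet below is a simplified illustration, assuming that a source counts as changed when its data version differs from the previously observed one; the helpers file_version and has_changed are hypothetical names, not part of the Dagster API.

```python
import os
import tempfile


def file_version(path: str) -> str:
    """Use the file's modification time as its data version."""
    return str(os.path.getmtime(path))


def has_changed(previous_version: str, current_version: str) -> bool:
    """Treat the source as changed when the two data versions differ."""
    return previous_version != current_version


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "source_file.csv")
    with open(path, "w") as f:
        f.write("a,b\n1,2\n")

    first = file_version(path)
    # Observing again without touching the file yields the same version.
    unchanged = has_changed(first, file_version(path))

    # Set the modification time explicitly so the demo does not depend on
    # the file system's clock resolution.
    os.utime(path, (0, os.path.getmtime(path) + 60))
    changed = has_changed(first, file_version(path))
```

A modification time is a cheap version to compute, but it changes even when the file is rewritten with identical contents; a content hash would be a stricter alternative.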

Auto-materialization and partitions

Partitioned assets can have AutoMaterializePolicys. Partitions are eligible for auto-materialization when either:

  • They are missing - i.e. have never been materialized before
  • New data has arrived in the partitions that are upstream of them

For time-partitioned assets (daily, hourly, etc.), only the last partition will be auto-materialized. To materialize earlier partitions, launch a backfill.

Here's a pipeline with two daily-partitioned assets that have eager auto-materialize policies. At the end of each day, a partition for that day will be added to the set of partitions for each of the assets. Dagster will notice that the new partitions exist, but have no materializations, and then auto-materialize them.

from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def asset1():
    ...


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def asset2(asset1):
    ...

If the last partition of asset1 is re-materialized, e.g. manually from the UI, then the corresponding partition of asset2 will be auto-materialized after.

StaticPartitionsDefinitions and DynamicPartitionsDefinitions do not have this limit, so all of their partitions will be auto-materialized.
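The eligibility rules in this section can be summarized with a small plain-Python model. This is a sketch under simplifying assumptions, not how Dagster evaluates partitions internally; the function eligible_partitions and its parameters are hypothetical.

```python
from typing import List, Set


def eligible_partitions(
    partition_keys: List[str],
    materialized: Set[str],
    upstream_updated: Set[str],
    time_partitioned: bool,
) -> List[str]:
    """Return the partitions this simplified model would auto-materialize.

    A partition is a candidate if it is missing or has new upstream data.
    For time-partitioned assets, only the last partition is considered.
    """
    candidates = [
        key
        for key in partition_keys
        if key not in materialized or key in upstream_updated
    ]
    if time_partitioned and partition_keys:
        last = partition_keys[-1]
        candidates = [key for key in candidates if key == last]
    return candidates


daily_keys = ["2020-10-10", "2020-10-11", "2020-10-12"]
# Only the newest daily partition is picked up, even though an older one
# ("2020-10-11") is also missing.
daily = eligible_partitions(daily_keys, {"2020-10-10"}, set(), time_partitioned=True)

static_keys = ["us", "eu", "apac"]
# No such limit for static partitions: "us" has new upstream data and
# "apac" is missing, so both qualify.
static = eligible_partitions(static_keys, {"us", "eu"}, {"us"}, time_partitioned=False)
```

In the daily case, the skipped partition "2020-10-11" would need a backfill, as described above.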

Rules of auto-materialization

  • Assets will not be auto-materialized if any of their ancestors are currently being auto-materialized.
  • Assets will not be auto-materialized if any of their ancestors have not yet incorporated the newest materialization of their upstream assets.
  • If the run to auto-materialize an asset fails, it can be retried if run retries are configured. Otherwise, Dagster won't try to auto-materialize that asset again until the point at which it would have done so had the failed run succeeded. For example, if an asset has a daily freshness policy and its run fails, Dagster won't auto-materialize it again until the next day.
  • By default, no more than one materialization of a given asset will be kicked off per minute. Further materialization requests will be discarded, and will require manual backfilling to complete. This can be configured using the max_materializations_per_minute argument to AutoMaterializePolicy.eager and AutoMaterializePolicy.lazy.
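The per-minute limit in the last rule can be illustrated with a short plain-Python model. This is only a sketch: it assumes that requests are kept in the order they arrive, which the rule above does not specify, and the function apply_rate_limit is a hypothetical name rather than part of Dagster.

```python
from typing import List, Tuple


def apply_rate_limit(
    requested: List[str],
    max_materializations_per_minute: int = 1,
) -> Tuple[List[str], List[str]]:
    """Split one minute's requests for a single asset into launched and discarded.

    Assumption: earlier requests win; the remainder is dropped, not queued.
    """
    launched = requested[:max_materializations_per_minute]
    discarded = requested[max_materializations_per_minute:]
    return launched, discarded


# Three static partitions of the same asset are requested within one minute.
requests = ["us", "eu", "apac"]

# With the default limit of one, two requests are discarded.
launched, discarded = apply_rate_limit(requests)

# Raising the limit to three lets all of them through.
launched_3, discarded_3 = apply_rate_limit(requests, max_materializations_per_minute=3)
```

In Dagster itself, the equivalent setting is the max_materializations_per_minute argument, e.g. AutoMaterializePolicy.eager(max_materializations_per_minute=3). Partitions whose requests were discarded need a manual backfill.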

Run tags

Runs triggered by auto-materialize policies are tagged with dagster/auto_materialize: true. Additional tags can be configured in dagster.yaml (OSS) or deployment settings (Cloud).