A software-defined asset can represent a collection of partitions that can be tracked and materialized independently. In many ways, each partition functions like its own mini-asset, but they all share a common materialization function and dependencies. Typically, each partition will correspond to a separate file, or a slice of a table in a database.
A common use is for each partition to represent all the records in a data set that fall within a particular time window, e.g. hourly, daily or monthly. Alternatively, each partition can represent a region, a customer, an experiment - any dimension along which you want to be able to materialize and monitor independently. An asset can also be partitioned along multiple dimensions, e.g. by region and by hour.
A graph of assets with the same partitions implicitly forms a partitioned data pipeline, and you can launch a run that selects multiple assets and materializes the same partition in each asset.
Similarly, a partitioned job is a job where each run corresponds to a partition. It's common to construct a partitioned job that materializes a single partition across a set of partitioned assets every time it runs.
Having defined a partitioned asset or job, you can:
View runs by partition in the Dagster UI.
Define a schedule that fills in a partition each time it runs. For example, a job might run each day and process the data that arrived during the previous day.
Launch backfills, which are sets of runs that each process a different partition. For example, after making a code change, you might want to run your job on all partitions instead of just one of them.
DynamicPartitionsDefinition is a partitions definition whose partitions can be dynamically added and removed.
A software-defined asset can be assigned a PartitionsDefinition, which determines the set of partitions that compose it. If the asset is stored in a filesystem or an object store, then each partition will typically correspond to a file or object. If the asset is stored in a database, then each partition will typically correspond to a range of values in a table that fall within a particular window.
Once an asset has a set of partitions, you can launch materializations of individual partitions and view the materialization history by partition in the UI.
For example, below is a software-defined asset with a partition for each day since the first day of 2022. Materializing partition 2022-07-23 of this asset would result in fetching data from the URL coolweatherwebsite.com/weather_obs&date=2022-07-23 and storing it at the path weather_observations/2022-07-23.csv.
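The mapping from one partition key to both a source URL and a target file can be sketched in plain Python. This is not Dagster API: `weather_locations_for_partition` is a hypothetical helper that only shows how a single daily key such as 2022-07-23 determines where the data comes from and where it is written.

```python
from datetime import date


def weather_locations_for_partition(partition_key: str) -> tuple[str, str]:
    """Hypothetical helper: map a daily partition key to (source URL, target path)."""
    # Fail early if the key is not an ISO date such as "2022-07-23"
    date.fromisoformat(partition_key)
    url = f"coolweatherwebsite.com/weather_obs&date={partition_key}"
    path = f"weather_observations/{partition_key}.csv"
    return url, path


url, path = weather_locations_for_partition("2022-07-23")
print(url)   # coolweatherwebsite.com/weather_obs&date=2022-07-23
print(path)  # weather_observations/2022-07-23.csv
```

Because every partition shares the same function, only the key changes between materializations.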
In the above code snippet, the body of the decorated function writes out data to a file, but it's common to delegate this I/O to an I/O manager. Dagster's built-in I/O managers know how to handle partitioned assets. You can also handle them when writing your own I/O manager, following the instructions in the I/O manager documentation.
Here's a software-defined asset that relies on an I/O manager to store its output:
import pandas as pd

from dagster import DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context) -> pd.DataFrame:
    partition_date_str = context.asset_partition_key_for_output()
    return pd.read_csv(f"coolweatherwebsite.com/weather_obs&date={partition_date_str}")
If you're using the default I/O manager, materializing partition 2022-07-23 of this asset would store the output DataFrame in a pickle file at a path like my_daily_partitioned_asset/2022-07-23.
The MultiPartitionsDefinition class accepts a mapping of dimension name to partitions definition, creating a partition for each unique combination of dimension partitions. For example, the asset below would contain a partition for each combination of color and date: red|2022-01-01, yellow|2022-01-01, blue|2022-01-01, red|2022-01-02 and so on.
Currently, Dagster only allows two-dimensional multipartitions definitions.
Notice the code snippet above fetches the partition key from the asset context. Multi-dimensional partition keys are returned as MultiPartitionKey objects, whose MultiPartitionKey.keys_by_dimension property returns the key for each dimension. This object can also be passed into partition key execution parameters:
from dagster import MultiPartitionKey, materialize

result = materialize(
    [multi_partitions_asset],
    partition_key=MultiPartitionKey({"date": "2022-01-01", "color": "red"}),
)
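To make the combination scheme concrete, here is a plain-Python sketch (not Dagster API) that builds one "|"-joined key per color/date combination and splits a key back into its dimensions, roughly the information keys_by_dimension exposes. The dimension values are illustrative, and ordering dimensions alphabetically by name (so color comes before date, as in the example keys above) is an assumption of this sketch.

```python
from itertools import product

# Illustrative dimension values; in Dagster these would come from the
# partitions definitions passed to MultiPartitionsDefinition.
dimensions = {
    "date": ["2022-01-01", "2022-01-02"],
    "color": ["red", "yellow", "blue"],
}


def multi_partition_keys(dims: dict[str, list[str]]) -> list[str]:
    """Build one '|'-joined key per combination, with dimensions ordered by name."""
    names = sorted(dims)  # "color" sorts before "date"
    return ["|".join(combo) for combo in product(*(dims[n] for n in names))]


def split_multi_partition_key(key: str, dims: dict[str, list[str]]) -> dict[str, str]:
    """Split a '|'-joined key back into a {dimension name: key} mapping."""
    return dict(zip(sorted(dims), key.split("|")))


keys = multi_partition_keys(dimensions)
print(len(keys))  # 6 = 3 colors x 2 dates
print(split_multi_partition_key("red|2022-01-01", dimensions))
```

The number of partitions is the product of the dimension sizes, which is why adding a value to either dimension adds a whole row or column of partitions.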
Sometimes you don't know the set of partitions ahead of time when you're defining your assets. For example, maybe you want to add a new partition every time a new data file lands in a directory, or every time you want to experiment with a new set of hyperparameters. In these cases, you can use a DynamicPartitionsDefinition.
For a given dynamic partition set, partition keys can be added and removed. One common pattern is detecting the presence of a new partition through a sensor, adding the partition, and then triggering a run for that partition:
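import os

from dagster import AssetSelection, DynamicPartitionsDefinition, RunRequest
from dagster import SensorResult, define_asset_job, sensor

# Assumed definition: a dynamic partition set ("images" is an illustrative name)
# used by an asset with key "images" that is defined elsewhere
images_partitions_def = DynamicPartitionsDefinition(name="images")

images_job = define_asset_job(
    "images_job", AssetSelection.keys("images"), partitions_def=images_partitions_def
)


@sensor(job=images_job)
def image_sensor(context):
    # Filenames in the watched directory that are not yet registered as partitions
    new_images = [
        img_filename
        for img_filename in os.listdir(os.getenv("MY_DIRECTORY"))
        if not context.instance.has_dynamic_partition(
            images_partitions_def.name, img_filename
        )
    ]
    # Register the new partition keys and request one run for each of them
    return SensorResult(
        run_requests=[
            RunRequest(partition_key=img_filename) for img_filename in new_images
        ],
        dynamic_partitions_requests=[
            images_partitions_def.build_add_request(new_images)
        ],
    )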
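The sensor's core bookkeeping, diffing the files on disk against the partition keys already registered, can be sketched without Dagster. In this sketch a plain set stands in for the instance's dynamic partition store, and `find_new_partition_keys` is a hypothetical helper; the real sensor relies on context.instance.has_dynamic_partition and build_add_request instead.

```python
import os
import tempfile


def find_new_partition_keys(directory: str, registered: set[str]) -> list[str]:
    """Return filenames in `directory` that are not yet registered as partition keys."""
    # sorted() makes the result order deterministic across platforms
    return sorted(f for f in os.listdir(directory) if f not in registered)


with tempfile.TemporaryDirectory() as tmp:
    for name in ["a.png", "b.png", "c.png"]:
        open(os.path.join(tmp, name), "w").close()
    registered = {"a.png"}  # pretend "a.png" was added on an earlier tick
    new_keys = find_new_partition_keys(tmp, registered)
    registered.update(new_keys)  # stands in for the add request
    print(new_keys)  # ['b.png', 'c.png']
    # A second tick over the same directory finds nothing new
    print(find_new_partition_keys(tmp, registered))  # []
```

Registering keys before requesting runs is what makes the sensor idempotent: a file that has already become a partition is skipped on later ticks.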
To view all partitions for an asset, open the Definition tab of the asset's details page. The bar in the Partitions section represents all of the partitions for the asset.
In the following image, the partitions bar is entirely gray. This is because none of the partitions have been materialized:
When a partitioned asset depends on another partitioned asset, each partition in the downstream asset depends on a partition or multiple partitions in the upstream asset.
A few rules govern default partition-to-partition dependencies:
When the upstream asset and downstream asset have the same PartitionsDefinition, each partition in the downstream asset depends on the same partition in the upstream asset.
When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset depends on all partitions in the upstream asset that intersect its time window.
For example, if an asset with a DailyPartitionsDefinition depends on an asset with an HourlyPartitionsDefinition, then partition 2022-04-12 of the daily asset would depend on 24 partitions of the hourly asset: 2022-04-12-00:00 through 2022-04-12-23:00.
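The intersection rule in this example amounts to listing every hourly window inside the daily window. A plain-Python sketch, with the hypothetical helper `hourly_keys_for_day`; it assumes naive timestamps with no daylight-saving transitions, where a day can have 23 or 25 hours.

```python
from datetime import datetime, timedelta


def hourly_keys_for_day(day_key: str) -> list[str]:
    """List the hourly partition keys whose windows fall inside one daily partition."""
    start = datetime.strptime(day_key, "%Y-%m-%d")
    return [
        (start + timedelta(hours=h)).strftime("%Y-%m-%d-%H:%M") for h in range(24)
    ]


hourly_keys = hourly_keys_for_day("2022-04-12")
print(len(hourly_keys), hourly_keys[0], hourly_keys[-1])
# 24 2022-04-12-00:00 2022-04-12-23:00
```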
You can override the default partition dependency rules by providing a PartitionMapping when specifying a dependency on an asset. For example, you can specify that each partition of a daily-partitioned asset depends on the prior day's partition in an upstream asset.
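The rule itself, mapping each daily key to the previous day's key, is easy to state in plain Python. `prior_day_partition` is a hypothetical helper that illustrates the mapping, not the PartitionMapping API.

```python
from datetime import date, timedelta


def prior_day_partition(downstream_key: str) -> str:
    """Map a daily partition key to the upstream key for the previous day."""
    day = date.fromisoformat(downstream_key)
    return (day - timedelta(days=1)).isoformat()


print(prior_day_partition("2022-04-12"))  # 2022-04-11
print(prior_day_partition("2022-03-01"))  # 2022-02-28
```

Note that the first downstream partition maps to a day before the upstream asset's start date, so a real mapping has to decide how to treat that out-of-range dependency.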
The most common kind of partitioned job is a time-partitioned job - each partition is a time window, and each run for a partition processes data within that time window.
Before we define a partitioned job, let's look at a non-partitioned job that computes some data for a given date:
from dagster import Config, job, op


class ProcessDateConfig(Config):
    date: str


@op
def process_data_for_date(context, config: ProcessDateConfig):
    date = config.date
    context.log.info(f"processing data for {date}")


@job
def do_stuff():
    process_data_for_date()
It takes, as config, a string date. This piece of config defines which date to compute data for. For example, if you wanted to compute for May 5th, 2020, you would execute the job with config that sets the date field of process_data_for_date to "2020-05-05".
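Written as a Python dict, the form accepted by the run_config argument of execute_in_process, that config presumably has the following shape, inferred from the op name and ProcessDateConfig above. `run_config_for_date` is a hypothetical helper.

```python
def run_config_for_date(date_str: str) -> dict:
    """Build run config that sets the `date` field of the process_data_for_date op."""
    return {"ops": {"process_data_for_date": {"config": {"date": date_str}}}}


run_config = run_config_for_date("2020-05-05")
print(run_config)
```

A function from date to run config is exactly what a partitioned config generalizes: it adds the set of valid dates.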
With the job above, it's possible to supply any value for the date param, which means that, if you wanted to launch a backfill, Dagster wouldn't know what values to run it on. You can instead build a partitioned job that operates on a defined set of dates.
First, you define the PartitionedConfig. In this case, because each partition is a date, you can use the @daily_partitioned_config decorator. It defines two things: the full set of partitions (every date between the start date and the current date) and how to determine the run config for a given partition.
from datetime import datetime

from dagster import daily_partitioned_config


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }
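The "every date between the start date and the current date" half of the definition can be sketched in plain Python. This is a simplification, not Dagster's implementation: it assumes only days whose 24-hour window has fully elapsed become partitions and ignores time zones and offsets. `daily_partition_keys` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def daily_partition_keys(start: datetime, now: datetime) -> list[str]:
    """List keys for every complete day from `start` up to `now`."""
    keys = []
    day = start
    # Only include days whose window [day, day + 1 day) has fully elapsed
    while day + timedelta(days=1) <= now:
        keys.append(day.strftime("%Y-%m-%d"))
        day += timedelta(days=1)
    return keys


keys = daily_partition_keys(datetime(2020, 1, 1), now=datetime(2020, 1, 4, 10, 30))
print(keys)  # ['2020-01-01', '2020-01-02', '2020-01-03']
```

Because the set is bounded, a backfill over "all partitions" has a well-defined list of dates to run.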
Then you can build a job that uses the PartitionedConfig by supplying it to the config argument when you construct the job, for example @job(config=my_partitioned_config).
In the UI, you can view runs by partition in the Partitions tab of a Job page:
In the "Run Matrix", each column corresponds to one of the partitions in the job. The time listed corresponds to the start time of the partition. Each row corresponds to one of the steps in the job. You can click on an individual box to navigate to logs and run information for the step.
You can view and use partitions in the UI Launchpad tab for a job. In the top bar, you can select from the list of all available partitions. Within the config editor, the config for the selected partition will be populated.
In the screenshot below, we select the 2020-01-02 partition, and we can see that the run config for the partition has been populated in the editor.
It's common that, when you have a partitioned job, you want to run it on a schedule. For example, if your job has a partition for each date, you likely want to run that job every day, on the partition for that day.
The build_schedule_from_partitioned_job function allows you to construct a schedule from a date-partitioned job. It creates a schedule whose interval matches the spacing of your partitions. If you wanted to create a schedule for the do_stuff_partitioned job defined above, you could write:
from dagster import build_schedule_from_partitioned_job, job


@job(config=my_partitioned_config)
def do_stuff_partitioned():
    ...


do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
    do_stuff_partitioned,
)
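For a daily job, each schedule tick typically processes the data that arrived during the previous day. A plain-Python sketch of that tick-to-partition rule, assuming a one-day lag; `partition_for_tick` is hypothetical, so consult the build_schedule_from_partitioned_job reference for its exact behavior.

```python
from datetime import datetime, timedelta


def partition_for_tick(tick: datetime) -> str:
    """Return the daily partition key for the last complete day before `tick`."""
    return (tick - timedelta(days=1)).strftime("%Y-%m-%d")


print(partition_for_tick(datetime(2020, 1, 5)))  # 2020-01-04
print(partition_for_tick(datetime(2020, 3, 1)))  # 2020-02-29 (2020 is a leap year)
```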
Schedules can also be made from static partitioned jobs. If you wanted to make a schedule for the continent_job above that runs each partition, you could write:
from dagster import RunRequest, schedule


@schedule(cron_schedule="0 0 * * *", job=continent_job)
def continent_schedule():
    for c in CONTINENTS:
        yield RunRequest(run_key=c, partition_key=c)
Or you could write a schedule that runs only a subset of the partitions.
Invoking a PartitionedConfig object will directly invoke the decorated function.
If you want to check whether the generated run config is valid for a job's config schema, you can use the validate_run_config function.
from datetime import datetime

from dagster import daily_partitioned_config, validate_run_config


@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }


def test_my_partitioned_config():
    # assert that the decorated function returns the expected output
    run_config = my_partitioned_config(datetime(2020, 1, 3), datetime(2020, 1, 4))
    assert run_config == {
        "ops": {"process_data_for_date": {"config": {"date": "2020-01-03"}}}
    }

    # assert that the output of the decorated function is valid configuration for the
    # do_stuff_partitioned job
    assert validate_run_config(do_stuff_partitioned, run_config)
If you want to test that your PartitionedConfig creates the partitions you expect, you can use its get_partition_keys and get_run_config_for_partition_key methods.
from datetime import datetime

from dagster import Config, daily_partitioned_config, job, op, validate_run_config


@daily_partitioned_config(start_date=datetime(2020, 1, 1), minute_offset=15)
def my_offset_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data": {
                "config": {
                    "start": start.strftime("%Y-%m-%d-%H:%M"),
                    "end": _end.strftime("%Y-%m-%d-%H:%M"),
                }
            }
        }
    }


class ProcessDataConfig(Config):
    start: str
    end: str


@op
def process_data(context, config: ProcessDataConfig):
    s = config.start
    e = config.end
    context.log.info(f"processing data for {s} - {e}")


@job(config=my_offset_partitioned_config)
def do_more_stuff_partitioned():
    process_data()


def test_my_offset_partitioned_config():
    # test that the partition keys are what you expect
    keys = my_offset_partitioned_config.get_partition_keys()
    assert keys[0] == "2020-01-01"
    assert keys[1] == "2020-01-02"

    # test that the run_config for a partition is valid for do_more_stuff_partitioned
    run_config = my_offset_partitioned_config.get_run_config_for_partition_key(keys[0])
    assert validate_run_config(do_more_stuff_partitioned, run_config)

    # test that the contents of run_config are what you expect
    assert run_config == {
        "ops": {
            "process_data": {
                "config": {"start": "2020-01-01-00:15", "end": "2020-01-02-00:15"}
            }
        }
    }
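The expected start and end values in the test above follow from shifting each daily window by minute_offset. A plain-Python sketch of that arithmetic, using the hypothetical helper `offset_window` and ignoring time zones:

```python
from datetime import datetime, timedelta


def offset_window(partition_key: str, minute_offset: int = 15) -> tuple[str, str]:
    """Return (start, end) strings of a daily window shifted by `minute_offset` minutes."""
    start = datetime.strptime(partition_key, "%Y-%m-%d") + timedelta(minutes=minute_offset)
    end = start + timedelta(days=1)
    fmt = "%Y-%m-%d-%H:%M"
    return start.strftime(fmt), end.strftime(fmt)


print(offset_window("2020-01-01"))  # ('2020-01-01-00:15', '2020-01-02-00:15')
```

The partition key still names the calendar day; only the window boundaries move.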
To run a partitioned job in-process on a particular partition, supply a value for the partition_key argument of JobDefinition.execute_in_process.