In this guide, we’ll walk you through how to run your data pipelines without manual intervention, i.e. automate, and identify the Dagster tools to make that happen.
This guide assumes you have some familiarity with several Dagster concepts, including software-defined assets and jobs.
Dagster offers a few different ways of automating data pipelines, and choosing the right Dagster tool depends on your specific needs. In this guide we'll walk through a few different cases, and which Dagster tool you should use in each case.
When thinking about automating your data pipelines, it’s helpful to think through how and when your data needs to be refreshed. A few helpful things to think through:
How often does my data need to be refreshed? Does all data need to be refreshed at the same frequency?
How is my data split up? Do older records need updating?
Am I waiting for some data that needs to trigger downstream updates?
Do I want it to run every time a new file is added or update data in batches?
You want computation to be triggered based on the time. For example, you want some data to be updated every day by a certain time to refresh a dashboard, or you want your data warehouse to have the most recent information for the team to work with. This is one of the most traditional cases when building a pipeline. Let's go into how to do this with Dagster!
Dagster offers basic scheduling capabilities that allow you to specify how often and when you want a job to run. This can mean daily, weekly, or hourly. Dagster also supports any type of cron scheduling.
You can also use the @schedule decorator if you want to provide custom run config and tags. For example, if you want to pass a parameter to the job at runtime, for example, activity = ‘party’ on the weekend and activity=’grind’ on weekdays, you can use the configuration to pass parameters in.
# sets the schedule to be updated everyday at 9:00 AM@schedule(job=configurable_job, cron_schedule="0 9 * * *")defconfigurable_job_schedule(context: ScheduleEvaluationContext):if context.scheduled_execution_time.weekday()<5:
activity_selection ="grind"else:
activity_selection ="party"return RunRequest(
run_config={"ops":{"configurable_op":{"config":{"activity": activity_selection}}}})
I want specify how fresh each asset needs to be and let Dagster figure out when to run them#
As an alternative to thinking about specific times that you want to run jobs, Dagster offers a declarative approach to scheduling, based on how up-to-date you need each of your data assets to be. When you opt for this approach, you specify FreshnessPolicys on each of the assets that you need to be updated, and then tell Dagster to use those freshness policies to figure out when to auto-materialize your assets.
In the following example:
By 9 am each morning, we want the finance_report to include data on all the transactions from the previous day, so that the finance team can review it.
We want the sales_report to always include data on all the transactions that occurred two hours before or earlier.
The transactions_cleaned table feeds both of these reports. We don't have any direct requirements for it, but it needs to be updated so that the reports can have up-to-date transaction data.
We accomplish this by setting FreshnessPolicys on the report assets and lazy AutoMaterializePolicys on all the assets. When possible, Dagster will time runs of transactions_cleaned intelligently so that it doesn't need to be updated separately on behalf of each downstream report.
Note that this way of scheduling is currently marked experimental, and some of the APIs may change in the future.
My data should be updated every time something happens#
I want my data to be updated when an event happens#
Let’s say you have sales pipeline data that you want updated every time a customer submits an RFP on your website. Dagster offers sensors that allow you to kick off jobs based on some external change, such as a new file in an S3 bucket, if some other asset is updated, or if a system is down.
from dagster_aws.s3.sensor import get_s3_keys
@sensor(job=log_file_job)defmy_s3_sensor(context):
since_key = context.cursor orNone
new_s3_keys = get_s3_keys("my_s3_rfp_bucket", since_key=since_key)ifnot new_s3_keys:return SkipReason("No new s3 files found for bucket my_s3_rfp_bucket.")
last_key = new_s3_keys[-1]
run_requests =[RunRequest(run_key=s3_key, run_config={})for s3_key in new_s3_keys]
context.update_cursor(last_key)return run_requests
I want some assets to materialize every time other assets materialize#
For example, the asset2 is downstream of asset1, and you want asset2 to be materialized every time asset1 is materialized.
Dagster provides eagerAutoMaterializePolicys, which allow materializing an asset whenever it's parent assets are materialized. Note that this feature is currently marked experimental, and some of the APIs may change in the future.
from dagster import AutoMaterializePolicy, asset
@assetdefasset1():...@asset(auto_materialize_policy=AutoMaterializePolicy.eager())defasset2(asset1):...
To learn more about auto-materializing assets, visit the Concept page.
Some of my data should be updated, but not everything#
Let’s say you have a database with all your credit card transactions. When a return occurs, the original purchase record doesn’t change last week’s data, but a new transaction showing the return is added to the current day’s information.
In this case, you want your data pipeline to include all the latest data, but it doesn’t make sense to continuously process data from two years or two days ago when you know it hasn’t changed. What you want is to process yesterday’s data and not waste compute on the rest of the data.
Dagster partitions do just that. A partition is a slice of your data; in this case, each partition represents a day of transactions. You can partition or ‘split’ your data based on whatever makes the most sense, by one or multiple dimensions.
Partitions can be used with both schedules and sensors. You can use schedules to kick off a partitioned job to update slices of data. You can also trigger runs once a specific partition has been updated using auto-materialize policies based on partitioned materializations.