Re-execution in Dagster

This guide walks through how to re-execute Dagster jobs and where to find the resulting runs in the Dagster UI.

Motivation

If ops fail or upstream data has changed within a job execution, the job may need to be re-run starting from a particular point. Dagster calls this process re-execution.

Imagine a machine learning job with three ops. The first op, training the model, is the most time- and resource-intensive. The second op tests the model, and the third builds analyses on the results. Suppose that the job fails at the op that tests the model. After fixing the root cause, we want to re-run our job. However, creating a new run of the whole job would take much more time, because we would have to repeat the first op. It would be more economical to start again from the second op, reusing the previous run's execution result for the first op.

With Dagster, the re-execution of parts of the job is grouped with the original run to make it easy to trace. The original job execution metadata is not overwritten, making re-execution a non-destructive operation.

Example

Consider the following job which has three ops, one of which fails half of the time.

from random import random

from dagster import in_process_executor, job, op


@op
def start():
    return 1


@op
def unreliable(num: int) -> int:
    # Fail roughly half of the time to simulate a flaky op
    failure_rate = 0.5
    if random() < failure_rate:
        raise Exception("blah")

    return num


@op
def end(_num: int):
    pass


@job(executor_def=in_process_executor)
def unreliable_job():
    end(unreliable(start()))

Although this job is very simple, inputs and outputs are passed between its ops. With an IO manager, re-execution is able to handle the inputs and outputs stored from the initial run.

To initiate a re-execution from an existing run, navigate to the run in the UI. The re-execution option is at the top right of the interface.

Re-execution in the UI

Under the re-execution dropdown, you will see multiple options. No matter which one you choose, the re-executed job is linked to the original run.

  • All Ops: Re-execute the job from scratch. This option is most relevant if you would like to associate runs together when testing jobs end-to-end.
  • Selected Op(s): Re-execute only the selected op(s). Ops can be selected regardless of their status by clicking their boxes in the Gantt chart view. This option is most relevant if your job is large and you know exactly which ops to execute.
  • From Selected: Re-execute the job downstream from the selected ops. This option is most relevant if a particular op fails, and your intent is to run all downstream ops regardless of op status. You are likely developing a single op, and want to make sure downstream ops work as expected.
  • From Failure: Re-execute the job, skipping ops completed successfully. This option is only enabled when the run has failed. You have likely fixed the failed op, and want to re-run the op and all downstream dependencies. Dagster will figure out the dependencies for you!

In the above example, re-executing from failure makes sense, as the failed op has a 50% chance of succeeding on the next run.

If the run succeeded but the underlying code changed, running specific ops to test the differences would be more relevant.
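
The four options differ mainly in which ops they schedule. The following is a minimal sketch of that selection logic in plain Python, assuming a hand-written dependency map for the example job. The names UPSTREAM, downstream_closure, and steps_to_run are hypothetical illustrations and are not part of Dagster's API or its actual implementation.

```python
from typing import Dict, List, Set

# Hypothetical model of the example job: each op maps to its upstream ops.
UPSTREAM: Dict[str, List[str]] = {
    "start": [],
    "unreliable": ["start"],
    "end": ["unreliable"],
}


def downstream_closure(selected: Set[str]) -> Set[str]:
    """Return the selected ops plus every op that depends on them."""
    result = set(selected)
    changed = True
    while changed:
        changed = False
        for op_name, parents in UPSTREAM.items():
            if op_name not in result and any(p in result for p in parents):
                result.add(op_name)
                changed = True
    return result


def steps_to_run(option: str, selected: Set[str], succeeded: Set[str]) -> Set[str]:
    """Pick the ops to run for one of the four re-execution options."""
    if option == "all_ops":
        return set(UPSTREAM)
    if option == "selected_ops":
        return set(selected)
    if option == "from_selected":
        return downstream_closure(selected)
    if option == "from_failure":
        # Skip everything that already completed successfully.
        return set(UPSTREAM) - succeeded
    raise ValueError(f"unknown option: {option}")
```

In this model, a run in which start succeeded and unreliable failed yields the ops unreliable and end for "from_failure", which matches the behavior described for From Failure above.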

Selecting Ops

Within the UI, you can select one or more ops by clicking them. Alternatively, you can use the subset selector and specify the names of the ops you want to re-run.

Re-execution using Python APIs

Re-execution can also be triggered through the Python API.

Name                Description
execute_job         Execute a Dagster job synchronously from Python.
ReexecutionOptions  Configuration to provide to execute_job, allowing re-execution from a previous Dagster run.

Again, let's revisit the job unreliable_job, which has an op named unreliable.

from dagster import DagsterInstance, ReexecutionOptions, execute_job, reconstructable
from docs_snippets.guides.dagster.reexecution.unreliable_job import unreliable_job

instance = DagsterInstance.ephemeral()

# Initial execution
initial_result = execute_job(reconstructable(unreliable_job), instance=instance)

if not initial_result.success:
    options = ReexecutionOptions.from_failure(initial_result.run_id, instance)
    # re-execute the job from the point of failure
    from_failure_result = execute_job(
        reconstructable(unreliable_job), instance=instance, reexecution_options=options
    )

Using Dagster's API, you can programmatically trigger both an execution and a re-execution. If an initial run fails, you may want to re-trigger the run from the point of failure, as shown above. Similarly, you can trigger a re-execution of selected ops or from a particular point in the job.

# re-execute the job, but only the "unreliable" op and all its descendants
options = ReexecutionOptions(
    parent_run_id=initial_result.run_id, step_selection=["unreliable*"]
)
result = execute_job(
    reconstructable(unreliable_job),
    instance=instance,
    reexecution_options=options,
)

The step_selection input is configurable, with syntax further documented in the API docs for ReexecutionOptions.
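
To build intuition for a selection string such as "unreliable*", here is a toy resolver in plain Python. It is a simplified sketch, not Dagster's parser: it assumes a hand-written DOWNSTREAM map for the example job and supports only a bare op name, a trailing * (the op and all its descendants), and a leading * (the op and all its ancestors). The names DOWNSTREAM, reachable, and resolve are hypothetical.

```python
from typing import Dict, List, Set

# Hypothetical downstream edges mirroring unreliable_job: start -> unreliable -> end
DOWNSTREAM: Dict[str, List[str]] = {
    "start": ["unreliable"],
    "unreliable": ["end"],
    "end": [],
}


def reachable(origin: str, edges: Dict[str, List[str]]) -> Set[str]:
    """Return origin plus every node reachable from it along the given edges."""
    seen: Set[str] = set()
    stack = [origin]
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(edges[node])
    return seen


def resolve(step_selection: List[str]) -> Set[str]:
    """Expand a list of selection strings into a set of op names."""
    # Invert the downstream map to get upstream edges.
    upstream: Dict[str, List[str]] = {name: [] for name in DOWNSTREAM}
    for parent, children in DOWNSTREAM.items():
        for child in children:
            upstream[child].append(parent)

    selected: Set[str] = set()
    for query in step_selection:
        name = query.strip("*")
        selected.add(name)
        if query.endswith("*"):
            selected |= reachable(name, DOWNSTREAM)  # op and its descendants
        if query.startswith("*"):
            selected |= reachable(name, upstream)  # op and its ancestors
    return selected
```

Under these assumptions, resolve(["unreliable*"]) selects unreliable and end, which is the set of ops the re-execution above would run. The full syntax is richer than this model, so treat the API docs as the source of truth.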