Jobs#

Jobs are the main unit of execution and monitoring in Dagster.

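A job does one of two things:

  • Materializes a selection of software-defined assets
  • Executes a graph of ops that are not tied to software-defined assets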

Jobs can be launched in a few different ways:

  • Manually from the Dagster UI
  • At fixed intervals, by schedules
  • When external changes occur, using sensors

Relevant APIs#

| Name | Description |
| --- | --- |
| define_asset_job | A function for defining a job from a selection of assets. |
| @job | The decorator used to define a job. |
| JobDefinition | A job definition. Jobs are the main unit of execution and monitoring in Dagster. Typically constructed using the @job decorator. |

Creating jobs#

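Jobs can be created in several ways:

  • From software-defined assets
  • Directly from ops, using the @job decorator
  • From a graph of ops, using the GraphDefinition.to_job method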

From software-defined assets#

Asset jobs materialize a fixed set of assets each time they run. Additionally, multiple jobs can target overlapping sets of assets:

from dagster import Definitions, asset, define_asset_job


@asset
def asset1():
    return [1, 2, 3]


@asset
def asset2(asset1):
    return asset1 + [4]


all_assets_job = define_asset_job(name="all_assets_job")
asset1_job = define_asset_job(name="asset1_job", selection="asset1")


defs = Definitions(
    assets=[asset1, asset2],
    jobs=[all_assets_job, asset1_job],
)

Unlike jobs created with the @job decorator, where you explicitly define the dependencies when you create the job, the topology of an asset-based job is derived from the assets and their dependencies.

Directly from ops#

Using the @job decorator#

The simplest way to create an op-based job is to use the @job decorator.

Within the decorated function body, you can use function calls to indicate the dependency structure between the ops/graphs. This allows you to explicitly define dependencies between ops when you define the job.

In this example, the add_one op depends on the return_five op's output. Because this data dependency exists, the add_one op executes after return_five runs successfully and emits the required output.

from dagster import job, op


@op
def return_five():
    return 5


@op
def add_one(arg):
    return arg + 1


@job
def do_stuff():
    add_one(return_five())

When defining a job, you can provide resources, configuration, hooks, tags, and an executor.

Like regular jobs, jobs that target assets can be placed on schedules and sensors.

From a graph#

Creating jobs from a graph can be useful when you want to define inter-op dependencies before binding them to resources, configuration, executors, and other environment-specific features. This approach to job creation allows you to customize graphs for each environment by plugging in configuration and services specific to that environment.

You can model this by building multiple jobs that use the same underlying graph of ops. The graph represents the logical core of data transformation, and the configuration and resources on each job customize the behavior of that job for its environment.

To do this, you first define a graph with the @graph decorator.

from dagster import graph, op, ConfigurableResource


class Server(ConfigurableResource):
    def ping_server(self):
        ...


@op
def interact_with_server(server: Server):
    server.ping_server()


@graph
def do_stuff():
    interact_with_server()

Then you build jobs from it using the GraphDefinition.to_job method:

from dagster import ResourceDefinition

prod_server = ResourceDefinition.mock_resource()
local_server = ResourceDefinition.mock_resource()

prod_job = do_stuff.to_job(resource_defs={"server": prod_server}, name="do_stuff_prod")
local_job = do_stuff.to_job(
    resource_defs={"server": local_server}, name="do_stuff_local"
)

to_job accepts the same arguments as the @job decorator: you can provide resources, configuration, hooks, tags, and an executor.


Configuring jobs#

Ops, software-defined assets, and resources often accept configuration that determines how they behave. By default, you supply configuration for these ops and resources at the time you launch the job.

When constructing a job, you can customize how that configuration will be satisfied, by passing a value to the config parameter of the GraphDefinition.to_job method or the @job decorator. The options are discussed below:

Hardcoded configuration#

You can supply a RunConfig object or raw config dictionary. The supplied config will be used to configure the job whenever the job is launched. It will show up in the UI Launchpad and can be overridden.

from dagster import Config, RunConfig, job, op


class DoSomethingConfig(Config):
    config_param: str


@op
def do_something(context, config: DoSomethingConfig):
    context.log.info("config_param: " + config.config_param)


default_config = RunConfig(
    ops={"do_something": DoSomethingConfig(config_param="stuff")}
)


@job(config=default_config)
def do_it_all_with_default_config():
    do_something()


if __name__ == "__main__":
    # Will log "config_param: stuff"
    do_it_all_with_default_config.execute_in_process()

Partitioned configuration#

For op-based jobs, you can supply a PartitionedConfig to create a partitioned job. This defines a discrete set of partitions along with a function for generating config for a partition. Job runs can be configured by selecting a partition.

Refer to the Partitions documentation for more info and examples.

Config mapping#

You can supply a ConfigMapping. This allows you to expose a narrower config interface to your job. Instead of needing to configure every op and resource individually when launching the job, you can supply a smaller number of values to the outer config, and the ConfigMapping can translate it into config for all the job's ops and resources.

from dagster import Config, RunConfig, config_mapping, job, op


class DoSomethingConfig(Config):
    config_param: str


@op
def do_something(context, config: DoSomethingConfig) -> None:
    context.log.info("config_param: " + config.config_param)


class SimplifiedConfig(Config):
    simplified_param: str


@config_mapping
def simplified_config(val: SimplifiedConfig) -> RunConfig:
    return RunConfig(
        ops={"do_something": DoSomethingConfig(config_param=val.simplified_param)}
    )


@job(config=simplified_config)
def do_it_all_with_simplified_config():
    do_something()


if __name__ == "__main__":
    # Will log "config_param: stuff"
    do_it_all_with_simplified_config.execute_in_process(
        run_config={"simplified_param": "stuff"}
    )

Making jobs available to Dagster tools#

You make jobs available to the UI, GraphQL, and the command line by including them in a Definitions object at the top level of a Python module or file. The tool loads that module as a code location. If you include schedules or sensors, the code location will automatically include the jobs that those schedules or sensors target.

from dagster import Definitions, job


@job
def do_it_all():
    ...


defs = Definitions(jobs=[do_it_all])

Testing jobs#

Dagster has built-in support for testing, including separating business logic from environments and setting explicit expectations on uncontrollable inputs. Refer to the Testing guide for more info and examples.


Executing jobs#

You can run a job in a variety of ways:

  • In the Python process where it's defined
  • Via the command line
  • Via the GraphQL API
  • In the UI. The UI centers on jobs, making it a one-stop shop: you can manually kick off runs for a job and view all historical runs.

Refer to the Job execution guide for more info and examples.

