Resources#

This guide covers using the new Pythonic resources system introduced in Dagster 1.3. If your code is still using the legacy resources system, see the legacy resources guide. To migrate your code, refer to the migrating to Pythonic resources and config guide.

Resources are objects that are shared across the implementations of multiple software-defined assets, ops, schedules, and sensors. These resources can be plugged in after the definitions of your assets or ops, and can be easily swapped out.

Resources typically model external components that assets and ops interact with. For example, a resource might be a connection to a data warehouse like Snowflake or a service like Slack.

So, why use resources?

  • Plug in different implementations in different environments - If you have a heavy external dependency that you want to use in production, but avoid using in testing, you can accomplish this by providing different resources in each environment. Check out Separating Business Logic from Environments for more info about this capability.
  • Surface configuration in the UI - Resources and their configuration are surfaced in the Dagster UI, making it easy to see where your resources are used and how they are configured.
  • Share configuration across multiple ops or assets - Resources are configurable and shared, so you can supply configuration in one place instead of configuring the ops and assets individually.
  • Share implementations across multiple ops or assets - When multiple ops access the same external services, resources provide a standard way to structure your code to share the implementations.

Relevant APIs#

NameDescription
ConfigurableResourceThe base class extended to define resources. Under the hood, implements ResourceDefinition.
ResourceParamAn annotation used to specify that a plain Python object parameter for an op or asset is a resource.
ResourceDefinitionClass for resource definitions. You almost never want to use initialize this class directly. Instead, you should extend the ConfigurableResource class which implements ResourceDefinition.
InitResourceContextThe context object provided to a resource during initialization. This object contains required resource, config, and other run information.
build_init_resource_contextFunction for building an InitResourceContext outside of execution, intended to be used when testing a resource.
build_resourcesFunction for initializing a set of resources outside of the context of a job's execution.
with_resourcesAdvanced API for providing resources to a specific set of software-defined assets and source assets, overriding those provided to Definitions.

Defining a resource#

Typically, resources are defined by subclassing ConfigurableResource. Attributes on the class are used to define the resource's configuration schema. The configuration system has a few advantages over plain Python parameter passing; configured values are displayed in the Dagster UI and can be set dynamically using environment variables. Binding resource config values can also be delayed so that they can be specified at run launch time.

Assets and ops specify resource dependencies by annotating the resource as a parameter to the asset or op function.

To provide resource values to your assets and ops, attach them to your Definitions call. These resources are automatically passed to the function at runtime.

Using software-defined assets#

Here, we define a subclass of ConfigurableResource representing a connection to an external service. We can configure the resource by constructing it in the Definitions call.

We can define methods on the resource class which depend on config values. These methods can be used by assets and ops.

from dagster import asset, Definitions, ConfigurableResource
import requests
from requests import Response

class MyConnectionResource(ConfigurableResource):
    username: str

    def request(self, endpoint: str) -> Response:
        return requests.get(
            f"https://my-api.com/{endpoint}",
            headers={"user-agent": "dagster"},
        )

@asset
def data_from_service(my_conn: MyConnectionResource) -> Dict[str, Any]:
    return my_conn.request("/fetch_data").json()

defs = Definitions(
    assets=[data_from_service],
    resources={
        "my_conn": MyConnectionResource(username="my_user"),
    },
)

There are many supported config types that can be used when defining resources. See the advanced config types documentation for a more comprehensive overview on the available config types.

Using resources with sensors#

Sensors can use resources in the same way as ops and assets, which can be useful for querying external services for data.

To specify resource dependencies on a sensor, annotate the resource type as a parameter to the sensor's function. For more information and examples, refer to the Sensors documentation.

Using resources with schedules#

Schedules can also use resources in case your schedule logic needs to interface with an external tool, or to make your schedule logic more testable.

To specify resource dependencies on a schedule, annotate the resource type as a parameter to the schedule's function. For more information and examples, refer to the Schedules documentation.

Using environment variables with resources#

Resources can be configured using environment variables, which is useful for secrets or other environment-specific configuration. If you're using Dagster Cloud, environment variables can be configured directly in the UI.

To use environment variables, pass an EnvVar when constructing your resource. EnvVar inherits from str and can be used to populate any string config field on a resource. The value of the environment variable will be evaluated at launch time.

from dagster import EnvVar, Definitions, ConfigurableResource

class CredentialsResource(ConfigurableResource):
    username: str
    password: str

defs = Definitions(
    assets=...,
    resources={
        "credentials": CredentialsResource(
            username=EnvVar("MY_USERNAME"),
            password=EnvVar("MY_PASSWORD"),
        )
    },
)

For more information on using environment variables with Dagster, refer to the Environment variables guide.

Configuring resources at launch time#

In some cases, you may want to specify configuration for a resource at launch time, in the launchpad or in a RunRequest for a schedule or sensor. For example, you may want a sensor-triggered run to specify a different target table in a database resource for each run.

You can use the configure_at_launch() method to defer the construction of a configurable resource until launch time.

from dagster import ConfigurableResource, Definitions, asset

class DatabaseResource(ConfigurableResource):
    table: str

    def read(self):
        ...

@asset
def data_from_database(db_conn: DatabaseResource):
    return db_conn.read()

defs = Definitions(
    assets=[data_from_database],
    resources={"db_conn": DatabaseResource.configure_at_launch()},
)

Providing resource launch time configuration in Python code#

Then, configuration for the resource can be provided at launch time in the launchpad or in Python code using the config parameter of the RunRequest:

from dagster import sensor, define_asset_job, RunRequest, RunConfig

update_data_job = define_asset_job(
    name="update_data_job", selection=[data_from_database]
)

@sensor(job=update_data_job)
def table_update_sensor():
    tables = ...
    for table_name in tables:
        yield RunRequest(
            run_config=RunConfig(
                resources={
                    "db_conn": DatabaseResource(table=table_name),
                },
            ),
        )

Resources which depend on other resources#

In some situations, you may want to define a resource which depends on other resources. This is useful for common configuration. For example, separate resources for a database and for a filestore may both depend on credentials for a particular cloud provider. Defining these credentials as a separate, nested resource allows you to specify configuration in a single place. It also makes it easier to test your resources, since you can mock the nested resource.

In this case, you can list that nested resource as an attribute of your resource class.

from dagster import Definitions, ConfigurableResource

class CredentialsResource(ConfigurableResource):
    username: str
    password: str

class FileStoreBucket(ConfigurableResource):
    credentials: CredentialsResource
    region: str

    def write(self, data: str):
        get_filestore_client(
            username=self.credentials.username,
            password=self.credentials.password,
            region=self.region,
        ).write(data)

defs = Definitions(
    assets=[my_asset],
    resources={
        "bucket": FileStoreBucket(
            credentials=CredentialsResource(
                username="my_user", password="my_password"
            ),
            region="us-east-1",
        ),
    },
)

If we instead would like the configuration for our credentials to be provided at launch time, we can use the configure_at_launch() method to defer the construction of the CredentialsResource until launch time.

Because credentials requires launch time configuration through the launchpad, it must also be passed to the Definitions object, so that configuration can be provided at launch time. Nested resources only need to be passed to the Definitions object if they require launch time configuration.

credentials = CredentialsResource.configure_at_launch()

defs = Definitions(
    assets=[my_asset],
    resources={
        "credentials": credentials,
        "bucket": FileStoreBucket(
            credentials=credentials,
            region="us-east-1",
        ),
    },
)

Managing state in resources#

Once a resource reaches a certain complexity, it may be desirable to manage the state of the resource over its lifetime. This is useful for resources which require special initilization or cleanup. ConfigurableResource is a dataclass meant to encapsulate config, but can also be used to set up basic state.

You can mark any private state attributes using Pydantic's PrivateAttr. These attributes, which must start with an underscore, will not be included in the resource's config.

The setup_for_execution and teardown_after_execution methods can be overridden to initialize or teardown a resource before each run execution, and are free to modify any private state attributes.

In this instance, we can setup an API token for a client resource based on the username and password provided in the config. We then use that API token to query an API in our asset body.

from dagster import ConfigurableResource, asset
import requests

from pydantic import PrivateAttr

class MyClientResource(ConfigurableResource):
    username: str
    password: str

    _api_token: str = PrivateAttr()

    def setup_for_execution(self, context) -> None:
        # Fetch and set up an API token based on the username and password
        self._api_token = requests.get(
            "https://my-api.com/token", auth=(self.username, self.password)
        ).text

    def get_all_users(self):
        return requests.get(
            "https://my-api.com/users",
            headers={"Authorization": self._api_token},
        )

@asset
def my_asset(client: MyClientResource):
    return client.get_all_users()

setup_for_execution and teardown_after_execution are each called once per run, per process. When using the in-process executor, this means that they will be called once per run. When using the multiprocess executor, each process' instance of the resource will be initialized and torn down.

For more complex use-cases, you may instead override the yield_for_execution. By default, this context manager will call setup_for_execution, yield the resource, and then call teardown_after_execution, but you can override it to provide any custom behavior. This is useful for resources which require a context to be open for the duration of a run, such as database connections or file handles.

from dagster import ConfigurableResource, asset
from contextlib import contextmanager
from pydantic import PrivateAttr

class DBConnection:
    ...

    def query(self, body: str):
        ...

@contextmanager
def get_database_connection(username: str, password: str):
    ...

class MyClientResource(ConfigurableResource):
    username: str
    password: str

    _db_connection: DBConnection = PrivateAttr()

    @contextmanager
    def yield_for_execution(self, context):
        # keep connection open for the duration of the execution
        with get_database_connection(self.username, self.password) as conn:
            # set up the connection attribute so it can be used in the execution
            self._db_connection = conn

            # yield, allowing execution to occur
            yield self

    def query(self, body: str):
        return self._db_connection.query(body)

@asset
def my_asset(client: MyClientResource):
    client.query("SELECT * FROM my_table")

Defining Pythonic I/O managers#

Pythonic I/O managers are defined as subclasses of ConfigurableIOManager, and similarly to Pythonic resources specify any configuration fields as attributes. Each subclass must implement a handle_output and load_input method, which are called by Dagster at runtime to handle the storing and loading of data.

from dagster import (
    Definitions,
    AssetKey,
    OutputContext,
    InputContext,
    ConfigurableIOManager,
)

class MyIOManager(ConfigurableIOManager):
    root_path: str

    def _get_path(self, asset_key: AssetKey) -> str:
        return self.root_path + "/".join(asset_key.path)

    def handle_output(self, context: OutputContext, obj):
        write_csv(self._get_path(context.asset_key), obj)

    def load_input(self, context: InputContext):
        return read_csv(self._get_path(context.asset_key))

defs = Definitions(
    assets=...,
    resources={"io_manager": MyIOManager(root_path="/tmp/")},
)

Using bare Python objects as resources#

When starting to build a set of assets or jobs, you may want to use a bare Python object without configuration as a resource, such as a third-party API client.

Dagster supports passing plain Python objects as resources. This follows a similar pattern to using a ConfigurableResource subclass, however assets and ops which use these resources must annotate them with ResourceParam. This annotation lets Dagster know that the parameter is a resource and not an upstream input.

from dagster import Definitions, asset, ResourceParam

# `ResourceParam[GitHub]` is treated exactly like `GitHub` for type checking purposes,
# and the runtime type of the github parameter is `GitHub`. The purpose of the
# `ResourceParam` wrapper is to let Dagster know that `github` is a resource and not an
# upstream asset.

@asset
def public_github_repos(github: ResourceParam[GitHub]):
    return github.organization("dagster-io").repositories()

defs = Definitions(
    assets=[public_github_repos],
    resources={"github": GitHub(...)},
)

Testing configurable resources#

You can test the initialization of a ConfigurableResource by constructing it manually. In most cases, you can construct your resource directly:

from dagster import ConfigurableResource

class MyResource(ConfigurableResource):
    value: str

    def get_value(self) -> str:
        return self.value

def test_my_resource():
    assert MyResource(value="foo").get_value() == "foo"

If your resource requires other resources, then you can pass them as constructor arguments.

from dagster import ConfigurableResource

class StringHolderResource(ConfigurableResource):
    value: str

class MyResourceRequiresAnother(ConfigurableResource):
    foo: StringHolderResource
    bar: str

def test_my_resource_with_nesting():
    string_holder = StringHolderResource(value="foo")
    resource = MyResourceRequiresAnother(foo=string_holder, bar="bar")
    assert resource.foo.value == "foo"
    assert resource.bar == "bar"

Testing with resource context#

In the case that your resource makes use of the resource initialization context, you can use the build_init_resource_context utility alongside the with_init_resource_context helper on your resource class:

from dagster import (
    ConfigurableResource,
    build_init_resource_context,
    DagsterInstance,
)
from typing import Optional

class MyContextResource(ConfigurableResource[GitHub]):
    base_path: Optional[str] = None

    def effective_base_path(self) -> str:
        if self.base_path:
            return self.base_path
        instance = self.get_resource_context().instance
        assert instance
        return instance.storage_directory()

def test_my_context_resource():
    with DagsterInstance.ephemeral() as instance:
        context = build_init_resource_context(instance=instance)
        assert (
            MyContextResource(base_path=None)
            .with_resource_context(context)
            .effective_base_path()
            == instance.storage_directory()
        )

Next steps#

Resources are a powerful way to encapsulate reusable logic in your assets and ops. For more information on the supported config types for resources, see the advanced config types documentation. For information on the Dagster config system, which you can use to parameterize ops and assets, see the run configuration documentation.