Loggers#

As part of its rich, extensible logging system, Dagster includes loggers. Loggers can be applied to all jobs within a code location or, in advanced cases, overriden at the job level. Logging handlers will be automatically invoked whenever ops in a job log messages, meaning out-of-the-box loggers track all execution events. Loggers can also be customized to meet your specific needs.


Relevant APIs#

NameDescription
@loggerThe decorator used to define loggers. The decorator returns a LoggerDefinition .
LoggerDefinitionClass for loggers. You almost never want to use initialize this class directly. Instead, you should use the @logger decorator.
OpExecutionContextThe context object available to an op compute function.
InitLoggerContextThe context object passed to a custom logger's initialization function.
build_init_logger_contextA function to construct a InitLoggerContext outside of execution, used primarily for testing purposes.

Defining loggers#

By default, Dagster comes with a built-in logger that tracks all the execution events. You can find an example in the Using built-in loggers section.

The built-in loggers are defined internally using the LoggerDefinition class. The @logger decorator exposes a simpler API for the common logging use case, which is typically what you'll use to define your own loggers.

The decorated function should take a single argument, the init_context available during logger initialization, and return a logging.Logger. You can find an example in the Customizing loggers section.


Using loggers#

Logging from an op#

Any op can emit log messages at any point in its computation:

# demo_logger.py

from dagster import job, op


@op
def hello_logs(context):
    context.log.info("Hello, world!")


@job
def demo_job():
    hello_logs()

Using built-in loggers#

When you run the above job in the Terminal:

dagster job execute -f demo_logger.py -j demo_job

You'll find the messages have been logged through a built-in logger:

2022-12-15 14:29:33 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12151 - RUN_START - Started execution of run for "demo_job".
2022-12-15 14:29:33 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12151 - ENGINE_EVENT - Executing steps using multiprocess executor: parent process (pid: 12151)
2022-12-15 14:29:33 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12151 - hello_logs - STEP_WORKER_STARTING - Launching subprocess for "hello_logs".
2022-12-15 14:29:34 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12167 - STEP_WORKER_STARTED - Executing step "hello_logs" in subprocess.
2022-12-15 14:29:34 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12167 - hello_logs - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2022-12-15 14:29:34 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12167 - hello_logs - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2022-12-15 14:29:34 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12167 - LOGS_CAPTURED - Started capturing logs in process (pid: 12167).
2022-12-15 14:29:34 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12167 - hello_logs - STEP_START - Started execution of step "hello_logs".
2022-12-15 14:29:34 -0500 - dagster - INFO - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - hello_logs - Hello, world!
2022-12-15 14:29:34 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12167 - hello_logs - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2022-12-15 14:29:34 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - hello_logs - Writing file at: /Users/erincochran/Desktop/asset_tutorial/my_dagster_project/tmppe9yyr4b/storage/26304743-8f52-4342-950c-4f2cd5ee436c/hello_logs/result
2022-12-15 14:29:34 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12167 - hello_logs - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2022-12-15 14:29:34 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12167 - hello_logs - STEP_SUCCESS - Finished execution of step "hello_logs" in 25ms.
2022-12-15 14:29:34 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12151 - ENGINE_EVENT - Multiprocess executor: parent process exiting after 1.34s (pid: 12151)
2022-12-15 14:29:34 -0500 - dagster - DEBUG - demo_job - 26304743-8f52-4342-950c-4f2cd5ee436c - 12151 - RUN_SUCCESS - Finished execution of run for "demo_job".

The context object passed to every op execution includes the built-in log manager, context.log. It exposes the usual debug, info, warning, error, and critical methods you would expect anywhere else in Python.

When you run Dagster jobs in the Dagster UI, you'll notice that log messages are visible as colored messages in the console. For example:

  • Logs stream back to the the UI frontend in real time:

    Real time logs in the Dagster UI
  • Filtering log messages based on execution steps and log levels:

    Log filtering in the Dagster UI

Debugging using logs#

What happens if we introduce an error into our op logic?

# demo_logger_error.py

from dagster import job, op


@op
def hello_logs_error(context):
    raise Exception("Somebody set up us the bomb")


@job
def demo_job_error():
    hello_logs_error()

Errors in user code are caught by the Dagster machinery to ensure jobs gracefully halt or continue to execute, but messages including the original stack trace get logged both to the console and back to the UI.

Messages at level ERROR or above are highlighted both in the UI and in the console logs, so they can be easily identified even without filtering:

ERROR level in logs in the Dagster UI

In many cases, especially for local development, this log viewer, coupled with op reexecution, is sufficient to enable a fast debug cycle for job implementation.


Examples#

Configuring built-in loggers#

Suppose that we've gotten the kinks out of our jobs developing locally, and now we want to run in production—without all of the log spew from DEBUG messages that was helpful during development.

Just like ops, loggers can be configured when you run a job. For example, to filter all messages below ERROR out of the colored console logger, add the following snippet to your config YAML:

loggers:
  console:
    config:
      log_level: ERROR

When a job with the above config is executed, you'll only see the ERROR level logs.

Customizing loggers#

You may find yourself wanting to add or supplement the built-in loggers so that Dagster logs are integrated with the rest of your log aggregation and monitoring infrastructure.

For example, you may be operating in a containerized environment where container stdout is aggregated by a tool such as Logstash. In this kind of environment, where logs will be aggregated and parsed by machine, the multi-line output from the default colored console logger is unhelpful. Instead, we'd much prefer to see single-line, structured log messages like:

{"orig_message": "Hello, world!", "log_message_id": "49854579-e4d1-4289-8453-b3e177b20056", ...}

Good news - Dagster already includes a logger that prints JSON-formatted single-line messages like this to the console (json_console_logger). But let's look at how we might implement a simplified version of this logger.

Loggers are defined internally using the LoggerDefinition class, but, following a common pattern in the Dagster codebase, the @logger decorator exposes a simpler API for the common use case and is typically what you'll use to define your own loggers. The decorated function should take a single argument, the init_context available during logger initialization, and return a logging.Logger:

@logger(
    {
        "log_level": Field(str, is_required=False, default_value="INFO"),
        "name": Field(str, is_required=False, default_value="dagster"),
    },
    description="A JSON-formatted console logger",
)
def json_console_logger(init_context):
    level = init_context.logger_config["log_level"]
    name = init_context.logger_config["name"]

    klass = logging.getLoggerClass()
    logger_ = klass(name, level=level)

    handler = logging.StreamHandler()

    class JsonFormatter(logging.Formatter):
        def format(self, record):
            return json.dumps(record.__dict__)

    handler.setFormatter(JsonFormatter())
    logger_.addHandler(handler)

    return logger_


@op
def hello_logs(context):
    context.log.info("Hello, world!")


@job(logger_defs={"my_json_logger": json_console_logger})
def demo_job():
    hello_logs()

As you can see, you can specify the logger name in the run config. It also takes a config argument, representing the config that users can pass to the logger. For example:

loggers:
  my_json_logger:
    config:
      log_level: INFO

When you execute the job:

dagster job execute -f custom_logger.py -c config_custom_logger.yaml

You'll see that the custom JSON logger is now being used:

Custom logger Terminal output

Specifying default code location loggers#

This feature is experimental.

Defalt loggers can be specified on a Definitions object by supplying an object containing a logger to the loggers argument.

When specified, these loggers will be added to every job specified in the code location and asset materializations provided to the code location:

from dagster import Definitions, define_asset_job, asset


@asset
def some_asset():
    ...


the_job = define_asset_job("the_job", selection="*")


defs = Definitions(
    jobs=[the_job], assets=[some_asset], loggers={"json_logger": json_console_logger}
)

Note: If loggers are explicitly specified at the job level, they will override those provided to the Definitions object.

Testing custom loggers#

You can unit test the initialization method of a logger by invoking it:

def test_init_json_console_logger():
    logger_ = json_console_logger(None)
    assert logger_.level == 20
    assert logger_.name == "dagster"

If you need to provide config to the initialization of your logger, you can use the build_init_logger_context function:

from dagster import build_init_logger_context


def test_init_json_console_logger_with_context():
    logger_ = json_console_logger(
        build_init_logger_context(logger_config={"name": "my_logger"})
    )
    assert logger_.level == 20
    assert logger_.name == "my_logger"

Patterns#

Environment-specific logging using jobs#

Logging is environment-specific: you don't want messages generated by data scientists' local development loops to be aggregated with production messages. On the other hand, you may find that in production console logging is irrelevant or even counterproductive.

Dagster recognizes this by attaching loggers to jobs so that you can seamlessly switch from, e.g., Cloudwatch logging in production to console logging in development and test, without changing any of your code. For example:

@op
def log_op(context):
    context.log.info("Hello, world!")


@graph
def hello_logs():
    log_op()


local_logs = hello_logs.to_job(
    name="local_logs", logger_defs={"console": colored_console_logger}
)
prod_logs = hello_logs.to_job(
    name="prod_logs", logger_defs={"cloudwatch": cloudwatch_logger}
)

From the UI, you can view and execute the prod_logs job and edit config in order to use the new Cloudwatch logger, for example:

loggers:
  cloudwatch:
    config:
      log_level: ERROR
      log_group_name: /my/cool/cloudwatch/log/group
      log_stream_name: very_good_log_stream