Source code for dagster_docker.ops.docker_container_op

from typing import Any, Mapping, Optional, Sequence

import docker
from dagster import Field, In, Nothing, OpExecutionContext, StringSource, op
from dagster._annotations import experimental
from dagster._core.utils import parse_env_var
from dagster._serdes.utils import hash_str

from ..container_context import DockerContainerContext
from ..docker_run_launcher import DockerRunLauncher
from ..utils import DOCKER_CONFIG_SCHEMA, validate_docker_image

DOCKER_CONTAINER_OP_CONFIG = {
    **DOCKER_CONFIG_SCHEMA,
    "image": Field(
        StringSource,
        is_required=True,
        description="The image in which to run the Docker container.",
    ),
    "entrypoint": Field(
        [str],
        is_required=False,
        description="The ENTRYPOINT for the Docker container",
    ),
    "command": Field(
        [str],
        is_required=False,
        description="The command to run in the container within the launched Docker container.",
    ),
}


def _get_client(docker_container_context: DockerContainerContext):
    client = docker.client.from_env()
    if docker_container_context.registry:
        client.login(
            registry=docker_container_context.registry["url"],
            username=docker_container_context.registry["username"],
            password=docker_container_context.registry["password"],
        )
    return client


def _get_container_name(run_id, op_name, retry_number):
    container_name = hash_str(run_id + op_name)

    retry_number = retry_number
    if retry_number > 0:
        container_name = f"{container_name}-{retry_number}"

    return container_name


def _create_container(
    op_context: OpExecutionContext,
    client,
    container_context: DockerContainerContext,
    image: str,
    entrypoint: Optional[Sequence[str]],
    command: Optional[Sequence[str]],
):
    env_vars = dict([parse_env_var(env_var) for env_var in container_context.env_vars])
    return client.containers.create(
        image,
        name=_get_container_name(op_context.run_id, op_context.op.name, op_context.retry_number),
        detach=True,
        network=container_context.networks[0] if len(container_context.networks) else None,
        entrypoint=entrypoint,
        command=command,
        environment=env_vars,
        **container_context.container_kwargs,
    )


[docs]@experimental def execute_docker_container( context: OpExecutionContext, image: str, entrypoint: Optional[Sequence[str]] = None, command: Optional[Sequence[str]] = None, networks: Optional[Sequence[str]] = None, registry: Optional[Mapping[str, str]] = None, env_vars: Optional[Sequence[str]] = None, container_kwargs: Optional[Mapping[str, Any]] = None, ): """This function is a utility for executing a Docker container from within a Dagster op. Args: image (str): The image to use for the launched Docker container. entrypoint (Optional[Sequence[str]]): The ENTRYPOINT to run in the launched Docker container. Default: None. command (Optional[Sequence[str]]): The CMD to run in the launched Docker container. Default: None. networks (Optional[Sequence[str]]): Names of the Docker networks to which to connect the launched container. Default: None. registry: (Optional[Mapping[str, str]]): Information for using a non local/public Docker registry. Can have "url", "username", or "password" keys. env_vars (Optional[Sequence[str]]): List of environemnt variables to include in the launched container. ach can be of the form KEY=VALUE or just KEY (in which case the value will be pulled from the calling environment. container_kwargs (Optional[Dict[str[Any]]]): key-value pairs that can be passed into containers.create in the Docker Python API. See https://docker-py.readthedocs.io/en/stable/containers.html for the full list of available options. """ run_container_context = DockerContainerContext.create_for_run( context.dagster_run, context.instance.run_launcher if isinstance(context.instance.run_launcher, DockerRunLauncher) else None, ) validate_docker_image(image) op_container_context = DockerContainerContext( registry=registry, env_vars=env_vars, networks=networks, container_kwargs=container_kwargs ) container_context = run_container_context.merge(op_container_context) client = _get_client(container_context) try: container = _create_container( context, client, container_context, image, entrypoint, command ) except docker.errors.ImageNotFound: client.images.pull(image) container = _create_container( context, client, container_context, image, entrypoint, command ) if len(container_context.networks) > 1: for network_name in container_context.networks[1:]: network = client.networks.get(network_name) network.connect(container) container.start() for line in container.logs(stdout=True, stderr=True, stream=True, follow=True): print(line) # noqa: T201 exit_status = container.wait()["StatusCode"] if exit_status != 0: raise Exception(f"Docker container returned exit code {exit_status}")
[docs]@op(ins={"start_after": In(Nothing)}, config_schema=DOCKER_CONTAINER_OP_CONFIG) @experimental def docker_container_op(context): """An op that runs a Docker container using the docker Python API. Contrast with the `docker_executor`, which runs each Dagster op in a Dagster job in its own Docker container. This op may be useful when: - You need to orchestrate a command that isn't a Dagster op (or isn't written in Python) - You want to run the rest of a Dagster job using a specific executor, and only a single op in docker. For example: .. literalinclude:: ../../../../../../python_modules/libraries/dagster-docker/dagster_docker_tests/test_example_docker_container_op.py :start-after: start_marker :end-before: end_marker :language: python You can create your own op with the same implementation by calling the `execute_docker_container` function inside your own op. """ execute_docker_container(context, **context.op_config)