Source code for dagster_dbt.core.resources_v2

import json
import os
import shutil
import subprocess
import sys
import time
from contextlib import suppress
from dataclasses import dataclass
from pathlib import Path
from typing import (
    Any,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Sequence,
    Union,
)

import dateutil.parser
from dagster import (
    AssetObservation,
    AssetsDefinition,
    ConfigurableResource,
    OpExecutionContext,
    Output,
    _check as check,
    get_dagster_logger,
)
from dagster._core.errors import DagsterInvalidPropertyError
from dbt.contracts.results import NodeStatus
from dbt.node_types import NodeType
from pydantic import Field
from typing_extensions import Literal

from ..asset_utils import get_manifest_and_translator_from_dbt_assets, output_name_fn
from ..dagster_dbt_translator import DagsterDbtTranslator
from ..errors import DagsterDbtCliRuntimeError
from ..utils import ASSET_RESOURCE_TYPES, get_node_info_by_dbt_unique_id_from_manifest

logger = get_dagster_logger()


PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"


@dataclass
class DbtCliEventMessage:
    """The representation of a dbt CLI event.

    Args:
        raw_event (Dict[str, Any]): The raw event dictionary.
            See https://docs.getdbt.com/reference/events-logging#structured-logging for more
            information.
    """

    raw_event: Dict[str, Any]

    @classmethod
    def from_log(cls, log: str) -> "DbtCliEventMessage":
        """Parse an event according to https://docs.getdbt.com/reference/events-logging#structured-logging.

        We assume that the log format is json.
        """
        raw_event: Dict[str, Any] = json.loads(log)

        return cls(raw_event=raw_event)

    def __str__(self) -> str:
        return self.raw_event["info"]["msg"]

    def to_default_asset_events(
        self,
        manifest: Mapping[str, Any],
        dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(),
    ) -> Iterator[Union[Output, AssetObservation]]:
        """Convert a dbt CLI event to a set of corresponding Dagster events.

        Args:
            manifest (Mapping[str, Any]): The dbt manifest blob.
            dagster_dbt_translator (DagsterDbtTranslator): Optionally, a custom translator for
                linking dbt nodes to Dagster assets.

        Returns:
            Iterator[Union[Output, AssetObservation]]: A set of corresponding Dagster events.
                - AssetMaterializations for refables (e.g. models, seeds, snapshots.)
                - AssetObservations for test results.
        """
        event_node_info: Dict[str, Any] = self.raw_event["data"].get("node_info")
        if not event_node_info:
            return

        unique_id: str = event_node_info["unique_id"]
        node_resource_type: str = event_node_info["resource_type"]
        node_status: str = event_node_info["node_status"]

        is_node_successful = node_status == NodeStatus.Success
        is_node_finished = bool(event_node_info.get("node_finished_at"))
        if node_resource_type in NodeType.refable() and is_node_successful:
            started_at = dateutil.parser.isoparse(event_node_info["node_started_at"])
            finished_at = dateutil.parser.isoparse(event_node_info["node_finished_at"])
            duration_seconds = (finished_at - started_at).total_seconds()

            yield Output(
                value=None,
                output_name=output_name_fn(event_node_info),
                metadata={
                    "unique_id": unique_id,
                    "Execution Duration": duration_seconds,
                },
            )
        elif node_resource_type == NodeType.Test and is_node_finished:
            upstream_unique_ids: List[str] = manifest["parent_map"][unique_id]

            for upstream_unique_id in upstream_unique_ids:
                upstream_node_info: Dict[str, Any] = manifest["nodes"].get(
                    upstream_unique_id
                ) or manifest["sources"].get(upstream_unique_id)
                upstream_asset_key = dagster_dbt_translator.get_asset_key(upstream_node_info)

                yield AssetObservation(
                    asset_key=upstream_asset_key,
                    metadata={
                        "unique_id": unique_id,
                        "status": node_status,
                    },
                )


@dataclass
class DbtCliInvocation:
    """The representation of an invoked dbt command.

    Args:
        process (subprocess.Popen): The process running the dbt command.
        manifest (Mapping[str, Any]): The dbt manifest blob.
        project_dir (Path): The path to the dbt project.
        target_path (Path): The path to the dbt target folder.
        raise_on_error (bool): Whether to raise an exception if the dbt command fails.
    """

    process: subprocess.Popen
    manifest: Mapping[str, Any]
    dagster_dbt_translator: DagsterDbtTranslator
    project_dir: Path
    target_path: Path
    raise_on_error: bool

    @classmethod
    def run(
        cls,
        args: List[str],
        env: Dict[str, str],
        manifest: Mapping[str, Any],
        dagster_dbt_translator: DagsterDbtTranslator,
        project_dir: Path,
        target_path: Path,
        raise_on_error: bool,
    ) -> "DbtCliInvocation":
        # Attempt to take advantage of partial parsing. If there is a `partial_parse.msgpack` in
        # in the target folder, then copy it to the dynamic target path.
        #
        # This effectively allows us to skip the parsing of the manifest, which can be expensive.
        # See https://docs.getdbt.com/reference/programmatic-invocations#reusing-objects for more
        # details.
        partial_parse_file_path = project_dir.joinpath("target", PARTIAL_PARSE_FILE_NAME)
        partial_parse_destination_target_path = target_path.joinpath(PARTIAL_PARSE_FILE_NAME)

        if partial_parse_file_path.exists():
            logger.info(
                f"Copying `{partial_parse_file_path}` to `{partial_parse_destination_target_path}`"
                " to take advantage of partial parsing."
            )

            partial_parse_destination_target_path.parent.mkdir(parents=True)
            shutil.copy(partial_parse_file_path, partial_parse_destination_target_path)

        # Create a subprocess that runs the dbt CLI command.
        logger.info(f"Running dbt command: `{' '.join(args)}`.")
        process = subprocess.Popen(
            args=args,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=env,
            cwd=project_dir,
        )

        return cls(
            process=process,
            manifest=manifest,
            dagster_dbt_translator=dagster_dbt_translator,
            project_dir=project_dir,
            target_path=target_path,
            raise_on_error=raise_on_error,
        )

    def wait(self) -> Sequence[DbtCliEventMessage]:
        """Wait for the dbt CLI process to complete and return the events.

        Returns:
            Sequence[DbtCliEventMessage]: A sequence of events from the dbt CLI process.

        Examples:
            .. code-block:: python

                import json
                from dagster_dbt import DbtCliResource

                with open("path/to/manifest.json", "r") as f:
                    manifest = json.load(f)

                dbt = DbtCliResource(project_dir="/path/to/dbt/project")

                dbt_cli_task = dbt.cli(["run"], manifest=manifest)
                dbt_cli_task.wait()
        """
        return list(self.stream_raw_events())

    def is_successful(self) -> bool:
        """Return whether the dbt CLI process completed successfully.

        Returns:
            bool: True, if the dbt CLI process returns with a zero exit code, and False otherwise.

        Examples:
            .. code-block:: python

                import json
                from dagster_dbt import DbtCliResource

                with open("path/to/manifest.json", "r") as f:
                    manifest = json.load(f)

                dbt = DbtCliResource(project_dir="/path/to/dbt/project")

                dbt_cli_task = dbt.cli(["run"], manifest=manifest)
                dbt_cli_task.wait()

                if dbt_cli_task.is_successful():
                    ...
        """
        return self.process.wait() == 0

    def stream(self) -> Iterator[Union[Output, AssetObservation]]:
        """Stream the events from the dbt CLI process and convert them to Dagster events.

        Returns:
            Iterator[Union[Output, AssetObservation]]: A set of corresponding Dagster events.
                - Output for refables (e.g. models, seeds, snapshots.)
                - AssetObservations for test results.

        Examples:
            .. code-block:: python

                from pathlib import Path
                from dagster_dbt import DbtCliResource, dbt_assets

                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context, dbt: DbtCliResource):
                    yield from dbt.cli(["run"], context=context).stream()
        """
        for event in self.stream_raw_events():
            yield from event.to_default_asset_events(
                manifest=self.manifest, dagster_dbt_translator=self.dagster_dbt_translator
            )

    def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
        """Stream the events from the dbt CLI process.

        Returns:
            Iterator[DbtCliEventMessage]: An iterator of events from the dbt CLI process.
        """
        for raw_line in self.process.stdout or []:
            log: str = raw_line.decode().strip()
            try:
                event = DbtCliEventMessage.from_log(log=log)

                # Re-emit the logs from dbt CLI process into stdout.
                sys.stdout.write(str(event) + "\n")
                sys.stdout.flush()

                yield event
            except:
                # If we can't parse the log, then just emit it as a raw log.
                sys.stdout.write(log + "\n")
                sys.stdout.flush()

        # Ensure that the dbt CLI process has completed.
        if not self.is_successful() and self.raise_on_error:
            raise DagsterDbtCliRuntimeError(
                description=(
                    f"The dbt CLI process failed with exit code {self.process.returncode}. Check"
                    " the compute logs for the full information about the error."
                )
            )

    def get_artifact(
        self,
        artifact: Union[
            Literal["manifest.json"],
            Literal["catalog.json"],
            Literal["run_results.json"],
            Literal["sources.json"],
        ],
    ) -> Dict[str, Any]:
        """Retrieve a dbt artifact from the target path.

        See https://docs.getdbt.com/reference/artifacts/dbt-artifacts for more information.

        Args:
            artifact (Union[Literal["manifest.json"], Literal["catalog.json"], Literal["run_results.json"], Literal["sources.json"]]): The name of the artifact to retrieve.

        Returns:
            Dict[str, Any]: The artifact as a dictionary.

        Examples:
            .. code-block:: python

                import json
                from dagster_dbt import DbtCliResource

                with open("path/to/manifest.json", "r") as f:
                    manifest = json.load(f)

                dbt = DbtCliResource(project_dir="/path/to/dbt/project")

                dbt_cli_task = dbt.cli(["run"], manifest=manifest)
                dbt_cli_task.wait()

                # Retrieve the run_results.json artifact.
                if dbt_cli_task.is_successful():
                    run_results = dbt_cli_task.get_artifact("run_results.json")
        """
        artifact_path = self.target_path.joinpath(artifact)
        with artifact_path.open() as handle:
            return json.loads(handle.read())


[docs]class DbtCliResource(ConfigurableResource): """A resource used to execute dbt CLI commands. Attributes: project_dir (str): The path to the dbt project directory. This directory should contain a `dbt_project.yml`. See https://docs.getdbt.com/reference/dbt_project.yml for more information. global_config_flags (List[str]): A list of global flags configuration to pass to the dbt CLI invocation. See https://docs.getdbt.com/reference/global-configs for a full list of configuration. profile (Optional[str]): The profile from your dbt `profiles.yml` to use for execution. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information. target (Optional[str]): The target from your dbt `profiles.yml` to use for execution. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information. Examples: .. code-block:: python from dagster_dbt import DbtCliResource dbt = DbtCliResource( project_dir="/path/to/dbt/project", global_config_flags=["--no-use-colors"], profile="jaffle_shop", target="dev", ) """ project_dir: str = Field( ..., description=( "The path to your dbt project directory. This directory should contain a" " `dbt_project.yml`. See https://docs.getdbt.com/reference/dbt_project.yml for more" " information." ), ) global_config_flags: List[str] = Field( default=[], description=( "A list of global flags configuration to pass to the dbt CLI invocation. See" " https://docs.getdbt.com/reference/global-configs for a full list of configuration." ), ) profile: Optional[str] = Field( default=None, description=( "The profile from your dbt `profiles.yml` to use for execution. See" " https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more" " information." ), ) target: Optional[str] = Field( default=None, description=( "The target from your dbt `profiles.yml` to use for execution. See" " https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more" " information." ), ) def _get_unique_target_path(self, *, context: Optional[OpExecutionContext]) -> str: """Get a unique target path for the dbt CLI invocation. Args: context (Optional[OpExecutionContext]): The execution context. Returns: str: A unique target path for the dbt CLI invocation. """ current_unix_timestamp = str(int(time.time())) path = current_unix_timestamp if context: path = f"{context.op.name}-{context.run_id[:7]}-{current_unix_timestamp}" return f"target/{path}" def cli( self, args: List[str], *, raise_on_error: bool = True, manifest: Optional[Mapping[str, Any]] = None, dagster_dbt_translator: Optional[DagsterDbtTranslator] = None, context: Optional[OpExecutionContext] = None, ) -> DbtCliInvocation: """Execute a dbt command. Args: args (List[str]): The dbt CLI command to execute. raise_on_error (bool): Whether to raise an exception if the dbt CLI command fails. manifest (Optional[Mapping[str, Any]]): The dbt manifest blob. If an execution context from within `@dbt_assets` is provided to the context argument, then the manifest provided to `@dbt_assets` will be used. dagster_dbt_translator (Optional[DagsterDbtTranslator]): The translator to link dbt nodes to Dagster assets. If an execution context from within `@dbt_assets` is provided to the context argument, then the dagster_dbt_translator provided to `@dbt_assets` will be used. context (Optional[OpExecutionContext]): The execution context from within `@dbt_assets`. Returns: DbtCliInvocation: A task that can be used to retrieve the output of the dbt CLI command. Examples: .. code-block:: python from pathlib import Path from dagster_dbt import DbtCliResource, dbt_assets @dbt_assets(manifest=Path("target", "manifest.json")) def my_dbt_assets(context, dbt: DbtCliResource): yield from dbt.cli(["run"], context=context).stream() """ target_path = self._get_unique_target_path(context=context) env = { **os.environ.copy(), # Run dbt with unbuffered output. "PYTHONUNBUFFERED": "1", # The DBT_LOG_FORMAT environment variable must be set to `json`. We use this # environment variable to ensure that the dbt CLI outputs structured logs. "DBT_LOG_FORMAT": "json", # The DBT_TARGET_PATH environment variable is set to a unique value for each dbt # invocation so that artifact paths are separated. # See https://discourse.getdbt.com/t/multiple-run-results-json-and-manifest-json-files/7555 # for more information. "DBT_TARGET_PATH": target_path, } assets_def: Optional[AssetsDefinition] = None with suppress(DagsterInvalidPropertyError): assets_def = context.assets_def if context else None if context and assets_def is not None: manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets( [assets_def] ) selection_args = get_subset_selection_for_context( context=context, manifest=manifest, select=context.op.tags.get("dagster-dbt/select"), exclude=context.op.tags.get("dagster-dbt/exclude"), ) else: selection_args: List[str] = [] if manifest is None: check.failed( "Must provide a value for the manifest argument if not executing as part of" " @dbt_assets" ) dagster_dbt_translator = dagster_dbt_translator or DagsterDbtTranslator() # TODO: verify that args does not have any selection flags if the context and manifest # are passed to this function. profile_args: List[str] = [] if self.profile: profile_args = ["--profile", self.profile] if self.target: profile_args += ["--target", self.target] args = ["dbt"] + self.global_config_flags + args + profile_args + selection_args project_dir = Path(self.project_dir).resolve(strict=True) return DbtCliInvocation.run( args=args, env=env, manifest=manifest, dagster_dbt_translator=dagster_dbt_translator, project_dir=project_dir, target_path=project_dir.joinpath(target_path), raise_on_error=raise_on_error, )
def get_subset_selection_for_context( context: OpExecutionContext, manifest: Mapping[str, Any], select: Optional[str], exclude: Optional[str], ) -> List[str]: """Generate a dbt selection string to materialize the selected resources in a subsetted execution context. See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work. Args: context (OpExecutionContext): The execution context for the current execution step. select (Optional[str]): A dbt selection string to select resources to materialize. exclude (Optional[str]): A dbt selection string to exclude resources from materializing. Returns: List[str]: dbt CLI arguments to materialize the selected resources in a subsetted execution context. If the current execution context is not performing a subsetted execution, return CLI arguments composed of the inputed selection and exclusion arguments. """ default_dbt_selection = [] if select: default_dbt_selection += ["--select", select] if exclude: default_dbt_selection += ["--exclude", exclude] node_info_by_output_name = get_node_info_by_output_name(manifest) # TODO: this should be a property on the context if this is a permanent indicator for # determining whether the current execution context is performing a subsetted execution. is_subsetted_execution = len(context.selected_output_names) != len( context.assets_def.node_keys_by_output_name ) if not is_subsetted_execution: logger.info( "A dbt subsetted execution is not being performed. Using the default dbt selection" f" arguments `{default_dbt_selection}`." ) return default_dbt_selection selected_dbt_resources = [] for output_name in context.selected_output_names: node_info = node_info_by_output_name[output_name] # Explicitly select a dbt resource by its fully qualified name (FQN). # https://docs.getdbt.com/reference/node-selection/methods#the-file-or-fqn-method fqn_selector = f"fqn:{'.'.join(node_info['fqn'])}" selected_dbt_resources.append(fqn_selector) # Take the union of all the selected resources. # https://docs.getdbt.com/reference/node-selection/set-operators#unions union_selected_dbt_resources = ["--select"] + [" ".join(selected_dbt_resources)] logger.info( "A dbt subsetted execution is being performed. Overriding default dbt selection" f" arguments `{default_dbt_selection}` with arguments: `{union_selected_dbt_resources}`" ) return union_selected_dbt_resources def get_node_info_by_output_name(manifest: Mapping[str, Any]) -> Mapping[str, Mapping[str, Any]]: node_info_by_dbt_unique_id = get_node_info_by_dbt_unique_id_from_manifest(manifest) return { output_name_fn(node): node for node in node_info_by_dbt_unique_id.values() if node["resource_type"] in ASSET_RESOURCE_TYPES }