Source code for dagster_dbt.dagster_dbt_translator

from dataclasses import dataclass
from typing import Any, Mapping, Optional

from dagster import AssetKey
from dagster._core.definitions.events import (
    CoercibleToAssetKeyPrefix,
    check_opt_coercible_to_asset_key_prefix_param,
)

from .asset_utils import (
    default_asset_key_fn,
    default_description_fn,
    default_group_from_dbt_resource_props,
    default_metadata_from_dbt_resource_props,
)


class DagsterDbtTranslator:
    """Holds a set of methods that derive Dagster asset definition metadata given a
    representation of a dbt resource (models, tests, sources, etc).

    This class is exposed so that methods can be overridden to customize how Dagster
    asset metadata is derived. Every method here simply delegates to the matching
    default helper imported from ``asset_utils``.
    """

    @classmethod
    def get_asset_key(cls, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """A function that takes a dictionary representing properties of a dbt resource,
        and returns the Dagster asset key that represents that resource.

        Note that a dbt resource is unrelated to Dagster's resource concept, and simply
        represents a model, seed, snapshot or source in a given dbt project. You can
        learn more about dbt resources and the properties available in this dictionary
        here: https://docs.getdbt.com/reference/artifacts/manifest-json

        This method can be overridden to provide a custom asset key for a dbt resource.

        Args:
            dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt
                resource.

        Returns:
            AssetKey: The Dagster asset key for the dbt resource.

        Examples:
            .. code-block:: python

                from typing import Any, Mapping

                from dagster import AssetKey
                from dagster_dbt import DagsterDbtTranslator


                class CustomDagsterDbtTranslator(DagsterDbtTranslator):
                    @classmethod
                    def get_asset_key(cls, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
                        return AssetKey([dbt_resource_props["alias"]]).with_prefix("prefix")
        """
        return default_asset_key_fn(dbt_resource_props)

    @classmethod
    def get_description(cls, dbt_resource_props: Mapping[str, Any]) -> str:
        """A function that takes a dictionary representing properties of a dbt resource,
        and returns the Dagster description for that resource.

        Note that a dbt resource is unrelated to Dagster's resource concept, and simply
        represents a model, seed, snapshot or source in a given dbt project. You can
        learn more about dbt resources and the properties available in this dictionary
        here: https://docs.getdbt.com/reference/artifacts/manifest-json

        This method can be overridden to provide a custom description for a dbt resource.

        Args:
            dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt
                resource.

        Returns:
            str: The description for the dbt resource.

        Examples:
            .. code-block:: python

                from typing import Any, Mapping

                from dagster_dbt import DagsterDbtTranslator


                class CustomDagsterDbtTranslator(DagsterDbtTranslator):
                    @classmethod
                    def get_description(cls, dbt_resource_props: Mapping[str, Any]) -> str:
                        return "custom description"
        """
        return default_description_fn(dbt_resource_props)

    @classmethod
    def get_metadata(cls, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """A function that takes a dictionary representing properties of a dbt resource,
        and returns the Dagster metadata for that resource.

        Note that a dbt resource is unrelated to Dagster's resource concept, and simply
        represents a model, seed, snapshot or source in a given dbt project. You can
        learn more about dbt resources and the properties available in this dictionary
        here: https://docs.getdbt.com/reference/artifacts/manifest-json

        This method can be overridden to provide custom metadata for a dbt resource.

        Args:
            dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt
                resource.

        Returns:
            Mapping[str, Any]: A dictionary representing the Dagster metadata for the
                dbt resource.

        Examples:
            .. code-block:: python

                from typing import Any, Mapping

                from dagster_dbt import DagsterDbtTranslator


                class CustomDagsterDbtTranslator(DagsterDbtTranslator):
                    @classmethod
                    def get_metadata(cls, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
                        return {"custom": "metadata"}
        """
        return default_metadata_from_dbt_resource_props(dbt_resource_props)

    @classmethod
    def get_group_name(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """A function that takes a dictionary representing properties of a dbt resource,
        and returns the Dagster group name for that resource.

        Note that a dbt resource is unrelated to Dagster's resource concept, and simply
        represents a model, seed, snapshot or source in a given dbt project. You can
        learn more about dbt resources and the properties available in this dictionary
        here: https://docs.getdbt.com/reference/artifacts/manifest-json

        This method can be overridden to provide a custom group name for a dbt resource.

        Args:
            dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt
                resource.

        Returns:
            Optional[str]: A Dagster group name.

        Examples:
            .. code-block:: python

                from typing import Any, Mapping, Optional

                from dagster_dbt import DagsterDbtTranslator


                class CustomDagsterDbtTranslator(DagsterDbtTranslator):
                    @classmethod
                    def get_group_name(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
                        group = dbt_resource_props.get("config", {}).get("group")
                        # Only build a name when the dbt resource declares a group.
                        return f"custom_group_prefix_{group}" if group else None
        """
        return default_group_from_dbt_resource_props(dbt_resource_props)
class KeyPrefixDagsterDbtTranslator(DagsterDbtTranslator):
    """Translator variant that prepends a configurable prefix to generated asset keys.

    Attributes:
        asset_key_prefix (Optional[Union[str, Sequence[str]]]): Prefix applied to the
            keys of every dbt resource that is not a source (models, seeds,
            snapshots, etc.).
        source_asset_key_prefix (Optional[Union[str, Sequence[str]]]): Prefix applied
            to the keys of dbt sources only.
    """

    def __init__(
        self,
        asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
        source_asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
    ):
        # Validate each prefix, then fall back to an empty list when the checked
        # value is falsy (e.g. no prefix was supplied).
        checked_prefix = check_opt_coercible_to_asset_key_prefix_param(
            asset_key_prefix, "asset_key_prefix"
        )
        self._asset_key_prefix = checked_prefix or []

        checked_source_prefix = check_opt_coercible_to_asset_key_prefix_param(
            source_asset_key_prefix, "source_asset_key_prefix"
        )
        self._source_asset_key_prefix = checked_source_prefix or []

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Return the default asset key for the dbt resource with a prefix applied.

        Args:
            dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt
                resource; its ``"resource_type"`` entry selects which prefix is used.

        Returns:
            AssetKey: The default key with the source prefix (for ``"source"``
                resources) or the general prefix (for everything else) prepended.
        """
        unprefixed_key = default_asset_key_fn(dbt_resource_props)
        is_source = dbt_resource_props["resource_type"] == "source"
        chosen_prefix = self._source_asset_key_prefix if is_source else self._asset_key_prefix
        return unprefixed_key.with_prefix(chosen_prefix)


@dataclass
class DbtManifestWrapper:
    """Dataclass that wraps a single manifest mapping."""

    # The wrapped manifest, stored as a string-keyed mapping.
    manifest: Mapping[str, Any]