# Source code for dagster_databricks.resources

from typing import Any, Optional

from dagster import (
    ConfigurableResource,
    IAttachDifferentObjectToOpContext,
    resource,
)
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from pydantic import Field

from .databricks import DatabricksClient


class DatabricksClientResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
    """Resource which provides a Python client for interacting with Databricks within an
    op or asset.
    """

    # NOTE(review): the class docstring and the Field descriptions below may be
    # surfaced by the framework as user-facing text -- confirm before rewording.

    # Connection settings; each is passed through unchanged to DatabricksClient.
    host: str = Field(description="Databricks host, e.g. uksouth.azuredatabricks.com")
    token: str = Field(description="Databricks access token")
    workspace_id: Optional[str] = Field(
        default=None,
        description=(
            "The Databricks workspace ID, as described in"
            " https://docs.databricks.com/workspace/workspace-details.html#workspace-instance-names-urls-and-ids."
            " This is used to log a URL for accessing the job in the Databricks UI."
        ),
    )

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        """Report this resource as Dagster-maintained (always ``True``)."""
        return True

    def get_client(self) -> DatabricksClient:
        """Construct a new ``DatabricksClient`` from this resource's settings.

        Returns:
            A client created with this resource's ``host``, ``token`` and
            ``workspace_id`` values.
        """
        client = DatabricksClient(
            host=self.host,
            token=self.token,
            workspace_id=self.workspace_id,
        )
        return client

    def get_object_to_set_on_execution_context(self) -> Any:
        """Return the object to attach to the execution context.

        Delegates to :meth:`get_client`, so the attached object is the client
        rather than this resource instance.
        """
        attached = self.get_client()
        return attached


@dagster_maintained_resource
@resource(config_schema=DatabricksClientResource.to_config_schema())
def databricks_client(init_context) -> DatabricksClient:
    """Function-style resource that yields a ``DatabricksClient``.

    The config schema is taken from ``DatabricksClientResource``, so this
    resource accepts the same settings as that class.

    Args:
        init_context: The resource initialization context, from which a
            ``DatabricksClientResource`` is built via ``from_resource_context``.

    Returns:
        The client produced by ``DatabricksClientResource.get_client()``.
    """
    configured = DatabricksClientResource.from_resource_context(init_context)
    return configured.get_client()