import json
import time
from contextlib import contextmanager
from typing import Any, Dict, Mapping, Optional
import dagster._check as check
import yaml
from dagster import ConfigurableResource, IAttachDifferentObjectToOpContext, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
from pydantic import Field
from .configs import define_dataproc_create_cluster_config
from .types import DataprocError
# Default upper bound, in seconds, on how long the polling helper waits.
TWENTY_MINUTES = 20 * 60
# Default pause, in seconds, between two consecutive polls.
DEFAULT_ITER_TIME_SEC = 5


class DataprocClient:
    """Builds a client to the dataproc API.

    Args:
        config: Mapping-like object (anything exposing ``.get``) that may carry the
            keys ``projectId``, ``region``, ``clusterName`` and ``cluster_config``.
            Missing keys are stored as ``None``.
    """

    def __init__(self, config):
        # Use Application Default Credentials to check the
        # GOOGLE_APPLICATION_CREDENTIALS environment variable
        # for the location of the service account key file.
        credentials = GoogleCredentials.get_application_default()

        # See https://github.com/googleapis/google-api-python-client/issues/299 for the
        # cache_discovery=False configuration below
        self.dataproc = build("dataproc", "v1", credentials=credentials, cache_discovery=False)

        self.config = config
        self.project_id = self.config.get("projectId")
        self.region = self.config.get("region")
        self.cluster_name = self.config.get("clusterName")
        self.cluster_config = self.config.get("cluster_config")

    @property
    def dataproc_clusters(self):
        """Handle on the ``projects.regions.clusters`` collection of the API client."""
        # Google APIs dynamically genned, so pylint pukes
        return self.dataproc.projects().regions().clusters()

    @property
    def dataproc_jobs(self):
        """Handle on the ``projects.regions.jobs`` collection of the API client."""
        # Google APIs dynamically genned, so pylint pukes
        return self.dataproc.projects().regions().jobs()

    def create_cluster(self):
        """Request cluster creation and poll until the cluster is usable.

        Raises:
            DataprocError: If the cluster reports the ``ERROR`` state, or if it has
                not reached ``RUNNING``/``UPDATING`` before the default polling
                timeout elapses.
        """
        request_body = {
            "projectId": self.project_id,
            "clusterName": self.cluster_name,
            "config": self.cluster_config,
        }
        self.dataproc_clusters.create(
            projectId=self.project_id,
            region=self.region,
            body=request_body,
        ).execute()

        def iter_fn():
            # TODO: Add logging
            # See: https://bit.ly/2UW5JaN
            status = self.get_cluster()["status"]
            # Fail fast on the error state instead of polling until the timeout,
            # mirroring how wait_for_job treats failed jobs.
            if status["state"] == "ERROR":
                raise DataprocError(f"Could not provision cluster -- status: {status!s}")
            return status["state"] in {"RUNNING", "UPDATING"}

        done = self._iter_and_sleep_until_ready(iter_fn)
        if not done:
            cluster = self.get_cluster()
            raise DataprocError(f"Could not provision cluster -- status: {cluster['status']!s}")

    def get_cluster(self):
        """Fetch and return the API description of the configured cluster."""
        return self.dataproc_clusters.get(
            projectId=self.project_id, region=self.region, clusterName=self.cluster_name
        ).execute()

    def delete_cluster(self):
        """Issue a delete request for the configured cluster and return the API response."""
        return self.dataproc_clusters.delete(
            projectId=self.project_id, region=self.region, clusterName=self.cluster_name
        ).execute()

    def submit_job(self, job_details):
        """Submit ``job_details`` as the request body of a job and return the API response."""
        return self.dataproc_jobs.submit(
            projectId=self.project_id, region=self.region, body=job_details
        ).execute()

    def get_job(self, job_id):
        """Fetch and return the API description of the job identified by ``job_id``."""
        return self.dataproc_jobs.get(
            projectId=self.project_id, region=self.region, jobId=job_id
        ).execute()

    def wait_for_job(self, job_id, wait_timeout=TWENTY_MINUTES):
        """Poll the status of ``job_id`` every 5 seconds until it is ``DONE``.

        Args:
            job_id: Identifier of the job to watch.
            wait_timeout: Maximum time to keep polling, in seconds.

        Raises:
            DataprocError: If the job ends up ``CANCELLED`` or ``ERROR``, or if it is
                not ``DONE`` once the timeout is exhausted.
        """
        # TODO: Add logging here print('Waiting for job ID {} to finish...'.format(job_id))

        def iter_fn():
            # See: https://bit.ly/2Lg2tHr
            status = self.get_job(job_id)["status"]

            # Handle exceptions
            if status["state"] in {"CANCELLED", "ERROR"}:
                raise DataprocError(f"Job error: {status!s}")

            return status["state"] == "DONE"

        done = self._iter_and_sleep_until_ready(iter_fn, max_wait_time_sec=wait_timeout)
        if not done:
            job = self.get_job(job_id)
            raise DataprocError(f"Job run timed out: {job['status']!s}")

    @staticmethod
    def _iter_and_sleep_until_ready(
        callable_fn, max_wait_time_sec=TWENTY_MINUTES, iter_time=DEFAULT_ITER_TIME_SEC
    ):
        """Call ``callable_fn`` repeatedly until it returns a truthy value.

        Args:
            callable_fn: Zero-argument callable that reports readiness.
            max_wait_time_sec: Polling budget in seconds; the number of attempts is
                ``max_wait_time_sec / iter_time``.
            iter_time: Seconds to sleep after each unsuccessful attempt.

        Returns:
            The truthy value returned by ``callable_fn`` as soon as one is seen,
            otherwise the last falsy result (``False`` if no attempt was made).
        """
        ready, curr_iter = False, 0
        max_iter = max_wait_time_sec / iter_time
        while curr_iter < max_iter:
            ready = callable_fn()
            if ready:
                # Return right away: sleeping after success only delays the caller.
                break
            time.sleep(iter_time)
            curr_iter += 1

        # Falsy if we ran up to max_iter without success
        return ready

    @contextmanager
    def cluster_context_manager(self):
        """Context manager allowing execution with a dataproc cluster.

        The cluster is created on entry and deleted on exit, including when the
        managed body raises.

        Example:
            .. code-block::

                with context.resources.dataproc.cluster_context_manager() as cluster:
                    # do stuff...
        """
        self.create_cluster()
        try:
            yield self
        finally:
            self.delete_cluster()
class DataprocResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
    """Resource for connecting to a Dataproc cluster.

    Example:
        .. code-block::

            @asset
            def my_asset(dataproc: DataprocResource):
                client = dataproc.get_client()
                # client is a dagster_gcp.DataprocClient
                with client.cluster_context_manager() as cluster:
                    ...
    """

    project_id: str = Field(
        description=(
            "Required. Project ID for the project which the client acts on behalf of. Will be"
            " passed when creating a dataset/job."
        )
    )
    region: str = Field(description="The GCP region.")
    cluster_name: str = Field(
        description=(
            "Required. The cluster name. Cluster names within a project must be unique. Names of"
            " deleted clusters can be reused."
        )
    )
    cluster_config_yaml_path: Optional[str] = Field(
        default=None,
        description=(
            "Full path to a YAML file containing cluster configuration. See"
            " https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig for"
            " configuration options. Only one of cluster_config_yaml_path,"
            " cluster_config_json_path, or cluster_config_dict may be provided."
        ),
    )
    cluster_config_json_path: Optional[str] = Field(
        default=None,
        description=(
            "Full path to a JSON file containing cluster configuration. See"
            " https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig for"
            " configuration options. Only one of cluster_config_yaml_path,"
            " cluster_config_json_path, or cluster_config_dict may be provided."
        ),
    )
    cluster_config_dict: Optional[Dict[str, Any]] = Field(
        default=None,
        description=(
            "Python dictionary containing cluster configuration. See"
            " https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig for"
            " configuration options. Only one of cluster_config_yaml_path,"
            " cluster_config_json_path, or cluster_config_dict may be provided."
        ),
    )

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        return True

    def _read_yaml_config(self, path: str) -> Mapping[str, Any]:
        """Parse the YAML file at ``path`` with ``yaml.safe_load`` and return the result."""
        with open(path, "r", encoding="utf8") as f:
            return yaml.safe_load(f)

    def _read_json_config(self, path: str) -> Mapping[str, Any]:
        """Parse the JSON file at ``path`` and return the result."""
        with open(path, "r", encoding="utf8") as f:
            return json.load(f)

    def _get_cluster_config(self) -> Optional[Mapping[str, Any]]:
        """Resolve the cluster configuration from whichever source was supplied.

        Returns:
            The configuration loaded from the JSON path, the YAML path, or the inline
            dict (checked in that order), or ``None`` when no source is set.

        Raises:
            A dagster check failure (via ``check.invariant``) when more than one of
            the three sources is not ``None``.
        """
        sources = (
            self.cluster_config_dict,
            self.cluster_config_json_path,
            self.cluster_config_yaml_path,
        )
        methods = sum(source is not None for source in sources)

        # ensure that at most 1 method is provided
        check.invariant(
            methods <= 1,
            (
                "Dataproc Resource: Incorrect config: Cannot provide cluster config multiple ways."
                " Choose one of cluster_config_dict, cluster_config_json_path, or"
                " cluster_config_yaml_path"
            ),
        )

        # Selection is by truthiness: an empty path or empty dict counts towards the
        # check above but is treated as "not provided" here.
        if self.cluster_config_json_path:
            return self._read_json_config(self.cluster_config_json_path)
        if self.cluster_config_yaml_path:
            return self._read_yaml_config(self.cluster_config_yaml_path)
        if self.cluster_config_dict:
            return self.cluster_config_dict
        return None

    def get_client(self) -> DataprocClient:
        """Build a :class:`DataprocClient` configured from this resource's fields."""
        client_config_dict = {
            "projectId": self.project_id,
            "region": self.region,
            "clusterName": self.cluster_name,
            "cluster_config": self._get_cluster_config(),
        }
        return DataprocClient(config=client_config_dict)

    def get_object_to_set_on_execution_context(self) -> Any:
        """Return the object attached to the execution context: a fresh client."""
        return self.get_client()
@dagster_maintained_resource
@resource(
    config_schema=define_dataproc_create_cluster_config(),
    description="Manage a Dataproc cluster resource",
)
def dataproc_resource(context):
    """Function-style resource that builds a ``DataprocClient`` from the resource config.

    Args:
        context: Resource initialization context; its ``resource_config`` is passed
            unchanged to ``DataprocClient``.

    Returns:
        A ``DataprocClient`` constructed from ``context.resource_config``.
    """
    return DataprocClient(context.resource_config)