Source code for dagster_gcp.bigquery.resources
from contextlib import contextmanager
from typing import Any, Iterator, Optional
from dagster import ConfigurableResource, IAttachDifferentObjectToOpContext, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from google.cloud import bigquery
from pydantic import Field
from .utils import setup_gcp_creds
[docs]class BigQueryResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
"""Resource for interacting with Google BigQuery.
Examples:
.. code-block:: python
from dagster import Definitions, asset
from dagster_gcp import BigQueryResource
@asset
def my_table(bigquery: BigQueryResource):
with bigquery.get_client() as client:
client.query("SELECT * FROM my_dataset.my_table")
defs = Definitions(
assets=[my_table],
resources={
"bigquery": BigQueryResource(project="my-project")
}
)
"""
project: Optional[str] = Field(
default=None,
description=(
"Project ID for the project which the client acts on behalf of. Will be passed when"
" creating a dataset / job. If not passed, falls back to the default inferred from the"
" environment."
),
)
location: Optional[str] = Field(
default=None,
description="Default location for jobs / datasets / tables.",
)
gcp_credentials: Optional[str] = Field(
default=None,
description=(
"GCP authentication credentials. If provided, a temporary file will be created"
" with the credentials and ``GOOGLE_APPLICATION_CREDENTIALS`` will be set to the"
" temporary file. To avoid issues with newlines in the keys, you must base64"
" encode the key. You can retrieve the base64 encoded key with this shell"
" command: ``cat $GOOGLE_AUTH_CREDENTIALS | base64``"
),
)
@classmethod
def _is_dagster_maintained(cls) -> bool:
return True
@contextmanager
def get_client(self) -> Iterator[bigquery.Client]:
"""Context manager to create a BigQuery Client.
Examples:
.. code-block:: python
from dagster import asset
from dagster_gcp import BigQueryResource
@asset
def my_table(bigquery: BigQueryResource):
with bigquery.get_client() as client:
client.query("SELECT * FROM my_dataset.my_table")
"""
if self.gcp_credentials:
with setup_gcp_creds(self.gcp_credentials):
yield bigquery.Client(project=self.project, location=self.location)
else:
yield bigquery.Client(project=self.project, location=self.location)
def get_object_to_set_on_execution_context(self) -> Any:
with self.get_client() as client:
yield client
[docs]@dagster_maintained_resource
@resource(
config_schema=BigQueryResource.to_config_schema(),
description="Dagster resource for connecting to BigQuery",
)
def bigquery_resource(context):
bq_resource = BigQueryResource.from_resource_context(context)
with bq_resource.get_client() as client:
yield client