Using Dagster with Google BigQuery#

This tutorial focuses on how to store and load Dagster's software-defined assets (SDAs) in BigQuery.

By the end of the tutorial, you will:

  • Configure a BigQuery I/O manager
  • Create a table in BigQuery using a Dagster asset
  • Make a BigQuery table available in Dagster
  • Load BigQuery tables in downstream assets

This guide focuses on storing and loading Pandas DataFrames in BigQuery. Dagster also supports using PySpark DataFrames with BigQuery. The concepts from this guide apply to working with PySpark DataFrames, and you can learn more about setting up and using the BigQuery I/O manager with PySpark DataFrames in the reference guide.


Prerequisites#

To complete this tutorial, you'll need:

  • To install the dagster-gcp and dagster-gcp-pandas libraries:

    pip install dagster-gcp dagster-gcp-pandas
    
  • To gather the following information:

    • Google Cloud Platform (GCP) project name: You can find this by logging into GCP and choosing one of the project names listed in the dropdown in the top left corner.

    • GCP credentials: You can authenticate with GCP in two ways: by following the GCP authentication instructions, or by providing credentials directly to the BigQuery I/O manager.

      In this guide, we assume that you have run one of the gcloud auth commands or have set GOOGLE_APPLICATION_CREDENTIALS as specified in the linked instructions. For more information on providing credentials directly to the BigQuery I/O manager, see Providing credentials as configuration in the BigQuery reference guide.
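As a quick sanity check before continuing, the sketch below reports which local credential source, if any, is present. It is only an illustration: the helper name find_gcp_credentials is hypothetical, and the file path is where gcloud auth application-default login typically writes credentials on Linux and macOS, which is an assumption about your environment (the location differs on Windows).

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def find_gcp_credentials(
    env: Mapping[str, str] = os.environ,
    home: Optional[Path] = None,
) -> Optional[str]:
    """Describe the first local credential source found, or return None.

    Hypothetical helper for illustration; it is not part of Dagster or GCP.
    """
    # 1. A key file referenced by GOOGLE_APPLICATION_CREDENTIALS.
    key_file = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if key_file:
        return f"GOOGLE_APPLICATION_CREDENTIALS={key_file}"

    # 2. User credentials written by `gcloud auth application-default login`
    #    (assumed Linux/macOS location).
    home = home or Path.home()
    adc_file = home / ".config" / "gcloud" / "application_default_credentials.json"
    if adc_file.is_file():
        return f"gcloud application-default credentials at {adc_file}"

    return None


if __name__ == "__main__":
    print(find_gcp_credentials() or "No local GCP credentials found")
```

If the helper finds nothing, that does not necessarily mean authentication will fail (for example, code running on GCP infrastructure can obtain credentials another way), but on a local machine it is a hint that one of the authentication steps above was skipped.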


Step 1: Configure the BigQuery I/O manager#

The BigQuery I/O manager requires some configuration to connect to your BigQuery instance. The only required value is the project in which tables are stored.

You can also specify a location where data should be stored and processed, and the dataset that should hold the created tables. When working with Pandas DataFrames, you can also set a timeout and a number of retries.

from dagster_gcp_pandas import BigQueryPandasIOManager

from dagster import Definitions

defs = Definitions(
    assets=[iris_data],  # iris_data is defined in Step 2
    resources={
        "io_manager": BigQueryPandasIOManager(
            project="my-gcp-project",  # required
            location="us-east5",  # optional, defaults to the default location for the project - see https://cloud.google.com/bigquery/docs/locations for a list of locations
            dataset="IRIS",  # optional, defaults to PUBLIC
            timeout=15.0,  # optional, defaults to None
        )
    },
)

With this configuration, if you materialized an asset called iris_data, the BigQuery I/O manager would store the data in the IRIS.IRIS_DATA table in the my-gcp-project project. The data would be stored and processed in the us-east5 location.

In the Definitions object, we assign the BigQueryPandasIOManager to the io_manager key. io_manager is a reserved key that sets the default I/O manager for your assets.

For more info about each of the configuration values, refer to the BigQueryPandasIOManager API documentation.


Step 2: Create tables in BigQuery#

The BigQuery I/O manager can create and update tables for your Dagster-defined assets, but you can also make existing BigQuery tables available to Dagster.

Store a Dagster asset as a table in BigQuery#

To store data in BigQuery using the BigQuery I/O manager, the definitions of your assets don't need to change. You can tell Dagster to use the BigQuery I/O manager, as in Step 1: Configure the BigQuery I/O manager, and Dagster will handle storing and loading your assets in BigQuery.

import pandas as pd

from dagster import asset


@asset
def iris_data() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

In this example, we first define our asset. Here, we are fetching the Iris dataset as a Pandas DataFrame and renaming the columns. The type signature of the function tells the I/O manager what data type it is working with, so it is important to include the return type pd.DataFrame.

When Dagster materializes the iris_data asset using the configuration from Step 1: Configure the BigQuery I/O manager, the BigQuery I/O manager will create the table IRIS.IRIS_DATA if it does not exist and replace the contents of the table with the value returned from the iris_data asset.


Step 3: Load BigQuery tables in downstream assets#

Once you have created an asset or source asset that represents a table in BigQuery, you will likely want to create additional assets that work with the data. Dagster and the BigQuery I/O manager allow you to load the data stored in BigQuery tables into downstream assets.

import pandas as pd

from dagster import asset

# this example uses the iris_data asset from Step 2


@asset
def iris_cleaned(iris_data: pd.DataFrame) -> pd.DataFrame:
    return iris_data.dropna().drop_duplicates()

In this example, we want to provide the iris_data asset from the Store a Dagster asset as a table in BigQuery example to the iris_cleaned asset.

In iris_cleaned, the iris_data parameter tells Dagster that the value for the iris_data asset should be provided as input to iris_cleaned. If this feels too magical for you, refer to the docs for explicitly specifying dependencies.

When materializing these assets, Dagster will use the BigQueryPandasIOManager to fetch the IRIS.IRIS_DATA table as a Pandas DataFrame and pass this DataFrame as the iris_data parameter to iris_cleaned. When iris_cleaned returns a Pandas DataFrame, Dagster will use the BigQueryPandasIOManager to store the DataFrame as the IRIS.IRIS_CLEANED table in BigQuery.
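To see what the cleaning step does to the data without touching BigQuery, the same pandas calls can be run on a small hand-written DataFrame. The rows below are made up for illustration; only the column names come from the iris_data example.

```python
import pandas as pd

# Made-up sample rows using the same columns as iris_data.
sample = pd.DataFrame(
    {
        "sepal_length_cm": [5.1, 5.1, 4.9, None],
        "sepal_width_cm": [3.5, 3.5, 3.0, 3.2],
        "petal_length_cm": [1.4, 1.4, 1.4, 1.3],
        "petal_width_cm": [0.2, 0.2, 0.2, 0.2],
        "species": ["Iris-setosa"] * 4,
    }
)

# Same transformation as iris_cleaned: drop rows with missing values,
# then drop exact duplicate rows.
cleaned = sample.dropna().drop_duplicates()

print(len(sample), "->", len(cleaned))  # 4 -> 2
```

The last row is removed because of its missing sepal length, and the second row is removed as an exact duplicate of the first, leaving two rows that would be written to IRIS.IRIS_CLEANED.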


Completed code example#

When finished, your code should look like the following:

import pandas as pd
from dagster_gcp_pandas import BigQueryPandasIOManager

from dagster import Definitions, SourceAsset, asset

iris_harvest_data = SourceAsset(key="iris_harvest_data")


@asset
def iris_data() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


@asset
def iris_cleaned(iris_data: pd.DataFrame) -> pd.DataFrame:
    return iris_data.dropna().drop_duplicates()


defs = Definitions(
    assets=[iris_data, iris_harvest_data, iris_cleaned],
    resources={
        "io_manager": BigQueryPandasIOManager(
            project="my-gcp-project",
            location="us-east5",
            dataset="IRIS",
            timeout=15.0,
        )
    },
)

For more BigQuery features, refer to the BigQuery reference.

For more information on software-defined assets, refer to the tutorial or the Assets concept documentation.

For more information on I/O managers, refer to the I/O manager concept documentation.