dagster-duckdb integration reference#

This reference page provides information for working with dagster-duckdb features that are not covered as part of the Using Dagster with DuckDB tutorial.


Selecting specific columns in a downstream asset#

Sometimes you may not want to fetch an entire table as the input to a downstream asset. With the DuckDB I/O manager, you can select specific columns to load by supplying metadata on the downstream asset.

import pandas as pd

from dagster import AssetIn, asset

# this example uses the iris_dataset asset from Step 2 of the Using Dagster with DuckDB tutorial


@asset(
    ins={
        "iris_sepal": AssetIn(
            key="iris_dataset",
            metadata={"columns": ["sepal_length_cm", "sepal_width_cm"]},
        )
    }
)
def sepal_data(iris_sepal: pd.DataFrame) -> pd.DataFrame:
    iris_sepal["sepal_area_cm2"] = (
        iris_sepal["sepal_length_cm"] * iris_sepal["sepal_width_cm"]
    )
    return iris_sepal

In this example, we only use the columns containing sepal data from the IRIS_DATASET table created in Step 2: Create tables in DuckDB of the Using Dagster with DuckDB tutorial. To select specific columns, we add metadata to the input asset: in the ins parameter, the AssetIn that loads the iris_dataset asset takes a metadata dictionary whose columns key holds a list of the names of the columns to fetch.

When Dagster materializes sepal_data and loads the iris_dataset asset using the DuckDB I/O manager, it will only fetch the sepal_length_cm and sepal_width_cm columns of the IRIS.IRIS_DATASET table and pass them to sepal_data as a Pandas DataFrame.
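
As a rough illustration of what the columns metadata changes, the sketch below builds the kind of column-restricted SELECT statement described above. The helper name build_select and its signature are hypothetical and are not part of dagster-duckdb; the I/O manager constructs its own query internally, and this only approximates the shape of the result.

```python
from typing import Optional, Sequence


def build_select(schema: str, table: str, columns: Optional[Sequence[str]] = None) -> str:
    """Hypothetical helper: approximate a column-restricted load query."""
    # Without "columns" metadata, every column of the table is fetched.
    column_list = ", ".join(columns) if columns else "*"
    return f"SELECT {column_list} FROM {schema}.{table}"


# Same column list as the metadata on the iris_sepal input.
query = build_select("IRIS", "IRIS_DATASET", ["sepal_length_cm", "sepal_width_cm"])
print(query)
```

Leaving the columns argument out of the sketch falls back to SELECT *, which corresponds to loading the entire table.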


Storing partitioned assets#

The DuckDB I/O manager supports storing and loading partitioned data. To correctly store and load data from the DuckDB table, the DuckDB I/O manager needs to know which column contains the data defining the partition bounds. The DuckDB I/O manager uses this information to construct the correct queries to select or replace the data.

In the following sections, we describe how the I/O manager constructs these queries for different types of partitions.

Storing static partitioned assets#

To store static partitioned assets in DuckDB, specify partition_expr metadata on the asset to tell the DuckDB I/O manager which column contains the partition data:

import pandas as pd

from dagster import StaticPartitionsDefinition, asset


@asset(
    partitions_def=StaticPartitionsDefinition(
        ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
    ),
    metadata={"partition_expr": "SPECIES"},
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
    species = context.asset_partition_key_for_output()

    full_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    # the column is named "species" (lowercase) in the names list above
    return full_df[full_df["species"] == species]


@asset
def iris_cleaned(iris_dataset_partitioned: pd.DataFrame):
    return iris_dataset_partitioned.dropna().drop_duplicates()

Dagster uses the partition_expr metadata to craft the SELECT statement when loading the partition in the downstream asset. When loading a static partition (or multiple static partitions), the following statement is used:

SELECT *
 FROM [schema].[table]
 WHERE [partition_expr] in ([selected partitions])

When the partition_expr value is injected into this statement, the resulting SQL query must follow DuckDB's SQL syntax. Refer to the DuckDB documentation for more information.

A partition must be selected when materializing the above assets, as described in the Materializing partitioned assets documentation. In this example, the query used when materializing the Iris-setosa partition of the above assets would be:

SELECT *
 FROM IRIS.IRIS_DATASET_PARTITIONED
 WHERE SPECIES in ('Iris-setosa')
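
To make the substitution concrete, here is a minimal sketch of how a partition_expr value and a set of selected static partition keys could be rendered into the WHERE clause shown above. The helper static_partition_where is hypothetical and is not the I/O manager's actual code; in particular, escaping embedded single quotes by doubling them is an assumption made for this sketch.

```python
from typing import Sequence


def static_partition_where(partition_expr: str, partitions: Sequence[str]) -> str:
    """Hypothetical helper: render the WHERE clause for static partitions."""
    # Quote each partition key as a SQL string literal, doubling embedded single quotes.
    quoted = ", ".join("'" + key.replace("'", "''") + "'" for key in partitions)
    return f"WHERE {partition_expr} in ({quoted})"


one = static_partition_where("SPECIES", ["Iris-setosa"])
many = static_partition_where("SPECIES", ["Iris-setosa", "Iris-virginica"])
print(one)
print(many)
```

Because partition_expr is inserted verbatim, any expression that is valid DuckDB SQL in that position can be used, not only a bare column name.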

Storing tables in multiple schemas#

You may want to have different assets stored in different DuckDB schemas. The DuckDB I/O manager allows you to specify the schema in several ways.

If you want all of your assets to be stored in the same schema, you can specify the schema as configuration to the I/O manager, as we did in Step 1: Configure the DuckDB I/O manager of the Using Dagster with DuckDB tutorial.

If you want to store assets in different schemas, you can specify the schema as part of the asset's asset key:

  • For SourceAsset, use the key parameter. The schema should be the second-to-last value in the parameter. In the following example, this would be daffodil.
  • For Software-defined Assets, use the key_prefix parameter. This value will be prepended to the asset name to create the full asset key. In the following example, this would be iris.

import pandas as pd

from dagster import SourceAsset, asset

daffodil_dataset = SourceAsset(key=["daffodil", "daffodil_dataset"])


@asset(key_prefix=["iris"])
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

In this example, the iris_dataset asset will be stored in the IRIS schema, and the daffodil_dataset asset will be found in the DAFFODIL schema.

The two options for specifying schema are mutually exclusive. If you provide schema configuration to the I/O manager, you cannot also provide it via the asset key and vice versa. If no schema is provided, either from configuration or asset keys, the default schema PUBLIC will be used.
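
The rules above can be summarized in a short sketch. The function resolve_schema is a hypothetical stand-in written for this page, not the I/O manager's implementation; it only encodes the three rules stated here (schema from configuration, schema from the asset key, and the PUBLIC default).

```python
from typing import Optional, Sequence

DEFAULT_SCHEMA = "PUBLIC"


def resolve_schema(asset_key: Sequence[str], configured_schema: Optional[str] = None) -> str:
    """Hypothetical helper: decide which schema an asset's table belongs to."""
    # With a key prefix, the schema is the second-to-last component of the asset key.
    key_schema = asset_key[-2] if len(asset_key) > 1 else None
    if key_schema and configured_schema:
        # The two options are mutually exclusive.
        raise ValueError("Schema was provided both in configuration and in the asset key")
    return configured_schema or key_schema or DEFAULT_SCHEMA


print(resolve_schema(["iris", "iris_dataset"]))          # schema taken from the key prefix
print(resolve_schema(["iris_dataset"], "IRIS"))          # schema taken from configuration
print(resolve_schema(["iris_dataset"]))                  # falls back to the default
```

The sketch returns the schema name exactly as written in the asset key or configuration; this page writes schema names in uppercase when referring to them in DuckDB.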


Using the DuckDB I/O manager with other I/O managers#

You may have assets that you don't want to store in DuckDB. You can provide an I/O manager to each asset using the io_manager_key parameter in the @asset decorator:

import pandas as pd
from dagster_aws.s3.io_manager import s3_pickle_io_manager
from dagster_duckdb_pandas import DuckDBPandasIOManager

from dagster import Definitions, asset


@asset(io_manager_key="warehouse_io_manager")
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


@asset(io_manager_key="blob_io_manager")
def iris_plots(iris_dataset):
    # plot_data is a function we've defined somewhere else
    # that plots the data in a DataFrame
    return plot_data(iris_dataset)


defs = Definitions(
    assets=[iris_dataset, iris_plots],
    resources={
        "warehouse_io_manager": DuckDBPandasIOManager(
            database="path/to/my_duckdb_database.duckdb",
            schema="IRIS",
        ),
        "blob_io_manager": s3_pickle_io_manager,
    },
)

In this example:

  • The iris_dataset asset uses the I/O manager bound to the key warehouse_io_manager, and iris_plots uses the I/O manager bound to the key blob_io_manager.
  • In the Definitions object, we supply the I/O managers for those keys.
  • When the assets are materialized, iris_dataset will be stored in DuckDB, and iris_plots will be saved in Amazon S3.

Storing and loading PySpark or Polars DataFrames in DuckDB#

The DuckDB I/O manager also supports storing and loading PySpark and Polars DataFrames.

Storing and loading PySpark DataFrames in DuckDB#

To use the DuckDBPySparkIOManager, first install the package:

pip install dagster-duckdb-pyspark

Then you can use the DuckDBPySparkIOManager in your Definitions as in Step 1: Configure the DuckDB I/O manager of the Using Dagster with DuckDB tutorial.

from dagster_duckdb_pyspark import DuckDBPySparkIOManager

from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": DuckDBPySparkIOManager(
            database="path/to/my_duckdb_database.duckdb",  # required
            schema="IRIS",  # optional, defaults to PUBLIC
        )
    },
)

The DuckDBPySparkIOManager requires an active SparkSession. You can either create your own SparkSession or use the pyspark_resource:

from dagster_duckdb_pyspark import DuckDBPySparkIOManager
from dagster_pyspark import pyspark_resource
from pyspark import SparkFiles
from pyspark.sql import (
    DataFrame,
)
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
)

from dagster import Definitions, asset


@asset(required_resource_keys={"pyspark"})
def iris_dataset(context) -> DataFrame:
    spark = context.resources.pyspark.spark_session

    schema = StructType(
        [
            StructField("sepal_length_cm", DoubleType()),
            StructField("sepal_width_cm", DoubleType()),
            StructField("petal_length_cm", DoubleType()),
            StructField("petal_width_cm", DoubleType()),
            StructField("species", StringType()),
        ]
    )

    url = "https://docs.dagster.io/assets/iris.csv"
    spark.sparkContext.addFile(url)

    return spark.read.schema(schema).csv("file://" + SparkFiles.get("iris.csv"))


defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": DuckDBPySparkIOManager(
            database="path/to/my_duckdb_database.duckdb",
            schema="IRIS",
        ),
        "pyspark": pyspark_resource,
    },
)

Storing multiple DataFrame types in DuckDB#

If you work with several DataFrame libraries and want a single I/O manager to handle storing and loading these DataFrames in DuckDB, you can write a new I/O manager that handles the DataFrame types.

To do this, inherit from the DuckDBIOManager base class and implement the type_handlers and default_load_type methods. The resulting I/O manager will inherit the configuration fields of the base DuckDBIOManager.

from typing import Optional, Type

import pandas as pd
from dagster_duckdb import DuckDBIOManager
from dagster_duckdb_pandas import DuckDBPandasTypeHandler
from dagster_duckdb_polars import DuckDBPolarsTypeHandler
from dagster_duckdb_pyspark import DuckDBPySparkTypeHandler

from dagster import Definitions


class DuckDBPandasPySparkPolarsIOManager(DuckDBIOManager):
    @staticmethod
    def type_handlers():
        """type_handlers should return a list of the TypeHandlers that the I/O manager can use.
        Here we return the DuckDBPandasTypeHandler, DuckDBPySparkTypeHandler, and DuckDBPolarsTypeHandler so that the I/O
        manager can store Pandas DataFrames, PySpark DataFrames, and Polars DataFrames.
        """
        return [
            DuckDBPandasTypeHandler(),
            DuckDBPySparkTypeHandler(),
            DuckDBPolarsTypeHandler(),
        ]

    @staticmethod
    def default_load_type() -> Optional[Type]:
        """If an asset is not annotated with a return type, default_load_type will be used to
        determine which TypeHandler to use to store and load the output.
        In this case, unannotated assets will be stored and loaded as Pandas DataFrames.
        """
        return pd.DataFrame


defs = Definitions(
    assets=[iris_dataset, rose_dataset],
    resources={
        "io_manager": DuckDBPandasPySparkPolarsIOManager(
            database="path/to/my_duckdb_database.duckdb",
            schema="IRIS",
        )
    },
)
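
The interplay between type_handlers and default_load_type can be pictured with a small dispatch sketch. Everything in it is hypothetical: the stand-in frame classes, the handler labels, and the pick_handler function are invented for illustration so that the example needs no third-party packages, and they do not reflect how DuckDBIOManager is implemented internally.

```python
from typing import Dict, Optional, Type


class PandasLikeFrame:
    """Stand-in for pandas.DataFrame (hypothetical, for illustration only)."""


class PolarsLikeFrame:
    """Stand-in for polars.DataFrame (hypothetical, for illustration only)."""


def pick_handler(
    handlers: Dict[Type, str],
    annotated_type: Optional[Type],
    default_load_type: Optional[Type],
) -> str:
    """Hypothetical dispatch: choose a handler for an asset's annotated type."""
    # Fall back to the default load type when the asset has no type annotation.
    target = annotated_type if annotated_type is not None else default_load_type
    if target not in handlers:
        raise TypeError(f"No type handler registered for {target!r}")
    return handlers[target]


handlers = {PandasLikeFrame: "pandas handler", PolarsLikeFrame: "polars handler"}
annotated = pick_handler(handlers, PolarsLikeFrame, PandasLikeFrame)
unannotated = pick_handler(handlers, None, PandasLikeFrame)
print(annotated)
print(unannotated)
```

In this sketch an asset annotated with the Polars-like type is routed to the Polars handler, while an unannotated asset falls back to the default type, mirroring the behavior described in the default_load_type docstring.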

Executing custom SQL commands with the DuckDB resource#

In addition to the DuckDB I/O manager, Dagster also provides a DuckDB resource for executing custom SQL queries.

import pandas as pd
from dagster_duckdb import DuckDBResource

from dagster import Definitions, asset

# this example executes a query against the IRIS_DATASET table created in Step 2 of the
# Using Dagster with DuckDB tutorial


@asset
def small_petals(duckdb: DuckDBResource) -> pd.DataFrame:
    with duckdb.get_connection() as conn:
        return (
            conn.cursor()
            .execute(
                # column names are left unquoted: single quotes would make them string literals
                "SELECT * FROM IRIS.IRIS_DATASET WHERE petal_length_cm < 1 AND"
                " petal_width_cm < 1"
            )
            .fetch_df()
        )


defs = Definitions(
    assets=[small_petals],
    resources={
        "duckdb": DuckDBResource(
            database="path/to/my_duckdb_database.duckdb",
        )
    },
)

In this example, we attach the DuckDB resource to the small_petals asset. In the body of the asset function, we use the get_connection context manager on the resource to get a duckdb.DuckDBPyConnection. We can use this connection to execute a custom SQL query against the IRIS_DATASET table created in Step 2: Create tables in DuckDB of the Using Dagster with DuckDB tutorial.

For more information on the DuckDB resource, including additional configuration settings, see the DuckDB resource API docs.