Using Dagster with DuckDB

This tutorial focuses on how to store and load Dagster's Software-defined Assets (SDAs) in DuckDB.

By the end of the tutorial, you will:

  • Configure a DuckDB I/O manager
  • Create a table in DuckDB using a Dagster asset
  • Make a DuckDB table available in Dagster
  • Load DuckDB tables in downstream assets

This guide focuses on storing and loading Pandas DataFrames in DuckDB. Dagster also supports PySpark and Polars DataFrames with DuckDB, and the concepts from this guide apply to them as well. To learn more about setting up and using the DuckDB I/O manager with PySpark and Polars DataFrames, refer to the reference guide.


Prerequisites

To complete this tutorial, you'll need to install the dagster-duckdb and dagster-duckdb-pandas libraries:

    pip install dagster-duckdb dagster-duckdb-pandas
    

Step 1: Configure the DuckDB I/O manager

The DuckDB I/O manager requires some configuration to connect to your database. You must provide a path where a DuckDB database will be created. Additionally, you can specify a schema where the DuckDB I/O manager will create tables.

from dagster_duckdb_pandas import DuckDBPandasIOManager

from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],  # iris_dataset is the asset defined in Step 2
    resources={
        "io_manager": DuckDBPandasIOManager(
            database="path/to/my_duckdb_database.duckdb",  # required
            schema="IRIS",  # optional, defaults to PUBLIC
        )
    },
)

With this configuration, if you materialized an asset called iris_dataset, the DuckDB I/O manager would store the data in the IRIS.IRIS_DATASET table in a database stored at path/to/my_duckdb_database.duckdb.
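The naming rule described above can be sketched in plain Python. This is only an illustration of the convention stated in this guide (the schema, then the asset name, written upper-cased as in IRIS.IRIS_DATASET). The helper qualified_table_name is hypothetical and is not part of the Dagster API, and the fallback to PUBLIC follows the comment in the configuration above.

```python
from typing import Optional


def qualified_table_name(asset_name: str, schema: Optional[str] = None) -> str:
    """Hypothetical helper: mirror the SCHEMA.TABLE naming used in this guide."""
    # Assumption: identifiers are written upper-cased, as in the guide, and the
    # schema falls back to PUBLIC when none is configured.
    effective_schema = schema if schema is not None else "PUBLIC"
    return f"{effective_schema}.{asset_name}".upper()


print(qualified_table_name("iris_dataset", schema="IRIS"))  # IRIS.IRIS_DATASET
print(qualified_table_name("iris_dataset"))  # PUBLIC.IRIS_DATASET
```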

In the Definitions object, the DuckDBPandasIOManager is assigned to the io_manager key. io_manager is a reserved key that sets the default I/O manager for your assets.

For more info about each of the configuration values, refer to the DuckDBPandasIOManager API documentation.


Step 2: Create tables in DuckDB

The DuckDB I/O manager can create and update tables for your Dagster-defined assets, but you can also make existing DuckDB tables available to Dagster.

Store a Dagster asset as a table in DuckDB

To store data in DuckDB using the DuckDB I/O manager, you don't need to change your asset definitions. Tell Dagster to use the DuckDB I/O manager, as in Step 1: Configure the DuckDB I/O manager, and Dagster will handle storing and loading your assets in DuckDB.

import pandas as pd

from dagster import asset


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

In this example, we define the iris_dataset asset, which fetches the Iris dataset as a Pandas DataFrame and assigns names to its columns. The I/O manager uses the function's type signature to determine what data type it is working with, so it is important to include the return type pd.DataFrame.

When Dagster materializes the iris_dataset asset using the configuration from Step 1: Configure the DuckDB I/O manager, the DuckDB I/O manager will create the table IRIS.IRIS_DATASET if it does not exist and replace the contents of the table with the value returned from the iris_dataset asset.


Step 3: Load DuckDB tables in downstream assets

Once you have created an asset or source asset that represents a table in DuckDB, you will likely want to create additional assets that work with the data. Dagster and the DuckDB I/O manager allow you to load the data stored in DuckDB tables into downstream assets.

import pandas as pd

from dagster import asset

# this example uses the iris_dataset asset from Step 2


@asset
def iris_cleaned(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset.dropna().drop_duplicates()

In this example, we want to provide the iris_dataset asset from the "Store a Dagster asset as a table in DuckDB" example to the iris_cleaned asset.

In iris_cleaned, the iris_dataset parameter tells Dagster that the value for the iris_dataset asset should be provided as input to iris_cleaned. If this feels too magical for you, refer to the docs for explicitly specifying dependencies.
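To make the parameter-name convention less magical, here is a toy sketch in plain Python. It is not how Dagster is implemented. The assets registry and the upstream_names helper are hypothetical names used only to show the idea of reading dependencies from a function signature, and the stand-in assets return lists of rows rather than DataFrames.

```python
import inspect
from typing import Callable, Dict, List


def iris_dataset() -> list:
    # Stand-in for the real asset; returns rows instead of a DataFrame.
    return [(5.1, "setosa"), (5.1, "setosa"), (6.2, "virginica")]


def iris_cleaned(iris_dataset: list) -> list:
    # Drop duplicate rows while keeping the original order.
    return list(dict.fromkeys(iris_dataset))


# Hypothetical registry mapping asset names to the functions that compute them.
assets: Dict[str, Callable] = {
    "iris_dataset": iris_dataset,
    "iris_cleaned": iris_cleaned,
}


def upstream_names(fn: Callable) -> List[str]:
    """Treat parameter names that match a registered asset as inputs."""
    return [name for name in inspect.signature(fn).parameters if name in assets]


print(upstream_names(iris_cleaned))  # ['iris_dataset']
print(upstream_names(iris_dataset))  # []
```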

When materializing these assets, Dagster will use the DuckDBPandasIOManager to fetch the IRIS.IRIS_DATASET table as a Pandas DataFrame and pass this DataFrame as the iris_dataset parameter to iris_cleaned. When iris_cleaned returns a Pandas DataFrame, Dagster will use the DuckDBPandasIOManager to store the DataFrame as the IRIS.IRIS_CLEANED table in DuckDB.
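To see what the iris_cleaned transformation does to the DataFrame it receives, the same two Pandas calls can be run on a small in-memory DataFrame. The sample values below are made up for illustration and do not come from the Iris dataset.

```python
import pandas as pd

# Hypothetical sample rows, not taken from the real Iris dataset.
raw = pd.DataFrame(
    {
        "sepal_length_cm": [5.1, 5.1, None, 6.2],
        "species": ["setosa", "setosa", "setosa", "virginica"],
    }
)

# Same transformation as iris_cleaned: drop rows with missing values,
# then drop exact duplicate rows.
cleaned = raw.dropna().drop_duplicates()

print(len(raw), len(cleaned))  # 4 2
```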


Completed code example

When finished, your code should look like the following:

import pandas as pd
from dagster_duckdb_pandas import DuckDBPandasIOManager

from dagster import Definitions, SourceAsset, asset

# an existing DuckDB table (IRIS.IRIS_HARVEST_DATA) made available to Dagster as a source asset
iris_harvest_data = SourceAsset(key="iris_harvest_data")


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


@asset
def iris_cleaned(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset.dropna().drop_duplicates()


defs = Definitions(
    assets=[iris_dataset, iris_harvest_data, iris_cleaned],
    resources={
        "io_manager": DuckDBPandasIOManager(
            database="path/to/my_duckdb_database.duckdb",
            schema="IRIS",
        )
    },
)

For more DuckDB features, refer to the DuckDB reference.

For more information on Software-defined Assets, refer to the tutorial or the Assets documentation.

For more information on I/O managers, refer to the I/O manager documentation.