This guide focuses on storing and loading Pandas DataFrames in DuckDB. Dagster also supports using PySpark and Polars DataFrames with DuckDB. The concepts from this guide apply to working with PySpark and Polars DataFrames, and you can learn more about setting up and using the DuckDB I/O manager with PySpark and Polars DataFrames in the reference guide.
The DuckDB I/O manager requires some configuration to connect to your database. You must provide a path where a DuckDB database will be created. Additionally, you can specify a schema where the DuckDB I/O manager will create tables.
from dagster_duckdb_pandas import DuckDBPandasIOManager

from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": DuckDBPandasIOManager(
            database="path/to/my_duckdb_database.duckdb",  # required
            schema="IRIS",  # optional, defaults to PUBLIC
        )
    },
)
With this configuration, if you materialized an asset called iris_dataset, the DuckDB I/O manager would store the data in the IRIS.IRIS_DATASET table in a database stored at path/to/my_duckdb_database.duckdb.
In the Definitions object, we assign the DuckDBPandasIOManager to the io_manager key. io_manager is a reserved key that sets the default I/O manager for your assets.
For more info about each of the configuration values, refer to the DuckDBPandasIOManager API documentation.
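To make the mapping from configuration to table name concrete, here is a minimal sketch in plain Python. The helper `table_for_asset` is hypothetical and written only for illustration. It is not part of Dagster; it simply mirrors the naming convention this guide uses (the configured schema, or PUBLIC by default, followed by the asset name).

```python
def table_for_asset(asset_name: str, schema: str = "PUBLIC") -> str:
    """Return the SCHEMA.TABLE name this guide uses for an asset.

    Hypothetical helper for illustration only; it is not a Dagster API.
    """
    return f"{schema}.{asset_name}".upper()


# With schema="IRIS", as in the configuration above.
print(table_for_asset("iris_dataset", schema="IRIS"))

# Without a schema, the guide states that PUBLIC is used.
print(table_for_asset("iris_dataset"))
```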
The DuckDB I/O manager can create and update tables for your Dagster-defined assets, but you can also make existing DuckDB tables available to Dagster.
To store data in DuckDB using the DuckDB I/O manager, you don't need to change your asset definitions. Tell Dagster to use the DuckDB I/O manager, as in Step 1: Configure the DuckDB I/O manager, and Dagster will handle storing and loading your assets in DuckDB.
import pandas as pd
from dagster import asset


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
In this example, we define our asset, which fetches the Iris dataset as a Pandas DataFrame and names its columns. The type signature of the function tells the I/O manager what data type it is working with, so it is important to include the return type pd.DataFrame.
When Dagster materializes the iris_dataset asset using the configuration from Step 1: Configure the DuckDB I/O manager, the DuckDB I/O manager will create the table IRIS.IRIS_DATASET if it does not exist and replace the contents of the table with the value returned from the iris_dataset asset.
If you already have tables in DuckDB, you may want to make them available to other Dagster assets. You can accomplish this by using source assets for these tables. By creating a source asset for the existing table, you tell Dagster how to find the table so it can be fetched for downstream assets.
from dagster import SourceAsset
iris_harvest_data = SourceAsset(key="iris_harvest_data")
In this example, we create a SourceAsset for a pre-existing table containing iris harvest data. To make the data available to other Dagster assets, we need to tell the DuckDB I/O manager how to find the data.
Because we already supplied the database and schema in the I/O manager configuration in Step 1: Configure the DuckDB I/O manager, we only need to provide the table name. We do this with the key parameter in SourceAsset. When the I/O manager needs to load the iris_harvest_data in a downstream asset, it will select the data in the IRIS.IRIS_HARVEST_DATA table as a Pandas DataFrame and provide it to the downstream asset.
Once you have created an asset or source asset that represents a table in DuckDB, you will likely want to create additional assets that work with the data. Dagster and the DuckDB I/O manager allow you to load the data stored in DuckDB tables into downstream assets.
import pandas as pd
from dagster import asset


# this example uses the iris_dataset asset from Step 2
@asset
def iris_cleaned(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset.dropna().drop_duplicates()
In iris_cleaned, the iris_dataset parameter tells Dagster that the value for the iris_dataset asset should be provided as input to iris_cleaned. If this feels too magical for you, refer to the docs for explicitly specifying dependencies.
When materializing these assets, Dagster will use the DuckDBPandasIOManager to fetch the IRIS.IRIS_DATASET as a Pandas DataFrame and pass this DataFrame as the iris_dataset parameter to iris_cleaned. When iris_cleaned returns a Pandas DataFrame, Dagster will use the DuckDBPandasIOManager to store the DataFrame as the IRIS.IRIS_CLEANED table in DuckDB.
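To see what the body of iris_cleaned does to a DataFrame before the result is stored, here is a small pandas-only sketch. The input values are made up for illustration and are not taken from the Iris dataset; no Dagster or DuckDB calls are involved.

```python
import pandas as pd

# Tiny frame with one missing value and one exact duplicate (illustrative values).
raw = pd.DataFrame(
    {
        "sepal_length_cm": [5.1, 5.1, None, 6.3],
        "species": ["Iris-setosa", "Iris-setosa", "Iris-setosa", "Iris-virginica"],
    }
)

# Same transformation as iris_cleaned: drop rows containing missing values,
# then drop rows that exactly repeat an earlier row.
cleaned = raw.dropna().drop_duplicates()

print(len(raw), len(cleaned))
print(cleaned)
```

The order matters little here, but calling dropna() first means rows with missing values are never considered when looking for duplicates.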