DuckDB (dagster-duckdb)

This library provides an integration with the DuckDB database.

dagster_duckdb.DuckDBIOManager IOManagerDefinition

Config Schema:
database (dagster.StringSource):

Path to the DuckDB database.

schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

Base class for an IO manager definition that reads inputs from and writes outputs to DuckDB.

Examples

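from typing import Sequence

import pandas as pd
from dagster import Definitions, asset
# DbTypeHandler lives in a private module; the import path may change between versions
from dagster._core.storage.db_io_manager import DbTypeHandler
from dagster_duckdb import DuckDBIOManager
from dagster_duckdb_pandas import DuckDBPandasTypeHandler

class MyDuckDBIOManager(DuckDBIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        # handlers that convert between DuckDB tables and in-memory types
        return [DuckDBPandasTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": MyDuckDBIOManager(database="my_db.duckdb")}
)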

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the IO Manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”.

import pandas as pd
from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...
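
The fallback order described above can be summarized in plain Python. This is only a sketch of the documented rules, not the library's implementation: resolve_schema is a hypothetical helper, and taking the last component of the key prefix as the schema is an assumption. The library may also behave differently (for example, raise an error) when several sources are set at once.

```python
from typing import Optional, Sequence


def resolve_schema(
    configured_schema: Optional[str] = None,
    asset_key_path: Optional[Sequence[str]] = None,
    output_metadata: Optional[dict] = None,
) -> str:
    """Hypothetical helper mirroring the documented fallback order."""
    # 1. A schema set in the IO manager config.
    if configured_schema:
        return configured_schema
    # 2. Assets: the key prefix (assumed here: the component before the asset name).
    if asset_key_path and len(asset_key_path) > 1:
        return asset_key_path[-2]
    # 3. Ops: a "schema" entry in the output metadata.
    if output_metadata and output_metadata.get("schema"):
        return output_metadata["schema"]
    # 4. Nothing provided: use the default schema.
    return "public"


print(resolve_schema(asset_key_path=["my_schema", "my_table"]))
print(resolve_schema(output_metadata={"schema": "my_schema"}))
print(resolve_schema())
```

The three calls correspond to an asset with a key prefix, an op with output metadata, and the case where nothing is provided.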

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

import pandas as pd
from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will contain only the data from column "a"
    ...
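
One way to picture the effect of the "columns" metadata is as a narrower SELECT statement. The sketch below is illustrative only: build_select is a hypothetical helper, and the exact SQL that the IO manager issues is an assumption.

```python
from typing import Optional, Sequence


def build_select(schema: str, table: str, columns: Optional[Sequence[str]] = None) -> str:
    """Hypothetical helper: build the query used to load a table as an input."""
    # Without "columns" metadata every column is loaded; otherwise only the listed ones.
    col_str = ", ".join(columns) if columns else "*"
    return f"SELECT {col_str} FROM {schema}.{table}"


print(build_select("my_schema", "my_table"))
print(build_select("my_schema", "my_table", columns=["a"]))
```

Loading only the needed columns keeps the in-memory DataFrame small when the upstream table is wide.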

dagster_duckdb.build_duckdb_io_manager IOManagerDefinition

Config Schema:
database (dagster.StringSource):

Path to the DuckDB database.

schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

Builds an IO manager definition that reads inputs from and writes outputs to DuckDB.

Parameters:
  • type_handlers (Sequence[DbTypeHandler]) – Each handler defines how to translate between DuckDB tables and an in-memory type, e.g. a Pandas DataFrame. If only one DbTypeHandler is provided, it will be used as the default_load_type.

  • default_load_type (Type) – When an input has no type annotation, load it as this type.

Returns:

IOManagerDefinition
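
The default_load_type rule from the parameter list can be sketched as follows. Everything here is hypothetical and only illustrates the documented behavior: FakeHandler stands in for a DbTypeHandler, and using the first supported type of a lone handler as the default is an assumption.

```python
from typing import Optional, Sequence, Type


class FakeHandler:
    """Hypothetical stand-in for a DbTypeHandler; only lists the types it supports."""

    def __init__(self, supported_types: Sequence[Type]):
        self.supported_types = list(supported_types)


def resolve_default_load_type(
    handlers: Sequence[FakeHandler],
    default_load_type: Optional[Type] = None,
) -> Optional[Type]:
    # An explicit default_load_type always wins.
    if default_load_type is not None:
        return default_load_type
    # With exactly one handler, fall back to its type (assumed: the first one it supports).
    if len(handlers) == 1:
        return handlers[0].supported_types[0]
    # With several handlers and no explicit default, there is nothing to fall back to.
    return None


print(resolve_default_load_type([FakeHandler([dict])]))
print(resolve_default_load_type([FakeHandler([dict]), FakeHandler([list])], default_load_type=list))
```

With several handlers and no default, an input without a type annotation has no type to fall back on, so it is safer to annotate inputs or pass default_load_type explicitly.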

Examples

import pandas as pd
from dagster import asset, repository, with_resources
from dagster_duckdb import build_duckdb_io_manager
from dagster_duckdb_pandas import DuckDBPandasTypeHandler

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

# build the IO manager with the handlers for the in-memory types it should support
duckdb_io_manager = build_duckdb_io_manager([DuckDBPandasTypeHandler()])

@repository
def my_repo():
    return with_resources(
        [my_table],
        {"io_manager": duckdb_io_manager.configured({"database": "my_db.duckdb"})}
    )

If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the IO Manager. For assets, the schema will be determined from the asset key. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”.

import pandas as pd
from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

import pandas as pd
from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will contain only the data from column "a"
    ...

dagster_duckdb.DuckDBResource ResourceDefinition

Config Schema:
database (dagster.StringSource):

Path to the DuckDB database. Setting database=":memory:" uses an in-memory database.

Resource for interacting with a DuckDB database.

Examples

from dagster import Definitions, asset
from dagster_duckdb import DuckDBResource

@asset
def my_table(duckdb: DuckDBResource):
    with duckdb.get_connection() as conn:
        conn.execute("SELECT * from MY_SCHEMA.MY_TABLE")

defs = Definitions(
    assets=[my_table],
    resources={"duckdb": DuckDBResource(database="path/to/db.duckdb")}
)