Source code for dagster_duckdb.resource
from contextlib import contextmanager
import duckdb
from dagster import ConfigurableResource
from dagster._utils.backoff import backoff
from pydantic import Field
[docs]class DuckDBResource(ConfigurableResource):
"""Resource for interacting with a DuckDB database.
Examples:
.. code-block:: python
from dagster import Definitions, asset
from dagster_duckdb import DuckDBResource
@asset
def my_table(duckdb: DuckDBResource):
with duckdb.get_connection() as conn:
conn.execute("SELECT * from MY_SCHEMA.MY_TABLE")
defs = Definitions(
assets=[my_table],
resources={"duckdb": DuckDBResource(database="path/to/db.duckdb")}
)
"""
database: str = Field(
description=(
"Path to the DuckDB database. Setting database=':memory:' will use an in-memory"
" database "
)
)
@classmethod
def _is_dagster_maintained(cls) -> bool:
return True
@contextmanager
def get_connection(self):
conn = backoff(
fn=duckdb.connect,
retry_on=(RuntimeError, duckdb.IOException),
kwargs={"database": self.database, "read_only": False},
max_retries=10,
)
yield conn
conn.close()