Airflow (dagster-airflow)

This library provides a Dagster integration with Airflow.

For more information on getting started, see the Airflow integration guide.

Run Airflow on Dagster

dagster_airflow.make_dagster_definitions_from_airflow_dags_path(dag_path, safe_mode=True, connections=None, resource_defs={})[source]

Construct Dagster definitions corresponding to the Airflow DAGs in dag_path.

Usage:

Create make_dagster_definitions.py:

from dagster_airflow import make_dagster_definitions_from_airflow_dags_path

def make_definitions_from_dir():
    return make_dagster_definitions_from_airflow_dags_path(
        '/path/to/dags/',
    )

Use Definitions as usual, for example: dagit -f path/to/make_dagster_definitions.py -n make_definitions_from_dir

Parameters:
  • dag_path (str) – Path to a directory or file that contains Airflow DAGs

  • include_examples (bool) – True to include Airflow’s example DAGs. (default: False)

  • safe_mode (bool) – True to use Airflow’s default heuristic to find files that contain DAGs (i.e., files that contain both b'DAG' and b'airflow'). (default: True)

  • connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB

Returns:

Definitions
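
The safe_mode heuristic described above can be approximated with the standard library alone. The following is an illustrative sketch, not Airflow's actual implementation: the helper name might_contain_dag_sketch is hypothetical, and Airflow's own check may differ in details such as case handling.

```python
import tempfile
from pathlib import Path


def might_contain_dag_sketch(path: Path) -> bool:
    """Return True if the file contains both b'DAG' and b'airflow'.

    Hypothetical helper that mirrors the heuristic as described for
    safe_mode; it is not part of Airflow or dagster-airflow.
    """
    content = path.read_bytes()
    return b"DAG" in content and b"airflow" in content


# Try the check on two throwaway files.
with tempfile.TemporaryDirectory() as tmp:
    dag_file = Path(tmp) / "my_dag.py"
    dag_file.write_text("from airflow import DAG\n")

    other_file = Path(tmp) / "helpers.py"
    other_file.write_text("def add(a, b):\n    return a + b\n")

    dag_file_matches = might_contain_dag_sketch(dag_file)
    other_file_matches = might_contain_dag_sketch(other_file)
```

Under this sketch, a helper module that never mentions Airflow is skipped, which is why safe_mode=False may be needed for DAG files that build their DAGs indirectly.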

dagster_airflow.make_dagster_definitions_from_airflow_dag_bag(dag_bag, connections=None, resource_defs={})[source]

Construct Dagster definitions corresponding to the Airflow DAGs in a DagBag.

Usage:

Create make_dagster_definition.py:

from dagster_airflow import make_dagster_definitions_from_airflow_dag_bag
from airflow_home import my_dag_bag

def make_definition_from_dag_bag():
    return make_dagster_definitions_from_airflow_dag_bag(my_dag_bag)

Use Definitions as usual, for example:

dagit -f path/to/make_dagster_definition.py

Parameters:
  • dag_bag (DagBag) – Airflow DagBag Model

  • connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB

Returns:

Definitions

dagster_airflow.make_schedules_and_jobs_from_airflow_dag_bag(dag_bag, connections=None, resource_defs={})[source]

Construct Dagster Schedules and Jobs corresponding to Airflow DagBag.

Parameters:
  • dag_bag (DagBag) – Airflow DagBag Model

  • connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB

Returns:

  • List[ScheduleDefinition]: The generated Dagster Schedules

  • List[JobDefinition]: The generated Dagster Jobs
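
A minimal sketch of how the generated schedules and jobs might be combined into a Definitions object. It assumes the function returns the two lists as a pair, schedules first and jobs second; the function name build_definitions and the dags_dir argument are hypothetical. Imports are deferred into the function body so the module can be loaded without Airflow installed.

```python
def build_definitions(dags_dir: str):
    """Load DAGs from dags_dir and wrap the generated schedules and jobs.

    Hypothetical helper; dags_dir is a placeholder for your DAG folder.
    """
    # Deferred imports: these require Airflow, Dagster and dagster-airflow.
    from airflow.models import DagBag
    from dagster import Definitions
    from dagster_airflow import make_schedules_and_jobs_from_airflow_dag_bag

    dag_bag = DagBag(dag_folder=dags_dir, include_examples=False)

    # Assumption: the result unpacks as (schedules, jobs).
    schedules, jobs = make_schedules_and_jobs_from_airflow_dag_bag(dag_bag)
    return Definitions(schedules=schedules, jobs=jobs)
```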

dagster_airflow.make_dagster_job_from_airflow_dag(dag, tags=None, connections=None, resource_defs={})[source]

Construct a Dagster job corresponding to a given Airflow DAG.

Tasks in the resulting job will execute the execute() method on the corresponding Airflow Operator. Dagster, any dependencies required by Airflow Operators, and the module containing your DAG definition must be available in the Python environment within which your Dagster ops execute.

To set Airflow’s execution_date for use with Airflow Operator’s execute() methods, use one of the following:

  1. (Best for ad hoc runs) Execute the job directly. This will set execution_date to the time (in UTC) of the run.

  2. Add {'airflow_execution_date': utc_date_string} to the job tags. This will override the behavior from (1).

    my_dagster_job = make_dagster_job_from_airflow_dag(
        dag=dag,
        tags={'airflow_execution_date': utc_execution_date_str}
    )
    my_dagster_job.execute_in_process()

  3. (Recommended) Add {'airflow_execution_date': utc_date_string} to the run tags, such as in the Dagit UI. This will override the behavior from (1) and (2).
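
Options 2 and 3 both need a UTC date string. The sketch below shows one way to produce it with the standard library. It assumes an ISO 8601 string is an acceptable utc_date_string; the exact format the library expects is not stated here, so treat this as illustrative.

```python
from datetime import datetime, timezone

# Assumption: an ISO 8601 timestamp with an explicit UTC offset is accepted.
utc_execution_date_str = datetime(2023, 1, 1, tzinfo=timezone.utc).isoformat()

# The tag dictionary to pass as job tags (option 2) or run tags (option 3).
tags = {"airflow_execution_date": utc_execution_date_str}
```

Building the string from a timezone-aware datetime avoids accidentally passing local time.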

We apply normalized_name() to the dag id and task ids when generating job name and op names to ensure that names conform to Dagster’s naming conventions.
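
To illustrate the kind of transformation involved, here is a hypothetical stand-in for name normalization. It assumes Dagster names may contain only letters, digits and underscores; the real normalized_name() may behave differently, for example by adding a prefix or suffix.

```python
import re


def normalize_name_sketch(name: str) -> str:
    """Replace every character outside [A-Za-z0-9_] with an underscore.

    Hypothetical helper for illustration; not the library's normalized_name().
    """
    return re.sub(r"[^A-Za-z0-9_]", "_", name)


job_name = normalize_name_sketch("my-etl.dag")
op_name = normalize_name_sketch("load data")
```

When looking up a generated job or op by name, expect the normalized form rather than the original Airflow id.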

Parameters:
  • dag (DAG) – The Airflow DAG to compile into a Dagster job

  • tags (Dict[str, Field]) – Job tags. Optionally include tags={'airflow_execution_date': utc_date_string} to specify the execution_date used within execution of Airflow Operators.

  • connections (List[Connection]) – List of Airflow Connections to be created in the ephemeral Airflow DB. Ignored if use_ephemeral_airflow_db is False.

Returns:

The generated Dagster job

Return type:

JobDefinition

dagster_airflow.load_assets_from_airflow_dag(dag, task_ids_by_asset_key={}, upstream_dependencies_by_asset_key={}, connections=None)[source]

[Experimental] Construct Dagster Assets for a given Airflow DAG.

Parameters:
  • dag (DAG) – The Airflow DAG to compile into Dagster assets

  • task_ids_by_asset_key (Optional[Mapping[AssetKey, AbstractSet[str]]]) – A mapping from asset keys to task ids. Used to break up the Airflow DAG into multiple software-defined assets (SDAs).

  • upstream_dependencies_by_asset_key (Optional[Mapping[AssetKey, AbstractSet[AssetKey]]]) – A mapping from upstream asset keys to assets provided in task_ids_by_asset_key. Used to declare new upstream SDA dependencies.

  • connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB

Returns:

List[AssetsDefinition]
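
A sketch of what a task_ids_by_asset_key mapping might look like. The asset keys and task ids are hypothetical placeholders, and build_assets is an illustrative helper. Imports are deferred into the function body so the module can be loaded without Dagster or Airflow installed.

```python
def build_assets(dag):
    """Split an Airflow DAG into two assets by grouping its task ids.

    Hypothetical helper; the asset keys and task ids below are placeholders
    and must match task ids that exist in the DAG you pass in.
    """
    # Deferred imports: these require Dagster and dagster-airflow.
    from dagster import AssetKey
    from dagster_airflow import load_assets_from_airflow_dag

    return load_assets_from_airflow_dag(
        dag=dag,
        task_ids_by_asset_key={
            # Each asset key maps to the set of task ids that produce it.
            AssetKey("raw_orders"): {"extract_orders"},
            AssetKey("orders_report"): {"transform_orders", "publish_report"},
        },
    )
```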

dagster_airflow.make_ephemeral_airflow_db_resource(connections=[], dag_run_config=None)[source]

Creates a Dagster resource that provides an ephemeral Airflow database.

Parameters:
  • connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB

  • dag_run_config (Optional[dict]) – dag_run configuration to be used when creating a DagRun

Returns:

The ephemeral Airflow DB resource

Return type:

ResourceDefinition

dagster_airflow.make_persistent_airflow_db_resource(uri='', connections=[], dag_run_config={})[source]

Creates a Dagster resource that provides a persistent Airflow database.

Usage:

from dagster_airflow import (
    make_dagster_definitions_from_airflow_dags_path,
    make_persistent_airflow_db_resource,
)

# SQLAlchemy URI of the existing Airflow metadata database
postgres_airflow_db = "postgresql+psycopg2://airflow:airflow@localhost:5432/airflow"
airflow_db = make_persistent_airflow_db_resource(uri=postgres_airflow_db)

# Pass the resource under the "airflow_db" key so the generated jobs use it
definitions = make_dagster_definitions_from_airflow_dags_path(
    '/path/to/dags/',
    resource_defs={"airflow_db": airflow_db}
)

Parameters:
  • uri – SQLAlchemy URI of the Airflow DB to be used

  • connections (List[Connection]) – List of Airflow Connections to be created in the Airflow DB

  • dag_run_config (Optional[dict]) – dag_run configuration to be used when creating a DagRun

Returns:

The persistent Airflow DB resource

Return type:

ResourceDefinition
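
The uri argument is a SQLAlchemy URI, so credentials containing reserved characters should be percent-encoded. The sketch below assembles such a URI with the standard library; the helper name build_airflow_db_uri and its parameters are hypothetical.

```python
from urllib.parse import quote_plus


def build_airflow_db_uri(user, password, host, port, database,
                         driver="postgresql+psycopg2"):
    """Assemble a SQLAlchemy-style URI (hypothetical helper).

    Credentials are percent-encoded so that characters such as '@' or ':'
    do not break the URI structure.
    """
    return (
        f"{driver}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


uri = build_airflow_db_uri("airflow", "airflow", "localhost", 5432, "airflow")
tricky_uri = build_airflow_db_uri("airflow", "p@ss:word", "localhost", 5432, "airflow")
```

The resulting string can then be passed as uri= to make_persistent_airflow_db_resource.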

Orchestrate Dagster from Airflow

class dagster_airflow.DagsterCloudOperator(*args, **kwargs)[source]

DagsterCloudOperator.

Uses the Dagster Cloud GraphQL API to run and monitor Dagster jobs on Dagster Cloud.

Parameters:
  • repository_name (str) – the name of the repository to use

  • repostitory_location_name (str) – the name of the repository location to use

  • job_name (str) – the name of the job to run

  • run_config (Optional[Dict[str, Any]]) – the run config to use for the job run

  • dagster_conn_id (Optional[str]) – the id of the Dagster connection (Airflow 2.0+ only)

  • organization_id (Optional[str]) – the id of the Dagster Cloud organization

  • deployment_name (Optional[str]) – the name of the Dagster Cloud deployment

  • user_token (Optional[str]) – the Dagster Cloud user token to use
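
A sketch of how the operator might be used inside an Airflow DAG, with keyword names taken from the parameter list above (including the spelling of repostitory_location_name as documented). All values are placeholders, build_dag is a hypothetical helper, and the imports are deferred so the module can be loaded without Airflow installed.

```python
from datetime import datetime


def build_dag():
    """Build an Airflow DAG with one task that triggers a Dagster Cloud job.

    Hypothetical helper; every value below is a placeholder.
    """
    # Deferred imports: these require Airflow and dagster-airflow.
    from airflow import DAG
    from dagster_airflow import DagsterCloudOperator

    with DAG(dag_id="trigger_dagster_cloud_job",
             start_date=datetime(2023, 1, 1)) as dag:
        DagsterCloudOperator(
            task_id="run_my_job",  # standard Airflow operator argument
            repository_name="my_repository",
            repostitory_location_name="my_location",  # spelled as documented
            job_name="my_job",
            run_config={},
            organization_id="my-org",
            deployment_name="prod",
            user_token="<user token>",  # placeholder; load from a secret store
        )
    return dag
```

On Airflow 2.0+, the connection-based dagster_conn_id parameter is an alternative to passing the organization, deployment and token directly.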

class dagster_airflow.DagsterOperator(*args, **kwargs)[source]

DagsterOperator.

Uses the Dagster GraphQL API to run and monitor Dagster jobs on remote Dagster infrastructure.

Parameters:
  • repository_name (str) – the name of the repository to use

  • repostitory_location_name (str) – the name of the repository location to use

  • job_name (str) – the name of the job to run

  • run_config (Optional[Dict[str, Any]]) – the run config to use for the job run

  • dagster_conn_id (Optional[str]) – the id of the Dagster connection (Airflow 2.0+ only)

  • organization_id (Optional[str]) – the id of the Dagster Cloud organization

  • deployment_name (Optional[str]) – the name of the Dagster Cloud deployment

  • user_token (Optional[str]) – the Dagster Cloud user token to use