Databricks (dagster-databricks)

The dagster_databricks package provides these main pieces of functionality:

  • A resource, databricks_pyspark_step_launcher, which will execute an op within a Databricks context on a cluster, such that the pyspark resource uses the cluster’s Spark instance.

  • An op factory, create_databricks_run_now_op, which creates an op that launches an existing Databricks job using the Run Now API.

  • An op factory, create_databricks_submit_run_op, which creates an op that submits a one-time run of a set of tasks on Databricks using the Submit Run API.

Note that, for the databricks_pyspark_step_launcher, either S3 or Azure Data Lake Storage config must be specified for ops to succeed. The credentials for this storage must also be stored as a Databricks Secret and referenced in the resource config so that the Databricks cluster can access storage.

APIs

dagster_databricks.create_databricks_run_now_op(databricks_job_id, databricks_job_configuration=None, poll_interval_seconds=10, max_wait_time_seconds=86400, name=None, databricks_resource_key='databricks')[source]

Creates an op that launches an existing Databricks job.

As config, the op accepts a blob of the form described in Databricks’ Job API: https://docs.databricks.com/api-explorer/workspace/jobs/runnow. The only required field is job_id, which is the ID of the job to be executed. Additional fields can be used to specify override parameters for the Databricks Job.

Parameters:
  • databricks_job_id (int) – The ID of the Databricks Job to be executed.

  • databricks_job_configuration (dict) – Configuration for triggering a new job run of a Databricks Job. See https://docs.databricks.com/api-explorer/workspace/jobs/runnow for the full configuration.

  • poll_interval_seconds (float) – How often to poll the Databricks API to check whether the Databricks job has finished running.

  • max_wait_time_seconds (float) – How long to wait for the Databricks job to finish running before raising an error.

  • name (Optional[str]) – The name of the op. If not provided, the name will be _databricks_run_now_op.

  • databricks_resource_key (str) – The name of the resource key used by this op. If not provided, the resource key will be “databricks”.

Returns:

An op definition to run the Databricks Job.

Return type:

OpDefinition
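
The way poll_interval_seconds and max_wait_time_seconds interact can be pictured as a simple polling loop. The following is a minimal sketch of that idea, not the library's actual implementation; wait_for_completion, get_state, and the state strings are hypothetical names introduced only for illustration.

```python
import time
from typing import Callable


def wait_for_completion(
    get_state: Callable[[], str],
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
) -> str:
    """Poll get_state() until it reports a terminal state or time runs out.

    get_state and the state names are hypothetical stand-ins for a call
    to the Databricks API; they are not part of dagster_databricks.
    """
    deadline = time.monotonic() + max_wait_time_seconds
    while True:
        state = get_state()
        if state in ("SUCCESS", "FAILED"):
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Run did not finish within {max_wait_time_seconds} seconds"
            )
        time.sleep(poll_interval_seconds)


# Simulated run that reaches a terminal state on the third poll.
states = iter(["PENDING", "RUNNING", "SUCCESS"])
result = wait_for_completion(lambda: next(states), poll_interval_seconds=0.01)
print(result)
```

A shorter poll interval notices completion sooner at the cost of more API calls; the maximum wait time bounds how long a stuck run can hold up the op.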

Example

from dagster import EnvVar, job
from dagster_databricks import create_databricks_run_now_op, DatabricksClientResource

DATABRICKS_JOB_ID = 1234


run_now_op = create_databricks_run_now_op(
    databricks_job_id=DATABRICKS_JOB_ID,
    databricks_job_configuration={
        "python_params": [
            "--input",
            "schema.db.input_table",
            "--output",
            "schema.db.output_table",
        ],
    },
)

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN")
        )
    }
)
def do_stuff():
    run_now_op()

dagster_databricks.create_databricks_submit_run_op(databricks_job_configuration, poll_interval_seconds=10, max_wait_time_seconds=86400, name=None, databricks_resource_key='databricks')[source]

Creates an op that submits a one-time run of a set of tasks on Databricks.

As config, the op accepts a blob of the form described in Databricks’ Job API: https://docs.databricks.com/api-explorer/workspace/jobs/submit.

Parameters:
  • databricks_job_configuration (dict) – Configuration for submitting a one-time run of a set of tasks on Databricks. See https://docs.databricks.com/api-explorer/workspace/jobs/submit for the full configuration.

  • poll_interval_seconds (float) – How often to poll the Databricks API to check whether the Databricks job has finished running.

  • max_wait_time_seconds (float) – How long to wait for the Databricks job to finish running before raising an error.

  • name (Optional[str]) – The name of the op. If not provided, the name will be _databricks_submit_run_op.

  • databricks_resource_key (str) – The name of the resource key used by this op. If not provided, the resource key will be “databricks”.

Returns:

An op definition to submit a one-time run of a set of tasks on Databricks.

Return type:

OpDefinition

Example

from dagster import EnvVar, job
from dagster_databricks import create_databricks_submit_run_op, DatabricksClientResource


submit_run_op = create_databricks_submit_run_op(
    databricks_job_configuration={
        "new_cluster": {
            "spark_version": '2.1.0-db3-scala2.11',
            "num_workers": 2
        },
        "notebook_task": {
            "notebook_path": "/Users/dagster@example.com/PrepareData",
        },
    }
)

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN")
        )
    }
)
def do_stuff():
    submit_run_op()

dagster_databricks.databricks_pyspark_step_launcher ResourceDefinition[source]

Config Schema:
run_config (strict dict):

Databricks job run configuration

Config Schema:
cluster (selector):
Config Schema:
new (strict dict):
Config Schema:
size (selector):
Config Schema:
autoscale (strict dict):
Config Schema:
min_workers (Int):

The minimum number of workers to which the cluster can scale down when underutilized. It is also the initial number of workers the cluster will have after creation.

max_workers (Int):

The maximum number of workers to which the cluster can scale up when overloaded. max_workers must be strictly greater than min_workers.

num_workers (Int):

If num_workers is set, the number of worker nodes that this cluster should have. A cluster has one Spark Driver and num_workers Executors for a total of num_workers + 1 Spark nodes.
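
Because size is a selector, exactly one of autoscale or num_workers is supplied. A minimal sketch of both shapes follows, together with a hypothetical helper, check_size, that restates the rules above in plain Python. Dagster's own config validation is what actually enforces the schema; the helper is only illustrative.

```python
# Two alternative values for the `size` selector
# (run_config -> cluster -> new -> size).
fixed_size = {"num_workers": 2}
autoscaling_size = {"autoscale": {"min_workers": 1, "max_workers": 4}}


def check_size(size: dict) -> None:
    """Hypothetical helper: raise ValueError if `size` breaks the documented rules."""
    if len(size) != 1 or next(iter(size)) not in ("autoscale", "num_workers"):
        raise ValueError("size takes exactly one of 'autoscale' or 'num_workers'")
    if "autoscale" in size:
        bounds = size["autoscale"]
        if bounds["max_workers"] <= bounds["min_workers"]:
            raise ValueError("max_workers must be strictly greater than min_workers")


check_size(fixed_size)
check_size(autoscaling_size)
```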

spark_version (String):

The Spark version of the cluster. A list of available Spark versions can be retrieved by using the Runtime versions API call. This field is required.

spark_conf (permissive dict, optional):

An object containing a set of optional, user-specified Spark configuration key-value pairs. You can also pass in a string of extra JVM options to the driver and the executors via spark.driver.extraJavaOptions and spark.executor.extraJavaOptions respectively. Example Spark confs: {“spark.speculation”: true, “spark.streaming.ui.retainedBatches”: 5} or {“spark.driver.extraJavaOptions”: “-verbose:gc -XX:+PrintGCDetails”}

nodes (selector):

The nodes used in the cluster. Either the node types or an instance pool can be specified.

Config Schema:
node_types (strict dict):
Config Schema:
node_type_id (String):

This field encodes, through a single value, the resources available to each of the Spark nodes in this cluster. For example, the Spark nodes can be provisioned and optimized for memory or compute intensive workloads. A list of available node types can be retrieved by using the List node types API call. This field is required.

driver_node_type_id (String, optional):

The node type of the Spark driver. This field is optional; if unset, the driver node type is set as the same value as node_type_id defined above.

instance_pool_id (String, optional):

The optional ID of the instance pool to which the cluster belongs. Refer to the Instance Pools API for details.

aws_attributes (permissive dict, optional):

Attributes related to clusters running on Amazon Web Services. If not specified at cluster creation, a set of default values is used. See aws_attributes at https://docs.databricks.com/dev-tools/api/latest/clusters.html.

Config Schema:
first_on_demand (Int, optional):

The first first_on_demand nodes of the cluster will be placed on on-demand instances. If this value is greater than 0, the cluster driver node will be placed on an on-demand instance. If this value is greater than or equal to the current cluster size, all nodes will be placed on on-demand instances. If this value is less than the current cluster size, first_on_demand nodes will be placed on on-demand instances and the remainder will be placed on availability instances. This value does not affect cluster size and cannot be mutated over the lifetime of a cluster.

availability (AWSAvailability, optional):

Availability type used for all subsequent nodes past the first_on_demand ones. Note: If first_on_demand is zero, this availability type will be used for the entire cluster.

zone_id (String, optional):

Identifier for the availability zone/datacenter in which the cluster resides.

instance_profile_arn (String, optional):

Nodes for this cluster will only be placed on AWS instances with this instance profile.

spot_bid_price_percent (Int, optional):

The max price for AWS spot instances, as a percentage of the corresponding instance type’s on-demand price.

ebs_volume_type (EBSVolumeType, optional):

The type of EBS volumes that will be launched with this cluster.

ebs_volume_count (Int, optional):

The number of volumes launched for each instance. You can choose up to 10 volumes.

ebs_volume_size (Int, optional):

The size of each EBS volume (in GiB) launched for each instance.

ebs_volume_iops (Int, optional):

The number of IOPS per EBS gp3 volume.

ebs_volume_throughput (Int, optional):

The throughput per EBS gp3 volume, in MiB per second.
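
The placement rule described for first_on_demand and availability can be restated as a small calculation. This is only a sketch of the documented behavior; split_nodes is a hypothetical helper, and cluster_size is assumed here to count every node in the cluster, driver included.

```python
from typing import Tuple


def split_nodes(cluster_size: int, first_on_demand: int) -> Tuple[int, int]:
    """Return (on_demand_nodes, availability_nodes) for a cluster.

    Nodes beyond the first `first_on_demand` use the configured
    `availability` type.
    """
    on_demand = min(max(first_on_demand, 0), cluster_size)
    return on_demand, cluster_size - on_demand


print(split_nodes(cluster_size=5, first_on_demand=1))  # (1, 4)
print(split_nodes(cluster_size=5, first_on_demand=8))  # (5, 0)
print(split_nodes(cluster_size=5, first_on_demand=0))  # (0, 5)
```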

ssh_public_keys (List[String], optional):

SSH public key contents that will be added to each Spark node in this cluster. The corresponding private keys can be used to log in with the user name ubuntu on port 2200. Up to 10 keys can be specified.

custom_tags (List[strict dict], optional):

Additional tags for cluster resources. Databricks tags all cluster resources (e.g., AWS instances and EBS volumes) with these tags in addition to default_tags. Note:

  • Tags are not supported on legacy node types such as compute-optimized and memory-optimized.

  • Databricks allows at most 45 custom tags.

More restrictions may apply if using Azure Databricks; refer to the official docs for further details.

cluster_log_conf (selector, optional):

Recommended! The configuration for delivering Spark logs to a long-term storage destination. Only one destination can be specified for one cluster. If the conf is given, the logs will be delivered to the destination every 5 mins. The destination of driver logs is <destination>/<cluster-id>/driver, while the destination of executor logs is <destination>/<cluster-id>/executor.

Config Schema:
dbfs (strict dict):

DBFS storage information

Config Schema:
destination (String):

DBFS destination, e.g. dbfs:/my/path

s3 (strict dict):

S3 storage information

Config Schema:
destination (String):

S3 destination, e.g. s3://my-bucket/some-prefix. You must configure the cluster with an instance profile and the instance profile must have write access to the destination. You cannot use AWS keys.

region (String):

S3 region, e.g. us-west-2. Either region or endpoint must be set. If both are set, endpoint is used.

endpoint (String):

S3 endpoint, e.g. https://s3-us-west-2.amazonaws.com. Either region or endpoint must be set. If both are set, endpoint is used.

enable_encryption (Bool, optional):

(Optional) Enable server side encryption, false by default.

encryption_type (String, optional):

(Optional) The encryption type, it could be sse-s3 or sse-kms. It is used only when encryption is enabled and the default type is sse-s3.

kms_key (String, optional):

(Optional) KMS key used if encryption is enabled and encryption type is set to sse-kms.

canned_acl (String, optional):

(Optional) Set canned access control list, e.g. bucket-owner-full-control. If canned_acl is set, the cluster instance profile must have s3:PutObjectAcl permission on the destination bucket and prefix. The full list of possible canned ACLs can be found at https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl. By default only the object owner gets full control. If you are using a cross-account role for writing data, you may want to set bucket-owner-full-control so that the bucket owner can read the logs.
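
As a rough illustration of the layout described for cluster_log_conf, the sketch below builds a dbfs entry and derives the driver and executor log locations from it. The cluster ID is a placeholder value, and log_locations is a hypothetical helper, not part of dagster_databricks.

```python
# A cluster_log_conf value that selects the dbfs destination type.
cluster_log_conf = {"dbfs": {"destination": "dbfs:/my/path"}}


def log_locations(conf: dict, cluster_id: str) -> dict:
    """Hypothetical helper: where driver and executor logs are delivered."""
    # The selector holds exactly one destination type (dbfs or s3).
    (settings,) = conf.values()
    base = f"{settings['destination'].rstrip('/')}/{cluster_id}"
    return {"driver": f"{base}/driver", "executor": f"{base}/executor"}


locations = log_locations(cluster_log_conf, "0123-456789-abcde")
print(locations["driver"])
print(locations["executor"])
```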

init_scripts (List[selector], optional):

The configuration for storing init scripts. Any number of scripts can be specified. The scripts are executed sequentially in the order provided. If cluster_log_conf is specified, init script logs are sent to <destination>/<cluster-id>/init_scripts.

spark_env_vars (permissive dict, optional):

An object containing a set of optional, user-specified environment variable key-value pairs. Key-value pairs of the form (X,Y) are exported as is (i.e., export X="Y") while launching the driver and workers. To specify an additional set of SPARK_DAEMON_JAVA_OPTS, we recommend appending them to $SPARK_DAEMON_JAVA_OPTS as shown in the example below. This ensures that all default Databricks-managed environment variables are included as well. Example Spark environment variables: {“SPARK_WORKER_MEMORY”: “28000m”, “SPARK_LOCAL_DIRS”: “/local_disk0”} or {“SPARK_DAEMON_JAVA_OPTS”: “$SPARK_DAEMON_JAVA_OPTS -Dspark.shuffle.service.enabled=true”}

enable_elastic_disk (Bool, optional):

Autoscaling Local Storage: when enabled, this cluster dynamically acquires additional disk space when its Spark workers are running low on disk space. This feature requires specific AWS permissions to function correctly; refer to https://docs.databricks.com/clusters/configure.html#autoscaling-local-storage for details.

existing (String):

The ID of an existing cluster that will be used for all runs of this job. When running jobs on an existing cluster, you may need to manually restart the cluster if it stops responding. Databricks suggests running jobs on new clusters for greater reliability.

run_name (String, optional):

An optional name for the run. The default value is Untitled.

libraries (List[selector], optional):

An optional list of libraries to be installed on the cluster that will execute the job. By default dagster, dagster-databricks and dagster-pyspark libraries will be included.

install_default_libraries (Bool, optional):

By default, Dagster installs a version of dagster, dagster-databricks, and dagster-pyspark matching the locally-installed versions of those libraries. If you would like to disable this behavior, this value can be set to False.

timeout_seconds (Int, optional):

An optional timeout applied to each run of this job. The default behavior is to have no timeout.

idempotency_token (String, optional):

An optional token that can be used to guarantee the idempotency of job run requests. If an active run with the provided token already exists, the request will not create a new run, but will return the ID of the existing run instead. If you specify the idempotency token, upon failure you can retry until the request succeeds. Databricks guarantees that exactly one run will be launched with that idempotency token. This token should have at most 64 characters.
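
One way to obtain a stable token within the 64-character limit is to hash an identifier that is unique per logical request, so that retries of the same request reuse the same token. The sketch below is an assumption about how a caller might do this, not something the library prescribes; make_idempotency_token and the key format are hypothetical.

```python
import hashlib


def make_idempotency_token(request_key: str) -> str:
    """Derive a deterministic token from a caller-chosen request key.

    A SHA-256 hex digest is always 64 characters long, which fits the
    documented limit.
    """
    return hashlib.sha256(request_key.encode("utf-8")).hexdigest()


token = make_idempotency_token("nightly-etl/2024-01-01")
print(len(token))  # 64
```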

permissions (strict dict, optional):
Default Value:
{}
Config Schema:
job_permissions (strict dict, optional):

job permission spec; ref: https://docs.databricks.com/security/access-control/jobs-acl.html#job-permissions

Config Schema:
NO_PERMISSIONS (List[selector], optional):

CAN_VIEW (List[selector], optional):

CAN_MANAGE_RUN (List[selector], optional):

IS_OWNER (List[selector], optional):

CAN_MANAGE (List[selector], optional):

cluster_permissions (strict dict, optional):

cluster permission spec; ref: https://docs.databricks.com/security/access-control/cluster-acl.html#cluster-level-permissions

Config Schema:
NO_PERMISSIONS (List[selector], optional):

CAN_ATTACH_TO (List[selector], optional):

CAN_RESTART (List[selector], optional):

CAN_MANAGE (List[selector], optional):

databricks_host (dagster.StringSource):

Databricks host, e.g. uksouth.azuredatabricks.com

databricks_token (dagster.StringSource):

Databricks access token

env_variables (permissive dict, optional):

Dictionary of arbitrary environment variables to be set on the databricks cluster.

secrets_to_env_variables (List[strict dict], optional):

Databricks secrets to be exported as environment variables. Since runs will execute in the Databricks runtime environment, environment variables (such as those required for a StringSource config variable) will not be accessible to Dagster. These variables must be stored as Databricks secrets and specified here, which will ensure they are re-exported as environment variables accessible to Dagster upon execution.

storage (selector, optional):

Databricks storage configuration for either S3 or ADLS2. If access credentials for your Databricks storage are stored in Databricks secrets, this config indicates the secret scope and the secret keys used to access either S3 or ADLS2.

Config Schema:
s3 (strict dict):

S3 storage secret configuration

Config Schema:
secret_scope (String):

The Databricks secret scope containing the storage secrets.

access_key_key (String):

The key of a Databricks secret containing the S3 access key ID.

secret_key_key (String):

The key of a Databricks secret containing the S3 secret access key.

adls2 (strict dict):

ADLS2 storage secret configuration

Config Schema:
secret_scope (String):

The Databricks secret scope containing the storage secrets.

storage_account_name (String):

The name of the storage account used to access data.

storage_account_key_key (String):

The key of a Databricks secret containing the storage account secret key.

local_pipeline_package_path (dagster.StringSource, optional):

Absolute path to the root Python package containing your Dagster code. If you set this value to a directory lower than the root package and use relative imports in your code (e.g. from .foo import bar), you are likely to encounter an import error on the remote step. Before every step run, the launcher will zip up the code in this local path, upload it to DBFS, and unzip it into the Python path of the remote Spark process. This gives the remote process access to up-to-date user code.

local_dagster_job_package_path (dagster.StringSource, optional):

Absolute path to the root Python package containing your Dagster code. If you set this value to a directory lower than the root package and use relative imports in your code (e.g. from .foo import bar), you are likely to encounter an import error on the remote step. Before every step run, the launcher will zip up the code in this local path, upload it to DBFS, and unzip it into the Python path of the remote Spark process. This gives the remote process access to up-to-date user code.

staging_prefix (dagster.StringSource, optional):

Directory in DBFS to use for uploaded job code. Must be absolute.

Default Value: ‘/dagster_staging’

wait_for_logs (Bool, optional):

If set, and if the specified cluster is configured to export logs, the system will wait after job completion for the logs to appear in the configured location. Note that logs are copied every 5 minutes, so enabling this will add several minutes to the job runtime. NOTE: this integration will export stdout/stderr from the remote Databricks process automatically, so this option is not generally necessary.

Default Value: False

max_completion_wait_time_seconds (dagster.IntSource, optional):

If the Databricks job run takes more than this many seconds, then consider it failed and terminate the step.

Default Value: 86400

poll_interval_sec (Float, optional):

How frequently Dagster will poll Databricks to determine the state of the job.

Default Value: 5.0

verbose_logs (Bool, optional):

Determines whether to display debug logs emitted while the job is being polled. It can be helpful for Dagit performance to set this to False when running long-running or fan-out Databricks jobs, to avoid forcing the UI to fetch large amounts of debug logs.

Default Value: True

add_dagster_env_variables (Bool, optional):

Automatically add Dagster system environment variables. This option is only applicable when the code being executed is deployed on Dagster Cloud. It will be ignored when the environment variables provided by Dagster Cloud are not present.

Default Value: True

Resource for running ops as a Databricks Job.

When this resource is used, the op will be executed in Databricks using the ‘Run Submit’ API. Pipeline code will be zipped up and copied to a directory in DBFS along with the op’s execution context.

Use the ‘run_config’ configuration to specify the details of the Databricks cluster used, and the ‘storage’ key to configure persistent storage on that cluster. Storage is accessed by setting the credentials in the Spark context, as described in the Databricks documentation for S3 and for ADLS.
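
Putting the schema above together, a configuration for the step launcher might look like the following. This is a sketch only: the Spark version, node type, secret scope, secret keys, and package path are placeholder values that would need to match a real workspace. The {"env": ...} form is Dagster's StringSource syntax for reading a value from an environment variable.

```python
# Hypothetical values throughout; replace the placeholders before use.
step_launcher_config = {
    "run_config": {
        "run_name": "dagster_step",
        "cluster": {
            "new": {
                "size": {"num_workers": 2},
                "spark_version": "<spark-version>",
                "nodes": {"node_types": {"node_type_id": "<node-type-id>"}},
            }
        },
    },
    # StringSource fields: read the values from environment variables.
    "databricks_host": {"env": "DATABRICKS_HOST"},
    "databricks_token": {"env": "DATABRICKS_TOKEN"},
    # Names of Databricks secrets that hold the S3 credentials.
    "storage": {
        "s3": {
            "secret_scope": "my-scope",
            "access_key_key": "s3-access-key",
            "secret_key_key": "s3-secret-key",
        }
    },
    "local_pipeline_package_path": "/abs/path/to/my_package",
    "staging_prefix": "/dagster_staging",
}

# In a Dagster project this dict would typically be supplied as resource
# config, e.g. databricks_pyspark_step_launcher.configured(step_launcher_config).
print(sorted(step_launcher_config))
```

Note that the storage block holds only the names of the secrets, never the credentials themselves.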

class dagster_databricks.DatabricksError[source]

Resources

class dagster_databricks.DatabricksClient(host, token, workspace_id=None)[source]

A thin wrapper over the Databricks REST API.

property api_client

Retrieve a reference to the underlying Databricks API client. For more information, see the Databricks Python API.

Examples:

from dagster import op
from databricks_cli.jobs.api import JobsApi
from databricks_cli.runs.api import RunsApi

@op(required_resource_keys={"databricks_client"})
def op1(context):
    # Initialize the Databricks Jobs API
    jobs_client = JobsApi(context.resources.databricks_client.api_client)
    runs_client = RunsApi(context.resources.databricks_client.api_client)

    # Example 1: Run a Databricks job with some parameters.
    jobs_client.run_now(...)

    # Example 2: Trigger a one-time run of a Databricks workload.
    runs_client.submit_run(...)

    # Example 3: Get an existing run.
    runs_client.get_run(...)

    # Example 4: Cancel a run.
    runs_client.cancel_run(...)

Returns:

The authenticated Databricks API client.

Return type:

ApiClient

dagster_databricks.databricks_client ResourceDefinition[source]

Config Schema:
host (dagster.StringSource):

Databricks host, e.g. uksouth.azuredatabricks.com

token (dagster.StringSource):

Databricks access token

workspace_id (Union[dagster.StringSource, None], optional):

The Databricks workspace ID, as described in https://docs.databricks.com/workspace/workspace-details.html#workspace-instance-names-urls-and-ids. This is used to log a URL for accessing the job in the Databricks UI.