In the previous portion of the tutorial, you wrote your first Software-defined asset (SDA), looked at Dagster's UI, and manually materialized your asset.
Continuing from there, you will:
Add more assets to your Dagster project
Connect them to finish creating the pipeline
Give users more knowledge about the assets by adding metadata and logging
By the end of the previous section, you ingested the top Hacker News story IDs into Dagster. Using this data, you'll now look up each story by its ID, ingest that data, and make a DataFrame out of it. You'll connect your current asset with this new asset to establish dependencies and make an asset graph.
Copy and paste the completed asset code below into assets.py:
Add new imports, such as import pandas as pd, to the top of assets.py
```python
import json
import os

import pandas as pd  # Add new imports to the top of `assets.py`
import requests

from dagster import asset


@asset(deps=[topstory_ids])  # this asset is dependent on topstory_ids
def topstories() -> None:
    with open("data/topstory_ids.json", "r") as f:
        topstory_ids = json.load(f)

    results = []
    for item_id in topstory_ids:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

        if len(results) % 20 == 0:
            print(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")
```
Dependencies between assets are defined using the deps parameter of the @asset decorator. In this case, topstory_ids (the list of IDs) is a dependency of topstories (the CSV file).
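To make the idea of a dependency graph concrete, here is a minimal sketch using only the standard library. It assumes a hypothetical dictionary, asset_deps, that mirrors the deps=[...] declarations; it is not Dagster's API and does not show how Dagster schedules runs. It only illustrates why declaring topstory_ids as a dependency means it must be materialized before topstories.

```python
from graphlib import TopologicalSorter

# Hypothetical stand-in for the asset graph: each asset name maps to the
# set of asset names listed in its deps=[...]. This is not Dagster's API.
asset_deps: dict[str, set[str]] = {
    "topstory_ids": set(),
    "topstories": {"topstory_ids"},
}


def materialization_order(deps: dict[str, set[str]]) -> list[str]:
    """Return asset names ordered so each asset follows all of its dependencies."""
    return list(TopologicalSorter(deps).static_order())


if __name__ == "__main__":
    print(materialization_order(asset_deps))  # → ['topstory_ids', 'topstories']
```

If the declared dependencies formed a cycle, static_order() would raise graphlib.CycleError, which is why a dependency graph like this must be acyclic.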
In your browser, navigate to Dagster's UI (localhost:3000) and look at the asset graph to see the relationship between your assets.
In the code above, a print statement shows progress while fetching stories from the Hacker News API. Dagster has a built-in logger that goes beyond print and standard Python logging: it shows exactly where each log message came from in your Dagster project, such as which asset and which materialization produced it.
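To access the logger, use the get_dagster_logger function. In the topstories asset, create a logger with logger = get_dagster_logger() and replace the print statement with logger.info(...); the updated topstories asset in the metadata step later in this section includes this change.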
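Dagster documents get_dagster_logger as returning a standard Python logging.Logger, so the call pattern inside an asset is the same as with the stdlib logging module. The sketch below assumes that and uses a plain stdlib logger in its place, so it runs without Dagster. The fetch_all and fake_fetch names are hypothetical, and fake_fetch stands in for the Hacker News API request.

```python
import logging
from typing import Callable, Iterable


def fetch_all(
    item_ids: Iterable[int],
    fetch_item: Callable[[int], dict],
    logger: logging.Logger,
) -> list[dict]:
    """Fetch each item, logging progress every 20 items instead of printing it."""
    results = []
    for item_id in item_ids:
        results.append(fetch_item(item_id))
        if len(results) % 20 == 0:
            logger.info(f"Got {len(results)} items so far.")
    return results


def fake_fetch(item_id: int) -> dict:
    """Hypothetical stand-in for the Hacker News API request."""
    return {"id": item_id, "title": f"Story {item_id}"}


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # A plain stdlib logger stands in for get_dagster_logger() here.
    items = fetch_all(range(45), fake_fetch, logging.getLogger("topstories"))
    print(len(items))
```

Passing the logger in as a parameter keeps the progress logic testable; inside the asset you would simply call get_dagster_logger() instead.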
Along with structured data like tables, Dagster's assets can also be unstructured data, such as images. Your next and final asset will take the DataFrame of stories and create a dictionary of the most frequent words in the titles.
Below is the finished code for a most_frequent_words asset. Copy and paste the code into assets.py:
```python
@asset(deps=[topstories])
def most_frequent_words() -> None:
    stopwords = ["a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"]

    topstories = pd.read_csv("data/topstories.csv")

    # loop through the titles and count the frequency of each word
    word_counts = {}
    for raw_title in topstories["title"]:
        title = raw_title.lower()
        for word in title.split():
            cleaned_word = word.strip(".,-!?:;()[]'\"-")
            if cleaned_word not in stopwords and len(cleaned_word) > 0:
                word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1

    # Get the top 25 most frequent words
    top_words = {
        pair[0]: pair[1]
        for pair in sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:25]
    }

    with open("data/most_frequent_words.json", "w") as f:
        json.dump(top_words, f)
```
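The counting logic above can be pulled out into a pure function and checked without reading any files. The sketch below swaps the manual dictionary counting for collections.Counter, which is a different technique from the tutorial's code but should produce the same top-N mapping, because Counter.most_common orders ties by first occurrence, just as the stable sort above does. The top_words function name and STOPWORDS constant are hypothetical.

```python
from collections import Counter

STOPWORDS = {"a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"}


def top_words(titles: list[str], n: int = 25) -> dict[str, int]:
    """Return the n most frequent non-stopword words across all titles."""
    counts = Counter()
    for title in titles:
        for word in title.lower().split():
            # Same punctuation stripping as the asset above.
            cleaned = word.strip(".,-!?:;()[]'\"-")
            if cleaned and cleaned not in STOPWORDS:
                counts[cleaned] += 1
    # most_common() orders ties by first occurrence, like the stable sort above.
    return dict(counts.most_common(n))


if __name__ == "__main__":
    titles = ["Show HN: A Rust compiler", "The Rust book", "Why Rust?"]
    print(top_words(titles, n=2))  # → {'rust': 3, 'show': 1}
```

Inside the asset, this would be called as top_words(list(topstories["title"])).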
Software-defined assets can be enriched with many types of metadata; any key-value pair can be attached to an SDA. Common details to add are:
Statistics about the data, such as row counts or other data profiling
Test results or assertions about the data
Images or tabular previews of the asset
Information about who owns the asset, where it's stored, and links to external documentation
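As an illustration of the first kind of metadata in the list above, the sketch below computes simple profiling statistics from a list of row dictionaries. The function name profile_rows and the missing_<column> keys are hypothetical; the point is only that plain values like these are the kind of key-value pairs that can be attached as metadata.

```python
def profile_rows(rows: list[dict]) -> dict[str, int]:
    """Compute simple profiling stats: the row count plus missing values per column."""
    columns = sorted({key for row in rows for key in row})
    stats = {"num_records": len(rows)}
    for column in columns:
        # A value counts as missing if the key is absent or explicitly None.
        stats[f"missing_{column}"] = sum(1 for row in rows if row.get(column) is None)
    return stats


if __name__ == "__main__":
    stories = [{"id": 1, "title": "Ask HN"}, {"id": 2, "title": None}, {"id": 3}]
    print(profile_rows(stories))
```

With a pandas DataFrame, df.isna().sum() gives comparable per-column missing-value counts.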
The following code attaches a row count and a preview of the topstories asset as metadata. Update your code for the topstories asset to match the changes below. The asset now accepts a context argument, which Dagster provides automatically during asset materialization; its add_output_metadata method attaches metadata to the materialized asset.
```python
import base64
from io import BytesIO

import matplotlib.pyplot as plt

from dagster import (
    AssetExecutionContext,
    MetadataValue,
    asset,
    get_dagster_logger,
)

# Add the imports above to the top of `assets.py`


@asset(deps=[topstory_ids])
def topstories(context: AssetExecutionContext) -> None:
    logger = get_dagster_logger()

    with open("data/topstory_ids.json", "r") as f:
        topstory_ids = json.load(f)

    results = []
    for item_id in topstory_ids:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

        if len(results) % 20 == 0:
            logger.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")

    context.add_output_metadata(
        metadata={
            "num_records": len(df),  # Metadata can be any key-value pair
            # The `MetadataValue` class has useful static methods to build Metadata
            "preview": MetadataValue.md(df.head().to_markdown()),
        }
    )
```
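Note that df.head().to_markdown() depends on the optional tabulate package; pandas raises an ImportError if it is not installed. The sketch below, using hypothetical helper names, shows the shape of the Markdown preview with only the standard library. Its formatting is simpler than to_markdown's output (which, for example, also includes the index column by default); in the asset itself, keep using MetadataValue.md as shown above.

```python
def rows_to_markdown(rows: list[dict], columns: list[str]) -> str:
    """Render rows as a pipe-delimited Markdown table: header, separator, body."""

    def cell(value) -> str:
        # Escape pipes so a value cannot split into extra columns.
        return str(value).replace("|", "\\|")

    lines = [
        "| " + " | ".join(cell(col) for col in columns) + " |",
        "|" + "|".join("---" for _ in columns) + "|",
    ]
    for row in rows:
        lines.append("| " + " | ".join(cell(row.get(col, "")) for col in columns) + " |")
    return "\n".join(lines)


def preview_metadata(rows: list[dict], columns: list[str], n: int = 5) -> dict:
    """Build a row count and a Markdown preview of the first n rows, like df.head()."""
    return {"num_records": len(rows), "preview": rows_to_markdown(rows[:n], columns)}


if __name__ == "__main__":
    stories = [{"id": 1, "title": "Ask HN: a|b?"}, {"id": 2, "title": "Show HN"}]
    print(preview_metadata(stories, ["id", "title"])["preview"])
```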
Reload the definitions and re-materialize your assets. The metadata can then be seen in the following places:
In the Asset graph page, click on an asset and its metadata will be shown in the right sidebar:
In the Asset Catalog's page for the topstories asset:
A preview of the DataFrame was embedded into the asset's metadata as Markdown. Any valid Markdown snippet, including images, can be stored and rendered in the Dagster UI. By embedding a bar chart of the most frequently used words as metadata, you and your team can visualize and analyze the most_frequent_words asset without leaving the Dagster UI.
The code below shows how to add an image of a bar chart to the asset's metadata. Replace your most_frequent_words asset with the following:
```python
@asset(deps=[topstories])
def most_frequent_words(context: AssetExecutionContext) -> None:
    stopwords = ["a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"]

    topstories = pd.read_csv("data/topstories.csv")

    # loop through the titles and count the frequency of each word
    word_counts = {}
    for raw_title in topstories["title"]:
        title = raw_title.lower()
        for word in title.split():
            cleaned_word = word.strip(".,-!?:;()[]'\"-")
            if cleaned_word not in stopwords and len(cleaned_word) > 0:
                word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1

    # Get the top 25 most frequent words
    top_words = {
        pair[0]: pair[1]
        for pair in sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:25]
    }

    # Make a bar chart of the top 25 words
    plt.figure(figsize=(10, 6))
    plt.bar(top_words.keys(), top_words.values())
    plt.xticks(rotation=45, ha="right")
    plt.title("Top 25 Words in Hacker News Titles")
    plt.tight_layout()

    # Convert the image to a saveable format
    buffer = BytesIO()
    plt.savefig(buffer, format="png")
    image_data = base64.b64encode(buffer.getvalue())

    # Convert the image to Markdown to preview it within Dagster
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    with open("data/most_frequent_words.json", "w") as f:
        json.dump(top_words, f)

    # Attach the Markdown content as metadata to the asset
    context.add_output_metadata(metadata={"plot": MetadataValue.md(md_content)})
```
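The image embedding above comes down to base64-encoding the PNG bytes and placing them in a Markdown image tag with a data URI. The sketch below isolates that step so it can be checked without matplotlib; the helper names are hypothetical, and markdown_to_png exists only to verify the round trip. The sample bytes are just the 8-byte PNG file signature, not a complete image.

```python
import base64

PREFIX = "data:image/png;base64,"


def png_to_markdown(png_bytes: bytes, alt: str = "img") -> str:
    """Embed raw PNG bytes in a Markdown image tag via a base64 data URI."""
    encoded = base64.b64encode(png_bytes).decode("ascii")
    return f"![{alt}]({PREFIX}{encoded})"


def markdown_to_png(md: str) -> bytes:
    """Recover the original bytes from a tag built by png_to_markdown."""
    start = md.index(PREFIX) + len(PREFIX)
    end = md.rindex(")")
    return base64.b64decode(md[start:end])


if __name__ == "__main__":
    signature = b"\x89PNG\r\n\x1a\n"  # the 8-byte PNG file signature only
    md = png_to_markdown(signature)
    print(md)  # → ![img](data:image/png;base64,iVBORw0KGgo=)
    assert markdown_to_png(md) == signature
```

Because base64 encodes every 3 bytes as 4 characters, the Markdown string is roughly a third larger than the PNG itself, so large charts produce large metadata entries.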
Reload your definitions and re-materialize your assets. The bar chart will be visible with the rest of the materialization metadata for the most_frequent_words asset. The plot key in the metadata will contain a link that says [Show Markdown]; clicking it opens the preview in the Dagster UI. The bar chart will change throughout the day as the top stories change. Here's an example of what most_frequent_words looked like at the time we wrote this tutorial:
If your data is sensitive, such as protected health information (PHI) or personally identifiable information (PII), follow your organization's policies for surfacing data and practice due diligence before showing it in metadata or logs.