Optimizing Data Materialization Using Dagster’s Policies

Olivier Dupuis
5 min read · Apr 28, 2023


Managing an effective asset materialization strategy for the discursus project, a data product on protest movements in North America, has proven challenging. As with most projects, some assets need to be materialized at different intervals or based on different criteria. For example:

  • Some assets should run every 15 minutes on the mark.
  • Others need to run immediately after an upstream asset materializes.
  • Still others should run only once their data becomes stale.

Fortunately, Dagster has launched a suite of APIs to declaratively define when an asset should be materialized.

This article will discuss my own experimentation with asset materialization policies based on my project’s needs. Please refer to the official documentation and community channels to implement the right strategy for your own assets.

Controlling Materialization Through Policies

Dagster allows you to control different asset materialization requirements using two types of policies:

  • Freshness policies — “specifies how up-to-date you want a given asset to be”. For example, I might want an asset to have data as fresh as its upstream dependencies within 15 minutes.
  • Auto-materialization policies — “automatically materialize assets when criteria are met”. For example, I might want to immediately materialize an asset once upstream assets have been materialized (eager).

Freshness policies have been around since version 1.1, while auto-materialization was introduced in version 1.3. Both policies influence each other; for instance, an asset with a “lazy” auto-materialization policy will only check its freshness condition to determine when to materialize.
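As a quick preview, here is a minimal sketch of how both policies attach to an asset definition (the asset name and lag value here are purely illustrative):

from dagster import AutoMaterializePolicy, FreshnessPolicy, asset

@asset(
    # Only materialize when the freshness policy requires it
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    # Never lag upstream data by more than 15 minutes
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=15),
)
def my_asset():
    ...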

Let’s work through examples to understand these concepts better.

A Pre-Policy Materialization Strategy

This is my current DAG of assets.

The discursus DAG of assets

In a pre-policy setup, I would have had schedules to materialize those assets. I would first define jobs:

# Packaging assets into a job
source_and_classify_relevancy_of_gdelt_assets_job = define_asset_job(
    name="source_and_classify_relevancy_of_gdelt_assets_job",
    selection=[
        "gdelt_events",
        "gdelt_mentions",
        "gdelt_mentions_enhanced"
    ]
)

And then schedule those jobs:

# Scheduling a job run
ScheduleDefinition(
    job=source_and_classify_relevancy_of_gdelt_assets_job,
    cron_schedule="47 * * * *"
)
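To round out the picture, the job and its schedule would then be registered with Dagster. A minimal sketch, assuming the assets and job defined above and a standard Definitions object:

from dagster import Definitions, ScheduleDefinition

defs = Definitions(
    # Asset definitions selected by the job above
    assets=[gdelt_events, gdelt_mentions, gdelt_mentions_enhanced],
    jobs=[source_and_classify_relevancy_of_gdelt_assets_job],
    schedules=[
        ScheduleDefinition(
            job=source_and_classify_relevancy_of_gdelt_assets_job,
            cron_schedule="47 * * * *",
        )
    ],
)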

This approach is not much different from other schedule-based orchestrators. You would need to think in terms of pipelines and the sequence of assets that require materialization, as well as how to optimize those jobs to minimize resource drain.

Software-Defined Assets and Freshness Policies

The release of Dagster version 1.1 significantly changed how we think about data platforms, centering them on the concept of software-defined assets. Assets, rather than pipelines, became the first-class citizens of the data platform.

This shift meant that our responsibility was now to define each asset’s freshness policy, and Dagster would take care of running jobs that would enforce those policies.

For example, if I have an asset with the following configurations:

@asset(
    non_argument_deps = {"gdelt_mentions_enhanced"},
    description = "Entity extraction of GDELT mentions",
    key_prefix = ["gdelt"],
    group_name = "prepared_sources",
    resource_defs = {
        'novacene_resource': my_resources.my_novacene_resource,
        'snowflake_resource': my_resources.my_snowflake_resource
    },
    freshness_policy = FreshnessPolicy(maximum_lag_minutes=60)
)
def gdelt_mentions_entity_extraction(context):
    # Bla

Dagster will interpret this policy as follows:

Asset freshness policy

With an asset sensor defined and enabled, Dagster periodically assesses each asset's freshness:

  • Fresh: The asset has the latest data
  • Stale: The asset still conforms to the freshness policy, but there is more up-to-date data in upstream assets
  • Overdue: The asset’s data no longer conforms to the freshness policy

When an asset is overdue, it needs to be refreshed. The sensor makes that assessment, bundles all the assets that need to be materialized to enforce their freshness policies, and launches the corresponding runs.
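Before version 1.3, the sensor in question was the asset reconciliation sensor. A minimal sketch of building one that covers every asset (the sensor name is illustrative):

from dagster import AssetSelection, build_asset_reconciliation_sensor

# Periodically evaluates freshness policies and launches runs for overdue assets
freshness_sensor = build_asset_reconciliation_sensor(
    asset_selection=AssetSelection.all(),
    name="freshness_reconciliation_sensor",
)

The sensor is then registered alongside your assets (for example in a Definitions object) and turned on from the Dagster UI.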

As a result, the runs you get can vary significantly based on those freshness policies, something that is difficult to replicate with static schedules.

Runs based on freshness policies

Auto-Materialization

Freshness policies have been a game-changer in controlling the materialization of your data platform's assets. However, in certain scenarios a freshness policy alone couldn't fully express an asset's materialization needs.

For example, I have a suite of assets that should always run in a sequence. This means that whenever asset A materializes, assets B and C should materialize right away. There wasn’t an ideal way of expressing that with freshness policies.

Auto-materialization policies express this requirement by configuring assets as either lazy or eager.

Assets can be auto-materialized “eagerly” — i.e. immediately after upstream changes occur. Or they can be auto-materialized “lazily” — i.e. by waiting until downstream FreshnessPolicies dictate that they need to be fresh. Or a mixture of both.

For example, consider these two assets:

@asset(
    description = "List of events mined on GDELT",
    key_prefix = ["gdelt"],
    group_name = "sources",
    resource_defs = {
        'aws_resource': my_resources.my_aws_resource,
        'gdelt_resource': my_resources.my_gdelt_resource,
        'snowflake_resource': my_resources.my_snowflake_resource
    },
    auto_materialize_policy=AutoMaterializePolicy.lazy(),
    freshness_policy = FreshnessPolicy(maximum_lag_minutes=15),
)
def gdelt_events(context):
    # Bla


@asset(
    ins = {"gdelt_events": AssetIn(key_prefix = "gdelt")},
    description = "List of mentions mined from GDELT",
    key_prefix = ["gdelt"],
    group_name = "sources",
    resource_defs = {
        'aws_resource': my_resources.my_aws_resource,
        'gdelt_resource': my_resources.my_gdelt_resource,
        'snowflake_resource': my_resources.my_snowflake_resource
    },
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def gdelt_mentions(context, gdelt_events):
    # Bla

With those policies in place:

  1. gdelt_events will have a lazy auto-materialization policy and will be materialized only when the freshness policy is violated (i.e., maximum lag of 15 minutes).
  2. gdelt_mentions will have an eager auto-materialization policy, which means it will be materialized immediately after gdelt_events is materialized.

I also have another asset that is eagerly “chained” to that sequence, which means Dagster will always run those two downstream assets whenever the gdelt_events asset is materialized (see the sketch below).

Eager auto-materialization policy
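For illustration, that third asset follows the same pattern as gdelt_mentions: an eager policy and no freshness policy of its own. A minimal sketch, assuming it is the gdelt_mentions_enhanced asset from the job above (its other configurations and resources are omitted here):

@asset(
    ins={"gdelt_mentions": AssetIn(key_prefix="gdelt")},
    key_prefix=["gdelt"],
    # Eager: materialize as soon as gdelt_mentions has a new materialization
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def gdelt_mentions_enhanced(context, gdelt_mentions):
    ...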

dbt Assets

For my dbt friends, defining freshness and auto-materialization policies for your assets is also straightforward. Here’s an example of how to define these policies for dbt assets in dbt_project.yml:

models:
  discursus_dw:
    +dagster_freshness_policy:
      maximum_lag_minutes: !!float 360
    +dagster_auto_materialize_policy:
      type: lazy

This configuration is set at the project level, but as with other dbt configurations, you can define it at the folder level:

models:
  discursus_dw:
    staging:
      +dagster_freshness_policy:
        maximum_lag_minutes: !!float 360
      +dagster_auto_materialize_policy:
        type: lazy

Or within each model individually:

{{
    config(
        materialized = 'incremental',
        incremental_strategy = 'delete+insert',
        unique_key = 'gdelt_event_natural_key',
        dagster_freshness_policy = {"maximum_lag_minutes": 6 * 60}
    )
}}

with source as (

    select * from {{ source('gdelt', 'gdelt_events') }}

)

...

Please note that the dagster_auto_materialize_policy configuration is only available starting with version 1.3.2 (thanks to a community contribution 🙏).
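These dbt-side settings take effect once the project is loaded as Dagster assets, for example via dagster-dbt's load_assets_from_dbt_project (the paths below are illustrative):

from dagster_dbt import load_assets_from_dbt_project

# The +dagster_freshness_policy and +dagster_auto_materialize_policy configs
# defined in the dbt project are attached to the resulting Dagster assets
dbt_assets = load_assets_from_dbt_project(
    project_dir="discursus_dw",
    profiles_dir="discursus_dw/config",
)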

Continuous Optimization of Materialization Strategy

This blog post has provided only a brief overview of software-defined assets, freshness policies, and auto-materialization policies in Dagster. As with everything in Dagster, there are many more features and nuances to explore, and you should adopt the techniques that best suit your data platform’s needs.

What’s important is to recognize that implementing a materialization strategy should be an ongoing effort. Each asset policy should be carefully configured so that it always delivers the desired levels of data freshness while minimizing resource consumption.

By relying on Dagster’s powerful and flexible features, you can continuously optimize your materialization strategy to meet the evolving needs of your data platform.
