Asset-based scheduling
Asset-Based Scheduling (The "Event" Approach)
Formerly known as "Datasets," these are called Assets in Airflow 3.x. The trigger is the completion of a task.
Philosophy: "I don't care what time it is. I will run the moment the sales_table is updated."

Best for: Chained pipelines where DAG B depends on DAG A, but you don't want to tightly couple them.
The Mechanism: It uses a Producer / Consumer model.
How it works in Code
Step A: Define the Asset. You create a global identifier for the data. This isn't the actual file content, just a URI pointer.

```python
from airflow.sdk import Asset

# Define the "Token"
raw_events_file = Asset("s3://my-bucket/events/raw.json")
```

Step B: The Producer (The "Updater"). In your first DAG, you tell Airflow: "When this task finishes, I have successfully updated the raw_events_file."

```python
@task(outlets=[raw_events_file])  # <--- notifying the system
def download_data():
    # ... code that actually uploads the file to S3 ...
    pass
```

Step C: The Consumer (The "Reactor"). In your second DAG, instead of a time, you schedule it on the Asset itself.
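A minimal sketch of such a Consumer, assuming the TaskFlow @dag decorator (the DAG and task names here are illustrative):

```python
from airflow.sdk import Asset, dag, task

raw_events_file = Asset("s3://my-bucket/events/raw.json")

# Note: the schedule is the Asset itself, not a cron string
@dag(schedule=[raw_events_file])
def consumer_dag():
    @task
    def process_data():
        # ... code that reads the freshly updated file ...
        pass

    process_data()

consumer_dag()
```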
Visualized example
Example 1: Consumer and Producer with Asset-based scheduling
This is one of the trickiest parts of Asset-based scheduling: Context Alignment.
The Problem This Solves
When the Consumer DAG starts, its own logical_date is just "the time it triggered" (e.g., 9:05 AM). But the file it needs to read is named after the Producer's logical_date (e.g., 2025-01-01).
If the Consumer looks for {{ ds }}.json, it will look for 2025-02-04.json (today) and fail. The fix is a "bridge" that reaches back into the metadata and asks: "What was the logical date of the run that triggered me?" In other words, you need the Consumer to grab the logical date of the Producer.
In a standard Time-Based setup, every task in the DAG automatically shares the same logical_date because they all belong to the same "bucket."
But in Asset-Based scheduling, the Producer and Consumer are two completely separate "lifecycles."
Here is the full example with that logic integrated.
Full Example: Producer & Consumer with Context Bridge
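A minimal sketch of this pattern, with the context bridge wired into the Consumer's templated argument (the bucket path, DAG names, and task names are illustrative assumptions):

```python
from airflow.sdk import Asset, dag, task

raw_events_file = Asset("s3://my-bucket/events/raw.json")

# --- Producer: runs on a timetable and announces the update ---
@dag(schedule="@daily")
def producer_dag():
    @task(outlets=[raw_events_file])
    def download_data():
        # ... code that uploads {{ ds }}.json to S3 ...
        pass

    download_data()

producer_dag()

# --- Consumer: runs whenever the Asset is updated ---
@dag(schedule=[raw_events_file])
def consumer_dag():
    @task
    def process_file(input_path: str):
        print(f"Reading {input_path}")

    # The "bridge": reach back to the Producer's run for its logical_date
    process_file(
        input_path="s3://my-bucket/events/{{ (triggering_asset_events.values() | first | first).source_dag_run.logical_date | ds }}.json"
    )

consumer_dag()
```

Because TaskFlow arguments are template-rendered, the Jinja string is resolved at runtime into the Producer's date, not the Consumer's.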
The Logic Breakdown: What is that huge template doing?
Let's dissect this line: {{ (triggering_asset_events.values() | first | first).source_dag_run.logical_date }}
triggering_asset_events: This is a dictionary available only in Asset-triggered runs. It contains a list of all the "events" (updates) that caused this DAG to start.

.values() | first: Since we likely only have one Asset triggering us, we grab the first list of events available.

| first: We grab the most recent specific event from that list.

.source_dag_run: This is the reference to the Producer's execution run.

.logical_date: This extracts the Producer's original scheduled date (e.g., 2025-01-01).
Why the | ds at the end matters
In the code block above, note the | ds added to the very end of the template: ...logical_date | ds }}.json
Without | ds, logical_date returns a full timestamp object (e.g., 2025-01-01T00:00:00+00:00). Your file system usually prefers the clean 2025-01-01 string, so adding the ds filter ensures the filename is valid.
One Small Warning
This logic assumes one run of the Producer triggers one run of the Consumer.
If your Consumer is waiting for multiple Assets (e.g., Sales AND Marketing), you have to be careful, as you'll have multiple "Source Dates" to choose from. In that case, you'd usually look for the logical_date of the specific asset you are about to process.
Example 2: Passing metadata along with the Asset
Here is a complete, production-style example that uses Airflow 3 syntax to pass a "Data Contract" through an Asset.
In this scenario, we use the yield Metadata approach. This is the cleanest way to ensure the Consumer knows exactly which file to look for, even if the Producer and Consumer are scheduled differently.
The Full Code
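A sketch of the pattern under the assumptions described above (the /data/events_02 path and task names mirror the walkthrough below; the exact Metadata signature is as documented for Airflow 3's SDK):

```python
from airflow.sdk import Asset, Metadata, dag, task

events_asset = Asset("file:///data/events_02")

# --- Producer: stamps a "Data Contract" onto the Asset event ---
@dag(schedule="@daily")
def metadata_producer():
    @task(outlets=[events_asset])
    def export_events(ds=None):
        # ... code that writes /data/events_02/{ds}.json ...
        # Yield Metadata so the event carries the exact date produced
        yield Metadata(events_asset, {"date": ds})

    export_events()

metadata_producer()

# --- Consumer: reads the date straight from the triggering event ---
@dag(schedule=[events_asset])
def metadata_consumer():
    @task
    def calculate_stats(input_path: str):
        print(f"Crunching numbers from {input_path}")

    # No logical_date gymnastics: the date rides along in the event's extra dict
    calculate_stats(
        input_path="/data/events_02/{{ (triggering_asset_events.values() | first | first).extra['date'] }}.json"
    )

metadata_consumer()
```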
Why this works so well
Explicit Communication: You aren't forcing the Consumer to "guess" the date based on when it was triggered. The Producer tells it exactly: "Hey, I just finished the file for 2025-01-01."

No Logic Leakage: The Consumer doesn't need to know the name of the Producer DAG or how it's structured. It only needs to know that the Asset it listens to provides a date field in the extra metadata.

The "Yield" Pattern: In Airflow 3, using yield Metadata is the preferred way to signal that a task has finished and created a specific version of a dataset.
How to verify it in your Environment
Once you run these DAGs in your Docker setup:
1. Trigger the Producer.
2. After it finishes, go to the Consumer DAG in the Airflow UI.
3. Click on the "Rendered Template" tab for the calculate_stats task.
4. You will see that the input_path has been transformed from the complex Jinja string into a clean path like /data/events_02/2026-02-05.json.
A Final Tip on the "Contract"
If you ever need to add more info, you can just expand the dictionary: extra={"date": run_date, "batch_id": 42, "env": "prod"}
Downstream, the Consumer can pick and choose which of those "sticky notes" it wants to read.
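For instance, a Consumer task might read the event in Python and pick out only the keys it needs (a sketch; the asset and key names are assumed from the earlier example):

```python
from airflow.sdk import task

@task
def calculate_stats(triggering_asset_events=None):
    # Grab the most recent event for the (single) asset we listen to
    event = list(triggering_asset_events.values())[0][-1]
    # Pick only the "sticky notes" this task cares about
    run_date = event.extra["date"]
    batch_id = event.extra.get("batch_id")  # optional key; None if absent
    print(f"Processing batch {batch_id} for {run_date}")
```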
Example 3: Consuming multiple assets
When you consume multiple assets, you are essentially creating a Logical Gate in your data pipeline. By default, Airflow 3 uses "AND" logic for multiple assets in a list, meaning the DAG will only trigger once every asset in the list has been updated at least once since the last time the consumer ran.
Here is a comprehensive example using the Context Manager style. In this scenario, we have a "Dashboard" that requires both Sales Data and Weather Data to be ready before it can calculate its insights.
Full Example: Multiple Asset Consumer (AND Logic)
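A sketch of this setup in the context-manager style (asset URIs, DAG id, and the assumption that each Producer stamps a date into extra are illustrative):

```python
from airflow.sdk import DAG, Asset, task

sales_asset = Asset("file:///data/sales/daily.csv")
weather_asset = Asset("file:///data/weather/daily.csv")

with DAG(
    dag_id="dashboard_aggregator",
    # A list means "AND": both assets must update before this DAG runs
    schedule=[sales_asset, weather_asset],
):

    @task
    def calculate_insights(triggering_asset_events=None):
        # Pull each asset's "sticky notes" out of the dict by its URI
        sales_events = triggering_asset_events.get(sales_asset.uri)
        weather_events = triggering_asset_events.get(weather_asset.uri)

        # Read the date each Producer stamped into its event's extra dict
        sales_date = sales_events[-1].extra.get("date") if sales_events else None
        weather_date = weather_events[-1].extra.get("date") if weather_events else None
        print(f"Joining sales ({sales_date}) with weather ({weather_date})")

    calculate_insights()
```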
Key Concepts in this Example
1. The "AND" Trigger
Because schedule=[sales_asset, weather_asset], the behavior is as follows:
If Sales updates: The consumer waits.
If Weather updates: The consumer waits.
Once both have sent at least one success signal since the last consumer run: The Consumer Triggers.
2. Accessing Specific Metadata
Since you are listening to two different things, triggering_asset_events now contains two sets of "sticky notes." To tell them apart, we access the dictionary using the unique URI of each asset:
triggering_asset_events.get(sales_asset.uri)
3. Complexity of the "Contract"
In this multi-asset setup, your Data Contract is now with two different teams (the Sales team and the Weather team). If either team changes their extra metadata key, this aggregator DAG will fail to find the data it needs.
What about "OR" Logic?
If you wanted the DAG to trigger whenever either Sales or Weather updated (without waiting for the other), Airflow 3 allows you to use the "Pipe" operator:
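A sketch of the OR form (same illustrative assets as above):

```python
from airflow.sdk import DAG, Asset

sales_asset = Asset("file:///data/sales/daily.csv")
weather_asset = Asset("file:///data/weather/daily.csv")

# "|" combines assets with OR logic: either update triggers the DAG
with DAG(
    dag_id="dashboard_either",
    schedule=(sales_asset | weather_asset),
):
    ...
```

You can also mix operators, e.g. ((a | b) & c), to build more elaborate trigger conditions.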
Summary of the Graph View
If you look at this in the Asset Graph UI, you will see two separate Producer boxes on the left, with lines converging into a single box representing the Consumer DAG. This "Fan-In" visual makes it immediately obvious that the Dashboard depends on two distinct streams of data.
Example 4: Combining time- and asset-based scheduling
This is the "Safety Net" pattern in Airflow 3.x. It solves one of the biggest dilemmas in data engineering: "Do I wait for the data to arrive, or do I run on a fixed schedule no matter what?"
By using AssetOrTimeSchedule, you are essentially telling Airflow: "Trigger this DAG whenever the Asset is updated, BUT if it hasn't updated by Wednesday at 1:00 AM, run it anyway."
The Logic: "First Come, First Served"
As the name suggests, this is an OR operation between the clock and the data.
Scenario A (Data arrives early): The Producer finishes on Tuesday. The example_dataset is materialized. The Consumer DAG triggers immediately. The "timer" for Wednesday is reset.

Scenario B (Data is late/missing): It's Wednesday at 1:00 AM. The Producer hasn't finished yet. Airflow says, "I can't wait any longer," and triggers the Consumer DAG regardless.
CronTriggerTimetable: This is the standard Airflow 3 way to define a "point in time" trigger (unlike the interval-based cron we discussed at the very beginning).

assets: A list of one or more assets. If you have multiple assets here, they usually follow AND logic (all must update) before they can "beat" the clock.
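Putting those two parameters together, a sketch of the "Safety Net" DAG might look like this (the import path for AssetOrTimeSchedule and the asset URI are assumptions; verify them against your Airflow version):

```python
from airflow.sdk import DAG, Asset
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

example_dataset = Asset("s3://my-bucket/example.json")

with DAG(
    dag_id="safety_net_consumer",
    # Trigger when the asset updates, OR at Wednesday 01:00 UTC,
    # whichever happens first
    schedule=AssetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"),
        assets=[example_dataset],
    ),
):
    ...
```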