# Asset-based scheduling

***

### Asset-Based Scheduling (The "Event" Approach)

Formerly known as **Datasets**, these are called **Assets** in Airflow 3.x. The trigger is the completion of a producing task, not a point in time.

* **Philosophy:** "I don't care what time it is. I will run the moment the `sales_table` is updated."
* **Best for:** Chained pipelines where DAG B depends on DAG A, but you don't want to tightly couple them.
* **The Mechanism:** It uses a **Producer / Consumer** model.

**How it works in Code**

**Step A: Define the Asset** You create a global identifier for the data. This isn't the actual file content, just a URI pointer.

```python
from airflow.sdk import Asset

# Define the "Token"
raw_events_file = Asset("s3://my-bucket/events/raw.json")
```

**Step B: The Producer (The "Updater")** In your first DAG, you tell Airflow: *"When this task finishes, I have successfully updated the `raw_events_file`."*

```python
@task(outlets=[raw_events_file]) # <--- notifying the system
def download_data():
    # ... code that actually uploads the file to S3 ...
    pass
```

**Step C: The Consumer (The "Reactor")** In your second DAG, instead of a time, you schedule it on the Asset itself.

```python
with DAG(
    dag_id="process_events",
    schedule=[raw_events_file], # <--- The Trigger
    ...
):
    # This DAG triggers instantly when 'download_data' finishes
    ...
```

***

### Visualized example

{% @mermaid/diagram content="graph LR
subgraph FetchDAG["Fetch DAG (producer)"]
    A[Fetch<br/>events.]
end

subgraph Asset["Asset<br/>file:///data/events"]
    DB[(Storage)]
end

subgraph AnalyticsDAG["Analytics DAG (consumer)"]
    B[Calculate<br/>statistics.]
end

subgraph MarketingDAG["Marketing DAG (consumer)"]
    C[Calculate<br/>engagement.]
end

subgraph MonitoringDAG["Monitoring DAG (consumer)"]
    D[Calculate<br/>performance.]
end

A -->|Update| DB
DB -->|Trigger?| B
DB -->|Trigger?| C
DB -->|Trigger?| D

style FetchDAG fill:#e8f5e9
style AnalyticsDAG fill:#e3f2fd
style MarketingDAG fill:#e3f2fd
style MonitoringDAG fill:#e3f2fd
style Asset fill:#fff9c4" %}

***

### Example 1: Consumer and Producer with Asset-based scheduling

This is one of the trickiest parts of Asset-based scheduling: **Context Alignment**.

#### The Problem This Solves

When the **Consumer** DAG starts, its own `logical_date` is just "the time it triggered" (e.g., 9:05 AM). But the file it needs to read is named after the **Producer's** `logical_date` (e.g., `2025-01-01`).

If the Consumer looks for `{{ ds }}.json`, it will look for today's date (e.g., `2025-02-04.json`) and fail. The code below contains the "bridge" that reaches back into the metadata to ask: *"What was the logical date of the run that triggered me?"* **In other words, the Consumer needs to grab the Producer's logical date.**

In a standard **Time-Based** setup, every task in the DAG automatically shares the same `logical_date` because they all belong to the same "bucket."

But in **Asset-Based** scheduling, the Producer and Consumer are two completely separate "lifecycles."

Here is the full example with that logic integrated.

#### Full Example: Producer & Consumer with Context Bridge

```python
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sdk import Asset

# 1. Define the Asset
events_asset = Asset("s3://data-bucket/events_file")

# ==========================================
# PRODUCER (Standard)
# ==========================================
with DAG(
    dag_id="01_producer_with_date",
    schedule="@daily",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
) as producer_dag:

    # Using PythonOperator to keep style consistent, 
    # but @task works too.
    def _fetch_events(**kwargs):
        # Logic to simulate saving a file named '2025-01-01.json'
        date = kwargs['ds']
        print(f"Creating file: /data/events/{date}.json")

    fetch_task = PythonOperator(
        task_id="fetch_events",
        python_callable=_fetch_events,
        outlets=[events_asset]  # <--- Updates the Asset
    )

# ==========================================
# CONSUMER (with the Context Bridge)
# ==========================================
with DAG(
    dag_id="02_consumer_with_bridge",
    schedule=[events_asset],  # Triggered by Asset
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
) as consumer_dag:

    def _calculate_stats(input_path, output_path):
        print(f"Reading from: {input_path}")
        print(f"Writing to:   {output_path}")

    # The Context Bridge: grab the Producer's logical date
    calculate_stats = PythonOperator(
        task_id="calculate_stats",
        python_callable=_calculate_stats,
        op_kwargs={
            # This template digs into the trigger history to find the Producer's date
            "input_path": "/data/events/{{ (triggering_asset_events.values() | first | first).source_dag_run.logical_date | ds }}.json",
            "output_path": "/data/stats/{{ (triggering_asset_events.values() | first | first).source_dag_run.logical_date | ds }}.csv",
        },
    )
```

#### The Logic Breakdown: What is that huge template doing?

Let's dissect this line: `{{ (triggering_asset_events.values() | first | first).source_dag_run.logical_date }}`

1. `triggering_asset_events`: This is a dictionary available only in Asset-triggered runs. It contains a list of all the "events" (updates) that caused this DAG to start.
2. `.values() | first`: Since we likely only have one Asset triggering us, we grab the first list of events available.
3. `| first`: We grab the first event from that list.
4. `.source_dag_run`: This is the reference to the **Producer's** execution run.
5. `.logical_date`: This extracts the **Producer's** original scheduled date (e.g., `2025-01-01`).
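To make the traversal concrete, here is a pure-Python sketch that mimics the shape of `triggering_asset_events` with stand-in objects (the `SimpleNamespace` mocks are illustrative, not Airflow's real classes):

```python
from types import SimpleNamespace
from datetime import datetime, timezone

# Hypothetical stand-ins for Airflow's internal objects
source_dag_run = SimpleNamespace(
    logical_date=datetime(2025, 1, 1, tzinfo=timezone.utc)
)
event = SimpleNamespace(source_dag_run=source_dag_run)

# Shape of triggering_asset_events: {asset_uri: [event, ...]}
triggering_asset_events = {"s3://data-bucket/events_file": [event]}

# Step 2: .values() | first  ->  the first list of events
first_event_list = next(iter(triggering_asset_events.values()))
# Step 3: | first  ->  the first event in that list
first_event = first_event_list[0]
# Steps 4-5: .source_dag_run.logical_date  ->  the Producer's date
producer_date = first_event.source_dag_run.logical_date
print(producer_date.strftime("%Y-%m-%d"))  # 2025-01-01
```

The Jinja expression performs exactly this dictionary-and-attribute walk at render time.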

#### Why the `| ds` at the end matters

In the code block above, note the `| ds` filter appended to the very end of the template: `...logical_date | ds }}.json`

Without `| ds`, `logical_date` renders as a full timestamp (e.g., `2025-01-01T00:00:00+00:00`). Filenames usually want the clean `2025-01-01` string, so the `ds` filter keeps the path valid.
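The difference is easy to see in plain Python, standing in for the Jinja filter:

```python
from datetime import datetime, timezone

logical_date = datetime(2025, 1, 1, tzinfo=timezone.utc)

# Without | ds: the full ISO timestamp leaks into the filename
raw_name = f"{logical_date.isoformat()}.json"
# With | ds semantics: just the YYYY-MM-DD date string
clean_name = f"{logical_date.strftime('%Y-%m-%d')}.json"

print(raw_name)    # 2025-01-01T00:00:00+00:00.json
print(clean_name)  # 2025-01-01.json
```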

#### One Small Warning

This logic assumes **one** run of the Producer triggers **one** run of the Consumer.

If your Consumer is waiting for *multiple* Assets (e.g., Sales AND Marketing), you have to be careful, as you'll have multiple "Source Dates" to choose from. In that case, you'd usually look for the `logical_date` of the specific asset you are about to process.

***

### Example 2: Passing metadata along with the Asset

Here is a complete, production-style example that uses **Airflow 3 syntax** to pass a "Data Contract" through an Asset.

In this scenario, we use the `yield Metadata` approach. This is the cleanest way to ensure the Consumer knows exactly which file to look for, even if the Producer and Consumer are scheduled differently.

#### The Full Code

```python
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.sdk import Asset, Metadata
from airflow.operators.python import PythonOperator

# 1. Define the shared Asset
# This acts as the "mailbox" between the two DAGs
events_dataset = Asset("s3://data-lake/raw_events")

# ==========================================
# PRODUCER DAG: Fetches and "Tags" Data
# ==========================================
with DAG(
    dag_id="01_producer_metadata_sender",
    schedule="@daily",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
) as producer_dag:

    @task(outlets=[events_dataset])
    def fetch_and_tag_data(**context):
        # 1. Get the logical date for this run
        run_date = context['logical_date'].strftime("%Y-%m-%d")
        
        # 2. Simulate doing work (e.g., saving a file)
        file_path = f"/data/events_02/{run_date}.json"
        print(f"Producer: Created file at {file_path}")

        # 3. Yield the Metadata "Contract"
        # This is the 'Sticky Note' the consumer will read
        yield Metadata(
            events_dataset, 
            extra={"date": run_date}
        )

    fetch_and_tag_data()


# ==========================================
# CONSUMER DAG: Reads the "Tag" and Processes
# ==========================================
def _calculate_stats(input_path, output_path):
    print(f"Consumer is now reading: {input_path}")
    print(f"Consumer is now writing: {output_path}")

with DAG(
    dag_id="02_consumer_metadata_receiver",
    schedule=[events_dataset],  # Triggered when the Producer updates the Asset
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
) as consumer_dag:

    # The 'triggering_asset_events' dictionary lets us read the Producer's sticky note
    calculate_stats = PythonOperator(
        task_id="calculate_stats",
        python_callable=_calculate_stats,
        op_kwargs={
            "input_path": "/data/events_02/{{ (triggering_asset_events.values() | first | first).extra.date }}.json",
            "output_path": "/data/stats_02/{{ (triggering_asset_events.values() | first | first).extra.date }}.csv",
        },
    )
```

***

#### Why this works so well

1. **Explicit Communication:** You aren't forcing the Consumer to "guess" the date based on when it was triggered. The Producer tells it exactly: *"Hey, I just finished the file for 2025-01-01."*
2. **No Logic Leakage:** The Consumer doesn't need to know the name of the Producer DAG or how it's structured. It only needs to know that the Asset it listens to provides a `date` field in the `extra` metadata.
3. **The "Yield" Pattern:** In Airflow 3, using `yield Metadata` is the preferred way to signal that a task has finished and created a specific version of an asset.

#### How to verify it in your Environment

Once you run these DAGs in your Docker setup:

1. **Trigger the Producer.**
2. After it finishes, go to the **Consumer DAG** in the Airflow UI.
3. Click on the **"Rendered Template"** tab for the `calculate_stats` task.
4. You will see that the `input_path` has been rendered from the complex Jinja string into a clean path like `/data/events_02/2025-01-01.json`.

***

#### A Final Tip on the "Contract"

If you ever need to add more info, you can just expand the dictionary: `extra={"date": run_date, "batch_id": 42, "env": "prod"}`

Downstream, the Consumer can pick and choose which of those "sticky notes" it wants to read.
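As a sketch of that pick-and-choose behavior (plain Python, with a hypothetical expanded `extra` payload):

```python
# Hypothetical expanded contract yielded by the Producer
extra = {"date": "2025-01-01", "batch_id": 42, "env": "prod"}

# The Consumer reads only the notes it cares about
input_path = f"/data/events_02/{extra['date']}.json"
env = extra.get("env", "dev")  # fall back if the Producer omits the key

print(input_path)  # /data/events_02/2025-01-01.json
print(env)         # prod
```

Using `.get()` with a default keeps the Consumer working even if the Producer drops an optional key from the contract.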

***

### Example 3: Consuming multiple assets

When you consume multiple assets, you are essentially creating a **Logical Gate** in your data pipeline. By default, Airflow 3 uses "AND" logic for multiple assets in a list, meaning the DAG will only trigger once **every** asset in the list has been updated at least once since the last time the consumer ran.

{% @mermaid/diagram content="graph LR
subgraph Producer1["Producer 1"]
    A[Fetch events<br/>from source 1.]
end

subgraph Asset1["Asset<br/>file:///data/events_1"]
    DB1[(Database)]
end

subgraph Producer2["Producer 2"]
    B[Fetch events<br/>from source 2.]
end

subgraph Asset2["Asset<br/>file:///data/events_2"]
    DB2[(Database)]
end

subgraph Consumer["Consumer"]
    C[Calculate<br/>statistics.]
end

A -->|Update| DB1
B -->|Update| DB2
DB1 -->|Trigger when<br/>both data sets<br/>have been<br/>updated.| C
DB2 -->|Trigger when<br/>both data sets<br/>have been<br/>updated.| C

style Producer1 fill:#e8f5e9
style Producer2 fill:#e8f5e9
style Consumer fill:#e3f2fd
style Asset1 fill:#fff9c4
style Asset2 fill:#fff9c4" %}

Here is a comprehensive example using the **Context Manager** style. In this scenario, we have a "Dashboard" that requires both **Sales Data** and **Weather Data** to be ready before it can calculate its insights.

#### Full Example: Multiple Asset Consumer (AND Logic)

```python
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sdk import Asset, Metadata

# 1. Define the shared Assets
# These are usually defined in a common file and imported by all DAGs
sales_asset = Asset("s3://company/data/sales.csv")
weather_asset = Asset("s3://company/data/weather.json")

# ==========================================
# CONSUMER DAG: Waiting for Multiple Assets
# ==========================================
with DAG(
    dag_id="dashboard_aggregator",
    # Using a list implies AND logic: (sales_asset & weather_asset)
    schedule=[sales_asset, weather_asset], 
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["analytics", "consumer"]
) as dag:

    def _generate_combined_report(triggering_asset_events):
        """
        Unpacking metadata from multiple different sources.
        triggering_asset_events is a dict where keys are the Asset URIs.
        """
        # 1. Extract info for Sales
        sales_events = triggering_asset_events.get(sales_asset.uri)
        sales_date = sales_events[0].extra.get("date") if sales_events else "Unknown"
        
        # 2. Extract info for Weather
        weather_events = triggering_asset_events.get(weather_asset.uri)
        weather_city = weather_events[0].extra.get("city") if weather_events else "Global"

        print(f"Running combined report for Sales Date: {sales_date}")
        print(f"Correlating with Weather for: {weather_city}")
        
        # Actual processing logic would go here...

    # Task definition
    aggregate_data = PythonOperator(
        task_id="aggregate_sales_and_weather",
        python_callable=_generate_combined_report,
        # We pass the triggering_asset_events dict into our function
        # In Airflow 3, this is automatically available in the context
    )
```

***

#### Key Concepts in this Example

**1. The "AND" Trigger**

Because `schedule=[sales_asset, weather_asset]`, the behavior is as follows:

* If **Sales** updates: The consumer waits.
* If **Weather** updates: The consumer waits.
* Once **both** have sent at least one success signal since the last consumer run: **The Consumer Triggers**.
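The gate can be simulated in a few lines of plain Python. This is a toy model (one "updated since last run" flag per asset), not Airflow's real bookkeeping:

```python
# One flag per asset: "has it updated since the last consumer run?"
pending = {"sales": False, "weather": False}

def on_asset_update(name):
    """Toy model of Airflow's AND gate for a list of assets."""
    pending[name] = True
    if all(pending.values()):
        for key in pending:        # reset the gate after triggering
            pending[key] = False
        return "TRIGGER"
    return "WAIT"

print(on_asset_update("sales"))    # WAIT  (weather still pending)
print(on_asset_update("weather"))  # TRIGGER
print(on_asset_update("sales"))    # WAIT  (gate was reset)
```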

**2. Accessing Specific Metadata**

Since you are listening to two different things, `triggering_asset_events` now contains two sets of "sticky notes." To tell them apart, we access the dictionary using the unique **URI** of each asset:

`triggering_asset_events.get(sales_asset.uri)`

**3. Complexity of the "Contract"**

In this multi-asset setup, your **Data Contract** is now with two different teams (the Sales team and the Weather team). If either team changes their `extra` metadata key, this aggregator DAG will fail to find the data it needs.

***

#### What about "OR" Logic?

If you wanted the DAG to trigger whenever **either** Sales **or** Weather updated (without waiting for the other), Airflow 3 allows you to use the "Pipe" operator:

```python
# Trigger whenever ANY of these assets update
schedule=(sales_asset | weather_asset)

# or like this
schedule=(asset_1 | (asset_2 & asset_3))
```

#### Summary of the Graph View

If you look at this in the **Asset Graph UI**, you will see two separate Producer boxes on the left, with lines converging into a single box representing the Consumer DAG. This "Fan-In" visual makes it immediately obvious that the Dashboard depends on two distinct streams of data.

***

### Example 4: Combining time- and asset-based scheduling

This is the "Safety Net" pattern in Airflow 3.x. It solves one of the biggest dilemmas in data engineering: **"Do I wait for the data to arrive, or do I run on a fixed schedule no matter what?"**

By using `AssetOrTimeSchedule`, you are essentially telling Airflow: **"Trigger this DAG whenever the Asset is updated, BUT if it hasn't updated by Wednesday at 1:00 AM, run it anyway."**

#### The Logic: "First Come, First Served"

As the name suggests, this is an **OR** operation between the clock and the data.

* **Scenario A (Data arrives early)**: The Producer finishes on Tuesday. The `example_dataset` is materialized, and the Consumer DAG triggers immediately. (Note that the Wednesday cron trigger still fires on its own schedule; the asset event does not reset it.)
* **Scenario B (Data is late/missing)**: It’s Wednesday at 1:00 AM. The Producer hasn't finished yet. Airflow says, "I can't wait any longer," and triggers the Consumer DAG regardless.

```python
import pendulum
from airflow import DAG
from airflow.sdk import Asset
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

example_dataset = Asset("s3://company/data/example.csv")

with DAG(
    dag_id="asset_or_time_consumer",
    schedule=AssetOrTimeSchedule(
        # The 'Time' part: every Wednesday at 01:00 UTC
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"),
        # The 'Asset' part: trigger whenever this file is updated
        assets=[example_dataset],
    ),
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
):
    ...
```

* `CronTriggerTimetable`: This is the standard Airflow 3 way to define a "point in time" trigger (unlike the interval-based cron we discussed at the very beginning).
* `assets`: A list of one or more assets. If you list multiple assets here, they follow AND logic (all must update) before they can "beat" the clock.

***
