# Conditional tasks

***

Implementing conditions between tasks is how you add "intelligence" to your pipeline.

***

### Common Ways to Implement Conditions

In Airflow, you generally have three levels of "conditions":

* **Python Logic (Inside the Task)**: Use standard `if/else` inside a `@task`. The task itself always runs; only the code paths inside it are conditional.
* **Branching** (`@task.branch` / `BranchPythonOperator`): Chooses which downstream path(s) to follow and skips the rest.
* **Short-Circuiting** (`ShortCircuitOperator` / `@task.short_circuit`): This acts like a "kill switch." If the condition evaluates to a falsy value, all downstream tasks are skipped.

***

### Example: **Use Airflow's Built-in `LatestOnlyOperator`**

Airflow actually provides this pattern out of the box:

```python
from airflow.decorators import dag, task
from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
import pendulum

@dag(start_date=pendulum.datetime(2024, 1, 1), schedule="@daily", catchup=True)
def deploy_pipeline():

    # 1. The "Safety Valve"
    latest_only = LatestOnlyOperator(task_id="latest_only")

    @task
    def train_model():
        print("Training model on historical data...")

    @task
    def deploy_model():
        print("🚀 DEPLOYING! Only happens for the most recent run.")

    # 2. The Chain
    # train_model runs for EVERY backfill date.
    # deploy_model ONLY runs for the most recent interval.
    train_model() >> latest_only >> deploy_model()

deploy_pipeline()
```

#### **How This Works**

The `LatestOnlyOperator` decides whether to skip based on a simple time check:

1. It looks at the current wall-clock time ("now").
2. It computes the Data Interval of the **next** scheduled run after this one.
3. If "now" does not fall inside that next interval, a more recent run must already exist, so it concludes: "This must be a backfill/historical run."
4. It then skips all tasks directly **downstream** of it (and the skip cascades further via the default `all_success` trigger rule).
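Under a fixed-interval schedule, that decision can be sketched in plain Python (a simplification of the operator's internal logic; the function and parameter names here are illustrative):

```python
from datetime import datetime, timedelta, timezone

def is_latest_run(data_interval_end: datetime,
                  schedule_delta: timedelta,
                  now: datetime) -> bool:
    """Simplified sketch of the LatestOnlyOperator check: a run is 'latest'
    when 'now' falls inside the NEXT run's data interval, i.e. between this
    run's interval end and the interval end plus one schedule period."""
    next_start = data_interval_end
    next_end = data_interval_end + schedule_delta
    return next_start < now <= next_end

# Daily schedule, "today" is Feb 5, 2026, noon UTC:
now = datetime(2026, 2, 5, 12, tzinfo=timezone.utc)
day = timedelta(days=1)

# Backfill run for Feb 1 (interval Feb 1 -> Feb 2): not the latest.
print(is_latest_run(datetime(2026, 2, 2, tzinfo=timezone.utc), day, now))  # False
# Run for Feb 4 (interval Feb 4 -> Feb 5): the latest.
print(is_latest_run(datetime(2026, 2, 5, tzinfo=timezone.utc), day, now))  # True
```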

#### **Scenario: Backfilling with Catchup**

Let's say you enable `catchup=True` and have missed runs:

```
Today: Feb 5, 2026
Missed runs: Feb 1, Feb 2, Feb 3, Feb 4
```

Airflow triggers all 5 runs:

**Without `latest_only`:**

```
✅ Feb 1: train_model → deploy_model (deploys model v1)
✅ Feb 2: train_model → deploy_model (deploys model v2, overwrites v1)
✅ Feb 3: train_model → deploy_model (deploys model v3, overwrites v2)
✅ Feb 4: train_model → deploy_model (deploys model v4, overwrites v3)
✅ Feb 5: train_model → deploy_model (deploys model v5, overwrites v4)
```

**Result:** 5 deployments, 4 of them unnecessary. Only the last one matters.

***

**With `latest_only`:**

```
✅ Feb 1: train_model → latest_only (checks: not latest) → ⊗ deploy_model SKIPPED
✅ Feb 2: train_model → latest_only (checks: not latest) → ⊗ deploy_model SKIPPED
✅ Feb 3: train_model → latest_only (checks: not latest) → ⊗ deploy_model SKIPPED
✅ Feb 4: train_model → latest_only (checks: not latest) → ⊗ deploy_model SKIPPED
✅ Feb 5: train_model → latest_only (checks: IS latest!) → ✅ deploy_model (deploys model v5)
```

**Result:** Only 1 deployment - the most recent model! 🎯

***

#### **Visual DAG Structure**

```
┌──────────────┐
│ train_model  │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ latest_only  │  ← Conditional gate
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ deploy_model │  ← Only runs if latest_only passes
└──────────────┘
```

**Benefits:**

* Can use **any operator** for deployment (Bash, Kubernetes, Docker, etc.)
* UI clearly shows SKIPPED vs SUCCESS
* Clean separation: gate = control flow, deploy = business logic
* Deployment code doesn't need to know about Airflow internals

***

#### Key Distinctions and "Gotchas"

* **Direct Downstream Only:** The operator itself only skips tasks *directly* connected after it; the skip then cascades through the default `all_success` trigger rule. In a complex web, ensure the "LatestOnly" node is the bottleneck upstream of every task you want gated.
* **External Triggers:** If you manually click "Trigger DAG" in the UI, the `LatestOnlyOperator` will not skip downstream tasks. It assumes that if a human clicked the button, they *want* the whole thing to run regardless of the date.
* **Trigger Rules:** If you have a "Join" task later in the DAG (like a cleanup task) that needs to run even during backfills, set its `trigger_rule` to `"all_done"` so it doesn't get swept up in the cascading skip.

***
