Conditional tasks


Implementing conditions between tasks is how you add "intelligence" to your pipeline.


Common Ways to Implement Conditions

In Airflow, you generally have three levels of "conditions":

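  • Python Logic (Inside the Task): Use a standard if/else inside a @task. The task always runs (and shows as SUCCESS), but the work inside it only happens when the condition holds.

  • Branching (@task.branch): The branch function returns the task_id (or list of task_ids) to follow; the other paths directly downstream of it are skipped.

  • Short-Circuiting (ShortCircuitOperator): This is like a "kill switch." If the condition is falsy, every task downstream of it is skipped. Tasks that are not downstream of it still run.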
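
To make the difference between the three levels concrete, here is a plain-Python model of what each one decides. It does not import Airflow and is not Airflow's implementation; the task names (load_rows, full_reload, incremental_load, transform, ...) are hypothetical. In a real DAG the branch and short-circuit decisions would come from callables used with @task.branch and @task.short_circuit (or ShortCircuitOperator), and Airflow itself applies the skips.

```python
from typing import Dict, Iterable, List, Union


def load_rows(row_count: int) -> str:
    """Level 1, in-task condition: the task always runs (and ends as SUCCESS);
    only the work inside it is conditional."""
    if row_count == 0:
        return "nothing to load"
    return f"loaded {row_count} rows"


def apply_branch(chosen: Union[str, List[str]], direct_downstream: Iterable[str]) -> Dict[str, str]:
    """Level 2, branching: the branch callable returns the task_id(s) to follow;
    every other *direct* downstream task is skipped."""
    keep = {chosen} if isinstance(chosen, str) else set(chosen)
    return {t: ("run" if t in keep else "skipped") for t in direct_downstream}


def apply_short_circuit(condition: object, downstream: Iterable[str]) -> Dict[str, str]:
    """Level 3, short-circuit: a falsy result skips every downstream task
    (modeling Airflow's default, ignore_downstream_trigger_rules=True)."""
    return {t: ("run" if condition else "skipped") for t in downstream}


print(load_rows(0))
print(apply_branch("incremental_load", ["full_reload", "incremental_load"]))
print(apply_short_circuit(False, ["transform", "load", "report"]))
```

Note the different scope: branching only chooses among the tasks immediately after the branch, while a falsy short-circuit skips everything below it.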


Example: Use Airflow's Built-in LatestOnlyOperator

For the common case of "only run this step on the most recent scheduled run," Airflow provides the pattern out of the box:


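from airflow.decorators import dag, task
# LatestOnlyOperator ships in the standard provider; on older Airflow 2.x
# installs the equivalent import is airflow.operators.latest_only.
from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
import pendulum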

@dag(start_date=pendulum.datetime(2024, 1, 1), schedule="@daily", catchup=True)
def deploy_pipeline():

    # 1. The "Safety Valve"
    is_latest = LatestOnlyOperator(task_id="check_if_latest")

    @task
    def train_model():
        print("Training model on historical data...")

    @task
    def deploy_to_production():
        print("🚀 DEPLOYING! Only happens for the most recent run.")

    # 2. The Chain
    # train_model runs for EVERY run, including catchup/backfill dates.
    # deploy_to_production ONLY runs for the most recent scheduled interval
    # (or when the DAG is triggered manually).
    train_model() >> is_latest >> deploy_to_production()

deploy_pipeline()

How This Works

Under the hood, the LatestOnlyOperator performs this check automatically:

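  1. It looks at the current wall-clock time ("now").

  2. It asks the DAG's schedule for the data interval of the run that comes after the current one.

  3. If "now" is NOT inside that next interval, it concludes: "This must be a backfill/historical run." (For the latest scheduled run, the current interval has just closed, so "now" falls inside the next one.)

  4. If so, it skips all tasks directly downstream of it.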
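
The decision can be sketched in plain Python, assuming a fixed-period schedule such as @daily. This is a simplified model, not the operator's source: the real operator gets the next interval from the DAG's timetable (which also handles cron schedules) and reads the manual-trigger flag from the DAG run, whereas here both are plain arguments with hypothetical names.

```python
from datetime import datetime, timedelta, timezone


def is_latest_run(
    interval_start: datetime,
    period: timedelta,
    now: datetime,
    manually_triggered: bool = False,
) -> bool:
    """Simplified model of the LatestOnlyOperator decision for a fixed-period schedule."""
    if manually_triggered:
        # Manual runs never skip downstream tasks.
        return True
    # The current run covers [interval_start, interval_end).
    interval_end = interval_start + period
    # The next scheduled run would cover [interval_end, interval_end + period).
    next_start, next_end = interval_end, interval_end + period
    # "Latest" means: this run's interval has closed, but the next one has not.
    return next_start < now <= next_end


utc = timezone.utc
day = timedelta(days=1)
now = datetime(2024, 1, 10, 6, 0, tzinfo=utc)

print(is_latest_run(datetime(2024, 1, 9, tzinfo=utc), day, now))  # True: the most recent interval
print(is_latest_run(datetime(2024, 1, 5, tzinfo=utc), day, now))  # False: a historical run
```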

Scenario: Backfilling with Catchup

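Let's say you enable catchup=True and the DAG has five missed daily runs.

When it catches up, Airflow creates all 5 runs, and train_model executes in each of them:

Without latest_only:

Every run also executes deploy_to_production. Result: 5 deployments, although only the last one matters!


With latest_only:

The four historical runs skip deploy_to_production; only the most recent run deploys. Result: Only 1 deployment, of the most recent model! 🎯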
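
The arithmetic behind these two results can be checked with a small simulation. It reuses a simplified, plain-Python version of the latest-only check (not Airflow code), and the dates are a hypothetical timeline: five missed daily intervals from Jan 1 to Jan 5, caught up on the morning of Jan 6.

```python
from datetime import datetime, timedelta, timezone


def is_latest_run(interval_start, period, now):
    # Simplified LatestOnly check: the run is "latest" iff now falls inside
    # the data interval of the next scheduled run.
    interval_end = interval_start + period
    return interval_end < now <= interval_end + period


def simulate_catchup(run_starts, period, now, use_latest_only):
    """Return the interval starts of the runs in which deploy_to_production runs."""
    deployed = []
    for start in run_starts:
        # train_model would run here for every date; only deployment is gated.
        if not use_latest_only or is_latest_run(start, period, now):
            deployed.append(start)
    return deployed


utc = timezone.utc
day = timedelta(days=1)
now = datetime(2024, 1, 6, 9, 0, tzinfo=utc)
runs = [datetime(2024, 1, d, tzinfo=utc) for d in range(1, 6)]

print(len(simulate_catchup(runs, day, now, use_latest_only=False)))  # 5
print(simulate_catchup(runs, day, now, use_latest_only=True))        # only the Jan 5 run
```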


Visual DAG Structure

train_model → check_if_latest → deploy_to_production (SKIPPED on historical runs)

Benefits of gating with a separate task (instead of an if/else inside the deploy task):

  • Can use any operator for deployment (Bash, Kubernetes, Docker, etc.)

  • UI clearly shows SKIPPED vs SUCCESS

  • Clean separation: gate = control flow, deploy = business logic

  • Deployment code doesn't need to know about Airflow internals
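
A plain-Python contrast of the two designs, assuming a daily schedule. Both functions and the model path are hypothetical illustrations, not Airflow APIs: the first puts the condition inside the task, so the deploy code must reason about run intervals itself; the second leaves that question to an upstream gate such as LatestOnlyOperator.

```python
from datetime import datetime, timedelta, timezone


# Variant 1 (condition inside the task): the deploy function has to reason about
# run intervals itself, and a "skipped" deployment still finishes as SUCCESS.
def deploy_with_inline_check(model_path, interval_end, now, period=timedelta(days=1)):
    if not (interval_end < now <= interval_end + period):
        return "no-op"  # historical run: nothing deployed
    return f"deployed {model_path}"


# Variant 2 (gate + business logic): the upstream gate decides *whether* to run;
# deploy() only knows *how* to deploy, so it can live in any operator or script.
def deploy(model_path):
    return f"deployed {model_path}"


now = datetime(2024, 1, 6, 9, 0, tzinfo=timezone.utc)
print(deploy_with_inline_check("models/model.pkl", datetime(2024, 1, 2, tzinfo=timezone.utc), now))
print(deploy("models/model.pkl"))
```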


Key Distinctions and "Gotchas"

  • Direct Downstream Only: The operator itself only marks the tasks directly after it as skipped. Tasks further down are then skipped through trigger-rule propagation (the default all_success rule skips a task whose upstream was skipped), but any task that is not downstream of the gate is unaffected. In a complex graph, make sure the "LatestOnly" node is upstream of every sensitive task.

  • External Triggers: If you manually trigger the DAG (e.g. by clicking "Trigger DAG" in the UI), the LatestOnlyOperator does not skip downstream tasks. It assumes that if a human clicked the button, they want the whole pipeline to run regardless of the date.

  • Trigger Rules: If a later "join" task (such as a cleanup step) must run even during backfills, set its trigger_rule to all_done (TriggerRule.ALL_DONE). Otherwise the default all_success rule skips it along with everything else downstream of the gate.
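
The propagation described in the first and last points can be sketched with a simplified model of trigger-rule resolution. It covers only all_success (the default) and all_done, uses Airflow's lowercase state and rule names as plain strings, and is not Airflow's implementation; the notify and cleanup tasks in the comments are hypothetical.

```python
from typing import List

FINISHED = {"success", "failed", "upstream_failed", "skipped"}


def resolve(trigger_rule: str, upstream_states: List[str]) -> str:
    """Simplified model: decide a task's fate once its upstream tasks have states."""
    if any(s not in FINISHED for s in upstream_states):
        return "waiting"
    if trigger_rule == "all_done":
        return "run"  # runs whatever the upstream outcome was
    if trigger_rule == "all_success":
        if any(s in ("failed", "upstream_failed") for s in upstream_states):
            return "upstream_failed"
        if "skipped" in upstream_states:
            return "skipped"  # this is how a skip cascades past the direct downstream task
        return "run"
    raise ValueError(f"rule not modeled: {trigger_rule}")


# On a backfill run, check_if_latest has skipped deploy_to_production:
print(resolve("all_success", ["skipped"]))  # skipped: e.g. a notify task after deploy
print(resolve("all_done", ["skipped"]))     # run: e.g. a cleanup task
```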

