Conditional tasks
Common Ways to Implement Conditions
Example: Use Airflow's Built-in LatestOnlyOperator
from airflow.decorators import dag, task
from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
import pendulum


@dag(start_date=pendulum.datetime(2024, 1, 1), schedule="@daily", catchup=True)
def deploy_pipeline():
    # 1. The "safety valve": skips every task downstream of it
    #    unless the current DAG run is the most recent scheduled run.
    is_latest = LatestOnlyOperator(task_id="check_if_latest")

    @task
    def train_model():
        print("Training model on historical data...")

    @task
    def deploy_to_production():
        print("🚀 DEPLOYING! Only happens for the most recent run.")

    # 2. The chain:
    # train_model runs for EVERY backfilled date.
    # deploy_to_production runs only for the latest scheduled run;
    # in every earlier run it is marked "skipped".
    train_model() >> is_latest >> deploy_to_production()


deploy_pipeline()
How This Works
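The operator's decision can be approximated in plain Python. The sketch below is a simplified model, not Airflow's implementation: it assumes a daily schedule with interval semantics, where each run covers [start, start + 1 day) and counts as "latest" only when the current wall-clock time falls inside the following interval. The function name is_latest_run is hypothetical, and manually triggered runs are not modelled.

```python
from datetime import datetime, timedelta, timezone

DAY = timedelta(days=1)


def is_latest_run(interval_start: datetime, now: datetime) -> bool:
    """Simplified model of the LatestOnlyOperator check for a daily schedule.

    A run covering [interval_start, interval_start + 1 day) is treated as the
    latest run when `now` falls inside the NEXT interval, i.e. the window
    (interval_start + 1 day, interval_start + 2 days].
    """
    next_start = interval_start + DAY
    next_end = next_start + DAY
    return next_start < now <= next_end


# Wall-clock time: 06:00 UTC on 10 January 2024 (an arbitrary example).
now = datetime(2024, 1, 10, 6, 0, tzinfo=timezone.utc)
for day in (1, 8, 9):
    start = datetime(2024, 1, day, tzinfo=timezone.utc)
    print(start.date(), is_latest_run(start, now))
# 2024-01-01 False
# 2024-01-08 False
# 2024-01-09 True
```

Under these assumptions only the run for 9 January, whose interval closed at midnight on the 10th, passes the check; in the runs for earlier dates, deploy_to_production would be skipped.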
Scenario: Backfilling with Catchup
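To illustrate what catchup=True means for this DAG, the following plain-Python simulation enumerates the daily runs that would be created between start_date and a fixed "now" and records the resulting state of each task. It is a sketch under simplifying assumptions (daily interval-based scheduling, a run created once its interval has ended, and the same "next interval contains now" rule for the latest run); simulate_catchup is a hypothetical helper, and the real scheduler, pools, and concurrency limits are not modelled.

```python
from datetime import datetime, timedelta, timezone

DAY = timedelta(days=1)


def simulate_catchup(start_date: datetime, now: datetime) -> dict[str, dict[str, str]]:
    """Map each catch-up run (keyed by interval start date) to task states.

    Assumptions: daily intervals [start, start + 1 day); a run exists once its
    interval has ended; downstream of the check runs only when `now` falls in
    the interval that follows the run's own interval.
    """
    runs: dict[str, dict[str, str]] = {}
    start = start_date
    while start + DAY <= now:  # catchup=True: one run per elapsed interval
        latest = start + DAY < now <= start + 2 * DAY
        runs[start.date().isoformat()] = {
            "train_model": "success",      # upstream of the check: always runs
            "check_if_latest": "success",  # the operator itself succeeds
            "deploy_to_production": "success" if latest else "skipped",
        }
        start += DAY
    return runs


states = simulate_catchup(
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 1, 5, 6, 0, tzinfo=timezone.utc),
)
for day, tasks in states.items():
    print(day, tasks["train_model"], tasks["deploy_to_production"])
# 2024-01-01 success skipped
# 2024-01-02 success skipped
# 2024-01-03 success skipped
# 2024-01-04 success success
```

Every backfilled date still trains the model, but only the most recent run reaches the deployment step.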
Visual DAG Structure
Key Distinctions and "Gotchas"
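One gotcha worth checking is that the skip does not stop at deploy_to_production: under Airflow's default trigger rule, all_success, a task whose upstream was skipped is itself skipped, whereas a task using all_done runs once its upstreams have finished in any state. The plain-Python sketch below models only these two rules; downstream_state is a hypothetical function and "runs" is an illustrative outcome label, not an Airflow task state.

```python
def downstream_state(trigger_rule: str, upstream_states: list[str]) -> str:
    """Simplified model of what happens to a task once all upstreams finished.

    Covers only two of Airflow's trigger rules; the real dependency checks
    handle many more rules and intermediate states.
    """
    if trigger_rule == "all_success":
        if any(s in ("failed", "upstream_failed") for s in upstream_states):
            return "upstream_failed"
        if "skipped" in upstream_states:
            return "skipped"  # the skip cascades further downstream
        return "runs"
    if trigger_rule == "all_done":
        return "runs"  # runs regardless of how the upstreams finished
    raise ValueError(f"trigger rule not covered by this sketch: {trigger_rule}")


# A hypothetical task placed after deploy_to_production on a non-latest run:
print(downstream_state("all_success", ["skipped"]))  # skipped
print(downstream_state("all_done", ["skipped"]))     # runs
```

In the DAG itself the rule is set per task, for example with @task(trigger_rule="all_done") on a notification or cleanup task that should run on every date.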