Sensors: Trigger workflows with external inputs
Sensors
How a Sensor Works (The "Poke" Cycle)
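The poke cycle can be sketched in plain Python, without Airflow: a sensor repeatedly calls a condition check, waits `poke_interval` seconds between attempts, and gives up once `timeout` seconds have passed. The name `run_sensor` and the injectable `sleep` and `clock` parameters are hypothetical and exist only for illustration; Airflow's own implementation differs in detail.

```python
import time
from typing import Callable


def run_sensor(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Call poke() until it returns True and return the number of attempts.

    Raises TimeoutError once more than `timeout` seconds have elapsed
    without success. (Hypothetical helper, not an Airflow API.)
    """
    started = clock()
    attempts = 0
    while True:
        attempts += 1
        if poke():
            return attempts
        if clock() - started > timeout:
            raise TimeoutError(f"condition not met after {attempts} pokes")
        sleep(poke_interval)
```

Passing a fake `sleep` and `clock` makes the loop easy to exercise without actually waiting.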
Efficiency: Poke vs. Reschedule
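As a rough illustration of why the mode matters: in `poke` mode the sensor keeps its worker slot for the whole wait, while in `reschedule` mode it releases the slot between checks. The sketch below compares slot occupancy under assumed numbers. `check_seconds` (how long a single check takes) is a made-up value, and the scheduler overhead of rescheduling is ignored.

```python
def slot_seconds(mode: str, wait_seconds: int, poke_interval: int,
                 check_seconds: int = 1) -> int:
    """Approximate seconds a worker slot is held while a sensor waits."""
    if mode == "poke":
        # The task sleeps inside the slot between checks.
        return wait_seconds
    if mode == "reschedule":
        # The slot is only held while a check is actually running.
        checks = wait_seconds // poke_interval + 1
        return checks * check_seconds
    raise ValueError(f"unknown mode: {mode!r}")


print(slot_seconds("poke", 3600, 300))        # 3600
print(slot_seconds("reschedule", 3600, 300))  # 13
```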
Implementation: The Modern Way
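import os

import pendulum
from airflow.decorators import dag, task


# A fixed start_date is used here because a moving value such as
# pendulum.now() is generally discouraged. The date itself is a placeholder.
@dag(start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), schedule="@daily", catchup=False)
def sensor_example():
    # Check every 5 minutes, give up after 1 hour, and free the worker
    # slot between checks.
    @task.sensor(poke_interval=300, timeout=3600, mode="reschedule")
    def wait_for_taxi_data():
        # Returning True completes the sensor; False triggers another check.
        return os.path.exists("/data/taxi_trips.csv")

    @task
    def process_data():
        print("Data found! Starting processing...")

    wait_for_taxi_data() >> process_data()


sensor_example()

The Relationship: poke_interval vs. timeout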
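With the values used in the example above (`poke_interval=300`, `timeout=3600`), the relationship can be worked out with simple arithmetic. The helper below is a hypothetical, idealized sketch: it assumes each check is instantaneous, so real timings will drift slightly.

```python
def poke_schedule(poke_interval: int, timeout: int) -> list[int]:
    """Offsets (seconds since the sensor started) at which a check can run
    before the timeout is exceeded, assuming instantaneous checks."""
    return list(range(0, timeout + 1, poke_interval))


offsets = poke_schedule(300, 3600)
print(len(offsets))  # 13
print(offsets[-1])   # 3600
```

A `poke_interval` larger than `timeout` leaves room for only the initial check, which is usually a sign the two values were swapped.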
Example: ExternalTaskSensor using Taxi Analytics example
Implementation: Connecting Ingestion to Analytics
Critical Rules for ExternalTaskSensor
Adding execution_delta parameter to the previous example
The execution_delta Solution
Implementation Example
Visualizing the "Time Travel"
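The "time travel" is a subtraction: the sensor looks for the upstream run whose logical date equals the current logical date minus `execution_delta`. A minimal sketch with the standard library, assuming hypothetical schedules (upstream at 02:00, downstream at 04:00); the function name is illustrative only.

```python
from datetime import datetime, timedelta, timezone


def target_logical_date(current: datetime, execution_delta: timedelta) -> datetime:
    """Logical date of the upstream run the sensor looks for."""
    return current - execution_delta


# Hypothetical schedules: upstream at 02:00, downstream at 04:00.
downstream_run = datetime(2024, 1, 1, 4, 0, tzinfo=timezone.utc)
upstream_run = target_logical_date(downstream_run, timedelta(hours=2))
print(upstream_run.isoformat())  # 2024-01-01T02:00:00+00:00
```

Note that a positive `execution_delta` points backwards in time, which is why a downstream DAG that runs later passes a positive `timedelta`.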
What if the schedules are inconsistent? (execution_date_fn)
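When the gap between the two schedules is not constant, a fixed `execution_delta` cannot express it. `execution_date_fn` instead takes the current logical date and returns the logical date to look for, and it is used in place of `execution_delta`, not alongside it. A minimal sketch of such a function, assuming a hypothetical upstream DAG that always runs at midnight while the downstream run time varies:

```python
from datetime import datetime, timezone


def upstream_midnight(logical_date: datetime) -> datetime:
    """Map any downstream logical date to midnight of the same day."""
    return logical_date.replace(hour=0, minute=0, second=0, microsecond=0)


# Two downstream runs at different times resolve to the same upstream run.
morning = datetime(2024, 1, 1, 9, 30, tzinfo=timezone.utc)
evening = datetime(2024, 1, 1, 21, 15, tzinfo=timezone.utc)
print(upstream_midnight(morning) == upstream_midnight(evening))  # True
```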
Summary Comparison
Do you usually wrap sensors in tasks or not?
Traditional Style (Not Wrapped)
TaskFlow API Style (Wrapped)
Key Distinctions for your Projects
Summary of the "Wrap" Decision
A Note on the "Reschedule" Mode