Sensors: Trigger workflows with external inputs


Sensors

Think of a Sensor as a specialized "waiter" in your pipeline. While a normal task does something (like running a SQL query or moving a file), a Sensor watches for something to happen before letting the rest of the DAG proceed.

How a Sensor Works (The "Poke" Cycle)

Imagine you are waiting for a taxi. Every 60 seconds, you walk to the window to see if it’s there.

  • The Poke: Checking the window is the "poke." By default, a sensor does this every 60 seconds (poke_interval).

  • The Success: If the taxi is there, you leave. The sensor task turns green, and the next task starts.

  • The Timeout: If you wait for 7 days (the default timeout) and no taxi comes, you give up. The sensor fails.


Efficiency: Poke vs. Reschedule

There are two ways a sensor can "wait":

  • Poke Mode (The "Hog"): The sensor stays active the whole time. It occupies a "worker slot" (a seat at the table) even when it's just sitting there doing nothing between pokes. If you have 10 sensors in poke mode, you might run out of workers for your actual data tasks.

  • Reschedule Mode (The "Smart" Way): The sensor pokes, sees the condition isn't met, and then shuts itself down and frees up its worker slot. It schedules itself to wake up again after the poke_interval. While it sleeps, its status in the UI is up_for_reschedule.


Implementation: The Modern Way

In Airflow 3, you'll likely use the @task.sensor decorator. It looks like a regular Python function but behaves like a watchman.

from airflow.decorators import task, dag
import pendulum

@dag(start_date=pendulum.datetime(2025, 1, 1, tz="UTC"), schedule="@daily")
def sensor_example():

    @task.sensor(poke_interval=300, timeout=3600, mode="reschedule")
    def wait_for_taxi_data():
        # Logic to check if data exists
        import os
        return os.path.exists("/data/taxi_trips.csv")

    @task
    def process_data():
        print("Data found! Starting processing...")

    wait_for_taxi_data() >> process_data()

sensor_example()

The Relationship: poke_interval vs. timeout

These two parameters work together to manage the lifecycle of your "waiter."

  • poke_interval (Frequency): This determines how often the sensor checks the condition. If set to 300 seconds (5 minutes), the sensor wakes up, checks if the condition is met, and if not, goes back to sleep for 5 minutes.

  • timeout (The Deadline): This is the maximum amount of time the sensor is allowed to wait in total before it gives up and fails.

The Math: The number of times a sensor will check the condition is roughly timeout / poke_interval.

If you have a timeout of 3600 (1 hour) and a poke_interval of 600 (10 minutes), your sensor will "poke" the condition 6 times before failing.
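That configuration could look like the following sketch, written in the same @task.sensor style as the example above. The task name and the file check are illustrative, and the task would live inside a @dag-decorated function like the earlier example.

from airflow.decorators import task

# timeout / poke_interval = 3600 / 600, so the sensor pokes at most ~6 times before it fails
@task.sensor(poke_interval=600, timeout=3600, mode="reschedule")
def wait_up_to_one_hour():
    import os
    return os.path.exists("/data/taxi_trips.csv")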


Example: ExternalTaskSensor with the Taxi Analytics pipeline

To set up a relationship between your two DAGs, you use the ExternalTaskSensor. This is essentially a "cross-DAG" sensor that allows the Taxi Analytics DAG to look over at the Taxi Ingestion DAG and wait for a specific task to turn green.

Implementation: Connecting Ingestion to Analytics

In this example, the Analytics DAG will wait for the ingest_taxi_data task in the Ingestion DAG to finish successfully for the same logical_date.
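Here is a minimal sketch of that setup. The taxi_ingestion DAG and its ingest_taxi_data task are the ones referenced above; the taxi_analytics DAG id, the run_analytics task, and the pinned start date are illustrative assumptions. The ExternalTaskSensor import shown is the classic path; in Airflow 3 it may ship in the standard provider package instead.

import pendulum

from airflow.decorators import dag, task
from airflow.sensors.external_task import ExternalTaskSensor  # in Airflow 3 this may live in the standard provider

@dag(start_date=pendulum.datetime(2025, 1, 1, tz="UTC"), schedule="@daily")
def taxi_analytics():

    # Standalone sensor task: waits for a task in *another* DAG to succeed
    wait_for_ingestion = ExternalTaskSensor(
        task_id="wait_for_ingestion",
        external_dag_id="taxi_ingestion",     # the DAG we are waiting on
        external_task_id="ingest_taxi_data",  # the task that must turn green
        poke_interval=300,                    # check every 5 minutes
        timeout=3600,                         # give up after 1 hour
        mode="reschedule",                    # free the worker slot between pokes
    )

    @task
    def run_analytics():
        print("Ingestion finished! Starting analytics...")

    wait_for_ingestion >> run_analytics()

taxi_analytics()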


Critical Rules for ExternalTaskSensor

When you use this sensor to link your Taxi Data projects, keep these two rules in mind:

  1. Matching Schedules: By default, the sensor looks for a task run with the exact same logical_date. If your Ingestion DAG runs at 01:00 and your Analytics DAG runs at 02:00, the sensor will fail because it can't find a 02:00 run for Ingestion. You would need to use an execution_delta to offset the time.

  2. Reschedule Mode is King: For cross-DAG sensors, always use mode="reschedule". Because you are waiting for an entire other pipeline to run, the wait time could be long. Using reschedule ensures that your Analytics DAG doesn't block other tasks from running while it's just "watching" the Ingestion DAG.


Adding execution_delta parameter to the previous example

In a complex data project like your taxi data pipeline, it is common for DAGs to run on different schedules. For example, your Ingestion DAG might run at midnight, but your Analytics DAG might not start until 2:00 AM.

By default, the ExternalTaskSensor is very strict: it looks for a task that has the exact same logical_date (down to the second) as the sensor itself. If the times don't match, the sensor will never find a matching run and will eventually time out and fail.

The execution_delta Solution

The execution_delta parameter allows you to tell the sensor to "look back in time." It represents the difference between the two DAG schedules.

The Math: Analytics Start Time - Ingestion Start Time = execution_delta

If Analytics starts at 02:00 and Ingestion starts at 00:00, your delta is 2 hours.

Implementation Example
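Building on the sensor sketched earlier, a minimal example might pass a two-hour execution_delta so the 02:00 Analytics run looks for the 00:00 Ingestion run. The surrounding DAG definition is omitted, and the ids are the same assumed names as before.

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor  # in Airflow 3 this may live in the standard provider

wait_for_ingestion = ExternalTaskSensor(
    task_id="wait_for_ingestion",
    external_dag_id="taxi_ingestion",
    external_task_id="ingest_taxi_data",
    execution_delta=timedelta(hours=2),  # Analytics (02:00) - Ingestion (00:00) = 2 hours
    poke_interval=300,
    timeout=3600,
    mode="reschedule",
)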


Visualizing the "Time Travel"

Think of the execution_delta as a window into the past:

  1. Sensor starts at its scheduled time (e.g., Feb 14, 02:00).

  2. Sensor applies delta: It subtracts 2 hours from its own time.

  3. Sensor searches: It looks for a task in the taxi_ingestion DAG that has a logical date of Feb 14, 00:00.

  4. Sensor evaluates: If that specific midnight run is green, the sensor succeeds immediately.

What if the schedules are inconsistent? (execution_date_fn)

If your schedules don't have a fixed mathematical difference (e.g., one runs daily and the other runs on "the last Friday of the month"), execution_delta won't work.

In that case, you use execution_date_fn. This allows you to write a small Python function that tells the sensor exactly which date and time to look for.
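As a rough sketch, the function below maps the sensor's own logical date back to the most recent Friday; the helper name and the scheduling scenario are illustrative assumptions, not a recipe from the Airflow docs.

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

def most_recent_friday(logical_date):
    # Walk backwards from the sensor's own logical date until we hit a Friday (weekday 4)
    candidate = logical_date
    while candidate.weekday() != 4:
        candidate -= timedelta(days=1)
    return candidate

wait_for_weekly_ingestion = ExternalTaskSensor(
    task_id="wait_for_weekly_ingestion",
    external_dag_id="taxi_ingestion",
    external_task_id="ingest_taxi_data",
    execution_date_fn=most_recent_friday,  # tells the sensor exactly which logical date to look for
    mode="reschedule",
)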

Summary Comparison

  • Default (None): best for DAGs that share the exact same schedule (e.g., both @daily).

  • execution_delta: best for DAGs with fixed offsets (e.g., one is always 2 hours after the other).

  • execution_date_fn: best for complex relationships where the delta changes based on the day or month.


Do you usually wrap sensors in tasks or not?

In Airflow, the answer depends on whether you are using the Traditional Operator style or the modern TaskFlow API.

Traditional Style (Not Wrapped)

When using standard sensors like the S3KeySensor or ExternalTaskSensor, you do not wrap them. They are standalone tasks themselves. You simply instantiate the class and assign it to a variable within your DAG context.
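For example, an S3KeySensor is declared directly as a task inside your DAG; the bucket and key names here are placeholders, and the sensor comes from the Amazon provider package.

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# A standalone sensor task: no wrapper function, just instantiate the class
wait_for_taxi_file = S3KeySensor(
    task_id="wait_for_taxi_file",
    bucket_name="my-taxi-bucket",     # placeholder bucket
    bucket_key="raw/taxi_trips.csv",  # placeholder key
    poke_interval=300,
    timeout=3600,
    mode="reschedule",
)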

TaskFlow API Style (Wrapped)

If you are writing custom logic—like checking a specific API response or a complex file pattern—you do wrap your logic using the @task.sensor decorator. This turns a regular Python function into a fully functional Airflow Sensor.


Key Distinctions for your Projects

  • Pre-built logic? Use the Operator (no wrap). If you need to wait for a file in S3 or a task in another DAG, use S3KeySensor or ExternalTaskSensor directly.

  • Custom logic? Use the Decorator (wrap). If you need to check if a specific row exists in a database with a unique calculation, wrap that logic in @task.sensor.

Summary of the "Wrap" Decision

  • Traditional Sensor: you write sensor_task = S3KeySensor(...). Fastest way to use standard "waiting" logic (S3, SQL, External Tasks).

  • TaskFlow Sensor: you write @task.sensor wrapping a function. Best for custom Python conditions that don't have a dedicated operator.

A Note on the "Reschedule" Mode

Regardless of whether you wrap or not, if you expect the wait to be long, always set mode="reschedule". This prevents the sensor from "hogging" a worker slot while it waits for the next poke_interval.

