# Interval-based scheduling

***

### **The Core Concept: What is a Data Interval?**

A **data interval** represents a specific time window of data your DAG is processing. Instead of thinking "run this DAG at 9 AM," you think "process data from 8 AM to 9 AM."

***

#### **Key Variables Explained**

```python
# For a daily DAG run on 2024-01-02 at midnight:
data_interval_start = 2024-01-01 00:00:00  # Start of the window
data_interval_end   = 2024-01-02 00:00:00  # End of the window
logical_date        = 2024-01-01 00:00:00  # Usually same as start
```

**The DAG runs AFTER the interval completes**, so it processes yesterday's data.

***

#### **Example 1: Daily Data Processing**

```python
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.timetables.interval import CronDataIntervalTimetable

with DAG(
    dag_id="daily_sales_report",
    schedule=CronDataIntervalTimetable("@daily", timezone="UTC"),
    start_date=pendulum.datetime(2024, 1, 1),
    catchup=True,
):
    # This fetches data FOR the previous day
    fetch_sales = BashOperator(
        task_id="fetch_sales",
        bash_command=(
            "echo 'Processing sales from "
            "{{data_interval_start}} to {{data_interval_end}}' && "
            "curl 'https://api.example.com/sales?"
            "start={{data_interval_start | ds}}&"
            "end={{data_interval_end | ds}}' "
            "> /data/sales_{{logical_date | ds}}.json"
        ),
    )
```

**What happens:**

* **2024-01-02 00:00:01** (just after midnight): DAG runs
  * Processes data from `2024-01-01 00:00` to `2024-01-02 00:00`
  * Saves to `/data/sales_2024-01-01.json`
* **2024-01-03 00:00:01**: DAG runs again
  * Processes data from `2024-01-02 00:00` to `2024-01-03 00:00`
  * Saves to `/data/sales_2024-01-02.json`

***

#### **Example 2: Every 2 Days Processing**

```python
from airflow.timetables.interval import DeltaDataIntervalTimetable

with DAG(
    dag_id="bi_daily_report",
    schedule=DeltaDataIntervalTimetable(pendulum.duration(days=2)),
    start_date=pendulum.datetime(2024, 1, 1),
):
    process_data = BashOperator(
        task_id="process",
        bash_command=(
            "echo 'Processing 2-day window: "
            "{{data_interval_start | ds}} to {{data_interval_end | ds}}'"
        ),
    )
```

**Timeline:**

```
Interval 1: Jan 1 00:00 → Jan 3 00:00 | Runs: Jan 3 00:00:01
Interval 2: Jan 3 00:00 → Jan 5 00:00 | Runs: Jan 5 00:00:01
Interval 3: Jan 5 00:00 → Jan 7 00:00 | Runs: Jan 7 00:00:01
```

***

#### **Example 3: Hourly Data Partitioning**

```python
with DAG(
    dag_id="hourly_metrics",
    schedule=CronDataIntervalTimetable("0 * * * *", timezone="UTC"),  # Every hour
    start_date=pendulum.datetime(2024, 1, 1, 0, 0),
    catchup=True,
):
    calculate_metrics = PythonOperator(
        task_id="calculate_metrics",
        python_callable=_calculate_hourly_metrics,
        op_kwargs={
            # Each hour gets its own file
            "input_path": "/data/logs/{{data_interval_start.strftime('%Y-%m-%d-%H')}}.log",
            "output_path": "/data/metrics/{{logical_date | ds}}/hour_{{data_interval_start.hour}}.csv",
        },
    )
```

**What happens at 2024-01-01 03:00:01:**

* Processes logs from `02:00 to 03:00`
* Reads: `/data/logs/2024-01-01-02.log`
* Writes: `/data/metrics/2024-01-01/hour_2.csv`

***

#### **Why This Matters: The Partitioning Benefit**

Looking at your example:

```python
fetch_events = BashOperator(
    bash_command=(
        "curl -o /data/events/{{logical_date | ds}}.json "  # One file per day
        "'http://events-api:8081/events/range?"
        "start_date={{data_interval_start | ds}}&"
        "end_date={{data_interval_end | ds}}'"
    ),
)

calculate_stats = PythonOperator(
    op_kwargs={
        "input_path": "/data/events/{{logical_date | ds}}.json",  # Process only today's file
        "output_path": "/data/stats/{{logical_date | ds}}.csv",
    },
)
```

**Benefits:**

1. **Incremental processing**: Each run only processes new data
2. **Easy backfills**: Re-run 2024-01-15 without touching other days
3. **Clear data lineage**: File `2024-01-15.csv` came from `2024-01-15.json`
4. **Parallelization**: Different days = different files = no conflicts

***

#### **Data Interval vs Trigger-Based: The Key Difference**

```python
# TRIGGER-BASED (old way)
schedule="@daily"
# Runs AT 00:00, doesn't know what time window it represents

# DATA INTERVAL-BASED (new way)
schedule=CronDataIntervalTimetable("@daily", timezone="UTC")
# Runs AFTER 00:00, knows it's processing "yesterday's data"
```

**Another explanation:**

```python
schedule=CronDataIntervalTimetable("@daily", timezone="UTC")
start_date=pendulum.datetime(2024, 1, 1)

# What happens:
# Jan 1 00:00 → Jan 2 00:00: Data accumulates
# Jan 2 00:00:01: DAG runs and processes ALL of Jan 1's data

# Trigger-based: Which day am I processing? 🤷
bash_command="process_data.py --date {{execution_date | ds}}"

# Interval-based: I'm processing data from start to end! ✅
bash_command="process_data.py --start {{data_interval_start | ds}} --end {{data_interval_end | ds}}"
```

**Practical difference:**

```python
# Trigger-based: Which day am I processing? 🤷
bash_command="process_data.py --date {{execution_date | ds}}"

# Interval-based: I'm processing data from start to end! ✅
bash_command="process_data.py --start {{data_interval_start | ds}} --end {{data_interval_end | ds}}"
```

***

#### **Common Pattern: Daily ETL**

```python
with DAG(
    dag_id="daily_etl",
    schedule=CronDataIntervalTimetable("@daily", timezone="UTC"),
    start_date=pendulum.datetime(2024, 1, 1),
):
    extract = BashOperator(
        task_id="extract",
        bash_command="extract_data.sh {{data_interval_start | ds}} {{data_interval_end | ds}}"
    )
    
    transform = PythonOperator(
        task_id="transform",
        python_callable=transform_data,
        op_kwargs={
            "partition_date": "{{logical_date | ds}}",  # Use as partition key
        },
    )
    
    load = BashOperator(
        task_id="load",
        bash_command="load_to_warehouse.sh --partition={{logical_date | ds}}"
    )
```

The **data interval** ensures every run knows exactly which slice of time it's responsible for, making your data pipeline predictable and debuggable.

***

### Irregular intervals

You're dealing with a very specific problem here: **Life doesn't always happen on a grid.**

While most data engineering is regular (daily sales, hourly logs), some things happen randomly or sporadically—like holidays, elections, or specific marketing campaigns.

Here is how Airflow handles **Irregular Intervals** using the **`EventsTimetable`**.

#### **The Problem: The "Empty Check" Waste**

If you want to process data only on public holidays, the "standard" way would be to run a DAG every single day and add a Python `if` statement:

* Jan 1 (Holiday): Run logic.
* Jan 2 (Normal): Skip.
* Jan 3 (Normal): Skip.

This is inefficient. You are creating thousands of "skipped" runs just to catch a few active days.

#### **The Solution: `EventsTimetable`**

Airflow's **`EventsTimetable`** allows you to bypass the calendar entirely. Instead of saying "Every Day," you provide a hardcoded **List of Dates**.

The DAG effectively sleeps until the clock hits one of the specific timestamps you provided.

```python
from airflow.timetables.events import EventsTimetable
import pendulum

# Define your specific list of dates
sporadic_dates = EventsTimetable(
    event_dates=[
        pendulum.datetime(2024, 1, 1),  # New Year
        pendulum.datetime(2024, 3, 31), # Easter (Example)
        pendulum.datetime(2024, 5, 2),  # Random Event
    ]
)

with DAG(
    dag_id="07_events_timetable",
    schedule=sporadic_dates, # <--- Pass the list here
    ...
):
    ...

```

#### **The Trade-Off: Trigger vs. Interval**

This is the most critical part to understand. **`EventsTimetable` behaves like a Trigger, not an Interval.**

* **No Buckets**: Because the dates are irregular (Jan 1st, then March 31st), there is no "interval" between them.
* **Missing Variables**: You cannot rely on `data_interval_start` or `data_interval_end`. They don't exist in a meaningful way.
* **Manual Math**: If you need to process data "for the holiday," you have to write your own Python logic to decide what time window that covers (e.g., "Take the last 24 hours").

#### **Constraints to Watch Out For**

1. **Finite List**: You cannot write a function that says "Every full moon forever." You must provide an actual list of datetime objects.
2. **List Size**: Don't put 10,000 dates in this list. Airflow has to load this list every time it parses the DAG, so a massive list will slow down your system.

#### **Controlling Manual Runs (`restrict_to_events`)**

By default, even if you schedule a DAG for "Holidays Only," you can go into the UI and trigger it for a random Tuesday.

If you want to prevent this, you can use the **`restrict_to_events=True`** flag.

```python
holiday_schedule = EventsTimetable(
    event_dates=[...],
    restrict_to_events=True  # <--- The Lock
)

```

**What this does:**

If you try to manually trigger the DAG on a random Tuesday, Airflow won't let you pick "Tuesday." It will force the manual run to align with the most recent valid event (e.g., the last Holiday that passed). This ensures your code always runs with a valid "Holiday" context, even when triggered manually.

***
