# Taskflow API

***

[Taskflow API](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html)

***

Two "superpowers" of the TaskFlow API (the **modern way of writing Airflow DAGs** using decorators like `@task`).

These features allow you to write cleaner code by separating your logic (the Python code) from your configuration (how Airflow runs it).

#### Error Handling (Retries)

**"Don't fail immediately if the internet blips."**

In the world of data engineering, things break randomly. An API might time out, or a database might be momentarily locked. These are called transient failures.

If you don't set retries, your DAG fails the moment an error occurs, and you get woken up at 3 AM.

* **The Code:** `@task(retries=3)`
* **What it does:** If the function raises an exception (crashes), Airflow catches it, waits (5 minutes by default), and runs the code again, up to 3 times. The task is only marked "Failed" if the 4th attempt (the original run plus 3 retries) also fails, as sketched below.
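Here is a runnable sketch of that behavior (the endpoint URL and function name are illustrative; `retry_delay` is a standard task argument that overrides the 5-minute default wait):

```python
import datetime

import requests
from airflow.decorators import task

# Minimal sketch: the URL and function name are illustrative.
@task(retries=3, retry_delay=datetime.timedelta(minutes=2))
def fetch_exchange_rates():
    # A transient timeout raises an exception here; Airflow waits
    # retry_delay, then re-runs the function, up to 3 retries
    # (4 attempts in total).
    response = requests.get("https://api.example.com/rates", timeout=10)
    response.raise_for_status()
    return response.json()
```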

***

#### Task Parameterization (The `.override()` method)

**"Write the code once, use it differently everywhere."**

This is about **reusability**. Imagine you write a Python function that downloads a file. You want to use this exact same logic to download file A, file B, and file C.

In the old days, you might have copied and pasted the code or written complex wrappers. In the TaskFlow API, the decorated function acts like a blueprint. You can stamp out multiple copies of that task, changing the Airflow settings (like the `task_id` or `retries`) for each copy without changing the Python logic.
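A minimal sketch of that stamping-out idea (the function and DAG names here are illustrative):

```python
import pendulum
from airflow.decorators import dag, task

@task
def download_file(name: str):
    print(f"Downloading {name}...")

@dag(schedule=None, start_date=pendulum.datetime(2025, 1, 1, tz="UTC"), catchup=False)
def download_all():
    # Three distinct tasks stamped from the single blueprint,
    # each with its own task_id, without touching the Python logic:
    download_file.override(task_id="download_a")("file_a.csv")
    download_file.override(task_id="download_b")("file_b.csv")
    download_file.override(task_id="download_c")("file_c.csv")

download_all()
```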

**Visualizing `.override()`**

Think of `.override()` as a modifier applied to the blueprint at DAG-definition time, before the task instance is ever created.

{% @mermaid/diagram content="flowchart TD
    subgraph Definition
        Blueprint[("Python Function<br/>@task def process_data()")]
    end

    subgraph "DAG Execution"
        Task1["Task 1<br/>id: process_data<br/>retries: 0"]
        Task2["Task 2<br/>id: special<br/>retries: 5"]
    end

    Blueprint -->|"Default Call"| Task1
    Blueprint -->|".override(task_id='special', retries=5)"| Task2

    style Blueprint fill:#f9f,stroke:#333,stroke-width:2px
    style Task1 fill:#e1f5fe,stroke:#333
    style Task2 fill:#fff9c4,stroke:#333
" %}

#### The Syntax Breakdown

The syntax `task.override(...)(...)` looks weird because it chains two steps (a runnable sketch follows the list):

1. `add_task.override(task_id="start")`: This creates a *copy* of the task configuration with the new ID "start". It hasn't run yet.
2. `(1, 2)`: This passes the actual arguments (1 and 2) to that *copy* and creates the task instance in the DAG.
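Here are those two steps spelled out as separate statements (a minimal sketch; `add_task` and the DAG name are illustrative):

```python
import pendulum
from airflow.decorators import dag, task

@task
def add_task(x: int, y: int) -> int:
    return x + y

@dag(schedule=None, start_date=pendulum.datetime(2025, 1, 1, tz="UTC"), catchup=False)
def override_steps():
    # Step 1: copy the task configuration under a new ID.
    # No task exists in the DAG yet.
    start_config = add_task.override(task_id="start")

    # Step 2: call the copy with real arguments; this creates
    # the task instance (task_id="start") in the DAG.
    start_config(1, 2)

override_steps()
```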

#### A Concrete Example

Here is how you would use both in a real DAG.

```python
import pendulum
from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"]
)
def reusable_task_example():

    # 1. Define a generic task (The Blueprint)
    # Default behavior: 1 retry
    @task(retries=1)
    def ingest_data(source_name: str):
        print(f"Ingesting from {source_name}...")
        # Imagine flaky code here

    # 2. Use it normally (Uses default ID 'ingest_data' and 1 retry)
    task_a = ingest_data("Postgres")

    # 3. Reuse it with .override() (new ID 'ingest_critical_source', 5 retries!)
    # We increase retries because this specific source is very flaky.
    task_b = ingest_data.override(
        task_id="ingest_critical_source", 
        retries=5
    )("Salesforce")

    task_a >> task_b

dag_instance = reusable_task_example()
```

**Why this is huge:** You can import `ingest_data` from a shared file (like `include/common_tasks.py`) and use it in 50 different DAGs, giving it a unique name and retry policy in every single one.
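A minimal sketch of that pattern, shown as two files in one block (the paths and DAG name are illustrative, and it assumes `include/` is importable from your DAG files, e.g. on `sys.path`):

```python
# include/common_tasks.py -- the shared blueprint
from airflow.decorators import task

@task(retries=1)
def ingest_data(source_name: str):
    print(f"Ingesting from {source_name}...")


# dags/sales_pipeline.py -- one of many DAGs reusing it
import pendulum
from airflow.decorators import dag

from include.common_tasks import ingest_data

@dag(schedule=None, start_date=pendulum.datetime(2025, 1, 1, tz="UTC"), catchup=False)
def sales_pipeline():
    # Same shared logic, DAG-specific name and retry policy:
    ingest_data.override(task_id="ingest_sales", retries=3)("Salesforce")

sales_pipeline()
```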

***
