Taskflow API


Two "superpowers" of the TaskFlow API (the modern way of writing Airflow DAGs using decorators like @task).

These features allow you to write cleaner code by separating your logic (the Python code) from your configuration (how Airflow runs it).

Error Handling (Retries)

"Don't fail immediately if the internet blips."

In the world of data engineering, things break randomly. An API might time out, or a database might be momentarily locked. These are called transient failures.

If you don't set retries, your DAG fails the moment an error occurs, and you get woken up at 3 AM.

  • The Code: @task(retries=3)

  • What it does: If the function raises an exception (crashes), Airflow will catch it, wait (default is 5 minutes), and try running the code again. It will do this 3 times. It only marks the task as "Failed" if it fails the 4th time.
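
Assuming a hypothetical extract_orders task that calls a flaky HTTP endpoint (the URL is a placeholder), the retry settings might look like this; retry_delay is optional and shown only to make the wait explicit:

```python
from datetime import timedelta

import requests
from airflow.decorators import task


# retries=3 means 3 retries on top of the first attempt, so up to 4 tries total.
# retry_delay is optional; Airflow waits 5 minutes between attempts by default.
@task(retries=3, retry_delay=timedelta(minutes=2))
def extract_orders():
    # If this raises (timeout, 5xx, connection reset, ...),
    # Airflow catches it, waits, and reruns the task.
    response = requests.get("https://example.com/api/orders", timeout=10)
    response.raise_for_status()
    return response.json()
```

Because the retry policy lives in the decorator, the function body stays plain Python: the logic and the configuration are kept separate, exactly as described above.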


Task Parameterization (The .override() method)

"Write the code once, use it differently everywhere."

This is about reusability. Imagine you write a Python function that downloads a file. You want to use this exact same logic to download file A, file B, and file C.

In the old days, you might have copied and pasted the code or written complex wrappers. In the TaskFlow API, the decorated function acts like a blueprint. You can stamp out multiple copies of that task, changing the Airflow settings (like the task_id or retries) for each copy without changing the Python logic.
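
As a quick taste (the syntax is unpacked in the next sections), a hypothetical download_file task could be stamped out three times like this:

```python
from datetime import datetime

from airflow.decorators import dag, task


@task
def download_file(path: str):
    print(f"downloading {path}")


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def download_dag():
    # One blueprint, three tasks; only the Airflow settings change per copy.
    download_file.override(task_id="download_a")("a.csv")
    download_file.override(task_id="download_b")("b.csv")
    download_file.override(task_id="download_c", retries=5)("c.csv")


download_dag()
```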

Visualizing .override()

Think of .override() as a modifier that intercepts the task before it runs.


The Syntax Breakdown

The syntax task.override(...)(...) looks odd at first because it chains two steps:

  1. add_task.override(task_id="start"): This creates a copy of the task configuration with the new ID "start". It hasn't run yet.

  2. (1, 2): This passes the actual arguments (1 and 2) to that copy and registers the resulting task in the DAG.

A Concrete Example

Here is a sketch of how you might use both in a real DAG. The task names (add_task, ingest_data) are illustrative, and in practice ingest_data would usually live in a shared module:
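
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task


@task
def add_task(x, y):
    return x + y


# In practice this could live in a shared module (e.g. include/common_tasks.py)
# and be imported into many DAGs.
@task
def ingest_data(source: str):
    print(f"ingesting from {source}")


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def taskflow_demo():
    # Step 1: .override() copies the task config with new settings (nothing runs yet).
    # Step 2: calling the copy passes the arguments and registers the task in the DAG.
    start = add_task.override(task_id="start")(1, 2)
    add_task.override(task_id="add_again", retries=3)(start, 10)

    # Same ingest logic, two different names and two retry policies in one DAG.
    ingest_data.override(task_id="ingest_orders", retries=3)("orders")
    ingest_data.override(
        task_id="ingest_customers",
        retries=5,
        retry_delay=timedelta(minutes=1),
    )("customers")


taskflow_demo()
```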

Why this is huge: You can import ingest_data from a shared file (like include/common_tasks.py) and use it in 50 different DAGs, giving it a unique name and retry policy in every single one.

