# Branching

***

In the traditional Airflow world, branching required a dedicated `BranchPythonOperator`. In **Airflow 3.x** and the **TaskFlow API**, branching is much more intuitive because it uses standard Python logic inside a decorated task.

The key to branching in TaskFlow is the `@task.branch` decorator.

***

#### How `@task.branch` works

A branching task’s job is to return the `task_id` (or a list of IDs) of the path that Airflow should follow next. Any paths not returned by the branching task are automatically **skipped**.

The Code Pattern:

```python
from airflow.decorators import dag, task
import pendulum

@dag(start_date=pendulum.datetime(2025, 1, 1), schedule="@daily", catchup=False)
def branching_logic_demo():

    @task.branch
    def choose_path():
        # You can put any logic here:
        # checking metadata, database values, or dates.
        # pendulum 3 (used by Airflow 3) numbers days Monday=0 ... Sunday=6,
        # so 5 and 6 are Saturday and Sunday.
        day = pendulum.now().day_of_week

        if day in [5, 6]:
            return "weekend_processing"  # Return the task_id of the next step
        else:
            return "weekday_processing"

    @task
    def weekday_processing():
        print("Handling the Monday-Friday grind.")

    @task
    def weekend_processing():
        print("Handling the Saturday-Sunday surge.")

    # Setting the dependencies
    choose_path() >> [weekday_processing(), weekend_processing()]

branching_logic_demo()
```

***

#### The "Join" Problem (Trigger Rules)

A common issue in branching is what happens **after** the branch. If you have a task that needs to run regardless of which path was taken (a "Join" point), you have to change its **Trigger Rule**.

By default, tasks have a trigger rule of `all_success`. If you branch, one path is **skipped**. Since a "skipped" task is not a "success," the final join task will also be skipped unless you fix it.

**The Solution:** `TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS`

```python
from airflow.utils.trigger_rule import TriggerRule

# Defined inside the branching_logic_demo DAG body, alongside the tasks above
@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def final_report():
    print("This runs after either weekday or weekend processing is done.")

# The Flow:
# choose_path >> [weekday, weekend] >> final_report
path_choice = choose_path()
weekday = weekday_processing()
weekend = weekend_processing()

path_choice >> [weekday, weekend] >> final_report()
```

***

#### Branching based on Asset Metadata

If you’ve been learning about Assets, here is a "Pro" example. You can branch based on the metadata passed from a Producer.

Imagine your Taxi Data project: you only want to run heavy analytics if the Producer reports that more than 1,000 rows were ingested.

```python
@task.branch
def check_volume(triggering_asset_events=None):
    # Unpack the metadata we talked about earlier
    event = list(triggering_asset_events.values())[0][0]
    row_count = event.extra.get("row_count", 0)

    if row_count > 1000:
        return "heavy_analytics"
    else:
        return "light_summary"
```
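Because the unpacking logic is plain Python, you can pull it into a helper and test it without a running Airflow instance. A minimal sketch, where `pick_path` and the `FakeEvent` stand-in are illustrative helpers (not Airflow APIs) that mimic the `triggering_asset_events` structure the task above receives:

```python
def pick_path(triggering_asset_events, threshold=1000):
    # triggering_asset_events maps each Asset to a list of AssetEvents;
    # each event carries the producer's `extra` metadata dict
    event = next(iter(triggering_asset_events.values()))[0]
    row_count = event.extra.get("row_count", 0)
    return "heavy_analytics" if row_count > threshold else "light_summary"


class FakeEvent:
    """Tiny stand-in for an AssetEvent, just enough for the test."""
    def __init__(self, extra):
        self.extra = extra


# Simulate the structure Airflow passes in
events = {"taxi_asset": [FakeEvent({"row_count": 2500})]}
print(pick_path(events))  # heavy_analytics
```

The branch task then reduces to `return pick_path(triggering_asset_events)`, keeping the decision logic testable on its own.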

***

#### Summary Checklist for Branching

| **Component**    | **Rule**                                                                       |
| ---------------- | ------------------------------------------------------------------------------ |
| **Decorator**    | Use `@task.branch` instead of `@task`.                                         |
| **Return Value** | Must return the `task_id` of the target task(s): a string, a list of strings, or `None` to skip everything downstream. |
| **Downstream**   | The branch task must be connected to all possible paths using `>>`.            |
| **The Join**     | If you merge paths later, set `trigger_rule` to `NONE_FAILED_MIN_ONE_SUCCESS`. |
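The "Join" row above can be illustrated with a tiny pure-Python model of the two trigger rules. This is a deliberate simplification of Airflow's real scheduling logic (it ignores states like `upstream_failed`), but it shows why the default rule skips the join after a branch:

```python
def should_run(upstream_states, rule):
    # Simplified model of two Airflow trigger rules
    if rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if rule == "none_failed_min_one_success":
        return ("failed" not in upstream_states
                and upstream_states.count("success") >= 1)
    raise ValueError(f"unknown rule: {rule}")


# After a branch: one path succeeded, the other was skipped
states = ["success", "skipped"]
print(should_run(states, "all_success"))                  # False - join skipped
print(should_run(states, "none_failed_min_one_success"))  # True  - join runs
```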

#### Why use this over standard `@task`?

If you just used a regular `@task` and an `if/else` statement inside it, you would only see one big task in the UI. By using `@task.branch`, your Airflow Graph UI will show the separate paths clearly. This makes it much easier to see which logic path was taken just by looking at the colors (Green for success, Pink for skipped).

***

### Using `BranchPythonOperator`

```python
from airflow import DAG
# In Airflow 3, these operators live in the Standard provider;
# in Airflow 2 they were importable from airflow.operators.*
from airflow.providers.standard.operators.python import BranchPythonOperator
from airflow.providers.standard.operators.empty import EmptyOperator
import pendulum

with DAG(
    dag_id="traditional_branching",
    start_date=pendulum.datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False
) as dag:

    def _choose_path():
        # Logic remains the same: return the task_id of the next step
        if pendulum.now().hour < 12:
            return "morning_task"
        return "afternoon_task"

    branching_node = BranchPythonOperator(
        task_id="branching_node",
        python_callable=_choose_path
    )

    morning = EmptyOperator(task_id="morning_task")
    afternoon = EmptyOperator(task_id="afternoon_task")

    # Define the structure
    branching_node >> [morning, afternoon]
```

#### Key Differences from TaskFlow

* Explicit IDs: You must ensure the strings returned by your function (`morning_task`) exactly match the `task_id` assigned to the downstream operators.
* Decoupled Logic: The logic function (`_choose_path`) lives outside the operator definition, which can make it easier to unit test independently.
* Structure: You wire the graph explicitly with the `>>` bitshift operator between operator instances at the bottom, rather than calling decorated functions to create tasks as in TaskFlow.

***
