# **Context**

***

Think of the **Context Dictionary as Airflow's "Shared Brain."** At every moment a task is running, Airflow creates a massive Python dictionary that contains every possible detail about the current run: what time it is, which task is running, where the files are, and what happened in the past.

***

#### What is actually inside it?

If you were to print the `context`, you would see dozens of keys. Here are the "VIPs" you actually care about:

| **Key**                   | **Description**                                                         |
| ------------------------- | ----------------------------------------------------------------------- |
| `ds`                      | The datestamp (e.g., `2026-02-05`).                                     |
| `logical_date`            | The full Pendulum object for the start of the interval.                 |
| `dag_run`                 | The actual object representing this specific execution.                 |
| `task_instance` (or `ti`) | The object representing the specific task's attempt.                    |
| `params`                  | Any custom parameters you passed manually to the DAG.                   |
| `triggering_asset_events` | (Airflow 3 exclusive) The metadata from assets that triggered this run. |
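For orientation, the shape of those "VIP" keys can be sketched as a plain dictionary. The values below are illustrative stand-ins, not what Airflow actually produces (the real `logical_date`, `dag_run`, and `task_instance` are rich objects, not strings):

```python
# Illustrative shape of the context's most-used keys (values are fake stand-ins).
context = {
    "ds": "2026-02-05",                      # plain string datestamp
    "logical_date": "<pendulum.DateTime>",   # really a Pendulum datetime object
    "dag_run": "<DagRun object>",            # the object for this specific execution
    "task_instance": "<TaskInstance object>",# this task's attempt (also aliased as 'ti')
    "params": {"env": "prod"},               # whatever you passed manually to the DAG
}

print(context["ds"])
```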

***

#### How to "Summon" the Context (Airflow 3.x)

In Airflow 3, there are three main ways to get this information.

**Method A: The "Explicit" Way (Recommended for `@task`)**

Airflow supports **Function Argument Injection**. You don't have to take the whole dictionary; you can just ask for the specific keys you want.

```python
from airflow.sdk import task  # Airflow 3 Task SDK import

@task
def my_python_task(ds, logical_date, ti):
    # Airflow sees the names 'ds', 'logical_date', and 'ti' and injects them automatically
    print(f"Running for date: {ds}")
    print(f"This is attempt number: {ti.try_number}")
```

**Method B: The "Catch-All" Way (`**context`)**

Use this if you are writing a complex helper function and you aren't sure exactly which variables you'll need yet.

```python
from airflow.sdk import task  # Airflow 3 Task SDK import

@task
def complex_task(**context):
    # You reach into the 'brain' manually
    date = context["ds"]
    dag_id = context["dag"].dag_id
    print(f"DAG {dag_id} is running for {date}")
```

**Method C: The "Template" Way (Jinja)**

This is what you use inside **Operators** like `BashOperator` or `PythonOperator` arguments. You don't write Python code here; you use the double curly braces.

```python
from airflow.providers.standard.operators.bash import BashOperator  # Airflow 3 provider location

fetch_task = BashOperator(
    task_id="fetch",
    # Airflow swaps {{ ds }} for the value in context before running the command
    bash_command="echo 'The date is {{ ds }}'",
)
```
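Conceptually, templating is just string substitution against the context before the command runs. Here is a minimal, hand-rolled sketch of that idea (real Airflow uses the Jinja2 engine, which is far more capable than this `render` helper):

```python
# Conceptual sketch of template rendering; 'render' is a hypothetical helper,
# NOT Airflow's API. Airflow performs this step with Jinja2.
def render(template: str, context: dict) -> str:
    out = template
    for key, value in context.items():
        out = out.replace("{{ " + key + " }}", str(value))
    return out

print(render("echo 'The date is {{ ds }}'", {"ds": "2026-02-05"}))
```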

***

#### When should you use it?

You should reach for the context whenever your code needs to be **Dynamic** instead of **Hardcoded**.

* **To find Files:** Instead of `/data/my_file.json`, use `/data/{{ ds }}.json`.
* **To query Databases:** `SELECT * FROM table WHERE created_at = '{{ ds }}'`.
* **To handle Metadata:** Accessing `triggering_asset_events` to see what the Producer sent.
* **To Branch Logic:** `if context['logical_date'].day_of_week == 1: # Only run on Mondays`.
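The first two bullets boil down to one pattern: derive every run-specific value from `ds` instead of hardcoding it. A minimal sketch (`build_run_artifacts` is a hypothetical helper, not part of Airflow):

```python
# Sketch: deriving run-specific values from the 'ds' the context injects.
# 'build_run_artifacts' is a hypothetical helper for illustration.
def build_run_artifacts(ds: str) -> dict:
    """Build the file path and SQL filter for one logical date."""
    return {
        "input_path": f"/data/{ds}.json",
        "query": f"SELECT * FROM table WHERE created_at = '{ds}'",
    }

print(build_run_artifacts("2026-02-05"))
```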

***

#### The Distinction: Why do we need it?

The reason we use the **Context** instead of Python's standard `datetime.now()` is **Idempotency**.

If you use `datetime.now()`, your code is tied to the **real-world clock**. If you re-run a failed job from three days ago, it will use *today's* date and produce the wrong data.

If you use `context['ds']`, you are tied to the **Airflow interval**. If you re-run a job from three days ago, the context will correctly say, "The date for this run is still three days ago," and your data will be perfect.
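The contrast can be shown with two tiny functions (hypothetical names, for illustration only): one tied to the interval, one tied to the wall clock.

```python
from datetime import datetime

def partition_for_run(ds: str) -> str:
    # Tied to the Airflow interval: re-running an old run still yields its own date.
    return f"/warehouse/sales/dt={ds}"

def partition_today() -> str:
    # Tied to the wall clock: re-running a 3-day-old job writes to TODAY's partition.
    return f"/warehouse/sales/dt={datetime.now():%Y-%m-%d}"

# Re-running the job for 2026-02-05 days later still targets the right partition:
print(partition_for_run("2026-02-05"))
```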

***

#### A Quick "Cheat Sheet" for Airflow 3

Airflow 3 introduced the **Task SDK**, which makes the context even cleaner. You can now import specific objects to help your IDE give you autocomplete:

```python
from airflow.sdk import get_current_context, task

@task
def clean_task():
    # You can get the context WITHOUT declaring it as an argument!
    context = get_current_context()
    print(context["ds"])
```

***
