# Pure ETL tasks

#### Definition

Pure ETL tasks are the data engineering equivalent of pure functions in programming.

```
PURE ETL TASK CHARACTERISTICS:

✓ Idempotent
  └─ Run 1x or 100x = same result

✓ Deterministic  
  └─ Same inputs = same outputs always

✓ No side effects
  └─ Only affects its designated output

✓ Uses immutable sources
  └─ Reads from partitions that don't change

✓ Targets single partition
  └─ One task = one output partition

✓ Never mutates data
  └─ Only INSERT into a new, empty partition, or INSERT OVERWRITE
```
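
The checklist above can be read as a function contract. The following is a minimal sketch of that reading, assuming an in-memory dict stands in for a partitioned warehouse; `SOURCE`, `run_pure_task`, and the row layout are hypothetical names chosen for illustration.

```python
from types import MappingProxyType

# Hypothetical immutable source: partition key -> tuple of (date, amount) rows.
SOURCE = MappingProxyType({
    "2024-01-15": (("2024-01-15", 10.0), ("2024-01-15", 5.5)),
    "2024-01-16": (("2024-01-16", 7.0),),
})


def run_pure_task(source, target, partition):
    """Aggregate one source partition and overwrite one target partition."""
    rows = source[partition]                     # read-only input
    total = sum(amount for _, amount in rows)    # deterministic transform
    target[partition] = [(partition, total)]     # overwrite, never append
    return target


target = {}
run_pure_task(SOURCE, target, "2024-01-15")
first = dict(target)
for _ in range(99):                              # rerun 99 more times
    run_pure_task(SOURCE, target, "2024-01-15")
print(target == first)                           # True: 1 run or 100 runs, same result
```

Because the task assigns to exactly one key and reads from a read-only mapping, repeating it cannot accumulate rows or touch other partitions.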

#### The Traditional Problem

```
IMPURE ETL (the old way):

Task: Load daily sales aggregates
┌─────────────────────────────────────┐
│ 1. READ from sales_raw              │
│ 2. UPDATE sales_agg                 │ ← Mutation!
│    SET amount = amount + new_amount │
│    WHERE date = today               │
│ 3. APPEND to sales_log              │ ← Append!
│ 4. DELETE old records               │ ← Deletion!
│ 5. Send email notification          │ ← Side effect!
└─────────────────────────────────────┘

What happens if this fails halfway?
┌─────────────────────────────────────┐
│ • Some records updated, some not    │
│ • Log partially written             │
│ • Can't safely rerun                │
│ • Double-counting risk              │
│ • Corrupted state                   │
└─────────────────────────────────────┘
```
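
To make the double-counting risk concrete, here is a small simulation of the impure task failing at its last step and then being retried. It is a sketch under stated assumptions: plain Python containers stand in for `sales_agg` and `sales_log`, and `impure_load` and `NotificationError` are hypothetical names.

```python
class NotificationError(RuntimeError):
    """Stands in for a failure in a late step, such as sending the email."""


def impure_load(sales_agg, sales_log, new_rows, day, fail_late=False):
    """Impure task: mutates the aggregate in place and appends to a log."""
    for amount in new_rows:
        # Equivalent of: UPDATE sales_agg SET amount = amount + new_amount
        sales_agg[day] = sales_agg.get(day, 0) + amount
    sales_log.append((day, len(new_rows)))       # APPEND
    if fail_late:
        raise NotificationError("email server unreachable")


sales_agg, sales_log = {}, []
new_rows = [100, 50]
try:
    impure_load(sales_agg, sales_log, new_rows, "2024-01-15", fail_late=True)
except NotificationError:
    pass                                         # the scheduler retries the task
impure_load(sales_agg, sales_log, new_rows, "2024-01-15")

print(sales_agg["2024-01-15"])                   # 300, although the true total is 150
print(len(sales_log))                            # 2 log entries for one logical load
```

The first attempt had already applied its updates before it failed, so the retry adds the same amounts a second time.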

#### Maxime's Pure Task Solution

```
PURE ETL TASK:

Task: Load daily sales aggregates
┌─────────────────────────────────────┐
│ Input:                              │
│   sales_raw partition='2024-01-15'  │
│   (immutable, never changes)        │
│                                     │
│ Process:                            │
│   CREATE TEMP TABLE temp_agg AS     │
│   SELECT date, sum(amount)          │
│   FROM sales_raw                    │
│   WHERE partition='2024-01-15'      │
│   GROUP BY date                     │
│                                     │
│ Output:                             │
│   INSERT OVERWRITE                  │ ← Key: OVERWRITE
│   sales_agg partition='2024-01-15'  │
│   SELECT * FROM temp_agg            │
└─────────────────────────────────────┘

What happens if this fails?
┌─────────────────────────────────────┐
│ • Just rerun it!                    │
│ • No risk of double-counting        │
│ • No partial state                  │
│ • Previous output completely        │
│   replaced                          │
└─────────────────────────────────────┘
```
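
On file-based storage, one common way to approximate `INSERT OVERWRITE` is to write the new partition aside and then rename it over the old one. The sketch below assumes one JSON file per partition; `write_partition`, `load_daily_sales`, and the `date=<partition>.json` layout are hypothetical choices, not something the talk prescribes.

```python
import json
import os
import tempfile
from pathlib import Path


def write_partition(base_dir, partition, rows):
    """Replace the partition file in one step (write aside, then rename)."""
    base = Path(base_dir)
    base.mkdir(parents=True, exist_ok=True)
    final_path = base / f"date={partition}.json"
    # Stage the output next to its destination so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=base, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as handle:
            json.dump(rows, handle)
        os.replace(tmp_path, final_path)         # atomic on POSIX; replaces old output
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)                  # a failed run leaves no partial file
        raise
    return final_path


def load_daily_sales(raw_rows, base_dir, partition):
    """Pure task: aggregate one day's rows and overwrite that day's output."""
    total = sum(row["amount"] for row in raw_rows if row["date"] == partition)
    return write_partition(base_dir, partition, [{"date": partition, "amount": total}])


with tempfile.TemporaryDirectory() as warehouse:
    raw = [
        {"date": "2024-01-15", "amount": 100},
        {"date": "2024-01-15", "amount": 50},
    ]
    load_daily_sales(raw, warehouse, "2024-01-15")
    path = load_daily_sales(raw, warehouse, "2024-01-15")   # rerun after a "failure"
    print(json.loads(path.read_text()))
```

Readers of the partition see either the previous complete output or the new complete output, which is the property the "no partial state" bullet relies on.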

#### The Operations Matrix

```
FORBIDDEN OPERATIONS (for pure tasks):

UPDATE:
  UPDATE table SET col = val WHERE condition
  ✗ Mutates existing data
  ✗ Not reproducible
  ✗ Can't rerun safely

APPEND:
  INSERT INTO table VALUES (...)
  ✗ Running twice = duplicate data
  ✗ Not idempotent
  ✗ Double-counting risk

DELETE:
  DELETE FROM table WHERE condition
  ✗ Destructive
  ✗ Can't undo
  ✗ Not reproducible

UPSERT (UPDATE + INSERT):
  MERGE INTO table ... WHEN MATCHED UPDATE ...
  ✗ Combines mutation problems
  ✗ Complex state management
  ✗ Hard to reason about


ALLOWED OPERATIONS:

INSERT OVERWRITE (batch):
  INSERT OVERWRITE table PARTITION(date='2024-01-15')
  SELECT * FROM source
  ✓ Idempotent
  ✓ Previous data replaced
  ✓ Safe to rerun

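TRUNCATE + INSERT:
  TRUNCATE table PARTITION(date='2024-01-15');
  INSERT INTO table PARTITION(date='2024-01-15')
  SELECT * FROM source
  ✓ Idempotent: a completed rerun always ends in the same state
    (run both statements in one transaction so a failure
    between them never exposes an empty partition)
  ✓ Clean slate each time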

CREATE OR REPLACE:
  CREATE OR REPLACE TABLE temp AS SELECT ...
  ✓ Always starts fresh
  ✓ No accumulation
```
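
The difference between the append pattern and the overwrite pattern can be checked with a small experiment. This sketch uses SQLite from the Python standard library, which has neither partitions nor `INSERT OVERWRITE`, so it assumes a `date` column stands in for the partition key and that a `DELETE` scoped to that key plus an `INSERT` in one transaction stands in for TRUNCATE + INSERT. The function names are hypothetical.

```python
import sqlite3

AGG_SQL = "SELECT date, SUM(amount) FROM sales_raw WHERE date = ? GROUP BY date"


def make_db():
    """Create an in-memory database with two raw rows for one day."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE sales_raw (date TEXT, amount INTEGER)")
    conn.execute("CREATE TABLE sales_agg (date TEXT, amount INTEGER)")
    conn.executemany(
        "INSERT INTO sales_raw VALUES (?, ?)",
        [("2024-01-15", 100), ("2024-01-15", 50)],
    )
    conn.commit()
    return conn


def append_load(conn, day):
    """Forbidden pattern: plain INSERT INTO, so every run adds another row."""
    with conn:
        conn.execute(f"INSERT INTO sales_agg {AGG_SQL}", (day,))


def overwrite_load(conn, day):
    """Allowed pattern: clear the partition and refill it in one transaction."""
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute("DELETE FROM sales_agg WHERE date = ?", (day,))
        conn.execute(f"INSERT INTO sales_agg {AGG_SQL}", (day,))


def rows(conn):
    return conn.execute("SELECT date, amount FROM sales_agg ORDER BY date").fetchall()


conn = make_db()
append_load(conn, "2024-01-15")
append_load(conn, "2024-01-15")
print(rows(conn))        # two identical rows after two runs

conn = make_db()
overwrite_load(conn, "2024-01-15")
overwrite_load(conn, "2024-01-15")
print(rows(conn))        # still one row after two runs
```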

#### Handling Transitional State

From the talk: "All transitional states within the pure-task are insulated much like locally scoped variables in pure-functions."

```
INSULATED TEMP TABLES:

BAD (shared state):
┌─────────────────────────────────────┐
│ CREATE TEMP TABLE staging AS ...    │ ← Shared name!
│                                     │
│ If two instances run:               │
│ Instance A: writes to staging       │
│ Instance B: writes to staging       │ ← Collision!
│ They interfere with each other!     │
└─────────────────────────────────────┘

GOOD (isolated state):
┌─────────────────────────────────────┐
│ task_id = generate_unique_id()      │
│ temp_name = f"staging_{task_id}"    │
│ CREATE TEMP TABLE {temp_name} AS ...│
│                                     │
│ If two instances run:               │
│ Instance A: staging_abc123          │
│ Instance B: staging_xyz789          │ ← No collision!
│ Fully independent execution         │
└─────────────────────────────────────┘
```
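
The isolated-state pattern above can be sketched in a few lines. The example assumes the staging tables are ordinary tables in a shared database, which is where name collisions are most likely (many engines already scope true temporary tables to a session). `staging_name` and `run_instance` are hypothetical helpers, and SQLite is used only because it ships with Python.

```python
import sqlite3
import uuid


def staging_name(prefix="staging"):
    """Return a table name that is unique to this task instance."""
    return f"{prefix}_{uuid.uuid4().hex}"        # hex digits only, safe as an identifier


def run_instance(conn, values):
    """Build a private staging table, read the result from it, then drop it."""
    table = staging_name()
    conn.execute(f'CREATE TABLE "{table}" (amount INTEGER)')
    try:
        conn.executemany(f'INSERT INTO "{table}" VALUES (?)', [(v,) for v in values])
        (total,) = conn.execute(f'SELECT SUM(amount) FROM "{table}"').fetchone()
        return total
    finally:
        conn.execute(f'DROP TABLE "{table}"')    # scratch state never outlives the task


conn = sqlite3.connect(":memory:")
print(run_instance(conn, [1, 2, 3]), run_instance(conn, [10, 20]))
```

Each call works in its own table, so two instances sharing the database cannot read or overwrite each other's intermediate rows, much like local variables in separate function calls.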

#### The Airflow Connection

```
In Airflow (designed for pure tasks):

┌─────────────────────────────────────┐
│     DAG: sales_pipeline             │
├─────────────────────────────────────┤
│                                     │
│  Task: load_sales_agg               │
│  ├─ Partition: 2024-01-01 → ✓       │
│  ├─ Partition: 2024-01-02 → ✓       │
│  ├─ Partition: 2024-01-03 → ✗       │ ← Failed
│  └─ Partition: 2024-01-04 → waiting │
│                                     │
│  Action: Clear 2024-01-03 and rerun │
│  Result: Safe, idempotent           │
└─────────────────────────────────────┘

Each square in Airflow's tree view (renamed grid view in Airflow 2.3)
= one task instance = one partition = one pure, rerunnable unit of work
```
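
The clear-and-rerun workflow in the diagram can be imitated without a scheduler. The following sketch is a plain-Python simulation, not Airflow's API: `run_partitions`, `clear`, the state strings, and the simulated one-off outage in `load_sales_agg` are all assumptions made for illustration.

```python
from datetime import date, timedelta


def daterange(start, days):
    """Return consecutive partition keys as ISO date strings."""
    return [(start + timedelta(days=i)).isoformat() for i in range(days)]


def run_partitions(task, partitions, state, output):
    """Run the task partition by partition, stopping at the first failure."""
    for ds in partitions:
        if state.get(ds) == "success":
            continue                             # already done, leave it alone
        if state.get(ds) == "failed":
            break                                # needs a manual clear first
        try:
            output[ds] = task(ds)                # overwrite this partition only
            state[ds] = "success"
        except Exception:
            state[ds] = "failed"
            break                                # later partitions keep waiting
    return state


def clear(state, ds):
    """Forget the recorded outcome so the next run retries this partition."""
    state.pop(ds, None)


failures_left = {"2024-01-03": 1}                # simulated one-off outage


def load_sales_agg(ds):
    if failures_left.get(ds, 0) > 0:
        failures_left[ds] -= 1
        raise RuntimeError(f"source for {ds} not ready")
    return f"aggregate for {ds}"


partitions = daterange(date(2024, 1, 1), 4)
state, output = {}, {}
run_partitions(load_sales_agg, partitions, state, output)
print(state)                                     # 01-03 failed, 01-04 not started
clear(state, "2024-01-03")
run_partitions(load_sales_agg, partitions, state, output)
print(state)                                     # all four partitions succeeded
```

Clearing one partition is safe here only because each run overwrites its own output key and nothing else, which is the property the pure-task rules are meant to guarantee.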
