# Apache Arrow and PyArrow

***

[Docs](https://arrow.apache.org/)

[ADBC](https://arrow.apache.org/adbc/current/index.html)

[Python API](https://arrow.apache.org/docs/python/index.html)

***

## PyArrow: units

Here's the hierarchy of PyArrow's data structures, from simplest to most complex:

**1. Array** (simplest unit)

* A single column of data
* Example: `pa.array([1, 2, 3])`
* One data type, one-dimensional

**2. RecordBatch** (collection of arrays)

* Multiple equal-length arrays bundled together
* Guaranteed to be **contiguous in memory** (single chunk)
* Example: A batch with columns `[user_id, score, name]`
* This is your "unit of execution" for most operations
  * The unit you work with most: it balances structure (schema + multiple columns) with performance (a single contiguous chunk)

**3. Table** (collection of RecordBatches)

* Can contain multiple RecordBatches (chunked data)
* Not guaranteed to be contiguous
* Example: Combine 10 RecordBatches into one Table

***

## PyArrow: RecordBatch

This guide covers the essential RecordBatch patterns you might want to use in modern data engineering. The key takeaways: use **explicit schemas**, leverage **zero-copy operations**, handle **nulls** carefully, and remember that RecordBatches are immutable but cheap to reconstruct.

***

### Creating a RecordBatch and Basic Operations

A `RecordBatch` is a collection of equal-length arrays stored contiguously in memory—your fundamental unit of execution.

```python
import pyarrow as pa

# Define schema explicitly (best practice)
schema = pa.schema([
    ('user_id', pa.int64()),
    ('username', pa.string()),
    ('score', pa.float64())
])

# Create arrays
user_ids = pa.array([1, 2, 3, 4], type=pa.int64())
usernames = pa.array(['alice', 'bob', 'charlie', 'diana'], type=pa.string())
scores = pa.array([98.5, 87.3, 92.1, 95.8], type=pa.float64())

# Build the RecordBatch
batch = pa.RecordBatch.from_arrays(
    [user_ids, usernames, scores],
    schema=schema
)

print(f"Num rows: {batch.num_rows}")
print(f"Num columns: {batch.num_columns}")
print(f"Schema: {batch.schema}")

# Access columns by name or index
print(batch.column('username'))  # By name
print(batch[1])                  # By index (same result)

# Slice the batch (zero-copy operation)
subset = batch.slice(1, 2)  # Start at index 1, take 2 rows
print(subset.to_pandas())
```

***

### Struct Arrays: Nested Data in Columns

Struct Arrays let you represent nested objects (like JSON) while maintaining columnar performance.

```python
import pyarrow as pa

# Define a struct type for geolocation
location_type = pa.struct([
    ('latitude', pa.float64()),
    ('longitude', pa.float64()),
    ('city', pa.string())
])

# Create struct array
locations = pa.array([
    {'latitude': 40.7128, 'longitude': -74.0060, 'city': 'New York'},
    {'latitude': 34.0522, 'longitude': -118.2437, 'city': 'Los Angeles'},
    {'latitude': 41.8781, 'longitude': -87.6298, 'city': 'Chicago'},
], type=location_type)

# Combine with other columns
schema = pa.schema([
    ('user_id', pa.int64()),
    ('location', location_type)
])

batch = pa.RecordBatch.from_arrays(
    [pa.array([101, 102, 103]), locations],
    schema=schema
)

print(batch)

# Access nested fields directly
print(batch.column('location'))

# Extract a specific struct field
latitudes = pa.compute.struct_field(batch.column('location'), 'latitude')  # or by index: [0]
print(latitudes)
```

***

### Flattening: Breaking Structs Apart

When you need to operate on individual fields (e.g., calculate on all latitudes), flatten the struct.

```python
import pyarrow as pa
import pyarrow.compute as pc

# Start with the struct batch from above
location_col = batch.column('location')

# Extract individual fields
latitudes = pc.struct_field(location_col, 'latitude')
longitudes = pc.struct_field(location_col, 'longitude')
cities = pc.struct_field(location_col, 'city')

# Create a new flattened batch
flattened_schema = pa.schema([
    ('user_id', pa.int64()),
    ('latitude', pa.float64()),
    ('longitude', pa.float64()),
    ('city', pa.string())
])

flattened_batch = pa.RecordBatch.from_arrays(
    [batch.column('user_id'), latitudes, longitudes, cities],
    schema=flattened_schema
)

print(flattened_batch.to_pandas())
```

***

### Conversion to Native Python (Handling Nulls)

When moving data back to Python objects, handle `None` values explicitly.

```python
import pyarrow as pa

# Create array with nulls
scores = pa.array([98.5, None, 92.1, None, 87.3], type=pa.float64())

batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3, 4, 5]), scores],
    names=['id', 'score']
)

# Convert to Python list (None preserved)
score_list = batch.column('score').to_pylist()
print(score_list)  # [98.5, None, 92.1, None, 87.3]

# Convert to dictionaries
records = batch.to_pylist()
print(records)
# [{'id': 1, 'score': 98.5}, {'id': 2, 'score': None}, ...]

# Safe iteration with null checks
for val in batch.column('score'):
    if val.is_valid:
        print(f"Score: {val.as_py()}")
    else:
        print("Score: Missing")

# Filter out nulls before conversion
non_null_scores = pa.compute.drop_null(batch.column('score'))
print(non_null_scores.to_pylist())  # [98.5, 92.1, 87.3]
```

***

### Pro-Level Features for Data Engineering

#### Zero-Copy NumPy Views

For numerical operations, create NumPy views instead of converting to Python lists—they share the same memory.

```python
import pyarrow as pa
import numpy as np

# Create numeric array
values = pa.array([10.5, 20.3, 30.7, 40.2, 50.9], type=pa.float64())

# Zero-copy view (shares memory with the Arrow buffer)
np_view = values.to_numpy(zero_copy_only=True)
print(f"Same memory: {np_view.__array_interface__['data'][0] == values.buffers()[1].address}")

# Perform NumPy operations
mean = np.mean(np_view)
std = np.std(np_view)
print(f"Mean: {mean:.2f}, Std: {std:.2f}")

# With nulls, zero-copy is impossible (NumPy has no validity bitmap)
values_with_nulls = pa.array([10.5, None, 30.7, None, 50.9])
# This would raise an error:
# np_view = values_with_nulls.to_numpy(zero_copy_only=True)

# Instead, allow a copy; nulls become np.nan for floating-point types
np_array = values_with_nulls.to_numpy(zero_copy_only=False)
print(np_array)

# The validity mask is available separately if you need it
print(values_with_nulls.is_null().to_pylist())  # [False, True, False, True, False]
```

#### Schema Evolution

RecordBatches are immutable, but you can efficiently create new ones with modified schemas.

```python
import pyarrow as pa

# Original batch
schema = pa.schema([
    ('user_id', pa.int64()),
    ('score', pa.float64())
])

batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array([98.5, 87.3, 92.1])],
    schema=schema
)

# Add a new computed column
grades = pa.array(['A', 'B', 'A'])

new_schema = schema.append(pa.field('grade', pa.string()))
new_batch = pa.RecordBatch.from_arrays(
    list(batch.columns) + [grades],
    schema=new_schema
)

print(new_batch)

# Remove a column (create batch without it)
reduced_schema = pa.schema([('user_id', pa.int64()), ('grade', pa.string())])
reduced_batch = pa.RecordBatch.from_arrays(
    [batch.column('user_id'), grades],
    schema=reduced_schema
)

print(reduced_batch)
```

#### The Schema Object: Your Data Contract

Always define schemas explicitly. This is your pipeline's contract and catches errors early.

```python
import pyarrow as pa

# Explicit schema with metadata
schema = pa.schema([
    ('timestamp', pa.timestamp('us', tz='UTC')),
    ('sensor_id', pa.int32()),
    ('temperature', pa.float32()),
    ('humidity', pa.float32())
], metadata={
    'source': 'sensor_network_v2',
    'version': '1.0',
    'created_date': '2026-02-06'
})

# Schema validation happens automatically
try:
    batch = pa.RecordBatch.from_arrays(
        [
            pa.array([1707264000000000], type=pa.timestamp('us', tz='UTC')),
            pa.array([42], type=pa.int32()),
            pa.array([23.5], type=pa.float32()),
            pa.array(['invalid'], type=pa.string())  # Wrong type!
        ],
        schema=schema
    )
except (pa.ArrowTypeError, pa.ArrowInvalid) as e:
    print(f"Schema validation error: {e}")

# Correct version
batch = pa.RecordBatch.from_arrays(
    [
        pa.array([1707264000000000], type=pa.timestamp('us', tz='UTC')),
        pa.array([42], type=pa.int32()),
        pa.array([23.5], type=pa.float32()),
        pa.array([65.2], type=pa.float32())
    ],
    schema=schema
)

# Access schema metadata
print(f"Schema metadata: {batch.schema.metadata}")

# Schema comparison
other_schema = pa.schema([('timestamp', pa.timestamp('us', tz='UTC'))])
print(f"Schemas equal: {schema.equals(other_schema)}")
```

***

### Bonus: RecordBatch vs Table

```python
import pyarrow as pa

# RecordBatch: Single contiguous chunk
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3])],
    names=['id']
)

# Table: Can hold multiple RecordBatches (chunked)
table = pa.Table.from_batches([batch, batch])  # 2 chunks

print(f"Table has {table.num_rows} rows in {len(table.to_batches())} chunks")

# Convert between them
table_from_batch = pa.Table.from_batches([batch])
batch_from_table = table.to_batches()[0]
```

***

## PyArrow: Tables

**Working with Chunked Columnar Data**

Tables are your go-to structure for larger datasets. Unlike RecordBatches, Tables can contain multiple chunks and provide richer operations for data manipulation.

### 1. Creating Tables and Basic Operations

Tables can be created from RecordBatches, arrays, dictionaries, or Pandas DataFrames.

```python
import pyarrow as pa

# Method 1: From a dict of columns (single chunk)
schema = pa.schema([
    ('user_id', pa.int64()),
    ('username', pa.string()),
    ('score', pa.float64())
])

table = pa.table({
    'user_id': [1, 2, 3, 4],
    'username': ['alice', 'bob', 'charlie', 'diana'],
    'score': [98.5, 87.3, 92.1, 95.8]
}, schema=schema)

print(f"Num rows: {table.num_rows}")
print(f"Num columns: {table.num_columns}")
print(f"Column names: {table.column_names}")

# Method 2: From RecordBatches (multiple chunks)
batch1 = pa.RecordBatch.from_arrays(
    [pa.array([1, 2]), pa.array(['alice', 'bob']), pa.array([98.5, 87.3])],
    schema=schema
)
batch2 = pa.RecordBatch.from_arrays(
    [pa.array([3, 4]), pa.array(['charlie', 'diana']), pa.array([92.1, 95.8])],
    schema=schema
)

chunked_table = pa.Table.from_batches([batch1, batch2])
print(f"Number of chunks: {chunked_table.column('user_id').num_chunks}")

# Access columns (returns ChunkedArray)
scores = table.column('score')
print(type(scores))  # ChunkedArray

# Slice the table (zero-copy)
subset = table.slice(1, 2)
print(subset.to_pandas())

# Select specific columns
user_scores = table.select(['user_id', 'score'])
print(user_scores)
```

***

### 2. Adding, Removing, and Renaming Columns

Tables support schema evolution operations that RecordBatches don't.

```python
import pyarrow as pa
import pyarrow.compute as pc

# Starting table
table = pa.table({
    'user_id': [1, 2, 3],
    'username': ['alice', 'bob', 'charlie'],
    'score': [98.5, 87.3, 92.1]
})

# Add a new column
grades = pa.array(['A', 'B', 'A'])
table_with_grade = table.append_column('grade', grades)
print(table_with_grade)

# Add a computed column
bonus_scores = pc.multiply(table.column('score'), 1.1)
table_with_bonus = table.append_column('bonus_score', bonus_scores)
print(table_with_bonus)

# Remove a column
table_no_username = table.remove_column(
    table.schema.get_field_index('username')
)
print(table_no_username)

# Rename columns
new_names = ['id', 'name', 'points']
renamed_table = table.rename_columns(new_names)
print(renamed_table.column_names)

# Set a column (replace existing)
new_scores = pa.array([100.0, 90.0, 95.0])
table_updated = table.set_column(
    table.schema.get_field_index('score'),
    'score',
    new_scores
)
print(table_updated)
```

***

### 3. Filtering and Sorting

Tables support powerful filtering and sorting operations using PyArrow compute functions.

```python
import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({
    'user_id': [1, 2, 3, 4, 5],
    'username': ['alice', 'bob', 'charlie', 'diana', 'eve'],
    'score': [98.5, 87.3, 92.1, 95.8, 89.2],
    'active': [True, False, True, True, False]
})

# Filter rows (using boolean mask)
high_scorers = table.filter(pc.greater(table.column('score'), 90))
print(high_scorers.to_pandas())

# Multiple conditions
active_high_scorers = table.filter(
    pc.and_(
        pc.greater(table.column('score'), 90),
        pc.equal(table.column('active'), True)
    )
)
print(active_high_scorers.to_pandas())

# Sort by single column
sorted_by_score = table.sort_by('score')
print(sorted_by_score.to_pandas())

# Sort descending
sorted_desc = table.sort_by([('score', 'descending')])
print(sorted_desc.to_pandas())

# Sort by multiple columns
table_with_dept = table.append_column('dept', pa.array(['eng', 'eng', 'sales', 'eng', 'sales']))
sorted_multi = table_with_dept.sort_by([
    ('dept', 'ascending'),
    ('score', 'descending')
])
print(sorted_multi.to_pandas())
```

***

### 4. Grouping and Aggregation

Tables can be grouped and aggregated directly with `Table.group_by` (available since PyArrow 7.0).

```python
import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({
    'department': ['eng', 'eng', 'sales', 'sales', 'eng', 'sales'],
    'employee': ['alice', 'bob', 'charlie', 'diana', 'eve', 'frank'],
    'salary': [120000, 115000, 95000, 98000, 125000, 92000],
    'years': [5, 3, 7, 4, 8, 2]
})

# Group by and aggregate (Table.group_by, PyArrow 7.0+)
result = table.group_by('department').aggregate([
    ('salary', 'mean'),
    ('salary', 'sum'),
    ('years', 'max')
])
print(result.to_pandas())

# Manual aggregation per group using filters
eng_salaries = table.filter(pc.equal(table.column('department'), 'eng')).column('salary')
sales_salaries = table.filter(pc.equal(table.column('department'), 'sales')).column('salary')

print(f"Engineering avg: {pc.mean(eng_salaries).as_py():.2f}")
print(f"Sales avg: {pc.mean(sales_salaries).as_py():.2f}")
```

***

### 5. Combining Tables

Tables can be concatenated vertically (adding rows) or joined horizontally.

```python
import pyarrow as pa

# Vertical concatenation (stacking rows)
table1 = pa.table({
    'id': [1, 2],
    'name': ['alice', 'bob']
})

table2 = pa.table({
    'id': [3, 4],
    'name': ['charlie', 'diana']
})

combined = pa.concat_tables([table1, table2])
print(combined.to_pandas())

# Schemas must match
try:
    table3 = pa.table({'id': [5], 'username': ['eve']})  # Different column name
    bad_concat = pa.concat_tables([table1, table3])
except pa.ArrowInvalid as e:
    print(f"Schema mismatch: {e}")

# Promote schemas if needed (unifies columns; on PyArrow < 14, use promote=True)
combined_promoted = pa.concat_tables([table1, table3], promote_options='default')
print(combined_promoted.to_pandas())  # Creates nulls for missing columns

# Horizontal join (add columns from another table)
extra_data = pa.table({
    'score': [98.5, 87.3]
})

# Note: PyArrow doesn't have built-in horizontal concat
# You need to add columns one at a time
result = table1
for col_name in extra_data.column_names:
    result = result.append_column(col_name, extra_data.column(col_name))

print(result.to_pandas())
```

***

### 6. Working with Chunked Columns

Tables contain ChunkedArrays, which can have multiple underlying chunks.

```python
import pyarrow as pa

# Create table from multiple batches
batch1 = pa.RecordBatch.from_arrays(
    [pa.array([1, 2]), pa.array([10, 20])],
    names=['id', 'value']
)
batch2 = pa.RecordBatch.from_arrays(
    [pa.array([3, 4, 5]), pa.array([30, 40, 50])],
    names=['id', 'value']
)

table = pa.Table.from_batches([batch1, batch2])

# Access chunked array
values = table.column('value')
print(f"Type: {type(values)}")  # ChunkedArray
print(f"Num chunks: {values.num_chunks}")
print(f"Chunk lengths: {[len(chunk) for chunk in values.chunks]}")

# Iterate over chunks
for i, chunk in enumerate(values.chunks):
    print(f"Chunk {i}: {chunk.to_pylist()}")

# Combine chunks into single array (can be expensive for large data)
combined = values.combine_chunks()
print(f"Combined type: {type(combined)}")  # Array
print(f"Combined: {combined.to_pylist()}")

# combine_chunks() on a Table returns a Table with one chunk per column
single_chunk_table = table.combine_chunks()
print(f"Type: {type(single_chunk_table)}")  # Still a Table, not a RecordBatch

# Or convert to actual RecordBatch
for batch in table.to_batches():
    print(f"Batch: {batch}")
```

***

### 7. Converting Between Formats

Tables provide rich conversion capabilities to and from other formats.

```python
import pyarrow as pa
import pandas as pd

# Create table
table = pa.table({
    'id': [1, 2, 3],
    'name': ['alice', 'bob', 'charlie'],
    'score': [98.5, 87.3, 92.1]
})

# To Pandas
df = table.to_pandas()
print(type(df))

# From Pandas (with explicit schema)
df2 = pd.DataFrame({
    'id': [4, 5],
    'name': ['diana', 'eve'],
    'score': [95.8, 89.2]
})
table2 = pa.Table.from_pandas(df2, schema=table.schema)

# To Python dictionaries
records = table.to_pydict()  # Column-oriented
print(records)  # {'id': [1,2,3], 'name': [...], ...}

pylist = table.to_pylist()  # Row-oriented
print(pylist)  # [{'id': 1, 'name': 'alice', ...}, ...]

# To batches
for batch in table.to_batches(max_chunksize=2):
    print(f"Batch with {batch.num_rows} rows")

# From batches
new_table = pa.Table.from_batches(table.to_batches())
```

***

### 8. Pro Pattern: Chaining Table Operations

Chain operations fluently. Note that each step eagerly returns a new Table, but `slice` and `select` are zero-copy, so chains like this stay cheap; for true lazy evaluation, use the dataset API.

```python
import pyarrow as pa
import pyarrow.compute as pc

# Large table simulation
table = pa.table({
    'user_id': range(1000),
    'score': [float(i % 100) for i in range(1000)],
    'active': [bool(i % 2) for i in range(1000)],
    'department': ['eng' if i % 3 == 0 else 'sales' for i in range(1000)]
})

# Chain operations (each step returns a new Table; slice/select are zero-copy)
result = (
    table
    .filter(pc.equal(table.column('active'), True))
    .filter(pc.greater(table.column('score'), 50))
    .select(['user_id', 'score', 'department'])
    .sort_by([('score', 'descending')])
    .slice(0, 10)
)

print(result.to_pandas())

# For very large datasets, use datasets API
import pyarrow.dataset as ds

# Datasets evaluate lazily, with predicate and projection pushdown
dataset = ds.dataset(table)
filtered = dataset.to_table(
    filter=(pc.field('active') == True) & (pc.field('score') > 50),
    columns=['user_id', 'score', 'department']
)
```

***

#### Key Differences: RecordBatch vs Table

```python
import pyarrow as pa

# RecordBatch: Single chunk, immutable
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3])],
    names=['id']
)

# Table: can hold multiple chunks, richer manipulation API
table = pa.table({'id': [1, 2, 3]})

print("Batch guaranteed contiguous: Yes")
print("Table guaranteed contiguous: No (columns can be chunked)")
print("Both are immutable: 'modifications' return new objects")
print("Table has the richer API (append_column, group_by, concat_tables);")
print("recent PyArrow versions add filter/sort_by to RecordBatch too")
```

***

**The pattern:** Use RecordBatches for low-level, performance-critical operations where you need memory contiguity. Use Tables for higher-level data manipulation and transformations across chunked data. For truly larger-than-memory datasets, reach for the dataset API, which streams RecordBatches from disk.

***
