Partitioning

Partitioning divides data into logical groups based on column values, enabling more efficient processing of large datasets and reducing memory usage.

Basic Partitioning

partitionBy() - Partition by columns

<?php

use function Flow\ETL\DSL\{data_frame, from_array};
use function Flow\ETL\Adapter\Parquet\to_parquet;

data_frame()
    ->read(from_array([
        ['date' => '2024-01-01', 'department' => 'sales', 'amount' => 100],
        ['date' => '2024-01-01', 'department' => 'marketing', 'amount' => 200],
        ['date' => '2024-01-02', 'department' => 'sales', 'amount' => 150],
        ['date' => '2024-01-02', 'department' => 'marketing', 'amount' => 250],
    ]))
    ->partitionBy('date') // Partition by date
    ->write(to_parquet(__DIR__ . '/output/sales.parquet'))
    ->run();

File structure:

output/
├── date=2024-01-01/
│   └── sales.parquet       # Contains 2 rows (sales + marketing for Jan 1)
└── date=2024-01-02/
    └── sales.parquet       # Contains 2 rows (sales + marketing for Jan 2)

Each partition gets its own directory named column=value, and the data files for that partition are written inside it.

Multi-Column Partitioning

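<?php

use function Flow\ETL\DSL\{data_frame, from_array};
use function Flow\ETL\Adapter\Parquet\to_parquet;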

data_frame()
    ->read(from_array([
        ['date' => '2024-01-01', 'department' => 'sales', 'amount' => 100],
        ['date' => '2024-01-01', 'department' => 'marketing', 'amount' => 200],
        ['date' => '2024-01-02', 'department' => 'sales', 'amount' => 150],
        ['date' => '2024-01-02', 'department' => 'marketing', 'amount' => 250],
    ]))
    ->partitionBy('date', 'department') // Partition by date AND department
    ->write(to_parquet(__DIR__ . '/output/sales.parquet'))
    ->run();

File structure with nested partitions:

output/
├── date=2024-01-01/
│   ├── department=sales/
│   │   └── sales.parquet       # 1 row: sales on Jan 1
│   └── department=marketing/
│       └── sales.parquet       # 1 row: marketing on Jan 1
└── date=2024-01-02/
    ├── department=sales/
    │   └── sales.parquet       # 1 row: sales on Jan 2
    └── department=marketing/
        └── sales.parquet       # 1 row: marketing on Jan 2

Each partition column creates an additional nesting level in the directory hierarchy.
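
To make the directory layout above concrete, here is a minimal plain-PHP sketch of how rows map to column=value paths. It is not Flow's implementation: partition_path() and group_by_partitions() are hypothetical helpers written only for this illustration.

```php
<?php

/**
 * Hypothetical helper (not part of Flow): builds a relative path such as
 * "date=2024-01-01/department=sales" from a row's partition columns.
 */
function partition_path(array $row, array $columns): string
{
    $segments = [];
    foreach ($columns as $column) {
        $segments[] = $column . '=' . $row[$column];
    }

    return implode('/', $segments);
}

/**
 * Hypothetical helper: groups rows by partition path, in order of first appearance.
 */
function group_by_partitions(array $rows, array $columns): array
{
    $groups = [];
    foreach ($rows as $row) {
        $groups[partition_path($row, $columns)][] = $row;
    }

    return $groups;
}

$rows = [
    ['date' => '2024-01-01', 'department' => 'sales', 'amount' => 100],
    ['date' => '2024-01-01', 'department' => 'marketing', 'amount' => 200],
    ['date' => '2024-01-02', 'department' => 'sales', 'amount' => 150],
    ['date' => '2024-01-02', 'department' => 'marketing', 'amount' => 250],
];

// One partition column: two groups of two rows each.
$byDate = group_by_partitions($rows, ['date']);

// Two partition columns: four groups of one row each, one nesting level per column.
$byDateAndDepartment = group_by_partitions($rows, ['date', 'department']);

foreach ($byDateAndDepartment as $path => $group) {
    echo $path, ': ', count($group), " row(s)\n";
}
```

The order of the columns passed in determines the nesting order, which mirrors the argument order of partitionBy('date', 'department') in the example above.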

Dropping Partitions

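<?php

use function Flow\ETL\DSL\data_frame;

// $extractor, $transformation and $loader are placeholders for your own pipeline parts.

// Remove partition information but keep the data, including the 'date' column
data_frame()
    ->read($extractor)
    ->partitionBy('date')
    ->map($transformation)
    ->dropPartitions()
    ->write($loader)
    ->run();

// Drop partitions AND partition columns
data_frame()
    ->read($extractor)
    ->partitionBy('date')
    ->dropPartitions(dropPartitionColumns: true) // Also removes the 'date' column
    ->write($loader)
    ->run();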

Performance Considerations

Choosing Partition Columns

<?php

// Good partitioning - balanced partition sizes
$dataFrame->partitionBy('date'); // Assuming data is spread across dates

// Bad partitioning - unbalanced partitions
$dataFrame->partitionBy('id'); // If IDs are unique, creates many tiny partitions

// Good partitioning - moderate cardinality
$dataFrame->partitionBy('department'); // Assuming reasonable number of departments
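
One way to judge a candidate column before partitioning is to count its distinct values in a sample of the data. The sketch below uses plain PHP only; distinct_counts() is a hypothetical helper, not a Flow function, and the sample rows are made up for illustration.

```php
<?php

/**
 * Hypothetical helper (not part of Flow): counts distinct values per column
 * in a sample of rows. A count close to the row count suggests that the
 * column would produce many tiny partitions.
 */
function distinct_counts(array $rows, array $columns): array
{
    $counts = [];
    foreach ($columns as $column) {
        $counts[$column] = count(array_unique(array_column($rows, $column)));
    }

    return $counts;
}

$sample = [
    ['id' => 1, 'date' => '2024-01-01', 'department' => 'sales'],
    ['id' => 2, 'date' => '2024-01-01', 'department' => 'marketing'],
    ['id' => 3, 'date' => '2024-01-02', 'department' => 'sales'],
    ['id' => 4, 'date' => '2024-01-02', 'department' => 'marketing'],
];

$counts = distinct_counts($sample, ['id', 'date', 'department']);

foreach ($counts as $column => $distinct) {
    printf("%s: %d distinct value(s) in %d rows\n", $column, $distinct, count($sample));
}
```

Here 'id' has as many distinct values as there are rows, so it would create one partition per row, while 'date' and 'department' each split the sample into two partitions.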

Save Modes with Partitioning

When writing partitioned data, the save mode determines how existing partition directories are handled.

Overwrite Mode

<?php

use function Flow\ETL\DSL\{data_frame, from_array, overwrite, ref};
use function Flow\ETL\Adapter\CSV\to_csv;

data_frame()
    ->read(from_array([
        ['date' => '2024-01-01', 'amount' => 100],
        ['date' => '2024-01-02', 'amount' => 200],
    ]))
    ->partitionBy(ref('date'))
    ->mode(overwrite())
    ->write(to_csv(__DIR__ . '/output/sales.csv'))
    ->run();

Behavior:

  • Removes ALL files within partition directories being written to
  • Partitions NOT in the current dataset are preserved
  • Running twice with the same partition values replaces the first write completely

Common pitfall: If you write two separate DataFrames to the same partitions using overwrite(), the second write deletes data from the first:

<?php

// First write - creates date=2024-01-01/sales.csv with amount=100
data_frame()
    ->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
    ->partitionBy(ref('date'))
    ->mode(overwrite())
    ->write(to_csv(__DIR__ . '/output/sales.csv'))
    ->run();

// Second write - DELETES the 100 and writes 200
data_frame()
    ->read(from_array([['date' => '2024-01-01', 'amount' => 200]]))
    ->partitionBy(ref('date'))
    ->mode(overwrite())
    ->write(to_csv(__DIR__ . '/output/sales.csv'))
    ->run();

// Result: date=2024-01-01/sales.csv contains ONLY amount=200

To combine data from multiple sources into the same partition, use append() mode or merge data before writing.

Append Mode

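<?php

use function Flow\ETL\DSL\{append, data_frame, from_array, ref};
use function Flow\ETL\Adapter\CSV\to_csv;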

data_frame()
    ->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
    ->partitionBy(ref('date'))
    ->mode(append())
    ->write(to_csv(__DIR__ . '/output/sales.csv'))
    ->run();

Behavior:

  • Creates new files with randomized suffixes in existing partition directories
  • Does not remove existing files
  • Multiple runs accumulate files (may cause duplicates)

Ignore Mode

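<?php

use function Flow\ETL\DSL\{data_frame, from_array, ignore, ref};
use function Flow\ETL\Adapter\CSV\to_csv;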

data_frame()
    ->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
    ->partitionBy(ref('date'))
    ->mode(ignore())
    ->write(to_csv(__DIR__ . '/output/sales.csv'))
    ->run();

Behavior:

  • Skips writing if partition directory already exists
  • No error thrown, silently continues

Exception If Exists Mode (Default)

<?php

data_frame()
    ->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
    ->partitionBy(ref('date'))
    ->write(to_csv(__DIR__ . '/output/sales.csv')) // Default mode
    ->run();

Behavior:

  • Throws RuntimeException if any partition path already exists
  • Safest option to prevent accidental overwrites

Reading Partitioned Data

Read partitioned data using glob patterns to match partition directories:

<?php

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\DSL\{data_frame, to_output};

data_frame()
    ->read(from_csv(__DIR__ . '/output/date=*/*.csv'))
    ->write(to_output())
    ->run();

Partition Pruning

Skip entire partitions without reading their contents using filterPartitions():

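<?php

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\DSL\{data_frame, lit, ref, to_output};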

data_frame()
    ->read(from_csv(__DIR__ . '/output/date=*/department=*/*.csv'))
    ->filterPartitions(ref('date')->greaterThanEqual(lit('2024-01-01')))
    ->write(to_output())
    ->run();

Unlike filter(), which reads all data and then discards non-matching rows, filterPartitions() evaluates partition metadata first and reads only the matching partitions, which can significantly improve performance on large datasets.
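
The idea behind pruning can be sketched in plain PHP: the decision is made from the path alone, before any file is opened. This is an illustration of the concept, not Flow's internal logic; partition_value() and the example paths are hypothetical.

```php
<?php

/**
 * Hypothetical helper (not part of Flow): returns the value of one
 * "column=value" path segment, or null when the path has no such segment.
 */
function partition_value(string $path, string $column): ?string
{
    $pattern = '~(?:^|/)' . preg_quote($column, '~') . '=([^/]+)~';

    return preg_match($pattern, $path, $match) === 1 ? $match[1] : null;
}

$paths = [
    'output/date=2023-12-31/department=sales/sales.csv',
    'output/date=2024-01-01/department=sales/sales.csv',
    'output/date=2024-01-02/department=marketing/sales.csv',
];

// Keep only paths whose date partition is on or after 2024-01-01.
// ISO dates (YYYY-MM-DD) sort correctly as plain strings, so strcmp() is enough.
$selected = array_values(array_filter(
    $paths,
    static function (string $path): bool {
        $date = partition_value($path, 'date');

        return $date !== null && strcmp($date, '2024-01-01') >= 0;
    }
));

foreach ($selected as $path) {
    echo $path, "\n";
}
```

Only the two 2024 paths remain in $selected, so a reader built on top of this list would never touch the 2023-12-31 file.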

Path Partitions

Extract partition metadata without reading file contents using from_path_partitions():

<?php

use function Flow\ETL\DSL\{data_frame, from_path_partitions, to_output};

data_frame()
    ->read(from_path_partitions(__DIR__ . '/output/date=*/department=*/*.csv'))
    ->write(to_output())
    ->run();

// Output includes 'path' and 'partitions' columns

Useful for discovering available partitions or building file manifests before processing data.
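
As a rough illustration of what such a manifest contains, the plain-PHP sketch below derives 'path' and 'partitions' entries from a list of file paths. It does not use or reproduce from_path_partitions(); parse_partitions() is a hypothetical helper and the paths are made up.

```php
<?php

/**
 * Hypothetical helper (not part of Flow): extracts every "column=value"
 * directory segment from a path as an associative array.
 */
function parse_partitions(string $path): array
{
    // The lookahead (?=/) restricts matches to directory segments,
    // so a file name that happens to contain "=" is ignored.
    preg_match_all('~(?:^|/)([^/=]+)=([^/]+)(?=/)~', $path, $matches, PREG_SET_ORDER);

    $partitions = [];
    foreach ($matches as $match) {
        $partitions[$match[1]] = $match[2];
    }

    return $partitions;
}

$files = [
    'output/date=2024-01-01/department=sales/sales.csv',
    'output/date=2024-01-02/department=marketing/sales.csv',
];

// Build a manifest with one entry per file, without opening any of them.
$manifest = [];
foreach ($files as $file) {
    $manifest[] = ['path' => $file, 'partitions' => parse_partitions($file)];
}

print_r($manifest);
```

A manifest like this can be inspected or filtered first, and only the remaining paths passed on to the step that actually reads the data.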

