Flow PHP

UNIFIED DATA PROCESSING FRAMEWORK

composer require flow-php/etl ~0.27.0

Extracts

Read from various data sources.

Transforms

Shape and optimize for your needs.

Loads

Store and secure in one of many available data sinks.

Examples:

Description

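batchBy() groups rows by the value of a column so that related records always stay together in the same batch. This is particularly useful for hierarchical data (such as orders with line items), where splitting related records across batches would break referential integrity in patterns like DELETE+INSERT. For example, if each batch first deletes the existing rows for the keys it contains and then inserts its own rows, an order split across two batches would have the rows written by the first batch deleted by the second.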
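To make the DELETE+INSERT hazard concrete, here is a plain-PHP sketch, independent of Flow PHP, that simulates the pattern on an in-memory "table". The deleteThenInsert() helper and the sample rows are hypothetical; a real load would issue DELETE and INSERT statements against a database.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: simulate a DELETE+INSERT load against an in-memory
 * table (a list of rows). For each batch, every existing row whose key
 * appears in the batch is deleted, then the batch rows are inserted.
 */
function deleteThenInsert(array $table, array $batches, string $key): array
{
    foreach ($batches as $batch) {
        // Collect the distinct key values present in this batch.
        $keys = array_unique(array_column($batch, $key));

        // DELETE: drop every existing row that shares a key with the batch.
        $table = array_values(array_filter(
            $table,
            static fn (array $row): bool => !in_array($row[$key], $keys, true),
        ));

        // INSERT: append the rows of the current batch.
        foreach ($batch as $row) {
            $table[] = $row;
        }
    }

    return $table;
}

$widget = ['order_id' => 1, 'item' => 'Widget', 'qty' => 2];
$gadget = ['order_id' => 1, 'item' => 'Gadget', 'qty' => 1];

// Order 1 split across two batches: the second DELETE removes the Widget row.
$split = deleteThenInsert([], [[$widget], [$gadget]], 'order_id');

// Order 1 kept in a single batch: both line items survive.
$grouped = deleteThenInsert([], [[$widget, $gadget]], 'order_id');

echo count($split), ' row(s) vs ', count($grouped), " row(s)\n";
```

Running it prints "1 row(s) vs 2 row(s)": only the grouped load keeps both line items of order 1.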

When minSize is specified, whole groups are accumulated into a batch until it holds at least minSize rows; the batch is then yielded as soon as the next group begins. Combining small groups this way avoids many tiny batches while still keeping every group intact.

When minSize is not specified, each unique group value gets its own batch.
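The batching rules above can be sketched in plain PHP with a generator. This is a hypothetical batchByGroup() helper written for illustration, not Flow PHP's implementation; it assumes the rows are already sorted by the grouping column and that minSize counts rows.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of group-preserving batching over rows already sorted
 * by $column (not Flow PHP's implementation).
 *
 * - $minSize === null: every run of equal $column values becomes one batch.
 * - $minSize set: whole groups are accumulated until the batch holds at least
 *   $minSize rows; the batch is yielded when the next group starts.
 */
function batchByGroup(iterable $rows, string $column, ?int $minSize = null): Generator
{
    $batch = [];
    $currentKey = null;
    $started = false;

    foreach ($rows as $row) {
        $key = $row[$column];
        $newGroup = $started && $key !== $currentKey;

        // A group just ended: yield the batch if it is large enough.
        if ($newGroup && ($minSize === null || count($batch) >= $minSize)) {
            yield $batch;
            $batch = [];
        }

        $batch[] = $row;
        $currentKey = $key;
        $started = true;
    }

    // Flush the last batch, even if it is smaller than $minSize.
    if ($batch !== []) {
        yield $batch;
    }
}

$rows = [
    ['order_id' => 1, 'item' => 'Widget', 'qty' => 2],
    ['order_id' => 1, 'item' => 'Gadget', 'qty' => 1],
    ['order_id' => 2, 'item' => 'Widget', 'qty' => 5],
    ['order_id' => 2, 'item' => 'Gizmo', 'qty' => 3],
    ['order_id' => 3, 'item' => 'Widget', 'qty' => 1],
];

// Batch sizes without a minimum and with a minimum of 3 rows.
$perGroup = array_map('count', iterator_to_array(batchByGroup($rows, 'order_id'), false));
$atLeast3 = array_map('count', iterator_to_array(batchByGroup($rows, 'order_id', 3), false));

echo implode(',', $perGroup), ' | ', implode(',', $atLeast3), "\n";
```

With the order data from this page it prints "2,2,1 | 4,1". With minSize 3, orders 1 and 2 are combined because order 1 alone (2 rows) is below the minimum, and the final batch for order 3 is flushed even though it is smaller than minSize.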

Key features:

  • Groups are NEVER split across batches
  • Batches may exceed minSize to preserve logical grouping
  • Multiple small groups can be combined into one batch (when minSize is set)
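The "never split" guarantee is easy to verify after the fact. The hypothetical groupsAreIntact() helper below (plain PHP, not a Flow PHP API) reports whether any grouping value shows up in more than one batch. Note that the first sample batch holds 4 rows, more than a minSize of 3, and is still valid.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns true when no value of $column appears in more
 * than one batch, i.e. no group was split across batches.
 */
function groupsAreIntact(array $batches, string $column): bool
{
    $seenIn = []; // group value => index of the batch it was first seen in

    foreach ($batches as $index => $batch) {
        foreach ($batch as $row) {
            $key = (string) $row[$column];

            if (isset($seenIn[$key]) && $seenIn[$key] !== $index) {
                return false; // same group already seen in an earlier batch
            }

            $seenIn[$key] = $index;
        }
    }

    return true;
}

// One batch of 4 rows (two whole orders) plus one batch of 1 row:
// larger than a minSize of 3, but no order is split.
$ok = [
    [['order_id' => 1], ['order_id' => 1], ['order_id' => 2], ['order_id' => 2]],
    [['order_id' => 3]],
];

// Order 2 is split across both batches.
$broken = [
    [['order_id' => 1], ['order_id' => 2]],
    [['order_id' => 2], ['order_id' => 3]],
];

var_dump(groupsAreIntact($ok, 'order_id'), groupsAreIntact($broken, 'order_id'));
```

This prints bool(true) followed by bool(false).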

Requirements:

  • Data must be sorted by the grouping column
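The sorting requirement exists because group boundaries are detected by a change in value between consecutive rows; on unsorted input the same value can form several separate runs. The plain-PHP sketch below uses a hypothetical isSortedBy() helper (not part of Flow PHP) to check the order, then sorts with usort() before any batching would happen.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: true when rows are in non-decreasing order of $column,
 * which guarantees that equal values sit next to each other.
 */
function isSortedBy(array $rows, string $column): bool
{
    for ($i = 1, $n = count($rows); $i < $n; $i++) {
        if ($rows[$i - 1][$column] > $rows[$i][$column]) {
            return false;
        }
    }

    return true;
}

$rows = [
    ['order_id' => 2, 'item' => 'Widget'],
    ['order_id' => 1, 'item' => 'Gadget'],
    ['order_id' => 2, 'item' => 'Gizmo'],
];

// Unsorted: order 2 appears in two separate runs, so a run-based batcher
// would emit it as two batches.
$before = isSortedBy($rows, 'order_id');

// Sort by the grouping column first; <=> returns -1, 0 or 1.
usort($rows, static fn (array $a, array $b): int => $a['order_id'] <=> $b['order_id']);
$after = isSortedBy($rows, 'order_id');

var_dump($before, $after);
```

This prints bool(false) followed by bool(true).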
composer.json
{
    "name": "flow-php/examples",
    "description": "Flow PHP - Examples",
    "license": "MIT",
    "type": "library",
    "require": {
        "flow-php/etl": "1.x-dev"
    },
    "minimum-stability": "dev"
}
code.php
<?php

declare(strict_types=1);

use function Flow\ETL\DSL\{constraint_sorted_by, data_frame, from_array, ref, to_stream};

require __DIR__ . '/vendor/autoload.php';

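data_frame()
    // Order line items, already sorted by order_id
    ->read(from_array([
        ['order_id' => 1, 'item' => 'Widget', 'qty' => 2],
        ['order_id' => 1, 'item' => 'Gadget', 'qty' => 1],
        ['order_id' => 2, 'item' => 'Widget', 'qty' => 5],
        ['order_id' => 2, 'item' => 'Gizmo', 'qty' => 3],
        ['order_id' => 3, 'item' => 'Widget', 'qty' => 1],
    ]))
    // Check that rows are sorted by order_id, which batchBy() relies on
    ->constrain(constraint_sorted_by(ref('order_id')))
    // Start a new batch whenever order_id changes (no minSize: one batch per order)
    ->batchBy('order_id')
    // Write each batch to output.txt as a table
    ->write(to_stream(__DIR__ . '/output.txt', truncate: false))
    ->run();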

Output

+----------+--------+-----+
| order_id |   item | qty |
+----------+--------+-----+
|        1 | Widget |   2 |
|        1 | Gadget |   1 |
+----------+--------+-----+
2 rows
+----------+--------+-----+
| order_id |   item | qty |
+----------+--------+-----+
|        2 | Widget |   5 |
|        2 |  Gizmo |   3 |
+----------+--------+-----+
2 rows
+----------+--------+-----+
| order_id |   item | qty |
+----------+--------+-----+
|        3 | Widget |   1 |
+----------+--------+-----+
1 rows
