flow php

UNIFIED DATA PROCESSING FRAMEWORK

composer require flow-php/etl ~0.35.1

Extracts

Read from various data sources.

Transforms

Shape and optimize for your needs.

Loads

Store and secure in one of many available data sinks.

Examples:

Description

Divide large datasets into smaller, organized parts based on column values. Flow uses the Hive partitioning convention, which creates a directory structure where each folder is named `column=value` and represents one partition value.

output
├── color=blue
│   ├── sku=PRODUCT01
│   │   └── products.csv
│   └── sku=PRODUCT02
│       └── products.csv
├── color=green
│   ├── sku=PRODUCT01
│   │   └── products.csv
│   ├── sku=PRODUCT02
│   │   └── products.csv
│   └── sku=PRODUCT03
│       └── products.csv
└── color=red
    ├── sku=PRODUCT01
    │   └── products.csv
    ├── sku=PRODUCT02
    │   └── products.csv
    └── sku=PRODUCT03
        └── products.csv
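
To make the layout above concrete, the following is a minimal sketch of how a Hive-style path can be derived from a row's partition columns. The helper `hive_partition_path()` is hypothetical and written for illustration only; it is not part of Flow's API and does not claim to mirror Flow's internals.

```php
<?php

declare(strict_types=1);

/**
 * Build a Hive-style relative path for a single row.
 * Hypothetical helper for illustration; not part of Flow's API.
 *
 * @param array<string, scalar> $row
 * @param list<string> $partitionColumns
 */
function hive_partition_path(array $row, array $partitionColumns, string $fileName): string
{
    $segments = [];

    foreach ($partitionColumns as $column) {
        // Each partition column becomes one "column=value" directory level.
        $segments[] = $column . '=' . $row[$column];
    }

    // The data file sits in the innermost partition directory.
    $segments[] = $fileName;

    return implode('/', $segments);
}

$row = ['id' => 7, 'color' => 'blue', 'sku' => 'PRODUCT01'];

echo hive_partition_path($row, ['color', 'sku'], 'products.csv'), PHP_EOL;
// color=blue/sku=PRODUCT01/products.csv
```

The order of the partition columns determines the nesting order, which is why `color` appears above `sku` in the tree.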

This example writes partitioned data in overwrite mode. During the write, all files within each affected partition directory are removed and replaced; partitions that are not present in the current dataset remain untouched.
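
The overwrite behavior described above can be sketched with plain filesystem calls. This is an illustrative approximation only, assuming one output file per partition; `overwrite_partitions()` is a hypothetical function, and Flow's actual implementation may differ.

```php
<?php

declare(strict_types=1);

/**
 * Illustrative sketch of partition-level overwrite; not Flow's implementation.
 *
 * @param array<string, string> $partitions partition directory => new file contents
 */
function overwrite_partitions(string $baseDir, array $partitions, string $fileName): void
{
    foreach ($partitions as $partitionDir => $contents) {
        $dir = $baseDir . '/' . $partitionDir;

        if (is_dir($dir)) {
            // Remove every existing file in the affected partition directory.
            foreach (glob($dir . '/*') ?: [] as $existing) {
                if (is_file($existing)) {
                    unlink($existing);
                }
            }
        } else {
            mkdir($dir, 0777, true);
        }

        // Write the replacement file for this partition.
        file_put_contents($dir . '/' . $fileName, $contents);
    }
}

// Set up two partitions left over from an earlier run.
$base = sys_get_temp_dir() . '/flow_overwrite_' . uniqid();
mkdir($base . '/color=red', 0777, true);
mkdir($base . '/color=blue', 0777, true);
file_put_contents($base . '/color=red/old.csv', "id\n1\n");
file_put_contents($base . '/color=blue/old.csv', "id\n7\n");

// The new dataset only contains rows for color=red.
overwrite_partitions($base, ['color=red' => "id\n2\n"], 'products.csv');

var_dump(file_exists($base . '/color=red/old.csv'));      // bool(false)
var_dump(file_exists($base . '/color=red/products.csv')); // bool(true)
var_dump(file_exists($base . '/color=blue/old.csv'));     // bool(true)
```

Only `color=red` is touched because it is the only partition in the new data; the stale file in `color=blue` survives, which matches the "remain untouched" rule.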

Code

<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\{data_frame, from_array, overwrite, ref};

require __DIR__ . '/vendor/autoload.php';

data_frame()
    ->read(from_array(
        [
            ['id' => 1, 'color' => 'red', 'sku' => 'PRODUCT01'],
            ['id' => 2, 'color' => 'red', 'sku' => 'PRODUCT02'],
            ['id' => 3, 'color' => 'red', 'sku' => 'PRODUCT03'],
            ['id' => 4, 'color' => 'green', 'sku' => 'PRODUCT01'],
            ['id' => 5, 'color' => 'green', 'sku' => 'PRODUCT02'],
            ['id' => 6, 'color' => 'green', 'sku' => 'PRODUCT03'],
            ['id' => 7, 'color' => 'blue', 'sku' => 'PRODUCT01'],
            ['id' => 8, 'color' => 'blue', 'sku' => 'PRODUCT02'],
        ]
    ))
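    // Partition by color first, then sku: one "color=.../sku=..." directory per combination.
    ->partitionBy(ref('color'), ref('sku'))
    // Overwrite mode replaces files only in the partitions present in this dataset.
    ->mode(overwrite())
    // The file name is reused inside every partition directory under output/.
    ->write(to_csv(__DIR__ . '/output/products.csv'))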
    ->run();
