flow php

UNIFIED DATA PROCESSING FRAMEWORK

composer require flow-php/etl ^0.10.0

Changelog

elephant
extract

Extracts

Read from various data sources.

arrow
transform

Transforms

Shape and optimize for your needs.

arrow
load

Loads

Store and secure in one of many available data sinks.

Definition


feature source
join_on(Comparison|array $comparisons, string $join_prefix) : Expression

Usage Examples


Example: Join - Join

<?php

declare(strict_types=1);

use function Flow\ETL\DSL\{data_frame, from_array, join_on, to_stream};
use Flow\ETL\Join\{Join};

require __DIR__ . '/../../../autoload.php';

$users = [
    ['id' => 1, 'name' => 'John'],
    ['id' => 2, 'name' => 'Jane'],
    ['id' => 3, 'name' => 'Doe'],
    ['id' => 4, 'name' => 'Bruno'],
];

$emails = [
    ['id' => 2, 'email' => '[email protected]'],
    ['id' => 3, 'email' => '[email protected]'],
    ['id' => 4, 'email' => '[email protected]'],
];

data_frame()
    ->read(from_array($users))
    ->join(
        data_frame()->read(from_array($emails)),
        join_on(['id' => 'id'], join_prefix: 'joined_'),
        Join::left
    )
    ->collect()
    ->write(to_stream(__DIR__ . '/output.txt', truncate: false))
    ->run();

Example: Join - Join each

<?php

declare(strict_types=1);

use function Flow\ETL\DSL\{data_frame, df, equal, int_entry, join_on, row, rows, str_entry, to_stream};
use Flow\ETL\Join\Join;
use Flow\ETL\{DataFrame, DataFrameFactory, Extractor, FlowContext, Row, Rows};

require __DIR__ . '/../../../autoload.php';

$apiExtractor = new class implements Extractor {
    public function extract(FlowContext $context) : Generator
    {
        yield rows(
            row(int_entry('id', 1), str_entry('sku', 'PRODUCT01')),
            row(int_entry('id', 2), str_entry('sku', 'PRODUCT02')),
            row(int_entry('id', 3), str_entry('sku', 'PRODUCT03'))
        );

        yield rows(
            row(int_entry('id', 10_001), str_entry('sku', 'PRODUCT10_001')),
            row(int_entry('id', 10_002), str_entry('sku', 'PRODUCT10_002')),
            row(int_entry('id', 10_003), str_entry('sku', 'PRODUCT10_003'))
        );
    }
};

$dbDataFrameFactory = new class implements DataFrameFactory {
    public function from(Rows $rows) : DataFrame
    {
        return df()->process($this->findRowsInDatabase($rows));
    }

    private function findRowsInDatabase(Rows $rows) : Rows
    {
        // Lets pretend there are 10k more entries in the DB
        $rowsFromDb = \array_map(
            static fn (int $id) : Row => row(int_entry('id', $id), str_entry('sku', 'PRODUCT' . $id)),
            \range(1, 10_000)
        );

        return (new Rows(...$rowsFromDb))
            // this would be a database SQL query in real life
            ->filter(fn (Row $row) => \in_array($row->valueOf('id'), $rows->reduceToArray('id'), true));
    }
};

data_frame()
    ->extract($apiExtractor)
    ->joinEach(
        $dbDataFrameFactory,
        join_on(equal('id', 'id')), // by using compare_all() or compare_any(), more than one entry can be used to prepare the condition
        Join::left_anti
    )
    ->write(to_stream(__DIR__ . '/output.txt', truncate: false))
    ->run();

Contributors

Join us on GitHub external resource
scroll back to top