flow php

Example: Join each

Topic: Join


Description

The main difference between join and joinEach is that joinEach is designed to handle large data frames that do not fit into memory.
Instead of loading the entire data frame into memory, joinEach expects an implementation of DataFrameFactory, which is used to load only specific rows from a source based on the Rows passed to it.

In some cases joinEach is the better choice, especially when the right side is much bigger than the left side.
In that case it's better to reduce the right side by fetching from storage only what is relevant for the left side.
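The idea of reducing the right side can be sketched in plain PHP, without Flow. This is only an illustration under stated assumptions: `$storage` and `fetch_relevant()` are hypothetical names standing in for a real database and a keyed query, not part of the Flow API.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a large right-side storage, keyed by id.
$storage = [];
foreach (range(1, 10_000) as $id) {
    $storage[$id] = ['id' => $id, 'sku' => 'PRODUCT' . $id];
}

/**
 * Returns only the right-side rows whose id appears in the left-side batch.
 * In real life this would be a query such as "WHERE id IN (...)".
 */
function fetch_relevant(array $storage, array $leftBatch) : array
{
    $ids = array_column($leftBatch, 'id');

    return array_values(array_intersect_key($storage, array_flip($ids)));
}

$leftBatch = [
    ['id' => 1, 'sku' => 'PRODUCT01'],
    ['id' => 2, 'sku' => 'PRODUCT02'],
    ['id' => 10_001, 'sku' => 'PRODUCT10_001'],
];

$right = fetch_relevant($storage, $leftBatch);

// Only the rows matching the left batch are loaded, not all 10,000.
echo count($right), ' of ', count($storage), " right-side rows loaded\n";
```

Only two of the three left-side ids exist in the storage, so two rows are loaded and the remaining 9,998 never reach memory.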

To maximize performance, you should adjust DataFrame::batchSize(int $size). The default value is 1, which might result in a large number of calls to the DataFrameFactory::from method.

The rule of thumb is to set the batch size to the number of rows that DataFrameFactory can safely and quickly load into memory.
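The effect of the batch size can be approximated with a small plain PHP model. It assumes one DataFrameFactory::from call per batch of left-side rows, which is a simplification of what Flow does internally; `count_factory_calls()` is a hypothetical helper, not a Flow function.

```php
<?php

declare(strict_types=1);

/**
 * Counts how many times a factory would be invoked when the left side
 * is processed in batches of the given size (one call per batch).
 */
function count_factory_calls(array $leftRows, int $batchSize) : int
{
    $calls = 0;

    foreach (array_chunk($leftRows, $batchSize) as $batch) {
        // A real implementation would call DataFrameFactory::from() here.
        $calls++;
    }

    return $calls;
}

$leftRows = range(1, 1_000);

echo count_factory_calls($leftRows, 1), "\n";   // 1000
echo count_factory_calls($leftRows, 250), "\n"; // 4
```

Under this model, raising the batch size from 1 to 250 reduces 1,000 lookups to 4, at the cost of each lookup loading more rows at once.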

Code

<?php

declare(strict_types=1);

use function Flow\ETL\DSL\{data_frame, df, equal, int_entry, join_on, row, rows, str_entry, to_stream};
use Flow\ETL\Join\Join;
use Flow\ETL\{DataFrame, DataFrameFactory, Extractor, FlowContext, Row, Rows};

require __DIR__ . '/../../../autoload.php';

$apiExtractor = new class implements Extractor {
    public function extract(FlowContext $context) : Generator
    {
        yield rows(
            row(int_entry('id', 1), str_entry('sku', 'PRODUCT01')),
            row(int_entry('id', 2), str_entry('sku', 'PRODUCT02')),
            row(int_entry('id', 3), str_entry('sku', 'PRODUCT03'))
        );

        yield rows(
            row(int_entry('id', 10_001), str_entry('sku', 'PRODUCT10_001')),
            row(int_entry('id', 10_002), str_entry('sku', 'PRODUCT10_002')),
            row(int_entry('id', 10_003), str_entry('sku', 'PRODUCT10_003'))
        );
    }
};

$dbDataFrameFactory = new class implements DataFrameFactory {
    public function from(Rows $rows) : DataFrame
    {
        return df()->process($this->findRowsInDatabase($rows));
    }

    private function findRowsInDatabase(Rows $rows) : Rows
    {
        // Let's pretend there are 10k entries in the DB
        $rowsFromDb = \array_map(
            static fn (int $id) : Row => row(int_entry('id', $id), str_entry('sku', 'PRODUCT' . $id)),
            \range(1, 10_000)
        );

        // collect the ids once instead of on every filter iteration
        $ids = $rows->reduceToArray('id');

        return (new Rows(...$rowsFromDb))
            // this would be a database SQL query in real life
            ->filter(static fn (Row $row) : bool => \in_array($row->valueOf('id'), $ids, true));
    }
};

data_frame()
    ->extract($apiExtractor)
    ->joinEach(
        $dbDataFrameFactory,
        join_on(equal('id', 'id')), // by using compare_all() or compare_any(), more than one entry can be used to prepare the condition
        Join::left_anti
    )
    ->write(to_stream(__DIR__ . '/output.txt', truncate: false))
    ->run();

Output

+-------+---------------+
|    id |           sku |
+-------+---------------+
| 10001 | PRODUCT10_001 |
| 10002 | PRODUCT10_002 |
| 10003 | PRODUCT10_003 |
+-------+---------------+
3 rows
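
Only the rows with ids 10001 to 10003 appear because Join::left_anti keeps the left-side rows that have no match on the right side. A minimal plain PHP sketch of that semantics, using a hypothetical `left_anti_join()` helper on plain arrays rather than Flow Rows:

```php
<?php

declare(strict_types=1);

/**
 * Keeps only the left rows whose key value is absent from the right rows.
 */
function left_anti_join(array $left, array $right, string $key) : array
{
    // Flip the right-side key values into array keys for constant-time lookups.
    $rightKeys = array_flip(array_column($right, $key));

    return array_values(array_filter(
        $left,
        static fn (array $row) : bool => !isset($rightKeys[$row[$key]])
    ));
}

$left = [
    ['id' => 1, 'sku' => 'PRODUCT01'],
    ['id' => 2, 'sku' => 'PRODUCT02'],
    ['id' => 3, 'sku' => 'PRODUCT03'],
    ['id' => 10_001, 'sku' => 'PRODUCT10_001'],
    ['id' => 10_002, 'sku' => 'PRODUCT10_002'],
    ['id' => 10_003, 'sku' => 'PRODUCT10_003'],
];

// Right side: ids 1..10000, mirroring the pretend database in the example.
$right = array_map(
    static fn (int $id) : array => ['id' => $id, 'sku' => 'PRODUCT' . $id],
    range(1, 10_000)
);

$result = left_anti_join($left, $right, 'id');

echo implode(', ', array_column($result, 'id')), "\n"; // 10001, 10002, 10003
```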
