Data Processing in PHP


Let's imagine a situation where we have to import a large amount of data from a file to our database.
Typically, the file format would be some variation of CSV or maybe even an Excel file.
The purpose of those kinds of operations is to allow system users to upload data in bulk.

In the simple scenario, our database structure will more or less match the import file data structure.
Some columns might be named differently, but those are minor obstacles that can be very easily addressed.
The real challenge starts when we need to fully convert the input format to match our database. Let me give you an example.

Let’s imagine a system that allows users to upload user delivery addresses.
Each user needs to have at least one address, and each address comes in the following format:

{
  "id": "uuid",
  "country": "string{2}",
  "state": "string{2}",
  "zip": "string{9}",
  "city": "string{256}",
  "address_1": "string{256}",
  "address_2": "string{256}",
  "address_3": "string{256}"
}

For the sake of this article, our database has only two tables.
The first table is called users and it has the following columns:

The second one is called user_addresses and it has the following structure:

Doesn’t look like a complicated task, right?

Well, those are famous last words 😅

What if business users would like to upload files larger than our available amount of RAM?

PHP can process large amounts of data, as demonstrated in an article by Florian Engelhardt.

However, his solution is optimized for speed, not memory consumption.
To handle large data in PHP with limited memory, use Generators (the foundation of Flow PHP).
Generators allow you to process data in chunks without loading the entire file into memory.

Similarly, fopen, fgets or fgetcsv, and fclose can be used to read a file one line at a time instead of loading it whole.
Large data processing in PHP is possible, but it's not easy.
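
To make the generator approach concrete, here is a minimal plain-PHP sketch that streams a CSV file row by row. It is not how Flow implements its extractors; the function name read_csv_rows and the sample file are made up for illustration, and the first line is assumed to be a header.

```php
<?php

declare(strict_types=1);

/**
 * Lazily yields one associative row at a time, so memory usage does not
 * grow with the size of the file.
 *
 * @return \Generator<int, array<string, string|null>>
 */
function read_csv_rows(string $path): \Generator
{
    $handle = fopen($path, 'rb');

    if ($handle === false) {
        throw new \RuntimeException("Cannot open file: {$path}");
    }

    try {
        // the first line is treated as the header
        $header = fgetcsv($handle, null, ',', '"', '\\');

        if ($header === false || $header === [null]) {
            return;
        }

        while (($line = fgetcsv($handle, null, ',', '"', '\\')) !== false) {
            // fgetcsv returns [null] for blank lines, skip them
            if ($line === [null]) {
                continue;
            }

            // array_combine throws a ValueError when the column counts differ
            yield array_combine($header, $line);
        }
    } finally {
        // runs when the generator finishes or is destroyed early
        fclose($handle);
    }
}

// Usage: write a tiny sample file and stream it back row by row.
$path = tempnam(sys_get_temp_dir(), 'csv');
file_put_contents($path, "id,user_email\n1,a@example.com\n2,b@example.com\n");

$emails = [];
foreach (read_csv_rows($path) as $row) {
    $emails[] = $row['user_email'];
}

unlink($path);
```

Only one line of the file is held in memory at any time, which is the property that matters once the file is larger than the available RAM.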

Languages like Python provide tools that hide this complexity behind easier-to-digest abstractions, making data processing more accessible.

Flow PHP aims to reduce data processing complexity and standardize the approach, considering resource consumption.
OK, but let’s get back to our example.
To make it easier to create those import files for the user, the file's expected data structure looks like this:

To import that file's content into our database, we need to read the file, validate its rows, find the user_id for each user_email, and insert or update each address in the database.

OK, but what about invalid rows?
There are many different strategies; sometimes the entire import pipeline should be stopped when the first invalid row is spotted. In our case, we can import all valid rows and filter out the invalid ones. To provide a decent UX, we are also going to save the filtered-out rows in a separate file so the user can fix them.

Step #1 - Reading file


No surprises here: we just need to use from_csv, which does its best to automatically detect the separator, enclosure, and escape characters. Each of them can also be set manually.

Flow by design yields one row at a time; this behavior can be changed through DataFrame::batchSize(int $batchSize).
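
Conceptually, changing the batch size means grouping a stream of single rows into chunks. The following plain-PHP sketch shows that idea only; it is not Flow's implementation, and the helper name in_batches is hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Groups any iterable of rows into arrays of at most $size rows.
 *
 * @param iterable<array<string, mixed>> $rows
 * @return \Generator<int, list<array<string, mixed>>>
 */
function in_batches(iterable $rows, int $size): \Generator
{
    if ($size < 1) {
        throw new \InvalidArgumentException('Batch size must be at least 1');
    }

    $batch = [];

    foreach ($rows as $row) {
        $batch[] = $row;

        if (count($batch) === $size) {
            yield $batch;
            $batch = [];
        }
    }

    // the last batch may be smaller than $size
    if ($batch !== []) {
        yield $batch;
    }
}

// Usage: five rows in batches of two give batches of 2, 2 and 1 rows.
$rows = [['id' => 1], ['id' => 2], ['id' => 3], ['id' => 4], ['id' => 5]];

$batchSizes = [];
foreach (in_batches($rows, 2) as $batch) {
    $batchSizes[] = count($batch);
}
```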

Step #2 - Validation


Here things get a bit trickier, since we need to approach the problem the same way we would if we were processing the data directly in Excel.
One technique we can use is to create a temporary column "valid" with its value set to "true" by default.
Then we set that column's value to false whenever a column we are validating does not satisfy our rules.
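
Stripped of any framework, the technique can be sketched in plain PHP as follows. This is an illustration under assumptions, not Flow code: mark_validity and the two sample rules are hypothetical, and string length is measured in bytes with strlen.

```php
<?php

declare(strict_types=1);

/**
 * Applies each rule in turn; a rule can only flip "valid" from true to false.
 *
 * @param array<string, mixed> $row
 * @param list<callable(array<string, mixed>): bool> $rules
 * @return array<string, mixed>
 */
function mark_validity(array $row, array $rules): array
{
    // the temporary column starts as true
    $row['valid'] = true;

    foreach ($rules as $rule) {
        // keep the current value when the rule passes, otherwise set false
        $row['valid'] = $rule($row) ? $row['valid'] : false;
    }

    return $row;
}

$rules = [
    // user_email must be a string of 1 to 256 bytes
    static fn (array $row): bool => is_string($row['user_email'] ?? null)
        && strlen($row['user_email']) >= 1
        && strlen($row['user_email']) <= 256,
    // country must be a string of exactly 2 bytes
    static fn (array $row): bool => is_string($row['country'] ?? null)
        && strlen($row['country']) === 2,
];

$good = mark_validity(['user_email' => 'a@example.com', 'country' => 'US'], $rules);
$bad = mark_validity(['user_email' => '', 'country' => 'US'], $rules);
```

Because a failed rule never sets the column back to true, the order of the rules does not matter: a single failure is enough to mark the row as invalid.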

Let's take a look at email validation; here is what we want to check:

ref('user_email')->isType(type_string()) // valid string
ref('user_email')->size()->between(lit(1), lit(256)) // length

In order to combine those two conditions, we can use the and operator:

ref('user_email')->isType(type_string())->and(ref('user_email')->size()->between(lit(1), lit(256)))

Now we need to check whether this condition is true. When it is, we leave the value of the "valid" column as is; otherwise, we set it to "false".

<?php

use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\type_string;
use function Flow\ETL\DSL\when;

df()
    ->withEntry(
        'valid',
        when(
            ref('user_email')->isType(type_string())
                ->and(ref('user_email')->size()->between(1, 256)),
            ref('valid'),
            lit(false)
        )
    );

This is what all our validation rules are going to look like:

<?php

use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\type_null;
use function Flow\ETL\DSL\type_string;
use function Flow\ETL\DSL\type_uuid;
use function Flow\ETL\DSL\when;

df()
    ->withEntry('valid', lit(true))
    ->withEntry('valid', when(ref('id')->isType(type_uuid(true)), ref('valid'), lit(false)))
    ->withEntry('valid', when(ref('user_email')->isType(type_string())->and(ref('user_email')->size()->between(1, 256)), ref('valid'), lit(false)))
    ->withEntry('valid', when(ref('country')->isType(type_string())->and(ref('country')->size()->equals(2)), ref('valid'), lit(false)))
    ->withEntry('valid', when(ref('state')->isType(type_string())->and(ref('state')->size()->equals(2)), ref('valid'), lit(false)))
    ->withEntry('valid', when(ref('city')->isType(type_string())->and(ref('city')->size()->between(1, 256)), ref('valid'), lit(false)))
    ->withEntry('valid', when(ref('zip')->isType(type_string())->and(ref('zip')->size()->between(4, 12)), ref('valid'), lit(false)))
    ->withEntry('valid', when(ref('address_1')->isType(type_string())->and(ref('address_1')->size()->between(1, 256)), ref('valid'), lit(false)))
    ->withEntry('valid', when(ref('address_2')->isType(type_null())->or(ref('address_2')->size()->between(1, 256)), ref('valid'), lit(false)))
    ->withEntry('valid', when(ref('address_3')->isType(type_null())->or(ref('address_3')->size()->between(1, 256)), ref('valid'), lit(false)))
;

💡 It’s usually a good idea to extract such logic into a standalone transformation. This way our validation logic can be tested in isolation.

<?php

use Flow\ETL\DataFrame;
use Flow\ETL\Transformation;
use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\type_null;
use function Flow\ETL\DSL\type_string;
use function Flow\ETL\DSL\type_uuid;
use function Flow\ETL\DSL\when;

class Validation implements Transformation
{
    public function transform(DataFrame $dataFrame): DataFrame
    {
        return $dataFrame
            ->withEntry('valid', lit(true))
            ->withEntry('valid', when(ref('id')->isType(type_uuid(true)), ref('valid'), lit(false)))
            ->withEntry('valid', when(ref('user_email')->isType(type_string())->and(ref('user_email')->size()->between(1, 256)), ref('valid'), lit(false)))
            ->withEntry('valid', when(ref('country')->isType(type_string())->and(ref('country')->size()->equals(2)), ref('valid'), lit(false)))
            ->withEntry('valid', when(ref('state')->isType(type_string())->and(ref('state')->size()->equals(2)), ref('valid'), lit(false)))
            ->withEntry('valid', when(ref('city')->isType(type_string())->and(ref('city')->size()->between(1, 256)), ref('valid'), lit(false)))
            ->withEntry('valid', when(ref('zip')->isType(type_string())->and(ref('zip')->size()->between(4, 12)), ref('valid'), lit(false)))
            ->withEntry('valid', when(ref('address_1')->isType(type_string())->and(ref('address_1')->size()->between(1, 256)), ref('valid'), lit(false)))
            ->withEntry('valid', when(ref('address_2')->isType(type_null())->or(ref('address_2')->size()->between(1, 256)), ref('valid'), lit(false)))
            ->withEntry('valid', when(ref('address_3')->isType(type_null())->or(ref('address_3')->size()->between(1, 256)), ref('valid'), lit(false)))
        ;
    }
}

df()
    ->read(from_csv(__DIR__ . '/import.csv'))
    ->with(new Validation())
;

At this point, every row that goes through our pipeline gets an additional "valid" column, which can be used to filter out invalid rows. Before we do that, though, we should first write all invalid rows to a separate file.

Flow has a dedicated loader which can write only a subset of the dataset based on the provided condition. It’s called to_branch().

<?php

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\to_branch;

df()
    ->read(from_csv(__DIR__ . '/import.csv'))
    ->with(new Validation())
    ->write(
        to_branch(
            ref('valid')->isFalse(),
            to_csv(__DIR__ . '/invalid_rows_' . time() . '.csv'),
        )
    )
    ->filter(ref('valid')->isTrue())
    ->drop('valid')
;

All invalid rows are going to be saved separately; everything else we can consider valid.
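
The branch, filter and drop steps can be pictured with the plain-PHP sketch below. It only illustrates the idea and is not how to_branch() works internally; split_invalid and the sample rows are hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Writes rows flagged as invalid to $invalidPath and yields the valid ones,
 * in both cases with the temporary "valid" column removed.
 *
 * @param iterable<array<string, mixed>> $rows
 * @return \Generator<int, array<string, mixed>>
 */
function split_invalid(iterable $rows, string $invalidPath): \Generator
{
    $handle = fopen($invalidPath, 'wb');

    if ($handle === false) {
        throw new \RuntimeException("Cannot open file: {$invalidPath}");
    }

    try {
        $headerWritten = false;

        foreach ($rows as $row) {
            $valid = $row['valid'];
            unset($row['valid']);

            if ($valid) {
                yield $row;
                continue;
            }

            // write the header once, before the first invalid row
            if (!$headerWritten) {
                fputcsv($handle, array_keys($row), ',', '"', '\\');
                $headerWritten = true;
            }

            fputcsv($handle, $row, ',', '"', '\\');
        }
    } finally {
        fclose($handle);
    }
}

// Usage: one valid and one invalid row.
$invalidPath = tempnam(sys_get_temp_dir(), 'invalid');
$rows = [
    ['id' => 'a', 'country' => 'US', 'valid' => true],
    ['id' => 'b', 'country' => 'USA', 'valid' => false],
];

$validRows = iterator_to_array(split_invalid($rows, $invalidPath), false);
$invalidCsv = file_get_contents($invalidPath);

unlink($invalidPath);
```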

Obviously, those validation rules can be way more sophisticated, to the point where each field can be validated in a separate transformation object.

When operations are repeatable, it might be a good idea to extract them and make them flexible; for example, we can easily create a string length validation Transformation.

Below you can find an example of a simple StringLengthValidation implemented as a Transformation object.

<?php

use Flow\ETL\DataFrame;
use Flow\ETL\Row\EntryReference;
use Flow\ETL\Transformation;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\type_string;
use function Flow\ETL\DSL\when;

final readonly class StringLengthValidation implements Transformation
{
    public function __construct(
        private EntryReference $reference,
        private int $min,
        private int $max,
        private string $outputEntry = 'valid'
    ) {
    }

    public function transform(DataFrame $dataFrame): DataFrame
    {
        return $dataFrame
            ->withEntry(
                $this->outputEntry,
                when(
                    // $this->reference is already an EntryReference, so it is used directly
                    $this->reference->isType(type_string())
                        ->and($this->reference->size()->between($this->min, $this->max)),
                    ref($this->outputEntry),
                    lit(false)
                )
            )
        ;
    }
}

Dividing transformation logic into smaller parts is pretty much the only reliable way to properly test edge cases in data processing pipelines.
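
As an illustration of what testing edge cases in isolation can look like, here is a plain-PHP sketch of a string length rule checked against its boundary values. The function string_length_between is hypothetical and independent of Flow, and it measures length in bytes with strlen, which may differ from how Flow counts characters.

```php
<?php

declare(strict_types=1);

/**
 * Returns true only for strings whose byte length is within [$min, $max].
 */
function string_length_between(mixed $value, int $min, int $max): bool
{
    return is_string($value) && strlen($value) >= $min && strlen($value) <= $max;
}

// Edge cases worth pinning down: both boundaries, empty input, non-strings.
$cases = [
    'empty string' => ['', false],
    'lower bound' => ['a', true],
    'upper bound' => [str_repeat('a', 256), true],
    'above upper bound' => [str_repeat('a', 257), false],
    'null' => [null, false],
    'integer' => [123, false],
];

// collect the names of the cases whose result differs from the expectation
$failures = [];
foreach ($cases as $name => [$value, $expected]) {
    if (string_length_between($value, 1, 256) !== $expected) {
        $failures[] = $name;
    }
}
```

The same table of cases could be fed to a test of the Transformation object itself; the smaller the unit, the shorter that table needs to be.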

Step #3 - Finding user_id based on user_email

Once we have confirmed that our data is valid, we can move on to figuring out which user a given address belongs to. We already have the user email in the import file; what we are missing is user_id. User identifiers are stored in the `users` table in our database, so our next step is to bring those IDs into the `user_id` column.

In a typical scenario, we would collect emails from the import file into reasonably sized batches and query the database for the user_ids those emails belong to.

In order to not overcomplicate our example, we are going to assume that all emails exist in the database.

We are going to use DataFrame::joinEach(), which is slightly different than DataFrame::join. The only difference between those two joins is that join will try to execute the join on the entire dataset before moving forward. This means that if our dataset has 2 million rows, Flow would first go through them, executing the join on each row before it can move forward.

Additionally, we are executing a join between two different data storages: database and file. In order to use DataFrame::join, we would need to either first load everything to the database and then run an SQL query with a join or we could dump the database table to a CSV file and then join between files.

This is where joinEach comes into play. The goal of this join is to execute a join operation on a subset of the dataset.
The first thing we need to do is to define our batch size. As mentioned previously, all extractors by design will yield only one row at a time. Using batchSize, we can change that behavior.

<?php

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\to_branch;

$report = df()
    ->read(from_csv(__DIR__ . '/import.csv'))
    ->with(new Validation())
    ->write(
        to_branch(
            ref('valid')->isFalse(),
            to_csv(__DIR__ . '/invalid_rows_' . time() . '.csv'),
        )
    )
    ->filter(ref('valid')->isTrue())
    ->drop('valid')
    ->batchSize(100)
;

In the example above, everything that happens after ->batchSize(100) happens on 100 rows at a time (or fewer when not enough rows are left). So now when we use joinEach, it will perform the join operation on 100 rows at a time. After joining them, it will proceed with the remaining transformations and then start over with the next batch until the whole import file is processed.
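
The idea of joining one batch at a time can be sketched in plain PHP as follows. This is not Flow's joinEach: join_user_ids is a hypothetical helper, the lookup callable stands in for a database query, and dropping unmatched rows is a choice made for this sketch only.

```php
<?php

declare(strict_types=1);

/**
 * Joins one batch of rows with the user ids returned by $lookup.
 *
 * @param list<array<string, mixed>> $batch
 * @param callable(list<string>): array<string, string> $lookup maps emails to user ids
 * @return list<array<string, mixed>>
 */
function join_user_ids(array $batch, callable $lookup): array
{
    // one lookup per batch instead of one per row
    $emails = array_values(array_unique(array_column($batch, 'user_email')));
    $userIds = $lookup($emails);

    $joined = [];
    foreach ($batch as $row) {
        // rows without a matching user are dropped in this sketch
        if (!isset($userIds[$row['user_email']])) {
            continue;
        }

        $row['user_id'] = $userIds[$row['user_email']];
        $joined[] = $row;
    }

    return $joined;
}

// Hypothetical in-memory lookup standing in for a database query.
$users = ['a@example.com' => 'user-1', 'b@example.com' => 'user-2'];
$lookup = static fn (array $emails): array => array_intersect_key($users, array_flip($emails));

$batch = [
    ['id' => 'addr-1', 'user_email' => 'a@example.com'],
    ['id' => 'addr-2', 'user_email' => 'b@example.com'],
    ['id' => 'addr-3', 'user_email' => 'a@example.com'],
];

$joined = join_user_ids($batch, $lookup);
$joinedUserIds = array_column($joined, 'user_id');
```

With a batch size of 100, a file with 2 million rows results in 20,000 small lookups, and memory usage stays bounded by the size of a single batch.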

joinEach is a very specific operation: it requires us to create a new DataFrame for each batch, which is why we need to implement the DataFrameFactory interface.

<?php

use Doctrine\DBAL\ArrayParameterType;
use Doctrine\DBAL\Connection;
use Flow\ETL\DataFrame;
use Flow\ETL\DataFrameFactory;
use Flow\ETL\Rows;
use function Flow\ETL\Adapter\Doctrine\from_dbal_query;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\ref;

final readonly class UserIdJoinDataFrameFactory implements DataFrameFactory
{

    public function __construct(private Connection $connection)
    {
    }

    public function from(Rows $rows): DataFrame
    {
        return df()->read(
            from_dbal_query(
                $this->connection,
                "SELECT id as user_id, email as user_email FROM users WHERE email IN (:emails)",
                ['emails' => $rows->reduceToArray(ref('user_email'))],
                ['emails' => ArrayParameterType::STRING]
            )
        );
    }
}

The very last step is to define the join expression (how to join two datasets, based on what condition).

<?php

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\join_on;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\to_branch;

$report = df()
    ->read(from_csv(__DIR__ . '/import.csv'))
    ->with(new Validation())
    ->write(
        to_branch(
            ref('valid')->isFalse(),
            to_csv(__DIR__ . '/invalid_rows_' . time() . '.csv'),
        )
    )
    ->filter(ref('valid')->isTrue())
    ->drop('valid')
    ->batchSize(100)
    ->joinEach(
        new UserIdJoinDataFrameFactory($connection),
        join_on(['user_email' => 'user_email'])
    )
    ->drop('user_email')
;

That’s it. Let’s now use the `to_output()` loader to display our dataset after the join operation:

+----------------------+---------+-------+------------+-------------------+-----------------+-----------+-----------+----------------------+
|                   id | country | state |        zip |              city |       address_1 | address_2 | address_3 |              user_id |
+----------------------+---------+-------+------------+-------------------+-----------------+-----------+-----------+----------------------+
| 36a1192d-1c8e-31fc-9 |      US |    AR |      44389 |       Keelingbury |     Noemy Burgs |       359 |           | 01a3e823-2a69-3432-9 |
| 3d763134-aec2-3a75-8 |      US |    FL | 71341-9244 |          Krisside |       Oren Road |       511 |           | 01b0abd3-7217-332c-9 |
| b03c6357-8451-3320-9 |      US |    ID |      65146 |      Bernardoview |   Tevin Harbors |      4344 |           | 01b1a371-1c70-320a-b |
| 476cf6ba-2aac-3b58-9 |      US |    LA | 64572-0907 |        Lake Estel |    Harber Ranch |       436 |  Motorway | 01c45346-3fb8-3edf-8 |
| b37710e0-26dc-343c-9 |      US |    MS | 83824-7369 | North Carissafort |      Bulah Dale |      8097 |           | 01c45346-3fb8-3edf-8 |
| 03eaaf10-644d-3d92-9 |      US |    OH |      71386 |      Cameronshire |     Oren Drives |      3828 |           | 01c64f97-9ec1-3b55-9 |
| a78970b7-4964-3f91-b |      US |    MA |      79319 |  Port Rodgershire |     Parker Mall |       873 |           | 01c64f97-9ec1-3b55-9 |
| cc73778c-cb3a-305a-8 |      US |    NE |      63187 |        Parkerfort |  Cassandre Ways |      4633 |           | 01c64f97-9ec1-3b55-9 |
|                      |      US |    NH |      96807 |    West Catherine | Schmeler Valley |      9834 |     Mills | 01ce6d03-6b8e-3b8a-9 |
| 2419b64c-db25-38af-9 |      US |    NM |      74053 |         Carolstad |        Rory Row |      8019 |  Crossing | 01ce6d03-6b8e-3b8a-9 |
+----------------------+---------+-------+------------+-------------------+-----------------+-----------+-----------+----------------------+

As we can see, the user_id column was added just as expected. Normally, here we would need to execute another validation and append invalid rows to our broken rows file, but let's skip that part and move to loading data into the database.

Step #4 - Insert an address to a database or update it, if it’s already there

Now that we know which address belongs to which user, it’s time to upsert (insert or update) those addresses into the user_addresses database table.
Oh wait, there is one more thing.

If the address id is null (empty), it means that we are not updating an existing address but instead adding a new one.
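So in this case, we just need to generate a new unique identifier, but only when id is null; otherwise, keep it as is.
The identifier has to be generated separately for each row. A literal such as lit(Uuid::uuid4()->toString()) is evaluated only once, when the pipeline is defined, so every new address would receive the same id; the uuid_v4() function from Flow's DSL is evaluated per row instead.

->withEntry('id', when(ref('id')->isNull(), uuid_v4(), ref('id')))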
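
One detail worth checking in a rule like this is when the identifier gets generated. The plain-PHP sketch below, which is independent of Flow, passes the generator as a callable so that it runs once for every row that lacks an id. Both uuid_v4_string and fill_missing_ids are hypothetical helpers written for this illustration.

```php
<?php

declare(strict_types=1);

/**
 * Generates a random (version 4) UUID using only the standard library.
 */
function uuid_v4_string(): string
{
    $bytes = random_bytes(16);
    // set the version (4) and variant (RFC 4122) bits
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}

/**
 * @param list<array<string, mixed>> $rows
 * @param callable(): string $generateId invoked once for every row that lacks an id
 * @return list<array<string, mixed>>
 */
function fill_missing_ids(array $rows, callable $generateId): array
{
    return array_map(
        static function (array $row) use ($generateId): array {
            // only rows with a null (or missing) id get a new identifier
            $row['id'] ??= $generateId();

            return $row;
        },
        $rows
    );
}

$rows = fill_missing_ids(
    [
        ['id' => null, 'city' => 'Krisside'],
        ['id' => null, 'city' => 'Parkerfort'],
        ['id' => 'existing', 'city' => 'Carolstad'],
    ],
    uuid_v4_string(...)
);
```

Had the identifier been computed before the loop and reused, both new rows would share one id, and an upsert keyed on id would let the second row overwrite the first.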

Now we are ready to upsert our addresses into the database!
Ready? Normally, performing a bulk upsert through Doctrine DBAL is not the easiest thing to do.
Luckily, Flow comes with the doctrine-dbal-bulk library, so in the end, here is how it looks:

<?php

use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use function Flow\ETL\DSL\df;

df()
    ->write(
        to_dbal_table_insert(
            $connection,
            'user_addresses',
            [
                'conflict_columns' => ['id']
            ]
        )
    )
;

And finally, we got to the end. Below is the full code (without transformations that we extracted from this pipeline).

<?php

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\join_on;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\to_branch;
use function Flow\ETL\DSL\uuid_v4;
use function Flow\ETL\DSL\when;

$report = df()
    ->read(from_csv(__DIR__ . '/import.csv'))
    ->with(new Validation())
    ->write(
        to_branch(
            ref('valid')->isFalse(),
            to_csv(__DIR__ . '/invalid_rows_' . time() . '.csv'),
        )
    )
    ->filter(ref('valid')->isTrue())
    // at this point all invalid records are stored in another file
    ->drop('valid')
    // we need to extract from the database user_id based on user_email
    // let's do it in batches of 100
    ->batchSize(100)
    ->joinEach(
        new UserIdJoinDataFrameFactory($connection),
        join_on(['user_email' => 'user_email'])
    )
    // user email is no longer needed
    ->drop('user_email')
    // defines the batch size for the insert operation
    ->batchSize(100)
    // when the import file does not have an address id, we need to generate one;
    // uuid_v4() is evaluated per row, so each new address gets its own id
    ->withEntry('id', when(ref('id')->isNull(), uuid_v4(), ref('id')))
    ->write(
        to_dbal_table_insert(
            $connection,
            'user_addresses',
            [
                'conflict_columns' => ['id']
            ]
        )
    )
    ->run(analyze: true);


echo 'Total rows: ' . $report->statistics()->totalRows() . PHP_EOL;
echo 'Execution time: ' . $report->statistics()->executionTime->highResolutionTime->toString() . PHP_EOL;

As you can see, processing large amounts of data is not an easy task, but with the right tools, it can be done without reinventing the wheel.

In the next article, we are going to take a look at how to export data from a database to a file.

If you would like to run code that was covered in this article, please use this fake data generator.
