flow php

UNIFIED DATA PROCESSING FRAMEWORK

composer require flow-php/etl ~0.33.0

ChangelogRelease Cycle

play Try Playground

elephant
extract

Extracts

Read from various data sources.

arrow
transform

Transforms

Shape and optimize for your needs.

arrow
load

Loads

Store and secure in one of many available data sinks.

Examples:

Description

Delete rows from a database table that match records in your DataFrame. This is useful for synchronizing data or removing outdated entries.

Documentation

Code

play
<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\Doctrine\{
    sqlite_insert_options,
    to_dbal_table_delete,
    to_dbal_table_insert};
use function Flow\ETL\DSL\{data_frame, from_array};
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Schema\{Column, Table, UniqueConstraint};
use Doctrine\DBAL\Types\{Type, Types};

require __DIR__ . '/vendor/autoload.php';

require __DIR__ . '/generate_static_orders.php';

$connection = DriverManager::getConnection([
    'path' => __DIR__ . '/output/orders.db',
    'driver' => 'pdo_sqlite',
]);

$schemaManager = $connection->createSchemaManager();

if (!$schemaManager->tablesExist(['orders'])) {
    $schemaManager->createTable(new Table(
        $table = 'orders',
        [
            new Column('order_id', Type::getType(Types::GUID), ['notnull' => true]),
            new Column('created_at', Type::getType(Types::DATETIME_IMMUTABLE), ['notnull' => true]),
            new Column('updated_at', Type::getType(Types::DATETIME_IMMUTABLE), ['notnull' => false]),
            new Column('discount', Type::getType(Types::FLOAT), ['notnull' => false]),
            new Column('email', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
            new Column('customer', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
            new Column('address', Type::getType(Types::JSON), ['notnull' => true]),
            new Column('notes', Type::getType(Types::JSON), ['notnull' => true]),
            new Column('items', Type::getType(Types::JSON), ['notnull' => true]),
        ],
        uniqueConstraints: [
            new UniqueConstraint('orders_order_id', ['order_id']),
        ]
    ));
}

$orderIds = [
    'c0a43894-0102-4a4e-9fcd-393ef9e4f16a',
    '83fd51a4-9bd1-4b40-8f6e-6a7cc940bb5a',
    '7c65db1a-410f-4e91-8aeb-66fb3f1665f7',
    '5af1d56c-a9f7-411e-8738-865942d6c40f',
    '3a3ae1a9-debd-425a-8f9d-63c3315bc483',
    '27d8ee4d-94cc-47fa-bc14-209a4ab2eb45',
    'cc4fd722-1407-4781-9ad4-fa53966060af',
    '718360e1-c4c9-40f4-84e2-6f7898788883',
    'ea7c731c-ce3b-40bb-bbf8-79f1c717b6ca',
    '17b0d6c5-dd8f-4d5a-ae06-1df15b67c82c',
];

data_frame()
    ->read(from_array(generateStaticOrders($orderIds)))
    ->write(
        to_dbal_table_insert(
            DriverManager::getConnection(['path' => __DIR__ . '/output/orders.db', 'driver' => 'pdo_sqlite']),
            'orders',
            sqlite_insert_options(conflict_columns: ['order_id'])
        )
    )
    ->run();

data_frame()
    ->read(from_array(\array_map(static fn (string $id) => ['order_id' => $id], $orderIds)))
    ->write(
        to_dbal_table_delete(
            DriverManager::getConnection(['path' => __DIR__ . '/output/orders.db', 'driver' => 'pdo_sqlite']),
            'orders',
        )
    )
    ->run();

$orders = $connection->fetchAllAssociative($connection->createQueryBuilder()->select('*')->from('orders')->orderBy('order_id')->getSQL());

assert(\count($orders) === 0, 'There should be no orders left in the database after deletion.');

Contributors

Join us on GitHub external resource
scroll back to top