Skip to content
Search
Examples

Data frame

Description

Delete rows from a database table that match records in your DataFrame. This is useful for synchronizing data or removing outdated entries.

Documentation

Code

<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\Doctrine\{
    sqlite_insert_options,
    to_dbal_table_delete,
    to_dbal_table_insert};
use function Flow\ETL\DSL\{data_frame, from_array, from_csv};
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Schema\{Column, Table, UniqueConstraint};
use Doctrine\DBAL\Types\{Type, Types};

require __DIR__ . '/vendor/autoload.php';

if (!\extension_loaded('pdo_sqlite')) {
    print 'Example skipped. Requires PDO SQLite extension which is not available in this environment.' . PHP_EOL;

    return;
}

$connection = DriverManager::getConnection([
    'path' => __DIR__ . '/output/orders.db',
    'driver' => 'pdo_sqlite',
]);

$schemaManager = $connection->createSchemaManager();

if (!$schemaManager->tablesExist(['orders'])) {
    $schemaManager->createTable(new Table(
        $table = 'orders',
        [
            new Column('order_id', Type::getType(Types::GUID), ['notnull' => true]),
            new Column('created_at', Type::getType(Types::DATETIME_IMMUTABLE), ['notnull' => true]),
            new Column('updated_at', Type::getType(Types::DATETIME_IMMUTABLE), ['notnull' => false]),
            new Column('discount', Type::getType(Types::FLOAT), ['notnull' => false]),
            new Column('email', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
            new Column('customer', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
            new Column('address', Type::getType(Types::JSON), ['notnull' => true]),
            new Column('notes', Type::getType(Types::JSON), ['notnull' => true]),
            new Column('items', Type::getType(Types::JSON), ['notnull' => true]),
        ],
        uniqueConstraints: [
            new UniqueConstraint('orders_order_id', ['order_id']),
        ]
    ));
}

data_frame()
    ->read(from_csv(__DIR__ . '/data/orders.csv'))
    ->select('order_id', 'created_at', 'updated_at', 'discount', 'email', 'customer', 'address', 'notes', 'items')
    ->limit(10)
    ->write(
        to_dbal_table_insert(
            DriverManager::getConnection(['path' => __DIR__ . '/output/orders.db', 'driver' => 'pdo_sqlite']),
            'orders',
            sqlite_insert_options(conflict_columns: ['order_id'])
        )
    )
    ->run();

$orderIds = \array_column(
    $connection->fetchAllAssociative('SELECT order_id FROM orders'),
    'order_id'
);

data_frame()
    ->read(from_array(\array_map(static fn (string $id) => ['order_id' => $id], $orderIds)))
    ->write(
        to_dbal_table_delete(
            DriverManager::getConnection(['path' => __DIR__ . '/output/orders.db', 'driver' => 'pdo_sqlite']),
            'orders',
        )
    )
    ->run();

$orders = $connection->fetchAllAssociative('SELECT * FROM orders ORDER BY order_id');

assert(\count($orders) === 0, 'There should be no orders left in the database after deletion.');
Contributors

Built in the open.

Join us on GitHub
scroll back to top