flow php

Example: Database upsert

Topic: Data writing

Description

Upsert (insert or update) data into a database

There are two ways to update data in the database:

  • to_dbal_table_insert(...)
  • to_dbal_table_update(...)

In order to insert or update data in the database you need to use InsertOptions which are platform-specific.

  • MySQLInsertOptions - mysql_insert_options(...)
  • PostgreSQLInsertOptions - postgresql_insert_options(...)
  • SQLiteInsertOptions - sqlite_insert_options(...)
composer.json
{
    "name": "flow-php/examples",
    "description": "Flow PHP - Examples",
    "license": "MIT",
    "type": "library",
    "require": {
        "flow-php/etl": "1.x-dev",
        "flow-php/etl-adapter-doctrine": "1.x-dev",
        "flow-php/doctrine-dbal-bulk": "1.x-dev",
        "fakerphp/faker": "^1.24",
        "symfony/uid": "^7.2"
    }
}
code.php
<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\Doctrine\{from_dbal_query,
    sqlite_insert_options,
    to_dbal_table_insert};
use function Flow\ETL\DSL\{data_frame, from_array, to_stream};
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Schema\{Column, Table, UniqueConstraint};
use Doctrine\DBAL\Types\{Type, Types};

require __DIR__ . '/vendor/autoload.php';

require __DIR__ . '/generate_static_orders.php';

$connection = DriverManager::getConnection([
    'path' => __DIR__ . '/output/orders.db',
    'driver' => 'pdo_sqlite',
]);

$schemaManager = $connection->createSchemaManager();

if (!$schemaManager->tablesExist(['orders'])) {
    $schemaManager->createTable(new Table(
        $table = 'orders',
        [
            new Column('order_id', Type::getType(Types::GUID), ['notnull' => true]),
            new Column('created_at', Type::getType(Types::DATETIME_IMMUTABLE), ['notnull' => true]),
            new Column('updated_at', Type::getType(Types::DATETIME_IMMUTABLE), ['notnull' => false]),
            new Column('discount', Type::getType(Types::FLOAT), ['notnull' => false]),
            new Column('email', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
            new Column('customer', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
            new Column('address', Type::getType(Types::JSON), ['notnull' => true]),
            new Column('notes', Type::getType(Types::JSON), ['notnull' => true]),
            new Column('items', Type::getType(Types::JSON), ['notnull' => true]),
        ],
        uniqueConstraints: [
            new UniqueConstraint('orders_order_id', ['order_id']),
        ]
    ));
}

$orderIds = [
    'c0a43894-0102-4a4e-9fcd-393ef9e4f16a',
    '83fd51a4-9bd1-4b40-8f6e-6a7cc940bb5a',
    '7c65db1a-410f-4e91-8aeb-66fb3f1665f7',
    '5af1d56c-a9f7-411e-8738-865942d6c40f',
    '3a3ae1a9-debd-425a-8f9d-63c3315bc483',
    '27d8ee4d-94cc-47fa-bc14-209a4ab2eb45',
    'cc4fd722-1407-4781-9ad4-fa53966060af',
    '718360e1-c4c9-40f4-84e2-6f7898788883',
    'ea7c731c-ce3b-40bb-bbf8-79f1c717b6ca',
    '17b0d6c5-dd8f-4d5a-ae06-1df15b67c82c',
];

data_frame()
    ->read(from_array(generateStaticOrders($orderIds)))
    ->write(
        to_dbal_table_insert(
            DriverManager::getConnection([
                'path' => __DIR__ . '/output/orders.db',
                'driver' => 'pdo_sqlite',
            ]),
            'orders',
            sqlite_insert_options(conflict_columns: ['order_id'])
        )
    )
    // second insert that normally would fail due to Integrity constraint violation
    ->write(
        to_dbal_table_insert(
            DriverManager::getConnection([
                'path' => __DIR__ . '/output/orders.db',
                'driver' => 'pdo_sqlite',
            ]),
            'orders',
            sqlite_insert_options(conflict_columns: ['order_id'])
        )
    )
    ->run();

data_frame()
    ->read(
        from_dbal_query(
            DriverManager::getConnection([
                'path' => __DIR__ . '/output/orders.db',
                'driver' => 'pdo_sqlite',
            ]),
            'SELECT COUNT(*) as total_rows FROM orders'
        )
    )
    ->write(to_stream(__DIR__ . '/output.txt', truncate: false))
    ->run();

Output

+------------+
| total_rows |
+------------+
|         10 |
+------------+
1 rows

Contributors

Join us on GitHub external resource
scroll back to top
Oops, sorry we have to ask this—but we need your okay to use cookies for page view stats.
It helps us improve this open-source project and keep it awesome!
Care to help out?
Cookies Status: Rejected