ETL Adapter: Doctrine

Flow PHP's Doctrine adapter integrates Doctrine DBAL into your ETL (Extract, Transform, Load) workflows. It provides a loader for bulk inserts, updates, and deletes, an extractor that reads data from one or more parametrized queries, and a converter that turns a Flow Schema into a Doctrine DBAL Schema Table.

Installation

composer require flow-php/etl-adapter-doctrine:~0.22.0

Description

This ETL adapter performs bulk operations using Doctrine DBAL Bulk.

Loader - DbalLoader

use function Flow\ETL\DSL\data_frame;
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;

data_frame()
    ->read(from_())
    ->write(to_dbal_table_insert(['url' => \getenv('PGSQL_DATABASE_URL')], 'your-table-name'))
    ->run();

All supported DbalLoader operations via DSL functions:

  • to_dbal_table_insert(array|Connection $connection, string $table, ?InsertOptions $options = null) - Insert new rows
  • to_dbal_table_update(array|Connection $connection, string $table, ?UpdateOptions $options = null) - Update existing rows
  • to_dbal_table_delete(array|Connection $connection, string $table) - Delete rows

You can also configure bulk operations with platform-specific options:

use function Flow\ETL\Adapter\Doctrine\{to_dbal_table_insert, postgresql_insert_options};

data_frame()
    ->read(from_())
    ->write(to_dbal_table_insert(
        $connection,
        'users',
        postgresql_insert_options(conflict_columns: ['id'])
    ))
    ->run();

Type Detection and Optimization

DbalLoader provides type detection capabilities to optimize database operations:

Automatic Type Detection from Flow Schema

By default, DbalLoader automatically detects column types from the Flow Schema of your data:

use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;

data_frame()
    ->read(from_())
    ->write(to_dbal_table_insert($connection, 'users'))
    ->run();
// Types are automatically detected from the Flow Schema

Manual Type Override

You can override specific column types for fine-grained control:

use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;

data_frame()
    ->read(from_())
    ->write(to_dbal_table_insert($connection, 'users')
        ->withColumnTypes([
            'id' => Type::getType(Types::INTEGER),
            'email' => Type::getType(Types::STRING),
            'created_at' => Type::getType(Types::DATETIME_IMMUTABLE),
        ]))
    ->run();

Custom Type Detector

For advanced scenarios, you can provide a custom type detector with your own type mapping:

use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use Flow\ETL\Adapter\Doctrine\{DbalTypesDetector, TypesMap};
use Flow\Types\Type\Native\StringType;
use Doctrine\DBAL\Types\TextType;

$customTypesMap = new TypesMap([
    StringType::class => TextType::class, // Map Flow strings to DBAL text type
]);

data_frame()
    ->read(from_())
    ->write(to_dbal_table_insert($connection, 'users')
        ->withTypesDetector(new DbalTypesDetector($customTypesMap)))
    ->run();

Data Normalization

DbalLoader automatically handles data normalization for database compatibility:

  • XML Entries: XMLEntry and XMLElementEntry objects are automatically converted to their string representation
  • Complex Types: Lists, Maps, and Structures are serialized as JSON
  • Type Safety: All data is normalized while preserving type information for optimal database performance

use function Flow\ETL\DSL\{data_frame, from_array, xml_entry};
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;

data_frame()
    ->read(from_array([
        ['id' => 1, 'data' => xml_entry('data', $domDocument)],
        ['id' => 2, 'data' => xml_entry('data', $domElement)],
    ]))
    ->write(to_dbal_table_insert($connection, 'xml_table'))
    ->run();
// XML entries are automatically converted to strings before database insertion
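
The normalization rules listed above can be illustrated in plain PHP. This is a minimal sketch, not the adapter's actual code: `normalize_for_database()` is a hypothetical helper, plain PHP arrays stand in for Flow lists, maps, and structures, and the DOM extension is assumed to be installed.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of Flow PHP: converts a PHP value into
 * something a database driver can bind directly.
 */
function normalize_for_database(mixed $value): mixed
{
    // A whole XML document is serialized including its XML declaration.
    if ($value instanceof DOMDocument) {
        return $value->saveXML();
    }

    // A single element is serialized on its own, without the declaration.
    // Assumes the element belongs to a document.
    if ($value instanceof DOMElement) {
        return $value->ownerDocument->saveXML($value);
    }

    // Lists, maps, and structures (plain PHP arrays here) become JSON text.
    if (is_array($value)) {
        return json_encode($value, JSON_THROW_ON_ERROR);
    }

    // Scalars and null pass through unchanged.
    return $value;
}

$document = new DOMDocument();
$document->loadXML('<user><name>Jane</name></user>');

$row = [
    'id' => 1,
    'profile' => $document->documentElement,
    'tags' => ['admin', 'editor'],
];

// array_map() preserves string keys when given a single array.
$normalized = array_map('normalize_for_database', $row);
```

After this runs, `$normalized['profile']` and `$normalized['tags']` are plain strings, while `$normalized['id']` is still an integer. With DbalLoader none of this is needed, since the loader performs the conversion itself.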

Extractor - DbalQuery

This simple but powerful extractor lets you extract data from a single query or from multiple parametrized queries.

Single Query

data_frame()
    ->read(DbalQueryExtractor::singleQuery($connection, "SELECT * FROM {$table} ORDER BY id"))
    ->write(to_())
    ->run();

Single Parametrized Query

data_frame()
    ->read(DbalQueryExtractor::singleQuery($connection, "SELECT * FROM {$table} WHERE id = :id", ['id' => 1]))
    ->write(to_())
    ->run();

Multiple Parametrized Query

data_frame()
    ->read(
        new DbalQueryExtractor(
            $connection,
            "SELECT * FROM {$table} ORDER BY id LIMIT :limit OFFSET :offset",
            new ParametersSet(
                ['limit' => 2, 'offset' => 0],
                ['limit' => 2, 'offset' => 2],
                ['limit' => 2, 'offset' => 4],
                ['limit' => 2, 'offset' => 6],
                ['limit' => 2, 'offset' => 8],
            )
        )
    )
    ->write(to_())
    ->run();

In this case, the query is executed exactly five times, each time using the next set of parameters from the ParametersSet.
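
Writing each limit/offset pair by hand becomes tedious for larger tables. The sketch below generates the pairs instead, assuming the total row count is known up front. `pagination_parameters()` is a hypothetical helper, not part of Flow PHP.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of Flow PHP: builds one limit/offset
 * parameter array per page, enough to cover $totalRows rows.
 *
 * @return list<array{limit: int, offset: int}>
 */
function pagination_parameters(int $totalRows, int $pageSize): array
{
    if ($pageSize < 1) {
        throw new InvalidArgumentException('Page size must be at least 1.');
    }

    $parameters = [];

    // Advance the offset one page at a time until all rows are covered.
    for ($offset = 0; $offset < $totalRows; $offset += $pageSize) {
        $parameters[] = ['limit' => $pageSize, 'offset' => $offset];
    }

    return $parameters;
}

// Ten rows in pages of two yields the same five parameter sets as above.
$pages = pagination_parameters(10, 2);
```

If the ParametersSet constructor is variadic, as the example above suggests, the result could be passed as `new ParametersSet(...$pages)`.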

Schema Converter

With the to_dbal_schema_table() function, we can convert any Flow Schema (which represents a dataset) to a Doctrine DBAL Schema Table.

By providing metadata defined in \Flow\ETL\Adapter\Doctrine\DbalMetadata, we can also add extra information to the schema, such as length, primary key, index, precision, etc.

use Flow\ETL\Adapter\Doctrine\DbalMetadata;
use function Flow\ETL\DSL\bool_schema;
use function Flow\ETL\DSL\date_schema;
use function Flow\ETL\DSL\float_schema;
use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\json_schema;
use function Flow\ETL\DSL\list_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\ETL\DSL\schema;
use function Flow\ETL\DSL\str_schema;
use function Flow\ETL\DSL\type_integer;
use function Flow\ETL\DSL\type_list;
use function Flow\ETL\DSL\type_map;
use function Flow\ETL\DSL\type_string;

$flowSchema = schema(
    int_schema('int', nullable: false, metadata: DbalMetadata::primaryKey('pk_test')),
    str_schema('str', nullable: true, metadata: DbalMetadata::primaryKey('pk_test')),
    str_schema('str_with_length', true, DbalMetadata::length(255)),
    str_schema('str_unique', true, DbalMetadata::indexUnique('idx_str_unique')),
    date_schema('date', nullable: true, metadata: DbalMetadata::index('idx_date')),
    float_schema('float', nullable: true, metadata: DbalMetadata::precision(10)->merge(DbalMetadata::scale(2))),
    bool_schema('bool', nullable: true, metadata: DbalMetadata::default(true)),
    json_schema('json', nullable: true, metadata: DbalMetadata::platformOptions(['jsonb' => true])),
    list_schema('list', type_list(type_integer()), metadata: DbalMetadata::columnDefinition('integer[]')),
    map_schema('map', type_map(type_integer(), type_string()), metadata: DbalMetadata::comment('test comment!')),
);

This schema can be converted to a Doctrine DBAL Schema Table like this:

use function Flow\ETL\Adapter\Doctrine\to_dbal_schema_table;

to_dbal_schema_table($flowSchema, 'test');

This generates:

use Doctrine\DBAL\Schema\Column;
use Doctrine\DBAL\Schema\Index;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Type;

new Table(
    'test',
    [
        new Column('int', Type::getType('integer'), ['notnull' => true]),
        new Column('str', Type::getType('string'), ['notnull' => false]),
        new Column('str_with_length', Type::getType('string'), ['notnull' => false, 'length' => 255]),
        new Column('str_unique', Type::getType('string'), ['notnull' => false]),
        new Column('float', Type::getType('float'), ['notnull' => false, 'precision' => 10, 'scale' => 2]),
        new Column('bool', Type::getType('boolean'), ['notnull' => false, 'default' => true]),
        new Column('json', Type::getType('json'), ['notnull' => false, 'platformOptions' => ['jsonb' => true]]),
        new Column('list', Type::getType('json'), ['notnull' => true, 'columnDefinition' => 'integer[]']),
        new Column('map', Type::getType('json'), ['notnull' => true, 'comment' => 'test comment!']),
        new Column('date', Type::getType('date_immutable'), ['notnull' => false]),
    ],
    [
        new Index('pk_test', ['int', 'str'], true, true),
        new Index('idx_date', ['date'], false, false),
        new Index('idx_str_unique', ['str_unique'], true, false),
    ]
);

Schema Converter - Types Map

When a types map is not provided, the default one is used:

public const FLOW_TYPES = [
    StringType::class => \Doctrine\DBAL\Types\StringType::class,
    IntegerType::class => \Doctrine\DBAL\Types\IntegerType::class,
    FloatType::class => \Doctrine\DBAL\Types\FloatType::class,
    BooleanType::class => \Doctrine\DBAL\Types\BooleanType::class,
    DateType::class => \Doctrine\DBAL\Types\DateImmutableType::class,
    TimeType::class => \Doctrine\DBAL\Types\TimeImmutableType::class,
    DateTimeType::class => \Doctrine\DBAL\Types\DateTimeImmutableType::class,
    UuidType::class => \Doctrine\DBAL\Types\GuidType::class,
    JsonType::class => \Doctrine\DBAL\Types\JsonType::class,
    XMLType::class => \Doctrine\DBAL\Types\StringType::class,
    XMLElementType::class => \Doctrine\DBAL\Types\StringType::class,
    ListType::class => \Doctrine\DBAL\Types\JsonType::class,
    MapType::class => \Doctrine\DBAL\Types\JsonType::class,
    StructureType::class => \Doctrine\DBAL\Types\JsonType::class,
];

The TypesMap class provides bidirectional mapping between Flow types and Doctrine DBAL types, allowing for flexible type conversion in both directions:

use Flow\ETL\Adapter\Doctrine\TypesMap;
use Flow\Types\Type\Native\StringType;
use Doctrine\DBAL\Types\TextType;

// Create custom type mapping
$customMap = new TypesMap([
    StringType::class => TextType::class,
]);

// Convert Flow type to DBAL type
$dbalType = $customMap->toDbalType(StringType::class);

// Convert DBAL type to Flow type instance
$flowType = $customMap->toFlowType(TextType::class);
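
A map like the default one is not one-to-one: several Flow types (for example StringType, XMLType, and XMLElementType) point at the same DBAL type, so the reverse direction is ambiguous. The plain-PHP sketch below is not the TypesMap implementation, and how TypesMap resolves such collisions is not stated here. It uses shortened string names as stand-ins for class names and only shows what a naive reversal with `array_flip()` does.

```php
<?php

declare(strict_types=1);

// Illustrative subset of a Flow => DBAL map. The names are shortened
// stand-ins for class names, not real classes.
$flowToDbal = [
    'StringType' => 'Dbal\StringType',
    'XMLType' => 'Dbal\StringType',
    'ListType' => 'Dbal\JsonType',
];

// Naive reversal: when a value occurs more than once, array_flip()
// keeps the key that appeared last, so 'StringType' is lost here.
$dbalToFlow = array_flip($flowToDbal);

// Forward lookups stay unambiguous.
$forward = $flowToDbal['XMLType'];

// Reverse lookup resolves to the last Flow type that shared the DBAL type.
$reverse = $dbalToFlow['Dbal\StringType'];
```

Here `$reverse` holds `'XMLType'` rather than `'StringType'`, so when building a custom map it is worth checking the DBAL-to-Flow direction for every DBAL type that appears more than once.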
