Introduction
ETL Adapter: Doctrine
Flow PHP's Adapter Doctrine is an adept library designed to seamlessly integrate Doctrine ORM within your ETL (Extract, Transform, Load) workflows. This adapter is crucial for developers seeking to effortlessly interact with databases using Doctrine ORM, ensuring a streamlined and reliable data transformation process. By harnessing the Adapter Doctrine library, developers can tap into a robust set of features engineered for precise database interaction through Doctrine ORM, simplifying complex data transformations and enhancing data processing efficiency. The Adapter Doctrine library encapsulates a rich set of functionalities, offering a streamlined API for managing database tasks, which is crucial in contemporary data processing and transformation scenarios. This library epitomizes Flow PHP's commitment to delivering versatile and efficient data processing solutions, making it an excellent choice for developers dealing with database operations in large-scale and data-intensive environments. With Flow PHP's Adapter Doctrine, managing database interactions within your ETL workflows becomes a more simplified and efficient endeavor, perfectly aligning with the robust and adaptable nature of the Flow PHP ecosystem.
Installation
composer require flow-php/etl-adapter-doctrine:~0.22.0
Description
Adapter for ETL using bulk operations from Doctrine Dbal Bulk.
Loader - DbalLoader
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
data_frame()
->read(from_())
->write(to_dbal_table_insert(['url' => \getenv('PGSQL_DATABASE_URL')], 'your-table-name'))
->run();
All supported DbalLoader operations via DSL functions:
to_dbal_table_insert(array|Connection $connection, string $table, ?InsertOptions $options = null)
- Insert new rowsto_dbal_table_update(array|Connection $connection, string $table, ?UpdateOptions $options = null)
- Update existing rowsto_dbal_table_delete(array|Connection $connection, string $table)
- Delete rows
You can also configure bulk operations with platform-specific options:
use function Flow\ETL\Adapter\Doctrine\{to_dbal_table_insert, postgresql_insert_options};
data_frame()
->read(from_())
->write(to_dbal_table_insert(
$connection,
'users',
postgresql_insert_options(conflict_columns: ['id'])
))
->run();
Type Detection and Optimization
DbalLoader
now provides advanced type detection capabilities to optimize database operations:
Automatic Type Detection from Flow Schema
By default, DbalLoader
automatically detects column types from the Flow Schema of your data:
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
data_frame()
->read(from_())
->write(to_dbal_table_insert($connection, 'users'))
->run();
// Types are automatically detected from the Flow Schema
Manual Type Override
You can override specific column types for fine-grained control:
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
data_frame()
->read(from_())
->write(to_dbal_table_insert($connection, 'users')
->withColumnTypes([
'id' => Type::getType(Types::INTEGER),
'email' => Type::getType(Types::STRING),
'created_at' => Type::getType(Types::DATETIME_IMMUTABLE),
]))
->run();
Custom Type Detector
For advanced scenarios, you can provide a custom type detector with your own type mapping:
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use Flow\ETL\Adapter\Doctrine\{DbalTypesDetector, TypesMap};
use Flow\Types\Type\Native\StringType;
use Doctrine\DBAL\Types\TextType;
$customTypesMap = new TypesMap([
StringType::class => TextType::class, // Map Flow strings to DBAL text type
]);
data_frame()
->read(from_())
->write(to_dbal_table_insert($connection, 'users')
->withTypesDetector(new DbalTypesDetector($customTypesMap)))
->run();
Data Normalization
DbalLoader
automatically handles data normalization for database compatibility:
- XML Entries:
XMLEntry
andXMLElementEntry
objects are automatically converted to their string representation - Complex Types: Lists, Maps, and Structures are serialized as JSON
- Type Safety: All data is normalized while preserving type information for optimal database performance
use function Flow\ETL\DSL\{data_frame, from_array, xml_entry};
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
data_frame()
->read(from_array([
['id' => 1, 'data' => xml_entry('data', $domDocument)],
['id' => 2, 'data' => xml_entry('data', $domElement)],
]))
->write(to_dbal_table_insert($connection, 'xml_table'))
->run();
// XML entries are automatically converted to strings before database insertion
Extractor - DbalQuery
This simple but powerful extractor let you extract data from a single or multiple parametrized queries.
Single Query
data_frame()
->read(DbalQueryExtractor::singleQuery($connection, "SELECT * FROM {$table} ORDER BY id"))
->write(to_())
->run()
Single Parametrized Query
data_frame()
->read(DbalQueryExtractor::singleQuery($connection, "SELECT * FROM {$table} WHERE id = :id", ['id' => 1]))
->write(to_())
->run()
Multiple Parametrized Query
data_frame()
->read(
new DbalQueryExtractor(
$connection
"SELECT * FROM {$table} ORDER BY id LIMIT :limit OFFSET :offset",
new ParametersSet(
['limit' => 2, 'offset' => 0],
['limit' => 2, 'offset' => 2],
['limit' => 2, 'offset' => 4],
['limit' => 2, 'offset' => 6],
['limit' => 2, 'offset' => 8],
)
)
)
->write(to_())
->run()
In this case, query will be executed exactly five times, taking every time next entry of parameters from ParametersSet.
Schema Converter
With to_dbal_schema_table()
function we can convert any Flow Schema (which represents a dataset)
to Doctrine DBAL Schema Table.
By providing metadata defined in \Flow\ETL\Adapter\Doctrine\DbalMetadata
we can also add additional information to the schema,
like length, primary key, index, precision, etc
use function Flow\ETL\DSL\bool_schema;
use function Flow\ETL\DSL\date_schema;
use function Flow\ETL\DSL\float_schema;
use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\json_schema;
use function Flow\ETL\DSL\list_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\ETL\DSL\schema;
use function Flow\ETL\DSL\str_schema;
use function Flow\ETL\DSL\type_integer;
use function Flow\ETL\DSL\type_list;
use function Flow\ETL\DSL\type_map;
use function Flow\ETL\DSL\type_string;
$flowSchema = schema(
int_schema('int', nullable: false, metadata: DbalMetadata::primaryKey('pk_test')),
str_schema('str', nullable: true, metadata: DbalMetadata::primaryKey('pk_test')),
str_schema('str_with_length', true, DbalMetadata::length(255)),
str_schema('str_unique', true, DbalMetadata::indexUnique('idx_str_unique')),
date_schema('date', nullable: true, metadata: DbalMetadata::index('idx_date')),
float_schema('float', nullable: true, metadata: DbalMetadata::precision(10)->merge(DbalMetadata::scale(2))),
bool_schema('bool', nullable: true, metadata: DbalMetadata::default(true)),
json_schema('json', nullable: true, metadata: DbalMetadata::platformOptions(['jsonb' => true])),
list_schema('list', type_list(type_integer()), metadata: DbalMetadata::columnDefinition('integer[]')),
map_schema('map', type_map(type_integer(), type_string()), metadata: DbalMetadata::comment('test comment!')),
);
Can be converted to Doctrine DBAL Schema Table like this:
use function Flow\ETL\Adapter\Doctrine\to_dbal_schema_table;
to_dbal_schema_table($flowSchema, 'test')
Will generate:
use Doctrine\DBAL\Schema\Column;
use Doctrine\DBAL\Schema\Index;
use Doctrine\DBAL\Schema\Table;
new Table(
'test',
[
new Column('int', Type::getType('integer'), ['notnull' => true]),
new Column('str', Type::getType('string'), ['notnull' => false]),
new Column('str_with_length', Type::getType('string'), ['notnull' => false, 'length' => 255]),
new Column('str_unique', Type::getType('string'), ['notnull' => false]),
new Column('float', Type::getType('float'), ['notnull' => false, 'precision' => 10, 'scale' => 2]),
new Column('bool', Type::getType('boolean'), ['notnull' => false, 'default' => true]),
new Column('json', Type::getType('json'), ['notnull' => false, 'platformOptions' => ['jsonb' => true]]),
new Column('list', Type::getType('json'), ['notnull' => true, 'columnDefinition' => 'integer[]']),
new Column('map', Type::getType('json'), ['notnull' => true, 'comment' => 'test comment!']),
new Column('date', Type::getType('date_immutable'), ['notnull' => false]),
],
[
new Index('pk_test', ['int', 'str'], true, true),
new Index('idx_date', ['date'], false, false),
new Index('idx_str_unique', ['str_unique'], true, false),
]
);
Schema Converter - Types Map
When types map is not provided, the default one will be used:
public const FLOW_TYPES = [
StringType::class => \Doctrine\DBAL\Types\StringType::class,
IntegerType::class => \Doctrine\DBAL\Types\IntegerType::class,
FloatType::class => \Doctrine\DBAL\Types\FloatType::class,
BooleanType::class => \Doctrine\DBAL\Types\BooleanType::class,
DateType::class => \Doctrine\DBAL\Types\DateImmutableType::class,
TimeType::class => \Doctrine\DBAL\Types\TimeImmutableType::class,
DateTimeType::class => \Doctrine\DBAL\Types\DateTimeImmutableType::class,
UuidType::class => \Doctrine\DBAL\Types\GuidType::class,
JsonType::class => \Doctrine\DBAL\Types\JsonType::class,
XMLType::class => \Doctrine\DBAL\Types\StringType::class,
XMLElementType::class => \Doctrine\DBAL\Types\StringType::class,
ListType::class => \Doctrine\DBAL\Types\JsonType::class,
MapType::class => \Doctrine\DBAL\Types\JsonType::class,
StructureType::class => \Doctrine\DBAL\Types\JsonType::class,
];
The TypesMap
class provides bidirectional mapping between Flow types and Doctrine DBAL types, allowing for flexible type conversion in both directions:
use Flow\ETL\Adapter\Doctrine\TypesMap;
use Flow\Types\Type\Native\StringType;
use Doctrine\DBAL\Types\TextType;
// Create custom type mapping
$customMap = new TypesMap([
StringType::class => TextType::class,
]);
// Convert Flow type to DBAL type
$dbalType = $customMap->toDbalType(StringType::class);
// Convert DBAL type to Flow type instance
$flowType = $customMap->toFlowType(TextType::class);