DSL References
DSL stands for Domain Specific Language. In Flow, the DSL is a set of small functions that wrap object construction so pipelines read top-to-bottom. See the examples for usage in context.
EXTRACTOR
/**
* @deprecated use from_dbal_queries() instead
*
* @param null|ParametersSet $parameters_set - each one parameters array will be evaluated as new query
* @param array<int|string, DbalArrayType|DbalParameterType|DbalType|int|string> $types
*/
dbal_from_queries(Connection $connection, string $query, ?ParametersSet $parameters_set, array $types) : DbalQueryExtractor /**
* @deprecated use from_dbal_query() instead
*
* @param array<string, mixed>|list<mixed> $parameters - @deprecated use DbalQueryExtractor::withParameters() instead
* @param array<int<0, max>|string, DbalArrayType|DbalParameterType|DbalType|string> $types - @deprecated use DbalQueryExtractor::withTypes() instead
*/
dbal_from_query(Connection $connection, string $query, array $parameters, array $types) : DbalQueryExtractor from_dbal_key_set_qb(Connection $connection, QueryBuilder $queryBuilder, KeySet $key_set) : DbalKeySetExtractor /**
* @param Connection $connection
* @param string|Table $table
* @param array<OrderBy>|OrderBy $order_by
* @param int $page_size
* @param null|int $maximum
*
* @throws InvalidArgumentException
*/
from_dbal_limit_offset(Connection $connection, Table|string $table, OrderBy|array $order_by, int $page_size, ?int $maximum) : DbalLimitOffsetExtractor /**
* @param Connection $connection
* @param int $page_size
* @param null|int $maximum - maximum can also be taken from a query builder, $maximum however is used regardless of the query builder if it's set
* @param int $offset - offset can also be taken from a query builder, $offset however is used regardless of the query builder if it's set to non 0 value
*/
from_dbal_limit_offset_qb(Connection $connection, QueryBuilder $queryBuilder, int $page_size, ?int $maximum, int $offset) : DbalLimitOffsetExtractor /**
* @param null|ParametersSet $parameters_set - each one parameters array will be evaluated as new query
* @param array<int|string, DbalArrayType|DbalParameterType|DbalType|int|string> $types
*/
from_dbal_queries(Connection $connection, string $query, ?ParametersSet $parameters_set, array $types) : DbalQueryExtractor /**
* @param array<string, mixed>|list<mixed> $parameters - @deprecated use DbalQueryExtractor::withParameters() instead
* @param array<int<0, max>|string, DbalArrayType|DbalParameterType|DbalType|string> $types - @deprecated use DbalQueryExtractor::withTypes() instead
*/
from_dbal_query(Connection $connection, string $query, array $parameters, array $types) : DbalQueryExtractor HELPER
/**
* @param array<string, mixed>|Connection $connection
* @param string $query
* @param QueryParameter ...$parameters
*/
dbal_dataframe_factory(Connection|array $connection, string $query, QueryParameter $parameters) : DbalDataFrameFactory /**
* @param array<string> $update_columns
*/
mysql_insert_options(?bool $skip_conflicts, ?bool $upsert, array $update_columns) : MySQLInsertOptions pagination_key_asc(string $column, ParameterType|Type|string|int $type) : Key pagination_key_desc(string $column, ParameterType|Type|string|int $type) : Key pagination_key_set(Key $keys) : KeySet /**
* @param array<string> $conflict_columns
* @param array<string> $update_columns
*/
postgresql_insert_options(?bool $skip_conflicts, ?string $constraint, array $conflict_columns, array $update_columns) : PostgreSQLInsertOptions /**
* @param array<string> $primary_key_columns
* @param array<string> $update_columns
*/
postgresql_update_options(array $primary_key_columns, array $update_columns) : PostgreSQLUpdateOptions /**
* @param array<string> $conflict_columns
* @param array<string> $update_columns
*/
sqlite_insert_options(?bool $skip_conflicts, array $conflict_columns, array $update_columns) : SqliteInsertOptions /**
* Converts a Doctrine\DBAL\Schema\Table to a Flow\ETL\Schema.
*
* @param array<class-string<\Flow\Types\Type<mixed>>, class-string<\Doctrine\DBAL\Types\Type>> $types_map
*
* @return Schema
*/
table_schema_to_flow_schema(Table $table, array $types_map) : Schema /**
* Converts a Flow\ETL\Schema to a Doctrine\DBAL\Schema\Table.
*
* @param Schema $schema
* @param array<array-key, mixed> $table_options
* @param array<class-string<\Flow\Types\Type<mixed>>, class-string<\Doctrine\DBAL\Types\Type>> $types_map
*/
to_dbal_schema_table(Schema $schema, string $table_name, array $table_options, array $types_map) : Table LOADER
/**
* Delete rows from database table based on the provided data.
*
* In order to control the size of the single request, use DataFrame::chunkSize() method just before calling DataFrame::load().
*
* @param array<string, mixed>|Connection $connection
*
* @throws InvalidArgumentException
*/
to_dbal_table_delete(Connection|array $connection, string $table) : DbalLoader /**
* Insert new rows into a database table.
* Insert can also be used as an upsert with the help of InsertOptions.
* InsertOptions are platform specific, so please choose the right one for your database.
*
* - MySQLInsertOptions
* - PostgreSQLInsertOptions
* - SqliteInsertOptions
*
* In order to control the size of the single insert, use DataFrame::chunkSize() method just before calling DataFrame::load().
*
* @param array<string, mixed>|Connection $connection
*
* @throws InvalidArgumentException
*/
to_dbal_table_insert(Connection|array $connection, string $table, ?InsertOptions $options) : DbalLoader /**
* Update existing rows in database.
*
* In order to control the size of the single request, use DataFrame::chunkSize() method just before calling DataFrame::load().
*
* @param array<string, mixed>|Connection $connection
*
* @throws InvalidArgumentException
*/
to_dbal_table_update(Connection|array $connection, string $table, ?UpdateOptions $options) : DbalLoader /**
* Execute multiple loaders within a database transaction.
* Each batch of rows will be processed in its own transaction.
* If any loader fails, the entire batch will be rolled back.
*
* @param array<string, mixed>|Connection $connection
* @param Loader ...$loaders - Loaders to execute within the transaction
*
* @throws InvalidArgumentException
*/
to_dbal_transaction(Connection|array $connection, Loader $loaders) : TransactionalDbalLoader