flow php

UNIFIED DATA PROCESSING FRAMEWORK

composer require flow-php/etl ^0.10.0

Changelog

elephant
extract

Extracts

Read from various data sources.

arrow
transform

Transforms

Shape and optimize for your needs.

arrow
load

Loads

Store and secure in one of many available data sinks.

DSL Functions


DSL stands for Domain Specific Language. In the case of Flow, the DSL is a set of simple functions that can be used to transform data. Since Flow is fully object-oriented, most of those functions initialize a new instance of a class under the hood. Please look at the examples below to get a better understanding of how to use the DSL functions.

SCHEMA


array_schema(string $name, bool $empty, bool $nullable, ?Metadata $metadata) : Definition
/**
 * @param class-string<\UnitEnum> $type
 */
enum_schema(string $name, string $type, bool $nullable, ?Metadata $metadata) : Definition

TYPE


/**
 * @param array<StructureElement> $elements
 */
structure_type(array $elements, bool $nullable) : StructureType
/**
 * @param array<StructureElement> $elements
 */
struct_type(array $elements, bool $nullable) : StructureType
/**
 * @param class-string<\UnitEnum> $class
 */
type_enum(string $class, bool $nullable) : EnumType
type_map(ScalarType $key_type, Type $value_type, bool $nullable) : MapType
/**
 * @param class-string $class
 */
type_object(string $class, bool $nullable) : ObjectType
/**
 * @param array<StructureElement> $elements
 */
type_structure(array $elements, bool $nullable) : StructureType

ENTRY


/**
 * @param array<mixed> $data
 */
array_entry(string $array, ?array $data) : ArrayEntry
/**
 * @throws InvalidArgumentException
 */
json_object_entry(string $name, array|string|null $data) : JsonEntry

DATA_FRAME


/**
 * @param array<array<mixed>>|array<mixed|string> $data
 * @param array<Partition>|Partitions $partitions
 */
array_to_row(array $data, EntryFactory $entryFactory, Partitions|array $partitions, ?Schema $schema) : Row
/**
 * @param array<array<mixed>>|array<mixed|string> $data
 * @param array<Partition>|Partitions $partitions
 */
array_to_rows(array $data, EntryFactory $entryFactory, Partitions|array $partitions, ?Schema $schema) : Rows
/**
 * An alias for `ref`.
 */
col(string $entry) : EntryReference
/**
 * Alias for data_frame() : Flow.
 */
df(Config|ConfigBuilder|null $config) : Flow
filesystem_cache(Path|string|null $cache_dir, Filesystem $filesystem, Serializer $serializer) : FilesystemCache
/**
 * @param array<string|Type> $types
 * @param mixed $value
 */
is_type(array $types, ?mixed $value) : bool
join_on(Comparison|array $comparisons, string $join_prefix) : Expression

EXTRACTOR


/**
 * @param int<1, max> $chunk_size
 */
chunks_from(Extractor $extractor, int $chunk_size) : ChunkExtractor
/**
 * @param iterable $array
 * @param null|Schema $schema - @deprecated use withSchema() method instead
 */
from_array(iterable $array, ?Schema $schema) : ArrayExtractor
/**
 * @param string $id - cache id from which data will be extracted
 * @param null|Extractor $fallback_extractor - extractor that will be used when cache is empty - @deprecated use withFallbackExtractor() method instead
 * @param bool $clear - clear cache after extraction - @deprecated use withClearOnFinish() method instead
 */
from_cache(string $id, ?Extractor $fallback_extractor, bool $clear) : CacheExtractor
from_sequence_date_period(string $entry_name, DateTimeInterface $start, DateInterval $interval, DateTimeInterface $end, int $options) : SequenceExtractor
from_sequence_number(string $entry_name, string|int|float $start, string|int|float $end, int|float $step) : SequenceExtractor

AGGREGATING_FUNCTION


SCALAR_FUNCTION


/**
 * Expands each value into an entry; if there is more than one value, multiple rows will be created.
 * Array keys are ignored, only values are used to create new rows.
 *
 * Before:
 *   +--+-------------------+
 *   |id|              array|
 *   +--+-------------------+
 *   | 1|{"a":1,"b":2,"c":3}|
 *   +--+-------------------+
 *
 * After:
 *   +--+--------+
 *   |id|expanded|
 *   +--+--------+
 *   | 1|       1|
 *   | 1|       2|
 *   | 1|       3|
 *   +--+--------+
 */
array_expand(ScalarFunction $function, ArrayExpand $expand) : ArrayExpand
array_key_rename(ScalarFunction $ref, ScalarFunction|string $path, ScalarFunction|string $newName) : ArrayKeyRename
array_reverse(ScalarFunction|array $function, ScalarFunction|bool $preserveKeys) : ArrayReverse
array_sort(ScalarFunction $function, ScalarFunction|Sort|null $sort_function, ScalarFunction|int|null $flags, ScalarFunction|bool $recursive) : ArraySort
/**
 * Unpacks each element of an array into a new entry, using the array key as the entry name.
 *
 * Before:
 * +--+-------------------+
 * |id|              array|
 * +--+-------------------+
 * | 1|{"a":1,"b":2,"c":3}|
 * | 2|{"d":4,"e":5,"f":6}|
 * +--+-------------------+
 *
 * After:
 * +--+-----+-----+-----+-----+-----+
 * |id|arr.b|arr.c|arr.d|arr.e|arr.f|
 * +--+-----+-----+-----+-----+-----+
 * | 1|    2|    3|     |     |     |
 * | 2|     |     |    4|    5|    6|
 * +--+-----+-----+-----+-----+-----+
 */
array_unpack(ScalarFunction|array $array, ScalarFunction|array $skip_keys, ScalarFunction|string|null $entry_prefix) : ArrayUnpack
between(?mixed $value, ?mixed $lower_bound, ?mixed $upper_bound, ScalarFunction|Boundary $boundary) : Between
/**
 * @param array<mixed> $params
 */
call_method(object $object, ScalarFunction|string $method, array $params) : CallMethod
cast(?mixed $value, ScalarFunction|Type|string $type) : Cast
combine(ScalarFunction|array $keys, ScalarFunction|array $values) : Combine
/**
 * An alias for `ref`.
 */
entry(string $entry) : EntryReference
number_format(ScalarFunction|int|float $value, ScalarFunction|int $decimals, ScalarFunction|string $decimal_separator, ScalarFunction|string $thousands_separator) : NumberFormat
regex(ScalarFunction|string $pattern, ScalarFunction|string $subject, ScalarFunction|int $flags, ScalarFunction|int $offset) : Regex
regex_all(ScalarFunction|string $pattern, ScalarFunction|string $subject, ScalarFunction|int $flags, ScalarFunction|int $offset) : RegexAll
regex_match(ScalarFunction|string $pattern, ScalarFunction|string $subject, ScalarFunction|int $flags, ScalarFunction|int $offset) : RegexMatch
regex_match_all(ScalarFunction|string $pattern, ScalarFunction|string $subject, ScalarFunction|int $flags, ScalarFunction|int $offset) : RegexMatchAll
regex_replace(ScalarFunction|string $pattern, ScalarFunction|string $replacement, ScalarFunction|string $subject, ScalarFunction|int|null $limit) : RegexReplace
round(ScalarFunction|int|float $value, ScalarFunction|int $precision, ScalarFunction|int $mode) : Round
sanitize(ScalarFunction|string $value, ScalarFunction|string $placeholder, ScalarFunction|int|null $skipCharacters) : Sanitize
split(ScalarFunction|string $value, ScalarFunction|string $separator, ScalarFunction|int $limit) : Split
sprintf(ScalarFunction|string $format, ScalarFunction|string|int|float|null $args) : Sprintf
to_date(?mixed $ref, ScalarFunction|string $format, ScalarFunction|DateTimeZone $timeZone) : ToDate
to_date_time(?mixed $ref, ScalarFunction|string $format, ScalarFunction|DateTimeZone $timeZone) : ToDateTime
to_timezone(ScalarFunction|DateTimeInterface $value, ScalarFunction|DateTimeZone|string $timeZone) : ToTimeZone
when(?mixed $condition, ?mixed $then, ?mixed $else) : When

WINDOW_FUNCTION


COMPARISON


equal(Reference|string $left, Reference|string $right) : Equal

LOADER


/**
 * Convert rows to an array and store them in the passed array variable.
 *
 * @param-out array<array<mixed>> $array
 */
to_array(array $array) : ArrayLoader
to_output(int|bool $truncate, Output $output, Formatter $formatter, SchemaFormatter $schemaFormatter) : StreamLoader
to_stderr(int|bool $truncate, Output $output, Formatter $formatter, SchemaFormatter $schemaFormatter) : StreamLoader
to_stdout(int|bool $truncate, Output $output, Formatter $formatter, SchemaFormatter $schemaFormatter) : StreamLoader
to_stream(string $uri, int|bool $truncate, Output $output, string $mode, Formatter $formatter, SchemaFormatter $schemaFormatter) : StreamLoader

Contributors

Join us on GitHub external resource
scroll back to top