Flow PHP

DataFrame

Final: Yes

Methods

__construct()  : mixed
aggregate()  : self
autoCast()  : self
batchSize()  : self
Merge/Split Rows yielded by Extractor into batches of given size.
cache()  : self
Start processing rows up to this moment and put each instance of Rows into previously defined cache.
collect()  : self
Before transforming rows, collect them and merge into single Rows instance.
collectRefs()  : self
Collects references to all entries used in this pipeline.
constrain()  : self
count()  : int
crossJoin()  : self
display()  : string
drop()  : self
Drop given entries.
dropDuplicates()  : $this
dropPartitions()  : self
Drop all partitions from Rows, additionally when $dropPartitionColumns is set to true, partition columns are also removed.
duplicateRow()  : self
fetch()  : Rows
Be aware that fetch is not memory safe and will load all rows into memory.
filter()  : self
filterPartitions()  : self
filters()  : self
forEach()  : void
get()  : Generator<string|int, Rows>
Yields each row as an instance of Rows.
getAsArray()  : Generator<string|int, array<string|int, array<string|int, mixed>>>
Yields each row as an array.
getEach()  : Generator<string|int, Row>
Yield each row as an instance of Row.
getEachAsArray()  : Generator<string|int, array<string|int, mixed>>
Yield each row as an array.
groupBy()  : GroupedDataFrame
join()  : self
joinEach()  : self
limit()  : self
load()  : self
map()  : self
match()  : self
mode()  : $this
SaveMode defines how Flow should behave when writing to a file/files that already exists.
onError()  : self
partitionBy()  : self
pivot()  : self
printRows()  : void
printSchema()  : void
rename()  : self
renameAll()  : self
renameAllLowerCase()  : self
renameAllStyle()  : self
renameAllUpperCase()  : self
renameAllUpperCaseFirst()  : self
renameAllUpperCaseWord()  : self
reorderEntries()  : self
rows()  : self
run()  : mixed
saveMode()  : self
Alias for DataFrame::mode.
schema()  : Schema
select()  : self
sortBy()  : self
transform()  : self
Alias for DataFrame::with().
until()  : self
The difference between filter and until is that filter will keep filtering rows until extractors finish yielding rows.
validate()  : self
void()  : self
with()  : self
withEntries()  : self
withEntry()  : self
write()  : self

Methods

autoCast()

public autoCast() : self
Return values
self

batchSize()

Merge/Split Rows yielded by Extractor into batches of given size.

public batchSize(int<1, max> $size) : self

For example, when the Extractor yields one row at a time, this method merges them into batches of the given size before passing them to the next pipeline element. Similarly, when the Extractor yields batches of rows, this method splits them into smaller batches of the given size.

In order to merge all Rows into a single batch, use the DataFrame::collect() method or set size to -1 or 0.

Parameters
$size : int<1, max>
Tags
lazy
Return values
self
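
The merge/split behaviour described above can be sketched in plain PHP. This is only an illustration under stated assumptions, not Flow's implementation: `rebatch()` is a hypothetical helper, rows are modelled as plain values, and only positive sizes are handled.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of Flow: regroups incoming batches of rows
 * into batches of exactly $size rows (the last batch may be smaller).
 *
 * @param iterable<array<int, mixed>> $batches
 *
 * @return Generator<int, array<int, mixed>>
 */
function rebatch(iterable $batches, int $size): Generator
{
    if ($size < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1.');
    }

    $buffer = [];

    foreach ($batches as $batch) {
        foreach ($batch as $row) {
            $buffer[] = $row;

            // Emit a batch as soon as the buffer reaches the requested size.
            if (count($buffer) === $size) {
                yield $buffer;
                $buffer = [];
            }
        }
    }

    // Emit whatever is left once the source is exhausted.
    if ($buffer !== []) {
        yield $buffer;
    }
}

// Merging: a source that yields one row at a time.
$merged = iterator_to_array(rebatch([[1], [2], [3], [4], [5]], 2), false);

// Splitting: a source that yields one large batch.
$split = iterator_to_array(rebatch([[1, 2, 3, 4, 5]], 2), false);
```

Both calls produce the same three batches, `[1, 2]`, `[3, 4]` and `[5]`, regardless of how the source grouped its rows.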

cache()

Start processing rows up to this moment and put each instance of Rows into previously defined cache.

public cache([null|string $id = null ][, int|null $cacheBatchSize = null ]) : self

Cache type can be set through ConfigBuilder. By default everything is cached in system tmp dir.

Important: the cache batch size can significantly improve performance when processing a large number of rows. A larger batch size increases memory consumption but reduces the number of IO operations. When not set, the batch size is taken from the last DataFrame::batchSize() call.

Parameters
$id : null|string = null
$cacheBatchSize : int|null = null
Tags
lazy
throws
InvalidArgumentException
Return values
self

collect()

Before transforming rows, collect them and merge into single Rows instance.

public collect() : self

This might lead to memory issues when processing a large number of rows; use with caution.

Tags
lazy
Return values
self

collectRefs()

Collects references to all entries used in this pipeline.

public collectRefs(References $references) : self

(new Flow())
  ->read(From::chain())
  ->collectRefs($refs = refs())
  ->run();

Parameters
$references : References
Tags
lazy
Return values
self

count()

public count() : int
Tags
trigger

Return total count of rows processed by this pipeline.

Return values
int

crossJoin()

public crossJoin(self $dataFrame[, string $prefix = '' ]) : self
Parameters
$dataFrame : self
$prefix : string = ''
Tags
lazy
Return values
self

display()

public display([int $limit = 20 ][, bool|int $truncate = 20 ][, Formatter $formatter = new AsciiTableFormatter() ]) : string
Parameters
$limit : int = 20

maximum number of rows to display

$truncate : bool|int = 20

when set to false or 0, columns are not truncated; otherwise values are truncated to the given length (20 characters by default)

$formatter : Formatter = new AsciiTableFormatter()
Tags
trigger
throws
InvalidArgumentException
Return values
string

drop()

Drop given entries.

public drop(string|Reference ...$entries) : self
Parameters
$entries : string|Reference
Tags
lazy
Return values
self

dropDuplicates()

public dropDuplicates(Reference|string ...$entries) : $this
Parameters
$entries : Reference|string
Tags
lazy
Return values
$this

dropPartitions()

Drop all partitions from Rows, additionally when $dropPartitionColumns is set to true, partition columns are also removed.

public dropPartitions([bool $dropPartitionColumns = false ]) : self
Parameters
$dropPartitionColumns : bool = false
Tags
lazy
Return values
self

duplicateRow()

public duplicateRow(mixed $condition, WithEntry ...$entries) : self
Parameters
$condition : mixed
$entries : WithEntry
Return values
self

fetch()

Be aware that fetch is not memory safe and will load all rows into memory.

public fetch([int|null $limit = null ]) : Rows

If you want to safely iterate over Rows, use one of the following methods:

  • DataFrame::get() : \Generator
  • DataFrame::getAsArray() : \Generator
  • DataFrame::getEach() : \Generator
  • DataFrame::getEachAsArray() : \Generator

Parameters
$limit : int|null = null
Tags
trigger
throws
InvalidArgumentException
Return values
Rows

forEach()

public forEach([null|callable(Rows $rows): void $callback = null ]) : void
Parameters
$callback : null|callable(Rows $rows): void = null
Tags
trigger

get()

Yields each row as an instance of Rows.

public get() : Generator<string|int, Rows>
Tags
trigger
Return values
Generator<string|int, Rows>

getAsArray()

Yields each row as an array.

public getAsArray() : Generator<string|int, array<string|int, array<string|int, mixed>>>
Tags
trigger
Return values
Generator<string|int, array<string|int, array<string|int, mixed>>>

getEach()

Yield each row as an instance of Row.

public getEach() : Generator<string|int, Row>
Tags
trigger
Return values
Generator<string|int, Row>

getEachAsArray()

Yield each row as an array.

public getEachAsArray() : Generator<string|int, array<string|int, mixed>>
Tags
trigger
Return values
Generator<string|int, array<string|int, mixed>>
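
The return types above suggest two shapes of iteration: get() and getAsArray() yield one batch of rows per step, while getEach() and getEachAsArray() yield one row per step. The plain PHP sketch below only illustrates that difference in shape; `batches()`, `perBatch()` and `perRow()` are hypothetical stand-ins and do not use Flow.

```php
<?php

declare(strict_types=1);

/** Hypothetical stand-in for a pipeline: each yielded value is a batch (a list of rows). */
function batches(): Generator
{
    yield [['id' => 1], ['id' => 2]];
    yield [['id' => 3]];
}

/** Shape comparable to getAsArray(): one array of rows per iteration. */
function perBatch(iterable $batches): Generator
{
    foreach ($batches as $batch) {
        yield $batch;
    }
}

/** Shape comparable to getEachAsArray(): one row per iteration. */
function perRow(iterable $batches): Generator
{
    foreach ($batches as $batch) {
        foreach ($batch as $row) {
            yield $row;
        }
    }
}

$byBatch = iterator_to_array(perBatch(batches()), false);
$byRow = iterator_to_array(perRow(batches()), false);
```

Here `$byBatch` holds two elements (a batch of two rows and a batch of one row), while `$byRow` holds three individual rows.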

join()

public join(self $dataFrame, Expression $on[, string|Join $type = Join::left ]) : self
Parameters
$dataFrame : self
$on : Expression
$type : string|Join = Join::left
Tags
lazy
Return values
self

map()

public map(callable(Row $row): Row $callback) : self
Parameters
$callback : callable(Row $row): Row
Tags
lazy
Return values
self

mode()

SaveMode defines how Flow should behave when writing to a file/files that already exists.

public mode(SaveMode $mode) : $this

For more details please see SaveMode enum.

Parameters
$mode : SaveMode
Tags
lazy
Return values
$this

printRows()

public printRows([int|null $limit = 20 ][, int|bool $truncate = 20 ][, Formatter $formatter = new AsciiTableFormatter() ]) : void
Parameters
$limit : int|null = 20
$truncate : int|bool = 20
$formatter : Formatter = new AsciiTableFormatter()
Tags
trigger

printSchema()

public printSchema([int|null $limit = 20 ][, SchemaFormatter $formatter = new ASCIISchemaFormatter() ]) : void
Parameters
$limit : int|null = 20
$formatter : SchemaFormatter = new ASCIISchemaFormatter()
Tags
trigger

rename()

public rename(string $from, string $to) : self
Parameters
$from : string
$to : string
Tags
lazy
Return values
self

renameAll()

public renameAll(string $search, string $replace) : self
Parameters
$search : string
$replace : string
Tags
lazy

Iterate over all entry names and replace given search string with replace string.

Return values
self
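
As a rough illustration of the renaming rule described above, the following plain PHP sketch applies a search/replace to every key of an array-shaped row. It assumes a simple substring replacement via `str_replace()`; `renameAllKeys()` is a hypothetical helper, and Flow's own matching rules may differ.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of Flow: replaces $search with $replace
 * in every entry name (array key) of a single row.
 *
 * @param array<string, mixed> $row
 *
 * @return array<string, mixed>
 */
function renameAllKeys(array $row, string $search, string $replace): array
{
    $renamed = [];

    foreach ($row as $name => $value) {
        // Names that do not contain $search are kept unchanged.
        $renamed[str_replace($search, $replace, (string) $name)] = $value;
    }

    return $renamed;
}

$row = renameAllKeys(['user_id' => 1, 'user_name' => 'Ann'], 'user_', '');
```

After the call, `$row` contains the entries `id` and `name` with their original values.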

renameAllLowerCase()

public renameAllLowerCase() : self
Tags
lazy
Return values
self

renameAllStyle()

public renameAllStyle(StringStyles|string $style) : self
Parameters
$style : StringStyles|string
Tags
lazy

Rename all entries to given style. Please look into \Flow\ETL\Function\StyleConverter\StringStyles class for all available styles.

Return values
self

renameAllUpperCase()

public renameAllUpperCase() : self
Tags
lazy
Return values
self

renameAllUpperCaseFirst()

public renameAllUpperCaseFirst() : self
Tags
lazy
Return values
self

renameAllUpperCaseWord()

public renameAllUpperCaseWord() : self
Tags
lazy
Return values
self

reorderEntries()

public reorderEntries([Comparator $comparator = new TypeComparator() ]) : self
Parameters
$comparator : Comparator = new TypeComparator()
Return values
self

run()

public run([null|callable(Rows $rows, FlowContext $context): void $callback = null ][, Analyze|bool $analyze = false ]) : mixed
Parameters
$callback : null|callable(Rows $rows, FlowContext $context): void = null
$analyze : Analyze|bool = false
  • when set, run() returns a Report
Tags
trigger

When analyzing pipeline execution, we can choose to collect various metrics through the analyze()->with*() methods:

  • column statistics - analyze()->withColumnStatistics()
  • schema - analyze()->withSchema()

saveMode()

Alias for DataFrame::mode.

public saveMode(SaveMode $mode) : self
Parameters
$mode : SaveMode
Tags
lazy
Return values
self

select()

public select(string|Reference ...$entries) : self
Parameters
$entries : string|Reference
Tags
lazy

Keep only given entries.

Return values
self

until()

The difference between filter and until is that filter will keep filtering rows until extractors finish yielding rows.

public until(ScalarFunction $function) : self

Until will send a STOP signal to the Extractor when the condition is not met.

Parameters
$function : ScalarFunction
Tags
lazy
Return values
self
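
The difference between filter and until can be sketched with plain PHP generators. This is an illustration only, not Flow's implementation: `source()`, `keepWhere()` and `takeUntilFails()` are hypothetical names, and the sketch assumes the row that fails the condition is not passed forward.

```php
<?php

declare(strict_types=1);

/** Hypothetical source standing in for an extractor; counts how many rows it produced. */
function source(int &$produced): Generator
{
    foreach ([1, 2, 3, 4, 5, 6] as $value) {
        $produced++;

        yield $value;
    }
}

/** filter-like: every row is checked, so the source is read to the end. */
function keepWhere(iterable $rows, callable $condition): Generator
{
    foreach ($rows as $row) {
        if ($condition($row)) {
            yield $row;
        }
    }
}

/** until-like: reading stops as soon as the condition is not met. */
function takeUntilFails(iterable $rows, callable $condition): Generator
{
    foreach ($rows as $row) {
        if (!$condition($row)) {
            return;
        }

        yield $row;
    }
}

$lessThanThree = static fn (int $value): bool => $value < 3;

$filterReads = 0;
$filtered = iterator_to_array(keepWhere(source($filterReads), $lessThanThree), false);

$untilReads = 0;
$stopped = iterator_to_array(takeUntilFails(source($untilReads), $lessThanThree), false);
```

Both variants keep the rows `1` and `2`, but the filter-like version reads all six rows from the source, while the until-like version stops after reading the third.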

validate()

public validate(Schema $schema[, null|SchemaValidator $validator = null ]) : self

Please use DataFrame::match instead

Parameters
$schema : Schema
$validator : null|SchemaValidator = null
  • when null, StrictValidator gets initialized
Tags
lazy
Return values
self

void()

public void() : self
Tags
lazy

This method is mostly useful in development, when you want to pause processing at a certain point without removing code. All operations are processed up to this point; from here on, no rows are passed forward.

Return values
self

write()

public write(Loader $loader) : self
Parameters
$loader : Loader
Tags
lazy

Alias for ETL::load function.

Return values
self

        