Parse, filter, enrich, and format log lines as a lazy Generator pipeline: feed it a multi-gigabyte log file and it holds only a handful of lines in memory at once. Because each stage is a plain callable, rearranging the stages is just reordering the pipeline.
Process a server log file: parse each line into a structured record, drop the noise (health-check and static-asset requests), enrich each record with GeoIP data, and output CSV. The file is too big for file_get_contents(); you want a single pass with stable memory.
```php
$lines = function (string $path): \Generator {
    $fh = fopen($path, 'r');
    try {
        while (($line = fgets($fh)) !== false) {
            yield trim($line);
        }
    } finally {
        fclose($fh); // runs even if the consumer abandons the generator early
    }
};
```

This yields one line at a time. Nothing is materialised.
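A quick smoke test of the reader, using a throwaway temp file (the file contents here are made up purely for illustration; the reader is repeated so the snippet runs on its own):

```php
// Reader from above, repeated so this snippet is standalone.
$lines = function (string $path): \Generator {
    $fh = fopen($path, 'r');
    try {
        while (($line = fgets($fh)) !== false) {
            yield trim($line);
        }
    } finally {
        fclose($fh);
    }
};

$path = tempnam(sys_get_temp_dir(), 'log');
file_put_contents($path, "alpha\nbeta\ngamma\n");

foreach ($lines($path) as $line) {
    echo $line, "\n"; // alpha, beta, gamma — one line in memory at a time
}

unlink($path);
```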
Every Arrays function in this library accepts an iterable and — for the lazy variants — returns a Generator. Composing them produces one big lazy pipeline.
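Under the hood, the lazy variants behave roughly like this hand-rolled sketch (illustrative only, not the library's actual implementation): each stage is a callable that wraps the incoming iterable in a new generator, so composing stages threads one item through all of them before the next is read.

```php
// Illustrative only: a lazy map and filter in the same curried shape
// the library exposes. Each returns a callable taking an iterable.
function lazyMap(callable $fn): callable {
    return function (iterable $src) use ($fn): \Generator {
        foreach ($src as $item) {
            yield $fn($item); // one item at a time; nothing is buffered
        }
    };
}

function lazyFilter(callable $keep): callable {
    return function (iterable $src) use ($keep): \Generator {
        foreach ($src as $item) {
            if ($keep($item)) {
                yield $item;
            }
        }
    };
}

$double = lazyMap(fn(int $n) => $n * 2);
$evens  = lazyFilter(fn(int $n) => $n % 4 === 0);

echo implode(',', iterator_to_array($evens($double([1, 2, 3, 4])), false)), "\n"; // 4,8
```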
```php
use PinkCrab\FunctionConstructors\GeneralFunctions as F;
use PinkCrab\FunctionConstructors\Arrays as A;
use PinkCrab\FunctionConstructors\Comparisons as C;

$parse = fn(string $line) => json_decode($line, true) ?? null;

$isNoise = fn(array $r) => in_array($r['path'] ?? '', ['/health', '/metrics'], true)
    || str_starts_with($r['path'] ?? '', '/assets/');

$enrich = fn(array $r) => $r + ['country' => GeoIp::lookup($r['ip'])];

$pipeline = F\compose(
    A\map($parse),              // string → array|null
    A\filter(C\not('is_null')), // drop unparseable lines
    A\filter(C\not($isNoise)),  // drop health-checks / assets
    A\map($enrich)
);

foreach ($pipeline($lines('/var/log/access.log')) as $record) {
    echo CsvRow::from($record) . "\n";
}
```

Peak memory is roughly one line's worth of data, regardless of file size.
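To see that memory behaviour for yourself, plain generators are enough (a standalone sketch, no library code; the fake "log lines" are just padding):

```php
// Stream 200k fake log lines without ever holding more than one.
$stream = function (int $n): \Generator {
    for ($i = 0; $i < $n; $i++) {
        yield str_repeat('x', 100) . $i; // stand-in for a log line
    }
};

$count = 0;
foreach ($stream(200_000) as $line) {
    $count++; // each line is discarded before the next is produced
}

echo $count, " lines consumed, peak memory: ", memory_get_peak_usage(), " bytes\n";
// By contrast, iterator_to_array($stream(200_000)) would hold
// all 200k strings at once and push the peak far higher.
```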
Only need the first 100 of today's events? Assuming the log is appended chronologically, bound the window on both sides: a filter skips earlier days' records as they stream past, and takeWhile stops at the boundary.

```php
$startOfToday = strtotime('today');
$endOfToday   = strtotime('tomorrow');

$firstPageOfToday = F\compose(
    $pipeline,
    A\filter(fn($r) => strtotime($r['time']) >= $startOfToday), // skip earlier days
    A\takeWhile(fn($r) => strtotime($r['time']) < $endOfToday), // stop at tomorrow
    A\take(100) // only the first 100 of today
);

foreach ($firstPageOfToday($lines('/var/log/access.log')) as $r) {
    // ...
}
```

takeWhile stops pulling from the source as soon as it hits tomorrow's first line, and take(100) caps the output. Neither forces a full file scan.
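You can watch that short-circuit happen with a plain-PHP sketch (the same idea as take, not the library's implementation) driving an infinite source that records how far it was read:

```php
$pulled = 0;
$source = function () use (&$pulled): \Generator {
    for ($i = 1; ; $i++) { // infinite: a full scan would never finish
        $pulled = $i;
        yield $i;
    }
};

// Same idea as A\take: stop advancing the source once $n items are out.
$take = function (int $n, iterable $src): \Generator {
    if ($n <= 0) {
        return;
    }
    foreach ($src as $item) {
        yield $item;
        if (--$n <= 0) {
            return; // returning here means the source is never pulled again
        }
    }
};

foreach ($take(3, $source()) as $v) {
    // consume three values
}
echo $pulled, "\n"; // 3: the infinite source was advanced exactly three times
```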
Drop a sideEffect in to count records, send metrics, or sample logs; the rest of the pipeline is unchanged. Wrap it in A\map so it fires once per record rather than once for the whole generator:

```php
$meter = F\sideEffect(fn($r) => Metrics::increment('log.processed'));

$observed = F\compose($pipeline, A\map($meter));
```

Because each step is a first-class callable, you can reorder or swap parts freely:
```php
// Cheaper: filter BEFORE enriching, so we never do a GeoIP lookup for noise.
$efficient = F\compose(
    A\map($parse),
    A\filter(fn($r) => $r !== null),
    A\filter(fn($r) => ! $isNoise($r)),
    A\map($enrich) // only runs on what made it through
);

// Or: enrich before filtering, if some filters need the enriched data.
$alternate = F\compose(
    A\map($parse),
    A\filter(C\not('is_null')),
    A\map($enrich),
    A\filter(C\not(F\propertyEquals('country', 'RU')))
);
```

Same steps, different orders, different tradeoffs: no code duplication, no rewrite.