Streaming log processing

Parse, filter, enrich, and format log lines as a lazy Generator pipeline — you can feed it a multi-gigabyte log file and it only holds a handful of lines in memory at once. Rearranging the stages is just a matter of recomposing the pipeline.

Functions used

F\compose, A\map, A\filter, A\takeWhile, A\take, C\not, F\sideEffect, F\propertyEquals

The scenario

Process a server log file: parse each line into a structured record, drop the noise (health-checks, static-asset requests), enrich with GeoIP, and output a CSV. The file is too big to load with file_get_contents(). You want one pass, memory-stable.

A Generator source

$lines = function (string $path): Generator {
    $fh = fopen($path, 'r');
    if ($fh === false) {
        throw new RuntimeException("Unable to open {$path}");
    }
    try {
        while (($line = fgets($fh)) !== false) {
            yield trim($line);
        }
    } finally {
        fclose($fh); // runs even if the consumer abandons the generator early
    }
};

This yields one line at a time. Nothing is materialised.
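To see the laziness in action, here is the same source run against a small temp file — a self-contained plain-PHP sketch, no library required. Calling the closure builds the Generator but reads nothing; the first read happens only when the consumer asks for it:

```php
<?php
// Self-contained sketch of the lazy source. Nothing is read until
// the consumer pulls a value.
$lines = function (string $path): Generator {
    $fh = fopen($path, 'r');
    try {
        while (($line = fgets($fh)) !== false) {
            yield trim($line);
        }
    } finally {
        fclose($fh); // closes even if the consumer stops early
    }
};

$tmp = tempnam(sys_get_temp_dir(), 'log');
file_put_contents($tmp, "first\nsecond\nthird\n");

$gen   = $lines($tmp);      // no I/O yet — just a Generator object
$first = $gen->current();   // only now is the first line pulled
echo $first, "\n";          // "first"

$gen->next();
echo $gen->current(), "\n"; // "second"
unlink($tmp);
```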

Build the pipeline

Every Arrays function in this library accepts an iterable and — for the lazy variants — returns a Generator. Composing them produces one big lazy pipeline.

use PinkCrab\FunctionConstructors\GeneralFunctions as F;
use PinkCrab\FunctionConstructors\Arrays as A;
use PinkCrab\FunctionConstructors\Comparisons as C;

// json_decode already returns null on failure; also reject JSON scalars (e.g. a bare number).
$parse = fn(string $line): ?array => is_array($r = json_decode($line, true)) ? $r : null;

$isNoise = fn(array $r) => in_array($r['path'] ?? '', ['/health', '/metrics'], true)
                       || str_starts_with($r['path'] ?? '', '/assets/');

$enrich = fn(array $r) => $r + ['country' => GeoIp::lookup($r['ip'])];

$pipeline = F\compose(
    A\map($parse),                 // string → array|null
    A\filter(C\not('is_null')),    // drop unparseable lines
    A\filter(C\not($isNoise)),     // drop health-checks / assets
    A\map($enrich)
);

Consume — one line at a time

foreach ($pipeline($lines('/var/log/access.log')) as $record) {
    echo CsvRow::from($record) . "\n";
}

Peak memory: roughly one line’s worth of data, regardless of file size.

Stop early — take elements while a condition holds

Processing only today's events from a log that is written chronologically and rotated at midnight, so the file starts with today's first entry?

$todaysCutoff = strtotime('tomorrow');

$firstPageOfToday = F\compose(
    $pipeline,
    A\takeWhile(fn($r) => strtotime($r['time']) < $todaysCutoff),
    A\take(100)                // only the first 100 of today
);

foreach ($firstPageOfToday($lines('/var/log/access.log')) as $r) {
    // ...
}

takeWhile stops pulling from the source as soon as it hits tomorrow’s first line. take(100) caps the output. Neither forces a full file scan.
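The early stop is easy to verify with a counter on the source. This is a plain-PHP sketch of what a takeWhile-style combinator does, not the library's code — note the source is only pulled once past the cutoff (to test the failing element), and never again:

```php
<?php
// Hypothetical takeWhile sketch: stop pulling from the source at the
// first element that fails the predicate.
$takeWhile = fn(callable $pred) => function (iterable $it) use ($pred): Generator {
    foreach ($it as $v) {
        if (!$pred($v)) {
            return; // abandon the source here
        }
        yield $v;
    }
};

$pulled = 0;
$source = function () use (&$pulled): Generator {
    foreach ([1, 2, 3, 99, 4, 5] as $n) {
        $pulled++;
        yield $n;
    }
};

$head = iterator_to_array($takeWhile(fn($n) => $n < 10)($source()), false);
print_r($head);     // [1, 2, 3]
echo $pulled, "\n"; // 4 — 99 was pulled to test it; nothing after it was read
```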

Sprinkle observability without breaking the flow

Drop a sideEffect in to count records, send metrics, or sample logs — the pipeline is unchanged:

$meter = F\sideEffect(fn($r) => Metrics::increment('log.processed'));

$observed = F\compose($pipeline, A\map($meter)); // map the tap over each record, not the stream
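The shape of the combinator is simple: run the effect, return the value untouched. A plain-PHP sketch of the pattern (a stand-in, not the library's F\sideEffect itself):

```php
<?php
// Hypothetical sideEffect sketch: run an effect, pass the value through unchanged.
$sideEffect = fn(callable $effect) => function ($value) use ($effect) {
    $effect($value);
    return $value;
};

$count = 0;
$meter = $sideEffect(function ($r) use (&$count) { $count++; });

$records = array_map($meter, [['a' => 1], ['a' => 2]]);
echo $count, "\n"; // 2 — counted without altering the records
```

Because the wrapped function is an identity, it can be dropped anywhere in the pipeline without changing what flows through it.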

Rearranging the flow

Because each step is a first-class callable, you can reorder or swap parts freely:

// Cheaper: filter BEFORE enriching so we don't do a GeoIP lookup for noise.
$efficient = F\compose(
    A\map($parse),
    A\filter(fn($r) => $r !== null),
    A\filter(fn($r) => ! $isNoise($r)),
    A\map($enrich)              // only runs on what made it through
);

// Or: enrich before filtering, if some filters need enriched data.
$alternate = F\compose(
    A\map($parse),
    A\filter(C\not('is_null')),
    A\map($enrich),
    A\filter(C\not(F\propertyEquals('country', 'RU')))
);

Same steps, different orders, different tradeoffs — no code duplication, no rewrite.
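The cost difference is measurable. This sketch counts calls to a stand-in enrich step under both orderings — $map, $filter, $pipe, and the one-lookup-per-record assumption are all hypothetical plain-PHP stand-ins, not the library's implementation:

```php
<?php
// Plain-PHP lazy map/filter stand-ins.
$map = fn(callable $f) => function (iterable $it) use ($f): Generator {
    foreach ($it as $v) yield $f($v);
};
$filter = fn(callable $p) => function (iterable $it) use ($p): Generator {
    foreach ($it as $v) if ($p($v)) yield $v;
};
$pipe = fn(callable ...$fns) => fn($x) => array_reduce($fns, fn($a, $f) => $f($a), $x);

$lookups = 0;
$enrich  = function (array $r) use (&$lookups) { $lookups++; return $r + ['country' => 'GB']; };
$isNoise = fn(array $r) => $r['path'] === '/health';

$logs = [['path' => '/health'], ['path' => '/api'], ['path' => '/health']];

// Filter first: enrich only runs on what survives.
$lookups = 0;
iterator_to_array($pipe($filter(fn($r) => !$isNoise($r)), $map($enrich))($logs), false);
$filterFirst = $lookups;

// Enrich first: every record pays for a lookup.
$lookups = 0;
iterator_to_array($pipe($map($enrich), $filter(fn($r) => !$isNoise($r)))($logs), false);
$enrichFirst = $lookups;

echo $filterFirst, ' vs ', $enrichFirst, "\n"; // 1 vs 3
```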