Stream processing

In some cases, you might need to aggregate streaming data to reduce incoming data traffic or to derive insights at the edge. Use cases for stream processing include:

  • Group all incoming Nginx logs by HTTP code.
  • Snapshot surrounding messages when an error is found.
  • Calculate the average, maximum, and minimum response time from log messages.

Define a streams file

By default, Calyptia Core automatically configures a streams file for every pipeline. You only need to define new stream tasks on top of incoming data. For a full list of directives and capabilities of the stream processor, see the Fluent Bit documentation.

The following streams file counts the records tagged foobar in each five-second tumbling window and emits the result with the tag count:

streams

[STREAM_TASK]
    Name count_5_second
    Exec CREATE STREAM count5 WITH (tag='count') AS SELECT COUNT(*) FROM TAG:'foobar' WINDOW TUMBLING (5 SECOND);
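
The use cases listed earlier follow the same pattern. As a minimal sketch, the following tasks could be added to the same streams file. They assume incoming records tagged nginx that carry a code key (the HTTP status) and a numeric response_time key. The tag, both key names, the task names, and the output tags are assumptions for illustration, so adjust them to match your own parser output.

streams

[STREAM_TASK]
    # Assumes records tagged 'nginx' with a 'code' key holding the HTTP status.
    Name nginx_by_code
    Exec CREATE STREAM nginx_by_code WITH (tag='nginx.by_code') AS SELECT code, COUNT(*) FROM TAG:'nginx' WINDOW TUMBLING (5 SECOND) GROUP BY code;

[STREAM_TASK]
    # Assumes a numeric 'response_time' key on the same records.
    Name nginx_response_time
    Exec CREATE STREAM nginx_response_time WITH (tag='nginx.response_time') AS SELECT AVG(response_time), MAX(response_time), MIN(response_time) FROM TAG:'nginx' WINDOW TUMBLING (5 SECOND);

Each task emits one result set per five-second window under its own tag, so the aggregated records can be matched by an output independently of the raw logs.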

Add this file to the pipeline the same way you added the Lua script:

calyptia create pipeline_file --file streams --pipeline $PIPELINE