Content Streamers#
The information provided on this page explains Content Streamers and their usage within the Squirro platform.
This information is intended for advanced users.
Note
It is not necessary to read this page to ingest data into Squirro.
Overview#
In Squirro, a content streamer is a component that provides the necessary functionality for managing the inputstream.
For example, it provides the ability to:
- Enqueue a new batch (data file) to the inputstream.
- Dequeue a batch from the inputstream.
- Move a processed batch to the processed sub-directory.
- Move a failed batch to the failed sub-directory.
- Postpone the processing of a batch.
The inputstream is the directory (by default located at /var/lib/squirro/inputstream) that stores the data loaded by a Data Loader plugin and is meant to be transformed and indexed into Squirro by a defined data processing pipeline.
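Conceptually, these responsibilities can be pictured as a small interface. The following sketch is purely illustrative: the class and method names are hypothetical and do not reflect the actual Squirro API.

from abc import ABC, abstractmethod

class ContentStreamer(ABC):
    """Hypothetical sketch of a content streamer's responsibilities."""

    @abstractmethod
    def enqueue(self, project_id, source_id, items, priority="normal"):
        """Write a new batch (data file) into the inputstream."""

    @abstractmethod
    def dequeue(self, processor_id):
        """Pick the next available batch and lock it for the given processor."""

    @abstractmethod
    def mark_processed(self, batch_path):
        """Move a successfully processed batch to the processed sub-directory."""

    @abstractmethod
    def mark_failed(self, batch_path):
        """Move a failed batch to the failed sub-directory."""

    @abstractmethod
    def postpone(self, batch_path):
        """Defer the processing of a batch until later."""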
Types of Content Streamers#
There are two types of content streamers available:
- The FileSystemStreamer, which is the default content streamer.
- The QueueFileSystemStreamer, which builds on top of the FileSystemStreamer by adding the notion of a distinct queue for each data source. This streamer relies on the Redis server of the Squirro installation to do its work.
FileSystemStreamer#
The FileSystemStreamer works by manipulating the inputstream directory.
When a service wants to enqueue a new batch of data (for example, the provider service), it uses the enqueue method of the FileSystemStreamer.
This method checks the current date and hour and creates a new data dir of the form %Y-%m-%d-%H (e.g., 2023-05-21-18, which corresponds to May 21, 2023 at 18:00) if it doesn’t exist already.
In this data dir, it adds the batch of data into a new file (called a data file or simply a batch) with a filename that adheres to the following form:
data_project_<project_id>_source_<source_id>_batch_<batch_id>[_priority_<priority_level>].json
Where:
- project_id is the ID of the project to which the data belongs.
- source_id is the ID of the data source from which the data was loaded.
- batch_id is simply a unique ID to distinguish this batch from other batches of the same data source.
- priority_level is the priority level of the data source that this batch of data came from. It is only present if the data source has a priority level different from the default one (which is normal).
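As an illustration, the data dir and batch filename could be assembled roughly as in the sketch below. This is a simplified, hypothetical example (for instance, the batch ID is generated with a UUID here purely for convenience); it mirrors the naming scheme above but is not the actual Squirro implementation.

import uuid
from datetime import datetime
from pathlib import Path

# Default location of the inputstream directory.
INPUTSTREAM = Path("/var/lib/squirro/inputstream")

def build_batch_path(project_id, source_id, priority="normal"):
    # Data dir named after the current date and hour, e.g. 2023-05-21-18.
    data_dir = INPUTSTREAM / datetime.now().strftime("%Y-%m-%d-%H")
    # A unique ID to distinguish this batch from others of the same source
    # (using a UUID is an assumption made for this sketch).
    batch_id = uuid.uuid4().hex
    name = f"data_project_{project_id}_source_{source_id}_batch_{batch_id}"
    if priority != "normal":
        # The priority suffix is only present for non-default priority levels.
        name += f"_priority_{priority}"
    return data_dir / f"{name}.json"

print(build_batch_path("tTSjA3BVRlCFBbCIydh27A", "Fc723hf2S4Waeo9SVClSDw"))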
When a service wants to dequeue a batch from the inputstream (for example, the ingester service), it uses the dequeue method of the FileSystemStreamer.
Example Usage#
To explain how this method works, consider a simple example.
Assume that the state of the inputstream directory is as follows:
.
├── 2023-05-21-13
│ ├── data_project_tTSjA3BVRlCFBbCIydh27A_source_Fc723hf2S4Waeo9SVClSDw_batch_YP5Ncdu9QxK8BEgfoZ8msQ.json
│ └── data_project_tTSjA3BVRlCFBbCIydh27A_source_IrbxJh20TJKxoFkyci4N9w_batch_Pa6H0a1uR4SHqvroTgN0pg.json
└── 2023-05-21-14
└── data_project_tTSjA3BVRlCFBbCIydh27A_source_40G0-m0WSnuv4pcm_FfAUA_batch_7T9Nqb-dQcC6QCAfOhWb7g.json
In the above tree output, you can see 3 batches in 2 data dirs (2023-05-21-13 and 2023-05-21-14), from 3 data sources (Fc723hf2S4Waeo9SVClSDw, IrbxJh20TJKxoFkyci4N9w, and 40G0-m0WSnuv4pcm_FfAUA), all with the same priority level (the default one, normal, which is why it is missing from the filenames), and all from the same project (tTSjA3BVRlCFBbCIydh27A).
Also, assume a single processor is used for processing the normal priority batches.
Reference: For information on configuring the processors of the ingester service, see Pipeline Prioritization.
Therefore, the 3 batches will be processed by the same processor.
Now, when the processor uses the dequeue method of the FileSystemStreamer, the following steps take place (see the sketch after the list):
1. The streamer scans the inputstream directory and chooses the oldest data dir first. In this case, the 2023-05-21-13 data dir.
2. Then, the streamer scans the selected data dir for available batches and sorts them by modification date. The oldest batch of the selected data dir is picked up first.
3. To signal that this batch is now owned by this processor, it renames the data file by appending the ID of the processor to the end of the filename (_processor_<processor_id>). This is the locking mechanism employed by this type of streamer.
4. If the locking is successful, it extracts some metadata from the batch in order to find the appropriate pipeline workflow for processing the data in this batch.
5. If the data in the batch is processed successfully through the pipeline steps of the selected workflow, the streamer removes the batch from the inputstream directory.
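The following sketch illustrates the selection and locking steps in simplified, hypothetical form. It assumes the default inputstream location and that the .json suffix stays at the end of the locked filename; it is not the real FileSystemStreamer code.

import os
from pathlib import Path

INPUTSTREAM = Path("/var/lib/squirro/inputstream")

def dequeue(processor_id):
    # Step 1: pick the oldest data dir first (the names sort chronologically).
    for data_dir in sorted(p for p in INPUTSTREAM.iterdir() if p.is_dir()):
        # Step 2: sort the batches of that data dir by modification date, oldest first.
        batches = sorted(data_dir.glob("data_project_*_batch_*.json"),
                         key=lambda p: p.stat().st_mtime)
        for batch in batches:
            if "_processor_" in batch.name:
                continue  # already locked by another processor
            # Step 3: lock the batch by renaming it with this processor's ID.
            locked = batch.with_name(f"{batch.stem}_processor_{processor_id}{batch.suffix}")
            try:
                os.rename(batch, locked)
            except FileNotFoundError:
                continue  # another processor renamed it first; try the next batch
            # Steps 4 and 5 (workflow lookup, processing, removal) would follow here.
            return locked
    return None  # nothing to dequeue right now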
QueueFileSystemStreamer#
Warning
The QueueFileSystemStreamer is currently in beta. It is supported and recommended for use on single-node deployments. However, it should not be used on multi-node production deployments, where the FileSystemStreamer should continue to be used. Please report any issues or requests related to the QueueFileSystemStreamer.
The QueueFileSystemStreamer (also known as the queue-based filesystem streamer or QFSS) extends the FileSystemStreamer by adding a layer where the available batches of each data source are stored in a dedicated queue in Redis. This setup splits the inputstream into two layers: the message queue layer (Redis) and the pure storage layer (the filesystem directory).
Enqueue Method#
When the enqueue method of the QueueFileSystemStreamer is called, the logic of the FileSystemStreamer.enqueue method is invoked first to create the batch in the filesystem (i.e., in the inputstream directory). Then, having the absolute path to the enqueued batch, it does the following (see the sketch after the list):
1. Adds the queue of the data source that the batch belongs to into an index of queues Redis key (a set) called qfss:queues_index:<priority>. This serves as a registry of all the available queues for a given priority level.
2. Pushes the absolute path of the batch onto the queue of the data source that the batch belongs to. The name of the queue is of the form qfss:queue:<priority>_<project_id>_<source_id>.
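A simplified sketch of this Redis layer, written with the redis-py client, might look as follows. It is a hypothetical illustration of the two steps above, not the actual QueueFileSystemStreamer implementation.

import redis

# The Redis server of the Squirro installation.
r = redis.Redis(decode_responses=True)

def qfss_enqueue(batch_path, project_id, source_id, priority="normal"):
    queue = f"qfss:queue:{priority}_{project_id}_{source_id}"
    # Step 1: register the source's queue in the index of queues for this priority.
    r.sadd(f"qfss:queues_index:{priority}", queue)
    # Step 2: push the absolute path of the batch onto the source's queue.
    r.rpush(queue, str(batch_path))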
Dequeue Method#
When the dequeue method of the QueueFileSystemStreamer is called, the following steps occur (see the sketch after the list):
1. The streamer fetches the current index of queues and decides from which queue to dequeue. The decision is based on a simple round-robin algorithm that tries to process a batch from a different source and project every time, in order to give all data sources a fair chance of being processed.
2. Based on the selected queue, the streamer pops one item from it, which is the absolute path of the batch to be processed. The locking mechanism is essentially handled here by Redis: no other streamer is able to pop the same batch from the queue anymore.
3. Finally, having the absolute path to the batch, the streamer invokes part of the FileSystemStreamer.dequeue logic to actually dequeue the data from the filesystem and hand it to the processor to continue with the processing.
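A matching sketch of the dequeue side is shown below. Again, this is hypothetical: a plain loop over the sorted queue names stands in for the real round-robin selection.

import redis

r = redis.Redis(decode_responses=True)

def qfss_dequeue(priority="normal"):
    # Step 1: fetch the current index of queues for this priority level.
    queues = sorted(r.smembers(f"qfss:queues_index:{priority}"))
    # Step 2: pick a queue (a plain loop stands in for the round-robin selection).
    for queue in queues:
        # Popping an item is effectively the locking mechanism: no other
        # streamer can pop the same batch path from the queue.
        batch_path = r.lpop(queue)
        if batch_path is not None:
            # Step 3: the FileSystemStreamer.dequeue logic would now read the
            # batch file at this path and hand it to the processor.
            return batch_path
    return None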
Configuration#
The stream_type configuration option in the [content] section of the /etc/squirro/common.ini file controls which type of content streamer is used by all the services of a Squirro instance.
Note: The default value is filesystem, which corresponds to the FileSystemStreamer.
In order to use the QueueFileSystemStreamer, you need to change the value of this option to filesystem-queue.
If the [content] section is not present in the /etc/squirro/common.ini file, you are free to add it and define your own value for the stream_type option.
Example Usage#
The following example shows the configuration needed to use the QueueFileSystemStreamer:
[content]
stream_type = filesystem-queue