This page explains content streamers and how they are used within the Squirro platform.
It is intended for advanced users.
You do not need to read this page to ingest data into Squirro.
In Squirro, a content streamer is a component that provides the necessary functionality for managing the
For example, it provides the ability to:
Enqueue a new batch (data file) to the
Dequeue a batch from the
Move a processed batch to the
Move a failed batch to the
Postpone the processing of a batch.
inputstream is the directory (by default, located at
/var/lib/squirro/inputstream) that stores the data loaded by a Data Loader plugin and is meant to be transformed and indexed into Squirro by a defined data processing pipeline.
Types of Content Streamers
There are two types of content streamers available:
FileSystemStreamer, which is the default content streamer.
QueueFileSystemStreamer, which builds on top of the
FileSystemStreamer by adding the notion of a distinct queue for each data source. This streamer relies on the Redis server of the Squirro installation to do its work.
FileSystemStreamer works by manipulating the
When a service wants to enqueue a new batch of data (for example, the
provider service), it uses the
enqueue method of the
This method checks the current date and hour and creates a new
data dir (a directory with a name of the form
2023-05-21-18, which corresponds to
May 21 2023 at 18:00) if it doesn’t exist already.
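As a minimal sketch of this naming scheme, assuming the data dir name is simply the date and hour formatted as YYYY-MM-DD-HH, the directory could be derived and created as shown here. The helper names `data_dir_name` and `ensure_data_dir` are hypothetical and not part of Squirro, and whether the real streamer uses local time or UTC is not stated on this page, so the caller supplies the timestamp.

```python
import os
from datetime import datetime


def data_dir_name(now: datetime) -> str:
    """Format a timestamp as a data dir name, e.g. 2023-05-21-18."""
    return now.strftime("%Y-%m-%d-%H")


def ensure_data_dir(inputstream: str, now: datetime) -> str:
    """Create the data dir for the given hour if it doesn't exist already."""
    path = os.path.join(inputstream, data_dir_name(now))
    os.makedirs(path, exist_ok=True)  # no error if the directory is already there
    return path
```

All batches enqueued within the same hour therefore end up in the same data dir.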
In this data dir, it adds the batch of data into a new file (called
data file or simply
batch) with a filename that adheres to the following form:
project_id is the ID of the project to which the data belongs.
source_id is the ID of the data source from which the data was loaded.
batch_id is simply a unique ID to distinguish this batch from other batches of the same data source.
priority_level is the priority level of the data source that this batch of data came from. It is only present if the data source has a priority level different from the default one (which is
When a service wants to dequeue a batch from the
inputstream (for example, the
ingester service), it uses the
dequeue method of the
To explain how this method works, consider a simple example.
Assume that the state of the
inputstream directory is as follows:
.
├── 2023-05-21-13
│   ├── data_project_tTSjA3BVRlCFBbCIydh27A_source_Fc723hf2S4Waeo9SVClSDw_batch_YP5Ncdu9QxK8BEgfoZ8msQ.json
│   └── data_project_tTSjA3BVRlCFBbCIydh27A_source_IrbxJh20TJKxoFkyci4N9w_batch_Pa6H0a1uR4SHqvroTgN0pg.json
└── 2023-05-21-14
    └── data_project_tTSjA3BVRlCFBbCIydh27A_source_40G0-m0WSnuv4pcm_FfAUA_batch_7T9Nqb-dQcC6QCAfOhWb7g.json
In the above
tree output you can see 3 batches in 2 data dirs (2023-05-21-13, 2023-05-21-14), from 3 data sources (Fc723hf2S4Waeo9SVClSDw, IrbxJh20TJKxoFkyci4N9w, 40G0-m0WSnuv4pcm_FfAUA), all with the same priority level (the default one, which is normal and is therefore omitted from the filename) and from the same project (tTSjA3BVRlCFBbCIydh27A).
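To illustrate how these IDs can be read back from a filename, here is a rough sketch. The pattern is inferred only from the example filenames above, so it does not handle the optional priority component, and the function name `parse_batch_filename` is hypothetical rather than a Squirro API. Non-greedy groups are used because an ID may itself contain an underscore.

```python
import re

# Pattern inferred from the example filenames only; the optional priority
# component does not appear in the example and is therefore not handled here.
BATCH_RE = re.compile(
    r"^data_project_(?P<project_id>.+?)"
    r"_source_(?P<source_id>.+?)"
    r"_batch_(?P<batch_id>.+?)\.json$"
)


def parse_batch_filename(filename: str) -> dict:
    """Split a batch filename into its project, source and batch IDs."""
    match = BATCH_RE.match(filename)
    if match is None:
        raise ValueError(f"unrecognized batch filename: {filename}")
    return match.groupdict()


info = parse_batch_filename(
    "data_project_tTSjA3BVRlCFBbCIydh27A"
    "_source_40G0-m0WSnuv4pcm_FfAUA"
    "_batch_7T9Nqb-dQcC6QCAfOhWb7g.json"
)
print(info["source_id"])  # 40G0-m0WSnuv4pcm_FfAUA
```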
Also, assume a single processor is used for processing the
normal priority batches.
Reference: For information on configuring the processors of the
ingester service, see Pipeline Prioritization.
Therefore, the 3 batches will be processed by the same processor.
Now, when the processor uses the
dequeue method of the
FileSystemStreamer, the following steps take place:
The streamer scans the
inputstream directory and chooses the oldest data dir first. In this case, that is the 2023-05-21-13 data dir.
Then, the streamer scans the selected data dir for available batches and sorts them by modification date. The oldest batch of the selected data dir is picked up first.
To signal that this batch is now owned by this processor, it renames the data file by appending the ID of the processor to the end of the filename (_processor_<processor_id>). This is the locking mechanism employed by this type of streamer.
If the locking is successful, it extracts some metadata from the batch in order to find the appropriate pipeline workflow for processing the data in this batch.
If the data in the batch are processed successfully through the pipeline steps of the selected workflow, the streamer removes the batch from the inputstream directory.
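The selection and locking steps above can be sketched roughly as follows. This is an illustration, not the actual Squirro implementation: the function name `claim_oldest_batch` is hypothetical, the suffix is assumed to be appended after the complete filename (including the extension), and falling through to the next data dir when the oldest one has no free batch is also an assumption.

```python
import os
from typing import Optional


def _mtime(path: str) -> float:
    """Modification time, or +inf if the file vanished in the meantime."""
    try:
        return os.path.getmtime(path)
    except FileNotFoundError:
        return float("inf")


def claim_oldest_batch(inputstream: str, processor_id: str) -> Optional[str]:
    """Lock and return the oldest available batch, or None if there is none."""
    suffix = f"_processor_{processor_id}"
    # Data dir names (YYYY-MM-DD-HH) sort chronologically as plain strings.
    for data_dir in sorted(os.listdir(inputstream)):
        dir_path = os.path.join(inputstream, data_dir)
        if not os.path.isdir(dir_path):
            continue
        # Skip batches that a processor has already claimed.
        candidates = [
            os.path.join(dir_path, name)
            for name in os.listdir(dir_path)
            if "_processor_" not in name
        ]
        candidates.sort(key=_mtime)  # oldest modification time first
        for path in candidates:
            locked = path + suffix
            try:
                os.rename(path, locked)  # the rename acts as the lock
            except FileNotFoundError:
                continue  # another processor claimed this batch first
            return locked
    return None
```

Because the rename removes the original filename, a second processor that attempts the same rename fails and moves on to the next candidate.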
QueueFileSystemStreamer is in beta and is not recommended for production use. For production instances, continue using the
FileSystemStreamer. For development instances, you are welcome to use it and report any issues you encounter.
QueueFileSystemStreamer extends the
FileSystemStreamer by adding a layer in which the available batches of every data source are stored in a dedicated queue in Redis.
When the enqueue method of the
QueueFileSystemStreamer is called, the logic of the
FileSystemStreamer.enqueue method is invoked first to create the batch in the filesystem (i.e., in the
inputstream directory). Then, having the absolute path to the enqueued batch, it does the following:
Adds the queue of the batch's data source to an
index of queues Redis key (a set) called
qfss:queues_index:<priority>. This serves as a registry of all the available queues for a given priority level.
Pushes the absolute path of the batch in the queue of the data source that the batch belongs to. The name of the queue is of the form
When the dequeue method of the
QueueFileSystemStreamer is called, the following steps occur:
The streamer fetches the current
index of queues and decides which queue to dequeue from. The decision is based on a simple round-robin algorithm that tries to process a batch from a different source and project each time, so that every data source gets a fair chance to be processed.
Based on the selected queue, the streamer pops one item from it, which is the absolute path of the batch to be processed. The locking mechanism is essentially handled here by Redis: once the item is popped, no other streamer can pop the same batch from the queue.
Finally, having the absolute path to the batch, the streamer invokes part of the
FileSystemStreamer.dequeue logic to actually dequeue the data from the filesystem and give it to the processor to continue with the processing.
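The round-robin idea can be illustrated with a small in-memory stand-in. This sketch deliberately does not talk to Redis: a dictionary of deques plays the role of the per-source queues, and a list plays the role of the index of queues. The class name, queue names and batch paths are all hypothetical, and the real selection logic may differ in its details.

```python
from collections import deque
from typing import Deque, Dict, List, Optional


class InMemoryQueueIndex:
    """Stand-in for the Redis index of queues plus one queue per data source."""

    def __init__(self) -> None:
        self.queues: Dict[str, Deque[str]] = {}  # queue name -> batch paths
        self.order: List[str] = []               # registered queue names
        self.cursor = 0                          # round-robin position

    def enqueue(self, queue_name: str, batch_path: str) -> None:
        if queue_name not in self.queues:
            self.queues[queue_name] = deque()
            self.order.append(queue_name)  # register the queue in the index
        self.queues[queue_name].append(batch_path)

    def dequeue(self) -> Optional[str]:
        # Try each queue at most once, starting after the last one served.
        for _ in range(len(self.order)):
            name = self.order[self.cursor]
            self.cursor = (self.cursor + 1) % len(self.order)
            if self.queues[name]:
                return self.queues[name].popleft()
        return None


index = InMemoryQueueIndex()
index.enqueue("source-a", "/batches/a1.json")
index.enqueue("source-a", "/batches/a2.json")
index.enqueue("source-b", "/batches/b1.json")
print([index.dequeue() for _ in range(4)])
# ['/batches/a1.json', '/batches/b1.json', '/batches/a2.json', None]
```

Although source-a enqueued two batches before source-b enqueued any, source-b is served second, which is the fairness property described above.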
The stream_type configuration option in the section
[content] of the
/etc/squirro/common.ini file controls which type of content streamer is used by all the services of a Squirro instance.
Note: The default value is
filesystem, which corresponds to the
In order to use the
QueueFileSystemStreamer, you need to change the value of this option to
If the [content] section is not present in the
/etc/squirro/common.ini file, then you are free to add it and define your own value for the
The following example shows that the
QueueFileSystemStreamer is used:
[content]
stream_type = filesystem-queue
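As a rough illustration of how such a setting could be read, the sketch below parses INI text with Python's standard configparser and falls back to the documented default when the section or option is absent. The function name `configured_streamer` and the lookup dictionary are hypothetical; this is not how the Squirro services themselves load their configuration.

```python
import configparser

# Mapping taken from this page; the dictionary itself is illustrative.
STREAMERS = {
    "filesystem": "FileSystemStreamer",
    "filesystem-queue": "QueueFileSystemStreamer",
}


def configured_streamer(ini_text: str) -> str:
    """Return the streamer class name selected by the given INI text."""
    parser = configparser.ConfigParser()
    parser.read_string(ini_text)
    # Fall back to the default when the section or the option is missing.
    stream_type = parser.get("content", "stream_type", fallback="filesystem")
    return STREAMERS[stream_type]


print(configured_streamer("[content]\nstream_type = filesystem-queue\n"))
# QueueFileSystemStreamer
print(configured_streamer(""))
# FileSystemStreamer
```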