Content Streamers#

The information provided on this page explains Content Streamers and their usage within the Squirro platform.

This information is intended for advanced users.

Note

It is not necessary to read this page to ingest data into Squirro.

Overview#

In Squirro, a content streamer is a component that provides the necessary functionality for managing the inputstream.

For example, it provides the ability to:

  • Enqueue a new batch (data file) to the inputstream.

  • Dequeue a batch from the inputstream.

  • Move a processed batch to the processed sub-directory.

  • Move a failed batch to the failed sub-directory.

  • Postpone the processing of a batch.

The inputstream is the directory (by default, located at /var/lib/squirro/inputstream) that stores the data loaded by a Data Loader plugin and is meant to be transformed and indexed into Squirro by a defined data processing pipeline.

Types of Content Streamers#

There are two types of content streamers available:

  • The FileSystemStreamer, which is the default content streamer.

  • The QueueFileSystemStreamer, which builds on top of the FileSystemStreamer streamer by adding the notion of a distinct queue for each data source. This streamer relies on the Redis server of the Squirro installation to do its work.

FileSystemStreamer#

The FileSystemStreamer works by manipulating the inputstream directory.

When a service wants to enqueue a new batch of data (for example, the provider service), it uses the enqueue method of the FileSystemStreamer.

This method checks the current date and hour and creates a new data dir of the form %Y-%m-%d-%H (e.g., 2023-05-21-18, which corresponds to May 05 2023 at 18:00) if it doesn’t exist already.

In this data dir, it adds the batch of data into a new file (called data file or simply batch) with a filename that adheres to the following form:

data_project_<project_id>_source_<source_id>_batch_<batch_id>[_priority_<priority_level>].json

Where:

  • project_id is the ID of the project to which the data belongs.

  • source_id is the ID of the data source from which the data was loaded.

  • batch_id is simply a unique ID to distinguish this batch from other batches of the same data source.

  • priority_level is the priority level of the data source that this batch of data came from. It is only present if the data source has a priority level different from the default one (which is normal).

When a service wants to dequeue a batch from the inputstream (for example, the ingester service), it uses the dequeue method of the FileSystemStreamer.

Example Usage#

To explain how this method works, consider a simple example.

Assume that the state of the inputstream directory is as follows:

.
├── 2023-05-21-13
│   ├── data_project_tTSjA3BVRlCFBbCIydh27A_source_Fc723hf2S4Waeo9SVClSDw_batch_YP5Ncdu9QxK8BEgfoZ8msQ.json
│   └── data_project_tTSjA3BVRlCFBbCIydh27A_source_IrbxJh20TJKxoFkyci4N9w_batch_Pa6H0a1uR4SHqvroTgN0pg.json
└── 2023-05-21-14
    └── data_project_tTSjA3BVRlCFBbCIydh27A_source_40G0-m0WSnuv4pcm_FfAUA_batch_7T9Nqb-dQcC6QCAfOhWb7g.json

In the above tree output you can see 3 batches in 2 data dirs (2023-05-21-13, 2023-05-21-14), from 3 data sources (Fc723hf2S4Waeo9SVClSDw, IrbxJh20TJKxoFkyci4N9w, 40G0-m0WSnuv4pcm_FfAUA), with the same priority level (the default one, which is normal; for this reason is missing from the filename), from the same project (tTSjA3BVRlCFBbCIydh27A).

Also, assume a single processor is used for processing the normal priority batches.

Reference: For information on configuring the processors of the ingester service, see Pipeline Prioritization.

Therefore, the 3 batches will be processed by the same processor.

Now, when the processor uses the dequeue method of the FileSystemStreamer, the following steps take place:

  1. The streamer scans the inputstream directory and chooses the oldest data dir first. In this case, the 2023-05-21-13 data dir.

  2. Then, the streamer scans the selected data dir for available batches and sorts them by modification date. The oldest batch of the selected data dir is picked up first.

  3. To signal that this batch is now owned by this processor, it renames the data file by appending the ID of the processor in the end of the filename (_processor_<processor_id>). This is the locking mechanism employed by this type of streamer.

  4. If the locking is successful, it extracts some metadata from the batch in order to find the appropriate pipeline workflow for processing the data in this batch.

  5. If the data in the batch are processed successfully through the pipeline steps of the selected workflow, the streamer removes the batch from the inputstream directory.

QueueFileSystemStreamer#

Warning

The QueueFileSystemStreamer is in beta and is not recommended for production use. For production instances, continue using the FileSystemStreamer. For development instances, you are welcome to use it and report any issues you encounter.

The QueueFileSystemStreamer extends the FileSystemStreamer by adding a layer in which the available batches of every data source are stored in a dedicated queue in Redis.

Enqueue Method#

When the enqueue method of the QueueFileSystemStreamer is called, the logic of the FileSystemStreamer.enqueue method is invoked first to create the batch in the filesystem (i.e., in the inputstream directory). Then, having the absolute path to the enqueued batch, it does the following:

  1. Adds the queue of the data source that the batch belongs to in an index of queues Redis key (a set) called qfss:queues_index:<priority>. This serves as a registry of all the available queues for a given priority level.

  2. Pushes the absolute path of the batch in the queue of the data source that the batch belongs to. The name of the queue is of the form qfss:queue:<priority>_<project_id>_<source_id>.

Dequeue Method#

When the dequeue method of the QueueFileSystemStreamer is called, the following steps occur:

  1. The streamer fetches the current index of queues and checks decides from which queue to dequeue from. The decision is based on a simple round-robin algorithm which tries to process a batch from a different source and project every time in order to give a fair chance to all the data sources to be processed.

  2. Based on the selected queue, the streamer pops one item from it, which is the absolute path of the batch to be processed. The locking mechanism is essentially handled here by Redis, no other streamer is able to pop the same batch from the queue anymore.

  3. Finally, having the absolute path to the batch, the streamer invokes part of the FileSystemStreamer.dequeue logic to actually dequeue the data from the filesystem and give it to the processor to continue with the processing.

Configuration#

The stream_type configuration option in the section [content] of the /etc/squirro/common.ini file, controls which type of content streamer is used by all the services of a Squirro instance.

Note: The default value is filesystem, which corresponds to the FileSystemStreamer.

In order to use the QueueFileSystemStreamer, you need to change the value of this option to filesystem-queue.

If the [content] section is not present in the /etc/squirro/common.ini file, then you are free to add it and define your own value for the stream_type option.

Example Usage#

The following example shows that the QueueFileSystemStreamer is used:

[content]
stream_type = filesystem-queue