standard types

Functions

make_step

squirro.lib.nlp.steps.make_step(config)

Make a step

Parameters

config (dict) – Dictionary config

Returns

Step object

Return type

Step

Classes

base

Step base class

class squirro.lib.nlp.steps.base.Step(config)

The Step class represents single operations that can be performed on each Document in a Pipeline stream.

In the config, the class of a step is specified by the step parameter and its type by the type parameter. Each step then has additional parameters (some with default values), which are documented in the corresponding section.

It is also possible to create your own custom step by writing a class that inherits from Step and specifying the step as ‘custom’.

NOTE: [Optional] A step can receive a reference to the executing pipeline runner. Through it, each step gets access to shared models kept in memory (managed via the pipeline).

NOTE: For all steps (including custom steps), the input config JSON is validated against what appears in the `Parameters` section of the step class docstring. The specification format is as follows:

Parameters:
    VARIABLE_NAME (SCHEMA, OPTIONAL_DEFAULT): DESCRIPTION

For example, for the base Step class we specify:

Parameters:
    name (str, None): Name of step (defaults to value of `type`)
    path (str): Path to step storage
    step (str): Class of step
    type (str): Type of step
    modes (list, ['train', 'process']): Modes for which step will be run
    handle_skipped (bool, False): Execute actions for skipped items
    cache_document (bool, False): Whether to cache document
    document_cache_expiration (int, 300): Time in seconds after cached document expires
    model_cache_expiration (int, 3600): Time in seconds after cached model expires
    _pipeline (weakref.ReferenceType, None): Reference to the executing pipeline

This is reflected in the documentation below:

Parameters
  • name (str, None) – Name of step (defaults to value of type)

  • path (str) – Path to step storage

  • step (str) – Class of step

  • type (str) – Type of step

  • modes (list, ['train', 'process']) – Modes for which step will be run

  • handle_skipped (bool, False) – Execute actions for skipped items

  • cache_document (bool, False) – Whether to cache document

  • document_cache_expiration (int, 300) – Time in seconds after cached document expires

  • model_cache_expiration (int, 3600) – Time in seconds after cached model expires

  • _pipeline (weakref.ReferenceType, None) – Reference to the executing pipeline
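The `VARIABLE_NAME (SCHEMA, OPTIONAL_DEFAULT): DESCRIPTION` format is regular enough to read with a small parser. The following is only a sketch of how such lines could be parsed and used to fill in defaults; `parse_param_line`, `apply_defaults` and the `REQUIRED` sentinel are hypothetical helpers written for this illustration, not part of squirro.lib.nlp, and the library's actual validation may work differently.

```python
import ast
import re

# One spec line: NAME (SCHEMA[, DEFAULT]): DESCRIPTION
_PARAM_RE = re.compile(
    r"^\s*(?P<name>\w+)\s*\((?P<schema>[^,()]+)"
    r"(?:,\s*(?P<default>.+?))?\)\s*:\s*(?P<desc>.*)$"
)

# Sentinel (assumption of this sketch): the line had no default, so the key is mandatory.
REQUIRED = object()


def parse_param_line(line):
    """Split one spec line into (name, schema, default, description)."""
    match = _PARAM_RE.match(line)
    if match is None:
        raise ValueError(f"not a parameter spec line: {line!r}")
    raw_default = match.group("default")
    if raw_default is None:
        default = REQUIRED
    else:
        try:
            # Defaults such as None, False, 300 or ['train', 'process'] are Python literals.
            default = ast.literal_eval(raw_default)
        except (ValueError, SyntaxError):
            default = raw_default  # keep anything else as the raw string
    return match.group("name"), match.group("schema").strip(), default, match.group("desc")


def apply_defaults(config, spec_lines):
    """Return a copy of config with missing optional parameters filled in."""
    resolved = dict(config)
    for line in spec_lines:
        name, _schema, default, _desc = parse_param_line(line)
        if name in resolved:
            continue
        if default is REQUIRED:
            raise KeyError(f"missing required parameter: {name}")
        resolved[name] = default
    return resolved


SPEC = [
    "path (str): Path to step storage",
    "modes (list, ['train', 'process']): Modes for which step will be run",
    "handle_skipped (bool, False): Execute actions for skipped items",
]
resolved = apply_defaults({"path": "/tmp/step"}, SPEC)
print(resolved)
```

The sketch does not check values against the SCHEMA part; it only shows how the three pieces of each line can be separated.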

Example:

from squirro.lib.nlp.steps import make_step

csv_loader_step = make_step({
  "step": "loader",
  "type": "csv",
  "fields": ["sepal length", "sepal width", "petal length", "class"]
})

property model_cache

Cache client to enable model sharing across whole process.

Return type

ModelCacheClient

Returns

Model cache

property inmem_models

Deprecated property, use model_cache instead. Backwards-compatible property name.

Return type

InMemModel

Returns

Model cache

property in_mem_models

Deprecated property, use model_cache instead.

Return type

InMemModel

Returns

Model cache

property pipeline

Reference to parent pipeline-runner.

Return type

Optional[Pipeline]

save()

Save a step

load()

Load a step

clean()

Clean step

terminate()

Terminate step

process(docs)

Process a set of documents

Parameters

docs (generator(Document)) – Generator of documents

Returns

Generator of processed documents

Return type

generator(Document)

train(docs)

Train a step on a set of documents

Parameters

docs (generator(Document)) – Generator of documents

Returns

Generator of processed documents

Return type

generator(Document)

filter_skipped(docs, skipped=None)

Helper to return all documents that have to be processed.

By default, this filters out all documents that are skipped. But if handle_skipped is set, then all documents are returned, both skipped and not.

Parameters
  • docs (generator(Document)) – Generator of documents

  • skipped (list, None) – An optional list where skipped items are pushed. Used when skipped items still need to be returned separately.

Returns

List of documents that have to be processed

Return type

list(Document)
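The filtering rule can be illustrated with a few lines of Python. This is a minimal sketch, not the library code: `Doc` is a hypothetical stand-in for Document, the `skipped` attribute on it is an assumption, and `handle_skipped` is passed as an argument here although the real step reads it from its config.

```python
from dataclasses import dataclass


@dataclass
class Doc:
    """Hypothetical stand-in for Document; the `skipped` flag is an assumption."""
    id: str
    skipped: bool = False


def filter_skipped(docs, handle_skipped=False, skipped=None):
    """Return the documents to process; optionally collect the skipped ones."""
    to_process = []
    for doc in docs:
        if doc.skipped and not handle_skipped:
            if skipped is not None:
                skipped.append(doc)  # caller wants the skipped items back separately
            continue
        to_process.append(doc)
    return to_process


docs = [Doc("a"), Doc("b", skipped=True), Doc("c")]
held_back = []
kept = filter_skipped(iter(docs), skipped=held_back)
print([d.id for d in kept], [d.id for d in held_back])
```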

batched_step

Batched step base class

class squirro.lib.nlp.steps.batched_step.BatchedStep(config)

Bases: squirro.lib.nlp.steps.base.Step

Process an in-memory batch of documents

This is an extension of the standard Step class, which allows processing batches of Document objects in a Pipeline.

Note - this is a generic base step.

Parameters
  • batch_size (int, 128) – Batch size

  • use_batches (bool, True) – Whether to yield the data in groups of batch_size documents

  • struct_log_enable (bool, False) – Whether to log in structured way

  • struct_log_name (str, "step-debug") – Name of the struct logger

  • struct_log_input_step_fields (list, None) – Which fields to log before step processing

  • struct_log_output_step_fields (list, None) – Which fields to log after step processing

clean()

Clean step

process(docs)

Process a set of documents

Parameters

docs (generator(Document)) – Generator of documents

Returns

Generator of processed documents

Return type

generator(Document)

train(docs)

Train a step on a set of documents

Parameters

docs (generator(Document)) – Generator of documents

Returns

Generator of processed documents

Return type

generator(Document)

process_batch(batch)

Process a batch of documents. If not defined will default to using self.process_doc for each document in the batch.

Parameters

batch (list(Document)) – List of documents

Returns

List of processed documents

Return type

list(Document)

train_batch(batch)

Train on a batch of documents

Parameters

batch (list(Document)) – List of documents

process_doc(doc)

Process a document

Parameters

doc (Document) – Document

Returns

Processed document

Return type

Document
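The relationship between process, process_batch and process_doc can be sketched as follows. This is an illustration under assumptions, not the BatchedStep implementation: `UppercaseStep` and `iter_batches` are hypothetical names, documents are modelled as plain dicts, and the class does not inherit from the real base class so that the example runs on its own.

```python
from itertools import islice


def iter_batches(docs, batch_size=128):
    """Group any iterable of documents into lists of at most batch_size items."""
    iterator = iter(docs)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


class UppercaseStep:
    """Hypothetical stand-in for a BatchedStep subclass (documents are dicts)."""

    def __init__(self, batch_size=128):
        self.batch_size = batch_size

    def process_doc(self, doc):
        doc["body"] = doc["body"].upper()
        return doc

    def process_batch(self, batch):
        # Default behaviour described above: fall back to process_doc per document.
        return [self.process_doc(doc) for doc in batch]

    def process(self, docs):
        # Consume the generator batch by batch and yield single documents again.
        for batch in iter_batches(docs, self.batch_size):
            yield from self.process_batch(batch)


step = UppercaseStep(batch_size=2)
out = list(step.process({"body": text} for text in ["a", "b", "c"]))
print(out)
```

A subclass that can handle a whole batch at once (for example a vectorized model call) would override process_batch instead of process_doc.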

balancer_step

Balancer step class

class squirro.lib.nlp.steps.balancer_step.BalancerStep(config)

Bases: squirro.lib.nlp.steps.base.Step

Creates a data set with an equal number of elements per class.

Note - unless batching is switched off, the balancing is applied only within a batch.

Input - the class field needs to be of type str.

Output - the output field is filled with data of type str.

Parameters
  • type (str) – balancer

  • classes (list) – List of the classes which get used in classifier

  • class_field (str) – Field name in which the classes are

  • output_label_field (str) – Field name in which the balanced labels are stored

  • not_class (bool, True) – Whether a class that includes all negative/opposite elements is needed

  • deviation (int, 0.02) – Maximum deviation of the class sizes (1.0 = 100%, 0.0 = 0%)

  • seed (int, 7400) – Seed for the randomization process

Example

{
    "name": "balancer",
    "step": "balancer",
    "type": "balancer",
    "classes": ["classA","classB"],
    "class_field": "labels",
    "output_label_field": "balanced_labels",
    "not_class": false
}

process(docs)

Process a set of documents

Parameters

docs (generator(Document)) – Generator of documents

Returns

Generator of processed documents

Return type

generator(Document)
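To make the idea of class balancing concrete, here is a minimal sketch that uses seeded random undersampling: every class is cut down to the size of the smallest one. It is not the BalancerStep algorithm. The `balance` function is hypothetical, documents are modelled as dicts, and the `deviation` and `not_class` options are deliberately left out.

```python
import random
from collections import Counter, defaultdict


def balance(docs, classes, class_field, output_label_field, seed=7400):
    """Undersample so that every listed class has the same number of documents."""
    by_class = defaultdict(list)
    for doc in docs:
        label = doc.get(class_field)
        if label in classes:  # documents of other classes are dropped in this sketch
            by_class[label].append(doc)
    if not by_class:
        return []
    target = min(len(group) for group in by_class.values())
    rng = random.Random(seed)  # fixed seed makes the selection repeatable
    balanced = []
    for label in classes:
        group = by_class.get(label, [])
        for doc in rng.sample(group, min(target, len(group))):
            balanced.append({**doc, output_label_field: label})
    rng.shuffle(balanced)
    return balanced


docs = (
    [{"id": i, "labels": "classA"} for i in range(5)]
    + [{"id": i, "labels": "classB"} for i in range(5, 7)]
    + [{"id": 7, "labels": "other"}]
)
balanced = balance(docs, ["classA", "classB"], "labels", "balanced_labels")
counts = Counter(doc["balanced_labels"] for doc in balanced)
print(counts)
```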

checkpoint_step

Checkpoint step class

class squirro.lib.nlp.steps.checkpoint_step.CheckpointStep(config)

Bases: squirro.lib.nlp.steps.base.Step

Checkpoints Document objects to disk to provide a repeatable iterable.

Parameters
  • type (str) – checkpoint

  • batch_size (int, 128) – Size of checkpoint batches

  • checkpoint_processing (bool, False) – Whether or not to do checkpoint processing in addition to training

  • do_randomize (bool, False) – Whether or not to randomize order

Example

{
    "name": "checkpoint",
    "step": "checkpoint",
    "type": "checkpoint"
}

clean()

Clean step

process(docs)

Process a set of documents

Parameters

docs (generator(Document)) – Generator of documents

Returns

Generator of processed documents

Return type

generator(Document)

train(docs)

Train a step on a set of documents

Parameters

docs (generator(Document)) – Generator of documents

Returns

Generator of processed documents

Return type

generator(Document)
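A generator can only be consumed once, which is why writing documents to disk makes them repeatable. The sketch below shows the general idea with pickle files, one per batch. `DiskCheckpoint` is a hypothetical class for illustration; the file naming, the use of pickle and the dict-shaped documents are all assumptions and not taken from CheckpointStep.

```python
import pickle
import tempfile
from itertools import islice
from pathlib import Path


class DiskCheckpoint:
    """Write documents to disk in batches, then iterate over them repeatedly."""

    def __init__(self, directory, batch_size=128):
        self.directory = Path(directory)
        self.batch_size = batch_size

    def write(self, docs):
        """Consume the generator once and return the number of batches written."""
        iterator = iter(docs)
        index = 0
        while True:
            batch = list(islice(iterator, self.batch_size))
            if not batch:
                break
            with open(self.directory / f"batch_{index:06d}.pkl", "wb") as handle:
                pickle.dump(batch, handle)
            index += 1
        return index

    def __iter__(self):
        # Zero-padded file names keep the batches in their original order.
        for path in sorted(self.directory.glob("batch_*.pkl")):
            with open(path, "rb") as handle:
                yield from pickle.load(handle)


with tempfile.TemporaryDirectory() as tmp:
    checkpoint = DiskCheckpoint(tmp, batch_size=2)
    n_batches = checkpoint.write({"id": i} for i in range(5))
    first_pass = list(checkpoint)
    second_pass = list(checkpoint)  # a second pass works, unlike with a generator
print(n_batches, first_pass == second_pass)
```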

endpoint_step

Endpoint step class

class squirro.lib.nlp.steps.endpoint_step.EndpointStep(config)

Bases: squirro.lib.nlp.steps.batched_step.BatchedStep

Step that uses an external API endpoint.

It sends a batch of Document objects in the shape {"docs": LIST_OF_DOCS, "fields": LIST_OF_FIELDS} to the endpoint and returns the batch of Document objects received in response.

Parameters
  • type (str) – endpoint

  • fields (list) – list of relevant fields

  • process_endpoint (str) – URL of batched process endpoint

  • train_endpoint (str) – URL of batched train endpoint

Example

{
    "name": "endpoint",
    "step": "endpoint",
    "type": "endpoint",
    "fields": ["title","body"],
    "process_endpoint": "https://localhost:<PORT>/process",
    "train_endpoint": "https://localhost:<PORT>/train"
}

process_batch(batch)

Process a batch of documents. If not defined will default to using self.process_doc for each document in the batch.

Parameters

batch (list(Document)) – List of documents

Returns

List of processed documents

Return type

list(Document)

train_batch(batch)

Train on a batch of documents

Parameters

batch (list(Document)) – List of documents
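The request shape described for this step can be sketched without any network access. In the example below, `build_payload` and `fake_process_endpoint` are hypothetical helpers, documents are modelled as dicts, and the response format (a JSON list of documents) is an assumption; a real endpoint may answer differently.

```python
import json


def build_payload(batch, fields):
    """Serialize a batch into the {"docs": ..., "fields": ...} request body."""
    return json.dumps({"docs": batch, "fields": fields})


def fake_process_endpoint(raw_body):
    """Hypothetical server side: adds a character count for each listed field."""
    payload = json.loads(raw_body)
    for doc in payload["docs"]:
        for field in payload["fields"]:
            doc[f"{field}_length"] = len(doc.get(field, ""))
    # Assumed response format: the processed documents as a JSON list.
    return json.dumps(payload["docs"])


batch = [{"title": "Hi", "body": "Hello world"}]
body = build_payload(batch, ["title", "body"])
processed = json.loads(fake_process_endpoint(body))
print(processed)
```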

mlflow_maas_step

MLFlow MaaS Endpoint step class

class squirro.lib.nlp.steps.mlflow_maas_step.MlflowMaasEndpointStep(config)

Bases: squirro.lib.nlp.steps.batched_step.BatchedStep

Step that uses an external MLFlow based MaaS API endpoint.

The tutorial for setting up Model-as-a-Service (MaaS) within Squirro can be found at: https://nektoon.atlassian.net/wiki/spaces/SQ/pages/2127954343/Tutorial+to+use+MaaS+with+Squirro

Parameters
  • type (str) – mlflow_maas

  • input_mapping (dict) – dictionary of input fields mapping: output of the prior step field (key) to the MaaS endpoint input field (value)

  • output_mapping (dict) – dictionary of output fields mapping: the MaaS endpoint output field (key) to output field of that step (value)

  • process_endpoint (str) – URL of the batched process endpoint (MaaS)

Example - The configuration below assumes that the MaaS endpoint requires the field 'input' as input and returns its predictions in the field 'output'. The step's config could then look like this:

{
    "name": "mlflow_maas",
    "step": "mlflow_maas",
    "type": "mlflow_maas",
    "input_mapping": {"body": "input"},
    "output_mapping": {"output": "prediction"},
    "process_endpoint": "https://localhost:<PORT>/invocations"
}

process_batch(batch)

Process a batch of documents.
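The direction of the two mappings is easy to mix up, so a small sketch may help: input_mapping goes from document field to endpoint field, output_mapping from endpoint field to document field. The helpers `to_endpoint_rows` and `merge_predictions` are hypothetical, documents are modelled as dicts, and the endpoint call itself is replaced by a hard-coded list of predictions.

```python
def to_endpoint_rows(batch, input_mapping):
    """Rename document fields (keys) to the endpoint's input fields (values)."""
    return [
        {target: doc[source] for source, target in input_mapping.items()}
        for doc in batch
    ]


def merge_predictions(batch, predictions, output_mapping):
    """Copy endpoint output fields (keys) into document fields (values)."""
    for doc, prediction in zip(batch, predictions):
        for source, target in output_mapping.items():
            doc[target] = prediction[source]
    return batch


input_mapping = {"body": "input"}
output_mapping = {"output": "prediction"}

batch = [{"body": "good"}, {"body": "bad"}]
rows = to_endpoint_rows(batch, input_mapping)

# Stand-in for the endpoint response; a real call would send `rows` to the service.
predictions = [{"output": "pos"}, {"output": "neg"}]
merged = merge_predictions(batch, predictions, output_mapping)
print(rows, merged)
```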

parallel_section

Parallel section class

class squirro.lib.nlp.steps.parallel_section.ExceptionThread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)

Bases: threading.Thread

Thread class which catches, stores, and propagates exceptions

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

join()

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
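A thread that catches, stores and propagates exceptions is a common pattern on top of threading.Thread. The sketch below shows one way to write it; `PropagatingThread` is a hypothetical name and the code is not taken from ExceptionThread, whose details may differ.

```python
import threading


class PropagatingThread(threading.Thread):
    """Thread that stores an exception from its target and re-raises it in join()."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exception = None

    def run(self):
        try:
            super().run()  # invokes the target passed to the constructor
        except Exception as exc:
            self.exception = exc  # keep it instead of losing it in the worker thread

    def join(self, timeout=None):
        super().join(timeout)
        if self.exception is not None:
            raise self.exception  # surface the failure in the joining thread


def failing_task():
    raise ValueError("boom")


worker = PropagatingThread(target=failing_task)
worker.start()
try:
    worker.join()
    caught = None
except ValueError as exc:
    caught = exc
print(repr(caught))
```

Without this pattern, an exception in the target only produces a traceback on stderr and the joining thread carries on as if the work had succeeded.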

class squirro.lib.nlp.steps.parallel_section.PipelineWorker(configs, path, from_queue, to_queue, queue_status, timeout=0.1)

Bases: multiprocessing.context.Process

Multiprocessing process specialized for instantiated pipelines

set_mode(mode)

wait_for_completed(mode)

run()

Method to be run in sub-process; can be overridden in sub-class

class squirro.lib.nlp.steps.parallel_section.ParallelSection(config)

Bases: squirro.lib.nlp.steps.base.Step

Step that creates a parallelized pipeline section

Parameters
  • type (str) – parallel_section

  • batch_size (int, 128) – Batch size

  • n_workers (int, 1) – Number of parallel workers

  • steps (list) – Step configs to be parallelized

  • timeout (float, 0.1) – Seconds to wait in control loops

Example

{
    "step": "parallel_section",
    "type": "parallel_section",
    "n_workers": 2,
    "steps": [
        {
            "step": "classifier",
            "type": "sklearn",
            "input_fields": [
                "sepal length",
                "sepal width",
                "petal length",
                "petal width"
            ],
            "label_field": "class",
            "model_type": "SVC",
            "model_kwargs": {
                "probability": true
            },
            "output_field": "pred_class",
            "explanation_field": "explanation"
        }
    ]
}

load()

Load a step

clean()

Clean step

terminate()

Terminate step

process(docs)

Process a set of documents

Parameters

docs (generator(Document)) – Generator of documents

Returns

Generator of processed documents

Return type

generator(Document)

train(docs)

Train a step on a set of documents

Parameters

docs (generator(Document)) – Generator of documents

Returns

Generator of processed documents

Return type

generator(Document)
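The fan-out idea behind a parallel section (split the stream into batches, run the same steps on several workers, collect the results in order) can be sketched with the standard library. Note that this example swaps in a thread pool from concurrent.futures for the multiprocessing workers and queues the class is built on, so it shows the control flow only. `run_section`, `iter_batches` and the callable steps are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def iter_batches(docs, batch_size=128):
    """Group any iterable of documents into lists of at most batch_size items."""
    iterator = iter(docs)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def run_section(docs, steps, n_workers=1, batch_size=128):
    """Apply all steps to every batch, using n_workers threads in parallel."""

    def run_steps(batch):
        for step in steps:
            batch = [step(doc) for doc in batch]
        return batch

    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # map() returns results in submission order, even if workers finish out of order.
        for processed in pool.map(run_steps, iter_batches(docs, batch_size)):
            yield from processed


def add_one(doc):
    doc["x"] += 1
    return doc


def double(doc):
    doc["x"] *= 2
    return doc


out = list(
    run_section(({"x": i} for i in range(5)), [add_one, double], n_workers=2, batch_size=2)
)
print(out)
```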

randomizer_step

Randomizer step class

class squirro.lib.nlp.steps.randomizer_step.RandomizerStep(config)

Bases: squirro.lib.nlp.steps.base.Step

Step for randomizing the order of Document objects

Note - unless batching is switched off, only the documents within a batch are shuffled.

Parameters
  • type (str) – randomizer

  • seed (int, 7400) – Seed for the randomization process

Example

{
    "name": "randomizer",
    "step": "randomizer",
    "type": "randomizer",
    "seed": 8000
}

process(docs)

Process a set of documents

Parameters

docs (generator(Document)) – Generator of documents

Returns

Generator of processed documents

Return type

generator(Document)
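Seeded shuffling within batches can be sketched as follows. This is an illustration, not the RandomizerStep code: `shuffle_in_batches` is a hypothetical function, and the `batch_size` argument is an assumption added here to show why documents never move across batch boundaries.

```python
import random
from itertools import islice


def shuffle_in_batches(docs, seed=7400, batch_size=128):
    """Shuffle documents batch by batch with a seeded random generator."""
    rng = random.Random(seed)  # same seed and input give the same order
    iterator = iter(docs)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        rng.shuffle(batch)  # reorders only the documents inside this batch
        yield from batch


first = list(shuffle_in_batches(range(6), seed=8000, batch_size=3))
second = list(shuffle_in_batches(range(6), seed=8000, batch_size=3))
print(first, first == second)
```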