Standard Types#

Functions#

make_step#

make_step(config)#

Make a step from a configuration dictionary

Parameters:

config (dict) – Step configuration dictionary

Returns:

Step object

Return type:

Step

Classes#

base#

Step base class

class Step(config)#

The Step class represents single operations that can be performed on each Document in a Pipeline stream.

In the config the class of a step is specified by a step parameter, and the type is specified via type. Each step will then have additional parameters (some with default values), which are documented in their corresponding section.

It is also possible to create your own custom step by writing a class that inherits from Step and specifying the step as ‘custom’.
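
For illustration, a custom step could look like the following minimal sketch. The import path, the config attribute access, and the dict-like Document.fields access are assumptions made for this example, not documented API; only the process() override and the docstring Parameters format (see the notes below) come from this page.

from squirro.lib.nlp.steps.base import Step  # import path assumed


class UppercaseTitleStep(Step):
    """Upper-case a field of each document.

    Parameters:
        input_field (str, "title"): Field to upper-case
    """

    def process(self, docs):
        field = self.config.get("input_field", "title")  # config access assumed
        for doc in self.filter_skipped(docs):
            # Document.fields as a dict is an assumption of this sketch
            doc.fields[field] = doc.fields.get(field, "").upper()
            yield doc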

NOTE: Optionally, each step receives a reference to the executing pipeline runner, which gives it access to shared models kept in memory (managed via the pipeline).

NOTE: For all steps (including custom steps), the input config JSON is validated against what appears in the `Parameters` section of the step class docstring. The specification format is as follows:

Parameters:
    VARIABLE_NAME (SCHEMA, OPTIONAL_DEFAULT): DESCRIPTION

For example, for the base Step class we specify:

Parameters:
    name (str, None): Name of step (defaults to value of `type`)
    path (str): Path to step storage
    step (str): Class of step
    type (str): Type of step
    modes (list, ['train', 'process']): Modes for which step will be run
    handle_skipped (bool, False): Execute actions for skipped items
    model_cache_expiration (int, 3600): Time in seconds after cached model expires
    _pipeline (weakref.ReferenceType, None): Reference to the executing pipeline

This is reflected in the documentation below:

Parameters:
  • name (str, None) – Name of step (defaults to value of type)

  • path (str) – Path to step storage

  • step (str) – Class of step

  • type (str) – Type of step

  • modes (list, ['train', 'process']) – Modes for which step will be run

  • handle_skipped (bool, False) – Execute actions for skipped items

  • cache_document (bool, False) –

    Whether to cache document

    Deprecated since version 3.10.4: Document level cache is not supported anymore

  • document_cache_expiration (int, 300) –

    Time in seconds after cached document expires

    Deprecated since version 3.10.4: Document level cache is not supported anymore

  • model_cache_expiration (int, 3600) – Time in seconds after cached model expires

  • _pipeline (weakref.ReferenceType, None) – Reference to the executing pipeline

Example:

from squirro.lib.nlp.steps import make_step

csv_loader_step = make_step({
  "step": "loader",
  "type": "csv",
  "fields": ["sepal length", "sepal width", "petal length", "class"]
})
property model_cache: ModelCacheClient#

Cache client to enable model sharing across the whole process.

Returns:

Model cache

property inmem_models: InMemModel#

Memory cache for models.

Deprecated since version 3.4.4: Use the model_cache property.

property in_mem_models: InMemModel#

Memory cache for models.

Deprecated since version 3.6.0: Use the model_cache property.

property pipeline: Pipeline | None#

Reference to parent pipeline-runner.

save()#

Save a step

load()#

Load a step

clean()#

Clean step

terminate()#

Terminate step

process(docs)#

Process a set of documents

Parameters:

docs (generator(Document)) – Generator of documents

Returns:

Generator of processed documents

Return type:

generator(Document)

train(docs)#

Train the step on a set of documents

Parameters:

docs (generator(Document)) – Generator of documents

Returns:

Generator of processed documents

Return type:

generator(Document)

filter_skipped(docs, skipped=None)#

Helper to return all documents that have to be processed.

By default, this filters out all skipped documents. If handle_skipped is set, all documents are returned, skipped or not (see the sketch after this entry).

Parameters:
  • docs (generator(Document)) – Generator of documents

  • skipped (list, None) – An optional list where skipped items are pushed. Used when skipped items still need to be returned separately.

Returns:

List of documents to be processed

Return type:

list(Document)
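
As a sketch, a custom step can use the optional skipped list to re-emit skipped documents untouched after its own processing. This pattern is inferred from the documented signature, not taken from the library source:

from squirro.lib.nlp.steps.base import Step  # import path assumed


class PassThroughSkippedStep(Step):
    def process(self, docs):
        skipped = []
        for doc in self.filter_skipped(docs, skipped):
            # ... transform doc here ...
            yield doc
        # Re-emit skipped documents unchanged so downstream steps still see them.
        yield from skipped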

batched_step#

Batched step base class

class BatchedStep(config)#

Bases: Step

Process an in-memory batch of documents

This is an extension of the standard Step class that allows executing batches of Document in a Pipeline.

Note - this is a generic base step.

Parameters:
  • batch_size (int, 128) – Batch size

  • use_batches (bool, True) – Whether to yield the data in #docs/batch_size groups of size batch_size

  • struct_log_enable (bool, False) – Whether to log in structured way

  • struct_log_name (str, "step-debug") – Name of the struct logger

  • struct_log_input_step_fields (list, None) – Which fields to log before step processing

  • struct_log_output_step_fields (list, None) – Which fields to log after step processing

clean()#

Clean step

process(docs)#

Process a set of documents

Parameters:

docs (generator(Document)) – Generator of documents

Returns:

Generator of processed documents

Return type:

generator(Document)

train(docs)#

Train the step on a set of documents

Parameters:

docs (generator(Document)) – Generator of documents

Returns:

Generator of processed documents

Return type:

generator(Document)

process_batch(batch)#

Process a batch of documents. If not overridden, this defaults to applying self.process_doc to each document in the batch.

Parameters:

batch (list(Document)) – List of documents

Returns:

List of processed documents

Return type:

list(Document)

train_batch(batch)#

Train on a batch of documents

Parameters:

batch (list(Document)) – List of documents

process_doc(doc)#

Process a document

Parameters:

doc (Document) – Document

Returns:

Processed document

Return type:

Document
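
Because process_batch falls back to process_doc, the simplest custom batched step only overrides process_doc. A minimal sketch (import path, config access, and Document.fields access are assumptions, as above):

from squirro.lib.nlp.steps.batched_step import BatchedStep  # import path assumed


class WordCountStep(BatchedStep):
    """Count whitespace-separated words in a field of each document.

    Parameters:
        input_field (str, "body"): Field to count words in
    """

    def process_doc(self, doc):
        field = self.config.get("input_field", "body")  # config access assumed
        text = doc.fields.get(field, "")  # dict-like fields assumed
        doc.fields["word_count"] = len(text.split())
        return doc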

balancer_step#

Balancer step class

class BalancerStep(config)#

Bases: Step

It creates a dataset of documents with the provided classes and optionally makes sure that the dataset is balanced (i.e., there is a roughly equal number of elements per class).

Note - if batching is not switched off, balancing is only applied within each batch.

Input - the class field needs to be of type str.

Output - the output field is filled with data of type str.

Parameters:
  • type (str) – balancer

  • classes (list) – List of the classes which get used in classifier

  • class_field (str) – Field name in which the classes are

  • output_label_field (str) – Field name in which the balanced labels are stored

  • not_class (bool, True) – Whether to add a class that includes all negative/opposite elements

  • deviation (float, 0.02) – Maximum deviation of the class sizes (1.0 = 100%, 0.0 = 0%)

  • seed (int, 7400) – Seed for randomization process

  • balancing_enabled (bool,True) – If true, the balancing step will be executed on the dataset. Otherwise, the dataset will not be forcefully balanced.

Example

{
    "name": "balancer",
    "step": "balancer",
    "type": "balancer",
    "classes": ["classA","classB"],
    "class_field": "labels",
    "output_label_field": "balanced_labels",
    "not_class": false,
    "deviation": 0.02,
    "balancing_enabled": true,
}
process(docs)#

Process a set of documents

Parameters:

docs (generator(Document)) – Generator of documents

Returns:

Generator of processed documents

Return type:

generator(Document)

checkpoint_step#

Checkpoint step class

class CheckpointStep(config)#

Bases: Step

Checkpoint Document to disk to provide a repeatable iterable.

Parameters:
  • type (str) – checkpoint

  • batch_size (int, 128) – Size of checkpoint batches

  • checkpoint_processing (bool, False) – Whether or not to do checkpoint processing in addition to training

  • do_randomize (bool, False) – Whether or not to randomize order

Example

{
    "name": "checkpoint",
    "step": "checkpoint",
    "type": "checkpoint"
}
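
Why a repeatable iterable matters: a plain generator can only be consumed once, so a second pass (e.g., another training epoch) would see nothing. Checkpointing batches to disk makes the stream re-readable. A plain-Python illustration:

docs = (d for d in ["doc-1", "doc-2", "doc-3"])
print(list(docs))  # ['doc-1', 'doc-2', 'doc-3']
print(list(docs))  # [] -- the generator is exhausted; a checkpoint on disk can be re-read
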
clean()#

Clean step

process(docs)#

Process a set of documents

Parameters:

docs (generator(Document)) – Generator of documents

Returns:

Generator of processed documents

Return type:

generator(Document)

train(docs)#

Train the step on a set of documents

Parameters:

docs (generator(Document)) – Generator of documents

Returns:

Generator of processed documents

Return type:

generator(Document)

endpoint_step#

Endpoint step class

class EndpointStep(config)#

Bases: BatchedStep

Step that uses an external API endpoint.

It sends a batch of Document in the shape of {"docs": LIST_OF_DOCS, "fields": LIST_OF_FIELDS} to the endpoint and returns the received batch of Document.

Parameters:
  • type (str) – endpoint

  • fields (list) – list of relevant fields

  • process_endpoint (str) – URL of batched process endpoint

  • train_endpoint (str) – URL of batched train endpoint

Example

{
    "name": "endpoint",
    "step": "endpoint",
    "type": "endpoint",
    "fields": ["title","body"],
    "process_endpoint": "https://localhost:<PORT>/process",
    "train_endpoint": "https://localhost:<PORT>/train"
}
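
As an illustration, a compatible external service could look like the following sketch. Flask is used here only as an example, and the exact response shape the step expects is an assumption:

from flask import Flask, jsonify, request

app = Flask(__name__)


@app.route("/process", methods=["POST"])
def process():
    payload = request.get_json()
    docs, fields = payload["docs"], payload["fields"]
    for doc in docs:
        for field in fields:
            # Hypothetical enrichment of each relevant field.
            doc[f"enriched__{field}"] = str(doc.get(field, "")).strip()
    return jsonify({"docs": docs})  # response shape assumed


if __name__ == "__main__":
    app.run(port=8000)
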
process_batch(batch)#

Process a batch of documents. If not overridden, this defaults to applying self.process_doc to each document in the batch.

Parameters:

batch (list(Document)) – List of documents

Returns:

List of processed documents

Return type:

list(Document)

train_batch(batch)#

Train on a batch of documents

Parameters:

batch (list(Document)) – List of documents

mlflow_maas_step#

MLFlow MaaS Endpoint step class

class MlflowMaasEndpointStep(config)#

Bases: BatchedStep

Step that uses an external MLFlow based MaaS API endpoint.

The tutorial on setting up Model-as-a-Service (MaaS) within Squirro can be found under: https://nektoon.atlassian.net/wiki/spaces/SQ/pages/2127954343/Tutorial+to+use+MaaS+with+Squirro

Parameters:
  • type (str) – mlflow_maas

  • input_mapping (dict) – dictionary of input fields mapping: output of the prior step field (key) to the MaaS endpoint input field (value)

  • output_mapping (dict) – dictionary of output fields mapping: the MaaS endpoint output field (key) to output field of that step (value)

  • process_endpoint (str) – URL of the batched process endpoint (MaaS)

Example - In the configuration below we assume that our MaaS requires the field ‘input’ as input and returns the predictions in the field ‘output’. The step’s config could then look like this:

{
    "name": "mlflow_maas",
    "step": "mlflow_maas",
    "type": "mlflow_maas",
    "input_mapping": {"body": "input"},
    "output_mapping": {"output": "prediction"},
    "process_endpoint": "https://localhost:<PORT>/invocations",
}
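
To make the two mappings concrete, here is a plain-Python sketch of the renaming they imply (the request and response shapes of the MaaS endpoint itself are assumptions):

input_mapping = {"body": "input"}          # doc field -> MaaS input field
output_mapping = {"output": "prediction"}  # MaaS output field -> doc field

doc = {"body": "Text to classify"}

# Build the request payload by renaming doc fields per input_mapping.
request_payload = {maas: doc[field] for field, maas in input_mapping.items()}
# request_payload == {"input": "Text to classify"}

# Map a (hypothetical) MaaS response back onto the document.
response = {"output": "positive"}
for maas, field in output_mapping.items():
    doc[field] = response[maas]
# doc["prediction"] == "positive"
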
process_batch(batch)#

Process a batch of documents.

remote_spacy#

class RemoteSpacy(config)#

Bases: BatchedStep

Step that uses an external API endpoint.

It sends a batch of Document in the shape of {"docs": LIST_OF_DOCS, "fields": LIST_OF_FIELDS} to spaCy and returns the annotated batch of Document.

Parameters:
  • step (str, "external") – external

  • type (str, "remote_spacy") – remote_spacy

  • field_mapping (dict) – Mapping of input field to output field

  • max_concurrent (int, 10) – Maximum concurrent requests

  • language_service_mapping (dict, {}) – Define custom service URL that should be used for the detected language. If the language is not found in the mapping, the default_worker is used and the service URL is read from the configuration service.

  • default_worker (str, "fast") – Default worker that is used to read service URL from the configuration service if detected language is not found in the language_service_mapping.

  • disable_pipes__field (str, None) – Programmatic selection of disabled pipelines (read disabled pipelines from field). Uses disable_pipes__default if empty.

  • disable_pipes__default (list, []) – Pipes that are disabled by default if disable_pipes__field is not specified or its value is null

Example

{
    "step": "external",
    "type": "remote_spacy",
    "field_mapping": {"body": "nlp__body", "title": "nlp__title"},
    "language_service_mapping": {
        "en": "http://localhost:8000"
    },
    "default_worker: "fast",
    "disable_pipes__default": ["ner"]
}
process_batch(batch)#

Process a batch of documents. If not overridden, this defaults to applying self.process_doc to each document in the batch.

Parameters:

batch (list(Document)) – List of documents

Returns:

List of processed documents

Return type:

list(Document)

get_client(language, worker)#
Return type:

SpacyClient

remote_qa#

class RemoteQuestionAnswering(config)#

Bases: BatchedStep

Step that uses an external API endpoint.

It sends a batch of Document to the remote QA service and returns the annotated batch of Document.

Parameters:
  • step (str, "external") – external

  • type (str, "remote_qa") – remote_qa

  • input_field__question (str, "qa_question") – Input field containing the question

  • input_field__context (str, "qa_context") – Input field containing the context

  • output_field__answer (str, "qa_answer") – Output field

  • max_concurrent (int, 10) – Maximum concurrent requests

  • language_service_mapping (dict, {}) – Define custom service URL that should be used for the detected language. If the language is not found in the mapping, the default_worker is used and the service URL is read from the configuration service.

  • default_worker (str, "accurate") – Default worker that is used to read service URL from the configuration service if detected language is not found in the language_service_mapping.

Example

{
    "step": "external",
    "type": "remote_qa",
    "language_service_mapping": {
        "en": "http://localhost:8000"
    },
    "default_worker: "accurate"
}
process_batch(batch)#

Process a batch of documents. If not overridden, this defaults to applying self.process_doc to each document in the batch.

Parameters:

batch (list(Document)) – List of documents

Returns:

List of processed documents

Return type:

list(Document)

get_client(language, worker)#
Return type:

QAClient

parallel_section#

Parallel section class

class ExceptionThread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)#

Bases: Thread

Thread class which catches, stores, and propagates exceptions

run()#

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with positional and keyword arguments taken from the args and kwargs arguments, respectively.

join()#

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception – or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread, as that would cause a deadlock. It is also an error to join() a thread before it has been started, and attempting to do so raises the same exception.
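
The standard library Thread silently discards exceptions raised in run(). A class like this typically wraps run() to store the exception and re-raises it from join(); a generic sketch of that pattern (not the actual implementation):

import threading


class PropagatingThread(threading.Thread):
    """Sketch of the catch/store/propagate pattern."""

    def run(self):
        self._exc = None
        try:
            super().run()  # invokes the target callable as usual
        except BaseException as exc:
            self._exc = exc  # store for the joining thread

    def join(self, timeout=None):
        super().join(timeout)
        if self._exc is not None:
            raise self._exc  # propagate into the calling thread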

class PipelineWorker(configs, path, from_queue, to_queue, queue_status, timeout=0.1)#

Bases: Process

Multiprocessing process specialized for instantiated pipelines

set_mode(mode)#
wait_for_completed(mode)#
run()#

Method to be run in sub-process; can be overridden in sub-class

class ParallelSection(config)#

Bases: Step

Step that creates a parallelized pipeline section

Parameters:
  • type (str) – parallel_section

  • batch_size (int, 128) – Batch size

  • n_workers (int, 1) – Number of parallel workers

  • steps (list) – Step configs to be parallelized

  • timeout (float, 0.1) – Seconds to wait in control loops

Example

{
    "step": "parallel_section",
    "type": "parallel_section",
    "n_workers": 2,
    "steps": [
        {
            "step": "classifier",
            "type": "sklearn",
            "input_fields": [
                "sepal length",
                "sepal width",
                "petal length",
                "petal width"
            ],
            "label_field": "class",
            "model_type": "SVC",
            "model_kwargs": {
                "probability": true
            },
            "output_field": "pred_class",
            "explanation_field": "explanation"
        }
    ]
}
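
Usage mirrors any other step built via make_step; a hedged sketch reusing the classifier config from the example above:

from squirro.lib.nlp.steps import make_step

classifier_config = {  # as in the JSON example above
    "step": "classifier",
    "type": "sklearn",
    "input_fields": ["sepal length", "sepal width", "petal length", "petal width"],
    "label_field": "class",
    "model_type": "SVC",
    "model_kwargs": {"probability": True},
    "output_field": "pred_class",
    "explanation_field": "explanation",
}

parallel_section = make_step({
    "step": "parallel_section",
    "type": "parallel_section",
    "n_workers": 2,
    "steps": [classifier_config],
})
# processed_docs = parallel_section.process(docs)  # docs: generator(Document)
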
load()#

Load a step

clean()#

Clean step

terminate()#

Terminate step

process(docs)#

Process a set of documents

Parameters:

docs (generator(Document)) – Generator of documents

Returns:

Generator of processed documents

Return type:

generator(Document)

train(docs)#

Train the step on a set of documents

Parameters:

docs (generator(Document)) – Generator of documents

Returns:

Generator of processed documents

Return type:

generator(Document)

randomizer_step#

Randomizer step class

class RandomizerStep(config)#

Bases: Step

Step for randomizing the order of the Document stream

Note - if batching is not switched off, only the documents within a batch are shuffled.

Parameters:
  • type (str) – randomizer

  • seed (int, 7400) – Seed for randomization process

Example

{
    "name": "randomizer",
    "step": "randomizer",
    "type": "randomizer",
    "seed": 8000
}
process(docs)#

Process a set of documents

Parameters:

docs (generator(Document)) – Generator of documents

Returns:

Generator of processed documents

Return type:

generator(Document)

truncate#

class TruncateStep(config)#

Bases: Step

Step for truncating a specific field of a Document to the first X words. It uses the str.split() method from Python, so it’s not a “smart” step. Particularly useful for large text fields, such as the body.

Parameters:
  • type (str) – truncate

  • fields (list, []) – Fields to truncate

  • suffix (str, None) – Suffix to append to the truncated field (if the field is truncated). Leave empty to truncate field in place.

  • num_of_tokens (int) – Number of tokens (words) to keep.

Example

{
    "name": "truncate",
    "step": "truncate",
    "type": "truncate",
    "fields": ["body"],
    "suffix": "",
    "num_of_tokens": 800
}
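
A plain-Python sketch of the truncation semantics described above, assuming whitespace tokenization via str.split() (the step's exact joining and suffix behaviour is an assumption):

def truncate(text, num_of_tokens, suffix=None):
    tokens = text.split()
    if len(tokens) <= num_of_tokens:
        return text
    truncated = " ".join(tokens[:num_of_tokens])
    return truncated + suffix if suffix else truncated


print(truncate("the quick brown fox jumps over the lazy dog", 4))
# -> 'the quick brown fox'
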
process(docs)#

Process a set of documents

Parameters:

docs (generator(Document)) – Generator of documents

Returns:

Generator of processed documents

Return type:

generator(Document)