Standard Types#
Functions#
make_step#
Classes#
base#
Step base class
- class Step(config)#
The Step class represents a single operation that can be performed on each Document in a Pipeline stream.
In the config, the class of a step is specified by the step parameter, and its type is specified via type. Each step then has additional parameters (some with default values), which are documented in the corresponding section.
It is also possible to create your own custom step by writing a class that inherits from Step and specifying the step as ‘custom’.
NOTE: [Optional] A step can receive a reference to the executing pipeline runner. Each step gets access to shared models kept in memory (managed via pipeline).
NOTE: For all steps (including custom steps), the input config JSON is validated against what appears in the `Parameters` section of the step class docstring. The specification format is as follows:
Parameters: VARIABLE_NAME (SCHEMA, OPTIONAL_DEFAULT): DESCRIPTION
For example, for the base Step class we specify:
Parameters:
    name (str, None): Name of step (defaults to value of `type`)
    path (str): Path to step storage
    step (str): Class of step
    type (str): Type of step
    modes (list, ['train', 'process']): Modes for which step will be run
    handle_skipped (bool, False): Execute actions for skipped items
    model_cache_expiration (int, 3600): Time in seconds after cached model expires
    _pipeline (weakref.ReferenceType, None): Reference to the executing pipeline
This is reflected in the documentation below:
- Parameters
name (str, None) – Name of step (defaults to value of type)
path (str) – Path to step storage
step (str) – Class of step
type (str) – Type of step
modes (list, ['train', 'process']) – Modes for which step will be run
handle_skipped (bool, False) – Execute actions for skipped items
cache_document (bool, False) –
Whether to cache document
Deprecated since version 3.10.4: Document level cache is not supported anymore
document_cache_expiration (int, 300) –
Time in seconds after cached document expires
Deprecated since version 3.10.4: Document level cache is not supported anymore
model_cache_expiration (int, 3600) – Time in seconds after cached model expires
_pipeline (weakref.ReferenceType, None) – Reference to the executing pipeline
Example:
from squirro.lib.nlp.steps import make_step

# Build a CSV loader step from its config dictionary
csv_loader_step = make_step({
    "step": "loader",
    "type": "csv",
    "fields": ["sepal length", "sepal width", "petal length", "class"],
})
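The Parameters specification format described above (VARIABLE_NAME (SCHEMA, OPTIONAL_DEFAULT): DESCRIPTION) can be illustrated with a small parser. This is only a sketch under stated assumptions: the function name parse_param_line, the regular expression, and the returned dictionary layout are hypothetical and are not taken from the library, whose actual validation code may work differently.

```python
import ast
import re

# Hypothetical pattern for "NAME (SCHEMA): DESC" or "NAME (SCHEMA, DEFAULT): DESC".
PARAM_RE = re.compile(
    r"^\s*(?P<name>\w+)\s*"
    r"\((?P<schema>[^,()]+)(?:,\s*(?P<default>.*?))?\)\s*:\s*"
    r"(?P<desc>.*)$"
)


def parse_param_line(line):
    """Parse one line of a Parameters section into a dictionary.

    Assumption: a parameter without a default value is treated as required.
    """
    match = PARAM_RE.match(line)
    if match is None:
        raise ValueError(f"Not a parameter specification: {line!r}")

    raw_default = match.group("default")
    spec = {
        "name": match.group("name"),
        "schema": match.group("schema").strip(),
        "description": match.group("desc").strip(),
        "required": raw_default is None,
    }
    if raw_default is not None:
        try:
            # Defaults such as None, False, 3600 or ['train', 'process']
            # are Python literals and can be evaluated safely.
            spec["default"] = ast.literal_eval(raw_default.strip())
        except (ValueError, SyntaxError):
            # Fall back to the raw text for anything that is not a literal.
            spec["default"] = raw_default.strip()
    return spec


if __name__ == "__main__":
    print(parse_param_line("modes (list, ['train', 'process']): Modes for which step will be run"))
    print(parse_param_line("path (str): Path to step storage"))
```

Parsing the default with ast.literal_eval keeps the sketch safe for untrusted docstrings, at the cost of not supporting defaults that are arbitrary expressions.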
- property model_cache: squirro.lib.nlp.utils.cache.services.ModelCacheClient#
Cache client to enable model sharing across whole process.
- Return type
ModelCacheClient
- Returns
Model cache
- property inmem_models: squirro.lib.nlp.utils.model_management.inmemory.InMemModel#
Memory cache for models.
Deprecated since version 3.4.4: Use the model_cache property.
- Return type
InMemModel
- property in_mem_models: squirro.lib.nlp.utils.model_management.inmemory.InMemModel#
Memory cache for models.
Deprecated since version 3.6.0: Use the model_cache property.
- Return type
InMemModel
- property pipeline: Optional[squirro.lib.nlp.pipeline.Pipeline]#
Reference to parent pipeline-runner.
- save()#
Save a step
- load()#
Load a step
- clean()#
Clean step
- terminate()#
Terminate step
- process(docs)#
Process a set of documents
- train(docs)#
Train a step on a set of documents
- filter_skipped(docs, skipped=None)#
Helper to return all documents that have to be processed.
By default, this filters out all documents that are skipped. But if handle_skipped is set, then all documents are returned, both skipped and not.
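The filtering behaviour described for filter_skipped can be sketched as follows. This is a minimal illustration, not the library's implementation: the Doc dataclass with a skipped flag is a hypothetical stand-in for the real Document class, and handle_skipped is passed as a function argument here instead of being read from the step config.

```python
from dataclasses import dataclass


@dataclass
class Doc:
    """Hypothetical stand-in for a pipeline document."""

    id: str
    skipped: bool = False


def filter_skipped(docs, handle_skipped=False):
    """Return the documents a step should act on.

    With handle_skipped=False (the documented default), skipped documents
    are dropped. With handle_skipped=True, every document is returned.
    """
    if handle_skipped:
        return list(docs)
    return [doc for doc in docs if not doc.skipped]


if __name__ == "__main__":
    docs = [Doc("a"), Doc("b", skipped=True), Doc("c")]
    print([d.id for d in filter_skipped(docs)])
    print([d.id for d in filter_skipped(docs, handle_skipped=True)])
```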
batched_step#
Batched step base class
- class BatchedStep(config)#
Bases:
Step
Process an in-memory batch of documents
This is an extension of the standard Step class that allows processing batches of Document in a Pipeline.
Note - this is a generic base step.
- Parameters
batch_size (int, 128) – Batch size
use_batches (bool, True) – Yield the data in groups of batch_size documents (about #docs/batch_size groups)
struct_log_enable (bool, False) – Whether to log in a structured way
struct_log_name (str, "step-debug") – Name of the struct logger
struct_log_input_step_fields (list, None) – Which fields to log before step processing
struct_log_output_step_fields (list, None) – Which fields to log after step processing
- clean()#
Clean step
- process(docs)#
Process a set of documents
- train(docs)#
Train a step on a set of documents
- process_batch(batch)#
Process a batch of documents. If not defined, it defaults to calling self.process_doc for each document in the batch.
- train_batch(batch)#
Train on a batch of documents
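The relationship between batch_size, process_batch, and the per-document fallback can be sketched in plain Python. The names iter_batches and UppercaseTitleStep are hypothetical, and the class below does not inherit from the real BatchedStep; it only mirrors the documented behaviour that process_batch falls back to process_doc for each document.

```python
from itertools import islice


def iter_batches(docs, batch_size=128):
    """Yield lists of at most batch_size documents from any iterable."""
    iterator = iter(docs)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


class UppercaseTitleStep:
    """Hypothetical step that upper-cases the 'title' field of dict documents."""

    def __init__(self, batch_size=128):
        self.batch_size = batch_size

    def process_doc(self, doc):
        doc["title"] = doc["title"].upper()
        return doc

    def process_batch(self, batch):
        # Fallback behaviour: handle the batch one document at a time.
        return [self.process_doc(doc) for doc in batch]

    def process(self, docs):
        # Split the stream into batches and hand each one to process_batch.
        for batch in iter_batches(docs, self.batch_size):
            yield from self.process_batch(batch)


if __name__ == "__main__":
    step = UppercaseTitleStep(batch_size=2)
    for doc in step.process([{"title": "a"}, {"title": "b"}, {"title": "c"}]):
        print(doc)
```

A step that can exploit batching (for example, a model that predicts on many inputs at once) would override process_batch instead of process_doc.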
balancer_step#
Balancer step class
- class BalancerStep(config)#
Bases:
Step
Creates a dataset of documents with the provided classes and optionally makes sure that the dataset is balanced (i.e., there is a roughly equal number of elements per class).
Note - balancing is only applied within a batch, unless batching is switched off.
Input - the class field needs to be of type str.
Output - the output field is filled with data of type str.
- Parameters
type (str) – balancer
classes (list) – List of the classes used by the classifier
class_field (str) – Name of the field that contains the classes
output_label_field (str) – Name of the field in which the balanced labels are stored
not_class (bool, True) – Whether a class that includes all negative/opposite elements is needed
deviation (int, 0.02) – Max deviation of the size of the classes (1.0 = 100%, 0.0 = 0%)
seed (int, 7400) – Seed for the randomization process
balancing_enabled (bool, True) – If true, the balancing step will be executed on the dataset. Otherwise, the dataset will not be forcefully balanced.
Example
{
    "name": "balancer",
    "step": "balancer",
    "type": "balancer",
    "classes": ["classA", "classB"],
    "class_field": "labels",
    "output_label_field": "balanced_labels",
    "not_class": false,
    "deviation": 0.02,
    "balancing_enabled": true
}
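One way to picture the effect of deviation and seed is a seeded downsampling of the larger classes. The sketch below is an assumption about what balancing could look like, not the step's actual algorithm: the function balance_classes is hypothetical, documents are plain dictionaries, and deviation is interpreted as the fraction by which a class may exceed the smallest class.

```python
import random
from collections import defaultdict


def balance_classes(docs, class_field, deviation=0.02, seed=7400):
    """Downsample larger classes so no class exceeds the smallest by more than deviation.

    Assumption: deviation=0.02 allows a class to be up to 2% larger than
    the smallest class; the real step may define it differently.
    """
    rng = random.Random(seed)  # seeded for reproducible sampling

    # Group the documents by their class label.
    by_class = defaultdict(list)
    for doc in docs:
        by_class[doc[class_field]].append(doc)
    if not by_class:
        return []

    smallest = min(len(group) for group in by_class.values())
    limit = int(smallest * (1 + deviation))

    balanced = []
    for group in by_class.values():
        if len(group) > limit:
            group = rng.sample(group, limit)
        balanced.extend(group)
    return balanced


if __name__ == "__main__":
    docs = [{"labels": "classA"} for _ in range(100)]
    docs += [{"labels": "classB"} for _ in range(10)]
    balanced = balance_classes(docs, "labels", deviation=0.5)
    print(len(balanced))
```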
checkpoint_step#
Checkpoint step class
endpoint_step#
Endpoint step class
- class EndpointStep(config)#
Bases:
BatchedStep
Step that uses an external API endpoint.
It sends a batch of Document in the shape of {"docs": LIST_OF_DOCS, "fields": LIST_OF_FIELDS} to the endpoint and returns the received batch of Document.
- Parameters
Example
{
    "name": "endpoint",
    "step": "endpoint",
    "type": "endpoint",
    "fields": ["title", "body"],
    "process_endpoint": "https://localhost:<PORT>/process",
    "train_endpoint": "https://localhost:<PORT>/train"
}
- process_batch(batch)#
Process a batch of documents. If not defined, it defaults to calling self.process_doc for each document in the batch.
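The request shape {"docs": LIST_OF_DOCS, "fields": LIST_OF_FIELDS} can be illustrated without any network access. In this sketch, build_payload and fake_process_endpoint are hypothetical helpers, documents are plain dictionaries, and the response shape {"docs": [...]} is an assumption, since the source only states that the processed batch is returned.

```python
import json


def build_payload(docs, fields):
    """Build the request body in the documented {"docs", "fields"} shape."""
    return {"docs": list(docs), "fields": list(fields)}


def fake_process_endpoint(request_body):
    """Hypothetical endpoint: upper-cases the listed fields of every document.

    Assumption: the endpoint answers with {"docs": [...]}.
    """
    payload = json.loads(request_body)
    for doc in payload["docs"]:
        for field in payload["fields"]:
            if field in doc:
                doc[field] = doc[field].upper()
    return json.dumps({"docs": payload["docs"]})


batch = [{"title": "hello", "body": "world"}]
request_body = json.dumps(build_payload(batch, ["title", "body"]))
processed = json.loads(fake_process_endpoint(request_body))["docs"]

if __name__ == "__main__":
    print(request_body)
    print(processed)
```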
mlflow_maas_step#
MLFlow MaaS Endpoint step class
- class MlflowMaasEndpointStep(config)#
Bases:
BatchedStep
Step that uses an external MLFlow based MaaS API endpoint.
The tutorial for setting up Model-as-a-Service (MaaS) within Squirro can be found at: https://nektoon.atlassian.net/wiki/spaces/SQ/pages/2127954343/Tutorial+to+use+MaaS+with+Squirro
- Parameters
type (str) – mlflow_maas
input_mapping (dict) – dictionary of input fields mapping: output of the prior step field (key) to the MaaS endpoint input field (value)
output_mapping (dict) – dictionary of output fields mapping: the MaaS endpoint output field (key) to output field of that step (value)
process_endpoint (str) – URL of the batched process endpoint (MaaS)
Example - In the example configuration below, we assume that our MaaS requires the field ‘input’ as input and returns the predictions in the field ‘output’. The step’s config could then look like this:
{
    "name": "mlflow_maas",
    "step": "mlflow_maas",
    "type": "mlflow_maas",
    "input_mapping": {"body": "input"},
    "output_mapping": {"output": "prediction"},
    "process_endpoint": "https://localhost:<PORT>/invocations"
}
- process_batch(batch)#
Process a batch of documents.
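The direction of input_mapping and output_mapping is easy to mix up, so the following sketch spells it out. The helper names to_maas_inputs and apply_maas_outputs are hypothetical, documents are plain dictionaries, and the wire format expected by an MLflow server is deliberately left out; only the field renaming is shown.

```python
def to_maas_inputs(docs, input_mapping):
    """Rename step fields (keys) to the MaaS input fields (values)."""
    return [
        {maas_field: doc[step_field] for step_field, maas_field in input_mapping.items()}
        for doc in docs
    ]


def apply_maas_outputs(docs, predictions, output_mapping):
    """Copy MaaS output fields (keys) into the step's output fields (values)."""
    for doc, prediction in zip(docs, predictions):
        for maas_field, step_field in output_mapping.items():
            doc[step_field] = prediction[maas_field]
    return docs


if __name__ == "__main__":
    docs = [{"body": "some text"}]
    inputs = to_maas_inputs(docs, {"body": "input"})
    # Stand-in for the endpoint response; a real MaaS would compute this.
    predictions = [{"output": "positive"}]
    print(inputs)
    print(apply_maas_outputs(docs, predictions, {"output": "prediction"}))
```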
remote_spacy#
- class RemoteSpacy(config)#
Bases:
BatchedStep
Step that uses an external API endpoint.
It sends a batch of Document in the shape of {"docs": LIST_OF_DOCS, "fields": LIST_OF_FIELDS} to spaCy and returns the annotated batch of Document.
- Parameters
step (str, "external") – external
type (str, "remote_spacy") – remote_spacy
field_mapping (dict) – Mapping of input field to output field
max_concurrent (int, 10) – Maximum concurrent requests
language_service_mapping (dict, {}) – Custom service URL to use for each detected language. If the language is not found in the mapping, the default_worker is used and the service URL is read from the configuration service.
default_worker (str, "fast") – Default worker used to read the service URL from the configuration service if the detected language is not found in the language_service_mapping.
disable_pipes__field (str, None) – Programmatic selection of disabled pipes (read the disabled pipes from this field). Uses disable_pipes__default if empty.
disable_pipes__default (list, []) – Pipes that are disabled by default if no disable_pipes__field is specified or its value is null
Example
{
    "step": "external",
    "type": "remote_spacy",
    "field_mapping": {"body": "nlp__body", "title": "nlp__title"},
    "language_service_mapping": {
        "en": "http://localhost:8000"
    },
    "default_worker": "fast",
    "disable_pipes__default": ["ner"]
}
- process_batch(batch)#
Process a batch of documents. If not defined, it defaults to calling self.process_doc for each document in the batch.
- get_client(language, worker)#
- Return type
SpacyClient
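The fallback rules for language_service_mapping, default_worker, and the disable_pipes__* parameters can be sketched as two small lookups. Everything here is illustrative: resolve_service_url and resolve_disabled_pipes are hypothetical names, the configured_services dictionary merely stands in for the configuration service, and the URLs are made up.

```python
def resolve_service_url(language, language_service_mapping, default_worker, configured_services):
    """Pick the service URL for a detected language.

    configured_services is a hypothetical stand-in for the configuration
    service: it maps worker names (e.g. "fast") to URLs.
    """
    if language in language_service_mapping:
        return language_service_mapping[language]
    return configured_services[default_worker]


def resolve_disabled_pipes(doc, disable_pipes__field=None, disable_pipes__default=()):
    """Read disabled pipes from the document field, else use the default list."""
    if disable_pipes__field:
        value = doc.get(disable_pipes__field)
        if value:
            return list(value)
    return list(disable_pipes__default)


if __name__ == "__main__":
    mapping = {"en": "http://localhost:8000"}
    services = {"fast": "http://localhost:9000"}  # made-up URL
    print(resolve_service_url("en", mapping, "fast", services))
    print(resolve_service_url("de", mapping, "fast", services))
    print(resolve_disabled_pipes({"pipes_off": ["parser"]}, "pipes_off", ["ner"]))
    print(resolve_disabled_pipes({}, "pipes_off", ["ner"]))
```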
remote_qa#
- class RemoteQuestionAnswering(config)#
Bases:
BatchedStep
Step that uses an external API endpoint.
It sends a batch of Document to the remote QA service and returns the annotated batch of Document.
- Parameters
step (str, "external") – external
type (str, "remote_qa") – remote_qa
input_field__question (str, "qa_question") – Input field containing the question
input_field__context (str, "qa_context") – Input field containing the context
output_field__answer (str, "qa_answer") – Output field
max_concurrent (int, 10) – Maximum concurrent requests
language_service_mapping (dict,{}) – Define custom service URL that should be used for the detected language. If the language is not found in the mapping, the default_worker is used and the service URL is read from the configuration service.
default_worker (str, "accurate") – Default worker that is used to read service URL from the configuration service if detected language is not found in the language_service_mapping.
Example
{
    "step": "external",
    "type": "remote_qa",
    "language_service_mapping": {
        "en": "http://localhost:8000"
    },
    "default_worker": "accurate"
}
- process_batch(batch)#
Process a batch of documents. If not defined, it defaults to calling self.process_doc for each document in the batch.
- get_client(language, worker)#
- Return type
QAClient
parallel_section#
Parallel section class
- class ExceptionThread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)#
Bases:
Thread
Thread class which catches, stores, and propagates exceptions
- run()#
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- join()#
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
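A thread that catches, stores, and propagates exceptions is a common pattern on top of threading.Thread. The sketch below shows one way to write it; the class name PropagatingThread is hypothetical and the code is not the library's ExceptionThread, which may differ in detail (for example, in how join() handles a timeout).

```python
import threading


class PropagatingThread(threading.Thread):
    """Thread that stores an exception raised by its target and re-raises it on join()."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exception = None

    def run(self):
        try:
            # Thread.run() calls the target passed to the constructor.
            super().run()
        except Exception as exc:
            # Store the exception instead of letting the thread die silently.
            self.exception = exc

    def join(self, timeout=None):
        super().join(timeout)
        if self.exception is not None:
            # Propagate the stored exception to the joining thread.
            raise self.exception


def failing_task():
    raise ValueError("something went wrong in the worker")


if __name__ == "__main__":
    thread = PropagatingThread(target=failing_task)
    thread.start()
    try:
        thread.join()
    except ValueError as exc:
        print(f"Caught in main thread: {exc}")
```

Without the overridden run() and join(), the exception would only be reported by the thread's default exception hook and the caller of join() would not notice the failure.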
- class PipelineWorker(configs, path, from_queue, to_queue, queue_status, timeout=0.1)#
Bases:
Process
Multiprocessing process specialized for instantiated pipelines
- set_mode(mode)#
- wait_for_completed(mode)#
- run()#
Method to be run in sub-process; can be overridden in sub-class
- class ParallelSection(config)#
Bases:
Step
Step that creates a parallelized pipeline section
- Parameters
Example
{
    "step": "parallel_section",
    "type": "parallel_section",
    "n_workers": 2,
    "steps": [
        {
            "step": "classifier",
            "type": "sklearn",
            "input_fields": [
                "sepal length",
                "sepal width",
                "petal length",
                "petal width"
            ],
            "label_field": "class",
            "model_type": "SVC",
            "model_kwargs": {
                "probability": true
            },
            "output_field": "pred_class",
            "explanation_field": "explanation"
        }
    ]
}
- load()#
Load a step
- clean()#
Clean step
- terminate()#
Terminate step
- process(docs)#
Process a set of documents
randomizer_step#
Randomizer step class
truncate#
- class TruncateStep(config)#
Bases:
Step
Step for truncating a specific field of a Document to the first X words. It uses Python’s str.split() method, so it’s not a “smart” step. Particularly useful for large text fields, such as the body.
- Parameters
Example
{
    "name": "truncate",
    "step": "truncate",
    "type": "truncate",
    "fields": ["body"],
    "suffix": "",
    "num_of_tokens": 800
}
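The whitespace-based truncation described above can be sketched in a few lines. The function truncate_text is hypothetical; in particular, re-joining the kept tokens with single spaces and appending suffix only when text was actually cut are assumptions about behaviour that the source does not specify.

```python
def truncate_text(text, num_of_tokens, suffix=""):
    """Keep the first num_of_tokens whitespace-separated tokens of text.

    Assumptions: text that is already short enough is returned unchanged,
    and truncated text is re-joined with single spaces before the suffix.
    """
    tokens = text.split()  # splits on any run of whitespace
    if len(tokens) <= num_of_tokens:
        return text
    return " ".join(tokens[:num_of_tokens]) + suffix


if __name__ == "__main__":
    print(truncate_text("one two three four five", 3, suffix="..."))
    print(truncate_text("short text", 800))
```

Because str.split() knows nothing about punctuation or sentence boundaries, a token such as "end." counts as one word and the cut may fall mid-sentence.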