Rerunning a Pipelet

Pipelets, like other enrichments, are only executed on items that are loaded into a project after the pipelet has been configured. However, it is often desirable to run the pipelet on all previously loaded items without having to reload them into Squirro. Pipelets can be rerun either from the command line or directly in the user interface.

Command Line

Warning

Rerunning a pipelet with the command line tool is implemented using the Update Item API. Because of this, only changes to the keywords are applied to an item. It is currently not possible to update any of the other Item Fields when rerunning a pipelet from the command line.

You can work around this limitation by rerunning the pipelet as an individual pipeline step in the pipeline editor.
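Before a command-line rerun, it can help to check locally whether a pipelet modifies anything other than keywords, because such changes would not be applied. The following is a minimal sketch. The helper name non_keyword_changes and the example_consume function are hypothetical. The helper accepts any callable that takes and returns an item dict, for example a pipelet instance's consume method.

```python
import copy


def non_keyword_changes(item, consume):
    """Return the top-level fields, other than 'keywords', that `consume` changes.

    Hypothetical helper: a command-line rerun only applies keyword changes,
    so any field listed here would not be persisted by `pipelet rerun`.
    """
    before = copy.deepcopy(item)
    after = consume(copy.deepcopy(item))  # work on a copy so `item` is untouched
    fields = (set(before) | set(after)) - {'keywords'}
    return sorted(f for f in fields if before.get(f) != after.get(f))


def example_consume(item):
    # Hypothetical pipelet logic that touches both a keyword and the title.
    item.setdefault('keywords', {})['reviewed'] = ['yes']
    item['title'] = item.get('title', '').upper()
    return item


print(non_keyword_changes({'title': 'Hello', 'body': 'x'}, example_consume))
# → ['title']
```

A non-empty result means that part of the pipelet's output would only take effect when it is rerun from the pipeline editor.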

The basic command to rerun a pipelet is:

pipelet rerun --cluster CLUSTER --token TOKEN --project-id PROJECT mypipelet.py

This executes the pipelet contained in mypipelet.py on all the items in the given project (see Connecting to Squirro for the cluster, token and project options).

You can limit the rerun to a subset of the project’s items by specifying a query (the connection options are omitted here for brevity):

pipelet rerun --query 'big data' mypipelet.py

To pass in configuration that the pipelet needs, use the --config parameter, which takes a JSON string:

pipelet rerun --config '{"file":"test.txt"}' mypipelet.py
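When reruns are scripted, writing the JSON by hand inside shell quotes is error-prone. The sketch below builds the argument list from a Python dict using only the options shown on this page. The helper name build_rerun_command is hypothetical, and the placeholder values stand in for real connection settings.

```python
import json
import shlex


def build_rerun_command(pipelet_file, cluster, token, project_id,
                        query=None, config=None):
    """Assemble the argument list for `pipelet rerun` (hypothetical helper).

    `config` is a Python dict serialized with json.dumps, so no manual
    JSON quoting is needed.
    """
    cmd = ['pipelet', 'rerun',
           '--cluster', cluster,
           '--token', token,
           '--project-id', project_id]
    if query is not None:
        cmd += ['--query', query]
    if config is not None:
        cmd += ['--config', json.dumps(config)]
    cmd.append(pipelet_file)
    return cmd


cmd = build_rerun_command('mypipelet.py', 'CLUSTER', 'TOKEN', 'PROJECT',
                          query='big data', config={'file': 'test.txt'})
print(shlex.join(cmd))  # shell-quoted form, useful for logging
# To execute it, pass the list to subprocess.run(cmd, check=True).
```

Passing the list directly to subprocess.run avoids shell quoting entirely. shlex.join is only used here to print a readable version.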

Versioning

A common use of pipelet rerun is to change the way some keywords are calculated. To easily update the data, it is recommended to introduce a separate keyword for the pipelet’s version. This way, the version can be incremented when the logic is improved, and the rerun command can be applied to all older items.

Take for example this pipelet:

import re

from squirro.sdk import PipeletV1, require


VERSION = 1


@require('log')
class PricePipelet(PipeletV1):
    """Extract the price of the item from the body.

    Searches for the first number prefixed with $ and uses that as the price.
    """
    def __init__(self, config):
        self.config = config

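    def consume(self, item):
        body = item.get('body')
        kw = item.setdefault('keywords', {})
        # Always record which version of the logic processed this item.
        kw['price_version'] = [VERSION]

        if not body:
            return item

        # Raw string so that \$ and \d reach the regex engine unchanged.
        match = re.search(r'\$(\d+)', body)
        if not match:
            return item

        # Keyword values are lists, like price_version above.
        kw['price'] = [int(match.group(1))]
        return item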

This sets the price_version facet to the value 1. Declare this facet in the project as a numeric facet.
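To try the extraction logic without the Squirro SDK installed, the body of consume can be exercised as a plain function. This is a hypothetical stand-in (extract_price is not part of squirro.sdk) that follows the pipelet's logic. Keyword values are written as lists, the same shape used for price_version.

```python
import re

VERSION = 1


def extract_price(item):
    """Local stand-in for PricePipelet.consume (hypothetical, no SDK needed)."""
    body = item.get('body')
    kw = item.setdefault('keywords', {})
    kw['price_version'] = [VERSION]  # set even when no price is found
    if not body:
        return item
    match = re.search(r'\$(\d+)', body)  # first number prefixed with $
    if not match:
        return item
    kw['price'] = [int(match.group(1))]
    return item


print(extract_price({'body': 'Now only $42, down from $50.'})['keywords'])
# → {'price_version': [1], 'price': [42]}
```

Items without a body or without a dollar amount still receive price_version. This matters for the rerun query below, because those items count as processed.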

When the pipelet logic is updated, increment the version to VERSION = 2. Then call rerun as follows:

pipelet rerun --cluster CLUSTER --token TOKEN --project-id PROJECT --query '-price_version:2' price_pipelet.py

This runs the pipelet on all items that do not have price_version set to the value 2: either the value has not been set at all, or it is still set to an older version.
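The selection logic of that query can be illustrated locally. The sketch below derives the query string from the version number and mimics which items '-price_version:N' is meant to match. The functions rerun_query and needs_rerun are hypothetical. This is an illustration of the intent, not Squirro's query engine.

```python
VERSION = 2


def rerun_query(version):
    """Build the query that selects items not yet processed by `version`."""
    return f'-price_version:{version}'


def needs_rerun(item, version):
    """True if price_version is missing or does not contain `version`."""
    versions = item.get('keywords', {}).get('price_version', [])
    return version not in versions


items = [
    {'id': 'a'},                                        # never processed
    {'id': 'b', 'keywords': {'price_version': [1]}},    # older version
    {'id': 'c', 'keywords': {'price_version': [2]}},    # already current
]
print(rerun_query(VERSION))                                  # → -price_version:2
print([i['id'] for i in items if needs_rerun(i, VERSION)])  # → ['a', 'b']
```

Deriving the query from the same VERSION constant the pipelet uses keeps the two from drifting apart when the version is bumped.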

User Interface

To rerun a pipelet in the user interface, follow the instructions for rerunning an individual pipeline step in the pipeline editor.

Note

When you use the rerun functionality in the pipeline editor, changes are not limited to keywords. You can also change Item Fields such as title or created_at.

Pipelet Features

  • Number of items: only pipelets that return exactly one item are supported. There is no support for pipelets that return None (to remove an item from the index) or that yield more than one item.
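A pipelet can be checked against this restriction before a rerun. The following is a minimal, hypothetical pre-flight check (check_rerun_compatible is not a Squirro API). It rejects a consume callable that returns None or a generator. Treating every generator as unsupported is a conservative assumption: the restriction above only rules out yielding more than one item.

```python
import inspect


def check_rerun_compatible(consume, item):
    """Return the single item produced by `consume`, or raise.

    Hypothetical helper: None (item removal) and generators (possibly
    several items) are both rejected.
    """
    result = consume(item)
    if result is None:
        raise ValueError('consume returned None; item removal is not supported')
    if inspect.isgenerator(result):
        raise ValueError('consume returned a generator; yielding items is not supported')
    if not isinstance(result, dict):
        raise TypeError(f'expected an item dict, got {type(result).__name__}')
    return result


print(check_rerun_compatible(lambda i: i, {'id': 'x'}))
# → {'id': 'x'}
```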