Pipelets Tutorial#

Python Engineer, Project Creator

This tutorial is aimed towards Squirro newcomers and provides the fundamentals of how Pipelets are developed and used in the Squirro pipeline.

It is best suited for python engineers, but project creators with an understanding of the Python programming language may also find themselves developing pipelets.

Objectives#

Upon completion of this tutorial, you should be able to:

  • write your own pipelet

  • access contents and keywords of the item within the pipelet

  • validate, test and upload your pipelet

  • include your pipelet as a custom step in the Squirro pipeline

  • set a configuration in the pipeline editor and pass it to the pipelet

  • log pipelet output

Overview#

In this tutorial, we will work with the following components: pipelets, items, Squirro pipeline.

Pipelets are essentially plugins to apply custom enrichments or processing steps to items that run through the Squirro pipeline.

Items are represented in JSON format. A Squirro item has a number of properties, such as a body, title, id and keywords.

The Squirro pipeline applies different transformation, processing and enrichment steps on each item that runs through it.

The tutorial guides you through the steps to develop a pipelet that extracts the publication date from the text body of an item and stores the extracted date as keyword along with the item.

To follow the instructions you need an installation of the Squirro toolbox and access to a Squirro installation (best on a server or in a VM). You also need to be familiar with loading data into Squirro and the basic concepts of the pipeline editor.

If you haven’t already done so, it is now a good moment to create a Python virtual environment and install the Squirro toolbox.

Write the Pipelet#

In a new directory, start off by creating the pipelet file publication_date.py with the following content:

from squirro.sdk import PipeletV1

class PublicationDatePipelet(PipeletV1):

    def __init__(self, config):

        self.config = config

    def consume(self, item):
        # process item here
        return item

    @staticmethod
    def getArguments():

        return []

The above code is the minimal structure we need for our pipelet. We first import the required base class for pipelets (PipeletV1). Here you can also import any other packages that you need to make the pipelet work correctly. For this pipelet we need the regular expression operations package re (https://docs.python.org/3/library/re.html). Add the re import to your script:

import re
from squirro.sdk import PipeletV1

class PublicationDatePipelet(PipeletV1):

We then create the class PublicationDatePipelet(PipeletV1) for our pipelet which inherits from the PipeletV1 class.

Next, implement the constructor for the pipelet:

def __init__(self, config):

    self.config = config

As we will see later, we can pass a configuration to the pipelet. By default, the config is only available in __init__(). Assigning it to self.config will make it available for the rest of the pipelet code to use.

Then we can start implementing the consume() method of the pipelet. This method takes the contents of a Squirro item as the input, and returns a processed version of the same Squirro item:

def consume(self, item):
    # process item here
    return item

Let’s modify the consume() method to perform the extraction of the publication date:

def consume(self, item):
    # process item here

    content = item.get("body", "")
    date_dd_mm_yyyy = re.compile(
        r"(0[1-9]|[12][0-9]|3[01])[-/.](0[1-9]|1[012])[-/.]((?:19|20)\d\d)"
    )

The first step for this pipelet is to fetch the text body of the item. The text body is where we will look for a matching publication date. Then we define the regular expression pattern we want to match in the text. A good tool to test regular expression patterns is https://regex101.com/. The pattern is designed to match dates in the format DD.MM.YYYY (or DD-MM-YYYY) where the years are restricted from 1900 to 2099.

Now let’s try to match the defined pattern in the text and format the matched date according to the format we require for datetime keywords (see also here). Finally write the matched date to the keyword publication_date.

def consume(self, item):
    # process item here

    content = item.get("body", "")
    date_dd_mm_yyyy = re.compile(
        r"(0[1-9]|[12][0-9]|3[01])[-/.](0[1-9]|1[012])[-/.]((?:19|20)\d\d)"
    )

    try:
        matched_date = date_dd_mm_yyyy.findall(content)
        # Let's take the first match for this tutorial
        # and reverse and UTC format YYYY-MM-DD
        publication_date = "-".join(
            matched_date[0][::-1]
        )
        item["keywords"]["publication_date"] = [publication_date + "T12:00:00"]

    except Exception as e:
        # pass for now
        pass

    return item

Note, keywords are of type list. Make sure to write your publication date into a list.

Validate, Test and Upload#

Open a terminal and navigate to the location of your pipelet. Start by validating the pipelet to verify that there are no errors in the pipelet code:

pipelet validate publication_date.py

If all is fine, you should see:

OK: Validated and loaded

If not, you will be presented with the error.

In order to test the pipelet, we require test items. At the location of your pipelet, create the file test_item.json with the following content:

{
    "id": "test-item-1",
    "title": "Test item date extraction",
    "created_at": "2021-10-25T09:20:30",
    "body": "17.11.2021 - Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laborisnisi ut aliquip ex ea commodo consequat. This date is not matched 21.10.2021.",
    "keywords": {
        "author": [
            "Cicero"
        ]
    }
}

Now we can test the pipelet. To do so, run the pipelet consume command for the pipelet and specify the test items using the -i flag:

pipelet consume publication_date.py -i test_item.json

Observe the added keyword publication_date in the output:

Loading items...
Loading test_item.json ...
Loaded.
Consuming item test-item-1
yielded item
{'body': '17.11.2021 - Lorem ipsum dolor sit amet, consectetur adipiscing '
        'elit, sed do eiusmod tempor incididunt ut labore et dolore magna '
        'aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco '
        'laboris nisi ut aliquip ex ea commodo consequat. This date is not '
        'matched 21.10.2021.',
'created_at': '2021-10-25T09:20:30',
'id': 'test-item-1',
'keywords': {'author': ['Cicero'],
            'publication_date': ['2021-11-17T12:00:00']},
'title': 'Test item date extraction'}

This keyword is not present in the item (see test_item.json file) and was just added by our pipelet to the item’s keywords.

Having successfully validated and tested the pipelet, it is now time to upload it to your Squirro project using the upload command:

pipelet upload --token <your_token> --cluster <cluster> publication_date.py "Date Extraction"

Specify your token and the cluster to where the pipelet should be uploaded. Also make sure to give your pipelet a name such that you easily find it later on. In this example we name the pipelet “Date Extraction”

If the upload was successful, you see the following message in your terminal:

OK: pipelet squirro/Date Extraction uploaded and ready to use

Switch to the instance where you just uploaded your pipelet to. If not yet done, create a project. In the Setup Space, navigate to the PIPELINE tab. Edit the Standard or create a new one and add your pipelet to the Enrichment section. You can search for the pipelet by it’s name, just start typing “Date” and it should show up. Having added the pipelet to the workflow, every item that runs through that workflow will now also run through the “Date Extraction” step.

Let’s now run an item through the workflow. In the Setup Space of your project, switch to the DATA tab. Click the plus icon in the top right corner and select the Data Import tab. Use the JSON data loader, upload the test_item.json, map the item fields and keywords, make sure the Standard is selected and hit the SAVE button. Now you can inspect the uploaded item on the EXPLORE tab and verify that the publication_date keyword was added to the item.

Pipelet Configuration#

Let’s make some changes to our pipelet and add a configurable option.

In the current version of the pipelet, we are only checking the body of an item for the date. We add the option to also check the title for a publication date. Let’s start by defining the configuration argument.

Change the get getArguments() method as follows:

@staticmethod
def getArguments():
    return [
        {
            "name": "match_in_title",
            "display_label": "Match date in title",
            "type": "bool",
            "default": False,
        }
    ]

This adds a configuration option of type bool which shows as a checkbox under the STEP PROPERTIES of the pipelet in the pipeline editor. For other types of configuration see here. The display_label defines the text next to the checkbox. The default is set to False, showing as a an un-checked checkbox.

We can retrieve the configuration property in the constructor and make it available to the pipelet in the following way:

def __init__(self, config):
    self.config = config
    self.match_in_title = self.config.get("match_in_title")

Now, let’s add the check for the title matching in the consume() method. Make the following changes:

try:
    matched_date = date_dd_mm_yyyy.findall(content)
    # Let's take the first match for this tutorial
    # and reverse and UTC format YYYY-MM-DD
    if not matched_date and self.match_in_title:
        self.log.error(f"Matching date in title..")
        title = item.get("title", "")
        matched_date = date_dd_mm_yyyy.findall(title)

    self.log.error(f"Matched date: {matched_date}")

    publication_date = "-".join(matched_date[0][::-1])
    item["keywords"]["publication_date"] = [publication_date + "T12:00:00"]

except Exception as e:
    # pass for now
    pass

Note, we only check the title if no matching date is found in the text body and the configuration is set to check also the title.

Let’s create a second item to test the configuration and the new behavior of the pipelet. Create the file test_item_title.json with content:

{
    "id": "test-item-2",
    "title": "Test item date extraction - 17.11.2021",
    "created_at": "2021-10-25T09:20:30",
    "body": "Lorem ipsum dolor sit amet, consectetur adipiscing elit,
    sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
    Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris
    nisi ut aliquip ex ea commodo consequat.",
    "keywords": {
        "author": [
            "Cicero"
        ]
    }
}

In the item above, a date in the title field of the item was added.

You can test the pipelet with a specified configuration by passing the -c flag with the pipelet consume command:

pipelet consume publication_date.py -i test_item_title.json -c '{"match_in_title": true}'

Check and observe the behavior of the configuration setting by switching to {"match_in_title": false} in the above command.

Pipelet Documentation#

Now that we finished the development of the pipelet, it is good practice to write a small documentation of the pipelet class using doc-strings. For example:

class PublicatioDatepipelet(PipeletV1):
"""Extract the publication date from text content.

The pipelet matches dates of the format DD.MM.YYYY in the text body of items.
If a date is matched, it is added as keyword publication_date to the item.
Optionally, dates are also matched in the title if no match in the body is found.
"""

The first sentence is used as a summary in the user interface and shows up when hovering over the pipelet in the pipeline editor. All the remaining text is used as a more detailed description and describes the expected configuration. In the pipeline editor, the description shows when clicking the information icon in editing mode.

After editing the pipelet, we need to upload it again:

pipelet upload --token <your_token> --cluster <cluster> publication_date.py "Date Extraction"

Go and check it out in the pipeline editor. Hovering over the Date Extraction step shows the summary sentence you just added. Editing the pipelet, you see the checkbox that represents the boolean configuration property that was added. And clicking on the information icon you can read the detailed description.

Pipelet Logs#

It might be helpful to log some output of your pipelet, for example, for debugging purposes.

We can use the log dependency of pipelets for logging. For more information see here. By using the @require("log") decorator, the log dependency makes a logging.Logger instance from Python’s standard logging framework available to the pipelet class. So let’s import require and add the decorator to our class:

import re
from squirro.sdk import PipeletV1, require

@require("log")
class PublicatioDatepipelet(PipeletV1):

In the consume() method, log the matched date:

self.log.debug(f"Matched date: {matched_date}")

We can also log the Exception, for example:

except Exception as e:
    self.log.error(f"Error while matching date in content: {str(e)}")

The logging can be defined on different levels: debug, info, warn, error, critical (see https://docs.python.org/3/howto/logging.html). Above we have added logging on level debug and error.

It can be useful to control the log-level of the pipelet. For example, add the following in the constructor of the pipelet:

self.log.setLevel("DEBUG")

Now run

pipelet consume publication_date.py -i item_title.json -c '{"match_in_title": true}'

and observe the Matched date:... line. Change the "match_in_title" to false and run the command again to observe the Exception that is now logged as error.

On your server, the output from pipelets is logged to the plumber.log file which can be checked in a terminal like this:

tail -f /var/log/squirro/plumber/plumber.log

How-To Guides#

How to Access File Contents in Pipelets

How to Use Pipelets With the Squirro Data Loader