Pipelets Tutorial#
Python Engineer, Project Creator
This tutorial is aimed towards Squirro newcomers and provides the fundamentals of how Pipelets are developed and used in the Squirro pipeline. It is best suited for python engineers, but project creators with an understanding of the Python programming language may also find themselves developing pipelets. Note that adding pipelets to a Squirro instance requires server-level administrator privileges.
Objectives#
Upon completion of this tutorial, you should be able to:
write your own pipelet
access contents and keywords of the item within the pipelet
validate, test and upload your pipelet
include your pipelet as a custom step in the Squirro pipeline
set a configuration in the pipeline editor and pass it to the pipelet
log pipelet output
Overview#
In this tutorial, we will work with the following components: pipelets, items, Squirro pipeline.
Pipelets are essentially plugins to apply custom enrichments or processing steps to items that run through the Squirro pipeline.
Items are represented in JSON format. A Squirro item has a number of properties, such as a body, title, id and keywords.
The Squirro pipeline applies different transformation, processing and enrichment steps on each item that runs through it.
The tutorial guides you through the steps to develop a pipelet that extracts the publication date from the text body of an item and stores the extracted date as keyword along with the item.
To follow the instructions you need an installation of the Squirro toolbox and access to a Squirro installation (best on a server or in a VM). You also need to be familiar with loading data into Squirro and the basic concepts of the pipeline editor.
If you haven’t already done so, it is now a good moment to create a Python virtual environment and install the Squirro toolbox.
Write the Pipelet#
In a new directory, start off by creating the pipelet file publication_date.py
with the following content:
from squirro.sdk import PipeletV1
class PublicationDatePipelet(PipeletV1):
def __init__(self, config):
self.config = config
def consume(self, item):
# process item here
return item
@staticmethod
def getArguments():
return []
The above code is the minimal structure we need for our pipelet. We first import the required base class for pipelets (PipeletV1
). Here you can also import any other packages that you need to make the pipelet work correctly. For this pipelet we need the regular expression operations package re
(https://docs.python.org/3/library/re.html). Add the re
import to your script:
import re
from squirro.sdk import PipeletV1
class PublicationDatePipelet(PipeletV1):
We then create the class PublicationDatePipelet(PipeletV1)
for our pipelet which inherits from the PipeletV1
class.
Next, implement the constructor for the pipelet:
def __init__(self, config):
self.config = config
As we will see later, we can pass a configuration to the pipelet. By default, the config is only available in __init__()
. Assigning it to self.config
will make it available for the rest of the pipelet code to use.
Then we can start implementing the consume()
method of the pipelet. This method takes the contents of a Squirro item as the input, and returns a processed version of the same Squirro item:
def consume(self, item):
# process item here
return item
Let’s modify the consume()
method to perform the extraction of the publication date:
def consume(self, item):
# process item here
content = item.get("body", "")
date_dd_mm_yyyy = re.compile(
r"(0[1-9]|[12][0-9]|3[01])[-/.](0[1-9]|1[012])[-/.]((?:19|20)\d\d)"
)
The first step for this pipelet is to fetch the text body of the item. The text body is where we will look for a matching publication date. Then we define the regular expression pattern we want to match in the text. A good tool to test regular expression patterns is https://regex101.com/. The pattern is designed to match dates in the format DD.MM.YYYY (or DD-MM-YYYY) where the years are restricted from 1900 to 2099.
Now let’s try to match the defined pattern in the text and format the matched date according to the format we require for datetime keywords (see also here). Finally write the matched date to the keyword publication_date
.
def consume(self, item):
# process item here
content = item.get("body", "")
date_dd_mm_yyyy = re.compile(
r"(0[1-9]|[12][0-9]|3[01])[-/.](0[1-9]|1[012])[-/.]((?:19|20)\d\d)"
)
try:
matched_date = date_dd_mm_yyyy.findall(content)
# Let's take the first match for this tutorial
# and reverse and UTC format YYYY-MM-DD
publication_date = "-".join(
matched_date[0][::-1]
)
item["keywords"]["publication_date"] = [publication_date + "T12:00:00"]
except Exception as e:
# pass for now
pass
return item
Note, keywords are of type list
. Make sure to write your publication date into a list.
Validate, Test and Upload#
Open a terminal and navigate to the location of your pipelet. Start by validating the pipelet to verify that there are no errors in the pipelet code:
pipelet validate publication_date.py
If all is fine, you should see:
OK: Validated and loaded
If not, you will be presented with the error.
In order to test the pipelet, we require test items. At the location of your pipelet, create the file test_item.json
with the following content:
{
"id": "test-item-1",
"title": "Test item date extraction",
"created_at": "2021-10-25T09:20:30",
"body": "17.11.2021 - Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laborisnisi ut aliquip ex ea commodo consequat. This date is not matched 21.10.2021.",
"keywords": {
"author": [
"Cicero"
]
}
}
Now we can test the pipelet. To do so, run the pipelet consume
command for the pipelet and specify the test items using the -i
flag:
pipelet consume publication_date.py -i test_item.json
Observe the added keyword publication_date
in the output:
Loading items...
Loading test_item.json ...
Loaded.
Consuming item test-item-1
yielded item
{'body': '17.11.2021 - Lorem ipsum dolor sit amet, consectetur adipiscing '
'elit, sed do eiusmod tempor incididunt ut labore et dolore magna '
'aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco '
'laboris nisi ut aliquip ex ea commodo consequat. This date is not '
'matched 21.10.2021.',
'created_at': '2021-10-25T09:20:30',
'id': 'test-item-1',
'keywords': {'author': ['Cicero'],
'publication_date': ['2021-11-17T12:00:00']},
'title': 'Test item date extraction'}
This keyword is not present in the item (see test_item.json
file) and was just added by our pipelet to the item’s keywords.
Having successfully validated and tested the pipelet, a server administrator can upload it to a Squirro project using the upload
command:
pipelet upload --token <your_token> --cluster <cluster> publication_date.py "Date Extraction"
Specify the token
and cluster
to where the pipelet should be uploaded. Also make sure to give the pipelet a name such that you easily find it later on. In this example we name the pipelet Date Extraction.
If the upload is successful, you see the following message in your terminal:
OK: pipelet squirro/Date Extraction uploaded and ready to use
Switch to the instance where you just uploaded your pipelet to. If not yet done, create a project. In the Setup Space, navigate to the PIPELINE tab. Edit the Standard or create a new one and add your pipelet to the Enrichment section. You can search for the pipelet by it’s name, just start typing “Date” and it should show up. Having added the pipelet to the workflow, every item that runs through that workflow will now also run through the “Date Extraction” step.
Let’s now run an item through the workflow. In the Setup Space of your project, switch to the DATA tab. Click the plus icon in the top right corner and select the Data Import tab. Use the JSON data loader, upload the test_item.json
, map the item fields and keywords, make sure the Standard is selected and hit the SAVE button.
Now you can inspect the uploaded item on the EXPLORE tab and verify that the publication_date
keyword was added to the item.
Pipelet Configuration#
Let’s make some changes to our pipelet and add a configurable option.
In the current version of the pipelet, we are only checking the body of an item for the date. We add the option to also check the title for a publication date. Let’s start by defining the configuration argument.
Change the get getArguments()
method as follows:
@staticmethod
def getArguments():
return [
{
"name": "match_in_title",
"display_label": "Match date in title",
"type": "bool",
"default": False,
}
]
This adds a configuration option of type bool
which shows as a checkbox under the STEP PROPERTIES of the pipelet in the pipeline editor. For other types of configuration see here. The display_label
defines the text next to the checkbox. The default
is set to False
, showing as a an un-checked checkbox.
We can retrieve the configuration property in the constructor and make it available to the pipelet in the following way:
def __init__(self, config):
self.config = config
self.match_in_title = self.config.get("match_in_title")
Now, let’s add the check for the title matching in the consume()
method. Make the following changes:
try:
matched_date = date_dd_mm_yyyy.findall(content)
# Let's take the first match for this tutorial
# and reverse and UTC format YYYY-MM-DD
if not matched_date and self.match_in_title:
self.log.error(f"Matching date in title..")
title = item.get("title", "")
matched_date = date_dd_mm_yyyy.findall(title)
self.log.error(f"Matched date: {matched_date}")
publication_date = "-".join(matched_date[0][::-1])
item["keywords"]["publication_date"] = [publication_date + "T12:00:00"]
except Exception as e:
# pass for now
pass
Note, we only check the title if no matching date is found in the text body and the configuration is set to check also the title.
Let’s create a second item to test the configuration and the new behavior of the pipelet. Create the file test_item_title.json
with content:
{
"id": "test-item-2",
"title": "Test item date extraction - 17.11.2021",
"created_at": "2021-10-25T09:20:30",
"body": "Lorem ipsum dolor sit amet, consectetur adipiscing elit,
sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris
nisi ut aliquip ex ea commodo consequat.",
"keywords": {
"author": [
"Cicero"
]
}
}
In the item above, a date in the title
field of the item was added.
You can test the pipelet with a specified configuration by passing the -c
flag with the pipelet consume
command:
pipelet consume publication_date.py -i test_item_title.json -c '{"match_in_title": true}'
Check and observe the behavior of the configuration setting by switching to {"match_in_title": false}
in the above command.
Pipelet Documentation#
Now that we finished the development of the pipelet, it is good practice to write a small documentation of the pipelet class using doc-strings. For example:
class PublicationDatePipelet(PipeletV1):
"""Extract the publication date from text content.
The pipelet matches dates of the format DD.MM.YYYY in the text body of items.
If a date is matched, it is added as keyword publication_date to the item.
Optionally, dates are also matched in the title if no match in the body is found.
"""
The first sentence is used as a summary in the user interface and shows up when hovering over the pipelet in the pipeline editor. All the remaining text is used as a more detailed description and describes the expected configuration. In the pipeline editor, the description shows when clicking the information icon in editing mode.
After editing the pipelet, a server administrator needs to upload it again:
pipelet upload --token <your_token> --cluster <cluster> publication_date.py "Date Extraction"
Go and check it out in the pipeline editor. Hovering over the Date Extraction step shows the summary sentence you just added. Editing the pipelet, you see the checkbox that represents the boolean configuration property that was added. And clicking on the information icon you can read the detailed description.
Pipelet Logs#
It might be helpful to log some output of your pipelet, for example, for debugging purposes.
We can use the log
dependency of pipelets for logging. For more information about dependencies, see here. By using the @require("log")
decorator, the log
dependency makes three class attributes available for logging: log
, slog
, and dlog
. The log
attribute is a classic logging.Logger
instance from Python’s standard logging framework. Logs emitted by this logger are written to the service’s main log file (plumber.log
). The slog
attribute is an instance of a structured logger from the structlog
third-party library, pre-configured and ready to use. Logs emitted by this logger are visible in the Data Ingestion Logs
dashboard within the Squirro Monitoring
project. Finally, the dlog
attribute is a special type of logger, known as a dual logger. It emits the same message as both an unstructured and a structured log by utilizing the two aforementioned loggers.
Now, let’s import require
and add the decorator to our class:
import re
from squirro.sdk import PipeletV1, require
@require("log")
class PublicationDatePipelet(PipeletV1):
In the consume()
method, log the matched date using all available loggers:
self.log.debug(f"Matched date: {matched_date}")
self.slog.debug(f"Matched date: {matched_date}", key1="value1")
self.dlog.debug(f"Matched date: {matched_date}", key2="value2")
The message emitted with the log
attribute will be written to plumber.log
. The message emitted with the slog
attribute will appear in the Data Ingestion Logs
dashboard, with the provided keyword argument key1
being a label of the log record. The message emitted with the dlog
attribute will be visible in both locations, with the key2
keyword argument omitted in the unstructured record in plumber.log
.
We can also log an Exception
, for example:
except Exception as e:
self.log.error(f"Error while matching date in content: {str(e)}")
The logging can be defined on different levels: debug
, info
, warn
, error
, critical
(see https://docs.python.org/3/howto/logging.html). Above we have added logging on level debug
and error
.
It can be useful to control the log-level of the pipelet. For example, add the following in the constructor of the pipelet:
self.log.setLevel("DEBUG")
Now run
pipelet consume publication_date.py -i item_title.json -c '{"match_in_title": true}'
and observe the Matched date:...
line. Change the "match_in_title"
to false
and run the command again to observe the Exception
that is now logged as error.
As mentioned earlier, when using the log
attribute, the output from the pipelet is logged to the plumber.log
file, which can be checked in a terminal like this:
tail -f /var/log/squirro/plumber/plumber.log