Data Loader Plugin Reference#

DataSource Class#

This page serves as a reference for the Data Loader Plugins SDK, showing the various methods that need to be implemented on a data loader class.

See How To Write a Custom Data Loader Plugin for an introduction into how to create these classes.

class DataSource#
abstract connect(inc_column=None, max_inc_value=None)#

The connect method is called before loading any of the data.

In the connect method you should perform one-off actions, such as: :rtype: None

  • Authentication with the API provider

  • Opening and reading files

  • Establishing a connection to an external service

  • Resetting the cache state

You might need some, all or none of the above examples, depending on the nature of your data input.

Incremental Loading

The parameters inc_column and max_inc_value are used for incremental loading. They contain the name of the column on which incremental loading is done (specified on the command line with --incremental-column) and the maximum value of that column received in the previous load. Well-behaved data loader plugins should implement this property so that only results are returned where inc_column >= max_inc_value. The data loader takes care of all the required bookkeeping.

Data loader plugins that do not support incremental loading should raise an error when this option is specified:

def connect(self, inc_column=None, max_inc_value=None):
    if inc_column:
        raise ValueError('Incremental loading not supported.')

Often incremental column is only sensible and supported on one specific column. In that case, it is recommended to enforce that as well:

def connect(self, inc_column=None, max_inc_value=None):
    if inc_column and inc_column != 'updated_at':
        raise ValueError('Incremental loading is only supported on the updated_at column.')

When this is the case, you should also implement the getIncrementalColumns() method.

abstract disconnect()#

Disconnect from the source.

Analogous to the connect() method, disconnect is always called after finishing all data loading. If an exception is thrown anywhere in the script, disconnect will be called before the script is terminated.

Examples of actions that disconnect might handle are: :rtype: None

  • Closing the files

  • Closing any open connections

  • Saving the state of the most recent load into cache

The return value of disconnect() is ignored.

abstract getJobId()#

Return a job identifier, a unique string for each different data loading configuration.

The job identifier is used to distinguish possible multiple data sources, therefore each one should be able to generate a unique job ID.

It’s likely that, within the same project, you’ll use the same custom data loader for different sources, e.g.:

  • Loading from a different file

  • Authenticating with an external API with different credentials

  • Scraping a different website

So it is important that each of these jobs generates a job ID that changes depending on its custom parameters, e.g. filename, credentials, URL, etc.

This is used in order to prevent multiple runs of the same job (only for CLI mode, as jobs run via frontend are managed by the datasource service). If the job-id option is provided in the load script, the return value of getJobId() is ignored.

It is good practice to generate a hash from all the custom parameters, as shown in the example (note the use of repr built-in for safe representation of special characters, and finally encoding the string to string, as hashlib’s update method works on bytes):

def getJobId(self):
    # Generate a stable id that changes with the main parameters
    m = hashlib.sha256()
    m.update(repr(self.args.first_custom_param).encode("utf-8"))
    m.update(repr(self.args.second_custom_param).encode("utf-8"))
    job_id = m.hexdigest()
    return job_id
Return type:

str

Returns:

job identifier as a string

getArguments()#

Return the arguments that are expected by this data loader plugin.

These arguments are used to generate a form when using the data loader plugin in the user interface. They are also accepted on the command line for the data loader, and can also be passed in through the config.json for Known Entity Extraction (KEE).

The return value of this method is a list with each value being a dictionary that can contain the following keys: :rtype: list[ArgType]

  • name (mandatory). The name under which the argument will be made available to the plugin. The argument can be accessed with self.args.<name>.

    On the command line, this is also how the argument is passed in (with underlines escaped with a dash).

  • flag: The short command line flag for this argument.

  • help: The help string. If display_label is not provided, ‘help’ takes its place as a main label. Otherwise, it serves as a secondary help text.

  • display_label: The short label for this argument. Set explicitly to None to hide this argument from the UI.

  • required: True if this argument is mandatory.

  • default: The default value of the option.

  • type: The data type expected. The following values are possible:

    • string (default)

    • int

    • float

    • bool

    • password: will be used as a string in the plugin the argument is provided as an file path, which can be opened as usual with open().

      In the user interface, this allows uploading of files by the user.

    • code: a multi-line code field. A code-editor-like box will show when creating the loader from user interface. This can be useful for example for SQL queries that should be provided by the project creator.

  • action: Action to perform when this flag is given.

    • store (default): assigns the value.

    • store_true: will turn the value of the argument to True if provided, False if not provided. Turned into a checkbox in the UI.

    • store_false: will turn the value of the argument to False if provided, True otherwise. Turned into a checkbox in the UI.

  • nargs: Request multiple values for this field. Set to "*" if multiple values can be provided for this value.``”+”`` can be used if more than one value must be provided. This is used if you want the user to provide a list of arguments, such as list of IDs, URLs etc. The resulting value is made available as a list in the source code.

  • advanced: Set to True if the option should be put into the “Advanced Options” section.

An example implementation:

def getArguments(self):
    return [
        {
            "name": "excel_sheet",
            "default": 0,
            "help": "Excel sheet name. Default: get first sheet.",
        },
        {
            "name": "token",
            "help": "User token",
            "required": true,
        }
    ]
abstract getDataBatch(batch_size)#

Get data from source on batches.

This method is at the core of data loading. It yields batches of records from the data source as a dictionary structure. These records are usually referred to as “rows” in the context of the data loader. The data loader will then use this structure and convert it into Squirro items (see Item Format) using the user-provided mapping configuration.

Examples for this method include:

  • Iterating through a folder and returning each .txt file as a row.

  • Connecting to an RSS feed and returning each news feed article as a row.

  • Connecting to a Redis instance and iterating through keys beginning with "article_" prefix, turning each into a row (just bear in mind that the connecting part should be done within the connect() method).

This method is not expected to return anything. Rather, it will yield items in batches. This allows the data loader to handle very large data loads without overloading the memory.

Batching the rows (as opposed to simply yielding individual rows) makes the process slightly more cumbersome for the data loader plugin, but is essential for performance. By knowing the batch size, the plugin can for example request the correct page size from the database.

It is important to remember that once a batch is yielded, you should clear the old items - otherwise you will return these items twice and also defeat the purpose of efficient processing.

Parameters:

batch_size (int) – How many records to return in each batch.

Return type:

Generator[list[dict[str, Any]], None, None]

Returns:

a list of dictionaries

abstract getSchema()#

Return the schema of the dataset.

This is a list of all the source columns and is used to show the valid column options and to expand the wildcards inside the facets configuration file.

If this is dynamically decided, it may make sense to return all the keys from the first result from getDataBatch. In the example above (where a custom method getRecords does the actual work), this could be implemented like this:

def getSchema(self) -> List[str]:
    fields = set()
    batch = next(self.getDataBatch(10), [])
    for row in batch:
        fields |= set(row.keys())
    return sorted(fields)
Return type:

list[str]

Returns:

A list containing the names of the columns retrieved from the source.

property preview_mode: bool#

Returns True if the data loader plugin is in preview mode.

See Data Loader Plugin Preview for how to deal with this.

property key_value_cache: Cache#

Provides a key-value type cache class implementation with expiring keys.

It can be used, where it is useful and time-saving to preserve the state of some data, but it is not critical. A common example is when your loader makes repeated calls to ask some server for metadata. Let’s say this metadata is expected to change on average every few days. So instead of fetching this metadata with every call, you can cache it and re-use the cached response.

The data will be kept for TTL (time to live), which defaults to a week. However the data may be evicted earlier, as Squirro implements an LRU (least-recently used) algorithm for this data, after a specific memory threshold is met.

This cache can be used like a simple dictionary:

# Retrieving a value
my_key = self.key_value_cache.get('my_key')

# Setting the value
self.key_value_cache['my_key'] = 'hello world'
Returns:

squirro.common.cache.Cache instance.

property key_value_store: Cache#

Provides a key-value type cache class implementation with non-expiring keys.

This is thus a permanent store of information. It should be used for application critical data.

The most common use case of key_value_store is preserving the state of the last run of the dataloader. This is especially critical in long-running data loading jobs, which index thousands of items. Losing the state of what was last loaded can be costly, both in terms of time spent reloading, as well as potential cost incurred as a result of making extra API connections (in the case you have a paid subscription with data provider).

The data written into key_value_store is kept until the user explicitly clears the data. Resetting a data source (using the Reset option in the user interface or --reset on the command line) will achieve this.

Returns:

squirro.common.cache.Cache instance.

getDefaultSourceName()#

Returns the default name to be used for the data source.

This method is used to suggest a default title for the source created in the user interface. If this method has not been implemented, no default title suggestion will be provided. The arguments are available in this method, so this can be used to e.g. suggest a title based on a file name or connection.

For example:

def getDefaultSourceName(self):
    basename = os.path.basename(self.args.filename)
    return f'XYZ data from {basename}'
Return type:

str

Returns:

str

getIncrementalColumns()#

Return available incremental columns.

This is used in the user interface to show the fields for which incremental loading is supported (see Incremental Loading).

There are three possible scenarios for its return value: :rtype: Optional[list[str]]

  • A list of strings is returned. This will provide those values as options to the user to select incremental loading.

  • An empty list is returned (this is the default). This will allow the user to select any column returned by getSchema().

  • None is returned. This will hide the incremental loading options.

Cache Class#

This class is used for cache and state handling as described in API for Caching and Custom State Management.

class Cache#

Class which provides a common interface for all derived cache classes. The methods to be implemented throw an exception by default.

Provides an API like the built-in dictionary class.

__init__()#
__setitem__(key, value)#

Sets the value for the given key.

__getitem__(key)#

Returns the value for the given key.

Raises a KeyError exception when the key does not exist.

__delitem__(key)#

Deletes the value for the given key from the cache.

Raises a KeyError exception when the key does not exist.

__contains__(key)#

Checks if the key exists. Returns True or False.

get(key, default=None)#

Returns the value for the key if it’s in the cache. Returns the specified default value otherwise.

clear(*args, **kwargs)#

Clears the cache by removing it’s entire content.