How To Write a Custom 1-Click Connector#

This tutorial walks step by step through building a custom 1-click connector. It covers the whole process, from setting up the environment to uploading the custom plugin. Code taken from an already-built connector shows how the individual parts work and illustrates best practices for building 1-click connectors.

Introduction#

1-click connectors are an easy way to fetch data from different sources without any configuration on the user side. Thanks to OAuth2 authentication and pre-defined custom mappings, the whole loading process is intuitive and takes little more than one click.

Prerequisites#

  • To get started, install the Squirro Toolbox.

  • You will also need access to a Squirro server.

Tip: A self-service instance is a good way to get started if you do not yet have access to a Squirro server.

Getting Started#

This tutorial uses the code examples from the official OneDrive connector.

Setting up an OAuth app#

The first step in creating a 1-click connector is setting up an OAuth app. The process depends heavily on the provider. The steps that are necessary in most cases are:

  • Creating an app

  • Defining scopes

  • Providing redirect URL

At the end of that process, you should have access to client keys.

In most cases these are called Client ID and Client Secret, but the naming may differ depending on the provider (Dropbox, for example, calls them App Key and App Secret).

There are some rules which should be followed:

Name of the application#

When creating an app only for local testing, don’t use names that may later be needed for official apps, e.g. “Squirro” or “Squirro OneDrive Connector”. With some providers this makes it impossible to reuse these names for official apps later.

When creating an official Squirro app we follow the naming convention:

  • Squirro <PROVIDER> Connector

Or, if the above convention cannot be used:

  • Squirro <PROVIDER>

Scopes

As a general rule, choose the most narrowly focused scopes possible, and avoid requesting scopes that the app does not actually need.

Redirect URL

When creating an app for local testing with a Squirro in a Box instance, use the localhost redirect URL, e.g. http://localhost:8300/dataloader/onedrive_plugin/pl/azure/authorized

When creating an app to be used by a data loader deployed on a self-service server, please use the following URL:

  • https://start.squirro.com/oauth2_middleman

Example:

  • Squirro OneDrive Connector -> https://start.squirro.com/oauth2_middleman

Configuring INI files#

The OAuth2 Client ID and Client Secret should be set in Squirro’s configuration files. To do that, edit the /etc/squirro/common.ini file and add the following lines:

[dataloader]
<provider>_client_id = CLIENT ID
<provider>_client_secret = CLIENT SECRET

Replace <provider> with the name of the related plugin, e.g.:

onedrive_client_id = CLIENT ID
onedrive_client_secret = CLIENT SECRET

Note: Use separate configuration keys, even if multiple plugins use the same OAuth2 provider. Each plugin should be configured to use its own OAuth2 application.

For a local setup without HTTPS, also edit /etc/squirro/frontend.ini and add the following lines:

[dataloader]
OAUTHLIB_INSECURE_TRANSPORT=true

NOTE: If the section [dataloader] already exists, copy the line into the existing section.

When registering the OAuth2 application for http://start.squirro.com, do not register a plugin-specific URL; register the oauth2_middleman endpoint URL instead.

Redirect URI example: https://start.squirro.com/oauth2_middleman.

Add the same URL to the Squirro configuration file /etc/squirro/frontend.ini:

[dataloader]
oauth2_middleman_url=https://start.squirro.com/oauth2_middleman

After making these changes, restart Squirro.

Creating a plugin#

Note

Labels were previously referred to as facets in the Squirro UI. You will still see references to facets in the code, and in some places within the Squirro UI. All facets can be treated as labels.

Example of file tree for OneDrive connector:

File                    | Required?       | Purpose
__init__.py             | Yes             | Marks it as Python package
README.md               | No              | Describes plugin installation steps, like OAuth2 app configuration
auth.py                 | Yes for 1-click | Deals with OAuth2 or other authorization process
dataloader_plugin.json  | Yes             | Contains references to other files and plugin name
facets.json             | Yes for 1-click | Describes common selection of labels (facets)
icon.png                | Yes             | Plugin icon
mappings.json           | Yes             | Mapping of data loader Items to Squirro fields
onedrive_plugin.py      | Yes             | Core code
pipeline_workflow.json  | Yes for 1-click | Default pipeline workflow for this plugin
requirements.txt        |                 | Describes Python dependencies
scheduling_options.json | Yes for 1-click | Defines scheduling defaults

When creating your own plugin, replace onedrive in the file and folder names with a name corresponding to the connector's target service. Use lowercase alphanumeric characters only.
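The naming rule and the file table above can be checked mechanically before uploading. The following is a minimal sketch, not part of the Squirro Toolbox: the helper names is_valid_connector_name and missing_plugin_files are hypothetical, and the list of required files is simply copied from the rows marked "Yes" or "Yes for 1-click" in the table.

```python
import re
from typing import Iterable, List

# Files the table marks as required for a 1-click connector.
# "{name}" stands for the connector name, e.g. "onedrive".
REQUIRED_FILES = [
    "__init__.py",
    "auth.py",
    "dataloader_plugin.json",
    "facets.json",
    "icon.png",
    "mappings.json",
    "{name}_plugin.py",
    "pipeline_workflow.json",
    "scheduling_options.json",
]


def is_valid_connector_name(name: str) -> bool:
    """Accept only lowercase alphanumeric names (hypothetical helper)."""
    return re.fullmatch(r"[a-z0-9]+", name) is not None


def missing_plugin_files(existing: Iterable[str], name: str) -> List[str]:
    """Return the required file names that are not in `existing`."""
    present = set(existing)
    expected = [pattern.format(name=name) for pattern in REQUIRED_FILES]
    return [file_name for file_name in expected if file_name not in present]


# Example: a folder that so far only contains two files.
missing = missing_plugin_files(["__init__.py", "auth.py"], "onedrive")
```

In practice the existing argument could be the result of os.listdir() on the plugin folder.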

Configuration files#

A 1-click connector requires pre-defined configuration files. These files speed up the plugin setup and make it much easier for the user to load data from the source. For an explanation of each of the listed files, see Data Loader Plugin Boilerplate.

dataloader_plugin.json#

{
    "title": "OneDrive",
    "description": "Index files from Microsoft OneDrive and SharePoint",
    "plugin_file": "onedrive_plugin.py",

    "auth_file": "auth.py",
    "auth_description": "Authenticate with Microsoft Azure for Squirro to get permission to access your data.",

    "category": "enterprise",
    "thumbnail_file": "icon.png",

    "scheduling_options_file": "scheduling_options.json",
    "dataloader_options_file": "mappings.json",
    "pipeline_workflow_file": "pipeline_workflow.json"
}

This file specifies general information about the plugin, such as its title, description, and category. It also specifies which files are loaded for authorization, scheduling, and so on.
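Because this file points at the other plugin files, a quick way to review it is to list every entry that names a file. The sketch below assumes, based only on the example above, that such keys end in "_file"; this is an observation, not a documented rule, and referenced_files is a hypothetical helper.

```python
import json
from typing import Dict

# Shortened copy of the example manifest shown above.
MANIFEST_TEXT = """
{
    "title": "OneDrive",
    "plugin_file": "onedrive_plugin.py",
    "auth_file": "auth.py",
    "category": "enterprise",
    "thumbnail_file": "icon.png",
    "scheduling_options_file": "scheduling_options.json",
    "dataloader_options_file": "mappings.json",
    "pipeline_workflow_file": "pipeline_workflow.json"
}
"""


def referenced_files(manifest_text: str) -> Dict[str, str]:
    """Collect every "*_file" entry of a dataloader_plugin.json document."""
    manifest = json.loads(manifest_text)
    return {key: value for key, value in manifest.items() if key.endswith("_file")}


files = referenced_files(MANIFEST_TEXT)
```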

mappings.json

{
    "map_id": "id",
    "map_title": "name",
    "map_created_at": "createdDateTime",
    "map_file_name": "name",
    "map_file_mime": "file.mimeType",
    "map_file_data": "content",
    "map_url": "webUrl",
    "facets_file": "facets.json"
}

This file maps the fields coming from the source to the corresponding Squirro item fields. It can also reference the file that defines the labels (facets_file).
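Mapping values such as file.mimeType suggest that the plugin emits flat dictionaries whose keys are dotted paths into the provider's nested response. That is an assumption drawn from the example, not a statement about the data loader internals. Under that assumption, a plugin could flatten each record roughly like this (the flatten helper and the sample record are hypothetical):

```python
from typing import Any, Dict


def flatten(record: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Turn nested dictionaries into one flat dictionary with dotted keys."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects and keep the full dotted path.
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


# Hypothetical, heavily shortened provider response.
raw = {
    "id": "42",
    "name": "report.pdf",
    "file": {"mimeType": "application/pdf"},
    "createdBy": {"user": {"displayName": "Ada"}},
}
flat = flatten(raw)
```

With records shaped like this, the keys used in mappings.json and facets.json refer directly to entries of the flat dictionary.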

facets.json

{
    "createdBy.user.displayName": {
        "name": "creator",
        "display_name": "Creator",
        "visible": true,
        "searchable": true,
        "typeahead": true,
        "analyzed": true
    }
}

This file defines the labels that the plugin creates.

Pay close attention to how you set up the labels.

  • Each added label increases the index size and slows down query times.

  • Use only a few labels that are useful for the end user, and keep them consistent across plugins.

  • If a label is only needed for filtering or simple aggregations, set analyzed to false to reduce the size and performance impact.

  • Check whether similar labels are already used in existing plugins and follow the same naming convention, e.g. an Author or Owner label in a new plugin may correspond to the Creator label in others.
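The guidelines above can be turned into a small review aid. The following sketch lists the labels that are marked as analyzed so that each one can be questioned; analyzed_labels is a hypothetical helper, and the file_type label is made up for illustration.

```python
from typing import Any, Dict, List

# First entry copied from the example above, second entry is hypothetical.
FACETS: Dict[str, Dict[str, Any]] = {
    "createdBy.user.displayName": {
        "name": "creator",
        "display_name": "Creator",
        "visible": True,
        "searchable": True,
        "typeahead": True,
        "analyzed": True,
    },
    "file.mimeType": {
        "name": "file_type",
        "display_name": "File Type",
        "visible": True,
        "analyzed": False,
    },
}


def analyzed_labels(facets: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return the names of all labels that have analyzed set to true."""
    return sorted(spec["name"] for spec in facets.values() if spec.get("analyzed"))


to_review = analyzed_labels(FACETS)
```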

pipeline_workflow.json

{
    "steps": [
        {
            "config": {
                "policy": "replace"
            },
            "id": "deduplication",
            "name": "Deduplication",
            "type": "deduplication"
        },
        {
            "config": {
                "fetch_link_content": false
            },
            "id": "content-augmentation",
            "name": "Content Augmentation",
            "type": "content-augmentation"
        },
        {
            "config": {},
            "id": "content-conversion",
            "name": "Content Extraction",
            "type": "content-conversion"
        },
        {
            "id": "language-detection",
            "name": "Language Detection",
            "type": "language-detection"
        },
        {
            "id": "cleanup",
            "name": "Content Standardization",
            "type": "cleanup"
        },
        {
            "id": "index",
            "name": "Indexing",
            "type": "index"
        },
        {
            "id": "cache",
            "name": "Cache Cleaning",
            "type": "cache"
        }
    ]
}

This file describes the steps that make up the pipeline workflow.

NOTE: If your plugin loads binary data such as PDF files, add the Content Augmentation and Content Conversion steps as well.
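A check like the following can catch a workflow file that lacks the steps needed for binary data. It is only a sketch: missing_binary_steps is a hypothetical helper, and the step types are taken from the example workflow above.

```python
import json
from typing import List

# Step types the note above asks for when binary files are loaded.
BINARY_STEP_TYPES = ("content-augmentation", "content-conversion")

# Hypothetical workflow that lacks both steps.
WORKFLOW_TEXT = """
{
    "steps": [
        {"id": "deduplication", "name": "Deduplication", "type": "deduplication"},
        {"id": "index", "name": "Indexing", "type": "index"}
    ]
}
"""


def missing_binary_steps(workflow_text: str) -> List[str]:
    """Return the binary-related step types absent from the workflow."""
    workflow = json.loads(workflow_text)
    present = {step["type"] for step in workflow["steps"]}
    return [step_type for step_type in BINARY_STEP_TYPES if step_type not in present]


missing_steps = missing_binary_steps(WORKFLOW_TEXT)
```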

scheduling_options.json

{
    "schedule": true,
    "repeat": "2h"
}

The file specifies parameters for scheduling the plugin.

facets_extended.json

This file is optional and is not used by the plugin. It only serves as a place to keep advanced labels for possible future use. If the fetched data contains information that might become useful later, you can describe the corresponding labels in this file. They can then simply be copied to facets.json when needed, without reading the documentation and code again.

icon.png

Thumbnail of the data loader plugin, displayed in the frontend. Keep the size between 16x16 px and 512x512 px.

README.md

Each plugin should contain a readme file with instructions on how to configure and use the connector. It should describe in detail how to configure the OAuth app on the provider side, which scopes are necessary, and how to set up and upload the data loader plugin.

requirements.txt

flask==2.0.1
flask_dance
requests

The file lists all the required packages, with one package dependency per line. More information about data loader dependencies: Data Loader Plugin Dependencies.

Auth file#

config = get_injected("config")
auth_tools.configure_oauth2_lib(config)

client_id = config.get("dataloader", "onedrive_client_id", fallback=None)
client_secret = config.get("dataloader", "onedrive_client_secret", fallback=None)

if not client_id or not client_secret:
    log.warning("Client keys are missing in %s plugin", target_name)

The code above loads the client keys from the [dataloader] section of the Squirro configuration and configures the necessary OAuth2 options.

NOTE: Always provide fallback values for the keys and log a warning if the keys are missing. The authorization files of all plugins are loaded when the frontend is initialized, so the fallback values prevent an error when some keys have not been configured.
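The effect of the fallback values can be tried out in isolation. The sketch below assumes that the injected config object behaves like Python's standard configparser.ConfigParser, which the get(section, option, fallback=...) call in the snippet above suggests but the source does not state. load_client_keys is a hypothetical helper.

```python
import configparser
import logging
from typing import Optional, Tuple

log = logging.getLogger(__name__)


def load_client_keys(
    config: configparser.ConfigParser, provider: str
) -> Tuple[Optional[str], Optional[str]]:
    """Read the client keys without failing when they are not configured."""
    client_id = config.get("dataloader", f"{provider}_client_id", fallback=None)
    client_secret = config.get("dataloader", f"{provider}_client_secret", fallback=None)
    if not client_id or not client_secret:
        log.warning("Client keys are missing for the %s plugin", provider)
    return client_id, client_secret


# Only the client ID is configured here; the secret falls back to None.
partial = configparser.ConfigParser()
partial.read_string("[dataloader]\nonedrive_client_id = CLIENT ID\n")
partial_keys = load_client_keys(partial, "onedrive")

# Even a configuration without a [dataloader] section does not raise.
empty_keys = load_client_keys(configparser.ConfigParser(), "onedrive")
```

Without fallback=None, the second call would raise configparser.NoSectionError, which is the kind of startup failure the note above warns about.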

plugin = DataloaderFrontendPlugin(__name__)
auth_blueprint = make_azure_blueprint(
    client_id=client_id,
    client_secret=client_secret,
    scope=[
        "offline_access",
        "https://graph.microsoft.com/Files.Read.All",
        "https://graph.microsoft.com/GroupMember.Read.All",
        "https://graph.microsoft.com/Sites.Read.All",
        "https://graph.microsoft.com/User.ReadBasic.All",
        "https://graph.microsoft.com/User.Read",
    ],
    redirect_to="done",
    login_url="/login",
    session_class=auth_tools.flask_dance_session_factory(config),
)
plugin.register_blueprint(auth_blueprint, url_prefix="/pl")

The next step is to create and register a blueprint, using the Flask-Dance library. Necessary information such as the client keys and scopes is passed as arguments to the blueprint factory function. Other arguments:

  • redirect_to - the name of the view to redirect to after the authentication flow is complete.

  • login_url - the URL path for the login view.

  • session_class - provides session based on whether oauth2_middleman_url is set up in the config file.

NOTE: A refresh token is required for the plugin to run properly. It is used to acquire a new access token when the current one has expired. Make sure to specify the scope or parameter that makes the provider return a refresh token (offline_access in the example above).

NOTE: The Flask-Dance library provides blueprints for many well-known providers, but sometimes there is no out-of-the-box solution for your connector. In that case you can write a custom blueprint based on the existing ones. See the Flask-Dance contrib folder for examples.

@plugin.route("/")
def start():
    session["next_url"] = auth_tools.assert_valid_next_url(request.args["back_to"])
    login_url = url_for(f"{auth_blueprint.name}.login")
    log.info("Redirecting to %r with next URL %r", login_url, session["next_url"])
    return redirect(login_url)

The whole OAuth flow starts at the / endpoint. The back_to URL is saved in the session so that the plugin knows where to return once the flow is done. After saving the URL, the user is redirected to the login page of the provider.

assert_valid_next_url() checks that the hostname of next_url is the same as the requesting one. It provides minimal protection against a user-supplied external URL being used as the redirect target after authorization.
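The idea behind such a check can be sketched with the standard library. This is not the implementation of assert_valid_next_url(), which is not shown in this tutorial; is_same_host is a hypothetical function that only illustrates the hostname comparison.

```python
from urllib.parse import urlsplit


def is_same_host(next_url: str, request_hostname: str) -> bool:
    """Return True if next_url points at the host that served the request.

    `request_hostname` is expected without a port, e.g. "squirro.example".
    Relative URLs have no hostname and are rejected by this sketch.
    """
    return urlsplit(next_url).hostname == request_hostname.lower()


same = is_same_host("https://squirro.example/app/setup", "squirro.example")
external = is_same_host("https://evil.example/phish", "squirro.example")
relative = is_same_host("/app/setup", "squirro.example")
```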

@plugin.route("/done")
def done():
    next_url = session["next_url"]
    start_url = f"{url_for('start')}?{urllib.parse.urlencode({'back_to': next_url})}"
    token = oauth.token # populated by flask-dance
    if not token:
        log.warning("OAuth token not acquired, redirecting to OAuth `start`")
        return redirect(start_url)

    log.info("Token scope: %s", sorted(token["scope"]))
    token_as_str = _pack_token(token)
    log.info("Redirecting to %r with token of len %d", next_url, len(token_as_str))
    next_url = f"{next_url}?{urllib.parse.urlencode({'token': token_as_str})}"
    return redirect(next_url)

When the authorization process is completed, the view given as the redirect_to parameter of the blueprint is loaded. In the example above this is the done view.

If the token is present in the OAuth storage, the user is authorized. Otherwise, the user is redirected back to the / endpoint. When authorization succeeds, the token is passed as a query parameter to the next URL.
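The _pack_token helper is not shown in this tutorial. One possible way to turn a token dictionary into a string that is safe to pass in a query parameter is URL-safe Base64 over JSON, sketched below. The functions pack_token and unpack_token, the URL, and the token values are all hypothetical.

```python
import base64
import json
from typing import Any, Dict
from urllib.parse import parse_qs, urlencode, urlsplit


def pack_token(token: Dict[str, Any]) -> str:
    """Serialize the token as JSON and encode it as URL-safe Base64 text."""
    raw = json.dumps(token, sort_keys=True).encode()
    return base64.urlsafe_b64encode(raw).decode()


def unpack_token(packed: str) -> Dict[str, Any]:
    """Reverse pack_token()."""
    return json.loads(base64.urlsafe_b64decode(packed.encode()))


token = {"access_token": "abc", "refresh_token": "def", "expires_at": 1700000000}

# Attach the packed token to the next URL, as the done view does.
next_url = "https://squirro.example/app?" + urlencode({"token": pack_token(token)})

# The receiving side reads the query parameter and restores the dictionary.
recovered = unpack_token(parse_qs(urlsplit(next_url).query)["token"][0])
```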

@plugin.route("/verify", methods=["POST"])
def verify_token():
    token = request.json["token"]
    try:
        login = get_account_id(token)
    except TokenExpiredError:
        log.warning("Token expired")
        token_delete()
        raise
    log.info("Successful login with %s", target_name)
    return jsonify(
        {"message": f"You are logged into {target_name} as {html.escape(login)}."}
    )

The last endpoint to define is /verify. The frontend uses it to determine and display the identity of the logged-in user, so it should return a message that names the currently logged-in user.

Data loader plugin file#

A data loader plugin file must define a custom source class that inherits from the DataSource class. A template for such a plugin can be found in Data Loader Plugin Boilerplate.

DataSource#

connect

def connect(self, inc_column: Optional = None, max_inc_value: Optional = None):
    self._stats_reset()
    if self.args.reset:
        log.info("Resetting key-value stores")
        self.key_value_cache.clear()
        self.key_value_store.clear()
    token = self.args.token
    log.debug("Token keys %r", sorted(token.keys()))

    # see if we have access token from previous run
    access_token, access_token_expires_at = self._load_access_token()
    if access_token:
        log.debug("Using access_token from previous run")
    else:
        log.debug("Using access_token supplied in args")
        access_token = token.get("access_token")
        access_token_expires_at = token.get("expires_at")
    log.debug("access_token expires_at %s", access_token_expires_at)

    config = get_injected("config")
    self.client = OnedriveClient(
        access_token=access_token,
        access_token_expiration=(
            datetime.utcfromtimestamp(int(access_token_expires_at))
            if access_token_expires_at
            else None
        ),
        refresh_token=token.get("refresh_token"),
        client_id=config.get("dataloader", "onedrive_client_id"),
        client_secret=config.get("dataloader", "onedrive_client_secret"),
        scope=token.get("scope"),
    )

self.args.reset gives the user the possibility to reset the current state of cache and storage.

As mentioned before, once authorization is completed the token is passed to the DataSource class using query parameters, and it can be retrieved from self.args.token. However, the access token can be refreshed many times while the data loader is running. For that reason the method first checks whether a token has already been saved in the storage, which makes it possible to use the latest token instead of the old one obtained during the OAuth flow. If no token is found in the storage, the one from the query parameters is used.

The next step is to define a client. Any third-party library or a custom client can be used here. The client has to handle token refreshing so that it can fetch a new access token after the current one expires. Check whether the third-party library implements this mechanism; if it does not, creating a custom client is recommended.
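The refresh mechanism a custom client needs can be sketched as follows. This is not the OnedriveClient from the connector: RefreshingClient, the refresh_fn callback, and the 60-second leeway are hypothetical, and a real client would call the provider's token endpoint inside the callback. Treating an unknown expiration time as "expired" is a design choice of this sketch.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, List, Optional, Tuple

# Hypothetical callback: takes a refresh token, returns (access_token, expiry).
RefreshFn = Callable[[str], Tuple[str, datetime]]


class RefreshingClient:
    """Keeps an access token and renews it shortly before it expires."""

    def __init__(
        self,
        access_token: Optional[str],
        access_token_expiration: Optional[datetime],
        refresh_token: str,
        refresh_fn: RefreshFn,
        leeway: timedelta = timedelta(seconds=60),
    ) -> None:
        self.access_token = access_token
        self.access_token_expiration = access_token_expiration
        self.refresh_token = refresh_token
        self.refresh_fn = refresh_fn
        self.leeway = leeway

    def _needs_refresh(self, now: datetime) -> bool:
        # Without a token or a known expiry, refresh to be on the safe side.
        if not self.access_token or self.access_token_expiration is None:
            return True
        return now >= self.access_token_expiration - self.leeway

    def get_access_token(self, now: Optional[datetime] = None) -> str:
        now = now or datetime.now(timezone.utc)
        if self._needs_refresh(now):
            self.access_token, self.access_token_expiration = self.refresh_fn(
                self.refresh_token
            )
        return self.access_token


refresh_calls: List[str] = []


def fake_refresh(refresh_token: str) -> Tuple[str, datetime]:
    """Stand-in for a request to the provider's token endpoint."""
    refresh_calls.append(refresh_token)
    return "new-token", datetime(2030, 1, 1, tzinfo=timezone.utc)


client = RefreshingClient(
    "old-token", datetime(2020, 1, 1, tzinfo=timezone.utc), "refresh-123", fake_refresh
)
now = datetime(2025, 1, 1, tzinfo=timezone.utc)
first = client.get_access_token(now)   # expired, so the token is refreshed
second = client.get_access_token(now)  # still valid, no second refresh
```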

disconnect

def disconnect(self):
    log.info("Disconnecting; cleaning up session & dumping state")
    if self.client:
        self._save_access_token(
            self.client.access_token,
            self.client.access_token_expiration.timestamp()
            if self.client.access_token_expiration
            else None,
        )
        self.client.close()
        self.client = None
    self._stats_log()

When the data loader finishes its run or an error occurs, the disconnect method is called. All teardown steps should be performed here. In the example above the access token is saved to the storage, the client is closed, and the stats are logged.

NOTE: As mentioned before, saving the access token makes it possible to keep the latest token (the one with the longest remaining lifetime). During a run the access token can be refreshed several times, so it is a good idea to save the latest one when the data loader finishes. This ensures that the next run loads the latest token from the storage and not the old one provided in the query parameters (as shown in the connect method).
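The _save_access_token and _load_access_token helpers are not shown in this tutorial. A minimal sketch of the idea follows, assuming the key-value store can be used like a dictionary; that interface, the key names, and the function names are assumptions, and a plain dict stands in for the store.

```python
from typing import Any, Dict, MutableMapping, Optional, Tuple

ACCESS_TOKEN_KEY = "access_token"
EXPIRES_AT_KEY = "access_token_expires_at"


def save_access_token(
    store: MutableMapping[str, Any], access_token: str, expires_at: Optional[float]
) -> None:
    """Persist the latest token so that the next run can pick it up."""
    store[ACCESS_TOKEN_KEY] = access_token
    store[EXPIRES_AT_KEY] = expires_at


def load_access_token(
    store: MutableMapping[str, Any],
) -> Tuple[Optional[str], Optional[float]]:
    """Return (token, expiry), or (None, None) when nothing was saved."""
    return store.get(ACCESS_TOKEN_KEY), store.get(EXPIRES_AT_KEY)


def pick_token(
    store: MutableMapping[str, Any], args_token: Dict[str, Any]
) -> Tuple[Optional[str], Optional[float]]:
    """Prefer the stored token; fall back to the one from the OAuth flow."""
    access_token, expires_at = load_access_token(store)
    if access_token:
        return access_token, expires_at
    return args_token.get("access_token"), args_token.get("expires_at")


store: Dict[str, Any] = {}
args_token = {"access_token": "from-oauth-flow", "expires_at": 1700000000}

first_run = pick_token(store, args_token)       # nothing stored yet
save_access_token(store, "refreshed", 1700003600)
second_run = pick_token(store, args_token)      # stored token wins
```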

getDataBatch

def getDataBatch(
    self, batch_size: Optional[int] = None
) -> Generator[List[Dict[str, Any]], None, None]:
    pass

This method is the core of the data loading process. It contains all the logic for fetching and processing the data. It yields batches of dictionaries, which the data loader later converts into Squirro items.

The implementation of getDataBatch depends heavily on how the provider returns the data, what type of data it is, and what processing is needed to extract useful information.

NOTE: Binary data cannot be displayed on the preview dashboard. For that reason the binary content of files should not be downloaded when the data loader is running in preview mode. This also speeds up loading the preview. Instead of the binary content, a dummy string should be provided, e.g.:

if preview:
    item["file_content"] = "BINARY CONTENT [disabled in Data Preview]"
else:
    download_content()

More about preview mode: Data Loader Plugin Preview.
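A stripped-down version of such a batching generator could look like the sketch below. It is not the OneDrive implementation: get_data_batch, download_content, and the sample entries are hypothetical, and a real plugin would fetch the entries from the provider's API.

```python
from typing import Any, Dict, Generator, Iterable, List

PREVIEW_PLACEHOLDER = "BINARY CONTENT [disabled in Data Preview]"


def download_content(entry: Dict[str, Any]) -> bytes:
    """Hypothetical stand-in for downloading a file from the provider."""
    return b"bytes of " + entry["name"].encode()


def get_data_batch(
    entries: Iterable[Dict[str, Any]], batch_size: int, preview: bool = False
) -> Generator[List[Dict[str, Any]], None, None]:
    """Yield lists of at most `batch_size` items."""
    batch: List[Dict[str, Any]] = []
    for entry in entries:
        item = dict(entry)
        # In preview mode the download is skipped entirely.
        item["file_content"] = (
            PREVIEW_PLACEHOLDER if preview else download_content(entry)
        )
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        # Release the last, possibly smaller, batch.
        yield batch


entries = [
    {"id": "1", "name": "a.pdf"},
    {"id": "2", "name": "b.pdf"},
    {"id": "3", "name": "c.pdf"},
]
preview_batches = list(get_data_batch(entries, batch_size=2, preview=True))
full_batches = list(get_data_batch(entries, batch_size=2))
```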

getSchema

def getSchema(self):
    fields = set(get_mapped_fields())
    # fetch few entries to gather possible fields
    # to speed things up do not fetch the file content
    for batch in self._get_data_batch(10, preview=True):
        for entry in batch:
            fields |= set(entry.keys())
    fields = sorted(fields)
    log.info("Fields: %r", fields)
    return fields

This method is used to determine the valid mapping options. The best practice is to call the getDataBatch logic here, fetch a few sample items, and return all their keys. In the example above 10 items are fetched and all their keys are returned. In the simplest cases, where the field list is not dynamic, you can return a hardcoded Python list.

NOTE: When fetching data here, the binary content of the files should not be downloaded (as described above). This speeds up the process of loading the keys for the schema.

getJobId

def getJobId(self) -> str:
    """Generate a stable ID that changes with the main parameters."""
    m = hashlib.blake2b(digest_size=20)
    for v in (
        __plugin_name__,
        __version__,
        self.arg_index_all,
        self.arg_file_size_limit,
        self.arg_batch_size_limit,
        self.arg_download_media_files,
        self.arg_access_id,
    ):
        m.update(repr(v).encode())
    job_id = base64.urlsafe_b64encode(m.digest()).rstrip(b"=").decode()
    log.debug("Job ID: %r", job_id)
    return job_id

Since the project creator can configure multiple data sources with this plugin, we need to be able to distinguish them. The key reason is to keep the state and caching information of each instance separate. This method therefore defines a unique ID for the current job. To make the ID unique, as many custom parameters as possible should be used. The common approach is to use all arguments that can be set by the user. In the case of a 1-click connector, passing the refresh token or access token (arg_access_id in the code above) is one of the best options for providing a unique argument to generate a stable ID.

getArguments

def getArguments(self):
    return [
        {
            "name": "file_size_limit",
            "display_label": "File Size Limit",
            "default": 50,
            "help": "Size limit in megabytes, if a file is bigger than "
            "this limit, the file content won't be downloaded.",
            "type": "int",
            "advanced": True,
        },
        {
            "name": "batch_size_limit",
            "display_label": "Batch Size Limit",
            "default": 50,
            "help": "Size limit in megabytes for a batch of files (triggering early batch release if necessary).",
            "type": "int",
            "advanced": True,
        },
        {
            "name": "index_all",
            "display_label": "Index All",
            "help": "If set, all files will be indexed, even if content "
            "cannot be retrieved. Those files will be indexed "
            "only with the metadata. These files can still be "
            "found using typeahead search and facet filtering",
            "type": "bool",
            "default": True,
            "action": "store_true",
            "advanced": True,
        },
        {
            "name": "download_media_files",
            "display_label": "Download media files content",
            "help": "If set, content for media files (image, video, audio as "
            "determined by filename) will be downloaded. "
            "Otherwise it will be skipped.",
            "type": "bool",
            "default": False,
            "action": "store_true",
            "advanced": True,
        },
    ]

This method specifies custom arguments that allow the user to customize the plugin.

Where possible, the following common arguments should be implemented:

  • file_size_limit - restricts downloading of large files

  • batch_size_limit - makes it possible to implement an early batch release mechanism

  • download_media_files - gives the possibility to exclude media files
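How file_size_limit and batch_size_limit might interact is sketched below. The function batch_by_size, the size field (in bytes), and the skip_content flag are hypothetical; the sketch only illustrates releasing a batch early once the accumulated download size would exceed the limit.

```python
from typing import Any, Dict, Generator, Iterable, List

MB = 1024 * 1024


def batch_by_size(
    items: Iterable[Dict[str, Any]], batch_size_limit_mb: int, file_size_limit_mb: int
) -> Generator[List[Dict[str, Any]], None, None]:
    """Group items so that each batch stays within the size limit."""
    batch: List[Dict[str, Any]] = []
    batch_bytes = 0
    for item in items:
        size = item["size"]
        if size > file_size_limit_mb * MB:
            # Too large: keep the metadata but mark the content as skipped.
            item = {**item, "skip_content": True}
            size = 0
        if batch and batch_bytes + size > batch_size_limit_mb * MB:
            # Early release: the next file would push the batch over the limit.
            yield batch
            batch, batch_bytes = [], 0
        batch.append(item)
        batch_bytes += size
    if batch:
        yield batch


items = [
    {"name": "a", "size": 30 * MB},
    {"name": "b", "size": 30 * MB},
    {"name": "c", "size": 60 * MB},  # above the file size limit
    {"name": "d", "size": 10 * MB},
]
batches = list(batch_by_size(items, batch_size_limit_mb=50, file_size_limit_mb=50))
names = [[item["name"] for item in batch] for batch in batches]
```

With the default values of 50 MB used in getArguments above, the second file triggers an early release of the first batch, and the oversized third file is kept with metadata only.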

Upload the plugin#

After creating the plugin, use the following command to upload it to the cluster:

squirro_asset dataloader_plugin upload --folder FOLDER_WITH_PLUGIN --token $TOKEN --cluster CLUSTER_IP