Example Data Loader Plugin#

Introduction#

The data loader can be easily extended to implement a custom data source. For an introduction see Data Loader CLI Tool.

In this example, a loader is implemented that handles PubMed data in the Medline format. PubMed is a database of scientific publications for biomedical literature.

The Medline format can be retrieved from the PubMed site using a simple export.
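Exports can also be scripted. Below is a minimal sketch that fetches the sample record used in this example through NCBI's E-utilities efetch endpoint (the db, id, rettype and retmode parameters follow the public E-utilities documentation):

import urllib.request

# Fetch a single PubMed record in Medline format via NCBI E-utilities.
url = ('https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi'
       '?db=pubmed&id=26785463&rettype=medline&retmode=text')
with urllib.request.urlopen(url) as response:
    print(response.read().decode('utf-8'))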

Data#

For this example you can use a list of 106 articles that have been manually extracted. Download the file pubmed.zip and extract it into the tutorial folder. This should create a folder called “pubmed”.

A sample file in this folder looks as follows:

<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<pre>
PMID- 26785463
OWN - NLM
STAT- Publisher
DA  - 20160119
LR  - 20160119
IS  - 1095-9203 (Electronic)
IS  - 0036-8075 (Linking)
VI  - 350
IP  - 6265
DP  - 2015 Dec 4
TI  - Teamwork: The tumor cell edition.
PG  - 1174-1175
FAU - Cleary, Allison S
AU  - Cleary AS
AD  - Pennsylvania State University College of Medicine, Hershey PA 17078, USA.
      [email protected].
LA  - ENG
PT  - JOURNAL ARTICLE
TA  - Science
JT  - Science (New York, N.Y.)
JID - 0404511
CRDT- 2016/01/20 06:00
AID - 350/6265/1174 [pii]
AID - 10.1126/science.aad7103 [doi]
PST - ppublish
SO  - Science. 2015 Dec 4;350(6265):1174-1175.
</pre>

It quickly becomes obvious that this is mostly a textual format consisting of key/value pairs, with long values continued on indented lines.
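Each key occupies the first four columns, followed by a dash, with the value starting at column seven. A minimal sketch of that slicing (the same offsets the plugin below relies on):

line = 'TI  - Teamwork: The tumor cell edition.'
key = line[0:4].strip()   # 'TI'
value = line[6:].strip()  # 'Teamwork: The tumor cell edition.'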

Data Loader Command#

Note

Labels were previously referred to as facets in the Squirro UI. You will still see references to facets in the code, and in some places within the Squirro UI. All facets can be treated as labels.

To import this format, start by specifying the data load command for the command line data loader:

squirro_data_load \
    -v \
    --cluster $CLUSTER \
    --project-id $PROJECT_ID \
    --token $TOKEN \
    --source-script medline.py \
    --source-path pubmed \
    --map-id PMID \
    --map-title TI \
    --map-created-at DA \
    --map-body AB \
    --source-name "PubMed" \
    --facets-file facets.json

There is one key change in this command compared to using a built-in format: instead of the --source-type argument, it uses --source-script. That script, defined below, specifies how Medline data is processed.

The mapping is done using the keys that appear in the example file above.
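For the sample record shown above, the mapping would produce an item along these lines (a hand-constructed illustration, not actual loader output):

item = {
    'id': '26785463',                              # --map-id PMID
    'title': 'Teamwork: The tumor cell edition.',  # --map-title TI
    'created_at': '20160119',                      # --map-created-at DA
    'body': None,                                  # --map-body AB (no AB field here)
}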

The labels file (called facets in the code and in the JSON file) is also quite straightforward and makes sure that some of those keywords are indexed as item keywords. Use this facets.json file:

{
    "DA": {
        "data_type": "datetime",
        "input_format_string": "%Y%m%d"
    },
    "JT": {
        "name": "Journal"
    },
    "PT": {
        "name": "Publication Type"
    },
    "PST": {
        "name": "Publication Status"
    },
    "OWN": {
        "name": "Owner"
    },
    "STAT": {
        "name": "Status"
    },
    "FAU": {
        "name": "Author",
        "delimiter": "|"
    }
}
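The DA entry types the label as a datetime using the given format string, which can be verified directly. The delimiter on FAU matches the pipe that the plugin below uses to join multiple authors, so each author becomes a separate label value.

from datetime import datetime

# '20160119' is the DA value from the sample record above.
datetime.strptime('20160119', '%Y%m%d')  # datetime(2016, 1, 19, 0, 0)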

Plugin File#

The last step is to create the actual data source. That is a bit more involved. The main blocks are commented below. The goal of this data source is to go through all the Medline files on the disk (as specified with the --source-path argument) and for each of those files return one dictionary. That dictionary is then processed by the data loader through the mappings, label configurations, templates, etc. in the exact same way as if it had come straight from a CSV file or a SQL database.

# -*- coding: utf-8 -*-
"""Data source implementation for PubMed Medline data.

Data is expected to be on disk, hierarchically stored in the `source_path`.
"""
import collections
import logging
import os

from squirro.dataloader.data_source import DataSource

log = logging.getLogger(__name__)


KEYS = ['AB', 'CI', 'AD', 'IRAD', 'AID', 'AU', 'AUID', 'FAU', 'BTI', 'CTI',
        'CN', 'CRDT', 'DCOM', 'DA', 'LR', 'DEP', 'DP', 'EN', 'ED', 'FED',
        'EDAT', 'GS', 'GN', 'GR', 'IR', 'FIR', 'ISBN', 'IS', 'IP', 'TA', 'JT',
        'LA', 'LID', 'MID', 'MHDA', 'MH', 'JID', 'RF', 'OAB', 'OABL', 'OCI',
        'OID', 'OT', 'OTO', 'OWN', 'PG', 'PS', 'FPS', 'PL', 'PHST', 'PST',
        'PT', 'PUBM', 'PMC', 'PMCR', 'PMID', 'RN', 'NM', 'SI', 'SO', 'SFM',
        'STAT', 'SB', 'TI', 'TT', 'VI', 'VTI']


class MedLineSource(DataSource):
    def __init__(self):
        self.args = None

    def connect(self, inc_column=None, max_inc_value=None):
        """Create connection with the source."""
        if not os.path.isdir(self.args.source_path):
            raise IOError("Folder {} does not exist".format(self.args.source_path))

    def disconnect(self):
        """Disconnect from the source."""
        pass

    def getDataBatch(self, batch_size):
        """
        Generator - Get data from the source in batches.

        :returns a list of dictionaries
        """
        # Each directory on disk yields one batch; the batch_size argument
        # is not used by this simple implementation.
        for root, dirs, files in os.walk(self.args.source_path):
            items = []
            for fname in files:
                item = self._parse_file(os.path.join(root, fname))
                if item and (not item.get('TI') or not item.get('OWN')):
                    log.warning('Missing data %r', fname)
                elif item:
                    items.append(item)
            if items:
                yield items

    def getJobId(self):
        """
        Return a unique string for each different select
        :returns a string
        """
        return os.path.basename(self.args.source_path)

    def getSchema(self):
        """
        Return the schema of the data set
        :returns a List containing the names of the columns retrieved from the source
        """
        return KEYS

    def getArguments(self):
        """
        Return source arguments.
        """
        return [
            {
                "name": "source_path",
                "help": "Path of MedLine data folder.",
            }
        ]

    def _parse_file(self, file_name):
        """
        :param file_name: Path of a Medline text file.
        :return: Dictionary with all the key/value pairs from the file.
                 Multi-value keys are joined with a pipe (`|`).
        """
        ret = collections.defaultdict(list)
        key = None
        value = None

        try:
            with open(file_name, encoding='utf8') as fh:
                for line in fh:
                    if 'Error occurred:' in line:
                        log.warning("Encountered error in file: %s", file_name)
                        return None
                    if line[0] == '<':
                        # Ignore the XML lines at the beginning and end.
                        continue
                    elif line[0:4].strip():
                        # This introduces a new key / value
                        if key:
                            ret[key].append(value)
                        key = line[0:4].strip()
                        value = line[6:].strip()
                    elif line.strip():
                        # No new key, this is a continuation of the value from
                        # the last key.
                        value += ' ' + line.strip()
                if key:
                    ret[key].append(value)

        except Exception as err:
            log.error("Problem parsing file: %s with error %r", file_name, err)

        item = {}
        for key, value in ret.items():
            item[key] = '|'.join(value)
        for key in KEYS:
            if key not in item:
                item[key] = None
        return item
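
To sanity-check the plugin outside the data loader, the source can be driven by hand. A minimal sketch, assuming the squirro data loader package is installed, the "pubmed" folder from the Data section has been extracted, and an argparse.Namespace stands in for the arguments the loader normally injects:

import argparse

from medline import MedLineSource

source = MedLineSource()
# The data loader normally sets self.args from the command line;
# a Namespace with source_path stands in for that here.
source.args = argparse.Namespace(source_path='pubmed')
source.connect()
for batch in source.getDataBatch(100):
    print('Loaded {} items, first title: {}'.format(len(batch), batch[0]['TI']))
source.disconnect()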