Example Data Loader Plugin#
Introduction#
The data loader can easily be extended to implement a custom data source. For an introduction, see Data Loader CLI Tool.
In this example, a loader is implemented that handles PubMed data in the Medline format. PubMed is a database of scientific publications for biomedical literature.
The Medline format can be retrieved from the PubMed site using a simple export.
Data#
For this example you can use a list of 106 articles that have been manually extracted. Download the file pubmed.zip and extract it into the tutorial folder. This should create a folder called “pubmed”.
A sample file in this folder looks as follows:
PMID- 26785463
OWN - NLM
STAT- Publisher
DA  - 20160119
LR  - 20160119
IS  - 1095-9203 (Electronic)
IS  - 0036-8075 (Linking)
VI  - 350
IP  - 6265
DP  - 2015 Dec 4
TI  - Teamwork: The tumor cell edition.
PG  - 1174-1175
FAU - Cleary, Allison S
AU  - Cleary AS
AD  - Pennsylvania State University College of Medicine, Hershey PA 17078, USA.
      [email protected].
LA  - ENG
PT  - JOURNAL ARTICLE
TA  - Science
JT  - Science (New York, N.Y.)
JID - 0404511
CRDT- 2016/01/20 06:00
AID - 350/6265/1174 [pii]
AID - 10.1126/science.aad7103 [doi]
PST - ppublish
SO  - Science. 2015 Dec 4;350(6265):1174-1175.
It quickly becomes obvious that this is a mostly textual format consisting of key/value pairs: the key occupies the first four columns of each line, and indented lines continue the value of the previous key.
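This structure can be parsed with a few lines of Python. The following is a simplified sketch of the parsing that the plugin below performs, shown standalone so the key/value and continuation-line handling is easy to follow (the `parse_medline` function name is ours, not part of any library):

```python
import collections

def parse_medline(text):
    """Parse Medline key/value text into a dict mapping keys to value lists.

    Keys sit in the first four columns, followed by `- `; indented lines
    continue the previous value; repeated keys collect multiple values.
    """
    ret = collections.defaultdict(list)
    key = None
    value = None
    for line in text.splitlines():
        if line[0:4].strip():
            # A new key starts in the first four columns.
            if key:
                ret[key].append(value)
            key = line[0:4].strip()
            value = line[6:].strip()
        elif line.strip():
            # Indented line: continuation of the previous value.
            value += ' ' + line.strip()
    if key:
        ret[key].append(value)
    return dict(ret)

sample = (
    "PMID- 26785463\n"
    "TI  - Teamwork: The tumor cell edition.\n"
    "AD  - Pennsylvania State University College of Medicine, Hershey PA 17078, USA.\n"
    "      [email protected].\n"
    "IS  - 1095-9203 (Electronic)\n"
    "IS  - 0036-8075 (Linking)\n"
)
record = parse_medline(sample)
```

Note how the repeated `IS` key yields a list of two values, and the indented line is folded into the `AD` value.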
Data Loader Command#
Note
Labels were previously referred to as facets in the Squirro UI. You will still see references to facets in the code, and in some places within the Squirro UI. All facets can be treated as labels.
To import this format, start by specifying the data load command for the command line data loader:
squirro_data_load \
-v \
--cluster $CLUSTER \
--project-id $PROJECT_ID \
--token $TOKEN \
--source-script medline.py \
--source-path pubmed \
--map-id PMID \
--map-title TI \
--map-created-at DA \
--map-body AB \
--source-name "PubMed" \
--facets-file facets.json
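The command assumes that the `$CLUSTER`, `$PROJECT_ID`, and `$TOKEN` environment variables have been set beforehand. The values below are placeholders only; substitute your own cluster URL, project identifier, and API token:

```shell
# Placeholder values -- replace with your own Squirro cluster URL,
# project ID, and API token before running the loader.
export CLUSTER="https://your-squirro-server"
export PROJECT_ID="your-project-id"
export TOKEN="your-api-token"
```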
There is one key change in this command compared to using a built-in format: instead of the --source-type argument, it uses --source-script. That script, which is defined below, specifies how the Medline data is processed.
The mapping is done using the keys that appeared in the example record above.
The labels file (referred to as facets in the code and in the JSON file) is also quite straightforward and ensures that some of these keys are indexed as item keywords. Use this facets.json file:
{
    "DA": {
        "data_type": "datetime",
        "input_format_string": "%Y%m%d"
    },
    "JT": {
        "name": "Journal"
    },
    "PT": {
        "name": "Publication Type"
    },
    "PST": {
        "name": "Publication Status"
    },
    "OWN": {
        "name": "Owner"
    },
    "STAT": {
        "name": "Status"
    },
    "FAU": {
        "name": "Author",
        "delimiter": "|"
    }
}
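To see what this configuration implies for the raw values, consider the two non-trivial entries: `DA` is parsed as a date using the `%Y%m%d` format string, and `FAU` is split on the `|` delimiter into multiple label values. The sketch below only illustrates that interpretation; the `apply_facet_config` function is hypothetical and not part of the Squirro loader's API (the second author is also invented for illustration):

```python
from datetime import datetime

def apply_facet_config(key, raw_value):
    """Illustrate how the facets.json settings interpret raw values."""
    if key == "DA":
        # data_type "datetime" with input_format_string "%Y%m%d"
        return datetime.strptime(raw_value, "%Y%m%d")
    if key == "FAU":
        # delimiter "|" turns the joined string into multiple label values
        return raw_value.split("|")
    return raw_value

date = apply_facet_config("DA", "20160119")
authors = apply_facet_config("FAU", "Cleary, Allison S|Smith, John")
```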
Plugin File#
The last step is to create the actual data source. That is a bit more involved. The main blocks are commented below. The goal of this data source is to go through all the Medline files on the disk (as specified with the --source-path
argument) and for each of those files return one dictionary. That dictionary is then processed by the data loader through the mappings, label configurations, templates, etc. in the exact same way as if it had come straight from a CSV file or a SQL database.
# -*- coding: utf-8 -*-
"""Data source implementation for PubMed Medline data.

Data is expected to be on disk, hierarchically stored in the `source_path`.
"""
import codecs
import collections
import logging
import os

from squirro.dataloader.data_source import DataSource

log = logging.getLogger(__name__)

KEYS = ['AB', 'CI', 'AD', 'IRAD', 'AID', 'AU', 'AUID', 'FAU', 'BTI', 'CTI',
        'CN', 'CRDT', 'DCOM', 'DA', 'LR', 'DEP', 'DP', 'EN', 'ED', 'FED',
        'EDAT', 'GS', 'GN', 'GR', 'IR', 'FIR', 'ISBN', 'IS', 'IP', 'TA', 'JT',
        'LA', 'LID', 'MID', 'MHDA', 'MH', 'JID', 'RF', 'OAB', 'OABL', 'OCI',
        'OID', 'OT', 'OTO', 'OWN', 'PG', 'PS', 'FPS', 'PL', 'PHST', 'PST',
        'PT', 'PUBM', 'PMC', 'PMCR', 'PMID', 'RN', 'NM', 'SI', 'SO', 'SFM',
        'STAT', 'SB', 'TI', 'TT', 'VI', 'VTI']


class MedLineSource(DataSource):
    def __init__(self):
        self.args = None

    def connect(self, inc_column=None, max_inc_value=None):
        """Create connection with the source."""
        if not os.path.isdir(self.args.source_path):
            raise IOError(
                "Folder {} does not exist".format(self.args.source_path))

    def disconnect(self):
        """Disconnect from the source."""
        pass

    def getDataBatch(self, batch_size):
        """
        Generator - Get data from source in batches.

        :returns a list of dictionaries
        """
        for root, dirs, files in os.walk(self.args.source_path):
            items = []
            for fname in files:
                item = self._parse_file(os.path.join(root, fname))
                if item and (not item.get('TI') or not item.get('OWN')):
                    log.warning('Missing data %r', fname)
                elif item:
                    items.append(item)
            if items:
                yield items

    def getJobId(self):
        """
        Return a unique string for each different select.

        :returns a string
        """
        return os.path.basename(self.args.source_path)

    def getSchema(self):
        """
        Return the schema of the data set.

        :returns a list containing the names of the columns retrieved from
            the source
        """
        return KEYS

    def getArguments(self):
        """
        Return source arguments.
        """
        return [
            {
                "name": "source_path",
                "help": "Path of MedLine data folder.",
            }
        ]

    def _parse_file(self, file_name):
        """
        :param file_name: Medline text file
        :return: Dictionary with all the key/value pairs from the file.
            Multi-value keys are joined with a pipe (`|`).
        """
        ret = collections.defaultdict(list)
        key = None
        value = None
        try:
            with codecs.open(file_name, encoding='utf8') as fh:
                for line in fh:
                    if 'Error occurred:' in line:
                        log.warning("Encountered error in file: %s", file_name)
                        return None
                    if line[0] == '<':
                        # Ignore the XML lines at the beginning and end.
                        continue
                    elif line[0:4].strip():
                        # This introduces a new key / value
                        if key:
                            ret[key].append(value)
                        key = line[0:4].strip()
                        value = line[6:].strip()
                    elif line.strip():
                        # No new key, this is a continuation of the value
                        # from the last key.
                        value += ' ' + line.strip()
            if key:
                ret[key].append(value)
        except Exception as err:
            log.error("Problem parsing file: %s with error %r", file_name, err)

        item = {}
        for key, value in ret.items():
            item[key] = '|'.join(value)
        for key in KEYS:
            if key not in item:
                item[key] = None
        return item
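The final post-processing step in `_parse_file` is worth checking in isolation: multi-value keys are joined with a pipe (which is why facets.json declares `|` as the delimiter for `FAU`), and any schema key absent from the file is filled with `None`. The sketch below uses an abbreviated `KEYS` list and an invented second author purely for illustration:

```python
import collections

KEYS = ['PMID', 'TI', 'FAU', 'AB']  # abbreviated schema for this sketch

parsed = collections.defaultdict(list)
parsed['PMID'].append('26785463')
parsed['FAU'].append('Cleary, Allison S')
parsed['FAU'].append('Smith, John')  # hypothetical second author

# Join multi-value keys with a pipe, then fill missing keys with None.
item = {key: '|'.join(values) for key, values in parsed.items()}
for key in KEYS:
    item.setdefault(key, None)
```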