SourcesMixin

class squirro_client.topic.SourcesMixin

Bases: object

Methods Summary

delete_source(project_id, source_id)

Delete an existing Source.

get_max_inc(project_id, source_id)

Fetches the maximum incremental value of a source.

get_preview(project_id, config)

Preview the source configuration.

get_source(project_id, source_id[, …])

Get source details.

get_source_logs(project_id, source_id, …)

Get the run logs of a particular source run.

get_sources(project_id[, include_config, …])

Get all sources for the provided project.

get_sources_v1(project_id[, search, status, …])

Get sources for the provided project.

kill_source(project_id, source_id)

Try to terminate (SIGTERM) a dataload job if it is already running.

modify_source(project_id, source_id[, name, …])

Modify an existing source.

new_source(project_id, name, config[, …])

Create a new source.

pause_source(project_id, source_id)

Pause a source.

reset_source(project_id, source_id[, …])

Resets and runs the source.

resume_source(project_id, source_id)

Resume a paused source.

run_source(project_id, source_id)

Runs a source now.

set_max_inc(project_id, source_id, max_inc_value)

Sets the maximum incremental value of the incremental column of a source.

Methods Documentation

delete_source(project_id, source_id)

Delete an existing Source.

Parameters
  • project_id – Project identifier.

  • source_id – Source identifier.

Example:

>>> client.delete_source('Vlg5Z1hOShm0eYmjtsqSqg',
...                      'oTvI6rlaRmKvmYCfCvLwpw')

get_max_inc(project_id, source_id)

Fetches the maximum incremental value of a source.

Parameters
  • project_id – Project identifier.

  • source_id – Source identifier.

Example:

>>> client.get_max_inc(
...     'Vlg5Z1hOShm0eYmjtsqSqg',
...     'hw8j7LUBRM28-jAellgQdA')

get_preview(project_id, config)

Preview the source configuration.

Parameters
  • project_id – Project identifier.

  • config – Provider configuration.

Returns

A dictionary which contains the source preview items.

Example:

>>> client.get_preview(
...     project_id,
...     config={
...         "dataloader_plugin_options": {
...             "source_file": "path:/tmp/test.csv"
...         },
...         "dataloader_options": {
...             "plugin_name": "csv_plugin",
...             "project_id": project_id,
...             "map_title": "title"
...             }
...         })

{
  "count": 2,
  "items": [
    {
      "id": "CTHQDLwzQsOq93zHAUcCRg",
      "name": "name01",
      "title": "title01"
    },
    {
      "id": ",yYNWBDgQQ2Uhuz32boDAg",
      "name": "name02",
      "title": "title02"
    }
  ],
  "data_schema": [
    "name",
    "title"
  ]
}
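
A preview can be used to check a column mapping before a source is created. The sketch below assumes the preview has the shape shown above, with a data_schema list of column names; missing_columns is a hypothetical helper and not part of the client.

```python
def missing_columns(preview, required_columns):
    """Return the required columns that are absent from the preview schema."""
    schema = set(preview.get("data_schema", []))
    return [col for col in required_columns if col not in schema]


# Preview shaped like the example output above (items shortened).
preview = {
    "count": 2,
    "items": [
        {"id": "CTHQDLwzQsOq93zHAUcCRg", "name": "name01", "title": "title01"},
    ],
    "data_schema": ["name", "title"],
}

print(missing_columns(preview, ["title", "body"]))  # prints ['body']
```

An empty result means every column the mapping refers to is present in the previewed data.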

get_source(project_id, source_id, include_config=None, include_run_stats=None, include_pipeline_backlog=None)

Get source details.

Parameters
  • project_id – Project identifier.

  • source_id – Source identifier.

  • include_config – Bool, whether or not to include the config for the Source.

  • include_run_stats – Bool, whether or not to include the run stats for the Source.

  • include_pipeline_backlog – Bool, whether or not to include the backlog of items in the data pipeline for this source.

Returns

A dictionary which contains the source.

Example:

>>> client.get_source(
...     project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...     source_id='pqTn4vBZRdS5hYw0TBt0pQ',
...     include_config=True,
...     include_run_stats=True,
...     include_pipeline_backlog=True)

{
  "items_fetched_total": 2,
  "last_error": "",
  "last_check_at": "2019-01-23T10:23:23",
  "last_items_at": "2019-01-23T10:23:23",
  "paused": false,
  "error_count": 0,
  "id": "pqTn4vBZRdS5hYw0TBt0pQ",
  "total_error_count": 0,
  "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
  "config": {
    "dataloader_plugin_options": {
      "source_file": "path:/tmp/test.csv"
    },
    "dataloader_options": {
      "map_title": "title",
      "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
      "plugin_name": "csv_plugin"
    }
  },
  "status": "complete",
  "total_runs": 1,
  "pipeline_workflow_id": "S0fVQ-K0TmS1UgT0msZRBA",
  "last_error_at": null,
  "last_update_at": "2019-01-23T10:23:23",
  "last_success_at": "2019-01-23T10:23:23",
  "items_fetched_last_run": 2,
  "tenant": "squirro",
  "next_run_time_at": "2019-01-23T10:52:51",
  "name": "test source",
  "scheduling_options": {
    "repeat": "30m",
    "schedule": true
  },
  "created_at": "2019-01-23T10:23:23",
  "modified_at": "2019-01-23T10:23:23",
  "processed": true,
  "pipeline_backlog": 10
}
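
The status field in the response can be polled to wait for a run to finish. This is a minimal sketch, not a client feature: wait_for_source is a hypothetical helper, and it assumes that "complete" and "errored" are the terminal status values and that status is returned when include_run_stats is set, as in the example above.

```python
import time


def wait_for_source(client, project_id, source_id, timeout=300.0, poll_interval=5.0):
    """Poll get_source until the source reports a terminal status.

    Assumption: "complete" and "errored" are the terminal values of the
    "status" field; adjust the set if your server reports others.
    """
    terminal = {"complete", "errored"}
    deadline = time.monotonic() + timeout
    while True:
        source = client.get_source(project_id, source_id, include_run_stats=True)
        if source.get("status") in terminal:
            return source
        if time.monotonic() >= deadline:
            raise TimeoutError(f"source {source_id} still {source.get('status')!r}")
        time.sleep(poll_interval)
```

The helper returns the last source dictionary it fetched, so the caller can inspect status and last_error without another request.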

get_source_logs(project_id, source_id, last_n_log_lines)

Get the run logs of a particular source run.

Parameters
  • project_id – Project identifier.

  • source_id – Source identifier.

  • last_n_log_lines – Last n log lines from the last run of the source.

Example:

>>> client.get_source_logs('Vlg5Z1hOShm0eYmjtsqSqg',
...                     'hw8j7LUBRM28-jAellgQdA',
...                     10)

get_sources(project_id, include_config=None, include_run_stats=None, include_pipeline_backlog=None)

Get all sources for the provided project.

Parameters
  • project_id (str) – Project identifier.

  • include_config (Optional[bool]) – Bool, whether or not to include the config for all the Sources.

  • include_run_stats (Optional[bool]) – Bool, whether or not to include the run stats for all the Sources.

  • include_pipeline_backlog (Optional[bool]) – Bool, whether or not to include the backlog of items in the data pipeline for sources.

Returns

A list of sources.

Example:

>>> client.get_sources(
...    project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...    include_config=True,
...    include_run_stats=True,
...    include_pipeline_backlog=True)

[
  {
    "items_fetched_total": 2,
    "last_error": "",
    "last_check_at": "2019-01-23T10:23:23",
    "last_items_at": "2019-01-23T10:23:23",
    "paused": false,
    "error_count": 0,
    "id": "pqTn4vBZRdS5hYw0TBt0pQ",
    "total_error_count": 0,
    "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
    "config": {
      "dataloader_plugin_options": {
        "source_file": "path:/tmp/test.csv"
      },
      "dataloader_options": {
        "map_title": "title",
        "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
        "plugin_name": "csv_plugin"
      }
    },
    "status": "complete",
    "total_runs": 1,
    "pipeline_workflow_id": "S0fVQ-K0TmS1UgT0msZRBA",
    "last_error_at": null,
    "last_update_at": "2019-01-23T10:23:23",
    "last_success_at": "2019-01-23T10:23:23",
    "items_fetched_last_run": 2,
    "tenant": "squirro",
    "next_run_time_at": "2019-01-23T10:52:51",
    "name": "test source",
    "scheduling_options": {
      "repeat": "30m",
      "schedule": true
    },
    "created_at": "2019-01-23T10:23:23",
    "modified_at": "2019-01-23T10:23:23",
    "processed": true,
    "pipeline_backlog": 10
  }
]
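
The returned list can be filtered on the run statistics, for example to find sources that are currently failing. The sketch assumes each entry carries error_count and last_error as in the example above (which requested include_run_stats=True); failing_sources is a hypothetical helper.

```python
def failing_sources(sources):
    """Return (name, last_error) pairs for sources with a non-zero error_count."""
    return [
        (source["name"], source.get("last_error", ""))
        for source in sources
        if source.get("error_count", 0) > 0
    ]


# Two entries shaped like the example output above (most fields omitted).
sources = [
    {"name": "test source", "error_count": 0, "last_error": ""},
    {"name": "broken source", "error_count": 3, "last_error": "file not found"},
]

print(failing_sources(sources))  # prints [('broken source', 'file not found')]
```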

get_sources_v1(project_id, search=None, status=None, plugin_name=None, include=None, counts_agg=None, start=0, count=-1)

Get sources for the provided project.

Parameters
  • project_id (str) – Project identifier.

  • search (Optional[str]) – Filter by part of the Source name.

  • status (Optional[str]) – Filter by Source status.

  • plugin_name (Optional[str]) – Filter by Source plugin_name.

  • include (Optional[str]) – Comma-separated list of additional Source fields to return:

      • config – Source config.
      • run_stats – Source run stats.
      • pipeline_backlog – Source backlog information from the ingester.
      • items_indexed – Number of indexed items from the Source.

  • counts_agg (Optional[str]) – Whether or not to include Source field value aggregations. Specified as a comma-separated string of the fields to aggregate. Possible fields: status, plugin_name.

  • start (int) – Integer. Used for pagination of objects. If set, the objects starting with offset start are returned.

  • count (int) – Integer. Used for pagination of objects. If set, count number of objects are returned. To return all objects, set to -1.

Returns

A paginated list of sources.

Example:

>>> client.get_sources_v1(
...    project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...    include="config,run_stats,pipeline_backlog,items_indexed",
...    counts_agg="status,plugin_name",
...    count=1,
... )

{
  "count": 1,
  "sources": [
    {
      "items_fetched_total": 2,
      "last_error": "",
      "last_check_at": "2019-01-23T10:23:23",
      "last_items_at": "2019-01-23T10:23:23",
      "paused": false,
      "error_count": 0,
      "id": "pqTn4vBZRdS5hYw0TBt0pQ",
      "total_error_count": 0,
      "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
      "config": {
        "dataloader_plugin_options": {
          "source_file": "path:/tmp/test.csv"
        },
        "dataloader_options": {
          "map_title": "title",
          "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
          "plugin_name": "csv_plugin"
        }
      },
      "status": "complete",
      "total_runs": 1,
      "pipeline_workflow_id": "S0fVQ-K0TmS1UgT0msZRBA",
      "last_error_at": null,
      "last_update_at": "2019-01-23T10:23:23",
      "last_success_at": "2019-01-23T10:23:23",
      "items_fetched_last_run": 2,
      "tenant": "squirro",
      "next_run_time_at": "2019-01-23T10:52:51",
      "name": "test source",
      "scheduling_options": {
        "repeat": "30m",
        "schedule": true
      },
      "created_at": "2019-01-23T10:23:23",
      "modified_at": "2019-01-23T10:23:23",
      "processed": true,
      "pipeline_backlog": 10,
      "items_indexed": 100
    }
  ],
  "counts_agg": {
    "plugin_name": {"values": [{"key": "csv_plugin", "value": 3}]},
    "status": {"values": [{"key": "queued", "value": 2}, {"key": "running", "value": 0}, {"key": "errored", "value": 0}, {"key": "complete", "value": 1}]}
  },
  "total": 3,
  "next_params": {
    "count": 1,
    "start": 1
  }
}
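
The start and count parameters, together with next_params in the response, allow walking through all sources page by page. A minimal sketch under two assumptions: the response has the shape shown above, and next_params is missing or empty once the last page has been returned (the loop also stops on an empty page). iter_sources is a hypothetical helper, not part of the client.

```python
def iter_sources(client, project_id, page_size=50, **filters):
    """Yield every source of a project, fetching one page at a time.

    Extra keyword arguments (search, status, include, ...) are passed
    through to get_sources_v1 unchanged.
    """
    start = 0
    while True:
        page = client.get_sources_v1(
            project_id, start=start, count=page_size, **filters
        )
        sources = page.get("sources", [])
        yield from sources
        next_params = page.get("next_params")
        # Assumption: no next_params (or an empty page) means we are done.
        if not sources or not next_params:
            break
        start = next_params["start"]
```

A call such as list(iter_sources(client, project_id, status="complete")) then collects all matching sources without loading them in a single request.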

kill_source(project_id, source_id)

Try to terminate (SIGTERM) a dataload job if it is already running. After a fixed timeout, a SIGKILL signal is sent instead.

Parameters
  • project_id – Project identifier.

  • source_id – Source identifier.

Example:

>>> client.kill_source('Vlg5Z1hOShm0eYmjtsqSqg',
...                     'hw8j7LUBRM28-jAellgQdA')
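
The escalation described above (SIGTERM first, SIGKILL after a timeout) is a common shutdown pattern. The sketch below reproduces it locally with the standard library purely as an illustration; it is not the server's implementation, and the 5 second grace period is an arbitrary assumption because the actual timeout is not documented here.

```python
import subprocess
import sys


def stop_process(proc, grace_period=5.0):
    """Ask a child process to exit, then force it after grace_period seconds."""
    proc.terminate()  # sends SIGTERM on POSIX systems
    try:
        return proc.wait(timeout=grace_period)
    except subprocess.TimeoutExpired:
        proc.kill()  # sends SIGKILL on POSIX systems
        return proc.wait()


# A stand-in for a long-running dataload job.
job = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
stop_process(job)
```

A job that handles SIGTERM gets a chance to clean up; one that ignores it is still stopped once the grace period ends.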

modify_source(project_id, source_id, name=None, config=None, scheduling_options=None, pipeline_workflow_id=None, enable_scheduling=None, validate_schema=None, notify_scheduler=None, execute_rerun=None, priority=None)

Modify an existing source.

Parameters
  • project_id – Project identifier.

  • source_id – Source identifier.

  • name – Name for the Source.

  • config – Changed config of the source.

  • scheduling_options – dict, scheduling options for the run of a Source.

  • pipeline_workflow_id – Optional pipeline workflow id to change the source to.

  • enable_scheduling – DEPRECATED; Will be removed in a future release. Optional boolean. Indicate whether or not to enable the scheduling of this source.

  • validate_schema – Optional boolean. Indicate whether or not to validate the provided configuration of the source.

  • notify_scheduler – Optional boolean. Indicate whether or not to notify the scheduler to immediately start the procedure of loading data from the source.

  • execute_rerun – Optional boolean. Indicate whether or not to queue for reprocessing the batches (if any) of this source.

  • priority – Optional string parameter to define the priority for the source.

Returns

A dictionary which contains the source.

Example:

>>> client.modify_source(
...     project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...     source_id='601AoqmkSFWGt4sAwaX8ag',
...     name="new name")

{
  "pipeline_workflow_id": "S0fVQ-K0TmS1UgT0msZRBA",
  "name": "new name",
  "scheduling_options": {
    "repeat": "30m",
    "schedule": true
  },
  "created_at": "2019-01-23T10:32:13",
  "modified_at": "2019-01-23T10:34:41",
  "paused": false,
  "processed": true,
  "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
  "id": "601AoqmkSFWGt4sAwaX8ag",
  "tenant": "squirro"
}
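
To change a single option, one approach is to read the current config with get_source and send the modified copy back. This sketch assumes that the config passed to modify_source replaces the stored config as a whole, which is not stated above; update_plugin_option is a hypothetical helper.

```python
import copy


def update_plugin_option(client, project_id, source_id, key, value):
    """Change one dataloader_plugin_options entry and keep the rest of the config."""
    source = client.get_source(project_id, source_id, include_config=True)
    # Work on a copy so the fetched dictionary is left untouched.
    config = copy.deepcopy(source["config"])
    config.setdefault("dataloader_plugin_options", {})[key] = value
    return client.modify_source(project_id, source_id, config=config)
```

For example, update_plugin_option(client, project_id, source_id, "source_file", "path:/tmp/other.csv") would point the CSV source from the earlier examples at a different file.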

new_source(project_id, name, config, scheduling_options=None, pipeline_workflow_id=None, source_id=None, paused=False, use_default_options=None, notify_scheduler=None, priority=None)

Create a new source.

Parameters
  • project_id – Project identifier.

  • name – Name for the Source.

  • config – dict, config including dataloader_options and dataloader_plugin_options for the Source.

  • scheduling_options – dict, scheduling options for the run of a Source.

  • pipeline_workflow_id – Optional id of the pipeline workflow to apply to the data of this Source. If not specified, then the default workflow of the project with project_id will be applied.

  • source_id – Optional string parameter to create the source with the provided id. The length of the parameter must be 22 characters. Useful when exporting and importing projects across multiple Squirro servers.

  • paused – Optional boolean. Indicate whether to start data loading immediately or to create the source in a paused state.

  • use_default_options – Optional boolean. Indicate whether or not to use the default mappings for facets, fields, scheduling_options and pipeline workflow provided by the dataloader plugin itself. Setting this to True results in an HTTP 400 error if these default mappings are not available for a specific plugin.

  • notify_scheduler – Optional boolean. Indicate whether or not to notify the scheduler to immediately start the procedure of loading data from the source.

  • priority – Optional string parameter to define the priority for the source.

Returns

A dictionary which contains the new source.

Example:

>>> client.new_source(
...     project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...     name='test source',
...     config={
...         "dataloader_plugin_options": {
...             "source_file": "path:/tmp/test.csv"
...         },
...         "dataloader_options": {
...             "plugin_name": "csv_plugin",
...             "project_id": 'Vlg5Z1hOShm0eYmjtsqSqg',
...             "map_title": "title"
...             }
...         },
...     scheduling_options={'schedule': True, 'repeat': '30m'})

{
  "items_fetched_total": 0,
  "last_error": "",
  "last_check_at": null,
  "last_items_at": null,
  "paused": false,
  "error_count": 0,
  "id": "601AoqmkSFWGt4sAwaX8ag",
  "total_error_count": 0,
  "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
  "status": "queued",
  "total_runs": 0,
  "pipeline_workflow_id": "S0fVQ-K0TmS1UgT0msZRBA",
  "last_error_at": null,
  "last_update_at": null,
  "last_success_at": null,
  "items_fetched_last_run": 0,
  "tenant": "squirro",
  "next_run_time_at": "2019-01-23T10:32:13",
  "name": "test source",
  "scheduling_options": {
    "repeat": "30m",
    "schedule": true
  },
  "created_at": "2019-01-23T10:32:13",
  "modified_at": "2019-01-23T10:32:13",
  "processed": false
}
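
When several CSV sources are created, the config dictionary can be built by a small function instead of being repeated inline. The sketch mirrors the config from the example above and nothing more; build_csv_source_config is a hypothetical helper, and real sources may need further dataloader options.

```python
def build_csv_source_config(project_id, source_file, title_column):
    """Build a config dict shaped like the csv_plugin example above."""
    return {
        "dataloader_plugin_options": {"source_file": source_file},
        "dataloader_options": {
            "plugin_name": "csv_plugin",
            "project_id": project_id,
            "map_title": title_column,
        },
    }


config = build_csv_source_config(
    "Vlg5Z1hOShm0eYmjtsqSqg", "path:/tmp/test.csv", "title"
)
print(config["dataloader_options"]["map_title"])  # prints title
```

The result can be passed as the config argument of new_source, or to get_preview first to inspect the items before the source is created.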

pause_source(project_id, source_id)

Pause a source.

Parameters
  • project_id – Project identifier.

  • source_id – Source identifier.

Example:

>>> client.pause_source('Vlg5Z1hOShm0eYmjtsqSqg',
...                     'hw8j7LUBRM28-jAellgQdA')

reset_source(project_id, source_id, delete_source_data=None)

Resets and runs the source.

Parameters
  • project_id – Project identifier.

  • source_id – Source identifier.

  • delete_source_data – Bool, whether or not to delete the data associated with the source.

Example:

>>> client.reset_source(
...     'Vlg5Z1hOShm0eYmjtsqSqg',
...     'hw8j7LUBRM28-jAellgQdA')

resume_source(project_id, source_id)

Resume a paused source.

Parameters
  • project_id – Project identifier.

  • source_id – Source identifier.

Example:

>>> client.resume_source(
...     'Vlg5Z1hOShm0eYmjtsqSqg',
...     'hw8j7LUBRM28-jAellgQdA')
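
pause_source and resume_source pair naturally around maintenance work. A minimal sketch of a context manager that pauses a source and resumes it afterwards, even if the work in between fails; source_paused is a hypothetical helper, not part of the client.

```python
from contextlib import contextmanager


@contextmanager
def source_paused(client, project_id, source_id):
    """Pause a source for the duration of a with-block, then resume it."""
    client.pause_source(project_id, source_id)
    try:
        yield
    finally:
        # Resume even if the body raised, so the source is not left paused.
        client.resume_source(project_id, source_id)
```

Used as "with source_paused(client, project_id, source_id): ...". Note that the helper always resumes on exit, so it would also resume a source that was already paused before the block was entered.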

run_source(project_id, source_id)

Runs a source now.

Parameters
  • project_id – Project identifier.

  • source_id – Source identifier.

Example:

>>> client.run_source(
...     'Vlg5Z1hOShm0eYmjtsqSqg',
...     'hw8j7LUBRM28-jAellgQdA')
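
run_source can be combined with get_sources_v1 to retry sources that failed. The sketch assumes that the status filter accepts "errored", the value that appears in the counts_agg example of get_sources_v1, and relies on the default count=-1 to fetch all matches; rerun_errored_sources is a hypothetical helper.

```python
def rerun_errored_sources(client, project_id):
    """Trigger an immediate run for every source whose status is "errored"."""
    page = client.get_sources_v1(project_id, status="errored")
    rerun_ids = []
    for source in page.get("sources", []):
        client.run_source(project_id, source["id"])
        rerun_ids.append(source["id"])
    return rerun_ids
```

The returned list of source ids can be logged or fed into a later status check.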

set_max_inc(project_id, source_id, max_inc_value)

Sets the maximum incremental value of the incremental column of a source.

Parameters
  • project_id – Project identifier.

  • source_id – Source identifier.

  • max_inc_value – The maximum incremental value to be set.

Example:

>>> client.set_max_inc(
...     'Vlg5Z1hOShm0eYmjtsqSqg',
...     'hw8j7LUBRM28-jAellgQdA',
...     '2020-08-17T19:10:33')
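
Before overwriting the incremental value, it can be useful to record the old one so the change can be undone. A minimal sketch that pairs get_max_inc with set_max_inc; the return format of get_max_inc is not documented above, so the helper passes it back unchanged. rewind_max_inc is a hypothetical name.

```python
def rewind_max_inc(client, project_id, source_id, new_value):
    """Set a new maximum incremental value and return the previous one."""
    # Whatever get_max_inc returns is handed back as-is (format not assumed).
    previous = client.get_max_inc(project_id, source_id)
    client.set_max_inc(project_id, source_id, new_value)
    return previous
```

Keeping the returned value makes it possible to restore the earlier state with another set_max_inc call, assuming the stored value can be passed back in the same form.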