SourcesMixin#
- class SourcesMixin#
Bases: object
Methods Summary
delete_source(project_id, source_id) – Delete an existing Source.
get_max_inc(project_id, source_id) – Fetches the maximum incremental value of a source.
get_preview(project_id, config) – Preview the source configuration.
get_source(project_id, source_id[, ...]) – Get source details.
get_source_logs(project_id, source_id, ...) – Get the run logs of a particular source run.
get_sources(project_id[, include_config, ...]) – Get all sources for the provided project.
get_sources_v1(project_id[, search, status, ...]) – Get sources for the provided project.
kill_source(project_id, source_id) – Try to terminate (SIGTERM) a dataload job if it is already running.
modify_source(project_id, source_id[, name, ...]) – Modify an existing source.
new_source(project_id, name, config[, ...]) – Create a new source.
pause_source(project_id, source_id) – Pause a source.
reset_source(project_id, source_id[, ...]) – Resets and runs the source.
resume_source(project_id, source_id) – Resume a paused source.
retry_failed_batches(project_id, source_id) – Move all the failed batches of a given source back for processing.
run_source(project_id, source_id) – Runs a source now.
set_max_inc(project_id, source_id, max_inc_value) – Sets the maximum incremental value of the incremental column of a source.
Methods Documentation
- delete_source(project_id, source_id)#
Delete an existing Source.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
Example:
>>> client.delete_source('Vlg5Z1hOShm0eYmjtsqSqg',
...                      'oTvI6rlaRmKvmYCfCvLwpw')
- get_max_inc(project_id, source_id)#
Fetches the maximum incremental value of a source.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
Example:
>>> client.get_max_inc(
...     'Vlg5Z1hOShm0eYmjtsqSqg',
...     'hw8j7LUBRM28-jAellgQdA')
- get_preview(project_id, config)#
Preview the source configuration.
- Parameters:
project_id – Project identifier.
config – Provider configuration.
- Returns:
A dictionary which contains the source preview items.
Example:
>>> client.get_preview(
...     project_id,
...     config={
...         "dataloader_plugin_options": {
...             "source_file": "path:/tmp/test.csv"
...         },
...         "dataloader_options": {
...             "plugin_name": "csv_plugin",
...             "project_id": project_id,
...             "map_title": "title"
...         }
...     })
{
    "count": 2,
    "items": [
        {
            "id": "CTHQDLwzQsOq93zHAUcCRg",
            "name": "name01",
            "title": "title01"
        },
        {
            "id": ",yYNWBDgQQ2Uhuz32boDAg",
            "name": "name02",
            "title": "title02"
        }
    ],
    "data_schema": [
        "name",
        "title"
    ]
}
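The preview response can be inspected before creating a source from the same config. A minimal sketch of iterating over it (the variable names are illustrative, not part of the API):
>>> preview = client.get_preview(project_id, config=config)
>>> for item in preview["items"]:
...     print(item["name"], item["title"])
name01 title01
name02 title02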
- get_source(project_id, source_id, include_config=None, include_run_stats=None, include_pipeline_backlog=None)#
Get source details.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
include_config – Bool, whether or not to include the config for the Source.
include_run_stats – Bool, whether or not to include the run stats for the Source.
include_pipeline_backlog – Bool, whether or not to include the backlog of items in the data pipeline for this source.
- Returns:
A dictionary which contains the source.
Example:
>>> client.get_source(
...     project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...     source_id='pqTn4vBZRdS5hYw0TBt0pQ',
...     include_config=True,
...     include_run_stats=True,
...     include_pipeline_backlog=True)
{
    "items_fetched_total": 2,
    "last_error": "",
    "last_check_at": "2019-01-23T10:23:23",
    "last_items_at": "2019-01-23T10:23:23",
    "paused": false,
    "error_count": 0,
    "id": "pqTn4vBZRdS5hYw0TBt0pQ",
    "total_error_count": 0,
    "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
    "config": {
        "dataloader_plugin_options": {
            "source_file": "path:/tmp/test.csv"
        },
        "dataloader_options": {
            "map_title": "title",
            "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
            "plugin_name": "csv_plugin"
        }
    },
    "status": "complete",
    "total_runs": 1,
    "pipeline_workflow_id": "S0fVQ-K0TmS1UgT0msZRBA",
    "last_error_at": null,
    "last_update_at": "2019-01-23T10:23:23",
    "last_success_at": "2019-01-23T10:23:23",
    "items_fetched_last_run": 2,
    "tenant": "squirro",
    "next_run_time_at": "2019-01-23T10:52:51",
    "name": "test source",
    "scheduling_options": {
        "repeat": "30m",
        "schedule": true
    },
    "created_at": "2019-01-23T10:23:23",
    "modified_at": "2019-01-23T10:23:23",
    "processed": true,
    "pipeline_backlog": 10
}
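Because the returned status field moves through states such as "queued", "running", "errored", and "complete" (the values aggregated by get_sources_v1 below), one way to wait for a run to finish is to poll get_source. A minimal sketch, not itself part of the client API; the polling interval is an arbitrary choice:
>>> import time
>>> source = client.get_source(project_id, source_id)
>>> while source["status"] in ("queued", "running"):
...     time.sleep(30)  # arbitrary polling interval
...     source = client.get_source(project_id, source_id)
>>> print(source["status"], source["last_error"])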
- get_source_logs(project_id, source_id, last_n_log_lines)#
Get the run logs of a particular source run.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
last_n_log_lines – Last n log lines from the last run of the source.
Example:
>>> client.get_source_logs('Vlg5Z1hOShm0eYmjtsqSqg',
...                        'hw8j7LUBRM28-jAellgQdA',
...                        10)
- get_sources(project_id, include_config=None, include_run_stats=None, include_pipeline_backlog=None)#
Get all sources for the provided project.
- Parameters:
project_id (str) – Project identifier.
include_config (Optional[bool]) – Bool, whether or not to include the config for all the Sources.
include_run_stats (Optional[bool]) – Bool, whether or not to include the run stats for all the Sources.
include_pipeline_backlog (Optional[bool]) – Bool, whether or not to include the backlog of items in the data pipeline for these sources.
- Returns:
A list of sources.
Example:
>>> client.get_sources(
...     project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...     include_config=True,
...     include_run_stats=True,
...     include_pipeline_backlog=True)
[
    {
        "items_fetched_total": 2,
        "last_error": "",
        "last_check_at": "2019-01-23T10:23:23",
        "last_items_at": "2019-01-23T10:23:23",
        "paused": false,
        "error_count": 0,
        "id": "pqTn4vBZRdS5hYw0TBt0pQ",
        "total_error_count": 0,
        "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
        "config": {
            "dataloader_plugin_options": {
                "source_file": "path:/tmp/test.csv"
            },
            "dataloader_options": {
                "map_title": "title",
                "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
                "plugin_name": "csv_plugin"
            }
        },
        "status": "complete",
        "total_runs": 1,
        "pipeline_workflow_id": "S0fVQ-K0TmS1UgT0msZRBA",
        "last_error_at": null,
        "last_update_at": "2019-01-23T10:23:23",
        "last_success_at": "2019-01-23T10:23:23",
        "items_fetched_last_run": 2,
        "tenant": "squirro",
        "next_run_time_at": "2019-01-23T10:52:51",
        "name": "test source",
        "scheduling_options": {
            "repeat": "30m",
            "schedule": true
        },
        "created_at": "2019-01-23T10:23:23",
        "modified_at": "2019-01-23T10:23:23",
        "processed": true,
        "pipeline_backlog": 10
    }
]
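Since the response is a plain list of source dictionaries, run statistics can be filtered client-side, for example to surface sources that have recorded errors. A minimal sketch using the fields shown above:
>>> for source in client.get_sources(
...         'Vlg5Z1hOShm0eYmjtsqSqg', include_run_stats=True):
...     if source["error_count"] > 0:
...         print(source["id"], source["last_error"])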
- get_sources_v1(project_id, search=None, status=None, plugin_name=None, include=None, counts_agg=None, start=0, count=-1)#
Get sources for the provided project.
- Parameters:
project_id (str) – Project identifier.
include (Optional[str]) – Comma-separated list of additional Source fields: config – Source config; run_stats – Source run stats; pipeline_backlog – Source backlog information from the ingester; items_indexed – number of indexed items from the Source.
counts_agg (Optional[str]) – Whether or not to include Source field value aggregations, specified as a comma-separated string of aggregated fields. Possible fields: status, plugin_name.
start (int) – Used for pagination of objects. If set, the objects starting at offset start are returned.
count (int) – Used for pagination of objects. If set, count objects are returned. To return all objects, set to -1.
- Returns:
A paginated list of sources.
Example:
>>> client.get_sources_v1(
...     project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...     include="config,run_stats,pipeline_backlog,items_indexed",
...     counts_agg="status,plugin_name",
...     count=1,
... )
{
    "count": 1,
    "sources": [
        {
            "items_fetched_total": 2,
            "last_error": "",
            "last_check_at": "2019-01-23T10:23:23",
            "last_items_at": "2019-01-23T10:23:23",
            "paused": false,
            "error_count": 0,
            "id": "pqTn4vBZRdS5hYw0TBt0pQ",
            "total_error_count": 0,
            "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
            "config": {
                "dataloader_plugin_options": {
                    "source_file": "path:/tmp/test.csv"
                },
                "dataloader_options": {
                    "map_title": "title",
                    "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
                    "plugin_name": "csv_plugin"
                }
            },
            "status": "complete",
            "total_runs": 1,
            "pipeline_workflow_id": "S0fVQ-K0TmS1UgT0msZRBA",
            "last_error_at": null,
            "last_update_at": "2019-01-23T10:23:23",
            "last_success_at": "2019-01-23T10:23:23",
            "items_fetched_last_run": 2,
            "tenant": "squirro",
            "next_run_time_at": "2019-01-23T10:52:51",
            "name": "test source",
            "scheduling_options": {
                "repeat": "30m",
                "schedule": true
            },
            "created_at": "2019-01-23T10:23:23",
            "modified_at": "2019-01-23T10:23:23",
            "processed": true,
            "pipeline_backlog": 10,
            "items_indexed": 100
        }
    ],
    "counts_agg": {
        "plugin_name": {"values": [{"key": "csv_plugin", "value": 3}]},
        "status": {"values": [{"key": "queued", "value": 2},
                              {"key": "running", "value": 0},
                              {"key": "errored", "value": 0},
                              {"key": "complete", "value": 1}]}
    },
    "total": 3,
    "next_params": {
        "count": 1,
        "start": 1
    }
}
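The next_params object mirrors the start and count parameters, so it can be passed back verbatim to fetch the following page. A minimal pagination sketch (it assumes next_params is omitted once the last page is reached):
>>> params = {"start": 0, "count": 10}
>>> while params:
...     page = client.get_sources_v1('Vlg5Z1hOShm0eYmjtsqSqg', **params)
...     for source in page["sources"]:
...         print(source["id"], source["name"])
...     params = page.get("next_params")  # assumed absent on the last page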
- kill_source(project_id, source_id)#
Try to terminate (SIGTERM) a dataload job if it is already running. After a fixed timeout, a SIGKILL signal is sent instead.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
Example:
>>> client.kill_source('Vlg5Z1hOShm0eYmjtsqSqg',
...                    'hw8j7LUBRM28-jAellgQdA')
- modify_source(project_id, source_id, name=None, config=None, scheduling_options=None, pipeline_workflow_id=None, enable_scheduling=None, validate_schema=None, notify_scheduler=None, execute_rerun=None, priority=None, description=None)#
Modify an existing source.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
name – Name for the Source.
config – Changed config of the source.
scheduling_options – dict, scheduling options for the run of a Source.
pipeline_workflow_id – Optional pipeline workflow id to change the source to.
enable_scheduling – DEPRECATED; Will be removed in a future release. Optional boolean. Indicate whether or not to enable the scheduling of this source.
validate_schema – Optional boolean. Indicate whether or not to validate the provided configuration of the source.
notify_scheduler – Optional boolean. Indicate whether or not to notify the scheduler to immediately start the procedure of loading data from the source.
execute_rerun – Optional boolean. Indicate whether or not to queue the batches (if any) of this source for reprocessing.
priority – Optional string parameter to define the priority for the source.
description – Optional string parameter for a description of the source.
- Returns:
A dictionary which contains the source.
Example:
>>> client.modify_source(
...     project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...     source_id='601AoqmkSFWGt4sAwaX8ag',
...     name="new name")
{
    "pipeline_workflow_id": "S0fVQ-K0TmS1UgT0msZRBA",
    "name": "new name",
    "scheduling_options": {
        "repeat": "30m",
        "schedule": true
    },
    "created_at": "2019-01-23T10:32:13",
    "modified_at": "2019-01-23T10:34:41",
    "paused": false,
    "processed": true,
    "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
    "id": "601AoqmkSFWGt4sAwaX8ag",
    "tenant": "squirro"
}
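Scheduling can be adjusted through the same call; scheduling_options takes the dict shape shown in the responses above. A minimal sketch (the '1h' interval is an assumed value following the '30m' format used elsewhere on this page):
>>> client.modify_source(
...     project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...     source_id='601AoqmkSFWGt4sAwaX8ag',
...     scheduling_options={'schedule': True, 'repeat': '1h'})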
- new_source(project_id, name, config, scheduling_options=None, pipeline_workflow_id=None, source_id=None, paused=False, use_default_options=None, notify_scheduler=None, priority=None, description=None)#
Create a new source.
- Parameters:
project_id – Project identifier.
name – Name for the Source.
config – dict, config including dataloader_options and dataloader_plugin_options for the Source.
scheduling_options – dict, scheduling options for the run of a Source.
pipeline_workflow_id – Optional id of the pipeline workflow to apply to the data of this Source. If not specified, then the default workflow of the project with project_id will be applied.
source_id – Optional string parameter to create the source with the provided id. The length of the parameter must be 22 characters. Useful when exporting and importing projects across multiple Squirro servers.
paused – Optional boolean. Indicate whether to start data loading immediately or to create the source in a paused state.
use_default_options – Optional boolean. Indicate whether or not to use the default mappings for facets, fields, scheduling_options, and the pipeline workflow provided by the dataloader plugin itself. Setting this to True will raise an HTTP 400 error if these default mappings are not available for a specific plugin.
notify_scheduler – Optional boolean. Indicate whether or not to notify the scheduler to immediately start the procedure of loading data from the source.
priority – Optional string parameter to define the priority for the source.
description – Optional string parameter for a description of the source.
- Returns:
A dictionary which contains the new source.
Example:
>>> client.new_source(
...     project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...     name='test source',
...     config={
...         "dataloader_plugin_options": {
...             "source_file": "path:/tmp/test.csv"
...         },
...         "dataloader_options": {
...             "plugin_name": "csv_plugin",
...             "project_id": 'Vlg5Z1hOShm0eYmjtsqSqg',
...             "map_title": "title"
...         }
...     },
...     scheduling_options={'schedule': True, 'repeat': '30m'})
{
    "items_fetched_total": 0,
    "last_error": "",
    "last_check_at": null,
    "last_items_at": null,
    "paused": false,
    "error_count": 0,
    "id": "601AoqmkSFWGt4sAwaX8ag",
    "total_error_count": 0,
    "project_id": "Vlg5Z1hOShm0eYmjtsqSqg",
    "status": "queued",
    "total_runs": 0,
    "pipeline_workflow_id": "S0fVQ-K0TmS1UgT0msZRBA",
    "last_error_at": null,
    "last_update_at": null,
    "last_success_at": null,
    "items_fetched_last_run": 0,
    "tenant": "squirro",
    "next_run_time_at": "2019-01-23T10:32:13",
    "name": "test source",
    "scheduling_options": {
        "repeat": "30m",
        "schedule": true
    },
    "created_at": "2019-01-23T10:32:13",
    "modified_at": "2019-01-23T10:32:13",
    "processed": false
}
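Because paused=True creates the source without starting a load, a source can be staged first and started later with resume_source (documented below). A minimal sketch (config stands for the same dict shape as in the example above):
>>> source = client.new_source(
...     project_id='Vlg5Z1hOShm0eYmjtsqSqg',
...     name='staged source',
...     config=config,  # same config shape as in the example above
...     paused=True)
>>> client.resume_source('Vlg5Z1hOShm0eYmjtsqSqg', source['id'])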
- pause_source(project_id, source_id)#
Pause a source.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
Example:
>>> client.pause_source('Vlg5Z1hOShm0eYmjtsqSqg',
...                     'hw8j7LUBRM28-jAellgQdA')
- reset_source(project_id, source_id, delete_source_data=None)#
Resets and runs the source.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
delete_source_data – Bool, whether or not to delete the data associated with the source.
Example:
>>> client.reset_source(
...     'Vlg5Z1hOShm0eYmjtsqSqg',
...     'hw8j7LUBRM28-jAellgQdA')
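Passing delete_source_data=True additionally removes the data associated with the source as part of the reset; a minimal sketch:
>>> client.reset_source(
...     'Vlg5Z1hOShm0eYmjtsqSqg',
...     'hw8j7LUBRM28-jAellgQdA',
...     delete_source_data=True)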
- resume_source(project_id, source_id)#
Resume a paused source.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
Example:
>>> client.resume_source(
...     'Vlg5Z1hOShm0eYmjtsqSqg',
...     'hw8j7LUBRM28-jAellgQdA')
- retry_failed_batches(project_id, source_id, batch_id=None, batch_priority=None)#
Move all the failed batches of a given source back for processing.
If a batch identifier is specified, only that batch is moved.
If a batch priority level is specified, only batches with that priority level are moved.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
batch_id – Optional batch identifier. If specified, only this batch is moved back for processing.
batch_priority – Optional batch priority level. If specified, only batches with this priority level are moved.
- Returns:
A tuple with the number of moved batches and items.
Example:
>>> client.retry_failed_batches(
...     "Oqc-DYVWRGe9gtDYBIiKyA",
...     "uu6nikJZTiWazZJ0-iOXKA",
...     batch_id="1zxAeuo1Syyi5zgZgihwkg",
...     batch_priority="high")
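The return value is a tuple, so it can be unpacked directly; a minimal sketch, assuming the documented order (batches, then items):
>>> batches, items = client.retry_failed_batches(
...     "Oqc-DYVWRGe9gtDYBIiKyA",
...     "uu6nikJZTiWazZJ0-iOXKA")
>>> print(f"requeued {batches} batches ({items} items)")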
- run_source(project_id, source_id)#
Runs a source now.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
Example:
>>> client.run_source(
...     'Vlg5Z1hOShm0eYmjtsqSqg',
...     'hw8j7LUBRM28-jAellgQdA')
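run_source combines naturally with get_source_logs (documented above) for debugging. A minimal sketch; note that the logs cover the last run, so fetching them immediately after triggering may still return the previous run's output (the polling pattern under get_source can be used to wait first):
>>> client.run_source(project_id, source_id)
>>> logs = client.get_source_logs(project_id, source_id, 10)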
- set_max_inc(project_id, source_id, max_inc_value)#
Sets the maximum incremental value of the incremental column of a source.
- Parameters:
project_id – Project identifier.
source_id – Source identifier.
max_inc_value – The maximum incremental value to be set.
Example:
>>> client.set_max_inc(
...     'Vlg5Z1hOShm0eYmjtsqSqg',
...     'hw8j7LUBRM28-jAellgQdA',
...     '2020-08-17T19:10:33')
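get_max_inc and set_max_inc operate on the same value, so a source can be rewound to an earlier incremental checkpoint by reading the current value first. A minimal sketch (the timestamp is illustrative):
>>> current = client.get_max_inc(project_id, source_id)
>>> client.set_max_inc(project_id, source_id, '2020-08-17T00:00:00')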