API
The Pipeline Class
class pipeline.pipeline.Pipeline(name, display_name, settings_file=None, settings_from_file=True, log_status=False, conn=None, conn_name=None)
Main pipeline class.
The Pipeline class binds together extractors, schema, and loaders and runs everything together. Almost all Pipeline methods return the pipeline object, allowing methods to be chained together.
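Because nearly every setup method returns the pipeline object itself, configuration reads as one fluent chain. A toy sketch of that pattern (the ToyPipeline class and all of its arguments are invented for illustration; they are not part of this library):

```python
class ToyPipeline:
    """Invented stand-in showing why chaining works: each
    configuration method returns self."""

    def __init__(self, name):
        self.name = name
        self.steps = []

    def extract(self, extractor, target):
        self.steps.append(("extract", extractor, target))
        return self  # returning self is what enables chaining

    def schema(self, schema):
        self.steps.append(("schema", schema))
        return self

    def load(self, loader):
        self.steps.append(("load", loader))
        return self

    def run(self):
        # report the configured steps in order
        return [step[0] for step in self.steps]

order = ToyPipeline("demo").extract("csv", "data.csv").schema("s").load("ckan").run()
# order lists the configured stages in sequence
```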
close()
Close any open database connections.
enforce_full_pipeline()
Ensure that a pipeline has an extractor, schema, and loader.
Raises: RuntimeError – if an extractor, schema, and loader are not all specified
extract(extractor, *args, **kwargs)
Set the extractor class and related arguments.
Parameters:
- extractor – Extractor class; see Built-in Extractors
- target – location of the extraction target (file, URL, etc.)
load(loader, config_string=None, *args, **kwargs)
Set the loader class.
Parameters: loader – Loader class; see Built-in Loaders
Returns: modified Pipeline object
load_line(data)
Load a line into the pipeline's data or raise an error.
Parameters: data – a parsed line from an extractor's handle_line method
parse_config_piece(pipeline_piece, config_piece_string)
Parse out a small piece of the overall pipeline configuration.
This is used to pass only the relevant parts of the configuration to the relevant loaders. The structure allows the configuration to grow larger without forcing implementing functions to know exactly how it must be structured.
Parameters:
- pipeline_piece – which part of the pipeline to use (for example, 'loader'). This should not be modified by the user.
- config_piece_string – passed by the user; allows accessing a deeper nested part of the configuration
Returns: isolated configuration for the specified piece only
Raises: InvalidConfigException – if the specified configuration piece cannot be found
pre_run()
Method to be run immediately before the pipeline runs.
Enforces that the pipeline is complete and connects to the status database.
Returns: a Unix timestamp of the pipeline's start time
run()
Main pipeline run method.
One of the main features is that the connector, extractor, schema, and loader are all instantiated here, as opposed to when they are declared on pipeline instantiation. This delays opening connections until the last possible moment.
The run method works essentially as follows:
- Run the pre_run method, which gives us the pipeline start time, ensures that the pipeline has all of its required component pieces, and connects to the status db.
- Boot up a new connection object, and get the checksum of the connected iterable.
- Check to make sure that the incoming checksum is different from the previous run's input_checksum.
- Instantiate the schema.
- Iterate through the iterable returned from the connector's connect method, handling each element with the extractor's handle_line method before passing it to the load_line method to attach each row to the pipeline's data.
- After iteration, clean up the connector.
- Instantiate the loader and load the data.
- Finally, update the status to a successful run, then close down and clean up the pipeline.
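The steps above can be condensed into a toy sketch of the control flow. Every name here is invented for illustration; this is not the library's implementation:

```python
import hashlib

def toy_run(lines, previous_checksum, handle_line, load):
    # checksum the input first, and skip the run if nothing changed
    checksum = hashlib.md5("".join(lines).encode()).hexdigest()
    if checksum == previous_checksum:
        return None
    # handle each element, attaching the parsed rows to the data list
    data = [handle_line(raw) for raw in lines]
    load(data)  # the loader pushes the whole batch at the end
    return checksum  # kept as input_checksum for the next run

loaded = []
first = toy_run(["a,1", "b,2"], None, lambda r: r.split(","), loaded.append)
second = toy_run(["a,1", "b,2"], first, lambda r: r.split(","), loaded.append)
# second run sees an unchanged checksum and is skipped
```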
schema(schema)
Set the schema class.
Parameters: schema – Schema class
Returns: modified Pipeline object
set_config_from_file(file)
Set the pipeline's configuration from a file.
Parameters: file – location of the configuration to load
Raises: InvalidConfigException – if no configuration is found or the found configuration is not valid JSON
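The layout of the configuration file itself is not documented on this page. Given that parse_config_piece selects a top-level pipeline piece such as 'loader' and optionally a deeper nested key, a plausible shape might be the following, where every key and value is hypothetical:

```json
{
  "loader": {
    "ckan": {
      "ckan_root_url": "https://data.example.org",
      "package_id": "hypothetical-package-id",
      "resource_name": "hypothetical-resource"
    }
  }
}
```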
Job Status
class pipeline.status.Status(conn, name, display_name, last_ran, start_time, status, frequency, num_lines, input_checksum)
Object representing a row in the status table.
Variables:
- conn – database connection, usually a sqlite3 connection object
- name – name of the pipeline job running
- display_name – pretty-formatted display name for the pipeline
- last_ran – Unix timestamp (number) of the last complete run
- start_time – Unix timestamp (number) of the last complete run's start
- status – string representing status/errors with the pipeline
- num_lines – if successful, number of lines processed
- input_checksum – a checksum of the input's contents
update(**kwargs)
Update the Status object with the passed kwargs and write the result.
write()
Insert or replace a status row.
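"Insert or replace" maps directly onto SQLite's INSERT OR REPLACE statement, which overwrites any existing row sharing the same primary key. A minimal sketch, assuming a hypothetical status table keyed on the pipeline name (the column set is invented, abbreviated from the Variables listed above):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# hypothetical, abbreviated status table keyed on the pipeline name
conn.execute(
    "CREATE TABLE status (name TEXT PRIMARY KEY, status TEXT, num_lines INTEGER)"
)

def write_status(conn, name, status, num_lines):
    # a second write with the same name replaces the earlier row
    conn.execute(
        "INSERT OR REPLACE INTO status VALUES (?, ?, ?)",
        (name, status, num_lines),
    )
    conn.commit()

write_status(conn, "dog_licenses", "running", 0)
write_status(conn, "dog_licenses", "success", 1500)
rows = conn.execute("SELECT status, num_lines FROM status").fetchall()
# one row per pipeline name; the last write wins
```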
Built-in Connectors
class pipeline.connectors.Connector(*args, **kwargs)
Base connector class.
Subclasses must implement the connect, checksum_contents, and close methods.

connect(target)
Base connect method.
Should return an object that can be iterated through via the next() builtin.
checksum_contents(target)
Should return an MD5 hash of the contents of the conn object.
close()
Tear down any open connections (to a file, for example).
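The three-method contract can be illustrated with a toy in-memory connector. The ListConnector class below is invented for illustration and is not shipped with the library:

```python
import hashlib

class ListConnector:
    """Toy connector over an in-memory list, implementing the
    connect / checksum_contents / close contract described above."""

    def connect(self, target):
        self._data = list(target)
        return iter(self._data)  # consumable via the next() builtin

    def checksum_contents(self, target):
        joined = "".join(str(item) for item in target)
        return hashlib.md5(joined.encode()).hexdigest()

    def close(self):
        self._data = None  # nothing real to tear down here

conn = ListConnector()
rows = conn.connect(["a", "b"])
first = next(rows)
digest = conn.checksum_contents(["a", "b"])
conn.close()
```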
class
pipeline.connectors.
FileConnector
(*args, **kwargs)¶ Base connector for file objects.
connect(target)
Connect to a file.
Runs open() on the passed target, sets the result on the class as _file, and returns it.
Parameters: target – a valid filepath
Returns: a file object
checksum_contents(target, blocksize=8192)
Open a file and get an MD5 hash of its contents.
Parameters: target – a valid filepath
Keyword Arguments: blocksize – the size of the block to read at a time from the file. Defaults to 8192.
Returns: a hexadecimal representation of the file's contents
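Reading in fixed-size blocks keeps memory use constant no matter how large the file is. A sketch of the same technique (not the library's own code):

```python
import hashlib
import tempfile

def md5_of_file(path, blocksize=8192):
    md5 = hashlib.md5()
    with open(path, "rb") as f:
        # read fixed-size blocks so a huge file never sits in memory whole
        for block in iter(lambda: f.read(blocksize), b""):
            md5.update(block)
    return md5.hexdigest()

# demonstrate on a small temporary file
with tempfile.NamedTemporaryFile("wb", delete=False, suffix=".csv") as f:
    f.write(b"name,count\ndog,1\n")
    path = f.name

digest = md5_of_file(path)
```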
close()
Closes the connected file if it is not already closed.
class
pipeline.connectors.
RemoteFileConnector
(*args, **kwargs)¶ Connector for a file located at a remote (HTTP-accessible) resource
This class should be used to connect to a file available over HTTP. For example, if there is a CSV that is streamed from a web server, this is the correct connector to use.
connect(target)
Connect to a remote target.
Parameters: target – remote URL
Returns: an io.TextIOWrapper around the opened URL
class
pipeline.connectors.
HTTPConnector
(*args, **kwargs)¶ Connect to remote file via HTTP
class pipeline.connectors.SFTPConnector(*args, **kwargs)
Connect to a remote file via SFTP.
Built-in Extractors
class pipeline.extractors.TableExtractor(connection, *args, **kwargs)
Abstract Extractor subclass for extracting data in a tabular format.
set_headers(headers=None)
Sets headers from the file or from passed headers.
This method sets two attributes on the class: the headers attribute and the schema_headers attribute. schema_headers must align with the schema attributes for the pipeline.
If no headers are passed, the first line of the file is checked. The headers attribute is set from the first line, and schema headers are by default created by lowercasing the headers and replacing spaces with underscores. Custom header-to-schema-header mappings can be created by subclassing the CSVExtractor and overriding the create_schema_headers method.
Keyword Arguments: headers – optional headers to be used as the headers and schema headers
Raises: RuntimeError – if self.headers is not passed and the firstline_headers kwarg is not set
create_schema_headers(headers)
Maps headers to schema headers.
Parameters: headers – a list of headers
Returns: a list of formatted schema headers. By default, the passed headers are lowercased and have their spaces replaced with underscores.
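The default mapping described above is plain string normalization; a standalone sketch:

```python
def create_schema_headers(headers):
    # default behavior: lowercase and replace spaces with underscores
    return [header.lower().replace(" ", "_") for header in headers]

schema_headers = create_schema_headers(["Dog Name", "License Type"])
# → ["dog_name", "license_type"]
```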
handle_line(line)
Replace empty strings with None types.
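For a tabular row, that cleanup amounts to the following (a standalone sketch of the described behavior, not the library's code):

```python
def handle_line(line):
    # empty cells arrive as "", which should become NULL downstream
    return [None if cell == "" else cell for cell in line]

row = handle_line(["Rex", "", "2015"])
# → ["Rex", None, "2015"]
```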
class pipeline.extractors.ExcelExtractor(connection, *args, **kwargs)
TableExtractor subclass for Microsoft Excel spreadsheet files (xls, xlsx).
Built-in Schema
class pipeline.schema.BaseSchema(extra=None, only=(), exclude=(), prefix='', strict=False, many=False, context=None, load_only=(), dump_only=(), partial=False)
Base schema for the pipeline. Extends marshmallow.Schema.
serialize_to_ckan_fields(capitalize=False)
Convert the schema field list to CKAN-friendly fields.
Returns: a list of dictionaries with proper name/type mappings for CKAN. For example, name = fields.String() would go to: [{'id': 'name', 'type': 'text'}]
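The conversion can be sketched as a lookup against a name/type table. Only the String → 'text' pairing comes from the example above; the other mapping entries and the input shape are assumptions made for illustration:

```python
# Hypothetical type mapping; only "String" -> "text" is confirmed above.
CKAN_TYPE_MAP = {"String": "text", "Integer": "int", "Float": "float"}

def to_ckan_fields(fields, capitalize=False):
    # fields is assumed to be (name, marshmallow-type-name) pairs
    return [
        {"id": name.capitalize() if capitalize else name,
         "type": CKAN_TYPE_MAP[type_name]}
        for name, type_name in fields
    ]

ckan_fields = to_ckan_fields([("name", "String")])
# → [{"id": "name", "type": "text"}]
```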
Built-in Loaders
class pipeline.loaders.CKANLoader(*args, **kwargs)
Connection to the CKAN datastore.
get_resource_id(package_id, resource_name)
Search for a resource within a CKAN dataset and return its id.
Parameters:
- package_id – id of the resource's parent dataset
- resource_name – name of the resource
Returns: the resource id if the resource is found within the package, None otherwise
resource_exists(package_id, resource_name)
Check whether a resource exists on the CKAN instance.
Parameters:
- package_id – id of the resource's parent dataset
- resource_name – name of the resource
Returns: True if the resource is found within the package, False otherwise
create_resource(package_id, resource_name)
Create a new resource on the CKAN instance.
Parameters:
- package_id – dataset under which to add the new resource
- resource_name – name of the new resource
Returns: id of the newly created resource if successful, None otherwise
create_datastore(resource_id, fields)
Create a new datastore for the specified resource.
Parameters:
- resource_id – resource id for which the new datastore is being made
- fields – header fields for the CSV file
Returns: resource_id of the new datastore if successful
Raises: CKANException – if creation is unsuccessful
delete_datastore(resource_id)
Delete the datastore table for a resource.
Parameters: resource_id – id of the resource whose table should be removed
Returns: status code from the request
upsert(resource_id, data, method='upsert')
Upsert data into the datastore.
Parameters:
- resource_id – id of the resource into which the data will be inserted
- data – data to be upserted
Returns: request status
update_metadata(resource_id)
Update a resource's metadata.
TODO: Make this versatile
Parameters: resource_id – resource whose metadata will be modified
Returns: request status
class
pipeline.loaders.
CKANDatastoreLoader
(*args, **kwargs)¶ Store data in CKAN using an upsert strategy
load(data)
Load data to CKAN using an upsert strategy.
Parameters: data – a list of data to be inserted or upserted to the configured CKAN instance
Raises: RuntimeError – if the upsert or update-metadata calls are unsuccessful
Returns: a two-tuple of the status codes for the upsert and metadata-update calls