API

The Pipeline Class

class pipeline.pipeline.Pipeline(name, display_name, settings_file=None, settings_from_file=True, log_status=False, conn=None, conn_name=None)

Main pipeline class

The pipeline class binds together extractors, schema, and loaders and runs everything together. Almost all Pipeline methods return the pipeline object, allowing for methods to be chained together.
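
For illustration, a minimal chained pipeline, assuming a local CSV source; the CSVExtractor, CKANDatastoreLoader, schema class, file names, and the 'ckan' config piece are hypothetical and depend on your settings file:

from marshmallow import fields

from pipeline.pipeline import Pipeline
from pipeline.extractors import CSVExtractor
from pipeline.loaders import CKANDatastoreLoader
from pipeline.schema import BaseSchema


class DogLicenseSchema(BaseSchema):
    # field names must line up with the extractor's schema headers
    license_number = fields.String()
    owner_zip = fields.String()


# each method returns the pipeline, so the calls chain straight into run()
Pipeline('dog_licenses', 'Dog Licenses', settings_file='settings.json') \
    .extract(CSVExtractor, 'dog_licenses.csv') \
    .schema(DogLicenseSchema) \
    .load(CKANDatastoreLoader, 'ckan') \
    .run()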

close()

Close any open database connections.

enforce_full_pipeline()

Ensure that a pipeline has an extractor, schema, and loader

Raises:RuntimeError – if an extractor, schema, and loader are not all specified
extract(extractor, *args, **kwargs)

Set the extractor class and related arguments

Parameters:
  • extractor – Extractor class, see Built-in Extractors
  • target – location of the extraction target (file, url, etc.)
load(loader, config_string=None, *args, **kwargs)

Sets the loader class

Parameters:loader – Loader class. See Built-in Loaders
Returns:modified Pipeline object
load_line(data)

Load a line into the pipeline’s data or throw an error

Parameters:data – A parsed line from an extractor’s handle_line method
parse_config_piece(pipeline_piece, config_piece_string)

Parse out a small piece of the overall pipeline configuration

This is used to pass only the relevant parts of configuration to the relevant loaders. The structure allows configuration to grow larger without forcing implementing functions to know exactly how the configuration must be structured.

Parameters:
  • pipeline_piece – which part of the pipeline to use (for example, ‘loader’). This should not be modified by the user.
  • config_piece_string – passed by the user, allows accessing a deeper nested part of the configuration
Returns:Isolated configuration only for the specified piece

Raises:InvalidConfigException – if the specified configuration piece cannot be found
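
For example, a hypothetical settings structure and the piece that would be isolated for a CKAN loader (this layout is illustrative, not a required format):

# Hypothetical nested settings; parse_config_piece('loader', 'ckan') would
# return only the inner 'ckan' dictionary, and raise InvalidConfigException
# if that piece were missing.
config = {
    'loader': {
        'ckan': {
            'ckan_root_url': 'https://data.example.org',
            'ckan_api_key': 'xxxx',
        }
    }
}
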
pre_run()

Method to be run immediately before the pipeline runs

Enforces that the pipeline is complete and connects to the status db

Returns:A unix timestamp of the pipeline’s start time.
run()

Main pipeline run method

One of the main features is that the connector, extractor, schema, and loader are all instantiated here, rather than when they are declared on pipeline instantiation. This delays opening connections until the last possible moment.

The run method works essentially as follows (a simplified sketch appears after the list):

  1. Run the pre_run method, which gives us the pipeline start time, ensures that our pipeline has all of the required component pieces, and connects to the status db.
  2. Boot up a new connection object, and get the checksum of the connected iterable.
  3. Check to make sure that the incoming checksum is different from the previous run’s input_checksum
  4. Instantiate our schema
  5. Iterate through the iterable returned from the connector’s connect method, handling each element with the extractor’s handle_line method before passing it to the load_line method to attach each row to the pipeline’s data.
  6. After iteration, clean up the connector
  7. Instantiate the loader and load the data
  8. Finally, update the status to successful run and close down and clean up the pipeline.
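
A simplified, library-free sketch of that flow, with error handling and status updates omitted; the reduced interfaces mirror the methods named above:

import time


def run_sketch(connector, extractor_cls, schema_cls, loader_cls, target, prev_checksum=None):
    start_time = time.time()                          # step 1: pre_run equivalent

    conn = connector.connect(target)                  # step 2: open the connection
    checksum = connector.checksum_contents(target)
    if checksum == prev_checksum:                     # step 3: skip unchanged input
        connector.close()
        return None

    schema = schema_cls()                             # step 4: instantiate the schema (validation omitted)
    extractor = extractor_cls(conn)
    data = []
    for line in conn:                                 # step 5: extract each row
        data.append(extractor.handle_line(line))      #   (stands in for load_line)

    connector.close()                                 # step 6: clean up the connector
    loader_cls().load(data)                           # step 7: instantiate the loader and load (config omitted)
    return start_time, checksum                       # step 8: caller records the successful run
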
schema(schema)

Set the schema class

Parameters:schema – Schema class
Returns:modified Pipeline object
set_config_from_file(file)

Sets the pipeline’s configuration from file

Parameters:file – Location of the configuration to load

Raises:InvalidConfigException – if no configuration is found or the found configuration is not valid JSON

Job Status

class pipeline.status.Status(conn, name, display_name, last_ran, start_time, status, frequency, num_lines, input_checksum)

Object to represent row in status table

Variables:
  • conn – database connection, usually sqlite3 connection object
  • name – name of pipeline job running
  • display_name – pretty formatted display name for pipeline
  • last_ran – UNIX timestamp (number) for last complete run
  • start_time – UNIX timestamp (number) for last complete run start
  • status – string representing status/errors with the pipeline
  • num_lines – if successful, number of lines processed
  • input_checksum – a checksum of the input’s contents
update(**kwargs)

Update the Status object with passed kwargs and write the result

write()

Insert or replace a status row
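
A plausible SQLite sketch of what write() amounts to; the exact table name and column layout are assumptions based on the variables listed above:

import sqlite3
import time

conn = sqlite3.connect('status.db')
conn.execute('''
    CREATE TABLE IF NOT EXISTS status (
        name TEXT PRIMARY KEY, display_name TEXT, last_ran INTEGER,
        start_time INTEGER, status TEXT, frequency TEXT,
        num_lines INTEGER, input_checksum TEXT
    )
''')
# "insert or replace" behaviour: one row per pipeline name, overwritten each run
conn.execute(
    'INSERT OR REPLACE INTO status VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
    ('dog_licenses', 'Dog Licenses', None, int(time.time()),
     'running', 'daily', None, 'abc123'),
)
conn.commit()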

Built-in Connectors

class pipeline.connectors.Connector(*args, **kwargs)

Base connector class.

Subclasses must implement connect, checksum_contents, and close methods.

connect(target)

Base connect method

Should return an object that can be iterated through via the built-in next() function.

checksum_contents(target)

Should return an md5 hash of the contents of the conn object

close()

Tear down any open connections (to a file, for example)
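
An illustrative subclass implementing the three required methods; the in-memory 'target' is purely a stand-in for a real source:

import hashlib

from pipeline.connectors import Connector


class InMemoryConnector(Connector):
    '''Illustrative connector that "connects" to a list of strings.'''

    def connect(self, target):
        self._data = list(target)
        return iter(self._data)           # iterable via the built-in next()

    def checksum_contents(self, target):
        joined = '\n'.join(target).encode('utf-8')
        return hashlib.md5(joined).hexdigest()

    def close(self):
        self._data = None                 # nothing real to tear down here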

class pipeline.connectors.FileConnector(*args, **kwargs)

Base connector for file objects.

connect(target)

Connect to a file

Runs open() on the passed target, sets the result on the class as _file, and returns it.

Parameters:target – a valid filepath
Returns:A file-object
checksum_contents(target, blocksize=8192)

Open a file and get an md5 hash of its contents

Parameters:target – a valid filepath
Keyword Arguments:
 blocksize – the size of the block to read at a time in the file. Defaults to 8192.
Returns:A hexadecimal md5 digest of the file’s contents.
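
One plausible implementation of the block-wise hashing described here, using the documented default blocksize:

import hashlib


def checksum_file(path, blocksize=8192):
    '''Read a file in blocks and return the md5 hex digest of its contents.'''
    md5 = hashlib.md5()
    with open(path, 'rb') as f:
        for block in iter(lambda: f.read(blocksize), b''):
            md5.update(block)
    return md5.hexdigest()
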
close()

Closes the connected file if it is not closed already

class pipeline.connectors.RemoteFileConnector(*args, **kwargs)

Connector for a file located at a remote (HTTP-accessible) resource

This class should be used to connect to a file available over HTTP. For example, if there is a CSV that is streamed from a web server, this is the correct connector to use.

connect(target)

Connect to a remote target

Parameters:target – Remote URL
Returns:io.TextIOWrapper around the opened URL.
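
A standard-library sketch of how such a connection might look, consistent with the return type above but not necessarily the library's exact implementation:

import io
import urllib.request


def connect_remote(target):
    '''Open a remote URL and wrap the byte stream for line-by-line text iteration.'''
    response = urllib.request.urlopen(target)         # returns a readable byte stream
    return io.TextIOWrapper(response, encoding='utf-8')

# e.g. for row in connect_remote('https://example.org/data.csv'): ...
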
class pipeline.connectors.HTTPConnector(*args, **kwargs)

Connect to remote file via HTTP

class pipeline.connectors.SFTPConnector(*args, **kwargs)

Connect to remote file via SFTP

Built-in Extractors

class pipeline.extractors.TableExtractor(connection, *args, **kwargs)

Abstract Extractor subclass for extracting data in a tabular format

set_headers(headers=None)

Sets headers from file or passed headers

This method sets two attributes on the class: the headers attribute and the schema_headers attribute. schema_headers must align with the schema attributes for the pipeline.

If no headers are passed, the first line of the file is checked. The headers attribute is set from the first line, and schema headers are by default created by lowercasing the headers and replacing spaces with underscores. Custom header -> schema header mappings can be created by subclassing the CSVExtractor and overriding the create_schema_headers method (see the sketch after create_schema_headers below).

Keyword Arguments:
 headers – Optional headers that can be passed to be used as the headers and schema headers

Raises:RuntimeError – if headers are not passed and the firstline_headers kwarg is not set.
create_schema_headers(headers)

Maps headers to schema headers

Parameters:headers – A list of headers
Returns:a list of formatted schema headers. By default, the passed headers are lowercased and have their spaces replaced with underscores
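
An illustrative override producing a custom header mapping; the CSVExtractor subclass and the specific mapping are assumptions:

from pipeline.extractors import CSVExtractor


class LicenseExtractor(CSVExtractor):
    def create_schema_headers(self, headers):
        overrides = {'ZIP Code': 'owner_zip'}        # hypothetical special case
        return [
            overrides.get(header, header.lower().replace(' ', '_'))
            for header in headers
        ]

# With headers ['License Number', 'ZIP Code'] this yields
# ['license_number', 'owner_zip'].
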
handle_line(line)

Replace empty strings with None types.

class pipeline.extractors.ExcelExtractor(connection, *args, **kwargs)

TableExtractor subclass for Microsoft Excel spreadsheet files (xls, xlsx)

Built-in Schema

class pipeline.schema.BaseSchema(extra=None, only=(), exclude=(), prefix='', strict=False, many=False, context=None, load_only=(), dump_only=(), partial=False)

Base schema for the pipeline. Extends marshmallow.Schema

serialize_to_ckan_fields(capitalize=False)

Convert the schema field list to CKAN-friendly fields

Returns:A list of dictionaries with proper name/type mappings for CKAN. For example, name=fields.String() would go to:
[
    {
        'id': 'name',
        'type': 'text'
    }
]
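
For illustration, a two-field schema and the CKAN field list it might serialize to; the mapping of types other than String to CKAN types is an assumption:

from marshmallow import fields

from pipeline.schema import BaseSchema


class DogLicenseSchema(BaseSchema):
    license_number = fields.String()
    issue_date = fields.DateTime()

# DogLicenseSchema().serialize_to_ckan_fields() would return something like:
# [
#     {'id': 'license_number', 'type': 'text'},
#     {'id': 'issue_date', 'type': 'timestamp'},
# ]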

Built-in Loaders

class pipeline.loaders.CKANLoader(*args, **kwargs)

Connection to a CKAN datastore

get_resource_id(package_id, resource_name)

Search for a resource within a CKAN dataset and return its id

Params:
  • package_id: id of the resource’s parent dataset
  • resource_name: name of the resource
Returns:The resource id if the resource is found within the package, None otherwise
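
A plausible sketch of this lookup against the CKAN action API; the root URL, API-key handling, and use of requests are assumptions rather than the loader's actual code:

import requests


def get_resource_id(ckan_url, api_key, package_id, resource_name):
    '''Find a resource id by name within a CKAN package, or None if absent.'''
    response = requests.post(
        ckan_url + '/api/3/action/package_show',
        json={'id': package_id},
        headers={'Authorization': api_key},
    )
    resources = response.json()['result']['resources']
    return next(
        (r['id'] for r in resources if r['name'] == resource_name),
        None,
    )
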
resource_exists(package_id, resource_name)

Check for the existence of a resource on the CKAN instance

Params:
  • package_id: id of the resource’s parent dataset
  • resource_name: name of the resource
Returns:True if the resource is found within the package, False otherwise
create_resource(package_id, resource_name)

Create a new resource in the CKAN instance

Params:
  • package_id: dataset under which to add the new resource
  • resource_name: name of the new resource
Returns:id of newly created resource if successful, None otherwise
create_datastore(resource_id, fields)

Create a new datastore for the specified resource

Params:
  • resource_id: resource id for which the new datastore is being made
  • fields: header fields for the CSV file
Returns:resource_id for the new datastore if successful
Raises:CKANException – if datastore creation is unsuccessful
delete_datastore(resource_id)

Delete the datastore table for a resource

Params:
  • resource_id: id of the resource whose datastore table will be deleted
Returns:Status code from the request
upsert(resource_id, data, method='upsert')

Upsert data into datastore

Params:
  • resource_id: id of the resource into which data will be upserted
  • data: data to be upserted
Returns:request status
update_metadata(resource_id)

Update a resource’s metadata

TODO: Make this versatile

Params:
  • resource_id: id of the resource whose metadata will be modified
Returns:request status
class pipeline.loaders.CKANDatastoreLoader(*args, **kwargs)

Store data in CKAN using an upsert strategy

load(data)

Load data to CKAN using an upsert strategy

Parameters:data – a list of data to be inserted or upserted to the configured CKAN instance
Raises:RuntimeError – if the upsert or update metadata calls are unsuccessful
Returns:A two-tuple of the status codes for the upsert and metadata update calls
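
A simplified sketch of that upsert-then-update sequence; treating the return values as HTTP status codes and the exact failure check are assumptions based on the descriptions above:

def load_sketch(loader, resource_id, data):
    upsert_status = loader.upsert(resource_id, data, method='upsert')
    metadata_status = loader.update_metadata(resource_id)
    if upsert_status != 200 or metadata_status != 200:      # assumed success check
        raise RuntimeError('CKAN upsert or metadata update failed')
    return upsert_status, metadata_status                   # the two-tuple described above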