An Example Pipeline¶
The primary goal is to make it as easy as possible to write new pipelines. The main additional work that is required to write a new pipeline is to construct a new Schema. Here, we’ll deconstruct an example to see some of the tricks that can be used, especially when building the schema.
Building the schema¶
Writing pipelines is a matter of understanding where your data is coming from and writing a schema to handle it. Let's take the Pittsburgh Police Blotter dataset and build an example schema from it. The raw source data is available from the Police website.
import datetime

from marshmallow import pre_load, post_load, fields

import pipeline as pl


class PoliceBlotterSchema(pl.BaseSchema):
    report_name = fields.String()
    ccr = fields.Integer()
    section = fields.String()
    description = fields.String()
    arrest_date = fields.DateTime(format='%m/%d/%Y', load_only=True)
    arrest_time = fields.DateTime(format='%H:%M')
    address = fields.String(allow_none=True)
    neighborhood = fields.String(allow_none=True)
    zone = fields.Integer(allow_none=True)
    age = fields.Integer(allow_none=True)
    gender = fields.String(allow_none=True)

    @pre_load
    def process_na_zone(self, data):
        # Convert the source's custom 'n/a' designation to None
        zone = data.get('zone')
        if zone and zone.lower() == 'n/a':
            data['zone'] = None
        return data

    @post_load
    def combine_date_and_time(self, in_data):
        # Merge the separate date and time values into one datetime string
        in_data['arrest_time'] = str(datetime.datetime(
            in_data['arrest_date'].year, in_data['arrest_date'].month,
            in_data['arrest_date'].day, in_data['arrest_time'].hour,
            in_data['arrest_time'].minute, in_data['arrest_time'].second
        ))
        return in_data
There are a few notable things going on here, specifically with the pre_load() and post_load() decorators.

@pre_load registers a method to be run on the raw data before it is processed by the Schema's main load() method. In this case, we convert a custom 'n/a' designation to None.

@post_load, as you might expect, registers a method to run after an object has been deserialized and the schema constructed. In this case, we take our arrest_time field and combine it with the arrest_date field to make one condensed date field. Because our loaded arrest_date field is now a duplicate, we can ensure that it isn't passed on to our loader by using the load_only flag on the field. Similarly, if we wanted to create a new field from our deserialized data, we could use the dump_only flag to ensure that the field only exists in the dumped data.
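The date-and-time combination that combine_date_and_time performs can be sketched in isolation with the standard library. Here a plain dict stands in for marshmallow's deserialized output, with example values chosen for illustration:

```python
import datetime

# Deserialized values as the schema's DateTime fields would produce them:
# arrest_date carries the date, arrest_time carries only the time of day.
deserialized = {
    'arrest_date': datetime.datetime(2016, 3, 14),
    'arrest_time': datetime.datetime(1900, 1, 1, 13, 45, 0),
}

# Combine the two into a single datetime string, as the @post_load hook does.
d, t = deserialized['arrest_date'], deserialized['arrest_time']
deserialized['arrest_time'] = str(datetime.datetime(
    d.year, d.month, d.day, t.hour, t.minute, t.second
))

print(deserialized['arrest_time'])  # 2016-03-14 13:45:00
```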
Writing the rest of the pipeline¶
Now that we have our schema sorted out, we can go ahead and write the rest of our pipeline. Note that we will be using a RemoteFileConnector to connect to yesterday's raw data.
import datetime

import pipeline as pl

yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
url = 'http://apps.pittsburghpa.gov/police/arrest_blotter/arrest_blotter_%s.csv' % (yesterday.strftime('%A'))

police_blotter_pipeline = pl.Pipeline('police_blotter_pipeline', 'Police Blotter Pipeline') \
    .connect(pl.RemoteFileConnector, url) \
    .extract(pl.CSVExtractor) \
    .schema(PoliceBlotterSchema) \
    .load(pl.CKANDatastoreLoader,
          fields=PoliceBlotterSchema().serialize_to_ckan_fields(capitalize=True),
          package_id='your-ckan-package-id-here',
          resource_name='Incidents',
          method='insert') \
    .run()
Let’s deconstruct what’s going on here. As we can see, the steps are all laid out in the primary methods:
- First, we connect to the remote file’s location, which is generated from a base URL string interpolated with the name of the previous day of the week. If necessary, we could override the file’s encoding to ensure that it opens as expected.
- Second, we note that this is going to be a comma-separated input source. If necessary, we could override the delimiter if it were tab- or pipe-separated data. We could also pass in custom headers. One thing to note here is that headers are automatically mapped to schema_headers, which are the names of the marshmallow fields in our schema. If headers are manually passed, they need to match the schema headers so that marshmallow knows what to look for. Optionally, the individual marshmallow fields can also take a load_from argument, which allows headers to be mapped on a per-field level.
- Our above-specified schema is passed to the pipeline.
- We specify that we are going to use a CKANDatastoreLoader. This has some required kwargs, which include the data insertion method (must be either insert or upsert) and the fields to use. There is a serialize_to_ckan_fields() convenience method attached to all pipeline schemas, which will automatically build the required fields in the correct format for CKAN.
- Finally, with everything declared, we run the pipeline!
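The URL construction in the first step hinges on strftime('%A'), which formats a date as its full weekday name; the Police website names each day's CSV file after the weekday. A quick sketch, using a fixed date rather than "yesterday" so the result is predictable:

```python
import datetime

# %A expands to the full weekday name (in the default C locale, English),
# which is how each day's arrest blotter file is named.
some_monday = datetime.date(2016, 3, 14)  # this date fell on a Monday
url = ('http://apps.pittsburghpa.gov/police/arrest_blotter/'
       'arrest_blotter_%s.csv' % some_monday.strftime('%A'))
print(url)  # ...arrest_blotter_Monday.csv
```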