A Blog About D4T4 & M47H

Using tf.Transform For Input Pipelines

09 September ’18

When initially building my movie classification model, I used a version of the dataset that had already been preprocessed into TFRecords. Though convenient, this created a problem when deploying the model; I wasn't able to replicate the preprocessing in my serving environment leading to training-serving skew. My solution: tf.Transform.

You can use tf.Transform to construct preprocessing pipelines that can be run as part of a Tensorflow graph. tf.Transform prevents skew by ensuring that the data seen during serving is consistent with the data seen during training. Furthermore, you can execute tf.Transform pipelines at scale with Apache Beam, a huge advantage when preparing large datasets for training.


Here is the code that I used to preprocess my data:

# load data into TFRecords
def load_data(g, out):
    inputs = glob.glob(g)
    with tf.python_io.TFRecordWriter(out) as writer:
        for i in inputs:
            label = 1 if i.split('/')[2] == 'pos' else 0
            with open(i, 'r') as f:
                review =

            example = tf.train.Example()


load_data('aclImdb/train/[posneg]*/*.txt', 'data/train.tfrecord')
load_data('aclImdb/test/[posneg]*/*.txt', 'data/test.tfrecord')
# schema for raw data
    'review': tf.FixedLenFeature(shape=[1], dtype=tf.string),
    'label': tf.FixedLenFeature(shape=[1], dtype=tf.int64)

RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
# train our tft transformer
with beam.Pipeline() as pipeline:
    with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
        coder = tft.coders.ExampleProtoCoder(RAW_DATA_METADATA.schema)

        train_data = (
            | 'ReadTrain' >> tfrecordio.ReadFromTFRecord('data/train.tfrecord')
            | 'DecodeTrain' >> beam.Map(coder.decode))

        test_data = (
            | 'ReadTest' >> tfrecordio.ReadFromTFRecord('data/test.tfrecord')
            | 'DecodeTest' >> beam.Map(coder.decode))

        # remove links, tags, quotes, apostraphes, and number commas
        # then lowercase, split by punctuation, and remove low frequency words
        def preprocessing_fn(inputs):
            remove = ["https?:\/\/(www\.)?([^\s]*)", "<([^>]+)>", "\'", "\""]
            remove = '|'.join(remove)

            reviews = tf.reshape(inputs['review'], [-1])
            reviews = tf.regex_replace(reviews, remove, '')
            reviews = tf.regex_replace(reviews, r"([0-9]),([0-9])", '\\1\\2')

            for letter in list('ABCDEFGHIJKLMNOPQRSTUVWXYZ'):
                reviews = tf.regex_replace(reviews, letter, letter.lower())

            terms = tf.string_split(reviews, '.,!?() ')
            terms_indices = tft.compute_and_apply_vocabulary(terms, top_k=VOCAB_SIZE, default_value=VOCAB_SIZE, vocab_filename='vocab')

            return {
                'terms': terms_indices,
                'label': inputs['label']

        (transformed_train_data, transformed_metadata), transform_fn = (
            (train_data, RAW_DATA_METADATA)
            | 'AnalyzeAndTransform' >> beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))

        transformed_test_data, _ = (
            ((test_data, RAW_DATA_METADATA), transform_fn)
            | 'Transform' >> beam_impl.TransformDataset())

        transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)

        _ = (
            | 'EncodeTrain' >> beam.Map(transformed_data_coder.encode)
            | 'WriteTrain' >> tfrecordio.WriteToTFRecord('data/train_transformed.tfrecord'))

        _ = (
            | 'EncodeTest' >> beam.Map(transformed_data_coder.encode)
            | 'WriteTest' >> tfrecordio.WriteToTFRecord('data/test_transformed.tfrecord'))

        _ = (
            | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn('tft_output'))

And here we attach the tf.Transform preprocessing function (exported previously) to the trained classifier and export both for serving:

tf_transform_output = tft.TFTransformOutput('tft_output')

def serving_input_fn():
    review = tf.placeholder(dtype=tf.string)
    label = tf.zeros(dtype=tf.int64, shape=[1, 1]) # just a placeholder

    transformed_features = tf_transform_output.transform_raw_features({'review': review, 'label': label})

    return tf.estimator.export.ServingInputReceiver(transformed_features, {'review': review})


NOTE: My preprocessing function requires a 'label' input, which we obviously don't have for inference requests. I impute an arbitrary tensor here to avoid an error.

While I have found tf.Transform super-useful, I am still constrained by preprocessing that can be done with native TF ops! tf.py_func lets you insert a Python function as a TF op. However, a documented limitation is that it is not serialized in the GraphDef, so it cannot be used for serving, which requires serializing the model and restoring in a different environment. This has prevended me from doing more complicated text preprocessing steps like Porter stemming.

Nevertheless, I still love tf.Transform, an unsung hero of the TF ecosystem! Here's a link to all the code for the model build. Cheers!

Classifying Movie Reviews with TensorFlow

29 August ’18

09-09-2018 Update: My initial deployment of this model had training-serving skew since I was simply splitting words by spaces and feeding into the model. To properly serve this model, I needed to replicate the preprocessing in the serving input receiver. There is a nifty tool for this in the TF ecosystem called tf.Transform. See this post.


Recently, I've been having a lot of fun with Tensorflow! Here I'm building a DNN bag-of-words classifier with TF for classifying movie reviews as positive or negative. The data source is the ACL 2011 IMDB dataset. I used a custom estimator so that I could implement cosine annealing for learning rate decay.

I used Cloud ML Engine to deploy the model on GCP. I also used a Cloud Function to make the model accessible via a simple HTTP function. Give it a try!

Here's a link to all the code for the model build and a link to the Cloud Function for serving. Cheers!


Build TF Estimator

# set up feature columns
terms_feature_column = tf.feature_column.categorical_column_with_vocabulary_list(key='terms', 

terms_embedding_column = tf.feature_column.embedding_column(terms_feature_column, dimension=10)
feature_columns = [terms_embedding_column]

# create estimator spec
def make_model(features, labels, mode):

    # build graph
    net = tf.feature_column.input_layer(features, feature_columns)
    net = tf.layers.dense(net, units=10, activation=tf.nn.leaky_relu)
    net = tf.layers.dropout(net, rate=0.3, training=(mode == tf.estimator.ModeKeys.TRAIN))
    net = tf.layers.dense(net, units=10)
    logits = tf.layers.dense(net, 2)

    # compute predictions
    predicted_classes = tf.argmax(logits, 1)
    predicted_probs = tf.nn.softmax(logits)

    # generate predictions
    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {
            'class': predicted_classes,
            'prob': predicted_probs

        export_outputs = {
          'predict': tf.estimator.export.PredictOutput(outputs=predictions)

        return tf.estimator.EstimatorSpec(mode, predictions=predictions, export_outputs=export_outputs)

    # compute loss
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

    # create training op with cosine annealing for learning rate
    if mode == tf.estimator.ModeKeys.TRAIN:
        global_step = tf.train.get_global_step()

        learning_rate = tf.train.cosine_decay(learning_rate=0.2, global_step=global_step, alpha=0.05, decay_steps=10000)

        optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate)
        optimizer = tf.contrib.estimator.clip_gradients_by_norm(optimizer, 5.0)

        train_op = optimizer.minimize(loss, global_step=global_step)

        return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

    # compute evaluation metrics
    eval_metric_ops = {
        'accuracy': tf.metrics.accuracy(labels=labels, predictions=predicted_classes),
        'auc': tf.metrics.auc(labels=labels, predictions=predicted_probs[:, 1])
    return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=eval_metric_ops)

# create estimator
classifier = tf.estimator.Estimator(model_fn=make_model)

# train and evaluate
classifier.train(input_fn=lambda: input_fn([train_path], num_epochs=10))
test_stats = classifier.evaluate(input_fn=lambda: input_fn([test_path], num_epochs=1))

# export
def serving_input_receiver_fn():
    reviews = tf.placeholder(dtype=tf.string, shape=(None), name='reviews')
    terms = tf.sparse_tensor_to_dense(tf.string_split(reviews), default_value='')
    return tf.estimator.export.ServingInputReceiver({'terms': terms}, {'reviews': reviews})

export_path = classifier.export_savedmodel(export_dir_base='exports',

export_path = export_path.decode('utf-8')

Model Results

Train Test
Accuracy 0.918600 0.870160
AUC 0.971972 0.942018
Loss 0.214713 0.321892

Upload to Cloud ML Engine


MODEL_TIMESTAMP=$(ls -t exports/ | head -1)


gsutil rsync -c -d -r exports/$MODEL_TIMESTAMP $DEPLOYMENT_SOURCE

gcloud ml-engine models create $MODEL_NAME

gcloud ml-engine versions create $MODEL_VERSION --model $MODEL_NAME --origin $DEPLOYMENT_SOURCE \
    --python-version 3.5 --runtime-version 1.9

NOTE: Make sure the Python environment in which you build your model matches the serving environment in Cloud ML!

Expose Model with a Cloud Function

# gets predictions from cloud ml engine
def ml_predict(request):
    import flask
    import json
    import googleapiclient.discovery
    import google.auth

    headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'POST',
        'Access-Control-Allow-Headers': 'Content-Type'

    # handle pre-flight options request
    if request.method == 'OPTIONS':
        return flask.make_response(('', 204, headers))

    _, project = google.auth.default()

    request_json = request.get_json()

    model = request_json['model']
    version = request_json['version']
    instances = request_json['instances']

    service ='ml', 'v1')
    name = 'projects/{}/models/{}/versions/{}'.format(project, model, version)

    response = service.projects().predict(
        body={'instances': instances}

    if 'error' in response:
        raise RuntimeError(response['error'])

    return flask.make_response((

Two Options for Hosting a Private PyPI Repository

11 August ’18

A few years back, I read an interesting post about how Airbnb's data science team developed their own internal R package, Rbnb, to standardize solutions to common problems and reduce redundancy across projects. I really like this idea and have implemented a similar solution for Python at places that I have worked. This post details two options for hosting a private Python package, both of which leverage Google Cloud Build for CI/CD.

Option #1 - Gemfury

Gemfury is a cloud package respository that you can use to host both public and private packages for Python (and lots of other languages). Some useful instructions for how to upload Python packages to Gemfury and install them with pip. The following Cloud Build pipeline will, on tagged commits, download the package from Google Cloud Repositories, run tests, package, and curl to Gemfury:

  - name:
    args: ['source', 'repos', 'clone', '${_PACKAGE}', '--project=${PROJECT_ID}']
  - name:
    args: ['checkout', '${TAG_NAME}']
    dir: '/workspace/${_PACKAGE}'
  - name:${PROJECT_ID}/python-packager:latest
    entrypoint: 'bash'
    args: ['-c', 'pip3 install -e . && python3 -m pytest -s']
    dir: '/workspace/${_PACKAGE}'
  - name:${PROJECT_ID}/python-packager:latest
    args: ['', 'sdist']
    dir: '/workspace/${_PACKAGE}'
  - name:
    entrypoint: 'bash'
    args: ['-c', 'curl -f -F package=@dist/${_PACKAGE}-${TAG_NAME}.tar.gz https://$${FURY_TOKEN}${_FURY_USER}/']
    secretEnv: ['FURY_TOKEN']
    dir: '/workspace/${_PACKAGE}'
- kmsKeyName: projects/blog-180218/locations/global/keyRings/djr/cryptoKeys/fury
    FURY_TOKEN: CiQAUrbjD9VjSHPnmMvLV0Jv+duPGyuaIgS0C2u1LmcVRGHY/BwSPQCP7mNtRVGShanmgHUx5RHoohNDGWX4FnscAmbMBVplms0uOQfHLmLy/wkfaxAHYoK2pX/LKDxDIwQzAz0=
  _PACKAGE: djr-py
  _FURY_USER: donaldrauscher

NOTE: Need to create a KMS key/keyring, give Cloud Build access to it, and use that key to encrypt your Fury token. You can find additional instructions on how to do this here.

echo -n ${FURY_TOKEN} | gcloud kms encrypt --plaintext-file=- --ciphertext-file=- --location=global --keyring=djr --key=fury | base64

Option #2 - GCS Bucket

If you don't care about restricting which people can access your package (I clearly do not), then you can host a simple PyPI respository on a GCS bucket using dumb-pypi. First, you will need to set up a GCS bucket where you can host a static site. This Cloud Build pipeline uploads the package to GCS and triggers a second Cloud Build pipeline which rebuilds the PyPI repository on the specified GCS bucket.

  - name:
    args: ['clone', '-b', '${TAG_NAME}', '--single-branch', '--depth', '1', '${_GITHUB_USER}/${_PACKAGE}.git']
  - name:${PROJECT_ID}/python-packager:latest
    entrypoint: 'bash'
    args: ['-c', 'pip3 install -e . && python3 -m pytest -s']
    dir: '/workspace/${_PACKAGE}'
  - name:${PROJECT_ID}/python-packager:latest
    args: ['', 'sdist']
    dir: '/workspace/${_PACKAGE}'
  - name:
    args: ['cp', 'dist/${_PACKAGE}-${TAG_NAME}.tar.gz', 'gs://${_BUCKET}/raw/']
    dir: '/workspace/${_PACKAGE}'
  - name:
    args: ['clone', '']
  - name:
    args: ['builds', 'submit', '--config', 'cloudbuild.yaml', '--no-source', '--async', '--substitutions', '_BUCKET=${_BUCKET}']
    dir: '/workspace/gcs-pypi'
  _PACKAGE: djr-py
  _GITHUB_USER: donaldrauscher


NOTE: Both of these Cloud Build jobs require a python-packager custom Cloud Build step. This is a simple Docker container with some Python utilities:


RUN apt-get update \
  && apt-get install -y python3-pip \
  && rm -rf /var/lib/apt/lists/*

RUN pip3 install --upgrade pip setuptools wheel pylint pytest

ENTRYPOINT ["python3"]

I used option #2 to host my personal Python package (djr-py) on Enjoy!