DonaldRauscher.com

A Blog About D4T4 & M47H

Building and Deploying a Deep Learning Model Part 3: Deploying a Serverless Microservice

16 September ’18

This is part 3 in a 3-part series (part 1, part 2) on building and deploying a deep learning model for the popular ACL 2011 IMDB dataset. In this part, I host the model on Cloud ML Engine and make it accessible via a simple HTTP Cloud Function. Give it a try!


===

Upload Model to Cloud ML Engine

#!/bin/bash

MODEL_NAME=movie_reviews
MODEL_VERSION=v1
MODEL_TIMESTAMP=$(ls -t exports/ | head -1)

DEPLOYMENT_SOURCE=gs://djr-data/movie-reviews

gsutil rsync -c -d -r exports/$MODEL_TIMESTAMP $DEPLOYMENT_SOURCE

gcloud ml-engine models create $MODEL_NAME

gcloud ml-engine versions create $MODEL_VERSION --model $MODEL_NAME --origin $DEPLOYMENT_SOURCE \
    --python-version 2.7 --runtime-version 1.9

NOTE: Make sure the Python environment in which you build your model matches the serving environment in Cloud ML Engine (here, Python 2.7 and runtime version 1.9)!

Expose Model with a Cloud Function

# gets predictions from cloud ml engine
def classify_movie_reviews(request):
    import flask
    import json
    import re
    import math
    import googleapiclient.discovery
    import google.auth

    headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'POST',
        'Access-Control-Allow-Headers': 'Content-Type'
    }

    # handle pre-flight options request
    if request.method == 'OPTIONS':
        return flask.make_response(('', 204, headers))

    _, project = google.auth.default()

    request_json = request.get_json()

    # this pulls out our proper nouns and treats them as single words
    def preprocessing(review):
        proper = r"([A-Z]([a-z]+|\.)(?:\s+[A-Z]([a-z]+|\.))*(?:\s+[a-z][a-z\-]+){0,2}\s+[A-Z]([a-z]+|\.)(?:\s+[0-9]+)?)"
        space_between_brackets = r"[\.\s]+(?=[^\[\]]*]])"
        brackets = r"(?:[\[]{2})(.*?)(?:[\]]{2})"

        review = re.sub(proper, '[[\\1]]', review)
        review = re.sub(space_between_brackets, '~', review)
        review = re.sub(brackets, '\\1', review)
        return review

    model = 'movie_reviews'
    version = request_json['version']
    instances = [preprocessing(i) for i in request_json['instances']]

    service = googleapiclient.discovery.build('ml', 'v1')
    name = 'projects/{}/models/{}/versions/{}'.format(project, model, version)

    response = service.projects().predict(
        name=name,
        body={'instances': instances}
    ).execute()

    if 'error' in response:
        raise RuntimeError(response['error'])

    # clear out nan if they exist
    for r in response['predictions']:
        if all([math.isnan(i) for i in r['prob']]):
            r['prob'] = []
            r['class'] = -1

    return flask.make_response((
        json.dumps(response['predictions']),
        200,
        headers
    ))

NOTE: Additional preprocessing for grouping movie names and proper nouns is replicated here since it could not be embedded in the TF input serving function.
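To make the effect of this step concrete, here is a small standalone run of the same three regular expressions on two made-up sentences. The function name `group_proper_nouns` and the sample text are mine, not part of the deployed function. Capitalized runs are wrapped in double brackets, whitespace inside the brackets becomes `~`, and the brackets are then dropped, so a multi-word title reaches the tokenizer as a single term.

```python
import re

# Same three patterns as in the Cloud Function above
PROPER = r"([A-Z]([a-z]+|\.)(?:\s+[A-Z]([a-z]+|\.))*(?:\s+[a-z][a-z\-]+){0,2}\s+[A-Z]([a-z]+|\.)(?:\s+[0-9]+)?)"
SPACE_BETWEEN_BRACKETS = r"[\.\s]+(?=[^\[\]]*]])"
BRACKETS = r"(?:[\[]{2})(.*?)(?:[\]]{2})"


def group_proper_nouns(review):
    # 1. wrap each run of capitalized words in [[...]]
    review = re.sub(PROPER, r"[[\1]]", review)
    # 2. join the words inside the brackets with '~' (this step needs a lookahead)
    review = re.sub(SPACE_BETWEEN_BRACKETS, "~", review)
    # 3. drop the brackets again
    review = re.sub(BRACKETS, r"\1", review)
    return review


print(group_proper_nouns("I loved Star Wars a lot."))        # I loved Star~Wars a lot.
print(group_proper_nouns("The Lord of the Rings is long."))  # The~Lord~of~the~Rings is long.
```

Note that the pattern allows up to two lowercase words between capitalized ones, which is what keeps "of the" inside the second title.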

Link to all code: https://github.com/donaldrauscher/movie-reviews-tf

Building and Deploying a Deep Learning Model Part 2: Building the Custom Estimator

09 September ’18

This is part 2 in a 3-part series (part 1, part 3) on building and deploying a deep learning model for the popular ACL 2011 IMDB dataset. In this part, I build a custom estimator in TensorFlow.

===

A few details on the model itself:

  • I used cosine annealing to reduce the learning rate throughout training
  • I used dropout to counteract overfitting and batch normalization before each activation layer
  • I used leaky ReLU rather than regular ReLU to mitigate the "dying ReLU" problem where neurons get stuck in negative states
  • I leveraged transfer learning, using GloVe to initialize my word embedding
  • Rather than using bag-of-words, which ignores the structure of sentences, I used a 1D convolution layer to model the interaction between words and their neighbors
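As a rough illustration of the first bullet, the schedule that `tf.train.cosine_decay` computes can be written out in plain Python. The function name `cosine_annealed_lr` is hypothetical; the defaults are the values the training code later in this post works out to (a starting rate of 0.1, `alpha=0.05`, and 12,000 decay steps).

```python
import math


def cosine_annealed_lr(step, lr_start=0.1, decay_steps=12000, alpha=0.05):
    # steps past the end of the schedule are clamped, so the rate stays at its floor
    step = min(step, decay_steps)
    cosine = 0.5 * (1.0 + math.cos(math.pi * step / decay_steps))
    # alpha is the fraction of lr_start that the schedule decays towards
    return lr_start * ((1.0 - alpha) * cosine + alpha)


for step in (0, 6000, 12000, 12500):
    print(step, round(cosine_annealed_lr(step), 4))
# 0 0.1
# 6000 0.0525
# 12000 0.005
# 12500 0.005
```

The rate falls slowly at first, fastest around the midpoint, and then flattens out at 5% of its starting value.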


Initialize word embeddings with GloVe

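# imports assumed by the excerpts in this post
# (tft_output, feature_spec, wildcard and the path constants are defined elsewhere in the notebook)
import functools
import glob
import math
import multiprocessing

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from scipy.stats import truncnorm

# get vocabulary
vocab = tft_output.vocabulary_by_name('vocab')
vocab_size = len(vocab)

# load glove embeddings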
embedding_size = 200
glove_embeddings = {}

with open('glove/glove.twitter.27B.{}d.txt'.format(embedding_size), mode='r') as f:  
    for line in f:
        values = line.strip().split()
        w = values[0]
        vectors = np.asarray(values[1:], dtype='float32')
        glove_embeddings[w] = vectors

# create initialized embedding matrix
embedding_matrix = truncnorm.rvs(a=-2, b=2, size=(vocab_size+1, embedding_size))

glove_np = pd.DataFrame(glove_embeddings).values
glove_mu, glove_std = np.mean(glove_np), np.std(glove_np)

for i, w in enumerate(vocab):
    try:
        embedding_matrix[i] = np.clip((glove_embeddings[w] - glove_mu)/glove_std, -2, 2)
    except KeyError:
        pass

embedding_matrix = embedding_matrix / math.sqrt(embedding_size)

def embedding_initializer(shape=None, dtype=tf.float32, partition_info=None):  
    assert dtype is tf.float32
    return embedding_matrix
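The scaling applied to each GloVe vector above can be seen in isolation in the following sketch. It is a plain-Python restatement, not the notebook code: the function name `scale_pretrained_vector` and the toy numbers are made up. Each value is standardized with the global GloVe mean and standard deviation, clipped to [-2, 2] (the same range as the truncated-normal draws used for words without a GloVe vector), and divided by the square root of the embedding size.

```python
import math


def scale_pretrained_vector(vec, mu, std, embedding_size, clip=2.0):
    # standardize with the global mean/std of all pretrained values
    standardized = [(v - mu) / std for v in vec]
    # clip to the same [-clip, clip] range as the random initialization
    clipped = [max(-clip, min(clip, v)) for v in standardized]
    # shrink by sqrt(embedding_size), as the whole matrix is in the code above
    return [v / math.sqrt(embedding_size) for v in clipped]


# toy 4-dimensional "embedding" with mu=0 and std=1 for easy arithmetic
print(scale_pretrained_vector([0.0, 1.0, 10.0, -10.0], mu=0.0, std=1.0, embedding_size=4))
# [0.0, 0.5, 1.0, -1.0]
```

One plausible reading of this design is that pretrained rows and randomly initialized rows end up on the same scale, so neither group dominates early training.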

Build classifier

# input function
def input_fn(input_file_pattern, num_epochs=None, batch_size=25, shuffle=True, prefetch=1):  
    input_file_names = glob.glob(input_file_pattern)

    ds = tf.data.TFRecordDataset(input_file_names)
    ds = ds.cache()

    if shuffle:
        ds = ds.apply(tf.contrib.data.shuffle_and_repeat(buffer_size=1000, count=num_epochs))
    else:
        ds = ds.repeat(num_epochs)

    ds = ds.apply(tf.contrib.data.map_and_batch(
        map_func=lambda x: tf.parse_single_example(x, feature_spec), 
        batch_size=batch_size,
        num_parallel_calls=multiprocessing.cpu_count()
    ))

    if prefetch > 0:
        ds = ds.prefetch(prefetch)

    features = ds.make_one_shot_iterator().get_next()
    labels = features.pop('label')
    return features, labels

train_input_fn = functools.partial(input_fn,
                                   input_file_pattern=wildcard(TRAIN_TRANSFORMED_PATH),
                                   num_epochs=1)

test_input_fn = functools.partial(input_fn,
                                  input_file_pattern=wildcard(TEST_TRANSFORMED_PATH),
                                  num_epochs=1)

# create estimator spec
def make_model(features, labels, mode, params, config):

    # hyperparameters
    dropout = params['dropout']
    conv_filters = params['conv_filters']
    dense_units = params['dense_units']
    learning_rate_start = params['learning_rate_start']
    learning_rate_steps = params['learning_rate_steps']

    # flag if training
    is_training = (mode == tf.estimator.ModeKeys.TRAIN)

    # set up feature columns
    terms = features['terms_indices']

    terms_shape = terms.dense_shape
    terms_shape = tf.stack([terms_shape[0], tf.where(terms_shape[1] < 3, tf.constant(3, dtype=tf.int64), terms_shape[1])], axis=0)

    terms = tf.sparse_to_dense(terms.indices, terms_shape, terms.values, default_value=vocab_size)
    terms_embed_seq = tf.contrib.layers.embed_sequence(terms, vocab_size=vocab_size+1, embed_dim=embedding_size, initializer=embedding_initializer)

    # build graph
    net = terms_embed_seq
    net = tf.layers.dropout(net, rate=dropout, training=is_training)
    net = tf.layers.conv1d(inputs=net, filters=conv_filters, kernel_size=3, strides=1, activation=tf.nn.leaky_relu)
    net = tf.reduce_max(input_tensor=net, axis=1)      
    net = tf.layers.dropout(net, rate=dropout, training=is_training)
    net = tf.layers.batch_normalization(net, training=is_training)
    net = tf.layers.dense(net, units=dense_units, activation=tf.nn.leaky_relu)
    logits = tf.layers.dense(net, 2)

    # compute predictions
    predicted_classes = tf.argmax(logits, 1)
    predicted_probs = tf.nn.softmax(logits)

    # generate predictions
    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {
            'class': predicted_classes,
            'prob': predicted_probs
        }

        export_outputs = {
          'predict': tf.estimator.export.PredictOutput(outputs=predictions)
        }

        return tf.estimator.EstimatorSpec(mode, predictions=predictions, export_outputs=export_outputs)

    # compute loss
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

    # create training op with cosine annealing for learning rate
    if mode == tf.estimator.ModeKeys.TRAIN:
        global_step = tf.train.get_global_step()

        learning_rate = tf.train.cosine_decay(learning_rate=learning_rate_start, global_step=global_step, 
                                              alpha=0.05, decay_steps=learning_rate_steps)

        optimizer = tf.train.AdagradOptimizer(learning_rate=learning_rate)
        optimizer = tf.contrib.estimator.clip_gradients_by_norm(optimizer, 5.0)


        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        with tf.control_dependencies(update_ops):
            train_op = optimizer.minimize(loss, global_step=global_step)

        return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

    # compute evaluation metrics
    eval_metric_ops = {
        'accuracy': tf.metrics.accuracy(labels=labels, predictions=predicted_classes),
        'auc': tf.metrics.auc(labels=labels, predictions=predicted_probs[:, 1])
    }
    return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=eval_metric_ops)
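To show what the convolution and max-pooling steps in the graph do to a sequence of word vectors, here is a tiny pure-Python sketch. It is not the TensorFlow implementation: the function names and the toy numbers are invented, and it assumes no padding along the sequence ("valid" convolution). Each filter looks at three neighboring word vectors at a time, the leaky ReLU is applied to each window's score, and the maximum over all windows is kept per filter.

```python
def leaky_relu(x, alpha=0.2):
    # small negative slope instead of a hard zero
    return x if x > 0 else alpha * x


def conv1d_global_max(sequence, filters, biases, kernel_size=3):
    """sequence: list of word vectors; filters: list of kernels (kernel_size x embed_dim)."""
    n_windows = len(sequence) - kernel_size + 1
    if n_windows < 1:
        raise ValueError("need at least kernel_size tokens")
    pooled = []
    for kernel, bias in zip(filters, biases):
        activations = []
        for start in range(n_windows):
            window = sequence[start:start + kernel_size]
            # dot product between the kernel and the window of word vectors
            total = bias + sum(w * x
                               for k_row, vec in zip(kernel, window)
                               for w, x in zip(k_row, vec))
            activations.append(leaky_relu(total))
        # global max pooling over positions (tf.reduce_max over axis 1)
        pooled.append(max(activations))
    return pooled


sequence = [[1, 0], [0, 1], [1, 1], [-1, -1]]  # 4 tokens, embedding size 2
ones = [[1, 1]] * 3
neg_ones = [[-1, -1]] * 3
pooled = conv1d_global_max(sequence, [ones, neg_ones], [0.0, 0.0])
print(pooled)  # [4.0, -0.2]
```

A window of three needs at least three tokens, which is presumably why the model pads every batch to a sequence length of at least 3 before the embedding lookup.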

Train classifier

# build classifier
!rm -Rf $MODEL_LOG

epoch_size = 25000
num_epochs = 5
batch_size = 10
num_steps = epoch_size * num_epochs // batch_size // 1000 * 1000  # rounded down to a multiple of 1000; // keeps this an int on Python 3 too

params = dict(
    dropout=0.2,
    conv_filters=500,
    dense_units=100,
    learning_rate_start=0.1,
    learning_rate_steps=num_steps
)

ckpt_config = tf.estimator.RunConfig(keep_checkpoint_max=num_epochs)

classifier = tf.estimator.Estimator(model_fn=make_model,
                                    params=params,
                                    model_dir=MODEL_LOG,
                                    config=ckpt_config)

# train classifier
train_stats = []
for i in range(num_epochs):
    print("Starting epoch {}/{}...".format(i+1, num_epochs))
    classifier.train(input_fn=lambda: train_input_fn(batch_size=batch_size))
    ckpt = classifier.latest_checkpoint()
    train_auc = classifier.evaluate(input_fn=lambda: train_input_fn())['auc']
    test_auc = classifier.evaluate(input_fn=lambda: test_input_fn())['auc']
    train_stats.append((ckpt, train_auc, test_auc))

train_stats = pd.DataFrame(train_stats, columns=['ckpt', 'train_auc', 'test_auc'])

Starting epoch 1/5...
Starting epoch 2/5...
Starting epoch 3/5...
Starting epoch 4/5...
Starting epoch 5/5...
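The step counts in the training setup are worth working through, since the length of the learning-rate schedule and the number of steps actually run differ slightly. A small sketch of the arithmetic, using the same constants (the names `steps_per_epoch`, `steps_run` and `decay_steps` are mine):

```python
epoch_size = 25000
num_epochs = 5
batch_size = 10

steps_per_epoch = epoch_size // batch_size      # one pass over the training set
steps_run = steps_per_epoch * num_epochs        # steps taken across all five passes
decay_steps = steps_run // 1000 * 1000          # rounded down to a multiple of 1000

print(steps_per_epoch, steps_run, decay_steps)  # 2500 12500 12000
```

So the schedule is given 12,000 steps, while the five training passes take 12,500 in total, which matches the global_step reported in the evaluation table.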

Evaluate classifier

# plot train stats
ind = np.arange(len(train_stats)) + 1
width = 0.35

fig, ax = plt.subplots()
train_bar = ax.bar(ind - width/2, train_stats['train_auc'].round(4), width, color='SkyBlue', label='Train')
test_bar = ax.bar(ind + width/2, train_stats['test_auc'].round(4), width,  color='IndianRed', label='Test')

# adds labels to a bar chart series
def autolabel(ax, rects, xpos='center'):
    ha = {'center': 'center', 'right': 'left', 'left': 'right'}
    offset = {'center': 0.5, 'right': 0.57, 'left': 0.43}  # x_txt = x + w*off
    for rect in rects:
        height = rect.get_height()
        ax.text(rect.get_x() + rect.get_width()*offset[xpos], 1.01*height,
                '{}'.format(height), ha=ha[xpos], va='bottom')

autolabel(ax, train_bar, "center")
autolabel(ax, test_bar, "center")

ax.set_ylabel('AUC')
ax.set_xlabel('Epochs')
ax.set_xticks(ind)
ax.legend()
ax.set_ylim(0.8, 1.1)

plt.show()

[Figure: bar chart of train and test AUC after each epoch]

# overall stats
best_ckpt = train_stats.sort_values(by=['test_auc'], ascending=False)['ckpt'].values[0]

train_stats = classifier.evaluate(input_fn=train_input_fn, checkpoint_path=best_ckpt)
test_stats = classifier.evaluate(input_fn=test_input_fn, checkpoint_path=best_ckpt)

train_stats = pd.DataFrame.from_dict(train_stats, orient='index', columns=['train'])
test_stats = pd.DataFrame.from_dict(test_stats, orient='index', columns=['test'])
stats = train_stats.join(test_stats)
stats

                    train          test
loss             0.088654      0.230451
auc              0.997005      0.969034
global_step  12500.000000  12500.000000
accuracy         0.973600      0.911200

Export

def serving_input_fn():
    review = tf.placeholder(dtype=tf.string)
    label = tf.zeros(dtype=tf.int64, shape=[1, 1]) # just a placeholder

    transformed_features = tft_output.transform_raw_features({'review': review, 'label': label})

    return tf.estimator.export.ServingInputReceiver(transformed_features, {'review': review})


export_path = classifier.export_savedmodel(export_dir_base='exports',
                                           serving_input_receiver_fn=serving_input_fn,
                                           checkpoint_path=best_ckpt)

export_path = export_path.decode('utf-8')

Link to all code: https://github.com/donaldrauscher/movie-reviews-tf

Building and Deploying a Deep Learning Model Part 1: Using tf.Transform For Input Pipelines

02 September ’18

This is part 1 in a 3-part series (part 2, part 3) on building and deploying a deep learning model for the popular ACL 2011 IMDB dataset. In this part, I tackle data preprocessing.

===

The sklearn.preprocessing module has some great utility functions and transformer classes (e.g. scaling, encoding categorical features) for converting raw data into a numeric representation that can be modelled. How do we do this in the context of TensorFlow? And how do we ensure that serving-time preprocessing transformations are exactly the same as those performed during training? The solution: tf.Transform.

Source: https://ai.googleblog.com/2017/02/preprocessing-for-machine-learning-with.html

You can use tf.Transform to construct preprocessing pipelines that can be run as part of a TensorFlow graph. tf.Transform prevents skew by ensuring that the data seen during serving is consistent with the data seen during training. Furthermore, you can execute tf.Transform pipelines at scale with Apache Beam, a huge advantage when preparing large datasets for training. Currently, you can only use tf.Transform in Python 2 since Apache Beam doesn't yet have Python 3 support.
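The core idea can be sketched without TensorFlow at all: an "analyze" pass learns something from the training data once (here, a vocabulary), and a "transform" step then applies exactly that learned mapping to any input, whether at training or serving time. The following is a deliberately simplified stand-in for a thresholded vocabulary with a single out-of-vocabulary bucket; the function names and toy data are invented, and the tie-breaking order is my own assumption, not tf.Transform's.

```python
from collections import Counter


def analyze_vocabulary(tokenized_docs, frequency_threshold):
    # full pass over the training data: count tokens, keep the frequent ones
    counts = Counter(tok for doc in tokenized_docs for tok in doc)
    kept = [tok for tok, n in counts.items() if n >= frequency_threshold]
    # most frequent first; ties broken alphabetically (an assumption of this sketch)
    kept.sort(key=lambda tok: (-counts[tok], tok))
    return {tok: i for i, tok in enumerate(kept)}


def apply_vocabulary(tokens, vocab):
    # unseen or rare tokens all share one out-of-vocabulary index
    oov_index = len(vocab)
    return [vocab.get(tok, oov_index) for tok in tokens]


train_docs = [["great", "movie"], ["great", "acting"], ["bad", "movie"]]
vocab = analyze_vocabulary(train_docs, frequency_threshold=2)
print(vocab)                                                # {'great': 0, 'movie': 1}
print(apply_vocabulary(["great", "plot", "movie"], vocab))  # [0, 2, 1]
```

Because the serving path reuses the vocabulary produced by the analyze pass instead of recomputing anything, training and serving cannot drift apart.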

Here is the code that I used to preprocess my data. I start by converting raw data into TFRecords files, then I transform those TFRecords files with tf.Transform.

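# imports assumed by the excerpts in this post (the *_PATH constants are defined elsewhere)
import glob
import re
import tempfile

import apache_beam as beam
import numpy as np
import tensorflow as tf
import tensorflow_transform as tft
from apache_beam.io import tfrecordio
from tensorflow_transform.beam import impl as beam_impl
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.tf_metadata import dataset_metadata, dataset_schema

# this pulls out our proper nouns and treats them as single words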
def proper_preprocessing(review):
    proper = r"([A-Z]([a-z]+|\.)(?:\s+[A-Z]([a-z]+|\.))*(?:\s+[a-z][a-z\-]+){0,2}\s+[A-Z]([a-z]+|\.)(?:\s+[0-9]+)?)"
    space_between_brackets = r"[\.\s]+(?=[^\[\]]*]])"
    brackets = r"(?:[\[]{2})(.*?)(?:[\]]{2})"

    review = re.sub(proper, '[[\\1]]', review)
    review = re.sub(space_between_brackets, '~', review)
    review = re.sub(brackets, '\\1', review)
    return review

# load into TFRecords
def load_data(g, out):
    inputs = glob.glob(g)
    np.random.shuffle(inputs)
    with tf.python_io.TFRecordWriter(out) as writer:
        for i in inputs:
            label = 1 if i.split('/')[2] == 'pos' else 0
            with open(i, 'r') as f:
                review = f.read()

            example = tf.train.Example()
            example.features.feature['review'].bytes_list.value.append(proper_preprocessing(review))
            example.features.feature['label'].int64_list.value.append(label)

            writer.write(example.SerializeToString())


load_data('aclImdb/train/[posneg]*/*.txt', TRAIN_PATH)
load_data('aclImdb/test/[posneg]*/*.txt', TEST_PATH)

# schema for raw data
RAW_DATA_FEATURE = {
    'review': tf.FixedLenFeature(shape=[1], dtype=tf.string),
    'label': tf.FixedLenFeature(shape=[1], dtype=tf.int64)
}

RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE))

# train our tft transformer
with beam.Pipeline() as pipeline:
    with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
        coder = tft.coders.ExampleProtoCoder(RAW_DATA_METADATA.schema)

        train_data = (
            pipeline
            | 'ReadTrain' >> tfrecordio.ReadFromTFRecord(TRAIN_PATH)
            | 'DecodeTrain' >> beam.Map(coder.decode))

        test_data = (
            pipeline
            | 'ReadTest' >> tfrecordio.ReadFromTFRecord(TEST_PATH)
            | 'DecodeTest' >> beam.Map(coder.decode))


        # remove links, tags, quotes, apostrophes
        # bracketize proper nouns, names, and numbers
        # then lowercase, split by punctuation, and remove low frequency words
        def preprocessing_fn(inputs):
            remove = '|'.join(["https?:\/\/(www\.)?([^\s]*)", "<([^>]+)>", "\'", "\""])
            punctuation = r"([.,;!?\(\)\/])+"
            number_commas = r"([0-9]),([0-9])"

            reviews = tf.reshape(inputs['review'], [-1])

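            reviews = tf.regex_replace(reviews, remove, '')
            # strip thousands separators first; once punctuation is padded with spaces,
            # '1,000' has become '1 , 000' and number_commas can no longer match
            reviews = tf.regex_replace(reviews, number_commas, '\\1\\2')
            reviews = tf.regex_replace(tf.regex_replace(reviews, punctuation, ' \\1 '), r"\s+", ' ')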

            for letter in list('ABCDEFGHIJKLMNOPQRSTUVWXYZ'):
                reviews = tf.regex_replace(reviews, letter, letter.lower())

            terms = tf.string_split(reviews, ' ')
            terms_indices = tft.compute_and_apply_vocabulary(terms, frequency_threshold=5, num_oov_buckets=1, vocab_filename='vocab')

            return {
                'terms': terms,
                'terms_indices': terms_indices,
                'label': inputs['label']
            }


        (transformed_train_data, transformed_metadata), transform_fn = (
            (train_data, RAW_DATA_METADATA)
            | 'AnalyzeAndTransform' >> beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))

        transformed_test_data, _ = (
            ((test_data, RAW_DATA_METADATA), transform_fn)
            | 'Transform' >> beam_impl.TransformDataset())

        transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)

        _ = (
            transformed_train_data
            | 'EncodeTrain' >> beam.Map(transformed_data_coder.encode)
            | 'WriteTrain' >> tfrecordio.WriteToTFRecord(TRAIN_TRANSFORMED_PATH))

        _ = (
            transformed_test_data
            | 'EncodeTest' >> beam.Map(transformed_data_coder.encode)
            | 'WriteTest' >> tfrecordio.WriteToTFRecord(TEST_TRANSFORMED_PATH))

        _ = (
            transform_fn
            | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(TFT_OUT_PATH))
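For readers who want to poke at the string clean-up in `preprocessing_fn` without building a TensorFlow graph, the sketch below approximates it with Python's `re` module. It is an approximation rather than the pipeline itself: the function name `normalize` and the sample sentence are made up, `str.lower` and `str.split` stand in for the per-letter replacements and `tf.string_split`, and thousands separators are stripped before punctuation is padded so that each comma is still adjacent to its digits.

```python
import re

REMOVE = "|".join([r"https?://(www\.)?[^\s]*", r"<[^>]+>", "'", '"'])
PUNCTUATION = r"([.,;!?()/])+"
NUMBER_COMMAS = r"([0-9]),([0-9])"


def normalize(review):
    # drop links, HTML tags, apostrophes and quotes
    review = re.sub(REMOVE, "", review)
    # 1,000 -> 1000
    review = re.sub(NUMBER_COMMAS, r"\1\2", review)
    # collapse each run of punctuation to its last character, padded with spaces
    review = re.sub(PUNCTUATION, r" \1 ", review)
    review = re.sub(r"\s+", " ", review)
    # lowercase and split on whitespace, dropping empty tokens
    return review.lower().split()


tokens = normalize("It's GREAT!!! <br />See http://example.com now, 1,000 times.")
print(tokens)
# ['its', 'great', '!', 'see', 'now', ',', '1000', 'times', '.']
```

None of these patterns needs a backreference inside the pattern or a look-around assertion, which is why this part of the clean-up fits within `tf.regex_replace`.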

NOTE: RE2 does not support constructs for which only backtracking solutions are known to exist. Thus, backreferences and look-around assertions are not supported! As a result, I can't put my logic for identifying movie names / proper nouns into tf.regex_replace(...).

While I have found tf.Transform super-useful, we are still constrained to preprocessing that can be done with native TF ops! tf.py_func lets you insert a Python function as a TF op. However, a documented limitation is that it is not serialized in the GraphDef, so it cannot be used for serving, which requires serializing the model and restoring it in a different environment. This has prevented me from doing more complicated text preprocessing steps like Porter stemming. Nevertheless, I still love tf.Transform, an unsung hero of the TF ecosystem!

Link to all code: https://github.com/donaldrauscher/movie-reviews-tf