A Blog About D4T4 & M47H

Moving My Blog from Jekyll/GitHub to Pelican/GCS

28 May ’18

I recently moved this blog from Jekyll/GitHub Pages to Pelican/GCS. Mainly, I wanted a Python-based framework where I would have more flexibility to customize (e.g. add or create plugins). Cost isn't really a consideration, as both options are free: GitHub Pages is powered by Jekyll and gives you free hosting on GitHub's domain, while the fully rendered blog is well under 1 GB and GCS's free tier includes 5 GB of storage.

I used Google Container Builder for CI/CD to publish my website on a GCS bucket, drawing upon a lot of ideas from this tutorial. The build has the following steps:

  1. Pull from GitHub - This performs a shallow git clone of my repo on GitHub.
  2. Generate site with Pelican - I needed to create a custom Google Container Builder step for this. See Dockerfile below. In addition to installing Pelican and some other Python dependencies, this container also installs Sass for compiling CSS.
  3. Push to GCS Bucket - This uploads the HTML generated by Pelican to a GCS bucket using gsutil rsync.

I needed to update a few settings on the GCS bucket to serve the site, namely setting a link to the index page (MainPageSuffix) and making the site globally readable. Finally, I set up build triggers; whenever I push to my master branch, it automatically triggers a build.
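For reference, those two bucket settings can be applied with gsutil along these lines (the bucket name here is a placeholder):

```shell
# serve index.html as the main page (MainPageSuffix)
gsutil web set -m index.html gs://www.example.com
# make all objects publicly readable
gsutil iam ch allUsers:objectViewer gs://www.example.com
```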

Overall, I'm loving Pelican so far. You can find my blog's new repo here. Cheers!


cloudbuild.yaml:

steps:
  - name: ''
    args: ['clone', '-b', '${_BRANCH}', '--single-branch', '--depth', '1', '']  # last arg: repo URL
  - name:${PROJECT_ID}/pelican:latest
    args: ['content', '-v']
    dir: blog-pelican
  - name: ''
    entrypoint: gsutil
    args: ['-m', 'rsync', '-r', '-c', '-d', './output', 'gs://${_SUB_DOMAIN}']
    dir: blog-pelican
substitutions:
  _BRANCH: master
  _SUB_DOMAIN: www

Dockerfile for Pelican GCB step


FROM python:3.5-slim  # base image assumed; any Python 3 image with pip works

# dart-sass release fetched below (version illustrative)
ENV SASS_VERSION 1.5.0
ENV PATH /builder/dart-sass:${PATH}

COPY requirements.txt .

# requirements.txt:
# blinker==1.4        Markdown==2.6.11        pytz==2018.4
# docutils==0.14      MarkupSafe==1.0         six==1.11.0
# feedgenerator==1.9  pelican==3.7.1          Unidecode==1.0.22
# Jinja2==2.10        Pygments==2.2.0         webassets==0.12.1
# jsmin==2.2.2        python-dateutil==2.7.3

RUN pip install --no-cache-dir --upgrade setuptools \
  && pip install --no-cache-dir --upgrade -r requirements.txt

RUN apt-get update \
  && apt-get install -y wget \
  && rm -rf /var/lib/apt/lists/*

RUN wget -q -O /builder/dart-sass.tar.gz${SASS_VERSION}/dart-sass-${SASS_VERSION}-linux-x64.tar.gz \
  && tar xvzf /builder/dart-sass.tar.gz --directory=/builder \
  && rm /builder/dart-sass.tar.gz

ENTRYPOINT ["pelican"]

Using Word2Vec for "Code Names"

12 May ’18

"Code Names" Rules: Players are divided evenly into two teams, red and blue. The board comprises 25 words divided into 4 categories: blue team, red team, neutral, and the death word. Each round, a designated clue-giver from each team takes turns giving one-word clues. The goal is to get the other members of your team to guess your team's words and NOT the other words, especially not the death word; if your team guesses the death word, you immediately lose.

It is a really fun game. I also thought it might be an interesting application for Word2Vec. Word2Vec is a two-layer neural network which models the linguistic contexts of words. There are two approaches to training Word2Vec: CBOW (continuous bag of words) and skip-gram. CBOW predicts a word from a window of surrounding words. Skip-gram uses a single word to predict words in the surrounding window. This is a nice summary. Also cool, you don't need to train your own Word2Vec model! Lots of people/organizations provide pre-trained word vectors that you can easily implement, e.g. Google News and Facebook.
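To make the CBOW vs. skip-gram distinction concrete, here's a toy sketch (plain Python, not gensim) of the (input, target) training pairs each objective would generate from a sentence:

```python
# toy illustration of Word2Vec training pairs (not gensim)
def training_pairs(tokens, window=1, mode="cbow"):
    """CBOW: (context words) -> center word; skip-gram: center word -> each context word."""
    pairs = []
    for i, center in enumerate(tokens):
        lo, hi = max(0, i - window), min(len(tokens), i + window + 1)
        context = [tokens[j] for j in range(lo, hi) if j != i]
        if mode == "cbow":
            pairs.append((context, center))
        else:  # skip-gram
            pairs.extend((center, c) for c in context)
    return pairs

print(training_pairs(["the", "cat", "sat"], mode="cbow"))
# [(['cat'], 'the'), (['the', 'sat'], 'cat'), (['cat'], 'sat')]
print(training_pairs(["the", "cat", "sat"], mode="skipgram"))
# [('the', 'cat'), ('cat', 'the'), ('cat', 'sat'), ('sat', 'cat')]
```

CBOW trains the network to predict the center word from the averaged context; skip-gram flips the arrow, so each center word predicts each of its neighbors.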

I built a small app that uses Word2Vec to generate word hints for "Code Names". I used Python's gensim package to measure word similarities / generate hints using pre-trained word vectors from Stanford NLP's GloVe. The app itself is built using Plotly's Dash, which is analogous to Shiny for R. I packaged the entire thing in a Docker container.

Dash app (

import os

import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output

import pandas as pd
import numpy as np

from gensim.models import KeyedVectors

import plotly.figure_factory as ff

# initialize app
app = dash.Dash()
server = app.server

# load model
model = 'glove/w2v.{}.txt.gz'.format(os.getenv('GLOVE_MODEL', 'glove.6B.50d'))
word_vectors = KeyedVectors.load_word2vec_format(model, binary=False)

# precompute L2-normalized vectors (saves lots of memory)
word_vectors.init_sims(replace=True)

# pandas df to html
def generate_table(df, max_rows=10):
    return html.Table(
        # header
        [html.Tr([html.Th(col) for col in df.columns])] +

        # body
        [html.Tr([
            html.Td(df.iloc[i][col]) for col in df.columns
        ]) for i in range(min(len(df), max_rows))]
    )

# generate some clues
def generate_hints(words):
    try:
        hints = word_vectors.most_similar(positive=words)
        hints = pd.DataFrame.from_records(hints, columns=['word', 'similarity'])
        return generate_table(hints)
    except KeyError as e:
        return html.Div(str(e))

# generate dendrogram for word similarity
def generate_dendro(words):
    try:
        similarities = np.array([word_vectors.distances(w, words) for w in words])
        figure = ff.create_dendrogram(similarities, labels=words)
        figure['layout'].update({'width': 800, 'height': 500})
        return figure
    except KeyError:
        return {}

# set up app layout
app.layout = html.Div(children=[
    html.H1(children='Code Names Hints'),
    html.Table([
        html.Tr([html.Td("All Words:"), html.Td("Words for Hints:")]),
        html.Tr([html.Td(dcc.Textarea(id='words-all', value='god zeus bat ball mountain cold snow', style={'width': 500})),
                 html.Td(dcc.Input(id='words', value='bat ball', type='text'))]),
        html.Tr([html.Td(dcc.Graph(id='dendro')), html.Td(html.Div(id='hints'))])
    ])
])

# set up app callbacks
@app.callback(
    Output(component_id='dendro', component_property='figure'),
    [Input(component_id='words-all', component_property='value')]
)
def update_dendro(input_value):
    words = [w.lower() for w in input_value.strip().split(' ')]
    return generate_dendro(words)

@app.callback(
    Output(component_id='hints', component_property='children'),
    [Input(component_id='words', component_property='value')]
)
def update_hints(input_value):
    words = [w.lower() for w in input_value.strip().split(' ')]
    return generate_hints(words)

# run
if __name__ == '__main__':
    app.run_server()

Dockerfile for the app:


FROM python:3.5-slim

ENV GLOVE_MODEL glove.6B.200d


RUN apt-get update \
  && apt-get install -y unzip gzip wget \
  && rm -rf /var/lib/apt/lists/*

COPY requirements.txt ./
COPY . ./
RUN chmod +x ./  # Gunicorn launch script shown below; filename assumed

RUN pip install -r requirements.txt

# requirements.txt:
# dash==0.21.1                    gensim==3.4.0
# dash-core-components==0.22.1    pandas==0.22.0
# dash-html-components==0.10.1    gunicorn==19.8.1
# dash-renderer==0.12.1           gevent==1.2.2

RUN wget -q \
  && unzip -d glove \
  && rm \
  && python -m gensim.scripts.glove2word2vec --input glove/${GLOVE_MODEL}.txt --output glove/w2v.${GLOVE_MODEL}.txt \
  && gzip glove/w2v.${GLOVE_MODEL}.txt \
  && rm glove/*.txt



#!/bin/sh

echo Starting Gunicorn...
gunicorn app:server \
    --name code-names \
    --bind$PORT \
    --workers $GUNICORN_WORKERS \
    --preload \
    --worker-class gevent \
    --timeout 600 \
    --log-level info \

Example output:

Overall, it does...okay haha. In some cases, it does surprisingly well. For instance, the app provides "published" as a top hint for "book" and "penguin". However, the algorithm struggles to identify commonalities that may not be explicitly collocated in text. For instance, for "dog" and "whale", "mammal" might be a good hint. However, our app simply lists other animals, e.g. "cat" and "shark".

I'm hosting a version of the app here, and a link to my repo on GH. Cheers!

Doc2Vec + Dask + K8s for the Toxic Comment Classification Challenge

22 March ’18

The goal of this Kaggle challenge was to build a model to flag toxic Wikipedia comments. The training dataset included 159,571 Wikipedia comments which were labeled by human raters. Each comment was evaluated on 6 dimensions: toxic, severe toxic, obscene, threat, insult, and identity hate.

Model Approach

This challenge is a great application for Doc2Vec, where we treat each of the toxicity dimensions as a label. For Doc2Vec, I used the gensim package. I also used gensim's Phraser for combining words into common phrases. To put everything in a sklearn pipeline, I needed to create sklearn transformers/estimators for each step.

My final model was a two-model blend of Doc2Vec and TF-IDF + LR. For the LR model, I used the nifty OneVsRestClassifier to build a model for each of the 6 y-variables.

Hyperparameter Tuning

I tuned each input model individually and subsequently the blend. I used Dask.distributed, specifically the dask-searchcv package, to parallelize my hyperparameter tuning step. One of the big advantages of the dask-searchcv implementations of GridSearchCV and RandomizedSearchCV is that they avoid repeated work. Estimators with identical parameters and inputs will only be fit once! In my example, I tested the following grid for my TF-IDF + LR model:

param_grid = {
  'cv__lowercase': [True, False],
  'cv__ngram_range': [(1, 1), (1, 2)],
  'tfidf__norm': ['l1', 'l2', None],
  'lr__estimator__C': [0.01, 0.1],
  'lr__estimator__penalty': ['l1', 'l2']
}

Even though this parameter grid has 48 different combinations, GridSearchCV will only run the CountVectorizer step 4 times, the TF-IDF step 12 times, etc. Much more efficient!
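As a sanity check on those counts, a few lines of Python reproduce the arithmetic (fits per CV split; only the parameters at or before a step force that step to be re-fit):

```python
from itertools import product

grid = {
    'cv__lowercase': [True, False],
    'cv__ngram_range': [(1, 1), (1, 2)],
    'tfidf__norm': ['l1', 'l2', None],
    'lr__estimator__C': [0.01, 0.1],
    'lr__estimator__penalty': ['l1', 'l2'],
}

def n_fits(prefixes):
    # distinct parameter combinations that affect the steps named in `prefixes`
    relevant = [v for k, v in grid.items() if k.split('__')[0] in prefixes]
    return len(list(product(*relevant)))

print(n_fits({'cv'}))                 # 4  CountVectorizer fits
print(n_fits({'cv', 'tfidf'}))        # 12 TF-IDF fits
print(n_fits({'cv', 'tfidf', 'lr'}))  # 48 LR fits
```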

Here's a snapshot of the Dask web UI during hyperparameter tuning:

Dask Cluster

I set up my Dask cluster using Kubernetes. And, of course, there was a very useful Helm chart for this already. This Helm chart sets up a Dask scheduler + web UI, Dask worker(s), and a Jupyter Notebook instance. When installing the Helm chart, you can use an accompanying values.yaml file to specify which Python packages you need to install. I also used Terraform to create/scale my K8s cluster.
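For reference, the values.yaml mechanism looks roughly like this: the chart's images read extra packages from an EXTRA_PIP_PACKAGES environment variable (the package list here is illustrative):

```yaml
worker:
  replicas: 8
  env:
    - name: EXTRA_PIP_PACKAGES
      value: dask-searchcv gensim scikit-learn
jupyter:
  env:
    - name: EXTRA_PIP_PACKAGES
      value: dask-searchcv gensim scikit-learn
```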

I created a modified version of this Dask Helm chart which adds a nodeSelector option for each of the deployments. In K8s, we can create two node pools: one for the worker pods and one for the Jupyter/scheduler pods. That way, when we want to add/remove workers, we can do so without taking down Jupyter!
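With the modified chart, pinning each deployment to a pool might look like this in values.yaml (pool names are placeholders; the `` label is GKE's node-pool label):

```yaml
worker:
  nodeSelector: worker-pool
scheduler:
  nodeSelector: default-pool
jupyter:
  nodeSelector: default-pool
```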

I set up three scripts: one for initializing the cluster, one for scaling the number of nodes/workers up or down, and one for destroying the cluster when we're done.

Note: The helm init --wait command will wait until Tiller is running and ready to receive requests. Very useful for CI/CD workflows. You will need to be running Helm v2.8.2 (the most recent release as of this post) to use this.
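In a CI script, that might look roughly like this (release and chart names illustrative):

```shell
helm init --wait   # blocks until Tiller is ready (Helm >= 2.8.2)
helm install --name dask ./charts/dask --values values.yaml
```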


import pandas as pd
import numpy as np
import yaml, re

from import storage
from io import BytesIO

from gensim.models.doc2vec import Doc2Vec, TaggedDocument
from gensim.models.phrases import Phrases, Phraser

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import TransformerMixin, BaseEstimator, clone
from sklearn.preprocessing import FunctionTransformer, StandardScaler
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression

from sklearn.feature_extraction.text import TfidfTransformer, CountVectorizer, strip_tags
from sklearn.feature_extraction.stop_words import ENGLISH_STOP_WORDS

from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.utils.validation import check_is_fitted

import distributed
from dask_ml.model_selection import GridSearchCV as GridSearchCVBase
# load the data
client_gcs = storage.Client()
bucket = client_gcs.get_bucket('djr-data')

def gcs_to_df(f):
    blob = bucket.blob(f)
    buf = BytesIO()
    blob.download_to_file(buf)
    return pd.read_csv(buf, encoding="utf-8")

df_train = gcs_to_df("kaggle-jigsaw/train.csv")
df_test = gcs_to_df("kaggle-jigsaw/test.csv")
yvar = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']
# initialize client for interacting with dask
# DASK_SCHEDULER_ADDRESS env variable specifies scheduler ip
client_dask = distributed.Client()
# correlation matrix
df_train[yvar].corr()
              toxic  severe_toxic  obscene  threat  insult  identity_hate
toxic 1.000000 0.308619 0.676515 0.157058 0.647518 0.266009
severe_toxic 0.308619 1.000000 0.403014 0.123601 0.375807 0.201600
obscene 0.676515 0.403014 1.000000 0.141179 0.741272 0.286867
threat 0.157058 0.123601 0.141179 1.000000 0.150022 0.115128
insult 0.647518 0.375807 0.741272 0.150022 1.000000 0.337736
identity_hate 0.266009 0.201600 0.286867 0.115128 0.337736 1.000000
# mean label prevalence
df_train[yvar].apply(np.mean, axis=0)
toxic            0.095844
severe_toxic     0.009996
obscene          0.052948
threat           0.002996
insult           0.049364
identity_hate    0.008805
dtype: float64
# train/test split
xdata = df_train.comment_text
ydata = df_train[yvar]
xdata_train, xdata_eval, ydata_train, ydata_eval = train_test_split(xdata, ydata, test_size = 0.2, random_state = 1)
# return words from corpus
# TODO: also try r"([\w][\w']*\w)"
def tokenize(doc, token=r"(?u)\b\w\w+\b"):
    doc = strip_tags(doc.lower())
    doc = re.compile(r"\s\s+").sub(" ", doc)
    words = re.compile(token).findall(doc)
    return words

# remove stop words
def remove_stop_words(x, stop_words=ENGLISH_STOP_WORDS):
    return [i for i in x if i not in stop_words]
# wrapper for gensim Phraser
COMMON_TERMS = ["of", "with", "without", "and", "or", "the", "a"]
class PhraseTransformer(TransformerMixin, BaseEstimator):

    def __init__(self, common_terms=COMMON_TERMS):
        self.phraser = None
        self.common_terms = common_terms

    def fit(self, X, y=None):
        phrases = Phrases(X, common_terms=self.common_terms)
        self.phraser = Phraser(phrases)
        return self

    def transform(self, X):
        return X.apply(lambda x: self.phraser[x])
# for making tagged documents
# NOTE: can't use FunctionTransformer since TransformerMixin doesn't pass y to fit_transform anymore
class MakeTaggedDocuments(BaseEstimator):

    def fit(self, X, y):
        return self

    def transform(self, X, y=None):
        if y is not None:
            yvar = list(y.columns)
            tags = y.apply(lambda row: [i for i, j in zip(yvar, row) if j == 1], axis=1)
            return [TaggedDocument(words=w, tags=t) for w, t in zip(X, tags)]
        else:
            return [TaggedDocument(words=w, tags=[]) for w in X]

    def fit_transform(self, X, y):
        return self.transform(X, y)
# wrapper for gensim Doc2Vec
class D2VEstimator(BaseEstimator):

    def __init__(self, min_count=10, alpha=0.025, min_alpha=0.0001, vector_size=200, dm=0, epochs=20):
        self.min_count = min_count
        self.alpha = alpha
        self.min_alpha = min_alpha
        self.vector_size = vector_size = dm
        self.epochs = epochs
        self.yvar = None
        self.model = Doc2Vec(seed=1, hs=1, negative=0, dbow_words=0,
                             min_count=self.min_count, alpha=self.alpha, min_alpha=self.min_alpha,
                             vector_size=self.vector_size,, epochs=self.epochs)

    def get_tags(self, doc):
        vec = self.model.infer_vector(doc.words, self.model.alpha, self.model.min_alpha, self.model.epochs)
        return dict(self.model.docvecs.most_similar([vec]))

    def fit(self, X, y=None):
        self.model.build_vocab(X)
        self.model.train(X, epochs=self.model.epochs, total_examples=self.model.corpus_count)
        self.yvar = list(y.columns)
        return self

    def predict_proba(self, X):
        pred = [self.get_tags(d) for d in X]
        pred = pd.DataFrame.from_records(data=pred)
        return pred[self.yvar]
# blend predictions from multiple models
class Blender(FeatureUnion):

    def __init__(self, transformer_list, n_jobs=1, transformer_weights=None):
        self.transformer_list = transformer_list
        self.scaler_list = [(t, StandardScaler()) for t, _ in transformer_list]
        self.n_jobs = n_jobs
        default_transformer_weights = list(np.ones(len(transformer_list)) / len(transformer_list))
        self.transformer_weights = transformer_weights if transformer_weights else default_transformer_weights

    def transformer_weights(self):
        return self._transformer_weights

    def transformer_weights(self, values):
        self._transformer_weights = {t[0]: v for t, v in zip(self.transformer_list, values)}

    # don't need to check for fit and transform
    def _validate_transformers(self):

    # iterator with scalers
    def _iter_ss(self):
        get_weight = (self.transformer_weights or {}).get
        return [(t[0], t[1], s[1], get_weight(t[0])) for t, s in zip(self.transformer_list, self.scaler_list)]

    # also fit scalers
    def fit(self, X, y):
        super(Blender, self).fit(X, y)
        self.scaler_list = [(name, for name, trans, ss, _ in self._iter_ss()]
        return self

    # generate probabilities
    def predict_proba(self, X):
        Xs = [ss.transform(trans.predict_proba(X))*weight for name, trans, ss, weight in self._iter_ss()]
        return np.sum(Xs, axis=0)
# create pipeline
d2v_pipeline = Pipeline(steps=[
    ('tk', FunctionTransformer(func=lambda x: x.apply(tokenize), validate=False)),
    ('ph', PhraseTransformer()),
    ('sw', FunctionTransformer(func=lambda x: x.apply(remove_stop_words), validate=False)),
    ('doc', MakeTaggedDocuments()),
    ('d2v', D2VEstimator())
])

lr_pipeline = Pipeline(steps=[
    ('cv', CountVectorizer(min_df=5, max_features=50000, strip_accents='unicode',
                           stop_words='english', analyzer='word')),
    ('tfidf', TfidfTransformer(sublinear_tf=True, use_idf=True)),
    ('lr', OneVsRestClassifier(LogisticRegression(class_weight="balanced")))
])

pipeline = Blender(transformer_list=[('d2v', d2v_pipeline), ('lr', lr_pipeline)])
# for non-multimetric, don't require refit = True for best_params_ / best_score_
class GridSearchCV(GridSearchCVBase):

    # For multiple metric evaluation, refit is a string denoting the scorer that should be
    # used to find the best parameters for refitting the estimator
    def scorer_key(self):
        return self.refit if self.multimetric_ else 'score'

    def best_index(self):
        check_is_fitted(self, 'cv_results_')
        return np.flatnonzero(self.cv_results_['rank_test_{}'.format(self.scorer_key)] == 1)[0]

    def best_params_(self):
        return self.cv_results_['params'][self.best_index]

    def best_score_(self):
        return self.cv_results_['mean_test_{}'.format(self.scorer_key)][self.best_index]
# some functions for dealing with parameter grids
def add_prefix(prefix, x):
    return {'{}__{}'.format(prefix, k):v for k,v in x.items()}

def flatten_dict(x):
    temp = {}
    for k,v in x.items():
        if isinstance(v, dict):
            temp.update(add_prefix(k, flatten_dict(v.copy())))
            temp.update({k: v})
    return temp
# hyperparameter tuning
param_grid = {
    'd2v': {
        'd2v__min_count': [10, 25],
        'd2v__alpha': [0.025, 0.05],
        'd2v__epochs': [10, 20, 30],
        'd2v__vector_size': [200, 300]
    },
    'lr': {
        'cv__lowercase': [True, False],
        'cv__ngram_range': [(1, 1), (1, 2)],
        'tfidf__norm': ['l1', 'l2', None],
        'lr__estimator__C': [0.01, 0.1],
        'lr__estimator__penalty': ['l1', 'l2']
    },
    'blender': {
        'transformer_weights': [(0.3, 0.7), (0.4, 0.6), (0.5, 0.5), (0.6, 0.4), (0.7, 0.3)]
    }
}

# wrapper for hyperparameter tuning
def hyperparameter_tune(pipeline, param_grid):
    # create tuner
    tuner = GridSearchCV(pipeline, param_grid, scheduler=client_dask, scoring='roc_auc',
                         cv=3, refit=False, return_train_score=False)

    # determine optimal hyperparameters, ydata_train)
    print('Best params: %s' % (str(tuner.best_params_)))
    print('Best params score: %s' % (str(tuner.best_score_)))

    return tuner.best_params_

# load saved hyperparameters if available; otherwise tune
try:
    with open('model_param_d2v.yaml', 'r') as f:
        param_optimal = yaml.load(f)

except IOError:
    param_optimal = {}

    # tune each model
    param_optimal['d2v'] = hyperparameter_tune(d2v_pipeline, param_grid['d2v'])
    param_optimal['lr'] = hyperparameter_tune(lr_pipeline, param_grid['lr'])

    # tune blender
    param_optimal.update(hyperparameter_tune(pipeline, param_grid['blender']))

    # flatten
    param_optimal = flatten_dict(param_optimal)

    # save best params
    with open('model_param_d2v.yaml', 'w') as f:
        yaml.dump(param_optimal, f)
Best params: {'d2v__alpha': 0.025, 'd2v__epochs': 30, 'd2v__min_count': 10, 'd2v__vector_size': 200}
Best params score: 0.9520673206887134
Best params: {'cv__lowercase': True, 'cv__ngram_range': (1, 1), 'lr__estimator__C': 0.1, 'lr__estimator__penalty': 'l2', 'tfidf__norm': 'l2'}
Best params score: 0.9764642394949188
Best params: {'transformer_weights': (0.3, 0.7)}
Best params score: 0.9774035665175447
# build model with optimal param
pipeline.set_params(**param_optimal), ydata_train)
# apply to eval set
ydata_eval_pred = pipeline.predict_proba(xdata_eval)
# calculate auc
auc = [roc_auc_score(ydata_eval[y], ydata_eval_pred[:,i]) for i,y in enumerate(yvar)]
print('Model AUCs: %s' % auc)
print('Avg AUC: %s' % np.mean(auc))
Model AUCs: [0.9662283198414882, 0.9857095145804597, 0.982421955124849, 0.9849362663053255, 0.9757783792333873, 0.9768901227451926]
Avg AUC: 0.9786607596384505
# generate final model
pipeline_final = clone(pipeline)
pipeline_final.set_params(**param_optimal), ydata)
# generate output
xdata_test = df_test.comment_text
ydata_test_pred = pipeline_final.predict_proba(xdata_test)
ydata_test_pred = pd.DataFrame(data=ydata_test_pred, columns=yvar)
ydata_test_pred['id'] =
ydata_test_pred.to_csv('submission.csv', index=False)


Pretty good! With more time, I definitely would have focused on adding more models to the stack, e.g. Naive Bayes and RF/XGBoost. A link to my repo on GH.