DonaldRauscher.com

A Blog About D4T4 & M47H

Setting up Apache Airflow on GKE

06 February ’18

Historically, I have used Luigi for a lot of my data pipelining. Recently, however, I have started experimenting with Airflow for a variety of reasons. Some things I really like about Airflow:

  • Easier to parallelize - Luigi can only be scaled locally. You can create multiple worker threads by passing --workers N when kicking off a job, but you cannot parallelize Luigi jobs across multiple machines! Airflow parallelizes quite well. For instance, you can use Celery to scale out your workers.
  • Superior scheduler - The Luigi "central scheduler" is a bit of a misnomer; it doesn't actually schedule anything! Its main purpose is to prevent worker threads from running the same task concurrently. That's it. You still need to initiate Luigi jobs with a cronjob. The Airflow scheduler is much more useful. You can use it to set up a cronjob-like schedule for a DAG and even initiate retries following errors (see the sketch after this list).
  • Connection management - Airflow has a nice mechanism for organizing connections to your resources. This is really useful, especially in a multiuser environment. It allows you to avoid storing secrets in .gitignore'd config files all over the place.
  • Better ongoing support - Luigi, originally open sourced at Spotify, is currently maintained on a "for fun basis" by Arash Rouhani, who currently works at Google. Meanwhile, Airflow, originally open sourced at Airbnb, is being incubated by Apache.
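
Those scheduler features translate to just a few lines of DAG code. Here is a minimal sketch (the DAG name, tasks, and commands are hypothetical) with a daily schedule and automatic retries:

# example_dag.py -- minimal sketch: daily schedule plus retries
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 2, 1),
    'retries': 2,                         # the scheduler retries failed tasks
    'retry_delay': timedelta(minutes=5),
}

# the scheduler picks this DAG up and runs it daily, no external cronjob needed
dag = DAG('example_dag', default_args=default_args, schedule_interval='@daily')

extract = BashOperator(task_id='extract', bash_command='echo extract', dag=dag)
load = BashOperator(task_id='load', bash_command='echo load', dag=dag)

extract >> load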

Given that I have been on a Docker/Kubernetes kick of late, I decided to spend some time setting up Airflow on GKE. I leveraged an awesome Airflow Docker image from Matthieu Roisil. I used a Postgres instance on CloudSQL for the Airflow meta database and Redis as the Celery backend. I also used a git-sync sidecar container to continuously sync DAGs and plugins on the running cluster, so you only need to rebuild the Docker image when the Python environment changes! Finally, I used Terraform to manage all my GCP infrastructure.

Terraform Configuration

# infrastructure.tf

variable "project" {}

variable "postgres_user" {
  default = "airflow"
}
variable "postgres_pw" {
  default = "airflow"
}

variable "region" {
  default = "us-central1"
}

variable "zone" {
  default = "us-central1-f"
}

provider "google" {
  version = "~> 1.5"
  project = "${var.project}"
  region = "${var.region}"
}

resource "google_compute_global_address" "airflow-static-ip" {
  name = "airflow-static-ip"
}

resource "google_compute_disk" "airflow-redis-disk" {
  name  = "airflow-redis-disk"
  type  = "pd-ssd"
  size = "200"
  zone  = "${var.zone}"
}

resource "google_sql_database_instance" "airflow-db" {
  name = "airflow-db"
  database_version = "POSTGRES_9_6"
  region = "${var.region}"
  settings {
    tier = "db-g1-small"
  }
}

resource "google_sql_database" "airflow-schema" {
  name = "airflow"
  instance = "${google_sql_database_instance.airflow-db.name}"
}

resource "google_sql_user" "proxyuser" {
  name = "${var.postgres_user}"
  password = "${var.postgres_pw}"
  instance = "${google_sql_database_instance.airflow-db.name}"
  host = "cloudsqlproxy~%"
}

resource "google_container_cluster" "airflow-cluster" {
  name = "airflow-cluster"
  zone = "${var.zone}"
  initial_node_count = "1"
  node_config {
    machine_type = "n1-standard-4"
    oauth_scopes = ["https://www.googleapis.com/auth/devstorage.read_only"]
  }
}

Kubernetes Manifest

Note: I packaged all Kubernetes resources in a Helm chart. Helm has several features (e.g. named templates, value substitutions) that allow you to write your Kubernetes manifests in a more DRY way.

# config.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: config-airflow
data:
  EXECUTOR: Celery
  POSTGRES_USER: airflow
  POSTGRES_DB: airflow
  POSTGRES_HOST: postgres
  POSTGRES_PORT: "5432"
  REDIS_HOST: redis
  REDIS_PORT: "6379"
  FLOWER_PORT: "5555"
  {{- if .Values.fernetKey }}
  FERNET_KEY: {{ .Values.fernetKey }}
  {{- end }}
  AIRFLOW__CORE__DAGS_FOLDER: "/git/git/dags/"
  AIRFLOW__CORE__PLUGINS_FOLDER: "/git/git/plugins/"
  AIRFLOW__CORE__LOAD_EXAMPLES: "0"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: config-git-sync
data:
  GIT_SYNC_REPO: {{ .Values.dagRepo }}
  GIT_SYNC_DEST: git
# db.yaml
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: postgres
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: airflow
        tier: postgres
    spec:
      restartPolicy: Always
      containers:
        - name: cloudsql-proxy
          image: gcr.io/cloudsql-docker/gce-proxy:1.11
          command: ["/cloud_sql_proxy", "--dir=/cloudsql",
                    "-instances={{ .Values.projectId }}:us-central1:airflow-db=tcp:0.0.0.0:5432",
                    "-credential_file=/secrets/cloudsql/credentials.json"]
          ports:
            - name: postgres
              containerPort: 5432
          volumeMounts:
            - name: cloudsql-instance-credentials
              mountPath: /secrets/cloudsql
              readOnly: true
            - name: ssl-certs
              mountPath: /etc/ssl/certs
            - name: cloudsql
              mountPath: /cloudsql
      volumes:
        - name: cloudsql-instance-credentials
          secret:
            secretName: cloudsql-instance-credentials
        - name: cloudsql
          emptyDir:
        - name: ssl-certs
          hostPath:
            path: /etc/ssl/certs
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: redis
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: airflow
        tier: redis
    spec:
      restartPolicy: Always
      containers:
        - name: redis
          image: redis:3.0-alpine
          ports:
            - name: redis
              containerPort: 6379
          volumeMounts:
            - name: redis-disk
              mountPath: /data/redis
      volumes:
        - name: redis-disk
          gcePersistentDisk:
            pdName: airflow-redis-disk
            fsType: ext4
# ingress.yaml
---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: ingress
  annotations:
    kubernetes.io/ingress.global-static-ip-name: airflow-static-ip
    kubernetes.io/tls-acme: "true"
spec:
  tls:
  - secretName: airflow-tls
    hosts:
    - web.{{ .Values.domain }}
    - flower.{{ .Values.domain }}
  rules:
  - host: web.{{ .Values.domain }}
    http:
      paths:
      - backend:
          serviceName: web
          servicePort: 8080
  - host: flower.{{ .Values.domain }}
    http:
      paths:
      - backend:
          serviceName: flower
          servicePort: 5555
# service.yaml
---
apiVersion: v1
kind: Service
metadata:
  name: web
spec:
  type: NodePort
  selector:
    app: airflow
    tier: web
  ports:
    - name: web
      port: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: flower
spec:
  type: NodePort
  selector:
    app: airflow
    tier: flower
  ports:
    - name: flower
      port: 5555
---
kind: Service
apiVersion: v1
metadata:
  name: postgres
spec:
  type: ClusterIP
  selector:
    app: airflow
    tier: postgres
  ports:
    - name: postgres
      port: 5432
      protocol: TCP
---
kind: Service
apiVersion: v1
metadata:
  name: redis
spec:
  type: ClusterIP
  selector:
    app: airflow
    tier: redis
  ports:
    - name: redis
      port: 6379
# deploy.yaml
{{- define "airflow" -}}
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: {{ .name }}
spec:
  replicas: {{ .replicas | default 1 }}
  template:
    metadata:
      labels:
        app: airflow
        tier: {{ .name }}
    spec:
      restartPolicy: Always
      containers:
        - name: web
          image: gcr.io/{{ .projectId }}/airflow-gke:latest
          ports:
            - name: web
              containerPort: 8080
          volumeMounts:
          - name: dagbag
            mountPath: /git
          envFrom:
          - configMapRef:
              name: config-airflow
          {{- if eq .name "web" }}
          livenessProbe:
            httpGet:
              path: /
              port: 8080
            initialDelaySeconds: 60
            timeoutSeconds: 30
          readinessProbe:
            httpGet:
              path: /
              port: 8080
            initialDelaySeconds: 60
            timeoutSeconds: 30
          {{- end }}
          command: ["/entrypoint.sh"]
          args:  {{ .commandArgs }}
        - name: git-sync
          image: gcr.io/google_containers/git-sync:v2.0.4
          volumeMounts:
          - name: dagbag
            mountPath: /git
          envFrom:
          - configMapRef:
              name: config-git-sync
      volumes:
        - name: dagbag
          emptyDir: {}
{{- end -}}

---
{{- $_ := set .Values.web "projectId" .Values.projectId }}
{{- template "airflow" .Values.web }}
---
{{- $_ := set .Values.scheduler "projectId" .Values.projectId }}
{{- template "airflow" .Values.scheduler }}
---
{{- $_ := set .Values.workers "projectId" .Values.projectId }}
{{- template "airflow" .Values.workers }}
---
{{- $_ := set .Values.flower "projectId" .Values.projectId }}
{{- template "airflow" .Values.flower }}

Deploy Instructions

(1) Store project id and Fernet key as env variables

export PROJECT_ID=$(gcloud config get-value project -q)

if [ ! -f '.keys/fernet.key' ]; then
  export FERNET_KEY=$(python -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)")
  echo $FERNET_KEY > .keys/fernet.key
else
  export FERNET_KEY=$(cat .keys/fernet.key)
fi

(2) Build the Docker image and upload it to Google Container Registry

docker build -t airflow-gke:latest .
docker tag airflow-gke gcr.io/${PROJECT_ID}/airflow-gke:latest
gcloud docker -- push gcr.io/${PROJECT_ID}/airflow-gke

(3) Create infrastructure with Terraform

Note: You will also need to create a Service Account for the CloudSQL proxy in Kubernetes. Create it (Role = "Cloud SQL Client"), download the JSON key, and attach it as a secret; it is stored in .keys/airflow-cloudsql.json in this example.

terraform apply -var project=${PROJECT_ID}

gcloud container clusters get-credentials airflow-cluster
gcloud config set container/cluster airflow-cluster

kubectl create secret generic cloudsql-instance-credentials \
  --from-file=credentials.json=.keys/airflow-cloudsql.json

(4) Set up Helm / Kube-Lego for TLS

Note: You only need to set up kube-lego if you want to set up TLS using Let's Encrypt. I only set up HTTPS because I secured my instance with Cloud IAP, which requires an HTTPS load balancer.

kubectl create serviceaccount -n kube-system tiller
kubectl create clusterrolebinding tiller-binding --clusterrole=cluster-admin --serviceaccount kube-system:tiller
helm init --service-account tiller

kubectl create namespace kube-lego

helm install \
  --namespace kube-lego \
  --set config.LEGO_EMAIL=donald.rauscher@gmail.com \
  --set config.LEGO_URL=https://acme-v01.api.letsencrypt.org/directory \
  --set config.LEGO_DEFAULT_INGRESS_CLASS=gce \
  stable/kube-lego

(5) Deploy with Kubernetes

helm install . \
  --set projectId=${PROJECT_ID} \
  --set fernetKey=${FERNET_KEY}

Test Pipeline

The example pipeline (citibike.py) streams data from this Citibike API into Google BigQuery. I had a lot of issues with the GCP contrib classes in Airflow (the BQ hook did not support BQ streaming, and the base GCP hook is built on the now-deprecated oauth2client library instead of google-auth), so I built my own plugin!
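
For reference, here is a stripped-down sketch of what such a plugin could look like. This is not the actual plugin code; the class and method names are hypothetical, and it assumes the google-auth and google-cloud-bigquery client libraries:

# plugins/bq_streaming.py -- hypothetical sketch, not the actual plugin code
from airflow.hooks.base_hook import BaseHook
from airflow.plugins_manager import AirflowPlugin
from google.cloud import bigquery
from google.oauth2 import service_account


class BigQueryStreamingHook(BaseHook):
    def __init__(self, gcp_conn_id='google_cloud_default'):
        self.gcp_conn_id = gcp_conn_id

    def get_client(self):
        # build a client with google-auth credentials taken from the Airflow connection
        conn = self.get_connection(self.gcp_conn_id)
        key_path = conn.extra_dejson.get('extra__google_cloud_platform__key_path')
        credentials = service_account.Credentials.from_service_account_file(key_path)
        return bigquery.Client(credentials=credentials, project=credentials.project_id)

    def stream_rows(self, dataset_id, table_id, rows):
        # rows is a list of dicts keyed by column name
        client = self.get_client()
        table_ref = bigquery.DatasetReference(client.project, dataset_id).table(table_id)
        errors = client.insert_rows(client.get_table(table_ref), rows)  # streaming insert
        if errors:
            raise RuntimeError('BigQuery streaming insert failed: {}'.format(errors))


class BigQueryStreamingPlugin(AirflowPlugin):
    name = 'bq_streaming'
    hooks = [BigQueryStreamingHook]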

Note: To run the Citibike example pipeline, you will need to create a Service Account with BigQuery access and add it to the google_cloud_default connection in the Airflow UI.
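
If you prefer not to click through the UI, you could also register the connection programmatically against the Airflow meta database. A rough sketch (the key path and project id below are placeholders):

# add_gcp_connection.py -- sketch only; key path and project are placeholders
import json

from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id='google_cloud_default',
    conn_type='google_cloud_platform',
    extra=json.dumps({
        'extra__google_cloud_platform__key_path': '/keys/airflow-bq.json',
        'extra__google_cloud_platform__project': 'my-project-id',
        'extra__google_cloud_platform__scope': 'https://www.googleapis.com/auth/bigquery',
    }),
)

session = settings.Session()
# initdb seeds a google_cloud_default connection already; replace it with ours
session.query(Connection).filter(Connection.conn_id == conn.conn_id).delete()
session.add(conn)
session.commit()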

---

Overall, I'm really excited to start using Airflow for more of my data pipelining. Here is a link to all my code on GitHub. Cheers!

Quick and Easy BI: Setting up Redash on GKE

31 December ’17

Professionally, I have worked quite a lot with the BI platforms Looker and Tableau. They are great BI platforms for an organization, though probably too heavy (and too expensive) for a small project or a bootstrapping startup. Sometimes you just need something where you can write queries and dump them into a visualization. Recently, I was looking to implement a lightweight BI tool for a personal project. I chose Redash, which you can self-host on your own infrastructure. This post documents how to set up Redash on Google Cloud using GKE. Because CloudSQL serves as the Postgres backend and a persistent disk backs Redis, you can delete the cluster when you're not using it and spin it back up as needed, without losing any data!

Infrastructure Setup

We will use the following Google Cloud components to set up Redash:

  • Postgres DB (via CloudSQL)
  • Persistent disk for Redis instance
  • Kubernetes cluster for Redash Docker image

Here is a Terraform configuration which defines all the necessary infrastructure:

# infrastructure.tf

variable "project" {}

variable "postgres_user" {
  default = "redash"
}
variable "postgres_pw" {
  default = "hsader"
}

variable "region" {
  default = "us-central1"
}

variable "zone" {
  default = "us-central1-f"
}

provider "google" {
  version = "~> 1.4"
  project = "${var.project}"
  region = "${var.region}"
}

resource "google_compute_global_address" "redash-static-ip" {
  name = "redash-static-ip"
}

resource "google_compute_disk" "redash-redis-disk" {
  name  = "redash-redis-disk"
  type  = "pd-ssd"
  size = "200"
  zone  = "${var.zone}"
}

resource "google_sql_database_instance" "redash-db" {
  name = "redash-db"
  database_version = "POSTGRES_9_6"
  region = "${var.region}"
  settings {
    tier = "db-f1-micro"
  }
}

resource "google_sql_database" "redash-schema" {
  name = "redash"
  instance = "${google_sql_database_instance.redash-db.name}"
}

resource "google_sql_user" "proxyuser" {
  name = "${var.postgres_user}"
  password = "${var.postgres_pw}"
  instance = "${google_sql_database_instance.redash-db.name}"
  host = "cloudsqlproxy~%"
}

resource "google_container_cluster" "redash-cluster" {
  name = "redash-cluster"
  zone = "${var.zone}"
  initial_node_count = "1"
  node_config {
    machine_type = "n1-standard-4"
  }
}

Create the infrastructure with Terraform and install Helm's Tiller on the Kubernetes cluster. You will also need to create a service account that the CloudSQL proxy on Kubernetes will use. Create it (Role = "Cloud SQL Client"), download the JSON key, and attach the key as a secret.

export PROJECT_ID=$(gcloud config get-value project -q)
terraform apply -var project=${PROJECT_ID}

gcloud container clusters get-credentials redash-cluster
gcloud config set container/cluster redash-cluster

helm init

kubectl create secret generic cloudsql-instance-credentials \
    --from-file=credentials.json=[PROXY_KEY_FILE_PATH]

Redash Deployment

Next, we need to deploy Redash on our Kubernetes cluster. I packaged my Kubernetes resources in a Helm chart, which lets you inject values / variables via template directives (e.g. {{ ... }}). I used Helm hooks to set up the configuration and database resources (CloudSQL proxy + Redis) and to run a job that initializes the Redash schema before deploying the app itself.

helm install . --set projectId=${PROJECT_ID}

Redash resources:

# config.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: config
  annotations:
    "helm.sh/hook": pre-install
    "helm.sh/hook-weight": "1"
data:
  REDASH_DATABASE_URL: postgresql://{{ .Values.postgres.user }}:{{ .Values.postgres.pw }}@postgres:5432/redash
  REDASH_REDIS_URL: "redis://redis:6379/0"
  PYTHONUNBUFFERED: "0"
  REDASH_LOG_LEVEL: "INFO"
# db.yaml
---
kind: Service
apiVersion: v1
metadata:
  name: postgres
  annotations:
    "helm.sh/hook": pre-install
    "helm.sh/hook-weight": "2"
spec:
  type: ClusterIP
  selector:
    app: redash
    tier: postgres
  ports:
    - name: postgres
      port: 5432
---
kind: Service
apiVersion: v1
metadata:
  name: redis
  annotations:
    "helm.sh/hook": pre-install
    "helm.sh/hook-weight": "2"
spec:
  type: ClusterIP
  selector:
    app: redash
    tier: redis
  ports:
    - name: redis
      port: 6379
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: postgres
  annotations:
    "helm.sh/hook": pre-install
    "helm.sh/hook-weight": "2"
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: redash
        tier: postgres
    spec:
      containers:
        - name: cloudsql-proxy
          image: gcr.io/cloudsql-docker/gce-proxy:1.11
          command: ["/cloud_sql_proxy", "--dir=/cloudsql",
                    "-instances={{ .Values.projectId }}:us-central1:redash-db=tcp:0.0.0.0:5432",
                    "-credential_file=/secrets/cloudsql/credentials.json"]
          ports:
            - name: postgres
              containerPort: 5432
          volumeMounts:
            - name: cloudsql-instance-credentials
              mountPath: /secrets/cloudsql
              readOnly: true
            - name: ssl-certs
              mountPath: /etc/ssl/certs
            - name: cloudsql
              mountPath: /cloudsql
      volumes:
        - name: cloudsql-instance-credentials
          secret:
            secretName: cloudsql-instance-credentials
        - name: cloudsql
          emptyDir:
        - name: ssl-certs
          hostPath:
            path: /etc/ssl/certs
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: redis
  annotations:
    "helm.sh/hook": pre-install
    "helm.sh/hook-weight": "2"
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: redash
        tier: redis
    spec:
      containers:
        - name: redis
          image: redis:3.0-alpine
          ports:
            - name: redis
              containerPort: 6379
          volumeMounts:
            - name: redis-disk
              mountPath: /data/redis
      volumes:
        - name: redis-disk
          gcePersistentDisk:
            pdName: redash-redis-disk
            fsType: ext4

Redash DB initialization job:

# init.yaml
---
kind: Job
apiVersion: batch/v1
metadata:
  name: init-db
  annotations:
    "helm.sh/hook": pre-install
    "helm.sh/hook-weight": "3"
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: redash-init
          image: redash/redash:latest
          resources:
            requests:
              memory: 1Gi
          envFrom:
            - configMapRef:
                name: config
          args: ["create_db"]

Redash deployment:

# app.yaml
---
kind: Service
apiVersion: v1
metadata:
  name: redash
  annotations:
    kubernetes.io/ingress.global-static-ip-name: redash-static-ip
spec:
  type: LoadBalancer
  selector:
    app: redash
    tier: app
  ports:
    - port: 80
      targetPort: 5000
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: redash
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: redash
        tier: app
    spec:
      containers:
        - name: server
          image: redash/redash:latest
          resources:
            requests:
              memory: 1Gi
          ports:
            - containerPort: 5000
          envFrom:
            - configMapRef:
                name: config
          env:
            - name: REDASH_COOKIE_SECRET
              value: {{ .Values.cookieSecret }}
          args: ["server"]
        - name: workers
          image: redash/redash:latest
          resources:
            requests:
              memory: 1Gi
          envFrom:
            - configMapRef:
                name: config
          env:
            - name: WORKERS_COUNT
              value: "{{ .Values.numWorkers }}"
            - name: QUEUES
              value: "queries,scheduled_queries,celery"
          args: ["scheduler"]

---

You can find all of my code up on my GitHub here. Cheers!

High Cardinality Categoricals with Sklearn

18 December ’17

I used a Bayesian approach to encode high cardinality categorical variables in a Kaggle competition a few months back. My original implementation was in R. However, I have recently been doing most of my modeling in sklearn, so I decided to implement the approach there as well.

This approach lends itself to the sklearn framework very well! The fit method uses MLE to estimate a and b for the prior distribution and calculates descriptive stats for each level. The transform method calculates posterior probabilities on what is assumed to be out-of-sample data. And the fit_transform method runs fit and then calculates posterior probabilities, leaving out the current sample and applying some noise (to deter overfitting in tree-based models) if specified.

You can see this method implemented in a familiar friend: my hospital readmission model. Cheers!

import pandas as pd
import numpy as np

from scipy.optimize import minimize
from scipy.special import beta

from sklearn.base import BaseEstimator
from sklearn.utils.validation import check_is_fitted, column_or_1d
from sklearn.utils.multiclass import type_of_target

# beta binomial density function
@np.vectorize
def dbetabinom(a, b, k, n):
    n2 = np.clip(n, 0, 100)
    k2 = round(k * n2 / n)
    return beta(k2 + a, n2 - k2 + b) / beta(a, b)

# beta binomial log loss
def betabinom_ll(par, arg):
    return np.sum(-np.log(dbetabinom(par[0], par[1], arg[0], arg[1])))

# default params for MLE
mle_param_defaults = dict(method = "L-BFGS-B", x0 = [1,1], bounds = [(0.5, 500), (0.5, 500)])

# encodes single high cardinality categorical variable
class SingleHCCEncoder(BaseEstimator):

    def __init__(self, add_noise = True, noise_sd = 0.05, mle_params = mle_param_defaults):
        self.add_noise = add_noise
        self.noise_sd = noise_sd
        self.mle_params = mle_params
        self.a, self.b = None, None
        self.df, self.df_dict = None, None

    # calibrate a and b of beta distribution
    def fit_beta(self):
        check_is_fitted(self, 'df')
        k, n = self.df.k, self.df.n
        mle = minimize(fun = betabinom_ll, args = [k, n], **self.mle_params)
        self.a, self.b = mle.x

    # descriptive stats for each level
    def fit_df(self, x, y):
        df = pd.DataFrame(data = dict(x = x, y = y))
        df = df.groupby(['x']).agg(['sum', 'count', 'mean'])
        df.columns = ['k', 'n', 'p']
        self.df = df
        self.df_dict = df.to_dict(orient = "index")

    # note: np.vectorize does not bind self, hence the explicit self argument in the calls below
    @np.vectorize
    def transform_one_loo(self, x, y):
        # leave-one-out posterior: the current observation is removed from the counts
        xval = self.df_dict.get(x, dict(k = 0, n = 0))
        return (xval['k'] + self.a - y) * 1.0 / (xval['n'] + self.a + self.b - 1)

    @np.vectorize
    def transform_one(self, x):
        # posterior mean; unseen levels fall back to the prior mean a / (a + b)
        xval = self.df_dict.get(x, dict(k = 0, n = 0))
        return (xval['k'] + self.a) * 1.0 / (xval['n'] + self.a + self.b)

    def fit(self, x, y):
        assert(type_of_target(y) == "binary")
        x = column_or_1d(x)
        y = column_or_1d(y)
        self.fit_df(x, y)
        self.fit_beta()
        return self

    def fit_transform(self, x, y):
        self.fit(x, y)
        if self.add_noise:
            noise = self.noise_sd * np.random.randn(len(x)) + 1
        else:
            noise = np.repeat(1, len(x))
        return self.transform_one_loo(self, x, y) * noise

    def transform(self, x):
        check_is_fitted(self, 'df_dict')
        x = column_or_1d(x)
        return self.transform_one(self, x)

# encodes multiple high cardinality categorical variables
class HCCEncoder(BaseEstimator):

    def __init__(self, columns, hcc_encode_params = {}, seed = 1):
        self.columns = columns
        self.hcc_encode_params = hcc_encode_params
        self.seed = seed
        self.labellers = {}

    def fit(self, df, y):
        for c in self.columns:
            hcc_encode_params = self.hcc_encode_params.get(c, {})
            labeller = SingleHCCEncoder(**hcc_encode_params)
            labeller.fit(df[c], y)
            self.labellers[c] = labeller
        return self

    def fit_transform(self, df, y):
        np.random.seed(self.seed)
        df = df.copy()
        for c in self.columns:
            hcc_encode_params = self.hcc_encode_params.get(c, {})
            labeller = SingleHCCEncoder(**hcc_encode_params)
            df[c] = labeller.fit_transform(df[c], y)
            self.labellers[c] = labeller
        return df

    def transform(self, df):
        df = df.copy()
        for c in self.columns:
            df[c] = self.labellers[c].transform(df[c])
        return df
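
To make the interface concrete, here is a toy usage sketch (the column name and data below are made up):

# toy usage sketch -- hypothetical column and data
train = pd.DataFrame({
    'zipcode': ['55401', '55401', '55402', '55403', '55403', '55403'],
    'other_feature': [1.2, 0.7, 3.1, 0.4, 2.2, 1.9],
})
y_train = np.array([1, 0, 1, 0, 0, 1])

test = pd.DataFrame({
    'zipcode': ['55401', '55404'],  # '55404' is unseen and falls back to the prior
    'other_feature': [0.9, 1.5],
})

encoder = HCCEncoder(columns=['zipcode'])

# leave-one-out posteriors (plus noise) for the training data
train_encoded = encoder.fit_transform(train, y_train)

# plain posteriors for out-of-sample data
test_encoded = encoder.transform(test)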