In [1]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
%matplotlib notebook

In [2]:
mu_vals = [[2, 2], [2, -2], [-2, -2], [-2, 2]]
sigma_vals = [[[.1, 0], [0, .1]] for _ in range(4)]

samples = []
for mu, sigma in zip(mu_vals, sigma_vals):
    samples.append(np.random.multivariate_normal(mu, sigma, 500))


# Describing the General Gaussian Mixture Model

We can represent a Gaussian Mixture Model as follows

$$p(\mathbf{x}) = \sum_{k=1}^K \pi_k \mathcal{N}(\mathbf{x} | \mu_k, \Sigma_k)$$

In other words, given a random sample $\mathbf{x}$, we model the probability density function of $\mathbf{x}$ as a linear combination of Gaussian distributions. Alternatively, we can introduce a latent variable $\mathbf{z}$ and write the above model in the following way

$$p(\mathbf{x}) = \sum_{\mathbf{z}} p(\mathbf{z})p(\mathbf{x} | \mathbf{z})$$

where $\mathbf{z}$ is a $K$-dimensional binary vector with $z_k = 1$ for exactly one $k$ and $z_j = 0$ for all $j \neq k$. We stipulate that $p(z_k = 1) = \pi_k$, where the mixing coefficients satisfy $0 \leq \pi_k \leq 1$ and $\sum_{k=1}^K \pi_k = 1$. Similarly, we set $p(\mathbf{x} | z_k = 1) = \mathcal{N}(\mathbf{x} | \mu_k, \Sigma_k)$. We can then use the 1-of-$K$ representation of $\mathbf{z}$ to write

$$p(\mathbf{z}) = \prod_{k=1}^K \pi_k^{z_k} \\ p(\mathbf{x} | \mathbf{z}) = \prod_{k=1}^K \mathcal{N}(\mathbf{x} | \mu_k, \Sigma_k)^{z_k}$$

For example, if $K = 3$ and $\mathbf{z} = (0, 1, 0)^T$, these products collapse to $p(\mathbf{z}) = \pi_2$ and $p(\mathbf{x} | \mathbf{z}) = \mathcal{N}(\mathbf{x} | \mu_2, \Sigma_2)$. The marginal distribution $p(\mathbf{x})$ of $\mathbf{x}$ is then obtained by summing the joint distribution $p(\mathbf{z})p(\mathbf{x}|\mathbf{z})$ over all possible 1-of-$K$ values of $\mathbf{z}$, i.e.

$$p(\mathbf{x}) = \sum_{\mathbf{z}} p(\mathbf{x}|\mathbf{z})p(\mathbf{z})$$

In this way, we have expressed any observed data point $\mathbf{x}$ sampled from the mixture in terms of a distribution over a latent variable $\mathbf{z}$.
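This latent-variable view also gives a recipe for sampling from the mixture: first draw $\mathbf{z}$, then draw $\mathbf{x}$ from the component it selects (ancestral sampling). A minimal sketch, reusing `mu_vals` and `sigma_vals` from cell [2] and assuming equal mixing weights:

rng = np.random.default_rng(0)
pis = np.full(4, 1 / 4)                # assumed equal mixing weights
z = rng.choice(4, size=1000, p=pis)    # latent assignments (1-of-K, stored as component indices)
x = np.array([rng.multivariate_normal(mu_vals[k], sigma_vals[k]) for k in z])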

# The EM Algorithm

The EM (Expectation-Maximization) algorithm alternates between two steps. In the first, the expectation (E) step, we evaluate for each $\mathbf{x_n}$ the conditional probability $p(z_k = 1 | \mathbf{x_n})$, known as the responsibility and denoted $\gamma(z_{nk})$. By Bayes' theorem, the calculation is as follows

$$\gamma(z_{nk}) = \frac{\pi_k \mathcal{N}(\mathbf{x_n} | \mu_k, \Sigma_k)}{\sum_{j=1}^K \pi_j \mathcal{N}(\mathbf{x_n} | \mu_j, \Sigma_j)}$$

In the next step, the maximization (M) step, we re-estimate the parameters $\mu_k, \Sigma_k, \pi_k$ in the following way

\begin{align*} \mu_k^{\text{new}} &= \frac{1}{N_k} \sum_{n=1}^N \gamma(z_{nk}) \mathbf{x_n} \\ \Sigma_k^{\text{new}} &= \frac{1}{N_k} \sum_{n=1}^N \gamma(z_{nk})(\mathbf{x_n} - \mu_k^{\text{new}})(\mathbf{x_n} - \mu_k^{\text{new}})^{T} \\ \pi_k^{\text{new}} &= \frac{N_k}{N} \end{align*}

where $N_k = \sum_{n=1}^N \gamma(z_{nk})$ is the effective number of points assigned to component $k$. After each iteration we evaluate the log likelihood

$$\ln p(\mathbf{X}) = \sum_{n=1}^N \ln \left( \sum_{k=1}^K \pi_k \mathcal{N}(\mathbf{x_n} | \mu_k, \Sigma_k) \right)$$

and stop once it changes by less than some threshold.
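The cells below implement these updates with explicit loops. As a compact cross-check, one full EM iteration can also be written in vectorized form; this is a sketch assuming `scipy` is available, and `em_step` is an illustrative name rather than part of the implementation below.

from scipy.stats import multivariate_normal

def em_step(X, mu, sigma, pi):
    # X: (N, d) data; mu: (K, d); sigma: (K, d, d); pi: (K,)
    K = len(pi)
    # E-step: responsibilities gamma, shape (N, K)
    dens = np.column_stack([multivariate_normal.pdf(X, mean=mu[k], cov=sigma[k])
                            for k in range(K)])
    weighted = dens * pi
    gamma = weighted / weighted.sum(axis=1, keepdims=True)
    # M-step: closed-form updates
    N_k = gamma.sum(axis=0)
    mu_new = (gamma.T @ X) / N_k[:, None]
    sigma_new = np.array([(gamma[:, k, None] * (X - mu_new[k])).T @ (X - mu_new[k]) / N_k[k]
                          for k in range(K)])
    pi_new = N_k / len(X)
    return mu_new, sigma_new, pi_new

Iterating `em_step` from the output of `initialParams` should match the loop-based `expectation`/`maximization` pair defined next.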
In [3]:
def initialParams(K, muSpread, sigmaWidth):
    means = np.array([np.random.uniform(low=-muSpread, high=muSpread, size=2) for _ in range(K)])
    sigmas = np.array([sigmaWidth * np.eye(2) for _ in range(K)])
    pi = np.array([1 / K] * K)

    return {'mu': means, 'sigma': sigmas, 'pi': pi, 'K': K}

def expectation(data, parameters):
    # E-step: evaluate the responsibilities gamma(z_nk) for every point and component
    N = len(data)
    K = parameters['K']
    mu = parameters['mu']
    sigma = parameters['sigma']
    pi = parameters['pi']

    gammas = np.empty((N, K))

    for n in range(N):
        x_n = data[n, :]
        # Density of each component evaluated at x_n
        densities = np.array([multivariateNormal(x_n, mu[j], sigma[j]) for j in range(K)])
        # Shared denominator: the full mixture density at x_n
        gamma_denom = np.sum(pi * densities)
        for k in range(K):
            gammas[n, k] = pi[k] * densities[k] / gamma_denom

    return gammas

def multivariateNormal(x, mu, sigma):
    # Evaluate the density N(x | mu, sigma) at the point x (this is not a random draw)
    sigma_inv = np.linalg.inv(sigma)
    diff = x - mu
    sigma_sqrt = np.sqrt(np.linalg.det(sigma))
    n = len(mu)

    # Normalizing constant (2 pi)^(n/2) |sigma|^(1/2)
    Z = (2 * np.pi) ** (n / 2) * sigma_sqrt

    # Density value at x
    density = (1 / Z) * np.exp(-.5 * np.dot(np.dot(diff.T, sigma_inv), diff))

    return density

def maximization(data, gammas, parameters):
    mu_old = parameters['mu']
    sigma_old = parameters['sigma']
    pi_old = parameters['pi']
    K = parameters['K']
    N = len(data)

    mu_new = np.zeros_like(mu_old)
    sigma_new = np.zeros_like(sigma_old)
    pi_new = np.empty_like(pi_old)

    for k in range(K):
        N_k = np.sum(gammas[:, k])
        # Calculate new pi values
        pi_new[k] = N_k / N
        for n in range(N):
            # Calculate new mu values
            mu_new[k] += (1 / N_k) * data[n, :] * gammas[n, k]
        for n in range(N):
            # Calculate new sigma values
            sigma_new[k] += (1 / N_k) * gammas[n, k] * \
                np.outer((data[n, :] - mu_new[k]), (data[n, :] - mu_new[k]))

    parameters['mu'] = mu_new
    parameters['sigma'] = sigma_new
    parameters['pi'] = pi_new

    return parameters

def hasConverged(logLikelihood, prevLikelihood, threshold=.1):
    return np.abs(logLikelihood - prevLikelihood) < threshold

def calcLogLikelihood(data, parameters):
    N = len(data)
    K = parameters['K']
    mu = parameters['mu']
    sigma = parameters['sigma']
    pi = parameters['pi']
    logLikelihood = 0

    for n in range(N):
        # Mixture density at x_n, summed over components before taking the log
        partialSum = 0
        for k in range(K):
            partialSum += pi[k] * multivariateNormal(data[n, :], mu[k], sigma[k])
        logLikelihood += np.log(partialSum)

    return logLikelihood

def plotEM(data, parameters, ax):
    colors = ['red', 'cyan', 'green', 'black']
    K = parameters['K']

    for k in range(K):
        mu = parameters['mu'][k]
        sigma = parameters['sigma'][k]
        color = k % len(colors)
        plotContour(mu, sigma, ax, color=colors[color], numContours=5)

def runEM(data, num_iters=10, parameters=None, plot=True):
    if not parameters:
        # Adjust initial parameter generation as desired.
        parameters = initialParams(4, 1.5, .4)
    prevLikelihood = 0
    logLikelihood = 1

    for _ in range(num_iters):
        gammas = expectation(data, parameters)
        parameters = maximization(data, gammas, parameters)
        prevLikelihood = logLikelihood
        logLikelihood = calcLogLikelihood(data, parameters)
        # Stop early once the log likelihood stabilizes
        if hasConverged(logLikelihood, prevLikelihood):
            break

    if plot:
        plotEMResults(data, parameters['K'], parameters)

def plotContour(mu, sigma, ax, color='blue', numContours=3):
    eigvalues, eigvectors = np.linalg.eig(sigma)
    primaryEigvector = eigvectors[:, 0]
    angle = computeRotation(primaryEigvector)
    isoProbContours = [Ellipse(mu,
                               l * np.sqrt(eigvalues[0]),
                               l * np.sqrt(eigvalues[1]),
                               alpha=0.3 / l, color=color,
                               angle=angle)
                       for l in range(1, numContours + 1)]
    [ax.add_patch(isoProbContour) for isoProbContour in isoProbContours]

def computeRotation(vector):
    return (180 / np.pi) * np.arctan2(vector[1], vector[0])

def dataScatter(data, color='grey'):
    plt.scatter(data[:, 0], data[:, 1], color=color, edgecolor=None, alpha=0.1)

def plotEMResults(dataset, K, parameters):
    # Define colors locally so this function does not depend on a global set in a later cell
    colors = ['red', 'cyan', 'green', 'black']
    plt.figure()
    dataScatter(dataset)
    for idx in range(K):
        mu = parameters['mu'][idx]
        sigma = parameters['sigma'][idx]
        color = idx % len(colors)
        plotContour(mu, sigma, plt.gca(), color=colors[color], numContours=5)
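As a quick sanity check (optional, and assuming `scipy` is installed), the hand-rolled density above can be compared against `scipy.stats.multivariate_normal`:

from scipy.stats import multivariate_normal as mvn

pt, m, S = np.zeros(2), np.array([2., 2.]), .1 * np.eye(2)
# Both evaluations should agree to numerical precision
assert np.isclose(multivariateNormal(pt, m, S), mvn.pdf(pt, mean=m, cov=S))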

In [4]:
fig, ax = plt.subplots(1)
colors = ['red', 'cyan', 'green', 'black']
for sample, color in zip(samples, colors):
    ax.plot(sample[:, 0], sample[:, 1], '.', c=color, alpha=0.3, markersize=10);

In [5]:
data = np.vstack(samples)

In [6]:
runEM(data)