Expectation Maximization Algorithm and MAP Estimation#
Prepared by: Ken Sible (ksible@nd.edu, @kennethsible, 2023)
!wget -N "https://huggingface.co/datasets/papluca/language-identification/resolve/main/train.csv"
try: import nltk
except ImportError:
!pip install nltk
import nltk
nltk.download('punkt')
Introduction#
In this tutorial notebook, we introduce the expectation maximization (EM) algorithm for parameter estimation given partially observed data. We use a Naive Bayes classifier for language identification as a running example throughout the notebook. In order to train the classifier, we use maximum a posteriori (MAP) estimation, a statistical optimization method for estimating the parameters of a distribution given (fully or partially) observed data.
Naive Bayes Classifier#
Let \(\{(x^{(i)},y^{(i)}):x^{(i)}\in\{+1,-1\}^d,y^{(i)}\in\{1,2,\ldots,k\}\}_{i=1}^n\) be a training set where \(n\) is the number of documents, \(k\) is the number of labels and \(d\) is the number of features. In language identification, an example of text classification, the labels are languages and the features are tokens, which are meaningful sequences of characters. We define \(x^{(i)}_j\) to be \(+1\) if document \(i\) contains token \(j\) and \(-1\) otherwise. Hence, each component of the binary vector \(x^{(i)}\) represents either the presence or absence of a particular token in document \(i\). Now, we derive the Naive Bayes model for classification.
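Before doing so, here is a minimal sketch (with an assumed four-token vocabulary, not taken from the dataset used later) of how a tokenized document is mapped to the binary vector \(x^{(i)}\):
vocabulary = ['the', 'cat', 'sat', 'chat']  # assumed toy vocabulary, tokens j = 1, ..., d
document = ['the', 'cat', 'sat', 'the']     # tokenized document i
tokens = set(document)
x = [+1 if token in tokens else -1 for token in vocabulary]
print(x)  # [1, 1, 1, -1]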
Let \(Y\) and \(X_1,X_2,\ldots,X_d\) be random variables corresponding to the label \(y\) and the vector components \(x_1,x_2,\ldots,x_d\), respectively. The Naive Bayes classifier is a generative model, so we need to estimate the parameters of the following joint probability distribution:
$$
\mathbb{P}(Y=y,X_1=x_1,X_2=x_2,\ldots,X_d=x_d)
$$
for all labels \(y\) paired with attribute values \(x_1,x_2,\ldots,x_d\). Suppose that \(X_j\) is independent of \(X_{j'}\) for all \(j'\neq j\), called the **Naive Bayes assumption**, which is equivalent to assuming that the positions of tokens in a document are independent. Then, we observe that
$$
p(y\mid x_1,\ldots,x_d)\propto q(y)\prod_{j=1}^d q_j(x_j\mid y)
$$
where we define \(p(y\mid x_1,\ldots,x_d)=\mathbb{P}(Y=y\mid X_1=x_1,\ldots,X_d=x_d)\), called the **posterior distribution**. Note, the posterior distribution and the joint distribution \(\mathbb{P}(Y=y,X_1=x_1,\ldots,X_d=x_d)\) only differ by a constant factor \(\mathbb{P}(X_1=x_1,\ldots,X_d=x_d)\), called the **marginal distribution**.
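As a small illustration of this factorization, the unnormalized posterior score for each label is the prior multiplied by one Bernoulli factor per vocabulary token (the parameters below are hand-picked for illustration, not estimated from data):
# assumed toy parameters q(y) and q_j(x_j | y) for two labels over a four-token vocabulary
prior = {'en': 0.5, 'fr': 0.5}
likelihood = {'en': {'the': 0.9, 'cat': 0.6, 'sat': 0.5, 'chat': 0.1},
              'fr': {'the': 0.1, 'cat': 0.2, 'sat': 0.1, 'chat': 0.7}}
tokens = {'the', 'cat', 'sat'}  # tokens present in the document
score = {}
for label in prior:
    score[label] = prior[label]
    for token, p in likelihood[label].items():
        # multiply by p if the token is present and by 1 - p if it is absent
        score[label] *= p if token in tokens else 1 - p
print(score)  # proportional to the posterior; 'en' receives the higher score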
In order to estimate the parameters of the Naive Bayes classifier, we use maximum a posteriori (MAP) estimation, which is equivalent to maximum likelihood estimation (MLE) if the prior distribution is uniform.
Maximum A Posteriori Estimation#
Given a training set \(\{(x^{(i)},y^{(i)}):x^{(i)}\in\{+1,-1\}^d,y^{(i)}\in\{1,2,\ldots,k\}\}_{i=1}^n\), the log-likelihood function for the classifier is
$$
L(\theta)=\sum_{i=1}^n\log q(y^{(i)})+\sum_{i=1}^n\sum_{j=1}^d\log q_j(x^{(i)}_j\mid y^{(i)}),
$$
which is maximized by the estimates
$$
q(y)=\frac{\sum_{i=1}^n[\![y^{(i)}=y]\!]}{n}=\frac{\text{count}(y)}{n}
$$
and
$$
q_j(x\mid y)=\frac{\sum_{i=1}^n[\![y^{(i)}=y\text{ and }x^{(i)}_j=x]\!]}{\sum_{i=1}^n[\![y^{(i)}=y]\!]}=\frac{\text{count}_j(x\mid y)}{\text{count}(y)}.
$$
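For example, these count-based estimates can be computed directly from a toy labeled training set (the documents and labels below are made up for illustration; the classifier implemented next additionally applies smoothing):
from collections import Counter

train = [(['the', 'cat'], 'en'), (['le', 'chat'], 'fr'), (['the', 'dog'], 'en')]
label_count = Counter(y for _, y in train)            # count(y)
token_count = {y: Counter() for y in label_count}     # count_j(x = +1 | y)
for document, y in train:
    token_count[y].update(set(document))
print(label_count['en'] / len(train))                 # q(en) = 2/3
print(token_count['en']['the'] / label_count['en'])   # q_the(+1 | en) = 2/2 = 1.0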
from collections import Counter
import random, math
class BernoulliNB:
def __init__(self, labels: set[str], tokens: set[str]):
self.labels = labels
self.tokens = tokens
self.prior = {}
self.likelihood = {}
def initialize(self):
""" Initializes model parameters from random numbers """
# randomly sample numbers from a uniform distribution
distribution = [random.uniform(0.01, 0.99) for _ in self.labels]
# calculate normalizing constant for prior distribution
denominator = sum(distribution)
# assign normalized probabilities to prior distribution
for label, probability in zip(self.labels, distribution):
self.prior[label] = probability / denominator
for label in self.labels:
if label not in self.likelihood:
self.likelihood[label] = {}
# randomly sample numbers from a uniform distribution
distribution = [random.uniform(0.01, 0.99) for _ in self.tokens]
# assign normalized probabilities to likelihood distribution
for token, probability in zip(self.tokens, distribution):
self.likelihood[label][token] = probability
def train(self, data: list[tuple[str, str]]) -> None:
""" Trains a Bernoulli Naive Bayes classifier using MAP estimation
Inputs:
data - list of (document, label) pairs
"""
# count the number of occurrences of labels and tokens
count, total = {}, Counter()
for document, label in data:
if label not in count:
count[label] = Counter()
for token in set(document):
count[label][token] += 1
total[label] += 1
# update the model parameters from the frequency estimates
for label in self.labels:
probability = total[label] / len(data)
self.prior[label] = probability
self.likelihood[label] = {}
for token in self.tokens:
# apply add-one Laplace smoothing for a Bernoulli distribution
probability = (count[label][token] + 1) / (total[label] + 2)
self.likelihood[label][token] = probability
def calculate_posterior(self, document: str) -> dict[str, float]:
""" Calculates a probability distribution over labels
Inputs:
document - string of text
Outputs:
posterior - probability distribution over labels
"""
# calculate the posterior distribution from the model parameters
document, posterior = set(document), {}
for label in self.labels:
posterior[label] = math.log(self.prior[label])
for token in self.tokens:
# switch to log space to avoid arithmetic underflow
if token in document:
posterior[label] += math.log(self.likelihood[label][token])
else:
posterior[label] += math.log(1 - self.likelihood[label][token])
return posterior
def classify(self, document: str) -> str:
""" Classifies a document using a Bernoulli Naive Bayes classifier
Inputs:
document - string of text
Outputs:
label - document label
"""
# calculate the posterior distribution over labels
posterior = self.calculate_posterior(document)
# return the label with the highest probability
return max(posterior, key=posterior.get) # argmax(posterior)
Supervised Learning (with Labeled Data)#
import csv
# import the language identification data
data, train_data, valid_data = {}, [], []
with open('train.csv') as csv_file:
for i, sample in enumerate(csv.DictReader(csv_file)):
if sample['labels'] in ('en', 'de', 'fr', 'es'):
if sample['labels'] not in data:
data[sample['labels']] = []
document = nltk.word_tokenize(sample['text'])
data[sample['labels']].append(document)
# split the data into training and validation
train_data = [(x_i, y) for y, x in data.items() for x_i in x[:75]]
valid_data = [(x_i, y) for y, x in data.items() for x_i in x[75:100]]
# extract the labels and tokens from the data
labels, tokens = set(), set()
for document, label in train_data:
labels.add(label)
for token in document:
tokens.add(token)
# construct a Bernoulli Naive Bayes classifier
model = BernoulliNB(labels, tokens)
model.train(train_data)
# calculate the accuracy using the validation data
count, total = 0, 0
for text, label in valid_data:
if model.classify(text) == label:
count += 1
total += 1
print(f'{count / total * 100}% Accuracy')
96.0% Accuracy
Exercise 1#
In practice, we use subwords of length \(n\), called \(n\)-grams, instead of entire words for language identification. In the Python implementation, change the features from entire words to bigrams (\(n=2\)) or trigrams (\(n=3\)), and compare both the accuracy and the number of parameters.
Hint: In Cell [4], replace nltk.word_tokenize(text) with list(nltk.bigrams(text)) or list(nltk.trigrams(text)).
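For reference, nltk iterates over a string character by character, so bigrams or trigrams of the raw text are character \(n\)-grams (the example string below is arbitrary):
import nltk

print(list(nltk.bigrams('chat')))   # [('c', 'h'), ('h', 'a'), ('a', 't')]
print(list(nltk.trigrams('chat')))  # [('c', 'h', 'a'), ('h', 'a', 't')]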
Exercise 2#
In the Python implementation, we used add-one Laplace smoothing (see below). Why do we require smoothing with maximum a posteriori estimation?
$$
q_j(x\mid y)=\frac{\text{count}_j(x\mid y)+\alpha}{\text{count}(y)+\alpha d}
$$
where \(\alpha\) is a hyperparameter and \(d\) is the dimension of the multinomial distribution (\(2\) for Bernoulli and \(|V|\) for categorical where \(V\) is the vocabulary).
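For instance, with the Bernoulli values \(\alpha=1\) and \(d=2\) used in the implementation above, the formula can be sketched as a small (hypothetical) helper:
def smoothed(count_xy: int, count_y: int, alpha: float = 1.0, d: int = 2) -> float:
    """Additive (Laplace) smoothing of a count-based estimate (hypothetical helper)."""
    return (count_xy + alpha) / (count_y + alpha * d)

print(smoothed(0, 40))   # roughly 0.024 instead of 0 / 40 = 0 without smoothing
print(smoothed(25, 40))  # roughly 0.62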
Exercise 3#
We implemented a Bernoulli Naive Bayes classifier where each likelihood \(p(x_j\mid y)\) is a Bernoulli distribution and each \(x_j\) is a binary variable representing the presence or absence of feature \(j\). In general, \(p(x_j\mid y)\) can be any distribution, but Bernoulli (discrete), multinomial (discrete) and Gaussian (continuous) are the most common. In a multinomial Naive Bayes classifier, each likelihood \(p(x_j\mid y)\) is a categorical distribution and each \(x_j\) is the number of times feature \(j\) is observed. In the Python implementation, change the features \(\{x_j\}_{j=1}^d\) from binary variables to frequency counts and update the likelihoods.
Hint: The likelihoods \(p(x_j\mid y)\) are estimated by calculating the fraction of documents for Bernoulli and the fraction of tokens for multinomial (see below).
Bernoulli#
$$
q_j(x\mid y)=\frac{\sum_{i=1}^n[\![y^{(i)}=y\text{ and }x^{(i)}_j=x]\!]}{\sum_{i=1}^n[\![y^{(i)}=y]\!]}=\frac{\text{count}_j(x\mid y)}{\text{count}(y)}
$$
where \(x_j\in\{+1,-1\}\) for all \(j\).
Multinomial#
$$
q_j(y)=\frac{\sum_{i=1}^n x^{(i)}_j\,[\![y^{(i)}=y]\!]}{\sum_{i=1}^n\sum_{j'=1}^d x^{(i)}_{j'}\,[\![y^{(i)}=y]\!]}
$$
where \(x_j\in\mathbb{N}\) for all \(j\) and \(q_j(y)\) denotes the probability of token \(j\) given label \(y\).
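As a starting point, the frequency counts \(x_j\) can be obtained with a Counter over the full token list instead of a set of unique tokens (the example document below is made up):
from collections import Counter

document = ['le', 'chat', 'et', 'le', 'chien']  # made-up tokenized document
frequency = Counter(document)                   # x_j = number of times token j occurs
print(frequency['le'])                          # 2
print(sum(frequency.values()))                  # 5, the total number of tokens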
class MultinomialNB:
def __init__(self, labels: set[str], tokens: set[str]):
self.labels = labels
self.tokens = tokens
self.prior = {}
self.likelihood = {}
def initialize(self):
""" Initializes model parameters from random numbers """
# randomly sample numbers from a uniform distribution
distribution = [random.uniform(0.01, 0.99) for _ in self.labels]
# calculate normalizing constant for prior distribution
denominator = sum(distribution)
# assign normalized probabilities to prior distribution
for label, probability in zip(self.labels, distribution):
self.prior[label] = probability / denominator
for label in self.labels:
if label not in self.likelihood:
self.likelihood[label] = {}
# randomly sample numbers from a uniform distribution
distribution = [random.uniform(0.01, 0.99) for _ in self.tokens]
# calculate normalizing constant for likelihood distribution
denominator = sum(distribution)
# assign normalized probabilities to likelihood distribution
for token, probability in zip(self.tokens, distribution):
self.likelihood[label][token] = probability / denominator
def train(self, data: list[tuple[str, str]]) -> None:
""" Trains a multinomial Naive Bayes classifier using MAP estimation
Inputs:
data - list of (document, label) pairs
"""
# count the number of occurrences of labels and tokens
count, total = {}, Counter()
for document, label in data:
if label not in count:
count[label] = Counter()
# Add your solution here
# update the model parameters from the frequency estimates
for label in self.labels:
probability = total[label] / len(data)
self.prior[label] = probability
self.likelihood[label] = {}
for token in self.tokens:
# Add your solution here
self.likelihood[label][token] = probability
def calculate_posterior(self, document: str) -> dict[str, float]:
""" Calculates a probability distribution over labels
Inputs:
document - string of text
Outputs:
posterior - probability distribution over labels
"""
# calculate the posterior distribution from the model parameters
posterior = {}
for label in self.likelihood:
posterior[label] = math.log(self.prior[label])
# Add your solution here
return posterior
def classify(self, document: str) -> str:
""" Classifies a document using a Bernoulli Naive Bayes classifier
Inputs:
document - string of text
Outputs:
label - document label
"""
# calculate the posterior distribution over labels
posterior = self.calculate_posterior(document)
# return the label with the highest probability
return max(posterior, key=posterior.get) # argmax(posterior)
# construct a multinomial Naive Bayes classifier
model = MultinomialNB(labels, tokens)
model.train(train_data)
# calculate the accuracy using the validation data
count, total = 0, 0
for text, label in valid_data:
if model.classify(text) == label:
count += 1
total += 1
print(f'{count / total * 100}% Accuracy')
100.0% Accuracy
Expectation Maximization Algorithm#
In statistics, a random variable that is never observed is called a latent variable. The expectation maximization (EM) algorithm is an iterative method for performing maximum a posteriori (or maximum likelihood) estimation for a latent variable model. First, the initial parameters \(\theta^0\) are chosen at random. Then, new parameters \(\theta^t\) are calculated from the previous posterior distribution \(p(y\mid x^{(i)};\theta^{t-1})\) at each iteration. The EM algorithm is guaranteed to converge, but only to a local solution. Note, both the initial parameters and the posteriors must be normalized. The parameters for Naive Bayes are
$$
q^t(y)=\frac{1}{n}\sum_{i=1}^n\delta(y\mid i)\quad\text{and}\quad q^t_j(x\mid y)=\frac{\sum_{i:x^{(i)}_j=x}\delta(y\mid i)}{\sum_{i=1}^n\delta(y\mid i)}
$$
where
$$
\delta(y\mid i)=p(y\mid x^{(i)};\theta^{t-1})=\frac{q^{t-1}(y)\prod_{j=1}^dq^{t-1}_j(x^{(i)}_j\mid y)}{\sum_{y'=1}^kq^{t-1}(y')\prod_{j=1}^dq^{t-1}_j(x^{(i)}_j\mid y')}.
$$
Note, these parameter formulas are equivalent to those for fully observed data if we define \(\delta(y\mid i)=1\) if \(y^{(i)}=y\) and \(0\) otherwise.
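For instance, with \(k=2\) labels, a single feature \(d=1\), and previous parameters \(q^{t-1}(1)=q^{t-1}(2)=0.5\), \(q^{t-1}_1(+1\mid 1)=0.8\), and \(q^{t-1}_1(+1\mid 2)=0.2\) (values chosen only for illustration), a document with \(x^{(i)}_1=+1\) yields
$$
\delta(1\mid i)=\frac{0.5\cdot 0.8}{0.5\cdot 0.8+0.5\cdot 0.2}=0.8\quad\text{and}\quad\delta(2\mid i)=0.2,
$$
and these normalized posteriors are then summed over the documents to obtain the new parameters \(\theta^t\).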
Pseudocode#
Inputs:
data - training examples
n - number of documents
k - number of labels
T - number of iterations
Initialization:
Set model parameters (prior and likelihood) to random values
Algorithm:
for t = 1:T
for i = 1:n, y = 1:k
calculate posterior distribution from model parameters
update model parameters using normalized posterior distribution
Output:
model parameters - prior and likelihood
Python Implementation#
def logsumexp(data: list[float]) -> float:
""" Calculates the log of the sum of exponentials of the input data
Inputs:
data - list of numbers
Outputs:
the log of the sum of exponentials of the input data
"""
# calculate the maximum value of the input data
c = max(data)
# shift the exponential arguments to avoid underflow
return c + math.log(sum(math.exp(x - c) for x in data))
def expectation_maximization(model: BernoulliNB, data: list[list[str]], k: int, T: int) -> None:
""" Expectation Maximization Algorithm for Bernoulli Naive Bayes """
for t in range(T):
# estimate posterior distribution
posterior = {}
for i, document in enumerate(data):
posterior[i] = model.calculate_posterior(document)
# normalize posterior distribution
for i in range(len(data)):
denominator = logsumexp([posterior[i][label] for label in model.labels])
for label in model.labels:
posterior[i][label] = math.exp(posterior[i][label] - denominator)
# update parameters (prior and likelihood)
print(f'[{t}] {[model.prior[label] for label in model.labels]}')
for label in model.labels:
marginal = sum(posterior[i][label] for i in range(len(data)))
model.prior[label] = marginal / len(data)
for token in model.tokens:
probability = sum(posterior[i][label] * document.count(token) for i, document in enumerate(data))
model.likelihood[label][token] = (probability + 1) / (marginal + 2)
print(f'[{T}] {[model.prior[label] for label in model.labels]}')
Unsupervised Learning (with Unlabeled Data)#
unlabeled_data = [['A', 'B'], ['C', 'D'], ['C', 'D'], ['A', 'B'], ['A', 'B']]
# extract the labels and tokens from the data
labels, tokens = set([1, 2]), set()
for document in unlabeled_data:
for token in document:
tokens.add(token)
# construct a Bernoulli Naive Bayes classifier
model = BernoulliNB(labels, tokens)
model.initialize()
# apply the EM algorithm to the unlabeled data
expectation_maximization(model, unlabeled_data, 2, 9)
[0] [0.05035805143403056, 0.9496419485659694]
[1] [0.5684630939482381, 0.4315369060517618]
[2] [0.5966452242425817, 0.40335477575741824]
[3] [0.598941474959691, 0.4010585250403089]
[4] [0.5991012977476698, 0.40089870225233026]
[5] [0.5991127955638306, 0.40088720443616943]
[6] [0.5991136736773947, 0.4008863263226054]
[7] [0.5991137447735392, 0.4008862552264606]
[8] [0.599113750820036, 0.4008862491799639]
[9] [0.5991137513539058, 0.40088624864609423]
print(f"label({['A', 'B']}) =", model.classify(['A', 'B']))
print(model.calculate_posterior(['A', 'B', 'C', 'C', 'C']))
label(['A', 'B']) = 1
{1: -2.787002126164065, 2: -5.308464141745352}
Unfortunately, we cannot apply the EM algorithm for unsupervised language identification (see McCallum et al.). Instead, we verified that the EM algorithm converges by providing training data with a known conditional distribution and comparing that against the output distribution from EM.
Conclusion#
In this tutorial notebook, we covered the Naive Bayes classifier, maximum a posteriori estimation, and the expectation maximization algorithm. The Naive Bayes classifier is a statistical model for classification based on Bayes’ theorem and the Naive Bayes independence assumption. MAP (or MLE) estimation is a statistical technique for estimating the parameters of a posterior (or likelihood) distribution given fully or partially observed data. The EM algorithm is an iterative method for computing MAP (or MLE) parameter estimates for a latent variable model, in which some variables are never observed.
References#
Deisenroth, M. P., Ong, C. S., & Faisal, A. A. (n.d.). Mathematics for Machine Learning. https://mml-book.github.io/book/mml-book.pdf
Collins, M. (n.d.). The Naive Bayes Model, Maximum-Likelihood Estimation, and the EM Algorithm. http://web2.cs.columbia.edu/~mcollins/em.pdf
Jurafsky, D., & Martin, J. H. (n.d.). Speech and Language Processing. https://web.stanford.edu/~jurafsky/slp3/
Papariello, L. (n.d.). Language Identification Dataset. https://huggingface.co/datasets/papluca/language-identification
McCallum A., & Nigam K. (n.d.). A Comparison of Event Models for Naive Bayes Text Classification. http://www.cs.cmu.edu/~dgovinda/pdf/multinomial-aaaiws98.pdf
Nigam K., McCallum A., & Mitchell T. (n.d.). Semi-Supervised Text Classification using EM. https://www.cs.cmu.edu/~tom/pubs/NigamEtAl-bookChapter.pdf