Expectation Maximization Algorithm and MAP Estimation#

Prepared by: Ken Sible (ksible@nd.edu, @kennethsible, 2023)

!wget -N "https://huggingface.co/datasets/papluca/language-identification/resolve/main/train.csv"
try:
    import nltk
except ImportError:
    !pip install nltk
    import nltk
# download the tokenizer data even if nltk was already installed
nltk.download('punkt')

Introduction#

In this tutorial notebook, we introduce the expectation maximization (EM) algorithm for parameter estimation given partially observed data. We use a Naive Bayes classifier for language identification as a running example throughout the notebook. In order to train the classifier, we use maximum a posteriori (MAP) estimation, a statistical optimization method for estimating the parameters of a distribution given (fully or partially) observed data.

Naive Bayes Classifier#

Let \(\{(x^{(i)},y^{(i)}):x^{(i)}\in\{+1,-1\}^d,y^{(i)}\in\{1,2,\ldots,k\}\}_{i=1}^n\) be a training set where \(n\) is the number of documents, \(k\) is the number of labels and \(d\) is the number of features. In language identification, an example of text classification, the labels are languages and the features are tokens, which are meaningful sequences of characters. We define \(x^{(i)}_j\) to be \(+1\) if document \(i\) contains token \(j\) and \(-1\) otherwise. Hence, each component of the binary vector \(x^{(i)}\) represents either the presence or absence of a particular token in document \(i\). Now, we derive the Naive Bayes model for classification.
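For example, the following minimal sketch builds such a binary feature vector for a made-up vocabulary and tokenized document:

# hypothetical vocabulary and tokenized document for illustration
vocabulary = ['the', 'cat', 'le', 'chat']
document = ['the', 'cat', 'sat']

# x_j = +1 if the document contains token j, and -1 otherwise
x = [+1 if token in set(document) else -1 for token in vocabulary]
print(x)  # [1, 1, -1, -1]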

Let \(Y\) and \(X_1,X_2,\ldots,X_d\) be random variables corresponding to the label \(y\) and the vector components \(x_1,x_2,\ldots,x_d\), respectively. The Naive Bayes classifier is a generative model, so we need to estimate the parameters of the joint probability distribution \(\mathbb{P}(Y=y,X_1=x_1,X_2=x_2,\ldots,X_d=x_d)\) for all labels \(y\) paired with attribute values \(x_1,x_2,\ldots,x_d\). Suppose that \(X_j\) is independent of \(X_{j'}\) for all \(j'\neq j\), called the **Naive Bayes assumption**, which is equivalent to assuming that the positions of tokens are independent in the documents. Then, we observe that

\[\begin{align*} \mathbb{P}(Y=y,X_1=x_1,\ldots,X_d=x_d)&=\mathbb{P}(Y=y)\cdot\mathbb{P}(X_1=x_1,\ldots,X_d=x_d\mid Y=y) \\ &=\mathbb{P}(Y=y)\cdot\prod_{j=1}^d\mathbb{P}(X_j=x_j\mid X_1=x_1,\ldots,X_{j-1}=x_{j-1},Y=y) \\ &=\mathbb{P}(Y=y)\cdot\prod_{j=1}^d\mathbb{P}(X_j=x_j\mid Y=y) \\ \end{align*}\]
where the final equality follows from the Naive Bayes assumption. The Naive Bayes classifier has two types of **parameters**: \(q(y)\), called the **prior distribution**, and \(q_j(x\mid y)\), called the **likelihood distribution**. Here, \(q(y)=\mathbb{P}(Y=y)\) and \(q_j(x\mid y)=\mathbb{P}(X_j=x\mid Y=y)\). Then,

\[ p(y\mid x_1,\ldots,x_d)\propto q(y)\prod_{j=1}^dq_j(x_j\mid y) \]

where we define \(p(y\mid x_1,\ldots,x_d)=\mathbb{P}(Y=y\mid X_1=x_1,\ldots,X_d=x_d)\), called the **posterior distribution**. Note, the posterior distribution and the joint distribution \(\mathbb{P}(Y=y,X_1=x_1,\ldots,X_d=x_d)\) only differ by a constant factor \(\mathbb{P}(X_1=x_1,\ldots,X_d=x_d)\), called the **marginal distribution**.
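As a toy illustration of this proportionality, the sketch below uses made-up parameter values for two labels and a two-token vocabulary:

# hypothetical parameters for two labels over a two-token vocabulary
prior = {'en': 0.5, 'fr': 0.5}
likelihood = {'en': {'the': 0.9, 'chat': 0.2}, 'fr': {'the': 0.3, 'chat': 0.8}}

# unnormalized posterior q(y) * prod_j q_j(x_j | y) for a document containing both tokens
score = {y: prior[y] * likelihood[y]['the'] * likelihood[y]['chat'] for y in prior}
posterior = {y: score[y] / sum(score.values()) for y in score}
print(posterior)  # dividing by the marginal only rescales the scores; the argmax is unchanged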

In order to estimate the parameters of the Naive Bayes classifier, we use maximum a posteriori (MAP) estimation, which is equivalent to maximum likelihood estimation (MLE) if the prior distribution is uniform.

Maximum A Posteriori Estimation#

Given a training set \(\{(x^{(i)},y^{(i)}):x^{(i)}\in\{+1,-1\}^d,y^{(i)}\in\{1,2,\ldots,k\}\}_{i=1}^n\), the log-likelihood function \(L(\theta)\) for the classifier is

\[\begin{align*} L(\theta)&=\sum_{i=1}^n\log p(x^{(i)},y^{(i)})\\ &=\sum_{i=1}^n\log \left(q(y^{(i)})\prod_{j=1}^dq_j(x^{(i)}_j\mid y^{(i)})\right)\\ &=\sum_{i=1}^n\log q(y^{(i)})+\sum_{i=1}^n\log\left(\prod_{j=1}^dq_j(x^{(i)}_j\mid y^{(i)})\right)\\ &=\sum_{i=1}^n\log q(y^{(i)})+\sum_{i=1}^n\sum_{j=1}^d\log q_j(x^{(i)}_j\mid y^{(i)}) \end{align*}\]
where \(\theta\) is a parameter vector consisting of \(q(y)\) and \(q_j(x\mid y)\). The parameters of the classifier are then estimated by maximizing \(L(\theta)\):
\[\begin{align*} \text{max }& \sum_{i=1}^n\log q(y^{(i)})+\sum_{i=1}^n\sum_{j=1}^d\log q_j(x^{(i)}_j\mid y^{(i)})\\ \text{s.t. }& q(y)\geq 0\text{ for all }y\text{ and }\textstyle\sum_{y}q(y)=1,\\ \hphantom{\text{s.t. }}& q_j(x\mid y)\geq 0\text{ for all }j,x,y\text{ and }\textstyle\sum_{x}q_j(x\mid y)=1. \end{align*}\]
By using the method of Lagrange multipliers, one can derive the following global solution for the Naive Bayes classifier:

\[ q(y)=\frac{\sum_{i=1}^n[\![y^{(i)}=y]\!]}{n}=\frac{\text{count}(y)}{n} \quad\text{and}\quad q_j(x\mid y)=\frac{\sum_{i=1}^n[\![y^{(i)}=y\text{ and }x^{(i)}_j=x]\!]}{\sum_{i=1}^n[\![y^{(i)}=y]\!]}=\frac{\text{count}_j(x\mid y)}{\text{count}(y)}. \]
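Before wrapping these estimates in a classifier, here is a tiny worked sketch of the count formulas on made-up data:

# three hypothetical (token set, label) training pairs
toy_data = [({'the', 'cat'}, 'en'), ({'the', 'dog'}, 'en'), ({'le', 'chat'}, 'fr')]

# q(en) = count(en) / n = 2 / 3
q_en = sum(1 for _, y in toy_data if y == 'en') / len(toy_data)

# q_the(+1 | en) = count_the(+1 | en) / count(en) = 2 / 2
q_the_en = (sum(1 for x, y in toy_data if y == 'en' and 'the' in x)
            / sum(1 for _, y in toy_data if y == 'en'))

print(q_en, q_the_en)  # 0.666..., 1.0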

from collections import Counter
import random, math

class BernoulliNB:

    def __init__(self, labels: set[str], tokens: set[str]):
        self.labels = labels
        self.tokens = tokens
        self.prior = {}
        self.likelihood = {}

    def initialize(self):
        """ Initializes model parameters from random numbers """

        # randomly sample numbers from a uniform distribution
        distribution = [random.uniform(0.01, 0.99) for _ in self.labels]

        # calculate normalizing constant for prior distribution
        denominator = sum(distribution)

        # assign normalized probabilities to prior distribution
        for label, probability in zip(self.labels, distribution):
            self.prior[label] = probability / denominator

        for label in self.labels:
            if label not in self.likelihood:
                self.likelihood[label] = {}

            # randomly sample numbers from a uniform distribution
            distribution = [random.uniform(0.01, 0.99) for _ in self.tokens]

            # assign normalized probabilities to likelihood distribution
            for token, probability in zip(self.tokens, distribution):
                self.likelihood[label][token] = probability

    def train(self, data: list[tuple[list[str], str]]) -> None:
        """ Trains a Bernoulli Naive Bayes classifier using MAP estimation

            Inputs:
                data - list of (document, label) pairs, where each document is a list of tokens
        """

        # count the number of occurrences of labels and tokens
        count, total = {}, Counter()
        for document, label in data:
            if label not in count:
                count[label] = Counter()
            for token in set(document):
                count[label][token] += 1
            total[label] += 1

        # update the model parameters from the frequency estimates
        for label in self.labels:
            probability = total[label] / len(data)
            self.prior[label] = probability

            self.likelihood[label] = {}
            for token in self.tokens:
                # apply add-one Laplace smoothing for a Bernoulli distribution
                probability = (count[label][token] + 1) / (total[label] + 2)
                self.likelihood[label][token] = probability

    def calculate_posterior(self, document: list[str]) -> dict[str, float]:
        """ Calculates a probability distribution over labels

            Inputs:
                document - list of tokens

            Outputs:
                posterior - probability distribution over labels
        """

        # calculate the posterior distribution from the model parameters
        document, posterior = set(document), {}
        for label in self.labels:
            posterior[label] = math.log(self.prior[label])

            for token in self.tokens:
                # switch to log space to avoid arithmetic underflow
                if token in document:
                    posterior[label] += math.log(self.likelihood[label][token])
                else:
                    posterior[label] += math.log(1 - self.likelihood[label][token])

        return posterior

    def classify(self, document: list[str]) -> str:
        """ Classifies a document using a Bernoulli Naive Bayes classifier

            Inputs:
                document - list of tokens

            Outputs:
                label - document label
        """

        # calculate the posterior distribution over labels
        posterior = self.calculate_posterior(document)

        # return the label with the highest probability
        return max(posterior, key=posterior.get) # argmax(posterior)

Supervised Learning (with Labeled Data)#

import csv

# import the language identification data
data = {}
with open('train.csv') as csv_file:
    for sample in csv.DictReader(csv_file):
        if sample['labels'] in ('en', 'de', 'fr', 'es'):
            if sample['labels'] not in data:
                data[sample['labels']] = []
            document = nltk.word_tokenize(sample['text'])
            data[sample['labels']].append(document)

# split the data into training and validation
train_data = [(x_i, y) for y, x in data.items() for x_i in x[:75]]
valid_data = [(x_i, y) for y, x in data.items() for x_i in x[75:100]]

# extract the labels and tokens from the data
labels, tokens = set(), set()
for document, label in train_data:
    labels.add(label)
    for token in document:
        tokens.add(token)
# construct a Bernoulli Naive Bayes classifier
model = BernoulliNB(labels, tokens)
model.train(train_data)

# calculate the accuracy using the validation data
count, total = 0, 0
for text, label in valid_data:
    if model.classify(text) == label:
        count += 1
    total += 1
print(f'{count / total * 100}% Accuracy')
96.0% Accuracy

Exercise 1#

In practice, we use subwords of length \(n\), called \(n\)-grams, instead of entire words for language identification. In the Python implementation, change the features from entire words to bigrams (\(n=2\)) or trigrams (\(n=3\)), and compare both the accuracy and the number of parameters.

Hint: In Cell [4], replace nltk.word_tokenize(text) with list(nltk.bigrams(text)) or list(nltk.trigrams(text)).
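For a quick sanity check of what those replacements produce, nltk.bigrams and nltk.trigrams iterate over any sequence, so applying them to a raw string should yield character n-grams:

import nltk

# character n-grams from a raw string (iterating over its characters)
print(list(nltk.bigrams('chat')))   # [('c', 'h'), ('h', 'a'), ('a', 't')]
print(list(nltk.trigrams('chat')))  # [('c', 'h', 'a'), ('h', 'a', 't')]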

Exercise 2#

In the Python implementation, we used add-one Laplace smoothing (see below). Why do we require smoothing with maximum a posteriori estimation?

\[ q_j(x\mid y)=\frac{\text{count}_j(x\mid y)+\alpha}{\text{count}(y)+\alpha d} \]

where \(\alpha\) is a hyperparameter and \(d\) is the dimension of the multinomial distribution (\(2\) for Bernoulli and \(|V|\) for categorical where \(V\) is the vocabulary).
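As a quick numerical sketch of the formula (with made-up counts), compare the unsmoothed and smoothed estimates for a token that never co-occurs with a given label:

# hypothetical counts: token j never appears in the 10 documents with label y
count_xy, count_y, alpha, d = 0, 10, 1, 2  # d = 2 for a Bernoulli likelihood

unsmoothed = count_xy / count_y                        # 0.0
smoothed = (count_xy + alpha) / (count_y + alpha * d)  # 1/12
print(unsmoothed, smoothed)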

Exercise 3#

We implemented a Bernoulli Naive Bayes classifier where each likelihood \(p(x_j\mid y)\) is a Bernoulli distribution and each \(x_j\) is a binary variable representing the presence or absence of feature \(j\). In general, \(p(x_j\mid y)\) can be any distribution, but Bernoulli (discrete), multinomial (discrete), and Gaussian (continuous) are the most common. In a multinomial Naive Bayes classifier, each likelihood \(p(x_j\mid y)\) is a categorical distribution and each \(x_j\) is the number of times feature \(j\) is observed. In the Python implementation, change the features \(\{x_j\}_{j=1}^d\) from binary variables to frequency counts and update the likelihoods.

Hint: The likelihoods \(p(x_j\mid y)\) are estimated by calculating the fraction of documents for Bernoulli and the fraction of tokens for multinomial (see below).

Bernoulli#

\[ p(y\mid x_1,\ldots,x_d)\propto q(y)\prod_{j=1}^d[q_j(+1\mid y)]^{x_j}\cdot[1-q_j(+1\mid y)]^{(1-x_j)} \]

where \(x_j\in\{0,1\}\) indicates the absence or presence of feature \(j\) (we use \(0/1\) rather than \(-1/+1\) here so that the exponents select the correct factor).

Multinomial#

\[ p(y\mid x_1,\ldots,x_d)\propto q(y)\prod_{j=1}^d[q(j\mid y)]^{x_j} \]

where \(x_j\in\mathbb{N}\) is the number of times feature \(j\) occurs in the document and \(q(j\mid y)\) is the probability of observing feature \(j\) under label \(y\).
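The sketch below (on a made-up token list) contrasts the two feature representations: presence/absence indicators for the Bernoulli model versus frequency counts for the multinomial model:

from collections import Counter

# hypothetical tokenized document and vocabulary
document = ['le', 'chat', 'le', 'chien']
vocabulary = ['le', 'chat', 'chien', 'the']

bernoulli_x = [1 if token in set(document) else 0 for token in vocabulary]
multinomial_x = [Counter(document)[token] for token in vocabulary]
print(bernoulli_x)    # [1, 1, 1, 0]
print(multinomial_x)  # [2, 1, 1, 0]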

class MultinomialNB:

    def __init__(self, labels: set[str], tokens: set[str]):
        self.labels = labels
        self.tokens = tokens
        self.prior = {}
        self.likelihood = {}

    def initialize(self):
        """ Initializes model parameters from random numbers """

        # randomly sample numbers from a uniform distribution
        distribution = [random.uniform(0.01, 0.99) for _ in self.labels]

        # calculate normalizing constant for prior distribution
        denominator = sum(distribution)

        # assign normalized probabilities to prior distribution
        for label, probability in zip(self.labels, distribution):
            self.prior[label] = probability / denominator

        for label in self.labels:
            if label not in self.likelihood:
                self.likelihood[label] = {}

            # randomly sample numbers from a uniform distribution
            distribution = [random.uniform(0.01, 0.99) for _ in self.tokens]

            # calculate normalizing constant for likelihood distribution
            denominator = sum(distribution)

            # assign normalized probabilities to likelihood distribution
            for token, probability in zip(self.tokens, distribution):
                self.likelihood[label][token] = probability / denominator

    def train(self, data: list[tuple[list[str], str]]) -> None:
        """ Trains a multinomial Naive Bayes classifier using MAP estimation

            Inputs:
                data - list of (document, label) pairs, where each document is a list of tokens
        """

        # count the number of occurrences of labels and tokens
        count, total = {}, Counter()
        for document, label in data:
            if label not in count:
                count[label] = Counter()
            # Add your solution here

        # update the model parameters from the frequency estimates
        for label in self.labels:
            probability = total[label] / len(data)
            self.prior[label] = probability

            self.likelihood[label] = {}
            for token in self.tokens:
                # Add your solution here
                self.likelihood[label][token] = probability

    def calculate_posterior(self, document: list[str]) -> dict[str, float]:
        """ Calculates a probability distribution over labels

            Inputs:
                document - list of tokens

            Outputs:
                posterior - probability distribution over labels
        """

        # calculate the posterior distribution from the model parameters
        posterior = {}
        for label in self.likelihood:
            posterior[label] = math.log(self.prior[label])

            # Add your solution here

        return posterior

    def classify(self, document: list[str]) -> str:
        """ Classifies a document using a multinomial Naive Bayes classifier

            Inputs:
                document - list of tokens

            Outputs:
                label - document label
        """

        # calculate the posterior distribution over labels
        posterior = self.calculate_posterior(document)

        # return the label with the highest probability
        return max(posterior, key=posterior.get) # argmax(posterior)
# construct a multinomial Naive Bayes classifier
model = MultinomialNB(labels, tokens)
model.train(train_data)

# calculate the accuracy using the validation data
count, total = 0, 0
for text, label in valid_data:
    if model.classify(text) == label:
        count += 1
    total += 1
print(f'{count / total * 100}% Accuracy')
100.0% Accuracy

Expectation Maximization Algorithm#

In statistics, a random variable that is never observed is called a latent variable. The expectation maximization (EM) algorithm is an iterative method for performing maximum a posteriori (or maximum likelihood) estimation for a latent variable model. First, the initial parameters \(\theta^0\) are chosen at random. Then, new parameters \(\theta^t\) are calculated from the previous posterior distribution \(p(y\mid x^{(i)};\theta^{t-1})\) at each iteration. The EM algorithm is guaranteed to converge, but only to a local solution. Note, both the initial parameters and the posteriors must be normalized. The parameters for Naive Bayes are

\[ q^t(y)=\frac{1}{n}\sum_{i=1}^n\delta(y\mid i)\quad\text{and}\quad q^t_j(x\mid y)=\frac{\sum_{i:x^{(i)}_j=x}\delta(y\mid i)}{\sum_{i=1}^n\delta(y\mid i)} \]

where

\[ \delta(y\mid i)=p(y\mid x^{(i)};\theta^{t-1})=\frac{q^{t-1}(y)\prod_{j=1}^dq^{t-1}_j(x^{(i)}_j\mid y)}{\sum_{y'=1}^kq^{t-1}(y')\prod_{j=1}^dq^{t-1}_j(x^{(i)}_j\mid y')}. \]

Note, these parameter formulas are equivalent to those for fully observed data if we define \(\delta(y\mid i)=1\) if \(y^{(i)}=y\) and \(0\) otherwise.
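As a minimal sketch of one iteration, assuming a two-token vocabulary and made-up current parameters (the likelihood update follows the same pattern as the prior update shown here):

# hypothetical current parameters over a two-token vocabulary {A, B}
prior = {1: 0.5, 2: 0.5}
likelihood = {1: {'A': 0.8, 'B': 0.7}, 2: {'A': 0.2, 'B': 0.3}}
documents = [['A', 'B'], ['B']]

# E-step: responsibilities delta(y | i) for each document
delta = []
for document in documents:
    score = {y: prior[y] for y in prior}
    for y in prior:
        for token in ('A', 'B'):
            p = likelihood[y][token]
            score[y] *= p if token in document else 1 - p
    total = sum(score.values())
    delta.append({y: score[y] / total for y in score})

# M-step: the updated prior q^t(y) is the average responsibility
new_prior = {y: sum(d[y] for d in delta) / len(documents) for y in prior}
print(delta, new_prior)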

Pseudocode#

Inputs:
    data - training examples
    n - number of documents
    k - number of labels
    T - number of iterations

Initialization: 
    Set model parameters (prior and likelihood) to random values

Algorithm:
    for t = 1:T
        for i = 1:n, y = 1:k
            calculate posterior distribution from model parameters
        update model parameters using normalized posterior distribution

Output:
    model parameters - prior and likelihood

Python Implementation#

def logsumexp(data: list[float]) -> float:
    """ Calculates the log of the sum of exponentials of the input data

        Inputs:
            data - list of numbers

        Outputs:
            the log of the sum of exponentials of the input data
    """

    # calculate the maximum value of the input data
    c = max(data)

    # shift the exponential arguments to avoid underflow
    return c + math.log(sum(math.exp(x - c) for x in data))

def expectation_maximization(model: BernoulliNB, data: list[list[str]], k: int, T: int) -> None:
    """ Expectation Maximization Algorithm for Bernoulli Naive Bayes """

    for t in range(T):
        # estimate posterior distribution
        posterior = {}
        for i, document in enumerate(data):
            posterior[i] = model.calculate_posterior(document)

        # normalize posterior distribution
        for i in range(len(data)):
            denominator = logsumexp([posterior[i][label] for label in model.labels])
            for label in model.labels:
                posterior[i][label] = math.exp(posterior[i][label] - denominator)

        # update parameters (prior and likelihood)
        print(f'[{t}] {[model.prior[label] for label in model.labels]}')
        for label in model.labels:
            marginal = sum(posterior[i][label] for i in range(len(data)))
            model.prior[label] = marginal / len(data)
            for token in model.tokens:
                # expected number of documents with label y that contain the token
                probability = sum(posterior[i][label] * (token in document) for i, document in enumerate(data))
                model.likelihood[label][token] = (probability + 1) / (marginal + 2)

    print(f'[{T}] {[model.prior[label] for label in model.labels]}')
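As a brief aside on the numerics (with made-up values): exponentiating large negative log posteriors directly underflows to zero, which is why calculate_posterior works in log space and the normalization above goes through logsumexp.

import math

log_probs = [-1000.0, -1001.0]  # hypothetical log posteriors for two labels

# the naive approach underflows: math.exp(-1000.0) is 0.0 in double precision,
# so the sum is 0.0 and math.log(0.0) raises a ValueError
print(math.exp(log_probs[0]))                         # 0.0

# the shifted form stays finite and normalizes correctly
print(logsumexp(log_probs))                           # ~ -999.687
print(math.exp(log_probs[0] - logsumexp(log_probs)))  # ~ 0.731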

Unsupervised Learning (with Unlabeled Data)#

unlabeled_data = [['A', 'B'], ['C', 'D'], ['C', 'D'], ['A', 'B'], ['A', 'B']]

# extract the labels and tokens from the data
labels, tokens = set([1, 2]), set()
for document in unlabeled_data:
    for token in document:
        tokens.add(token)

# construct a Bernoulli Naive Bayes classifier
model = BernoulliNB(labels, tokens)
model.initialize()

# apply the EM algorithm to the unlabeled data
expectation_maximization(model, unlabeled_data, 2, 9)
[0] [0.05035805143403056, 0.9496419485659694]
[1] [0.5684630939482381, 0.4315369060517618]
[2] [0.5966452242425817, 0.40335477575741824]
[3] [0.598941474959691, 0.4010585250403089]
[4] [0.5991012977476698, 0.40089870225233026]
[5] [0.5991127955638306, 0.40088720443616943]
[6] [0.5991136736773947, 0.4008863263226054]
[7] [0.5991137447735392, 0.4008862552264606]
[8] [0.599113750820036, 0.4008862491799639]
[9] [0.5991137513539058, 0.40088624864609423]
print(f"label({['A', 'B']}) =", model.classify(['A', 'B']))
print(model.calculate_posterior(['A', 'B', 'C', 'C', 'C']))
label(['A', 'B']) = 1
{1: -2.787002126164065, 2: -5.308464141745352}

Unfortunately, we cannot apply the EM algorithm to unsupervised language identification (see McCallum et al.). Instead, we verified that the EM algorithm converges by providing training data with a known conditional distribution and comparing it against the distribution recovered by EM.
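A check along those lines might look like the following sketch, which samples documents from a known (made-up) Bernoulli Naive Bayes distribution and compares the prior recovered by EM against the true one:

import random

# a known generating distribution with hypothetical parameter values
true_prior = {1: 0.6, 2: 0.4}
true_likelihood = {1: {'A': 0.9, 'B': 0.8, 'C': 0.1, 'D': 0.1},
                   2: {'A': 0.1, 'B': 0.1, 'C': 0.9, 'D': 0.8}}

# sample tokenized documents from the known Bernoulli Naive Bayes model
random.seed(0)
synthetic_data = []
for _ in range(500):
    label = 1 if random.random() < true_prior[1] else 2
    document = [token for token, p in true_likelihood[label].items() if random.random() < p]
    synthetic_data.append(document)

# fit a randomly initialized model with EM and compare the recovered prior
model = BernoulliNB({1, 2}, {'A', 'B', 'C', 'D'})
model.initialize()
expectation_maximization(model, synthetic_data, 2, 10)
print(model.prior)  # should be close to the true prior, up to a relabeling of 1 and 2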

Conclusion#

In this tutorial notebook, we covered the Naive Bayes classifier, maximum a posteriori estimation, and the expectation maximization algorithm. The Naive Bayes classifier is a statistical model for classification based on Bayes' theorem and the Naive Bayes independence assumption. MAP (or MLE) estimation is a statistical technique for estimating the parameters of a posterior (or likelihood) distribution given fully or partially observed data. The EM algorithm is an iterative method for computing MAP (or MLE) estimates of the parameters of a latent variable model given partially observed data.

References#