""" Hidden Markov Models
This is a complete pure-Cython optimized implementation of Hidden Markov Models. It fully supports Discrete, Gaussian, and Mixed Gaussian emissions.
The best references for the basic HMM algorithms implemented here are:
- Tapas Kanungo's "Hidden Markov Models"
- Jackson's HMM tutorial: http://personal.ee.surrey.ac.uk/Personal/P.Jackson/tutorial/
LICENSE: Some of the code in this file is based on reading Kanungo's GPLv2+ implementation of discrete HMM's, hence the present code must be licensed with a GPLv2+ compatible license.
AUTHOR:
- William Stein, 2010-03 """
#*****************************************************************************
#       Copyright (C) 2010 William Stein <wstein@gmail.com>
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 2 of the License, or
#  (at your option) any later version.
#                  http://www.gnu.org/licenses/
#*****************************************************************************
from __future__ import print_function
from libc.math cimport log
from cysignals.signals cimport sig_on, sig_off

from sage.finance.time_series cimport TimeSeries
from sage.misc.randstate cimport current_randstate, randstate
from cpython.object cimport PyObject_RichCompare
from .util cimport HMM_Util
###########################################
cdef class HiddenMarkovModel: """ Abstract base class for all Hidden Markov Models. """ def initial_probabilities(self): """ Return the initial probabilities as a TimeSeries of length N, where N is the number of states of the Markov model.
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.5,0.5]], [.2,.8]) sage: pi = m.initial_probabilities(); pi [0.2000, 0.8000] sage: type(pi) <... 'sage.finance.time_series.TimeSeries'>
The returned time series is a copy, so changing it does not change the model::
sage: pi[0] = .1; pi[1] = .9 sage: m.initial_probabilities() [0.2000, 0.8000]
Some other models::
sage: hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,1), (-1,1)], [.1,.9]).initial_probabilities() [0.1000, 0.9000] sage: hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3]).initial_probabilities() [0.7000, 0.3000] """
def transition_matrix(self): """ Return the state transition matrix.
OUTPUT:
- a Sage matrix with real double precision (RDF) entries.
EXAMPLES::
sage: M = hmm.DiscreteHiddenMarkovModel([[0.7,0.3],[0.9,0.1]], [[0.5,.5],[.1,.9]], [0.3,0.7]) sage: T = M.transition_matrix(); T [0.7 0.3] [0.9 0.1]
The returned matrix is mutable, but changing it does not change the transition matrix for the model::
sage: T[0,0] = .1; T[0,1] = .9 sage: M.transition_matrix() [0.7 0.3] [0.9 0.1]
Transition matrices for other types of models::
sage: hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,1), (-1,1)], [.5,.5]).transition_matrix() [0.1 0.9] [0.5 0.5] sage: hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3]).transition_matrix() [0.9 0.1] [0.4 0.6] """
def graph(self, eps=1e-3): """ Create a weighted directed graph from the transition matrix, not including any edge with a probability less than eps.
INPUT:
- eps -- nonnegative real number
OUTPUT:
- a digraph
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[.3,0,.7],[0,0,1],[.5,.5,0]], [[.5,.5,.2]]*3, [1/3]*3) sage: G = m.graph(); G Looped digraph on 3 vertices sage: G.edges() [(0, 0, 0.3), (0, 2, 0.7), (1, 2, 1.0), (2, 0, 0.5), (2, 1, 0.5)] sage: G.plot() Graphics object consisting of 11 graphics primitives """ cdef int i, j
def sample(self, Py_ssize_t length, number=None, starting_state=None): """ Return ``number`` samples from this HMM, each of the given length.
INPUT:
- ``length`` -- positive integer

- ``number`` -- (default: None) if given, compute a list of this many sample sequences

- ``starting_state`` -- int (or None); if specified, generate the sequence starting in this state instead of drawing the starting state from the initial probabilities.
OUTPUT:
- if ``number`` is not given, return a single TimeSeries.

- if ``number`` is given, return a list of TimeSeries.
EXAMPLES::
sage: set_random_seed(0) sage: a = hmm.DiscreteHiddenMarkovModel([[0.1,0.9],[0.1,0.9]], [[1,0],[0,1]], [0,1]) sage: print(a.sample(10, 3)) [[1, 0, 1, 1, 1, 1, 0, 1, 1, 1], [1, 1, 0, 0, 1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 0, 1, 0, 1, 1, 1]] sage: a.sample(15) [1, 1, 1, 1, 0 ... 1, 1, 1, 1, 1] sage: a.sample(3, 1) [[1, 1, 1]] sage: list(a.sample(1000)).count(0) 88
If the emission symbols are set::
sage: set_random_seed(0) sage: a = hmm.DiscreteHiddenMarkovModel([[0.5,0.5],[0.1,0.9]], [[1,0],[0,1]], [0,1], ['up', 'down']) sage: a.sample(10) ['down', 'up', 'down', 'down', 'down', 'down', 'up', 'up', 'up', 'up']
Force a starting state::
sage: set_random_seed(0); a.sample(10, starting_state=0) ['up', 'up', 'down', 'down', 'down', 'down', 'up', 'up', 'up', 'up'] """
cdef Py_ssize_t i
######################################################### # Some internal functions used for various general # HMM algorithms. ######################################################### cdef TimeSeries _baum_welch_gamma(self, TimeSeries alpha, TimeSeries beta): """ Used internally to compute the scaled quantity gamma_t(j) appearing in the Baum-Welch reestimation algorithm.
The quantity gamma_t(j) is the (scaled!) probability of being in state j at time t, given the observation sequence.
INPUT:
- ``alpha`` -- TimeSeries as output by the scaled forward algorithm

- ``beta`` -- TimeSeries as output by the scaled backward algorithm
OUTPUT:
- TimeSeries gamma such that gamma[t*N+j] is gamma_t(j). """ cdef double denominator
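The quantity described above can be sketched in plain Python. This is a simplified stand-in for the Cython implementation, using flat lists for the `alpha`, `beta`, and `gamma` layouts described in the docstring; the normalization is written out explicitly here, whereas the real code works with pre-scaled quantities:

```python
def baum_welch_gamma(alpha, beta, N, T):
    """Sketch: gamma[t*N + j] = P(state j at time t | observations),
    computed from flat forward (alpha) and backward (beta) tables."""
    gamma = [0.0] * (N * T)
    for t in range(T):
        # Normalize over all states at time t so each row sums to 1.
        denom = sum(alpha[t*N + i] * beta[t*N + i] for i in range(N))
        for j in range(N):
            gamma[t*N + j] = alpha[t*N + j] * beta[t*N + j] / denom
    return gamma
```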
cdef class DiscreteHiddenMarkovModel(HiddenMarkovModel): """ A discrete Hidden Markov model implemented using double precision floating point arithmetic.
INPUT:
- ``A`` -- a list of lists or a square N x N matrix, whose (i,j) entry gives the probability of transitioning from state i to state j.
- ``B`` -- a list of N lists or a matrix with N rows, such that B[i,k] gives the probability of emitting symbol k while in state i.
- ``pi`` -- the probabilities of starting in each initial state, i.e., pi[i] is the probability of starting in state i.
- ``emission_symbols`` -- None or list (default: None); if None, the emission symbols are the ints [0..N-1], where N is the number of states. Otherwise, they are the entries of the list emission_symbols, which must all be hashable.
- ``normalize`` -- bool (default: True); if True, the input is normalized to define valid probability distributions, e.g., the entries of A are made nonnegative and each row sums to 1, and the probabilities in pi are normalized.
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.5,0.5]], [.5,.5]); m Discrete Hidden Markov Model with 2 States and 2 Emissions Transition matrix: [0.4 0.6] [0.1 0.9] Emission matrix: [0.1 0.9] [0.5 0.5] Initial probabilities: [0.5000, 0.5000] sage: m.log_likelihood([0,1,0,1,0,1]) -4.66693474691329... sage: m.viterbi([0,1,0,1,0,1]) ([1, 1, 1, 1, 1, 1], -5.378832842208748) sage: m.baum_welch([0,1,0,1,0,1]) (0.0, 22) sage: m # rel tol 1e-10 Discrete Hidden Markov Model with 2 States and 2 Emissions Transition matrix: [1.0134345614745788e-70 1.0] [ 1.0 3.9974352713558623e-19] Emission matrix: [ 7.380221566254936e-54 1.0] [ 1.0 3.9974352626002193e-19] Initial probabilities: [0.0000, 1.0000] sage: m.sample(10) [0, 1, 0, 1, 0, 1, 0, 1, 0, 1] sage: m.graph().plot() Graphics object consisting of 6 graphics primitives
A 3-state model that always outputs 'b'::
sage: m = hmm.DiscreteHiddenMarkovModel([[1/3]*3]*3, [[0,1,0]]*3, [1/3]*3, ['a','b','c']) sage: m.sample(10) ['b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b'] """ cdef TimeSeries B cdef int n_out cdef object _emission_symbols, _emission_symbols_dict
def __init__(self, A, B, pi, emission_symbols=None, bint normalize=True): """ Create a discrete emissions HMM with transition probability matrix A, emission probabilities given by B, initial state probabilities pi, and given emission symbols (which default to the first few nonnegative integers).
EXAMPLES::
sage: hmm.DiscreteHiddenMarkovModel([.5,0,-1,.5], [[1],[1]],[.5,.5]).transition_matrix() [1.0 0.0] [0.0 1.0] sage: hmm.DiscreteHiddenMarkovModel([0,0,.1,.9], [[1],[1]],[.5,.5]).transition_matrix() [0.5 0.5] [0.1 0.9] sage: hmm.DiscreteHiddenMarkovModel([-1,-2,.1,.9], [[1],[1]],[.5,.5]).transition_matrix() [0.5 0.5] [0.1 0.9] sage: hmm.DiscreteHiddenMarkovModel([1,2,.1,1.2], [[1],[1]],[.5,.5]).transition_matrix() [ 0.3333333333333333 0.6666666666666666] [0.07692307692307693 0.923076923076923]
"""
raise ValueError("number of rows of B must equal number of states") raise ValueError("number of emission symbols must equal number of output states") cdef Py_ssize_t i
def __reduce__(self): """ Used in pickling.
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.0,1.0],[1,1]], [0,1], ['a','b']) sage: loads(dumps(m)) == m True """
def __richcmp__(self, other, op): """ EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.0,1.0],[0.5,0.5]], [.5,.5]) sage: n = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.0,1.0],[0.5,0.5]], [1,0]) sage: m == n False sage: m == m True sage: m < n True sage: n < m False """ return NotImplemented
def emission_matrix(self): """ Return the matrix whose i-th row specifies the emission probability distribution for the i-th state. More precisely, the i,j entry of the matrix is the probability of the Markov model outputting the j-th symbol when it is in the i-th state.
OUTPUT:
- a Sage matrix with real double precision (RDF) entries.
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.5,0.5]], [.5,.5]) sage: E = m.emission_matrix(); E [0.1 0.9] [0.5 0.5]
The returned matrix is mutable, but changing it does not change the emission matrix of the model::
sage: E[0,0] = 0; E[0,1] = 1 sage: m.emission_matrix() [0.1 0.9] [0.5 0.5] """
def __repr__(self): r""" Return string representation of this discrete hidden Markov model.
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.5,0.5]], [.2,.8]) sage: m.__repr__() 'Discrete Hidden Markov Model with 2 States and 2 Emissions\nTransition matrix:\n[0.4 0.6]\n[0.1 0.9]\nEmission matrix:\n[0.1 0.9]\n[0.5 0.5]\nInitial probabilities: [0.2000, 0.8000]' """
def _emission_symbols_to_IntList(self, obs): """ Internal function used to convert a list of emission symbols to an IntList.
INPUT:
- obs -- a list of objects
OUTPUT:
- an IntList
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.5,0.5]], [.2,.8], ['smile', 'frown']) sage: m._emission_symbols_to_IntList(['frown','smile']) [1, 0] """
def _IntList_to_emission_symbols(self, obs): """ Internal function used to convert an IntList into a list of emission symbols.
INPUT:
- obs -- an IntList
OUTPUT:
- a list of emission symbols
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.5,0.5]], [.2,.8], ['smile', 'frown']) sage: m._IntList_to_emission_symbols(stats.IntList([0,0,1,0])) ['smile', 'smile', 'frown', 'smile'] """
def log_likelihood(self, obs, bint scale=True): """ Return the logarithm of the probability that this model produced the given observation sequence. Thus the output is a non-positive number.
INPUT:
- ``obs`` -- sequence of observations
- ``scale`` -- boolean (default: True); if True, use rescaling to avoid loss of precision due to the very limited dynamic range of floats. You should leave this as True unless the obs sequence is very short.
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.5,0.5]], [.2,.8]) sage: m.log_likelihood([0, 1, 0, 1, 1, 0, 1, 0, 0, 0]) -7.3301308009370825 sage: m.log_likelihood([0, 1, 0, 1, 1, 0, 1, 0, 0, 0], scale=False) -7.330130800937082 sage: m.log_likelihood([]) 0.0
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.5,0.5]], [.2,.8], ['happy','sad']) sage: m.log_likelihood(['happy','happy']) -1.6565295199679506 sage: m.log_likelihood(['happy','sad']) -1.4731602941415523
Overflow from not using the scale option::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.5,0.5]], [.2,.8]) sage: m.log_likelihood([0,1]*1000, scale=True) -1433.820666652728 sage: m.log_likelihood([0,1]*1000, scale=False) -inf """ else:
def _forward(self, IntList obs): """ Memory-efficient implementation of the forward algorithm, without scaling.
INPUT:
- ``obs`` -- an integer list of observation states.
OUTPUT:
- ``float`` -- the log of the probability that the model produced this sequence
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.5,0.5]], [.2,.8]) sage: m._forward(stats.IntList([0,1]*10)) -14.378663512219456
The forward algorithm computes the log likelihood::
sage: m.log_likelihood(stats.IntList([0,1]*10), scale=False) -14.378663512219456
But numerical overflow will happen (without scaling) for long sequences::
sage: m._forward(stats.IntList([0,1]*1000)) -inf """ raise ValueError("invalid observation sequence, since it includes unknown states")
# Initialization
# Induction cdef double s
# Termination
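The initialization, induction, and termination steps marked above follow the textbook (unscaled) forward algorithm. A minimal pure-Python sketch, with plain lists standing in for the Cython `TimeSeries`/`IntList` types (`A` is the transition matrix, `B` the emission matrix, `pi` the initial probabilities):

```python
from math import log

def forward_log(A, B, pi, obs):
    """Unscaled forward algorithm: log P(obs | model).
    Underflows to -inf for long sequences, as in the doctests above."""
    N = len(pi)
    # Initialization: alpha_0(i) = pi_i * b_i(obs[0])
    alpha = [pi[i] * B[i][obs[0]] for i in range(N)]
    # Induction: alpha_{t}(j) = (sum_i alpha_{t-1}(i) a_{i,j}) * b_j(obs[t])
    for t in range(1, len(obs)):
        alpha = [sum(alpha[i] * A[i][j] for i in range(N)) * B[j][obs[t]]
                 for j in range(N)]
    # Termination: P(obs) = sum_i alpha_{T-1}(i)
    p = sum(alpha)
    return log(p) if p > 0 else float('-inf')
```

With the two-state model used in the doctests, this reproduces the `_forward` values, including the overflow to `-inf` on `[0,1]*1000`.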
def _forward_scale(self, IntList obs): """ Memory-efficient implementation of the forward algorithm, with scaling.
INPUT:
- ``obs`` -- an integer list of observation states.
OUTPUT:
- ``float`` -- the log of the probability that the model produced this sequence
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.5,0.5]], [.2,.8]) sage: m._forward_scale(stats.IntList([0,1]*10)) -14.378663512219454
The forward algorithm computes the log likelihood::
sage: m.log_likelihood(stats.IntList([0,1]*10)) -14.378663512219454
Note that the scaled algorithm avoids the numerical overflow that the non-scaled forward algorithm suffers on long sequences::
sage: m._forward_scale(stats.IntList([0,1]*1000)) -1433.820666652728 sage: m._forward(stats.IntList([0,1]*1000)) -inf
A random sequence produced by the model is more likely::
sage: set_random_seed(0); v = m.sample(1000) sage: m._forward_scale(v) -686.8753189365056 """ # This is just like self._forward(obs) above, except at every step of the # algorithm, we rescale the vector alpha so that the sum of # the entries in alpha is 1. Then, at the end of the # algorithm, the sum of probabilities (alpha.sum()) is of # course just 1. However, the true probability that we want # is the product of the numbers that we divided by when # rescaling, since at each step of the iteration the next term # depends linearly on the previous one. Instead of returning # the product, we return the sum of the logs, which avoids # numerical error.
# The running sum of the log probabilities
# Initialization
# Induction # The code below is just an optimized version of: # alpha = (alpha * A).pairwise_product(B[O[t+1]]) # alpha = alpha.scale(1/alpha.sum()) # along with keeping track of the log of the scaling factor. cdef double s
# Termination
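The rescaling scheme explained in the comments above (divide `alpha` by its sum at every step, accumulate the logs of the scaling factors) can be sketched in plain Python; this is a simplified stand-in for the optimized Cython loop, with `A`, `B`, and `pi` as plain lists:

```python
from math import log

def forward_scale_log(A, B, pi, obs):
    """Scaled forward algorithm: log P(obs | model), safe for long obs."""
    N = len(pi)
    log_prob = 0.0  # running sum of the log scaling factors
    # Initialization, followed by a first rescaling
    alpha = [pi[i] * B[i][obs[0]] for i in range(N)]
    s = sum(alpha)
    log_prob += log(s)
    alpha = [a / s for a in alpha]
    # Induction with per-step rescaling so that sum(alpha) == 1
    for t in range(1, len(obs)):
        alpha = [sum(alpha[i] * A[i][j] for i in range(N)) * B[j][obs[t]]
                 for j in range(N)]
        s = sum(alpha)
        log_prob += log(s)
        alpha = [a / s for a in alpha]
    # Termination: the answer is the accumulated sum of logs
    return log_prob
```

On the doctest model this matches `_forward_scale`, and it does not overflow on `[0,1]*1000`.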
def generate_sequence(self, Py_ssize_t length, starting_state=None): """ Return a sample of the given length from this HMM.
INPUT:
- ``length`` -- positive integer

- ``starting_state`` -- int (or None); if specified, generate the sequence starting in this state instead of drawing the starting state from the initial probabilities.
OUTPUT:
- an IntList or list of emission symbols

- an IntList of the actual states the model was in when emitting the corresponding symbols
EXAMPLES:
In this example, the emission symbols are not set::
sage: set_random_seed(0) sage: a = hmm.DiscreteHiddenMarkovModel([[0.1,0.9],[0.1,0.9]], [[1,0],[0,1]], [0,1]) sage: a.generate_sequence(5) ([1, 0, 1, 1, 1], [1, 0, 1, 1, 1]) sage: list(a.generate_sequence(1000)[0]).count(0) 90
Here the emission symbols are set::
sage: set_random_seed(0) sage: a = hmm.DiscreteHiddenMarkovModel([[0.5,0.5],[0.1,0.9]], [[1,0],[0,1]], [0,1], ['up', 'down']) sage: a.generate_sequence(5) (['down', 'up', 'down', 'down', 'down'], [1, 0, 1, 1, 1])
Specify the starting state::
sage: set_random_seed(0); a.generate_sequence(5, starting_state=0) (['up', 'up', 'down', 'down', 'down'], [0, 0, 1, 1, 1]) """ raise ValueError("length must be nonnegative")
# Create Integer lists for states and observations # A special case if self._emission_symbols is None: return states, obs else: return states, []
# Setup variables, including random state. cdef Py_ssize_t i, j cdef double r, accum
# Now choose random variables from our discrete distribution.
# This standard naive algorithm has complexity that is linear # in the number of states. It might be possible to replace it # by something that is more efficient. However, make sure to # refactor this code into distribution.pyx before doing so. # Note that state switching involves the same algorithm # (below). Use GSL as described here to make this O(1): # http://www.gnu.org/software/gsl/manual/html_node/General-Discrete-Distributions.html
# Choose initial state: else: else: raise ValueError("starting state must be between 0 and %s"%(self.N-1))
# Generate a symbol from state q
cdef double* row cdef int O # Choose next state else: # Generate symbol from this new state q
# No emission symbol mapping else: # Emission symbol mapping, so change our intlist into a list of symbols
cdef int _gen_symbol(self, int q, double r): """ Generate a symbol in state q using the randomly chosen floating point number r, which should be between 0 and 1.
INPUT:
- ``q`` -- a nonnegative integer, which specifies a state

- ``r`` -- a real number between 0 and 1
OUTPUT:
- a nonnegative int
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.1,0.9],[0.9,0.1]], [.2,.8]) sage: set_random_seed(0) sage: m.generate_sequence(10) # indirect test ([0, 1, 0, 0, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 1, 1, 0, 0, 0, 0]) """ cdef Py_ssize_t j # See the comments above about switching to GSL for this; also note # that this should get factored out into a call to something # defined in distributions.pyx. else: # None of the values was selected: shouldn't this only happen if the # distribution is invalid? Anyway, this will get factored out. return self.n_out - 1
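The naive linear-scan sampling referred to in the comments above (both for state switching and for `_gen_symbol`) amounts to walking the cumulative distribution until the random number `r` is covered. A sketch in plain Python, with the fallback return for accumulated floating-point error noted in the code:

```python
def sample_index(probs, r):
    """Pick an index from the discrete distribution `probs` using a
    uniform draw r in [0,1].  O(N) per draw; GSL's lookup tables
    (mentioned in the comments above) would make this O(1)."""
    accum = 0.0
    for j, p in enumerate(probs):
        accum += p
        if r <= accum:
            return j
    # r exceeded the accumulated total (floating-point slop or an
    # invalid distribution): fall back to the last index.
    return len(probs) - 1
```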
def viterbi(self, obs, log_scale=True): """ Determine "the" hidden sequence of states that is most likely to produce the given sequence of observations, along with the probability that this hidden sequence actually produced the observations.
INPUT:
- ``obs`` -- sequence of emitted ints or symbols
- ``log_scale`` -- bool (default: True); whether to scale the computation in order to avoid numerical overflow.
OUTPUT:
- ``list`` -- "the" most probable sequence of hidden states, i.e., the Viterbi path.
- ``float`` -- log of probability that the observed sequence was produced by the Viterbi sequence of states.
EXAMPLES::
sage: a = hmm.DiscreteHiddenMarkovModel([[0.1,0.9],[0.1,0.9]], [[0.9,0.1],[0.1,0.9]], [0.5,0.5]) sage: a.viterbi([1,0,0,1,0,0,1,1]) ([1, 0, 0, 1, ..., 0, 1, 1], -11.06245322477221...)
We predict the state sequence when the emission symbols are 3/4 and 'abc'::
sage: a = hmm.DiscreteHiddenMarkovModel([[0.1,0.9],[0.1,0.9]], [[0.9,0.1],[0.1,0.9]], [0.5,0.5], [3/4, 'abc'])
Note that state 0 is common below, despite the model trying hard to switch to state 1::
sage: a.viterbi([3/4, 'abc', 'abc'] + [3/4]*10) ([0, 1, 1, 0, 0 ... 0, 0, 0, 0, 0], -25.299405845367794) """ else: return self._viterbi(obs)
cpdef _viterbi(self, IntList obs): """ Used internally to compute the Viterbi path, without rescaling. This can be useful for short sequences.
INPUT:
- ``obs`` -- IntList
OUTPUT:
- IntList (most likely state sequence)
- log of probability that the observed sequence was produced by the Viterbi sequence of states.
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.1,0.9],[0.9,0.1]], [[1,0],[0,1]], [.2,.8]) sage: m._viterbi(stats.IntList([1]*5)) ([1, 1, 1, 1, 1], -9.433483923290392) sage: m._viterbi(stats.IntList([0]*5)) ([0, 0, 0, 0, 0], -10.819778284410283)
Long sequences will overflow::
sage: m._viterbi(stats.IntList([0]*1000)) ([0, 0, 0, 0, 0 ... 0, 0, 0, 0, 0], -inf) """ return state_sequence, 0.0
# delta[i] is the maximum of the probabilities over all # paths ending in state i.
# We view psi as an N x T matrix of ints. The quantity # psi[N*t + j] # is a most probable hidden state at time t-1, given # that j is a most probable state at time t.
# Initialization:
# Recursion: cdef double mx, tmp cdef int index # delta_t[j] = max_i(delta_{t-1}(i) a_{i,j}) * b_j(obs[t])
# Termination:
# Path (state sequence) backtracking:
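The initialization, recursion, termination, and backtracking stages marked above are the standard unscaled Viterbi recursion. A minimal pure-Python sketch (plain lists instead of `TimeSeries`/`IntList`; `psi` is stored per time step rather than as a flat matrix):

```python
from math import log

def viterbi_path(A, B, pi, obs):
    """Unscaled Viterbi: returns (most likely state path, log probability).
    Underflows for long sequences, as in the doctests above."""
    N, T = len(pi), len(obs)
    # Initialization: delta_0(i) = pi_i * b_i(obs[0])
    delta = [pi[i] * B[i][obs[0]] for i in range(N)]
    psi = []  # psi[t][j]: best predecessor of state j at time t+1
    # Recursion: delta_t(j) = max_i(delta_{t-1}(i) a_{i,j}) * b_j(obs[t])
    for t in range(1, T):
        prev, new = [], []
        for j in range(N):
            best_i = max(range(N), key=lambda i: delta[i] * A[i][j])
            prev.append(best_i)
            new.append(delta[best_i] * A[best_i][j] * B[j][obs[t]])
        psi.append(prev)
        delta = new
    # Termination: best final state and its probability
    q = max(range(N), key=lambda i: delta[i])
    p = delta[q]
    # Path (state sequence) backtracking
    path = [q]
    for prev in reversed(psi):
        q = prev[q]
        path.append(q)
    path.reverse()
    return path, (log(p) if p > 0 else float('-inf'))
```

With the two-state model from the `_viterbi` doctest, this reproduces `([1, 1, 1, 1, 1], -9.433483923290392)`.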
cpdef _viterbi_scale(self, IntList obs): """ Used internally to compute the viterbi path with rescaling.
INPUT:
- obs -- IntList
OUTPUT:
- IntList (most likely state sequence)
- log of probability that the observed sequence was produced by the Viterbi sequence of states.
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.1,0.9],[0.9,0.1]], [[.5,.5],[0,1]], [.2,.8]) sage: m._viterbi_scale(stats.IntList([1]*10)) ([1, 0, 1, 0, 1, 0, 1, 0, 1, 0], -4.637124095034373)
Long sequences should not overflow::
sage: m._viterbi_scale(stats.IntList([1]*1000)) ([1, 0, 1, 0, 1 ... 0, 1, 0, 1, 0], -452.05188897345...) """ # The algorithm is the same as _viterbi above, except # we take the logarithms of everything first, and add # instead of multiply. return state_sequence, 0.0
# delta[i] is the maximum of the probabilities over all # paths ending in state i.
# We view psi as an N x T matrix of ints. The quantity # psi[N*t + j] # is a most probable hidden state at time t-1, given # that j is a most probable state at time t.
# Log Preprocessing
# Initialization:
# Recursion: cdef int index
# delta_t[j] = max_i(delta_{t-1}(i) a_{i,j}) * b_j(obs[t])
# Termination:
# Path (state sequence) backtracking:
cdef TimeSeries _backward_scale_all(self, IntList obs, TimeSeries scale): r""" Return the scaled matrix of values `\beta_t(i)` that appear in the backward algorithm. This function is used internally by the Baum-Welch algorithm.
The matrix is stored as a TimeSeries T, such that `\beta_t(i) = T[t*N + i]` where N is the number of states of the Hidden Markov Model.
The quantity beta_t(i) is the probability of observing the sequence obs[t+1:] assuming that the model is in state i at time t.
INPUT:
- ``obs`` -- IntList

- ``scale`` -- TimeSeries that is *changed* in place, so that after calling this function, scale[t] is the value used to scale each of the `\beta_t(i)`.
OUTPUT:
- a TimeSeries of values beta_t(i). - the input object scale is modified """ cdef double s
# 1. Initialization
# 2. Induction self.B._values[j*self.n_out+obs._values[t+1]] * beta._values[(t+1)*N+j]
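The initialization and induction steps above can be sketched in plain Python. This is a simplified stand-in for the Cython code, assuming the usual convention for scaled backward passes that `beta_{T-1}(i)` is initialized to `1/scale[T-1]`, with `scale` taken from the scaled forward pass:

```python
def backward_scale(A, B, obs, scale):
    """Scaled backward pass: returns flat beta with
    beta[t*N + i] = beta_t(i) divided by the running scale factors."""
    N, T = len(A), len(obs)
    beta = [0.0] * (N * T)
    # 1. Initialization: beta_{T-1}(i) = 1, scaled
    for i in range(N):
        beta[(T - 1) * N + i] = 1.0 / scale[T - 1]
    # 2. Induction, backwards in time
    for t in range(T - 2, -1, -1):
        for i in range(N):
            s = sum(A[i][j] * B[j][obs[t + 1]] * beta[(t + 1) * N + j]
                    for j in range(N))
            beta[t * N + i] = s / scale[t]
    return beta
```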
cdef _forward_scale_all(self, IntList obs): """ Return scaled values alpha_t(i), the sequence of scalings, and the log probability.
INPUT:
- ``obs`` -- IntList
OUTPUT:
- TimeSeries alpha with alpha_t(i) = alpha[t*N + i]

- TimeSeries scale with scale[t] the scaling at step t

- float -- log probability of the observation sequence being produced by the model. """
# The running sum of the log probabilities
# Initialization
# Induction # The code below is just an optimized version of: # alpha = (alpha * A).pairwise_product(B[O[t+1]]) # alpha = alpha.scale(1/alpha.sum()) # along with keeping track of the log of the scaling factor. cdef double s
# Termination
cdef TimeSeries _baum_welch_xi(self, TimeSeries alpha, TimeSeries beta, IntList obs): """ Used internally to compute the scaled quantity xi_t(i,j) appearing in the Baum-Welch reestimation algorithm.
INPUT:
- ``alpha`` -- TimeSeries as output by the scaled forward algorithm

- ``beta`` -- TimeSeries as output by the scaled backward algorithm

- ``obs`` -- IntList of observations
OUTPUT:
- TimeSeries xi such that xi[t*N*N + i*N + j] = xi_t(i,j). """ cdef double sum self.A._values[i*N + j] * self.B._values[j*self.n_out + obs._values[t+1]]
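The quantity xi_t(i,j), the probability of being in state i at time t and state j at time t+1 given the observations, can be sketched in plain Python using the flat layout described above. This version normalizes explicitly at each t (the Cython code instead relies on the forward/backward scaling):

```python
def baum_welch_xi(A, B, alpha, beta, obs, N):
    """Sketch: xi[t*N*N + i*N + j] = xi_t(i,j), normalized so the
    entries for each t sum to 1."""
    T = len(obs)
    xi = [0.0] * (N * N * (T - 1))
    for t in range(T - 1):
        total = 0.0
        for i in range(N):
            for j in range(N):
                v = (alpha[t * N + i] * A[i][j]
                     * B[j][obs[t + 1]] * beta[(t + 1) * N + j])
                xi[t * N * N + i * N + j] = v
                total += v
        for k in range(N * N):
            xi[t * N * N + k] /= total
    return xi
```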
def baum_welch(self, obs, int max_iter=100, double log_likelihood_cutoff=1e-4, bint fix_emissions=False): """ Given an observation sequence obs, improve this HMM using the Baum-Welch algorithm to increase the probability of observing obs.
INPUT:
- ``obs`` -- list of emissions
- ``max_iter`` -- integer (default: 100) maximum number of Baum-Welch steps to take
- ``log_likelihood_cutoff`` -- positive float (default: 1e-4); the minimal improvement in likelihood with respect to the last iteration required to continue, measured relative to the log likelihood.
- ``fix_emissions`` -- bool (default: False); if True, do not change emissions when updating
OUTPUT:
- changes the model in place, and returns the log likelihood and number of iterations.
EXAMPLES::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.1,0.9],[0.9,0.1]], [[.5,.5],[0,1]], [.2,.8]) sage: m.baum_welch([1,0]*20, log_likelihood_cutoff=0) (0.0, 4) sage: m # rel tol 1e-14 Discrete Hidden Markov Model with 2 States and 2 Emissions Transition matrix: [1.3515269707707603e-51 1.0] [ 1.0 0.0] Emission matrix: [ 1.0 6.462537138850569e-52] [ 0.0 1.0] Initial probabilities: [0.0000, 1.0000]
The following illustrates how Baum-Welch is only a local optimizer, i.e., the above model is far more likely to produce the sequence [1,0]*20 than the one we get below::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.5,0.5],[0.5,0.5]], [[.5,.5],[.5,.5]], [.5,.5]) sage: m.baum_welch([1,0]*20, log_likelihood_cutoff=0) (-27.725887222397784, 1) sage: m Discrete Hidden Markov Model with 2 States and 2 Emissions Transition matrix: [0.5 0.5] [0.5 0.5] Emission matrix: [0.5 0.5] [0.5 0.5] Initial probabilities: [0.5000, 0.5000]
We illustrate fixing emissions::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.1,0.9],[0.9,0.1]], [[.5,.5],[.2,.8]], [.2,.8]) sage: set_random_seed(0); v = m.sample(100) sage: m.baum_welch(v,fix_emissions=True) (-66.98630856918774, 100) sage: m.emission_matrix() [0.5 0.5] [0.2 0.8] sage: m = hmm.DiscreteHiddenMarkovModel([[0.1,0.9],[0.9,0.1]], [[.5,.5],[.2,.8]], [.2,.8]) sage: m.baum_welch(v) (-66.782360659293..., 100) sage: m.emission_matrix() # rel tol 1e-14 [ 0.5303085748626447 0.46969142513735535] [ 0.2909775550173978 0.7090224449826023] """ obs = self._emission_symbols_to_IntList(obs) cdef TimeSeries alpha, beta, scale, gamma, xi cdef double log_probability, log_probability_prev, delta cdef int i, j, k, N, n_iterations cdef Py_ssize_t t, T cdef double denominator_A, numerator_A, denominator_B, numerator_B
# Initialization
# Re-estimation
# Reestimate frequency of state i in time t=0
# Reestimate transition matrix and emissions probability in # each state.
# Initialization
# Compute the difference between the log probability of # two iterations.
# If the log probability does not change by more than # delta, then terminate
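The transition-matrix reestimation referred to above has the standard Baum-Welch form a_hat(i,j) = sum_t xi_t(i,j) / sum_t gamma_t(i). A minimal sketch using flat gamma/xi layouts as described earlier (a hypothetical helper illustrating the update, not the actual Cython loop, which also reestimates pi and the emission matrix):

```python
def reestimate_transitions(xi, gamma, N, T):
    """Baum-Welch update of the transition matrix:
    a_hat[i][j] = sum_{t < T-1} xi_t(i,j) / sum_{t < T-1} gamma_t(i)."""
    A_new = [[0.0] * N for _ in range(N)]
    for i in range(N):
        denom = sum(gamma[t * N + i] for t in range(T - 1))
        for j in range(N):
            num = sum(xi[t * N * N + i * N + j] for t in range(T - 1))
            A_new[i][j] = num / denom if denom else 0.0
    return A_new
```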
# Keep this -- it's for backwards compatibility with the GHMM based implementation """ TESTS::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.0,1.0],[0.5,0.5]], [1,0]) sage: sage.stats.hmm.hmm.unpickle_discrete_hmm_v0(m.transition_matrix(), m.emission_matrix(), m.initial_probabilities(), ['a','b'], 'test model') Discrete Hidden Markov Model with 2 States and 2 Emissions... """
r""" Return a :class:`DiscreteHiddenMarkovModel`, restored from the arguments.
This function is used internally for unpickling.
TESTS::
sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.0,1.0],[0.5,0.5]], [1,0],['a','b']) sage: m2 = loads(dumps(m)) # indirect doctest sage: m2 == m True
Test that :trac:`15711` has been resolved::
sage: str(m2) == str(m) True sage: m2.log_likelihood('baa'*2) == m.log_likelihood('baa'*2) True """ |