""" Continuous Emission Hidden Markov Models
AUTHOR:
- William Stein, 2010-03 """
#*****************************************************************************
#       Copyright (C) 2010 William Stein <wstein@gmail.com>
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 2 of the License, or
#  (at your option) any later version.
#                  http://www.gnu.org/licenses/
#*****************************************************************************
from cpython.object cimport PyObject_RichCompare
from libc.math cimport log, sqrt, exp, isnormal, isfinite, M_PI
from cysignals.signals cimport sig_on, sig_off

from sage.finance.time_series cimport TimeSeries
from sage.stats.intlist cimport IntList

from .hmm cimport HiddenMarkovModel
from .util cimport HMM_Util
from .distributions cimport GaussianMixtureDistribution

from sage.misc.randstate cimport current_randstate, randstate
# TODO: DELETE THIS FUNCTION WHEN WE MOVE the Gaussian stuff to distributions.pyx!!! (next version)

cdef double random_normal(double mean, double std, randstate rstate):
    """
    Return a number chosen randomly with the given mean and standard
    deviation.

    INPUT:

    - ``mean`` -- double
    - ``std`` -- double, standard deviation
    - ``rstate`` -- a randstate object

    OUTPUT:

    - a double
    """
    # Ported from http://users.tkk.fi/~nbeijar/soft/terrain/source_o2/boxmuller.c
    # This is the Box-Muller algorithm.
    # Client code can get the current random state via:
    #     cdef randstate rstate = current_randstate()
cdef double x1, x2, w, y1, y2
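    # The body is elided in this listing; the following is a minimal
    # sketch of the polar Box-Muller rejection loop described above (an
    # assumption, using rstate.c_rand_double() for uniform doubles):
    while True:
        x1 = 2*rstate.c_rand_double() - 1
        x2 = 2*rstate.c_rand_double() - 1
        w = x1*x1 + x2*x2
        if w < 1:
            break
    # Map the accepted point in the unit disc to a standard normal sample.
    w = sqrt(-2*log(w)/w)
    y1 = x1*w
    return mean + y1*std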
cdef class GaussianHiddenMarkovModel(HiddenMarkovModel):
    """
    GaussianHiddenMarkovModel(A, B, pi)

    Gaussian emissions Hidden Markov Model.

    INPUT:

    - ``A`` -- matrix; the N x N transition matrix
    - ``B`` -- list of pairs (mu, sigma) that define the distributions
    - ``pi`` -- initial state probabilities
    - ``normalize`` -- bool (default: True)
EXAMPLES:
We illustrate the primary functions with an example 2-state Gaussian HMM::
        sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,1), (-1,1)], [.5,.5]); m
        Gaussian Hidden Markov Model with 2 States
        Transition matrix:
        [0.1 0.9]
        [0.5 0.5]
        Emission parameters:
        [(1.0, 1.0), (-1.0, 1.0)]
        Initial probabilities: [0.5000, 0.5000]
We query the defining transition matrix, emission parameters, and initial state probabilities::
        sage: m.transition_matrix()
        [0.1 0.9]
        [0.5 0.5]
        sage: m.emission_parameters()
        [(1.0, 1.0), (-1.0, 1.0)]
        sage: m.initial_probabilities()
        [0.5000, 0.5000]
    We obtain a sample sequence with 10 entries in it, and compute the
    logarithm of the probability of obtaining this sequence, given the
    model::
        sage: obs = m.sample(10); obs
        [-1.6835, 0.0635, -2.1688, 0.3043, -0.3188, -0.7835, 1.0398, -1.3558, 1.0882, 0.4050]
        sage: m.log_likelihood(obs)
        -15.2262338077988...
    We compute the Viterbi path, and the log of the probability that
    that path of states produced obs::
        sage: m.viterbi(obs)
        ([1, 0, 1, 0, 1, 1, 0, 1, 0, 1], -16.67738270170788)
We use the Baum-Welch iterative algorithm to find another model for which our observation sequence is more likely::
        sage: m.baum_welch(obs)
        (-10.6103334957397..., 14)
        sage: m.log_likelihood(obs)
        -10.6103334957397...
Notice that running Baum-Welch changed our model::
        sage: m  # rel tol 3e-14
        Gaussian Hidden Markov Model with 2 States
        Transition matrix:
        [ 0.4154981366185841     0.584501863381416]
        [ 0.9999993174253741 6.825746258991804e-07]
        Emission parameters:
        [(0.4178882427119503, 0.5173109664360919), (-1.5025208631331122, 0.5085512836055119)]
        Initial probabilities: [0.0000, 1.0000]
    """
    cdef TimeSeries B, prob
    cdef int n_out
    def __init__(self, A, B, pi, bint normalize=True):
        """
        Create a Gaussian emissions HMM with transition probability
        matrix A, normal emissions given by B, and initial state
        probability distribution pi.
INPUT:
        - A -- a list of lists or a square N x N matrix, whose (i,j)
          entry gives the probability of transitioning from state i to
          state j.

        - B -- a list of N pairs (mu, std), where if B[i] = (mu, std),
          then the probability distribution associated with state i is
          normal with mean mu and standard deviation std.

        - pi -- the probabilities of starting in each initial state,
          i.e., pi[i] is the probability of starting in state i.

        - normalize -- bool (default: True); if given, input is
          normalized to define valid probability distributions, e.g.,
          the entries of A are made nonnegative and the rows sum to 1.
EXAMPLES::
            sage: hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,1), (-1,1)], [.5,.5])
            Gaussian Hidden Markov Model with 2 States
            Transition matrix:
            [0.1 0.9]
            [0.5 0.5]
            Emission parameters:
            [(1.0, 1.0), (-1.0, 1.0)]
            Initial probabilities: [0.5000, 0.5000]
We input a model in which both A and pi have to be renormalized to define valid probability distributions::
            sage: hmm.GaussianHiddenMarkovModel([[-1,.7],[.3,.4]], [(1,1), (-1,1)], [-1,.3])  # rel tol 3e-14
            Gaussian Hidden Markov Model with 2 States
            Transition matrix:
            [                0.0                 1.0]
            [0.42857142857142855  0.5714285714285714]
            Emission parameters:
            [(1.0, 1.0), (-1.0, 1.0)]
            Initial probabilities: [0.0000, 1.0000]
Bad things can happen::
            sage: hmm.GaussianHiddenMarkovModel([[-1,.7],[.3,.4]], [(1,1), (-1,1)], [-1,.3], normalize=False)
            Gaussian Hidden Markov Model with 2 States
            Transition matrix:
            [-1.0  0.7]
            [ 0.3  0.4]
            ...
        """
        # B should be a matrix of N rows, with column 0 the mean and
        # column 1 the standard deviation.  If B is given as a matrix,
        # it is first flattened:
        #     B = B.list()
    def __richcmp__(self, other, op):
        """
        Compare self and other, which must both be instances of
        GaussianHiddenMarkovModel.
EXAMPLES::
            sage: m = hmm.GaussianHiddenMarkovModel([[1]], [(0,1)], [1])
            sage: n = hmm.GaussianHiddenMarkovModel([[1]], [(1,1)], [1])
            sage: m < n
            True
            sage: m == m
            True
            sage: n > m
            True
            sage: n < m
            False
        """
        return NotImplemented
    def __getitem__(self, Py_ssize_t i):
        """
        Return the mean and standard deviation for the i-th state.
INPUT:
- i -- integer
OUTPUT:
- 2 floats
EXAMPLES::
            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-2,.3)], [.5,.5])
            sage: m[0]
            (1.0, 0.5)
            sage: m[1]
            (-2.0, 0.3)
            sage: m[-1]
            (-2.0, 0.3)
            sage: m[3]
            Traceback (most recent call last):
            ...
            IndexError: index out of range
            sage: m[-3]
            Traceback (most recent call last):
            ...
            IndexError: index out of range
        """
# TODO: change to be a normal distribution class (next version)
    def __reduce__(self):
        """
        Used in pickling.
EXAMPLES::
            sage: G = hmm.GaussianHiddenMarkovModel([[1]], [(0,1)], [1])
            sage: loads(dumps(G)) == G
            True
        """
    def emission_parameters(self):
        """
        Return the parameters that define the normal distributions
        associated to all of the states.
OUTPUT:
- a list B of pairs B[i] = (mu, std), such that the distribution associated to state i is normal with mean mu and standard deviation std.
EXAMPLES::
            sage: hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9]).emission_parameters()
            [(1.0, 0.5), (-1.0, 3.0)]
        """
        cdef Py_ssize_t i
    def __repr__(self):
        r"""
        Return string representation.
EXAMPLES::
            sage: hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9]).__repr__()
            'Gaussian Hidden Markov Model with 2 States\nTransition matrix:\n[0.1 0.9]\n[0.5 0.5]\nEmission parameters:\n[(1.0, 0.5), (-1.0, 3.0)]\nInitial probabilities: [0.1000, 0.9000]'
        """
    def generate_sequence(self, Py_ssize_t length, starting_state=None):
        """
        Return a sample of the given length from this HMM.
INPUT:
        - length -- nonnegative integer
        - starting_state -- int (or None); if specified, generate the
          sequence starting in this state, instead of using the initial
          probabilities to choose the starting state.
OUTPUT:
        - a TimeSeries of emissions
        - an IntList of the corresponding hidden states
EXAMPLES::
            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m.generate_sequence(5)
            ([-3.0505, 0.5317, -4.5065, 0.6521, 1.0435], [1, 0, 1, 0, 1])
            sage: m.generate_sequence(0)
            ([], [])
            sage: m.generate_sequence(-1)
            Traceback (most recent call last):
            ...
            ValueError: length must be nonnegative
Example in which the starting state is 0 (see :trac:`11452`)::
            sage: set_random_seed(23);  m.generate_sequence(2)
            ([0.6501, -2.0151], [0, 1])
Force a starting state of 1 even though as we saw above it would be 0::
            sage: set_random_seed(23);  m.generate_sequence(2, starting_state=1)
            ([-3.1491, -1.0244], [1, 1])
Verify numerically that the starting state is 0 with probability about 0.1::
            sage: set_random_seed(0)
            sage: v = [m.generate_sequence(1)[1][0] for i in range(10^5)]
            sage: 1.0 * v.count(int(0)) / len(v)
            0.0998200000000000
        """
# Create Integer lists for states and observations
        # Set up variables, including the random state.
        cdef Py_ssize_t i, j
        cdef double r, accum
        # Choose the starting state.
        # See the remark in hmm.pyx about how this should get
        # replaced by some general fast discrete distribution code.
            else:
                raise ValueError("starting state must be between 0 and %s" % (self.N - 1))
        cdef double* row
        cdef int O
    cdef probability_init(self):
        """
        Used internally to compute caching information that makes
        certain computations in the Baum-Welch algorithm faster.  This
        function has no input or output.
        """
        cdef int i
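        # Body elided; a minimal sketch of the caching it could perform
        # (an assumption, consistent with the density formula used in
        # probability_of below, with sqrt2pi a module-level constant
        # equal to sqrt(2*M_PI)): store, per state, the normalizing
        # constant and the exponent coefficient, so that each density
        # evaluation later needs only a single exp().
        #     self.prob = TimeSeries(2*self.N)
        #     for i in range(self.N):
        #         self.prob[2*i] = 1.0/(sqrt2pi*self.B[2*i+1])
        #         self.prob[2*i+1] = -1.0/(2*self.B[2*i+1]*self.B[2*i+1])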
    cdef double random_sample(self, int state, randstate rstate):
        """
        Return a random sample from the normal distribution associated
        to the given state.
This is only used internally, and no bounds or other error checking is done, so calling this improperly can lead to seg faults.
INPUT:
        - state -- integer
        - rstate -- randstate instance
OUTPUT:
        - double
        """
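        # Body elided; presumably this delegates to random_normal above
        # with this state's (mean, std) (a sketch, not the verbatim
        # implementation):
        #     return random_normal(self.B._values[2*state],
        #                          self.B._values[2*state+1], rstate)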
    cdef double probability_of(self, int state, double observation):
        """
        Return a useful continuous analogue of "the probability b_j(o)"
        of seeing the given observation o, given that we are in the
        given state j (= state).
        The distribution is a normal distribution, and we are asking
        about the probability of a particular point being observed;
        the probability of a particular point is 0, which is not
        useful.  Thus we instead consider the limit

            p = prob([o, o+d]) / d    as d goes to 0.

        There is a simple closed form formula for p, derived in the
        source code below.  Note that p can be bigger than 1; for
        example, if we set observation = mean in the closed formula, we
        get p = 1/(sqrt(2*pi)*std), so p > 1 when std < 1/sqrt(2*pi).
INPUT:
        - state -- integer
        - observation -- double
OUTPUT:
        - double
        """
        # The code below is an optimized version of the following code:
        #     cdef double mean = self.B._values[2*state], \
        #                 std = self.B._values[2*state+1]
        #     return 1/(sqrt2pi*std) * \
        #            exp(-(observation-mean)*(observation-mean)/(2*std*std))
        #
        # Here is how to use Sage to verify that the above formula
        # computes the limit claimed above:
        #
        #     var('x,d,obs,mean,std')
        #     n = 1/sqrt(2*pi*std^2) * exp(-(x-mean)^2/(2*std^2))
        #     assume(std>0); assume(d>0)
        #     m = n.integrate(x,obs,obs+d)/d
        #     p = SR(m.limit(d=0).simplify_full())
        #     q = 1/(sqrt(2*pi)*std) * exp(-(obs-mean)*(obs-mean)/(2*std*std))
        #     bool(p==q)  # outputs True
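        # The optimized body is elided; given the per-state constants
        # cached by probability_init() above, it plausibly reduces to a
        # single exp() per call (a sketch, not the verbatim code):
        #     cdef double x = observation - self.B._values[2*state]
        #     return self.prob._values[2*state] * \
        #            exp(x*x*self.prob._values[2*state+1])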
    def log_likelihood(self, obs):
        """
        Return the logarithm of a continuous analogue of the
        probability that this model produced the given observation
        sequence.
Note that the "continuous analogue of the probability" above can be bigger than 1, hence the logarithm can be positive.
INPUT:
- obs -- sequence of observations
OUTPUT:
- float
EXAMPLES::
            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m.log_likelihood([1,1,1])
            -4.297880766072486
            sage: set_random_seed(0);  s = m.sample(20)
            sage: m.log_likelihood(s)
            -40.115714129484...
        """
        # Special case: for the empty sequence the "probability" is 1.
        if len(obs) == 0:
            return 1.0
        if not isinstance(obs, TimeSeries):
            obs = TimeSeries(obs)
        return self._forward_scale(obs)
    def _forward_scale(self, TimeSeries obs):
        """
        Memory-efficient implementation of the forward algorithm (with
        scaling).
INPUT:
        - obs -- a TimeSeries of observations
OUTPUT:
- float -- the log of the probability that the model produced this sequence
EXAMPLES::
            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m._forward_scale(stats.TimeSeries([1,-1,-1,1]))
            -7.641988207069133
        """
# The running sum of the log probabilities
# Initialization
        # Induction
        cdef double s
# Termination
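        # The loop bodies are elided in this listing; a minimal sketch
        # of the scaled forward pass (an assumption, following the
        # standard Rabiner-style scaling): the alpha vector is
        # renormalized to sum to 1 at every step, and the logs of the
        # scaling factors are accumulated, so the accumulated sum is
        # log P(obs | model).
        #     alpha = [self.pi[i]*self.probability_of(i, obs[0]) for i in range(N)]
        #     s = sum(alpha); alpha = [a/s for a in alpha]
        #     log_probability = log(s)
        #     for t in range(1, len(obs)):
        #         alpha = [sum(alpha[i]*self.A[i*N+j] for i in range(N))
        #                    * self.probability_of(j, obs[t]) for j in range(N)]
        #         s = sum(alpha); alpha = [a/s for a in alpha]
        #         log_probability += log(s)
        #     return log_probability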
    def viterbi(self, obs):
        """
        Determine "the" hidden sequence of states that is most likely
        to produce the given sequence obs of observations, along with
        the probability that this hidden sequence actually produced the
        observations.
INPUT:
        - obs -- sequence of emitted ints or symbols
OUTPUT:
- list -- "the" most probable sequence of hidden states, i.e., the Viterbi path.
- float -- log of probability that the observed sequence was produced by the Viterbi sequence of states.
EXAMPLES:
We find the optimal state sequence for a given model::
            sage: m = hmm.GaussianHiddenMarkovModel([[0.5,0.5],[0.5,0.5]], [(0,1),(10,1)], [0.5,0.5])
            sage: m.viterbi([0,1,10,10,1])
            ([0, 0, 1, 1, 0], -9.0604285688230...)
Another example in which the most likely states change based on the last observation::
            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m.viterbi([-2,-1,.1,0.1])
            ([1, 1, 0, 1], -9.61823698847639...)
            sage: m.viterbi([-2,-1,.1,0.3])
            ([1, 1, 1, 0], -9.566023653378513)
        """
        cdef TimeSeries _obs
        # The algorithm is the same as _viterbi (in hmm.pyx), except
        # we take the logarithms of everything first, and add
        # instead of multiply.
            return state_sequence, 0.0
        # delta[i] is the maximum of the probabilities over all
        # paths ending in state i.
        # We view psi as an N x T matrix of ints.  The quantity
        #     psi[N*t + j]
        # is a most probable hidden state at time t-1, given
        # that j is a most probable state at time t.
# Log Preprocessing
# Initialization:
        # Recursion:
        cdef int index
# Compute delta_t[j] = max_i(delta_{t-1}(i) a_{i,j}) * b_j(_obs[t])
# Termination:
# Path (state sequence) backtracking:
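        # The recursion body is elided in this listing; a minimal
        # sketch of the log-space Viterbi recursion the comments above
        # outline (an assumption; log_A and log_b denote the
        # preprocessed logs of the transition matrix and of the
        # emission densities):
        #     for t in range(1, T):
        #         for j in range(N):
        #             index = max(range(N), key=lambda i: delta_prev[i] + log_A[i][j])
        #             psi[N*t + j] = index
        #             delta[j] = delta_prev[index] + log_A[index][j] + log_b(j, _obs[t])
        #         delta_prev = delta[:]
        #     # Termination: pick the best final state, then backtrack
        #     # through psi to recover the whole state sequence.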
    cdef TimeSeries _backward_scale_all(self, TimeSeries obs, TimeSeries scale):
        """
        Return the matrix beta_t(i); this is used internally as part of
        the Baum-Welch algorithm.
The quantity beta_t(i) is the probability of observing the sequence obs[t+1:] assuming that the model is in state i at time t.
INPUT:
        - obs -- TimeSeries
        - scale -- TimeSeries
OUTPUT:
        - TimeSeries beta such that beta_t(i) = beta[t*N + i]
        - scale is also changed by this function
        """
        cdef double s
# 1. Initialization
        # 2. Induction
                    self.probability_of(j, obs._values[t+1]) * beta._values[(t+1)*N+j]
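        # The surrounding loops are elided; the full induction step
        # being computed (the line above is the surviving inner factor)
        # is the standard scaled backward recursion:
        #     beta[t*N + i] = scale[t+1] * sum over j of
        #         A[i*N+j] * probability_of(j, obs[t+1]) * beta[(t+1)*N + j]
        # running from t = T-2 down to t = 0.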
    cdef _forward_scale_all(self, TimeSeries obs):
        """
        Return the scaled values alpha_t(i), the sequence of scalings,
        and the log probability.
The quantity alpha_t(i) is the probability of observing the sequence obs[:t+1] assuming that the model is in state i at time t.
INPUT:
- obs -- TimeSeries
OUTPUT:
        - TimeSeries alpha with alpha_t(i) = alpha[t*N + i]
        - TimeSeries scale with scale[t] the scaling at step t
        - float -- log_probability of the observation sequence being
          produced by the model.
        """
        # The running sum of the log probabilities
# Initialization
        # Induction
        # The code below is just an optimized version of:
        #     alpha = (alpha * A).pairwise_product(B[O[t+1]])
        #     alpha = alpha.scale(1/alpha.sum())
        # along with keeping track of the log of the scaling factor.
        cdef double s
# Termination
    cdef TimeSeries _baum_welch_xi(self, TimeSeries alpha, TimeSeries beta, TimeSeries obs):
        """
        Used internally to compute the scaled quantity xi_t(i,j)
        appearing in the Baum-Welch reestimation algorithm.
INPUT:
        - alpha -- TimeSeries as output by the scaled forward algorithm
        - beta -- TimeSeries as output by the scaled backward algorithm
        - obs -- TimeSeries of observations
OUTPUT:
        - TimeSeries xi such that xi[t*N*N + i*N + j] = xi_t(i,j).
        """
        cdef double sum
                    self.A._values[i*N+j] * self.probability_of(j, obs._values[t+1])
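        # The surrounding loops are elided; the quantity being built up
        # (the line above is the surviving inner factor) is the
        # standard Baum-Welch xi:
        #     xi_t(i,j) = alpha_t(i) * a_{i,j} * b_j(obs[t+1]) * beta_{t+1}(j) / P_t
        # where P_t normalizes so that sum_{i,j} xi_t(i,j) = 1 for each t.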
    def baum_welch(self, obs, int max_iter=500, double log_likelihood_cutoff=1e-4,
                   double min_sd=0.01, bint fix_emissions=False, bint v=False):
        """
        Given an observation sequence obs, improve this HMM using the
        Baum-Welch algorithm to increase the probability of observing
        obs.
INPUT:
- obs -- a time series of emissions
        - max_iter -- integer (default: 500); maximum number of
          Baum-Welch steps to take
        - log_likelihood_cutoff -- positive float (default: 1e-4); the
          minimal improvement in likelihood with respect to the last
          iteration required to continue, measured relative to the log
          likelihood.
- min_sd -- positive float (default: 0.01); when reestimating, the standard deviation of emissions is not allowed to be less than min_sd.
- fix_emissions -- bool (default: False); if True, do not change emissions when updating
OUTPUT:
        - changes the model in place, and returns the log likelihood
          and number of iterations.
EXAMPLES::
            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m.log_likelihood([-2,-1,.1,0.1])
            -8.858282215986275
            sage: m.baum_welch([-2,-1,.1,0.1])
            (4.534646052182..., 7)
            sage: m.log_likelihood([-2,-1,.1,0.1])
            4.534646052182...
            sage: m  # rel tol 3e-14
            Gaussian Hidden Markov Model with 2 States
            Transition matrix:
            [   0.9999999992430161 7.569839394440382e-10]
            [  0.49998462791192644    0.5000153720880736]
            Emission parameters:
            [(0.09999999999999999, 0.01), (-1.4999508147591902, 0.5000710504895474)]
            Initial probabilities: [0.0000, 1.0000]
We illustrate bounding the standard deviation below. Note that above we had different emission parameters when the min_sd was the default of 0.01::
            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: m.baum_welch([-2,-1,.1,0.1], min_sd=1)
            (-4.07939572755..., 32)
            sage: m.emission_parameters()
            [(-0.2663018798..., 1.0), (-1.99850979..., 1.0)]
We watch the log likelihoods of the model converge, step by step::
            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.5,.5]], [(1,.5), (-1,3)], [.1,.9])
            sage: v = m.sample(10)
            sage: stats.TimeSeries([m.baum_welch(v,max_iter=1)[0] for _ in range(len(v))])
            [-20.1167, -17.7611, -16.9814, -16.9364, -16.9314, -16.9309, -16.9309, -16.9309, -16.9309, -16.9309]
We illustrate fixing emissions::
            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.9,.1]], [(1,2),(-1,.5)], [.3,.7])
            sage: set_random_seed(0); v = m.sample(100)
            sage: m.baum_welch(v,fix_emissions=True)
            (-164.72944548204..., 23)
            sage: m.emission_parameters()
            [(1.0, 2.0), (-1.0, 0.5)]
            sage: m = hmm.GaussianHiddenMarkovModel([[.1,.9],[.9,.1]], [(1,2),(-1,.5)], [.3,.7])
            sage: m.baum_welch(v)
            (-162.854370397998..., 49)
            sage: m.emission_parameters()  # rel tol 3e-14
            [(1.2722419172602375, 2.371368751761901), (-0.9486174675179113, 0.5762360385123765)]
        """
        cdef TimeSeries alpha, beta, scale, gamma, xi
        cdef double log_probability, log_probability0, log_probability_prev, delta
        cdef int i, j, k, N, n_iterations
        cdef Py_ssize_t t, T
        cdef double denominator_A, numerator_A, denominator_B, \
             numerator_mean, numerator_std
        # Initialization
            return (0.0, 0)
        # Re-estimation

            # Reestimate
                # Before raising an error, leave self in a valid state.
                util.normalize_probability_TimeSeries(self.pi, 0, self.pi._length)
                raise RuntimeError("impossible to compute gamma during reestimation")
# Update the probabilities pi to define a valid discrete distribution
            # Reestimate transition matrix and emission probabilities in
            # each state.

            # Compute the updated transition matrix
                    raise RuntimeError("unable to re-estimate transition matrix")
                # Rescale the i-th row of the transition matrix to be
                # a valid stochastic matrix:
raise RuntimeError("unable to re-estimate emission probabilities")
                        (_obs._values[t] - self.B._values[2*i])*(_obs._values[t] - self.B._values[2*i])
                # reestimated mean
                # reestimated standard deviation
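            # The elided reestimation formulas are the standard
            # Baum-Welch M-step updates for a Gaussian HMM (stated here
            # for reference; gamma and xi come from the E-step above):
            #     A[i*N+j] = sum_t xi_t(i,j) / sum_t gamma_t(i)
            #     mu_i     = sum_t gamma_t(i)*obs[t] / sum_t gamma_t(i)
            #     std_i    = sqrt(sum_t gamma_t(i)*(obs[t]-mu_i)^2
            #                     / sum_t gamma_t(i)),
            # with std_i clamped below by min_sd.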
# Initialization for next iteration
            # Compute the difference between the log probability of
            # two iterations.
            # If the log probability does not change by more than delta,
            # then terminate
cdef class GaussianMixtureHiddenMarkovModel(GaussianHiddenMarkovModel):
    """
    GaussianMixtureHiddenMarkovModel(A, B, pi)
Gaussian mixture Hidden Markov Model.
INPUT:
- ``A`` -- matrix; the N x N transition matrix
    - ``B`` -- list of mixture definitions, one for each state.  Each
      state may have a varying number of Gaussians, with selection
      probabilities that sum to 1; each component is encoded as
      (p, (mu, sigma)).
- ``pi`` -- initial state probabilities
    - ``normalize`` -- bool (default: True); if given, input is
      normalized to define valid probability distributions, e.g., the
      entries of A are made nonnegative and the rows sum to 1, and the
      probabilities in pi are normalized.
EXAMPLES::
        sage: A = [[0.5,0.5],[0.5,0.5]]
        sage: B = [[(0.9,(0.0,1.0)), (0.1,(1,10000))],[(1,(1,1)), (0,(0,0.1))]]
        sage: hmm.GaussianMixtureHiddenMarkovModel(A, B, [1,0])
        Gaussian Mixture Hidden Markov Model with 2 States
        Transition matrix:
        [0.5 0.5]
        [0.5 0.5]
        Emission parameters:
        [0.9*N(0.0,1.0) + 0.1*N(1.0,10000.0), 1.0*N(1.0,1.0) + 0.0*N(0.0,0.1)]
        Initial probabilities: [1.0000, 0.0000]
TESTS:
    If a standard deviation is 0, it is normalized to be slightly
    bigger than 0::
        sage: hmm.GaussianMixtureHiddenMarkovModel([[1]], [[(1,(0,0))]], [1])
        Gaussian Mixture Hidden Markov Model with 1 States
        Transition matrix:
        [1.0]
        Emission parameters:
        [1.0*N(0.0,1e-08)]
        Initial probabilities: [1.0000]
    We test that the number of emission distributions must be the same
    as the number of states::
        sage: hmm.GaussianMixtureHiddenMarkovModel([[1]], [], [1])
        Traceback (most recent call last):
        ...
        ValueError: number of GaussianMixtures must be the same as number of entries of pi
        sage: hmm.GaussianMixtureHiddenMarkovModel([[1]], [[]], [1])
        Traceback (most recent call last):
        ...
        ValueError: must specify at least one component of the mixture model
We test that the number of initial probabilities must equal the number of states::
        sage: hmm.GaussianMixtureHiddenMarkovModel([[1]], [[]], [1,2])
        Traceback (most recent call last):
        ...
        ValueError: number of entries of transition matrix A must be the square of the number of entries of pi
    """
    cdef object mixture  # list of GaussianMixtureDistribution objects, one per state
    def __init__(self, A, B, pi=None, bint normalize=True):
        """
        Initialize a Gaussian mixture hidden Markov model.
EXAMPLES::
            sage: hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            Gaussian Mixture Hidden Markov Model with 2 States
            Transition matrix:
            [0.9 0.1]
            [0.4 0.6]
            Emission parameters:
            [0.4*N(0.0,1.0) + 0.6*N(1.0,0.1), 1.0*N(0.0,1.0)]
            Initial probabilities: [0.7000, 0.3000]
        """
            raise ValueError("number of entries of transition matrix A must be the square of the number of entries of pi")
    def __repr__(self):
        r"""
        Return string representation.
EXAMPLES::
            sage: hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3]).__repr__()
            'Gaussian Mixture Hidden Markov Model with 2 States\nTransition matrix:\n[0.9 0.1]\n[0.4 0.6]\nEmission parameters:\n[0.4*N(0.0,1.0) + 0.6*N(1.0,0.1), 1.0*N(0.0,1.0)]\nInitial probabilities: [0.7000, 0.3000]'
        """
    def __reduce__(self):
        """
        Used in pickling.
EXAMPLES::
            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[1]], [[(.4,(0,1)), (.6,(1,0.1))]], [1])
            sage: loads(dumps(m)) == m
            True
        """
    def __richcmp__(self, other, op):
        """
        Compare self and other, which must both be instances of
        GaussianMixtureHiddenMarkovModel.
EXAMPLES::
            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[1]], [[(.4,(0,1)), (.6,(1,0.1))]], [1])
            sage: n = hmm.GaussianMixtureHiddenMarkovModel([[1]], [[(.5,(0,1)), (.5,(1,0.1))]], [1])
            sage: m < n
            True
            sage: m == m
            True
            sage: n > m
            True
            sage: n < m
            False
        """
        return NotImplemented
    def __getitem__(self, Py_ssize_t i):
        """
        Return the Gaussian mixture distribution associated to the
        i-th state.
INPUT:
- i -- integer
OUTPUT:
- a Gaussian mixture distribution object
EXAMPLES::
            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            sage: m[0]
            0.4*N(0.0,1.0) + 0.6*N(1.0,0.1)
            sage: m[1]
            1.0*N(0.0,1.0)
Negative indexing works::
            sage: m[-1]
            1.0*N(0.0,1.0)
Bounds are checked::
            sage: m[2]
            Traceback (most recent call last):
            ...
            IndexError: index out of range
            sage: m[-3]
            Traceback (most recent call last):
            ...
            IndexError: index out of range
        """
    def emission_parameters(self):
        """
        Return a list of all the emission distributions.
OUTPUT:
- list of Gaussian mixtures
EXAMPLES::
            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            sage: m.emission_parameters()
            [0.4*N(0.0,1.0) + 0.6*N(1.0,0.1), 1.0*N(0.0,1.0)]
        """
    cdef double random_sample(self, int state, randstate rstate):
        """
        Return a random sample from the Gaussian mixture distribution
        associated to the given state.
This is only used internally, and no bounds or other error checking is done, so calling this improperly can lead to seg faults.
INPUT:
        - state -- integer
        - rstate -- randstate instance
OUTPUT:
        - double
        """
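        # Body elided; presumably this delegates sampling to the
        # GaussianMixtureDistribution object for this state (the exact
        # method name here is an assumption):
        #     return self.mixture[state]._sample(rstate)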
    cdef double probability_of(self, int state, double observation):
        """
        Return the probability b_j(o) of seeing the given observation o
        (= observation) given that we are in the given state j (= state).
This is a continuous probability, so this really returns a number p such that the probability of a value in the interval [o,o+d] is p*d.
INPUT:
        - state -- integer
        - observation -- double
OUTPUT:
        - double
        """
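        # Body elided; the mixture density being evaluated is the
        # weighted sum of the component Gaussian densities:
        #     b_j(o) = sum over m of c_{j,m} * N(o; mu_{j,m}, std_{j,m})
        # where the c_{j,m} are the selection probabilities of state
        # j's mixture.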
    cdef TimeSeries _baum_welch_mixed_gamma(self, TimeSeries alpha, TimeSeries beta,
                                            TimeSeries obs, int j):
        """
        Let gamma_t(j,m) be the m-component (in the mixture) of the
        probability of being in state j at time t, given the
        observation sequence.  This function outputs a TimeSeries v
        such that v[m*T + t] gives gamma_t(j, m), where T is the number
        of time steps.
INPUT:
        - alpha -- TimeSeries
        - beta -- TimeSeries
        - obs -- TimeSeries
        - j -- int
OUTPUT:
- TimeSeries """
cdef double numer, alpha_minus, P, s, prob
            # If the probability of observing obs[t] in state j is 0, then
            # all of the m-mixture components *have* to automatically be 0,
            # since prob is the sum of those and they are all nonnegative.
                for m in range(M):
                    mixed_gamma._values[m*T + t] = 0
            else:
                # Compute the denominator we used when scaling gamma.
                # The key thing is that this is consistent between
                # gamma and mixed_gamma.
# Divide out the total probability, so we can multiply back in # the m-components of the probability.
    def baum_welch(self, obs, int max_iter=1000, double log_likelihood_cutoff=1e-12,
                   double min_sd=0.01, bint fix_emissions=False):
        """
        Given an observation sequence obs, improve this HMM using the
        Baum-Welch algorithm to increase the probability of observing
        obs.
INPUT:
        - obs -- a time series of emissions

        - max_iter -- integer (default: 1000); maximum number of
          Baum-Welch steps to take

        - log_likelihood_cutoff -- positive float (default: 1e-12); the
          minimal improvement in likelihood with respect to the last
          iteration required to continue, measured relative to the log
          likelihood.

        - min_sd -- positive float (default: 0.01); when reestimating,
          the standard deviation of emissions is not allowed to be less
          than min_sd.

        - fix_emissions -- bool (default: False); if True, do not
          change emissions when updating
OUTPUT:
        - changes the model in place, and returns the log likelihood
          and number of iterations.
EXAMPLES::
            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            sage: set_random_seed(0); v = m.sample(10); v
            [0.3576, -0.9365, 0.9449, -0.6957, 1.0217, 0.9644, 0.9987, -0.5950, -1.0219, 0.6477]
            sage: m.log_likelihood(v)
            -8.31408655939536...
            sage: m.baum_welch(v)
            (2.18905068682..., 15)
            sage: m.log_likelihood(v)
            2.18905068682...
            sage: m  # rel tol 6e-14
            Gaussian Mixture Hidden Markov Model with 2 States
            Transition matrix:
            [  0.8746363339773399    0.12536366602266016]
            [                 1.0 1.451685202290174e-40]
            Emission parameters:
            [0.500161629343*N(-0.812298726239,0.173329026744) + 0.499838370657*N(0.982433690378,0.029719932009), 1.0*N(0.503260056832,0.145881515324)]
            Initial probabilities: [0.0000, 1.0000]
We illustrate bounding the standard deviation below. Note that above we had different emission parameters when the min_sd was the default of 0.01::
            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            sage: m.baum_welch(v, min_sd=1)
            (-12.617885761692..., 1000)
            sage: m.emission_parameters()
            [0.503545634447*N(0.200166509595,1.0) + 0.496454365553*N(0.200166509595,1.0), 1.0*N(0.0543433426535,1.0)]
We illustrate fixing all emissions::
            sage: m = hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3])
            sage: set_random_seed(0); v = m.sample(10)
            sage: m.baum_welch(v, fix_emissions=True)
            (-7.58656858997..., 36)
            sage: m.emission_parameters()
            [0.4*N(0.0,1.0) + 0.6*N(1.0,0.1), 1.0*N(0.0,1.0)]
        """
        obs = TimeSeries(obs)
        cdef TimeSeries alpha, beta, scale, gamma, mixed_gamma, mixed_gamma_m, xi
        cdef double log_probability, log_probability0, log_probability_prev, delta
        cdef int i, j, k, m, N, n_iterations
        cdef Py_ssize_t t, T
        cdef double denominator_A, numerator_A, denominator_B, numerator_mean, numerator_std, \
             numerator_c, c, mu, std, numer, denom, new_mu, new_std, new_c, s
        cdef GaussianMixtureDistribution G
        # Initialization
            return (0.0, 0)
# Re-estimation
            # Reestimate the frequency of state i at time t=0
                # Before raising an error, leave self in a valid state.
                util.normalize_probability_TimeSeries(self.pi, 0, self.pi._length)
                raise RuntimeError("impossible to compute gamma during reestimation")
# Update the probabilities pi to define a valid discrete distribution
            # Reestimate transition matrix and emission probabilities in
            # each state.

            # Reestimate the state transition matrix
                    raise RuntimeError("unable to re-estimate pi (1)")
                # Rescale the i-th row of the transition matrix to be
                # a valid stochastic matrix:
            ####################################################################
            # Re-estimate the emission probabilities
            ####################################################################
                    new_G.append(G[m])
                    continue
# Compute re-estimated mu_{j,m}
                # Compute re-estimated standard deviation
                        (_obs._values[t] - mu)*(_obs._values[t] - mu)
# Compute re-estimated weighting coefficient
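                # The elided updates are the standard mixture Baum-Welch
                # formulas (stated here for reference, with gamma_t(j,m)
                # from _baum_welch_mixed_gamma above):
                #     c_{j,m}   = sum_t gamma_t(j,m) / sum_{t,k} gamma_t(j,k)
                #     mu_{j,m}  = sum_t gamma_t(j,m)*obs[t] / sum_t gamma_t(j,m)
                #     std_{j,m} = sqrt(sum_t gamma_t(j,m)*(obs[t]-mu_{j,m})^2
                #                      / sum_t gamma_t(j,m)),
                # with std_{j,m} clamped below by min_sd.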
            ####################################################################
            # Initialization for next iteration
            ####################################################################
            # Compute the difference between the log probability of
            # two iterations.
            # If the log probability does not improve by more than
            # delta, then terminate
##################################################
# For Unpickling
##################################################
# We keep the _v0 function for backwards compatibility.
    """
    EXAMPLES::
        sage: m = hmm.GaussianHiddenMarkovModel([[1]], [(0,1)], [1])
        sage: sage.stats.hmm.chmm.unpickle_gaussian_hmm_v0(m.transition_matrix(), m.emission_parameters(), m.initial_probabilities(), 'test')
        Gaussian Hidden Markov Model with 1 States
        Transition matrix:
        [1.0]
        Emission parameters:
        [(0.0, 1.0)]
        Initial probabilities: [1.0000]
    """
""" EXAMPLES::
        sage: m = hmm.GaussianHiddenMarkovModel([[1]], [(0,1)], [1])
        sage: loads(dumps(m)) == m  # indirect test
        True
    """
""" EXAMPLES::
        sage: m = hmm.GaussianMixtureHiddenMarkovModel([[1]], [[(.4,(0,1)), (.6,(1,0.1))]], [1])
        sage: loads(dumps(m)) == m  # indirect test
        True
    """
|