# Ticket #8547: trac_8547-take2-part2.patch

File trac_8547-take2-part2.patch, 18.4 KB (added by was, 12 years ago)

part 2; apply this and the previous patch

• ## sage/stats/hmm/chmm.pyx

# HG changeset patch
# User William Stein <wstein@gmail.com>
# Date 1270763713 25200
# Node ID 3b663ff7aae85cc171f09923eecdc15c5bde9c75
trac 8547 -- part 2 -- implement hidden markov models in Cython from scratch (so can remove the GHMM standard package from Sage)

diff -r add8a97d36c8 -r 3b663ff7aae8 sage/stats/hmm/chmm.pyx
 a from sage.misc.randstate cimport current_randstate, randstate # DELETE THIS FUNCTION WHEN MOVE Gaussian stuff to distributions.pyx!!! # TODO: DELETE THIS FUNCTION WHEN MOVE Gaussian stuff to distributions.pyx!!! (next version) cdef double random_normal(double mean, double std, randstate rstate): """ Return a number chosen randomly with given mean and standard deviation. INPUT: - mean -- double - std -- double, standard deviation - rstate -- a randstate object OUTPUT: - a double """ # Ported from http://users.tkk.fi/~nbeijar/soft/terrain/source_o2/boxmuller.c # This the box muller algorithm. # Client code can get the current random state from: if i < 0 or i >= self.N: raise IndexError, 'index out of range' # TODO: change to be a normal distribution class # TODO: change to be a normal distribution class (next version) return self.B[2*i], self.B[2*i+1] def __reduce__(self): cdef probability_init(self): """ Used internally to compute caching information that makes certain computations in the Baum-Welch algorithm faster. certain computations in the Baum-Welch algorithm faster.  This function has no input or output. """ self.prob = TimeSeries(2*self.N) cdef int i # Termination return alpha, scale, log_probability cdef TimeSeries _baum_welch_gamma(self, TimeSeries alpha, TimeSeries beta): # TODO: factor out to hmm.pyx cdef int j, N = self.N cdef Py_ssize_t t, T = alpha._length//N cdef TimeSeries gamma = TimeSeries(alpha._length, initialize=False) cdef double denominator for t in range(T): denominator = 0 for j in range(N): gamma._values[t*N + j] = alpha._values[t*N + j] * beta._values[t*N + j] denominator += gamma._values[t*N + j] for j in range(N): gamma._values[t*N + j] /= denominator return gamma cdef TimeSeries _baum_welch_xi(self, TimeSeries alpha, TimeSeries beta, TimeSeries obs): """ Used internally to compute the scaled quantity xi_t(i,j) appearing in the Baum-Welch reestimation algorithm. 
cdef TimeSeries _baum_welch_xi(self, TimeSeries alpha, TimeSeries beta, TimeSeries obs): # TODO: factor out to hmm.pyx """ Return 3-dimensional array of xi values. We have x[t,i,j] = x[t*N*N + i*N + j]. INPUT: - alpha -- TimeSeries as output by the scaled forward algorithm - beta -- TimeSeries as output by the scaled backward algorithm - obs -- TimeSeries of observations OUTPUT: - TimeSeries xi such that xi[t*N*N + i*N + j] = xi_t(i,j). """ cdef int i, j, N = self.N cdef double sum Gaussian mixture Hidden Markov Model. INPUT: - A  -- matrix; the N x N transition matrix - B -- list of pairs (mu,sigma) that define the distributions - pi -- initial state probabilities - A  -- matrix; the N x N transition matrix - B -- list of pairs (mu,sigma) that define the distributions - pi -- initial state probabilities - normalize --bool (default: True); if given, input is normalized to define valid probability distributions, e.g., the entries of A are made nonnegative and the rows sum to 1, and the probabilities in pi are normalized. EXAMPLES:: def __init__(self, A, B, pi=None, bint normalize=True): """ Initialize a Gaussian mixture hidden Markov model. EXAMPLES:: sage: hmm.GaussianMixtureHiddenMarkovModel([[.9,.1],[.4,.6]], [[(.4,(0,1)), (.6,(1,0.1))],[(1,(0,1))]], [.7,.3]) """ Used in pickling. EXAMPLES: EXAMPLES:: sage: m = hmm.GaussianMixtureHiddenMarkovModel([[1]], [[(.4,(0,1)), (.6,(1,0.1))]], [1]) sage: loads(dumps(m)) == m True cdef GaussianMixtureDistribution G = self.mixture[state] return G.prob(observation) # TODO: factor out to hmm.pyx cdef TimeSeries _baum_welch_gamma(self, TimeSeries alpha, TimeSeries beta): """ Returns gamma and gamma_all, where gamma_all contains the (double-precision float) numbers gamma_t(j,m), and gamma contains just the gamma_t(j). 
""" cdef int j, N = self.N cdef Py_ssize_t t, T = alpha._length//N cdef TimeSeries gamma = TimeSeries(alpha._length, initialize=False) cdef double denominator for t in range(T): denominator = 0 for j in range(N): gamma._values[t*N + j] = alpha._values[t*N + j] * beta._values[t*N + j] denominator += gamma._values[t*N + j] for j in range(N): gamma._values[t*N + j] /= denominator return gamma cdef TimeSeries _baum_welch_mixed_gamma(self, TimeSeries alpha, TimeSeries beta, TimeSeries obs, int j): """
• ## sage/stats/hmm/hmm.pxd

diff -r add8a97d36c8 -r 3b663ff7aae8 sage/stats/hmm/hmm.pxd
 a from sage.finance.time_series cimport TimeSeries from sage.stats.intlist cimport IntList cdef class HiddenMarkovModel: cdef int N cdef TimeSeries A, pi cdef TimeSeries _baum_welch_gamma(self, TimeSeries alpha, TimeSeries beta)
• ## sage/stats/hmm/hmm.pyx

diff -r add8a97d36c8 -r 3b663ff7aae8 sage/stats/hmm/hmm.pyx
 a double log(double) from sage.finance.time_series cimport TimeSeries from sage.stats.intlist cimport IntList from sage.matrix.all import is_Matrix, matrix from sage.misc.randstate cimport current_randstate, randstate Return number samples from this HMM of given length. INPUT: length -- positive integer number -- (default: None) if given, compute list of this many sample sequences - length -- positive integer - number -- (default: None) if given, compute list of this many sample sequences OUTPUT: if number is not given, return a single TimeSeries. if number is given, return a list of TimeSeries. - if number is not given, return a single TimeSeries. - if number is given, return a list of TimeSeries. EXAMPLES: EXAMPLES:: sage: set_random_seed(0) sage: a = hmm.DiscreteHiddenMarkovModel([[0.1,0.9],[0.1,0.9]], [[1,0],[0,1]], [0,1]) sage: print a.sample(10, 3) sage: list(a.sample(1000)).count(0) 88 If the emission symbols are set If the emission symbols are set:: sage: set_random_seed(0) sage: a = hmm.DiscreteHiddenMarkovModel([[0.5,0.5],[0.1,0.9]], [[1,0],[0,1]], [0,1], ['up', 'down']) sage: a.sample(10) cdef Py_ssize_t i return [self.generate_sequence(length)[0] for i in range(number)] ######################################################### # Some internal functions used for various general # HMM algorithms. ######################################################### cdef TimeSeries _baum_welch_gamma(self, TimeSeries alpha, TimeSeries beta): """ Used internally to compute the scaled quantity gamma_t(j) appearing in the Baum-Welch reestimation algorithm. The quantity gamma_t(j) is the (scaled!) probability of being in state j at time t, given the observation sequence. INPUT: - alpha -- TimeSeries as output by the scaled forward algorithm - beta -- TimeSeries as output by the scaled backward algorithm OUTPUT: - TimeSeries gamma such that gamma[t*N+j] is gamma_t(j). 
""" cdef int j, N = self.N cdef Py_ssize_t t, T = alpha._length//N cdef TimeSeries gamma = TimeSeries(alpha._length, initialize=False) cdef double denominator _sig_on for t in range(T): denominator = 0 for j in range(N): gamma._values[t*N + j] = alpha._values[t*N + j] * beta._values[t*N + j] denominator += gamma._values[t*N + j] for j in range(N): gamma._values[t*N + j] /= denominator _sig_off return gamma cdef class DiscreteHiddenMarkovModel(HiddenMarkovModel): """ A discrete Hidden Markov model implemented using double precision INPUT:: - A -- a list of lists or a square N x N matrix, whose - A -- a list of lists or a square N x N matrix, whose (i,j) entry gives the probability of transitioning from state i to state j. - B -- a list of N lists or a matrix with N rows, such that - B -- a list of N lists or a matrix with N rows, such that B[i,k] gives the probability of emitting symbol k while in state i. - pi -- the probabilities of starting in each initial - pi -- the probabilities of starting in each initial state, i.e,. pi[i] is the probability of starting in state i. - emission_symbols -- None or list (default: None); if - emission_symbols -- None or list (default: None); if None, the emission_symbols are the ints [0..N-1], where N is the number of states.  Otherwise, they are the entries of the list emissions_symbols, which must all be hashable. - normalize --bool (default: True); if given, input is - normalize --bool (default: True); if given, input is normalized to define valid probability distributions, e.g., the entries of A are made nonnegative and the rows sum to 1. sum to 1, and the probabilities in pi are normalized. EXAMPLES:: if self._emission_symbols is not None: self._emission_symbols_dict = dict([(y,x) for x,y in enumerate(emission_symbols)]) # TODO : normalization if not is_Matrix(B): B = matrix(B) if B.nrows() != self.N: """ Used in pickling. 
EXAMPLES: EXAMPLES:: sage: m = hmm.DiscreteHiddenMarkovModel([[0.4,0.6],[0.1,0.9]], [[0.0,1.0],[1,1]], [0,1], ['a','b']) sage: loads(dumps(m)) == m True INPUT: - obs -- sequence of observations - obs -- sequence of observations - scale -- boolean (default: True); if True, use rescaling - scale -- boolean (default: True); if True, use rescaling to avoid loss of precision due to the very limited dynamic range of floats.  You should leave this as True unless the obs sequence is very small. INPUT: - obs -- an integer list of observation states. - obs -- an integer list of observation states. OUTPUT: - float -- the log of the probability that the model produced this sequence - float -- the log of the probability that the model produced this sequence EXAMPLES:: INPUT: - obs -- an integer list of observation states. - obs -- an integer list of observation states. OUTPUT: - float -- the log of the probability that the model produced this sequence - float -- the log of the probability that the model produced this sequence EXAMPLES:: INPUT: - length -- positive integer - length -- positive integer OUTPUT: INPUT: - q -- a nonnegative integer, which specifies a state - r -- a real number between 0 and 1 - q -- a nonnegative integer, which specifies a state - r -- a real number between 0 and 1 OUTPUT: sage: set_random_seed(0) sage: m.generate_sequence(10)  # indirect test ([0, 1, 0, 0, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 1, 1, 0, 0, 0, 0]) """ cdef Py_ssize_t j cdef double a, accum = 0 INPUT: - seq -- sequence of emitted ints or symbols - seq -- sequence of emitted ints or symbols - log_scale -- bool (default: True) whether to scale the - log_scale -- bool (default: True) whether to scale the sequence in order to avoid numerical overflow. OUTPUT: - list -- "the" most probable sequence of hidden states, i.e., - list -- "the" most probable sequence of hidden states, i.e., the Viterbi path. 
- float -- log of probability that the observed sequence - float -- log of probability that the observed sequence was produced by the Viterbi sequence of states. EXAMPLES:: INPUT: - obs -- IntList - obs -- IntList OUTPUT: INPUT: - obs -- IntList - scale -- series that is *changed* in place, so that - obs -- IntList - scale -- series that is *changed* in place, so that after calling this function, scale[t] is value that is used to scale each of the \beta_t(i). INPUT: - obs -- IntList - obs -- IntList OUTPUT: # Termination return alpha, scale, log_probability cdef TimeSeries _baum_welch_gamma(self, TimeSeries alpha, TimeSeries beta): """ Used internally to compute the scaled quantity gamma_t(j) appearing in the Baum-Welch reestimation algorithm. The quantity gamma_t(j) is the (scaled!) probability of being in state j at time t, given the observation sequence. INPUT: - alpha -- TimeSeries as output by the scaled forward algorithm - beta -- TimeSeries as output by the scaled backward algorithm OUTPUT: - TimeSeries gamma such that gamma[t*N+j] is gamma_t(j). 
""" cdef int j, N = self.N cdef Py_ssize_t t, T = alpha._length//N cdef TimeSeries gamma = TimeSeries(alpha._length, initialize=False) cdef double denominator _sig_on for t in range(T): denominator = 0 for j in range(N): gamma._values[t*N + j] = alpha._values[t*N + j] * beta._values[t*N + j] denominator += gamma._values[t*N + j] for j in range(N): gamma._values[t*N + j] /= denominator _sig_off return gamma cdef TimeSeries _baum_welch_xi(self, TimeSeries alpha, TimeSeries beta, IntList obs): """ Used internally to compute the scaled quantity xi_t(i,j) INPUT: - alpha -- TimeSeries as output by the scaled forward algorithm - beta -- TimeSeries as output by the scaled backward algorithm - obs -- IntList of observations - alpha -- TimeSeries as output by the scaled forward algorithm - beta -- TimeSeries as output by the scaled backward algorithm - obs -- IntList of observations OUTPUT: INPUT: - obs -- list of emissions - obs -- list of emissions - max_iter -- integer (default: 100) maximum number - max_iter -- integer (default: 100) maximum number of Baum-Welch steps to take - log_likehood_cutoff -- positive float (default: 1e-4); - log_likehood_cutoff -- positive float (default: 1e-4); the minimal improvement in likelihood with respect to the last iteration required to continue. Relative value to log likelihood. - fix_emissions -- bool (default: False); if True, do not - fix_emissions -- bool (default: False); if True, do not change emissions when updating OUTPUT:
• ## sage/stats/hmm/util.pyx

diff -r add8a97d36c8 -r 3b663ff7aae8 sage/stats/hmm/util.pyx
 a sage: u.normalize_probability_TimeSeries(T,0,4) sage: T [0.0606, 0.1818, 0.4242, 0.3333] sage: T.sum() 1.0                   # 32-bit 0.99999999999999978   # 64-bit sage: abs(T.sum()-1) < 1e-8    # might not exactly equal 1 due to rounding True """ # One single bounds check only if i < 0 or j < 0 or i > T._length or j > T._length: