Source code for logio.dynamic_time_warping.step_pattern

import numpy as np
import matplotlib.pyplot as plt
import networkx as nx

##DOCS
"""
Dynamic Time Warping (DTW)
**Details**

A step pattern defines teh allowed transitions between matched pairs of logs in a
specific of a DTW variant. They also known as local-constraints or transition types.

A variety of classification schemes have been proposed for step
patterns, including Sakoe-Chiba (Sakoe1978); Rabiner-Juang
(Rabiner1993); and Rabiner-Myers (Myers1980). The dynamic_time_warping module
implements some of the transition types found in those papers.

**Pre-defined step patterns**

      ## Well-known step patterns:
      symmetric1
      symmetric2
      asymmetric

      ## Slope-constrained step patterns from Sakoe-Chiba (Sakoe1978)
      symmetricP0;  asymmetricP0
      symmetricP05; asymmetricP05
      symmetricP1;  asymmetricP1
      symmetricP2;  asymmetricP2

      ## Step patterns classified according to Rabiner-Myers (Myers1980)
      typeIa;   typeIb;   typeIc;   typeId;
      typeIas;  typeIbs;  typeIcs;  typeIds;  
      typeIIa;  typeIIb;  typeIIc;  typeIId;
      typeIIIc; typeIVc;


For convenience, we review pre-defined step patterns grouped by
classification. Note that the same pattern may be listed under different
names.

**1. Well-known step patterns**

Common DTW implementations are based on one of the following transition
types.

``symmetric2`` is the normalizable, symmetric, with no local slope
constraints. It can be normalized dividing by ``N+M``
(query+reference lengths). It is widely used and the default.

``asymmetric`` is asymmetric, slope constrained between 0 and 2. Matches
each element of the query logs exactly once, so the warping path
is guaranteed to be single-valued. Normalized by ``N`` (length of query).

``symmetric1`` has no local constraint, It is non-normalizable.

**2. The Rabiner-Juang set**

A comprehensive table of step patterns is proposed in Rabiner-Juang’s
book (Rabiner1993).
The classification foresees seven types, labelled with Roman numerals
I-VII. Each type has four slope weighting sub-types, named in
“Type (a)” to “Type (d)”.

     Subtype | Rule       | Norm | Unbiased 
     --------|------------|------|---------
        a    | min step   |  --  |   NO 
        b    | max step   |  --  |   NO 
        c    | Di step    |   N  |  YES 
        d    | Di+Dj step | N+M  |  YES 

**3. The Sakoe-Chiba set**

Sakoe-Chiba (Sakoe1978) discuss a family of slope-constrained patterns;
they are implemented as well. Here, they are called
``symmetricP<x>`` and ``asymmetricP<x>``, where ``<x>`` corresponds to
Sakoe’s integer slope parameter *P*. Values available are accordingly:
``0`` (no constraint), ``1``, ``05`` (one half) and ``2``. See
(Sakoe1978) for details.

**4. The Rabiner-Myers set**

The ``type<XX><y>`` step patterns follow the older Rabiner-Myers’
classification proposed in (Myers1980) and (MRR1980). Note that this is
a subset of the Rabiner-Juang set (Rabiner1993), and the latter should
be preferred in order to avoid confusion. ``<XX>`` is a Roman numeral
specifying the shape of the transitions; ``<y>`` is a letter in the
range ``a-d`` specifying the weighting used per step, as above;
``typeIIx`` patterns also have a version ending in ``s``, meaning the
smoothing is used (which does not permit skipping points). The
``typeId, typeIId`` and ``typeIIds`` are unbiased and symmetric.

References
----------

-  (GiorginoJSS) Toni Giorgino. *Computing and Visualizing Dynamic Time
   Warping Alignments in R: The dtw Package.* Journal of Statistical
   Software, 31(7), 1-24.
   `doi:10.18637/jss_v031.i07 <https://doi.org/10.18637/jss_v031.i07>`__
-  (Itakura1975) Itakura, F., *Minimum prediction residual principle
   applied to speech recognition,* Acoustics, Speech, and Signal
   Processing, IEEE Transactions on , vol.23, no.1, pp. 67-72, Feb 1975.
   `doi:10.1109/TASSP.1975.1162641 <https://doi.org/10.1109/TASSP.1975.1162641>`__
-  (MRR1980) Myers, C.; Rabiner, L. & Rosenberg, A. *Performance
   tradeoffs in dynamic time warping algorithms for isolated word
   recognition*, IEEE Trans. Acoust., Speech, Signal Process., 1980, 28,
   623-635.
   `doi:10.1109/TASSP.1980.1163491 <https://doi.org/10.1109/TASSP.1980.1163491>`__
-  (Mori2006) Mori, A.; Uchida, S.; Kurazume, R.; Taniguchi, R.;
   Hasegawa, T. & Sakoe, H. Early Recognition and Prediction of Gestures
   Proc. 18th International Conference on Pattern Recognition ICPR 2006,
   2006, 3, 560-563.
   `doi:10.1109/ICPR.2006.467 <https://doi.org/10.1109/ICPR.2006.467>`__
-  (Myers1980) Myers, Cory S. *A Comparative Study Of Several Dynamic
   Time Warping Algorithms For Speech Recognition*, MS and BS thesis,
   Dept. of Electrical Engineering and Computer Science, Massachusetts
   Institute of Technology, archived Jun 20 1980,
   https://hdl_handle_net/1721.1/27909
-  (Rabiner1993) Rabiner, L. R., & Juang, B.-H. (1993). *Fundamentals of
   speech recognition.* Englewood Cliffs, NJ: Prentice Hall.
-  (Sakoe1978) Sakoe, H.; Chiba, S., *Dynamic programming algorithm
   optimization for spoken word recognition,* Acoustics, Speech, and
   Signal Processing, IEEE Transactions on , vol.26, no.1, pp. 43-49,
   Feb 1978
   `doi:10.1109/TASSP.1978.1163055 <https://doi.org/10.1109/TASSP.1978.1163055>`__
"""

def _num_to_str(num):
    if type(num) == int:
        return str(num)
    elif type(num) == float:
        return "{0:1.2f}".format(num)
    else:
        return str(num)

[docs]class BasePattern(): """Step pattern base class. A ``BasePattern`` object lists the transitions allowed while searching for the minimum-distance path. **Methods** ``Plot`` ``_normalize`` ``_get_array`` ``__repr__`` prints a user-readable description of the recurrence equation defined by the given pattern. """
[docs] def __init__(self): # number of patterns self.num_pattern = len(self.pattern_info) # max length of pattern self.max_pattern_len = max([len(pi["indices"]) for pi in self.pattern_info]) self._get_array()
@property def is_normalizable(self): return self.normalize_guide != "none"
[docs] def plot(self): """Visualize step pattern. """ plt.figure(figsize=(6,6)) if not hasattr(self, "_graph"): self._gen_graph() nx.draw_networkx_nodes(self._graph, pos=self._graph_layout) nx.draw_networkx_edges(self._graph, pos=self._graph_layout) nx.draw_networkx_edge_labels(self._graph, pos=self._graph_layout, edge_labels=self._edge_labels) min_index = min([min(pat["indices"][0]) for pat in self.pattern_info]) plt.xlim([min_index - 0.5, 0.5]) plt.ylim([min_index - 0.5, 0.5]) plt.title(self.label + str(" pattern")) plt.xlabel("query index") plt.ylabel("reference index") plt.show()
def _normalize(self, row, n, m): """Normalize **Details** It is necessary to normalize the log measurements that have been selected for dynamic time warping. Variation in different units such as Hz or ms is unlikely to be equivalent! It appears that how parameters are normalized `normalization_guide` plays a big role in the overall success of the DTW algorithm. row : 1D array expect last row of D n : int length of query (D.shape[0]) m : int length of reference (D.shape[1]) """ if not self.is_normalizable: return None if self.normalize_guide == "N+M": return row/(n + np.arange(1, m+1)) elif self.normalize_guide == "N": return row/n elif self.normalize_guide == "M": return row/np.arange(1, m+1) else: raise Exception() def _gen_graph(self): graph = nx.DiGraph() graph_layout = dict() edge_labels = dict() node_names = [] # set node for pidx, pat in enumerate(self.pattern_info): step_len = len(pat["indices"]) nn = [] for sidx in range(step_len): node_name = str(pidx) + str(sidx) graph.add_node(node_name) graph_layout[node_name] = \ np.array(pat["indices"][sidx]) nn.append(node_name) node_names.append(nn) # set edge for pidx, pat in enumerate(self.pattern_info): step_len = len(pat["indices"]) for sidx in range(step_len-1): graph.add_edge(node_names[pidx][sidx], node_names[pidx][sidx+1]) edge_labels[(node_names[pidx][sidx], node_names[pidx][sidx+1])] = _num_to_str(pat["weights"][sidx]) self._graph = graph self._graph_layout = graph_layout self._edge_labels = edge_labels def _get_array(self): """Get pattern information as np.ndarray. (number of patterns * max number of steps * 3(Node*2,Edge*1)) ex) in the case of asymmetricP1: number of patterns: 3 max number of steps: 3 --------------- Node | Edge ---------|----- -1 | -2 | --- ----|----|----- 0 | -1 | 0.5 ----|----|----- 0 | 0 | 0.5 --------------- --------------- Node | Edge ---------|----- -1 | -1 | --- ----|----|----- 0 | 0 | 1.0 ----|----|----- 0 | 0 | --- --------------- --------------- Node | Edge ---------|----- -2 | -1 | --- ----|----|----- -1 | 0 | 1.0 ----|----|----- 0 | 0 | 1.0 --------------- """ array = np.zeros([self.num_pattern, self.max_pattern_len, 3], dtype="float") for pidx in range(self.num_pattern): pattern_len = len(self.pattern_info[pidx]["indices"]) for sidx in range(pattern_len): array[pidx, sidx, 0:2] = self.pattern_info[pidx]["indices"][sidx] if sidx == 0: array[pidx, sidx, 2] = np.nan else: array[pidx, sidx, 2] = self.pattern_info[pidx]["weights"][sidx-1] self.array = array def __repr__(self): p_info = self.pattern_info rv = self.label + " pattern: \n\n" for pidx in range(self.num_pattern): rv += "pattern " + str(pidx) + ": " pattern_len = len(p_info[pidx]["indices"]) p_str = str(p_info[pidx]["indices"][0]) for sidx in range(1, pattern_len): p_str += " - [" p_str += _num_to_str(p_info[pidx]["weights"][sidx-1]) p_str += "] - " p_str += str(p_info[pidx]["indices"][sidx]) rv += p_str + "\n" rv += "\nnormalization guide: " + self.normalize_guide return rv
[docs]class Symmetric1(BasePattern): label = "symmetric1" pattern_info = [ dict( indices=[(-1, 0), (0, 0)], weights=[1] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(0, -1), (0, 0)], weights=[1] ) ] normalize_guide = "none" def __init__(self): super().__init__()
[docs]class Symmetric2(BasePattern): label = "symmetric2" pattern_info = [ dict( indices=[(-1, 0), (0, 0)], weights=[1] ), dict( indices=[(-1, -1), (0, 0)], weights=[2] ), dict( indices=[(0, -1), (0, 0)], weights=[1] ) ] normalize_guide = "N+M" def __init__(self): super().__init__()
[docs]class SymmetricP0(Symmetric2): """Same as symmetric2 pattern.""" label = "symmetricP05"
[docs]class SymmetricP05(BasePattern): label = "symmetricP05" pattern_info = [ dict( indices=[(-1, -3), (0, -2), (0, -1), (0, 0)], weights=[2, 1, 1] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[2, 1] ), dict( indices=[(-1, -1), (0, 0)], weights=[2] ), dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[2, 1] ), dict( indices=[(-3, -1), (-2, 0), (-1, 0), (0, 0)], weights=[2, 1, 1] ) ] normalize_guide = "N+M" def __init__(self): super().__init__()
[docs]class SymmetricP1(BasePattern): label = "symmetricP1" pattern_info = [ dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[2, 1] ), dict( indices=[(-1, -1), (0, 0)], weights=[2] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[2, 1] ) ] normalize_guide = "N+M" def __init__(self): super().__init__()
[docs]class SymmetricP2(BasePattern): label = "symmetricP2" pattern_info = [ dict( indices=[(-3, -2), (-2, -1), (-1, 0), (0, 0)], weights=[2, 2, 1] ), dict( indices=[(-1, -1), (0, 0)], weights=[2] ), dict( indices=[(-2, -3), (-1, -2), (0, -1), (0, 0)], weights=[2, 2, 1] ) ] normalize_guide = "N+M" def __init__(self): super().__init__()
[docs]class Asymmetric(BasePattern): label = "asymmetric" pattern_info = [ dict( indices=[(-1, 0), (0, 0)], weights=[1] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, -2), (0, 0)], weights=[1] ) ] normalize_guide = "N" def __init__(self): super().__init__()
[docs]class AsymmetricP0(BasePattern): label = "asymmetricP0" pattern_info = [ dict( indices=[(0, -1), (0, 0)], weights=[0] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, 0), (0, 0)], weights=[1] ) ] normalize_guide = "N" def __init__(self): super().__init__()
[docs]class AsymmetricP05(BasePattern): label = "asymmetricP05" pattern_info = [ dict( indices=[(-1, -3), (0, -2), (0, -1), (0, 0)], weights=[1/3, 1/3, 1/3] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[0.5, 0.5] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[1, 1] ), dict( indices=[(-3, -1), (-2, 0), (-1, 0), (0, 0)], weights=[1, 1, 1] ) ] normalize_guide = "N" def __init__(self): super().__init__()
[docs]class AsymmetricP1(BasePattern): label = "asymmetricP1" pattern_info = [ dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[0.5, 0.5] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[1, 1] ) ] normalize_guide = "N" def __init__(self): super().__init__()
[docs]class AsymmetricP2(BasePattern): label = "asymmetricP2" pattern_info = [ dict( indices=[(-2, -3), (-1, -2), (0, -1), (0, 0)], weights=[2/3, 2/3, 2/3] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-3, -2), (-2, -1), (-1, 0), (0, 0)], weights=[1, 1, 1] ) ] normalize_guide = "N" def __init__(self): super().__init__()
[docs]class TypeIa(BasePattern): label = "typeIa" pattern_info = [ dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[1, 0] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[1, 0] ) ] normalize_guide = "none" def __init__(self): super().__init__()
[docs]class TypeIb(BasePattern): label = "typeIb" pattern_info = [ dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[1, 1] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[1, 1] ), ] normalize_guide = "none" def __init__(self): super().__init__()
[docs]class TypeIc(BasePattern): label = "typeIc" pattern_info = [ dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[1, 1] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[1, 0] ), ] normalize_guide = "N" def __init__(self): super().__init__()
[docs]class TypeId(BasePattern): label = "typeId" pattern_info = [ dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[2, 1] ), dict( indices=[(-1, -1), (0, 0)], weights=[2] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[2, 1] ), ] normalize_guide = "N+M" def __init__(self): super().__init__()
[docs]class TypeIas(BasePattern): label = "typeIas" pattern_info = [ dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[0.5, 0.5] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[0.5, 0.5] ), ] normalize_guide = "none" def __init__(self): super().__init__()
[docs]class TypeIbs(BasePattern): label = "typeIbs" pattern_info = [ dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[1, 1] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[1, 1] ), ] normalize_guide = "none" def __init__(self): super().__init__()
[docs]class TypeIcs(BasePattern): label = "typeIcs" pattern_info = [ dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[1, 1] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[0.5, 0.5] ), ] normalize_guide = "N" def __init__(self): super().__init__()
[docs]class TypeIds(BasePattern): label = "typeIds" pattern_info = [ dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[1.5, 1.5] ), dict( indices=[(-1, -1), (0, 0)], weights=[2] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[1.5, 1.5] ), ] normalize_guide = "N+M" def __init__(self): super().__init__()
[docs]class TypeIIa(BasePattern): label = "typeIIa" pattern_info = [ dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, -2), (0, 0)], weights=[1] ), dict( indices=[(-2, -1), (0, 0)], weights=[1] ), ] normalize_guide = "none" def __init__(self): super().__init__()
[docs]class TypeIIb(BasePattern): label = "typeIIb" pattern_info = [ dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, -2), (0, 0)], weights=[2] ), dict( indices=[(-2, -1), (0, 0)], weights=[2] ), ] normalize_guide = "none" def __init__(self): super().__init__()
[docs]class TypeIIc(BasePattern): label = "typeIIc" pattern_info = [ dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, -2), (0, 0)], weights=[1] ), dict( indices=[(-2, -1), (0, 0)], weights=[2] ), ] normalize_guide = "none" def __init__(self): super().__init__()
[docs]class TypeIId(BasePattern): label = "typeIId" pattern_info = [ dict( indices=[(-1, -1), (0, 0)], weights=[2] ), dict( indices=[(-1, -2), (0, 0)], weights=[3] ), dict( indices=[(-2, -1), (0, 0)], weights=[3] ), ] normalize_guide = "N+M" def __init__(self): super().__init__()
[docs]class TypeIIIc(BasePattern): label = "typeIIIc" pattern_info = [ dict( indices=[(-1, -2), (0, 0)], weights=[1] ), dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[1, 1] ), dict( indices=[(-2, -2), (-1, 0), (0, 0)], weights=[1, 1] ), ] normalize_guide = "N" def __init__(self): super().__init__()
[docs]class TypeIVc(BasePattern): label = "typeIVc" pattern_info = [ dict( indices=[(-1, -1), (0, 0)], weights=[1] ), dict( indices=[(-1, -2), (0, 0)], weights=[1] ), dict( indices=[(-1, -3), (0, 0)], weights=[1] ), dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[1, 1] ), dict( indices=[(-2, -2), (-1, 0), (0, 0)], weights=[1, 1] ), dict( indices=[(-2, -3), (-1, 0), (0, 0)], weights=[1, 1] ), dict( indices=[(-3, -1), (-2, 0), (-1, 0), (0, 0)], weights=[1, 1, 1] ), dict( indices=[(-3, -2), (-2, 0), (-1, 0), (0, 0)], weights=[1, 1, 1] ), dict( indices=[(-3, -3), (-2, 0), (-1, 0), (0, 0)], weights=[1, 1, 1] ), ] normalize_guide = "N" def __init__(self): super().__init__()
[docs]class Mori2006(BasePattern): label = "mori2006" pattern_info = [ dict( indices=[(-2, -1), (-1, 0), (0, 0)], weights=[2, 1] ), dict( indices=[(-1, -1), (0, 0)], weights=[3] ), dict( indices=[(-1, -2), (0, -1), (0, 0)], weights=[3, 3] ), ] normalize_guide = "M" def __init__(self): super().__init__()
[docs]class Unitary(BasePattern): label = "unitary" pattern_info = [ dict( indices=[(-1, -1), (0, 0)], weights=[1] ), ] normalize_guide = "N" def __init__(self): super().__init__()
[docs]class UserStepPattern(BasePattern): label = "user defined step pattern"
[docs] def __init__(self, pattern_info, normalize_guide): """User defined step pattern. Parameters ---------- pattern_info : list list contains pattern information. example: the case of symmetric2 pattern: pattern_info = [ dict( indices=[(-1,0),(0,0)], weights=[1] ), dict( indices=[(-1,-1),(0,0)], weights=[2] ), dict( indices=[(0,-1),(0,0)], weights=[1] ) ] normalize_guide : string ('N','M','N+M','none') Guide to compute normalized distance. """ # validation if normalize_guide not in ("N", "M", "N+M", "none"): raise ValueError("normalize_guide argument must be \ one of followings: 'N','M','N+M','none'") self.pattern_info = pattern_info self.normalize_guide = normalize_guide # number of patterns self.num_pattern = len(self.pattern_info) # max length of pattern self.max_pattern_len = max([len(pi["indices"]) for pi in self.pattern_info]) self._get_array()