# Source code for ktk.cycles

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020-2025 Félix Chénier

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Identify cycles and time-normalize data."""
__author__ = "Félix Chénier"
__copyright__ = "Copyright (C) 2020-2025 Félix Chénier"
__email__ = "chenier.felix@uqam.ca"
__license__ = "Apache 2.0"


import numpy as np
from typing import cast
from kineticstoolkit.timeseries import TimeSeries, TimeSeriesEvent
from kineticstoolkit.exceptions import TimeSeriesEventNotFoundError
from tqdm import tqdm
from kineticstoolkit.typing_ import ArrayLike, check_param


def __dir__():
    """Return the names that form this module's public API."""
    public_api = (
        "detect_cycles",
        "time_normalize",
        "stack",
        "unstack",
        "most_repeatable_cycles",
    )
    return list(public_api)


def detect_cycles(
    ts: TimeSeries,
    data_key: str,
    *,
    event_names: tuple[str, str] = ("phase1", "phase2"),
    thresholds: tuple[float, float] = (0.0, 1.0),
    directions: tuple[str, str] = ("rising", "falling"),
    min_durations: tuple[float, float] = (0.0, 0.0),
    max_durations: tuple[float, float] = (np.inf, np.inf),
    min_peak_heights: tuple[float, float] = (-np.inf, -np.inf),
    max_peak_heights: tuple[float, float] = (np.inf, np.inf),
) -> TimeSeries:
    """
    Detect cycles in a TimeSeries based on a dual threshold approach.

    This function detects biphasic cycles and identifies the transitions as
    new events in the output TimeSeries. These new events are named:

    - event_names[0]: corresponds to the start of phase 1
    - event_names[1]: corresponds to the start of phase 2
    - "_": corresponds to the end of the cycle.

    Parameters
    ----------
    ts
        TimeSeries to analyze.
    data_key
        Name of the data key to analyze in the TimeSeries. This data must be
        unidimensional.
    event_names
        Optional. Event names to add in the output TimeSeries. Default is
        ("phase1", "phase2").
    thresholds
        Optional. Values to cross to register phase changes. Default is
        (0.0, 1.0).
    directions
        Optional. Directions to cross thresholds to register phase changes.
        Either ("rising", "falling") or ("falling", "rising"). Default is
        ("rising", "falling").
    min_durations
        Optional. Minimal phase durations in seconds. Default is (0.0, 0.0).
    max_durations
        Optional. Maximal phase durations in seconds. Default is
        (np.inf, np.inf).
    min_peak_heights
        Optional. Minimal peak values to be reached in both phases. Default
        is (-np.inf, -np.inf).
    max_peak_heights
        Optional. Maximal peak values to be reached in both phases. Default
        is (np.inf, np.inf).

    Returns
    -------
    TimeSeries
        A copy of `ts` with the events added.

    """
    check_param("ts", ts, TimeSeries)
    check_param("data_key", data_key, str)
    event_names = cast(tuple[str, str], tuple(event_names))
    check_param(
        "event_names",
        event_names,
        tuple,
        length=2,
        contents_type=str,
    )
    thresholds = cast(tuple[float, float], tuple(thresholds))
    check_param(
        "thresholds",
        thresholds,
        tuple,
        length=2,
        contents_type=float,
    )
    directions = cast(tuple[str, str], tuple(directions))
    check_param(
        "directions",
        directions,
        tuple,
        length=2,
        contents_type=str,
    )
    min_durations = cast(tuple[float, float], tuple(min_durations))
    check_param(
        "min_durations",
        min_durations,
        tuple,
        length=2,
        contents_type=float,
    )
    max_durations = cast(tuple[float, float], tuple(max_durations))
    check_param(
        "max_durations",
        max_durations,
        tuple,
        length=2,
        contents_type=float,
    )
    min_peak_heights = cast(tuple[float, float], tuple(min_peak_heights))
    check_param(
        "min_peak_heights",
        min_peak_heights,
        tuple,
        length=2,
        contents_type=float,
    )
    max_peak_heights = cast(tuple[float, float], tuple(max_peak_heights))
    check_param(
        "max_peak_heights",
        max_peak_heights,
        tuple,
        length=2,
        contents_type=float,
    )

    # lowercase directions
    directions = (directions[0].lower(), directions[1].lower())
    if directions[0] != "rising" and directions[0] != "falling":
        raise ValueError("directions[0] must be 'rising' or 'falling'")

    # Find the pushes
    time = ts.time
    data = ts.data[data_key]

    events = []
    is_phase1 = True

    for i in tqdm(range(time.shape[0]), desc="Detecting cycles", delay=1):
        # Only directions[0] drives the state machine: phase 1 ends when
        # thresholds[0] is crossed in directions[0], and phase 2 ends when
        # thresholds[1] is crossed back in the opposite direction.
        if directions[0] == "rising":
            crossing1 = data[i] >= thresholds[0]
            crossing2 = data[i] <= thresholds[1]
        else:
            crossing1 = data[i] <= thresholds[0]
            crossing2 = data[i] >= thresholds[1]

        if is_phase1 and crossing1:
            is_phase1 = False
            events.append(TimeSeriesEvent(time[i], event_names[0]))
        elif (not is_phase1) and crossing2:
            is_phase1 = True
            events.append(TimeSeriesEvent(time[i], event_names[1]))

    # Ensure that we start with event_name1 and that it's not on time0.
    # FIX: guard on the list length so that a signal with no detected
    # transition results in no cycle instead of raising an IndexError.
    while len(events) > 0 and (
        (events[0].name != event_names[0]) or (events[0].time == time[0])
    ):
        events = events[1:]

    # Remove cycles where criteria are not reached.
    valid_events = []

    for i_event in tqdm(
        range(0, len(events) - 1, 2),
        desc="Removing cycles that do not match the specified criteria",
        delay=1,
    ):
        time1 = events[i_event].time
        time2 = events[i_event + 1].time
        try:
            time3 = events[i_event + 2].time
        except IndexError:
            # Last cycle: phase 2 extends to the end of the TimeSeries.
            time3 = np.inf

        sub_ts1 = ts.get_ts_between_times(time1, time2, inclusive=True)
        sub_ts2 = ts.get_ts_between_times(time1, time3, inclusive=True)
        if directions[0] == "rising":
            the_peak1 = np.max(sub_ts1.data[data_key])
            the_peak2 = np.min(sub_ts2.data[data_key])
        else:
            the_peak1 = np.min(sub_ts1.data[data_key])
            the_peak2 = np.max(sub_ts2.data[data_key])

        if (
            time2 - time1 >= min_durations[0]
            and time2 - time1 <= max_durations[0]
            and time3 - time2 >= min_durations[1]
            and time3 - time2 <= max_durations[1]
            and the_peak1 >= min_peak_heights[0]
            and the_peak1 <= max_peak_heights[0]
            and the_peak2 >= min_peak_heights[1]
            and the_peak2 <= max_peak_heights[1]
        ):
            # Save it.
            valid_events.append(events[i_event])
            valid_events.append(events[i_event + 1])
            if not np.isinf(time3):
                valid_events.append(TimeSeriesEvent(time3, "_"))

    # Form the output timeseries
    tsout = ts.copy()
    for event in valid_events:
        tsout = tsout.add_event(event.time, event.name)

    return tsout
def time_normalize(
    ts: TimeSeries,
    event_name1: str,
    event_name2: str,
    *,
    n_points: int = 100,
    span: list[int] | None = None,
) -> TimeSeries:
    """
    Time-normalize cycles in a TimeSeries.

    This method time-normalizes the TimeSeries at each cycle defined by
    event_name1 and event_name2 on n_points. The time-normalized cycles are
    put end to end. For example, for a TimeSeries that contains three
    cycles, a time normalization with 100 points will give a TimeSeries of
    length 300. The TimeSeries' events are also time-normalized, including
    event_name1 but with event_name2 renamed as "_".

    Parameters
    ----------
    ts
        The TimeSeries to analyze.
    event_name1
        The event name that corresponds to the beginning of a cycle.
    event_name2
        The event name that corresponds to the end of a cycle.
    n_points
        Optional. The number of points of the output TimeSeries.
    span
        Optional. Specifies which normalized points to include in the
        output TimeSeries. See note below.

    Returns
    -------
    TimeSeries
        A new TimeSeries where each cycle has been time-normalized.

    Warning
    -------
    The span argument is experimental and was introduced in version 0.4.
    **The following behavior may change in the future**. Don't rely on it
    in long-term scripts for now.

    You can use it to define which normalized points to include in the
    output TimeSeries. For example, to normalize in percents and to include
    only data from 10 to 90% of each cycle, assign 100 to n_points and
    [10, 90] to span. The resulting TimeSeries will then be expressed in
    percents and wrap each 80 points.

    It is also possible to include pre-cycle or post-cycle data. For
    example, to normalize in percents and to include 20% pre-cycle and 15%
    post-cycle, assign 100 to n_points and [-20, 15] to span. The resulting
    TimeSeries will then wrap each 135 points with the cycles starting at
    20, 155, etc. and ending at 119, 254, etc. For each cycle, events
    outside the 0-100% spans are ignored.

    """
    check_param("ts", ts, TimeSeries)
    check_param("event_name1", event_name1, str)
    check_param("event_name2", event_name2, str)
    check_param("n_points", n_points, int)
    if span is None:
        span = [0, n_points]
    else:
        span = list(span)
    check_param("span", span, list, length=2, contents_type=int)

    if len(ts.events) < 2:
        raise ValueError("This TimeSeries does not have events.")

    if ts.count_events(event_name1) == 0:
        raise ValueError(
            f"No occurrence of event `{event_name1}` was found in this "
            "TimeSeries."
        )
    if ts.count_events(event_name2) == 0:
        raise ValueError(
            f"No occurrence of event `{event_name2}` was found in this "
            "TimeSeries."
        )

    # Initialize the destination TimeSeries
    dest_ts = ts.copy()
    dest_ts.events = []
    if n_points == 100:
        dest_ts.add_info("Time", "Unit", "%", overwrite=True, in_place=True)
    else:
        dest_ts.add_info(
            "Time", "Unit", f"1/{n_points}", overwrite=True, in_place=True
        )

    dest_data = {}  # type: dict[str, list[np.ndarray]]
    dest_data_shape = {}  # type: dict[str, tuple[int, ...]]

    # Go through all cycles
    i_cycle = 0
    break_now = False
    while True:
        # Get the begin time for this cycle
        try:
            event_index = ts._get_event_index(event_name1, i_cycle)
        except TimeSeriesEventNotFoundError:
            break_now = True
        else:
            begin_time = ts.events[event_index].time
        if break_now:
            break

        # Get the end time for this cycle: the first occurrence of
        # event_name2 that happens strictly after begin_time.
        end_cycle = 0
        end_time = ts.events[ts._get_event_index(event_name2, end_cycle)].time
        while end_time <= begin_time:
            end_cycle += 1
            try:
                end_time = ts.events[
                    ts._get_event_index(event_name2, end_cycle)
                ].time
            except TimeSeriesEventNotFoundError:
                break_now = True
                break
        if break_now:
            break

        # Get the extended begin and end times considering relative_span
        extended_begin_time = begin_time + span[0] / n_points * (
            end_time - begin_time
        )
        extended_end_time = begin_time + span[1] / n_points * (
            end_time - begin_time
        )

        # Extract this cycle
        subts = ts.get_ts_between_times(
            extended_begin_time, extended_end_time, inclusive=True
        )
        if subts.time.shape[0] == 0:
            # FIX: was `raise ValueError("")` — raise a diagnosable
            # message instead of an empty one.
            raise ValueError(
                f"No data was found between times {extended_begin_time} "
                f"and {extended_end_time}, which corresponds to cycle "
                f"{i_cycle} as defined by events `{event_name1}` and "
                f"`{event_name2}`."
            )

        # Resample this cycle on span + 1 point
        # (and remove the last point after)
        subts = subts.resample(
            np.linspace(
                extended_begin_time,
                extended_end_time,
                span[1] - span[0] + 1,
            ),
            extrapolate=True,
        )

        # Keep only the first points (the last one belongs to the next
        # cycle)
        subts = subts.get_ts_between_indexes(
            0, span[1] - span[0] - 1, inclusive=True
        )

        # Keep only the events in the unextended span
        events = []
        for event in subts.events:
            if event.time >= begin_time and event.time < end_time:
                events.append(event)
        subts.events = events

        # Separate start/end events from the other
        start_end_events = []
        other_events = []
        for event in subts.events:
            if event.name == event_name1 or event.name == event_name2:
                start_end_events.append(event)
            else:
                other_events.append(event)

        # Add event_name1 at the beginning and end (duplicates will be
        # cancelled at the end)
        dest_ts = dest_ts.add_event(
            float(-span[0] + i_cycle * (span[1] - span[0])), event_name1
        )
        dest_ts = dest_ts.add_event(
            float(-span[0] + n_points + i_cycle * (span[1] - span[0])), "_"
        )

        # Add the other events
        def time_to_normalized_time(time):
            """Resample the events times."""
            return (time - extended_begin_time) / (
                extended_end_time - extended_begin_time
            ) * (span[1] - span[0]) + i_cycle * (span[1] - span[0])

        for event in other_events:
            # Resample
            new_time = time_to_normalized_time(event.time)
            dest_ts = dest_ts.add_event(new_time, event.name)

        # Add this cycle to dest_time and dest_data
        for key in subts.data:
            if key not in dest_data:
                dest_data[key] = []
                dest_data_shape[key] = ts.data[key].shape
            dest_data[key].append(subts.data[key])

        i_cycle += 1

    n_cycles = i_cycle
    if n_cycles == 0:
        # FIX: without this guard, the reshape loop below crashed with a
        # cryptic KeyError when both events exist but no occurrence of
        # event_name2 follows an occurrence of event_name1.
        raise ValueError(
            f"No complete cycle was found: no occurrence of event "
            f"`{event_name2}` happens after an occurrence of event "
            f"`{event_name1}`."
        )

    # Put back dest_time and dest_data in dest_ts
    dest_ts.time = 1.0 * np.arange(n_cycles * (span[1] - span[0]))
    for key in ts.data:
        # Stack the data into a [cycle, percent, values] shape
        temp = np.array(dest_data[key])

        # Reshape to put all cycles end to end
        new_shape = list(dest_data_shape[key])
        new_shape[0] = n_cycles * (span[1] - span[0])
        dest_ts.data[key] = np.reshape(temp, new_shape)

    return dest_ts
def stack(ts: TimeSeries, *, n_points: int = 100) -> dict[str, np.ndarray]:
    """
    Stack time-normalized TimeSeries data into a dict of arrays.

    This method returns the data of a time-normalized TimeSeries as a dict
    where each key corresponds to a TimeSeries data key, and contains a
    numpy array where the first dimension is the cycle, the second dimension
    is the percentage of the cycle, and the other dimensions are the data
    itself.

    Parameters
    ----------
    ts
        The time-normalized TimeSeries.
    n_points
        Optional. The number of points the TimeSeries has been
        time-normalized on.

    Returns
    -------
    dict[str, np.ndarray]

    See Also
    --------
    ktk.cycles.unstack

    """
    check_param("ts", ts, TimeSeries)
    check_param("n_points", n_points, int)

    # A time-normalized TimeSeries wraps every n_points samples, so its
    # length must be a whole number of cycles.
    if len(ts.time) % n_points != 0:
        raise ValueError(
            "It seems that this TimeSeries is not time-normalized."
        )

    stacked = dict()
    for key, value in ts.data.items():
        # Fold the time axis into (cycle, point); the trailing dimensions
        # are kept untouched.
        stacked[key] = value.reshape(
            (-1, n_points) + value.shape[1:], order="C"
        )

    return stacked
def unstack(data: dict[str, np.ndarray], /) -> TimeSeries:
    """
    Unstack time-normalized data from a dict of arrays to a TimeSeries.

    This method creates a time-normalized TimeSeries by putting each cycle
    from the provided data dictionary end to end.

    Parameters
    ----------
    data
        A dict where each key contains a numpy array where the first
        dimension is the cycle, the second dimension is the percentage of
        the cycle, and the other dimensions are the data itself.

    Returns
    -------
    TimeSeries

    See Also
    --------
    ktk.cycles.stack

    """
    check_param("data", data, dict, key_type=str, contents_type=np.ndarray)

    ts = TimeSeries()
    for key in data.keys():
        current_data = np.array(data[key])
        current_shape = current_data.shape
        n_cycles = current_shape[0]
        n_points = current_shape[1]
        # FIX: fold only (cycle, point) into one axis and keep the trailing
        # dimensions. The previous reshape to exactly [n_cycles * n_points]
        # raised a ValueError for data with extra dimensions, although the
        # docstring (and `stack`, the inverse operation) support them.
        ts.data[key] = current_data.reshape(
            [n_cycles * n_points, *current_shape[2:]], order="C"
        )

    # NOTE(review): assumes every key shares the same (n_cycles, n_points)
    # leading shape — the last key processed defines the time vector.
    ts.time = np.arange(n_cycles * n_points)
    if n_points == 100:
        ts.add_info("Time", "Unit", "%", overwrite=True, in_place=True)
    else:
        ts.add_info(
            "Time", "Unit", f"1/{n_points}", overwrite=True, in_place=True
        )
    return ts
# The stack_events function is working but commented for now, since I could not # figure an obvious, undiscutable way to represent its output (use lists, # TimeSeriesEvents, numpy arrays?). It's also unclear for me how to integrate # with the standard stack function and its unstack counterpart. # # def stack_events( # ts: TimeSeries, /, # n_points: int = 100) -> dict[str, np.ndarray]: # """ # Stack time-normalized TimeSeries' events into a dict of arrays. # This methods returns the a dictionary where each key corresponds to an # event name, and contains a 2d numpy array that contains the event's # normalized time, with the first dimension being the cycle and the second # dimension being the occurrence of this event during this cycle. # Warning # ------- # This function is currently experimental and may change signature and # behaviour in the future. # Parameters # ---------- # ts # The time-normalized TimeSeries. # n_points # Optional. The number of points the TimeSeries has been # time-normalized on. # Returns # ------- # dict[str, np.ndarray] # Example # ------- # >>> import kineticstoolkit.lab as ktk # >>> # Create a TimeSeries with different time-normalized events # >>> ts = ktk.TimeSeries(time=np.arange(400)) # 4 cycles of 100% # >>> ts = ts.add_event(9, "event1") # event1 at 9% of cycle 0 # >>> ts = ts.add_event(110, "event1") # event1 at 10% of cycle 1 # >>> ts = ts.add_event(312, "event1") # event1 at 12% of cycle 3 # >>> ts = ts.add_event(382, "event1") # 2nd occ. 
event1 at 82% of cycle 3 # >>> # Stack these events # >>> events = ktk.cycles.stack_events(ts) # >>> events["event1"] # [[9.0], [10.0], [], [12.0, 82.0]] # """ # ts = ts.copy() # n_cycles = int(ts.time.shape[0] / n_points) # out = {} # type: dict[str, np.ndarray] # # Init # for event in ts.events: # if event.name not in out: # out[event.name] = [[] for i in range(n_cycles)] # for event in ts.events: # event_cycle = int(event.time / n_points) # out[event.name][event_cycle].append(np.mod(event.time, n_points)) # return out
def most_repeatable_cycles(data: ArrayLike, /) -> list[int]:
    """
    Get the indexes of the most repeatable cycles in an array.

    This function returns an ordered list of the most repeatable to the
    least repeatable cycles. It works by recursively discarding the cycle
    that maximizes the root-mean-square error between the cycle and the
    average of every remaining cycle, until there are only two cycles
    remaining. The function returns a list that is the reverse order of
    cycle removal: first the two last cycles, then the last-removed cycle,
    and so on. If two cycles are equivalently repeatable, they are returned
    in order of appearance. Cycles that include at least one NaN are
    excluded.

    Parameters
    ----------
    data
        Stacked time-normalized data to analyze, in the shape
        (n_cycles, n_points).

    Returns
    -------
    list[int]
        List of indexes corresponding to the cycles in most to least
        repeatable order.

    Example
    -------
    >>> import kineticstoolkit.lab as ktk
    >>> import numpy as np

    >>> # Create a data sample with four different cycles, the most different
    >>> # being cycle 2 (cos instead of sin), then cycle 0.
    >>> x = np.arange(0, 10, 0.1)
    >>> data = np.array([np.sin(x), \
                         np.sin(x) + 0.14, \
                         np.cos(x) + 0.14, \
                         np.sin(x) + 0.15])

    >>> ktk.cycles.most_repeatable_cycles(data)
    [1, 3, 0, 2]

    """
    data = np.array(data)
    check_param("data", data, np.ndarray, ndims=2)
    n_cycles = data.shape[0]

    # Cycles discarded by the repeatability loop, least repeatable last.
    discard_order = []  # type: list[int]
    # Every cycle taken out of consideration, NaN-containing ones included.
    removed = []  # type: list[int]

    # Blank out every cycle that contains at least one NaN so that the
    # nan-aware reductions below simply skip it.
    for idx in range(n_cycles - 1, -1, -1):
        if np.isnan(np.sum(data[idx])):
            data[idx] = np.nan
            removed.append(idx)

    # Repeatedly drop the cycle that deviates the most (in RMS terms) from
    # the mean of the cycles still standing, until only two remain.
    while len(removed) < n_cycles - 2:
        mean_cycle = np.nanmean(data, axis=0)
        rms = np.zeros(n_cycles)
        for idx in range(n_cycles - 1, -1, -1):
            rms[idx] = np.sqrt(
                np.mean(np.sum((data[idx] - mean_cycle) ** 2))
            )
        worst = int(np.nanargmax(rms))
        discard_order.append(worst)
        removed.append(worst)
        data[worst] = np.nan

    # The (at most two) survivors are the most repeatable; list them first,
    # in order of appearance.
    survivors = sorted(set(range(n_cycles)) - set(removed))
    if len(survivors) > 1:
        discard_order.append(survivors[1])
    if len(survivors) > 0:
        discard_order.append(survivors[0])

    # Reverse removal order = most repeatable first.
    return discard_order[::-1]
if __name__ == "__main__":
    # Run this module's doctests when executed as a script.
    import doctest

    doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)