Source code for kineticstoolkit.timeseries

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020-2024 Félix Chénier

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Provide the TimeSeries and TimeSeriesEvent classes.

The classes defined in this module are accessible directly from the toplevel
Kinetics Toolkit's namespace (i.e. ktk.TimeSeries, ktk.TimeSeriesEvent)

"""
from __future__ import annotations  # For forward refs to self


__author__ = "Félix Chénier"
__copyright__ = "Copyright (C) 2020-2024 Félix Chénier"
__email__ = "chenier.felix@uqam.ca"
__license__ = "Apache 2.0"


import kineticstoolkit._repr
from kineticstoolkit.decorators import deprecated
from kineticstoolkit.exceptions import (
    TimeSeriesRangeError,
    TimeSeriesEventNotFoundError,
    TimeSeriesMergeConflictError,
)
from kineticstoolkit.tools import check_interactive_backend

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import scipy as sp
import pandas as pd
import limitedinteraction as li
from dataclasses import dataclass
from kineticstoolkit.typing_ import ArrayLike, check_param
from typing import Any, cast
from numbers import Real

import warnings
from ast import literal_eval
from copy import deepcopy

import kineticstoolkit as ktk  # For doctests


WINDOW_PLACEMENT = {"top": 50, "right": 0}


class TimeSeriesEventList(list):
    """Event list that ensures every element is a TimeSeriesEvent."""

    def __init__(self, source: list = []):
        """Initialize the class instance using a source list."""
        check_param("source", source, list)
        for element in source:
            self.append(element)

    def __setitem__(self, index, value):
        """Cast the value to a TimeSeriesEvent."""
        check_param("index", index, int)
        try:
            event = TimeSeriesEvent(time=value.time, name=value.name)
        except AttributeError:
            raise AttributeError(
                f"The provided value {value} cannot be interpreted as a "
                "TimeSeriesEvent, because it does not have `time` and `name` "
                "attributes."
            )
        super(TimeSeriesEventList, self).__setitem__(index, event)
        # Sort the events
        self.sort()

    def append(self, value):
        """Ensure the appended value is a TimeSeriesEvent."""
        super(TimeSeriesEventList, self).append(None)
        self[-1] = value  # Calls __setitem__ which does the check

    def extend(self, values):
        """Ensure the extended values are TimeSeriesEvent."""
        for value in values:
            self.append(value)  # Calls append that calls __setitem__ that
            # does the check.


class TimeSeriesDataDict(dict):
    """Data dictionary that checks sizes and converts to NumPy arrays."""

    def __init__(self, source: dict = {}):
        """Initialize the class instance using a source dictionary."""
        check_param("source", source, dict, key_type=str)
        for key in source:
            self[key] = source[key]

    def __setitem__(self, key, value):
        """Cast the added data as a NumPy array."""
        check_param("key", key, str)
        to_set = np.array(value, copy=True)

        if len(to_set.shape) == 0:
            raise AttributeError(
                "Data must be an array. However, a value of "
                f"{value} was provided."
            )

        super(TimeSeriesDataDict, self).__setitem__(key, to_set)


[docs] @dataclass class TimeSeriesEvent: """ Define an event in a timeseries. This class is rarely used by itself, it is easier to use `TimeSeries`' methods to manage events. Attributes ---------- time : float Event time. name : str Event name. Does not need to be unique. Example ------- >>> event = ktk.TimeSeriesEvent(time=1.5, name="event_name") >>> event TimeSeriesEvent(time=1.5, name='event_name') """ time: float = 0.0 name: str = "event" def __lt__(self, other): """Define < operator.""" return self.time < other.time def __le__(self, other): """Define <= operator.""" return self.time <= other.time def __gt__(self, other): """Define > operator.""" return self.time > other.time def __ge__(self, other): """Define >= operator.""" return self.time >= other.time def _to_tuple(self) -> tuple[float, str]: """ Convert a TimeSeriesEvent to a tuple. Example ------- >>> event = ktk.TimeSeriesEvent(time=1.5, name="event_name") >>> event._to_tuple() (1.5, 'event_name') """ return (self.time, self.name) def _to_list(self) -> list[float | str]: """ Convert a TimeSeriesEvent to a list. Example ------- >>> event = ktk.TimeSeriesEvent(time=1.5, name="event_name") >>> event._to_list() [1.5, 'event_name'] """ return [self.time, self.name] def _to_dict(self) -> dict[str, float | str]: """ Convert a TimeSeriesEvent to a dict. Example ------- >>> event = ktk.TimeSeriesEvent(time=1.5, name="event_name") >>> event._to_dict() {'Time': 1.5, 'Name': 'event_name'} """ return {"Time": self.time, "Name": self.name}
[docs] class TimeSeries: """ A class that holds time, data series, events and metadata. Attributes ---------- time : np.ndarray Time attribute as 1-dimension np.array. data : dict[str, np.ndarray] Contains the data, where each element contains a np.array which first dimension corresponds to time. time_info : dict[str, Any] Contains metadata relative to time. The default is {"Unit": "s"} data_info : dict[str, dict[str, Any]] Contains facultative metadata relative to data. For example, the data_info attribute could indicate the unit of data["Forces"]:: data["Forces"] = {"Unit": "N"} To facilitate the management of data_info, please use `ktk.TimeSeries.add_data_info`. events : list[TimeSeriesEvent] list of events. Examples -------- A TimeSeries can be constructed from another TimeSeries, a Pandas DataFrame or any array with at least one dimension. 1. Creating an empty TimeSeries: >>> ktk.TimeSeries() TimeSeries with attributes: time: array([], dtype=float64) data: {} time_info: {'Unit': 's'} data_info: {} events: [] 2. Creating a TimeSeries and setting time and data: >>> ktk.TimeSeries(time=np.arange(0, 10), data={"test":np.arange(0, 10)}) TimeSeries with attributes: time: array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) data: {'test': array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])} time_info: {'Unit': 's'} data_info: {} events: [] 3. Creating a TimeSeries as a copy of another TimeSeries: >>> ts1 = ktk.TimeSeries(time=np.arange(0, 10), data={"test":np.arange(0, 10)}) >>> ts2 = ktk.TimeSeries(ts1) >>> ts2 TimeSeries with attributes: time: array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) data: {'test': array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])} time_info: {'Unit': 's'} data_info: {} events: [] See Also: TimeSeries.copy 4. Creating a TimeSeries from a Pandas DataFrame: >>> df = pd.DataFrame() >>> df.index = [0., 0.1, 0.2, 0.3, 0.4] # Time in seconds >>> df["x"] = [0., 1., 2., 3., 4.] >>> df["y"] = [5., 6., 7., 8., 9.] >>> df["z"] = [0., 0., 0., 0., 0.] >>> df x y z 0.0 0.0 5.0 0.0 0.1 1.0 6.0 0.0 0.2 2.0 7.0 0.0 0.3 3.0 8.0 0.0 0.4 4.0 9.0 0.0 >>> ts = ktk.TimeSeries(df) >>> ts TimeSeries with attributes: time: array([0. , 0.1, 0.2, 0.3, 0.4]) data: <dict with 3 entries> time_info: {'Unit': 's'} data_info: {} events: [] >>> ts.data {'x': array([0., 1., 2., 3., 4.]), 'y': array([5., 6., 7., 8., 9.]), 'z': array([0., 0., 0., 0., 0.])} See Also: TimeSeries.from_dataframe 5. Creating a multidimensional TimeSeries from a Pandas DataFrame (using brackets in column names): >>> df = pd.DataFrame() >>> df.index = [0., 0.1, 0.2, 0.3, 0.4] # Time in seconds >>> df["point[0]"] = [0., 1., 2., 3., 4.] >>> df["point[1]"] = [5., 6., 7., 8., 9.] >>> df["point[2]"] = [0., 0., 0., 0., 0.] >>> df point[0] point[1] point[2] 0.0 0.0 5.0 0.0 0.1 1.0 6.0 0.0 0.2 2.0 7.0 0.0 0.3 3.0 8.0 0.0 0.4 4.0 9.0 0.0 >>> ts = ktk.TimeSeries(df) >>> ts.data {'point': array([[0., 5., 0.], [1., 6., 0.], [2., 7., 0.], [3., 8., 0.], [4., 9., 0.]])} See Also: TimeSeries.from_dataframe 6. Creating a multidimensional TimeSeries of higher order from a Pandas DataFrame (using brackets and commas in column names): >>> df = pd.DataFrame() >>> df.index = [0., 0.1, 0.2, 0.3, 0.4] # Time in seconds >>> df["rot[0,0]"] = np.cos([0., 0.1, 0.2, 0.3, 0.4]) >>> df["rot[0,1]"] = -np.sin([0., 0.1, 0.2, 0.3, 0.4]) >>> df["rot[1,0]"] = np.sin([0., 0.1, 0.2, 0.3, 0.4]) >>> df["rot[1,1]"] = np.cos([0., 0.1, 0.2, 0.3, 0.4]) >>> df["trans[0]"] = [0., 0.1, 0.2, 0.3, 0.4] >>> df["trans[1]"] = [5., 6., 7., 8., 9.] >>> df rot[0,0] rot[0,1] rot[1,0] rot[1,1] trans[0] trans[1] 0.0 1.000000 -0.000000 0.000000 1.000000 0.0 5.0 0.1 0.995004 -0.099833 0.099833 0.995004 0.1 6.0 0.2 0.980067 -0.198669 0.198669 0.980067 0.2 7.0 0.3 0.955336 -0.295520 0.295520 0.955336 0.3 8.0 0.4 0.921061 -0.389418 0.389418 0.921061 0.4 9.0 >>> ts = ktk.TimeSeries(df) >>> ts.data {'rot': array([[[ 1. , -0. ], [ 0. , 1. ]], <BLANKLINE> [[ 0.99500417, -0.09983342], [ 0.09983342, 0.99500417]], <BLANKLINE> [[ 0.98006658, -0.19866933], [ 0.19866933, 0.98006658]], <BLANKLINE> [[ 0.95533649, -0.29552021], [ 0.29552021, 0.95533649]], <BLANKLINE> [[ 0.92106099, -0.38941834], [ 0.38941834, 0.92106099]]]), 'trans': array([[0. , 5. ], [0.1, 6. ], [0.2, 7. ], [0.3, 8. ], [0.4, 9. ]])} See Also: TimeSeries.from_dataframe 7. Creating a TimeSeries from any array (results in a TimeSeries with a single data key named "data" and with a matching time property with a period of 1 second - unless time attribute is also defined): >>> ktk.TimeSeries([0.1, 0.2, 0.3, 0.4, 0.5]) TimeSeries with attributes: time: array([0., 1., 2., 3., 4.]) data: {'data': array([0.1, 0.2, 0.3, 0.4, 0.5])} time_info: {'Unit': 's'} data_info: {} events: [] >>> ktk.TimeSeries([0.1, 0.2, 0.3, 0.4, 0.5], time=[0.1, 0.2, 0.3, 0.4, 0.5]) TimeSeries with attributes: time: array([0.1, 0.2, 0.3, 0.4, 0.5]) data: {'data': array([0.1, 0.2, 0.3, 0.4, 0.5])} time_info: {'Unit': 's'} data_info: {} events: [] See Also: TimeSerise.from_array """ # %% Initialization and properties def __init__( self, src: None | TimeSeries | pd.DataFrame | ArrayLike = None, *, time: ArrayLike = [], time_info: dict[str, Any] = {"Unit": "s"}, data: dict[str, ArrayLike] = {}, data_info: dict[str, dict[str, Any]] = {}, events: list[TimeSeriesEvent] = [], ): # Default constructor if src is None: self.time = time self.data = data self.time_info = time_info.copy() self.data_info = data_info.copy() self.events = events.copy() return # Else, construct based on a source: def _assign_self(src): self.time = src.time self.data = src.data self.time_info = src.time_info.copy() self.data_info = src.data_info.copy() self.events = src.events.copy() # If src is compatible with a TimeSeries, then assign it. try: _assign_self(src) return except AttributeError: pass # It was not a TimeSeries, or something compatible. # From DataFrame if isinstance(src, pd.DataFrame): _assign_self( TimeSeries.from_dataframe( src, time_info=time_info, data_info=data_info, events=events, ) ) return # Else, try as an array _assign_self( TimeSeries.from_array( np.array(src), time=time, time_info=time_info, data_info=data_info, events=events, ) ) # Properties @property def time(self): """Time Property.""" return self._time @time.setter def time(self, value): to_set = np.array(value, copy=True) if len(to_set.shape) != 1: raise AttributeError( "Time must be a unidimensional array. However, a value of " f"{value} was provided." ) self._time = to_set @time.deleter def time(self): raise AttributeError("time property cannot be deleted.") @property def data(self): """Data Property.""" return self._data @data.setter def data(self, value): self._data = TimeSeriesDataDict(value) @data.deleter def data(self): raise AttributeError("data property cannot be deleted.") @property def events(self): """Data Property.""" return self._events @events.setter def events(self, value): self._events = TimeSeriesEventList(value) @events.deleter def events(self): raise AttributeError("events property cannot be deleted.") # %% Dunders @classmethod def __dir__(cls): """Return the directory for the TimeSeries.""" return [ "copy", # Data info management "add_data_info", "remove_data_info", # Data management "get_subset", "merge", "add_data", "rename_data", "remove_data", # Time management "shift", "get_sample_rate", "resample", # Event management "add_event", "rename_event", "remove_event", "count_events", "remove_duplicate_events", "trim_events", # Get index from time "get_index_at_time", "get_index_before_time", "get_index_after_time", # Get index from event "get_index_at_event", "get_index_before_event", "get_index_after_event", # Get TimeSeries from index "get_ts_before_index", "get_ts_after_index", "get_ts_between_indexes", # Get TimeSeries from time "get_ts_before_time", "get_ts_after_time", "get_ts_between_times", # Get TimeSeries from event "get_ts_before_event", "get_ts_after_event", "get_ts_between_events", # Missing data "isnan", "fill_missing_samples", # Interactive and plotting "ui_edit_events", "ui_sync", "plot", # IO "to_dataframe", "from_dataframe", "from_array", ] def __str__(self): """ Print a textual descriptive of the TimeSeries contents. Returns ------- str String that describes the contents of each attribute ot the TimeSeries """ return kineticstoolkit._repr._format_class_attributes( self, overrides={"_time": "time", "_data": "data", "_events": "events"}, ) def __repr__(self): """Generate the class representation.""" return str(self) def __eq__(self, ts): """ Compare two timeseries for equality. Returns ------- True if each attribute of ts is equal to the TimeSeries' attributes. """ return self._is_equivalent(ts) # %% Private check functions def _is_equivalent( self, ts, *, equal: bool = True, atol: float = 1e-8, rtol: float = 1e-5 ): """ Test is two TimeSeries are equal or equivalent. Parameters ---------- ts The TimeSeries to compare to. equal Optional. True to test for complete equality, False to compare withint a given tolerance. atol Optional. Absolute tolerance if using equal=False. rtol Optional. Relative tolerance if using equal=False. Returns ------- bool True if the TimeSeries are equivalent. """ if equal: atol = 0 rtol = 0 def compare(var1, var2, atol, rtol): if var1.size == 0 and var2.size == 0: return np.equal(var1.shape, var2.shape) elif var1.size == 0 and var2.size != 0: return False elif var1.size != 0 and var2.size == 0: return False else: return np.allclose( var1, var2, atol=atol, rtol=rtol, equal_nan=True ) try: ts._check_well_typed() except AttributeError: print("The variable begin compared is not a TimeSeries.") if not compare(self.time, ts.time, atol=atol, rtol=rtol): print("Time is not equal") return False for data in [self.data, ts.data]: for one_data in data: try: if not compare( self.data[one_data], ts.data[one_data], atol=atol, rtol=rtol, ): print(f"{one_data} is not equal") return False except KeyError: print(f"{one_data} is missing in one of the TimeSeries") return False except ValueError: print( f"{one_data} does not have the same size in both " "TimeSeries" ) return False if self.time_info != ts.time_info: print("time_info is not equal") return False if self.data_info != ts.data_info: print("data_info is not equal") return False if self.events != ts.events: print("events is not equal") return False return True def _check_well_typed(self) -> None: """ Check that every element of every attribute has correct type. This is the most basic check: Every component of a TimeSeries must be of the correct type at each step of a code. Therefore, any other check* starts by calling this function. This is not a performance hit because apart from interactive functions (which are not affected by test overhead), the check functions are run in case of error only, to help the users in fixing their code. *except _check_increasing_time, which is run in proprocessing and not only in failures. Raises ------ AttributeError If the TimeSeries' miss some attributes. TypeError If the TimeSeries' attributes are of wrong type. """ # Ensure that the TimeSeries has all its attributes try: self.time except AttributeError: raise AttributeError( "This TimeSeries does not have a time attribute anymore." ) try: self.data except AttributeError: raise AttributeError( "This TimeSeries does not have a data attribute anymore." ) try: self.events except AttributeError: raise AttributeError( "This TimeSeries does not have a events attribute anymore." ) try: self.time_info except AttributeError: raise AttributeError( "This TimeSeries does not have a time_info attribute anymore." ) try: self.data_info except AttributeError: raise AttributeError( "This TimeSeries does not have a data_info attribute anymore." ) # Ensure that time is a numpy array of dimension 1. if not isinstance(self.time, np.ndarray): raise TypeError( "A TimeSeries' time attribute must be a numpy array. " f"However, the current time type is {type(self.time)}." ) if not np.all(~np.isnan(self.time)): raise TypeError( "A TimeSeries' time attribute must not contain nans. " f"However, a total of {np.sum(~np.isnan(self.time.shape))} " f"nans were found among the {self.time.shape[0]} samples of " "the TimeSeries." ) if not np.array_equal(np.unique(self.time), np.sort(self.time)): raise TypeError( "A TimeSeries' time attribute must not contain duplicates. " f"However, while the TimeSeries has {len(self.time)} samples, " f"only {len(np.unique(self.time))} are unique." ) # Ensure that the data attribute is a dict if not isinstance(self.data, dict): raise TypeError( "The TimeSeries data attribute must be a dict. However, " "this TimeSeries' data attribute is of type " f"{type(self.data)}." ) # Ensure that each data are numpy arrays for key in self.data: data = self.data[key] if not isinstance(data, np.ndarray): raise TypeError( "A TimeSeries' data attribute must contain only numpy " "arrays. However, at least one of the TimeSeries data " f"is not an array: the data named {key} contains a " f"value of type {type(data)}." ) # Ensure that events is a list of TimeSeriesEvent if not isinstance(self.events, list): raise TypeError( "The TimeSeries' events attribute must be a list. " "However, this TimeSeries' events attribute is of type " f"{type(self.events)}." ) # Ensure that all events are an instance of TimeSeriesEvent for i_event, event in enumerate(self.events): if not isinstance(event, TimeSeriesEvent): raise TypeError( "The TimeSeries' events attribute must be a list of " "TimeSeriesEvent. However, at least one element of this " f"list is not: element {i_event} is " f"of type {type(event)}." ) # Ensure that TimeInfo is a dict if not isinstance(self.time_info, dict): raise TypeError( "The TimeSeries' time_info attribute must be a dict. " "However, this TimeSeries' time_info attribute is of type " f"{type(self.time_info)}." ) # Ensure that DataInfo is a dict if not isinstance(self.data_info, dict): raise TypeError( "The TimeSeries' data_info attribute must be a dict. " "However, this TimeSeries' data_info attribute is of type " f"{type(self.data_info)}." ) # Ensure that every element of DataInfo is a dict for key in self.data_info: if not isinstance(self.data_info[key], dict): raise TypeError( "Each element of a TimeSeries' data_info attribute must " f"be a dict. However, the element '{key}' of this " "TimeSeries' data_info attribute is of type " f"{type(self.data_info[key])}." ) def _check_well_shaped(self) -> None: """ Check that the TimeSeries' time and data shapes concord. Raises ------ ValueError If the TimeSeries' time and data do not concord in shape. """ self._check_well_typed() if len(self.time.shape) != 1: raise TypeError( "A TimeSeries' time attribute must be a numpy array of " "dimension 1. However, the current time shape is " f"{self.time.shape}, which is a dimension of " f"{len(self.time.shape)}." ) for key in self.data: data = self.data[key] # Ensure that it's coherent in shape with time if data.shape[0] != self.time.shape[0]: raise ValueError( "Every data of a TimeSeries must have its first " "dimension corresponding to time. At least one of the " "TimeSeries data has a dimension problem: the data " f"named '{key}' has a shape of {data.shape} while the " f"time's shape is {self.time.shape}." ) def _check_not_empty_time(self) -> None: """ Check that the TimeSeries' time attribute is not empty. Raises ------ ValueError If the TimeSeries time is empty """ if self.time.shape[0] == 0: raise ValueError( "The TimeSeries is empty: the length of its time " "attribute is 0." ) def _check_increasing_time(self) -> None: """ Check that the TimeSeries' time attribute is always increasing. Raises ------ ValueError If the TimeSeries' time is not always increasing. """ if not np.array_equal(self.time, np.sort(self.time)): raise ValueError( "The TimeSeries' time attribute is not always increasing, " "which is required by the requested function. You can " "resample the TimeSeries on an always increasing time attribute " "using ts = ts.resample(np.sort(ts.time))." ) def _check_constant_sample_rate(self) -> None: """ Check that the TimeSeries's sampling rate is constant. Raises ------ ValueError If the TimeSeries's sampling rate is not constant. """ if np.isnan(self.get_sample_rate()): raise ValueError( "The TimeSeries's sample rate is not constant, which is " "required by the requested function. You can resample the " "TimeSeries on a constant sample rate using " "ts = ts.resample(np.linspace(" "np.min(ts.time), np.max(ts.time), len(ts.time)))." ) def _check_not_empty_data(self) -> None: """ Check that the TimeSeries's data dict is not empty. Raises ------ ValueError: If the TimeSeries as no time """ if len(self.data) == 0: raise ValueError( "The TimeSeries is empty: it does not contain any data." ) def _raise_data_key_error(self, data_key) -> None: raise KeyError( f"The key '{data_key}' was not found among the " f"{len(self.data)} key(s) of the TimeSeries' " "data_info attribute." ) def _raise_data_info_key_error(self, data_key, info_key) -> None: raise KeyError( f"The key '{info_key}' was not found among the " f"{len(self.data_info[data_key])} key(s) of the TimeSeries' " f"data_info[{data_key}] attribute." ) # %% Copy
[docs] def copy( self, *, copy_time: bool = True, copy_data: bool = True, copy_time_info: bool = True, copy_data_info: bool = True, copy_events: bool = True, ) -> TimeSeries: """ Deep copy of a TimeSeries. Parameters ---------- copy_time Optional. True to copy time to the new TimeSeries, False to keep the time attribute empty. Default is True. copy_data Optional. True to copy data to the new TimeSeries, False to keep the data attribute empty. Default is True. copy_time_info Optional. True to copy time_info to the new TimeSeries, False to keep the time_info attribute empty. Default is True. copy_data_info Optional. True to copy data_into to the new TimeSeries, False to keep the data_info attribute empty. Default is True. copy_events Optional. True to copy events to the new TimeSeries, False to keep the events attribute empty. Default is True. Returns ------- TimeSeries A deep copy of the TimeSeries. """ check_param("copy_time", copy_time, bool) check_param("copy_data", copy_data, bool) check_param("copy_time_info", copy_time_info, bool) check_param("copy_data_info", copy_data_info, bool) check_param("copy_events", copy_events, bool) self._check_well_typed() if copy_data and copy_time_info and copy_data_info and copy_events: # General case return deepcopy(self) else: # Specific cases ts = ktk.TimeSeries() if copy_time: ts.time = deepcopy(self.time) if copy_data: ts.data = deepcopy(self.data) if copy_time_info: ts.time_info = deepcopy(self.time_info) if copy_data_info: ts.data_info = deepcopy(self.data_info) if copy_events: ts.events = deepcopy(self.events) return ts
# %% Data info management
[docs] def add_data_info( self, data_key: str, info_key: str, value: Any, *, overwrite: bool = False, in_place: bool = False, ) -> TimeSeries: """ Add metadata to TimeSeries' data. Parameters ---------- data_key The data key the info corresponds to. info_key The key of the info dict. value The info. overwrite Optional. True to overwrite the data info if it is already present in the TimeSeries. Default is False. in_place Optional. True to modify the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries with the added data info. See Also -------- ktk.TimeSeries.remove_data_info Example ------- >>> ts = ktk.TimeSeries() >>> ts = ts.add_data_info("Forces", "Unit", "N") >>> ts = ts.add_data_info("Marker1", "Color", [43, 2, 255]) >>> ts.data_info["Forces"] {'Unit': 'N'} >>> ts.data_info["Marker1"] {'Color': [43, 2, 255]} """ check_param("data_key", data_key, str) check_param("info_key", info_key, str) check_param("overwrite", overwrite, bool) check_param("in_place", in_place, bool) self._check_well_typed() ts = self if in_place else self.copy() try: self.data_info[data_key][info_key] # It worked, therefore this data already exists. if overwrite is False: warnings.warn( f"A data info with same data_key ({data_key}) and " f"info_key ({info_key}) already exists in ths TimeSeries. " "Please use overwrite=True to suppress this warning. " "This warning will become an error in Kinetics Toolkit " "1.0." ) except KeyError: pass # This data does not exist yet. try: ts.data_info[data_key][info_key] = value except KeyError: ts.data_info[data_key] = {info_key: value} return ts
[docs] def remove_data_info( self, data_key: str, info_key: str, *, in_place: bool = False ) -> TimeSeries: """ Remove metadata from a TimeSeries' data. Parameters ---------- data_key The data key the info corresponds to. info_key The key of the info dict. in_place Optional. True to modify the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Returns ------- TimeSeries The TimeSeries with the removed data info. Raises ------ KeyError If this data_info could not be found. See Also -------- ktk.TimeSeries.add_data_info Example ------- >>> ts = ktk.TimeSeries() >>> ts = ts.add_data_info("Forces", "Unit", "N") >>> ts.data_info["Forces"] {'Unit': 'N'} >>> ts = ts.remove_data_info("Forces", "Unit") >>> ts.data_info["Forces"] {} """ check_param("data_key", data_key, str) check_param("info_key", info_key, str) check_param("in_place", in_place, bool) self._check_well_typed() ts = self if in_place else self.copy() try: data_info = ts.data_info[data_key] try: data_info.pop(info_key) except KeyError: self._raise_data_info_key_error(data_key, info_key) except KeyError: self._raise_data_key_error(data_key) return ts
# %% Data management
[docs] def add_data( self, data_key: str, data_value: ArrayLike, *, overwrite: bool = False, in_place: bool = False, ) -> TimeSeries: """ Add new data to the TimeSeries. Conceptually, these two lines are equivalent:: timeseries.data["name"] = value timeseries = timeseries.add_data(name, value) However, using add_data performs additional checks to ensure that no existing data is overwritten, and that time is matching for all data. See Raises section for more information on these checks. Parameters ---------- data_key Name of the data key. data_value Any data that can be converted to a NumPy array overwrite Optional. True to overwrite if there is already a data key of this name in the TimeSeries. Default is False. in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries with the added data. Raises ------ ValueError In any of these conditions: 1. If data with this key already exists and overwrite is False. 2. If the size of the data (first dimension) does not match the size of existing data or the existing time. 3. If data is a pandas DataFrame and its index does not match the existing time. See Also -------- ktk.TimeSeries.rename_data ktk.TimeSeries.remove_data Example ------- >>> ts = ktk.TimeSeries() >>> ts = ts.add_data("data1", [1.0, 2.0, 3.0]) >>> ts = ts.add_data("data2", [4.0, 5.0, 6.0]) >>> ts TimeSeries with attributes: time: array([], dtype=float64) data: {'data1': array([1., 2., 3.]), 'data2': array([4., 5., 6.])} time_info: {'Unit': 's'} data_info: {} events: [] """ check_param("data_key", data_key, str) check_param("overwrite", overwrite, bool) check_param("in_place", in_place, bool) ts = self if in_place else self.copy() # Cast data data_to_add = np.array(data_value) # Will be set at the very end # Check that the data fits with the TimeSeries' time (if it exists) if ts.time.shape[0] != 0: # If this is a Pandas DataFrame, check that its index is fully # compatible with time if isinstance(data_value, pd.DataFrame): if (ts.time.shape[0] != data_to_add.shape[0]) or ( not np.allclose(ts.time, np.array(data_value.index)) ): raise ValueError( "The index of the provided DataFrame does not match " "this TimeSeries' time attribute. This error was raised " "to prevent merging unsynchronized data. If you are " "confident that this DataFrame's data does match this " "TimeSeries, then set its index to this TimeSeries' time " "before adding it: " "the_dataframe.index = the_timeseries.time" ) # For every other type, check that the dimensions fit at least. elif ts.time.shape[0] != data_to_add.shape[0]: raise ValueError( f"This data has {data_to_add.shape[0]} samples while " f"this TimeSeries' time has {ts.time.shape[0]} samples." ) # Check that the data fits with other data (if it exists) for key in ts.data: if ts.data[key].shape[0] != data_to_add.shape[0]: raise ValueError( f"This data has {data_to_add.shape[0]} samples while " f"this TimeSeries' data {key} has {ts.data[key].shape[0]} " "samples." ) # Check that we would not overwrite by mistake if (data_key in self.data) and (overwrite is False): raise ValueError( f"A data with key '{data_key}' already exists in this " "TimeSeries. Either use another key name or set overwrite to " "True." ) # Add the data ts.data[data_key] = data_to_add return ts
[docs] def rename_data( self, old_data_key: str, new_data_key: str, *, in_place: bool = False ) -> TimeSeries: """ Rename a key in data and data_info. Parameters ---------- old_data_key Name of the current data key. new_data_key New name of the data key. in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries with the renamed data. Raises ------ KeyError If this data key could not be found in the TimeSeries' data attribute. See Also -------- ktk.TimeSeries.add_data ktk.TimeSeries.remove_data Example ------- >>> ts = ktk.TimeSeries() >>> ts = ts.add_data("test", np.arange(10)) >>> ts = ts.add_data_info("test", "Unit", "m") >>> ts.data {'test': array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])} >>> ts.data_info {'test': {'Unit': 'm'}} >>> ts = ts.rename_data("test", "signal") >>> ts.data {'signal': array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])} >>> ts.data_info {'signal': {'Unit': 'm'}} """ check_param("old_data_key", old_data_key, str) check_param("new_data_key", new_data_key, str) check_param("in_place", in_place, bool) self._check_well_typed() ts = self if in_place else self.copy() try: ts.data[new_data_key] = ts.data.pop(old_data_key) except KeyError: self._raise_data_key_error(old_data_key) try: ts.data_info[new_data_key] = ts.data_info.pop(old_data_key) except KeyError: pass # It's okay if there was no data info for this data_key return ts
[docs] def remove_data( self, data_key: str, *, in_place: bool = False ) -> TimeSeries: """ Remove a data key and its associated metadata. Parameters ---------- data_key Name of the data key. in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries with the removed data. Raises ------ KeyError If this data key could not be found in the TimeSeries' data attribute. See Also -------- ktk.TimeSeries.add_data ktk.TimeSeries.rename_data Example ------- >>> # Prepare a test TimeSeries with data "test" >>> ts = ktk.TimeSeries() >>> ts = ts.add_data("test", np.arange(10)) >>> ts = ts.add_data_info("test", "Unit", "m") >>> ts.data {'test': array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])} >>> ts.data_info {'test': {'Unit': 'm'}} >>> # Now remove data "test" >>> ts = ts.remove_data("test") >>> ts.data {} >>> ts.data_info {} """ check_param("data_key", data_key, str) check_param("in_place", in_place, bool) self._check_well_typed() ts = self if in_place else self.copy() try: ts.data.pop(data_key) except KeyError: self._raise_data_key_error(data_key) try: ts.data_info.pop(data_key) except KeyError: pass # It's okay if there was no data info for this data_key return ts
# %% Event management def _get_event_indexes(self, name: str) -> list[int]: """ Get a list of index of all occurrences of an event. Parameters ---------- name Name of the event to look for in the events list. Returns ------- list[int] The occurrences of this event. """ self._check_well_typed() # list all events with correct name event_times = [] event_indexes = [] for i_event, event in enumerate(self.events): if event.name == name: event_times.append(event.time) event_indexes.append(i_event) # Sort the indexes by time sorted_indexes = np.argsort(event_times) event_indexes = [event_indexes[i] for i in sorted_indexes] return event_indexes def _get_event_index(self, name: str, occurrence: int = 0) -> int: """ Get the events index of a given occurrence of an event name. Parameters ---------- name Name of the event to look for in the events list. occurrence Occurrence of the event Returns ------- int The index of the event occurrence in the events list. Raises ------ TimeSeriesEventNotFoundError If the specified occurrence could not be found. """ self._check_well_typed() occurrence = int(occurrence) if occurrence < 0: raise TimeSeriesEventNotFoundError( "The parameter `occurrence` must be positive a integer. " f"However, a value of {occurrence} was received." ) # Get the event occurrence try: return self._get_event_indexes(name)[occurrence] except IndexError: raise TimeSeriesEventNotFoundError( f"The occurrence {occurrence} of event '{name}' could not " "be found in the TimeSeries. A total of " f"{len(self._get_event_indexes(name))} occurrence(s) of " "this event name were found." ) def _get_duplicate_event_indexes(self) -> list[int]: """ Find events with same name and same time. Returns ------- list[int] A list of list of event indexes. The outer list corresponds to different events. The inner list corresponds to all occurences of this event. The integer corresponds to the event index in the TimeSeries' event list. Example ------- >>> ts = ktk.TimeSeries() # Three occurrences of event1 >>> ts = ts.add_event(0.0, "event1") >>> ts = ts.add_event(1E-12, "event1") >>> ts = ts.add_event(0.0, "event1") # One occurrence of event2, but also at 0.0 second >>> ts = ts.add_event(0.0, "event2") # Two occurrences of event3 >>> ts = ts.add_event(2.0, "event3") >>> ts = ts.add_event(2.0, "event3") """ self._check_well_typed() # Sort all events in a dict with key being tuple(time, name) sorted_events = {} # type: dict[tuple[float, str], list[int]] for i_event, event in enumerate(self.events): tup_event = event._to_tuple() # Check if this event already exist in the list. # If it does, add it to the list. found = False for key in sorted_events: if np.isclose(key[0], event.time) and (key[1] == event.name): sorted_events[key].append(i_event) found = True break if not found: # Otherwise, create it in the list sorted_events[tup_event] = [i_event] # Convert this dict to the desired list of lists out = [] for key in sorted_events: if len(sorted_events[key]) > 1: out.extend(sorted_events[key][1:]) return sorted(out)
[docs] def add_event( self, time: float, name: str = "event", *, in_place: bool = False, unique: bool = False, ) -> TimeSeries: """ Add an event to the TimeSeries. Parameters ---------- time The time of the event, in the same unit as `time_info["Unit"]`. name Optional. The name of the event. Default is "event". in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. unique Optional. True to prevent duplicating an already existing event. In this case, if an event with the same time and name already exists, no event is added. Default is False. Returns ------- TimeSeries The TimeSeries with the added event. See Also -------- ktk.TimeSeries.rename_event ktk.TimeSeries.remove_event ktk.TimeSeries.trim_events ktk.TimeSeries.ui_edit_events Example ------- >>> ts = ktk.TimeSeries() >>> ts = ts.add_event(5.5, "event1") >>> ts = ts.add_event(10.8, "event2") >>> ts = ts.add_event(20.3, "event2") >>> ts.events [TimeSeriesEvent(time=5.5, name='event1'), TimeSeriesEvent(time=10.8, name='event2'), TimeSeriesEvent(time=20.3, name='event2')] """ check_param("time", time, float) check_param("name", name, str) check_param("in_place", in_place, bool) check_param("unique", unique, bool) self._check_well_typed() ts = self if in_place else self.copy() if unique: # Ensure that no event of that name and time already exists for event in ts.events: if np.isclose(time, event.time) and (name == event.name): return ts ts.events.append(TimeSeriesEvent(time, name)) return ts
[docs] def rename_event( self, old_name: str, new_name: str, occurrence: int | None = None, *, in_place: bool = False, ) -> TimeSeries: """ Rename an event occurrence or all events of a same name. Parameters ---------- old_name Name of the event to look for in the events list. new_name New event name occurrence Optional. i_th occurence of the event to look for in the events list, starting at 0, where the occurrences are sorted in time. If None (default), all occurences of this event name are renamed. in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries with the renamed event. See Also -------- ktk.TimeSeries.add_event ktk.TimeSeries.remove_event ktk.TimeSeries.trim_events ktk.TimeSeries.ui_edit_events Example ------- >>> ts = ktk.TimeSeries() >>> ts = ts.add_event(5.5, "event1") >>> ts = ts.add_event(10.8, "event2") >>> ts = ts.add_event(20.3, "event2") >>> ts.events [TimeSeriesEvent(time=5.5, name='event1'), TimeSeriesEvent(time=10.8, name='event2'), TimeSeriesEvent(time=20.3, name='event2')] >>> ts = ts.rename_event("event2", "event3") >>> ts.events [TimeSeriesEvent(time=5.5, name='event1'), TimeSeriesEvent(time=10.8, name='event3'), TimeSeriesEvent(time=20.3, name='event3')] >>> ts = ts.rename_event("event3", "event4", occurrence=0) >>> ts.events [TimeSeriesEvent(time=5.5, name='event1'), TimeSeriesEvent(time=10.8, name='event4'), TimeSeriesEvent(time=20.3, name='event3')] """ check_param("old_name", old_name, str) check_param("new_name", new_name, str) check_param("occurrence", occurrence, (int, None)) check_param("in_place", in_place, bool) self._check_well_typed() ts = self if in_place else self.copy() if old_name == new_name: return ts if occurrence is None: # Rename every occurrence of this event for index in self._get_event_indexes(old_name): ts.events[index].name = new_name else: index = self._get_event_index(old_name, occurrence) ts.events[index].name = new_name return ts
[docs] def remove_event( self, name: str, occurrence: int | None = None, *, in_place: bool = False, ) -> TimeSeries: """ Remove an event occurrence or all events of a same name. Parameters ---------- name Name of the event to look for in the events list. occurrence Optional. i_th occurence of the event to look for in the events list, starting at 0, where the occurrences are sorted in time. If None (default), all occurences of this event name or removed. in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries with the removed event. See Also -------- ktk.TimeSeries.add_event ktk.TimeSeries.rename_event ktk.TimeSeries.trim_events ktk.TimeSeries.ui_edit_events Example ------- >>> # Instanciate a timeseries with some events >>> ts = ktk.TimeSeries() >>> ts = ts.add_event(5.5, "event1") >>> ts = ts.add_event(10.8, "event2") >>> ts = ts.add_event(20.3, "event2") >>> ts.events [TimeSeriesEvent(time=5.5, name='event1'), TimeSeriesEvent(time=10.8, name='event2'), TimeSeriesEvent(time=20.3, name='event2')] >>> ts = ts.remove_event("event1") >>> ts.events [TimeSeriesEvent(time=10.8, name='event2'), TimeSeriesEvent(time=20.3, name='event2')] >>> ts = ts.remove_event("event2", 1) >>> ts.events [TimeSeriesEvent(time=10.8, name='event2')] """ check_param("name", name, str) check_param("occurrence", occurrence, (int, None)) check_param("in_place", in_place, bool) self._check_well_typed() ts = self if in_place else self.copy() if occurrence is None: # Remove all occurrences event_index = ts._get_event_index(name, 0) try: # Continually remove the first event of this name, until # there are no more. count = 0 while True: ts.remove_event(name, occurrence=0, in_place=True) count += 1 except TimeSeriesEventNotFoundError: if count == 0: # No event of that name was even found. raise TimeSeriesEventNotFoundError( f"No event named {name} could be found." ) else: # Remove only the specified occurrence event_index = ts._get_event_index(name, occurrence) ts.events.pop(event_index) return ts
[docs] def count_events(self, name: str) -> int: """ Count the number of occurrence of a given event name. Parameters ---------- name The name of the events to count. Returns ------- int The number of occurrences. Example ------- >>> # Instanciate a timeseries with some events >>> ts = ktk.TimeSeries() >>> ts = ts.add_event(5.5, "event1") >>> ts = ts.add_event(10.8, "event2") >>> ts = ts.add_event(20.3, "event2") >>> ts.count_events("event2") 2 """ check_param("name", name, str) self._check_well_typed() indexes = self._get_event_indexes(name) return len(indexes)
[docs] def remove_duplicate_events(self, *, in_place: bool = False) -> TimeSeries: """ Remove events with same name and time so that each event gets unique. Parameters ---------- in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries with only unique events. Example ------- >>> ts = ktk.TimeSeries() Three occurrences of event1: >>> ts = ts.add_event(0.0, "event1") >>> ts = ts.add_event(1E-12, "event1") >>> ts = ts.add_event(0.0, "event1") One occurrence of event2, but also at 0.0 second: >>> ts = ts.add_event(0.0, "event2") Two occurrences of event3: >>> ts = ts.add_event(2.0, "event3") >>> ts = ts.add_event(2.0, "event3") >>> ts.events [TimeSeriesEvent(time=0.0, name='event1'), TimeSeriesEvent(time=0.0, name='event1'), TimeSeriesEvent(time=0.0, name='event2'), TimeSeriesEvent(time=1e-12, name='event1'), TimeSeriesEvent(time=2.0, name='event3'), TimeSeriesEvent(time=2.0, name='event3')] >>> ts2 = ts.remove_duplicate_events() >>> ts2.events [TimeSeriesEvent(time=0.0, name='event1'), TimeSeriesEvent(time=0.0, name='event2'), TimeSeriesEvent(time=2.0, name='event3')] """ check_param("in_place", in_place, bool) self._check_well_typed() ts = self if in_place else self.copy() duplicates = ts._get_duplicate_event_indexes() for event_index in duplicates[-1::-1]: ts.events.pop(event_index) return ts
[docs] def trim_events(self, *, in_place: bool = False) -> TimeSeries: """ Delete the events that are outside the TimeSeries' time attribute. Parameters ---------- in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries without the trimmed events. See Also -------- ktk.TimeSeries.add_event ktk.TimeSeries.rename_event ktk.TimeSeries.remove_event ktk.TimeSeries.ui_edit_events Example ------- >>> ts = ktk.TimeSeries(time = np.arange(10)) >>> ts.time array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) >>> ts = ts.add_event(-2) >>> ts = ts.add_event(0) >>> ts = ts.add_event(5) >>> ts = ts.add_event(9) >>> ts = ts.add_event(10) >>> ts.events [TimeSeriesEvent(time=-2, name='event'), TimeSeriesEvent(time=0, name='event'), TimeSeriesEvent(time=5, name='event'), TimeSeriesEvent(time=9, name='event'), TimeSeriesEvent(time=10, name='event')] >>> ts = ts.trim_events() >>> ts.events [TimeSeriesEvent(time=0, name='event'), TimeSeriesEvent(time=5, name='event'), TimeSeriesEvent(time=9, name='event')] """ check_param("in_place", in_place, bool) self._check_well_typed() ts = self if in_place else self.copy() events = deepcopy(ts.events) ts.events = [] for event in events: if event.time <= np.max(ts.time) and event.time >= np.min(ts.time): ts.add_event(event.time, event.name, in_place=True) return ts
# %% get_index methods
[docs] def get_index_at_time(self, time: float) -> int: """ Get the time index that is closest to the specified time. Parameters ---------- time Time to look for in the TimeSeries' time attribute. Returns ------- int The index in the time attribute. See Also -------- ktk.TimeSeries.get_index_before_time ktk.TimeSeries.get_index_after_time ktk.TimeSeries.get_index_before_event ktk.TimeSeries.get_index_at_event ktk.TimeSeries.get_index_after_event Example ------- >>> ts = ktk.TimeSeries(time=np.array([0, 0.5, 1, 1.5, 2])) >>> ts.get_index_at_time(0.9) 2 >>> ts.get_index_at_time(1) 2 >>> ts.get_index_at_time(1.1) 2 >>> ts.get_index_at_time(2.1) 4 """ check_param("time", time, float) self._check_well_shaped() self._check_not_empty_time() return int(np.argmin(np.abs(self.time - float(time))))
[docs] def get_index_before_time( self, time: float, *, inclusive: bool = False ) -> int: """ Get the time index that is just before the specified time. Parameters ---------- time Time to look for in the TimeSeries' time attribute. inclusive Optional. True to include the given time in the comparison. Returns ------- int The index in the time attribute. Raises ------ TimeSeriesRangeError If the resulting index would be outside the TimeSeries range. See Also -------- ktk.TimeSeries.get_index_at_time ktk.TimeSeries.get_index_after_time ktk.TimeSeries.get_index_before_event ktk.TimeSeries.get_index_at_event ktk.TimeSeries.get_index_after_event Example ------- >>> ts = ktk.TimeSeries(time=np.array([0, 0.5, 1, 1.5, 2])) >>> ts.get_index_before_time(0.9) 1 >>> ts.get_index_before_time(1) 1 >>> ts.get_index_before_time(1.1) 2 >>> ts.get_index_before_time(1.1, inclusive=True) 2 """ check_param("time", time, float) check_param("inclusive", inclusive, bool) self._check_well_shaped() def _raise(): raise TimeSeriesRangeError( f"There is no data before the requested time of {time} " f"{self.time_info['Unit']}." ) self._check_increasing_time() if inclusive: mask = np.nonzero(self.time <= time) else: mask = np.nonzero(self.time < time) if mask[0].shape == (0,): _raise() return int(mask[0][-1])
[docs] def get_index_after_time( self, time: float, *, inclusive: bool = False ) -> int: """ Get the time index that is just after the specified time. Parameters ---------- time Time to look for in the TimeSeries' time attribute. inclusive Optional. True to include the given time in the comparison. Returns ------- int The index in the time attribute. Raises ------ TimeSeriesRangeError If the resulting index would be outside the TimeSeries range. See Also -------- ktk.TimeSeries.get_index_before_time ktk.TimeSeries.get_index_at_time ktk.TimeSeries.get_index_before_event ktk.TimeSeries.get_index_at_event ktk.TimeSeries.get_index_after_event Example ------- >>> ts = ktk.TimeSeries(time=np.array([0, 0.5, 1, 1.5, 2])) >>> ts.get_index_after_time(0.9) 2 >>> ts.get_index_after_time(0.9, inclusive=True) 2 >>> ts.get_index_after_time(1) 3 >>> ts.get_index_after_time(1, inclusive=True) 2 """ check_param("time", time, float) check_param("inclusive", inclusive, bool) self._check_well_shaped() def _raise(): raise TimeSeriesRangeError( f"There is no data before the requested time of {time} " f"{self.time_info['Unit']}." ) self._check_increasing_time() if inclusive: mask = np.nonzero(self.time >= time) else: mask = np.nonzero(self.time > time) if mask[0].shape == (0,): _raise() return int(mask[0][0])
[docs] def get_index_at_event(self, name: str, occurrence: int = 0) -> int: """ Get the time index that is closest to the specified event occurrence. Parameters ---------- name Event name occurrence Occurrence of the event. The default is 0. Returns ------- int The index in the time attribute. See Also -------- ktk.TimeSeries.get_index_before_time ktk.TimeSeries.get_index_at_time ktk.TimeSeries.get_index_after_time ktk.TimeSeries.get_index_before_event ktk.TimeSeries.get_index_after_event Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts = ts.add_event(0.2, "event") >>> ts = ts.add_event(0.36, "event") >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_index_at_event("event") 2 >>> ts.get_index_at_event("event", occurrence=1) 4 """ check_param("name", name, str) check_param("occurrence", occurrence, int) self._check_well_shaped() return self.get_index_at_time( self.events[self._get_event_index(name, occurrence)].time )
[docs] def get_index_before_event( self, name: str, occurrence: int = 0, inclusive: bool = False ) -> int: """ Get the time index that is just before the specified event occurrence. Parameters ---------- name Event name occurrence Occurrence of the event. The default is 0. inclusive True to allow including one sample after the event if needed, to make sure that the event time is part of the returned TimeSeries's time. False to make sure that the returned TimeSeries does not include the event time. Default is False. Returns ------- int The index in the time attribute. Raises ------ TimeSeriesRangeError If the resulting index would be outside the TimeSeries range. See Also -------- ktk.TimeSeries.get_index_before_time ktk.TimeSeries.get_index_at_time ktk.TimeSeries.get_index_after_time ktk.TimeSeries.get_index_at_event ktk.TimeSeries.get_index_after_event Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts = ts.add_event(0.2, "event") >>> ts = ts.add_event(0.36, "event") >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_index_before_event("event") 1 >>> ts.get_index_before_event("event", occurrence=1) 3 >>> ts.get_index_before_event("event", occurrence=0, inclusive=True) 2 """ check_param("name", name, str) check_param("occurrence", occurrence, int) check_param("inclusive", inclusive, bool) self._check_well_shaped() if inclusive is False: return self.get_index_before_time( self.events[self._get_event_index(name, occurrence)].time, inclusive=False, ) else: return self.get_index_after_time( self.events[self._get_event_index(name, occurrence)].time, inclusive=True, )
[docs] def get_index_after_event( self, name: str, occurrence: int = 0, inclusive: bool = False ) -> int: """ Get the time index that is just after the specified event occurrence. Parameters ---------- name Event name occurrence Occurrence of the event. The default is 0. inclusive True to allow including one sample before the event if needed, to make sure that the event time is part of the output TimeSeries's time. False to make sure that the returned TimeSeries does not include the event time. Default is False. Returns ------- int The index in the time attribute. Raises ------ TimeSeriesRangeError If the resulting index would be outside the TimeSeries range. See Also -------- ktk.TimeSeries.get_index_before_time ktk.TimeSeries.get_index_at_time ktk.TimeSeries.get_index_after_time ktk.TimeSeries.get_index_before_event ktk.TimeSeries.get_index_at_event Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts = ts.add_event(0.2, "event") >>> ts = ts.add_event(0.36, "event") >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_index_after_event("event") 3 >>> ts.get_index_after_event("event", occurrence=1) 4 >>> ts.get_index_after_event("event", inclusive=True) 2 """ check_param("name", name, str) check_param("occurrence", occurrence, int) check_param("inclusive", inclusive, bool) self._check_well_shaped() if inclusive is False: return self.get_index_after_time( self.events[self._get_event_index(name, occurrence)].time, inclusive=False, ) else: return self.get_index_before_time( self.events[self._get_event_index(name, occurrence)].time, inclusive=True, )
# %% get_ts methods
[docs] def get_ts_before_index( self, index: int, *, inclusive: bool = False ) -> TimeSeries: """ Get a TimeSeries before the specified time index. Parameters ---------- index Time index inclusive Optional. True to include the given time index. Returns ------- TimeSeries A new TimeSeries that fulfils the specified conditions. Raises ------ TimeSeriesRangeError If there is no data before the specified index. See Also -------- ktk.TimeSeries.get_ts_before_time ktk.TimeSeries.get_ts_before_event ktk.TimeSeries.get_ts_after_index ktk.TimeSeries.get_ts_between_indexes Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_before_index(2).time array([0. , 0.1]) >>> ts.get_ts_before_index(2, inclusive=True).time array([0. , 0.1, 0.2]) """ check_param("index", index, int) check_param("inclusive", inclusive, bool) self._check_well_shaped() self._check_increasing_time() if (inclusive and (index < 0)) or (not inclusive and (index <= 0)): raise TimeSeriesRangeError( "Negative indexing is not supported in TimeSeries." ) return self.get_ts_between_indexes( 0, index, inclusive=(True, inclusive) )
[docs] def get_ts_after_index( self, index: int, *, inclusive: bool = False ) -> TimeSeries: """ Get a TimeSeries after the specified time index. Parameters ---------- index Time index inclusive Optional. True to include the given time index. Returns ------- TimeSeries A new TimeSeries that fulfils the specified conditions. Raises ------ TimeSeriesRangeError If there is no data after the specified index. See Also -------- ktk.TimeSeries.get_ts_after_time ktk.TimeSeries.get_ts_after_event ktk.TimeSeries.get_ts_before_index ktk.TimeSeries.get_ts_between_indexes Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_after_index(2).time array([0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_after_index(2, inclusive=True).time array([0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) """ check_param("index", index, int) check_param("inclusive", inclusive, bool) self._check_well_shaped() self._check_increasing_time() if (inclusive and (index > self.time.shape[0] - 1)) or ( not inclusive and (index >= self.time.shape[0] - 1) ): raise TimeSeriesRangeError( "There is no data in this TimeSeries after the specified " f"index of {index} since the time of this TimeSeries has a " f"shape of {self.time.shape}." ) return self.get_ts_between_indexes( index, self.time.shape[0] - 1, inclusive=(inclusive, True) )
[docs] def get_ts_between_indexes( self, index1: int, index2: int, *, inclusive: bool | tuple[bool, bool] = False, ) -> TimeSeries: """ Get a TimeSeries between two specified time indexes. Parameters ---------- index1, index2 Time indexes inclusive Optional. Either a bool or a tuple of two bools. Used to specify which indexes are returned: - False or (False, False) (default): index1 < index < index2 - True or (True, True): index1 <= index <= index2 - (True, False): index1 <= index < index2 - (False, True): index1 < index <= index2 Returns ------- TimeSeries A new TimeSeries that fulfils the specified conditions. Raises ------ TimeSeriesRangeError If there is no data between the specified indexes. See Also -------- ktk.TimeSeries.get_ts_between_times ktk.TimeSeries.get_ts_between_events ktk.TimeSeries.get_ts_before_index ktk.TimeSeries.get_ts_after_index Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_between_indexes(2, 5).time array([0.3, 0.4]) >>> ts.get_ts_between_indexes(2, 5, inclusive=True).time array([0.2, 0.3, 0.4, 0.5]) >>> ts.get_ts_between_indexes(2, 5, inclusive=[True, False]).time array([0.2, 0.3, 0.4]) """ check_param("index1", index1, int) check_param("index2", index2, int) if isinstance(inclusive, bool): inclusive = (inclusive, inclusive) try: inclusive = cast(tuple[bool, bool], tuple(inclusive)) check_param( "inclusive", inclusive, tuple, length=2, contents_type=bool, ) except TypeError: raise TypeError( "inclusive must be either a bool or a tuple of two bools." ) self._check_well_shaped() self._check_increasing_time() if index2 < index1: raise ValueError( "The parameter index2 must be higher than index1. " f"However, index2 is {index2} while index1 is {index1}." ) if index1 < 0 or index1 >= len(self.time): raise TimeSeriesRangeError( f"The specified index1 of {index1} is out of " f"range. The TimeSeries has {len(self.time)} samples." ) index1 -= int(inclusive[0]) if index2 < 0 or index2 >= len(self.time): raise TimeSeriesRangeError( f"The specified index2 of {index2} is out of " f"range. The TimeSeries has {len(self.time)} samples." ) index2 += int(inclusive[1]) index_range = range(index1 + 1, index2) out_ts = self.copy(copy_data=False, copy_time=False) out_ts.time = self.time[index_range] for the_data in self.data.keys(): out_ts.data[the_data] = self.data[the_data][index_range] return out_ts
[docs] def get_ts_before_time( self, time: float, *, inclusive: bool = False ) -> TimeSeries: """ Get a TimeSeries before the specified time. Parameters ---------- time Time to look for in the TimeSeries' time attribute. inclusive Optional. True to include the given time in the comparison. Returns ------- TimeSeries A new TimeSeries that fulfils the specified conditions. Raises ------ TimeSeriesRangeError If there is no data before the specified time. See Also -------- ktk.TimeSeries.get_ts_before_index ktk.TimeSeries.get_ts_before_event ktk.TimeSeries.get_ts_after_time ktk.TimeSeries.get_ts_between_times Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_before_time(0.3).time array([0. , 0.1, 0.2]) >>> ts.get_ts_before_time(0.3, inclusive=True).time array([0. , 0.1, 0.2, 0.3]) """ check_param("time", time, float) check_param("inclusive", inclusive, bool) self._check_well_shaped() self._check_increasing_time() if (inclusive and (time < self.time[0])) or ( not inclusive and (time <= self.time[0]) ): raise TimeSeriesRangeError( "There is no data in this TimeSeries before the specified " f"time of {time} since the begin time of this TimeSeries is " "{self.time[-1]}." ) return self.get_ts_between_times( self.time[0], time, inclusive=(True, inclusive) )
[docs] def get_ts_after_time( self, time: float, *, inclusive: bool = False ) -> TimeSeries: """ Get a TimeSeries after the specified time. Parameters ---------- time Time to look for in the TimeSeries' time attribute. inclusive Optional. True to include the given time in the comparison. Returns ------- TimeSeries A new TimeSeries that fulfils the specified conditions. Raises ------ TimeSeriesRangeError If there is no data after the specified index. See Also -------- ktk.TimeSeries.get_ts_after_index ktk.TimeSeries.get_ts_after_event ktk.TimeSeries.get_ts_before_time ktk.TimeSeries.get_ts_between_times Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_after_time(0.3).time array([0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_after_time(0.3, inclusive=True).time array([0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) """ check_param("time", time, float) check_param("inclusive", inclusive, bool) self._check_well_shaped() self._check_increasing_time() if (inclusive and (time > self.time[-1])) or ( not inclusive and (time >= self.time[-1]) ): raise TimeSeriesRangeError( "There is no data in this TimeSeries after the specified time " f"of {time} since the end time of this TimeSeries is " f"{self.time[-1]}." ) return self.get_ts_between_times( time, self.time[-1], inclusive=(inclusive, True) )
[docs] def get_ts_between_times( self, time1: float, time2: float, *, inclusive: bool | tuple[bool, bool] = False, ) -> TimeSeries: """ Get a TimeSeries between two specified times. Parameters ---------- time1, time2 Times to look for in the TimeSeries' time attribute. inclusive Optional. Either a bool or a tuple of two bools. Used to specify which times are returned: - False or (False, False) (default): time1 < time < time2 - True or (True, True): time1 <= time <= time2 - (True, False): time1 <= time < time2 - (False, True): time1 < time <= time2 Returns ------- TimeSeries A new TimeSeries that fulfils the specified conditions. Raises ------ TimeSeriesRangeError If there is no data between the specified times. See Also -------- ktk.TimeSeries.get_ts_between_indexes ktk.TimeSeries.get_ts_between_events ktk.TimeSeries.get_ts_before_time ktk.TimeSeries.get_ts_after_time Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_between_times(0.2, 0.5).time array([0.3, 0.4]) >>> ts.get_ts_between_times(0.2, 0.5, inclusive=True).time array([0.2, 0.3, 0.4, 0.5]) >>> ts.get_ts_between_times(0.2, 0.5, inclusive=[True, False]).time array([0.2, 0.3, 0.4]) """ check_param("time1", time1, float) check_param("teim2", time2, float) if isinstance(inclusive, bool): inclusive = (inclusive, inclusive) try: inclusive = cast(tuple[bool, bool], tuple(inclusive)) check_param( "inclusive", inclusive, tuple, length=2, contents_type=bool, ) except TypeError: raise TypeError( "inclusive must be either a bool or a tuple of two bools." ) if time2 < time1: raise ValueError( "The parameters time2 must be higher or equal to time1. " f"However, time2 is {time2} while time1 is {time1}." ) index1 = self.get_index_after_time(time1, inclusive=inclusive[0]) index2 = self.get_index_before_time(time2, inclusive=inclusive[1]) return self.get_ts_between_indexes(index1, index2, inclusive=True)
[docs] def get_ts_before_event( self, name: str, occurrence: int = 0, *, inclusive: bool = False ) -> TimeSeries: """ Get a TimeSeries before the specified event. Parameters ---------- name Name of the event to look for in the events list. occurrence Optional. i_th occurence of the event to look for in the events list, starting at 0. inclusive Optional. True to include the given time in the comparison. Returns ------- TimeSeries A new TimeSeries that fulfils the specified conditions. Raises ------ TimeSeriesRangeError If there is no data before the specified event. See Also -------- ktk.TimeSeries.get_ts_before_index ktk.TimeSeries.get_ts_before_time ktk.TimeSeries.get_ts_after_event ktk.TimeSeries.get_ts_between_events Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts = ts.add_event(0.2, "event") >>> ts = ts.add_event(0.35, "event") >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_before_event("event").time array([0. , 0.1]) >>> ts.get_ts_before_event("event", inclusive=True).time array([0. , 0.1, 0.2]) >>> ts.get_ts_before_event("event", 1).time array([0. , 0.1, 0.2, 0.3]) >>> ts.get_ts_before_event("event", 1, inclusive=True).time array([0. , 0.1, 0.2, 0.3, 0.4]) """ check_param("name", name, str) check_param("occurrence", occurrence, int) check_param("inclusive", inclusive, bool) self._check_well_shaped() try: retval = self.get_ts_before_index( self.get_index_before_event( name, occurrence, inclusive=inclusive ), inclusive=True, ) except TimeSeriesRangeError: time = self.events[self._get_event_index(name, occurrence)].time raise TimeSeriesRangeError( f"There is no data before the occurrence {occurrence} of " f"event '{name}', which happens at {time} " f"{self.time_info['Unit']}." ) else: return retval
[docs] def get_ts_after_event( self, name: str, occurrence: int = 0, *, inclusive: bool = False ) -> TimeSeries: """ Get a TimeSeries after the specified event. Parameters ---------- name Name of the event to look for in the events list. occurrence Optional. i_th occurence of the event to look for in the events list, starting at 0. inclusive Optional. True to include the given event in the comparison. Returns ------- TimeSeries A new TimeSeries that fulfils the specified conditions. Raises ------ TimeSeriesRangeError If there is no data after the specified event. See Also -------- ktk.TimeSeries.get_ts_after_index ktk.TimeSeries.get_ts_after_time ktk.TimeSeries.get_ts_before_event ktk.TimeSeries.get_ts_between_events Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts = ts.add_event(0.2, "event") >>> ts = ts.add_event(0.35, "event") >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_after_event("event").time array([0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_after_event("event", inclusive=True).time array([0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_after_event("event", 1).time array([0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_after_event("event", 1, inclusive=True).time array([0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) """ check_param("name", name, str) check_param("occurrence", occurrence, int) check_param("inclusive", inclusive, bool) self._check_well_shaped() try: retval = self.get_ts_after_index( self.get_index_after_event( name, occurrence, inclusive=inclusive ), inclusive=True, ) except TimeSeriesRangeError: time = self.events[self._get_event_index(name, occurrence)].time raise TimeSeriesRangeError( f"There is no data after the occurrence {occurrence} of " f"event '{name}', which happens at {time} " f"{self.time_info['Unit']}." ) else: return retval
[docs] def get_ts_between_events( self, name1: str, name2: str, occurrence1: int = 0, occurrence2: int = 0, *, inclusive: bool | tuple[bool, bool] = False, ) -> TimeSeries: """ Get a TimeSeries between two specified events. Parameters ---------- name1, name2 Name of the events to look for in the events list. occurrence1, occurrence2 Optional. i_th occurence of the event to look for in the events list, starting at 0. inclusive Optional. Either a bool or a tuple of two bools. Used to specify which times are returned: - False or (False, False) (default): event1.time < time < event2.time - True or (True, True): event1.time <= time <= event2.time - (True, False): event1.time <= time < event2.time - (False, True): event1.time < time <= event2.time Returns ------- TimeSeries A new TimeSeries that fulfils the specified conditions. Raises ------ TimeSeriesRangeError If there is no data between the specified events. See Also -------- ktk.TimeSeries.get_ts_between_indexes ktk.TimeSeries.get_ts_between_times ktk.TimeSeries.get_ts_before_event ktk.TimeSeries.get_ts_after_event Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts = ts.add_event(0.2, "event") >>> ts = ts.add_event(0.55, "event") >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.get_ts_between_events("event", "event", 0, 1).time array([0.3, 0.4, 0.5]) >>> ts.get_ts_between_events("event", "event", 0, 1, \ inclusive=True).time array([0.2, 0.3, 0.4, 0.5, 0.6]) """ check_param("name1", name1, str) check_param("name2", name2, str) check_param("occurrence1", occurrence2, int) check_param("occurrence1", occurrence2, int) if isinstance(inclusive, bool): inclusive = (inclusive, inclusive) try: inclusive = cast(tuple[bool, bool], tuple(inclusive)) check_param( "inclusive", inclusive, tuple, length=2, contents_type=bool, ) except TypeError: raise TypeError( "inclusive must be either a bool or a tuple of two bools." ) self._check_well_shaped() time1 = self.events[self._get_event_index(name1, occurrence1)].time time2 = self.events[self._get_event_index(name2, occurrence2)].time if time2 < time1: raise ValueError( f"The end event (occurrence {occurrence2} of " f"'{name2}') happens at {time2} {self.time_info['Unit']}, " f"which is before the begin event (occurrence {occurrence1} " f"of '{name1}') that happens at {time1} " f"{self.time_info['Unit']}." ) index1 = self.get_index_after_event( name1, occurrence1, inclusive=inclusive[0] ) index2 = self.get_index_before_event( name2, occurrence2, inclusive=inclusive[1] ) return self.get_ts_between_indexes(index1, index2, inclusive=True)
# %% Time management
[docs] def shift(self, time: float, *, in_place: bool = False) -> TimeSeries: """ Shift time and events.time. Parameters ---------- time Time to be added to time and events.time. in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries with the time being shifted. See Also -------- ktk.TimeSeries.ui_sync Example ------- >>> ts = ktk.TimeSeries(time=np.arange(10)/10) >>> ts = ts.add_event(0.35, "start") >>> ts.time array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) >>> ts.events [TimeSeriesEvent(time=0.35, name='start')] >>> ts = ts.shift(0.2) >>> ts.time array([0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1. , 1.1]) >>> ts.events [TimeSeriesEvent(time=0.55, name='start')] """ check_param("time", time, float) check_param("in_place", in_place, bool) self._check_well_shaped() ts = self if in_place else self.copy() for event in ts.events: event.time += time ts.time += time return ts
[docs] def get_sample_rate(self) -> float: """ Get the sample rate in samples/s. Returns ------- float The sample rate in samples per second. If time is empty or has only one data, or if sample rate is variable, or if time is not monotonously increasing, a value of np.nan is returned. Warning ------- This feature, which has been introduced in version 0.9, is still experimental and may change in the future. In particular, the value returned if the sample rate is not constant: it is np.nan in all cases for now, but it could change in the future based on discussions and particular use cases. See Also -------- ktk.TimeSeries.resample Example ------- >>> ts = ktk.TimeSeries(time=np.arange(100)/10) # 100 samples at 10 Hz >>> ts.get_sample_rate() 10.0 """ self._check_well_shaped() if self.time.shape[0] <= 1: return np.nan deltas = self.time[1:] - self.time[0:-1] if np.allclose(deltas, [deltas[0]]): return 1.0 / deltas.mean() else: return np.nan
[docs] def resample( self, target: ArrayLike | float, kind: str = "linear", *, extrapolate: bool = False, in_place: bool = False, **kwargs, ) -> TimeSeries: """ Resample the TimeSeries. Resample every data of the TimeSeries over a new frequency or new series of times, using the interpolation method provided by parameter `kind`. This method does not fill missing data. Consequently, time ranges with nans in the original TimeSeries will also contain nans in the resampled TimeSeries. Parameters ---------- target To resample to a target frequency, use a float that represents the sample rate of the output TimeSeries, in Hz. To resample to specific times, use an array of float that will become the time property of the output TimeSeries. kind Optional. The interpolation method. This input may take any value supported by scipy.interpolate.interp1d, such as "linear", "nearest", "zero", "slinear", "quadratic", "cubic", "previous", "next". Additionally, kind can be "pchip". Default is "linear". extrapolate Optional. True to extrapolate outside the original time range. Default is False. in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries with a new sample rate. Caution ------- Attempting to resample a series of homogeneous matrices would likely produce non-homogeneous matrices, and as a result, transforms would not be rigid anymore. This function can't detect if you attempt to resample series of homogeneous matrices, and therefore won't generate an error or warning. See Also -------- ktk.TimeSeries.get_sample_rate ktk.TimeSeries.fill_missing_samples Examples -------- >>> ts = ktk.TimeSeries(time=np.arange(10.)) >>> ts = ts.add_data("data", ts.time ** 2) >>> ts.time array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]) >>> ts.data["data"] array([ 0., 1., 4., 9., 16., 25., 36., 49., 64., 81.]) Example 1: Resampling at 2 Hz >>> ts1 = ts.resample(2.0) >>> ts1.time array([0. , 0.5, 1. , 1.5, 2. , 2.5, 3. , 3.5, 4. , 4.5, 5. , 5.5, 6. , 6.5, 7. , 7.5, 8. , 8.5, 9. ]) >>> ts1.data["data"] array([ 0. , 0.5, 1. , 2.5, 4. , 6.5, 9. , 12.5, 16. , 20.5, 25. , 30.5, 36. , 42.5, 49. , 56.5, 64. , 72.5, 81. ]) Example 2: Resampling on new times >>> ts2 = ts.resample([0.0, 0.5, 1.0, 1.5, 2.0]) >>> ts2.time array([0. , 0.5, 1. , 1.5, 2. ]) >>> ts2.data["data"] array([0. , 0.5, 1. , 2.5, 4. ]) Example 3: Resampling at 2 Hz with missing data in the original ts >>> ts.data["data"][[0, 1, 5, 8, 9]] = np.nan >>> ts.data["data"] array([nan, nan, 4., 9., 16., nan, 36., 49., nan, nan]) >>> ts3 = ts.resample(2.0) >>> ts3.time array([0. , 0.5, 1. , 1.5, 2. , 2.5, 3. , 3.5, 4. , 4.5, 5. , 5.5, 6. , 6.5, 7. , 7.5, 8. , 8.5, 9. ]) >>> ts3.data["data"] array([ nan, nan, nan, nan, 4. , 6.5, 9. , 12.5, 16. , nan, nan, nan, 36. , 42.5, 49. , nan, nan, nan, nan]) """ check_param("kind", kind, str) check_param("in_place", in_place, bool) if "fill_value" in kwargs: warnings.warn( "fill_value parameter has been removed in version 0.12 " "because its behavior was unclear and it was ignored in many " "situations " "(https://github.com/felixchenier/kineticstoolkit/issues/174)." ) self._check_well_shaped() ts = self if in_place else self.copy() # -------------------------------------------------------------- # Create the new time if a frequency was provided instead if isinstance(target, Real): # We specifically use arange instead of linspace, because what # is defined is a frequency, not a number of points. new_time = np.arange( ts.time[0], ts.time[-1] + 1 / target, 1 / target, ) # Work around the numerical instability of using arange with floats # by ensuring that the time point is not higher than the original # last time point if new_time[-1] > ts.time[-1]: new_time = new_time[:-1] else: new_time = np.array(target) # type: ignore # -------------------------------------------------------------- if np.any(np.isnan(new_time)): raise ValueError("new_time must not contain nans") # We will progressively fill these data new_data = {} # type: dict[str, np.ndarray] for key in ts.data.keys(): index = ~ts.isnan(key) if sum(index) < 3: # Only Nans, cannot interpolate. # We generate an array of nans of the expected size. new_shape = [len(new_time)] for i in range(1, len(self.data[key].shape)): new_shape.append(self.data[key].shape[i]) new_data[key] = np.empty(new_shape) new_data[key][:] = np.nan continue # Express nans as a range of times to # remove from the final, interpolated timeseries nan_indexes = np.argwhere(~index) # initialize with times outside of the original time range time_ranges_to_remove: list[tuple[float, float]] = [] if not extrapolate: time_ranges_to_remove.append((-np.inf, ts.time[0])) time_ranges_to_remove.append((ts.time[-1], np.inf)) length = ts.time.shape[0] for i in nan_indexes: if i > 0 and i < length - 1: time_range = (ts.time[i - 1], ts.time[i + 1]) elif i == 0: time_range = (-np.inf, ts.time[i + 1]) else: time_range = (ts.time[i - 1], np.inf) time_ranges_to_remove.append(time_range) if kind == "pchip": P = sp.interpolate.PchipInterpolator( ts.time[index], ts.data[key][index], axis=0, extrapolate=True, ) new_data[key] = P(new_time) else: f = sp.interpolate.interp1d( ts.time[index], ts.data[key][index], axis=0, fill_value="extrapolate", kind=kind, ) new_data[key] = f(new_time) # Put back nans in the originally missing data for j in time_ranges_to_remove: new_data[key][(new_time > j[0]) & (new_time < j[1])] = np.nan ts.time = new_time ts.data = new_data return ts
# %% Subsetting and merging
[docs] def get_subset(self, data_keys: str | list[str]) -> TimeSeries: """ Return a subset of the TimeSeries. This method returns a TimeSeries that contains only selected data keys. The corresponding data_info keys are copied in the new TimeSeries. All events are also copied in the new TimeSeries. Parameters ---------- data_keys The data keys to extract from the timeseries. Returns ------- TimeSeries The TimeSeries, minus the unspecified data keys. Raises ------ KeyError If one or more data keys could not be found in the TimeSeries data. See Also -------- ktk.TimeSeries.merge Example ------- >>> ts = ktk.TimeSeries(time = np.arange(10)) >>> ts = ts.add_data("signal1", ts.time) >>> ts = ts.add_data("signal2", ts.time**2) >>> ts = ts.add_data("signal3", ts.time**3) >>> ts.data.keys() dict_keys(['signal1', 'signal2', 'signal3']) >>> ts2 = ts.get_subset(["signal1", "signal3"]) >>> ts2.data.keys() dict_keys(['signal1', 'signal3']) """ try: check_param("data_keys", data_keys, str) except TypeError: try: check_param("data_keys", data_keys, list, contents_type=str) except TypeError: raise TypeError( "data_keys must be a string or a list of strings." ) self._check_well_shaped() if isinstance(data_keys, str): data_keys = [data_keys] ts = TimeSeries() ts.time = self.time.copy() ts.time_info = deepcopy(self.time_info) ts.events = deepcopy(self.events) for key in data_keys: try: ts.data[key] = self.data[key].copy() except KeyError: raise KeyError( f"The key '{key}' could not be found among the " f"{len(self.data)} data entries of the TimeSeries" ) try: ts.data_info[key] = deepcopy(self.data_info[key]) except KeyError: pass return ts
[docs] def merge( self, ts: TimeSeries, data_keys: str | list[str] = [], *, resample: bool = False, overwrite: bool = False, on_conflict: str = "warning", in_place: bool = False, ) -> TimeSeries: """ Merge the TimeSeries with another TimeSeries. Parameters ---------- ts The TimeSeries to merge into the current TimeSeries. data_keys Optional. The data keys to merge from ts. If left empty, all the data keys are merged. resample Optional. Set to True to resample the source TimeSeries to the target one using a linear interpolation. If the time attributes are not equivalent and resample is False, an exception is raised. To resample using other methods than linear interpolation, please resample the source TimeSeries manually before, using TimeSeries.resample. Default is False. overwrite Optional. Select what to do if a key from the source TimeSeries already exists in the destination TimeSeries. True to overwrite the already existing value, False to ignore the new value. Default is False. on_conflict Optional. Select what the warning level when a key from the source TimeSeries already exists in the destination TimeSeries. May take the following values: - "mute": No warning; - "warning": Warns that duplicate keys were found and how the conflict has been resolved following the `overwrite`` parameter. - "error": Raises a TimeSeriesMergeConflictError. Default is "warning". in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The merged TimeSeries. Raises ------ TimeSeriesMergeConflictError If a key from the source TimeSeries already exists in the destination TimeSeries and on_conflict is set to "error". See Also -------- ktk.TimeSeries.get_subset ktk.TimeSeries.resample Notes ----- - All events are also merged from both TimeSeries. """ try: check_param("data_keys", data_keys, str) except TypeError: try: data_keys = list(data_keys) check_param("data_keys", data_keys, list, contents_type=str) except TypeError: raise TypeError( "data_keys must be a string or a list of strings." ) check_param("resample", resample, bool) check_param("overwrite", overwrite, bool) check_param("on_conflict", on_conflict, str) if on_conflict not in ["mute", "warning", "error"]: raise ValueError( "Parameter on_conflict must be either 'mute', 'warning' or " "'error'." ) check_param("in_place", in_place, bool) self._check_well_shaped() ts._check_well_shaped() # -- ts_out = self if in_place else self.copy() ts = ts.copy() if len(data_keys) == 0: data_keys = list(ts.data.keys()) elif isinstance(data_keys, str): data_keys = [data_keys] # Check if resampling is needed if len(ts_out.time) == 0: ts_out.time = deepcopy(ts.time) if (ts_out.time.shape == ts.time.shape) and np.all( ts_out.time == ts.time ): must_resample = False else: must_resample = True if must_resample is True and resample is False: raise ValueError( "Time attributes do not match, resampling is required." ) if must_resample is True: ts.resample(ts_out.time, in_place=True) for key in data_keys: # Check if this key is a duplicate, then continue to next key if # required. if key in ts_out.data: if overwrite is True: if on_conflict.lower() == "warning": warnings.warn( f"The key '{key}' exists in both TimeSeries. " f"According to the overwrite={overwrite} " "parameter, its prior value has been overwritten " "by the new value. Use on_conflict='mute' to mute " "this warning." ) elif on_conflict.lower() == "error": raise TimeSeriesMergeConflictError( f"The key '{key}' exists in both TimeSeries. " ) ts_out.data[key] = ts.data[key] else: # overwrite is False if on_conflict.lower() == "warning": warnings.warn( f"The key '{key}' exists in both TimeSeries. " f"According to the overwrite={overwrite} " "parameter, the new value has been ignored. Use " "on_conflict='mute' to mute this warning." ) elif on_conflict.lower() == "error": raise TimeSeriesMergeConflictError( f"The key {key} exist in both TimeSeries. " ) else: # Add this data ts_out.data[key] = ts.data[key] if key in ts.data_info: for info_key in ts.data_info[key].keys(): ts_out.add_data_info( key, info_key, ts.data_info[key][info_key], in_place=True, ) # Merge events for event in ts.events: ts_out.add_event( event.time, event.name, in_place=True, unique=True ) return ts_out
# %% Missing sample management
[docs] def isnan(self, data_key: str) -> np.ndarray: """ Return a boolean array of missing samples. Parameters ---------- data_key Key value of the data signal to analyze. Returns ------- np.ndarray A boolean array of the same size as the time attribute, where True values represent missing samples (samples that contain at least one nan value). See Also -------- ktk.TimeSeries.fill_missing_samples Example ------- >>> ts = ktk.TimeSeries(time=np.arange(4)) >>> ts = ts.add_data("data", np.zeros((4, 2))) >>> ts.data["data"][2, :] = np.nan >>> ts.data {'data': array([[ 0., 0.], [ 0., 0.], [nan, nan], [ 0., 0.]])} >>> ts.isnan("data") array([False, False, True, False]) """ check_param("data_key", data_key, str) self._check_well_shaped() values = self.data[data_key].copy() # Reduce the dimension of values while keeping the time dimension. while len(values.shape) > 1: values = np.sum(values, 1) # type: ignore return np.isnan(values)
[docs] def fill_missing_samples( self, max_missing_samples: int, *, method: str = "linear", in_place: bool = False, ) -> TimeSeries: """ Fill missing samples using a given method. Parameters ---------- max_missing_samples Maximal number of consecutive missing samples to fill. Set to zero to fill all missing samples. method Optional. The interpolation method. This input may take any value supported by scipy.interpolate.interp1d, such as "linear", "nearest", "zero", "slinear", "quadratic", "cubic", "previous" or "next". Default is "linear". in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries with the missing samples filled. Raises ------ ValueError If the sample rate is not constant. See Also -------- ktk.TimeSeries.isnan """ check_param("max_missing_samples", max_missing_samples, int) check_param("method", method, str) check_param("in_place", in_place, bool) self._check_well_shaped() if np.isnan(self.get_sample_rate()): raise ValueError("The sample rate must be constant.") ts_out = self if in_place else self.copy() for data in ts_out.data: # Fill missing samples is_visible = ~ts_out.isnan(data) ts = ts_out.get_subset(data) ts.data[data] = ts.data[data][is_visible] ts.time = ts.time[is_visible] ts = ts.resample(ts_out.time, method, extrapolate=True) # Put back missing samples in holes longer than max_missing_samples if max_missing_samples > 0: still_visible_index = -1 to_keep = np.ones(self.time.shape) for current_index in range(ts.time.shape[0]): if is_visible[current_index]: still_visible_index = current_index elif ( current_index - still_visible_index > max_missing_samples ): to_keep[ still_visible_index + 1 : current_index + 1 ] = 0 ts.data[data][to_keep == 0] = np.nan ts_out.data[data] = ts.data[data] return ts_out
# %% Graphical user interfaces
[docs] def ui_edit_events( self, name: str | list[str] = [], data_keys: str | list[str] = [], ) -> TimeSeries: # pragma: no cover """ Edit events interactively. Parameters ---------- name Optional. The name of the event(s) to add. May be a string or a list of strings. These events appear on their own buttons "add `name`". Event names can also be defined interactively. data_keys Optional. A signal name of list of signal name to be plotted, similar to the data_keys argument of ktk.TimeSeries.plot. Returns ------- TimeSeries The TimeSeries with the modified events. If the operation was cancelled by the user, this is the original TimeSeries. Warning ------- This function, which has been introduced in 0.6, is still experimental and may change signature or behaviour in the future. See Also -------- ktk.TimeSeries.add_event ktk.TimeSeries.rename_event ktk.TimeSeries.remove_event ktk.TimeSeries.trim_events Note ---- Matplotlib must be in interactive mode for this function to work. """ check_interactive_backend() try: check_param("name", name, str) except TypeError: try: check_param("name", name, list, contents_type=str) except TypeError: raise TypeError("name must be a string or a list of strings.") try: check_param("data_keys", data_keys, str) except TypeError: try: check_param("data_keys", data_keys, list, contents_type=str) except TypeError: raise TypeError( "data_keys must be a string or a list of strings." ) self._check_well_shaped() self._check_not_empty_time() self._check_not_empty_data() def add_this_event(ts: TimeSeries, name: str) -> TimeSeries: kineticstoolkit.gui.message( "Place the event on the figure.", **WINDOW_PLACEMENT ) this_time = plt.ginput(1)[0][0] ts = ts.add_event(this_time, name) kineticstoolkit.gui.message("") return ts def get_event_index(ts: TimeSeries) -> int: kineticstoolkit.gui.message( "Select an event on the figure.", **WINDOW_PLACEMENT ) this_time = plt.ginput(1)[0][0] event_times = np.array([event.time for event in ts.events]) kineticstoolkit.gui.message("") return int(np.argmin(np.abs(event_times - this_time))) # Set Matplotlib interactive mode isinteractive = plt.isinteractive() plt.ion() ts = self.copy() if isinstance(name, str): event_names = [name] else: event_names = deepcopy(name) fig = plt.figure() ts.plot(data_keys, _raise_on_no_data=True) while True: # Populate the choices to the user choices = [f"Add '{s}'" for s in event_names] choice_index = {} choice_index["add"] = len(choices) if len(event_names) == 0: choices.append("Add event") else: choices.append("Add event with another name") if len(ts.events) > 0: choice_index["remove"] = len(choices) choices.append("Remove event") if len(ts.events) > 0: choice_index["remove_all"] = len(choices) choices.append("Remove all events") choice_index["move"] = len(choices) choices.append("Move event") choice_index["close"] = len(choices) choices.append("Save and close") choice_index["cancel"] = len(choices) choices.append("Cancel") # Show the button dialog choice = kineticstoolkit.gui.button_dialog( "Move and zoom on the figure,\n" "then select an option below.", choices, **WINDOW_PLACEMENT, ) # Execute if choice < choice_index["add"]: ts = add_this_event(ts, event_names[choice]) elif choice == choice_index["add"]: event_names.append( li.input_dialog( "Please enter the event name:", **WINDOW_PLACEMENT ) ) # Add this event name to the list of recently added events if len(event_names) > 5: event_names = event_names[-5:] # Add the event ts = add_this_event(ts, event_names[-1]) elif ("remove" in choice_index) and ( choice == choice_index["remove"] ): event_index = get_event_index(ts) try: ts.events.pop(event_index) except IndexError: li.button_dialog( "No event was removed.", choices=["OK"], icon="error", **WINDOW_PLACEMENT, ) elif ("remove_all" in choice_index) and ( choice == choice_index["remove_all"] ): if ( li.button_dialog( "Do you really want to remove all events from this " "TimeSeries?", ["Yes, remove all events", "No"], icon="alert", **WINDOW_PLACEMENT, ) == 0 ): ts.events = [] elif ("move" in choice_index) and (choice == choice_index["move"]): event_index = get_event_index(ts) event_name = ts.events[event_index].name try: ts.events.pop(event_index) ts = add_this_event(ts, event_name) except IndexError: li.button_dialog( "Could not move this event.", choices=["OK"], icon="error", **WINDOW_PLACEMENT, ) elif ("close" in choice_index) and ( choice == choice_index["close"] ): plt.close(fig) if not isinteractive: plt.ioff() return ts elif (choice == -1) or ( ("cancel" in choice_index) and (choice == choice_index["cancel"]) ): plt.close(fig) if not isinteractive: plt.ioff() return self.copy() # Refresh ts.remove_duplicate_events(in_place=True) axes = plt.axis() plt.cla() ts.plot(data_keys, _raise_on_no_data=True) plt.axis(axes)
[docs] def ui_sync( self, data_keys: str | list[str] = [], ts2: TimeSeries | None = None, data_keys2: str | list[str] = [], ) -> TimeSeries: # pragma: no cover """ Synchronize one or two TimeSeries by shifting their time. If this method is called on only one TimeSeries, an interactive interface asks the user to click on the time to set to zero. If another TimeSeries is given, an interactive interface allows synchronizing both TimeSeries together. Parameters ---------- data_keys Optional. The data keys to plot. If empty, all data is plotted. ts2 Optional. A second TimeSeries to be synced to the first one. This TimeSeries is modified in place. data_keys2 Optional. The data keys from the second TimeSeries to plot. If empty, all data is plotted. Returns ------- TimeSeries The TimeSeries after synchronization. Warning ------- This function, which has been introduced in 0.1, is still experimental and may change signature or behaviour in the future. See Also -------- ktk.TimeSeries.shift Notes ----- Matplotlib must be in interactive mode for this method to work. """ check_interactive_backend() try: check_param("data_keys", data_keys, str) except TypeError: try: check_param("data_keys", data_keys, list, contents_type=str) except TypeError: raise TypeError( "data_keys must be a string or a list of strings." ) try: check_param("data_keys2", data_keys2, str) except TypeError: try: check_param("data_keys2", data_keys2, list, contents_type=str) except TypeError: raise TypeError( "data_keys2 must be a string or a list of strings." ) self._check_well_shaped() self._check_not_empty_time() self._check_not_empty_data() if ts2 is not None: ts2._check_well_shaped() ts2._check_not_empty_time() ts2._check_not_empty_data() ts1 = self.copy() fig = plt.figure("ktk.TimeSeries.ui_sync") if ts2 is None: # Synchronize ts1 only ts1.plot(data_keys) choice = kineticstoolkit.gui.button_dialog( "Please zoom on the time zero and press Next.", ["Cancel", "Next"], **WINDOW_PLACEMENT, ) if choice != 1: plt.close(fig) return ts1 kineticstoolkit.gui.message( "Click on the sync event.", **WINDOW_PLACEMENT ) click = plt.ginput(1) kineticstoolkit.gui.message("") plt.close(fig) ts1 = ts1.shift(-click[0][0]) else: # Sync two TimeSeries together finished = False # list of axes: axes = [] # type: list[Any] while finished is False: if len(axes) == 0: axes.append(fig.add_subplot(2, 1, 1)) axes.append(fig.add_subplot(2, 1, 2, sharex=axes[0])) plt.sca(axes[0]) axes[0].cla() ts1.plot(data_keys) plt.title("First TimeSeries (ts1)") plt.grid(True) plt.tight_layout() plt.sca(axes[1]) axes[1].cla() ts2.plot(data_keys2) plt.title("Second TimeSeries (ts2)") plt.grid(True) plt.tight_layout() choice = kineticstoolkit.gui.button_dialog( "Please select an option.", choices=[ "Zero ts1 only, using ts1", "Zero ts2 only, using ts2", "Zero both TimeSeries, using ts1", "Zero both TimeSeries, using ts2", "Sync both TimeSeries on a common event", "Finished", ], **WINDOW_PLACEMENT, ) if choice == 0: # Zero ts1 only kineticstoolkit.gui.message( "Click on the time zero in ts1.", **WINDOW_PLACEMENT ) click_1 = plt.ginput(1) kineticstoolkit.gui.message("") ts1 = ts1.shift(-click_1[0][0]) elif choice == 1: # Zero ts2 only kineticstoolkit.gui.message( "Click on the time zero in ts2.", **WINDOW_PLACEMENT ) click_1 = plt.ginput(1) kineticstoolkit.gui.message("") ts2 = ts2.shift(-click_1[0][0]) elif choice == 2: # Zero ts1 and ts2 using ts1 kineticstoolkit.gui.message( "Click on the time zero in ts1.", **WINDOW_PLACEMENT ) click_1 = plt.ginput(1) kineticstoolkit.gui.message("") ts1 = ts1.shift(-click_1[0][0]) ts2 = ts2.shift(-click_1[0][0]) elif choice == 3: # Zero ts1 and ts2 using ts2 kineticstoolkit.gui.message( "Click on the time zero in ts2.", **WINDOW_PLACEMENT ) click_2 = plt.ginput(1) kineticstoolkit.gui.message("") ts1 = ts1.shift(-click_2[0][0]) ts2 = ts2.shift(-click_2[0][0]) elif choice == 4: # Sync on a common event kineticstoolkit.gui.message( "Click on the sync event in ts1.", **WINDOW_PLACEMENT ) click_1 = plt.ginput(1) kineticstoolkit.gui.message( "Now click on the same event in ts2.", **WINDOW_PLACEMENT, ) click_2 = plt.ginput(1) kineticstoolkit.gui.message("") ts1 = ts1.shift(-click_1[0][0]) ts2 = ts2.shift(-click_2[0][0]) elif choice == 5 or choice < -1: # OK or closed figure, quit. plt.close(fig) finished = True return ts1
[docs] def plot( self, data_keys: str | list[str] = [], *args, event_names: bool = True, legend: bool = True, **kwargs, ) -> None: """ Plot the TimeSeries in the current matplotlib figure. Parameters ---------- data_keys The data keys to plot. If left empty, all data is plotted. event_names Optional. True to plot the event names on top of the event lines. legend Optional. True to plot a legend, False otherwise. Note ---- Additional positional and keyboard arguments are passed to matplotlib's ``pyplot.plot`` function:: ts.plot(["Forces"], "--") plots the forces using a dashed line style. Example ------- For a TimeSeries ``ts`` with data keys being "Forces", "Moments" and "Angle":: ts.plot() plots all data (Forces, Moments and Angle), whereas:: ts.plot(["Forces", "Moments"]) plots only the forces and moments, without plotting the angle. """ try: check_param("data_keys", data_keys, str) except TypeError: try: check_param("data_keys", data_keys, list, contents_type=str) except TypeError: raise TypeError( "data_keys must be a string or a list of strings." ) check_param("event_names", event_names, bool) check_param("legend", legend, bool) self._check_well_shaped() # Private argument _raise_on_no_data: Raise an EmptyTimeSeriesError # instead of warning when no data is available to plot. if "_raise_on_no_data" in kwargs: raise_on_no_data = kwargs.pop("_raise_on_no_data") else: raise_on_no_data = False if data_keys is None or len(data_keys) == 0: # Plot all ts = self.copy() else: ts = self.get_subset(data_keys) try: self._check_not_empty_time() self._check_not_empty_data() except ValueError as e: if raise_on_no_data: raise e else: warnings.warn("No data available to plot.") return df = ts.to_dataframe() labels = df.columns.to_list() axes = plt.gca() # Don't know why I need to disable mypy on these lines. axes.set_prop_cycle( mpl.cycler(linewidth=[1, 2, 3, 4]) # type: ignore * mpl.cycler(linestyle=["-", "--", "-.", ":"]) # type: ignore * plt.rcParams["axes.prop_cycle"] ) # Plot the curves for i_label, label in enumerate(labels): axes.plot( df.index.to_numpy(), df[label].to_numpy(), *args, label=label, **kwargs, ) # Add labels plt.xlabel("Time (" + ts.time_info["Unit"] + ")") # Make unique list of units unit_set = set() for data in ts.data_info: for info in ts.data_info[data]: if info == "Unit": unit_set.add(ts.data_info[data][info]) # Plot this list unit_str = "" for unit in unit_set: if len(unit_str) > 0: unit_str += ", " unit_str += unit plt.ylabel(unit_str) # Plot the events n_events = len(ts.events) event_times = [] for event in ts.events: event_times.append(event.time) if len(ts.events) > 0: a = plt.axis() min_y = a[2] max_y = a[3] event_line_x = np.zeros(3 * n_events) event_line_y = np.zeros(3 * n_events) for i_event in range(0, n_events): event_line_x[3 * i_event] = event_times[i_event] event_line_x[3 * i_event + 1] = event_times[i_event] event_line_x[3 * i_event + 2] = np.nan event_line_y[3 * i_event] = min_y event_line_y[3 * i_event + 1] = max_y event_line_y[3 * i_event + 2] = np.nan plt.plot(event_line_x, event_line_y, ":k") if event_names: occurrences = {} # type:dict[str, int] for event in ts.events: if event.name == "_": name = "_" elif event.name in occurrences: occurrences[event.name] += 1 name = f"{event.name} {occurrences[event.name]}" else: occurrences[event.name] = 0 name = f"{event.name} 0" plt.text( event.time, max_y, name, rotation="vertical", horizontalalignment="center", fontsize="small", ) if legend: if len(labels) < 20: legend_location = "best" else: legend_location = "upper right" axes.legend( loc=legend_location, ncol=1 + int(len(labels) / 40) ) # Max 40 items per line
# %% Input/Output def _to_dataframe_and_info( self, ) -> tuple[pd.DataFrame, list[dict[str, Any]]]: """ Implement TimeSeries.to_dataframe with additional data_info. The second element of the output tuple is a list where each element corresponds to a column of the DataFrame, and each element is a copy of the inner data_info dictionary for this data. For instance, an element of the list could be: {"Unit": "N"}. """ # Init df_out = pd.DataFrame() info_out = [] # Go through data the_keys = self.data.keys() for the_key in the_keys: # Assign data original_data = self.data[the_key] if original_data.shape[0] > 0: # Not empty original_data_shape = original_data.shape data_length = original_data.shape[0] reshaped_data = np.reshape(original_data, (data_length, -1)) reshaped_data_shape = reshaped_data.shape df_data = pd.DataFrame(reshaped_data) # Get the column names index from the shape of the original data # The strategy here is to build matrices of indices, that have # the same shape as the original data, then reshape these matrices # the same way we reshaped the original data. Then we know where # the original indices are in the new reshaped data. original_indices = np.indices(original_data_shape[1:]) reshaped_indices = np.reshape( original_indices, (-1, reshaped_data_shape[1]) ) # Hint for my future self: # For a one-dimension series, reshaped_indices will be: # [[0]]. # For a two-dimension series, reshaped_indices will be: # [[0 1 2 ...]]. # For a three-dimension series, reshaped_indices will be: # [[0 0 0 ... 1 1 1 ... 2 2 2 ...] # 0 1 2 ... 0 1 2 ... 0 1 2 ...]] # and so on. # Assign column names column_names = [] for i_column in range(0, len(df_data.columns)): this_column_name = the_key n_indices = np.shape(reshaped_indices)[0] if n_indices > 0: # This data is expressed in more than one dimension. # We must add brackets to the column names to specify # the indices. this_column_name += "[" for i_indice in range(0, n_indices): this_column_name += str( reshaped_indices[i_indice, i_column] ) if i_indice == n_indices - 1: this_column_name += "]" else: this_column_name += "," column_names.append(this_column_name) df_data.columns = column_names else: # empty data df_data = pd.DataFrame(columns=[the_key]) # Merge this dataframe with the output dataframe df_out = pd.concat([df_out, df_data], axis=1) # Add the data_info that correspond to this key for i in df_data.columns: try: data_info = self.data_info[the_key] info_out.append(deepcopy(data_info)) except KeyError: info_out.append({}) df_out.index = self.time return (df_out, info_out)
[docs] def to_dataframe(self) -> pd.DataFrame: """ Create a DataFrame by reshaping all data to one bidimensional table. Undimensional data is converted to a single column, and two-dimensional (or more) data are converted to multiple columns with the additional dimensions in brackets. The TimeSeries's events and metadata such as `time_info` and `data_info` are not included in the resulting DataFrame. Returns ------- pd.DataFrame DataFrame with the index as the TimeSeries' time. See Also -------- ktk.TimeSeries.from_dataframe ktk.TimeSeries.from_array ktk.TimeSeries.to_array Examples -------- Example with unidimensional data: >>> ts = ktk.TimeSeries(time=np.arange(3) / 10) >>> ts = ts.add_data("test", np.array([0.0, 2.0, 3.0])) >>> ts.to_dataframe() test 0.0 0.0 0.1 2.0 0.2 3.0 Example with multidimensional data: >>> ts = ktk.TimeSeries(time=np.arange(4) / 10) >>> ts = ts.add_data("test", np.repeat([[0.0, 2.0, 3.0]], 4, axis=0)) >>> ts.data["test"] array([[0., 2., 3.], [0., 2., 3.], [0., 2., 3.], [0., 2., 3.]]) >>> ts.to_dataframe() test[0] test[1] test[2] 0.0 0.0 2.0 3.0 0.1 0.0 2.0 3.0 0.2 0.0 2.0 3.0 0.3 0.0 2.0 3.0 """ self._check_well_shaped() return self._to_dataframe_and_info()[0]
[docs] @staticmethod def from_dataframe( dataframe: pd.DataFrame, /, *, time_info: dict[str, Any] = {"Unit": "s"}, data_info: dict[str, dict[str, Any]] = {}, events: list[TimeSeriesEvent] = [], ) -> TimeSeries: """ Create a new TimeSeries from a Pandas Dataframe. Data in column which names end with bracketed indices such as [0], [1], [0,0], [0,1], etc. are converted to multidimensional arrays. For example, if a DataFrame has these column names:: "Forces[0]", "Forces[1]", "Forces[2]", "Forces[3]" then a single data key is created ("Forces") and the shape of the data is Nx4. Parameters ---------- dataframe A Pandas DataFrame where the index corresponds to time, and where each column corresponds to a data key. time_info Optional. Will be copied to the TimeSeries' time_info attribute. data_info Optional. Will be copied to the TimeSeries' data_info attribute. events Optional. Will be copied to the TimeSeries' events attribute. Returns ------- TimeSeries The converted TimeSeries. See Also -------- ktk.TimeSeries.to_dataframe ktk.TimeSeries.from_array ktk.TimeSeries.to_array Examples -------- **Example with unidimensional data** Create a DataFrame with two series of 3 samples: >>> import pandas as pd >>> df = pd.DataFrame([[1., 2.], [3., 4.], [5., 6.]]) >>> df.columns = ["test1", "test2"] >>> df test1 test2 0 1.0 2.0 1 3.0 4.0 2 5.0 6.0 Convert to a TimeSeries: >>> ts = ktk.TimeSeries.from_dataframe(df) >>> ts.data {'test1': array([1., 3., 5.]), 'test2': array([2., 4., 6.])} **Example with multidimensional data** Create a DataFrame with one series of 3 samples of dimension 2: >>> df = pd.DataFrame([[1., 2.], [3., 4.], [5., 6.]]) >>> df.columns = ["test[0]", "test[1]"] >>> df test[0] test[1] 0 1.0 2.0 1 3.0 4.0 2 5.0 6.0 Convert to a TimeSeries: >>> ts = ktk.TimeSeries.from_dataframe(df) >>> ts.data {'test': array([[1., 2.], [3., 4.], [5., 6.]])} **Example with even more dimensions** Create a DataFrame with one series of 5 samples of dimension 2x2 (rot) and one series of 5 samples of dimension 2 (trans): >>> df = pd.DataFrame() >>> df.index = [0., 0.1, 0.2, 0.3, 0.4] # Time in seconds >>> df["rot[0,0]"] = np.cos([0., 0.1, 0.2, 0.3, 0.4]) >>> df["rot[0,1]"] = -np.sin([0., 0.1, 0.2, 0.3, 0.4]) >>> df["rot[1,0]"] = np.sin([0., 0.1, 0.2, 0.3, 0.4]) >>> df["rot[1,1]"] = np.cos([0., 0.1, 0.2, 0.3, 0.4]) >>> df["trans[0]"] = [0., 0.1, 0.2, 0.3, 0.4] >>> df["trans[1]"] = [5., 6., 7., 8., 9.] >>> df rot[0,0] rot[0,1] rot[1,0] rot[1,1] trans[0] trans[1] 0.0 1.000000 -0.000000 0.000000 1.000000 0.0 5.0 0.1 0.995004 -0.099833 0.099833 0.995004 0.1 6.0 0.2 0.980067 -0.198669 0.198669 0.980067 0.2 7.0 0.3 0.955336 -0.295520 0.295520 0.955336 0.3 8.0 0.4 0.921061 -0.389418 0.389418 0.921061 0.4 9.0 Convert to a TimeSeries: >>> ts = ktk.TimeSeries(df) >>> ts.data {'rot': array([[[ 1. , -0. ], [ 0. , 1. ]], <BLANKLINE> [[ 0.99500417, -0.09983342], [ 0.09983342, 0.99500417]], <BLANKLINE> [[ 0.98006658, -0.19866933], [ 0.19866933, 0.98006658]], <BLANKLINE> [[ 0.95533649, -0.29552021], [ 0.29552021, 0.95533649]], <BLANKLINE> [[ 0.92106099, -0.38941834], [ 0.38941834, 0.92106099]]]), 'trans': array([[0. , 5. ], [0.1, 6. ], [0.2, 7. ], [0.3, 8. ], [0.4, 9. ]])} """ check_param("dataframe", dataframe, pd.DataFrame) check_param("time_info", time_info, dict, key_type=str) check_param( "data_info", data_info, dict, key_type=str, contents_type=dict ) check_param("events", events, list) ts = TimeSeries( time=dataframe.index.to_numpy(), time_info=time_info, data_info=data_info, events=events, ) # Remove spaces in indexes between brackets columns = dataframe.columns new_columns = [] for i_column, column in enumerate(columns): splitted = column.split("[") if len(splitted) > 1: # There are brackets new_columns.append( splitted[0] + "[" + splitted[1].replace(" ", "") ) else: new_columns.append(column) dataframe.columns = columns # Search for the column names and their dimensions # At the end, we end with something like: # dimensions['Data1'] = [] # dimensions['Data2'] = [[0], [1], [2]] # dimensions['Data3'] = [[0,0],[0,1],[1,0],[1,1]] dimensions = dict() # type: dict[str, list] for column in dataframe.columns: splitted = column.split("[") if len(splitted) == 1: # No brackets dimensions[column] = [] else: # With brackets key = splitted[0] index = literal_eval("[" + splitted[1]) if key in dimensions: dimensions[key].append(index) else: dimensions[key] = [index] n_samples = len(dataframe) # Assign the columns to the output for key in dimensions: if len(dimensions[key]) == 0: ts.data[key] = dataframe[key].to_numpy() else: highest_dims = np.max(np.array(dimensions[key]), axis=0) columns = [ key + str(dim).replace(" ", "") for dim in sorted(dimensions[key]) ] ts.data[key] = dataframe[columns].to_numpy() ts.data[key] = np.reshape( ts.data[key], [n_samples] + (highest_dims + 1).tolist() ) return ts
[docs] @staticmethod def from_array( array: ArrayLike, /, *, data_key: str = "data", time: ArrayLike = [], time_info: dict[str, Any] = {"Unit": "s"}, data_info: dict[str, dict[str, Any]] = {}, events: list[TimeSeriesEvent] = [], ) -> TimeSeries: """ Create a new TimeSeries from an array. Parameters ---------- array An array or list where the first dimension corresponds to time. data_key Optional. The name of the data (used as the key in the TimeSeries' data attribute). Default is "data". time Optional. An array that indicates the time for each sample. Its length must match the first dimension of the data array. If None (default), a matching time attribute of with a period of one second is created. time_info Optional. Will be copied to the TimeSeries' time_info attribute. data_info Optional. Will be copied to the TimeSeries' data_info attribute. events Optional. Will be copied to the TimeSeries' events attribute. Returns ------- TimeSeries The new TimeSeries. See Also -------- ktk.TimeSeries.to_array ktk.TimeSeries.from_dataframe ktk.TimeSeries.to_dataframe Examples -------- **Using default time** >>> ktk.TimeSeries([0.1, 0.2, 0.3, 0.4, 0.5]) TimeSeries with attributes: time: array([0., 1., 2., 3., 4.]) data: {'data': array([0.1, 0.2, 0.3, 0.4, 0.5])} time_info: {'Unit': 's'} data_info: {} events: [] **Specifiying time** >>> ktk.TimeSeries([0.1, 0.2, 0.3, 0.4, 0.5], time=[0.1, 0.2, 0.3, 0.4, 0.5]) TimeSeries with attributes: time: array([0.1, 0.2, 0.3, 0.4, 0.5]) data: {'data': array([0.1, 0.2, 0.3, 0.4, 0.5])} time_info: {'Unit': 's'} data_info: {} events: [] """ check_param("data_key", data_key, str) check_param("time_info", time_info, dict, key_type=str) check_param( "data_info", data_info, dict, key_type=str, contents_type=dict ) check_param("events", events, list) time = np.array(time) ts = TimeSeries( data={data_key: array}, time_info=time_info, data_info=data_info, events=events, ) if time.shape[0] == 0: ts.time = np.arange(ts.data[data_key].shape[0]) * 1.0 # floats else: ts.time = time return ts
# %% Deprecrated methods
[docs] @deprecated( since="0.15", until="2027", details=( "Events are now always sorted in the events attribute. " "There is no need to run the sort_events method anymore." ), ) def sort_events( self, *, unique: bool = False, in_place: bool = False ) -> TimeSeries: """ Deprecated. Sorts the TimeSeries' events from the earliest to the latest. Parameters ---------- unique Optional. True to make events unique so that no two events can have both the same name and the same time. in_place Optional. True to modify and return the original TimeSeries. False to return a modified copy of the TimeSeries while leaving the original TimeSeries intact. Default is False. Returns ------- TimeSeries The TimeSeries with the sorted events. """ check_param("unique", unique, bool) check_param("in_place", in_place, bool) self._check_well_typed() ts = self if in_place else self.copy() if unique: ts.remove_duplicate_events(in_place=True) ts.events = sorted(ts.events) return ts
# %% Main if __name__ == "__main__": # pragma: no cover import doctest doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)