Source code for kineticstoolkit.files

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020-2025 Félix Chénier

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Provide functions to load and save data.

The classes defined in this module are accessible directly from the
toplevel Kinetics Toolkit namespace (i.e. ktk.load, ktk.save).

"""

__author__ = "Félix Chénier"
__copyright__ = "Copyright (C) 2020-2025 Félix Chénier"
__email__ = "chenier.felix@uqam.ca"
__license__ = "Apache 2.0"

from kineticstoolkit.timeseries import TimeSeries
import kineticstoolkit.dev.kinetics as kinetics
import kineticstoolkit.config
from kineticstoolkit.typing_ import check_param

import os
import numpy as np
import pandas as pd
from typing import Any
import warnings
import shutil
import json
from datetime import datetime
import time
import getpass
import zipfile


def __dir__():  # pragma: no cover
    return ["save", "load"]


[docs] def save(filename: str, variable: Any) -> None: """ Save a variable to a ktk.zip file. A ktk.zip file is a zipped folder containing two files: - metadata.json, which includes save date, user, etc. - data.json, which includes the data. The following classes are supported: - dict containing any supported class - list containing any supported class - str - int - float - True - False - None - numpy.array - pandas.DataFrame (basic DataFrames, e.g., without multi-indexing.) - pandas.Series - ktk.TimeSeries Parameters ---------- filename: Name of the file to save to (e.g., "file.ktk.zip") variable: The variable to save. Returns ------- None Caution ------- Tuples are also supported but will be loaded back as lists, without warning. Complex Pandas Series (e.g., series or different types) may not be supported: only name, index and data are saved. Complex Pandas DataFrames (e.g., multiindex, columns of different types) may not be supported: only columns, index, and data are saved. See Also -------- ktk.load """ check_param("filename", filename, str) class CustomEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, np.ndarray): return {"class__": "numpy.array", "value": obj.tolist()} elif isinstance(obj, TimeSeries): out = {} out["class__"] = "ktk.TimeSeries" out["time"] = obj.time.tolist() out["time_info"] = obj.time_info out["data_info"] = obj.data_info out["data"] = {} for key in obj.data: out["data"][key] = obj.data[key].tolist() out["events"] = [] for event in obj.events: out["events"].append( { "time": event.time, "name": event.name, } ) return out elif isinstance(obj, pd.Series): return { "class__": "pandas.Series", "name": str(obj.name), "index": obj.index.tolist(), "data": obj.tolist(), } elif isinstance(obj, pd.DataFrame): return { "class__": "pandas.DataFrame", "columns": obj.columns.tolist(), "index": obj.index.tolist(), "data": obj.to_numpy().tolist(), } elif isinstance(obj, complex): return { "class__": "complex", "real": obj.real, "imag": obj.imag, } else: return super().default(obj) now = datetime.now() if kineticstoolkit.config.is_pc: computer = "PC" elif kineticstoolkit.config.is_mac: computer = "Mac" elif kineticstoolkit.config.is_linux: computer = "Linux" else: computer = "Unknown" metadata = { "Software": "Kinetics Toolkit", "Version": kineticstoolkit.config.version, "Computer": computer, "FileFormat": 1.0, "SaveDate": now.strftime("%Y-%m-%d"), "SaveTime": now.strftime("%H:%M:%S"), "User": getpass.getuser(), } # Save temp_folder = ( kineticstoolkit.config.temp_folder + "/save" + str(time.time()) ) try: shutil.rmtree(temp_folder) except: pass os.mkdir(temp_folder) with open(temp_folder + "/metadata.json", "w") as fid: json.dump(metadata, fid, indent="\t") with open(temp_folder + "/data.json", "w") as fid: json.dump(variable, fid, cls=CustomEncoder, indent="\t") shutil.make_archive(temp_folder, "zip", temp_folder) shutil.move(temp_folder + ".zip", filename) shutil.rmtree(temp_folder)
def _load_object_hook(obj): if "class__" in obj: to_class = obj["class__"] if to_class == "numpy.array": return np.array(obj["value"]) elif to_class == "ktk.TimeSeries": out = TimeSeries() out.time = np.array(obj["time"]) out.time_info = obj["time_info"] out.data_info = obj["data_info"] for key in obj["data"]: out.data[key] = np.array(obj["data"][key]) for event in obj["events"]: out = out.add_event(event["time"], event["name"]) return out elif to_class == "pandas.DataFrame": return pd.DataFrame( obj["data"], columns=obj["columns"], index=obj["index"], ) elif to_class == "pandas.Series": return pd.Series( obj["data"], name=obj["name"], index=obj["index"], ) elif to_class == "complex": return obj["real"] + obj["imag"] * 1j else: warnings.warn( f'The "{to_class}" class is not supported by ' "this version of Kinetics Toolkit. Please check " "that Kinetics Toolkit is up to date." ) return obj else: return obj
[docs] def load(filename: str, *, include_metadata: bool = False) -> Any: """ Load a ktk.zip file. Load a data file as saved using the ``ktk.save`` function. Usage:: data = ktk.load(filename) data, metadata = ktk.load(filename, include_metadata=True) Parameters ---------- filename The path of the zip file to load. include_metadata Optional. If True, the output is a tuple of this form: (data, metadata). Returns ------- Any The loaded variable. See Also -------- ktk.save """ check_param("filename", filename, str) check_param("include_metadata", include_metadata, bool) archive = zipfile.ZipFile(filename, "r") data = json.loads( archive.read("data.json").decode(), object_hook=_load_object_hook ) if include_metadata: metadata = json.loads( archive.read("metadata.json").decode(), object_hook=_load_object_hook, ) return data, metadata else: return data
[docs] def read_c3d( filename: str, *, include_event_context: bool = False, convert_point_unit: bool | None = None, convert_forceplate_moment_unit: bool = True, convert_forceplate_position_unit: bool = True, **kwargs, ) -> dict[str, TimeSeries]: """ Read point, analog and rotation data from a C3D file. If available, point positions are returned in `output["Points"]` as a TimeSeries, where each point corresponds to a data key. Each point position is expressed as an Nx4 point series:: [ [x0, y0, z0, 1.0], [x1, y1, z1, 1.0], [x2, y2, z2, 1.0], ..., ] If available, analog data is returned in `output["Analogs"]` as a TimeSeries, where each analog signal is expressed as a unidimensional array of length N. If available, force platform data is returned in `output["ForcePlatforms"]` as a TimeSeries containing the following keys where FPi means Force Platform i, i being the index of the platform (e.g., FP0, FP1, etc.): - FPi_Force: Ground reaction force components in global coordinates, as an Nx4 vector series [[Fx, Fy, Fz, 0.0], ...]. - FPi_Moment: Ground reaction moment components in global coordinates, expressed at the origin of the force platform, as an Nx4 vector series [[Mx, My, Mz, 0.0], ...]. - FPi_MomentAtCOP: Ground reaction moment components in global coordinates, expressed at the center of pressure, as an Nx4 vector series. - FPi_COP: Centre of pressure in global coordinates as n Nx4 point series. - FPi_LCS: Local coordinate system of the force platform, expressed in global coordinates as an Nx4x4 transform series. The origin is at the middle point of the four corners, with z pointing down. - FPi_Corner1: Coordinates of the first corner (+x, +y) in global coordinates, as an Nx4 point series. - FPi_Corner2: Coordinates of the second corner (-x, +y) in global coordinates, as an Nx4 point series. - FPi_Corner3: Coordinates of the third corner (-x, -y) in global coordinates, as an Nx4 point series. - FPi_Corner4: Coordinates of the fourth corner (+x, -y) in global coordinates, as an Nx4 point series. If available, rigid body orientations are returned in `output["Rotations"]` as a TimeSeries, where each body orientation is expressed as an Nx4x4 transforms series. Some software stores calculated values such as angles, forces, moments, powers, etc. into the C3D file. Storing these data is software-specific and is not standardized in the C3D file format (https://www.c3d.org). This function reads these values as points regardless of their nature. Parameters ---------- filename Path of the C3D file include_event_context Optional. True to include the event context, for C3D files that use this field. If False, the events in the output TimeSeries are named after the events names in the C3D files, e.g.: "Start", "Heel Strike", "Toe Off". If True, the events in the output TimeSeries are named using this scheme "context:name", e.g.,: "General:Start", "Right:Heel strike", "Left:Toe Off". Default is False. convert_point_unit Optional. True to convert the point units to meters, if they are expressed in other units such as mm in the C3D file. False to keep points as is. When unset, a warning is issued if points are stored in a different unit than meters. See caution note below. convert_forceplate_moment_unit Optional. True to convert forceplate moment unit to Nm. Default is True. convert_forceplate_position_unit Optional. True to convert forceplate position units to meters. Default is True. Returns ------- dict[str, ktk.TimeSeries] A dict of TimeSeries, with keys being, if available: "Points", "Analogs", "ForcePlates" and/or "Rotations". Caution ------- If, for a given C3D file, points are expressed in another unit than meters (e.g., mm), and that this file also contains calculated points such as angles, powers, etc., then you need to be cautious with the `convert_point_unit` parameter: - Setting `convert_point_unit` to False reads the file as is, but you then need to manually convert the points to meters. This can be done easily using ktk.geometry.scale. - Setting `convert_point_unit` to True scales all points to meters, but also scales every calculated angle, power, etc. as they are read as any other point. For these special cases, we recommend to set `convert_point_unit` to False, and then scale the points manually. Warning ------- This function, which has been introduced in version 0.9, is still experimental and its behavior or API may change slightly in the future. See Also -------- ktk.write_c3d, ktk.geometry.scale Notes ----- - This function relies on `ezc3d`, which is installed by default using conda, but not using pip. If you installed kineticstoolkit using pip, then please install ezc3d manually: `pip install ezc3d`. - As for any instrument, please check that your data loads correctly on your first use (e.g., sampling frequency, position unit, location and orientation of the force platforms, etc.). """ check_param("filename", filename, str) if convert_point_unit is not None: check_param("convert_point_unit", convert_point_unit, bool) check_param( "convert_forceplate_moment_unit", convert_forceplate_moment_unit, bool ) check_param( "convert_forceplate_position_unit", convert_forceplate_position_unit, bool, ) check_param("include_event_context", include_event_context, bool) if not filename.endswith(".c3d"): raise ValueError("The file name must end with '.c3d'.") try: import ezc3d except ModuleNotFoundError: raise ModuleNotFoundError( "The optional module ezc3d is not installed, but it is required " "to use this function. Please install it using: " "conda install -c conda-forge ezc3d" ) if "return_ezc3d" in kwargs: return_ezc3d = kwargs["return_ezc3d"] else: return_ezc3d = False # Create the output output = {} # Create the reader if isinstance(filename, str) and os.path.exists(filename): try: reader = ezc3d.c3d(filename, extract_forceplat_data=True) except OSError: # Maybe there's an invalid character in filename. # Try to workaround # https://github.com/pyomeca/ezc3d/issues/252 tempfile = kineticstoolkit.config.temp_folder + "/temp.c3d" shutil.copyfile(filename, tempfile) reader = ezc3d.c3d(tempfile, extract_forceplat_data=True) os.remove(tempfile) else: raise FileNotFoundError(f"File {filename} was not found.") if return_ezc3d: output["C3D"] = reader # --------------------------------- # List the events try: event_names = reader["parameters"]["EVENT"]["LABELS"]["value"] event_times = reader["parameters"]["EVENT"]["TIMES"]["value"].T event_times = event_times[:, 0] * 60 + event_times[:, 1] except KeyError: event_names = [] event_times = [] try: event_contexts = reader["parameters"]["EVENT"]["CONTEXTS"]["value"] except KeyError: event_contexts = ["" for _ in event_names] # Create a list of events to copy in the output TimeSeries temp_ts = TimeSeries() for i_event in range(len(event_names)): event_time = event_times[i_event] if include_event_context: event_name = event_contexts[i_event] + ":" + event_names[i_event] else: event_name = event_names[i_event] temp_ts.add_event( event_time, event_name, in_place=True, ) events = temp_ts.events # ----------------- # Points # ----------------- # Get the marker label names and create a timeseries data entry for each # Get the labels point_rate = reader["parameters"]["POINT"]["RATE"]["value"][0] if reader["parameters"]["POINT"]["USED"]["value"][0] > 0: point_unit = reader["parameters"]["POINT"]["UNITS"]["value"][0] else: point_unit = "m" point_start = reader["header"]["points"]["first_frame"] if point_rate > 0: start_time = point_start / point_rate else: start_time = 0 n_points = reader["parameters"]["POINT"]["USED"]["value"][0] labels = reader["parameters"]["POINT"]["LABELS"]["value"] # Check if labels2, labels3, labels4,.... exist. # https://www.c3d.org/HTML/default.htm?turl=Documents%2Fpointlabels2.htm i_additional_labels = 2 while True: str_labels = f"LABELS{i_additional_labels}" try: additional_labels = reader["parameters"]["POINT"][str_labels][ "value" ] except KeyError: break labels.extend(additional_labels) i_additional_labels += 1 # Solve the point unit conversion mess (issue #147) scales = {"mm": 0.001, "cm": 0.01, "dm": 0.1, "m": 1.0} if convert_point_unit is None: if point_unit == "m": point_factor = 1.0 point_unit = "m" elif point_unit in scales: warnings.warn( "In the specified file, points are expressed in " f"{point_unit}. They have been automatically converted to meters " f"(scaled by {scales[point_unit]}). Please note that " "if this file also contains calculated values such as " "angles, powers, etc., they have been also (wrongly) scaled by " f"{scales[point_unit]}. Consult " "https://kineticstoolkit.uqam.ca/doc/api/ktk.read_c3d.html " "for more information. You can mute this warning " "by explicitely setting `convert_point_unit` to either True " "or False." ) point_factor = scales[point_unit] point_unit = "m" else: warnings.warn( "In the specified file, points are expressed in " f"`{point_unit}`, which is not recognized by ktk.read_c3d. They " "have been left as is, without attempting to convert to meters. " "You can mute this warning by setting `convert_point_unit` to " "False." ) point_factor = 1.0 # point_unit = Do not update elif convert_point_unit is True: try: point_factor = scales[point_unit] point_unit = "m" except KeyError: raise ValueError( "In the specified file, points are expressed in " f"`{point_unit}`, which is not recognized by ktk.read_c3d. " "Please set `convert_point_unit` to None of False." ) else: point_factor = 1 # point_unit = Do not update if n_points > 0: # There are points points = TimeSeries() for i_label in range(n_points): # Make sure it's UTF8, and strip leading and ending spaces label = labels[i_label] key = label.encode("utf-8", "ignore").decode("utf-8").strip() # Ensure key is unique, in case of multiple series labelled # with the same name if (key == "") or (key in points.data): suffix_integer = 1 while f"{key}_{suffix_integer}" in points.data: suffix_integer += 1 key = f"{key}_{suffix_integer}" points.data[key] = np.array( [point_factor, point_factor, point_factor, 1] * reader["data"]["points"][:, i_label, :].T ) points.add_data_info(key, "Unit", point_unit, in_place=True) if n_points > 0: points.time = ( np.arange(points.data[key].shape[0]) / point_rate + start_time ) # Add events points.events = events.copy() output["Points"] = points # ----------------- # Analogs # ----------------- labels = reader["parameters"]["ANALOG"]["LABELS"]["value"] analog_rate = reader["parameters"]["ANALOG"]["RATE"]["value"][0] n_analogs = reader["parameters"]["ANALOG"]["USED"]["value"][0] try: units = reader["parameters"]["ANALOG"]["UNITS"]["value"] if len(units) == 0: raise KeyError("No unit") except KeyError: # No units in the file, create an empty unit for each label units = ["" for _ in labels] # Check if labels2, labels3, labels4,.... exist. # https://www.c3d.org/HTML/default.htm?turl=Documents%2Fpointlabels2.htm # In contrast to the points case, units is an array of strings. i_additional_labels = 2 while True: str_labels = f"LABELS{i_additional_labels}" str_units = f"UNITS{i_additional_labels}" try: additional_labels = reader["parameters"]["ANALOG"][str_labels][ "value" ] try: additional_units = reader["parameters"]["ANALOG"][str_units][ "value" ] except KeyError: # If there are no additional units, just fill with blank spaces additional_units = ["" for _ in additional_labels] except KeyError: break labels.extend(additional_labels) units.extend(additional_units) i_additional_labels += 1 if len(labels) > 0: # There are analogs analogs = TimeSeries() for i_label in range(n_analogs): # Strip leading and ending spaces label = labels[i_label] key = label.encode("utf-8", "ignore").decode("utf-8").strip() # Ensure key is unique, in case of multiple series labelled # with the same name if (key == "") or (key in analogs.data): suffix_integer = 1 while f"{key}_{suffix_integer}" in analogs.data: suffix_integer += 1 key = f"{key}_{suffix_integer}" analogs.data[key] = reader["data"]["analogs"][0, i_label].T if units[i_label] != "": analogs.add_data_info( key, "Unit", units[i_label].encode("utf-8", "ignore").decode("utf-8"), in_place=True, ) if n_analogs > 0: analogs.time = ( np.arange(analogs.data[key].shape[0]) / analog_rate + start_time ) # Add events analogs.events = events.copy() output["Analogs"] = analogs # ----------------- # Rotations # ----------------- # Some files do not have a ROTATION parameter, do nothing for those. if "ROTATION" in reader["parameters"]: rotations = TimeSeries() # Get the marker label names and create a timeseries data entry for each # Get the labels rotation_rate = reader["parameters"]["ROTATION"]["RATE"]["value"][0] rotation_start = reader["header"]["rotations"]["first_frame"] start_time = rotation_start / rotation_rate n_rotations = reader["parameters"]["ROTATION"]["USED"]["value"][0] labels = reader["parameters"]["ROTATION"]["LABELS"]["value"] if len(labels) > 0: # There are rotations # no additional labels and scale conversion for rotation matrices # move to adding data to the TimeSeries for rotation_id in range(n_rotations): # Make sure it's UTF8, and strip leading and ending spaces label = labels[rotation_id] key = label.encode("utf-8", "ignore").decode("utf-8").strip() # Ensure key is unique, in case of multiple series labelled # with the same name if (key == "") or (key in rotations.data): suffix_integer = 1 while f"{key}_{suffix_integer}" in rotations.data: suffix_integer += 1 key = f"{key}_{suffix_integer}" rotations.data[key] = np.array( np.transpose( reader["data"]["rotations"][:, :, rotation_id, :], (2, 0, 1), ) ) if n_rotations > 0: rotations.time = ( np.arange(rotations.data[key].shape[0]) / rotation_rate + start_time ) # Matrices with nans should be complete nans. Some c3d may contain # nans in the data but [0, 0, 0, 1] on the 4th line. for data in rotations.data: rotations.data[data][rotations.isnan(data), :, :] = np.nan # Add events rotations.events = events.copy() # Add to output output["Rotations"] = rotations # ----------------- # Platforms # ----------------- if reader["data"]["platform"] != []: platforms = TimeSeries(time=analogs.time) # type: ignore n_platforms = len(reader["data"]["platform"]) for i_platform in range(n_platforms): # Define unit conversion factors forceplate_position_unit = reader["data"]["platform"][i_platform][ "unit_position" ] if convert_forceplate_position_unit and ( forceplate_position_unit == "mm" ): forceplate_position_factor = 0.001 forceplate_position_unit = "m" elif convert_forceplate_position_unit and ( forceplate_position_unit == "cm" ): forceplate_position_factor = 0.01 forceplate_position_unit = "m" elif convert_forceplate_position_unit and ( forceplate_position_unit == "dm" ): forceplate_position_factor = 0.1 forceplate_position_unit = "m" else: forceplate_position_factor = 1 forceplate_moment_unit = reader["data"]["platform"][i_platform][ "unit_moment" ] if convert_forceplate_moment_unit and ( forceplate_moment_unit == "Nmm" ): moment_factor = 0.001 forceplate_moment_unit = "Nm" elif forceplate_moment_unit == "Nm": moment_factor = 1 else: moment_factor = 1 warnings.warn( f"Moment unit is {forceplate_moment_unit} instead of Nm or Nmm." ) # Add corners for i_corner in range(4): key = f"FP{i_platform}_Corner{i_corner+1}" platforms.data[key] = np.ones((len(platforms.time), 4)) platforms.data[key][:, 0:3] = ( forceplate_position_factor * reader["data"]["platform"][i_platform]["corners"][ 0:3, i_corner ] ) platforms.add_data_info( key, "Unit", forceplate_position_unit, in_place=True ) # Add origin and the whole local coordinate system lcs = kinetics.create_forceplatform_lcs( platforms.data[f"FP{i_platform}_Corner1"], platforms.data[f"FP{i_platform}_Corner2"], platforms.data[f"FP{i_platform}_Corner3"], platforms.data[f"FP{i_platform}_Corner4"], ) platforms.data[f"FP{i_platform}_LCS"] = lcs # Add ground reaction force force_unit = reader["data"]["platform"][i_platform]["unit_force"] if force_unit != "N": warnings.warn( f"Force unit is {force_unit} instead of newtons." ) key = f"FP{i_platform}_Force" force = np.zeros((len(platforms.time), 4)) force[:, 0:3] = reader["data"]["platform"][i_platform]["force"].T platforms.data[key] = force platforms.add_data_info(key, "Unit", force_unit, in_place=True) # Add moment around origin key = f"FP{i_platform}_MomentAtCenter" moment = np.zeros((len(platforms.time), 4)) moment[:, 0:3] = ( moment_factor * reader["data"]["platform"][i_platform]["moment"].T ) platforms.data[key] = moment platforms.add_data_info( key, "Unit", forceplate_moment_unit, in_place=True ) # Add COP key = f"FP{i_platform}_COP" # # Calculated by KTK # local_force = geometry.get_local_coordinates(force, lcs) # local_moment = geometry.get_local_coordinates(moment, lcs) # local_cop = kinetics.calculate_cop(local_force, local_moment) # platforms.data[key] = geometry.get_global_coordinates( # local_cop, lcs # ) # Already calculated by ezc3d platforms.data[key] = np.ones((len(platforms.time), 4)) platforms.data[key][:, 0:3] = ( forceplate_position_factor * reader["data"]["platform"][i_platform][ "center_of_pressure" ].T ) platforms.add_data_info( key, "Unit", forceplate_position_unit, in_place=True ) # Add moments at COP key = f"FP{i_platform}_MomentAtCOP" platforms.data[key] = np.zeros((len(platforms.time), 4)) platforms.data[key][:, 0:3] = ( moment_factor * reader["data"]["platform"][i_platform]["Tz"].T ) platforms.add_data_info( key, "Unit", forceplate_moment_unit, in_place=True ) # Add events platforms.events = events.copy() output["ForcePlatforms"] = platforms return output
[docs] def write_c3d( filename: str, points: TimeSeries | None = None, analogs: TimeSeries | None = None, rotations: TimeSeries | None = None, ) -> None: """ Write points, analog, and rotations data to a C3D file. Parameters ---------- filename Path of the C3D file points Optional. Points trajectories, where data key corresponds to a point, expressed as an Nx4 point series:: [ [x0, y0, z0, 1.0], [x1, y1, z1, 1.0], [x2, y2, z2, 1.0], ..., ] Events from this TimeSeries are also added to the c3d. analogs Optional. Analog signals, where each data key is one series. Series that are not unidimensional are converted to multiple unidimensional series. For instance, if the shape of analogs.data["Forces"] is 1000x3, then three unidimensional series of length 1000 are created in the C3D: Forces[0], Forces[1] and Forces[2]. If both `analogs` and `points` are specified, the sample rate of `analogs` must be an integer multiple of the `points`'s sample rate. Also, `analogs.time[0]` must be the same as `points.time[0]`. rotations Optional. Rotation matrices, where each data key corresponds to a rotation matrix, expressed as a Nx4x4 array. If both `rotations` and `points` are specified, the sample rate of `rotations` must be an integer multiple of the `points`'s sample rate. Also, `rotations.time[0]` must be the same as `rotations.time[0]`. See Also -------- ktk.read_c3d Notes ----- This function relies on `ezc3d`, which is installed by default using conda, but not using pip. Please install ezc3d before using write_c3d. https://github.com/pyomeca/ezc3d Example ------- Create a simple c3d file with two markers sampled at 240 Hz and two sinusoidal analog signals sampled at 1200 Hz, during 10 seconds:: import kineticstoolkit.lab as ktk import numpy as np points = ktk.TimeSeries() points.time = np.linspace(0, 10, 10*240, endpoint=False) points.data["Marker1"] = np.ones((2400, 4)) points.data["Marker2"] = np.ones((2400, 4)) analogs = ktk.TimeSeries() analogs.time = np.linspace(0, 10, 10*2400, endpoint=False) analogs.data["Signal1"] = np.sin(analogs.time) analogs.data["Signal2"] = np.cos(analogs.time) ktk.write_c3d("testfile.c3d", points=points, analogs=analogs) """ check_param("filename", filename, str) check_param("points", points, (TimeSeries, type(None))) check_param("analogs", analogs, (TimeSeries, type(None))) check_param("rotations", rotations, (TimeSeries, type(None))) if not filename.endswith(".c3d"): raise ValueError("The file name must end with '.c3d'.") try: import ezc3d except ModuleNotFoundError: raise ModuleNotFoundError( "The optional module ezc3d is not installed, but it is required " "to use this function. Please install it using: " "conda install -c conda-forge ezc3d" ) if points is None: # Dummy point data must be created since analogs and rotations rate ratio # are based on point rate. points = TimeSeries() if rotations is not None: points = rotations.copy(copy_data=False, copy_data_info=False) elif analogs is not None: points = analogs.copy(copy_data=False, copy_data_info=False) else: raise ValueError( "At least one of points, analogs, or rotations must be " "provided. Writing empty C3D files is not supported." ) # Create an empty c3d structure c3d = ezc3d.c3d() # Add the points, but first make some checks points._check_constant_sample_rate() point_rate = points.get_sample_rate() point_unit = None for key in points.data: # Check that this is a series of points if points.data[key].shape[1] != 4: raise ValueError(f"Point {key} is not a Nx4 series of points.") if not np.all( np.isclose(points.data[key][:, 3], 1.0), where=~points.isnan(key), ): raise ValueError(f"Point {key} is not a series of [x, y, z, 1.0]") # Check that units are all the same (or None) if key not in points.data_info: continue if "Unit" not in points.data_info[key]: continue this_unit = points.data_info[key]["Unit"] if this_unit is None: continue if point_unit is None: point_unit = this_unit continue if point_unit != this_unit: raise ValueError( "Found different point units in the TimeSeries: " f"{point_unit} and {this_unit}." ) if point_unit is None: point_unit = "m" # Default # Now format and add the points point_list = [] point_data = np.zeros((4, len(points.data), len(points.time))) for i_point, point in enumerate(points.data): point_list.append(point) point_data[0, i_point, :] = points.data[point][:, 0] point_data[1, i_point, :] = points.data[point][:, 1] point_data[2, i_point, :] = points.data[point][:, 2] point_data[3, i_point, :] = points.data[point][:, 3] # Fill point data c3d["header"]["points"]["first_frame"] = round(points.time[0] * point_rate) c3d.add_parameter("POINT", "RATE", [point_rate]) if len(point_list) > 0: c3d.add_parameter("POINT", "LABELS", [tuple(point_list)]) c3d.add_parameter("POINT", "UNITS", point_unit) c3d["data"]["points"] = point_data # Fill analog data if analogs is not None: analogs._check_constant_sample_rate() analog_rate = analogs.get_sample_rate() rate_ratio = analog_rate / point_rate if ~np.isclose(rate_ratio, round(rate_ratio)): raise ValueError( "The sample rate of analogs must be an integer " "multiple of the points sample rate." ) if ~np.isclose(analogs.time[0], points.time[0]): raise ValueError( "Points and analogs must share the same starting time. " f"However, points.time[0] = {points.time[0]} whereas " f"analogs.time[0] = {analogs.time[0]}." ) # Since analogs are unidimensional, we will use the DataFrame exporter # to get one column per analog value. This way, forces would become # forces[0], forces[1], forces[2] and forces[3]. df_analogs, analogs_data_info = analogs._to_dataframe_and_info() c3d.add_parameter("ANALOG", "LABELS", list(df_analogs.columns)) c3d.add_parameter("ANALOG", "RATE", [analog_rate]) c3d.add_parameter( "ANALOG", "UNITS", [_["Unit"] if "Unit" in _ else "" for _ in analogs_data_info], ) c3d.add_parameter("ANALOG", "USED", [len(analogs_data_info)]) c3d["header"]["analogs"]["first_frame"] = round( analogs.time[0] * analog_rate ) c3d["data"]["analogs"] = (df_analogs.to_numpy().T)[np.newaxis] # fill rotation data if rotations is not None: rotations._check_constant_sample_rate() rotation_rate = rotations.get_sample_rate() rate_ratio = rotation_rate / point_rate if ~np.isclose(rate_ratio, round(rate_ratio)): raise ValueError( "The sample rate of rotations must be an integer " "multiple of the points sample rate." ) if ~np.isclose(rotations.time[0], points.time[0]): raise ValueError( "Points and rotations must share the same starting time. " f"However, points.time[0] = {points.time[0]} whereas " f"rotations.time[0] = {rotations.time[0]}." ) # Final data should be a 4x4xlen(rotations.data.keys())xlen(rotations.time) # np.ndarray c3d_rotations = np.zeros( (4, 4, len(rotations.data.keys()), len(rotations.time)) ) for i_rot, key in enumerate(rotations.data): # change from shape (len(rotations.time), 4, 4) to (4, 4, len(rotations.time)) c3d_rotations[:, :, i_rot, :] = np.transpose( rotations.data[key], (1, 2, 0) ) c3d.add_parameter("ROTATION", "LABELS", [*rotations.data.keys()]) c3d.add_parameter("ROTATION", "RATE", [rotation_rate]) c3d.add_parameter("ROTATION", "USED", [len(rotations.data.keys())]) c3d["header"]["rotations"]["first_frame"] = round( rotations.time[0] * rotation_rate ) c3d["data"]["rotations"] = c3d_rotations # Write the data c3d.write(filename) # --------------------------------- # Add the events c3d = ezc3d.c3d(filename) points = points.copy() if analogs is not None: points.events.extend(analogs.events) if rotations is not None: points.events.extend(rotations.events) points.remove_duplicate_events(in_place=True) for event in points.events: c3d.add_event( time=[event.time // 60, np.mod(event.time, 60)], label=event.name ) # Write the data again # (workaround https://github.com/pyomeca/ezc3d/issues/263) c3d.write(filename)
if __name__ == "__main__": # pragma: no cover import doctest doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)