Source code for figaroh.identification.base_identification

# Copyright [2021-2025] Thanh Nguyen
# Copyright [2022-2023] [CNRS, Toward SAS]

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Base class for robot dynamic parameter identification.
This module provides a generalized framework for dynamic parameter identification
that can be inherited by any robot type (TIAGo, UR10, MATE, etc.).
"""

import yaml
import numpy as np
from abc import ABC, abstractmethod

# FIGAROH imports
from figaroh.identification.identification_tools import (
    get_param_from_yaml as get_identification_param_from_yaml,
    unified_to_legacy_identif_config,
)
from figaroh.utils.config_parser import (
    UnifiedConfigParser,
    create_task_config,
    is_unified_config
)
from figaroh.tools.regressor import (
    build_regressor_basic,
    get_index_eliminate,
    build_regressor_reduced,
)
from figaroh.identification.identification_tools import get_standard_parameters


[docs] class BaseIdentification(ABC): """ Base class for robot dynamic parameter identification. Provides common functionality for all robots while allowing robot-specific implementations of key methods. """ def __init__(self, robot, config_file="config/robot_config.yaml"): """Initialize base identification with robot model and configuration. Args: robot: Robot model loaded with FIGAROH config_file: Path to robot configuration YAML file """ self.robot = robot self.model = self.robot.model self.data = self.robot.data # Load configuration using the TIAGo-style approach self.load_param(config_file) # Initialize attributes for identification results self.dynamic_regressor = None self.standard_parameter = None self.params_base = None self.dynamic_regressor_base = None self.phi_base = None self.rms_error = None self.correlation = None self.processed_data = None self.result = None self.num_samples = None self.tau_ref = None self.tau_identif = None self.tau_noised = None self.filter_config = { 'differentiation_method': 'gradient', 'filter_params': {} } print(f"{self.__class__.__name__} initialized")
[docs] def initialize(self, truncate=None): self.process_data(truncate=truncate) self.calculate_full_regressor()
[docs] def solve(self, decimate=True, decimation_factor=10, zero_tolerance=0.001, plotting=True, save_results=False): """Main solving method for dynamic parameter identification. This method implements the complete base parameter identification workflow including column elimination, optional decimation, QR decomposition, and quality metric computation. Args: decimate (bool): Whether to apply decimation to reduce data size decimation_factor (int): Factor for signal decimation (default: 10) zero_tolerance (float): Tolerance for eliminating zero columns plotting (bool): Whether to generate plots save_results (bool): Whether to save parameters to file Returns: ndarray: Base parameters phi_base Raises: AssertionError: If prerequisites not met (dynamic_regressor, standard_parameter) ValueError: If data shapes are incompatible np.linalg.LinAlgError: If QR decomposition fails """ print(f"Starting {self.__class__.__name__} dynamic parameter identification...") # Validate prerequisites self._validate_prerequisites() # Step 1: Eliminate zero columns regressor_reduced, active_params = self._eliminate_zero_columns( zero_tolerance) # Step 2: Apply decimation if requested if decimate: tau_processed, W_processed = self._apply_decimation( regressor_reduced, decimation_factor) else: tau_processed, W_processed = self._prepare_undecimated_data( regressor_reduced) # Step 3: Calculate base parameters results = self._calculate_base_parameters( tau_processed, W_processed, active_params) # Step 4: Store results and compute quality metrics self._compute_quality_metrics() self._store_results(results) # Step 5: Optional plotting if plotting: self.plot_results() # Step 6: Optional parameter saving if save_results: self.save_results() return self.phi_base
[docs] def load_param(self, config_file, setting_type="identification"): """Load the identification parameters from the yaml file. This method supports both legacy YAML format and the new unified configuration format. It automatically detects the format type and applies the appropriate parser. Args: config_file (str): Path to configuration file (legacy or unified) setting_type (str): Configuration section to load """ try: print(f"Loading config from {config_file}") # Check if this is a unified configuration format if is_unified_config(config_file): print("Detected unified configuration format") # Use unified parser parser = UnifiedConfigParser(config_file) unified_config = parser.parse() unified_identif_config = create_task_config( self.robot, unified_config, setting_type ) # Convert unified format to identif_config format self.identif_config = unified_to_legacy_identif_config( self.robot, unified_identif_config ) else: print("Detected legacy configuration format") # Use legacy format parsing with open(config_file, "r") as f: config = yaml.load(f, Loader=yaml.SafeLoader) self.identif_config = get_identification_param_from_yaml( self.robot, config[setting_type] ) except Exception as e: print(f"Error loading config {config_file}: {e}") raise
[docs] @abstractmethod def load_trajectory_data(self): """Load and process CSV data. This method must be implemented by robot-specific subclasses to handle their specific data formats and file structures. Returns: tuple: (timestamps, positions, velocities, torques) as numpy arrays """ pass
[docs] def process_data(self, truncate=None): """Load and process data""" # Set default filter configuration filter_config = self.filter_config # load raw data self.raw_data = self.load_trajectory_data() # Truncate data if truncation indices are provided self.raw_data = self._truncate_data(self.raw_data, truncate) # Apply filtering and differentiation kinematics data self.process_kinematics_data(filter_config) # Process joint torque data self.processed_data["torques"] = self.process_torque_data() # Update sample count to ensure consistency self.num_samples = self.processed_data["positions"].shape[0] # Build full configuration self._build_full_configuration()
[docs] def calculate_full_regressor(self): """Build regressor matrix, compute pre-identified values of standard parameters, compute joint torques based on pre-identified standard parameters.""" # Build full regressor matrix self.dynamic_regressor = build_regressor_basic( self.robot, self.processed_data["positions"], self.processed_data["velocities"], self.processed_data["accelerations"], self.identif_config, ) # Compute standard parameters self.standard_parameter = get_standard_parameters(self.model, self.identif_config) # additional parameters can be added in robot-specific subclass self.add_additional_parameters() # Convert all string values to floats in the standard_parameter dict for key, value in self.standard_parameter.items(): if isinstance(value, str): self.standard_parameter[key] = float(value) # joint torque estimated from p,v,a with std params phi_ref = np.array(list(self.standard_parameter.values())) tau_ref = np.dot(self.dynamic_regressor, phi_ref) # filter only active joints self.tau_ref = tau_ref[range(len(self.identif_config["act_idxv"]) * self.num_samples)]
[docs] def add_additional_parameters(self): """Add additional parameters specific to the robot and recalculate dynamic regressor.""" pass
def _apply_filters(self, *signals, nbutter=4, f_butter=2, med_fil=5, f_sample=100): """Apply median and lowpass filters to any number of signals. Args: *signals: Variable number of signal arrays to filter nbutter (int): Butterworth filter order (default: 4) f_butter (float): Cutoff frequency in Hz (default: 2) med_fil (int): Median filter window size (default: 5) f_sample (float): Sampling frequency in Hz (default: 100) Returns: tuple: Filtered signals in the same order as input """ from scipy import signal # Design Butterworth filter coefficients b1, b2 = signal.butter(nbutter, f_butter / (f_sample / 2), "low") filtered_signals = [] for sig in signals: if sig is None: filtered_signals.append(None) continue # Ensure signal is 2D array if sig.ndim == 1: sig = sig.reshape(-1, 1) sig_filtered = np.zeros(sig.shape) # Apply filters to each column (joint/channel) for j in range(sig.shape[1]): # Apply median filter first sig_med = signal.medfilt(sig[:, j], med_fil) # Apply Butterworth lowpass filter sig_filtered[:, j] = signal.filtfilt( b1, b2, sig_med, padtype="odd", padlen=3 * (max(len(b1), len(b2)) - 1) ) filtered_signals.append(sig_filtered) # Return single array if only one signal, otherwise tuple if len(filtered_signals) == 1: return filtered_signals[0] return tuple(filtered_signals) def _differentiate_signal(self, time_vector, signal, method='gradient'): """Compute first derivative of a time series signal. Args: time_vector (ndarray): Time stamps corresponding to signal samples signal (ndarray): Signal to differentiate (1D or 2D array) method (str): Differentiation method ('gradient', 'forward', 'backward', 'central') Returns: ndarray: First derivative of the signal with same shape as input Raises: ValueError: If time_vector and signal have incompatible shapes ValueError: If method is not supported """ # Validate inputs if signal.shape[0] != time_vector.shape[0]: raise ValueError(f"Time vector length {time_vector.shape[0]} " f"doesn't match signal length {signal.shape[0]}") # Ensure signal is 2D if signal.ndim == 1: signal = signal.reshape(-1, 1) squeeze_output = True else: squeeze_output = False # Handle time vector shape (extract first column if 2D) if time_vector.ndim == 2: t = time_vector[:, 0] else: t = time_vector # Initialize output array derivative = np.zeros_like(signal) # Apply differentiation method to each column for j in range(signal.shape[1]): sig_col = signal[:, j] if method == 'gradient': # Use numpy gradient (handles edge cases automatically) derivative[:, j] = np.gradient(sig_col, t) elif method == 'forward': # Forward difference: df/dt ≈ (f[i+1] - f[i]) / (t[i+1] - t[i]) derivative[:-1, j] = np.diff(sig_col) / np.diff(t) # Extrapolate last point derivative[-1, j] = derivative[-2, j] elif method == 'backward': # Backward difference: df/dt ≈ (f[i] - f[i-1]) / (t[i] - t[i-1]) derivative[1:, j] = np.diff(sig_col) / np.diff(t) # Extrapolate first point derivative[0, j] = derivative[1, j] elif method == 'central': # Central difference: df/dt ≈ (f[i+1] - f[i-1]) / (t[i+1] - t[i-1]) derivative[1:-1, j] = (sig_col[2:] - sig_col[:-2]) / (t[2:] - t[:-2]) # Handle boundary conditions derivative[0, j] = (sig_col[1] - sig_col[0]) / (t[1] - t[0]) derivative[-1, j] = (sig_col[-1] - sig_col[-2]) / (t[-1] - t[-2]) else: raise ValueError(f"Unsupported differentiation method: {method}. " f"Use 'gradient', 'forward', 'backward', or 'central'") # Return with original dimensionality if squeeze_output: return derivative.squeeze() return derivative def _build_full_configuration(self): """Build full configuration arrays for position, velocity, acceleration. This method expands the active joint data to full robot configuration by filling in default values for inactive joints. Uses vectorized operations for optimal performance. """ # Validate required data required_keys = ["positions", "velocities", "accelerations"] for key in required_keys: if key not in self.processed_data or self.processed_data[key] is None: raise ValueError(f"Missing required data: {key}") # Get active joint data q_active = self.processed_data["positions"] dq_active = self.processed_data["velocities"] ddq_active = self.processed_data["accelerations"] # Create full configuration arrays efficiently config_data = [ (q_active, self.robot.q0, self.identif_config["act_idxq"]), (dq_active, self.robot.v0, self.identif_config["act_idxv"]), (ddq_active, self.robot.v0, self.identif_config["act_idxv"]) ] full_configs = [] for active_data, default_config, active_indices in config_data: # Initialize with defaults full_config = np.tile(default_config, (self.num_samples, 1)) # Fill active joints full_config[:, active_indices] = active_data full_configs.append(full_config) # Update processed data efficiently config_keys = ["positions", "velocities", "accelerations"] self.processed_data.update(dict(zip(config_keys, full_configs))) def _truncate_data(self, data_dict, truncate=None): """Truncate data arrays based on provided indices. Args: data_dict (dict): Dictionary containing data arrays to truncate truncate (tuple/list): Truncation indices (start, end) or None for no truncation Returns: dict: Dictionary with truncated data arrays """ if truncate is None: return data_dict.copy() if not isinstance(truncate, (list, tuple)) or len(truncate) != 2: raise ValueError("Truncate parameter must be a tuple/list of length 2 (start, end)") n_i, n_f = truncate truncated_data = {} for key, array in data_dict.items(): if array is not None: truncated_data[key] = array[n_i:n_f] else: truncated_data[key] = None return truncated_data
[docs] def process_kinematics_data(self, filter_config=None): """Process kinematics data (positions, velocities, accelerations) with filtering.""" self.filter_kinematics_data(filter_config)
[docs] def filter_kinematics_data(self, filter_config=None): """Apply filtering to data with configurable parameters. Args: filter_config (dict, optional): Filter configuration with keys: - differentiation_method: Method for derivative estimation - filter_params: Parameters for signal filtering Raises: ValueError: If required data is missing """ # Validate required data if self.raw_data.get("timestamps") is None: raise ValueError("Timestamps are required for data processing") if self.raw_data.get("positions") is None: raise ValueError("Position data is required for processing") # Create processed data copy to avoid modifying raw data self.processed_data = {} # Process timestamps (no filtering needed) self.processed_data["timestamps"] = self.raw_data["timestamps"] # Define signal processing pipeline signal_pipeline = [ ("positions", self.raw_data["positions"], None), ("velocities", self.raw_data.get("velocities"), "positions"), ("accelerations", self.raw_data.get("accelerations"), "velocities") ] # Process signals through pipeline for signal_name, signal_data, dependency in signal_pipeline: if signal_data is not None: # Apply filtering to existing data self.processed_data[signal_name] = self._apply_filters( signal_data, **filter_config['filter_params']) else: # Estimate missing signal from dependency if dependency: dependency_data = self.processed_data[dependency] self.processed_data[signal_name] = \ self._differentiate_signal( self.processed_data["timestamps"], dependency_data, method=filter_config['differentiation_method']) else: raise ValueError( f"Cannot process {signal_name}: no data or dependency")
[docs] def process_torque_data(self, **kwargs): """Process torque data (generic implementation, should be overridden for robot-specific processing).""" # Generic torque processing - robots should override this method if self.raw_data["torques"] is not None: self.processed_data["torques"] = self.raw_data["torques"] return self.processed_data["torques"] else: raise ValueError("Torque data is required for processing")
def _validate_prerequisites(self): """Validate that required data is available for calculation. Raises: AssertionError: If required attributes are not set """ assert hasattr(self, 'dynamic_regressor') and self.dynamic_regressor is not None, \ "Regressor matrix not calculated. " \ "Call calculate_full_regressor() first." assert hasattr(self, 'standard_parameter') and \ self.standard_parameter is not None, \ "Standard parameters not loaded. " \ "Call calculate_full_regressor() first." assert hasattr(self, 'processed_data') and \ self.processed_data is not None, \ "Data not processed. Call process_data() first." def _eliminate_zero_columns(self, zero_tolerance): """Eliminate columns with near-zero values from regressor matrix. Args: zero_tolerance (float): Tolerance for considering columns as zero Returns: tuple: (regressor_reduced, active_parameters) """ idx_eliminated, active_parameters = get_index_eliminate( self.dynamic_regressor, self.standard_parameter, tol_e=zero_tolerance ) regressor_reduced = build_regressor_reduced(self.dynamic_regressor, idx_eliminated) return regressor_reduced, active_parameters def _apply_decimation(self, regressor_reduced, decimation_factor): """Apply signal decimation to reduce data size. Args: regressor_reduced (ndarray): Reduced regressor matrix decimation_factor (int): Factor for decimation Returns: tuple: (tau_decimated, regressor_decimated) """ from scipy import signal # Decimate torque data tau_decimated_list = [] num_joints = len(self.identif_config["act_idxv"]) for i in range(num_joints): tau_joint = self.processed_data["torques"][:, i] tau_dec = signal.decimate(tau_joint, q=decimation_factor, zero_phase=True) tau_decimated_list.append(tau_dec) # Concatenate decimated torque data tau_decimated = tau_decimated_list[0] for i in range(1, len(tau_decimated_list)): tau_decimated = np.append(tau_decimated, tau_decimated_list[i]) # Decimate regressor matrix regressor_decimated = self._decimate_regressor_matrix( regressor_reduced, decimation_factor) # Validate that decimated data is properly aligned if tau_decimated.shape[0] != regressor_decimated.shape[0]: raise ValueError( f"Decimated data size mismatch: " f"tau_decimated has {tau_decimated.shape[0]} samples, " f"regressor_decimated has {regressor_decimated.shape[0]} rows" ) return tau_decimated, regressor_decimated def _decimate_regressor_matrix(self, regressor_reduced, decimation_factor): """Decimate the regressor matrix by joints. Args: regressor_reduced (ndarray): Reduced regressor matrix decimation_factor (int): Decimation factor Returns: ndarray: Decimated regressor matrix """ from scipy import signal num_joints = len(self.identif_config["act_idxv"]) regressor_list = [] for i in range(num_joints): # Extract rows corresponding to joint i start_idx = self.identif_config["act_idxv"][i] * self.num_samples end_idx = (self.identif_config["act_idxv"][i] + 1) * self.num_samples joint_regressor_decimated = [] for j in range(regressor_reduced.shape[1]): column_data = regressor_reduced[start_idx:end_idx, j] decimated_column = signal.decimate( column_data, q=decimation_factor, zero_phase=True) joint_regressor_decimated.append(decimated_column) # Reconstruct matrix for this joint joint_matrix = np.zeros((len(joint_regressor_decimated[0]), len(joint_regressor_decimated))) for k, column in enumerate(joint_regressor_decimated): joint_matrix[:, k] = column regressor_list.append(joint_matrix) # Concatenate all joint matrices total_rows = sum(matrix.shape[0] for matrix in regressor_list) regressor_decimated = np.zeros((total_rows, regressor_list[0].shape[1])) current_row = 0 for matrix in regressor_list: next_row = current_row + matrix.shape[0] regressor_decimated[current_row:next_row, :] = matrix current_row = next_row return regressor_decimated def _prepare_undecimated_data(self, regressor_reduced): """Prepare data without decimation. Args: regressor_reduced (ndarray): Reduced regressor matrix Returns: tuple: (tau_flattened, regressor_reduced) """ tau_data = self.processed_data["torques"] if hasattr(tau_data, 'flatten'): tau_flattened = tau_data.flatten() else: tau_flattened = tau_data return tau_flattened, regressor_reduced def _calculate_base_parameters(self, tau_processed, regressor_processed, active_parameters): """Calculate base parameters using QR decomposition. Args: tau_processed (ndarray): Processed torque data regressor_processed (ndarray): Processed regressor matrix active_parameters (dict): Active parameter dictionary Returns: dict: Results from QR decomposition """ from figaroh.tools.qrdecomposition import double_QR # Perform QR decomposition W_base, base_param_dict, base_parameters, phi_base, phi_std = \ double_QR(tau_processed, regressor_processed, active_parameters, self.standard_parameter) # Calculate torque estimation (avoid redundant computation) tau_estimated = np.dot(W_base, phi_base) # Store key results for backward compatibility self.dynamic_regressor_base = W_base self.phi_base = phi_base self.params_base = list(base_param_dict.keys()) self.tau_identif = tau_estimated self.tau_noised = tau_processed return { "base_regressor": W_base, "base_param_dict": base_param_dict, "base_parameters": base_parameters, "phi_base": phi_base, "tau_estimated": tau_estimated, "tau_processed": tau_processed, } def _compute_quality_metrics(self): """Compute quality metrics for the identification. Side Effects: - Updates self.rms_error - Updates self.correlation """ from figaroh.identification.identification_tools import relative_stdev # Calculate quality metrics self.std_relative = relative_stdev(self.dynamic_regressor_base, self.phi_base, self.tau_noised) residuals = self.tau_noised - self.tau_identif self.rms_error = np.sqrt(np.mean(residuals**2)) if len(self.tau_noised) > 1 and len(self.tau_identif) > 1: try: correlation_matrix = np.corrcoef(self.tau_noised, self.tau_identif) self.correlation = correlation_matrix[0, 1] except (np.linalg.LinAlgError, ValueError): self.correlation = 1.0 else: self.correlation = 1.0 def _store_results(self, identif_results): """Store calculation results in instance attributes. Args: identif_results (dict): Results from base parameter calculation """ # Store results in instance attribute self.result = { "base regressor": identif_results["base_regressor"], "base parameters": identif_results["base_param_dict"], "base parameters values": identif_results["phi_base"], "base parameters names": list(identif_results["base_param_dict"].keys()), "condition number": np.linalg.cond(identif_results["base_regressor"]), "torque estimated": identif_results["tau_estimated"], "torque processed": identif_results["tau_processed"], "std dev of estimated param": self.std_relative, "rmse norm (N/m)": self.rms_error, "num samples": self.num_samples, "identification config": getattr(self, 'identif_config', {}), "task type": "identification" } # Initialize ResultsManager for identification task try: from .results_manager import ResultsManager # Get robot name from class or model robot_name = getattr( self, 'robot_name', getattr( self.model, 'name', self.__class__.__name__.lower().replace( 'identification', ''))) # Initialize results manager for identification task self.results_manager = ResultsManager('identification', robot_name, self.result) except ImportError as e: print(f"Warning: ResultsManager not available: {e}") self.results_manager = None
[docs] def plot_results(self): """Plot identification results using unified results manager.""" if not hasattr(self, 'result') or self.result is None: print("No identification results to plot. Run solve() first.") return # Use pre-initialized results manager if available if hasattr(self, 'results_manager') and \ self.results_manager is not None: try: # Plot using unified manager with self.result data self.results_manager.plot_identification_results() return except Exception as e: print(f"Error plotting with ResultsManager: {e}") print("Falling back to basic plotting...") # Fallback to basic plotting if ResultsManager not available try: import matplotlib.pyplot as plt # Extract data from self.result dictionary tau_measured = self.result.get("torque processed", np.array([])) tau_identified = self.result.get("torque estimated", np.array([])) parameter_values = self.result.get("base parameters values", np.array([])) if len(tau_measured) == 0 or len(tau_identified) == 0: print("No torque data available for plotting") return plt.figure(figsize=(12, 8)) plt.subplot(2, 1, 1) plt.plot(tau_measured, label="Measured (with noise)", alpha=0.7) plt.plot(tau_identified, label="Identified", alpha=0.7) plt.xlabel('Sample') plt.ylabel('Torque (Nm)') plt.title(f'{self.__class__.__name__} Torque Comparison') plt.legend() plt.grid(True, alpha=0.3) plt.subplot(2, 1, 2) if len(parameter_values) > 0: plt.bar(range(len(parameter_values)), parameter_values, alpha=0.7, label="Base Parameters") plt.xlabel('Parameter Index') plt.ylabel('Parameter Value') plt.title('Identified Base Parameters') plt.legend() plt.grid(True, alpha=0.3) plt.tight_layout() plt.show() except ImportError: print("Warning: matplotlib not available for plotting") except Exception as e: print(f"Warning: Plotting failed: {e}")
[docs] def save_results(self, output_dir="results"): """Save identification results using unified results manager.""" if not hasattr(self, 'result') or self.result is None: print("No identification results to save. Run solve() first.") return # Use pre-initialized results manager if available if hasattr(self, 'results_manager') and \ self.results_manager is not None: try: # Save using unified manager with self.result data saved_files = self.results_manager.save_results( output_dir=output_dir, save_formats=['yaml', 'csv', 'npz'] ) print("Identification results saved using ResultsManager") for fmt, path in saved_files.items(): print(f" {fmt}: {path}") return saved_files except Exception as e: print(f"Error saving with ResultsManager: {e}") print("Falling back to basic saving...") # Fallback to basic saving if ResultsManager not available try: import os import yaml import datetime os.makedirs(output_dir, exist_ok=True) # Extract data from self.result dictionary parameter_values = self.result.get("base parameters values", np.array([])) parameter_names = self.result.get("base parameters names", []) condition_number = self.result.get("condition number", 0) rmse_norm = self.result.get("rmse norm (N/m)", 0) std_dev_param = self.result.get("std dev of estimated param", np.array([])) results_dict = { 'base_parameters': ( parameter_values.tolist() if hasattr(parameter_values, 'tolist') else parameter_values), 'parameter_names': [str(p) for p in parameter_names], 'condition_number': float(condition_number), 'rmse_norm': float(rmse_norm), 'standard_deviation': ( std_dev_param.tolist() if hasattr(std_dev_param, 'tolist') else std_dev_param ) } robot_name = self.__class__.__name__.lower().replace( 'identification', '') timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{robot_name}_identification_results_{timestamp}.yaml" with open(os.path.join(output_dir, filename), "w") as f: yaml.dump(results_dict, f, default_flow_style=False) print(f"Results saved to {output_dir}/{filename}") return {filename: os.path.join(output_dir, filename)} except Exception as e: print(f"Error in fallback saving: {e}") return None