# Copyright [2021-2025] Thanh Nguyen
# Copyright [2022-2023] [CNRS, Toward SAS]
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base Optimal Trajectory Generation Framework
This module provides base classes for optimal trajectory generation with
configuration management, parameter computation, constraint handling, and
IPOPT-based optimization. This framework can be extended for different robots.
"""
import yaml
import logging
import numpy as np
from matplotlib import pyplot as plt
from yaml.loader import SafeLoader
from typing import Dict, List, Tuple, Any
from figaroh.identification.identification_tools import get_standard_parameters
from figaroh.tools.regressor import (
build_regressor_basic,
build_regressor_reduced,
get_index_eliminate,
)
from figaroh.tools.qrdecomposition import get_baseIndex, build_baseRegressor
from figaroh.tools.robotcollisions import CollisionWrapper
from figaroh.identification.identification_tools import get_param_from_yaml
from figaroh.utils.cubic_spline import (
CubicSpline, WaypointsGeneration, calc_torque
)
from figaroh.tools.robotipopt import (
IPOPTConfig, BaseOptimizationProblem, RobotIPOPTSolver
)
[docs]
class ConfigurationManager:
"""Manages configuration loading and validation."""
[docs]
@staticmethod
def load_from_yaml(config_file: str) -> Tuple[Dict[str, Any], Any]:
"""Load trajectory parameters from YAML file."""
try:
with open(config_file, "r") as f:
config = yaml.load(f, Loader=SafeLoader)
identif_data = config["identification"]
traj_params = identif_data.get("trajectory_params", [{}])[0]
# Set default values if not present in config
trajectory_config = {
'n_wps': traj_params.get("n_wps", 5),
'freq': traj_params.get("freq", 100),
't_s': traj_params.get("t_s", 2.0),
'soft_lim': traj_params.get("soft_lim", 0.05),
'max_attempts': traj_params.get("max_attempts", 1000)
}
return trajectory_config, identif_data
except FileNotFoundError:
raise FileNotFoundError(
f"Configuration file {config_file} not found"
)
except KeyError as e:
raise ValueError(f"Missing required configuration key: {e}")
except Exception as e:
raise ValueError(f"Error loading configuration: {e}")
[docs]
class BaseParameterComputer:
"""Handles base parameter computation and indexing."""
def __init__(self, robot, identif_config, active_joints, soft_lim_pool):
self.robot = robot
self.identif_config = identif_config
self.active_joints = active_joints
self.soft_lim_pool = soft_lim_pool
[docs]
def compute_base_indices(self) -> Tuple[np.ndarray, np.ndarray]:
"""Compute base parameter indices from random trajectory."""
logging.info(
"Computing base parameter indices from random trajectory..."
)
try:
# Generate random trajectory for base parameter computation
n_wps_r = 100
freq_r = 100
CB_r = CubicSpline(self.robot, n_wps_r, self.active_joints)
WP_r = WaypointsGeneration(self.robot, n_wps_r, self.active_joints)
WP_r.gen_rand_pool(self.soft_lim_pool)
# Generate waypoints and trajectory
wps_r, vel_wps_r, acc_wps_r = WP_r.gen_rand_wp()
tps_r = np.matrix([0.5 * i for i in range(n_wps_r)]).transpose()
t_r, p_r, v_r, a_r = CB_r.get_full_config(
freq_r, tps_r, wps_r, vel_wps_r, acc_wps_r
)
# Compute base indices
idx_e, idx_b = self._get_idx_from_random(p_r, v_r, a_r)
logging.info(f"Computed {len(idx_b)} base parameters")
return idx_e, idx_b
except Exception as e:
logging.error(f"Error computing base indices: {e}")
raise
def _get_idx_from_random(self, q, v, a) -> Tuple[np.ndarray, np.ndarray]:
"""Get indices of eliminate and base parameters."""
W = build_regressor_basic(self.robot, q, v, a, self.identif_config)
params_std = get_standard_parameters(
self.robot.model, self.identif_config
)
idx_e_, par_r_ = get_index_eliminate(W, params_std, tol_e=0.001)
# Convert to numpy arrays
idx_e_ = np.array(idx_e_, dtype=int)
W_e_ = build_regressor_reduced(W, idx_e_)
idx_base_ = get_baseIndex(W_e_, par_r_)
idx_base_ = np.array(idx_base_, dtype=int)
return idx_e_, idx_base_
return idx_e_, idx_base_
[docs]
class TrajectoryConstraintManager:
"""Manages trajectory constraints and bounds."""
def __init__(self, robot, CB, traj_params, identif_config):
self.robot = robot
self.CB = CB
self.n_wps = traj_params['n_wps']
self.freq = traj_params['freq']
self.identif_config = identif_config
self.collision_wrapper = CollisionWrapper(robot=robot, viz=None)
[docs]
def get_variable_bounds(self) -> Tuple[List[float], List[float]]:
"""Get variable bounds for optimization."""
lb, ub = [], []
for i in range(1, self.n_wps):
lb.extend(self.CB.lower_q)
ub.extend(self.CB.upper_q)
return lb, ub
[docs]
def get_constraint_bounds(self, Ns: int) -> Tuple[List, List]:
"""Get constraint bounds for optimization."""
cl, cu = [], []
# Position constraint bounds
for i in range(1, self.n_wps):
cl.extend(self.CB.lower_q)
cu.extend(self.CB.upper_q)
# Velocity constraint bounds
for j in range(Ns):
cl.extend(self.CB.lower_dq)
cu.extend(self.CB.upper_dq)
# Torque constraint bounds
for j in range(Ns):
cl.extend(self.CB.lower_effort)
cu.extend(self.CB.upper_effort)
# Collision constraint bounds
n_cols = len(self.robot.geom_model.collisionPairs)
cl.extend([0.01] * n_cols * (self.n_wps - 1)) # 1 cm margin
cu.extend([2 * 1e19] * n_cols * (self.n_wps - 1)) # no upper limit
return cl, cu
[docs]
def evaluate_constraints(self, Ns: int, X: np.ndarray, opt_cb: Dict,
tps, vel_wps, acc_wps, wp_init) -> np.ndarray:
"""Evaluate all constraints for optimization."""
try:
# Reshape and arrange waypoints
X = np.array(X)
wps_X = np.reshape(X, (self.n_wps - 1, len(self.CB.act_idxq)))
wps = np.vstack((wp_init, wps_X))
wps = wps.transpose()
# Generate full trajectory configuration
t_f, p_f, v_f, a_f = self.CB.get_full_config(
self.freq, tps, wps, vel_wps, acc_wps
)
# Compute joint torques
tau = calc_torque(
p_f.shape[0], self.robot, p_f, v_f, a_f
)
# Evaluate individual constraint types
q_constraints = self._evaluate_position_constraints(p_f, tps, t_f)
v_constraints = self._evaluate_velocity_constraints(v_f)
tau_constraints = self._evaluate_torque_constraints(tau, Ns)
collision_constraints = self._evaluate_collision_constraints(
p_f, tps, t_f
)
# Concatenate all constraints
return np.concatenate(
(q_constraints, v_constraints, tau_constraints,
collision_constraints),
axis=None
)
except Exception as e:
logging.error(f"Error evaluating constraints: {e}")
raise
def _evaluate_position_constraints(self, p_f, tps, t_f) -> np.ndarray:
"""Evaluate position constraints at waypoints."""
idx_waypoints = self._get_waypoint_indices(tps, t_f)
q_constraints = p_f[idx_waypoints, :]
return q_constraints[:, self.CB.act_idxq]
def _evaluate_velocity_constraints(self, v_f) -> np.ndarray:
"""Evaluate velocity constraints at all samples."""
return v_f[:, self.CB.act_idxv]
def _evaluate_torque_constraints(self, tau, Ns) -> np.ndarray:
"""Evaluate torque constraints at all samples."""
tau_constraints = np.zeros((Ns, len(self.CB.act_idxv)))
for k in range(len(self.CB.act_idxv)):
tau_constraints[:, k] = tau[
range(self.CB.act_idxv[k] * Ns, (self.CB.act_idxv[k] + 1) * Ns)
]
return tau_constraints
def _evaluate_collision_constraints(self, p_f, tps, t_f) -> np.ndarray:
"""Evaluate collision constraints."""
idx_waypoints = self._get_waypoint_indices(tps, t_f)
dist_all = []
for j in idx_waypoints:
self.collision_wrapper.computeCollisions(p_f[j, :])
dist_all = np.append(dist_all, self.collision_wrapper.getDistances())
return np.asarray(dist_all)
def _get_waypoint_indices(self, tps, t_f) -> List[int]:
"""Get indices corresponding to waypoint times."""
idx_waypoints = []
time_points = tps[range(1, self.n_wps), :]
time_points_flat = np.array(time_points).flatten()
for i in range(t_f.shape[0]):
t_val = float(t_f[i, 0]) if hasattr(t_f[i, 0], 'item') else t_f[i, 0]
if t_val in time_points_flat:
idx_waypoints.append(i)
return idx_waypoints
[docs]
class BaseOptimalTrajectory:
"""
Base class for IPOPT-based optimal trajectory generation.
Features:
- Modular design with separated concerns
- Better error handling and logging
- Configuration validation
- Cleaner interfaces
This base class can be extended for specific robots by implementing
robot-specific configuration loading and constraint handling.
"""
def __init__(self, robot, active_joints: List[str],
config_file: str = "config/robot_config.yaml"):
"""Initialize the optimal trajectory generator."""
self.robot = robot
self.active_joints = active_joints
# Set up logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
# Load configuration
self.traj_params, identif_data = ConfigurationManager.load_from_yaml(config_file)
self.identif_config = get_param_from_yaml(robot, identif_data)
# Initialize components
self._initialize_components()
# Compute base parameters
self.idx_e, self.idx_b = self.base_computer.compute_base_indices()
# Results storage
self.results = {
'T_F': [], 'P_F': [], 'V_F': [], 'A_F': [],
'iteration_data': [], 'final_regressor_shape': None
}
self.logger.info(f"BaseOptimalTrajectory initialized with {len(self.idx_b)} base parameters")
def _initialize_components(self):
"""Initialize trajectory generation components."""
# Create soft limit pool
n_active_joints = len(self.active_joints)
self.soft_lim_pool = np.full((3, n_active_joints), self.traj_params['soft_lim'])
# Initialize cubic spline and waypoint generation
self.CB = CubicSpline(
self.robot, self.traj_params['n_wps'],
self.active_joints, self.traj_params['soft_lim']
)
self.WP = WaypointsGeneration(
self.robot, self.traj_params['n_wps'],
self.active_joints, self.traj_params['soft_lim']
)
# Initialize specialized components
self.base_computer = BaseParameterComputer(
self.robot, self.identif_config, self.active_joints, self.soft_lim_pool
)
self.constraint_manager = TrajectoryConstraintManager(
self.robot, self.CB, self.traj_params, self.identif_config
)
[docs]
def build_base_regressor(self, q, v, a, W_stack=None) -> np.ndarray:
"""Build base regressor matrix."""
try:
W = build_regressor_basic(self.robot, q, v, a, self.identif_config)
W_e_ = build_regressor_reduced(W, self.idx_e)
W_b_ = build_baseRegressor(W_e_, self.idx_b)
if isinstance(W_stack, np.ndarray):
W_b_ = np.vstack((W_stack, W_b_))
return W_b_
except Exception as e:
self.logger.error(f"Error building base regressor: {e}")
raise
[docs]
def objective_function(self, X, opt_cb, tps, vel_wps, acc_wps, wp_init, W_stack=None):
"""Objective function: condition number of base regressor matrix."""
try:
# Reshape and arrange waypoints
X = np.array(X)
wps_X = np.reshape(X, (self.traj_params['n_wps'] - 1, len(self.active_joints)))
wps = np.vstack((wp_init, wps_X))
wps = wps.transpose()
# Generate full trajectory configuration
t_f, p_f, v_f, a_f = self.CB.get_full_config(
self.traj_params['freq'], tps, wps, vel_wps, acc_wps
)
# Store in callback dictionary
opt_cb.update({"t_f": t_f, "p_f": p_f, "v_f": v_f, "a_f": a_f})
# Build stacked base regressor and return condition number
W_b = self.build_base_regressor(p_f, v_f, a_f, W_stack=W_stack)
return np.linalg.cond(W_b)
except Exception as e:
self.logger.error(f"Error in objective function: {e}")
return 1e10 # Return large penalty value
[docs]
def generate_feasible_initial_guess(self, wp_init, vel_wp_init, acc_wp_init):
"""Generate a feasible initial guess for optimization."""
self.logger.info("Generating feasible initial trajectory...")
count = 0
is_constr_violated = True
while is_constr_violated and count < self.traj_params['max_attempts']:
count += 1
if count % 100 == 0:
self.logger.info(f"Attempt {count} to find feasible initial trajectory...")
try:
# Generate random waypoints
wps, vel_wps, acc_wps = self.WP.gen_rand_wp(wp_init, vel_wp_init, acc_wp_init)
# Generate time points
tps = np.matrix([
self.traj_params['t_s'] * i_wp
for i_wp in range(self.traj_params['n_wps'])
]).transpose()
# Get full configuration
t_i, p_i, v_i, a_i = self.CB.get_full_config(
self.traj_params['freq'], tps, wps, vel_wps, acc_wps
)
# Compute torques and check constraints
tau_i = calc_torque(
p_i.shape[0], self.robot, p_i, v_i, a_i
)
tau_i = np.reshape(tau_i, (v_i.shape[1], v_i.shape[0])).transpose()
is_constr_violated = self.CB.check_cfg_constraints(p_i, v_i, tau_i)
except Exception as e:
self.logger.warning(f"Error in attempt {count}: {e}")
continue
if count >= self.traj_params['max_attempts']:
self.logger.warning(
f"Could not find feasible initial trajectory after {self.traj_params['max_attempts']} attempts"
)
else:
self.logger.info(f"Found feasible initial trajectory after {count} attempts")
return wps, vel_wps, acc_wps, tps, t_i, p_i, v_i, a_i
[docs]
def solve(self, stack_reps: int = 2) -> Dict[str, Any]:
"""
Solve the optimal trajectory generation problem.
Args:
stack_reps: Number of trajectory segments to stack
Returns:
Dict containing trajectories and optimization info
"""
self.logger.info(f"Starting optimal trajectory generation with {stack_reps} segments...")
try:
# Initialize
self.WP.gen_rand_pool(self.soft_lim_pool)
wp_init = np.zeros(len(self.CB.act_idxq))
vel_wp_init = np.zeros(len(self.CB.act_idxv))
acc_wp_init = np.zeros(len(self.CB.act_idxv))
# Random initial position
for idx in range(len(self.CB.act_idxq)):
wp_init[idx] = np.random.choice(self.WP.pool_q[:, idx], 1)[0]
W_stack = None
for s_rep in range(stack_reps):
self.logger.info(f"Optimizing segment {s_rep + 1}/{stack_reps}")
self.logger.info(f"Initial waypoint: {wp_init}")
success = self._solve_segment(s_rep, wp_init, vel_wp_init, acc_wp_init, W_stack)
if not success:
self.logger.error(f"Failed to solve segment {s_rep + 1}")
break
# Update for next segment
if s_rep < stack_reps - 1: # Not the last segment
wp_init, W_stack = self._prepare_next_segment()
self.logger.info(f"Completed! Generated {len(self.results['T_F'])} trajectory segments")
self.results['final_regressor_shape'] = W_stack.shape if W_stack is not None else None
return self.results
except Exception as e:
self.logger.error(f"Error in solve: {e}")
raise
def _solve_segment(self, s_rep, wp_init, vel_wp_init, acc_wp_init, W_stack) -> bool:
"""Solve a single trajectory segment."""
try:
# Generate feasible initial guess
wps, vel_wps, acc_wps, tps, t_i, p_i, v_i, a_i = self.generate_feasible_initial_guess(
wp_init, vel_wp_init, acc_wp_init
)
# Adjust time points for stacking
tps = self.traj_params['t_s'] * s_rep + tps
# Create and solve IPOPT problem - This should be implemented by subclasses
problem = self._create_ipopt_problem(
len(self.active_joints), self.traj_params['n_wps'],
p_i.shape[0], tps, vel_wps, acc_wps, wp_init, vel_wp_init,
acc_wp_init, W_stack
)
success, result_data = problem.solve_with_waypoints(wps)
if success:
self.results['T_F'].append(result_data['t_f'])
self.results['P_F'].append(result_data['p_f'][:, self.CB.act_idxq])
self.results['V_F'].append(result_data['v_f'][:, self.CB.act_idxv])
self.results['A_F'].append(result_data['a_f'][:, self.CB.act_idxv])
self.results['iteration_data'].append(result_data['iter_data'])
self.logger.info(f"Segment {s_rep + 1} completed successfully!")
return True
else:
return False
except Exception as e:
self.logger.error(f"Error solving segment {s_rep + 1}: {e}")
return False
def _create_ipopt_problem(self, n_joints, n_wps, Ns, tps, vel_wps, acc_wps,
wp_init, vel_wp_init, acc_wp_init, W_stack):
"""Create IPOPT problem instance. Should be implemented by subclasses."""
raise NotImplementedError("Subclasses must implement _create_ipopt_problem")
def _prepare_next_segment(self) -> Tuple[np.ndarray, np.ndarray]:
"""Prepare initial conditions for next segment."""
# Get last result
last_result = self.results['iteration_data'][-1]
wp_init = last_result['final_waypoint']
# Stack regressor
last_p_f = self.results['P_F'][-1]
last_v_f = self.results['V_F'][-1]
last_a_f = self.results['A_F'][-1]
# Convert back to full configuration for regressor building
# This is a simplified version - in practice you'd need to handle this more carefully
W_stack = self.build_base_regressor(last_p_f, last_v_f, last_a_f)
return wp_init, W_stack
[docs]
def plot_results(self):
"""Plot optimal trajectory results using unified results manager."""
if not self.results['T_F']:
self.logger.warning("No trajectory data to plot")
return
try:
from .results_manager import ResultsManager
# Initialize results manager
robot_name = getattr(self, 'robot_name', self.robot.model.name)
results_manager = ResultsManager('optimal_trajectory', robot_name)
# Calculate overall condition number
condition_number = getattr(self, 'final_condition_number', 0.0)
if condition_number == 0.0 and hasattr(self, 'results') and 'condition_numbers' in self.results:
condition_number = self.results['condition_numbers'][-1] if self.results['condition_numbers'] else 0.0
# Plot using unified manager
results_manager.plot_optimal_trajectory_results(
trajectories=self.results,
condition_number=condition_number,
joint_names=[f"Joint {i+1}" for i in range(len(self.CB.act_Jid))],
title="Optimal Trajectory Generation Results"
)
except ImportError:
# Fallback to existing plotting
try:
# Create subplots
n_joints = len(self.CB.act_Jid)
fig, axes = plt.subplots(n_joints, 3, sharex=True, figsize=(15, 2*n_joints))
if n_joints == 1:
axes = axes.reshape(1, -1)
fig.suptitle('Optimal Trajectory Results', fontsize=16)
# Plot each segment
colors = plt.cm.tab10(np.linspace(0, 1, len(self.results['T_F'])))
for seg_idx, (T, P, V, A) in enumerate(zip(
self.results['T_F'], self.results['P_F'],
self.results['V_F'], self.results['A_F']
)):
color = colors[seg_idx]
label = f'Segment {seg_idx + 1}'
for joint_idx in range(n_joints):
axes[joint_idx, 0].plot(T, P[:, joint_idx], color=color, label=label)
axes[joint_idx, 1].plot(T, V[:, joint_idx], color=color, label=label)
axes[joint_idx, 2].plot(T, A[:, joint_idx], color=color, label=label)
# Set labels and formatting
for joint_idx in range(n_joints):
axes[joint_idx, 0].set_ylabel(f'Joint {joint_idx+1}\nPosition (rad)')
axes[joint_idx, 1].set_ylabel(f'Joint {joint_idx+1}\nVelocity (rad/s)')
axes[joint_idx, 2].set_ylabel(f'Joint {joint_idx+1}\nAcceleration (rad/s²)')
if joint_idx == 0:
for col in range(3):
axes[joint_idx, col].legend()
for col in range(3):
axes[joint_idx, col].grid(True, alpha=0.3)
axes[-1, 0].set_xlabel('Time (s)')
axes[-1, 1].set_xlabel('Time (s)')
axes[-1, 2].set_xlabel('Time (s)')
plt.tight_layout()
plt.show()
except Exception as e:
self.logger.error(f"Error plotting results: {e}")
[docs]
def save_results(self, output_dir="results"):
"""Save optimal trajectory results using unified results manager."""
if not self.results['T_F']:
self.logger.warning("No trajectory data to save")
return
try:
from .results_manager import ResultsManager
# Initialize results manager
robot_name = getattr(self, 'robot_name', self.robot.model.name)
results_manager = ResultsManager('optimal_trajectory', robot_name)
# Calculate overall condition number
condition_number = getattr(self, 'final_condition_number', 0.0)
if condition_number == 0.0 and hasattr(self, 'results') and 'condition_numbers' in self.results:
condition_number = self.results['condition_numbers'][-1] if self.results['condition_numbers'] else 0.0
# Prepare results dictionary
results_dict = {
'trajectory_segments': len(self.results['T_F']),
'condition_number': float(condition_number),
'joint_names': [f"Joint {i+1}" for i in range(len(self.CB.act_Jid))],
'configuration': self.CB.identif_config,
'time_segments': [t.tolist() for t in self.results['T_F']],
'position_segments': [p.tolist() for p in self.results['P_F']],
'velocity_segments': [v.tolist() for v in self.results['V_F']],
'acceleration_segments': [a.tolist() for a in self.results['A_F']]
}
# Add condition number history if available
if 'condition_numbers' in self.results:
results_dict['condition_number_history'] = [float(c) for c in self.results['condition_numbers']]
# Save using unified manager
saved_files = results_manager.save_results(
results_dict,
output_dir,
save_formats=['yaml', 'npz']
)
self.logger.info(f"Trajectory results saved successfully")
return saved_files
except ImportError:
# Fallback to basic saving
import os
import yaml
os.makedirs(output_dir, exist_ok=True)
# Basic results dictionary
robot_name = getattr(self, 'robot_name', self.robot.model.name)
filename = f"{robot_name}_optimal_trajectory.yaml"
condition_number = getattr(self, 'final_condition_number', 0.0)
results_dict = {
'trajectory_segments': len(self.results['T_F']),
'condition_number': float(condition_number),
'joint_count': len(self.CB.act_Jid)
}
with open(os.path.join(output_dir, filename), 'w') as f:
yaml.dump(results_dict, f, default_flow_style=False)
self.logger.info(f"Basic results saved to {output_dir}/{filename}")
return {'yaml': os.path.join(output_dir, filename)}
[docs]
class BaseTrajectoryIPOPTProblem(BaseOptimizationProblem):
"""
Base IPOPT problem formulation for trajectory optimization.
This class provides a base implementation for trajectory optimization
that can be extended for specific robots.
"""
def __init__(self, opt_traj, n_joints, n_wps, Ns, tps, vel_wps, acc_wps,
wp_init, vel_wp_init, acc_wp_init, W_stack, problem_name="TrajectoryOptimization"):
super().__init__(problem_name)
self.opt_traj = opt_traj
self.n_joints = n_joints
self.n_wps = n_wps
self.Ns = Ns
self.tps = tps
self.vel_wps = vel_wps
self.acc_wps = acc_wps
self.wp_init = wp_init
self.vel_wp_init = vel_wp_init
self.acc_wp_init = acc_wp_init
self.W_stack = W_stack
# Storage for optimization callback (inherits callback_data from base)
self.opt_cb = {"t_f": None, "p_f": None, "v_f": None, "a_f": None}
[docs]
def get_variable_bounds(self) -> Tuple[List[float], List[float]]:
"""Get variable bounds for optimization."""
return self.opt_traj.constraint_manager.get_variable_bounds()
[docs]
def get_constraint_bounds(self) -> Tuple[List[float], List[float]]:
"""Get constraint bounds for optimization."""
return self.opt_traj.constraint_manager.get_constraint_bounds(self.Ns)
[docs]
def get_initial_guess(self) -> List[float]:
"""Get initial guess from waypoints."""
# This will be set when solve() is called with waypoints
if not hasattr(self, '_initial_wps'):
# Return zeros as fallback
return [0.0] * (self.n_joints * (self.n_wps - 1))
X0 = self._initial_wps[:, range(1, self.n_wps)]
return np.reshape(X0.transpose(),
(self.n_joints * (self.n_wps - 1),)).tolist()
[docs]
def objective(self, X: np.ndarray) -> float:
"""Objective function: condition number of base regressor matrix."""
return self.opt_traj.objective_function(
X, self.opt_cb, self.tps, self.vel_wps, self.acc_wps,
self.wp_init, self.W_stack
)
[docs]
def constraints(self, X: np.ndarray) -> np.ndarray:
"""Constraint function for IPOPT."""
return self.opt_traj.constraint_manager.evaluate_constraints(
self.Ns, X, self.opt_cb, self.tps, self.vel_wps,
self.acc_wps, self.wp_init
)
[docs]
def jacobian(self, X: np.ndarray) -> np.ndarray:
"""
Jacobian of constraints - Custom implementation for better performance.
For trajectory optimization, we can use sparse finite differences
instead of full automatic differentiation which is too slow.
"""
try:
# Get current constraint values
c0 = self.constraints(X)
n_constraints = len(c0)
n_vars = len(X)
# Use finite differences with smaller step size for efficiency
eps = 1e-6
jac = np.zeros((n_constraints, n_vars))
# Compute Jacobian column by column (forward differences)
for i in range(n_vars):
X_plus = X.copy()
X_plus[i] += eps
c_plus = self.constraints(X_plus)
jac[:, i] = (c_plus - c0) / eps
self.logger.debug(f"Constraint jacobian shape: {jac.shape}")
return jac
except Exception as e:
self.logger.warning(f"Error computing jacobian: {e}")
# Return sparse identity matrix as fallback
n_constraints = len(self.constraints(X))
n_vars = len(X)
# Create a sparse jacobian approximation
jac = np.zeros((n_constraints, n_vars))
min_dim = min(n_constraints, n_vars)
jac[:min_dim, :min_dim] = np.eye(min_dim)
return jac
[docs]
def solve_with_waypoints(self, wps) -> Tuple[bool, Dict[str, Any]]:
"""
Solve the optimization problem with given initial waypoints.
Args:
wps: Initial waypoints
Returns:
Tuple of (success, results_dict)
"""
try:
# Store initial waypoints for get_initial_guess
self._initial_wps = wps
# Create solver with trajectory optimization config
config = IPOPTConfig.for_trajectory_optimization()
# Adjust settings for this complex problem
config.tolerance = 1e-3
config.acceptable_tolerance = 1e-2
config.max_iterations = 200
config.print_level = 3 # Reduce output
config.custom_options = {
b"mu_strategy": b"adaptive",
}
solver = RobotIPOPTSolver(self, config)
# Solve the problem
success, results = solver.solve()
if success:
# Extract final waypoint for next segment
X_opt = results['x_opt']
wps_X = np.reshape(np.array(X_opt), (self.n_wps - 1, self.n_joints))
final_waypoint = wps_X[-1, :]
# Update results with trajectory-specific data
results.update({
't_f': self.opt_cb["t_f"],
'p_f': self.opt_cb["p_f"],
'v_f': self.opt_cb["v_f"],
'a_f': self.opt_cb["a_f"],
'iter_data': {
'iterations': self.iteration_data['iterations'],
'obj_values': self.iteration_data['obj_values'],
'solve_time': results['solve_time'],
'status': results['status'],
'final_waypoint': final_waypoint
}
})
return True, results
else:
self.logger.error("Optimization failed")
return False, results
except Exception as e:
self.logger.error(f"Error in IPOPT solve: {e}")
return False, {'error': str(e)}