#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ FAST-LIO Distance Calculator + STM32 Communication Real-time calculation of radar movement distance and position, and send data to STM32 via UART """ import rclpy from rclpy.node import Node from nav_msgs.msg import Odometry import math import time from datetime import datetime import serial import struct import os import sys class DistanceCalculator(Node): def __init__(self): super().__init__('distance_calculator') # Subscribe to odometry topic self.subscription = self.create_subscription( Odometry, '/Odometry', self.odom_callback, 10) # Initialize variables self.last_position = None self.total_distance = 0.0 self.start_time = time.time() self.last_log_time = time.time() self.position_history = [] self.last_sent_time = time.time() # 添加静态检测和噪声过滤 self.static_threshold = 0.002 # 静态检测阈值(米) self.static_count = 0 self.static_position_buffer = [] self.max_static_buffer_size = 10 self.velocity_threshold = 0.001 # 速度阈值(米/秒) self.last_position_filtered = None # 系统启动稳定机制 self.startup_delay = 5.0 # 启动后5秒延迟发送 self.system_stable = False self.data_valid_count = 0 self.min_valid_data_count = 10 # 需要10个连续有效数据才开始发送 # Initialize UART communication self.uart_port = '/dev/ttyTHS1' # Jetson Orin NX UART1 self.baudrate = 115200 self.ser = None try: self.ser = serial.Serial( port=self.uart_port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=0.1 ) self.get_logger().info(f"UART port opened: {self.uart_port}, Baudrate: {self.baudrate}") except Exception as e: self.get_logger().error(f"Failed to open UART port: {str(e)}") self.get_logger().info("Data will not be sent to STM32") # Create log file self.log_filename = f"trajectory_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" self.get_logger().info("Distance calculator started") self.get_logger().info(f"Trajectory data will be saved to: {self.log_filename}") self.get_logger().info("Waiting for radar positioning data...") # Write log file header with open(self.log_filename, 'w', encoding='utf-8') as f: f.write("Timestamp,Runtime(s),X(m),Y(m),Z(m),Total Distance(m),Velocity X(m/s),Velocity Y(m/s),Velocity Z(m/s)\n") def send_to_stm32(self, x, y, z, vx, vy, vz): """Send data to STM32 using anonymous protocol""" if self.ser is None or not self.ser.is_open: return False try: # Protocol format: [0xAA][0xFF][0x01][Data Length][Data][Sum Check][Add Check] # Scale factors POS_SCALE_FACTOR = 1000 # 1 mm precision VEL_SCALE_FACTOR = 1000 # 1 mm/s precision # 数据有效性检查 # 检查是否为有效数值 if not all(math.isfinite(val) for val in [x, y, z, vx, vy, vz]): self.get_logger().warning("Invalid data detected (NaN or Inf), skipping transmission") return False # 位置范围限制(±30米) MAX_POS = 30.0 MIN_POS = -30.0 x = max(MIN_POS, min(MAX_POS, x)) y = max(MIN_POS, min(MAX_POS, y)) z = max(MIN_POS, min(MAX_POS, z)) # 速度范围限制(±30 m/s) MAX_VEL = 30.0 MIN_VEL = -30.0 vx = max(MIN_VEL, min(MAX_VEL, vx)) vy = max(MIN_VEL, min(MAX_VEL, vy)) vz = max(MIN_VEL, min(MAX_VEL, vz)) # Convert to integer values x_int = int(x * POS_SCALE_FACTOR) y_int = int(y * POS_SCALE_FACTOR) z_int = int(z * POS_SCALE_FACTOR) vx_int = int(vx * VEL_SCALE_FACTOR) vy_int = int(vy * VEL_SCALE_FACTOR) vz_int = int(vz * VEL_SCALE_FACTOR) # 短整型范围检查(-32768 到 32767) SHORT_MAX = 32767 SHORT_MIN = -32768 # 进一步限制确保在短整型范围内 x_int = max(SHORT_MIN, min(SHORT_MAX, x_int)) y_int = max(SHORT_MIN, min(SHORT_MAX, y_int)) z_int = max(SHORT_MIN, min(SHORT_MAX, z_int)) vx_int = max(SHORT_MIN, min(SHORT_MAX, vx_int)) vy_int = max(SHORT_MIN, min(SHORT_MAX, vy_int)) vz_int = max(SHORT_MIN, min(SHORT_MAX, vz_int)) # Pack data: 3 int16 (x,y,z) + 3 int16 (vx,vy,vz) data_content = struct.pack(' self.max_static_buffer_size: self.static_position_buffer.pop(0) # 如果有足够的静态数据,使用平均值 if len(self.static_position_buffer) >= 5: avg_x = sum(pos[0] for pos in self.static_position_buffer) / len(self.static_position_buffer) avg_y = sum(pos[1] for pos in self.static_position_buffer) / len(self.static_position_buffer) avg_z = sum(pos[2] for pos in self.static_position_buffer) / len(self.static_position_buffer) # 创建过滤后的位置 class FilteredPos: def __init__(self, x, y, z): self.x = x self.y = y self.z = z return FilteredPos(avg_x, avg_y, avg_z), True else: # 运动状态,清空缓冲区 self.static_position_buffer.clear() return current_pos, False def odom_callback(self, msg): current_time = time.time() current_pos = msg.pose.pose.position # Calculate runtime elapsed_time = current_time - self.start_time # Get linear velocity components linear_vel = msg.twist.twist.linear vx = linear_vel.x vy = linear_vel.y vz = linear_vel.z # Calculate total velocity magnitude velocity = math.sqrt(vx**2 + vy**2 + vz**2) # 应用静态噪声过滤 filtered_pos, is_static = self.filter_static_noise(current_pos, vx, vy, vz) if is_static: self.get_logger().debug(f"Static mode: using filtered position") # 使用过滤后的位置 current_pos = filtered_pos if self.last_position is not None: # Calculate distance between points dx = current_pos.x - self.last_position.x dy = current_pos.y - self.last_position.y dz = current_pos.z - self.last_position.z distance = math.sqrt(dx*dx + dy*dy + dz*dz) # Update total distance self.total_distance += distance # Display info every 2 seconds if current_time - self.last_log_time > 2.0: self.display_status(current_pos, elapsed_time, vx, vy, vz, velocity) self.last_log_time = current_time # 检查系统稳定性 if not self.system_stable: # 检查启动延迟 if elapsed_time < self.startup_delay: self.get_logger().debug(f"System warming up... {self.startup_delay - elapsed_time:.1f}s remaining") return # 检查数据有效性 data_is_valid = ( abs(current_pos.x) < 10.0 and abs(current_pos.y) < 10.0 and abs(current_pos.z) < 10.0 and abs(vx) < 5.0 and abs(vy) < 5.0 and abs(vz) < 5.0 and all(math.isfinite(val) for val in [current_pos.x, current_pos.y, current_pos.z, vx, vy, vz]) ) if data_is_valid: self.data_valid_count += 1 if self.data_valid_count >= self.min_valid_data_count: self.system_stable = True self.get_logger().info("System stabilized, starting UART transmission") else: self.data_valid_count = 0 # 重置计数 self.get_logger().debug("Waiting for stable data...") # Send data to STM32 every 50ms (20Hz) - 优化发送频率 if self.system_stable and current_time - self.last_sent_time > 0.05: if self.ser and self.ser.is_open: success = self.send_to_stm32( current_pos.x, current_pos.y, current_pos.z, vx, vy, vz ) if success: self.get_logger().info(f"Sent to STM32: X={current_pos.x:.3f}, Y={current_pos.y:.3f}, Z={current_pos.z:.3f}, Vx={vx:.3f}, Vy={vy:.3f}, Vz={vz:.3f}") self.last_sent_time = current_time # Save position history self.position_history.append({ 'time': current_time, 'position': current_pos, 'velocity': (vx, vy, vz), 'distance': self.total_distance }) # Write to log file self.log_to_file(elapsed_time, current_pos, vx, vy, vz) # Update last position self.last_position = current_pos def display_status(self, position, elapsed_time, vx, vy, vz, velocity): """Display current status""" self.get_logger().info("=" * 60) self.get_logger().info(f"Runtime: {elapsed_time:.1f} sec") self.get_logger().info(f"Position: X={position.x:.3f}m, Y={position.y:.3f}m, Z={position.z:.3f}m") self.get_logger().info(f"Total distance: {self.total_distance:.3f} m") self.get_logger().info(f"Velocity X: {vx:.3f} m/s") self.get_logger().info(f"Velocity Y: {vy:.3f} m/s") self.get_logger().info(f"Velocity Z: {vz:.3f} m/s") self.get_logger().info(f"Total speed: {velocity:.3f} m/s") self.get_logger().info(f"Avg speed: {self.total_distance/elapsed_time:.3f} m/s") # Add UART status if self.ser and self.ser.is_open: self.get_logger().info("UART communication: Connected") else: self.get_logger().warning("UART communication: Disconnected") def log_to_file(self, elapsed_time, position, vx, vy, vz): """Log data to file""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] with open(self.log_filename, 'a', encoding='utf-8') as f: f.write(f"{timestamp},{elapsed_time:.3f},{position.x:.6f},{position.y:.6f},{position.z:.6f},{self.total_distance:.6f},{vx:.6f},{vy:.6f},{vz:.6f}\n") def print_summary(self): """Print summary information""" if len(self.position_history) > 0: elapsed_time = time.time() - self.start_time print("\n" + "="*80) print("FAST-LIO Trajectory Report") print("="*80) print(f"Total runtime: {elapsed_time:.1f} sec ({elapsed_time/60:.1f} min)") print(f"Total distance: {self.total_distance:.3f} m") print(f"Avg speed: {self.total_distance/elapsed_time:.3f} m/s") print(f"Data points: {len(self.position_history)}") print(f"Trajectory saved to: {self.log_filename}") # Add UART status if self.ser and self.ser.is_open: print(f"UART port: {self.uart_port} @ {self.baudrate} bps") print(f"Protocol: Anonymous (Header 0xAA, Target 0xFF, ID 0x01)") print(f"Data format: X/Y/Z/Vx/Vy/Vz (all int16)") else: print("UART communication: Disconnected") if len(self.position_history) >= 2: start_pos = self.position_history[0]['position'] end_pos = self.position_history[-1]['position'] # Calculate straight-line distance dx = end_pos.x - start_pos.x dy = end_pos.y - start_pos.y dz = end_pos.z - start_pos.z straight_distance = math.sqrt(dx*dx + dy*dy + dz*dz) print(f"\nStart position: ({start_pos.x:.3f}, {start_pos.y:.3f}, {start_pos.z:.3f})") print(f"End position: ({end_pos.x:.3f}, {end_pos.y:.3f}, {end_pos.z:.3f})") print(f"Straight distance: {straight_distance:.3f} m") print(f"Path efficiency: {(straight_distance/self.total_distance)*100:.1f}%") print("="*80) # Close UART if self.ser and self.ser.is_open: self.ser.close() print("UART connection closed") def main(): print("Starting FAST-LIO distance calculator...") print("Tip: Press Ctrl+C to stop and view report") print(f"Initializing UART: /dev/ttyTHS1 @ 115200 bps") rclpy.init() calculator = DistanceCalculator() try: rclpy.spin(calculator) except KeyboardInterrupt: print("\nStopping distance calculator...") finally: calculator.print_summary() calculator.destroy_node() rclpy.shutdown() print("Program finished") if __name__ == '__main__': main()