Files
360/distance_calculator.py
2025-07-16 18:37:38 +08:00

378 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FAST-LIO Distance Calculator + STM32 Communication
Real-time calculation of radar movement distance and position,
and send data to STM32 via UART
"""
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
import math
import time
from datetime import datetime
import serial
import struct
import os
import sys
class DistanceCalculator(Node):
def __init__(self):
super().__init__('distance_calculator')
# Subscribe to odometry topic
self.subscription = self.create_subscription(
Odometry, '/Odometry', self.odom_callback, 10)
# Initialize variables
self.last_position = None
self.total_distance = 0.0
self.start_time = time.time()
self.last_log_time = time.time()
self.position_history = []
self.last_sent_time = time.time()
# 添加静态检测和噪声过滤
self.static_threshold = 0.002 # 静态检测阈值(米)
self.static_count = 0
self.static_position_buffer = []
self.max_static_buffer_size = 10
self.velocity_threshold = 0.001 # 速度阈值(米/秒)
self.last_position_filtered = None
# 系统启动稳定机制
self.startup_delay = 5.0 # 启动后5秒延迟发送
self.system_stable = False
self.data_valid_count = 0
self.min_valid_data_count = 10 # 需要10个连续有效数据才开始发送
# Initialize UART communication
self.uart_port = '/dev/ttyTHS1' # Jetson Orin NX UART1
self.baudrate = 115200
self.ser = None
try:
self.ser = serial.Serial(
port=self.uart_port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.1
)
self.get_logger().info(f"UART port opened: {self.uart_port}, Baudrate: {self.baudrate}")
except Exception as e:
self.get_logger().error(f"Failed to open UART port: {str(e)}")
self.get_logger().info("Data will not be sent to STM32")
# Create log file
self.log_filename = f"trajectory_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
self.get_logger().info("Distance calculator started")
self.get_logger().info(f"Trajectory data will be saved to: {self.log_filename}")
self.get_logger().info("Waiting for radar positioning data...")
# Write log file header
with open(self.log_filename, 'w', encoding='utf-8') as f:
f.write("Timestamp,Runtime(s),X(m),Y(m),Z(m),Total Distance(m),Velocity X(m/s),Velocity Y(m/s),Velocity Z(m/s)\n")
def send_to_stm32(self, x, y, z, vx, vy, vz):
"""Send data to STM32 using anonymous protocol"""
if self.ser is None or not self.ser.is_open:
return False
try:
# Protocol format: [0xAA][0xFF][0x01][Data Length][Data][Sum Check][Add Check]
# Scale factors
POS_SCALE_FACTOR = 1000 # 1 mm precision
VEL_SCALE_FACTOR = 1000 # 1 mm/s precision
# 数据有效性检查
# 检查是否为有效数值
if not all(math.isfinite(val) for val in [x, y, z, vx, vy, vz]):
self.get_logger().warning("Invalid data detected (NaN or Inf), skipping transmission")
return False
# 位置范围限制±30米
MAX_POS = 30.0
MIN_POS = -30.0
x = max(MIN_POS, min(MAX_POS, x))
y = max(MIN_POS, min(MAX_POS, y))
z = max(MIN_POS, min(MAX_POS, z))
# 速度范围限制±30 m/s
MAX_VEL = 30.0
MIN_VEL = -30.0
vx = max(MIN_VEL, min(MAX_VEL, vx))
vy = max(MIN_VEL, min(MAX_VEL, vy))
vz = max(MIN_VEL, min(MAX_VEL, vz))
# Convert to integer values
x_int = int(x * POS_SCALE_FACTOR)
y_int = int(y * POS_SCALE_FACTOR)
z_int = int(z * POS_SCALE_FACTOR)
vx_int = int(vx * VEL_SCALE_FACTOR)
vy_int = int(vy * VEL_SCALE_FACTOR)
vz_int = int(vz * VEL_SCALE_FACTOR)
# 短整型范围检查(-32768 到 32767
SHORT_MAX = 32767
SHORT_MIN = -32768
# 进一步限制确保在短整型范围内
x_int = max(SHORT_MIN, min(SHORT_MAX, x_int))
y_int = max(SHORT_MIN, min(SHORT_MAX, y_int))
z_int = max(SHORT_MIN, min(SHORT_MAX, z_int))
vx_int = max(SHORT_MIN, min(SHORT_MAX, vx_int))
vy_int = max(SHORT_MIN, min(SHORT_MAX, vy_int))
vz_int = max(SHORT_MIN, min(SHORT_MAX, vz_int))
# Pack data: 3 int16 (x,y,z) + 3 int16 (vx,vy,vz)
data_content = struct.pack('<hhhhhh',
x_int, y_int, z_int,
vx_int, vy_int, vz_int)
LEN = len(data_content) # Data length = 6 * 2 = 12 bytes
# Build frame
frame = bytearray()
frame.append(0xAA) # Header
frame.append(0xFF) # Target address
frame.append(0x01) # Function code
frame.append(LEN) # Data length
frame.extend(data_content)
# Calculate checksums
sum_check = 0
add_check = 0
for byte in frame:
sum_check = (sum_check + byte) & 0xFF
add_check = (add_check + sum_check) & 0xFF
# Add checksums
frame.append(sum_check)
frame.append(add_check)
# Send data
self.ser.write(frame)
return True
except Exception as e:
self.get_logger().error(f"Error sending data: {str(e)}")
return False
def filter_static_noise(self, current_pos, vx, vy, vz):
"""过滤静态噪声,当设备静止时减少位置抖动"""
# 计算速度大小
velocity_magnitude = math.sqrt(vx**2 + vy**2 + vz**2)
# 如果速度很小,认为是静止状态
if velocity_magnitude < self.velocity_threshold:
# 添加到静态位置缓冲区
self.static_position_buffer.append((current_pos.x, current_pos.y, current_pos.z))
# 保持缓冲区大小
if len(self.static_position_buffer) > self.max_static_buffer_size:
self.static_position_buffer.pop(0)
# 如果有足够的静态数据,使用平均值
if len(self.static_position_buffer) >= 5:
avg_x = sum(pos[0] for pos in self.static_position_buffer) / len(self.static_position_buffer)
avg_y = sum(pos[1] for pos in self.static_position_buffer) / len(self.static_position_buffer)
avg_z = sum(pos[2] for pos in self.static_position_buffer) / len(self.static_position_buffer)
# 创建过滤后的位置
class FilteredPos:
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
return FilteredPos(avg_x, avg_y, avg_z), True
else:
# 运动状态,清空缓冲区
self.static_position_buffer.clear()
return current_pos, False
def odom_callback(self, msg):
current_time = time.time()
current_pos = msg.pose.pose.position
# Calculate runtime
elapsed_time = current_time - self.start_time
# Get linear velocity components
linear_vel = msg.twist.twist.linear
vx = linear_vel.x
vy = linear_vel.y
vz = linear_vel.z
# Calculate total velocity magnitude
velocity = math.sqrt(vx**2 + vy**2 + vz**2)
# 应用静态噪声过滤
filtered_pos, is_static = self.filter_static_noise(current_pos, vx, vy, vz)
if is_static:
self.get_logger().debug(f"Static mode: using filtered position")
# 使用过滤后的位置
current_pos = filtered_pos
if self.last_position is not None:
# Calculate distance between points
dx = current_pos.x - self.last_position.x
dy = current_pos.y - self.last_position.y
dz = current_pos.z - self.last_position.z
distance = math.sqrt(dx*dx + dy*dy + dz*dz)
# Update total distance
self.total_distance += distance
# Display info every 2 seconds
if current_time - self.last_log_time > 2.0:
self.display_status(current_pos, elapsed_time, vx, vy, vz, velocity)
self.last_log_time = current_time
# 检查系统稳定性
if not self.system_stable:
# 检查启动延迟
if elapsed_time < self.startup_delay:
self.get_logger().debug(f"System warming up... {self.startup_delay - elapsed_time:.1f}s remaining")
return
# 检查数据有效性
data_is_valid = (
abs(current_pos.x) < 10.0 and abs(current_pos.y) < 10.0 and abs(current_pos.z) < 10.0 and
abs(vx) < 5.0 and abs(vy) < 5.0 and abs(vz) < 5.0 and
all(math.isfinite(val) for val in [current_pos.x, current_pos.y, current_pos.z, vx, vy, vz])
)
if data_is_valid:
self.data_valid_count += 1
if self.data_valid_count >= self.min_valid_data_count:
self.system_stable = True
self.get_logger().info("System stabilized, starting UART transmission")
else:
self.data_valid_count = 0 # 重置计数
self.get_logger().debug("Waiting for stable data...")
# Send data to STM32 every 50ms (20Hz) - 优化发送频率
if self.system_stable and current_time - self.last_sent_time > 0.05:
if self.ser and self.ser.is_open:
success = self.send_to_stm32(
current_pos.x,
current_pos.y,
current_pos.z,
vx,
vy,
vz
)
if success:
self.get_logger().info(f"Sent to STM32: X={current_pos.x:.3f}, Y={current_pos.y:.3f}, Z={current_pos.z:.3f}, Vx={vx:.3f}, Vy={vy:.3f}, Vz={vz:.3f}")
self.last_sent_time = current_time
# Save position history
self.position_history.append({
'time': current_time,
'position': current_pos,
'velocity': (vx, vy, vz),
'distance': self.total_distance
})
# Write to log file
self.log_to_file(elapsed_time, current_pos, vx, vy, vz)
# Update last position
self.last_position = current_pos
def display_status(self, position, elapsed_time, vx, vy, vz, velocity):
"""Display current status"""
self.get_logger().info("=" * 60)
self.get_logger().info(f"Runtime: {elapsed_time:.1f} sec")
self.get_logger().info(f"Position: X={position.x:.3f}m, Y={position.y:.3f}m, Z={position.z:.3f}m")
self.get_logger().info(f"Total distance: {self.total_distance:.3f} m")
self.get_logger().info(f"Velocity X: {vx:.3f} m/s")
self.get_logger().info(f"Velocity Y: {vy:.3f} m/s")
self.get_logger().info(f"Velocity Z: {vz:.3f} m/s")
self.get_logger().info(f"Total speed: {velocity:.3f} m/s")
self.get_logger().info(f"Avg speed: {self.total_distance/elapsed_time:.3f} m/s")
# Add UART status
if self.ser and self.ser.is_open:
self.get_logger().info("UART communication: Connected")
else:
self.get_logger().warning("UART communication: Disconnected")
def log_to_file(self, elapsed_time, position, vx, vy, vz):
"""Log data to file"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
with open(self.log_filename, 'a', encoding='utf-8') as f:
f.write(f"{timestamp},{elapsed_time:.3f},{position.x:.6f},{position.y:.6f},{position.z:.6f},{self.total_distance:.6f},{vx:.6f},{vy:.6f},{vz:.6f}\n")
def print_summary(self):
"""Print summary information"""
if len(self.position_history) > 0:
elapsed_time = time.time() - self.start_time
print("\n" + "="*80)
print("FAST-LIO Trajectory Report")
print("="*80)
print(f"Total runtime: {elapsed_time:.1f} sec ({elapsed_time/60:.1f} min)")
print(f"Total distance: {self.total_distance:.3f} m")
print(f"Avg speed: {self.total_distance/elapsed_time:.3f} m/s")
print(f"Data points: {len(self.position_history)}")
print(f"Trajectory saved to: {self.log_filename}")
# Add UART status
if self.ser and self.ser.is_open:
print(f"UART port: {self.uart_port} @ {self.baudrate} bps")
print(f"Protocol: Anonymous (Header 0xAA, Target 0xFF, ID 0x01)")
print(f"Data format: X/Y/Z/Vx/Vy/Vz (all int16)")
else:
print("UART communication: Disconnected")
if len(self.position_history) >= 2:
start_pos = self.position_history[0]['position']
end_pos = self.position_history[-1]['position']
# Calculate straight-line distance
dx = end_pos.x - start_pos.x
dy = end_pos.y - start_pos.y
dz = end_pos.z - start_pos.z
straight_distance = math.sqrt(dx*dx + dy*dy + dz*dz)
print(f"\nStart position: ({start_pos.x:.3f}, {start_pos.y:.3f}, {start_pos.z:.3f})")
print(f"End position: ({end_pos.x:.3f}, {end_pos.y:.3f}, {end_pos.z:.3f})")
print(f"Straight distance: {straight_distance:.3f} m")
print(f"Path efficiency: {(straight_distance/self.total_distance)*100:.1f}%")
print("="*80)
# Close UART
if self.ser and self.ser.is_open:
self.ser.close()
print("UART connection closed")
def main():
print("Starting FAST-LIO distance calculator...")
print("Tip: Press Ctrl+C to stop and view report")
print(f"Initializing UART: /dev/ttyTHS1 @ 115200 bps")
rclpy.init()
calculator = DistanceCalculator()
try:
rclpy.spin(calculator)
except KeyboardInterrupt:
print("\nStopping distance calculator...")
finally:
calculator.print_summary()
calculator.destroy_node()
rclpy.shutdown()
print("Program finished")
if __name__ == '__main__':
main()