上传文件至 /

This commit is contained in:
2025-07-16 18:37:38 +08:00
commit 18c43d3b59
5 changed files with 3515 additions and 0 deletions

377
distance_calculator.py Normal file
View File

@@ -0,0 +1,377 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FAST-LIO Distance Calculator + STM32 Communication
Real-time calculation of radar movement distance and position,
and send data to STM32 via UART
"""
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
import math
import time
from datetime import datetime
import serial
import struct
import os
import sys
class DistanceCalculator(Node):
def __init__(self):
super().__init__('distance_calculator')
# Subscribe to odometry topic
self.subscription = self.create_subscription(
Odometry, '/Odometry', self.odom_callback, 10)
# Initialize variables
self.last_position = None
self.total_distance = 0.0
self.start_time = time.time()
self.last_log_time = time.time()
self.position_history = []
self.last_sent_time = time.time()
# 添加静态检测和噪声过滤
self.static_threshold = 0.002 # 静态检测阈值(米)
self.static_count = 0
self.static_position_buffer = []
self.max_static_buffer_size = 10
self.velocity_threshold = 0.001 # 速度阈值(米/秒)
self.last_position_filtered = None
# 系统启动稳定机制
self.startup_delay = 5.0 # 启动后5秒延迟发送
self.system_stable = False
self.data_valid_count = 0
self.min_valid_data_count = 10 # 需要10个连续有效数据才开始发送
# Initialize UART communication
self.uart_port = '/dev/ttyTHS1' # Jetson Orin NX UART1
self.baudrate = 115200
self.ser = None
try:
self.ser = serial.Serial(
port=self.uart_port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.1
)
self.get_logger().info(f"UART port opened: {self.uart_port}, Baudrate: {self.baudrate}")
except Exception as e:
self.get_logger().error(f"Failed to open UART port: {str(e)}")
self.get_logger().info("Data will not be sent to STM32")
# Create log file
self.log_filename = f"trajectory_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
self.get_logger().info("Distance calculator started")
self.get_logger().info(f"Trajectory data will be saved to: {self.log_filename}")
self.get_logger().info("Waiting for radar positioning data...")
# Write log file header
with open(self.log_filename, 'w', encoding='utf-8') as f:
f.write("Timestamp,Runtime(s),X(m),Y(m),Z(m),Total Distance(m),Velocity X(m/s),Velocity Y(m/s),Velocity Z(m/s)\n")
def send_to_stm32(self, x, y, z, vx, vy, vz):
"""Send data to STM32 using anonymous protocol"""
if self.ser is None or not self.ser.is_open:
return False
try:
# Protocol format: [0xAA][0xFF][0x01][Data Length][Data][Sum Check][Add Check]
# Scale factors
POS_SCALE_FACTOR = 1000 # 1 mm precision
VEL_SCALE_FACTOR = 1000 # 1 mm/s precision
# 数据有效性检查
# 检查是否为有效数值
if not all(math.isfinite(val) for val in [x, y, z, vx, vy, vz]):
self.get_logger().warning("Invalid data detected (NaN or Inf), skipping transmission")
return False
# 位置范围限制±30米
MAX_POS = 30.0
MIN_POS = -30.0
x = max(MIN_POS, min(MAX_POS, x))
y = max(MIN_POS, min(MAX_POS, y))
z = max(MIN_POS, min(MAX_POS, z))
# 速度范围限制±30 m/s
MAX_VEL = 30.0
MIN_VEL = -30.0
vx = max(MIN_VEL, min(MAX_VEL, vx))
vy = max(MIN_VEL, min(MAX_VEL, vy))
vz = max(MIN_VEL, min(MAX_VEL, vz))
# Convert to integer values
x_int = int(x * POS_SCALE_FACTOR)
y_int = int(y * POS_SCALE_FACTOR)
z_int = int(z * POS_SCALE_FACTOR)
vx_int = int(vx * VEL_SCALE_FACTOR)
vy_int = int(vy * VEL_SCALE_FACTOR)
vz_int = int(vz * VEL_SCALE_FACTOR)
# 短整型范围检查(-32768 到 32767
SHORT_MAX = 32767
SHORT_MIN = -32768
# 进一步限制确保在短整型范围内
x_int = max(SHORT_MIN, min(SHORT_MAX, x_int))
y_int = max(SHORT_MIN, min(SHORT_MAX, y_int))
z_int = max(SHORT_MIN, min(SHORT_MAX, z_int))
vx_int = max(SHORT_MIN, min(SHORT_MAX, vx_int))
vy_int = max(SHORT_MIN, min(SHORT_MAX, vy_int))
vz_int = max(SHORT_MIN, min(SHORT_MAX, vz_int))
# Pack data: 3 int16 (x,y,z) + 3 int16 (vx,vy,vz)
data_content = struct.pack('<hhhhhh',
x_int, y_int, z_int,
vx_int, vy_int, vz_int)
LEN = len(data_content) # Data length = 6 * 2 = 12 bytes
# Build frame
frame = bytearray()
frame.append(0xAA) # Header
frame.append(0xFF) # Target address
frame.append(0x01) # Function code
frame.append(LEN) # Data length
frame.extend(data_content)
# Calculate checksums
sum_check = 0
add_check = 0
for byte in frame:
sum_check = (sum_check + byte) & 0xFF
add_check = (add_check + sum_check) & 0xFF
# Add checksums
frame.append(sum_check)
frame.append(add_check)
# Send data
self.ser.write(frame)
return True
except Exception as e:
self.get_logger().error(f"Error sending data: {str(e)}")
return False
def filter_static_noise(self, current_pos, vx, vy, vz):
"""过滤静态噪声,当设备静止时减少位置抖动"""
# 计算速度大小
velocity_magnitude = math.sqrt(vx**2 + vy**2 + vz**2)
# 如果速度很小,认为是静止状态
if velocity_magnitude < self.velocity_threshold:
# 添加到静态位置缓冲区
self.static_position_buffer.append((current_pos.x, current_pos.y, current_pos.z))
# 保持缓冲区大小
if len(self.static_position_buffer) > self.max_static_buffer_size:
self.static_position_buffer.pop(0)
# 如果有足够的静态数据,使用平均值
if len(self.static_position_buffer) >= 5:
avg_x = sum(pos[0] for pos in self.static_position_buffer) / len(self.static_position_buffer)
avg_y = sum(pos[1] for pos in self.static_position_buffer) / len(self.static_position_buffer)
avg_z = sum(pos[2] for pos in self.static_position_buffer) / len(self.static_position_buffer)
# 创建过滤后的位置
class FilteredPos:
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
return FilteredPos(avg_x, avg_y, avg_z), True
else:
# 运动状态,清空缓冲区
self.static_position_buffer.clear()
return current_pos, False
def odom_callback(self, msg):
current_time = time.time()
current_pos = msg.pose.pose.position
# Calculate runtime
elapsed_time = current_time - self.start_time
# Get linear velocity components
linear_vel = msg.twist.twist.linear
vx = linear_vel.x
vy = linear_vel.y
vz = linear_vel.z
# Calculate total velocity magnitude
velocity = math.sqrt(vx**2 + vy**2 + vz**2)
# 应用静态噪声过滤
filtered_pos, is_static = self.filter_static_noise(current_pos, vx, vy, vz)
if is_static:
self.get_logger().debug(f"Static mode: using filtered position")
# 使用过滤后的位置
current_pos = filtered_pos
if self.last_position is not None:
# Calculate distance between points
dx = current_pos.x - self.last_position.x
dy = current_pos.y - self.last_position.y
dz = current_pos.z - self.last_position.z
distance = math.sqrt(dx*dx + dy*dy + dz*dz)
# Update total distance
self.total_distance += distance
# Display info every 2 seconds
if current_time - self.last_log_time > 2.0:
self.display_status(current_pos, elapsed_time, vx, vy, vz, velocity)
self.last_log_time = current_time
# 检查系统稳定性
if not self.system_stable:
# 检查启动延迟
if elapsed_time < self.startup_delay:
self.get_logger().debug(f"System warming up... {self.startup_delay - elapsed_time:.1f}s remaining")
return
# 检查数据有效性
data_is_valid = (
abs(current_pos.x) < 10.0 and abs(current_pos.y) < 10.0 and abs(current_pos.z) < 10.0 and
abs(vx) < 5.0 and abs(vy) < 5.0 and abs(vz) < 5.0 and
all(math.isfinite(val) for val in [current_pos.x, current_pos.y, current_pos.z, vx, vy, vz])
)
if data_is_valid:
self.data_valid_count += 1
if self.data_valid_count >= self.min_valid_data_count:
self.system_stable = True
self.get_logger().info("System stabilized, starting UART transmission")
else:
self.data_valid_count = 0 # 重置计数
self.get_logger().debug("Waiting for stable data...")
# Send data to STM32 every 50ms (20Hz) - 优化发送频率
if self.system_stable and current_time - self.last_sent_time > 0.05:
if self.ser and self.ser.is_open:
success = self.send_to_stm32(
current_pos.x,
current_pos.y,
current_pos.z,
vx,
vy,
vz
)
if success:
self.get_logger().info(f"Sent to STM32: X={current_pos.x:.3f}, Y={current_pos.y:.3f}, Z={current_pos.z:.3f}, Vx={vx:.3f}, Vy={vy:.3f}, Vz={vz:.3f}")
self.last_sent_time = current_time
# Save position history
self.position_history.append({
'time': current_time,
'position': current_pos,
'velocity': (vx, vy, vz),
'distance': self.total_distance
})
# Write to log file
self.log_to_file(elapsed_time, current_pos, vx, vy, vz)
# Update last position
self.last_position = current_pos
def display_status(self, position, elapsed_time, vx, vy, vz, velocity):
"""Display current status"""
self.get_logger().info("=" * 60)
self.get_logger().info(f"Runtime: {elapsed_time:.1f} sec")
self.get_logger().info(f"Position: X={position.x:.3f}m, Y={position.y:.3f}m, Z={position.z:.3f}m")
self.get_logger().info(f"Total distance: {self.total_distance:.3f} m")
self.get_logger().info(f"Velocity X: {vx:.3f} m/s")
self.get_logger().info(f"Velocity Y: {vy:.3f} m/s")
self.get_logger().info(f"Velocity Z: {vz:.3f} m/s")
self.get_logger().info(f"Total speed: {velocity:.3f} m/s")
self.get_logger().info(f"Avg speed: {self.total_distance/elapsed_time:.3f} m/s")
# Add UART status
if self.ser and self.ser.is_open:
self.get_logger().info("UART communication: Connected")
else:
self.get_logger().warning("UART communication: Disconnected")
def log_to_file(self, elapsed_time, position, vx, vy, vz):
"""Log data to file"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
with open(self.log_filename, 'a', encoding='utf-8') as f:
f.write(f"{timestamp},{elapsed_time:.3f},{position.x:.6f},{position.y:.6f},{position.z:.6f},{self.total_distance:.6f},{vx:.6f},{vy:.6f},{vz:.6f}\n")
def print_summary(self):
"""Print summary information"""
if len(self.position_history) > 0:
elapsed_time = time.time() - self.start_time
print("\n" + "="*80)
print("FAST-LIO Trajectory Report")
print("="*80)
print(f"Total runtime: {elapsed_time:.1f} sec ({elapsed_time/60:.1f} min)")
print(f"Total distance: {self.total_distance:.3f} m")
print(f"Avg speed: {self.total_distance/elapsed_time:.3f} m/s")
print(f"Data points: {len(self.position_history)}")
print(f"Trajectory saved to: {self.log_filename}")
# Add UART status
if self.ser and self.ser.is_open:
print(f"UART port: {self.uart_port} @ {self.baudrate} bps")
print(f"Protocol: Anonymous (Header 0xAA, Target 0xFF, ID 0x01)")
print(f"Data format: X/Y/Z/Vx/Vy/Vz (all int16)")
else:
print("UART communication: Disconnected")
if len(self.position_history) >= 2:
start_pos = self.position_history[0]['position']
end_pos = self.position_history[-1]['position']
# Calculate straight-line distance
dx = end_pos.x - start_pos.x
dy = end_pos.y - start_pos.y
dz = end_pos.z - start_pos.z
straight_distance = math.sqrt(dx*dx + dy*dy + dz*dz)
print(f"\nStart position: ({start_pos.x:.3f}, {start_pos.y:.3f}, {start_pos.z:.3f})")
print(f"End position: ({end_pos.x:.3f}, {end_pos.y:.3f}, {end_pos.z:.3f})")
print(f"Straight distance: {straight_distance:.3f} m")
print(f"Path efficiency: {(straight_distance/self.total_distance)*100:.1f}%")
print("="*80)
# Close UART
if self.ser and self.ser.is_open:
self.ser.close()
print("UART connection closed")
def main():
print("Starting FAST-LIO distance calculator...")
print("Tip: Press Ctrl+C to stop and view report")
print(f"Initializing UART: /dev/ttyTHS1 @ 115200 bps")
rclpy.init()
calculator = DistanceCalculator()
try:
rclpy.spin(calculator)
except KeyboardInterrupt:
print("\nStopping distance calculator...")
finally:
calculator.print_summary()
calculator.destroy_node()
rclpy.shutdown()
print("Program finished")
if __name__ == '__main__':
main()