378 lines
15 KiB
Python
378 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
FAST-LIO Distance Calculator + STM32 Communication
|
||
Real-time calculation of radar movement distance and position,
|
||
and send data to STM32 via UART
|
||
"""
|
||
|
||
import rclpy
|
||
from rclpy.node import Node
|
||
from nav_msgs.msg import Odometry
|
||
import math
|
||
import time
|
||
from datetime import datetime
|
||
import serial
|
||
import struct
|
||
import os
|
||
import sys
|
||
|
||
class DistanceCalculator(Node):
|
||
def __init__(self):
|
||
super().__init__('distance_calculator')
|
||
|
||
# Subscribe to odometry topic
|
||
self.subscription = self.create_subscription(
|
||
Odometry, '/Odometry', self.odom_callback, 10)
|
||
|
||
# Initialize variables
|
||
self.last_position = None
|
||
self.total_distance = 0.0
|
||
self.start_time = time.time()
|
||
self.last_log_time = time.time()
|
||
self.position_history = []
|
||
self.last_sent_time = time.time()
|
||
|
||
# 添加静态检测和噪声过滤
|
||
self.static_threshold = 0.002 # 静态检测阈值(米)
|
||
self.static_count = 0
|
||
self.static_position_buffer = []
|
||
self.max_static_buffer_size = 10
|
||
self.velocity_threshold = 0.001 # 速度阈值(米/秒)
|
||
self.last_position_filtered = None
|
||
|
||
# 系统启动稳定机制
|
||
self.startup_delay = 5.0 # 启动后5秒延迟发送
|
||
self.system_stable = False
|
||
self.data_valid_count = 0
|
||
self.min_valid_data_count = 10 # 需要10个连续有效数据才开始发送
|
||
|
||
# Initialize UART communication
|
||
self.uart_port = '/dev/ttyTHS1' # Jetson Orin NX UART1
|
||
self.baudrate = 115200
|
||
self.ser = None
|
||
|
||
try:
|
||
self.ser = serial.Serial(
|
||
port=self.uart_port,
|
||
baudrate=self.baudrate,
|
||
bytesize=serial.EIGHTBITS,
|
||
parity=serial.PARITY_NONE,
|
||
stopbits=serial.STOPBITS_ONE,
|
||
timeout=0.1
|
||
)
|
||
self.get_logger().info(f"UART port opened: {self.uart_port}, Baudrate: {self.baudrate}")
|
||
except Exception as e:
|
||
self.get_logger().error(f"Failed to open UART port: {str(e)}")
|
||
self.get_logger().info("Data will not be sent to STM32")
|
||
|
||
# Create log file
|
||
self.log_filename = f"trajectory_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
|
||
self.get_logger().info("Distance calculator started")
|
||
self.get_logger().info(f"Trajectory data will be saved to: {self.log_filename}")
|
||
self.get_logger().info("Waiting for radar positioning data...")
|
||
|
||
# Write log file header
|
||
with open(self.log_filename, 'w', encoding='utf-8') as f:
|
||
f.write("Timestamp,Runtime(s),X(m),Y(m),Z(m),Total Distance(m),Velocity X(m/s),Velocity Y(m/s),Velocity Z(m/s)\n")
|
||
|
||
def send_to_stm32(self, x, y, z, vx, vy, vz):
|
||
"""Send data to STM32 using anonymous protocol"""
|
||
if self.ser is None or not self.ser.is_open:
|
||
return False
|
||
|
||
try:
|
||
# Protocol format: [0xAA][0xFF][0x01][Data Length][Data][Sum Check][Add Check]
|
||
# Scale factors
|
||
POS_SCALE_FACTOR = 1000 # 1 mm precision
|
||
VEL_SCALE_FACTOR = 1000 # 1 mm/s precision
|
||
|
||
# 数据有效性检查
|
||
# 检查是否为有效数值
|
||
if not all(math.isfinite(val) for val in [x, y, z, vx, vy, vz]):
|
||
self.get_logger().warning("Invalid data detected (NaN or Inf), skipping transmission")
|
||
return False
|
||
|
||
# 位置范围限制(±30米)
|
||
MAX_POS = 30.0
|
||
MIN_POS = -30.0
|
||
x = max(MIN_POS, min(MAX_POS, x))
|
||
y = max(MIN_POS, min(MAX_POS, y))
|
||
z = max(MIN_POS, min(MAX_POS, z))
|
||
|
||
# 速度范围限制(±30 m/s)
|
||
MAX_VEL = 30.0
|
||
MIN_VEL = -30.0
|
||
vx = max(MIN_VEL, min(MAX_VEL, vx))
|
||
vy = max(MIN_VEL, min(MAX_VEL, vy))
|
||
vz = max(MIN_VEL, min(MAX_VEL, vz))
|
||
|
||
# Convert to integer values
|
||
x_int = int(x * POS_SCALE_FACTOR)
|
||
y_int = int(y * POS_SCALE_FACTOR)
|
||
z_int = int(z * POS_SCALE_FACTOR)
|
||
vx_int = int(vx * VEL_SCALE_FACTOR)
|
||
vy_int = int(vy * VEL_SCALE_FACTOR)
|
||
vz_int = int(vz * VEL_SCALE_FACTOR)
|
||
|
||
# 短整型范围检查(-32768 到 32767)
|
||
SHORT_MAX = 32767
|
||
SHORT_MIN = -32768
|
||
|
||
# 进一步限制确保在短整型范围内
|
||
x_int = max(SHORT_MIN, min(SHORT_MAX, x_int))
|
||
y_int = max(SHORT_MIN, min(SHORT_MAX, y_int))
|
||
z_int = max(SHORT_MIN, min(SHORT_MAX, z_int))
|
||
vx_int = max(SHORT_MIN, min(SHORT_MAX, vx_int))
|
||
vy_int = max(SHORT_MIN, min(SHORT_MAX, vy_int))
|
||
vz_int = max(SHORT_MIN, min(SHORT_MAX, vz_int))
|
||
|
||
# Pack data: 3 int16 (x,y,z) + 3 int16 (vx,vy,vz)
|
||
data_content = struct.pack('<hhhhhh',
|
||
x_int, y_int, z_int,
|
||
vx_int, vy_int, vz_int)
|
||
LEN = len(data_content) # Data length = 6 * 2 = 12 bytes
|
||
|
||
# Build frame
|
||
frame = bytearray()
|
||
frame.append(0xAA) # Header
|
||
frame.append(0xFF) # Target address
|
||
frame.append(0x01) # Function code
|
||
frame.append(LEN) # Data length
|
||
frame.extend(data_content)
|
||
|
||
# Calculate checksums
|
||
sum_check = 0
|
||
add_check = 0
|
||
for byte in frame:
|
||
sum_check = (sum_check + byte) & 0xFF
|
||
add_check = (add_check + sum_check) & 0xFF
|
||
|
||
# Add checksums
|
||
frame.append(sum_check)
|
||
frame.append(add_check)
|
||
|
||
# Send data
|
||
self.ser.write(frame)
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.get_logger().error(f"Error sending data: {str(e)}")
|
||
return False
|
||
|
||
def filter_static_noise(self, current_pos, vx, vy, vz):
|
||
"""过滤静态噪声,当设备静止时减少位置抖动"""
|
||
# 计算速度大小
|
||
velocity_magnitude = math.sqrt(vx**2 + vy**2 + vz**2)
|
||
|
||
# 如果速度很小,认为是静止状态
|
||
if velocity_magnitude < self.velocity_threshold:
|
||
# 添加到静态位置缓冲区
|
||
self.static_position_buffer.append((current_pos.x, current_pos.y, current_pos.z))
|
||
|
||
# 保持缓冲区大小
|
||
if len(self.static_position_buffer) > self.max_static_buffer_size:
|
||
self.static_position_buffer.pop(0)
|
||
|
||
# 如果有足够的静态数据,使用平均值
|
||
if len(self.static_position_buffer) >= 5:
|
||
avg_x = sum(pos[0] for pos in self.static_position_buffer) / len(self.static_position_buffer)
|
||
avg_y = sum(pos[1] for pos in self.static_position_buffer) / len(self.static_position_buffer)
|
||
avg_z = sum(pos[2] for pos in self.static_position_buffer) / len(self.static_position_buffer)
|
||
|
||
# 创建过滤后的位置
|
||
class FilteredPos:
|
||
def __init__(self, x, y, z):
|
||
self.x = x
|
||
self.y = y
|
||
self.z = z
|
||
|
||
return FilteredPos(avg_x, avg_y, avg_z), True
|
||
else:
|
||
# 运动状态,清空缓冲区
|
||
self.static_position_buffer.clear()
|
||
|
||
return current_pos, False
|
||
|
||
def odom_callback(self, msg):
|
||
current_time = time.time()
|
||
current_pos = msg.pose.pose.position
|
||
|
||
# Calculate runtime
|
||
elapsed_time = current_time - self.start_time
|
||
|
||
# Get linear velocity components
|
||
linear_vel = msg.twist.twist.linear
|
||
vx = linear_vel.x
|
||
vy = linear_vel.y
|
||
vz = linear_vel.z
|
||
|
||
# Calculate total velocity magnitude
|
||
velocity = math.sqrt(vx**2 + vy**2 + vz**2)
|
||
|
||
# 应用静态噪声过滤
|
||
filtered_pos, is_static = self.filter_static_noise(current_pos, vx, vy, vz)
|
||
if is_static:
|
||
self.get_logger().debug(f"Static mode: using filtered position")
|
||
|
||
# 使用过滤后的位置
|
||
current_pos = filtered_pos
|
||
|
||
if self.last_position is not None:
|
||
# Calculate distance between points
|
||
dx = current_pos.x - self.last_position.x
|
||
dy = current_pos.y - self.last_position.y
|
||
dz = current_pos.z - self.last_position.z
|
||
distance = math.sqrt(dx*dx + dy*dy + dz*dz)
|
||
|
||
# Update total distance
|
||
self.total_distance += distance
|
||
|
||
# Display info every 2 seconds
|
||
if current_time - self.last_log_time > 2.0:
|
||
self.display_status(current_pos, elapsed_time, vx, vy, vz, velocity)
|
||
self.last_log_time = current_time
|
||
|
||
# 检查系统稳定性
|
||
if not self.system_stable:
|
||
# 检查启动延迟
|
||
if elapsed_time < self.startup_delay:
|
||
self.get_logger().debug(f"System warming up... {self.startup_delay - elapsed_time:.1f}s remaining")
|
||
return
|
||
|
||
# 检查数据有效性
|
||
data_is_valid = (
|
||
abs(current_pos.x) < 10.0 and abs(current_pos.y) < 10.0 and abs(current_pos.z) < 10.0 and
|
||
abs(vx) < 5.0 and abs(vy) < 5.0 and abs(vz) < 5.0 and
|
||
all(math.isfinite(val) for val in [current_pos.x, current_pos.y, current_pos.z, vx, vy, vz])
|
||
)
|
||
|
||
if data_is_valid:
|
||
self.data_valid_count += 1
|
||
if self.data_valid_count >= self.min_valid_data_count:
|
||
self.system_stable = True
|
||
self.get_logger().info("System stabilized, starting UART transmission")
|
||
else:
|
||
self.data_valid_count = 0 # 重置计数
|
||
self.get_logger().debug("Waiting for stable data...")
|
||
|
||
# Send data to STM32 every 50ms (20Hz) - 优化发送频率
|
||
if self.system_stable and current_time - self.last_sent_time > 0.05:
|
||
if self.ser and self.ser.is_open:
|
||
success = self.send_to_stm32(
|
||
current_pos.x,
|
||
current_pos.y,
|
||
current_pos.z,
|
||
vx,
|
||
vy,
|
||
vz
|
||
)
|
||
if success:
|
||
self.get_logger().info(f"Sent to STM32: X={current_pos.x:.3f}, Y={current_pos.y:.3f}, Z={current_pos.z:.3f}, Vx={vx:.3f}, Vy={vy:.3f}, Vz={vz:.3f}")
|
||
self.last_sent_time = current_time
|
||
|
||
# Save position history
|
||
self.position_history.append({
|
||
'time': current_time,
|
||
'position': current_pos,
|
||
'velocity': (vx, vy, vz),
|
||
'distance': self.total_distance
|
||
})
|
||
|
||
# Write to log file
|
||
self.log_to_file(elapsed_time, current_pos, vx, vy, vz)
|
||
|
||
# Update last position
|
||
self.last_position = current_pos
|
||
|
||
def display_status(self, position, elapsed_time, vx, vy, vz, velocity):
|
||
"""Display current status"""
|
||
self.get_logger().info("=" * 60)
|
||
self.get_logger().info(f"Runtime: {elapsed_time:.1f} sec")
|
||
self.get_logger().info(f"Position: X={position.x:.3f}m, Y={position.y:.3f}m, Z={position.z:.3f}m")
|
||
self.get_logger().info(f"Total distance: {self.total_distance:.3f} m")
|
||
self.get_logger().info(f"Velocity X: {vx:.3f} m/s")
|
||
self.get_logger().info(f"Velocity Y: {vy:.3f} m/s")
|
||
self.get_logger().info(f"Velocity Z: {vz:.3f} m/s")
|
||
self.get_logger().info(f"Total speed: {velocity:.3f} m/s")
|
||
self.get_logger().info(f"Avg speed: {self.total_distance/elapsed_time:.3f} m/s")
|
||
|
||
# Add UART status
|
||
if self.ser and self.ser.is_open:
|
||
self.get_logger().info("UART communication: Connected")
|
||
else:
|
||
self.get_logger().warning("UART communication: Disconnected")
|
||
|
||
def log_to_file(self, elapsed_time, position, vx, vy, vz):
|
||
"""Log data to file"""
|
||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
|
||
|
||
with open(self.log_filename, 'a', encoding='utf-8') as f:
|
||
f.write(f"{timestamp},{elapsed_time:.3f},{position.x:.6f},{position.y:.6f},{position.z:.6f},{self.total_distance:.6f},{vx:.6f},{vy:.6f},{vz:.6f}\n")
|
||
|
||
def print_summary(self):
|
||
"""Print summary information"""
|
||
if len(self.position_history) > 0:
|
||
elapsed_time = time.time() - self.start_time
|
||
|
||
print("\n" + "="*80)
|
||
print("FAST-LIO Trajectory Report")
|
||
print("="*80)
|
||
print(f"Total runtime: {elapsed_time:.1f} sec ({elapsed_time/60:.1f} min)")
|
||
print(f"Total distance: {self.total_distance:.3f} m")
|
||
print(f"Avg speed: {self.total_distance/elapsed_time:.3f} m/s")
|
||
print(f"Data points: {len(self.position_history)}")
|
||
print(f"Trajectory saved to: {self.log_filename}")
|
||
|
||
# Add UART status
|
||
if self.ser and self.ser.is_open:
|
||
print(f"UART port: {self.uart_port} @ {self.baudrate} bps")
|
||
print(f"Protocol: Anonymous (Header 0xAA, Target 0xFF, ID 0x01)")
|
||
print(f"Data format: X/Y/Z/Vx/Vy/Vz (all int16)")
|
||
else:
|
||
print("UART communication: Disconnected")
|
||
|
||
if len(self.position_history) >= 2:
|
||
start_pos = self.position_history[0]['position']
|
||
end_pos = self.position_history[-1]['position']
|
||
|
||
# Calculate straight-line distance
|
||
dx = end_pos.x - start_pos.x
|
||
dy = end_pos.y - start_pos.y
|
||
dz = end_pos.z - start_pos.z
|
||
straight_distance = math.sqrt(dx*dx + dy*dy + dz*dz)
|
||
|
||
print(f"\nStart position: ({start_pos.x:.3f}, {start_pos.y:.3f}, {start_pos.z:.3f})")
|
||
print(f"End position: ({end_pos.x:.3f}, {end_pos.y:.3f}, {end_pos.z:.3f})")
|
||
print(f"Straight distance: {straight_distance:.3f} m")
|
||
print(f"Path efficiency: {(straight_distance/self.total_distance)*100:.1f}%")
|
||
|
||
print("="*80)
|
||
|
||
# Close UART
|
||
if self.ser and self.ser.is_open:
|
||
self.ser.close()
|
||
print("UART connection closed")
|
||
|
||
def main():
|
||
print("Starting FAST-LIO distance calculator...")
|
||
print("Tip: Press Ctrl+C to stop and view report")
|
||
print(f"Initializing UART: /dev/ttyTHS1 @ 115200 bps")
|
||
|
||
rclpy.init()
|
||
calculator = DistanceCalculator()
|
||
|
||
try:
|
||
rclpy.spin(calculator)
|
||
except KeyboardInterrupt:
|
||
print("\nStopping distance calculator...")
|
||
finally:
|
||
calculator.print_summary()
|
||
calculator.destroy_node()
|
||
rclpy.shutdown()
|
||
print("Program finished")
|
||
|
||
if __name__ == '__main__':
|
||
main()
|