视障人士防撞预警系统
一、实际应用场景与痛点
应用场景
视障用户李先生在户外使用导盲杖行走。虽然导盲杖能探测地面障碍,但无法检测空中障碍物(如低垂树枝、广告牌、打开的窗户)和前方较远障碍。在一次行走中,他不慎撞到低矮的伸缩门,导致头部受伤。他急需一款能实时探测前方障碍物并语音预警的智能辅助设备。
核心痛点
1. 探测盲区:导盲杖主要探测地面,对空中和前方障碍探测不足
2. 反应延迟:发现障碍时已距离过近,来不及避让
3. 预警不直观:传统振动反馈无法表达障碍物的距离和方位
4. 环境适应性差:不同光照、天气条件下探测准确性下降
5. 操作复杂:多设备配合使用对视障用户不友好
6. 续航问题:长时间户外使用需要持续供电
二、核心逻辑设计
1. 初始化传感器(超声波/雷达/深度摄像头)
2. 实时扫描前方区域,获取距离数据
3. 处理原始数据,滤除噪声干扰
4. 划分安全区域(近、中、远距离警告区)
5. 根据距离计算危险等级
6. 生成多级别语音警报
7. 通过骨传导耳机输出警报
8. 记录行走路径和障碍信息
三、模块化代码实现
主程序文件:obstacle_detection_system.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视障人士防撞预警系统
实时检测前方障碍物,语音预警避免碰撞
版本:2.0.0
作者:无障碍智能助手
"""
import os
import sys
import time
import threading
import queue
import json
import math
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
import warnings
warnings.filterwarnings('ignore')
# 硬件接口
try:
import RPi.GPIO as GPIO
RASPBERRY_PI_AVAILABLE = True
except ImportError:
RASPBERRY_PI_AVAILABLE = False
print("提示: 非树莓派平台,使用模拟传感器")
try:
import numpy as np
NUMPY_AVAILABLE = True
except ImportError:
NUMPY_AVAILABLE = False
print("警告: numpy未安装,部分功能受限")
# 音频处理
try:
import pyaudio
import wave
import simpleaudio as sa
AUDIO_AVAILABLE = True
except ImportError:
AUDIO_AVAILABLE = False
print("警告: 音频模块未安装,语音提示功能受限")
# 语音合成
try:
import pyttsx3
TTS_AVAILABLE = True
except ImportError:
TTS_AVAILABLE = False
print("警告: 语音合成未安装,使用蜂鸣器提示")
class AlertLevel(Enum):
"""警报级别枚举"""
SAFE = 0 # 安全
CAUTION = 1 # 注意
WARNING = 2 # 警告
DANGER = 3 # 危险
CRITICAL = 4 # 紧急
@dataclass
class DetectionZone:
"""探测区域定义"""
name: str
angle_start: float # 起始角度(度)
angle_end: float # 结束角度(度)
distance_min: float # 最小探测距离(米)
distance_max: float # 最大探测距离(米)
alert_thresholds: Dict[AlertLevel, float] # 各级别警报阈值
@dataclass
class Obstacle:
"""障碍物信息"""
timestamp: float
distance: float # 距离(米)
angle: float # 角度(度)
width: float # 宽度(米,估计值)
height: float # 高度(米,估计值)
confidence: float # 置信度
alert_level: AlertLevel
position_x: float # 相对位置X
position_y: float # 相对位置Y
class SensorInterface:
"""传感器接口基类"""
def __init__(self, config: Dict):
"""
初始化传感器
Args:
config: 传感器配置
"""
self.config = config
self.is_connected = False
self.calibration_data = {}
def connect(self) -> bool:
"""连接传感器"""
raise NotImplementedError
def disconnect(self) -> None:
"""断开连接"""
raise NotImplementedError
def get_distance(self, angle: float = 0.0) -> Optional[float]:
"""
获取指定角度的距离
Args:
angle: 探测角度(度)
Returns:
距离(米),None表示无效
"""
raise NotImplementedError
def scan_sector(self, start_angle: float, end_angle: float,
step: float = 1.0) -> List[Tuple[float, float]]:
"""
扫描扇形区域
Args:
start_angle: 起始角度
end_angle: 结束角度
step: 扫描步长
Returns:
角度-距离对列表
"""
raise NotImplementedError
def calibrate(self) -> Dict:
"""校准传感器"""
raise NotImplementedError
class UltrasonicSensor(SensorInterface):
"""超声波传感器实现"""
def __init__(self, config: Dict):
"""
初始化超声波传感器
Args:
config: 配置字典
trigger_pin: 触发引脚
echo_pin: 回声引脚
max_distance: 最大探测距离
temperature: 环境温度(用于声速校准)
"""
super().__init__(config)
self.trigger_pin = config.get('trigger_pin', 23)
self.echo_pin = config.get('echo_pin', 24)
self.max_distance = config.get('max_distance', 4.0) # 米
self.temperature = config.get('temperature', 25.0) # 摄氏度
# 计算声速(米/秒)
self.sound_speed = 331.3 + 0.606 * self.temperature
# 伺服控制(如果支持扫描)
self.servo_pin = config.get('servo_pin')
self.current_angle = 0.0
self.setup_pins()
def setup_pins(self):
"""设置GPIO引脚"""
if RASPBERRY_PI_AVAILABLE:
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.trigger_pin, GPIO.OUT)
GPIO.setup(self.echo_pin, GPIO.IN)
GPIO.output(self.trigger_pin, GPIO.LOW)
if self.servo_pin:
GPIO.setup(self.servo_pin, GPIO.OUT)
self.servo = GPIO.PWM(self.servo_pin, 50) # 50Hz PWM
self.servo.start(0)
def connect(self) -> bool:
"""连接传感器"""
try:
# 测试传感器
distance = self.get_distance()
if distance and 0 < distance <= self.max_distance:
self.is_connected = True
print("超声波传感器连接成功")
return True
except Exception as e:
print(f"超声波传感器连接失败: {e}")
self.is_connected = False
return False
def disconnect(self) -> None:
"""断开连接"""
if RASPBERRY_PI_AVAILABLE:
if hasattr(self, 'servo'):
self.servo.stop()
GPIO.cleanup()
self.is_connected = False
def set_angle(self, angle: float) -> bool:
"""
设置伺服角度
Args:
angle: 目标角度(-90到90度)
Returns:
是否成功
"""
if not self.servo_pin or not RASPBERRY_PI_AVAILABLE:
return False
# 限制角度范围
angle = max(-90, min(90, angle))
# 转换为PWM占空比(假设伺服角度范围:0.5ms-2.5ms对应0-180度)
duty_cycle = 2.5 + (angle + 90) * (10.0 / 180.0) # 映射到2.5-12.5%
self.servo.ChangeDutyCycle(duty_cycle)
time.sleep(0.1) # 给伺服时间转动
self.servo.ChangeDutyCycle(0) # 停止PWM信号防止抖动
self.current_angle = angle
return True
def get_distance(self, angle: float = 0.0) -> Optional[float]:
"""
获取指定角度的距离
Args:
angle: 探测角度
Returns:
距离(米),None表示无效
"""
if not self.is_connected and not self.connect():
return None
# 设置角度
if angle != self.current_angle:
self.set_angle(angle)
if RASPBERRY_PI_AVAILABLE:
try:
# 发送触发脉冲
GPIO.output(self.trigger_pin, GPIO.HIGH)
time.sleep(0.00001) # 10微秒
GPIO.output(self.trigger_pin, GPIO.LOW)
# 等待回声引脚变高
start_time = time.time()
while GPIO.input(self.echo_pin) == GPIO.LOW:
if time.time() - start_time > 0.1: # 超时100ms
return None
start_time = time.time()
pulse_start = time.time()
# 等待回声引脚变低
while GPIO.input(self.echo_pin) == GPIO.HIGH:
pulse_end = time.time()
if pulse_end - pulse_start > 0.1: # 超时100ms
return None
# 计算距离
pulse_duration = pulse_end - pulse_start
distance = (pulse_duration * self.sound_speed) / 2
# 有效性检查
if 0.02 <= distance <= self.max_distance: # 2cm到最大距离
return distance
except Exception as e:
print(f"超声波测距错误: {e}")
else:
# 模拟模式,用于测试
import random
time.sleep(0.05) # 模拟测量延迟
# 模拟障碍物:距离随角度变化
base_distance = 2.0
angle_factor = 1.0 + abs(angle) / 90.0 # 角度越大,距离越小
noise = random.uniform(-0.1, 0.1)
distance = base_distance / angle_factor + noise
distance = max(0.1, min(self.max_distance, distance))
return distance
return None
def scan_sector(self, start_angle: float, end_angle: float,
step: float = 1.0) -> List[Tuple[float, float]]:
"""
扫描扇形区域
Args:
start_angle: 起始角度
end_angle: 结束角度
step: 扫描步长
Returns:
角度-距离对列表
"""
measurements = []
if start_angle > end_angle:
start_angle, end_angle = end_angle, start_angle
current_angle = start_angle
while current_angle <= end_angle:
distance = self.get_distance(current_angle)
if distance:
measurements.append((current_angle, distance))
current_angle += step
time.sleep(0.05) # 防止扫描过快
return measurements
def calibrate(self) -> Dict:
"""校准传感器"""
print("开始超声波传感器校准...")
calibration_results = {
'temperature': self.temperature,
'sound_speed': self.sound_speed,
'measurements': []
}
# 测量已知距离
test_distances = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0]
for known_distance in test_distances:
print(f"请将障碍物放置在 {known_distance} 米处,按Enter继续...")
input()
measurements = []
for _ in range(5):
distance = self.get_distance(0)
if distance:
measurements.append(distance)
time.sleep(0.1)
if measurements:
avg_measured = sum(measurements) / len(measurements)
error = avg_measured - known_distance
calibration_results['measurements'].append({
'known': known_distance,
'measured': avg_measured,
'error': error,
'error_percent': (error / known_distance) * 100
})
print(f"已知距离: {known_distance:.2f}m, 测量: {avg_measured:.2f}m, "
f"误差: {error:.3f}m ({error/known_distance*100:.1f}%)")
# 计算校准系数
if calibration_results['measurements']:
total_error = sum(m['error'] for m in calibration_results['measurements'])
avg_error = total_error / len(calibration_results['measurements'])
calibration_results['calibration_offset'] = -avg_error
calibration_results['calibration_factor'] = 1.0
print(f"校准完成,建议偏移量: {-avg_error:.3f}m")
return calibration_results
class RadarSensor(SensorInterface):
"""毫米波雷达传感器(模拟)"""
def __init__(self, config: Dict):
super().__init__(config)
# 雷达特定初始化
self.resolution = config.get('resolution', 0.05) # 距离分辨率
self.fov = config.get('fov', 120) # 视场角
self.connected = False
def connect(self) -> bool:
"""连接雷达"""
# 模拟连接
time.sleep(0.5)
self.connected = True
print("雷达传感器连接成功(模拟)")
return True
def get_distance(self, angle: float = 0.0) -> Optional[float]:
"""雷达测距"""
if not self.connected:
return None
# 模拟雷达数据
time.sleep(0.01)
return 1.5 + 0.5 * math.sin(time.time()) # 模拟动态距离
class DepthCameraSensor(SensorInterface):
"""深度摄像头传感器(模拟)"""
def __init__(self, config: Dict):
super().__init__(config)
self.width = config.get('width', 640)
self.height = config.get('height', 480)
self.connected = False
def connect(self) -> bool:
"""连接深度摄像头"""
# 模拟连接
time.sleep(0.5)
self.connected = True
print("深度摄像头连接成功(模拟)")
return True
def get_distance(self, angle: float = 0.0) -> Optional[float]:
"""深度摄像头测距"""
if not self.connected:
return None
# 模拟深度数据
time.sleep(0.02)
return 2.0 + 0.3 * math.cos(time.time() * 2) # 模拟动态距离
class SensorManager:
"""传感器管理器(支持多传感器融合)"""
def __init__(self, config: Dict):
"""
初始化传感器管理器
Args:
config: 传感器配置
"""
self.config = config
self.sensors = {}
self.data_history = []
self.history_size = 100
# 初始化传感器
self.init_sensors()
def init_sensors(self):
"""初始化所有传感器"""
sensor_configs = self.config.get('sensors', {})
# 超声波传感器
if 'ultrasonic' in sensor_configs and sensor_configs['ultrasonic'].get('enabled', True):
try:
self.sensors['ultrasonic'] = UltrasonicSensor(
sensor_configs['ultrasonic']
)
print("超声波传感器初始化完成")
except Exception as e:
print(f"超声波传感器初始化失败: {e}")
# 雷达传感器
if 'radar' in sensor_configs and sensor_configs['radar'].get('enabled', False):
try:
self.sensors['radar'] = RadarSensor(sensor_configs['radar'])
print("雷达传感器初始化完成")
except Exception as e:
print(f"雷达传感器初始化失败: {e}")
# 深度摄像头
if 'depth_camera' in sensor_configs and sensor_configs['depth_camera'].get('enabled', False):
try:
self.sensors['depth_camera'] = DepthCameraSensor(sensor_configs['depth_camera'])
print("深度摄像头初始化完成")
except Exception as e:
print(f"深度摄像头初始化失败: {e}")
if not self.sensors:
print("警告: 没有可用的传感器,使用模拟传感器")
self.sensors['simulated'] = UltrasonicSensor({
'trigger_pin': None,
'echo_pin': None,
'max_distance': 4.0
})
def connect_all(self) -> bool:
"""连接所有传感器"""
connected = False
for name, sensor in self.sensors.items():
if sensor.connect():
connected = True
print(f"{name}传感器连接成功")
else:
print(f"{name}传感器连接失败")
return connected
def get_fused_distance(self, angle: float = 0.0) -> Optional[float]:
"""
获取融合后的距离数据
Args:
angle: 探测角度
Returns:
融合后的距离
"""
distances = []
weights = []
for name, sensor in self.sensors.items():
if sensor.is_connected:
distance = sensor.get_distance(angle)
if distance:
distances.append(distance)
# 根据传感器类型设置权重
if name == 'ultrasonic':
weight = 0.6
elif name == 'radar':
weight = 0.8
elif name == 'depth_camera':
weight = 1.0
else:
weight = 0.5
weights.append(weight)
if not distances:
return None
# 加权平均
weighted_sum = sum(d * w for d, w in zip(distances, weights))
total_weight = sum(weights)
fused_distance = weighted_sum / total_weight
# 记录到历史数据
self.data_history.append({
'timestamp': time.time(),
'angle': angle,
'distance': fused_distance,
'sensor_readings': distances
})
# 限制历史数据大小
if len(self.data_history) > self.history_size:
self.data_history.pop(0)
return fused_distance
def scan_fused_sector(self, start_angle: float, end_angle: float,
step: float = 1.0) -> List[Tuple[float, float]]:
"""
扫描融合扇形区域
Args:
start_angle: 起始角度
end_angle: 结束角度
step: 扫描步长
Returns:
角度-距离对列表
"""
measurements = []
if start_angle > end_angle:
start_angle, end_angle = end_angle, start_angle
# 使用主传感器扫描
main_sensor = next(iter(self.sensors.values()))
current_angle = start_angle
while current_angle <= end_angle:
distance = self.get_fused_distance(current_angle)
if distance:
measurements.append((current_angle, distance))
current_angle += step
return measurements
def disconnect_all(self):
"""断开所有传感器"""
for sensor in self.sensors.values():
sensor.disconnect()
class ObstacleDetector:
"""障碍物检测器"""
def __init__(self, config: Dict):
"""
初始化障碍物检测器
Args:
config: 检测器配置
"""
self.config = config
self.zones = self.setup_detection_zones()
self.obstacle_history = []
self.history_size = 50
# 滤波参数
self.filter_window = config.get('filter_window', 5)
self.distance_history = {}
def setup_detection_zones(self) -> List[DetectionZone]:
"""设置探测区域"""
zones_config = self.config.get('zones', [
{
'name': 'center',
'angle_start': -15,
'angle_end': 15,
'distance_min': 0.1,
'distance_max': 4.0,
'alert_thresholds': {
AlertLevel.SAFE: 2.0,
AlertLevel.CAUTION: 1.5,
AlertLevel.WARNING: 1.0,
AlertLevel.DANGER: 0.5,
AlertLevel.CRITICAL: 0.3
}
},
{
'name': 'left',
'angle_start': -60,
'angle_end': -15,
'distance_min': 0.1,
'distance_max': 3.0,
'alert_thresholds': {
AlertLevel.SAFE: 1.5,
AlertLevel.CAUTION: 1.0,
AlertLevel.WARNING: 0.7,
AlertLevel.DANGER: 0.4,
AlertLevel.CRITICAL: 0.25
}
},
{
'name': 'right',
'angle_start': 15,
'angle_end': 60,
'distance_min': 0.1,
'distance_max': 3.0,
'alert_thresholds': {
AlertLevel.SAFE: 1.5,
AlertLevel.CAUTION: 1.0,
AlertLevel.WARNING: 0.7,
AlertLevel.DANGER: 0.4,
AlertLevel.CRITICAL: 0.25
}
}
])
zones = []
for zone_config in zones_config:
thresholds = {
AlertLevel[key.upper()]: value
for key, value in zone_config['alert_thresholds'].items()
}
zone = DetectionZone(
name=zone_config['name'],
angle_start=zone_config['angle_start'],
angle_end=zone_config['angle_end'],
distance_min=zone_config['distance_min'],
distance_max=zone_config['distance_max'],
alert_thresholds=thresholds
)
zones.append(zone)
return zones
def apply_filter(self, angle: float, distance: float) -> float:
"""
应用滤波器去除噪声
Args:
如果你觉得这个工具好用,欢迎关注我!