news 2026/2/4 3:50:36

检测前方障碍物距离,当距离过近时语音报警,避免视障用户碰撞受伤。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
检测前方障碍物距离,当距离过近时语音报警,避免视障用户碰撞受伤。

视障人士防撞预警系统

一、实际应用场景与痛点

应用场景

视障用户李先生在户外使用导盲杖行走。虽然导盲杖能探测地面障碍,但无法检测空中障碍物(如低垂树枝、广告牌、打开的窗户)和前方较远障碍。在一次行走中,他不慎撞到低矮的伸缩门,导致头部受伤。他急需一款能实时探测前方障碍物并语音预警的智能辅助设备。

核心痛点

1. 探测盲区:导盲杖主要探测地面,对空中和前方障碍探测不足

2. 反应延迟:发现障碍时已距离过近,来不及避让

3. 预警不直观:传统振动反馈无法表达障碍物的距离和方位

4. 环境适应性差:不同光照、天气条件下探测准确性下降

5. 操作复杂:多设备配合使用对视障用户不友好

6. 续航问题:长时间户外使用需要持续供电

二、核心逻辑设计

1. 初始化传感器(超声波/雷达/深度摄像头)

2. 实时扫描前方区域,获取距离数据

3. 处理原始数据,滤除噪声干扰

4. 划分安全区域(近、中、远距离警告区)

5. 根据距离计算危险等级

6. 生成多级别语音警报

7. 通过骨传导耳机输出警报

8. 记录行走路径和障碍信息

三、模块化代码实现

主程序文件:obstacle_detection_system.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

视障人士防撞预警系统

实时检测前方障碍物,语音预警避免碰撞

版本:2.0.0

作者:无障碍智能助手

"""

import os

import sys

import time

import threading

import queue

import json

import math

from datetime import datetime

from typing import Dict, List, Tuple, Optional, Any

from dataclasses import dataclass, asdict

from enum import Enum

import warnings

warnings.filterwarnings('ignore')

# 硬件接口

try:

import RPi.GPIO as GPIO

RASPBERRY_PI_AVAILABLE = True

except ImportError:

RASPBERRY_PI_AVAILABLE = False

print("提示: 非树莓派平台,使用模拟传感器")

try:

import numpy as np

NUMPY_AVAILABLE = True

except ImportError:

NUMPY_AVAILABLE = False

print("警告: numpy未安装,部分功能受限")

# 音频处理

try:

import pyaudio

import wave

import simpleaudio as sa

AUDIO_AVAILABLE = True

except ImportError:

AUDIO_AVAILABLE = False

print("警告: 音频模块未安装,语音提示功能受限")

# 语音合成

try:

import pyttsx3

TTS_AVAILABLE = True

except ImportError:

TTS_AVAILABLE = False

print("警告: 语音合成未安装,使用蜂鸣器提示")

class AlertLevel(Enum):

"""警报级别枚举"""

SAFE = 0 # 安全

CAUTION = 1 # 注意

WARNING = 2 # 警告

DANGER = 3 # 危险

CRITICAL = 4 # 紧急

@dataclass

class DetectionZone:

"""探测区域定义"""

name: str

angle_start: float # 起始角度(度)

angle_end: float # 结束角度(度)

distance_min: float # 最小探测距离(米)

distance_max: float # 最大探测距离(米)

alert_thresholds: Dict[AlertLevel, float] # 各级别警报阈值

@dataclass

class Obstacle:

"""障碍物信息"""

timestamp: float

distance: float # 距离(米)

angle: float # 角度(度)

width: float # 宽度(米,估计值)

height: float # 高度(米,估计值)

confidence: float # 置信度

alert_level: AlertLevel

position_x: float # 相对位置X

position_y: float # 相对位置Y

class SensorInterface:

"""传感器接口基类"""

def __init__(self, config: Dict):

"""

初始化传感器

Args:

config: 传感器配置

"""

self.config = config

self.is_connected = False

self.calibration_data = {}

def connect(self) -> bool:

"""连接传感器"""

raise NotImplementedError

def disconnect(self) -> None:

"""断开连接"""

raise NotImplementedError

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""

获取指定角度的距离

Args:

angle: 探测角度(度)

Returns:

距离(米),None表示无效

"""

raise NotImplementedError

def scan_sector(self, start_angle: float, end_angle: float,

step: float = 1.0) -> List[Tuple[float, float]]:

"""

扫描扇形区域

Args:

start_angle: 起始角度

end_angle: 结束角度

step: 扫描步长

Returns:

角度-距离对列表

"""

raise NotImplementedError

def calibrate(self) -> Dict:

"""校准传感器"""

raise NotImplementedError

class UltrasonicSensor(SensorInterface):

"""超声波传感器实现"""

def __init__(self, config: Dict):

"""

初始化超声波传感器

Args:

config: 配置字典

trigger_pin: 触发引脚

echo_pin: 回声引脚

max_distance: 最大探测距离

temperature: 环境温度(用于声速校准)

"""

super().__init__(config)

self.trigger_pin = config.get('trigger_pin', 23)

self.echo_pin = config.get('echo_pin', 24)

self.max_distance = config.get('max_distance', 4.0) # 米

self.temperature = config.get('temperature', 25.0) # 摄氏度

# 计算声速(米/秒)

self.sound_speed = 331.3 + 0.606 * self.temperature

# 伺服控制(如果支持扫描)

self.servo_pin = config.get('servo_pin')

self.current_angle = 0.0

self.setup_pins()

def setup_pins(self):

"""设置GPIO引脚"""

if RASPBERRY_PI_AVAILABLE:

GPIO.setmode(GPIO.BCM)

GPIO.setup(self.trigger_pin, GPIO.OUT)

GPIO.setup(self.echo_pin, GPIO.IN)

GPIO.output(self.trigger_pin, GPIO.LOW)

if self.servo_pin:

GPIO.setup(self.servo_pin, GPIO.OUT)

self.servo = GPIO.PWM(self.servo_pin, 50) # 50Hz PWM

self.servo.start(0)

def connect(self) -> bool:

"""连接传感器"""

try:

# 测试传感器

distance = self.get_distance()

if distance and 0 < distance <= self.max_distance:

self.is_connected = True

print("超声波传感器连接成功")

return True

except Exception as e:

print(f"超声波传感器连接失败: {e}")

self.is_connected = False

return False

def disconnect(self) -> None:

"""断开连接"""

if RASPBERRY_PI_AVAILABLE:

if hasattr(self, 'servo'):

self.servo.stop()

GPIO.cleanup()

self.is_connected = False

def set_angle(self, angle: float) -> bool:

"""

设置伺服角度

Args:

angle: 目标角度(-90到90度)

Returns:

是否成功

"""

if not self.servo_pin or not RASPBERRY_PI_AVAILABLE:

return False

# 限制角度范围

angle = max(-90, min(90, angle))

# 转换为PWM占空比(假设伺服角度范围:0.5ms-2.5ms对应0-180度)

duty_cycle = 2.5 + (angle + 90) * (10.0 / 180.0) # 映射到2.5-12.5%

self.servo.ChangeDutyCycle(duty_cycle)

time.sleep(0.1) # 给伺服时间转动

self.servo.ChangeDutyCycle(0) # 停止PWM信号防止抖动

self.current_angle = angle

return True

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""

获取指定角度的距离

Args:

angle: 探测角度

Returns:

距离(米),None表示无效

"""

if not self.is_connected and not self.connect():

return None

# 设置角度

if angle != self.current_angle:

self.set_angle(angle)

if RASPBERRY_PI_AVAILABLE:

try:

# 发送触发脉冲

GPIO.output(self.trigger_pin, GPIO.HIGH)

time.sleep(0.00001) # 10微秒

GPIO.output(self.trigger_pin, GPIO.LOW)

# 等待回声引脚变高

start_time = time.time()

while GPIO.input(self.echo_pin) == GPIO.LOW:

if time.time() - start_time > 0.1: # 超时100ms

return None

start_time = time.time()

pulse_start = time.time()

# 等待回声引脚变低

while GPIO.input(self.echo_pin) == GPIO.HIGH:

pulse_end = time.time()

if pulse_end - pulse_start > 0.1: # 超时100ms

return None

# 计算距离

pulse_duration = pulse_end - pulse_start

distance = (pulse_duration * self.sound_speed) / 2

# 有效性检查

if 0.02 <= distance <= self.max_distance: # 2cm到最大距离

return distance

except Exception as e:

print(f"超声波测距错误: {e}")

else:

# 模拟模式,用于测试

import random

time.sleep(0.05) # 模拟测量延迟

# 模拟障碍物:距离随角度变化

base_distance = 2.0

angle_factor = 1.0 + abs(angle) / 90.0 # 角度越大,距离越小

noise = random.uniform(-0.1, 0.1)

distance = base_distance / angle_factor + noise

distance = max(0.1, min(self.max_distance, distance))

return distance

return None

def scan_sector(self, start_angle: float, end_angle: float,

step: float = 1.0) -> List[Tuple[float, float]]:

"""

扫描扇形区域

Args:

start_angle: 起始角度

end_angle: 结束角度

step: 扫描步长

Returns:

角度-距离对列表

"""

measurements = []

if start_angle > end_angle:

start_angle, end_angle = end_angle, start_angle

current_angle = start_angle

while current_angle <= end_angle:

distance = self.get_distance(current_angle)

if distance:

measurements.append((current_angle, distance))

current_angle += step

time.sleep(0.05) # 防止扫描过快

return measurements

def calibrate(self) -> Dict:

"""校准传感器"""

print("开始超声波传感器校准...")

calibration_results = {

'temperature': self.temperature,

'sound_speed': self.sound_speed,

'measurements': []

}

# 测量已知距离

test_distances = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0]

for known_distance in test_distances:

print(f"请将障碍物放置在 {known_distance} 米处,按Enter继续...")

input()

measurements = []

for _ in range(5):

distance = self.get_distance(0)

if distance:

measurements.append(distance)

time.sleep(0.1)

if measurements:

avg_measured = sum(measurements) / len(measurements)

error = avg_measured - known_distance

calibration_results['measurements'].append({

'known': known_distance,

'measured': avg_measured,

'error': error,

'error_percent': (error / known_distance) * 100

})

print(f"已知距离: {known_distance:.2f}m, 测量: {avg_measured:.2f}m, "

f"误差: {error:.3f}m ({error/known_distance*100:.1f}%)")

# 计算校准系数

if calibration_results['measurements']:

total_error = sum(m['error'] for m in calibration_results['measurements'])

avg_error = total_error / len(calibration_results['measurements'])

calibration_results['calibration_offset'] = -avg_error

calibration_results['calibration_factor'] = 1.0

print(f"校准完成,建议偏移量: {-avg_error:.3f}m")

return calibration_results

class RadarSensor(SensorInterface):

"""毫米波雷达传感器(模拟)"""

def __init__(self, config: Dict):

super().__init__(config)

# 雷达特定初始化

self.resolution = config.get('resolution', 0.05) # 距离分辨率

self.fov = config.get('fov', 120) # 视场角

self.connected = False

def connect(self) -> bool:

"""连接雷达"""

# 模拟连接

time.sleep(0.5)

self.connected = True

print("雷达传感器连接成功(模拟)")

return True

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""雷达测距"""

if not self.connected:

return None

# 模拟雷达数据

time.sleep(0.01)

return 1.5 + 0.5 * math.sin(time.time()) # 模拟动态距离

class DepthCameraSensor(SensorInterface):

"""深度摄像头传感器(模拟)"""

def __init__(self, config: Dict):

super().__init__(config)

self.width = config.get('width', 640)

self.height = config.get('height', 480)

self.connected = False

def connect(self) -> bool:

"""连接深度摄像头"""

# 模拟连接

time.sleep(0.5)

self.connected = True

print("深度摄像头连接成功(模拟)")

return True

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""深度摄像头测距"""

if not self.connected:

return None

# 模拟深度数据

time.sleep(0.02)

return 2.0 + 0.3 * math.cos(time.time() * 2) # 模拟动态距离

class SensorManager:

"""传感器管理器(支持多传感器融合)"""

def __init__(self, config: Dict):

"""

初始化传感器管理器

Args:

config: 传感器配置

"""

self.config = config

self.sensors = {}

self.data_history = []

self.history_size = 100

# 初始化传感器

self.init_sensors()

def init_sensors(self):

"""初始化所有传感器"""

sensor_configs = self.config.get('sensors', {})

# 超声波传感器

if 'ultrasonic' in sensor_configs and sensor_configs['ultrasonic'].get('enabled', True):

try:

self.sensors['ultrasonic'] = UltrasonicSensor(

sensor_configs['ultrasonic']

)

print("超声波传感器初始化完成")

except Exception as e:

print(f"超声波传感器初始化失败: {e}")

# 雷达传感器

if 'radar' in sensor_configs and sensor_configs['radar'].get('enabled', False):

try:

self.sensors['radar'] = RadarSensor(sensor_configs['radar'])

print("雷达传感器初始化完成")

except Exception as e:

print(f"雷达传感器初始化失败: {e}")

# 深度摄像头

if 'depth_camera' in sensor_configs and sensor_configs['depth_camera'].get('enabled', False):

try:

self.sensors['depth_camera'] = DepthCameraSensor(sensor_configs['depth_camera'])

print("深度摄像头初始化完成")

except Exception as e:

print(f"深度摄像头初始化失败: {e}")

if not self.sensors:

print("警告: 没有可用的传感器,使用模拟传感器")

self.sensors['simulated'] = UltrasonicSensor({

'trigger_pin': None,

'echo_pin': None,

'max_distance': 4.0

})

def connect_all(self) -> bool:

"""连接所有传感器"""

connected = False

for name, sensor in self.sensors.items():

if sensor.connect():

connected = True

print(f"{name}传感器连接成功")

else:

print(f"{name}传感器连接失败")

return connected

def get_fused_distance(self, angle: float = 0.0) -> Optional[float]:

"""

获取融合后的距离数据

Args:

angle: 探测角度

Returns:

融合后的距离

"""

distances = []

weights = []

for name, sensor in self.sensors.items():

if sensor.is_connected:

distance = sensor.get_distance(angle)

if distance:

distances.append(distance)

# 根据传感器类型设置权重

if name == 'ultrasonic':

weight = 0.6

elif name == 'radar':

weight = 0.8

elif name == 'depth_camera':

weight = 1.0

else:

weight = 0.5

weights.append(weight)

if not distances:

return None

# 加权平均

weighted_sum = sum(d * w for d, w in zip(distances, weights))

total_weight = sum(weights)

fused_distance = weighted_sum / total_weight

# 记录到历史数据

self.data_history.append({

'timestamp': time.time(),

'angle': angle,

'distance': fused_distance,

'sensor_readings': distances

})

# 限制历史数据大小

if len(self.data_history) > self.history_size:

self.data_history.pop(0)

return fused_distance

def scan_fused_sector(self, start_angle: float, end_angle: float,

step: float = 1.0) -> List[Tuple[float, float]]:

"""

扫描融合扇形区域

Args:

start_angle: 起始角度

end_angle: 结束角度

step: 扫描步长

Returns:

角度-距离对列表

"""

measurements = []

if start_angle > end_angle:

start_angle, end_angle = end_angle, start_angle

# 使用主传感器扫描

main_sensor = next(iter(self.sensors.values()))

current_angle = start_angle

while current_angle <= end_angle:

distance = self.get_fused_distance(current_angle)

if distance:

measurements.append((current_angle, distance))

current_angle += step

return measurements

def disconnect_all(self):

"""断开所有传感器"""

for sensor in self.sensors.values():

sensor.disconnect()

class ObstacleDetector:

"""障碍物检测器"""

def __init__(self, config: Dict):

"""

初始化障碍物检测器

Args:

config: 检测器配置

"""

self.config = config

self.zones = self.setup_detection_zones()

self.obstacle_history = []

self.history_size = 50

# 滤波参数

self.filter_window = config.get('filter_window', 5)

self.distance_history = {}

def setup_detection_zones(self) -> List[DetectionZone]:

"""设置探测区域"""

zones_config = self.config.get('zones', [

{

'name': 'center',

'angle_start': -15,

'angle_end': 15,

'distance_min': 0.1,

'distance_max': 4.0,

'alert_thresholds': {

AlertLevel.SAFE: 2.0,

AlertLevel.CAUTION: 1.5,

AlertLevel.WARNING: 1.0,

AlertLevel.DANGER: 0.5,

AlertLevel.CRITICAL: 0.3

}

},

{

'name': 'left',

'angle_start': -60,

'angle_end': -15,

'distance_min': 0.1,

'distance_max': 3.0,

'alert_thresholds': {

AlertLevel.SAFE: 1.5,

AlertLevel.CAUTION: 1.0,

AlertLevel.WARNING: 0.7,

AlertLevel.DANGER: 0.4,

AlertLevel.CRITICAL: 0.25

}

},

{

'name': 'right',

'angle_start': 15,

'angle_end': 60,

'distance_min': 0.1,

'distance_max': 3.0,

'alert_thresholds': {

AlertLevel.SAFE: 1.5,

AlertLevel.CAUTION: 1.0,

AlertLevel.WARNING: 0.7,

AlertLevel.DANGER: 0.4,

AlertLevel.CRITICAL: 0.25

}

}

])

zones = []

for zone_config in zones_config:

thresholds = {

AlertLevel[key.upper()]: value

for key, value in zone_config['alert_thresholds'].items()

}

zone = DetectionZone(

name=zone_config['name'],

angle_start=zone_config['angle_start'],

angle_end=zone_config['angle_end'],

distance_min=zone_config['distance_min'],

distance_max=zone_config['distance_max'],

alert_thresholds=thresholds

)

zones.append(zone)

return zones

def apply_filter(self, angle: float, distance: float) -> float:

"""

应用滤波器去除噪声

Args:

如果你觉得这个工具好用,欢迎关注我!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/3 11:55:32

告别混乱分支:基于Docker的Git工作树隔离方案,实现秒级切换

第一章&#xff1a;告别混乱分支&#xff1a;Docker与Git工作树的融合之道在现代软件开发中&#xff0c;频繁的特性迭代和多环境部署常常导致 Git 分支泛滥&#xff0c;开发、测试与生产环境之间难以保持一致性。通过将 Docker 容器化技术与 Git 工作树策略深度融合&#xff0c…

作者头像 李华
网站建设 2026/2/3 12:45:59

Semantic Kernel插件化尝试:微软生态下的AI能力扩展

Semantic Kernel插件化尝试&#xff1a;微软生态下的AI能力扩展 在当今 AI 技术飞速演进的背景下&#xff0c;一个明显的趋势正在浮现&#xff1a;我们不再一味追求“更大”的模型&#xff0c;而是开始思考如何让模型“更聪明地做事”。尤其是在教育、编程辅助和算法训练这类高…

作者头像 李华
网站建设 2026/2/3 4:49:41

推三返一单品商城抖音快手微信小程序看广告流量主开源

② 分享即得 - 微信小程序介绍 项目概述 这是一个创新的社交购物小程序&#xff0c;通过"分享返现"模式&#xff0c;让用户邀请好友购买即可获得全额返现&#xff0c;实现免费获得心仪商品。核心功能 1. 精选商城 商品展示展示多款高性价比智能硬件产品包括&#xff…

作者头像 李华
网站建设 2026/2/3 14:28:12

金融-央行数字货币:离线交易安全性测试

央行数字货币&#xff08;CBDC&#xff09;作为数字化法定货币的代表&#xff0c;正迅速重塑全球金融体系。其中&#xff0c;离线交易功能——即在无网络连接环境下完成支付——是CBDC的关键优势&#xff0c;但也是安全风险的温床。对于软件测试从业者而言&#xff0c;确保离线…

作者头像 李华
网站建设 2026/2/1 5:39:09

《计算机网络》深入学:虚电路

在计算机网络的分组交换&#xff08;Packet Switching&#xff09;技术中&#xff0c;存在两种基本的网络层架构设计思路&#xff1a;数据报网络&#xff08;Datagram Network&#xff09;与虚电路网络&#xff08;Virtual Circuit Network&#xff09;。虽然现代互联网&#x…

作者头像 李华
网站建设 2026/1/31 20:57:58

日志监控体系搭建:跟踪推理请求状态与性能指标

日志监控体系搭建&#xff1a;跟踪推理请求状态与性能指标 在 AI 模型加速落地生产环境的今天&#xff0c;一个尖锐的问题摆在工程团队面前&#xff1a;我们如何知道模型“跑得好不好”&#xff1f;尤其是在部署像 VibeThinker-1.5B-APP 这类专精于数学与算法推理的小参数模型时…

作者头像 李华