news 2026/4/18 1:51:16

编写店铺客流热力分析程序,基于监控视频提取客流数据,生成店内热力图,识别高峰时段与热力区域,提供货架摆放优化建议。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
编写店铺客流热力分析程序,基于监控视频提取客流数据,生成店内热力图,识别高峰时段与热力区域,提供货架摆放优化建议。

我将为您设计一个基于创新思维与战略管理的店铺客流热力分析程序。这个项目将商业智能分析与战略决策支持相结合。

项目结构

store_heatmap_analysis/

├── main.py # 主程序入口

├── video_processor.py # 视频处理模块

├── heatmap_generator.py # 热力图生成模块

├── analytics_engine.py # 数据分析引擎

├── strategic_advisor.py # 战略建议模块

├── data/

│ ├── sample_video.mp4

│ ├── customer_data.csv

│ └── store_layout.json

├── output/

│ ├── heatmaps/

│ ├── reports/

│ └── recommendations/

├── utils/

│ ├── config.py

│ └── helpers.py

├── README.md

└── requirements.txt

1. 依赖文件 (requirements.txt)

opencv-python==4.8.1.78

numpy==1.24.3

matplotlib==3.7.2

seaborn==0.12.2

pandas==2.0.3

pillow==10.0.1

scikit-learn==1.3.0

plotly==5.15.0

scipy==1.11.2

2. 配置文件 (utils/config.py)

"""

配置管理模块

基于战略管理中的标准化原则,统一管理项目配置

"""

import os

from datetime import timedelta

class Config:
    """Centralized project configuration: paths, video sampling, heatmap
    rendering and business-analysis thresholds."""

    # --- Base paths ---
    # NOTE(review): BASE_DIR is the directory containing this file (utils/),
    # so DATA_DIR/OUTPUT_DIR resolve under utils/ — confirm this is intended
    # rather than the project root shown in the README tree.
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    DATA_DIR = os.path.join(BASE_DIR, 'data')
    OUTPUT_DIR = os.path.join(BASE_DIR, 'output')
    HEATMAP_DIR = os.path.join(OUTPUT_DIR, 'heatmaps')
    REPORT_DIR = os.path.join(OUTPUT_DIR, 'reports')
    RECOMMENDATION_DIR = os.path.join(OUTPUT_DIR, 'recommendations')

    # --- Video processing ---
    FRAME_SAMPLE_RATE = 30       # sample one frame out of every 30
    DETECTION_CONFIDENCE = 0.6
    MIN_DETECTION_AREA = 500     # minimum detected contour area, in pixels

    # --- Heatmap rendering ---
    GRID_SIZE = (20, 15)         # store divided into 20 rows x 15 columns
    HEATMAP_COLORS = ['#00FF00', '#FFFF00', '#FF0000']  # green -> yellow -> red
    OPACITY = 0.7

    # --- Time-based analysis ---
    PEAK_HOUR_THRESHOLD = 0.8    # >= 80% of the hourly average counts as peak
    TIME_SLOT_MINUTES = 60       # granularity of one analysis time slot

    # --- Business-strategy weights ---
    HIGH_TRAFFIC_ZONE_RATIO = 0.3  # share of cells treated as high-traffic
    PRODUCT_CATEGORY_WEIGHTS = {
        'premium': 2.0,    # premium products
        'popular': 1.5,    # best-sellers
        'regular': 1.0,    # regular products
        'clearance': 0.8,  # clearance products
    }

    # Innovation analysis dimensions consumed by downstream modules.
    INNOVATION_METRICS = [
        'customer_flow_efficiency',
        'space_utilization_rate',
        'conversion_opportunity_index',
        'strategic_positioning_score',
    ]

    @classmethod
    def create_directories(cls):
        """Create the data/output directory tree (idempotent)."""
        directories = [cls.DATA_DIR, cls.HEATMAP_DIR, cls.REPORT_DIR,
                       cls.RECOMMENDATION_DIR]
        for directory in directories:
            os.makedirs(directory, exist_ok=True)

    @classmethod
    def get_time_slots(cls, start_hour=9, end_hour=22):
        """Return hour-granularity slot labels, e.g. '09:00-10:00'.

        Fix: the original step ``cls.TIME_SLOT_MINUTES // 60`` evaluates to 0
        whenever TIME_SLOT_MINUTES < 60, so ``slot_end == current_time`` and
        the loop never terminated.  The step is now clamped to at least one
        hour; behavior for the default of 60 minutes is unchanged.
        """
        step = max(1, cls.TIME_SLOT_MINUTES // 60)
        slots = []
        current_time = start_hour
        while current_time < end_hour:
            slot_end = min(current_time + step, end_hour)
            slots.append(f"{current_time:02d}:00-{slot_end:02d}:00")
            current_time = slot_end
        return slots

3. 工具函数 (utils/helpers.py)

"""

辅助函数模块

提供通用的工具函数,体现模块化设计思维

"""

import cv2

import numpy as np

import pandas as pd

from datetime import datetime, timedelta

import json

import matplotlib.pyplot as plt

from scipy.spatial.distance import cdist

class DataProcessor:
    """Stateless helpers for frame preprocessing, motion detection and
    store-grid geometry."""

    @staticmethod
    def preprocess_frame(frame):
        """Convert a BGR frame to blurred grayscale for frame differencing.

        The 5x5 Gaussian blur suppresses sensor noise so that the later
        absdiff-based motion detection is more stable.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        # NOTE: deliberately simple; a production system could add a proper
        # background-subtraction model here.
        return blurred

    @staticmethod
    def detect_motion_regions(prev_frame, curr_frame, threshold=25, min_area=500):
        """Detect moving blobs between two preprocessed grayscale frames.

        Returns a list of ``(center_x, center_y, area)`` tuples, or an empty
        list when either frame is missing.

        Generalization: ``min_area`` was previously hard-coded to 500 inside
        the loop, inconsistent with ``Config.MIN_DETECTION_AREA``; it is now a
        parameter with the same default, so callers can pass the config value.
        """
        if prev_frame is None or curr_frame is None:
            return []
        # Frame differencing + binary threshold isolates changed pixels.
        frame_diff = cv2.absdiff(prev_frame, curr_frame)
        thresh = cv2.threshold(frame_diff, threshold, 255, cv2.THRESH_BINARY)[1]
        # Morphological close-then-open fills holes and removes speckle noise.
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        motion_regions = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > min_area:  # drop small noise blobs
                x, y, w, h = cv2.boundingRect(contour)
                center_x = x + w // 2
                center_y = y + h // 2
                motion_regions.append((center_x, center_y, area))
        return motion_regions

    @staticmethod
    def create_store_grid(frame_shape, grid_size):
        """Partition a frame of shape (height, width, ...) into rows x cols cells.

        Returns a nested list indexed ``[row][col]`` of dicts with pixel
        ``bounds`` (x1, y1, x2, y2), a ``grid_id`` string ``"i_j"`` and
        ``coordinates`` (i, j).
        """
        height, width = frame_shape[:2]
        rows, cols = grid_size
        cell_width = width / cols
        cell_height = height / rows
        grid_cells = []
        for i in range(rows):
            row_cells = []
            for j in range(cols):
                x1 = int(j * cell_width)
                y1 = int(i * cell_height)
                x2 = int((j + 1) * cell_width)
                y2 = int((i + 1) * cell_height)
                row_cells.append({
                    'bounds': (x1, y1, x2, y2),
                    'grid_id': f"{i}_{j}",
                    'coordinates': (i, j),
                })
            grid_cells.append(row_cells)
        return grid_cells

    @staticmethod
    def calculate_distance_matrix(points):
        """Pairwise Euclidean distance matrix for a list of (x, y) points.

        Used to analyze customer movement paths.  Returns an empty array when
        fewer than two points are given.
        """
        if len(points) < 2:
            return np.array([])
        points_array = np.array(points)
        distance_matrix = cdist(points_array, points_array, metric='euclidean')
        return distance_matrix

class ReportGenerator:
    """Builds JSON analysis reports from compiled traffic data."""

    @staticmethod
    def _hour_count(stats):
        """Total customers for one hourly entry.

        Accepts either ``{'count': n}`` or the ``{grid_id: count, ...}``
        mapping that ``VideoProcessor._compile_analysis_data`` actually
        produces.  The original code unconditionally read ``stats['count']``
        and raised KeyError on real pipeline output.
        """
        if isinstance(stats, dict):
            if 'count' in stats:
                return stats['count']
            return sum(stats.values())
        return stats

    @staticmethod
    def generate_summary_report(analysis_data, output_path):
        """Build a summary report dict, persist it as UTF-8 JSON, return it.

        Parameters:
            analysis_data: dict, optionally containing an 'hourly_stats' map.
            output_path: file path the JSON report is written to.
        """
        report = {
            'report_generated': datetime.now().isoformat(),
            'summary_statistics': {},
            'peak_analysis': {},
            'strategic_recommendations': [],
            'innovation_insights': {},
        }
        if 'hourly_stats' in analysis_data:
            hourly_data = analysis_data['hourly_stats']
            # Normalize every hour entry to a plain integer before summing.
            counts = {hour: ReportGenerator._hour_count(stats)
                      for hour, stats in hourly_data.items()}
            total_customers = sum(counts.values())
            avg_customers = total_customers / len(counts) if counts else 0
            report['summary_statistics'] = {
                'total_customers': total_customers,
                'average_customers_per_hour': round(avg_customers, 2),
                'peak_hour': max(counts, key=counts.get) if counts else 'N/A',
                'analysis_period': f"{len(counts)} hours",
            }
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        return report

4. 视频处理模块 (video_processor.py)

"""

视频处理模块

基于创新思维的多层次分析方法,从原始视频中提取有价值的客流数据

"""

import cv2

import numpy as np

import pandas as pd

from datetime import datetime, timedelta

from collections import defaultdict, Counter

import os

from utils.config import Config

from utils.helpers import DataProcessor

class VideoProcessor:
    """Extracts customer-traffic statistics from a surveillance video.

    Samples frames at a fixed interval, detects motion via frame differencing,
    and accumulates per-hour, per-grid-cell counts plus coarse trajectories.
    """

    def __init__(self, video_path=None):
        self.video_path = video_path
        self.cap = None               # cv2.VideoCapture handle, set lazily
        self.frame_count = 0
        self.processed_frames = 0
        self.store_grid = None        # built by DataProcessor.create_store_grid
        # "HH:MM" -> list of (x, y) motion centers observed at that minute
        self.customer_trajectories = defaultdict(list)
        # "HH:00" -> grid_id -> motion-event count
        self.hourly_data = defaultdict(lambda: defaultdict(int))
        # Innovation-metric accumulators (populated by downstream analysis).
        self.innovation_metrics = {
            'flow_efficiency': [],
            'space_utilization': [],
            'conversion_zones': [],
        }

    def initialize_video(self, video_path=None):
        """Open the video, read its metadata and build the store grid.

        Raises ValueError when the path is missing or cannot be opened.
        """
        if video_path:
            self.video_path = video_path
        if not self.video_path or not os.path.exists(self.video_path):
            raise ValueError(f"视频文件不存在: {self.video_path}")
        self.cap = cv2.VideoCapture(self.video_path)
        if not self.cap.isOpened():
            raise ValueError("无法打开视频文件")
        # Basic stream metadata.
        self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        # Fix: some containers report FPS as 0, which later caused
        # ZeroDivisionError in timestamp math; fall back to a sane default.
        if not self.fps or self.fps <= 0:
            self.fps = 30.0
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"视频信息: {self.width}x{self.height}, {self.fps} FPS, {self.frame_count} 帧")
        self.store_grid = DataProcessor.create_store_grid(
            (self.height, self.width), Config.GRID_SIZE
        )
        return True

    def process_video(self, progress_callback=None):
        """Sample frames, detect motion and accumulate traffic statistics.

        ``progress_callback``, when given, receives a percentage (0-100)
        roughly every 10 sampled frames.  Returns the compiled analysis dict.
        """
        if not self.cap:
            self.initialize_video()
        prev_frame = None
        frame_interval = Config.FRAME_SAMPLE_RATE
        print("开始处理视频...")
        for frame_num in range(0, self.frame_count, frame_interval):
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = self.cap.read()
            if not ret:
                continue
            # Report progress periodically.
            if progress_callback and frame_num % (frame_interval * 10) == 0:
                progress = (frame_num / self.frame_count) * 100
                progress_callback(progress)
            processed_frame = DataProcessor.preprocess_frame(frame)
            motion_regions = DataProcessor.detect_motion_regions(prev_frame, processed_frame)
            if motion_regions:
                # Map the frame number to wall-clock time, anchored at 09:00
                # on a synthetic date (the recording's assumed start time).
                timestamp = frame_num / self.fps
                dt_object = datetime(2024, 1, 1, 9, 0) + timedelta(seconds=timestamp)
                hour_key = dt_object.strftime("%H:00")
                self._update_grid_data(motion_regions, hour_key, dt_object)
                self._record_trajectories(motion_regions, dt_object)
            prev_frame = processed_frame
            self.processed_frames += 1
        print(f"视频处理完成,共处理 {self.processed_frames} 个采样帧")
        return self._compile_analysis_data()

    def _update_grid_data(self, regions, hour_key, timestamp):
        """Increment the per-hour counter of the grid cell each region falls in."""
        for region in regions:
            x, y, area = region
            # Clamp to the last row/column so edge pixels stay in range.
            grid_i = min(int(y / (self.height / Config.GRID_SIZE[0])), Config.GRID_SIZE[0] - 1)
            grid_j = min(int(x / (self.width / Config.GRID_SIZE[1])), Config.GRID_SIZE[1] - 1)
            grid_id = f"{grid_i}_{grid_j}"
            self.hourly_data[hour_key][grid_id] += 1

    def _record_trajectories(self, regions, timestamp):
        """Append each region's center to the per-minute trajectory log."""
        for region in regions:
            x, y, area = region
            time_key = timestamp.strftime("%H:%M")
            self.customer_trajectories[time_key].append((x, y))

    def _compile_analysis_data(self):
        """Assemble the final analysis dict consumed by reporting/plotting."""
        analysis_data = {
            'video_info': {
                'path': self.video_path,
                'duration': self.frame_count / self.fps,
                'resolution': f"{self.width}x{self.height}",
                'processed_frames': self.processed_frames,
            },
            'hourly_stats': dict(self.hourly_data),
            'trajectory_summary': {
                'total_time_points': len(self.customer_trajectories),
                'peak_traffic_minute': max(self.customer_trajectories.items(),
                                           key=lambda x: len(x[1]),
                                           default=('N/A', []))[0],
            },
            'grid_statistics': self._calculate_grid_statistics(),
        }
        return analysis_data

    def _calculate_grid_statistics(self):
        """Per-hour totals, hottest cell and per-cell average."""
        grid_stats = {}
        for hour, grid_data in self.hourly_data.items():
            total_count = sum(grid_data.values())
            if total_count > 0:
                hottest_grid = max(grid_data.items(), key=lambda x: x[1])
                grid_stats[hour] = {
                    'total_customers': total_count,
                    'hottest_grid': hottest_grid[0],
                    'hottest_count': hottest_grid[1],
                    'avg_per_grid': total_count / len(grid_data) if grid_data else 0,
                }
        return grid_stats

    def identify_peak_hours(self, threshold_ratio=None):
        """Return hours whose total traffic is >= threshold_ratio * hourly
        average, sorted by traffic descending."""
        if threshold_ratio is None:
            threshold_ratio = Config.PEAK_HOUR_THRESHOLD
        if not self.hourly_data:
            return []
        hourly_totals = {hour: sum(grid_data.values())
                         for hour, grid_data in self.hourly_data.items()}
        if not hourly_totals:
            return []
        avg_traffic = sum(hourly_totals.values()) / len(hourly_totals)
        threshold = avg_traffic * threshold_ratio
        peak_hours = [hour for hour, count in hourly_totals.items()
                      if count >= threshold]
        return sorted(peak_hours, key=lambda x: hourly_totals[x], reverse=True)

    def get_popular_zones(self, top_n=5):
        """Return the ``top_n`` (grid_id, total_count) pairs across all hours."""
        zone_counts = defaultdict(int)
        for hour_data in self.hourly_data.values():
            for grid_id, count in hour_data.items():
                zone_counts[grid_id] += count
        popular_zones = sorted(zone_counts.items(), key=lambda x: x[1], reverse=True)
        return popular_zones[:top_n]

    def close(self):
        """Release the underlying video capture resource."""
        if self.cap:
            self.cap.release()

5. 热力图生成模块 (heatmap_generator.py)

"""

热力图生成模块

基于数据分析结果生成直观的可视化热力图

体现创新思维的数据可视化呈现

"""

import cv2

import numpy as np

import matplotlib.pyplot as plt

import seaborn as sns

from matplotlib.colors import LinearSegmentedColormap

import plotly.graph_objects as go

import plotly.express as px

from datetime import datetime

from utils.config import Config

from utils.helpers import DataProcessor

class HeatmapGenerator:
    """Renders static, interactive and timeline heatmaps from analysis data."""

    def __init__(self):
        self.heatmap_data = None      # last rendered matrix (unused placeholder)
        self.store_layout = None      # optional layout metadata
        self.time_series_data = []    # accumulated time-series points

    def generate_static_heatmap(self, analysis_data, time_slot=None, save_path=None):
        """Render a seaborn heatmap for one time slot (or all data combined).

        Returns the matplotlib Figure, or None when there is no data.
        """
        if time_slot and time_slot in analysis_data.get('hourly_stats', {}):
            # Heatmap for a specific hour.
            grid_data = analysis_data['hourly_stats'][time_slot]
        else:
            # Heatmap aggregated over the whole recording.
            grid_data = self._aggregate_all_data(analysis_data)
        if not grid_data:
            print("没有数据可生成热力图")
            return None
        heatmap_matrix = self._create_heatmap_matrix(grid_data)
        fig, ax = plt.subplots(figsize=(12, 8))
        # Custom green->yellow->red colormap from the project config.
        colors = Config.HEATMAP_COLORS
        cmap = LinearSegmentedColormap.from_list("custom_heatmap", colors, N=256)
        sns.heatmap(heatmap_matrix, annot=True, fmt='.0f', cmap=cmap,
                    ax=ax, cbar_kws={'label': '客流密度'})
        ax.set_title(f'店铺客流热力图{" - " + time_slot if time_slot else ""}',
                     fontsize=16, fontweight='bold')
        ax.set_xlabel('店铺宽度方向', fontsize=12)
        ax.set_ylabel('店铺深度方向', fontsize=12)
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"热力图已保存到: {save_path}")
        return fig

    def generate_interactive_heatmap(self, analysis_data, save_path=None):
        """Render an interactive Plotly heatmap (optionally saved as HTML).

        Returns the Plotly figure, or None when there is no data.
        """
        # Fix: 'pd' was never imported in this module, so this method raised
        # NameError; import locally to leave module-level imports untouched.
        import pandas as pd
        grid_data = self._prepare_interactive_data(analysis_data)
        if not grid_data:
            return None
        df = pd.DataFrame(grid_data)
        fig = px.imshow(df.pivot(index='row', columns='col', values='density'),
                        title='交互式店铺客流热力图',
                        color_continuous_scale=['green', 'yellow', 'red'],
                        labels={'color': '客流密度'})
        fig.update_layout(width=800, height=600)
        if save_path:
            fig.write_html(save_path)
            print(f"交互式热力图已保存到: {save_path}")
        return fig

    def generate_timeline_heatmap(self, analysis_data, save_path=None):
        """Render a time-slot x grid-cell heatmap showing traffic over time.

        Returns the matplotlib Figure, or None when there is no hourly data.
        """
        hourly_data = analysis_data.get('hourly_stats', {})
        if not hourly_data:
            return None
        time_labels = sorted(hourly_data.keys())
        grid_ids = set()
        for hour_data in hourly_data.values():
            grid_ids.update(hour_data.keys())
        grid_ids = sorted(list(grid_ids))
        # Build the (time, grid) count matrix; missing cells default to 0.
        timeline_matrix = np.zeros((len(time_labels), len(grid_ids)))
        for i, time_label in enumerate(time_labels):
            for j, grid_id in enumerate(grid_ids):
                timeline_matrix[i, j] = hourly_data[time_label].get(grid_id, 0)
        fig, ax = plt.subplots(figsize=(15, 8))
        im = ax.imshow(timeline_matrix, cmap='YlOrRd', aspect='auto')
        ax.set_xticks(range(len(grid_ids)))
        ax.set_xticklabels(grid_ids, rotation=45)
        ax.set_yticks(range(len(time_labels)))
        ax.set_yticklabels(time_labels)
        ax.set_title('客流时间轴热力图', fontsize=16, fontweight='bold')
        ax.set_xlabel('店铺网格', fontsize=12)
        ax.set_ylabel('时间段', fontsize=12)
        plt.colorbar(im, ax=ax, label='客流数量')
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"时间轴热力图已保存到: {save_path}")
        return fig

    def _aggregate_all_data(self, analysis_data):
        """Sum grid counts across all hours into a ``{grid_id: total}`` dict.

        Fix: the original used ``defaultdict`` without the module ever
        importing ``collections`` (NameError at first call); a plain dict with
        ``.get`` gives identical results.
        """
        aggregated = {}
        for hour_data in analysis_data.get('hourly_stats', {}).values():
            for grid_id, count in hour_data.items():
                aggregated[grid_id] = aggregated.get(grid_id, 0) + count
        return aggregated

    def _create_heatmap_matrix(self, grid_data):
        """Convert ``{grid_id: count}`` to a rows x cols numpy matrix."""
        rows, cols = Config.GRID_SIZE
        matrix = np.zeros((rows, cols))
        for grid_id, count in grid_data.items():
            try:
                i, j = map(int, grid_id.split('_'))
                if 0 <= i < rows and 0 <= j < cols:
                    matrix[i, j] = count
            except (ValueError, IndexError):
                # Skip malformed grid ids rather than aborting the render.
                continue
        return matrix

    def _prepare_interactive_data(self, analysis_data):
        """Flatten aggregated grid counts into row/col/density records."""
        grid_data = self._aggregate_all_data(analysis_data)
        interactive_data = []
        for grid_id, density in grid_data.items():
            try:
                i, j = map(int, grid_id.split('_'))
                interactive_data.append({
                    'row': i,
                    'col': j,
                    'grid_id': grid_id,
                    'density': density,
                })
            except (ValueError, IndexError):
                continue
        return interactive_data

def generate_comparison_heatmap(self, before_data, after_data, save_path=None):

"""

生成对比

关注我,有更多实用程序等着你!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/17 16:58:56

《俄罗斯方块》为何让人停不下来?——从认知心理学、神经科学到游戏设计的深度解析

摘要： 本文系统剖析经典游戏《俄罗斯方块》（Tetris）为何能跨越40年仍具强大吸引力。文章从认知负荷理论、心流模型、蔡格尼克效应、多巴胺反馈机制、视觉工作记忆、空间推理训练等心理学与神经科学视角出发，结合游戏机制设计、难度…

作者头像 李华
网站建设 2026/4/17 15:18:56

C++函数与string对象、array对象及递归详解

C++函数与string对象、array对象及递归详解 一、string对象的数组操作 string对象比C风格字符串更灵活，可以像结构体一样进行赋值和传递。以下示例展示了string数组的用法： #include <iostream> #include <string> using namespace std;const …

作者头像 李华
网站建设 2026/4/17 21:35:48

Miniconda-Python3.9环境下实现PyTorch模型冷启动优化

Miniconda-Python3.9环境下实现PyTorch模型冷启动优化 在部署深度学习服务时，你是否遇到过这样的场景：系统重启后第一个用户请求响应特别慢，甚至超时？日志显示，并非代码逻辑问题，而是模型加载、依赖初始化等…

作者头像 李华
网站建设 2026/4/17 19:30:28

硬核对决:TruthfulRAG如何运用知识图谱化解RAG知识冲突?

📌 RAG系统的困境 问题的根源：知识冲突 RAG（检索增强生成）系统中：当外部检索到的知识与模型内部参数化知识不一致时，LLM往往会陷入不知所措。 知识冲突示意图 Figure 1: 知识冲突示意图。现有方法在toke…

作者头像 李华
网站建设 2026/4/16 12:21:25

SpringBoot代码集

一、获取Spring容器对象1.1 实现BeanFactoryAware接口实现BeanFactoryAware接口，然后重写setBeanFactory方法，就能从该方法中获取到Spring容器对象。Service public class PersonService implements BeanFactoryAware {private BeanFactory beanFactory;…

作者头像 李华
网站建设 2026/4/16 12:21:26

2025最新!8个AI论文平台测评:本科生写论文还能这么快?

2025最新！8个AI论文平台测评：本科生写论文还能这么快？ 2025年AI论文平台测评：为何需要这份榜单？ 随着人工智能技术的不断进步，越来越多的本科生开始借助AI工具提升论文写作效率。然而，面对市场上…

作者头像 李华