项目核心流程与算法原理视频车流量统计的本质是目标检测Object Detection 目标追踪Object Tracking 越线判定Line Crossing Detection。整个项目的核心流水线Pipeline如下[视频输入]──[背景建模与去噪]──[轮廓检测与筛选]──[质心计算与追踪]──[碰撞线计数]图像预处理与背景建模在一幅交通视频中马路、护栏、树木是静止的背景而车辆是运动的前景。我们采用混合高斯模型MOG2, Mixture of Gaussians动态分离前景。背景减除法Background SubtractionMOG2 会对图像中的每个像素点进行概率建模。当新的一帧进来时如果某个像素的值与历史模型相差较大则判定为前景车辆。形态学处理Morphological Operations背景减除后会产生大量噪点如树叶晃动、光影变化且车辆内部可能出现空洞。我们通过腐蚀Erode消除细小噪点通过膨胀Dilateor闭运算Close填补车辆内部空洞使车辆连成一个完成的连通域。车辆定位与质心提取轮廓外接矩形通过cv2.findContours寻找处理后图像中的所有连通域边缘。为了过滤掉行人和小噪点我们会设定一个面积阈值如Area800Area 800Area800像素只有大于该面积的轮廓才被认定为车辆。质心Centroid计算通过矩Moments几何特征计算出车辆外接矩形的中心点坐标(cx,cy)(cx, cy)(cx,cy)。追踪质心比追踪整辆车要高效得多。车辆追踪与越线计数动态追踪简易数据关联对于前后两帧我们计算当前帧所有新质心与上一帧已有车辆质心之间的欧氏距离。如果距离小于预设阈值如 20 像素则认为这是同一辆车并更新其轨迹。基准线相交判定在画面中人为绘制一条“虚拟计数线”线段YYlineY Y_{line}YYline​。当某辆车的质心在前一帧位于线上方而在当前帧位于线下方或反之则触发计数器count 1。Python 代码实现importcv2importnumpy as npimporttimefrom dataclassesimportdataclass, field from typingimportList, Tuple, Dict, Optionalimportlogging# 配置日志logging.basicConfig(levellogging.INFO,format%(asctime)s - %(levelname)s - %(message)s)loggerlogging.getLogger(__name__)# # 1. 配置类集中管理所有超参数# dataclass class Config:集中管理所有配置参数# 视频配置video_path: strtraffic.mp4# 车辆检测参数min_width: int40min_height: int40max_width: int300# 新增最大宽度过滤max_height: int200# 新增最大高度过滤# 计数线配置line_height_ratio: float0.6# 使用相对位置视频高度的60%line_height: Optional[int]None# 动态计算# 追踪器参数max_tracking_distance: int35# 最大追踪距离像素max_trajectory_length: int60# 轨迹最大长度trajectory_timeout: int45# 轨迹超时帧数# MOG2 背景减除参数mog_history: int500mog_var_threshold: int50mog_detect_shadows: boolTrue# 形态学操作参数erode_kernel_size: Tuple[int, int](3,3)dilate_kernel_size: Tuple[int, int](7,7)erode_iterations: int1dilate_iterations: int3# 显示参数show_debug_windows: boolTrue# 是否显示调试窗口fps_display: boolTrue# 是否显示FPSdraw_trajectories: boolTrue# 是否绘制轨迹trajectory_color: Tuple[int, int, int](0,255,255)# 轨迹颜色黄色# 计数模式count_direction: strbidirectional# downward, upward, bidirectionaldef __post_init__(self):验证配置ifself.count_direction notin[downward,upward,bidirectional]: raise ValueError(f无效的计数方向: {self.count_direction})# # 2. 车辆追踪器类封装追踪逻辑# class VehicleTracker: 改进的车辆追踪器 - 使用平方距离避免开方运算 - 添加轨迹超时清理 - 支持双向计数 def __init__(self, config: Config): self.configconfig self.trajectories: Dict[int, dict]{}# 使用ID字典而非列表self.next_id: int0self.down_counter: int0# 向下计数self.up_counter: int0# 向上计数def update(self, current_centroids: List[Tuple[int, int]], line_y: int)-Tuple[int, int]: 更新追踪状态 Args: current_centroids: 当前帧检测到的车辆质心列表 line_y: 计数线Y坐标 Returns:(向下计数, 向上计数)# 构建旧轨迹的匹配映射matched_old_idsset()new_trajectories{}# 计算平方距离阈值避免开方max_dist_sqself.config.max_tracking_distance **2forcx, cyincurrent_centroids: best_match_idNone best_dist_sqmax_dist_sq# 寻找最佳匹配贪心策略fortraj_id, trajinself.trajectories.items():iftraj_idinmatched_old_ids:continuelast_pointtraj[points][-1]dist_sq(cx - last_point[0])**2 (cy - last_point[1])**2ifdist_sqbest_dist_sq: best_dist_sqdist_sq best_match_idtraj_idifbest_match_id is not None:# 匹配到已有轨迹matched_old_ids.add(best_match_id)trajself.trajectories[best_match_id]traj[points].append((cx,cy))traj[last_frame]len(traj[points])# 检查是否跨越计数线self._check_crossing(traj, line_y)new_trajectories[best_match_id]traj else:# 新建轨迹new_idself.next_id self.next_id1new_traj{points:[(cx, cy)],last_frame:1,counted_down:False,counted_up:False}new_trajectories[new_id]new_traj# 保留未匹配但未满超时的轨迹fortraj_id, trajinself.trajectories.items():iftraj_id notinmatched_old_ids: timeout_framesself.config.trajectory_timeoutiflen(traj[points])timeout_frames: new_trajectories[traj_id]traj# 清理过长的轨迹self.trajectories{tid: tfortid, tinnew_trajectories.items()iflen(t[points])self.config.max_trajectory_length}returnself.down_counter, self.up_counter def _check_crossing(self, traj: dict, line_y: int):检查轨迹是否跨越计数线 pointstraj[points]iflen(points)2:returnp1points[-2]p2points[-1]# 向下穿越从上往下ifself.config.count_directionin[downward,bidirectional]:ifp1[1]line_yp2[1]and not traj[counted_down]: self.down_counter1traj[counted_down]True logger.debug(f车辆向下穿越计数线总数: {self.down_counter})# 向上穿越从下往上ifself.config.count_directionin[upward,bidirectional]:ifp2[1]line_yp1[1]and not traj[counted_up]: self.up_counter1traj[counted_up]True logger.debug(f车辆向上穿越计数线总数: {self.up_counter})def get_total_count(self)-int:获取总计数ifself.config.count_directiondownward:returnself.down_counterelifself.config.count_directionupward:returnself.up_counter else:# bidirectionalreturnself.down_counter self.up_counter def draw_trajectories(self, frame: np.ndarray):在帧上绘制所有轨迹ifnot self.config.draw_trajectories:returnfortraj_id, trajinself.trajectories.items(): pointstraj[points]iflen(points)2:continue# 绘制轨迹线points_arrnp.array(points, np.int32)points_arrpoints_arr.reshape(-1,1,2)cv2.polylines(frame,[points_arr], False, self.config.trajectory_color,1)# # 3. 车辆检测器类封装检测逻辑# class VehicleDetector:封装车辆检测逻辑 def __init__(self, config: Config): self.configconfig self.bg_subtractorcv2.createBackgroundSubtractorMOG2(historyconfig.mog_history,varThresholdconfig.mog_var_threshold,detectShadowsconfig.mog_detect_shadows)self.kernel_erodecv2.getStructuringElement(cv2.MORPH_RECT, config.erode_kernel_size)self.kernel_dilatecv2.getStructuringElement(cv2.MORPH_RECT, config.dilate_kernel_size)def detect(self, frame: np.ndarray)-Tuple[List[Tuple[int, int]], np.ndarray, np.ndarray]: 检测车辆并返回质心 Returns:(质心列表, 前景掩膜, 形态学处理后图像)# 灰度化和高斯模糊graycv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)blurcv2.GaussianBlur(gray,(5,5),0)# 背景减除fg_maskself.bg_subtractor.apply(blur)# 阈值过滤阴影_, threshedcv2.threshold(fg_mask,200,255, cv2.THRESH_BINARY)# 形态学操作erodedcv2.erode(threshed, self.kernel_erode,iterationsself.config.erode_iterations)dilatedcv2.dilate(eroded, self.kernel_dilate,iterationsself.config.dilate_iterations)closingcv2.morphologyEx(dilated, cv2.MORPH_CLOSE, self.kernel_dilate)# 轮廓检测contours, _cv2.findContours(closing, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)# 提取质心centroids[]forcontourincontours: x, y, w, hcv2.boundingRect(contour)# 尺寸过滤if(self.config.min_widthwself.config.max_width and self.config.min_heighthself.config.max_height): cxx w //2cyy h //2centroids.append((cx,cy))# 绘制检测框cv2.rectangle(frame,(x, y),(x w, y h),(0,255,0),2)cv2.circle(frame,(cx, cy),4,(0,0,255), -1)returncentroids, fg_mask, closing# # 4. 主程序类# class TrafficCounter:交通计数主程序 def __init__(self, config: Config): self.configconfig self.detectorVehicleDetector(config)self.trackerVehicleTracker(config)self.fps: float0.0self.frame_count: int0self.start_time: float0.0def run(self):运行主循环 capcv2.VideoCapture(self.config.video_path)ifnot cap.isOpened(): logger.error(f无法打开视频文件: {self.config.video_path})returntry:# 获取视频高度以计算计数线位置frame_heightint(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))ifself.config.line_height is None: self.config.line_heightint(frame_height * self.config.line_height_ratio)logger.info(f开始处理视频计数线Y坐标: {self.config.line_height})self.start_timetime.time()whileTrue: ret, framecap.read()ifnot ret: logger.info(视频播放结束)break# 计算FPSself.frame_count1elapsedtime.time()- self.start_timeifelapsed0: self.fpsself.frame_count / elapsed# 检测车辆centroids, fg_mask, closingself.detector.detect(frame)# 更新追踪down_count, up_countself.tracker.update(centroids, self.config.line_height)total_countself.tracker.get_total_count()# 绘制计数线cv2.line(frame,(0, self.config.line_height),(frame.shape[1], self.config.line_height),(0,0,255),3)# 绘制轨迹self.tracker.draw_trajectories(frame)# 绘制统计信息self._draw_info(frame, total_count, down_count, up_count)# 显示窗口cv2.imshow(Traffic Counter, frame)ifself.config.show_debug_windows: cv2.imshow(Foreground Mask, fg_mask)cv2.imshow(Morphological Closing, closing)# 检查退出键keycv2.waitKey(1)0xFFifkey27:# ESClogger.info(用户按下ESC退出)breakelifkeyord(s):# 截图timestampint(time.time())cv2.imwrite(fscreenshot_{timestamp}.jpg, frame)logger.info(f截图已保存: screenshot_{timestamp}.jpg)except Exception as e: logger.error(f运行时错误: {e},exc_infoTrue)finally: cap.release()cv2.destroyAllWindows()logger.info(f处理完成。总计数: {self.tracker.get_total_count()}, FPS: {self.fps:.2f})def _draw_info(self, frame: np.ndarray, total: int, down: int, up: int):绘制统计信息 y_offset40fontcv2.FONT_HERSHEY_SIMPLEX font_scale0.7thickness2# 标题cv2.putText(frame,TRAFFIC COUNTER,(20, y_offset), font,1.0,(255,255,255),2)y_offset35# 总计数cv2.putText(frame, fTOTAL: {total},(20, y_offset), font, font_scale,(0,255,0), thickness)y_offset25# 分方向计数ifself.config.count_directionbidirectional:cv2.putText(frame, fDOWN: {down},(20, y_offset), font, font_scale,(0,165,255), thickness)y_offset25cv2.putText(frame, fUP: {up},(20, y_offset), font, font_scale,(255,165,0), thickness)y_offset25# FPSifself.config.fps_display: cv2.putText(frame, fFPS: {self.fps:.1f},(20, y_offset), font, font_scale,(255,255,255), thickness)y_offset25# 追踪车辆数cv2.putText(frame, fTRACKING: {len(self.tracker.trajectories)},(20, y_offset), font, font_scale,(255,255,255), thickness)# # 5. 入口函数# def main():主入口函数# 创建配置configConfig(video_pathtraffic.mp4,min_width40,min_height40,max_width300,max_height200,line_height_ratio0.6,max_tracking_distance35,trajectory_timeout45,count_directionbidirectional,# 可选: downward, upward, bidirectionalshow_debug_windowsTrue,draw_trajectoriesTrue,fps_displayTrue)# 运行counterTrafficCounter(config)counter.run()if__name____main__:main()视频流 I/O 与基础控制技术这是所有视觉项目的入口和出口。cv2.VideoCapture视频流读取用于建立视频输入管道。它不仅能读取本地的.mp4或.avi视频文件还可以通过传入0或1调用本地摄像头或者传入rtsp://...连接网络监控摄像头。cv2.waitKey(delay)键盘拦截与帧率控制这是 OpenCV 维持视窗刷新的核心函数。它有两大作用一是控制视频播放的刷新间隔毫秒二是捕捉键盘输入例如代码中的key 27拦截ESC键实现非阻塞式的安全退出。图像空间转换与平滑去噪在进行高级分析前必须降低数据维度并滤除传感器噪声。cv2.cvtColor(..., cv2.COLOR_BGR2GRAY)彩色转灰度将三通道的 BGR 彩色图像转换为单通道的灰度图。由于车辆的运动轮廓只取决于像素的亮度变化与颜色无关转为灰度图可以将计算量直接暴降到原来的13\frac{1}{3}31​极大地提升了算法的实时性。cv2.GaussianBlur高斯模糊图像在传输或拍摄过程中会有很多高频噪点如雪花点、蚊子噪。高斯模糊利用二维高斯分布矩阵核大小如5×55 \times 55×5对图像进行平滑卷积让噪声“融化”到周围像素中防止后面的算法把噪点误判为运动车辆。背景减除与运动前景提取这是传统视觉识别运动目标的核心武器。cv2.createBackgroundSubtractorMOG2混合高斯模型背景减除 传统的帧差法Frame Difference无法解决车辆静止、光照渐变的问题。MOG2 算法为每个像素点建立多个高斯分布动态学习视频的“静止背景”马路、建筑。当车辆驶入时像素值偏离了背景模型从而被精准地剥离为前景掩膜Foreground Mask。cv2.threshold二值化变换 由于 MOG2 在提取前景时会将车身投射的阴影Shadows标记为特殊的灰色灰度值 127我们通过二值化强行将大于 200 灰度值的像素设为 255纯白小于的设为 0纯黑从而干净地剔除了地面阴影的干扰。数学形态学Morphological Operations用于修补二值化图像的缺陷。cv2.getStructuringElement结构元素构建用于定义形态学操作的“画笔”形状和大小如3×33 \times 33×3或7×77 \times 77×7的矩形核。cv2.erode腐蚀让图像中的白色区域向内收缩一圈。其主要作用是“消除散点”将画面中由于树叶晃动或噪点产生的小白点直接抹去。cv2.dilate膨胀让图像中的白色区域向外扩张一圈。其主要作用是“桥接断裂”让同一辆汽车断开的轮廓重新融合成一个整体。cv2.morphologyEx(..., cv2.MORPH_CLOSE)闭运算先膨胀后腐蚀的组合拳。它可以填充车辆内部由于挡风玻璃反光造成的“黑色空洞”并平滑车辆的外边缘。几何特征分析与轮廓提取将像素级别的白色块转换为具有物理意义的坐标。cv2.findContours拓扑轮廓寻找 该算法会扫描整张二值化图像将所有连续的白色像素块的边缘提取出来形成一个无序的几何轮廓列表。cv2.boundingRect最大外接矩形 输入一个不规则的轮廓它会自动计算出能包裹该轮廓的最小正矩形并返回左上角坐标(x, y)以及宽高(w, h)。这是我们进行车辆几何尺寸筛选w MIN_W的数学依据。几何图形绘制与 UI 渲染用于在视频画面上实时呈现算法结果。cv2.line与cv2.rectangle线与矩形绘制在图像矩阵上直接修改像素画出绿色的车辆检测框和红色的虚拟计数线。cv2.circle圆形绘制用于将我们计算出的物理质心(cx,cy)(cx, cy)(cx,cy)以红点的形式标刻在画面中。cv2.putText文本渲染将动态累加的car_counter变量以指定的字体、大小和颜色实时“写”在视频帧的左上角。