YOLOv5 监控工地工人是否戴安全帽

在工地监控场景下,可以使用海康威视的SDK(如海康威视的 HCNetSDK 或 OpenCV 的流媒体读取功能)读取摄像头视频流,然后结合 YOLOv5 模型进行安全帽检测。以下是一个完整的流程和实现思路:

实现步骤:

  1. 连接摄像头:通过 OpenCV 和海康威视摄像头的 RTSP 流链接获取实时画面。
  2. 安全帽检测:使用 YOLOv5 模型检测画面中的人员,并判断是否戴有安全帽。
  3. 警报和视频录制:如果检测到工人没有戴安全帽,触发警报,记录20秒的前后视频,并发送通知。
  4. 日志记录:打印并保存检测到的报警日志信息。

代码实现

在代码实现中,我们将使用 OpenCV 读取摄像头视频流,使用 YOLOv5 模型进行安全帽检测,记录未佩戴安全帽的事件并保存视频。

import cv2
import torch
import time
import datetime
import os

# 海康威视摄像头的RTSP流地址
CAMERA_STREAMS = [
    "rtsp://username:password@camera_ip1:554/Streaming/Channels/1",
    "rtsp://username:password@camera_ip2:554/Streaming/Channels/1",
    "rtsp://username:password@camera_ip3:554/Streaming/Channels/1"
]

# 加载YOLOv5模型用于安全帽检测(此模型需要包含“person”和“helmet”类别)
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)

# 设定录像参数
VIDEO_DURATION = 20  # 秒
ALERT_DURATION = 5   # 每次警报间隔
output_dir = "alerts_videos"
os.makedirs(output_dir, exist_ok=True)

# 用于记录上次警报时间
last_alert_time = time.time() - ALERT_DURATION

def detect_helmet(frame):
    """
    检测是否有未佩戴安全帽的工人
    """
    results = model(frame)
    no_helmet_workers = []

    for det in results.xyxy[0]:  # 遍历检测结果
        x1, y1, x2, y2, conf, cls = det
        label = results.names[int(cls)]
        if label == 'person':
            # 过滤出未戴安全帽的人员
            if not any(obj[0] == 'helmet' and obj[1] > conf for obj in results.xyxy[0]): 
                no_helmet_workers.append((x1, y1, x2, y2))  # 存储未戴帽人员坐标
    return no_helmet_workers

def record_and_alert(cap, camera_id):
    """
    录制20秒的视频并触发警报
    """
    global last_alert_time
    alert_time = time.time()
    if alert_time - last_alert_time < ALERT_DURATION:
        return  # 避免频繁触发警报

    last_alert_time = alert_time
    alert_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    video_name = os.path.join(output_dir, f"alert_{camera_id}_{alert_timestamp}.avi")

    # 获取视频帧信息
    frame_width, frame_height = int(cap.get(3)), int(cap.get(4))
    out = cv2.VideoWriter(video_name, cv2.VideoWriter_fourcc(*'XVID'), 20, (frame_width, frame_height))

    # 录制20秒视频
    end_time = time.time() + VIDEO_DURATION
    while time.time() < end_time:
        ret, frame = cap.read()
        if ret:
            out.write(frame)
            cv2.putText(frame, "ALERT: No Helmet Detected!", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            cv2.imshow(f"Camera {camera_id}", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    out.release()
    # 触发警报和日志信息
    print(f"ALERT: No helmet detected on camera {camera_id}. Recorded video saved to {video_name}.")
    # 在实际场景中可以发送通知或调用管理系统 API

def monitor_cameras():
    """
    监控多个摄像头的视频流
    """
    caps = [cv2.VideoCapture(stream) for stream in CAMERA_STREAMS]
    while True:
        for camera_id, cap in enumerate(caps):
            ret, frame = cap.read()
            if not ret:
                print(f"摄像头 {camera_id + 1} 无法读取视频流")
                continue

            no_helmet_workers = detect_helmet(frame)

            # 如果检测到无安全帽的工人
            if no_helmet_workers:
                record_and_alert(cap, camera_id)
            else:
                # 输出正常信息
                cv2.putText(frame, "All workers are wearing helmets", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.imshow(f"Camera {camera_id}", frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    for cap in caps:
        cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    monitor_cameras()

代码说明

  1. 摄像头读取CAMERA_STREAMS 数组中包含了每个摄像头的 RTSP 流地址。
  2. 安全帽检测detect_helmet 函数使用 YOLO 模型检测画面中的人员,并通过是否检测到安全帽标签来判断是否戴了安全帽。
  3. 录制报警视频record_and_alert 函数用于录制20秒的视频片段,并保存为 .avi 文件。
  4. 实时监控和警报触发monitor_cameras 函数持续监控摄像头画面。如果检测到未佩戴安全帽的工人,记录视频并打印报警信息。

注意事项

  • 模型选择:默认的 yolov5s 模型不一定包含安全帽标签。如果需要精确的检测效果,建议使用专门训练过包含“person”和“helmet”类别的数据集模型。
  • 流式处理效率:YOLOv5 模型可能会导致帧率下降。如果需要实时性,可考虑使用轻量化模型(如 yolov5n)或更高性能的硬件。
  • 系统集成:在生产环境中,警报可通过消息系统或邮件通知实时推送给管理人员。

这样,监控系统可以实现对未佩戴安全帽工人的自动检测和报警。

为者常成,行者常至