YOLOv5 监控工地工人是否戴安全帽
在工地监控场景下,可以使用海康威视的SDK(如海康威视的 HCNetSDK
或 OpenCV 的流媒体读取功能)读取摄像头视频流,然后结合 YOLOv5 模型进行安全帽检测。以下是一个完整的流程和实现思路:
实现步骤:
- 连接摄像头:通过 OpenCV 和海康威视摄像头的 RTSP 流链接获取实时画面。
- 安全帽检测:使用 YOLOv5 模型检测画面中的人员,并判断是否戴有安全帽。
- 警报和视频录制:如果检测到工人没有戴安全帽,触发警报,记录20秒的前后视频,并发送通知。
- 日志记录:打印并保存检测到的报警日志信息。
代码实现
在代码实现中,我们将使用 OpenCV 读取摄像头视频流,使用 YOLOv5 模型进行安全帽检测,记录未佩戴安全帽的事件并保存视频。
import cv2
import torch
import time
import datetime
import os
# 海康威视摄像头的RTSP流地址
CAMERA_STREAMS = [
"rtsp://username:password@camera_ip1:554/Streaming/Channels/1",
"rtsp://username:password@camera_ip2:554/Streaming/Channels/1",
"rtsp://username:password@camera_ip3:554/Streaming/Channels/1"
]
# 加载YOLOv5模型用于安全帽检测(此模型需要包含“person”和“helmet”类别)
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
# 设定录像参数
VIDEO_DURATION = 20 # 秒
ALERT_DURATION = 5 # 每次警报间隔
output_dir = "alerts_videos"
os.makedirs(output_dir, exist_ok=True)
# 用于记录上次警报时间
last_alert_time = time.time() - ALERT_DURATION
def detect_helmet(frame):
"""
检测是否有未佩戴安全帽的工人
"""
results = model(frame)
no_helmet_workers = []
for det in results.xyxy[0]: # 遍历检测结果
x1, y1, x2, y2, conf, cls = det
label = results.names[int(cls)]
if label == 'person':
# 过滤出未戴安全帽的人员
if not any(obj[0] == 'helmet' and obj[1] > conf for obj in results.xyxy[0]):
no_helmet_workers.append((x1, y1, x2, y2)) # 存储未戴帽人员坐标
return no_helmet_workers
def record_and_alert(cap, camera_id):
"""
录制20秒的视频并触发警报
"""
global last_alert_time
alert_time = time.time()
if alert_time - last_alert_time < ALERT_DURATION:
return # 避免频繁触发警报
last_alert_time = alert_time
alert_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
video_name = os.path.join(output_dir, f"alert_{camera_id}_{alert_timestamp}.avi")
# 获取视频帧信息
frame_width, frame_height = int(cap.get(3)), int(cap.get(4))
out = cv2.VideoWriter(video_name, cv2.VideoWriter_fourcc(*'XVID'), 20, (frame_width, frame_height))
# 录制20秒视频
end_time = time.time() + VIDEO_DURATION
while time.time() < end_time:
ret, frame = cap.read()
if ret:
out.write(frame)
cv2.putText(frame, "ALERT: No Helmet Detected!", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
cv2.imshow(f"Camera {camera_id}", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
out.release()
# 触发警报和日志信息
print(f"ALERT: No helmet detected on camera {camera_id}. Recorded video saved to {video_name}.")
# 在实际场景中可以发送通知或调用管理系统 API
def monitor_cameras():
"""
监控多个摄像头的视频流
"""
caps = [cv2.VideoCapture(stream) for stream in CAMERA_STREAMS]
while True:
for camera_id, cap in enumerate(caps):
ret, frame = cap.read()
if not ret:
print(f"摄像头 {camera_id + 1} 无法读取视频流")
continue
no_helmet_workers = detect_helmet(frame)
# 如果检测到无安全帽的工人
if no_helmet_workers:
record_and_alert(cap, camera_id)
else:
# 输出正常信息
cv2.putText(frame, "All workers are wearing helmets", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow(f"Camera {camera_id}", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
for cap in caps:
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
monitor_cameras()
代码说明
- 摄像头读取:
CAMERA_STREAMS
数组中包含了每个摄像头的 RTSP 流地址。 - 安全帽检测:
detect_helmet
函数使用 YOLO 模型检测画面中的人员,并通过是否检测到安全帽标签来判断是否戴了安全帽。 - 录制报警视频:
record_and_alert
函数用于录制20秒的视频片段,并保存为.avi
文件。 - 实时监控和警报触发:
monitor_cameras
函数持续监控摄像头画面。如果检测到未佩戴安全帽的工人,记录视频并打印报警信息。
注意事项
- 模型选择:默认的
yolov5s
模型不一定包含安全帽标签。如果需要精确的检测效果,建议使用专门训练过包含“person”和“helmet”类别的数据集模型。 - 流式处理效率:YOLOv5 模型可能会导致帧率下降。如果需要实时性,可考虑使用轻量化模型(如
yolov5n
)或更高性能的硬件。 - 系统集成:在生产环境中,警报可通过消息系统或邮件通知实时推送给管理人员。
这样,监控系统可以实现对未佩戴安全帽工人的自动检测和报警。
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)