从零到端到端自动驾驶实战-第三章-CARLA 仿真与模仿学习

一、启动Carla环境

carla 安装,请参考之前的博文;

(base) lionsking@ai-dev:~$ conda env list

# conda environments:
#
# * -> active
# + -> frozen
base                 *   /home/lionsking/miniconda3
bevformer                /home/lionsking/miniconda3/envs/bevformer
carla                    /home/lionsking/miniconda3/envs/carla
uniad2.0                 /home/lionsking/miniconda3/envs/uniad2.0

(base) lionsking@ai-dev:~$ conda activate carla
(carla) lionsking@ai-dev:~$ cd /home/lionsking/carla
(carla) lionsking@ai-dev:~/carla$ ./CarlaUE4.sh -windowed -ResX=1280 -ResY=720 -quality-level=Epic
4.26.2-0+++UE4+Release-4.26 522 0
Disabling core dumps.

file

你的第一个CARLA脚本:生成一辆车并让它自动开
01_spawn_vehicle.py

import carla, random, time

# 连接到CARLA服务端
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()

#从蓝图库中找到Tesla
bp_lib = world.get_blueprint_library()
vehicle_bp = bp_lib.filter('vehicle.tesla.model3')[0]

#随机选一个出生点
spawn_point=random.choice(world.get_map().get_spawn_points())
#生成车辆
vehicle=world.spawn_actor(vehicle_bp,spawn_point)
print(f'✅车辆已生成: {vehicle.type_id}')
#开启自动驾驶(CARLA内置的规则引擎,不是神经网络)
vehicle.set_autopilot(True)
print('✅Autopilot已开启,车辆正在自动行驶...')
#观察30秒
time.sleep(30)
#清理(必须!否则车会留在场景里)
vehicle.destroy()
print('✅车辆已销毁')

代码执行结果:

(carla) lionsking@ai-dev:~/Code/auto_self/carla_ch03$ python 01_spawn_vehicle.py
✅车辆已生成: vehicle.tesla.model3
✅Autopilot已开启,车辆正在自动行驶...
✅车辆已销毁

3.2 绑定摄像头 + 数据采集脚本

现在车能自己开了,但我们需要“看到”它看到的世界。下一步是给车辆安装一个前视摄像头,并收集“图像 + 方向盘/油门”的配对数据。

概念解释:模仿学习(Imitation Learning)
模仿学习的思路很简单:找一个“老司机”开给你看,记录他每一刻看到的画面和做的操作,然后训练神经网络学会“看到这个画面就做这个操作”。在 CARLA 中,“老司机”就是 CARLA 内置的 Autopilot(一个基于规则的自动驾驶引擎)。

完整的数据采集脚本:
02_collect_data.py

# 02_collect_data.py
# 采集 CARLA Autopilot 的驾驶数据:图像 + 方向盘 + 油门
import carla, random, time, os, csv
import numpy as np

# ===== 配置 =====
IMG_WIDTH, IMG_HEIGHT = 640, 480
SAVE_DIR = '/data/projects/carla_data'
NUM_FRAMES = 5000  # 采集 5000 帧
# sudo chmod -R 777 /data/projects/carla_data
os.makedirs(f'{SAVE_DIR}/images', exist_ok=True)

# ===== 连接 CARLA =====
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()

# 开启同步模式(确保图像和控制信号帧对齐)
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.05  # 20 FPS
world.apply_settings(settings)

# ===== 生成车辆 =====
bp_lib = world.get_blueprint_library()
vehicle_bp = bp_lib.filter('vehicle.tesla.model3')[0]
spawn_point = random.choice(world.get_map().get_spawn_points())
vehicle = world.spawn_actor(vehicle_bp, spawn_point)
vehicle.set_autopilot(True)

# ===== 安装前视摄像头 =====
camera_bp = bp_lib.find('sensor.camera.rgb')
camera_bp.set_attribute('image_size_x', str(IMG_WIDTH))
camera_bp.set_attribute('image_size_y', str(IMG_HEIGHT))
camera_bp.set_attribute('fov', '110')  # 视场角 110°

# 摄像头位置:车前方 1.5米、高度 2.4米
camera_transform = carla.Transform(
    carla.Location(x=1.5, z=2.4),
    carla.Rotation(pitch=-15)  # 微微俯视
)
camera = world.spawn_actor(camera_bp, camera_transform,
                           attach_to=vehicle)

# ===== 数据采集回调 =====
frame_data = []  # 存储每一帧的数据
current_image = None

def camera_callback(image):
    global current_image
    # 把 CARLA 图像转为 numpy 数组
    array = np.frombuffer(image.raw_data, dtype=np.uint8)
    array = array.reshape((IMG_HEIGHT, IMG_WIDTH, 4))  # BGRA
    current_image = array[:, :, :3]  # 去掉 Alpha 通道

camera.listen(camera_callback)

# ===== 主采集循环 =====
csv_file = open(f'{SAVE_DIR}/labels.csv', 'w', newline='')
writer = csv.writer(csv_file)
writer.writerow(['frame', 'steer', 'throttle', 'brake', 'speed'])

print(f'开始采集 {NUM_FRAMES} 帧数据...')
for frame_id in range(NUM_FRAMES):
    world.tick()  # 同步模式下手动推进一帧

    if current_image is None:
        continue

    # 获取当前控制信号(Autopilot 的操作)
    control = vehicle.get_control()
    velocity = vehicle.get_velocity()
    speed = (velocity.x**2 + velocity.y**2 + velocity.z**2)**0.5

    # 保存图片
    import cv2
    img_path = f'{SAVE_DIR}/images/frame_{frame_id:06d}.jpg'
    cv2.imwrite(img_path, current_image)

    # 保存标签
    writer.writerow([
        frame_id,
        round(control.steer, 4),     # 方向盘 [-1, 1]
        round(control.throttle, 4),   # 油门 [0, 1]
        round(control.brake, 4),      # 刹车 [0, 1]
        round(speed, 2)               # 速度 m/s
            ])

    if frame_id % 500 == 0:
        print(f'  已采集 {frame_id}/{NUM_FRAMES} 帧')

csv_file.close()
camera.stop()
camera.destroy()
vehicle.destroy()
print(f'✅ 采集完成!数据保存在 {SAVE_DIR}')

执行结果:

(carla) lionsking@ai-dev:~/Code/auto_self/carla_ch03$ python 02_collect_data.py 
开始采集 5000 帧数据...
  已采集 500/5000 帧
  已采集 1000/5000 帧
  已采集 1500/5000 帧
  已采集 2000/5000 帧
  已采集 2500/5000 帧
  已采集 3000/5000 帧
  已采集 3500/5000 帧
  已采集 4000/5000 帧
  已采集 4500/5000 帧
✅ 采集完成!数据保存在 /data/projects/carla_data

file

️> 同步模式为什么重要?

异步模式下,CARLA 服务端和 Python 客户端以不同速率运行。你获取的图像和控制信号可能不是同一帧的!同步模式下,服务端会等客户端调用 world.tick() 后才推进一帧,确保数据帧对齐。这对于训练数据的质量至关重要。

3.3 第一个模仿学习模型:ResNet → steer/throttle

3.3.1 模型设计思路

我们的第一个模型非常简单:输入一张摄像头图片,输出方向盘角度和油门深度。用 ResNet-18 作为图像编码器(ImageNet 预训练),后接一个 MLP 头输出控制值。
03_model.py

# 03_model.py
import torch
import torch.nn as nn
import torchvision.models as models

class DrivingModel(nn.Module):
    """
    最简单的端到端驾驶模型:
    输入:RGB 图像 (3, 224, 224)
    输出:steer (-1~1) + throttle (0~1)
    """
    def __init__(self):
        super().__init__()
        # ResNet18:一个超强的图像识别神经网络,18 层。
        # 预训练(pretrained=True):它已经在 1400 万张图片上学过边缘、形状、物体、道路、车、人等所有视觉知识。
        # 用预训练的 ResNet-18 作为视觉编码器
        resnet = models.resnet18(pretrained=True)

        # 去掉最后的分类层,只保留特征提取部分
        # resnet.children():把 ResNet 所有层拆成一个列表, [:-1]:去掉最后一层(最后一层是 1000 类分类器,对你没用)
        # nn.Sequential(...):把剩下的层重新打包
        # 命名 backbone = 主干特征提取器
        # ✅ 去掉最后一层后,ResNet 变成什么?变成一个纯图像特征提取器,输入图片 → 输出 512 维特征向量
        # 把现成 ResNet 的层拆出来再打包
        self.backbone = nn.Sequential(*list(resnet.children())[:-1])

        # 控制头:512维特征 → 2个输出值
        # 自己新建层再打包, 神经网络模块(nn.Module)
        # nn.Sequential 是 PyTorch 内置的函数 / 类, 它创建出来的东西,叫 “模块(Module)”
        self.control_head = nn.Sequential(
            nn.Linear(512, 256), # 第一层:输入512 → 输出256, 作用:把高维特征压缩、提炼关键信息
            nn.ReLU(), # 激活函数:增加非线性, 给网络加非线性能力, 没有它,网络再深也只是线性变换,学不会复杂逻辑
            nn.Dropout(0.3),  ## 随机丢30%神经元,防止过拟合, 防止网络死记硬背训练集(过拟合)
            nn.Linear(256, 64), # 第二层:256 → 64, 让网络专注学习 “转向、油门” 相关的特征
            nn.ReLU(), # 再激活
            nn.Linear(64, 2)  # [steer, throttle], # 输出 2 个值:方向盘、油门
        )

        # 把图像特征 → 变成驾驶动作(输入:512 维特征, 输出:2 个数 → [转向角, 油门])

    def forward(self, image):
        """
        前向传播 forward(数据怎么流)
        这是模型真正运行时的逻辑。

        ---
        3. 去掉之后,后面怎么做?
        流程如下:

        图片 → 预训练 ResNet(去掉最后一层)→ 512 维特征
        512 维特征 → 你的小网络 → 2 个输出值
        用 tanh /sigmoid 限制范围 → 得到方向盘、油门

        这就叫 迁移学习(Transfer Learning)

        ---
        输入图像 (3×224×224)
                ↓
        预训练 ResNet18(去掉最后一层)→ 提取图像特征
                ↓
        512维特征向量
                ↓
        自定义全连接层(控制头)
                ↓
        输出 2 个值
                ↓
        steer: tanh → [-1, 1]
        throttle: sigmoid → [0, 1]

        ---
        六、这个模型的专业名称
        端到端自动驾驶模型 + 迁移学习 + ResNet 主干

        端到端:图像直接输出控制量
        迁移学习:用别人训练好的模型做自己的任务
        Backbone:特征提取主干
        Head:任务头(输出控制指令)

        """
        #(1)图像 → 特征, backbone 就是去掉最后一层的预训练 ResNet
        #    输出形状:批量 B × 512 通道 × 1×1(特征图)
        # (B, 3, 224, 224)= (批量大小, 通道数, 高度, 宽度)
        # 输出: 每张图变成 512 个数字的特征图。
        features = self.backbone(image)  # (B, 512, 1, 1)

        #(2)展平成向量
        # 把 512×1×1 展平成 512 维向量
        features = features.flatten(1)    # (B, 512)

        #(3)送入控制头 → 得到 2 个输出
        output = self.control_head(features)  # (B, 2)

        # 输出范围限制(真实驾驶必须)
        # steer 用 tanh 限制到 [-1, 1]
        # 方向盘 steer:必须在 -1(左)~ 1(右)→ 使用 tanh 自动压缩到 [-1,1]
        steer = torch.tanh(output[:, 0])

        # 油门 throttle:必须在 0(不踩)~ 1(踩满) → 使用 sigmoid 自动压缩到 [0,1]
        # throttle 用 sigmoid 限制到 [0, 1]
        throttle = torch.sigmoid(output[:, 1])
        return steer, throttle

    #-------------------
    """
    # 为什么能写成 self.backbone(image)?
    # 因为 PyTorch 所有网络层,内部都实现了 __call__
    # 只要一个类实现了 __call__,它就能像函数一样使用。这是 Python 规则,不是魔法!
    # 举个最简单的例子(你马上懂)

    class MyFunction:
        def __call__(self, x):
            return x * 2

    obj = MyFunction()
    result = obj(10)  # 能调用!像函数一样!
    print(result)     # 输出 20

    # nn.Sequential 内部就像上面的 MyFunction,它实现了 __call__
    # self.backbone 是对象, 但能像函数一样调用:self.backbone (参数)

    6. 最最最核心总结(你一定能懂)
    **self.backbone 是一个 PyTorch 模块
    所有 PyTorch 模块都能像函数一样调用:模块 (输入)
    调用时必须传输入,因为网络需要输入才能计算
    forward 里的 image 就是这个输入 **

    7. 用 3 行话彻底终结你的困惑
    self.backbone 是神经网络,不是普通变量
    神经网络必须接收输入才能运行
    所以你写 self.backbone(image),把图片传给它

    """
    ###

04_train.py

# 04_train.py
import torch, os, csv
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from model_03 import DrivingModel

class DrivingDataset(Dataset):
    def __init__(self, data_dir):
        self.data_dir = data_dir

        # transforms.Compose = 把多个图像处理步骤「打包成一个流水线」
        # 把「缩放 → 转张量 → 归一化」这三步,合成一个可调用的函数
        # self.transform = transforms.Compose([步骤1, 步骤2, 步骤3])
        # 以后使用时,只需要写:img = self.transform(img)
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)), # Resize(224,224):把图片统一缩成模型能接受的大小
            transforms.ToTensor(), # 转成 PyTorch 能用的张量
            transforms.Normalize([0.485,0.456,0.406],
                                 [0.229,0.224,0.225]) # 使用 ResNet 预训练必须的标准化值, → 必须和预训练模型匹配,否则效果极差
        ])
        # 读取标签 CSV
        self.samples = []
        with open(f'{data_dir}/labels.csv') as f:
            reader = csv.DictReader(f)
            for row in reader:
                self.samples.append(row)

    def __len__(self): return len(self.samples)

    def __getitem__(self, idx):
        s = self.samples[idx]
        # 1,打开图片
        img = Image.open(
            f"{self.data_dir}/images/frame_{int(s['frame']):06d}.jpg"
        ).convert('RGB')

        # 2.预处理
        # 做预处理(Resize → Tensor → Normalize)
        img = self.transform(img)
        steer = float(s['steer'])
        throttle = float(s['throttle'])
        return img, torch.tensor([steer, throttle])

# ===== 训练 =====
dataset = DrivingDataset('/data/projects/carla_data')
loader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)
model = DrivingModel().cuda()

# 优化器 + 损失函数
# optimizer = 大脑, 负责:根据误差,调整模型权重, Adam = 最常用、最稳定的优化器
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# loss_fn = 评分器, SmoothL1Loss = 回归损失
loss_fn = torch.nn.SmoothL1Loss()

for epoch in range(30):
    # 开启训练模式
    model.train()
    total_loss = 0

    # 一批一批读数据
    for images, labels in loader:
        images, labels = images.cuda(), labels.cuda()

        # # 1. 模型前向传播 → 预测
        steer_pred, throttle_pred = model(images)

        # 2. 拼接预测结果 → [steer, throttle]
        pred = torch.stack([steer_pred, throttle_pred], dim=1)

        # 3. 计算误差
        loss = loss_fn(pred, labels)

        # 4. 反向传播 → 改权重
        optimizer.zero_grad() ## 清空上一次的梯度
        loss.backward() ## 反向传播,计算哪里错了
        optimizer.step() # 优化器更新模型,改权重

        total_loss += loss.item()
    avg_loss = total_loss / len(loader)
    print(f'Epoch {epoch+1}/30, Loss: {avg_loss:.4f}')

torch.save(model.state_dict(), 'driving_model.pth')
print('✅ 模型已保存')

模型训练结果:

(carla) lionsking@ai-dev:~/Code/auto_self/carla_ch03$ python 04_train.py 
/home/lionsking/miniconda3/envs/carla/lib/python3.10/site-packages/torchvision/models/_utils.py:208: UserWarning: The parameter 'pretrained' is deprecated since 0.13 and may be removed in the future, please use 'weights' instead.
  warnings.warn(
/home/lionsking/miniconda3/envs/carla/lib/python3.10/site-packages/torchvision/models/_utils.py:223: UserWarning: Arguments other than a weight enum or `None` for 'weights' are deprecated since 0.13 and may be removed in the future. The current behavior is equivalent to passing `weights=ResNet18_Weights.IMAGENET1K_V1`. You can also use `weights=ResNet18_Weights.DEFAULT` to get the most up-to-date weights.
  warnings.warn(msg)
Epoch 1/30, Loss: 0.0071
Epoch 2/30, Loss: 0.0005
Epoch 3/30, Loss: 0.0002
Epoch 4/30, Loss: 0.0001
Epoch 5/30, Loss: 0.0001
Epoch 6/30, Loss: 0.0000
...
Epoch 29/30, Loss: 0.0000
Epoch 30/30, Loss: 0.0000
✅ 模型已保存
(carla) lionsking@ai-dev:~/Code/auto_self/carla_ch03$ 

05_closed_loop_test.py

import carla, time, torch, cv2
import numpy as np
import random

# torchvision.transforms:PyTorch 官方图像预处理工具
# 用途:把摄像头原始图像 → 变成模型能识别的标准格式
from torchvision import transforms
from model_03 import DrivingModel # 导入自己定义的自动驾驶模型类
# gai模型一般是 CNN/ResNet,输入摄像头画面,输出方向盘转角 + 油门

IMG_WIDTH, IMG_HEIGHT = 640, 480

# 加载训练好的模型
model = DrivingModel().cuda() # .cuda():把模型放到 GPU 上运行,加速推理

# 1,torch.load():加载保存好的模型权重文件
# 2,model.load_state_dict():把权重加载到模型里
model.load_state_dict(torch.load('driving_model.pth'))

# model.eval() 切换到评估模式, 关闭训练时的 Dropout / BatchNorm 等随机化层, 保证推理结果稳定、可复现
model.eval()

# 图像预处理(模型输入标准格式)
# Compose:把多个图像处理步骤打包成一个流水线, 用途:统一摄像头图像 → 模型要求的输入格式
transform = transforms.Compose([
    transforms.ToPILImage(), # 把 numpy 格式的图像 → 转成 PIL 图像, 因为 PyTorch 预处理工具优先支持 PIL
    transforms.Resize((224, 224)),# 把图像缩放到 224x224,这是绝大多数图像模型(ResNet/MobileNet)的标准输入尺寸
    transforms.ToTensor(), # 把图像 → 转为 PyTorch 张量(Tensor)
    transforms.Normalize([0.485,0.456,0.406], [0.229,0.224,0.225]) # 形状从 (H, W, 3) → (3, H, W),数值归一化到 0~1, 用途:让你的图像和训练时的数据分布一致,模型预测才准
    # ImageNet 标准归一化, 两个数组分别是:RGB 三通道均值、标准差
])

# ====================== 连接 CARLA 并开启同步模式 ======================
# 连接 CARLA,生成车辆和摄像头(同 02 脚本)
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()

# 【修复 1】强制开启同步模式,保证车辆不消失、不卡顿
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.05  # 20 FPS 稳定仿真
world.apply_settings(settings)
world.tick()  # 必须先 tick 一次让设置生效
time.sleep(0.5)

# ====================== 生成车辆(固定在地图起点,你能直接看到) =====================
bp_lib = world.get_blueprint_library()
vehicle_bp = bp_lib.filter('vehicle.tesla.model3')[0]

# 【修复 2】不随机!固定在第一个出生点,保证你能看到车
spawn_points = world.get_map().get_spawn_points()
spawn_point = spawn_points[0]  # 固定位置,不再随机
# spawn_point = random.choice(world.get_map().get_spawn_points())
# vehicle = world.spawn_actor(vehicle_bp, spawn_point)
# vehicle.set_autopilot(True)

# 【修复 3】安全生成车辆,失败会提示,不会崩溃
vehicle = None
for i in range(10):
    try:
        vehicle = world.spawn_actor(vehicle_bp, spawn_point)
        print("✅ 车辆生成成功!")
        break
    except:
        spawn_point = spawn_points[i]
        time.sleep(0.3)

if vehicle is None:
    raise Exception("❌ 车辆生成失败,退出程序")

# ====================== 安装摄像头 + 【修复 4:添加回调函数】 ======================
camera_bp = bp_lib.find('sensor.camera.rgb')
camera_bp.set_attribute('image_size_x', str(IMG_WIDTH))
camera_bp.set_attribute('image_size_y', str(IMG_HEIGHT))
camera_bp.set_attribute('fov', '110')  # 视场角 110°

# 摄像头位置:车前方 1.5米、高度 2.4米
camera_transform = carla.Transform(
    carla.Location(x=1.5, z=2.4),
    carla.Rotation(pitch=-15)  # 微微俯视
)
camera = world.spawn_actor(camera_bp, camera_transform,
                           attach_to=vehicle)

# fix bug:
# 摄像头没有设置回调函数 → current_image 永远是 None → 车辆永远不执行控制
# 你创建了摄像头,但没有写回调接收图像,所以 current_image 永远是 None,车辆永远不动,你也看不到控制效果。
# ===== 闭环控制循环 =====
current_image = None  # 回调更新, 存储最新一帧摄像头图像, 由摄像头回调函数实时更新
# 【关键修复】摄像头回调函数 → 接收图像并赋值给 current_image
def image_callback(image):
    global current_image
    array = np.frombuffer(image.raw_data, dtype=np.uint8)
    array = array.reshape((IMG_HEIGHT, IMG_WIDTH, 4))
    array = array[:, :, :3]  # 去掉 Alpha 通道
    current_image = array

camera.listen(image_callback)  # 启动监听

# ====================== 闭环控制 ======================

collision_count = 0 # 碰撞计数器:记录车辆撞墙 / 撞车次数(代码里没写检测逻辑,只是占位)
total_distance = 0.0 # 总行驶距离

print("开始自动驾驶...")
for step in range(2000):  # 跑 2000 步
    # 让世界往前走一帧,更新所有物体位置、传感器数据
    world.tick() # # 推动仿真

    # 如果还没收到摄像头图像,跳过这一步, 防止程序崩溃
    if current_image is None: 
        continue

    # 模型推理
    # transform(current_image):用流水线预处理图像, .unsqueeze(0):增加 batch 维度
    # 模型输入要求:(batch, 3, 224, 224)
    # .cuda():把图像数据放到 GPU,和模型在一起
    img_tensor = transform(current_image).unsqueeze(0).cuda()

    # 关闭梯度计算, 推理时不需要反向传播,能省显存、提速
    with torch.no_grad():
        steer, throttle = model(img_tensor)

    # ==============================================
    # 【终极修复 1】强制油门 = 0.5 (直接让车跑起来)
    # ==============================================
    # min(float(throttle.item()) * 2000, 1.0)
    actual_throttle = 0.5

    # 用模型输出控制车辆(而不是 Autopilot!)
    vehicle.apply_control(carla.VehicleControl(
        steer=float(steer.item()), # 把模型输出的张量 → 转成 Python 浮点数
        throttle=actual_throttle, # 强制放大油门
        brake=0.0, # 不刹车(你可以扩展:根据速度 / 距离自动刹车)
        hand_brake=False,  # 必须加!松开手刹
        manual_gear_shift=True,  # 手动挡
        gear=1                   # 1挡前进
    ))

    if step % 100 == 0:
        v = vehicle.get_velocity() # 获取车辆 3D 速度向量(x/y/z)
        speed = (v.x**2 + v.y**2 + v.z**2)**0.5 # 计算合速度(m/s), 欧几里得距离公式
        print(f'Step {step}: steer={steer.item():.3f}, '
              f'throttle={throttle.item():.3f}, speed={speed:.1f}m/s')

# ====================== 清理资源 ======================
camera.stop()
camera.destroy()
vehicle.destroy()

# 恢复CARLA设置
settings = world.get_settings()
settings.synchronous_mode = False
world.apply_settings(settings)

print("\n✅ 闭环测试完成!")
print(f'✅ 闭环测试完成!碰撞次数: {collision_count}')

执行记录:

(carla) lionsking@ai-dev:~/Code/auto_self/carla_ch03$ python 05_closed_loop_test.py 
/home/lionsking/miniconda3/envs/carla/lib/python3.10/site-packages/torchvision/models/_utils.py:208: UserWarning: The parameter 'pretrained' is deprecated since 0.13 and may be removed in the future, please use 'weights' instead.
  warnings.warn(
/home/lionsking/miniconda3/envs/carla/lib/python3.10/site-packages/torchvision/models/_utils.py:223: UserWarning: Arguments other than a weight enum or `None` for 'weights' are deprecated since 0.13 and may be removed in the future. The current behavior is equivalent to passing `weights=ResNet18_Weights.IMAGENET1K_V1`. You can also use `weights=ResNet18_Weights.DEFAULT` to get the most up-to-date weights.
  warnings.warn(msg)
✅ 车辆生成成功!
开始自动驾驶...
Step 100: steer=-0.020, throttle=0.010, speed=8.5m/s
Step 200: steer=0.022, throttle=0.005, speed=0.0m/s
Step 300: steer=0.035, throttle=0.004, speed=0.0m/s
Step 400: steer=0.029, throttle=0.004, speed=0.0m/s
Step 500: steer=0.042, throttle=0.003, speed=0.0m/s
Step 600: steer=0.023, throttle=0.002, speed=0.0m/s
Step 700: steer=0.061, throttle=0.002, speed=0.0m/s
Step 800: steer=0.064, throttle=0.003, speed=0.0m/s
Step 900: steer=0.053, throttle=0.003, speed=0.0m/s
Step 1000: steer=0.049, throttle=0.003, speed=0.0m/s
Step 1100: steer=0.028, throttle=0.005, speed=0.0m/s
Step 1200: steer=0.031, throttle=0.005, speed=0.0m/s
Step 1300: steer=0.005, throttle=0.006, speed=0.0m/s
Step 1400: steer=0.004, throttle=0.006, speed=0.0m/s
Step 1500: steer=-0.005, throttle=0.006, speed=0.0m/s
Step 1600: steer=0.021, throttle=0.006, speed=0.0m/s
Step 1700: steer=0.027, throttle=0.004, speed=0.0m/s
Step 1800: steer=0.015, throttle=0.003, speed=0.0m/s
Step 1900: steer=0.034, throttle=0.004, speed=0.0m/s

✅ 闭环测试完成!
✅ 闭环测试完成!碰撞次数: 0
(carla) lionsking@ai-dev:~/Code/auto_self/carla_ch03$ 

新的模拟闭环

import carla
import torch
import cv2
import numpy as np
import time
import random
from torchvision import transforms
from model_03 import DrivingModel

# ===================== 固定尺寸 =====================
IMG_WIDTH, IMG_HEIGHT = 400, 200

# ===================== 视频录制 =====================
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video = cv2.VideoWriter('test_run.mp4', fourcc, 20, (IMG_WIDTH, IMG_HEIGHT))

# ===================== 加载模型 =====================
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
model = DrivingModel().to(DEVICE)

# 直接加载权重,忽略层名不匹配
state_dict = torch.load('driving_model.pth', map_location=DEVICE)
#model.load_state_dict(torch.load('driving_model.pth', map_location=DEVICE))
model.load_state_dict(state_dict, strict=False)  # strict=False 跳过名字不匹配
model.eval()

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) # 
])

# ===================== 连接 CARLA =====================
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()

# 同步模式(必须开)
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.05
world.apply_settings(settings)

# 清理旧物体
for actor in world.get_actors().filter('*vehicle*'):
    actor.destroy()
for sensor in world.get_actors().filter('*sensor*'):
    sensor.destroy()

# ===================== 生成车辆 =====================
bp_lib = world.get_blueprint_library()
vehicle_bp = bp_lib.filter('vehicle.tesla.model3')[0]
spawn_point = random.choice(world.get_map().get_spawn_points())
vehicle = world.spawn_actor(vehicle_bp, spawn_point)

# 视角跟随车辆
spectator = world.get_spectator()

# ===================== 摄像头 =====================
camera_bp = bp_lib.find('sensor.camera.rgb')
camera_bp.set_attribute('image_size_x', str(IMG_WIDTH))
camera_bp.set_attribute('image_size_y', str(IMG_HEIGHT))
camera_bp.set_attribute('fov', '110')

camera_transform = carla.Transform(
    carla.Location(x=2.5, z=0.7),
    carla.Rotation(pitch=-15)
)
camera = world.spawn_actor(camera_bp, camera_transform, attach_to=vehicle)

latest_image = None

def camera_callback(image):
    global latest_image
    array = np.frombuffer(image.raw_data, dtype=np.uint8)
    array = array.reshape((IMG_HEIGHT, IMG_WIDTH, 4))[:, :, :3]
    latest_image = array

camera.listen(camera_callback)

# ===================== 主驾驶循环 =====================
total_distance = 0
prev_location = vehicle.get_transform().location

print("开始 AI 驾驶...")

for step in range(1500):
    world.tick()

    if latest_image is None:
        continue

    # 视角跟随
    try:
        v_trans = vehicle.get_transform()
        spectator.set_transform(carla.Transform(
            carla.Location(
                x=v_trans.location.x - 8,
                y=v_trans.location.y,
                z=v_trans.location.z + 4
            ),
            carla.Rotation(pitch=-30)
        ))
    except:
        pass

    # 图像预处理
    img_rgb = cv2.cvtColor(latest_image, cv2.COLOR_BGR2RGB)
    img_tensor = transform(img_rgb).unsqueeze(0).to(DEVICE)

    # 推理
    with torch.no_grad():
        pred = model(img_tensor)[0].cpu().numpy()

    # ===================== 修复:转成原生 float =====================
    steer = float(np.clip(pred[0], -1.0, 1.0))
    #throttle = float(np.clip(pred[1], 0.2, 0.5))
    throttle = 0.4 # 固定油门,车一定动

    # 车辆控制(绝对不报错)
    control = carla.VehicleControl()
    control.steer = steer
    control.throttle = throttle
    control.brake = 0.0
    vehicle.apply_control(control)

    # 计算距离
    now_loc = vehicle.get_transform().location
    total_distance += now_loc.distance(prev_location)
    prev_location = now_loc

    # 显示 + 写视频
    display = latest_image.copy()
    cv2.putText(display, f'Steer: {steer:.2f}', (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
    cv2.putText(display, f'Throttle: {throttle:.1f}', (10, 60),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

    video.write(display)
    #cv2.imshow("AI Driving", display)
    #if cv2.waitKey(1) & 0xFF == ord('q'):
    #    break

# ===================== 释放资源 =====================
video.release()
#cv2.destroyAllWindows()
vehicle.destroy()
camera.destroy()

print(f'行驶距离: {total_distance:.1f} 米')
print("视频已保存:test_run.mp4")

打印日志:

(carla) lionsking@ai-dev:~/Code/auto_self/carla_ch03$ python 06_closed_loop_test.py 
/home/lionsking/miniconda3/envs/carla/lib/python3.10/site-packages/torchvision/models/_utils.py:208: UserWarning: The parameter 'pretrained' is deprecated since 0.13 and may be removed in the future, please use 'weights' instead.
  warnings.warn(
/home/lionsking/miniconda3/envs/carla/lib/python3.10/site-packages/torchvision/models/_utils.py:223: UserWarning: Arguments other than a weight enum or `None` for 'weights' are deprecated since 0.13 and may be removed in the future. The current behavior is equivalent to passing `weights=ResNet18_Weights.IMAGENET1K_V1`. You can also use `weights=ResNet18_Weights.DEFAULT` to get the most up-to-date weights.
  warnings.warn(msg)
开始 AI 驾驶...

file

为者常成,行者常至