Gym-like CARLA Reinforcement Learning Environment Wrappers: Notes on Open-Source Code



gym-carla

[Task] 

  • random: goal-free driving with no destination
  • roundabout (only for Town03): goal-free driving that terminates once the vehicle enters the roundabout

[Action] accel: -3.0~3.0, steer: -0.3~0.3, mapped to throttle: 0.0~1.0, brake: 0.0~0.375 (= 3/8), steer: -0.3~0.3:

def step(self, action):
    # Calculate acceleration and steering
    if self.discrete:
        acc = self.discrete_act[0][action // self.n_steer]
        steer = self.discrete_act[1][action % self.n_steer]
    else:
        acc = action[0]
        steer = action[1]

    # Convert acceleration to throttle and brake
    if acc > 0:
        throttle = np.clip(acc / 3, 0, 1)  # acc in (0, 3] -> throttle in (0, 1]
        brake = 0
    else:
        throttle = 0
        brake = np.clip(-acc / 8, 0, 1)  # acc in [-3, 0] -> brake in [0, 0.375]

    # Apply control (note that steer is negated before being applied)
    act = carla.VehicleControl(throttle=float(throttle), steer=float(-steer), brake=float(brake))
    self.ego.apply_control(act)

    self.world.tick()
    ......
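
For reference, a minimal usage sketch of this environment (it assumes a CARLA server is already running on port 2000; the params dict is abridged from the gym-carla README, so treat the values as placeholders):

import gym
import gym_carla  # registers the 'carla-v0' environment

# Abridged parameter dict; see the gym-carla README for the full list
params = {
    'task_mode': 'random',  # or 'roundabout'
    'town': 'Town03',
    'discrete': False,  # continuous [accel, steer] actions
    'continuous_accel_range': [-3.0, 3.0],
    'continuous_steer_range': [-0.3, 0.3],
    'desired_speed': 8,
    'out_lane_thres': 2.0,
    'max_time_episode': 1000,
    'port': 2000,
}

env = gym.make('carla-v0', params=params)
obs = env.reset()
obs, r, done, info = env.step([2.0, 0.0])  # mild acceleration, no steering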

[Observation]

observation_space_dict = {
    'camera': spaces.Box(low=0, high=255, shape=(self.obs_size, self.obs_size, 3), dtype=np.uint8),
    'lidar': spaces.Box(low=0, high=255, shape=(self.obs_size, self.obs_size, 3), dtype=np.uint8),
    'birdeye': spaces.Box(low=0, high=255, shape=(self.obs_size, self.obs_size, 3), dtype=np.uint8),
    # 'state': [lateral distance from the ego vehicle to the waypoint path,
    #           angle between the ego heading and the path direction,
    #           ego speed,
    #           flag for whether a vehicle is present within a set distance ahead]
    'state': spaces.Box(np.array([-2, -1, -5, 0]), np.array([2, 1, 30, 1]), dtype=np.float32)
}
if self.pixor:
    observation_space_dict.update({
        'roadmap': spaces.Box(low=0, high=255, shape=(self.obs_size, self.obs_size, 3), dtype=np.uint8),
        'vh_clas': spaces.Box(low=0, high=1, shape=(self.pixor_size, self.pixor_size, 1), dtype=np.float32),
        'vh_regr': spaces.Box(low=-5, high=5, shape=(self.pixor_size, self.pixor_size, 6), dtype=np.float32),
        'pixor_state': spaces.Box(np.array([-1000, -1000, -1, -1, -5]), np.array([1000, 1000, 1, 1, 20]), dtype=np.float32)
    })
self.observation_space = spaces.Dict(observation_space_dict)
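
A policy consuming this Dict space unpacks the entries by key. A minimal sketch, assuming obs is the dict returned by reset()/step() and pixor is disabled:

camera = obs['camera']    # (obs_size, obs_size, 3) uint8 camera image
lidar = obs['lidar']      # LiDAR points rasterized into an image
birdeye = obs['birdeye']  # bird's-eye-view rendering of the scene
lateral_dis, heading_err, speed, vehicle_ahead = obs['state']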

[Reward]

def _get_reward(self):
    """Calculate the step reward."""
    # Reward for speed tracking
    v = self.ego.get_velocity()
    speed = np.sqrt(v.x**2 + v.y**2)
    r_speed = -abs(speed - self.desired_speed)
        
    # Reward for collision
    r_collision = 0
    if len(self.collision_hist) > 0:
        r_collision = -1

    # Reward for steering:
    r_steer = -self.ego.get_control().steer**2

    # Reward for out of lane
    ego_x, ego_y = get_pos(self.ego)
    dis, w = get_lane_dis(self.waypoints, ego_x, ego_y)
    r_out = 0
    if abs(dis) > self.out_lane_thres:
        r_out = -1

    # Longitudinal speed
    lspeed = np.array([v.x, v.y])
    lspeed_lon = np.dot(lspeed, w)

    # Cost for too fast
    r_fast = 0
    if lspeed_lon > self.desired_speed:
        r_fast = -1

    # Cost for lateral acceleration (approximated as |steer| * v_lon^2)
    r_lat = -abs(self.ego.get_control().steer) * lspeed_lon**2

    r = 200 * r_collision + 1 * lspeed_lon + 10 * r_fast + 1 * r_out + 5 * r_steer + 0.2 * r_lat - 0.1

    return r
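
Note that r_speed is computed but never enters the weighted sum; speed tracking is instead encouraged by the lspeed_lon term and capped by r_fast. Plugging in illustrative numbers makes the weighting tangible (values chosen by hand, not from a real rollout):

# Example: driving straight at 6 m/s (below desired_speed = 8), steer = 0.05,
# no collision, in lane:
#   r = 200*0 + 1*6.0 + 10*0 + 1*0 + 5*(-0.05**2) + 0.2*(-0.05 * 6.0**2) - 0.1
#     = 6.0 - 0.0125 - 0.36 - 0.1
#     = 5.5275
# The longitudinal-speed term dominates; a collision (-200) dwarfs everything else.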

[Done]

def _terminal(self):
    """Calculate whether to terminate the current episode."""
    # Get ego state
    ego_x, ego_y = get_pos(self.ego)

    # If collides
    if len(self.collision_hist) > 0: 
        return True

    # If reach maximum timestep
    if self.time_step > self.max_time_episode:
        return True

    # If at destination
    if self.dests is not None:
        for dest in self.dests:
            if np.sqrt((ego_x - dest[0]) ** 2 + (ego_y - dest[1]) ** 2) < 4:
                return True

    # If out of lane
    dis, _ = get_lane_dis(self.waypoints, ego_x, ego_y)
    if abs(dis) > self.out_lane_thres:
        return True

    return False

Carla-Reinforcement-Learning

[Task] Navigation to a randomly chosen goal

[Action]

# action space
self.discrete_controls = discrete_control
self.action_space_size = 3
self.action_space_high = np.array([1, 1, 1])
self.action_space_low = np.array([-1, -1, -1])
self.action_space_abs_range = np.maximum(np.abs(self.action_space_low), np.abs(self.action_space_high))
self.steering_strength = 0.35
self.gas_strength = 1.0
self.brake_strength = 0.6
self.actions = {0: [0., 0.],  # each value is [throttle(+)/brake(-), steer]
                1: [0., -self.steering_strength],
                2: [0., self.steering_strength],
                3: [self.gas_strength - 0.15, 0.],
                4: [-self.brake_strength, 0],
                5: [self.gas_strength - 0.3, -self.steering_strength],
                6: [self.gas_strength - 0.3, self.steering_strength],
                7: [-self.brake_strength, -self.steering_strength],
                8: [-self.brake_strength, self.steering_strength]}
self.actions_description = ['NO-OP', 'TURN_LEFT', 'TURN_RIGHT', 'GAS', 'BRAKE',
                            'GAS_AND_TURN_LEFT', 'GAS_AND_TURN_RIGHT',
                            'BRAKE_AND_TURN_LEFT', 'BRAKE_AND_TURN_RIGHT']
if discrete_control:
    self.action_space = Discrete(len(self.actions))
else:
    self.action_space = Box(low=self.action_space_low, high=self.action_space_high)
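
Each discrete action's first element is throttle when positive and brake magnitude when negative. A hypothetical to_control helper (the function name is mine; VehicleControl follows the old CARLA 0.8.x client API that this repo targets) would turn an action index into a control like this:

from carla.client import VehicleControl  # CARLA 0.8.x client API

def to_control(actions, action_idx):
    """Hypothetical helper: split a [throttle/brake, steer] pair into a control."""
    throttle_brake, steer = actions[action_idx]
    control = VehicleControl()
    control.steer = steer
    if throttle_brake >= 0:
        control.throttle = throttle_brake  # positive first element = gas
        control.brake = 0.0
    else:
        control.throttle = 0.0
        control.brake = -throttle_brake  # negative first element = brake
    return control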

[Observation]

# Note: observation_space only declares the image; the observation dict
# returned by the environment additionally carries speed and direction.
self.observation_space = Box(low=-np.inf, high=np.inf, shape=[88, 200, 3])
self.observation = {
    "img": self.process_image(sensor_data['CameraRGB'].data),
    "speed": speed,
    "direction": direction
}

[Reward][Done]

# get measurements and observations
measurements = []
while type(measurements) == list:  # retry until read_data() returns real measurements
    try:
        measurements, sensor_data = self.game.read_data()
    except:
        print("Connection to server lost while reading state. Reconnecting...........")
        # self.game.disconnect()
        # self.setup_client_and_server(reconnect_client_only=True)
        import psutil
        info = psutil.virtual_memory()
        print("memory used", str(info.percent))

        self.close_client_and_server()
        self.setup_client_and_server(reconnect_client_only=False)

        scene = self.game.load_settings(self.cur_experiment.conditions)
        self.positions = scene.player_start_spots
        self.start_point = self.positions[self.pose[0]]
        self.end_point = self.positions[self.pose[1]]
        self.game.start_episode(self.pose[0])

        self.done = True

current_point = measurements.player_measurements.transform
direction = self._get_directions(current_point, self.end_point)
speed = measurements.player_measurements.forward_speed

self.reward = 0
dist = sldist([current_point.location.x, current_point.location.y], [self.end_point.location.x, self.end_point.location.y])

if direction == 5:               # go straight
    if abs(self.control.steer) > 0.2:
        self.reward -= 20
    self.reward += min(35, speed * 3.6)
elif direction == 2:             # follow lane
    self.reward += min(35, speed * 3.6)
elif direction == 3:             # turn left, steer negative
    if self.control.steer > 0:
        self.reward -= 15
    if speed * 3.6 <= 20:
        self.reward += speed * 3.6
    else:
        self.reward += 40 - speed * 3.6
elif direction == 4:             # turn right
    if self.control.steer < 0:
        self.reward -= 15
    if speed * 3.6 <= 20:
        self.reward += speed * 3.6
    else:
        self.reward += 40 - speed * 3.6
elif direction == 0:
    self.done = True
    self.reward += 100
    direction = 2
    print("success", dist)
else:
    print("error direction")
    direction = 5

# Remap planner codes {2, 3, 4, 5} to {0, 1, 2, 3} before returning the direction
direction -= 2
direction = int(direction)

intersection_offroad = measurements.player_measurements.intersection_offroad
intersection_otherlane = measurements.player_measurements.intersection_otherlane
collision_veh = measurements.player_measurements.collision_vehicles
collision_ped = measurements.player_measurements.collision_pedestrians
collision_other = measurements.player_measurements.collision_other

if intersection_otherlane > 0 or intersection_offroad > 0 or collision_ped > 0 or collision_veh > 0:
    self.reward -= 100
elif collision_other > 0:
    self.reward -= 50

if collision_other > 0 or collision_veh > 0 or collision_ped > 0:
    self.done = True
if intersection_offroad > 0.2 or intersection_otherlane > 0.2:
    self.done = True
if speed * 3.6 <= 1:
    self.nospeed_time += 1
    if self.nospeed_time > 100:
        self.done = True
    self.reward -= 1
else:
    self.nospeed_time = 0
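
The turn branches (direction 3 and 4) shape speed with a piecewise-linear term that peaks at 20 km/h and decays past it. Pulled out as a standalone function for clarity (the name is mine):

def turn_speed_reward(speed_mps):
    """Speed shaping used in the turn branches above: rises linearly to a
    peak at 20 km/h, then falls off, e.g. 10 km/h -> +10, 20 km/h -> +20,
    30 km/h -> +10, 40 km/h -> 0."""
    kmh = speed_mps * 3.6
    return kmh if kmh <= 20 else 40 - kmh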

Carla-ppo

[Task]

  • lab: lap driving from a random starting point
  • route: navigation between a random start point and a random destination

[Action]

self.action_space = gym.spaces.Box(np.array([-1, 0]), np.array([1, 1]), dtype=np.float32) # steer, throttle

# Take action
if action is not None:
    steer, throttle = [float(a) for a in action]
    # steer, throttle, brake = [float(a) for a in action]
    self.vehicle.control.steer = self.vehicle.control.steer * self.action_smoothing + steer * (1.0 - self.action_smoothing)
    self.vehicle.control.throttle = self.vehicle.control.throttle * self.action_smoothing + throttle * (1.0 - self.action_smoothing)
    # self.vehicle.control.brake = self.vehicle.control.brake * self.action_smoothing + brake * (1.0 - self.action_smoothing)
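
The control update is an exponential moving average: new_control = α·old + (1-α)·action, so an abrupt change in the policy output is applied gradually rather than instantly. A quick sketch (α = 0.9 is a placeholder value):

alpha = 0.9  # placeholder action_smoothing value
steer_cmd = 0.0
for _ in range(3):  # policy keeps requesting full left steer (-1.0)
    steer_cmd = steer_cmd * alpha + (-1.0) * (1 - alpha)
    print(round(steer_cmd, 3))  # -0.1, -0.19, -0.271: eases toward -1.0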

[Observation]

obs_res = (1280, 720)
# Full-resolution frames as floats normalized to [0, 1]
self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(*obs_res, 3), dtype=np.float32)

[Reward][Done]

# Terminal on max distance
if self.distance_traveled >= self.max_distance:
    self.terminal_state = True
        
# Call external reward fn
self.last_reward = self.reward_fn(self)
self.total_reward += self.last_reward
self.step_count += 1

# Check for ESC press
pygame.event.pump()
if pygame.key.get_pressed()[K_ESCAPE]:
    self.close()
    self.terminal_state = True

def reward_fn(env):
    early_termination = False
    if early_termination:
        # If speed is less than 1.0 km/h after 5s, stop
        if time.time() - env.start_t > 5.0 and env.vehicle.get_speed() < 1.0 / 3.6:
            env.terminal_state = True

        # If distance from center > 3, stop
        if env.distance_from_center > 3.0:
            env.terminal_state = True
        
    # Reward forward speed only while velocity points along the waypoint direction
    fwd = vector(env.vehicle.get_velocity())
    wp_fwd = vector(env.current_waypoint.transform.rotation.get_forward_vector())
    if np.dot(fwd[:2], wp_fwd[:2]) > 0:
        return env.vehicle.get_speed()
    return 0
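
The reward function is injected at construction time, so the shaping above runs inside the environment's step(). A minimal sketch of the wiring (the class name CarlaLapEnv and its keyword arguments are assumptions based on the repo layout):

env = CarlaLapEnv(obs_res=(1280, 720),
                  reward_fn=reward_fn,
                  action_smoothing=0.9)
obs = env.reset()
obs, reward, done, info = env.step([0.0, 0.5])  # steer, throttle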