Gym-like CARLA Reinforcement Learning Environment Wrappers: Notes on Open-Source Code
gym-carla
[Task]
- random: goal-free driving with no destination
- roundabout (Town03 only): goal-free driving that terminates once the vehicle enters the roundabout
[Action]
The continuous action is (accel, steer), with accel in [-3.0, 3.0] and steer in [-0.3, 0.3]. Positive accel is mapped to throttle = clip(accel / 3, 0, 1), giving throttle in [0.0, 1.0]; negative accel is mapped to brake = clip(-accel / 8, 0, 1), giving brake in [0.0, 0.375]; steer is passed through unchanged.
```python
def step(self, action):
    # Calculate acceleration and steering
    if self.discrete:
        acc = self.discrete_act[0][action // self.n_steer]
        steer = self.discrete_act[1][action % self.n_steer]
    else:
        acc = action[0]
        steer = action[1]

    # Convert acceleration to throttle and brake
    if acc > 0:
        throttle = np.clip(acc / 3, 0, 1)
        brake = 0
    else:
        throttle = 0
        brake = np.clip(-acc / 8, 0, 1)

    # Apply control
    act = carla.VehicleControl(throttle=float(throttle), steer=float(-steer), brake=float(brake))
    self.ego.apply_control(act)
    self.world.tick()
    ...
```
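For context, gym-carla registers this environment under the Gym API, so a training loop drives it like any other env. A minimal usage sketch following the repo's README pattern; the params shown here are an illustrative subset with made-up values, not the full required dict:

```python
import gym
import gym_carla

# Illustrative subset of the params dict; see the repo README for the full list.
params = {
    'number_of_vehicles': 100,
    'dt': 0.1,                 # simulation time step
    'discrete': False,         # use the continuous (accel, steer) action space
    'max_time_episode': 1000,
    'desired_speed': 8,        # target speed in m/s
    'out_lane_thres': 2.0,     # out-of-lane distance threshold
    'task_mode': 'random',
}

env = gym.make('carla-v0', params=params)
obs = env.reset()
obs, r, done, info = env.step([2.0, 0.0])  # accelerate straight ahead
```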
[Observation]
```python
observation_space_dict = {
    'camera': spaces.Box(low=0, high=255, shape=(self.obs_size, self.obs_size, 3), dtype=np.uint8),
    'lidar': spaces.Box(low=0, high=255, shape=(self.obs_size, self.obs_size, 3), dtype=np.uint8),
    'birdeye': spaces.Box(low=0, high=255, shape=(self.obs_size, self.obs_size, 3), dtype=np.uint8),
    # 'state': [lateral distance from the ego vehicle to the waypoint path,
    #           angle between the ego heading and the path direction,
    #           ego speed,
    #           flag for whether a vehicle is within a certain distance ahead of the ego]
    'state': spaces.Box(np.array([-2, -1, -5, 0]), np.array([2, 1, 30, 1]), dtype=np.float32)
}
if self.pixor:
    observation_space_dict.update({
        'roadmap': spaces.Box(low=0, high=255, shape=(self.obs_size, self.obs_size, 3), dtype=np.uint8),
        'vh_clas': spaces.Box(low=0, high=1, shape=(self.pixor_size, self.pixor_size, 1), dtype=np.float32),
        'vh_regr': spaces.Box(low=-5, high=5, shape=(self.pixor_size, self.pixor_size, 6), dtype=np.float32),
        'pixor_state': spaces.Box(np.array([-1000, -1000, -1, -1, -5]), np.array([1000, 1000, 1, 1, 20]), dtype=np.float32)
    })
self.observation_space = spaces.Dict(observation_space_dict)
```
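Reading fields out of this Dict observation is then straightforward. A hedged sketch continuing the usage example above; the unpacked variable names are mine, following the comment on 'state':

```python
obs, r, done, info = env.step(env.action_space.sample())
camera = obs['camera']    # (obs_size, obs_size, 3) uint8 front-camera image
lidar = obs['lidar']      # (obs_size, obs_size, 3) uint8 LiDAR projection
birdeye = obs['birdeye']  # (obs_size, obs_size, 3) uint8 bird's-eye-view render
lat_dis, heading_err, speed, front_vehicle = obs['state']
```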
[Reward]
```python
def _get_reward(self):
    """Calculate the step reward."""
    # Reward for speed tracking
    v = self.ego.get_velocity()
    speed = np.sqrt(v.x**2 + v.y**2)
    r_speed = -abs(speed - self.desired_speed)

    # Reward for collision
    r_collision = 0
    if len(self.collision_hist) > 0:
        r_collision = -1

    # Reward for steering
    r_steer = -self.ego.get_control().steer**2

    # Reward for being out of lane
    ego_x, ego_y = get_pos(self.ego)
    dis, w = get_lane_dis(self.waypoints, ego_x, ego_y)
    r_out = 0
    if abs(dis) > self.out_lane_thres:
        r_out = -1

    # Longitudinal speed
    lspeed = np.array([v.x, v.y])
    lspeed_lon = np.dot(lspeed, w)

    # Cost for driving too fast
    r_fast = 0
    if lspeed_lon > self.desired_speed:
        r_fast = -1

    # Cost for lateral acceleration
    r_lat = -abs(self.ego.get_control().steer) * lspeed_lon**2

    r = 200 * r_collision + 1 * lspeed_lon + 10 * r_fast + 1 * r_out + 5 * r_steer + 0.2 * r_lat - 0.1
    return r
```
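To make the relative scale of the weighted terms concrete, here is a standalone restatement of the same sum with hypothetical inputs (note that r_speed is computed above but never enters the total):

```python
def gym_carla_reward(lspeed_lon, steer, collided, out_of_lane, too_fast):
    # Same weighted sum as _get_reward, written as a pure function
    r_collision = -1 if collided else 0
    r_steer = -steer ** 2
    r_out = -1 if out_of_lane else 0
    r_fast = -1 if too_fast else 0
    r_lat = -abs(steer) * lspeed_lon ** 2
    return (200 * r_collision + 1 * lspeed_lon + 10 * r_fast
            + 1 * r_out + 5 * r_steer + 0.2 * r_lat - 0.1)

print(gym_carla_reward(8.0, 0.05, False, False, False))  # ~7.25: mild steering at 8 m/s
print(gym_carla_reward(8.0, 0.05, True, False, False))   # ~-192.75: collision dominates
```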
[Done]
```python
def _terminal(self):
    """Calculate whether to terminate the current episode."""
    # Get ego state
    ego_x, ego_y = get_pos(self.ego)

    # If the ego vehicle collides
    if len(self.collision_hist) > 0:
        return True

    # If the maximum episode timestep is reached
    if self.time_step > self.max_time_episode:
        return True

    # If the ego vehicle is at a destination
    if self.dests is not None:
        for dest in self.dests:
            if np.sqrt((ego_x - dest[0]) ** 2 + (ego_y - dest[1]) ** 2) < 4:
                return True

    # If the ego vehicle is out of lane
    dis, _ = get_lane_dis(self.waypoints, ego_x, ego_y)
    if abs(dis) > self.out_lane_thres:
        return True

    return False
```
Carla-Reinforcement-Learning
[Task] Navigation to a randomly selected destination
[Action]
```python
# Action space
self.discrete_controls = discrete_control
self.action_space_size = 3
self.action_space_high = np.array([1, 1, 1])
self.action_space_low = np.array([-1, -1, -1])
self.action_space_abs_range = np.maximum(np.abs(self.action_space_low), np.abs(self.action_space_high))
self.steering_strength = 0.35
self.gas_strength = 1.0
self.brake_strength = 0.6
# Each entry maps an action id to a [gas/brake, steer] pair
self.actions = {0: [0., 0.],
                1: [0., -self.steering_strength],
                2: [0., self.steering_strength],
                3: [self.gas_strength - 0.15, 0.],
                4: [-self.brake_strength, 0],
                5: [self.gas_strength - 0.3, -self.steering_strength],
                6: [self.gas_strength - 0.3, self.steering_strength],
                7: [-self.brake_strength, -self.steering_strength],
                8: [-self.brake_strength, self.steering_strength]}
self.actions_description = ['NO-OP', 'TURN_LEFT', 'TURN_RIGHT', 'GAS', 'BRAKE',
                            'GAS_AND_TURN_LEFT', 'GAS_AND_TURN_RIGHT',
                            'BRAKE_AND_TURN_LEFT', 'BRAKE_AND_TURN_RIGHT']
if discrete_control:
    self.action_space = Discrete(len(self.actions))
else:
    self.action_space = Box(low=self.action_space_low, high=self.action_space_high)
```
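The first component of each pair acts as gas when positive and brake when negative. A hypothetical helper (not from the repo) showing how an action id would decode into CARLA controls under that convention:

```python
def decode_action(actions, action_id):
    gas_or_brake, steer = actions[action_id]
    throttle = max(gas_or_brake, 0.0)  # positive first component -> throttle
    brake = max(-gas_or_brake, 0.0)    # negative first component -> brake
    return throttle, brake, steer

# e.g. action 7 (BRAKE_AND_TURN_LEFT) -> throttle 0.0, brake 0.6, steer -0.35
```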
[Observation]
```python
self.observation_space = Box(low=-np.inf, high=np.inf, shape=[88, 200, 3])
self.observation = {
    "img": self.process_image(sensor_data['CameraRGB'].data),
    "speed": speed,
    "direction": direction
}
```
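The 88x200 resolution matches the camera input size used in CARLA's conditional imitation learning work. A hedged sketch of what process_image plausibly does; the crop/resize and normalization details are assumptions, not the repo's actual code:

```python
import cv2
import numpy as np

def process_image(frame):
    # Resize the raw CARLA RGB frame to 88x200 (cv2 dsize is (width, height))
    # and scale to floats; the repo's exact cropping/normalization may differ.
    frame = cv2.resize(frame, (200, 88))
    return frame.astype(np.float32) / 255.0
```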
[Reward][Done]
```python
# Get measurements and observations
measurements = []
while type(measurements) == list:
    try:
        measurements, sensor_data = self.game.read_data()
    except:
        print("Connection to server lost while reading state. Reconnecting...........")
        # self.game.disconnect()
        # self.setup_client_and_server(reconnect_client_only=True)
        import psutil
        info = psutil.virtual_memory()
        print("memory used", str(info.percent))
        self.close_client_and_server()
        self.setup_client_and_server(reconnect_client_only=False)
        scene = self.game.load_settings(self.cur_experiment.conditions)
        self.positions = scene.player_start_spots
        self.start_point = self.positions[self.pose[0]]
        self.end_point = self.positions[self.pose[1]]
        self.game.start_episode(self.pose[0])
        self.done = True

current_point = measurements.player_measurements.transform
direction = self._get_directions(current_point, self.end_point)
speed = measurements.player_measurements.forward_speed

self.reward = 0
dist = sldist([current_point.location.x, current_point.location.y],
              [self.end_point.location.x, self.end_point.location.y])

if direction == 5:  # go straight
    if abs(self.control.steer) > 0.2:
        self.reward -= 20
    self.reward += min(35, speed * 3.6)
elif direction == 2:  # follow lane
    self.reward += min(35, speed * 3.6)
elif direction == 3:  # turn left, steer should be negative
    if self.control.steer > 0:
        self.reward -= 15
    if speed * 3.6 <= 20:
        self.reward += speed * 3.6
    else:
        self.reward += 40 - speed * 3.6
elif direction == 4:  # turn right, steer should be positive
    if self.control.steer < 0:
        self.reward -= 15
    if speed * 3.6 <= 20:
        self.reward += speed * 3.6
    else:
        self.reward += 40 - speed * 3.6
elif direction == 0:  # reached the goal
    self.done = True
    self.reward += 100
    direction = 2
    print("success", dist)
else:
    print("error direction")
    direction = 5

# Remap planner codes (2-5) to 0-based indices, likely for the command input
direction -= 2
direction = int(direction)

intersection_offroad = measurements.player_measurements.intersection_offroad
intersection_otherlane = measurements.player_measurements.intersection_otherlane
collision_veh = measurements.player_measurements.collision_vehicles
collision_ped = measurements.player_measurements.collision_pedestrians
collision_other = measurements.player_measurements.collision_other

if intersection_otherlane > 0 or intersection_offroad > 0 or collision_ped > 0 or collision_veh > 0:
    self.reward -= 100
elif collision_other > 0:
    self.reward -= 50

if collision_other > 0 or collision_veh > 0 or collision_ped > 0:
    self.done = True
if intersection_offroad > 0.2 or intersection_otherlane > 0.2:
    self.done = True

if speed * 3.6 <= 1:
    # Penalize standing still; terminate if stuck for too long
    self.nospeed_time += 1
    if self.nospeed_time > 100:
        self.done = True
    self.reward -= 1
else:
    self.nospeed_time = 0
```
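Condensed, the direction-conditioned part of this reward can be restated as a pure function (my restatement, not repo code; direction codes follow the CARLA 0.8 planner: 0 = reach goal, 2 = follow lane, 3 = turn left, 4 = turn right, 5 = go straight):

```python
def direction_speed_reward(direction, speed_ms, steer):
    kmh = speed_ms * 3.6
    if direction in (2, 5):  # follow lane / go straight: reward speed up to 35 km/h
        r = min(35, kmh)
        if direction == 5 and abs(steer) > 0.2:  # penalize steering while going straight
            r -= 20
        return r
    if direction in (3, 4):  # turns: reward peaks at 20 km/h, declines above it
        r = kmh if kmh <= 20 else 40 - kmh
        wrong_way = (steer > 0) if direction == 3 else (steer < 0)
        return r - 15 if wrong_way else r
    return 100 if direction == 0 else 0  # goal reached / unknown direction
```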
Carla-ppo
[Task]
- lab: driving laps around a fixed track from a random start point
- route: navigation between a randomly chosen start point and end point
[Action]
```python
self.action_space = gym.spaces.Box(np.array([-1, 0]), np.array([1, 1]), dtype=np.float32)  # steer, throttle

# Take action
if action is not None:
    steer, throttle = [float(a) for a in action]
    # steer, throttle, brake = [float(a) for a in action]
    self.vehicle.control.steer = self.vehicle.control.steer * self.action_smoothing + steer * (1.0 - self.action_smoothing)
    self.vehicle.control.throttle = self.vehicle.control.throttle * self.action_smoothing + throttle * (1.0 - self.action_smoothing)
    # self.vehicle.control.brake = self.vehicle.control.brake * self.action_smoothing + brake * (1.0 - self.action_smoothing)
```
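The control update is an exponential moving average: each tick, the applied control moves toward the new command by only (1 - action_smoothing), which damps jittery policy outputs. A minimal sketch, assuming a hypothetical smoothing value of 0.9:

```python
def smooth(prev, new, action_smoothing=0.9):
    # Exponential moving average, as in the control update above
    return prev * action_smoothing + new * (1.0 - action_smoothing)

steer = 0.0
for _ in range(3):
    steer = smooth(steer, 1.0)  # a sustained full-left command
print(round(steer, 3))          # 0.271: the control ramps up gradually
```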
[Observation]
```python
obs_res = (1280, 720)
self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(*obs_res, 3), dtype=np.float32)
```
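Note that obs_res is ordered (width, height), so the Box shape is (1280, 720, 3), and pixels are floats in [0, 1] rather than uint8. A trivial preprocessing sketch consistent with that space (assumed; the repo's actual conversion may differ):

```python
import numpy as np

raw_frame = np.zeros((1280, 720, 3), dtype=np.uint8)  # placeholder for a camera frame
obs = raw_frame.astype(np.float32) / 255.0            # floats in [0, 1], matching the Box
```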
[Reward][Done]
```python
# Terminate on max distance
if self.distance_traveled >= self.max_distance:
    self.terminal_state = True

# Call the external reward function
self.last_reward = self.reward_fn(self)
self.total_reward += self.last_reward
self.step_count += 1

# Check for ESC press
pygame.event.pump()
if pygame.key.get_pressed()[K_ESCAPE]:
    self.close()
    self.terminal_state = True
```
```python
def reward_fn(env):
    early_termination = False
    if early_termination:
        # If speed is less than 1.0 km/h after 5 s, stop
        if time.time() - env.start_t > 5.0 and env.vehicle.get_speed() < 1.0 / 3.6:
            env.terminal_state = True
        # If distance from the lane center exceeds 3 m, stop
        if env.distance_from_center > 3.0:
            env.terminal_state = True

    # Reward the speed only while moving along the waypoint's forward direction
    fwd = vector(env.vehicle.get_velocity())
    wp_fwd = vector(env.current_waypoint.transform.rotation.get_forward_vector())
    if np.dot(fwd[:2], wp_fwd[:2]) > 0:
        return env.vehicle.get_speed()
    return 0
```
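Because the environment calls self.reward_fn(self) every step (see the snippet above), alternative reward shapes can be swapped in without touching the env. A minimal stand-in showing the injection pattern; the class and names here are illustrative, not the repo's API:

```python
class EnvStub:
    """Toy environment that stores and dispatches an injected reward function."""
    def __init__(self, reward_fn):
        self.reward_fn = reward_fn
        self.last_reward = 0.0

    def step(self):
        # Mirrors self.last_reward = self.reward_fn(self) in the snippet above
        self.last_reward = self.reward_fn(self)

def constant_reward(env):
    return 1.0

env = EnvStub(reward_fn=constant_reward)
env.step()
print(env.last_reward)  # 1.0
```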