본문 바로가기
활동정리/SPG 교육일지

[AWS DeepRacer] 보상함수 프로그래밍 가이드

by 잔디🌿 2024. 8. 25.

    서론

    이번에 내가 교육부장으로 있는 SPG 동아리에서 AWS DeepRacer 대회를 열게 되었다. 임원진으로서 보상함수 짜는 방법을 공부하는데 aws에서 나온 공식 문서는 번역이 제대로 되어있지 않았고, 프로그래밍을 어느정도 하는 사람들을 위한 문서인 것 같았다. 또한 구글링해본 결과 한국말로 된 블로그 글도 거의 없어서 대회에 참가하는 새내기 친구들이나 비전공자분들을 위한 글이 필요하다고 느꼈다.

    사실 나도 보상함수를 처음 공부하는거라 이 글에 오류가 있을수도 있고, 제대로 이해가 안되는 부분이 있을 수 있다. 그래서 이 글을 보고 꼭 공식문서도 함께 보는 것을 추천한다. 

    https://docs.aws.amazon.com/ko_kr/deepracer/latest/developerguide/deepracer-reward-function-input.html

     

    AWS DeepRacer 보상 함수의 입력 파라미터 - AWS DeepRacer

    abs | (var1) - (var2)| = how close the car is to an object, WHEN var1 = ["objects_distance"][index] and var2 = params["progress"]*params["track_length"] 차량 앞에서 가장 가까운 객체와 차량 뒤에서 가장 가까운 객체의 인덱스를

    docs.aws.amazon.com

    *참고*

    위 글에서 잘못 변역된 부분이 조금 있다. 에이전트는 차량, 중간지점은 waypoint인 것을 기억하고 글을 읽으면 훨씬 이해가 쉬울것이다.

    또한 우리 대회에서는 장애물을 쓰지 않기 때문에 object 관련 부분은 제외하고 설명하였다.

     

    밑에서 보여준 예시코드 속의 threshold과 보상 등의값은 예시일 뿐 정답이 아니다. 이 값들은 계속 학습시켜가면서 각자 수정해나가야한다.

     

    파이썬 기초 설명

    aws 보상함수를 짜는 과정에서는 파이썬을 사용한다.

    파이썬을 한번도 사용해보지 않은 분들을 위해서 간단하게 설명하고자 한다.

     

    함수 기본 구조

    아마 이 글을 보는 분들 중에 파이썬이 처음이신 분들이 있을 것이다.

    파이썬의 함수는 

    def 함수명(파라미터) :
       함수 내용....

    위와 같이 구성된다.

     

    def reward_function(params) :
        
        reward = ...
    
        return float(reward)

    위 코드는 보상함수이다. aws에서의 보상함수의 함수명은 reward_function이고, params를 파라미터로 받는다.

     

    딕셔너리

    https://easyitwanner.tistory.com/270

     

    [Python] 파이썬 딕셔너리(Dictionary)란?

    목차 키-값 쌍이란? 딕셔너리(Dictionary) 자주 발생하는 실수 유형 키-값 쌍이란? Python 딕셔너리에서 키와 값은 키-값 쌍을 형성하는 두 가지 주요 구성 요소입니다. 키(key) 키는 딕셔너리에 저장된

    easyitwanner.tistory.com

    보상함수에서 현재 트랙의 정보와 차량의 정보를 딕셔너리 형태로 받는다. 

    딕셔너리는 자바에서 클래스와 비슷한 개념이다.

    자세한 설명을 원하면 위 링크를 참고하자!

     

    params에서 원하는 값 추출하기

    params에서 원하는 값을 추출하여 이에 따라 보상함수 값을 조절해야한다. 

    변수명 = params['키 이름']

    이렇게 하면 해당 변수명의 변수에 우리가 원하는 params속의 값이 들어가게 된다.

     

    params 딕셔너리 객체에는 다음과 같은 키-값 페어가 저장된다.

    {
        "all_wheels_on_track": Boolean,        # flag to indicate if the agent is on the track
        "x": float,                            # agent's x-coordinate in meters
        "y": float,                            # agent's y-coordinate in meters
        "closest_objects": [int, int],         # zero-based indices of the two closest objects to the agent's current position of (x, y).
        "closest_waypoints": [int, int],       # indices of the two nearest waypoints.
        "distance_from_center": float,         # distance in meters from the track center 
        "is_crashed": Boolean,                 # Boolean flag to indicate whether the agent has crashed.
        "is_left_of_center": Boolean,          # Flag to indicate if the agent is on the left side to the track center or not. 
        "is_offtrack": Boolean,                # Boolean flag to indicate whether the agent has gone off track.
        "is_reversed": Boolean,                # flag to indicate if the agent is driving clockwise (True) or counter clockwise (False).
        "heading": float,                      # agent's yaw in degrees
        "objects_distance": [float, ],         # list of the objects' distances in meters between 0 and track_length in relation to the starting line.
        "objects_heading": [float, ],          # list of the objects' headings in degrees between -180 and 180.
        "objects_left_of_center": [Boolean, ], # list of Boolean flags indicating whether elements' objects are left of the center (True) or not (False).
        "objects_location": [(float, float),], # list of object locations [(x,y), ...].
        "objects_speed": [float, ],            # list of the objects' speeds in meters per second.
        "progress": float,                     # percentage of track completed
        "speed": float,                        # agent's speed in meters per second (m/s)
        "steering_angle": float,               # agent's steering angle in degrees
        "steps": int,                          # number steps completed
        "track_length": float,                 # track length in meters.
        "track_width": float,                  # width of the track
        "waypoints": [(float, float), ]        # list of (x,y) as milestones along the track center
    
    }

     

    조건문

    다른 언어에서 else if로 쓰이는 코드를 파이썬에서는 elif라고 쓴다.

     

    주석

    파이썬에서의 주석은 #이다.

    #주석입니다.

     

    강화학습이란

    강화학습은 머신러닝의 한 종류로 어떠한 환경에서 어떠한 행동을 했을 때 보상을 줌으로써 그것이 잘 된 행동인지 잘못된 행동인지 판단하고 반복을 통해 보상을 최대화하는 방향으로 스스로 학습하게 하는 분야이다. 

    우리는 이제 보상을 주는 보상함수를 짤 것이다. 보상함수는 현재 차량과 트랙의 정보를 통해서 현재 잘 된 방향으로 가고있는지, 잘못된 방향으로 가고있는지를  판단하고, 잘 되고 있으면 높은 보상을 잘못되고 있으면 낮은 보상을 줌으로서 딥레이서를 학습시킨다.

     

    우리는 어떤 상태를 잘 가고 있는 상태로 인정할 것인지, 그리고 잘못가고 있다면 정도에 따라 얼마나 낮은 보상을 줄 것인지를 판단하여 프로그래밍하면 된다.

     

    입력 파라미터

     

    all_wheels_on_track

    바퀴가 모드 트랙 경계 사이에 있는지 여부를 알려준다.

    자료형은 참, 거짓이 있는 Boolean형태이다. 바퀴가 모두 트랙 내부에 있으면 true, 하나라도 나가면 false이다.

    def reward_function(params):
        #############################################################################
        '''
        Example of using all_wheels_on_track and speed
        '''
    
        # Read input variables
        all_wheels_on_track = params['all_wheels_on_track']
        speed = params['speed']
    
        # Set the speed threshold based your action space
        SPEED_THRESHOLD = 1.0
    
        if not all_wheels_on_track:
            # Penalize if the car goes off track
            reward = 1e-3
        elif speed < SPEED_THRESHOLD:
            # Penalize if the car goes too slow
            reward = 0.5
        else:
            # High reward if the car stays on track and goes fast
            reward = 1.0
    
        return float(reward)

    위 코드는 all_wheels_on_track를 사용한 보상함수 예제이다.

    위 코드에서 all_wheels_on_track를 사용한 부분은 

    all_wheels_on_track = params['all_wheels_on_track']
    
    if not all_wheels_on_track:
            # Penalize if the car goes off track
            reward = 1e-3

    부분이다.

    파이썬에서 변수는 자료형을 따로 설정해 줄 필요 없이 위처럼 바로 써도 된다.

    all_wheels_on_track 변수에 파라미터에서 all_wheels_on_track을 추출한 값을 넣는다. 그 다음 이 값이 false이면 차가 트랙을 벗어났다는 의미이므로 reward 값을 1e-3으로 설정한다. 1e-3은 0.001로 아주 작은 수다. 차량이 트랙을 벗어난 것은 많이 잘못 가고 있는 것이기 때문에 저렇게 작은 보상을 준다.

     

    waypoints

    공식문서에 closeset_waypoint부터 설명해서 이해가 정말 힘들었다. 또한 번역하는 과정에서 waypoint가 '중간지점'으로 잘못 번역되는 바람에(용어 자체를 번역해버림) 더욱 혼란스러웠다. 이제부터 내가 이해한 부분을 설명해보고자 한다.

     

    위와 같이 트랙은 여러개의 waypoint로 이루어져있다. 이 waypoint는 차량이 따라야 할 경로를 형성하고, 차량이 주행하는 동안 어느 위치에 위치하는지 파악하는 기준이 된다. 또한 차량이 경로를 벗어났을 때 waypoint를 참고하여 다시 트랙으로 진입할 수 있도록 도와준다.

    차량은 1번부터 30번까지 차례로 지나가면서  주행해야한다.

     

    params내의 waypoints는 각 waypoint의 좌표를 나타낸다. 자료형은 [float,float]이다.

    즉 waypoints[1]의 값은 (1번 waypoint의 x좌표, 1번 waypoint의 y좌표)이다. 

     

    heading

    여기서 heading을 설명하는게 조금 뜬금없긴하지만 밑에 코드의 이해를 돕기 위해 미리 설명하겠다. heading은 좌표계의 x축에 대한 차량 진행방향이다. 자료형은 float이다.

    기준은 x좌표이고 이 값에 대한 각도를 나타낸다. 위 경우에는 heading값이 160이다. 

    heading의 범위는 -180에서 180이다. 따라서 180이 넘어가면 181이 아닌 -179이다.

    closest_waypoints

    closest_waypoints는 현재 차량과 가까운 waypoint를 나타낸다. 자료형은 [int, int]이다. 앞의 int는 차량 뒷부분과 가까운 waypoint값이고, 뒤의 int는 차량 앞부분과 가까운 waypoint값이다.

     

    즉 위 그림에서의 closest_waypoint는 (16,17)이다.

    그럼 위 파라미터를 사용한 보상함수 예시를 설명해보겠다.

    # Place import statement outside of function (supported libraries: math, random, numpy, scipy, and shapely)
    # Example imports of available libraries
    #
    # import math
    # import random
    # import numpy
    # import scipy
    # import shapely
    
    import math
    
    def reward_function(params):
        ###############################################################################
        '''
        Example of using waypoints and heading to make the car point in the right direction
        '''
    
        # Read input variables
        waypoints = params['waypoints']
        closest_waypoints = params['closest_waypoints']
        heading = params['heading']
    
        # Initialize the reward with typical value
        reward = 1.0
    
        # Calculate the direction of the center line based on the closest waypoints
        next_point = waypoints[closest_waypoints[1]]
        prev_point = waypoints[closest_waypoints[0]]
    
        # Calculate the direction in radius, arctan2(dy, dx), the result is (-pi, pi) in radians
        track_direction = math.atan2(next_point[1] - prev_point[1], next_point[0] - prev_point[0])
        # Convert to degree
        track_direction = math.degrees(track_direction)
    
        # Calculate the difference between the track direction and the heading direction of the car
        direction_diff = abs(track_direction - heading)
        if direction_diff > 180:
            direction_diff = 360 - direction_diff
    
        # Penalize the reward if the difference is too large
        DIRECTION_THRESHOLD = 10.0
        if direction_diff > DIRECTION_THRESHOLD:
            reward *= 0.5
    
        return float(reward)

    으악 어렵다.

    하나하나 뜯어보도록 하자

     

    waypoints = params['waypoints']
    closest_waypoints = params['closest_waypoints']

    params에서 waypoints를 꺼내서 waypoints 변수에 넣는다. 그 다음 closest_waypoints도 꺼낸다.

     

    next_point = waypoints[closest_waypoints[1]]
    prev_point = waypoints[closest_waypoints[0]]

    prev_point는 차량이 지나간 지점 즉 차량의 뒷부분과 가까운 지점을 나타내고, next_point는 차량이 향하는 지점, 즉 차량의 앞부분과 가까운 지점을 나타낸다. 그래서 각각에 맞는 값을 추출해서 넣어준다.

     

    # Calculate the direction in radius, arctan2(dy, dx), the result is (-pi, pi) in radians
    track_direction = math.atan2(next_point[1] - prev_point[1], next_point[0] - prev_point[0])
    # Convert to degree
    track_direction = math.degrees(track_direction)

    next_point[1]은 차량의 앞부분과 가까운 waypoint의 y좌표를 나타낸다. 나머지것들도 다 어떤 값인지 이해했을 것이라고 생각한다!

    위 함수는 두 지점간의 각도를 계산해서 이를 도 단위로 변환한다. 이는 트랙 중앙선의 방향을 나타낸다. 즉 이 값이 현재 차량이 향해야하는 이상적인 방향이다.

    즉 그림으로 표현하자면 저 파란부분이다. -60정도 되겠다.

     

     direction_diff = abs(track_direction - heading)
        if direction_diff > 180:
            direction_diff = 360 - direction_diff

    이 부분은 차량의 heading과 track_direction의 차이를 계산한다. (abs는 절댓값을 의미)

    만약 180을 넘어가면 트랙과 차량의 방향이 반대라는 의미이므로, 이를 보정해준다. (트랙과 차량의 방향은 0~ 180도 사이!)

     

    DIRECTION_THRESHOLD = 10.0
    if direction_diff > DIRECTION_THRESHOLD:
        reward *= 0.5

    threshold는 프로그래밍에서 주로 임계값을 나타낸다. 즉, threshold값을 넘어가지 않으면 아무런 조치를 취하지 않고, 넘어가면 조치를 취한다 뭐 그런 의미이다. 

    위 코드에서는 임계값을 10으로 잡았다. 만약 차량과 트랙의 각도가 10을 넘어가면 이상적인 방향과 다른 방향으로 가고있다고 판단하여 보상함수 값을 반으로 줄인다.

     

    distance_from_center

    차량 중앙과 트랙 중앙 사이의 길이이다. 자료형은 float이다.

     

    is_crashed

    차량이 다른 객체와 충돌했는지를 나타낸다. 충돌했으면 True, 아니면 False이다. 자료형은 Boolean이다.

     

    is_left_of_center

    차량이 트랙 중앙으로부터 왼쪽에 있으면 True, 오른쪽에 있으면 False이다. 자료형은 Boolean이다.

     

    is_offtrack

    차량이 트랙을 벗어났으면 True, 벗어나지 않았으면 False이다. 자료형은 Boolean이다.

     

    is_reversed

    차량이 시계방향으로 주행하면 True, 반시계방향으로 주행하면 False이다. 자료형은 Boolean이다.

     

    prgress

    주행한 트랙의 비율이다. 범위는 0~100이며, 자료형은 float이다.

     

    speed

    차량의 속도이다. 자료형은 float이다.

     

    steering_angle

    차량의 앞바퀴가 움직이는 방향을 나타낸다. 즉 차량이 오른쪽으로 가고있는지 왼쪽으로 가고있는지를 나타낸다. 이 값이 음수이면 차량이 오른쪽으로 움직이고있다는 의미이고, 양수이면 왼쪽으로 움직이고 있다는 의미이다. 

    범위는 -30에서 +30이고, 자료형은 float이다.

     

    def reward_function(params):
        '''
        Example of using steering angle
        '''
    
        # Read input variable
        abs_steering = abs(params['steering_angle']) # We don't care whether it is left or right steering
    
        # Initialize the reward with typical value
        reward = 1.0
    
        # Penalize if car steer too much to prevent zigzag
        ABS_STEERING_THRESHOLD = 20.0
        if abs_steering > ABS_STEERING_THRESHOLD:
            reward *= 0.8
    
        return float(reward)

    위 함수에서는 steering_angle값에 abs를 씌워서 절댓값으로 만들었다. 이것은 차량이 왼쪽으로 가고있는지, 오른쪽으로 가고있는지는 상관하지 않고, 오직 차가 얼마나 기운 상태로 가느냐만 신경쓰겠다는 의미이다.

    만약에 이 값이 20도보다 크다면 보상함수를 줄인다. 즉, 차가 기울어진 각도가 너무 크면 보상함수를 줄여서 너무 기울어지지 않도록 하는 것이다.

     

    steps

    steps는 현재 차량이 얼마나 많은 행동을 하고 보상을 받았는지를 의미한다. 이는 일정 단계마다 진행률이 예상보다 높을 때 추가 보상을 주거나, 특정 steps에 도달하면 충분하다고 판단하는 등으로 사용할 수 있다. 자료형은 int이다. 

    def reward_function(params):
        #############################################################################
        '''
        Example of using steps and progress
        '''
    
        # Read input variable
        steps = params['steps']
        progress = params['progress']
    
        # Total num of steps we want the car to finish the lap, it will vary depends on the track length
        TOTAL_NUM_STEPS = 300
    
        # Initialize the reward with typical value
        reward = 1.0
    
        # Give additional reward if the car pass every 100 steps faster than expected
        if (steps % 100) == 0 and progress > (steps / TOTAL_NUM_STEPS) * 100 :
            reward += 10.0
    
        return float(reward)

    위 코드에서는 steps가 100단위일 때마다(100으로 나눈 나머지가 0일 때) progress가 현재 진행한 steps 비율보다 높은지 확인하고, 그렇다면 추가 보상을 준다.

     

    track_length

    트랙의 길이를 나타낸다. 자료형은 float이다.

     

    track_width

    트랙의 너비를 나타낸다. 자료형은 float이다.

    def reward_function(params):
        #############################################################################
        '''
        Example of using track width
        '''
    
        # Read input variable
        track_width = params['track_width']
        distance_from_center = params['distance_from_center']
    
        # Calculate the distance from each border
        distance_from_border = 0.5 * track_width - distance_from_center
    
        # Reward higher if the car stays inside the track borders
        if distance_from_border >= 0.05:
            reward = 1.0
        else:
            reward = 1e-3 # Low reward if too close to the border or goes off the track
    
        return float(reward)

    위 함수에서 distance_from_border은 차량이 트랙의 중앙으로부터 얼마나 떨어져있는지를 나타낸다. 만약 이 값이 0.05보다 크면 잘 주행하고 있다는 것이므로 보상을 1로 주고, 만약 0.05보다 작다면 차량이 트랙을 벗어났거나 트랙의 가장자리와 가깝다는 의미이므로 낮은 보상을 준다. 

    이 부분에 이해가 어려울 수 있으므로 그림으로 설명하겠다.

    위 그림에서 파란색 부분이 0.5*track_width이고, 빨간색 부분이 distance_from_center이다. 그래서 우리가 구한 distance_from_border은 저 초록색 부분이다. 저 초록 부분이 0.05보다 작으면 차가 트랙의 끝에 가깝게 주행한다는 의미이므로 보상을 낮게 주는 것이다.

     

    x,y

    트랙이 포함된 시뮬레이션 환경에서 각각 x,y축 상에서의 위치이다. 원점은 왼쪽 하단 모퉁이이다.

     

    참고사항

    waypoint를 사용하기 위해서는 아주 많은 시간 학습시켜야한다. 만약 짧은 시간동안 학습시킨다면 오히려 이를 사용하는 것이 독이 될 수 있다. 

    또한 aws플랫폼을 사용해서 모델을 생성할 때에 트랙을 벗어나는 횟수가 너무 많다 싶으면 그냥 중단하자...

     

    공식문서에 있는 보상함수예제를 잘 참고하기!

     

     

    이렇게 해서 모든 설명이 끝났다. 처음 하면 어려울 수 있지만, 열심히 하다보면 좋은 결과를 얻을 수 있을 것이다!!

    다들 화이팅💪

     

    아 그리고 혹시 이 글에 오류가 있으면 꼭 댓글로 알려주시길 바랍니다👩🏻‍💻

    '활동정리 > SPG 교육일지' 카테고리의 다른 글

    [교육일지] 자바 기초교육 회고  (1) 2024.09.17