본문 바로가기
AI/LLM

[LLM] LMCache 코드 분석 - prefetch,eviction

by 잔디🌿 2026. 1. 10.

    prefetch

    prefetch는 곧 필요할 kv캐시를 미리 당겨오는 background 작업이다.

     

    시작점은 cache_engine.py 내의 async_lookup_and_prefetch 함수이다.

    이 함수는 토큰을 캐시키로 변환해서 해당 키에 맞는 캐시를 미리 가져오는 역할을 한다.

     

    def async_lookup_and_prefetch(
        lookup_id: str,
        tokens: Optional[Union[torch.Tensor, List[int]]] = None,
        hashes: Optional[List[int]] = None,
        offsets: Optional[List[int]] = None,
        search_range: Optional[List[str]] = None,
        pin: bool = False,
        request_configs: Optional[dict] = None,
    )

    함수 시그니처를 살펴보면

    요청의 id인 lookup_id, 찾아야 할 캐시의 토큰이나 해시를 가지고있는 tokens, hashes

    그리고 토큰 위치 정보를 나타내는 Offsets와 어느 스토리지에서 찾을지 나타내는 search_range, 메모리에 pin할지 여부를 나타내는 pin과 chunk size, policy 같은 추가 설정을 나타내는 request_configs로 이루어져있다.

     

            assert self.storage_manager is not None
    
            keys: list[CacheEngineKey] = []
            cum_chunk_lengths = [0]

    이후 storage_manager가 존재하는지 확인 후, key 생성을 준비한다.

    여기서 cum_chunk_lengths는 각 chunk의 끝 토큰의 위치를 누적한 값이다. (청크들을 구별할 수 있도록 해줌)

     

    for start, end, key in self.token_database.process_tokens(
        tokens=tokens,
        hashes=hashes,
        offsets=offsets,
        request_configs=request_configs,
    ):

    여기서 토큰을 캐시엔진키로 변환한다.

    토큰 시퀀스를 청크 단위로 나눈 후, 해시를 계산해서 offset을 적용 후 cacheEngineKey를 생성한다.

    이후 특정 청크의 시작점, 끝점, 그리고 key값을 반환한다.

    여기서 offset은 토큰의 위치를 나타낸다. kv캐시는 토큰 정보 뿐만 아니라 토큰의 위치도 중요하므로 이와 같이 계산한다.

     

    assert isinstance(key, CacheEngineKey)
    keys.append(key)
    cum_chunk_lengths.append(end)

    이후 다음과같이 만들어진 키를 keys에 저장하고, 토큰의 끝 부분을 chunk_lengths에 추가한다.

     

    asyncio.run_coroutine_threadsafe(
        self.storage_manager.async_lookup_and_prefetch(
            lookup_id, keys, cum_chunk_lengths, search_range, pin
        ),
        self.storage_manager.loop,
    )

    그 다음 coroutine을 비동기 제출한다. 이 함수는 리턴이 없이 비동기로 실행되는데, 실제 로딩은 storage_manager.async_lookup_and_prefetch()에서 실행된다.

     

    coroutine은 비동기 객체이다. 해당 코드에서는 이 객체를 만들어 storage_manager.loop에 맡겨 실행시킨다.

     

    그럼 이제 storage_manager.py 내의 async_lookup_and_prefetch()를 보겠다.

    이 함수는 여러 저장소를 위에서 아래로 보면서 앞쪽부터 몇 chunk가 hit인지 확인하고, hit된 prefix chunk들을 각 tier에서 비동기로 동시에 prefetch 걸어둔 다음 callback로 다음 처리를 이어간다.

     

    순서대로 보자면,

    for backend_name, backend in self.storage_backends.items():
                if search_range and backend_name not in search_range:
                    continue
                num_hit_chunks = await backend.batched_async_contains(lookup_id, keys, pin)
                
                if num_hit_chunks == 0:
                    continue

    일단 각 백엔드에 몇개의 prefix chunk가 있는지 확인한다.

    for문으로 하나씩 탐색하며, tire의 순서는 CreateStorageBackends()에서 결정된다.

    기본적으로 가까운/빠른 버퍼 -> 특수 -> 로컬스토리지 -> 원격 -> 플러그인 순서이다.

     

    keys는 아직 못찾은 chunk들의 키 리스트이다. batched_asynx_contains()에서는 연속된 prefix로 몇 chunk까지 존재하는지를 반환한다. 예를들어 5개 key 중 앞 2개는 존재하고, 3개부터 없으면 num_hit_chunks는 2이다.

    만약 히트한 청크가 없어서 해당 값이 0이라면, 해당 백엔드는 생략한다.

     

                backend_keys = keys[:num_hit_chunks]
                loading_task_keys.append(backend_keys)
    
                assert self.async_serializer is not None, (
                    "Async serializer must be initialized via post_init before using "
                    "async_lookup_and_prefetch."
                )
                # num_hit_chunks is only used for the multi serializer
                get_coro = self.async_serializer.run(
                    backend.batched_get_non_blocking(
                        lookup_id,
                        backend_keys,
                        {"cum_chunk_lengths": cum_chunk_lengths[: num_hit_chunks + 1]},
                    ),
                    num_hit_chunks,
                )
                loading_task = asyncio.create_task(get_coro)
                loading_task.add_done_callback(
                    functools.partial(
                        self.prefetch_single_done_callback,
                        keys=keys,
                        backend_name=backend_name,
                    )
                )
                
                loading_tasks.append(loading_task)
    
                cum_chunk_lengths = cum_chunk_lengths[num_hit_chunks:]
    
                if num_total_hit_chunks == num_total_chunks:
                    break
                keys = keys[num_hit_chunks:]

    여기서는 backend가 가지고있는 캐시 chunk를 비동기로 가져오는 작업을 task로 만들어서 수행하는 부분이다.

    backend.batched_get_non_blocking는 여러 chunk를 한번에 가져오는 비동기 get이다. 

    cun_chunk_lengths는 prefix chunk들에 해당하는 토큰 경계 정보이다. (어디까지 발견했는지)

    이 task를 serializer을 통해서 관리방식으로 실행할 수있도록 한다.

     

    이제 asyncio.create_task를 통해서 prefetch를 실행시킨다. 이 함수는 기다리지 않고 바로 실행된다. 따라서 이를 사용하면 여러개의 backend가 prefetching을 동시에 실행할 수 있다.

     

    이후 callback 함수는 loading task가 완료되거나 실패했을 때 호출된다.

    여기서 몇 chunk 성공했는지, 실패, 성공 여부 등등을 저장한다.

     

    이후, 현재 tire(backend)가 hit된 prefix chunk들을 담당해서 로딩을 걸었으니, 남은 suffix에 해당하는 cum_chunk_lengths로 기준을 옮긴다. (prefetch 맡겼으니 해당 부분은 현재 tire가 할 것이라는 것을 의미, 다음 tire은 그 다음 토큰에 대한 캐시 찾음)

     

     

            if num_total_hit_chunks == 0:
                if self.async_lookup_server is not None:
                    self.async_lookup_server.send_response_to_scheduler(lookup_id, 0)
                return
                
            async def gather_with_keys() -> list[list[tuple[CacheEngineKey, MemoryObj]]]:
                loading_results = await asyncio.gather(*loading_tasks)
                return [
                    list(zip(keys, results, strict=False))
                    for keys, results in zip(
                        loading_task_keys, loading_results, strict=False
                    )
                ]
    
            all_done = asyncio.create_task(gather_with_keys())
            # Register the event before adding the callback to avoid race conditions
            self.event_manager.add_event(
                EventType.LOADING,
                lookup_id,
                all_done,
            )
    
            all_done.add_done_callback(
                lambda future: self.prefetch_all_done_callback(
                    future,
                    lookup_id,
                    cum_chunk_lengths_total,
                    tier_expected_chunks,
                )
            )

    만약 total hit chunks가 0이면 프리패치 할 것이 없으므로 바로 종료한다.

     

    이후 await asyncio.gather(*loading_tasks)를 호출해서 앞서 만든 task가 다 끝날때까지 기다리고, 결과값을 반환받는다. 결과값에는 tire 별로 key들을 결과 memobj와 인덱스로 매칭해서 반환한다.

     

    이 alldone이 끝나면, callback 함수로 실제 로드된 chunk를 계산하고, 몇토큰까지 준비되었는지 저장, 이벤트 해제/종료 등의 마무리를 한다.

     

    해당 코드를 본 이유는 어떤 기준으로 prefetch가 이루어지는지를 보기 위해서였다.

    전체적인 과정은 storage_backend의 iteration 순서로 backend 탐색 -> prefetch(앞 chunk)부터 순차적으로 탐색 -> 비동기적으로 가져옴)

    이다.

     

    따라서 만약 맨 앞의 chunk에 대한 캐시가 없으면 해당 chunk 리스트에 대한 kv캐시는 prefetching이 불가능하다.

     

    eviction

    이제 DRAM에서 캐시를 어떤 기준으로 유지하는지를 확인해보겠다.

    아래 코드는 local_cpu_backend의 코드이다.

     

    여기서 hot cache란, CPU DRAM에 실제로 올라와있는 활성 kv캐시를 의미한다.

        def contains(self, key: CacheEngineKey, pin: bool = False) -> bool:
            with self.cpu_lock:
                if key not in self.hot_cache:
                    return False
                if pin:
                    self.hot_cache[key].pin()
                    # vllm lookup sets pin to True
                    self.keys_in_request.append(key)
                return True
    
        def touch_cache(self):
            # flip the order of the keys in the request
            with self.cpu_lock:
                for key in reversed(self.keys_in_request):
                    self.cache_policy.update_on_hit(key, self.hot_cache)
                self.keys_in_request = []

    contains 함수는 현재 key에 해당하는 cache가 dram에 존재하는지를 확인하는 함수이다. 

    만약 존재하지 않는다면, false를 리턴하고, 존재한다면 pin 여부를 확인하고, 이 값이 true이면 pin처리를 한다.

    pin처리 된 캐시는 eviction에서 제외된다.

     

    touch_cache()는 모아둔 key(keys_in_request)들을 대상으로 LRU recency를 갱신한다.(최근 사용됨으로 바꿈)

    이때, reversed 함수로 뒤에서부터 hit를 갱신하는데, 이는 실제 hit 순서대로 업데이트 하기 위함이다.

     

        def submit_put_task(
            self, key: CacheEngineKey, memory_obj: MemoryObj
        ) -> Optional[Future]:
            """
            Synchronously put the MemoryObj into the local cpu backend.
            """
    
            with self.cpu_lock:
                if key in self.hot_cache:
                    return None
    
                memory_obj.ref_count_up()
                self.hot_cache[key] = memory_obj
    
                self.cache_policy.update_on_put(key)
    
                # Push kv admit msg with batching
                if self.batched_msg_sender is not None:
                    self.batched_msg_sender.add_kv_op(
                        op_type=OpType.ADMIT,
                        key=key.chunk_hash,
                    )
    
            return None

     

    이 함수에서는 memobj를 hot cache에 올린다.

    이후, cache_policy에 업데이트 요청을 해서 LRU 등과 같은 내부 정책을 갱신한다.

     

    이제 할당 과정인 allocate 함수를 보겠다.

            memory_obj = self.memory_allocator.allocate(shape, dtype, fmt)
            if memory_obj is not None or not eviction:
                return memory_obj

    eviction은 할당에 실패했고, eviction이 true일 때 실행된다.

    memory_obj가 none이면, 할당에 실패했다는 뜻이다. 또한 Eviction이 false이면 eviction을 금지한다는 뜻이다.

    즉 이 if 문에 걸려 return으로 함수를 종료하는 경우는, 할당에 성공했거나 eviction이 false라는 이야기이다.

     

            evict_keys_count = 0
            num_attempts = 0
            while True:
                # whether or not this request needs to wait or other requests
                wait_other_requests = True
                if self.use_hot:
                    # TODO(Jiayi): optimize `num_candidates` with estimation.
                    # Accurate estimation is hard due to fragmentation
                    num_candidates = 1
                    evict_keys = None
                    with self.cpu_lock:
                        evict_keys = self.cache_policy.get_evict_candidates(
                            self.hot_cache, num_candidates=num_candidates
                        )
                        if evict_keys:
                            # we can continue trying to evict from the hot_cache
                            # and don't need to wait for other requests yet
                            wait_other_requests = False
                            logger.debug(
                                f"Evicting {len(evict_keys)} chunks from cpu memory"
                            )
                            # remove
                            self.batched_remove(evict_keys, force=False)
                            evict_keys_count += len(evict_keys)
                        else:
                            self.stats_monitor.update_local_cpu_evict_failed_count(
                                num_candidates
                            )

    이후, While문으로 진입한다.

    여기서 num_candidates는 1이다. 이 의미는 fragmentation 때문에 한번에 후보를 1개 뽑아 지우고 다시 allocate한다는 뜻이다.

     

    evict_keys = self.cache_policy.get_evict_candidates(
        self.hot_cache, num_candidates=num_candidates
    )

     

    그 다음 해당 코드로 cache_policy에 맞는 정책을 사용하여 evict_keys(삭제할 키)를 선택한다.

     

    self.batched_remove(evict_keys, force=False)
    evict_keys_count += len(evict_keys)

     

    이후, 위 코드로 선정한 key들의 삭제를 실행한다. 

    만약 삭제 가능한 key들이 있으면 loop를 통해서 다른 동작이 evict 가능하게 풀어줄때까지 기다린다.

     

    여기서 보면 무언가 정해진 policy를 통해서 evict 할 key 목록을 얻어서 삭제한다는 것을 알 수 있다.

     

    그럼 policy에 대해서 더 알아보겠다.

     

            self.cache_policy = get_cache_policy(config.cache_policy)

    해당 파일 상단에는 self.cache_policy를 다음과 같이 정의한다.

     

    config에 있는 policy 값을 파라미터로 전달해서 cache_policy 객체를 얻는다.

    이를 쭉 따라가다보면

    config 파일 내에 있는 _CONFIG_DEFINITIONS를 보면, 기본 값은 LRU임을 알 수 있다. 즉, 기본 poilcy는 LRU이다.

     

    LMCache 파일 내부를 보면, 다음과 같이 정책별로 파일이 있는 것을 볼 수 있다.

     

    LRU는 가장 오래 안쓰인 것 부터 삭제, LFU는 가장 적게 쓰인것 삭제, FIFO는 가장 먼저 들어온 것 삭제, MRU는 가장 최근에 쓰인 것을 삭제한다.

    모든 정책 파일은 추상인터페이스인 base_policy를 구현한다.

     

    LRU

        def update_on_hit(
            self,
            key: KeyType,
            cache_dict: OrderedDict[KeyType, Any],
        ) -> None:
            self.update_chunk_hash_dict(key)
            cache_dict.move_to_end(key)

    이 코드는 Hit 시 순서를 갱신하는 부분이다.

    이 경우, hit했을 떄, 가장 뒤로 보내서 최근 hit 되었음을 표시한다.

     

        def get_evict_candidates(
            self,
            cache_dict: OrderedDict[KeyType, Any],
            num_candidates: int = 1,
        ) -> list[KeyType]:
            evict_keys = []
            for key, cache in cache_dict.items():
                if not cache.can_evict:
                    continue
                evict_keys.append(key)
                if len(evict_keys) == num_candidates:
                    break

    또한 삭제할 key들의 리스트를 보낼때에는, cache_dict의 앞에서부터 꺼내 리스트에 차례로 넣는다.

    이로서 가장 hit된지 오래된 캐시를 차례로 후보에 올릴 수 있게 된다.

     

    MRU

    MRU의 경우, 최근에 참조한 순서대로 삭제되기 때문에 위와 반대로 구현된다.

     

    LFU

    LFU에서는 가장 적게 사용된 것부터 뽑고, 동일빈도 내에서는 FIFO로 처리한다.

    self.freq_to_keys = SortedDict()   # freq -> {key: None} (FIFO dict)
    self.key_to_freq = {}             # key -> freq

    freq_to_keys는 freq 오름차순으로 정렬되어있다.

    def update_on_put(self, key):
        self.key_to_freq[key] = 1
        self.freq_to_keys[1][key] = None

    신규 엔트리는 무조건 1회 사용으로 들어가고, 

     

    def update_on_hit(self, key, cache_dict):
        curr_freq = self.key_to_freq[key]
        self.freq_to_keys[curr_freq].pop(key)
        if not self.freq_to_keys[curr_freq]:
            self.freq_to_keys.pop(curr_freq)
    
        curr_freq += 1
        self.key_to_freq[key] = curr_freq
        self.freq_to_keys[curr_freq][key] = None

    hit 시에는 freq값이 하나씩 증가하게된다. 

    많이 hit 할 수록 freq 값이 커져서 evict 확률이 줄어든다.

     

        def get_evict_candidates(
            self,
            cache_dict: dict[KeyType, Any],
            num_candidates: int = 1,
        ) -> list[KeyType]:
            evict_keys = []
            evict_freqs = []
            for curr_min_freq, fifo_keys in self.freq_to_keys.items():
                for key in fifo_keys:
                    if not cache_dict[key].can_evict:
                        continue
                    evict_keys.append(key)
                    evict_freqs.append(curr_min_freq)
                    self.key_to_freq.pop(key)
                    if len(evict_keys) == num_candidates:
                        break
    
                if len(evict_keys) == num_candidates:
                    break
    
            for freq, key in zip(evict_freqs, evict_keys, strict=False):
                self.freq_to_keys[freq].pop(key)
                if not self.freq_to_keys[freq]:
                    self.freq_to_keys.pop(freq)

    삭제 후보를 전달할 때는 freq가 낮은 순으로 전달하고, 리스트에 넣었으면 해당 key는 pop하여 삭제한다.

     

    FIFO

        def init_mutable_mapping(self) -> dict[KeyType, Any]:
            # NOTE(Jiayi): python dict maintains insertion order.
            return {}

    fifo에서는 다음과 같이 dict가 큐 역할을 한다.

    앞쪽은 오래된 엔트리, 뒤쪽은 최근 들어온 엔트리가 된다.

     

    fifo에서는 cache dict를 그대로 사용하므로, 해당 명령이 와도 별다른 조치를 취하지 않는다.

     

        def get_evict_candidates(
            self,
            cache_dict: dict[KeyType, Any],
            num_candidates: int = 1,
        ) -> list[KeyType]:
            evict_keys = []
            for key, cache in cache_dict.items():
                if not cache.can_evict:
                    continue
                evict_keys.append(key)
                if len(evict_keys) == num_candidates:
                    break
    
            return evict_keys

    또한 삭제할 key의 리스트도 cache_dict 순서대로 그대로 보낸다.

     

     

    이로서 evict 되는 기준은 config 내의 policy에 따라 달라지고, policy는 기본 policy를 구현한 구현체인 lru, fifo... 객체로 생성되어 삭제될 캐시의 우선순위를 관리하는 역할을 수행한다고 볼 수 있다.