LMCache Examples

This folder demonstrates how to use LMCache for disaggregated prefilling, CPU offloading and KV cache sharing.
1. Disaggregated Prefill in vLLM v1

This example demonstrates how to run LMCache with disaggregated prefill using NIXL on a single node.
Prerequisites

- Install LMCache. You can simply run `pip install lmcache`.
- Install NIXL.
- At least 2 GPUs
- A valid Hugging Face token (HF_TOKEN) for Llama 3.1 8B Instruct.
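A minimal setup sketch for these prerequisites (the NIXL step and the placeholder token below are illustrative; follow the NIXL repository instructions for your system):

```bash
# Install LMCache; NIXL must be installed separately, see
# https://github.com/ai-dynamo/nixl for its installation instructions.
pip install lmcache

# Export a valid Hugging Face token so the scripts can download
# meta-llama/Llama-3.1-8B-Instruct.
export HF_TOKEN=hf_xxx  # replace with your own token
```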
Usage

Run `cd disagg_prefill_lmcache_v1` to enter the `disagg_prefill_lmcache_v1` folder, and then run `bash disagg_example_nixl.sh` to run disaggregated prefill and benchmark the performance.
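The same steps as shell commands (assuming you start from this examples folder and have HF_TOKEN exported):

```bash
cd disagg_prefill_lmcache_v1
# Launches the prefiller, decoder and proxy servers, waits for them to come up,
# then runs the serving benchmark against the proxy on port 9000.
bash disagg_example_nixl.sh
```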
Components

Server Scripts

- `disagg_prefill_lmcache_v1/disagg_vllm_launcher.sh` - Launches individual vLLM servers for prefill/decode, and also launches the proxy server.
- `disagg_prefill_lmcache_v1/disagg_proxy_server.py` - FastAPI proxy server that coordinates between the prefiller and the decoder.
- `disagg_prefill_lmcache_v1/disagg_example_nixl.sh` - Main script that runs the example.
Configuration

- `disagg_prefill_lmcache_v1/configs/lmcache-prefiller-config.yaml` - Configuration for the prefiller server.
- `disagg_prefill_lmcache_v1/configs/lmcache-decoder-config.yaml` - Configuration for the decoder server.
Log Files

The main script generates several log files:

- `prefiller.log` - Logs from the prefill server.
- `decoder.log` - Logs from the decode server.
- `proxy.log` - Logs from the proxy server.
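Once all three servers are up, requests can also be sent to the proxy by hand. A sketch using the default model from the launcher script (the prompt and token count here are only examples):

```bash
# The proxy listens on port 9000; it forwards the prefill to port 8100 and
# streams the completion from the decoder on port 8200.
curl -s http://localhost:9000/v1/completions \
  -H "Content-Type: application/json" \
  -d '{
        "model": "meta-llama/Llama-3.1-8B-Instruct",
        "prompt": "Explain disaggregated prefill in one sentence.",
        "max_tokens": 64,
        "stream": true
      }'
```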
2. CPU Offload Examples

- `python cpu_offload_lmcache.py -v v0` - CPU offloading implementation for vLLM v0.
- `python cpu_offload_lmcache.py -v v1` - CPU offloading implementation for vLLM v1.
3. KV Cache Sharing

The `kv_cache_sharing_lmcache_v1.py` example demonstrates how to share KV caches between vLLM v1 instances.
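The script is self-contained; assuming 2 GPUs and `lmcache` installed, it can be run directly:

```bash
# Launches an LMCache server plus two vLLM v1 instances: GPU 0 stores the KV
# cache in the LMCache server, GPU 1 retrieves and reuses it.
python kv_cache_sharing_lmcache_v1.py
```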
4. Disaggregated Prefill in vLLM v0

The `disagg_prefill_lmcache_v0.py` script provides an example of how to run disaggregated prefill in vLLM v0.
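As with the KV cache sharing example, it can be run directly on a 2-GPU machine:

```bash
# Launches an LMCache server plus a prefill instance (GPU 0) and a decode
# instance (GPU 1); the KV cache flows prefiller -> LMCache server -> decoder.
python disagg_prefill_lmcache_v0.py
```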
Example materials
cpu_offload_lmcache.py
# SPDX-License-Identifier: Apache-2.0
"""
This file demonstrates the example usage of cpu offloading
with LMCache in vLLM v1 or v0.
Usage:
Specify vLLM version
-v v0 : Use LMCacheConnector
model = mistralai/Mistral-7B-Instruct-v0.2
(Includes enable_chunked_prefill = True)
-v v1 : Use LMCacheConnectorV1 (default)
model = meta-llama/Meta-Llama-3.1-8B-Instruct
(Without enable_chunked_prefill)
Note that `lmcache` is needed to run this example.
Requirements: Linux, Python: 3.10 or higher, CUDA: 12.1
To learn more about LMCache environment setup, please refer to:
https://docs.lmcache.ai/getting_started/installation.html
"""
import argparse
import contextlib
import os
import time
from dataclasses import asdict
from lmcache.experimental.cache_engine import LMCacheEngineBuilder
from lmcache.integration.vllm.utils import ENGINE_NAME
from vllm import LLM, SamplingParams
from vllm.config import KVTransferConfig
from vllm.engine.arg_utils import EngineArgs
def setup_environment_variables(vllm_version: str):
# LMCache-related environment variables
# Use experimental features in LMCache
os.environ["LMCACHE_USE_EXPERIMENTAL"] = "True"
# LMCache is set to use 256 tokens per chunk
os.environ["LMCACHE_CHUNK_SIZE"] = "256"
# Enable local CPU backend in LMCache
os.environ["LMCACHE_LOCAL_CPU"] = "True"
# Set local CPU memory limit to 5.0 GB
os.environ["LMCACHE_MAX_LOCAL_CPU_SIZE"] = "5.0"
if vllm_version == "v0":
os.environ["VLLM_USE_V1"] = "0"
@contextlib.contextmanager
def build_llm_with_lmcache(lmcache_connector: str, model: str, vllm_version: str):
ktc = KVTransferConfig(
kv_connector=lmcache_connector,
kv_role="kv_both",
)
# Set GPU memory utilization to 0.8 for an A40 GPU with 40GB
# memory. Reduce the value if your GPU has less memory.
# Note: LMCache supports chunked prefill (see vLLM#14505, LMCache#392).
if vllm_version == "v0":
llm_args = EngineArgs(
model=model,
kv_transfer_config=ktc,
max_model_len=8000,
gpu_memory_utilization=0.8,
enable_chunked_prefill=True, # Only in v0
)
else:
llm_args = EngineArgs(
model=model,
kv_transfer_config=ktc,
max_model_len=8000,
gpu_memory_utilization=0.8,
)
llm = LLM(**asdict(llm_args))
try:
yield llm
finally:
# Clean up lmcache backend
LMCacheEngineBuilder.destroy(ENGINE_NAME)
def print_output(
llm: LLM,
prompt: list[str],
sampling_params: SamplingParams,
req_str: str,
):
# Should be able to see logs like the following:
# `LMCache INFO: Storing KV cache for 6006 out of 6006 tokens for request 0`
# This indicates that the KV cache has been stored in LMCache.
start = time.time()
outputs = llm.generate(prompt, sampling_params)
print("-" * 50)
for output in outputs:
generated_text = output.outputs[0].text
print(f"Generated text: {generated_text!r}")
print(f"Generation took {time.time() - start:.2f} seconds, {req_str} request done.")
print("-" * 50)
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"-v",
"--version",
choices=["v0", "v1"],
default="v1",
help="Specify vLLM version (default: v1)",
)
return parser.parse_args()
def main():
args = parse_args()
if args.version == "v0":
lmcache_connector = "LMCacheConnector"
model = "mistralai/Mistral-7B-Instruct-v0.2"
else:
lmcache_connector = "LMCacheConnectorV1"
model = "meta-llama/Meta-Llama-3.1-8B-Instruct"
setup_environment_variables(args.version)
with build_llm_with_lmcache(lmcache_connector, model, args.version) as llm:
# This example script runs two requests with a shared prefix.
# Define the shared prompt and specific prompts
shared_prompt = "Hello, how are you?" * 1000
first_prompt = [
shared_prompt + "Hello, my name is",
]
second_prompt = [
shared_prompt + "Tell me a very long story",
]
sampling_params = SamplingParams(temperature=0, top_p=0.95, max_tokens=10)
# Print the first output
print_output(llm, first_prompt, sampling_params, "first")
time.sleep(1)
# print the second output
print_output(llm, second_prompt, sampling_params, "second")
if __name__ == "__main__":
main()
disagg_prefill_lmcache_v0.py
# SPDX-License-Identifier: Apache-2.0
"""
This file demonstrates the example usage of disaggregated prefilling
with LMCache.
We will launch 2 vllm instances (GPU 0 for prefill and GPU 1 for decode),
and launch an additional LMCache server.
KV cache is transferred in the following manner:
vLLM prefill node -> LMCache server -> vLLM decode node.
Note that `pip install lmcache` is needed to run this example.
Learn more about LMCache in https://github.com/LMCache/LMCache.
"""
import os
import subprocess
import time
from multiprocessing import Event, Process
from lmcache.experimental.cache_engine import LMCacheEngineBuilder
from lmcache.integration.vllm.utils import ENGINE_NAME
from vllm import LLM, SamplingParams
from vllm.config import KVTransferConfig
# LMCache-related environment variables
# The port to start LMCache server
port = 8100
# Use experimental features in LMCache
os.environ["LMCACHE_USE_EXPERIMENTAL"] = "True"
# LMCache is set to use 256 tokens per chunk
os.environ["LMCACHE_CHUNK_SIZE"] = "256"
# Disable local CPU backend in LMCache
os.environ["LMCACHE_LOCAL_CPU"] = "False"
# Set local CPU memory buffer limit to 5.0 GB
os.environ["LMCACHE_MAX_LOCAL_CPU_SIZE"] = "5.0"
# Set the remote URL for LMCache server
os.environ["LMCACHE_REMOTE_URL"] = f"lm://localhost:{port}"
# Set the serializer/deserializer between vllm and LMCache server
# `naive` indicates using raw bytes of the tensor without any compression
os.environ["LMCACHE_REMOTE_SERDE"] = "naive"
prompts = [
"Hello, how are you?" * 1000,
]
def run_prefill(prefill_done, prompts):
# We use GPU 0 for prefill node.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
sampling_params = SamplingParams(temperature=0, top_p=0.95, max_tokens=1)
ktc = KVTransferConfig(
kv_connector="LMCacheConnector",
kv_role="kv_producer",
kv_rank=0,
kv_parallel_size=2,
)
# Set GPU memory utilization to 0.8 for an A40 GPU with 40GB
# memory. Reduce the value if your GPU has less memory.
llm = LLM(
model="mistralai/Mistral-7B-Instruct-v0.2",
kv_transfer_config=ktc,
max_model_len=8000,
gpu_memory_utilization=0.8,
enforce_eager=True,
)
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
generated_text = output.outputs[0].text
print(f"Generated text: {generated_text!r}")
print("Prefill node is finished.")
prefill_done.set()
# Clean up lmcache backend
LMCacheEngineBuilder.destroy(ENGINE_NAME)
def run_decode(prefill_done, prompts, timeout=1):
# We use GPU 1 for decode node.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
sampling_params = SamplingParams(temperature=0, top_p=0.95, max_tokens=10)
ktc = KVTransferConfig(
kv_connector="LMCacheConnector",
kv_role="kv_consumer",
kv_rank=1,
kv_parallel_size=2,
)
# Set GPU memory utilization to 0.8 for an A40 GPU with 40GB
# of memory. Reduce the value if your GPU has less memory.
llm = LLM(
model="mistralai/Mistral-7B-Instruct-v0.2",
kv_transfer_config=ktc,
max_model_len=8000,
gpu_memory_utilization=0.8,
enforce_eager=True,
)
print("Waiting for prefill node to finish...")
prefill_done.wait()
time.sleep(timeout)
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
generated_text = output.outputs[0].text
print(f"Generated text: {generated_text!r}")
# Clean up lmcache backend
LMCacheEngineBuilder.destroy(ENGINE_NAME)
def run_lmcache_server(port):
server_proc = subprocess.Popen(
["python", "-m", "lmcache.experimental.server", "localhost", str(port)]
)
return server_proc
def main():
prefill_done = Event()
prefill_process = Process(target=run_prefill, args=(prefill_done, prompts))
decode_process = Process(target=run_decode, args=(prefill_done, prompts))
lmcache_server_process = run_lmcache_server(port)
# Start prefill node
prefill_process.start()
# Start decode node
decode_process.start()
# Clean up the processes
decode_process.join()
prefill_process.terminate()
lmcache_server_process.terminate()
lmcache_server_process.wait()
if __name__ == "__main__":
main()
disagg_prefill_lmcache_v1/configs/lmcache-decoder-config.yaml
disagg_prefill_lmcache_v1/configs/lmcache-prefiller-config.yaml
disagg_prefill_lmcache_v1/disagg_example_nixl.sh
#!/bin/bash
echo "Warning: LMCache disaggregated prefill support for vLLM v1 is experimental and subject to change."
PIDS=()
# Switch to the directory of the current script
cd "$(dirname "${BASH_SOURCE[0]}")"
check_hf_token() {
if [ -z "$HF_TOKEN" ]; then
echo "HF_TOKEN is not set. Please set it to your Hugging Face token."
exit 1
fi
if [[ "$HF_TOKEN" != hf_* ]]; then
echo "HF_TOKEN is not a valid Hugging Face token. Please set it to your Hugging Face token."
exit 1
fi
echo "HF_TOKEN is set and valid."
}
check_num_gpus() {
    # Check that at least 2 GPUs are available via nvidia-smi
num_gpus=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l)
if [ "$num_gpus" -lt 2 ]; then
echo "You need at least 2 GPUs to run disaggregated prefill."
exit 1
else
echo "Found $num_gpus GPUs."
fi
}
ensure_python_library_installed() {
echo "Checking if $1 is installed..."
python -c "import $1" > /dev/null 2>&1
if [ $? -ne 0 ]; then
if [ "$1" == "nixl" ]; then
echo "$1 is not installed. Please refer to https://github.com/ai-dynamo/nixl for installation."
else
echo "$1 is not installed. Please install it via pip install $1."
fi
exit 1
else
echo "$1 is installed."
fi
}
cleanup() {
echo "Stopping everything…"
trap - INT TERM # prevent re-entrancy
kill -- -$$ # negative PID == “this whole process-group”
wait # reap children so we don't leave zombies
exit 0
}
wait_for_server() {
local port=$1
local timeout_seconds=1200
local start_time=$(date +%s)
echo "Waiting for server on port $port..."
while true; do
if curl -s "localhost:${port}/v1/completions" > /dev/null; then
return 0
fi
local now=$(date +%s)
if (( now - start_time >= timeout_seconds )); then
echo "Timeout waiting for server"
return 1
fi
sleep 1
done
}
main() {
check_hf_token
check_num_gpus
ensure_python_library_installed lmcache
ensure_python_library_installed nixl
ensure_python_library_installed pandas
ensure_python_library_installed datasets
ensure_python_library_installed vllm
trap cleanup INT
trap cleanup USR1
trap cleanup TERM
echo "Launching prefiller, decoder and proxy..."
echo "Please check prefiller.log, decoder.log and proxy.log for logs."
bash disagg_vllm_launcher.sh prefiller \
> >(tee prefiller.log) 2>&1 &
prefiller_pid=$!
PIDS+=($prefiller_pid)
bash disagg_vllm_launcher.sh decoder \
> >(tee decoder.log) 2>&1 &
decoder_pid=$!
PIDS+=($decoder_pid)
python3 disagg_proxy_server.py \
--host localhost \
--port 9000 \
--prefiller-host localhost \
--prefiller-port 8100 \
--decoder-host localhost \
--decoder-port 8200 \
> >(tee proxy.log) 2>&1 &
proxy_pid=$!
PIDS+=($proxy_pid)
wait_for_server 8100
wait_for_server 8200
wait_for_server 9000
echo "All servers are up. Starting benchmark..."
# begin benchmark
cd ../../../benchmarks/
python benchmark_serving.py --port 9000 --seed $(date +%s) \
--model meta-llama/Llama-3.1-8B-Instruct \
--dataset-name random --random-input-len 7500 --random-output-len 200 \
--num-prompts 200 --burstiness 100 --request-rate 3.6 | tee benchmark.log
echo "Benchmarking done. Cleaning up..."
cleanup
}
main
disagg_prefill_lmcache_v1/disagg_proxy_server.py
# SPDX-License-Identifier: Apache-2.0
import argparse
import os
import time
from contextlib import asynccontextmanager
import httpx
import numpy as np
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Lifespan context manager to handle startup and shutdown events.
"""
# Startup: Initialize clients
prefiller_base_url = (
f"http://{global_args.prefiller_host}:{global_args.prefiller_port}/v1"
)
decoder_base_url = (
f"http://{global_args.decoder_host}:{global_args.decoder_port}/v1"
)
app.state.prefill_client = httpx.AsyncClient(
timeout=None, base_url=prefiller_base_url
)
app.state.decode_client = httpx.AsyncClient(timeout=None, base_url=decoder_base_url)
yield
# Shutdown: Close clients
await app.state.prefill_client.aclose()
await app.state.decode_client.aclose()
# Update FastAPI app initialization to use lifespan
app = FastAPI(lifespan=lifespan)
class StatsCalculator:
def __init__(self):
self._stats = []
self._last_log_time = time.time()
def add(self, value):
self._stats.append(value)
if time.time() - self._last_log_time > 5:
self._log_stats()
self._last_log_time = time.time()
def _log_stats(self):
# Print average, median, and 99th percentile
np_arr = np.array(self._stats)
output_str = (
f"\nNum requests: {len(self._stats)}"
+ "\nPrefill node TTFT stats:"
+ f"\n - Average (ms): {np.mean(np_arr)}"
+ f"\n - Median (ms): {np.median(np_arr)}"
+ f"\n - 99th Percentile (ms): {np.percentile(np_arr, 99)}\n"
)
print(
"===============================",
output_str,
"===============================",
)
stats_calculator = StatsCalculator()
counter = 0
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=8000)
parser.add_argument("--host", type=str, default="localhost")
parser.add_argument("--prefiller-host", type=str, default="localhost")
parser.add_argument("--prefiller-port", type=int, default=8100)
parser.add_argument("--decoder-host", type=str, default="localhost")
parser.add_argument("--decoder-port", type=int, default=8200)
args = parser.parse_args()
return args
# Initialize variables to hold the persistent clients
app.state.prefill_client = None
app.state.decode_client = None
async def send_request_to_service(
client: httpx.AsyncClient, endpoint: str, req_data: dict
):
"""
Send a request to a service using a persistent client.
"""
req_data = req_data.copy()
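    # Ask the prefill service to generate only a single token: this runs the
    # prefill pass and populates the KV cache (handed off via LMCache), while
    # the actual completion is streamed from the decode service.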
req_data["max_tokens"] = 1
if "max_completion_tokens" in req_data:
req_data["max_completion_tokens"] = 1
headers = {"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"}
response = await client.post(endpoint, json=req_data, headers=headers)
response.raise_for_status()
return response
async def stream_service_response(
client: httpx.AsyncClient, endpoint: str, req_data: dict
):
"""
Asynchronously stream the response from a service using a persistent client.
"""
headers = {"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"}
async with client.stream(
"POST", endpoint, json=req_data, headers=headers
) as response:
response.raise_for_status()
async for chunk in response.aiter_bytes():
yield chunk
@app.post("/v1/completions")
async def handle_completions(request: Request):
global counter, stats_calculator
counter += 1
st = time.time()
try:
req_data = await request.json()
# Send request to prefill service, ignore the response
await send_request_to_service(
app.state.prefill_client, "/completions", req_data
)
et = time.time()
        stats_calculator.add((et - st) * 1000)  # convert to ms to match the logged stats
# Stream response from decode service
async def generate_stream():
async for chunk in stream_service_response(
app.state.decode_client, "/completions", req_data
):
yield chunk
return StreamingResponse(generate_stream(), media_type="text/event-stream")
except Exception as e:
import sys
import traceback
exc_info = sys.exc_info()
print("Error occurred in disagg prefill proxy server - completions endpoint")
print(e)
print("".join(traceback.format_exception(*exc_info)))
raise
@app.post("/v1/chat/completions")
async def handle_chat_completions(request: Request):
global counter, stats_calculator
counter += 1
st = time.time()
try:
req_data = await request.json()
# Send request to prefill service, ignore the response
await send_request_to_service(
app.state.prefill_client, "/chat/completions", req_data
)
et = time.time()
        stats_calculator.add((et - st) * 1000)  # convert to ms to match the logged stats
# Stream response from decode service
async def generate_stream():
async for chunk in stream_service_response(
app.state.decode_client, "/chat/completions", req_data
):
yield chunk
return StreamingResponse(generate_stream(), media_type="text/event-stream")
except Exception as e:
import sys
import traceback
exc_info = sys.exc_info()
print(
"Error occurred in disagg prefill proxy server - chat completions endpoint"
)
print(e)
print("".join(traceback.format_exception(*exc_info)))
raise
if __name__ == "__main__":
global global_args
global_args = parse_args()
import uvicorn
uvicorn.run(app, host=global_args.host, port=global_args.port)
disagg_prefill_lmcache_v1/disagg_vllm_launcher.sh
#!/bin/bash
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
if [[ $# -lt 1 ]]; then
echo "Usage: $0 <prefiller | decoder> [model]"
exit 1
fi
if [[ $# -eq 1 ]]; then
echo "Using default model: meta-llama/Llama-3.1-8B-Instruct"
MODEL="meta-llama/Llama-3.1-8B-Instruct"
else
echo "Using model: $2"
MODEL=$2
fi
if [[ $1 == "prefiller" ]]; then
# Prefiller listens on port 8100
prefill_config_file=$SCRIPT_DIR/configs/lmcache-prefiller-config.yaml
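    # UCX_TLS restricts UCX (the transport layer used by NIXL) to CUDA IPC, CUDA copy and TCP.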
UCX_TLS=cuda_ipc,cuda_copy,tcp \
LMCACHE_CONFIG_FILE=$prefill_config_file \
LMCACHE_USE_EXPERIMENTAL=True \
VLLM_ENABLE_V1_MULTIPROCESSING=1 \
VLLM_WORKER_MULTIPROC_METHOD=spawn \
CUDA_VISIBLE_DEVICES=0 \
vllm serve $MODEL \
--port 8100 \
--disable-log-requests \
--enforce-eager \
--kv-transfer-config \
'{"kv_connector":"LMCacheConnectorV1","kv_role":"kv_producer","kv_connector_extra_config": {"discard_partial_chunks": false, "lmcache_rpc_port": "producer1"}}'
elif [[ $1 == "decoder" ]]; then
# Decoder listens on port 8200
decode_config_file=$SCRIPT_DIR/configs/lmcache-decoder-config.yaml
UCX_TLS=cuda_ipc,cuda_copy,tcp \
LMCACHE_CONFIG_FILE=$decode_config_file \
LMCACHE_USE_EXPERIMENTAL=True \
VLLM_ENABLE_V1_MULTIPROCESSING=1 \
VLLM_WORKER_MULTIPROC_METHOD=spawn \
CUDA_VISIBLE_DEVICES=1 \
vllm serve $MODEL \
--port 8200 \
--disable-log-requests \
--enforce-eager \
--kv-transfer-config \
'{"kv_connector":"LMCacheConnectorV1","kv_role":"kv_consumer","kv_connector_extra_config": {"discard_partial_chunks": false, "lmcache_rpc_port": "consumer1"}}'
else
echo "Invalid role: $1"
echo "Should be either prefiller, decoder"
exit 1
fi
kv_cache_sharing_lmcache_v1.py
# SPDX-License-Identifier: Apache-2.0
"""
This file demonstrates the example usage of remote KV cache sharing
with LMCache.
We will launch 2 vllm instances, and launch an additional LMCache server.
KV cache is transferred in the following manner:
(1) vLLM instance 1 -> LMCache server (KV cache store).
(2) LMCache server -> vLLM instance 2 (KV cache reuse/retrieve).
Note that lmcache needs to be installed to run this example.
Learn more about LMCache in https://github.com/LMCache/LMCache.
"""
import os
import subprocess
import time
from multiprocessing import Event, Process
from lmcache.experimental.cache_engine import LMCacheEngineBuilder
from lmcache.integration.vllm.utils import ENGINE_NAME
from vllm import LLM, SamplingParams
from vllm.config import KVTransferConfig
# LMCache-related environment variables
# The port to start LMCache server
port = 8100
# Use experimental features in LMCache
os.environ["LMCACHE_USE_EXPERIMENTAL"] = "True"
# LMCache is set to use 256 tokens per chunk
os.environ["LMCACHE_CHUNK_SIZE"] = "256"
# Disable local CPU backend in LMCache
os.environ["LMCACHE_LOCAL_CPU"] = "False"
# Set local CPU memory buffer limit to 5.0 GB
os.environ["LMCACHE_MAX_LOCAL_CPU_SIZE"] = "5.0"
# Set the remote URL for LMCache server
os.environ["LMCACHE_REMOTE_URL"] = f"lm://localhost:{port}"
# Set the serializer/deserializer between vllm and LMCache server
# `naive` indicates using raw bytes of the tensor without any compression
os.environ["LMCACHE_REMOTE_SERDE"] = "naive"
prompts = [
"Hello, how are you?" * 1000,
]
def run_store(store_done, prompts):
# We use GPU 0 for KV cache store process.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
sampling_params = SamplingParams(temperature=0, top_p=0.95, max_tokens=10)
ktc = KVTransferConfig(kv_connector="LMCacheConnectorV1", kv_role="kv_both")
# Set GPU memory utilization to 0.8 for an A40 GPU with 40GB
# memory. Reduce the value if your GPU has less memory.
llm = LLM(
model="mistralai/Mistral-7B-Instruct-v0.2",
kv_transfer_config=ktc,
max_model_len=8000,
gpu_memory_utilization=0.8,
enforce_eager=True,
)
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
generated_text = output.outputs[0].text
print(f"Generated text: {generated_text!r}")
print("KV cache store is finished.")
store_done.set()
# Clean up lmcache backend
LMCacheEngineBuilder.destroy(ENGINE_NAME)
def run_retrieve(store_done, prompts, timeout=1):
# We use GPU 1 for KV cache retrieve process.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
sampling_params = SamplingParams(temperature=0, top_p=0.95, max_tokens=10)
ktc = KVTransferConfig(kv_connector="LMCacheConnectorV1", kv_role="kv_both")
# Set GPU memory utilization to 0.8 for an A40 GPU with 40GB
# of memory. Reduce the value if your GPU has less memory.
llm = LLM(
model="mistralai/Mistral-7B-Instruct-v0.2",
kv_transfer_config=ktc,
max_model_len=8000,
gpu_memory_utilization=0.8,
enforce_eager=True,
)
print("Waiting for KV cache store to finish...")
store_done.wait()
time.sleep(timeout)
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
generated_text = output.outputs[0].text
print(f"Generated text: {generated_text!r}")
# Clean up lmcache backend
LMCacheEngineBuilder.destroy(ENGINE_NAME)
def run_lmcache_server(port):
server_proc = subprocess.Popen(
["python", "-m", "lmcache.experimental.server", "localhost", str(port)]
)
return server_proc
def main():
store_done = Event()
store_process = Process(target=run_store, args=(store_done, prompts))
retrieve_process = Process(target=run_retrieve, args=(store_done, prompts))
lmcache_server_process = run_lmcache_server(port)
# Start KV cache store process
store_process.start()
# Start KV cache retrieve process
retrieve_process.start()
    # Wait for both processes to finish before shutting down the LMCache server
    store_process.join()
    retrieve_process.join()
lmcache_server_process.terminate()
lmcache_server_process.wait()
if __name__ == "__main__":
main()