.github/workflows/vllm-benchmark.yml (3 changes: 2 additions & 1 deletion)
@@ -202,7 +202,8 @@ jobs:
ls -lah "${BENCHMARK_RESULTS}"

python upload_benchmark_results.py \
-  --vllm vllm \
+  --repo vllm \
+  --benchmark-name "vLLM benchmark" \
--benchmark-results "${BENCHMARK_RESULTS}" \
--device "${GPU_DEVICE}" \
--model "${MODELS//\//_}"
vllm-benchmarks/upload_benchmark_results.py (198 changes: 141 additions & 57 deletions)
@@ -1,5 +1,6 @@
#!/usr/bin/env python3

+import requests
import glob
import gzip
import json
@@ -12,6 +13,7 @@
from argparse import Action, ArgumentParser, Namespace
from logging import info, warning
from typing import Any, Dict, List, Optional, Tuple
+from json.decoder import JSONDecodeError

import boto3
import psutil
@@ -20,8 +22,10 @@

logging.basicConfig(level=logging.INFO)


-REPO = "vllm-project/vllm"
+S3_BUCKET = "ossci-benchmarks"
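+# Lambda function URL for the authenticated upload path, used by upload_via_api() below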
+UPLOADER_URL = "https://qrr6jzjpvyyd77fkj6mqkes4mq0tpirr.lambda-url.us-east-1.on.aws"
+UPLOADER_USERNAME = os.environ.get("UPLOADER_USERNAME")
+UPLOADER_PASSWORD = os.environ.get("UPLOADER_PASSWORD")


class ValidateDir(Action):
@@ -40,46 +44,58 @@ def __call__(


def parse_args() -> Any:
-    parser = ArgumentParser("Upload vLLM benchmarks results to S3")
-    vllm_metadata = parser.add_mutually_exclusive_group(required=True)
-    vllm_metadata.add_argument(
-        "--vllm",
+    parser = ArgumentParser("Upload benchmark results")
+
+    # Git metadata
+    repo_metadata = parser.add_mutually_exclusive_group(required=True)
+    repo_metadata.add_argument(
+        "--repo",
type=str,
action=ValidateDir,
help="the directory that vllm repo is checked out",
help="the directory that the repo is checked out",
)
+    branch_commit = repo_metadata.add_argument_group("the branch and commit metadata")
+    branch_commit.add_argument(
+        "--repo-name",
+        type=str,
+        help="the name of the repo",
+    )
-    branch_commit = vllm_metadata.add_argument_group("vLLM branch and commit metadata")
branch_commit.add_argument(
"--head-branch",
type=str,
default="main",
help="the name of the vLLM branch the benchmark runs on",
help="the name of the branch the benchmark runs on",
)
branch_commit.add_argument(
"--head-sha",
type=str,
help="the commit SHA the benchmark runs on",
)

+    # Benchmark info
parser.add_argument(
"--benchmark-results",
"--benchmark-name",
type=str,
required=True,
-        action=ValidateDir,
-        help="the directory with the benchmark results",
+        help="the name of the benchmark",
)
parser.add_argument(
"--s3-bucket",
"--benchmark-results",
type=str,
-        required=False,
-        default="ossci-benchmarks",
-        help="the S3 bucket to upload the benchmark results",
+        required=True,
+        action=ValidateDir,
+        help="the directory with the benchmark results",
)

+    # Device info
parser.add_argument(
"--device",
type=str,
required=True,
help="the name of the GPU device coming from nvidia-smi or amd-smi",
)

+    # Optional suffix
parser.add_argument(
"--model",
type=str,
@@ -93,27 +109,33 @@ def parse_args() -> Any:
return parser.parse_args()


-def get_git_metadata(vllm_dir: str) -> Tuple[str, str]:
-    repo = Repo(vllm_dir)
+def get_git_metadata(repo_dir: str) -> Tuple[str, str, str, int]:
+    repo = Repo(repo_dir)
+    # Git metadata
+    repo_name = repo.remotes.origin.url.split(".git")[0].split(":")[-1]
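+    # e.g. git@github.com:vllm-project/vllm.git -> vllm-project/vllm (an SSH-style
+    # remote is assumed here; an HTTPS URL would keep a "//github.com/..." prefix)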
+    hexsha = repo.head.object.hexsha
+    committed_date = repo.head.object.committed_date

try:
return (
+            repo_name,
            repo.active_branch.name,
-            repo.head.object.hexsha,
-            repo.head.object.committed_date,
+            hexsha,
+            committed_date,
)
except TypeError:
# This is a detached HEAD, default the branch to main
return "main", repo.head.object.hexsha, repo.head.object.committed_date
return repo_name, "main", hexsha, committed_date


def get_benchmark_metadata(
-    head_branch: str, head_sha: str, timestamp: int
+    repo_name: str, head_branch: str, head_sha: str, timestamp: int, benchmark_name: str
) -> Dict[str, Any]:
return {
"timestamp": timestamp,
"schema_version": "v3",
"name": "vLLM benchmark",
"repo": REPO,
"name": benchmark_name,
"repo": repo_name,
"head_branch": head_branch,
"head_sha": head_sha,
"workflow_id": os.getenv("WORKFLOW_ID", timestamp),
@@ -147,27 +169,58 @@ def get_runner_info() -> Dict[str, Any]:
}


-def load(benchmark_results: str) -> Dict[str, List]:
+def read_benchmark_results(filepath: str) -> List[Dict[str, Any]]:
+    results = []
+    with open(filepath) as f:
+        try:
+            r = json.load(f)
+            # Handle the JSONEachRow case where the file holds a single record:
+            # it still loads as regular JSON, but needs to be wrapped into a
+            # length-1 list of benchmark results
+            if isinstance(r, dict):
+                results.append(r)
+            elif isinstance(r, list):
+                results = r

+        except JSONDecodeError:
+            f.seek(0)

+            # Try again in ClickHouse JSONEachRow format
+            for line in f:
+                try:
+                    r = json.loads(line)
+                    # Each row needs to be a JSON dictionary or a list
+                    if isinstance(r, dict):
+                        results.append(r)
+                    elif isinstance(r, list):
+                        results.extend(r)
+                    else:
+                        warning(f"Not a JSON dict or list {line}, skipping")
+                        continue

+                except JSONDecodeError:
+                    warning(f"Invalid JSON {line}, skipping")

+    return results


+def load(benchmark_results_dir: str) -> Dict[str, List]:
results = {}

-    for file in glob.glob(f"{benchmark_results}/*.json"):
+    for file in glob.glob(f"{benchmark_results_dir}/*.json"):
filename = os.path.basename(file)
-        with open(file) as f:
-            try:
-                r = json.load(f)
-            except json.JSONDecodeError as e:
-                warning(f"Failed to load {file}: {e}")
-                continue
+        r = read_benchmark_results(file)
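+        # At this point r is a list of records regardless of the input format
+        # (plain JSON or JSONEachRow)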

-            if not r:
-                warning(f"Found no benchmark results in {file}")
-                continue
+        if not r:
+            warning(f"{file} is empty")
+            continue

-            if type(r) is not list or "benchmark" not in r[0]:
-                warning(f"Found no PyTorch benchmark results in {file}")
-                continue
+        if not isinstance(r, list) or "benchmark" not in r[0]:
+            warning(f"Found no PyTorch benchmark results in {file}")
+            continue

-            results[filename] = r
+        info(f"Loading benchmark results from {file}")
+        results[filename] = r

return results

@@ -184,8 +237,36 @@ def aggregate(
return aggregated_results


-def upload_to_s3(
-    s3_bucket: str,
+def upload_s3(s3_path: str, data: str) -> None:
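+    # gzip the payload before the upload; ContentEncoding and ContentType let
+    # consumers read it back as JSON transparently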
+    boto3.resource("s3").Object(
+        S3_BUCKET,
+        s3_path,
+    ).put(
+        Body=gzip.compress(data.encode()),
+        ContentEncoding="gzip",
+        ContentType="application/json",
+    )


+def upload_via_api(
+    s3_path: str,
+    data: str,
+) -> None:
+    json_data = {
+        "username": UPLOADER_USERNAME,
+        "password": UPLOADER_PASSWORD,
+        "s3_path": s3_path,
+        "content": data,
+    }

+    headers = {"content-type": "application/json"}

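+    # POST the payload to the uploader Lambda; the response is only logged,
+    # not checked for errors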
+    r = requests.post(UPLOADER_URL, json=json_data, headers=headers)
+    info(r.content)


+def upload(
+    repo_name: str,
head_branch: str,
head_sha: str,
aggregated_results: List[Dict[str, Any]],
@@ -194,35 +275,38 @@ def upload_to_s3(
dry_run: bool = True,
) -> None:
model_suffix = f"_{model}" if model else ""
s3_path = f"v3/{REPO}/{head_branch}/{head_sha}/{device}/benchmark_results{model_suffix}.json"
info(f"Upload benchmark results to s3://{s3_bucket}/{s3_path}")
s3_path = f"v3/{repo_name}/{head_branch}/{head_sha}/{device}/benchmark_results{model_suffix}.json"

info(f"Upload benchmark results to {s3_path}")
if not dry_run:
# Write in JSONEachRow format
data = "\n".join([json.dumps(r) for r in aggregated_results])
-        boto3.resource("s3").Object(
-            f"{s3_bucket}",
-            f"{s3_path}",
-        ).put(
-            Body=gzip.compress(data.encode()),
-            ContentEncoding="gzip",
-            ContentType="application/json",
-        )

+        if UPLOADER_USERNAME and UPLOADER_PASSWORD:
+            # If the username and password are set, use the API (preferred)
+            upload_via_api(s3_path, data)
+        else:
+            # Otherwise, upload directly to the bucket
+            upload_s3(s3_path, data)


def main() -> None:
args = parse_args()

-    if args.vllm:
-        head_branch, head_sha, timestamp = get_git_metadata(args.vllm)
+    if args.repo:
+        repo_name, head_branch, head_sha, timestamp = get_git_metadata(args.repo)
else:
-        head_branch, head_sha, timestamp = (
+        repo_name, head_branch, head_sha, timestamp = (
+            args.repo_name,
args.head_branch,
args.head_sha,
int(time.time()),
)

# Gather some information about the benchmark
-    metadata = get_benchmark_metadata(head_branch, head_sha, timestamp)
+    metadata = get_benchmark_metadata(
+        repo_name, head_branch, head_sha, timestamp, args.benchmark_name
+    )
runner = get_runner_info()

# Extract and aggregate the benchmark results
@@ -231,8 +315,8 @@ def main() -> None:
warning(f"Find no benchmark results in {args.benchmark_results}")
sys.exit(1)

-    upload_to_s3(
-        args.s3_bucket,
+    upload(
+        repo_name,
head_branch,
head_sha,
aggregated_results,
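
For reference, an invocation of the updated script could look like the following under each of the two mutually exclusive metadata modes; the paths, SHA, and device name below are illustrative, and setting UPLOADER_USERNAME/UPLOADER_PASSWORD in the environment switches the upload from direct S3 writes to the authenticated API:

# Mode 1: derive repo name, branch, and commit from a local checkout
python upload_benchmark_results.py \
  --repo vllm \
  --benchmark-name "vLLM benchmark" \
  --benchmark-results benchmark_results \
  --device H100

# Mode 2: pass the git metadata explicitly (no checkout required)
python upload_benchmark_results.py \
  --repo-name vllm-project/vllm \
  --head-branch main \
  --head-sha deadbeef \
  --benchmark-name "vLLM benchmark" \
  --benchmark-results benchmark_results \
  --device H100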