Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 110 additions & 9 deletions problems/amd_distributed/eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any, Optional

import torch.cuda
from torch.cuda.nvtx import range as nvtx_range

from utils import set_seed, clear_l2_cache

Expand Down Expand Up @@ -499,26 +500,126 @@ def run_benchmarking(logger: PopcornOutput, pool: multiprocessing.Pool, tests: l
return 112


def run_single_profile(test: TestCase) -> str:
def _run_single_profile(test: TestCase) -> str:
"""
Runs a single test case. Do not call directly
"""
from submission import custom_kernel
from torch.profiler import profile, record_function, ProfilerActivity
data = generate_input(**test.args)
torch.cuda.synchronize()
from torch.profiler import profile, ProfilerActivity

with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
submission_output = custom_kernel(_clone_data(data, 0))
with nvtx_range("generate input"):
data = generate_input(**test.args)
torch.cuda.synchronize()

with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
with nvtx_range("custom_kernel"):
submission_output = custom_kernel(_clone_data(data, 0))
torch.cuda.synchronize()

return prof.key_averages().table(sort_by="self_cuda_time_total", row_limit=20)


def run_profiling(logger: PopcornOutput, tests: list[TestCase]):
def _run_distributed_profile(test: TestCase, rank: int) -> "EventList":
"""
Runs a single profiling case. Do not call directly
"""
from submission import custom_kernel
from torch.profiler import profile, ProfilerActivity
import torch.distributed as dist

with nvtx_range(f"init nccl, rank {rank}"):
world_size = test.args["world_size"]
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "12356"
dist.init_process_group("nccl", init_method="env://", rank=rank, world_size=world_size, device_id=torch.device(f'cuda:{rank}'))

try:
with nvtx_range(f"generate input, rank {rank}"):
data = generate_input(**test.args, rank=rank)
data = _clone_data(data, rank)
torch.cuda.synchronize()
dist.barrier()

with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
with nvtx_range(f"custom_kernel, rank {rank}"):
submission_output = custom_kernel(data)
torch.cuda.synchronize()
dist.barrier()

return prof.events()

finally:
dist.destroy_process_group()


def _combine_traces(traces: list["EventList"]) -> "EventList":
"""
Combine multiple event traces obtained from multiple (distributed) torch.profiler
activities. This function simply aggregates the data as like `prof.key_averages()`,
except over multiple traces. Most of this function is reimplemented
from `torch.autograd.profiler_util.EventList.key_averages()`.
"""
from torch.autograd.profiler_util import FunctionEventAvg, EventList
from collections import defaultdict

def get_key(event) -> tuple[str, ...]:
return (
str(event.key),
str(event.node_id),
str(event.device_type),
str(event.is_legacy),
str(event.is_user_annotation),
)

stats: dict[tuple[str, ...], FunctionEventAvg] = defaultdict(FunctionEventAvg)

for events in traces:
for event in events:
stats[get_key(event)].add(event)

avg_list = EventList(stats.values())
for event in avg_list:
event.stack = []
event.input_shapes = ""
event.overload_name = ""

return avg_list


def run_multi_gpu_profile(pool: multiprocessing.Pool, test: TestCase, world_size: int) -> str:
"""
Runs a single test in another process.
"""
rets = []
# world_size is a mandatory argument for multi-gpu tests
for i in range(world_size):
rets.append(
pool.apply_async(
_run_distributed_profile,
args=(test, i),
)
)

rets = [el.get(120) for el in rets]
return _combine_traces(rets).table(sort_by="self_cuda_time_total", row_limit=20)


def run_single_profile(test: TestCase, pool: multiprocessing.Pool) -> str:
"""
Runs a single profiling activity in another process.
"""
world_size = test.args.get("world_size", None)
if world_size is None:
return pool.apply(_run_single_profile, (test,))
else:
return run_multi_gpu_profile(pool, test, world_size)


def run_profiling(logger: PopcornOutput, pool: multiprocessing.Pool, tests: list[TestCase]):
logger.log("benchmark-count", len(tests))
for idx, test in enumerate(tests):
logger.log(f"benchmark.{idx}.spec", test.spec)
report = run_single_profile(test)
report = run_single_profile(test, pool)
logger.log(f"benchmark.{idx}.report", base64.b64encode(report.encode("utf-8"), b"+*").decode("utf-8"))
logger.log("check", "pass")
return 0
Expand Down Expand Up @@ -568,7 +669,7 @@ def main():

logger.log("check", "pass" if passed else "fail")
elif mode == "profile":
run_profiling(logger, tests)
run_profiling(logger, pool, tests)
else:
# invalid mode
return 2
Expand Down