From 89e38801b50aaf9e21e3de175f0add03e9a721fd Mon Sep 17 00:00:00 2001 From: YouMeiYouMaoTai <15335885760@163.com> Date: Sat, 30 Nov 2024 11:02:16 +0800 Subject: [PATCH] perf: temp_scaler, bp_balance, load_least, ensure_scaler, ensure_scheduler, lass, pass, rotate. and Three types of load plots --- scripts/analyse_experiment_data.py | 154 ----- scripts/draw.yaml | 44 -- scripts/draw_diff_load_on_1_figure.py | 540 ++++++++++++++++++ scripts/draw_diff_load_on_1_figure.yaml | 95 +++ serverless_sim/src/scale/num/ensure_scaler.rs | 39 +- serverless_sim/src/scale/num/temp_scaler.rs | 58 +- serverless_sim/src/sche/bp_balance.rs | 8 +- serverless_sim/src/sche/ensure_scheduler.rs | 43 +- serverless_sim/src/sche/load_least.rs | 14 +- serverless_sim/src/sche/pass.rs | 31 +- serverless_sim/src/sche/rotate.rs | 1 + 11 files changed, 771 insertions(+), 256 deletions(-) delete mode 100644 scripts/analyse_experiment_data.py delete mode 100644 scripts/draw.yaml create mode 100644 scripts/draw_diff_load_on_1_figure.py create mode 100644 scripts/draw_diff_load_on_1_figure.yaml diff --git a/scripts/analyse_experiment_data.py b/scripts/analyse_experiment_data.py deleted file mode 100644 index 8fc14e5..0000000 --- a/scripts/analyse_experiment_data.py +++ /dev/null @@ -1,154 +0,0 @@ -# 自动运行同路径下的run_different_req_freq.py -import os -CUR_FPATH = os.path.abspath(__file__) -CUR_FDIR = os.path.dirname(CUR_FPATH) -# chdir to the directory of this script -os.chdir(CUR_FDIR) - -import json -import os -import subprocess -import time -import matplotlib.pyplot as plt -import numpy as np - -# json文件中各个字段的索引 -FRAME_IDX_FRAME = 0; # 帧数 -FRAME_IDX_RUNNING_REQS = 1; # 请求数量 -FRAME_IDX_NODES = 2; # 节点的状态:cpu、mem -FRAME_IDX_REQ_DONE_TIME_AVG = 3; # 请求的平均完成时间 -FRAME_IDX_REQ_DONE_TIME_STD = 4; # 请求的完成时间的标准差 -FRAME_IDX_REQ_DONE_TIME_AVG_90P = 5; # 请求的90%完成时间 -FRAME_IDX_COST = 6; # 成本 -FRAME_IDX_SCORE = 7; # 得分(强化学习用) -FRAME_IDX_DONE_REQ_COUNT = 8; # 已完成请求数量 -FRAME_IDX_REQ_WAIT_SCHE_TIME = 9; # 等待调度的时间 -FRAME_IDX_REQ_WAIT_COLDSTART_TIME = 10; # 冷启动的时间 -FRAME_IDX_REQ_DATA_RECV_TIME = 11; # 数据接收时间 -FRAME_IDX_REQ_EXE_TIME = 12; # 请求的执行时间 -FRAME_IDX_ALGO_EXE_TIME = 13; # 算法执行时间 -FRAME_IDX_FNCONTAINER_COUNT = 14; # 总的容器数量 - -""" -目前比较的指标有: -FRAME_IDX_REQ_DONE_TIME_AVG = 3; # 请求的平均完成时间 -FRAME_IDX_COST = 6; # 成本 -性价比: 1 / (FRAME_IDX_REQ_DONE_TIME_AVG * FRAME_IDX_COST) -FRAME_IDX_REQ_WAIT_COLDSTART_TIME = 10; # 冷启动的时间 -""" - -# 记录文件的路径,相对路径报错 -records_path = "..\\serverless_sim\\records" -script_path = ".\\run_different_req_freq.py" # py脚本的路径 -output_path = "实验结果\\算法延迟\\实验结果-10帧生成" - -# create outout dir -try: - os.makedirs(output_path) -except: - pass - -RUN_TIMES = 10 # 运行次数 -# 算法组合,key为算法名,value为数组,数组的元素为字典,key为参数名,value为参数值 -algos_metrics = {} # HashMap>> - -# 多次运行脚本以分析实验数据 -def run_script(): - for _ in range(RUN_TIMES): - # 使用subprocess.run来运行脚本,并等待其完成 - result = subprocess.run(['python', script_path], check=True) - # 检查运行结果,如果失败则抛出异常 - if result.returncode != 0: - raise Exception(f"脚本运行失败,返回码: {result.returncode}") - # 可以在这里添加等待时间,如果需要的话 - time.sleep(1) - - -# 根据执行后的json文件分析运行了哪些算法组合 -def analyze_which_algo(): - json_files = [f for f in os.listdir(records_path) if f.endswith('.json')] - algos = [] - for file in json_files: - # 用 . 分割文件名,取出算法名 - compete_name = file.split('.') - a = 1 - algo_name = compete_name[5][4:] + "." + compete_name[9][4:] + "." + compete_name[11][3:] - if algo_name not in algos: - algos.append(algo_name) - for algo in algos: - algos_metrics[algo] = [] - - -# 分析同一个算法运行 RUN_TIMES 次的实验结果折线图 -def analyze_same_algo_metrics_bytimes(): - json_files = [f for f in os.listdir(records_path) if f.endswith('.json')] - for file in json_files: - # 取出算法名 - compete_name = file.split('.') - algo_name = compete_name[5][4:] + "." + compete_name[9][4:] + "." + compete_name[11][3:] - - with open(os.path.join(records_path, file), 'r') as f: - # print("fcontent: ", f.read()) - # 读取json数据 - record = json.load(f) - frames = record['frames'] - done_time = frames[len(frames) - 1][3] - cost = frames[len(frames) - 1][6] - efficency = 1 / (frames[len(frames) - 1][3] * frames[len(frames) - 1][6]) - cold_start_time = frames[len(frames) - 1][10] - algos_metrics[algo_name].append({'req_done_time_avg': done_time, 'cost': cost, 'efficency': efficency, 'cold_start_time': cold_start_time}) - - -# 分析不同算法运行 RUN_TIMES 次的平均指标柱状图 -def analyze_diff_algo_avg_metrics(): - metrics = ['req_done_time_avg', 'cost', 'efficency', 'cold_start_time'] - colors = ['b', 'g', 'r', 'c', 'm', 'y'] - - # 生成折线图 - for algo, data in algos_metrics.items(): - fig, axs = plt.subplots(2, 2, figsize=(15, 10)) - for idx, metric in enumerate(metrics): - ax = axs[idx // 2, idx % 2] - values = [d[metric] for d in data] - for i, entry in enumerate(data): - ax.plot(range(len(data)), values, color=colors[i % len(colors)]) - std_value = np.std(values) - ax.text(0.95, 0.95, f'STD: {std_value:.5f}', transform=ax.transAxes, fontsize=12, verticalalignment='top', horizontalalignment='right') - ax.set_title(f'{metric}') - ax.set_xlabel('TIMES') - ax.set_ylabel(metric) - ax.legend() - fig.suptitle(f'{algo} - Metrics') - plt.tight_layout(rect=[0, 0, 1, 0.96]) - plt.savefig(os.path.join(output_path, f"{algo}.png")) - - # 生成直方图 - avg_metrics = {algo: {metric: np.mean([entry[metric] for entry in data]) for metric in metrics} for algo, data in algos_metrics.items()} - - fig, axs = plt.subplots(2, 2, figsize=(15, 10)) - for idx, metric in enumerate(metrics): - ax = axs[idx // 2, idx % 2] - algo_names = list(avg_metrics.keys()) - values = [avg_metrics[algo][metric] for algo in algo_names] - bars = ax.bar(algo_names, values, color=colors[:len(algo_names)]) - ax.set_title(f'Average {metric} Comparison') - ax.set_xlabel('Algorithm') - ax.set_ylabel(metric) - ax.set_xticklabels(algo_names, rotation=45, ha='right') - # 在每个直方上方显示具体数值 - for bar, value in zip(bars, values): - ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height(), f'{value:.2f}', ha='center', va='bottom') - plt.tight_layout() - plt.savefig(os.path.join(output_path, "avg_comparison.png")) - - -if __name__ == "__main__": - # run_script() - - analyze_which_algo() - - - analyze_same_algo_metrics_bytimes() - - - analyze_diff_algo_avg_metrics() diff --git a/scripts/draw.yaml b/scripts/draw.yaml deleted file mode 100644 index ed7558d..0000000 --- a/scripts/draw.yaml +++ /dev/null @@ -1,44 +0,0 @@ -## filter with fixed value -filter: - dag_type: single - cold_start: high - fn_type: cpu - scale_down_exec: default. - # request_freq: low - -## each group bars -targets_alias: -- [{scale_num: temp_scaler., scale_up_exec: least_task.,sche: pos.greedy, instance_cache_policy: no_evict.}, 'Temp_POSG_NoEvi'] -- [{scale_num: temp_scaler., scale_up_exec: least_task.,sche: pos.random, instance_cache_policy: no_evict.}, 'Temp_POSR_NoEvi'] -# - [{scale_num: temp_scaler., scale_up_exec: least_task.,sche: pos., instance_cache_policy: lru.10}, 'Temp_POS_LRU10'] -- [{scale_num: hpa., scale_up_exec: least_task.,sche: pos.greedy,instance_cache_policy: no_evict.}, 'HPA_POSG_NoEvi'] -- [{scale_num: hpa., scale_up_exec: least_task.,sche: pos.random,instance_cache_policy: no_evict.}, 'HPA_POSR_NoEvi'] -# - [{scale_num: hpa., scale_up_exec: least_task.,sche: pos., instance_cache_policy: lru.10}, 'HPA_POS_LRU10'] -# - [{scale_num: hpa., scale_up_exec: least_task.,sche: random.,instance_cache_policy: no_evict.}, 'HPA_Random_NoEvi'] -# - [{scale_num: hpa., scale_up_exec: least_task.,sche: random.,instance_cache_policy: lru.10}, 'HPA_Random_LRU10'] -- [{scale_num: hpa., scale_up_exec: least_task.,sche: greedy., instance_cache_policy: no_evict.}, 'HPA_Greedy_NoEvi'] -# - [{scale_num: hpa., scale_up_exec: least_task.,sche: greedy., instance_cache_policy: lru.10}, 'HPA_Greedy_LRU10'] -# - [{scale_num: hpa., scale_up_exec: least_task.,sche: bp_balance., instance_cache_policy: lru.10}, 'HPA_bp_balance_LRU10'] -# - [{scale_num: hpa., scale_up_exec: least_task.,sche: bp_balance., instance_cache_policy: no_evict.}, 'HPA_bp_balance_NoEvi'] - -# - [{scale_num: full_placement., scale_up_exec: least_task.,sche: pos., instance_cache_policy: lru.10}, 'FP_Pos_lru.10'] -# - [{scale_num: full_placement., scale_up_exec: least_task.,sche: pos., instance_cache_policy: no_evict.}, 'FP_Pos_NoEvi'] - -# - [{mechtype: scale_sche_joint,scale_num: hpa., scale_up_exec: least_task.,sche: pos.}, 'Joint_HPA_POS)'] -# - [{mechtype: scale_sche_joint,scale_num: temp., scale_up_exec: least_task.,sche: pos.}, 'Joint_Temp_POS'] - -# - [{mechtype: no_scaler,scale_num: 'no', scale_up_exec: 'no',sche: greedy}, 'NoScalerGreedy'] - -## group on x axis: -group: - by: request_freq - types: [low,middle,high] - alias: 'Request Frequency' - type_alias: ['Low','Middle','High'] - -## y axis -values: -# - {alias: Throughput, trans: throughput} -- {alias: Cost, trans: cost_per_req} -- {alias: Latency(ms), trans: '[waitsche_time_per_req,coldstart_time_per_req,datarecv_time_per_req,exe_time_per_req]'} # convert 10ms to ms -- {alias: Quality-Price Ratio, trans: 1/cost_per_req/time_per_req} \ No newline at end of file diff --git a/scripts/draw_diff_load_on_1_figure.py b/scripts/draw_diff_load_on_1_figure.py new file mode 100644 index 0000000..71f8b3c --- /dev/null +++ b/scripts/draw_diff_load_on_1_figure.py @@ -0,0 +1,540 @@ +import os +CUR_FPATH = os.path.abspath(__file__) +CUR_FDIR = os.path.dirname(CUR_FPATH) +# chdir to the directory of this script +os.chdir(CUR_FDIR) + +import requests +from pprint import pprint +import yaml +import re +import matplotlib.pyplot as plt +import numpy as np + +### doc: https://fvd360f8oos.feishu.cn/docx/RMjfdhRutoDmOkx4f4Lcl1sjnzd + +# class PackedRecord: +# # configstr.clone().into(), +# # cost_per_req, +# # time_per_req, +# # score, +# # rps.into(), +# # f.time_str.clone().into() +# raw_record=[] + +# configstr="" +# cost_per_req=0.0 +# time_per_req=0.0 +# score=0.0 +# rps=0.0 +# coldstart_time_per_req=0.0 +# waitsche_time_per_req=0.0 +# datarecv_time_per_req=0.0 +# exe_time_per_req=0.0 + +# filename="" + +# rand_seed="" +# request_freq="" +# dag_type="" +# cold_start="" +# scale_num="" +# scale_down_exec="" +# scale_up_exec="" +# fn_type="" +# instance_cache_policy="" + + +# def __init__(self, raw_record): +# if len(raw_record) != 10: +# raise ValueError("The input list must contain exactly 10 elements.") +# self.configstr = raw_record[0] +# self.cost_per_req = raw_record[1] +# self.time_per_req = raw_record[2] +# self.score = raw_record[3] +# self.rps = raw_record[4] +# self.coldstart_time_per_req=raw_record[5] +# self.waitsche_time_per_req=raw_record[6] +# self.datarecv_time_per_req=raw_record[7] +# self.exe_time_per_req=raw_record[8] +# self.filename = raw_record[9] + + +# # compute sub values by config str +# self.parse_configstr() + +# def parse_configstr(self): +# config_patterns = [ +# (r'sd(\w+)\.rf', 'rand_seed'), +# (r'\.rf(\w+)\.', 'request_freq'), +# (r'\.dt(\w+)\.', 'dag_type'), +# (r'\.cs(\w+)\.', 'cold_start'), +# (r'\.ft(\w+)\.', 'fn_type'), +# (r'\.scl\(([^)]+)\)\(([^)]+)\)\(([^)]+)\)\.', 'scale_num', 'scale_down_exec', 'scale_up_exec'), +# (r'\.scd\(([^)]+)\)', 'sche'), +# (r'\.ic\(([^)]+)\)', 'instance_cache_policy') +# ] + +# for pattern, *keys in config_patterns: +# match = re.search(pattern, self.configstr) +# if match: +# values = match.groups() +# for key, value in zip(keys, values): +# setattr(self, key, value) +# self.print_attributes() + + +# def print_attributes(self): +# attributes = [ +# 'configstr', 'cost_per_req', 'time_per_req', 'score', 'rps', 'filename', +# 'rand_seed', 'request_freq', 'dag_type', 'cold_start', 'fn_type', +# 'scale_num', 'scale_down_exec', 'scale_up_exec', 'sche' +# ] +# for attr in attributes: +# print(f"{attr}={getattr(self, attr)}") + +import records_read +# { +# confstr: [files...] +# } +def get_record_filelist(drawconf): + conf_2_files=records_read.group_by_conf_files() + # filter out we dont care + new={} + for confstr in conf_2_files: + conf=records_read.FlattenConfig(confstr) + confjson=conf.json() + + nomatch_filter=False + + # check match draw filter + for drawfilter in drawconf['filter']: + if drawfilter in confjson: + if confjson[drawfilter]!=drawconf['filter'][drawfilter]: + # continue + nomatch_filter=True + break + + if nomatch_filter: + continue + + + nomatch_targets=True + # check match draw targets_alias + for target in drawconf['targets_alias']: + nomatch_target=False + for targetkey in target[0]: + if targetkey not in confjson: + print("!!! invalid target alias with key",targetkey) + exit(1) + if confjson[targetkey]!=target[0][targetkey]: + # continue + nomatch_target=True + break + if not nomatch_target: + nomatch_targets=False + break + # if invalid: + # continue + if nomatch_targets: + continue + new[confstr]=conf_2_files[confstr] + return new + +# no return +# panic if check failed +def check_first_draw_group_match_avg_cnt(drawconf,conf_2_files): + avg_cnt=drawconf['avg_cnt'] + if avg_cnt==0: + print("!!! avg_cnt should not be 0") + exit(1) + + first_group_k=drawconf['group']['by'] + first_group_v=drawconf['group']['types'][0] + conf_2_files_only_first_group={} + # filter + for confstr in conf_2_files: + conf=records_read.FlattenConfig(confstr) + if getattr(conf,first_group_k)==first_group_v: + conf_2_files_only_first_group[confstr]=conf_2_files[confstr] + + # all group files cnt >= avg_cnt + for confstr in conf_2_files_only_first_group: + if len(conf_2_files_only_first_group[confstr])") + exit(1) + + yamlfilepath=sys.argv[1] + + drawconf=yaml.safe_load(open(yamlfilepath, 'r', encoding='utf-8')) + + print("\n\n get_record_filelist") + conf_2_files=get_record_filelist(drawconf) + + print("\n\n check_first_draw_group_match_avg_cnt") + check_first_draw_group_match_avg_cnt(drawconf,conf_2_files) + + print("\n\n get_each_group_prev_avg_cnt_file__compute_avg") + records=get_each_group_prev_avg_cnt_file__compute_avg(drawconf,conf_2_files) + + print("\n\n flatten records") + records=[records[confstr] for confstr in records] + for record in records: + # record.print_attributes() + print(record.configstr) + # print([r.configstr for r in records]) + + print("\n\n group_records") + groups=group_records(records,drawconf) + + print("\n\n to_draw_meta") + drawmeta=to_draw_meta(groups,drawconf) + + print("\n\n") + pprint(drawmeta) + draw_with_draw_meta(drawmeta,drawconf) + # import matplotlib.pyplot as plt + # from collections import defaultdict + + + # groups = defaultdict(list) + # for record in records: + # key_parts = record[0].split(".") + # common_part = ".".join(key_parts[1:5]) + # algorithm = "".join(key_parts[5:len(key_parts) - 1]) + # algorithm = algorithm.split(")") + # algorithm = ")\n".join(algorithm) + # record[5] = algorithm + # groups[common_part].append(record) + + + # for group_name, group_records in groups.items(): + # data_points = { + # 'Cost': [row[1] for row in group_records], + # 'Latency': [row[2] for row in group_records], + # } + # costs = data_points['Cost'] + # latencies = data_points['Latency'] + # value_for_money = [(1 / latency) * 1 / cost if cost != 0 and latency != 0 else float('inf') for latency, cost in zip(latencies, costs)] # 防止除以零 + # data_points['Performance_Cost'] = value_for_money + + # x_ticks = [row[5] for row in group_records] + + # for key, values in data_points.items(): + # plt.figure() + # bars = plt.bar(range(len(values)), values) + # plt.title(f'Comparison of {key} in {group_name}') + # plt.xlabel('Experiment') + # plt.ylabel(key) + # plt.xticks(range(len(values)), x_ticks, fontsize = 9) + # plt.subplots_adjust(bottom = 0.21) + + # for bar in bars: + # height = bar.get_height() + # plt.text(bar.get_x() + bar.get_width() / 2, height, f'{height:.4f}', ha='center', va='bottom') + + # plt.show() + +pipeline() \ No newline at end of file diff --git a/scripts/draw_diff_load_on_1_figure.yaml b/scripts/draw_diff_load_on_1_figure.yaml new file mode 100644 index 0000000..68b7a66 --- /dev/null +++ b/scripts/draw_diff_load_on_1_figure.yaml @@ -0,0 +1,95 @@ +# 如果需要一张图中画三种负载的实验结果,只需要改下面的 targets_alias 块的内容, +# 与之前的一个负载单张图配置信息有所不同,注意自己看一下怎么写的 targets_alias 配置项 + +avg_cnt: 1 + +## filter with fixed value +filter: +# dag_type: single + cold_start: high +# fn_type: cpu +# scale_down_exec: default. +# # request_freq: low + +## each group bars +targets_alias: +# 整体方案绘图表 +- [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'bp_balance.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'Ours'] +- [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'ensure_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'ensure_scheduler.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'ENSURE'] +- [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'faasflow.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'faasflow'] +- [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'lass.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'LaSS+LoadLeast'] +- [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'HPA+LoadLeast'] +- [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pass.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'HPA+PASS'] + + +# 扩缩容消融实验绘图表------LoadLeast +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'HPTD+LoadLeast'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'ensure_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'ensure_scheduler.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'ENSURE'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'faasflow.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'faasflow'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'LaSS+LoadLeast'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'lass.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'HPA+LoadLeast'] + +# 扩缩容消融实验绘图表------bp_balance +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'bp_balance.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'HPTD+DLO'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'ensure_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'ensure_scheduler.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'ENSURE'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'faasflow.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'faasflow'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'lass.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'bp_balance.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'LaSS+DLO'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'bp_balance.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'HPA+DLO'] + + +# # 调度消融实验------ +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'bp_balance.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'HPA+DLO'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'HPA+LoadLeast'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pass.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'HPA+PASS'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'rotate.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'HPA+RoundRobin'] + + +# 蚂蚁实验---100 dag +# - [{'rand_seed': '', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'bp_balance.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'temp+bp'] +# - [{'rand_seed': '', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pos.greedy', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'temp+pos'] +# - [{'rand_seed': '', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'hash.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'hash'] +# - [{'rand_seed': '', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'load_least'] +# - [{'rand_seed': '', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'rotate.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'rotate'] +# - [{'rand_seed': '', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'hash.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'hpa+hash'] +# - [{'rand_seed': '', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'hpa+LoadLeast'] +# - [{'rand_seed': '', 'dag_type': 'dag', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'rotate.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'hpa+rotate'] + +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pos.greedy', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'temp+pos'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'hash.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'hash'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'load_least'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'rotate.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'rotate'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'hash.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'hpa+hash'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'hpa+LoadLeast'] +# - [{'rand_seed': '', 'dag_type': 'mix', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'rotate.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'hpa+rotate'] + +# - [{'rand_seed': '', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'temp_scaler.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'pos.greedy', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'ours'] +# - [{'rand_seed': '', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'hash.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'hash'] +# - [{'rand_seed': '', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'load_least'] +# - [{'rand_seed': '', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'no.', 'scale_down_exec': 'default.', 'scale_up_exec': 'no.', 'sche': 'rotate.', 'instance_cache_policy': 'no_evict.', 'filter': '', 'no_mech_latency': '1'}, 'rotate'] +# - [{'rand_seed': '', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'hash.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'hpa+hash'] +# - [{'rand_seed': '', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'load_least.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'hpa+LoadLeast'] +# - [{'rand_seed': '', 'dag_type': 'single', 'cold_start': 'high', 'fn_type': 'cpu', 'scale_num': 'hpa.', 'scale_down_exec': 'default.', 'scale_up_exec': 'least_task.', 'sche': 'rotate.', 'instance_cache_policy': 'no_evict.', 'filter': '(careful_down.)', 'no_mech_latency': '1'}, 'hpa+rotate'] + + +group on x axis: +group: + by: request_freq + types: [low,middle,high] + alias: 'Request Frequency' + type_alias: ['Low','Middle','High'] + +# group: +# by: cold_start +# types: [high] +# alias: '' +# type_alias: [''] + +## y axis +values: +# - {alias: Quality-Price Ratio, trans: '10/cost_per_req/time_per_req if cost_per_req>0 and time_per_req>0 else 0'} +- {alias: Quality-Price Ratio, trans: '10 * rps/cost_per_req/time_per_req if cost_per_req>0 and time_per_req>0 else 0'} +- {alias: Cost, trans: 100 * cost_per_req} +- {alias: Latency(ms), trans: 'time_per_req'} # convert 10ms to ms +# - {alias: Cold Start Latency(ms), trans: 'coldstart_time_per_req'} # convert 10ms to ms +- {alias: Throuphput, trans: rps*1000} +# - {alias: Avg Container Count, trans: fn_container_cnt} diff --git a/serverless_sim/src/scale/num/ensure_scaler.rs b/serverless_sim/src/scale/num/ensure_scaler.rs index a9eb642..1712975 100644 --- a/serverless_sim/src/scale/num/ensure_scaler.rs +++ b/serverless_sim/src/scale/num/ensure_scaler.rs @@ -1,11 +1,12 @@ use std::cell::{ RefCell }; -use std::collections::{ HashMap, VecDeque }; +use std::collections::{ HashMap, HashSet, VecDeque }; use crate::fn_dag::EnvFnExt; use crate::mechanism::SimEnvObserve; use crate::node::EnvNodeExt; -use crate::with_env_sub::{ WithEnvCore }; +use crate::sim_run::schedule_helper; +use crate::with_env_sub::{ WithEnvCore, WithEnvHelp }; use crate::{ actions::ESActionWrapper, fn_dag::FnId, CONTAINER_BASIC_MEM }; use super::{ down_filter::{ CarefulScaleDownFilter, ScaleFilter }, ScaleNum }; @@ -22,18 +23,40 @@ impl EnsureScaleNum { impl ScaleNum for EnsureScaleNum { fn scale_for_fn(&mut self, env: &SimEnvObserve, fnid: FnId, _action: &ESActionWrapper) -> usize { - + + let mut need_to_schedule = false; + // 找到这一帧需要调度的函数 + for (_req_id, req) in env.core().requests_mut().iter_mut() { + let schedule_able_fns = schedule_helper::collect_task_to_sche( + req, + env, + schedule_helper::CollectTaskConfig::All, + ); + for sche_fnid in schedule_able_fns.iter() { + if sche_fnid == &fnid { + need_to_schedule = true; + } + } + } + + let current_frame = env.core().current_frame(); + // 当前容器数量 let cur_container_cnt = env.fn_container_cnt(fnid); // 取cur_container_cnt的根号 let sqrt_container_cnt = (cur_container_cnt as f64).sqrt().ceil() as usize; - if cur_container_cnt + sqrt_container_cnt == 0 { - 1 - } - else { - cur_container_cnt + sqrt_container_cnt + if need_to_schedule || cur_container_cnt == 0 { + + if cur_container_cnt + sqrt_container_cnt == 0{ + 1 + } + else { + cur_container_cnt + sqrt_container_cnt + } + }else { + cur_container_cnt } } diff --git a/serverless_sim/src/scale/num/temp_scaler.rs b/serverless_sim/src/scale/num/temp_scaler.rs index 453071e..78c8c51 100644 --- a/serverless_sim/src/scale/num/temp_scaler.rs +++ b/serverless_sim/src/scale/num/temp_scaler.rs @@ -8,6 +8,7 @@ use std::collections::{ HashMap, VecDeque }; use crate::fn_dag::EnvFnExt; use crate::mechanism::SimEnvObserve; use crate::node::EnvNodeExt; +use crate::sim_run::schedule_helper; use crate::with_env_sub::{ WithEnvCore }; use crate::{ actions::ESActionWrapper, fn_dag::FnId, CONTAINER_BASIC_MEM }; @@ -199,15 +200,29 @@ impl ScaleNum for TempScaleNum { let requests = env.core().requests(); // 遍历所有请求,只看当前帧到达的请求 - for (_, req) in requests.iter().filter(|(_, req)| req.begin_frame == current_frame) { - // 拿到该请求对应的DAG - let mut walker = env.dag(req.dag_i).new_dag_walker(); - // 遍历DAG里面的所有图节点 - while let Some(fngid) = walker.next(&env.dag(req.dag_i).dag_inner) { - // 得到该图节点对应的函数 - let fnid_in_dag = env.dag_inner(req.dag_i)[fngid]; - // 累加当前函数到达的次数 - if fnid_in_dag == fnid { + for (_, req) in requests.iter(){ + // for (_, req) in requests.iter().filter(|(_, req)| req.begin_frame == current_frame) { + // // 拿到该请求对应的DAG + // let mut walker = env.dag(req.dag_i).new_dag_walker(); + // // 遍历DAG里面的所有图节点 + // while let Some(fngid) = walker.next(&env.dag(req.dag_i).dag_inner) { + // // 得到该图节点对应的函数 + // let fnid_in_dag = env.dag_inner(req.dag_i)[fngid]; + // // 累加当前函数到达的次数 + // if fnid_in_dag == fnid { + // fn_count += 1; + // } + // } + + // 收集该请求中所有可以执行的函数 + let schedule_able_fns = schedule_helper::collect_task_to_sche( + req, + env, + schedule_helper::CollectTaskConfig::PreAllDone, + ); + + for fnid_in_req in schedule_able_fns { + if fnid_in_req == fnid { fn_count += 1; } } @@ -293,7 +308,8 @@ impl ScaleNum for TempScaleNum { if temp_change.abs() > threshold { // MARK 该增率的计算方式与论文中所写的不一致,后续有时间应该进一步实验测试对比一下现计算方式和论文中所写计算方式的优劣 // 计算容器数量的增率 - let container_inc_rate = temp_change.abs() / threshold; + // let container_inc_rate = temp_change.abs() / threshold; + let container_inc_rate = temp_change.abs() / temp_history_mean; // 统计目前已有的函数实例数量 let mut fn_instance_cnt = 0; @@ -322,10 +338,11 @@ impl ScaleNum for TempScaleNum { if temp_change > 0.0 { // MARK 该增量的计算方式与论文中所写的不一致,后续有时间应该进一步实验测试对比一下现计算方式和论文中所写计算方式的优劣 // 根据温度增量计算容器数量的增量 - let container_change = ( - (fn_instance_cnt as f64) * - (container_inc_rate - 1.0) - ).ceil() as i32; + // let container_change = ( + // (fn_instance_cnt as f64) * + // (container_inc_rate - 1.0) + // ).ceil() as i32; + let container_change = ((fn_instance_cnt as f64) * container_inc_rate).ceil() as i32; // 如果所需要的实例数量大于空闲的实例数量,则进行扩容 if container_change >= idle_fn_instance_cnt { @@ -362,7 +379,8 @@ impl ScaleNum for TempScaleNum { // 设置机制来处理 温度感知器没反应,但是函数在持续缓慢升温/降温的情况----------------------------------------------------- // 获取当前函数的所有容器,计算平均cpu、mem利用率 - if !scale_sign && cur_container_cnt != 0 { + // MARK 修改一处,增加 && fn_count > 0 + if !scale_sign && cur_container_cnt != 0 && fn_count > 0{ let mut container_avg_cpu_util = 0.0; let mut container_avg_mem_util = 0.0; @@ -397,19 +415,15 @@ impl ScaleNum for TempScaleNum { } // 对于容器数量为0的函数,如果最后一次调用距离现在的长度小于历史调用窗口长度,则变为一个容器 - if desired_container_cnt == 0 && last_call_frame + self.call_history_window_len >= current_frame { + if fn_count > 0 && desired_container_cnt == 0 { desired_container_cnt = 1; } // 对于容器数量是1的函数,如果最后一次调用距离现在的长度大于历史调用窗口长度,则缩容为0个容器 - else if desired_container_cnt == 1 && last_call_frame + self.call_history_window_len < current_frame { + else if desired_container_cnt == 1 && last_call_frame + self.call_history_window_len < 20 { + assert!(fn_count == 0); desired_container_cnt = 0; } - // log::info!("函数:{}, 在第{}帧的目标容器数量为:{}.scale_for_fn()结束", fnid, current_frame, desired_container_cnt); - - // log::info!("扩缩容器决策升温 {} 次", self.decide_to_up_count); - // log::info!("扩缩容器决策降温 {} 次", self.decide_to_down_count); - // log::info!("mem决策升温 {} 次", self.mem_decide_to_up_count); desired_container_cnt } diff --git a/serverless_sim/src/sche/bp_balance.rs b/serverless_sim/src/sche/bp_balance.rs index a7a5728..33b10d5 100644 --- a/serverless_sim/src/sche/bp_balance.rs +++ b/serverless_sim/src/sche/bp_balance.rs @@ -249,7 +249,7 @@ impl BpBalanceScheduler { } - if self.binpack_map.get(&fnid).unwrap().len() == 0 { + if self.binpack_map.get(&fnid).unwrap().len() == 0 && fn_scale_up_cmds.len() != 0 { panic!("fnid:{}, last_nodes_len:{}", fnid, self.latest_nodes.get(&fnid).unwrap().len()); } @@ -381,7 +381,7 @@ impl Scheduler for BpBalanceScheduler { } } // 如果需要缩容 - else if target < cur && (cur != 1 || !self.need_schedule_fn.contains(&func.fn_id)) { + else if target < cur && (cur > 1 || !self.need_schedule_fn.contains(&func.fn_id)) { // 标记可以开始bp机制 if self.mech_impl_sign.get(&func.fn_id).unwrap() == &false { log::info!("fn_id: {}, 在第 {} 帧触发机制", func.fn_id, env.core().current_frame()); @@ -440,14 +440,14 @@ impl Scheduler for BpBalanceScheduler { let binpack = self.binpack_map.get(&func.fn_id).unwrap(); // 如果扩缩容器没有缩容,那么遍历每个容器,对binpack数组外的容器进行超时缩容------------------------------------------ - if scale_down_sign == false { + if /* scale_down_sign == false */ true { env.fn_containers_for_each(func.fn_id, |container| { // 对于不是binpack数组中的节点,进行超时缩容 if !binpack.contains(&container.node_id) { // 如果该容器最近50帧都是空闲则缩容 - if container.recent_frame_is_idle(50) && container.req_fn_state.len() == 0 { + if container.recent_frame_is_idle(20) && container.req_fn_state.len() == 0 { // 发送缩容命令 cmd_distributor diff --git a/serverless_sim/src/sche/ensure_scheduler.rs b/serverless_sim/src/sche/ensure_scheduler.rs index 7e0d703..5931c02 100644 --- a/serverless_sim/src/sche/ensure_scheduler.rs +++ b/serverless_sim/src/sche/ensure_scheduler.rs @@ -4,12 +4,7 @@ use std::{ }; use crate::{ - fn_dag::{EnvFnExt, FnId}, - mechanism::{DownCmd, MechType, MechanismImpl, ScheCmd, SimEnvObserve}, - mechanism_thread::{MechCmdDistributor, MechScheduleOnceRes}, - node::{self, EnvNodeExt, Node, NodeId}, - sim_run::{schedule_helper, Scheduler}, - with_env_sub::WithEnvCore, + fn_dag::{EnvFnExt, FnId}, mechanism::{DownCmd, MechType, MechanismImpl, ScheCmd, SimEnvObserve}, mechanism_thread::{MechCmdDistributor, MechScheduleOnceRes}, node::{self, EnvNodeExt, Node, NodeId}, sche, sim_run::{schedule_helper, Scheduler}, with_env_sub::{WithEnvCore, WithEnvHelp} }; struct NodeCpuResc { @@ -88,7 +83,7 @@ impl Scheduler for EnsureScheduler { let schedule_able_fns = schedule_helper::collect_task_to_sche( req, env, - schedule_helper::CollectTaskConfig::PreAllSched, + schedule_helper::CollectTaskConfig::All, ); for fnid in schedule_able_fns.iter() { need_schedule_fn.insert(*fnid); @@ -117,11 +112,23 @@ impl Scheduler for EnsureScheduler { nodes.insert(cmd.nid); } } + else if target == 0 && need_schedule_fn.contains(&func.fn_id) { + let up_cmd = mech.scale_up_exec().exec_scale_up( + 1, + func.fn_id, env, + cmd_distributor + ); + + // 实时更新函数的节点情况 + for cmd in up_cmd.iter() { + nodes.insert(cmd.nid); + } + } if !need_schedule_fn.contains(&func.fn_id) { env.fn_containers_for_each(func.fn_id, |container| { // 如果该容器最近50帧都是空闲则缩容 - if container.recent_frame_is_idle(50) && container.req_fn_state.len() == 0 { + if container.recent_frame_is_idle(20) && container.req_fn_state.len() == 0 { // 发送缩容命令 cmd_distributor .send(MechScheduleOnceRes::ScaleDownCmd(DownCmd @@ -136,7 +143,7 @@ impl Scheduler for EnsureScheduler { }); } - log::info!("fn {}, nodes.len() = {}", func.fn_id, nodes.len()); + // log::info!("fn {}, nodes.len() = {}", func.fn_id, nodes.len()); self.fn_nodes.insert(func.fn_id, nodes.clone()); } @@ -149,12 +156,18 @@ impl Scheduler for EnsureScheduler { //迭代请求中的函数,选择最合适的节点进行调度 for fnid in fns { - let sche_nodeid = self.select_best_node_to_fn(fnid, env); + let nodes = self.fn_nodes.get(&fnid).unwrap(); - log::info!("schedule fn {} to node {}", fnid, sche_nodeid); + let mut sche_nodeid = self.select_best_node_to_fn(fnid, env); - if sche_nodeid != 9999 { - cmd_distributor + log::info!("schedule fn {} to node {}. nodes.len() = {}", fnid, sche_nodeid, nodes.len()); + + if sche_nodeid == 9999 { + assert!(nodes.len() == 0); + sche_nodeid = env.core().current_frame() % env.core().nodes().len(); + } + + cmd_distributor .send(MechScheduleOnceRes::ScheCmd(ScheCmd { nid: sche_nodeid, reqid: req.req_id, @@ -162,8 +175,8 @@ impl Scheduler for EnsureScheduler { memlimit: None, })) .unwrap(); - self.node_cpu_usage.get_mut(&sche_nodeid).unwrap().all_task_cnt += 1.0; - } + self.node_cpu_usage.get_mut(&sche_nodeid).unwrap().all_task_cnt += 1.0; + } } diff --git a/serverless_sim/src/sche/load_least.rs b/serverless_sim/src/sche/load_least.rs index 0a1f4d4..9df1a50 100644 --- a/serverless_sim/src/sche/load_least.rs +++ b/serverless_sim/src/sche/load_least.rs @@ -79,12 +79,15 @@ impl Scheduler for LoadLeastScheduler { //迭代请求中的函数,选择最合适的节点进行调度 for fnid in fns { - let sche_nodeid = self.select_best_node_to_fn(fnid, env); + let mut sche_nodeid = self.select_best_node_to_fn(fnid, env); log::info!("schedule fn {} to node {}", fnid, sche_nodeid); - if sche_nodeid != 9999 { - cmd_distributor + if sche_nodeid == 9999 { + assert!(self.fn_nodes.get(&fnid).unwrap().len() == 0); + sche_nodeid = env.core().current_frame() % env.core().nodes().len(); + } + cmd_distributor .send(MechScheduleOnceRes::ScheCmd(ScheCmd { nid: sche_nodeid, reqid: req.req_id, @@ -93,9 +96,8 @@ impl Scheduler for LoadLeastScheduler { })) .unwrap(); - let tasks_cnt = self.node_cpu_usage.get(&sche_nodeid).unwrap(); - self.node_cpu_usage.insert(sche_nodeid, tasks_cnt + 1); - } + let tasks_cnt = self.node_cpu_usage.get(&sche_nodeid).unwrap(); + self.node_cpu_usage.insert(sche_nodeid, tasks_cnt + 1); } } diff --git a/serverless_sim/src/sche/pass.rs b/serverless_sim/src/sche/pass.rs index 4fef07f..73e778e 100644 --- a/serverless_sim/src/sche/pass.rs +++ b/serverless_sim/src/sche/pass.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use daggy::Walker; use rand::Rng; @@ -90,19 +90,44 @@ impl PassScheduler { env: &SimEnvObserve, ) { let func = env.func(func_id); + + // let mut nodes = HashSet::new(); + let nodes = env.core().nodes(); + let nodes_sche = env + .core() + .fn_2_nodes() + .get(&func.fn_id) + .map(|v| v.clone().into_iter().collect()) + .unwrap_or_else(Vec::new); + + let mut nodes_len = 0; + + if nodes_sche.len() == 0 { + nodes_len = nodes.len(); + } + else { + nodes_len = nodes_sche.len(); + } let func_pres_id = func.parent_fns(env); log::info!("func {} pres {:?}", func_id, func_pres_id); if func_pres_id.len() == 0 { let mut rng = rand::thread_rng(); - let rand = rng.gen_range(0..nodes.len()); + let rand = rng.gen_range(0..nodes_len); + let mut sche_nodeid = rand; + + if nodes_sche.len() != 0 { + sche_nodeid = nodes_sche[rand]; + } + + // let rand = rng.gen_range(0..nodes.len()); schedule_to_map.insert(func_id, rand); // schedule_to.push((func_id, rand)); cmd_distributor .send(MechScheduleOnceRes::ScheCmd(ScheCmd { - nid: rand, + nid: sche_nodeid, reqid: req.req_id, fnid: func_id, memlimit: None, diff --git a/serverless_sim/src/sche/rotate.rs b/serverless_sim/src/sche/rotate.rs index 3dbe36c..5b845ee 100644 --- a/serverless_sim/src/sche/rotate.rs +++ b/serverless_sim/src/sche/rotate.rs @@ -58,6 +58,7 @@ impl RotateScheduler { if !node_list.is_empty() { node_id = node_list[(self.last_schedule_node_id + 1) % node_list.len()]; + self.last_schedule_node_id = node_id; } cmd_distributor