From a48928d7434e6bf651be9d524a0339dbc49525dd Mon Sep 17 00:00:00 2001 From: Ting Date: Thu, 16 Oct 2025 16:16:21 +0800 Subject: [PATCH 1/3] hardcoding cpt dataset. --- paddleformers/data/causal_dataset.py | 72 ++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/paddleformers/data/causal_dataset.py b/paddleformers/data/causal_dataset.py index d3789287649..c680c673b67 100644 --- a/paddleformers/data/causal_dataset.py +++ b/paddleformers/data/causal_dataset.py @@ -26,6 +26,51 @@ # return None +def get_logits(batch_ids, max_retries=3, timeout=120, retry_delay=5, prob_nums=20): + """ + 获取logits,如果超过规定时间没有返回则重试 + + 参数: + batch_ids: 输入的token ids + max_retries: 最大重试次数 (默认3次) + timeout: 请求超时时间(秒) (默认10秒) + retry_delay: 重试间隔时间(秒) (默认1秒) + + 返回: + tuple: (logits, ids) + 抛出: + Exception: 当所有重试都失败后抛出异常 + """ + input_len = len(batch_ids[0]) + return paddle.randn([input_len, 103424], dtype="bfloat16") + # headers = {"Content_Type": "application/json"} + # url = "http://10.85.158.107:8118/generate" + # payload = { + # "prompt_token_ids": batch_ids, + # "max_tokens": 1, + # "top_p": 1, + # "top_k": -1, + # "temperature": 1, + # "prompt_logprobs": prob_nums, + # } + + # for attempt in range(max_retries): + # try: + # response = requests.post(url=url, json=payload, headers=headers, timeout=timeout) + # response.raise_for_status() # 检查HTTP错误 + + # buffer = io.BytesIO(response.content) + # data = paddle.load(buffer) + # return data["logits"], data["ids"] + + # except (requests.exceptions.RequestException, IOError) as e: + # if attempt == max_retries - 1: # 最后一次尝试失败 + # raise Exception(f"Failed after {max_retries} attempts. Last error: {str(e)}") + + # print(f"Attempt {attempt + 1} failed. Retrying in {retry_delay} second(s)... 
Error: {str(e)}") + # time.sleep(retry_delay) + + def check_data_split(splits_string, do_train, do_eval, do_predict): splits = [] if splits_string.find(",") != -1: @@ -146,6 +191,7 @@ def build_train_valid_test_datasets( # Blending dataset. # Parse the values. output = get_datasets_weights_and_num_samples(data_prefix, train_val_test_num_samples) + print("TINGLSLS1:", output) prefixes, weights, datasets_train_valid_test_num_samples = output # NOTE: megatron/gpt_dataset.py has been updated. When creating BlendableDataset, we will use the raw train_val_test_num_samples instead of the expanded ones. # Please refer to https://github.com/NVIDIA/NeMo/blob/72f630d087d45655b1a069dc72debf01dfdbdb2d/nemo/collections/nlp/data/language_modeling/megatron/gpt_dataset.py#L74-L80 for more information @@ -214,6 +260,10 @@ def _build_train_valid_test_datasets( need_data=True, ): """Build train, valid, and test datasets.""" + CPT = False + if data_prefix.endswith("::CPT"): + data_prefix = data_prefix[: -len("::CPT")] + CPT = True # Indexed dataset. if need_data: @@ -254,6 +304,7 @@ def build_dataset(index, name): share_folder, data_cache_path=data_cache_path, need_data=need_data, + CPT=CPT, ) if need_data: return dataset if splits[index + 1] > splits[index] else None @@ -295,11 +346,13 @@ def __init__( *, data_cache_path=None, need_data=True, + CPT=False, ): self.name = name self.indexed_dataset = indexed_dataset self.return_doc_ids = return_doc_ids + self.CPT = CPT # Build index mappings. 
if need_data and len(documents) > 0: @@ -398,20 +451,33 @@ def __getitem__(self, idx): if append_mask: mask = np.concatenate(mask_list) # print(sample) + kl_logits = get_logits([sample]) if self.return_doc_ids: # for retro preprocessing if mask is None: - return {"text": np.array(sample, dtype=np.int64), "doc_ids": np.array(doc_ids, dtype=np.int64)} + return { + "text": np.array(sample, dtype=np.int64), + "doc_ids": np.array(doc_ids, dtype=np.int64), + "logits": kl_logits, + "CPT": self.CPT, + } else: return { "text": np.array(sample, dtype=np.int64), "doc_ids": np.array(doc_ids, dtype=np.int64), "mask": np.array(mask, dtype=np.int64), + "logits": kl_logits, + "CPT": self.CPT, } else: if mask is None: - return {"text": np.array(sample, dtype=np.int64)} + return {"text": np.array(sample, dtype=np.int64), "logits": kl_logits, "CPT": self.CPT} else: - return {"text": np.array(sample, dtype=np.int64), "mask": np.array(mask, dtype=np.int64)} + return { + "text": np.array(sample, dtype=np.int64), + "mask": np.array(mask, dtype=np.int64), + "logits": kl_logits, + "CPT": self.CPT, + } def _build_index_mappings( From 5c6668176728479aec90ecbe7a3276e754899036 Mon Sep 17 00:00:00 2001 From: Ting Date: Thu, 23 Oct 2025 14:56:36 +0800 Subject: [PATCH 2/3] cpt logit server. 
--- paddleformers/data/causal_dataset.py | 128 ++++++++++++++++----------- 1 file changed, 77 insertions(+), 51 deletions(-) diff --git a/paddleformers/data/causal_dataset.py b/paddleformers/data/causal_dataset.py index c680c673b67..7453a4e1446 100644 --- a/paddleformers/data/causal_dataset.py +++ b/paddleformers/data/causal_dataset.py @@ -13,6 +13,8 @@ from .indexed_dataset import make_dataset as make_indexed_dataset local_rank = int(os.getenv("PADDLE_RANK_IN_NODE", 0)) +INFER_SERVER_IP = os.getenv("INFER_SERVER_IP", "127.0.0.1") +INFER_SERVER_PORT = os.getenv("INFER_SERVER_PORT", "8008") # class FakeHCG: @@ -25,50 +27,54 @@ # def get_model_parallel_group(self): # return None +import pickle -def get_logits(batch_ids, max_retries=3, timeout=120, retry_delay=5, prob_nums=20): +import requests + + +def get_logits(batch_ids, max_retries=1, timeout=1200, retry_delay=1, prob_nums=10): """ - 获取logits,如果超过规定时间没有返回则重试 + Retrieve logits with retry mechanism if no response is received within the specified time - 参数: - batch_ids: 输入的token ids - max_retries: 最大重试次数 (默认3次) - timeout: 请求超时时间(秒) (默认10秒) - retry_delay: 重试间隔时间(秒) (默认1秒) + Parameters: + batch_ids: Input token ids + max_retries: Maximum number of retry attempts (default: 1) + timeout: Request timeout in seconds (default: 1200 seconds) + retry_delay: Delay between retries in seconds (default: 1 second) + prob_nums: Number of probabilities to return - 返回: + Returns: tuple: (logits, ids) - 抛出: - Exception: 当所有重试都失败后抛出异常 + Raises: + Exception: Thrown when all retry attempts fail """ - input_len = len(batch_ids[0]) - return paddle.randn([input_len, 103424], dtype="bfloat16") - # headers = {"Content_Type": "application/json"} - # url = "http://10.85.158.107:8118/generate" - # payload = { - # "prompt_token_ids": batch_ids, - # "max_tokens": 1, - # "top_p": 1, - # "top_k": -1, - # "temperature": 1, - # "prompt_logprobs": prob_nums, - # } - - # for attempt in range(max_retries): - # try: - # response = requests.post(url=url, 
json=payload, headers=headers, timeout=timeout) - # response.raise_for_status() # 检查HTTP错误 - - # buffer = io.BytesIO(response.content) - # data = paddle.load(buffer) - # return data["logits"], data["ids"] - - # except (requests.exceptions.RequestException, IOError) as e: - # if attempt == max_retries - 1: # 最后一次尝试失败 - # raise Exception(f"Failed after {max_retries} attempts. Last error: {str(e)}") - - # print(f"Attempt {attempt + 1} failed. Retrying in {retry_delay} second(s)... Error: {str(e)}") - # time.sleep(retry_delay) + + headers = {"Content_Type": "application/json"} + url = f"http://{INFER_SERVER_IP}:{INFER_SERVER_PORT}/generate" + payload = { + "prompt_token_ids": batch_ids, + "max_tokens": 1, + "top_p": 1, + "top_k": -1, + "temperature": 1, + "prompt_logprobs": prob_nums, + "logprobs": prob_nums, + } + + for attempt in range(max_retries): + try: + response = requests.post(url=url, json=payload, headers=headers, timeout=timeout) + response.raise_for_status() # 检查HTTP错误 + + data = pickle.loads(response.content) + all_token = data.get("logits", []) + all_ids = data.get("ids", []) + all_token = paddle.to_tensor(all_token, dtype="bfloat16") + all_ids = paddle.to_tensor(all_ids, dtype="int64") + return all_token, all_ids + + except (requests.exceptions.RequestException, IOError): + time.sleep(retry_delay) def check_data_split(splits_string, do_train, do_eval, do_predict): @@ -170,6 +176,8 @@ def build_train_valid_test_datasets( *, data_cache_path=None, need_data=True, + self_constraint_cpt=False, + prob_nums=10, ): """Build train, valid, and test datasets.""" @@ -186,12 +194,13 @@ def build_train_valid_test_datasets( share_folder=share_folder, data_cache_path=data_cache_path, need_data=need_data, + self_constraint_cpt=self_constraint_cpt, + prob_nums=prob_nums, ) # Blending dataset. # Parse the values. 
output = get_datasets_weights_and_num_samples(data_prefix, train_val_test_num_samples) - print("TINGLSLS1:", output) prefixes, weights, datasets_train_valid_test_num_samples = output # NOTE: megatron/gpt_dataset.py has been updated. When creating BlendableDataset, we will use the raw train_val_test_num_samples instead of the expanded ones. # Please refer to https://github.com/NVIDIA/NeMo/blob/72f630d087d45655b1a069dc72debf01dfdbdb2d/nemo/collections/nlp/data/language_modeling/megatron/gpt_dataset.py#L74-L80 for more information @@ -214,6 +223,8 @@ def build_train_valid_test_datasets( share_folder=share_folder, data_cache_path=data_cache_path, need_data=need_data, + self_constraint_cpt=self_constraint_cpt, + prob_nums=prob_nums, ) if train_ds: train_datasets.append(train_ds) @@ -258,6 +269,8 @@ def _build_train_valid_test_datasets( *, data_cache_path=None, need_data=True, + self_constraint_cpt=False, + prob_nums=10, ): """Build train, valid, and test datasets.""" CPT = False @@ -305,6 +318,8 @@ def build_dataset(index, name): data_cache_path=data_cache_path, need_data=need_data, CPT=CPT, + self_constraint_cpt=self_constraint_cpt, + prob_nums=prob_nums, ) if need_data: return dataset if splits[index + 1] > splits[index] else None @@ -347,12 +362,16 @@ def __init__( data_cache_path=None, need_data=True, CPT=False, + self_constraint_cpt=False, + prob_nums=10, ): self.name = name self.indexed_dataset = indexed_dataset self.return_doc_ids = return_doc_ids self.CPT = CPT + self.self_constraint_cpt = self_constraint_cpt + self.prob_nums = prob_nums # Build index mappings. 
         if need_data and len(documents) > 0:
@@ -450,35 +469,42 @@ def __getitem__(self, idx):
         sample = np.concatenate(sample_list)
         if append_mask:
             mask = np.concatenate(mask_list)
-        # print(sample)
-        kl_logits = get_logits([sample])
+
+        kl_logits, kl_ids = None, None
+        if self.self_constraint_cpt:
+            kl_logits, kl_ids = get_logits([sample.tolist()], prob_nums=self.prob_nums)
+
+        res = None
         if self.return_doc_ids:  # for retro preprocessing
             if mask is None:
-                return {
+                res = {
                     "text": np.array(sample, dtype=np.int64),
                     "doc_ids": np.array(doc_ids, dtype=np.int64),
-                    "logits": kl_logits,
-                    "CPT": self.CPT,
                 }
             else:
-                return {
+                res = {
                     "text": np.array(sample, dtype=np.int64),
                     "doc_ids": np.array(doc_ids, dtype=np.int64),
                     "mask": np.array(mask, dtype=np.int64),
-                    "logits": kl_logits,
-                    "CPT": self.CPT,
                 }
         else:
             if mask is None:
-                return {"text": np.array(sample, dtype=np.int64), "logits": kl_logits, "CPT": self.CPT}
+                res = {"text": np.array(sample, dtype=np.int64)}
             else:
-                return {
+                res = {
                     "text": np.array(sample, dtype=np.int64),
                     "mask": np.array(mask, dtype=np.int64),
-                    "logits": kl_logits,
-                    "CPT": self.CPT,
                 }
+        if self.self_constraint_cpt:
+            res.update({
+                "logits": kl_logits,
+                "ids": kl_ids,
+                "CPT": self.CPT,
+            })
+
+        return res
+
 
 
 def _build_index_mappings(
     name, data_prefix, documents, sizes, splits_string, num_samples, seq_length, seed, share_folder, *, data_cache_path

From a893234bd1c8a9d58997123c42def99e92bd9f1a Mon Sep 17 00:00:00 2001
From: Ting
Date: Thu, 23 Oct 2025 17:09:25 +0800
Subject: [PATCH 3/3] fix try exception.

---
 paddleformers/data/causal_dataset.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/paddleformers/data/causal_dataset.py b/paddleformers/data/causal_dataset.py
index 7453a4e1446..fc5169877ad 100644
--- a/paddleformers/data/causal_dataset.py
+++ b/paddleformers/data/causal_dataset.py
@@ -73,7 +73,9 @@ def get_logits(batch_ids, max_retries=1, timeout=1200, retry_delay=1, prob_nums=
             all_ids = paddle.to_tensor(all_ids, dtype="int64")
             return all_token, all_ids
 
-        except (requests.exceptions.RequestException, IOError):
+        except (requests.exceptions.RequestException, IOError) as e:
+            if attempt == max_retries - 1:
+                raise Exception(f"Failed after {max_retries} attempts. Last error: {str(e)}") from e
             time.sleep(retry_delay)
 
 