From 15c53ebe903bedec04b3fe4971188e0535a00c96 Mon Sep 17 00:00:00 2001
From: Sahan Paliskara
Date: Mon, 23 Jun 2025 16:45:07 -0400
Subject: [PATCH 1/3] Add deduplication logic

---
 README.md     |  15 ++
 dedup.py      | 398 +++++++++++++++++++++++++++++++++++++++++++++++++
 export.py     |  28 +++-
 test_dedup.py | 402 ++++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 842 insertions(+), 1 deletion(-)
 create mode 100644 dedup.py
 create mode 100644 test_dedup.py

diff --git a/README.md b/README.md
index 54fe6b8..0113e64 100644
--- a/README.md
+++ b/README.md
@@ -34,3 +34,18 @@ python export.py
 The script will create a directory at the specified output path containing the dataset in Parquet format. If `--output_dir` is not provided, it will save to `dataset` in the current working directory.
+## Tests
+The deduplication scripts can be tested by running:
+```bash
+python test_dedup.py
+# or, if you have pytest installed, run
+python -m pytest test_dedup.py -v
+```
+Rather than downloading real data, the tests build a fake dataset with known duplicates.
+The test creates a 50-entry dataset with:
+- **Exact duplicates**: First 5 entries use identical code
+- **Fuzzy duplicates**: Next 5 entries use similar code with small variations
+- **Multiple run modes**: `leaderboard`, `test`, `benchmark`
+- **Mixed success states**: Both `True` and `False` values for `run_passed`
+- **Realistic struct data**: Complex nested structures for `run_result`, `run_compilation`, `run_meta`, and `run_system_info`
+- **Proper timestamps**: All timestamp fields include timezone information
diff --git a/dedup.py b/dedup.py
new file mode 100644
index 0000000..b316e38
--- /dev/null
+++ b/dedup.py
@@ -0,0 +1,398 @@
+# script to dedup a huggingface dataset
+
+from datasets import load_dataset
+import tqdm
+from collections import defaultdict
+import hashlib
+from typing import Dict, List, Tuple, Union
+
+import datasketch
+import pandas as pd
+
+def remove_duplicates(data_dict: Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]]):
+    """
+    Remove exact duplicates from the nested data structure returned by get_sorted_hf_data. 
+ + Args: + data_dict: Nested dictionary structure from get_sorted_hf_data + + Returns: + Dictionary with same structure but duplicates removed + """ + deduplicated_dict = {} + + for run_mode, score_duration_dict in tqdm.tqdm(data_dict.items(), desc="Processing run modes"): + deduplicated_dict[run_mode] = {} + + for run_success, run_success_dict in tqdm.tqdm(score_duration_dict.items(), desc=f"Processing {run_mode}", leave=False): + deduplicated_dict[run_mode][run_success] = {} + for score_duration, rows in tqdm.tqdm(run_success_dict.items(), desc=f"Processing {run_mode}", leave=False): + # Use a dictionary to track unique entries by their content hash + unique_entries = {} + + for row in tqdm.tqdm(rows, desc=f"Processing {run_mode} {score_duration}", leave=False): + # Create a hash of the relevant content (assuming 'input' or similar field exists) + # If the row has an 'input' field, use that; otherwise use the entire row + content = row.get('code', "") + content_hash = hashlib.sha256(content.encode()).hexdigest() + + if content_hash not in unique_entries: + unique_entries[content_hash] = row + else: + # If duplicate found, keep the one with better metrics + existing_row = unique_entries[content_hash] + + # For leaderboard mode with successful runs, prefer higher scores + if run_mode == 'leaderboard' and row.get('run_passed') == True: + if row.get('run_score', 0) > existing_row.get('run_score', 0): + unique_entries[content_hash] = row + # For other cases, prefer shorter duration (faster execution) + else: + existing_duration = existing_row.get('run_meta', {}).get('duration', float('inf')) + current_duration = row.get('run_meta', {}).get('duration', float('inf')) + if current_duration < existing_duration: + unique_entries[content_hash] = row + + deduplicated_dict[run_mode][run_success][score_duration] = list(unique_entries.values()) + + return deduplicated_dict + + +def create_minhashes( + documents: List[Dict[str, str]], + ngram_size: int = 5, + bands: int = 20, + rows_per_band: int = 128, +) -> Tuple[Dict[str, datasketch.MinHash], int]: + """ + Create MinHash signatures for a list of documents with LSH bands configuration. 
+ + Args: + documents: List of dictionaries, each containing 'submission_id' and 'input' keys + num_permutations: Number of hash functions to use (default: 100) + ngram_size: Size of n-grams to generate from input text (default: 3) + bands: Number of bands for LSH (default: 20) + + Returns: + Tuple containing: + - Dictionary mapping document submission_ids to their MinHash signatures + - Rows per band (num_permutations / bands) + + Raises: + ValueError: If num_permutations is not divisible by bands + """ + + num_permutations = rows_per_band * bands + + def generate_ngrams(text: str, n: int) -> List[str]: + """Generate n-grams from input text.""" + return [text[i : i + n] for i in range(len(text) - n + 1)] + + # Initialize result dictionary + minhash_dict = {} + # Process each document + for doc in tqdm.tqdm(documents, desc="Creating minhashes"): + minhash = datasketch.MinHash(num_perm=num_permutations) + submission_id = doc["submission_id"] + text = doc["code"].lower() # Convert to lowercase for consistency + + # Generate n-grams + ngrams = generate_ngrams(text, ngram_size) + for ngram in ngrams: + minhash.update(ngram.encode("utf8")) + + minhash_dict[submission_id] = minhash + + return minhash_dict + + +# 16 bands with 128 rows +def create_similarity_matrix( + minhashes: Dict[str, datasketch.MinHash], + rows_per_band: int, + num_bands: int, + threshold: float, +) -> Dict[str, List[str]]: + lsh = datasketch.MinHashLSH(threshold=threshold, num_perm=num_bands * rows_per_band) + print(f"num_perm: {num_bands*rows_per_band}") + similarity_matrix = {} + for submission_id, minhash in tqdm.tqdm(minhashes.items(), desc="Inserting minhashes into LSH"): + lsh.insert(submission_id, minhash) + for submission_id, minhash in tqdm.tqdm(minhashes.items(), desc="Querying LSH"): + similar_submission_ids = lsh.query(minhash) + similarity_matrix[submission_id] = similar_submission_ids + for submission_id, similar_submission_ids in tqdm.tqdm( + similarity_matrix.items(), desc="Removing self-similarities" + ): + if submission_id in similar_submission_ids: + similar_submission_ids.remove(submission_id) + return similarity_matrix + + +def filter_matrix( + similarity_matrix: Dict[str, List[str]] +) -> set: + good_submission_ids = set() + processed = set() + + for submission_id, similar_submission_ids in similarity_matrix.items(): + if submission_id in processed: + continue + + # Find all submissions in the similarity cluster + cluster = {submission_id} + cluster.update(similar_submission_ids) + + # Keep the one with the largest ID (tiebreaker) + keeper = max(cluster) + good_submission_ids.add(keeper) + + # Mark all in cluster as processed + processed.update(cluster) + + return good_submission_ids + + +def fuzzy_filter( + data_dict: Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]], + threshold: float = 0.7, + ngram_size: int = 5, + bands: int = 16, + rows_per_band: int = 128, +) -> Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]]: + + total_categories = 0 + for run_mode, run_success_dict in data_dict.items(): + for run_success, score_duration_dict in run_success_dict.items(): + for score_duration, rows in score_duration_dict.items(): + total_categories += 1 + + deduped_data = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) + current_category = 0 + for run_mode, run_success_dict in data_dict.items(): + for run_success, score_duration_dict in run_success_dict.items(): + for score_duration, rows in score_duration_dict.items(): + print(f"Processing {run_mode} {run_success} 
{score_duration} {len(rows)}") + print(f"This is {current_category} of {total_categories}") + current_category += 1 + deduped_data[run_mode][run_success][score_duration] = _fuzzy_filter(rows, threshold, ngram_size, bands, rows_per_band) + + return deduped_data + +def _fuzzy_filter( + data_list: List[Dict], + threshold: float = 0.7, + ngram_size: int = 5, + bands: int = 16, + rows_per_band: int = 128, +) -> List[Dict]: + """ + Apply fuzzy deduplication to the nested data structure returned by get_sorted_hf_data. + + Args: + data_dict: Nested dictionary structure from get_sorted_hf_data + threshold: Similarity threshold for LSH + ngram_size: Size of n-grams for MinHash + bands: Number of bands for LSH + rows_per_band: Rows per band for LSH + create_histogram: Whether to create similarity histogram + + Returns: + Dictionary with same structure but fuzzy duplicates removed + """ + # Flatten the data for processing + + # Create documents for MinHash processing + + if len(data_list) <= 1: + return data_list + + all_documents = [] + for i, row in tqdm.tqdm(enumerate(data_list), desc="Creating documents for MinHash"): + # Use 'input' field if available, otherwise use a string representation + content = row.get('code', str(row)) + document = { + "submission_id": str(i), + "code": content, + "original_row": row + } + all_documents.append(document) + + # Apply fuzzy deduplication + minhashes = create_minhashes( + all_documents, ngram_size=ngram_size, bands=bands, rows_per_band=rows_per_band + ) + similarity_matrix = create_similarity_matrix( + minhashes, rows_per_band=rows_per_band, num_bands=bands, threshold=threshold + ) + + good_submission_ids = filter_matrix(similarity_matrix) + + # Keep only the documents that passed the filter + good_documents = [all_documents[int(submission_id)]["original_row"] for submission_id in good_submission_ids] + + # Reconstruct the nested structure + return good_documents + +def get_hf_data() -> Dict[str, Dict[Union[float, int], List[Dict]]]: + # Login using e.g. 
`huggingface-cli login` to access this dataset + ds = load_dataset("GPUMODE/kernelbot-data", "submissions") + + # we should divide things up into type + # run_mode + # run_sucess + # if run_mode is leaderboard then use score + # otherwise use run_meta[duration] + + + data = ds['train'] + + run_mode_dict = defaultdict(list) + run_success_dict = defaultdict(lambda: defaultdict(list)) + run_duration_dict = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) + + for _, row in tqdm.tqdm(enumerate(data), desc="Processing dataset rows"): + run_mode = row['run_mode'] + run_mode_dict[run_mode].append(row) + + for run_mode, rows in tqdm.tqdm(run_mode_dict.items(), desc="Processing run modes"): + for row in tqdm.tqdm(rows, desc=f"Processing {run_mode} success/failure", leave=False): + run_success_dict[run_mode][row['run_passed']].append(row) + + for run_mode, mode_dict in tqdm.tqdm(run_success_dict.items(), desc="Processing success/failure groups"): + for run_success, rows in tqdm.tqdm(mode_dict.items(), desc=f"Processing {run_mode}", leave=False): + for row in tqdm.tqdm(rows, desc=f"Processing {run_mode} {run_success} rows", leave=False): + if run_mode == 'leaderboard' and run_success == True: + rounded_score = round(float(row['run_score']), 4) + run_duration_dict[run_mode][run_success][rounded_score].append(row) + else: + rounded_duration = round(float(row['run_meta']['duration']), 0) + run_duration_dict[run_mode][run_success][rounded_duration].append(row) + + return run_duration_dict + +def convert_df_to_dict(df: pd.DataFrame) -> Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]]: + """ + Convert a pandas DataFrame to a nested dictionary structure. + + Args: + df: pandas DataFrame + + Returns: + Nested dictionary structure + """ + data_dict = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) + for _, row in tqdm.tqdm(df.iterrows(), desc="Processing DataFrame rows"): + run_mode = row['run_mode'] + run_success = row['run_passed'] + score_duration = row['run_meta']['duration'] + data_dict[run_mode][run_success][score_duration].append(row) + return data_dict + +def flatten_data(data_dict: Dict[str, Dict[Union[float, int], List[Dict]]]) -> List[Dict]: + """ + Flatten the nested data structure to a list of documents with metadata. + + Args: + data_dict: Nested dictionary structure from get_sorted_hf_data + + Returns: + List of documents with additional metadata fields + """ + flattened = [] + for run_mode, run_success_dict in tqdm.tqdm(data_dict.items(), desc="Flattening data"): + for run_success, score_duration_dict in run_success_dict.items(): + for score_duration, rows in score_duration_dict.items(): + for row in tqdm.tqdm(rows, desc=f"Processing {run_mode} {score_duration}", leave=False): + # Add metadata to each row + row_with_metadata = row.copy() + row_with_metadata['_run_mode'] = run_mode + row_with_metadata['_run_success'] = run_success + row_with_metadata['_score_duration'] = score_duration + flattened.append(row_with_metadata) + return flattened + +def count_items(data_dict: Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]]) -> int: + """ + Count total number of items in the nested data structure. 
+ + Args: + data_dict: Nested dictionary structure from get_sorted_hf_data + + Returns: + Total number of items + """ + total = 0 + for run_mode in data_dict.values(): + for run_success_dict in run_mode.values(): + for rows in run_success_dict.values(): + total += len(rows) + return total + + +def example_usage(): + """ + Example of how to use the deduplication functions with get_hf_data output. + """ + # Load the data + data = get_hf_data() + + print(f"Original data has {count_items(data)} total items") + + # Remove exact duplicates + deduplicated_data = remove_duplicates(data) + print(f"After exact deduplication: {count_items(deduplicated_data)} items") + + # Apply fuzzy deduplication + fuzzy_deduplicated_data = fuzzy_filter( + deduplicated_data, + threshold=0.8, # High threshold for more strict deduplication + ngram_size=5, + bands=16, + rows_per_band=128 + ) + # convert to df + flattened_data = flatten_data(fuzzy_deduplicated_data) + df = pd.DataFrame(flattened_data) + + return df + +def dedup_df(df: pd.DataFrame) -> pd.DataFrame: + """ + Deduplicate a pandas DataFrame. + + Args: + df: pandas DataFrame + """ + # convert to dict + data_dict = convert_df_to_dict(df) + # deduplicate + deduplicated_data = fuzzy_filter(data_dict, threshold=0.8, ngram_size=5, bands=16, rows_per_band=128) + # convert to df + flattened_data = flatten_data(deduplicated_data) + df = pd.DataFrame(flattened_data) + return df + +def create_parquet_file(data_dict: Dict[str, Dict[Union[float, int], List[Dict]]], filename: str): + """ + Create a Parquet file from the nested data structure. + + Args: + data_dict: Nested dictionary structure from get_sorted_hf_data + filename: Name of the output Parquet file + """ + # Flatten the data + flattened_data = flatten_data(data_dict) + + # Create a pandas DataFrame from the flattened data + df = pd.DataFrame(flattened_data) + # Convert the DataFrame to a Parquet file + df.to_parquet(filename, index=False) + + + +def main(): + example_usage() + +if __name__ == "__main__": + main() diff --git a/export.py b/export.py index b4e3cdb..8c534ae 100644 --- a/export.py +++ b/export.py @@ -5,6 +5,7 @@ from datasets import Dataset from dotenv import load_dotenv from sqlalchemy import create_engine, text +from dedup import dedup_df load_dotenv() @@ -199,11 +200,27 @@ def main(output_dir): submissions_dataset.to_parquet(submissions_output_path) print(f"Submissions dataset successfully saved to {submissions_output_path}") + # Deduplicate submissions + print("Applying deduplication to submissions...") + try: + deduplicated_submissions_df = dedup_df(submissions_df.copy()) + deduplicated_submissions_path = os.path.join(output_dir, "deduplicated_submissions.parquet") + + # Convert to dataset and save + deduplicated_submissions_dataset = Dataset.from_pandas(deduplicated_submissions_df) + deduplicated_submissions_dataset.to_parquet(deduplicated_submissions_path) + print(f"Deduplicated submissions dataset successfully saved to {deduplicated_submissions_path}") + print(f"Original submissions: {len(submissions_df)}, After deduplication: {len(deduplicated_submissions_df)}") + + except Exception as e: + print(f"Warning: Deduplication failed with error: {e}") + print("Proceeding without deduplication...") + deduplicated_submissions_df = submissions_df.copy() + # Filter for and save successful submissions from the anonymized data if 'run_passed' in submissions_df.columns: print("Creating successful submissions dataset...") successful_submissions_df = submissions_df[submissions_df['run_passed'] == 
True].copy() - # Convert to dataset and save successful_submissions_dataset = Dataset.from_pandas(successful_submissions_df) successful_output_path = os.path.join( @@ -215,6 +232,15 @@ def main(output_dir): f"{successful_output_path}" ) + # Create deduplicated successful submissions + print("Creating deduplicated successful submissions dataset...") + deduplicated_successful_submissions_df = deduplicated_submissions_df[deduplicated_submissions_df['run_passed'] == True].copy() + deduplicated_successful_submissions_dataset = Dataset.from_pandas(deduplicated_successful_submissions_df) + deduplicated_successful_submissions_path = os.path.join(output_dir, "deduplicated_successful_submissions.parquet") + deduplicated_successful_submissions_dataset.to_parquet(deduplicated_successful_submissions_path) + print(f"Deduplicated successful submissions dataset successfully saved to {deduplicated_successful_submissions_path}") + print(f"Original successful submissions: {len(successful_submissions_df)}, After deduplication: {len(deduplicated_successful_submissions_df)}") + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Export leaderboard data to a Hugging Face dataset.") diff --git a/test_dedup.py b/test_dedup.py new file mode 100644 index 0000000..f5ea811 --- /dev/null +++ b/test_dedup.py @@ -0,0 +1,402 @@ +#!/usr/bin/env python3 +""" +Unit tests for the deduplication pipeline. +Tests the end-to-end flow with fake data matching the database schema. +""" + +import unittest +import pandas as pd +import numpy as np +from datetime import datetime, timezone +import random +from typing import Dict, List, Any +import tempfile +import os +import sys + +# Import the functions we want to test +try: + from dedup import ( + remove_duplicates, + fuzzy_filter, + convert_df_to_dict, + flatten_data, + dedup_df, + count_items, + create_parquet_file + ) +except ImportError as e: + print(f"Import error: {e}") + print("Some functions may not be available for testing") + + +class TestDedupEndToEnd(unittest.TestCase): + + def setUp(self): + """Set up test fixtures with fake data matching the schema.""" + random.seed(42) # For reproducible tests + np.random.seed(42) + self.fake_data = self.create_fake_dataset(50) + self.df = pd.DataFrame(self.fake_data) + + def create_fake_dataset(self, num_entries: int) -> List[Dict[str, Any]]: + """Create a fake dataset with the required schema fields.""" + fake_data = [] + + # Sample code snippets (some duplicates for testing) + code_samples = [ + "def hello_world():\n print('Hello World')", + "import numpy as np\nx = np.array([1, 2, 3])", + "for i in range(10):\n print(i)", + "class MyClass:\n def __init__(self):\n pass", + "def fibonacci(n):\n if n <= 1:\n return n\n return fibonacci(n-1) + fibonacci(n-2)", + "import pandas as pd\ndf = pd.DataFrame({'a': [1, 2, 3]})", + "def quicksort(arr):\n if len(arr) <= 1:\n return arr", + "x = [1, 2, 3, 4, 5]\ny = [i**2 for i in x]", + "try:\n result = 10 / 0\nexcept ZeroDivisionError:\n print('Error')", + "def hello_world():\n print('Hello World')", # Exact duplicate + ] + + run_modes = ['leaderboard', 'benchmark', 'test'] + file_names = ['solution.py', 'main.py', 'algorithm.py', 'test.py'] + + for i in range(num_entries): + # Create base timestamp + base_time = datetime(2024, 1, 1, tzinfo=timezone.utc) + submission_time = base_time.replace( + day=random.randint(1, 28), + hour=random.randint(0, 23), + minute=random.randint(0, 59) + ) + + # Select code (with some duplicates) + code = random.choice(code_samples) + if i < 5: # 
First 5 entries use the same code for exact duplicate testing + code = code_samples[0] + elif i < 10: # Next 5 use slightly modified versions for fuzzy testing + code = code_samples[0] + f"\n# Comment {i}" + + run_mode = random.choice(run_modes) + run_passed = random.choice([True, False]) + + # Generate run score based on mode and success + if run_mode == 'leaderboard' and run_passed: + run_score = round(random.uniform(0.1, 1.0), 4) + else: + run_score = 0.0 if not run_passed else round(random.uniform(0.1, 0.8), 4) + + # Create the entry matching the database schema + entry = { + 'submission_id': i + 1000, + 'leaderboard_id': random.randint(1, 10), + 'user_id': random.randint(100, 999), + 'submission_time': submission_time, + 'file_name': random.choice(file_names), + 'code': code, + 'code_id': i + 2000, + 'run_id': i + 3000, + 'run_start_time': submission_time, + 'run_end_time': submission_time.replace( + second=random.randint(1, 59) + ), + 'run_mode': run_mode, + 'run_score': run_score, + 'run_passed': run_passed, + 'run_result': { + 'benchmark-count': random.randint(1, 10), + 'benchmark.0.best': f'benchmark_{random.randint(1, 100)}.txt', + 'benchmark.0.err': '', + 'benchmark.0.mean': round(random.uniform(0.1, 2.0), 6), + 'benchmark.0.report': f'report_{i}.json' + }, + 'run_compilation': { + 'command': 'python', + 'exit_code': 0 if run_passed else random.randint(1, 255), + 'nvcc_found': random.choice([True, False]), + 'nvcc_version': f'11.{random.randint(0, 8)}', + 'stderr': '' if run_passed else f'Error message {i}', + 'stdout': f'Output {i}', + 'success': run_passed + }, + 'run_meta': { + 'command': 'python solution.py', + 'duration': round(random.uniform(0.1, 10.0), 3), + 'exit_code': 0 if run_passed else random.randint(1, 255), + 'stderr': '' if run_passed else f'Runtime error {i}', + 'stdout': f'Runtime output {i}', + 'success': run_passed + }, + 'run_system_info': { + 'cpu': f'Intel Core i{random.randint(5, 9)}', + 'gpu': random.choice(['NVIDIA RTX 3080', 'NVIDIA RTX 4090', 'None']), + 'platform': random.choice(['linux', 'darwin', 'win32']), + 'torch': f'2.{random.randint(0, 3)}.{random.randint(0, 9)}' + } + } + fake_data.append(entry) + + return fake_data + + def test_dataframe_creation(self): + """Test that the fake dataset creates a valid DataFrame.""" + self.assertEqual(len(self.df), 50) + + # Check required columns exist (matching the schema in the image) + required_columns = [ + 'submission_id', 'leaderboard_id', 'user_id', 'submission_time', + 'file_name', 'code', 'code_id', 'run_id', 'run_start_time', + 'run_end_time', 'run_mode', 'run_score', 'run_passed', + 'run_result', 'run_compilation', 'run_meta', 'run_system_info' + ] + + for col in required_columns: + self.assertIn(col, self.df.columns, f"Missing required column: {col}") + + # Check data types + self.assertTrue(self.df['submission_id'].dtype in ['int64', 'int32']) + self.assertTrue(self.df['run_passed'].dtype == 'bool') + self.assertTrue(self.df['run_score'].dtype in ['float64', 'float32']) + + # Verify struct fields exist + sample_row = self.df.iloc[0] + self.assertIsInstance(sample_row['run_result'], dict) + self.assertIsInstance(sample_row['run_compilation'], dict) + self.assertIsInstance(sample_row['run_meta'], dict) + self.assertIsInstance(sample_row['run_system_info'], dict) + + def test_convert_df_to_dict(self): + """Test conversion from DataFrame to nested dictionary structure.""" + try: + data_dict = convert_df_to_dict(self.df) + + # Check structure + self.assertIsInstance(data_dict, dict) + + # Should have 
run_mode keys + run_modes = set(self.df['run_mode'].unique()) + self.assertEqual(set(data_dict.keys()), run_modes) + + # Check nested structure + for run_mode in data_dict: + self.assertIsInstance(data_dict[run_mode], dict) + for run_success in data_dict[run_mode]: + self.assertIsInstance(data_dict[run_mode][run_success], dict) + for score_duration in data_dict[run_mode][run_success]: + self.assertIsInstance( + data_dict[run_mode][run_success][score_duration], + list + ) + except NameError: + self.skipTest("convert_df_to_dict function not available") + + def test_exact_deduplication(self): + """Test exact duplicate removal.""" + try: + data_dict = convert_df_to_dict(self.df) + original_count = count_items(data_dict) + + deduplicated_data = remove_duplicates(data_dict) + deduplicated_count = count_items(deduplicated_data) + + # Should have fewer or equal items after deduplication + self.assertLessEqual(deduplicated_count, original_count) + + # Structure should be preserved + self.assertEqual(set(data_dict.keys()), set(deduplicated_data.keys())) + + except NameError as e: + self.skipTest(f"Required functions not available: {e}") + + def test_fuzzy_deduplication_small(self): + """Test fuzzy duplicate removal with small threshold for faster testing.""" + try: + data_dict = convert_df_to_dict(self.df) + original_count = count_items(data_dict) + + # Use small parameters for faster testing + fuzzy_deduplicated_data = fuzzy_filter( + data_dict, + threshold=0.5, # Lower threshold for faster testing + ngram_size=3, # Smaller ngram size + bands=4, # Fewer bands + rows_per_band=32 # Fewer rows per band + ) + + fuzzy_count = count_items(fuzzy_deduplicated_data) + + # Should have fewer or equal items after fuzzy deduplication + self.assertLessEqual(fuzzy_count, original_count) + + # Structure should be preserved + self.assertEqual(set(data_dict.keys()), set(fuzzy_deduplicated_data.keys())) + + except NameError as e: + self.skipTest(f"Required functions not available: {e}") + + def test_flatten_and_reconstruct(self): + """Test flattening and reconstruction of data.""" + try: + data_dict = convert_df_to_dict(self.df) + original_count = count_items(data_dict) + + # Flatten + flattened_data = flatten_data(data_dict) + self.assertEqual(len(flattened_data), original_count) + + # Check metadata fields were added + if flattened_data: + sample_row = flattened_data[0] + self.assertIn('_run_mode', sample_row) + self.assertIn('_run_success', sample_row) + self.assertIn('_score_duration', sample_row) + + except NameError as e: + self.skipTest(f"Required functions not available: {e}") + + def test_dedup_df_end_to_end(self): + """Test the complete deduplication pipeline.""" + try: + original_length = len(self.df) + + # Run the complete deduplication pipeline + deduplicated_df = dedup_df(self.df) + + # Should return a DataFrame + self.assertIsInstance(deduplicated_df, pd.DataFrame) + + # Should have fewer or equal rows + self.assertLessEqual(len(deduplicated_df), original_length) + + # Should preserve required columns + required_columns = ['submission_id', 'code', 'run_mode', 'run_passed'] + for col in required_columns: + self.assertIn(col, deduplicated_df.columns) + + # Check data integrity + self.assertFalse(deduplicated_df.empty, "Deduplicated DataFrame should not be empty") + + except NameError as e: + self.skipTest(f"dedup_df function not available: {e}") + + def test_parquet_creation(self): + """Test Parquet file creation.""" + try: + data_dict = convert_df_to_dict(self.df) + + with 
tempfile.NamedTemporaryFile(suffix='.parquet', delete=False) as tmp_file: + try: + create_parquet_file(data_dict, tmp_file.name) + + # Check file was created + self.assertTrue(os.path.exists(tmp_file.name)) + + # Check file is not empty + self.assertGreater(os.path.getsize(tmp_file.name), 0) + + # Try to read the file back + df_from_parquet = pd.read_parquet(tmp_file.name) + self.assertIsInstance(df_from_parquet, pd.DataFrame) + self.assertGreater(len(df_from_parquet), 0) + + finally: + # Clean up + if os.path.exists(tmp_file.name): + os.unlink(tmp_file.name) + + except NameError as e: + self.skipTest(f"Required functions not available: {e}") + + def test_data_consistency_after_deduplication(self): + """Test that data remains consistent after deduplication.""" + try: + # Create dataset with known duplicates + duplicate_data = [] + + # Add the same code 3 times with different metadata + base_entry = self.fake_data[0].copy() + for i in range(3): + entry = base_entry.copy() + entry['submission_id'] = 9000 + i + entry['run_id'] = 9100 + i + duplicate_data.append(entry) + + # Add to main dataset + test_data = self.fake_data + duplicate_data + test_df = pd.DataFrame(test_data) + + original_length = len(test_df) + deduplicated_df = dedup_df(test_df) + + # Should have removed at least 2 duplicates + self.assertLess(len(deduplicated_df), original_length) + + # Check that essential fields are preserved + self.assertTrue(all(col in deduplicated_df.columns for col in + ['submission_id', 'code', 'run_mode', 'run_passed'])) + + except NameError as e: + self.skipTest(f"Required functions not available: {e}") + + def test_schema_compliance(self): + """Test that the fake dataset matches the expected schema from the database.""" + # Test all required fields exist and have correct types + + # Test BIGINT fields + bigint_fields = ['submission_id', 'leaderboard_id', 'user_id', 'code_id', 'run_id'] + for field in bigint_fields: + self.assertTrue(self.df[field].dtype in ['int64', 'int32'], + f"{field} should be integer type") + + # Test VARCHAR fields + varchar_fields = ['file_name', 'code', 'run_mode'] + for field in varchar_fields: + self.assertTrue(self.df[field].dtype == 'object', + f"{field} should be string/object type") + + # Test TIMESTAMP fields + timestamp_fields = ['submission_time', 'run_start_time', 'run_end_time'] + for field in timestamp_fields: + # Check that all values are datetime objects with timezone + sample_value = self.df[field].iloc[0] + self.assertIsInstance(sample_value, datetime) + self.assertIsNotNone(sample_value.tzinfo) + + # Test DOUBLE field + self.assertTrue(self.df['run_score'].dtype in ['float64', 'float32']) + + # Test BOOLEAN field + self.assertTrue(self.df['run_passed'].dtype == 'bool') + + # Test STRUCT fields + struct_fields = ['run_result', 'run_compilation', 'run_meta', 'run_system_info'] + for field in struct_fields: + # All values should be dictionaries + self.assertTrue(all(isinstance(val, dict) for val in self.df[field])) + + def test_duplicate_detection(self): + """Test that we can detect exact and near duplicates in the dataset.""" + # Count exact duplicates by code + code_counts = self.df['code'].value_counts() + exact_duplicates = code_counts[code_counts > 1] + + # Should have some exact duplicates (first 5 entries) + self.assertGreater(len(exact_duplicates), 0, "Should have exact duplicates for testing") + + # Check that fuzzy duplicates exist (entries with similar code) + similar_code_count = 0 + base_code = "def hello_world():\n print('Hello World')" + for 
code in self.df['code']: + if base_code in code and code != base_code: + similar_code_count += 1 + + self.assertGreater(similar_code_count, 0, "Should have fuzzy duplicates for testing") + + +if __name__ == '__main__': + # Add some helpful output + print("Running deduplication pipeline tests...") + print(f"Python version: {sys.version}") + print(f"Pandas version: {pd.__version__}") + + # Run the tests + unittest.main(verbosity=2) \ No newline at end of file From 2ad5e808d56243da49aa25a44dbac861628263d7 Mon Sep 17 00:00:00 2001 From: Sahan Paliskara Date: Mon, 23 Jun 2025 17:10:24 -0400 Subject: [PATCH 2/3] magic number removal --- dedup.py | 118 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 100 insertions(+), 18 deletions(-) diff --git a/dedup.py b/dedup.py index b316e38..8f209a6 100644 --- a/dedup.py +++ b/dedup.py @@ -9,6 +9,82 @@ import datasketch import pandas as pd +# ============================================================================= +# DEDUPLICATION CONFIGURATION CONSTANTS +# ============================================================================= + +# Fuzzy Deduplication Parameters +FUZZY_SIMILARITY_THRESHOLD = 0.8 +""" +Jaccard similarity threshold for considering two documents as duplicates. +Range: 0.0 to 1.0 +- 0.8 = High threshold, only very similar documents are considered duplicates +- 0.7 = Medium threshold, moderately similar documents are duplicates +- 0.5 = Low threshold, loosely similar documents are duplicates +Higher values = more strict deduplication, fewer items removed +""" + +NGRAM_SIZE = 5 +""" +Size of character n-grams used for MinHash fingerprinting. +- Smaller values (3-4): More sensitive to small changes, better for short text +- Larger values (5-7): Less sensitive to minor variations, better for longer text +- Too small: May create false positives (different texts seem similar) +- Too large: May miss actual duplicates with small variations +""" + +LSH_BANDS = 16 +""" +Number of bands for Locality Sensitive Hashing (LSH). +Used to speed up similarity detection by grouping similar hashes. +- More bands = faster but less accurate similarity detection +- Fewer bands = slower but more accurate similarity detection +Must divide evenly into ROWS_PER_BAND * LSH_BANDS = total permutations +""" + +ROWS_PER_BAND = 128 +""" +Number of rows per band in LSH configuration. +Total MinHash permutations = ROWS_PER_BAND * LSH_BANDS +- More rows per band = higher precision, may miss some similar pairs +- Fewer rows per band = higher recall, may include more false positives +Default: 128 rows × 16 bands = 2048 total permutations +""" + +# Score Processing Parameters +LEADERBOARD_SCORE_PRECISION = 4 +""" +Number of decimal places to round leaderboard scores when grouping submissions. +Used to group submissions with very similar scores together. +- Higher precision (more decimal places): More granular grouping +- Lower precision (fewer decimal places): Broader grouping of similar scores +""" + +DURATION_PRECISION = 0 +""" +Number of decimal places to round execution duration (in seconds). +Used to group submissions with similar execution times. 
+- 0: Round to nearest second (1.7s → 2s) +- 1: Round to nearest 0.1s (1.73s → 1.7s) +""" + +# ============================================================================= +# CONFIGURATION SUMMARY +# ============================================================================= +""" +Current deduplication configuration: +├─ Similarity Detection: 0.8 threshold (strict) +├─ Text Fingerprinting: 5-character n-grams +├─ LSH Performance: 16 bands × 128 rows = 2048 permutations +├─ Score Grouping: 4 decimal places for leaderboard scores +└─ Duration Grouping: 0 decimal places for execution times + +To adjust deduplication sensitivity: +- Increase FUZZY_SIMILARITY_THRESHOLD (0.8→0.9) for stricter deduplication +- Decrease FUZZY_SIMILARITY_THRESHOLD (0.8→0.7) for more aggressive deduplication +- Adjust NGRAM_SIZE for different text lengths (3-4 for short, 5-7 for long) +""" + def remove_duplicates(data_dict: Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]]): """ Remove exact duplicates from the nested data structure returned by get_sorted_hf_data. @@ -60,9 +136,9 @@ def remove_duplicates(data_dict: Dict[str, Dict[bool, Dict[Union[float, int], Li def create_minhashes( documents: List[Dict[str, str]], - ngram_size: int = 5, - bands: int = 20, - rows_per_band: int = 128, + ngram_size: int = NGRAM_SIZE, + bands: int = LSH_BANDS, + rows_per_band: int = ROWS_PER_BAND, ) -> Tuple[Dict[str, datasketch.MinHash], int]: """ Create MinHash signatures for a list of documents with LSH bands configuration. @@ -155,10 +231,10 @@ def filter_matrix( def fuzzy_filter( data_dict: Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]], - threshold: float = 0.7, - ngram_size: int = 5, - bands: int = 16, - rows_per_band: int = 128, + threshold: float = FUZZY_SIMILARITY_THRESHOLD, + ngram_size: int = NGRAM_SIZE, + bands: int = LSH_BANDS, + rows_per_band: int = ROWS_PER_BAND, ) -> Dict[str, Dict[bool, Dict[Union[float, int], List[Dict]]]]: total_categories = 0 @@ -181,10 +257,10 @@ def fuzzy_filter( def _fuzzy_filter( data_list: List[Dict], - threshold: float = 0.7, - ngram_size: int = 5, - bands: int = 16, - rows_per_band: int = 128, + threshold: float = FUZZY_SIMILARITY_THRESHOLD, + ngram_size: int = NGRAM_SIZE, + bands: int = LSH_BANDS, + rows_per_band: int = ROWS_PER_BAND, ) -> List[Dict]: """ Apply fuzzy deduplication to the nested data structure returned by get_sorted_hf_data. 
@@ -263,10 +339,10 @@ def get_hf_data() -> Dict[str, Dict[Union[float, int], List[Dict]]]: for run_success, rows in tqdm.tqdm(mode_dict.items(), desc=f"Processing {run_mode}", leave=False): for row in tqdm.tqdm(rows, desc=f"Processing {run_mode} {run_success} rows", leave=False): if run_mode == 'leaderboard' and run_success == True: - rounded_score = round(float(row['run_score']), 4) + rounded_score = round(float(row['run_score']), LEADERBOARD_SCORE_PRECISION) run_duration_dict[run_mode][run_success][rounded_score].append(row) else: - rounded_duration = round(float(row['run_meta']['duration']), 0) + rounded_duration = round(float(row['run_meta']['duration']), DURATION_PRECISION) run_duration_dict[run_mode][run_success][rounded_duration].append(row) return run_duration_dict @@ -346,10 +422,10 @@ def example_usage(): # Apply fuzzy deduplication fuzzy_deduplicated_data = fuzzy_filter( deduplicated_data, - threshold=0.8, # High threshold for more strict deduplication - ngram_size=5, - bands=16, - rows_per_band=128 + threshold=FUZZY_SIMILARITY_THRESHOLD, + ngram_size=NGRAM_SIZE, + bands=LSH_BANDS, + rows_per_band=ROWS_PER_BAND ) # convert to df flattened_data = flatten_data(fuzzy_deduplicated_data) @@ -367,7 +443,13 @@ def dedup_df(df: pd.DataFrame) -> pd.DataFrame: # convert to dict data_dict = convert_df_to_dict(df) # deduplicate - deduplicated_data = fuzzy_filter(data_dict, threshold=0.8, ngram_size=5, bands=16, rows_per_band=128) + deduplicated_data = fuzzy_filter( + data_dict, + threshold=FUZZY_SIMILARITY_THRESHOLD, + ngram_size=NGRAM_SIZE, + bands=LSH_BANDS, + rows_per_band=ROWS_PER_BAND + ) # convert to df flattened_data = flatten_data(deduplicated_data) df = pd.DataFrame(flattened_data) From a30f85d0353b6347aef785e827edc7f25f97bae9 Mon Sep 17 00:00:00 2001 From: Sahan Paliskara Date: Tue, 24 Jun 2025 08:43:42 -0400 Subject: [PATCH 3/3] Add deduplicated datasets --- dedup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dedup.py b/dedup.py index 8f209a6..f6f3449 100644 --- a/dedup.py +++ b/dedup.py @@ -118,9 +118,9 @@ def remove_duplicates(data_dict: Dict[str, Dict[bool, Dict[Union[float, int], Li # If duplicate found, keep the one with better metrics existing_row = unique_entries[content_hash] - # For leaderboard mode with successful runs, prefer higher scores + # For leaderboard mode with successful runs, prefer lower scores / faster times if run_mode == 'leaderboard' and row.get('run_passed') == True: - if row.get('run_score', 0) > existing_row.get('run_score', 0): + if row.get('run_score', 0) < existing_row.get('run_score', 0): unique_entries[content_hash] = row # For other cases, prefer shorter duration (faster execution) else:
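A note on the LSH constants introduced in the second commit: the banding parameters can be sanity-checked with the standard candidate-pair formula for banded MinHash LSH. The sketch below is illustrative only: the two constants mirror `LSH_BANDS` and `ROWS_PER_BAND` from `dedup.py`, the helper name `candidate_probability` is invented for the example, and (at least in the datasketch versions I have seen) `MinHashLSH` derives its own band/row split from `threshold` and `num_perm` unless `params` is passed, so in `create_similarity_matrix` these constants effectively only determine the number of permutations.

```python
# Reviewer sketch (not part of the patch): back-of-the-envelope check of the
# banded-LSH candidate probability implied by the constants in dedup.py.
# Standard formula for b bands of r rows and Jaccard similarity s:
#     P(candidate pair) = 1 - (1 - s**r)**b

LSH_BANDS = 16        # mirrors LSH_BANDS in dedup.py
ROWS_PER_BAND = 128   # mirrors ROWS_PER_BAND in dedup.py


def candidate_probability(similarity: float,
                          bands: int = LSH_BANDS,
                          rows_per_band: int = ROWS_PER_BAND) -> float:
    """Probability that two documents with the given Jaccard similarity
    share at least one band and therefore become a candidate pair."""
    return 1.0 - (1.0 - similarity ** rows_per_band) ** bands


if __name__ == "__main__":
    # The S-curve midpoint sits near (1/b) ** (1/r); with 16 bands of 128 rows
    # that is roughly 0.98, i.e. much stricter than the 0.8 threshold would
    # suggest if this exact banding were applied literally.
    print(f"curve midpoint ~ {(1 / LSH_BANDS) ** (1 / ROWS_PER_BAND):.4f}")
    for s in (0.7, 0.8, 0.9, 0.95, 0.98, 0.99):
        print(f"s={s:.2f} -> P(candidate) = {candidate_probability(s):.6f}")
```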
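For completeness, the `dedup_df()` entry point wired into `export.py` expects the columns that `convert_df_to_dict()` reads: `run_mode`, `run_passed`, a `run_meta` mapping with a `duration` key, and `code` for the MinHash fingerprint. A minimal, hypothetical smoke test of that contract could look like the sketch below; the toy rows and the expected 3-to-2 reduction are invented for illustration, not taken from the real dataset.

```python
# Minimal sketch of calling dedup_df() on a toy DataFrame. Assumes dedup.py is
# importable from the working directory and that pandas/datasketch/tqdm are
# installed; the rows are made up purely to exercise the grouping keys
# (run_mode, run_passed, run_meta.duration) and the 'code' field.
import pandas as pd

from dedup import dedup_df

rows = [
    {"code": "def add(a, b):\n    return a + b\n",
     "run_mode": "test", "run_passed": True, "run_meta": {"duration": 1.2}},
    # Identical code and identical grouping keys, so the fuzzy filter is
    # expected to collapse this row with the one above.
    {"code": "def add(a, b):\n    return a + b\n",
     "run_mode": "test", "run_passed": True, "run_meta": {"duration": 1.2}},
    {"code": "print('a different submission entirely')\n",
     "run_mode": "test", "run_passed": False, "run_meta": {"duration": 0.4}},
]

df = pd.DataFrame(rows)
deduped = dedup_df(df)
print(f"{len(df)} rows before dedup, {len(deduped)} after")  # expect 3 -> 2
```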