diff --git a/engine/base_client/search.py b/engine/base_client/search.py
index bff908cc..bfbe80cf 100644
--- a/engine/base_client/search.py
+++ b/engine/base_client/search.py
@@ -164,13 +164,26 @@ def cycling_query_generator(queries, total_count):
 
             # Process queries with progress updates
             results = []
+            total_insert_count = 0
+            total_search_count = 0
+            all_insert_latencies = []
+            all_search_latencies = []
+
             for query in used_queries:
-                results.append(search_one(query))
+                if random.random() < insert_fraction:
+                    precision, latency = insert_one(query)
+                    total_insert_count += 1
+                    all_insert_latencies.append(latency)
+                    results.append(('insert', precision, latency))
+                else:
+                    precision, latency = search_one(query)
+                    total_search_count += 1
+                    all_search_latencies.append(latency)
+                    results.append(('search', precision, latency))
                 pbar.update(1)
 
             # Close the progress bar
             pbar.close()
-
             total_time = time.perf_counter() - start
         else:
             # Dynamically calculate chunk size based on total_query_count
@@ -206,10 +219,19 @@ def cycling_query_generator(queries, total_count):
 
             # Collect results from all worker processes
             results = []
+            total_insert_count = 0
+            total_search_count = 0
+            all_insert_latencies = []
+            all_search_latencies = []
            min_start_time = time.perf_counter()
+
             for _ in processes:
-                proc_start_time, chunk_results = result_queue.get()
+                proc_start_time, chunk_results, insert_count, search_count, insert_latencies, search_latencies = result_queue.get()
                 results.extend(chunk_results)
+                total_insert_count += insert_count
+                total_search_count += search_count
+                all_insert_latencies.extend(insert_latencies)
+                all_search_latencies.extend(search_latencies)
 
                 # Update min_start_time if necessary
                 if proc_start_time < min_start_time:
@@ -222,24 +244,53 @@ def cycling_query_generator(queries, total_count):
             for process in processes:
                 process.join()
 
-        # Extract precisions and latencies (outside the timed section)
-        precisions, latencies = zip(*results)
+        # Extract overall precisions and latencies
+        all_precisions = [result[1] for result in results]
+        all_latencies = [result[2] for result in results]
+
+        # Calculate search-only precisions (exclude inserts from precision calculation)
+        search_precisions = [result[1] for result in results if result[0] == 'search']
 
         self.__class__.delete_client()
 
         return {
+            # Overall metrics
             "total_time": total_time,
-            "mean_time": np.mean(latencies),
-            "mean_precisions": np.mean(precisions),
-            "std_time": np.std(latencies),
-            "min_time": np.min(latencies),
-            "max_time": np.max(latencies),
-            "rps": len(latencies) / total_time,
-            "p50_time": np.percentile(latencies, 50),
-            "p95_time": np.percentile(latencies, 95),
-            "p99_time": np.percentile(latencies, 99),
-            "precisions": precisions,
-            "latencies": latencies,
+            "total_operations": len(all_latencies),
+            "rps": len(all_latencies) / total_time,
+
+            # Search metrics
+            "search_count": total_search_count,
+            "search_rps": total_search_count / total_time if total_search_count > 0 else 0,
+            "mean_search_time": np.mean(all_search_latencies) if all_search_latencies else 0,
+            "mean_search_precision": np.mean(search_precisions) if search_precisions else 0,
+            "p50_search_time": np.percentile(all_search_latencies, 50) if all_search_latencies else 0,
+            "p95_search_time": np.percentile(all_search_latencies, 95) if all_search_latencies else 0,
+            "p99_search_time": np.percentile(all_search_latencies, 99) if all_search_latencies else 0,
+
+            # Insert metrics
+            "insert_count": total_insert_count,
+            "insert_rps": total_insert_count / total_time if total_insert_count > 0 else 0,
+            "mean_insert_time": np.mean(all_insert_latencies) if all_insert_latencies else 0,
+            "p50_insert_time": np.percentile(all_insert_latencies, 50) if all_insert_latencies else 0,
+            "p95_insert_time": np.percentile(all_insert_latencies, 95) if all_insert_latencies else 0,
+            "p99_insert_time": np.percentile(all_insert_latencies, 99) if all_insert_latencies else 0,
+
+            # Mixed workload metrics
+            "actual_insert_fraction": total_insert_count / len(all_latencies) if len(all_latencies) > 0 else 0,
+            "target_insert_fraction": insert_fraction,
+
+            # Legacy compatibility (for existing code that expects these)
+            "mean_time": np.mean(all_latencies),
+            "mean_precisions": np.mean(search_precisions) if search_precisions else 1.0,  # Only search precisions
+            "std_time": np.std(all_latencies),
+            "min_time": np.min(all_latencies),
+            "max_time": np.max(all_latencies),
+            "p50_time": np.percentile(all_latencies, 50),
+            "p95_time": np.percentile(all_latencies, 95),
+            "p99_time": np.percentile(all_latencies, 99),
+            "precisions": search_precisions,  # Only search precisions
+            "latencies": all_latencies,
         }
 
     def setup_search(self):
@@ -259,6 +310,27 @@ def chunked_iterable(iterable, size):
     while chunk := list(islice(it, size)):
         yield chunk
 
+def process_chunk(chunk, search_one, insert_one, insert_fraction):
+    results = []
+    insert_count = 0
+    search_count = 0
+    insert_latencies = []
+    search_latencies = []
+
+    for query in chunk:
+        if random.random() < insert_fraction:
+            precision, latency = insert_one(query)
+            insert_count += 1
+            insert_latencies.append(latency)
+            results.append(('insert', precision, latency))
+        else:
+            precision, latency = search_one(query)
+            search_count += 1
+            search_latencies.append(latency)
+            results.append(('search', precision, latency))
+
+    return results, insert_count, search_count, insert_latencies, search_latencies
+
 # Function to be executed by each worker process
 def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0):
     self.init_client(
@@ -270,16 +342,7 @@ def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0):
     )
     self.setup_search()
     start_time = time.perf_counter()
-    results = process_chunk(chunk, search_one, insert_one, insert_fraction)
-    result_queue.put((start_time, results))
-
-
-def process_chunk(chunk, search_one, insert_one, insert_fraction):
-    results = []
-    for i, query in enumerate(chunk):
-        if random.random() < insert_fraction:
-            result = insert_one(query)
-        else:
-            result = search_one(query)
-        results.append(result)
-    return results
\ No newline at end of file
+    results, insert_count, search_count, insert_latencies, search_latencies = process_chunk(
+        chunk, search_one, insert_one, insert_fraction
+    )
+    result_queue.put((start_time, results, insert_count, search_count, insert_latencies, search_latencies))
\ No newline at end of file
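
One caveat worth flagging on the `random.random() < insert_fraction` split: if the worker processes are created with the `fork` start method, each child inherits the parent's Mersenne Twister state, so every chunk would draw the identical insert/search sequence. A minimal per-worker reseed sketch follows; the `reseed_worker` helper is hypothetical, not part of this patch, and would be called at the top of `worker_function` before `process_chunk`:

```python
import os
import random
import time

# Hypothetical helper (not in this patch): give each forked worker its own
# RNG stream so the Bernoulli insert/search decisions are independent across
# processes. Under the "spawn" start method this is unnecessary, since each
# child re-imports random and is seeded fresh from OS entropy.
def reseed_worker() -> None:
    random.seed(os.getpid() ^ time.time_ns())
```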
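For reviewers, a small sketch of how the extended metrics dict might be consumed downstream. Only the dictionary keys come from this patch; the helper itself is an illustrative assumption about reporting, not existing harness code. Note also that because the split is sampled per operation, the realized mix is binomial rather than exact, which is why both `target_insert_fraction` and `actual_insert_fraction` are reported:

```python
# Illustrative helper (not part of this patch): summarize the dict returned
# by the patched search_all() for a mixed search/insert run.
def summarize_mixed_results(results: dict) -> None:
    print(
        f"insert fraction target/actual: "
        f"{results['target_insert_fraction']:.2f} / {results['actual_insert_fraction']:.2f}"
    )
    print(
        f"search: {results['search_count']} ops at {results['search_rps']:.0f} rps, "
        f"p95 {results['p95_search_time'] * 1000:.1f} ms, "
        f"mean precision {results['mean_search_precision']:.3f}"
    )
    print(
        f"insert: {results['insert_count']} ops at {results['insert_rps']:.0f} rps, "
        f"p95 {results['p95_insert_time'] * 1000:.1f} ms"
    )
    # Legacy keys ("mean_time", "latencies", "precisions", ...) are still
    # emitted, so existing reporting code should keep working unchanged.
```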