
Commit 6a6707f

Merge pull request #41 from mpdimitr/mixed-workload

Fixed a bug with concurrent workers attempting to insert the same doc_id.

2 parents 18e1b9f + dd77ab1 · commit 6a6707f
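
For context on the bug itself: the old implementation kept a single class-level itertools.count as the doc_id source, and every worker process receives its own copy of that counter, so concurrent workers all draw the identical ID sequence. A minimal standalone sketch (illustrative only, not repository code) that reproduces the collision with a fork-based worker:

import itertools
import multiprocessing as mp

# Module-level counter, analogous to the old BaseSearcher._doc_id_counter.
_counter = itertools.count(100000000)

def report(name):
    # Each forked child advances its own *copy* of the counter,
    # so both children print the same three IDs.
    print(name, [next(_counter) for _ in range(3)])

if __name__ == "__main__":
    ctx = mp.get_context("fork")  # fork start method (Linux/macOS)
    workers = [ctx.Process(target=report, args=(f"worker-{i}",)) for i in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    # Both workers print [100000000, 100000001, 100000002] -> duplicate doc_ids

Under the "spawn" start method the module is re-imported in each child, which recreates the counter at the same starting value, with the same duplication.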

4 files changed, +418 -9 lines changed


dataset_reader/ann_h5_reader.py

Lines changed: 4 additions & 2 deletions
@@ -1,3 +1,4 @@
+import itertools
 from typing import Iterator
 
 import h5py
@@ -14,17 +15,18 @@ def __init__(self, path, normalize=False):
 
     def read_queries(self) -> Iterator[Query]:
        data = h5py.File(self.path)
+        distances = data["distances"] if "distances" in data else itertools.repeat(None)
 
         for vector, expected_result, expected_scores in zip(
-            data["test"], data["neighbors"], data["distances"]
+            data["test"], data["neighbors"], distances
         ):
             if self.normalize:
                 vector /= np.linalg.norm(vector)
             yield Query(
                 vector=vector.tolist(),
                 meta_conditions=None,
                 expected_result=expected_result.tolist(),
-                expected_scores=expected_scores.tolist(),
+                expected_scores=expected_scores.tolist() if expected_scores is not None else None,
             )
 
     def read_data(self, *args, **kwargs) -> Iterator[Record]:
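
The fallback works because itertools.repeat(None) is an infinite iterator: zip() still stops at the shortest real dataset ("test" / "neighbors") and simply pairs each query with None for the scores. A standalone illustration with toy values (not repository data):

import itertools

vectors = [[0.1, 0.2], [0.3, 0.4]]
neighbors = [[1, 2], [3, 4]]
distances = itertools.repeat(None)  # stands in for a missing "distances" key

for vector, expected_result, expected_scores in zip(vectors, neighbors, distances):
    print(vector, expected_result, expected_scores)
# [0.1, 0.2] [1, 2] None
# [0.3, 0.4] [3, 4] None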

datasets/datasets.json

Lines changed: 21 additions & 1 deletion
@@ -1296,5 +1296,25 @@
     "path": "random-100-match-kw-small-vocab/random_keywords_1m_vocab_10_no_filters",
     "vector_count": 100,
     "description": "Synthetic data"
+  },
+  {
+    "name": "cohere-768-1M",
+    "vector_size": 768,
+    "distance": "dot",
+    "type": "h5",
+    "path": "cohere-768-1M/cohere-768-1M.hdf5",
+    "link": "https://dbyiw3u3rf9yr.cloudfront.net/corpora/vectorsearch/cohere-wikipedia-22-12-en-embeddings/documents-1m.hdf5.bz2",
+    "vector_count": 1000000,
+    "description": "Wikipedia embeddings"
+  },
+  {
+    "name": "cohere-768-10M",
+    "vector_size": 768,
+    "distance": "dot",
+    "type": "h5",
+    "path": "cohere-768-10M/cohere-768-10M.hdf5",
+    "link": "https://dbyiw3u3rf9yr.cloudfront.net/corpora/vectorsearch/cohere-wikipedia-22-12-en-embeddings/documents-10m.hdf5.bz2",
+    "vector_count": 10000000,
+    "description": "Wikipedia embeddings"
   }
-]
+]
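
Since datasets.json is a top-level JSON array, the new entries can be sanity-checked after merging with a short snippet like the following (hypothetical check, not part of the repo):

import json

with open("datasets/datasets.json") as f:
    datasets = json.load(f)

for entry in datasets:
    if entry["name"] in ("cohere-768-1M", "cohere-768-10M"):
        # Fields taken from the diff above.
        assert entry["vector_size"] == 768 and entry["distance"] == "dot"
        print(entry["name"], entry["vector_count"], entry["path"])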

engine/base_client/search.py

Lines changed: 13 additions & 6 deletions
@@ -20,7 +20,7 @@
 
 
 class BaseSearcher:
-    _doc_id_counter = itertools.count(100000000)
+    _doc_id_counter = None  # Will be initialized per process
     MP_CONTEXT = None
 
     def __init__(self, host, connection_params, search_params):
@@ -67,15 +67,22 @@ def _search_one(cls, query, top: Optional[int] = None):
         precision = len(ids.intersection(query.expected_result[:top])) / top
         return precision, end - start
 
+    @classmethod
+    def _get_doc_id_counter(cls):
+        if cls._doc_id_counter is None:
+            # Use the process ID to give each worker a unique starting point
+            process_id = os.getpid()
+            # Each process gets its own range: 1000000000 + (pid % 1000) * 1000000
+            start_offset = 1000000000 + (process_id % 1000) * 1000000
+            cls._doc_id_counter = itertools.count(start_offset)
+        return cls._doc_id_counter
+
     @classmethod
     def _insert_one(cls, query):
         start = time.perf_counter()
 
-        # Generate unique doc_id here
-        doc_id = next(cls._doc_id_counter)
-
-        # Debug logging to verify inserts are happening
-        #print(f"DEBUG: Inserting vector with doc_id={doc_id}")
+        # Generate a unique doc_id with the process-safe counter
+        doc_id = next(cls._get_doc_id_counter())
 
         cls.insert_one(str(doc_id), query.vector, query.meta_conditions)
         end = time.perf_counter()
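
The offset arithmetic is easiest to see in isolation. A standalone sketch mirroring _get_doc_id_counter, with hard-coded PIDs purely for illustration:

import itertools
import os

def make_counter(pid=None):
    # Each process derives a disjoint starting offset from its PID,
    # so forked workers no longer share one ID sequence.
    pid = os.getpid() if pid is None else pid
    start_offset = 1000000000 + (pid % 1000) * 1000000
    return itertools.count(start_offset)

c1, c2 = make_counter(pid=101), make_counter(pid=102)
print(next(c1), next(c1))  # 1101000000 1101000001
print(next(c2))            # 1102000000 -- a separate 1,000,000-wide window

The windows stay disjoint as long as no worker inserts more than 1,000,000 documents and no two live workers share the same pid % 1000; both limits are worth keeping in mind for very long runs.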
