Skip to content

Commit 71b89d9

Browse files
committed
Add info argument to call-back
Change in the API - will pass in information about the DID request that can be used by other parts
1 parent d04ce80 commit 71b89d9

File tree

3 files changed

+34
-15
lines changed

3 files changed

+34
-15
lines changed

README.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ ServiceX DID finders take a dataset name and turn them into files to be transfor
1111
Create an async callback method that `yield`s file info dictionaries. For example:
1212

1313
```python
14-
async def my_callback(did_name: str):
14+
async def my_callback(did_name: str, info: Dict[str, Any]):
1515
for i in range(0, 10):
1616
yield {
1717
'file_path': f"root://atlas-experiment.cern.ch/dataset1/file{i}.root",
@@ -23,6 +23,9 @@ Create an async callback method that `yield`s file info dictionaries. For exampl
2323

2424
Yield the results as you find them - ServiceX will actually start processing the files before your DID lookup is finished using this scheme. Fill in `file_size` in bytes and number of events in `file_events` if you know them.
2525

26+
`info` contains a dict of various info about the request that asked for this DID:
27+
* `request-id` The request id that has this DID asscoiated. For logging.
28+
2629
In your main script, start off the DID finder with a call similar to:
2730

2831
```python
@@ -31,6 +34,10 @@ start_did_finder('my_finder', my_callback)
3134

3235
This script should be run in your docker container at start up. Once running, if a user submits a query with the DID name `my_finder://dataset1`, then your callback will be called with `did_name` set to `dataset1`.
3336

37+
### Proper Logging
38+
39+
You might not think logging is important, but... k8... blah blah
40+
3441
## Left to do
3542

3643
- what is adler32

src/servicex_did_finder_lib/communication.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from .servicex_adaptor import ServiceXAdapter
1515

1616
# The type for the callback method to handle DID's, supplied by the user.
17-
UserDIDHandler = Callable[[str], AsyncGenerator[Dict[str, Any], None]]
17+
UserDIDHandler = Callable[[str, Dict[str, Any]], AsyncGenerator[Dict[str, Any], None]]
1818

1919
# Given name, build the RabbitMQ queue name by appending this.
2020
# This is backed into how ServiceX works - do not change unless it
@@ -25,10 +25,11 @@
2525
__logging.addHandler(logging.NullHandler())
2626

2727

28-
async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, user_callback: UserDIDHandler):
28+
async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[str, Any],
29+
user_callback: UserDIDHandler):
2930
summary = DIDSummary(did)
3031
start_time = datetime.now()
31-
async for file_info in user_callback(did):
32+
async for file_info in user_callback(did, info):
3233

3334
# Track the file, inject back into the system
3435
summary.add_file(file_info)
@@ -78,9 +79,13 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie
7879
servicex = ServiceXAdapter(did_request['service-endpoint'])
7980
servicex.post_status_update("DID Request received")
8081

82+
info = {
83+
'request-id': request_id,
84+
}
85+
8186
# Process the request and resolve the DID
8287
try:
83-
make_sync(run_file_fetch_loop)(did, servicex, user_callback)
88+
make_sync(run_file_fetch_loop)(did, servicex, info, user_callback)
8489

8590
except Exception as e:
8691
traceback.print_exc()

tests/test_communication.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import argparse
2+
import json
23
from typing import Any, AsyncGenerator, Dict
4+
from unittest.mock import ANY, MagicMock, patch
5+
36
import pika
47
import pytest
5-
from unittest.mock import ANY, patch, MagicMock
6-
import json
7-
8-
from servicex_did_finder_lib.communication import default_command_line_args, init_rabbit_mq, start_did_finder
8+
from servicex_did_finder_lib.communication import (default_command_line_args,
9+
init_rabbit_mq,
10+
start_did_finder)
911

1012

1113
class RabbitAdaptor:
@@ -96,7 +98,7 @@ def test_one_file_call(rabbitmq, SXAdaptor):
9698

9799
seen_name = None
98100

99-
async def my_callback(did_name: str):
101+
async def my_callback(did_name: str, info: Dict[str, Any]):
100102
nonlocal seen_name
101103
seen_name = did_name
102104
yield {
@@ -122,7 +124,8 @@ async def my_callback(did_name: str):
122124
def test_failed_file(rabbitmq, SXAdaptor):
123125
'Test a callback that fails before any files are sent'
124126

125-
async def my_callback(did_name: str) -> AsyncGenerator[Dict[str, Any], None]:
127+
async def my_callback(did_name: str, info: Dict[str, Any]) \
128+
-> AsyncGenerator[Dict[str, Any], None]:
126129
if False:
127130
yield {
128131
'ops': 'no'
@@ -142,7 +145,8 @@ async def my_callback(did_name: str) -> AsyncGenerator[Dict[str, Any], None]:
142145
def test_no_files_returned(rabbitmq, SXAdaptor):
143146
'Test a callback that fails before any files are sent'
144147

145-
async def my_callback(did_name: str) -> AsyncGenerator[Dict[str, Any], None]:
148+
async def my_callback(did_name: str, info: Dict[str, Any]) \
149+
-> AsyncGenerator[Dict[str, Any], None]:
146150
if False:
147151
yield {
148152
'ops': 'no'
@@ -163,7 +167,8 @@ def test_rabbitmq_connection_failure(rabbitmq_fail_once, SXAdaptor):
163167

164168
called = False
165169

166-
async def my_callback(did_name: str) -> AsyncGenerator[Dict[str, Any], None]:
170+
async def my_callback(did_name: str, info: Dict[str, Any]) \
171+
-> AsyncGenerator[Dict[str, Any], None]:
167172
nonlocal called
168173
called = True
169174
yield {
@@ -183,7 +188,8 @@ async def my_callback(did_name: str) -> AsyncGenerator[Dict[str, Any], None]:
183188
def test_auto_args_callback(init_rabbit_callback, simple_argument_parser):
184189
'If there is a missing argument on the command line it should cause a total failure'
185190

186-
async def my_callback(did_name: str) -> AsyncGenerator[Dict[str, Any], None]:
191+
async def my_callback(did_name: str, info: Dict[str, Any]) \
192+
-> AsyncGenerator[Dict[str, Any], None]:
187193
if False:
188194
yield {
189195
'ops': 'no'
@@ -217,7 +223,8 @@ def test_arg_required(init_rabbit_callback):
217223

218224
args = parser.parse_args(['--rabbit-uri', 'not-really-there'])
219225

220-
async def my_callback(did_name: str) -> AsyncGenerator[Dict[str, Any], None]:
226+
async def my_callback(did_name: str, info: Dict[str, Any]) \
227+
-> AsyncGenerator[Dict[str, Any], None]:
221228
if False:
222229
yield {
223230
'ops': 'no'

0 commit comments

Comments
 (0)