Skip to content

Commit 15328b2

Browse files
committed
Add support for --prefix of returned files
1 parent a60b683 commit 15328b2

File tree

3 files changed

+36
-6
lines changed

3 files changed

+36
-6
lines changed

src/servicex_did_finder_lib/communication.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[st
5757
servicex.post_status_update(f'Completed load of file in {elapsed_time} seconds')
5858

5959

60-
def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, properties, body):
60+
def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, properties, body,
61+
file_prefix=None):
6162
'''rabbit_mq_callback Respond to RabbitMQ Message
6263
6364
When a request to resolve a DID comes into the DID finder, we
@@ -70,6 +71,7 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie
7071
method ([type]): Delivery method
7172
properties ([type]): Properties of the message
7273
body ([type]): The body (json for us) of the message
74+
file_prefix([str]): Prefix to put in front of file paths to enable use of Cache service
7375
'''
7476
request_id = None # set this in case we get an exception while loading request
7577
try:
@@ -78,7 +80,7 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie
7880
did = did_request['did']
7981
request_id = did_request['request_id']
8082
__logging.info(f'Received DID request {did_request}', extra={'requestId': request_id})
81-
servicex = ServiceXAdapter(did_request['service-endpoint'])
83+
servicex = ServiceXAdapter(did_request['service-endpoint'], file_prefix)
8284
servicex.post_status_update("DID Request received")
8385

8486
info = {
@@ -105,7 +107,9 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie
105107

106108

107109
def init_rabbit_mq(user_callback: UserDIDHandler,
108-
rabbitmq_url: str, queue_name: str, retries: int, retry_interval: float):
110+
rabbitmq_url: str, queue_name: str, retries: int,
111+
retry_interval: float,
112+
file_prefix: str = None):
109113
rabbitmq = None
110114
retry_count = 0
111115

@@ -120,7 +124,7 @@ def init_rabbit_mq(user_callback: UserDIDHandler,
120124
_channel.basic_consume(queue=queue_name,
121125
auto_ack=False,
122126
on_message_callback=lambda c, m, p, b:
123-
rabbit_mq_callback(user_callback, c, m, p, b))
127+
rabbit_mq_callback(user_callback, c, m, p, b, file_prefix))
124128
_channel.start_consuming()
125129

126130
except pika.exceptions.AMQPConnectionError: # type: ignore

src/servicex_did_finder_lib/servicex_adaptor.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,9 @@
3535

3636

3737
class ServiceXAdapter:
38-
def __init__(self, endpoint):
38+
def __init__(self, endpoint, file_prefix=None):
3939
self.endpoint = endpoint
40+
self.file_prefix = file_prefix
4041

4142
self.logger = logging.getLogger(__name__)
4243
self.logger.addHandler(logging.NullHandler())
@@ -61,14 +62,17 @@ def post_status_update(self, status_msg, severity="info"):
6162
self.logger.error(f'After {attempts} tries, failed to send ServiceX App a status '
6263
f'message: {str(status_msg)} - Ignoring error.')
6364

65+
def _prefix_file(self, file_path):
66+
return file_path if not self.file_prefix else self.file_prefix+file_path
67+
6468
def put_file_add(self, file_info):
6569
success = False
6670
attempts = 0
6771
while not success and attempts < MAX_RETRIES:
6872
try:
6973
mesg = {
7074
"timestamp": datetime.now().isoformat(),
71-
"file_path": file_info['file_path'],
75+
"file_path": self._prefix_file(file_info['file_path']),
7276
'adler32': file_info['adler32'],
7377
'file_size': file_info['file_size'],
7478
'file_events': file_info['file_events']

tests/servicex_did_finder_lib/test_servicex_did.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1+
import json
2+
3+
import responses
14
from servicex_did_finder_lib import __version__
5+
from servicex_did_finder_lib.servicex_adaptor import ServiceXAdapter
26

37

48
def test_version():
@@ -23,3 +27,21 @@ def test_put_file_add():
2327
assert submitted['file_events'] == 3141
2428
assert submitted['file_size'] == 1024
2529

30+
31+
@responses.activate
32+
def test_put_file_add_with_prefix():
33+
responses.add(responses.PUT, 'http://servicex.org/files', status=206)
34+
sx = ServiceXAdapter("http://servicex.org", "xcache123:")
35+
sx.put_file_add({
36+
'file_path': 'root://foo.bar.ROOT',
37+
'adler32': '32',
38+
'file_size': 1024,
39+
'file_events': 3141
40+
})
41+
42+
assert len(responses.calls) == 1
43+
submitted = json.loads(responses.calls[0].request.body)
44+
assert submitted['file_path'] == 'xcache123:root://foo.bar.ROOT'
45+
assert submitted['adler32'] == '32'
46+
assert submitted['file_events'] == 3141
47+
assert submitted['file_size'] == 1024

0 commit comments

Comments
 (0)