Skip to content

Commit 17d2bf6

Browse files
authored
[CLOUDS-4012] Fix service tag setting in lambda forwarder (#714)
* [CLOUDS-4012] Fix service tag setting in lambda forwarder If the service tag in set in dd_tags use it, otherwise use the one coming from the logGroup If none of the above, check if the tag is set on the lambda forwarder and use it, otherwise use the function name. Finally, check if the service tag is set in the application level and override it. Also: - Update Itests, Utests - Add __init__.py to create a package for running utests inside the IDE
1 parent cc3236c commit 17d2bf6

File tree

9 files changed

+390
-37
lines changed

9 files changed

+390
-37
lines changed

aws/logs_monitoring/.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
*.zip
22
tools/layers
3-
.forwarder
3+
.forwarder

aws/logs_monitoring/lambda_cache.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,16 @@ def get(self, key):
7373
Returns:
7474
lambda_tags (str[]): the list of "key:value" Datadog tag strings
7575
"""
76+
if not self.should_fetch_tags():
77+
logger.debug(
78+
"Not fetching lambda function tags because the env variable DD_FETCH_LAMBDA_TAGS is "
79+
"not set to true"
80+
)
81+
return []
82+
7683
if self._is_expired():
7784
send_forwarder_internal_metrics("local_cache_expired")
7885
logger.debug("Local cache expired, fetching cache from S3")
7986
self._refresh()
8087

81-
function_tags = self.tags_by_id.get(key, [])
82-
return function_tags
88+
return self.tags_by_id.get(key, [])

aws/logs_monitoring/lambda_function.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -257,14 +257,21 @@ def add_metadata_to_lambda_log(event):
257257
# Get custom tags of the Lambda function
258258
custom_lambda_tags = get_enriched_lambda_log_tags(event)
259259

260-
# Set the `service` tag and metadata field. If the Lambda function is
261-
# tagged with a `service` tag, use it, otherwise use the function name.
262-
service_tag = next(
263-
(tag for tag in custom_lambda_tags if tag.startswith("service:")),
264-
f"service:{function_name}",
265-
)
266-
tags.append(service_tag)
267-
event[DD_SERVICE] = service_tag.split(":")[1]
260+
# If not set during parsing or has a default value
261+
# then set the service tag from lambda tags cache or using the function name
262+
# otherwise, remove the service tag from the custom lambda tags if exists to avoid duplication
263+
if not event[DD_SERVICE] or event[DD_SERVICE] == event[DD_SOURCE]:
264+
service_tag = next(
265+
(tag for tag in custom_lambda_tags if tag.startswith("service:")),
266+
f"service:{function_name}",
267+
)
268+
if service_tag:
269+
tags.append(service_tag)
270+
event[DD_SERVICE] = service_tag.split(":")[1]
271+
else:
272+
custom_lambda_tags = [
273+
tag for tag in custom_lambda_tags if not tag.startswith("service:")
274+
]
268275

269276
# Check if one of the Lambda's custom tags is env
270277
# If an env tag exists, remove the env:none placeholder
@@ -319,6 +326,22 @@ def extract_ddtags_from_message(event):
319326
if logger.isEnabledFor(logging.DEBUG):
320327
logger.debug(f"Failed to extract ddtags from: {event}")
321328
return
329+
330+
# Extract service tag from message.ddtags if exists
331+
if "service" in extracted_ddtags:
332+
event[DD_SERVICE] = next(
333+
tag[8:]
334+
for tag in extracted_ddtags.split(",")
335+
if tag.startswith("service:")
336+
)
337+
event[DD_CUSTOM_TAGS] = ",".join(
338+
[
339+
tag
340+
for tag in event[DD_CUSTOM_TAGS].split(",")
341+
if not tag.startswith("service")
342+
]
343+
)
344+
322345
event[DD_CUSTOM_TAGS] = f"{event[DD_CUSTOM_TAGS]},{extracted_ddtags}"
323346

324347

aws/logs_monitoring/parsing.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def s3_handler(event, context, metadata):
186186
source = "transitgateway"
187187
metadata[DD_SOURCE] = source
188188

189-
metadata[DD_SERVICE] = get_service_from_tags(metadata)
189+
metadata[DD_SERVICE] = get_service_from_tags_and_remove_duplicates(metadata)
190190

191191
##Get the ARN of the service and set it as the hostname
192192
hostname = parse_service_arn(source, key, bucket, context)
@@ -242,15 +242,21 @@ def s3_handler(event, context, metadata):
242242
yield structured_line
243243

244244

245-
def get_service_from_tags(metadata):
246-
# Get service from dd_custom_tags if it exists
245+
def get_service_from_tags_and_remove_duplicates(metadata):
246+
service = ""
247247
tagsplit = metadata[DD_CUSTOM_TAGS].split(",")
248-
for tag in tagsplit:
248+
for i, tag in enumerate(tagsplit):
249249
if tag.startswith("service:"):
250-
return tag[8:]
250+
if service:
251+
# remove duplicate entry from the tags
252+
del tagsplit[i]
253+
else:
254+
service = tag[8:]
255+
256+
metadata[DD_CUSTOM_TAGS] = ",".join(tagsplit)
251257

252258
# Default service to source value
253-
return metadata[DD_SOURCE]
259+
return service if service else metadata[DD_SOURCE]
254260

255261

256262
def parse_event_source(event, key):
@@ -530,7 +536,7 @@ def awslogs_handler(event, context, metadata):
530536

531537
# Set service from custom tags, which may include the tags set on the log group
532538
# Returns DD_SOURCE by default
533-
metadata[DD_SERVICE] = get_service_from_tags(metadata)
539+
metadata[DD_SERVICE] = get_service_from_tags_and_remove_duplicates(metadata)
534540

535541
# Set host as log group where cloudwatch is source
536542
if metadata[DD_SOURCE] == "cloudwatch" or metadata.get(DD_HOST, None) == None:
@@ -640,7 +646,7 @@ def cwevent_handler(event, metadata):
640646
else:
641647
metadata[DD_SOURCE] = "cloudwatch"
642648

643-
metadata[DD_SERVICE] = get_service_from_tags(metadata)
649+
metadata[DD_SERVICE] = get_service_from_tags_and_remove_duplicates(metadata)
644650

645651
yield data
646652

aws/logs_monitoring/tests/test_lambda_function.py

Lines changed: 197 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from time import time
99
from botocore.exceptions import ClientError
1010
from approvaltests.approvals import verify_as_json
11+
from importlib import reload
1112

1213
sys.modules["trace_forwarder.connection"] = MagicMock()
1314
sys.modules["datadog_lambda.wrapper"] = MagicMock()
@@ -34,6 +35,7 @@
3435
enrich,
3536
transform,
3637
split,
38+
extract_ddtags_from_message,
3739
)
3840
from parsing import parse, parse_event_type
3941

@@ -130,12 +132,8 @@ def create_cloudwatch_log_event_from_data(data):
130132

131133

132134
class TestLambdaFunctionEndToEnd(unittest.TestCase):
133-
@patch("cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.get")
134-
@patch("base_tags_cache.send_forwarder_internal_metrics")
135135
@patch("enhanced_lambda_metrics.LambdaTagsCache.get_cache_from_s3")
136-
def test_datadog_forwarder(
137-
self, mock_get_s3_cache, mock_forward_metrics, cw_logs_tags_get
138-
):
136+
def test_datadog_forwarder(self, mock_get_s3_cache):
139137
mock_get_s3_cache.return_value = (
140138
{
141139
"arn:aws:lambda:sa-east-1:601427279990:function:inferred-spans-python-dev-initsender": [
@@ -149,15 +147,7 @@ def test_datadog_forwarder(
149147
time(),
150148
)
151149
context = Context()
152-
my_path = os.path.abspath(os.path.dirname(__file__))
153-
path = os.path.join(my_path, "events/cloudwatch_logs.json")
154-
155-
with open(
156-
path,
157-
"r",
158-
) as input_file:
159-
input_data = input_file.read()
160-
150+
input_data = self._get_input_data()
161151
event = {"awslogs": {"data": create_cloudwatch_log_event_from_data(input_data)}}
162152
os.environ["DD_FETCH_LAMBDA_TAGS"] = "True"
163153

@@ -170,7 +160,7 @@ def test_datadog_forwarder(
170160

171161
verify_as_json(transformed_events)
172162

173-
metrics, logs, trace_payloads = split(transformed_events)
163+
_, _, trace_payloads = split(transformed_events)
174164
self.assertEqual(len(trace_payloads), 1)
175165

176166
trace_payload = json.loads(trace_payloads[0]["message"])
@@ -204,6 +194,98 @@ def test_datadog_forwarder(
204194

205195
del os.environ["DD_FETCH_LAMBDA_TAGS"]
206196

197+
@patch("cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.get")
198+
def test_setting_service_tag_from_log_group_cache(self, cw_logs_tags_get):
199+
reload(sys.modules["settings"])
200+
reload(sys.modules["parsing"])
201+
cw_logs_tags_get.return_value = ["service:log_group_service"]
202+
context = Context()
203+
input_data = self._get_input_data()
204+
event = {"awslogs": {"data": create_cloudwatch_log_event_from_data(input_data)}}
205+
206+
normalized_events = parse(event, context)
207+
enriched_events = enrich(normalized_events)
208+
transformed_events = transform(enriched_events)
209+
210+
_, logs, _ = split(transformed_events)
211+
self.assertEqual(len(logs), 16)
212+
for log in logs:
213+
self.assertEqual(log["service"], "log_group_service")
214+
215+
@patch.dict(os.environ, {"DD_TAGS": "service:dd_tag_service"}, clear=True)
216+
@patch("cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.get")
217+
def test_service_override_from_dd_tags(self, cw_logs_tags_get):
218+
reload(sys.modules["settings"])
219+
reload(sys.modules["parsing"])
220+
cw_logs_tags_get.return_value = ["service:log_group_service"]
221+
context = Context()
222+
input_data = self._get_input_data()
223+
event = {"awslogs": {"data": create_cloudwatch_log_event_from_data(input_data)}}
224+
225+
normalized_events = parse(event, context)
226+
enriched_events = enrich(normalized_events)
227+
transformed_events = transform(enriched_events)
228+
229+
_, logs, _ = split(transformed_events)
230+
self.assertEqual(len(logs), 16)
231+
for log in logs:
232+
self.assertEqual(log["service"], "dd_tag_service")
233+
234+
@patch("lambda_cache.LambdaTagsCache.get")
235+
@patch("cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.get")
236+
def test_overrding_service_tag_from_lambda_cache(
237+
self, lambda_tags_get, cw_logs_tags_get
238+
):
239+
lambda_tags_get.return_value = ["service:lambda_service"]
240+
cw_logs_tags_get.return_value = ["service:log_group_service"]
241+
242+
context = Context()
243+
input_data = self._get_input_data()
244+
event = {"awslogs": {"data": create_cloudwatch_log_event_from_data(input_data)}}
245+
246+
normalized_events = parse(event, context)
247+
enriched_events = enrich(normalized_events)
248+
transformed_events = transform(enriched_events)
249+
250+
_, logs, _ = split(transformed_events)
251+
self.assertEqual(len(logs), 16)
252+
for log in logs:
253+
self.assertEqual(log["service"], "lambda_service")
254+
255+
@patch.dict(os.environ, {"DD_TAGS": "service:dd_tag_service"}, clear=True)
256+
@patch("lambda_cache.LambdaTagsCache.get")
257+
@patch("cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.get")
258+
def test_overrding_service_tag_from_lambda_cache_when_dd_tags_is_set(
259+
self, lambda_tags_get, cw_logs_tags_get
260+
):
261+
lambda_tags_get.return_value = ["service:lambda_service"]
262+
cw_logs_tags_get.return_value = ["service:log_group_service"]
263+
264+
context = Context()
265+
input_data = self._get_input_data()
266+
event = {"awslogs": {"data": create_cloudwatch_log_event_from_data(input_data)}}
267+
268+
normalized_events = parse(event, context)
269+
enriched_events = enrich(normalized_events)
270+
transformed_events = transform(enriched_events)
271+
272+
_, logs, _ = split(transformed_events)
273+
self.assertEqual(len(logs), 16)
274+
for log in logs:
275+
self.assertEqual(log["service"], "lambda_service")
276+
277+
def _get_input_data(self):
278+
my_path = os.path.abspath(os.path.dirname(__file__))
279+
path = os.path.join(my_path, "events/cloudwatch_logs.json")
280+
281+
with open(
282+
path,
283+
"r",
284+
) as input_file:
285+
input_data = input_file.read()
286+
287+
return input_data
288+
207289

208290
class TestLambdaFunctionExtractTracePayload(unittest.TestCase):
209291
def test_extract_trace_payload_none_no_trace(self):
@@ -234,5 +316,105 @@ def test_extract_trace_payload_valid_trace(self):
234316
)
235317

236318

319+
class TestMergeMessageTags(unittest.TestCase):
320+
message_tags = '{"ddtags":"service:my_application_service,custom_tag_1:value1"}'
321+
custom_tags = "custom_tag_2:value2,service:my_custom_service"
322+
323+
def test_extract_ddtags_from_message_str(self):
324+
event = {
325+
"message": self.message_tags,
326+
"ddtags": self.custom_tags,
327+
"service": "my_service",
328+
}
329+
330+
extract_ddtags_from_message(event)
331+
332+
self.assertEqual(
333+
event["ddtags"],
334+
"custom_tag_2:value2,service:my_application_service,custom_tag_1:value1",
335+
)
336+
self.assertEqual(
337+
event["service"],
338+
"my_application_service",
339+
)
340+
341+
def test_extract_ddtags_from_message_dict(self):
342+
loaded_message_tags = json.loads(self.message_tags)
343+
event = {
344+
"message": loaded_message_tags,
345+
"ddtags": self.custom_tags,
346+
"service": "my_service",
347+
}
348+
349+
extract_ddtags_from_message(event)
350+
351+
self.assertEqual(
352+
event["ddtags"],
353+
"custom_tag_2:value2,service:my_application_service,custom_tag_1:value1",
354+
)
355+
self.assertEqual(
356+
event["service"],
357+
"my_application_service",
358+
)
359+
360+
def test_extract_ddtags_from_message_service_tag_setting(self):
361+
loaded_message_tags = json.loads(self.message_tags)
362+
loaded_message_tags["ddtags"] = ",".join(
363+
[
364+
tag
365+
for tag in loaded_message_tags["ddtags"].split(",")
366+
if not tag.startswith("service:")
367+
]
368+
)
369+
event = {
370+
"message": loaded_message_tags,
371+
"ddtags": self.custom_tags,
372+
"service": "my_custom_service",
373+
}
374+
375+
extract_ddtags_from_message(event)
376+
377+
self.assertEqual(
378+
event["ddtags"],
379+
"custom_tag_2:value2,service:my_custom_service,custom_tag_1:value1",
380+
)
381+
self.assertEqual(
382+
event["service"],
383+
"my_custom_service",
384+
)
385+
386+
def test_extract_ddtags_from_message_multiple_service_tag_values(self):
387+
custom_tags = self.custom_tags + ",service:my_custom_service_2"
388+
event = {"message": self.message_tags, "ddtags": custom_tags}
389+
390+
extract_ddtags_from_message(event)
391+
392+
self.assertEqual(
393+
event["ddtags"],
394+
"custom_tag_2:value2,service:my_application_service,custom_tag_1:value1",
395+
)
396+
self.assertEqual(
397+
event["service"],
398+
"my_application_service",
399+
)
400+
401+
def test_extract_ddtags_from_message_multiple_values_tag(self):
402+
loaded_message_tags = json.loads(self.message_tags)
403+
loaded_message_tags["ddtags"] += ",custom_tag_3:value4"
404+
custom_tags = self.custom_tags + ",custom_tag_3:value3"
405+
event = {"message": loaded_message_tags, "ddtags": custom_tags}
406+
407+
extract_ddtags_from_message(event)
408+
409+
self.assertEqual(
410+
event["ddtags"],
411+
"custom_tag_2:value2,custom_tag_3:value3,service:my_application_service,custom_tag_1:value1,custom_tag_3:value4",
412+
)
413+
self.assertEqual(
414+
event["service"],
415+
"my_application_service",
416+
)
417+
418+
237419
if __name__ == "__main__":
238420
unittest.main()

0 commit comments

Comments
 (0)