From 60f5d49386dfbdb8d487b26b1b6b3b1909ab104d Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Wed, 19 Jun 2024 18:46:23 +0200 Subject: [PATCH 1/3] feat: add support to logstash --- caso/messenger/logstash.py | 29 +++++++++++++++++++++++++---- caso/record.py | 16 ++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/caso/messenger/logstash.py b/caso/messenger/logstash.py index 6c19632b..5795de2a 100644 --- a/caso/messenger/logstash.py +++ b/caso/messenger/logstash.py @@ -24,7 +24,10 @@ from caso import exception import caso.messenger - +#add json lib +import json +#add datetime lib +import datetime opts = [ cfg.StrOpt("host", default="localhost", help="Logstash host to send records to."), @@ -49,11 +52,29 @@ def __init__(self, host=CONF.logstash.host, port=CONF.logstash.port): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def push(self, records): + + # NOTE(acostantini): code for the serialization and push of the + # records in logstash. JSON format to be used and encoding UTF-8 + """Serialization of records to be sent to logstash""" + if not records: + return + + #Actual timestamp to be added on each record + cdt = datetime.datetime.now() + ct = int(datetime.datetime.now().timestamp()) + + #Open the connection with LS + self.sock.connect((self.host, self.port)) + """Push records to logstash using tcp.""" try: - self.sock.connect((self.host, self.port)) - for _, record in six.iteritems(records): - self.sock.sendall(record.as_json() + "\n") + for record in records: + #serialization of record + rec=record.logstash_message() + #cASO timestamp added to each record + rec['caso-timestamp']=ct + #Send the record to LS + self.sock.send((json.dumps(rec)+'\n').encode('utf-8')) except socket.error as e: raise exception.LogstashConnectionError( host=self.host, port=self.port, exception=e diff --git a/caso/record.py b/caso/record.py index 9939f01d..fc3b788f 100644 --- a/caso/record.py +++ b/caso/record.py @@ -49,6 +49,22 @@ def ssm_message(self): raise NotImplementedError("Method not implemented") + def logstash_message(self): + """Render record as the expected logstash message.""" + opts = { + "by_alias": True, + "exclude_none": True, + } + # NOTE(acostatnini): part related to the definition of the logstash message to be + # serialized before to send data + # NOTE(aloga): do not iter over the dictionary returned by record.dict() as this + # is just a dictionary representation of the object, where no serialization is + # done. In order to get objects correctly serialized we need to convert to JSON, + # then reload the model + serialized_record = json.loads(self.json(**opts)) + return serialized_record + + class _ValidCloudStatus(str, enum.Enum): """This is a private class to enum valid cloud statuses.""" From f3257178149b7236c7edafa6769ef523329b88d3 Mon Sep 17 00:00:00 2001 From: Alessandro Costantini Date: Thu, 20 Jun 2024 18:53:30 +0200 Subject: [PATCH 2/3] fix: made generic the logstash serialization function --- caso/messenger/logstash.py | 2 +- caso/record.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/caso/messenger/logstash.py b/caso/messenger/logstash.py index 5795de2a..4ff1cf91 100644 --- a/caso/messenger/logstash.py +++ b/caso/messenger/logstash.py @@ -70,7 +70,7 @@ def push(self, records): try: for record in records: #serialization of record - rec=record.logstash_message() + rec=record.serialization_message() #cASO timestamp added to each record rec['caso-timestamp']=ct #Send the record to LS diff --git a/caso/record.py b/caso/record.py index fc3b788f..de62789c 100644 --- a/caso/record.py +++ b/caso/record.py @@ -49,7 +49,7 @@ def ssm_message(self): raise NotImplementedError("Method not implemented") - def logstash_message(self): + def serialization_message(self): """Render record as the expected logstash message.""" opts = { "by_alias": True, From b3b86fa6b9654cd5a0aa1bd42692674609525f69 Mon Sep 17 00:00:00 2001 From: Alvaro Lopez Garcia Date: Fri, 27 Sep 2024 16:18:11 +0200 Subject: [PATCH 3/3] style: fix errors --- caso/messenger/logstash.py | 27 ++++++++++----------------- caso/record.py | 5 ++--- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/caso/messenger/logstash.py b/caso/messenger/logstash.py index 4ff1cf91..fc6a217f 100644 --- a/caso/messenger/logstash.py +++ b/caso/messenger/logstash.py @@ -16,18 +16,16 @@ """Module containing a Logstask cASO messenger.""" +import json +import datetime import socket from oslo_config import cfg from oslo_log import log -import six from caso import exception import caso.messenger -#add json lib -import json -#add datetime lib -import datetime + opts = [ cfg.StrOpt("host", default="localhost", help="Logstash host to send records to."), @@ -55,26 +53,21 @@ def push(self, records): # NOTE(acostantini): code for the serialization and push of the # records in logstash. JSON format to be used and encoding UTF-8 - """Serialization of records to be sent to logstash""" if not records: return - #Actual timestamp to be added on each record - cdt = datetime.datetime.now() + # Actual timestamp to be added on each record ct = int(datetime.datetime.now().timestamp()) - #Open the connection with LS + # Open the connection with LS self.sock.connect((self.host, self.port)) - """Push records to logstash using tcp.""" try: - for record in records: - #serialization of record - rec=record.serialization_message() - #cASO timestamp added to each record - rec['caso-timestamp']=ct - #Send the record to LS - self.sock.send((json.dumps(rec)+'\n').encode('utf-8')) + for record in records: + # serialization of record + rec = record.serialization_message() + rec["caso-timestamp"] = ct + self.sock.send((json.dumps(rec) + "\n").encode("utf-8")) except socket.error as e: raise exception.LogstashConnectionError( host=self.host, port=self.port, exception=e diff --git a/caso/record.py b/caso/record.py index de62789c..855e637e 100644 --- a/caso/record.py +++ b/caso/record.py @@ -48,15 +48,14 @@ def ssm_message(self): """Render record as the expected SSM message.""" raise NotImplementedError("Method not implemented") - def serialization_message(self): """Render record as the expected logstash message.""" opts = { "by_alias": True, "exclude_none": True, } - # NOTE(acostatnini): part related to the definition of the logstash message to be - # serialized before to send data + # NOTE(acostatnini): part related to the definition of the logstash message to + # be serialized before to send data # NOTE(aloga): do not iter over the dictionary returned by record.dict() as this # is just a dictionary representation of the object, where no serialization is # done. In order to get objects correctly serialized we need to convert to JSON,