3
3
Monitor certificates of services that require STARTTLS and return a JSON formatted sensor result.
4
4
5
5
This custom Python script sensor is used to monitor certificates of services that require STARTTLS
6
- to initiate a secure transport. It takes the same parameter as the PRTG built-in sensor `SSL Certificate`
7
- but additionally requires the protocol the sensor must use to communicate with the remote endpoint.
6
+ to initiate a secure transport. It takes the same parameter as the PRTG built-in sensor
7
+ `SSL Certificate` but additionally requires the protocol the sensor must use to communicate with
8
+ the remote endpoint.
8
9
The list of protocols is currently limited to `SMTP`, `LMTP`, and `LDAP`.
9
10
10
11
The sensor result in JSON contains the same channels as the `SSL Certificate` sensor with channel
11
12
`Days to Expiration` set as primary channel.
12
13
13
14
Keyword for additional parameters:
14
15
port -- Port for the connection to the target endpoint (default: 25)
15
- protocol -- Protocol used for the connection to the target endpoint (default: smtp)
16
+ protocol -- Protocol used with the connection (default: smtp)
16
17
Implemented protocols: smtp, lmtp, ldap
17
18
cert_domainname -- Common Name as contained in the certificate
18
19
cert_domainname_validation -- Type of validation of the cert domain name (default: None)
33
34
from cryptography .hazmat .backends import default_backend
34
35
from cryptography .hazmat .primitives import hashes
35
36
36
-
37
- from prtg .sensor .result import CustomSensorResult
37
+ from prtg .sensor .result import CustomSensorResult
38
38
from prtg .sensor .units import ValueUnit
39
39
40
40
class Protocol (Enum ):
41
+ """Application Layer protocols
42
+ """
41
43
SMTP = 1
42
44
LMTP = 2
43
45
IMAP = 3
44
46
LDAP = 4
45
47
46
48
class Validation (Enum ):
49
+ """Certificate Name validations
50
+ """
47
51
NONE = 1
48
52
CN = 2
49
53
CN_SAN = 3
50
54
51
55
def prtg_params_dict (params : str ) -> dict :
52
56
"""
53
57
prtg_params_dict - Converts a PRTG params string into a dictionary.
54
-
58
+
55
59
It takes the params string and converts it via json into a dictionary. The solution is based
56
60
on Stack Overflow (https://stackoverflow.com/questions/47663809/python-convert-string-to-dict)
57
61
"""
58
-
62
+
59
63
_params = '{' + params .strip () + '}'
60
64
_params_json_string = ''
61
65
62
66
# Remove surrounding spaces arround separator chars
63
- _params_stripped = re .sub (r'\s*([:,])\s*' , '\g<1>' , _params )
67
+ _params_stripped = re .sub (r'\s*([:,])\s*' , r '\g<1>' , _params )
64
68
_params_json_string = re .sub (r'([:,])' , r'"\g<1>"' , _params_stripped )
65
69
_params_json_string = re .sub (r'{' , r'{"' , _params_json_string )
66
70
_params_json_string = re .sub (r'}' , r'"}' , _params_json_string )
67
-
71
+
68
72
return json .loads (_params_json_string )
69
73
70
- def starttls_getpeercert (host : str , port : int , starttls_proto : Protocol , cert_hostname = None , timeout = 3.0 , msglen = 4096 ) -> dict :
74
+ def starttls_getpeercert (address ,
75
+ starttls_proto : Protocol ,
76
+ cert_hostname = None ,
77
+ timeout = 3.0 , msglen = 4096 ) -> dict :
71
78
"""
72
79
starttls_getpeercert - Retrieves the certificate of the other side of the connection.
73
80
74
81
@Returns: Certificate dict with selfSigned? and rootAuthorityTrusted? mixed in.
75
82
76
- @NOTE: Dictionary keys
83
+ @NOTE: Dictionary keys:
77
84
subject (string; distinguished name form)
78
85
issuer (string; distinguished name form)
79
86
version (int)
@@ -91,23 +98,28 @@ def starttls_getpeercert(host: str, port: int, starttls_proto: Protocol, cert_ho
91
98
rootAuthorityTrusted? (boolean)
92
99
93
100
Ref: https://stackoverflow.com/questions/5108681/use-python-to-get-an-smtp-server-certificate
94
- REf: https://stackoverflow.com/questions/71114085/how-can-i-retrieve-openldap-servers-starttls-certificate-with-pythons-ssl-libr
101
+ REf: https://stackoverflow.com/questions/71114085/how-can-i-retrieve-openldap-servers-starttls-
102
+ certificate-with-pythons-ssl-libr
95
103
"""
96
104
97
- if cert_hostname == None :
98
- sni_hostname = host
105
+ if cert_hostname is None :
106
+ sni_hostname = address [ 0 ]
99
107
else :
100
108
sni_hostname = cert_hostname
101
-
102
- # Request #1: Get certificate data (incl. self-issued certs) with context in verify_mode == CERT_NONE
103
- # The SSLContext is created with the class constructor since retrieving self-signed certs require
104
- # loosened SSL settings. To fetch self-signed certs verify_mode MUST be set to CERT_NONE which
105
- # requires disabling hostname checking.
109
+
110
+ # Request #1: Get certificate data (incl. self-issued certs) with verify_mode set to CERT_NONE
111
+ # in the created context
112
+ # The SSLContext is created with the class constructor since retrieving self-signed certs
113
+ # require loosened SSL settings. To fetch self-signed certs verify_mode MUST be set to
114
+ # CERT_NONE which requires disabling hostname checking.
106
115
ctx = ssl .SSLContext (ssl .PROTOCOL_TLS_CLIENT )
107
116
ctx .check_hostname = False
108
- ctx .verify_mode = ssl .VerifyMode . CERT_NONE
117
+ ctx .verify_mode = ssl .CERT_NONE
109
118
110
- raw_sock = _starttls_do_service_handshake ((host , port ), starttls_proto , timeout = timeout , msglen = msglen )
119
+ raw_sock = _starttls_do_service_handshake (address ,
120
+ starttls_proto ,
121
+ timeout = timeout ,
122
+ msglen = msglen )
111
123
with ctx .wrap_socket (raw_sock , server_hostname = sni_hostname ) as ssl_sock :
112
124
cert_der = ssl_sock .getpeercert (binary_form = True )
113
125
cert = x509 .load_der_x509_certificate (cert_der , default_backend ())
@@ -119,27 +131,33 @@ def starttls_getpeercert(host: str, port: int, starttls_proto: Protocol, cert_ho
119
131
if cert_dict ['selfSigned?' ]:
120
132
cert_dict ['rootAuthorityTrusted?' ] = False
121
133
else :
122
- ctx_trust_check = ssl .create_default_context ()
123
- raw_sock_trust_check = _starttls_do_service_handshake ((host , port ), starttls_proto , timeout = timeout , msglen = msglen )
134
+ ctx = ssl .create_default_context ()
135
+ raw_sock = _starttls_do_service_handshake (address ,
136
+ starttls_proto ,
137
+ timeout = timeout ,
138
+ msglen = msglen )
124
139
try :
125
- ssl_sock_trust_check = ctx_trust_check .wrap_socket (raw_sock_trust_check , server_hostname = sni_hostname )
140
+ ssl_sock = ctx .wrap_socket (raw_sock , server_hostname = sni_hostname )
126
141
cert_dict ['rootAuthorityTrusted?' ] = True
127
- ssl_sock_trust_check .close ()
142
+ ssl_sock .close ()
128
143
except ssl .SSLCertVerificationError :
129
144
cert_dict ['rootAuthorityTrusted?' ] = False
130
145
131
146
return cert_dict
132
147
133
- def _starttls_do_service_handshake (address , starttls_proto : Protocol , timeout = 3.0 , msglen = 4096 ) -> socket .socket :
148
+ def _starttls_do_service_handshake (address ,
149
+ starttls_proto : Protocol ,
150
+ timeout = 3.0 ,
151
+ msglen = 4096 ) -> socket .socket :
134
152
"""
135
153
starttls_do_service_handshake - Perform the service handshake to initiate a TLS connection
136
154
137
155
@Returns: socket.socket
138
156
"""
139
157
# Protocol.LDAP sends a LDAP_START_TLS_OID - sniffed with Wireshark
140
158
protocol_greeters = {
141
- Protocol .SMTP : bytes (" EHLO {0} \n STARTTLS \n " . format ( socket .gethostname ()) , 'ascii' ),
142
- Protocol .LMTP : bytes (" LHLO {0} \n STARTTLS \n " . format ( socket .gethostname ()) , 'ascii' ),
159
+ Protocol .SMTP : bytes (f' EHLO { socket .gethostname ()} \n STARTTLS \n ' , 'ascii' ),
160
+ Protocol .LMTP : bytes (f' LHLO { socket .gethostname ()} \n STARTTLS \n ' , 'ascii' ),
143
161
Protocol .LDAP : b'\x30 \x1d \x02 \x01 \x01 \x77 \x18 \x80 \x16 \x31 \x2e \x33 \x2e \x36 \x2e \x31 ' \
144
162
b'\x2e \x34 \x2e \x31 \x2e \x31 \x34 \x36 \x36 \x2e \x32 \x30 \x30 \x33 \x37 '
145
163
}
@@ -164,14 +182,17 @@ def _starttls_do_service_handshake(address, starttls_proto: Protocol, timeout=3.
164
182
raw_sock .send (protocol_greeting )
165
183
# Look for \x0a\x01 {result code} \x04\x00\x04\x00 - if the second \x04 is not followed
166
184
# by \x00 then it seems that the server support STARTTLS but has no cert installed
167
- _ldap_tls_ext_response = raw_sock .recv (msglen )
168
- _ldap_tls_ext_response_result_pos = _ldap_tls_ext_response .find (b'\x0a \x01 ' )
169
- _ldap_tls_ext_response_result_trail = _ldap_tls_ext_response .find (b'\x04 \x00 \x04 \x00 ' , _ldap_tls_ext_response_result_pos )
185
+ _ldap_response = raw_sock .recv (msglen )
186
+ _ldap_response_result_pos = _ldap_response .find (b'\x0a \x01 ' )
187
+ _ldap_response_result_trail = _ldap_response .find (b'\x04 \x00 \x04 \x00 ' ,
188
+ _ldap_response_result_pos )
170
189
171
- if not (_ldap_tls_ext_response_result_trail - _ldap_tls_ext_response_result_pos == 3 and
172
- _ldap_tls_ext_response [ _ldap_tls_ext_response_result_pos + 2 ] == 0 ):
190
+ if not (_ldap_response_result_trail - _ldap_response_result_pos == 3 and
191
+ _ldap_response [ _ldap_response_result_pos + 2 ] == 0 ):
173
192
raw_sock .close ()
174
- raise OSError ("LDAP Server does not support LDAP_START_TLS_OID or has no certificate installed." )
193
+ _err_msg = "LDAP Server does not support LDAP_START_TLS_OID "
194
+ _err_msg += "or has no certificate installed."
195
+ raise OSError (_err_msg )
175
196
176
197
return raw_sock
177
198
@@ -207,12 +228,13 @@ def _starttls_cert_dict(cert) -> dict:
207
228
try :
208
229
_extension = cert .extensions .get_extension_for_oid (_extension_oid )
209
230
if _dict_key == 'subjectAltName' :
210
- cert_dict [_dict_key ] = [ ('DNS' , _dnsname ) for _dnsname in _extension .value .get_values_for_type (x509 .DNSName ) ]
231
+ _extension_subject_alt_names = _extension .value .get_values_for_type (x509 .DNSName )
232
+ cert_dict [_dict_key ] = [ ('DNS' , _name ) for _name in _extension_subject_alt_names ]
211
233
if _dict_key == 'crlDistributionPoints' :
212
234
cert_dict [_dict_key ] = [ _crldp .full_name [0 ].value for _crldp in _extension .value ]
213
235
except x509 .ExtensionNotFound :
214
236
pass
215
-
237
+
216
238
# Cert extension data - Authority Access Info Methods (OCSP, caIssuers)
217
239
_extension_oid = x509 .ObjectIdentifier ('1.3.6.1.5.5.7.1.1' )
218
240
_authority_access_method_oids = [
@@ -224,23 +246,24 @@ def _starttls_cert_dict(cert) -> dict:
224
246
while len (_authority_access_method_oids ) > 0 :
225
247
_access_method_oid , _dict_key = _authority_access_method_oids .pop (0 )
226
248
for _access_description in _extension .value :
227
- try :
228
- if _access_description .access_method == _access_method_oid :
229
- cert_dict [_dict_key ] = _access_description .access_location .value
230
- break
231
- except :
232
- pass
249
+ if _access_description .access_method == _access_method_oid :
250
+ cert_dict [_dict_key ] = _access_description .access_location .value
251
+ break
233
252
except x509 .ExtensionNotFound :
234
253
pass
235
254
236
255
return cert_dict
237
256
238
- def _prtg_cert_cncheck_result (certificate : dict , validation_mode : Validation , cert_hostname : str ) -> int :
257
+ def _prtg_cert_cncheck_result (certificate : dict ,
258
+ validation_mode : Validation ,
259
+ cert_hostname : str ) -> int :
239
260
"""
240
- prtg_cert_cncheck_result - Returns the proper result value based on the sensor parameter cert_domainname_validation
261
+ prtg_cert_cncheck_result - Returns the proper result value based on cert_domainname_validation
241
262
242
- The return value matches the expected result specified in the default overlay prtg.standardlookups.sslcertificatesensor.cncheck.
243
- This script DOES NOT return all values since SNI check and common_name check are considered interchangeable.
263
+ The return value matches the expected result specified in the default overlay
264
+ prtg.standardlookups.sslcertificatesensor.cncheck.
265
+ This script DOES NOT return all values since SNI check and common_name check
266
+ are considered interchangeable.
244
267
245
268
Expected result values defined in overlay:
246
269
Value 0: State Ok, Matches device address (Validation mode: cn)
@@ -262,10 +285,10 @@ def _prtg_cert_cncheck_result(certificate: dict, validation_mode: Validation, ce
262
285
if cert_hostname .strip ().lower () == certificate ["commonName" ].strip ().lower ():
263
286
result_value = 5
264
287
if "subjectAltName" in certificate .keys ():
265
- _dns_names = [altname_tuple [1 ] for altname_tuple in certificate ["subjectAltName" ] if altname_tuple [0 ] == "DNS" ]
266
- if cert_hostname .strip ().lower () in _dns_names :
288
+ _names = [_tuple [1 ] for _tuple in certificate ["subjectAltName" ] if _tuple [0 ] == "DNS" ]
289
+ if cert_hostname .strip ().lower () in _names :
267
290
result_value = 5
268
- # If after all checks result_value is still 2 (disabled) - correct it to the propper check error value
291
+ # If after all checks result_value is still 2 (disabled) - correct it to the error value
269
292
if result_value == 2 :
270
293
result_value = 6
271
294
@@ -280,17 +303,23 @@ def main():
280
303
in order to start a secure connection.
281
304
"""
282
305
try :
306
+ # Basic argument check
307
+ if len (sys .argv ) < 2 :
308
+ raise TypeError ("JSON object argument missing." )
283
309
data = json .loads (sys .argv [1 ])
310
+
311
+ if "params" not in data .keys ():
312
+ raise TypeError ("Key 'params' missing in JSON object." )
284
313
params = prtg_params_dict (data ["params" ])
285
- _now = datetime .datetime .now ()
286
314
287
- cert = starttls_getpeercert (
288
- data ["host" ],
289
- int (params .get ("port" , "25" )),
290
- Protocol [params .get ("protocol" , "smtp" ).upper ()],
291
- cert_hostname = params .get ("cert_domainname" , data ["host" ]))
315
+ _now = datetime .datetime .now ()
316
+ cert = starttls_getpeercert ((data ["host" ], int (params .get ("port" , "25" ))),
317
+ Protocol [params .get ("protocol" , "smtp" ).upper ()],
318
+ cert_hostname = params .get ("cert_domainname" , data ["host" ]))
292
319
293
- csr_text_ok = "OK. Certificate Common Name: {} - Certificate Thumbprint: {} - STARTTLS Protocol: {}"
320
+ csr_text_ok = "OK. Certificate Common Name: {} - "
321
+ csr_text_ok += "Certificate Thumbprint: {} - "
322
+ csr_text_ok += "STARTTLS Protocol: {}"
294
323
csr = CustomSensorResult (text = csr_text_ok .format (cert ['commonName' ],
295
324
cert ['fingerprint' ],
296
325
params .get ('protocol' , "smtp" ).upper ()))
@@ -307,9 +336,9 @@ def main():
307
336
limit_warning_msg = "Certificate will expire soon." )
308
337
309
338
# Channel _Common Name Check_
310
- _prtg_cncheck_value = _prtg_cert_cncheck_result ( cert ,
311
- Validation [ params .get ("cert_domainname_validation " , "NONE" ). upper ()],
312
- params . get ( "cert_domainname" , data [ "host" ]) )
339
+ _validation = Validation [ params . get ( "cert_domainname_validation" , "NONE" ). upper ()]
340
+ _sni_domainname = params .get ("cert_domainname " , data [ "host" ])
341
+ _prtg_cncheck_value = _prtg_cert_cncheck_result ( cert , _validation , _sni_domainname )
313
342
csr .add_channel (name = "Common Name Check" ,
314
343
unit = ValueUnit .CUSTOM ,
315
344
value_lookup = 'prtg.standardlookups.sslcertificatesensor.cncheck' ,
@@ -350,12 +379,21 @@ def main():
350
379
is_float = False ,
351
380
is_limit_mode = False )
352
381
353
- # Print sensor JSON result
354
- print ( csr . json_result )
355
-
356
- except Exception as e :
382
+ except json . JSONDecodeError as sensor_error :
383
+ csr = CustomSensorResult ( text = "Python Script execution error" )
384
+ csr . error = f"Failed to decode 'params' into valid JSON object: { str ( sensor_error ) } "
385
+ except KeyError as sensor_error :
357
386
csr = CustomSensorResult (text = "Python Script execution error" )
358
- csr .error = "Python Script execution error: {}" .format (str (e ))
387
+ csr .error = f"Invalid key in 'protocol'|'cert_domainname_validation': { str (sensor_error )} "
388
+ except ssl .SSLEOFError as sensor_error :
389
+ csr = CustomSensorResult (text = "Python Script execution error" )
390
+ csr .error = f"Protocol handshake prior to STARTTLS possibly failed': { str (sensor_error )} "
391
+ except (TypeError , OSError , RuntimeError ) as sensor_error :
392
+ csr = CustomSensorResult (text = "Python Script execution error" )
393
+ csr .error = f"Python Script runtime error: { str (sensor_error )} "
394
+
395
+ finally :
396
+ # Print sensor JSON result
359
397
print (csr .json_result )
360
398
361
399
if __name__ == "__main__" :
0 commit comments