Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

Commit 141b994

Browse files
authored
Merge pull request #6 from astrobl1904/develop
Initial import of sensor script.
2 parents 28e508a + 5fa8138 commit 141b994

File tree

1 file changed

+362
-0
lines changed

1 file changed

+362
-0
lines changed
Lines changed: 362 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,362 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Monitor certificates of services that require STARTTLS and return a JSON formatted sensor result.
4+
5+
This custom Python script sensor is used to monitor certificates of services that require STARTTLS
6+
to initiate a secure transport. It takes the same parameter as the PRTG built-in sensor `SSL Certificate`
7+
but additionally requires the protocol the sensor must use to communicate with the remote endpoint.
8+
The list of protocols is currently limited to `SMTP`, `LMTP`, and `LDAP`.
9+
10+
The sensor result in JSON contains the same channels as the `SSL Certificate` sensor with channel
11+
`Days to Expiration` set as primary channel.
12+
13+
Keyword for additional parameters:
14+
port -- Port for the connection to the target endpoint (default: 25)
15+
protocol -- Protocol used for the connection to the target endpoint (default: smtp)
16+
Implemented protocols: smtp, lmtp, ldap
17+
cert_domainname -- Common Name as contained in the certificate
18+
cert_domainname_validation -- Type of validation of the cert domain name (default: None)
19+
Allowed values: None, cn, cn_san
20+
21+
Ref.: https://stackoverflow.com/questions/16899247/how-can-i-decode-a-ssl-certificate-using-python
22+
"""
23+
24+
import json
25+
import sys
26+
import re
27+
import ssl
28+
import socket
29+
import select
30+
import datetime
31+
from enum import Enum
32+
from cryptography import x509
33+
from cryptography.hazmat.backends import default_backend
34+
from cryptography.hazmat.primitives import hashes
35+
36+
37+
from prtg.sensor.result import CustomSensorResult
38+
from prtg.sensor.units import ValueUnit
39+
40+
class Protocol(Enum):
41+
SMTP = 1
42+
LMTP = 2
43+
IMAP = 3
44+
LDAP = 4
45+
46+
class Validation(Enum):
47+
NONE = 1
48+
CN = 2
49+
CN_SAN = 3
50+
51+
def prtg_params_dict(params: str) -> dict:
52+
"""
53+
prtg_params_dict - Converts a PRTG params string into a dictionary.
54+
55+
It takes the params string and converts it via json into a dictionary. The solution is based
56+
on Stack Overflow (https://stackoverflow.com/questions/47663809/python-convert-string-to-dict)
57+
"""
58+
59+
_params = '{' + params.strip() + '}'
60+
_params_json_string = ''
61+
62+
# Remove surrounding spaces arround separator chars
63+
_params_stripped = re.sub(r'\s*([:,])\s*', '\g<1>', _params)
64+
_params_json_string = re.sub(r'([:,])', r'"\g<1>"', _params_stripped)
65+
_params_json_string = re.sub(r'{', r'{"', _params_json_string)
66+
_params_json_string = re.sub(r'}', r'"}', _params_json_string)
67+
68+
return json.loads(_params_json_string)
69+
70+
def starttls_getpeercert(host: str, port: int, starttls_proto: Protocol, cert_hostname=None, timeout=3.0, msglen=4096) -> dict:
71+
"""
72+
starttls_getpeercert - Retrieves the certificate of the other side of the connection.
73+
74+
@Returns: Certificate dict with selfSigned? and rootAuthorityTrusted? mixed in.
75+
76+
@NOTE: Dictionary keys
77+
subject (string; distinguished name form)
78+
issuer (string; distinguished name form)
79+
version (int)
80+
serialNumber (hex string; w/o leading 0x)
81+
notBefor (datetime)
82+
notAfter (datetime)
83+
fingerprint (hex string; w/o leading 0x)
84+
commonName (string; w/o CN=)
85+
publicKeySize (int)
86+
subjectAltName (list of tuples: type, value; optional)
87+
crlDistributionPoints (list of URIs; optional)
88+
OCSP (URI; optional)
89+
caIssuers (URI; optional)
90+
selfSigned? (boolean)
91+
rootAuthorityTrusted? (boolean)
92+
93+
Ref: https://stackoverflow.com/questions/5108681/use-python-to-get-an-smtp-server-certificate
94+
REf: https://stackoverflow.com/questions/71114085/how-can-i-retrieve-openldap-servers-starttls-certificate-with-pythons-ssl-libr
95+
"""
96+
97+
if cert_hostname == None:
98+
sni_hostname = host
99+
else:
100+
sni_hostname = cert_hostname
101+
102+
# Request #1: Get certificate data (incl. self-issued certs) with context in verify_mode == CERT_NONE
103+
# The SSLContext is created with the class constructor since retrieving self-signed certs require
104+
# loosened SSL settings. To fetch self-signed certs verify_mode MUST be set to CERT_NONE which
105+
# requires disabling hostname checking.
106+
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
107+
ctx.check_hostname = False
108+
ctx.verify_mode = ssl.VerifyMode.CERT_NONE
109+
110+
raw_sock = _starttls_do_service_handshake((host, port), starttls_proto, timeout=timeout, msglen=msglen)
111+
with ctx.wrap_socket(raw_sock, server_hostname=sni_hostname) as ssl_sock:
112+
cert_der = ssl_sock.getpeercert(binary_form=True)
113+
cert = x509.load_der_x509_certificate(cert_der, default_backend())
114+
raw_sock.close()
115+
cert_dict = _starttls_cert_dict(cert)
116+
cert_dict['selfSigned?'] = (cert_dict['subject'] == cert_dict['issuer'])
117+
118+
# Request #2: Verify root authority trust and certificate chain
119+
if cert_dict['selfSigned?']:
120+
cert_dict['rootAuthorityTrusted?'] = False
121+
else:
122+
ctx_trust_check = ssl.create_default_context()
123+
raw_sock_trust_check = _starttls_do_service_handshake((host, port), starttls_proto, timeout=timeout, msglen=msglen)
124+
try:
125+
ssl_sock_trust_check = ctx_trust_check.wrap_socket(raw_sock_trust_check, server_hostname=sni_hostname)
126+
cert_dict['rootAuthorityTrusted?'] = True
127+
ssl_sock_trust_check.close()
128+
except ssl.SSLCertVerificationError:
129+
cert_dict['rootAuthorityTrusted?'] = False
130+
131+
return cert_dict
132+
133+
def _starttls_do_service_handshake(address, starttls_proto: Protocol, timeout=3.0, msglen=4096) -> socket.socket:
134+
"""
135+
starttls_do_service_handshake - Perform the service handshake to initiate a TLS connection
136+
137+
@Returns: socket.socket
138+
"""
139+
# Protocol.LDAP sends a LDAP_START_TLS_OID - sniffed with Wireshark
140+
protocol_greeters = {
141+
Protocol.SMTP: bytes("EHLO {0}\nSTARTTLS\n".format(socket.gethostname()), 'ascii'),
142+
Protocol.LMTP: bytes("LHLO {0}\nSTARTTLS\n".format(socket.gethostname()), 'ascii'),
143+
Protocol.LDAP: b'\x30\x1d\x02\x01\x01\x77\x18\x80\x16\x31\x2e\x33\x2e\x36\x2e\x31' \
144+
b'\x2e\x34\x2e\x31\x2e\x31\x34\x36\x36\x2e\x32\x30\x30\x33\x37'
145+
}
146+
protocol_greeting = protocol_greeters.get(starttls_proto, protocol_greeters[Protocol.SMTP])
147+
148+
raw_sock = socket.create_connection(address, timeout)
149+
# Send protocol related greeting to initiate STARTTLS
150+
# Protocol.SMTP and Protocol.LMTP
151+
if starttls_proto in [Protocol.SMTP, Protocol.LMTP]:
152+
raw_sock.recv(msglen)
153+
raw_sock.send(protocol_greeting)
154+
raw_sock.recv(msglen)
155+
# Debugging this script and stepping thru the statements catches all data sent
156+
# by the server, but running normaly requires an additional read before creating
157+
# the SSL socket - omitting the following block would throw a SSLError: WRONG_VERSION_NUMBER
158+
_raw_sock_ready = select.select([raw_sock], [], [], (timeout / 10))
159+
if _raw_sock_ready[0]:
160+
raw_sock.recv(msglen)
161+
162+
# Protocol.LDAP
163+
if starttls_proto == Protocol.LDAP:
164+
raw_sock.send(protocol_greeting)
165+
# Look for \x0a\x01 {result code} \x04\x00\x04\x00 - if the second \x04 is not followed
166+
# by \x00 then it seems that the server support STARTTLS but has no cert installed
167+
_ldap_tls_ext_response = raw_sock.recv(msglen)
168+
_ldap_tls_ext_response_result_pos = _ldap_tls_ext_response.find(b'\x0a\x01')
169+
_ldap_tls_ext_response_result_trail = _ldap_tls_ext_response.find(b'\x04\x00\x04\x00', _ldap_tls_ext_response_result_pos)
170+
171+
if not(_ldap_tls_ext_response_result_trail - _ldap_tls_ext_response_result_pos == 3 and
172+
_ldap_tls_ext_response[_ldap_tls_ext_response_result_pos + 2] == 0):
173+
raw_sock.close()
174+
raise OSError("LDAP Server does not support LDAP_START_TLS_OID or has no certificate installed.")
175+
176+
return raw_sock
177+
178+
def _starttls_cert_dict(cert) -> dict:
179+
"""
180+
starttls_cert_dict - Converts a <Certificate> object into a <dict> object
181+
182+
The dict object contains the keys `subject`, `issuer`, `version`, `serialNumber`,
183+
`notBefore`, `notAfter`, `subjectAltName`, `OCSP`, `caIssuers`, and `crlDistributionPoints`.
184+
185+
Mixed-in are `commonName`, `fingerprint` (SHA1), and `publicKeySize`
186+
"""
187+
188+
# Cert basic data
189+
cert_dict = {}
190+
cert_dict['subject'] = cert.subject.rfc4514_string()
191+
cert_dict['issuer'] = cert.issuer.rfc4514_string()
192+
cert_dict['version'] = cert.version.value
193+
cert_dict['serialNumber'] = hex(cert.serial_number).replace('0x', '').upper()
194+
cert_dict['notBefore'] = cert.not_valid_before
195+
cert_dict['notAfter'] = cert.not_valid_after
196+
cert_dict['fingerprint'] = cert.fingerprint(hashes.SHA1()).hex().upper()
197+
cert_dict['commonName'] = cert.subject.rfc4514_string().split(',')[0].split('=')[1]
198+
cert_dict['publicKeySize'] = cert.public_key().key_size
199+
200+
# Cert extension data - subjectAltName: looking only for values of type DNSName
201+
# crlDistributionPoints
202+
_extension_oids = [
203+
(x509.ObjectIdentifier('2.5.29.17'), 'subjectAltName'),
204+
(x509.ObjectIdentifier('2.5.29.31'), 'crlDistributionPoints'),
205+
]
206+
for _extension_oid, _dict_key in _extension_oids:
207+
try:
208+
_extension = cert.extensions.get_extension_for_oid(_extension_oid)
209+
if _dict_key == 'subjectAltName':
210+
cert_dict[_dict_key] = [ ('DNS', _dnsname) for _dnsname in _extension.value.get_values_for_type(x509.DNSName) ]
211+
if _dict_key == 'crlDistributionPoints':
212+
cert_dict[_dict_key] = [ _crldp.full_name[0].value for _crldp in _extension.value ]
213+
except x509.ExtensionNotFound:
214+
pass
215+
216+
# Cert extension data - Authority Access Info Methods (OCSP, caIssuers)
217+
_extension_oid = x509.ObjectIdentifier('1.3.6.1.5.5.7.1.1')
218+
_authority_access_method_oids = [
219+
(x509.ObjectIdentifier('1.3.6.1.5.5.7.48.1'), 'OCSP'),
220+
(x509.ObjectIdentifier('1.3.6.1.5.5.7.48.2'), 'caIssuers'),
221+
]
222+
try:
223+
_extension = cert.extensions.get_extension_for_oid(_extension_oid)
224+
while len(_authority_access_method_oids) > 0:
225+
_access_method_oid, _dict_key = _authority_access_method_oids.pop(0)
226+
for _access_description in _extension.value:
227+
try:
228+
if _access_description.access_method == _access_method_oid:
229+
cert_dict[_dict_key] = _access_description.access_location.value
230+
break
231+
except:
232+
pass
233+
except x509.ExtensionNotFound:
234+
pass
235+
236+
return cert_dict
237+
238+
def _prtg_cert_cncheck_result(certificate: dict, validation_mode: Validation, cert_hostname: str) -> int:
239+
"""
240+
prtg_cert_cncheck_result - Returns the proper result value based on the sensor parameter cert_domainname_validation
241+
242+
The return value matches the expected result specified in the default overlay prtg.standardlookups.sslcertificatesensor.cncheck.
243+
This script DOES NOT return all values since SNI check and common_name check are considered interchangeable.
244+
245+
Expected result values defined in overlay:
246+
Value 0: State Ok, Matches device address (Validation mode: cn)
247+
Value 1: State Error, Does not match device address (Validation mode: cn)
248+
Value 2: State Ok, Disabled (Validation mode: None)
249+
Value 5: State Ok, CN/SAN match (Validation mode: cn_san)
250+
Value 6: State Error, CN/SAN do not match SNI
251+
252+
@Returns: int
253+
"""
254+
result_value = 2
255+
256+
if validation_mode == Validation.CN:
257+
if cert_hostname.strip().lower() == certificate["commonName"].strip().lower():
258+
result_value = 0
259+
else:
260+
result_value = 1
261+
if validation_mode == Validation.CN_SAN:
262+
if cert_hostname.strip().lower() == certificate["commonName"].strip().lower():
263+
result_value = 5
264+
if "subjectAltName" in certificate.keys():
265+
_dns_names = [altname_tuple[1] for altname_tuple in certificate["subjectAltName"] if altname_tuple[0] == "DNS"]
266+
if cert_hostname.strip().lower() in _dns_names:
267+
result_value = 5
268+
# If after all checks result_value is still 2 (disabled) - correct it to the propper check error value
269+
if result_value == 2:
270+
result_value = 6
271+
272+
return result_value
273+
274+
275+
def main():
276+
"""
277+
starttls_certificate_sensor - Monitors the certificate of a STARTTLS-secured connection
278+
279+
Monitors the SSL certificate of services that require the client to issue a STARTTLS command
280+
in order to start a secure connection.
281+
"""
282+
try:
283+
data = json.loads(sys.argv[1])
284+
params = prtg_params_dict(data["params"])
285+
_now = datetime.datetime.now()
286+
287+
cert = starttls_getpeercert(
288+
data["host"],
289+
int(params.get("port", "25")),
290+
Protocol[params.get("protocol", "smtp").upper()],
291+
cert_hostname=params.get("cert_domainname", data["host"]))
292+
293+
csr_text_ok = "OK. Certificate Common Name: {} - Certificate Thumbprint: {} - STARTTLS Protocol: {}"
294+
csr = CustomSensorResult(text=csr_text_ok.format(cert['commonName'],
295+
cert['fingerprint'],
296+
params.get('protocol', "smtp").upper()))
297+
# Channel _Days to Expiration_ (Primary)
298+
_prtg_expirationcheck_value = (cert['notAfter'] - _now).days
299+
csr.add_primary_channel(name="Days to Expiration",
300+
value=_prtg_expirationcheck_value,
301+
unit=ValueUnit.COUNT,
302+
is_float=False,
303+
is_limit_mode=True,
304+
limit_min_error=17,
305+
limit_min_warning=35,
306+
limit_error_msg="Certificate will expire in less than 17 days.",
307+
limit_warning_msg="Certificate will expire soon.")
308+
309+
# Channel _Common Name Check_
310+
_prtg_cncheck_value = _prtg_cert_cncheck_result(cert,
311+
Validation[params.get("cert_domainname_validation", "NONE").upper()],
312+
params.get("cert_domainname", data["host"]))
313+
csr.add_channel(name="Common Name Check",
314+
unit=ValueUnit.CUSTOM,
315+
value_lookup='prtg.standardlookups.sslcertificatesensor.cncheck',
316+
value=_prtg_cncheck_value,
317+
is_float=False,
318+
is_limit_mode=False)
319+
320+
# Channel _Public Key Length_
321+
_prtg_publickeycheck_value = cert["publicKeySize"]
322+
csr.add_channel(name="Public Key Length",
323+
unit=ValueUnit.CUSTOM,
324+
value_lookup='prtg.standardlookups.sslcertificatesensor.publickey',
325+
value=_prtg_publickeycheck_value,
326+
is_float=False,
327+
is_limit_mode=False)
328+
329+
# Channel _Revoked_ - not implemented
330+
331+
# Channel _Root Authority Trusted_ - checks trust AND chain
332+
_prtg_trustedrootcheck_value = 0
333+
if not cert["rootAuthorityTrusted?"]:
334+
_prtg_trustedrootcheck_value = 1
335+
csr.add_channel(name="Root Authority Trusted",
336+
unit=ValueUnit.CUSTOM,
337+
value_lookup='prtg.standardlookups.sslcertificatesensor.trustedroot',
338+
value=_prtg_trustedrootcheck_value,
339+
is_float=False,
340+
is_limit_mode=False)
341+
342+
# Channel _Self-Signed_
343+
_prtg_selfsignedcheck_value = 0
344+
if cert['selfSigned?']:
345+
_prtg_selfsignedcheck_value = 1
346+
csr.add_channel(name="Self-Signed",
347+
unit=ValueUnit.CUSTOM,
348+
value_lookup='prtg.standardlookups.sslcertificatesensor.selfsigned',
349+
value=_prtg_selfsignedcheck_value,
350+
is_float=False,
351+
is_limit_mode=False)
352+
353+
# Print sensor JSON result
354+
print(csr.json_result)
355+
356+
except Exception as e:
357+
csr = CustomSensorResult(text="Python Script execution error")
358+
csr.error = "Python Script execution error: {}".format(str(e))
359+
print(csr.json_result)
360+
361+
if __name__ == "__main__":
362+
main()

0 commit comments

Comments
 (0)