diff --git a/ospd/command/command.py b/ospd/command/command.py
index 3dde5811..e22bfafa 100644
--- a/ospd/command/command.py
+++ b/ospd/command/command.py
@@ -408,8 +408,23 @@ def handle_xml(self, xml: Element) -> Iterator[bytes]:
yield begin_vts_tag
if not version_only:
- for vt in self._daemon.get_vt_iterator(vts_selection, vt_details):
- yield xml_helper.add_element(self._daemon.get_vt_xml(vt))
+ if vt_details:
+ if vt_filter:
+ logger.debug('Getting filtered with details.')
+ for vt_id in self._daemon.get_vt_id_iterator(vts_selection):
+ yield self._daemon.get_vt_xml_str_prepared(vt_id)
+ ## 3s
+ #yield self._daemon.get_vt_xml_str_fixed(vt_id)
+ else:
+ logger.debug('Getting all with details.')
+ for s in self._daemon.get_vt_xml_str_iterator():
+ yield s
+ else:
+ logger.debug('Getting filtered or sparse.')
+ for vt in self._daemon.get_vt_iterator(vts_selection, vt_details):
+ #1m30s
+ yield xml_helper.add_element(self._daemon.get_vt_xml(vt))
+ #yield self._daemon.get_vt_xml_str(vt)
yield xml_helper.create_element('vts', end=True)
yield xml_helper.create_response('get_vts', end=True)
diff --git a/ospd/ospd.py b/ospd/ospd.py
index 95df32c9..371462b3 100644
--- a/ospd/ospd.py
+++ b/ospd/ospd.py
@@ -85,6 +85,7 @@ def _terminate_process_group(process: multiprocessing.Process) -> None:
'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
)
+from xml.sax.saxutils import escape, quoteattr
class OSPDaemon:
@@ -127,6 +128,7 @@ def remove_previous_data_pickler_files():
dp.unlink()
return
+ self.vt_xml = dict()
self.scan_collection = ScanCollection(file_storage_dir)
self.scan_processes = dict()
remove_previous_data_pickler_files()
@@ -900,6 +902,33 @@ def get_vt_iterator( # pylint: disable=unused-argument
from the VTs dictionary."""
return self.vts.items()
+ def get_vt_xml_str_fixed(self, vt_id: str) -> bytes:
+ """Gets a single vulnerability test as an XML string.
+
+ Returns:
+ String of single vulnerability test information as XML string.
+ """
+ if not vt_id:
+ return ''.encode('utf-8')
+
+ return ''.encode('utf-8')
+
+ def get_vt_xml_str_prepared(self, vt_id: str) -> bytes:
+ """Gets a single vulnerability test as an XML string.
+
+ Returns:
+ String of single vulnerability test information as XML string.
+ """
+ if not vt_id:
+ return ''.encode('utf-8')
+
+ return self.vt_xml[vt_id]
+
+ def get_vt_xml_str_iterator(self) -> Iterator[bytes]:
+ """Return iterator object for getting elements
+ from vt_xml."""
+ return self.vt_xml.values()
+
def get_vt_xml(self, single_vt: Tuple[str, Dict]) -> Element:
"""Gets a single vulnerability test information in XML format.
@@ -1004,6 +1033,110 @@ def get_vt_xml(self, single_vt: Tuple[str, Dict]) -> Element:
return vt_xml
+ def get_vt_xml_str(self, single_vt: Tuple[str, Dict]) -> bytes:
+ """Gets a single vulnerability test as an XML string.
+
+ Returns:
+ String of single vulnerability test information as XML string.
+ """
+ if not single_vt or single_vt[1] is None:
+ return ''.encode('utf-8')
+
+ vt_id, vt = single_vt
+
+ name = vt.get('name')
+ vt_str = ''
+
+ for name, value in [('name', name)]:
+ vt_str += ' <' + name + '>' + escape(str(value)) + '' + name + '>'
+
+ xml_helper = XmlStringVTHelper()
+
+ if vt.get('vt_params'):
+ params_xml_str = xml_helper.get_params_vt_as_xml_str(
+ vt_id, vt.get('vt_params')
+ )
+ vt_str += params_xml_str
+
+ if vt.get('vt_refs'):
+ refs_xml_str = xml_helper.get_refs_vt_as_xml_str(
+ vt_id, vt.get('vt_refs')
+ )
+ vt_str += refs_xml_str
+
+ if vt.get('vt_dependencies'):
+ dependencies = xml_helper.get_dependencies_vt_as_xml_str(
+ vt_id, vt.get('vt_dependencies')
+ )
+ vt_str += dependencies
+
+ if vt.get('creation_time'):
+ vt_ctime = xml_helper.get_creation_time_vt_as_xml_str(
+ vt_id, vt.get('creation_time')
+ )
+ vt_str += vt_ctime
+
+ if vt.get('modification_time'):
+ vt_mtime = xml_helper.get_modification_time_vt_as_xml_str(
+ vt_id, vt.get('modification_time')
+ )
+ vt_str += vt_mtime
+
+ if vt.get('summary'):
+ summary_xml_str = xml_helper.get_summary_vt_as_xml_str(
+ vt_id, vt.get('summary')
+ )
+ vt_str += summary_xml_str
+
+ if vt.get('impact'):
+ impact_xml_str = xml_helper.get_impact_vt_as_xml_str(
+ vt_id, vt.get('impact')
+ )
+ vt_str += impact_xml_str
+
+ if vt.get('affected'):
+ affected_xml_str = xml_helper.get_affected_vt_as_xml_str(
+ vt_id, vt.get('affected')
+ )
+ vt_str += affected_xml_str
+
+ if vt.get('insight'):
+ insight_xml_str = xml_helper.get_insight_vt_as_xml_str(
+ vt_id, vt.get('insight')
+ )
+ vt_str += insight_xml_str
+
+ if vt.get('solution'):
+ solution_xml_str = xml_helper.get_solution_vt_as_xml_str(
+ vt_id,
+ vt.get('solution'),
+ vt.get('solution_type'),
+ vt.get('solution_method'),
+ )
+ vt_str += solution_xml_str
+
+ if vt.get('detection') or vt.get('qod_type') or vt.get('qod'):
+ detection_xml_str = xml_helper.get_detection_vt_as_xml_str(
+ vt_id, vt.get('detection'), vt.get('qod_type'), vt.get('qod')
+ )
+ vt_str += detection_xml_str
+
+ if vt.get('severities'):
+ severities_xml_str = xml_helper.get_severities_vt_as_xml_str(
+ vt_id, vt.get('severities')
+ )
+ vt_str += severities_xml_str
+
+ if vt.get('custom'):
+ custom_xml_str = xml_helper.get_custom_vt_as_xml_str(
+ vt_id, vt.get('custom')
+ )
+ vt_str += custom_xml_str
+
+ vt_str += ''
+
+ return vt_str.encode('utf-8')
+
def get_vts_selection_list(
self, vt_id: str = None, filtered_vts: Dict = None
) -> Iterable[str]:
diff --git a/ospd_openvas/daemon.py b/ospd_openvas/daemon.py
index bcaecc86..cc978ccc 100644
--- a/ospd_openvas/daemon.py
+++ b/ospd_openvas/daemon.py
@@ -664,6 +664,12 @@ def update_vts(self):
loaded = Openvas.load_vts_into_redis()
if loaded:
+ logger.debug('Filling vt_xml...')
+ for single_vt in self.get_vt_iterator(self.get_vts_selection_list(), True):
+ vt_id, vt = single_vt
+ #logger.info('Adding to vt_xml: %s', vt_id)
+ self.vt_xml[vt_id] = self.get_vt_xml_str(single_vt)
+
new = self.nvti.get_feed_version()
if new != old:
logger.info(
@@ -720,6 +726,12 @@ def get_vt_iterator(
vthelper = VtHelper(self.nvti, self.notus)
return vthelper.get_vt_iterator(vt_selection, details)
+ def get_vt_id_iterator(
+ self, vt_selection: List[str] = None
+ ) -> Iterator[str]:
+ vthelper = VtHelper(self.nvti, self.notus)
+ return vthelper.get_vt_id_iterator(vt_selection)
+
@property
def is_running_as_root(self) -> bool:
"""Check if it is running as root user."""
diff --git a/ospd_openvas/vthelper.py b/ospd_openvas/vthelper.py
index f68b8f73..814a8bfa 100644
--- a/ospd_openvas/vthelper.py
+++ b/ospd_openvas/vthelper.py
@@ -195,6 +195,29 @@ def get_vt_iterator(
if vt:
yield (vt_id, vt)
+ def get_vt_id_iterator(
+ self, vt_selection: List[str] = None
+ ) -> Iterator[str]:
+ """Yield the vt ids from the Redis NVTicache."""
+
+ if not vt_selection or details:
+ # notus contains multiple oids per advisory therefore unlike
+ # nasl they share the filename
+ # The vt collection is taken from both Caches
+ if self.notus:
+ vt_collection = chain(
+ self.notus.get_oids(), self.nvti.get_oids()
+ )
+ else:
+ vt_collection = self.nvti.get_oids()
+
+ if not vt_selection:
+ vt_selection = [v for _, v in vt_collection]
+
+ #vt_selection.sort()
+ for vt_id in vt_selection:
+ yield vt_id
+
def vt_verification_string_iter(self) -> str:
# for a reproducible hash calculation
# the vts must already be sorted in the dictionary.