Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions test/plugins/windows/windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,22 @@ def test_windows_specific_psscan(self, volatility, python):
assert out.find(b"svchost.exe") != -1
assert out.count(b"\n") > 10

def test_windows_specific_psscan_physical(self, volatility, python):
image = WindowsSamples.WINDOWSXP_GENERIC.value.path
rc, out, _err = test_volatility.runvol_plugin(
"windows.psscan.PsScan",
image,
volatility,
python,
pluginargs=("--physical",),
)
assert rc == 0
out = out.lower()
assert out.find(b"system") != -1
assert out.find(b"csrss.exe") != -1
assert out.find(b"svchost.exe") != -1
assert out.count(b"\n") > 10


class TestWindowsDlllist:
def test_windows_generic_dlllist(self, volatility, python, image):
Expand Down Expand Up @@ -772,19 +788,19 @@ def test_windows_specific_symlinkscan(self, volatility, python):
assert test_volatility.count_entries_flat(json_out) > 5
expected_rows = [
{
"CreateTime": "2005-06-25T16:47:28+00:00",
"From Name": "AUX",
"Offset": 453082584,
"To Name": "\\DosDevices\\COM1",
"__children": []
"CreateTime": "2005-06-25T16:47:28+00:00",
"From Name": "AUX",
"Offset": 453082584,
"To Name": "\\DosDevices\\COM1",
"__children": [],
},
{
"CreateTime": "2005-06-25T16:47:28+00:00",
"From Name": "UNC",
"Offset": 453176664,
"To Name": "\\Device\\Mup",
"__children": []
}
"CreateTime": "2005-06-25T16:47:28+00:00",
"From Name": "UNC",
"Offset": 453176664,
"To Name": "\\Device\\Mup",
"__children": [],
},
]

for expected_row in expected_rows:
Expand Down
83 changes: 52 additions & 31 deletions volatility3/framework/plugins/windows/psscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class PsScan(interfaces.plugins.PluginInterface, timeliner.TimeLinerInterface):
"""Scans for processes present in a particular windows memory image."""

_required_framework_version = (2, 3, 1)
_version = (2, 0, 0)
_version = (2, 0, 1)

@classmethod
def get_requirements(cls):
Expand Down Expand Up @@ -269,39 +269,60 @@ def _generator(self):
filter_func=pslist.PsList.create_pid_filter(self.config.get("pid", None)),
):
file_output = "Disabled"
if self.config["dump"]:
# windows 10 objects (maybe others in the future) are already in virtual memory
if proc.vol.layer_name == kernel.layer_name:
vproc = proc

# windows 10 objects (maybe others in the future) are already in virtual memory
# if the proc native_layer_name and layer_name match then it is in 'virtual' memory.
if proc.vol.layer_name == proc.vol.native_layer_name:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a robust test (it only works when scan_processes returns carved processes), and I don't really want people using it to determine whether an address is physical or not. The poolscanner's been asked to scan the kernel layer, so... it should only ever return virtual results? Have you verified that the "else" branch ever gets hit?

# proc is already in a virtual mem, so a new object is not needed. it means
# that if physical addresses are requested in the output then proc.vol.offset
# cannot be used because it will be virtual, so the mapping is needed.
vproc = proc
if self.config["physical"]:
# the display should be physical addresses, so proc cannot be used. The
# mappings are needed to find where it would be physically.
_, _, offset, _, _ = list(
memory.mapping(offset=proc.vol.offset, length=0)
)[0]
else:
try:
vproc = self.virtual_process_from_physical(
self.context,
self.config["kernel"],
proc,
)
except exceptions.PagedInvalidAddressException:
vproc = None
# the display should be virtual addresses, so proc can be used
offset = proc.vol.offset

# renderers.UnreadableValue()
else:
# proc is in virtual mem, so a new object needs to be creatd.
vproc = self.virtual_process_from_physical(
self.context, self.config["kernel"], proc
)
if self.config["physical"]:
# the display should be physical addresses, so proc can be used
# as it is
offset = proc.vol.offset
else:
# the display should be virtual address, so vproc should be used
# however virtual_process_from_physical is not always able to create
# a vproc, in that case we need to display a UnreadableValue()
if vproc is not None:
offset = vproc.vol.offset
else:
offset = None

if self.config["dump"]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we'd guard for if vproc is None:. If we hand it in, it will debug log but otherwise return None, so it's not catastrophic, but feels like an easy thing we can check (and catchall exception handlers aren't ideal, so that may go away at some point in the future)...

file_handle = pslist.PsList.process_dump(
self.context,
kernel.symbol_table_name,
pe_table_name,
vproc,
self.open,
)
file_output = "Error outputting file"
if vproc:
file_handle = pslist.PsList.process_dump(
self.context,
kernel.symbol_table_name,
pe_table_name,
vproc,
self.open,
)

if file_handle:
file_output = file_handle.preferred_filename

if not self.config["physical"]:
offset = proc.vol.offset
if file_handle:
file_output = file_handle.preferred_filename

# format offset for display
if offset is None:
display_offset = renderers.UnreadableValue()
else:
(_, _, offset, _, _) = list(
memory.mapping(offset=proc.vol.offset, length=0)
)[0]
display_offset = format_hints.Hex(offset)

try:
yield (
Expand All @@ -314,7 +335,7 @@ def _generator(self):
max_length=proc.ImageFileName.vol.count,
errors="replace",
),
format_hints.Hex(offset),
display_offset,
proc.ActiveThreads,
proc.get_handle_count(),
proc.get_session_id(),
Expand Down
Loading