|
| 1 | +from pathlib import Path |
| 2 | +from .base import ModuleTestBase, tempwordlist |
| 3 | +import pytest |
| 4 | + |
| 5 | + |
| 6 | +@pytest.fixture(params=["ssh", "ftp"]) |
| 7 | +def protocol(request): |
| 8 | + return request.param |
| 9 | + |
| 10 | + |
| 11 | +@pytest.fixture |
| 12 | +def mock_legba_run_process(monkeypatch, request): |
| 13 | + async def fake_run_process(self, cmd): |
| 14 | + try: |
| 15 | + # find index of `--output` in cmd |
| 16 | + output_index = cmd.index("--output") |
| 17 | + # output_path is directly after `--output` in cmd |
| 18 | + output_path = Path(cmd[output_index + 1]) |
| 19 | + except Exception as e: |
| 20 | + raise Exception(f"Could not determine output file path from command {cmd}: {e}") |
| 21 | + |
| 22 | + protocol = request.getfixturevalue("protocol") |
| 23 | + |
| 24 | + expected_file_content_per_protocol = { |
| 25 | + "ssh": '{"found_at":"2025-07-22T20:50:19.541305293+02:00","target":"127.0.0.1:2222","plugin":"ssh","data":{"username":"remnux","password":"malware"},"partial":false}', |
| 26 | + "ftp": '{"found_at":"2025-07-22T20:51:19.541305293+02:00","target":"127.0.0.1:21","plugin":"ftp","data":{"username":"ftp_boot","password":"ftp_boot"},"partial":false}', |
| 27 | + } |
| 28 | + |
| 29 | + output_path.write_text(expected_file_content_per_protocol[protocol]) |
| 30 | + |
| 31 | + from bbot.modules.base import BaseModule |
| 32 | + |
| 33 | + monkeypatch.setattr(BaseModule, "run_process", fake_run_process) |
| 34 | + |
| 35 | + |
| 36 | +@pytest.mark.usefixtures("mock_legba_run_process") |
| 37 | +class TestLegba(ModuleTestBase): |
| 38 | + targets = ["127.0.0.1"] |
| 39 | + |
| 40 | + temp_ssh_wordlist = tempwordlist(["test:test", "admin:admin", "admin:password", "remnux:malware", "user:pass"]) |
| 41 | + temp_ftp_wordlist = tempwordlist(["test:test", "ftp_boot:ftp_boot", "admin:password", "root:root", "user:pass"]) |
| 42 | + |
| 43 | + config_overrides = { |
| 44 | + "modules": { |
| 45 | + "legba": { |
| 46 | + "ssh_wordlist": str(temp_ssh_wordlist), |
| 47 | + "ftp_wordlist": str(temp_ftp_wordlist), |
| 48 | + } |
| 49 | + } |
| 50 | + } |
| 51 | + |
| 52 | + @pytest.fixture(autouse=True) |
| 53 | + def _protocol_dependency(self, protocol): |
| 54 | + # ensure pytest sees dependency and runs one test per protocol |
| 55 | + self._protocol = protocol |
| 56 | + |
| 57 | + async def setup_after_prep(self, module_test): |
| 58 | + protocol = module_test.request_fixture.getfixturevalue("protocol") |
| 59 | + ports = {"ssh": 2222, "ftp": 21} |
| 60 | + event_data = {"host": str(self.targets[0]), "protocol": protocol.upper(), "port": ports[protocol]} |
| 61 | + protocol_event = module_test.scan.make_event( |
| 62 | + event_data, |
| 63 | + "PROTOCOL", |
| 64 | + parent=module_test.scan.root_event, |
| 65 | + ) |
| 66 | + |
| 67 | + await module_test.module.emit_event(protocol_event) |
| 68 | + |
| 69 | + def check(self, module_test, events): |
| 70 | + protocol = module_test.request_fixture.getfixturevalue("protocol") |
| 71 | + vuln_events = [e for e in events if e.type == "VULNERABILITY"] |
| 72 | + |
| 73 | + assert len(vuln_events) == 1 |
| 74 | + |
| 75 | + expected_desc = { |
| 76 | + "ssh": "Valid ssh credentials found - remnux:malware", |
| 77 | + "ftp": "Valid ftp credentials found - ftp_boot:ftp_boot", |
| 78 | + } |
| 79 | + |
| 80 | + assert expected_desc[protocol] in vuln_events[0].data["description"] |
0 commit comments