diff --git a/bbot/core/helpers/web/web.py b/bbot/core/helpers/web/web.py index 5e86424049..d0ec79f4c0 100644 --- a/bbot/core/helpers/web/web.py +++ b/bbot/core/helpers/web/web.py @@ -1,7 +1,10 @@ +import json import logging +import re import warnings from pathlib import Path from bs4 import BeautifulSoup +import ipaddress from bbot.core.engine import EngineClient from bbot.core.helpers.misc import truncate_filename @@ -319,12 +322,12 @@ async def curl(self, *args, **kwargs): method (str, optional): The HTTP method to use for the request (e.g., 'GET', 'POST'). cookies (dict, optional): A dictionary of cookies to include in the request. path_override (str, optional): Overrides the request-target to use in the HTTP request line. - head_mode (bool, optional): If True, includes '-I' to fetch headers only. Defaults to None. raw_body (str, optional): Raw string to be sent in the body of the request. + resolve (dict, optional): Host resolution override as dict with 'host', 'port', 'ip' keys for curl --resolve. **kwargs: Arbitrary keyword arguments that will be forwarded to the HTTP request function. Returns: - str: The output of the cURL command. + dict: JSON object with response data and metadata. Raises: CurlError: If 'url' is not supplied. @@ -338,7 +341,11 @@ async def curl(self, *args, **kwargs): if not url: raise CurlError("No URL supplied to CURL helper") - curl_command = ["curl", url, "-s"] + # Use BBOT-specific curl binary + bbot_curl = self.parent_helper.tools_dir / "curl" + if not bbot_curl.exists(): + raise CurlError(f"BBOT curl binary not found at {bbot_curl}. 
Run dependency installation.") + curl_command = [str(bbot_curl), url, "-s"] raw_path = kwargs.get("raw_path", False) if raw_path: @@ -382,6 +389,12 @@ async def curl(self, *args, **kwargs): curl_command.append("-m") curl_command.append(str(timeout)) + # mirror the web helper behavior + retries = self.parent_helper.web_config.get("http_retries", 1) + if retries > 0: + curl_command.extend(["--retry", str(retries)]) + curl_command.append("--retry-all-errors") + for k, v in headers.items(): if isinstance(v, list): for x in v: @@ -418,17 +431,120 @@ async def curl(self, *args, **kwargs): curl_command.append("--request-target") curl_command.append(f"{path_override}") - head_mode = kwargs.get("head_mode", None) - if head_mode: - curl_command.append("-I") - raw_body = kwargs.get("raw_body", None) if raw_body: curl_command.append("-d") curl_command.append(raw_body) - log.verbose(f"Running curl command: {curl_command}") + + # --resolve :: + resolve_dict = kwargs.get("resolve", None) + + if resolve_dict is not None: + # Validate "resolve" is a dict + if not isinstance(resolve_dict, dict): + raise CurlError("'resolve' must be a dictionary containing 'host', 'port', and 'ip' keys") + + # Extract and validate IP (required) + ip = resolve_dict.get("ip") + if not ip: + raise CurlError("'resolve' dictionary requires an 'ip' value") + try: + ipaddress.ip_address(ip) + except ValueError: + raise CurlError(f"Invalid IP address supplied to 'resolve': {ip}") + + # Host, port, and ip must ALL be supplied explicitly + host = resolve_dict.get("host") + if not host: + raise CurlError("'resolve' dictionary requires a 'host' value") + + if "port" not in resolve_dict: + raise CurlError("'resolve' dictionary requires a 'port' value") + port = resolve_dict["port"] + + try: + port = int(port) + except (TypeError, ValueError): + raise CurlError("'port' supplied to resolve must be an integer") + if port < 1 or port > 65535: + raise CurlError("'port' supplied to resolve must be between 1 and 65535") 
+ + # Append the --resolve directive + curl_command.append("--resolve") + curl_command.append(f"{host}:{port}:{ip}") + + # Always add JSON --write-out format with separator and capture headers + curl_command.extend(["-D", "-", "-w", "\\n---CURL_METADATA---\\n%{json}"]) + + log.debug(f"Running curl command: {curl_command}") output = (await self.parent_helper.run(curl_command)).stdout - return output + + # Parse the output to separate headers, content, and metadata + parts = output.split("\n---CURL_METADATA---\n") + + # Raise CurlError if separator not found - this indicates a problem with our curl implementation + if len(parts) < 2: + raise CurlError(f"Curl output missing expected separator. Got: {output[:200]}...") + + # Headers and content are in the first part, JSON metadata is in the last part + header_content = parts[0] + json_data = parts[-1].strip() + + # Split headers from content + header_lines = [] + content_lines = [] + in_headers = True + + for line in header_content.split("\n"): + if in_headers: + if line.strip() == "": + in_headers = False + else: + header_lines.append(line) + else: + content_lines.append(line) + + # Parse headers into dictionary + headers_dict = {} + raw_headers = "\n".join(header_lines) + + for line in header_lines: + if ":" in line: + key, value = line.split(":", 1) + key = key.strip().lower() + value = value.strip() + + # Convert hyphens to underscores to match httpx (projectdiscovery) format + # This ensures consistency with how other modules expect headers + normalized_key = key.replace("-", "_") + + if normalized_key in headers_dict: + if isinstance(headers_dict[normalized_key], list): + headers_dict[normalized_key].append(value) + else: + headers_dict[normalized_key] = [headers_dict[normalized_key], value] + else: + headers_dict[normalized_key] = value + + response_data = "\n".join(content_lines) + + # Raise CurlError if JSON parsing fails - this indicates a problem with curl's %{json} output + try: + metadata = 
json.loads(json_data) + except json.JSONDecodeError as e: + # Try to fix common malformed JSON issues from curl output + try: + # Fix empty values like "certs":, -> "certs":null, + fixed_json = re.sub(r':"?\s*,', ":null,", json_data) + # Fix trailing commas before closing braces + fixed_json = re.sub(r",\s*}", "}", fixed_json) + metadata = json.loads(fixed_json) + log.debug(f"Fixed malformed JSON from curl: {json_data[:100]}... -> {fixed_json[:100]}...") + except json.JSONDecodeError: + raise CurlError(f"Failed to parse curl JSON metadata: {e}. JSON data: {json_data[:200]}...") + + # Combine into final JSON structure + return {"response_data": response_data, "headers": headers_dict, "raw_headers": raw_headers, **metadata} def beautifulsoup( self, diff --git a/bbot/core/shared_deps.py b/bbot/core/shared_deps.py index 013a8b4d67..eaf62b738d 100644 --- a/bbot/core/shared_deps.py +++ b/bbot/core/shared_deps.py @@ -173,6 +173,31 @@ }, ] +DEP_CURL = [ + { + "name": "Download static curl binary (v8.11.0)", + "get_url": { + "url": "https://github.com/moparisthebest/static-curl/releases/download/v8.11.0/curl-amd64", + "dest": "#{BBOT_TOOLS}/curl", + "mode": "0755", + "force": True, + }, + }, + { + "name": "Ensure curl binary is executable", + "file": { + "path": "#{BBOT_TOOLS}/curl", + "mode": "0755", + }, + }, + { + "name": "Verify curl binary works", + "command": "#{BBOT_TOOLS}/curl --version", + "register": "curl_version_output", + "changed_when": False, + }, +] + DEP_MASSCAN = [ { "name": "install os deps (Debian)", diff --git a/bbot/modules/generic_ssrf.py b/bbot/modules/generic_ssrf.py index 6ccde510b9..3eb3202f9f 100644 --- a/bbot/modules/generic_ssrf.py +++ b/bbot/modules/generic_ssrf.py @@ -39,6 +39,8 @@ class BaseSubmodule: severity = "INFO" paths = [] + deps_common = ["curl"] + def __init__(self, generic_ssrf): self.generic_ssrf = generic_ssrf self.test_paths = self.create_paths() @@ -61,7 +63,7 @@ async def test(self, event): self.generic_ssrf.debug(f"Sending 
request to URL: {test_url}") r = await self.generic_ssrf.helpers.curl(url=test_url) if r: - self.process(event, r, subdomain_tag) + self.process(event, r["response_data"], subdomain_tag) def process(self, event, r, subdomain_tag): response_token = self.generic_ssrf.interactsh_domain.split(".")[0][::-1] @@ -123,7 +125,7 @@ async def test(self, event): for tag, pd in post_data_list: r = await self.generic_ssrf.helpers.curl(url=test_url, method="POST", post_data=pd) - self.process(event, r, tag) + self.process(event, r["response_data"], tag) class Generic_XXE(BaseSubmodule): @@ -146,7 +148,7 @@ async def test(self, event): url=test_url, method="POST", raw_body=post_body, headers={"Content-type": "application/xml"} ) if r: - self.process(event, r, subdomain_tag) + self.process(event, r["response_data"], subdomain_tag) class generic_ssrf(BaseModule): diff --git a/bbot/modules/host_header.py b/bbot/modules/host_header.py index a60967b8b4..2dd77b2a09 100644 --- a/bbot/modules/host_header.py +++ b/bbot/modules/host_header.py @@ -15,7 +15,7 @@ class host_header(BaseModule): in_scope_only = True per_hostport_only = True - deps_apt = ["curl"] + deps_common = ["curl"] async def setup(self): self.subdomain_tags = {} @@ -106,7 +106,7 @@ async def handle_event(self, event): ignore_bbot_global_settings=True, cookies=added_cookies, ) - if self.domain in output: + if self.domain in output["response_data"]: domain_reflections.append(technique_description) # absolute URL / Host header transposition @@ -120,7 +120,7 @@ async def handle_event(self, event): cookies=added_cookies, ) - if self.domain in output: + if self.domain in output["response_data"]: domain_reflections.append(technique_description) # duplicate host header tolerance @@ -131,10 +131,9 @@ async def handle_event(self, event): # The fact that it's accepting two host headers is rare enough to note on its own, and not too noisy. 
Having the 3rd header be an interactsh would result in false negatives for the slightly less interesting cases. headers={"Host": ["", str(event.host), str(event.host)]}, cookies=added_cookies, - head_mode=True, ) - split_output = output.split("\n") + split_output = output["raw_headers"].split("\n") if " 4" in split_output: description = "Duplicate Host Header Tolerated" await self.emit_event( @@ -173,7 +172,7 @@ async def handle_event(self, event): headers=override_headers, cookies=added_cookies, ) - if self.domain in output: + if self.domain in output["response_data"]: domain_reflections.append(technique_description) # emit all the domain reflections we found diff --git a/bbot/modules/output/web_report.py b/bbot/modules/output/web_report.py index eb1aee5e52..69e307f002 100644 --- a/bbot/modules/output/web_report.py +++ b/bbot/modules/output/web_report.py @@ -4,7 +4,7 @@ class web_report(BaseOutputModule): - watched_events = ["URL", "TECHNOLOGY", "FINDING", "VULNERABILITY"] + watched_events = ["URL", "TECHNOLOGY", "FINDING", "VULNERABILITY", "VIRTUAL_HOST"] meta = { "description": "Create a markdown report with web assets", "created_date": "2023-02-08", diff --git a/bbot/modules/virtualhost.py b/bbot/modules/virtualhost.py new file mode 100644 index 0000000000..c1b67d538b --- /dev/null +++ b/bbot/modules/virtualhost.py @@ -0,0 +1,1068 @@ +from urllib.parse import urlparse +import random +import string + +from bbot.modules.base import BaseModule +from bbot.errors import CurlError +from bbot.core.helpers.simhash import compute_simhash + + +class virtualhost(BaseModule): + watched_events = ["URL"] + produced_events = ["VIRTUAL_HOST", "DNS_NAME", "HTTP_RESPONSE"] + flags = ["active", "aggressive", "slow", "deadly"] + meta = {"description": "Fuzz for virtual hosts", "created_date": "2022-05-02", "author": "@liquidsec"} + + def _format_headers(self, headers): + """ + Convert list headers back to strings for HTTP_RESPONSE compatibility. 
+ The curl helper converts multiple headers with same name to lists, + but HTTP_RESPONSE events expect them as comma-separated strings. + """ + formatted_headers = {} + for key, value in headers.items(): + if isinstance(value, list): + # Convert list back to comma-separated string + formatted_headers[key] = ", ".join(str(v) for v in value) + else: + formatted_headers[key] = value + return formatted_headers + + deps_common = ["curl"] + + SIMILARITY_THRESHOLD = 0.8 + CANARY_LENGTH = 12 + MAX_RESULTS_FLOOD_PROTECTION = 50 + + special_virtualhost_list = ["127.0.0.1", "localhost", "host.docker.internal"] + options = { + "brute_wordlist": "https://raw.githubusercontent.com/danielmiessler/SecLists/master/Discovery/DNS/subdomains-top1million-5000.txt", + "force_basehost": "", + "brute_lines": 2000, + "subdomain_brute": True, + "mutation_check": True, + "special_hosts": False, + "certificate_sans": False, + "max_concurrent_requests": 80, + "require_inaccessible": True, + "wordcloud_check": False, + "report_interesting_default_content": True, + } + options_desc = { + "brute_wordlist": "Wordlist containing subdomains", + "force_basehost": "Use a custom base host (e.g. 
evilcorp.com) instead of the default behavior of using the current URL", + "brute_lines": "take only the first N lines from the wordlist when finding directories", + "subdomain_brute": "Enable subdomain brute-force on target host", + "mutation_check": "Enable trying mutations of the target host", + "special_hosts": "Enable testing of special virtual host list (localhost, etc.)", + "certificate_sans": "Enable extraction and testing of Subject Alternative Names from certificates", + "wordcloud_check": "Enable check using scan-wide wordcloud data on target host", + "max_concurrent_requests": "Maximum number of concurrent virtual host requests", + "require_inaccessible": "Only test virtual hosts that are not directly accessible (for discovering hidden content)", + "report_interesting_default_content": "Report interesting default content", + } + + in_scope_only = True + + virtualhost_ignore_strings = [ + "We weren't able to find your Azure Front Door Service", + "The http request header is incorrect.", + ] + + async def setup(self): + self.max_concurrent = self.config.get("max_concurrent_requests", 80) + self.scanned_hosts = {} + self.wordcloud_tried_hosts = set() + self.brute_wordlist = await self.helpers.wordlist( + self.config.get("brute_wordlist"), lines=self.config.get("brute_lines", 2000) + ) + self.similarity_cache = {} # Cache for similarity results + + self.waf_strings = self.helpers.get_waf_strings() + self.virtualhost_ignore_strings + + return True + + def _get_basehost(self, event): + """Get the basehost and subdomain from the event""" + basehost = self.helpers.parent_domain(event.parsed_url.hostname) + if not basehost: + raise ValueError(f"No parent domain found for {event.parsed_url.hostname}") + subdomain = event.parsed_url.hostname.removesuffix(basehost).rstrip(".") + return basehost, subdomain + + async def _get_baseline_response(self, event, normalized_url, host_ip): + """Get baseline response for a host using the appropriate method (HTTPS SNI or HTTP 
Host header)""" + is_https = event.parsed_url.scheme == "https" + host = event.parsed_url.netloc + + if is_https: + port = event.parsed_url.port or 443 + baseline_response = await self.helpers.web.curl( + url=f"https://{host}:{port}/", + resolve={"host": host, "port": port, "ip": host_ip}, + ) + else: + baseline_response = await self.helpers.web.curl( + url=normalized_url, + headers={"Host": host}, + resolve={"host": event.parsed_url.hostname, "port": event.parsed_url.port or 80, "ip": host_ip}, + ) + + return baseline_response + + async def handle_event(self, event): + if not self.helpers.is_ip(event.host) or self.config.get("force_basehost"): + scheme = event.parsed_url.scheme + host = event.parsed_url.netloc + normalized_url = f"{scheme}://{host}" + + # since we normalize the URL to the host level, + if normalized_url in self.scanned_hosts: + return + + self.scanned_hosts[normalized_url] = event + + if self.config.get("force_basehost"): + basehost = self.config.get("force_basehost") + subdomain = "" + else: + basehost, subdomain = self._get_basehost(event) + + is_https = event.parsed_url.scheme == "https" + + host_ip = next(iter(event.resolved_hosts)) + try: + baseline_response = await self._get_baseline_response(event, normalized_url, host_ip) + except CurlError as e: + self.warning(f"Failed to get baseline response for {normalized_url}: {e}") + return None + + if not await self._wildcard_canary_check(scheme, host, event, host_ip, baseline_response): + self.verbose( + f"WILDCARD CHECK FAILED in handle_event: Skipping {normalized_url} - failed virtual host wildcard check" + ) + return None + else: + self.verbose(f"WILDCARD CHECK PASSED in handle_event: Proceeding with {normalized_url}") + + # Phase 1: Main virtual host bruteforce + if self.config.get("subdomain_brute", True): + self.verbose(f"=== Starting subdomain brute-force on {normalized_url} ===") + await self._run_virtualhost_phase( + "Target host Subdomain Brute-force", + normalized_url, + basehost, + 
host_ip, + is_https, + event, + "subdomain", + ) + + # only run mutations if there is an actual subdomain (to mutate) + if subdomain: + # Phase 2: Check existing host for mutations + if self.config.get("mutation_check", True): + self.verbose(f"=== Starting mutations check on {normalized_url} ===") + await self._run_virtualhost_phase( + "Mutations on target host", + normalized_url, + basehost, + host_ip, + is_https, + event, + "mutation", + wordlist=self.mutations_check(subdomain), + ) + + # Phase 3: Special virtual host list + if self.config.get("special_hosts", True): + self.verbose(f"=== Starting special virtual hosts check on {normalized_url} ===") + await self._run_virtualhost_phase( + "Special virtual host list", + normalized_url, + "", + host_ip, + is_https, + event, + "random", + wordlist=self.helpers.tempfile(self.special_virtualhost_list, pipe=False), + skip_dns_host=True, + ) + + # Phase 4: Obtain subject alternate names from certicate and analyze them + if self.config.get("certificate_sans", True): + self.verbose(f"=== Starting certificate SAN analysis on {normalized_url} ===") + if is_https: + subject_alternate_names = await self._analyze_subject_alternate_names(event.data) + if subject_alternate_names: + self.debug( + f"Found {len(subject_alternate_names)} Subject Alternative Names from certificate: {subject_alternate_names}" + ) + + # Use SANs as potential virtual hosts for testing + san_wordlist = self.helpers.tempfile(subject_alternate_names, pipe=False) + await self._run_virtualhost_phase( + "Certificate Subject Alternate Name", + normalized_url, + "", + host_ip, + is_https, + event, + "random", + wordlist=san_wordlist, + skip_dns_host=True, + ) + + async def _analyze_subject_alternate_names(self, url): + """Analyze subject alternate names from certificate""" + from OpenSSL import crypto + from bbot.modules.sslcert import sslcert + + parsed = urlparse(url) + host = parsed.netloc + + response = await self.helpers.web.curl(url=url) + if not response 
or not response.get("certs"): + self.debug(f"No certificate data available for {url}") + return [] + + cert_output = response["certs"] + subject_alt_names = [] + + try: + cert_lines = cert_output.split("\n") + pem_lines = [] + in_cert = False + + for line in cert_lines: + if "-----BEGIN CERTIFICATE-----" in line: + in_cert = True + pem_lines.append(line) + elif "-----END CERTIFICATE-----" in line: + pem_lines.append(line) + break + elif in_cert: + pem_lines.append(line) + + if pem_lines: + cert_pem = "\n".join(pem_lines) + cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) + + # Use the existing SAN extraction method from sslcert module + sans = sslcert.get_cert_sans(cert) + + for san in sans: + self.debug(f"Found SAN: {san}") + if san != host and san not in subject_alt_names: + subject_alt_names.append(san) + else: + self.debug("No valid PEM certificate found in response") + + except Exception as e: + self.warning(f"Error parsing certificate for {url}: {e}") + + self.debug( + f"Found {len(subject_alt_names)} Subject Alternative Names: {subject_alt_names} (besides original target host {host})" + ) + return subject_alt_names + + async def _report_interesting_default_content(self, event, canary_hostname, host_ip, canary_response): + discovery_method = "Interesting Default Content (from intentionally-incorrect canary host)" + # Build URL with explicit authority to avoid double-port issues + authority = ( + f"{event.parsed_url.hostname}:{event.parsed_url.port}" + if event.parsed_url.port is not None + else event.parsed_url.hostname + ) + # Use the explicit canary hostname used in the wildcard request (works for HTTP Host and HTTPS SNI) + canary_host = (canary_hostname or "").split(":")[0] + virtualhost_dict = { + "host": str(event.host), + "url": f"{event.parsed_url.scheme}://{authority}/", + "virtual_host": canary_host, + "description": self._build_description(discovery_method, canary_response, True, host_ip), + "ip": host_ip, + } + + await self.emit_event( 
+ virtualhost_dict, + "VIRTUAL_HOST", + parent=event, + tags=["virtual-host"], + context=f"{{module}} discovered virtual host via {discovery_method} for {event.data} and found {{event.type}}: {canary_host}", + ) + + # Emit HTTP_RESPONSE event with the canary response data + # Format to match what badsecrets expects + headers = canary_response.get("headers", {}) + headers = self._format_headers(headers) + + # Get the scheme from the actual probe URL + probe_url = canary_response.get("url", "") + from urllib.parse import urlparse + + parsed_probe_url = urlparse(probe_url) + actual_scheme = parsed_probe_url.scheme if parsed_probe_url.scheme else "http" + + http_response_data = { + "input": canary_host, + "url": f"{actual_scheme}://{canary_host}/", + "method": "GET", + "status_code": canary_response.get("http_code", 0), + "content_length": len(canary_response.get("response_data", "")), + "body": canary_response.get("response_data", ""), # badsecrets expects 'body' + "response_data": canary_response.get("response_data", ""), # keep for compatibility + "header": headers, + "raw_header": canary_response.get("raw_headers", ""), + } + + # Include location header for redirect handling + if "location" in headers: + http_response_data["location"] = headers["location"] + + http_response_event = await self.emit_event( + http_response_data, + "HTTP_RESPONSE", + parent=event, + tags=["virtual-host"], + context=f"{{module}} discovered virtual host via {discovery_method} for {event.data} and found {{event.type}}: {canary_host}", + ) + # Set scope distance to match parent's scope distance for HTTP_RESPONSE events + if http_response_event: + http_response_event.scope_distance = event.scope_distance + + def _get_canary_random_host(self, host, basehost, mode="subdomain"): + """Generate a random host for the canary""" + # Seed RNG with domain to get consistent canary hosts for same domain + random.seed(host) + + # Generate canary hostname based on mode + if mode == "mutation": + # 
Prepend random 4-character string with dash to existing hostname + random_prefix = "".join(random.choice(string.ascii_lowercase) for i in range(4)) + canary_host = f"{random_prefix}-{host}" + elif mode == "subdomain": + # Default subdomain mode - add random subdomain + canary_host = "".join(random.choice(string.ascii_lowercase) for i in range(self.CANARY_LENGTH)) + basehost + elif mode == "random_append": + # Append random string to existing hostname (first domain level) + random_suffix = "".join(random.choice(string.ascii_lowercase) for i in range(4)) + canary_host = f"{host.split('.')[0]}{random_suffix}.{'.'.join(host.split('.')[1:])}" + elif mode == "random": + # Fully random hostname with .com TLD + random_host = "".join(random.choice(string.ascii_lowercase) for i in range(self.CANARY_LENGTH)) + canary_host = f"{random_host}.com" + else: + raise ValueError(f"Invalid canary mode: {mode}") + + return canary_host + + async def _get_canary_response(self, normalized_url, basehost, host_ip, is_https, mode="subdomain"): + """Setup canary response for comparison using the appropriate technique. 
Returns canary response or None on failure.""" + + parsed = urlparse(normalized_url) + # Use hostname without port to avoid duplicating port in canary host + host = parsed.hostname or (parsed.netloc.split(":")[0] if ":" in parsed.netloc else parsed.netloc) + + # Seed RNG with domain to get consistent canary hosts for same domain + canary_host = self._get_canary_random_host(host, basehost, mode) + + # Get canary response + if is_https: + port = parsed.port or 443 + canary_response = await self.helpers.web.curl( + url=f"https://{canary_host}:{port}/", + resolve={"host": canary_host, "port": port, "ip": host_ip}, + ) + else: + http_port = parsed.port or 80 + canary_response = await self.helpers.web.curl( + url=normalized_url, + headers={"Host": canary_host}, + resolve={"host": parsed.hostname, "port": http_port, "ip": host_ip}, + ) + + return canary_response + + async def _is_host_accessible(self, url): + """ + Check if a URL is already accessible via direct HTTP request. + Returns True if the host is accessible (and should be skipped), False otherwise. 
+ """ + try: + response = await self.helpers.web.curl(url=url) + if response and int(response.get("http_code", 0)) > 0: + return True + else: + return False + except CurlError as e: + self.debug(f"Error checking accessibility of {url}: {e}") + return False + + async def _wildcard_canary_check(self, probe_scheme, probe_host, event, host_ip, probe_response): + """Change one char in probe_host and test - if responses are similar, it's probably a wildcard""" + + # Extract hostname and port separately to avoid corrupting the port portion + original_hostname = event.parsed_url.hostname or "" + original_port = event.parsed_url.port + + # Try to mutate the first alphabetic character in the hostname + modified_hostname = None + for i, char in enumerate(original_hostname): + if char.isalpha(): + new_char = "z" if char != "z" else "a" + modified_hostname = original_hostname[:i] + new_char + original_hostname[i + 1 :] + break + + if modified_hostname is None: + # Fallback: generate random hostname of similar length (hostname-only) + modified_hostname = "".join( + random.choice(string.ascii_lowercase) for _ in range(len(original_hostname) or 12) + ) + + # Build modified host strings for each protocol + https_modified_host_for_sni = modified_hostname + http_modified_host_for_header = f"{modified_hostname}:{original_port}" if original_port else modified_hostname + + # Test modified host + if probe_scheme == "https": + port = event.parsed_url.port or 443 + # Log the canary URL for the wildcard SNI test + self.debug( + f"CANARY URL: https://{https_modified_host_for_sni}:{port}/ [phase=wildcard-check, mode=single-char-mutation]" + ) + wildcard_canary_response = await self.helpers.web.curl( + url=f"https://{https_modified_host_for_sni}:{port}/", + resolve={"host": https_modified_host_for_sni, "port": port, "ip": host_ip}, + ) + else: + # Log the canary URL for the wildcard Host header test + http_port = event.parsed_url.port or 80 + self.debug( + f"CANARY URL: 
{probe_scheme}://{http_modified_host_for_header if ':' in http_modified_host_for_header else f'{http_modified_host_for_header}:{http_port}'}/ [phase=wildcard-check, mode=single-char-mutation]" + ) + wildcard_canary_response = await self.helpers.web.curl( + url=f"{probe_scheme}://{event.parsed_url.netloc}/", + headers={"Host": http_modified_host_for_header}, + resolve={"host": event.parsed_url.hostname, "port": event.parsed_url.port or 80, "ip": host_ip}, + ) + + if not wildcard_canary_response or wildcard_canary_response["http_code"] == 0: + self.debug( + f"Wildcard check: {http_modified_host_for_header} failed to respond, assuming {probe_host} is valid" + ) + return True # Modified failed, original probably valid + + # If HTTP status codes differ, consider this a pass (not wildcard) + if probe_response.get("http_code") != wildcard_canary_response.get("http_code"): + self.debug( + f"WILDCARD CHECK OK (status mismatch): {probe_host} ({probe_response.get('http_code')}) vs {http_modified_host_for_header} ({wildcard_canary_response.get('http_code')})" + ) + if ( + self.config.get("report_interesting_default_content", True) + and wildcard_canary_response.get("http_code") == 200 + and len(wildcard_canary_response.get("response_data", "")) > 40 + ): + canary_hostname = ( + https_modified_host_for_sni if probe_scheme == "https" else http_modified_host_for_header + ) + await self._report_interesting_default_content( + event, canary_hostname, host_ip, wildcard_canary_response + ) + return True + + probe_simhash = await self.helpers.run_in_executor_mp(compute_simhash, probe_response["response_data"]) + wildcard_simhash = await self.helpers.run_in_executor_mp( + compute_simhash, wildcard_canary_response["response_data"] + ) + similarity = self.helpers.simhash.similarity(probe_simhash, wildcard_simhash) + + # Compare original probe response with modified response + + result = similarity <= self.SIMILARITY_THRESHOLD + + if not result: + self.debug( + f"WILDCARD DETECTED: 
{probe_host} vs {http_modified_host_for_header} similarity: {similarity:.3f} (threshold: {self.SIMILARITY_THRESHOLD}) -> FAIL (wildcard detected)" + ) + else: + self.debug( + f"WILDCARD CHECK OK: {probe_host} vs {http_modified_host_for_header} similarity: {similarity:.3f} (threshold: {self.SIMILARITY_THRESHOLD}) -> PASS (not wildcard)" + ) + if ( + self.config.get("report_interesting_default_content", True) + and wildcard_canary_response.get("http_code") == 200 + and len(wildcard_canary_response.get("response_data", "")) > 40 + ): + canary_hostname = ( + https_modified_host_for_sni if probe_scheme == "https" else http_modified_host_for_header + ) + await self._report_interesting_default_content( + event, canary_hostname, host_ip, wildcard_canary_response + ) + + return result # True if they're different (good), False if similar (wildcard) + + async def _run_virtualhost_phase( + self, + discovery_method, + normalized_url, + basehost, + host_ip, + is_https, + event, + canary_mode, + wordlist=None, + skip_dns_host=False, + ): + """Helper method to run a virtual host discovery phase and optionally mutations""" + + canary_response = await self._get_canary_response( + normalized_url, basehost, host_ip, is_https, mode=canary_mode + ) + + if not canary_response: + self.debug(f"Failed to get canary response for {normalized_url}, skipping virtual host detection") + return [] + + results = await self.curl_virtualhost( + discovery_method, + normalized_url, + basehost, + event, + canary_response, + canary_mode, + wordlist, + skip_dns_host, + ) + + # Emit all valid results + for virtual_host_data in results: + # Emit VIRTUAL_HOST event + await self.emit_event( + virtual_host_data["virtualhost_dict"], + "VIRTUAL_HOST", + parent=event, + tags=["virtual-host"], + context=f"{{module}} discovered virtual host via {virtual_host_data['discovery_method']} for {event.data} and found {{event.type}}: {virtual_host_data['probe_host']} (similarity: {virtual_host_data['similarity']:.2%})", + 
) + + # Emit HTTP_RESPONSE event with the probe response data + # Format to match what badsecrets expects + headers = virtual_host_data["probe_response"].get("headers", {}) + headers = self._format_headers(headers) + + # Get the scheme from the actual probe URL + probe_url = virtual_host_data["probe_response"].get("url", "") + from urllib.parse import urlparse + + parsed_probe_url = urlparse(probe_url) + actual_scheme = parsed_probe_url.scheme if parsed_probe_url.scheme else "http" + + http_response_data = { + "input": virtual_host_data["probe_host"], + "url": f"{actual_scheme}://{virtual_host_data['probe_host']}/", # Use the actual virtual host URL with correct scheme + "method": "GET", + "status_code": virtual_host_data["probe_response"].get("http_code", 0), + "content_length": len(virtual_host_data["probe_response"].get("response_data", "")), + "body": virtual_host_data["probe_response"].get("response_data", ""), # badsecrets expects 'body' + "response_data": virtual_host_data["probe_response"].get( + "response_data", "" + ), # keep for compatibility + "header": headers, + "raw_header": virtual_host_data["probe_response"].get("raw_headers", ""), + } + + # Include location header for redirect handling + if "location" in headers: + http_response_data["location"] = headers["location"] + + http_response_event = await self.emit_event( + http_response_data, + "HTTP_RESPONSE", + parent=event, + tags=["virtual-host"], + context=f"{{module}} discovered virtual host via {virtual_host_data['discovery_method']} for {event.data} and found {{event.type}}: {virtual_host_data['probe_host']}", + ) + # Set scope distance to match parent's scope distance for HTTP_RESPONSE events + if http_response_event: + http_response_event.scope_distance = event.scope_distance + + # Emit DNS_NAME_UNVERIFIED event if needed + if virtual_host_data["skip_dns_host"] is False: + await self.emit_event( + virtual_host_data["virtualhost_dict"]["virtual_host"], + "DNS_NAME_UNVERIFIED", + parent=event, + 
tags=["virtual-host"], + context=f"{{module}} discovered virtual host via {virtual_host_data['discovery_method']} for {event.data} and found {{event.type}}: {{event.data}}", + ) + + async def curl_virtualhost( + self, + discovery_method, + normalized_url, + basehost, + event, + canary_response, + canary_mode, + wordlist=None, + skip_dns_host=False, + ): + if wordlist is None: + wordlist = self.brute_wordlist + + # Get baseline host for comparison and determine scheme from event + baseline_host = event.parsed_url.netloc + + # Collect all words for concurrent processing + candidates_to_check = [] + for word in self.helpers.read_file(wordlist): + word = word.strip() + if not word: + continue + + # Construct virtual host header + if basehost: + # Wordlist entries are subdomain prefixes - append basehost + probe_host = f"{word}.{basehost}" + + else: + # No basehost - use as-is + probe_host = word + + # Skip if this would be the same as the original host + if probe_host == baseline_host: + continue + + candidates_to_check.append(probe_host) + + self.debug(f"Loaded {len(candidates_to_check)} candidates from wordlist for {discovery_method}") + + host_ips = event.resolved_hosts + total_tests = len(candidates_to_check) * len(host_ips) + + self.verbose( + f"Initiating {total_tests} virtual host tests ({len(candidates_to_check)} candidates × {len(host_ips)} IPs) with max {self.max_concurrent} concurrent requests" + ) + + # Collect all virtual host results before emitting + virtual_host_results = [] + + # Process results as they complete with concurrency control + try: + # Build coroutines on-demand without wrapper + coroutines = ( + self._test_virtualhost( + normalized_url, + probe_host, + basehost, + event, + canary_response, + canary_mode, + skip_dns_host, + host_ip, + discovery_method, + ) + for host_ip in host_ips + for probe_host in candidates_to_check + ) + + async for completed in self.helpers.as_completed(coroutines, self.max_concurrent): + try: + result = await 
completed + except CurlError as e: + if getattr(self.scan, "stopping", False) or getattr(self.scan, "aborting", False): + self.debug(f"CurlError during shutdown (suppressed): {e}") + break + self.debug(f"CurlError in virtualhost test (skipping this test): {e}") + continue + if result: # Only append non-None results + virtual_host_results.append(result) + self.debug( + f"ADDED RESULT {len(virtual_host_results)}: {result['probe_host']} (similarity: {result['similarity']:.3f}) [Status: {result['status_code']} | Size: {result['content_length']} bytes]" + ) + + # Early exit if we're clearly hitting false positives + if len(virtual_host_results) >= self.MAX_RESULTS_FLOOD_PROTECTION: + self.warning( + f"RESULT FLOOD DETECTED: found {len(virtual_host_results)} virtual hosts (limit: {self.MAX_RESULTS_FLOOD_PROTECTION}), likely false positives - stopping further tests and skipping reporting" + ) + break + + except CurlError as e: + if getattr(self.scan, "stopping", False) or getattr(self.scan, "aborting", False): + self.debug(f"CurlError in as_completed during shutdown (suppressed): {e}") + return [] + self.warning(f"CurlError in as_completed, stopping all tests: {e}") + return [] + + # Return results for emission at _run_virtualhost_phase level + return virtual_host_results + + async def _test_virtualhost( + self, + normalized_url, + probe_host, + basehost, + event, + canary_response, + canary_mode, + skip_dns_host, + host_ip, + discovery_method, + ): + """ + Test a single virtual host candidate using HTTP Host header or HTTPS SNI + Returns virtual host data if detected, None otherwise + """ + is_https = event.parsed_url.scheme == "https" + + # Make request - different approach for HTTP vs HTTPS + if is_https: + port = event.parsed_url.port or 443 + probe_response = await self.helpers.web.curl( + url=f"https://{probe_host}:{port}/", + resolve={"host": probe_host, "port": port, "ip": host_ip}, + ) + else: + port = event.parsed_url.port or 80 + probe_response = await 
self.helpers.web.curl( + url=normalized_url, + headers={"Host": probe_host}, + resolve={"host": event.parsed_url.hostname, "port": port, "ip": host_ip}, + ) + + if not probe_response or probe_response["response_data"] == "": + protocol = "HTTPS" if is_https else "HTTP" + self.debug(f"{protocol} probe failed for {probe_host} on ip {host_ip} - no response or empty data") + return None + + similarity = await self.analyze_response(probe_host, probe_response, canary_response, event) + if similarity is None: + return None + + # Different from canary = possibly real virtual host, similar to canary = probably junk + if similarity > self.SIMILARITY_THRESHOLD: + self.debug( + f"REJECTING {probe_host}: similarity {similarity:.3f} > threshold {self.SIMILARITY_THRESHOLD} (too similar to canary)" + ) + return None + else: + self.verbose( + f"POTENTIAL VIRTUALHOST {probe_host} sim={similarity:.3f} " + f"probe: {probe_response.get('http_code', 'N/A')} | {len(probe_response.get('response_data', ''))}B | {probe_response.get('url', 'N/A')} ; " + f"canary: {canary_response.get('http_code', 'N/A')} | {len(canary_response.get('response_data', ''))}B | {canary_response.get('url', 'N/A')}" + ) + + # Re-verify canary consistency before emission + if not await self._verify_canary_consistency( + canary_response, canary_mode, normalized_url, is_https, basehost, host_ip + ): + self.verbose( + f"CANARY CHANGED: Rejecting {probe_host}. 
Original canary had code {canary_response['http_code']} and response data of length {len(canary_response['response_data'])}" + ) + raise CurlError(f"Canary changed since initial test, rejecting {probe_host}") + # Canary is consistent, proceed + + probe_url = f"{event.parsed_url.scheme}://{probe_host}:{port}/" + + # Check for keyword-based virtual host wildcards + if not await self._verify_canary_keyword(probe_response, probe_url, is_https, basehost, host_ip): + return None + + # Don't emit if this would be the same as the original netloc + if probe_host == event.parsed_url.netloc: + self.verbose(f"Skipping emit for virtual host {probe_host} - is the same as the original netloc") + return None + + # Check if this virtual host is externally accessible + port = event.parsed_url.port or (443 if is_https else 80) + + is_externally_accessible = await self._is_host_accessible(probe_url) + + virtualhost_dict = { + "host": str(event.host), + "url": normalized_url, + "virtual_host": probe_host, + "description": self._build_description( + discovery_method, probe_response, is_externally_accessible, host_ip + ), + "ip": host_ip, + } + + # Skip if we require inaccessible hosts and this one is accessible + if self.config.get("require_inaccessible", True) and is_externally_accessible: + self.verbose( + f"Skipping emit for virtual host {probe_host} - is externally accessible and require_inaccessible is True" + ) + return None + + # Return data for emission at _run_virtualhost_phase level + technique = "SNI" if is_https else "Host header" + return { + "virtualhost_dict": virtualhost_dict, + "similarity": similarity, + "probe_host": probe_host, + "skip_dns_host": skip_dns_host, + "discovery_method": f"{discovery_method} ({technique})", + "status_code": probe_response.get("http_code", "N/A"), + "content_length": len(probe_response.get("response_data", "")), + "probe_response": probe_response, + } + + async def analyze_response(self, probe_host, probe_response, canary_response, event): 
+ probe_status = probe_response["http_code"] + canary_status = canary_response["http_code"] + + # Check for invalid/no response - skip processing + if probe_status == 0 or not probe_response.get("response_data"): + self.debug(f"SKIPPING {probe_host} - no valid HTTP response (status: {probe_status})") + return None + + if probe_status == 400: + self.debug(f"SKIPPING {probe_host} - got 400 Bad Request") + return None + + # Check for 421 Misdirected Request - clear signal that virtual host doesn't exist + if probe_status == 421: + self.debug(f"SKIPPING {probe_host} - got 421 Misdirected Request (SNI not configured)") + return None + + if probe_status == 502 or probe_status == 503: + self.debug(f"SKIPPING {probe_host} - got 502 or 503 Bad Gateway") + return None + + # Check for 403 Forbidden - signal that the virtual host is rejected (unless we started with a 403) + if probe_status == 403 and canary_status != 403: + self.debug(f"SKIPPING {probe_host} - got 403 Forbidden when canary status was {canary_status}") + return None + + if probe_status == 508: + self.debug(f"SKIPPING {probe_host} - got 508 Loop Detected") + return None + + # Check for redirects back to original domain - indicates virtual host just redirects to canonical + if probe_status in [301, 302]: + redirect_url = probe_response.get("redirect_url", "") + if redirect_url and str(event.parsed_url.netloc) in redirect_url: + self.debug(f"SKIPPING {probe_host} - redirects back to original domain {event.parsed_url.netloc}") + return None + + if any(waf_string in probe_response["response_data"] for waf_string in self.waf_strings): + self.debug(f"SKIPPING {probe_host} - got WAF response") + return None + + # Calculate content similarity to canary (junk response) + # Use probe hostname for normalization to remove hostname reflection differences + + probe_simhash = await self.helpers.run_in_executor_mp( + compute_simhash, probe_response["response_data"], normalization_filter=probe_host + ) + canary_simhash = await 
self.helpers.run_in_executor_mp( + compute_simhash, canary_response["response_data"], normalization_filter=probe_host + ) + + similarity = self.helpers.simhash.similarity(probe_simhash, canary_simhash) + + if similarity <= self.SIMILARITY_THRESHOLD: + self.verbose( + f"POTENTIAL MATCH: {probe_host} vs canary - similarity: {similarity:.3f} (threshold: {self.SIMILARITY_THRESHOLD}), probe status: {probe_status}, canary status: {canary_status}" + ) + + return similarity + + async def _verify_canary_keyword(self, original_response, probe_url, is_https, basehost, host_ip): + """Perform last-minute check on the canary for keyword-based virtual host wildcards""" + + try: + keyword_canary_response = await self._get_canary_response( + probe_url, basehost, host_ip, is_https, mode="random_append" + ) + except CurlError as e: + self.warning(f"Canary verification failed due to curl error: {e}") + return False + + if not keyword_canary_response: + return False + + # If we get the exact same content after altering the hostname, keyword based virtual host routing is likely being used + if keyword_canary_response["response_data"] == original_response["response_data"]: + self.verbose( + f"Intentionally wrong hostname has a canary too similar to the original. Using probe url: {probe_url} - response data is exactly the same" + ) + return False + + original_simhash = await self.helpers.run_in_executor_mp(compute_simhash, original_response["response_data"]) + keyword_simhash = await self.helpers.run_in_executor_mp( + compute_simhash, keyword_canary_response["response_data"] + ) + similarity = self.helpers.simhash.similarity(original_simhash, keyword_simhash) + + if similarity >= self.SIMILARITY_THRESHOLD: + self.verbose( + f"Intentionally wrong hostname has a canary too similar to the original. 
Using probe url: {probe_url} - similarity: {similarity:.3f} above threshold {self.SIMILARITY_THRESHOLD} - Original: {original_response.get('http_code', 'N/A')} ({len(original_response.get('response_data', ''))} bytes), Current: {keyword_canary_response.get('http_code', 'N/A')} ({len(keyword_canary_response.get('response_data', ''))} bytes)" + ) + return False + return True + + async def _verify_canary_consistency( + self, original_canary_response, canary_mode, normalized_url, is_https, basehost, host_ip + ): + """Perform last-minute check on the canary for consistency""" + + # Re-run the same canary test as we did initially + try: + consistency_canary_response = await self._get_canary_response( + normalized_url, basehost, host_ip, is_https, mode=canary_mode + ) + except CurlError as e: + self.warning(f"Canary verification failed due to curl error: {e}") + return False + + if not consistency_canary_response: + return False + + # Check if HTTP codes are different first (hard failure) + if original_canary_response["http_code"] != consistency_canary_response["http_code"]: + self.verbose( + f"CANARY HTTP CODE CHANGED for {normalized_url} - Original: {original_canary_response.get('http_code', 'N/A')} ({len(original_canary_response.get('response_data', ''))} bytes), Current: {consistency_canary_response.get('http_code', 'N/A')} ({len(consistency_canary_response.get('response_data', ''))} bytes)" + ) + return False + + # if response data is exactly the same, we're good + if original_canary_response["response_data"] == consistency_canary_response["response_data"]: + return True + + # Fallback - use similarity comparison for response data (allows slight differences) + original_simhash = await self.helpers.run_in_executor_mp( + compute_simhash, original_canary_response["response_data"] + ) + consistency_simhash = await self.helpers.run_in_executor_mp( + compute_simhash, consistency_canary_response["response_data"] + ) + similarity = 
self.helpers.simhash.similarity(original_simhash, consistency_simhash) + if similarity < self.SIMILARITY_THRESHOLD: + self.verbose( + f"CANARY SIMILARITY CHANGED for {normalized_url} - similarity: {similarity:.3f} below threshold {self.SIMILARITY_THRESHOLD} - Original: {original_canary_response.get('http_code', 'N/A')} ({len(original_canary_response.get('response_data', ''))} bytes), Current: {consistency_canary_response.get('http_code', 'N/A')} ({len(consistency_canary_response.get('response_data', ''))} bytes)" + ) + return False + return True + + def _extract_title(self, response_data): + """Extract title from HTML response""" + soup = self.helpers.beautifulsoup(response_data, "html.parser") + if soup and soup.title and soup.title.string: + return soup.title.string.strip() + return None + + def _build_description(self, discovery_string, probe_response, is_externally_accessible=None, host_ip=None): + """Build detailed description with discovery technique and content info""" + http_code = probe_response.get("http_code", "N/A") + response_size = len(probe_response.get("response_data", "")) + + description = f"Discovery Technique: [{discovery_string}], Discovered Content: [Status Code: {http_code}]" + + # Add title if available + title = self._extract_title(probe_response.get("response_data", "")) + if title: + description += f" [Title: {title}]" + description += f" [Size: {response_size} bytes]" + + # Add IP address if available + if host_ip: + description += f" [IP: {host_ip}]" + + # Add accessibility information if available + if is_externally_accessible is not None: + accessibility_status = "externally accessible" if is_externally_accessible else "not externally accessible" + description += f" [Access: {accessibility_status}]" + + return description + + def mutations_check(self, virtualhost): + mutations_list = [] + for mutation in self.helpers.word_cloud.mutations(virtualhost, cloud=False): + mutations_list.extend(["".join(mutation), "-".join(mutation)]) + 
mutations_list_file = self.helpers.tempfile(mutations_list, pipe=False) + return mutations_list_file + + async def finish(self): + # phase 5: check existing hosts with wordcloud + self.verbose(" === Starting Finish() Wordcloud check === ") + if not self.config.get("wordcloud_check", False): + self.debug("FINISH METHOD: Wordcloud check is disabled, skipping finish phase") + return + + if not self.helpers.word_cloud.keys(): + self.verbose("FINISH METHOD: No wordcloud data available for finish phase") + return + + # Filter wordcloud words: no dots, reasonable length limit + all_wordcloud_words = list(self.helpers.word_cloud.keys()) + filtered_words = [] + for word in all_wordcloud_words: + # Filter out words with dots (likely full domains) + if "." in word: + continue + # Filter out very long words (likely noise) + if len(word) > 15: + continue + # Filter out very short words (likely noise) + if len(word) < 2: + continue + filtered_words.append(word) + + tempfile = self.helpers.tempfile(filtered_words, pipe=False) + self.debug( + f"FINISH METHOD: Starting wordcloud check on {len(self.scanned_hosts)} hosts using {len(filtered_words)} filtered words from wordcloud" + ) + + for host, event in self.scanned_hosts.items(): + if host not in self.wordcloud_tried_hosts: + host_parsed_url = urlparse(host) + + if self.config.get("force_basehost"): + basehost = self.config.get("force_basehost") + else: + basehost, subdomain = self._get_basehost(event) + + # Get fresh canary and original response for this host + is_https = host_parsed_url.scheme == "https" + host_ip = next(iter(event.resolved_hosts)) + + self.verbose(f"FINISH METHOD: Starting wildcard check for {host}") + baseline_response = await self._get_baseline_response(event, host, host_ip) + if not await self._wildcard_canary_check( + host_parsed_url.scheme, host_parsed_url.netloc, event, host_ip, baseline_response + ): + self.debug( + f"WILDCARD CHECK FAILED in finish: Skipping {host} in wordcloud phase - failed virtual 
host wildcard check" + ) + self.wordcloud_tried_hosts.add(host) # Mark as tried to avoid retrying + continue + else: + self.debug(f"WILDCARD CHECK PASSED in finish: Proceeding with wordcloud mutations for {host}") + + await self._run_virtualhost_phase( + "Target host wordcloud mutations", + host, + basehost, + host_ip, + is_https, + event, + "subdomain", + wordlist=tempfile, + ) + self.wordcloud_tried_hosts.add(host) + + async def filter_event(self, event): + if ( + "cdn-cloudflare" in event.tags + or "cdn-imperva" in event.tags + or "cdn-akamai" in event.tags + or "cdn-cloudfront" in event.tags + ): + self.debug(f"Not processing URL {event.data} because it's behind a WAF or CDN, and that's pointless") + return False + return True diff --git a/bbot/modules/waf_bypass.py b/bbot/modules/waf_bypass.py new file mode 100644 index 0000000000..dc7fe38151 --- /dev/null +++ b/bbot/modules/waf_bypass.py @@ -0,0 +1,304 @@ +from radixtarget.tree.ip import IPRadixTree +from bbot.modules.base import BaseModule +from bbot.core.helpers.simhash import compute_simhash + + +class waf_bypass(BaseModule): + """ + Module to detect WAF bypasses by finding direct IP access to WAF-protected content. + + Overview: + Throughout the scan, we collect: + 1. WAF-protected domains (identified by CloudFlare/Imperva tags) and their SimHash content fingerprints + 2. All domain->IP mappings from DNS resolution of URL events + 3. Cloud IPs separately tracked via "cloud-ip" tags + + In finish(), we test if WAF-protected content can be accessed directly via IPs from non-protected domains. + Optionally, it explores IP neighbors within the same ASN to find additional bypass candidates. 
+ """ + + watched_events = ["URL"] + produced_events = ["VULNERABILITY"] + options = { + "similarity_threshold": 0.90, + "search_ip_neighbors": True, + "neighbor_cidr": 24, # subnet size to explore when gathering neighbor IPs + } + + options_desc = { + "similarity_threshold": "Similarity threshold for content matching", + "search_ip_neighbors": "Also check IP neighbors of the target domain", + "neighbor_cidr": "CIDR mask (24-31) used for neighbor enumeration when search_ip_neighbors is true", + } + flags = ["active", "safe", "web-thorough"] + meta = { + "description": "Detects potential WAF bypasses", + "author": "@liquidsec", + "created_date": "2025-09-26", + } + + async def setup(self): + # Track protected domains and their potential bypass CIDRs + self.protected_domains = {} # {domain: event} - track protected domains and store their parent events + self.domain_ip_map = {} # {full_domain: set(ips)} - track all IPs for each domain + self.content_fingerprints = {} # {url: {simhash, http_code}} - track the content fingerprints for each URL + self.similarity_threshold = self.config.get("similarity_threshold", 0.90) + self.search_ip_neighbors = self.config.get("search_ip_neighbors", True) + self.neighbor_cidr = int(self.config.get("neighbor_cidr", 24)) + + if self.search_ip_neighbors and not (24 <= self.neighbor_cidr <= 31): + self.warning(f"Invalid neighbor_cidr {self.neighbor_cidr}. 
Must be between 24 and 31.") + return False + # Keep track of (protected_domain, ip) pairs we have already attempted to bypass + self.attempted_bypass_pairs = set() + # Keep track of any IPs that came from hosts that are "cloud-ips" + self.cloud_ips = set() + return True + + async def filter_event(self, event): + if "endpoint" in event.tags: + return False, "WAF bypass module only considers directory URLs" + return True + + async def handle_event(self, event): + domain = str(event.host) + url = str(event.data) + + # Store the IPs that each domain (that came from a URL event) resolves to. We have to resolve ourself, since normal BBOT DNS resolution doesn't keep ALL the IPs + domain_dns_response = await self.helpers.dns.resolve(domain) + if domain_dns_response: + if domain not in self.domain_ip_map: + self.domain_ip_map[domain] = set() + for ip in domain_dns_response: + ip_str = str(ip) + # Validate that this is actually an IP address before storing + if self.helpers.is_ip(ip_str): + self.domain_ip_map[domain].add(ip_str) + self.debug(f"Mapped domain {domain} to IP {ip_str}") + if "cloud-ip" in event.tags: + self.cloud_ips.add(ip_str) + self.debug(f"Added cloud-ip {ip_str} to cloud_ips") + else: + self.warning(f"DNS resolution for {domain} returned non-IP result: {ip_str}") + else: + self.warning(f"DNS resolution failed for {domain}") + + # Detect WAF/CDN protection based on tags + provider_name = None + if "cdn-cloudflare" in event.tags or "waf-cloudflare" in event.tags: + provider_name = "CloudFlare" + elif "cdn-imperva" in event.tags: + provider_name = "Imperva" + + is_protected = provider_name is not None + + if is_protected: + self.debug(f"{provider_name} protection detected via tags: {event.tags}") + # Save the full domain and event for WAF-protected URLs, this is necessary to find the appropriate parent event later in .finish() + self.protected_domains[domain] = event + self.debug(f"Found {provider_name}-protected domain: {domain}") + + curl_response = await 
self.get_url_content(url) + if not curl_response: + self.debug(f"Failed to get response from protected URL {url}") + return + + if not curl_response["response_data"]: + self.debug(f"Failed to get content from protected URL {url}") + return + + # Store a "simhash" (fuzzy hash) of the response data for later comparison + simhash = await self.helpers.run_in_executor_mp(compute_simhash, curl_response["response_data"]) + + self.content_fingerprints[url] = { + "simhash": simhash, + "http_code": curl_response["http_code"], + } + self.debug( + f"Stored simhash of response from {url} (content length: {len(curl_response['response_data'])})" + ) + + async def get_url_content(self, url, ip=None): + """Helper function to fetch content from a URL, optionally through specific IP""" + try: + if ip: + # Build resolve dict for curl helper + host_tuple = self.helpers.extract_host(url) + if not host_tuple[0]: + self.warning(f"Failed to extract host from URL: {url}") + return None + host = host_tuple[0] + + # Determine port from scheme (default 443/80) or explicit port in URL + try: + from urllib.parse import urlparse + + parsed = urlparse(url) + port = parsed.port or (443 if parsed.scheme == "https" else 80) + except Exception: + port = 443 # safe default for https + + self.debug(f"Fetching via curl with --resolve {host}:{port}:{ip} for {url}") + + curl_response = await self.helpers.web.curl( + url=url, + resolve={"host": host, "port": port, "ip": ip}, + ) + + if curl_response: + return curl_response + else: + self.debug(f"curl returned no content for {url} via IP {ip}") + else: + response = await self.helpers.web.curl(url=url) + if not response: + self.debug(f"No response received from {url}") + return None + elif response.get("http_code", 0) in [200, 301, 302, 500]: + return response + else: + self.debug( + f"Failed to fetch content from {url} - Status: {response.get('http_code', 'unknown')} (not in allowed list)" + ) + return None + except Exception as e: + self.debug(f"Error 
fetching content from {url}: {str(e)}") + return None + + async def check_ip(self, ip, source_domain, protected_domain, source_event): + matching_url = next((url for url in self.content_fingerprints.keys() if protected_domain in url), None) + + if not matching_url: + self.debug(f"No matching URL found for {protected_domain} in stored fingerprints") + return None + + original_response = self.content_fingerprints.get(matching_url) + if not original_response: + self.debug(f"did not get original response for {matching_url}") + return None + + self.verbose(f"Bypass attempt: {protected_domain} via {ip} from {source_domain}") + + bypass_response = await self.get_url_content(matching_url, ip) + bypass_simhash = await self.helpers.run_in_executor_mp(compute_simhash, bypass_response["response_data"]) + if not bypass_response: + self.debug(f"Failed to get content through IP {ip} for URL {matching_url}") + return None + + if original_response["http_code"] != bypass_response["http_code"]: + self.debug(f"Ignoring code difference {original_response['http_code']} != {bypass_response['http_code']}") + return None + + is_redirect = False + if bypass_response["http_code"] == 301 or bypass_response["http_code"] == 302: + is_redirect = True + + similarity = self.helpers.simhash.similarity(original_response["simhash"], bypass_simhash) + + # For redirects, require exact match (1.0), otherwise use configured threshold + required_threshold = 1.0 if is_redirect else self.similarity_threshold + return (matching_url, ip, similarity, source_event) if similarity >= required_threshold else None + + async def finish(self): + self.verbose(f"Found {len(self.protected_domains)} Protected Domains") + + confirmed_bypasses = [] # [(protected_url, matching_ip, similarity)] + ip_bypass_candidates = {} # {ip: domain} + waf_ips = set() + + # First collect all the WAF-protected DOMAINS we've seen + for protected_domain in self.protected_domains: + if protected_domain in self.domain_ip_map: + 
waf_ips.update(self.domain_ip_map[protected_domain]) + + # Then collect all the non-WAF-protected IPs we've seen + for domain, ips in self.domain_ip_map.items(): + self.debug(f"Checking IP {ips} from domain {domain}") + if domain not in self.protected_domains: # If it's not a protected domain + for ip in ips: + # Validate that this is actually an IP address before processing + if not self.helpers.is_ip(ip): + self.warning(f"Skipping non-IP address '{ip}' found in domain_ip_map for {domain}") + continue + + if ip not in waf_ips: # And IP isn't a known WAF IP + ip_bypass_candidates[ip] = domain + self.debug(f"Added potential bypass IP {ip} from domain {domain}") + + # if we have IP neighbors searching enabled, and the IP isn't a cloud IP, we can add the IP neighbors to our list of potential bypasses + if self.search_ip_neighbors and ip not in self.cloud_ips: + import ipaddress + + # Get the ASN data for the IP - used later to keep brute force from crossing ASN boundaries + asn_data = await self.helpers.asn.ip_to_subnets(str(ip)) + if asn_data: + # Build a radix tree of the ASN subnets for the IP + asn_subnets_tree = IPRadixTree() + for subnet in asn_data["subnets"]: + asn_subnets_tree.insert(subnet, data=True) + + # Generate a network based on the neighbor_cidr option + neighbor_net = ipaddress.ip_network(f"{ip}/{self.neighbor_cidr}", strict=False) + for neighbor_ip in neighbor_net.hosts(): + neighbor_ip_str = str(neighbor_ip) + # Don't add the neighbor IP if its: ip we started with, a waf ip, or already in the list + if ( + neighbor_ip_str == ip + or neighbor_ip_str in waf_ips + or neighbor_ip_str in ip_bypass_candidates + ): + continue + + # make sure we aren't crossing an ASN boundary with our neighbor exploration + if asn_subnets_tree.get_node(neighbor_ip_str): + self.debug( + f"Added Neighbor IP ({ip} -> {neighbor_ip_str}) as potential bypass IP derived from {domain}" + ) + ip_bypass_candidates[neighbor_ip_str] = domain + else: + self.debug(f"IP {ip} is in WAF 
IPS so we don't check as potential bypass") + + self.verbose(f"\nFound {len(ip_bypass_candidates)} non-WAF IPs to check") + + coros = [] + new_pairs_count = 0 + + for protected_domain, source_event in self.protected_domains.items(): + for ip, src in ip_bypass_candidates.items(): + combo = (protected_domain, ip) + if combo in self.attempted_bypass_pairs: + continue + self.attempted_bypass_pairs.add(combo) + new_pairs_count += 1 + self.debug(f"Checking {ip} for {protected_domain} from {src}") + coros.append(self.check_ip(ip, src, protected_domain, source_event)) + + self.verbose( + f"Checking {new_pairs_count} new bypass pairs (total attempted: {len(self.attempted_bypass_pairs)})..." + ) + + self.debug(f"about to start {len(coros)} coroutines") + async for completed in self.helpers.as_completed(coros): + result = await completed + if result: + confirmed_bypasses.append(result) + + if confirmed_bypasses: + # Aggregate by URL and similarity + agg = {} + for matching_url, ip, similarity, src_evt in confirmed_bypasses: + rec = agg.setdefault((matching_url, similarity), {"ips": [], "event": src_evt}) + rec["ips"].append(ip) + + for (matching_url, sim_key), data in agg.items(): + ip_list = data["ips"] + ip_list_str = ", ".join(sorted(set(ip_list))) + await self.emit_event( + { + "severity": "MEDIUM", + "url": matching_url, + "description": f"WAF Bypass Confirmed - Direct IPs: {ip_list_str} for {matching_url}. 
Similarity {sim_key:.2%}", + }, + "VULNERABILITY", + data["event"], + ) diff --git a/bbot/presets/waf-bypass.yml b/bbot/presets/waf-bypass.yml new file mode 100644 index 0000000000..801782538b --- /dev/null +++ b/bbot/presets/waf-bypass.yml @@ -0,0 +1,19 @@ +description: WAF bypass detection with subdomain enumeration + +flags: + # enable subdomain enumeration to find potential bypass targets + - subdomain-enum + +modules: + # explicitly enable the waf_bypass module for detection + - waf_bypass + # ensure httpx is enabled for web probing + - httpx + +config: + # waf_bypass module configuration + modules: + waf_bypass: + similarity_threshold: 0.90 + search_ip_neighbors: true + neighbor_cidr: 24 \ No newline at end of file diff --git a/bbot/presets/web/virtualhost-heavy.yml b/bbot/presets/web/virtualhost-heavy.yml new file mode 100644 index 0000000000..f195a6591a --- /dev/null +++ b/bbot/presets/web/virtualhost-heavy.yml @@ -0,0 +1,16 @@ +description: Scan heavily for virtual hosts, with a focus on discovering as many valid virtual hosts as possible + +modules: + - httpx + - virtualhost + +config: + modules: + virtualhost: + require_inaccessible: False + wordcloud_check: True + subdomain_brute: True + mutation_check: True + special_hosts: True + certificate_sans: True + diff --git a/bbot/presets/web/virtualhost-light.yml b/bbot/presets/web/virtualhost-light.yml new file mode 100644 index 0000000000..70f5fcde40 --- /dev/null +++ b/bbot/presets/web/virtualhost-light.yml @@ -0,0 +1,16 @@ +description: Scan for virtual hosts, with a focus on hidden normally not accessible content + +modules: + - httpx + - virtualhost + +config: + modules: + virtualhost: + require_inaccessible: True + wordcloud_check: False + subdomain_brute: False + mutation_check: True + special_hosts: False + certificate_sans: True + diff --git a/bbot/test/test_step_1/test_web.py b/bbot/test/test_step_1/test_web.py index dd526167bb..0913d66744 100644 --- a/bbot/test/test_step_1/test_web.py +++ 
b/bbot/test/test_step_1/test_web.py @@ -354,30 +354,61 @@ async def test_web_curl(bbot_scanner, bbot_httpserver): url = bbot_httpserver.url_for("/curl") bbot_httpserver.expect_request(uri="/curl").respond_with_data("curl_yep") bbot_httpserver.expect_request(uri="/index.html").respond_with_data("curl_yep_index") - assert await helpers.curl(url=url) == "curl_yep" - assert await helpers.curl(url=url, ignore_bbot_global_settings=True) == "curl_yep" - assert (await helpers.curl(url=url, head_mode=True)).startswith("HTTP/") - assert await helpers.curl(url=url, raw_body="body") == "curl_yep" - assert ( - await helpers.curl( - url=url, - raw_path=True, - headers={"test": "test", "test2": ["test2"]}, - ignore_bbot_global_settings=False, - post_data={"test": "test"}, - method="POST", - cookies={"test": "test"}, - path_override="/index.html", - ) - == "curl_yep_index" + + result1 = await helpers.curl(url=url) + assert result1["response_data"] == "curl_yep" + + result2 = await helpers.curl(url=url, ignore_bbot_global_settings=True) + assert result2["response_data"] == "curl_yep" + + result3 = await helpers.curl(url=url) + assert result3["response_data"] == "curl_yep" + + result4 = await helpers.curl(url=url, raw_body="body") + assert result4["response_data"] == "curl_yep" + + result5 = await helpers.curl( + url=url, + raw_path=True, + headers={"test": "test", "test2": ["test2"]}, + ignore_bbot_global_settings=False, + post_data={"test": "test"}, + method="POST", + cookies={"test": "test"}, + path_override="/index.html", ) + assert result5["response_data"] == "curl_yep_index" + # test custom headers bbot_httpserver.expect_request("/test-custom-http-headers-curl", headers={"test": "header"}).respond_with_data( "curl_yep_headers" ) headers_url = bbot_httpserver.url_for("/test-custom-http-headers-curl") curl_result = await helpers.curl(url=headers_url) - assert curl_result == "curl_yep_headers" + assert curl_result["response_data"] == "curl_yep_headers" + + assert "http_code" in 
curl_result + assert curl_result["http_code"] == 200 + assert "url_effective" in curl_result + assert "content_type" in curl_result + assert "size_download" in curl_result + assert "time_total" in curl_result + assert "speed_download" in curl_result + + # NEW: Test metadata types and ranges + assert isinstance(curl_result["http_code"], int) + assert isinstance(curl_result["size_download"], (int, float)) + assert isinstance(curl_result["time_total"], (int, float)) + assert isinstance(curl_result["speed_download"], (int, float)) + assert curl_result["size_download"] >= 0 + assert curl_result["time_total"] >= 0 + + # NEW: Test that all results have consistent metadata structure + for result in [result1, result2, result3, result4, result5, curl_result]: + assert "response_data" in result + assert "http_code" in result + assert "url_effective" in result + assert isinstance(result, dict) await scan._cleanup() diff --git a/bbot/test/test_step_2/module_tests/test_module_generic_ssrf.py b/bbot/test/test_step_2/module_tests/test_module_generic_ssrf.py index c0911fd661..23e6c7c731 100644 --- a/bbot/test/test_step_2/module_tests/test_module_generic_ssrf.py +++ b/bbot/test/test_step_2/module_tests/test_module_generic_ssrf.py @@ -15,6 +15,9 @@ def extract_subdomain_tag(data): class TestGeneric_SSRF(ModuleTestBase): targets = ["http://127.0.0.1:8888"] modules_overrides = ["httpx", "generic_ssrf"] + config_overrides = { + "interactsh_disable": False, + } def request_handler(self, request): subdomain_tag = None @@ -34,9 +37,15 @@ def request_handler(self, request): async def setup_before_prep(self, module_test): self.interactsh_mock_instance = module_test.mock_interactsh("generic_ssrf") - module_test.monkeypatch.setattr( - module_test.scan.helpers, "interactsh", lambda *args, **kwargs: self.interactsh_mock_instance - ) + + # Mock at the helper creation level BEFORE modules are set up + def mock_interactsh_factory(*args, **kwargs): + return self.interactsh_mock_instance + + # Apply 
the mock to the core helpers so modules get the mock during setup + from bbot.core.helpers.helper import ConfigAwareHelper + + module_test.monkeypatch.setattr(ConfigAwareHelper, "interactsh", mock_interactsh_factory) async def setup_after_prep(self, module_test): expect_args = re.compile("/") diff --git a/bbot/test/test_step_2/module_tests/test_module_virtualhost.py b/bbot/test/test_step_2/module_tests/test_module_virtualhost.py new file mode 100644 index 0000000000..55ac0f4b2a --- /dev/null +++ b/bbot/test/test_step_2/module_tests/test_module_virtualhost.py @@ -0,0 +1,892 @@ +from .base import ModuleTestBase, tempwordlist +import re +from werkzeug.wrappers import Response + + +class VirtualhostTestBase(ModuleTestBase): + """Base class for virtualhost tests with common setup""" + + async def setup_before_prep(self, module_test): + # Fix randomness for predictable canary generation + module_test.monkeypatch.setattr("random.seed", lambda x: None) + import string + + def predictable_choice(seq): + return seq[0] if seq == string.ascii_lowercase else seq[0] + + module_test.monkeypatch.setattr("random.choice", predictable_choice) + + async def setup_after_prep(self, module_test): + expect_args = re.compile("/") + module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler) + + +class TestVirtualhostSpecialHosts(VirtualhostTestBase): + """Test special hosts detection""" + + targets = ["http://localhost:8888"] + modules_overrides = ["httpx", "virtualhost"] + config_overrides = { + "modules": { + "virtualhost": { + "subdomain_brute": False, # Focus on special hosts only + "mutation_check": False, # Focus on special hosts only + "special_hosts": True, # Enable special hosts + "certificate_sans": False, + "wordcloud_check": False, + "require_inaccessible": False, + } + } + } + + async def setup_after_prep(self, module_test): + # Keep request handler-based HTTP server + await super().setup_after_prep(module_test) + + # Emit URL event 
manually and ensure resolved_hosts + from bbot.modules.base import BaseModule + + class DummyModule(BaseModule): + _name = "dummy_module_special" + watched_events = ["SCAN"] + + async def handle_event(self, event): + if event.type == "SCAN": + url_event = self.scan.make_event( + "http://localhost:8888/", + "URL", + parent=event, + tags=["status-200", "ip-127.0.0.1"], + ) + await self.emit_event(url_event) + + module_test.scan.modules["dummy_module_special"] = DummyModule(module_test.scan) + + # Patch virtualhost to inject resolved_hosts + vh_module = module_test.scan.modules["virtualhost"] + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline request to localhost (with or without port) + if not host_header or host_header in ["localhost", "localhost:8888"]: + return Response("baseline response from localhost", status=200) + + # Wildcard canary check + if re.match(r"[a-z]ocalhost(?::8888)?$", host_header): + return Response("different wildcard response", status=404) + + # Random canary requests (12 lowercase letters .com) + if re.match(r"^[a-z]{12}\.com(?::8888)?$", host_header): + return Response( + """ +404 Not Found

Not Found

Random canary host.

""", + status=404, + ) + + # Special hosts responses - return different content than canary + if host_header == "host.docker.internal": + return Response("Docker internal host active", status=200) + if host_header == "127.0.0.1": + return Response("Loopback host active", status=200) + if host_header == "localhost": + return Response("Localhost virtual host active", status=200) + + # Default for any other requests - match canary content to avoid false positives + return Response( + """ +404 Not Found

Not Found

Random canary host.

""", + status=404, + ) + + def check(self, module_test, events): + special_hosts_found = set() + for e in events: + if e.type == "VIRTUAL_HOST": + vhost = e.data["virtual_host"] + if vhost in ["host.docker.internal", "127.0.0.1", "localhost"]: + special_hosts_found.add(vhost) + + # Test description elements to ensure they are as expected + description = e.data["description"] + assert ( + "Discovery Technique: [Special virtual host list" in description + or "Discovery Technique: [Mutations on discovered" in description + ), f"Description missing or unexpected discovery technique: {description}" + assert "Status Code:" in description, f"Description missing status code: {description}" + assert "Size:" in description and "bytes" in description, ( + f"Description missing size: {description}" + ) + assert "IP: 127.0.0.1" in description, f"Description missing IP: {description}" + assert "Access:" in description, f"Description missing access status: {description}" + + assert len(special_hosts_found) >= 1, f"Failed to detect special virtual hosts. 
Found: {special_hosts_found}" + + +class TestVirtualhostBruteForce(VirtualhostTestBase): + """Test subdomain brute-force detection using HTTP Host headers""" + + targets = ["http://test.example:8888"] + modules_overrides = ["virtualhost"] # Remove httpx, we'll manually create URL events + test_wordlist = ["admin", "api", "test"] + config_overrides = { + "modules": { + "virtualhost": { + "brute_wordlist": tempwordlist(test_wordlist), + "subdomain_brute": True, # Enable brute force + "mutation_check": False, # Focus on brute force only + "special_hosts": False, # Focus on brute force only + "certificate_sans": False, + "wordcloud_check": False, + "require_inaccessible": False, + } + } + } + + async def setup_after_prep(self, module_test): + # Call parent setup_after_prep to set up the HTTP server with request_handler + await super().setup_after_prep(module_test) + + # Set up DNS mocking for test.example to resolve to 127.0.0.1 + await module_test.mock_dns({"test.example": {"A": ["127.0.0.1"]}}) + + # Create a dummy module that will emit the URL event during the scan + from bbot.modules.base import BaseModule + + class DummyModule(BaseModule): + _name = "dummy_module" + watched_events = ["SCAN"] + + async def handle_event(self, event): + if event.type == "SCAN": + # Create and emit URL event for virtualhost module to process + url_event = self.scan.make_event( + "http://test.example:8888/", "URL", parent=event, tags=["status-200", "ip-127.0.0.1"] + ) + await self.emit_event(url_event) + + # Add the dummy module to the scan + dummy_module = DummyModule(module_test.scan) + module_test.scan.modules["dummy_module"] = dummy_module + + # Patch virtualhost to inject resolved_hosts for URL events during the test + vh_module = module_test.scan.modules["virtualhost"] + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, 
"handle_event", patched_handle_event) + + def request_handler(self, request): + from werkzeug.wrappers import Response + + host_header = request.headers.get("Host", "").lower() + + # Baseline request to test.example or example (with or without port) + if not host_header or host_header in ["test.example", "test.example:8888", "example", "example:8888"]: + return Response("baseline response from example baseline", status=200) + + # Wildcard canary check - change one character in test.example + if re.match(r"[a-z]est\.example", host_header): + return Response("wildcard canary different response", status=404) + + # Brute-force canary requests - random string + .test.example (with optional port) + if re.match(r"^[a-z]{12}\.test\.example(?::8888)?$", host_header): + return Response("subdomain canary response", status=404) + + # Brute-force matches on discovered basehost (admin|api|test).test.example (with optional port) + if host_header in ["admin.test.example", "admin.test.example:8888"]: + return Response("Admin panel found here!", status=200) + if host_header in ["api.test.example", "api.test.example:8888"]: + return Response("API endpoint found here!", status=200) + if host_header in ["test.test.example", "test.test.example:8888"]: + return Response("Test environment found here!", status=200) + + # Default response + return Response("default response", status=404) + + def check(self, module_test, events): + brute_hosts_found = set() + for e in events: + if e.type == "VIRTUAL_HOST": + vhost = e.data["virtual_host"] + if vhost in ["admin.test.example", "api.test.example", "test.test.example"]: + brute_hosts_found.add(vhost) + + assert len(brute_hosts_found) >= 1, f"Failed to detect brute-force virtual hosts. 
Found: {brute_hosts_found}" + + +class TestVirtualhostMutations(VirtualhostTestBase): + """Test host mutation detection using HTTP Host headers""" + + targets = ["http://subdomain.target.test:8888"] + modules_overrides = ["httpx", "virtualhost"] + config_overrides = { + "modules": { + "virtualhost": { + "subdomain_brute": False, # Focus on mutations only + "mutation_check": True, # Enable mutations + "special_hosts": False, # Focus on mutations only + "certificate_sans": False, + "wordcloud_check": False, + "require_inaccessible": False, + } + } + } + + async def setup_before_prep(self, module_test): + # Call parent setup first + await super().setup_before_prep(module_test) + + # Mock wordcloud.mutations to return predictable results for "target" + def mock_mutations(self, word, **kwargs): + # Return realistic mutations that would be found for "target" + return [ + [word, "dev"], # targetdev, target-dev + ["dev", word], # devtarget, dev-target + [word, "test"], # targettest, target-test + ] + + module_test.monkeypatch.setattr("bbot.core.helpers.wordcloud.WordCloud.mutations", mock_mutations) + + async def setup_after_prep(self, module_test): + # Keep request handler-based HTTP server + await super().setup_after_prep(module_test) + + # Set up DNS mocking for target.test + await module_test.mock_dns({"target.test": {"A": ["127.0.0.1"]}}) + + # Emit URL event manually and ensure resolved_hosts + from bbot.modules.base import BaseModule + + class DummyModule(BaseModule): + _name = "dummy_module_mut" + watched_events = ["SCAN"] + + async def handle_event(self, event): + if event.type == "SCAN": + url_event = self.scan.make_event( + "http://subdomain.target.test:8888/", + "URL", + parent=event, + tags=["status-200", "ip-127.0.0.1"], + ) + await self.emit_event(url_event) + + module_test.scan.modules["dummy_module_mut"] = DummyModule(module_test.scan) + + # Patch virtualhost to inject resolved hosts + vh_module = module_test.scan.modules["virtualhost"] + orig_handle_event 
= vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline request to target.test (with or without port) + if not host_header or host_header in ["subdomain.target.test", "subdomain.target.test:8888"]: + return Response("baseline response from target.test", status=200) + + # Wildcard canary check + if re.match(r"[a-z]subdomain\.target\.test(?::8888)?$", host_header): # Modified target.test + return Response("wildcard canary response", status=404) + + # Mutation canary requests (4 chars + dash + original host) + if re.match(r"^[a-z]{4}-subdomain\.target\.test(?::8888)?$", host_header): + return Response("Mutation Canary", status=404) + + # Word cloud mutation matches - return different content than canary + if host_header == "subdomain-dev.target.test": + return Response("Dev target 1 found!", status=200) + if host_header == "devsubdomain.target.test": + return Response("Dev target 2 found!", status=200) + if host_header == "subdomaintest.target.test": + return Response("Test target found!", status=200) + + # Default response + return Response( + """\n404 Not Found

Not Found

Default handler response.

""", + status=404, + ) + + def check(self, module_test, events): + mutation_hosts_found = set() + for e in events: + if e.type == "VIRTUAL_HOST": + vhost = e.data["virtual_host"] + # Look for mutation patterns with dev/test + if any(word in vhost for word in ["dev", "test"]) and "target" in vhost: + mutation_hosts_found.add(vhost) + + assert len(mutation_hosts_found) >= 1, ( + f"Failed to detect mutation virtual hosts. Found: {mutation_hosts_found}" + ) + + +class TestVirtualhostWordcloud(VirtualhostTestBase): + """Test finish() wordcloud-based detection using HTTP Host headers""" + + targets = ["http://wordcloud.test:8888"] + modules_overrides = ["httpx", "virtualhost"] + config_overrides = { + "modules": { + "virtualhost": { + "subdomain_brute": False, # Focus on wordcloud only + "mutation_check": False, # Focus on wordcloud only + "special_hosts": False, # Focus on wordcloud only + "certificate_sans": False, + "wordcloud_check": True, # Enable wordcloud + "require_inaccessible": False, + } + } + } + + async def setup_after_prep(self, module_test): + # Keep request handler-based HTTP server + await super().setup_after_prep(module_test) + + # Set up DNS mocking for wordcloud.test + await module_test.mock_dns({"wordcloud.test": {"A": ["127.0.0.1"]}}) + + # Mock wordcloud to have some common words + def mock_wordcloud_keys(self): + return ["staging", "prod", "dev", "admin", "api"] + + module_test.monkeypatch.setattr("bbot.core.helpers.wordcloud.WordCloud.keys", mock_wordcloud_keys) + + # Emit URL event manually and ensure resolved_hosts + from bbot.modules.base import BaseModule + + class DummyModule(BaseModule): + _name = "dummy_module_wc" + watched_events = ["SCAN"] + + async def handle_event(self, event): + if event.type == "SCAN": + url_event = self.scan.make_event( + "http://wordcloud.test:8888/", + "URL", + parent=event, + tags=["status-200", "ip-127.0.0.1"], + ) + await self.emit_event(url_event) + + module_test.scan.modules["dummy_module_wc"] = 
DummyModule(module_test.scan) + + # Patch virtualhost to inject resolved hosts + vh_module = module_test.scan.modules["virtualhost"] + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline request to wordcloud.test (with or without port) + if not host_header or host_header in ["wordcloud.test", "wordcloud.test:8888"]: + return Response("baseline response from wordcloud.test", status=200) + + # Wildcard canary check + if re.match(r"[a-z]ordcloud\.test(?::8888)?$", host_header): # Modified wordcloud.test + return Response("wildcard canary response", status=404) + + # Random canary requests (12 chars + .com) + if re.match(r"^[a-z]{12}\.com(?::8888)?$", host_header): + return Response("random canary response", status=404) + + # Wordcloud-based matches - these are checked in finish() + if host_header in ["staging.wordcloud.test", "staging.wordcloud.test:8888"]: + return Response("Staging environment found!", status=200) + if host_header in ["prod.wordcloud.test", "prod.wordcloud.test:8888"]: + return Response("Production environment found!", status=200) + if host_header in ["dev.wordcloud.test", "dev.wordcloud.test:8888"]: + return Response("Development environment found!", status=200) + + # Default response + return Response("default response", status=404) + + def check(self, module_test, events): + wordcloud_hosts_found = set() + for e in events: + if e.type == "VIRTUAL_HOST": + vhost = e.data["virtual_host"] + if vhost in ["staging.wordcloud.test", "prod.wordcloud.test", "dev.wordcloud.test"]: + wordcloud_hosts_found.add(vhost) + + assert len(wordcloud_hosts_found) >= 1, ( + f"Failed to detect wordcloud virtual hosts. 
Found: {wordcloud_hosts_found}" + ) + + +class TestVirtualhostHTTPSLogic(ModuleTestBase): + """Unit tests for HTTPS/SNI-specific functions""" + + targets = ["http://localhost:8888"] # Minimal target for unit testing + modules_overrides = ["httpx", "virtualhost"] + + async def setup_before_prep(self, module_test): + pass # No special setup needed + + async def setup_after_prep(self, module_test): + pass # No HTTP mocking needed for unit tests + + def check(self, module_test, events): + # Get the virtualhost module instance for direct testing + virtualhost_module = None + for module in module_test.scan.modules.values(): + if hasattr(module, "special_virtualhost_list"): + virtualhost_module = module + break + + assert virtualhost_module is not None, "Could not find virtualhost module instance" + + # Test canary host generation for different modes + canary_subdomain = virtualhost_module._get_canary_random_host("test.example.com", ".example.com", "subdomain") + canary_mutation = virtualhost_module._get_canary_random_host("test.example.com", ".example.com", "mutation") + canary_random = virtualhost_module._get_canary_random_host("test.example.com", ".example.com", "random") + + # Verify canary patterns + assert canary_subdomain.endswith(".example.com"), ( + f"Subdomain canary doesn't end with basehost: {canary_subdomain}" + ) + assert "-test.example.com" in canary_mutation, ( + f"Mutation canary doesn't contain expected pattern: {canary_mutation}" + ) + assert canary_random.endswith(".com"), f"Random canary doesn't end with .com: {canary_random}" + + # Test that all canaries are different + assert canary_subdomain != canary_mutation != canary_random, "Canaries should be different" + + +class TestVirtualhostForceBasehost(VirtualhostTestBase): + """Test force_basehost functionality specifically""" + + targets = ["http://127.0.0.1:8888"] # Use IP to require force_basehost + modules_overrides = ["httpx", "virtualhost"] + test_wordlist = ["admin", "api"] + config_overrides = 
{ + "modules": { + "virtualhost": { + "brute_wordlist": tempwordlist(test_wordlist), + "force_basehost": "forced.domain", # Test force_basehost functionality + "subdomain_brute": True, + "mutation_check": False, + "special_hosts": False, + "certificate_sans": False, + "wordcloud_check": False, + "require_inaccessible": False, + } + } + } + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline request to the IP + if not host_header or host_header == "127.0.0.1:8888": + return Response("baseline response from IP", status=200) + + # Wildcard canary check + if re.match(r"[0-9]27\.0\.0\.1:8888", host_header): + return Response("wildcard canary response", status=404) + + # Subdomain canary (12 random chars + .forced.domain) + if re.match(r"[a-z]{12}\.forced\.domain", host_header): + return Response("forced domain canary response", status=404) + + # Virtual hosts using forced basehost + if host_header == "admin.forced.domain": + return Response("Admin with forced basehost found!", status=200) + if host_header == "api.forced.domain": + return Response("API with forced basehost found!", status=200) + + # Default response + return Response("default response", status=404) + + def check(self, module_test, events): + forced_hosts_found = set() + for e in events: + if e.type == "VIRTUAL_HOST": + vhost = e.data["virtual_host"] + if vhost in ["admin.forced.domain", "api.forced.domain"]: + forced_hosts_found.add(vhost) + + # Verify the description shows it used the forced basehost + description = e.data["description"] + assert "Subdomain Brute-force" in description, ( + f"Expected subdomain brute-force discovery: {description}" + ) + + assert len(forced_hosts_found) >= 1, ( + f"Failed to detect virtual hosts with force_basehost. Found: {forced_hosts_found}. 
" + f"Expected at least one of: admin.forced.domain, api.forced.domain" + ) + + +class TestVirtualhostInterestingDefaultContent(VirtualhostTestBase): + """Test reporting of interesting default canary content during wildcard check""" + + targets = ["http://interesting.test:8888"] + modules_overrides = ["httpx", "virtualhost"] + config_overrides = { + "modules": { + "virtualhost": { + "subdomain_brute": False, + "mutation_check": False, + "special_hosts": False, + "certificate_sans": False, + "wordcloud_check": False, + "report_interesting_default_content": True, + "require_inaccessible": False, + } + } + } + + async def setup_after_prep(self, module_test): + # Start HTTP server + await super().setup_after_prep(module_test) + + # Mock DNS resolution for interesting.test + await module_test.mock_dns({"interesting.test": {"A": ["127.0.0.1"]}}) + + # Dummy module to emit the URL event for the virtualhost module + from bbot.modules.base import BaseModule + + class DummyModule(BaseModule): + _name = "dummy_module_interesting" + watched_events = ["SCAN"] + + async def handle_event(self, event): + if event.type == "SCAN": + url_event = self.scan.make_event( + "http://interesting.test:8888/", + "URL", + parent=event, + tags=["status-404", "ip-127.0.0.1"], + ) + await self.emit_event(url_event) + + module_test.scan.modules["dummy_module_interesting"] = DummyModule(module_test.scan) + + # Patch virtualhost to inject resolved hosts + vh_module = module_test.scan.modules["virtualhost"] + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline response for original host (ensure status differs from canary) + if not host_header or host_header in ["interesting.test", 
"interesting.test:8888"]: + return Response("baseline not found", status=404) + + # Wildcard canary mutated hostname: change first alpha to 'z' -> znteresting.test + if host_header in ["znteresting.test", "znteresting.test:8888"]: + long_body = ( + "This is a sufficiently long default page body that exceeds forty characters " + "to trigger the interesting default content branch." + ) + return Response(long_body, status=200) + + # Default + return Response("default response", status=404) + + def check(self, module_test, events): + found_interesting = False + found_correct_host = False + for e in events: + if e.type == "VIRTUAL_HOST": + desc = e.data.get("description", "") + if "Interesting Default Content (from intentionally-incorrect canary host)" in desc: + found_interesting = True + # The VIRTUAL_HOST should be the canary hostname used in the wildcard request + if e.data.get("virtual_host") == "znteresting.test": + found_correct_host = True + break + + assert found_interesting, "Expected VIRTUAL_HOST from interesting default canary content was not emitted" + assert found_correct_host, "virtual_host should equal the canary hostname 'znteresting.test'" + + +class TestVirtualhostKeywordWildcard(VirtualhostTestBase): + """Test keyword-based wildcard detection using 'www' in hostname""" + + targets = ["http://acme.test:8888"] + modules_overrides = ["httpx", "virtualhost"] + config_overrides = { + "modules": { + "virtualhost": { + "subdomain_brute": True, + "mutation_check": False, + "special_hosts": False, + "certificate_sans": False, + "wordcloud_check": False, + "require_inaccessible": False, + # Keep brute_lines small and supply a tiny wordlist containing a 'www' entry and an exact match + } + } + } + + async def setup_after_prep(self, module_test): + # Start HTTP server with wildcard behavior for any hostname containing 'www' + await super().setup_after_prep(module_test) + + # Mock DNS resolution for acme.test + await module_test.mock_dns({"acme.test": {"A": 
["127.0.0.1"]}}) + + # Provide a tiny custom wordlist containing 'wwwfoo' and 'admin' so that: + # - 'wwwfoo' would be a false positive without the keyword-based wildcard detection + # - 'admin' will be an exact match we deliberately allow via the response handler + from .base import tempwordlist + + words = ["wwwfoo", "admin"] + wl = tempwordlist(words) + + # Patch virtualhost to use our custom wordlist and inject resolved hosts + vh_module = module_test.scan.modules["virtualhost"] + original_setup = vh_module.setup + + async def patched_setup(): + await original_setup() + vh_module.brute_wordlist = wl + return True + + module_test.monkeypatch.setattr(vh_module, "setup", patched_setup) + + # Emit URL event manually and ensure resolved_hosts + from bbot.modules.base import BaseModule + + class DummyModule(BaseModule): + _name = "dummy_module_keyword" + watched_events = ["SCAN"] + + async def handle_event(self, event): + if event.type == "SCAN": + url_event = self.scan.make_event( + "http://acme.test:8888/", + "URL", + parent=event, + tags=["status-404", "ip-127.0.0.1"], + ) + await self.emit_event(url_event) + + module_test.scan.modules["dummy_module_keyword"] = DummyModule(module_test.scan) + + # Inject resolved hosts for the URL + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline response for original host + if not host_header or host_header in ["acme.test", "acme.test:8888"]: + return Response("baseline not found", status=404) + + # If hostname contains 'www' anywhere, return the same body as baseline (simulating keyword wildcard) + if "www" in host_header: + return Response("baseline not found", status=404) + + # Exact-match virtual host that should still be 
detected + if host_header in ["admin.acme.test", "admin.acme.test:8888"]: + return Response("Admin portal", status=200) + + # Default + return Response("default response", status=404) + + def check(self, module_test, events): + found_admin = False + found_www = False + for e in events: + if e.type == "VIRTUAL_HOST": + vhost = e.data.get("virtual_host") + if vhost == "admin.acme.test": + found_admin = True + if vhost and "www" in vhost: + found_www = True + + assert found_admin, "Expected VIRTUAL_HOST for admin.acme.test was not emitted" + assert not found_www, "No VIRTUAL_HOST should be emitted for 'www' keyword wildcard entries" + + +class TestVirtualhostHTTPResponse(VirtualhostTestBase): + """Test virtual host discovery with badsecrets analysis of HTTP_RESPONSE events""" + + targets = ["http://secrets.test:8888"] + modules_overrides = ["virtualhost", "badsecrets"] + test_wordlist = ["admin"] + config_overrides = { + "modules": { + "virtualhost": { + "brute_wordlist": tempwordlist(test_wordlist), + "subdomain_brute": True, + "mutation_check": False, + "special_hosts": False, + "certificate_sans": False, + "wordcloud_check": False, + "require_inaccessible": False, + } + } + } + + async def setup_after_prep(self, module_test): + # Call parent setup_after_prep to set up the HTTP server with request_handler + await super().setup_after_prep(module_test) + + # Set up DNS mocking for secrets.test to resolve to 127.0.0.1 + await module_test.mock_dns({"secrets.test": {"A": ["127.0.0.1"]}}) + + # Create a dummy module that will emit the URL event during the scan + from bbot.modules.base import BaseModule + + class DummyModule(BaseModule): + _name = "dummy_module_secrets" + watched_events = ["SCAN"] + + async def handle_event(self, event): + if event.type == "SCAN": + # Create and emit URL event for virtualhost module to process + url_event = self.scan.make_event( + "http://secrets.test:8888/", "URL", parent=event, tags=["status-200", "ip-127.0.0.1"] + ) + await 
self.emit_event(url_event) + + # Add the dummy module to the scan + dummy_module = DummyModule(module_test.scan) + module_test.scan.modules["dummy_module_secrets"] = dummy_module + + # Patch virtualhost to inject resolved_hosts for URL events during the test + vh_module = module_test.scan.modules["virtualhost"] + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + from werkzeug.wrappers import Response + + host_header = request.headers.get("Host", "").lower() + + # Baseline request to secrets.test (with or without port) + if not host_header or host_header in ["secrets.test", "secrets.test:8888"]: + return Response("baseline response from secrets.test", status=200) + + # Wildcard canary check - change one character in secrets.test + if re.match(r"[a-z]ecrets\.test", host_header): + return Response("wildcard canary different response", status=404) + + # Brute-force canary requests - random string + .secrets.test (with optional port) + if re.match(r"^[a-z]{12}\.secrets\.test(?::8888)?$", host_header): + return Response("subdomain canary response", status=404) + + # Virtual host with vulnerable JWT cookie and JWT in body - both using weak secret '1234' - this should trigger badsecrets twice + if host_header in ["admin.secrets.test", "admin.secrets.test:8888"]: + return Response( + "

Admin Panel

", + status=200, + headers={ + "set-cookie": "vulnjwt=eyJhbGciOiJIUzI1NiJ9.eyJJc3N1ZXIiOiJJc3N1ZXIiLCJVc2VybmFtZSI6IkJhZFNlY3JldHMiLCJleHAiOjE1OTMxMzM0ODMsImlhdCI6MTQ2NjkwMzA4M30.ovqRikAo_0kKJ0GVrAwQlezymxrLGjcEiW_s3UJMMCo; secure" + }, + ) + + # Default response + return Response("default response", status=404) + + def check(self, module_test, events): + virtual_host_found = False + http_response_found = False + jwt_cookie_vuln_found = False + jwt_body_vuln_found = False + + # Debug: print all events to see what we're getting + print(f"\n=== DEBUG: Found {len(events)} events ===") + for e in events: + print(f"Event: {e.type} - {e.data}") + if hasattr(e, "tags"): + print(f" Tags: {e.tags}") + + for e in events: + # Check for virtual host discovery + if e.type == "VIRTUAL_HOST": + vhost = e.data["virtual_host"] + if vhost in ["admin.secrets.test"]: + virtual_host_found = True + # Verify it has the virtual-host tag + assert "virtual-host" in e.tags, f"VIRTUAL_HOST event missing virtual-host tag: {e.tags}" + + # Check for HTTP_RESPONSE with virtual-host tag + elif e.type == "HTTP_RESPONSE": + if "virtual-host" in e.tags: + http_response_found = True + # Verify the HTTP_RESPONSE has the expected format + assert "input" in e.data, f"HTTP_RESPONSE missing input field: {e.data}" + assert e.data["input"] == "admin.secrets.test", f"HTTP_RESPONSE input mismatch: {e.data['input']}" + assert "status_code" in e.data, f"HTTP_RESPONSE missing status_code: {e.data}" + assert e.data["status_code"] == 200, f"HTTP_RESPONSE status_code mismatch: {e.data['status_code']}" + # Debug: print the response data to see what badsecrets is analyzing + print(f"HTTP_RESPONSE data: {e.data}") + + # Check for badsecrets vulnerability findings + elif e.type == "VULNERABILITY": + print(f"Found VULNERABILITY event: {e.data}") + description = e.data["description"] + + # Check for JWT vulnerability (from cookie) + if ( + "1234" in description + and 
"eyJhbGciOiJIUzI1NiJ9.eyJJc3N1ZXIiOiJJc3N1ZXIiLCJVc2VybmFtZSI6IkJhZFNlY3JldHMiLCJleHAiOjE1OTMxMzM0ODMsImlhdCI6MTQ2NjkwMzA4M30.ovqRikAo_0kKJ0GVrAwQlezymxrLGjcEiW_s3UJMMCo" + in description + and "JWT" in description + ): + jwt_cookie_vuln_found = True + + # Check for JWT vulnerability (from body) + if ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxMjMsInVzZXJuYW1lIjoiYWRtaW4iLCJleHAiOjE1OTMxMzM0ODMsImlhdCI6MTQ2NjkwMzA4M30.03xPSXavrMk0HK4BD3_hPKgu3RLu6CmTSPGfrDx2qpg" + in description + and "JWT" in description + ): + jwt_body_vuln_found = True + + assert virtual_host_found, "Failed to detect virtual host admin.secrets.test" + assert http_response_found, "Failed to detect HTTP_RESPONSE event with virtual-host tag" + assert jwt_cookie_vuln_found, ( + "Failed to detect JWT vulnerability - JWT with weak secret '1234' should have been found" + ) + assert jwt_body_vuln_found, ( + "Failed to detect JWT vulnerability in body - JWT 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxMjMsInVzZXJuYW1lIjoiYWRtaW4iLCJleHAiOjE1OTMxMzM0ODMsImlhdCI6MTQ2NjkwMzA4M30.03xPSXavrMk0HK4BD3_hPKgu3RLu6CmTSPGfrDx2qpg' should have been found" + ) + print( + f"Test results: virtual_host_found={virtual_host_found}, http_response_found={http_response_found}, jwt_cookie_vuln_found={jwt_cookie_vuln_found}, jwt_body_vuln_found={jwt_body_vuln_found}" + ) diff --git a/bbot/test/test_step_2/module_tests/test_module_waf_bypass.py b/bbot/test/test_step_2/module_tests/test_module_waf_bypass.py new file mode 100644 index 0000000000..da812633bb --- /dev/null +++ b/bbot/test/test_step_2/module_tests/test_module_waf_bypass.py @@ -0,0 +1,133 @@ +from .base import ModuleTestBase +from bbot.modules.base import BaseModule +import json + + +class TestWAFBypass(ModuleTestBase): + targets = ["protected.test", "direct.test"] + module_name = "waf_bypass" + modules_overrides = ["waf_bypass", "httpx"] + config_overrides = { + "scope": {"report_distance": 2}, + "modules": {"waf_bypass": 
{"search_ip_neighbors": True, "neighbor_cidr": 30}}, + } + + PROTECTED_IP = "127.0.0.129" + DIRECT_IP = "127.0.0.2" + + api_response_direct = { + "asn": 15169, + "subnets": ["127.0.0.0/25"], + "asn_name": "ACME-ORG", + "org": "ACME-ORG", + "country": "US", + } + + api_response_cloudflare = { + "asn": 13335, + "asn_name": "CLOUDFLARENET", + "country": "US", + "ip": "127.0.0.129", + "org": "Cloudflare, Inc.", + "rir": "ARIN", + "subnets": ["127.0.0.128/25"], + } + + class DummyModule(BaseModule): + watched_events = ["DNS_NAME"] + _name = "dummy_module" + events_seen = [] + + async def handle_event(self, event): + if event.data == "protected.test": + await self.helpers.sleep(0.5) + self.events_seen.append(event.data) + url = "http://protected.test:8888/" + url_event = self.scan.make_event( + url, "URL", parent=self.scan.root_event, tags=["cdn-cloudflare", "in-scope", "status-200"] + ) + if url_event is not None: + await self.emit_event(url_event) + + elif event.data == "direct.test": + await self.helpers.sleep(0.5) + self.events_seen.append(event.data) + url = "http://direct.test:8888/" + url_event = self.scan.make_event( + url, "URL", parent=self.scan.root_event, tags=["in-scope", "status-200"] + ) + if url_event is not None: + await self.emit_event(url_event) + + async def setup_after_prep(self, module_test): + from bbot.core.helpers.asn import ASNHelper + + await module_test.mock_dns( + { + "protected.test": {"A": [self.PROTECTED_IP]}, + "direct.test": {"A": [self.DIRECT_IP]}, + "": {"A": []}, + } + ) + + self.module_test = module_test + + self.dummy_module = self.DummyModule(module_test.scan) + module_test.scan.modules["dummy_module"] = self.dummy_module + + module_test.monkeypatch.setattr(ASNHelper, "asndb_ip_url", "http://127.0.0.1:8888/v1/ip/") + + expect_args = {"method": "GET", "uri": "/v1/ip/127.0.0.2"} + respond_args = { + "response_data": json.dumps(self.api_response_direct), + "status": 200, + "content_type": "application/json", + } + 
module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)
+
+        expect_args = {"method": "GET", "uri": "/", "headers": {"Host": "protected.test"}}
+        respond_args = {"status": 200, "response_data": "HELLO THERE!"}
+        module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)
+
+        # Patch WAF bypass get_url_content to control similarity outcome
+        waf_module = module_test.scan.modules["waf_bypass"]
+
+        async def fake_get_url_content(self_waf, url, ip=None):
+            if "protected.test" in url and (ip is None or ip == "127.0.0.1"):
+                return {"response_data": "PROTECTED CONTENT!", "http_code": 200}
+            else:
+                return {"response_data": "ERROR!", "http_code": 404}
+
+        import types
+
+        module_test.monkeypatch.setattr(
+            waf_module,
+            "get_url_content",
+            types.MethodType(fake_get_url_content, waf_module),
+            raising=True,
+        )
+
+        # Monkeypatch tldextract so base_domain is never empty
+        def fake_tldextract(domain):
+            import types as _t
+
+            return _t.SimpleNamespace(top_domain_under_public_suffix=domain)
+
+        module_test.monkeypatch.setattr(
+            waf_module.helpers,
+            "tldextract",
+            fake_tldextract,
+            raising=True,
+        )
+
+    def check(self, module_test, events):
+        waf_bypass_events = [e for e in events if e.type == "VULNERABILITY"]
+        assert waf_bypass_events, "No VULNERABILITY event produced"
+
+        correct_description = [
+            e
+            for e in waf_bypass_events
+            if "WAF Bypass Confirmed - Direct IPs: 127.0.0.1 for http://protected.test:8888/. Similarity 100.00%"
+            in e.data["description"]
+        ]
+        assert correct_description, "Incorrect description"