Skip to content

IPMI Scanner Bug Fixes#21526

Open
zeroSteiner wants to merge 3 commits into
rapid7:masterfrom
zeroSteiner:fix/ipmi/scanner-multi
Open

IPMI Scanner Bug Fixes#21526
zeroSteiner wants to merge 3 commits into
rapid7:masterfrom
zeroSteiner:fix/ipmi/scanner-multi

Conversation

@zeroSteiner
Copy link
Copy Markdown
Contributor

@zeroSteiner zeroSteiner commented Jun 2, 2026

This makes a few improvements to the IPMI scanner modules as described below.

auxiliary/scanner/ipmi/ipmi_cipher_zero

This module had an issue where if a malformed reply was received it would abort the operation meaning that if a legitimate reply was received afterwards that it would not be processed.

  • Use the ipmi_udp_harness.py script, python tools/dev/ipmi_udp_harness.py --mode cipher_zero --profile malformed_then_valid --listen 127.0.0.1 --port 623
  • Use the ipmi_cipher_zero module and target the local harness
  • See in the patched version that there's a malformed reply that's received followed by a successful reply

auxiliary/scanner/ipmi/ipmi_dumphashes

This module would run against a target that is not an IPMI service. In that case, it can appear hung due to how long it takes to run as it makes repeated requests to a target it can't communicate with. The update here aborts the operation more quickly when there isn't evidence that the target is IPMI.

  • Use the ipmi_udp_harness.py script, python tools/dev/ipmi_udp_harness.py --mode dumphashes --profile silent --listen 127.0.0.1 --port 623
  • Use the auxiliary/scanner/ipmi/ipmi_dumphashes module and target the local harness
  • Run the module and see it return in a matter of seconds, without this patch it would appear to be hung

Multiple

Addresses use Rex::Socket.to_authority to report the host and port combination so when there is an IPv6 address, it's correctly encapsulated in brackets.

Verification

List the steps needed to make sure this thing works

  • Start msfconsole
  • Run through the steps to demonstrate the original issue and the fix as outlined above
ipmi_udp_harness.py
#!/usr/bin/env python3
"""Configurable UDP harness for stressing Metasploit IPMI scanners.

Examples:
  python tools/dev/ipmi_udp_harness.py --mode version --profile valid
  python tools/dev/ipmi_udp_harness.py --mode cipher_zero --profile malformed
  python tools/dev/ipmi_udp_harness.py --mode dumphashes --profile flappy
"""

from __future__ import annotations

import argparse
import random
import socketserver
import struct
import sys
import time
from dataclasses import dataclass
from typing import Dict, List

PAYLOAD_RMCPPLUSOPEN_REQ = 0x10
PAYLOAD_RMCPPLUSOPEN_REP = 0x11
PAYLOAD_RAKP1 = 0x12
PAYLOAD_RAKP2 = 0x13


@dataclass
class Reply:
    payload: bytes
    delay: float = 0.0


@dataclass
class ClientState:
    packet_count: int = 0
    stage: int = 0


def pack_u24(value: int) -> bytes:
    return struct.pack("<I", value & 0xFFFFFF)[:3]


def make_random_bytes(length: int) -> bytes:
    return bytes(random.getrandbits(8) for _ in range(length))


def build_version_reply() -> bytes:
    body = bytes(
        [
            0x20, 0x00, 0x00, 0x81, 0x00, 0x38, 0x00, 0x0E, 0xFF, 0x07, 0x03
        ]
    ) + pack_u24(0x01020304) + b"\x00\x00\x00\x00"

    header = bytes([0x06, 0x00, 0xFF, 0x07, 0x00])
    return header + struct.pack("<I", 0) + struct.pack("<I", 0) + bytes([len(body)]) + body


def build_open_session_reply(
    *,
    error_code: int = 0,
    payload_type: int = PAYLOAD_RMCPPLUSOPEN_REP,
    console_session_id: bytes = b"CON1",
    bmc_session_id: bytes = b"BMC1",
    data: bytes | None = None,
) -> bytes:
    if data is None:
        data = console_session_id + bmc_session_id

    header = bytes([0x06, 0x00, 0xFF, 0x07, 0x06, payload_type & 0x3F])
    header += struct.pack("<I", 0x11111111)
    header += struct.pack("<I", 0x22222222)

    body = bytes([0x00, error_code & 0xFF]) + struct.pack("<H", 0x0000) + data
    return header + struct.pack("<H", len(body)) + body


def build_rakp2_reply() -> bytes:
    data = (
        b"CON1"
        + b"RANDOM-RANDOM-01"
        + b"0123456789ABCDEF"
        + b"0123456789ABCDEF0123"
    )
    return build_open_session_reply(
        error_code=0,
        payload_type=PAYLOAD_RAKP2,
        data=data,
    )


class IPMIHarnessScenario:
    def __init__(self, mode: str, profile: str, delay: float, duplicate_count: int, truncate_length: int, error_code: int):
        self.mode = mode
        self.profile = profile
        self.delay = delay
        self.duplicate_count = max(1, duplicate_count)
        self.truncate_length = max(1, truncate_length)
        self.error_code = error_code

    def replies_for(self, state: ClientState) -> List[Reply]:
        state.packet_count += 1

        if self.profile == "silent":
            return []

        if self.profile == "flappy":
            slot = state.packet_count % 5
            if slot == 1:
                return []
            if slot == 2:
                return [Reply(self.truncated_payload())]
            if slot == 3:
                return [Reply(self.valid_payload())]
            if slot == 4:
                return [Reply(self.valid_payload()) for _ in range(self.duplicate_count)]
            return [Reply(self.valid_payload(), delay=self.delay)]

        if self.profile == "malformed":
            return [Reply(self.truncated_payload() if self.truncate_length else make_random_bytes(8))]

        if self.profile == "malformed_then_valid":
            # Emit a malformed straggler immediately followed by a legitimate
            # reply from the same host. A scanner that aborts on the malformed
            # packet will never process (and never report) the valid one.
            return [Reply(self.truncated_payload()), Reply(self.valid_payload())]

        if self.profile == "duplicate":
            return [Reply(self.valid_payload()) for _ in range(self.duplicate_count)]

        if self.profile == "delayed":
            return [Reply(self.valid_payload(), delay=self.delay)]

        if self.profile == "error":
            return [Reply(self.error_payload())]

        return [Reply(self.valid_payload())]

    def valid_payload(self) -> bytes:
        if self.mode == "version":
            return build_version_reply()
        if self.mode == "cipher_zero":
            return build_open_session_reply(error_code=0)
        return self.dumphashes_payload()

    def error_payload(self) -> bytes:
        if self.mode == "version":
            return build_version_reply()
        if self.mode == "cipher_zero":
            return build_open_session_reply(error_code=self.error_code)
        return build_open_session_reply(error_code=self.error_code)

    def truncated_payload(self) -> bytes:
        return self.valid_payload()[: self.truncate_length]

    def dumphashes_payload(self) -> bytes:
        if self.mode != "dumphashes":
            return build_open_session_reply(error_code=0)

        payload = build_open_session_reply(error_code=0) if self._next_stage_is_open_session() else build_rakp2_reply()
        return payload

    def _next_stage_is_open_session(self) -> bool:
        # Stage alternates per client: open session first, then RAKP2.
        return True


class IPMIUDPServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
    allow_reuse_address = True
    daemon_threads = True

    def __init__(self, server_address, handler_class, scenario: IPMIHarnessScenario):
        super().__init__(server_address, handler_class)
        self.scenario = scenario
        self.client_states: Dict[tuple, ClientState] = {}

    def state_for(self, client_address) -> ClientState:
        state = self.client_states.get(client_address)
        if state is None:
            state = ClientState()
            self.client_states[client_address] = state
        return state


class HarnessHandler(socketserver.BaseRequestHandler):
    def handle(self) -> None:
        data, sock = self.request
        state = self.server.state_for(self.client_address)

        if self.server.scenario.mode == "dumphashes":
            replies = self._dumphashes_replies(data, state)
        else:
            replies = self.server.scenario.replies_for(state)

        for reply in replies:
            if reply.delay > 0:
                time.sleep(reply.delay)
            sock.sendto(reply.payload, self.client_address)

    def _dumphashes_replies(self, data: bytes, state: ClientState) -> List[Reply]:
        state.packet_count += 1

        if self.server.scenario.profile == "silent":
            return []

        if self.server.scenario.profile == "flappy":
            slot = state.packet_count % 5
            if slot == 1:
                return []
            if slot == 2:
                return [Reply(build_open_session_reply(error_code=0)[: self.server.scenario.truncate_length])]
            if slot == 3:
                return [Reply(build_open_session_reply(error_code=0))]
            if slot == 4:
                return [Reply(build_rakp2_reply()) for _ in range(self.server.scenario.duplicate_count)]
            return [Reply(build_rakp2_reply(), delay=self.server.scenario.delay)]

        if state.stage == 0:
            state.stage = 1
            if self.server.scenario.profile == "malformed":
                return [Reply(build_open_session_reply(error_code=0)[: self.server.scenario.truncate_length])]
            if self.server.scenario.profile == "delayed":
                return [Reply(build_open_session_reply(error_code=0), delay=self.server.scenario.delay)]
            if self.server.scenario.profile == "duplicate":
                return [Reply(build_open_session_reply(error_code=0)) for _ in range(self.server.scenario.duplicate_count)]
            if self.server.scenario.profile == "error":
                return [Reply(build_open_session_reply(error_code=self.server.scenario.error_code))]
            return [Reply(build_open_session_reply(error_code=0))]

        if state.stage == 1:
            state.stage = 2
            if self.server.scenario.profile == "malformed":
                return [Reply(build_rakp2_reply()[: self.server.scenario.truncate_length])]
            if self.server.scenario.profile == "delayed":
                return [Reply(build_rakp2_reply(), delay=self.server.scenario.delay)]
            if self.server.scenario.profile == "duplicate":
                return [Reply(build_rakp2_reply()) for _ in range(self.server.scenario.duplicate_count)]
            return [Reply(build_rakp2_reply())]

        return [Reply(build_rakp2_reply())]


def parse_args(argv: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--listen", default="0.0.0.0", help="Address to bind")
    parser.add_argument("--port", type=int, default=623, help="UDP port to bind")
    parser.add_argument("--mode", choices=["version", "cipher_zero", "dumphashes"], required=True)
    parser.add_argument(
        "--profile",
        choices=["valid", "silent", "malformed", "malformed_then_valid", "delayed", "duplicate", "error", "flappy"],
        default="valid",
    )
    parser.add_argument("--delay", type=float, default=1.5, help="Delay for delayed replies")
    parser.add_argument("--duplicate-count", type=int, default=2, help="How many times to duplicate responses")
    parser.add_argument("--truncate-length", type=int, default=12, help="Length used for truncated replies")
    parser.add_argument("--error-code", type=int, default=0x11, help="Error code used by error replies")
    parser.add_argument("--seed", type=int, default=1, help="Random seed for malformed payloads")
    return parser.parse_args(argv)


def main(argv: List[str]) -> int:
    args = parse_args(argv)
    random.seed(args.seed)
    scenario = IPMIHarnessScenario(
        mode=args.mode,
        profile=args.profile,
        delay=args.delay,
        duplicate_count=args.duplicate_count,
        truncate_length=args.truncate_length,
        error_code=args.error_code,
    )

    with IPMIUDPServer((args.listen, args.port), HarnessHandler, scenario) as server:
        print(f"Listening on {args.listen}:{args.port} for mode={args.mode} profile={args.profile}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            return 130
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))

Demo

image

Ran through both tests, new on the left, old on the right.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR improves robustness and output correctness across the IPMI scanner modules by hardening parsing against malformed UDP replies, shortening unresponsive-target behavior in ipmi_dumphashes, and standardizing host:port formatting via Rex::Socket.to_authority (IPv6-safe).

Changes:

  • Add RSpec coverage for malformed-reply handling (ipmi_cipher_zero) and early-abort behavior on silent targets (ipmi_dumphashes).
  • Prevent ipmi_cipher_zero from aborting when a malformed Open Session reply is received.
  • Use Rex::Socket.to_authority(host, port) in IPMI scanners so IPv6 addresses are bracketed correctly in output.

Impact Analysis:

  • Blast radius: medium — affects runtime behavior/output of IPMI auxiliary scanner modules (ipmi_cipher_zero, ipmi_dumphashes, ipmi_version) and adds new spec coverage for those scanners.
  • Data and contract effects: low — no schema/payload changes; scanner output strings change (authority formatting) and ipmi_dumphashes may abort enumeration sooner on targets deemed non-IPMI.
  • Rollback and test focus: rollback is straightforward (module-only changes); focus testing on (1) malformed-then-valid reply sequences for ipmi_cipher_zero, and (2) targets that respond with IPMI “refusal”/short Open Session replies and then go silent (to avoid false-negative early abort).

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
spec/modules/auxiliary/scanner/ipmi/ipmi_dumphashes_spec.rb Adds regression coverage to ensure enumeration stops quickly when the target never answers the initial Open Session probe.
spec/modules/auxiliary/scanner/ipmi/ipmi_cipher_zero_spec.rb Adds regression coverage to ensure malformed replies don’t raise/abort and a later valid reply is still reported.
modules/auxiliary/scanner/ipmi/ipmi_version.rb Switches output to Rex::Socket.to_authority for IPv6-safe host:port formatting.
modules/auxiliary/scanner/ipmi/ipmi_dumphashes.rb Adds early-abort logic when there’s no evidence the target is IPMI, and updates output to to_authority.
modules/auxiliary/scanner/ipmi/ipmi_cipher_zero.rb Wraps reply parsing to ignore malformed packets safely and updates output to to_authority.

Comment on lines +122 to +126
if seen_valid_open_session
ipmi_status("No response to IPMI open session request for username #{username}")
else
ipmi_error("No response to IPMI open session request; stopping username enumeration")
stop_username_enumeration = true
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Comment on lines 149 to +150
session_succeeded = true
seen_valid_open_session = true
Comment on lines 48 to 50
unless info.ipmi_command == 56
vprint_error "#{shost}:#{rport} - IPMI - Invalid response"
vprint_error "#{Rex::Socket.to_authority(shost, rport)} - IPMI - Invalid response"
return
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure in this context the 's' in shost is for scan-host. It's confusing but the original module printed shost:rport not rhost:rport so either way this is not a regression.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

Status: Todo

Development

Successfully merging this pull request may close these issues.

4 participants