Skip to content

Add a module for pivoting over a Quectel Modem#21523

Open
zeroSteiner wants to merge 15 commits into
rapid7:masterfrom
zeroSteiner:feat/mod/cell-socks
Open

Add a module for pivoting over a Quectel Modem#21523
zeroSteiner wants to merge 15 commits into
rapid7:masterfrom
zeroSteiner:feat/mod/cell-socks

Conversation

@zeroSteiner
Copy link
Copy Markdown
Contributor

This adds a new "modem" session type that allows you to open a session to an arbitrary modem and pivot your network traffic over it as if it were a Meterpreter session. I have testing hardware but I've also had Claude create an emulator that can be used for testing.

Verification

List the steps needed to make sure this thing works

  • Start the included emulator: python modem_emulator.py --debug
  • Use the new module, set SERIAL to /tmp/ttyModem from the emulator
  • Run the module and get a sessoin
  • Route your traffic through it and follow the debug lines to see it working
modem_emulator.py
#!/usr/bin/env python3
"""
Quectel Modem Emulator

Creates a virtual serial port (pty) and emulates the Quectel AT command protocol
used by modules/auxiliary/server/quectel_modem.rb, forwarding traffic through real
OS sockets. Use this to test the module without physical hardware.

Usage:
    python3 tools/hardware/modem_emulator.py --serial /tmp/ttyModem --debug

Then in msfconsole:
    use auxiliary/server/quectel_modem
    set SERIAL /tmp/ttyModem
    set STARTUP_OK_TIMEOUT_S 5
    run
"""

import argparse
import os
import pty
import random
import re
import select
import signal
import socket
import sys
import termios
import threading
import time
import tty


def _log(msg, prefix='[EMU]', file=sys.stderr):
    print(f'{prefix} {msg}', file=file, flush=True)


def _jitter_sleep(mean_ms, stddev_ms):
    if mean_ms <= 0 and stddev_ms <= 0:
        return
    delay_ms = max(0.0, random.gauss(mean_ms, stddev_ms))
    time.sleep(delay_ms / 1000.0)


class SIDPool:
    _PENDING = object()

    def __init__(self, n=12):
        self._lock = threading.Lock()
        self._free = list(range(n))
        self._table = {}

    def allocate(self):
        with self._lock:
            if not self._free:
                return None
            sid = self._free.pop(0)
            self._table[sid] = self._PENDING
            return sid

    def register(self, sid, fwd):
        with self._lock:
            self._table[sid] = fwd

    def release(self, sid):
        with self._lock:
            self._table.pop(sid, None)
            if sid not in self._free:
                self._free.append(sid)

    def get(self, sid):
        with self._lock:
            v = self._table.get(sid)
            return None if (v is None or v is self._PENDING) else v

    def is_allocated(self, sid):
        with self._lock:
            return sid in self._table

    def all_forwarders(self):
        with self._lock:
            return [v for v in self._table.values() if v is not self._PENDING]


class SocketForwarder:
    def __init__(self, sid, sock, proto, emulator):
        self.sid = sid
        self.sock = sock
        self.proto = proto
        self._emulator = emulator
        self._stop = threading.Event()
        self._thread = threading.Thread(
            target=self._forward_loop,
            daemon=True,
            name=f'modem-fwd-{sid}',
        )

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()

    def _forward_loop(self):
        args = self._emulator.args
        debug = args.debug
        sid = self.sid
        while not self._stop.is_set():
            try:
                r, _, _ = select.select([self.sock], [], [], 0.1)
            except (OSError, ValueError):
                break
            if not r:
                continue
            try:
                data = self.sock.recv(65536)
            except OSError as e:
                if not self._stop.is_set() and debug:
                    _log(f'[SID {sid}] recv error: {e}')
                data = b''
            if not data:
                # Only emit closed URC for a genuine remote close, not a local QICLOSE.
                if not self._stop.is_set():
                    if debug:
                        _log(f'[SID {sid}] remote closed')
                    urc = f'+QIURC: "closed",{sid}\r\n'.encode()
                    self._emulator._write(urc)
                    self._emulator.sid_pool.release(sid)
                break
            _jitter_sleep(args.jitter_mean, args.jitter_stddev)
            if debug:
                _log(f'[SID {sid}] recv {len(data)} bytes → URC')
            # Emit header + payload atomically in one _write call
            urc_header = f'+QIURC: "recv",{sid},{len(data)}\r\n'.encode()
            self._emulator._write(urc_header + data)


class ModemEmulator:
    # AT+QIOPEN=1,SID,"TCP"|"UDP","HOST",PORT,0,1
    _RE_QIOPEN = re.compile(
        r'^AT\+QIOPEN=\d+,(\d+),"(TCP|UDP)","([^"]+)",(\d+),\d+,\d+$',
        re.IGNORECASE,
    )
    # AT+QICLOSE=SID,N
    _RE_QICLOSE = re.compile(r'^AT\+QICLOSE=(\d+),\d+$', re.IGNORECASE)
    # AT+QISEND=SID,N
    _RE_QISEND = re.compile(r'^AT\+QISEND=(\d+),(\d+)$', re.IGNORECASE)

    def __init__(self, args):
        self.args = args
        self.master_fd = None
        self.slave_fd = None
        self.slave_path = None
        self._write_lock = threading.Lock()
        self.sid_pool = SIDPool(args.sockets)
        self._stop = threading.Event()
        self._reader_thread = None

    def setup_pty(self):
        self.master_fd, self.slave_fd = pty.openpty()
        self.slave_path = os.ttyname(self.slave_fd)

        # Set raw mode on slave before Ruby opens it; prevents echo/line-discipline
        # translation during the window before Ruby calls configure_serial_port().
        tty.setraw(self.slave_fd)

        # Set baud rate cosmetically so Ruby's TCGETS reads something sensible.
        attrs = termios.tcgetattr(self.slave_fd)
        baud_const = _baud_to_termios(self.args.baud)
        if baud_const is not None:
            attrs[4] = baud_const
            attrs[5] = baud_const
            termios.tcsetattr(self.slave_fd, termios.TCSANOW, attrs)

        serial_path = self.args.serial
        if os.path.islink(serial_path) or os.path.exists(serial_path):
            os.unlink(serial_path)
        os.symlink(self.slave_path, serial_path)

        _log(f'Emulator ready at {serial_path} -> {self.slave_path}', prefix='[EMU]')

    def start(self):
        # Emit RDY URC so Ruby's best-effort wait_for_rdy() can catch it.
        self._write(b'RDY\r\n')
        self._reader_thread = threading.Thread(
            target=self._reader_loop, daemon=True, name='modem-reader'
        )
        self._reader_thread.start()

    def stop(self):
        self._stop.set()
        try:
            os.close(self.master_fd)
        except OSError:
            pass
        if self._reader_thread:
            self._reader_thread.join(timeout=2.0)
        for fwd in self.sid_pool.all_forwarders():
            fwd.stop()
            try:
                fwd.sock.close()
            except OSError:
                pass
        try:
            os.unlink(self.args.serial)
        except FileNotFoundError:
            pass
        try:
            os.close(self.slave_fd)
        except OSError:
            pass

    def _write(self, data: bytes):
        with self._write_lock:
            try:
                os.write(self.master_fd, data)
            except OSError:
                pass

    def _read_exactly(self, n: int) -> bytes:
        buf = b''
        while len(buf) < n:
            try:
                chunk = os.read(self.master_fd, n - len(buf))
            except OSError as e:
                raise EOFError(f'pty closed mid-read: {e}')
            if not chunk:
                raise EOFError('pty closed mid-read')
            buf += chunk
        return buf

    def _reader_loop(self):
        buf = bytearray()
        debug = self.args.debug
        while not self._stop.is_set():
            try:
                ch = os.read(self.master_fd, 1)
            except OSError:
                break
            if not ch:
                break
            if ch == b'\r':
                line = buf.decode('ascii', errors='replace').strip()
                buf.clear()
                if line:
                    if debug:
                        _log(f'→ {line!r}')
                    self._handle_cmd(line)
            elif ch != b'\n':
                buf.extend(ch)

    def _handle_cmd(self, line: str):
        debug = self.args.debug

        if line.upper() in ('AT', 'AT\r'):
            self._write(b'OK\r\n')
            return

        if line.upper() == 'ATE0':
            self._write(b'OK\r\n')
            return

        m = self._RE_QIOPEN.match(line)
        if m:
            self._cmd_qiopen(m)
            return

        m = self._RE_QICLOSE.match(line)
        if m:
            self._cmd_qiclose(m)
            return

        m = self._RE_QISEND.match(line)
        if m:
            self._cmd_qisend(m)
            return

        if debug:
            _log(f'unknown command: {line!r}')
        self._write(b'ERROR\r\n')

    def _cmd_qiopen(self, m):
        sid = int(m.group(1))
        proto = m.group(2).upper()
        host = m.group(3)
        port = int(m.group(4))
        debug = self.args.debug

        # Check if this exact SID is already in use
        if self.sid_pool.is_allocated(sid):
            if debug:
                _log(f'[SID {sid}] already allocated')
            self._write(b'ERROR\r\n')
            return

        # Allocate the specific SID requested by Ruby
        # The pool uses FIFO allocation; we need to honor the SID Ruby chose.
        # We'll allocate the next free SID and verify it matches, or skip if wrong.
        # Actually, Ruby allocates SIDs sequentially from its own free pool and
        # passes the chosen SID in the command — we must register that specific SID.
        with self.sid_pool._lock:
            if sid in self.sid_pool._free:
                self.sid_pool._free.remove(sid)
                self.sid_pool._table[sid] = SIDPool._PENDING
            elif sid not in self.sid_pool._table:
                # SID not in free list and not allocated — shouldn't happen with
                # a fresh pool but handle gracefully.
                self.sid_pool._table[sid] = SIDPool._PENDING
            else:
                pass  # already marked pending by a prior allocate() call

        # Respond OK immediately; URC arrives after the real socket connects.
        self._write(b'OK\r\n')

        def connect_and_emit():
            sock = None
            err = 3
            try:
                if proto == 'TCP':
                    sock = socket.create_connection((host, port), timeout=self.args.connect_timeout)
                else:
                    # UDP: create an unconnected UDP socket, then "connect" to set default peer
                    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                    sock.connect((host, port))
                err = 0
            except OSError as e:
                if debug:
                    _log(f'[SID {sid}] connect failed: {e}')
                sock = None
                err = 3

            if err == 0 and sock:
                fwd = SocketForwarder(sid, sock, proto, self)
                self.sid_pool.register(sid, fwd)
                fwd.start()
                if debug:
                    _log(f'[SID {sid}] {proto} connected to {host}:{port}')
            else:
                self.sid_pool.release(sid)

            urc = f'+QIOPEN: {sid},{err}\r\n'.encode()
            self._write(urc)

        t = threading.Thread(target=connect_and_emit, daemon=True, name=f'modem-conn-{sid}')
        t.start()

    def _cmd_qiclose(self, m):
        sid = int(m.group(1))
        debug = self.args.debug

        if not self.sid_pool.is_allocated(sid):
            if debug:
                _log(f'[SID {sid}] QICLOSE on unallocated SID → ERROR')
            self._write(b'ERROR\r\n')
            return

        fwd = self.sid_pool.get(sid)
        self.sid_pool.release(sid)
        if fwd:
            fwd.stop()
            try:
                fwd.sock.close()
            except OSError:
                pass
        if debug:
            _log(f'[SID {sid}] closed')
        self._write(b'OK\r\n')

    def _cmd_qisend(self, m):
        sid = int(m.group(1))
        n = int(m.group(2))
        debug = self.args.debug

        fwd = self.sid_pool.get(sid)
        if fwd is None:
            if debug:
                _log(f'[SID {sid}] QISEND on unallocated/pending SID → ERROR')
            self._write(b'ERROR\r\n')
            return

        # Send prompt — single byte, no CRLF
        self._write(b'>')

        try:
            # Read exactly n payload bytes + 1 Ctrl-Z (0x1A)
            raw = self._read_exactly(n + 1)
        except EOFError as e:
            if debug:
                _log(f'[SID {sid}] QISEND read error: {e}')
            self._write(b'SEND FAIL\r\n')
            return

        payload = raw[:-1]  # strip Ctrl-Z

        if debug:
            _log(f'[SID {sid}] QISEND {len(payload)} bytes')

        try:
            fwd.sock.sendall(payload)
            self._write(b'SEND OK\r\n')
        except OSError as e:
            if debug:
                _log(f'[SID {sid}] send error: {e}')
            self._write(b'SEND FAIL\r\n')


def _baud_to_termios(baud):
    mapping = {
        9600:   termios.B9600,
        19200:  termios.B19200,
        38400:  termios.B38400,
        57600:  termios.B57600,
        115200: termios.B115200,
        230400: termios.B230400,
    }
    return mapping.get(baud)


def main():
    parser = argparse.ArgumentParser(
        description='Quectel modem emulator for testing modules/auxiliary/server/quectel_modem.rb'
    )
    parser.add_argument(
        '--serial', default='/tmp/ttyModem',
        help='Symlink path exposed as the serial device (default: /tmp/ttyModem)',
    )
    parser.add_argument(
        '--baud', type=int, default=115200,
        help='Baud rate (informational only; pty ignores it for throughput)',
    )
    parser.add_argument(
        '--sockets', type=int, default=12,
        help='SID pool size — must match MODEM_SOCKETS in msfconsole (default: 12)',
    )
    parser.add_argument(
        '--jitter-mean', type=float, default=0.0, dest='jitter_mean',
        help='Mean receive jitter in milliseconds (default: 0)',
    )
    parser.add_argument(
        '--jitter-stddev', type=float, default=0.0, dest='jitter_stddev',
        help='Jitter standard deviation in milliseconds (default: 0)',
    )
    parser.add_argument(
        '--connect-timeout', type=float, default=10.0, dest='connect_timeout',
        help='Real socket connect timeout in seconds (default: 10)',
    )
    parser.add_argument(
        '--debug', action='store_true',
        help='Verbose protocol logging to stderr',
    )
    args = parser.parse_args()

    stop_event = threading.Event()

    def _handle_signal(signum, frame):
        _log('Shutting down…')
        stop_event.set()

    signal.signal(signal.SIGINT, _handle_signal)
    signal.signal(signal.SIGTERM, _handle_signal)

    emulator = ModemEmulator(args)
    emulator.setup_pty()
    emulator.start()

    _log(
        f'Listening on {args.serial} -> {emulator.slave_path} '
        f'(sockets={args.sockets}, jitter={args.jitter_mean}±{args.jitter_stddev}ms). '
        f'Ctrl-C to stop.',
    )

    stop_event.wait()
    emulator.stop()
    _log('Done.')


if __name__ == '__main__':
    main()

Demo

msf auxiliary(server/quectel_modem) > run
[*] Probing modem with AT until OK...
[+] Modem is responding to AT commands.
[*] Waiting briefly for RDY URC (best-effort)...
[*] Closing any leftover modem socket connections...
[+] Modem session 1 opened (12 sockets, 1024B chunks)
[*] Auxiliary module execution completed
msf auxiliary(server/quectel_modem) > route add 0 0 -1
[*] Route added
(reverse-i-search)`im': tInterrupt: use the 'exit' command to quit
msf auxiliary(server/quectel_modem) > load request
[*] Successfully loaded plugin: Request
msf auxiliary(server/quectel_modem) > request http://ifconfig.me

zeroSteiner and others added 8 commits May 29, 2026 15:48
Replace the hand-rolled SimpleEvent (Mutex + ConditionVariable) with the
bundled Concurrent::Event, drop a misplaced duplicate readiness/health
block inside QuectelConnection that referenced ivars it never owned, and
remove the per-line timestamp print_* overrides.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The module registers a native pivot session, not a SOCKS proxy, and more
modem types are planned, so rename cellular_socks_proxy to quectel_modem.
Refresh the info hash (name, authors) and drop the unneeded msf/core
require and Rank line flagged by msftidy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@smcintyre-r7 smcintyre-r7 added needs-docs rn-modules release notes for new or majorly enhanced modules labels Jun 1, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Jun 1, 2026

Thanks for your pull request! Before this can be merged, we need the following documentation for your module:

@zeroSteiner zeroSteiner force-pushed the feat/mod/cell-socks branch from 1055115 to 88847fc Compare June 1, 2026 22:32
@bwatters-r7 bwatters-r7 self-assigned this Jun 1, 2026
@github-project-automation github-project-automation Bot moved this from Todo to In Progress in Metasploit Kanban Jun 2, 2026
@zeroSteiner zeroSteiner force-pushed the feat/mod/cell-socks branch from e2191ef to 47874a8 Compare June 3, 2026 19:31
@zeroSteiner zeroSteiner force-pushed the feat/mod/cell-socks branch from 47874a8 to 4216de8 Compare June 3, 2026 21:27
@zeroSteiner zeroSteiner force-pushed the feat/mod/cell-socks branch from 4216de8 to b4adc27 Compare June 3, 2026 22:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

rn-modules release notes for new or majorly enhanced modules

Projects

Status: In Progress

Development

Successfully merging this pull request may close these issues.

3 participants