|
| 1 | +#!/usr/bin/env python |
| 2 | + |
| 3 | +""" |
| 4 | +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) |
| 5 | +See the file 'LICENSE' for copying permission |
| 6 | +
|
| 7 | +Property/fuzz tests for the pure parsers and transforms. Where the other test |
| 8 | +files pin specific examples, these assert INVARIANTS over hundreds of randomized |
| 9 | +(but deterministic, cross-version-identical - see _testutils.Rng) inputs, which is |
| 10 | +the cheap net for the edge-bug class that example tests miss (commas inside quoted |
| 11 | +literals / nested parens, NUL / 0xff / astral code points in codecs, etc.). |
| 12 | +
|
| 13 | +Property families: |
| 14 | + - codec/serializer pairs round-trip: decode(encode(x)) == x |
| 15 | + - structure transforms preserve their contract (flat/de-arrayized/permutation) |
| 16 | + - string transforms hold their stated invariant (ASCII-only, no newlines, ...) |
| 17 | + - random helpers respect length / alphabet / range bounds |
| 18 | + - splitFields/zeroDepthSearch partition faithfully and never cut inside a group |
| 19 | + - a batch of transforms never raise on arbitrary input |
| 20 | +
|
| 21 | +On failure _testutils.for_all prints the exact offending input + its case index so |
| 22 | +it reproduces on any interpreter. |
| 23 | +""" |
| 24 | + |
| 25 | +import os |
| 26 | +import string |
| 27 | +import sys |
| 28 | +import unittest |
| 29 | + |
| 30 | +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) |
| 31 | +from _testutils import bootstrap, for_all, set_dbms |
| 32 | +bootstrap() |
| 33 | + |
| 34 | +from extra.cloak.cloak import cloak, decloak |
| 35 | +from lib.core.common import (escapeJsonValue, filterStringValue, flattenValue, isListLike, normalizeUnicode, |
| 36 | + prioritySortColumns, randomInt, randomRange, randomStr, safeSQLIdentificatorNaming, |
| 37 | + sanitizeStr, splitFields, unArrayizeValue, unsafeSQLIdentificatorNaming, urldecode, |
| 38 | + urlencode, zeroDepthSearch) |
| 39 | +from lib.core.convert import (base64pickle, base64unpickle, decodeBase64, decodeHex, dejsonize, encodeBase64, |
| 40 | + encodeHex, getBytes, getConsoleLength, getOrds, getText, htmlEscape, htmlUnescape, |
| 41 | + jsonize, stdoutEncode) |
| 42 | +from lib.core.data import kb |
| 43 | +from lib.utils.safe2bin import safecharencode |
| 44 | + |
| 45 | + |
| 46 | +# --- input strategies (draw ONLY through rng: randint / choice / sample / blob) --- |
| 47 | + |
# Token pool for gen_text(): heavy on structural metacharacters (comma, quotes, parens,
# backslash, semicolon) and awkward code points (NUL, DEL, Latin-1, BMP, astral), plus one
# multi-character keyword token
_TEXT = [
    u"a", u"Z", u"7", u" ", u",", u"'", u'"', u"(", u")", u"\\", u";",
    u"\n", u"\t", u"\x00", u"\x7f", u"\xe9", u"\u0107", u"\u4e2d", u"\U0001F600", u" FROM ",
]


def gen_text(rng):
    """Return 0-24 tokens picked from _TEXT through rng, concatenated into one unicode string."""
    # the token count is drawn first, then one rng.choice per token
    count = rng.randint(0, 24)
    return u"".join([rng.choice(_TEXT) for _ in range(count)])
| 55 | + |
| 56 | + |
def gen_ascii(rng):
    """Return 0-20 characters picked through rng from string.printable, as a unicode string."""
    length = rng.randint(0, 20)
    picked = [rng.choice(string.printable) for _ in range(length)]
    return u"".join(picked)
| 59 | + |
| 60 | + |
def gen_blob(rng):
    """Return rng.blob(n) for a size n drawn from 0-32 via rng.randint."""
    size = rng.randint(0, 32)
    return rng.blob(size)
| 63 | + |
| 64 | + |
def gen_json(rng):
    """Return a random JSON-safe value: a scalar, a list, or a dict keyed u"k0", u"k1", ...

    Only JSON-safe shapes are produced: tuples would come back as lists and non-str keys
    would be coerced, so neither is generated here.
    """
    # first 1-in-5 draw: a list of 0-3 recursively generated values
    if rng.randint(0, 4) == 0:
        items = []
        for _ in range(rng.randint(0, 3)):
            items.append(gen_json(rng))
        return items

    # second 1-in-5 draw: a dict of 0-3 recursively generated values
    if rng.randint(0, 4) == 0:
        mapping = {}
        for index in range(rng.randint(0, 3)):
            mapping[u"k%d" % index] = gen_json(rng)
        return mapping

    scalars = [0, 1, -1, 2 ** 31, 1.5, -0.25, True, False, None, u"", u"x", u"\u0107", u'a"b,c']
    return rng.choice(scalars)
| 72 | + |
| 73 | + |
def gen_pickle(rng):
    """Return a random value built from scalars, lists, tuples, sets and dicts.

    One draw from 0-9 picks the shape: 0-4 a scalar, 5-6 a list, 7 a tuple, 8 a set of
    hashable scalars, 9 a dict keyed u"k0", u"k1", ... Containers other than sets recurse.
    """
    selector = rng.randint(0, 9)

    if selector < 5:
        scalars = [0, -7, 2 ** 40, 3.5, True, False, None, u"\u0107x", b"\x00\xff", u""]
        return rng.choice(scalars)

    if selector < 7:
        items = []
        for _ in range(rng.randint(0, 3)):
            items.append(gen_pickle(rng))
        return items

    if selector < 8:
        size = rng.randint(0, 3)
        return tuple([gen_pickle(rng) for _ in range(size)])

    if selector < 9:
        members = set()
        for _ in range(rng.randint(0, 3)):
            members.add(rng.choice([1, 2, 3, u"a", u"b"]))
        return members

    mapping = {}
    for index in range(rng.randint(0, 2)):
        mapping[u"k%d" % index] = gen_pickle(rng)
    return mapping
| 85 | + |
| 86 | + |
def gen_columns(rng):
    """Return a list of 0-6 column names picked (with repetition) from a fixed pool."""
    pool = [u"id", u"userid", u"name", u"password", u"a", u"created_id", u"x_id_y", u"data"]
    count = rng.randint(0, 6)
    return [rng.choice(pool) for _ in range(count)]
| 90 | + |
| 91 | + |
def gen_ident(rng):
    """Return a clean (round-trippable) identifier of 1-10 letters/digits/underscores.

    One time in four a suffix is appended: u".col", u" alias" or u"_2".
    """
    alphabet = string.ascii_letters + string.digits + u"_"
    length = rng.randint(1, 10)
    ident = u"".join([rng.choice(alphabet) for _ in range(length)])

    # suffix only when the 0-3 draw lands on 0
    if rng.randint(0, 3) != 0:
        return ident
    return ident + rng.choice([u".col", u" alias", u"_2"])
| 99 | + |
| 100 | + |
# Building blocks for well-formed field lists: every paren is balanced and every quoted
# body is properly closed/escaped
_TOKENS = [u"foo", u"bar", u"id", u"a b", u"1", u"*", u"max(a)", u"COALESCE(a, b, c)", u"func(x, y)"]
_QUOTED = [u"a,b", u"x, y", u"f(1, 2)", u"o''k", u"plain", u""]


def gen_sql_fields(rng):
    """Return 1-5 well-formed fields joined with u", ".

    Per field a draw from 0-9 picks the shape: 0-4 a bare token from _TOKENS, 5-7 a quoted
    literal (single or double quotes around a _QUOTED body), 8-9 a two-argument g(...) call.
    """
    fields = []
    for _ in range(rng.randint(1, 5)):
        shape = rng.randint(0, 9)
        if shape >= 8:
            first = rng.choice(_TOKENS)
            second = rng.choice(_TOKENS)
            field = u"g(%s, %s)" % (first, second)
        elif shape >= 5:
            quote = rng.choice([u"'", u'"'])
            field = quote + rng.choice(_QUOTED) + quote
        else:
            field = rng.choice(_TOKENS)
        fields.append(field)
    return u", ".join(fields)
| 118 | + |
| 119 | + |
class TestCodecRoundTrips(unittest.TestCase):
    # every encoder below must be exactly undone by its decoder: decode(encode(x)) == x

    def test_base64(self):
        def survives(blob):
            return decodeBase64(encodeBase64(blob)) == blob
        for_all(self, gen_blob, survives, label="base64")

    def test_hex(self):
        def survives(blob):
            return decodeHex(encodeHex(blob)) == blob
        for_all(self, gen_blob, survives, label="hex")

    def test_getbytes_gettext(self):
        # unsafe=False keeps getBytes on plain UTF-8 (no \xNN escape interpretation), which
        # is what makes this a clean round-trip
        def survives(text):
            encoded = getBytes(text, unsafe=False)
            return getText(encoded) == text
        for_all(self, gen_text, survives, label="bytes-text")

    def test_json(self):
        def survives(value):
            return dejsonize(jsonize(value)) == value
        for_all(self, gen_json, survives, label="json")

    def test_pickle(self):
        def survives(value):
            return base64unpickle(base64pickle(value)) == value
        for_all(self, gen_pickle, survives, label="pickle")

    def test_html_escape(self):
        def survives(text):
            return htmlUnescape(htmlEscape(text)) == text
        for_all(self, gen_text, survives, label="html")

    def test_cloak(self):
        def survives(blob):
            cloaked = cloak(data=blob)
            return decloak(data=cloaked) == blob
        for_all(self, gen_blob, survives, label="cloak")
| 142 | + |
| 143 | + |
class TestStructureTransforms(unittest.TestCase):
    # shape-changing helpers: each must deliver the shape its contract promises

    def test_unarrayize_never_listlike(self):
        # unArrayizeValue exists to hand back a scalar, so its result may never be list-like
        # (gen_pickle also yields sets, which used to crash here - test_unarrayize_set pins that)
        def is_scalar(value):
            return not isListLike(unArrayizeValue(value))
        for_all(self, gen_pickle, is_scalar, label="unarrayize")

    def test_flatten_is_flat(self):
        # nothing yielded by flattenValue may itself still be list-like
        def is_flat(value):
            for item in flattenValue([value]):
                if isListLike(item):
                    return False
            return True
        for_all(self, gen_pickle, is_flat, label="flatten")

    def test_unarrayize_set(self):
        # regression: a set is list-like but cannot be indexed, so unArrayizeValue has to
        # de-arrayize it instead of crashing on value[0]
        singleton = set(["x"])
        empty = set()
        self.assertEqual(unArrayizeValue(singleton), "x")
        self.assertEqual(unArrayizeValue(empty), None)
        # the ordinary list fast-path must keep working
        self.assertEqual(unArrayizeValue(["1"]), "1")

    def test_prioritysort_is_permutation(self):
        # the sort may neither invent nor drop columns, and re-sorting must change nothing
        def holds(columns):
            ordered = prioritySortColumns(columns)
            if sorted(ordered) != sorted(columns):
                return False
            return prioritySortColumns(ordered) == ordered
        for_all(self, gen_columns, holds, label="prioritysort")
| 166 | + |
| 167 | + |
class TestStringTransforms(unittest.TestCase):
    # string helpers: each must uphold its stated output invariant on arbitrary text

    def test_normalize_unicode_is_ascii(self):
        def ascii_only(text):
            return all(ord(ch) < 128 for ch in normalizeUnicode(text))
        for_all(self, gen_text, ascii_only, label="normalize-ascii")

    def test_sanitizestr_strips_newlines(self):
        def no_line_breaks(text):
            cleaned = sanitizeStr(text)
            return "\n" not in cleaned and "\r" not in cleaned
        for_all(self, gen_text, no_line_breaks, label="sanitizestr")

    def test_filterstringvalue_charset(self):
        hex_digits = set("0123456789abcdef")

        def only_hex(text):
            kept = filterStringValue(text, r"[0-9a-f]")
            return set(kept).issubset(hex_digits)
        for_all(self, gen_text, only_hex, label="filterstring")

    def test_escapejson_no_control_char(self):
        # no character below space may survive in the output; this test looks at control
        # characters only - quote handling is exercised by test_escapejson_json_roundtrip
        def no_control_chars(text):
            return not any(ch < " " for ch in escapeJsonValue(text))
        for_all(self, gen_text, no_control_chars, label="escapejson-invariant")

    def test_escapejson_json_roundtrip(self):
        # wrapping escapeJsonValue(s) in double quotes must give a JSON string that parses
        # back to s - for ALL text, backslash included (the F1 fix; '\' used to fail here)
        import json

        def parses_back(text):
            document = u'"%s"' % escapeJsonValue(text)
            return json.loads(document) == text
        for_all(self, gen_text, parses_back, label="escapejson-roundtrip")

    def test_escapejson_backslash(self):
        # regression for F1: backslash is escaped now, so the round-trip holds
        import json
        document = u'"%s"' % escapeJsonValue(u"a\\b")
        self.assertEqual(json.loads(document), u"a\\b")

    def test_getords_length(self):
        # one integer ordinal per input character
        def one_int_per_char(text):
            ords = getOrds(text)
            return len(ords) == len(text) and all(isinstance(o, int) for o in ords)
        for_all(self, gen_text, one_int_per_char, label="getords")

    def test_consolelength_ascii(self):
        # for string.printable input the console length equals the plain length
        def same_as_len(text):
            return getConsoleLength(text) == len(text)
        for_all(self, gen_ascii, same_as_len, label="consolelength")
| 199 | + |
| 200 | + |
class TestRandomHelpers(unittest.TestCase):
    # random helpers: results must respect the requested length / alphabet / range

    def test_randomstr_length_and_alphabet(self):
        # both checks run against ONE randomStr(n) sample; calling randomStr twice would
        # verify the length of one random string and the alphabet of a different one
        def prop(n):
            sample = randomStr(n)
            return len(sample) == n and set(sample) <= set(string.ascii_letters)
        for_all(self, lambda r: r.randint(0, 16), prop, label="randomstr")

    def test_randomstr_lowercase(self):
        # lowercase=True must restrict the output to ASCII lowercase letters
        def prop(n):
            return set(randomStr(n, lowercase=True)) <= set(string.ascii_lowercase)
        for_all(self, lambda r: r.randint(0, 16), prop, label="randomstr-lower")

    def test_randomint_digits(self):
        # randomInt(n) must print as exactly n decimal digits (n drawn from 1-8)
        def prop(n):
            return len(str(randomInt(n))) == n
        for_all(self, lambda r: r.randint(1, 8), prop, label="randomint")

    def test_randomrange_bounds(self):
        # input is (low, span) with span >= 0, so high = low + span is never below low
        def prop(pair):
            low, span = pair
            high = low + span
            return low <= randomRange(low, high) <= high
        for_all(self, lambda r: (r.randint(-50, 50), r.randint(0, 100)), prop, label="randomrange")
| 219 | + |
| 220 | + |
class TestSplitterInvariants(unittest.TestCase):
    # splitFields / zeroDepthSearch, checked on arbitrary text and on well-formed field lists

    def test_reconstruction(self):
        # pure partition identity: joining the split fields with "," must reproduce the
        # input once its ", " separators are normalized to ","
        def rejoins(text):
            return u",".join(splitFields(text)) == text.replace(", ", ",")
        for_all(self, gen_text, rejoins, label="split-reconstruct-text")
        for_all(self, gen_sql_fields, rejoins, label="split-reconstruct-sql")

    def test_never_cuts_inside_parens(self):
        # on well-formed input every field must have balanced parens, i.e. no split may
        # land inside a group
        def balanced(text):
            return all(field.count(u"(") == field.count(u")") for field in splitFields(text))
        for_all(self, gen_sql_fields, balanced, label="split-balanced")

    def test_zerodepth_indices_are_real_commas(self):
        # every reported index must point at a comma; indices come sorted and without repeats
        def valid_positions(text):
            positions = zeroDepthSearch(text, ",")
            if any(text[pos] != u"," for pos in positions):
                return False
            if positions != sorted(positions):
                return False
            return len(positions) == len(set(positions))
        for_all(self, gen_text, valid_positions, label="zerodepth-commas-text")
        for_all(self, gen_sql_fields, valid_positions, label="zerodepth-commas-sql")
| 237 | + |
| 238 | + |
class TestIdentifierRoundTrip(unittest.TestCase):
    # safe -> unsafe identifier naming must hand back the original name

    def setUp(self):
        # identifier quoting is DBMS-specific: remember the current kb.forcedDbms, then pin
        # MySQL (a case-preserving back-end) for the test
        self._saved = kb.get("forcedDbms")
        set_dbms("MySQL")

    def tearDown(self):
        # put back the value captured in setUp
        kb.forcedDbms = self._saved

    def test_safe_unsafe_roundtrip(self):
        def roundtrips(name):
            quoted = safeSQLIdentificatorNaming(name)
            return unsafeSQLIdentificatorNaming(quoted) == name
        for_all(self, gen_ident, roundtrips, label="identifier")
| 249 | + |
| 250 | + |
class TestRobustness(unittest.TestCase):
    # total functions: none of these may raise on arbitrary text

    def test_urlencode_urldecode(self):
        # results are ignored - merely getting through both calls without raising is the pass
        def runs(text):
            urlencode(text)
            urldecode(text)
            return True
        for_all(self, gen_text, runs, label="urlcodec")

    def test_safecharencode(self):
        # a None result is tolerated for the empty string only
        def runs(text):
            return safecharencode(text) is not None or text == u""
        for_all(self, gen_text, runs, label="safecharencode")

    def test_stdoutencode(self):
        # a None result is tolerated for the empty string only
        def runs(text):
            return stdoutEncode(text) is not None or text == u""
        for_all(self, gen_text, runs, label="stdoutencode")
| 261 | + |
| 262 | + |
# When executed directly (not imported), hand control to the stdlib unittest runner
if __name__ == "__main__":
    unittest.main()
0 commit comments