-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcommon.py
More file actions
142 lines (116 loc) · 3.94 KB
/
common.py
File metadata and controls
142 lines (116 loc) · 3.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# ruff: noqa: PLR2004
from binascii import crc32 as crc32int
import lzma
from struct import pack, unpack
from typing import cast
# Magic bytes that open a stream header (its first six bytes).
HEADER_MAGIC = b"\xfd7zXZ\x00"
# Magic bytes that close a stream footer (its last two bytes).
FOOTER_MAGIC = b"YZ"
class XZError(Exception):
    """Error raised by the helpers in this module for invalid XZ data or arguments."""
def encode_mbi(value: int) -> bytes:
    """Encode ``value`` as a variable-length multibyte integer.

    The integer is emitted seven bits at a time, least-significant group
    first. Every byte except the last has its high bit (0x80) set to signal
    that more bytes follow.

    Args:
        value: Non-negative integer to encode.

    Returns:
        The encoded integer as an immutable ``bytes`` object.

    Raises:
        ValueError: If ``value`` is negative, since it cannot be stored in a
            single byte.
    """
    data = bytearray()
    while value >= 0x80:
        data.append((value & 0x7F) | 0x80)
        value >>= 7
    data.append(value)
    # Honour the declared return type: hand back bytes, not the mutable
    # bytearray used as a scratch buffer.
    return bytes(data)
def decode_mbi(data: bytes) -> tuple[int, int]:
    """Decode a variable-length multibyte integer from the start of ``data``.

    Bytes are consumed until one without the continuation bit (0x80) is
    found; anything after that byte is ignored.

    Args:
        data: Bytes-like object whose first bytes hold the encoded integer.

    Returns:
        A ``(size, value)`` tuple: the number of bytes consumed and the
        decoded integer.

    Raises:
        XZError: If ``data`` ends before the integer is terminated, if the
            encoding is longer than nine bytes (more than 63 bits), or if it
            is not the shortest possible encoding.
    """
    max_size = 9  # 9 * 7 bits = 63 bits, the largest value the format allows
    value = 0
    for size, byte in enumerate(data):
        if size >= max_size:
            break
        # A null byte after a continuation byte adds nothing to the value,
        # so the encoding is not minimal.
        if size and not byte:
            break
        value |= (byte & 0x7F) << (size * 7)
        if not byte & 0x80:
            return (size + 1, value)
    raise XZError("invalid mbi")
def crc32(data: bytes) -> bytes:
    """Return the CRC32 of ``data`` serialised as four little-endian bytes."""
    checksum = crc32int(data)
    return checksum.to_bytes(4, "little")
def round_up(value: int) -> int:
    """Return the smallest multiple of four that is not below ``value``."""
    # -value % 4 is the distance from value up to the next multiple of four
    # (zero when value is already aligned).
    return value + (-value % 4)
def pad(value: int) -> bytes:
    """Return the null bytes needed to extend a length of ``value`` to a multiple of four."""
    missing = round_up(value) - value
    # bytes(n) yields n null bytes.
    return bytes(missing)
def create_xz_header(check: int) -> bytes:
    """Build a stream header announcing the given check type.

    The header is the header magic, two stream-flag bytes (a null byte
    followed by ``check``) and the CRC32 of those two flag bytes.

    Args:
        check: Check type identifier, between 0 and 0xF inclusive.

    Returns:
        The serialised stream header.

    Raises:
        XZError: If ``check`` is outside the 0..0xF range.
    """
    check_is_valid = 0 <= check <= 0xF
    if not check_is_valid:
        raise XZError("header check")
    # stream flags: first byte is always null, second byte is the check type
    stream_flags = pack("<BB", 0, check)
    return b"".join((HEADER_MAGIC, stream_flags, crc32(stream_flags)))
def create_xz_index_footer(check: int, records: list[tuple[int, int]]) -> bytes:
    """Build the index and the stream footer that end a stream.

    The index is a null indicator byte, the record count, then each record's
    unpadded and uncompressed sizes (all as multibyte integers), null padding
    up to a multiple of four bytes, and the CRC32 of everything before it.
    The footer stores the index length, the stream flags and its own CRC32,
    and ends with the footer magic.

    Args:
        check: Check type identifier, between 0 and 0xF inclusive.
        records: One ``(unpadded_size, uncompressed_size)`` pair per block.

    Returns:
        The serialised index immediately followed by the serialised footer.

    Raises:
        XZError: If ``check`` is outside the 0..0xF range, or if a record has
            a zero unpadded size or a zero uncompressed size.
    """
    if not 0 <= check <= 0xF:
        raise XZError("footer check")
    # index
    parts = [b"\x00", encode_mbi(len(records))]
    for unpadded_size, uncompressed_size in records:
        if not unpadded_size:
            raise XZError("index record unpadded size")
        # parse_xz_index rejects such a record, so never write one out.
        if not uncompressed_size:
            raise XZError("index record uncompressed size")
        parts.append(encode_mbi(unpadded_size))
        parts.append(encode_mbi(uncompressed_size))
    index = b"".join(parts)
    index += pad(len(index))
    index += crc32(index)
    # stream footer: the index length is stored in four-byte units, minus one
    footer = pack("<IBB", (len(index) // 4) - 1, 0, check)
    footer = crc32(footer) + footer + FOOTER_MAGIC
    return index + footer
def parse_xz_header(header: bytes) -> int:
    """Validate a stream header and return the check type it announces.

    Args:
        header: The 12 bytes of the stream header.

    Returns:
        The check type identifier stored in the stream flags.

    Raises:
        XZError: If the length, the magic bytes, the CRC32 of the stream
            flags, or the stream flags themselves are invalid.
    """
    if len(header) != 12:
        raise XZError("header length")
    magic, stream_flags, checksum = header[:6], header[6:8], header[8:12]
    if magic != HEADER_MAGIC:
        raise XZError("header magic")
    if crc32(stream_flags) != checksum:
        raise XZError("header crc32")
    reserved, check = cast("tuple[int, int]", unpack("<BB", stream_flags))
    # The first flag byte must be null; the check is an unsigned byte, so
    # only its upper bound needs testing.
    if reserved != 0 or check > 0xF:
        raise XZError("header flags")
    return check
def parse_xz_index(index: bytes) -> list[tuple[int, int]]:
    """Validate an index and return the records it lists.

    Args:
        index: The whole index, from its indicator byte through its CRC32.

    Returns:
        One ``(unpadded_size, uncompressed_size)`` pair per record, in the
        order they are stored.

    Raises:
        XZError: If the length, indicator byte, CRC32, a record, or the
            padding is invalid.
    """
    if len(index) < 8 or len(index) % 4:
        raise XZError("index length")
    # A memoryview lets the parser slice its way through without copying.
    view = memoryview(index)
    if view[0]:
        raise XZError("index indicator")
    if crc32(view[:-4]) != view[-4:]:
        raise XZError("index crc32")
    size, nb_records = decode_mbi(view[1:])
    view = view[1 + size : -4]
    # records
    records: list[tuple[int, int]] = []
    for _ in range(nb_records):
        if not view:
            raise XZError("index size")
        size, unpadded_size = decode_mbi(view)
        if not unpadded_size:
            raise XZError("index record unpadded size")
        view = view[size:]
        if not view:
            raise XZError("index size")
        size, uncompressed_size = decode_mbi(view)
        if not uncompressed_size:
            raise XZError("index record uncompressed size")
        view = view[size:]
        records.append((unpadded_size, uncompressed_size))
    # index padding: only the 0-3 null bytes needed for four-byte alignment
    # may remain; the total length is already known to be a multiple of four,
    # so anything longer is superfluous.
    if len(view) > 3 or any(view):
        raise XZError("index padding")
    return records
def parse_xz_footer(footer: bytes) -> tuple[int, int]:
    """Validate a stream footer and return its check type and index size.

    Args:
        footer: The 12 bytes of the stream footer.

    Returns:
        A ``(check, backward_size)`` tuple, where ``backward_size`` is the
        stored value converted to bytes: ``(stored + 1) * 4``.

    Raises:
        XZError: If the length, the magic bytes, the CRC32, or the stream
            flags are invalid.
    """
    if len(footer) != 12:
        raise XZError("footer length")
    checksum, payload, magic = footer[:4], footer[4:10], footer[10:12]
    if magic != FOOTER_MAGIC:
        raise XZError("footer magic")
    if crc32(payload) != checksum:
        raise XZError("footer crc32")
    stored_size, reserved, check = cast(
        "tuple[int, int, int]",
        unpack("<IBB", payload),
    )
    # The first flag byte must be null; the check is an unsigned byte, so
    # only its upper bound needs testing.
    if reserved != 0 or check > 0xF:
        raise XZError("footer flags")
    return (check, (stored_size + 1) * 4)
# Check type that lzma.compress() uses when none is requested explicitly:
# compress an empty payload once at import time and read the check back from
# the first 12 bytes (the stream header) of the result.
DEFAULT_CHECK = parse_xz_header(lzma.compress(b"")[:12])