-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathciq_helpers.py
More file actions
478 lines (376 loc) · 15.7 KB
/
ciq_helpers.py
File metadata and controls
478 lines (376 loc) · 15.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
#!/usr/bin/env python3
#
# CIQ Kernel Tools function library
import logging
import os
import re
import subprocess
import git
def process_full_commit_message(commit):
    """Process the full git commit message specific to the CIQ Kernel Tools.

    NOTE: This has only been tested with byte strings and not unicode strings.

    Parameters:
        commit: <byte string array> The lines of one ``git log`` entry, starting
            with the ``commit <sha>`` header line. The subject is read from
            index 4 and the CIQ tags are searched from index 5 onwards.

    Return:
        upstream_commit: The upstream commit SHA1 ("" when no ``commit`` tag is found).
        cves: One entry per ``cve`` line; each entry is the list of
            whitespace-separated tokens that follow the tag.
        tickets: One entry per ``jira`` line, same shape as ``cves``.
        upstream_subject: The subject of the commit.
        repo_commit: The repo commit SHA1.

    Raises:
        IndexError: If ``commit`` has fewer than five lines, or its first line
            has fewer than two whitespace-separated tokens.
    """
    cves = []
    tickets = []
    upstream_commit = ""

    repo_commit = commit[0].decode("utf-8").split()[1]
    upstream_subject = commit[4].decode("utf-8").strip()

    for raw_line in commit[5:]:
        # Decode once per line; the text is needed for both tag parsing and
        # the blank-line check below.
        text = raw_line.decode("utf-8").strip()

        # Message lines are indented in `git log` output. Accept any amount of
        # leading whitespace instead of exactly one space so the tags are
        # recognised regardless of the indentation width.
        if re.match(rb"^\s+jira", raw_line, re.IGNORECASE):
            tickets.append(text.split()[1:])
        elif re.match(rb"^\s+cve", raw_line, re.IGNORECASE):
            cves.append(text.split()[1:])
        elif re.match(rb"^\s+commit ", raw_line, re.IGNORECASE):
            fields = text.split()
            if len(fields) > 1:
                upstream_commit = fields[1]

        # The CIQ header ends at the first blank line after the upstream
        # commit tag; everything below is the original commit body.
        if text == "" and upstream_commit:
            break

    return upstream_commit, cves, tickets, upstream_subject, repo_commit
def get_backport_commit_data(repo, branch, common_ancestor, allow_duplicates=False):
    """Collect the backported upstream commits found on a branch.

    Parameters:
        repo: The path to the git repo with the source.
        branch: The branch we're building the backport data from.
        common_ancestor: The tag on Linus mainline that is the common ancestor
            for the branch; this is where we stop looking for commits. This is
            the tag that was used to create the branch.
        allow_duplicates: Allow duplicate commits in the backport data; a later
            duplicate overwrites the entry stored earlier. Default is False.
            Note: This option exists because, due to CentOS's cherry-pick
            process, we may have duplicate backports in the backport data due
            to inconsistent changelog histories.

    Return:
        A ``(upstream_commits, ok)`` tuple. ``ok`` is False when a duplicate
        upstream commit was met and ``allow_duplicates`` is False; in that case
        the dictionary holds only what had been collected so far.
        ``upstream_commits`` has the shape:
            "upstream_commit": {
                "repo_commit": "SHA1",
                "upstream_subject": "Subject",
                "cves": ["tag1", "tag2"],      (Optional)
                "tickets": ["JIRA-1234"],      (Optional)
            }

    Raises:
        subprocess.CalledProcessError: If the checkout or the log command fails.
        subprocess.TimeoutExpired: If either git command exceeds 240 seconds.
    """
    # NOTE: this force-checks-out `branch` in the working tree before reading the log.
    subprocess.run(
        ["git", "checkout", "-f", branch],
        cwd=repo,
        timeout=240,
        check=True,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
    )

    rev_range = "".join((common_ancestor, "~1..", branch))
    log = subprocess.run(
        ["git", "log", "--no-abbrev-commit", rev_range],
        cwd=repo,
        timeout=240,
        check=True,
        stdout=subprocess.PIPE,
    )

    backports = {}
    pending = []  # raw lines of the log entry currently being accumulated
    for raw in log.stdout.splitlines():
        # A new "commit " header closes the entry accumulated so far.
        if pending and raw.startswith(b"commit "):
            upstream_commit, cves, tickets, upstream_subject, repo_commit = process_full_commit_message(pending)
            if upstream_commit in backports:
                print(f"WARNING: {upstream_commit} already in upstream_commits")
                if not allow_duplicates:
                    return backports, False
            if upstream_commit != "":
                backports[upstream_commit] = {
                    "repo_commit": repo_commit,
                    "upstream_subject": upstream_subject,
                    "cves": cves,
                    "tickets": tickets,
                }
            pending = []
        pending.append(raw)

    # NOTE(review): the final accumulated entry (the oldest commit in the
    # range) is never processed -- presumably intentional since the range
    # starts one commit before the common ancestor; confirm.
    return backports, True
def CIQ_cherry_pick_commit_standardization(lines, commit, tags=None, jira="", optional_msg=""):
    """Standardize the CIQ cherry-pick commit message.

    Parameters:
        lines: Original commit message as a list of lines. The list is modified
            in place.
        commit: The commit SHA1 that was cherry-picked.
        tags: A list of tags to add to the commit message.
        jira: The JIRA number to add to the commit message, this can be a comma
            separated list.
        optional_msg: An optional message to add to the commit message.
            Traditionally used for `upstream-diff`.

    Return: The modified commit message passed in as lines.
    """
    # Assemble the header in reverse: every insert lands at index 2 (just below
    # the first blank line), pushing what was inserted earlier further down.
    lines.insert(2, "\n")
    if optional_msg != "":
        lines.insert(2, f"{optional_msg}\n")
    lines.insert(2, f"commit {commit}\n")
    if tags:
        for tag in tags[::-1]:
            lines.insert(2, f"{tag}\n")
    if jira:
        # Unlike tags, tickets are not reversed first, so a comma separated
        # list ends up in reverse order in the message.
        for ticket in jira.split(","):
            lines.insert(2, f"jira {ticket.strip()}\n")

    # We need to indent lines that have email addresses, as some tooling in the
    # community will attempt to read these lines and email everyone on the
    # list. We do not want to annoy the community when doing our own work.
    email_trailers = (
        "Signed-off-by",
        "Reported-by",
        "Cc:",
        "Reviewed-by",
        "Tested-by",
        "Debugged-by",
        "Acked-by",
        "Suggested-by",
    )
    # `git cherry-pick -x` writes the marker in parentheses; the bare form is
    # still accepted so previously recognised input keeps working.
    cherry_pick_markers = ("(cherry picked from commit", "cherry picked from commit")

    for idx in range(5, len(lines)):
        # The "(cherry picked from commit <sha1>)" line is the indicator we
        # cherry-picked; anything after it is ours and stays unindented.
        if lines[idx].startswith(cherry_pick_markers):
            break
        if lines[idx].startswith(email_trailers):
            lines[idx] = f"\t{lines[idx]}"
    return lines
def CIQ_original_commit_author_to_tag_string(repo_path, sha):
    """Build the "commit-author" tag used in the CIQ commit header.

    Parameters:
        repo_path: Path to the repository with the kernel mainline remote.
        sha: The full commit sha we're going to backport.

    Return: "commit-author <name> <email>" on success, or None when
        `git show` exits with a non-zero status (the failure is printed).
    """
    show_cmd = ["git", "show", '--pretty="%aN <%aE>"', "--no-patch", sha]
    proc = subprocess.run(
        show_cmd,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        cwd=repo_path,
    )
    if proc.returncode == 0:
        # The format string carries literal double quotes; drop them (and any
        # other double quote in the output) before building the tag.
        author = proc.stdout.decode("utf-8").replace('"', "").strip()
        return "commit-author " + author

    print(f"[FAILED] git show --pretty='%aN <%aE>' --no-patch {sha}")
    print(f"[FAILED][STDERR:{proc.returncode}] {proc.stderr.decode('utf-8')}")
    return None
def CIQ_run_git(repo_path, args):
    """Run `git -C <repo_path> <args...>` and return its stdout as a string.

    Parameters:
        repo_path: Path of the repository the command runs against.
        args: List of git arguments (sub-command first).

    Return: The captured standard output, as text.

    Raises:
        RuntimeError: If git exits with a non-zero status; the message carries
            the arguments and the captured stderr.
    """
    command = ["git", "-C", repo_path] + args
    completed = subprocess.run(command, text=True, capture_output=True, check=False)
    if completed.returncode == 0:
        return completed.stdout
    raise RuntimeError(f"Git command failed: {' '.join(args)}\n{completed.stderr}")
def CIQ_get_commit_body(repo_path, sha):
    """Return the raw commit message (`%B`) of `sha` in the repo at `repo_path`."""
    show_args = ["show", "-s", sha, "--format=%B"]
    return CIQ_run_git(repo_path, show_args)
def CIQ_extract_fixes_references_from_commit_body_lines(lines):
    """Return the hashes referenced by `Fixes:` lines, in order of appearance.

    A line counts when, after optional leading whitespace, it starts with
    "Fixes:" (case-insensitive) followed by 6 to 40 hex characters; only the
    hex run is returned.
    """
    fixes_tag = re.compile(r"^\s*Fixes:\s*([0-9a-fA-F]{6,40})", re.IGNORECASE)
    candidates = (fixes_tag.match(entry) for entry in lines)
    return [hit.group(1) for hit in candidates if hit]
def CIQ_fixes_references(repo_path, sha):
    """
    Return the list of <short_fixed> hashes named by lines like
    Fixes: <short_fixed> in the commit message of sha, or an empty list when
    the message has none.
    """
    body_lines = CIQ_get_commit_body(repo_path, sha).splitlines()
    return CIQ_extract_fixes_references_from_commit_body_lines(lines=body_lines)
def CIQ_get_full_hash(repo, short_hash):
    """Return the full hash (`%H`) git reports for `short_hash`, whitespace-stripped."""
    full_hash = CIQ_run_git(repo, ["show", "-s", "--pretty=%H", short_hash])
    return full_hash.strip()
def CIQ_get_current_branch(repo):
    """Return the output of `git branch --show-current` for `repo`, whitespace-stripped."""
    branch_name = CIQ_run_git(repo, ["branch", "--show-current"])
    return branch_name.strip()
def CIQ_hash_exists_in_ref(repo, pr_ref, hash_):
    """
    Return True if hash_ is reachable from pr_ref.

    Any failure of the underlying `git merge-base --is-ancestor` call
    (reported by CIQ_run_git as RuntimeError) is treated as "not reachable".
    """
    ancestry_check = ["merge-base", "--is-ancestor", hash_, pr_ref]
    try:
        CIQ_run_git(repo, ancestry_check)
    except RuntimeError:
        return False
    else:
        return True
def CIQ_commit_exists_in_branch(repo, pr_branch, upstream_hash_):
    """
    Return True if upstream_hash_ has been backported and it exists in the pr branch
    """
    # A CIQ backport records the upstream hash on a "commit <hash>" line of
    # its message, so any log hit for that pattern means it was backported.
    backport_marker = "^commit " + upstream_hash_
    backport_log = CIQ_run_git(repo, ["log", pr_branch, "--grep", backport_marker])
    if not backport_log:
        # Not backported by CIQ; maybe it came from upstream as it is.
        return CIQ_hash_exists_in_ref(repo, pr_branch, upstream_hash_)
    return True
def CIQ_commit_exists_in_current_branch(repo, upstream_hash_):
    """
    Return True if upstream_hash_ has been backported and it exists in the current branch.

    upstream_hash_ is expanded to its full hash before the lookup.
    """
    branch = CIQ_get_current_branch(repo)
    expanded_hash = CIQ_get_full_hash(repo, upstream_hash_)
    return CIQ_commit_exists_in_branch(repo, branch, expanded_hash)
def CIQ_find_fixes_in_mainline(repo, pr_branch, upstream_ref, hash_):
    """
    Return unique commits in upstream_ref whose message has a
    Fixes: <abbreviation of hash_> line (case-insensitive), if they have not
    been committed in the pr_branch.

    A Fixes: reference matches when it and hash_ agree over the whole length of
    the shorter of the two. A reference that merely shares a short prefix with
    hash_ and then diverges is NOT a match; such collisions are likely in a
    large repository when only a handful of leading characters are compared.

    Each commit is reported at most once, even when several of its Fixes:
    lines match.

    Returns a list of tuples: (full_hash, display_string)
    """
    # Get all commits with 'Fixes:' in the message
    output = CIQ_run_git(
        repo,
        [
            "log",
            upstream_ref,
            "--grep",
            "Fixes:",
            "-i",
            "--format=%H %h %s (%an)%x0a%B%x00",
        ],
    ).strip()
    if not output:
        return []

    target = hash_.lower()

    def refers_to_target(reference):
        # Equal over the shorter length <=> one string is a prefix of the other.
        reference = reference.lower()
        return target.startswith(reference) or reference.startswith(target)

    results = []
    # Each commit is separated by a NUL character and a newline
    for entry in output.split("\x00\x0a"):
        if not entry.strip():
            continue
        lines = entry.splitlines()
        # The first line is the summary, the rest is the body
        header_fields = lines[0].split()
        full_hash = header_fields[0]
        display_string = " ".join(header_fields[1:])

        references = CIQ_extract_fixes_references_from_commit_body_lines(lines=lines[1:])
        if not any(refers_to_target(reference) for reference in references):
            continue
        # Only report fixes that are still missing from the PR branch.
        if not CIQ_commit_exists_in_branch(repo, pr_branch, full_hash):
            results.append((full_hash, display_string))
    return results
def CIQ_find_fixes_in_mainline_current_branch(repo, upstream_ref, hash_):
    """Run CIQ_find_fixes_in_mainline against the branch currently checked out in repo."""
    return CIQ_find_fixes_in_mainline(repo, CIQ_get_current_branch(repo), upstream_ref, hash_)
def CIQ_reset_HEAD(repo):
    """Run `git reset --hard HEAD` in repo and return git's stdout."""
    reset_args = ["reset", "--hard", "HEAD"]
    return CIQ_run_git(repo_path=repo, args=reset_args)
def CIQ_raise_or_warn(cond, error_msg, warn):
    """Report error_msg when cond is truthy; do nothing otherwise.

    Parameters:
        cond: Whether the problem occurred.
        error_msg: The message to report.
        warn: When truthy the message is logged as a warning; otherwise a
            RuntimeError carrying the message is raised.
    """
    if cond:
        if warn:
            logging.warning(error_msg)
        else:
            raise RuntimeError(error_msg)
def repo_init(repo):
    """Initialize a git repo object.

    Parameters:
        repo: The path to the git repo.

    Return: The object returned by git.Repo.init for that path, or None when
        the path is not an existing directory.
    """
    return git.Repo.init(repo) if os.path.isdir(repo) else None
def last_git_tag(repo):
    """Returns the most recent tag for repo.

    Repo can either be a path to a repo or a git repo object. The tag is the
    output of `git describe --tags --abbrev=0`.

    Raises Exception if that output is empty.
    """
    if isinstance(repo, str) and os.path.isdir(repo):
        repo = git.Repo.init(repo)

    latest = repo.git.describe("--tags", "--abbrev=0")
    if latest:
        return latest
    raise Exception("Could not find last tag for", repo)
def get_git_user(repo):
    """Read user.name and user.email from a repo's git config.

    Returns a (name, email) tuple.
    Raises git.exc.GitCommandError if user.name or user.email are not configured.
    """
    read_config = repo.git.config
    return read_config("user.name"), read_config("user.email")
def parse_ciq_tag_release(tag):
    """Return the release counter N of a CIQ tag such as 'ciq_kernel-6.18.21-1'.

    Returns the integer N.
    Raises ValueError if the tag is not in ciq_kernel-X.Y.Z-N format.
    """
    found = re.match(r"^ciq_kernel-\d+\.\d+\.\d+-(\d+)$", tag)
    if found:
        return int(found.group(1))
    raise ValueError(
        f"Cannot parse CIQ release from tag: {tag!r} (expected 'ciq_kernel-X.Y.Z-N', e.g. 'ciq_kernel-6.18.21-1')"
    )
def parse_kernel_tag(tag):
    """Validate and parse a kernel version tag.

    Accepts: 'v6.12.74', '6.12.74', or 'ciq_kernel-6.12.74-N'.
    Returns the version string (e.g., '6.12.74').
    Raises ValueError if the tag format is invalid.
    """
    m = re.match(r"^ciq_kernel-(\d+\.\d+\.\d+)-\d+$", tag)
    if m:
        return m.group(1)

    tag_without_v = tag.removeprefix("v")
    tag_parts = tag_without_v.split(".")
    if len(tag_parts) != 3:
        raise ValueError(f"Invalid kernel tag format: {tag} (expected vX.Y.Z, X.Y.Z, or ciq_kernel-X.Y.Z-N)")

    # Each part must consist of plain ASCII digits only. Validating with int()
    # is too lenient: it also accepts signs, surrounding whitespace and
    # underscores ("-12", " 12", "1_2"), which would be returned as part of
    # the "version" string.
    if not all(part.isascii() and part.isdigit() for part in tag_parts):
        raise ValueError(f"Invalid kernel tag format: {tag} (version parts must be numeric)")
    return tag_without_v
def replace_spec_changelog(spec_lines, new_changelog_lines):
    """Swap the %changelog section of spec_lines for new_changelog_lines.

    Everything up to and including the first line starting with "%changelog"
    is kept. It is followed by new_changelog_lines and then by every line of
    the old section that starts with "#" (the comment lines), in their
    original order.

    Returns a new list of lines.
    Raises ValueError if %changelog is not found.
    """
    marker_at = next(
        (pos for pos, text in enumerate(spec_lines) if text.startswith("%changelog")),
        None,
    )
    if marker_at is None:
        raise ValueError("Could not find %changelog section in spec file")

    # Comment lines from the old changelog section are carried over.
    kept_comments = [text for text in spec_lines[marker_at + 1 :] if text.startswith("#")]
    return [*spec_lines[: marker_at + 1], *new_changelog_lines, *kept_comments]
def prepend_spec_changelog(spec_lines, new_entry_lines):
    """Insert new_entry_lines directly below the %changelog line.

    The existing changelog entries are preserved below the new entry.
    Returns a new list of lines.
    Raises ValueError if %changelog is not found.
    """
    for pos, text in enumerate(spec_lines):
        if text.startswith("%changelog"):
            # Split at the first marker and splice the new entry in between.
            return [*spec_lines[: pos + 1], *new_entry_lines, *spec_lines[pos + 1 :]]
    raise ValueError("Could not find %changelog section in spec file")
def _read_spec_define(spec_lines, name, value_pattern):
"""Return the value of a %define directive from spec file lines.
Builds a regex from name and value_pattern (a raw regex string matching the value),
scans spec_lines for the first match, and returns the captured value string.
Raises ValueError if the directive is not found.
"""
pattern = re.compile(rf"^%define {re.escape(name)}\s+({value_pattern})")
for line in spec_lines:
m = pattern.match(line)
if m:
return m.group(1)
raise ValueError(f"Could not find %define {name} in spec file")
def read_spec_el_version(spec_lines):
    """Return the EL version number defined in the spec file lines.

    The value is the digit run of the `%define el_version` directive,
    returned as a string (e.g., '9').
    Raises ValueError if not found.
    """
    el_version = _read_spec_define(spec_lines, "el_version", r"\d+")
    return el_version