Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions Tests/scs_cert_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ def _resolve_spec(spec: dict):
# step 4. resolve references
# step 4a. resolve references to modules in includes
# in this step, we also normalize the include form
for version in spec['versions'].values():
for idx, version in enumerate(spec['versions'].values()):
version['_idx'] = idx
version['include'] = [
{'module': module_lookup[inc], 'parameters': {}} if isinstance(inc, str) else
{'module': module_lookup[inc['ref']], 'parameters': inc.get('parameters', {})}
Expand Down Expand Up @@ -205,8 +206,8 @@ def eval_buckets(results, testcase_ids) -> dict:

def evaluate(results, testcase_ids) -> int:
"""returns overall result"""
return min([
# here, we treat None (MISSING) as 0 (ABORT)
results.get(testcase_id, {}).get('result') or 0
for testcase_id in testcase_ids
], default=0)
buckets = eval_buckets(results, testcase_ids)
for value in (-1, None, 0):
if buckets[value]:
return value
return 1
145 changes: 74 additions & 71 deletions compliance-monitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ def _evaluate_version(version, scope_results):
for tname, tc_ids in version['targets'].items()
}
return {
'_idx': version['_idx'],
'result': target_results['main']['result'],
'targets': target_results,
'tc_target': version['tc_target'],
Expand All @@ -273,21 +274,22 @@ def _evaluate_scope(spec, scope_results, include_drafts=False):
for vname, version in versions.items()
if version['_explicit_validity']
}
by_validity = defaultdict(list)
for vname, version in versions.items():
by_validity[version['_explicit_validity']].append(vname)
# go through worsening validity values until a passing version is found
winner = None # first passed version that's not a draft
result = -1
passed = []
relevant = []
best_passed = None
for validity in ('effective', 'warn', 'deprecated'):
vnames = by_validity[validity]
relevant.extend(vnames)
if any(version_results[vname]['result'] == 1 for vname in vnames):
best_passed = validity
break
if include_drafts:
relevant.extend(by_validity['draft'])
passed = [vname for vname in relevant if version_results[vname]['result'] == 1]
# assumption: versions are listed in spec in descending order recency
# first the drafts, then effective, then warn, then the rest
for vname, version_result in version_results.items():
if version_result['validity'] == 'draft' and not include_drafts:
continue
relevant.append(vname)
result = version_result['result']
if result != -1:
passed.append(vname)
if version_result['validity'] != 'draft':
winner = vname
break
# only list testcases that occur in any relevant version
relevant_testcases = set()
for vname in relevant:
Expand All @@ -304,12 +306,14 @@ def _evaluate_scope(spec, scope_results, include_drafts=False):
},
'versions': version_results,
'relevant': relevant,
'result': result,
'passed': passed,
'passed_str': ', '.join([
vname + ASTERISK_LOOKUP[versions[vname]['validity']]
vname + ASTERISK_LOOKUP[version_results[vname]['validity']]
for vname in passed
]),
'best_passed': best_passed,
'best_passed': None if winner is None else version_results[winner]['_idx'],
'validity': 'deprecated' if winner is None else version_results[winner]['validity'],
}


Expand Down Expand Up @@ -672,20 +676,25 @@ async def get_detail(
subject: str,
scopeuuid: str,
):
return _make_detail_view(conn, view_type, subject, scopeuuid)


def _make_detail_view(conn, view_type, subject, scopeuuid, include_drafts=False):
scopeuuid = _resolve_scope(scopeuuid)
with conn.cursor() as cur:
group, subjects = _resolve_group(cur, subject)
rows2 = []
for subj in subjects:
rows2.extend(db_get_relevant_results2(cur, subj, scopeuuid, approved_only=True))
rows2.extend(db_get_relevant_results2(cur, subj, scopeuuid))
results2 = convert_result_rows_to_dict2(
rows2, get_scopes(), include_report=True, grace_period_days=GRACE_PERIOD_DAYS,
rows2, get_scopes(), include_report=True, include_drafts=include_drafts,
subjects=subjects, scopes=(scopeuuid, ),
)
title = f'Details for group {group}' if group else f'Details for subject {subject}'
if include_drafts:
title += ' (incl. drafts)'
return render_view(
VIEW_DETAIL, view_type, results=results2, base_url=settings.base_url,
title=title,
VIEW_DETAIL, view_type, results=results2, base_url=settings.base_url, title=title,
)


Expand All @@ -697,21 +706,7 @@ async def get_detail_full(
subject: str,
scopeuuid: str,
):
scopeuuid = _resolve_scope(scopeuuid)
with conn.cursor() as cur:
group, subjects = _resolve_group(cur, subject)
rows2 = []
for subj in subjects:
rows2.extend(db_get_relevant_results2(cur, subj, scopeuuid, approved_only=False))
results2 = convert_result_rows_to_dict2(
rows2, get_scopes(), include_report=True, include_drafts=True,
subjects=subjects, scopes=(scopeuuid, ),
)
title = f'Details for group {group}' if group else f'Details for subject {subject}'
return render_view(
VIEW_DETAIL, view_type, results=results2, base_url=settings.base_url,
title=f'{title} (incl. unverified results)',
)
return _make_detail_view(conn, view_type, subject, scopeuuid, include_drafts=True)


@app.get("/{view_type}/table")
Expand All @@ -720,13 +715,20 @@ async def get_table(
conn: Annotated[connection, Depends(get_conn)],
view_type: ViewType,
):
return _make_table_view(conn, view_type, detail_page='detail')


def _make_table_view(conn, view_type, detail_page, include_drafts=False):
with conn.cursor() as cur:
groups = db_get_groups(cur)
rows2 = db_get_relevant_results2(cur, approved_only=True)
results2 = convert_result_rows_to_dict2(rows2, get_scopes(), grace_period_days=GRACE_PERIOD_DAYS)
rows2 = db_get_relevant_results2(cur)
results2 = convert_result_rows_to_dict2(rows2, get_scopes(), include_drafts=include_drafts)
title = 'SCS compliance overview'
if include_drafts:
title += ' (incl. drafts)'
return render_view(
VIEW_TABLE, view_type, results=results2, base_url=settings.base_url, detail_page='detail',
title="SCS compliance overview", groups=groups,
VIEW_TABLE, view_type, results=results2, base_url=settings.base_url, detail_page=detail_page,
title=title, groups=groups,
)


Expand All @@ -736,14 +738,7 @@ async def get_table_full(
conn: Annotated[connection, Depends(get_conn)],
view_type: ViewType,
):
with conn.cursor() as cur:
groups = db_get_groups(cur)
rows2 = db_get_relevant_results2(cur, approved_only=False)
results2 = convert_result_rows_to_dict2(rows2, get_scopes(), include_drafts=True)
return render_view(
VIEW_TABLE, view_type, results=results2, base_url=settings.base_url, detail_page='detail_full',
title="SCS compliance overview (incl. unverified results)", unverified=True, groups=groups,
)
return _make_table_view(conn, view_type, detail_page='detail_full', include_drafts=True)


@app.get("/{view_type}/scope/{scopeuuid}")
Expand All @@ -756,13 +751,12 @@ async def get_scope(
scopeuuid = _resolve_scope(scopeuuid)
spec = get_scopes()[scopeuuid]
versions = spec['versions']
# sort by name, and all drafts after all non-drafts
column_data = [
(version['_explicit_validity'].lower() == 'draft', name)
# use same order as in details view
relevant = [
name
for name, version in versions.items()
if version['_explicit_validity']
]
relevant = [name for _, name in sorted(column_data)]
modules_chart = {}
for name in relevant:
for include in versions[name]['include']:
Expand Down Expand Up @@ -839,40 +833,50 @@ def pick_filter(ctx, results, scopeuuid, *subjects):
return [r for r in rs if r is not None]


STATUS_ORDERING = {
'effective': 10,
'warn': 5,
'deprecated': 1,
NIL = object() # the version in question does not have a result
# used to sort multiple versions according to the "goodness" of their result
RESULT_SCORE = {
-1: 0,
None: 1,
NIL: 2, # NIL and None are basically the same, but prefer None because it has more info
0: 3,
1: 4,
}
COLOR_MAP = {
-1: '🛑', # fail
None: '🟧', # missing
0: '✅*', # inconclusive
1: '✅', # pass
}


def summary_filter(scope_results):
"""Jinja filter to construct summary from `scope_results`"""
# be prepared for empty dicts here because they are created to avoid KeyError in jinja2
if not isinstance(scope_results, dict):
# new generalized case: "aggregate" results for multiple subjects
# simplified computation: just select the worst subject to represent the group
scope_results = min(
scope_results,
default={},
key=lambda sr: STATUS_ORDERING.get(sr.get('best_passed'), -1),
key=lambda sr: RESULT_SCORE[sr.get('result', NIL)],
)
passed_str = scope_results.get('passed_str', '') or '–'
best_passed = scope_results.get('best_passed')
# avoid simple 🟢🔴 (hard to distinguish for color-blind folks)
color = {
'effective': '✅',
'warn': '✅', # forgo differentiation here in favor of simplicity (will be apparent in version list)
'deprecated': '🟧',
}.get(best_passed, '🛑')
if not scope_results:
return '🛑 –'
result = scope_results['result']
color = COLOR_MAP[result]
# if the result is not pass anyway, deduct points if the version is outdated
# (this case should happen very rarely because we usually don't consider those)
if result != -1:
validity = scope_results['validity']
if validity == 'warn':
color = '🟧'
elif validity == 'deprecated':
color = '🛑'
passed_str = scope_results['passed_str'] or '–'
return f'{color} {passed_str}'


def verdict_filter(value):
"""Jinja filter to turn a canonical result value into a written verdict (PASS, MISS, or FAIL)"""
# be fault-tolerant here and turn every non-canonical value into a MISS
return {1: 'PASS', -1: 'FAIL'}.get(value, 'MISS')


def verdict_check_filter(value):
"""Jinja filter to turn a canonical result value into a symbolic verdict (✔, ⚠, or ✘)"""
# be fault-tolerant here and turn every non-canonical value into a MISS
Expand Down Expand Up @@ -905,7 +909,6 @@ def reload_static_config(*args, do_ensure_schema=False):
env.filters.update(
pick=pick_filter,
summary=summary_filter,
verdict=verdict_filter,
verdict_check=verdict_check_filter,
markdown=markdown,
validity_symbol=ASTERISK_LOOKUP.get,
Expand Down
2 changes: 1 addition & 1 deletion compliance-monitor/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def db_insert_result2(

def db_get_relevant_results2(
cur: cursor,
subject=None, scopeuuid=None, version=None, approved_only=True,
subject=None, scopeuuid=None, version=None, approved_only=False,
):
"""for each combination of scope/version/check, get the most recent test result that is still valid"""
# find the latest result per subject/scopeuuid/version/checkid for this subject
Expand Down
2 changes: 0 additions & 2 deletions compliance-monitor/templates/overview.md.j2
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ we could of course iterate over results etc., but hardcode the table (except the
for the time being to have the highest degree of control
-#}

{% if unverified %}These tables show the most recent results, including unverified ones. Consumers are referred to the [verified tables]({{base_url}}page/table). **Beware of false positives!**{% else %}These tables show the most recent **verified** results.{% endif %}

Version numbers are suffixed by a symbol depending on state: * for _draft_, † for _warn_ (soon to be deprecated), and †† for _deprecated_.

### SCS-compatible IaaS
Expand Down
Loading