diff --git a/src/cfengine_cli/lint.py b/src/cfengine_cli/lint.py index ee133f2..38882c9 100644 --- a/src/cfengine_cli/lint.py +++ b/src/cfengine_cli/lint.py @@ -32,32 +32,59 @@ class _State: block_type: str | None = None # "bundle" | "body" | "promise" | None promise_type: str | None = None # "vars" | "files" | "classes" | ... | None attribute_name: str | None = None # "if" | "string" | "slist" | ... | None + namespace: str = "default" # "ns" | "default" | ... | - def update(self, node) -> "_State": + def update(self, node): """Updates and returns the state that should apply to the children of `node`.""" + if node.type == "}": + assert node.parent + assert node.parent.type in [ + "bundle_block_body", + "promise_block_body", + "body_block_body", + "list", + ] + if node.parent.type != "list": + # We just ended a block + self.block_type = None + self.promise_type = None + self.attribute_name = None + return + if node.type == ";": + self.attribute_name = None + return if node.type == "bundle_block": - return _State(block_type="bundle") + self.block_type = "bundle" + return if node.type == "body_block": - return _State(block_type="body") + self.block_type = "body" + return if node.type == "promise_block": - return _State(block_type="promise") + self.block_type = "promise" + return if node.type == "bundle_section": - for child in node.children: - if child.type == "promise_guard": - return _State( - block_type=self.block_type, - promise_type=_text(child)[:-1], # strip trailing ':' - ) - return _State(block_type=self.block_type) + # A bundle_section is always: promise_guard, [promises], [class_guarded_promises...] + # The promise_guard is guaranteed to exist by the grammar + guard = next((c for c in node.children if c.type == "promise_guard"), None) + if guard is None: # Should never happen + print("ERROR: Bundle section without a promise guard") + return + + self.promise_type = _text(guard)[:-1] # strip trailing ':' + return if node.type == "attribute": for child in node.children: if child.type == "attribute_name": - return _State( - block_type=self.block_type, - promise_type=self.promise_type, - attribute_name=_text(child), - ) - return self + self.attribute_name = _text(child) + if self.attribute_name == "namespace": + self.namespace = _text(child.next_named_sibling).strip("\"'") + return + return + + @staticmethod + def qualify(name: str, namespace: str) -> str: + """If name is already qualified (contains ':'), return as-is. Otherwise prepend namespace.""" + return name if ":" in name else f"{namespace}:{name}" def lint_cfbs_json(filename) -> int: @@ -184,7 +211,8 @@ def _node_checks(filename, lines, node, user_definition, strict, state: _State): if node.type == "calling_identifier": if ( strict - and _text(node) in user_definition.get("all_bundle_names", set()) + and state.qualify(_text(node), state.namespace) + in user_definition.get("all_bundle_names", set()) and state.promise_type in user_definition.get("custom_promise_types", set()) ): _highlight_range(node, lines) @@ -193,11 +221,12 @@ def _node_checks(filename, lines, node, user_definition, strict, state: _State): ) return 1 if strict and ( - _text(node) - not in BUILTIN_FUNCTIONS.union( + state.qualify(_text(node), state.namespace) + not in set.union( user_definition.get("all_bundle_names", set()), user_definition.get("all_body_names", set()), ) + and _text(node) not in BUILTIN_FUNCTIONS ): _highlight_range(node, lines) print( @@ -215,11 +244,9 @@ def _stateful_walk( errors = _node_checks(filename, lines, node, user_definition, strict, state) - child_state = state.update(node) + state.update(node) for child in node.children: - errors += _stateful_walk( - filename, lines, child, user_definition, strict, child_state - ) + errors += _stateful_walk(filename, lines, child, user_definition, strict, state) return errors @@ -239,18 +266,55 @@ def _walk(filename, lines, node, user_definition=None, strict=True) -> int: line = node.range.start_point[0] + 1 column = node.range.start_point[1] + 1 - return _stateful_walk(filename, lines, node, user_definition, strict) + state = _State() + ret = _stateful_walk(filename, lines, node, user_definition, strict, state=state) + state = _State() # Clear state + return ret def _parse_user_definition(filename, lines, root_node): - promise_blocks = _find_node_type(filename, lines, root_node, "promise_block_name") - bundle_blocks = _find_node_type(filename, lines, root_node, "bundle_block_name") - body_blocks = _find_node_type(filename, lines, root_node, "body_block_name") + ns = "default" + promise_blocks = set() + bundle_blocks = set() + body_blocks = set() + + for child in root_node.children: + if child.type == "body_block": + name_node = next( + (c for c in child.named_children if c.type == "body_block_name"), + None, + ) + ns_attr = next( + ( + c + for c in _find_node_type(filename, lines, child, "attribute_name") + if _text(c) == "namespace" + ), + None, + ) + if ns_attr is not None: + ns = _text(ns_attr.next_named_sibling).strip("\"'") + elif name_node is not None: + body_blocks.add(_State.qualify(_text(name_node), ns)) + elif child.type == "bundle_block": + name_node = next( + (c for c in child.named_children if c.type == "bundle_block_name"), + None, + ) + if name_node is not None: + bundle_blocks.add(_State.qualify(_text(name_node), ns)) + elif child.type == "promise_block": + name_node = next( + (c for c in child.named_children if c.type == "promise_block_name"), + None, + ) + if name_node is not None: + promise_blocks.add(_text(name_node)) return { - "custom_promise_types": {_text(x) for x in promise_blocks}, - "all_bundle_names": {_text(x) for x in bundle_blocks}, - "all_body_names": {_text(x) for x in body_blocks}, + "custom_promise_types": promise_blocks, + "all_bundle_names": bundle_blocks, + "all_body_names": body_blocks, }