-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdd.py
More file actions
220 lines (182 loc) · 9.56 KB
/
dd.py
File metadata and controls
220 lines (182 loc) · 9.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
from relations import predicate, geometric
from Read_in_Relations import _parse_predicate
from itertools import product
class DeductiveDatabase:
    """Forward-chaining deductive database (DD) over predicate objects.

    Rules are loaded from a text file, one rule per line, written as
    ``premise, premise, ... ? conclusion``.  Each loaded rule is stored in
    ``self.rules`` as a ``(premises, conclusion, rule_text)`` tuple.

    The matching code relies on duck typing only.  A predicate must provide
    ``get_points()`` (points expose ``.name``), a ``permutations`` attribute
    (an iterable of point sequences), equality, hashing, and a constructor
    that accepts the points positionally.
    """

    def __init__(self, rules_file="rules.txt"):
        """Remember the rules file path and load its rules immediately."""
        self.rules_file = rules_file
        self.rules = self._load_rules()

    def _load_rules(self):
        """Parse ``self.rules_file`` into a list of rule tuples.

        Lines that are blank, start with ``#`` or contain no ``?`` are
        skipped.  Everything before the first ``?`` is the comma-separated
        premise list; everything after it is the conclusion.

        Returns:
            A list of ``(premises, conclusion, rule_text)`` tuples, where the
            predicates are whatever ``_parse_predicate`` returned.
        """
        # Placeholder points named 'A'..'Z', each built as Point(name, 0, 0);
        # rules only use them as symbolic names to be bound during matching.
        points = {chr(i): geometric.Point(chr(i), 0, 0) for i in range(65, 91)}
        rules = []
        with open(self.rules_file, 'r') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#') or '?' not in line:
                    continue
                # partition() splits on the first '?' only, so a stray second
                # '?' no longer aborts loading with an unpacking error.
                premises_str, _, conclusion_str = line.partition('?')
                # NOTE(review): splitting on ',' assumes a single predicate
                # never contains a comma - confirm against the rules syntax.
                premises = [
                    _parse_predicate(p, points)
                    for p in premises_str.split(',')
                    if p.strip()
                ]
                conclusion = _parse_predicate(conclusion_str.strip(), points)
                rules.append((premises, conclusion, line))
        return rules

    def get_valid_points(self, premises, assumptions):
        """Return every consistent binding of premise points to assumption points.

        ``premises[i]`` is matched against ``assumptions[i]``.  For each
        combination of one permutation per assumption, premise points are
        paired positionally with the permutation's points; the combination is
        kept only if no premise point name would be bound to two different
        points.

        Args:
            premises: Rule premises providing ``get_points()``.
            assumptions: Facts providing a ``permutations`` attribute.

        Returns:
            A list of dicts mapping premise point name -> bound point.  Only
            points that were actually bound appear in a dict.  The list is
            empty when nothing is consistent or when an assumption has no
            ``permutations`` attribute.
        """
        premise_points = [list(prem.get_points()) for prem in premises]

        permutation_sets = []
        for assumption in assumptions:
            if not hasattr(assumption, 'permutations'):
                # Skipping it would shift every later assumption onto the
                # wrong premise, so treat the whole set as unmatchable.
                return []
            permutation_sets.append(assumption.permutations)

        valid_permutations = []
        for perm_combo in product(*permutation_sets):
            binding = {}
            consistent = True
            for assumption_pts, prem_pts in zip(perm_combo, premise_points):
                for bound_point, prem_point in zip(assumption_pts, prem_pts):
                    name = prem_point.name
                    if name not in binding:
                        binding[name] = bound_point
                    elif binding[name] != bound_point:
                        consistent = False
                        break
                if not consistent:
                    break
            if consistent:
                valid_permutations.append(binding)
        return valid_permutations

    @staticmethod
    def _instantiate_conclusion(conclusion, point_map):
        """Build the conclusion predicate for ``point_map``; None if impossible."""
        try:
            new_points = [point_map[p.name] for p in conclusion.get_points()]
        except KeyError:
            # The conclusion names a point that no premise bound.
            return None
        try:
            return type(conclusion)(*new_points)
        except ValueError:
            # The points generated in the end are not enough.
            return None

    def _enumerate_candidates(self, premises, conclusion, match_lists):
        """List ``(matched_assumptions, instantiated_conclusion)`` for one rule.

        Depth-first search choosing one distinct assumption per premise.  A
        partial choice is abandoned as soon as it has no consistent point
        binding.  Point maps are only expanded for complete choices, so each
        (choice, point map) pair is produced exactly once.
        """
        candidates = []
        if not premises:
            instantiated = self._instantiate_conclusion(conclusion, {})
            if instantiated is not None:
                candidates.append(([], instantiated))
            return candidates

        chosen = []
        in_use = set()
        last_index = len(premises) - 1

        def extend(premise_index):
            for assumption in match_lists[premise_index]:
                if assumption in in_use:
                    continue
                point_maps = self.get_valid_points(
                    premises[:premise_index + 1], chosen + [assumption])
                if not point_maps:
                    continue
                in_use.add(assumption)
                chosen.append(assumption)
                if premise_index == last_index:
                    for point_map in point_maps:
                        instantiated = self._instantiate_conclusion(
                            conclusion, point_map)
                        if instantiated is not None:
                            candidates.append((chosen[:], instantiated))
                else:
                    extend(premise_index + 1)
                chosen.pop()
                in_use.remove(assumption)

        extend(0)
        return candidates

    def check_applicable_predicates(self, assumptions, prev_combinations):
        """Check for applicable rules and return new derivations.

        Args:
            assumptions: Currently known facts.
            prev_combinations: Assumption combinations (lists) that were
                already used; candidates built from them are ignored.

        Returns:
            A 4-tuple ``(new_derivations, rules_applied, new_combinations,
            reasoning_steps)``:
            new_derivations - conclusions not equal to any known fact;
            rules_applied - rule text for each new derivation, in order;
            new_combinations - every not-previously-used combination found;
            reasoning_steps - one dict per new derivation recording the
            predicate, the rule and the matched premises.
        """
        new_derivations = []
        rules_applied = []
        new_combinations = []
        reasoning_steps = []

        for premises, conclusion, rule_str in self.rules:
            if premises is None or conclusion is None or any(p is None for p in premises):
                continue

            # For each premise, the assumptions of exactly the same class.
            match_lists = [
                [a for a in assumptions if type(a) is type(prem)]
                for prem in premises
            ]
            # Skip if any premise has no matches
            if not all(match_lists):
                continue

            # Filter out previously used combinations
            candidates = [
                (combo, derived)
                for combo, derived in self._enumerate_candidates(
                    premises, conclusion, match_lists)
                if combo not in prev_combinations
            ]
            new_combinations.extend(combo for combo, _ in candidates)

            for matched_assumptions, derived in candidates:
                already_known = (
                    any(derived == a for a in assumptions)
                    or any(derived == d for d in new_derivations)
                )
                if already_known:
                    continue
                new_derivations.append(derived)
                rules_applied.append(rule_str)
                reasoning_steps.append({
                    'predicate': derived,
                    'dependencies': {
                        'type': 'DD',
                        'rule': rule_str,
                        'premises': matched_assumptions,
                    }
                })
                print(f" DD: {rule_str}")
                print(f" ⟹ {derived}")

        return new_derivations, rules_applied, new_combinations, reasoning_steps

    def check_implied_predicates(self, assumptions):
        """Check for implied predicates based on current assumptions.

        Collects every entry of each assumption's ``implied_predicates``
        attribute (when present) that is not equal to a known assumption or
        to an implied predicate collected earlier in this call.

        Returns:
            A 2-tuple ``(new_implied, reasoning_steps)`` with one reasoning
            step dict per newly implied predicate.
        """
        new_implied = []
        reasoning_steps = []
        for assumption in assumptions:
            if not hasattr(assumption, 'implied_predicates'):
                continue
            for implied in assumption.implied_predicates:
                if any(implied == a for a in assumptions):
                    continue
                if any(implied == seen for seen in new_implied):
                    continue
                new_implied.append(implied)
                reasoning_steps.append({
                    'predicate': implied,
                    'dependencies': {
                        'type': 'DD',
                        'rule': 'implied_predicate',
                        'premises': [assumption],
                    }
                })
                print(f" Implied: {implied}")
        return new_implied, reasoning_steps

    def apply_rules(self, problem):
        """Standalone DD reasoning (legacy method).

        Repeatedly adds implied predicates and rule derivations to the fact
        list until every goal is among the facts or an iteration adds nothing
        new.  The reasoning steps of all iterations are collected in
        ``self.reasoning_steps``.

        Args:
            problem: Object exposing iterable ``goals`` and ``assumptions``.

        Returns:
            A 2-tuple ``(all_facts, applied_rules)``.
        """
        self.reasoning_steps = []
        print("Starting DD reasoning...")
        print("="*50)
        goals = list(problem.goals)
        all_facts = list(problem.assumptions)
        applied_rules = []
        # Combinations already consumed, fed back so they are not re-applied.
        used_combinations = []
        print(f"Assumptions: {len(all_facts)}, Goals: {len(goals)}\n")

        iteration = 0
        while True:
            iteration += 1
            print(f"Iteration {iteration}")

            new_implied, implied_steps = self.check_implied_predicates(all_facts)
            (new_derivations, new_rules,
             new_combinations, rule_steps) = self.check_applicable_predicates(
                all_facts + new_implied, used_combinations)
            used_combinations.extend(new_combinations)
            self.reasoning_steps.extend(implied_steps)
            self.reasoning_steps.extend(rule_steps)

            # Fixed point only when neither source produced anything new;
            # implied facts alone may enable further implications.
            if not new_derivations and not new_implied:
                solved = [g for g in goals if any(g == a for a in all_facts)]
                if len(solved) == len(goals):
                    print(f"\n✓ ALL GOALS SOLVED in {iteration-1} iterations")
                else:
                    print(f"\n✗ Fixed point reached ({len(solved)}/{len(goals)} goals)")
                return all_facts, applied_rules

            applied_rules.extend(new_rules)
            new_facts = new_derivations + new_implied
            all_facts.extend(new_facts)

            # Check for solved goals
            newly_solved = [g for g in goals if any(g == a for a in new_facts)]
            if newly_solved:
                print(f" 🎯 Goal(s) achieved: {len(newly_solved)}")
            if all(any(g == a for a in all_facts) for g in goals):
                print(f"\n✓ ALL GOALS SOLVED in {iteration} iterations")
                return all_facts, applied_rules
            print(f" Added {len(new_facts)} facts (total: {len(all_facts)})\n")