-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstart_dev_server.py
More file actions
303 lines (250 loc) · 10.5 KB
/
start_dev_server.py
File metadata and controls
303 lines (250 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
#!/usr/bin/env python3
"""
CoC 7版 AI 守秘人系统 - 开发服务器启动脚本
统一管理所有服务进程,确保调试环境的一致性
"""
import subprocess
import time
import sys
import os
import signal
import atexit
from pathlib import Path
class DevServerManager:
    """Start, supervise and tear down the local development services.

    The manager launches three child processes (RAG API, CoC Agent API and
    the frontend dev server), redirects their output into files under
    ``logs/`` next to this script, and terminates them again on exit.
    """

    # Ports used by the dev services; stale listeners on them are killed
    # before a fresh start.
    SERVICE_PORTS = (8000, 8001, 3000)

    # Seconds a child gets to exit after terminate() before it is killed.
    SHUTDOWN_GRACE_SECONDS = 1

    def __init__(self):
        """Create the log directory and install the shutdown hooks."""
        # Maps a service key ("rag_api", "agent_api", "frontend") to its Popen.
        self.processes = {}
        self.project_root = Path(__file__).parent
        self.log_dir = self.project_root / "logs"
        self.log_dir.mkdir(exist_ok=True)

        # Make sure children are stopped on normal interpreter exit ...
        atexit.register(self.cleanup_all)
        # ... and when the manager itself is interrupted or terminated.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    @staticmethod
    def _safe_print(text):
        """Print ``text``; on consoles that cannot encode it, substitute ``?``.

        Only the characters the console encoding cannot represent are
        replaced, so the rest of the message stays readable.
        """
        try:
            print(text)
        except UnicodeEncodeError:
            encoding = getattr(sys.stdout, "encoding", None) or "ascii"
            print(text.encode(encoding, errors="replace").decode(encoding))

    def log(self, service, message):
        """Print a timestamped line and append it to ``<service>.log``.

        Args:
            service: Label shown in the line; its lower-cased form is the
                log file name inside ``self.log_dir``.
            message: Text to record.
        """
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        log_msg = f"[{timestamp}] [{service}] {message}"
        self._safe_print(log_msg)

        log_file = self.log_dir / f"{service.lower()}.log"
        with open(log_file, "a", encoding="utf-8") as f:
            f.write(log_msg + "\n")

    @staticmethod
    def _run_capture(args):
        """Run ``args`` without a shell and capture its text output.

        Returns:
            The ``CompletedProcess``, or ``None`` when the executable cannot
            be started (for example because it is not installed).
        """
        try:
            return subprocess.run(args, capture_output=True, text=True, check=False)
        except OSError:
            return None

    @staticmethod
    def _parse_netstat_listeners(output, port):
        """Extract PIDs in LISTENING state on ``port`` from ``netstat -ano`` text."""
        suffix = f":{port}"
        pids = set()
        for line in output.splitlines():
            parts = line.split()
            # Data rows are read as: proto, local addr, foreign addr, state, PID.
            if len(parts) < 5:
                continue
            local_addr, state, pid = parts[1], parts[3].upper(), parts[4]
            if local_addr.endswith(suffix) and state == "LISTENING" and pid.isdigit():
                pids.add(int(pid))
        return pids

    @staticmethod
    def _parse_pid_lines(output):
        """Collect the PIDs from text holding one PID per line (``lsof -t``)."""
        return {int(line) for line in map(str.strip, output.splitlines()) if line.isdigit()}

    @staticmethod
    def _parse_ss_pids(output):
        """Collect every ``pid=<n>`` value from ``ss -p`` output.

        A socket shared by several processes lists several ``pid=`` entries
        on one line, e.g. ``users:(("python3",pid=12345,fd=3),...)``; all of
        them are returned.
        """
        import re

        return {int(pid) for pid in re.findall(r"pid=(\d+)", output)}

    def _find_listening_pids_by_port(self, port):
        """Return the set of PIDs that have a listening TCP socket on ``port``.

        Windows uses ``netstat``; elsewhere ``lsof`` is tried first and
        ``ss`` is the fallback. A missing tool yields an empty set.
        """
        if os.name == "nt":
            result = self._run_capture(["netstat", "-ano", "-p", "tcp"])
            if result is None:
                return set()
            return self._parse_netstat_listeners(result.stdout, port)

        # -sTCP:LISTEN restricts the match to listening sockets; without it
        # lsof also reports clients that are merely connected to the port,
        # and those would then be killed as well.
        result = self._run_capture(
            ["lsof", "-nP", f"-iTCP:{port}", "-sTCP:LISTEN", "-t"]
        )
        if result is not None and result.returncode == 0 and result.stdout.strip():
            return self._parse_pid_lines(result.stdout)

        result = self._run_capture(["ss", "-ltnp", f"( sport = :{port} )"])
        if result is None:
            return set()
        return self._parse_ss_pids(result.stdout)

    def _kill_pid(self, pid):
        """Ask the process ``pid`` to stop and log whether that worked.

        Uses ``taskkill /F`` on Windows and ``SIGTERM`` elsewhere. Failures
        are logged, never raised.
        """
        failure = None
        try:
            if os.name == "nt":
                result = subprocess.run(
                    ["taskkill", "/F", "/PID", str(pid)],
                    check=False,
                    capture_output=True,
                    text=True,
                )
                if result.returncode != 0:
                    failure = (
                        (result.stderr or result.stdout).strip()
                        or f"exit code {result.returncode}"
                    )
            else:
                os.kill(pid, signal.SIGTERM)
        except (OSError, ValueError) as e:
            failure = str(e)

        if failure is None:
            self.log("MANAGER", f"已终止进程 PID: {pid}")
        else:
            self.log("MANAGER", f"终止进程失败 PID {pid}: {failure}")

    def kill_existing_processes(self):
        """Kill whatever is still listening on the dev service ports."""
        self.log("MANAGER", "正在清理现有进程...")

        killed = set()
        try:
            for port in self.SERVICE_PORTS:
                pids = self._find_listening_pids_by_port(port)
                if not pids:
                    self.log("MANAGER", f"端口 {port} 未发现残留进程")
                    continue
                # A process can hold more than one port; kill it only once.
                for pid in sorted(pids - killed):
                    self.log("MANAGER", f"端口 {port} 被进程占用,准备清理 PID: {pid}")
                    self._kill_pid(pid)
                    killed.add(pid)
        except Exception as e:
            # Best-effort cleanup: report and continue with the start-up.
            self.log("MANAGER", f"清理进程时出错: {e}")

        # Give killed processes time to release their ports.
        if killed:
            time.sleep(2)

    def _spawn(self, key, service, description, command, cwd, log_name, startup_delay):
        """Launch one child service with its output redirected to a log file.

        Args:
            key: Key under which the process is stored in ``self.processes``.
            service: Label used for the manager's own log lines.
            description: Human-readable service name used in the messages.
            command: Argument list passed to ``subprocess.Popen``.
            cwd: Working directory of the child.
            log_name: File name inside ``self.log_dir`` for the child's output.
            startup_delay: Seconds to sleep after launching.

        Returns:
            The started ``subprocess.Popen`` object.
        """
        self.log(service, f"启动{description}...")

        log_file = self.log_dir / log_name
        # Every run starts with an empty log. The child then receives an
        # append-mode handle: log() also appends to these files (via the
        # service keys and the FRONTEND label), and a plain "w" handle would
        # let the child overwrite those lines at its own file offset.
        log_file.write_text("", encoding="utf-8")
        with open(log_file, "a", encoding="utf-8") as f:
            process = subprocess.Popen(
                command,
                cwd=cwd,
                stdout=f,
                stderr=subprocess.STDOUT,
            )

        self.processes[key] = process
        self.log(service, f"{description}已启动,PID: {process.pid}")

        # Fixed pause to let the service come up before the next step.
        time.sleep(startup_delay)
        return process

    def start_backend_rag(self):
        """Start the RAG API service (port 8000) and return its process."""
        return self._spawn(
            key="rag_api",
            service="RAG-API",
            description="RAG API服务",
            command=[sys.executable, "gm_api_server.py"],
            cwd=self.project_root,
            log_name="rag_api.log",
            startup_delay=3,
        )

    def start_backend_agent(self):
        """Start the CoC Agent API service (port 8001) and return its process."""
        return self._spawn(
            key="agent_api",
            service="AGENT-API",
            description="CoC Agent API服务",
            command=[sys.executable, "coc_apiagent.py"],
            cwd=self.project_root,
            log_name="agent_api.log",
            startup_delay=3,
        )

    def start_frontend(self):
        """Start the frontend dev server (port 3000) and return its process."""
        import shutil

        # On Windows the executable is "npm.cmd", which Popen cannot find
        # under the bare name "npm" without a shell; which() resolves it.
        # If nothing is found, Popen raises FileNotFoundError as before.
        npm = shutil.which("npm") or "npm"
        return self._spawn(
            key="frontend",
            service="FRONTEND",
            description="前端开发服务器",
            command=[npm, "run", "dev"],
            cwd=self.project_root / "coc-frontend",
            log_name="frontend.log",
            startup_delay=5,
        )

    def check_service_health(self, service_name, url, max_retries=10):
        """Poll ``url`` until it answers HTTP 200 or the retries run out.

        Args:
            service_name: Label used for the log lines.
            url: Health endpoint to request.
            max_retries: Maximum number of attempts, one second apart.

        Returns:
            True as soon as one attempt returns status 200, otherwise False.
        """
        import http.client
        import urllib.request

        for attempt in range(1, max_retries + 1):
            try:
                with urllib.request.urlopen(url, timeout=2) as response:
                    healthy = response.status == 200
            except (OSError, ValueError, http.client.HTTPException):
                # Connection refused, timeout, HTTP error status or a
                # malformed reply: the service is not ready (yet).
                healthy = False

            if healthy:
                self.log(service_name, "✅ 服务健康检查通过")
                return True

            self.log(service_name, f"⏳ 健康检查 {attempt}/{max_retries}...")
            time.sleep(1)

        self.log(service_name, "❌ 服务健康检查失败")
        return False

    def signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM: stop all services, then exit with status 0."""
        self.log("MANAGER", f"收到信号 {signum},正在关闭所有服务...")
        self.cleanup_all()
        sys.exit(0)

    def cleanup_all(self):
        """Terminate every child that is still running and forget them all.

        Safe to call repeatedly: once ``self.processes`` is empty, a further
        call only writes the two log lines.
        """
        self.log("MANAGER", "正在清理所有服务进程...")

        for service_name, process in list(self.processes.items()):
            if not process or process.poll() is not None:
                continue
            try:
                process.terminate()
                self.log(service_name, f"已终止进程 PID: {process.pid}")
                try:
                    # Returns as soon as the child exits and reaps it.
                    process.wait(timeout=self.SHUTDOWN_GRACE_SECONDS)
                except subprocess.TimeoutExpired:
                    process.kill()
                    self.log(service_name, f"强制终止进程 PID: {process.pid}")
                    process.wait(timeout=self.SHUTDOWN_GRACE_SECONDS)
            except Exception as e:
                # One stubborn child must not keep the others from being stopped.
                self.log(service_name, f"终止进程时出错: {e}")

        self.processes.clear()
        self.log("MANAGER", "所有服务进程已清理完毕")

    def start_all(self):
        """Start every service, report the result and supervise the children.

        Returns:
            False when a child process exits on its own, True when the
            supervision loop is left through ``KeyboardInterrupt``.
        """
        # All console output goes through _safe_print so that the emoji in
        # these messages cannot abort start-up on a non-Unicode console.
        out = self._safe_print
        banner = "=" * 60

        out(banner)
        out("CoC 7版 AI 守秘人系统 - 开发服务器")
        out(banner)

        # 1. Remove leftovers from a previous run.
        self.kill_existing_processes()

        # 2. Backend services.
        self.start_backend_rag()
        self.start_backend_agent()

        # 3. Frontend dev server.
        self.start_frontend()

        # 4. Health checks (every service is checked, even after a failure).
        out("\n🔍 正在进行服务健康检查...")
        health_checks = [
            ("RAG-API", "http://127.0.0.1:8000/health"),
            ("AGENT-API", "http://127.0.0.1:8001/health"),
        ]
        all_healthy = True
        for service_name, url in health_checks:
            if not self.check_service_health(service_name, url):
                all_healthy = False

        # 5. Summary.
        out("\n" + banner)
        if all_healthy:
            out("🎉 所有服务启动成功!")
            out("\n📍 访问地址:")
            out(" 前端游戏界面: http://localhost:3000")
            out(" RAG API文档: http://127.0.0.1:8000/docs")
            out(" Agent API文档: http://127.0.0.1:8001/docs")
            out("\n📝 日志文件位置:")
            out(f" {self.log_dir}")
            out("\n⚠️ 按 Ctrl+C 停止所有服务")
        else:
            out("❌ 部分服务启动失败,请检查日志文件")
        out(banner)

        # 6. Supervise until a child dies or the manager is interrupted.
        try:
            while True:
                for service_name, process in list(self.processes.items()):
                    if process and process.poll() is not None:
                        self.log(service_name, f"⚠️ 进程意外退出,PID: {process.pid}")
                        return False
                time.sleep(5)
        except KeyboardInterrupt:
            # Fallback only: with the SIGINT handler installed by __init__,
            # Ctrl+C is routed through signal_handler(), which exits via
            # SystemExit instead of raising KeyboardInterrupt.
            self.log("MANAGER", "收到中断信号,正在关闭服务...")
            return True
if __name__ == "__main__":
    # Script entry point: run the manager and turn its outcome into the
    # process exit status (0 when start_all() returns True, 1 otherwise).
    manager = DevServerManager()
    exit_code = 1
    try:
        if manager.start_all():
            exit_code = 0
    except Exception as exc:
        # Unexpected failure: record it and stop whatever was already started.
        manager.log("MANAGER", f"启动失败: {exc}")
        manager.cleanup_all()
    sys.exit(exit_code)