-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
275 lines (240 loc) · 9.95 KB
/
app.py
File metadata and controls
275 lines (240 loc) · 9.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
"""Automercatorum Video Export — pywebview entry point.
Native window that downloads MP4 lecture videos from Universitas Mercatorum.
Supports pause / resume / stop and per-video selection (advanced mode).
"""
from __future__ import annotations
import json
import logging
import subprocess
import sys
import threading
from pathlib import Path
import webview
from mercatorum.api import AuthError, MercatorumAPI, Video
from mercatorum.creds_store import CredentialsStore
from mercatorum.downloader import (
DownloadController,
download_course_videos,
estimate_total_size,
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("app")
ROOT = Path(__file__).resolve().parent
AUTH_DIR = ROOT / ".auth"
DOWNLOADS = ROOT / "downloads"
UI_INDEX = ROOT / "ui" / "index.html"
class JsApi:
"""Bridge exposed to JS via window.pywebview.api.<method>.
pywebview converts Python snake_case → camelCase on the JS side; method
names below are already camelCase so they cross unchanged.
"""
def __init__(self) -> None:
self.store = CredentialsStore(AUTH_DIR)
self.api: MercatorumAPI | None = None
self.username: str | None = None
self.window: webview.Window | None = None
self.controller = DownloadController()
self._video_cache: dict[str, list[Video]] = {}
# ---------------------------------------------------------------- auth
def autoLogin(self) -> dict:
if not self.store.exists():
return {"firstRun": True}
try:
username, password = self.store.load()
api = MercatorumAPI()
api.login(username, password)
self.api = api
self.username = username
return {
"firstRun": False, "ok": True, "username": username,
"courses": [self._course_to_dict(c) for c in api.list_courses()],
}
except AuthError as e:
return {"firstRun": False, "ok": False, "error": f"Login fallito: {e}"}
except Exception as e:
log.exception("autoLogin failed")
return {"firstRun": False, "ok": False, "error": str(e)}
def login(self, username: str, password: str, remember: bool) -> dict:
try:
api = MercatorumAPI()
api.login(username, password)
if remember:
self.store.save(username, password)
self.api = api
self.username = username
return {
"ok": True, "username": username,
"courses": [self._course_to_dict(c) for c in api.list_courses()],
}
except AuthError as e:
return {"ok": False, "error": f"Login fallito: {e}"}
except Exception as e:
log.exception("login failed")
return {"ok": False, "error": str(e)}
def forgetAccount(self) -> dict:
self.store.reset()
self.api = None
self.username = None
self._video_cache.clear()
return {"ok": True}
def logout(self) -> dict:
self.api = None
self.username = None
self._video_cache.clear()
return {"ok": True}
# ---------------------------------------------------- advanced mode
def getCourseVideos(self, course_code: str) -> dict:
"""List every video of a course (for the advanced-mode tree view)."""
if not self.api:
return {"ok": False, "error": "Non autenticato"}
if course_code in self._video_cache:
videos = self._video_cache[course_code]
else:
try:
videos = self.api.get_course_videos(course_code)
self._video_cache[course_code] = videos
except Exception as e:
log.exception("getCourseVideos failed for %s", course_code)
return {"ok": False, "error": str(e)}
return {
"ok": True,
"videos": [
{
"url": v.url,
"module_number": v.module_number,
"module_title": v.module_title,
"paragraph_number": v.paragraph_number,
"paragraph_title": v.paragraph_title,
"duration_s": v.duration_s,
}
for v in videos
],
}
# ----------------------------------------------------------- controls
def pause(self) -> dict:
self.controller.pause()
self._emit({"kind": "paused"})
return {"ok": True}
def resume(self) -> dict:
self.controller.resume()
self._emit({"kind": "resumed"})
return {"ok": True}
def stop(self) -> dict:
self.controller.stop()
return {"ok": True}
# ----------------------------------------------------------- download
def downloadSelection(self, selections: list[dict]) -> dict:
"""Start a download for an explicit selection.
`selections` is a list of `{course_code, video_urls?}`. If
`video_urls` is omitted or empty, download every video of that course.
"""
if not self.api:
return {"ok": False, "error": "Non autenticato."}
self.controller.reset()
threading.Thread(
target=self._download_worker, args=(list(selections),), daemon=True
).start()
return {"ok": True}
def _download_worker(self, selections: list[dict]) -> None:
assert self.api is not None
course_map = {c.code: c for c in self.api.list_courses()}
# 1) Resolve each selection to a concrete list of Videos.
targets: list[tuple[str, str, list[Video]]] = []
for sel in selections:
if self.controller.is_stopped:
break
code = sel.get("course_code")
course = course_map.get(code)
if not course:
self._emit({"kind": "course_error", "course_code": code,
"message": f"Corso {code} non trovato"})
continue
log.info("fetching video list for %s (%s)", code, course.name)
try:
videos = self._video_cache.get(code) or self.api.get_course_videos(code)
self._video_cache[code] = videos
except Exception as e:
log.exception("fetch failed for %s", code)
self._emit({"kind": "course_error", "course_code": code,
"message": f"Fetch fallito: {e}"})
continue
wanted = sel.get("video_urls") or []
if wanted:
wanted_set = set(wanted)
videos = [v for v in videos if v.url in wanted_set]
targets.append((code, course.name, videos))
self._emit({"kind": "course_ready", "course_code": code,
"course_name": course.name, "total": len(videos)})
# 2) Pre-flight size estimate (parallel HEADs).
if not self.controller.is_stopped:
log.info("estimating total size…")
all_videos = [v for _, _, vs in targets for v in vs]
total_bytes = estimate_total_size(all_videos)
log.info("estimated total: %d MB", total_bytes // (1024 * 1024))
self._emit({"kind": "size_estimate", "total_bytes": total_bytes,
"video_count": len(all_videos)})
# 3) Download each course.
for code, course_name, videos in targets:
if self.controller.is_stopped:
break
self._emit({"kind": "course_start", "course_code": code,
"course_name": course_name, "total": len(videos)})
def cb(evt: dict, _code=code) -> None:
self._emit({
"kind": "file", "course_code": _code,
"index": evt["index"], "total": evt["total"],
"file": evt["file"], "message": evt["message"],
"status": evt["status"],
"downloaded_bytes": evt.get("downloaded_bytes", 0),
"total_bytes": evt.get("total_bytes", 0),
})
try:
summary = download_course_videos(
course_name, videos, DOWNLOADS,
progress=cb, controller=self.controller,
)
self._emit({"kind": "course_done", "course_code": code, **summary})
except Exception as e:
log.exception("download failed for %s", code)
self._emit({"kind": "course_error", "course_code": code, "message": str(e)})
self._emit({
"kind": "all_done",
"stopped": self.controller.is_stopped,
})
def _emit(self, evt: dict) -> None:
if not self.window:
return
payload = json.dumps(evt)
self.window.evaluate_js(f"window.notifyProgress({json.dumps(payload)})")
def openDownloadsFolder(self) -> dict:
DOWNLOADS.mkdir(parents=True, exist_ok=True)
if sys.platform == "darwin":
subprocess.run(["open", str(DOWNLOADS)], check=False)
elif sys.platform.startswith("linux"):
subprocess.run(["xdg-open", str(DOWNLOADS)], check=False)
elif sys.platform == "win32":
subprocess.run(["explorer", str(DOWNLOADS)], check=False)
return {"ok": True}
# ----------------------------------------------------------- helpers
def _course_to_dict(self, c) -> dict:
return {"code": c.code, "name": c.name, "progress": c.progress}
def main() -> int:
log.info("Automercatorum Video Export")
log.info("auth dir: %s", AUTH_DIR)
log.info("downloads dir: %s", DOWNLOADS)
js_api = JsApi()
window = webview.create_window(
"Automercatorum Video Export",
str(UI_INDEX),
js_api=js_api,
width=960, height=720, min_size=(700, 500),
)
js_api.window = window
webview.start()
return 0
if __name__ == "__main__":
sys.exit(main())