-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmonitor_disk_space.py
More file actions
299 lines (242 loc) · 9.04 KB
/
monitor_disk_space.py
File metadata and controls
299 lines (242 loc) · 9.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
#!/usr/bin/env python3
r"""
monitor_disk_space.py
---------------------
A cross-platform disk space monitoring script designed for automation (cron, systemd,
Windows Task Scheduler). It checks free disk space on a given filesystem path and sends
a Telegram notification if the percentage of free space falls below a user-defined
threshold.
Features:
- Works on Linux, Windows, and macOS.
- Accepts command-line parameters (no interactive input).
- Sends alerts using an external Telegram bot notifier (send_telegram_alert).
- Logs all events to a single file located next to the script.
- Log file automatically truncates when larger than 10 MB (no rotating backups).
- Includes computer name in the alert message via the COMPUTER_NAME environment variable.
- Requires COMPUTER_NAME to be set; exits with instructions to run
ida-scripts/set_computer_name.py if missing.
Usage:
python3 monitor_disk_space.py --threshold 10
python3 monitor_disk_space.py --threshold 15 --path /var
python monitor_disk_space.py -t 12 -p C:\
Example (Linux cron entry):
0 */1 * * * /usr/bin/python3 /home/user/ida-scripts/monitor_disk_space.py -t 10 -q
Example (Windows Task Scheduler):
Program: C:\Path\To\Python.exe
Arguments: C:\path\monitor_disk_space.py --threshold 10
"""
import argparse
import os
import shutil
import logging
from datetime import datetime
import json
import time
# ------------------------------------------------
# Determine log path (same directory as script)
# ------------------------------------------------
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_PATH = os.path.join(SCRIPT_DIR, "disk_monitor.log")
TELEGRAM_CREDS_PATH = os.path.join(SCRIPT_DIR, "telegram_credentials.py")
MAX_LOG_SIZE = 10 * 1024 * 1024 # 10 MB
# ------------------------------------------------
# Alert history management
# ------------------------------------------------
ALERT_HISTORY_PATH = os.path.join(SCRIPT_DIR, "alert_history.json")
ALERT_COOLDOWN_HOURS = 24
def load_alert_history():
"""Load alert history from JSON file or return empty structure."""
if not os.path.exists(ALERT_HISTORY_PATH):
return {}
try:
with open(ALERT_HISTORY_PATH, "r") as f:
return json.load(f)
except Exception:
return {}
def save_alert_history(history):
"""Save alert history to JSON file."""
try:
with open(ALERT_HISTORY_PATH, "w") as f:
json.dump(history, f, indent=2)
except Exception:
pass
def should_send_alert(path):
"""Return True if no alerts have been sent for this path within cooldown."""
history = load_alert_history()
timestamps = history.get(path, [])
cutoff = time.time() - (ALERT_COOLDOWN_HOURS * 3600)
# Keep only timestamps within the last 7 days (pruning old history)
timestamps = [ts for ts in timestamps if ts > (time.time() - 7 * 86400)]
history[path] = timestamps
save_alert_history(history)
# Check if any alert was sent recently
for ts in timestamps:
if ts > cutoff:
return False
return True
def record_alert(path):
"""Record current timestamp for a path."""
history = load_alert_history()
timestamps = history.get(path, [])
timestamps.append(time.time())
history[path] = timestamps
save_alert_history(history)
# ------------------------------------------------
# Logging
# ------------------------------------------------
def enforce_log_size_limit():
"""Truncate log file if it exceeds MAX_LOG_SIZE."""
if os.path.exists(LOG_PATH):
try:
size = os.path.getsize(LOG_PATH)
if size > MAX_LOG_SIZE:
with open(LOG_PATH, "w") as f:
f.write(
f"{datetime.now()} | INFO | Log truncated because it exceeded {MAX_LOG_SIZE} bytes.\n"
)
except OSError:
pass
def setup_logging(quiet=False):
"""Configure logging to a file and optionally to console."""
enforce_log_size_limit()
logger = logging.getLogger("disk_monitor")
logger.setLevel(logging.INFO)
# File handler
file_handler = logging.FileHandler(LOG_PATH, mode="a")
formatter = logging.Formatter(
"%(asctime)s | %(levelname)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Console handler unless quiet
if not quiet:
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
# ------------------------------------------------
# Validation
# ------------------------------------------------
def validate_computer_name(logger, quiet=False):
"""Ensure COMPUTER_NAME is present before continuing."""
computer_name = os.environ.get("COMPUTER_NAME")
if computer_name:
return computer_name
message = (
"COMPUTER_NAME is not set. Run ida-scripts/set_computer_name.py to configure it."
)
logger.error(message)
if not quiet:
print(f"ERROR: {message}")
raise SystemExit(1)
# ------------------------------------------------
# Disk Usage
# ------------------------------------------------
def get_disk_usage_percent(path):
"""Return (free_percent, total, free) for the target path."""
try:
usage = shutil.disk_usage(path)
free_percent = (usage.free / usage.total) * 100
return free_percent, usage.total, usage.free
except Exception:
return None, None, None
# ------------------------------------------------
# Notifications
# ------------------------------------------------
def notify(message, logger, quiet=False):
"""
Send the alert via Telegram when credentials are present, otherwise warn and
print the message to the console.
"""
if os.path.exists(TELEGRAM_CREDS_PATH):
try:
from telegram_notifier import send_telegram_alert
send_telegram_alert(message)
return
except ImportError as exc:
logger.warning(
f"Failed to import telegram notifier even though credentials file exists: {exc}"
)
warning = (
f"Telegram credentials file not found at {TELEGRAM_CREDS_PATH}. "
"Printing alert to console instead."
)
logger.warning(warning)
if not quiet:
print(message)
# ------------------------------------------------
# Main
# ------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Check free disk space and send Telegram alert if below threshold."
)
parser.add_argument(
"--threshold",
"-t",
type=float,
required=True,
help="Alert if free disk space (percent) is below this value.",
)
parser.add_argument(
"--path",
"-p",
default=("/" if os.name != "nt" else "C:\\"),
help="Filesystem path to check. Default: '/' on Linux, 'C:\\\\' on Windows.",
)
parser.add_argument(
"--quiet",
"-q",
action="store_true",
help="Suppress all console output (useful for cron or Task Scheduler).",
)
args = parser.parse_args()
quiet = args.quiet
# Setup logging with quiet mode support
logger = setup_logging(quiet=quiet)
computer_name = validate_computer_name(logger, quiet=quiet)
threshold_percent = args.threshold
target_path = args.path
# Validate threshold
if not (1 <= threshold_percent <= 99):
logger.error("Invalid threshold: must be between 1 and 99.")
if not quiet:
print("ERROR: --threshold must be between 1 and 99.")
return
free_percent, total, free = get_disk_usage_percent(target_path)
if free_percent is None:
logger.error(f"Error reading disk usage for {target_path}")
if not quiet:
print(f"ERROR: Could not read disk usage for {target_path}")
return
if free_percent < threshold_percent:
total_gb = total / (1024**3)
free_gb = free / (1024**3)
message = (
f"⚠️ *Low Disk Space Alert*\n\n"
f"Computer: *{computer_name}*\n"
f"Path: `{target_path}`\n"
f"Free: *{free_gb:.2f} GB* / {total_gb:.2f} GB\n"
f"Free Percentage: *{free_percent:.2f}%*\n"
f"Threshold: {threshold_percent}%"
)
logger.warning(
f"{computer_name}: Low disk space: {free_percent:.2f}% free "
f"(threshold {threshold_percent}%) on {target_path}"
)
# Check cooldown before sending the alert
if should_send_alert(target_path):
notify(message, logger, quiet=quiet)
record_alert(target_path)
else:
logger.info(
f"{computer_name}: Alert suppressed — already sent within last {ALERT_COOLDOWN_HOURS} hours"
)
else:
logger.info(
f"{computer_name}: Disk OK: {free_percent:.2f}% free "
f"(threshold {threshold_percent}%) on {target_path}"
)
if __name__ == "__main__":
main()