diff --git a/README.md b/README.md index 882dd155e..b1d1f7e52 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ This mod gives SWAG the ability to start containers on-demand when accessed thro - `SWAG_ONDEMAND_STOP_THRESHOLD` - duration of inactivity in seconds before stopping on-demand containers, defaults to `600` (10 minutes). - `SWAG_ONDEMAND_CONTAINER_QUERY_SLEEP` - sleep time in seconds between querying containers, defaults to `5.0`. - `SWAG_ONDEMAND_LOG_READER_SLEEP` - sleep time in seconds between log reads, defaults to `1.0`. + - `SWAG_ONDEMAND_REMOTE1...20` - the remote API of other hosts for ondemand to manage, such as: tcp://otherhost:2375. can add up to 20. ### Loading Page: diff --git a/root/app/ondemand/container_thread.py b/root/app/ondemand/container_thread.py new file mode 100644 index 000000000..7bdd10bf6 --- /dev/null +++ b/root/app/ondemand/container_thread.py @@ -0,0 +1,132 @@ +from data_classes import DockerHost, OnDemandContainer +import helper +from shared_state import last_accessed_urls, last_accessed_urls_lock + +from datetime import datetime +import logging +import os +import threading +import time + +CONTAINER_QUERY_SLEEP = float(os.environ.get("SWAG_ONDEMAND_CONTAINER_QUERY_SLEEP", "5.0")) +STOP_THRESHOLD = int(os.environ.get("SWAG_ONDEMAND_STOP_THRESHOLD", "600")) +REMOTE_HOSTS_PREFIX = "SWAG_ONDEMAND_REMOTE" + + +class ContainerThread(threading.Thread): + def __init__(self): + super().__init__(name="ContainerThread") + self.daemon = True + self.docker_hosts: list[DockerHost] = [] + self.init_docker_hosts() + + def init_docker_hosts(self): + docker_host_url = os.environ.get("DOCKER_HOST", None) + client, url = helper.get_docker_client(docker_host_url, True) + if client: + self.docker_hosts.append(DockerHost(client=client, url=url)) + + remote_hosts_env_vars = { key: value for key, value in os.environ.items() if key.startswith(REMOTE_HOSTS_PREFIX) } + for i in range(1, 21): + if f"{REMOTE_HOSTS_PREFIX}{i}" not in remote_hosts_env_vars: + break + + docker_host_url = remote_hosts_env_vars[f"{REMOTE_HOSTS_PREFIX}{i}"] + client, url = helper.get_docker_client(docker_host_url) + + if client: + self.docker_hosts.append(DockerHost(client=client, url=url)) + + if not self.docker_hosts: + logging.error("Failed to connect to any docker host") + + def process_containers(self): + for docker_host in self.docker_hosts: + if not helper.is_docker_connected(docker_host.client): + if docker_host.is_connected: + logging.warning(f"Lost connection to {docker_host.url}") + docker_host.is_connected = False + continue + + if not docker_host.is_connected: + logging.info(f"Connection to {docker_host.url} has been restored") + docker_host.is_connected = True + + containers = docker_host.client.containers.list(all=True, filters={ "label": ["swag_ondemand=enable"] }) + container_names = {container.name for container in containers} + + for container_name in list(docker_host.ondemand_containers.keys()): + if container_name not in container_names: + docker_host.ondemand_containers.pop(container_name) + logging.info(f"Stopped monitoring {container_name}") + + for container in containers: + default_url = container.labels.get("swag_url", f"{container.name}.").rstrip("*") + container_urls = container.labels.get("swag_ondemand_urls", f"https://{default_url},http://{default_url}") + + if container.name not in docker_host.ondemand_containers: + last_accessed = datetime.now() + logging.info(f"Started monitoring {container.name} for urls: {container_urls}") + else: + existing_container = docker_host.ondemand_containers[container.name] + last_accessed = existing_container.last_accessed + if container_urls != existing_container.urls: + logging.info(f"Updated urls for {container.name} to: {container_urls}") + + docker_host.ondemand_containers[container.name] = OnDemandContainer( + status=container.status, + urls=container_urls, + last_accessed=last_accessed + ) + + def stop_containers(self): + for docker_host in self.docker_hosts: + for container_name, container in docker_host.ondemand_containers.items(): + if container.status != "running": + continue + + inactive_seconds = (datetime.now() - container.last_accessed).total_seconds() + if inactive_seconds < STOP_THRESHOLD: + continue + + if not helper.is_docker_connected(docker_host.client): + logging.warning(f"Failed to stop {container_name}, docker host {docker_host.url} is unavailable") + continue + + docker_host.client.containers.get(container_name).stop() + logging.info(f"Stopped {container_name} after {STOP_THRESHOLD}s of inactivity") + + def start_containers(self): + with last_accessed_urls_lock: + last_accessed_urls_combined = ",".join(last_accessed_urls) + last_accessed_urls.clear() + + for docker_host in self.docker_hosts: + for container_name, container in docker_host.ondemand_containers.items(): + accessed = False + for ondemand_url in container.urls.split(","): + if ondemand_url in last_accessed_urls_combined: + container.last_accessed = datetime.now() + accessed = True + break + + if not accessed or container.status == "running": + continue + + if not helper.is_docker_connected(docker_host.client): + logging.warning(f"Failed to start {container_name}, docker host {docker_host.url} is unavailable") + continue + + docker_host.client.containers.get(container_name).start() + logging.info(f"Started {container_name}") + container.status = "running" + + def run(self): + while True: + try: + self.process_containers() + self.start_containers() + self.stop_containers() + time.sleep(CONTAINER_QUERY_SLEEP) + except Exception as e: + logging.exception(e) diff --git a/root/app/ondemand/data_classes.py b/root/app/ondemand/data_classes.py new file mode 100644 index 000000000..ccc4384b2 --- /dev/null +++ b/root/app/ondemand/data_classes.py @@ -0,0 +1,16 @@ +from dataclasses import dataclass, field +from datetime import datetime +import docker + +@dataclass +class OnDemandContainer: + status: str + urls: str + last_accessed: datetime + +@dataclass +class DockerHost: + client: docker.DockerClient + url: str + is_connected: bool = False + ondemand_containers: dict[str, OnDemandContainer] = field(default_factory=dict) diff --git a/root/app/ondemand/helper.py b/root/app/ondemand/helper.py new file mode 100644 index 000000000..4af9d35c0 --- /dev/null +++ b/root/app/ondemand/helper.py @@ -0,0 +1,24 @@ +import docker +import requests +from typing import Optional + + +def get_docker_client(docker_host_url: str, from_env: bool = False) -> tuple[Optional[docker.DockerClient], str]: + try: + if docker_host_url: + if not docker_host_url.startswith("tcp://"): + docker_host_url = f"tcp://{docker_host_url}:2375" + return docker.DockerClient(base_url=docker_host_url), docker_host_url + elif from_env: + return docker.from_env(), "unix://var/run/docker.sock" + else: + return None, "" + except (docker.errors.DockerException, requests.exceptions.ConnectionError): + return None, "" + +def is_docker_connected(client: docker.DockerClient) -> bool: + try: + return client.ping() + except (docker.errors.DockerException, requests.exceptions.ConnectionError): + return False + \ No newline at end of file diff --git a/root/app/ondemand/log_reader_thread.py b/root/app/ondemand/log_reader_thread.py new file mode 100644 index 000000000..c624ca378 --- /dev/null +++ b/root/app/ondemand/log_reader_thread.py @@ -0,0 +1,51 @@ +from shared_state import last_accessed_urls, last_accessed_urls_lock + +import logging +import os +import threading +import time + +ACCESS_LOG_FILE = "/config/log/nginx/access.log" +LOG_READER_SLEEP = float(os.environ.get("SWAG_ONDEMAND_LOG_READER_SLEEP", "1.0")) + + +class LogReaderThread(threading.Thread): + def __init__(self): + super().__init__(name="LogReaderThread") + self.daemon = True + + def tail(self, f): + f.seek(0,2) + inode = os.fstat(f.fileno()).st_ino + + while True: + line = f.readline() + if not line: + time.sleep(LOG_READER_SLEEP) + if os.stat(ACCESS_LOG_FILE).st_ino != inode: + f.close() + f = open(ACCESS_LOG_FILE, 'r') + inode = os.fstat(f.fileno()).st_ino + continue + yield line + + def run(self): + while True: + try: + if not os.path.exists(ACCESS_LOG_FILE): + time.sleep(1) + continue + + logfile = open(ACCESS_LOG_FILE, "r") + for line in self.tail(logfile): + if '" 302 ' in line: + continue + for part in line.split(): + if not part.startswith("http"): + continue + with last_accessed_urls_lock: + last_accessed_urls.add(part) + break + except Exception as e: + logging.exception(e) + time.sleep(1) diff --git a/root/app/ondemand/main.py b/root/app/ondemand/main.py new file mode 100644 index 000000000..a0278f536 --- /dev/null +++ b/root/app/ondemand/main.py @@ -0,0 +1,24 @@ +from container_thread import ContainerThread +from log_reader_thread import LogReaderThread + +import logging +import os +import time + +LOG_FILE = "/config/log/ondemand/ondemand.log" + + +if __name__ == "__main__": + os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) + logging.basicConfig(filename=LOG_FILE, + filemode='a', + format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.INFO) + logging.info("Starting swag-ondemand...") + + ContainerThread().start() + LogReaderThread().start() + + while True: + time.sleep(1) diff --git a/root/app/ondemand/shared_state.py b/root/app/ondemand/shared_state.py new file mode 100644 index 000000000..d6bc12af2 --- /dev/null +++ b/root/app/ondemand/shared_state.py @@ -0,0 +1,4 @@ +import threading + +last_accessed_urls = set() +last_accessed_urls_lock = threading.Lock() diff --git a/root/app/swag-ondemand.py b/root/app/swag-ondemand.py deleted file mode 100644 index e99b0aa6d..000000000 --- a/root/app/swag-ondemand.py +++ /dev/null @@ -1,151 +0,0 @@ -from datetime import datetime -import docker -import logging -import os -import threading -import time - -ACCESS_LOG_FILE = "/config/log/nginx/access.log" -LOG_FILE = "/config/log/ondemand/ondemand.log" -CONTAINER_QUERY_SLEEP = float(os.environ.get("SWAG_ONDEMAND_CONTAINER_QUERY_SLEEP", "5.0")) -LOG_READER_SLEEP = float(os.environ.get("SWAG_ONDEMAND_LOG_READER_SLEEP", "1.0")) -STOP_THRESHOLD = int(os.environ.get("SWAG_ONDEMAND_STOP_THRESHOLD", "600")) - -last_accessed_urls = set() -last_accessed_urls_lock = threading.Lock() - -class ContainerThread(threading.Thread): - def __init__(self): - super().__init__() - self.daemon = True - self.ondemand_containers = {} - self.init_docker() - - def init_docker(self): - try: - docker_host = os.environ.get("DOCKER_HOST", None) - if docker_host: - if not docker_host.startswith("tcp://"): - docker_host = f"tcp://{docker_host}:2375" - self.docker_client = docker.DockerClient(base_url=docker_host) - else: - self.docker_client = docker.from_env() - except Exception as e: - logging.exception(e) - os._exit(1) - - def process_containers(self): - containers = self.docker_client.containers.list(all=True, filters={ "label": ["swag_ondemand=enable"] }) - container_names = {container.name for container in containers} - - for container_name in list(self.ondemand_containers.keys()): - if container_name in container_names: - continue - self.ondemand_containers.pop(container_name) - logging.info(f"Stopped monitoring {container_name}") - - for container in containers: - default_url = container.labels.get("swag_url", f"{container.name}.").rstrip("*") - container_urls = container.labels.get("swag_ondemand_urls", f"https://{default_url},http://{default_url}") - if container.name not in self.ondemand_containers.keys(): - last_accessed = datetime.now() - logging.info(f"Started monitoring {container.name} for urls: {container_urls}") - else: - last_accessed = self.ondemand_containers[container.name]["last_accessed"] - if container_urls != self.ondemand_containers[container.name]["urls"]: - logging.info(f"Updated urls for {container.name} to: {container_urls}") - self.ondemand_containers[container.name] = { "status": container.status, "urls": container_urls, "last_accessed": last_accessed } - - def stop_containers(self): - for container_name in self.ondemand_containers.keys(): - if self.ondemand_containers[container_name]["status"] != "running": - continue - inactive_seconds = (datetime.now() - self.ondemand_containers[container_name]["last_accessed"]).total_seconds() - if inactive_seconds < STOP_THRESHOLD: - continue - self.docker_client.containers.get(container_name).stop() - logging.info(f"Stopped {container_name} after {STOP_THRESHOLD}s of inactivity") - - def start_containers(self): - with last_accessed_urls_lock: - last_accessed_urls_combined = ",".join(last_accessed_urls) - last_accessed_urls.clear() - - for container_name in self.ondemand_containers.keys(): - accessed = False - for ondemand_url in self.ondemand_containers[container_name]["urls"].split(","): - if ondemand_url not in last_accessed_urls_combined: - continue - self.ondemand_containers[container_name]["last_accessed"] = datetime.now() - accessed = True - if not accessed or self.ondemand_containers[container_name]["status"] == "running": - continue - self.docker_client.containers.get(container_name).start() - logging.info(f"Started {container_name}") - self.ondemand_containers[container_name]["status"] = "running" - - def run(self): - while True: - try: - self.process_containers() - self.start_containers() - self.stop_containers() - time.sleep(CONTAINER_QUERY_SLEEP) - except Exception as e: - logging.exception(e) - -class LogReaderThread(threading.Thread): - def __init__(self): - super().__init__() - self.daemon = True - - def tail(self, f): - f.seek(0,2) - inode = os.fstat(f.fileno()).st_ino - - while True: - line = f.readline() - if not line: - time.sleep(LOG_READER_SLEEP) - if os.stat(ACCESS_LOG_FILE).st_ino != inode: - f.close() - f = open(ACCESS_LOG_FILE, 'r') - inode = os.fstat(f.fileno()).st_ino - continue - yield line - - def run(self): - while True: - try: - if not os.path.exists(ACCESS_LOG_FILE): - time.sleep(1) - continue - - logfile = open(ACCESS_LOG_FILE, "r") - for line in self.tail(logfile): - if '" 302 ' in line: - continue - for part in line.split(): - if not part.startswith("http"): - continue - with last_accessed_urls_lock: - last_accessed_urls.add(part) - break - except Exception as e: - logging.exception(e) - time.sleep(1) - -if __name__ == "__main__": - os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) - logging.basicConfig(filename=LOG_FILE, - filemode='a', - format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', - level=logging.INFO) - logging.info("Starting swag-ondemand...") - - ContainerThread().start() - LogReaderThread().start() - - while True: - time.sleep(1) diff --git a/root/etc/s6-overlay/s6-rc.d/svc-mod-swag-ondemand/run b/root/etc/s6-overlay/s6-rc.d/svc-mod-swag-ondemand/run index abb7df10b..c93e2bf3b 100755 --- a/root/etc/s6-overlay/s6-rc.d/svc-mod-swag-ondemand/run +++ b/root/etc/s6-overlay/s6-rc.d/svc-mod-swag-ondemand/run @@ -1,6 +1,6 @@ #!/usr/bin/with-contenv bash -result=$(exec s6-setuidgid abc python3 /app/swag-ondemand.py) +result=$(exec s6-setuidgid abc python3 /app/ondemand/main.py) if [ $? -ne 0 ]; then echo "***** Error: swag-ondemand failed to run *****" echo "${result}"