198 lines
5.2 KiB
Python
198 lines
5.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Ultra-simple container manager - starts ONLY the requested container
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import time
|
|
import docker
|
|
|
|
CONFIG_PATH = "config.json"
|
|
INACTIVITY_SECONDS = int(os.getenv("INACTIVITY_SECONDS", "3"))
|
|
APP_PORT = 3000
|
|
|
|
client = docker.from_env()
|
|
user_activity = {} # user_id -> last_activity_timestamp
|
|
|
|
|
|
def get_container_ip(container_name):
|
|
"""Get IP of running container"""
|
|
try:
|
|
cnt = client.containers.get(container_name)
|
|
cnt.reload()
|
|
if cnt.status != "running":
|
|
return None
|
|
networks = cnt.attrs["NetworkSettings"]["Networks"]
|
|
for net in networks.values():
|
|
if net.get("IPAddress"):
|
|
return net["IPAddress"]
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
|
|
def start_user_container(user):
|
|
"""Start ONLY this user's container if not running"""
|
|
name = user["container_name"]
|
|
|
|
try:
|
|
# Check if exists
|
|
cnt = client.containers.get(name)
|
|
cnt.reload()
|
|
|
|
if cnt.status == "running":
|
|
print(f"[{user['id']}] Already running")
|
|
return cnt
|
|
|
|
print(f"[{user['id']}] Starting existing container...")
|
|
cnt.start()
|
|
|
|
# Wait for it to be ready
|
|
for _ in range(30):
|
|
time.sleep(0.5)
|
|
if get_container_ip(name):
|
|
print(f"[{user['id']}] Ready!")
|
|
return cnt
|
|
|
|
return cnt
|
|
|
|
except docker.errors.NotFound:
|
|
# Create it
|
|
print(f"[{user['id']}] Creating container...")
|
|
|
|
volumes = {}
|
|
for vol_name, mount in user.get("volumes", {}).items():
|
|
try:
|
|
client.volumes.get(vol_name)
|
|
except:
|
|
client.volumes.create(name=vol_name)
|
|
volumes[vol_name] = {"bind": mount, "mode": "rw"}
|
|
|
|
cnt = client.containers.run(
|
|
"fis",
|
|
name=name,
|
|
detach=True,
|
|
volumes=volumes,
|
|
environment=user.get("environment", {}),
|
|
mem_limit=user.get("mem_limit"),
|
|
memswap_limit=user.get("memswap_limit"),
|
|
privileged=user.get("privileged", False),
|
|
tty=True,
|
|
stdin_open=True,
|
|
restart_policy={"Name": "no"}
|
|
)
|
|
|
|
# Wait for ready
|
|
for _ in range(30):
|
|
time.sleep(0.5)
|
|
if get_container_ip(name):
|
|
print(f"[{user['id']}] Ready!")
|
|
return cnt
|
|
|
|
return cnt
|
|
|
|
|
|
async def proxy_data(reader, writer, backend_ip, backend_port, user_id):
|
|
"""Simple bidirectional proxy"""
|
|
try:
|
|
br, bw = await asyncio.open_connection(backend_ip, backend_port)
|
|
except Exception as e:
|
|
print(f"[{user_id}] Can't connect to backend: {e}")
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
return
|
|
|
|
async def pipe(src, dst):
|
|
try:
|
|
while True:
|
|
data = await src.read(8192)
|
|
if not data:
|
|
break
|
|
dst.write(data)
|
|
await dst.drain()
|
|
user_activity[user_id] = time.time()
|
|
except:
|
|
pass
|
|
|
|
t1 = asyncio.create_task(pipe(reader, bw))
|
|
t2 = asyncio.create_task(pipe(br, writer))
|
|
|
|
await asyncio.wait([t1, t2], return_when=asyncio.FIRST_COMPLETED)
|
|
|
|
for t in [t1, t2]:
|
|
t.cancel()
|
|
|
|
try:
|
|
bw.close()
|
|
writer.close()
|
|
await bw.wait_closed()
|
|
await writer.wait_closed()
|
|
except:
|
|
pass
|
|
|
|
|
|
async def handle_connection(reader, writer, user):
|
|
"""Handle incoming connection for a user"""
|
|
uid = user["id"]
|
|
print(f"[{uid}] New connection")
|
|
|
|
# Start container in background (non-blocking)
|
|
loop = asyncio.get_event_loop()
|
|
cnt = await loop.run_in_executor(None, start_user_container, user)
|
|
|
|
# Get IP
|
|
ip = get_container_ip(user["container_name"])
|
|
if not ip:
|
|
print(f"[{uid}] No IP available")
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
return
|
|
|
|
# Mark activity and proxy
|
|
user_activity[uid] = time.time()
|
|
await proxy_data(reader, writer, ip, APP_PORT, uid)
|
|
|
|
|
|
async def idle_checker():
|
|
"""Stop idle containers"""
|
|
while True:
|
|
await asyncio.sleep(10)
|
|
now = time.time()
|
|
|
|
for uid, last_active in list(user_activity.items()):
|
|
if now - last_active > INACTIVITY_SECONDS:
|
|
print(f"[{uid}] Idle timeout - stopping")
|
|
try:
|
|
cnt = client.containers.get(f"fis-{uid}")
|
|
cnt.stop(timeout=10)
|
|
del user_activity[uid]
|
|
except:
|
|
pass
|
|
|
|
|
|
async def main():
|
|
with open(CONFIG_PATH) as f:
|
|
config = json.load(f)
|
|
|
|
print("Starting FIS manager...")
|
|
print(f"Inactivity timeout: {INACTIVITY_SECONDS}s")
|
|
|
|
servers = []
|
|
for user in config["users"]:
|
|
port = user["port"]
|
|
|
|
async def handler(r, w, u=user):
|
|
await handle_connection(r, w, u)
|
|
|
|
server = await asyncio.start_server(handler, "0.0.0.0", port)
|
|
print(f"✓ Port {port} ready for {user['id']}")
|
|
servers.append(server)
|
|
|
|
asyncio.create_task(idle_checker())
|
|
|
|
await asyncio.gather(*(s.serve_forever() for s in servers))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main()) |