Files
vulnerability_bot/nvd_telegram_notifier.py
2026-01-09 13:50:10 +01:00

827 lines
29 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
nvd_telegram_notifier_backfill.py - IMPROVED VERSION
Features:
- Default filter for new subscribers: severity=high plus a built-in keyword list
- Improved welcome message with filter examples
- Backfill supports HOURS: /backfill 1h = last 1 hour, /backfill 3h = last 3 hours
- Backfill limited to 15 CVEs max (prevents spam)
- New subscribers get a one-time 12-hour backfill, then only FUTURE CVEs
- /reset to skip old CVEs
Environment variables:
- TELEGRAM_BOT_TOKEN (required)
- NVD_API_KEY (optional)
- POLL_MINUTES (default 15)
- STATE_PATH (default ./state.json)
- SUBSCRIBERS_PATH (default ./subscribers.json)
- TG_POLL_SECONDS (default 15)
- LOG_STDOUT (0 or 1)
"""
import os
import json
import time
import requests
from datetime import datetime, timedelta, timezone
from dateutil import parser as dateparser
from html import escape
# ---------------- config ----------------
# Credentials: the bot token is needed for any Telegram call, the NVD key is optional.
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
NVD_API_KEY = os.getenv("NVD_API_KEY")

# Polling cadence and on-disk locations of the persisted JSON files.
POLL_MINUTES = int(os.getenv("POLL_MINUTES", "15"))
STATE_PATH = os.getenv("STATE_PATH", "./state.json")
SUBSCRIBERS_PATH = os.getenv("SUBSCRIBERS_PATH", "./subscribers.json")
TG_POLL_SECONDS = int(os.getenv("TG_POLL_SECONDS", "15"))
LOG_STDOUT = os.getenv("LOG_STDOUT", "0") == "1"

# Quiet hours configuration (24-hour format); is_quiet_hours() compares
# these against the current UTC hour.
QUIET_HOURS_START = int(os.getenv("QUIET_HOURS_START", "22"))  # 10 PM
QUIET_HOURS_END = int(os.getenv("QUIET_HOURS_END", "8"))  # 8 AM
QUIET_HOURS_ENABLED = os.getenv("QUIET_HOURS_ENABLED", "1") == "1"

# NVD endpoint, page size requested from it, and the message length cap
# applied by build_cve_message().
NVD_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0/"
PER_PAGE = 2000
MAX_MSG_LEN = 3800

# Base URL for Telegram Bot API calls; None when no token is configured.
if TELEGRAM_BOT_TOKEN:
    TELEGRAM_API = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}"
else:
    TELEGRAM_API = None
# ---------------- utilities ----------------
def now_iso():
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SS.000Z``."""
    current = datetime.now(timezone.utc)
    # The millisecond part is always the literal ".000".
    return current.strftime("%Y-%m-%dT%H:%M:%S") + ".000Z"
def is_quiet_hours():
    """Tell whether the current UTC hour lies inside the configured quiet window.

    Always False when QUIET_HOURS_ENABLED is off. The window is half-open:
    the start hour is included and the end hour is excluded.
    """
    if not QUIET_HOURS_ENABLED:
        return False
    hour = datetime.now(timezone.utc).hour
    wraps_midnight = QUIET_HOURS_START > QUIET_HOURS_END
    if wraps_midnight:
        # e.g. 22:00 to 08:00 - quiet late in the evening or early in the morning
        return hour >= QUIET_HOURS_START or hour < QUIET_HOURS_END
    # e.g. 01:00 to 06:00 - window contained in a single day
    return QUIET_HOURS_START <= hour < QUIET_HOURS_END
def safe_write_json(path, data):
    """Serialize *data* as indented JSON into *path*, creating parent directories.

    The JSON is first written to a sibling ``<path>.tmp`` file and then moved
    over *path* with ``os.replace``, so a failed or interrupted dump cannot
    leave a truncated file where valid state used to be.

    Best-effort: any failure is printed and swallowed, nothing is raised.
    """
    tmp_path = f"{path}.tmp"
    try:
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(tmp_path, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)
    except Exception as e:
        print("Failed to write json:", path, e)
        # Do not leave a half-written temp file behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
def safe_load_json(path, default):
    """Load and return the JSON document stored at *path*.

    When the file is missing, or cannot be read or parsed, *default* is
    written to *path* (via safe_write_json) and returned instead. A read or
    parse failure is printed before the file is reset, so the loss of the
    previous contents is visible in the logs.
    """
    try:
        if os.path.exists(path):
            with open(path, "r") as f:
                return json.load(f)
    except Exception as e:
        # Still best-effort, but no longer silent: the file is about to be replaced.
        print("Failed to load json, resetting to default:", path, e)
    safe_write_json(path, default)
    return default
# ---------------- state ----------------
def load_state():
    """Return the poller state from STATE_PATH.

    If no usable state file exists, ``last_pub`` is seeded with a timestamp
    one hour in the past.
    """
    one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
    fallback = {"last_pub": one_hour_ago.strftime("%Y-%m-%dT%H:%M:%S.000Z")}
    return safe_load_json(STATE_PATH, fallback)
def save_state(state):
    """Persist the poller *state* dict to STATE_PATH."""
    safe_write_json(path=STATE_PATH, data=state)
# ---------------- subscribers ----------------
def load_subscribers():
    """Return the subscriber store from SUBSCRIBERS_PATH.

    Falls back to an empty store (``offset`` 0, no ``users``) when the file
    is missing or unreadable.
    """
    empty_store = {"offset": 0, "users": {}}
    return safe_load_json(SUBSCRIBERS_PATH, empty_store)
def save_subscribers(data):
    """Persist the subscriber store *data* to SUBSCRIBERS_PATH."""
    safe_write_json(path=SUBSCRIBERS_PATH, data=data)
def parse_start_payload(payload_text):
    """Turn the text following ``/start`` into a preferences dict.

    The payload is a comma-separated list. A ``severity=<level>`` token
    (low/medium/high/critical) sets the severity filter; a ``severity=``
    token with any other value is ignored. Every other token becomes a
    lower-cased keyword.

    Returns a dict that always has ``severity`` (``"high"`` when none was
    given) and has ``keywords`` only when at least one keyword is present.
    An empty payload yields severity ``"high"`` plus a built-in keyword list.
    """
    if not payload_text:
        # No preferences supplied: high severity and a broad infrastructure keyword set.
        return {
            "severity": "high",
            "keywords": [
                "linux", "kernel", "ubuntu", "debian", "rhel",
                "postgresql", "postgres", "mysql", "mariadb", "redis", "mongodb", "elasticsearch",
                "nginx", "apache", "haproxy",
                "tomcat", "java", "jdk", "openjdk", "spring", "hibernate", "log4j", "struts",
                "docker", "kubernetes", "k8s", "helm",
                "aws", "ec2", "rds", "eks", "lambda", "s3", "iam",
                "angular", "node", "nodejs",
                "ibm", "iseries", "db2", "websphere",
                "openssl", "ssh", "openssh", "sudo",
                "bind", "dns", "openvpn", "vpn", "iptables", "firewalld",
                "jenkins", "gitlab", "terraform", "ansible",
                "prometheus", "grafana", "vault", "consul",
            ],
        }

    chosen_severity = None
    wanted = []
    for raw in payload_text.split(","):
        token = raw.strip()
        if not token:
            continue
        lowered = token.lower()
        if not lowered.startswith("severity="):
            wanted.append(lowered)
            continue
        level = token.split("=", 1)[1].strip().lower()
        if level in ("low", "medium", "high", "critical"):
            # The last valid severity token wins.
            chosen_severity = level

    # If no valid severity was specified, default to high.
    prefs = {"severity": chosen_severity if chosen_severity is not None else "high"}
    if wanted:
        prefs["keywords"] = wanted
    return prefs
def extract_user_info_from_msg(msg):
    """Build a fresh subscriber record from a Telegram message dict.

    The chat id comes from ``msg["chat"]`` and the profile fields from
    ``msg["from"]``; missing fields become None. ``subscribed_at`` and
    ``last_seen`` are both set to the current UTC time, while
    ``start_payload``, ``preferences`` and ``last_sent`` start out empty for
    the caller to fill in.
    """
    sender = msg.get("from") or {}
    chat = msg.get("chat") or {}
    stamp = datetime.now(timezone.utc).isoformat()

    record = {"chat_id": chat.get("id")}
    for field in ("is_bot", "first_name", "last_name", "username", "language_code"):
        record[field] = sender.get(field)
    record["subscribed_at"] = stamp
    record["last_seen"] = stamp
    record["start_payload"] = None
    record["preferences"] = {}
    record["last_sent"] = None
    return record
def build_welcome_message(prefs):
    """Build the HTML welcome message showing the subscriber's active filters.

    *prefs* is a preferences dict with optional ``severity`` and ``keywords``
    entries. Keywords originate from user-typed ``/start`` payloads, so they
    (and the severity) are HTML-escaped before being placed inside ``<code>``
    tags; unescaped ``<`` or ``&`` would otherwise make the message invalid
    HTML for ``parse_mode=HTML``.

    Returns the message text.
    """
    # Fallback matches the default that parse_start_payload() applies.
    severity = escape(str(prefs.get("severity", "high")).upper())
    keywords = prefs.get("keywords") or []

    msg = "✅ <b>Subscribed to CVE notifications!</b>\n\n"
    msg += "📬 You will receive NEW CVEs as they're published.\n\n"
    msg += "🔧 <b>Your Current Filters:</b>\n"
    msg += f"• Severity: <code>{severity}</code> and above\n"
    if keywords:
        joined = escape(", ".join(str(k) for k in keywords))
        msg += f"• Keywords: <code>{joined}</code>\n"
    else:
        msg += "• Keywords: <i>none (all CVEs)</i>\n"
    msg += "\n━━━━━━━━━━━━━━━━━━━━━\n\n"
    msg += "📋 <b>Available Commands:</b>\n\n"
    msg += "<b>Backfill:</b>\n"
    msg += "• <code>/backfill 1h</code> - Last 1 hour\n"
    msg += "• <code>/backfill 6h</code> - Last 6 hours\n"
    msg += "• <code>/backfill 1d</code> - Last 1 day (max 15)\n\n"
    msg += "<b>Management:</b>\n"
    msg += "• <code>/status</code> - Check your settings\n"
    msg += "• <code>/reset</code> - Reset timeline\n"
    msg += "• <code>/stop</code> - Unsubscribe\n\n"
    msg += "━━━━━━━━━━━━━━━━━━━━━\n\n"
    msg += "🎯 <b>Filter Examples:</b>\n\n"
    msg += "<b>By severity only:</b>\n"
    msg += "• <code>/start severity=high</code>\n"
    msg += "• <code>/start severity=critical</code>\n\n"
    msg += "<b>By keywords only:</b>\n"
    msg += "• <code>/start linux,nginx</code>\n"
    msg += "• <code>/start apache,tomcat</code>\n\n"
    msg += "<b>Combined filters:</b>\n"
    msg += "• <code>/start severity=high,postgresql</code>\n"
    msg += "• <code>/start severity=critical,linux,kernel</code>\n\n"
    # The parser's real default is "high", not "medium".
    msg += "💡 <i>Tip: Without filters, default is severity=high</i>"
    return msg
def add_or_update_user_from_msg(msg, payload=None):
    """Handle ``/start``: subscribe a new chat or update an existing one.

    New chat: the record is stored with ``last_sent`` set to now, a welcome
    message is sent, and a one-time backfill of the last 12 hours (at most
    15 CVEs) is delivered.

    Known chat: ``last_seen`` is refreshed; the stored preferences are
    replaced only when *payload* is non-empty. The reply shows the
    preferences that are actually stored afterwards, so a bare ``/start``
    no longer claims an update or displays defaults that were never saved.

    Messages without a chat id are ignored.
    """
    info = extract_user_info_from_msg(msg)
    chat_id = info["chat_id"]
    if chat_id is None:
        # Nobody to store or reply to; avoid creating a "None" subscriber.
        return

    prefs = parse_start_payload(payload)
    info["start_payload"] = payload
    info["preferences"] = prefs
    subs = load_subscribers()
    users = subs.get("users", {})
    cid = str(chat_id)

    if cid not in users:
        # New subscribers get last_sent = NOW
        info["last_sent"] = now_iso()
        users[cid] = info
        subs["users"] = users
        save_subscribers(subs)
        print(f"Added new subscriber: {cid} with last_sent={info['last_sent']}, filters={prefs}")
        # Send welcome message
        send_telegram_to_chat(chat_id, build_welcome_message(prefs))
        # Auto-backfill last 12 hours for new subscribers
        send_telegram_to_chat(
            chat_id,
            "🔍 <b>Loading recent CVEs...</b>\n\n"
            "Fetching CVEs from the last 12 hours to get you started.\n"
            "(This is a one-time backfill)"
        )
        since = (datetime.now(timezone.utc) - timedelta(hours=12)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
        sent = backfill_to_user(chat_id, since_iso=since, max_items=15)
        send_telegram_to_chat(
            chat_id,
            f"✅ Backfill complete: {sent} CVE(s) sent.\n\nYou'll now receive new CVEs as they're published!"
        )
        return

    existing = users[cid]
    existing["last_seen"] = datetime.now(timezone.utc).isoformat()
    if payload:
        existing["start_payload"] = payload
        existing["preferences"] = prefs
    subs["users"] = users
    save_subscribers(subs)

    # Report what is stored, not the freshly parsed defaults.
    active = existing.get("preferences") or {}
    print("Updated subscriber:", cid, "filters=", active)

    keywords_str = ", ".join(str(k) for k in active.get("keywords") or [])
    if len(keywords_str) > 100:
        # Truncate if too long; the ellipsis is added only when text was cut.
        keywords_str = keywords_str[:100] + "..."
    # Escape after truncation so an entity is never cut in half.
    keywords_html = escape(keywords_str) or "none"
    severity_html = escape(str(active.get("severity", "high")).upper())
    headline = "Your filters have been updated:" if payload else "Your filters are unchanged:"
    send_telegram_to_chat(
        chat_id,
        f"✅ <b>Welcome back!</b>\n\n"
        f"{headline}\n"
        f"• Severity: <code>{severity_html}</code>\n"
        f"• Keywords: <code>{keywords_html}</code>"
    )
def remove_user_by_chat(chat_id):
    """Unsubscribe *chat_id* and tell the chat what happened.

    Returns True when a subscriber record was deleted, False when the chat
    was not subscribed. A Telegram message is sent in both cases.
    """
    store = load_subscribers()
    users = store.get("users", {})
    key = str(chat_id)

    if key not in users:
        send_telegram_to_chat(chat_id, " You were not subscribed.")
        return False

    del users[key]
    store["users"] = users
    save_subscribers(store)
    print("Removed subscriber:", key)
    send_telegram_to_chat(chat_id, "✅ Unsubscribed from CVE notifications.")
    return True
def reset_user_timeline(chat_id):
    """Handle ``/reset``: move the subscriber's ``last_sent`` marker to now.

    Chats that are not subscribed only receive a hint to use /start.
    Otherwise the updated record is saved and the new cut-off is echoed back.
    """
    store = load_subscribers()
    users = store.get("users", {})
    key = str(chat_id)

    if key not in users:
        send_telegram_to_chat(chat_id, "❌ Not subscribed. Use /start first.")
        return

    record = users[key]
    record["last_sent"] = now_iso()
    record["last_seen"] = datetime.now(timezone.utc).isoformat()
    store["users"] = users
    save_subscribers(store)

    confirmation = (
        f"✅ <b>Timeline reset!</b>\n\n"
        f"You'll only receive CVEs published after:\n"
        f"<code>{record['last_sent']}</code>"
    )
    send_telegram_to_chat(chat_id, confirmation)
def get_user_status(chat_id):
    """Handle ``/status``: send the subscriber a summary of their subscription.

    Shows the subscription time, the ``last_sent`` cut-off and the active
    filters. Chats that are not subscribed get a hint to use /start.

    Stored values are rendered defensively: a timestamp stored as None shows
    as "Unknown" instead of raising on slicing, and keywords (user-typed
    text) are HTML-escaped before being placed in the message.
    """
    subs = load_subscribers()
    users = subs.get("users", {})
    cid = str(chat_id)
    if cid not in users:
        send_telegram_to_chat(chat_id, "❌ Not subscribed. Use /start to subscribe.")
        return

    user = users[cid]
    # "or" covers both a missing key and an explicit None; [:19] drops sub-second/offset parts.
    subscribed_at = escape(str(user.get("subscribed_at") or "Unknown")[:19])
    tracking_from = escape(str(user.get("last_sent") or "Unknown")[:19])

    prefs = user.get("preferences") or {}
    # Fallback matches the default that parse_start_payload() applies.
    severity = escape(str(prefs.get("severity", "high")).upper())
    keywords = prefs.get("keywords") or []

    status_msg = "📊 <b>Subscription Status</b>\n\n"
    status_msg += f"✅ Subscribed: <code>{subscribed_at}</code>\n"
    status_msg += f"📅 Tracking CVEs after: <code>{tracking_from}</code>\n\n"
    status_msg += "🔧 <b>Active Filters:</b>\n"
    status_msg += f"• Severity: <code>{severity}</code> and above\n"
    if keywords:
        joined = escape(", ".join(str(k) for k in keywords))
        status_msg += f"• Keywords: <code>{joined}</code>\n"
    else:
        status_msg += "• Keywords: <i>none (all CVEs)</i>\n"
    status_msg += "\n💡 <i>Use /start with new filters to update</i>"
    send_telegram_to_chat(chat_id, status_msg)
def show_help(chat_id):
    """Handle ``/help``: send the command overview and the chat's current filters.

    Chats that are not subscribed get a hint to use /start first. Keywords
    are user-typed text and are HTML-escaped before being embedded, since
    the message is sent with HTML parse mode.
    """
    subs = load_subscribers()
    users = subs.get("users", {})
    cid = str(chat_id)
    if cid not in users:
        send_telegram_to_chat(chat_id, "❌ Not subscribed. Use /start to subscribe first.")
        return

    prefs = users[cid].get("preferences") or {}
    # Fallback matches the default that parse_start_payload() applies.
    severity = escape(str(prefs.get("severity", "high")).upper())
    keywords = prefs.get("keywords") or []

    help_msg = "📬 <b>You will receive NEW CVEs as they're published.</b>\n\n"
    help_msg += "🔧 <b>Your Current Filters:</b>\n"
    help_msg += f"• Severity: <code>{severity}</code> and above\n"
    if keywords:
        joined = escape(", ".join(str(k) for k in keywords))
        help_msg += f"• Keywords: <code>{joined}</code>\n"
    else:
        help_msg += "• Keywords: <i>none (all CVEs)</i>\n"
    help_msg += "\n━━━━━━━━━━━━━━━━━━━━━\n\n"
    help_msg += "📋 <b>Available Commands:</b>\n\n"
    help_msg += "<b>Backfill:</b>\n"
    help_msg += "• <code>/backfill 1h</code> - Last 1 hour\n"
    help_msg += "• <code>/backfill 6h</code> - Last 6 hours\n"
    help_msg += "• <code>/backfill 1d</code> - Last 1 day (max 15)\n\n"
    help_msg += "<b>Management:</b>\n"
    help_msg += "• <code>/status</code> - Check your settings\n"
    help_msg += "• <code>/reset</code> - Reset timeline\n"
    help_msg += "• <code>/help</code> - Show this message\n"
    help_msg += "• <code>/stop</code> - Unsubscribe\n\n"
    help_msg += "━━━━━━━━━━━━━━━━━━━━━\n\n"
    help_msg += "🎯 <b>Filter Examples:</b>\n\n"
    help_msg += "<b>By severity only:</b>\n"
    help_msg += "• <code>/start severity=high</code>\n"
    help_msg += "• <code>/start severity=critical</code>\n\n"
    help_msg += "<b>By keywords only:</b>\n"
    help_msg += "• <code>/start linux,nginx</code>\n"
    help_msg += "• <code>/start apache,tomcat</code>\n\n"
    help_msg += "<b>Combined filters:</b>\n"
    help_msg += "• <code>/start severity=high,postgresql</code>\n"
    help_msg += "• <code>/start severity=critical,linux,kernel</code>\n\n"
    # The parser's real default is "high", not "medium".
    help_msg += "💡 <i>Tip: Without filters, default is severity=high</i>"
    send_telegram_to_chat(chat_id, help_msg)
def parse_backfill_timespec(timespec):
    """Parse a backfill time spec such as ``'1h'``, ``'6h'`` or ``'1d'`` into hours.

    A bare number is taken as a number of hours. Surrounding whitespace and
    letter case are ignored. Anything unparseable falls back to 1 hour
    instead of raising; this includes characters such as ``'²'`` that pass
    ``str.isdigit()`` but are rejected by ``int()``.
    """
    import re

    # \d matches only decimal digits, all of which int() accepts.
    match = re.fullmatch(r"(\d+)([hd]?)", timespec.strip().lower())
    if not match:
        return 1  # default to 1 hour
    try:
        amount = int(match.group(1))
    except ValueError:
        return 1
    if match.group(2) == "d":
        return amount * 24
    # Suffix "h" or no suffix at all: the number already is in hours.
    return amount
# ---------------- Telegram polling ----------------
def _handle_telegram_message(msg):
    """Dispatch one incoming Telegram message to the matching command handler."""
    text = (msg.get("text") or "").strip()
    chat_id = (msg.get("chat") or {}).get("id")

    if text.startswith("/start"):
        parts = text.split(" ", 1)
        payload = parts[1] if len(parts) > 1 else None
        add_or_update_user_from_msg(msg, payload)
    elif text.startswith("/stop"):
        if chat_id:
            remove_user_by_chat(chat_id)
    elif text.startswith("/reset"):
        if chat_id:
            reset_user_timeline(chat_id)
    elif text.startswith("/status"):
        if chat_id:
            get_user_status(chat_id)
    elif text.startswith("/help"):
        if chat_id:
            show_help(chat_id)
    elif text.startswith("/backfill"):
        parts = text.split(" ", 1)
        timespec = parts[1].strip() if len(parts) > 1 else "1h"  # default to 1 hour
        hours = min(parse_backfill_timespec(timespec), 72)  # cap at 3 days
        if chat_id:
            send_telegram_to_chat(
                chat_id,
                f"🔍 Searching for CVEs from last {hours} hour(s)...\n"
                f"(Max 15 CVEs, won't affect your regular notifications)"
            )
            since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
            sent = backfill_to_user(chat_id, since_iso=since, max_items=15)
            send_telegram_to_chat(chat_id, f"✅ Backfill complete: {sent} CVE(s) sent.")
    else:
        # Any other message only refreshes last_seen for known subscribers.
        cid = str(chat_id)
        store = load_subscribers()
        users = store.get("users", {})
        if cid in users:
            users[cid]["last_seen"] = datetime.now(timezone.utc).isoformat()
            store["users"] = users
            save_subscribers(store)


def poll_telegram_updates_once():
    """Fetch pending Telegram updates once and process the bot commands in them.

    Reads the stored ``offset``, calls ``getUpdates`` without long polling,
    dispatches each message (/start, /stop, /reset, /status, /help,
    /backfill) and then stores the next offset.

    Each update is handled in isolation: if one handler raises, the error is
    printed and processing continues, so the offset is still advanced.
    Without that, a single failing message would leave the offset unchanged
    and the whole batch would be re-fetched and re-executed on every poll.

    Does nothing when no bot token is configured. Never raises.
    """
    if not TELEGRAM_API:
        return
    subs = load_subscribers()
    offset = subs.get("offset", 0) or 0
    url = f"{TELEGRAM_API}/getUpdates"
    params = {"timeout": 0, "offset": offset, "limit": 100}
    try:
        r = requests.get(url, params=params, timeout=20)
        r.raise_for_status()
        res = r.json()
        if not res.get("ok"):
            print("getUpdates not ok:", res)
            return
        next_offset = offset
        for update in res.get("result", []) or []:
            uid = update.get("update_id")
            if uid is not None and uid >= next_offset:
                next_offset = uid + 1
            msg = update.get("message") or update.get("edited_message") or update.get("channel_post")
            if not msg:
                continue
            try:
                _handle_telegram_message(msg)
            except Exception as e:
                # Deliberately skip past the failing update instead of retrying it forever.
                print(f"Failed to handle Telegram update {uid}:", e)
        # Reload before writing: the handlers above may have changed the store.
        latest = load_subscribers()
        latest["offset"] = next_offset
        save_subscribers(latest)
    except Exception as e:
        print("Failed to poll Telegram updates:", e)
# ---------------- Telegram send ----------------
def _forget_unreachable_chat(chat_identifier):
    """Delete a subscriber record without sending anything to the chat."""
    store = load_subscribers()
    users = store.get("users", {})
    if users.pop(str(chat_identifier), None) is not None:
        store["users"] = users
        save_subscribers(store)
        print("Removed subscriber:", chat_identifier)


def send_telegram_to_chat(chat_identifier, text):
    """Send *text* (HTML parse mode, link previews off) to a Telegram chat.

    Returns True on success and False on any failure; never raises.

    When Telegram answers 400/403 with a description containing "blocked"
    or "not found", the chat is dropped from the subscriber store. The
    removal is done silently: remove_user_by_chat() would send a
    confirmation through this very function, which fails the same way and
    recurses until the interpreter's recursion limit is hit.
    """
    if not TELEGRAM_API:
        print("No TELEGRAM_BOT_TOKEN configured")
        return False
    payload = {"chat_id": chat_identifier, "text": text, "disable_web_page_preview": True, "parse_mode": "HTML"}
    try:
        r = requests.post(f"{TELEGRAM_API}/sendMessage", data=payload, timeout=20)
        if r.status_code != 200:
            try:
                desc = (r.json().get("description", "") or "").lower()
                if r.status_code in (403, 400) and ("blocked" in desc or "not found" in desc):
                    _forget_unreachable_chat(chat_identifier)
            except Exception:
                # Cleanup is best-effort; the send failure itself is reported below.
                pass
        r.raise_for_status()
        return True
    except Exception as e:
        print("Failed to send telegram:", e)
        return False
# ---------------- NVD helpers ----------------
def nvd_request(params):
    """GET the NVD CVE endpoint with *params* and return the decoded JSON body.

    The ``apiKey`` header is added only when NVD_API_KEY is set. HTTP error
    statuses raise through ``raise_for_status``.
    """
    request_headers = {"Accept": "application/json"}
    if NVD_API_KEY:
        request_headers.update(apiKey=NVD_API_KEY)
    response = requests.get(NVD_BASE, params=params, headers=request_headers, timeout=60)
    response.raise_for_status()
    return response.json()
def extract_cve_info(v):
    """Flatten one NVD vulnerability entry into the dict used by this bot.

    Returns a dict with:
      id          - CVE identifier, or None when none is found
      published   - publication timestamp string as delivered
      description - description text, English preferred ("" if none)
      cvss        - base scores found under ``metrics`` as floats
      cvss_str    - the same scores formatted with one decimal
      references  - reference URLs

    When the entry carries several descriptions, the first whose ``lang``
    starts with "en" is used; if none is marked English, the first non-empty
    description is used, whatever its language.
    """
    cve = v.get("cve") or {}
    cve_id = cve.get("id") or cve.get("CVE_data_meta", {}).get("ID") or v.get("cveId")
    published = cve.get("published") or v.get("published") or v.get("publishedDate")

    candidates = [d for d in (cve.get("descriptions") or []) if d.get("value")]
    english = [d for d in candidates if str(d.get("lang") or "").lower().startswith("en")]
    chosen = english or candidates
    desc = chosen[0]["value"] if chosen else ""
    if not desc:
        # Fallback to a flat "description" field; presumably an older payload shape.
        desc = cve.get("description") or ""

    refs = []
    for r in (cve.get("references") or []):
        url = r.get("url") or r.get("reference")
        if url:
            refs.append(url)

    cvss_scores = []
    metrics = cve.get("metrics") or {}
    for key in metrics:
        for item in metrics.get(key) or []:
            cvss = item.get("cvssData") or item.get("cvssMetric")
            if not cvss:
                continue
            score = cvss.get("baseScore") or cvss.get("score")
            if score is None:
                continue
            try:
                cvss_scores.append(float(score))
            except (TypeError, ValueError, OverflowError):
                # Ignore a score that is not numeric rather than dropping the CVE.
                pass

    cvss_strs = ["{:.1f}".format(s) for s in cvss_scores]
    return {"id": cve_id, "published": published, "description": desc, "cvss": cvss_scores, "cvss_str": cvss_strs, "references": refs}
def query_new_cves(last_pub, now_iso):
    """Fetch every CVE published between *last_pub* and *now_iso* from NVD.

    Pages through the result set PER_PAGE entries at a time, pausing half a
    second between pages, and keeps only entries that carry an id. The
    result is sorted by publication time when the timestamps can be parsed
    and compared; if sorting fails the fetched order is kept.

    Errors from nvd_request propagate to the caller.
    """
    params = {"pubStartDate": last_pub, "pubEndDate": now_iso, "resultsPerPage": PER_PAGE, "startIndex": 0}
    collected = []
    while True:
        page = nvd_request(params)
        total = page.get("totalResults") or 0
        for entry in page.get("vulnerabilities") or []:
            info = extract_cve_info(entry)
            if info.get("id"):
                collected.append(info)
        next_start = params.get("startIndex", 0) + params.get("resultsPerPage", PER_PAGE)
        if not total or next_start >= int(total):
            break
        params["startIndex"] = next_start
        time.sleep(0.5)

    def published_at(cve):
        # Entries without a timestamp sort as if published at the end of the window.
        return dateparser.parse(cve.get("published") or now_iso)

    try:
        collected.sort(key=published_at)
    except Exception:
        pass
    return collected
# ---------------- matching & formatting ----------------
def cvss_to_severity(cvss_score):
    """Map a CVSS base score to ``"critical"``, ``"high"``, ``"medium"`` or ``"low"``.

    Returns None when the score is None or cannot be converted to a float.
    Thresholds: >= 9.0 critical, >= 7.0 high, >= 4.0 medium, otherwise low.
    """
    if cvss_score is None:
        return None
    try:
        value = float(cvss_score)
    except Exception:
        return None
    # Ordered from highest floor to lowest; the first floor reached wins.
    for floor, label in ((9.0, "critical"), (7.0, "high"), (4.0, "medium")):
        if value >= floor:
            return label
    return "low"
def matches_user_preferences(cve, user):
    """Decide whether *cve* passes the filters stored in ``user["preferences"]``.

    Severity filter (when set): at least one of the CVE's CVSS scores must
    map to the preferred level or higher; a CVE without scores never passes.
    Keyword filter (when set): at least one keyword must occur,
    case-insensitively, in the description or the CVE id.
    A user without preferences matches everything.
    """
    prefs = user.get("preferences", {}) or {}

    wanted_level = prefs.get("severity")
    if wanted_level:
        rank = {"low": 1, "medium": 2, "high": 3, "critical": 4}
        threshold = rank.get(wanted_level, 0)
        labels = (cvss_to_severity(score) for score in cve.get("cvss") or [])
        if not any(label and rank.get(label, 0) >= threshold for label in labels):
            return False

    keywords = prefs.get("keywords") or []
    if not keywords:
        return True
    haystack = (cve.get("description", "") + " " + (cve.get("id") or "")).lower()
    for keyword in keywords:
        if keyword.lower() in haystack:
            return True
    return False
def extract_subject(description):
    """Return the text before the first ``.`` of *description*, with a trailing period.

    An empty or missing description yields ``"New vulnerability"``.
    """
    if not description:
        return "New vulnerability"
    first_sentence, _, _ = description.partition(".")
    return f"{first_sentence.strip()}."
def build_cve_message(cve):
    """Render a CVE info dict as an HTML Telegram message.

    The message lists id, publication time, a severity derived from the
    highest CVSS score, the first sentence as summary, the scores, the
    description (first 1200 characters, escaped) and the first reference
    link. If the result exceeds MAX_MSG_LEN it is cut back to the last
    complete line that fits.
    """
    description = cve.get("description", "") or ""
    subject = escape(extract_subject(description))
    body = escape(description[:1200])

    # Determine severity from the highest CVSS score, if any.
    badges = {
        "critical": ("CRITICAL", "🔴"),
        "high": ("HIGH", "🟠"),
        "medium": ("MEDIUM", "🟡"),
        "low": ("LOW", "🟢"),
    }
    label, emoji = "UNKNOWN", ""
    scores = cve.get("cvss") or []
    if scores:
        label, emoji = badges.get(cvss_to_severity(max(scores)), (label, emoji))

    lines = [
        f"🚨 <b>{escape(cve.get('id') or 'UNKNOWN')}</b>",
        f"🗓 <b>Published:</b> {escape(cve.get('published') or 'N/A')}",
        f"{emoji} <b>Severity:</b> {label}",
        f"📌 <b>Summary:</b> {subject}",
    ]
    score_strings = cve.get("cvss_str")
    if score_strings:
        lines.append("🔥 <b>CVSS Score:</b> " + ", ".join(map(escape, score_strings)))
    lines.extend(["", body])
    references = cve.get("references")
    if references:
        lines.append(f'<a href="{escape(references[0], quote=True)}">🔗 Reference</a>')

    msg = "\n".join(lines)
    if len(msg) > MAX_MSG_LEN:
        # Drop the partially cut last line so no tag or entity is left broken.
        msg = msg[:MAX_MSG_LEN].rsplit("\n", 1)[0] + "\n"
    return msg
# ---------------- backfill ----------------
def backfill_to_user(chat_id, since_iso, max_items=15):
    """Send *chat_id* the CVEs published since *since_iso* that match its filters.

    Stops after *max_items* successful sends. The subscriber's ``last_sent``
    marker is not touched. A chat without a subscriber record is treated as
    having no preferences. Errors while querying or sending are printed and
    end the backfill.

    Returns the number of messages sent successfully.
    """
    window_end = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    query = {"pubStartDate": since_iso, "pubEndDate": window_end, "resultsPerPage": 2000, "startIndex": 0}
    delivered = 0
    subscriber = load_subscribers().get("users", {}).get(str(chat_id), {})
    try:
        while True:
            page = nvd_request(query)
            for entry in page.get("vulnerabilities") or []:
                cve = extract_cve_info(entry)
                if not matches_user_preferences(cve, subscriber):
                    continue
                if send_telegram_to_chat(chat_id, build_cve_message(cve)):
                    delivered += 1
                    if delivered >= max_items:
                        return delivered
                # Short pause between send attempts.
                time.sleep(0.15)
            total = page.get("totalResults") or 0
            offset = query.get("startIndex", 0)
            page_size = query.get("resultsPerPage", 2000)
            if not total or offset + page_size >= int(total):
                break
            query["startIndex"] = offset + page_size
    except Exception as e:
        print("Backfill error:", e)
    return delivered
# ---------------- main loop ----------------
def _parse_utc_timestamp(value):
    """Parse a timestamp string into a timezone-aware UTC datetime, or None."""
    if not value:
        return None
    try:
        parsed = dateparser.parse(value)
    except Exception:
        return None
    if parsed is None:
        return None
    if parsed.tzinfo is None:
        # NOTE(review): timestamps without an offset are assumed to be UTC - confirm for NVD.
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def _deliver_cve_to_subscribers(cve, users, in_quiet_hours):
    """Send one CVE to every matching subscriber and persist their markers."""
    msg = build_cve_message(cve)
    published = cve.get("published")
    published_dt = _parse_utc_timestamp(published)
    is_critical = any(cvss_to_severity(score) == "critical" for score in cve.get("cvss") or [])

    changes = {}  # chat id -> fields to write back
    for cid_str, u in list(users.items()):
        try:
            u_last = u.get("last_sent")
            if not u_last:
                # No marker yet: start the timeline now and skip this CVE.
                changes[cid_str] = {"last_sent": now_iso()}
                continue
            last_dt = _parse_utc_timestamp(u_last)
            # When either timestamp cannot be parsed, err on the side of sending.
            if published_dt and last_dt and published_dt <= last_dt:
                continue
            if not matches_user_preferences(cve, u):
                continue
            # During quiet hours, only send CRITICAL severity CVEs
            if in_quiet_hours and not is_critical:
                print(f"⏰ Skipping non-critical CVE {cve.get('id')} during quiet hours")
                # Update last_sent so we don't send it later
                changes[cid_str] = {"last_sent": published}
                continue
            if send_telegram_to_chat(u["chat_id"], msg):
                changes[cid_str] = {
                    "last_sent": published,
                    "last_seen": datetime.now(timezone.utc).isoformat(),
                }
        except Exception as e:
            print("Error sending:", e)
        # Pause after each send attempt.
        time.sleep(0.12)

    if not changes:
        return
    # Merge into a fresh copy of the store: sending may have removed
    # subscribers, and writing back the stale snapshot would re-add them.
    latest = load_subscribers()
    latest_users = latest.get("users", {}) or {}
    for cid_str, fields in changes.items():
        if cid_str in latest_users:
            latest_users[cid_str].update(fields)
    latest["users"] = latest_users
    save_subscribers(latest)


def run_loop():
    """Run the notifier forever: poll Telegram, query NVD, notify subscribers.

    Each cycle queries NVD for CVEs published since ``state["last_pub"]``,
    delivers every CVE to the subscribers whose filters and timeline it
    passes (only critical ones during quiet hours; the others are skipped,
    not queued), then advances and saves ``last_pub``. Between cycles it
    sleeps for POLL_MINUTES while polling Telegram every TG_POLL_SECONDS.

    Returns immediately when no bot token is configured. Errors inside a
    cycle are printed and followed by a short extra pause; the loop goes on.
    """
    if not TELEGRAM_API:
        print("Warning: TELEGRAM_BOT_TOKEN not set")
        return
    state = load_state()
    try:
        poll_telegram_updates_once()
    except Exception as e:
        print("Initial poll failed:", e)

    while True:
        try:
            try:
                poll_telegram_updates_once()
            except Exception as e:
                print("poll error:", e)
            last_pub = state.get("last_pub") or (datetime.now(timezone.utc) - timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
            # Named window_end so the module-level now_iso() function is not shadowed.
            window_end = now_iso()
            print(f"[{datetime.now(timezone.utc).isoformat()}] Querying NVD from {last_pub} to {window_end}")
            new_cves = query_new_cves(last_pub, window_end)
            if not new_cves:
                print("No new CVEs")
            else:
                print(f"Found {len(new_cves)} new CVE(s)")
                # Check if we're in quiet hours
                in_quiet_hours = is_quiet_hours()
                if in_quiet_hours:
                    print("⏰ Quiet hours active - only CRITICAL CVEs will be sent")
                for c in new_cves:
                    # Pick up subscribe/unsubscribe commands between CVEs.
                    poll_telegram_updates_once()
                    users = load_subscribers().get("users", {}) or {}
                    if not users:
                        continue
                    _deliver_cve_to_subscribers(c, users, in_quiet_hours)
                    time.sleep(0.2)
            state["last_pub"] = window_end
            save_state(state)
        except requests.HTTPError as e:
            print("HTTP error:", e)
            time.sleep(60)
        except Exception as e:
            print("Error:", e)
            time.sleep(30)

        slept = 0
        total_sleep = max(10, POLL_MINUTES * 60)
        # A non-positive interval would never advance "slept".
        step = max(1, TG_POLL_SECONDS)
        while slept < total_sleep:
            time.sleep(step)
            slept += step
            try:
                poll_telegram_updates_once()
            except Exception as e:
                print("poll error during sleep:", e)
# Script entry point: start the notifier loop only when executed directly,
# not when the module is imported.
if __name__ == "__main__":
    run_loop()