#!/usr/bin/env python3
"""
nvd_telegram_notifier_backfill.py - IMPROVED VERSION

Features:
- Default severity filter for new subscribers (parse_start_payload applies
  "high"; parts of the user-facing help text still say "medium")
- Improved welcome message with filter examples
- Backfill supports HOURS: /backfill 1h = last 1 hour, /backfill 3h = last 3 hours
- Backfill limited to 15 CVEs max (prevents spam)
- New subscribers get a one-time 12-hour backfill, then only FUTURE CVEs
- /reset to skip old CVEs

Environment variables:
- TELEGRAM_BOT_TOKEN (required)
- NVD_API_KEY (optional)
- POLL_MINUTES (default 15)
- STATE_PATH (default ./state.json)
- SUBSCRIBERS_PATH (default ./subscribers.json)
- TG_POLL_SECONDS (default 15)
- LOG_STDOUT (0 or 1)
- QUIET_HOURS_START (default 22), QUIET_HOURS_END (default 8),
  QUIET_HOURS_ENABLED (0 or 1, default 1)
"""
import os
import json
import re
import time
import requests
from datetime import datetime, timedelta, timezone
from dateutil import parser as dateparser
from html import escape

# NOTE(review): several emoji inside the user-facing string literals below look
# mis-encoded (mojibake). They are reproduced unchanged because the intended
# characters cannot be recovered with certainty - confirm against the original
# file and re-save it as UTF-8.

# ---------------- config ----------------
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
NVD_API_KEY = os.environ.get("NVD_API_KEY")
POLL_MINUTES = int(os.environ.get("POLL_MINUTES", "15"))
STATE_PATH = os.environ.get("STATE_PATH", "./state.json")
SUBSCRIBERS_PATH = os.environ.get("SUBSCRIBERS_PATH", "./subscribers.json")
TG_POLL_SECONDS = int(os.environ.get("TG_POLL_SECONDS", "15"))
LOG_STDOUT = os.environ.get("LOG_STDOUT", "0") == "1"

# Quiet hours configuration (24-hour format, evaluated against the UTC clock
# by is_quiet_hours)
QUIET_HOURS_START = int(os.environ.get("QUIET_HOURS_START", "22"))  # 10 PM
QUIET_HOURS_END = int(os.environ.get("QUIET_HOURS_END", "8"))  # 8 AM
QUIET_HOURS_ENABLED = os.environ.get("QUIET_HOURS_ENABLED", "1") == "1"

NVD_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0/"
PER_PAGE = 2000
MAX_MSG_LEN = 3800

TELEGRAM_API = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}" if TELEGRAM_BOT_TOKEN else None

# Keyword filter applied when /start is sent without any payload.
_DEFAULT_KEYWORDS = (
    "linux", "kernel", "ubuntu", "debian", "rhel",
    "postgresql", "postgres", "mysql", "mariadb", "redis", "mongodb", "elasticsearch",
    "nginx", "apache", "haproxy", "tomcat",
    "java", "jdk", "openjdk", "spring", "hibernate", "log4j", "struts",
    "docker", "kubernetes", "k8s", "helm",
    "aws", "ec2", "rds", "eks", "lambda", "s3", "iam",
    "angular", "node", "nodejs",
    "ibm", "iseries", "db2", "websphere",
    "openssl", "ssh", "openssh", "sudo", "bind", "dns",
    "openvpn", "vpn", "iptables", "firewalld",
    "jenkins", "gitlab", "terraform", "ansible",
    "prometheus", "grafana", "vault", "consul",
)

_VALID_SEVERITIES = ("low", "medium", "high", "critical")

# "<digits>", "<digits>h" or "<digits>d"; ASCII-only so int() cannot fail on
# exotic Unicode digits that str.isdigit() would accept.
_TIMESPEC_RE = re.compile(r"^(\d+)([hd]?)$", re.ASCII)

_DIVIDER = "━━━━━━━━━━━━━━━━━━━━━"


# ---------------- utilities ----------------
def now_iso():
    """Return the current UTC time formatted like '2024-01-31T12:00:00.000Z'."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")


def is_quiet_hours():
    """Check if the current UTC hour is within the configured quiet hours."""
    if not QUIET_HOURS_ENABLED:
        return False

    current_hour = datetime.now(timezone.utc).hour

    # Handle case where quiet hours span midnight
    if QUIET_HOURS_START > QUIET_HOURS_END:
        # e.g., 22:00 to 08:00
        return current_hour >= QUIET_HOURS_START or current_hour < QUIET_HOURS_END
    # e.g., 01:00 to 06:00
    return QUIET_HOURS_START <= current_hour < QUIET_HOURS_END


def safe_write_json(path, data):
    """Write *data* as JSON to *path*; failures are printed, never raised.

    The payload goes to a sibling temp file that is then moved over *path*
    with os.replace, so an interrupted write cannot leave a truncated file.
    """
    tmp_path = f"{path}.tmp"
    try:
        d = os.path.dirname(path)
        if d:
            os.makedirs(d, exist_ok=True)
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)
    except Exception as e:
        print("Failed to write json:", path, e)
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # temp file was never created or is already gone


def safe_load_json(path, default):
    """Load JSON from *path*; on a missing/unreadable file store and return *default*.

    An existing file that cannot be read is first moved to '<path>.corrupt'
    so its content is not silently destroyed by the default being written.
    """
    if os.path.exists(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception as e:
            print("Failed to read json:", path, e)
            try:
                os.replace(path, f"{path}.corrupt")
            except OSError as move_err:
                print("Failed to preserve unreadable json:", path, move_err)
    safe_write_json(path, default)
    return default


# ---------------- state ----------------
def load_state():
    """Load the poller state; a fresh state starts one hour in the past."""
    default = {"last_pub": (datetime.now(timezone.utc) - timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")}
    return safe_load_json(STATE_PATH, default)


def save_state(state):
    """Persist the poller state to STATE_PATH."""
    safe_write_json(STATE_PATH, state)


# ---------------- subscribers ----------------
def load_subscribers():
    """Load the subscriber store: {'offset': int, 'users': {chat_id_str: record}}."""
    default = {"offset": 0, "users": {}}
    return safe_load_json(SUBSCRIBERS_PATH, default)


def save_subscribers(data):
    """Persist the subscriber store to SUBSCRIBERS_PATH."""
    safe_write_json(SUBSCRIBERS_PATH, data)


def parse_start_payload(payload_text):
    """Turn a /start payload such as 'severity=high,linux,nginx' into preferences.

    Returns a dict that always has 'severity' and may have 'keywords'
    (lower-cased). An empty payload yields the built-in keyword list.
    An unrecognised 'severity=' value is ignored.
    """
    prefs = {}
    if not payload_text:
        # NOTE(review): the help text shown to users says the default is
        # severity=medium, but the value applied here is "high" - confirm
        # which one is intended.
        prefs["severity"] = "high"
        prefs["keywords"] = list(_DEFAULT_KEYWORDS)
        return prefs

    tokens = [t.strip() for t in payload_text.split(",") if t.strip()]
    keywords = []
    severity_set = False
    for t in tokens:
        if t.lower().startswith("severity="):
            val = t.split("=", 1)[1].strip().lower()
            if val in _VALID_SEVERITIES:
                prefs["severity"] = val
                severity_set = True
        else:
            keywords.append(t.lower())

    # If no severity specified, default to high
    if not severity_set:
        prefs["severity"] = "high"
    if keywords:
        prefs["keywords"] = keywords
    return prefs


def extract_user_info_from_msg(msg):
    """Build a fresh subscriber record from a Telegram message dict."""
    user = msg.get("from", {}) or {}
    chat = msg.get("chat", {}) or {}
    now = datetime.now(timezone.utc).isoformat()
    info = {
        "chat_id": chat.get("id"),
        "is_bot": user.get("is_bot"),
        "first_name": user.get("first_name"),
        "last_name": user.get("last_name"),
        "username": user.get("username"),
        "language_code": user.get("language_code"),
        "subscribed_at": now,
        "last_seen": now,
        "start_payload": None,
        "preferences": {},
        "last_sent": None
    }
    return info


def _filter_lines(prefs):
    """Return the 'Severity' / 'Keywords' bullet lines for *prefs*.

    Keywords come from the user's own /start payload and messages are sent
    with parse_mode=HTML, so they are escaped to keep the markup valid.
    """
    prefs = prefs or {}
    severity = prefs.get("severity") or "medium"
    text = f"• Severity: {severity.upper()} and above\n"
    keywords = prefs.get("keywords") or []
    if keywords:
        text += f"• Keywords: {escape(', '.join(keywords), quote=False)}\n"
    else:
        text += "• Keywords: none (all CVEs)\n"
    return text


def _current_filters_text(prefs):
    """Return the 'you will receive / current filters' section plus divider."""
    return (
        "šŸ“¬ You will receive NEW CVEs as they're published.\n\n"
        "šŸ”§ Your Current Filters:\n"
        + _filter_lines(prefs)
        + f"\n{_DIVIDER}\n\n"
    )


def _commands_and_examples_text(include_help):
    """Return the command list and filter examples shared by welcome and /help."""
    text = (
        "šŸ“‹ Available Commands:\n\n"
        "Backfill:\n"
        "• /backfill 1h - Last 1 hour\n"
        "• /backfill 6h - Last 6 hours\n"
        "• /backfill 1d - Last 1 day (max 15)\n\n"
        "Management:\n"
        "• /status - Check your settings\n"
        "• /reset - Reset timeline\n"
    )
    if include_help:
        text += "• /help - Show this message\n"
    text += (
        "• /stop - Unsubscribe\n\n"
        f"{_DIVIDER}\n\n"
        "šŸŽÆ Filter Examples:\n\n"
        "By severity only:\n"
        "• /start severity=high\n"
        "• /start severity=critical\n\n"
        "By keywords only:\n"
        "• /start linux,nginx\n"
        "• /start apache,tomcat\n\n"
        "Combined filters:\n"
        "• /start severity=high,postgresql\n"
        "• /start severity=critical,linux,kernel\n\n"
        "šŸ’” Tip: Without filters, default is severity=medium"
    )
    return text


def build_welcome_message(prefs):
    """Build a nice welcome message showing active filters"""
    return (
        "āœ… Subscribed to CVE notifications!\n\n"
        + _current_filters_text(prefs)
        + _commands_and_examples_text(include_help=False)
    )


def add_or_update_user_from_msg(msg, payload=None):
    """Handle /start: register a new subscriber or refresh an existing one.

    New subscribers get the welcome message and a one-time 12-hour backfill.
    Existing subscribers have their filters replaced only when *payload* is
    given; otherwise their stored filters are echoed back unchanged.
    """
    info = extract_user_info_from_msg(msg)
    chat_id = info["chat_id"]
    if chat_id is None:
        # Without a chat id there is nobody to subscribe or reply to.
        print("Ignoring /start without a chat id")
        return

    prefs = parse_start_payload(payload)
    info["start_payload"] = payload
    info["preferences"] = prefs

    subs = load_subscribers()
    users = subs.get("users", {})
    cid = str(chat_id)

    if cid not in users:
        # New subscribers get last_sent = NOW
        info["last_sent"] = now_iso()
        users[cid] = info
        subs["users"] = users
        save_subscribers(subs)
        print(f"Added new subscriber: {cid} with last_sent={info['last_sent']}, filters={prefs}")

        # Send welcome message
        send_telegram_to_chat(chat_id, build_welcome_message(prefs))

        # Auto-backfill last 12 hours for new subscribers
        send_telegram_to_chat(
            chat_id,
            "šŸ” Loading recent CVEs...\n\n"
            "Fetching CVEs from the last 12 hours to get you started.\n"
            "(This is a one-time backfill)"
        )
        since = (datetime.now(timezone.utc) - timedelta(hours=12)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
        sent = backfill_to_user(chat_id, since_iso=since, max_items=15)
        send_telegram_to_chat(
            chat_id,
            f"āœ… Backfill complete: {sent} CVE(s) sent.\n\nYou'll now receive new CVEs as they're published!"
        )
        return

    record = users[cid]
    record["last_seen"] = datetime.now(timezone.utc).isoformat()
    if payload:
        record["start_payload"] = payload
        record["preferences"] = prefs
        headline = "Your filters have been updated:\n"
    else:
        # Nothing was changed, so report what is actually stored instead of
        # the defaults parse_start_payload(None) would produce.
        prefs = record.get("preferences") or {}
        headline = "Your filters are unchanged:\n"
    subs["users"] = users
    save_subscribers(subs)
    print("Updated subscriber:", cid, "filters=", prefs)

    # Show the active filters; truncate long keyword lists before escaping so
    # an HTML entity is never cut in half.
    keywords_str = ", ".join(prefs.get("keywords") or [])
    if len(keywords_str) > 100:
        keywords_str = keywords_str[:100] + "..."
    keywords_str = escape(keywords_str, quote=False)

    send_telegram_to_chat(
        chat_id,
        f"āœ… Welcome back!\n\n"
        f"{headline}"
        f"• Severity: {(prefs.get('severity') or 'high').upper()}\n"
        f"• Keywords: {keywords_str or 'none'}"
    )


def _drop_subscriber(chat_id):
    """Remove *chat_id* from the store without messaging it; True if it existed."""
    subs = load_subscribers()
    users = subs.get("users", {})
    cid = str(chat_id)
    if cid not in users:
        return False
    users.pop(cid)
    subs["users"] = users
    save_subscribers(subs)
    print("Removed subscriber:", cid)
    return True


def remove_user_by_chat(chat_id):
    """Handle /stop: unsubscribe *chat_id* and tell the user. Returns True if removed."""
    if _drop_subscriber(chat_id):
        send_telegram_to_chat(chat_id, "āœ… Unsubscribed from CVE notifications.")
        return True
    send_telegram_to_chat(chat_id, "ā„¹ļø You were not subscribed.")
    return False


def reset_user_timeline(chat_id):
    """Handle /reset: move the subscriber's last_sent marker to the current time."""
    subs = load_subscribers()
    users = subs.get("users", {})
    cid = str(chat_id)

    if cid not in users:
        send_telegram_to_chat(chat_id, "āŒ Not subscribed. Use /start first.")
        return

    users[cid]["last_sent"] = now_iso()
    users[cid]["last_seen"] = datetime.now(timezone.utc).isoformat()
    subs["users"] = users
    save_subscribers(subs)

    send_telegram_to_chat(
        chat_id,
        f"āœ… Timeline reset!\n\n"
        f"You'll only receive CVEs published after:\n"
        f"{users[cid]['last_sent']}"
    )


def get_user_status(chat_id):
    """Handle /status: send the subscription dates and active filters."""
    subs = load_subscribers()
    users = subs.get("users", {})
    cid = str(chat_id)

    if cid not in users:
        send_telegram_to_chat(chat_id, "āŒ Not subscribed. Use /start to subscribe.")
        return

    user = users[cid]
    # 'or' rather than a .get default: the stored value may be an explicit null.
    subscribed_at = (user.get("subscribed_at") or "Unknown")[:19]
    last_sent = (user.get("last_sent") or "Unknown")[:19]

    status_msg = "šŸ“Š Subscription Status\n\n"
    status_msg += f"āœ… Subscribed: {subscribed_at}\n"
    status_msg += f"šŸ“… Tracking CVEs after: {last_sent}\n\n"
    status_msg += "šŸ”§ Active Filters:\n"
    status_msg += _filter_lines(user.get("preferences"))
    status_msg += "\nšŸ’” Use /start with new filters to update"

    send_telegram_to_chat(chat_id, status_msg)


def show_help(chat_id):
    """Handle /help: send the current filters plus the command reference."""
    subs = load_subscribers()
    users = subs.get("users", {})
    cid = str(chat_id)

    if cid not in users:
        send_telegram_to_chat(chat_id, "āŒ Not subscribed. Use /start to subscribe first.")
        return

    prefs = users[cid].get("preferences") or {}
    help_msg = _current_filters_text(prefs) + _commands_and_examples_text(include_help=True)
    send_telegram_to_chat(chat_id, help_msg)


def parse_backfill_timespec(timespec):
    """Parse time specs like '1h', '6h', '1d' into a whole number of hours.

    A bare number is taken as hours. Anything unparseable, or a value of
    zero, falls back to 1 hour.
    """
    match = _TIMESPEC_RE.match((timespec or "").strip().lower())
    if not match:
        return 1  # default to 1 hour

    num = int(match.group(1))
    if match.group(2) == "d":
        num *= 24
    return max(1, num)


# ---------------- Telegram polling ----------------
def _touch_last_seen(chat_id):
    """Refresh last_seen for an existing subscriber; unknown chats are ignored."""
    s = load_subscribers()
    users = s.get("users", {})
    cid = str(chat_id)
    if cid in users:
        users[cid]["last_seen"] = datetime.now(timezone.utc).isoformat()
        s["users"] = users
        save_subscribers(s)


def _handle_message(msg):
    """Dispatch one Telegram message dict to the matching command handler."""
    text = (msg.get("text") or "").strip()
    chat = msg.get("chat", {}) or {}
    chat_id = chat.get("id")

    parts = text.split(None, 1)
    head = parts[0] if parts else ""
    argument = parts[1].strip() if len(parts) > 1 else ""
    # Match the exact command token ("/stop", optionally "/stop@botname") so
    # unrelated text such as "/stopwatch" is not treated as a command.
    command = head.split("@", 1)[0].lower() if head.startswith("/") else ""

    if command == "/start":
        add_or_update_user_from_msg(msg, argument or None)
        return
    if chat_id is None:
        return

    if command == "/stop":
        remove_user_by_chat(chat_id)
    elif command == "/reset":
        reset_user_timeline(chat_id)
    elif command == "/status":
        get_user_status(chat_id)
    elif command == "/help":
        show_help(chat_id)
    elif command == "/backfill":
        hours = min(parse_backfill_timespec(argument or "1h"), 72)  # cap at 3 days
        send_telegram_to_chat(
            chat_id,
            f"šŸ” Searching for CVEs from last {hours} hour(s)...\n"
            f"(Max 15 CVEs, won't affect your regular notifications)"
        )
        since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
        sent = backfill_to_user(chat_id, since_iso=since, max_items=15)
        send_telegram_to_chat(chat_id, f"āœ… Backfill complete: {sent} CVE(s) sent.")
    else:
        # Update last_seen
        _touch_last_seen(chat_id)


def poll_telegram_updates_once():
    """Fetch pending Telegram updates once and process their commands.

    Each update is handled in isolation and the stored offset always
    advances past everything that was fetched, so a single failing update
    cannot be re-delivered (and re-executed) on every subsequent poll.
    """
    if not TELEGRAM_API:
        return

    subs = load_subscribers()
    offset = subs.get("offset", 0) or 0
    url = f"{TELEGRAM_API}/getUpdates"
    params = {"timeout": 0, "offset": offset, "limit": 100}

    try:
        r = requests.get(url, params=params, timeout=20)
        r.raise_for_status()
        res = r.json()
    except Exception as e:
        print("Failed to poll Telegram updates:", e)
        return

    if not res.get("ok"):
        print("getUpdates not ok:", res)
        return

    updates = res.get("result", []) or []
    max_uid = offset

    for u in updates:
        uid = u.get("update_id")
        if uid is not None and uid >= max_uid:
            max_uid = uid + 1

        msg = u.get("message") or u.get("edited_message") or u.get("channel_post") or {}
        if not msg:
            continue
        try:
            _handle_message(msg)
        except Exception as e:
            print("Failed to handle Telegram update:", uid, e)

    if max_uid != offset:
        # Reload first: the handlers above may have rewritten the store.
        subs2 = load_subscribers()
        subs2["offset"] = max_uid
        save_subscribers(subs2)


# ---------------- Telegram send ----------------
def send_telegram_to_chat(chat_identifier, text):
    """Send *text* (Telegram HTML parse mode) to a chat; return True on success.

    When Telegram answers 400/403 with a "blocked" or "not found"
    description the chat is dropped from the subscriber store. The removal
    is silent on purpose: notifying the unreachable chat would fail the same
    way and recurse back into this function.
    """
    if not TELEGRAM_API:
        print("No TELEGRAM_BOT_TOKEN configured")
        return False

    payload = {"chat_id": chat_identifier, "text": text, "disable_web_page_preview": True, "parse_mode": "HTML"}
    try:
        r = requests.post(f"{TELEGRAM_API}/sendMessage", data=payload, timeout=20)
        if r.status_code != 200:
            desc = ""
            try:
                desc = (r.json().get("description") or "").lower()
            except (ValueError, AttributeError):
                pass  # error body was not the expected JSON object
            if r.status_code in (403, 400) and ("blocked" in desc or "not found" in desc):
                _drop_subscriber(chat_identifier)
        r.raise_for_status()
        return True
    except Exception as e:
        print("Failed to send telegram:", e)
        return False


# ---------------- NVD helpers ----------------
def nvd_request(params):
    """GET the NVD CVE endpoint with *params*; return parsed JSON, raise on HTTP errors."""
    headers = {"Accept": "application/json"}
    if NVD_API_KEY:
        headers["apiKey"] = NVD_API_KEY
    r = requests.get(NVD_BASE, params=params, headers=headers, timeout=60)
    r.raise_for_status()
    return r.json()
def extract_cve_info(v):
    """Flatten one NVD 'vulnerabilities' entry into the dict used by this script.

    Returns keys: id, published, description, cvss (list of float),
    cvss_str (list of 'x.y' strings) and references (list of URLs).
    """
    cve = v.get("cve") or {}
    cve_id = cve.get("id") or cve.get("CVE_data_meta", {}).get("ID") or v.get("cveId")
    published = cve.get("published") or v.get("published") or v.get("publishedDate")

    # Prefer the English description; fall back to the first non-empty one.
    desc = ""
    first_value = ""
    for d in (cve.get("descriptions") or []):
        value = d.get("value")
        if not value:
            continue
        if (d.get("lang") or "").lower().startswith("en"):
            desc = value
            break
        if not first_value:
            first_value = value
    desc = desc or first_value or cve.get("description") or ""

    refs = []
    for r in (cve.get("references") or []):
        url = r.get("url") or r.get("reference")
        if url:
            refs.append(url)

    cvss_scores = []
    metrics = cve.get("metrics") or {}
    for key in metrics:
        for item in metrics.get(key) or []:
            cvss = item.get("cvssData") or item.get("cvssMetric")
            if not cvss:
                continue
            # Explicit None checks so a legitimate 0.0 base score is kept.
            score = cvss.get("baseScore")
            if score is None:
                score = cvss.get("score")
            if score is None:
                continue
            try:
                cvss_scores.append(float(score))
            except (TypeError, ValueError):
                pass  # non-numeric score: ignore this metric

    cvss_strs = ["{:.1f}".format(s) for s in cvss_scores]
    return {"id": cve_id, "published": published, "description": desc,
            "cvss": cvss_scores, "cvss_str": cvss_strs, "references": refs}


def _parse_utc(timestamp):
    """Parse a timestamp string into a timezone-aware UTC datetime, or None.

    Values without an offset are assumed to be UTC so they can be compared
    with the '...Z' strings this script writes.
    NOTE(review): assumes NVD 'published' values are UTC - confirm.
    """
    if not timestamp:
        return None
    try:
        parsed = dateparser.parse(timestamp)
    except (TypeError, ValueError, OverflowError):
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def query_new_cves(last_pub, now_iso):
    """Return all CVEs published between *last_pub* and *now_iso*, oldest first.

    Pages through the NVD API PER_PAGE results at a time. Entries without an
    id are skipped. HTTP errors from nvd_request propagate to the caller.
    (The *now_iso* parameter is a timestamp string; inside this function it
    shadows the module-level helper of the same name.)
    """
    params = {"pubStartDate": last_pub, "pubEndDate": now_iso, "resultsPerPage": PER_PAGE, "startIndex": 0}
    all_cves = []

    while True:
        data = nvd_request(params)
        total = data.get("totalResults") or 0
        for v in data.get("vulnerabilities") or []:
            info = extract_cve_info(v)
            if info.get("id"):
                all_cves.append(info)

        start = params.get("startIndex", 0)
        per = params.get("resultsPerPage", PER_PAGE)
        if not total or start + per >= int(total):
            break
        params["startIndex"] = start + per
        time.sleep(0.5)

    # Entries with a missing/unparseable date sort as if published at window end.
    window_end = _parse_utc(now_iso) or datetime.now(timezone.utc)
    all_cves.sort(key=lambda c: _parse_utc(c.get("published")) or window_end)
    return all_cves


# ---------------- matching & formatting ----------------
_SEVERITY_ORDER = {"low": 1, "medium": 2, "high": 3, "critical": 4}


def cvss_to_severity(cvss_score):
    """Map a CVSS base score to 'low'/'medium'/'high'/'critical' (None if not numeric)."""
    if cvss_score is None:
        return None
    try:
        s = float(cvss_score)
    except (TypeError, ValueError):
        return None

    if s >= 9.0:
        return "critical"
    if s >= 7.0:
        return "high"
    if s >= 4.0:
        return "medium"
    return "low"


def matches_user_preferences(cve, user):
    """Return True when *cve* passes the subscriber's severity and keyword filters.

    With a severity preference set, a CVE without any CVSS score never
    matches. Keywords are matched as case-insensitive substrings of the
    description and the CVE id.
    """
    prefs = user.get("preferences", {}) or {}

    sev_pref = prefs.get("severity")
    if sev_pref:
        wanted = _SEVERITY_ORDER.get(sev_pref, 0)
        severities = (cvss_to_severity(s) for s in cve.get("cvss") or [])
        if not any(sev and _SEVERITY_ORDER.get(sev, 0) >= wanted for sev in severities):
            return False

    keywords = prefs.get("keywords") or []
    if keywords:
        text = ((cve.get("description") or "") + " " + (cve.get("id") or "")).lower()
        if not any(kw.lower() in text for kw in keywords):
            return False

    return True


def extract_subject(description):
    """Return the first sentence of *description*, always ending with a period.

    A sentence ends at a '.' followed by whitespace or the end of the text,
    so dotted tokens such as version numbers ('1.2.3') are not split.
    """
    if not description:
        return "New vulnerability"
    end = len(description)
    for i, ch in enumerate(description):
        if ch == "." and (i + 1 == len(description) or description[i + 1].isspace()):
            end = i
            break
    return description[:end].strip() + "."


def build_cve_message(cve):
    """Format *cve* as a Telegram HTML message, truncated to MAX_MSG_LEN."""
    cve_id = cve.get("id") or "UNKNOWN"
    published = cve.get("published") or "N/A"
    subject = escape(extract_subject(cve.get("description", "")))
    desc = escape((cve.get("description", "") or "")[:1200])

    # Determine severity from the highest CVSS score
    labels = {
        "critical": ("CRITICAL", "šŸ”“"),
        "high": ("HIGH", "🟠"),
        "medium": ("MEDIUM", "🟔"),
        "low": ("LOW", "🟢"),
    }
    severity_label, severity_emoji = "UNKNOWN", "⚪"
    cvss_scores = cve.get("cvss") or []
    if cvss_scores:
        severity = cvss_to_severity(max(cvss_scores))
        severity_label, severity_emoji = labels.get(severity, (severity_label, severity_emoji))

    parts = [
        f"🚨 {escape(cve_id)}",
        f"šŸ—“ Published: {escape(published)}",
        f"{severity_emoji} Severity: {severity_label}",
        f"šŸ“Œ Summary: {subject}"
    ]

    if cve.get("cvss_str"):
        parts.append("šŸ”„ CVSS Score: " + ", ".join(escape(s) for s in cve.get("cvss_str")))

    parts.append("")
    parts.append(desc)

    references = cve.get("references")
    if references:
        # Link the first reference; the bare word "Reference" carried no URL.
        href = escape(str(references[0]), quote=True)
        parts.append(f'šŸ”— <a href="{href}">Reference</a>')

    msg = "\n".join(parts)
    if len(msg) > MAX_MSG_LEN:
        # Cut back to the last complete line so no tag or entity is split.
        msg = msg[:MAX_MSG_LEN].rsplit("\n", 1)[0] + "\n…"
    return msg


# ---------------- backfill ----------------
def backfill_to_user(chat_id, since_iso, max_items=15):
    """Backfill - doesn't update last_sent

    Sends up to *max_items* CVEs published since *since_iso* that match the
    subscriber's stored preferences and returns how many were delivered.
    A chat with no stored record has no filters, so everything matches.
    Errors are printed and end the backfill early.
    """
    until_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    params = {"pubStartDate": since_iso, "pubEndDate": until_iso, "resultsPerPage": PER_PAGE, "startIndex": 0}
    sent = 0

    user = load_subscribers().get("users", {}).get(str(chat_id), {})

    try:
        while True:
            data = nvd_request(params)
            for v in data.get("vulnerabilities") or []:
                info = extract_cve_info(v)
                if not info.get("id"):
                    continue
                if not matches_user_preferences(info, user):
                    continue
                if send_telegram_to_chat(chat_id, build_cve_message(info)):
                    sent += 1
                    if sent >= max_items:
                        return sent
                time.sleep(0.15)

            total = data.get("totalResults") or 0
            start = params.get("startIndex", 0)
            per = params.get("resultsPerPage", PER_PAGE)
            if not total or start + per >= int(total):
                break
            params["startIndex"] = start + per
            time.sleep(0.5)  # same pause between pages as query_new_cves
    except Exception as e:
        print("Backfill error:", e)

    return sent


# ---------------- main loop ----------------
def _deliver_cve(cve, in_quiet_hours):
    """Send one CVE to every subscriber it applies to and persist their markers."""
    msg = build_cve_message(cve)
    # Pick up /start, /stop, ... before deciding who receives this CVE.
    poll_telegram_updates_once()
    users = load_subscribers().get("users", {}) or {}
    if not users:
        return

    published = cve.get("published")
    published_dt = _parse_utc(published)
    is_critical = any(cvss_to_severity(score) == "critical" for score in cve.get("cvss") or [])

    changed = {}  # chat id -> fields to write back
    for cid_str, u in list(users.items()):
        try:
            u_last = u.get("last_sent")
            if not u_last:
                changed[cid_str] = {"last_sent": now_iso()}
                continue

            u_dt = _parse_utc(u_last)
            # If either timestamp cannot be parsed, err on the side of sending.
            if published_dt and u_dt and published_dt <= u_dt:
                continue

            if not matches_user_preferences(cve, u):
                continue

            # During quiet hours, only send CRITICAL severity CVEs
            if in_quiet_hours and not is_critical:
                print(f"ā° Skipping non-critical CVE {cve.get('id')} during quiet hours")
                # Update last_sent so we don't send it later
                changed[cid_str] = {"last_sent": published}
                continue

            if send_telegram_to_chat(u["chat_id"], msg):
                changed[cid_str] = {
                    "last_sent": published,
                    "last_seen": datetime.now(timezone.utc).isoformat(),
                }
        except Exception as e:
            print("Error sending:", e)
        time.sleep(0.12)

    if changed:
        # Merge into a fresh copy of the store: a failed send may have removed
        # a subscriber, and writing the stale snapshot back would re-add them.
        latest = load_subscribers()
        latest_users = latest.get("users", {}) or {}
        for cid_str, fields in changed.items():
            if cid_str in latest_users:
                latest_users[cid_str].update(fields)
        latest["users"] = latest_users
        save_subscribers(latest)
    time.sleep(0.2)


def run_loop():
    """Poll NVD every POLL_MINUTES and fan new CVEs out to subscribers, forever.

    Between NVD queries Telegram is polled every TG_POLL_SECONDS so commands
    stay responsive. Returns immediately if no bot token is configured.
    """
    if not TELEGRAM_API:
        print("Warning: TELEGRAM_BOT_TOKEN not set")
        return

    state = load_state()

    try:
        poll_telegram_updates_once()
    except Exception as e:
        print("Initial poll failed:", e)

    while True:
        try:
            try:
                poll_telegram_updates_once()
            except Exception as e:
                print("poll error:", e)

            last_pub = state.get("last_pub") or (datetime.now(timezone.utc) - timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
            # Deliberately not named now_iso: a local of that name would hide
            # the now_iso() helper for the whole function.
            window_end = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
            print(f"[{datetime.now(timezone.utc).isoformat()}] Querying NVD from {last_pub} to {window_end}")

            new_cves = query_new_cves(last_pub, window_end)

            if not new_cves:
                print("No new CVEs")
            else:
                print(f"Found {len(new_cves)} new CVE(s)")

                # Check if we're in quiet hours
                in_quiet_hours = is_quiet_hours()
                if in_quiet_hours:
                    # NOTE: despite the wording, skipped CVEs are not re-sent
                    # later; each subscriber's last_sent is advanced past them.
                    print(f"ā° Quiet hours active - queuing messages for later")

                for c in new_cves:
                    _deliver_cve(c, in_quiet_hours)

            state["last_pub"] = window_end
            save_state(state)

        except requests.HTTPError as e:
            print("HTTP error:", e)
            time.sleep(60)
        except Exception as e:
            print("Error:", e)
            time.sleep(30)

        slept = 0
        total_sleep = max(10, POLL_MINUTES * 60)
        # Guard against TG_POLL_SECONDS <= 0, which would never advance 'slept'.
        step = max(1, TG_POLL_SECONDS)
        while slept < total_sleep:
            time.sleep(step)
            slept += step
            try:
                poll_telegram_updates_once()
            except Exception as e:
                print("poll error during sleep:", e)


if __name__ == "__main__":
    run_loop()