first commit

This commit is contained in:
2026-01-09 13:50:10 +01:00
parent 2ab7414e2b
commit b860c8dc6d
6 changed files with 892 additions and 0 deletions

16
Dockerfile Normal file
View File

@@ -0,0 +1,16 @@
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Install dependencies
RUN pip install --no-cache-dir requests python-dateutil
# Copy the script
COPY nvd_telegram_notifier.py .
# Create directories for persistent data
RUN mkdir -p /app/data
# Run the script
CMD ["python", "-u", "nvd_telegram_notifier.py"]

3
data/state.json Normal file
View File

@@ -0,0 +1,3 @@
{
"last_pub": "2026-01-09T10:23:51.000Z"
}

20
data/subscribers.json Normal file
View File

@@ -0,0 +1,20 @@
{
"offset": 844171944,
"users": {
"145645559": {
"chat_id": 145645559,
"is_bot": false,
"first_name": "Tomas",
"last_name": "Mali",
"username": "tomasmali",
"language_code": "en",
"subscribed_at": "2026-01-09T10:22:10.723523+00:00",
"last_seen": "2026-01-09T12:48:47.988471+00:00",
"start_payload": "severity=high",
"preferences": {
"severity": "high"
},
"last_sent": "2026-01-09T12:15:53.420"
}
}
}

24
docker-compose.yml Normal file
View File

@@ -0,0 +1,24 @@
services:
nvd-notifier:
build: .
container_name: nvd-telegram-notifier
restart: unless-stopped
network_mode: "host"
env_file:
- .env
volumes:
# Persist state files
- ./data:/app/data
environment:
# Override paths to use the data volume
- STATE_PATH=/app/data/state.json
- SUBSCRIBERS_PATH=/app/data/subscribers.json
dns:
- 8.8.8.8
- 8.8.4.4
# Uncomment to see logs in real-time
# logging:
# driver: "json-file"
# options:
# max-size: "10m"
# max-file: "3"

827
nvd_telegram_notifier.py Normal file
View File

@@ -0,0 +1,827 @@
#!/usr/bin/env python3
"""
nvd_telegram_notifier_backfill.py - IMPROVED VERSION
Features:
- Default severity=medium filter for new subscribers
- Improved welcome message with filter examples
- Backfill supports HOURS: /backfill 1h = last 1 hour, /backfill 3h = last 3 hours
- Backfill limited to 15 CVEs max (prevents spam)
- New subscribers only get FUTURE CVEs
- /reset to skip old CVEs
Environment variables:
- TELEGRAM_BOT_TOKEN (required)
- NVD_API_KEY (optional)
- POLL_MINUTES (default 15)
- STATE_PATH (default ./state.json)
- SUBSCRIBERS_PATH (default ./subscribers.json)
- TG_POLL_SECONDS (default 15)
- LOG_STDOUT (0 or 1)
"""
import os
import json
import time
import requests
from datetime import datetime, timedelta, timezone
from dateutil import parser as dateparser
from html import escape
# ---------------- config ----------------
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
NVD_API_KEY = os.environ.get("NVD_API_KEY")
POLL_MINUTES = int(os.environ.get("POLL_MINUTES", "15"))
STATE_PATH = os.environ.get("STATE_PATH", "./state.json")
SUBSCRIBERS_PATH = os.environ.get("SUBSCRIBERS_PATH", "./subscribers.json")
TG_POLL_SECONDS = int(os.environ.get("TG_POLL_SECONDS", "15"))
LOG_STDOUT = os.environ.get("LOG_STDOUT", "0") == "1"
# Quiet hours configuration (24-hour format)
QUIET_HOURS_START = int(os.environ.get("QUIET_HOURS_START", "22")) # 10 PM
QUIET_HOURS_END = int(os.environ.get("QUIET_HOURS_END", "8")) # 8 AM
QUIET_HOURS_ENABLED = os.environ.get("QUIET_HOURS_ENABLED", "1") == "1"
NVD_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0/"
PER_PAGE = 2000
MAX_MSG_LEN = 3800
TELEGRAM_API = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}" if TELEGRAM_BOT_TOKEN else None
# ---------------- utilities ----------------
def now_iso():
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
def is_quiet_hours():
"""Check if current time is within quiet hours"""
if not QUIET_HOURS_ENABLED:
return False
now = datetime.now(timezone.utc)
current_hour = now.hour
# Handle case where quiet hours span midnight
if QUIET_HOURS_START > QUIET_HOURS_END:
# e.g., 22:00 to 08:00
return current_hour >= QUIET_HOURS_START or current_hour < QUIET_HOURS_END
else:
# e.g., 01:00 to 06:00
return QUIET_HOURS_START <= current_hour < QUIET_HOURS_END
def safe_write_json(path, data):
try:
d = os.path.dirname(path)
if d:
os.makedirs(d, exist_ok=True)
with open(path, "w") as f:
json.dump(data, f, indent=2)
except Exception as e:
print("Failed to write json:", path, e)
def safe_load_json(path, default):
try:
if os.path.exists(path):
with open(path, "r") as f:
return json.load(f)
except Exception:
pass
safe_write_json(path, default)
return default
# ---------------- state ----------------
def load_state():
default = {"last_pub": (datetime.now(timezone.utc) - timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")}
return safe_load_json(STATE_PATH, default)
def save_state(state):
safe_write_json(STATE_PATH, state)
# ---------------- subscribers ----------------
def load_subscribers():
default = {"offset": 0, "users": {}}
return safe_load_json(SUBSCRIBERS_PATH, default)
def save_subscribers(data):
safe_write_json(SUBSCRIBERS_PATH, data)
def parse_start_payload(payload_text):
prefs = {}
if not payload_text:
# Default to medium severity and comprehensive keywords if no preferences provided
prefs["severity"] = "high"
prefs["keywords"] = [
"linux", "kernel", "ubuntu", "debian", "rhel",
"postgresql", "postgres", "mysql", "mariadb", "redis", "mongodb", "elasticsearch",
"nginx", "apache", "haproxy",
"tomcat", "java", "jdk", "openjdk", "spring", "hibernate", "log4j", "struts",
"docker", "kubernetes", "k8s", "helm",
"aws", "ec2", "rds", "eks", "lambda", "s3", "iam",
"angular", "node", "nodejs",
"ibm", "iseries", "db2", "websphere",
"openssl", "ssh", "openssh", "sudo",
"bind", "dns", "openvpn", "vpn", "iptables", "firewalld",
"jenkins", "gitlab", "terraform", "ansible",
"prometheus", "grafana", "vault", "consul"
]
return prefs
tokens = [t.strip() for t in payload_text.split(",") if t.strip()]
keywords = []
severity_set = False
for t in tokens:
if t.lower().startswith("severity="):
val = t.split("=", 1)[1].strip().lower()
if val in ("low", "medium", "high", "critical"):
prefs["severity"] = val
severity_set = True
else:
keywords.append(t.lower())
# If no severity specified, default to high
if not severity_set:
prefs["severity"] = "high"
if keywords:
prefs["keywords"] = keywords
return prefs
def extract_user_info_from_msg(msg):
user = msg.get("from", {}) or {}
chat = msg.get("chat", {}) or {}
now = datetime.now(timezone.utc).isoformat()
info = {
"chat_id": chat.get("id"),
"is_bot": user.get("is_bot"),
"first_name": user.get("first_name"),
"last_name": user.get("last_name"),
"username": user.get("username"),
"language_code": user.get("language_code"),
"subscribed_at": now,
"last_seen": now,
"start_payload": None,
"preferences": {},
"last_sent": None
}
return info
def build_welcome_message(prefs):
"""Build a nice welcome message showing active filters"""
msg = "✅ <b>Subscribed to CVE notifications!</b>\n\n"
msg += "📬 You will receive NEW CVEs as they're published.\n\n"
msg += "🔧 <b>Your Current Filters:</b>\n"
severity = prefs.get("severity", "medium")
msg += f"• Severity: <code>{severity.upper()}</code> and above\n"
keywords = prefs.get("keywords", [])
if keywords:
msg += f"• Keywords: <code>{', '.join(keywords)}</code>\n"
else:
msg += "• Keywords: <i>none (all CVEs)</i>\n"
msg += "\n━━━━━━━━━━━━━━━━━━━━━\n\n"
msg += "📋 <b>Available Commands:</b>\n\n"
msg += "<b>Backfill:</b>\n"
msg += "• <code>/backfill 1h</code> - Last 1 hour\n"
msg += "• <code>/backfill 6h</code> - Last 6 hours\n"
msg += "• <code>/backfill 1d</code> - Last 1 day (max 15)\n\n"
msg += "<b>Management:</b>\n"
msg += "• <code>/status</code> - Check your settings\n"
msg += "• <code>/reset</code> - Reset timeline\n"
msg += "• <code>/stop</code> - Unsubscribe\n\n"
msg += "━━━━━━━━━━━━━━━━━━━━━\n\n"
msg += "🎯 <b>Filter Examples:</b>\n\n"
msg += "<b>By severity only:</b>\n"
msg += "• <code>/start severity=high</code>\n"
msg += "• <code>/start severity=critical</code>\n\n"
msg += "<b>By keywords only:</b>\n"
msg += "• <code>/start linux,nginx</code>\n"
msg += "• <code>/start apache,tomcat</code>\n\n"
msg += "<b>Combined filters:</b>\n"
msg += "• <code>/start severity=high,postgresql</code>\n"
msg += "• <code>/start severity=critical,linux,kernel</code>\n\n"
msg += "💡 <i>Tip: Without filters, default is severity=medium</i>"
return msg
def add_or_update_user_from_msg(msg, payload=None):
info = extract_user_info_from_msg(msg)
prefs = parse_start_payload(payload)
info["start_payload"] = payload
info["preferences"] = prefs
subs = load_subscribers()
users = subs.get("users", {})
cid = str(info["chat_id"])
if cid not in users:
# New subscribers get last_sent = NOW
info["last_sent"] = now_iso()
users[cid] = info
subs["users"] = users
save_subscribers(subs)
print(f"Added new subscriber: {cid} with last_sent={info['last_sent']}, filters={prefs}")
# Send welcome message
send_telegram_to_chat(info["chat_id"], build_welcome_message(prefs))
# Auto-backfill last 12 hours for new subscribers
send_telegram_to_chat(
info["chat_id"],
"🔍 <b>Loading recent CVEs...</b>\n\n"
"Fetching CVEs from the last 12 hours to get you started.\n"
"(This is a one-time backfill)"
)
sent = backfill_to_user(info["chat_id"],
since_iso=(datetime.now(timezone.utc) - timedelta(hours=12)).strftime("%Y-%m-%dT%H:%M:%S.000Z"),
max_items=15)
send_telegram_to_chat(info["chat_id"], f"✅ Backfill complete: {sent} CVE(s) sent.\n\nYou'll now receive new CVEs as they're published!")
else:
users[cid]["last_seen"] = datetime.now(timezone.utc).isoformat()
if payload:
users[cid]["start_payload"] = payload
users[cid]["preferences"] = prefs
subs["users"] = users
save_subscribers(subs)
print("Updated subscriber:", cid, "filters=", prefs)
# Show updated filters
keywords_str = ', '.join(prefs.get('keywords', []))[:100] # Truncate if too long
if len(keywords_str) >= 100:
keywords_str += "..."
send_telegram_to_chat(
info["chat_id"],
f"✅ <b>Welcome back!</b>\n\n"
f"Your filters have been updated:\n"
f"• Severity: <code>{prefs.get('severity', 'high').upper()}</code>\n"
f"• Keywords: <code>{keywords_str or 'none'}</code>"
)
def remove_user_by_chat(chat_id):
subs = load_subscribers()
users = subs.get("users", {})
cid = str(chat_id)
if cid in users:
users.pop(cid)
subs["users"] = users
save_subscribers(subs)
print("Removed subscriber:", cid)
send_telegram_to_chat(chat_id, "✅ Unsubscribed from CVE notifications.")
return True
else:
send_telegram_to_chat(chat_id, " You were not subscribed.")
return False
def reset_user_timeline(chat_id):
subs = load_subscribers()
users = subs.get("users", {})
cid = str(chat_id)
if cid not in users:
send_telegram_to_chat(chat_id, "❌ Not subscribed. Use /start first.")
return
users[cid]["last_sent"] = now_iso()
users[cid]["last_seen"] = datetime.now(timezone.utc).isoformat()
subs["users"] = users
save_subscribers(subs)
send_telegram_to_chat(
chat_id,
f"✅ <b>Timeline reset!</b>\n\n"
f"You'll only receive CVEs published after:\n"
f"<code>{users[cid]['last_sent']}</code>"
)
def get_user_status(chat_id):
subs = load_subscribers()
users = subs.get("users", {})
cid = str(chat_id)
if cid not in users:
send_telegram_to_chat(chat_id, "❌ Not subscribed. Use /start to subscribe.")
return
user = users[cid]
status_msg = f"📊 <b>Subscription Status</b>\n\n"
status_msg += f"✅ Subscribed: <code>{user.get('subscribed_at', 'Unknown')[:19]}</code>\n"
status_msg += f"📅 Tracking CVEs after: <code>{user.get('last_sent', 'Unknown')[:19]}</code>\n\n"
prefs = user.get("preferences", {})
status_msg += f"🔧 <b>Active Filters:</b>\n"
severity = prefs.get("severity", "medium")
status_msg += f"• Severity: <code>{severity.upper()}</code> and above\n"
keywords = prefs.get("keywords", [])
if keywords:
status_msg += f"• Keywords: <code>{', '.join(keywords)}</code>\n"
else:
status_msg += "• Keywords: <i>none (all CVEs)</i>\n"
status_msg += "\n💡 <i>Use /start with new filters to update</i>"
send_telegram_to_chat(chat_id, status_msg)
def show_help(chat_id):
subs = load_subscribers()
users = subs.get("users", {})
cid = str(chat_id)
if cid not in users:
send_telegram_to_chat(chat_id, "❌ Not subscribed. Use /start to subscribe first.")
return
user = users[cid]
prefs = user.get("preferences", {})
help_msg = "📬 <b>You will receive NEW CVEs as they're published.</b>\n\n"
help_msg += "🔧 <b>Your Current Filters:</b>\n"
severity = prefs.get("severity", "medium")
help_msg += f"• Severity: <code>{severity.upper()}</code> and above\n"
keywords = prefs.get("keywords", [])
if keywords:
help_msg += f"• Keywords: <code>{', '.join(keywords)}</code>\n"
else:
help_msg += "• Keywords: <i>none (all CVEs)</i>\n"
help_msg += "\n━━━━━━━━━━━━━━━━━━━━━\n\n"
help_msg += "📋 <b>Available Commands:</b>\n\n"
help_msg += "<b>Backfill:</b>\n"
help_msg += "• <code>/backfill 1h</code> - Last 1 hour\n"
help_msg += "• <code>/backfill 6h</code> - Last 6 hours\n"
help_msg += "• <code>/backfill 1d</code> - Last 1 day (max 15)\n\n"
help_msg += "<b>Management:</b>\n"
help_msg += "• <code>/status</code> - Check your settings\n"
help_msg += "• <code>/reset</code> - Reset timeline\n"
help_msg += "• <code>/help</code> - Show this message\n"
help_msg += "• <code>/stop</code> - Unsubscribe\n\n"
help_msg += "━━━━━━━━━━━━━━━━━━━━━\n\n"
help_msg += "🎯 <b>Filter Examples:</b>\n\n"
help_msg += "<b>By severity only:</b>\n"
help_msg += "• <code>/start severity=high</code>\n"
help_msg += "• <code>/start severity=critical</code>\n\n"
help_msg += "<b>By keywords only:</b>\n"
help_msg += "• <code>/start linux,nginx</code>\n"
help_msg += "• <code>/start apache,tomcat</code>\n\n"
help_msg += "<b>Combined filters:</b>\n"
help_msg += "• <code>/start severity=high,postgresql</code>\n"
help_msg += "• <code>/start severity=critical,linux,kernel</code>\n\n"
help_msg += "💡 <i>Tip: Without filters, default is severity=medium</i>"
send_telegram_to_chat(chat_id, help_msg)
def parse_backfill_timespec(timespec):
"""Parse time specs like '1h', '6h', '1d' into hours"""
timespec = timespec.strip().lower()
# Default to 1 hour if just a number
if timespec.isdigit():
return int(timespec)
# Parse format like "1h", "6h", "1d"
import re
match = re.match(r'^(\d+)([hd])$', timespec)
if not match:
return 1 # default to 1 hour
num = int(match.group(1))
unit = match.group(2)
if unit == 'h':
return num
elif unit == 'd':
return num * 24
return 1
# ---------------- Telegram polling ----------------
def poll_telegram_updates_once():
if not TELEGRAM_API:
return
subs = load_subscribers()
offset = subs.get("offset", 0) or 0
url = f"{TELEGRAM_API}/getUpdates"
params = {"timeout": 0, "offset": offset, "limit": 100}
try:
r = requests.get(url, params=params, timeout=20)
r.raise_for_status()
res = r.json()
if not res.get("ok"):
print("getUpdates not ok:", res)
return
updates = res.get("result", []) or []
max_uid = offset
for u in updates:
uid = u.get("update_id")
if uid is not None and uid >= max_uid:
max_uid = uid + 1
msg = u.get("message") or u.get("edited_message") or u.get("channel_post") or {}
if not msg:
continue
text = (msg.get("text") or "").strip()
chat = msg.get("chat", {}) or {}
chat_id = chat.get("id")
if text.startswith("/start"):
parts = text.split(" ", 1)
payload = parts[1] if len(parts) > 1 else None
add_or_update_user_from_msg(msg, payload)
elif text.startswith("/stop"):
if chat_id:
remove_user_by_chat(chat_id)
elif text.startswith("/reset"):
if chat_id:
reset_user_timeline(chat_id)
elif text.startswith("/status"):
if chat_id:
get_user_status(chat_id)
elif text.startswith("/help"):
if chat_id:
show_help(chat_id)
elif text.startswith("/backfill"):
parts = text.split(" ", 1)
timespec = "1h" # default to 1 hour
if len(parts) > 1:
timespec = parts[1].strip()
hours = parse_backfill_timespec(timespec)
hours = min(hours, 72) # cap at 3 days
if chat_id:
send_telegram_to_chat(
chat_id,
f"🔍 Searching for CVEs from last {hours} hour(s)...\n"
f"(Max 15 CVEs, won't affect your regular notifications)"
)
since = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
sent = backfill_to_user(chat_id, since_iso=since, max_items=15)
send_telegram_to_chat(chat_id, f"✅ Backfill complete: {sent} CVE(s) sent.")
else:
# Update last_seen
cid = str(chat_id)
if cid:
s = load_subscribers()
users = s.get("users", {})
if cid in users:
users[cid]["last_seen"] = datetime.now(timezone.utc).isoformat()
s["users"] = users
save_subscribers(s)
subs2 = load_subscribers()
subs2["offset"] = max_uid
save_subscribers(subs2)
except Exception as e:
print("Failed to poll Telegram updates:", e)
# ---------------- Telegram send ----------------
def send_telegram_to_chat(chat_identifier, text):
if not TELEGRAM_API:
print("No TELEGRAM_BOT_TOKEN configured")
return False
payload = {"chat_id": chat_identifier, "text": text, "disable_web_page_preview": True, "parse_mode": "HTML"}
try:
r = requests.post(f"{TELEGRAM_API}/sendMessage", data=payload, timeout=20)
if r.status_code != 200:
try:
j = r.json()
desc = j.get("description", "")
if r.status_code in (403, 400) and ("blocked" in desc.lower() or "not found" in desc.lower()):
cid = int(chat_identifier)
remove_user_by_chat(cid)
except Exception:
pass
r.raise_for_status()
return True
except Exception as e:
print("Failed to send telegram:", e)
return False
# ---------------- NVD helpers ----------------
def nvd_request(params):
headers = {"Accept": "application/json"}
if NVD_API_KEY:
headers["apiKey"] = NVD_API_KEY
r = requests.get(NVD_BASE, params=params, headers=headers, timeout=60)
r.raise_for_status()
return r.json()
def extract_cve_info(v):
cve = v.get("cve") or {}
cve_id = cve.get("id") or cve.get("CVE_data_meta", {}).get("ID") or v.get("cveId")
published = cve.get("published") or v.get("published") or v.get("publishedDate")
desc = ""
for d in (cve.get("descriptions") or []):
if d.get("value"):
desc = d["value"]
break
if not desc:
desc = cve.get("description") or ""
refs = []
for r in (cve.get("references") or []):
url = r.get("url") or r.get("reference")
if url:
refs.append(url)
cvss_scores = []
metrics = cve.get("metrics") or {}
for key in metrics:
for item in metrics.get(key) or []:
cvss = item.get("cvssData") or item.get("cvssMetric")
if cvss:
score = cvss.get("baseScore") or cvss.get("score")
try:
if score is not None:
cvss_scores.append(float(score))
except Exception:
pass
cvss_strs = [("{:.1f}".format(s)) for s in cvss_scores]
return {"id": cve_id, "published": published, "description": desc, "cvss": cvss_scores, "cvss_str": cvss_strs, "references": refs}
def query_new_cves(last_pub, now_iso):
params = {"pubStartDate": last_pub, "pubEndDate": now_iso, "resultsPerPage": PER_PAGE, "startIndex": 0}
all_cves = []
while True:
data = nvd_request(params)
total = data.get("totalResults") or 0
vulns = data.get("vulnerabilities") or []
for v in vulns:
info = extract_cve_info(v)
if info.get("id"):
all_cves.append(info)
start = params.get("startIndex", 0)
per = params.get("resultsPerPage", PER_PAGE)
if not total or start + per >= int(total):
break
params["startIndex"] = start + per
time.sleep(0.5)
try:
all_cves.sort(key=lambda c: dateparser.parse(c.get("published") or now_iso))
except Exception:
pass
return all_cves
# ---------------- matching & formatting ----------------
def cvss_to_severity(cvss_score):
if cvss_score is None:
return None
try:
s = float(cvss_score)
except Exception:
return None
if s >= 9.0:
return "critical"
if s >= 7.0:
return "high"
if s >= 4.0:
return "medium"
return "low"
def matches_user_preferences(cve, user):
prefs = user.get("preferences", {}) or {}
sev_pref = prefs.get("severity")
if sev_pref:
scores = cve.get("cvss") or []
if not scores:
return False
meets = False
for s in scores:
severity = cvss_to_severity(s)
if severity:
order = {"low": 1, "medium": 2, "high": 3, "critical": 4}
if order.get(severity, 0) >= order.get(sev_pref, 0):
meets = True
break
if not meets:
return False
keywords = prefs.get("keywords") or []
if keywords:
text = (cve.get("description", "") + " " + (cve.get("id") or "")).lower()
if not any(kw.lower() in text for kw in keywords):
return False
return True
def extract_subject(description):
if not description:
return "New vulnerability"
parts = description.split(".")
return parts[0].strip() + "."
def build_cve_message(cve):
cve_id = cve.get("id") or "UNKNOWN"
published = cve.get("published") or "N/A"
subject = escape(extract_subject(cve.get("description", "")))
desc = escape((cve.get("description", "") or "")[:1200])
# Determine severity from CVSS scores
severity_label = "UNKNOWN"
severity_emoji = ""
cvss_scores = cve.get("cvss") or []
if cvss_scores:
max_score = max(cvss_scores)
severity = cvss_to_severity(max_score)
if severity == "critical":
severity_label = "CRITICAL"
severity_emoji = "🔴"
elif severity == "high":
severity_label = "HIGH"
severity_emoji = "🟠"
elif severity == "medium":
severity_label = "MEDIUM"
severity_emoji = "🟡"
elif severity == "low":
severity_label = "LOW"
severity_emoji = "🟢"
parts = [
f"🚨 <b>{escape(cve_id)}</b>",
f"🗓 <b>Published:</b> {escape(published)}",
f"{severity_emoji} <b>Severity:</b> {severity_label}",
f"📌 <b>Summary:</b> {subject}"
]
if cve.get("cvss_str"):
parts.append("🔥 <b>CVSS Score:</b> " + ", ".join(escape(s) for s in cve.get("cvss_str")))
parts.append("")
parts.append(desc)
if cve.get("references"):
parts.append(f'<a href="{escape(cve["references"][0], quote=True)}">🔗 Reference</a>')
msg = "\n".join(parts)
if len(msg) > MAX_MSG_LEN:
msg = msg[:MAX_MSG_LEN].rsplit("\n", 1)[0] + "\n"
return msg
# ---------------- backfill ----------------
def backfill_to_user(chat_id, since_iso, max_items=15):
"""Backfill - doesn't update last_sent"""
until_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
params = {"pubStartDate": since_iso, "pubEndDate": until_iso, "resultsPerPage": 2000, "startIndex": 0}
sent = 0
subs = load_subscribers()
users = subs.get("users", {})
cid = str(chat_id)
user = users.get(cid, {})
try:
while True:
data = nvd_request(params)
vulns = data.get("vulnerabilities") or []
for v in vulns:
info = extract_cve_info(v)
if not matches_user_preferences(info, user):
continue
msg = build_cve_message(info)
if send_telegram_to_chat(chat_id, msg):
sent += 1
if sent >= max_items:
return sent
time.sleep(0.15)
total = data.get("totalResults") or 0
start = params.get("startIndex", 0)
per = params.get("resultsPerPage", 2000)
if not total or start + per >= int(total):
break
params["startIndex"] = start + per
except Exception as e:
print("Backfill error:", e)
return sent
# ---------------- main loop ----------------
def run_loop():
if not TELEGRAM_API:
print("Warning: TELEGRAM_BOT_TOKEN not set")
return
state = load_state()
try:
poll_telegram_updates_once()
except Exception as e:
print("Initial poll failed:", e)
while True:
try:
try:
poll_telegram_updates_once()
except Exception as e:
print("poll error:", e)
last_pub = state.get("last_pub") or (datetime.now(timezone.utc) - timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
print(f"[{datetime.now(timezone.utc).isoformat()}] Querying NVD from {last_pub} to {now_iso}")
new_cves = query_new_cves(last_pub, now_iso)
subs_data = load_subscribers()
users = subs_data.get("users", {}) or {}
if not new_cves:
print(f"No new CVEs")
else:
print(f"Found {len(new_cves)} new CVE(s)")
# Check if we're in quiet hours
in_quiet_hours = is_quiet_hours()
if in_quiet_hours:
print(f"⏰ Quiet hours active - queuing messages for later")
for c in new_cves:
msg = build_cve_message(c)
poll_telegram_updates_once()
subs_data = load_subscribers()
users = subs_data.get("users", {}) or {}
if not users:
continue
for cid_str, u in list(users.items()):
try:
u_last = u.get("last_sent")
if not u_last:
users[cid_str]["last_sent"] = now_iso()
continue
send_it = True
try:
c_pub = dateparser.parse(c.get("published"))
u_dt = dateparser.parse(u_last)
if c_pub and u_dt:
send_it = c_pub > u_dt
except Exception:
send_it = True
if not send_it:
continue
if not matches_user_preferences(c, u):
continue
# During quiet hours, only send CRITICAL severity CVEs
if in_quiet_hours:
cvss_scores = c.get("cvss") or []
is_critical = any(cvss_to_severity(score) == "critical" for score in cvss_scores)
if not is_critical:
print(f"⏰ Skipping non-critical CVE {c.get('id')} during quiet hours")
# Update last_sent so we don't send it later
users[cid_str]["last_sent"] = c.get("published")
continue
ok = send_telegram_to_chat(u["chat_id"], msg)
if ok:
users[cid_str]["last_sent"] = c.get("published")
users[cid_str]["last_seen"] = datetime.now(timezone.utc).isoformat()
except Exception as e:
print("Error sending:", e)
time.sleep(0.12)
subs_data["users"] = users
save_subscribers(subs_data)
time.sleep(0.2)
state["last_pub"] = now_iso
save_state(state)
except requests.HTTPError as e:
print("HTTP error:", e)
time.sleep(60)
except Exception as e:
print("Error:", e)
time.sleep(30)
slept = 0
total_sleep = max(10, POLL_MINUTES * 60)
while slept < total_sleep:
time.sleep(TG_POLL_SECONDS)
slept += TG_POLL_SECONDS
try:
poll_telegram_updates_once()
except Exception as e:
print("poll error during sleep:", e)
if __name__ == "__main__":
run_loop()

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
requests
python-dateutil