import requests
import json
import os
import time
import threading
import smtplib
from datetime import datetime
from pathlib import Path
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from collections import defaultdict
# ============================================================
# CONFIGURATION
# ============================================================
SITES_FILE = "monitored_sites.json"
LOG_FILE = "uptime_log.json"
CONFIG_FILE = "uptime_config.json"
CHECK_TIMEOUT = 10 # seconds before a request is considered failed
DEFAULT_INTERVAL = 60 # seconds between checks
# ============================================================
# LOAD & SAVE HELPERS
# ============================================================
def load_json(filepath, default):
if Path(filepath).exists():
try:
with open(filepath, "r") as f:
return json.load(f)
except:
return default
return default
def save_json(filepath, data):
with open(filepath, "w") as f:
json.dump(data, f, indent=2)
# ============================================================
# CHECK A SINGLE WEBSITE
# ============================================================
def check_site(url):
"""
Ping a URL and return a result dict:
status, response_time_ms, status_code, error
"""
start = time.time()
try:
response = requests.get(url, timeout=CHECK_TIMEOUT,
allow_redirects=True,
headers={"User-Agent": "UptimeMonitor/1.0"})
elapsed = round((time.time() - start) * 1000, 1)
is_up = response.status_code < 400
return {
"url": url,
"status": "UP" if is_up else "DOWN",
"status_code": response.status_code,
"response_time_ms": elapsed,
"error": None,
"checked_at": datetime.now().strftime("%d-%m-%Y %H:%M:%S")
}
except requests.exceptions.ConnectionError:
return {
"url": url,
"status": "DOWN",
"status_code": None,
"response_time_ms": None,
"error": "Connection refused",
"checked_at": datetime.now().strftime("%d-%m-%Y %H:%M:%S")
}
except requests.exceptions.Timeout:
return {
"url": url,
"status": "DOWN",
"status_code": None,
"response_time_ms": None,
"error": f"Timeout after {CHECK_TIMEOUT}s",
"checked_at": datetime.now().strftime("%d-%m-%Y %H:%M:%S")
}
except Exception as e:
return {
"url": url,
"status": "DOWN",
"status_code": None,
"response_time_ms": None,
"error": str(e),
"checked_at": datetime.now().strftime("%d-%m-%Y %H:%M:%S")
}
# ============================================================
# SEND EMAIL ALERT
# ============================================================
def send_alert(config, subject, body):
try:
msg = MIMEMultipart()
msg["From"] = config["email"]
msg["To"] = config["alert_email"]
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
server = smtplib.SMTP(config["smtp_host"], config["smtp_port"])
server.starttls()
server.login(config["email"], config["password"])
server.sendmail(config["email"], config["alert_email"], msg.as_string())
server.quit()
print(f"\n Alert sent to {config['alert_email']}")
except Exception as e:
print(f"\n ⚠ Email alert failed: {e}")
# ============================================================
# UPTIME MONITOR CLASS
# ============================================================
class UptimeMonitor:
def __init__(self):
self.sites = load_json(SITES_FILE, [])
self.log = load_json(LOG_FILE, {})
self.config = load_json(CONFIG_FILE, {})
self.running = False
self.last_status = {} # url -> "UP" or "DOWN"
self.downtime_start = {} # url -> datetime when it went down
# ----------------------------------------------------------
# Site Management
# ----------------------------------------------------------
def add_site(self, url, name=None, interval=DEFAULT_INTERVAL):
# Normalize URL
if not url.startswith("http"):
url = "https://" + url
# Check duplicates
if any(s["url"] == url for s in self.sites):
print(f"\n ⚠ Already monitoring: {url}")
return
site = {
"url": url,
"name": name or url,
"interval": interval,
"added_at": datetime.now().strftime("%d-%m-%Y %H:%M:%S")
}
self.sites.append(site)
save_json(SITES_FILE, self.sites)
print(f"\n Added: {site['name']} ({url})")
def remove_site(self, index):
if 0 <= index < len(self.sites):
removed = self.sites.pop(index)
save_json(SITES_FILE, self.sites)
print(f"\n 🗑 Removed: {removed['name']}")
else:
print("\n Invalid index.")
def list_sites(self):
if not self.sites:
print("\n 📠No sites being monitored.")
return
print("\n" + "="*60)
print(f" {'#':<4} {'NAME':<20} {'URL':<30} {'INTERVAL'}")
print(" " + "-"*56)
for i, s in enumerate(self.sites, 1):
status_icon = "🟢" if self.last_status.get(s["url"]) == "UP" else \
"🔴" if self.last_status.get(s["url"]) == "DOWN" else "⚪"
print(f" {status_icon} {i:<3} {s['name']:<20} {s['url']:<30} every {s['interval']}s")
print("="*60)
# ----------------------------------------------------------
# Manual Check (all sites at once)
# ----------------------------------------------------------
def check_all_now(self):
if not self.sites:
print("\n 📠No sites to check.")
return
print(f"\n Checking {len(self.sites)} site(s)...\n")
print(f" {'STATUS':<8} {'SITE':<25} {'CODE':<8} {'RESPONSE':<14} {'TIME'}")
print(" " + "-"*65)
for site in self.sites:
result = check_site(site["url"])
self._record_result(site["url"], result)
icon = " UP " if result["status"] == "UP" else " DOWN"
code = str(result["status_code"]) if result["status_code"] else "—"
rt = f"{result['response_time_ms']}ms" if result["response_time_ms"] else "—"
error = f" ⚠ {result['error']}" if result["error"] else ""
print(f" {icon} {site['name']:<25} {code:<8} {rt:<14} {result['checked_at']}{error}")
# ----------------------------------------------------------
# Continuous Monitor Loop (runs in thread)
# ----------------------------------------------------------
def _monitor_loop(self):
print(" Monitor running in background...")
self.running = True
counters = {s["url"]: 0 for s in self.sites}
while self.running:
for site in self.sites:
url = site["url"]
interval = site.get("interval", DEFAULT_INTERVAL)
counters[url] = counters.get(url, 0) + 1
if counters[url] >= interval:
counters[url] = 0
result = check_site(url)
prev = self.last_status.get(url)
self._record_result(url, result)
# Status changed → alert
if prev and prev != result["status"]:
if result["status"] == "DOWN":
self.downtime_start[url] = datetime.now()
msg = (f" DOWN: {site['name']}\n"
f"URL: {url}\n"
f"Error: {result['error']}\n"
f"Time: {result['checked_at']}")
print(f"\n ALERT: {site['name']} is DOWN!")
print(" > ", end="", flush=True)
if self.config:
send_alert(self.config,
f" DOWN: {site['name']}",
msg)
else:
downtime = ""
if url in self.downtime_start:
delta = datetime.now() - self.downtime_start[url]
mins = int(delta.total_seconds() // 60)
secs = int(delta.total_seconds() % 60)
downtime = f" (was down for {mins}m {secs}s)"
del self.downtime_start[url]
print(f"\n RECOVERED: {site['name']}{downtime}")
print(" > ", end="", flush=True)
if self.config:
send_alert(self.config,
f" Recovered: {site['name']}",
f"{site['name']} is back UP{downtime}\nURL: {url}\nTime: {result['checked_at']}")
self.last_status[url] = result["status"]
time.sleep(1)
def start_monitoring(self):
if not self.sites:
print("\n No sites added yet.")
return
t = threading.Thread(target=self._monitor_loop, daemon=True)
t.start()
def stop_monitoring(self):
self.running = False
# ----------------------------------------------------------
# Record Result to Log
# ----------------------------------------------------------
def _record_result(self, url, result):
if url not in self.log:
self.log[url] = []
self.log[url].append(result)
# Keep last 500 entries per site
if len(self.log[url]) > 500:
self.log[url] = self.log[url][-500:]
save_json(LOG_FILE, self.log)
# ----------------------------------------------------------
# Uptime Stats Report
# ----------------------------------------------------------
def show_stats(self):
if not self.log:
print("\n 📠No data logged yet. Run a check first.")
return
print("\n" + "="*60)
print(" UPTIME STATISTICS")
print("="*60)
for site in self.sites:
url = site["url"]
records = self.log.get(url, [])
if not records:
continue
total = len(records)
up_count = sum(1 for r in records if r["status"] == "UP")
uptime = (up_count / total) * 100
# Avg response time
times = [r["response_time_ms"] for r in records if r["response_time_ms"]]
avg_rt = round(sum(times) / len(times), 1) if times else None
# Last checked
last = records[-1]
status_icon = "🟢" if last["status"] == "UP" else "🔴"
print(f"\n {status_icon} {site['name']}")
print(f" URL : {url}")
print(f" Uptime : {uptime:.2f}% ({up_count}/{total} checks passed)")
print(f" Avg Response: {avg_rt}ms" if avg_rt else " Avg Response: —")
print(f" Last Check : {last['checked_at']} [{last['status']}]")
if last.get("error"):
print(f" Last Error : {last['error']}")
print("="*60)
# ----------------------------------------------------------
# Setup Email Alerts
# ----------------------------------------------------------
def setup_alerts(self):
print("\n" + "="*55)
print(" EMAIL ALERT SETUP")
print("="*55)
print("\n Providers: Gmail (smtp.gmail.com:587) | Outlook (smtp.office365.com:587)")
smtp_host = input("\n SMTP Host: ").strip()
smtp_port = int(input(" SMTP Port: ").strip() or 587)
email = input(" Your email: ").strip()
password = input(" App password: ").strip()
alert_email = input(" Send alerts to (email): ").strip()
self.config = {
"smtp_host": smtp_host,
"smtp_port": smtp_port,
"email": email,
"password": password,
"alert_email": alert_email
}
save_json(CONFIG_FILE, self.config)
print("\n ✅ Alert config saved.")
# ============================================================
# MAIN MENU
# ============================================================
def print_menu(monitor):
status = " Running" if monitor.running else "⚫ Stopped"
print("\n" + "-"*45)
print(f" WEBSITE UPTIME MONITOR [{status}]")
print("-"*45)
print(" 1. Add website to monitor")
print(" 2. Remove website")
print(" 3. List monitored sites")
print(" 4. Check all sites NOW")
print(" 5. Start continuous monitor (background)")
print(" 6. View uptime statistics")
print(" 7. Setup email alerts")
print(" 0. Exit")
print("-"*45)
def main():
print("\n" + "="*55)
print(" WEBSITE UPTIME MONITOR")
print("="*55)
print(f"\n Check timeout : {CHECK_TIMEOUT}s")
print(f" Default interval: every {DEFAULT_INTERVAL}s")
print(f" Log file : {LOG_FILE}")
monitor = UptimeMonitor()
if monitor.sites:
print(f"\n Loaded {len(monitor.sites)} monitored site(s).")
if monitor.config:
print(f" Alerts configured for: {monitor.config.get('alert_email','')}")
while True:
print_menu(monitor)
choice = input(" > ").strip()
if choice == "1":
url = input("\n Website URL (e.g. google.com): ").strip()
name = input(" Display name (or Enter to use URL): ").strip() or None
interval = input(f" Check interval in seconds (default {DEFAULT_INTERVAL}): ").strip()
interval = int(interval) if interval.isdigit() else DEFAULT_INTERVAL
monitor.add_site(url, name, interval)
elif choice == "2":
monitor.list_sites()
if monitor.sites:
try:
idx = int(input("\n Enter site number to remove: ").strip()) - 1
monitor.remove_site(idx)
except ValueError:
print(" Invalid input.")
elif choice == "3":
monitor.list_sites()
elif choice == "4":
monitor.check_all_now()
elif choice == "5":
if monitor.running:
print("\n Monitor is already running.")
else:
monitor.start_monitoring()
print(f"\n Continuous monitor started!")
print(f" Alerts will print here. Keep this window open.")
elif choice == "6":
monitor.show_stats()
elif choice == "7":
monitor.setup_alerts()
elif choice == "0":
monitor.stop_monitoring()
print("\n Goodbye!\n")
break
else:
print(" Invalid choice.")
# ============================================================
# RUN
# ============================================================
if __name__ == "__main__":
main()
No comments:
Post a Comment