Website Uptime Monitor

import requests

import json

import os

import time

import threading

import smtplib

from datetime import datetime

from pathlib import Path

from email.mime.text import MIMEText

from email.mime.multipart import MIMEMultipart

from collections import defaultdict

 

# ============================================================

# CONFIGURATION

# ============================================================

 

SITES_FILE    = "monitored_sites.json"

LOG_FILE      = "uptime_log.json"

CONFIG_FILE   = "uptime_config.json"

CHECK_TIMEOUT = 10      # seconds before a request is considered failed

DEFAULT_INTERVAL = 60   # seconds between checks

 

# ============================================================

# LOAD & SAVE HELPERS

# ============================================================

 

def load_json(filepath, default):

    if Path(filepath).exists():

        try:

            with open(filepath, "r") as f:

                return json.load(f)

        except:

            return default

    return default

 

 

def save_json(filepath, data):

    with open(filepath, "w") as f:

        json.dump(data, f, indent=2)

 

 

# ============================================================

# CHECK A SINGLE WEBSITE

# ============================================================

 

def check_site(url):

    """

    Ping a URL and return a result dict:

    status, response_time_ms, status_code, error

    """

    start = time.time()

    try:

        response = requests.get(url, timeout=CHECK_TIMEOUT,

                                allow_redirects=True,

                                headers={"User-Agent": "UptimeMonitor/1.0"})

        elapsed   = round((time.time() - start) * 1000, 1)

        is_up     = response.status_code < 400

 

        return {

            "url":             url,

            "status":          "UP" if is_up else "DOWN",

            "status_code":     response.status_code,

            "response_time_ms": elapsed,

            "error":           None,

            "checked_at":      datetime.now().strftime("%d-%m-%Y %H:%M:%S")

        }

 

    except requests.exceptions.ConnectionError:

        return {

            "url":             url,

            "status":          "DOWN",

            "status_code":     None,

            "response_time_ms": None,

            "error":           "Connection refused",

            "checked_at":      datetime.now().strftime("%d-%m-%Y %H:%M:%S")

        }

    except requests.exceptions.Timeout:

        return {

            "url":             url,

            "status":          "DOWN",

            "status_code":     None,

            "response_time_ms": None,

            "error":           f"Timeout after {CHECK_TIMEOUT}s",

            "checked_at":      datetime.now().strftime("%d-%m-%Y %H:%M:%S")

        }

    except Exception as e:

        return {

            "url":             url,

            "status":          "DOWN",

            "status_code":     None,

            "response_time_ms": None,

            "error":           str(e),

            "checked_at":      datetime.now().strftime("%d-%m-%Y %H:%M:%S")

        }

 

 

# ============================================================

# SEND EMAIL ALERT

# ============================================================

 

def send_alert(config, subject, body):

    try:

        msg = MIMEMultipart()

        msg["From"]    = config["email"]

        msg["To"]      = config["alert_email"]

        msg["Subject"] = subject

        msg.attach(MIMEText(body, "plain"))

 

        server = smtplib.SMTP(config["smtp_host"], config["smtp_port"])

        server.starttls()

        server.login(config["email"], config["password"])

        server.sendmail(config["email"], config["alert_email"], msg.as_string())

        server.quit()

        print(f"\n   Alert sent to {config['alert_email']}")

    except Exception as e:

        print(f"\n  ⚠  Email alert failed: {e}")

 

 

# ============================================================

# UPTIME MONITOR CLASS

# ============================================================

 

class UptimeMonitor:

    def __init__(self):

        self.sites        = load_json(SITES_FILE, [])

        self.log          = load_json(LOG_FILE, {})

        self.config       = load_json(CONFIG_FILE, {})

        self.running      = False

        self.last_status  = {}   # url -> "UP" or "DOWN"

        self.downtime_start = {} # url -> datetime when it went down

 

    # ----------------------------------------------------------

    # Site Management

    # ----------------------------------------------------------

 

    def add_site(self, url, name=None, interval=DEFAULT_INTERVAL):

        # Normalize URL

        if not url.startswith("http"):

            url = "https://" + url

 

        # Check duplicates

        if any(s["url"] == url for s in self.sites):

            print(f"\n  ⚠  Already monitoring: {url}")

            return

 

        site = {

            "url":      url,

            "name":     name or url,

            "interval": interval,

            "added_at": datetime.now().strftime("%d-%m-%Y %H:%M:%S")

        }

        self.sites.append(site)

        save_json(SITES_FILE, self.sites)

        print(f"\n   Added: {site['name']} ({url})")

 

    def remove_site(self, index):

        if 0 <= index < len(self.sites):

            removed = self.sites.pop(index)

            save_json(SITES_FILE, self.sites)

            print(f"\n  🗑  Removed: {removed['name']}")

        else:

            print("\n   Invalid index.")

 

    def list_sites(self):

        if not self.sites:

            print("\n  📭 No sites being monitored.")

            return

        print("\n" + "="*60)

        print(f"  {'#':<4} {'NAME':<20} {'URL':<30} {'INTERVAL'}")

        print("  " + "-"*56)

        for i, s in enumerate(self.sites, 1):

            status_icon = "🟢" if self.last_status.get(s["url"]) == "UP" else \

                          "🔴" if self.last_status.get(s["url"]) == "DOWN" else "⚪"

            print(f"  {status_icon} {i:<3} {s['name']:<20} {s['url']:<30} every {s['interval']}s")

        print("="*60)

 

    # ----------------------------------------------------------

    # Manual Check (all sites at once)

    # ----------------------------------------------------------

 

    def check_all_now(self):

        if not self.sites:

            print("\n  📭 No sites to check.")

            return

 

        print(f"\n   Checking {len(self.sites)} site(s)...\n")

        print(f"  {'STATUS':<8} {'SITE':<25} {'CODE':<8} {'RESPONSE':<14} {'TIME'}")

        print("  " + "-"*65)

 

        for site in self.sites:

            result = check_site(site["url"])

            self._record_result(site["url"], result)

 

            icon    = " UP  " if result["status"] == "UP" else " DOWN"

            code    = str(result["status_code"]) if result["status_code"] else "—"

            rt      = f"{result['response_time_ms']}ms" if result["response_time_ms"] else "—"

            error   = f"  ⚠ {result['error']}" if result["error"] else ""

            print(f"  {icon}  {site['name']:<25} {code:<8} {rt:<14} {result['checked_at']}{error}")

 

    # ----------------------------------------------------------

    # Continuous Monitor Loop (runs in thread)

    # ----------------------------------------------------------

 

    def _monitor_loop(self):

        print("   Monitor running in background...")

        self.running = True

        counters = {s["url"]: 0 for s in self.sites}

 

        while self.running:

            for site in self.sites:

                url      = site["url"]

                interval = site.get("interval", DEFAULT_INTERVAL)

                counters[url] = counters.get(url, 0) + 1

 

                if counters[url] >= interval:

                    counters[url] = 0

                    result = check_site(url)

                    prev   = self.last_status.get(url)

 

                    self._record_result(url, result)

 

                    # Status changed → alert

                    if prev and prev != result["status"]:

                        if result["status"] == "DOWN":

                            self.downtime_start[url] = datetime.now()

                            msg = (f" DOWN: {site['name']}\n"

                                   f"URL: {url}\n"

                                   f"Error: {result['error']}\n"

                                   f"Time: {result['checked_at']}")

                            print(f"\n   ALERT: {site['name']} is DOWN!")

                            print("  > ", end="", flush=True)

                            if self.config:

                                send_alert(self.config,

                                           f" DOWN: {site['name']}",

                                           msg)

                        else:

                            downtime = ""

                            if url in self.downtime_start:

                                delta = datetime.now() - self.downtime_start[url]

                                mins  = int(delta.total_seconds() // 60)

                                secs  = int(delta.total_seconds() % 60)

                                downtime = f" (was down for {mins}m {secs}s)"

                                del self.downtime_start[url]

                            print(f"\n   RECOVERED: {site['name']}{downtime}")

                            print("  > ", end="", flush=True)

                            if self.config:

                                send_alert(self.config,

                                           f" Recovered: {site['name']}",

                                           f"{site['name']} is back UP{downtime}\nURL: {url}\nTime: {result['checked_at']}")

 

                    self.last_status[url] = result["status"]

 

            time.sleep(1)

 

    def start_monitoring(self):

        if not self.sites:

            print("\n   No sites added yet.")

            return

        t = threading.Thread(target=self._monitor_loop, daemon=True)

        t.start()

 

    def stop_monitoring(self):

        self.running = False

 

    # ----------------------------------------------------------

    # Record Result to Log

    # ----------------------------------------------------------

 

    def _record_result(self, url, result):

        if url not in self.log:

            self.log[url] = []

        self.log[url].append(result)

        # Keep last 500 entries per site

        if len(self.log[url]) > 500:

            self.log[url] = self.log[url][-500:]

        save_json(LOG_FILE, self.log)

 

    # ----------------------------------------------------------

    # Uptime Stats Report

    # ----------------------------------------------------------

 

    def show_stats(self):

        if not self.log:

            print("\n  📭 No data logged yet. Run a check first.")

            return

 

        print("\n" + "="*60)

        print("   UPTIME STATISTICS")

        print("="*60)

 

        for site in self.sites:

            url     = site["url"]

            records = self.log.get(url, [])

            if not records:

                continue

 

            total    = len(records)

            up_count = sum(1 for r in records if r["status"] == "UP")

            uptime   = (up_count / total) * 100

 

            # Avg response time

            times = [r["response_time_ms"] for r in records if r["response_time_ms"]]

            avg_rt = round(sum(times) / len(times), 1) if times else None

 

            # Last checked

            last = records[-1]

 

            status_icon = "🟢" if last["status"] == "UP" else "🔴"

 

            print(f"\n  {status_icon} {site['name']}")

            print(f"     URL        : {url}")

            print(f"     Uptime     : {uptime:.2f}%  ({up_count}/{total} checks passed)")

            print(f"     Avg Response: {avg_rt}ms" if avg_rt else "     Avg Response: —")

            print(f"     Last Check : {last['checked_at']}  [{last['status']}]")

            if last.get("error"):

                print(f"     Last Error : {last['error']}")

 

        print("="*60)

 

    # ----------------------------------------------------------

    # Setup Email Alerts

    # ----------------------------------------------------------

 

    def setup_alerts(self):

        print("\n" + "="*55)

        print("   EMAIL ALERT SETUP")

        print("="*55)

        print("\n  Providers: Gmail (smtp.gmail.com:587) | Outlook (smtp.office365.com:587)")

 

        smtp_host   = input("\n  SMTP Host: ").strip()

        smtp_port   = int(input("  SMTP Port: ").strip() or 587)

        email       = input("  Your email: ").strip()

        password    = input("  App password: ").strip()

        alert_email = input("  Send alerts to (email): ").strip()

 

        self.config = {

            "smtp_host":   smtp_host,

            "smtp_port":   smtp_port,

            "email":       email,

            "password":    password,

            "alert_email": alert_email

        }

        save_json(CONFIG_FILE, self.config)

        print("\n  ✅ Alert config saved.")

 

 

# ============================================================

# MAIN MENU

# ============================================================

 

def print_menu(monitor):

    status = " Running" if monitor.running else "⚫ Stopped"

    print("\n" + "-"*45)

    print(f"   WEBSITE UPTIME MONITOR  [{status}]")

    print("-"*45)

    print("  1. Add website to monitor")

    print("  2. Remove website")

    print("  3. List monitored sites")

    print("  4. Check all sites NOW")

    print("  5. Start continuous monitor (background)")

    print("  6. View uptime statistics")

    print("  7. Setup email alerts")

    print("  0. Exit")

    print("-"*45)

 

 

def main():

    print("\n" + "="*55)

    print("      WEBSITE UPTIME MONITOR")

    print("="*55)

    print(f"\n  Check timeout : {CHECK_TIMEOUT}s")

    print(f"  Default interval: every {DEFAULT_INTERVAL}s")

    print(f"  Log file      : {LOG_FILE}")

 

    monitor = UptimeMonitor()

 

    if monitor.sites:

        print(f"\n   Loaded {len(monitor.sites)} monitored site(s).")

    if monitor.config:

        print(f"   Alerts configured for: {monitor.config.get('alert_email','')}")

 

    while True:

        print_menu(monitor)

        choice = input("  > ").strip()

 

        if choice == "1":

            url      = input("\n  Website URL (e.g. google.com): ").strip()

            name     = input("  Display name (or Enter to use URL): ").strip() or None

            interval = input(f"  Check interval in seconds (default {DEFAULT_INTERVAL}): ").strip()

            interval = int(interval) if interval.isdigit() else DEFAULT_INTERVAL

            monitor.add_site(url, name, interval)

 

        elif choice == "2":

            monitor.list_sites()

            if monitor.sites:

                try:

                    idx = int(input("\n  Enter site number to remove: ").strip()) - 1

                    monitor.remove_site(idx)

                except ValueError:

                    print("   Invalid input.")

 

        elif choice == "3":

            monitor.list_sites()

 

        elif choice == "4":

            monitor.check_all_now()

 

        elif choice == "5":

            if monitor.running:

                print("\n   Monitor is already running.")

            else:

                monitor.start_monitoring()

                print(f"\n   Continuous monitor started!")

                print(f"   Alerts will print here. Keep this window open.")

 

        elif choice == "6":

            monitor.show_stats()

 

        elif choice == "7":

            monitor.setup_alerts()

 

        elif choice == "0":

            monitor.stop_monitoring()

            print("\n   Goodbye!\n")

            break

 

        else:

            print("   Invalid choice.")

 

 

# ============================================================

# RUN

# ============================================================

 

if __name__ == "__main__":

    main()

No comments: