Smart Log File Analyzer

import re

import matplotlib.pyplot as plt
import pandas as pd
from sklearn.ensemble import IsolationForest


# -------------------------------------
# Log Pattern (Apache Common Log)
# -------------------------------------
log_pattern = re.compile(
    r'(?P<ip>\S+) \S+ \S+ \[(?P<time>.*?)\] '
    r'"(?P<method>\S+) (?P<url>\S+) \S+" '
    r'(?P<status>\d+) (?P<size>\d+|-)'
)
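# Example of a line this pattern matches (Apache Common Log Format):
#   127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
# The identity and userid fields are matched as \S+ (they are usually "-" but
# can hold a username), and the size group accepts "-" for bodyless responses.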


# -------------------------------------
# Parse Log File
# -------------------------------------
def parse_log(file_path):
    data = []
    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            match = log_pattern.search(line)
            if match:
                data.append(match.groupdict())

    df = pd.DataFrame(data)
    if df.empty:
        raise ValueError(f"No parseable log entries found in {file_path}")

    df["status"] = df["status"].astype(int)
    # A "-" size (e.g. 304 responses with no body) is counted as 0 bytes.
    df["size"] = pd.to_numeric(df["size"], errors="coerce").fillna(0).astype(int)
    return df
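# The resulting DataFrame has one row per matched request, with columns
# ip, time, method, url, status (int), and size (int).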



# -------------------------------------
# Feature Engineering
# -------------------------------------
def extract_features(df):
    ip_counts = df.groupby("ip").size().reset_index(name="request_count")
    status_4xx = df[df["status"].between(400, 499)].groupby("ip").size().reset_index(name="errors")
    status_5xx = df[df["status"].between(500, 599)].groupby("ip").size().reset_index(name="server_errors")

    features = ip_counts.merge(status_4xx, on="ip", how="left")
    features = features.merge(status_5xx, on="ip", how="left")

    # IPs with no 4xx/5xx hits come out of the left merges as NaN; make those 0.
    features[["errors", "server_errors"]] = (
        features[["errors", "server_errors"]].fillna(0).astype(int)
    )
    return features
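# Each row of `features` summarizes one client IP, e.g. (illustrative values):
#   ip           request_count  errors  server_errors
#   203.0.113.7           1542     310             12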



# -------------------------------------
# Anomaly Detection
# -------------------------------------
def detect_anomalies(features):
    # Treat roughly the top 5% most unusual IPs as anomalies.
    model = IsolationForest(contamination=0.05, random_state=42)

    X = features[["request_count", "errors", "server_errors"]]

    # fit_predict labels inliers 1 and outliers -1.
    features["anomaly_score"] = model.fit_predict(X)
    features["is_suspicious"] = features["anomaly_score"] == -1
    return features
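# If you want a continuous ranking instead of a binary label, a fitted
# IsolationForest also exposes decision_function (lower = more anomalous),
# e.g.:
#   features["raw_score"] = model.decision_function(X)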



# -------------------------------------
# Visualization
# -------------------------------------
def visualize(features):
    plt.figure(figsize=(10, 5))

    normal = features[~features["is_suspicious"]]
    suspicious = features[features["is_suspicious"]]

    plt.scatter(normal["request_count"], normal["errors"], label="Normal")
    plt.scatter(suspicious["request_count"], suspicious["errors"], label="Suspicious", marker="x")

    plt.xlabel("Request Count")
    plt.ylabel("4xx Errors")
    plt.legend()
    plt.title("Log Anomaly Detection")
    plt.show()
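# Request counts are often heavy-tailed, so normal traffic can bunch up near
# the origin; adding plt.xscale("log") before plt.show() can spread it out.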



# -------------------------------------
# MAIN
# -------------------------------------
if __name__ == "__main__":
    path = input("Enter log file path: ").strip()

    print("Parsing logs...")
    df = parse_log(path)
    print(f"Loaded {len(df)} log entries.")

    features = extract_features(df)

    print("Detecting anomalies...")
    analyzed = detect_anomalies(features)

    suspicious = analyzed[analyzed["is_suspicious"]]

    print("\nSuspicious IP Addresses:\n")
    print(suspicious[["ip", "request_count", "errors", "server_errors"]])

    visualize(analyzed)

    analyzed.to_csv("log_analysis_report.csv", index=False)
    print("\nReport saved as log_analysis_report.csv")

