import re
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt
# -------------------------------------
# Log Pattern (Apache Common Log)
# -------------------------------------
# Matches one Apache "Common Log Format" line, e.g.:
#   127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /a.html HTTP/1.0" 200 2326
# Named groups: ip, time, method, url, status, size.
# NOTE(review): the size group requires digits, so lines whose size field is
# "-" (e.g. 304 responses) fail to match and are silently skipped by
# parse_log — confirm this is intended before relying on per-IP counts.
log_pattern = re.compile(
    r'(?P<ip>\S+) - - \[(?P<time>.*?)\] '  # client IP; identd/user fixed as "- -"; [timestamp]
    r'"(?P<method>\S+) (?P<url>\S+) \S+" '  # request line; protocol token is discarded
    r'(?P<status>\d+) (?P<size>\d+)'  # HTTP status and response size in bytes
)
# -------------------------------------
# Parse Log File
# -------------------------------------
def parse_log(file_path):
    """Parse an Apache common-log file into a pandas DataFrame.

    Each line that matches ``log_pattern`` becomes one row with string
    columns ip/time/method/url plus integer columns status/size.
    Non-matching lines are skipped silently.

    Args:
        file_path: Path to the log file.

    Returns:
        DataFrame with columns ["ip", "time", "method", "url", "status",
        "size"]; empty (but with those columns) when nothing matched.
    """
    records = []
    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            match = log_pattern.search(line)
            if match:
                records.append(match.groupdict())
    # Pass columns explicitly: with an empty `records` list, a bare
    # pd.DataFrame([]) would have NO columns and the astype calls below
    # would raise KeyError on an empty or non-matching log file.
    df = pd.DataFrame(records, columns=["ip", "time", "method", "url", "status", "size"])
    df["status"] = df["status"].astype(int)
    df["size"] = df["size"].astype(int)
    return df
# -------------------------------------
# Feature Engineering
# -------------------------------------
def extract_features(df):
    """Aggregate per-IP request features from a parsed log DataFrame.

    Produces one row per IP with:
      - request_count: total requests seen
      - errors: number of 4xx responses
      - server_errors: number of 5xx responses
    IPs with no errors get 0 (filled after the left merges).
    """
    total_requests = df.groupby("ip").size().reset_index(name="request_count")
    client_errors = (
        df[df["status"].between(400, 499)]
        .groupby("ip")
        .size()
        .reset_index(name="errors")
    )
    server_errors = (
        df[df["status"].between(500, 599)]
        .groupby("ip")
        .size()
        .reset_index(name="server_errors")
    )
    merged = (
        total_requests
        .merge(client_errors, on="ip", how="left")
        .merge(server_errors, on="ip", how="left")
    )
    # IPs without any 4xx/5xx rows come out of the left merge as NaN.
    return merged.fillna(0)
# -------------------------------------
# Anomaly Detection
# -------------------------------------
def detect_anomalies(features):
    """Flag outlier IPs using an IsolationForest over the count features.

    Adds two columns to *features* in place:
      - anomaly_score: raw fit_predict label (1 = inlier, -1 = outlier)
      - is_suspicious: True where the label is -1
    Returns the same (mutated) DataFrame.
    """
    feature_cols = ["request_count", "errors", "server_errors"]
    # Fixed random_state keeps the flagged set reproducible across runs.
    forest = IsolationForest(contamination=0.05, random_state=42)
    labels = forest.fit_predict(features[feature_cols])
    features["anomaly_score"] = labels
    features["is_suspicious"] = labels == -1
    return features
# -------------------------------------
# Visualization
# -------------------------------------
def visualize(features):
    """Scatter-plot request volume vs 4xx errors, marking suspicious IPs.

    Expects the columns produced by detect_anomalies (notably the boolean
    "is_suspicious"). Opens a matplotlib window via plt.show().
    """
    plt.figure(figsize=(10, 5))
    mask = features["is_suspicious"]
    benign = features[~mask]
    flagged = features[mask]
    plt.scatter(benign["request_count"], benign["errors"], label="Normal")
    plt.scatter(flagged["request_count"], flagged["errors"], label="Suspicious", marker="x")
    plt.xlabel("Request Count")
    plt.ylabel("4xx Errors")
    plt.legend()
    plt.title("Log Anomaly Detection")
    plt.show()
# -------------------------------------
# MAIN
# -------------------------------------
if __name__ == "__main__":
    # Interactive entry point: parse a log, score per-IP features,
    # report and plot suspicious IPs, and save the full table to CSV.
    log_path = input("Enter log file path: ").strip()
    print("Parsing logs...")
    log_df = parse_log(log_path)
    print(f"Loaded {len(log_df)} log entries.")
    feature_table = extract_features(log_df)
    print("Detecting anomalies...")
    report = detect_anomalies(feature_table)
    flagged = report[report["is_suspicious"]]
    print("\n Suspicious IP Addresses:\n")
    print(flagged[["ip", "request_count", "errors", "server_errors"]])
    visualize(report)
    report.to_csv("log_analysis_report.csv", index=False)
    print("\n Report saved as log_analysis_report.csv")
# (Removed stray blog-page footer text — "No comments:" / "Post a Comment" —
# which is not Python and made the file fail to parse.)