reposync/scripts/ngparse

231 lines
7.5 KiB
Python
Executable File

#!/usr/bin/python3
import re, math, os, sys, datetime
# Top-level paths that the per-path reports tally; any other path is skipped.
# Every entry must start with "/" because get_path_parent() always returns a
# value with a leading slash (a bare "archlinux" could never match).
path_list = [
    "/archlinux", "/archlinuxarm", "/asahilinux",
    "/cd-image", "/debian", "/debian-cd", "/fedora",
    "/gnu", "/index.html", "/kali", "/kali-images",
    "/linux", "/manjaro", "/raspbian", "/static",
    "/ubuntu", "/ubuntu-cd", "/ubuntu-old", "/",
]
def byte_human(size_bytes):
    """Format a byte count as a short human-readable string.

    The value is divided by the largest fitting power of 1024 and rounded to
    two decimals, e.g. ``1536`` becomes ``"1.5KB"``.  Zero is rendered as
    ``"0B"``.

    Args:
        size_bytes: Number of bytes (int or float).  Negative values keep
            their sign in the output.

    Returns:
        The scaled number immediately followed by its unit suffix
        (``B``, ``KB``, ... ``YB``).
    """
    if size_bytes == 0:
        return "0B"
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    magnitude = abs(size_bytes)
    # Pick the unit by comparison instead of a floating-point logarithm: this
    # cannot misround at exact powers of 1024, never yields a negative index
    # for values below 1, and stays on the largest unit for huge values.
    i = 0
    while i < len(size_name) - 1 and magnitude >= 1024 ** (i + 1):
        i += 1
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return "%s%s" % (s, size_name[i])
def parse_req_line(http_entry):
    """Extract the method and path from an HTTP request line.

    Args:
        http_entry: Request line text such as ``"GET /debian/ HTTP/1.1"``.

    Returns:
        A dict that may contain ``"method"`` (only when the first token is one
        of the recognised methods) and ``"path"`` (only when the second token
        starts with ``/``).  Missing pieces are simply left out, so the result
        can be empty.
    """
    req = {}
    http_parsed = http_entry.split()
    if not http_parsed:
        return req
    # "OPTIONS" is the real HTTP method name; "OPTION" is still accepted so
    # nothing that was recognised before is lost.
    if http_parsed[0] in ("HEAD", "POST", "GET", "OPTIONS", "OPTION"):
        req["method"] = http_parsed[0]
    # A request line may consist of a single token; guard the second-token
    # lookup instead of raising IndexError.
    if len(http_parsed) > 1 and http_parsed[1].startswith("/"):
        req["path"] = http_parsed[1]
    return req
def get_path_parent(path):
    """Return the first path component of *path*, with a leading slash.

    Everything from the first ``?`` or ``&`` onwards is discarded before the
    path is split on ``/``.  A value containing no slash at all is returned
    with a slash prepended.
    """
    without_query = path.partition("?")[0]
    without_amp = without_query.partition("&")[0]
    segments = without_amp.split("/")
    if len(segments) != 1:
        # For "/a/b/c" the split yields ["", "a", "b", "c"]; keep "" and "a".
        return "/".join(segments[:2])
    return "/" + segments[0]
def get_date_range(all_log):
    """Print the earliest and latest timestamp found in *all_log*.

    Args:
        all_log: Iterable of indexable records whose element ``[2]`` is a
            timestamp string in ``%d/%b/%Y:%H:%M:%S %z`` form.

    Prints one ``Date: <first> ~ <last>`` line, or nothing when *all_log* is
    empty.

    Raises:
        ValueError: If a timestamp string does not match the expected format.
    """
    # Parse before sorting: the raw strings start with the day of the month,
    # so sorting them as text does not give chronological order.
    stamps = sorted(
        datetime.datetime.strptime(record[2], "%d/%b/%Y:%H:%M:%S %z")
        for record in all_log
    )
    if not stamps:
        return
    first = stamps[0].strftime("%m/%d %H:%M:%S")
    last = stamps[-1].strftime("%m/%d %H:%M:%S")
    print(f"Date: {first} ~ {last}")
# One access-log record.  The client field accepts hex digits and colons as
# well as digits and dots so that IPv6 clients are not silently dropped.
_LOG_ENTRY_RE = re.compile(
    r'([\da-fA-F:\.]+) - (\S+) \[(.*?)\] "(.*?)" (\d+) (\d+) "(.*?)" "(.*?)" "(.*?)" "(.*?)" sn="(.*?)" rt=([\d\.]+) [^\n]+'
)

# Dict keys, in the same order as the capture groups above.
_LOG_ENTRY_FIELDS = (
    "client", "user", "time", "req", "status", "bytes",
    "referer", "ua", "forward", "host", "server", "reqtime",
)


def parse_log_entry(entry):
    """Parse one access-log line into a dict.

    Args:
        entry: A single log line (without needing a trailing newline).

    Returns:
        A dict with the keys ``client``, ``user``, ``time``, ``req``,
        ``status``, ``bytes``, ``referer``, ``ua``, ``forward``, ``host``,
        ``server`` and ``reqtime``.  ``time`` is a timezone-aware
        ``datetime``; every other value is the raw matched string.
        ``None`` is returned when the line does not match the expected
        layout, when the request field does not contain ``HTTP``, or when
        the timestamp cannot be parsed.
    """
    match = _LOG_ENTRY_RE.match(entry)
    if not match:
        return None
    log_entry = dict(zip(_LOG_ENTRY_FIELDS, match.groups()))
    if "HTTP" not in log_entry["req"]:
        return None
    try:
        log_entry["time"] = datetime.datetime.strptime(
            log_entry["time"], "%d/%b/%Y:%H:%M:%S %z"
        )
    except ValueError:
        # Treat a malformed timestamp like any other unparseable line rather
        # than aborting the whole run.
        return None
    return log_entry
def get_all_log_entry(log_file):
    """Read *log_file* and return every line that parses as a log entry.

    Args:
        log_file: Path of the log file to read.

    Returns:
        A list of the dicts produced by ``parse_log_entry``; lines for which
        it returns a falsy value are skipped.
    """
    # Iterate line by line so the final record is kept even when the file has
    # no trailing newline, and so the whole file is not held in memory twice.
    # errors="replace" keeps undecodable bytes from aborting the run.
    with open(log_file, "r", errors="replace") as f:
        return [
            parsed
            for line in f
            if (parsed := parse_log_entry(line.rstrip("\n")))
        ]
def main(log_file, logs=None):
    """Print, for each day, request count and bytes transferred per path.

    Args:
        log_file: Path of the log file; read only when *logs* is empty.
        logs: Optional pre-parsed entries (dicts as returned by
            ``parse_log_entry``).  When falsy, *log_file* is parsed instead.

    Only requests whose first path component is listed in ``path_list`` are
    counted.  Output goes to stdout, one table per day.
    """
    all_logs = logs if logs else get_all_log_entry(log_file)

    # Group entries by calendar day, keeping first-seen order of the days.
    log_by_date = {}
    for entry in all_logs:
        date_day = entry["time"].strftime("%Y-%m-%d")
        log_by_date.setdefault(date_day, []).append(entry)

    for day, entries in log_by_date.items():
        # path -> [total bytes, request count]
        paths = {}
        for entry in entries:
            path = parse_req_line(entry["req"]).get("path")
            if not path:
                continue
            path_parent = get_path_parent(path)
            if path_parent not in path_list:
                continue
            totals = paths.setdefault(path_parent, [0, 0])
            totals[0] += int(entry["bytes"])
            totals[1] += 1
        print(day)
        print('-'*36)
        print(f'{"Path":<14} {"Count":<10} Transfer')
        print('-'*36)
        for path in sorted(paths):
            print(f"{path:<14} {paths[path][1]:<10} {byte_human(paths[path][0])}")
        print()
def main_geo(log_file, logs=None):
    """Print total bytes transferred per client country, largest first.

    Args:
        log_file: Path of the log file; read only when *logs* is empty.
        logs: Optional pre-parsed entries (dicts as returned by
            ``parse_log_entry``).  When falsy, *log_file* is parsed instead.

    Countries are looked up in ``GeoLite2-Country.mmdb`` in the current
    working directory.  A progress line per counted request is written to
    stderr; the final per-country totals go to stdout.  Requests without a
    path starting with ``/`` are not counted.
    """
    import geoip2.database
    import geoip2.errors

    # The context manager closes the database file when the lookups are done.
    with geoip2.database.Reader('GeoLite2-Country.mmdb') as reader:
        all_logs = logs if logs else get_all_log_entry(log_file)
        c = len(all_logs)
        # print(c)
        geolocstat = {}
        for n, log in enumerate(all_logs):
            path = parse_req_line(log["req"]).get("path")
            if not path:
                continue
            path_parent = get_path_parent(path)
            try:
                geoloc = reader.country(log['client']).country.iso_code
            except (geoip2.errors.AddressNotFoundError, ValueError):
                # Unknown or malformed address: bucket it under "XX".  Only
                # lookup failures are caught so Ctrl-C still interrupts.
                geoloc = "XX"
            print(f"[{int(100*n/c):>3}%] {geoloc} {log['client']:>15} {byte_human(int(log['bytes'])):>8} {path_parent}", file=sys.stderr)
            geolocstat[geoloc] = geolocstat.get(geoloc, 0) + int(log['bytes'])
    print(file=sys.stderr)
    ranked = sorted(geolocstat.items(), key=lambda item: item[1], reverse=True)
    for geo, total in ranked:
        print(geo, byte_human(total))
def main_date(log_file, logs=None):
    """Print the time span, entry count and total bytes of a log.

    Args:
        log_file: Path of the log file; also shown in the header line.  It is
            read only when *logs* is empty.
        logs: Optional pre-parsed entries (dicts as returned by
            ``parse_log_entry``).  When falsy, *log_file* is parsed instead.

    When no entries are available the date range is printed as ``n/a``
    instead of raising IndexError.
    """
    all_logs = logs if logs else get_all_log_entry(log_file)
    if all_logs:
        date_sorted = sorted(all_logs, key=lambda x: x['time'])
        st = date_sorted[0]['time'].strftime("%Y-%m-%d %H:%M:%S")
        ed = date_sorted[-1]['time'].strftime("%Y-%m-%d %H:%M:%S")
        date_range = f"{st} ~ {ed}"
    else:
        date_range = "n/a"
    total_bytes = sum(int(x['bytes']) for x in all_logs)
    print(f"------- Log {log_file} -------\n Date: {date_range}\n Entry count: {len(all_logs)}\n Total bytes: {byte_human(total_bytes)}")
def main_html(log_file):
    """Print a combined report: log info, per-path totals, top countries.

    Args:
        log_file: Path of the log file to parse.

    Three sections are written to stdout:

    * log info - time span, entry count and total bytes;
    * by path - request count and bytes for each path in ``path_list``;
    * by country - the seven countries with the most bytes, looked up in
      ``/srv/mirror/scripts/GeoLite2-Country.mmdb``.

    When the log yields no entries the date range is printed as ``n/a``
    instead of raising IndexError.
    """
    all_logs = get_all_log_entry(log_file)
    ###
    if all_logs:
        date_sorted = sorted(all_logs, key=lambda x: x['time'])
        st = date_sorted[0]['time'].strftime("%Y-%m-%d %H:%M:%S")
        ed = date_sorted[-1]['time'].strftime("%Y-%m-%d %H:%M:%S")
        date_range = f"{st} ~ {ed}"
    else:
        date_range = "n/a"
    total_bytes = sum(int(x['bytes']) for x in all_logs)
    print(f"-------- Log Info --------\nDate: {date_range}\nEntry count: {len(all_logs)}\nTotal bytes: {byte_human(total_bytes)}\n")
    ###
    print("-------- By Path ---------")
    # path -> [total bytes, request count]
    paths = {}
    for entry in all_logs:
        path = parse_req_line(entry["req"]).get("path")
        if not path:
            continue
        path_parent = get_path_parent(path)
        if path_parent not in path_list:
            continue
        totals = paths.setdefault(path_parent, [0, 0])
        totals[0] += int(entry["bytes"])
        totals[1] += 1
    for path in sorted(paths):
        print(f"{path:<14} {paths[path][1]:<10} {byte_human(paths[path][0])}")
    print()
    ###
    print("------- By Country -------")
    import geoip2.database
    import geoip2.errors
    geolocstat = {}
    # The context manager closes the database file when the lookups are done.
    with geoip2.database.Reader('/srv/mirror/scripts/GeoLite2-Country.mmdb') as reader:
        for log in all_logs:
            # Only requests with a path starting with "/" are counted.
            if not parse_req_line(log["req"]).get("path"):
                continue
            try:
                geoloc = reader.country(log['client']).country.iso_code
            except (geoip2.errors.AddressNotFoundError, ValueError):
                # Unknown or malformed address: bucket it under "XX".  Only
                # lookup failures are caught so Ctrl-C still interrupts.
                geoloc = "XX"
            geolocstat[geoloc] = geolocstat.get(geoloc, 0) + int(log['bytes'])
    ranked = sorted(geolocstat.items(), key=lambda item: item[1], reverse=True)
    # Skip entries whose country code is empty/None, then keep the top seven.
    geos = [f"{geo} {byte_human(total)}" for geo, total in ranked if geo]
    print('\n'.join(geos[:7]))
    print("--------------------------")
# Command-line entry point: ngparse <command> <log file>
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Error: ngparse {stat,geo,date,html} [ log file ]\n* log file must have extended format.", file=sys.stderr)
        sys.exit(1)
    command, logfile = sys.argv[1], sys.argv[2]
    if not os.path.exists(logfile):
        print("Error: File does not exist.", file=sys.stderr)
        # Actually stop here; a bare `exit` without a call would fall through
        # and fail later with a NameError.
        sys.exit(1)
    # Map each sub-command to the report function that implements it.
    commands = {
        "stat": main,
        "geo": main_geo,
        "date": main_date,
        "html": main_html,
    }
    handler = commands.get(command)
    if handler is None:
        print(f"Error: unknown command '{command}' (expected one of: {', '.join(commands)}).", file=sys.stderr)
        sys.exit(1)
    handler(logfile)