Miscellaneous scripts
This repository contains miscellaneous scripts that do not fit in any single repository, yet that I sometimes use for personal purposes. Note that some of the scripts might contain hardcoded paths and opinionated presets, so you are advised to inspect them before actually using them.
Loading...
Searching...
No Matches
webtunnel.nginx.parselog.py
Go to the documentation of this file.
1import gzip
2import pathlib
3import geoip2.database
4import pandas as pd
5import geopandas as gpd
6import matplotlib.pyplot as plt
7
def format_bytes(n: int) -> str:
    """Render a byte count as a human-readable string with two decimals.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit is reached. Anything that is still >= 1024 TB is reported in PB,
    however large it is.
    """
    units = ['B', 'KB', 'MB', 'GB', 'TB']
    idx = 0
    while idx < len(units):
        if n < 1024:
            return f"{n:.2f} {units[idx]}"
        n = n / 1024
        idx += 1
    # Fell off the end of the table: n has been divided five times by now.
    return f"{n:.2f} PB"
15
16
17def _iter_log_lines(log_dir: str, filename_glob: str):
18 """Yields lines from all matching log files in a directory, handling .gz transparently."""
19 files = sorted(pathlib.Path(log_dir).glob(filename_glob))
20 if not files:
21 print(f"Warning: No files matching '{filename_glob}' found in '{log_dir}'.")
22 for path in files:
23 try:
24 if path.suffix == '.gz':
25 with gzip.open(path, 'rt', errors='replace') as f:
26 yield from f
27 else:
28 with open(path, 'r', errors='replace') as f:
29 yield from f
30 except OSError as e:
31 print(f"Warning: Could not read '{path}': {e}")
32
33
def extract_ips(log_dir: str, filter_path: str, filename_glob: str = 'access.log*') -> list[dict]:
    """Collect one ``{'ip', 'bytes'}`` record per log line containing *filter_path*.

    A line matches when *filter_path* occurs anywhere in it. The line is then
    split on whitespace: the first field becomes ``'ip'`` and the tenth field
    becomes ``'bytes'``. A missing or non-integer tenth field is recorded as 0.

    NOTE(review): field positions presumably follow nginx's "combined" access
    log layout (client address first, body bytes sent tenth) - confirm against
    the server's ``log_format``.
    """
    matches = []
    for entry in _iter_log_lines(log_dir, filename_glob):
        if filter_path not in entry:
            continue
        fields = entry.split()
        if not fields:
            # Whitespace-only line; nothing to take an address from.
            continue
        try:
            sent = int(fields[9])
        except (IndexError, ValueError):
            sent = 0
        matches.append({'ip': fields[0], 'bytes': sent})
    return matches
47
def get_geolocation_data(ips: list[str], db_path: str) -> pd.DataFrame:
    """Look up each address in a GeoIP2 city database.

    Returns a DataFrame with one row per successfully resolved entry of
    *ips* (duplicates in the input produce duplicate rows) and the columns
    ``ip``, ``lon``, ``lat`` and ``country``. Addresses that are absent from
    the database or that raise ``ValueError`` are skipped silently. If the
    database file does not exist, an error is printed and an empty DataFrame
    is returned.
    """
    rows = []
    try:
        with geoip2.database.Reader(db_path) as db:
            for address in ips:
                try:
                    hit = db.city(address)
                    place = hit.location
                    # Longitude is stored before latitude to match the
                    # (x, y) order that Shapely/GeoPandas expect.
                    row = dict(
                        ip=address,
                        lon=place.longitude,
                        lat=place.latitude,
                        country=hit.country.name or 'Unknown',
                    )
                    rows.append(row)
                except (geoip2.errors.AddressNotFoundError, ValueError):
                    continue
    except FileNotFoundError:
        print(f"Error: Database '{db_path}' not found.")

    return pd.DataFrame(rows)
69
def print_summary(records: list[dict], df: pd.DataFrame, focus_country: str) -> None:
    """Print traffic and geolocation statistics to stdout.

    Args:
        records: Dicts with ``'ip'`` and ``'bytes'`` keys, one per matched
            connection.
        df: Geolocation rows with at least ``'ip'`` and ``'country'``
            columns; may be empty.
        focus_country: Country name whose top clients get their own section.
    """
    distinct_ips = {rec['ip'] for rec in records}

    print("Summary Statistics")
    print(f"Total connections matched: {len(records)}")
    print(f"Unique IPs: {len(distinct_ips)}")

    have_records = bool(records)
    if have_records:
        records_df = pd.DataFrame(records)
        sizes = records_df['bytes']
        print(f"Total bytes transferred: {format_bytes(int(sizes.sum()))}")
        print(f"Average bytes per connection: {format_bytes(sizes.mean())}")
        smallest = format_bytes(int(sizes.min()))
        largest = format_bytes(int(sizes.max()))
        print(f"Min / Max bytes: {smallest} / {largest}")

        print("Top 5 clients by bytes transferred:")
        per_client = records_df.groupby('ip')['bytes'].sum()
        heaviest_clients = per_client.sort_values(ascending=False).head(5)
        for ip, b in heaviest_clients.items():
            print(f" {ip}: {format_bytes(int(b))}")

    # Everything below needs geolocation rows.
    if df.empty:
        print("IPs resolved to location: 0")
        return

    ips_per_country = df.groupby('country')['ip'].nunique()
    ips_per_country = ips_per_country.sort_values(ascending=False)
    print(f"IPs resolved to location: {ips_per_country.sum()}")
    print("Top 5 countries by unique IP count:")
    for country, count in ips_per_country.head(5).items():
        print(f" {country}: {count}")

    if not have_records:
        return

    # One (ip, country) row per address so the join does not multiply
    # connection rows when the same address was resolved more than once.
    ip_to_country = df[['ip', 'country']].drop_duplicates()
    merged = records_df.merge(ip_to_country, on='ip', how='inner')

    bytes_per_country = merged.groupby('country')['bytes'].sum()
    bytes_per_country = bytes_per_country.sort_values(ascending=False)
    print("Top 5 countries by bytes transferred:")
    for country, b in bytes_per_country.head(5).items():
        print(f" {country}: {format_bytes(int(b))}")

    focus_rows = merged[merged['country'] == focus_country]
    if focus_rows.empty:
        print(f"No {focus_country} IPs resolved to location.")
        return

    focus_totals = focus_rows.groupby('ip')['bytes'].sum()
    focus_totals = focus_totals.sort_values(ascending=False).head(5)
    print(f"Top 5 {focus_country} IPs by bytes transferred:")
    for ip, b in focus_totals.items():
        print(f" {ip}: {format_bytes(int(b))}")
135
136
def plot_map(df: pd.DataFrame, output_file: str) -> None:
    """Plot the resolved coordinates on a world map and save it as an image.

    Args:
        df: Rows with ``lon`` and ``lat`` columns. When empty, a message is
            printed and nothing is drawn or written.
        output_file: Path of the image file to write.

    The country outlines are read from the Natural Earth archive URL below
    on every call.
    """
    if df.empty:
        print("No valid location data to plot.")
        return

    # points_from_xy takes x (longitude) first, then y (latitude).
    gdf = gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(df.lon, df.lat)
    )

    # Load base map
    world = gpd.read_file("https://naciscdn.org/naturalearth/10m/cultural/ne_10m_admin_0_countries.zip")

    fig, ax = plt.subplots(figsize=(15, 10))
    try:
        world.plot(ax=ax, color='#e0e0e0', edgecolor='white')
        gdf.plot(ax=ax, color='red', markersize=15, alpha=0.5)

        # Operate on the explicit figure/axes instead of pyplot's implicit
        # "current" ones, so other open figures cannot be affected.
        ax.set_axis_off()
        fig.savefig(output_file, bbox_inches='tight', dpi=300)
    finally:
        # pyplot keeps every figure alive until it is closed; release it
        # even when plotting or saving fails.
        plt.close(fig)
    print(f"Map saved successfully: {output_file}")
160
if __name__ == "__main__":
    # Run configuration; edit these before use.
    LOG_DIR = 'nginx'                          # directory searched for access.log* files
    DB_PATH = 'GeoLite2-City.mmdb'             # GeoIP2 city database file
    SEARCH_PATH = '/yUNshbl1fOXngs4JRBkrJIFU'  # substring a log line must contain
    OUTPUT_IMAGE = 'ip_map.png'                # where the rendered map is written
    FOCUS_COUNTRY = 'Russia'                   # country given its own top-5 section

    print("Parsing log files...")
    ip_list = extract_ips(LOG_DIR, SEARCH_PATH)

    if not ip_list:
        print("No matching connections found.")
    else:
        print(f"Resolving {len(ip_list)} IPs...")
        # One address per matched connection, duplicates included.
        addresses = [r['ip'] for r in ip_list]
        location_data = get_geolocation_data(addresses, DB_PATH)
        print_summary(ip_list, location_data, FOCUS_COUNTRY)
        plot_map(location_data, OUTPUT_IMAGE)
None print_summary(list[dict] records, pd.DataFrame df, str focus_country)
pd.DataFrame get_geolocation_data(list[str] ips, str db_path)
list[dict] extract_ips(str log_dir, str filter_path, str filename_glob='access.log*')
None plot_map(pd.DataFrame df, str output_file)
str format_bytes(int n)
_iter_log_lines(str log_dir, str filename_glob)