def extract_ips(log_dir: str, filter_path: str, filename_glob: str = 'access.log*') -> list[dict]:
    """Parse all matching log files in log_dir and return records with IP and bytes transferred.

    A line is kept when ``filter_path`` occurs anywhere in it (plain substring
    test). Each kept line is split on whitespace; the first field becomes the
    record's ``'ip'`` and the tenth field (index 9) is parsed as the integer
    ``'bytes'`` value.

    Args:
        log_dir: Directory that holds the log files.
        filter_path: Substring a log line must contain to be included.
        filename_glob: Glob pattern, relative to ``log_dir``, selecting the
            files to read.

    Returns:
        One ``{'ip': str, 'bytes': int}`` dict per matching line, in file
        order (files are visited in sorted name order).
    """
    # Local imports keep this block self-contained regardless of what the
    # module header imports.
    import glob
    import os

    # NOTE(review): the file discovery/iteration lines were not visible in the
    # reviewed extract; this loop is reconstructed from the signature and the
    # docstring. Confirm against the original (e.g. handling of rotated .gz
    # files, which this version reads as plain text).
    records = []
    for log_path in sorted(glob.glob(os.path.join(log_dir, filename_glob))):
        # errors='replace' so a stray non-UTF-8 byte in a request line does
        # not abort the whole run.
        with open(log_path, encoding='utf-8', errors='replace') as log_file:
            for line in log_file:
                if filter_path not in line:
                    continue
                parts = line.split()
                if not parts:
                    # Only reachable with an empty/whitespace filter_path;
                    # there is no IP field to record.
                    continue
                try:
                    bytes_sent = int(parts[9])
                except (IndexError, ValueError):
                    # NOTE(review): the handler body was not visible in the
                    # extract. Falling back to 0 because access logs write
                    # "-" when no body bytes were sent — confirm the original
                    # did not skip such lines instead.
                    bytes_sent = 0
                records.append({'ip': parts[0], 'bytes': bytes_sent})
    return records
49 """Resolves IPs to (lat, lon) using the MaxMind database."""
52 with geoip2.database.Reader(db_path)
as reader:
55 record = reader.city(ip)
59 'lon': record.location.longitude,
60 'lat': record.location.latitude,
61 'country': record.country.name
or 'Unknown'
63 except (geoip2.errors.AddressNotFoundError, ValueError):
65 except FileNotFoundError:
66 print(f
"Error: Database '{db_path}' not found.")
68 return pd.DataFrame(locations)
def print_summary(records: list[dict], df: pd.DataFrame, focus_country: str) -> None:
    """Print summary statistics to the console.

    Reports connection and unique-IP counts, byte totals, the top clients by
    bytes, and, when location data is available, per-country breakdowns plus
    the top clients of ``focus_country``.

    Args:
        records: Parsed log records; each dict is read for its ``'ip'`` and
            ``'bytes'`` keys.
        df: Resolved locations; read for its ``'ip'`` and ``'country'``
            columns. An empty frame means no IP was resolved.
        focus_country: Country name compared for equality against the
            ``'country'`` column to select the focus section.

    Returns:
        None. All output goes to stdout.
    """
    # NOTE(review): several short lines of the original (embedded lines 74,
    # 78-79, 87, 97, 108-109, 119, ...) were not visible in the reviewed
    # extract. If they were blank-line/separator prints, they are not
    # reproduced here — confirm against the original.
    unique_ips = {r['ip'] for r in records}

    print("Summary Statistics")
    print(f"Total connections matched: {len(records)}")
    print(f"Unique IPs: {len(unique_ips)}")

    if not records:
        # pd.DataFrame([]) has no 'bytes' column, so every statistic below
        # would raise KeyError; there is nothing further to report.
        return

    records_df = pd.DataFrame(records)
    bytes_values = records_df['bytes']
    total_bytes = int(bytes_values.sum())
    avg_bytes = bytes_values.mean()
    print(f"Total bytes transferred: {format_bytes(total_bytes)}")
    print(f"Average bytes per connection: {format_bytes(avg_bytes)}")
    print(f"Min / Max bytes: {format_bytes(int(bytes_values.min()))} / {format_bytes(int(bytes_values.max()))}")

    print("Top 5 clients by bytes transferred:")
    # NOTE(review): the aggregation step of this chain (and of the three
    # chains below) was elided in the extract; .sum() / .nunique() / .head(5)
    # are inferred from the printed headings — confirm.
    top_clients = (
        records_df.groupby('ip')['bytes']
        .sum()
        .sort_values(ascending=False)
        .head(5)
    )
    for ip, b in top_clients.items():
        print(f" {ip}: {format_bytes(int(b))}")

    # NOTE(review): the condition guarding the location section was not
    # visible; `df.empty` is assumed from the "resolved to location: 0" branch.
    if not df.empty:
        country_counts = (
            df.groupby('country')['ip']
            .nunique()
            .sort_values(ascending=False)
        )
        print(f"IPs resolved to location: {country_counts.sum()}")
        print("Top 5 countries by unique IP count:")
        for country, count in country_counts.head(5).items():
            print(f" {country}: {count}")

        # Attach a country to every connection; drop_duplicates keeps one
        # (ip, country) row so the join does not multiply byte counts.
        merged = records_df.merge(
            df[['ip', 'country']].drop_duplicates(), on='ip', how='inner'
        )
        country_bytes = (
            merged.groupby('country')['bytes']
            .sum()
            .sort_values(ascending=False)
        )
        print("Top 5 countries by bytes transferred:")
        for country, b in country_bytes.head(5).items():
            print(f" {country}: {format_bytes(int(b))}")

        focus_ips = merged[merged['country'] == focus_country]
        if not focus_ips.empty:
            focus_top = (
                focus_ips.groupby('ip')['bytes']
                .sum()
                .sort_values(ascending=False)
                .head(5)
            )
            print(f"Top 5 {focus_country} IPs by bytes transferred:")
            for ip, b in focus_top.items():
                print(f" {ip}: {format_bytes(int(b))}")
        else:
            print(f"No {focus_country} IPs resolved to location.")
    else:
        print("IPs resolved to location: 0")
137def plot_map(df: pd.DataFrame, output_file: str) ->
None:
138 """Plots the coordinates on a world map."""
140 print(
"No valid location data to plot.")
144 gdf = gpd.GeoDataFrame(
146 geometry=gpd.points_from_xy(df.lon, df.lat)
150 world = gpd.read_file(
"https://naciscdn.org/naturalearth/10m/cultural/ne_10m_admin_0_countries.zip")
153 fig, ax = plt.subplots(figsize=(15, 10))
154 world.plot(ax=ax, color=
'#e0e0e0', edgecolor=
'white')
155 gdf.plot(ax=ax, color=
'red', markersize=15, alpha=0.5)
158 plt.savefig(output_file, bbox_inches=
'tight', dpi=300)
159 print(f
"Map saved successfully: {output_file}")