Miscellaneous scripts
This repository contains miscellaneous scripts that do not fit in any single repository but that I occasionally use myself. Note that some of the scripts may contain hardcoded paths and opinionated presets, so you are advised to inspect them before using them.
Loading...
Searching...
No Matches
webtunnel Namespace Reference

Functions

 _iter_log_lines (str log_dir, str filename_glob)
list[dict] extract_ips (str log_dir, str filter_path, str filename_glob='access.log*')
str format_bytes (int n)
pd.DataFrame get_geolocation_data (list[str] ips, str db_path)
None plot_map (pd.DataFrame df, str output_file)
None print_summary (list[dict] records, pd.DataFrame df, str focus_country)

Variables

str DB_PATH = 'GeoLite2-City.mmdb'
str FOCUS_COUNTRY = 'Russia'
list[dict] ip_list = extract_ips(LOG_DIR, SEARCH_PATH)
pd.DataFrame location_data = get_geolocation_data([r['ip'] for r in ip_list], DB_PATH)
str LOG_DIR = 'nginx'
str OUTPUT_IMAGE = 'ip_map.png'
str SEARCH_PATH = '/yUNshbl1fOXngs4JRBkrJIFU'

Function Documentation

◆ _iter_log_lines()

webtunnel._iter_log_lines ( str log_dir,
str filename_glob )
protected
Yields lines from all matching log files in a directory, handling .gz transparently.

Definition at line 17 of file webtunnel.nginx.parselog.py.

def _iter_log_lines(log_dir: str, filename_glob: str):
    """Yield lines from every file in ``log_dir`` that matches ``filename_glob``.

    Matching paths are visited in sorted order. A path whose suffix is
    ``.gz`` is decompressed on the fly; any other path is read as plain text.
    Every file is decoded as UTF-8 with undecodable bytes replaced, so the
    result does not depend on the platform's default locale encoding.

    Args:
        log_dir: Directory to search.
        filename_glob: Glob pattern, evaluated relative to ``log_dir``.

    Yields:
        Each line of each readable file, trailing newline included.

    Nothing is raised for "no match" or "unreadable file": a warning is
    printed and iteration moves on to the next file.
    """
    files = sorted(pathlib.Path(log_dir).glob(filename_glob))
    if not files:
        print(f"Warning: No files matching '{filename_glob}' found in '{log_dir}'.")
    for path in files:
        opener = gzip.open if path.suffix == '.gz' else open
        try:
            with opener(path, 'rt', encoding='utf-8', errors='replace') as f:
                yield from f
        except (OSError, EOFError) as e:
            # gzip signals a truncated archive with EOFError, which is not an
            # OSError; without catching it one cut-off rotated log would abort
            # the whole scan instead of just being skipped.
            print(f"Warning: Could not read '{path}': {e}")

Referenced by extract_ips().

Here is the caller graph for this function:

◆ extract_ips()

list[dict] webtunnel.extract_ips ( str log_dir,
str filter_path,
str filename_glob = 'access.log*' )
Parses all matching log files in log_dir and returns records with IP and bytes transferred.

Definition at line 34 of file webtunnel.nginx.parselog.py.

def extract_ips(log_dir: str, filter_path: str, filename_glob: str = 'access.log*') -> list[dict]:
    """Collect one ``{'ip', 'bytes'}`` record per log line that mentions ``filter_path``.

    A line is kept when ``filter_path`` occurs anywhere in it. The line is
    split on whitespace: the first field becomes ``'ip'`` and the tenth
    field, parsed as an integer, becomes ``'bytes'``. When the tenth field is
    missing or not an integer, ``'bytes'`` is recorded as 0.

    Args:
        log_dir: Directory holding the log files.
        filter_path: Substring a line must contain to be recorded.
        filename_glob: Glob selecting which files in ``log_dir`` to read.

    Returns:
        The records, in the order the lines were read.
    """
    matches = []
    for entry in _iter_log_lines(log_dir, filename_glob):
        if filter_path not in entry:
            continue
        fields = entry.split()
        if not fields:
            # Nothing but whitespace: there is no first field to use as the IP.
            continue
        try:
            sent = int(fields[9])
        except (IndexError, ValueError):
            sent = 0
        matches.append({'ip': fields[0], 'bytes': sent})
    return matches

References _iter_log_lines().

Here is the call graph for this function:

◆ format_bytes()

str webtunnel.format_bytes ( int n)
Converts a byte count to a human-readable string.

Definition at line 8 of file webtunnel.nginx.parselog.py.

def format_bytes(n: float) -> str:
    """Convert a byte count to a human-readable string with two decimals.

    The value is divided by 1024 until its magnitude drops below 1024,
    stepping through B, KB, MB, GB and TB; anything still larger is reported
    in PB. The comparison uses the magnitude, so negative values are scaled
    the same way as positive ones and keep their sign.

    Args:
        n: Byte count; integers and floats are both accepted.

    Returns:
        A string such as ``"1.50 KB"``.
    """
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        if abs(n) < 1024:
            return f"{n:.2f} {unit}"
        n /= 1024
    return f"{n:.2f} PB"

◆ get_geolocation_data()

pd.DataFrame webtunnel.get_geolocation_data ( list[str] ips,
str db_path )
Resolves IPs to longitude, latitude and country name using the MaxMind database.

Definition at line 48 of file webtunnel.nginx.parselog.py.

def get_geolocation_data(ips: list[str], db_path: str) -> pd.DataFrame:
    """Look up each IP in the MaxMind database and tabulate the results.

    Args:
        ips: Addresses to resolve; each one is looked up in the given order.
        db_path: Path to the database file opened with ``geoip2``.

    Returns:
        A DataFrame with one row per resolved address and the columns
        ``ip``, ``lon``, ``lat`` and ``country`` (``'Unknown'`` when the
        record has no country name). Addresses that raise
        ``AddressNotFoundError`` or ``ValueError`` are skipped. If the
        database file is not found, an error is printed and the rows
        collected so far (none, when opening failed) are returned.
    """
    rows = []
    try:
        with geoip2.database.Reader(db_path) as reader:
            for address in ips:
                try:
                    hit = reader.city(address)
                    # Longitude is stored ahead of latitude because
                    # Shapely/GeoPandas use (Longitude, Latitude) order.
                    row = {
                        'ip': address,
                        'lon': hit.location.longitude,
                        'lat': hit.location.latitude,
                        'country': hit.country.name or 'Unknown',
                    }
                    rows.append(row)
                except (geoip2.errors.AddressNotFoundError, ValueError):
                    continue
    except FileNotFoundError:
        print(f"Error: Database '{db_path}' not found.")

    return pd.DataFrame(rows)

◆ plot_map()

None webtunnel.plot_map ( pd.DataFrame df,
str output_file )
Plots the coordinates on a world map.

Definition at line 137 of file webtunnel.nginx.parselog.py.

def plot_map(df: pd.DataFrame, output_file: str) -> None:
    """Plot the coordinates in ``df`` on a world map and save it as an image.

    Args:
        df: Frame with ``lon`` and ``lat`` columns, one row per point. When
            it is empty a message is printed and nothing is drawn.
        output_file: Path the rendered figure is written to.

    The country outlines are read from the Natural Earth URL below on every
    call. The figure is always closed before returning, even if drawing or
    saving fails, so repeated calls do not accumulate open figures.
    """
    if df.empty:
        print("No valid location data to plot.")
        return

    # Create GeoDataFrame
    gdf = gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(df.lon, df.lat)
    )

    # Load base map
    world = gpd.read_file("https://naciscdn.org/naturalearth/10m/cultural/ne_10m_admin_0_countries.zip")

    # Setup plot
    fig, ax = plt.subplots(figsize=(15, 10))
    try:
        world.plot(ax=ax, color='#e0e0e0', edgecolor='white')
        gdf.plot(ax=ax, color='red', markersize=15, alpha=0.5)

        # Work on the objects created above rather than pyplot's implicit
        # "current" figure/axes, so the output cannot land on another figure.
        ax.axis('off')
        fig.savefig(output_file, bbox_inches='tight', dpi=300)
    finally:
        # pyplot keeps every figure alive until it is closed explicitly.
        plt.close(fig)
    print(f"Map saved successfully: {output_file}")

◆ print_summary()

None webtunnel.print_summary ( list[dict] records,
pd.DataFrame df,
str focus_country )
Prints summary statistics to the console.

Definition at line 70 of file webtunnel.nginx.parselog.py.

def print_summary(records: list[dict], df: pd.DataFrame, focus_country: str) -> None:
    """Print connection, traffic and per-country statistics to the console.

    Args:
        records: Dicts with ``'ip'`` and ``'bytes'`` keys, one per matched
            connection.
        df: Geolocation rows with at least ``ip`` and ``country`` columns.
        focus_country: Country name whose top clients get their own listing.

    Output, in order: the connection and unique-IP counts; when there are
    records, the byte totals and the top five clients; when ``df`` has rows,
    the per-country IP counts and, if there are also records, the per-country
    byte totals followed by the focus-country listing. An empty ``df`` yields
    a single "resolved: 0" line instead.
    """
    distinct_ips = {rec['ip'] for rec in records}

    print("Summary Statistics")
    print(f"Total connections matched: {len(records)}")
    print(f"Unique IPs: {len(distinct_ips)}")

    if records:
        traffic = pd.DataFrame(records)
        sizes = traffic['bytes']
        grand_total = int(sizes.sum())
        mean_size = sizes.mean()
        print(f"Total bytes transferred: {format_bytes(grand_total)}")
        print(f"Average bytes per connection: {format_bytes(mean_size)}")
        print(f"Min / Max bytes: {format_bytes(int(sizes.min()))} / {format_bytes(int(sizes.max()))}")

        print("Top 5 clients by bytes transferred:")
        per_client = traffic.groupby('ip')['bytes'].sum()
        heaviest_clients = per_client.sort_values(ascending=False).head(5)
        for client, volume in heaviest_clients.items():
            print(f" {client}: {format_bytes(int(volume))}")

    if df.empty:
        print("IPs resolved to location: 0")
        return

    ips_per_country = df.groupby('country')['ip'].nunique()
    ips_per_country = ips_per_country.sort_values(ascending=False)
    print(f"IPs resolved to location: {ips_per_country.sum()}")
    print("Top 5 countries by unique IP count:")
    for name, how_many in ips_per_country.head(5).items():
        print(f" {name}: {how_many}")

    if not records:
        # Without traffic records there are no byte totals to attribute.
        return

    # One (ip, country) pair per address, so the join does not multiply a
    # connection's bytes when the same IP was resolved more than once.
    ip_to_country = df[['ip', 'country']].drop_duplicates()
    joined = traffic.merge(ip_to_country, on='ip', how='inner')
    bytes_per_country = joined.groupby('country')['bytes'].sum()
    bytes_per_country = bytes_per_country.sort_values(ascending=False)
    print("Top 5 countries by bytes transferred:")
    for name, volume in bytes_per_country.head(5).items():
        print(f" {name}: {format_bytes(int(volume))}")

    focus_rows = joined[joined['country'] == focus_country]
    if focus_rows.empty:
        print(f"No {focus_country} IPs resolved to location.")
        return

    focus_totals = focus_rows.groupby('ip')['bytes'].sum()
    focus_leaders = focus_totals.sort_values(ascending=False).head(5)
    print(f"Top 5 {focus_country} IPs by bytes transferred:")
    for client, volume in focus_leaders.items():
        print(f" {client}: {format_bytes(int(volume))}")

Variable Documentation

◆ DB_PATH

str webtunnel.DB_PATH = 'GeoLite2-City.mmdb'

Definition at line 163 of file webtunnel.nginx.parselog.py.

◆ FOCUS_COUNTRY

str webtunnel.FOCUS_COUNTRY = 'Russia'

Definition at line 166 of file webtunnel.nginx.parselog.py.

◆ ip_list

list[dict] webtunnel.ip_list = extract_ips(LOG_DIR, SEARCH_PATH)

Definition at line 169 of file webtunnel.nginx.parselog.py.

◆ location_data

pd.DataFrame webtunnel.location_data = get_geolocation_data([r['ip'] for r in ip_list], DB_PATH)

Definition at line 173 of file webtunnel.nginx.parselog.py.

◆ LOG_DIR

str webtunnel.LOG_DIR = 'nginx'

Definition at line 162 of file webtunnel.nginx.parselog.py.

◆ OUTPUT_IMAGE

str webtunnel.OUTPUT_IMAGE = 'ip_map.png'

Definition at line 165 of file webtunnel.nginx.parselog.py.

◆ SEARCH_PATH

str webtunnel.SEARCH_PATH = '/yUNshbl1fOXngs4JRBkrJIFU'

Definition at line 164 of file webtunnel.nginx.parselog.py.