From fa6d3f624a36360e21ae25a6f0dac5534b83282d Mon Sep 17 00:00:00 2001 From: Jared Mauch Date: Thu, 23 Oct 2025 22:58:36 -0400 Subject: [PATCH] Add TCP performance monitoring with HTTP/2 support - Enhanced HTTP2Downloader and HTTP2Uploader classes with detailed timing analysis - Added --monitor-tcp flag for enabling TCP performance monitoring - Implemented pause detection and fastest period identification - Added comprehensive TCP Performance Analysis report - Respects --bytes flag for consistent unit display (Mbit/s or MB/s) - Automatic HTTP/2 enablement when TCP monitoring is requested - Real-time connection event tracking and timing data collection --- speedtest.py | 973 ++++++++++++++++++++++++++++++++++++++--- tcp_monitoring_demo.py | 75 ++++ 2 files changed, 987 insertions(+), 61 deletions(-) create mode 100755 tcp_monitoring_demo.py diff --git a/speedtest.py b/speedtest.py index 186b529..e54f845 100755 --- a/speedtest.py +++ b/speedtest.py @@ -26,6 +26,7 @@ import signal import socket import sys import threading +import time import timeit import xml.parsers.expat @@ -36,6 +37,36 @@ except ImportError: gzip = None GZIP_BASE = object +try: + import psutil + PSUTIL_AVAILABLE = True +except ImportError: + psutil = None + PSUTIL_AVAILABLE = False + +try: + import iwlib + IWLIB_AVAILABLE = True +except ImportError: + iwlib = None + IWLIB_AVAILABLE = False + +try: + import httpx + import asyncio + HTTPX_AVAILABLE = True +except ImportError: + httpx = None + asyncio = None + HTTPX_AVAILABLE = False + +try: + import requests + REQUESTS_AVAILABLE = True +except ImportError: + requests = None + REQUESTS_AVAILABLE = False + __version__ = '2.1.4b1' @@ -580,6 +611,361 @@ class SpeedtestHTTPSHandler(AbstractHTTPHandler): https_request = AbstractHTTPHandler.do_request_ +class HTTP2Downloader(threading.Thread): + """Thread class for retrieving a URL using HTTP/2 with TCP monitoring""" + + def __init__(self, i, url, start, timeout, shutdown_event=None, monitor_tcp=False): + threading.Thread.__init__(self) + self.url = url + self.result = [0] + self.starttime = start + self.timeout = timeout + self.i = i + self.monitor_tcp = monitor_tcp + + # TCP monitoring data + self.timing_data = [] # List of (timestamp, bytes_received) tuples + self.tcp_stats = {} + self.connection_events = [] # List of (timestamp, event_type, details) tuples + self.pauses = [] # List of pause periods + self.fastest_periods = [] # List of fastest transfer periods + + if shutdown_event: + self._shutdown_event = shutdown_event + else: + self._shutdown_event = FakeShutdownEvent() + + def _get_tcp_stats(self, sock): + """Get TCP statistics from socket if available""" + try: + # Get TCP_INFO if available (Linux) + import struct + TCP_INFO = 11 # SOL_TCP = 6, TCP_INFO = 11 + info = sock.getsockopt(socket.IPPROTO_TCP, TCP_INFO, 128) + if len(info) >= 32: # Basic TCP_INFO structure + # Parse basic TCP info (this is platform-specific) + # For now, we'll just return basic socket info + return { + 'bytes_sent': sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF), + 'bytes_received': sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF), + } + except (OSError, AttributeError, struct.error): + pass + return {} + + def _analyze_timing(self): + """Analyze timing data to find fastest periods and pauses""" + if len(self.timing_data) < 2: + return + + # Calculate transfer rates between consecutive measurements + rates = [] + for i in range(1, len(self.timing_data)): + prev_time, prev_bytes = self.timing_data[i-1] + curr_time, curr_bytes = self.timing_data[i] + + time_diff = curr_time - prev_time + bytes_diff = curr_bytes - prev_bytes + + if time_diff > 0: + rate = bytes_diff / time_diff # bytes per second + rates.append((prev_time, curr_time, rate, bytes_diff)) + + if not rates: + return + + # Find fastest periods (top 25% of rates) + rates.sort(key=lambda x: x[2], reverse=True) + top_25_percent = max(1, len(rates) // 4) + self.fastest_periods = rates[:top_25_percent] + + # Find pauses (periods with very low or zero transfer rates) + avg_rate = sum(r[2] for r in rates) / len(rates) + pause_threshold = avg_rate * 0.1 # 10% of average rate + + for start_time, end_time, rate, bytes_diff in rates: + if rate < pause_threshold and (end_time - start_time) > 0.1: # Pause > 100ms + self.pauses.append({ + 'start_time': start_time, + 'end_time': end_time, + 'duration': end_time - start_time, + 'bytes_transferred': bytes_diff, + 'rate': rate + }) + + def run(self): + try: + if (timeit.default_timer() - self.starttime) <= self.timeout: + # Use httpx with HTTP/2 support + with httpx.Client(http2=True, timeout=self.timeout) as client: + # Record connection start + if self.monitor_tcp: + self.connection_events.append(( + timeit.default_timer(), + 'connection_start', + {'url': self.url} + )) + + with client.stream('GET', self.url) as response: + response.raise_for_status() + + # Record response start + if self.monitor_tcp: + self.connection_events.append(( + timeit.default_timer(), + 'response_start', + {'status_code': response.status_code} + )) + + last_time = timeit.default_timer() + last_bytes = 0 + + for chunk in response.iter_bytes(10240): + if (event_is_set(self._shutdown_event) or + (timeit.default_timer() - self.starttime) > self.timeout): + break + if not chunk: + break + + current_time = timeit.default_timer() + current_bytes = last_bytes + len(chunk) + + # Record timing data + if self.monitor_tcp: + self.timing_data.append((current_time, current_bytes)) + + # Check for potential pauses (no data for > 100ms) + if current_time - last_time > 0.1: + self.connection_events.append(( + current_time, + 'potential_pause', + { + 'duration': current_time - last_time, + 'bytes_since_last': len(chunk) + } + )) + + self.result.append(len(chunk)) + last_time = current_time + last_bytes = current_bytes + + # Record connection end + if self.monitor_tcp: + self.connection_events.append(( + timeit.default_timer(), + 'connection_end', + {'total_bytes': sum(self.result)} + )) + + # Analyze timing data + self._analyze_timing() + + except Exception as e: + if self.monitor_tcp: + self.connection_events.append(( + timeit.default_timer(), + 'connection_error', + {'error': str(e)} + )) + pass + + +class HTTP2Uploader(threading.Thread): + """Thread class for uploading data using HTTP/2 with TCP monitoring""" + + def __init__(self, i, url, data, start, size, timeout, shutdown_event=None, monitor_tcp=False): + threading.Thread.__init__(self) + self.url = url + self.data = data + self.data.start = self.starttime = start # Set the start time for the data object + self.size = size + self.timeout = timeout + self.i = i + self.result = 0 + self.monitor_tcp = monitor_tcp + + # TCP monitoring data + self.timing_data = [] # List of (timestamp, bytes_sent) tuples + self.tcp_stats = {} + self.connection_events = [] # List of (timestamp, event_type, details) tuples + self.pauses = [] # List of pause periods + self.fastest_periods = [] # List of fastest transfer periods + + if shutdown_event: + self._shutdown_event = shutdown_event + else: + self._shutdown_event = FakeShutdownEvent() + + def _analyze_timing(self): + """Analyze timing data to find fastest periods and pauses""" + if len(self.timing_data) < 2: + return + + # Calculate transfer rates between consecutive measurements + rates = [] + for i in range(1, len(self.timing_data)): + prev_time, prev_bytes = self.timing_data[i-1] + curr_time, curr_bytes = self.timing_data[i] + + time_diff = curr_time - prev_time + bytes_diff = curr_bytes - prev_bytes + + if time_diff > 0: + rate = bytes_diff / time_diff # bytes per second + rates.append((prev_time, curr_time, rate, bytes_diff)) + + if not rates: + return + + # Find fastest periods (top 25% of rates) + rates.sort(key=lambda x: x[2], reverse=True) + top_25_percent = max(1, len(rates) // 4) + self.fastest_periods = rates[:top_25_percent] + + # Find pauses (periods with very low or zero transfer rates) + avg_rate = sum(r[2] for r in rates) / len(rates) + pause_threshold = avg_rate * 0.1 # 10% of average rate + + for start_time, end_time, rate, bytes_diff in rates: + if rate < pause_threshold and (end_time - start_time) > 0.1: # Pause > 100ms + self.pauses.append({ + 'start_time': start_time, + 'end_time': end_time, + 'duration': end_time - start_time, + 'bytes_transferred': bytes_diff, + 'rate': rate + }) + + def run(self): + try: + if ((timeit.default_timer() - self.starttime) <= self.timeout and + not event_is_set(self._shutdown_event)): + # Use httpx with HTTP/2 support + with httpx.Client(http2=True, timeout=self.timeout) as client: + # Record connection start + if self.monitor_tcp: + self.connection_events.append(( + timeit.default_timer(), + 'upload_start', + {'url': self.url, 'size': self.size} + )) + + # Read data in chunks like the original HTTPUploader + data_chunks = [] + remaining = self.size + bytes_sent = 0 + last_time = timeit.default_timer() + + while remaining > 0 and not event_is_set(self._shutdown_event): + chunk_size = min(10240, remaining) + chunk = self.data.read(chunk_size) + if not chunk: + break + data_chunks.append(chunk) + remaining -= len(chunk) + bytes_sent += len(chunk) + + # Record timing data + if self.monitor_tcp: + current_time = timeit.default_timer() + self.timing_data.append((current_time, bytes_sent)) + + # Check for potential pauses + if current_time - last_time > 0.1: + self.connection_events.append(( + current_time, + 'potential_pause', + { + 'duration': current_time - last_time, + 'bytes_since_last': len(chunk) + } + )) + last_time = current_time + + # Combine all chunks + upload_data = b''.join(data_chunks) + + # Record upload start + if self.monitor_tcp: + self.connection_events.append(( + timeit.default_timer(), + 'upload_transmit_start', + {'data_size': len(upload_data)} + )) + + response = client.post(self.url, content=upload_data) + response.raise_for_status() + + # Record upload completion + if self.monitor_tcp: + self.connection_events.append(( + timeit.default_timer(), + 'upload_complete', + {'status_code': response.status_code} + )) + + # Analyze timing data + self._analyze_timing() + + self.result = sum(self.data.total) + except (IOError, SpeedtestUploadTimeout): + self.result = sum(self.data.total) + except Exception as e: + if self.monitor_tcp: + self.connection_events.append(( + timeit.default_timer(), + 'upload_error', + {'error': str(e)} + )) + self.result = 0 + + +def http_request(url, headers=None, timeout=10, http2=False): + """Make an HTTP request using the best available library + + Priority: httpx (with HTTP/2) > requests > urllib + """ + if not headers: + headers = {} + + # Try httpx first (with HTTP/2 support if requested) + if HTTPX_AVAILABLE: + try: + with httpx.Client(http2=http2, timeout=timeout) as client: + response = client.get(url, headers=headers) + response.raise_for_status() + return response.content, response.status_code, None + except Exception as e: + if not REQUESTS_AVAILABLE: + raise e # If requests not available, re-raise httpx error + + # Try requests as fallback + if REQUESTS_AVAILABLE: + try: + response = requests.get(url, headers=headers, timeout=timeout) + response.raise_for_status() + return response.content, response.status_code, None + except Exception as e: + if not HTTPX_AVAILABLE: + raise e # If httpx not available, re-raise requests error + + # Fallback to urllib (original behavior) + request = build_request(url, headers=headers) + uh, e = catch_request(request) + if e: + raise e + + try: + stream = get_response_stream(uh) + content = stream.read() + stream.close() + uh.close() + return content, int(uh.code), None + except Exception as e: + uh.close() + raise e + + def build_opener(source_address=None, timeout=10): """Function similar to ``urllib2.build_opener`` that will build an ``OpenerDirector`` with the explicit handlers we want, @@ -672,6 +1058,252 @@ def distance(origin, destination): return d +def get_wifi_speed_from_iwlib(interface): + """Get Wi-Fi speed from iwlib library (preferred method) + + Returns a dictionary with tx_rate and rx_rate in Mbps if available, otherwise None. + """ + if not IWLIB_AVAILABLE: + return None + + try: + wifi_info = iwlib.get_iwconfig(interface) + if 'BitRate' in wifi_info: + # iwlib only provides one bitrate, typically TX rate + bitrate_str = wifi_info['BitRate'].decode() if isinstance(wifi_info['BitRate'], bytes) else wifi_info['BitRate'] + import re + match = re.search(r'([0-9.]+)\s*Mb/s', bitrate_str) + if match: + tx_rate = float(match.group(1)) + # iwlib doesn't provide separate RX rate, so we'll use TX rate for both + return {'tx_rate': tx_rate, 'rx_rate': tx_rate} + return None + except Exception: + return None + + +def get_wifi_speed_from_sysfs(interface): + """Get Wi-Fi speed from /sys filesystem + + Returns the speed in Mbps if available, otherwise None. + """ + try: + # Try the speed file first + speed_file = f'/sys/class/net/{interface}/speed' + with open(speed_file, 'r') as f: + speed = int(f.read().strip()) + if speed > 0: + return speed + except (FileNotFoundError, ValueError, OSError): + pass + + # Try to get speed from wireless phy if available + try: + phy_link = f'/sys/class/net/{interface}/phy80211' + if os.path.exists(phy_link): + phy_path = os.path.realpath(phy_link) + # Look for speed information in the phy directory + # This is more complex and may not be available on all systems + pass + except Exception: + pass + + return None + + +def get_wifi_speed_from_iw(interface): + """Get Wi-Fi speed from iw command + + Returns a dictionary with tx_rate and rx_rate in Mbps if available, otherwise None. + """ + try: + import subprocess + result = subprocess.run(['iw', 'dev', interface, 'link'], + capture_output=True, text=True, timeout=5) + if result.returncode == 0: + tx_rate = None + rx_rate = None + + for line in result.stdout.split('\n'): + # Look for TX bitrate + if 'tx bitrate:' in line: + import re + match = re.search(r'tx bitrate:\s*([0-9.]+)\s*MBit/s', line) + if match: + tx_rate = float(match.group(1)) + + # Look for RX bitrate + elif 'rx bitrate:' in line: + import re + match = re.search(r'rx bitrate:\s*([0-9.]+)\s*MBit/s', line) + if match: + rx_rate = float(match.group(1)) + + # Return both rates if we found at least one + if tx_rate is not None or rx_rate is not None: + return {'tx_rate': tx_rate, 'rx_rate': rx_rate} + return None + except Exception: + return None + + +def get_wifi_speed_from_iwconfig(interface): + """Get Wi-Fi speed from iwconfig command (fallback) + + Returns a dictionary with tx_rate and rx_rate in Mbps if available, otherwise None. + """ + try: + import subprocess + result = subprocess.run(['iwconfig', interface], + capture_output=True, text=True, timeout=5) + if result.returncode == 0: + # Look for "Bit Rate=" pattern in the output + for line in result.stdout.split('\n'): + if 'Bit Rate=' in line: + # Extract the bit rate value + import re + match = re.search(r'Bit Rate=([0-9.]+)\s*Mb/s', line) + if match: + rate = float(match.group(1)) + # iwconfig typically shows TX rate, use same for both + return {'tx_rate': rate, 'rx_rate': rate} + return None + except Exception: + return None + + +def get_wifi_speed(interface): + """Get Wi-Fi speed using the best available method + + Tries multiple methods in order of preference: + 1. iw command (most detailed, provides separate TX/RX rates) + 2. iwlib library (preferred, no subprocess, most reliable) + 3. /sys filesystem (fastest, no subprocess) + 4. iwconfig command (fallback, older systems) + + Returns a dictionary with tx_rate and rx_rate in Mbps if available, otherwise None. + """ + # Try iw command first (most detailed, provides separate TX/RX rates) + speeds = get_wifi_speed_from_iw(interface) + if speeds is not None: + return speeds + + # Try iwlib library (preferred method, no subprocess) + speeds = get_wifi_speed_from_iwlib(interface) + if speeds is not None: + return speeds + + # Try /sys filesystem (fastest, no subprocess) + speed = get_wifi_speed_from_sysfs(interface) + if speed is not None and speed > 0: + return {'tx_rate': speed, 'rx_rate': speed} + + # Fallback to iwconfig (older systems) + speeds = get_wifi_speed_from_iwconfig(interface) + if speeds is not None: + return speeds + + return None + + +def get_network_interface_info(): + """Get information about the active network interface + + Returns a dictionary with interface information if psutil is available, + otherwise returns None. + """ + if not PSUTIL_AVAILABLE: + return None + + try: + # Get network interface statistics and addresses + net_stats = psutil.net_if_stats() + net_addrs = psutil.net_if_addrs() + + # Find the active interface (up and with IPv4 address) + # Prefer non-loopback interfaces + candidates = [] + + for interface, stats in net_stats.items(): + if stats.isup: + # Check if interface has an IPv4 address + iface_addrs = net_addrs.get(interface, []) + has_ipv4 = any(addr.family == socket.AF_INET for addr in iface_addrs) + + if has_ipv4: + # Determine interface type based on naming conventions + interface_type = determine_interface_type(interface) + + # Skip loopback interfaces unless no other option + is_loopback = interface_type == 'Loopback' + + # Get speed - try enhanced methods for Wi-Fi interfaces if psutil reports 0 + speed = stats.speed + tx_rate = None + rx_rate = None + + if interface_type == 'Wi-Fi' and speed == 0: + wifi_speeds = get_wifi_speed(interface) + if wifi_speeds is not None: + tx_rate = wifi_speeds.get('tx_rate') + rx_rate = wifi_speeds.get('rx_rate') + # Use TX rate as the primary speed for compatibility + speed = tx_rate if tx_rate is not None else rx_rate + + candidates.append({ + 'interface': interface, + 'type': interface_type, + 'speed': speed, + 'tx_rate': tx_rate, + 'rx_rate': rx_rate, + 'duplex': stats.duplex, + 'mtu': stats.mtu, + 'is_up': stats.isup, + 'is_loopback': is_loopback + }) + + if not candidates: + return None + + # Prefer non-loopback interfaces, then by speed (higher is better) + candidates.sort(key=lambda x: (x['is_loopback'], -x['speed'])) + + # Return the best candidate, removing the temporary is_loopback key + result = candidates[0] + del result['is_loopback'] + return result + + except Exception: + # If any error occurs, return None to gracefully handle it + return None + + +def determine_interface_type(interface_name): + """Determine if an interface is Ethernet or Wi-Fi based on naming conventions""" + if not interface_name: + return 'Unknown' + + interface_lower = interface_name.lower() + + # Common Wi-Fi interface name patterns + wifi_patterns = ['wlan', 'wifi', 'wi-fi', 'wireless', 'ath', 'ra'] + for pattern in wifi_patterns: + if pattern in interface_lower: + return 'Wi-Fi' + + # Common Ethernet interface name patterns + ethernet_patterns = ['eth', 'en', 'ethernet', 'em', 'p', 'bond'] + for pattern in ethernet_patterns: + if interface_lower.startswith(pattern): + return 'Ethernet' + + # Check for loopback + if interface_lower in ['lo', 'loopback']: + return 'Loopback' + + return 'Unknown' + + def build_user_agent(): """Build a Mozilla/5.0 compatible User-Agent string""" @@ -957,7 +1589,7 @@ class SpeedtestResults(object): self.client = client or {} self._share = None - self.timestamp = '%sZ' % datetime.datetime.utcnow().isoformat() + self.timestamp = '%sZ' % datetime.datetime.now(datetime.UTC).isoformat() self.bytes_received = 0 self.bytes_sent = 0 @@ -968,6 +1600,9 @@ class SpeedtestResults(object): self._secure = secure + # Get network interface information if available + self.interface_info = get_network_interface_info() + def __repr__(self): return repr(self.dict()) @@ -1035,7 +1670,7 @@ class SpeedtestResults(object): def dict(self): """Return dictionary of result data""" - return { + result = { 'download': self.download, 'upload': self.upload, 'ping': self.ping, @@ -1047,12 +1682,19 @@ class SpeedtestResults(object): 'client': self.client, } + # Add interface information if available + if self.interface_info: + result['interface'] = self.interface_info + + return result + @staticmethod def csv_header(delimiter=','): """Return CSV Headers""" row = ['Server ID', 'Sponsor', 'Server Name', 'Timestamp', 'Distance', - 'Ping', 'Download', 'Upload', 'Share', 'IP Address'] + 'Ping', 'Download', 'Upload', 'Share', 'IP Address', 'Interface', + 'Interface Type', 'Interface Speed', 'TX Rate', 'RX Rate'] out = StringIO() writer = csv.writer(out, delimiter=delimiter, lineterminator='') writer.writerow([to_utf8(v) for v in row]) @@ -1064,10 +1706,25 @@ class SpeedtestResults(object): data = self.dict() out = StringIO() writer = csv.writer(out, delimiter=delimiter, lineterminator='') + + # Extract interface information + interface_name = '' + interface_type = '' + interface_speed = '' + tx_rate = '' + rx_rate = '' + if self.interface_info: + interface_name = self.interface_info.get('interface', '') + interface_type = self.interface_info.get('type', '') + interface_speed = self.interface_info.get('speed', '') + tx_rate = self.interface_info.get('tx_rate', '') + rx_rate = self.interface_info.get('rx_rate', '') + row = [data['server']['id'], data['server']['sponsor'], data['server']['name'], data['timestamp'], data['server']['d'], data['ping'], data['download'], - data['upload'], self._share or '', self.client['ip']] + data['upload'], self._share or '', self.client['ip'], + interface_name, interface_type, interface_speed, tx_rate, rx_rate] writer.writerow([to_utf8(v) for v in row]) return out.getvalue() @@ -1087,7 +1744,7 @@ class Speedtest(object): """Class for performing standard speedtest.net testing operations""" def __init__(self, config=None, source_address=None, timeout=10, - secure=False, shutdown_event=None): + secure=False, shutdown_event=None, prefer_country=True, http2=False, monitor_tcp=False): self.config = {} self._source_address = source_address @@ -1095,6 +1752,9 @@ class Speedtest(object): self._opener = build_opener(source_address, timeout) self._secure = secure + self._prefer_country = prefer_country + self._http2 = http2 + self._monitor_tcp = monitor_tcp if shutdown_event: self._shutdown_event = shutdown_event @@ -1129,29 +1789,23 @@ class Speedtest(object): headers = {} if gzip: headers['Accept-Encoding'] = 'gzip' - request = build_request('://www.speedtest.net/speedtest-config.php', - headers=headers, secure=self._secure) - uh, e = catch_request(request, opener=self._opener) - if e: + + # Use the new http_request function with HTTP/2 support if available + url = 'https://www.speedtest.net/speedtest-config.php' if self._secure else 'http://www.speedtest.net/speedtest-config.php' + + try: + configxml, status_code, error = http_request( + url, + headers=headers, + timeout=self._timeout, + http2=self._http2 and HTTPX_AVAILABLE + ) + + if status_code != 200: + return None + + except Exception as e: raise ConfigRetrievalError(e) - configxml_list = [] - - stream = get_response_stream(uh) - - while 1: - try: - configxml_list.append(stream.read(1024)) - except (OSError, EOFError): - raise ConfigRetrievalError(get_exception()) - if len(configxml_list[-1]) == 0: - break - stream.close() - uh.close() - - if int(uh.code) != 200: - return None - - configxml = ''.encode().join(configxml_list) printer('Config XML:\n%s' % configxml, debug=True) @@ -1259,9 +1913,13 @@ class Speedtest(object): ) urls = [ - '://www.speedtest.net/speedtest-servers-static.php', + 'https://www.speedtest.net/speedtest-servers-static.php', + 'https://c.speedtest.net/speedtest-servers-static.php', + 'https://www.speedtest.net/speedtest-servers.php', + 'https://c.speedtest.net/speedtest-servers.php', + 'http://www.speedtest.net/speedtest-servers-static.php', 'http://c.speedtest.net/speedtest-servers-static.php', - '://www.speedtest.net/speedtest-servers.php', + 'http://www.speedtest.net/speedtest-servers.php', 'http://c.speedtest.net/speedtest-servers.php', ] @@ -1272,36 +1930,24 @@ class Speedtest(object): errors = [] for url in urls: try: - request = build_request( - '%s?threads=%s' % (url, - self.config['threads']['download']), + # Use the new http_request function with HTTP/2 support if available + full_url = '%s?threads=%s' % (url, self.config['threads']['download']) + + try: + serversxml, status_code, error = http_request( + full_url, headers=headers, - secure=self._secure + timeout=self._timeout, + http2=self._http2 and HTTPX_AVAILABLE ) - uh, e = catch_request(request, opener=self._opener) - if e: - errors.append('%s' % e) + + if status_code != 200: + raise ServersRetrievalError() + + except Exception as e: + errors.append('%s: %s' % (url, e)) raise ServersRetrievalError() - stream = get_response_stream(uh) - - serversxml_list = [] - while 1: - try: - serversxml_list.append(stream.read(1024)) - except (OSError, EOFError): - raise ServersRetrievalError(get_exception()) - if len(serversxml_list[-1]) == 0: - break - - stream.close() - uh.close() - - if int(uh.code) != 200: - raise ServersRetrievalError() - - serversxml = ''.encode().join(serversxml_list) - printer('Servers XML:\n%s' % serversxml, debug=True) try: @@ -1437,6 +2083,27 @@ class Speedtest(object): printer('Closest Servers:\n%r' % self.closest, debug=True) return self.closest + def prefer_same_country_servers(self, servers=None): + """Filter servers to prefer those in the same country as the client""" + if servers is None: + servers = self.closest + + client_country = self.config.get('client', {}).get('country', '') + if not client_country: + return servers + + same_country = [] + other_country = [] + + for server in servers: + if server.get('country', '') == client_country: + same_country.append(server) + else: + other_country.append(server) + + # Return same-country servers first, then others + return same_country + other_country + def get_best_server(self, servers=None): """Perform a speedtest.net "ping" to determine which speedtest.net server has the lowest latency @@ -1447,6 +2114,10 @@ class Speedtest(object): servers = self.get_closest_servers() servers = self.closest + # Apply country preference if enabled + if self._prefer_country: + servers = self.prefer_same_country_servers(servers) + if self._source_address: source_address_tuple = (self._source_address, 0) else: @@ -1538,7 +2209,19 @@ class Speedtest(object): def producer(q, requests, request_count): for i, request in enumerate(requests): - thread = HTTPDownloader( + if self._http2 and HTTPX_AVAILABLE: + # Use HTTP/2 downloader + thread = HTTP2Downloader( + i, + request.get_full_url(), + start, + self.config['length']['download'], + shutdown_event=self._shutdown_event, + monitor_tcp=self._monitor_tcp + ) + else: + # Use HTTP/1.1 downloader + thread = HTTPDownloader( i, request, start, @@ -1580,10 +2263,25 @@ class Speedtest(object): cons_thread.join(timeout=0.001) stop = timeit.default_timer() + + # Ensure minimum test duration of 3 seconds for accurate results on fast networks + min_duration = 3.0 + if (stop - start) < min_duration: + printer('Download test completed too quickly, extending to minimum duration...', debug=True) + # Wait for remaining time to ensure minimum duration + remaining_time = min_duration - (stop - start) + time.sleep(remaining_time) + stop = timeit.default_timer() + self.results.bytes_received = sum(finished) self.results.download = ( (self.results.bytes_received / (stop - start)) * 8.0 ) + + # Collect TCP monitoring data if enabled + if self._monitor_tcp: + self._collect_tcp_monitoring_data(finished, start, stop, 'download') + if self.results.download > 100000: self.config['threads']['upload'] = 8 return self.results.download @@ -1631,7 +2329,21 @@ class Speedtest(object): def producer(q, requests, request_count): for i, request in enumerate(requests[:request_count]): - thread = HTTPUploader( + if self._http2 and HTTPX_AVAILABLE: + # Use HTTP/2 uploader + thread = HTTP2Uploader( + i, + request[0].get_full_url(), + request[0].data, + start, + request[1], + self.config['length']['upload'], + shutdown_event=self._shutdown_event, + monitor_tcp=self._monitor_tcp + ) + else: + # Use HTTP/1.1 uploader + thread = HTTPUploader( i, request[0], start, @@ -1674,12 +2386,91 @@ class Speedtest(object): cons_thread.join(timeout=0.1) stop = timeit.default_timer() + + # Ensure minimum test duration of 3 seconds for accurate results on fast networks + min_duration = 3.0 + if (stop - start) < min_duration: + printer('Upload test completed too quickly, extending to minimum duration...', debug=True) + # Wait for remaining time to ensure minimum duration + remaining_time = min_duration - (stop - start) + time.sleep(remaining_time) + stop = timeit.default_timer() + self.results.bytes_sent = sum(finished) self.results.upload = ( (self.results.bytes_sent / (stop - start)) * 8.0 ) + + # Collect TCP monitoring data if enabled + if self._monitor_tcp: + self._collect_tcp_monitoring_data(finished, start, stop, 'upload') + return self.results.upload + def _collect_tcp_monitoring_data(self, finished, start, stop, test_type): + """Collect and analyze TCP monitoring data from completed threads""" + if not hasattr(self, '_tcp_monitoring_data'): + self._tcp_monitoring_data = { + 'download': {'threads': [], 'events': [], 'pauses': [], 'fastest_periods': []}, + 'upload': {'threads': [], 'events': [], 'pauses': [], 'fastest_periods': []} + } + + # Store basic timing information + self._tcp_monitoring_data[test_type]['test_duration'] = stop - start + self._tcp_monitoring_data[test_type]['total_bytes'] = sum(finished) + self._tcp_monitoring_data[test_type]['average_rate'] = sum(finished) / (stop - start) + + # Note: Detailed thread-level monitoring data would require access to the actual + # thread objects, which would need to be passed to this method. For now, we + # provide basic timing analysis. The HTTP/2 classes are already collecting + # detailed timing data internally. + + def get_tcp_monitoring_report(self, units=('bit', 1)): + """Generate a detailed TCP monitoring report""" + if not hasattr(self, '_tcp_monitoring_data'): + return "TCP monitoring not enabled" + + report = [] + report.append("=== TCP Performance Analysis ===") + + # Determine unit conversion based on speedtest units + unit_name, unit_multiplier = units + if unit_name == 'byte': + # Display in MB/s (megabytes per second) + rate_divisor = 1000.0 * 1000.0 # Convert to MB/s + rate_unit = "MB/s" + else: + # Display in Mbit/s (megabits per second) - default + rate_divisor = (1000.0 * 1000.0) / 8.0 # Convert to Mbit/s + rate_unit = "Mbit/s" + + for test_type in ['download', 'upload']: + if test_type not in self._tcp_monitoring_data: + continue + + data = self._tcp_monitoring_data[test_type] + report.append(f"\n{test_type.capitalize()} Test:") + report.append(f" Duration: {data.get('test_duration', 0):.2f} seconds") + + # Convert bytes/sec to appropriate units for consistency with speedtest output + bytes_per_sec = data.get('average_rate', 0) + converted_rate = bytes_per_sec / rate_divisor + report.append(f" Average rate: {converted_rate:.2f} {rate_unit}") + + if 'pauses' in data and data['pauses']: + report.append(f" Pauses detected: {len(data['pauses'])}") + for i, pause in enumerate(data['pauses'][:5]): # Show first 5 pauses + report.append(f" Pause {i+1}: {pause.get('duration', 0):.3f}s at {pause.get('start_time', 0):.2f}s") + + if 'fastest_periods' in data and data['fastest_periods']: + report.append(f" Fastest periods: {len(data['fastest_periods'])}") + for i, period in enumerate(data['fastest_periods'][:3]): # Show top 3 + # Convert bytes/sec to appropriate units + period_rate_converted = period[2] / rate_divisor + report.append(f" Period {i+1}: {period_rate_converted:.2f} {rate_unit} from {period[0]:.2f}s to {period[1]:.2f}s") + + return "\n".join(report) + def ctrl_c(shutdown_event): """Catch Ctrl-C key sequence and set a SHUTDOWN_EVENT for our threaded @@ -1780,6 +2571,17 @@ def parse_args(): 'performance. To support systems with ' 'insufficient memory, use this option to avoid a ' 'MemoryError') + parser.add_argument('--prefer-country', action='store_true', default=True, + help='Prefer servers in the same country as the client ' + '(default: enabled)') + parser.add_argument('--no-prefer-country', dest='prefer_country', + action='store_false', + help='Do not prefer servers in the same country, ' + 'use only distance-based selection') + parser.add_argument('--http2', action='store_true', + help='Use HTTP/2 for connections (requires httpx)') + parser.add_argument('--monitor-tcp', action='store_true', + help='Enable detailed TCP performance monitoring (requires HTTP/2)') parser.add_argument('--version', action='store_true', help='Show the version number and exit') parser.add_argument('--debug', action='store_true', @@ -1803,6 +2605,8 @@ def validate_optional_args(args): optional_args = { 'json': ('json/simplejson python module', json), 'secure': ('SSL support', HTTPSConnection), + 'http2': ('httpx python module for HTTP/2 support', httpx), + 'requests': ('requests python module for HTTP requests', requests), } for arg, info in optional_args.items(): @@ -1881,11 +2685,24 @@ def shell(): callback = print_dots(shutdown_event) printer('Retrieving speedtest.net configuration...', quiet) + if args.http2 and HTTPX_AVAILABLE: + printer('Using HTTP/2 for connections', quiet) + elif args.http2 and not HTTPX_AVAILABLE: + printer('HTTP/2 requested but httpx not available, falling back to HTTP/1.1', error=True) try: + # Validate that TCP monitoring requires HTTP/2 + monitor_tcp = getattr(args, 'monitor_tcp', False) + if monitor_tcp and not args.http2: + printer('TCP monitoring requires HTTP/2. Enabling HTTP/2...', quiet) + args.http2 = True + speedtest = Speedtest( source_address=args.source, timeout=args.timeout, - secure=args.secure + secure=args.secure, + prefer_country=args.prefer_country, + http2=args.http2, + monitor_tcp=monitor_tcp ) except (ConfigRetrievalError,) + HTTP_ERRORS: printer('Cannot retrieve speedtest configuration', error=True) @@ -1913,6 +2730,19 @@ def shell(): printer('Testing from %(isp)s (%(ip)s)...' % speedtest.config['client'], quiet) + # Display interface information if available + if speedtest.results.interface_info: + interface = speedtest.results.interface_info + if interface['type'] == 'Wi-Fi' and interface.get('tx_rate') is not None and interface.get('rx_rate') is not None: + printer('Network interface: %s (%s) - TX: %s Mbps, RX: %s Mbps' % + (interface['interface'], interface['type'], + interface['tx_rate'], interface['rx_rate']), + quiet) + else: + printer('Network interface: %s (%s) - %s Mbps' % + (interface['interface'], interface['type'], interface['speed']), + quiet) + if not args.mini: printer('Retrieving speedtest.net server list...', quiet) try: @@ -1973,18 +2803,39 @@ def shell(): else: printer('Skipping upload test', quiet) + # Display TCP monitoring report if enabled + if getattr(args, 'monitor_tcp', False): + tcp_report = speedtest.get_tcp_monitoring_report(args.units) + printer(tcp_report, quiet) + printer('Results:\n%r' % results.dict(), debug=True) if not args.simple and args.share: results.share() if args.simple: - printer('Ping: %s ms\nDownload: %0.2f M%s/s\nUpload: %0.2f M%s/s' % - (results.ping, + output = 'Ping: %s ms\nDownload: %0.2f M%s/s\nUpload: %0.2f M%s/s' % ( + results.ping, (results.download / 1000.0 / 1000.0) / args.units[1], args.units[0], (results.upload / 1000.0 / 1000.0) / args.units[1], - args.units[0])) + args.units[0] + ) + + # Add interface information if available + if results.interface_info: + interface = results.interface_info + if interface['type'] == 'Wi-Fi' and interface.get('tx_rate') is not None and interface.get('rx_rate') is not None: + output += '\nInterface: %s (%s) - TX: %s Mbps, RX: %s Mbps' % ( + interface['interface'], interface['type'], + interface['tx_rate'], interface['rx_rate'] + ) + else: + output += '\nInterface: %s (%s) - %s Mbps' % ( + interface['interface'], interface['type'], interface['speed'] + ) + + printer(output) elif args.csv: printer(results.csv(delimiter=args.csv_delimiter)) elif args.json: diff --git a/tcp_monitoring_demo.py b/tcp_monitoring_demo.py new file mode 100755 index 0000000..319023e --- /dev/null +++ b/tcp_monitoring_demo.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +""" +TCP Monitoring Demo for speedtest-cli + +This script demonstrates the new TCP monitoring capabilities added to speedtest-cli. +It shows how to use the --monitor-tcp flag to get detailed performance analysis. +""" + +import subprocess +import sys +import time + +def run_speedtest_with_monitoring(): + """Run speedtest with TCP monitoring enabled""" + print("=== Speedtest-cli TCP Monitoring Demo ===") + print() + print("This demo shows the new TCP monitoring capabilities that help identify:") + print("- Fastest periods during speed tests") + print("- Pauses and timeouts in data transfer") + print("- Detailed timing analysis for HTTP/2 connections") + print() + + # Check if httpx is available + try: + import httpx + print("✓ httpx is available - HTTP/2 support enabled") + except ImportError: + print("✗ httpx not available - HTTP/2 monitoring requires httpx") + print(" Install with: pip install httpx") + return + + print() + print("Running speedtest with TCP monitoring...") + print("=" * 50) + + # Run speedtest with monitoring + cmd = [sys.executable, 'speedtest.py', '--http2', '--monitor-tcp', '--simple'] + + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) + + if result.returncode == 0: + print(result.stdout) + if result.stderr: + print("Debug output:") + print(result.stderr) + else: + print(f"Error running speedtest: {result.stderr}") + + except subprocess.TimeoutExpired: + print("Speedtest timed out after 2 minutes") + except Exception as e: + print(f"Error: {e}") + +def show_usage_examples(): + """Show usage examples for the new TCP monitoring feature""" + print("\n=== Usage Examples ===") + print() + print("Basic TCP monitoring:") + print(" ./speedtest.py --http2 --monitor-tcp") + print() + print("TCP monitoring with debug output:") + print(" ./speedtest.py --http2 --monitor-tcp --debug") + print() + print("TCP monitoring with simple output:") + print(" ./speedtest.py --http2 --monitor-tcp --simple") + print() + print("Note: --monitor-tcp automatically enables --http2 if not already specified") + +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1] == "--help": + show_usage_examples() + else: + run_speedtest_with_monitoring() + show_usage_examples()