#!/usr/bin/env python3
"""
Hostapd Client Module

Provides interface to hostapd_cli commands for managing WiFi stations.
"""

import subprocess
import json
import logging
import os
import re
from typing import List, Dict, Optional

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Six colon-separated pairs of hexadecimal digits (xx:xx:xx:xx:xx:xx).
_MAC_ADDRESS_RE = re.compile(r"[0-9A-Fa-f]{2}(?::[0-9A-Fa-f]{2}){5}")


class HostapdClientError(Exception):
    """Custom exception for hostapd client errors"""


class HostapdClient:
    """Thin wrapper that runs hostapd_cli and parses its text output."""

    def __init__(self, interface: str = "wlan0"):
        """
        Initialize HostapdClient

        Args:
            interface (str): Network interface name (default: wlan0)
        """
        self.interface = interface
        # Resolved through PATH when the command is executed.
        self.hostapd_cli_path = "hostapd_cli"

    def _is_root(self) -> bool:
        """
        Check if the current process runs with effective UID 0

        Returns:
            bool: True if running as root, False otherwise
        """
        return os.geteuid() == 0

    def _run_hostapd_command(self, args: List[str], timeout: int = 10) -> str:
        """
        Execute hostapd_cli command with error handling

        The executed command is
        ``[sudo -n] <hostapd_cli_path> -s /run/hostapd <args...>``; the sudo
        prefix is only added when the process is not already root.

        Args:
            args (List[str]): Command arguments
            timeout (int): Timeout in seconds

        Returns:
            str: Command output (stdout with surrounding whitespace stripped)

        Raises:
            HostapdClientError: If the command cannot be started, times out
                or exits with a non-zero return code
        """
        cmd = [self.hostapd_cli_path, "-s", "/run/hostapd", *args]
        if not self._is_root():
            # -n makes sudo non interactive: it fails instead of prompting.
            cmd = ["sudo", "-n", *cmd]

        logger.debug("Executing command: %s", " ".join(cmd))

        # Only the process launch lives inside the try block. The return-code
        # check below raises HostapdClientError itself and must not be caught
        # and re-wrapped by the generic handler.
        try:
            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=timeout
            )
        except subprocess.TimeoutExpired as exc:
            raise HostapdClientError("hostapd_cli command timed out") from exc
        except FileNotFoundError as exc:
            raise HostapdClientError(
                "hostapd_cli command not found. Is hostapd installed?"
            ) from exc
        except Exception as exc:
            raise HostapdClientError(
                f"Failed to execute hostapd_cli: {exc}"
            ) from exc

        if result.returncode != 0:
            error_msg = (
                result.stderr.strip()
                or f"Command failed with return code {result.returncode}"
            )
            raise HostapdClientError(f"hostapd_cli error: {error_msg}")

        return result.stdout.strip()

    def list_stations(self) -> List[str]:
        """
        Get list of connected stations (MAC addresses)

        Returns:
            List[str]: One entry per non-empty output line of ``list_sta``;
                empty list when the command prints nothing

        Raises:
            HostapdClientError: If command fails
        """
        output = self._run_hostapd_command(["-i", self.interface, "list_sta"])
        # Drop blank lines and surrounding whitespace.
        return [line.strip() for line in output.splitlines() if line.strip()]

    def get_station_details(self, mac_address: str) -> Dict[str, str]:
        """
        Get detailed information about a specific station

        Lines of the form ``key=value`` become dictionary entries; any other
        non-empty line is stored as a key with the value ``"true"``.

        Args:
            mac_address (str): MAC address of the station (xx:xx:xx:xx:xx:xx,
                hexadecimal digits, either case)

        Returns:
            Dict[str, str]: Dictionary of station details

        Raises:
            HostapdClientError: If command fails, the MAC is invalid or no
                details are reported for the station
        """
        if not mac_address or not isinstance(mac_address, str):
            raise HostapdClientError("Invalid MAC address")

        # Validate before building the command line: the value is passed to a
        # (possibly sudo-elevated) process, so only real hex octets are allowed.
        if _MAC_ADDRESS_RE.fullmatch(mac_address) is None:
            raise HostapdClientError("Invalid MAC address format")

        output = self._run_hostapd_command(
            ["-i", self.interface, "sta", mac_address]
        )

        # NOTE(review): hostapd is expected to answer a query for an unknown
        # station with a bare "FAIL" reply - confirm against the deployed
        # hostapd version. Treat it like an empty answer instead of returning
        # {"FAIL": "true"}.
        if not output or output == "FAIL":
            raise HostapdClientError(f"No details found for station {mac_address}")

        details: Dict[str, str] = {}
        for raw_line in output.splitlines():
            line = raw_line.strip()
            if not line:
                continue
            key, separator, value = line.partition("=")
            if separator:
                details[key.strip()] = value.strip()
            else:
                # Lines without '=' (might be flags or status info)
                details[line] = "true"
        return details


def _main() -> None:
    """Example usage (for testing): list stations and show the first one."""
    client = HostapdClient()
    try:
        # Test listing stations
        stations = client.list_stations()
        print("Connected stations:", stations)

        # Test getting details for first station (if any)
        if stations:
            first_station = stations[0]
            details = client.get_station_details(first_station)
            print(f"Details for {first_station}:", json.dumps(details, indent=2))
    except HostapdClientError as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    _main()