178 lines
5.3 KiB
Python
178 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
Hostapd Client Module

Provides an interface to hostapd_cli commands for managing WiFi stations.
"""
|
|
|
|
import subprocess
|
|
import json
|
|
import logging
|
|
import os
|
|
from typing import List, Dict, Optional
|
|
|
|
# Logging setup: basicConfig configures the root logger at INFO (it does
# nothing if the root logger already has handlers); this module then logs
# through a logger named after the module itself.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HostapdClientError(Exception):
    """Error raised by the hostapd client (failed command, bad MAC, bad output)."""
|
|
|
|
|
|
class HostapdClient:
    """Query a running hostapd instance through the ``hostapd_cli`` tool.

    Every request is executed as a subprocess. When the current process is
    not running as root the command is prefixed with ``sudo -n`` so that sudo
    never prompts for a password.
    """

    # Characters allowed in each two-character octet of a MAC address.
    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

    def __init__(self, interface: str = "wlan0", socket_dir: str = "/run/hostapd"):
        """
        Initialize HostapdClient

        Args:
            interface (str): Network interface name (default: wlan0)
            socket_dir (str): Directory handed to ``hostapd_cli -s``
                (default: /run/hostapd)
        """
        self.interface = interface
        self.socket_dir = socket_dir
        self.hostapd_cli_path = "hostapd_cli"

    def _is_root(self) -> bool:
        """
        Check if the current user is root (UID 0)

        Returns:
            bool: True if the effective UID is 0, False otherwise
        """
        return os.geteuid() == 0

    def _build_command(self, args: List[str]) -> List[str]:
        """
        Assemble the full argument vector for one hostapd_cli invocation

        Args:
            args (List[str]): hostapd_cli arguments (options and command)

        Returns:
            List[str]: ``[sudo -n] hostapd_cli -s <socket_dir> <args...>``
        """
        cmd = [self.hostapd_cli_path, "-s", self.socket_dir, *args]
        if not self._is_root():
            # -n makes sudo non interactive: it errors out instead of prompting
            cmd = ["sudo", "-n", *cmd]
        return cmd

    def _run_hostapd_command(self, args: List[str], timeout: int = 10) -> str:
        """
        Execute hostapd_cli command with error handling

        Args:
            args (List[str]): Command arguments
            timeout (int): Timeout in seconds

        Returns:
            str: Command output (stdout, stripped)

        Raises:
            HostapdClientError: If the command cannot be started, times out,
                or exits with a non-zero status
        """
        try:
            cmd = self._build_command(args)
            logger.debug("Executing command: %s", " ".join(cmd))
            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=timeout
            )
        except subprocess.TimeoutExpired as e:
            raise HostapdClientError("hostapd_cli command timed out") from e
        except FileNotFoundError as e:
            raise HostapdClientError(
                "hostapd_cli command not found. Is hostapd installed?"
            ) from e
        except Exception as e:
            raise HostapdClientError(f"Failed to execute hostapd_cli: {str(e)}") from e

        # Checked outside the try block: raising here must not be caught by
        # the generic handler above and re-wrapped as "Failed to execute ...".
        if result.returncode != 0:
            error_msg = (
                result.stderr.strip()
                or f"Command failed with return code {result.returncode}"
            )
            raise HostapdClientError(f"hostapd_cli error: {error_msg}")

        return result.stdout.strip()

    def list_stations(self) -> List[str]:
        """
        Get list of connected stations (MAC addresses)

        Returns:
            List[str]: One entry per non-empty output line of ``list_sta``;
                empty list when the command prints nothing

        Raises:
            HostapdClientError: If command fails
        """
        output = self._run_hostapd_command(["-i", self.interface, "list_sta"])
        return [line.strip() for line in output.splitlines() if line.strip()]

    @classmethod
    def _is_valid_mac(cls, mac_address: str) -> bool:
        """Return True if mac_address is six colon-separated two-digit hex octets."""
        octets = mac_address.split(":")
        return len(octets) == 6 and all(
            len(octet) == 2 and cls._HEX_DIGITS.issuperset(octet)
            for octet in octets
        )

    def get_station_details(self, mac_address: str) -> Dict[str, str]:
        """
        Get detailed information about a specific station

        Args:
            mac_address (str): MAC address of the station (xx:xx:xx:xx:xx:xx)

        Returns:
            Dict[str, str]: ``key=value`` output lines as key/value pairs.
                Non-empty lines without ``=`` are stored with the value
                ``"true"``.

        Raises:
            HostapdClientError: If command fails, MAC is invalid, or hostapd
                reports no details for the station
        """
        if not mac_address or not isinstance(mac_address, str):
            raise HostapdClientError("Invalid MAC address")

        # Reject anything that is not six hex octets before it reaches the
        # command line.
        if not self._is_valid_mac(mac_address):
            raise HostapdClientError("Invalid MAC address format")

        output = self._run_hostapd_command(
            ["-i", self.interface, "sta", mac_address]
        )

        # hostapd answers a rejected request with the literal reply "FAIL",
        # which hostapd_cli prints on stdout; treat it like an empty reply
        # rather than parsing it as a flag.
        if not output or output == "FAIL":
            raise HostapdClientError(f"No details found for station {mac_address}")

        details: Dict[str, str] = {}
        for line in output.splitlines():
            key, sep, value = line.partition("=")
            if sep:
                details[key.strip()] = value.strip()
            elif line.strip():
                # Lines without '=' (might be flags or status info)
                details[line.strip()] = "true"

        return details
|
|
|
|
|
|
# Manual smoke test: list the associated stations and dump the first one
if __name__ == "__main__":
    demo_client = HostapdClient()

    try:
        connected = demo_client.list_stations()
        print("Connected stations:", connected)

        # Details are only requested when at least one station was listed
        if connected:
            first_mac = connected[0]
            station_info = demo_client.get_station_details(first_mac)
            print(f"Details for {first_mac}:", json.dumps(station_info, indent=2))

    except HostapdClientError as err:
        print(f"Error: {err}")
|