#!/usr/bin/env python3
"""
Multi-Device BLE Client for ESP32-C3
Connects to one or more ESP32-C3 BLE servers and can send/receive messages
Supports controlling multiple XIAO ESP32-C3 devices simultaneously
"""

import asyncio
import threading
from typing import Dict, Iterable, Optional, Tuple

from bleak import BleakClient, BleakScanner

# UUIDs matching the ESP32-C3 code
SERVICE_UUID = "4fafc201-1fb5-459e-8fcc-c5c9c331914b"
CHARACTERISTIC_UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"

# Device name pattern to search for (matches "ESP32-C3" or "ESP32-C3-BLE" or any variant)
DEVICE_NAME_PATTERN = "ESP32-C3"


class DeviceConnection:
    """Represents a connected ESP32-C3 device.

    Attributes:
        name: Advertised device name.
        address: Address reported by the scanner.
        client: The ``BleakClient`` holding the connection.
        id: Key used to look the device up; currently the address.
    """

    def __init__(self, name: str, address: str, client: BleakClient):
        self.name = name
        self.address = address
        self.client = client
        self.id = address  # Use MAC address as unique ID


def create_notification_handler(device_id: str):
    """Create a notification handler for a specific device.

    Args:
        device_id: Label printed in front of every received message.

    Returns:
        A ``(sender, data)`` callback suitable for ``start_notify``.
    """

    def notification_handler(sender, data):
        # A malformed payload must not raise inside the BLE callback, so
        # undecodable bytes are replaced instead of aborting the handler.
        message = data.decode('utf-8', errors='replace')
        print(f"📨 [{device_id}] Received: {message}")

    return notification_handler


async def scan_for_devices():
    """Scan for ESP32-C3 BLE devices.

    Returns:
        The discovered devices whose name contains ``DEVICE_NAME_PATTERN``;
        an empty list when none match or the scan itself fails.
    """
    print("🔍 Scanning for BLE devices (this may take 5-10 seconds)...")
    print(" (Make sure Bluetooth is enabled and ESP32-C3 is powered on)\n")

    try:
        # Increase scan timeout to 10 seconds for better device discovery
        devices = await BleakScanner.discover(timeout=10.0)

        esp32_devices = []
        print(f"\n📋 Found {len(devices)} BLE device(s) total:")
        for device in devices:
            device_name = device.name if device.name else "(Unknown)"
            print(f" • {device_name} ({device.address})")
            if device.name and DEVICE_NAME_PATTERN in device.name:
                esp32_devices.append(device)
                print(" ✓ Matches ESP32-C3 pattern!")

        if not esp32_devices:
            print(f"\n⚠️ No devices found matching '{DEVICE_NAME_PATTERN}'")
            print(" Possible issues:")
            print(" • ESP32-C3 not powered on or code not uploaded")
            print(" • Device name doesn't match (check Serial Monitor)")
            print(" • Bluetooth permissions not granted (macOS may need permission)")
            print(" • Device already connected to another client")

        return esp32_devices
    except Exception as e:
        # Top-level boundary for the scan: report and let the caller see
        # "no devices" rather than crash.
        print(f"❌ Error during scan: {e}")
        print("\n Troubleshooting:")
        print(" • On macOS: Check System Preferences → Security & Privacy → Bluetooth")
        print(" • Make sure Bluetooth is enabled on your computer")
        print(" • Try running with: python3 computer_client.py")
        return []


async def connect_to_device(device) -> Optional[DeviceConnection]:
    """Connect to a single ESP32-C3 device and subscribe to its notifications.

    Args:
        device: A scanner result exposing ``name`` and ``address``.

    Returns:
        A ``DeviceConnection`` on success, ``None`` on any failure. On failure
        the link is closed again so no half-initialised connection is leaked.
    """
    print(f"\n🔌 Connecting to {device.name} ({device.address})...")

    client = BleakClient(device.address)

    try:
        await client.connect()

        if not client.is_connected:
            print(f"❌ Failed to connect to {device.name}")
            return None

        print(f"✅ Connected to {device.name}!")

        # Create device connection object
        device_conn = DeviceConnection(device.name, device.address, client)

        # Subscribe to notifications
        handler = create_notification_handler(device_conn.id)
        await client.start_notify(CHARACTERISTIC_UUID, handler)
        print(f"📡 Listening for notifications from {device.name}...")

        # Read initial value; failure here is not fatal for the connection.
        try:
            value = await client.read_gatt_char(CHARACTERISTIC_UUID)
            print(f"📋 Initial value from {device.name}: {value.decode('utf-8')}")
        except Exception as e:
            print(f"⚠️ Could not read initial value from {device.name}: {e}")

        return device_conn

    except Exception as e:
        print(f"❌ Error connecting to {device.name}: {e}")
        # The failure may have happened after connect() succeeded (e.g. in
        # start_notify); since the client is not returned, close it here.
        try:
            if client.is_connected:
                await client.disconnect()
        except Exception as cleanup_error:
            print(f"⚠️ Error disconnecting from {device.name}: {cleanup_error}")
        return None


async def send_to_device(device_conn: DeviceConnection, message: str) -> bool:
    """Send a message to a specific device.

    Args:
        device_conn: Target connection.
        message: Text to write, encoded as UTF-8.

    Returns:
        ``True`` if the write was issued, ``False`` if the device is not
        connected or the write raised.
    """
    if not device_conn.client.is_connected:
        print(f"❌ Device {device_conn.name} is not connected")
        return False

    try:
        await device_conn.client.write_gatt_char(
            CHARACTERISTIC_UUID,
            message.encode('utf-8'),
            response=False,
        )
        print(f"✅ Sent to [{device_conn.id}]: '{message}'")
        return True
    except Exception as e:
        print(f"❌ Error sending to {device_conn.name}: {e}")
        return False


def _route_command(
    user_input: str, device_ids: Iterable[str]
) -> Tuple[str, Optional[str], str]:
    """Decide where a line of user input should be sent.

    Args:
        user_input: Non-empty, already stripped input line.
        device_ids: Ids of the connected devices.

    Returns:
        ``(kind, target, command)`` where ``kind`` is one of:

        * ``'first'``   - no selector given; ``target`` is ``None``.
        * ``'all'``     - broadcast; ``target`` is ``None``.
        * ``'device'``  - ``target`` is the matched device id.
        * ``'unknown'`` - ``target`` is the selector that matched nothing.
    """
    if ':' not in user_input:
        return 'first', None, user_input

    ids = list(device_ids)
    lowered = user_input.lower()

    # Full-address match first: addresses contain colons themselves, so
    # splitting on the first colon would cut "AA:BB:CC:DD:EE:FF:ON" after "AA".
    for device_id in ids:
        prefix = device_id.lower() + ':'
        if lowered.startswith(prefix):
            return 'device', device_id, user_input[len(prefix):].strip()

    head, _, tail = user_input.partition(':')
    if head.strip().lower() == 'all':
        return 'all', None, tail.strip()

    # Partial selector: everything before the LAST colon, so a fragment such
    # as "EE:FF" can still be used to pick a device.
    selector, _, command = user_input.rpartition(':')
    selector = selector.strip().lower()
    if selector:  # an empty selector must not match every device
        for device_id in ids:
            if selector in device_id.lower():
                return 'device', device_id, command.strip()

    return 'unknown', selector, command.strip()


async def _read_line() -> Optional[str]:
    """Read one line from stdin without blocking the event loop.

    A plain ``input()`` call inside a coroutine stalls the loop, so BLE
    notifications would not be processed while waiting for the user. The read
    runs on a daemon thread instead, which also means a pending read cannot
    keep the process alive after Ctrl+C.

    Returns:
        The line without its trailing newline, or ``None`` when stdin is
        closed or unreadable.
    """
    loop = asyncio.get_running_loop()
    pending = loop.create_future()

    def _deliver(line):
        # The future may already be cancelled if the task was interrupted.
        if not pending.done():
            pending.set_result(line)

    def _reader():
        try:
            line = input()
        except (EOFError, OSError, ValueError):
            line = None
        try:
            loop.call_soon_threadsafe(_deliver, line)
        except RuntimeError:
            pass  # event loop already closed; nobody is waiting any more

    threading.Thread(target=_reader, name="stdin-reader", daemon=True).start()
    return await pending


def _print_instructions(connections: Dict[str, DeviceConnection]) -> None:
    """Print the command help and the list of connected devices."""
    print("\n" + "="*60)
    print("CONTROL INSTRUCTIONS:")
    print("="*60)
    print("Send commands using one of these formats:")
    print(" • <device_id>:<command> - Send to specific device (e.g., 'AA:BB:CC:DD:EE:FF:ON')")
    print(" • all:<command> - Send to all devices (e.g., 'all:ON')")
    print(" • <command> - Send to first device (e.g., 'ON')")
    print("\nAvailable commands:")
    print(" • ON, OFF, 1, 0, TRUE, FALSE - Control humidifier")
    print(" • Any other message - Toggle humidifier state")
    print(" • 'quit' or 'exit' - Disconnect and exit")
    print("="*60)
    print("\nConnected devices:")
    for device_id, conn in connections.items():
        print(f" • {conn.name} ({device_id})")
    print("\nReady! Type your commands:\n")


async def _disconnect_all(connections: Dict[str, DeviceConnection]) -> None:
    """Stop notifications on and disconnect from every connected device."""
    print("\n🔌 Disconnecting from all devices...")
    for device_conn in connections.values():
        try:
            if device_conn.client.is_connected:
                # A failing stop_notify must not prevent the disconnect.
                try:
                    await device_conn.client.stop_notify(CHARACTERISTIC_UUID)
                except Exception as e:
                    print(f"⚠️ Could not stop notifications on {device_conn.name}: {e}")
                await device_conn.client.disconnect()
                print(f"✅ Disconnected from {device_conn.name}")
        except Exception as e:
            print(f"⚠️ Error disconnecting from {device_conn.name}: {e}")
    print("All devices disconnected.")


async def manage_multiple_devices():
    """Main function to manage multiple ESP32-C3 devices.

    Scans, connects to every matching device, then forwards commands typed on
    stdin until 'quit'/'exit'/'q', end of input, or Ctrl+C. All devices are
    disconnected before returning.
    """
    # Scan for devices
    esp32_devices = await scan_for_devices()

    if not esp32_devices:
        print(f"\n❌ Could not find any devices matching '{DEVICE_NAME_PATTERN}'")
        print("Make sure your ESP32-C3 devices are powered on and the BLE server is running.")
        return

    print(f"\n📱 Found {len(esp32_devices)} ESP32-C3 device(s)")

    # Connect to all found devices
    connections: Dict[str, DeviceConnection] = {}
    for device in esp32_devices:
        device_conn = await connect_to_device(device)
        if device_conn:
            connections[device_conn.id] = device_conn

    if not connections:
        print("❌ Failed to connect to any devices")
        return

    print(f"\n✅ Connected to {len(connections)} device(s)")
    _print_instructions(connections)

    # Interactive loop
    try:
        while True:
            line = await _read_line()
            if line is None:  # stdin closed: treat like 'quit'
                break

            user_input = line.strip()
            if user_input.lower() in ('quit', 'exit', 'q'):
                break
            if not user_input:
                continue

            kind, target, command = _route_command(user_input, connections)

            if kind == 'all':
                print(f"📤 Broadcasting to all devices: '{command}'")
                await asyncio.gather(
                    *[send_to_device(conn, command) for conn in connections.values()]
                )
            elif kind == 'device':
                await send_to_device(connections[target], command)
            elif kind == 'first':
                # Send to first device (backward compatibility)
                first_device = next(iter(connections.values()))
                await send_to_device(first_device, command)
            else:
                print(f"❌ Device '{target}' not found. Available devices:")
                for device_id in connections:
                    print(f" • {device_id}")

    except (KeyboardInterrupt, asyncio.CancelledError):
        # Ctrl+C reaches a non-blocking coroutine as a task cancellation;
        # treat it as a request to quit and fall through to the cleanup.
        print("\n👋 Disconnecting...")
    finally:
        await _disconnect_all(connections)


if __name__ == "__main__":
    try:
        asyncio.run(manage_multiple_devices())
    except KeyboardInterrupt:
        # Depending on the Python version asyncio.run re-raises the interrupt
        # after the coroutine's cleanup has already run; nothing left to do.
        pass
    except Exception as e:
        print(f"❌ Error: {e}")