diff --git a/Bluetooth_LE_beacon/uxplay-beacon.py b/Bluetooth_LE_beacon/uxplay-beacon.py index 987ee2a..2ab4547 100644 --- a/Bluetooth_LE_beacon/uxplay-beacon.py +++ b/Bluetooth_LE_beacon/uxplay-beacon.py @@ -50,14 +50,14 @@ os_name = platform.system() # external functions that must be supplied by loading a module: from typing import Optional -def setup_beacon(ipv4_str: str, port: int, advmin: Optional[int], advmax: Optional[int], index: Optional[int]) -> int: - return 0 - -def beacon_on() ->bool: +def setup_beacon(ipv4_str: str, port: int, advmin: Optional[int], advmax: Optional[int], index: Optional[int]) -> bool: return False -def beacon_off() ->int: - return 0 +def beacon_on() ->Optional[int]: + return None + +def beacon_off(): + return def find_device(device: Optional[str]) -> Optional[str]: return None @@ -70,16 +70,22 @@ def start_beacon(): global advmin global advmax global index + if beacon_is_running: + print(f'code error, should not happen') + raise SystemExit(1) setup_beacon(ipv4_str, port, advmin, advmax, index) - beacon_is_running = beacon_on() + advertised_port = beacon_on() + beacon_is_running = advertised_port is not None if not beacon_is_running: print(f'second attempt to start beacon:') - beacon_is_running = beacon_on() + advertised_port = beacon_on() + beacon_is_running = advertised_port is not None def stop_beacon(): global beacon_is_running global advertised_port - advertised_port = beacon_off() + beacon_off() + advertised_port = None beacon_is_running = False def pid_is_running(pid): @@ -223,6 +229,7 @@ if __name__ == '__main__': ble_bluez = "bluez" ble_winrt = "winrt" ble_bleuio = "bleuio" + ble_hci = "hci" # Create an ArgumentParser object epilog_text = ''' @@ -252,12 +259,13 @@ if __name__ == '__main__': bleuio = 'BleuIO' winrt = 'winrt' bluez = 'BlueZ' + hci = 'HCI' # Add arguments parser.add_argument( 'ble_type', nargs='?', - choices=[bleuio, None], + choices=[bleuio, hci, None], help=textwrap.dedent(''' Specifies whether or not to use the module supporting the BleuIO USB dongle, or (if not supplied) the default native Linux (BlueZ) or Windows (winrt) modules. @@ -464,7 +472,7 @@ if __name__ == '__main__': beacon_off = ble.beacon_off need_device = False - if ble_type == bleuio: + if ble_type == bleuio or ble_type == hci: # obtain serial port for BleuIO device find_device = ble.find_device need_device = True diff --git a/Bluetooth_LE_beacon/uxplay_beacon_module_BleuIO.py b/Bluetooth_LE_beacon/uxplay_beacon_module_BleuIO.py index e2ef30d..b9525c0 100644 --- a/Bluetooth_LE_beacon/uxplay_beacon_module_BleuIO.py +++ b/Bluetooth_LE_beacon/uxplay_beacon_module_BleuIO.py @@ -50,7 +50,7 @@ def check_adv_intrvl(min, max): raise ValueError('advmax was larger than 10240 msecs') from typing import Literal -def setup_beacon(ipv4_str: str, port: int, advmin: int, advmax: int, index: Literal[None]) ->int: +def setup_beacon(ipv4_str: str, port: int, advmin: int, advmax: int, index: Literal[None]) ->bool: if index is not None: raise ValuError('uxplay_beacon_module_BleuIO called with value of index: not None') global advertised_port @@ -75,13 +75,14 @@ def setup_beacon(ipv4_str: str, port: int, advmin: int, advmax: int, index: Lite advertisement_parameters = "0;" + str(advmin) + ";" + str(advmax) + ";0;" # non-connectable mode, min ad internal, max ad interval, time = unlimited advertised_address = ipv4_str advertised_port = port - return advertised_port + return True def beacon_on() ->bool: global airplay_advertisement global advertisement_parameters + global advertised_port global serial_port - success = False + ser = None try: print(f'Connecting to BleuIO dongle on {serial_port} ....') with serial.Serial(serial_port, 115200, timeout = 1) as ser: @@ -92,23 +93,25 @@ def beacon_on() ->bool: response = send_at_command(ser, "AT+ADVSTART=" + advertisement_parameters) #print(f'{response}') print(f'AirPlay Service Discovery advertising started, port = {advertised_port} ip address = {advertised_address}') - success = True except serial.SerialException as e: print(f"beacon_on: Serial port error: {e}") raise SystemExit(1) + advertised_port = None except Exception as e: print(f"beacon_on: An unexpected error occurred: {e}") - raise SystemExit(1) + advertised_port = None finally: - ser.close() - return success + if ser is not None: + ser.close() + return advertised_port -def beacon_off() ->int: +def beacon_off(): global advertisement_parameters global airplay_advertisement global advertised_port global advertised_address global serial_port + ser = None # Stop advertising try: with serial.Serial(serial_port, 115200, timeout = 1) as ser: @@ -122,13 +125,11 @@ def beacon_off() ->int: resullt = True except serial.SerialException as e: print(f"beacon_off: Serial port error: {e}") - raise SystemExit(1) except Exception as e: print(f"beacon_off: An unexpected error occurred: {e}") - raise SystemExit(1) finally: - ser.close() - return advertised_port + if ser is not None: + ser.close() from typing import Optional def find_device(serial_port_in: Optional[str]) ->Optional[str]: @@ -144,16 +145,33 @@ def find_device(serial_port_in: Optional[str]) ->Optional[str]: continue if p.vid == TARGET_VID and p.device == serial_port_in: serial_port = serial_port_in - return serial_port - for p in serial_ports: - if p.vid is not None and p.vid == TARGET_VID: - count+=1 - if count == 1: - serial_port = p.device - print(f'=== detected BlueuIO {count}. port: {p.device} desc: {p.description} hwid: {p.hwid}') - if count>1: - print(f'warning: {count} BleueIO devices were found, the first found will be used') - print(f'(to override this choice, specify "--device =..." in optional arguments)') + if serial_port is None: + for p in serial_ports: + if p.vid is not None and p.vid == TARGET_VID: + count+=1 + if count == 1: + serial_port = p.device + print(f'=== detected BlueuIO {count}. port: {p.device} desc: {p.description} hwid: {p.hwid}') + if count>1: + print(f'warning: {count} BleueIO devices were found, the first found will be used') + print(f'(to override this choice, specify "--device =..." in optional arguments)') + if serial_port is None: + return serial_port + + #test access to serial_port + try: + with serial.Serial(serial_port, 115200, timeout = 1) as ser: + send_at_command(ser, "AT") + ser.close() + except Exception as e: + print(f"beacon_on: Serial port error: {e}") + text=''' + The user does not have sufficient privilegs to access this serial port: + On Linux, the system administrator should add the user to the "dialout" group + On BSD systems, the necesary group is usually the "dialer" group. + This can be checked with ''' + print(text, f'"ls -l {serial_port}"') + raise SystemExit(1) return serial_port print(f'Imported uxplay_beacon_module_BleuIO') diff --git a/Bluetooth_LE_beacon/uxplay_beacon_module_BlueZ.py b/Bluetooth_LE_beacon/uxplay_beacon_module_BlueZ.py index bfa952d..5ab93c0 100644 --- a/Bluetooth_LE_beacon/uxplay_beacon_module_BlueZ.py +++ b/Bluetooth_LE_beacon/uxplay_beacon_module_BlueZ.py @@ -119,7 +119,7 @@ def register_ad_cb(): print(f'AirPlay Service_Discovery Advertisement ({advertised_address}:{advertised_port}) registered') def register_ad_error_cb(error): - print(f'Failed to register advertisement: {error}') + print(f'register_ad: {error}') global ad_manager global advertised_port global advertised_address @@ -136,6 +136,7 @@ def find_adapter(bus): return o return None +from typing import Optional def setup_beacon(ipv4_str :str, port :int, advmin :int, advmax :int, index :int ) ->int: global ad_manager global airplay_advertisement @@ -148,29 +149,26 @@ def setup_beacon(ipv4_str :str, port :int, advmin :int, advmax :int, index :int adapter = find_adapter(bus) if not adapter: print(f'LEAdvertisingManager1 interface not found') - return + return False adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), "org.freedesktop.DBus.Properties") adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1)) ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), LE_ADVERTISING_MANAGER_IFACE) airplay_advertisement = AirPlayAdvertisement(bus, index, ipv4_str, port, advmin, advmax) - return advertised_port + return True -def beacon_on() ->bool: +def beacon_on() ->Optional[int]: global airplay_advertisement ad_manager.RegisterAdvertisement(airplay_advertisement.get_path(), {}, reply_handler=register_ad_cb, error_handler=register_ad_error_cb) - if ad_manager is None: + # if registration error occurs, advertised_port is set to None by callback + if advertised_port is None: airplay_advertisement = None - advertised_port = None - advertised_address = None - return False - else: - return True + return advertised_port -def beacon_off() ->int: +def beacon_off(): global ad_manager global airplay_advertisement global advertised_port @@ -184,6 +182,5 @@ def beacon_off() ->int: airplay_advertisement = None advertised_Port = None advertised_address = None - return advertised_port print(f'loaded uxplay_beacon_module_BlueZ ') diff --git a/Bluetooth_LE_beacon/uxplay_beacon_module_winrt.py b/Bluetooth_LE_beacon/uxplay_beacon_module_winrt.py index 88a7df5..12f4362 100644 --- a/Bluetooth_LE_beacon/uxplay_beacon_module_winrt.py +++ b/Bluetooth_LE_beacon/uxplay_beacon_module_winrt.py @@ -77,31 +77,31 @@ async def publish_advertisement(): advertised_port = None from typing import Literal -def setup_beacon(ipv4_str: str, port:int , advmin: Literal[None], advmax :Literal[None], index :Literal[None]) ->int: +def setup_beacon(ipv4_str: str, port:int , advmin: Literal[None], advmax :Literal[None], index :Literal[None]) ->bool: if (advmin is not None) or (advmax is not None) or (index is not None): raise ValueError('uxplay_beacon_module_winrt: advmin, advmax, index were not all None') - global advertised_port create_airplay_service_discovery_advertisement_publisher(ipv4_str, port) - return advertised_port - -def beacon_on() -> bool: + return True + +from typing import Optional +def beacon_on() -> Optional[int]: import asyncio try: - asyncio.run( publish_advertisement()) - return True + asyncio.run(publish_advertisement()) except Exception as e: print(f"Failed to start publisher: {e}") global publisher publisher = None - return False - + finally: + #advertised_port is set to None if publish_advertisement failed + global advertised_port + return advertised_port -def beacon_off() ->int: +def beacon_off(): publisher.stop() global advertised_port global advertised_address advertised_port = None advertised_address = None - return advertised_port print(f'loaded uxplay_beacon_module_winrt')