diff --git a/Bluetooth_LE_beacon/uxplay-beacon.py b/Bluetooth_LE_beacon/uxplay-beacon.py index 9505e3b..107be26 100644 --- a/Bluetooth_LE_beacon/uxplay-beacon.py +++ b/Bluetooth_LE_beacon/uxplay-beacon.py @@ -76,11 +76,16 @@ def start_beacon(): setup_beacon(ipv4_str, port, advmin, advmax, index) advertised_port = beacon_on() beacon_is_running = advertised_port is not None - if not beacon_is_running: - print(f'second attempt to start beacon:') + count = 1 + while not beacon_is_running: + print(f'Failed attempt {count} to start beacon:') advertised_port = beacon_on() beacon_is_running = advertised_port is not None - + count += 1 + if count > 5: + print(f'Giving up, check Bluetooth adapter') + raise SystemExit(1) + def stop_beacon(): global beacon_is_running global advertised_port @@ -498,6 +503,14 @@ if __name__ == '__main__': advminmax = f'[advmin:advmax]={advmin}:{advmax}' if ble_type == bluez: indx = f'index {index}' + test = None + if ble_type == winrt or ble_type == bluez: + # initial test to see if Bluetooth is available + setup_beacon(ipv4_str, 1, advmin, advmax, index) + test = beacon_on() + beacon_off() + if test is not None: + print(f"test passed") print(f'AirPlay Service-Discovery Bluetooth LE beacon: BLE file {path} {advminmax} {indx}') print(f'Advertising IP address {ipv4_str}') print(f'(Press Ctrl+C to exit)') diff --git a/Bluetooth_LE_beacon/uxplay_beacon_module_BleuIO.py b/Bluetooth_LE_beacon/uxplay_beacon_module_BleuIO.py index b9525c0..ad888b3 100644 --- a/Bluetooth_LE_beacon/uxplay_beacon_module_BleuIO.py +++ b/Bluetooth_LE_beacon/uxplay_beacon_module_BleuIO.py @@ -12,6 +12,7 @@ import time import os +import ipaddress try: import serial @@ -21,6 +22,7 @@ except ImportError as e: print(f'install pyserial') raise SystemExit(1) +#global variables advertised_port = None advertised_address = None serial_port = None @@ -51,17 +53,16 @@ def check_adv_intrvl(min, max): from typing import Literal def setup_beacon(ipv4_str: str, port: int, advmin: int, advmax: int, index: Literal[None]) ->bool: - if index is not None: - raise ValuError('uxplay_beacon_module_BleuIO called with value of index: not None') global advertised_port global advertised_address global airplay_advertisement - global advertisement_parameters + global advertisement_parameters + if index is not None: + raise ValuError('uxplay_beacon_module_BleuIO called with value of index: not None') check_adv_intrvl(advmin, advmax) # set up advertising message: assert port > 0 assert port <= 65535 - import ipaddress ipv4_address = ipaddress.ip_address(ipv4_str) port_bytes = port.to_bytes(2, 'big') data = bytearray([0xff, 0x4c, 0x00]) # ( 3 bytes) type manufacturer_specific 0xff, manufacturer id Apple 0x004c @@ -78,10 +79,7 @@ def setup_beacon(ipv4_str: str, port: int, advmin: int, advmax: int, index: Lite return True def beacon_on() ->bool: - global airplay_advertisement - global advertisement_parameters global advertised_port - global serial_port ser = None try: print(f'Connecting to BleuIO dongle on {serial_port} ....') @@ -110,7 +108,6 @@ def beacon_off(): global airplay_advertisement global advertised_port global advertised_address - global serial_port ser = None # Stop advertising try: @@ -122,7 +119,6 @@ def beacon_off(): advertised_Port = None advertised_address = None advertisement_parameters = None - resullt = True except serial.SerialException as e: print(f"beacon_off: Serial port error: {e}") except Exception as e: @@ -135,19 +131,22 @@ from typing import Optional def find_device(serial_port_in: Optional[str]) ->Optional[str]: global serial_port serial_ports = list(list_ports.comports()) - count = 0 serial_port_found = False serial_port = None - TARGET_VID = 0x2DCF # used by BleuIO and BleuIO Pro + TARGET_VID = '0x2DCF' # used by BleuIO and BleuIO Pro + target_vid = int(TARGET_VID,16) + if serial_port_in is not None: for p in serial_ports: - if p.vid is None: - continue - if p.vid == TARGET_VID and p.device == serial_port_in: - serial_port = serial_port_in + if getattr(p, 'vid', None) == target_vid or TARGET_VID in p.hwid: + if p.device == serial_port_in: + serial_port = serial_port_in + break + if serial_port is None: + count = 0 for p in serial_ports: - if p.vid is not None and p.vid == TARGET_VID: + if getattr(p, 'vid', None) == target_vid or TARGET_VID in p.hwid: count+=1 if count == 1: serial_port = p.device @@ -155,6 +154,7 @@ def find_device(serial_port_in: Optional[str]) ->Optional[str]: if count>1: print(f'warning: {count} BleueIO devices were found, the first found will be used') print(f'(to override this choice, specify "--device =..." in optional arguments)') + if serial_port is None: return serial_port @@ -166,10 +166,10 @@ def find_device(serial_port_in: Optional[str]) ->Optional[str]: except Exception as e: print(f"beacon_on: Serial port error: {e}") text=''' - The user does not have sufficient privilegs to access this serial port: - On Linux, the system administrator should add the user to the "dialout" group + The user does not have sufficient privileges to access this serial port: + On Linux, the user should be added to the "dialout" or "uucp" group On BSD systems, the necesary group is usually the "dialer" group. - This can be checked with ''' + The correct group can be found using ''' print(text, f'"ls -l {serial_port}"') raise SystemExit(1) return serial_port diff --git a/Bluetooth_LE_beacon/uxplay_beacon_module_BlueZ.py b/Bluetooth_LE_beacon/uxplay_beacon_module_BlueZ.py index 5ab93c0..87d5d5c 100644 --- a/Bluetooth_LE_beacon/uxplay_beacon_module_BlueZ.py +++ b/Bluetooth_LE_beacon/uxplay_beacon_module_BlueZ.py @@ -8,12 +8,17 @@ try: import dbus import dbus.exceptions import dbus.mainloop.glib - import dbus.service + import dbus.service except ImportError as e: print(f"ImportError: {e}, failed to import required dbus components") print(f"install the python3 dbus package") raise SystemExit(1) +import os +import ipaddress +from typing import Optional + +#global variables ad_manager = None airplay_advertisement = None advertised_port = None @@ -96,7 +101,9 @@ class AirPlay_Service_Discovery_Advertisement(dbus.service.Object): out_signature='') def Release(self): - print(f'{self.path}: Released!') + print(f'{self.path}: D-Bus Released! (Bluetooth USB adapter removed?)') + print(f'Stopping ...') + os._exit(1) class AirPlayAdvertisement(AirPlay_Service_Discovery_Advertisement): @@ -105,7 +112,6 @@ class AirPlayAdvertisement(AirPlay_Service_Discovery_Advertisement): assert port > 0 assert port <= 65535 mfg_data = bytearray([0x09, 0x08, 0x13, 0x30]) # Apple Data Unit type 9 (Airplay), length 8, flags 0001 0011, seed 30 - import ipaddress ipv4_address = ipaddress.ip_address(ipv4_str) ipv4 = bytearray(ipv4_address.packed) mfg_data.extend(ipv4) @@ -119,7 +125,6 @@ def register_ad_cb(): print(f'AirPlay Service_Discovery Advertisement ({advertised_address}:{advertised_port}) registered') def register_ad_error_cb(error): - print(f'register_ad: {error}') global ad_manager global advertised_port global advertised_address @@ -128,15 +133,22 @@ def register_ad_error_cb(error): advertised_address = None def find_adapter(bus): - remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), - DBUS_OM_IFACE) + try: + remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), + DBUS_OM_IFACE) + except dbus.exceptions.DBusException as e: + if e.get_dbus_name() == 'org.freedesktop.DBus.Error.ServiceUnknown': + print("Error: Bluetooth D-Bus service not running on host.") + print(f'Stopping ...') + os._exit(1) objects = remote_om.GetManagedObjects() for o, props in objects.items(): if LE_ADVERTISING_MANAGER_IFACE in props: return o - return None + print(f'Error: Bluetooth adapter not found') + print(f'Stopping ...') + os._exit(1) -from typing import Optional def setup_beacon(ipv4_str :str, port :int, advmin :int, advmax :int, index :int ) ->int: global ad_manager global airplay_advertisement @@ -145,11 +157,8 @@ def setup_beacon(ipv4_str :str, port :int, advmin :int, advmax :int, index :int advertised_port = port advertised_address = ipv4_str dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - bus = dbus.SystemBus() + bus = dbus.SystemBus() adapter = find_adapter(bus) - if not adapter: - print(f'LEAdvertisingManager1 interface not found') - return False adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), "org.freedesktop.DBus.Properties") adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1)) @@ -160,6 +169,13 @@ def setup_beacon(ipv4_str :str, port :int, advmin :int, advmax :int, index :int def beacon_on() ->Optional[int]: global airplay_advertisement + global advertised_port + global ad_manager + if advertised_port == 1: + # this value is used when testing for Bluetooth Service + ad_manager = None + advertised_port = None + return None ad_manager.RegisterAdvertisement(airplay_advertisement.get_path(), {}, reply_handler=register_ad_cb, error_handler=register_ad_error_cb) diff --git a/Bluetooth_LE_beacon/uxplay_beacon_module_HCI.py b/Bluetooth_LE_beacon/uxplay_beacon_module_HCI.py index 30d475f..5e76813 100644 --- a/Bluetooth_LE_beacon/uxplay_beacon_module_HCI.py +++ b/Bluetooth_LE_beacon/uxplay_beacon_module_HCI.py @@ -3,12 +3,13 @@ #---------------------------------------------------------------- # HCI_Linux (uses sudo hciconfig): module for a standalone python-3.6 or later AirPlay Service-Discovery Bluetooth LE beacon for UxPlay -# this requires that users can run "sudo hciconfig" with giving a password: +# this requires that users can run "sudo -n hciconfig" without giving a password: # (1) (as root) create a group like "hciusers" -# (2) use visudo to make an entry in /etc/sudoers: +# (2a) Linux: use visudo to create a file /etc/sudoers.d/hciusers containing a line # %hciusers ALL=(ALL) NOPASSWD: /usr/bin/hcitool, /usr/bin/hciconfig -# (or or use visudo /etc/sudoers.d/hciusers to create a file /etc/sudoers.d/hciusers with this line in it) -# (3) add the user who will run uxplay-beacon.py to the group hciusers +# (2b) FreeBSD: use visudo to create /usr/local/etc/sudoers.d/hciusers with the line +# %hciusers ALL=(ALL) NOPASSWD: /usr/sbin/hccontrol +# (3) add the users who will run uxplay-beacon.py to the group hciusers import subprocess @@ -19,12 +20,15 @@ import platform from typing import Optional from typing import Literal +#global variables +hci = None +advertised_port = None +advertised_address = None + os_name = platform.system() -if os_name == 'Darwin': - os_name = 'macOS' linux = os_name == 'Linux' -bsd = 'BSD' in os_name -if not linux and not bsd: +freebsd = os_name == 'FreeBSD' +if not linux and not freebsd: print(f'{os_name} is not supported by the HCI module') raise SystemExit(1) @@ -39,7 +43,20 @@ if linux: (2) use visudo to create a file /etc/sudoers.d/hciusers containing the line: %hciusers ALL=(ALL) NOPASSWD: /usr/bin/hciconfig, /usr/bin/hcitool ''' -elif bsd: +elif freebsd: + disclaimer = ''' + + *********************************************************************** + * FreeBSD: this module currently requires a patch to FreeBSD's * + * hccontrol utility, that will hopefully be accepted into the FreeBSD * + * source tree. It is available at the UxPlay github site Wiki: * + * https://github.com/FDH2/UxPlay/wiki/hccontrol-patch-for-FreeBSD-15.0* + *********************************************************************** + wget https://github.com/user-attachments/files/26074904/hccontrol_FreeBSD_15_0_patch.txt + + ''' + print(disclaimer) + help_text2 = ''' (2) use visudo to create a file /usr/local/etc/sudoers.d/hciusers containing the line: %hciusers ALL=(ALL) NOPASSWD: /usr/sbin/hccontrol @@ -49,130 +66,121 @@ help_text3 = ''' ''' help_text = help_text1 + help_text2 + help_text3 -hci = None -LMP_version_map = ["1.0b","1.1", "1.2", "2.0+EDR", "2.1+EDR", "3.0+HS", "4.0", "4.1", "4.2", "5.0", "5.1", "5.2", "5.3", "5.4", "6.0", "6.1"] - - -advertised_port = None -advertised_address = None - - - +sudo = ['sudo', '-n'] +if linux: + ogf = "0x08" + def le_cmd(hcicmd, args): + cmd = sudo + ['hcitool', '-i', hci, 'cmd', ogf, hcicmd] + args + subprocess.run(cmd, capture_output=True, text=True, check=True) +elif freebsd: + def le_cmd(hcicmd, args): + cmd = sudo + ['hccontrol', '-n', hci, hcicmd] + args + subprocess.run(cmd, capture_output=True, text=True, check=True) + def setup_beacon(ipv4_str: str, port: int, advmin: int, advmax: int, index: Literal[None]) -> bool: - global hci global advertised_port global advertised_address advertised_port = None advertised_address = None - # convert into units of 5/8 msec. - advmin = (advmin * 8) // 5 - advmax = (advmax * 8) // 5 - # setup Advertising Parameters if linux: + # convert into units of 5/8 msec. + advmin = (advmin * 8) // 5 + advmax = (advmax * 8) // 5 min1 = f'{advmin %256 :#04x}' min2 = f'{advmin //256 :#04x}' max1 = f'{advmax % 256 :#04x}' max2 = f'{advmax // 256 :#04x}' - ogf = "0x08" - ocf = "0x0006" - cmd = ["sudo", '-n', "hcitool", "-i", hci, "cmd", ogf, ocf, min1, min2, max1, max2, '0x03', '0x00', '0x00'] + ['0x00'] * 6 + ['0x07', '0x00'] - elif bsd: - min = f'{advmin :04x}' - max = f'{advmax :04x}' - cmd = ["sudo", "-n", "hccontrol", "-n", hci, "le_set_advertising_param", min, max, '03', '00', '00', '000000000000', '07','00'] + args = [min1, min2, max1, max2, '0x03', '0x00', '0x00'] + ['0x00'] * 6 + ['0x07', '0x00'] + hcicmd = "0x0006" + elif freebsd: + min = f'-m {advmin}' + max = f'-M {advmax}' + args = [min, max, 't = 3'] + hcicmd = 'le_set_advertising_param' try: - result = subprocess.run(cmd, capture_output=True, text=True, check=True) + result = le_cmd(hcicmd, args) except subprocess.CalledProcessError as e: - print("Error:", e.stderr, e.stdout) + print(f'beacon_on error (set_advertisng_parameters):', e.stderr, e.stdout) return False - + # setup Advertising Data adv_head = ['0xff', '0x4c', '0x00', '0x09', '0x08', '0x13', '0x30'] adv_int = [int(hex_str, 16) for hex_str in adv_head] ip = list(map(int, ipv4_str.split('.'))) prt = [port // 256, port % 256] adv_int = adv_int + ip + prt - adv_len = len(adv_int) - adv_int = [adv_len + 1, adv_len ] + adv_int + if linux: - ogf = '0x08' - ocf = '0x0008' - cmd = ['sudo', '-n', 'hcitool', '-i', hci, 'cmd', ogf, ocf] - cmd = cmd + [f'{i:#04x}' for i in adv_int] - cmd = cmd + ['0x00'] * 17 - elif bsd: - cmd = ['sudo', '-n', 'hccontrol', '-n', hci, 'le_set_advertising_data'] - cmd = cmd + [f'{i:02x}' for i in adv_int] - + adv_len = len(adv_int) + adv_int = [adv_len + 1, adv_len ] + adv_int + args = [f'{i:#04x}' for i in adv_int] + args += ['0x00'] * (31 - len(adv_int)) + hcicmd = '0x0008' + elif freebsd: + adv = ','.join(f'{byte:02x}' for byte in adv_int) + args = ['-b', adv] + hcicmd = 'le_set_advertising_data' try: - result = subprocess.run(cmd, capture_output=True, text=True, check=True) + le_cmd(hcicmd, args) except subprocess.CalledProcessError as e: - print("Error:", e.stderr, e.stdout) + print(f'beacon_on error (set_advertisng_parameters):', e.stderr, e.stdout) return False - advertised_port = port advertised_address = ipv4_str return True def beacon_on() -> Optional[int]: - global advertised_port - global advertised_address - if linux: - ogf = '0x08' - ocf = '0x000a' - cmd = ['sudo', '-n', 'hcitool', '-i', hci, 'cmd', ogf, ocf, '0x01'] - elif bsd: - cmd = ['sudo', '-n', 'hccontrol', '-n', hci, 'le_set_advertising_enable', 'enable'] - try: - result = subprocess.run(cmd, capture_output=True, text=True, check=True) - print(f'Started Bluetooth LE Service Discovery beacon {advertised_address}:{advertised_port}') + hcicmd = '0x000a' + args = ['0x01'] + elif freebsd: + hcicmd = 'le_set_advertising_enable' + args = ['enable'] + try: + le_cmd(hcicmd, args) except subprocess.CalledProcessError as e: print(f'beacon_on error:', e.stderr, e.stdout) + global advertised_port + global advertised_address advertised_port = None advertised_address = None - finally: - return advertised_port + return None + print(f'AirPlay Service-Discovery beacon transmission started') + return advertised_port def beacon_off(): if linux: - ogf = '0x08' - ocf = '0x000a' - cmd = ['sudo', '-n', 'hcitool', '-i', hci, 'cmd', ogf, ocf, '0x00'] - elif bsd: - cmd = ['sudo', '-n', 'hccontrol', '-n', hci, 'le_set_advertising_enable', 'disable'] - try: - result = subprocess.run(cmd, capture_output=True, text=True, check=True) - print(f'Stopped Bluetooth LE Service Discovery beacon') - except subprocess.CalledProcessError as e: - print("Error (beacon_off):", e.stderr, e.stdout) - advertised_address = None - advertised_port = None - + hcicmd = '0x000a' + args = ['0x00'] + elif freebsd: + hcicmd = 'le_set_advertising_enable' + args = ['disable'] + le_cmd(hcicmd, args) + print(f'AirPlay Service-Discovery beacon transmission ended') + advertised_address = None + advertised_port = None + +LMP = ["1.0b","1.1", "1.2", "2.0+EDR", "2.1+EDR", "3.0+HS"] +LMP += ["4.0","4.1", "4.2", "5.0", "5.1", "5.2", "5.3", "5.4", "6.0", "6.1"] def get_bluetooth_version(device_name): - """ - Runs 'hciconfig -a ' and extracts the LMP version. - """ if linux: - cmd = f'hciconfig' - opt1 = f'' - opt2 = f'-a' + cmd ='hciconfig' + args = [cmd, device_name, '-a'] regexp = r"LMP Version: .*?\(0x([0-9a-fA-F])\)" - elif bsd: - cmd = f'hccontrol' - opt1 = f'-n' - opt2 = f'Read_Local_Version_Information' + elif freebsd: + cmd = 'hccontrol' + args = [cmd, '-n', device_name, 'Read_Local_Version_Information'] regexp = r"LMP version: .*?\[(0x[0-9a-fA-F]+)\]" try: # Run hciconfig -a for the specific device - result = subprocess.check_output([cmd, opt1, device_name, opt2], stderr=subprocess.STDOUT, text=True) + result = subprocess.check_output(args, stderr=subprocess.STDOUT, text=True) except subprocess.CalledProcessError as e: print(f"Error running {cmd} for {device_name}: {e.output}") return None except FileNotFoundError: - print("Error: {cmd} command not found") + print(f"Error: {cmd} command not found") return None # Regex to find "LMP Version: X.Y (0xZ)" lmp_version_match = re.search(regexp, result) @@ -183,22 +191,24 @@ def get_bluetooth_version(device_name): def list_devices_by_version(min_version): if linux: - cmd = f'hcitool' - opt = f'dev' + cmd = 'hcitool' + args = [cmd] + args.append('dev') regexp = r"(hci\d+)" - elif bsd: - cmd = f'hccontrol' - opt = f'Read_Node_List' + elif freebsd: + cmd = 'hccontrol' + args = [cmd] + args.append('Read_Node_List') regexp = r"(^ubt\d+hci)" - try: # Run hciconfig to list all devices - devices_list_output = subprocess.check_output([cmd, opt], stderr=subprocess.STDOUT, text=True) + devices_list_output = subprocess.check_output(args, stderr=subprocess.STDOUT, text=True) + print(devices_list_output) except subprocess.CalledProcessError as e: - print(f"Error running hciconfig: {e.output}") + print(f"Error running {cmd}: {e.output}") return None except FileNotFoundError: - print("Error: hciconfig command not found") + print(f"Error: {cmd} command not found") return None # Regex to find device names (e.g., hci0, hci1) device_names = re.findall(regexp, devices_list_output, re.MULTILINE) @@ -207,7 +217,7 @@ def list_devices_by_version(min_version): version_decimal = get_bluetooth_version(device_name) if version_decimal is None or version_decimal < min_version: continue - bt_version = LMP_version_map[version_decimal] + bt_version = LMP[version_decimal] device = [device_name, bt_version] found_devices.append(device) return found_devices @@ -216,7 +226,7 @@ from typing import Optional def find_device(hci_in: Optional[str]) -> Optional[str]: global hci list = list_devices_by_version(min_version=6) - if len(list) == 0: + if list is None or len(list) == 0: return None hci = None if hci_in is not None: @@ -234,9 +244,9 @@ def find_device(hci_in: Optional[str]) -> Optional[str]: print(f'warning: {count} HCI devices were found, the first found will be used') print(f'(to override this choice, specify "--device=..." in optional arguments)') if linux: - cmd = ['sudo', '-n', 'hciconfig', hci, 'reset'] - elif bsd: - cmd = ['sudo', '-n', 'hccontrol', '-n', hci, 'Reset'] + cmd = sudo + ['hciconfig', hci, 'reset'] + elif freebsd: + cmd = sudo + ['hccontrol', '-n', hci, 'Reset'] try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) except subprocess.CalledProcessError as e: @@ -246,3 +256,4 @@ def find_device(hci_in: Optional[str]) -> Optional[str]: raise SystemExit(1) return hci +print('loaded uxplay_beacon_module_HCI') diff --git a/Bluetooth_LE_beacon/uxplay_beacon_module_winrt.py b/Bluetooth_LE_beacon/uxplay_beacon_module_winrt.py index 12f4362..13c2160 100644 --- a/Bluetooth_LE_beacon/uxplay_beacon_module_winrt.py +++ b/Bluetooth_LE_beacon/uxplay_beacon_module_winrt.py @@ -1,5 +1,5 @@ # SPDX-License-Identifier: LGPL-2.1-or-later -#---------------------------------------------------------------- +#--------------------------------------------------------------- # winrt (Windows) module for a standalone python-3.6 AirPlay Service-Discovery Bluetooth LE beacon for UxPlay # (c) F. Duncanh, March 2026 @@ -29,21 +29,36 @@ except ImportError: print(f'You may need to use pip option "--break-system-packages" (disregard the warning)') raise SystemExit(1) +import os +import asyncio +import ipaddress +from typing import Literal +from typing import Optional + +#global variables publisher = None advertised_port = None advertised_address = None +quiet = False def on_status_changed(sender, args): global publisher - print(f"Publisher status change to: {args.status.name}") + if not quiet: + print(f"Publisher status change to: {args.status.name}") + if args.status.name == "ABORTED": + print(f'Publisher was aborted after starting: perhaps no Bluetooth interface is available?') + print(f'Stopping') + os._exit(1) if args.status.name == "STOPPED": publisher = None def create_airplay_service_discovery_advertisement_publisher(ipv4_str, port): + global publisher + global advertised_port + global advertised_address assert port > 0 assert port <= 65535 mfg_data = bytearray([0x09, 0x08, 0x13, 0x30]) # Apple Data Unit type 9 (Airplay), length 8, flags 0001 0011, seed 30 - import ipaddress ipv4_address = ipaddress.ip_address(ipv4_str) ipv4 = bytearray(ipv4_address.packed) mfg_data.extend(ipv4) @@ -56,9 +71,6 @@ def create_airplay_service_discovery_advertisement_publisher(ipv4_str, port): manufacturer_data.data = writer.detach_buffer() advertisement = ble_adv.BluetoothLEAdvertisement() advertisement.manufacturer_data.append(manufacturer_data) - global publisher - global advertised_port - global advertised_address publisher = ble_adv.BluetoothLEAdvertisementPublisher(advertisement) advertised_port = port advertised_address = ipv4_str @@ -69,38 +81,42 @@ async def publish_advertisement(): global advertised_address try: publisher.start() - print(f"AirPlay Service_Discovery Advertisement ({advertised_address}:{advertised_port}) registered") + if not quiet: + print(f"AirPlay Service_Discovery Advertisement ({advertised_address}:{advertised_port}) registered") except Exception as e: print(f"Failed to start Publisher: {e}") print(f"Publisher Status: {publisher.status.name}") advertised_address = None advertised_port = None -from typing import Literal + def setup_beacon(ipv4_str: str, port:int , advmin: Literal[None], advmax :Literal[None], index :Literal[None]) ->bool: + global quiet + quiet = False + if port == 1: + #fake port used for testing + print(f'beacon test') + quiet = True if (advmin is not None) or (advmax is not None) or (index is not None): raise ValueError('uxplay_beacon_module_winrt: advmin, advmax, index were not all None') create_airplay_service_discovery_advertisement_publisher(ipv4_str, port) return True -from typing import Optional def beacon_on() -> Optional[int]: - import asyncio + global publisher + global advertised_port try: asyncio.run(publish_advertisement()) except Exception as e: print(f"Failed to start publisher: {e}") - global publisher publisher = None - finally: - #advertised_port is set to None if publish_advertisement failed - global advertised_port - return advertised_port + #advertised_port is set to None if publish_advertisement failed + return advertised_port def beacon_off(): - publisher.stop() global advertised_port global advertised_address + publisher.stop() advertised_port = None advertised_address = None