various fixes/cleanups to beacon modules

This commit is contained in:
F. Duncanh
2026-03-15 00:46:49 -04:00
parent 05bedcfaf1
commit 83c434dfcc
4 changed files with 79 additions and 56 deletions

View File

@@ -50,14 +50,14 @@ os_name = platform.system()
# external functions that must be supplied by loading a module:
from typing import Optional
def setup_beacon(ipv4_str: str, port: int, advmin: Optional[int], advmax: Optional[int], index: Optional[int]) -> int:
return 0
def beacon_on() ->bool:
def setup_beacon(ipv4_str: str, port: int, advmin: Optional[int], advmax: Optional[int], index: Optional[int]) -> bool:
return False
def beacon_off() ->int:
return 0
def beacon_on() ->Optional[int]:
return None
def beacon_off():
return
def find_device(device: Optional[str]) -> Optional[str]:
return None
@@ -70,16 +70,22 @@ def start_beacon():
global advmin
global advmax
global index
if beacon_is_running:
print(f'code error, should not happen')
raise SystemExit(1)
setup_beacon(ipv4_str, port, advmin, advmax, index)
beacon_is_running = beacon_on()
advertised_port = beacon_on()
beacon_is_running = advertised_port is not None
if not beacon_is_running:
print(f'second attempt to start beacon:')
beacon_is_running = beacon_on()
advertised_port = beacon_on()
beacon_is_running = advertised_port is not None
def stop_beacon():
global beacon_is_running
global advertised_port
advertised_port = beacon_off()
beacon_off()
advertised_port = None
beacon_is_running = False
def pid_is_running(pid):
@@ -223,6 +229,7 @@ if __name__ == '__main__':
ble_bluez = "bluez"
ble_winrt = "winrt"
ble_bleuio = "bleuio"
ble_hci = "hci"
# Create an ArgumentParser object
epilog_text = '''
@@ -252,12 +259,13 @@ if __name__ == '__main__':
bleuio = 'BleuIO'
winrt = 'winrt'
bluez = 'BlueZ'
hci = 'HCI'
# Add arguments
parser.add_argument(
'ble_type',
nargs='?',
choices=[bleuio, None],
choices=[bleuio, hci, None],
help=textwrap.dedent('''
Specifies whether or not to use the module supporting the BleuIO USB dongle, or
(if not supplied) the default native Linux (BlueZ) or Windows (winrt) modules.
@@ -464,7 +472,7 @@ if __name__ == '__main__':
beacon_off = ble.beacon_off
need_device = False
if ble_type == bleuio:
if ble_type == bleuio or ble_type == hci:
# obtain serial port for BleuIO device
find_device = ble.find_device
need_device = True

View File

@@ -50,7 +50,7 @@ def check_adv_intrvl(min, max):
raise ValueError('advmax was larger than 10240 msecs')
from typing import Literal
def setup_beacon(ipv4_str: str, port: int, advmin: int, advmax: int, index: Literal[None]) ->int:
def setup_beacon(ipv4_str: str, port: int, advmin: int, advmax: int, index: Literal[None]) ->bool:
if index is not None:
raise ValuError('uxplay_beacon_module_BleuIO called with value of index: not None')
global advertised_port
@@ -75,13 +75,14 @@ def setup_beacon(ipv4_str: str, port: int, advmin: int, advmax: int, index: Lite
advertisement_parameters = "0;" + str(advmin) + ";" + str(advmax) + ";0;" # non-connectable mode, min ad internal, max ad interval, time = unlimited
advertised_address = ipv4_str
advertised_port = port
return advertised_port
return True
def beacon_on() ->bool:
global airplay_advertisement
global advertisement_parameters
global advertised_port
global serial_port
success = False
ser = None
try:
print(f'Connecting to BleuIO dongle on {serial_port} ....')
with serial.Serial(serial_port, 115200, timeout = 1) as ser:
@@ -92,23 +93,25 @@ def beacon_on() ->bool:
response = send_at_command(ser, "AT+ADVSTART=" + advertisement_parameters)
#print(f'{response}')
print(f'AirPlay Service Discovery advertising started, port = {advertised_port} ip address = {advertised_address}')
success = True
except serial.SerialException as e:
print(f"beacon_on: Serial port error: {e}")
raise SystemExit(1)
advertised_port = None
except Exception as e:
print(f"beacon_on: An unexpected error occurred: {e}")
raise SystemExit(1)
advertised_port = None
finally:
if ser is not None:
ser.close()
return success
return advertised_port
def beacon_off() ->int:
def beacon_off():
global advertisement_parameters
global airplay_advertisement
global advertised_port
global advertised_address
global serial_port
ser = None
# Stop advertising
try:
with serial.Serial(serial_port, 115200, timeout = 1) as ser:
@@ -122,13 +125,11 @@ def beacon_off() ->int:
resullt = True
except serial.SerialException as e:
print(f"beacon_off: Serial port error: {e}")
raise SystemExit(1)
except Exception as e:
print(f"beacon_off: An unexpected error occurred: {e}")
raise SystemExit(1)
finally:
if ser is not None:
ser.close()
return advertised_port
from typing import Optional
def find_device(serial_port_in: Optional[str]) ->Optional[str]:
@@ -144,7 +145,7 @@ def find_device(serial_port_in: Optional[str]) ->Optional[str]:
continue
if p.vid == TARGET_VID and p.device == serial_port_in:
serial_port = serial_port_in
return serial_port
if serial_port is None:
for p in serial_ports:
if p.vid is not None and p.vid == TARGET_VID:
count+=1
@@ -154,6 +155,23 @@ def find_device(serial_port_in: Optional[str]) ->Optional[str]:
if count>1:
print(f'warning: {count} BleueIO devices were found, the first found will be used')
print(f'(to override this choice, specify "--device =..." in optional arguments)')
if serial_port is None:
return serial_port
#test access to serial_port
try:
with serial.Serial(serial_port, 115200, timeout = 1) as ser:
send_at_command(ser, "AT")
ser.close()
except Exception as e:
print(f"beacon_on: Serial port error: {e}")
text='''
The user does not have sufficient privilegs to access this serial port:
On Linux, the system administrator should add the user to the "dialout" group
On BSD systems, the necesary group is usually the "dialer" group.
This can be checked with '''
print(text, f'"ls -l {serial_port}"')
raise SystemExit(1)
return serial_port
print(f'Imported uxplay_beacon_module_BleuIO')

View File

@@ -119,7 +119,7 @@ def register_ad_cb():
print(f'AirPlay Service_Discovery Advertisement ({advertised_address}:{advertised_port}) registered')
def register_ad_error_cb(error):
print(f'Failed to register advertisement: {error}')
print(f'register_ad: {error}')
global ad_manager
global advertised_port
global advertised_address
@@ -136,6 +136,7 @@ def find_adapter(bus):
return o
return None
from typing import Optional
def setup_beacon(ipv4_str :str, port :int, advmin :int, advmax :int, index :int ) ->int:
global ad_manager
global airplay_advertisement
@@ -148,29 +149,26 @@ def setup_beacon(ipv4_str :str, port :int, advmin :int, advmax :int, index :int
adapter = find_adapter(bus)
if not adapter:
print(f'LEAdvertisingManager1 interface not found')
return
return False
adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
"org.freedesktop.DBus.Properties")
adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))
ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
LE_ADVERTISING_MANAGER_IFACE)
airplay_advertisement = AirPlayAdvertisement(bus, index, ipv4_str, port, advmin, advmax)
return advertised_port
return True
def beacon_on() ->bool:
def beacon_on() ->Optional[int]:
global airplay_advertisement
ad_manager.RegisterAdvertisement(airplay_advertisement.get_path(), {},
reply_handler=register_ad_cb,
error_handler=register_ad_error_cb)
if ad_manager is None:
# if registration error occurs, advertised_port is set to None by callback
if advertised_port is None:
airplay_advertisement = None
advertised_port = None
advertised_address = None
return False
else:
return True
return advertised_port
def beacon_off() ->int:
def beacon_off():
global ad_manager
global airplay_advertisement
global advertised_port
@@ -184,6 +182,5 @@ def beacon_off() ->int:
airplay_advertisement = None
advertised_Port = None
advertised_address = None
return advertised_port
print(f'loaded uxplay_beacon_module_BlueZ ')

View File

@@ -77,31 +77,31 @@ async def publish_advertisement():
advertised_port = None
from typing import Literal
def setup_beacon(ipv4_str: str, port:int , advmin: Literal[None], advmax :Literal[None], index :Literal[None]) ->int:
def setup_beacon(ipv4_str: str, port:int , advmin: Literal[None], advmax :Literal[None], index :Literal[None]) ->bool:
if (advmin is not None) or (advmax is not None) or (index is not None):
raise ValueError('uxplay_beacon_module_winrt: advmin, advmax, index were not all None')
global advertised_port
create_airplay_service_discovery_advertisement_publisher(ipv4_str, port)
return advertised_port
return True
def beacon_on() -> bool:
from typing import Optional
def beacon_on() -> Optional[int]:
import asyncio
try:
asyncio.run(publish_advertisement())
return True
except Exception as e:
print(f"Failed to start publisher: {e}")
global publisher
publisher = None
return False
finally:
#advertised_port is set to None if publish_advertisement failed
global advertised_port
return advertised_port
def beacon_off() ->int:
def beacon_off():
publisher.stop()
global advertised_port
global advertised_address
advertised_port = None
advertised_address = None
return advertised_port
print(f'loaded uxplay_beacon_module_winrt')