diff --git a/Bluetooth_LE_beacon/uxplay-beacon.1 b/Bluetooth_LE_beacon/uxplay-beacon.1 new file mode 100644 index 0000000..0eebbf3 --- /dev/null +++ b/Bluetooth_LE_beacon/uxplay-beacon.1 @@ -0,0 +1,54 @@ +.TH UXPLAY 1 2026-01-26 "UxPlay 1.73" "User Commands" +.SH NAME +uxplay-beacon.py \- Python (>= 3.6) script for a Bluetooth LE Service-Discovery beacon. +.SH SYNOPSIS +.B uxplay-beacon.py +[BlueIO] [\fI\, -h, --help] + more options. +.SH DESCRIPTION +.TP +UxPlay 1.73: Standalone Python Script for Bluetooth LE Service Discovery +.nf +Modules for BLE support: +BlueZ: (Linux, with D-Bus) +winrt: (Windows) +BleuIO: (for a BleuIO USB serial device, all platforms, including macOS). +The best choice for host platform is made unless option BleuIO is used. +.SH OPTIONS +.TP +.B +\fB\ BleuIO\fR Force the use of a BleuIO device +.TP +\fB\--file\fR fn Specify configuration file (default: ~/.uxplay.beacon) +.TP +\fB\--path\fR fn Specify a non-default Bluetooth LE data file used by uxplay +.TP +\fB\--ipv4\fR ip Override automatically-found ipv4 address for contacting UxPlay +.TP +\fB\--AdvMin\fR x Minimum Advertising interval in msecs (>= 100) +.TP +\fB\--AdvMax\fR y Maximum Advertising interval in msecs (>= AdvMin, <= 102400) +.TP +\fB\--index\fR x Used to distinguish different instances of beacons +.TP +\fB\--serial\fR p Override automatically-found serial port for BleuIO device +.TP +\fB \-h, --help\fR Show help text. +.SH +FILES +Options in configuration file are applied first (command-line options may modify them). +.TP +Format: one option per line, with initial "--"; lines beginning with "#" ignored. +.SH +AUTHORS +.TP +Various, see website or distribution. +.SH +COPYRIGHT +.TP +Various, see website or distribution. License: GPL v3+: +.TP +GNU GPL version 3 or later. (some parts LGPL v.2.1+ or MIT). +.SH +SEE ALSO +.TP +Website: diff --git a/Bluetooth_LE_beacon/uxplay-beacon.py b/Bluetooth_LE_beacon/uxplay-beacon.py new file mode 100644 index 0000000..4c4c3c0 --- /dev/null +++ b/Bluetooth_LE_beacon/uxplay-beacon.py @@ -0,0 +1,498 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: LGPL-2.1-or-later +# adapted from https://github.com/bluez/bluez/blob/master/test/example-advertisement +#---------------------------------------------------------------- +# a standalone python-3.6 or later DBus-based AirPlay Service-Discovery Bluetooth LE beacon for UxPlay +# (c) F. Duncanh, October 2025 + +import sys +if not sys.version_info >= (3,6): + print("uxplay-beacon.py requires Python 3.6 or higher") + +import gi +try: + from gi.repository import GLib +except ImportError as e: + print(f'ImportError: {e}, failed to import GLib from Python GObject Introspection Library ("gi")') + print('Install PyGObject pip3 install PyGobject==3.50.0') + raise SystemExit(1) + +import platform +os_name = platform.system() +windows = 'Windows' +linux = 'Linux' +macos = 'Darwin' + +advertised_port = None +advertised_address = None + + +def check_port(port): + if advertised_port is None or port == advertised_port: + return True + else: + return False + + +import importlib +import argparse +import textwrap +import os +import struct +import socket +import time + +try: + import psutil +except ImportError as e: + print(f'ImportError {e}: failed to import psutil') + print(f' install the python3 psutil package') + raise SystemExit(1) + +# global variables +beacon_is_running = False +beacon_is_pending_on = False +beacon_is_pending_off = False + +port = None +advmin = None +advmax = None +ipv4_str = None +index = None + +from typing import Optional +def setup_beacon(ipv4_str: str, port: int, advmin: Optional[int], advmax: Optional[int], index: Optional[int]) -> int: + return 0 + +def beacon_on() ->bool: + return False + +def beacon_off() ->int: + return 0 + +def start_beacon(): + global beacon_is_running + global port + global ipv4_str + global advmin + global advmax + global index + setup_beacon(ipv4_str, port, advmin, advmax, index) + beacon_is_running = beacon_on() + + +def stop_beacon(): + global beacon_is_running + global advertised_port + advertised_port = beacon_off() + beacon_is_running = False + +def pid_is_running(pid): + return psutil.pid_exists(pid) + +def check_process_name(pid, pname): + try: + process = psutil.Process(pid) + if process.name().find(pname,0) == 0: + return True + else: + return False + except psutil.NoSuchProcess: + return False + +def check_pending(): + global beacon_is_pending_on + global beacon_is_pending_off + if beacon_is_running: + if beacon_is_pending_off: + stop_beacon() + beacon_is_pending_off = False + else: + if beacon_is_pending_on: + start_beacon() + beacon_is_pending_on = False + return True + + +def check_file_exists(file_path): + global port + global beacon_is_pending_on + global beacon_is_pending_off + pname = "process name unread" + if os.path.isfile(file_path): + test = True + try: + with open(file_path, 'rb') as file: + data = file.read(2) + port = struct.unpack('= min): + raise ValueError('advmax was smaller than advmin') + if not (max <= 10240): + raise ValueError('advmax was larger than 10240 msecs') + +from typing import Literal +def setup_beacon(ipv4_str: str, port: int, advmin: int, advmax: int, index: Literal[None]) ->int: + if index is not None: + raise ValuError('uxplay_beacon_module_BleuIO called with value of index: not None') + global advertised_port + global advertised_address + global airplay_advertisement + global advertisement_parameters + check_adv_intrvl(advmin, advmax) + # set up advertising message: + assert port > 0 + assert port <= 65535 + import ipaddress + ipv4_address = ipaddress.ip_address(ipv4_str) + port_bytes = port.to_bytes(2, 'big') + data = bytearray([0xff, 0x4c, 0x00]) # ( 3 bytes) type manufacturer_specific 0xff, manufacturer id Apple 0x004c + data.extend(bytearray([0x09, 0x08, 0x13, 0x30])) # (4 bytes) Apple Data Unit type 9 (Airplay), Apple data length 8, Apple flags 0001 0011, seed 30 + data.extend(bytearray(ipv4_address.packed)) # (4 bytes) ipv4 address + data.extend(port_bytes) # (2 bytes) port + length = len(data) # 13 bytes + adv_data = bytearray([length]) # first byte of message data unit is length of meaningful data that follows (0x0d = 13) + adv_data.extend(data) + airplay_advertisement = ':'.join(format(b,'02x') for b in adv_data) + advertisement_parameters = "0;" + str(advmin) + ";" + str(advmax) + ";0;" # non-connectable mode, min ad internal, max ad interval, time = unlimited + advertised_address = ipv4_str + advertised_port = port + return advertised_port + +def beacon_on() ->bool: + global airplay_advertisement + global advertisement_parameters + global serial_port + success = False + try: + print(f'Connecting to BleuIO dongle on {serial_port} ....') + with serial.Serial(serial_port, 115200, timeout = 1) as ser: + print(f'Connection established') + #Start advertising + response = send_at_command(ser, "AT+ADVDATA=" + airplay_advertisement) + #print(response) + response = send_at_command(ser, "AT+ADVSTART=" + advertisement_parameters) + #print(f'{response}') + print(f'AirPlay Service Discovery advertising started, port = {advertised_port} ip address = {advertised_address}') + success = True + except serial.SerialException as e: + print(f"beacon_on: Serial port error: {e}") + raise SystemExit(1) + except Exception as e: + print(f"beacon_on: An unexpected error occurred: {e}") + raise SystemExit(1) + finally: + ser.close() + return success + +def beacon_off() ->int: + global advertisement_parameters + global airplay_advertisement + global advertised_port + global advertised_address + global serial_port + # Stop advertising + try: + with serial.Serial(serial_port, 115200, timeout = 1) as ser: + response = send_at_command(ser, "AT+ADVSTOP") + #print(f'{response}') + print(f'AirPlay Service-Discovery beacon advertisement stopped') + airplay_advertisement = None + advertised_Port = None + advertised_address = None + advertisement_parameters = None + resullt = True + except serial.SerialException as e: + print(f"beacon_off: Serial port error: {e}") + raise SystemExit(1) + except Exception as e: + print(f"beacon_off: An unexpected error occurred: {e}") + raise SystemExit(1) + finally: + ser.close() + return advertised_port + +from typing import Optional +def find_bleuio(serial_port_in: Optional[str]) ->Optional[str]: + global serial_port + serial_ports = list(list_ports.comports()) + count = 0 + serial_port_found = False + serial_port = None + TARGET_VID = 0x2DCF # used by BleuIO and BleuIO Pro + if serial_port_in is not None: + for p in serial_ports: + if p.vid is None: + continue + if p.vid == TARGET_VID and p.device == serial_port_in: + serial_port = serial_port_in + return serial_port + for p in serial_ports: + if p.vid is not None and p.vid == TARGET_VID: + count+=1 + if count == 1: + serial_port = p.device + print(f'=== detected BlueuIO {count}. port: {p.device} desc: {p.description} hwid: {p.hwid}') + + if count>1: + print(f'warning: {count} BleueIO devices were found, the first found will be used') + print(f'(to override this choice, specify "--serial_port=..." in optional arguments') + + return serial_port + +print(f'Imported uxplay_beacon_module_BleuIO') diff --git a/Bluetooth_LE_beacon/uxplay_beacon_module_BlueZ.py b/Bluetooth_LE_beacon/uxplay_beacon_module_BlueZ.py new file mode 100644 index 0000000..80ab108 --- /dev/null +++ b/Bluetooth_LE_beacon/uxplay_beacon_module_BlueZ.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: LGPL-2.1-or-later +# adapted from https://github.com/bluez/bluez/blob/master/test/example-advertisement +#---------------------------------------------------------------- +# BlueZ/D-Bus (Linux) module for a standalone python-3.6 AirPlay Service-Discovery Bluetooth LE beacon for UxPlay +# (c) F. Duncanh, March 2026 + +try: + import dbus + import dbus.exceptions + import dbus.mainloop.glib + import dbus.service +except ImportError as e: + print(f"ImportError: {e}, failed to import required dbus components") + print(f"install the python3 dbus package") + raise SystemExit(1) + +ad_manager = None +airplay_advertisement = None +advertised_port = None +advertised_address = None + +BLUEZ_SERVICE_NAME = 'org.bluez' +LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' + +LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' + +class InvalidArgsException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' + +class NotSupportedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.NotSupported' + +class NotPermittedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.NotPermitted' + +class InvalidValueLengthException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.InvalidValueLength' + +class FailedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.Failed' + +class AirPlay_Service_Discovery_Advertisement(dbus.service.Object): + PATH_BASE = '/org/bluez/airplay_service_discovery_advertisement' + + def __init__(self, bus, index): + self.path = self.PATH_BASE + str(index) + self.bus = bus + self.manufacturer_data = None + self.min_intrvl = 0 + self.max_intrvl = 0 + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + properties = dict() + properties['Type'] = 'broadcast' + if self.manufacturer_data is not None: + properties['ManufacturerData'] = dbus.Dictionary( + self.manufacturer_data, signature='qv') + if self.min_intrvl > 0: + properties['MinInterval'] = dbus.UInt32(self.min_intrvl) + if self.max_intrvl > 0: + properties['MaxInterval'] = dbus.UInt32(self.max_intrvl) + return {LE_ADVERTISEMENT_IFACE: properties} + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_manufacturer_data(self, manuf_code, manuf_data): + if not self.manufacturer_data: + self.manufacturer_data = dbus.Dictionary({}, signature='qv') + self.manufacturer_data[manuf_code] = dbus.Array(manuf_data, signature='y') + + def set_min_intrvl(self, min_intrvl): + if self.min_intrvl == 0: + self.min_intrvl = 100 + self.min_intrvl = max(min_intrvl, 100) + + def set_max_intrvl(self, max_intrvl): + if self.max_intrvl == 0: + self.max_intrvl = 100 + self.max_intrvl = max(max_intrvl, 100) + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + + def GetAll(self, interface): + if interface != LE_ADVERTISEMENT_IFACE: + raise InvalidArgsException() + return self.get_properties()[LE_ADVERTISEMENT_IFACE] + + @dbus.service.method(LE_ADVERTISEMENT_IFACE, + in_signature='', + out_signature='') + + def Release(self): + print(f'{self.path}: Released!') + +class AirPlayAdvertisement(AirPlay_Service_Discovery_Advertisement): + + def __init__(self, bus, index, ipv4_str, port, min_intrvl, max_intrvl): + AirPlay_Service_Discovery_Advertisement.__init__(self, bus, index) + assert port > 0 + assert port <= 65535 + mfg_data = bytearray([0x09, 0x08, 0x13, 0x30]) # Apple Data Unit type 9 (Airplay), length 8, flags 0001 0011, seed 30 + import ipaddress + ipv4_address = ipaddress.ip_address(ipv4_str) + ipv4 = bytearray(ipv4_address.packed) + mfg_data.extend(ipv4) + port_bytes = port.to_bytes(2, 'big') + mfg_data.extend(port_bytes) + self.add_manufacturer_data(0x004c, mfg_data) + self.set_min_intrvl(min_intrvl) + self.set_max_intrvl(max_intrvl) + +def register_ad_cb(): + print(f'AirPlay Service_Discovery Advertisement ({advertised_address}:{advertised_port}) registered') + +def register_ad_error_cb(error): + print(f'Failed to register advertisement: {error}') + global ad_manager + global advertised_port + global advertised_address + ad_manager = None + advertised_port = None + advertised_address = None + +def find_adapter(bus): + remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), + DBUS_OM_IFACE) + objects = remote_om.GetManagedObjects() + for o, props in objects.items(): + if LE_ADVERTISING_MANAGER_IFACE in props: + return o + return None + +def setup_beacon(ipv4_str :str, port :int, advmin :int, advmax :int, index :int ) ->int: + global ad_manager + global airplay_advertisement + global advertised_address + global advertised_port + advertised_port = port + advertised_address = ipv4_str + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + bus = dbus.SystemBus() + adapter = find_adapter(bus) + if not adapter: + print(f'LEAdvertisingManager1 interface not found') + return + adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), + "org.freedesktop.DBus.Properties") + adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1)) + ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), + LE_ADVERTISING_MANAGER_IFACE) + airplay_advertisement = AirPlayAdvertisement(bus, index, ipv4_str, port, advmin, advmax) + return advertised_port + +def beacon_on() ->bool: + global airplay_advertisement + ad_manager.RegisterAdvertisement(airplay_advertisement.get_path(), {}, + reply_handler=register_ad_cb, + error_handler=register_ad_error_cb) + if ad_manager is None: + airplay_advertisement = None + advertised_port = None + advertised_address = None + return False + else: + return True + +def beacon_off() ->int: + global ad_manager + global airplay_advertisement + global advertised_port + global advertised_address + if ad_manager is not None: + ad_manager.UnregisterAdvertisement(airplay_advertisement) + print(f'AirPlay Service-Discovery beacon advertisement unregistered') + ad_manager = None + if airplay_advertisement is not None: + dbus.service.Object.remove_from_connection(airplay_advertisement) + airplay_advertisement = None + advertised_Port = None + advertised_address = None + return advertised_port + +print(f'loaded uxplay_beacon_module_BlueZ ') diff --git a/Bluetooth_LE_beacon/uxplay_beacon_module_winrt.py b/Bluetooth_LE_beacon/uxplay_beacon_module_winrt.py new file mode 100644 index 0000000..743e373 --- /dev/null +++ b/Bluetooth_LE_beacon/uxplay_beacon_module_winrt.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: LGPL-2.1-or-later +#---------------------------------------------------------------- +# winrt (Windows) module for a standalone python-3.6 AirPlay Service-Discovery Bluetooth LE beacon for UxPlay +# (c) F. Duncanh, March 2026 + +# Import WinRT APIs (see https://pypi.org/project/winrt-Windows.Foundation.Collections/) +try: + import winrt.windows.foundation.collections +except ImportError: + print(f"ImportError from winrt-Windows.Foundation.Collections") + print(f"Install with 'pip install winrt-Windows.Foundation'") + print(f"and with 'pip install winrt-Windows.Foundation.Collections'") + print(f'You may need to use pip option "--break-system-packages" (disregard the warning)') + raise SystemExit(1) + +try: + import winrt.windows.devices.bluetooth.advertisement as ble_adv +except ImportError: + print(f"ImportError from winrt-Windows.Devices.Bluetooth.Advertisement") + print(f"Install with 'pip install winrt-Windows.Devices.Bluetooth.Advertisement'") + print(f'You may need to use pip option "--break-system-packages" (disregard the warning)') + raise SystemExit(1) + +try: + import winrt.windows.storage.streams as streams +except ImportError: + print(f"ImportError from winrt-Windows.Storage.Streams") + print(f"Install with 'pip install winrt-Windows.Storage.Streams'") + print(f'You may need to use pip option "--break-system-packages" (disregard the warning)') + raise SystemExit(1) + +publisher = None +advertised_port = None +advertised_address = None + +def on_status_changed(sender, args): + global publisher + print(f"Publisher status change to: {args.status.name}") + if args.status.name == "STOPPED": + publisher = None + +def create_airplay_service_discovery_advertisement_publisher(ipv4_str, port): + assert port > 0 + assert port <= 65535 + mfg_data = bytearray([0x09, 0x08, 0x13, 0x30]) # Apple Data Unit type 9 (Airplay), length 8, flags 0001 0011, seed 30 + import ipaddress + ipv4_address = ipaddress.ip_address(ipv4_str) + ipv4 = bytearray(ipv4_address.packed) + mfg_data.extend(ipv4) + port_bytes = port.to_bytes(2, 'big') + mfg_data.extend(port_bytes) + writer = streams.DataWriter() + writer.write_bytes(mfg_data) + manufacturer_data = ble_adv.BluetoothLEManufacturerData() + manufacturer_data.company_id = 0x004C #Apple + manufacturer_data.data = writer.detach_buffer() + advertisement = ble_adv.BluetoothLEAdvertisement() + advertisement.manufacturer_data.append(manufacturer_data) + global publisher + global advertised_port + global advertised_address + publisher = ble_adv.BluetoothLEAdvertisementPublisher(advertisement) + advertised_port = port + advertised_address = ipv4_str + publisher.add_status_changed(on_status_changed) + +async def publish_advertisement(): + global advertised_port + global advertised_address + try: + publisher.start() + print(f"AirPlay Service_Discovery Advertisement ({advertised_address}:{advertised_port}) registered") + + except Exception as e: + print(f"Failed to start Publisher: {e}") + print(f"Publisher Status: {publisher.status.name}") + advertised_address = None + advertised_port = None + +from typing import Literal +def setup_beacon(ipv4_str: str, port:int , advmin: Literal[None], advmax :Literal[None], index :Literal[None]) ->int: + if (advmin is not None) or (advmax is not None) or (index is not None): + raise ValueError('uxplay_beacon_module_winrt: advmin, advmax, index were not all None') + global advertised_port + create_airplay_service_discovery_advertisement_publisher(ipv4_str, port) + return advertised_port + +def beacon_on() -> bool: + import asyncio + try: + asyncio.run( publish_advertisement()) + return True + except Exception as e: + print(f"Failed to start publisher: {e}") + global publisher + publisher = None + return False + + +def beacon_off() ->int: + publisher.stop() + global advertised_port + global advertised_address + advertised_port = None + advertised_address = None + return advertised_port + +print(f'loaded uxplay_beacon_module_winrt')