mirror of
https://github.com/morgan9e/UxPlay
synced 2026-04-14 00:04:13 +09:00
160 lines
6.5 KiB
Python
160 lines
6.5 KiB
Python
# SPDX-License-Identifier: LGPL-2.1-or-later
|
|
# adapted from https://github.com/bluez/bluez/blob/master/test/example-advertisement
|
|
#----------------------------------------------------------------
|
|
# BleuIO (for BleuIO USB serial device) module for a standalone python-3.6 or later AirPlay Service-Discovery Bluetooth LE beacon for UxPlay
|
|
# (c) F. Duncanh, March 2026
|
|
|
|
|
|
# **** This implementation requires a BleuIO dongle https://bleuio.com/bluetooth-low-energy-usb-ssd005.php
|
|
# This device has a self-contained bluetooth LE stack packaged as a usb serial modem.
|
|
# It is needed on macOS because macOS does not permit users to send manufacturer-specific BLE advertisements
|
|
# with its native Bluetooth stack. It also works on Linux and Windows.
|
|
|
|
import time
|
|
import os
|
|
|
|
try:
|
|
import serial
|
|
from serial.tools import list_ports
|
|
except ImportError as e:
|
|
print(f'ImportError: {e}, failed to import required serial port support')
|
|
print(f'install pyserial')
|
|
raise SystemExit(1)
|
|
|
|
# --- Module state shared by find_device / setup_beacon / beacon_on / beacon_off ---

# Serial device name of the selected dongle; assigned by find_device().
serial_port = None

# Address and port currently being advertised; assigned by setup_beacon().
advertised_address = None
advertised_port = None

# Strings passed to the AT+ADVDATA= and AT+ADVSTART= commands; built by setup_beacon().
airplay_advertisement = None
advertisement_parameters = None
|
|
|
|
# --- Serial Communication Helper Functions ---
|
|
def send_at_command(serial_port, command):
    """Send one AT command over the serial port and return the reply text.

    Args:
        serial_port: an open serial port object; its write(), in_waiting
            and readline() members are used.
        command: the AT command string, without line terminator.

    Returns:
        Everything read back from the port, with empty lines removed and
        the remaining lines joined with os.linesep.
    """
    serial_port.write(f"{command}\r\n".encode('utf-8'))
    time.sleep(0.1)  # Give the dongle a moment to respond

    chunks = []
    while serial_port.in_waiting:
        # errors='replace': a stray non-UTF-8 byte on the line must not
        # abort the whole command with UnicodeDecodeError.
        chunks.append(serial_port.readline().decode('utf-8', errors='replace'))

    response = "".join(chunks)
    return os.linesep.join(line for line in response.splitlines() if line)
|
|
|
|
#check AdvInterval
|
|
def check_adv_intrvl(min, max):
    """Validate an advertising interval pair given in msecs.

    Accepts the pair only when 100 <= min <= max <= 10240; otherwise raises
    ValueError naming the first rule that was violated.
    """
    problem = None
    if not min >= 100:
        problem = 'advmin was smaller than 100 msecs'
    elif not max >= min:
        problem = 'advmax was smaller than advmin'
    elif not max <= 10240:
        problem = 'advmax was larger than 10240 msecs'

    if problem is not None:
        raise ValueError(problem)
|
|
|
|
from typing import Literal
|
|
def setup_beacon(ipv4_str: str, port: int, advmin: int, advmax: int, index: Literal[None]) -> int:
    """Build the AirPlay Service-Discovery advertisement and store it in module state.

    The advertisement bytes and the AT+ADVSTART parameter string are saved in
    the module globals airplay_advertisement and advertisement_parameters,
    where beacon_on() picks them up.

    Args:
        ipv4_str: IPv4 address to advertise, as a string.
        port: port to advertise, 1..65535.
        advmin: minimum advertising interval in msecs.
        advmax: maximum advertising interval in msecs.
        index: must be None for this module.

    Returns:
        The advertised port.

    Raises:
        ValueError: if index is not None, the interval pair is rejected by
            check_adv_intrvl, port is out of range, or ipv4_str is not a
            valid IPv4 address.
    """
    global advertised_port
    global advertised_address
    global airplay_advertisement
    global advertisement_parameters

    if index is not None:
        raise ValueError('uxplay_beacon_module_BleuIO called with value of index: not None')

    check_adv_intrvl(advmin, advmax)

    # Explicit check instead of assert, so it is still enforced under "python -O".
    if not 0 < port <= 65535:
        raise ValueError(f'port {port} is outside the range 1-65535')

    import ipaddress
    # IPv4Address (not ip_address) so an IPv6 string cannot yield a 16-byte
    # address in a frame that declares 8 bytes of Apple data.
    ipv4_address = ipaddress.IPv4Address(ipv4_str)

    # set up advertising message:
    data = bytearray([0xff, 0x4c, 0x00])              # (3 bytes) type manufacturer_specific 0xff, manufacturer id Apple 0x004c
    data.extend(bytearray([0x09, 0x08, 0x13, 0x30]))  # (4 bytes) Apple Data Unit type 9 (Airplay), Apple data length 8, Apple flags 0001 0011, seed 30
    data.extend(ipv4_address.packed)                  # (4 bytes) ipv4 address
    data.extend(port.to_bytes(2, 'big'))              # (2 bytes) port

    # first byte of message data unit is length of meaningful data that follows (0x0d = 13)
    adv_data = bytearray([len(data)])
    adv_data.extend(data)

    airplay_advertisement = ':'.join(format(b, '02x') for b in adv_data)
    # non-connectable mode, min ad interval, max ad interval, time = unlimited
    advertisement_parameters = f"0;{advmin};{advmax};0;"
    advertised_address = ipv4_str
    advertised_port = port
    return advertised_port
|
|
|
|
def beacon_on() -> bool:
    """Open the dongle's serial port and start the AirPlay advertisement.

    Sends AT+ADVDATA with the module's airplay_advertisement, then
    AT+ADVSTART with advertisement_parameters, to the port named by the
    module global serial_port (115200 baud, 1 s timeout).

    Returns:
        True once both commands have been sent.

    Raises:
        SystemExit: with code 1 after printing a message, on a serial port
            error or any other exception.
    """
    success = False
    try:
        print(f'Connecting to BleuIO dongle on {serial_port} ....')
        # The with-statement closes the port on exit. No extra close() is
        # needed, and one in a finally clause would hit an unbound name
        # whenever opening the port itself fails.
        with serial.Serial(serial_port, 115200, timeout=1) as ser:
            print('Connection established')
            # Start advertising
            send_at_command(ser, "AT+ADVDATA=" + airplay_advertisement)
            send_at_command(ser, "AT+ADVSTART=" + advertisement_parameters)
            print(f'AirPlay Service Discovery advertising started, port = {advertised_port} ip address = {advertised_address}')
            success = True
    except serial.SerialException as e:
        print(f"beacon_on: Serial port error: {e}")
        raise SystemExit(1)
    except Exception as e:
        print(f"beacon_on: An unexpected error occurred: {e}")
        raise SystemExit(1)
    return success
|
|
|
|
def beacon_off() -> int:
    """Stop advertising and clear the stored advertisement state.

    Opens the port named by the module global serial_port (115200 baud,
    1 s timeout), sends AT+ADVSTOP, then resets airplay_advertisement,
    advertised_port, advertised_address and advertisement_parameters to None.

    Returns:
        The module's advertised_port, which is None after a successful stop.

    Raises:
        SystemExit: with code 1 after printing a message, on a serial port
            error or any other exception.
    """
    global advertisement_parameters
    global airplay_advertisement
    global advertised_port
    global advertised_address

    # Stop advertising
    try:
        # The with-statement closes the port; a close() in a finally clause
        # would hit an unbound name if opening the port fails.
        with serial.Serial(serial_port, 115200, timeout=1) as ser:
            send_at_command(ser, "AT+ADVSTOP")
        print('AirPlay Service-Discovery beacon advertisement stopped')
        airplay_advertisement = None
        advertised_port = None
        advertised_address = None
        advertisement_parameters = None
    except serial.SerialException as e:
        print(f"beacon_off: Serial port error: {e}")
        raise SystemExit(1)
    except Exception as e:
        print(f"beacon_off: An unexpected error occurred: {e}")
        raise SystemExit(1)
    return advertised_port
|
|
|
|
from typing import Optional
|
|
def find_device(serial_port_in: Optional[str]) -> Optional[str]:
    """Select the serial port of a BleuIO dongle and store it in the module global serial_port.

    Ports are taken from list_ports.comports() and filtered by USB vendor id
    0x2DCF. If serial_port_in names one of those ports it is chosen silently.
    Otherwise every matching port is reported, the first one is chosen, and
    a warning is printed when more than one was found.

    Args:
        serial_port_in: preferred device name, or None to auto-detect.

    Returns:
        The chosen device name, or None when no matching port exists.
    """
    global serial_port
    serial_port = None
    bleuio_vid = 0x2DCF  # used by BleuIO and BleuIO Pro

    # A port whose vid is None can never equal the target, so one filter covers both checks.
    matches = [entry for entry in list_ports.comports() if entry.vid == bleuio_vid]

    if serial_port_in is not None:
        if any(entry.device == serial_port_in for entry in matches):
            serial_port = serial_port_in
            return serial_port

    if matches:
        serial_port = matches[0].device

    count = 0
    for count, p in enumerate(matches, start=1):
        print(f'=== detected BlueuIO {count}. port: {p.device} desc: {p.description} hwid: {p.hwid}')

    if count > 1:
        print(f'warning: {count} BleueIO devices were found, the first found will be used')
        print('(to override this choice, specify "--device =..." in optional arguments)')

    return serial_port
|
|
|
|
# Announce on import that this beacon module has been loaded.
print('Imported uxplay_beacon_module_BleuIO')
|