mirror of
https://github.com/morgan9e/UxPlay
synced 2026-04-14 00:04:13 +09:00
uxplay-beacon.py python cleanups (dbus and winrt)
This commit is contained in:
@@ -9,7 +9,7 @@ UxPlay 1.72: Standalone Python Script for Bluetooth LE Service Discovery (DBus).
|
||||
.SH OPTIONS
|
||||
.TP
|
||||
.B
|
||||
\fB\--file\fR fn Specify alternate configuration file
|
||||
\fB\--file\fR fn Specify configuration file (default: ~/.uxplay.beacon)
|
||||
.TP
|
||||
\fB\--path\fR fn Specify non-default Bluetooth LE data file used by uxplay
|
||||
.TP
|
||||
@@ -24,11 +24,9 @@ UxPlay 1.72: Standalone Python Script for Bluetooth LE Service Discovery (DBus).
|
||||
\fB \-h, --help\fR Show help text.
|
||||
.SH
|
||||
FILES
|
||||
Options in beacon configuration file ~/.uxplay.beacon
|
||||
Options in configuration file are applied first (command-line options may modify them).
|
||||
.TP
|
||||
are applied first (command-line options may modify them). Format:
|
||||
.TP
|
||||
one option per line, with initial "--"; lines beginning with "#" ignored.
|
||||
Format: one option per line, with initial "--"; lines beginning with "#" ignored.
|
||||
.SH
|
||||
AUTHORS
|
||||
.TP
|
||||
|
||||
@@ -19,7 +19,8 @@ import dbus.service
|
||||
|
||||
ad_manager = None
|
||||
airplay_advertisement = None
|
||||
server_address = None
|
||||
advertised_port = None
|
||||
advertised_address = None
|
||||
|
||||
BLUEZ_SERVICE_NAME = 'org.bluez'
|
||||
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
|
||||
@@ -125,15 +126,17 @@ class AirPlayAdvertisement(AirPlay_Service_Discovery_Advertisement):
|
||||
|
||||
|
||||
def register_ad_cb():
|
||||
global server_address
|
||||
print(f'AirPlay Service_Discovery Advertisement ({server_address}) registered')
|
||||
print(f'AirPlay Service_Discovery Advertisement ({advertised_address}:{advertised_port}) registered')
|
||||
|
||||
|
||||
def register_ad_error_cb(error):
|
||||
print(f'Failed to register advertisement: {error}')
|
||||
global ad_manager
|
||||
global advertised_port
|
||||
global advertised_address
|
||||
ad_manager = None
|
||||
|
||||
advertised_port = None
|
||||
advertised_address = None
|
||||
|
||||
def find_adapter(bus):
|
||||
remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
|
||||
@@ -150,8 +153,9 @@ def find_adapter(bus):
|
||||
def setup_beacon(ipv4_str, port, advmin, advmax, index):
|
||||
global ad_manager
|
||||
global airplay_advertisement
|
||||
global server_address
|
||||
server_address = f"{ipv4_str}:{port}"
|
||||
global advertised_address
|
||||
advertised_port = port
|
||||
advertised_address = ipv4_str
|
||||
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
||||
bus = dbus.SystemBus()
|
||||
adapter = find_adapter(bus)
|
||||
@@ -168,7 +172,6 @@ def setup_beacon(ipv4_str, port, advmin, advmax, index):
|
||||
airplay_advertisement = AirPlayAdvertisement(bus, index, ipv4_str, port, advmin, advmax)
|
||||
|
||||
def beacon_on():
|
||||
global ad_manager
|
||||
global airplay_advertisement
|
||||
ad_manager.RegisterAdvertisement(airplay_advertisement.get_path(), {},
|
||||
reply_handler=register_ad_cb,
|
||||
@@ -182,14 +185,25 @@ def beacon_on():
|
||||
def beacon_off():
|
||||
global ad_manager
|
||||
global airplay_advertisement
|
||||
global advertised_port
|
||||
global advertised_address
|
||||
ad_manager.UnregisterAdvertisement(airplay_advertisement)
|
||||
print(f'AirPlay Service-Discovery beacon advertisement unregistered')
|
||||
ad_manager = None
|
||||
dbus.service.Object.remove_from_connection(airplay_advertisement)
|
||||
airplay_advertisement = None
|
||||
|
||||
advertised_Port = None
|
||||
advertised_address = None
|
||||
|
||||
|
||||
#==generic code (non-dbus) below here =============
|
||||
|
||||
def check_port(port):
|
||||
if advertised_port is None or port == advertised_port:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
@@ -238,7 +252,6 @@ def check_process_name(pid, pname):
|
||||
return False
|
||||
|
||||
def check_pending():
|
||||
global beacon_is_running
|
||||
global beacon_is_pending_on
|
||||
global beacon_is_pending_off
|
||||
if beacon_is_running:
|
||||
@@ -254,38 +267,50 @@ def check_pending():
|
||||
|
||||
def check_file_exists(file_path):
|
||||
global port
|
||||
global beacon_is_running
|
||||
global beacon_is_pending_on
|
||||
global beacon_is_pending_off
|
||||
|
||||
if os.path.exists(file_path):
|
||||
with open(file_path, 'rb') as file:
|
||||
data = file.read(2)
|
||||
port = struct.unpack('<H', data)[0]
|
||||
data = file.read(4)
|
||||
pid = struct.unpack('<I', data)[0]
|
||||
if not pid_is_running(pid):
|
||||
file.close()
|
||||
test = False
|
||||
else :
|
||||
data = file.read()
|
||||
file.close()
|
||||
pname = data.split(b'\0',1)[0].decode('utf-8')
|
||||
last_element_of_pname = os.path.basename(pname)
|
||||
test = check_process_name(pid, last_element_of_pname)
|
||||
if test == True:
|
||||
if not beacon_is_running:
|
||||
beacon_is_pending_on = True
|
||||
pname = "process name unread"
|
||||
if os.path.isfile(file_path):
|
||||
test = True
|
||||
try:
|
||||
with open(file_path, 'rb') as file:
|
||||
data = file.read(2)
|
||||
port = struct.unpack('<H', data)[0]
|
||||
data = file.read(4)
|
||||
pid = struct.unpack('<I', data)[0]
|
||||
if not pid_is_running(pid):
|
||||
file.close()
|
||||
test = False
|
||||
if test:
|
||||
data = file.read()
|
||||
file.close()
|
||||
pname = data.split(b'\0',1)[0].decode('utf-8')
|
||||
last_element_of_pname = os.path.basename(pname)
|
||||
test = check_process_name(pid, last_element_of_pname)
|
||||
except IOError:
|
||||
test = False
|
||||
except FileNotFoundError:
|
||||
test = False
|
||||
if test:
|
||||
if not beacon_is_running:
|
||||
beacon_is_pending_on = True
|
||||
else:
|
||||
print(f'orphan beacon file {file_path} exists, but process {pname} (pid {pid}) is no longer active')
|
||||
try:
|
||||
os.remove(file_path)
|
||||
print(f'File "{file_path}" deleted successfully.')
|
||||
except FileNotFoundError:
|
||||
print(f'File "{file_path}" not found.')
|
||||
if beacon_is_running:
|
||||
if not check_port(port):
|
||||
# uxplay is active, and beacon is running but is advertising a different port, so shut it down
|
||||
beacon_is_pending_off = True
|
||||
else:
|
||||
else:
|
||||
print(f'Orphan beacon file exists, but process pid {pid} ({pname}) is no longer active')
|
||||
try:
|
||||
os.remove(file_path)
|
||||
print(f'Orphan beacon file "{file_path}" deleted successfully.')
|
||||
except FileNotFoundError:
|
||||
print(f'File "{file_path}" not found.')
|
||||
except PermissionError as e:
|
||||
print(f'Permission Errror {e}: cannot delete "{file_path}".')
|
||||
if beacon_is_running:
|
||||
beacon_is_pending_off = True
|
||||
|
||||
else: #BLE file does not exist
|
||||
if beacon_is_running:
|
||||
beacon_is_pending_off = True
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ UxPlay 1.72: Standalone Python Script for Bluetooth LE Service Discovery (Window
|
||||
.SH OPTIONS
|
||||
.TP
|
||||
.B
|
||||
\fB\--file\fR fn Specify alternate configuration file
|
||||
\fB\--file\fR fn Specify configuration file (default: ~/.uxplay.beacon)
|
||||
.TP
|
||||
\fB\--path\fR fn Specify non-default Bluetooth LE data file used by uxplay
|
||||
.TP
|
||||
@@ -18,11 +18,9 @@ UxPlay 1.72: Standalone Python Script for Bluetooth LE Service Discovery (Window
|
||||
\fB \-h, --help\fR Show help text.
|
||||
.SH
|
||||
FILES
|
||||
Options in beacon configuration file ~/.uxplay.beacon
|
||||
Options in configuration file are applied first (command-line options may modify them).
|
||||
.TP
|
||||
are applied first (command-line options may modify them). Format:
|
||||
.TP
|
||||
one option per line, with initial "--"; lines beginning with "#" ignored.
|
||||
Format: one option per line, with initial "--"; lines beginning with "#" ignored.
|
||||
.SH
|
||||
AUTHORS
|
||||
.TP
|
||||
|
||||
@@ -36,6 +36,8 @@ import asyncio
|
||||
|
||||
#global variables used by winrt.windows.devices.bluetooth.advertisement code
|
||||
publisher = None
|
||||
advertised_port = None
|
||||
advertised_address = None
|
||||
|
||||
def on_status_changed(sender, args):
|
||||
global publisher
|
||||
@@ -60,40 +62,56 @@ def create_airplay_service_discovery_advertisement_publisher(ipv4_str, port):
|
||||
advertisement = ble_adv.BluetoothLEAdvertisement()
|
||||
advertisement.manufacturer_data.append(manufacturer_data)
|
||||
global publisher
|
||||
global advertised_port
|
||||
global advertised_address
|
||||
publisher = ble_adv.BluetoothLEAdvertisementPublisher(advertisement)
|
||||
advertised_port = port
|
||||
advertised_address = ipv4_str
|
||||
publisher.add_status_changed(on_status_changed)
|
||||
|
||||
async def publish_advertisement():
|
||||
global advertised_port
|
||||
global advertised_address
|
||||
try:
|
||||
publisher.start()
|
||||
print(f"Publisher started successfully")
|
||||
print(f"AirPlay Service_Discovery Advertisement ({advertised_address}:{advertised_port}) registered")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Failed to start Publisher: {e}")
|
||||
print(f"Publisher Status: {publisher.status.name}")
|
||||
advertised_address = None
|
||||
advertised_port = None
|
||||
|
||||
|
||||
def setup_beacon(ipv4_str, port):
|
||||
#index will be ignored
|
||||
print(f"setup_beacon for {ipv4_str}:{port}")
|
||||
create_airplay_service_discovery_advertisement_publisher(ipv4_str, port)
|
||||
|
||||
def beacon_on():
|
||||
global publisher
|
||||
try:
|
||||
asyncio.run( publish_advertisement())
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Failed to start publisher: {e}")
|
||||
global publisher
|
||||
publisher = None
|
||||
return False
|
||||
|
||||
|
||||
def beacon_off():
|
||||
global publisher
|
||||
publisher.stop()
|
||||
|
||||
global advertised_port
|
||||
global advertised_address
|
||||
advertised_port = None
|
||||
advertised_address = None
|
||||
|
||||
#==generic code (non-winrt) below here =============
|
||||
|
||||
def check_port(port):
|
||||
if advertised_port is None or port == advertised_port:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
@@ -112,8 +130,6 @@ ipv4_str = "ipv4_address"
|
||||
|
||||
def start_beacon():
|
||||
global beacon_is_running
|
||||
global port
|
||||
global ipv4_str
|
||||
setup_beacon(ipv4_str, port)
|
||||
beacon_is_running = beacon_on()
|
||||
|
||||
@@ -136,7 +152,6 @@ def check_process_name(pid, pname):
|
||||
return False
|
||||
|
||||
def check_pending():
|
||||
global beacon_is_running
|
||||
global beacon_is_pending_on
|
||||
global beacon_is_pending_off
|
||||
if beacon_is_running:
|
||||
@@ -148,41 +163,57 @@ def check_pending():
|
||||
start_beacon()
|
||||
beacon_is_pending_on = False
|
||||
return True
|
||||
|
||||
|
||||
|
||||
def check_file_exists(file_path):
|
||||
global port
|
||||
global beacon_is_running
|
||||
global beacon_is_pending_on
|
||||
global beacon_is_pending_off
|
||||
|
||||
if os.path.exists(file_path):
|
||||
with open(file_path, 'rb') as file:
|
||||
data = file.read(2)
|
||||
port = struct.unpack('<H', data)[0]
|
||||
data = file.read(4)
|
||||
pid = struct.unpack('<I', data)[0]
|
||||
if not pid_is_running(pid):
|
||||
file.close()
|
||||
test = False
|
||||
pname = "process name unread"
|
||||
if os.path.isfile(file_path):
|
||||
test = True
|
||||
try:
|
||||
with open(file_path, 'rb') as file:
|
||||
data = file.read(2)
|
||||
port = struct.unpack('<H', data)[0]
|
||||
data = file.read(4)
|
||||
pid = struct.unpack('<I', data)[0]
|
||||
if not pid_is_running(pid):
|
||||
file.close()
|
||||
test = False
|
||||
if test:
|
||||
data = file.read()
|
||||
file.close()
|
||||
pname = data.split(b'\0',1)[0].decode('utf-8')
|
||||
last_element_of_pname = os.path.basename(pname)
|
||||
test = check_process_name(pid, last_element_of_pname)
|
||||
except IOError:
|
||||
test = False
|
||||
except FileNotFoundError:
|
||||
test = False
|
||||
if test:
|
||||
if not beacon_is_running:
|
||||
beacon_is_pending_on = True
|
||||
else:
|
||||
data = file.read()
|
||||
file.close()
|
||||
pname = data.split(b'\0',1)[0].decode('utf-8')
|
||||
last_element_of_pname = os.path.basename(pname)
|
||||
test = check_process_name(pid, last_element_of_pname)
|
||||
if test == True:
|
||||
if not beacon_is_running:
|
||||
beacon_is_pending_on = True
|
||||
else:
|
||||
if beacon_is_running:
|
||||
print(f'orphan beacon file {file_path} exists, but process {pname} (pid {pid}) is no longer active')
|
||||
# PermissionError prevents deletion of orphan beacon files in Windows systems
|
||||
if not check_port(port):
|
||||
# uxplay is active, and beacon is running but is advertising a different port, so shut it down
|
||||
beacon_is_pending_off = True
|
||||
else:
|
||||
else:
|
||||
print(f'Orphan beacon file exists, but process pid {pid} ({pname}) is no longer active')
|
||||
try:
|
||||
os.remove(file_path)
|
||||
print(f'Orphan beacon file "{file_path}" deleted successfully.')
|
||||
except FileNotFoundError:
|
||||
print(f'File "{file_path}" not found.')
|
||||
except PermissionError as e:
|
||||
print(f'Permission Errror {e}: cannot delete "{file_path}".')
|
||||
if beacon_is_running:
|
||||
beacon_is_pending_off = True
|
||||
|
||||
else: #BLE file does not exist
|
||||
if beacon_is_running:
|
||||
beacon_is_pending_off = True
|
||||
|
||||
|
||||
def on_timeout(file_path):
|
||||
check_file_exists(file_path)
|
||||
return True
|
||||
|
||||
Reference in New Issue
Block a user