diff --git a/Bluetooth_LE_beacon/uxplay-beacon.py b/Bluetooth_LE_beacon/uxplay-beacon.py index 107be26..217a824 100644 --- a/Bluetooth_LE_beacon/uxplay-beacon.py +++ b/Bluetooth_LE_beacon/uxplay-beacon.py @@ -8,15 +8,6 @@ import sys if not sys.version_info >= (3,6): print("uxplay-beacon.py requires Python 3.6 or higher") - -import gi -try: - from gi.repository import GLib -except ImportError as e: - print(f'ImportError: {e}, failed to import GLib from Python GObject Introspection Library ("gi")') - print('Install PyGObject pip3 install PyGobject==3.50.0') - print(f'You may need to use pip option "--break-system-packages" (disregard the warning)') - raise SystemExit(1) import importlib import argparse @@ -26,6 +17,7 @@ import struct import socket import time import platform +import ipaddress try: import psutil @@ -47,6 +39,13 @@ index = None windows = 'Windows' linux = 'Linux' os_name = platform.system() +mainloop = None + +# BLE modules +BLEUIO = 'BleuIO' +WINRT = 'winrt' +BLUEZ = 'BlueZ' +HCI = 'HCI' # external functions that must be supplied by loading a module: from typing import Optional @@ -63,6 +62,10 @@ def find_device(device: Optional[str]) -> Optional[str]: return None #internal functions +def exit(err_text): + print(err_text) + raise SystemExit(1) + def start_beacon(): global beacon_is_running global port @@ -71,20 +74,12 @@ def start_beacon(): global advmax global index if beacon_is_running: - print(f'code error, should not happen') - raise SystemExit(1) + exit('code error, should not happen') setup_beacon(ipv4_str, port, advmin, advmax, index) advertised_port = beacon_on() beacon_is_running = advertised_port is not None - count = 1 - while not beacon_is_running: - print(f'Failed attempt {count} to start beacon:') - advertised_port = beacon_on() - beacon_is_running = advertised_port is not None - count += 1 - if count > 5: - print(f'Giving up, check Bluetooth adapter') - raise SystemExit(1) + if not beacon_is_running: + exit('Failed to start beacon:\ngiving up, check Bluetooth adapter') def stop_beacon(): global beacon_is_running @@ -176,6 +171,7 @@ def check_file_exists(file_path): def on_timeout(file_path): check_file_exists(file_path) + check_pending() return True def main(file_path_in, ipv4_str_in, advmin_in, advmax_in, index_in): @@ -183,7 +179,6 @@ def main(file_path_in, ipv4_str_in, advmin_in, advmax_in, index_in): global advmin global advmax global index - global beacon_is_running file_path = file_path_in ipv4_str = ipv4_str_in advmin = advmin_in @@ -192,22 +187,34 @@ def main(file_path_in, ipv4_str_in, advmin_in, advmax_in, index_in): try: while True: - GLib.timeout_add_seconds(1, on_timeout, file_path) - GLib.timeout_add(200, check_pending) - mainloop = GLib.MainLoop() - mainloop.run() + if mainloop is not None: + # the BleuZ module is being used, needs a GLib mainloop + GLib.timeout_add_seconds(1, on_timeout, file_path) + mainloop.run() + else: + on_timeout(file_path) + time.sleep(1) + except KeyboardInterrupt: + if mainloop is not None: + mainloop.quit() # "just in case, but often redundant, if GLib's SIGINT handler aready quit the loop" print(f'') if beacon_is_running: stop_beacon() print(f'Exiting ...') sys.exit(0) +def is_valid_ipv4(ipv4_str): + try: + ipaddress.IPv4Address(ipv4_str) + return True + except ipaddress.AddressValueError: + return False + def get_ipv4(): if os_name is windows: ipv4 = socket.gethostbyname(socket.gethostname()) return ipv4 - try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) @@ -225,17 +232,11 @@ def get_ipv4(): try: ipv4 = socket.gethostbyname(socket.gethostname()+".local") except socket_error: - print(f"failed to obtain local ipv4 address: enter it with option --ipv4 ... ") - raise SystemExit(1) + exit("failed to obtain local ipv4 address: enter it with option --ipv4 ... ") return ipv4 -if __name__ == '__main__': +def parse_params(): - ble_bluez = "bluez" - ble_winrt = "winrt" - ble_bleuio = "bleuio" - ble_hci = "hci" - # Create an ArgumentParser object epilog_text = ''' Example: python beacon.py --ipv4 192.168.1.100 --advmax 200 --path = ~/my_ble @@ -257,20 +258,15 @@ if __name__ == '__main__': home_dir = os.environ.get('HOME') if home_dir is None: home_dir = os.path.expanduser("~") - default_file = home_dir+"/.uxplay.beacon" - default_ipv4 = "gethostbyname" - - # BLE modules - bleuio = 'BleuIO' - winrt = 'winrt' - bluez = 'BlueZ' - hci = 'HCI' + default_config_file = home_dir+"/.uxplay.beacon" + + optional_modules = [BLEUIO, HCI] # Add arguments parser.add_argument( 'ble_type', nargs='?', - choices=[bleuio, hci, None], + choices=optional_modules + [None], help=textwrap.dedent(''' Allows choice of alternative Bluetooth implementations, supporting the BleuIO USB Bluetooth LE serial device, and direct access to the Bluetooth Host @@ -284,7 +280,7 @@ if __name__ == '__main__': parser.add_argument( '--file', type=str, - default= default_file, + default=None, help='beacon startup file (Default: ~/.uxplay.beacon).' ) @@ -297,26 +293,26 @@ if __name__ == '__main__': parser.add_argument( '--ipv4', type=str, - default=default_ipv4, + default=None, help='ipv4 address of AirPlay server (default: use gethostbyname).' ) parser.add_argument( '--advmin', - type=str, + type=int, default=None, help='The minimum Advertising Interval (>= 100) units=msec, (default 100, BlueZ, BleuIO only).' ) parser.add_argument( '--advmax', - type=str, + type=int, default=None, help='The maximum Advertising Interval (>= advmin, <= 10240) units=msec, (default 100, BlueZ, BleuIO only).' ) parser.add_argument( '--index', - type=str, + type=int, default=None, help='use index >= 0 to distinguish multiple AirPlay Service Discovery beacons, (default 0, BlueZ only). ' ) @@ -329,8 +325,8 @@ if __name__ == '__main__': ) # script input arguments - ble_type = None config_file = None + ble_type = None path = None ipv4_str = None advmin = None @@ -342,14 +338,14 @@ if __name__ == '__main__': args = parser.parse_args() # look for a configuration file - if args.file != default_file: + if args.file is not None: if os.path.isfile(args.file): config_file = args.file else: - print ("optional argument --file ", args.file, "does not point to a valid file") - raise SystemExit(1) - if config_file is None and os.path.isfile(default_file): - config_file = default_file + err = f'optional argument --file "{args.file}" does not point to a valid file' + exit(err) + if config_file is None and os.path.isfile(default_config_file): + config_file = default_config_file # read configuration file,if present if config_file is not None: @@ -357,59 +353,56 @@ if __name__ == '__main__': try: with open(config_file, 'r') as file: for line in file: - stripped_line = line.strip() - if stripped_line.startswith('#'): + if line.startswith('#'): continue + err = f'Invalid line "{line}" in configuration file' + stripped_line = line.strip() parts = stripped_line.partition(" ") - part0 = parts[0] - part2 = parts[2] - key = part0.strip() - value = part2.strip() + key = parts[0].strip() + value = parts[2].strip() if value == "": - if key != ble_bluez and key != ble_winrt and key != ble_bleuio: - print('invalid line "',stripped_line,'" in configuration file ',config_file) - raise SystemExit(1) - else: - if ble_type is None: - ble_type = stripped_line - continue - elif key == "--path": + if not key in optional_modules: + exit(err) + ble_type = key + continue + if key == "--path": path = value + continue elif key == "--ipv4": + if not is_valid_ipv4(value): + print(f'{value} is not a valid IPv4 address') + exit(err) ipv4_str = value + continue elif key == "--advmin": - if value.isdigit(): - advmin = int(value) - else: - print(f'Invalid config file input (--advmin) {value} in {args.file}') - raise SystemExit(1) + if not value.isdigit(): + exit(err) + advmin = int(value) + continue elif key == "--advmax": - if value.isdigit(): - advmax = int(value) - else: - print(f'Invalid config file input (--advmax) {value} in {args.file}') - raise SystemExit(1) + if not value.isdigit(): + exit(err) + advmax = int(value) + continue elif key == "--index": - if value.isdigit(): - index = int(value) - else: - print(f'Invalid config file input (--index) {value} in {args.file}') - raise SystemExit(1) - elif key == "--device": - device_address = value - else: - print(f'Unknown key "{key}" in config file {args.file}') - raise SystemExit(1) + if not value.isdigit(): + exit(err) + index = int(value) + continue + elif key == '--device': + device_address = value + continue + else: + exit(err) except FileNotFoundError: - print(f'the configuration file {config_file} was not found') - raise SystemExit(1) - except IOError: - print(f'IOError when reading configuration file {config_file}') - raise SystemExit(1) + err = f'the configuration file {config_file} was not found' except PermissionError: - print('fPermissionError when trying to read configuration file {config_file}') - raise SystemExit(1) - + err = f'PermissionError when trying to read configuration file {config_file}' + except IOError: + err = f'IOError when reading configuration file {config_file}' + finally: + exit(err) + # overwrite configuration file entries with command line entries if args.ble_type is not None: ble_type = args.ble_type @@ -417,6 +410,9 @@ if __name__ == '__main__': path = args.path if args.ipv4 is not None: ipv4_str = args.ipv4 + if not is_valid_ipv4(ipv4_str): + err = f'{ipv4_str} is not a valid IPv4 address' + exit(err) if args.advmin is not None: advmin = args.advmin if args.advmax is not None: @@ -426,91 +422,111 @@ if __name__ == '__main__': if args.device is not None: device_address = args.device - # process arguments, exclude values not used by ble_type + # determine which Bluetooth LE module will be used if ble_type is None: if os_name == windows: - ble_type = winrt + ble_type = WINRT elif os_name == linux: - ble_type = bluez + ble_type = BLUEZ else: - ble_type = bleuio - if ipv4_str == default_ipv4: + ble_type = BLEUIO + + # IPV4 address + if ipv4_str is None: ipv4_str = get_ipv4() if ipv4_str is None: - print(f'Failed to obtain Server IPv4 address with gethostbyname: provide it with option --ipv4') - raise SystemExit(1) + exit('Failed to obtain Server IPv4 address with gethostbyname: provide it with option --ipv4') + + #AdvMin, AdvMax if advmin is not None: - if ble_type == winrt: + if ble_type == WINRT: advmin = None print(f' --advmin option is not used when ble_type = {ble_type}') - else: + elif ble_type != WINRT: advmin = 100 #default value if advmax is not None: - if ble_type == winrt: + if ble_type == WINRT: advmax = None print(f' --advmax option is not used when ble_type = {ble_type}') - else: + elif ble_type != WINRT: advmax = 100 #default value - if ble_type == winrt: - advmin = None - advmax = None + + #index (BLEUZ only) if index is not None: - if ble_type != bluez: + if ble_type != BLUEZ: index = None print(f' --index option is not used when ble_type = {ble_type}') - else: + elif ble_type == BLUEZ: index = 0 #default value - if ble_type != bluez: - index = None + + #device_address (BLEUIO, HCI only) if device_address is not None: - if ble_type == bluez or ble_type == winrt: + if ble_type == BLUEZ or ble_type == WINRT: device_address = None print(f' --device option is not used when ble_type = {ble_type}') - + + return [ble_type, path, ipv4_str, advmin, advmax, index, device_address] + +if __name__ == '__main__': + #global mainloop + + #parse input options + [ble_type, path, ipv4_str, advmin, advmax, index, device_address] = parse_params() + + if ble_type == BLUEZ: + # a GLib mainloop is required by the BlueZ module + import gi + try: + from gi.repository import GLib + mainloop = GLib.MainLoop() + except ImportError as e: + print(f'ImportError: {e}, failed to import GLib from Python GObject Introspection Library ("gi")') + print('Install PyGObject pip3 install PyGobject==3.50.0') + exit('You may need to use pip option "--break-system-packages" (disregard the warning)') + # import module for chosen ble_type module = f'uxplay_beacon_module_{ble_type}' print(f'Will use BLE module {module}.py') try: ble = importlib.import_module(module) except ImportError as e: - print(f'Failed to import {module}: {e}') - raise SystemExit(1) + err =f'Failed to import {module}: {e}' + exit(err) setup_beacon = ble.setup_beacon beacon_on = ble.beacon_on beacon_off = ble.beacon_off - need_device = False - if ble_type == bleuio or ble_type == hci: - # obtain serial port for BleuIO device + need_device = False + if ble_type == BLEUIO or ble_type == HCI: + # obtain serial port for BleuIO device, or a Bluetooth >= 4.0 HCI device for HCI module find_device = ble.find_device need_device = True if need_device: use_device = find_device(device_address) if use_device is None: - print(f'No devices needed for BLE type {ble_type} were found') - raise SystemExit(1) + err = f'No devices needed for BLE type {ble_type} were found' + exit(err) if device_address is not None and use_device != device_address: print(f'Error: A required device was NOT found at {device_address} given as an optional argument') - print(f'(however required devices WERE found and are listed above') - raise SystemExit(1) + exit('(Note: required devices WERE found and are listed above)') print(f'using the required device found at {use_device}') - - #start beacon - advminmax = f'' - indx = f'' - if ble_type != winrt: - advminmax = f'[advmin:advmax]={advmin}:{advmax}' - if ble_type == bluez: - indx = f'index {index}' - test = None - if ble_type == winrt or ble_type == bluez: + else: + #start beacon as test to see if Bluetooth is available, (WINRT and BLUEZ) + test = None # initial test to see if Bluetooth is available setup_beacon(ipv4_str, 1, advmin, advmax, index) test = beacon_on() beacon_off() - if test is not None: - print(f"test passed") + if test is not None: + print(f"test passed ({ble_type}") + + advminmax = f'' + indx = f'' + if ble_type != WINRT: + advminmax = f'[advmin:advmax]={advmin}:{advmax}' + if ble_type == BLUEZ: + indx = f'index {index}' print(f'AirPlay Service-Discovery Bluetooth LE beacon: BLE file {path} {advminmax} {indx}') print(f'Advertising IP address {ipv4_str}') print(f'(Press Ctrl+C to exit)')