mirror of
https://github.com/morgan9e/macos-stats
synced 2026-04-14 00:04:15 +09:00
feat: moved Remote to the new communication protocol and added basic control
This commit is contained in:
@@ -11,6 +11,7 @@
|
||||
|
||||
import Foundation
|
||||
import Cocoa
|
||||
import CoreAudio
|
||||
|
||||
public protocol RemoteType {
|
||||
func remote() -> Data?
|
||||
@@ -19,6 +20,7 @@ public protocol RemoteType {
|
||||
public class Remote {
|
||||
public static let shared = Remote()
|
||||
static public var host = URL(string: "https://api.system-stats.com")! // https://api.system-stats.com http://localhost:8008
|
||||
static public var brokerHost = URL(string: "wss://broker.system-stats.com:8084/mqtt")!
|
||||
|
||||
public var monitoring: Bool {
|
||||
get { Store.shared.bool(key: "remote_monitoring", defaultValue: false) }
|
||||
@@ -26,6 +28,7 @@ public class Remote {
|
||||
Store.shared.set(key: "remote_monitoring", value: newValue)
|
||||
if newValue {
|
||||
self.start()
|
||||
self.registerDevice()
|
||||
} else if !self.control {
|
||||
self.stop()
|
||||
}
|
||||
@@ -37,6 +40,7 @@ public class Remote {
|
||||
Store.shared.set(key: "remote_control", value: newValue)
|
||||
if newValue {
|
||||
self.start()
|
||||
self.registerDevice()
|
||||
} else if !self.monitoring {
|
||||
self.stop()
|
||||
}
|
||||
@@ -47,14 +51,55 @@ public class Remote {
|
||||
public var auth: RemoteAuth = RemoteAuth()
|
||||
|
||||
private let log: NextLog
|
||||
private var ws: WebSocketManager = WebSocketManager()
|
||||
private var wsURL: URL?
|
||||
private var mqtt: MQTTManager = MQTTManager()
|
||||
private var isConnecting = false
|
||||
|
||||
private var lastSleepTime: Date?
|
||||
|
||||
struct Details: Codable {
|
||||
let client: Client
|
||||
let system: System
|
||||
let hardware: Hardware
|
||||
}
|
||||
|
||||
struct Client: Codable {
|
||||
let version: String
|
||||
let control: Bool
|
||||
}
|
||||
|
||||
struct OS: Codable {
|
||||
let name: String?
|
||||
let version: String?
|
||||
let build: String?
|
||||
}
|
||||
|
||||
struct System: Codable {
|
||||
let platform: String
|
||||
let vendor: String?
|
||||
let model: String?
|
||||
let modelID: String?
|
||||
let os: OS
|
||||
let arch: String?
|
||||
}
|
||||
|
||||
struct Hardware: Codable {
|
||||
let cpu: cpu_s?
|
||||
let gpu: [gpu_s]?
|
||||
let ram: [dimm_s]?
|
||||
let disk: [disk_s]?
|
||||
}
|
||||
|
||||
public init() {
|
||||
self.log = NextLog.shared.copy(category: "Remote")
|
||||
self.id = UUID(uuidString: Store.shared.string(key: "telemetry_id", defaultValue: UUID().uuidString)) ?? UUID()
|
||||
|
||||
self.mqtt.commandCallback = { [weak self] cmd, payload in
|
||||
self?.command(cmd: cmd, payload: payload)
|
||||
}
|
||||
self.mqtt.registerCallback = { [weak self] in
|
||||
self?.registerDevice()
|
||||
}
|
||||
|
||||
if self.auth.hasCredentials() {
|
||||
info("Found auth credentials for remote monitoring, starting Remote...", log: self.log)
|
||||
self.start()
|
||||
@@ -64,7 +109,7 @@ public class Remote {
|
||||
}
|
||||
|
||||
deinit {
|
||||
self.ws.disconnect()
|
||||
self.mqtt.disconnect()
|
||||
NotificationCenter.default.removeObserver(self, name: .remoteLoginSuccess, object: nil)
|
||||
}
|
||||
|
||||
@@ -82,20 +127,21 @@ public class Remote {
|
||||
public func logout() {
|
||||
self.auth.logout()
|
||||
self.isAuthorized = false
|
||||
self.ws.disconnect()
|
||||
self.mqtt.disconnect()
|
||||
debug("Logout successfully from Stats Remote", log: self.log)
|
||||
NotificationCenter.default.post(name: .remoteState, object: nil, userInfo: ["auth": self.isAuthorized])
|
||||
}
|
||||
|
||||
public func send(key: String, value: Any) {
|
||||
guard self.monitoring && self.isAuthorized, let v = value as? RemoteType, let data = v.remote() else { return }
|
||||
self.ws.send(key: key, data: data)
|
||||
let topic = "stats/\(self.id.uuidString)/metrics/\(key)"
|
||||
self.mqtt.publish(topic: topic, data: data)
|
||||
}
|
||||
|
||||
@objc private func successLogin() {
|
||||
self.isAuthorized = true
|
||||
NotificationCenter.default.post(name: .remoteState, object: nil, userInfo: ["auth": self.isAuthorized])
|
||||
self.ws.connect()
|
||||
self.mqtt.connect()
|
||||
debug("Login successfully on Stats Remote", log: self.log)
|
||||
}
|
||||
|
||||
@@ -107,15 +153,274 @@ public class Remote {
|
||||
NotificationCenter.default.post(name: .remoteState, object: nil, userInfo: ["auth": self.isAuthorized])
|
||||
|
||||
if status {
|
||||
self.ws.connect()
|
||||
self.mqtt.connect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func stop() {
|
||||
self.ws.disconnect()
|
||||
self.mqtt.disconnect()
|
||||
NotificationCenter.default.post(name: .remoteState, object: nil, userInfo: ["auth": self.isAuthorized])
|
||||
}
|
||||
|
||||
public func terminate() {
|
||||
self.mqtt.sendStatus(false)
|
||||
self.mqtt.disconnect()
|
||||
}
|
||||
|
||||
private func registerDevice() {
|
||||
guard let url = URL(string: "\(Remote.host)/remote/device") else { return }
|
||||
|
||||
var request = URLRequest(url: url)
|
||||
request.httpMethod = "POST"
|
||||
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
|
||||
request.setValue("Bearer \(Remote.shared.auth.accessToken)", forHTTPHeaderField: "Authorization")
|
||||
|
||||
struct RegisterPayload: Codable {
|
||||
let id: String
|
||||
let details: Remote.Details
|
||||
}
|
||||
|
||||
let payload = RegisterPayload(
|
||||
id: Remote.shared.id.uuidString,
|
||||
details: Remote.Details(
|
||||
client: Client(
|
||||
version: Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String ?? "Unknown",
|
||||
control: Remote.shared.control
|
||||
),
|
||||
system: Remote.System(
|
||||
platform: "macOS",
|
||||
vendor: "Apple",
|
||||
model: SystemKit.shared.device.model.name,
|
||||
modelID: SystemKit.shared.device.model.id,
|
||||
os: Remote.OS(
|
||||
name: SystemKit.shared.device.os?.name,
|
||||
version: SystemKit.shared.device.os?.version.getFullVersion(),
|
||||
build: SystemKit.shared.device.os?.build
|
||||
),
|
||||
arch: SystemKit.shared.device.arch
|
||||
),
|
||||
hardware: Remote.Hardware(
|
||||
cpu: SystemKit.shared.device.info.cpu,
|
||||
gpu: SystemKit.shared.device.info.gpu,
|
||||
ram: SystemKit.shared.device.info.ram?.dimms,
|
||||
disk: SystemKit.shared.device.info.disk
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
guard let body = try? JSONEncoder().encode(payload) else { return }
|
||||
request.httpBody = body
|
||||
|
||||
URLSession.shared.dataTask(with: request) { data, response, _ in
|
||||
guard let httpResponse = response as? HTTPURLResponse else { return }
|
||||
if httpResponse.statusCode == 200 {
|
||||
debug("Registered device: \(Remote.shared.id.uuidString)", log: self.log)
|
||||
} else {
|
||||
let bodyString = data.flatMap { String(data: $0, encoding: .utf8) } ?? ""
|
||||
debug("Register remote failed (\(httpResponse.statusCode)): \(bodyString)", log: self.log)
|
||||
}
|
||||
}.resume()
|
||||
}
|
||||
|
||||
private func command(cmd: String, payload: Data?) {
|
||||
guard self.control else { return }
|
||||
|
||||
debug("received command '\(cmd)' with payload: \(String(data: payload ?? Data(), encoding: .utf8) ?? "")", log: self.log)
|
||||
|
||||
switch cmd {
|
||||
case "disable": self.disableControl()
|
||||
case "sleep": self.sleep()
|
||||
case "volume":
|
||||
guard let payload else { return }
|
||||
let value = String(data: payload, encoding: .utf8)
|
||||
let step: Float32 = 0.0625
|
||||
switch value {
|
||||
case "up":
|
||||
if let current = self.getSystemVolume() {
|
||||
if self.isSystemMuted() {
|
||||
self.setSystemMute(false)
|
||||
} else {
|
||||
self.setSystemVolume(min(current + step, 1.0))
|
||||
}
|
||||
}
|
||||
case "down":
|
||||
if let current = self.getSystemVolume() {
|
||||
if self.isSystemMuted() {
|
||||
self.setSystemMute(false)
|
||||
} else {
|
||||
self.setSystemVolume(max(current - step, 0.0))
|
||||
}
|
||||
}
|
||||
case "mute":
|
||||
self.setSystemMute(true)
|
||||
case "unmute":
|
||||
self.setSystemMute(false)
|
||||
default: break
|
||||
}
|
||||
default: break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension Remote {
|
||||
func disableControl() {
|
||||
self.control = false
|
||||
}
|
||||
|
||||
func sleep() {
|
||||
let minInterval: TimeInterval = 300
|
||||
let now = Date()
|
||||
if let last = self.lastSleepTime, now.timeIntervalSince(last) < minInterval {
|
||||
debug("Sleep command ignored due to cooldown", log: self.log)
|
||||
return
|
||||
}
|
||||
self.lastSleepTime = now
|
||||
let process = Process()
|
||||
process.launchPath = "/usr/bin/pmset"
|
||||
process.arguments = ["sleepnow"]
|
||||
process.launch()
|
||||
}
|
||||
|
||||
func isSystemMuted() -> Bool {
|
||||
var defaultOutputDeviceID = AudioDeviceID(0)
|
||||
var propertyAddress = AudioObjectPropertyAddress(
|
||||
mSelector: kAudioHardwarePropertyDefaultOutputDevice,
|
||||
mScope: kAudioObjectPropertyScopeGlobal,
|
||||
mElement: kAudioObjectPropertyElementMain
|
||||
)
|
||||
var size = UInt32(MemoryLayout<AudioDeviceID>.size)
|
||||
let status = AudioObjectGetPropertyData(
|
||||
AudioObjectID(kAudioObjectSystemObject),
|
||||
&propertyAddress,
|
||||
0,
|
||||
nil,
|
||||
&size,
|
||||
&defaultOutputDeviceID
|
||||
)
|
||||
guard status == noErr else { return false }
|
||||
|
||||
propertyAddress = AudioObjectPropertyAddress(
|
||||
mSelector: kAudioDevicePropertyMute,
|
||||
mScope: kAudioDevicePropertyScopeOutput,
|
||||
mElement: 0
|
||||
)
|
||||
var muteValue: UInt32 = 0
|
||||
size = UInt32(MemoryLayout<UInt32>.size)
|
||||
let muteStatus = AudioObjectGetPropertyData(
|
||||
defaultOutputDeviceID,
|
||||
&propertyAddress,
|
||||
0,
|
||||
nil,
|
||||
&size,
|
||||
&muteValue
|
||||
)
|
||||
return muteStatus == noErr && muteValue == 1
|
||||
}
|
||||
|
||||
func setSystemMute(_ mute: Bool) {
|
||||
var defaultOutputDeviceID = AudioDeviceID(0)
|
||||
var propertyAddress = AudioObjectPropertyAddress(
|
||||
mSelector: kAudioHardwarePropertyDefaultOutputDevice,
|
||||
mScope: kAudioObjectPropertyScopeGlobal,
|
||||
mElement: kAudioObjectPropertyElementMain
|
||||
)
|
||||
var size = UInt32(MemoryLayout<AudioDeviceID>.size)
|
||||
let status = AudioObjectGetPropertyData(
|
||||
AudioObjectID(kAudioObjectSystemObject),
|
||||
&propertyAddress,
|
||||
0,
|
||||
nil,
|
||||
&size,
|
||||
&defaultOutputDeviceID
|
||||
)
|
||||
guard status == noErr else { return }
|
||||
|
||||
propertyAddress = AudioObjectPropertyAddress(
|
||||
mSelector: kAudioDevicePropertyMute,
|
||||
mScope: kAudioDevicePropertyScopeOutput,
|
||||
mElement: 0
|
||||
)
|
||||
var muteValue: UInt32 = mute ? 1 : 0
|
||||
AudioObjectSetPropertyData(
|
||||
defaultOutputDeviceID,
|
||||
&propertyAddress,
|
||||
0,
|
||||
nil,
|
||||
UInt32(MemoryLayout<UInt32>.size),
|
||||
&muteValue
|
||||
)
|
||||
}
|
||||
|
||||
func getSystemVolume() -> Float32? {
|
||||
var defaultOutputDeviceID = AudioDeviceID(0)
|
||||
var propertyAddress = AudioObjectPropertyAddress(
|
||||
mSelector: kAudioHardwarePropertyDefaultOutputDevice,
|
||||
mScope: kAudioObjectPropertyScopeGlobal,
|
||||
mElement: kAudioObjectPropertyElementMain
|
||||
)
|
||||
var size = UInt32(MemoryLayout<AudioDeviceID>.size)
|
||||
let status = AudioObjectGetPropertyData(
|
||||
AudioObjectID(kAudioObjectSystemObject),
|
||||
&propertyAddress,
|
||||
0,
|
||||
nil,
|
||||
&size,
|
||||
&defaultOutputDeviceID
|
||||
)
|
||||
guard status == noErr else { return nil }
|
||||
|
||||
propertyAddress = AudioObjectPropertyAddress(
|
||||
mSelector: kAudioDevicePropertyVolumeScalar,
|
||||
mScope: kAudioDevicePropertyScopeOutput,
|
||||
mElement: 0
|
||||
)
|
||||
var volume: Float32 = 0
|
||||
size = UInt32(MemoryLayout<Float32>.size)
|
||||
let volStatus = AudioObjectGetPropertyData(
|
||||
defaultOutputDeviceID,
|
||||
&propertyAddress,
|
||||
0,
|
||||
nil,
|
||||
&size,
|
||||
&volume
|
||||
)
|
||||
return volStatus == noErr ? volume : nil
|
||||
}
|
||||
|
||||
func setSystemVolume(_ volume: Float32) {
|
||||
var defaultOutputDeviceID = AudioDeviceID(0)
|
||||
var propertyAddress = AudioObjectPropertyAddress(
|
||||
mSelector: kAudioHardwarePropertyDefaultOutputDevice,
|
||||
mScope: kAudioObjectPropertyScopeGlobal,
|
||||
mElement: kAudioObjectPropertyElementMain
|
||||
)
|
||||
var size = UInt32(MemoryLayout<AudioDeviceID>.size)
|
||||
let status = AudioObjectGetPropertyData(
|
||||
AudioObjectID(kAudioObjectSystemObject),
|
||||
&propertyAddress,
|
||||
0,
|
||||
nil,
|
||||
&size,
|
||||
&defaultOutputDeviceID
|
||||
)
|
||||
guard status == noErr else { return }
|
||||
|
||||
propertyAddress = AudioObjectPropertyAddress(
|
||||
mSelector: kAudioDevicePropertyVolumeScalar,
|
||||
mScope: kAudioDevicePropertyScopeOutput,
|
||||
mElement: 0
|
||||
)
|
||||
var vol = max(0.0, min(1.0, volume))
|
||||
AudioObjectSetPropertyData(
|
||||
defaultOutputDeviceID,
|
||||
&propertyAddress,
|
||||
0,
|
||||
nil,
|
||||
UInt32(MemoryLayout<Float32>.size),
|
||||
&vol
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
public class RemoteAuth {
|
||||
@@ -339,17 +644,29 @@ public class RemoteAuth {
|
||||
}
|
||||
}
|
||||
|
||||
struct WebSocketMessage: Codable {
|
||||
let name: String
|
||||
let data: Data
|
||||
|
||||
enum CodingKeys: String, CodingKey {
|
||||
case name
|
||||
case data
|
||||
}
|
||||
struct MQTTMessage {
|
||||
let topic: String
|
||||
let payload: Data
|
||||
let qos: UInt8
|
||||
let retain: Bool
|
||||
}
|
||||
|
||||
class WebSocketManager: NSObject {
|
||||
enum MQTTPacketType: UInt8 {
|
||||
case connect = 1
|
||||
case connack = 2
|
||||
case publish = 3
|
||||
case puback = 4
|
||||
case subscribe = 8
|
||||
case suback = 9
|
||||
case pingreq = 12
|
||||
case pingresp = 13
|
||||
case disconnect = 14
|
||||
}
|
||||
|
||||
class MQTTManager: NSObject {
|
||||
public var registerCallback: (() -> Void)? = nil
|
||||
public var commandCallback: ((String, Data?) -> Void)? = nil
|
||||
|
||||
private var webSocket: URLSessionWebSocketTask?
|
||||
private var session: URLSession?
|
||||
private var isConnected = false
|
||||
@@ -358,9 +675,10 @@ class WebSocketManager: NSObject {
|
||||
private var pingTimer: Timer?
|
||||
private var reachability: Reachability = Reachability(start: true)
|
||||
private let log: NextLog
|
||||
private var packetIdentifier: UInt16 = 1
|
||||
|
||||
override init() {
|
||||
self.log = NextLog.shared.copy(category: "Remote WS")
|
||||
self.log = NextLog.shared.copy(category: "Remote MQTT")
|
||||
|
||||
super.init()
|
||||
|
||||
@@ -384,109 +702,263 @@ class WebSocketManager: NSObject {
|
||||
Remote.shared.auth.isAuthorized { [weak self] status in
|
||||
guard status, let self else { return }
|
||||
|
||||
var wsHost = Remote.host.absoluteString
|
||||
wsHost = wsHost.replacingOccurrences(of: "https", with: "wss").replacingOccurrences(of: "http", with: "ws")
|
||||
let url = URL(string: "\(wsHost)/remote?jwt=\(Remote.shared.auth.accessToken)&machine_id=\(Remote.shared.id.uuidString)")!
|
||||
|
||||
self.webSocket = self.session?.webSocketTask(with: url)
|
||||
self.webSocket = self.session?.webSocketTask(with: Remote.brokerHost, protocols: ["mqtt"])
|
||||
self.webSocket?.resume()
|
||||
self.receiveMessage()
|
||||
self.isDisconnected = false
|
||||
debug("connected successfully", log: self.log)
|
||||
debug("MQTT WebSocket connecting...", log: self.log)
|
||||
}
|
||||
}
|
||||
|
||||
public func disconnect() {
|
||||
if self.webSocket == nil && !self.isConnected { return }
|
||||
self.isDisconnected = true
|
||||
|
||||
if self.isConnected {
|
||||
self.sendDisconnect()
|
||||
self.sendStatus(false)
|
||||
}
|
||||
|
||||
self.webSocket?.cancel(with: .normalClosure, reason: nil)
|
||||
self.webSocket = nil
|
||||
self.isConnected = false
|
||||
debug("disconnected gracefully", log: self.log)
|
||||
self.stopPingTimer()
|
||||
debug("MQTT disconnected gracefully", log: self.log)
|
||||
}
|
||||
|
||||
private func reconnect() {
|
||||
guard !self.isDisconnected else { return }
|
||||
DispatchQueue.main.asyncAfter(deadline: .now() + self.reconnectDelay) { [weak self] in
|
||||
if let log = self?.log {
|
||||
debug("trying to reconnect after some interruption", log: log)
|
||||
debug("trying to reconnect MQTT after interruption", log: log)
|
||||
}
|
||||
self?.connect()
|
||||
}
|
||||
}
|
||||
|
||||
private func sendDetails() {
|
||||
struct Details: Codable {
|
||||
let version: String
|
||||
let system: System
|
||||
let hardware: Hardware
|
||||
public func sendStatus(_ value: Bool) {
|
||||
let status = value ? "online" : "offline"
|
||||
let topic = "stats/\(Remote.shared.id.uuidString)/status"
|
||||
let payload = status.data(using: .utf8)
|
||||
if let payload = payload {
|
||||
self.publish(topic: topic, data: payload)
|
||||
}
|
||||
|
||||
struct OS: Codable {
|
||||
let name: String?
|
||||
let version: String?
|
||||
let build: String?
|
||||
}
|
||||
|
||||
struct System: Codable {
|
||||
let platform: String
|
||||
let vendor: String?
|
||||
let model: String?
|
||||
let modelID: String?
|
||||
let os: OS
|
||||
let arch: String?
|
||||
}
|
||||
|
||||
struct Hardware: Codable {
|
||||
let cpu: cpu_s?
|
||||
let gpu: [gpu_s]?
|
||||
let ram: [dimm_s]?
|
||||
let disk: [disk_s]?
|
||||
}
|
||||
|
||||
let details = Details(
|
||||
version: Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String ?? "Unknown",
|
||||
system: System(
|
||||
platform: "macOS",
|
||||
vendor: "Apple",
|
||||
model: SystemKit.shared.device.model.name,
|
||||
modelID: SystemKit.shared.device.model.id,
|
||||
os: OS(
|
||||
name: SystemKit.shared.device.os?.name,
|
||||
version: SystemKit.shared.device.os?.version.getFullVersion(),
|
||||
build: SystemKit.shared.device.os?.build
|
||||
),
|
||||
arch: SystemKit.shared.device.arch
|
||||
),
|
||||
hardware: Hardware(
|
||||
cpu: SystemKit.shared.device.info.cpu,
|
||||
gpu: SystemKit.shared.device.info.gpu,
|
||||
ram: SystemKit.shared.device.info.ram?.dimms,
|
||||
disk: SystemKit.shared.device.info.disk,
|
||||
)
|
||||
)
|
||||
let jsonData = try? JSONEncoder().encode(details)
|
||||
self.send(key: "details", data: jsonData ?? Data())
|
||||
}
|
||||
|
||||
public func send(key: String, data: Data) {
|
||||
if !self.isConnected { return }
|
||||
let message = WebSocketMessage(name: key, data: data)
|
||||
guard let messageData = try? JSONEncoder().encode(message) else { return }
|
||||
self.webSocket?.send(.data(messageData)) { error in
|
||||
private func sendConnect() {
|
||||
let connectPacket = createConnectPacket(username: Remote.shared.id.uuidString, password: Remote.shared.auth.accessToken)
|
||||
self.webSocket?.send(.data(connectPacket)) { error in
|
||||
if let error = error {
|
||||
print("Error sending message: \(error)")
|
||||
print("Error sending MQTT CONNECT: \(error)")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func sendDisconnect() {
|
||||
let disconnectPacket = Data([MQTTPacketType.disconnect.rawValue << 4, 0])
|
||||
self.webSocket?.send(.data(disconnectPacket)) { _ in }
|
||||
}
|
||||
|
||||
private func sendPingRequest() {
|
||||
let pingPacket = Data([MQTTPacketType.pingreq.rawValue << 4, 0])
|
||||
self.webSocket?.send(.data(pingPacket)) { error in
|
||||
if let error = error {
|
||||
print("Error sending MQTT PINGREQ: \(error)")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public func publish(topic: String, data: Data) {
|
||||
guard self.isConnected else { return }
|
||||
|
||||
let publishPacket = createPublishPacket(topic: topic, payload: data)
|
||||
self.webSocket?.send(.data(publishPacket)) { error in
|
||||
if let error = error {
|
||||
print("Error publishing MQTT message: \(error)")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func subscribe(to topic: String) {
|
||||
guard self.isConnected else { return }
|
||||
|
||||
let subscribePacket = createSubscribePacket(topic: topic)
|
||||
self.webSocket?.send(.data(subscribePacket)) { error in
|
||||
if let error = error {
|
||||
print("Error subscribing to MQTT topic: \(error)")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func createConnectPacket(username: String, password: String) -> Data {
|
||||
var packet = Data()
|
||||
|
||||
// Fixed header - packet type only (remaining length will be added later)
|
||||
let fixedHeaderByte = MQTTPacketType.connect.rawValue << 4
|
||||
|
||||
// Variable header
|
||||
var variableHeader = Data()
|
||||
variableHeader.append(contentsOf: encodeString("MQTT"))
|
||||
variableHeader.append(4)
|
||||
|
||||
var connectFlags: UInt8 = 0x00 // Clean session
|
||||
connectFlags |= 0x80 // Username flag
|
||||
connectFlags |= 0x40 // Password flag
|
||||
variableHeader.append(connectFlags)
|
||||
variableHeader.append(contentsOf: [0x03, 0x84])
|
||||
|
||||
// Payload
|
||||
var payload = Data()
|
||||
payload.append(contentsOf: encodeString("stats-\(username)"))
|
||||
payload.append(contentsOf: encodeString(username))
|
||||
payload.append(contentsOf: encodeString(password))
|
||||
|
||||
let remainingLength = variableHeader.count + payload.count
|
||||
packet.append(fixedHeaderByte)
|
||||
packet.append(contentsOf: encodeRemainingLength(remainingLength))
|
||||
packet.append(variableHeader)
|
||||
packet.append(payload)
|
||||
|
||||
return packet
|
||||
}
|
||||
|
||||
private func createPublishPacket(topic: String, payload: Data) -> Data {
|
||||
var packet = Data()
|
||||
|
||||
// Fixed header - packet type only
|
||||
let fixedHeaderByte = (MQTTPacketType.publish.rawValue << 4) | 0x00 // QoS 0
|
||||
|
||||
// Variable header
|
||||
var variableHeader = Data()
|
||||
variableHeader.append(contentsOf: encodeString(topic))
|
||||
|
||||
// Calculate remaining length
|
||||
let remainingLength = variableHeader.count + payload.count
|
||||
|
||||
// Build final packet
|
||||
packet.append(fixedHeaderByte)
|
||||
packet.append(contentsOf: encodeRemainingLength(remainingLength))
|
||||
packet.append(variableHeader)
|
||||
packet.append(payload)
|
||||
|
||||
return packet
|
||||
}
|
||||
|
||||
private func createSubscribePacket(topic: String) -> Data {
|
||||
var packet = Data()
|
||||
|
||||
// Fixed header - packet type only
|
||||
let fixedHeaderByte = (MQTTPacketType.subscribe.rawValue << 4) | 0x02
|
||||
|
||||
// Variable header
|
||||
var variableHeader = Data()
|
||||
|
||||
// Packet identifier
|
||||
let packetId = self.getNextPacketId()
|
||||
variableHeader.append(contentsOf: [UInt8(packetId >> 8), UInt8(packetId & 0xFF)])
|
||||
|
||||
// Payload
|
||||
var payload = Data()
|
||||
payload.append(contentsOf: encodeString(topic))
|
||||
payload.append(0x00) // QoS 0
|
||||
|
||||
// Calculate remaining length
|
||||
let remainingLength = variableHeader.count + payload.count
|
||||
|
||||
// Build final packet
|
||||
packet.append(fixedHeaderByte)
|
||||
packet.append(contentsOf: encodeRemainingLength(remainingLength))
|
||||
packet.append(variableHeader)
|
||||
packet.append(payload)
|
||||
|
||||
return packet
|
||||
}
|
||||
|
||||
private func encodeString(_ string: String) -> [UInt8] {
|
||||
let data = string.data(using: .utf8) ?? Data()
|
||||
let length = data.count
|
||||
return [UInt8(length >> 8), UInt8(length & 0xFF)] + Array(data)
|
||||
}
|
||||
|
||||
private func encodeRemainingLength(_ length: Int) -> [UInt8] {
|
||||
var bytes: [UInt8] = []
|
||||
var remainingLength = length
|
||||
|
||||
repeat {
|
||||
var byte = UInt8(remainingLength % 128)
|
||||
remainingLength /= 128
|
||||
if remainingLength > 0 {
|
||||
byte |= 128
|
||||
}
|
||||
bytes.append(byte)
|
||||
} while remainingLength > 0
|
||||
|
||||
return bytes
|
||||
}
|
||||
|
||||
private func getNextPacketId() -> UInt16 {
|
||||
self.packetIdentifier += 1
|
||||
if self.packetIdentifier == 0 {
|
||||
self.packetIdentifier = 1
|
||||
}
|
||||
return self.packetIdentifier
|
||||
}
|
||||
|
||||
private func handleMQTTPacket(_ data: Data) {
|
||||
guard data.count >= 2 else { return }
|
||||
|
||||
let packetType = MQTTPacketType(rawValue: (data[0] >> 4) & 0x0F)
|
||||
|
||||
switch packetType {
|
||||
case .connack:
|
||||
self.handleConnAck(data)
|
||||
case .pingresp:
|
||||
break
|
||||
case .suback:
|
||||
break
|
||||
case .publish:
|
||||
self.processCommand(data)
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
private func handleConnAck(_ data: Data) {
|
||||
guard data.count >= 4 else { return }
|
||||
|
||||
let returnCode = data[3]
|
||||
if returnCode == 0 {
|
||||
self.isConnected = true
|
||||
self.startPingTimer()
|
||||
self.subscribeToControlTopics()
|
||||
self.sendStatus(true)
|
||||
debug("MQTT connected successfully", log: self.log)
|
||||
self.registerCallback?()
|
||||
} else {
|
||||
debug("MQTT connection failed with code: \(returnCode)", log: self.log)
|
||||
}
|
||||
}
|
||||
|
||||
private func subscribeToControlTopics() {
|
||||
let controlTopic = "stats/\(Remote.shared.id.uuidString)/control/+"
|
||||
self.subscribe(to: controlTopic)
|
||||
}
|
||||
|
||||
private func receiveMessage() {
|
||||
self.webSocket?.receive { [weak self] result in
|
||||
switch result {
|
||||
case .failure(let error):
|
||||
self?.isConnected = false
|
||||
self?.handleWebSocketError(error)
|
||||
case .success:
|
||||
case .success(let message):
|
||||
switch message {
|
||||
case .data(let data):
|
||||
self?.handleMQTTPacket(data)
|
||||
case .string:
|
||||
break
|
||||
@unknown default:
|
||||
break
|
||||
}
|
||||
self?.receiveMessage()
|
||||
}
|
||||
}
|
||||
@@ -494,8 +966,8 @@ class WebSocketManager: NSObject {
|
||||
|
||||
private func startPingTimer() {
|
||||
self.stopPingTimer()
|
||||
self.pingTimer = Timer.scheduledTimer(withTimeInterval: 10, repeats: true) { [weak self] _ in
|
||||
self?.ping()
|
||||
self.pingTimer = Timer.scheduledTimer(withTimeInterval: 30, repeats: true) { [weak self] _ in
|
||||
self?.sendPingRequest()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -504,13 +976,6 @@ class WebSocketManager: NSObject {
|
||||
self.pingTimer = nil
|
||||
}
|
||||
|
||||
private func ping() {
|
||||
self.webSocket?.sendPing { [weak self] _ in
|
||||
self?.isConnected = false
|
||||
self?.reconnect()
|
||||
}
|
||||
}
|
||||
|
||||
private func handleWebSocketError(_ error: Error) {
|
||||
if let urlError = error as? URLError, urlError.code.rawValue == 401 {
|
||||
Remote.shared.start()
|
||||
@@ -518,16 +983,51 @@ class WebSocketManager: NSObject {
|
||||
self.reconnect()
|
||||
}
|
||||
}
|
||||
|
||||
private func processCommand(_ data: Data) {
|
||||
var offset = 1
|
||||
while data[offset] & 0x80 != 0 { offset += 1 }
|
||||
offset += 1
|
||||
|
||||
guard data.count > offset + 1 else { return }
|
||||
let topicLength = Int(data[offset]) << 8 | Int(data[offset + 1])
|
||||
offset += 2
|
||||
|
||||
guard data.count >= offset + topicLength else { return }
|
||||
let topicData = data.subdata(in: offset..<(offset + topicLength))
|
||||
let topic = String(data: topicData, encoding: .utf8) ?? "<invalid topic>"
|
||||
offset += topicLength
|
||||
|
||||
let prefix = "stats/\(Remote.shared.id.uuidString)/control/"
|
||||
let commandName = topic.hasPrefix(prefix) ? String(topic.dropFirst(prefix.count)) : topic
|
||||
let payload = data.subdata(in: offset..<data.count)
|
||||
self.commandCallback?(commandName, payload)
|
||||
}
|
||||
}
|
||||
|
||||
extension WebSocketManager: URLSessionWebSocketDelegate {
|
||||
extension MQTTManager: URLSessionWebSocketDelegate {
|
||||
func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) {
|
||||
self.isConnected = true
|
||||
self.sendDetails()
|
||||
debug("MQTT WebSocket opened, sending CONNECT", log: self.log)
|
||||
self.sendConnect()
|
||||
}
|
||||
|
||||
func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
|
||||
self.isConnected = false
|
||||
self.stopPingTimer()
|
||||
self.sendStatus(false)
|
||||
debug("MQTT WebSocket closed", log: self.log)
|
||||
self.reconnect()
|
||||
}
|
||||
|
||||
func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
|
||||
if let error = error {
|
||||
if let response = task.response as? HTTPURLResponse {
|
||||
let statusCode = response.statusCode
|
||||
let headers = response.allHeaderFields
|
||||
debug("MQTT WebSocket failed: \(error.localizedDescription), status: \(statusCode), headers: \(headers)", log: self.log)
|
||||
} else {
|
||||
debug("MQTT WebSocket failed: \(error.localizedDescription)", log: self.log)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user