#!/usr/bin/python3 -sP """ batteryd — battery tracking daemon Subscribes to UPower's PropertiesChanged on the battery device, logs percentage + energy_wh + power_w on each change. Stores in SQLite for long-term retention. """ import logging import os import signal import sqlite3 import time from datetime import datetime import gi gi.require_version('Gio', '2.0') gi.require_version('GLib', '2.0') from gi.repository import Gio, GLib log = logging.getLogger('batteryd') DB_DIR = os.path.join(GLib.get_user_data_dir(), 'batteryd') DB_PATH = os.path.join(DB_DIR, 'battery.db') FLUSH_INTERVAL = 5 * 60 MIN_SAMPLE_INTERVAL = 10 UPOWER_BUS = 'org.freedesktop.UPower' UPOWER_DEVICE_IFACE = 'org.freedesktop.UPower.Device' DBUS_PROPS_IFACE = 'org.freedesktop.DBus.Properties' STATE_NAMES = { 0: 'Unknown', 1: 'Charging', 2: 'Discharging', 3: 'Empty', 4: 'Full', 5: 'PendingCharge', 6: 'PendingDischarge', } DBUS_BUS = 'org.batteryd' DBUS_PATH = '/org/batteryd' DBUS_IFACE_XML = ''' ''' def _local_day_range(date_str): dt = datetime.strptime(date_str, '%Y-%m-%d') start = dt.timestamp() return start, start + 86400 def find_battery(sys_bus): try: result = sys_bus.call_sync( UPOWER_BUS, '/org/freedesktop/UPower', 'org.freedesktop.UPower', 'EnumerateDevices', None, GLib.VariantType('(ao)'), Gio.DBusCallFlags.NONE, -1, None) for path in result.unpack()[0]: props = sys_bus.call_sync( UPOWER_BUS, path, DBUS_PROPS_IFACE, 'Get', GLib.Variant('(ss)', (UPOWER_DEVICE_IFACE, 'Type')), GLib.VariantType('(v)'), Gio.DBusCallFlags.NONE, -1, None) if props.unpack()[0] == 2: return path except GLib.Error as e: log.error('UPower: %s', e.message) return None def read_battery(sys_bus, bat_path): try: result = sys_bus.call_sync( UPOWER_BUS, bat_path, DBUS_PROPS_IFACE, 'GetAll', GLib.Variant('(s)', (UPOWER_DEVICE_IFACE,)), GLib.VariantType('(a{sv})'), Gio.DBusCallFlags.NONE, -1, None) props = result.unpack()[0] return { 'level': props.get('Percentage', 0.0), 'energy_wh': props.get('Energy', 0.0), 'power_w': props.get('EnergyRate', 0.0), 'voltage_v': props.get('Voltage', 0.0), 'state': props.get('State', 0), } except GLib.Error as e: log.error('read: %s', e.message) return None def open_db(): os.makedirs(DB_DIR, exist_ok=True) db = sqlite3.connect(DB_PATH) db.execute('PRAGMA journal_mode=WAL') db.execute(''' CREATE TABLE IF NOT EXISTS samples ( ts REAL NOT NULL, level REAL NOT NULL, energy_wh REAL NOT NULL, power_w REAL NOT NULL, voltage_v REAL NOT NULL DEFAULT 0, state INTEGER NOT NULL ) ''') db.execute('CREATE INDEX IF NOT EXISTS idx_samples_ts ON samples (ts)') db.execute(''' CREATE TABLE IF NOT EXISTS sessions ( id INTEGER PRIMARY KEY, start_ts REAL NOT NULL, end_ts REAL NOT NULL, start_level REAL NOT NULL, end_level REAL NOT NULL, status TEXT NOT NULL ) ''') db.execute('CREATE INDEX IF NOT EXISTS idx_sessions_ts ON sessions (start_ts)') db.execute(''' CREATE TABLE IF NOT EXISTS events ( ts REAL NOT NULL, type TEXT NOT NULL ) ''') db.execute('CREATE INDEX IF NOT EXISTS idx_events_ts ON events (ts)') # migrations cols = {r[1] for r in db.execute('PRAGMA table_info(samples)')} if 'voltage_v' not in cols: db.execute('ALTER TABLE samples ADD COLUMN voltage_v REAL NOT NULL DEFAULT 0') log.info('migrated: added voltage_v column') if 'energy_wh' not in cols: db.execute('ALTER TABLE samples ADD COLUMN energy_wh REAL NOT NULL DEFAULT 0') log.info('migrated: added energy_wh column') db.commit() return db class Battery: def __init__(self, db, sys_bus, bat_path): self.db = db self.sys_bus = sys_bus self.bat_path = bat_path self.buf = [] self.last_sample_ts = 0 self.last_state = None self.session_start_ts = 0 self.session_start_level = 0.0 self.current = None self.flush_timer_id = 0 def on_props_changed(self, changed_props): now = time.time() new_state = changed_props.get('State') state_changed = new_state is not None and new_state != self.last_state if not state_changed and (now - self.last_sample_ts) < MIN_SAMPLE_INTERVAL: return reading = read_battery(self.sys_bus, self.bat_path) if reading is None: return self.current = reading state = reading['state'] self.buf.append(( now, reading['level'], reading['energy_wh'], reading['power_w'], reading['voltage_v'], float(state))) self.last_sample_ts = now log.debug('%.1f%% %.3fWh %.2fW %.3fV %s', reading['level'], reading['energy_wh'], reading['power_w'], reading['voltage_v'], STATE_NAMES.get(state, '?')) if state_changed: if self.last_state is not None: self._close_session(now, reading['level']) self.session_start_ts = now self.session_start_level = reading['level'] self.last_state = state def _close_session(self, end_ts, end_level): if self.session_start_ts == 0: return status = STATE_NAMES.get(self.last_state, 'Unknown') self.db.execute( 'INSERT INTO sessions (start_ts, end_ts, start_level, end_level, status)' ' VALUES (?, ?, ?, ?, ?)', (self.session_start_ts, end_ts, self.session_start_level, end_level, status)) def _log_event(self, event_type): now = time.time() self.db.execute('INSERT INTO events (ts, type) VALUES (?, ?)', (now, event_type)) def on_suspend(self): reading = read_battery(self.sys_bus, self.bat_path) if reading: self.current = reading now = time.time() if self.current and self.last_state is not None: self._close_session(now, self.current['level']) self._log_event('suspend') self.flush() self.session_start_ts = 0 self.last_state = None log.debug('suspended at %.1f%%', self.current['level'] if self.current else 0) def on_resume(self): self._log_event('resume') reading = read_battery(self.sys_bus, self.bat_path) if reading is None: return self.current = reading now = time.time() state = reading['state'] self.buf.append(( now, reading['level'], reading['energy_wh'], reading['power_w'], reading['voltage_v'], float(state))) self.last_sample_ts = now self.last_state = state self.session_start_ts = now self.session_start_level = reading['level'] log.debug('resumed: %.1f%% %s', reading['level'], STATE_NAMES.get(state, '?')) def flush(self): if not self.buf: return self.db.executemany( 'INSERT INTO samples (ts, level, energy_wh, power_w, voltage_v, state)' ' VALUES (?, ?, ?, ?, ?, ?)', self.buf) self.db.commit() log.debug('flushed %d samples', len(self.buf)) self.buf.clear() def shutdown(self): if self.current and self.last_state is not None: self._close_session(time.time(), self.current['level']) self._log_event('shutdown') self.flush() self.db.commit() def start(self): reading = read_battery(self.sys_bus, self.bat_path) if reading: self.current = reading now = time.time() self.last_state = reading['state'] self.session_start_ts = now self.session_start_level = reading['level'] self.buf.append(( now, reading['level'], reading['energy_wh'], reading['power_w'], reading['voltage_v'], float(reading['state']))) self.last_sample_ts = now log.info('initial: %.1f%% %.3fWh %.2fW %.3fV %s', reading['level'], reading['energy_wh'], reading['power_w'], reading['voltage_v'], STATE_NAMES.get(reading['state'], '?')) self._log_event('start') self.flush_timer_id = GLib.timeout_add_seconds( FLUSH_INTERVAL, self._on_flush_timer) def _on_flush_timer(self): self.flush() return True def query_samples(self, date_str): day_start, day_end = _local_day_range(date_str) rows = self.db.execute(''' SELECT ts, level, energy_wh, power_w, voltage_v, state FROM samples WHERE ts >= ? AND ts < ? ORDER BY ts ''', (day_start, day_end)).fetchall() for s in self.buf: if day_start <= s[0] < day_end: rows.append(s) rows.sort(key=lambda r: r[0]) return [(float(ts), float(lv), float(e), float(pw), float(v), float(st)) for ts, lv, e, pw, v, st in rows] def query_sessions(self, date_str): day_start, day_end = _local_day_range(date_str) rows = self.db.execute(''' SELECT start_ts, end_ts, start_level, end_level, status FROM sessions WHERE end_ts > ? AND start_ts < ? ORDER BY start_ts ''', (day_start, day_end)).fetchall() return [(float(s), float(e), float(sl), float(el), st) for s, e, sl, el, st in rows] def query_events(self, date_str): day_start, day_end = _local_day_range(date_str) rows = self.db.execute(''' SELECT ts, type FROM events WHERE ts >= ? AND ts < ? ORDER BY ts ''', (day_start, day_end)).fetchall() return [(float(ts), t) for ts, t in rows] def main(): logging.basicConfig( level=logging.DEBUG if os.environ.get('BATTERYD_DEBUG') else logging.INFO, format='%(name)s: %(message)s', ) sys_bus = Gio.bus_get_sync(Gio.BusType.SYSTEM) bat_path = find_battery(sys_bus) if not bat_path: log.error('no battery found, exiting') return log.info('tracking: %s', bat_path) db = open_db() tracker = Battery(db, sys_bus, bat_path) tracker.start() loop = GLib.MainLoop() bus = Gio.bus_get_sync(Gio.BusType.SESSION) sys_bus.signal_subscribe( UPOWER_BUS, DBUS_PROPS_IFACE, 'PropertiesChanged', bat_path, None, Gio.DBusSignalFlags.NONE, lambda conn, sender, path, iface, sig, params: ( tracker.on_props_changed(dict(params.unpack()[1])) )) # handle suspend/resume sys_bus.signal_subscribe( 'org.freedesktop.login1', 'org.freedesktop.login1.Manager', 'PrepareForSleep', '/org/freedesktop/login1', None, Gio.DBusSignalFlags.NONE, lambda conn, sender, path, iface, sig, params: ( tracker.on_suspend() if params.unpack()[0] else tracker.on_resume() )) node = Gio.DBusNodeInfo.new_for_xml(DBUS_IFACE_XML) def on_method_call(conn, sender, path, iface, method, params, invocation): try: if method == 'GetSamples': rows = tracker.query_samples(params.unpack()[0]) invocation.return_value(GLib.Variant('(a(dddddd))', (rows,))) elif method == 'GetSessions': rows = tracker.query_sessions(params.unpack()[0]) invocation.return_value(GLib.Variant('(a(dddds))', (rows,))) elif method == 'GetEvents': rows = tracker.query_events(params.unpack()[0]) invocation.return_value(GLib.Variant('(a(ds))', (rows,))) elif method == 'GetCurrent': if tracker.current: r = tracker.current invocation.return_value(GLib.Variant('(dddds)', ( r['level'], r['energy_wh'], r['power_w'], r['voltage_v'], STATE_NAMES.get(r['state'], 'Unknown')))) else: invocation.return_value(GLib.Variant('(dddds)', ( 0.0, 0.0, 0.0, 0.0, 'Unknown'))) except Exception as e: log.error('D-Bus method %s: %s', method, e) invocation.return_dbus_error('org.batteryd.Error', str(e)) bus.register_object_with_closures2( DBUS_PATH, node.interfaces[0], on_method_call, None, None) Gio.bus_own_name_on_connection( bus, DBUS_BUS, Gio.BusNameOwnerFlags.NONE, None, None) def on_term(): tracker.shutdown() loop.quit() GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGTERM, on_term) GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, on_term) log.info('archiving to %s', DB_PATH) loop.run() log.info('stopped') if __name__ == '__main__': main()