Files
2026-04-01 14:42:47 +09:00

425 lines
14 KiB
Python
Executable File

#!/usr/bin/python3 -sP
"""
batteryd — battery tracking daemon
Subscribes to UPower's PropertiesChanged on the battery device,
logs percentage + energy_wh + power_w on each change.
Stores in SQLite for long-term retention.
"""
import logging
import os
import signal
import sqlite3
import time
from datetime import datetime
import gi
gi.require_version('Gio', '2.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gio, GLib
log = logging.getLogger('batteryd')
DB_DIR = os.path.join(GLib.get_user_data_dir(), 'batteryd')
DB_PATH = os.path.join(DB_DIR, 'battery.db')
FLUSH_INTERVAL = 5 * 60
MIN_SAMPLE_INTERVAL = 10
UPOWER_BUS = 'org.freedesktop.UPower'
UPOWER_DEVICE_IFACE = 'org.freedesktop.UPower.Device'
DBUS_PROPS_IFACE = 'org.freedesktop.DBus.Properties'
STATE_NAMES = {
0: 'Unknown', 1: 'Charging', 2: 'Discharging',
3: 'Empty', 4: 'Full', 5: 'PendingCharge', 6: 'PendingDischarge',
}
DBUS_BUS = 'org.batteryd'
DBUS_PATH = '/org/batteryd'
DBUS_IFACE_XML = '''<node>
<interface name="org.batteryd">
<method name="GetSamples">
<arg direction="in" name="date" type="s"/>
<arg direction="out" name="samples" type="a(dddddd)"/>
</method>
<method name="GetSessions">
<arg direction="in" name="date" type="s"/>
<arg direction="out" name="sessions" type="a(dddds)"/>
</method>
<method name="GetEvents">
<arg direction="in" name="date" type="s"/>
<arg direction="out" name="events" type="a(ds)"/>
</method>
<method name="GetCurrent">
<arg direction="out" name="level" type="d"/>
<arg direction="out" name="energy_wh" type="d"/>
<arg direction="out" name="power_w" type="d"/>
<arg direction="out" name="voltage_v" type="d"/>
<arg direction="out" name="status" type="s"/>
</method>
</interface>
</node>'''
def _local_day_range(date_str):
dt = datetime.strptime(date_str, '%Y-%m-%d')
start = dt.timestamp()
return start, start + 86400
def find_battery(sys_bus):
try:
result = sys_bus.call_sync(
UPOWER_BUS, '/org/freedesktop/UPower',
'org.freedesktop.UPower', 'EnumerateDevices',
None, GLib.VariantType('(ao)'),
Gio.DBusCallFlags.NONE, -1, None)
for path in result.unpack()[0]:
props = sys_bus.call_sync(
UPOWER_BUS, path, DBUS_PROPS_IFACE, 'Get',
GLib.Variant('(ss)', (UPOWER_DEVICE_IFACE, 'Type')),
GLib.VariantType('(v)'),
Gio.DBusCallFlags.NONE, -1, None)
if props.unpack()[0] == 2:
return path
except GLib.Error as e:
log.error('UPower: %s', e.message)
return None
def read_battery(sys_bus, bat_path):
try:
result = sys_bus.call_sync(
UPOWER_BUS, bat_path, DBUS_PROPS_IFACE, 'GetAll',
GLib.Variant('(s)', (UPOWER_DEVICE_IFACE,)),
GLib.VariantType('(a{sv})'),
Gio.DBusCallFlags.NONE, -1, None)
props = result.unpack()[0]
return {
'level': props.get('Percentage', 0.0),
'energy_wh': props.get('Energy', 0.0),
'power_w': props.get('EnergyRate', 0.0),
'voltage_v': props.get('Voltage', 0.0),
'state': props.get('State', 0),
}
except GLib.Error as e:
log.error('read: %s', e.message)
return None
def open_db():
os.makedirs(DB_DIR, exist_ok=True)
db = sqlite3.connect(DB_PATH)
db.execute('PRAGMA journal_mode=WAL')
db.execute('''
CREATE TABLE IF NOT EXISTS samples (
ts REAL NOT NULL,
level REAL NOT NULL,
energy_wh REAL NOT NULL,
power_w REAL NOT NULL,
voltage_v REAL NOT NULL DEFAULT 0,
state INTEGER NOT NULL
)
''')
db.execute('CREATE INDEX IF NOT EXISTS idx_samples_ts ON samples (ts)')
db.execute('''
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY,
start_ts REAL NOT NULL,
end_ts REAL NOT NULL,
start_level REAL NOT NULL,
end_level REAL NOT NULL,
status TEXT NOT NULL
)
''')
db.execute('CREATE INDEX IF NOT EXISTS idx_sessions_ts ON sessions (start_ts)')
db.execute('''
CREATE TABLE IF NOT EXISTS events (
ts REAL NOT NULL,
type TEXT NOT NULL
)
''')
db.execute('CREATE INDEX IF NOT EXISTS idx_events_ts ON events (ts)')
# migrations
cols = {r[1] for r in db.execute('PRAGMA table_info(samples)')}
if 'voltage_v' not in cols:
db.execute('ALTER TABLE samples ADD COLUMN voltage_v REAL NOT NULL DEFAULT 0')
log.info('migrated: added voltage_v column')
if 'energy_wh' not in cols:
db.execute('ALTER TABLE samples ADD COLUMN energy_wh REAL NOT NULL DEFAULT 0')
log.info('migrated: added energy_wh column')
db.commit()
return db
class Battery:
def __init__(self, db, sys_bus, bat_path):
self.db = db
self.sys_bus = sys_bus
self.bat_path = bat_path
self.buf = []
self.last_sample_ts = 0
self.last_state = None
self.session_start_ts = 0
self.session_start_level = 0.0
self.current = None
self.flush_timer_id = 0
def on_props_changed(self, changed_props):
now = time.time()
new_state = changed_props.get('State')
state_changed = new_state is not None and new_state != self.last_state
if not state_changed and (now - self.last_sample_ts) < MIN_SAMPLE_INTERVAL:
return
reading = read_battery(self.sys_bus, self.bat_path)
if reading is None:
return
self.current = reading
state = reading['state']
self.buf.append((
now, reading['level'], reading['energy_wh'],
reading['power_w'], reading['voltage_v'], float(state)))
self.last_sample_ts = now
log.debug('%.1f%% %.3fWh %.2fW %.3fV %s',
reading['level'], reading['energy_wh'],
reading['power_w'], reading['voltage_v'],
STATE_NAMES.get(state, '?'))
if state_changed:
if self.last_state is not None:
self._close_session(now, reading['level'])
self.session_start_ts = now
self.session_start_level = reading['level']
self.last_state = state
def _close_session(self, end_ts, end_level):
if self.session_start_ts == 0:
return
status = STATE_NAMES.get(self.last_state, 'Unknown')
self.db.execute(
'INSERT INTO sessions (start_ts, end_ts, start_level, end_level, status)'
' VALUES (?, ?, ?, ?, ?)',
(self.session_start_ts, end_ts,
self.session_start_level, end_level, status))
def _log_event(self, event_type):
now = time.time()
self.db.execute('INSERT INTO events (ts, type) VALUES (?, ?)',
(now, event_type))
def on_suspend(self):
reading = read_battery(self.sys_bus, self.bat_path)
if reading:
self.current = reading
now = time.time()
if self.current and self.last_state is not None:
self._close_session(now, self.current['level'])
self._log_event('suspend')
self.flush()
self.session_start_ts = 0
self.last_state = None
log.debug('suspended at %.1f%%',
self.current['level'] if self.current else 0)
def on_resume(self):
self._log_event('resume')
reading = read_battery(self.sys_bus, self.bat_path)
if reading is None:
return
self.current = reading
now = time.time()
state = reading['state']
self.buf.append((
now, reading['level'], reading['energy_wh'],
reading['power_w'], reading['voltage_v'], float(state)))
self.last_sample_ts = now
self.last_state = state
self.session_start_ts = now
self.session_start_level = reading['level']
log.debug('resumed: %.1f%% %s', reading['level'],
STATE_NAMES.get(state, '?'))
def flush(self):
if not self.buf:
return
self.db.executemany(
'INSERT INTO samples (ts, level, energy_wh, power_w, voltage_v, state)'
' VALUES (?, ?, ?, ?, ?, ?)',
self.buf)
self.db.commit()
log.debug('flushed %d samples', len(self.buf))
self.buf.clear()
def shutdown(self):
if self.current and self.last_state is not None:
self._close_session(time.time(), self.current['level'])
self._log_event('shutdown')
self.flush()
self.db.commit()
def start(self):
reading = read_battery(self.sys_bus, self.bat_path)
if reading:
self.current = reading
now = time.time()
self.last_state = reading['state']
self.session_start_ts = now
self.session_start_level = reading['level']
self.buf.append((
now, reading['level'], reading['energy_wh'],
reading['power_w'], reading['voltage_v'],
float(reading['state'])))
self.last_sample_ts = now
log.info('initial: %.1f%% %.3fWh %.2fW %.3fV %s',
reading['level'], reading['energy_wh'],
reading['power_w'], reading['voltage_v'],
STATE_NAMES.get(reading['state'], '?'))
self._log_event('start')
self.flush_timer_id = GLib.timeout_add_seconds(
FLUSH_INTERVAL, self._on_flush_timer)
def _on_flush_timer(self):
self.flush()
return True
def query_samples(self, date_str):
day_start, day_end = _local_day_range(date_str)
rows = self.db.execute('''
SELECT ts, level, energy_wh, power_w, voltage_v, state FROM samples
WHERE ts >= ? AND ts < ?
ORDER BY ts
''', (day_start, day_end)).fetchall()
for s in self.buf:
if day_start <= s[0] < day_end:
rows.append(s)
rows.sort(key=lambda r: r[0])
return [(float(ts), float(lv), float(e), float(pw), float(v), float(st))
for ts, lv, e, pw, v, st in rows]
def query_sessions(self, date_str):
day_start, day_end = _local_day_range(date_str)
rows = self.db.execute('''
SELECT start_ts, end_ts, start_level, end_level, status
FROM sessions
WHERE end_ts > ? AND start_ts < ?
ORDER BY start_ts
''', (day_start, day_end)).fetchall()
return [(float(s), float(e), float(sl), float(el), st)
for s, e, sl, el, st in rows]
def query_events(self, date_str):
day_start, day_end = _local_day_range(date_str)
rows = self.db.execute('''
SELECT ts, type FROM events
WHERE ts >= ? AND ts < ?
ORDER BY ts
''', (day_start, day_end)).fetchall()
return [(float(ts), t) for ts, t in rows]
def main():
logging.basicConfig(
level=logging.DEBUG if os.environ.get('BATTERYD_DEBUG') else logging.INFO,
format='%(name)s: %(message)s',
)
sys_bus = Gio.bus_get_sync(Gio.BusType.SYSTEM)
bat_path = find_battery(sys_bus)
if not bat_path:
log.error('no battery found, exiting')
return
log.info('tracking: %s', bat_path)
db = open_db()
tracker = Battery(db, sys_bus, bat_path)
tracker.start()
loop = GLib.MainLoop()
bus = Gio.bus_get_sync(Gio.BusType.SESSION)
sys_bus.signal_subscribe(
UPOWER_BUS, DBUS_PROPS_IFACE, 'PropertiesChanged',
bat_path, None, Gio.DBusSignalFlags.NONE,
lambda conn, sender, path, iface, sig, params: (
tracker.on_props_changed(dict(params.unpack()[1]))
))
# handle suspend/resume
sys_bus.signal_subscribe(
'org.freedesktop.login1', 'org.freedesktop.login1.Manager',
'PrepareForSleep',
'/org/freedesktop/login1', None, Gio.DBusSignalFlags.NONE,
lambda conn, sender, path, iface, sig, params: (
tracker.on_suspend() if params.unpack()[0]
else tracker.on_resume()
))
node = Gio.DBusNodeInfo.new_for_xml(DBUS_IFACE_XML)
def on_method_call(conn, sender, path, iface, method, params, invocation):
try:
if method == 'GetSamples':
rows = tracker.query_samples(params.unpack()[0])
invocation.return_value(GLib.Variant('(a(dddddd))', (rows,)))
elif method == 'GetSessions':
rows = tracker.query_sessions(params.unpack()[0])
invocation.return_value(GLib.Variant('(a(dddds))', (rows,)))
elif method == 'GetEvents':
rows = tracker.query_events(params.unpack()[0])
invocation.return_value(GLib.Variant('(a(ds))', (rows,)))
elif method == 'GetCurrent':
if tracker.current:
r = tracker.current
invocation.return_value(GLib.Variant('(dddds)', (
r['level'], r['energy_wh'], r['power_w'],
r['voltage_v'],
STATE_NAMES.get(r['state'], 'Unknown'))))
else:
invocation.return_value(GLib.Variant('(dddds)', (
0.0, 0.0, 0.0, 0.0, 'Unknown')))
except Exception as e:
log.error('D-Bus method %s: %s', method, e)
invocation.return_dbus_error('org.batteryd.Error', str(e))
bus.register_object_with_closures2(
DBUS_PATH, node.interfaces[0],
on_method_call, None, None)
Gio.bus_own_name_on_connection(
bus, DBUS_BUS, Gio.BusNameOwnerFlags.NONE, None, None)
def on_term():
tracker.shutdown()
loop.quit()
GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGTERM, on_term)
GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, on_term)
log.info('archiving to %s', DB_PATH)
loop.run()
log.info('stopped')
if __name__ == '__main__':
main()