Files
linux-sys-telemetry/screentimed/screentimed
2026-03-11 11:52:02 +09:00

429 lines
14 KiB
Python

#!/usr/bin/python3 -sP
import logging
import os
import signal
import sqlite3
import time
import gi
gi.require_version('Gio', '2.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gio, GLib
# Module-wide logger; main() configures its level and output format.
log = logging.getLogger('screentimed')
# The SQLite database is kept in a 'screentime' directory below the user
# data directory reported by GLib.
DB_DIR = os.path.join(GLib.get_user_data_dir(), 'screentime')
DB_PATH = os.path.join(DB_DIR, 'usage.db')
# Seconds between periodic flushes of buffered sessions to the database
# (passed to GLib.timeout_add_seconds).
FLUSH_INTERVAL = 60
# Bus name, interface and object path of the usage tracker whose
# FocusChanged / RunningAppsChanged signals main() subscribes to.
TRACKER_BUS = 'org.gnome.Shell'
TRACKER_IFACE = 'org.gnome.Shell.UsageTracker'
TRACKER_PATH = '/org/gnome/Shell/UsageTracker'
# Bus name and object path under which this daemon exports its query API.
SCREENTIMED_BUS = 'org.gnome.ScreenTime'
SCREENTIMED_PATH = '/org/gnome/ScreenTime'
# Introspection XML for the exported interface. The `date` arguments are
# parsed with the '%Y-%m-%d' format; the a{su} replies map app id to
# seconds, the aa{su} replies hold one such map per hour, and the a(sdd)
# replies are (app id, start timestamp, end timestamp) triples.
SCREENTIMED_IFACE_XML = '''<node>
<interface name="org.gnome.ScreenTime">
<method name="GetUsage">
<arg direction="in" name="date" type="s"/>
<arg direction="out" name="focused" type="a{su}"/>
<arg direction="out" name="running" type="a{su}"/>
</method>
<method name="GetUsageToday">
<arg direction="out" name="focused" type="a{su}"/>
<arg direction="out" name="running" type="a{su}"/>
</method>
<method name="GetUsageByHour">
<arg direction="in" name="date" type="s"/>
<arg direction="out" name="focused" type="aa{su}"/>
<arg direction="out" name="running" type="aa{su}"/>
</method>
<method name="GetTimeline">
<arg direction="in" name="date" type="s"/>
<arg direction="out" name="focused" type="a(sdd)"/>
<arg direction="out" name="running" type="a(sdd)"/>
</method>
</interface>
</node>'''
# ── database ──────────────────────────────────────────────
def open_db():
    """Open the usage database, creating the schema when needed.

    Creates ``DB_DIR`` if it is missing, switches the database to WAL
    journaling, makes sure the ``focus_sessions`` and ``running_sessions``
    tables and their time indexes exist, and moves the rows of a legacy
    ``sessions`` table into ``focus_sessions`` when such a table is present.

    Returns:
        An open ``sqlite3.Connection`` for ``DB_PATH``.
    """
    os.makedirs(DB_DIR, exist_ok=True)
    db = sqlite3.connect(DB_PATH)
    db.execute('PRAGMA journal_mode=WAL')

    # Both session tables share the same layout and the same time index.
    for table, index in (('focus_sessions', 'idx_focus_time'),
                         ('running_sessions', 'idx_running_time')):
        db.execute(f'''
            CREATE TABLE IF NOT EXISTS {table} (
                id INTEGER PRIMARY KEY,
                app_id TEXT NOT NULL,
                start_ts REAL NOT NULL,
                end_ts REAL NOT NULL
            )
        ''')
        db.execute(f'''
            CREATE INDEX IF NOT EXISTS {index}
            ON {table} (start_ts, end_ts)
        ''')
    db.commit()

    # Migrate from the old schema. Look the table up explicitly instead of
    # relying on a swallowed error, so real failures are not mistaken for
    # "nothing to migrate".
    legacy = db.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'sessions'"
    ).fetchone()
    if legacy is not None:
        try:
            # Copy and drop in a single transaction: if the drop fails the
            # copy is rolled back too, so a later start cannot import the
            # same rows twice.
            with db:
                db.execute('''
                    INSERT INTO focus_sessions (app_id, start_ts, end_ts)
                    SELECT app_id, start_ts, end_ts FROM sessions
                ''')
                db.execute('DROP TABLE sessions')
        except sqlite3.OperationalError:
            # Migration stays best-effort, but is no longer silent.
            log.warning('could not migrate old sessions table', exc_info=True)
        else:
            log.info('migrated old sessions table')
    return db
def _date_to_ts(date):
return time.mktime(time.strptime(date, '%Y-%m-%d'))
def _day_range(date):
start = _date_to_ts(date)
return start, start + 86400
def _query_db(db, table, date):
return db.execute(f'''
SELECT app_id, start_ts, end_ts
FROM {table}
WHERE end_ts > unixepoch(:date)
AND start_ts < unixepoch(:date, '+1 day')
''', {'date': date}).fetchall()
def _aggregate_daily(sessions, day_start, day_end):
usage = {}
for app_id, start, end in sessions:
overlap = min(end, day_end) - max(start, day_start)
if overlap > 0:
usage[app_id] = usage.get(app_id, 0) + int(overlap)
return usage
def _aggregate_hourly(sessions, day_start):
hours = [{} for _ in range(24)]
for app_id, start, end in sessions:
for h in range(24):
h_start = day_start + h * 3600
h_end = h_start + 3600
overlap = min(end, h_end) - max(start, h_start)
if overlap > 0:
hours[h][app_id] = hours[h].get(app_id, 0) + int(overlap)
return hours
def _clip_to_day(sessions, day_start, day_end):
"""Return (app_id, clipped_start, clipped_end) tuples."""
out = []
for app_id, start, end in sessions:
s = max(start, day_start)
e = min(end, day_end)
if e > s:
out.append((app_id, s, e))
return out
# ── tracker ───────────────────────────────────────────────
class Tracker:
    """Record focus and running-app sessions and answer usage queries.

    A session is an ``(app_id, start_ts, end_ts)`` tuple. ``start_ts`` is a
    wall-clock timestamp, while the session length is measured with the
    monotonic clock and added to it, so a session's duration does not
    depend on wall-clock adjustments. Finished sessions are buffered in
    memory and written to the database by ``flush()``.

    While ``idle`` is set (between ``on_idle()`` and ``on_active()``) no
    time is attributed to any app.
    """

    def __init__(self, db):
        """Start tracking with no focused app and no running apps.

        Args:
            db: Open ``sqlite3.Connection`` holding the session tables.
        """
        self.db = db
        self.focus_buf = []    # finished focus sessions not yet in the db
        self.running_buf = []  # finished running sessions not yet in the db
        self.current_app = ''  # focused app id; '' means none
        self.session_start = time.time()
        self.mono_start = time.monotonic()
        self.running_apps = {}  # app_id -> (wall_start, mono_start)
        self.idle = False
        self.flush_timer_id = 0  # GLib source id; 0 means no timer

    # ── focus ──
    def on_focus_changed(self, app_id):
        """Close the current focus session and start one for *app_id*.

        Changes reported while idle are ignored, so the app that was
        focused before idling is the one resumed by ``on_active()``.
        """
        if self.idle:
            return
        self._close_focus()
        self.current_app = app_id
        self.session_start = time.time()
        self.mono_start = time.monotonic()
        log.debug('focus: %s', app_id or '(none)')

    def _close_focus(self):
        """Buffer the live focus session if it has an app and lasted >= 1 s."""
        elapsed = time.monotonic() - self.mono_start
        if not self.current_app or elapsed < 1:
            return
        end_ts = self.session_start + elapsed
        self.focus_buf.append((self.current_app, self.session_start, end_ts))

    # ── running ──
    def on_running_changed(self, app_ids):
        """Reconcile the set of running apps with *app_ids*.

        Apps that disappeared get their running session buffered (when it
        lasted at least one second); newly seen apps start a session now.
        """
        now_wall = time.time()
        now_mono = time.monotonic()
        current = set(app_ids)
        previous = set(self.running_apps)
        # closed apps
        for app_id in previous - current:
            wall_start, mono_start = self.running_apps.pop(app_id)
            elapsed = now_mono - mono_start
            # While idle the stored start is the moment idling began
            # (on_idle() already buffered everything before it), so the
            # elapsed time is locked/blank time and must not be recorded.
            if not self.idle and elapsed >= 1:
                self.running_buf.append(
                    (app_id, wall_start, wall_start + elapsed))
            log.debug('closed: %s', app_id)
        # new apps
        for app_id in current - previous:
            self.running_apps[app_id] = (now_wall, now_mono)
            log.debug('opened: %s', app_id)

    def _close_all_running(self):
        """Buffer every live running session and restart each one at now."""
        now_wall = time.time()
        now_mono = time.monotonic()
        for app_id, (wall_start, mono_start) in self.running_apps.items():
            end_ts = wall_start + (now_mono - mono_start)
            if end_ts - wall_start >= 1:
                self.running_buf.append((app_id, wall_start, end_ts))
        # reopen immediately
        for app_id in list(self.running_apps):
            self.running_apps[app_id] = (now_wall, now_mono)

    # ── idle ──
    def on_idle(self):
        """Pause tracking: close live sessions, flush, stop the timer."""
        if self.idle:
            return
        self._close_focus()
        self._close_all_running()
        self.idle = True
        self._flush_to_db()
        self._stop_timer()
        log.debug('idle (locked/blank)')

    def on_active(self):
        """Resume tracking: restart all live sessions at the current time."""
        if not self.idle:
            return
        self.idle = False
        now_wall = time.time()
        now_mono = time.monotonic()
        self.session_start = now_wall
        self.mono_start = now_mono
        # reopen running timestamps
        for app_id in list(self.running_apps):
            self.running_apps[app_id] = (now_wall, now_mono)
        self._start_timer()
        log.debug('active (unlocked)')

    # ── flush ──
    def flush(self):
        """Close live sessions, write the buffers, restart the focus session.

        Does nothing while idle; ``on_idle()`` has already flushed.
        """
        if self.idle:
            return
        self._close_focus()
        self._close_all_running()
        self._flush_to_db()
        self.session_start = time.time()
        self.mono_start = time.monotonic()

    def _flush_to_db(self):
        """Write both buffers to the database in one transaction.

        On a database error the transaction is rolled back, the error is
        logged and the buffers are kept, so the next flush retries without
        inserting anything twice and the calling GLib callback survives.
        """
        if not self.focus_buf and not self.running_buf:
            return
        try:
            # The connection context manager commits on success and rolls
            # back if either insert raises.
            with self.db:
                if self.focus_buf:
                    self.db.executemany(
                        'INSERT INTO focus_sessions (app_id, start_ts, end_ts) VALUES (?, ?, ?)',
                        self.focus_buf)
                if self.running_buf:
                    self.db.executemany(
                        'INSERT INTO running_sessions (app_id, start_ts, end_ts) VALUES (?, ?, ?)',
                        self.running_buf)
        except sqlite3.Error:
            log.exception('flush failed; keeping %d focus + %d running buffered',
                          len(self.focus_buf), len(self.running_buf))
            return
        log.debug('flushed %d focus + %d running',
                  len(self.focus_buf), len(self.running_buf))
        self.focus_buf.clear()
        self.running_buf.clear()

    def _start_timer(self):
        """Start the periodic flush timer unless it is already running."""
        if self.flush_timer_id == 0:
            self.flush_timer_id = GLib.timeout_add_seconds(
                FLUSH_INTERVAL, self._on_timer)

    def _stop_timer(self):
        """Remove the periodic flush timer if one is installed."""
        if self.flush_timer_id != 0:
            GLib.source_remove(self.flush_timer_id)
            self.flush_timer_id = 0

    def _on_timer(self):
        """Timer callback: flush and return True to keep the timer alive."""
        self.flush()
        return True

    # ── queries ──
    def _all_sessions(self, table, buf, date):
        """Return stored plus buffered sessions overlapping the day *date*."""
        day_start, day_end = _day_range(date)
        sessions = _query_db(self.db, table, date)
        sessions.extend(
            s for s in buf if s[2] > day_start and s[1] < day_end)
        return sessions

    def _all_focus(self, date):
        """Return all focus sessions for *date*, including the live one."""
        sessions = self._all_sessions('focus_sessions', self.focus_buf, date)
        # add live focus
        if not self.idle and self.current_app:
            elapsed = time.monotonic() - self.mono_start
            end_ts = self.session_start + elapsed
            day_start, day_end = _day_range(date)
            if end_ts > day_start and self.session_start < day_end:
                sessions.append((self.current_app, self.session_start, end_ts))
        return sessions

    def _all_running(self, date):
        """Return all running sessions for *date*, including live ones."""
        sessions = self._all_sessions('running_sessions', self.running_buf, date)
        # add live running
        if not self.idle:
            now_mono = time.monotonic()
            day_start, day_end = _day_range(date)
            for app_id, (wall_start, mono_start) in self.running_apps.items():
                end_ts = wall_start + (now_mono - mono_start)
                if end_ts > day_start and wall_start < day_end:
                    sessions.append((app_id, wall_start, end_ts))
        return sessions

    def get_usage(self, date):
        """Return ``(focused, running)`` per-app second totals for *date*."""
        day_start, day_end = _day_range(date)
        focused = _aggregate_daily(self._all_focus(date), day_start, day_end)
        running = _aggregate_daily(self._all_running(date), day_start, day_end)
        return focused, running

    def get_usage_by_hour(self, date):
        """Return ``(focused, running)`` per-hour, per-app totals for *date*."""
        day_start, _ = _day_range(date)
        focused = _aggregate_hourly(self._all_focus(date), day_start)
        running = _aggregate_hourly(self._all_running(date), day_start)
        return focused, running

    def get_timeline(self, date):
        """Return ``(focused, running)`` session lists clipped to *date*."""
        day_start, day_end = _day_range(date)
        focused = _clip_to_day(self._all_focus(date), day_start, day_end)
        running = _clip_to_day(self._all_running(date), day_start, day_end)
        return focused, running
# ── main ──────────────────────────────────────────────────
def main():
    """Run the daemon: track usage from D-Bus signals and serve queries.

    Opens the database, subscribes to the tracker's focus / running-app
    signals and to the screen saver's ActiveChanged signal, fetches the
    initial state, exports the query interface on the session bus, and
    runs the GLib main loop until SIGTERM or SIGINT arrives.
    """
    logging.basicConfig(
        level=logging.DEBUG if os.environ.get('SCREENTIMED_DEBUG') else logging.INFO,
        format='%(name)s: %(message)s',
    )
    db = open_db()
    tracker = Tracker(db)
    loop = GLib.MainLoop()
    bus = Gio.bus_get_sync(Gio.BusType.SESSION)

    # listen to the extension (a[5] is the signal's parameter variant)
    bus.signal_subscribe(
        TRACKER_BUS, TRACKER_IFACE, 'FocusChanged',
        TRACKER_PATH, None, Gio.DBusSignalFlags.NONE,
        lambda *a: tracker.on_focus_changed(a[5].unpack()[0]))
    bus.signal_subscribe(
        TRACKER_BUS, TRACKER_IFACE, 'RunningAppsChanged',
        TRACKER_PATH, None, Gio.DBusSignalFlags.NONE,
        lambda *a: tracker.on_running_changed(a[5].unpack()[0]))
    # pause tracking on lock/blank
    bus.signal_subscribe(
        'org.gnome.ScreenSaver', 'org.gnome.ScreenSaver', 'ActiveChanged',
        '/org/gnome/ScreenSaver', None, Gio.DBusSignalFlags.NONE,
        lambda *a: tracker.on_idle() if a[5].unpack()[0] else tracker.on_active())

    # fetch initial state
    try:
        result = bus.call_sync(
            TRACKER_BUS, TRACKER_PATH, TRACKER_IFACE, 'GetFocus',
            None, GLib.VariantType('(s)'),
            Gio.DBusCallFlags.NONE, -1, None)
        tracker.current_app = result.unpack()[0]
        result = bus.call_sync(
            TRACKER_BUS, TRACKER_PATH, TRACKER_IFACE, 'GetRunningApps',
            None, GLib.VariantType('(as)'),
            Gio.DBusCallFlags.NONE, -1, None)
        now_wall = time.time()
        now_mono = time.monotonic()
        for app_id in result.unpack()[0]:
            tracker.running_apps[app_id] = (now_wall, now_mono)
        log.debug('initial: focus=%s, running=%s',
                  tracker.current_app, list(tracker.running_apps.keys()))
    except GLib.Error:
        # Best effort: tracking starts with the first signal instead.
        log.debug('extension not yet available')

    # expose analytics over d-bus
    node = Gio.DBusNodeInfo.new_for_xml(SCREENTIMED_IFACE_XML)

    def on_method_call(conn, sender, path, iface, method, params, invocation):
        """Answer one query; always send either a value or a D-Bus error."""
        try:
            if method == 'GetUsageToday':
                f, r = tracker.get_usage(time.strftime('%Y-%m-%d'))
                reply = GLib.Variant('(a{su}a{su})', (f, r))
            elif method == 'GetUsage':
                f, r = tracker.get_usage(params.unpack()[0])
                reply = GLib.Variant('(a{su}a{su})', (f, r))
            elif method == 'GetUsageByHour':
                f, r = tracker.get_usage_by_hour(params.unpack()[0])
                reply = GLib.Variant('(aa{su}aa{su})', (f, r))
            elif method == 'GetTimeline':
                f, r = tracker.get_timeline(params.unpack()[0])
                reply = GLib.Variant('(a(sdd)a(sdd))', (f, r))
            else:
                invocation.return_dbus_error(
                    'org.freedesktop.DBus.Error.UnknownMethod',
                    f'unknown method: {method}')
                return
        except (ValueError, OverflowError) as err:
            # A date that does not parse as %Y-%m-%d (or is out of range
            # for mktime) is a caller error. Without a reply the caller
            # would block until its D-Bus timeout.
            invocation.return_dbus_error(
                'org.freedesktop.DBus.Error.InvalidArgs', str(err))
            return
        except Exception as err:
            # Boundary handler: report any other failure to the caller
            # rather than leaving the call unanswered.
            log.exception('%s failed', method)
            invocation.return_dbus_error(
                'org.freedesktop.DBus.Error.Failed', str(err))
            return
        invocation.return_value(reply)

    bus.register_object_with_closures2(
        SCREENTIMED_PATH, node.interfaces[0],
        on_method_call, None, None)
    Gio.bus_own_name_on_connection(
        bus, SCREENTIMED_BUS, Gio.BusNameOwnerFlags.NONE, None, None)
    tracker._start_timer()

    def on_term():
        """Flush pending sessions and stop the main loop."""
        tracker.flush()
        loop.quit()
        return GLib.SOURCE_REMOVE

    GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGTERM, on_term)
    GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, on_term)
    log.info('tracking to %s', DB_PATH)
    loop.run()
    # Nothing touches the database once the loop has stopped.
    db.close()
    log.info('stopped')


if __name__ == '__main__':
    main()