mirror of
https://github.com/morgan9e/gnome-screentime
synced 2026-04-13 15:54:11 +09:00
429 lines
14 KiB
Python
429 lines
14 KiB
Python
#!/usr/bin/python3 -sP
|
|
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sqlite3
|
|
import time
|
|
|
|
import gi
|
|
gi.require_version('Gio', '2.0')
|
|
gi.require_version('GLib', '2.0')
|
|
from gi.repository import Gio, GLib
|
|
|
|
# Module-wide logger; main() configures its level and output format.
log = logging.getLogger('screentimed')

# The SQLite database lives in a 'screentime' directory under the user data dir.
DB_DIR = os.path.join(GLib.get_user_data_dir(), 'screentime')
DB_PATH = os.path.join(DB_DIR, 'usage.db')

# Period, in seconds, of the GLib timer that flushes buffered sessions.
FLUSH_INTERVAL = 60

# Bus name, interface and object path of the usage tracker whose signals and
# methods main() subscribes to and calls.
TRACKER_BUS = 'org.gnome.Shell'
TRACKER_IFACE = 'org.gnome.Shell.UsageTracker'
TRACKER_PATH = '/org/gnome/Shell/UsageTracker'

# Bus name and object path under which this daemon exports its own interface.
SCREENTIMED_BUS = 'org.gnome.ScreenTime'
SCREENTIMED_PATH = '/org/gnome/ScreenTime'
|
|
SCREENTIMED_IFACE_XML = '''<node>
|
|
<interface name="org.gnome.ScreenTime">
|
|
<method name="GetUsage">
|
|
<arg direction="in" name="date" type="s"/>
|
|
<arg direction="out" name="focused" type="a{su}"/>
|
|
<arg direction="out" name="running" type="a{su}"/>
|
|
</method>
|
|
<method name="GetUsageToday">
|
|
<arg direction="out" name="focused" type="a{su}"/>
|
|
<arg direction="out" name="running" type="a{su}"/>
|
|
</method>
|
|
<method name="GetUsageByHour">
|
|
<arg direction="in" name="date" type="s"/>
|
|
<arg direction="out" name="focused" type="aa{su}"/>
|
|
<arg direction="out" name="running" type="aa{su}"/>
|
|
</method>
|
|
<method name="GetTimeline">
|
|
<arg direction="in" name="date" type="s"/>
|
|
<arg direction="out" name="focused" type="a(sdd)"/>
|
|
<arg direction="out" name="running" type="a(sdd)"/>
|
|
</method>
|
|
</interface>
|
|
</node>'''
|
|
|
|
|
|
# ── database ──────────────────────────────────────────────
|
|
|
|
def open_db():
    """Open (creating if necessary) the usage database and return the connection.

    Ensures DB_DIR exists, switches the database to WAL journaling, creates
    the ``focus_sessions`` and ``running_sessions`` tables with their
    time-range indexes, and moves the rows of a legacy ``sessions`` table into
    ``focus_sessions`` when such a table is present.

    The migration is best-effort: if it fails, the partial work is rolled
    back, a warning is logged and the legacy table is left in place.

    Returns:
        An open ``sqlite3.Connection`` to DB_PATH.
    """
    os.makedirs(DB_DIR, exist_ok=True)
    db = sqlite3.connect(DB_PATH)
    db.execute('PRAGMA journal_mode=WAL')
    db.execute('''
        CREATE TABLE IF NOT EXISTS focus_sessions (
            id INTEGER PRIMARY KEY,
            app_id TEXT NOT NULL,
            start_ts REAL NOT NULL,
            end_ts REAL NOT NULL
        )
    ''')
    db.execute('''
        CREATE TABLE IF NOT EXISTS running_sessions (
            id INTEGER PRIMARY KEY,
            app_id TEXT NOT NULL,
            start_ts REAL NOT NULL,
            end_ts REAL NOT NULL
        )
    ''')
    db.execute('''
        CREATE INDEX IF NOT EXISTS idx_focus_time
        ON focus_sessions (start_ts, end_ts)
    ''')
    db.execute('''
        CREATE INDEX IF NOT EXISTS idx_running_time
        ON running_sessions (start_ts, end_ts)
    ''')
    db.commit()

    # migrate from old schema -- only attempted when the old table exists, so
    # a genuine failure is not mistaken for "nothing to migrate"
    has_legacy = db.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'sessions'"
    ).fetchone() is not None
    if has_legacy:
        try:
            db.execute('''
                INSERT INTO focus_sessions (app_id, start_ts, end_ts)
                SELECT app_id, start_ts, end_ts FROM sessions
            ''')
            db.execute('DROP TABLE sessions')
            db.commit()
            log.info('migrated old sessions table')
        except sqlite3.OperationalError:
            # Undo a copy that was not followed by the drop; otherwise the
            # same rows would be copied again on the next start.
            db.rollback()
            log.warning('could not migrate old sessions table', exc_info=True)
    return db
|
|
|
|
|
|
def _date_to_ts(date):
|
|
return time.mktime(time.strptime(date, '%Y-%m-%d'))
|
|
|
|
|
|
def _day_range(date):
|
|
start = _date_to_ts(date)
|
|
return start, start + 86400
|
|
|
|
|
|
def _query_db(db, table, date):
|
|
return db.execute(f'''
|
|
SELECT app_id, start_ts, end_ts
|
|
FROM {table}
|
|
WHERE end_ts > unixepoch(:date)
|
|
AND start_ts < unixepoch(:date, '+1 day')
|
|
''', {'date': date}).fetchall()
|
|
|
|
|
|
def _aggregate_daily(sessions, day_start, day_end):
|
|
usage = {}
|
|
for app_id, start, end in sessions:
|
|
overlap = min(end, day_end) - max(start, day_start)
|
|
if overlap > 0:
|
|
usage[app_id] = usage.get(app_id, 0) + int(overlap)
|
|
return usage
|
|
|
|
|
|
def _aggregate_hourly(sessions, day_start):
|
|
hours = [{} for _ in range(24)]
|
|
for app_id, start, end in sessions:
|
|
for h in range(24):
|
|
h_start = day_start + h * 3600
|
|
h_end = h_start + 3600
|
|
overlap = min(end, h_end) - max(start, h_start)
|
|
if overlap > 0:
|
|
hours[h][app_id] = hours[h].get(app_id, 0) + int(overlap)
|
|
return hours
|
|
|
|
|
|
def _clip_to_day(sessions, day_start, day_end):
|
|
"""Return (app_id, clipped_start, clipped_end) tuples."""
|
|
out = []
|
|
for app_id, start, end in sessions:
|
|
s = max(start, day_start)
|
|
e = min(end, day_end)
|
|
if e > s:
|
|
out.append((app_id, s, e))
|
|
return out
|
|
|
|
|
|
# ── tracker ───────────────────────────────────────────────
|
|
|
|
class Tracker:
    """Accumulate focus and running-app sessions and answer usage queries.

    A session is an ``(app_id, start_ts, end_ts)`` tuple.  Start times are
    wall-clock (``time.time()``); the length of a session is measured with
    ``time.monotonic()`` and added to the start to obtain the end.  Finished
    sessions are buffered in memory and written to the database by flush()
    and on_idle().  Nothing is timed while the tracker is idle.
    """

    def __init__(self, db):
        """Start with no focused app, no running apps and empty buffers.

        Args:
            db: open sqlite3 connection containing the ``focus_sessions`` and
                ``running_sessions`` tables.
        """
        self.db = db
        self.focus_buf = []
        self.running_buf = []

        self.current_app = ''
        self.session_start = time.time()
        self.mono_start = time.monotonic()

        self.running_apps = {}  # app_id -> (wall_start, mono_start)

        self.idle = False
        self.flush_timer_id = 0

    # ── focus ──

    def on_focus_changed(self, app_id):
        """Close the current focus session and start one for *app_id*.

        While idle no session is closed or started, but the app is still
        remembered so that the session restarted by on_active() is attributed
        to the most recently reported focus rather than a stale one.
        """
        if self.idle:
            self.current_app = app_id
            return
        self._close_focus()
        self.current_app = app_id
        self.session_start = time.time()
        self.mono_start = time.monotonic()
        log.debug('focus: %s', app_id or '(none)')

    def _close_focus(self):
        """Buffer the current focus session if it has an app and lasted >= 1 s."""
        elapsed = time.monotonic() - self.mono_start
        if not self.current_app or elapsed < 1:
            return
        end_ts = self.session_start + elapsed
        self.focus_buf.append((self.current_app, self.session_start, end_ts))

    # ── running ──

    def on_running_changed(self, app_ids):
        """Reconcile the set of running apps with *app_ids*.

        Apps that disappeared have their running session buffered (if it
        lasted at least one second); apps that appeared start a new one.
        While idle the set is still updated, but nothing is buffered: the
        stored start time then dates from the idle period, so the span up to
        now is idle time.
        """
        now_wall = time.time()
        now_mono = time.monotonic()

        current = set(app_ids)
        previous = set(self.running_apps)

        # closed apps
        for app_id in previous - current:
            wall_start, mono_start = self.running_apps.pop(app_id)
            duration = now_mono - mono_start
            if not self.idle and duration >= 1:
                self.running_buf.append(
                    (app_id, wall_start, wall_start + duration))
            log.debug('closed: %s', app_id)

        # new apps
        for app_id in current - previous:
            self.running_apps[app_id] = (now_wall, now_mono)
            log.debug('opened: %s', app_id)

    def _restart_running(self, now_wall, now_mono):
        """Give every running app a fresh session start at the given instant."""
        for app_id in self.running_apps:
            self.running_apps[app_id] = (now_wall, now_mono)

    def _close_all_running(self):
        """Buffer a session for every running app, then reopen them all now."""
        now_wall = time.time()
        now_mono = time.monotonic()
        for app_id, (wall_start, mono_start) in self.running_apps.items():
            duration = now_mono - mono_start
            if duration >= 1:
                self.running_buf.append(
                    (app_id, wall_start, wall_start + duration))
        # reopen immediately
        self._restart_running(now_wall, now_mono)

    # ── idle ──

    def on_idle(self):
        """Stop timing: close and persist open sessions and stop the timer."""
        if self.idle:
            return
        self._close_focus()
        self._close_all_running()
        self.idle = True
        self._flush_to_db()
        self._stop_timer()
        log.debug('idle (locked/blank)')

    def on_active(self):
        """Resume timing: restart every session from now and restart the timer."""
        if not self.idle:
            return
        self.idle = False
        now_wall = time.time()
        now_mono = time.monotonic()
        self.session_start = now_wall
        self.mono_start = now_mono
        # reopen running timestamps
        self._restart_running(now_wall, now_mono)
        self._start_timer()
        log.debug('active (unlocked)')

    # ── flush ──

    def flush(self):
        """Persist what has been tracked so far; a no-op while idle.

        Open sessions are closed at the current instant, written out, and
        restarted, so a long session is stored as consecutive pieces.
        """
        if self.idle:
            return
        self._close_focus()
        self._close_all_running()
        self._flush_to_db()
        self.session_start = time.time()
        self.mono_start = time.monotonic()

    def _flush_to_db(self):
        """Insert the buffered sessions, commit, and empty the buffers."""
        if not self.focus_buf and not self.running_buf:
            return
        if self.focus_buf:
            self.db.executemany(
                'INSERT INTO focus_sessions (app_id, start_ts, end_ts) VALUES (?, ?, ?)',
                self.focus_buf)
        if self.running_buf:
            self.db.executemany(
                'INSERT INTO running_sessions (app_id, start_ts, end_ts) VALUES (?, ?, ?)',
                self.running_buf)
        self.db.commit()
        log.debug('flushed %d focus + %d running',
                  len(self.focus_buf), len(self.running_buf))
        self.focus_buf.clear()
        self.running_buf.clear()

    def _start_timer(self):
        """Schedule the periodic flush unless it is already scheduled."""
        if self.flush_timer_id == 0:
            self.flush_timer_id = GLib.timeout_add_seconds(
                FLUSH_INTERVAL, self._on_timer)

    def _stop_timer(self):
        """Cancel the periodic flush if it is scheduled."""
        if self.flush_timer_id != 0:
            GLib.source_remove(self.flush_timer_id)
            self.flush_timer_id = 0

    def _on_timer(self):
        """Timer callback: flush, and return True to keep the timer running."""
        self.flush()
        return True

    # ── queries ──

    def _all_sessions(self, table, buf, date):
        """Return stored plus still-buffered sessions overlapping *date*."""
        day_start, day_end = _day_range(date)
        sessions = _query_db(self.db, table, date)
        sessions.extend(
            s for s in buf if s[2] > day_start and s[1] < day_end)
        return sessions

    def _all_focus(self, date):
        """Return all focus sessions for *date*, including the live one."""
        sessions = self._all_sessions('focus_sessions', self.focus_buf, date)
        # add live focus
        if not self.idle and self.current_app:
            elapsed = time.monotonic() - self.mono_start
            end_ts = self.session_start + elapsed
            day_start, day_end = _day_range(date)
            if end_ts > day_start and self.session_start < day_end:
                sessions.append((self.current_app, self.session_start, end_ts))
        return sessions

    def _all_running(self, date):
        """Return all running sessions for *date*, including the live ones."""
        sessions = self._all_sessions('running_sessions', self.running_buf, date)
        # add live running
        if not self.idle:
            now_mono = time.monotonic()
            day_start, day_end = _day_range(date)
            for app_id, (wall_start, mono_start) in self.running_apps.items():
                end_ts = wall_start + (now_mono - mono_start)
                if end_ts > day_start and wall_start < day_end:
                    sessions.append((app_id, wall_start, end_ts))
        return sessions

    def get_usage(self, date):
        """Return ``(focused, running)`` per-app second totals for *date*."""
        day_start, day_end = _day_range(date)
        focused = _aggregate_daily(self._all_focus(date), day_start, day_end)
        running = _aggregate_daily(self._all_running(date), day_start, day_end)
        return focused, running

    def get_usage_by_hour(self, date):
        """Return ``(focused, running)`` hourly per-app totals for *date*."""
        day_start, _ = _day_range(date)
        focused = _aggregate_hourly(self._all_focus(date), day_start)
        running = _aggregate_hourly(self._all_running(date), day_start)
        return focused, running

    def get_timeline(self, date):
        """Return ``(focused, running)`` session lists clipped to *date*."""
        day_start, day_end = _day_range(date)
        focused = _clip_to_day(self._all_focus(date), day_start, day_end)
        running = _clip_to_day(self._all_running(date), day_start, day_end)
        return focused, running
|
|
|
|
|
|
# ── main ──────────────────────────────────────────────────
|
|
|
|
def main():
    """Run the screen-time daemon until SIGTERM or SIGINT.

    Opens the database, subscribes on the session bus to the tracker's
    FocusChanged and RunningAppsChanged signals and to the screensaver's
    ActiveChanged signal, fetches the initial focus and running apps, exports
    the org.gnome.ScreenTime interface, and runs the GLib main loop.  On a
    termination signal the tracker is flushed and the loop quits; the
    database connection is closed before returning.
    """
    logging.basicConfig(
        level=logging.DEBUG if os.environ.get('SCREENTIMED_DEBUG') else logging.INFO,
        format='%(name)s: %(message)s',
    )

    db = open_db()
    tracker = Tracker(db)
    loop = GLib.MainLoop()
    bus = Gio.bus_get_sync(Gio.BusType.SESSION)

    # listen to the extension; a[5] is the signal's parameter tuple
    bus.signal_subscribe(
        TRACKER_BUS, TRACKER_IFACE, 'FocusChanged',
        TRACKER_PATH, None, Gio.DBusSignalFlags.NONE,
        lambda *a: tracker.on_focus_changed(a[5].unpack()[0]))

    bus.signal_subscribe(
        TRACKER_BUS, TRACKER_IFACE, 'RunningAppsChanged',
        TRACKER_PATH, None, Gio.DBusSignalFlags.NONE,
        lambda *a: tracker.on_running_changed(a[5].unpack()[0]))

    # pause tracking on lock/blank
    bus.signal_subscribe(
        'org.gnome.ScreenSaver', 'org.gnome.ScreenSaver', 'ActiveChanged',
        '/org/gnome/ScreenSaver', None, Gio.DBusSignalFlags.NONE,
        lambda *a: tracker.on_idle() if a[5].unpack()[0] else tracker.on_active())

    # fetch initial state
    try:
        result = bus.call_sync(
            TRACKER_BUS, TRACKER_PATH, TRACKER_IFACE, 'GetFocus',
            None, GLib.VariantType('(s)'),
            Gio.DBusCallFlags.NONE, -1, None)
        tracker.current_app = result.unpack()[0]

        result = bus.call_sync(
            TRACKER_BUS, TRACKER_PATH, TRACKER_IFACE, 'GetRunningApps',
            None, GLib.VariantType('(as)'),
            Gio.DBusCallFlags.NONE, -1, None)
        now_wall = time.time()
        now_mono = time.monotonic()
        for app_id in result.unpack()[0]:
            tracker.running_apps[app_id] = (now_wall, now_mono)

        log.debug('initial: focus=%s, running=%s',
                  tracker.current_app, list(tracker.running_apps.keys()))
    except GLib.Error:
        log.debug('extension not yet available')

    # expose analytics over d-bus
    node = Gio.DBusNodeInfo.new_for_xml(SCREENTIMED_IFACE_XML)

    def on_method_call(conn, sender, path, iface, method, params, invocation):
        """Answer one org.gnome.ScreenTime call.

        Every path ends in a reply: without one the caller would block until
        its D-Bus timeout, which is what an uncaught exception here causes.
        """
        try:
            if method == 'GetUsageToday':
                f, r = tracker.get_usage(time.strftime('%Y-%m-%d'))
                reply = GLib.Variant('(a{su}a{su})', (f, r))
            elif method == 'GetUsage':
                f, r = tracker.get_usage(params.unpack()[0])
                reply = GLib.Variant('(a{su}a{su})', (f, r))
            elif method == 'GetUsageByHour':
                f, r = tracker.get_usage_by_hour(params.unpack()[0])
                reply = GLib.Variant('(aa{su}aa{su})', (f, r))
            elif method == 'GetTimeline':
                f, r = tracker.get_timeline(params.unpack()[0])
                reply = GLib.Variant('(a(sdd)a(sdd))', (f, r))
            else:
                invocation.return_dbus_error(
                    'org.freedesktop.DBus.Error.UnknownMethod',
                    f'unknown method: {method}')
                return
        except ValueError as err:
            # a date that is not YYYY-MM-DD fails to parse with ValueError
            invocation.return_dbus_error(
                'org.freedesktop.DBus.Error.InvalidArgs',
                f'invalid argument: {err}')
            return
        except Exception as err:
            # top-level boundary of the handler: report, never leave the
            # caller without an answer
            log.exception('%s failed', method)
            invocation.return_dbus_error(
                'org.freedesktop.DBus.Error.Failed', str(err))
            return
        invocation.return_value(reply)

    bus.register_object_with_closures2(
        SCREENTIMED_PATH, node.interfaces[0],
        on_method_call, None, None)

    Gio.bus_own_name_on_connection(
        bus, SCREENTIMED_BUS, Gio.BusNameOwnerFlags.NONE, None, None)

    tracker._start_timer()

    def on_term():
        """Persist pending sessions and leave the main loop."""
        tracker.flush()
        loop.quit()

    GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGTERM, on_term)
    GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, on_term)

    log.info('tracking to %s', DB_PATH)
    try:
        loop.run()
    finally:
        db.close()
    log.info('stopped')
|
|
|
|
|
|
# Start the daemon only when this file is executed as a script.
if __name__ == '__main__':
    main()
|