From 891b1941bd29366c8511f04c6e4162dfe5c3960d Mon Sep 17 00:00:00 2001 From: Morgan Date: Wed, 18 Mar 2026 17:47:10 +0900 Subject: [PATCH] Initial Commit --- .gitignore | 1 + auth.py | 141 ++++++++++++++++++++++++++++++++++++++++++++ bridge.py | 57 ++++++++++++++++++ crypto.py | 84 ++++++++++++++++++++++++++ desktop_proxy.py | 98 ++++++++++++++++++++++++++++++ gui.py | 37 ++++++++++++ ipc.py | 82 ++++++++++++++++++++++++++ log.py | 21 +++++++ native_messaging.py | 123 ++++++++++++++++++++++++++++++++++++++ storage/__init__.py | 40 +++++++++++++ storage/pin.py | 67 +++++++++++++++++++++ storage/tpm2.py | 76 ++++++++++++++++++++++++ 12 files changed, 827 insertions(+) create mode 100644 .gitignore create mode 100644 auth.py create mode 100644 bridge.py create mode 100644 crypto.py create mode 100644 desktop_proxy.py create mode 100644 gui.py create mode 100644 ipc.py create mode 100644 log.py create mode 100644 native_messaging.py create mode 100644 storage/__init__.py create mode 100644 storage/pin.py create mode 100644 storage/tpm2.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c18dd8d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/auth.py b/auth.py new file mode 100644 index 0000000..ceff4f6 --- /dev/null +++ b/auth.py @@ -0,0 +1,141 @@ +import base64 +import hashlib +import hmac +import json +import sys +import urllib.parse +import urllib.request +import uuid + +import log +from crypto import SymmetricKey, enc_string_decrypt_bytes + + +def login(email: str, password: str, server: str) -> tuple[bytes, str]: + base = server.rstrip("/") + if "bitwarden.com" in base or "bitwarden.eu" in base: + api = base.replace("vault.", "api.") + identity = base.replace("vault.", "identity.") + else: + api, identity = base + "/api", base + "/identity" + + log.info(f"prelogin {api}/accounts/prelogin") + prelogin = _json_post(f"{api}/accounts/prelogin", {"email": email}) + kdf_type = prelogin.get("kdf", prelogin.get("Kdf", 0)) + kdf_iter = prelogin.get("kdfIterations", prelogin.get("KdfIterations", 600000)) + kdf_mem = prelogin.get("kdfMemory", prelogin.get("KdfMemory", 64)) + kdf_par = prelogin.get("kdfParallelism", prelogin.get("KdfParallelism", 4)) + kdf_name = "pbkdf2" if kdf_type == 0 else "argon2id" + log.info(f"kdf: {kdf_name} iterations={kdf_iter}") + + log.info("deriving master key...") + master_key = _derive_master_key(password, email, kdf_type, kdf_iter, kdf_mem, kdf_par) + pw_hash = base64.b64encode( + hashlib.pbkdf2_hmac("sha256", master_key, password.encode(), 1, dklen=32) + ).decode() + + form = { + "grant_type": "password", "username": email, "password": pw_hash, + "scope": "api offline_access", "client_id": "connector", + "deviceType": "8", "deviceIdentifier": str(uuid.uuid4()), "deviceName": "bw-bridge", + } + + log.info(f"token {identity}/connect/token") + token_resp = _try_login(f"{identity}/connect/token", form) + + enc_user_key = _extract_encrypted_user_key(token_resp) + log.info("decrypting user key...") + stretched = _stretch(master_key) + user_key_bytes = enc_string_decrypt_bytes(enc_user_key, stretched) + user_id = _extract_user_id(token_resp.get("access_token", "")) + log.info(f"user key decrypted ({len(user_key_bytes)}B)") + + return user_key_bytes, user_id + + +def _try_login(url: str, form: dict) -> dict: + headers = { + "Content-Type": "application/x-www-form-urlencoded; charset=utf-8", + "Accept": "application/json", "Device-Type": "8", + } + try: + return _form_post(url, form, headers) + except _HttpError as e: + if "TwoFactor" not in e.body: + log.fatal(f"login failed: {e.body[:200]}") + + body = json.loads(e.body) + providers = body.get("TwoFactorProviders2", body.get("twoFactorProviders2", {})) + + if "0" not in providers and 0 not in providers: + log.fatal("2FA required but TOTP not available for this account") + + log.info("TOTP required") + code = input("TOTP code: ").strip() + form["twoFactorToken"] = code + form["twoFactorProvider"] = "0" + return _form_post(url, form, headers) + + +def _extract_encrypted_user_key(resp: dict) -> str: + udo = resp.get("UserDecryptionOptions", resp.get("userDecryptionOptions")) + if udo: + mpu = udo.get("MasterPasswordUnlock", udo.get("masterPasswordUnlock")) + if mpu: + k = mpu.get("MasterKeyEncryptedUserKey", mpu.get("masterKeyEncryptedUserKey")) + if k: + return k + k = resp.get("Key", resp.get("key")) + if k: + return k + log.fatal("no encrypted user key in server response") + + +def _extract_user_id(token: str) -> str: + try: + payload = token.split(".")[1] + payload += "=" * (4 - len(payload) % 4) + return json.loads(base64.urlsafe_b64decode(payload)).get("sub", "unknown") + except Exception: + return "unknown" + + +def _derive_master_key(pw: str, email: str, kdf: int, iters: int, mem: int, par: int) -> bytes: + salt = email.lower().strip().encode() + if kdf == 0: + return hashlib.pbkdf2_hmac("sha256", pw.encode(), salt, iters, dklen=32) + if kdf == 1: + from argon2.low_level import hash_secret_raw, Type + return hash_secret_raw( + secret=pw.encode(), salt=salt, time_cost=iters, + memory_cost=mem * 1024, parallelism=par, hash_len=32, type=Type.ID, + ) + log.fatal(f"unsupported kdf type: {kdf}") + + +def _stretch(master_key: bytes) -> SymmetricKey: + enc = hmac.new(master_key, b"enc\x01", hashlib.sha256).digest() + mac = hmac.new(master_key, b"mac\x01", hashlib.sha256).digest() + return SymmetricKey(enc + mac) + + +class _HttpError(Exception): + def __init__(self, code: int, body: str): + self.code, self.body = code, body + super().__init__(f"HTTP {code}") + + +def _json_post(url: str, data: dict) -> dict: + req = urllib.request.Request(url, json.dumps(data).encode(), + {"Content-Type": "application/json"}) + with urllib.request.urlopen(req) as r: + return json.loads(r.read()) + + +def _form_post(url: str, form: dict, headers: dict) -> dict: + req = urllib.request.Request(url, urllib.parse.urlencode(form).encode(), headers) + try: + with urllib.request.urlopen(req) as r: + return json.loads(r.read()) + except urllib.error.HTTPError as e: + raise _HttpError(e.code, e.read().decode()) from e diff --git a/bridge.py b/bridge.py new file mode 100644 index 0000000..a5be724 --- /dev/null +++ b/bridge.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +import argparse +import getpass +import hashlib + +import log +from auth import login +from ipc import get_socket_path, serve +from native_messaging import BiometricBridge +from storage import get_backend + + +def user_hash(email: str) -> str: + return hashlib.sha256(email.lower().strip().encode()).hexdigest()[:16] + + +def main(): + p = argparse.ArgumentParser(description="Bitwarden desktop bridge agent") + p.add_argument("--email", required=True) + p.add_argument("--password") + p.add_argument("--server", default="https://vault.bitwarden.com") + p.add_argument("--backend", choices=["tpm2", "pin"]) + p.add_argument("--enroll", action="store_true", help="re-login and re-seal key") + args = p.parse_args() + + uid = user_hash(args.email) + store = get_backend(args.backend) + log.info(f"storage backend: {store.name}") + + if not store.has_key(uid) or args.enroll: + if not store.has_key(uid): + log.info(f"no sealed key found, need to enroll") + else: + log.info(f"re-enrolling (--enroll)") + + pw = args.password or getpass.getpass("master password: ") + log.info(f"logging in as {args.email} ...") + key_bytes, server_uid = login(args.email, pw, args.server) + log.info(f"authenticated, user_id={server_uid}") + + auth = getpass.getpass(f"choose {store.name} password: ") + auth2 = getpass.getpass(f"confirm: ") + if auth != auth2: + log.fatal("passwords don't match") + store.store(uid, key_bytes, auth) + log.info(f"user key sealed via {store.name}") + else: + log.info(f"sealed key ready for {args.email}") + + bridge = BiometricBridge(store, uid) + sock = get_socket_path() + log.info(f"listening on {sock}") + serve(sock, bridge) + + +if __name__ == "__main__": + main() diff --git a/crypto.py b/crypto.py new file mode 100644 index 0000000..4f0a698 --- /dev/null +++ b/crypto.py @@ -0,0 +1,84 @@ +import base64 +import hashlib +import hmac +import os + +from cryptography.hazmat.primitives import padding as sym_padding +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + + +class SymmetricKey: + def __init__(self, raw: bytes): + if len(raw) != 64: + raise ValueError(f"Expected 64 bytes, got {len(raw)}") + self.raw = raw + self.enc_key = raw[:32] + self.mac_key = raw[32:] + + def to_b64(self) -> str: + return base64.b64encode(self.raw).decode() + + @classmethod + def from_b64(cls, b64: str) -> "SymmetricKey": + return cls(base64.b64decode(b64)) + + @classmethod + def generate(cls) -> "SymmetricKey": + return cls(os.urandom(64)) + + +def enc_string_encrypt(plaintext: str, key: SymmetricKey) -> str: + iv = os.urandom(16) + cipher = Cipher(algorithms.AES(key.enc_key), modes.CBC(iv)) + padder = sym_padding.PKCS7(128).padder() + padded = padder.update(plaintext.encode()) + padder.finalize() + enc = cipher.encryptor() + ct = enc.update(padded) + enc.finalize() + mac = hmac.new(key.mac_key, iv + ct, hashlib.sha256).digest() + return f"2.{base64.b64encode(iv).decode()}|{base64.b64encode(ct).decode()}|{base64.b64encode(mac).decode()}" + + +def enc_string_decrypt(enc_str: str, key: SymmetricKey) -> str: + t, rest = enc_str.split(".", 1) + if t != "2": + raise ValueError(f"Unsupported type {t}") + parts = rest.split("|") + iv = base64.b64decode(parts[0]) + ct = base64.b64decode(parts[1]) + mac_got = base64.b64decode(parts[2]) + if not hmac.compare_digest(mac_got, hmac.new(key.mac_key, iv + ct, hashlib.sha256).digest()): + raise ValueError("MAC mismatch") + dec = Cipher(algorithms.AES(key.enc_key), modes.CBC(iv)).decryptor() + padded = dec.update(ct) + dec.finalize() + unpadder = sym_padding.PKCS7(128).unpadder() + return (unpadder.update(padded) + unpadder.finalize()).decode() + + +def enc_string_decrypt_bytes(enc_str: str, key: SymmetricKey) -> bytes: + t, rest = enc_str.split(".", 1) + parts = rest.split("|") + iv = base64.b64decode(parts[0]) + ct = base64.b64decode(parts[1]) + mac_got = base64.b64decode(parts[2]) if len(parts) > 2 else None + if mac_got and not hmac.compare_digest(mac_got, hmac.new(key.mac_key, iv + ct, hashlib.sha256).digest()): + raise ValueError("MAC mismatch") + dec = Cipher(algorithms.AES(key.enc_key), modes.CBC(iv)).decryptor() + padded = dec.update(ct) + dec.finalize() + unpadder = sym_padding.PKCS7(128).unpadder() + return unpadder.update(padded) + unpadder.finalize() + + +def enc_string_to_dict(enc_str: str) -> dict: + t, rest = enc_str.split(".", 1) + parts = rest.split("|") + d = {"encryptionType": int(t), "encryptedString": enc_str} + if len(parts) >= 1: d["iv"] = parts[0] + if len(parts) >= 2: d["data"] = parts[1] + if len(parts) >= 3: d["mac"] = parts[2] + return d + + +def dict_to_enc_string(d: dict) -> str: + if d.get("encryptedString"): + return d["encryptedString"] + return f"{d.get('encryptionType', 2)}.{d.get('iv', '')}|{d.get('data', '')}|{d.get('mac', '')}" diff --git a/desktop_proxy.py b/desktop_proxy.py new file mode 100644 index 0000000..aad3b98 --- /dev/null +++ b/desktop_proxy.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 +import os +import socket +import struct +import sys +import threading +from pathlib import Path + + +def ipc_socket_path() -> str: + return str(Path.home() / ".librewolf" / "s.bw") + + +def read_stdin_message() -> bytes | None: + header = sys.stdin.buffer.read(4) + if len(header) < 4: + return None + length = struct.unpack("=I", header)[0] + if length == 0 or length > 1024 * 1024: + return None + data = sys.stdin.buffer.read(length) + if len(data) < length: + return None + return data + + +def write_stdout_message(data: bytes): + header = struct.pack("=I", len(data)) + sys.stdout.buffer.write(header + data) + sys.stdout.buffer.flush() + + +def read_ipc_message(sock: socket.socket) -> bytes | None: + header = recv_exact(sock, 4) + if header is None: + return None + length = struct.unpack("=I", header)[0] + if length == 0 or length > 1024 * 1024: + return None + return recv_exact(sock, length) + + +def send_ipc_message(sock: socket.socket, data: bytes): + sock.sendall(struct.pack("=I", len(data)) + data) + + +def recv_exact(sock: socket.socket, n: int) -> bytes | None: + buf = b"" + while len(buf) < n: + chunk = sock.recv(n - len(buf)) + if not chunk: + return None + buf += chunk + return buf + + +def ipc_to_stdout(sock: socket.socket): + try: + while True: + msg = read_ipc_message(sock) + if msg is None: + break + write_stdout_message(msg) + except (ConnectionResetError, BrokenPipeError, OSError): + pass + + +def main(): + path = ipc_socket_path() + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + sock.connect(path) + except (FileNotFoundError, ConnectionRefusedError): + sys.exit(1) + + send_ipc_message(sock, b'{"command":"connected"}') + + reader = threading.Thread(target=ipc_to_stdout, args=(sock,), daemon=True) + reader.start() + + try: + while True: + msg = read_stdin_message() + if msg is None: + break + send_ipc_message(sock, msg) + except (BrokenPipeError, OSError): + pass + finally: + try: + send_ipc_message(sock, b'{"command":"disconnected"}') + except OSError: + pass + sock.close() + + +if __name__ == "__main__": + main() diff --git a/gui.py b/gui.py new file mode 100644 index 0000000..770d9b6 --- /dev/null +++ b/gui.py @@ -0,0 +1,37 @@ +import shutil +import subprocess +import sys + + +def ask_password(title: str = "Bitwarden", prompt: str = "Unlock password:") -> str | None: + if sys.platform == "darwin": + return _osascript(title, prompt) + if shutil.which("zenity"): + return _zenity(title, prompt) + return None + + +def _osascript(title: str, prompt: str) -> str | None: + script = ( + f'display dialog "{prompt}" with title "{title}" ' + f'default answer "" with hidden answer buttons {{"Cancel","OK"}} default button "OK"' + ) + r = subprocess.run( + ["osascript", "-e", script], + capture_output=True, text=True, + ) + if r.returncode != 0: + return None + for part in r.stdout.strip().split(","): + if "text returned:" in part: + return part.split("text returned:")[1].strip() + return None + + +def _zenity(title: str, prompt: str) -> str | None: + r = subprocess.run( + ["zenity", "--entry", "--hide-text", "--title", "", "--text", prompt, + "--width", "300", "--window-icon", "dialog-password"], + capture_output=True, text=True, + ) + return r.stdout.strip() or None if r.returncode == 0 else None diff --git a/ipc.py b/ipc.py new file mode 100644 index 0000000..a793aeb --- /dev/null +++ b/ipc.py @@ -0,0 +1,82 @@ +import json +import os +import socket +import struct +from pathlib import Path + +import log + + +def get_socket_path() -> Path: + cache = Path.home() / ".cache" / "com.bitwarden.desktop" + cache.mkdir(parents=True, exist_ok=True) + return cache / "s.bw" + + +def recv_exact(conn: socket.socket, n: int) -> bytes | None: + buf = b"" + while len(buf) < n: + chunk = conn.recv(n - len(buf)) + if not chunk: + return None + buf += chunk + return buf + + +def read_message(conn: socket.socket) -> dict | None: + header = recv_exact(conn, 4) + if not header: + return None + length = struct.unpack("=I", header)[0] + if length == 0 or length > 1024 * 1024: + return None + data = recv_exact(conn, length) + if not data: + return None + return json.loads(data) + + +def send_message(conn: socket.socket, msg: dict): + data = json.dumps(msg).encode() + conn.sendall(struct.pack("=I", len(data)) + data) + + +def serve(sock_path: Path, handler): + if sock_path.exists(): + sock_path.unlink() + + srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + srv.bind(str(sock_path)) + srv.listen(5) + os.chmod(str(sock_path), 0o600) + + try: + while True: + conn, _ = srv.accept() + log.info("client connected") + _handle_conn(conn, handler) + log.info("client disconnected") + except KeyboardInterrupt: + log.info("shutting down") + finally: + srv.close() + if sock_path.exists(): + sock_path.unlink() + + +def _handle_conn(conn: socket.socket, handler): + try: + while True: + msg = read_message(conn) + if msg is None: + break + if "command" in msg and "appId" not in msg: + log.info(f"proxy: {msg.get('command')}") + continue + resp = handler(msg) + if resp is not None: + send_message(conn, resp) + except (ConnectionResetError, BrokenPipeError): + pass + finally: + conn.close() diff --git a/log.py b/log.py new file mode 100644 index 0000000..be627a2 --- /dev/null +++ b/log.py @@ -0,0 +1,21 @@ +import sys +import time + +_start = time.monotonic() + +def _ts() -> str: + elapsed = time.monotonic() - _start + return f"{elapsed:8.3f}" + +def info(msg: str): + print(f"[{_ts()}] {msg}", file=sys.stderr, flush=True) + +def warn(msg: str): + print(f"[{_ts()}] WARN {msg}", file=sys.stderr, flush=True) + +def error(msg: str): + print(f"[{_ts()}] ERROR {msg}", file=sys.stderr, flush=True) + +def fatal(msg: str): + error(msg) + sys.exit(1) diff --git a/native_messaging.py b/native_messaging.py new file mode 100644 index 0000000..f050427 --- /dev/null +++ b/native_messaging.py @@ -0,0 +1,123 @@ +import base64 +import json +import time + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding as asym_padding + +import log +from crypto import SymmetricKey, enc_string_encrypt, enc_string_decrypt, enc_string_to_dict, dict_to_enc_string +from gui import ask_password +from storage import KeyStore + + +class BiometricBridge: + def __init__(self, store: KeyStore, user_id: str): + self._store = store + self._uid = user_id + self._sessions: dict[str, SymmetricKey] = {} + + def __call__(self, msg: dict) -> dict | None: + app_id = msg.get("appId", "") + message = msg.get("message") + if message is None: + return None + + if isinstance(message, dict) and message.get("command") == "setupEncryption": + return self._handshake(app_id, message) + + if isinstance(message, dict) and ("encryptedString" in message or "encryptionType" in message): + return self._encrypted(app_id, message) + + if isinstance(message, str) and message.startswith("2."): + return self._encrypted(app_id, {"encryptedString": message}) + + return None + + def _handshake(self, app_id: str, msg: dict) -> dict: + pub_bytes = base64.b64decode(msg.get("publicKey", "")) + pub_key = serialization.load_der_public_key(pub_bytes) + + shared = SymmetricKey.generate() + self._sessions[app_id] = shared + + encrypted = pub_key.encrypt( + shared.raw, + asym_padding.OAEP( + mgf=asym_padding.MGF1(algorithm=hashes.SHA1()), + algorithm=hashes.SHA1(), label=None, + ), + ) + + log.info(f"handshake complete, app={app_id[:12]}") + + return { + "appId": app_id, + "command": "setupEncryption", + "messageId": -1, + "sharedSecret": base64.b64encode(encrypted).decode(), + } + + def _encrypted(self, app_id: str, enc_msg: dict) -> dict | None: + if app_id not in self._sessions: + log.warn(f"no session for app={app_id[:12]}") + return {"appId": app_id, "command": "invalidateEncryption"} + + key = self._sessions[app_id] + try: + plaintext = enc_string_decrypt(dict_to_enc_string(enc_msg), key) + except Exception: + log.error("message decryption failed") + return {"appId": app_id, "command": "invalidateEncryption"} + + data = json.loads(plaintext) + cmd = data.get("command", "") + mid = data.get("messageId", 0) + + log.info(f"<- {cmd} (msg={mid})") + resp = self._dispatch(cmd, mid) + if resp is None: + log.warn(f"unhandled command: {cmd}") + return None + + encrypted = enc_string_encrypt(json.dumps(resp), key) + return {"appId": app_id, "messageId": mid, "message": enc_string_to_dict(encrypted)} + + def _dispatch(self, cmd: str, mid: int) -> dict | None: + ts = int(time.time() * 1000) + + if cmd == "unlockWithBiometricsForUser": + key_b64 = self._unseal_key() + if key_b64 is None: + log.warn("unlock denied or failed") + return {"command": cmd, "messageId": mid, "timestamp": ts, "response": False} + log.info("-> unlock granted, key delivered") + return {"command": cmd, "messageId": mid, "timestamp": ts, "response": True, "userKeyB64": key_b64} + + if cmd in ("getBiometricsStatus", "getBiometricsStatusForUser"): + log.info(f"-> biometrics available") + return {"command": cmd, "messageId": mid, "timestamp": ts, "response": 0} + + if cmd == "authenticateWithBiometrics": + log.info("-> authenticated") + return {"command": cmd, "messageId": mid, "timestamp": ts, "response": True} + + return None + + def _unseal_key(self) -> str | None: + log.info(f"requesting {self._store.name} password via GUI") + pw = ask_password("Bitwarden Unlock", f"Enter {self._store.name} password:") + if pw is None: + log.info("user cancelled dialog") + return None + try: + raw = self._store.load(self._uid, pw) + b64 = base64.b64encode(raw).decode() + log.info(f"unsealed {len(raw)}B key from {self._store.name}") + raw = None + pw = None + log.info("key material wiped from memory") + return b64 + except Exception as e: + log.error(f"unseal failed: {e}") + return None diff --git a/storage/__init__.py b/storage/__init__.py new file mode 100644 index 0000000..c174869 --- /dev/null +++ b/storage/__init__.py @@ -0,0 +1,40 @@ +from abc import ABC, abstractmethod + + +class KeyStore(ABC): + @abstractmethod + def is_available(self) -> bool: ... + + @abstractmethod + def has_key(self, user_id: str) -> bool: ... + + @abstractmethod + def store(self, user_id: str, data: bytes, auth: str) -> None: ... + + @abstractmethod + def load(self, user_id: str, auth: str) -> bytes: ... + + @abstractmethod + def remove(self, user_id: str) -> None: ... + + @property + @abstractmethod + def name(self) -> str: ... + + +def get_backend(preferred: str | None = None) -> KeyStore: + from .tpm2 import TPM2KeyStore + from .pin import PinKeyStore + + if preferred == "pin": + return PinKeyStore() + if preferred == "tpm2": + store = TPM2KeyStore() + if not store.is_available(): + raise RuntimeError("TPM2 not available") + return store + + tpm = TPM2KeyStore() + if tpm.is_available(): + return tpm + return PinKeyStore() diff --git a/storage/pin.py b/storage/pin.py new file mode 100644 index 0000000..ae0aaeb --- /dev/null +++ b/storage/pin.py @@ -0,0 +1,67 @@ +import base64 +import hashlib +import hmac +import os +from pathlib import Path + +from cryptography.hazmat.primitives import padding +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from . import KeyStore + +STORE_DIR = Path.home() / ".cache" / "com.bitwarden.desktop" / "keys" + + +class PinKeyStore(KeyStore): + def __init__(self, store_dir: Path = STORE_DIR): + self._dir = store_dir + self._dir.mkdir(parents=True, exist_ok=True) + os.chmod(str(self._dir), 0o700) + + @property + def name(self) -> str: + return "pin" + + def _path(self, uid: str) -> Path: + return self._dir / f"{uid}.enc" + + def is_available(self) -> bool: + return True + + def has_key(self, uid: str) -> bool: + return self._path(uid).exists() + + def store(self, uid: str, data: bytes, auth: str): + salt = os.urandom(32) + enc_key, mac_key = _derive(auth, salt) + iv = os.urandom(16) + padder = padding.PKCS7(128).padder() + padded = padder.update(data) + padder.finalize() + ct = Cipher(algorithms.AES(enc_key), modes.CBC(iv)).encryptor() + encrypted = ct.update(padded) + ct.finalize() + mac = hmac.new(mac_key, salt + iv + encrypted, hashlib.sha256).digest() + blob = salt + iv + mac + encrypted + self._path(uid).write_bytes(blob) + os.chmod(str(self._path(uid)), 0o600) + + def load(self, uid: str, auth: str) -> bytes: + blob = self._path(uid).read_bytes() + salt, iv, mac_stored, ct = blob[:32], blob[32:48], blob[48:80], blob[80:] + enc_key, mac_key = _derive(auth, salt) + mac_check = hmac.new(mac_key, salt + iv + ct, hashlib.sha256).digest() + if not hmac.compare_digest(mac_stored, mac_check): + raise ValueError("Wrong PIN or corrupted data") + dec = Cipher(algorithms.AES(enc_key), modes.CBC(iv)).decryptor() + padded = dec.update(ct) + dec.finalize() + unpadder = padding.PKCS7(128).unpadder() + return unpadder.update(padded) + unpadder.finalize() + + def remove(self, uid: str): + p = self._path(uid) + if p.exists(): + p.unlink() + + +def _derive(password: str, salt: bytes) -> tuple[bytes, bytes]: + raw = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 600_000, dklen=64) + return raw[:32], raw[32:] diff --git a/storage/tpm2.py b/storage/tpm2.py new file mode 100644 index 0000000..e2b236b --- /dev/null +++ b/storage/tpm2.py @@ -0,0 +1,76 @@ +import os +import subprocess +import tempfile +from pathlib import Path + +from . import KeyStore + +STORE_DIR = Path.home() / ".cache" / "com.bitwarden.desktop" / "tpm2" + + +class TPM2KeyStore(KeyStore): + def __init__(self, store_dir: Path = STORE_DIR): + self._dir = store_dir + self._dir.mkdir(parents=True, exist_ok=True) + os.chmod(str(self._dir), 0o700) + + @property + def name(self) -> str: + return "tpm2" + + def _pub(self, uid: str) -> Path: + return self._dir / f"{uid}.pub" + + def _priv(self, uid: str) -> Path: + return self._dir / f"{uid}.priv" + + def is_available(self) -> bool: + try: + return subprocess.run( + ["tpm2_getcap", "properties-fixed"], + capture_output=True, timeout=5, + ).returncode == 0 + except (FileNotFoundError, subprocess.TimeoutExpired): + return False + + def has_key(self, uid: str) -> bool: + return self._pub(uid).exists() and self._priv(uid).exists() + + def store(self, uid: str, data: bytes, auth: str): + with tempfile.TemporaryDirectory() as tmp: + t = Path(tmp) + ctx = t / "primary.ctx" + dat = t / "data.bin" + dat.write_bytes(data) + os.chmod(str(dat), 0o600) + + _run(["tpm2_createprimary", "-C", "o", "-g", "sha256", "-G", "aes256cfb", "-c", str(ctx)]) + _run(["tpm2_create", "-C", str(ctx), "-i", str(dat), + "-u", str(self._pub(uid)), "-r", str(self._priv(uid)), "-p", auth]) + dat.write_bytes(b"\x00" * len(data)) + + os.chmod(str(self._pub(uid)), 0o600) + os.chmod(str(self._priv(uid)), 0o600) + + def load(self, uid: str, auth: str) -> bytes: + with tempfile.TemporaryDirectory() as tmp: + t = Path(tmp) + ctx = t / "primary.ctx" + loaded = t / "loaded.ctx" + + _run(["tpm2_createprimary", "-C", "o", "-g", "sha256", "-G", "aes256cfb", "-c", str(ctx)]) + _run(["tpm2_load", "-C", str(ctx), + "-u", str(self._pub(uid)), "-r", str(self._priv(uid)), "-c", str(loaded)]) + return _run(["tpm2_unseal", "-c", str(loaded), "-p", auth]).stdout + + def remove(self, uid: str): + for p in (self._pub(uid), self._priv(uid)): + if p.exists(): + p.unlink() + + +def _run(cmd: list[str]) -> subprocess.CompletedProcess: + r = subprocess.run(cmd, capture_output=True, timeout=30) + if r.returncode != 0: + raise RuntimeError(r.stderr.decode(errors="replace").strip()) + return r