diff --git a/.gitignore b/.gitignore index c18dd8d..ce2920b 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ __pycache__/ +sep_helper diff --git a/askpass.py b/askpass.py new file mode 100644 index 0000000..0526bad --- /dev/null +++ b/askpass.py @@ -0,0 +1,100 @@ +import getpass +import os +import shutil +import subprocess +import sys +from typing import Callable + +Prompter = Callable[[str], str | None] + + +def _cli() -> Prompter: + def prompt(msg: str) -> str | None: + try: + return getpass.getpass(msg + " ") + except (EOFError, KeyboardInterrupt): + return None + return prompt + + +def _osascript() -> Prompter: + def prompt(msg: str) -> str | None: + script = ( + f'display dialog "{msg}" with title "Bitwarden" ' + f'default answer "" with hidden answer buttons {{"Cancel","OK"}} default button "OK"' + ) + r = subprocess.run(["osascript", "-e", script], capture_output=True, text=True) + if r.returncode != 0: + return None + for part in r.stdout.strip().split(","): + if "text returned:" in part: + return part.split("text returned:")[1].strip() + return None + return prompt + + +def _zenity() -> Prompter: + def prompt(msg: str) -> str | None: + r = subprocess.run( + ["zenity", "--entry", "--hide-text", "--title", "", + "--text", msg, "--width", "300", "--window-icon", "dialog-password"], + capture_output=True, text=True, + ) + return r.stdout.strip() or None if r.returncode == 0 else None + return prompt + + +def _kdialog() -> Prompter: + def prompt(msg: str) -> str | None: + r = subprocess.run( + ["kdialog", "--password", msg, "--title", "Bitwarden"], + capture_output=True, text=True, + ) + return r.stdout.strip() or None if r.returncode == 0 else None + return prompt + + +def _ssh_askpass() -> Prompter: + binary = os.environ.get("SSH_ASKPASS") or shutil.which("ssh-askpass") + if not binary: + raise RuntimeError("SSH_ASKPASS not set and ssh-askpass not found") + + def prompt(msg: str) -> str | None: + r = subprocess.run([binary, msg], capture_output=True, text=True) + return r.stdout.strip() or None if r.returncode == 0 else None + return prompt + + +PROVIDERS = { + "cli": _cli, + "osascript": _osascript, + "zenity": _zenity, + "kdialog": _kdialog, + "ssh-askpass": _ssh_askpass, +} + + +def get_prompter(name: str | None = None) -> Prompter: + if name: + if name not in PROVIDERS: + raise ValueError(f"unknown provider: {name} (available: {', '.join(PROVIDERS)})") + return PROVIDERS[name]() + + if sys.platform == "darwin": + return _osascript() + for gui in ("zenity", "kdialog"): + if shutil.which(gui): + return PROVIDERS[gui]() + return _cli() + + +def available() -> list[str]: + found = ["cli"] + if sys.platform == "darwin": + found.append("osascript") + for name in ("zenity", "kdialog"): + if shutil.which(name): + found.append(name) + if shutil.which("ssh-askpass") or "SSH_ASKPASS" in os.environ: + found.append("ssh-askpass") + return found diff --git a/auth.py b/auth.py index ceff4f6..6f347d9 100644 --- a/auth.py +++ b/auth.py @@ -2,16 +2,17 @@ import base64 import hashlib import hmac import json -import sys import urllib.parse import urllib.request import uuid import log +from askpass import Prompter from crypto import SymmetricKey, enc_string_decrypt_bytes +from secmem import wipe -def login(email: str, password: str, server: str) -> tuple[bytes, str]: +def login(email: str, password: str, server: str, prompt: Prompter) -> tuple[bytearray, str]: base = server.rstrip("/") if "bitwarden.com" in base or "bitwarden.eu" in base: api = base.replace("vault.", "api.") @@ -21,17 +22,16 @@ def login(email: str, password: str, server: str) -> tuple[bytes, str]: log.info(f"prelogin {api}/accounts/prelogin") prelogin = _json_post(f"{api}/accounts/prelogin", {"email": email}) - kdf_type = prelogin.get("kdf", prelogin.get("Kdf", 0)) - kdf_iter = prelogin.get("kdfIterations", prelogin.get("KdfIterations", 600000)) - kdf_mem = prelogin.get("kdfMemory", prelogin.get("KdfMemory", 64)) - kdf_par = prelogin.get("kdfParallelism", prelogin.get("KdfParallelism", 4)) - kdf_name = "pbkdf2" if kdf_type == 0 else "argon2id" - log.info(f"kdf: {kdf_name} iterations={kdf_iter}") + kdf_type = _get(prelogin, "kdf", 0) + kdf_iter = _get(prelogin, "kdfIterations", 600000) + kdf_mem = _get(prelogin, "kdfMemory", 64) + kdf_par = _get(prelogin, "kdfParallelism", 4) + log.info(f"kdf: {'pbkdf2' if kdf_type == 0 else 'argon2id'} iterations={kdf_iter}") log.info("deriving master key...") - master_key = _derive_master_key(password, email, kdf_type, kdf_iter, kdf_mem, kdf_par) + master_key = bytearray(_derive_master_key(password, email, kdf_type, kdf_iter, kdf_mem, kdf_par)) pw_hash = base64.b64encode( - hashlib.pbkdf2_hmac("sha256", master_key, password.encode(), 1, dklen=32) + hashlib.pbkdf2_hmac("sha256", bytes(master_key), password.encode(), 1, dklen=32) ).decode() form = { @@ -41,19 +41,25 @@ def login(email: str, password: str, server: str) -> tuple[bytes, str]: } log.info(f"token {identity}/connect/token") - token_resp = _try_login(f"{identity}/connect/token", form) + token_resp = _try_login(f"{identity}/connect/token", form, prompt) enc_user_key = _extract_encrypted_user_key(token_resp) log.info("decrypting user key...") stretched = _stretch(master_key) - user_key_bytes = enc_string_decrypt_bytes(enc_user_key, stretched) + wipe(master_key) + user_key = enc_string_decrypt_bytes(enc_user_key, stretched) + stretched.close() user_id = _extract_user_id(token_resp.get("access_token", "")) - log.info(f"user key decrypted ({len(user_key_bytes)}B)") + log.info(f"user key decrypted ({len(user_key)}B)") - return user_key_bytes, user_id + return user_key, user_id -def _try_login(url: str, form: dict) -> dict: +def _get(d: dict, key: str, default=None): + return d.get(key, d.get(key[0].upper() + key[1:], default)) + + +def _try_login(url: str, form: dict, prompt: Prompter) -> dict: headers = { "Content-Type": "application/x-www-form-urlencoded; charset=utf-8", "Accept": "application/json", "Device-Type": "8", @@ -65,27 +71,29 @@ def _try_login(url: str, form: dict) -> dict: log.fatal(f"login failed: {e.body[:200]}") body = json.loads(e.body) - providers = body.get("TwoFactorProviders2", body.get("twoFactorProviders2", {})) + providers = _get(body, "twoFactorProviders2", {}) if "0" not in providers and 0 not in providers: - log.fatal("2FA required but TOTP not available for this account") + log.fatal("2FA required but TOTP not available") log.info("TOTP required") - code = input("TOTP code: ").strip() - form["twoFactorToken"] = code + code = prompt("TOTP code:") + if code is None: + log.fatal("no TOTP code provided") + form["twoFactorToken"] = code.strip() form["twoFactorProvider"] = "0" return _form_post(url, form, headers) def _extract_encrypted_user_key(resp: dict) -> str: - udo = resp.get("UserDecryptionOptions", resp.get("userDecryptionOptions")) + udo = _get(resp, "userDecryptionOptions") if udo: - mpu = udo.get("MasterPasswordUnlock", udo.get("masterPasswordUnlock")) + mpu = _get(udo, "masterPasswordUnlock") if mpu: - k = mpu.get("MasterKeyEncryptedUserKey", mpu.get("masterKeyEncryptedUserKey")) + k = _get(mpu, "masterKeyEncryptedUserKey") if k: return k - k = resp.get("Key", resp.get("key")) + k = _get(resp, "key") if k: return k log.fatal("no encrypted user key in server response") @@ -113,10 +121,10 @@ def _derive_master_key(pw: str, email: str, kdf: int, iters: int, mem: int, par: log.fatal(f"unsupported kdf type: {kdf}") -def _stretch(master_key: bytes) -> SymmetricKey: - enc = hmac.new(master_key, b"enc\x01", hashlib.sha256).digest() - mac = hmac.new(master_key, b"mac\x01", hashlib.sha256).digest() - return SymmetricKey(enc + mac) +def _stretch(master_key: bytearray) -> SymmetricKey: + enc = hmac.new(bytes(master_key), b"enc\x01", hashlib.sha256).digest() + mac = hmac.new(bytes(master_key), b"mac\x01", hashlib.sha256).digest() + return SymmetricKey(bytearray(enc + mac)) class _HttpError(Exception): diff --git a/bridge.py b/bridge.py index a5be724..7b4c18f 100644 --- a/bridge.py +++ b/bridge.py @@ -1,12 +1,13 @@ #!/usr/bin/env python3 import argparse -import getpass import hashlib import log +from askpass import get_prompter, available from auth import login from ipc import get_socket_path, serve from native_messaging import BiometricBridge +from secmem import wipe from storage import get_backend @@ -20,34 +21,49 @@ def main(): p.add_argument("--password") p.add_argument("--server", default="https://vault.bitwarden.com") p.add_argument("--backend", choices=["tpm2", "pin"]) - p.add_argument("--enroll", action="store_true", help="re-login and re-seal key") + p.add_argument("--askpass", choices=available()) + p.add_argument("--enroll", action="store_true") + p.add_argument("--remove", action="store_true") args = p.parse_args() uid = user_hash(args.email) store = get_backend(args.backend) - log.info(f"storage backend: {store.name}") + prompt = get_prompter(args.askpass) + log.info(f"backend: {store.name}") + + if args.remove: + if store.has_key(uid): + store.remove(uid) + log.info(f"key removed for {args.email}") + else: + log.info(f"no key found for {args.email}") + return if not store.has_key(uid) or args.enroll: - if not store.has_key(uid): - log.info(f"no sealed key found, need to enroll") - else: - log.info(f"re-enrolling (--enroll)") + log.info("enrolling" if not store.has_key(uid) else "re-enrolling") + pw = args.password or prompt("master password:") + if pw is None: + log.fatal("no password provided") + log.info(f"logging in as {args.email}") + key_bytes, server_uid = login(args.email, pw, args.server, prompt) + pw = None + log.info(f"authenticated, uid={server_uid}") - pw = args.password or getpass.getpass("master password: ") - log.info(f"logging in as {args.email} ...") - key_bytes, server_uid = login(args.email, pw, args.server) - log.info(f"authenticated, user_id={server_uid}") - - auth = getpass.getpass(f"choose {store.name} password: ") - auth2 = getpass.getpass(f"confirm: ") + auth = prompt(f"choose {store.name} password:") + if auth is None: + log.fatal("no password provided") + auth2 = prompt(f"confirm {store.name} password:") if auth != auth2: log.fatal("passwords don't match") - store.store(uid, key_bytes, auth) - log.info(f"user key sealed via {store.name}") + store.store(uid, bytes(key_bytes), auth) + wipe(key_bytes) + auth = None + auth2 = None + log.info(f"key sealed via {store.name}") else: - log.info(f"sealed key ready for {args.email}") + log.info(f"key ready for {args.email}") - bridge = BiometricBridge(store, uid) + bridge = BiometricBridge(store, uid, prompt) sock = get_socket_path() log.info(f"listening on {sock}") serve(sock, bridge) diff --git a/bw-agent b/bw-agent new file mode 100755 index 0000000..daaac63 --- /dev/null +++ b/bw-agent @@ -0,0 +1,8 @@ +#!/usr/bin/env python3 +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from bridge import main + +main() diff --git a/com.8bit.bitwarden.json b/com.8bit.bitwarden.json new file mode 100644 index 0000000..a5d4f4c --- /dev/null +++ b/com.8bit.bitwarden.json @@ -0,0 +1,7 @@ +{ + "name": "com.8bit.bitwarden", + "description": "Bitwarden desktop <-> browser bridge", + "path": "/Users/morgan/Projects/bitwarden-client/desktop-agent/desktop_proxy.py", + "type": "stdio", + "allowed_extensions": ["{446900e4-71c2-419f-a6a7-df9c091e268b}"] +} diff --git a/crypto.py b/crypto.py index 4f0a698..3dba726 100644 --- a/crypto.py +++ b/crypto.py @@ -6,17 +6,34 @@ import os from cryptography.hazmat.primitives import padding as sym_padding from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from secmem import SecureBuffer, wipe + class SymmetricKey: - def __init__(self, raw: bytes): + def __init__(self, raw: bytes | bytearray): if len(raw) != 64: - raise ValueError(f"Expected 64 bytes, got {len(raw)}") - self.raw = raw - self.enc_key = raw[:32] - self.mac_key = raw[32:] + raise ValueError(f"expected 64 bytes, got {len(raw)}") + self._secure = SecureBuffer(raw) + if isinstance(raw, bytearray): + wipe(raw) + + @property + def raw(self) -> bytearray: + return self._secure.raw + + @property + def enc_key(self) -> bytearray: + return self._secure.raw[:32] + + @property + def mac_key(self) -> bytearray: + return self._secure.raw[32:] def to_b64(self) -> str: - return base64.b64encode(self.raw).decode() + return base64.b64encode(bytes(self._secure)).decode() + + def close(self): + self._secure.close() @classmethod def from_b64(cls, b64: str) -> "SymmetricKey": @@ -27,58 +44,61 @@ class SymmetricKey: return cls(os.urandom(64)) +def _decrypt_raw(enc_str: str, key: SymmetricKey) -> bytearray: + t, rest = enc_str.split(".", 1) + parts = rest.split("|") + iv = base64.b64decode(parts[0]) + ct = base64.b64decode(parts[1]) + if len(parts) > 2: + mac_got = base64.b64decode(parts[2]) + expected = hmac.new(bytes(key.mac_key), iv + ct, hashlib.sha256).digest() + if not hmac.compare_digest(mac_got, expected): + raise ValueError("MAC mismatch") + dec = Cipher(algorithms.AES(bytes(key.enc_key)), modes.CBC(iv)).decryptor() + padded = dec.update(ct) + dec.finalize() + unpadder = sym_padding.PKCS7(128).unpadder() + return bytearray(unpadder.update(padded) + unpadder.finalize()) + + def enc_string_encrypt(plaintext: str, key: SymmetricKey) -> str: iv = os.urandom(16) - cipher = Cipher(algorithms.AES(key.enc_key), modes.CBC(iv)) padder = sym_padding.PKCS7(128).padder() padded = padder.update(plaintext.encode()) + padder.finalize() - enc = cipher.encryptor() - ct = enc.update(padded) + enc.finalize() - mac = hmac.new(key.mac_key, iv + ct, hashlib.sha256).digest() - return f"2.{base64.b64encode(iv).decode()}|{base64.b64encode(ct).decode()}|{base64.b64encode(mac).decode()}" + ct = Cipher(algorithms.AES(bytes(key.enc_key)), modes.CBC(iv)).encryptor() + encrypted = ct.update(padded) + ct.finalize() + mac = hmac.new(bytes(key.mac_key), iv + encrypted, hashlib.sha256).digest() + iv_b64 = base64.b64encode(iv).decode() + ct_b64 = base64.b64encode(encrypted).decode() + mac_b64 = base64.b64encode(mac).decode() + return f"2.{iv_b64}|{ct_b64}|{mac_b64}" def enc_string_decrypt(enc_str: str, key: SymmetricKey) -> str: - t, rest = enc_str.split(".", 1) - if t != "2": - raise ValueError(f"Unsupported type {t}") - parts = rest.split("|") - iv = base64.b64decode(parts[0]) - ct = base64.b64decode(parts[1]) - mac_got = base64.b64decode(parts[2]) - if not hmac.compare_digest(mac_got, hmac.new(key.mac_key, iv + ct, hashlib.sha256).digest()): - raise ValueError("MAC mismatch") - dec = Cipher(algorithms.AES(key.enc_key), modes.CBC(iv)).decryptor() - padded = dec.update(ct) + dec.finalize() - unpadder = sym_padding.PKCS7(128).unpadder() - return (unpadder.update(padded) + unpadder.finalize()).decode() + raw = _decrypt_raw(enc_str, key) + result = raw.decode() + wipe(raw) + return result -def enc_string_decrypt_bytes(enc_str: str, key: SymmetricKey) -> bytes: - t, rest = enc_str.split(".", 1) - parts = rest.split("|") - iv = base64.b64decode(parts[0]) - ct = base64.b64decode(parts[1]) - mac_got = base64.b64decode(parts[2]) if len(parts) > 2 else None - if mac_got and not hmac.compare_digest(mac_got, hmac.new(key.mac_key, iv + ct, hashlib.sha256).digest()): - raise ValueError("MAC mismatch") - dec = Cipher(algorithms.AES(key.enc_key), modes.CBC(iv)).decryptor() - padded = dec.update(ct) + dec.finalize() - unpadder = sym_padding.PKCS7(128).unpadder() - return unpadder.update(padded) + unpadder.finalize() +def enc_string_decrypt_bytes(enc_str: str, key: SymmetricKey) -> bytearray: + return _decrypt_raw(enc_str, key) def enc_string_to_dict(enc_str: str) -> dict: t, rest = enc_str.split(".", 1) parts = rest.split("|") d = {"encryptionType": int(t), "encryptedString": enc_str} - if len(parts) >= 1: d["iv"] = parts[0] - if len(parts) >= 2: d["data"] = parts[1] - if len(parts) >= 3: d["mac"] = parts[2] + if len(parts) >= 1: + d["iv"] = parts[0] + if len(parts) >= 2: + d["data"] = parts[1] + if len(parts) >= 3: + d["mac"] = parts[2] return d def dict_to_enc_string(d: dict) -> str: - if d.get("encryptedString"): - return d["encryptedString"] - return f"{d.get('encryptionType', 2)}.{d.get('iv', '')}|{d.get('data', '')}|{d.get('mac', '')}" + if s := d.get("encryptedString"): + return s + t = d.get("encryptionType", 2) + return f"{t}.{d.get('iv', '')}|{d.get('data', '')}|{d.get('mac', '')}" diff --git a/desktop_proxy.py b/desktop_proxy.py old mode 100644 new mode 100755 index aad3b98..a5a8bfd --- a/desktop_proxy.py +++ b/desktop_proxy.py @@ -1,47 +1,15 @@ -#!/usr/bin/env python3 -import os +#!/opt/homebrew/bin/python3 import socket import struct import sys import threading from pathlib import Path +MAX_MSG = 1024 * 1024 + def ipc_socket_path() -> str: - return str(Path.home() / ".librewolf" / "s.bw") - - -def read_stdin_message() -> bytes | None: - header = sys.stdin.buffer.read(4) - if len(header) < 4: - return None - length = struct.unpack("=I", header)[0] - if length == 0 or length > 1024 * 1024: - return None - data = sys.stdin.buffer.read(length) - if len(data) < length: - return None - return data - - -def write_stdout_message(data: bytes): - header = struct.pack("=I", len(data)) - sys.stdout.buffer.write(header + data) - sys.stdout.buffer.flush() - - -def read_ipc_message(sock: socket.socket) -> bytes | None: - header = recv_exact(sock, 4) - if header is None: - return None - length = struct.unpack("=I", header)[0] - if length == 0 or length > 1024 * 1024: - return None - return recv_exact(sock, length) - - -def send_ipc_message(sock: socket.socket, data: bytes): - sock.sendall(struct.pack("=I", len(data)) + data) + return str(Path.home() / ".cache" / "com.bitwarden.desktop" / "s.bw") def recv_exact(sock: socket.socket, n: int) -> bytes | None: @@ -54,41 +22,68 @@ def recv_exact(sock: socket.socket, n: int) -> bytes | None: return buf +def read_stdin() -> bytes | None: + header = sys.stdin.buffer.read(4) + if len(header) < 4: + return None + length = struct.unpack("=I", header)[0] + if length == 0 or length > MAX_MSG: + return None + data = sys.stdin.buffer.read(length) + return data if len(data) == length else None + + +def write_stdout(data: bytes): + sys.stdout.buffer.write(struct.pack("=I", len(data)) + data) + sys.stdout.buffer.flush() + + +def read_ipc(sock: socket.socket) -> bytes | None: + header = recv_exact(sock, 4) + if header is None: + return None + length = struct.unpack("=I", header)[0] + if length == 0 or length > MAX_MSG: + return None + return recv_exact(sock, length) + + +def send_ipc(sock: socket.socket, data: bytes): + sock.sendall(struct.pack("=I", len(data)) + data) + + def ipc_to_stdout(sock: socket.socket): try: while True: - msg = read_ipc_message(sock) + msg = read_ipc(sock) if msg is None: break - write_stdout_message(msg) + write_stdout(msg) except (ConnectionResetError, BrokenPipeError, OSError): pass def main(): - path = ipc_socket_path() sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: - sock.connect(path) + sock.connect(ipc_socket_path()) except (FileNotFoundError, ConnectionRefusedError): sys.exit(1) - send_ipc_message(sock, b'{"command":"connected"}') - - reader = threading.Thread(target=ipc_to_stdout, args=(sock,), daemon=True) - reader.start() + send_ipc(sock, b'{"command":"connected"}') + threading.Thread(target=ipc_to_stdout, args=(sock,), daemon=True).start() try: while True: - msg = read_stdin_message() + msg = read_stdin() if msg is None: break - send_ipc_message(sock, msg) + send_ipc(sock, msg) except (BrokenPipeError, OSError): pass finally: try: - send_ipc_message(sock, b'{"command":"disconnected"}') + send_ipc(sock, b'{"command":"disconnected"}') except OSError: pass sock.close() diff --git a/gui.py b/gui.py deleted file mode 100644 index 770d9b6..0000000 --- a/gui.py +++ /dev/null @@ -1,37 +0,0 @@ -import shutil -import subprocess -import sys - - -def ask_password(title: str = "Bitwarden", prompt: str = "Unlock password:") -> str | None: - if sys.platform == "darwin": - return _osascript(title, prompt) - if shutil.which("zenity"): - return _zenity(title, prompt) - return None - - -def _osascript(title: str, prompt: str) -> str | None: - script = ( - f'display dialog "{prompt}" with title "{title}" ' - f'default answer "" with hidden answer buttons {{"Cancel","OK"}} default button "OK"' - ) - r = subprocess.run( - ["osascript", "-e", script], - capture_output=True, text=True, - ) - if r.returncode != 0: - return None - for part in r.stdout.strip().split(","): - if "text returned:" in part: - return part.split("text returned:")[1].strip() - return None - - -def _zenity(title: str, prompt: str) -> str | None: - r = subprocess.run( - ["zenity", "--entry", "--hide-text", "--title", "", "--text", prompt, - "--width", "300", "--window-icon", "dialog-password"], - capture_output=True, text=True, - ) - return r.stdout.strip() or None if r.returncode == 0 else None diff --git a/ipc.py b/ipc.py index a793aeb..efe7d45 100644 --- a/ipc.py +++ b/ipc.py @@ -6,6 +6,8 @@ from pathlib import Path import log +MAX_MSG = 1024 * 1024 + def get_socket_path() -> Path: cache = Path.home() / ".cache" / "com.bitwarden.desktop" @@ -28,7 +30,7 @@ def read_message(conn: socket.socket) -> dict | None: if not header: return None length = struct.unpack("=I", header)[0] - if length == 0 or length > 1024 * 1024: + if length == 0 or length > MAX_MSG: return None data = recv_exact(conn, length) if not data: diff --git a/log.py b/log.py index be627a2..623317f 100644 --- a/log.py +++ b/log.py @@ -3,19 +3,23 @@ import time _start = time.monotonic() + def _ts() -> str: - elapsed = time.monotonic() - _start - return f"{elapsed:8.3f}" + return f"{time.monotonic() - _start:8.3f}" + def info(msg: str): print(f"[{_ts()}] {msg}", file=sys.stderr, flush=True) + def warn(msg: str): print(f"[{_ts()}] WARN {msg}", file=sys.stderr, flush=True) + def error(msg: str): print(f"[{_ts()}] ERROR {msg}", file=sys.stderr, flush=True) + def fatal(msg: str): error(msg) sys.exit(1) diff --git a/native_messaging.py b/native_messaging.py index f050427..a9b917a 100644 --- a/native_messaging.py +++ b/native_messaging.py @@ -6,15 +6,17 @@ from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding as asym_padding import log +from askpass import Prompter from crypto import SymmetricKey, enc_string_encrypt, enc_string_decrypt, enc_string_to_dict, dict_to_enc_string -from gui import ask_password +from secmem import wipe from storage import KeyStore class BiometricBridge: - def __init__(self, store: KeyStore, user_id: str): + def __init__(self, store: KeyStore, user_id: str, prompter: Prompter): self._store = store self._uid = user_id + self._prompt = prompter self._sessions: dict[str, SymmetricKey] = {} def __call__(self, msg: dict) -> dict | None: @@ -42,7 +44,7 @@ class BiometricBridge: self._sessions[app_id] = shared encrypted = pub_key.encrypt( - shared.raw, + bytes(shared.raw), asym_padding.OAEP( mgf=asym_padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None, @@ -50,7 +52,6 @@ class BiometricBridge: ) log.info(f"handshake complete, app={app_id[:12]}") - return { "appId": app_id, "command": "setupEncryption", @@ -83,40 +84,51 @@ class BiometricBridge: encrypted = enc_string_encrypt(json.dumps(resp), key) return {"appId": app_id, "messageId": mid, "message": enc_string_to_dict(encrypted)} + def _reply(self, cmd: str, mid: int, **kwargs) -> dict: + return {"command": cmd, "messageId": mid, "timestamp": int(time.time() * 1000), **kwargs} + def _dispatch(self, cmd: str, mid: int) -> dict | None: - ts = int(time.time() * 1000) + handlers = { + "unlockWithBiometricsForUser": self._handle_unlock, + "getBiometricsStatus": self._handle_status, + "getBiometricsStatusForUser": self._handle_status, + "authenticateWithBiometrics": self._handle_auth, + } + handler = handlers.get(cmd) + if handler is None: + return None + return handler(cmd, mid) - if cmd == "unlockWithBiometricsForUser": - key_b64 = self._unseal_key() - if key_b64 is None: - log.warn("unlock denied or failed") - return {"command": cmd, "messageId": mid, "timestamp": ts, "response": False} - log.info("-> unlock granted, key delivered") - return {"command": cmd, "messageId": mid, "timestamp": ts, "response": True, "userKeyB64": key_b64} + def _handle_unlock(self, cmd: str, mid: int) -> dict: + key_b64 = self._unseal_key() + if key_b64 is None: + log.warn("unlock denied or failed") + return self._reply(cmd, mid, response=False) + log.info("-> unlock granted") + resp = self._reply(cmd, mid, response=True, userKeyB64=key_b64) + key_b64 = None + return resp - if cmd in ("getBiometricsStatus", "getBiometricsStatusForUser"): - log.info(f"-> biometrics available") - return {"command": cmd, "messageId": mid, "timestamp": ts, "response": 0} + def _handle_status(self, cmd: str, mid: int) -> dict: + log.info("-> biometrics available") + return self._reply(cmd, mid, response=0) - if cmd == "authenticateWithBiometrics": - log.info("-> authenticated") - return {"command": cmd, "messageId": mid, "timestamp": ts, "response": True} - - return None + def _handle_auth(self, cmd: str, mid: int) -> dict: + log.info("-> authenticated") + return self._reply(cmd, mid, response=True) def _unseal_key(self) -> str | None: - log.info(f"requesting {self._store.name} password via GUI") - pw = ask_password("Bitwarden Unlock", f"Enter {self._store.name} password:") + pw = self._prompt(f"Enter {self._store.name} password:") if pw is None: - log.info("user cancelled dialog") + log.info("cancelled") return None try: raw = self._store.load(self._uid, pw) - b64 = base64.b64encode(raw).decode() - log.info(f"unsealed {len(raw)}B key from {self._store.name}") - raw = None pw = None - log.info("key material wiped from memory") + b64 = base64.b64encode(bytes(raw)).decode() + if isinstance(raw, bytearray): + wipe(raw) + log.info(f"unsealed {len(raw)}B from {self._store.name}") return b64 except Exception as e: log.error(f"unseal failed: {e}") diff --git a/secmem.py b/secmem.py new file mode 100644 index 0000000..e0f1c89 --- /dev/null +++ b/secmem.py @@ -0,0 +1,61 @@ +import ctypes +import ctypes.util +import sys + +if sys.platform == "darwin": + _libc = ctypes.CDLL("libSystem.B.dylib") +else: + _libc = ctypes.CDLL(ctypes.util.find_library("c")) + +_mlock = _libc.mlock +_mlock.argtypes = [ctypes.c_void_p, ctypes.c_size_t] +_mlock.restype = ctypes.c_int + +_munlock = _libc.munlock +_munlock.argtypes = [ctypes.c_void_p, ctypes.c_size_t] +_munlock.restype = ctypes.c_int + + +def _addr(buf: bytearray) -> int: + return ctypes.addressof((ctypes.c_char * len(buf)).from_buffer(buf)) + + +def mlock(buf: bytearray): + if len(buf) > 0: + _mlock(_addr(buf), len(buf)) + + +def munlock(buf: bytearray): + if len(buf) > 0: + _munlock(_addr(buf), len(buf)) + + +def wipe(buf: bytearray): + for i in range(len(buf)): + buf[i] = 0 + + +class SecureBuffer: + __slots__ = ("_buf",) + + def __init__(self, data: bytes | bytearray): + self._buf = bytearray(data) + mlock(self._buf) + + @property + def raw(self) -> bytearray: + return self._buf + + def __len__(self): + return len(self._buf) + + def __bytes__(self): + return bytes(self._buf) + + def __del__(self): + self.close() + + def close(self): + if self._buf: + wipe(self._buf) + munlock(self._buf) diff --git a/storage/pin.py b/storage/pin.py index ae0aaeb..9c4f3e1 100644 --- a/storage/pin.py +++ b/storage/pin.py @@ -1,16 +1,19 @@ -import base64 import hashlib -import hmac import os from pathlib import Path -from cryptography.hazmat.primitives import padding -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.ciphers.aead import AESGCM from . import KeyStore +VERSION = 1 STORE_DIR = Path.home() / ".cache" / "com.bitwarden.desktop" / "keys" +SCRYPT_N = 2**17 +SCRYPT_R = 8 +SCRYPT_P = 1 +SCRYPT_MAXMEM = 256 * 1024 * 1024 + class PinKeyStore(KeyStore): def __init__(self, store_dir: Path = STORE_DIR): @@ -33,28 +36,21 @@ class PinKeyStore(KeyStore): def store(self, uid: str, data: bytes, auth: str): salt = os.urandom(32) - enc_key, mac_key = _derive(auth, salt) - iv = os.urandom(16) - padder = padding.PKCS7(128).padder() - padded = padder.update(data) + padder.finalize() - ct = Cipher(algorithms.AES(enc_key), modes.CBC(iv)).encryptor() - encrypted = ct.update(padded) + ct.finalize() - mac = hmac.new(mac_key, salt + iv + encrypted, hashlib.sha256).digest() - blob = salt + iv + mac + encrypted + key = _derive(auth, salt) + nonce = os.urandom(12) + ct = AESGCM(key).encrypt(nonce, data, salt) + blob = bytes([VERSION]) + salt + nonce + ct self._path(uid).write_bytes(blob) os.chmod(str(self._path(uid)), 0o600) def load(self, uid: str, auth: str) -> bytes: blob = self._path(uid).read_bytes() - salt, iv, mac_stored, ct = blob[:32], blob[32:48], blob[48:80], blob[80:] - enc_key, mac_key = _derive(auth, salt) - mac_check = hmac.new(mac_key, salt + iv + ct, hashlib.sha256).digest() - if not hmac.compare_digest(mac_stored, mac_check): - raise ValueError("Wrong PIN or corrupted data") - dec = Cipher(algorithms.AES(enc_key), modes.CBC(iv)).decryptor() - padded = dec.update(ct) + dec.finalize() - unpadder = padding.PKCS7(128).unpadder() - return unpadder.update(padded) + unpadder.finalize() + _ver, salt, nonce, ct = blob[0], blob[1:33], blob[33:45], blob[45:] + key = _derive(auth, salt) + try: + return AESGCM(key).decrypt(nonce, ct, salt) + except Exception: + raise ValueError("wrong password or corrupted data") def remove(self, uid: str): p = self._path(uid) @@ -62,6 +58,8 @@ class PinKeyStore(KeyStore): p.unlink() -def _derive(password: str, salt: bytes) -> tuple[bytes, bytes]: - raw = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 600_000, dklen=64) - return raw[:32], raw[32:] +def _derive(password: str, salt: bytes) -> bytes: + return hashlib.scrypt( + password.encode(), salt=salt, + n=SCRYPT_N, r=SCRYPT_R, p=SCRYPT_P, dklen=32, maxmem=SCRYPT_MAXMEM, + )