commit 70fba2b72e53ae24c26acb3f0bb90688b44516d5 Author: Morgan Date: Tue May 13 22:02:04 2025 +0900 Init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7c126c9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.json +__pycache__/* diff --git a/README.md b/README.md new file mode 100644 index 0000000..c239c37 --- /dev/null +++ b/README.md @@ -0,0 +1,19 @@ +# Virtual WebAuthn + +### Unsafe implementation of WebAuthn for private key transparency. + +### `fido.py` + +Virtual WebAuthn implemention of `navigator.credentials.get()` and `navigator.credentials.create()`, with self-attestation. + +### `webauthn_server.py` + +Simple FastAPI server that acts as proxy for `fido.py` and browser environment. + +Use `webauthn_server.js` in userscript.js (like TamperMonkey), WebAuthn requests will be forwarded to your local script. + + + +Private key for your Passkeys are stored in JSON file, you can backup your private key since **YOU OWN THE KEY**. + +Works on most WebAuthn websites, including Google, Microsoft. \ No newline at end of file diff --git a/fido.py b/fido.py new file mode 100644 index 0000000..8333003 --- /dev/null +++ b/fido.py @@ -0,0 +1,396 @@ +import json +import base64 +import os +import time +import hashlib +import struct +import cbor2 +from Crypto.PublicKey import ECC +from Crypto.Signature import DSS +from Crypto.Hash import SHA256 +from typing import Dict, Any, Optional + + +import getpass +from fido2.hid import CtapHidDevice +from fido2.client import Fido2Client, UserInteraction +from fido2.webauthn import PublicKeyCredentialCreationOptions, PublicKeyCredentialDescriptor, PublicKeyCredentialRequestOptions +from fido2.utils import websafe_encode, websafe_decode + + +class YkFidoDevice: + def __init__(self): + devices = list(CtapHidDevice.list_devices()) + if not devices: + raise Exception("No FIDO2 devices found.") + self.device = devices[0] + print(f"Using FIDO2 device: {self.device}") + + class InputDataError(Exception): + def __init__(self, message="", error_code=None): + self.message = f"Input data insufficient or malformed: {message}" + self.error_code = error_code + super().__init__(self.message) + + def get_client(self, origin): + class MyUserInteraction(UserInteraction): + def prompt_up(self): + print("\nPlease touch your security key...\n") + def request_pin(self, permissions, rp_id): + print(f"PIN requested for {rp_id}") + return getpass.getpass("Enter your security key's PIN: ") + + client = Fido2Client(self.device, origin, user_interaction=MyUserInteraction()) + return client + + def create(self, create_options, origin = ""): + print("WEBAUTHN_START_REGISTER") + options = {"publicKey": create_options} + + if not origin: + origin = f'https://{options["publicKey"]["rp"]["id"]}' + if not origin: + raise self.InputDataError("origin") + + client = self.get_client(origin) + + options["publicKey"]["challenge"] = websafe_decode(options["publicKey"]["challenge"]) + options["publicKey"]["user"]["id"] = websafe_decode(options["publicKey"]["user"]["id"]) + + if "excludeCredentials" in options["publicKey"]: + for cred in options["publicKey"]["excludeCredentials"]: + cred["id"] = websafe_decode(cred["id"]) + + pk_create_options = options["publicKey"] + challenge = pk_create_options["challenge"] + rp = pk_create_options["rp"] + user = pk_create_options["user"] + pub_key_cred_params = pk_create_options["pubKeyCredParams"] + + pk_options = PublicKeyCredentialCreationOptions(rp, user, challenge, pub_key_cred_params) + + print(f"WEBAUTHN_MAKE_CREDENTIAL(RP={rp})") + + attestation = client.make_credential(pk_options) + + client_data_b64 = attestation.client_data.b64 + attestation_object = attestation.attestation_object + credential = attestation.attestation_object.auth_data.credential_data + if not credential: + raise Exception() + + result = { + "id": websafe_encode(credential.credential_id), + "rawId": websafe_encode(credential.credential_id), + "type": "public-key", + "response": { + "attestationObject": websafe_encode(attestation_object), + "clientDataJSON": client_data_b64 + } + } + print(f"WEBAUTHN_ATTESTATION(ID={result['id']})") + return result + + def get(self, get_options, origin = ""): + print("WEBAUTHN_START_AUTHENTICATION") + options = {"publicKey": get_options} + + if not origin: + origin = f'https://{options["publicKey"]["rpId"]}' + if not origin: + raise self.InputDataError("origin") + + client = self.get_client(origin) + + options["publicKey"]["challenge"] = websafe_decode(options["publicKey"]["challenge"]) + + rp_id = options["publicKey"].get("rpId", "webauthn.io") + challenge = options["publicKey"]["challenge"] + + if "allowCredentials" in options["publicKey"]: + for cred in options["publicKey"]["allowCredentials"]: + cred["id"] = websafe_decode(cred["id"]) + + allowed = [PublicKeyCredentialDescriptor(cred["type"], cred["id"]) + for cred in options["publicKey"]["allowCredentials"]] + + pk_options = PublicKeyCredentialRequestOptions(challenge, rp_id=rp_id, allow_credentials=allowed) + + print(f"WEBAUTHN_GET_ASSERTION(RPID={rp_id})") + + assertion_response = client.get_assertion(pk_options) + + assertion = assertion_response.get_response(0) + if not assertion.credential_id: + raise Exception() + + result = { + "id": websafe_encode(assertion.credential_id), + "rawId": websafe_encode(assertion.credential_id), + "type": "public-key", + "response": { + "authenticatorData": websafe_encode(assertion.authenticator_data), + "clientDataJSON": assertion.client_data.b64, + "signature": websafe_encode(assertion.signature), + "userHandle": websafe_encode(assertion.user_handle) if assertion.user_handle else None + } + } + print(f"WEBAUTHN_AUTHENTICATION(ID={result['id']})") + return result + + +class VirtualFidoDevice: + def __init__(self, file: str = "fido.json"): + self.file = file + self.credentials = {} + self._load_credentials() + + class InputDataError(Exception): + def __init__(self, message="", error_code=None): + super().__init__(f"Input data insufficient or malformed: {message}") + + class CredNotFoundError(Exception): + def __init__(self, message="No available credential found", error_code=None): + super().__init__(message) + + def _load_credentials(self): + if os.path.exists(self.file): + try: + with open(self.file, 'r') as f: + self.credentials = json.load(f) + except FileNotFoundError: + self.credentials = {} + + def _save_credentials(self): + with open(self.file, 'w') as f: + json.dump(self.credentials, f, indent=4) + + def _create_authenticator_data(self, rp_id: bytes, counter: int = 0, + user_present: bool = True, + user_verified: bool = True, + credential_data: Optional[bytes] = None) -> bytes: + + rp_id_hash = hashlib.sha256(rp_id).digest() + + flags = 0 + if user_present: + flags |= 1 << 0 + if user_verified: + flags |= 1 << 2 + if credential_data is not None: + flags |= 1 << 6 + + counter_bytes = struct.pack(">I", counter) + + auth_data = rp_id_hash + bytes([flags]) + counter_bytes + + if credential_data is not None: + auth_data += credential_data + + return auth_data + + def _get_public_key_cose(self, key) -> bytes: + x = key.pointQ.x.to_bytes(32, byteorder='big') + y = key.pointQ.y.to_bytes(32, byteorder='big') + cose_key = {1: 2, 3: -7, -1: 1, -2: x, -3: y} + return cbor2.dumps(cose_key) + + def _b64url(self, d): + if isinstance(d, bytes): + return base64.urlsafe_b64encode(d).decode('utf-8').rstrip('=') + elif isinstance(d, str): + return base64.urlsafe_b64decode(d + "===") + + + def create(self, data: Dict[str, Any], origin: str = "") -> Dict[str, Any]: + challenge = data.get("challenge") + if isinstance(challenge, str): + challenge = self._b64url(challenge) + + rp = data.get("rp", {}) + user = data.get("user", {}) + + pub_key_params = data.get("pubKeyCredParams", []) + + alg = -7 + for param in pub_key_params: + if param.get('type') == 'public-key' and param.get('alg') == -7: + alg = -7 + break + + if not origin: + origin = data.get("origin") + if not origin: + raise self.InputDataError("origin") + + rp_id = rp.get("id", "").encode() + + user_id = user.get("id") + if isinstance(user_id, str): + user_id = self._b64url(user_id) + + key = ECC.generate(curve='P-256') + private_key = key.export_key(format='PEM') + public_key = key.public_key().export_key(format='PEM') # noqa: F841 + + credential_id = os.urandom(16) + credential_id_b64 = self._b64url(credential_id) + + cose_pubkey = self._get_public_key_cose(key) + + cred_id_length = struct.pack(">H", len(credential_id)) + + aaguid = b'\x00' * 16 + attested_data = aaguid + cred_id_length + credential_id + cose_pubkey + + auth_data = self._create_authenticator_data(rp_id, counter=0, credential_data=attested_data) + + client_data = ('{"type":"%s","challenge":"%s","origin":"%s","crossOrigin":false}' + % ("webauthn.create", self._b64url(challenge), origin)).encode() + client_data_hash = hashlib.sha256(client_data).digest() + + signature_data = auth_data + client_data_hash + + h = SHA256.new(signature_data) + signer = DSS.new(key, 'fips-186-3', encoding='der') + signature = signer.sign(h) + + # Self Attestation + attn_fmt = "packed" + attn_stmt = { + "alg": -7, + "sig": signature + } + + attn_obj = { + "fmt": attn_fmt, + "attStmt": attn_stmt, + "authData": auth_data + } + attn_cbor = cbor2.dumps(attn_obj) + + + self.credentials[credential_id_b64] = { + "private_key": private_key, + "rp_id": rp_id.decode(), + "user_id": self._b64url(user_id), + "user_name": user.get('displayName', ''), + "created": int(time.time()), + "counter": 0 + } + self._save_credentials() + + response = { + "authenticatorAttachment": "cross-platform", + "id": credential_id_b64, + "rawId": credential_id_b64, + "response": { + "attestationObject": self._b64url(attn_cbor), + "clientDataJSON": self._b64url(client_data), + "publicKey": self._b64url(cose_pubkey), + "authenticatorData": self._b64url(auth_data), + "pubKeyAlgo": str(alg), + "transports": ["usb"] + }, + "type": "public-key" + } + return response + + + def get(self, data: Dict[str, Any], origin: str = "") -> Dict[str, Any]: + challenge = data.get("challenge") + if isinstance(challenge, str): + challenge = self._b64url(challenge) + + rp_id = data.get("rpId", "").encode('utf-8') + if not rp_id: + raise self.InputDataError("rp_id") + + if not origin: + origin = data.get("origin") + if not origin: + raise self.InputDataError("origin") + + allowed_credential = data.get("allowCredentials") + cred = None + if allowed_credential: + for credential in allowed_credential: + credential_id_b64 = credential["id"] + if self.credentials.get(credential_id_b64): + cred = self.credentials[credential_id_b64] + break + else: + for credential_id_b64, my_credential in self.credentials.items(): + if my_credential["rp_id"] == rp_id.decode(): + cred = my_credential + break + if not cred: + raise self.CredNotFoundError() + + counter = cred.get("counter", 0) + 1 + cred["counter"] = counter + + auth_data = self._create_authenticator_data( + rp_id=rp_id, + counter=counter, + user_present=True, + user_verified=True + ) + + client_data = ('{"type":"%s","challenge":"%s","origin":"%s","crossOrigin":false}' + % ("webauthn.get", self._b64url(challenge), origin)).encode() + client_data_hash = hashlib.sha256(client_data).digest() + + signature_data = auth_data + client_data_hash + + key = ECC.import_key(cred["private_key"]) + h = SHA256.new(signature_data) + signer = DSS.new(key, 'fips-186-3', encoding='der') + signature = signer.sign(h) + + self._save_credentials() + + response = { + "authenticatorAttachment": "cross-platform", + "id": credential_id_b64, + "rawId": credential_id_b64, + "response": { + "authenticatorData": self._b64url(auth_data), + "clientDataJSON": self._b64url(client_data), + "signature": self._b64url(signature), + "userHandle": cred["user_id"] + }, + "type": "public-key" + } + return response + + +if __name__=="__main__": + import requests + + sess = requests.Session() + fido = VirtualFidoDevice() + + payload = { + "algorithms": ["es256"], "attachment": "all", "attestation": "none", "discoverable_credential": "preferred", + "hints": [], "user_verification": "preferred", "username": "asdf" + } + resp = sess.post("https://webauthn.io/registration/options", json=payload) + print(resp.json()) + data = fido.create(resp.json(), origin="https://webauthn.io") + data["rawId"] = data["id"] + print(data) + resp = sess.post("https://webauthn.io/registration/verification", json={"response": data, "username": "asdf"}) + print(resp.json()) + print() + + sess.get("https://webauthn.io/logout") + + payload = {"username":"asdf", "user_verification":"preferred", "hints":[]} + resp = sess.post("https://webauthn.io/authentication/options", json=payload, headers={"origin": "https://webauthn.io"}) + print(resp.json()) + data = fido.get(resp.json(), origin="https://webauthn.io") + print(data) + data["rawId"] = data["id"] + resp = sess.post("https://webauthn.io/authentication/verification", json={"response": data, "username": "asdf"}) + print(resp.json()) \ No newline at end of file diff --git a/webauthn_server.js b/webauthn_server.js new file mode 100644 index 0000000..0307939 --- /dev/null +++ b/webauthn_server.js @@ -0,0 +1,152 @@ +// ==UserScript== +// @name WebAuthnOffload +// @description +// @version 1.0 +// @author @morgan9e +// @include * +// @connect 127.0.0.1 +// @grant GM_xmlhttpRequest +// ==/UserScript== + +function abb64(buffer) { + const bytes = new Uint8Array(buffer); + let binary = ''; + for (let i = 0; i < bytes.byteLength; i++) { + binary += String.fromCharCode(bytes[i]); + } + return btoa(binary) + .replace(/\+/g, '-') + .replace(/\//g, '_') + .replace(/=+$/, ''); +} + +function b64ab(input) { + const binary = atob(input.replace(/-/g, '+').replace(/_/g, '/')); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < binary.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + return bytes.buffer; +} + +function myFetch(url, options = {}) { + return new Promise((resolve, reject) => { + GM_xmlhttpRequest({ + method: options.method || 'GET', + url: url, + headers: options.headers || {}, + data: options.body || undefined, + responseType: options.responseType || 'json', + onload: function(response) { + const responseObj = { + ok: response.status >= 200 && response.status < 300, + status: response.status, + statusText: response.statusText, + headers: response.responseHeaders, + text: () => Promise.resolve(response.responseText), + json: () => Promise.resolve(JSON.parse(response.responseText)), + response: response + }; + resolve(responseObj); + }, + onerror: function(error) { + reject(new Error(`Request to ${url} failed`)); + } + }); + }); +} + +const origGet = navigator.credentials.get; +const origCreate = navigator.credentials.create; + +navigator.credentials.get = async function(options) { + console.log("navigator.credentials.get", options) + try { + const authOptions = {publicKey: Object.assign({}, options.publicKey)}; + console.log(authOptions); + authOptions.publicKey.challenge = abb64(authOptions.publicKey.challenge) + authOptions.publicKey.allowCredentials = authOptions.publicKey.allowCredentials.map(credential => ({ + ...credential, id: abb64(credential.id) + })); + const response = await myFetch('http://127.0.0.1:20492', { + method: 'POST', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({ + type: "get", + data: { ...authOptions, origin: window.origin } + }) + }); + if (!response.ok) throw new Error(`server error: ${response.status}`) + const resp = await response.json() + console.log("server response:", resp) + const credential = { + id: resp.id, + type: resp.type, + rawId: b64ab(resp.rawId), + response: { + authenticatorData: b64ab(resp.response.authenticatorData), + clientDataJSON: b64ab(resp.response.clientDataJSON), + signature: b64ab(resp.response.signature) + }, + getClientExtensionResults: () => { return {} } + } + if (resp.response.userHandle) { + credential.response.userHandle = b64ab(resp.response.userHandle); + } + console.log(credential) + return credential; + } catch (error) { + console.error(`Error: ${error.message}, falling back to browser`); + let r = await origGet.call(navigator.credentials, options); + console.log(r); + return r; + } +}; + +navigator.credentials.create = async function(options) { + console.log("navigator.credentials.create", options) + try { + const authOptions = {publicKey: Object.assign({}, options.publicKey)}; + console.log(authOptions); + authOptions.publicKey.challenge = abb64(authOptions.publicKey.challenge) + authOptions.publicKey.user.id = abb64(authOptions.publicKey.user.id) + const response = await myFetch('http://127.0.0.1:20492', { + method: 'POST', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({ + type: "create", + data: { ...authOptions, origin: window.origin } + }) + }); + if (!response.ok) throw new Error(`server error: ${response.status}`) + const resp = await response.json() + console.log("server response:", resp) + const credential = { + id: resp.id, + type: resp.type, + rawId: b64ab(resp.rawId), + response: { + attestationObject: b64ab(resp.response.attestationObject), + clientDataJSON: b64ab(resp.response.clientDataJSON), + pubKeyAlgo: resp.response.pubKeyAlgo, + publicKey: b64ab(resp.response.publicKey), + transports: resp.response.transports, + authenticatorData: b64ab(resp.response.authenticatorData), + getAuthenticatorData:() => { return b64ab(resp.response.authenticatorData) }, + getPublicKey: () => { return b64ab(resp.response.publicKey) }, + getPublicKeyAlgorithm: () => { return resp.response.pubKeyAlgo }, + getTransports: () => { return resp.response.transports } + }, + getClientExtensionResults: () => { return {} } + } + console.log(credential) + return credential; + } catch (error) { + console.error(`Error: ${error.message}, falling back to browser`); + let r = await origCreate.call(navigator.credentials, options); + console.log(r); + return r; + } +}; + +console.log("Injected WebAuthn") \ No newline at end of file diff --git a/webauthn_server.py b/webauthn_server.py new file mode 100644 index 0000000..0d53aa4 --- /dev/null +++ b/webauthn_server.py @@ -0,0 +1,54 @@ +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel +from typing import Dict, Any +import uvicorn +import json +from fido import VirtualFidoDevice as FidoDevice + +app = FastAPI() + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +class WebAuthnRequest(BaseModel): + type: str + data: Dict[str, Any] + +@app.post('/') +async def handle(param: WebAuthnRequest): + if param.type == "get": + try: + options = param.data.get("publicKey", {}) + print(f"webauthn.get {json.dumps(options, indent=4)}") + webauthn = FidoDevice() + assertion = webauthn.get(options, param.data.get("origin", "")) + return assertion + + except Exception as e: + import traceback + print(f"error.webauthn.get: {e}") + print(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + + elif param.type == "create": + try: + options = param.data.get("publicKey", {}) + print(f"webauthn.create {json.dumps(options, indent=4)}") + webauthn = FidoDevice() + attestation = webauthn.create(options, param.data.get("origin", "")) + return attestation + + except Exception as e: + import traceback + print(f"error.webauthn.create: {e}") + print(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + +if __name__ == "__main__": + uvicorn.run(app, host="127.0.0.1", port=20492)