Added FIDO2 Passkey support

This commit is contained in:
2025-05-13 22:05:23 +09:00
parent 167cbed1bf
commit 405a06e4e4
4 changed files with 580 additions and 47 deletions

View File

@@ -4,7 +4,9 @@ Download all files from lectures of SNU eTL.
Skips already downloaded files
+) 2FA verification code (mail) required.
Creates locally saved virtual Passkey to reuse after
First time registration required, passkey info is stored at `snu_fido.json`.
### Usage

4
etl.py
View File

@@ -59,7 +59,7 @@ def get_subpath(dir, parent = []):
def sync_etl(lecture, name = ""):
basepath = "."
basepath = "./download"
root = rget(f"{API}/v1/courses/{lecture}/folders/root")
print()
print(f"{root['id']} {root['full_name']}")
@@ -93,4 +93,4 @@ def sync_etl(lecture, name = ""):
except Exception as e:
print(f"- Failed {local_path}: {str(e)}")
if os.path.exists(local_path):
os.remove(local_path)
os.remove(local_path)

390
fido.py Normal file
View File

@@ -0,0 +1,390 @@
import json
import base64
import os
import time
import hashlib
import struct
import cbor2
from Crypto.PublicKey import ECC
from Crypto.Signature import DSS
from Crypto.Hash import SHA256
from typing import Dict, Any, Optional
import getpass
from fido2.hid import CtapHidDevice
from fido2.client import Fido2Client, UserInteraction
from fido2.webauthn import PublicKeyCredentialCreationOptions, PublicKeyCredentialDescriptor, PublicKeyCredentialRequestOptions
from fido2.utils import websafe_encode, websafe_decode
class YkFidoDevice:
def __init__(self):
devices = list(CtapHidDevice.list_devices())
if not devices:
raise Exception("No FIDO2 devices found.")
self.device = devices[0]
print(f"Using FIDO2 device: {self.device}")
class InputDataError(Exception):
def __init__(self, message="", error_code=None):
self.message = f"Input data insufficient or malformed: {message}"
self.error_code = error_code
super().__init__(self.message)
def get_client(self, origin):
class MyUserInteraction(UserInteraction):
def prompt_up(self):
print("\nPlease touch your security key...\n")
def request_pin(self, permissions, rp_id):
print(f"PIN requested for {rp_id}")
return getpass.getpass("Enter your security key's PIN: ")
client = Fido2Client(self.device, origin, user_interaction=MyUserInteraction())
return client
def create(self, create_options, origin = ""):
print("WEBAUTHN_START_REGISTER")
options = {"publicKey": create_options}
if not origin:
origin = f'https://{options["publicKey"]["rp"]["id"]}'
if not origin:
raise self.InputDataError("origin")
client = self.get_client(origin)
options["publicKey"]["challenge"] = websafe_decode(options["publicKey"]["challenge"])
options["publicKey"]["user"]["id"] = websafe_decode(options["publicKey"]["user"]["id"])
if "excludeCredentials" in options["publicKey"]:
for cred in options["publicKey"]["excludeCredentials"]:
cred["id"] = websafe_decode(cred["id"])
pk_create_options = options["publicKey"]
challenge = pk_create_options["challenge"]
rp = pk_create_options["rp"]
user = pk_create_options["user"]
pub_key_cred_params = pk_create_options["pubKeyCredParams"]
pk_options = PublicKeyCredentialCreationOptions(rp, user, challenge, pub_key_cred_params)
print(f"WEBAUTHN_MAKE_CREDENTIAL(RP={rp})")
attestation = client.make_credential(pk_options)
client_data_b64 = attestation.client_data.b64
attestation_object = attestation.attestation_object
credential = attestation.attestation_object.auth_data.credential_data
if not credential:
raise Exception()
result = {
"id": websafe_encode(credential.credential_id),
"rawId": websafe_encode(credential.credential_id),
"type": "public-key",
"response": {
"attestationObject": websafe_encode(attestation_object),
"clientDataJSON": client_data_b64
}
}
print(f"WEBAUTHN_ATTESTATION(ID={result['id']})")
return result
def get(self, get_options, origin = ""):
print("WEBAUTHN_START_AUTHENTICATION")
options = {"publicKey": get_options}
if not origin:
origin = f'https://{options["publicKey"]["rpId"]}'
if not origin:
raise self.InputDataError("origin")
client = self.get_client(origin)
options["publicKey"]["challenge"] = websafe_decode(options["publicKey"]["challenge"])
rp_id = options["publicKey"].get("rpId", "webauthn.io")
challenge = options["publicKey"]["challenge"]
if "allowCredentials" in options["publicKey"]:
for cred in options["publicKey"]["allowCredentials"]:
cred["id"] = websafe_decode(cred["id"])
allowed = [PublicKeyCredentialDescriptor(cred["type"], cred["id"])
for cred in options["publicKey"]["allowCredentials"]]
pk_options = PublicKeyCredentialRequestOptions(challenge, rp_id=rp_id, allow_credentials=allowed)
print(f"WEBAUTHN_GET_ASSERTION(RPID={rp_id})")
assertion_response = client.get_assertion(pk_options)
assertion = assertion_response.get_response(0)
if not assertion.credential_id:
raise Exception()
result = {
"id": websafe_encode(assertion.credential_id),
"rawId": websafe_encode(assertion.credential_id),
"type": "public-key",
"response": {
"authenticatorData": websafe_encode(assertion.authenticator_data),
"clientDataJSON": assertion.client_data.b64,
"signature": websafe_encode(assertion.signature),
"userHandle": websafe_encode(assertion.user_handle) if assertion.user_handle else None
}
}
print(f"WEBAUTHN_AUTHENTICATION(ID={result['id']})")
return result
class VirtualFidoDevice:
def __init__(self, file: str = "fido.json"):
self.file = file
self.credentials = {}
self._load_credentials()
class InputDataError(Exception):
def __init__(self, message="", error_code=None):
super().__init__(f"Input data insufficient or malformed: {message}")
class CredNotFoundError(Exception):
def __init__(self, message="No available credential found", error_code=None):
super().__init__(message)
def _load_credentials(self):
if os.path.exists(self.file):
try:
with open(self.file, 'r') as f:
self.credentials = json.load(f)
except FileNotFoundError:
self.credentials = {}
def _save_credentials(self):
with open(self.file, 'w') as f:
json.dump(self.credentials, f, indent=4)
def _create_authenticator_data(self, rp_id: bytes, counter: int = 0,
user_present: bool = True,
user_verified: bool = True,
credential_data: Optional[bytes] = None) -> bytes:
rp_id_hash = hashlib.sha256(rp_id).digest()
flags = 0
if user_present:
flags |= 1 << 0
if user_verified:
flags |= 1 << 2
if credential_data is not None:
flags |= 1 << 6
counter_bytes = struct.pack(">I", counter)
auth_data = rp_id_hash + bytes([flags]) + counter_bytes
if credential_data is not None:
auth_data += credential_data
return auth_data
def _get_public_key_cose(self, key) -> bytes:
x = key.pointQ.x.to_bytes(32, byteorder='big')
y = key.pointQ.y.to_bytes(32, byteorder='big')
cose_key = {1: 2, 3: -7, -1: 1, -2: x, -3: y}
return cbor2.dumps(cose_key)
def _b64url(self, d):
if isinstance(d, bytes):
return base64.urlsafe_b64encode(d).decode('utf-8').rstrip('=')
elif isinstance(d, str):
return base64.urlsafe_b64decode(d + "===")
def create(self, data: Dict[str, Any], origin: str = "") -> Dict[str, Any]:
challenge = data.get("challenge")
if isinstance(challenge, str):
challenge = self._b64url(challenge)
rp = data.get("rp", {})
user = data.get("user", {})
pub_key_params = data.get("pubKeyCredParams", [])
alg = -7
for param in pub_key_params:
if param.get('type') == 'public-key' and param.get('alg') == -7:
alg = -7
break
if not origin:
origin = data.get("origin")
if not origin:
raise self.InputDataError("origin")
rp_id = rp.get("id", "").encode()
user_id = user.get("id")
if isinstance(user_id, str):
user_id = self._b64url(user_id)
key = ECC.generate(curve='P-256')
private_key = key.export_key(format='PEM')
public_key = key.public_key().export_key(format='PEM') # noqa: F841
credential_id = os.urandom(16)
credential_id_b64 = self._b64url(credential_id)
cose_pubkey = self._get_public_key_cose(key)
cred_id_length = struct.pack(">H", len(credential_id))
aaguid = b'\x00' * 16
attested_data = aaguid + cred_id_length + credential_id + cose_pubkey
auth_data = self._create_authenticator_data(rp_id, counter=0, credential_data=attested_data)
client_data = ('{"type":"%s","challenge":"%s","origin":"%s","crossOrigin":false}'
% ("webauthn.create", self._b64url(challenge), origin)).encode()
client_data_hash = hashlib.sha256(client_data).digest()
signature_data = auth_data + client_data_hash
h = SHA256.new(signature_data)
signer = DSS.new(key, 'fips-186-3', encoding='der')
signature = signer.sign(h)
# Self Attestation
attn_fmt = "packed"
attn_stmt = {
"alg": -7,
"sig": signature
}
attn_obj = {
"fmt": attn_fmt,
"attStmt": attn_stmt,
"authData": auth_data
}
attn_cbor = cbor2.dumps(attn_obj)
self.credentials[credential_id_b64] = {
"private_key": private_key,
"rp_id": rp_id.decode(),
"user_id": self._b64url(user_id),
"user_name": user.get('displayName', ''),
"created": int(time.time()),
"counter": 0
}
self._save_credentials()
response = {
"authenticatorAttachment": "cross-platform",
"id": credential_id_b64,
"rawId": credential_id_b64,
"response": {
"attestationObject": self._b64url(attn_cbor),
"clientDataJSON": self._b64url(client_data),
"publicKey": self._b64url(cose_pubkey),
"authenticatorData": self._b64url(auth_data),
"pubKeyAlgo": str(alg),
"transports": ["usb"]
},
"type": "public-key"
}
return response
def get(self, data: Dict[str, Any], origin: str = "") -> Dict[str, Any]:
challenge = data.get("challenge")
if isinstance(challenge, str):
challenge = self._b64url(challenge)
allowed_credential = data.get("allowCredentials")
for credential in allowed_credential:
credential_id_b64 = credential["id"]
if self.credentials.get(credential_id_b64):
cred = self.credentials[credential_id_b64]
break
else:
raise self.CredNotFoundError()
rp_id = data.get("rpId", "").encode('utf-8')
if not rp_id:
raise self.InputDataError("rp_id")
if not origin:
origin = data.get("origin")
if not origin:
raise self.InputDataError("origin")
counter = cred.get("counter", 0) + 1
cred["counter"] = counter
auth_data = self._create_authenticator_data(
rp_id=rp_id,
counter=counter,
user_present=True,
user_verified=True
)
client_data = ('{"type":"%s","challenge":"%s","origin":"%s","crossOrigin":false}'
% ("webauthn.get", self._b64url(challenge), origin)).encode()
client_data_hash = hashlib.sha256(client_data).digest()
signature_data = auth_data + client_data_hash
key = ECC.import_key(cred["private_key"])
h = SHA256.new(signature_data)
signer = DSS.new(key, 'fips-186-3', encoding='der')
signature = signer.sign(h)
self._save_credentials()
response = {
"authenticatorAttachment": "cross-platform",
"id": credential_id_b64,
"rawId": credential_id_b64,
"response": {
"authenticatorData": self._b64url(auth_data),
"clientDataJSON": self._b64url(client_data),
"signature": self._b64url(signature)
},
"type": "public-key"
}
return response
if __name__=="__main__":
import requests
sess = requests.Session()
fido = VirtualFidoDevice()
payload = {
"algorithms": ["es256"], "attachment": "all", "attestation": "none", "discoverable_credential": "preferred",
"hints": [], "user_verification": "preferred", "username": "asdf"
}
resp = sess.post("https://webauthn.io/registration/options", json=payload)
print(resp.json())
data = fido.create(resp.json(), origin="https://webauthn.io")
data["rawId"] = data["id"]
print(data)
resp = sess.post("https://webauthn.io/registration/verification", json={"response": data, "username": "asdf"})
print(resp.json())
print()
sess.get("https://webauthn.io/logout")
payload = {"username":"asdf", "user_verification":"preferred", "hints":[]}
resp = sess.post("https://webauthn.io/authentication/options", json=payload, headers={"origin": "https://webauthn.io"})
print(resp.json())
data = fido.get(resp.json(), origin="https://webauthn.io")
print(data)
data["rawId"] = data["id"]
resp = sess.post("https://webauthn.io/authentication/verification", json={"response": data, "username": "asdf"})
print(resp.json())

229
login.py
View File

@@ -10,9 +10,13 @@ from base64 import b64decode
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
from SECRET import SNU_ID, SNU_PW, SNU_NM
from fido import VirtualFidoDevice as FidoDevice
def do(sess, data):
SNU_PW = "" # Unused on Passkey
SNU_ID = ""
SNU_NM = ""
def key_init(sess):
curve = P256
n = curve.q
byte_length = (n.bit_length() + 7) // 8
@@ -22,9 +26,9 @@ def do(sess, data):
client_prikey = format(rand, 'x')
priv_key_int = int(client_prikey, 16)
G = Point(curve.gx, curve.gy, curve)
G = Point(curve.gx, curve.gy, curve)
client_pub = priv_key_int * G
client_pubkey_x = format(client_pub.x, '064x')
client_pubkey_y = format(client_pub.y, '064x')
client_pubkey = client_pubkey_x + client_pubkey_y
@@ -41,20 +45,98 @@ def do(sess, data):
server_point = Point(svr_qx_int, svr_qy_int, curve)
shared_point = server_point * priv_key_int
calkey_x = format(shared_point.x, '064x')
calkey_y = format(shared_point.y, '064x')
client_calkey = calkey_x + calkey_y
passni_key = unhexlify(client_calkey[:64])
passni_iv = unhexlify(client_calkey[64:96])
return passni_iv, passni_key
def encrypt_login(sess, data):
passni_iv, passni_key = key_init(sess)
encrypt_data = seed_cbc_encrypt(data.encode(), passni_key, passni_iv)
return encrypt_data
def sso_login(sess = None, agt_resp = None):
def sso_register_passkey(sess = None):
if not sess:
sess = requests.session()
payload = {"crtfc_type": "fido2", "lang": "ko", "return_url": "https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/view", "lnksys_id": "snu-mfa-sso"}
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/regist/step", data=payload)
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/com/ajaxNextKey")
next_key = resp.text
payload = {"next_key": next_key, "sel_user_id": "", "lang": "ko", "user_id": SNU_ID, "user_name": "", "user_birth": ""}
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/com/ajaxUserIdCheck", data=payload)
if not (r := resp.json()).get("result"):
raise Exception(r)
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/com/ajaxNextKey")
next_key = resp.text
payload = {"next_key": next_key, "gubun": "self", "lang": "ko", "crtfc_no": ""}
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/com/ajaxSendMail", data=payload)
if not (r := resp.json()).get("result"):
raise Exception(r)
print("Verification Code sent to mail.")
verif = input("? ")
payload = {"next_key": next_key, "gubun": "self", "lang": "ko", "crtfc_no": verif}
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/com/ajaxValidCrtfcNo", data=payload)
if not (r := resp.json()).get("result"):
raise Exception(r)
payload = {"next_key": next_key, "gubun": "self", "lang": "ko", "crtfc_no": verif}
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/regist/step04/fido2", data=payload)
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/com/ajaxNextKey")
next_key = resp.text
payload = {
"username": "",
"displayName": "",
"credentialNickname": "",
"authenticatorSelection": {
"requireResidentKey": False,
"authenticatorAttachment": "platform",
"userVerification":"preferred"
},
"attestation": "direct"
}
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/fido2/register", json=payload, headers={"Origin": "https://mfalogin.snu.ac.kr"})
result = resp.json()
if result.get("status") != "ok":
raise Exception(result)
fido = FidoDevice("snu_fido.json")
resp = fido.create(result, "https://mfalogin.snu.ac.kr")
data = {
"type": resp["type"],
"id": resp["id"],
"response": {
"attestationObject": resp["response"]["attestationObject"],
"clientDataJSON": resp["response"]["clientDataJSON"],
},
"clientExtensionResults": {}
}
payload = {"register_data": json.dumps(data)}
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/fido2/register/finish", data=payload, headers={"Origin": "https://mfalogin.snu.ac.kr"})
payload = {"next_key": next_key, "type": "fido2", "lang": "ko"}
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/regist/step05", data=payload)
def sso_login(sess = None, agt_resp = None, auth_type = "passkey"):
if not sess:
sess = requests.session()
@@ -70,59 +152,113 @@ def sso_login(sess = None, agt_resp = None):
payload = {"agt_url": agt_url, "agt_r": agt_r, "agt_id": agt_id}
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/login/link", data=payload)
login_key = resp.text.split('id="login_key" name="login_key" value="')[1].split('"')[0]
ed = do(sess, f'{"login_id":"{SNU_ID}","login_pwd":"{SNU_PW}"}')
if "login_key" in resp.text:
payload = {'user_data': ed.hex(), 'login_key': login_key}
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/auth", data=payload)
verif_type = 'mail'
login_key = resp.text.split('id="login_key" name="login_key" value="')[1].split('"')[0]
payload = {'crtfc_type': 'mail', 'login_key': login_key}
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/ajaxUserSend", data=payload)
if auth_type == "passkey":
key_init(sess)
payload = {'user_id': SNU_ID, 'crtfc_type': 'fido2', 'login_key': login_key}
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/fido2/ajaxIDTokenCreate", data=payload)
result = resp.json()
if not result["result"]:
raise Exception(result)
verif = input("Verification code ? ")
id_token = result["id_token"]
payload = {'crtfc_no': verif, 'login_key': login_key, "bypass_check": "true"}
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/ajaxUserAuthId", data=payload)
payload = {"id_token": id_token, "userVerification": "preferred"}
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/fido2/options", json=payload, headers={"origin": "https://nsso.snu.ac.kr"})
result = resp.json()
fido = FidoDevice("snu_fido.json")
try:
resp = fido.get(result, "https://nsso.snu.ac.kr")
except FidoDevice.CredNotFoundError:
sso_register_passkey()
print("Passkey created, please rerun.")
return
data = {
"type": "public-key",
"id": resp["id"],
"response": {
"authenticatorData": resp["response"]["authenticatorData"],
"clientDataJSON": resp["response"]["clientDataJSON"],
"signature": resp["response"]["signature"]
},
"clientExtensionResults": {}
}
payload = {"user_data": json.dumps(data), "id_token": id_token}
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/fido2/auth", data=payload, headers={"origin": "https://nsso.snu.ac.kr"})
if not (result := resp.json()).get("result"):
raise Exception(result)
payload = {"user_data": "", "page_lang": "", "login_key": login_key, "pwd_type":""}
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/login/link", data=payload)
payload = {'login_key': login_key}
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/fido2/ajaxUserAuthFido2", data=payload)
if not (result := resp.json()).get("result"):
raise Exception(result)
print(resp.json())
elif auth_type in ["sms", "main"]:
# Login
ed = encrypt_login(sess, f'{{"login_id":"{SNU_ID}","login_pwd":"{SNU_PW}"}}')
payload = {'user_data': ed.hex(), 'login_key': login_key}
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/auth", data=payload)
# 2FA
payload = {'crtfc_type': auth_type, 'login_key': login_key}
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/ajaxUserSend", data=payload)
verif = input("? ")
payload = {'crtfc_no': verif, 'login_key': login_key, "bypass_check": "true"}
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/ajaxUserAuthId", data=payload)
# Login complete
payload = {"user_data": "", "page_lang": "", "login_key": login_key, "pwd_type":""}
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/login/link", data=payload)
target = resp.text.split('name="loginForm" method="post" action="')[1].split('"')[0]
pni_login_type = resp.text.split('name="pni_login_type" value="')[1].split('"')[0]
pni_data = resp.text.split('name="pni_data" value="')[1].split('"')[0]
action = resp.text.split('name="loginForm" method="post" action="')[1].split('"')[0]
payload = {"pni_login_type": pni_login_type, "pni_data": pni_data}
resp = sess.post(action, data=payload)
resp = sess.get("https://my.snu.ac.kr/index.jsp")
return sess
payload = {"pni_login_type": pni_login_type, "pni_data": pni_data}
resp = sess.post(target, data=payload)
return resp
def save_login(sess):
cookies = [(i.name, i.value) for i in sess.cookies]
with open("sess.json", "w") as f:
json.dump(cookies, f, indent=4)
def etl_login():
sess = requests.session()
if os.path.exists("sess.json"):
with open("sess.json", "r") as f:
cookies = json.load(f)
for i in cookies:
sess.cookies.set(i[0], i[1])
else:
sso_login(sess)
save_login(sess)
resp = sess.get("https://etl.snu.ac.kr/login")
## Since we automated Passkey, its better to just log-in everytime.
# if os.path.exists("sess.json"):
# with open("sess.json", "r") as f:
# cookies = json.load(f)
# for i in cookies:
# sess.cookies.set(i[0], i[1])
#
sso = sess.get("https://etl.snu.ac.kr/passni/sso/spLogin.php")
resp = sso_login(sess, agt_resp=sso)
# save_login(sess)
if "gw-cb.php" not in resp.text:
print(resp.text)
raise Exception("Login Failed")
resp = sess.get("https://etl.snu.ac.kr/xn-sso/gw-cb.php")
if "iframe.src=" in resp.text:
cburl = resp.text.split('iframe.src="')[1].split('"')[0]
else:
print(resp.text)
sso_login(sess, agt_resp=resp)
resp = sess.get(cburl)
cpar = resp.text.split("window.loginCryption(")[1].split(")")[0]
@@ -135,7 +271,7 @@ def etl_login():
key = RSA.import_key(pk)
cipher = PKCS1_v1_5.new(key)
pt = cipher.decrypt(ct, b'')
payload = {
"utf8": "", "redirect_to_ssl": "1", "after_login_url": "",
"pseudonym_session[unique_id]": SNU_NM,
@@ -145,3 +281,8 @@ def etl_login():
resp = sess.post("https://myetl.snu.ac.kr/login/canvas", data=payload, headers={"referer": cburl})
return sess
if __name__ == "__main__":
# sso_register_passkey()
sso_login()