diff --git a/README.md b/README.md index a10533f..e7a2f46 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,9 @@ Download all files from lectures of SNU eTL. Skips already downloaded files -+) 2FA verification code (mail) required. +Creates locally saved virtual Passkey to reuse after + +First time registration required, passkey info is stored at `snu_fido.json`. ### Usage diff --git a/etl.py b/etl.py index 7fe20f6..990415e 100644 --- a/etl.py +++ b/etl.py @@ -59,7 +59,7 @@ def get_subpath(dir, parent = []): def sync_etl(lecture, name = ""): - basepath = "." + basepath = "./download" root = rget(f"{API}/v1/courses/{lecture}/folders/root") print() print(f"{root['id']} {root['full_name']}") @@ -93,4 +93,4 @@ def sync_etl(lecture, name = ""): except Exception as e: print(f"- Failed {local_path}: {str(e)}") if os.path.exists(local_path): - os.remove(local_path) \ No newline at end of file + os.remove(local_path) diff --git a/fido.py b/fido.py new file mode 100644 index 0000000..a54e18c --- /dev/null +++ b/fido.py @@ -0,0 +1,390 @@ +import json +import base64 +import os +import time +import hashlib +import struct +import cbor2 +from Crypto.PublicKey import ECC +from Crypto.Signature import DSS +from Crypto.Hash import SHA256 +from typing import Dict, Any, Optional + + +import getpass +from fido2.hid import CtapHidDevice +from fido2.client import Fido2Client, UserInteraction +from fido2.webauthn import PublicKeyCredentialCreationOptions, PublicKeyCredentialDescriptor, PublicKeyCredentialRequestOptions +from fido2.utils import websafe_encode, websafe_decode + + +class YkFidoDevice: + def __init__(self): + devices = list(CtapHidDevice.list_devices()) + if not devices: + raise Exception("No FIDO2 devices found.") + self.device = devices[0] + print(f"Using FIDO2 device: {self.device}") + + class InputDataError(Exception): + def __init__(self, message="", error_code=None): + self.message = f"Input data insufficient or malformed: {message}" + self.error_code = error_code + super().__init__(self.message) + + def get_client(self, origin): + class MyUserInteraction(UserInteraction): + def prompt_up(self): + print("\nPlease touch your security key...\n") + def request_pin(self, permissions, rp_id): + print(f"PIN requested for {rp_id}") + return getpass.getpass("Enter your security key's PIN: ") + + client = Fido2Client(self.device, origin, user_interaction=MyUserInteraction()) + return client + + def create(self, create_options, origin = ""): + print("WEBAUTHN_START_REGISTER") + options = {"publicKey": create_options} + + if not origin: + origin = f'https://{options["publicKey"]["rp"]["id"]}' + if not origin: + raise self.InputDataError("origin") + + client = self.get_client(origin) + + options["publicKey"]["challenge"] = websafe_decode(options["publicKey"]["challenge"]) + options["publicKey"]["user"]["id"] = websafe_decode(options["publicKey"]["user"]["id"]) + + if "excludeCredentials" in options["publicKey"]: + for cred in options["publicKey"]["excludeCredentials"]: + cred["id"] = websafe_decode(cred["id"]) + + pk_create_options = options["publicKey"] + challenge = pk_create_options["challenge"] + rp = pk_create_options["rp"] + user = pk_create_options["user"] + pub_key_cred_params = pk_create_options["pubKeyCredParams"] + + pk_options = PublicKeyCredentialCreationOptions(rp, user, challenge, pub_key_cred_params) + + print(f"WEBAUTHN_MAKE_CREDENTIAL(RP={rp})") + + attestation = client.make_credential(pk_options) + + client_data_b64 = attestation.client_data.b64 + attestation_object = attestation.attestation_object + credential = attestation.attestation_object.auth_data.credential_data + if not credential: + raise Exception() + + result = { + "id": websafe_encode(credential.credential_id), + "rawId": websafe_encode(credential.credential_id), + "type": "public-key", + "response": { + "attestationObject": websafe_encode(attestation_object), + "clientDataJSON": client_data_b64 + } + } + print(f"WEBAUTHN_ATTESTATION(ID={result['id']})") + return result + + def get(self, get_options, origin = ""): + print("WEBAUTHN_START_AUTHENTICATION") + options = {"publicKey": get_options} + + if not origin: + origin = f'https://{options["publicKey"]["rpId"]}' + if not origin: + raise self.InputDataError("origin") + + client = self.get_client(origin) + + options["publicKey"]["challenge"] = websafe_decode(options["publicKey"]["challenge"]) + + rp_id = options["publicKey"].get("rpId", "webauthn.io") + challenge = options["publicKey"]["challenge"] + + if "allowCredentials" in options["publicKey"]: + for cred in options["publicKey"]["allowCredentials"]: + cred["id"] = websafe_decode(cred["id"]) + + allowed = [PublicKeyCredentialDescriptor(cred["type"], cred["id"]) + for cred in options["publicKey"]["allowCredentials"]] + + pk_options = PublicKeyCredentialRequestOptions(challenge, rp_id=rp_id, allow_credentials=allowed) + + print(f"WEBAUTHN_GET_ASSERTION(RPID={rp_id})") + + assertion_response = client.get_assertion(pk_options) + + assertion = assertion_response.get_response(0) + if not assertion.credential_id: + raise Exception() + + result = { + "id": websafe_encode(assertion.credential_id), + "rawId": websafe_encode(assertion.credential_id), + "type": "public-key", + "response": { + "authenticatorData": websafe_encode(assertion.authenticator_data), + "clientDataJSON": assertion.client_data.b64, + "signature": websafe_encode(assertion.signature), + "userHandle": websafe_encode(assertion.user_handle) if assertion.user_handle else None + } + } + print(f"WEBAUTHN_AUTHENTICATION(ID={result['id']})") + return result + + +class VirtualFidoDevice: + def __init__(self, file: str = "fido.json"): + self.file = file + self.credentials = {} + self._load_credentials() + + class InputDataError(Exception): + def __init__(self, message="", error_code=None): + super().__init__(f"Input data insufficient or malformed: {message}") + + class CredNotFoundError(Exception): + def __init__(self, message="No available credential found", error_code=None): + super().__init__(message) + + def _load_credentials(self): + if os.path.exists(self.file): + try: + with open(self.file, 'r') as f: + self.credentials = json.load(f) + except FileNotFoundError: + self.credentials = {} + + def _save_credentials(self): + with open(self.file, 'w') as f: + json.dump(self.credentials, f, indent=4) + + def _create_authenticator_data(self, rp_id: bytes, counter: int = 0, + user_present: bool = True, + user_verified: bool = True, + credential_data: Optional[bytes] = None) -> bytes: + + rp_id_hash = hashlib.sha256(rp_id).digest() + + flags = 0 + if user_present: + flags |= 1 << 0 + if user_verified: + flags |= 1 << 2 + if credential_data is not None: + flags |= 1 << 6 + + counter_bytes = struct.pack(">I", counter) + + auth_data = rp_id_hash + bytes([flags]) + counter_bytes + + if credential_data is not None: + auth_data += credential_data + + return auth_data + + def _get_public_key_cose(self, key) -> bytes: + x = key.pointQ.x.to_bytes(32, byteorder='big') + y = key.pointQ.y.to_bytes(32, byteorder='big') + cose_key = {1: 2, 3: -7, -1: 1, -2: x, -3: y} + return cbor2.dumps(cose_key) + + def _b64url(self, d): + if isinstance(d, bytes): + return base64.urlsafe_b64encode(d).decode('utf-8').rstrip('=') + elif isinstance(d, str): + return base64.urlsafe_b64decode(d + "===") + + + def create(self, data: Dict[str, Any], origin: str = "") -> Dict[str, Any]: + challenge = data.get("challenge") + if isinstance(challenge, str): + challenge = self._b64url(challenge) + + rp = data.get("rp", {}) + user = data.get("user", {}) + + pub_key_params = data.get("pubKeyCredParams", []) + + alg = -7 + for param in pub_key_params: + if param.get('type') == 'public-key' and param.get('alg') == -7: + alg = -7 + break + + if not origin: + origin = data.get("origin") + if not origin: + raise self.InputDataError("origin") + + rp_id = rp.get("id", "").encode() + + user_id = user.get("id") + if isinstance(user_id, str): + user_id = self._b64url(user_id) + + key = ECC.generate(curve='P-256') + private_key = key.export_key(format='PEM') + public_key = key.public_key().export_key(format='PEM') # noqa: F841 + + credential_id = os.urandom(16) + credential_id_b64 = self._b64url(credential_id) + + cose_pubkey = self._get_public_key_cose(key) + + cred_id_length = struct.pack(">H", len(credential_id)) + + aaguid = b'\x00' * 16 + attested_data = aaguid + cred_id_length + credential_id + cose_pubkey + + auth_data = self._create_authenticator_data(rp_id, counter=0, credential_data=attested_data) + + client_data = ('{"type":"%s","challenge":"%s","origin":"%s","crossOrigin":false}' + % ("webauthn.create", self._b64url(challenge), origin)).encode() + client_data_hash = hashlib.sha256(client_data).digest() + + signature_data = auth_data + client_data_hash + + h = SHA256.new(signature_data) + signer = DSS.new(key, 'fips-186-3', encoding='der') + signature = signer.sign(h) + + # Self Attestation + attn_fmt = "packed" + attn_stmt = { + "alg": -7, + "sig": signature + } + + attn_obj = { + "fmt": attn_fmt, + "attStmt": attn_stmt, + "authData": auth_data + } + attn_cbor = cbor2.dumps(attn_obj) + + + self.credentials[credential_id_b64] = { + "private_key": private_key, + "rp_id": rp_id.decode(), + "user_id": self._b64url(user_id), + "user_name": user.get('displayName', ''), + "created": int(time.time()), + "counter": 0 + } + self._save_credentials() + + response = { + "authenticatorAttachment": "cross-platform", + "id": credential_id_b64, + "rawId": credential_id_b64, + "response": { + "attestationObject": self._b64url(attn_cbor), + "clientDataJSON": self._b64url(client_data), + "publicKey": self._b64url(cose_pubkey), + "authenticatorData": self._b64url(auth_data), + "pubKeyAlgo": str(alg), + "transports": ["usb"] + }, + "type": "public-key" + } + return response + + + def get(self, data: Dict[str, Any], origin: str = "") -> Dict[str, Any]: + + challenge = data.get("challenge") + if isinstance(challenge, str): + challenge = self._b64url(challenge) + + allowed_credential = data.get("allowCredentials") + + for credential in allowed_credential: + credential_id_b64 = credential["id"] + if self.credentials.get(credential_id_b64): + cred = self.credentials[credential_id_b64] + break + else: + raise self.CredNotFoundError() + + rp_id = data.get("rpId", "").encode('utf-8') + if not rp_id: + raise self.InputDataError("rp_id") + + if not origin: + origin = data.get("origin") + if not origin: + raise self.InputDataError("origin") + + counter = cred.get("counter", 0) + 1 + cred["counter"] = counter + + auth_data = self._create_authenticator_data( + rp_id=rp_id, + counter=counter, + user_present=True, + user_verified=True + ) + + client_data = ('{"type":"%s","challenge":"%s","origin":"%s","crossOrigin":false}' + % ("webauthn.get", self._b64url(challenge), origin)).encode() + client_data_hash = hashlib.sha256(client_data).digest() + + signature_data = auth_data + client_data_hash + + key = ECC.import_key(cred["private_key"]) + h = SHA256.new(signature_data) + signer = DSS.new(key, 'fips-186-3', encoding='der') + signature = signer.sign(h) + + self._save_credentials() + + response = { + "authenticatorAttachment": "cross-platform", + "id": credential_id_b64, + "rawId": credential_id_b64, + "response": { + "authenticatorData": self._b64url(auth_data), + "clientDataJSON": self._b64url(client_data), + "signature": self._b64url(signature) + }, + "type": "public-key" + } + return response + + +if __name__=="__main__": + import requests + + sess = requests.Session() + fido = VirtualFidoDevice() + + payload = { + "algorithms": ["es256"], "attachment": "all", "attestation": "none", "discoverable_credential": "preferred", + "hints": [], "user_verification": "preferred", "username": "asdf" + } + resp = sess.post("https://webauthn.io/registration/options", json=payload) + print(resp.json()) + data = fido.create(resp.json(), origin="https://webauthn.io") + data["rawId"] = data["id"] + print(data) + resp = sess.post("https://webauthn.io/registration/verification", json={"response": data, "username": "asdf"}) + print(resp.json()) + print() + + sess.get("https://webauthn.io/logout") + + payload = {"username":"asdf", "user_verification":"preferred", "hints":[]} + resp = sess.post("https://webauthn.io/authentication/options", json=payload, headers={"origin": "https://webauthn.io"}) + print(resp.json()) + data = fido.get(resp.json(), origin="https://webauthn.io") + print(data) + data["rawId"] = data["id"] + resp = sess.post("https://webauthn.io/authentication/verification", json={"response": data, "username": "asdf"}) + print(resp.json()) \ No newline at end of file diff --git a/login.py b/login.py index ccb415a..89230d9 100644 --- a/login.py +++ b/login.py @@ -10,9 +10,13 @@ from base64 import b64decode from Crypto.PublicKey import RSA from Crypto.Cipher import PKCS1_v1_5 -from SECRET import SNU_ID, SNU_PW, SNU_NM +from fido import VirtualFidoDevice as FidoDevice -def do(sess, data): +SNU_PW = "" # Unused on Passkey +SNU_ID = "" +SNU_NM = "" + +def key_init(sess): curve = P256 n = curve.q byte_length = (n.bit_length() + 7) // 8 @@ -22,9 +26,9 @@ def do(sess, data): client_prikey = format(rand, 'x') priv_key_int = int(client_prikey, 16) - G = Point(curve.gx, curve.gy, curve) + G = Point(curve.gx, curve.gy, curve) client_pub = priv_key_int * G - + client_pubkey_x = format(client_pub.x, '064x') client_pubkey_y = format(client_pub.y, '064x') client_pubkey = client_pubkey_x + client_pubkey_y @@ -41,20 +45,98 @@ def do(sess, data): server_point = Point(svr_qx_int, svr_qy_int, curve) shared_point = server_point * priv_key_int - + calkey_x = format(shared_point.x, '064x') calkey_y = format(shared_point.y, '064x') - + client_calkey = calkey_x + calkey_y - + passni_key = unhexlify(client_calkey[:64]) passni_iv = unhexlify(client_calkey[64:96]) - + + return passni_iv, passni_key + + +def encrypt_login(sess, data): + passni_iv, passni_key = key_init(sess) encrypt_data = seed_cbc_encrypt(data.encode(), passni_key, passni_iv) - return encrypt_data -def sso_login(sess = None, agt_resp = None): + +def sso_register_passkey(sess = None): + if not sess: + sess = requests.session() + + payload = {"crtfc_type": "fido2", "lang": "ko", "return_url": "https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/view", "lnksys_id": "snu-mfa-sso"} + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/regist/step", data=payload) + + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/com/ajaxNextKey") + next_key = resp.text + + payload = {"next_key": next_key, "sel_user_id": "", "lang": "ko", "user_id": SNU_ID, "user_name": "", "user_birth": ""} + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/com/ajaxUserIdCheck", data=payload) + if not (r := resp.json()).get("result"): + raise Exception(r) + + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/com/ajaxNextKey") + next_key = resp.text + + payload = {"next_key": next_key, "gubun": "self", "lang": "ko", "crtfc_no": ""} + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/com/ajaxSendMail", data=payload) + if not (r := resp.json()).get("result"): + raise Exception(r) + + print("Verification Code sent to mail.") + verif = input("? ") + + payload = {"next_key": next_key, "gubun": "self", "lang": "ko", "crtfc_no": verif} + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/com/ajaxValidCrtfcNo", data=payload) + if not (r := resp.json()).get("result"): + raise Exception(r) + + payload = {"next_key": next_key, "gubun": "self", "lang": "ko", "crtfc_no": verif} + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/regist/step04/fido2", data=payload) + + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/com/ajaxNextKey") + next_key = resp.text + + payload = { + "username": "", + "displayName": "", + "credentialNickname": "", + "authenticatorSelection": { + "requireResidentKey": False, + "authenticatorAttachment": "platform", + "userVerification":"preferred" + }, + "attestation": "direct" + } + + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/fido2/register", json=payload, headers={"Origin": "https://mfalogin.snu.ac.kr"}) + result = resp.json() + if result.get("status") != "ok": + raise Exception(result) + + fido = FidoDevice("snu_fido.json") + resp = fido.create(result, "https://mfalogin.snu.ac.kr") + + data = { + "type": resp["type"], + "id": resp["id"], + "response": { + "attestationObject": resp["response"]["attestationObject"], + "clientDataJSON": resp["response"]["clientDataJSON"], + }, + "clientExtensionResults": {} + } + payload = {"register_data": json.dumps(data)} + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/fido2/register/finish", data=payload, headers={"Origin": "https://mfalogin.snu.ac.kr"}) + + payload = {"next_key": next_key, "type": "fido2", "lang": "ko"} + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/regist/step05", data=payload) + + +def sso_login(sess = None, agt_resp = None, auth_type = "passkey"): if not sess: sess = requests.session() @@ -70,59 +152,113 @@ def sso_login(sess = None, agt_resp = None): payload = {"agt_url": agt_url, "agt_r": agt_r, "agt_id": agt_id} resp = sess.post("https://nsso.snu.ac.kr/sso/usr/login/link", data=payload) - login_key = resp.text.split('id="login_key" name="login_key" value="')[1].split('"')[0] - ed = do(sess, f'{"login_id":"{SNU_ID}","login_pwd":"{SNU_PW}"}') + if "login_key" in resp.text: - payload = {'user_data': ed.hex(), 'login_key': login_key} - resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/auth", data=payload) - - verif_type = 'mail' + login_key = resp.text.split('id="login_key" name="login_key" value="')[1].split('"')[0] - payload = {'crtfc_type': 'mail', 'login_key': login_key} - resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/ajaxUserSend", data=payload) + if auth_type == "passkey": + key_init(sess) + payload = {'user_id': SNU_ID, 'crtfc_type': 'fido2', 'login_key': login_key} + resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/fido2/ajaxIDTokenCreate", data=payload) + result = resp.json() + if not result["result"]: + raise Exception(result) - verif = input("Verification code ? ") + id_token = result["id_token"] - payload = {'crtfc_no': verif, 'login_key': login_key, "bypass_check": "true"} - resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/ajaxUserAuthId", data=payload) + payload = {"id_token": id_token, "userVerification": "preferred"} + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/fido2/options", json=payload, headers={"origin": "https://nsso.snu.ac.kr"}) + result = resp.json() + + fido = FidoDevice("snu_fido.json") + try: + resp = fido.get(result, "https://nsso.snu.ac.kr") + except FidoDevice.CredNotFoundError: + sso_register_passkey() + print("Passkey created, please rerun.") + return + + data = { + "type": "public-key", + "id": resp["id"], + "response": { + "authenticatorData": resp["response"]["authenticatorData"], + "clientDataJSON": resp["response"]["clientDataJSON"], + "signature": resp["response"]["signature"] + }, + "clientExtensionResults": {} + } + + payload = {"user_data": json.dumps(data), "id_token": id_token} + resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/fido2/auth", data=payload, headers={"origin": "https://nsso.snu.ac.kr"}) + if not (result := resp.json()).get("result"): + raise Exception(result) - payload = {"user_data": "", "page_lang": "", "login_key": login_key, "pwd_type":""} - resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/login/link", data=payload) + payload = {'login_key': login_key} + resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/fido2/ajaxUserAuthFido2", data=payload) + if not (result := resp.json()).get("result"): + raise Exception(result) + print(resp.json()) + + elif auth_type in ["sms", "main"]: + + # Login + ed = encrypt_login(sess, f'{{"login_id":"{SNU_ID}","login_pwd":"{SNU_PW}"}}') + payload = {'user_data': ed.hex(), 'login_key': login_key} + resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/auth", data=payload) + + # 2FA + payload = {'crtfc_type': auth_type, 'login_key': login_key} + resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/ajaxUserSend", data=payload) + + verif = input("? ") + + payload = {'crtfc_no': verif, 'login_key': login_key, "bypass_check": "true"} + resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/ajaxUserAuthId", data=payload) + + # Login complete + + payload = {"user_data": "", "page_lang": "", "login_key": login_key, "pwd_type":""} + resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/login/link", data=payload) + + target = resp.text.split('name="loginForm" method="post" action="')[1].split('"')[0] pni_login_type = resp.text.split('name="pni_login_type" value="')[1].split('"')[0] pni_data = resp.text.split('name="pni_data" value="')[1].split('"')[0] - action = resp.text.split('name="loginForm" method="post" action="')[1].split('"')[0] - - payload = {"pni_login_type": pni_login_type, "pni_data": pni_data} - resp = sess.post(action, data=payload) - - resp = sess.get("https://my.snu.ac.kr/index.jsp") - return sess + payload = {"pni_login_type": pni_login_type, "pni_data": pni_data} + resp = sess.post(target, data=payload) + + return resp + def save_login(sess): cookies = [(i.name, i.value) for i in sess.cookies] with open("sess.json", "w") as f: json.dump(cookies, f, indent=4) + def etl_login(): sess = requests.session() - if os.path.exists("sess.json"): - with open("sess.json", "r") as f: - cookies = json.load(f) - for i in cookies: - sess.cookies.set(i[0], i[1]) - else: - sso_login(sess) - save_login(sess) - - resp = sess.get("https://etl.snu.ac.kr/login") + ## Since we automated Passkey, its better to just log-in everytime. + # if os.path.exists("sess.json"): + # with open("sess.json", "r") as f: + # cookies = json.load(f) + # for i in cookies: + # sess.cookies.set(i[0], i[1]) + # + + sso = sess.get("https://etl.snu.ac.kr/passni/sso/spLogin.php") + resp = sso_login(sess, agt_resp=sso) + # save_login(sess) + if "gw-cb.php" not in resp.text: + print(resp.text) + raise Exception("Login Failed") + + resp = sess.get("https://etl.snu.ac.kr/xn-sso/gw-cb.php") if "iframe.src=" in resp.text: cburl = resp.text.split('iframe.src="')[1].split('"')[0] - else: - print(resp.text) - sso_login(sess, agt_resp=resp) resp = sess.get(cburl) cpar = resp.text.split("window.loginCryption(")[1].split(")")[0] @@ -135,7 +271,7 @@ def etl_login(): key = RSA.import_key(pk) cipher = PKCS1_v1_5.new(key) pt = cipher.decrypt(ct, b'') - + payload = { "utf8": "✓", "redirect_to_ssl": "1", "after_login_url": "", "pseudonym_session[unique_id]": SNU_NM, @@ -145,3 +281,8 @@ def etl_login(): resp = sess.post("https://myetl.snu.ac.kr/login/canvas", data=payload, headers={"referer": cburl}) return sess + + +if __name__ == "__main__": + # sso_register_passkey() + sso_login()