mirror of
https://github.com/morgan9e/eTL-downloader
synced 2026-04-14 00:14:34 +09:00
Improved logic
This commit is contained in:
11
etl.py
11
etl.py
@@ -5,16 +5,11 @@ import sys
|
||||
from datetime import datetime
|
||||
from login import etl_login
|
||||
|
||||
UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
|
||||
|
||||
sess = etl_login()
|
||||
csrf = sess.cookies.get('_csrf_token')
|
||||
CSRF = urllib.parse.unquote(csrf)
|
||||
|
||||
auth = {
|
||||
"user-agent": UA,
|
||||
"X-CSRF-Token": CSRF,
|
||||
}
|
||||
auth = { "X-CSRF-Token": CSRF }
|
||||
|
||||
API = "https://myetl.snu.ac.kr/api"
|
||||
|
||||
@@ -59,7 +54,7 @@ def get_subpath(dir, parent = []):
|
||||
|
||||
|
||||
def sync_etl(lecture, name = ""):
|
||||
basepath = "./download"
|
||||
basepath = "./"
|
||||
root = rget(f"{API}/v1/courses/{lecture}/folders/root")
|
||||
print()
|
||||
print(f"{root['id']} {root['full_name']}")
|
||||
@@ -70,7 +65,7 @@ def sync_etl(lecture, name = ""):
|
||||
local_dir = "/".join([i.replace(" ","_") for i in [i for i in file['path'] if i != "unfiled"]])
|
||||
if basepath:
|
||||
local_dir = os.path.join(basepath, local_dir)
|
||||
local_path = os.path.join(local_dir, file['display_name'].replace(" ","+"))
|
||||
local_path = os.path.join(local_dir, file['display_name'])
|
||||
|
||||
if not os.path.exists(local_dir):
|
||||
os.makedirs(local_dir, exist_ok=True)
|
||||
|
||||
124
login.py
124
login.py
@@ -10,13 +10,53 @@ from base64 import b64decode
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Cipher import PKCS1_v1_5
|
||||
|
||||
from fido import VirtualFidoDevice as FidoDevice
|
||||
from passkey import VirtPasskeyDevice as Passkey
|
||||
|
||||
class CredStore:
|
||||
def __init__(self):
|
||||
self.store = {}
|
||||
def __getitem__(self, key):
|
||||
return self.store.get(key)
|
||||
def __setitem__(self, key, val):
|
||||
self.store[key] = val
|
||||
self.__dict__[key] = val
|
||||
def get(self, key):
|
||||
return self.__getitem__(key)
|
||||
def put(self, key, val):
|
||||
self.__setitem__(key, val)
|
||||
|
||||
cred = CredStore()
|
||||
|
||||
cred.SNU_ID = ""
|
||||
cred.SNU_NM = ""
|
||||
cred.SNU_PW = ""
|
||||
cred.PASSKEY = os.path.join(os.path.dirname(os.path.abspath(__file__)), "snu_fido.json")
|
||||
|
||||
class Session(requests.Session):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.default_headers = {
|
||||
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
|
||||
}
|
||||
|
||||
def _merge_headers(self, headers):
|
||||
merged = headers.copy() if headers else {}
|
||||
for k, v in self.default_headers.items():
|
||||
if k not in merged:
|
||||
merged[k] = v
|
||||
return merged
|
||||
|
||||
def get(self, url, **kwargs):
|
||||
kwargs["headers"] = self._merge_headers(kwargs.get("headers"))
|
||||
return super().get(url, **kwargs)
|
||||
|
||||
def post(self, url, **kwargs):
|
||||
kwargs["headers"] = self._merge_headers(kwargs.get("headers"))
|
||||
return super().post(url, **kwargs)
|
||||
|
||||
SNU_PW = "" # Unused on Passkey
|
||||
SNU_ID = ""
|
||||
SNU_NM = ""
|
||||
|
||||
def key_init(sess):
|
||||
|
||||
curve = P256
|
||||
n = curve.q
|
||||
byte_length = (n.bit_length() + 7) // 8
|
||||
@@ -58,22 +98,26 @@ def key_init(sess):
|
||||
|
||||
|
||||
def encrypt_login(sess, data):
|
||||
|
||||
passni_iv, passni_key = key_init(sess)
|
||||
encrypt_data = seed_cbc_encrypt(data.encode(), passni_key, passni_iv)
|
||||
return encrypt_data
|
||||
|
||||
|
||||
def sso_register_passkey(sess = None):
|
||||
if not sess:
|
||||
sess = requests.session()
|
||||
|
||||
def sso_register_passkey(sess, passkey = None):
|
||||
print("[*] Credential not found, register new passkey.")
|
||||
|
||||
if not passkey:
|
||||
print(f"[*] Using Passkey store: {cred.PASSKEY}")
|
||||
passkey = Passkey(cred.PASSKEY)
|
||||
|
||||
payload = {"crtfc_type": "fido2", "lang": "ko", "return_url": "https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/view", "lnksys_id": "snu-mfa-sso"}
|
||||
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/regist/step", data=payload)
|
||||
|
||||
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/com/ajaxNextKey")
|
||||
next_key = resp.text
|
||||
|
||||
payload = {"next_key": next_key, "sel_user_id": "", "lang": "ko", "user_id": SNU_ID, "user_name": "", "user_birth": ""}
|
||||
payload = {"next_key": next_key, "sel_user_id": "", "lang": "ko", "user_id": cred.SNU_ID, "user_name": "", "user_birth": ""}
|
||||
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/com/ajaxUserIdCheck", data=payload)
|
||||
if not (r := resp.json()).get("result"):
|
||||
raise Exception(r)
|
||||
@@ -86,7 +130,7 @@ def sso_register_passkey(sess = None):
|
||||
if not (r := resp.json()).get("result"):
|
||||
raise Exception(r)
|
||||
|
||||
print("Verification Code sent to mail.")
|
||||
print("[*] 2FA is sent to registered mail.")
|
||||
verif = input("? ")
|
||||
|
||||
payload = {"next_key": next_key, "gubun": "self", "lang": "ko", "crtfc_no": verif}
|
||||
@@ -117,8 +161,8 @@ def sso_register_passkey(sess = None):
|
||||
if result.get("status") != "ok":
|
||||
raise Exception(result)
|
||||
|
||||
fido = FidoDevice("snu_fido.json")
|
||||
resp = fido.create(result, "https://mfalogin.snu.ac.kr")
|
||||
# Register new passkey
|
||||
resp = passkey.create(result, "https://mfalogin.snu.ac.kr")
|
||||
|
||||
data = {
|
||||
"type": resp["type"],
|
||||
@@ -134,11 +178,13 @@ def sso_register_passkey(sess = None):
|
||||
|
||||
payload = {"next_key": next_key, "type": "fido2", "lang": "ko"}
|
||||
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/snu/regist/step05", data=payload)
|
||||
|
||||
|
||||
print("[*] New passkey registered successfully")
|
||||
|
||||
|
||||
def sso_login(sess = None, agt_resp = None, auth_type = "passkey"):
|
||||
if not sess:
|
||||
sess = requests.session()
|
||||
sess = Session()
|
||||
|
||||
if not agt_resp:
|
||||
resp = sess.get("https://my.snu.ac.kr/SSOService.do")
|
||||
@@ -158,7 +204,7 @@ def sso_login(sess = None, agt_resp = None, auth_type = "passkey"):
|
||||
|
||||
if auth_type == "passkey":
|
||||
key_init(sess)
|
||||
payload = {'user_id': SNU_ID, 'crtfc_type': 'fido2', 'login_key': login_key}
|
||||
payload = {'user_id': cred.SNU_ID, 'crtfc_type': 'fido2', 'login_key': login_key}
|
||||
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/fido2/ajaxIDTokenCreate", data=payload)
|
||||
result = resp.json()
|
||||
if not result["result"]:
|
||||
@@ -169,15 +215,21 @@ def sso_login(sess = None, agt_resp = None, auth_type = "passkey"):
|
||||
payload = {"id_token": id_token, "userVerification": "preferred"}
|
||||
resp = sess.post("https://mfalogin.snu.ac.kr/mfa/user/fido2/options", json=payload, headers={"origin": "https://nsso.snu.ac.kr"})
|
||||
result = resp.json()
|
||||
|
||||
fido = FidoDevice("snu_fido.json")
|
||||
|
||||
print(f"[*] Using Passkey store: {cred.PASSKEY}")
|
||||
passkey = Passkey(cred.PASSKEY)
|
||||
|
||||
try:
|
||||
resp = fido.get(result, "https://nsso.snu.ac.kr")
|
||||
except FidoDevice.CredNotFoundError:
|
||||
sso_register_passkey()
|
||||
print("Passkey created, please rerun.")
|
||||
return
|
||||
|
||||
resp = passkey.get(result, "https://nsso.snu.ac.kr")
|
||||
|
||||
except Passkey.CredNotFoundError:
|
||||
try:
|
||||
sso_register_passkey(sess, passkey)
|
||||
resp = passkey.get(result, "https://nsso.snu.ac.kr")
|
||||
except Exception as e:
|
||||
print(f"Unknown error: {e}")
|
||||
exit(1)
|
||||
|
||||
data = {
|
||||
"type": "public-key",
|
||||
"id": resp["id"],
|
||||
@@ -198,12 +250,12 @@ def sso_login(sess = None, agt_resp = None, auth_type = "passkey"):
|
||||
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/fido2/ajaxUserAuthFido2", data=payload)
|
||||
if not (result := resp.json()).get("result"):
|
||||
raise Exception(result)
|
||||
print(resp.json())
|
||||
print(f"[*] Login Success! {resp.json()}")
|
||||
|
||||
elif auth_type in ["sms", "main"]:
|
||||
|
||||
# Login
|
||||
ed = encrypt_login(sess, f'{{"login_id":"{SNU_ID}","login_pwd":"{SNU_PW}"}}')
|
||||
ed = encrypt_login(sess, f'{{"login_id":"{cred.SNU_ID}","login_pwd":"{cred.SNU_PW}"}}')
|
||||
payload = {'user_data': ed.hex(), 'login_key': login_key}
|
||||
resp = sess.post("https://nsso.snu.ac.kr/sso/usr/snu/mfa/login/auth", data=payload)
|
||||
|
||||
@@ -231,27 +283,13 @@ def sso_login(sess = None, agt_resp = None, auth_type = "passkey"):
|
||||
return resp
|
||||
|
||||
|
||||
def save_login(sess):
|
||||
cookies = [(i.name, i.value) for i in sess.cookies]
|
||||
with open("sess.json", "w") as f:
|
||||
json.dump(cookies, f, indent=4)
|
||||
|
||||
|
||||
def etl_login():
|
||||
sess = requests.session()
|
||||
|
||||
## Since we automated Passkey, its better to just log-in everytime.
|
||||
|
||||
# if os.path.exists("sess.json"):
|
||||
# with open("sess.json", "r") as f:
|
||||
# cookies = json.load(f)
|
||||
# for i in cookies:
|
||||
# sess.cookies.set(i[0], i[1])
|
||||
#
|
||||
|
||||
sess = Session()
|
||||
sso = sess.get("https://etl.snu.ac.kr/passni/sso/spLogin.php")
|
||||
# SSO Login
|
||||
resp = sso_login(sess, agt_resp=sso)
|
||||
# save_login(sess)
|
||||
|
||||
if "gw-cb.php" not in resp.text:
|
||||
print(resp.text)
|
||||
raise Exception("Login Failed")
|
||||
@@ -274,7 +312,7 @@ def etl_login():
|
||||
|
||||
payload = {
|
||||
"utf8": "✓", "redirect_to_ssl": "1", "after_login_url": "",
|
||||
"pseudonym_session[unique_id]": SNU_NM,
|
||||
"pseudonym_session[unique_id]": cred.SNU_NM,
|
||||
"pseudonym_session[password]": pt.decode(),
|
||||
"pseudonym_session[remember_me]": "1"
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ from Crypto.Signature import DSS
|
||||
from Crypto.Hash import SHA256
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
|
||||
import getpass
|
||||
from fido2.hid import CtapHidDevice
|
||||
from fido2.client import Fido2Client, UserInteraction
|
||||
@@ -18,8 +17,8 @@ from fido2.webauthn import PublicKeyCredentialCreationOptions, PublicKeyCredenti
|
||||
from fido2.utils import websafe_encode, websafe_decode
|
||||
|
||||
|
||||
class YkFidoDevice:
|
||||
def __init__(self):
|
||||
class YubiPasskeyDevice:
|
||||
def __init__(self, dummy = ""):
|
||||
devices = list(CtapHidDevice.list_devices())
|
||||
if not devices:
|
||||
raise Exception("No FIDO2 devices found.")
|
||||
@@ -139,8 +138,8 @@ class YkFidoDevice:
|
||||
return result
|
||||
|
||||
|
||||
class VirtualFidoDevice:
|
||||
def __init__(self, file: str = "fido.json"):
|
||||
class VirtPasskeyDevice:
|
||||
def __init__(self, file: str = "passkey.json"):
|
||||
self.file = file
|
||||
self.credentials = {}
|
||||
self._load_credentials()
|
||||
@@ -158,7 +157,7 @@ class VirtualFidoDevice:
|
||||
try:
|
||||
with open(self.file, 'r') as f:
|
||||
self.credentials = json.load(f)
|
||||
except FileNotFoundError:
|
||||
except os.FileNotExistsError:
|
||||
self.credentials = {}
|
||||
|
||||
def _save_credentials(self):
|
||||
@@ -223,7 +222,7 @@ class VirtualFidoDevice:
|
||||
if not origin:
|
||||
raise self.InputDataError("origin")
|
||||
|
||||
rp_id = rp.get("id", "").encode()
|
||||
rp_id = rp.get("id").encode()
|
||||
|
||||
user_id = user.get("id")
|
||||
if isinstance(user_id, str):
|
||||
@@ -245,34 +244,25 @@ class VirtualFidoDevice:
|
||||
|
||||
auth_data = self._create_authenticator_data(rp_id, counter=0, credential_data=attested_data)
|
||||
|
||||
client_data = ('{"type":"%s","challenge":"%s","origin":"%s","crossOrigin":false}'
|
||||
% ("webauthn.create", self._b64url(challenge), origin)).encode()
|
||||
client_data_hash = hashlib.sha256(client_data).digest()
|
||||
|
||||
signature_data = auth_data + client_data_hash
|
||||
|
||||
h = SHA256.new(signature_data)
|
||||
signer = DSS.new(key, 'fips-186-3', encoding='der')
|
||||
signature = signer.sign(h)
|
||||
|
||||
# Self Attestation
|
||||
attn_fmt = "packed"
|
||||
attn_stmt = {
|
||||
"alg": -7,
|
||||
"sig": signature
|
||||
attestation_obj = {
|
||||
"fmt": "none",
|
||||
"authData": auth_data,
|
||||
"attStmt": {}
|
||||
}
|
||||
attestation_cbor = cbor2.dumps(attestation_obj)
|
||||
|
||||
attn_obj = {
|
||||
"fmt": attn_fmt,
|
||||
"attStmt": attn_stmt,
|
||||
"authData": auth_data
|
||||
}
|
||||
attn_cbor = cbor2.dumps(attn_obj)
|
||||
client_data = {
|
||||
"challenge": self._b64url(challenge),
|
||||
"origin": origin,
|
||||
"type": "webauthn.create",
|
||||
"crossOrigin": False,
|
||||
}
|
||||
|
||||
client_data_json = json.dumps(client_data).encode()
|
||||
|
||||
self.credentials[credential_id_b64] = {
|
||||
"private_key": private_key,
|
||||
"rp_id": rp_id.decode(),
|
||||
"rp_id": self._b64url(rp_id),
|
||||
"user_id": self._b64url(user_id),
|
||||
"user_name": user.get('displayName', ''),
|
||||
"created": int(time.time()),
|
||||
@@ -285,12 +275,11 @@ class VirtualFidoDevice:
|
||||
"id": credential_id_b64,
|
||||
"rawId": credential_id_b64,
|
||||
"response": {
|
||||
"attestationObject": self._b64url(attn_cbor),
|
||||
"clientDataJSON": self._b64url(client_data),
|
||||
"attestationObject": self._b64url(attestation_cbor),
|
||||
"clientDataJSON": self._b64url(client_data_json),
|
||||
"publicKey": self._b64url(cose_pubkey),
|
||||
"authenticatorData": self._b64url(auth_data),
|
||||
"pubKeyAlgo": str(alg),
|
||||
"transports": ["usb"]
|
||||
"transports": ["internal"]
|
||||
},
|
||||
"type": "public-key"
|
||||
}
|
||||
@@ -359,11 +348,14 @@ class VirtualFidoDevice:
|
||||
return response
|
||||
|
||||
|
||||
Passkey = VirtPasskeyDevice
|
||||
|
||||
|
||||
if __name__=="__main__":
|
||||
import requests
|
||||
|
||||
sess = requests.Session()
|
||||
fido = VirtualFidoDevice()
|
||||
passkey = Passkey()
|
||||
|
||||
payload = {
|
||||
"algorithms": ["es256"], "attachment": "all", "attestation": "none", "discoverable_credential": "preferred",
|
||||
@@ -371,7 +363,7 @@ if __name__=="__main__":
|
||||
}
|
||||
resp = sess.post("https://webauthn.io/registration/options", json=payload)
|
||||
print(resp.json())
|
||||
data = fido.create(resp.json(), origin="https://webauthn.io")
|
||||
data = passkey.create(resp.json(), origin="https://webauthn.io")
|
||||
data["rawId"] = data["id"]
|
||||
print(data)
|
||||
resp = sess.post("https://webauthn.io/registration/verification", json={"response": data, "username": "asdf"})
|
||||
@@ -383,7 +375,7 @@ if __name__=="__main__":
|
||||
payload = {"username":"asdf", "user_verification":"preferred", "hints":[]}
|
||||
resp = sess.post("https://webauthn.io/authentication/options", json=payload, headers={"origin": "https://webauthn.io"})
|
||||
print(resp.json())
|
||||
data = fido.get(resp.json(), origin="https://webauthn.io")
|
||||
data = passkey.get(resp.json(), origin="https://webauthn.io")
|
||||
print(data)
|
||||
data["rawId"] = data["id"]
|
||||
resp = sess.post("https://webauthn.io/authentication/verification", json={"response": data, "username": "asdf"})
|
||||
Reference in New Issue
Block a user