Added possibilty to use different crypto backends.

* Created NoiseBackend class serving as a base for backends

* Refactored NoiseProtocol name parsing

* Refactored existing spec-defined functions into abstract classes.
Implementing classes are connecting crypto primitives to expected
interfaces.

* Refactored existing usage of Cryptography as source of crypto into
"default" backend (along with in-house implementation of X448).

* Provisioned "experimental" backend, it will contain e.g. non-default
crypto algorithms

* Backend can be chosen while creating NoiseConnection, though by
default, the Cryptography backend ("default") is used

Closes #7
This commit is contained in:
Piotr Lizończyk
2018-07-16 01:26:20 +02:00
parent 6fc772aead
commit e84db3c232
23 changed files with 583 additions and 417 deletions

1
.gitignore vendored
View File

@@ -44,6 +44,7 @@ nosetests.xml
coverage.xml
*,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo

View File

View File

@@ -0,0 +1,3 @@
from .backend import DefaultNoiseBackend
noise_backend = DefaultNoiseBackend()

View File

@@ -0,0 +1,38 @@
from noise.backends.default.ciphers import ChaCha20Cipher, AESGCMCipher
from noise.backends.default.diffie_hellmans import ED25519, ED448
from noise.backends.default.hashes import hmac_hash, BLAKE2sHash, BLAKE2bHash, SHA256Hash, SHA512Hash
from noise.backends.default.keypairs import KeyPair25519, KeyPair448
from noise.backends.noise_backend import NoiseBackend
class DefaultNoiseBackend(NoiseBackend):
"""
Contains all the crypto methods endorsed by Noise Protocol specification, using Cryptography as backend
"""
def __init__(self):
super(DefaultNoiseBackend, self).__init__()
self.diffie_hellmans = {
'25519': ED25519,
'448': ED448
}
self.ciphers = {
'AESGCM': AESGCMCipher,
'ChaChaPoly': ChaCha20Cipher
}
self.hashes = {
'BLAKE2s': BLAKE2sHash,
'BLAKE2b': BLAKE2bHash,
'SHA256': SHA256Hash,
'SHA512': SHA512Hash
}
self.keypairs = {
'25519': KeyPair25519,
'448': KeyPair448
}
self.hmac = hmac_hash

View File

@@ -0,0 +1,35 @@
import abc
from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305
from noise.functions.cipher import Cipher
class CryptographyCipher(Cipher, metaclass=abc.ABCMeta):
def encrypt(self, k, n, ad, plaintext):
return self.cipher.encrypt(nonce=self.format_nonce(n), data=plaintext, associated_data=ad)
def decrypt(self, k, n, ad, ciphertext):
return self.cipher.decrypt(nonce=self.format_nonce(n), data=ciphertext, associated_data=ad)
@abc.abstractmethod
def format_nonce(self, n):
raise NotImplementedError
class AESGCMCipher(CryptographyCipher):
@property
def klass(self):
return AESGCM
def format_nonce(self, n):
return b'\x00\x00\x00\x00' + n.to_bytes(length=8, byteorder='big')
class ChaCha20Cipher(CryptographyCipher):
@property
def klass(self):
return ChaCha20Poly1305
def format_nonce(self, n):
return b'\x00\x00\x00\x00' + n.to_bytes(length=8, byteorder='little')

View File

@@ -0,0 +1,44 @@
from cryptography.hazmat.primitives.asymmetric import x25519
from noise.backends.default.crypto import X448
from noise.backends.default.keypairs import KeyPair25519, KeyPair448
from noise.exceptions import NoiseValueError
from noise.functions.dh import DH
class ED25519(DH):
@property
def klass(self):
return KeyPair25519
@property
def dhlen(self):
return 32
def generate_keypair(self) -> 'KeyPair':
private_key = x25519.X25519PrivateKey.generate()
public_key = private_key.public_key()
return KeyPair25519(private_key, public_key, public_key.public_bytes())
def dh(self, private_key, public_key) -> bytes:
if not isinstance(private_key, x25519.X25519PrivateKey) or not isinstance(public_key, x25519.X25519PublicKey):
raise NoiseValueError('Invalid keys! Must be x25519.X25519PrivateKey and x25519.X25519PublicKey instances')
return private_key.exchange(public_key)
class ED448(DH):
@property
def klass(self):
return KeyPair448
@property
def dhlen(self):
return 56
def generate_keypair(self) -> 'KeyPair':
return KeyPair448.new()
def dh(self, private_key, public_key) -> bytes:
if len(private_key) != self.dhlen or len(public_key) != self.dhlen:
raise NoiseValueError('Invalid length of keys! Should be {}'.format(self.dhlen))
return X448.mul(private_key, public_key)

View File

@@ -0,0 +1,80 @@
import abc
from functools import partial
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.hmac import HMAC
from noise.functions.hash import Hash
cryptography_backend = default_backend()
class CryptographyHash(Hash, metaclass=abc.ABCMeta):
def hash(self, data):
digest = hashes.Hash(self.fn(), cryptography_backend)
digest.update(data)
return digest.finalize()
class SHA256Hash(CryptographyHash):
@property
def fn(self):
return hashes.SHA256
@property
def hashlen(self):
return 32
@property
def blocklen(self):
return 64
class SHA512Hash(CryptographyHash):
@property
def fn(self):
return hashes.SHA512
@property
def hashlen(self):
return 64
@property
def blocklen(self):
return 128
class BLAKE2sHash(CryptographyHash):
@property
def fn(self):
return partial(hashes.BLAKE2s, digest_size=self.hashlen)
@property
def hashlen(self):
return 32
@property
def blocklen(self):
return 64
class BLAKE2bHash(CryptographyHash):
@property
def fn(self):
return partial(hashes.BLAKE2b, digest_size=self.hashlen)
@property
def hashlen(self):
return 64
@property
def blocklen(self):
return 128
def hmac_hash(key, data, algorithm):
# Applies HMAC using the HASH() function.
hmac = HMAC(key=key, algorithm=algorithm(), backend=cryptography_backend)
hmac.update(data=data)
return hmac.finalize()

View File

@@ -0,0 +1,51 @@
import os
import warnings
from cryptography.hazmat.primitives.asymmetric import x25519
from noise.backends.default.crypto import X448
from noise.exceptions import NoiseValueError
from noise.functions.keypair import KeyPair
class KeyPair25519(KeyPair):
@classmethod
def from_private_bytes(cls, private_bytes):
if len(private_bytes) != 32:
raise NoiseValueError('Invalid length of private_bytes! Should be 32')
private = x25519.X25519PrivateKey._from_private_bytes(private_bytes)
public = private.public_key()
return cls(private=private, public=public, public_bytes=public.public_bytes())
@classmethod
def from_public_bytes(cls, public_bytes):
if len(public_bytes) != 32:
raise NoiseValueError('Invalid length of public_bytes! Should be 32')
public = x25519.X25519PublicKey.from_public_bytes(public_bytes)
return cls(public=public, public_bytes=public.public_bytes())
class KeyPair448(KeyPair):
def __init__(self, *args, **kwargs):
super(KeyPair448, self).__init__(*args, **kwargs)
warnings.warn('This implementation of ed448 is likely to be very insecure! USE ONLY FOR TESTING!')
@classmethod
def from_private_bytes(cls, private_bytes):
if len(private_bytes) != 56:
raise NoiseValueError('Invalid length of private_bytes! Should be 56')
private = private_bytes
public = X448.mul_5(private)
return cls(private=private, public=public, public_bytes=public)
@classmethod
def from_public_bytes(cls, public_bytes):
if len(public_bytes) != 56:
raise NoiseValueError('Invalid length of private_bytes! Should be 56')
return cls(public=public_bytes, public_bytes=public_bytes)
@classmethod
def new(cls):
private = os.urandom(56)
public = X448.mul_5(private)
return cls(private=private, public=public, public_bytes=public)

View File

@@ -0,0 +1,3 @@
from noise.backends.experimental.backend import ExperimentalNoiseBackend
noise_backend = ExperimentalNoiseBackend()

View File

@@ -0,0 +1,8 @@
from noise.backends.default import DefaultNoiseBackend
class ExperimentalNoiseBackend(DefaultNoiseBackend):
"""
Contains all the default crypto methods, but also methods not directly endorsed by Noise Protocol specification
"""
pass

View File

@@ -0,0 +1,64 @@
from noise.exceptions import NoiseProtocolNameError
from noise.functions.hash import hkdf
from noise.patterns import (PatternN, PatternK, PatternX, PatternNN, PatternKN, PatternNK, PatternKK, PatternNX,
PatternKX, PatternXN, PatternIN, PatternXK, PatternIK, PatternXX, PatternIX)
class NoiseBackend:
"""
Base for creating backends.
Implementing classes must define supported crypto methods in appropriate dict (diffie_hellmans, ciphers, etc.)
HMAC function must be defined as well.
Dicts use convention for keys - they must match the string that occurs in Noise Protocol name.
"""
def __init__(self):
self.patterns = {
'N': PatternN,
'K': PatternK,
'X': PatternX,
'NN': PatternNN,
'KN': PatternKN,
'NK': PatternNK,
'KK': PatternKK,
'NX': PatternNX,
'KX': PatternKX,
'XN': PatternXN,
'IN': PatternIN,
'XK': PatternXK,
'IK': PatternIK,
'XX': PatternXX,
'IX': PatternIX,
}
self.diffie_hellmans = {}
self.ciphers = {}
self.hashes = {}
self.keypairs = {}
self.hmac = None
self.hkdf = hkdf
@property
def methods(self):
return {
'pattern': self.patterns,
'dh': self.diffie_hellmans,
'cipher': self.ciphers,
'hash': self.hashes,
'keypair': self.keypairs
}
def map_protocol_name_to_crypto(self, unpacked_name):
mappings = {}
# Validate if we know everything that Noise Protocol is supposed to use and map appropriate functions
for method, map_dict in self.methods.items():
looked_up_func = getattr(unpacked_name, method)
func = map_dict.get(looked_up_func)
if not func:
raise NoiseProtocolNameError('Unknown {} in Noise Protocol name, given {}, known {}'.format(
method, looked_up_func, " ".join(map_dict)))
mappings[method] = func
return mappings

View File

@@ -3,6 +3,7 @@ from typing import Union, List
from cryptography.exceptions import InvalidTag
from noise.backends.default import noise_backend
from noise.constants import MAX_MESSAGE_LEN
from noise.exceptions import NoisePSKError, NoiseValueError, NoiseHandshakeError, NoiseInvalidMessage
from .noise_protocol import NoiseProtocol
@@ -21,6 +22,7 @@ _keypairs = {Keypair.STATIC: 's', Keypair.REMOTE_STATIC: 'rs',
class NoiseConnection(object):
def __init__(self):
self.backend = None
self.noise_protocol = None
self.protocol_name = None
self.handshake_finished = False
@@ -28,14 +30,14 @@ class NoiseConnection(object):
self._next_fn = None
@classmethod
def from_name(cls, name: Union[str, bytes]):
def from_name(cls, name: Union[str, bytes], backend=noise_backend):
instance = cls()
# Forgiving passing string. Bytes are good too, anything else will fail inside NoiseProtocol
try:
instance.protocol_name = name.encode('ascii') if isinstance(name, str) else name
except ValueError:
raise NoiseValueError('If passing string as protocol name, it must contain only ASCII characters')
instance.noise_protocol = NoiseProtocol(protocol_name=name)
instance.noise_protocol = NoiseProtocol(protocol_name=name, backend=backend)
return instance
def set_psks(self, psk: Union[bytes, str] = None, psks: List[Union[str, bytes]] = None):
@@ -74,21 +76,21 @@ class NoiseConnection(object):
def set_keypair_from_private_bytes(self, keypair: Keypair, private_bytes: bytes):
self.noise_protocol.keypairs[_keypairs[keypair]] = \
self.noise_protocol.dh_fn.keypair_cls.from_private_bytes(private_bytes)
self.noise_protocol.dh_fn.klass.from_private_bytes(private_bytes)
def set_keypair_from_public_bytes(self, keypair: Keypair, private_bytes: bytes):
self.noise_protocol.keypairs[_keypairs[keypair]] = \
self.noise_protocol.dh_fn.keypair_cls.from_public_bytes(private_bytes)
self.noise_protocol.dh_fn.klass.from_public_bytes(private_bytes)
def set_keypair_from_private_path(self, keypair: Keypair, path: str):
with open(path, 'rb') as fd:
self.noise_protocol.keypairs[_keypairs[keypair]] = \
self.noise_protocol.dh_fn.keypair_cls.from_private_bytes(fd.read())
self.noise_protocol.dh_fn.klass.from_private_bytes(fd.read())
def set_keypair_from_public_path(self, keypair: Keypair, path: str):
with open(path, 'rb') as fd:
self.noise_protocol.keypairs[_keypairs[keypair]] = \
self.noise_protocol.dh_fn.keypair_cls.from_public_bytes(fd.read())
self.noise_protocol.dh_fn.klass.from_public_bytes(fd.read())
def start_handshake(self):
self.noise_protocol.validate()

View File

@@ -1,251 +0,0 @@
import abc
import warnings
from functools import partial
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305
from cryptography.hazmat.primitives.hmac import HMAC
from noise.constants import MAX_NONCE
from noise.exceptions import NoiseValueError
from .crypto import X448
backend = default_backend()
class DH(object):
def __init__(self, method):
if method == 'ed25519':
self.method = method
self.dhlen = 32
self.keypair_cls = KeyPair25519
self.generate_keypair = self._25519_generate_keypair
self.dh = self._25519_dh
elif method == 'ed448':
self.method = method
self.dhlen = 56
self.keypair_cls = KeyPair448
self.generate_keypair = self._448_generate_keypair
self.dh = self._448_dh
else:
raise NotImplementedError('DH method: {}'.format(method))
def _25519_generate_keypair(self) -> '_KeyPair':
private_key = x25519.X25519PrivateKey.generate()
public_key = private_key.public_key()
return KeyPair25519(private_key, public_key, public_key.public_bytes())
def _25519_dh(self, private_key: 'x25519.X25519PrivateKey', public_key: 'x25519.X25519PublicKey') -> bytes:
if not isinstance(private_key, x25519.X25519PrivateKey) or not isinstance(public_key, x25519.X25519PublicKey):
raise NoiseValueError('Invalid keys! Must be x25519.X25519PrivateKey and x25519.X25519PublicKey instances')
return private_key.exchange(public_key)
def _448_generate_keypair(self) -> '_KeyPair':
return KeyPair448.new()
def _448_dh(self, private_key: bytes, public_key: bytes) -> bytes:
if len(private_key) != self.dhlen or len(public_key) != self.dhlen:
raise NoiseValueError('Invalid length of keys! Should be {}'.format(self.dhlen))
return X448.mul(private_key, public_key)
class Cipher(object):
def __init__(self, method):
if method == 'AESGCM':
self._cipher = AESGCM
self.encrypt = self._aesgcm_encrypt
self.decrypt = self._aesgcm_decrypt
self.rekey = self._default_rekey
elif method == 'ChaCha20':
self._cipher = ChaCha20Poly1305
self.encrypt = self._chacha20_encrypt
self.decrypt = self._chacha20_decrypt
self.rekey = self._default_rekey
else:
raise NotImplementedError('Cipher method: {}'.format(method))
self.cipher = None
def _aesgcm_encrypt(self, k, n, ad, plaintext):
return self.cipher.encrypt(nonce=self._aesgcm_nonce(n), data=plaintext, associated_data=ad)
def _aesgcm_decrypt(self, k, n, ad, ciphertext):
return self.cipher.decrypt(nonce=self._aesgcm_nonce(n), data=ciphertext, associated_data=ad)
def _aesgcm_nonce(self, n):
return b'\x00\x00\x00\x00' + n.to_bytes(length=8, byteorder='big')
def _chacha20_encrypt(self, k, n, ad, plaintext):
return self.cipher.encrypt(nonce=self._chacha20_nonce(n), data=plaintext, associated_data=ad)
def _chacha20_decrypt(self, k, n, ad, ciphertext):
return self.cipher.decrypt(nonce=self._chacha20_nonce(n), data=ciphertext, associated_data=ad)
def _chacha20_nonce(self, n):
return b'\x00\x00\x00\x00' + n.to_bytes(length=8, byteorder='little')
def _default_rekey(self, k):
return self.encrypt(k, MAX_NONCE, b'', b'\x00' * 32)[:32]
def initialize(self, key):
self.cipher = self._cipher(key)
class Hash(object):
def __init__(self, method):
if method == 'SHA256':
self.hashlen = 32
self.blocklen = 64
self.hash = self._hash_sha256
self.fn = hashes.SHA256
elif method == 'SHA512':
self.hashlen = 64
self.blocklen = 128
self.hash = self._hash_sha512
self.fn = hashes.SHA512
elif method == 'BLAKE2s':
self.hashlen = 32
self.blocklen = 64
self.hash = self._hash_blake2s
self.fn = partial(hashes.BLAKE2s, digest_size=self.hashlen)
elif method == 'BLAKE2b':
self.hashlen = 64
self.blocklen = 128
self.hash = self._hash_blake2b
self.fn = partial(hashes.BLAKE2b, digest_size=self.hashlen)
else:
raise NotImplementedError('Hash method: {}'.format(method))
def _hash_sha256(self, data):
digest = hashes.Hash(hashes.SHA256(), backend)
digest.update(data)
return digest.finalize()
def _hash_sha512(self, data):
digest = hashes.Hash(hashes.SHA512(), backend)
digest.update(data)
return digest.finalize()
def _hash_blake2s(self, data):
digest = hashes.Hash(hashes.BLAKE2s(digest_size=self.hashlen), backend)
digest.update(data)
return digest.finalize()
def _hash_blake2b(self, data):
digest = hashes.Hash(hashes.BLAKE2b(digest_size=self.hashlen), backend)
digest.update(data)
return digest.finalize()
class _KeyPair(object):
__metaclass__ = abc.ABCMeta
def __init__(self, private=None, public=None, public_bytes=None):
self.private = private
self.public = public
self.public_bytes = public_bytes
@classmethod
@abc.abstractmethod
def from_private_bytes(cls, private_bytes):
raise NotImplementedError
@classmethod
@abc.abstractmethod
def from_public_bytes(cls, public_bytes):
raise NotImplementedError
class KeyPair25519(_KeyPair):
@classmethod
def from_private_bytes(cls, private_bytes):
if len(private_bytes) != 32:
raise NoiseValueError('Invalid length of private_bytes! Should be 32')
private = x25519.X25519PrivateKey._from_private_bytes(private_bytes)
public = private.public_key()
return cls(private=private, public=public, public_bytes=public.public_bytes())
@classmethod
def from_public_bytes(cls, public_bytes):
if len(public_bytes) != 32:
raise NoiseValueError('Invalid length of public_bytes! Should be 32')
public = x25519.X25519PublicKey.from_public_bytes(public_bytes)
return cls(public=public, public_bytes=public.public_bytes())
class KeyPair448(_KeyPair):
def __init__(self, *args, **kwargs):
super(KeyPair448, self).__init__(*args, **kwargs)
warnings.warn('This implementation of ed448 is likely to be very insecure! USE ONLY FOR TESTING!')
@classmethod
def from_private_bytes(cls, private_bytes):
if len(private_bytes) != 56:
raise NoiseValueError('Invalid length of private_bytes! Should be 56')
private = private_bytes
public = X448.mul_5(private)
return cls(private=private, public=public, public_bytes=public)
@classmethod
def from_public_bytes(cls, public_bytes):
if len(public_bytes) != 56:
raise NoiseValueError('Invalid length of private_bytes! Should be 56')
return cls(public=public_bytes, public_bytes=public_bytes)
@classmethod
def new(cls):
private = os.urandom(56)
public = X448.mul_5(private)
return cls(private=private, public=public, public_bytes=public)
dh_map = {
'25519': DH('ed25519'),
'448': DH('ed448')
}
cipher_map = {
'AESGCM': partial(Cipher, 'AESGCM'),
'ChaChaPoly': partial(Cipher, 'ChaCha20')
}
hash_map = {
'BLAKE2s': Hash('BLAKE2s'),
'BLAKE2b': Hash('BLAKE2b'),
'SHA256': Hash('SHA256'),
'SHA512': Hash('SHA512')
}
keypair_map = {
'25519': KeyPair25519,
'448': KeyPair448
}
def hmac_hash(key, data, algorithm):
# Applies HMAC using the HASH() function.
hmac = HMAC(key=key, algorithm=algorithm(), backend=backend)
hmac.update(data=data)
return hmac.finalize()
def hkdf(chaining_key, input_key_material, num_outputs, hmac_hash_fn):
# Sets temp_key = HMAC-HASH(chaining_key, input_key_material).
temp_key = hmac_hash_fn(chaining_key, input_key_material)
# Sets output1 = HMAC-HASH(temp_key, byte(0x01)).
output1 = hmac_hash_fn(temp_key, b'\x01')
# Sets output2 = HMAC-HASH(temp_key, output1 || byte(0x02)).
output2 = hmac_hash_fn(temp_key, output1 + b'\x02')
# If num_outputs == 2 then returns the pair (output1, output2).
if num_outputs == 2:
return output1, output2
# Sets output3 = HMAC-HASH(temp_key, output2 || byte(0x03)).
output3 = hmac_hash_fn(temp_key, output2 + b'\x03')
# Returns the triple (output1, output2, output3).
return output1, output2, output3

View File

27
noise/functions/cipher.py Normal file
View File

@@ -0,0 +1,27 @@
import abc
from noise.constants import MAX_NONCE
class Cipher(metaclass=abc.ABCMeta):
def __init__(self):
self.cipher = None
@property
@abc.abstractmethod
def klass(self):
raise NotImplementedError
@abc.abstractmethod
def encrypt(self, k, n, ad, plaintext):
raise NotImplementedError
@abc.abstractmethod
def decrypt(self, k, n, ad, ciphertext):
raise NotImplementedError
def rekey(self, k):
return self.encrypt(k, MAX_NONCE, b'', b'\x00' * 32)[:32]
def initialize(self, key):
self.cipher = self.klass(key)

21
noise/functions/dh.py Normal file
View File

@@ -0,0 +1,21 @@
import abc
class DH(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def klass(self):
raise NotImplementedError
@property
@abc.abstractmethod
def dhlen(self):
raise NotImplementedError
@abc.abstractmethod
def generate_keypair(self) -> 'KeyPair':
raise NotImplementedError
@abc.abstractmethod
def dh(self, private_key, public_key) -> bytes:
raise NotImplementedError

43
noise/functions/hash.py Normal file
View File

@@ -0,0 +1,43 @@
import abc
class Hash(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def fn(self):
raise NotImplementedError
@property
@abc.abstractmethod
def hashlen(self):
raise NotImplementedError
@property
@abc.abstractmethod
def blocklen(self):
raise NotImplementedError
@abc.abstractmethod
def hash(self, data):
raise NotImplementedError
def hkdf(chaining_key, input_key_material, num_outputs, hmac_hash_fn):
# Sets temp_key = HMAC-HASH(chaining_key, input_key_material).
temp_key = hmac_hash_fn(chaining_key, input_key_material)
# Sets output1 = HMAC-HASH(temp_key, byte(0x01)).
output1 = hmac_hash_fn(temp_key, b'\x01')
# Sets output2 = HMAC-HASH(temp_key, output1 || byte(0x02)).
output2 = hmac_hash_fn(temp_key, output1 + b'\x02')
# If num_outputs == 2 then returns the pair (output1, output2).
if num_outputs == 2:
return output1, output2
# Sets output3 = HMAC-HASH(temp_key, output2 || byte(0x03)).
output3 = hmac_hash_fn(temp_key, output2 + b'\x03')
# Returns the triple (output1, output2, output3).
return output1, output2, output3

View File

@@ -0,0 +1,18 @@
import abc
class KeyPair(metaclass=abc.ABCMeta):
def __init__(self, private=None, public=None, public_bytes=None):
self.private = private
self.public = public
self.public_bytes = public_bytes
@classmethod
@abc.abstractmethod
def from_private_bytes(cls, private_bytes):
raise NotImplementedError
@classmethod
@abc.abstractmethod
def from_public_bytes(cls, public_bytes):
raise NotImplementedError

View File

@@ -0,0 +1,78 @@
from typing import List
from noise.constants import TOKEN_PSK
class Pattern(object):
"""
TODO document
"""
def __init__(self):
# As per specification, if both parties have pre-messages, the initiator is listed first. To reduce complexity,
# pre_messages shall be a list of two lists:
# the first for the initiator's pre-messages, the second for the responder
self.pre_messages = [
[],
[]
]
# List of lists of valid tokens, alternating between tokens for initiator and responder
self.tokens = []
self.name = ''
self.one_way = False
self.psk_count = 0
def has_pre_messages(self):
return any(map(lambda x: len(x) > 0, self.pre_messages))
def get_initiator_pre_messages(self) -> list:
return self.pre_messages[0].copy()
def get_responder_pre_messages(self) -> list:
return self.pre_messages[1].copy()
def apply_pattern_modifiers(self, modifiers: List[str]) -> None:
# Applies given pattern modifiers to self.tokens of the Pattern instance.
for modifier in modifiers:
if modifier.startswith('psk'):
try:
index = int(modifier.replace('psk', '', 1))
except ValueError:
raise ValueError('Improper psk modifier {}'.format(modifier))
if index // 2 > len(self.tokens):
raise ValueError('Modifier {} cannot be applied - pattern has not enough messages'.format(modifier))
# Add TOKEN_PSK in the correct place in the correct message
if index == 0: # if 0, insert at the beginning of first message
self.tokens[0].insert(0, TOKEN_PSK)
else: # if bigger than zero, append at the end of first, second etc.
self.tokens[index - 1].append(TOKEN_PSK)
self.psk_count += 1
elif modifier == 'fallback':
raise NotImplementedError # TODO implement
else:
raise ValueError('Unknown pattern modifier {}'.format(modifier))
def get_required_keypairs(self, initiator: bool) -> list:
required = []
if initiator:
if self.name[0] in ('K', 'X', 'I'):
required.append('s')
if self.one_way or self.name[1] == 'K':
required.append('rs')
else:
if self.name[0] == 'K':
required.append('rs')
if self.one_way or self.name[1] in ['K', 'X']:
required.append('s')
return required
class OneWayPattern(Pattern):
def __init__(self):
super(OneWayPattern, self).__init__()
self.one_way = True

View File

@@ -5,48 +5,34 @@ from typing import Tuple
from noise.exceptions import NoiseProtocolNameError, NoisePSKError, NoiseValidationError
from noise.state import HandshakeState
from .constants import MAX_PROTOCOL_NAME_LEN, Empty
from .functions import dh_map, cipher_map, hash_map, keypair_map, hmac_hash, hkdf
from .patterns import patterns_map
class NoiseProtocol(object):
"""
TODO: Document
"""
methods = {
'pattern': patterns_map,
'dh': dh_map,
'cipher': cipher_map,
'hash': hash_map,
'keypair': keypair_map
}
def __init__(self, protocol_name: bytes):
if not isinstance(protocol_name, bytes):
raise NoiseProtocolNameError('Protocol name has to be of type "bytes" not {}'.format(type(protocol_name)))
if len(protocol_name) > MAX_PROTOCOL_NAME_LEN:
raise NoiseProtocolNameError('Protocol name too long, has to be at most '
'{} chars long'.format(MAX_PROTOCOL_NAME_LEN))
def __init__(self, protocol_name: bytes, backend: 'NoiseBackend'):
self.name = protocol_name
mappings, pattern_modifiers = self._parse_protocol_name()
self.backend = backend
unpacked_name = UnpackedName.from_protocol_name(self.name)
mappings = self.backend.map_protocol_name_to_crypto(unpacked_name)
# A valid Pattern instance (see Section 7 of specification (rev 32))
self.pattern = mappings['pattern']()
self.pattern_modifiers = pattern_modifiers
self.pattern_modifiers = unpacked_name.pattern_modifiers
if self.pattern_modifiers:
self.pattern.apply_pattern_modifiers(pattern_modifiers)
self.pattern.apply_pattern_modifiers(self.pattern_modifiers)
# Handle PSK handshake options
self.psks = None
self.is_psk_handshake = any([modifier.startswith('psk') for modifier in self.pattern_modifiers])
self.dh_fn = mappings['dh']
self.dh_fn = mappings['dh']()
self.hash_fn = mappings['hash']()
self.cipher_fn = mappings['cipher']
self.hash_fn = mappings['hash']
self.keypair_fn = mappings['keypair']
self.hmac = partial(hmac_hash, algorithm=self.hash_fn.fn)
self.hkdf = partial(hkdf, hmac_hash_fn=self.hmac)
self.hmac = partial(backend.hmac, algorithm=self.hash_fn.fn)
self.hkdf = partial(backend.hkdf, hmac_hash_fn=self.hmac)
self.prologue = None
self.initiator = None
@@ -60,42 +46,6 @@ class NoiseProtocol(object):
self.keypairs = {'s': None, 'e': None, 'rs': None, 're': None}
def _parse_protocol_name(self) -> Tuple[dict, list]:
unpacked = self.name.decode().split('_')
if unpacked[0] != 'Noise':
raise NoiseProtocolNameError('Noise Protocol name shall begin with Noise! Provided: {}'.format(self.name))
# Extract pattern name and pattern modifiers
pattern = ''
modifiers_str = None
for i, char in enumerate(unpacked[1]):
if char.isupper():
pattern += char
else:
# End of pattern, now look for modifiers
modifiers_str = unpacked[1][i:] # Will be empty string if it exceeds string size
break
modifiers = modifiers_str.split('+') if modifiers_str else []
data = {'pattern': 'Pattern' + pattern,
'dh': unpacked[2],
'cipher': unpacked[3],
'hash': unpacked[4],
'keypair': unpacked[2],
'pattern_modifiers': modifiers}
mapped_data = {}
# Validate if we know everything that Noise Protocol is supposed to use and map appropriate functions
for key, map_dict in self.methods.items():
func = map_dict.get(data[key])
if not func:
raise NoiseProtocolNameError('Unknown {} in Noise Protocol name, given {}, known {}'.format(
key, data[key], " ".join(map_dict)))
mapped_data[key] = func
return mapped_data, modifiers
def handshake_done(self):
if self.pattern.one_way:
if self.initiator:
@@ -141,3 +91,46 @@ class NoiseProtocol(object):
kwargs[keypair] = value
self.handshake_state = HandshakeState.initialize(self, **kwargs)
self.symmetric_state = self.handshake_state.symmetric_state
class UnpackedName:
def __init__(self, pattern, dh, cipher, hash, keypair, pattern_modifiers):
self.pattern = pattern
self.dh = dh
self.cipher = cipher
self.hash = hash
self.keypair = keypair
self.pattern_modifiers = pattern_modifiers
@classmethod
def from_protocol_name(cls, name):
if not isinstance(name, bytes):
raise NoiseProtocolNameError('Protocol name has to be of type "bytes" not {}'.format(type(name)))
if len(name) > MAX_PROTOCOL_NAME_LEN:
raise NoiseProtocolNameError('Protocol name too long, has to be at most '
'{} chars long'.format(MAX_PROTOCOL_NAME_LEN))
unpacked = name.decode().split('_')
if unpacked[0] != 'Noise':
raise NoiseProtocolNameError('Noise Protocol name shall begin with Noise! Provided: {}'.format(name))
# Extract pattern name and pattern modifiers
pattern = ''
modifiers_str = None
for i, char in enumerate(unpacked[1]):
if char.isupper():
pattern += char
else:
# End of pattern, now look for modifiers
modifiers_str = unpacked[1][i:] # Will be empty string if it exceeds string size
break
modifiers = modifiers_str.split('+') if modifiers_str else []
return cls(
pattern=pattern,
dh=unpacked[2],
cipher=unpacked[3],
hash=unpacked[4],
keypair=unpacked[2],
pattern_modifiers=modifiers
)

View File

@@ -1,85 +1,9 @@
from typing import List
from .constants import TOKEN_E, TOKEN_S, TOKEN_EE, TOKEN_ES, TOKEN_SE, TOKEN_SS, TOKEN_PSK
class Pattern(object):
"""
TODO document
"""
def __init__(self):
# As per specification, if both parties have pre-messages, the initiator is listed first. To reduce complexity,
# pre_messages shall be a list of two lists:
# the first for the initiator's pre-messages, the second for the responder
self.pre_messages = [
[],
[]
]
# List of lists of valid tokens, alternating between tokens for initiator and responder
self.tokens = []
self.name = ''
self.one_way = False
self.psk_count = 0
def has_pre_messages(self):
return any(map(lambda x: len(x) > 0, self.pre_messages))
def get_initiator_pre_messages(self) -> list:
return self.pre_messages[0].copy()
def get_responder_pre_messages(self) -> list:
return self.pre_messages[1].copy()
def apply_pattern_modifiers(self, modifiers: List[str]) -> None:
# Applies given pattern modifiers to self.tokens of the Pattern instance.
for modifier in modifiers:
if modifier.startswith('psk'):
try:
index = int(modifier.replace('psk', '', 1))
except ValueError:
raise ValueError('Improper psk modifier {}'.format(modifier))
if index // 2 > len(self.tokens):
raise ValueError('Modifier {} cannot be applied - pattern has not enough messages'.format(modifier))
# Add TOKEN_PSK in the correct place in the correct message
if index == 0: # if 0, insert at the beginning of first message
self.tokens[0].insert(0, TOKEN_PSK)
else: # if bigger than zero, append at the end of first, second etc.
self.tokens[index - 1].append(TOKEN_PSK)
self.psk_count += 1
elif modifier == 'fallback':
raise NotImplementedError # TODO implement
else:
raise ValueError('Unknown pattern modifier {}'.format(modifier))
def get_required_keypairs(self, initiator: bool) -> list:
required = []
if initiator:
if self.name[0] in ('K', 'X', 'I'):
required.append('s')
if self.one_way or self.name[1] == 'K':
required.append('rs')
else:
if self.name[0] == 'K':
required.append('rs')
if self.one_way or self.name[1] in ['K', 'X']:
required.append('s')
return required
from noise.constants import TOKEN_S, TOKEN_E, TOKEN_ES, TOKEN_SS, TOKEN_EE, TOKEN_SE
from noise.functions.patterns import OneWayPattern, Pattern
# One-way patterns
class OneWayPattern(Pattern):
def __init__(self):
super(OneWayPattern, self).__init__()
self.one_way = True
class PatternN(OneWayPattern):
def __init__(self):
super(PatternN, self).__init__()
@@ -281,22 +205,3 @@ class PatternIX(Pattern):
[TOKEN_E, TOKEN_S],
[TOKEN_E, TOKEN_EE, TOKEN_SE, TOKEN_S, TOKEN_ES]
]
patterns_map = {
'PatternN': PatternN,
'PatternK': PatternK,
'PatternX': PatternX,
'PatternNN': PatternNN,
'PatternKN': PatternKN,
'PatternNK': PatternNK,
'PatternKK': PatternKK,
'PatternNX': PatternNX,
'PatternKX': PatternKX,
'PatternXN': PatternXN,
'PatternIN': PatternIN,
'PatternXK': PatternXK,
'PatternIK': PatternIK,
'PatternXX': PatternXX,
'PatternIX': PatternIX,
}

View File

@@ -1,3 +1,4 @@
from noise.backends.default import noise_backend
from noise.noise_protocol import NoiseProtocol
from noise.state import CipherState, SymmetricState
@@ -8,12 +9,14 @@ class TestRevision33Compatibility(object):
fn = None
noise_name = b"Noise_NN_25519_AESGCM_SHA3/256"
modified_backend = noise_backend
modified_backend.hashes['SHA3/256'] = FakeSHA3_256 # Add callable to hash functions mapping
modified_class = NoiseProtocol
modified_class.methods['hash']['SHA3/256'] = FakeSHA3_256 # Add callable to hash functions mapping
modified_class(noise_name)
modified_class(noise_name, modified_backend)
def test_cipher_state_set_nonce(self):
noise_protocol = NoiseProtocol(b"Noise_NN_25519_AESGCM_SHA256")
noise_protocol = NoiseProtocol(b"Noise_NN_25519_AESGCM_SHA256", backend=noise_backend)
cipher_state = CipherState(noise_protocol)
cipher_state.initialize_key(b'\x00'*32)
assert cipher_state.n == 0