Skip to content
Commits on Source (3)
[bumpversion]
current_version = 0.2.4
current_version = 0.2.5
commit = True
tag = True
tag_name = {new_version}
......
......@@ -10,7 +10,7 @@ from setuptools import setup, find_packages
from setuptools.command.develop import develop
__version__ = '0.2.4'
__version__ = '0.2.5'
here = path.abspath(path.dirname(__file__))
......
......@@ -18,7 +18,7 @@ __author__ = 'Guy K. Kloss <guy@mysinglesource.io>'
import base64
from collections import OrderedDict
import nacl.utils
from typing import Union, Callable
from typing import Optional, Union, Callable
from sspyjose import Jose
from sspyjose import cryptography_aead_wrap as cryptography_aead
......@@ -49,30 +49,35 @@ class Jwe(Jose):
- C20P - symmetric, 256-bit key size, 96-bit nonce
[https://tools.ietf.org/id/draft-amringer-jose-chacha-00.txt]
(RFC 7539 and RFC 8439).
- A256GCM - symmetric, 256-bit key size, 96-bit nonce, no key wrapping
(basic RFC 7516).
- X25519 - public key-based, 256-bit key size (RFC 8037).
- `unsecured` - non-standard, unencrypted, 'symmetric' JWE
(contains an `{"alg": "unsecured"}` header).
"""
_header = None # type: dict
_header = None # type: Optional[dict]
"""Protected header."""
_key = None # type: bytes
_key = None # type: Optional[bytes]
"""Encrypted key."""
_nonce = None # type: bytes
_nonce = None # type: Optional[bytes]
"""Nonce/initialisation vector."""
_message = None # type: dict
_message = None # type: Optional[dict]
"""Plain text message."""
_ciphertext = None # type: bytes
_ciphertext = None # type: Optional[bytes]
"""Ciphertext."""
_tag = None # type: bytes
_tag = None # type: Optional[bytes]
"""Authentication tag/MAC."""
_SERIALISATION_PARTS = ('header', 'key', 'nonce', 'ciphertext', 'tag')
"""Parts of the JWE serialisation specification."""
_jwk = None # type: Jwk
_jwk = None # type: Optional[Jwk]
"""JWK to use for en/decryption."""
_message_bytes = None # type: bytes
_message_bytes = None # type: Optional[bytes]
"""Retains the byte serialisation of the message content in verbatim."""
def __init__(self, *,
from_compact: Union[bytes, str] = None, jwk: Jwk = None):
from_compact: Union[bytes, str, None] = None,
jwk: Optional[Jwk] = None):
"""
Constructor.
......@@ -89,8 +94,10 @@ class Jwe(Jose):
@classmethod
def get_instance(
cls, *,
enc: str = None, alg: str = None,
from_compact: Union[bytes, str] = None, jwk: Jwk = None) -> 'Jwe':
enc: Optional[str] = None,
alg: Optional[str] = None,
from_compact: Union[bytes, str, None] = None,
jwk: Optional[Jwk] = None) -> 'Jwe':
"""
Get an instance of a specific JWE object by algorithm via factory.
......@@ -116,7 +123,7 @@ class Jwe(Jose):
enc = content.get('enc')
alg = content.get('alg')
# Nothing, so go with the default.
if enc is None:
if enc is None and alg != 'unsecured':
enc = Jose.DEFAULT_ENC
alg = alg or 'dir'
klass = _CLASS_MAPPING.get((enc, alg))
......@@ -288,11 +295,11 @@ class SymmetricJwe(Jwe):
Symmetric key encryption.
"""
_DEFAULT_HEADER = None # type: dict
_KEYBYTES = 0
_NONCEBYTES = 0
_aead_encrypt_function = None # type: Callable
_aead_decrypt_function = None # type: Callable
_DEFAULT_HEADER = None # type: Optional[dict]
_KEYBYTES = 0 # type: Optional[int]
_NONCEBYTES = 0 # type: Optional[int]
_aead_encrypt_function = None # type: Optional[Callable]
_aead_decrypt_function = None # type: Optional[Callable]
def encrypt(self) -> (bytes, bytes):
"""
......@@ -303,22 +310,39 @@ class SymmetricJwe(Jwe):
plaintext = self.get_message_bytes()
self._header = self._DEFAULT_HEADER.copy()
header_bytes = utils.dict_to_base64(self._header).encode('utf-8')
self._nonce = nacl.utils.random(nacl_aead.NONCEBYTES)
result = self.__class__._aead_encrypt_function(
plaintext, header_bytes, self._nonce, self._jwk.k)
if self._header['alg'] != 'unsecured':
self._nonce = nacl.utils.random(nacl_aead.NONCEBYTES)
else:
result = plaintext, None
if self.__class__._aead_encrypt_function:
result = self.__class__._aead_encrypt_function(
plaintext, header_bytes, self._nonce, self._jwk.k)
self._ciphertext, self._tag = result
return result
def decrypt(self) -> bytes:
def decrypt(self, *, allow_unsecured: bool = False) -> bytes:
"""
Decrypts the JWE ciphertext and stores the resulting message.
:param allow_unsecured: Allows explicitly for access to non-standard,
"unsecured" JWEs (default: forbidden).
:return: Decrypted message.
"""
header_bytes = utils.dict_to_base64(self._header).encode('utf-8')
result = self.__class__._aead_decrypt_function(
self._ciphertext, self._tag, header_bytes, self._nonce,
self._jwk.k)
result = None
if self.__class__._aead_decrypt_function:
result = self.__class__._aead_decrypt_function(
self._ciphertext, self._tag, header_bytes, self._nonce,
self._jwk.k)
elif self._header['alg'] == 'unsecured':
if allow_unsecured is not True:
raise RuntimeError('Access to unsecured (non-standard) JWE is'
' forbidden unless explicitly allowed.')
if self._nonce or self._tag or self._key:
raise RuntimeError('Unsecured (non-standard) JWEs must not'
' contain a key, tag or nonce.')
result = self._ciphertext
self._message_bytes = result
self._message = utils.json_to_dict(result)
return self._message
......@@ -353,6 +377,18 @@ class AES256GCMJwe(SymmetricJwe):
_aead_decrypt_function = cryptography_aead.aes256gcm_decrypt
class UnsecuredJwe(SymmetricJwe):
"""
Non-standard JWE object that is unsecured (not encrypted).
"""
_DEFAULT_HEADER = OrderedDict([('alg', 'unsecured')])
_KEYBYTES = None
_NONCEBYTES = None
_aead_encrypt_function = None
_aead_decrypt_function = None
class X25519Jwe(Jwe):
"""
X25519 public key encryption base class.
......@@ -364,11 +400,11 @@ class X25519Jwe(Jwe):
- RFC 7539
"""
_DEFAULT_HEADER = None # type: dict
_KEYBYTES = 0
_NONCEBYTES = 0
_aead_encrypt_function = None # type: Callable
_aead_decrypt_function = None # type: Callable
_DEFAULT_HEADER = None # type: Optional[dict]
_KEYBYTES = 0 # type: Optional[int]
_NONCEBYTES = 0 # type: Optional[int]
_aead_encrypt_function = None # type: Optional[Callable]
_aead_decrypt_function = None # type: Optional[Callable]
def ecdh_es_key(self, full_key: X25519Jwk, public_key: X25519Jwk,
*, apu: bytes = b'', apv: bytes = b'') -> bytes:
......@@ -509,6 +545,9 @@ _CLASS_MAPPING = {
(AES256GCMJwe._DEFAULT_HEADER['enc'],
AES256GCMJwe._DEFAULT_HEADER['alg']):
AES256GCMJwe,
(UnsecuredJwe._DEFAULT_HEADER.get('enc'),
UnsecuredJwe._DEFAULT_HEADER['alg']):
UnsecuredJwe,
(X25519ChaCha20Poly1305Jwe._DEFAULT_HEADER['enc'],
X25519ChaCha20Poly1305Jwe._DEFAULT_HEADER['alg']):
X25519ChaCha20Poly1305Jwe,
......
......@@ -18,7 +18,7 @@ __author__ = 'Guy K. Kloss <guy@mysinglesource.io>'
from collections import OrderedDict
import copy
import nacl.bindings
import typing
from typing import Iterable, Union, Optional
import nacl.public
import nacl.signing
......@@ -44,9 +44,9 @@ class Jwk(Jose):
serialisation.
"""
_JSON_PRIVATE_KEY_ELEMENTS = None # type: typing.Iterable
_JSON_PRIVATE_KEY_ELEMENTS = None # type: Optional[Iterable]
"""Elements of the key containing secrets."""
_JSON_BINARY_ELEMENTS = None # type: typing.Iterable
_JSON_BINARY_ELEMENTS = None # type: Optional[Iterable]
"""Elements of the key containing binary data."""
def __init__(self, kty: str):
......@@ -61,12 +61,12 @@ class Jwk(Jose):
@classmethod
def get_instance(
cls, *,
alg: str = None,
crv: str = None,
from_json: str = None,
from_dict: dict = None,
alg: Optional[str] = None,
crv: Optional[str] = None,
from_json: Optional[str] = None,
from_dict: Optional[dict] = None,
generate: bool = False,
from_secret: typing.Union[str, bytes] = None) -> 'Jwk':
from_secret: Union[str, bytes, None] = None) -> 'Jwk':
"""
Get an instance of a specific JWK object by algorithm via factory.
......@@ -176,14 +176,14 @@ class Ed25519Jwk(Jwk):
_JSON_PRIVATE_KEY_ELEMENTS = ('d',)
_JSON_BINARY_ELEMENTS = ('x', 'd')
_DEFAULT_HEADER = OrderedDict([('crv', 'Ed25519')])
_signing_key = None # type: nacl.signining.SigningKey
_signing_key = None # type: Optional[nacl.signining.SigningKey]
"""PyNaCl signing key (private seed)."""
_verify_key = None # type: nacl.signining.VerifyKey
_verify_key = None # type: Optional[nacl.signining.VerifyKey]
"""PyNaCl verify key (public)."""
def __init__(self, *,
from_json: str = None,
from_dict: dict = None,
from_json: Optional[str] = None,
from_dict: Optional[dict] = None,
generate: bool = False):
"""
Constructor.
......@@ -254,7 +254,7 @@ class Ed25519Jwk(Jwk):
return self._data['x']
@x.setter
def x(self, value: typing.Union[bytes, str]):
def x(self, value: Union[bytes, str]):
"""
Set public key.
......@@ -279,7 +279,7 @@ class Ed25519Jwk(Jwk):
return self._data.get('d')
@d.setter
def d(self, value: typing.Union[bytes, str]):
def d(self, value: Union[bytes, str]):
"""
Set signing key seed.
......@@ -324,12 +324,12 @@ class X25519Jwk(Jwk):
_JSON_PRIVATE_KEY_ELEMENTS = ('d',)
_JSON_BINARY_ELEMENTS = ('x', 'd')
_DEFAULT_HEADER = OrderedDict([('crv', 'X25519')])
_dh_key = None # type: nacl.public.PrivateKey
_dh_key = None # type: Optional[nacl.public.PrivateKey]
"""PyNaCl Diffie-Hellman key (private)."""
def __init__(self, *,
from_json: str = None,
from_dict: dict = None,
from_json: Optional[str] = None,
from_dict: Optional[dict] = None,
generate: bool = False):
"""
Constructor.
......@@ -405,7 +405,7 @@ class X25519Jwk(Jwk):
return self._data['d']
@d.setter
def d(self, value: typing.Union[bytes, str]):
def d(self, value: Union[bytes, str]):
"""
Set private key.
......@@ -444,13 +444,13 @@ class SymmetricJwk(Jwk):
_JSON_PRIVATE_KEY_ELEMENTS = ('k',)
_JSON_BINARY_ELEMENTS = ('k',)
_DEFAULT_HEADER = None # type: dict
_KEY_SIZE = None # type: int
_DEFAULT_HEADER = None # type: Optional[dict]
_KEY_SIZE = None # type: Optional[int]
def __init__(self, *,
from_json: str = None,
from_dict: dict = None,
from_secret: typing.Union[str, bytes] = None,
from_json: Optional[str] = None,
from_dict: Optional[dict] = None,
from_secret: Union[str, bytes, None] = None,
generate: bool = False):
"""
Constructor.
......@@ -508,7 +508,7 @@ class SymmetricJwk(Jwk):
return self._data['k']
@k.setter
def k(self, value: typing.Union[bytes, str]):
def k(self, value: Union[bytes, str]):
"""
Set encryption key.
......
......@@ -18,7 +18,7 @@ __author__ = 'Guy K. Kloss <guy@mysinglesource.io>'
import base64
from collections import OrderedDict
import json
from typing import Union, List
from typing import Union, List, Optional
import nacl.hash
from sspyjose import Jose
......@@ -35,13 +35,13 @@ class Jws(Jose):
- Ed25519 - public key-based, 256-bit key size (RFC 8037).
"""
_header = None # type: dict
_header = None # type: Optional[dict]
"""Protected (signed) header."""
_payload = None # type: dict
_payload = None # type: Optional[dict]
"""Payload (signed)."""
_signature = None # type: bytes
_signature = None # type: Optional[bytes]
"""Signature."""
_jwk = None # type: Union[bytes, list[bytes]]
_jwk = None # type: Union[bytes, list[bytes], None]
"""Signing key."""
_SERIALISATION_PARTS = ('header', 'payload', 'signature')
"""Parts of the JWS compact serialisation specification."""
......@@ -49,8 +49,8 @@ class Jws(Jose):
"""Default header to use for this JWS type."""
def __init__(self, *,
from_compact: Union[bytes, str] = None,
from_json: Union[bytes, str] = None,
from_compact: Union[bytes, str, None] = None,
from_json: Union[bytes, str, None] = None,
jwk: Jwk = None):
"""
Constructor.
......@@ -74,10 +74,10 @@ class Jws(Jose):
@classmethod
def get_instance(
cls, *,
alg: str = None,
from_compact: Union[bytes, str] = None,
from_json: Union[bytes, str] = None,
jwk: Jwk = None) -> 'Jws':
alg: Optional[str] = None,
from_compact: Union[bytes, str, None] = None,
from_json: Union[bytes, str, None] = None,
jwk: Optional[Jwk] = None) -> 'Jws':
"""
Get an instance of a specific JWS object by algorithm via factory.
......
......@@ -19,7 +19,7 @@ import base64
from collections import OrderedDict
import json
import re
from typing import Iterable, Union
from typing import Iterable, Union, Optional
_NUMBER_TEST = re.compile(r'^[0-9]+$')
......@@ -56,7 +56,7 @@ def _transform_simple(data):
return data
def pack_bytes(data: dict, to_remove: Iterable[str] = None) -> dict:
def pack_bytes(data: dict, to_remove: Optional[Iterable[str]] = None) -> dict:
"""
Convert `bytes` in a dictionary to a copy with base64 URL strings.
......@@ -87,7 +87,7 @@ def pack_bytes(data: dict, to_remove: Iterable[str] = None) -> dict:
return result
def unpack_bytes(data: dict, binary: Iterable[str] = None) -> dict:
def unpack_bytes(data: dict, binary: Optional[Iterable[str]] = None) -> dict:
"""
Convert base64 URL encoded strings in a JOSE dictionary to `bytes`.
......@@ -106,7 +106,7 @@ def unpack_bytes(data: dict, binary: Iterable[str] = None) -> dict:
return data
def dict_to_json(data: dict, to_remove: Iterable[str] = None) -> str:
def dict_to_json(data: dict, to_remove: Optional[Iterable[str]] = None) -> str:
"""
Convert a JOSE representation of a JOSE object to JSON representation.
......@@ -124,7 +124,7 @@ def dict_to_json(data: dict, to_remove: Iterable[str] = None) -> str:
def json_to_dict(data: Union[str, bytes],
binary: Iterable[str] = None) -> dict:
binary: Optional[Iterable[str]] = None) -> dict:
"""
Convert a JSON representation of a JOSE object to a Python dict.
......@@ -138,7 +138,8 @@ def json_to_dict(data: Union[str, bytes],
binary)
def dict_to_base64(data: dict, to_remove: Iterable[str] = None) -> str:
def dict_to_base64(data: dict,
to_remove: Optional[Iterable[str]] = None) -> str:
"""
Convert a JOSE dict to a URL-safe base64 representation of a JSON string.
......@@ -156,7 +157,7 @@ def dict_to_base64(data: dict, to_remove: Iterable[str] = None) -> str:
return bytes_to_string(json_form.encode('utf-8'))
def base64_to_dict(data: str, binary: Iterable[str] = None) -> dict:
def base64_to_dict(data: str, binary: Optional[Iterable[str]] = None) -> dict:
"""
Convert a URL base64 representation of a JSON string to a `dict`.
......
......@@ -148,6 +148,21 @@ JWE_AES256GCM_DECODED = {
}
JWE_AES256GCM_MESSAGE = {'advice': "Don't panic!"}
JWE_UNSECURED = (
'eyJhbGciOiJ1bnNlY3VyZWQifQ.'
'.'
'.'
'eyJhZHZpY2UiOiJEb24ndCBwYW5pYyEifQ.'
'')
JWE_UNSECURED_DECODED = {
'header': {'alg': 'unsecured'},
'key': None,
'nonce': None,
'ciphertext': b'{"advice":"Don\'t panic!"}',
'tag': None
}
JWE_UNSECURED_MESSAGE = {'advice': "Don't panic!"}
JWE_X25519CHACHA20POLY1305 = (
'eyJhbGciOiJFQ0RILUVTIiwiZW5jIjoiQzIwUCIsImVwayI6eyJrdHkiOiJPS1AiLCJjcn'
'YiOiJYMjU1MTkiLCJ4IjoiVTFHQ29PdjhOMGxlUEVwWlZMMGVXRU94cmlxV19pa0FuWmRE'
......
......@@ -28,6 +28,7 @@ from sspyjose import Jose
from sspyjose.jwe import (Jwe,
ChaCha20Poly1305Jwe,
AES256GCMJwe,
UnsecuredJwe,
X25519ChaCha20Poly1305Jwe,
X25519AES256GCMJwe)
from sspyjose.jwk import (Jwk,
......@@ -42,6 +43,9 @@ from tests.data import (JWE_CHACHA20POLY1305,
JWE_AES256GCM,
JWE_AES256GCM_DECODED,
JWE_AES256GCM_MESSAGE,
JWE_UNSECURED,
JWE_UNSECURED_DECODED,
JWE_UNSECURED_MESSAGE,
JWE_X25519CHACHA20POLY1305,
JWE_X25519CHACHA20POLY1305_DECODED,
JWE_X25519CHACHA20POLY1305_MESSAGE,
......@@ -476,6 +480,92 @@ class AES256GCMJweTest(unittest.TestCase):
self.assertDictEqual(result, {})
class UnsecuredJweTest(unittest.TestCase):
"""Testing the UnsecuredJwe class."""
my_jwe = None
def setUp(self): # noqa: D102
self.my_jwe = UnsecuredJwe()
def tearDown(self): # noqa: D102
pass
def test_vanilla_constructor(self):
"""Make a vanilla/empty JWE."""
jwe = UnsecuredJwe()
self.assertEqual(jwe.header, None)
self.assertEqual(jwe.key, None)
self.assertEqual(jwe.nonce, None)
self.assertEqual(jwe.ciphertext, None)
self.assertEqual(jwe.tag, None)
self.assertEqual(jwe.message, None)
self.assertEqual(jwe.jwk, None)
self.assertEqual(jwe._message_bytes, None)
def test_constructor_via_factory(self):
"""Make an empty JWE via the factory."""
jwe = Jwe.get_instance(alg='unsecured')
self.assertIsInstance(jwe, UnsecuredJwe)
self.assertEqual(jwe._DEFAULT_HEADER['alg'], 'unsecured')
self.assertNotIn('enc', jwe._DEFAULT_HEADER)
self.assertEqual(jwe.header, None)
self.assertEqual(jwe.key, None)
self.assertEqual(jwe.nonce, None)
self.assertEqual(jwe.ciphertext, None)
self.assertEqual(jwe.tag, None)
self.assertEqual(jwe.message, None)
self.assertEqual(jwe.jwk, None)
self.assertEqual(jwe._message_bytes, None)
def test_encrypt(self):
"""(Pseudo-) encrypt a JWE message."""
self.my_jwe.message = JWE_UNSECURED_MESSAGE
result = self.my_jwe.encrypt()
self.assertIs(self.my_jwe.nonce, None)
self.assertEqual(result[0], b'{"advice":"Don\'t panic!"}')
self.assertEqual(result[1], None)
self.assertEqual(self.my_jwe.ciphertext,
JWE_UNSECURED_DECODED['ciphertext'])
self.assertEqual(self.my_jwe._message_bytes,
JWE_UNSECURED_DECODED['ciphertext'])
def test_decrypt(self):
"""(Pseudo-) decrypt a JWE message."""
self.my_jwe.load_compact(JWE_UNSECURED)
result = self.my_jwe.decrypt(allow_unsecured=True)
self.assertDictEqual(result, JWE_UNSECURED_MESSAGE)
self.assertEqual(self.my_jwe.message, JWE_UNSECURED_MESSAGE)
self.assertEqual(self.my_jwe._message_bytes,
b'{"advice":"Don\'t panic!"}')
def test_decrypt_forbidden(self):
"""Disallow (pseudo-) decrypt a JWE message."""
self.my_jwe.load_compact(JWE_UNSECURED)
self.assertRaises(RuntimeError, self.my_jwe.decrypt)
def test_round_trips(self):
"""JWE en-/decrypt round trips."""
for text in TEXT_CONTENT:
message = {'text': text}
encrypter = UnsecuredJwe()
encrypter.message = message
encrypter.encrypt()
jwe_cipher = encrypter.serialise()
decrypter = UnsecuredJwe()
decrypter.load_compact(jwe_cipher)
recovered = decrypter.decrypt(allow_unsecured=True)
self.assertDictEqual(recovered, message)
def test_decrypt_fails(self):
"""Failed integrity while decrypting a JWE message."""
self.my_jwe.load_compact(JWE_UNSECURED)
self.my_jwe.tag = b'\x42'
self.assertRaises(RuntimeError, self.my_jwe.decrypt,
allow_unsecured=True)
class X25519ChaCha20Poly1305JweTest(unittest.TestCase):
"""Testing the X25519ChaCha20Poly1305Jwe class."""
......
......@@ -15,6 +15,8 @@
__author__ = 'Guy K. Kloss <guy@mysinglesource.io>'
from typing import Union
_counter = 0
......@@ -44,7 +46,7 @@ def random_mocker(increment: int = 1):
return mocker
def deep_sort(obj):
def deep_sort(obj: Union[dict, list]):
"""Recursively sort list or dict nested lists."""
if isinstance(obj, dict):
_sorted = {}
......