#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright Amazon.com Inc. or its affiliates
#
import typing

import boto3

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import (
    AsymmetricSignatureContext,
    utils as asym_utils,
)
from cryptography.hazmat.primitives.asymmetric.padding import (
    AsymmetricPadding,
    PKCS1v15,
    PSS,
)
from cryptography.hazmat.primitives.asymmetric.rsa import (
    RSAPrivateKey,
    RSAPrivateNumbers,
    RSAPublicKey,
)


class _RSAPrivateKeyInKMS(RSAPrivateKey):
    """RSA private key whose signing operation is delegated to AWS KMS.

    The object only holds the key identifier, a boto3 KMS client and the
    public key fetched from KMS.  Signing is performed by calling the KMS
    ``sign`` API; every other private-key operation raises
    ``NotImplementedError``.
    """

    # Hash classes accepted by sign() and the suffix each one contributes to
    # the KMS ``SigningAlgorithm`` name.
    _HASH_SUFFIXES = (
        (hashes.SHA256, 'SHA_256'),
        (hashes.SHA384, 'SHA_384'),
        (hashes.SHA512, 'SHA_512'),
    )

    def __init__(self, arn):
        """Create a KMS client and fetch the public key for ``arn``.

        :param arn: value passed as ``KeyId`` to the KMS API calls.
        """
        self.arn = arn
        self.client = boto3.client('kms')
        response = self.client.get_public_key(KeyId=self.arn)

        # Parse public key.  It is kept in a private attribute so that it
        # does not shadow the public_key() method defined below; storing it
        # as ``self.public_key`` made ``key.public_key()`` fail with
        # "object is not callable".
        self._public_key = serialization.load_der_public_key(
            response['PublicKey'])

    @property
    def key_size(self) -> int:
        """Return the key size reported by the public key."""
        return self._public_key.key_size

    def public_key(self) -> RSAPublicKey:
        """Return the public key that was fetched from KMS."""
        return self._public_key

    def sign(self, data: bytes, padding: AsymmetricPadding,
             algorithm: typing.Union[asym_utils.Prehashed,
                                     hashes.HashAlgorithm]
             ) -> bytes:
        """Sign ``data`` with the KMS key and return the signature bytes.

        :param data: the message, or its digest when ``algorithm`` is a
            ``Prehashed`` instance.
        :param padding: ``PSS`` or ``PKCS1v15``.
        :param algorithm: SHA-256, SHA-384 or SHA-512, either directly or
            wrapped in ``Prehashed``.
        :raises TypeError: if the padding or the hash algorithm is not one
            of the supported ones.
        """
        if isinstance(algorithm, asym_utils.Prehashed):
            message_type = 'DIGEST'
            # Prehashed keeps the wrapped hash in its private ``_algorithm``
            # attribute.  Only unwrap it here: a plain HashAlgorithm has no
            # such attribute, and reading it unconditionally raised
            # AttributeError for non-prehashed input.
            hash_alg = algorithm._algorithm
        else:
            message_type = 'RAW'
            hash_alg = algorithm

        if isinstance(padding, PSS):
            signing_alg = 'RSASSA_PSS_'
        elif isinstance(padding, PKCS1v15):
            signing_alg = 'RSASSA_PKCS1_V1_5_'
        else:
            raise TypeError("Unsupported padding")

        for hash_cls, suffix in self._HASH_SUFFIXES:
            if isinstance(hash_alg, hash_cls):
                signing_alg += suffix
                break
        else:
            raise TypeError("Unsupported hashing algorithm")

        response = self.client.sign(
            KeyId=self.arn, Message=data,
            MessageType=message_type,
            SigningAlgorithm=signing_alg)

        return response['Signature']

    # No need to implement these functions so we raise an exception
    def signer(
        self, padding: AsymmetricPadding, algorithm: hashes.HashAlgorithm
    ) -> AsymmetricSignatureContext:
        raise NotImplementedError

    def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes:
        raise NotImplementedError

    def private_numbers(self) -> RSAPrivateNumbers:
        raise NotImplementedError

    def private_bytes(
        self,
        encoding: serialization.Encoding,
        format: serialization.PrivateFormat,
        encryption_algorithm: serialization.KeySerializationEncryption
    ) -> bytes:
        raise NotImplementedError