Source code for manta.messages

# Manta Python
# Manta Protocol Implementation for Python
# Copyright (C) 2018-2019 Alessandro Viganò

# Cryptography library generates UserWarning with latest CFFI libraries
import warnings

warnings.filterwarnings("ignore", message="Global variable")

import base64
from decimal import Decimal
from enum import Enum
from typing import List, Set, TypeVar, Type, Optional, Union

import attr
import cattr
from certvalidator import CertificateValidator, ValidationContext
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
import simplejson as json

from . import MANTA_VERSION


class Status(Enum):
    """Status values carried by ack messages."""

    #: Created after accepting Merchant Order
    NEW = "new"
    #: Order invalid - Ex timeout. Additional info can be specified in Ack Memo field
    INVALID = "invalid"
    #: Created after receiving payment from wallet
    PENDING = "pending"
    #: Paid received by Payment Processor but not yet confirmed
    CONFIRMING = "confirming"
    #: Created after blockchain confirmation
    PAID = "paid"
    #: Order has been canceled
    CANCELED = "canceled"
T = TypeVar("T", bound="Message")


def drop_nonattrs(d: dict, type_: type) -> dict:
    """Return the entries of ``d`` whose keys are attribute names of ``type_``.

    Args:
        d: dictionary to filter.
        type_: ``attrs`` class whose declared attribute names are kept.

    Returns:
        A new dictionary holding only the matching key/value pairs.

    Raises:
        ValueError: if ``type_`` has no ``__attrs_attrs__`` (not an attrs class).
    """
    declared = getattr(type_, "__attrs_attrs__", None)
    if declared is None:
        raise ValueError(f"type {type_} is not an attrs class")

    allowed: Set[str] = set()
    for field in declared:
        allowed.add(field.name)

    kept = {}
    for key, val in d.items():
        if key in allowed:
            kept[key] = val
    return kept


def structure_ignore_extras(d: dict, Type: type):
    """Structure ``d`` into ``Type`` with cattr, ignoring keys ``Type`` does not declare."""
    known_only = drop_nonattrs(d, Type)
    return cattr.structure(known_only, Type)


@attr.s
class Message:
    """Base class giving messages dict/JSON conversion through ``cattr``."""

    def unstructure(self):
        """Return the ``cattr`` unstructured form of this message.

        ``Decimal`` values are emitted as strings.
        """
        # NOTE(review): the hook is (re-)registered on the global cattr
        # converter on every call.
        cattr.register_unstructure_hook(Decimal, lambda value: str(value))
        return cattr.unstructure(self)

    def to_json(self) -> str:
        """Serialize this message to a JSON string."""
        plain = self.unstructure()
        return json.dumps(plain, iterable_as_array=True)

    @classmethod
    def from_json(cls: Type[T], json_str: str) -> T:
        """Build an instance of ``cls`` from a JSON string.

        Keys that ``cls`` does not declare are dropped; a missing ``version``
        key is filled in with an empty string before structuring.
        """
        payload = json.loads(json_str)
        # NOTE(review): registered on the global cattr converter on every call.
        cattr.register_structure_hook(Decimal, lambda raw, _target: Decimal(raw))
        if "version" not in payload:
            payload["version"] = ""
        return structure_ignore_extras(payload, cls)
@attr.s(auto_attribs=True)
class MerchantOrderRequestMessage(Message):
    """
    Merchant Order Request

    Published by :term:`Merchant` on :ref:`merchant_order_request/{application_id}`.

    Args:
        amount: amount in fiat currency
        session_id: random uuid base64 safe
        fiat_currency: fiat currency
        crypto_currency: None for manta protocol. Specified for legacy
        version: Manta protocol version
    """

    # Field order defines the positional order of the generated __init__.
    amount: Decimal
    session_id: str
    fiat_currency: str
    crypto_currency: Optional[str] = None
    version: Optional[str] = MANTA_VERSION
@attr.s(auto_attribs=True)
class AckMessage(Message):
    """
    Ack Message

    Order progress message.
    Published by the :term:`Payment Processor` on :ref:`acks/{session_id}`.

    Args:
        txid: progressive transaction ID generated by Merchant
        status: ack type
        url: url to be used for QR Code or NFC. Used in NEW
        amount: amount in crypto currency. Used in NEW
        transaction_hash: hash of transaction. After PENDING
        transaction_currency: presumably the currency of the transaction
            (undocumented in the original - TODO confirm)
        memo: extra text field
        version: Manta protocol version
    """

    # Field order defines the positional order of the generated __init__.
    txid: str
    status: Status
    url: Optional[str] = None
    amount: Optional[Decimal] = None
    transaction_hash: Optional[str] = None
    transaction_currency: Optional[str] = None
    memo: Optional[str] = None
    version: Optional[str] = MANTA_VERSION
@attr.s(auto_attribs=True)
class Destination(Message):
    """
    Destination

    Args:
        amount: amount in crypto currency
        destination_address: destination address for payment
        crypto_currency: crypto_currency (ex. NANO, BTC...)
    """

    # Field order defines the positional order of the generated __init__.
    amount: Decimal
    destination_address: str
    crypto_currency: str
@attr.s(auto_attribs=True)
class Merchant(Message):
    """
    Merchant

    Args:
        name: merchant name
        address: merchant address
    """

    # Field order defines the positional order of the generated __init__.
    name: str
    address: Optional[str] = None
@attr.s(auto_attribs=True)
class PaymentRequestMessage(Message):
    """
    Payment Request

    Generated after request on :ref:`payment_requests/{session_id}/{crypto_currency}`.
    Published in Envelope to Payment Processor on :ref:`payment_requests/{session_id}`.

    Args:
        merchant: merchant data
        amount: amount in fiat currency
        fiat_currency: fiat currency
        destinations: list of destination addresses
        supported_cryptos: list of supported crypto currencies
    """

    # Field order defines the positional order of the generated __init__.
    merchant: Merchant
    amount: Decimal
    fiat_currency: str
    destinations: List[Destination]
    supported_cryptos: Set[str]

    def get_envelope(self, key: RSAPrivateKey):
        """Sign this message's JSON form and wrap both in an envelope.

        Args:
            key: private key used to sign (PKCS#1 v1.5 padding, SHA-256).

        Returns:
            A :class:`.PaymentRequestEnvelope` holding the JSON string and the
            base64-encoded signature.
        """
        payload = self.to_json()
        raw_signature = key.sign(
            payload.encode("utf-8"), padding.PKCS1v15(), hashes.SHA256()
        )
        encoded_signature = base64.b64encode(raw_signature).decode("utf-8")
        return PaymentRequestEnvelope(message=payload, signature=encoded_signature)

    def get_destination(self, crypto: str) -> Optional[Destination]:
        """Return the first destination whose ``crypto_currency`` equals ``crypto``.

        Returns ``None`` when no destination matches.
        """
        for destination in self.destinations:
            if destination.crypto_currency == crypto:
                return destination
        return None
@attr.s(auto_attribs=True)
class PaymentRequestEnvelope(Message):
    """
    Payment Request Envelope

    Envelope with :class:`.PaymentRequestMessage` and signature
    Published by :term:`Payment Processor` on :ref:`payment_requests/{session_id}`.

    Args:
        message: message as json string
        signature: PKCS#1 v1.5 signature of the message field
        version: Manta protocol version
    """

    # Field order defines the positional order of the generated __init__.
    message: str
    signature: str
    version: Optional[str] = MANTA_VERSION

    def unpack(self) -> PaymentRequestMessage:
        """Parse the ``message`` field into a :class:`.PaymentRequestMessage`."""
        return PaymentRequestMessage.from_json(self.message)

    def verify(self, certificate: Union[str, x509.Certificate]) -> bool:
        """Check ``signature`` against ``message`` using the certificate's public key.

        Args:
            certificate: an ``x509.Certificate``, a PEM string starting with
                ``-----BEGIN CERTIFICATE-----``, or otherwise a path to a PEM
                file.

        Returns:
            ``True`` when the signature verifies (PKCS#1 v1.5 padding,
            SHA-256); ``False`` when verification fails or when ``signature``
            is not decodable base64.
        """
        if isinstance(certificate, x509.Certificate):
            cert = certificate
        else:
            if certificate.startswith("-----BEGIN CERTIFICATE-----"):
                pem = certificate.encode()
            else:
                with open(certificate, "rb") as cert_file:
                    pem = cert_file.read()
            cert = x509.load_pem_x509_certificate(pem, default_backend())

        # The signature arrives in a received envelope, so it may be malformed.
        # base64.b64decode raises binascii.Error (a ValueError subclass) on bad
        # padding and ValueError on non-ASCII text; treat both as "not valid"
        # instead of letting them escape a bool-returning check.
        try:
            raw_signature = base64.b64decode(self.signature)
        except ValueError:
            return False

        try:
            cert.public_key().verify(
                raw_signature,
                self.message.encode("utf-8"),
                padding.PKCS1v15(),
                hashes.SHA256(),
            )
        except InvalidSignature:
            return False
        return True
@attr.s(auto_attribs=True)
class PaymentMessage(Message):
    """
    Payment Message

    Published by :term:`Wallet` on :ref:`payments/{session_id}`

    Args:
        crypto_currency: crypto currency used for payment
        transaction_hash: hash of transaction
        version: Manta protocol version
    """

    # Field order defines the positional order of the generated __init__.
    crypto_currency: str
    transaction_hash: str
    version: Optional[str] = MANTA_VERSION
def verify_chain(certificate: Union[str, x509.Certificate], ca: str):
    """Validate ``certificate`` for digital-signature usage against a single CA.

    Args:
        certificate: an ``x509.Certificate``, a PEM string starting with
            ``-----BEGIN CERTIFICATE-----``, or otherwise a path to a PEM file.
        ca: path to the CA certificate file used as the only trust root.

    Returns:
        The result of ``CertificateValidator.validate_usage`` for the
        ``digital_signature`` key usage; exceptions raised by the validator
        propagate to the caller.
    """
    if isinstance(certificate, x509.Certificate):
        pem = certificate.public_bytes(serialization.Encoding.PEM)
    elif certificate.startswith("-----BEGIN CERTIFICATE-----"):
        pem = certificate.encode()
    else:
        with open(certificate, "rb") as cert_file:
            pem = cert_file.read()

    with open(ca, "rb") as ca_file:
        trust_roots = [ca_file.read()]

    context = ValidationContext(trust_roots=trust_roots)
    return CertificateValidator(pem, validation_context=context).validate_usage(
        {"digital_signature"}
    )