# Manta Python
# Manta Protocol Implementation for Python
# Copyright (C) 2018-2019 Alessandro ViganĂ²
# Cryptography library generates UserWarning with latest CFFI libraries
import warnings
warnings.filterwarnings("ignore", message="Global variable")
import base64
from decimal import Decimal
from enum import Enum
from typing import List, Set, TypeVar, Type, Optional, Union
import attr
import cattr
from certvalidator import CertificateValidator, ValidationContext
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
import simplejson as json
from . import MANTA_VERSION
[docs]class Status(Enum):
"""
Status for ack messages
"""
NEW = "new" #: Created after accepting Merchant Order
INVALID = "invalid" #: Order invalid - Ex timeout. Additional info can be specified in Ack Memo field
PENDING = "pending" #: Created after receiving payment from wallet
CONFIRMING = (
"confirming" #: Paid received by Payment Processor but not yet confirmed
)
PAID = "paid" #: Created after blockchain confirmation
CANCELED = "canceled" #: Order has been canceled
T = TypeVar("T", bound="Message")
def drop_nonattrs(d: dict, type_: type) -> dict:
"""gets rid of all members of the dictionary that wouldn't fit in the given 'attrs' Type"""
attrs_attrs = getattr(type_, "__attrs_attrs__", None)
if attrs_attrs is None:
raise ValueError(f"type {type_} is not an attrs class")
attrs: Set[str] = {attr.name for attr in attrs_attrs}
# attrs: Set[str] = {attr.name for attr in attrs_attrs if attr.init is True}
return {key: val for key, val in d.items() if key in attrs}
def structure_ignore_extras(d: dict, Type: type):
return cattr.structure(drop_nonattrs(d, Type), Type)
@attr.s
class Message:
def unstructure(self):
cattr.register_unstructure_hook(Decimal, lambda d: str(d))
return cattr.unstructure(self)
def to_json(self) -> str:
return json.dumps(self.unstructure(), iterable_as_array=True)
@classmethod
# def from_json(cls, json_str: str):
def from_json(cls: Type[T], json_str: str) -> T:
d = json.loads(json_str)
cattr.register_structure_hook(Decimal, lambda d, t: Decimal(d))
if "version" not in d:
d["version"] = ""
return structure_ignore_extras(d, cls)
[docs]@attr.s(auto_attribs=True)
class MerchantOrderRequestMessage(Message):
"""
Merchant Order Request
Published by :term:`Merchant` on
:ref:`merchant_order_request/{application_id}`.
Args:
amount: amount in fiat currency
fiat_currency: fiat currency
session_id: random uuid base64 safe
crypto_currency: None for manta protocol. Specified for legacy
version: Manta protocol version
"""
amount: Decimal
session_id: str
fiat_currency: str
crypto_currency: Optional[str] = None
version: Optional[str] = MANTA_VERSION
[docs]@attr.s(auto_attribs=True)
class AckMessage(Message):
"""
Ack Message
Order progress message.
Published by the :term:`Payment Processor` on :ref:`acks/{session_id}`.
Args:
txid: progressive transaction ID generated by Merchant
status: ack type
url: url to be used for QR Code or NFC. Used in NEW
amount: amount in crypto currency. Used in NEW
transaction_hash: hash of transaction. After PENDING
memo: extra text field
version: Manta protocol version
"""
txid: str
status: Status
url: Optional[str] = None
amount: Optional[Decimal] = None
transaction_hash: Optional[str] = None
transaction_currency: Optional[str] = None
memo: Optional[str] = None
version: Optional[str] = MANTA_VERSION
[docs]@attr.s(auto_attribs=True)
class Destination(Message):
"""
Destination
Args:
amount: amount in crypto currency
destination_address: destination address for payment
crypto_currency: crypto_currency (ex. NANO, BTC...)mo
"""
amount: Decimal
destination_address: str
crypto_currency: str
[docs]@attr.s(auto_attribs=True)
class Merchant(Message):
"""
Merchant
Args:
name: merchant name
address: merchant address
"""
name: str
address: Optional[str] = None
[docs]@attr.s(auto_attribs=True)
class PaymentRequestMessage(Message):
"""
Payment Request
Generated after request on
:ref:`payment_requests/{session_id}/{crypto_currency}`.
Published in Envelope to Payment Processor on
:ref:`payment_requests/{session_id}`.
Args:
merchant: merchant data
amount: amount in fiat currency
fiat_currency: fiat currency
destinations: list of destination addresses
supported_cryptos: list of supported crypto currencies
"""
merchant: Merchant
amount: Decimal
fiat_currency: str
destinations: List[Destination]
supported_cryptos: Set[str]
def get_envelope(self, key: RSAPrivateKey):
json_message = self.to_json()
signature = base64.b64encode(
key.sign(json_message.encode("utf-8"), padding.PKCS1v15(), hashes.SHA256())
)
return PaymentRequestEnvelope(
message=json_message, signature=signature.decode("utf-8")
)
def get_destination(self, crypto: str) -> Optional[Destination]:
try:
return next(d for d in self.destinations if d.crypto_currency == crypto)
except StopIteration:
return None
[docs]@attr.s(auto_attribs=True)
class PaymentRequestEnvelope(Message):
"""
Payment Request Envelope
Envelope with :class:`.PaymentRequestMessage` and signature
Published by :term:`Payment Processor` on
:ref:`payment_requests/{session_id}`.
Args:
message: message as json string
signature: PKCS#1 v1.5 signature of the message field
version: Manta protocol version
"""
message: str
signature: str
version: Optional[str] = MANTA_VERSION
def unpack(self) -> PaymentRequestMessage:
pr = PaymentRequestMessage.from_json(self.message)
return pr
def verify(self, certificate: Union[str, x509.Certificate]) -> bool:
if isinstance(certificate, x509.Certificate):
cert = certificate
else:
if certificate.startswith("-----BEGIN CERTIFICATE-----"):
pem = certificate.encode()
else:
with open(certificate, "rb") as my_file:
pem = my_file.read()
cert = x509.load_pem_x509_certificate(pem, default_backend())
try:
cert.public_key().verify(
base64.b64decode(self.signature),
self.message.encode("utf-8"),
padding.PKCS1v15(),
hashes.SHA256(),
)
return True
except InvalidSignature:
return False
[docs]@attr.s(auto_attribs=True)
class PaymentMessage(Message):
"""
Payment Message
Published by :term:`Wallet` on :ref:`payments/{session_id}`
Args:
crypto_currency: crypto currency used for payment
transaction_hash: hash of transaction
version: Manta protocol version
"""
crypto_currency: str
transaction_hash: str
version: Optional[str] = MANTA_VERSION
def verify_chain(certificate: Union[str, x509.Certificate], ca: str):
if isinstance(certificate, x509.Certificate):
pem = certificate.public_bytes(serialization.Encoding.PEM)
else:
if certificate.startswith("-----BEGIN CERTIFICATE-----"):
pem = certificate.encode()
else:
with open(certificate, "rb") as my_file:
pem = my_file.read()
# cert = x509.load_pem_x509_certificate(pem, default_backend())
with open(ca, "rb") as my_file:
pem_ca = my_file.read()
# ca = x509.load_pem_x509_certificate(pem_ca, default_backend())
trust_roots = [pem_ca]
context = ValidationContext(trust_roots=trust_roots)
validator = CertificateValidator(pem, validation_context=context)
return validator.validate_usage({"digital_signature"})