# Manta Python
# Manta Protocol Implementation for Python
# Copyright (C) 2018-2019 Alessandro Viganò
"""
Library with a basic implementation of a Manta :term:`POS`.
"""
from __future__ import annotations

import asyncio
import base64
from decimal import Decimal
import functools
import logging
import uuid
from typing import List, Dict, Optional

import paho.mqtt.client as mqtt

from .base import MantaComponent
from .messages import MerchantOrderRequestMessage, AckMessage, Status
logger = logging.getLogger(__name__)
def generate_session_id() -> str:
    """
    Build a fresh session identifier.

    The identifier is the 16 raw bytes of a random UUID4, encoded with the
    URL-safe base64 alphabet (``-`` and ``_`` instead of ``+`` and ``/``)
    and returned as text, padding included.

    Returns:
        The base64 text of a newly generated UUID4.
    """
    # NOTE: the original author flagged a dedicated CSPRNG (M2Crypto's
    # ``rand_bytes``) as a more secure source of randomness than uuid4.
    raw_uuid = uuid.uuid4().bytes
    encoded = base64.urlsafe_b64encode(raw_uuid)
    return encoded.decode("utf-8")
def wrap_callback(f):
    """
    Decorator that defers a callback method to the owner's *asyncio* loop.

    Calling the decorated method does not run ``f`` directly: it hands
    ``f`` and its arguments to ``self.loop.call_soon_threadsafe`` so the
    body is executed later by that loop. The decorated method therefore
    always returns ``None``.

    Args:
        f: method taking ``self`` followed by positional arguments. ``self``
            must expose a ``loop`` attribute providing
            ``call_soon_threadsafe``.

    Returns:
        A wrapper carrying the metadata (name, docstring) of ``f``.
    """
    # functools.wraps keeps __name__/__doc__ of the real callback, which
    # would otherwise all show up as "wrapper" in logs and tracebacks.
    @functools.wraps(f)
    def wrapper(self: Store, *args):
        self.loop.call_soon_threadsafe(f, self, *args)

    return wrapper
class Store(MantaComponent):
    """
    Implements a Manta :term:`POS`. This class needs an *asyncio* loop
    to run correctly as some of its features are implemented as
    *coroutines*.

    The :term:`MQTT` callbacks (``on_connect``, ``on_disconnect``,
    ``on_message``) are handed over to :attr:`loop` with
    ``call_soon_threadsafe``, so their bodies run inside the *asyncio* loop.

    Args:
        device_id: Device unique identifier (also called :term:`application_id`)
            associated with the :term:`POS`
        host: Hostname of the Manta broker
        client_options: A Dict of options to be passed to MQTT Client (like
            username, password)
        port: port of the Manta broker

    Attributes:
        acks: queue of :class:`~.messages.AckMessage` instances
        connected: event set while the :term:`MQTT` client is connected
        device_id: Device unique identifier (also called
            :term:`application_id`) associated with the :term:`POS`
        first_connect: True once the :term:`MQTT` connection has been started
        loop: the *asyncio* loop that manages the asynchronous parts of this
            object
        session_id: :term:`session_id` of the ongoing session, if any
        subscriptions: topics this instance is currently subscribed to
    """
    loop: asyncio.AbstractEventLoop
    connected: asyncio.Event
    device_id: str
    session_id: Optional[str] = None
    acks: asyncio.Queue
    first_connect = False
    # Annotation only: the list itself is created per instance in __init__.
    # A class-level ``[]`` would be shared (and mutated) by every Store.
    subscriptions: List[str]

    def __init__(self, device_id: str, host: str = "localhost",
                 client_options: Optional[Dict] = None, port: int = 1883):
        client_options = {} if client_options is None else client_options

        self.device_id = device_id
        self.host = host
        self.port = port
        self.subscriptions = []

        self.mqtt_client = mqtt.Client(**client_options)
        self.mqtt_client.on_connect = self.on_connect
        self.mqtt_client.on_message = self.on_message
        self.mqtt_client.on_disconnect = self.on_disconnect

        # Reuse the current loop when there is one, otherwise install a new
        # one so the callbacks always have a loop to be scheduled on.
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)

        # The ``loop=`` keyword of Queue/Event was removed in Python 3.10;
        # passing it raises TypeError there, so it is no longer used.
        self.acks = asyncio.Queue()
        self.connected = asyncio.Event()

    def close(self):
        """
        Disconnect and stop :term:`MQTT` client's processing loop.

        The connection state is reset as well, so that a later call to
        :meth:`connect` starts a new connection instead of waiting for one
        that will never be established.
        """
        self.mqtt_client.disconnect()
        self.mqtt_client.loop_stop()
        self.first_connect = False
        self.connected.clear()

    # noinspection PyUnusedLocal
    @wrap_callback
    def on_disconnect(self, client, userdata, rc):
        """MQTT disconnect callback: mark the store as not connected."""
        self.connected.clear()

    # noinspection PyUnusedLocal
    @wrap_callback
    def on_connect(self, client, userdata, flags, rc):
        """MQTT connect callback: mark the store as connected."""
        logger.info("Connected")
        self.connected.set()

    # noinspection PyUnusedLocal
    @wrap_callback
    def on_message(self, client: mqtt.Client, userdata, msg):
        """
        MQTT message callback.

        Messages whose topic starts with ``acks/`` are parsed as
        :class:`~.messages.AckMessage` and pushed onto :attr:`acks`; any
        other message is only logged.
        """
        logger.info("Got {} on {}".format(msg.payload, msg.topic))
        tokens = msg.topic.split('/')

        if tokens[0] == 'acks':
            logger.info("Got ack message")
            ack = AckMessage.from_json(msg.payload)
            self.acks.put_nowait(ack)

    def subscribe(self, topic: str):
        """
        Subscribe to the given :term:`MQTT` topic.

        The topic is remembered in :attr:`subscriptions` so that
        :meth:`clean` can unsubscribe from it later.

        Args:
            topic: string containing the topic name.
        """
        self.mqtt_client.subscribe(topic)
        self.subscriptions.append(topic)

    def clean(self):
        """
        Clean the :class:`~.messages.AckMessage` queue and unsubscribe
        from any active :term:`MQTT` subscriptions.
        """
        self.acks = asyncio.Queue()

        if self.subscriptions:
            self.mqtt_client.unsubscribe(self.subscriptions)

        self.subscriptions.clear()

    async def connect(self):
        """
        Connect to the :term:`MQTT` broker and wait for the connection
        confirmation.

        The connection is only started on the first call (or on the first
        call after :meth:`close`); later calls just wait until the client
        is reported as connected.

        This is a coroutine.
        """
        if not self.first_connect:
            self.mqtt_client.connect(self.host, port=self.port)
            self.mqtt_client.loop_start()
            self.first_connect = True

        await self.connected.wait()

    async def merchant_order_request(self, amount: Decimal, fiat: str,
                                     crypto: Optional[str] = None,
                                     timeout: float = 3) -> AckMessage:
        """
        Create a new Merchant Order and publish it to the
        :ref:`merchant_order_request/{application_id}` topic. Raises an
        exception if an :class:`~.messages.AckMessage` isn't received
        in less than ``timeout`` seconds (3 by default).

        Any previous session state (pending acks, subscriptions) is
        discarded and a new :term:`session_id` is generated.

        Args:
            amount: Fiat Amount requested
            fiat: Fiat Currency requested (ex. 'EUR')
            crypto: Crypto Currency requested (ex. 'NANO')
            timeout: seconds to wait for the ack message

        Returns:
            ack message with status 'NEW' if confirmed by Payment Processor

        Raises:
            asyncio.TimeoutError: if no ack arrives within ``timeout`` seconds
            Exception: if the received ack has a status other than 'NEW'

        This is a coroutine.
        """
        await self.connect()
        self.clean()

        self.session_id = generate_session_id()
        request = MerchantOrderRequestMessage(
            amount=amount,
            session_id=self.session_id,
            fiat_currency=fiat,
            crypto_currency=crypto
        )

        # Subscribe to the ack topic before publishing the request.
        self.subscribe("acks/{}".format(self.session_id))
        self.mqtt_client.publish("merchant_order_request/{}".format(self.device_id),
                                 request.to_json())
        logger.info("Publishing merchant_order_request for session {}".format(self.session_id))

        result: AckMessage = await asyncio.wait_for(self.acks.get(), timeout)

        if result.status != Status.NEW:
            raise Exception("Invalid ack")

        return result