2021-12-21 16:32:30 -03:00
|
|
|
import json
|
2021-07-20 11:00:49 -03:00
|
|
|
import logging
|
2021-12-21 16:32:30 -03:00
|
|
|
from datetime import datetime
|
2021-07-20 11:00:49 -03:00
|
|
|
|
|
|
|
from flask import current_app
|
|
|
|
|
|
|
|
import constants
|
2021-12-21 16:32:30 -03:00
|
|
|
import order_helpers
|
2021-07-20 11:00:49 -03:00
|
|
|
from database import db
|
2021-12-21 16:32:30 -03:00
|
|
|
from models import Order, TxRetry
|
|
|
|
from regions import region_code_to_number_list
|
2021-07-20 11:00:49 -03:00
|
|
|
from schemas import order_schema
|
|
|
|
|
|
|
|
|
2021-12-21 16:31:41 -03:00
|
|
|
def assign_tx_seq_num(order):
|
|
|
|
"""Assign Tx sequence number to order"""
|
|
|
|
last_tx_order = Order.query.order_by(Order.tx_seq_num.desc()).first()
|
2021-07-20 11:00:49 -03:00
|
|
|
|
2021-12-21 16:31:41 -03:00
|
|
|
if last_tx_order.tx_seq_num:
|
|
|
|
order.tx_seq_num = last_tx_order.tx_seq_num + 1
|
|
|
|
else:
|
|
|
|
order.tx_seq_num = 1
|
|
|
|
db.session.commit()
|
2021-07-20 11:00:49 -03:00
|
|
|
|
|
|
|
|
2021-12-21 16:31:41 -03:00
|
|
|
def redis():
|
|
|
|
return current_app.config.get("REDIS_INSTANCE")
|
|
|
|
|
2021-07-20 11:00:49 -03:00
|
|
|
|
2021-12-21 16:32:30 -03:00
|
|
|
def publish_to_sse_server(order, retransmit_info=None):
|
|
|
|
msg = order_schema.dump(order)
|
|
|
|
# If it's a retransmission, take the regions list from the tx_retries table
|
|
|
|
# instead of the orders table.
|
|
|
|
if retransmit_info:
|
|
|
|
msg['regions'] = region_code_to_number_list(
|
|
|
|
retransmit_info.region_code)
|
|
|
|
msg = json.dumps(msg)
|
|
|
|
|
2021-12-21 16:31:41 -03:00
|
|
|
redis().publish(channel=constants.SUB_CHANNELS[0], message=msg)
|
|
|
|
return
|
2021-07-20 11:00:49 -03:00
|
|
|
|
|
|
|
|
2021-12-21 16:31:41 -03:00
|
|
|
def tx_start():
|
|
|
|
transmitting_orders = Order.query.filter_by(
|
|
|
|
status=constants.OrderStatus.transmitting.value).all()
|
2021-12-21 16:32:30 -03:00
|
|
|
|
|
|
|
# Do not start a new transmission if another order is being transmitted
|
|
|
|
# right now
|
|
|
|
if len(transmitting_orders) > 0:
|
|
|
|
return False
|
|
|
|
|
|
|
|
# First, try to find a paid order with the highest bid in the orders table
|
|
|
|
# and start its transmission. If no orders are found there, look into the
|
|
|
|
# tx_retries table and retransmit one of the orders from there if it meets
|
|
|
|
# the retransmission criteria
|
2021-12-21 16:31:41 -03:00
|
|
|
order = Order.query.filter_by(
|
|
|
|
status=constants.OrderStatus.paid.value).order_by(
|
|
|
|
Order.bid_per_byte.desc()).first()
|
2021-12-21 16:32:30 -03:00
|
|
|
|
2021-12-21 16:31:41 -03:00
|
|
|
if order:
|
|
|
|
logging.info(f'transmission start {order.uuid}')
|
|
|
|
assign_tx_seq_num(order)
|
|
|
|
order.status = constants.OrderStatus.transmitting.value
|
2021-12-21 16:32:30 -03:00
|
|
|
order.started_transmission_at = datetime.utcnow()
|
2021-12-21 16:31:41 -03:00
|
|
|
db.session.commit()
|
|
|
|
publish_to_sse_server(order)
|
2021-12-21 16:32:30 -03:00
|
|
|
else:
|
|
|
|
# No order found for the first transmission.
|
|
|
|
# Check if any order requires retransmission.
|
|
|
|
order, retransmit_info = order_helpers.get_next_retransmission()
|
|
|
|
if order and retransmit_info:
|
|
|
|
logging.info(f'retransmission start {order.uuid}')
|
|
|
|
order.status = constants.OrderStatus.transmitting.value
|
|
|
|
retransmit_info.retry_count += 1
|
|
|
|
retransmit_info.last_attempt = datetime.utcnow()
|
|
|
|
retransmit_info.pending = False
|
|
|
|
db.session.commit()
|
|
|
|
publish_to_sse_server(order, retransmit_info)
|
2021-07-20 11:00:49 -03:00
|
|
|
|
|
|
|
|
2021-12-21 16:31:41 -03:00
|
|
|
def tx_end(order):
|
|
|
|
"""End transmission"""
|
2021-12-21 16:32:30 -03:00
|
|
|
if order.ended_transmission_at is None:
|
2021-12-21 16:31:41 -03:00
|
|
|
logging.info(f'transmission end {order.uuid}')
|
2021-12-21 16:32:30 -03:00
|
|
|
order.ended_transmission_at = datetime.utcnow()
|
|
|
|
retransmit_info = TxRetry.query.filter_by(order_id=order.id).first()
|
|
|
|
# Cleanup the TxRetry
|
|
|
|
TxRetry.query.filter_by(order_id=order.id).delete()
|
2021-12-21 16:31:41 -03:00
|
|
|
db.session.commit()
|
2021-12-21 16:32:30 -03:00
|
|
|
publish_to_sse_server(order, retransmit_info)
|
2021-12-21 16:31:41 -03:00
|
|
|
# Start the next queued order as soon as the current order finishes
|
|
|
|
tx_start()
|