Remove tx loop and rely on tx confirmations

This change removes the infinite transmission loop with sleep periods
between each transmission. Now, a new transmission can start by a call
to "tx_start" on the following two conditions:

1) As soon as the application starts (if there is a previously paid
   order waiting already).
2) As soon as the current transmission ends. The server will immediately
   look for the next order ready for transmission and start it if
   available.

Meanwhile, the condition for ending the current transmission (i.e., for
calling "tx_end(order)") is when all Tx confirmations are received from
all regions.
This commit is contained in:
Blockstream Satellite 2021-12-21 16:31:41 -03:00
parent 0fc668d490
commit 2cfc39804e
15 changed files with 309 additions and 310 deletions

View File

@ -20,7 +20,7 @@ services:
- REDIS_URI=redis://redis:6379
volumes:
- data:/data
tx-daemon:
workers:
build: server
depends_on:
- api-server
@ -32,7 +32,7 @@ services:
- ENV=development
volumes:
- data:/data
command: daemon.sh
command: workers.sh
sse-server:
build:
context: sse/

View File

@ -1,3 +0,0 @@
#!/bin/bash
set -e
python3 daemon.py

View File

@ -10,6 +10,7 @@ from constants import InvoiceStatus, OrderStatus
from database import db
from error import get_http_error_resp
from models import Order, RxConfirmation, TxConfirmation
import transmitter
import constants
from utils import hmac_sha256_digest
@ -75,6 +76,7 @@ def maybe_mark_order_as_paid(order_id):
validate_bid(order.message_size, order.bid):
order.status = OrderStatus.paid.value
db.session.commit()
transmitter.tx_start()
def expire_order(order):
@ -116,8 +118,10 @@ def synthesize_presumed_rx_confirmations(order):
presumed)
def received_criteria_met(order):
if order.status != OrderStatus.sent.value:
def sent_or_received_criteria_met(order):
if order.status not in [
OrderStatus.transmitting.value, OrderStatus.sent.value
]:
return False
order_tx_confirmations = TxConfirmation.query.filter_by(
@ -134,6 +138,10 @@ def received_criteria_met(order):
if len(tx_confirmed_regions) < len(constants.Regions):
return False
# Reaching this point means that all tx hosts sent
# tx_confirmations and the ongoing transmission can be ended
transmitter.tx_end(order)
# Some regions should confirm Rx
expected_rx_confirmations = set([
info['id'] for region, info in constants.SATELLITE_REGIONS.items()

View File

@ -249,6 +249,10 @@ class TxConfirmationResource(Resource):
if errors:
return errors, HTTPStatus.BAD_REQUEST
# Find order by sequence number. Note only transmitting/transmitted
# orders have a sequence number, whereas still pending or paid orders
# do not. Hence, the following query implicitly ensures the order is
# already transmitting/transmitted.
order = Order.query.filter_by(tx_seq_num=tx_seq_num).first()
if not order:
return get_http_error_resp('SEQUENCE_NUMBER_NOT_FOUND', tx_seq_num)
@ -262,7 +266,7 @@ class TxConfirmationResource(Resource):
order_helpers.add_confirmation_if_not_present(
TxConfirmation, order, region_number)
order_helpers.received_criteria_met(order)
order_helpers.sent_or_received_criteria_met(order)
return {
'message': f'transmission confirmed for regions {args["regions"]}'
@ -289,7 +293,7 @@ class RxConfirmationResource(Resource):
order_helpers.add_confirmation_if_not_present(RxConfirmation, order,
region_in_request)
order_helpers.received_criteria_met(order)
order_helpers.sent_or_received_criteria_met(order)
return {
'message': f'reception confirmed for region {region_in_request}'

View File

@ -4,6 +4,7 @@ import logging
from flask import Flask
from flask_restful import Api
import redis
import constants
from database import db
@ -23,7 +24,7 @@ def create_app(from_test=False):
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{constants.DB_FILE}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['TESTING'] = from_test
app.config["REDIS_URL"] = constants.REDIS_URI
app.config["REDIS_INSTANCE"] = redis.from_url(constants.REDIS_URI)
db.init_app(app)
with app.app_context():

9
server/tests/conftest.py Normal file
View File

@ -0,0 +1,9 @@
import pytest
@pytest.fixture
def mockredis(mocker):
_mr = mocker.Mock(name="mockredis")
mocker.patch("transmitter.redis", return_value=_mr)
mocker.patch("transmitter.redis.from_url", return_value=_mr)
return _mr

View File

@ -1,143 +0,0 @@
from datetime import datetime, timedelta
import os
import pytest
from unittest.mock import patch
from common import generate_test_order
from constants import InvoiceStatus, OrderStatus
import daemon
from database import db
from models import Order, Invoice
from invoice_helpers import pay_invoice
import constants
import server
@pytest.fixture
def tx_engine(mocker):
tx_engine = daemon.TxEngine()
yield tx_engine
@pytest.fixture
def mockredis(mocker):
_mr = mocker.Mock(name="mockredis")
mocker.patch("transmitter.redis", return_value=_mr)
mocker.patch("transmitter.redis.from_url", return_value=_mr)
return _mr
@pytest.fixture
def client(mockredis):
app = server.create_app(from_test=True)
app.app_context().push()
with app.test_client() as client:
yield client
server.teardown_app(app)
@pytest.fixture
def daemon_app():
app = daemon.create_app()
yield app
@patch('orders.new_invoice')
def test_tx_engine(mock_new_invoice, client, tx_engine):
# prepare test env
# create an old transmitted order
completed_order_uuid = generate_test_order(
mock_new_invoice,
client,
invoice_status=InvoiceStatus.paid,
order_status=OrderStatus.transmitting)['uuid']
# create two sendable orders
first_sendable_order_uuid = generate_test_order(mock_new_invoice,
client,
order_id=5,
bid=1000)['uuid']
first_sendable_db_order = \
Order.query.filter_by(uuid=first_sendable_order_uuid).first()
pay_invoice(first_sendable_db_order.invoices[0])
db.session.commit()
second_sendable_order_uuid = generate_test_order(mock_new_invoice,
client,
order_id=6,
bid=2000)['uuid']
second_sendable_db_order = \
Order.query.filter_by(uuid=second_sendable_order_uuid).first()
pay_invoice(second_sendable_db_order.invoices[0])
db.session.commit()
tx_engine.start(2)
completed_db_order = \
Order.query.filter_by(uuid=completed_order_uuid).first()
assert completed_db_order.status == OrderStatus.sent.value
assert completed_db_order.ended_transmission_at is not None
first_sendable_db_order = \
Order.query.filter_by(uuid=first_sendable_order_uuid).first()
second_sendable_db_order = \
Order.query.filter_by(uuid=second_sendable_order_uuid).first()
assert first_sendable_db_order.status == constants.OrderStatus.sent.value
assert second_sendable_db_order.status == constants.OrderStatus.sent.value
# The second order has a higher bid_per_byte, so it should be sent first
assert first_sendable_db_order.tx_seq_num == 2
assert second_sendable_db_order.tx_seq_num == 1
assert second_sendable_db_order.ended_transmission_at \
< first_sendable_db_order.ended_transmission_at
@patch('orders.new_invoice')
def test_cleanup_database(mock_new_invoice, client, tx_engine, daemon_app):
# prepare test env
# create an invoice that must get expired
pending_invoice_lid = generate_test_order(
mock_new_invoice, client, order_id=2)['lightning_invoice']['id']
pending_db_invoice = \
Invoice.query.filter_by(lid=pending_invoice_lid).first()
pending_db_invoice.expires_at = datetime.utcnow() - timedelta(days=1)
db.session.commit()
# create an order that must get expired
pending_order_uuid = generate_test_order(mock_new_invoice,
client,
order_id=3)['uuid']
pending_db_order = Order.query.filter_by(uuid=pending_order_uuid).first()
pending_db_order.created_at = datetime.utcnow() - \
timedelta(days=constants.EXPIRE_PENDING_ORDERS_AFTER_DAYS
+ 1)
db.session.commit()
# Create an order whose transmission ended a long time ago. The
# corresponding message file should be deleted.
sent_order_uuid = generate_test_order(
mock_new_invoice,
client,
order_id=4,
invoice_status=InvoiceStatus.paid)['uuid']
sent_db_order = Order.query.filter_by(uuid=sent_order_uuid).first()
sent_db_order.ended_transmission_at = datetime.utcnow() -\
timedelta(days=constants.MESSAGE_FILE_RETENTION_TIME_DAYS
+ 1)
db.session.commit()
daemon.cleanup_database(daemon_app)
pending_db_invoice = \
Invoice.query.filter_by(lid=pending_invoice_lid).first()
assert pending_db_invoice.status == InvoiceStatus.expired.value
pending_db_order = Order.query.filter_by(uuid=pending_order_uuid).first()
assert pending_db_order.status == OrderStatus.expired.value
message_path = os.path.join(constants.MSG_STORE_PATH, pending_order_uuid)
assert not os.path.exists(message_path)
message_path = os.path.join(constants.MSG_STORE_PATH, sent_order_uuid)
assert not os.path.exists(message_path)

View File

@ -3,7 +3,7 @@ from http import HTTPStatus
from unittest.mock import patch
from database import db
from constants import InvoiceStatus
from constants import InvoiceStatus, OrderStatus
from error import assert_error
from models import Invoice, Order
from utils import hmac_sha256_digest
@ -16,7 +16,7 @@ from common import new_invoice,\
@pytest.fixture
def client():
def client(mockredis):
app = server.create_app(from_test=True)
app.app_context().push()
with app.test_client() as client:
@ -109,7 +109,7 @@ def test_paid_invoice_callback_successfully(mock_new_invoice, client):
db_invoice = Invoice.query.filter_by(lid=invoice.lid).first()
db_order = Order.query.filter_by(uuid=uuid_order).first()
assert db_invoice.status == InvoiceStatus.paid.value
assert db_order.status == InvoiceStatus.paid.value
assert db_order.status == OrderStatus.transmitting.value
assert db_invoice.paid_at is not None
@ -191,7 +191,7 @@ def test_pay_multiple_invoices(mock_new_invoice, client):
db_order = Order.query.filter_by(uuid=uuid).first()
assert db_invoice.status == InvoiceStatus.paid.value
assert db_invoice.paid_at is not None
assert db_order.status == InvoiceStatus.paid.value
assert db_order.status == OrderStatus.transmitting.value
assert db_order.bid == total_bid
assert db_order.unpaid_bid == 0

View File

@ -22,7 +22,7 @@ from common import check_invoice, check_upload, new_invoice,\
@pytest.fixture
def client():
def client(mockredis):
app = server.create_app(from_test=True)
app.app_context().push()
with app.test_client() as client:
@ -449,14 +449,14 @@ def test_get_sent_message_by_seq_number_for_paid_order(mock_new_invoice,
uuid = generate_test_order(mock_new_invoice, client)['uuid']
db_order = Order.query.filter_by(uuid=uuid).first()
pay_invoice(db_order.invoices[0])
db_order.tx_seq_num = 1
db.session.commit()
# Try to get sent_message for a paid order by sequence number. sent_message
# can only be retrieved for transmitting, sent, or received messages.
# Because the invoice was paid, the order should go immediately into
# transmitting state and be assigned with sequence number 1.
rv = client.get('/message/1')
assert rv.status_code == HTTPStatus.NOT_FOUND
assert_error(rv.get_json(), 'SEQUENCE_NUMBER_NOT_FOUND')
assert rv.status_code == HTTPStatus.OK
received_message = rv.data
check_received_message(uuid, received_message)
@patch('orders.new_invoice')
@ -585,8 +585,8 @@ def test_confirm_rx(mock_new_invoice, client):
Regions.g18.value, Regions.e113.value, Regions.t18v_c.value,
Regions.t18v_c.value
]])
def test_received_criteria_met_inadequate_regions(mock_new_invoice, client,
tx_regions, rx_regions):
def test_sent_or_received_criteria_met_inadequate_regions(
mock_new_invoice, client, tx_regions, rx_regions):
uuid = generate_test_order(mock_new_invoice,
client,
order_status=OrderStatus.sent,
@ -608,7 +608,8 @@ def test_received_criteria_met_inadequate_regions(mock_new_invoice, client,
@patch('orders.new_invoice')
def test_received_criteria_met_for_unsent_order(mock_new_invoice, client):
def test_sent_or_received_criteria_met_for_unsent_order(
mock_new_invoice, client):
uuid = generate_test_order(mock_new_invoice, client, tx_seq_num=1)['uuid']
# Confirm tx for all 6 regions
post_rv = client.post('/order/tx/1',
@ -632,17 +633,17 @@ def test_received_criteria_met_for_unsent_order(mock_new_invoice, client):
@patch('orders.new_invoice')
def test_received_criteria_met_successfully(mock_new_invoice, client):
def test_sent_or_received_criteria_met_successfully(mock_new_invoice, client):
uuid = generate_test_order(mock_new_invoice,
client,
order_status=OrderStatus.sent,
order_status=OrderStatus.transmitting,
tx_seq_num=1)['uuid']
# Confirm tx for all 6 regions
post_rv = client.post('/order/tx/1',
data={'regions': [[e.value for e in Regions]]})
assert post_rv.status_code == HTTPStatus.OK
# Order's status should still be sent
# Order's status should change to sent
db_order = Order.query.filter_by(uuid=uuid).first()
assert db_order.status == OrderStatus.sent.value

View File

@ -17,7 +17,7 @@ from common import new_invoice, place_order, generate_test_order
@pytest.fixture
def client():
def client(mockredis):
app = server.create_app(from_test=True)
app.app_context().push()
with app.test_client() as client:
@ -347,13 +347,23 @@ def test_get_sent_orders_paging(mock_new_invoice, client):
@patch('constants.FORCE_PAYMENT', True)
def test_create_order_with_force_payment_enabled(mock_new_invoice, client):
uuid_order1 = generate_test_order(mock_new_invoice, client)['uuid']
uuid_order2 = generate_test_order(mock_new_invoice, client)['uuid']
db_order = Order.query.filter_by(uuid=uuid_order1).first()
db_invoice = db_order.invoices[0]
db_order1 = Order.query.filter_by(uuid=uuid_order1).first()
db_invoice1 = db_order1.invoices[0]
# because FORCE_PAYMENT is set and this order had only 1 invoice
# the expectation is that both the invoice and the order have the
# paid status
assert db_order.status == OrderStatus.paid.value
assert db_invoice.status == InvoiceStatus.paid.value
assert db_invoice.paid_at is not None
db_order2 = Order.query.filter_by(uuid=uuid_order2).first()
db_invoice2 = db_order2.invoices[0]
# Since FORCE_PAYMENT is set and both orders have only one invoice, both
# orders change their statuses to paid. Furthermore, the payment triggers a
# Tx start verification and, since the transmit queue is empty, order1
# immediately changes to transmitting state. In contrast, order2 stays in
# paid state as it needs to wait until the transmission of order1 finishes.
assert db_order1.status == OrderStatus.transmitting.value
assert db_invoice1.status == InvoiceStatus.paid.value
assert db_invoice1.paid_at is not None
assert db_order2.status == OrderStatus.paid.value
assert db_invoice2.status == InvoiceStatus.paid.value
assert db_invoice2.paid_at is not None

View File

@ -4,23 +4,15 @@ from unittest.mock import patch
from common import generate_test_order
from constants import InvoiceStatus, OrderStatus
from transmitter import TxEngine
import transmitter
from database import db
from schemas import order_schema
from models import Order
from invoice_helpers import pay_invoice
import constants
import server
@pytest.fixture
def mockredis(mocker):
_mr = mocker.Mock(name="mockredis")
mocker.patch("transmitter.redis", return_value=_mr)
mocker.patch("transmitter.redis.from_url", return_value=_mr)
return _mr
@pytest.fixture
def app(mockredis):
app = server.create_app(from_test=True)
@ -35,12 +27,6 @@ def client(app):
yield client
@pytest.fixture
def tx_engine():
tx_engine = TxEngine()
yield tx_engine
def assert_redis_call(mockredis, order):
msg = order_schema.dump(order)
mockredis.publish.assert_called_with(channel='transmissions',
@ -48,7 +34,7 @@ def assert_redis_call(mockredis, order):
@patch('orders.new_invoice')
def test_tx_resume(mock_new_invoice, client, tx_engine, mockredis):
def test_tx_resume(mock_new_invoice, client, mockredis):
# Create two orders: one in transmitting state, the other in pending state.
first_order_uuid = generate_test_order(
mock_new_invoice, client,
@ -59,7 +45,7 @@ def test_tx_resume(mock_new_invoice, client, tx_engine, mockredis):
client,
order_id=2)['uuid']
tx_engine.tx_resume()
transmitter.tx_resume()
assert_redis_call(mockredis, first_db_order)
# The expectation is that the transmitting order changes to sent. The other
@ -74,27 +60,21 @@ def test_tx_resume(mock_new_invoice, client, tx_engine, mockredis):
@patch('orders.new_invoice')
def test_tx_start(mock_new_invoice, client, tx_engine, mockredis):
def test_tx_start(mock_new_invoice, client, mockredis):
# Create two orders and pay only for the first.
first_order_uuid = generate_test_order(mock_new_invoice, client)['uuid']
second_order_uuid = generate_test_order(
mock_new_invoice,
client,
order_id=2,
invoice_status=InvoiceStatus.paid)['uuid']
second_order_uuid = generate_test_order(mock_new_invoice,
client,
order_id=2)['uuid']
first_db_order = Order.query.filter_by(uuid=first_order_uuid).first()
pay_invoice(first_db_order.invoices[0])
db.session.commit()
second_db_order = Order.query.filter_by(uuid=first_order_uuid).first()
tx_engine.tx_start(first_db_order)
tx_engine.tx_start(second_db_order)
pay_invoice(first_db_order.invoices[0])
assert_redis_call(mockredis, first_db_order)
assert_redis_call(mockredis, second_db_order)
db.session.commit()
# The expectation is that the first order gets transmitted and the second
# order stays untouched.
# stays untouched. The invoice callback handler should call tx_start.
first_db_order = Order.query.filter_by(uuid=first_order_uuid).first()
assert first_db_order.status == OrderStatus.transmitting.value
assert first_db_order.tx_seq_num is not None
@ -103,42 +83,108 @@ def test_tx_start(mock_new_invoice, client, tx_engine, mockredis):
assert second_db_order.status == OrderStatus.pending.value
assert second_db_order.tx_seq_num is None
# Calling tx_start explicitly won't change anything since the second order
# is still unpaid
transmitter.tx_start()
second_db_order = Order.query.filter_by(uuid=second_order_uuid).first()
assert second_db_order.status == OrderStatus.pending.value
assert second_db_order.tx_seq_num is None
@patch('orders.new_invoice')
def test_assign_tx_seq_num(mock_new_invoice, client, tx_engine):
def test_assign_tx_seq_num(mock_new_invoice, client):
# make some orders
first_order_uuid = generate_test_order(
mock_new_invoice, client, invoice_status=InvoiceStatus.paid)['uuid']
first_order_uuid = generate_test_order(mock_new_invoice, client)['uuid']
first_db_order = Order.query.filter_by(uuid=first_order_uuid).first()
assert first_db_order.tx_seq_num is None
second_order_uuid = generate_test_order(
mock_new_invoice,
client,
order_id=2,
invoice_status=InvoiceStatus.paid)['uuid']
second_order_uuid = generate_test_order(mock_new_invoice,
client,
order_id=2)['uuid']
second_db_order = Order.query.filter_by(uuid=second_order_uuid).first()
assert second_db_order.tx_seq_num is None
third_order_uuid = generate_test_order(
mock_new_invoice,
client,
order_id=3,
invoice_status=InvoiceStatus.paid)['uuid']
third_order_uuid = generate_test_order(mock_new_invoice,
client,
order_id=3)['uuid']
third_db_order = Order.query.filter_by(uuid=third_order_uuid).first()
assert third_db_order.tx_seq_num is None
tx_engine.assign_tx_seq_num(first_db_order)
transmitter.assign_tx_seq_num(first_db_order)
db.session.commit()
first_db_order = Order.query.filter_by(uuid=first_order_uuid).first()
assert first_db_order.tx_seq_num == 1
tx_engine.assign_tx_seq_num(second_db_order)
transmitter.assign_tx_seq_num(second_db_order)
db.session.commit()
second_db_order = Order.query.filter_by(uuid=second_order_uuid).first()
assert second_db_order.tx_seq_num == 2
tx_engine.assign_tx_seq_num(third_db_order)
transmitter.assign_tx_seq_num(third_db_order)
db.session.commit()
third_db_order = Order.query.filter_by(uuid=third_order_uuid).first()
assert third_db_order.tx_seq_num == 3
@patch('orders.new_invoice')
def test_start(mock_new_invoice, client, mockredis):
# prepare test env
# create an old transmitted order
completed_order_uuid = generate_test_order(
mock_new_invoice,
client,
invoice_status=InvoiceStatus.paid,
order_status=OrderStatus.transmitting)['uuid']
# create two sendable orders
first_sendable_order_uuid = generate_test_order(mock_new_invoice,
client,
order_id=5,
bid=1000)['uuid']
first_sendable_db_order = \
Order.query.filter_by(uuid=first_sendable_order_uuid).first()
pay_invoice(first_sendable_db_order.invoices[0])
db.session.commit()
second_sendable_order_uuid = generate_test_order(mock_new_invoice,
client,
order_id=6,
bid=2000)['uuid']
second_sendable_db_order = \
Order.query.filter_by(uuid=second_sendable_order_uuid).first()
pay_invoice(second_sendable_db_order.invoices[0])
db.session.commit()
# tx_resume() will finalize the completed order and call
# tx_start() which leads to one of the paid orders being sent,
# The tx_start() function method tries to send the other order
# but it can't because the first sent order has not ended yet.
# The direct call to tx_end() below will force ending the first
# transmission and trigger the second transmission
transmitter.tx_resume()
transmitter.tx_start()
first_sendable_db_order = \
Order.query.filter_by(uuid=first_sendable_order_uuid).first()
second_sendable_db_order = \
Order.query.filter_by(uuid=second_sendable_order_uuid).first()
assert first_sendable_db_order.status == OrderStatus.paid.value
assert second_sendable_db_order.status == OrderStatus.transmitting.value
transmitter.tx_end(second_sendable_db_order)
completed_db_order = \
Order.query.filter_by(uuid=completed_order_uuid).first()
assert completed_db_order.status == OrderStatus.sent.value
assert completed_db_order.ended_transmission_at is not None
first_sendable_db_order = \
Order.query.filter_by(uuid=first_sendable_order_uuid).first()
second_sendable_db_order = \
Order.query.filter_by(uuid=second_sendable_order_uuid).first()
assert first_sendable_db_order.status ==\
constants.OrderStatus.transmitting.value
assert second_sendable_db_order.status ==\
constants.OrderStatus.sent.value
# The second order has a higher bid_per_byte, so it should be sent first
assert first_sendable_db_order.tx_seq_num == 2
assert second_sendable_db_order.tx_seq_num == 1

View File

@ -0,0 +1,77 @@
from datetime import datetime, timedelta
import os
import pytest
from unittest.mock import patch
from common import generate_test_order
from constants import InvoiceStatus, OrderStatus
from database import db
from models import Order, Invoice
import constants
import server
from worker_manager import cleanup_database
@pytest.fixture
def app():
app = server.create_app(from_test=True)
app.app_context().push()
yield app
server.teardown_app(app)
@pytest.fixture
def client(app):
with app.test_client() as client:
yield client
@patch('orders.new_invoice')
def test_cleanup_database(mock_new_invoice, client, app):
# prepare test env
# create an invoice that must get expired
pending_invoice_lid = generate_test_order(
mock_new_invoice, client, order_id=2)['lightning_invoice']['id']
pending_db_invoice = \
Invoice.query.filter_by(lid=pending_invoice_lid).first()
pending_db_invoice.expires_at = datetime.utcnow() - timedelta(days=1)
db.session.commit()
# create an order that must get expired
pending_order_uuid = generate_test_order(mock_new_invoice,
client,
order_id=3)['uuid']
pending_db_order = Order.query.filter_by(uuid=pending_order_uuid).first()
pending_db_order.created_at = datetime.utcnow() - \
timedelta(days=constants.EXPIRE_PENDING_ORDERS_AFTER_DAYS
+ 1)
db.session.commit()
# Create an order whose transmission ended a long time ago. The
# corresponding message file should be deleted.
sent_order_uuid = generate_test_order(
mock_new_invoice,
client,
order_id=4,
invoice_status=InvoiceStatus.paid)['uuid']
sent_db_order = Order.query.filter_by(uuid=sent_order_uuid).first()
sent_db_order.ended_transmission_at = datetime.utcnow() -\
timedelta(days=constants.MESSAGE_FILE_RETENTION_TIME_DAYS
+ 1)
db.session.commit()
cleanup_database(app)
pending_db_invoice = \
Invoice.query.filter_by(lid=pending_invoice_lid).first()
assert pending_db_invoice.status == InvoiceStatus.expired.value
pending_db_order = Order.query.filter_by(uuid=pending_order_uuid).first()
assert pending_db_order.status == OrderStatus.expired.value
message_path = os.path.join(constants.MSG_STORE_PATH, pending_order_uuid)
assert not os.path.exists(message_path)
message_path = os.path.join(constants.MSG_STORE_PATH, sent_order_uuid)
assert not os.path.exists(message_path)

View File

@ -1,8 +1,6 @@
import datetime
import logging
import time
import redis
from flask import current_app
import constants
@ -11,81 +9,63 @@ from models import Order
from schemas import order_schema
class TxEngine:
def __init__(self):
self.redis = redis.from_url(current_app.config.get("REDIS_URL"))
def assign_tx_seq_num(order):
"""Assign Tx sequence number to order"""
last_tx_order = Order.query.order_by(Order.tx_seq_num.desc()).first()
def assign_tx_seq_num(self, order):
"""Assign Tx sequence number to order"""
last_tx_order = Order.query.order_by(Order.tx_seq_num.desc()).first()
if last_tx_order.tx_seq_num:
order.tx_seq_num = last_tx_order.tx_seq_num + 1
else:
order.tx_seq_num = 1
db.session.commit()
if last_tx_order.tx_seq_num:
order.tx_seq_num = last_tx_order.tx_seq_num + 1
else:
order.tx_seq_num = 1
db.session.commit()
def publish_to_sse_server(self, order):
msg = order_schema.dumps(order)
self.redis.publish(channel=constants.SUB_CHANNELS[0], message=msg)
def redis():
return current_app.config.get("REDIS_INSTANCE")
def publish_to_sse_server(order):
msg = order_schema.dumps(order)
redis().publish(channel=constants.SUB_CHANNELS[0], message=msg)
return
def tx_start():
# Do not start a new transmission if another order is
# being transmitted right now
transmitting_orders = Order.query.filter_by(
status=constants.OrderStatus.transmitting.value).all()
if transmitting_orders:
return
# Pick the order with the highest bid and start transmission
order = Order.query.filter_by(
status=constants.OrderStatus.paid.value).order_by(
Order.bid_per_byte.desc()).first()
if order:
logging.info(f'transmission start {order.uuid}')
assign_tx_seq_num(order)
order.status = constants.OrderStatus.transmitting.value
order.started_transmission_at = datetime.datetime.utcnow()
db.session.commit()
publish_to_sse_server(order)
def tx_start(self, order):
"""Start transmission"""
if order.status == constants.OrderStatus.paid.value:
logging.info(f'transmission start {order.uuid}')
self.assign_tx_seq_num(order)
order.status = constants.OrderStatus.transmitting.value
order.started_transmission_at = datetime.datetime.utcnow()
db.session.commit()
self.publish_to_sse_server(order)
def tx_end(self, order):
"""End transmission"""
if order.status == constants.OrderStatus.transmitting.value:
logging.info(f'transmission end {order.uuid}')
order.status = constants.OrderStatus.sent.value
order.ended_transmission_at = datetime.datetime.utcnow()
db.session.commit()
self.publish_to_sse_server(order)
def tx_end(order):
"""End transmission"""
if order.status == constants.OrderStatus.transmitting.value:
logging.info(f'transmission end {order.uuid}')
order.status = constants.OrderStatus.sent.value
order.ended_transmission_at = datetime.datetime.utcnow()
db.session.commit()
publish_to_sse_server(order)
# Start the next queued order as soon as the current order finishes
tx_start()
def tx_resume(self):
"""Resume interrupted transmissions"""
orders = Order.query.filter_by(
status=constants.OrderStatus.transmitting.value).all()
for order in orders:
logging.info(f'resuming interrupted transmission {order.uuid}')
self.tx_end(order)
# rounds = -1 means run forever and is the default value. Values other than
# -1 are mainly used from tests.
def start(self, rounds=-1):
logging.info("Starting transmitter")
round_count = 0
self.tx_resume()
while True:
sendable_order = None
while sendable_order is None:
# Look for an elligble order to transmit and, if one is found,
# begin transmitting it.
sendable_order = Order.query.filter_by(
status=constants.OrderStatus.paid.value).order_by(
Order.bid_per_byte.desc()).first()
if sendable_order:
self.tx_start(sendable_order)
else:
time.sleep(1.0)
if constants.TRANSMIT_RATE:
transmit_time = \
sendable_order.message_size / constants.TRANSMIT_RATE
logging.info(f'sleeping for {transmit_time} while \
{sendable_order.uuid} transmits')
time.sleep(transmit_time)
self.tx_end(sendable_order)
if rounds > -1:
round_count += 1
if round_count >= rounds:
return
def tx_resume():
"""Resume interrupted transmissions"""
orders = Order.query.filter_by(
status=constants.OrderStatus.transmitting.value).all()
for order in orders:
logging.info(f'resuming interrupted transmission {order.uuid}')
tx_end(order)

View File

@ -1,12 +1,13 @@
import logging
from flask import Flask
import redis
import constants
import invoice_helpers
import order_helpers
import transmitter
from database import db
from transmitter import TxEngine
from worker import Worker
ONE_MINUTE = 60
@ -29,14 +30,22 @@ def cleanup_database(app):
"{} orders, and removed {} files".format(*work))
def start_workers(app):
# Workers
cleanup_worker = Worker(period=CLEANUP_DUTY_CYCLE,
fcn=cleanup_database,
args=(app, ),
name="database cleaner")
cleanup_worker.thread.join()
def create_app():
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{constants.DB_FILE}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config["REDIS_URL"] = constants.REDIS_URI
app.config["REDIS_INSTANCE"] = redis.from_url(constants.REDIS_URI)
db.init_app(app)
with app.app_context():
db.create_all()
return app
@ -44,16 +53,13 @@ def main():
logging.basicConfig(level=logging.DEBUG, format=constants.LOGGING_FORMAT)
app = create_app()
# Workers
Worker(period=CLEANUP_DUTY_CYCLE,
fcn=cleanup_database,
args=(app, ),
name="database cleaner")
# Main Tx Engine
with app.app_context():
tx_engine = TxEngine()
tx_engine.start()
db.create_all()
# In order to avoid running resume/start on each gunicorn worker, these
# calls are being made from this module only instead of the main server
transmitter.tx_resume()
transmitter.tx_start()
start_workers(app)
if __name__ == '__main__':

3
server/workers.sh Normal file
View File

@ -0,0 +1,3 @@
#!/bin/bash
set -e
python3 worker_manager.py