mirror of
https://github.com/Blockstream/satellite-api.git
synced 2024-11-19 04:50:01 +01:00
Remove tx loop and rely on tx confirmations
This change removes the infinite transmission loop with sleep periods between each transmission. Now, a new transmission can start by a call to "tx_start" on the following two conditions: 1) As soon as the application starts (if there is a previously paid order waiting already). 2) As soon as the current transmission ends. The server will immediately look for the next order ready for transmission and start it if available. Meanwhile, the condition for ending the current transmission (i.e., for calling "tx_end(order)") is when all Tx confirmations are received from all regions.
This commit is contained in:
parent
0fc668d490
commit
2cfc39804e
@ -20,7 +20,7 @@ services:
|
||||
- REDIS_URI=redis://redis:6379
|
||||
volumes:
|
||||
- data:/data
|
||||
tx-daemon:
|
||||
workers:
|
||||
build: server
|
||||
depends_on:
|
||||
- api-server
|
||||
@ -32,7 +32,7 @@ services:
|
||||
- ENV=development
|
||||
volumes:
|
||||
- data:/data
|
||||
command: daemon.sh
|
||||
command: workers.sh
|
||||
sse-server:
|
||||
build:
|
||||
context: sse/
|
||||
|
@ -1,3 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
python3 daemon.py
|
@ -10,6 +10,7 @@ from constants import InvoiceStatus, OrderStatus
|
||||
from database import db
|
||||
from error import get_http_error_resp
|
||||
from models import Order, RxConfirmation, TxConfirmation
|
||||
import transmitter
|
||||
import constants
|
||||
from utils import hmac_sha256_digest
|
||||
|
||||
@ -75,6 +76,7 @@ def maybe_mark_order_as_paid(order_id):
|
||||
validate_bid(order.message_size, order.bid):
|
||||
order.status = OrderStatus.paid.value
|
||||
db.session.commit()
|
||||
transmitter.tx_start()
|
||||
|
||||
|
||||
def expire_order(order):
|
||||
@ -116,8 +118,10 @@ def synthesize_presumed_rx_confirmations(order):
|
||||
presumed)
|
||||
|
||||
|
||||
def received_criteria_met(order):
|
||||
if order.status != OrderStatus.sent.value:
|
||||
def sent_or_received_criteria_met(order):
|
||||
if order.status not in [
|
||||
OrderStatus.transmitting.value, OrderStatus.sent.value
|
||||
]:
|
||||
return False
|
||||
|
||||
order_tx_confirmations = TxConfirmation.query.filter_by(
|
||||
@ -134,6 +138,10 @@ def received_criteria_met(order):
|
||||
if len(tx_confirmed_regions) < len(constants.Regions):
|
||||
return False
|
||||
|
||||
# Reaching this point means that all tx hosts sent
|
||||
# tx_confirmations and the ongoing transmission can be ended
|
||||
transmitter.tx_end(order)
|
||||
|
||||
# Some regions should confirm Rx
|
||||
expected_rx_confirmations = set([
|
||||
info['id'] for region, info in constants.SATELLITE_REGIONS.items()
|
||||
|
@ -249,6 +249,10 @@ class TxConfirmationResource(Resource):
|
||||
if errors:
|
||||
return errors, HTTPStatus.BAD_REQUEST
|
||||
|
||||
# Find order by sequence number. Note only transmitting/transmitted
|
||||
# orders have a sequence number, whereas still pending or paid orders
|
||||
# do not. Hence, the following query implicitly ensures the order is
|
||||
# already transmitting/transmitted.
|
||||
order = Order.query.filter_by(tx_seq_num=tx_seq_num).first()
|
||||
if not order:
|
||||
return get_http_error_resp('SEQUENCE_NUMBER_NOT_FOUND', tx_seq_num)
|
||||
@ -262,7 +266,7 @@ class TxConfirmationResource(Resource):
|
||||
order_helpers.add_confirmation_if_not_present(
|
||||
TxConfirmation, order, region_number)
|
||||
|
||||
order_helpers.received_criteria_met(order)
|
||||
order_helpers.sent_or_received_criteria_met(order)
|
||||
|
||||
return {
|
||||
'message': f'transmission confirmed for regions {args["regions"]}'
|
||||
@ -289,7 +293,7 @@ class RxConfirmationResource(Resource):
|
||||
order_helpers.add_confirmation_if_not_present(RxConfirmation, order,
|
||||
region_in_request)
|
||||
|
||||
order_helpers.received_criteria_met(order)
|
||||
order_helpers.sent_or_received_criteria_met(order)
|
||||
|
||||
return {
|
||||
'message': f'reception confirmed for region {region_in_request}'
|
||||
|
@ -4,6 +4,7 @@ import logging
|
||||
|
||||
from flask import Flask
|
||||
from flask_restful import Api
|
||||
import redis
|
||||
|
||||
import constants
|
||||
from database import db
|
||||
@ -23,7 +24,7 @@ def create_app(from_test=False):
|
||||
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{constants.DB_FILE}'
|
||||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
||||
app.config['TESTING'] = from_test
|
||||
app.config["REDIS_URL"] = constants.REDIS_URI
|
||||
app.config["REDIS_INSTANCE"] = redis.from_url(constants.REDIS_URI)
|
||||
|
||||
db.init_app(app)
|
||||
with app.app_context():
|
||||
|
9
server/tests/conftest.py
Normal file
9
server/tests/conftest.py
Normal file
@ -0,0 +1,9 @@
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mockredis(mocker):
|
||||
_mr = mocker.Mock(name="mockredis")
|
||||
mocker.patch("transmitter.redis", return_value=_mr)
|
||||
mocker.patch("transmitter.redis.from_url", return_value=_mr)
|
||||
return _mr
|
@ -1,143 +0,0 @@
|
||||
from datetime import datetime, timedelta
|
||||
import os
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from common import generate_test_order
|
||||
from constants import InvoiceStatus, OrderStatus
|
||||
import daemon
|
||||
from database import db
|
||||
from models import Order, Invoice
|
||||
from invoice_helpers import pay_invoice
|
||||
import constants
|
||||
import server
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tx_engine(mocker):
|
||||
tx_engine = daemon.TxEngine()
|
||||
yield tx_engine
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mockredis(mocker):
|
||||
_mr = mocker.Mock(name="mockredis")
|
||||
mocker.patch("transmitter.redis", return_value=_mr)
|
||||
mocker.patch("transmitter.redis.from_url", return_value=_mr)
|
||||
return _mr
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(mockredis):
|
||||
app = server.create_app(from_test=True)
|
||||
app.app_context().push()
|
||||
with app.test_client() as client:
|
||||
yield client
|
||||
server.teardown_app(app)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def daemon_app():
|
||||
app = daemon.create_app()
|
||||
yield app
|
||||
|
||||
|
||||
@patch('orders.new_invoice')
|
||||
def test_tx_engine(mock_new_invoice, client, tx_engine):
|
||||
# prepare test env
|
||||
|
||||
# create an old transmitted order
|
||||
completed_order_uuid = generate_test_order(
|
||||
mock_new_invoice,
|
||||
client,
|
||||
invoice_status=InvoiceStatus.paid,
|
||||
order_status=OrderStatus.transmitting)['uuid']
|
||||
|
||||
# create two sendable orders
|
||||
first_sendable_order_uuid = generate_test_order(mock_new_invoice,
|
||||
client,
|
||||
order_id=5,
|
||||
bid=1000)['uuid']
|
||||
first_sendable_db_order = \
|
||||
Order.query.filter_by(uuid=first_sendable_order_uuid).first()
|
||||
pay_invoice(first_sendable_db_order.invoices[0])
|
||||
db.session.commit()
|
||||
|
||||
second_sendable_order_uuid = generate_test_order(mock_new_invoice,
|
||||
client,
|
||||
order_id=6,
|
||||
bid=2000)['uuid']
|
||||
second_sendable_db_order = \
|
||||
Order.query.filter_by(uuid=second_sendable_order_uuid).first()
|
||||
pay_invoice(second_sendable_db_order.invoices[0])
|
||||
db.session.commit()
|
||||
|
||||
tx_engine.start(2)
|
||||
|
||||
completed_db_order = \
|
||||
Order.query.filter_by(uuid=completed_order_uuid).first()
|
||||
assert completed_db_order.status == OrderStatus.sent.value
|
||||
assert completed_db_order.ended_transmission_at is not None
|
||||
|
||||
first_sendable_db_order = \
|
||||
Order.query.filter_by(uuid=first_sendable_order_uuid).first()
|
||||
second_sendable_db_order = \
|
||||
Order.query.filter_by(uuid=second_sendable_order_uuid).first()
|
||||
assert first_sendable_db_order.status == constants.OrderStatus.sent.value
|
||||
assert second_sendable_db_order.status == constants.OrderStatus.sent.value
|
||||
# The second order has a higher bid_per_byte, so it should be sent first
|
||||
assert first_sendable_db_order.tx_seq_num == 2
|
||||
assert second_sendable_db_order.tx_seq_num == 1
|
||||
assert second_sendable_db_order.ended_transmission_at \
|
||||
< first_sendable_db_order.ended_transmission_at
|
||||
|
||||
|
||||
@patch('orders.new_invoice')
|
||||
def test_cleanup_database(mock_new_invoice, client, tx_engine, daemon_app):
|
||||
# prepare test env
|
||||
|
||||
# create an invoice that must get expired
|
||||
pending_invoice_lid = generate_test_order(
|
||||
mock_new_invoice, client, order_id=2)['lightning_invoice']['id']
|
||||
pending_db_invoice = \
|
||||
Invoice.query.filter_by(lid=pending_invoice_lid).first()
|
||||
pending_db_invoice.expires_at = datetime.utcnow() - timedelta(days=1)
|
||||
db.session.commit()
|
||||
|
||||
# create an order that must get expired
|
||||
pending_order_uuid = generate_test_order(mock_new_invoice,
|
||||
client,
|
||||
order_id=3)['uuid']
|
||||
pending_db_order = Order.query.filter_by(uuid=pending_order_uuid).first()
|
||||
pending_db_order.created_at = datetime.utcnow() - \
|
||||
timedelta(days=constants.EXPIRE_PENDING_ORDERS_AFTER_DAYS
|
||||
+ 1)
|
||||
db.session.commit()
|
||||
|
||||
# Create an order whose transmission ended a long time ago. The
|
||||
# corresponding message file should be deleted.
|
||||
sent_order_uuid = generate_test_order(
|
||||
mock_new_invoice,
|
||||
client,
|
||||
order_id=4,
|
||||
invoice_status=InvoiceStatus.paid)['uuid']
|
||||
sent_db_order = Order.query.filter_by(uuid=sent_order_uuid).first()
|
||||
sent_db_order.ended_transmission_at = datetime.utcnow() -\
|
||||
timedelta(days=constants.MESSAGE_FILE_RETENTION_TIME_DAYS
|
||||
+ 1)
|
||||
db.session.commit()
|
||||
|
||||
daemon.cleanup_database(daemon_app)
|
||||
|
||||
pending_db_invoice = \
|
||||
Invoice.query.filter_by(lid=pending_invoice_lid).first()
|
||||
assert pending_db_invoice.status == InvoiceStatus.expired.value
|
||||
|
||||
pending_db_order = Order.query.filter_by(uuid=pending_order_uuid).first()
|
||||
assert pending_db_order.status == OrderStatus.expired.value
|
||||
|
||||
message_path = os.path.join(constants.MSG_STORE_PATH, pending_order_uuid)
|
||||
assert not os.path.exists(message_path)
|
||||
|
||||
message_path = os.path.join(constants.MSG_STORE_PATH, sent_order_uuid)
|
||||
assert not os.path.exists(message_path)
|
@ -3,7 +3,7 @@ from http import HTTPStatus
|
||||
from unittest.mock import patch
|
||||
|
||||
from database import db
|
||||
from constants import InvoiceStatus
|
||||
from constants import InvoiceStatus, OrderStatus
|
||||
from error import assert_error
|
||||
from models import Invoice, Order
|
||||
from utils import hmac_sha256_digest
|
||||
@ -16,7 +16,7 @@ from common import new_invoice,\
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
def client(mockredis):
|
||||
app = server.create_app(from_test=True)
|
||||
app.app_context().push()
|
||||
with app.test_client() as client:
|
||||
@ -109,7 +109,7 @@ def test_paid_invoice_callback_successfully(mock_new_invoice, client):
|
||||
db_invoice = Invoice.query.filter_by(lid=invoice.lid).first()
|
||||
db_order = Order.query.filter_by(uuid=uuid_order).first()
|
||||
assert db_invoice.status == InvoiceStatus.paid.value
|
||||
assert db_order.status == InvoiceStatus.paid.value
|
||||
assert db_order.status == OrderStatus.transmitting.value
|
||||
assert db_invoice.paid_at is not None
|
||||
|
||||
|
||||
@ -191,7 +191,7 @@ def test_pay_multiple_invoices(mock_new_invoice, client):
|
||||
db_order = Order.query.filter_by(uuid=uuid).first()
|
||||
assert db_invoice.status == InvoiceStatus.paid.value
|
||||
assert db_invoice.paid_at is not None
|
||||
assert db_order.status == InvoiceStatus.paid.value
|
||||
assert db_order.status == OrderStatus.transmitting.value
|
||||
assert db_order.bid == total_bid
|
||||
assert db_order.unpaid_bid == 0
|
||||
|
||||
|
@ -22,7 +22,7 @@ from common import check_invoice, check_upload, new_invoice,\
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
def client(mockredis):
|
||||
app = server.create_app(from_test=True)
|
||||
app.app_context().push()
|
||||
with app.test_client() as client:
|
||||
@ -449,14 +449,14 @@ def test_get_sent_message_by_seq_number_for_paid_order(mock_new_invoice,
|
||||
uuid = generate_test_order(mock_new_invoice, client)['uuid']
|
||||
db_order = Order.query.filter_by(uuid=uuid).first()
|
||||
pay_invoice(db_order.invoices[0])
|
||||
db_order.tx_seq_num = 1
|
||||
db.session.commit()
|
||||
|
||||
# Try to get sent_message for a paid order by sequence number. sent_message
|
||||
# can only be retrieved for transmitting, sent, or received messages.
|
||||
# Because the invoice was paid, the order should go immediately into
|
||||
# transmitting state and be assigned with sequence number 1.
|
||||
rv = client.get('/message/1')
|
||||
assert rv.status_code == HTTPStatus.NOT_FOUND
|
||||
assert_error(rv.get_json(), 'SEQUENCE_NUMBER_NOT_FOUND')
|
||||
assert rv.status_code == HTTPStatus.OK
|
||||
received_message = rv.data
|
||||
check_received_message(uuid, received_message)
|
||||
|
||||
|
||||
@patch('orders.new_invoice')
|
||||
@ -585,8 +585,8 @@ def test_confirm_rx(mock_new_invoice, client):
|
||||
Regions.g18.value, Regions.e113.value, Regions.t18v_c.value,
|
||||
Regions.t18v_c.value
|
||||
]])
|
||||
def test_received_criteria_met_inadequate_regions(mock_new_invoice, client,
|
||||
tx_regions, rx_regions):
|
||||
def test_sent_or_received_criteria_met_inadequate_regions(
|
||||
mock_new_invoice, client, tx_regions, rx_regions):
|
||||
uuid = generate_test_order(mock_new_invoice,
|
||||
client,
|
||||
order_status=OrderStatus.sent,
|
||||
@ -608,7 +608,8 @@ def test_received_criteria_met_inadequate_regions(mock_new_invoice, client,
|
||||
|
||||
|
||||
@patch('orders.new_invoice')
|
||||
def test_received_criteria_met_for_unsent_order(mock_new_invoice, client):
|
||||
def test_sent_or_received_criteria_met_for_unsent_order(
|
||||
mock_new_invoice, client):
|
||||
uuid = generate_test_order(mock_new_invoice, client, tx_seq_num=1)['uuid']
|
||||
# Confirm tx for all 6 regions
|
||||
post_rv = client.post('/order/tx/1',
|
||||
@ -632,17 +633,17 @@ def test_received_criteria_met_for_unsent_order(mock_new_invoice, client):
|
||||
|
||||
|
||||
@patch('orders.new_invoice')
|
||||
def test_received_criteria_met_successfully(mock_new_invoice, client):
|
||||
def test_sent_or_received_criteria_met_successfully(mock_new_invoice, client):
|
||||
uuid = generate_test_order(mock_new_invoice,
|
||||
client,
|
||||
order_status=OrderStatus.sent,
|
||||
order_status=OrderStatus.transmitting,
|
||||
tx_seq_num=1)['uuid']
|
||||
|
||||
# Confirm tx for all 6 regions
|
||||
post_rv = client.post('/order/tx/1',
|
||||
data={'regions': [[e.value for e in Regions]]})
|
||||
assert post_rv.status_code == HTTPStatus.OK
|
||||
# Order's status should still be sent
|
||||
# Order's status should change to sent
|
||||
db_order = Order.query.filter_by(uuid=uuid).first()
|
||||
assert db_order.status == OrderStatus.sent.value
|
||||
|
||||
|
@ -17,7 +17,7 @@ from common import new_invoice, place_order, generate_test_order
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
def client(mockredis):
|
||||
app = server.create_app(from_test=True)
|
||||
app.app_context().push()
|
||||
with app.test_client() as client:
|
||||
@ -347,13 +347,23 @@ def test_get_sent_orders_paging(mock_new_invoice, client):
|
||||
@patch('constants.FORCE_PAYMENT', True)
|
||||
def test_create_order_with_force_payment_enabled(mock_new_invoice, client):
|
||||
uuid_order1 = generate_test_order(mock_new_invoice, client)['uuid']
|
||||
uuid_order2 = generate_test_order(mock_new_invoice, client)['uuid']
|
||||
|
||||
db_order = Order.query.filter_by(uuid=uuid_order1).first()
|
||||
db_invoice = db_order.invoices[0]
|
||||
db_order1 = Order.query.filter_by(uuid=uuid_order1).first()
|
||||
db_invoice1 = db_order1.invoices[0]
|
||||
|
||||
# because FORCE_PAYMENT is set and this order had only 1 invoice
|
||||
# the expectation is that both the invoice and the order have the
|
||||
# paid status
|
||||
assert db_order.status == OrderStatus.paid.value
|
||||
assert db_invoice.status == InvoiceStatus.paid.value
|
||||
assert db_invoice.paid_at is not None
|
||||
db_order2 = Order.query.filter_by(uuid=uuid_order2).first()
|
||||
db_invoice2 = db_order2.invoices[0]
|
||||
|
||||
# Since FORCE_PAYMENT is set and both orders have only one invoice, both
|
||||
# orders change their statuses to paid. Furthermore, the payment triggers a
|
||||
# Tx start verification and, since the transmit queue is empty, order1
|
||||
# immediately changes to transmitting state. In contrast, order2 stays in
|
||||
# paid state as it needs to wait until the transmission of order1 finishes.
|
||||
assert db_order1.status == OrderStatus.transmitting.value
|
||||
assert db_invoice1.status == InvoiceStatus.paid.value
|
||||
assert db_invoice1.paid_at is not None
|
||||
|
||||
assert db_order2.status == OrderStatus.paid.value
|
||||
assert db_invoice2.status == InvoiceStatus.paid.value
|
||||
assert db_invoice2.paid_at is not None
|
||||
|
@ -4,23 +4,15 @@ from unittest.mock import patch
|
||||
|
||||
from common import generate_test_order
|
||||
from constants import InvoiceStatus, OrderStatus
|
||||
from transmitter import TxEngine
|
||||
import transmitter
|
||||
from database import db
|
||||
from schemas import order_schema
|
||||
|
||||
from models import Order
|
||||
from invoice_helpers import pay_invoice
|
||||
import constants
|
||||
import server
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mockredis(mocker):
|
||||
_mr = mocker.Mock(name="mockredis")
|
||||
mocker.patch("transmitter.redis", return_value=_mr)
|
||||
mocker.patch("transmitter.redis.from_url", return_value=_mr)
|
||||
return _mr
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app(mockredis):
|
||||
app = server.create_app(from_test=True)
|
||||
@ -35,12 +27,6 @@ def client(app):
|
||||
yield client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tx_engine():
|
||||
tx_engine = TxEngine()
|
||||
yield tx_engine
|
||||
|
||||
|
||||
def assert_redis_call(mockredis, order):
|
||||
msg = order_schema.dump(order)
|
||||
mockredis.publish.assert_called_with(channel='transmissions',
|
||||
@ -48,7 +34,7 @@ def assert_redis_call(mockredis, order):
|
||||
|
||||
|
||||
@patch('orders.new_invoice')
|
||||
def test_tx_resume(mock_new_invoice, client, tx_engine, mockredis):
|
||||
def test_tx_resume(mock_new_invoice, client, mockredis):
|
||||
# Create two orders: one in transmitting state, the other in pending state.
|
||||
first_order_uuid = generate_test_order(
|
||||
mock_new_invoice, client,
|
||||
@ -59,7 +45,7 @@ def test_tx_resume(mock_new_invoice, client, tx_engine, mockredis):
|
||||
client,
|
||||
order_id=2)['uuid']
|
||||
|
||||
tx_engine.tx_resume()
|
||||
transmitter.tx_resume()
|
||||
assert_redis_call(mockredis, first_db_order)
|
||||
|
||||
# The expectation is that the transmitting order changes to sent. The other
|
||||
@ -74,27 +60,21 @@ def test_tx_resume(mock_new_invoice, client, tx_engine, mockredis):
|
||||
|
||||
|
||||
@patch('orders.new_invoice')
|
||||
def test_tx_start(mock_new_invoice, client, tx_engine, mockredis):
|
||||
def test_tx_start(mock_new_invoice, client, mockredis):
|
||||
# Create two orders and pay only for the first.
|
||||
first_order_uuid = generate_test_order(mock_new_invoice, client)['uuid']
|
||||
second_order_uuid = generate_test_order(
|
||||
mock_new_invoice,
|
||||
client,
|
||||
order_id=2,
|
||||
invoice_status=InvoiceStatus.paid)['uuid']
|
||||
second_order_uuid = generate_test_order(mock_new_invoice,
|
||||
client,
|
||||
order_id=2)['uuid']
|
||||
|
||||
first_db_order = Order.query.filter_by(uuid=first_order_uuid).first()
|
||||
pay_invoice(first_db_order.invoices[0])
|
||||
db.session.commit()
|
||||
second_db_order = Order.query.filter_by(uuid=first_order_uuid).first()
|
||||
|
||||
tx_engine.tx_start(first_db_order)
|
||||
tx_engine.tx_start(second_db_order)
|
||||
pay_invoice(first_db_order.invoices[0])
|
||||
assert_redis_call(mockredis, first_db_order)
|
||||
assert_redis_call(mockredis, second_db_order)
|
||||
db.session.commit()
|
||||
|
||||
# The expectation is that the first order gets transmitted and the second
|
||||
# order stays untouched.
|
||||
# stays untouched. The invoice callback handler should call tx_start.
|
||||
first_db_order = Order.query.filter_by(uuid=first_order_uuid).first()
|
||||
assert first_db_order.status == OrderStatus.transmitting.value
|
||||
assert first_db_order.tx_seq_num is not None
|
||||
@ -103,42 +83,108 @@ def test_tx_start(mock_new_invoice, client, tx_engine, mockredis):
|
||||
assert second_db_order.status == OrderStatus.pending.value
|
||||
assert second_db_order.tx_seq_num is None
|
||||
|
||||
# Calling tx_start explicitly won't change anything since the second order
|
||||
# is still unpaid
|
||||
transmitter.tx_start()
|
||||
second_db_order = Order.query.filter_by(uuid=second_order_uuid).first()
|
||||
assert second_db_order.status == OrderStatus.pending.value
|
||||
assert second_db_order.tx_seq_num is None
|
||||
|
||||
|
||||
@patch('orders.new_invoice')
|
||||
def test_assign_tx_seq_num(mock_new_invoice, client, tx_engine):
|
||||
def test_assign_tx_seq_num(mock_new_invoice, client):
|
||||
# make some orders
|
||||
first_order_uuid = generate_test_order(
|
||||
mock_new_invoice, client, invoice_status=InvoiceStatus.paid)['uuid']
|
||||
first_order_uuid = generate_test_order(mock_new_invoice, client)['uuid']
|
||||
first_db_order = Order.query.filter_by(uuid=first_order_uuid).first()
|
||||
assert first_db_order.tx_seq_num is None
|
||||
|
||||
second_order_uuid = generate_test_order(
|
||||
mock_new_invoice,
|
||||
client,
|
||||
order_id=2,
|
||||
invoice_status=InvoiceStatus.paid)['uuid']
|
||||
second_order_uuid = generate_test_order(mock_new_invoice,
|
||||
client,
|
||||
order_id=2)['uuid']
|
||||
second_db_order = Order.query.filter_by(uuid=second_order_uuid).first()
|
||||
assert second_db_order.tx_seq_num is None
|
||||
|
||||
third_order_uuid = generate_test_order(
|
||||
mock_new_invoice,
|
||||
client,
|
||||
order_id=3,
|
||||
invoice_status=InvoiceStatus.paid)['uuid']
|
||||
third_order_uuid = generate_test_order(mock_new_invoice,
|
||||
client,
|
||||
order_id=3)['uuid']
|
||||
third_db_order = Order.query.filter_by(uuid=third_order_uuid).first()
|
||||
assert third_db_order.tx_seq_num is None
|
||||
|
||||
tx_engine.assign_tx_seq_num(first_db_order)
|
||||
transmitter.assign_tx_seq_num(first_db_order)
|
||||
db.session.commit()
|
||||
first_db_order = Order.query.filter_by(uuid=first_order_uuid).first()
|
||||
assert first_db_order.tx_seq_num == 1
|
||||
|
||||
tx_engine.assign_tx_seq_num(second_db_order)
|
||||
transmitter.assign_tx_seq_num(second_db_order)
|
||||
db.session.commit()
|
||||
second_db_order = Order.query.filter_by(uuid=second_order_uuid).first()
|
||||
assert second_db_order.tx_seq_num == 2
|
||||
|
||||
tx_engine.assign_tx_seq_num(third_db_order)
|
||||
transmitter.assign_tx_seq_num(third_db_order)
|
||||
db.session.commit()
|
||||
third_db_order = Order.query.filter_by(uuid=third_order_uuid).first()
|
||||
assert third_db_order.tx_seq_num == 3
|
||||
|
||||
|
||||
@patch('orders.new_invoice')
|
||||
def test_start(mock_new_invoice, client, mockredis):
|
||||
# prepare test env
|
||||
|
||||
# create an old transmitted order
|
||||
completed_order_uuid = generate_test_order(
|
||||
mock_new_invoice,
|
||||
client,
|
||||
invoice_status=InvoiceStatus.paid,
|
||||
order_status=OrderStatus.transmitting)['uuid']
|
||||
|
||||
# create two sendable orders
|
||||
first_sendable_order_uuid = generate_test_order(mock_new_invoice,
|
||||
client,
|
||||
order_id=5,
|
||||
bid=1000)['uuid']
|
||||
first_sendable_db_order = \
|
||||
Order.query.filter_by(uuid=first_sendable_order_uuid).first()
|
||||
pay_invoice(first_sendable_db_order.invoices[0])
|
||||
db.session.commit()
|
||||
|
||||
second_sendable_order_uuid = generate_test_order(mock_new_invoice,
|
||||
client,
|
||||
order_id=6,
|
||||
bid=2000)['uuid']
|
||||
second_sendable_db_order = \
|
||||
Order.query.filter_by(uuid=second_sendable_order_uuid).first()
|
||||
pay_invoice(second_sendable_db_order.invoices[0])
|
||||
db.session.commit()
|
||||
|
||||
# tx_resume() will finalize the completed order and call
|
||||
# tx_start() which leads to one of the paid orders being sent,
|
||||
# The tx_start() function method tries to send the other order
|
||||
# but it can't because the first sent order has not ended yet.
|
||||
# The direct call to tx_end() below will force ending the first
|
||||
# transmission and trigger the second transmission
|
||||
transmitter.tx_resume()
|
||||
transmitter.tx_start()
|
||||
first_sendable_db_order = \
|
||||
Order.query.filter_by(uuid=first_sendable_order_uuid).first()
|
||||
second_sendable_db_order = \
|
||||
Order.query.filter_by(uuid=second_sendable_order_uuid).first()
|
||||
assert first_sendable_db_order.status == OrderStatus.paid.value
|
||||
assert second_sendable_db_order.status == OrderStatus.transmitting.value
|
||||
transmitter.tx_end(second_sendable_db_order)
|
||||
|
||||
completed_db_order = \
|
||||
Order.query.filter_by(uuid=completed_order_uuid).first()
|
||||
assert completed_db_order.status == OrderStatus.sent.value
|
||||
assert completed_db_order.ended_transmission_at is not None
|
||||
|
||||
first_sendable_db_order = \
|
||||
Order.query.filter_by(uuid=first_sendable_order_uuid).first()
|
||||
second_sendable_db_order = \
|
||||
Order.query.filter_by(uuid=second_sendable_order_uuid).first()
|
||||
assert first_sendable_db_order.status ==\
|
||||
constants.OrderStatus.transmitting.value
|
||||
assert second_sendable_db_order.status ==\
|
||||
constants.OrderStatus.sent.value
|
||||
# The second order has a higher bid_per_byte, so it should be sent first
|
||||
assert first_sendable_db_order.tx_seq_num == 2
|
||||
assert second_sendable_db_order.tx_seq_num == 1
|
||||
|
77
server/tests/test_worker_manager.py
Normal file
77
server/tests/test_worker_manager.py
Normal file
@ -0,0 +1,77 @@
|
||||
from datetime import datetime, timedelta
|
||||
import os
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from common import generate_test_order
|
||||
from constants import InvoiceStatus, OrderStatus
|
||||
from database import db
|
||||
from models import Order, Invoice
|
||||
import constants
|
||||
import server
|
||||
from worker_manager import cleanup_database
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app():
|
||||
app = server.create_app(from_test=True)
|
||||
app.app_context().push()
|
||||
yield app
|
||||
server.teardown_app(app)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(app):
|
||||
with app.test_client() as client:
|
||||
yield client
|
||||
|
||||
|
||||
@patch('orders.new_invoice')
|
||||
def test_cleanup_database(mock_new_invoice, client, app):
|
||||
# prepare test env
|
||||
|
||||
# create an invoice that must get expired
|
||||
pending_invoice_lid = generate_test_order(
|
||||
mock_new_invoice, client, order_id=2)['lightning_invoice']['id']
|
||||
pending_db_invoice = \
|
||||
Invoice.query.filter_by(lid=pending_invoice_lid).first()
|
||||
pending_db_invoice.expires_at = datetime.utcnow() - timedelta(days=1)
|
||||
db.session.commit()
|
||||
|
||||
# create an order that must get expired
|
||||
pending_order_uuid = generate_test_order(mock_new_invoice,
|
||||
client,
|
||||
order_id=3)['uuid']
|
||||
pending_db_order = Order.query.filter_by(uuid=pending_order_uuid).first()
|
||||
pending_db_order.created_at = datetime.utcnow() - \
|
||||
timedelta(days=constants.EXPIRE_PENDING_ORDERS_AFTER_DAYS
|
||||
+ 1)
|
||||
db.session.commit()
|
||||
|
||||
# Create an order whose transmission ended a long time ago. The
|
||||
# corresponding message file should be deleted.
|
||||
sent_order_uuid = generate_test_order(
|
||||
mock_new_invoice,
|
||||
client,
|
||||
order_id=4,
|
||||
invoice_status=InvoiceStatus.paid)['uuid']
|
||||
sent_db_order = Order.query.filter_by(uuid=sent_order_uuid).first()
|
||||
sent_db_order.ended_transmission_at = datetime.utcnow() -\
|
||||
timedelta(days=constants.MESSAGE_FILE_RETENTION_TIME_DAYS
|
||||
+ 1)
|
||||
db.session.commit()
|
||||
|
||||
cleanup_database(app)
|
||||
|
||||
pending_db_invoice = \
|
||||
Invoice.query.filter_by(lid=pending_invoice_lid).first()
|
||||
assert pending_db_invoice.status == InvoiceStatus.expired.value
|
||||
|
||||
pending_db_order = Order.query.filter_by(uuid=pending_order_uuid).first()
|
||||
assert pending_db_order.status == OrderStatus.expired.value
|
||||
|
||||
message_path = os.path.join(constants.MSG_STORE_PATH, pending_order_uuid)
|
||||
assert not os.path.exists(message_path)
|
||||
|
||||
message_path = os.path.join(constants.MSG_STORE_PATH, sent_order_uuid)
|
||||
assert not os.path.exists(message_path)
|
@ -1,8 +1,6 @@
|
||||
import datetime
|
||||
import logging
|
||||
import time
|
||||
|
||||
import redis
|
||||
from flask import current_app
|
||||
|
||||
import constants
|
||||
@ -11,81 +9,63 @@ from models import Order
|
||||
from schemas import order_schema
|
||||
|
||||
|
||||
class TxEngine:
|
||||
def __init__(self):
|
||||
self.redis = redis.from_url(current_app.config.get("REDIS_URL"))
|
||||
def assign_tx_seq_num(order):
|
||||
"""Assign Tx sequence number to order"""
|
||||
last_tx_order = Order.query.order_by(Order.tx_seq_num.desc()).first()
|
||||
|
||||
def assign_tx_seq_num(self, order):
|
||||
"""Assign Tx sequence number to order"""
|
||||
last_tx_order = Order.query.order_by(Order.tx_seq_num.desc()).first()
|
||||
if last_tx_order.tx_seq_num:
|
||||
order.tx_seq_num = last_tx_order.tx_seq_num + 1
|
||||
else:
|
||||
order.tx_seq_num = 1
|
||||
db.session.commit()
|
||||
|
||||
if last_tx_order.tx_seq_num:
|
||||
order.tx_seq_num = last_tx_order.tx_seq_num + 1
|
||||
else:
|
||||
order.tx_seq_num = 1
|
||||
db.session.commit()
|
||||
|
||||
def publish_to_sse_server(self, order):
|
||||
msg = order_schema.dumps(order)
|
||||
self.redis.publish(channel=constants.SUB_CHANNELS[0], message=msg)
|
||||
def redis():
|
||||
return current_app.config.get("REDIS_INSTANCE")
|
||||
|
||||
|
||||
def publish_to_sse_server(order):
|
||||
msg = order_schema.dumps(order)
|
||||
redis().publish(channel=constants.SUB_CHANNELS[0], message=msg)
|
||||
return
|
||||
|
||||
|
||||
def tx_start():
|
||||
# Do not start a new transmission if another order is
|
||||
# being transmitted right now
|
||||
transmitting_orders = Order.query.filter_by(
|
||||
status=constants.OrderStatus.transmitting.value).all()
|
||||
if transmitting_orders:
|
||||
return
|
||||
# Pick the order with the highest bid and start transmission
|
||||
order = Order.query.filter_by(
|
||||
status=constants.OrderStatus.paid.value).order_by(
|
||||
Order.bid_per_byte.desc()).first()
|
||||
if order:
|
||||
logging.info(f'transmission start {order.uuid}')
|
||||
assign_tx_seq_num(order)
|
||||
order.status = constants.OrderStatus.transmitting.value
|
||||
order.started_transmission_at = datetime.datetime.utcnow()
|
||||
db.session.commit()
|
||||
publish_to_sse_server(order)
|
||||
|
||||
def tx_start(self, order):
|
||||
"""Start transmission"""
|
||||
if order.status == constants.OrderStatus.paid.value:
|
||||
logging.info(f'transmission start {order.uuid}')
|
||||
self.assign_tx_seq_num(order)
|
||||
order.status = constants.OrderStatus.transmitting.value
|
||||
order.started_transmission_at = datetime.datetime.utcnow()
|
||||
db.session.commit()
|
||||
self.publish_to_sse_server(order)
|
||||
|
||||
def tx_end(self, order):
|
||||
"""End transmission"""
|
||||
if order.status == constants.OrderStatus.transmitting.value:
|
||||
logging.info(f'transmission end {order.uuid}')
|
||||
order.status = constants.OrderStatus.sent.value
|
||||
order.ended_transmission_at = datetime.datetime.utcnow()
|
||||
db.session.commit()
|
||||
self.publish_to_sse_server(order)
|
||||
def tx_end(order):
|
||||
"""End transmission"""
|
||||
if order.status == constants.OrderStatus.transmitting.value:
|
||||
logging.info(f'transmission end {order.uuid}')
|
||||
order.status = constants.OrderStatus.sent.value
|
||||
order.ended_transmission_at = datetime.datetime.utcnow()
|
||||
db.session.commit()
|
||||
publish_to_sse_server(order)
|
||||
# Start the next queued order as soon as the current order finishes
|
||||
tx_start()
|
||||
|
||||
def tx_resume(self):
|
||||
"""Resume interrupted transmissions"""
|
||||
orders = Order.query.filter_by(
|
||||
status=constants.OrderStatus.transmitting.value).all()
|
||||
for order in orders:
|
||||
logging.info(f'resuming interrupted transmission {order.uuid}')
|
||||
self.tx_end(order)
|
||||
|
||||
# rounds = -1 means run forever and is the default value. Values other than
|
||||
# -1 are mainly used from tests.
|
||||
def start(self, rounds=-1):
|
||||
logging.info("Starting transmitter")
|
||||
round_count = 0
|
||||
self.tx_resume()
|
||||
|
||||
while True:
|
||||
sendable_order = None
|
||||
while sendable_order is None:
|
||||
# Look for an elligble order to transmit and, if one is found,
|
||||
# begin transmitting it.
|
||||
sendable_order = Order.query.filter_by(
|
||||
status=constants.OrderStatus.paid.value).order_by(
|
||||
Order.bid_per_byte.desc()).first()
|
||||
if sendable_order:
|
||||
self.tx_start(sendable_order)
|
||||
else:
|
||||
time.sleep(1.0)
|
||||
|
||||
if constants.TRANSMIT_RATE:
|
||||
transmit_time = \
|
||||
sendable_order.message_size / constants.TRANSMIT_RATE
|
||||
logging.info(f'sleeping for {transmit_time} while \
|
||||
{sendable_order.uuid} transmits')
|
||||
time.sleep(transmit_time)
|
||||
|
||||
self.tx_end(sendable_order)
|
||||
if rounds > -1:
|
||||
round_count += 1
|
||||
if round_count >= rounds:
|
||||
return
|
||||
def tx_resume():
|
||||
"""Resume interrupted transmissions"""
|
||||
orders = Order.query.filter_by(
|
||||
status=constants.OrderStatus.transmitting.value).all()
|
||||
for order in orders:
|
||||
logging.info(f'resuming interrupted transmission {order.uuid}')
|
||||
tx_end(order)
|
||||
|
@ -1,12 +1,13 @@
|
||||
import logging
|
||||
|
||||
from flask import Flask
|
||||
import redis
|
||||
|
||||
import constants
|
||||
import invoice_helpers
|
||||
import order_helpers
|
||||
import transmitter
|
||||
from database import db
|
||||
from transmitter import TxEngine
|
||||
from worker import Worker
|
||||
|
||||
ONE_MINUTE = 60
|
||||
@ -29,14 +30,22 @@ def cleanup_database(app):
|
||||
"{} orders, and removed {} files".format(*work))
|
||||
|
||||
|
||||
def start_workers(app):
|
||||
# Workers
|
||||
cleanup_worker = Worker(period=CLEANUP_DUTY_CYCLE,
|
||||
fcn=cleanup_database,
|
||||
args=(app, ),
|
||||
name="database cleaner")
|
||||
|
||||
cleanup_worker.thread.join()
|
||||
|
||||
|
||||
def create_app():
|
||||
app = Flask(__name__)
|
||||
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{constants.DB_FILE}'
|
||||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
||||
app.config["REDIS_URL"] = constants.REDIS_URI
|
||||
app.config["REDIS_INSTANCE"] = redis.from_url(constants.REDIS_URI)
|
||||
db.init_app(app)
|
||||
with app.app_context():
|
||||
db.create_all()
|
||||
return app
|
||||
|
||||
|
||||
@ -44,16 +53,13 @@ def main():
|
||||
logging.basicConfig(level=logging.DEBUG, format=constants.LOGGING_FORMAT)
|
||||
app = create_app()
|
||||
|
||||
# Workers
|
||||
Worker(period=CLEANUP_DUTY_CYCLE,
|
||||
fcn=cleanup_database,
|
||||
args=(app, ),
|
||||
name="database cleaner")
|
||||
|
||||
# Main Tx Engine
|
||||
with app.app_context():
|
||||
tx_engine = TxEngine()
|
||||
tx_engine.start()
|
||||
db.create_all()
|
||||
# In order to avoid running resume/start on each gunicorn worker, these
|
||||
# calls are being made from this module only instead of the main server
|
||||
transmitter.tx_resume()
|
||||
transmitter.tx_start()
|
||||
start_workers(app)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
3
server/workers.sh
Normal file
3
server/workers.sh
Normal file
@ -0,0 +1,3 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
python3 worker_manager.py
|
Loading…
Reference in New Issue
Block a user