blockstream-satellite-api/server/tests/test_worker.py
Blockstream Satellite 9d421771e7 Port the satellite API to python
- Preserve the SQLite database and use SQLAlchemy to wrap db
  interactions.
- Use Alembic for database migrations.
- Organize all the Python modules in the new server/ directory.
- Use pytest for unit tests and organize test modules at server/tests/.
2021-07-20 12:28:08 -03:00

import queue
import time

import worker


def test_worker():
    product_queue = queue.Queue()

    def multiply(a, b, q):
        # Publish the product on the queue so the test can read it back
        q.put(a * b)

    period = 1.0
    w = worker.Worker(period, fcn=multiply, args=(2, 3, product_queue))

    # Sleep 1/3 of the period and stop the worker before the subsequent
    # period
    time.sleep(period / 3)
    w.stop()

    # Exactly one product should have been queued before the stop
    product = product_queue.get()
    product_queue.task_done()
    assert product == 2 * 3
    assert product_queue.empty()
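
The `worker` module itself is not shown here. For readers who want to run the test in isolation, the following is a minimal sketch of a class that would satisfy it, assuming the worker calls `fcn(*args)` once as soon as it starts and then once per `period` until `stop()` is called. The class body, the `_loop` helper, and the `run_once` demo are hypothetical stand-ins, not the project's actual implementation.

```python
import queue
import threading


class Worker:
    """Hypothetical stand-in: runs fcn(*args) periodically in a thread."""

    def __init__(self, period, fcn, args=()):
        self.period = period
        self.fcn = fcn
        self.args = args
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        while True:
            # Assumption: the first call happens immediately on start
            self.fcn(*self.args)
            # Event.wait returns True as soon as stop() sets the event,
            # so the worker does not have to sleep out the full period
            if self._stop_event.wait(self.period):
                break

    def stop(self):
        self._stop_event.set()
        self._thread.join()


def run_once():
    """Start a worker, collect its first product, then stop it."""
    q = queue.Queue()
    w = Worker(1.0, fcn=lambda a, b, out: out.put(a * b), args=(2, 3, q))
    product = q.get(timeout=5)
    w.stop()
    return product, q.empty()
```

Waiting on a `threading.Event` rather than calling `time.sleep` lets `stop()` interrupt the wait, which is what allows the test to stop the worker a third of the way into the period without a second product being queued.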