blockstream-satellite-api/server/worker.py
Blockstream Satellite 76a51eec52 Add missing blank lines around class methods
This verification was added on yapf 0.32 and it follows the pep8
specification.
2021-12-28 12:22:58 -03:00

41 lines
1 KiB
Python

import logging
import threading
import time
class Worker:
"""Worker Class
A class to execute periodic background tasks.
Args:
period : Worker period in seconds.
fcn : Function to execute every period.
args : Tuple with arguments for the given function.
name : Optional worker name.
"""
def __init__(self, period, fcn, args, name=""):
assert (isinstance(args, tuple))
self.period = period
self.fcn = fcn
self.args = args
self.name = name
self.enable = True
logging.info(f"Starting worker: {self.name}")
self.thread = threading.Thread(target=self.loop, daemon=True)
self.thread.start()
def loop(self):
next_call = time.time()
while (self.enable):
self.fcn(*self.args)
next_call += self.period
sleep = next_call - time.time()
if (sleep > 0):
time.sleep(sleep)
def stop(self):
logging.info(f"Stopping worker: {self.name}")
self.enable = False