mirror of
https://github.com/bitcoin/bitcoin.git
synced 2024-11-19 09:53:47 +01:00
scripted-diff: Rename SingleThreadedSchedulerClient to SerialTaskRunner
-BEGIN VERIFY SCRIPT- s() { git grep -l "$1" src | (grep -v "$3" || cat;) | xargs sed -i "s/$1/$2/g"; } s 'SingleThreadedSchedulerClient' 'SerialTaskRunner' '' s 'SinglethreadedSchedulerClient' 'SerialTaskRunner' '' s 'm_schedulerClient' 'm_task_runner' '' s 'AddToProcessQueue' 'insert' '' s 'EmptyQueue' 'flush' '' s 'CallbacksPending' 'size' 'validation' sed -i '109s/CallbacksPending/size/' src/validationinterface.cpp -END VERIFY SCRIPT- Co-authored-by: Russell Yanofsky <russ@yanofsky.org>
This commit is contained in:
parent
4abde2c4e3
commit
0d6d2b650d
@ -129,7 +129,7 @@ bool CScheduler::AreThreadsServicingQueue() const
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
|
void SerialTaskRunner::MaybeScheduleProcessQueue()
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
LOCK(m_callbacks_mutex);
|
LOCK(m_callbacks_mutex);
|
||||||
@ -142,7 +142,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()
|
|||||||
m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now());
|
m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now());
|
||||||
}
|
}
|
||||||
|
|
||||||
void SingleThreadedSchedulerClient::ProcessQueue()
|
void SerialTaskRunner::ProcessQueue()
|
||||||
{
|
{
|
||||||
std::function<void()> callback;
|
std::function<void()> callback;
|
||||||
{
|
{
|
||||||
@ -158,8 +158,8 @@ void SingleThreadedSchedulerClient::ProcessQueue()
|
|||||||
// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
|
// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
|
||||||
// to ensure both happen safely even if callback() throws.
|
// to ensure both happen safely even if callback() throws.
|
||||||
struct RAIICallbacksRunning {
|
struct RAIICallbacksRunning {
|
||||||
SingleThreadedSchedulerClient* instance;
|
SerialTaskRunner* instance;
|
||||||
explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
|
explicit RAIICallbacksRunning(SerialTaskRunner* _instance) : instance(_instance) {}
|
||||||
~RAIICallbacksRunning()
|
~RAIICallbacksRunning()
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
@ -173,7 +173,7 @@ void SingleThreadedSchedulerClient::ProcessQueue()
|
|||||||
callback();
|
callback();
|
||||||
}
|
}
|
||||||
|
|
||||||
void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
|
void SerialTaskRunner::insert(std::function<void()> func)
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
LOCK(m_callbacks_mutex);
|
LOCK(m_callbacks_mutex);
|
||||||
@ -182,7 +182,7 @@ void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func
|
|||||||
MaybeScheduleProcessQueue();
|
MaybeScheduleProcessQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
void SingleThreadedSchedulerClient::EmptyQueue()
|
void SerialTaskRunner::flush()
|
||||||
{
|
{
|
||||||
assert(!m_scheduler.AreThreadsServicingQueue());
|
assert(!m_scheduler.AreThreadsServicingQueue());
|
||||||
bool should_continue = true;
|
bool should_continue = true;
|
||||||
@ -193,7 +193,7 @@ void SingleThreadedSchedulerClient::EmptyQueue()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t SingleThreadedSchedulerClient::CallbacksPending()
|
size_t SerialTaskRunner::size()
|
||||||
{
|
{
|
||||||
LOCK(m_callbacks_mutex);
|
LOCK(m_callbacks_mutex);
|
||||||
return m_callbacks_pending.size();
|
return m_callbacks_pending.size();
|
||||||
|
@ -120,7 +120,7 @@ private:
|
|||||||
* B() will be able to observe all of the effects of callback A() which executed
|
* B() will be able to observe all of the effects of callback A() which executed
|
||||||
* before it.
|
* before it.
|
||||||
*/
|
*/
|
||||||
class SingleThreadedSchedulerClient
|
class SerialTaskRunner
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
CScheduler& m_scheduler;
|
CScheduler& m_scheduler;
|
||||||
@ -133,7 +133,7 @@ private:
|
|||||||
void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
explicit SingleThreadedSchedulerClient(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {}
|
explicit SerialTaskRunner(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a callback to be executed. Callbacks are executed serially
|
* Add a callback to be executed. Callbacks are executed serially
|
||||||
@ -141,15 +141,15 @@ public:
|
|||||||
* Practically, this means that callbacks can behave as if they are executed
|
* Practically, this means that callbacks can behave as if they are executed
|
||||||
* in order by a single thread.
|
* in order by a single thread.
|
||||||
*/
|
*/
|
||||||
void AddToProcessQueue(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
void insert(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Processes all remaining queue members on the calling thread, blocking until queue is empty
|
* Processes all remaining queue members on the calling thread, blocking until queue is empty
|
||||||
* Must be called after the CScheduler has no remaining processing threads!
|
* Must be called after the CScheduler has no remaining processing threads!
|
||||||
*/
|
*/
|
||||||
void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
void flush() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
||||||
|
|
||||||
size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
size_t size() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif // BITCOIN_SCHEDULER_H
|
#endif // BITCOIN_SCHEDULER_H
|
||||||
|
@ -129,8 +129,8 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
|
|||||||
CScheduler scheduler;
|
CScheduler scheduler;
|
||||||
|
|
||||||
// each queue should be well ordered with respect to itself but not other queues
|
// each queue should be well ordered with respect to itself but not other queues
|
||||||
SingleThreadedSchedulerClient queue1(scheduler);
|
SerialTaskRunner queue1(scheduler);
|
||||||
SingleThreadedSchedulerClient queue2(scheduler);
|
SerialTaskRunner queue2(scheduler);
|
||||||
|
|
||||||
// create more threads than queues
|
// create more threads than queues
|
||||||
// if the queues only permit execution of one task at once then
|
// if the queues only permit execution of one task at once then
|
||||||
@ -142,7 +142,7 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
|
|||||||
threads.emplace_back([&] { scheduler.serviceQueue(); });
|
threads.emplace_back([&] { scheduler.serviceQueue(); });
|
||||||
}
|
}
|
||||||
|
|
||||||
// these are not atomic, if SinglethreadedSchedulerClient prevents
|
// these are not atomic, if SerialTaskRunner prevents
|
||||||
// parallel execution at the queue level no synchronization should be required here
|
// parallel execution at the queue level no synchronization should be required here
|
||||||
int counter1 = 0;
|
int counter1 = 0;
|
||||||
int counter2 = 0;
|
int counter2 = 0;
|
||||||
@ -150,12 +150,12 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
|
|||||||
// just simply count up on each queue - if execution is properly ordered then
|
// just simply count up on each queue - if execution is properly ordered then
|
||||||
// the callbacks should run in exactly the order in which they were enqueued
|
// the callbacks should run in exactly the order in which they were enqueued
|
||||||
for (int i = 0; i < 100; ++i) {
|
for (int i = 0; i < 100; ++i) {
|
||||||
queue1.AddToProcessQueue([i, &counter1]() {
|
queue1.insert([i, &counter1]() {
|
||||||
bool expectation = i == counter1++;
|
bool expectation = i == counter1++;
|
||||||
assert(expectation);
|
assert(expectation);
|
||||||
});
|
});
|
||||||
|
|
||||||
queue2.AddToProcessQueue([i, &counter2]() {
|
queue2.insert([i, &counter2]() {
|
||||||
bool expectation = i == counter2++;
|
bool expectation = i == counter2++;
|
||||||
assert(expectation);
|
assert(expectation);
|
||||||
});
|
});
|
||||||
|
@ -45,9 +45,9 @@ public:
|
|||||||
// We are not allowed to assume the scheduler only runs in one thread,
|
// We are not allowed to assume the scheduler only runs in one thread,
|
||||||
// but must ensure all callbacks happen in-order, so we end up creating
|
// but must ensure all callbacks happen in-order, so we end up creating
|
||||||
// our own queue here :(
|
// our own queue here :(
|
||||||
SingleThreadedSchedulerClient m_schedulerClient;
|
SerialTaskRunner m_task_runner;
|
||||||
|
|
||||||
explicit MainSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_schedulerClient(scheduler) {}
|
explicit MainSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_task_runner(scheduler) {}
|
||||||
|
|
||||||
void Register(std::shared_ptr<CValidationInterface> callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
void Register(std::shared_ptr<CValidationInterface> callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
||||||
{
|
{
|
||||||
@ -101,12 +101,12 @@ CMainSignals::~CMainSignals() {}
|
|||||||
|
|
||||||
void CMainSignals::FlushBackgroundCallbacks()
|
void CMainSignals::FlushBackgroundCallbacks()
|
||||||
{
|
{
|
||||||
m_internals->m_schedulerClient.EmptyQueue();
|
m_internals->m_task_runner.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t CMainSignals::CallbacksPending()
|
size_t CMainSignals::CallbacksPending()
|
||||||
{
|
{
|
||||||
return m_internals->m_schedulerClient.CallbacksPending();
|
return m_internals->m_task_runner.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
void CMainSignals::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
|
void CMainSignals::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
|
||||||
@ -140,7 +140,7 @@ void CMainSignals::UnregisterAllValidationInterfaces()
|
|||||||
|
|
||||||
void CMainSignals::CallFunctionInValidationInterfaceQueue(std::function<void()> func)
|
void CMainSignals::CallFunctionInValidationInterfaceQueue(std::function<void()> func)
|
||||||
{
|
{
|
||||||
m_internals->m_schedulerClient.AddToProcessQueue(std::move(func));
|
m_internals->m_task_runner.insert(std::move(func));
|
||||||
}
|
}
|
||||||
|
|
||||||
void CMainSignals::SyncWithValidationInterfaceQueue()
|
void CMainSignals::SyncWithValidationInterfaceQueue()
|
||||||
@ -162,7 +162,7 @@ void CMainSignals::SyncWithValidationInterfaceQueue()
|
|||||||
do { \
|
do { \
|
||||||
auto local_name = (name); \
|
auto local_name = (name); \
|
||||||
LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__); \
|
LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__); \
|
||||||
m_internals->m_schedulerClient.AddToProcessQueue([=] { \
|
m_internals->m_task_runner.insert([=] { \
|
||||||
LOG_EVENT(fmt, local_name, __VA_ARGS__); \
|
LOG_EVENT(fmt, local_name, __VA_ARGS__); \
|
||||||
event(); \
|
event(); \
|
||||||
}); \
|
}); \
|
||||||
|
Loading…
Reference in New Issue
Block a user