mirror of
https://gitlab.torproject.org/tpo/core/tor.git
synced 2024-11-20 10:12:15 +01:00
Multithreading support for event-queue code.
This commit is contained in:
parent
81f3572467
commit
e2a6a7ec61
@ -594,13 +594,20 @@ typedef struct queued_event_s {
|
||||
char *msg;
|
||||
} queued_event_t;
|
||||
|
||||
/** If this is greater than 0, we don't allow new events to be queued. */
|
||||
/** If this is greater than 0, we don't allow new events to be queued.
|
||||
* XXXX This should be thread-local. */
|
||||
static int block_event_queue = 0;
|
||||
|
||||
/** Holds a smartlist of queued_event_t objects that may need to be sent
|
||||
* to one or more controllers */
|
||||
static smartlist_t *queued_control_events = NULL;
|
||||
|
||||
/** True if the flush_queued_events_event is pending. */
|
||||
static int flush_queued_event_pending = 0;
|
||||
|
||||
/** Lock to protect the above fields. */
|
||||
static tor_mutex_t *queued_control_events_lock = NULL;
|
||||
|
||||
/** An event that should fire in order to flush the contents of
|
||||
* queued_control_events. */
|
||||
static struct event *flush_queued_events_event = NULL;
|
||||
@ -621,6 +628,10 @@ control_initialize_event_queue(void)
|
||||
tor_assert(flush_queued_events_event);
|
||||
}
|
||||
}
|
||||
|
||||
if (queued_control_events_lock == NULL) {
|
||||
queued_control_events_lock = tor_mutex_new();
|
||||
}
|
||||
}
|
||||
|
||||
/** Helper: inserts an event on the list of events queued to be sent to
|
||||
@ -643,30 +654,43 @@ queue_control_event_string,(uint16_t event, char *msg))
|
||||
tor_free(msg);
|
||||
return;
|
||||
}
|
||||
if (block_event_queue) {
|
||||
tor_free(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
queued_event_t *ev = tor_malloc(sizeof(*ev));
|
||||
ev->event = event;
|
||||
ev->msg = msg;
|
||||
|
||||
tor_mutex_acquire(queued_control_events_lock);
|
||||
if (block_event_queue) { /* XXXX This should be thread-specific. */
|
||||
tor_mutex_release(queued_control_events_lock);
|
||||
tor_free(msg);
|
||||
tor_free(ev);
|
||||
return;
|
||||
}
|
||||
|
||||
/* No queueing an event while queueing an event */
|
||||
++block_event_queue;
|
||||
|
||||
tor_assert(queued_control_events);
|
||||
smartlist_add(queued_control_events, ev);
|
||||
|
||||
/* We just put the first event on the queue; mark the queue to be
|
||||
* flushed.
|
||||
*/
|
||||
if (smartlist_len(queued_control_events) == 1) {
|
||||
tor_assert(flush_queued_events_event);
|
||||
event_active(flush_queued_events_event, EV_READ, 1);
|
||||
int activate_event = 0;
|
||||
if (! flush_queued_event_pending && in_main_thread()) {
|
||||
activate_event = 1;
|
||||
flush_queued_event_pending = 1;
|
||||
}
|
||||
|
||||
--block_event_queue;
|
||||
|
||||
tor_mutex_release(queued_control_events_lock);
|
||||
|
||||
/* We just put an event on the queue; mark the queue to be
|
||||
* flushed. We only do this from the main thread for now; otherwise,
|
||||
* we'd need to incur locking overhead in Libevent or use a socket.
|
||||
*/
|
||||
if (activate_event) {
|
||||
tor_assert(flush_queued_events_event);
|
||||
event_active(flush_queued_events_event, EV_READ, 1);
|
||||
}
|
||||
}
|
||||
|
||||
/** Release all storage held by <b>ev</b>. */
|
||||
@ -687,15 +711,20 @@ queued_event_free(queued_event_t *ev)
|
||||
static void
|
||||
queued_events_flush_all(int force)
|
||||
{
|
||||
smartlist_t *all_conns = get_connection_array();
|
||||
smartlist_t *controllers = smartlist_new();
|
||||
|
||||
if (PREDICT_UNLIKELY(queued_control_events == NULL)) {
|
||||
return;
|
||||
}
|
||||
smartlist_t *all_conns = get_connection_array();
|
||||
smartlist_t *controllers = smartlist_new();
|
||||
smartlist_t *queued_events;
|
||||
|
||||
tor_mutex_acquire(queued_control_events_lock);
|
||||
/* No queueing an event while flushing events. */
|
||||
++block_event_queue;
|
||||
flush_queued_event_pending = 0;
|
||||
queued_events = queued_control_events;
|
||||
queued_control_events = smartlist_new();
|
||||
tor_mutex_release(queued_control_events_lock);
|
||||
|
||||
/* Gather all the controllers that will care... */
|
||||
SMARTLIST_FOREACH_BEGIN(all_conns, connection_t *, conn) {
|
||||
@ -708,7 +737,7 @@ queued_events_flush_all(int force)
|
||||
}
|
||||
} SMARTLIST_FOREACH_END(conn);
|
||||
|
||||
SMARTLIST_FOREACH_BEGIN(queued_control_events, queued_event_t *, ev) {
|
||||
SMARTLIST_FOREACH_BEGIN(queued_events, queued_event_t *, ev) {
|
||||
const event_mask_t bit = ((event_mask_t)1) << ev->event;
|
||||
const size_t msg_len = strlen(ev->msg);
|
||||
SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *,
|
||||
@ -728,10 +757,12 @@ queued_events_flush_all(int force)
|
||||
} SMARTLIST_FOREACH_END(control_conn);
|
||||
}
|
||||
|
||||
smartlist_clear(queued_control_events);
|
||||
smartlist_free(queued_events);
|
||||
smartlist_free(controllers);
|
||||
|
||||
tor_mutex_acquire(queued_control_events_lock);
|
||||
--block_event_queue;
|
||||
tor_mutex_release(queued_control_events_lock);
|
||||
}
|
||||
|
||||
/** Libevent callback: Flushes pending events to controllers that are
|
||||
|
Loading…
Reference in New Issue
Block a user