Convert relay.c/relay.h to channel_t

Note: this is a squashed commit; see branch bug6465_rebased_v2 of user/andrea/tor.git for full history of the following 10 commits:

Convert relay.c/relay.h to channel_t
Updating the timestamp if n_flushed > 0 at the end of channel_flush_from_first_active_circuit() was redundant since channel_write_cell() et al. do it themselves.
Get rid of now-unnecessary time parameter in channel_flush_from_first_active_circuit()
Get rid of now-unnecessary time parameter in channel_flush_from_first_active_circuit() in connection_or.c
Add non-inlined external call for channeltls.c to free a packed_cell_t
Appease make check-spaces in relay.c
Replace channel_get_write_queue_len() with sufficient and easier to implement channel_has_queued_writes() in relay.c
Rename channel_touched_by_client() and client_used field for consistency with other timestamps in relay.c
Don't double-free packed cells in relay.c (channel_t Tor now bootstraps and works as a client)
Rearrange channel_t struct to use a union distinguishing listener from cell-bearing channels in relay.c
This commit is contained in:
Andrea Shepard 2012-08-25 14:30:01 -07:00 committed by Andrea Shepard
parent 4768c0efe3
commit e136f7ccb4
3 changed files with 236 additions and 181 deletions

View file

@ -265,9 +265,6 @@ or_connection_new(int socket_family)
or_conn->timestamp_last_added_nonpadding = time(NULL); or_conn->timestamp_last_added_nonpadding = time(NULL);
or_conn->active_circuit_pqueue = smartlist_new();
or_conn->active_circuit_pqueue_last_recalibrated = cell_ewma_get_tick();
return or_conn; return or_conn;
} }
@ -508,7 +505,6 @@ _connection_free(connection_t *conn)
or_conn->tls = NULL; or_conn->tls = NULL;
or_handshake_state_free(or_conn->handshake_state); or_handshake_state_free(or_conn->handshake_state);
or_conn->handshake_state = NULL; or_conn->handshake_state = NULL;
smartlist_free(or_conn->active_circuit_pqueue);
tor_free(or_conn->nickname); tor_free(or_conn->nickname);
} }
if (conn->type == CONN_TYPE_AP) { if (conn->type == CONN_TYPE_AP) {

View file

@ -14,6 +14,7 @@
#define RELAY_PRIVATE #define RELAY_PRIVATE
#include "or.h" #include "or.h"
#include "buffers.h" #include "buffers.h"
#include "channel.h"
#include "circuitbuild.h" #include "circuitbuild.h"
#include "circuitlist.h" #include "circuitlist.h"
#include "config.h" #include "config.h"
@ -166,7 +167,7 @@ int
circuit_receive_relay_cell(cell_t *cell, circuit_t *circ, circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
cell_direction_t cell_direction) cell_direction_t cell_direction)
{ {
or_connection_t *or_conn=NULL; channel_t *chan = NULL;
crypt_path_t *layer_hint=NULL; crypt_path_t *layer_hint=NULL;
char recognized=0; char recognized=0;
int reason; int reason;
@ -213,17 +214,17 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
/* not recognized. pass it on. */ /* not recognized. pass it on. */
if (cell_direction == CELL_DIRECTION_OUT) { if (cell_direction == CELL_DIRECTION_OUT) {
cell->circ_id = circ->n_circ_id; /* switch it */ cell->circ_id = circ->n_circ_id; /* switch it */
or_conn = circ->n_conn; chan = circ->n_chan;
} else if (! CIRCUIT_IS_ORIGIN(circ)) { } else if (! CIRCUIT_IS_ORIGIN(circ)) {
cell->circ_id = TO_OR_CIRCUIT(circ)->p_circ_id; /* switch it */ cell->circ_id = TO_OR_CIRCUIT(circ)->p_circ_id; /* switch it */
or_conn = TO_OR_CIRCUIT(circ)->p_conn; chan = TO_OR_CIRCUIT(circ)->p_chan;
} else { } else {
log_fn(LOG_PROTOCOL_WARN, LD_OR, log_fn(LOG_PROTOCOL_WARN, LD_OR,
"Dropping unrecognized inbound cell on origin circuit."); "Dropping unrecognized inbound cell on origin circuit.");
return 0; return 0;
} }
if (!or_conn) { if (!chan) {
// XXXX Can this splice stuff be done more cleanly? // XXXX Can this splice stuff be done more cleanly?
if (! CIRCUIT_IS_ORIGIN(circ) && if (! CIRCUIT_IS_ORIGIN(circ) &&
TO_OR_CIRCUIT(circ)->rend_splice && TO_OR_CIRCUIT(circ)->rend_splice &&
@ -254,7 +255,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
* we might kill the circ before we relay * we might kill the circ before we relay
* the cells. */ * the cells. */
append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0); append_cell_to_circuit_queue(circ, chan, cell, cell_direction, 0);
return 0; return 0;
} }
@ -353,13 +354,13 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
cell_direction_t cell_direction, cell_direction_t cell_direction,
crypt_path_t *layer_hint, streamid_t on_stream) crypt_path_t *layer_hint, streamid_t on_stream)
{ {
or_connection_t *conn; /* where to send the cell */ channel_t *chan; /* where to send the cell */
if (cell_direction == CELL_DIRECTION_OUT) { if (cell_direction == CELL_DIRECTION_OUT) {
crypt_path_t *thishop; /* counter for repeated crypts */ crypt_path_t *thishop; /* counter for repeated crypts */
conn = circ->n_conn; chan = circ->n_chan;
if (!CIRCUIT_IS_ORIGIN(circ) || !conn) { if (!CIRCUIT_IS_ORIGIN(circ) || !chan) {
log_warn(LD_BUG,"outgoing relay cell has n_conn==NULL. Dropping."); log_warn(LD_BUG,"outgoing relay cell has n_chan==NULL. Dropping.");
return 0; /* just drop it */ return 0; /* just drop it */
} }
@ -388,14 +389,14 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
return 0; /* just drop it */ return 0; /* just drop it */
} }
or_circ = TO_OR_CIRCUIT(circ); or_circ = TO_OR_CIRCUIT(circ);
conn = or_circ->p_conn; chan = or_circ->p_chan;
relay_set_digest(or_circ->p_digest, cell); relay_set_digest(or_circ->p_digest, cell);
if (relay_crypt_one_payload(or_circ->p_crypto, cell->payload, 1) < 0) if (relay_crypt_one_payload(or_circ->p_crypto, cell->payload, 1) < 0)
return -1; return -1;
} }
++stats_n_relay_cells_relayed; ++stats_n_relay_cells_relayed;
append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream); append_cell_to_circuit_queue(circ, chan, cell, cell_direction, on_stream);
return 0; return 0;
} }
@ -561,9 +562,9 @@ relay_send_command_from_edge(streamid_t stream_id, circuit_t *circ,
geoip_change_dirreq_state(circ->dirreq_id, DIRREQ_TUNNELED, geoip_change_dirreq_state(circ->dirreq_id, DIRREQ_TUNNELED,
DIRREQ_END_CELL_SENT); DIRREQ_END_CELL_SENT);
if (cell_direction == CELL_DIRECTION_OUT && circ->n_conn) { if (cell_direction == CELL_DIRECTION_OUT && circ->n_chan) {
/* if we're using relaybandwidthrate, this conn wants priority */ /* if we're using relaybandwidthrate, this conn wants priority */
circ->n_conn->client_used = approx_time(); channel_timestamp_client(circ->n_chan);
} }
if (cell_direction == CELL_DIRECTION_OUT) { if (cell_direction == CELL_DIRECTION_OUT) {
@ -1095,7 +1096,8 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
* and linked. */ * and linked. */
static uint64_t next_id = 0; static uint64_t next_id = 0;
circ->dirreq_id = ++next_id; circ->dirreq_id = ++next_id;
TO_CONN(TO_OR_CIRCUIT(circ)->p_conn)->dirreq_id = circ->dirreq_id; tor_assert(!(TO_OR_CIRCUIT(circ)->p_chan->is_listener));
TO_OR_CIRCUIT(circ)->p_chan->u.cell_chan.dirreq_id = circ->dirreq_id;
} }
return connection_exit_begin_conn(cell, circ); return connection_exit_begin_conn(cell, circ);
@ -1230,12 +1232,12 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
"'truncate' unsupported at origin. Dropping."); "'truncate' unsupported at origin. Dropping.");
return 0; return 0;
} }
if (circ->n_conn) { if (circ->n_chan) {
uint8_t trunc_reason = get_uint8(cell->payload + RELAY_HEADER_SIZE); uint8_t trunc_reason = get_uint8(cell->payload + RELAY_HEADER_SIZE);
circuit_clear_cell_queue(circ, circ->n_conn); circuit_clear_cell_queue(circ, circ->n_chan);
connection_or_send_destroy(circ->n_circ_id, circ->n_conn, channel_send_destroy(circ->n_circ_id, circ->n_chan,
trunc_reason); trunc_reason);
circuit_set_n_circid_orconn(circ, 0, NULL); circuit_set_n_circid_chan(circ, 0, NULL);
} }
log_debug(LD_EXIT, "Processed 'truncate', replying."); log_debug(LD_EXIT, "Processed 'truncate', replying.");
{ {
@ -1594,10 +1596,10 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
* needed to fill the cell queue. */ * needed to fill the cell queue. */
int max_to_package = circ->package_window; int max_to_package = circ->package_window;
if (CIRCUIT_IS_ORIGIN(circ)) { if (CIRCUIT_IS_ORIGIN(circ)) {
cells_on_queue = circ->n_conn_cells.n; cells_on_queue = circ->n_chan_cells.n;
} else { } else {
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
cells_on_queue = or_circ->p_conn_cells.n; cells_on_queue = or_circ->p_chan_cells.n;
} }
if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package) if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package)
max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue; max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue;
@ -1842,6 +1844,13 @@ packed_cell_new(void)
return mp_pool_get(cell_pool); return mp_pool_get(cell_pool);
} }
/** Return a packed cell used outside by channel_t lower layer */
void
packed_cell_free(packed_cell_t *cell)
{
packed_cell_free_unchecked(cell);
}
/** Log current statistics for cell pool allocation at log level /** Log current statistics for cell pool allocation at log level
* <b>severity</b>. */ * <b>severity</b>. */
void void
@ -1851,9 +1860,9 @@ dump_cell_pool_usage(int severity)
int n_circs = 0; int n_circs = 0;
int n_cells = 0; int n_cells = 0;
for (c = _circuit_get_global_list(); c; c = c->next) { for (c = _circuit_get_global_list(); c; c = c->next) {
n_cells += c->n_conn_cells.n; n_cells += c->n_chan_cells.n;
if (!CIRCUIT_IS_ORIGIN(c)) if (!CIRCUIT_IS_ORIGIN(c))
n_cells += TO_OR_CIRCUIT(c)->p_conn_cells.n; n_cells += TO_OR_CIRCUIT(c)->p_chan_cells.n;
++n_circs; ++n_circs;
} }
log(severity, LD_MM, "%d cells allocated on %d circuits. %d cells leaked.", log(severity, LD_MM, "%d cells allocated on %d circuits. %d cells leaked.",
@ -1964,35 +1973,35 @@ cell_queue_pop(cell_queue_t *queue)
return cell; return cell;
} }
/** Return a pointer to the "next_active_on_{n,p}_conn" pointer of <b>circ</b>, /** Return a pointer to the "next_active_on_{n,p}_chan" pointer of <b>circ</b>,
* depending on whether <b>conn</b> matches n_conn or p_conn. */ * depending on whether <b>chan</b> matches n_chan or p_chan. */
static INLINE circuit_t ** static INLINE circuit_t **
next_circ_on_conn_p(circuit_t *circ, or_connection_t *conn) next_circ_on_chan_p(circuit_t *circ, channel_t *chan)
{ {
tor_assert(circ); tor_assert(circ);
tor_assert(conn); tor_assert(chan);
if (conn == circ->n_conn) { if (chan == circ->n_chan) {
return &circ->next_active_on_n_conn; return &circ->next_active_on_n_chan;
} else { } else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
tor_assert(conn == orcirc->p_conn); tor_assert(chan == orcirc->p_chan);
return &orcirc->next_active_on_p_conn; return &orcirc->next_active_on_p_chan;
} }
} }
/** Return a pointer to the "prev_active_on_{n,p}_conn" pointer of <b>circ</b>, /** Return a pointer to the "prev_active_on_{n,p}_chan" pointer of <b>circ</b>,
* depending on whether <b>conn</b> matches n_conn or p_conn. */ * depending on whether <b>chan</b> matches n_chan or p_chan. */
static INLINE circuit_t ** static INLINE circuit_t **
prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn) prev_circ_on_chan_p(circuit_t *circ, channel_t *chan)
{ {
tor_assert(circ); tor_assert(circ);
tor_assert(conn); tor_assert(chan);
if (conn == circ->n_conn) { if (chan == circ->n_chan) {
return &circ->prev_active_on_n_conn; return &circ->prev_active_on_n_chan;
} else { } else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
tor_assert(conn == orcirc->p_conn); tor_assert(chan == orcirc->p_chan);
return &orcirc->prev_active_on_p_conn; return &orcirc->prev_active_on_p_chan;
} }
} }
@ -2013,7 +2022,7 @@ compare_cell_ewma_counts(const void *p1, const void *p2)
static circuit_t * static circuit_t *
cell_ewma_to_circuit(cell_ewma_t *ewma) cell_ewma_to_circuit(cell_ewma_t *ewma)
{ {
if (ewma->is_for_p_conn) { if (ewma->is_for_p_chan) {
/* This is an or_circuit_t's p_cell_ewma. */ /* This is an or_circuit_t's p_cell_ewma. */
or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma); or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma);
return TO_CIRCUIT(orcirc); return TO_CIRCUIT(orcirc);
@ -2162,165 +2171,203 @@ scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick)
ewma->last_adjusted_tick = cur_tick; ewma->last_adjusted_tick = cur_tick;
} }
/** Adjust the cell count of every active circuit on <b>conn</b> so /** Adjust the cell count of every active circuit on <b>chan</b> so
* that they are scaled with respect to <b>cur_tick</b> */ * that they are scaled with respect to <b>cur_tick</b> */
static void static void
scale_active_circuits(or_connection_t *conn, unsigned cur_tick) scale_active_circuits(channel_t *chan, unsigned cur_tick)
{ {
double factor;
double factor = get_scale_factor( tor_assert(chan);
conn->active_circuit_pqueue_last_recalibrated, tor_assert(!(chan->is_listener));
cur_tick);
factor =
get_scale_factor(
chan->u.cell_chan.active_circuit_pqueue_last_recalibrated,
cur_tick);
/** Ordinarily it isn't okay to change the value of an element in a heap, /** Ordinarily it isn't okay to change the value of an element in a heap,
* but it's okay here, since we are preserving the order. */ * but it's okay here, since we are preserving the order. */
SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, { SMARTLIST_FOREACH_BEGIN(
chan->u.cell_chan.active_circuit_pqueue,
cell_ewma_t *, e) {
tor_assert(e->last_adjusted_tick == tor_assert(e->last_adjusted_tick ==
conn->active_circuit_pqueue_last_recalibrated); chan->u.cell_chan.active_circuit_pqueue_last_recalibrated);
e->cell_count *= factor; e->cell_count *= factor;
e->last_adjusted_tick = cur_tick; e->last_adjusted_tick = cur_tick;
}); } SMARTLIST_FOREACH_END(e);
conn->active_circuit_pqueue_last_recalibrated = cur_tick; chan->u.cell_chan.active_circuit_pqueue_last_recalibrated = cur_tick;
} }
/** Rescale <b>ewma</b> to the same scale as <b>conn</b>, and add it to /** Rescale <b>ewma</b> to the same scale as <b>chan</b>, and add it to
* <b>conn</b>'s priority queue of active circuits */ * <b>chan</b>'s priority queue of active circuits */
static void static void
add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma) add_cell_ewma_to_chan(channel_t *chan, cell_ewma_t *ewma)
{ {
tor_assert(chan);
tor_assert(!(chan->is_listener));
tor_assert(ewma);
tor_assert(ewma->heap_index == -1); tor_assert(ewma->heap_index == -1);
scale_single_cell_ewma(ewma,
conn->active_circuit_pqueue_last_recalibrated);
smartlist_pqueue_add(conn->active_circuit_pqueue, scale_single_cell_ewma(
ewma,
chan->u.cell_chan.active_circuit_pqueue_last_recalibrated);
smartlist_pqueue_add(chan->u.cell_chan.active_circuit_pqueue,
compare_cell_ewma_counts, compare_cell_ewma_counts,
STRUCT_OFFSET(cell_ewma_t, heap_index), STRUCT_OFFSET(cell_ewma_t, heap_index),
ewma); ewma);
} }
/** Remove <b>ewma</b> from <b>conn</b>'s priority queue of active circuits */ /** Remove <b>ewma</b> from <b>chan</b>'s priority queue of active circuits */
static void static void
remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma) remove_cell_ewma_from_chan(channel_t *chan, cell_ewma_t *ewma)
{ {
tor_assert(chan);
tor_assert(!(chan->is_listener));
tor_assert(ewma);
tor_assert(ewma->heap_index != -1); tor_assert(ewma->heap_index != -1);
smartlist_pqueue_remove(conn->active_circuit_pqueue,
smartlist_pqueue_remove(chan->u.cell_chan.active_circuit_pqueue,
compare_cell_ewma_counts, compare_cell_ewma_counts,
STRUCT_OFFSET(cell_ewma_t, heap_index), STRUCT_OFFSET(cell_ewma_t, heap_index),
ewma); ewma);
} }
/** Remove and return the first cell_ewma_t from conn's priority queue of /** Remove and return the first cell_ewma_t from chan's priority queue of
* active circuits. Requires that the priority queue is nonempty. */ * active circuits. Requires that the priority queue is nonempty. */
static cell_ewma_t * static cell_ewma_t *
pop_first_cell_ewma_from_conn(or_connection_t *conn) pop_first_cell_ewma_from_chan(channel_t *chan)
{ {
return smartlist_pqueue_pop(conn->active_circuit_pqueue, tor_assert(chan);
tor_assert(!(chan->is_listener));
return smartlist_pqueue_pop(chan->u.cell_chan.active_circuit_pqueue,
compare_cell_ewma_counts, compare_cell_ewma_counts,
STRUCT_OFFSET(cell_ewma_t, heap_index)); STRUCT_OFFSET(cell_ewma_t, heap_index));
} }
/** Add <b>circ</b> to the list of circuits with pending cells on /** Add <b>circ</b> to the list of circuits with pending cells on
* <b>conn</b>. No effect if <b>circ</b> is already linked. */ * <b>chan</b>. No effect if <b>circ</b> is already linked. */
void void
make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn) make_circuit_active_on_chan(circuit_t *circ, channel_t *chan)
{ {
circuit_t **nextp = next_circ_on_conn_p(circ, conn); circuit_t **nextp = NULL, **prevp = NULL;
circuit_t **prevp = prev_circ_on_conn_p(circ, conn);
tor_assert(chan);
tor_assert(!(chan->is_listener));
tor_assert(circ);
nextp = next_circ_on_chan_p(circ, chan);
prevp = prev_circ_on_chan_p(circ, chan);
if (*nextp && *prevp) { if (*nextp && *prevp) {
/* Already active. */ /* Already active. */
return; return;
} }
assert_active_circuits_ok_paranoid(conn); assert_active_circuits_ok_paranoid(chan);
if (! conn->active_circuits) { if (!(chan->u.cell_chan.active_circuits)) {
conn->active_circuits = circ; chan->u.cell_chan.active_circuits = circ;
*prevp = *nextp = circ; *prevp = *nextp = circ;
} else { } else {
circuit_t *head = conn->active_circuits; circuit_t *head = chan->u.cell_chan.active_circuits;
circuit_t *old_tail = *prev_circ_on_conn_p(head, conn); circuit_t *old_tail = *prev_circ_on_chan_p(head, chan);
*next_circ_on_conn_p(old_tail, conn) = circ; *next_circ_on_chan_p(old_tail, chan) = circ;
*nextp = head; *nextp = head;
*prev_circ_on_conn_p(head, conn) = circ; *prev_circ_on_chan_p(head, chan) = circ;
*prevp = old_tail; *prevp = old_tail;
} }
if (circ->n_conn == conn) { if (circ->n_chan == chan) {
add_cell_ewma_to_conn(conn, &circ->n_cell_ewma); add_cell_ewma_to_chan(chan, &circ->n_cell_ewma);
} else { } else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
tor_assert(conn == orcirc->p_conn); tor_assert(chan == orcirc->p_chan);
add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma); add_cell_ewma_to_chan(chan, &orcirc->p_cell_ewma);
} }
assert_active_circuits_ok_paranoid(conn); assert_active_circuits_ok_paranoid(chan);
} }
/** Remove <b>circ</b> from the list of circuits with pending cells on /** Remove <b>circ</b> from the list of circuits with pending cells on
* <b>conn</b>. No effect if <b>circ</b> is already unlinked. */ * <b>chan</b>. No effect if <b>circ</b> is already unlinked. */
void void
make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn) make_circuit_inactive_on_chan(circuit_t *circ, channel_t *chan)
{ {
circuit_t **nextp = next_circ_on_conn_p(circ, conn); circuit_t **nextp = NULL, **prevp = NULL;
circuit_t **prevp = prev_circ_on_conn_p(circ, conn); circuit_t *next = NULL, *prev = NULL;
circuit_t *next = *nextp, *prev = *prevp;
tor_assert(chan);
tor_assert(!(chan->is_listener));
tor_assert(circ);
nextp = next_circ_on_chan_p(circ, chan);
prevp = prev_circ_on_chan_p(circ, chan);
next = *nextp;
prev = *prevp;
if (!next && !prev) { if (!next && !prev) {
/* Already inactive. */ /* Already inactive. */
return; return;
} }
assert_active_circuits_ok_paranoid(conn); assert_active_circuits_ok_paranoid(chan);
tor_assert(next && prev); tor_assert(next && prev);
tor_assert(*prev_circ_on_conn_p(next, conn) == circ); tor_assert(*prev_circ_on_chan_p(next, chan) == circ);
tor_assert(*next_circ_on_conn_p(prev, conn) == circ); tor_assert(*next_circ_on_chan_p(prev, chan) == circ);
if (next == circ) { if (next == circ) {
conn->active_circuits = NULL; chan->u.cell_chan.active_circuits = NULL;
} else { } else {
*prev_circ_on_conn_p(next, conn) = prev; *prev_circ_on_chan_p(next, chan) = prev;
*next_circ_on_conn_p(prev, conn) = next; *next_circ_on_chan_p(prev, chan) = next;
if (conn->active_circuits == circ) if (chan->u.cell_chan.active_circuits == circ)
conn->active_circuits = next; chan->u.cell_chan.active_circuits = next;
} }
*prevp = *nextp = NULL; *prevp = *nextp = NULL;
if (circ->n_conn == conn) { if (circ->n_chan == chan) {
remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma); remove_cell_ewma_from_chan(chan, &circ->n_cell_ewma);
} else { } else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
tor_assert(conn == orcirc->p_conn); tor_assert(chan == orcirc->p_chan);
remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma); remove_cell_ewma_from_chan(chan, &orcirc->p_cell_ewma);
} }
assert_active_circuits_ok_paranoid(conn); assert_active_circuits_ok_paranoid(chan);
} }
/** Remove all circuits from the list of circuits with pending cells on /** Remove all circuits from the list of circuits with pending cells on
* <b>conn</b>. */ * <b>chan</b>. */
void void
connection_or_unlink_all_active_circs(or_connection_t *orconn) channel_unlink_all_active_circs(channel_t *chan)
{ {
circuit_t *head = orconn->active_circuits; circuit_t *head = NULL, *cur = NULL;
circuit_t *cur = head;
tor_assert(chan);
tor_assert(!(chan->is_listener));
cur = head = chan->u.cell_chan.active_circuits;
if (! head) if (! head)
return; return;
do { do {
circuit_t *next = *next_circ_on_conn_p(cur, orconn); circuit_t *next = *next_circ_on_chan_p(cur, chan);
*prev_circ_on_conn_p(cur, orconn) = NULL; *prev_circ_on_chan_p(cur, chan) = NULL;
*next_circ_on_conn_p(cur, orconn) = NULL; *next_circ_on_chan_p(cur, chan) = NULL;
cur = next; cur = next;
} while (cur != head); } while (cur != head);
orconn->active_circuits = NULL; chan->u.cell_chan.active_circuits = NULL;
SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e, SMARTLIST_FOREACH(chan->u.cell_chan.active_circuit_pqueue,
cell_ewma_t *, e,
e->heap_index = -1); e->heap_index = -1);
smartlist_clear(orconn->active_circuit_pqueue); smartlist_clear(chan->u.cell_chan.active_circuit_pqueue);
} }
/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false) /** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
* every edge connection that is using <b>circ</b> to write to <b>orconn</b>, * every edge connection that is using <b>circ</b> to write to <b>chan</b>,
* and start or stop reading as appropriate. * and start or stop reading as appropriate.
* *
* If <b>stream_id</b> is nonzero, block only the edge connection whose * If <b>stream_id</b> is nonzero, block only the edge connection whose
@ -2329,17 +2376,17 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn)
* Returns the number of streams whose status we changed. * Returns the number of streams whose status we changed.
*/ */
static int static int
set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn, set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
int block, streamid_t stream_id) int block, streamid_t stream_id)
{ {
edge_connection_t *edge = NULL; edge_connection_t *edge = NULL;
int n = 0; int n = 0;
if (circ->n_conn == orconn) { if (circ->n_chan == chan) {
circ->streams_blocked_on_n_conn = block; circ->streams_blocked_on_n_chan = block;
if (CIRCUIT_IS_ORIGIN(circ)) if (CIRCUIT_IS_ORIGIN(circ))
edge = TO_ORIGIN_CIRCUIT(circ)->p_streams; edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
} else { } else {
circ->streams_blocked_on_p_conn = block; circ->streams_blocked_on_p_chan = block;
tor_assert(!CIRCUIT_IS_ORIGIN(circ)); tor_assert(!CIRCUIT_IS_ORIGIN(circ));
edge = TO_OR_CIRCUIT(circ)->n_streams; edge = TO_OR_CIRCUIT(circ)->n_streams;
} }
@ -2374,12 +2421,11 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
} }
/** Pull as many cells as possible (but no more than <b>max</b>) from the /** Pull as many cells as possible (but no more than <b>max</b>) from the
* queue of the first active circuit on <b>conn</b>, and write them to * queue of the first active circuit on <b>chan</b>, and write them to
* <b>conn</b>-&gt;outbuf. Return the number of cells written. Advance * <b>chan</b>-&gt;outbuf. Return the number of cells written. Advance
* the active circuit pointer to the next active circuit in the ring. */ * the active circuit pointer to the next active circuit in the ring. */
int int
connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max, channel_flush_from_first_active_circuit(channel_t *chan, int max)
time_t now)
{ {
int n_flushed; int n_flushed;
cell_queue_t *queue; cell_queue_t *queue;
@ -2393,9 +2439,12 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
cell_ewma_t *cell_ewma = NULL; cell_ewma_t *cell_ewma = NULL;
double ewma_increment = -1; double ewma_increment = -1;
circ = conn->active_circuits; tor_assert(chan);
tor_assert(!(chan->is_listener));
circ = chan->u.cell_chan.active_circuits;
if (!circ) return 0; if (!circ) return 0;
assert_active_circuits_ok_paranoid(conn); assert_active_circuits_ok_paranoid(chan);
/* See if we're doing the ewma circuit selection algorithm. */ /* See if we're doing the ewma circuit selection algorithm. */
if (ewma_enabled) { if (ewma_enabled) {
@ -2404,28 +2453,28 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
tor_gettimeofday_cached(&now_hires); tor_gettimeofday_cached(&now_hires);
tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick); tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
if (tick != conn->active_circuit_pqueue_last_recalibrated) { if (tick != chan->u.cell_chan.active_circuit_pqueue_last_recalibrated) {
scale_active_circuits(conn, tick); scale_active_circuits(chan, tick);
} }
ewma_increment = pow(ewma_scale_factor, -fractional_tick); ewma_increment = pow(ewma_scale_factor, -fractional_tick);
cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0); cell_ewma = smartlist_get(chan->u.cell_chan.active_circuit_pqueue, 0);
circ = cell_ewma_to_circuit(cell_ewma); circ = cell_ewma_to_circuit(cell_ewma);
} }
if (circ->n_conn == conn) { if (circ->n_chan == chan) {
queue = &circ->n_conn_cells; queue = &circ->n_chan_cells;
streams_blocked = circ->streams_blocked_on_n_conn; streams_blocked = circ->streams_blocked_on_n_chan;
} else { } else {
queue = &TO_OR_CIRCUIT(circ)->p_conn_cells; queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
streams_blocked = circ->streams_blocked_on_p_conn; streams_blocked = circ->streams_blocked_on_p_chan;
} }
tor_assert(*next_circ_on_conn_p(circ,conn)); tor_assert(*next_circ_on_chan_p(circ, chan));
for (n_flushed = 0; n_flushed < max && queue->head; ) { for (n_flushed = 0; n_flushed < max && queue->head; ) {
packed_cell_t *cell = cell_queue_pop(queue); packed_cell_t *cell = cell_queue_pop(queue);
tor_assert(*next_circ_on_conn_p(circ,conn)); tor_assert(*next_circ_on_chan_p(circ, chan));
/* Calculate the exact time that this cell has spent in the queue. */ /* Calculate the exact time that this cell has spent in the queue. */
if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) { if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) {
@ -2462,14 +2511,18 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
/* If we just flushed our queue and this circuit is used for a /* If we just flushed our queue and this circuit is used for a
* tunneled directory request, possibly advance its state. */ * tunneled directory request, possibly advance its state. */
if (queue->n == 0 && TO_CONN(conn)->dirreq_id) if (queue->n == 0 && chan->u.cell_chan.dirreq_id)
geoip_change_dirreq_state(TO_CONN(conn)->dirreq_id, geoip_change_dirreq_state(chan->u.cell_chan.dirreq_id,
DIRREQ_TUNNELED, DIRREQ_TUNNELED,
DIRREQ_CIRC_QUEUE_FLUSHED); DIRREQ_CIRC_QUEUE_FLUSHED);
connection_write_to_buf(cell->body, CELL_NETWORK_SIZE, TO_CONN(conn)); channel_write_packed_cell(chan, cell);
/*
* Don't packed_cell_free_unchecked(cell) here because the channel will
* do so when it gets out of the channel queue (probably already did, in
* which case that was an immediate double-free bug).
*/
packed_cell_free_unchecked(cell);
++n_flushed; ++n_flushed;
if (cell_ewma) { if (cell_ewma) {
cell_ewma_t *tmp; cell_ewma_t *tmp;
@ -2477,44 +2530,43 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
/* We pop and re-add the cell_ewma_t here, not above, since we need to /* We pop and re-add the cell_ewma_t here, not above, since we need to
* re-add it immediately to keep the priority queue consistent with * re-add it immediately to keep the priority queue consistent with
* the linked-list implementation */ * the linked-list implementation */
tmp = pop_first_cell_ewma_from_conn(conn); tmp = pop_first_cell_ewma_from_chan(chan);
tor_assert(tmp == cell_ewma); tor_assert(tmp == cell_ewma);
add_cell_ewma_to_conn(conn, cell_ewma); add_cell_ewma_to_chan(chan, cell_ewma);
} }
if (!ewma_enabled && circ != conn->active_circuits) { if (!ewma_enabled && circ != chan->u.cell_chan.active_circuits) {
/* If this happens, the current circuit just got made inactive by /* If this happens, the current circuit just got made inactive by
* a call in connection_write_to_buf(). That's nothing to worry about: * a call in connection_write_to_buf(). That's nothing to worry about:
* circuit_make_inactive_on_conn() already advanced conn->active_circuits * circuit_make_inactive_on_conn() already advanced chan->active_circuits
* for us. * for us.
*/ */
assert_active_circuits_ok_paranoid(conn); assert_active_circuits_ok_paranoid(chan);
goto done; goto done;
} }
} }
tor_assert(*next_circ_on_conn_p(circ,conn)); tor_assert(*next_circ_on_chan_p(circ, chan));
assert_active_circuits_ok_paranoid(conn); assert_active_circuits_ok_paranoid(chan);
conn->active_circuits = *next_circ_on_conn_p(circ, conn); chan->u.cell_chan.active_circuits = *next_circ_on_chan_p(circ, chan);
/* Is the cell queue low enough to unblock all the streams that are waiting /* Is the cell queue low enough to unblock all the streams that are waiting
* to write to this circuit? */ * to write to this circuit? */
if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */ set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
/* Did we just run out of cells on this circuit's queue? */ /* Did we just run out of cells on this circuit's queue? */
if (queue->n == 0) { if (queue->n == 0) {
log_debug(LD_GENERAL, "Made a circuit inactive."); log_debug(LD_GENERAL, "Made a circuit inactive.");
make_circuit_inactive_on_conn(circ, conn); make_circuit_inactive_on_chan(circ, chan);
} }
done: done:
if (n_flushed)
conn->timestamp_last_added_nonpadding = now;
return n_flushed; return n_flushed;
} }
/** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>orconn</b> /** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>chan</b>
* transmitting in <b>direction</b>. */ * transmitting in <b>direction</b>. */
void void
append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn, append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
cell_t *cell, cell_direction_t direction, cell_t *cell, cell_direction_t direction,
streamid_t fromstream) streamid_t fromstream)
{ {
@ -2524,12 +2576,12 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
return; return;
if (direction == CELL_DIRECTION_OUT) { if (direction == CELL_DIRECTION_OUT) {
queue = &circ->n_conn_cells; queue = &circ->n_chan_cells;
streams_blocked = circ->streams_blocked_on_n_conn; streams_blocked = circ->streams_blocked_on_n_chan;
} else { } else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
queue = &orcirc->p_conn_cells; queue = &orcirc->p_chan_cells;
streams_blocked = circ->streams_blocked_on_p_conn; streams_blocked = circ->streams_blocked_on_p_chan;
} }
cell_queue_append_packed_copy(queue, cell); cell_queue_append_packed_copy(queue, cell);
@ -2537,27 +2589,27 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
/* If we have too many cells on the circuit, we should stop reading from /* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */ * the edge streams for a while. */
if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE) if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */ set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */
if (streams_blocked && fromstream) { if (streams_blocked && fromstream) {
/* This edge connection is apparently not blocked; block it. */ /* This edge connection is apparently not blocked; block it. */
set_streams_blocked_on_circ(circ, orconn, 1, fromstream); set_streams_blocked_on_circ(circ, chan, 1, fromstream);
} }
if (queue->n == 1) { if (queue->n == 1) {
/* This was the first cell added to the queue. We need to make this /* This was the first cell added to the queue. We need to make this
* circuit active. */ * circuit active. */
log_debug(LD_GENERAL, "Made a circuit active."); log_debug(LD_GENERAL, "Made a circuit active.");
make_circuit_active_on_conn(circ, orconn); make_circuit_active_on_chan(circ, chan);
} }
if (! connection_get_outbuf_len(TO_CONN(orconn))) { if (!channel_has_queued_writes(chan)) {
/* There is no data at all waiting to be sent on the outbuf. Add a /* There is no data at all waiting to be sent on the outbuf. Add a
* cell, so that we can notice when it gets flushed, flushed_some can * cell, so that we can notice when it gets flushed, flushed_some can
* get called, and we can start putting more data onto the buffer then. * get called, and we can start putting more data onto the buffer then.
*/ */
log_debug(LD_GENERAL, "Primed a buffer."); log_debug(LD_GENERAL, "Primed a buffer.");
connection_or_flush_from_first_active_circuit(orconn, 1, approx_time()); channel_flush_from_first_active_circuit(chan, 1);
} }
} }
@ -2621,21 +2673,21 @@ decode_address_from_payload(tor_addr_t *addr_out, const uint8_t *payload,
return payload + 2 + payload[1]; return payload + 2 + payload[1];
} }
/** Remove all the cells queued on <b>circ</b> for <b>orconn</b>. */ /** Remove all the cells queued on <b>circ</b> for <b>chan</b>. */
void void
circuit_clear_cell_queue(circuit_t *circ, or_connection_t *orconn) circuit_clear_cell_queue(circuit_t *circ, channel_t *chan)
{ {
cell_queue_t *queue; cell_queue_t *queue;
if (circ->n_conn == orconn) { if (circ->n_chan == chan) {
queue = &circ->n_conn_cells; queue = &circ->n_chan_cells;
} else { } else {
or_circuit_t *orcirc = TO_OR_CIRCUIT(circ); or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
tor_assert(orcirc->p_conn == orconn); tor_assert(orcirc->p_chan == chan);
queue = &orcirc->p_conn_cells; queue = &orcirc->p_chan_cells;
} }
if (queue->n) if (queue->n)
make_circuit_inactive_on_conn(circ,orconn); make_circuit_inactive_on_chan(circ, chan);
cell_queue_clear(queue); cell_queue_clear(queue);
} }
@ -2643,36 +2695,41 @@ circuit_clear_cell_queue(circuit_t *circ, or_connection_t *orconn)
/** Fail with an assert if the active circuits ring on <b>orconn</b> is /** Fail with an assert if the active circuits ring on <b>orconn</b> is
* corrupt. */ * corrupt. */
void void
assert_active_circuits_ok(or_connection_t *orconn) assert_active_circuits_ok(channel_t *chan)
{ {
circuit_t *head = orconn->active_circuits; circuit_t *head = NULL, *cur = NULL;
circuit_t *cur = head;
int n = 0; int n = 0;
tor_assert(chan);
tor_assert(!(chan->is_listener));
cur = head = chan->u.cell_chan.active_circuits;
if (! head) if (! head)
return; return;
do { do {
circuit_t *next = *next_circ_on_conn_p(cur, orconn); circuit_t *next = *next_circ_on_chan_p(cur, chan);
circuit_t *prev = *prev_circ_on_conn_p(cur, orconn); circuit_t *prev = *prev_circ_on_chan_p(cur, chan);
cell_ewma_t *ewma; cell_ewma_t *ewma;
tor_assert(next); tor_assert(next);
tor_assert(prev); tor_assert(prev);
tor_assert(*next_circ_on_conn_p(prev, orconn) == cur); tor_assert(*next_circ_on_chan_p(prev, chan) == cur);
tor_assert(*prev_circ_on_conn_p(next, orconn) == cur); tor_assert(*prev_circ_on_chan_p(next, chan) == cur);
if (orconn == cur->n_conn) { if (chan == cur->n_chan) {
ewma = &cur->n_cell_ewma; ewma = &cur->n_cell_ewma;
tor_assert(!ewma->is_for_p_conn); tor_assert(!ewma->is_for_p_chan);
} else { } else {
ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma; ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma;
tor_assert(ewma->is_for_p_conn); tor_assert(ewma->is_for_p_chan);
} }
tor_assert(ewma->heap_index != -1); tor_assert(ewma->heap_index != -1);
tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue, tor_assert(ewma == smartlist_get(chan->u.cell_chan.active_circuit_pqueue,
ewma->heap_index)); ewma->heap_index));
n++; n++;
cur = next; cur = next;
} while (cur != head); } while (cur != head);
tor_assert(n == smartlist_len(orconn->active_circuit_pqueue)); tor_assert(n == smartlist_len(chan->u.cell_chan.active_circuit_pqueue));
} }
/** Return 1 if we shouldn't restart reading on this circuit, even if /** Return 1 if we shouldn't restart reading on this circuit, even if
@ -2682,9 +2739,9 @@ static int
circuit_queue_streams_are_blocked(circuit_t *circ) circuit_queue_streams_are_blocked(circuit_t *circ)
{ {
if (CIRCUIT_IS_ORIGIN(circ)) { if (CIRCUIT_IS_ORIGIN(circ)) {
return circ->streams_blocked_on_n_conn; return circ->streams_blocked_on_n_chan;
} else { } else {
return circ->streams_blocked_on_p_conn; return circ->streams_blocked_on_p_chan;
} }
} }

View file

@ -41,6 +41,9 @@ void free_cell_pool(void);
void clean_cell_pool(void); void clean_cell_pool(void);
void dump_cell_pool_usage(int severity); void dump_cell_pool_usage(int severity);
/* For channeltls.c */
void packed_cell_free(packed_cell_t *cell);
void cell_queue_clear(cell_queue_t *queue); void cell_queue_clear(cell_queue_t *queue);
void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell); void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell);
void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell); void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell);
@ -49,9 +52,8 @@ void append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
cell_t *cell, cell_direction_t direction, cell_t *cell, cell_direction_t direction,
streamid_t fromstream); streamid_t fromstream);
void channel_unlink_all_active_circs(channel_t *chan); void channel_unlink_all_active_circs(channel_t *chan);
int channel_flush_from_first_active_circuit(channel_t *conn, int channel_flush_from_first_active_circuit(channel_t *chan, int max);
int max, time_t now); void assert_active_circuits_ok(channel_t *chan);
void assert_active_circuits_ok(or_connection_t *orconn);
void make_circuit_inactive_on_chan(circuit_t *circ, channel_t *chan); void make_circuit_inactive_on_chan(circuit_t *circ, channel_t *chan);
void make_circuit_active_on_chan(circuit_t *circ, channel_t *chan); void make_circuit_active_on_chan(circuit_t *circ, channel_t *chan);