Merge branch 'tor-gitlab/mr/444'

This commit is contained in:
David Goulet 2021-10-04 10:49:27 -04:00
commit 1873d4c14c
29 changed files with 2016 additions and 109 deletions

3
changes/ticket40450 Normal file
View file

@ -0,0 +1,3 @@
o Major features (congestion control):
- Implement support for flow control over congestion controlled circuits.
This work comes from proposal 324. Closes ticket 40450.

View file

@ -27,6 +27,8 @@
#include "core/or/channel.h"
#include "core/or/channelpadding.h"
#include "core/or/circuitpadding.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
#include "core/or/circuitlist.h"
#include "core/or/command.h"
#include "core/or/connection_or.h"
@ -630,6 +632,8 @@ tor_init(int argc, char *argv[])
* until we get a consensus */
channelpadding_new_consensus_params(NULL);
circpad_new_consensus_params(NULL);
congestion_control_new_consensus_params(NULL);
flow_control_new_consensus_params(NULL);
/* Initialize circuit padding to defaults+torrc until we get a consensus */
circpad_machines_init();

View file

@ -117,6 +117,7 @@
#include "lib/cc/ctassert.h"
#include "lib/sandbox/sandbox.h"
#include "lib/net/buffers_net.h"
#include "lib/net/address.h"
#include "lib/tls/tortls.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/compress/compress.h"
@ -146,6 +147,8 @@
#include "feature/nodelist/routerinfo_st.h"
#include "core/or/socks_request_st.h"
#include "core/or/congestion_control_flow.h"
/**
* On Windows and Linux we cannot reliably bind() a socket to an
* address and port if: 1) There's already a socket bound to wildcard
@ -612,6 +615,11 @@ entry_connection_new(int type, int socket_family)
entry_conn->entry_cfg.ipv4_traffic = 1;
else if (socket_family == AF_INET6)
entry_conn->entry_cfg.ipv6_traffic = 1;
/* Initialize the read token bucket to the maximum value which is the same as
* no rate limiting. */
token_bucket_rw_init(&ENTRY_TO_EDGE_CONN(entry_conn)->bucket, INT32_MAX,
INT32_MAX, monotime_coarse_get_stamp());
return entry_conn;
}
@ -623,6 +631,10 @@ edge_connection_new(int type, int socket_family)
edge_connection_t *edge_conn = tor_malloc_zero(sizeof(edge_connection_t));
tor_assert(type == CONN_TYPE_EXIT);
connection_init(time(NULL), TO_CONN(edge_conn), type, socket_family);
/* Initialize the read token bucket to the maximum value which is the same as
* no rate limiting. */
token_bucket_rw_init(&edge_conn->bucket, INT32_MAX, INT32_MAX,
monotime_coarse_get_stamp());
return edge_conn;
}
@ -3457,6 +3469,19 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
base = get_cell_network_size(or_conn->wide_circ_ids);
}
/* Edge connection have their own read bucket due to flow control being able
* to set a rate limit for them. However, for exit connections, we still need
* to honor the global bucket as well. */
if (CONN_IS_EDGE(conn)) {
const edge_connection_t *edge_conn = CONST_TO_EDGE_CONN(conn);
conn_bucket = token_bucket_rw_get_read(&edge_conn->bucket);
if (conn->type == CONN_TYPE_EXIT) {
/* Decide between our limit and the global one. */
goto end;
}
return conn_bucket;
}
if (!connection_is_rate_limited(conn)) {
/* be willing to read on local conns even if our buckets are empty */
return conn_bucket>=0 ? conn_bucket : 1<<14;
@ -3467,6 +3492,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
global_bucket_val = MIN(global_bucket_val, relayed);
}
end:
return connection_bucket_get_share(base, priority,
global_bucket_val, conn_bucket);
}
@ -3644,6 +3670,13 @@ connection_buckets_decrement(connection_t *conn, time_t now,
record_num_bytes_transferred_impl(conn, now, num_read, num_written);
/* Edge connection need to decrement the read side of the bucket used by our
* congestion control. */
if (CONN_IS_EDGE(conn) && num_read > 0) {
edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
token_bucket_rw_dec(&edge_conn->bucket, num_read, 0);
}
if (!connection_is_rate_limited(conn))
return; /* local IPs are free */
@ -3697,14 +3730,16 @@ connection_write_bw_exhausted(connection_t *conn, bool is_global_bw)
void
connection_consider_empty_read_buckets(connection_t *conn)
{
int is_global = 1;
const char *reason;
if (!connection_is_rate_limited(conn))
if (CONN_IS_EDGE(conn) &&
token_bucket_rw_get_read(&TO_EDGE_CONN(conn)->bucket) <= 0) {
reason = "edge connection read bucket exhausted. Pausing.";
is_global = false;
} else if (!connection_is_rate_limited(conn)) {
return; /* Always okay. */
int is_global = 1;
if (token_bucket_rw_get_read(&global_bucket) <= 0) {
} else if (token_bucket_rw_get_read(&global_bucket) <= 0) {
reason = "global read bucket exhausted. Pausing.";
} else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
token_bucket_rw_get_read(&global_relayed_bucket) <= 0) {
@ -3714,8 +3749,9 @@ connection_consider_empty_read_buckets(connection_t *conn)
token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) {
reason = "connection read bucket exhausted. Pausing.";
is_global = false;
} else
} else {
return; /* all good, no need to stop it */
}
LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
connection_read_bw_exhausted(conn, is_global);
@ -3819,6 +3855,10 @@ connection_bucket_refill_single(connection_t *conn, uint32_t now_ts)
or_connection_t *or_conn = TO_OR_CONN(conn);
token_bucket_rw_refill(&or_conn->bucket, now_ts);
}
if (CONN_IS_EDGE(conn)) {
token_bucket_rw_refill(&TO_EDGE_CONN(conn)->bucket, now_ts);
}
}
/**
@ -4556,9 +4596,9 @@ connection_handle_write_impl(connection_t *conn, int force)
!dont_stop_writing) { /* it's done flushing */
if (connection_finished_flushing(conn) < 0) {
/* already marked */
return -1;
goto err;
}
return 0;
goto done;
}
/* Call even if result is 0, since the global write bucket may
@ -4568,7 +4608,17 @@ connection_handle_write_impl(connection_t *conn, int force)
if (n_read > 0 && connection_is_reading(conn))
connection_consider_empty_read_buckets(conn);
done:
/* If this is an edge connection with congestion control, check to see
* if it is time to send an xon */
if (conn_uses_flow_control(conn)) {
flow_control_decide_xon(TO_EDGE_CONN(conn), n_written);
}
return 0;
err:
return -1;
}
/* DOCDOC connection_handle_write */

View file

@ -641,6 +641,13 @@ connection_start_reading,(connection_t *conn))
if (connection_should_read_from_linked_conn(conn))
connection_start_reading_from_linked_conn(conn);
} else {
if (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->xoff_received) {
/* We should not get called here if we're waiting for an XON, but
* belt-and-suspenders */
log_notice(LD_NET,
"Request to start reading on an edgeconn blocked with XOFF");
return;
}
if (event_add(conn->read_event, NULL))
log_warn(LD_NET, "Error from libevent setting read event state for %d "
"to watched: %s",

View file

@ -64,6 +64,7 @@
#include "trunnel/netinfo.h"
#include "core/or/channelpadding.h"
#include "core/or/extendinfo.h"
#include "core/or/congestion_control_common.h"
#include "core/or/cell_st.h"
#include "core/or/cell_queue_st.h"
@ -793,7 +794,7 @@ channel_tls_num_cells_writeable_method(channel_t *chan)
cell_network_size = get_cell_network_size(tlschan->conn->wide_circ_ids);
outbuf_len = connection_get_outbuf_len(TO_CONN(tlschan->conn));
/* Get the number of cells */
n = CEIL_DIV(OR_CONN_HIGHWATER - outbuf_len, cell_network_size);
n = CEIL_DIV(or_conn_highwatermark() - outbuf_len, cell_network_size);
if (n < 0) n = 0;
#if SIZEOF_SIZE_T > SIZEOF_INT
if (n > INT_MAX) n = INT_MAX;

View file

@ -2602,6 +2602,7 @@ circuits_handle_oom(size_t current_allocation)
size_t mem_recovered=0;
int n_circuits_killed=0;
int n_dirconns_killed=0;
int n_edgeconns_killed = 0;
uint32_t now_ts;
log_notice(LD_GENERAL, "We're low on memory (cell queues total alloc:"
" %"TOR_PRIuSZ" buffer total alloc: %" TOR_PRIuSZ ","
@ -2668,12 +2669,19 @@ circuits_handle_oom(size_t current_allocation)
if (conn_age < circ->age_tmp) {
break;
}
if (conn->type == CONN_TYPE_DIR && conn->linked_conn == NULL) {
/* Also consider edge connections so we don't accumulate bytes on the
* outbuf due to a malicious destination holding off the read on us. */
if ((conn->type == CONN_TYPE_DIR && conn->linked_conn == NULL) ||
CONN_IS_EDGE(conn)) {
if (!conn->marked_for_close)
connection_mark_for_close(conn);
mem_recovered += single_conn_free_bytes(conn);
++n_dirconns_killed;
if (conn->type == CONN_TYPE_DIR) {
++n_dirconns_killed;
} else {
++n_edgeconns_killed;
}
if (mem_recovered >= mem_to_recover)
goto done_recovering_mem;
@ -2703,11 +2711,12 @@ circuits_handle_oom(size_t current_allocation)
done_recovering_mem:
log_notice(LD_GENERAL, "Removed %"TOR_PRIuSZ" bytes by killing %d circuits; "
"%d circuits remain alive. Also killed %d non-linked directory "
"connections.",
"connections. Killed %d edge connections",
mem_recovered,
n_circuits_killed,
smartlist_len(circlist) - n_circuits_killed,
n_dirconns_killed);
n_dirconns_killed,
n_edgeconns_killed);
return mem_recovered;
}

View file

@ -45,7 +45,10 @@
/*** EWMA parameter #defines ***/
/** How long does a tick last (seconds)? */
#define EWMA_TICK_LEN 10
#define EWMA_TICK_LEN_DEFAULT 10
#define EWMA_TICK_LEN_MIN 1
#define EWMA_TICK_LEN_MAX 600
static int ewma_tick_len = EWMA_TICK_LEN_DEFAULT;
/** The default per-tick scale factor, if it hasn't been overridden by a
* consensus or a configuration setting. zero means "disabled". */
@ -148,7 +151,7 @@ cell_ewma_get_tick(void)
monotime_coarse_get(&now);
int32_t msec_diff = monotime_coarse_diff_msec32(&start_of_current_tick,
&now);
return current_tick_num + msec_diff / (1000*EWMA_TICK_LEN);
return current_tick_num + msec_diff / (1000*ewma_tick_len);
}
/**
@ -527,15 +530,15 @@ cell_ewma_get_current_tick_and_fraction(double *remainder_out)
monotime_coarse_get(&now);
int32_t msec_diff = monotime_coarse_diff_msec32(&start_of_current_tick,
&now);
if (msec_diff > (1000*EWMA_TICK_LEN)) {
unsigned ticks_difference = msec_diff / (1000*EWMA_TICK_LEN);
if (msec_diff > (1000*ewma_tick_len)) {
unsigned ticks_difference = msec_diff / (1000*ewma_tick_len);
monotime_coarse_add_msec(&start_of_current_tick,
&start_of_current_tick,
ticks_difference * 1000 * EWMA_TICK_LEN);
ticks_difference * 1000 * ewma_tick_len);
current_tick_num += ticks_difference;
msec_diff %= 1000*EWMA_TICK_LEN;
msec_diff %= 1000*ewma_tick_len;
}
*remainder_out = ((double)msec_diff) / (1.0e3 * EWMA_TICK_LEN);
*remainder_out = ((double)msec_diff) / (1.0e3 * ewma_tick_len);
return current_tick_num;
}
@ -605,15 +608,20 @@ cmux_ewma_set_options(const or_options_t *options,
/* Both options and consensus can be NULL. This assures us to either get a
* valid configured value or the default one. */
halflife = get_circuit_priority_halflife(options, consensus, &source);
ewma_tick_len = networkstatus_get_param(consensus,
"CircuitPriorityTickSecs",
EWMA_TICK_LEN_DEFAULT,
EWMA_TICK_LEN_MIN,
EWMA_TICK_LEN_MAX);
/* convert halflife into halflife-per-tick. */
halflife /= EWMA_TICK_LEN;
halflife /= ewma_tick_len;
/* compute per-tick scale factor. */
ewma_scale_factor = exp(LOG_ONEHALF / halflife);
log_info(LD_OR,
"Enabled cell_ewma algorithm because of value in %s; "
"scale factor is %f per %d seconds",
source, ewma_scale_factor, EWMA_TICK_LEN);
source, ewma_scale_factor, ewma_tick_len);
}
/** Return the multiplier necessary to convert the value of a cell sent in

View file

@ -22,28 +22,51 @@
#include "core/or/congestion_control_nola.h"
#include "core/or/congestion_control_westwood.h"
#include "core/or/congestion_control_st.h"
#include "core/or/trace_probes_cc.h"
#include "lib/time/compat_time.h"
#include "feature/nodelist/networkstatus.h"
/* Consensus parameter defaults */
/* Consensus parameter defaults.
*
* More details for each of the parameters can be found in proposal 324,
* section 6.5 including tuning notes. */
#define CIRCWINDOW_INIT (500)
#define CWND_INC_PCT_SS_DFLT (100)
#define SENDME_INC_DFLT (50)
#define CWND_MIN_DFLT (MAX(100, SENDME_INC_DFLT))
#define SENDME_INC_DFLT (50)
#define CWND_INC_DFLT (50)
#define CWND_INC_PCT_SS_DFLT (100)
#define CWND_INC_RATE_DFLT (1)
#define CWND_MAX_DFLT (INT32_MAX)
#define CWND_MIN_DFLT (MAX(100, SENDME_INC_DFLT))
#define BWE_SENDME_MIN_DFLT (5)
#define EWMA_CWND_COUNT_DFLT (2)
/* BDP algorithms for each congestion control algorithm use the piecewise
 * estimator. See section 3.1.4 of proposal 324. */
#define WESTWOOD_BDP_ALG BDP_ALG_PIECEWISE
#define VEGAS_BDP_MIX_ALG BDP_ALG_PIECEWISE
#define NOLA_BDP_ALG BDP_ALG_PIECEWISE
#define EWMA_CWND_COUNT_DFLT 2
/* Indicate OR connection buffer limitations used to stop or start accepting
* cells in its outbuf.
*
* These watermarks are historical to tor in a sense that they've been used
* almost from the genesis point. And were likely defined to fit the bounds of
* TLS records of 16KB which would be around 32 cells.
*
* These are defaults of the consensus parameter "orconn_high" and "orconn_low"
* values. */
#define OR_CONN_HIGHWATER_DFLT (32*1024)
#define OR_CONN_LOWWATER_DFLT (16*1024)
#define BWE_SENDME_MIN_DFLT 5
/* Low and high values of circuit cell queue sizes. They are used to tell when
* to start or stop reading on the streams attached on the circuit.
*
* These are defaults of the consensus parameters "cellq_high" and "cellq_low".
*/
#define CELL_QUEUE_LOW_DFLT (10)
#define CELL_QUEUE_HIGH_DFLT (256)
static uint64_t congestion_control_update_circuit_rtt(congestion_control_t *,
uint64_t);
@ -52,6 +75,59 @@ static bool congestion_control_update_circuit_bdp(congestion_control_t *,
const crypt_path_t *,
uint64_t, uint64_t);
/* Consensus parameters cached. The non static ones are extern. */
static uint32_t cwnd_max = CWND_MAX_DFLT;
int32_t cell_queue_high = CELL_QUEUE_HIGH_DFLT;
int32_t cell_queue_low = CELL_QUEUE_LOW_DFLT;
uint32_t or_conn_highwater = OR_CONN_HIGHWATER_DFLT;
uint32_t or_conn_lowwater = OR_CONN_LOWWATER_DFLT;
/**
 * Update the cached congestion control consensus parameter values. Called
 * on every consensus update.
 */
void
congestion_control_new_consensus_params(const networkstatus_t *ns)
{
  /* Circuit cell queue watermarks (in cells), used to stop/start reading
   * on attached streams. */
#define CELL_QUEUE_HIGH_MIN (1)
#define CELL_QUEUE_HIGH_MAX (1000)
  cell_queue_high = networkstatus_get_param(ns, "cellq_high",
                                            CELL_QUEUE_HIGH_DFLT,
                                            CELL_QUEUE_HIGH_MIN,
                                            CELL_QUEUE_HIGH_MAX);
#define CELL_QUEUE_LOW_MIN (1)
#define CELL_QUEUE_LOW_MAX (1000)
  cell_queue_low = networkstatus_get_param(ns, "cellq_low",
                                           CELL_QUEUE_LOW_DFLT,
                                           CELL_QUEUE_LOW_MIN,
                                           CELL_QUEUE_LOW_MAX);

  /* OR connection outbuf watermarks (in bytes). */
#define OR_CONN_HIGHWATER_MIN (CELL_PAYLOAD_SIZE)
#define OR_CONN_HIGHWATER_MAX (INT32_MAX)
  or_conn_highwater =
    networkstatus_get_param(ns, "orconn_high",
                            OR_CONN_HIGHWATER_DFLT,
                            OR_CONN_HIGHWATER_MIN,
                            OR_CONN_HIGHWATER_MAX);

#define OR_CONN_LOWWATER_MIN (CELL_PAYLOAD_SIZE)
#define OR_CONN_LOWWATER_MAX (INT32_MAX)
  or_conn_lowwater =
    networkstatus_get_param(ns, "orconn_low",
                            OR_CONN_LOWWATER_DFLT,
                            OR_CONN_LOWWATER_MIN,
                            OR_CONN_LOWWATER_MAX);

#define CWND_MAX_MIN 500
#define CWND_MAX_MAX (INT32_MAX)
  /* BUGFIX: this lookup previously passed NULL instead of "ns", which made
   * "cc_cwnd_max" always resolve to its default and ignore the consensus. */
  cwnd_max =
    networkstatus_get_param(ns, "cc_cwnd_max",
                            CWND_MAX_DFLT,
                            CWND_MAX_MIN,
                            CWND_MAX_MAX);
}
/**
* Set congestion control parameters on a circuit's congestion
* control object based on values from the consensus.
@ -224,24 +300,6 @@ congestion_control_free_(congestion_control_t *cc)
tor_free(cc);
}
/**
* Compute an N-count EWMA, aka N-EWMA. N-EWMA is defined as:
* EWMA = alpha*value + (1-alpha)*EWMA_prev
* with alpha = 2/(N+1).
*
* This works out to:
* EWMA = value*2/(N+1) + EMA_prev*(N-1)/(N+1)
* = (value*2 + EWMA_prev*(N-1))/(N+1)
*/
static inline uint64_t
n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N)
{
if (prev == 0)
return curr;
else
return (2*curr + (N-1)*prev)/(N+1);
}
/**
* Enqueue a u64 timestamp to the end of a queue of timestamps.
*/
@ -558,10 +616,16 @@ time_delta_should_use_heuristics(const congestion_control_t *cc)
return false;
}
static bool is_monotime_clock_broken = false;
/**
* Returns true if the monotime delta is 0, or is significantly
* different than the previous delta. Either case indicates
* that the monotime time source stalled or jumped.
*
* Also caches the clock state in the is_monotime_clock_broken flag,
* so we can also provide a is_monotime_clock_reliable() function,
* used by flow control rate timing.
*/
static bool
time_delta_stalled_or_jumped(const congestion_control_t *cc,
@ -573,22 +637,30 @@ time_delta_stalled_or_jumped(const congestion_control_t *cc,
static ratelim_t stall_info_limit = RATELIM_INIT(60);
log_fn_ratelim(&stall_info_limit, LOG_INFO, LD_CIRC,
"Congestion control cannot measure RTT due to monotime stall.");
return true;
/* If delta is ever 0, the monotime clock has stalled, and we should
* not use it anywhere. */
is_monotime_clock_broken = true;
return is_monotime_clock_broken;
}
/* If the old_delta is 0, we have no previous values. So
* just assume this one is valid (beause it is non-zero) */
if (old_delta == 0)
return false;
/* If the old_delta is 0, we have no previous values on this circuit.
*
* So, return the global monotime status from other circuits, and
* do not update.
*/
if (old_delta == 0) {
return is_monotime_clock_broken;
}
/*
* For the heuristic cases, we need at least a few timestamps,
* to average out any previous partial stalls or jumps. So until
* than point, let's just delcare these time values "good enough
* to use".
* that point, let's just use the cached status from other circuits.
*/
if (!time_delta_should_use_heuristics(cc)) {
return false;
return is_monotime_clock_broken;
}
/* If old_delta is significantly larger than new_delta, then
@ -601,7 +673,9 @@ time_delta_stalled_or_jumped(const congestion_control_t *cc,
"), likely due to clock jump.",
new_delta/1000, old_delta/1000);
return true;
is_monotime_clock_broken = true;
return is_monotime_clock_broken;
}
/* If new_delta is significantly larger than old_delta, then
@ -613,10 +687,24 @@ time_delta_stalled_or_jumped(const congestion_control_t *cc,
"), likely due to clock jump.",
new_delta/1000, old_delta/1000);
return true;
is_monotime_clock_broken = true;
return is_monotime_clock_broken;
}
return false;
/* All good! Update cached status, too */
is_monotime_clock_broken = false;
return is_monotime_clock_broken;
}
/**
* Is the monotime clock stalled according to any circuits?
*/
bool
is_monotime_clock_reliable(void)
{
return !is_monotime_clock_broken;
}
/**
@ -753,7 +841,7 @@ congestion_control_update_circuit_bdp(congestion_control_t *cc,
SMARTLIST_FOREACH(cc->sendme_arrival_timestamps, uint64_t *, t,
tor_free(t));
smartlist_clear(cc->sendme_arrival_timestamps);
} else if (curr_rtt_usec) {
} else if (curr_rtt_usec && is_monotime_clock_reliable()) {
/* Sendme-based BDP will quickly measure BDP in much less than
* a cwnd worth of data when in use (in 2-10 SENDMEs).
*
@ -903,7 +991,12 @@ congestion_control_update_circuit_bdp(congestion_control_t *cc,
/* We updated BDP this round if either we had a blocked channel, or
* the curr_rtt_usec was not 0. */
return (blocked_on_chan || curr_rtt_usec != 0);
bool ret = (blocked_on_chan || curr_rtt_usec != 0);
if (ret) {
tor_trace(TR_SUBSYS(cc), TR_EV(bdp_update), circ, cc, curr_rtt_usec,
sendme_rate_bdp);
}
return ret;
}
/**
@ -914,20 +1007,32 @@ congestion_control_dispatch_cc_alg(congestion_control_t *cc,
const circuit_t *circ,
const crypt_path_t *layer_hint)
{
int ret = -END_CIRC_REASON_INTERNAL;
switch (cc->cc_alg) {
case CC_ALG_WESTWOOD:
return congestion_control_westwood_process_sendme(cc, circ, layer_hint);
ret = congestion_control_westwood_process_sendme(cc, circ, layer_hint);
break;
case CC_ALG_VEGAS:
return congestion_control_vegas_process_sendme(cc, circ, layer_hint);
ret = congestion_control_vegas_process_sendme(cc, circ, layer_hint);
break;
case CC_ALG_NOLA:
return congestion_control_nola_process_sendme(cc, circ, layer_hint);
ret = congestion_control_nola_process_sendme(cc, circ, layer_hint);
break;
case CC_ALG_SENDME:
default:
tor_assert(0);
}
return -END_CIRC_REASON_INTERNAL;
if (cc->cwnd > cwnd_max) {
static ratelim_t cwnd_limit = RATELIM_INIT(60);
log_fn_ratelim(&cwnd_limit, LOG_NOTICE, LD_CIRC,
"Congestion control cwnd %"PRIu64" exceeds max %d, clamping.",
cc->cwnd, cwnd_max);
cc->cwnd = cwnd_max;
}
return ret;
}

View file

@ -39,6 +39,64 @@ int congestion_control_get_package_window(const circuit_t *,
int sendme_get_inc_count(const circuit_t *, const crypt_path_t *);
bool circuit_sent_cell_for_sendme(const circuit_t *, const crypt_path_t *);
bool is_monotime_clock_reliable(void);
void congestion_control_new_consensus_params(const networkstatus_t *ns);
/* Ugh, C.. these four are private. Use the getter instead, when
* external to the congestion control code. */
extern uint32_t or_conn_highwater;
extern uint32_t or_conn_lowwater;
extern int32_t cell_queue_high;
extern int32_t cell_queue_low;
/** Stop writing on an orconn when its outbuf is this large (bytes).
 * Backed by the "orconn_high" consensus parameter. */
static inline uint32_t
or_conn_highwatermark(void)
{
  return or_conn_highwater;
}

/** Resume writing on an orconn when its outbuf is less than this (bytes).
 * Backed by the "orconn_low" consensus parameter. */
static inline uint32_t
or_conn_lowwatermark(void)
{
  return or_conn_lowwater;
}

/** Stop reading on edge connections when we have this many cells
 * waiting on the appropriate queue. Backed by the "cellq_high"
 * consensus parameter. */
static inline int32_t
cell_queue_highwatermark(void)
{
  return cell_queue_high;
}

/** Start reading from edge connections again when we get down to this many
 * cells. Backed by the "cellq_low" consensus parameter. */
static inline int32_t
cell_queue_lowwatermark(void)
{
  return cell_queue_low;
}
/**
* Compute an N-count EWMA, aka N-EWMA. N-EWMA is defined as:
* EWMA = alpha*value + (1-alpha)*EWMA_prev
* with alpha = 2/(N+1).
*
* This works out to:
* EWMA = value*2/(N+1) + EMA_prev*(N-1)/(N+1)
* = (value*2 + EWMA_prev*(N-1))/(N+1)
*/
static inline uint64_t
n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N)
{
if (prev == 0)
return curr;
else
return (2*curr + (N-1)*prev)/(N+1);
}
/* Private section starts. */
#ifdef TOR_CONGESTION_CONTROL_PRIVATE

View file

@ -0,0 +1,710 @@
/* Copyright (c) 2019-2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file congestion_control_flow.c
* \brief Code that implements flow control for congestion controlled
* circuits.
*/
#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE
#include "core/or/or.h"
#include "core/or/relay.h"
#include "core/mainloop/connection.h"
#include "core/or/connection_edge.h"
#include "core/mainloop/mainloop.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
#include "core/or/congestion_control_st.h"
#include "core/or/circuitlist.h"
#include "core/or/trace_probes_cc.h"
#include "feature/nodelist/networkstatus.h"
#include "trunnel/flow_control_cells.h"
#include "core/or/connection_st.h"
#include "core/or/cell_st.h"
#include "app/config/config.h"
/** Cache consensus parameters */
static uint32_t xoff_client;
static uint32_t xoff_exit;
static uint32_t xon_change_pct;
static uint32_t xon_ewma_cnt;
static uint32_t xon_rate_bytes;
/* In normal operation, we can get a burst of up to 32 cells before returning
* to libevent to flush the outbuf. This is a heuristic from hardcoded values
* and strange logic in connection_bucket_get_share(). */
#define MAX_EXPECTED_CELL_BURST 32
/* The following three are for dropmark rate limiting. They define when we
* scale down our XON, XOFF, and xmit byte counts. Early scaling is beneficial
* because it limits the ability of spurious XON/XOFF to be sent after large
* amounts of data without XON/XOFF. At these limits, after 10MB of data (or
* more), an adversary can only inject (log2(10MB)-log2(200*500))*100 ~= 1000
* cells of fake XOFF/XON before the xmit byte count will be halved enough to
trigger a limit. */
#define XON_COUNT_SCALE_AT 200
#define XOFF_COUNT_SCALE_AT 200
#define ONE_MEGABYTE (UINT64_C(1) << 20)
#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
/**
 * Fetch the congestion control object governing this edge connection.
 *
 * The object is looked up first on the edge's cpath layer, then on its
 * attached circuit. Returns NULL when neither is set, or when the one
 * that is set carries no congestion control object.
 */
static inline const congestion_control_t *
edge_get_ccontrol(const edge_connection_t *edge)
{
  if (edge->cpath_layer) {
    return edge->cpath_layer->ccontrol;
  }
  if (edge->on_circuit) {
    return edge->on_circuit->ccontrol;
  }
  return NULL;
}
/**
 * Refresh the cached flow control consensus parameter values. Called on
 * every consensus update.
 *
 * More details for each of the parameters can be found in proposal 324,
 * section 6.5 including tuning notes.
 */
void
flow_control_new_consensus_params(const networkstatus_t *ns)
{
  /* Dimensionless parameters first. */
#define CC_XON_EWMA_CNT_DFLT (2)
#define CC_XON_EWMA_CNT_MIN (1)
#define CC_XON_EWMA_CNT_MAX (100)
  xon_ewma_cnt = networkstatus_get_param(ns, "cc_xon_ewma_cnt",
                                         CC_XON_EWMA_CNT_DFLT,
                                         CC_XON_EWMA_CNT_MIN,
                                         CC_XON_EWMA_CNT_MAX);

#define CC_XON_CHANGE_PCT_DFLT 25
#define CC_XON_CHANGE_PCT_MIN 1
#define CC_XON_CHANGE_PCT_MAX 99
  xon_change_pct = networkstatus_get_param(ns, "cc_xon_change_pct",
                                           CC_XON_CHANGE_PCT_DFLT,
                                           CC_XON_CHANGE_PCT_MIN,
                                           CC_XON_CHANGE_PCT_MAX);

  /* The remaining parameters are expressed in relay cells in the
   * consensus, but cached here converted to bytes. */
#define CC_XON_RATE_BYTES_DFLT (500)
#define CC_XON_RATE_BYTES_MIN (1)
#define CC_XON_RATE_BYTES_MAX (5000)
  xon_rate_bytes = networkstatus_get_param(ns, "cc_xon_rate",
                                           CC_XON_RATE_BYTES_DFLT,
                                           CC_XON_RATE_BYTES_MIN,
                                           CC_XON_RATE_BYTES_MAX)
                   * RELAY_PAYLOAD_SIZE;

#define CC_XOFF_CLIENT_DFLT 500
#define CC_XOFF_CLIENT_MIN 1
#define CC_XOFF_CLIENT_MAX 10000
  xoff_client = networkstatus_get_param(ns, "cc_xoff_client",
                                        CC_XOFF_CLIENT_DFLT,
                                        CC_XOFF_CLIENT_MIN,
                                        CC_XOFF_CLIENT_MAX)
                * RELAY_PAYLOAD_SIZE;

#define CC_XOFF_EXIT_DFLT 500
#define CC_XOFF_EXIT_MIN 1
#define CC_XOFF_EXIT_MAX 10000
  xoff_exit = networkstatus_get_param(ns, "cc_xoff_exit",
                                      CC_XOFF_EXIT_DFLT,
                                      CC_XOFF_EXIT_MIN,
                                      CC_XOFF_EXIT_MAX)
              * RELAY_PAYLOAD_SIZE;
}
/**
 * Send an XOFF cell on this stream's circuit, and remember that we sent
 * one (stream->xoff_sent) so we know to follow up with an XON later.
 */
static void
circuit_send_stream_xoff(edge_connection_t *stream)
{
  xoff_cell_t xoff;
  uint8_t payload[CELL_PAYLOAD_SIZE];
  ssize_t xoff_size;

  memset(&xoff, 0, sizeof(xoff));
  memset(payload, 0, sizeof(payload));

  xoff_cell_set_version(&xoff, 0);

  if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) {
    /* BUGFIX: this path encodes an xoff cell; the old message said "xon". */
    log_warn(LD_BUG, "Failed to encode xoff cell");
    return;
  }

  if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF,
                                   (char*)payload, (size_t)xoff_size) == 0) {
    /* Only record the XOFF as sent if the cell actually went out. */
    stream->xoff_sent = true;
  }
}
/**
 * Compute the recent drain rate (write rate) for this edge
 * connection and return it, in KB/sec (1000 bytes/sec).
 *
 * Returns 0 if the monotime clock is busted.
 */
static inline uint32_t
compute_drain_rate(const edge_connection_t *stream)
{
  /* The caller is responsible for only measuring while the monotime clock
   * is reliable; a stalled or jumped clock would make the delta garbage. */
  if (BUG(!is_monotime_clock_reliable())) {
    log_warn(LD_BUG, "Computing drain rate with stalled monotime clock");
    return 0;
  }

  /* Microseconds elapsed since draining started (drain_start_usec). */
  uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec;

  if (delta == 0) {
    log_warn(LD_BUG, "Computing stream drain rate with zero time delta");
    return 0;
  }

  /* Overflow checks */
  if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */
      stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */
    return INT32_MAX;
  }

  /* kb/sec = bytes/usec * 1000 usec/msec * 1000 msec/sec * kb/1000bytes.
   * Never report 0 from this point on: 0 is reserved for the error cases
   * above, so the result is clamped to at least 1. */
  return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
}
/**
 * Send an XON cell for this stream, carrying our current advisory
 * drain-rate estimate.
 *
 * Records the rate we advertised (ewma_rate_last_sent) so later updates
 * can tell when it changes; on successful send, clears the xoff-sent
 * flag so another XOFF may be sent if needed.
 */
static void
circuit_send_stream_xon(edge_connection_t *stream)
{
  xon_cell_t xon;
  uint8_t cell_payload[CELL_PAYLOAD_SIZE];
  ssize_t encoded_len;

  memset(&xon, 0, sizeof(xon));
  memset(cell_payload, 0, sizeof(cell_payload));

  xon_cell_set_version(&xon, 0);
  xon_cell_set_kbps_ewma(&xon, stream->ewma_drain_rate);

  encoded_len = xon_cell_encode(cell_payload, CELL_PAYLOAD_SIZE, &xon);
  if (encoded_len < 0) {
    log_warn(LD_BUG, "Failed to encode xon cell");
    return;
  }

  /* Store the advisory rate information, to send advisory updates if
   * it changes */
  stream->ewma_rate_last_sent = stream->ewma_drain_rate;

  if (connection_edge_send_command(stream, RELAY_COMMAND_XON,
                                   (char*)cell_payload,
                                   (size_t)encoded_len) == 0) {
    /* Revert the xoff sent status, so we can send another one if need be */
    stream->xoff_sent = false;
  }
}
/**
 * Process a stream XOFF, parsing it, and then stopping reading on
 * the edge connection.
 *
 * Record that we have received an xoff, so we know not to resume
 * reading on this edge conn until we get an XON.
 *
 * Returns false if the XOFF did not validate; true if it does.
 */
bool
circuit_process_stream_xoff(edge_connection_t *conn,
                            const crypt_path_t *layer_hint,
                            const cell_t *cell)
{
  /* The XOFF cell body carries no fields we need beyond the command. */
  (void)cell;
  bool retval = true;

  if (BUG(!conn)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF on invalid stream?");
    return false;
  }

  /* Make sure this XOFF came from the right hop */
  if (layer_hint && layer_hint != conn->cpath_layer) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF from wrong hop.");
    return false;
  }

  /* XOFF is only meaningful on congestion controlled circuits. */
  if (edge_get_ccontrol(conn) == NULL) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF for non-congestion control circuit");
    return false;
  }

  /* A second XOFF before any XON is a protocol violation. */
  if (conn->xoff_received) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got multiple XOFF on connection");
    return false;
  }

  /* If we are near the max, scale everything down. Halving the byte and
   * cell counters together preserves their ratio, which is what the
   * rate-limit check below actually uses. */
  if (conn->num_xoff_recv == XOFF_COUNT_SCALE_AT) {
    log_info(LD_EDGE, "Scaling down for XOFF count: %d %d %d",
             conn->total_bytes_xmit,
             conn->num_xoff_recv,
             conn->num_xon_recv);
    conn->total_bytes_xmit /= 2;
    conn->num_xoff_recv /= 2;
    conn->num_xon_recv /= 2;
  }

  conn->num_xoff_recv++;

  /* Client-side check to make sure that XOFF is not sent too early,
   * for dropmark attacks. The main sidechannel risk is early cells,
   * but we also check to make sure that we have not received more XOFFs
   * than could have been generated by the bytes we sent.
   */
  if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
    uint32_t limit = 0;
    /* TODO: This limit technically needs to come from negotiation,
     * and be bounds checked for sanity, because the other endpoint
     * may have a different consensus */
    if (conn->hs_ident)
      limit = xoff_client;
    else
      limit = xoff_exit;

    if (conn->total_bytes_xmit < limit*conn->num_xoff_recv) {
      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
             "Got extra XOFF for bytes sent. Got %d, expected max %d",
             conn->num_xoff_recv, conn->total_bytes_xmit/limit);
      /* We still process this, because the only dropmark defenses
       * in C tor are via the vanguards addon's use of the read valid
       * cells. So just signal that we think this is not valid protocol
       * data and proceed. */
      retval = false;
    }
  }

  // TODO: Count how many xoffs we have; log if "too many", for shadow
  // analysis of chatter. Possibly add to extra-info?

  log_info(LD_EDGE, "Got XOFF!");
  /* Honor the XOFF even when flagged invalid above: stop reading from the
   * local socket and remember the blocked state until an XON arrives. */
  connection_stop_reading(TO_CONN(conn));
  conn->xoff_received = true;
  return retval;
}
/**
* Process a stream XON, and if it validates, clear the xoff
* flag and resume reading on this edge connection.
*
* Also, use provided rate information to rate limit
* reading on this edge (or packaging from it onto
* the circuit), to avoid XON/XOFF chatter.
*
* Returns true if the XON validates, false otherwise.
*/
bool
circuit_process_stream_xon(edge_connection_t *conn,
const crypt_path_t *layer_hint,
const cell_t *cell)
{
xon_cell_t *xon;
bool retval = true;
if (BUG(!conn)) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got XON on invalid stream?");
return false;
}
/* Make sure this XON came from the right hop */
if (layer_hint && layer_hint != conn->cpath_layer) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got XON from wrong hop.");
return false;
}
if (edge_get_ccontrol(conn) == NULL) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got XON for non-congestion control circuit");
return false;
}
if (xon_cell_parse(&xon, cell->payload+RELAY_HEADER_SIZE,
CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE) < 0) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Received malformed XON cell.");
return false;
}
/* If we are near the max, scale everything down */
if (conn->num_xon_recv == XON_COUNT_SCALE_AT) {
log_info(LD_EDGE, "Scaling down for XON count: %d %d %d",
conn->total_bytes_xmit,
conn->num_xoff_recv,
conn->num_xon_recv);
conn->total_bytes_xmit /= 2;
conn->num_xoff_recv /= 2;
conn->num_xon_recv /= 2;
}
conn->num_xon_recv++;
/* Client-side check to make sure that XON is not sent too early,
* for dropmark attacks. The main sidechannel risk is early cells,
* but we also check to see that we did not get more XONs than make
* sense for the number of bytes we sent.
*/
if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
uint32_t limit = 0;
/* TODO: This limit technically needs to come from negotiation,
* and be bounds checked for sanity, because the other endpoint
* may have a different consensus */
if (conn->hs_ident)
limit = MIN(xoff_client, xon_rate_bytes);
else
limit = MIN(xoff_exit, xon_rate_bytes);
if (conn->total_bytes_xmit < limit*conn->num_xon_recv) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got extra XON for bytes sent. Got %d, expected max %d",
conn->num_xon_recv, conn->total_bytes_xmit/limit);
/* We still process this, because the only dropmark defenses
* in C tor are via the vanguards addon's use of the read valid
* cells. So just signal that we think this is not valid protocol
* data and proceed. */
retval = false;
}
}
log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma);
/* Adjust the token bucket of this edge connection with the drain rate in
* the XON. Rate is in bytes from kilobit (kpbs). */
uint64_t rate = xon_cell_get_kbps_ewma(xon) * 1000;
if (rate == 0 || INT32_MAX < rate) {
/* No rate. */
rate = INT32_MAX;
}
token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate);
if (conn->xoff_received) {
/* Clear the fact that we got an XOFF, so that this edge can
* start and stop reading normally */
conn->xoff_received = false;
connection_start_reading(TO_CONN(conn));
}
xon_cell_free(xon);
return retval;
}
/**
 * Called from sendme_stream_data_received(), when data arrives
 * from a circuit to our edge's outbuf, to decide if we need to send
 * an XOFF.
 *
 * If the outbuf has exceeded the buffering limit for this edge type,
 * send an XOFF (at most once, until the buffer drains and an XON is
 * sent). If the outbuf has accumulated beyond the expected burst size,
 * also re-run the XON decision so advisory rates stay current.
 *
 * Returns 0 in all normal cases: flow control always accepts more data
 * and relies on the OOM killer to handle misbehaving peers. Returns -1
 * only if this is erroneously called for a stream without congestion
 * control, which indicates a bug.
 */
int
flow_control_decide_xoff(edge_connection_t *stream)
{
  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
  uint32_t buffer_limit_xoff = 0;

  if (BUG(edge_get_ccontrol(stream) == NULL)) {
    log_err(LD_BUG, "Flow control called for non-congestion control circuit");
    return -1;
  }

  /* Onion services and clients are typically localhost edges, so they
   * need different buffering limits than exits do */
  if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) {
    buffer_limit_xoff = xoff_client;
  } else {
    buffer_limit_xoff = xoff_exit;
  }

  if (total_buffered > buffer_limit_xoff) {
    if (!stream->xoff_sent) {
      /* Cast + "%lu": total_buffered is a size_t, so "%ld" is a format
       * mismatch on platforms where long != size_t (e.g. 64-bit Windows). */
      log_info(LD_EDGE, "Sending XOFF: %lu %u",
               (unsigned long)total_buffered, (unsigned)buffer_limit_xoff);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);

      circuit_send_stream_xoff(stream);

      /* Clear the drain rate. It is considered wrong if we
       * got all the way to XOFF */
      stream->ewma_drain_rate = 0;
    }
  }

  /* If the outbuf has accumulated more than the expected burst limit of
   * cells, then assume it is not draining, and call decide_xon. We must
   * do this because writes only happen when the socket unblocks, so
   * may not otherwise notice accumulation of data in the outbuf for
   * advisory XONs. */
  if (total_buffered > MAX_EXPECTED_CELL_BURST*RELAY_PAYLOAD_SIZE) {
    flow_control_decide_xon(stream, 0);
  }

  /* Flow control always takes more data; we rely on the oomkiller to
   * handle misbehavior. */
  return 0;
}
/**
 * Returns true if the stream's drain rate has moved outside the
 * +/- xon_change_pct band around the last advertised rate.
 *
 * Returns false if the monotime clock is stalled, or if we have
 * never sent a rate for this stream (nothing to compare against).
 */
static bool
stream_drain_rate_changed(const edge_connection_t *stream)
{
  /* Rate comparisons are meaningless without a trustworthy clock. */
  if (!is_monotime_clock_reliable())
    return false;

  const uint64_t last_sent = stream->ewma_rate_last_sent;

  /* No previously-sent rate: nothing to diff against. */
  if (last_sent == 0)
    return false;

  /* Compute the tolerance band around the last rate we sent. */
  const uint64_t upper = (100 + (uint64_t)xon_change_pct) * last_sent / 100;
  const uint64_t lower = (100 - (uint64_t)xon_change_pct) * last_sent / 100;

  return stream->ewma_drain_rate > upper || stream->ewma_drain_rate < lower;
}
/**
 * Called whenever we drain an edge connection outbuf by writing on
 * its socket, to decide if it is time to send an xon.
 *
 * The n_written parameter tells us how many bytes we have written
 * this time, which is used to compute the advisory drain rate fields.
 *
 * Side effects: may send an XON cell, and updates the stream's
 * drained_bytes/drain_start_usec/ewma_drain_rate accounting fields.
 */
void
flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
{
  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));

  /* Bounds check the number of drained bytes, and scale */
  if (stream->drained_bytes >= UINT32_MAX - n_written) {
    /* Cut the bytes in half, and move the start time up halfway to now
     * (if we have one). Halving both keeps the implied rate unchanged. */
    stream->drained_bytes /= 2;

    if (stream->drain_start_usec) {
      uint64_t now = monotime_absolute_usec();

      stream->drain_start_usec = now - (now-stream->drain_start_usec)/2;
    }
  }

  /* Accumulate drained bytes since last rate computation */
  stream->drained_bytes += n_written;

  tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written);

  /* Check for bad monotime clock and bytecount wrap */
  if (!is_monotime_clock_reliable()) {
    /* If the monotime clock ever goes wrong, the safest thing to do
     * is just clear our short-term rate info and wait for the clock to
     * become reliable again.. */
    stream->drain_start_usec = 0;
    stream->drained_bytes = 0;
  } else {
    /* If we have no drain start timestamp, and we still have
     * remaining buffer, start the buffering counter.
     * NOTE(review): "%ld" with a size_t arg is a format mismatch on
     * LLP64 platforms — consider "%zu"/TOR_PRIuSZ in a follow-up. */
    if (!stream->drain_start_usec && total_buffered > 0) {
      log_debug(LD_EDGE, "Began edge buffering: %d %d %ld",
                stream->ewma_rate_last_sent,
                stream->ewma_drain_rate,
                total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start),
                stream);
      stream->drain_start_usec = monotime_absolute_usec();
      stream->drained_bytes = 0;
    }
  }

  if (stream->drain_start_usec) {
    /* If we have spent enough time in a queued state, update our drain
     * rate. */
    if (stream->drained_bytes > xon_rate_bytes) {
      /* No previous drained bytes means it is the first time we are computing
       * it so use the value we just drained onto the socket as a baseline. It
       * won't be accurate but it will be a start towards the right value.
       *
       * We have to do this in order to have a drain rate else we could be
       * sending a drain rate of 0 in an XON which would be undesirable and
       * basically like sending an XOFF. */
      if (stream->prev_drained_bytes == 0) {
        stream->prev_drained_bytes = stream->drained_bytes;
      }
      uint32_t drain_rate = compute_drain_rate(stream);
      /* Once the drain rate has been computed, note how many bytes we just
       * drained so it can be used at the next calculation. We do this here
       * because it gets reset once the rate is changed. */
      stream->prev_drained_bytes = stream->drained_bytes;

      if (drain_rate) {
        /* Smooth the new sample into the running N_EWMA estimate. */
        stream->ewma_drain_rate =
          (uint32_t)n_count_ewma(drain_rate,
                                 stream->ewma_drain_rate,
                                 xon_ewma_cnt);
        log_debug(LD_EDGE, "Updating drain rate: %d %d %ld",
                  drain_rate,
                  stream->ewma_drain_rate,
                  total_buffered);
        tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
                  stream, drain_rate);
        /* Reset recent byte counts. This prevents us from sending advisory
         * XONs more frequent than every xon_rate_bytes. */
        stream->drained_bytes = 0;
        stream->drain_start_usec = 0;
      }
    }
  }

  /* If we don't have an XOFF outstanding, consider updating an
   * old rate */
  if (!stream->xoff_sent) {
    if (stream_drain_rate_changed(stream)) {
      /* If we are still buffering and the rate changed, update
       * advisory XON */
      log_info(LD_EDGE, "Sending rate-change XON: %d %d %ld",
               stream->ewma_rate_last_sent,
               stream->ewma_drain_rate,
               total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);
      circuit_send_stream_xon(stream);
    }
  } else if (total_buffered == 0) {
    /* We previously sent an XOFF and the buffer has now fully drained:
     * tell the other end it may resume sending. */
    log_info(LD_EDGE, "Sending XON: %d %d %ld",
             stream->ewma_rate_last_sent,
             stream->ewma_drain_rate,
             total_buffered);
    tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
    circuit_send_stream_xon(stream);
  }

  /* If the buffer has fully emptied, clear the drain timestamp,
   * so we can total only bytes drained while outbuf is 0. */
  if (total_buffered == 0) {
    stream->drain_start_usec = 0;

    /* After we've spent 'xon_rate_bytes' with the queue fully drained,
     * double any rate we sent. */
    if (stream->drained_bytes >= xon_rate_bytes &&
        stream->ewma_rate_last_sent) {
      stream->ewma_drain_rate = MIN(INT32_MAX, 2*stream->ewma_drain_rate);
      log_debug(LD_EDGE,
                "Queue empty for xon_rate_limit bytes: %d %d",
                stream->ewma_rate_last_sent,
                stream->ewma_drain_rate);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream);
      /* Resetting the drained bytes count. We need to keep its value as a
       * previous so the drain rate calculation takes into account what was
       * actually drain the last time. */
      stream->prev_drained_bytes = stream->drained_bytes;
      stream->drained_bytes = 0;
    }
  }

  return;
}
/**
 * Note that we packaged some data on this stream. Used to enforce
 * client-side dropmark limits.
 *
 * When the byte counter nears its cap, all three dropmark counters
 * (bytes sent, XOFFs received, XONs received) are halved together,
 * preserving the ratios that the dropmark checks compare while
 * avoiding overflow.
 */
void
flow_control_note_sent_data(edge_connection_t *stream, size_t len)
{
  /* If we are near the max, scale everything down */
  if (stream->total_bytes_xmit >= TOTAL_XMIT_SCALE_AT-len) {
    /* Fixed: message previously had a stray double colon; use "%u"
     * and explicit casts since these counters are unsigned. */
    log_info(LD_EDGE, "Scaling down for flow control xmit bytes: %u %u %u",
             (unsigned)stream->total_bytes_xmit,
             (unsigned)stream->num_xoff_recv,
             (unsigned)stream->num_xon_recv);
    stream->total_bytes_xmit /= 2;
    stream->num_xoff_recv /= 2;
    stream->num_xon_recv /= 2;
  }

  stream->total_bytes_xmit += len;
}
/** Returns true if an edge connection uses flow control.
 *
 * All circuits with congestion control use flow control, so this holds
 * whenever either the attached circuit or the cpath layer carries a
 * congestion control instance. */
bool
edge_uses_flow_control(const edge_connection_t *stream)
{
  if (stream->on_circuit && stream->on_circuit->ccontrol)
    return true;

  if (stream->cpath_layer && stream->cpath_layer->ccontrol)
    return true;

  return false;
}
/**
 * Returns the max RTT for the circuit that carries this stream,
 * as observed by congestion control.
 *
 * Checks circuit-level state first, then the cpath layer; returns 0
 * when neither has a congestion control instance.
 */
uint64_t
edge_get_max_rtt(const edge_connection_t *stream)
{
  if (stream->on_circuit && stream->on_circuit->ccontrol) {
    return stream->on_circuit->ccontrol->max_rtt_usec;
  }

  if (stream->cpath_layer && stream->cpath_layer->ccontrol) {
    return stream->cpath_layer->ccontrol->max_rtt_usec;
  }

  return 0;
}
/** Returns true if a connection is an edge conn that uses flow control. */
bool
conn_uses_flow_control(connection_t *conn)
{
  /* Non-edge connections never participate in stream flow control. */
  if (!CONN_IS_EDGE(conn))
    return false;

  return edge_uses_flow_control(TO_EDGE_CONN(conn));
}

View file

@ -0,0 +1,48 @@
/* Copyright (c) 2019-2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file congestion_control_flow.h
 * \brief APIs for stream flow control on congestion controlled circuits.
 **/

#ifndef TOR_CONGESTION_CONTROL_FLOW_H
#define TOR_CONGESTION_CONTROL_FLOW_H

#include "core/or/crypt_path_st.h"
#include "core/or/circuit_st.h"
#include "core/or/edge_connection_st.h"

/* Refresh cached flow-control parameters from a new consensus. */
void flow_control_new_consensus_params(const struct networkstatus_t *);

/* XOFF/XON relay cell processing; each returns true iff the cell
 * validates (wrong hop, non-cc circuit, or malformed cells fail). */
bool circuit_process_stream_xoff(edge_connection_t *conn,
                                 const crypt_path_t *layer_hint,
                                 const cell_t *cell);

bool circuit_process_stream_xon(edge_connection_t *conn,
                                const crypt_path_t *layer_hint,
                                const cell_t *cell);

/* Outbuf accounting hooks called from the edge/sendme code paths. */
int flow_control_decide_xoff(edge_connection_t *stream);
void flow_control_decide_xon(edge_connection_t *stream, size_t n_written);

void flow_control_note_sent_data(edge_connection_t *stream, size_t len);

/* Predicates/accessors used by sendme and half-edge bookkeeping. */
bool edge_uses_flow_control(const edge_connection_t *stream);

bool conn_uses_flow_control(connection_t *stream);

uint64_t edge_get_max_rtt(const edge_connection_t *);

/* Private section starts. */
#ifdef TOR_CONGESTION_CONTROL_FLOW_PRIVATE

/*
 * Unit tests declarations.
 */
#ifdef TOR_UNIT_TESTS

#endif /* defined(TOR_UNIT_TESTS) */

#endif /* defined(TOR_CONGESTION_CONTROL_FLOW_PRIVATE) */

#endif /* !defined(TOR_CONGESTION_CONTROL_FLOW_H) */

View file

@ -69,6 +69,8 @@
#include "core/or/circuituse.h"
#include "core/or/circuitpadding.h"
#include "core/or/connection_edge.h"
#include "core/or/congestion_control_flow.h"
#include "core/or/circuitstats.h"
#include "core/or/connection_or.h"
#include "core/or/extendinfo.h"
#include "core/or/policies.h"
@ -614,20 +616,39 @@ connection_half_edge_add(const edge_connection_t *conn,
half_conn->stream_id = conn->stream_id;
// How many sendme's should I expect?
half_conn->sendmes_pending =
(STREAMWINDOW_START-conn->package_window)/STREAMWINDOW_INCREMENT;
// Is there a connected cell pending?
half_conn->connected_pending = conn->base_.state ==
AP_CONN_STATE_CONNECT_WAIT;
/* Data should only arrive if we're not waiting on a resolved cell.
* It can arrive after waiting on connected, because of optimistic
* data. */
if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) {
// How many more data cells can arrive on this id?
half_conn->data_pending = conn->deliver_window;
if (edge_uses_flow_control(conn)) {
/* If the edge uses the new congestion control flow control, we must use
* time-based limits on half-edge activity. */
uint64_t timeout_usec = (uint64_t)(get_circuit_build_timeout_ms()*1000);
half_conn->used_ccontrol = 1;
/* If this is an onion service circuit, double the CBT as an approximate
* value for the other half of the circuit */
if (conn->hs_ident) {
timeout_usec *= 2;
}
/* The stream should stop seeing any use after the larger of the circuit
* RTT and the overall circuit build timeout */
half_conn->end_ack_expected_usec = MAX(timeout_usec,
edge_get_max_rtt(conn)) +
monotime_absolute_usec();
} else {
// How many sendme's should I expect?
half_conn->sendmes_pending =
(STREAMWINDOW_START-conn->package_window)/STREAMWINDOW_INCREMENT;
/* Data should only arrive if we're not waiting on a resolved cell.
* It can arrive after waiting on connected, because of optimistic
* data. */
if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) {
// How many more data cells can arrive on this id?
half_conn->data_pending = conn->deliver_window;
}
}
insert_at = smartlist_bsearch_idx(circ->half_streams, &half_conn->stream_id,
@ -688,6 +709,12 @@ connection_half_edge_is_valid_data(const smartlist_t *half_conns,
if (!half)
return 0;
if (half->used_ccontrol) {
if (monotime_absolute_usec() > half->end_ack_expected_usec)
return 0;
return 1;
}
if (half->data_pending > 0) {
half->data_pending--;
return 1;
@ -740,6 +767,10 @@ connection_half_edge_is_valid_sendme(const smartlist_t *half_conns,
if (!half)
return 0;
/* congestion control edges don't use sendmes */
if (half->used_ccontrol)
return 0;
if (half->sendmes_pending > 0) {
half->sendmes_pending--;
return 1;

View file

@ -65,6 +65,7 @@
#include "core/or/scheduler.h"
#include "feature/nodelist/torcert.h"
#include "core/or/channelpadding.h"
#include "core/or/congestion_control_common.h"
#include "feature/dirauth/authmode.h"
#include "feature/hs/hs_service.h"
@ -636,7 +637,7 @@ connection_or_flushed_some(or_connection_t *conn)
/* If we're under the low water mark, add cells until we're just over the
* high water mark. */
datalen = connection_get_outbuf_len(TO_CONN(conn));
if (datalen < OR_CONN_LOWWATER) {
if (datalen < or_conn_lowwatermark()) {
/* Let the scheduler know */
scheduler_channel_wants_writes(TLS_CHAN_TO_BASE(conn->chan));
}
@ -660,9 +661,9 @@ connection_or_num_cells_writeable(or_connection_t *conn)
* used to trigger when to start writing after we've stopped.
*/
datalen = connection_get_outbuf_len(TO_CONN(conn));
if (datalen < OR_CONN_HIGHWATER) {
if (datalen < or_conn_highwatermark()) {
cell_network_size = get_cell_network_size(conn->wide_circ_ids);
n = CEIL_DIV(OR_CONN_HIGHWATER - datalen, cell_network_size);
n = CEIL_DIV(or_conn_highwatermark() - datalen, cell_network_size);
}
return n;

View file

@ -15,6 +15,7 @@
#include "core/or/or.h"
#include "core/or/connection_st.h"
#include "lib/evloop/token_bucket.h"
/** Subtype of connection_t for an "edge connection" -- that is, an entry (ap)
* connection, or an exit. */
@ -73,6 +74,60 @@ struct edge_connection_t {
* that's going away and being used on channels instead. We still tag
* edge connections with dirreq_id from circuits, so it's copied here. */
uint64_t dirreq_id;
/* The following are flow control fields */
/** Used for rate limiting the read side of this edge connection when
* congestion control is enabled on its circuit. The XON cell ewma_drain_rate
* parameter is used to set the bucket limits. */
token_bucket_rw_t bucket;
/**
* Monotime timestamp of the last time we sent a flow control message
* for this edge, used to compute advisory rates */
uint64_t drain_start_usec;
/**
* Number of bytes written since we either emptied our buffers,
* or sent an advisory drain rate. Can wrap, but if so,
* we must reset the usec timestamp above. (Or make this u64, idk).
*/
uint32_t drained_bytes;
uint32_t prev_drained_bytes;
/**
* N_EWMA of the drain rate of writes on this edge conn
* while buffers were present.
*/
uint32_t ewma_drain_rate;
/**
* The ewma drain rate the last time we sent an xon.
*/
uint32_t ewma_rate_last_sent;
/**
* The following fields are used to count the total bytes sent on this
* stream, and compare them to the number of XON and XOFFs received, so
* that clients can check rate limits of XOFF/XON to prevent dropmark
* attacks. */
uint32_t total_bytes_xmit;
/** Number of XOFFs received */
uint8_t num_xoff_recv;
/** Number of XONs received */
uint8_t num_xon_recv;
/**
* Flag that tells us if an XOFF has been sent; cleared when we send an XON.
* Used to avoid sending multiple */
uint8_t xoff_sent : 1;
/** Flag that tells us if an XOFF has been received; cleared when we get
* an XON. Used to ensure that this edge keeps reads on its edge socket
* disabled. */
uint8_t xoff_received : 1;
};
#endif /* !defined(EDGE_CONNECTION_ST_H) */

View file

@ -31,6 +31,18 @@ typedef struct half_edge_t {
* our deliver window */
int data_pending;
/**
* Monotime timestamp of when the other end should have successfully
* shut down the stream and stop sending data, based on the larger
* of circuit RTT and CBT. Used if 'used_ccontrol' is true, to expire
* the half_edge at this monotime timestamp. */
uint64_t end_ack_expected_usec;
/**
* Did this edge use congestion control? If so, use
* timer instead of pending data approach */
int used_ccontrol : 1;
/** Is there a connected cell pending? */
int connected_pending : 1;
} half_edge_t;

View file

@ -39,6 +39,7 @@ LIBTOR_APP_A_SOURCES += \
src/core/or/congestion_control_vegas.c \
src/core/or/congestion_control_nola.c \
src/core/or/congestion_control_westwood.c \
src/core/or/congestion_control_flow.c \
src/core/or/status.c \
src/core/or/versions.c
@ -82,6 +83,7 @@ noinst_HEADERS += \
src/core/or/entry_port_cfg_st.h \
src/core/or/extend_info_st.h \
src/core/or/listener_connection_st.h \
src/core/or/lttng_cc.inc \
src/core/or/lttng_circuit.inc \
src/core/or/onion.h \
src/core/or/or.h \
@ -102,6 +104,7 @@ noinst_HEADERS += \
src/core/or/relay_crypto_st.h \
src/core/or/scheduler.h \
src/core/or/sendme.h \
src/core/or/congestion_control_flow.h \
src/core/or/congestion_control_common.h \
src/core/or/congestion_control_vegas.h \
src/core/or/congestion_control_nola.h \
@ -115,7 +118,9 @@ noinst_HEADERS += \
if USE_TRACING_INSTRUMENTATION_LTTNG
LIBTOR_APP_A_SOURCES += \
src/core/or/trace_probes_cc.c \
src/core/or/trace_probes_circuit.c
noinst_HEADERS += \
src/core/or/trace_probes_cc.h \
src/core/or/trace_probes_circuit.h
endif

166
src/core/or/lttng_cc.inc Normal file
View file

@ -0,0 +1,166 @@
/* Copyright (c) 2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file lttng_cc.inc
* \brief LTTng tracing probe declaration for the congestion control subsystem.
* It is in this .inc file due to the non C standard syntax and the way
* we guard the header with the LTTng specific
* TRACEPOINT_HEADER_MULTI_READ.
**/
#include "orconfig.h"
/* We only build the following if LTTng instrumentation has been enabled. */
#ifdef USE_TRACING_INSTRUMENTATION_LTTNG
/* The following defines are LTTng-UST specific. */
#undef TRACEPOINT_PROVIDER
#define TRACEPOINT_PROVIDER tor_cc
#undef TRACEPOINT_INCLUDE
#define TRACEPOINT_INCLUDE "./src/core/or/lttng_cc.inc"
#if !defined(LTTNG_CC_INC) || defined(TRACEPOINT_HEADER_MULTI_READ)
#define LTTNG_CC_INC
#include <lttng/tracepoint.h>
/*
* Flow Control
*/
/* Emitted everytime the flow_control_decide_xon() function is called. */
TRACEPOINT_EVENT(tor_cc, flow_decide_xon,
TP_ARGS(const edge_connection_t *, stream, size_t, n_written),
TP_FIELDS(
ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
ctf_integer(size_t, written_bytes, n_written)
ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
ctf_integer(size_t, outbuf_len,
connection_get_outbuf_len(TO_CONN(stream)))
)
)
/* Emitted when flow control starts measuring the drain rate. */
TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_start,
TP_ARGS(const edge_connection_t *, stream),
TP_FIELDS(
ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
ctf_integer(size_t, outbuf_len,
connection_get_outbuf_len(TO_CONN(stream)))
)
)
/* Emitted when the drain rate is updated. The new_drain_rate value is what was
* just computed. */
TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_update,
TP_ARGS(const edge_connection_t *, stream, uint32_t, drain_rate),
TP_FIELDS(
ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
ctf_integer(uint32_t, new_drain_rate, drain_rate)
ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
ctf_integer(size_t, outbuf_len,
connection_get_outbuf_len(TO_CONN(stream)))
)
)
/* Emitted when an XON cell is sent due to a notice in a drain rate change. */
TRACEPOINT_EVENT(tor_cc, flow_decide_xon_rate_change,
TP_ARGS(const edge_connection_t *, stream),
TP_FIELDS(
ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
ctf_integer(size_t, outbuf_len,
connection_get_outbuf_len(TO_CONN(stream)))
)
)
/* Emitted when an XON cell is sent because we partially or fully drained the
* edge connection buffer. */
TRACEPOINT_EVENT(tor_cc, flow_decide_xon_partial_drain,
TP_ARGS(const edge_connection_t *, stream),
TP_FIELDS(
ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
ctf_integer(size_t, outbuf_len,
connection_get_outbuf_len(TO_CONN(stream)))
)
)
/* Emitted when we double the drain rate which is an attempt to see if we can
* speed things up. */
TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_doubled,
TP_ARGS(const edge_connection_t *, stream),
TP_FIELDS(
ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
ctf_integer(size_t, outbuf_len,
connection_get_outbuf_len(TO_CONN(stream)))
)
)
/* XOFF */
/* Emitted when we send an XOFF cell. */
TRACEPOINT_EVENT(tor_cc, flow_decide_xoff_sending,
TP_ARGS(const edge_connection_t *, stream),
TP_FIELDS(
ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier)
ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes)
ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes)
ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent)
ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate)
ctf_integer(size_t, outbuf_len,
connection_get_outbuf_len(TO_CONN(stream)))
)
)
/*
* Congestion Control
*/
/* Emitted when the BDP value has been updated. */
TRACEPOINT_EVENT(tor_cc, bdp_update,
TP_ARGS(const circuit_t *, circ, const congestion_control_t *, cc,
uint64_t, curr_rtt_usec, uint64_t, sendme_rate_bdp),
TP_FIELDS(
ctf_integer(uint64_t, circuit_ptr, circ)
ctf_integer(uint32_t, n_circ_id, circ->n_circ_id)
ctf_integer(uint64_t, min_rtt_usec, cc->min_rtt_usec)
ctf_integer(uint64_t, curr_rtt_usec, curr_rtt_usec)
ctf_integer(uint64_t, ewma_rtt_usec, cc->ewma_rtt_usec)
ctf_integer(uint64_t, max_rtt_usec, cc->max_rtt_usec)
ctf_integer(uint64_t, bdp_inflight_rtt, cc->bdp[BDP_ALG_INFLIGHT_RTT])
ctf_integer(uint64_t, bdp_cwnd_rtt, cc->bdp[BDP_ALG_CWND_RTT])
ctf_integer(uint64_t, bdp_sendme_rate, cc->bdp[BDP_ALG_SENDME_RATE])
ctf_integer(uint64_t, bdp_piecewise, cc->bdp[BDP_ALG_PIECEWISE])
ctf_integer(uint64_t, sendme_rate_bdp, sendme_rate_bdp)
)
)
#endif /* LTTNG_CC_INC || TRACEPOINT_HEADER_MULTI_READ */
/* Must be included after the probes declaration. */
#include <lttng/tracepoint-event.h>
#endif /* USE_TRACING_INSTRUMENTATION_LTTNG */

View file

@ -210,6 +210,9 @@ struct curve25519_public_key_t;
#define RELAY_COMMAND_PADDING_NEGOTIATE 41
#define RELAY_COMMAND_PADDING_NEGOTIATED 42
#define RELAY_COMMAND_XOFF 43
#define RELAY_COMMAND_XON 44
/* Reasons why an OR connection is closed. */
#define END_OR_CONN_REASON_DONE 1
#define END_OR_CONN_REASON_REFUSED 2 /* connection refused */
@ -591,18 +594,6 @@ typedef struct or_handshake_state_t or_handshake_state_t;
/** Length of Extended ORPort connection identifier. */
#define EXT_OR_CONN_ID_LEN DIGEST_LEN /* 20 */
/*
* OR_CONN_HIGHWATER and OR_CONN_LOWWATER moved from connection_or.c so
* channeltls.c can see them too.
*/
/** When adding cells to an OR connection's outbuf, keep adding until the
* outbuf is at least this long, or we run out of cells. */
#define OR_CONN_HIGHWATER (32*1024)
/** Add cells to an OR connection's outbuf whenever the outbuf's data length
* drops below this size. */
#define OR_CONN_LOWWATER (16*1024)
typedef struct connection_t connection_t;
typedef struct control_connection_t control_connection_t;

View file

@ -98,6 +98,7 @@
#include "core/or/socks_request_st.h"
#include "core/or/sendme.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction,
@ -116,13 +117,6 @@ static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ,
node_t *node,
const tor_addr_t *addr);
/** Stop reading on edge connections when we have this many cells
* waiting on the appropriate queue. */
#define CELL_QUEUE_HIGHWATER_SIZE 256
/** Start reading from edge connections again when we get down to this many
* cells. */
#define CELL_QUEUE_LOWWATER_SIZE 64
/** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)?
*/
@ -1739,6 +1733,44 @@ handle_relay_cell_command(cell_t *cell, circuit_t *circ,
sendme_connection_edge_consider_sending(conn);
}
return 0;
case RELAY_COMMAND_XOFF:
if (!conn) {
if (CIRCUIT_IS_ORIGIN(circ)) {
origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
connection_half_edge_is_valid_data(ocirc->half_streams,
rh->stream_id)) {
circuit_read_valid_data(ocirc, rh->length);
}
}
return 0;
}
if (circuit_process_stream_xoff(conn, layer_hint, cell)) {
if (CIRCUIT_IS_ORIGIN(circ)) {
circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
}
}
return 0;
case RELAY_COMMAND_XON:
if (!conn) {
if (CIRCUIT_IS_ORIGIN(circ)) {
origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
connection_half_edge_is_valid_data(ocirc->half_streams,
rh->stream_id)) {
circuit_read_valid_data(ocirc, rh->length);
}
}
return 0;
}
if (circuit_process_stream_xon(conn, layer_hint, cell)) {
if (CIRCUIT_IS_ORIGIN(circ)) {
circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
}
}
return 0;
case RELAY_COMMAND_END:
reason = rh->length > 0 ?
@ -2287,7 +2319,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
}
/* Handle the stream-level SENDME package window. */
if (sendme_note_stream_data_packaged(conn) < 0) {
if (sendme_note_stream_data_packaged(conn, length) < 0) {
connection_stop_reading(TO_CONN(conn));
log_debug(domain,"conn->package_window reached 0.");
circuit_consider_stop_edge_reading(circ, cpath_layer);
@ -2361,8 +2393,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
cells_on_queue = or_circ->p_chan_cells.n;
}
if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package)
max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue;
if (cell_queue_highwatermark() - cells_on_queue < max_to_package)
max_to_package = cell_queue_highwatermark() - cells_on_queue;
/* Once we used to start listening on the streams in the order they
* appeared in the linked list. That leads to starvation on the
@ -2402,7 +2434,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
/* Activate reading starting from the chosen stream */
for (conn=chosen_stream; conn; conn = conn->next_stream) {
/* Start reading for the streams starting from here */
if (conn->base_.marked_for_close || conn->package_window <= 0)
if (conn->base_.marked_for_close || conn->package_window <= 0 ||
conn->xoff_received)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
@ -2413,7 +2446,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
}
/* Go back and do the ones we skipped, circular-style */
for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
if (conn->base_.marked_for_close || conn->package_window <= 0)
if (conn->base_.marked_for_close || conn->package_window <= 0 ||
conn->xoff_received)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
@ -3080,7 +3114,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
/* Is the cell queue low enough to unblock all the streams that are waiting
* to write to this circuit? */
if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
if (streams_blocked && queue->n <= cell_queue_lowwatermark())
set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
/* If n_flushed < max still, loop around and pick another circuit */
@ -3198,7 +3232,7 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
/* If we have too many cells on the circuit, we should stop reading from
* the edge streams for a while. */
if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
if (!streams_blocked && queue->n >= cell_queue_highwatermark())
set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */
if (streams_blocked && fromstream) {

View file

@ -22,6 +22,7 @@
#include "core/or/relay.h"
#include "core/or/sendme.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
#include "feature/nodelist/networkstatus.h"
#include "lib/ctime/di_ops.h"
#include "trunnel/sendme_cell.h"
@ -370,6 +371,10 @@ sendme_connection_edge_consider_sending(edge_connection_t *conn)
int log_domain = TO_CONN(conn)->type == CONN_TYPE_AP ? LD_APP : LD_EXIT;
/* If we use flow control, we do not send stream sendmes */
if (edge_uses_flow_control(conn))
goto end;
/* Don't send it if we still have data to deliver. */
if (connection_outbuf_too_full(TO_CONN(conn))) {
goto end;
@ -546,6 +551,12 @@ sendme_process_stream_level(edge_connection_t *conn, circuit_t *circ,
tor_assert(conn);
tor_assert(circ);
if (edge_uses_flow_control(conn)) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Congestion control got stream sendme");
return -END_CIRC_REASON_TORPROTOCOL;
}
/* Don't allow the other endpoint to request more than our maximum (i.e.
* initial) stream SENDME window worth of data. Well-behaved stock clients
* will not request more than this max (as per the check in the while loop
@ -603,7 +614,12 @@ int
sendme_stream_data_received(edge_connection_t *conn)
{
tor_assert(conn);
return --conn->deliver_window;
if (edge_uses_flow_control(conn)) {
return flow_control_decide_xoff(conn);
} else {
return --conn->deliver_window;
}
}
/* Called when a relay DATA cell is packaged on the given circuit. If
@ -651,10 +667,18 @@ sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint)
/* Called when a relay DATA cell is packaged for the given edge connection
* conn. Update the package window and return its new value. */
int
sendme_note_stream_data_packaged(edge_connection_t *conn)
sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len)
{
tor_assert(conn);
if (edge_uses_flow_control(conn)) {
flow_control_note_sent_data(conn, len);
if (conn->xoff_received)
return -1;
else
return 1;
}
--conn->package_window;
log_debug(LD_APP, "Stream package_window now %d.", conn->package_window);
return conn->package_window;

View file

@ -33,7 +33,7 @@ int sendme_circuit_data_received(circuit_t *circ, crypt_path_t *layer_hint);
/* Update package window functions. */
int sendme_note_circuit_data_packaged(circuit_t *circ,
crypt_path_t *layer_hint);
int sendme_note_stream_data_packaged(edge_connection_t *conn);
int sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len);
/* Record cell digest on circuit. */
void sendme_record_cell_digest_on_circ(circuit_t *circ, crypt_path_t *cpath);

View file

@ -0,0 +1,33 @@
/* Copyright (c) 2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file trace_probes_cc.c
* \brief Tracepoint provider source file for the cc subsystem. Probes
* are generated within this C file for LTTng-UST
**/
#include "orconfig.h"
/*
* Following section is specific to LTTng-UST.
*/
#ifdef USE_TRACING_INSTRUMENTATION_LTTNG
/* Header files that the probes need. */
#include "core/or/or.h"
#include "core/or/channel.h"
#include "core/or/circuit_st.h"
#include "core/or/circuitlist.h"
#include "core/or/congestion_control_st.h"
#include "core/or/connection_st.h"
#include "core/or/edge_connection_st.h"
#include "core/or/or_circuit_st.h"
#include "core/or/origin_circuit_st.h"
#define TRACEPOINT_DEFINE
#define TRACEPOINT_CREATE_PROBES
#include "core/or/trace_probes_cc.h"
#endif /* defined(USE_TRACING_INSTRUMENTATION_LTTNG) */

View file

@ -0,0 +1,22 @@
/* Copyright (c) 2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file trace_probes_cc.h
* \brief The tracing probes for the congestion control subsystem.
* Currently, only LTTng-UST probes are available.
**/
#ifndef TOR_TRACE_PROBES_CC_H
#define TOR_TRACE_PROBES_CC_H
#include "lib/trace/events.h"
/* We only build the following if LTTng instrumentation has been enabled. */
#ifdef USE_TRACING_INSTRUMENTATION_LTTNG
#include "core/or/lttng_cc.inc"
#endif /* USE_TRACING_INSTRUMENTATION_LTTNG */
#endif /* !defined(TOR_TRACE_PROBES_CC_H) */

View file

@ -45,6 +45,8 @@
#include "core/or/channel.h"
#include "core/or/channelpadding.h"
#include "core/or/circuitpadding.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
#include "core/or/circuitmux.h"
#include "core/or/circuitmux_ewma.h"
#include "core/or/circuitstats.h"
@ -1699,6 +1701,8 @@ notify_after_networkstatus_changes(void)
channelpadding_new_consensus_params(c);
circpad_new_consensus_params(c);
router_new_consensus_params(c);
congestion_control_new_consensus_params(c);
flow_control_new_consensus_params(c);
/* Maintenance of our L2 guard list */
maintain_layer2_guards();

View file

@ -20,6 +20,7 @@
#include "lib/tls/tortls.h"
#include "core/or/or_connection_st.h"
#include "core/or/congestion_control_common.h"
/* Test suite stuff */
#include "test/test.h"
@ -155,7 +156,7 @@ test_channeltls_num_bytes_queued(void *arg)
* - 2 cells.
*/
n = ch->num_cells_writeable(ch);
tt_int_op(n, OP_EQ, CEIL_DIV(OR_CONN_HIGHWATER, 512) - 2);
tt_int_op(n, OP_EQ, CEIL_DIV(or_conn_highwatermark(), 512) - 2);
UNMOCK(buf_datalen);
tlschan_buf_datalen_mock_target = NULL;
tlschan_buf_datalen_mock_size = 0;

View file

@ -0,0 +1,382 @@
/* flow_control_cells.c -- generated by Trunnel v1.5.3.
* https://gitweb.torproject.org/trunnel.git
* You probably shouldn't edit this file.
*/
#include <stdlib.h>
#include "trunnel-impl.h"
#include "flow_control_cells.h"
#define TRUNNEL_SET_ERROR_CODE(obj) \
do { \
(obj)->trunnel_error_code_ = 1; \
} while (0)
#if defined(__COVERITY__) || defined(__clang_analyzer__)
/* If we're running a static analysis tool, we don't want it to complain
* that some of our remaining-bytes checks are dead-code. */
int flowcontrolcells_deadcode_dummy__ = 0;
#define OR_DEADCODE_DUMMY || flowcontrolcells_deadcode_dummy__
#else
#define OR_DEADCODE_DUMMY
#endif
#define CHECK_REMAINING(nbytes, label) \
do { \
if (remaining < (nbytes) OR_DEADCODE_DUMMY) { \
goto label; \
} \
} while (0)
/** Allocate and return a new, zero-initialized xoff_cell_t, or NULL on
 * allocation failure. The caller owns the result and must release it
 * with xoff_cell_free(). */
xoff_cell_t *
xoff_cell_new(void)
{
  xoff_cell_t *val = trunnel_calloc(1, sizeof(xoff_cell_t));
  if (NULL == val)
    return NULL;
  return val;
}
/** Release all storage held inside 'obj', but do not free 'obj'.
 */
static void
xoff_cell_clear(xoff_cell_t *obj)
{
  /* xoff_cell has no heap-allocated members; nothing to release. */
  (void) obj;
}
/** Free 'obj' and everything it holds; NULL is a no-op. The object is
 * wiped (trunnel_memwipe) before release so its contents do not linger
 * in freed memory. */
void
xoff_cell_free(xoff_cell_t *obj)
{
  if (obj == NULL)
    return;
  xoff_cell_clear(obj);
  trunnel_memwipe(obj, sizeof(xoff_cell_t));
  trunnel_free_(obj);
}
/** Return the value of the version field of the xoff_cell_t in 'inp'. */
uint8_t
xoff_cell_get_version(const xoff_cell_t *inp)
{
  return inp->version;
}
/** Set the version field of 'inp' to 'val' and return 0. Only version 0
 * is accepted; any other value sets the object's error code and
 * returns -1. */
int
xoff_cell_set_version(xoff_cell_t *inp, uint8_t val)
{
  if (! ((val == 0))) {
    TRUNNEL_SET_ERROR_CODE(inp);
    return -1;
  }
  inp->version = val;
  return 0;
}
/** Return NULL if 'obj' is internally consistent; otherwise return a
 * short static string describing the problem. */
const char *
xoff_cell_check(const xoff_cell_t *obj)
{
  if (obj == NULL)
    return "Object was NULL";
  if (obj->trunnel_error_code_)
    return "A set function failed on this object";
  if (! (obj->version == 0))
    return "Integer out of bounds";
  return NULL;
}
/** Return the number of bytes needed to encode 'obj' (always 1 for a
 * valid xoff cell), or -1 if the object fails its consistency check. */
ssize_t
xoff_cell_encoded_len(const xoff_cell_t *obj)
{
  ssize_t result = 0;
  if (NULL != xoff_cell_check(obj))
    return -1;
  /* Length of u8 version IN [0] */
  result += 1;
  return result;
}
/** Reset the error code on 'obj'. Return the previous code (nonzero iff
 * an earlier setter had failed). */
int
xoff_cell_clear_errors(xoff_cell_t *obj)
{
  int r = obj->trunnel_error_code_;
  obj->trunnel_error_code_ = 0;
  return r;
}
/** Encode 'obj' into the 'avail'-byte buffer at 'output'. Return the
 * number of bytes written on success, -2 if the buffer is too small,
 * or -1 if 'obj' fails its consistency check. */
ssize_t
xoff_cell_encode(uint8_t *output, const size_t avail, const xoff_cell_t *obj)
{
  ssize_t result = 0;
  size_t written = 0;
  uint8_t *ptr = output;
  const char *msg;
#ifdef TRUNNEL_CHECK_ENCODED_LEN
  const ssize_t encoded_len = xoff_cell_encoded_len(obj);
#endif
  if (NULL != (msg = xoff_cell_check(obj)))
    goto check_failed;
#ifdef TRUNNEL_CHECK_ENCODED_LEN
  trunnel_assert(encoded_len >= 0);
#endif
  /* Encode u8 version IN [0] */
  trunnel_assert(written <= avail);
  if (avail - written < 1)
    goto truncated;
  trunnel_set_uint8(ptr, (obj->version));
  written += 1; ptr += 1;
  trunnel_assert(ptr == output + written);
#ifdef TRUNNEL_CHECK_ENCODED_LEN
  {
    /* Sanity: bytes actually written must match the predicted length. */
    trunnel_assert(encoded_len >= 0);
    trunnel_assert((size_t)encoded_len == written);
  }
#endif
  return written;

 truncated:
  result = -2;
  goto fail;
 check_failed:
  (void)msg;
  result = -1;
  goto fail;
 fail:
  trunnel_assert(result < 0);
  return result;
}
/** As xoff_cell_parse(), but do not allocate the output object.
 * Return the number of bytes consumed, -2 on truncated input, or -1 if
 * the version field is not 0. */
static ssize_t
xoff_cell_parse_into(xoff_cell_t *obj, const uint8_t *input, const size_t len_in)
{
  const uint8_t *ptr = input;
  size_t remaining = len_in;
  ssize_t result = 0;
  (void)result;

  /* Parse u8 version IN [0] */
  CHECK_REMAINING(1, truncated);
  obj->version = (trunnel_get_uint8(ptr));
  remaining -= 1; ptr += 1;
  if (! (obj->version == 0))
    goto fail;
  trunnel_assert(ptr + remaining == input + len_in);
  return len_in - remaining;

 truncated:
  return -2;
 fail:
  result = -1;
  return result;
}
/** Parse an xoff cell from the 'len_in' bytes at 'input'. On success,
 * store a newly allocated cell in *output and return the number of
 * bytes consumed. On failure return -2 (truncated) or -1 (invalid or
 * out of memory) and leave *output set to NULL. */
ssize_t
xoff_cell_parse(xoff_cell_t **output, const uint8_t *input, const size_t len_in)
{
  ssize_t result;
  *output = xoff_cell_new();
  if (NULL == *output)
    return -1;
  result = xoff_cell_parse_into(*output, input, len_in);
  if (result < 0) {
    xoff_cell_free(*output);
    *output = NULL;
  }
  return result;
}
/** Allocate and return a new, zero-initialized xon_cell_t, or NULL on
 * allocation failure. The caller owns the result and must release it
 * with xon_cell_free(). */
xon_cell_t *
xon_cell_new(void)
{
  xon_cell_t *val = trunnel_calloc(1, sizeof(xon_cell_t));
  if (NULL == val)
    return NULL;
  return val;
}
/** Release all storage held inside 'obj', but do not free 'obj'.
 */
static void
xon_cell_clear(xon_cell_t *obj)
{
  /* xon_cell has no heap-allocated members; nothing to release. */
  (void) obj;
}
/** Free 'obj' and everything it holds; NULL is a no-op. The object is
 * wiped (trunnel_memwipe) before release so its contents do not linger
 * in freed memory. */
void
xon_cell_free(xon_cell_t *obj)
{
  if (obj == NULL)
    return;
  xon_cell_clear(obj);
  trunnel_memwipe(obj, sizeof(xon_cell_t));
  trunnel_free_(obj);
}
/** Return the value of the version field of the xon_cell_t in 'inp'. */
uint8_t
xon_cell_get_version(const xon_cell_t *inp)
{
  return inp->version;
}
/** Set the version field of 'inp' to 'val' and return 0. Only version 0
 * is accepted; any other value sets the object's error code and
 * returns -1. */
int
xon_cell_set_version(xon_cell_t *inp, uint8_t val)
{
  if (! ((val == 0))) {
    TRUNNEL_SET_ERROR_CODE(inp);
    return -1;
  }
  inp->version = val;
  return 0;
}
/** Return the value of the kbps_ewma field of the xon_cell_t in 'inp'.
 * A value of 0 means "no rate advice". */
uint32_t
xon_cell_get_kbps_ewma(const xon_cell_t *inp)
{
  return inp->kbps_ewma;
}
/** Set the kbps_ewma field of 'inp' to 'val'. Any u32 value is valid,
 * so this always returns 0. */
int
xon_cell_set_kbps_ewma(xon_cell_t *inp, uint32_t val)
{
  inp->kbps_ewma = val;
  return 0;
}
/** Return NULL if 'obj' is internally consistent; otherwise return a
 * short static string describing the problem. */
const char *
xon_cell_check(const xon_cell_t *obj)
{
  if (obj == NULL)
    return "Object was NULL";
  if (obj->trunnel_error_code_)
    return "A set function failed on this object";
  if (! (obj->version == 0))
    return "Integer out of bounds";
  return NULL;
}
/** Return the number of bytes needed to encode 'obj' (always 5 for a
 * valid xon cell: 1-byte version + 4-byte kbps_ewma), or -1 if the
 * object fails its consistency check. */
ssize_t
xon_cell_encoded_len(const xon_cell_t *obj)
{
  ssize_t result = 0;
  if (NULL != xon_cell_check(obj))
    return -1;
  /* Length of u8 version IN [0] */
  result += 1;
  /* Length of u32 kbps_ewma */
  result += 4;
  return result;
}
/** Reset the error code on 'obj'. Return the previous code (nonzero iff
 * an earlier setter had failed). */
int
xon_cell_clear_errors(xon_cell_t *obj)
{
  int r = obj->trunnel_error_code_;
  obj->trunnel_error_code_ = 0;
  return r;
}
/** Encode 'obj' into the 'avail'-byte buffer at 'output'. Return the
 * number of bytes written on success, -2 if the buffer is too small,
 * or -1 if 'obj' fails its consistency check. */
ssize_t
xon_cell_encode(uint8_t *output, const size_t avail, const xon_cell_t *obj)
{
  ssize_t result = 0;
  size_t written = 0;
  uint8_t *ptr = output;
  const char *msg;
#ifdef TRUNNEL_CHECK_ENCODED_LEN
  const ssize_t encoded_len = xon_cell_encoded_len(obj);
#endif
  if (NULL != (msg = xon_cell_check(obj)))
    goto check_failed;
#ifdef TRUNNEL_CHECK_ENCODED_LEN
  trunnel_assert(encoded_len >= 0);
#endif
  /* Encode u8 version IN [0] */
  trunnel_assert(written <= avail);
  if (avail - written < 1)
    goto truncated;
  trunnel_set_uint8(ptr, (obj->version));
  written += 1; ptr += 1;
  /* Encode u32 kbps_ewma, in network byte order via trunnel_htonl. */
  trunnel_assert(written <= avail);
  if (avail - written < 4)
    goto truncated;
  trunnel_set_uint32(ptr, trunnel_htonl(obj->kbps_ewma));
  written += 4; ptr += 4;
  trunnel_assert(ptr == output + written);
#ifdef TRUNNEL_CHECK_ENCODED_LEN
  {
    /* Sanity: bytes actually written must match the predicted length. */
    trunnel_assert(encoded_len >= 0);
    trunnel_assert((size_t)encoded_len == written);
  }
#endif
  return written;

 truncated:
  result = -2;
  goto fail;
 check_failed:
  (void)msg;
  result = -1;
  goto fail;
 fail:
  trunnel_assert(result < 0);
  return result;
}
/** As xon_cell_parse(), but do not allocate the output object.
 * Return the number of bytes consumed, -2 on truncated input, or -1 if
 * the version field is not 0. */
static ssize_t
xon_cell_parse_into(xon_cell_t *obj, const uint8_t *input, const size_t len_in)
{
  const uint8_t *ptr = input;
  size_t remaining = len_in;
  ssize_t result = 0;
  (void)result;

  /* Parse u8 version IN [0] */
  CHECK_REMAINING(1, truncated);
  obj->version = (trunnel_get_uint8(ptr));
  remaining -= 1; ptr += 1;
  if (! (obj->version == 0))
    goto fail;

  /* Parse u32 kbps_ewma, converting from network byte order. */
  CHECK_REMAINING(4, truncated);
  obj->kbps_ewma = trunnel_ntohl(trunnel_get_uint32(ptr));
  remaining -= 4; ptr += 4;
  trunnel_assert(ptr + remaining == input + len_in);
  return len_in - remaining;

 truncated:
  return -2;
 fail:
  result = -1;
  return result;
}
/** Parse an xon cell from the 'len_in' bytes at 'input'. On success,
 * store a newly allocated cell in *output and return the number of
 * bytes consumed. On failure return -2 (truncated) or -1 (invalid or
 * out of memory) and leave *output set to NULL. */
ssize_t
xon_cell_parse(xon_cell_t **output, const uint8_t *input, const size_t len_in)
{
  ssize_t result;
  *output = xon_cell_new();
  if (NULL == *output)
    return -1;
  result = xon_cell_parse_into(*output, input, len_in);
  if (result < 0) {
    xon_cell_free(*output);
    *output = NULL;
  }
  return result;
}

View file

@ -0,0 +1,120 @@
/* flow_control_cells.h -- generated by Trunnel v1.5.3.
* https://gitweb.torproject.org/trunnel.git
* You probably shouldn't edit this file.
*/
#ifndef TRUNNEL_FLOW_CONTROL_CELLS_H
#define TRUNNEL_FLOW_CONTROL_CELLS_H
#include <stdint.h>
#include "trunnel.h"
#if !defined(TRUNNEL_OPAQUE) && !defined(TRUNNEL_OPAQUE_XOFF_CELL)
/** In-memory representation of an XOFF flow-control cell: a bare
 * version octet. */
struct xoff_cell_st {
  /** Protocol version; only 0 is currently valid. */
  uint8_t version;
  /** Nonzero iff a setter previously failed on this object. */
  uint8_t trunnel_error_code_;
};
#endif
typedef struct xoff_cell_st xoff_cell_t;
#if !defined(TRUNNEL_OPAQUE) && !defined(TRUNNEL_OPAQUE_XON_CELL)
/** In-memory representation of an XON flow-control cell. */
struct xon_cell_st {
  /** Protocol version; only 0 is currently valid. */
  uint8_t version;
  /** Advisory EWMA socket-drain rate in kilobytes/sec (1000 bytes/sec);
   * 0 means no rate advice (see flow_control_cells.trunnel). */
  uint32_t kbps_ewma;
  /** Nonzero iff a setter previously failed on this object. */
  uint8_t trunnel_error_code_;
};
#endif
typedef struct xon_cell_st xon_cell_t;
/** Return a newly allocated xoff_cell with all elements set to zero.
*/
xoff_cell_t *xoff_cell_new(void);
/** Release all storage held by the xoff_cell in 'victim'. (Do nothing
* if 'victim' is NULL.)
*/
void xoff_cell_free(xoff_cell_t *victim);
/** Try to parse a xoff_cell from the buffer in 'input', using up to
* 'len_in' bytes from the input buffer. On success, return the number
* of bytes consumed and set *output to the newly allocated
* xoff_cell_t. On failure, return -2 if the input appears truncated,
* and -1 if the input is otherwise invalid.
*/
ssize_t xoff_cell_parse(xoff_cell_t **output, const uint8_t *input, const size_t len_in);
/** Return the number of bytes we expect to need to encode the
* xoff_cell in 'obj'. On failure, return a negative value. Note that
* this value may be an overestimate, and can even be an underestimate
* for certain unencodeable objects.
*/
ssize_t xoff_cell_encoded_len(const xoff_cell_t *obj);
/** Try to encode the xoff_cell from 'input' into the buffer at
* 'output', using up to 'avail' bytes of the output buffer. On
* success, return the number of bytes used. On failure, return -2 if
* the buffer was not long enough, and -1 if the input was invalid.
*/
ssize_t xoff_cell_encode(uint8_t *output, size_t avail, const xoff_cell_t *input);
/** Check whether the internal state of the xoff_cell in 'obj' is
* consistent. Return NULL if it is, and a short message if it is not.
*/
const char *xoff_cell_check(const xoff_cell_t *obj);
/** Clear any errors that were set on the object 'obj' by its setter
* functions. Return true iff errors were cleared.
*/
int xoff_cell_clear_errors(xoff_cell_t *obj);
/** Return the value of the version field of the xoff_cell_t in 'inp'
*/
uint8_t xoff_cell_get_version(const xoff_cell_t *inp);
/** Set the value of the version field of the xoff_cell_t in 'inp' to
* 'val'. Return 0 on success; return -1 and set the error code on
* 'inp' on failure.
*/
int xoff_cell_set_version(xoff_cell_t *inp, uint8_t val);
/** Return a newly allocated xon_cell with all elements set to zero.
*/
xon_cell_t *xon_cell_new(void);
/** Release all storage held by the xon_cell in 'victim'. (Do nothing
* if 'victim' is NULL.)
*/
void xon_cell_free(xon_cell_t *victim);
/** Try to parse a xon_cell from the buffer in 'input', using up to
* 'len_in' bytes from the input buffer. On success, return the number
* of bytes consumed and set *output to the newly allocated
* xon_cell_t. On failure, return -2 if the input appears truncated,
* and -1 if the input is otherwise invalid.
*/
ssize_t xon_cell_parse(xon_cell_t **output, const uint8_t *input, const size_t len_in);
/** Return the number of bytes we expect to need to encode the
* xon_cell in 'obj'. On failure, return a negative value. Note that
* this value may be an overestimate, and can even be an underestimate
* for certain unencodeable objects.
*/
ssize_t xon_cell_encoded_len(const xon_cell_t *obj);
/** Try to encode the xon_cell from 'input' into the buffer at
* 'output', using up to 'avail' bytes of the output buffer. On
* success, return the number of bytes used. On failure, return -2 if
* the buffer was not long enough, and -1 if the input was invalid.
*/
ssize_t xon_cell_encode(uint8_t *output, size_t avail, const xon_cell_t *input);
/** Check whether the internal state of the xon_cell in 'obj' is
* consistent. Return NULL if it is, and a short message if it is not.
*/
const char *xon_cell_check(const xon_cell_t *obj);
/** Clear any errors that were set on the object 'obj' by its setter
* functions. Return true iff errors were cleared.
*/
int xon_cell_clear_errors(xon_cell_t *obj);
/** Return the value of the version field of the xon_cell_t in 'inp'
*/
uint8_t xon_cell_get_version(const xon_cell_t *inp);
/** Set the value of the version field of the xon_cell_t in 'inp' to
* 'val'. Return 0 on success; return -1 and set the error code on
* 'inp' on failure.
*/
int xon_cell_set_version(xon_cell_t *inp, uint8_t val);
/** Return the value of the kbps_ewma field of the xon_cell_t in 'inp'
*/
uint32_t xon_cell_get_kbps_ewma(const xon_cell_t *inp);
/** Set the value of the kbps_ewma field of the xon_cell_t in 'inp' to
* 'val'. Return 0 on success; return -1 and set the error code on
* 'inp' on failure.
*/
int xon_cell_set_kbps_ewma(xon_cell_t *inp, uint32_t val);
#endif

View file

@ -0,0 +1,20 @@
/* This file contains the xon and xoff cell definitions, for flow control. */
/* xoff cell definition. Tells the other endpoint to stop sending, because
* we have too much data queued for this stream. */
struct xoff_cell {
/* Version field. */
u8 version IN [0x00];
}
/* xon cell definition. Tells the other endpoint to resume sending and/or
 * update its sending rate on this stream based on advisory information. */
struct xon_cell {
/* Version field. */
u8 version IN [0x00];
/* Advisory field: The ewma rate of socket drain we have seen so far
* on this stream, in kilobytes/sec (1000 bytes/sec). May be zero,
* which means no rate advice. */
u32 kbps_ewma;
}

View file

@ -12,6 +12,7 @@ TRUNNELINPUTS = \
src/trunnel/pwbox.trunnel \
src/trunnel/channelpadding_negotiation.trunnel \
src/trunnel/sendme_cell.trunnel \
src/trunnel/flow_control_cells.trunnel \
src/trunnel/socks5.trunnel \
src/trunnel/circpad_negotiation.trunnel
@ -26,6 +27,7 @@ TRUNNELSOURCES = \
src/trunnel/hs/cell_rendezvous.c \
src/trunnel/channelpadding_negotiation.c \
src/trunnel/sendme_cell.c \
src/trunnel/flow_control_cells.c \
src/trunnel/socks5.c \
src/trunnel/netinfo.c \
src/trunnel/circpad_negotiation.c
@ -43,6 +45,7 @@ TRUNNELHEADERS = \
src/trunnel/hs/cell_rendezvous.h \
src/trunnel/channelpadding_negotiation.h \
src/trunnel/sendme_cell.h \
src/trunnel/flow_control_cells.h \
src/trunnel/socks5.h \
src/trunnel/netinfo.h \
src/trunnel/circpad_negotiation.h