From 6868f30cedf783bc6ba883206899ae7d59473c0c Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Fri, 6 Aug 2021 18:47:05 +0000 Subject: [PATCH 01/18] Add trunnel definitions for xon/xoff cells. --- src/trunnel/flow_control_cells.c | 382 +++++++++++++++++++++++++ src/trunnel/flow_control_cells.h | 120 ++++++++ src/trunnel/flow_control_cells.trunnel | 20 ++ src/trunnel/include.am | 3 + 4 files changed, 525 insertions(+) create mode 100644 src/trunnel/flow_control_cells.c create mode 100644 src/trunnel/flow_control_cells.h create mode 100644 src/trunnel/flow_control_cells.trunnel diff --git a/src/trunnel/flow_control_cells.c b/src/trunnel/flow_control_cells.c new file mode 100644 index 0000000000..df44756d6b --- /dev/null +++ b/src/trunnel/flow_control_cells.c @@ -0,0 +1,382 @@ +/* flow_control_cells.c -- generated by Trunnel v1.5.3. + * https://gitweb.torproject.org/trunnel.git + * You probably shouldn't edit this file. + */ +#include +#include "trunnel-impl.h" + +#include "flow_control_cells.h" + +#define TRUNNEL_SET_ERROR_CODE(obj) \ + do { \ + (obj)->trunnel_error_code_ = 1; \ + } while (0) + +#if defined(__COVERITY__) || defined(__clang_analyzer__) +/* If we're running a static analysis tool, we don't want it to complain + * that some of our remaining-bytes checks are dead-code. */ +int flowcontrolcells_deadcode_dummy__ = 0; +#define OR_DEADCODE_DUMMY || flowcontrolcells_deadcode_dummy__ +#else +#define OR_DEADCODE_DUMMY +#endif + +#define CHECK_REMAINING(nbytes, label) \ + do { \ + if (remaining < (nbytes) OR_DEADCODE_DUMMY) { \ + goto label; \ + } \ + } while (0) + +xoff_cell_t * +xoff_cell_new(void) +{ + xoff_cell_t *val = trunnel_calloc(1, sizeof(xoff_cell_t)); + if (NULL == val) + return NULL; + return val; +} + +/** Release all storage held inside 'obj', but do not free 'obj'. + */ +static void +xoff_cell_clear(xoff_cell_t *obj) +{ + (void) obj; +} + +void +xoff_cell_free(xoff_cell_t *obj) +{ + if (obj == NULL) + return; + xoff_cell_clear(obj); + trunnel_memwipe(obj, sizeof(xoff_cell_t)); + trunnel_free_(obj); +} + +uint8_t +xoff_cell_get_version(const xoff_cell_t *inp) +{ + return inp->version; +} +int +xoff_cell_set_version(xoff_cell_t *inp, uint8_t val) +{ + if (! ((val == 0))) { + TRUNNEL_SET_ERROR_CODE(inp); + return -1; + } + inp->version = val; + return 0; +} +const char * +xoff_cell_check(const xoff_cell_t *obj) +{ + if (obj == NULL) + return "Object was NULL"; + if (obj->trunnel_error_code_) + return "A set function failed on this object"; + if (! 
(obj->version == 0)) + return "Integer out of bounds"; + return NULL; +} + +ssize_t +xoff_cell_encoded_len(const xoff_cell_t *obj) +{ + ssize_t result = 0; + + if (NULL != xoff_cell_check(obj)) + return -1; + + + /* Length of u8 version IN [0] */ + result += 1; + return result; +} +int +xoff_cell_clear_errors(xoff_cell_t *obj) +{ + int r = obj->trunnel_error_code_; + obj->trunnel_error_code_ = 0; + return r; +} +ssize_t +xoff_cell_encode(uint8_t *output, const size_t avail, const xoff_cell_t *obj) +{ + ssize_t result = 0; + size_t written = 0; + uint8_t *ptr = output; + const char *msg; +#ifdef TRUNNEL_CHECK_ENCODED_LEN + const ssize_t encoded_len = xoff_cell_encoded_len(obj); +#endif + + if (NULL != (msg = xoff_cell_check(obj))) + goto check_failed; + +#ifdef TRUNNEL_CHECK_ENCODED_LEN + trunnel_assert(encoded_len >= 0); +#endif + + /* Encode u8 version IN [0] */ + trunnel_assert(written <= avail); + if (avail - written < 1) + goto truncated; + trunnel_set_uint8(ptr, (obj->version)); + written += 1; ptr += 1; + + + trunnel_assert(ptr == output + written); +#ifdef TRUNNEL_CHECK_ENCODED_LEN + { + trunnel_assert(encoded_len >= 0); + trunnel_assert((size_t)encoded_len == written); + } + +#endif + + return written; + + truncated: + result = -2; + goto fail; + check_failed: + (void)msg; + result = -1; + goto fail; + fail: + trunnel_assert(result < 0); + return result; +} + +/** As xoff_cell_parse(), but do not allocate the output object. + */ +static ssize_t +xoff_cell_parse_into(xoff_cell_t *obj, const uint8_t *input, const size_t len_in) +{ + const uint8_t *ptr = input; + size_t remaining = len_in; + ssize_t result = 0; + (void)result; + + /* Parse u8 version IN [0] */ + CHECK_REMAINING(1, truncated); + obj->version = (trunnel_get_uint8(ptr)); + remaining -= 1; ptr += 1; + if (! (obj->version == 0)) + goto fail; + trunnel_assert(ptr + remaining == input + len_in); + return len_in - remaining; + + truncated: + return -2; + fail: + result = -1; + return result; +} + +ssize_t +xoff_cell_parse(xoff_cell_t **output, const uint8_t *input, const size_t len_in) +{ + ssize_t result; + *output = xoff_cell_new(); + if (NULL == *output) + return -1; + result = xoff_cell_parse_into(*output, input, len_in); + if (result < 0) { + xoff_cell_free(*output); + *output = NULL; + } + return result; +} +xon_cell_t * +xon_cell_new(void) +{ + xon_cell_t *val = trunnel_calloc(1, sizeof(xon_cell_t)); + if (NULL == val) + return NULL; + return val; +} + +/** Release all storage held inside 'obj', but do not free 'obj'. + */ +static void +xon_cell_clear(xon_cell_t *obj) +{ + (void) obj; +} + +void +xon_cell_free(xon_cell_t *obj) +{ + if (obj == NULL) + return; + xon_cell_clear(obj); + trunnel_memwipe(obj, sizeof(xon_cell_t)); + trunnel_free_(obj); +} + +uint8_t +xon_cell_get_version(const xon_cell_t *inp) +{ + return inp->version; +} +int +xon_cell_set_version(xon_cell_t *inp, uint8_t val) +{ + if (! ((val == 0))) { + TRUNNEL_SET_ERROR_CODE(inp); + return -1; + } + inp->version = val; + return 0; +} +uint32_t +xon_cell_get_kbps_ewma(const xon_cell_t *inp) +{ + return inp->kbps_ewma; +} +int +xon_cell_set_kbps_ewma(xon_cell_t *inp, uint32_t val) +{ + inp->kbps_ewma = val; + return 0; +} +const char * +xon_cell_check(const xon_cell_t *obj) +{ + if (obj == NULL) + return "Object was NULL"; + if (obj->trunnel_error_code_) + return "A set function failed on this object"; + if (! 
(obj->version == 0)) + return "Integer out of bounds"; + return NULL; +} + +ssize_t +xon_cell_encoded_len(const xon_cell_t *obj) +{ + ssize_t result = 0; + + if (NULL != xon_cell_check(obj)) + return -1; + + + /* Length of u8 version IN [0] */ + result += 1; + + /* Length of u32 kbps_ewma */ + result += 4; + return result; +} +int +xon_cell_clear_errors(xon_cell_t *obj) +{ + int r = obj->trunnel_error_code_; + obj->trunnel_error_code_ = 0; + return r; +} +ssize_t +xon_cell_encode(uint8_t *output, const size_t avail, const xon_cell_t *obj) +{ + ssize_t result = 0; + size_t written = 0; + uint8_t *ptr = output; + const char *msg; +#ifdef TRUNNEL_CHECK_ENCODED_LEN + const ssize_t encoded_len = xon_cell_encoded_len(obj); +#endif + + if (NULL != (msg = xon_cell_check(obj))) + goto check_failed; + +#ifdef TRUNNEL_CHECK_ENCODED_LEN + trunnel_assert(encoded_len >= 0); +#endif + + /* Encode u8 version IN [0] */ + trunnel_assert(written <= avail); + if (avail - written < 1) + goto truncated; + trunnel_set_uint8(ptr, (obj->version)); + written += 1; ptr += 1; + + /* Encode u32 kbps_ewma */ + trunnel_assert(written <= avail); + if (avail - written < 4) + goto truncated; + trunnel_set_uint32(ptr, trunnel_htonl(obj->kbps_ewma)); + written += 4; ptr += 4; + + + trunnel_assert(ptr == output + written); +#ifdef TRUNNEL_CHECK_ENCODED_LEN + { + trunnel_assert(encoded_len >= 0); + trunnel_assert((size_t)encoded_len == written); + } + +#endif + + return written; + + truncated: + result = -2; + goto fail; + check_failed: + (void)msg; + result = -1; + goto fail; + fail: + trunnel_assert(result < 0); + return result; +} + +/** As xon_cell_parse(), but do not allocate the output object. + */ +static ssize_t +xon_cell_parse_into(xon_cell_t *obj, const uint8_t *input, const size_t len_in) +{ + const uint8_t *ptr = input; + size_t remaining = len_in; + ssize_t result = 0; + (void)result; + + /* Parse u8 version IN [0] */ + CHECK_REMAINING(1, truncated); + obj->version = (trunnel_get_uint8(ptr)); + remaining -= 1; ptr += 1; + if (! (obj->version == 0)) + goto fail; + + /* Parse u32 kbps_ewma */ + CHECK_REMAINING(4, truncated); + obj->kbps_ewma = trunnel_ntohl(trunnel_get_uint32(ptr)); + remaining -= 4; ptr += 4; + trunnel_assert(ptr + remaining == input + len_in); + return len_in - remaining; + + truncated: + return -2; + fail: + result = -1; + return result; +} + +ssize_t +xon_cell_parse(xon_cell_t **output, const uint8_t *input, const size_t len_in) +{ + ssize_t result; + *output = xon_cell_new(); + if (NULL == *output) + return -1; + result = xon_cell_parse_into(*output, input, len_in); + if (result < 0) { + xon_cell_free(*output); + *output = NULL; + } + return result; +} diff --git a/src/trunnel/flow_control_cells.h b/src/trunnel/flow_control_cells.h new file mode 100644 index 0000000000..b8108b9a24 --- /dev/null +++ b/src/trunnel/flow_control_cells.h @@ -0,0 +1,120 @@ +/* flow_control_cells.h -- generated by Trunnel v1.5.3. + * https://gitweb.torproject.org/trunnel.git + * You probably shouldn't edit this file. 
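+ * (Its source definition is src/trunnel/flow_control_cells.trunnel.)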
+ */ +#ifndef TRUNNEL_FLOW_CONTROL_CELLS_H +#define TRUNNEL_FLOW_CONTROL_CELLS_H + +#include +#include "trunnel.h" + +#if !defined(TRUNNEL_OPAQUE) && !defined(TRUNNEL_OPAQUE_XOFF_CELL) +struct xoff_cell_st { + uint8_t version; + uint8_t trunnel_error_code_; +}; +#endif +typedef struct xoff_cell_st xoff_cell_t; +#if !defined(TRUNNEL_OPAQUE) && !defined(TRUNNEL_OPAQUE_XON_CELL) +struct xon_cell_st { + uint8_t version; + uint32_t kbps_ewma; + uint8_t trunnel_error_code_; +}; +#endif +typedef struct xon_cell_st xon_cell_t; +/** Return a newly allocated xoff_cell with all elements set to zero. + */ +xoff_cell_t *xoff_cell_new(void); +/** Release all storage held by the xoff_cell in 'victim'. (Do nothing + * if 'victim' is NULL.) + */ +void xoff_cell_free(xoff_cell_t *victim); +/** Try to parse a xoff_cell from the buffer in 'input', using up to + * 'len_in' bytes from the input buffer. On success, return the number + * of bytes consumed and set *output to the newly allocated + * xoff_cell_t. On failure, return -2 if the input appears truncated, + * and -1 if the input is otherwise invalid. + */ +ssize_t xoff_cell_parse(xoff_cell_t **output, const uint8_t *input, const size_t len_in); +/** Return the number of bytes we expect to need to encode the + * xoff_cell in 'obj'. On failure, return a negative value. Note that + * this value may be an overestimate, and can even be an underestimate + * for certain unencodeable objects. + */ +ssize_t xoff_cell_encoded_len(const xoff_cell_t *obj); +/** Try to encode the xoff_cell from 'input' into the buffer at + * 'output', using up to 'avail' bytes of the output buffer. On + * success, return the number of bytes used. On failure, return -2 if + * the buffer was not long enough, and -1 if the input was invalid. + */ +ssize_t xoff_cell_encode(uint8_t *output, size_t avail, const xoff_cell_t *input); +/** Check whether the internal state of the xoff_cell in 'obj' is + * consistent. Return NULL if it is, and a short message if it is not. + */ +const char *xoff_cell_check(const xoff_cell_t *obj); +/** Clear any errors that were set on the object 'obj' by its setter + * functions. Return true iff errors were cleared. + */ +int xoff_cell_clear_errors(xoff_cell_t *obj); +/** Return the value of the version field of the xoff_cell_t in 'inp' + */ +uint8_t xoff_cell_get_version(const xoff_cell_t *inp); +/** Set the value of the version field of the xoff_cell_t in 'inp' to + * 'val'. Return 0 on success; return -1 and set the error code on + * 'inp' on failure. + */ +int xoff_cell_set_version(xoff_cell_t *inp, uint8_t val); +/** Return a newly allocated xon_cell with all elements set to zero. + */ +xon_cell_t *xon_cell_new(void); +/** Release all storage held by the xon_cell in 'victim'. (Do nothing + * if 'victim' is NULL.) + */ +void xon_cell_free(xon_cell_t *victim); +/** Try to parse a xon_cell from the buffer in 'input', using up to + * 'len_in' bytes from the input buffer. On success, return the number + * of bytes consumed and set *output to the newly allocated + * xon_cell_t. On failure, return -2 if the input appears truncated, + * and -1 if the input is otherwise invalid. + */ +ssize_t xon_cell_parse(xon_cell_t **output, const uint8_t *input, const size_t len_in); +/** Return the number of bytes we expect to need to encode the + * xon_cell in 'obj'. On failure, return a negative value. Note that + * this value may be an overestimate, and can even be an underestimate + * for certain unencodeable objects. 
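+ * (For a well-formed xon_cell this is 5 bytes: a 1-byte version plus a
+ * 4-byte kbps_ewma field.)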
+ */ +ssize_t xon_cell_encoded_len(const xon_cell_t *obj); +/** Try to encode the xon_cell from 'input' into the buffer at + * 'output', using up to 'avail' bytes of the output buffer. On + * success, return the number of bytes used. On failure, return -2 if + * the buffer was not long enough, and -1 if the input was invalid. + */ +ssize_t xon_cell_encode(uint8_t *output, size_t avail, const xon_cell_t *input); +/** Check whether the internal state of the xon_cell in 'obj' is + * consistent. Return NULL if it is, and a short message if it is not. + */ +const char *xon_cell_check(const xon_cell_t *obj); +/** Clear any errors that were set on the object 'obj' by its setter + * functions. Return true iff errors were cleared. + */ +int xon_cell_clear_errors(xon_cell_t *obj); +/** Return the value of the version field of the xon_cell_t in 'inp' + */ +uint8_t xon_cell_get_version(const xon_cell_t *inp); +/** Set the value of the version field of the xon_cell_t in 'inp' to + * 'val'. Return 0 on success; return -1 and set the error code on + * 'inp' on failure. + */ +int xon_cell_set_version(xon_cell_t *inp, uint8_t val); +/** Return the value of the kbps_ewma field of the xon_cell_t in 'inp' + */ +uint32_t xon_cell_get_kbps_ewma(const xon_cell_t *inp); +/** Set the value of the kbps_ewma field of the xon_cell_t in 'inp' to + * 'val'. Return 0 on success; return -1 and set the error code on + * 'inp' on failure. + */ +int xon_cell_set_kbps_ewma(xon_cell_t *inp, uint32_t val); + + +#endif diff --git a/src/trunnel/flow_control_cells.trunnel b/src/trunnel/flow_control_cells.trunnel new file mode 100644 index 0000000000..9d07b568a9 --- /dev/null +++ b/src/trunnel/flow_control_cells.trunnel @@ -0,0 +1,20 @@ +/* This file contains the xon and xoff cell definitions, for flow control. */ + +/* xoff cell definition. Tells the other endpoint to stop sending, because + * we have too much data queued for this stream. */ +struct xoff_cell { + /* Version field. */ + u8 version IN [0x00]; +} + +/* xon cell declaration. Tells the other endpoint to resume sending and/or + * update its sending rate on this stream based on advisory information. */ +struct xon_cell { + /* Version field. */ + u8 version IN [0x00]; + + /* Advisory field: The ewma rate of socket drain we have seen so far + * on this stream, in kilobytes/sec (1000 bytes/sec). May be zero, + * which means no rate advice. 
*/ + u32 kbps_ewma; +} diff --git a/src/trunnel/include.am b/src/trunnel/include.am index 6c3a5ff06b..00a96536f1 100644 --- a/src/trunnel/include.am +++ b/src/trunnel/include.am @@ -12,6 +12,7 @@ TRUNNELINPUTS = \ src/trunnel/pwbox.trunnel \ src/trunnel/channelpadding_negotiation.trunnel \ src/trunnel/sendme_cell.trunnel \ + src/trunnel/flow_control_cells.trunnel \ src/trunnel/socks5.trunnel \ src/trunnel/circpad_negotiation.trunnel @@ -26,6 +27,7 @@ TRUNNELSOURCES = \ src/trunnel/hs/cell_rendezvous.c \ src/trunnel/channelpadding_negotiation.c \ src/trunnel/sendme_cell.c \ + src/trunnel/flow_control_cells.c \ src/trunnel/socks5.c \ src/trunnel/netinfo.c \ src/trunnel/circpad_negotiation.c @@ -43,6 +45,7 @@ TRUNNELHEADERS = \ src/trunnel/hs/cell_rendezvous.h \ src/trunnel/channelpadding_negotiation.h \ src/trunnel/sendme_cell.h \ + src/trunnel/flow_control_cells.h \ src/trunnel/socks5.h \ src/trunnel/netinfo.h \ src/trunnel/circpad_negotiation.h From 8f9cf1ec4341f23c6a09c4f8194eb0c546cdb187 Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Fri, 20 Aug 2021 15:44:16 +0000 Subject: [PATCH 02/18] Export a global notion of monotime clock stall/jump. Monotime clock functionality is a global property, and flow control also needs to know if it can trust the clock. --- src/core/or/congestion_control_common.c | 54 +++++++++++++++++++------ src/core/or/congestion_control_common.h | 1 + 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c index 9db1d7d664..edb65d2b70 100644 --- a/src/core/or/congestion_control_common.c +++ b/src/core/or/congestion_control_common.c @@ -558,10 +558,16 @@ time_delta_should_use_heuristics(const congestion_control_t *cc) return false; } +static bool is_monotime_clock_broken = false; + /** * Returns true if the monotime delta is 0, or is significantly * different than the previous delta. Either case indicates * that the monotime time source stalled or jumped. + * + * Also caches the clock state in the is_monotime_clock_broken flag, + * so we can also provide a is_monotime_clock_reliable() function, + * used by flow control rate timing. */ static bool time_delta_stalled_or_jumped(const congestion_control_t *cc, @@ -573,22 +579,30 @@ time_delta_stalled_or_jumped(const congestion_control_t *cc, static ratelim_t stall_info_limit = RATELIM_INIT(60); log_fn_ratelim(&stall_info_limit, LOG_INFO, LD_CIRC, "Congestion control cannot measure RTT due to monotime stall."); - return true; + + /* If delta is every 0, the monotime clock has stalled, and we should + * not use it anywhere. */ + is_monotime_clock_broken = true; + + return is_monotime_clock_broken; } - /* If the old_delta is 0, we have no previous values. So - * just assume this one is valid (beause it is non-zero) */ - if (old_delta == 0) - return false; + /* If the old_delta is 0, we have no previous values on this circuit. + * + * So, return the global monotime status from other circuits, and + * do not update. + */ + if (old_delta == 0) { + return is_monotime_clock_broken; + } /* * For the heuristic cases, we need at least a few timestamps, * to average out any previous partial stalls or jumps. So until - * than point, let's just delcare these time values "good enough - * to use". + * than point, let's just use the cached status from other circuits. 
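+   * (The cached flag is exposed, inverted, via the
+   * is_monotime_clock_reliable() helper below.)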
*/ if (!time_delta_should_use_heuristics(cc)) { - return false; + return is_monotime_clock_broken; } /* If old_delta is significantly larger than new_delta, then @@ -601,7 +615,9 @@ time_delta_stalled_or_jumped(const congestion_control_t *cc, "), likely due to clock jump.", new_delta/1000, old_delta/1000); - return true; + is_monotime_clock_broken = true; + + return is_monotime_clock_broken; } /* If new_delta is significantly larger than old_delta, then @@ -613,10 +629,24 @@ time_delta_stalled_or_jumped(const congestion_control_t *cc, "), likely due to clock jump.", new_delta/1000, old_delta/1000); - return true; + is_monotime_clock_broken = true; + + return is_monotime_clock_broken; } - return false; + /* All good! Update cached status, too */ + is_monotime_clock_broken = false; + + return is_monotime_clock_broken; +} + +/** + * Is the monotime clock stalled according to any circuits? + */ +bool +is_monotime_clock_reliable(void) +{ + return !is_monotime_clock_broken; } /** @@ -753,7 +783,7 @@ congestion_control_update_circuit_bdp(congestion_control_t *cc, SMARTLIST_FOREACH(cc->sendme_arrival_timestamps, uint64_t *, t, tor_free(t)); smartlist_clear(cc->sendme_arrival_timestamps); - } else if (curr_rtt_usec) { + } else if (curr_rtt_usec && is_monotime_clock_reliable()) { /* Sendme-based BDP will quickly measure BDP in much less than * a cwnd worth of data when in use (in 2-10 SENDMEs). * diff --git a/src/core/or/congestion_control_common.h b/src/core/or/congestion_control_common.h index 4193d94cba..12da0cb4e0 100644 --- a/src/core/or/congestion_control_common.h +++ b/src/core/or/congestion_control_common.h @@ -39,6 +39,7 @@ int congestion_control_get_package_window(const circuit_t *, int sendme_get_inc_count(const circuit_t *, const crypt_path_t *); bool circuit_sent_cell_for_sendme(const circuit_t *, const crypt_path_t *); +bool is_monotime_clock_reliable(void); /* Private section starts. */ #ifdef TOR_CONGESTION_CONTROL_PRIVATE From 33d8974f4d6b3697f021ece1b9b12a9b8c69e4b5 Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Sat, 21 Aug 2021 00:02:30 +0000 Subject: [PATCH 03/18] Export the n_ewma function for flow control use. --- src/core/or/congestion_control_common.c | 18 ------------------ src/core/or/congestion_control_common.h | 18 ++++++++++++++++++ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c index edb65d2b70..fa603e8df8 100644 --- a/src/core/or/congestion_control_common.c +++ b/src/core/or/congestion_control_common.c @@ -224,24 +224,6 @@ congestion_control_free_(congestion_control_t *cc) tor_free(cc); } -/** - * Compute an N-count EWMA, aka N-EWMA. N-EWMA is defined as: - * EWMA = alpha*value + (1-alpha)*EWMA_prev - * with alpha = 2/(N+1). - * - * This works out to: - * EWMA = value*2/(N+1) + EMA_prev*(N-1)/(N+1) - * = (value*2 + EWMA_prev*(N-1))/(N+1) - */ -static inline uint64_t -n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N) -{ - if (prev == 0) - return curr; - else - return (2*curr + (N-1)*prev)/(N+1); -} - /** * Enqueue a u64 timestamp to the end of a queue of timestamps. 
*/ diff --git a/src/core/or/congestion_control_common.h b/src/core/or/congestion_control_common.h index 12da0cb4e0..e8b9681ac6 100644 --- a/src/core/or/congestion_control_common.h +++ b/src/core/or/congestion_control_common.h @@ -41,6 +41,24 @@ int sendme_get_inc_count(const circuit_t *, const crypt_path_t *); bool circuit_sent_cell_for_sendme(const circuit_t *, const crypt_path_t *); bool is_monotime_clock_reliable(void); +/** + * Compute an N-count EWMA, aka N-EWMA. N-EWMA is defined as: + * EWMA = alpha*value + (1-alpha)*EWMA_prev + * with alpha = 2/(N+1). + * + * This works out to: + * EWMA = value*2/(N+1) + EMA_prev*(N-1)/(N+1) + * = (value*2 + EWMA_prev*(N-1))/(N+1) + */ +static inline uint64_t +n_count_ewma(uint64_t curr, uint64_t prev, uint64_t N) +{ + if (prev == 0) + return curr; + else + return (2*curr + (N-1)*prev)/(N+1); +} + /* Private section starts. */ #ifdef TOR_CONGESTION_CONTROL_PRIVATE From ca7f5c05a6f7016f9f96a2260c474632c5b5c529 Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Tue, 10 Aug 2021 21:35:22 +0000 Subject: [PATCH 04/18] Prop#324: Add fields to edge connection. --- src/core/or/edge_connection_st.h | 55 ++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/src/core/or/edge_connection_st.h b/src/core/or/edge_connection_st.h index 0120c3df25..dab32fc8d0 100644 --- a/src/core/or/edge_connection_st.h +++ b/src/core/or/edge_connection_st.h @@ -15,6 +15,7 @@ #include "core/or/or.h" #include "core/or/connection_st.h" +#include "lib/evloop/token_bucket.h" /** Subtype of connection_t for an "edge connection" -- that is, an entry (ap) * connection, or an exit. */ @@ -73,6 +74,60 @@ struct edge_connection_t { * that's going away and being used on channels instead. We still tag * edge connections with dirreq_id from circuits, so it's copied here. */ uint64_t dirreq_id; + + /* The following are flow control fields */ + + /** Used for rate limiting the read side of this edge connection when + * congestion control is enabled on its circuit. The XON cell ewma_drain_rate + * parameter is used to set the bucket limits. */ + token_bucket_rw_t bucket; + + /** + * Monotime timestamp of the last time we sent a flow control message + * for this edge, used to compute advisory rates */ + uint64_t drain_start_usec; + + /** + * Number of bytes written since we either emptied our buffers, + * or sent an advisory drate rate. Can wrap, buf if so, + * we must reset the usec timestamp above. (Or make this u64, idk). + */ + uint32_t drained_bytes; + uint32_t prev_drained_bytes; + + /** + * N_EWMA of the drain rate of writes on this edge conn + * while buffers were present. + */ + uint32_t ewma_drain_rate; + + /** + * The ewma drain rate the last time we sent an xon. + */ + uint32_t ewma_rate_last_sent; + + /** + * The following fields are used to count the total bytes sent on this + * stream, and compare them to the number of XON and XOFFs recieved, so + * that clients can check rate limits of XOFF/XON to prevent dropmark + * attacks. */ + uint32_t total_bytes_xmit; + + /** Number of XOFFs received */ + uint8_t num_xoff_recv; + + /** Number of XONs received */ + uint8_t num_xon_recv; + + /** + * Flag that tells us if an XOFF has been sent; cleared when we send an XON. + * Used to avoid sending multiple */ + uint8_t xoff_sent : 1; + + /** Flag that tells us if an XOFF has been received; cleared when we get + * an XON. Used to ensure that this edge keeps reads on its edge socket + * disabled. 
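+   * Cleared by circuit_process_stream_xon(), which also re-enables reading
+   * via connection_start_reading().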
*/ + uint8_t xoff_received : 1; }; #endif /* !defined(EDGE_CONNECTION_ST_H) */ From 819b69244a7d15014c031683a0327e8561cd58e9 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Tue, 28 Sep 2021 22:23:32 +0000 Subject: [PATCH 05/18] Support rate limiting of edge connections reads. We only need to rate limit reading on edges for flow control, as per the rate that comes in the XON from the other side. When we rate limit reading from the edge source to this rate, we will only deliver that fast to the other side, thus satisfying its rate request. Signed-off-by: David Goulet --- src/core/mainloop/connection.c | 50 ++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/src/core/mainloop/connection.c b/src/core/mainloop/connection.c index 79e034fb34..48bea792ae 100644 --- a/src/core/mainloop/connection.c +++ b/src/core/mainloop/connection.c @@ -117,6 +117,7 @@ #include "lib/cc/ctassert.h" #include "lib/sandbox/sandbox.h" #include "lib/net/buffers_net.h" +#include "lib/net/address.h" #include "lib/tls/tortls.h" #include "lib/evloop/compat_libevent.h" #include "lib/compress/compress.h" @@ -612,6 +613,11 @@ entry_connection_new(int type, int socket_family) entry_conn->entry_cfg.ipv4_traffic = 1; else if (socket_family == AF_INET6) entry_conn->entry_cfg.ipv6_traffic = 1; + + /* Initialize the read token bucket to the maximum value which is the same as + * no rate limiting. */ + token_bucket_rw_init(&ENTRY_TO_EDGE_CONN(entry_conn)->bucket, INT32_MAX, + INT32_MAX, monotime_coarse_get_stamp()); return entry_conn; } @@ -623,6 +629,10 @@ edge_connection_new(int type, int socket_family) edge_connection_t *edge_conn = tor_malloc_zero(sizeof(edge_connection_t)); tor_assert(type == CONN_TYPE_EXIT); connection_init(time(NULL), TO_CONN(edge_conn), type, socket_family); + /* Initialize the read token bucket to the maximum value which is the same as + * no rate limiting. */ + token_bucket_rw_init(&edge_conn->bucket, INT32_MAX, INT32_MAX, + monotime_coarse_get_stamp()); return edge_conn; } @@ -3457,6 +3467,19 @@ connection_bucket_read_limit(connection_t *conn, time_t now) base = get_cell_network_size(or_conn->wide_circ_ids); } + /* Edge connection have their own read bucket due to flow control being able + * to set a rate limit for them. However, for exit connections, we still need + * to honor the global bucket as well. */ + if (CONN_IS_EDGE(conn)) { + const edge_connection_t *edge_conn = CONST_TO_EDGE_CONN(conn); + conn_bucket = token_bucket_rw_get_read(&edge_conn->bucket); + if (conn->type == CONN_TYPE_EXIT) { + /* Decide between our limit and the global one. */ + goto end; + } + return conn_bucket; + } + if (!connection_is_rate_limited(conn)) { /* be willing to read on local conns even if our buckets are empty */ return conn_bucket>=0 ? conn_bucket : 1<<14; @@ -3467,6 +3490,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now) global_bucket_val = MIN(global_bucket_val, relayed); } + end: return connection_bucket_get_share(base, priority, global_bucket_val, conn_bucket); } @@ -3644,6 +3668,13 @@ connection_buckets_decrement(connection_t *conn, time_t now, record_num_bytes_transferred_impl(conn, now, num_read, num_written); + /* Edge connection need to decrement the read side of the bucket used by our + * congestion control. 
*/ + if (CONN_IS_EDGE(conn) && num_read > 0) { + edge_connection_t *edge_conn = TO_EDGE_CONN(conn); + token_bucket_rw_dec(&edge_conn->bucket, num_read, 0); + } + if (!connection_is_rate_limited(conn)) return; /* local IPs are free */ @@ -3697,14 +3728,16 @@ connection_write_bw_exhausted(connection_t *conn, bool is_global_bw) void connection_consider_empty_read_buckets(connection_t *conn) { + int is_global = 1; const char *reason; - if (!connection_is_rate_limited(conn)) + if (CONN_IS_EDGE(conn) && + token_bucket_rw_get_read(&TO_EDGE_CONN(conn)->bucket) <= 0) { + reason = "edge connection read bucket exhausted. Pausing."; + is_global = false; + } else if (!connection_is_rate_limited(conn)) { return; /* Always okay. */ - - int is_global = 1; - - if (token_bucket_rw_get_read(&global_bucket) <= 0) { + } else if (token_bucket_rw_get_read(&global_bucket) <= 0) { reason = "global read bucket exhausted. Pausing."; } else if (connection_counts_as_relayed_traffic(conn, approx_time()) && token_bucket_rw_get_read(&global_relayed_bucket) <= 0) { @@ -3714,8 +3747,9 @@ connection_consider_empty_read_buckets(connection_t *conn) token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) { reason = "connection read bucket exhausted. Pausing."; is_global = false; - } else + } else { return; /* all good, no need to stop it */ + } LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason)); connection_read_bw_exhausted(conn, is_global); @@ -3819,6 +3853,10 @@ connection_bucket_refill_single(connection_t *conn, uint32_t now_ts) or_connection_t *or_conn = TO_OR_CONN(conn); token_bucket_rw_refill(&or_conn->bucket, now_ts); } + + if (CONN_IS_EDGE(conn)) { + token_bucket_rw_refill(&TO_EDGE_CONN(conn)->bucket, now_ts); + } } /** From 896c16c3b175d24b2247b37008bf61faebb6dee9 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Tue, 28 Sep 2021 22:27:51 +0000 Subject: [PATCH 06/18] Add lttng trace support. Signed-off-by: David Goulet --- src/core/or/include.am | 3 + src/core/or/lttng_cc.inc | 166 ++++++++++++++++++++++++++++++++++ src/core/or/trace_probes_cc.c | 33 +++++++ src/core/or/trace_probes_cc.h | 22 +++++ 4 files changed, 224 insertions(+) create mode 100644 src/core/or/lttng_cc.inc create mode 100644 src/core/or/trace_probes_cc.c create mode 100644 src/core/or/trace_probes_cc.h diff --git a/src/core/or/include.am b/src/core/or/include.am index d142062216..66529b70b2 100644 --- a/src/core/or/include.am +++ b/src/core/or/include.am @@ -82,6 +82,7 @@ noinst_HEADERS += \ src/core/or/entry_port_cfg_st.h \ src/core/or/extend_info_st.h \ src/core/or/listener_connection_st.h \ + src/core/or/lttng_cc.inc \ src/core/or/lttng_circuit.inc \ src/core/or/onion.h \ src/core/or/or.h \ @@ -115,7 +116,9 @@ noinst_HEADERS += \ if USE_TRACING_INSTRUMENTATION_LTTNG LIBTOR_APP_A_SOURCES += \ + src/core/or/trace_probes_cc.c \ src/core/or/trace_probes_circuit.c noinst_HEADERS += \ + src/core/or/trace_probes_cc.h \ src/core/or/trace_probes_circuit.h endif diff --git a/src/core/or/lttng_cc.inc b/src/core/or/lttng_cc.inc new file mode 100644 index 0000000000..b7bf58e196 --- /dev/null +++ b/src/core/or/lttng_cc.inc @@ -0,0 +1,166 @@ +/* Copyright (c) 2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file lttng_cc.inc + * \brief LTTng tracing probe declaration for the congestion control subsystem. + * It is in this .inc file due to the non C standard syntax and the way + * we guard the header with the LTTng specific + * TRACEPOINT_HEADER_MULTI_READ. 
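+ * The probes declared here are instantiated in trace_probes_cc.c.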
+ **/ + +#include "orconfig.h" + +/* We only build the following if LTTng instrumentation has been enabled. */ +#ifdef USE_TRACING_INSTRUMENTATION_LTTNG + +/* The following defines are LTTng-UST specific. */ +#undef TRACEPOINT_PROVIDER +#define TRACEPOINT_PROVIDER tor_cc + +#undef TRACEPOINT_INCLUDE +#define TRACEPOINT_INCLUDE "./src/core/or/lttng_cc.inc" + +#if !defined(LTTNG_CC_INC) || defined(TRACEPOINT_HEADER_MULTI_READ) +#define LTTNG_CC_INC + +#include + +/* + * Flow Control + */ + +/* Emitted everytime the flow_control_decide_xon() function is called. */ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon, + TP_ARGS(const edge_connection_t *, stream, size_t, n_written), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(size_t, written_bytes, n_written) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* Emitted when flow control starts measuring the drain rate. */ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_start, + TP_ARGS(const edge_connection_t *, stream), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* Emitted when the drain rate is updated. The new_drain_rate value is what was + * just computed. */ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_update, + TP_ARGS(const edge_connection_t *, stream, uint32_t, drain_rate), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, new_drain_rate, drain_rate) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* Emitted when an XON cell is sent due to a notice in a drain rate change. */ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon_rate_change, + TP_ARGS(const edge_connection_t *, stream), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* Emitted when an XON cell is sent because we partially or fully drained the + * edge connection buffer. 
*/ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon_partial_drain, + TP_ARGS(const edge_connection_t *, stream), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* Emitted when we double the drain rate which is an attempt to see if we can + * speed things up. */ +TRACEPOINT_EVENT(tor_cc, flow_decide_xon_drain_doubled, + TP_ARGS(const edge_connection_t *, stream), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* XOFF */ + +/* Emitted when we send an XOFF cell. */ +TRACEPOINT_EVENT(tor_cc, flow_decide_xoff_sending, + TP_ARGS(const edge_connection_t *, stream), + TP_FIELDS( + ctf_integer(uint64_t, stream_id, TO_CONN(stream)->global_identifier) + ctf_integer(uint32_t, drained_bytes_current, stream->drained_bytes) + ctf_integer(uint32_t, drained_bytes_previous, stream->prev_drained_bytes) + ctf_integer(uint32_t, ewma_drain_rate_last, stream->ewma_rate_last_sent) + ctf_integer(uint32_t, ewma_drain_rate_current, stream->ewma_drain_rate) + ctf_integer(size_t, outbuf_len, + connection_get_outbuf_len(TO_CONN(stream))) + ) +) + +/* + * Congestion Control + */ + +/* Emitted when the BDP value has been updated. */ +TRACEPOINT_EVENT(tor_cc, bdp_update, + TP_ARGS(const circuit_t *, circ, const congestion_control_t *, cc, + uint64_t, curr_rtt_usec, uint64_t, sendme_rate_bdp), + TP_FIELDS( + ctf_integer(uint64_t, circuit_ptr, circ) + ctf_integer(uint32_t, n_circ_id, circ->n_circ_id) + ctf_integer(uint64_t, min_rtt_usec, cc->min_rtt_usec) + ctf_integer(uint64_t, curr_rtt_usec, curr_rtt_usec) + ctf_integer(uint64_t, ewma_rtt_usec, cc->ewma_rtt_usec) + ctf_integer(uint64_t, max_rtt_usec, cc->max_rtt_usec) + ctf_integer(uint64_t, bdp_inflight_rtt, cc->bdp[BDP_ALG_INFLIGHT_RTT]) + ctf_integer(uint64_t, bdp_cwnd_rtt, cc->bdp[BDP_ALG_CWND_RTT]) + ctf_integer(uint64_t, bdp_sendme_rate, cc->bdp[BDP_ALG_SENDME_RATE]) + ctf_integer(uint64_t, bdp_piecewise, cc->bdp[BDP_ALG_PIECEWISE]) + ctf_integer(uint64_t, sendme_rate_bdp, sendme_rate_bdp) + ) +) + +#endif /* LTTNG_CC_INC || TRACEPOINT_HEADER_MULTI_READ */ + +/* Must be included after the probes declaration. */ +#include + +#endif /* USE_TRACING_INSTRUMENTATION_LTTNG */ diff --git a/src/core/or/trace_probes_cc.c b/src/core/or/trace_probes_cc.c new file mode 100644 index 0000000000..d52646da4f --- /dev/null +++ b/src/core/or/trace_probes_cc.c @@ -0,0 +1,33 @@ +/* Copyright (c) 2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file trace_probes_cc.c + * \brief Tracepoint provider source file for the cc subsystem. Probes + * are generated within this C file for LTTng-UST + **/ + +#include "orconfig.h" + +/* + * Following section is specific to LTTng-UST. 
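+ * TRACEPOINT_DEFINE and TRACEPOINT_CREATE_PROBES are defined before
+ * including trace_probes_cc.h, which pulls in lttng_cc.inc to emit the
+ * actual probe definitions.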
+ */ +#ifdef USE_TRACING_INSTRUMENTATION_LTTNG + +/* Header files that the probes need. */ +#include "core/or/or.h" +#include "core/or/channel.h" +#include "core/or/circuit_st.h" +#include "core/or/circuitlist.h" +#include "core/or/congestion_control_st.h" +#include "core/or/connection_st.h" +#include "core/or/edge_connection_st.h" +#include "core/or/or_circuit_st.h" +#include "core/or/origin_circuit_st.h" + +#define TRACEPOINT_DEFINE +#define TRACEPOINT_CREATE_PROBES + +#include "core/or/trace_probes_cc.h" + +#endif /* defined(USE_TRACING_INSTRUMENTATION_LTTNG) */ diff --git a/src/core/or/trace_probes_cc.h b/src/core/or/trace_probes_cc.h new file mode 100644 index 0000000000..1f87528723 --- /dev/null +++ b/src/core/or/trace_probes_cc.h @@ -0,0 +1,22 @@ +/* Copyright (c) 2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file trace_probes_cc.c + * \brief The tracing probes for the congestion control subsystem. + * Currently, only LTTng-UST probes are available. + **/ + +#ifndef TOR_TRACE_PROBES_CC_H +#define TOR_TRACE_PROBES_CC_H + +#include "lib/trace/events.h" + +/* We only build the following if LTTng instrumentation has been enabled. */ +#ifdef USE_TRACING_INSTRUMENTATION_LTTNG + +#include "core/or/lttng_cc.inc" + +#endif /* USE_TRACING_INSTRUMENTATION_LTTNG */ + +#endif /* !defined(TOR_TRACE_PROBES_CC_H) */ From 0b376a9e82539621accd54e77f6c09493ed2fa33 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Tue, 28 Sep 2021 22:37:25 +0000 Subject: [PATCH 07/18] trace: Add congestion control BDP update tracepoints Signed-off-by: David Goulet --- src/core/or/congestion_control_common.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c index fa603e8df8..51691c106d 100644 --- a/src/core/or/congestion_control_common.c +++ b/src/core/or/congestion_control_common.c @@ -22,6 +22,7 @@ #include "core/or/congestion_control_nola.h" #include "core/or/congestion_control_westwood.h" #include "core/or/congestion_control_st.h" +#include "core/or/trace_probes_cc.h" #include "lib/time/compat_time.h" #include "feature/nodelist/networkstatus.h" @@ -915,7 +916,12 @@ congestion_control_update_circuit_bdp(congestion_control_t *cc, /* We updated BDP this round if either we had a blocked channel, or * the curr_rtt_usec was not 0. */ - return (blocked_on_chan || curr_rtt_usec != 0); + bool ret = (blocked_on_chan || curr_rtt_usec != 0); + if (ret) { + tor_trace(TR_SUBSYS(cc), TR_EV(bdp_update), circ, cc, curr_rtt_usec, + sendme_rate_bdp); + } + return ret; } /** From a89a71cd7b4658eba8465f31b5e1bc21e3325a53 Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Tue, 28 Sep 2021 22:28:26 +0000 Subject: [PATCH 08/18] Prop#324: Stream flow control functions --- src/core/or/congestion_control_flow.c | 697 ++++++++++++++++++++++++++ src/core/or/congestion_control_flow.h | 48 ++ 2 files changed, 745 insertions(+) create mode 100644 src/core/or/congestion_control_flow.c create mode 100644 src/core/or/congestion_control_flow.h diff --git a/src/core/or/congestion_control_flow.c b/src/core/or/congestion_control_flow.c new file mode 100644 index 0000000000..6742bb38bb --- /dev/null +++ b/src/core/or/congestion_control_flow.c @@ -0,0 +1,697 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_flow.c + * \brief Code that implements flow control for congestion controlled + * circuits. 
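+ * Edges send an XOFF when too much data queues up in their outbuf, and an
+ * XON carrying an advisory drain rate once the buffer drains or the rate
+ * changes significantly.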
+ */ + +#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE + +#include "core/or/or.h" + +#include "core/or/relay.h" +#include "core/mainloop/connection.h" +#include "core/or/connection_edge.h" +#include "core/mainloop/mainloop.h" +#include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_flow.h" +#include "core/or/congestion_control_st.h" +#include "core/or/circuitlist.h" +#include "core/or/trace_probes_cc.h" +#include "feature/nodelist/networkstatus.h" +#include "trunnel/flow_control_cells.h" + +#include "core/or/connection_st.h" +#include "core/or/cell_st.h" +#include "app/config/config.h" + +/** Cache consensus parameters */ +static uint32_t xoff_client; +static uint32_t xoff_exit; + +static uint32_t xon_change_pct; +static uint32_t xon_rate_bytes; +static uint32_t xon_ewma_cnt; + +/** In normal operation, we can get a burst of up to 32 cells before + * returning to libevent to flush the outbuf. This is a heuristic from + * hardcoded values and strange logic in connection_bucket_get_share(). */ +#define MAX_EXPECTED_CELL_BURST 32 + +/** + * The following three are for dropmark rate limiting. They define when + * we scale down our XON, XOFF, and xmit byte counts. Early scaling + * is beneficial because it limits the ability of spurious XON/XOFF + * to be sent after large amounts of data without XON/XOFF. At these + * limits, after 10MB of data (or more), an adversary can only inject + * (log2(10MB)-log2(200*500))*100 ~= 1000 cells of fake XOFF/XON before + * the xmit byte * count will be halved enough to triggering a limit. */ +#define XON_COUNT_SCALE_AT 200 +#define XOFF_COUNT_SCALE_AT 200 +#define ONE_MEGABYTE (UINT64_C(1) << 20) +#define TOTAL_XMIT_SCALE_AT 10*ONE_MEGABYTE + +static const congestion_control_t * +edge_get_ccontrol(const edge_connection_t *edge) +{ + if (edge->cpath_layer) + return edge->cpath_layer->ccontrol; + else if (edge->on_circuit) + return edge->on_circuit->ccontrol; + else + return NULL; +} + +void +flow_control_new_consensus_params(const networkstatus_t *ns) +{ +#define CC_XOFF_CLIENT_DFLT 500 +#define CC_XOFF_CLIENT_MIN 1 +#define CC_XOFF_CLIENT_MAX 10000 + xoff_client = networkstatus_get_param(ns, "cc_xoff_client", + CC_XOFF_CLIENT_DFLT, + CC_XOFF_CLIENT_MIN, + CC_XOFF_CLIENT_MAX)*RELAY_PAYLOAD_SIZE; + +#define CC_XOFF_EXIT_DFLT 500 +#define CC_XOFF_EXIT_MIN 1 +#define CC_XOFF_EXIT_MAX 10000 + xoff_exit = networkstatus_get_param(ns, "cc_xoff_exit", + CC_XOFF_EXIT_DFLT, + CC_XOFF_EXIT_MIN, + CC_XOFF_EXIT_MAX)*RELAY_PAYLOAD_SIZE; + +#define CC_XON_CHANGE_PCT_DFLT 25 +#define CC_XON_CHANGE_PCT_MIN 1 +#define CC_XON_CHANGE_PCT_MAX 99 + xon_change_pct = networkstatus_get_param(ns, "cc_xon_change_pct", + CC_XON_CHANGE_PCT_DFLT, + CC_XON_CHANGE_PCT_MIN, + CC_XON_CHANGE_PCT_MAX); + +#define CC_XON_RATE_BYTES_DFLT (500) +#define CC_XON_RATE_BYTES_MIN (1) +#define CC_XON_RATE_BYTES_MAX (5000) + xon_rate_bytes = networkstatus_get_param(ns, "cc_xon_rate", + CC_XON_RATE_BYTES_DFLT, + CC_XON_RATE_BYTES_MIN, + CC_XON_RATE_BYTES_MAX)*RELAY_PAYLOAD_SIZE; + +#define CC_XON_EWMA_CNT_DFLT (2) +#define CC_XON_EWMA_CNT_MIN (1) +#define CC_XON_EWMA_CNT_MAX (100) + xon_ewma_cnt = networkstatus_get_param(ns, "cc_xon_ewma_cnt", + CC_XON_EWMA_CNT_DFLT, + CC_XON_EWMA_CNT_MIN, + CC_XON_EWMA_CNT_MAX); +} + +/** + * Send an XOFF for this stream, and note that we sent one + */ +static void +circuit_send_stream_xoff(edge_connection_t *stream) +{ + xoff_cell_t xoff; + uint8_t payload[CELL_PAYLOAD_SIZE]; + ssize_t xoff_size; + + memset(&xoff, 0, sizeof(xoff)); + 
memset(payload, 0, sizeof(payload)); + + xoff_cell_set_version(&xoff, 0); + + if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) { + log_warn(LD_BUG, "Failed to encode xon cell"); + return; + } + + if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF, + (char*)payload, (size_t)xoff_size) == 0) { + stream->xoff_sent = true; + } +} + +/** + * Compute the recent drain rate (write rate) for this edge + * connection and return it, in KB/sec (1000 bytes/sec). + * + * Returns 0 if the monotime clock is busted. + */ +static inline uint32_t +compute_drain_rate(const edge_connection_t *stream) +{ + if (BUG(!is_monotime_clock_reliable())) { + log_warn(LD_BUG, "Computing drain rate with stalled monotime clock"); + return 0; + } + + uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec; + + if (delta == 0) { + log_warn(LD_BUG, "Computing stream drain rate with zero time delta"); + return 0; + } + + /* Overflow checks */ + if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */ + stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */ + return INT32_MAX; + } + + /* kb/sec = bytes/usec * 1000 usec/msec * 1000 msec/sec * kb/1000bytes */ + return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta); +} + +/** + * Send an XON for this stream, with appropriate advisory rate information. + * + * Reverts the xoff sent status, and stores the rate information we sent, + * in case it changes. + */ +static void +circuit_send_stream_xon(edge_connection_t *stream) +{ + xon_cell_t xon; + uint8_t payload[CELL_PAYLOAD_SIZE]; + ssize_t xon_size; + + memset(&xon, 0, sizeof(xon)); + memset(payload, 0, sizeof(payload)); + + xon_cell_set_version(&xon, 0); + xon_cell_set_kbps_ewma(&xon, stream->ewma_drain_rate); + + if ((xon_size = xon_cell_encode(payload, CELL_PAYLOAD_SIZE, &xon)) < 0) { + log_warn(LD_BUG, "Failed to encode xon cell"); + return; + } + + /* Store the advisory rate information, to send advisory updates if + * it changes */ + stream->ewma_rate_last_sent = stream->ewma_drain_rate; + + if (connection_edge_send_command(stream, RELAY_COMMAND_XON, (char*)payload, + (size_t)xon_size) == 0) { + /* Revert the xoff sent status, so we can send another one if need be */ + stream->xoff_sent = false; + } +} + +/** + * Process a stream XOFF, parsing it, and then stopping reading on + * the edge connection. + * + * Record that we have recieved an xoff, so we know not to resume + * reading on this edge conn until we get an XON. + * + * Returns false if the XOFF did not validate; true if it does. 
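+ * Also applies a client-side limit on how many XOFFs may arrive relative
+ * to the bytes sent on this stream, as a dropmark side channel defense.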
+ */ +bool +circuit_process_stream_xoff(edge_connection_t *conn, + const crypt_path_t *layer_hint, + const cell_t *cell) +{ + (void)cell; + bool retval = true; + + if (BUG(!conn)) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XOFF on invalid stream?"); + return false; + } + + /* Make sure this XOFF came from the right hop */ + if (layer_hint && layer_hint != conn->cpath_layer) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XOFF from wrong hop."); + return false; + } + + if (edge_get_ccontrol(conn) == NULL) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XOFF for non-congestion control circuit"); + return false; + } + + if (conn->xoff_received) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got multiple XOFF on connection"); + return false; + } + + /* If we are near the max, scale everything down */ + if (conn->num_xoff_recv == XOFF_COUNT_SCALE_AT) { + log_info(LD_EDGE, "Scaling down for XOFF count: %d %d %d", + conn->total_bytes_xmit, + conn->num_xoff_recv, + conn->num_xon_recv); + conn->total_bytes_xmit /= 2; + conn->num_xoff_recv /= 2; + conn->num_xon_recv /= 2; + } + + conn->num_xoff_recv++; + + /* Client-side check to make sure that XOFF is not sent too early, + * for dropmark attacks. The main sidechannel risk is early cells, + * but we also check to make sure that we have not received more XOFFs + * than could have been generated by the bytes we sent. + */ + if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) { + uint32_t limit = 0; + + /* TODO: This limit technically needs to come from negotiation, + * and be bounds checked for sanity, because the other endpoint + * may have a different consensus */ + if (conn->hs_ident) + limit = xoff_client; + else + limit = xoff_exit; + + if (conn->total_bytes_xmit < limit*conn->num_xoff_recv) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got extra XOFF for bytes sent. Got %d, expected max %d", + conn->num_xoff_recv, conn->total_bytes_xmit/limit); + /* We still process this, because the only dropmark defenses + * in C tor are via the vanguards addon's use of the read valid + * cells. So just signal that we think this is not valid protocol + * data and proceed. */ + retval = false; + } + } + + // TODO: Count how many xoffs we have; log if "too many", for shadow + // analysis of chatter. Possibly add to extra-info? + + log_info(LD_EDGE, "Got XOFF!"); + connection_stop_reading(TO_CONN(conn)); + conn->xoff_received = true; + + return retval; +} + +/** + * Process a stream XON, and if it validates, clear the xoff + * flag and resume reading on this edge connection. + * + * Also, use provided rate information to rate limit + * reading on this edge (or packagaing from it onto + * the circuit), to avoid XON/XOFF chatter. + * + * Returns true if the XON validates, false otherwise. 
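+ * A kbps_ewma value of 0 in the XON means "no rate advice"; in that case
+ * the read rate is left effectively unlimited.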
+ */ +bool +circuit_process_stream_xon(edge_connection_t *conn, + const crypt_path_t *layer_hint, + const cell_t *cell) +{ + xon_cell_t *xon; + bool retval = true; + + if (BUG(!conn)) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XON on invalid stream?"); + return false; + } + + /* Make sure this XON came from the right hop */ + if (layer_hint && layer_hint != conn->cpath_layer) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XON from wrong hop."); + return false; + } + + if (edge_get_ccontrol(conn) == NULL) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got XON for non-congestion control circuit"); + return false; + } + + if (xon_cell_parse(&xon, cell->payload+RELAY_HEADER_SIZE, + CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE) < 0) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Received malformed XON cell."); + return false; + } + + /* If we are near the max, scale everything down */ + if (conn->num_xon_recv == XON_COUNT_SCALE_AT) { + log_info(LD_EDGE, "Scaling down for XON count: %d %d %d", + conn->total_bytes_xmit, + conn->num_xoff_recv, + conn->num_xon_recv); + conn->total_bytes_xmit /= 2; + conn->num_xoff_recv /= 2; + conn->num_xon_recv /= 2; + } + + conn->num_xon_recv++; + + /* Client-side check to make sure that XON is not sent too early, + * for dropmark attacks. The main sidechannel risk is early cells, + * but we also check to see that we did not get more XONs than make + * sense for the number of bytes we sent. + */ + if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) { + uint32_t limit = 0; + + /* TODO: This limit technically needs to come from negotiation, + * and be bounds checked for sanity, because the other endpoint + * may have a different consensus */ + if (conn->hs_ident) + limit = MIN(xoff_client, xon_rate_bytes); + else + limit = MIN(xoff_exit, xon_rate_bytes); + + if (conn->total_bytes_xmit < limit*conn->num_xon_recv) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Got extra XON for bytes sent. Got %d, expected max %d", + conn->num_xon_recv, conn->total_bytes_xmit/limit); + + /* We still process this, because the only dropmark defenses + * in C tor are via the vanguards addon's use of the read valid + * cells. So just signal that we think this is not valid protocol + * data and proceed. */ + retval = false; + } + } + + log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma); + + /* Adjust the token bucket of this edge connection with the drain rate in + * the XON. Rate is in bytes from kilobit (kpbs). */ + uint64_t rate = xon_cell_get_kbps_ewma(xon) * 1000; + if (rate == 0 || INT32_MAX < rate) { + /* No rate. */ + rate = INT32_MAX; + } + token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate); + + if (conn->xoff_received) { + /* Clear the fact that we got an XOFF, so that this edge can + * start and stop reading normally */ + conn->xoff_received = false; + connection_start_reading(TO_CONN(conn)); + } + + xon_cell_free(xon); + + return retval; +} + +/** + * Called from sendme_stream_data_received(), when data arrives + * from a circuit to our edge's outbuf, to decide if we need to send + * an XOFF. + * + * Returns the amount of cells remaining until the buffer is full, at + * which point it sends an XOFF, and returns 0. + * + * Returns less than 0 if we have queued more than a congestion window + * worth of data and need to close the circuit. 
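+ * (In this implementation the non-error return value is always 0; excess
+ * queueing is handled by the OOM killer rather than by closing the circuit.)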
+ */ +int +flow_control_decide_xoff(edge_connection_t *stream) +{ + size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream)); + uint32_t buffer_limit_xoff = 0; + + if (BUG(edge_get_ccontrol(stream) == NULL)) { + log_err(LD_BUG, "Flow control called for non-congestion control circuit"); + return -1; + } + + /* Onion services and clients are typically localhost edges, so they + * need different buffering limits than exits do */ + if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) { + buffer_limit_xoff = xoff_client; + } else { + buffer_limit_xoff = xoff_exit; + } + + if (total_buffered > buffer_limit_xoff) { + if (!stream->xoff_sent) { + log_info(LD_EDGE, "Sending XOFF: %ld %d", + total_buffered, buffer_limit_xoff); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream); + + circuit_send_stream_xoff(stream); + + /* Clear the drain rate. It is considered wrong if we + * got all the way to XOFF */ + stream->ewma_drain_rate = 0; + } + } + + /* If the outbuf has accumulated more than the expected burst limit of + * cells, then assume it is not draining, and call decide_xon. We must + * do this because writes only happen when the socket unblocks, so + * may not otherwise notice accumulation of data in the outbuf for + * advisory XONs. */ + if (total_buffered > MAX_EXPECTED_CELL_BURST*RELAY_PAYLOAD_SIZE) { + flow_control_decide_xon(stream, 0); + } + + /* Flow control always takes more data; we rely on the oomkiller to + * handle misbehavior. */ + return 0; +} + +/** + * Returns true if the stream's drain rate has changed significantly. + * + * Returns false if the monotime clock is stalled, or if we have + * no previous drain rate information. + */ +static bool +stream_drain_rate_changed(const edge_connection_t *stream) +{ + if (!is_monotime_clock_reliable()) { + return false; + } + + if (!stream->ewma_rate_last_sent) { + return false; + } + + if (stream->ewma_drain_rate > + (100+(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) { + return true; + } + + if (stream->ewma_drain_rate < + (100-(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) { + return true; + } + + return false; +} + +/** + * Called whenever we drain an edge connection outbuf by writing on + * its socket, to decide if it is time to send an xon. + * + * The n_written parameter tells us how many bytes we have written + * this time, which is used to compute the advisory drain rate fields. + */ +void +flow_control_decide_xon(edge_connection_t *stream, size_t n_written) +{ + size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream)); + + /* Bounds check the number of drained bytes, and scale */ + if (stream->drained_bytes >= UINT32_MAX - n_written) { + /* Cut the bytes in half, and move the start time up halfway to now + * (if we have one). */ + stream->drained_bytes /= 2; + + if (stream->drain_start_usec) { + uint64_t now = monotime_absolute_usec(); + + stream->drain_start_usec = now - (now-stream->drain_start_usec)/2; + } + } + + /* Accumulate drained bytes since last rate computation */ + stream->drained_bytes += n_written; + + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written); + + /* Check for bad monotime clock and bytecount wrap */ + if (!is_monotime_clock_reliable()) { + /* If the monotime clock ever goes wrong, the safest thing to do + * is just clear our short-term rate info and wait for the clock to + * become reliable again.. 
*/ + stream->drain_start_usec = 0; + stream->drained_bytes = 0; + } else { + /* If we have no drain start timestamp, and we still have + * remaining buffer, start the buffering counter */ + if (!stream->drain_start_usec && total_buffered > 0) { + log_debug(LD_EDGE, "Began edge buffering: %d %d %ld", + stream->ewma_rate_last_sent, + stream->ewma_drain_rate, + total_buffered); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start), + stream); + stream->drain_start_usec = monotime_absolute_usec(); + stream->drained_bytes = 0; + } + } + + if (stream->drain_start_usec) { + /* If we have spent enough time in a queued state, update our drain + * rate. */ + if (stream->drained_bytes > xon_rate_bytes) { + /* No previous drained bytes means it is the first time we are computing + * it so use the value we just drained onto the socket as a baseline. It + * won't be accurate but it will be a start towards the right value. + * + * We have to do this in order to have a drain rate else we could be + * sending a drain rate of 0 in an XON which would be undesirable and + * basically like sending an XOFF. */ + if (stream->prev_drained_bytes == 0) { + stream->prev_drained_bytes = stream->drained_bytes; + } + uint32_t drain_rate = compute_drain_rate(stream); + /* Once the drain rate has been computed, note how many bytes we just + * drained so it can be used at the next calculation. We do this here + * because it gets reset once the rate is changed. */ + stream->prev_drained_bytes = stream->drained_bytes; + + if (drain_rate) { + stream->ewma_drain_rate = + (uint32_t)n_count_ewma(drain_rate, + stream->ewma_drain_rate, + xon_ewma_cnt); + log_debug(LD_EDGE, "Updating drain rate: %d %d %ld", + drain_rate, + stream->ewma_drain_rate, + total_buffered); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update), + stream, drain_rate); + /* Reset recent byte counts. This prevents us from sending advisory + * XONs more frequent than every xon_rate_bytes. */ + stream->drained_bytes = 0; + stream->drain_start_usec = 0; + } + } + } + + /* If we don't have an XOFF outstanding, consider updating an + * old rate */ + if (!stream->xoff_sent) { + if (stream_drain_rate_changed(stream)) { + /* If we are still buffering and the rate changed, update + * advisory XON */ + log_info(LD_EDGE, "Sending rate-change XON: %d %d %ld", + stream->ewma_rate_last_sent, + stream->ewma_drain_rate, + total_buffered); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream); + circuit_send_stream_xon(stream); + } + } else if (total_buffered == 0) { + log_info(LD_EDGE, "Sending XON: %d %d %ld", + stream->ewma_rate_last_sent, + stream->ewma_drain_rate, + total_buffered); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream); + circuit_send_stream_xon(stream); + } + + /* If the buffer has fully emptied, clear the drain timestamp, + * so we can total only bytes drained while outbuf is 0. */ + if (total_buffered == 0) { + stream->drain_start_usec = 0; + + /* After we've spent 'xon_rate_bytes' with the queue fully drained, + * double any rate we sent. */ + if (stream->drained_bytes >= xon_rate_bytes && + stream->ewma_rate_last_sent) { + stream->ewma_drain_rate = MIN(INT32_MAX, 2*stream->ewma_drain_rate); + + log_debug(LD_EDGE, + "Queue empty for xon_rate_limit bytes: %d %d", + stream->ewma_rate_last_sent, + stream->ewma_drain_rate); + tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream); + /* Resetting the drained bytes count. 
We need to keep its value as a + * previous so the drain rate calculation takes into account what was + * actually drain the last time. */ + stream->prev_drained_bytes = stream->drained_bytes; + stream->drained_bytes = 0; + } + } + + return; +} + +/** + * Note that we packaged some data on this stream. Used to enforce + * client-side dropmark limits + */ +void +flow_control_note_sent_data(edge_connection_t *stream, size_t len) +{ + /* If we are near the max, scale everything down */ + if (stream->total_bytes_xmit >= TOTAL_XMIT_SCALE_AT-len) { + log_info(LD_EDGE, "Scaling down for flow control xmit bytes:: %d %d %d", + stream->total_bytes_xmit, + stream->num_xoff_recv, + stream->num_xon_recv); + + stream->total_bytes_xmit /= 2; + stream->num_xoff_recv /= 2; + stream->num_xon_recv /= 2; + } + + stream->total_bytes_xmit += len; +} + +/** Returns true if an edge connection uses flow control */ +bool +edge_uses_flow_control(const edge_connection_t *stream) +{ + bool ret = (stream->on_circuit && stream->on_circuit->ccontrol) || + (stream->cpath_layer && stream->cpath_layer->ccontrol); + + /* All circuits with congestion control use flow control */ + return ret; +} + +/** + * Returns the max RTT for the circuit that carries this stream, + * as observed by congestion control. + */ +uint64_t +edge_get_max_rtt(const edge_connection_t *stream) +{ + if (stream->on_circuit && stream->on_circuit->ccontrol) + return stream->on_circuit->ccontrol->max_rtt_usec; + else if (stream->cpath_layer && stream->cpath_layer->ccontrol) + return stream->cpath_layer->ccontrol->max_rtt_usec; + + return 0; +} + +/** Returns true if a connection is an edge conn that uses flow control */ +bool +conn_uses_flow_control(connection_t *conn) +{ + bool ret = false; + + if (CONN_IS_EDGE(conn)) { + edge_connection_t *edge = TO_EDGE_CONN(conn); + + if (edge_uses_flow_control(edge)) { + ret = true; + } + } + + return ret; +} + diff --git a/src/core/or/congestion_control_flow.h b/src/core/or/congestion_control_flow.h new file mode 100644 index 0000000000..6c318027ea --- /dev/null +++ b/src/core/or/congestion_control_flow.h @@ -0,0 +1,48 @@ +/* Copyright (c) 2019-2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file congestion_control_flow.h + * \brief APIs for stream flow control on congestion controlled circuits. + **/ + +#ifndef TOR_CONGESTION_CONTROL_FLOW_H +#define TOR_CONGESTION_CONTROL_FLOW_H + +#include "core/or/crypt_path_st.h" +#include "core/or/circuit_st.h" +#include "core/or/edge_connection_st.h" + +void flow_control_new_consensus_params(const struct networkstatus_t *); + +bool circuit_process_stream_xoff(edge_connection_t *conn, + const crypt_path_t *layer_hint, + const cell_t *cell); +bool circuit_process_stream_xon(edge_connection_t *conn, + const crypt_path_t *layer_hint, + const cell_t *cell); + +int flow_control_decide_xoff(edge_connection_t *stream); +void flow_control_decide_xon(edge_connection_t *stream, size_t n_written); + +void flow_control_note_sent_data(edge_connection_t *stream, size_t len); + +bool edge_uses_flow_control(const edge_connection_t *stream); + +bool conn_uses_flow_control(connection_t *stream); + +uint64_t edge_get_max_rtt(const edge_connection_t *); + +/* Private section starts. */ +#ifdef TOR_CONGESTION_CONTROL_FLOW_PRIVATE + +/* + * Unit tests declaractions. 
+ */ +#ifdef TOR_UNIT_TESTS + +#endif /* defined(TOR_UNIT_TESTS) */ + +#endif /* defined(TOR_CONGESTION_CONTROL_FLOW_PRIVATE) */ + +#endif /* !defined(TOR_CONGESTION_CONTROL_FLOW_H) */ From 0422eb26a70fc1450cc6b57902f189edc4eed10a Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Tue, 10 Aug 2021 21:35:46 +0000 Subject: [PATCH 09/18] Prop#324: Hook up flow control --- src/app/main/main.c | 2 ++ src/core/mainloop/connection.c | 16 ++++++++-- src/core/mainloop/mainloop.c | 7 +++++ src/core/or/or.h | 3 ++ src/core/or/relay.c | 47 ++++++++++++++++++++++++++-- src/core/or/sendme.c | 28 +++++++++++++++-- src/core/or/sendme.h | 2 +- src/feature/nodelist/networkstatus.c | 2 ++ 8 files changed, 99 insertions(+), 8 deletions(-) diff --git a/src/app/main/main.c b/src/app/main/main.c index 89564490e6..0742abe70a 100644 --- a/src/app/main/main.c +++ b/src/app/main/main.c @@ -27,6 +27,7 @@ #include "core/or/channel.h" #include "core/or/channelpadding.h" #include "core/or/circuitpadding.h" +#include "core/or/congestion_control_flow.h" #include "core/or/circuitlist.h" #include "core/or/command.h" #include "core/or/connection_or.h" @@ -630,6 +631,7 @@ tor_init(int argc, char *argv[]) * until we get a consensus */ channelpadding_new_consensus_params(NULL); circpad_new_consensus_params(NULL); + flow_control_new_consensus_params(NULL); /* Initialize circuit padding to defaults+torrc until we get a consensus */ circpad_machines_init(); diff --git a/src/core/mainloop/connection.c b/src/core/mainloop/connection.c index 48bea792ae..9271a70914 100644 --- a/src/core/mainloop/connection.c +++ b/src/core/mainloop/connection.c @@ -147,6 +147,8 @@ #include "feature/nodelist/routerinfo_st.h" #include "core/or/socks_request_st.h" +#include "core/or/congestion_control_flow.h" + /** * On Windows and Linux we cannot reliably bind() a socket to an * address and port if: 1) There's already a socket bound to wildcard @@ -4594,9 +4596,9 @@ connection_handle_write_impl(connection_t *conn, int force) !dont_stop_writing) { /* it's done flushing */ if (connection_finished_flushing(conn) < 0) { /* already marked */ - return -1; + goto err; } - return 0; + goto done; } /* Call even if result is 0, since the global write bucket may @@ -4606,7 +4608,17 @@ connection_handle_write_impl(connection_t *conn, int force) if (n_read > 0 && connection_is_reading(conn)) connection_consider_empty_read_buckets(conn); + done: + /* If this is an edge connection with congestion control, check to see + * if it is time to send an xon */ + if (conn_uses_flow_control(conn)) { + flow_control_decide_xon(TO_EDGE_CONN(conn), n_written); + } + return 0; + + err: + return -1; } /* DOCDOC connection_handle_write */ diff --git a/src/core/mainloop/mainloop.c b/src/core/mainloop/mainloop.c index 37b53db92a..cd57dea3d4 100644 --- a/src/core/mainloop/mainloop.c +++ b/src/core/mainloop/mainloop.c @@ -641,6 +641,13 @@ connection_start_reading,(connection_t *conn)) if (connection_should_read_from_linked_conn(conn)) connection_start_reading_from_linked_conn(conn); } else { + if (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->xoff_received) { + /* We should not get called here if we're waiting for an XON, but + * belt-and-suspenders */ + log_notice(LD_NET, + "Request to start reading on an edgeconn blocked with XOFF"); + return; + } if (event_add(conn->read_event, NULL)) log_warn(LD_NET, "Error from libevent setting read event state for %d " "to watched: %s", diff --git a/src/core/or/or.h b/src/core/or/or.h index 99948f26e2..ad82130301 100644 --- a/src/core/or/or.h +++ 
b/src/core/or/or.h @@ -210,6 +210,9 @@ struct curve25519_public_key_t; #define RELAY_COMMAND_PADDING_NEGOTIATE 41 #define RELAY_COMMAND_PADDING_NEGOTIATED 42 +#define RELAY_COMMAND_XOFF 43 +#define RELAY_COMMAND_XON 44 + /* Reasons why an OR connection is closed. */ #define END_OR_CONN_REASON_DONE 1 #define END_OR_CONN_REASON_REFUSED 2 /* connection refused */ diff --git a/src/core/or/relay.c b/src/core/or/relay.c index e3d41d7bf0..0e889eb348 100644 --- a/src/core/or/relay.c +++ b/src/core/or/relay.c @@ -98,6 +98,7 @@ #include "core/or/socks_request_st.h" #include "core/or/sendme.h" #include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_flow.h" static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, @@ -1739,6 +1740,44 @@ handle_relay_cell_command(cell_t *cell, circuit_t *circ, sendme_connection_edge_consider_sending(conn); } + return 0; + case RELAY_COMMAND_XOFF: + if (!conn) { + if (CIRCUIT_IS_ORIGIN(circ)) { + origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ); + if (relay_crypt_from_last_hop(ocirc, layer_hint) && + connection_half_edge_is_valid_data(ocirc->half_streams, + rh->stream_id)) { + circuit_read_valid_data(ocirc, rh->length); + } + } + return 0; + } + + if (circuit_process_stream_xoff(conn, layer_hint, cell)) { + if (CIRCUIT_IS_ORIGIN(circ)) { + circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length); + } + } + return 0; + case RELAY_COMMAND_XON: + if (!conn) { + if (CIRCUIT_IS_ORIGIN(circ)) { + origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ); + if (relay_crypt_from_last_hop(ocirc, layer_hint) && + connection_half_edge_is_valid_data(ocirc->half_streams, + rh->stream_id)) { + circuit_read_valid_data(ocirc, rh->length); + } + } + return 0; + } + + if (circuit_process_stream_xon(conn, layer_hint, cell)) { + if (CIRCUIT_IS_ORIGIN(circ)) { + circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length); + } + } return 0; case RELAY_COMMAND_END: reason = rh->length > 0 ? @@ -2287,7 +2326,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, } /* Handle the stream-level SENDME package window. 
*/ - if (sendme_note_stream_data_packaged(conn) < 0) { + if (sendme_note_stream_data_packaged(conn, length) < 0) { connection_stop_reading(TO_CONN(conn)); log_debug(domain,"conn->package_window reached 0."); circuit_consider_stop_edge_reading(circ, cpath_layer); @@ -2402,7 +2441,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, /* Activate reading starting from the chosen stream */ for (conn=chosen_stream; conn; conn = conn->next_stream) { /* Start reading for the streams starting from here */ - if (conn->base_.marked_for_close || conn->package_window <= 0) + if (conn->base_.marked_for_close || conn->package_window <= 0 || + conn->xoff_received) continue; if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); @@ -2413,7 +2453,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, } /* Go back and do the ones we skipped, circular-style */ for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) { - if (conn->base_.marked_for_close || conn->package_window <= 0) + if (conn->base_.marked_for_close || conn->package_window <= 0 || + conn->xoff_received) continue; if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); diff --git a/src/core/or/sendme.c b/src/core/or/sendme.c index 900490a892..ee670f9d51 100644 --- a/src/core/or/sendme.c +++ b/src/core/or/sendme.c @@ -22,6 +22,7 @@ #include "core/or/relay.h" #include "core/or/sendme.h" #include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_flow.h" #include "feature/nodelist/networkstatus.h" #include "lib/ctime/di_ops.h" #include "trunnel/sendme_cell.h" @@ -370,6 +371,10 @@ sendme_connection_edge_consider_sending(edge_connection_t *conn) int log_domain = TO_CONN(conn)->type == CONN_TYPE_AP ? LD_APP : LD_EXIT; + /* If we use flow control, we do not send stream sendmes */ + if (edge_uses_flow_control(conn)) + goto end; + /* Don't send it if we still have data to deliver. */ if (connection_outbuf_too_full(TO_CONN(conn))) { goto end; @@ -546,6 +551,12 @@ sendme_process_stream_level(edge_connection_t *conn, circuit_t *circ, tor_assert(conn); tor_assert(circ); + if (edge_uses_flow_control(conn)) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Congestion control got stream sendme"); + return -END_CIRC_REASON_TORPROTOCOL; + } + /* Don't allow the other endpoint to request more than our maximum (i.e. * initial) stream SENDME window worth of data. Well-behaved stock clients * will not request more than this max (as per the check in the while loop @@ -603,7 +614,12 @@ int sendme_stream_data_received(edge_connection_t *conn) { tor_assert(conn); - return --conn->deliver_window; + + if (edge_uses_flow_control(conn)) { + return flow_control_decide_xoff(conn); + } else { + return --conn->deliver_window; + } } /* Called when a relay DATA cell is packaged on the given circuit. If @@ -651,10 +667,18 @@ sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint) /* Called when a relay DATA cell is packaged for the given edge connection * conn. Update the package window and return its new value. 
*/ int -sendme_note_stream_data_packaged(edge_connection_t *conn) +sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len) { tor_assert(conn); + if (edge_uses_flow_control(conn)) { + flow_control_note_sent_data(conn, len); + if (conn->xoff_received) + return -1; + else + return 1; + } + --conn->package_window; log_debug(LD_APP, "Stream package_window now %d.", conn->package_window); return conn->package_window; diff --git a/src/core/or/sendme.h b/src/core/or/sendme.h index c224d0a921..2abec91a91 100644 --- a/src/core/or/sendme.h +++ b/src/core/or/sendme.h @@ -33,7 +33,7 @@ int sendme_circuit_data_received(circuit_t *circ, crypt_path_t *layer_hint); /* Update package window functions. */ int sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint); -int sendme_note_stream_data_packaged(edge_connection_t *conn); +int sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len); /* Record cell digest on circuit. */ void sendme_record_cell_digest_on_circ(circuit_t *circ, crypt_path_t *cpath); diff --git a/src/feature/nodelist/networkstatus.c b/src/feature/nodelist/networkstatus.c index 7a1e73ef60..0138dff033 100644 --- a/src/feature/nodelist/networkstatus.c +++ b/src/feature/nodelist/networkstatus.c @@ -45,6 +45,7 @@ #include "core/or/channel.h" #include "core/or/channelpadding.h" #include "core/or/circuitpadding.h" +#include "core/or/congestion_control_flow.h" #include "core/or/circuitmux.h" #include "core/or/circuitmux_ewma.h" #include "core/or/circuitstats.h" @@ -1699,6 +1700,7 @@ notify_after_networkstatus_changes(void) channelpadding_new_consensus_params(c); circpad_new_consensus_params(c); router_new_consensus_params(c); + flow_control_new_consensus_params(c); /* Maintenance of our L2 guard list */ maintain_layer2_guards(); From 58aca27265980eb1d13daf3c3378251e3cb9c956 Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Tue, 10 Aug 2021 21:20:28 +0000 Subject: [PATCH 10/18] MAKEFILE: Add flow control files to makefile --- src/core/or/include.am | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/core/or/include.am b/src/core/or/include.am index 66529b70b2..278556144c 100644 --- a/src/core/or/include.am +++ b/src/core/or/include.am @@ -39,6 +39,7 @@ LIBTOR_APP_A_SOURCES += \ src/core/or/congestion_control_vegas.c \ src/core/or/congestion_control_nola.c \ src/core/or/congestion_control_westwood.c \ + src/core/or/congestion_control_flow.c \ src/core/or/status.c \ src/core/or/versions.c @@ -103,6 +104,7 @@ noinst_HEADERS += \ src/core/or/relay_crypto_st.h \ src/core/or/scheduler.h \ src/core/or/sendme.h \ + src/core/or/congestion_control_flow.h \ src/core/or/congestion_control_common.h \ src/core/or/congestion_control_vegas.h \ src/core/or/congestion_control_nola.h \ From bd0aabe20f7b0b89a1f91ac11f0bf4f72034b27d Mon Sep 17 00:00:00 2001 From: David Goulet Date: Thu, 9 Sep 2021 10:32:55 -0400 Subject: [PATCH 11/18] oom: Consider edge connections as well Signed-off-by: David Goulet --- src/core/or/circuitlist.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/core/or/circuitlist.c b/src/core/or/circuitlist.c index 35d810c660..88c4159294 100644 --- a/src/core/or/circuitlist.c +++ b/src/core/or/circuitlist.c @@ -2602,6 +2602,7 @@ circuits_handle_oom(size_t current_allocation) size_t mem_recovered=0; int n_circuits_killed=0; int n_dirconns_killed=0; + int n_edgeconns_killed = 0; uint32_t now_ts; log_notice(LD_GENERAL, "We're low on memory (cell queues total alloc:" " %"TOR_PRIuSZ" buffer total alloc: %" TOR_PRIuSZ 
"," @@ -2668,12 +2669,19 @@ circuits_handle_oom(size_t current_allocation) if (conn_age < circ->age_tmp) { break; } - if (conn->type == CONN_TYPE_DIR && conn->linked_conn == NULL) { + /* Also consider edge connections so we don't accumulate bytes on the + * outbuf due to a malicious destination holding off the read on us. */ + if ((conn->type == CONN_TYPE_DIR && conn->linked_conn == NULL) || + CONN_IS_EDGE(conn)) { if (!conn->marked_for_close) connection_mark_for_close(conn); mem_recovered += single_conn_free_bytes(conn); - ++n_dirconns_killed; + if (conn->type == CONN_TYPE_DIR) { + ++n_dirconns_killed; + } else { + ++n_edgeconns_killed; + } if (mem_recovered >= mem_to_recover) goto done_recovering_mem; @@ -2703,11 +2711,12 @@ circuits_handle_oom(size_t current_allocation) done_recovering_mem: log_notice(LD_GENERAL, "Removed %"TOR_PRIuSZ" bytes by killing %d circuits; " "%d circuits remain alive. Also killed %d non-linked directory " - "connections.", + "connections. Killed %d edge connections", mem_recovered, n_circuits_killed, smartlist_len(circlist) - n_circuits_killed, - n_dirconns_killed); + n_dirconns_killed, + n_edgeconns_killed); return mem_recovered; } From 5e17f8acab8bf7380a626e5ef1a162b7e4351dcc Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Thu, 23 Sep 2021 19:36:11 +0000 Subject: [PATCH 12/18] Support time-based half-closed connection handling. Since we no longer use stream SENDMEs for congestion control, we must now use time to decide when data should stop arriving on a half-closed stream. --- src/core/or/connection_edge.c | 51 ++++++++++++++++++++++++++++------- src/core/or/half_edge_st.h | 12 +++++++++ 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/src/core/or/connection_edge.c b/src/core/or/connection_edge.c index 6f6f22a0d4..d4d9d2f759 100644 --- a/src/core/or/connection_edge.c +++ b/src/core/or/connection_edge.c @@ -69,6 +69,8 @@ #include "core/or/circuituse.h" #include "core/or/circuitpadding.h" #include "core/or/connection_edge.h" +#include "core/or/congestion_control_flow.h" +#include "core/or/circuitstats.h" #include "core/or/connection_or.h" #include "core/or/extendinfo.h" #include "core/or/policies.h" @@ -614,20 +616,39 @@ connection_half_edge_add(const edge_connection_t *conn, half_conn->stream_id = conn->stream_id; - // How many sendme's should I expect? - half_conn->sendmes_pending = - (STREAMWINDOW_START-conn->package_window)/STREAMWINDOW_INCREMENT; - // Is there a connected cell pending? half_conn->connected_pending = conn->base_.state == AP_CONN_STATE_CONNECT_WAIT; - /* Data should only arrive if we're not waiting on a resolved cell. - * It can arrive after waiting on connected, because of optimistic - * data. */ - if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) { - // How many more data cells can arrive on this id? - half_conn->data_pending = conn->deliver_window; + if (edge_uses_flow_control(conn)) { + /* If the edge uses the new congestion control flow control, we must use + * time-based limits on half-edge activity. 
*/ + uint64_t timeout_usec = (uint64_t)(get_circuit_build_timeout_ms()*1000); + half_conn->used_ccontrol = 1; + + /* If this is an onion service circuit, double the CBT as an approximate + * value for the other half of the circuit */ + if (conn->hs_ident) { + timeout_usec *= 2; + } + + /* The stream should stop seeing any use after the larger of the circuit + * RTT and the overall circuit build timeout */ + half_conn->end_ack_expected_usec = MAX(timeout_usec, + edge_get_max_rtt(conn)) + + monotime_absolute_usec(); + } else { + // How many sendme's should I expect? + half_conn->sendmes_pending = + (STREAMWINDOW_START-conn->package_window)/STREAMWINDOW_INCREMENT; + + /* Data should only arrive if we're not waiting on a resolved cell. + * It can arrive after waiting on connected, because of optimistic + * data. */ + if (conn->base_.state != AP_CONN_STATE_RESOLVE_WAIT) { + // How many more data cells can arrive on this id? + half_conn->data_pending = conn->deliver_window; + } } insert_at = smartlist_bsearch_idx(circ->half_streams, &half_conn->stream_id, @@ -688,6 +709,12 @@ connection_half_edge_is_valid_data(const smartlist_t *half_conns, if (!half) return 0; + if (half->used_ccontrol) { + if (monotime_absolute_usec() > half->end_ack_expected_usec) + return 0; + return 1; + } + if (half->data_pending > 0) { half->data_pending--; return 1; @@ -740,6 +767,10 @@ connection_half_edge_is_valid_sendme(const smartlist_t *half_conns, if (!half) return 0; + /* congestion control edges don't use sendmes */ + if (half->used_ccontrol) + return 0; + if (half->sendmes_pending > 0) { half->sendmes_pending--; return 1; diff --git a/src/core/or/half_edge_st.h b/src/core/or/half_edge_st.h index c956c7434a..ac97eb19f1 100644 --- a/src/core/or/half_edge_st.h +++ b/src/core/or/half_edge_st.h @@ -31,6 +31,18 @@ typedef struct half_edge_t { * our deliver window */ int data_pending; + /** + * Monotime timestamp of when the other end should have successfuly + * shut down the stream and stop sending data, based on the larger + * of circuit RTT and CBT. Used if 'used_ccontrol' is true, to expire + * the half_edge at this monotime timestamp. */ + uint64_t end_ack_expected_usec; + + /** + * Did this edge use congestion control? If so, use + * timer instead of pending data approach */ + int used_ccontrol : 1; + /** Is there a connected cell pending? */ int connected_pending : 1; } half_edge_t; From 98be8634faf4405edc5388c64d9f834b6ae5b4b1 Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Tue, 28 Sep 2021 17:44:33 +0000 Subject: [PATCH 13/18] Turn CircEWMA tick len into consensus parameter. This will assist tuning of CircEWMA in Shadow and Live. --- src/core/or/circuitmux_ewma.c | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/core/or/circuitmux_ewma.c b/src/core/or/circuitmux_ewma.c index 0382e62f75..adf256ab05 100644 --- a/src/core/or/circuitmux_ewma.c +++ b/src/core/or/circuitmux_ewma.c @@ -45,7 +45,10 @@ /*** EWMA parameter #defines ***/ /** How long does a tick last (seconds)? */ -#define EWMA_TICK_LEN 10 +#define EWMA_TICK_LEN_DEFAULT 10 +#define EWMA_TICK_LEN_MIN 1 +#define EWMA_TICK_LEN_MAX 600 +static int ewma_tick_len = EWMA_TICK_LEN_DEFAULT; /** The default per-tick scale factor, if it hasn't been overridden by a * consensus or a configuration setting. zero means "disabled". 
*/ @@ -148,7 +151,7 @@ cell_ewma_get_tick(void) monotime_coarse_get(&now); int32_t msec_diff = monotime_coarse_diff_msec32(&start_of_current_tick, &now); - return current_tick_num + msec_diff / (1000*EWMA_TICK_LEN); + return current_tick_num + msec_diff / (1000*ewma_tick_len); } /** @@ -527,15 +530,15 @@ cell_ewma_get_current_tick_and_fraction(double *remainder_out) monotime_coarse_get(&now); int32_t msec_diff = monotime_coarse_diff_msec32(&start_of_current_tick, &now); - if (msec_diff > (1000*EWMA_TICK_LEN)) { - unsigned ticks_difference = msec_diff / (1000*EWMA_TICK_LEN); + if (msec_diff > (1000*ewma_tick_len)) { + unsigned ticks_difference = msec_diff / (1000*ewma_tick_len); monotime_coarse_add_msec(&start_of_current_tick, &start_of_current_tick, - ticks_difference * 1000 * EWMA_TICK_LEN); + ticks_difference * 1000 * ewma_tick_len); current_tick_num += ticks_difference; - msec_diff %= 1000*EWMA_TICK_LEN; + msec_diff %= 1000*ewma_tick_len; } - *remainder_out = ((double)msec_diff) / (1.0e3 * EWMA_TICK_LEN); + *remainder_out = ((double)msec_diff) / (1.0e3 * ewma_tick_len); return current_tick_num; } @@ -605,15 +608,20 @@ cmux_ewma_set_options(const or_options_t *options, /* Both options and consensus can be NULL. This assures us to either get a * valid configured value or the default one. */ halflife = get_circuit_priority_halflife(options, consensus, &source); + ewma_tick_len = networkstatus_get_param(consensus, + "CircuitPriorityTickSecs", + EWMA_TICK_LEN_DEFAULT, + EWMA_TICK_LEN_MIN, + EWMA_TICK_LEN_MAX); /* convert halflife into halflife-per-tick. */ - halflife /= EWMA_TICK_LEN; + halflife /= ewma_tick_len; /* compute per-tick scale factor. */ ewma_scale_factor = exp(LOG_ONEHALF / halflife); log_info(LD_OR, "Enabled cell_ewma algorithm because of value in %s; " "scale factor is %f per %d seconds", - source, ewma_scale_factor, EWMA_TICK_LEN); + source, ewma_scale_factor, ewma_tick_len); } /** Return the multiplier necessary to convert the value of a cell sent in From 6dae9903b1d5fe3c5df712eb99076a9adf9d5c8b Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Tue, 28 Sep 2021 21:19:27 +0000 Subject: [PATCH 14/18] Turn orconn watermarks into consensus parameters. Tuning these may reduce memory usage and latency. 
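
The high and low marks are now read through inline getters backed by the
"orconn_high" and "orconn_low" consensus parameters, rather than the old
OR_CONN_HIGHWATER/OR_CONN_LOWWATER defines. A rough sketch of the intended
call pattern (the names are the ones added below; the surrounding logic is
illustrative rather than a verbatim copy of the hunks):

  /* Keep queueing cells until the outbuf reaches "orconn_high"... */
  size_t datalen = connection_get_outbuf_len(TO_CONN(conn));
  if (datalen < or_conn_highwatermark()) {
    cell_network_size = get_cell_network_size(conn->wide_circ_ids);
    n = CEIL_DIV(or_conn_highwatermark() - datalen, cell_network_size);
  }

  /* ...and ask the scheduler for more once it drains below "orconn_low". */
  if (datalen < or_conn_lowwatermark()) {
    scheduler_channel_wants_writes(TLS_CHAN_TO_BASE(conn->chan));
  }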
--- src/app/main/main.c | 2 ++ src/core/or/channeltls.c | 3 ++- src/core/or/congestion_control_common.c | 31 +++++++++++++++++++++++++ src/core/or/congestion_control_common.h | 21 +++++++++++++++++ src/core/or/connection_or.c | 7 +++--- src/core/or/or.h | 12 ---------- src/feature/nodelist/networkstatus.c | 2 ++ src/test/test_channeltls.c | 3 ++- 8 files changed, 64 insertions(+), 17 deletions(-) diff --git a/src/app/main/main.c b/src/app/main/main.c index 0742abe70a..6043b9589f 100644 --- a/src/app/main/main.c +++ b/src/app/main/main.c @@ -27,6 +27,7 @@ #include "core/or/channel.h" #include "core/or/channelpadding.h" #include "core/or/circuitpadding.h" +#include "core/or/congestion_control_common.h" #include "core/or/congestion_control_flow.h" #include "core/or/circuitlist.h" #include "core/or/command.h" @@ -631,6 +632,7 @@ tor_init(int argc, char *argv[]) * until we get a consensus */ channelpadding_new_consensus_params(NULL); circpad_new_consensus_params(NULL); + congestion_control_new_consensus_params(NULL); flow_control_new_consensus_params(NULL); /* Initialize circuit padding to defaults+torrc until we get a consensus */ diff --git a/src/core/or/channeltls.c b/src/core/or/channeltls.c index 481dafef91..9db8e2392d 100644 --- a/src/core/or/channeltls.c +++ b/src/core/or/channeltls.c @@ -64,6 +64,7 @@ #include "trunnel/netinfo.h" #include "core/or/channelpadding.h" #include "core/or/extendinfo.h" +#include "core/or/congestion_control_common.h" #include "core/or/cell_st.h" #include "core/or/cell_queue_st.h" @@ -793,7 +794,7 @@ channel_tls_num_cells_writeable_method(channel_t *chan) cell_network_size = get_cell_network_size(tlschan->conn->wide_circ_ids); outbuf_len = connection_get_outbuf_len(TO_CONN(tlschan->conn)); /* Get the number of cells */ - n = CEIL_DIV(OR_CONN_HIGHWATER - outbuf_len, cell_network_size); + n = CEIL_DIV(or_conn_highwatermark() - outbuf_len, cell_network_size); if (n < 0) n = 0; #if SIZEOF_SIZE_T > SIZEOF_INT if (n > INT_MAX) n = INT_MAX; diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c index 51691c106d..393b79459d 100644 --- a/src/core/or/congestion_control_common.c +++ b/src/core/or/congestion_control_common.c @@ -46,6 +46,9 @@ #define BWE_SENDME_MIN_DFLT 5 +#define OR_CONN_HIGHWATER_DFLT (32*1024) +#define OR_CONN_LOWWATER_DFLT (16*1024) + static uint64_t congestion_control_update_circuit_rtt(congestion_control_t *, uint64_t); static bool congestion_control_update_circuit_bdp(congestion_control_t *, @@ -53,6 +56,34 @@ static bool congestion_control_update_circuit_bdp(congestion_control_t *, const crypt_path_t *, uint64_t, uint64_t); +static uint32_t cwnd_max = CWND_MAX_DFLT; +uint32_t or_conn_highwater = OR_CONN_HIGHWATER_DFLT; +uint32_t or_conn_lowwater = OR_CONN_LOWWATER_DFLT; + +/** + * Update global congestion control related consensus parameter values, + * every consensus update. 
+ */ +void +congestion_control_new_consensus_params(const networkstatus_t *ns) +{ +#define OR_CONN_HIGHWATER_MIN (CELL_PAYLOAD_SIZE) +#define OR_CONN_HIGHWATER_MAX (INT32_MAX) + or_conn_highwater = + networkstatus_get_param(ns, "orconn_high", + OR_CONN_HIGHWATER_DFLT, + OR_CONN_HIGHWATER_MIN, + OR_CONN_HIGHWATER_MAX); + +#define OR_CONN_LOWWATER_MIN (CELL_PAYLOAD_SIZE) +#define OR_CONN_LOWWATER_MAX (INT32_MAX) + or_conn_lowwater = + networkstatus_get_param(ns, "orconn_low", + OR_CONN_LOWWATER_DFLT, + OR_CONN_LOWWATER_MIN, + OR_CONN_LOWWATER_MAX); +} + /** * Set congestion control parameters on a circuit's congestion * control object based on values from the consensus. diff --git a/src/core/or/congestion_control_common.h b/src/core/or/congestion_control_common.h index e8b9681ac6..02fee78013 100644 --- a/src/core/or/congestion_control_common.h +++ b/src/core/or/congestion_control_common.h @@ -41,6 +41,27 @@ int sendme_get_inc_count(const circuit_t *, const crypt_path_t *); bool circuit_sent_cell_for_sendme(const circuit_t *, const crypt_path_t *); bool is_monotime_clock_reliable(void); +void congestion_control_new_consensus_params(const networkstatus_t *ns); + +/* Ugh, C.. these two are private. Use the getter instead, when + * external to the congestion control code. */ +extern uint32_t or_conn_highwater; +extern uint32_t or_conn_lowwater; + +/** Stop writing on an orconn when its outbuf is this large */ +static inline uint32_t +or_conn_highwatermark(void) +{ + return or_conn_highwater; +} + +/** Resume writing on an orconn when its outbuf is less than this */ +static inline uint32_t +or_conn_lowwatermark(void) +{ + return or_conn_lowwater; +} + /** * Compute an N-count EWMA, aka N-EWMA. N-EWMA is defined as: * EWMA = alpha*value + (1-alpha)*EWMA_prev diff --git a/src/core/or/connection_or.c b/src/core/or/connection_or.c index dd31638eb3..db9f93e6f6 100644 --- a/src/core/or/connection_or.c +++ b/src/core/or/connection_or.c @@ -65,6 +65,7 @@ #include "core/or/scheduler.h" #include "feature/nodelist/torcert.h" #include "core/or/channelpadding.h" +#include "core/or/congestion_control_common.h" #include "feature/dirauth/authmode.h" #include "feature/hs/hs_service.h" @@ -636,7 +637,7 @@ connection_or_flushed_some(or_connection_t *conn) /* If we're under the low water mark, add cells until we're just over the * high water mark. */ datalen = connection_get_outbuf_len(TO_CONN(conn)); - if (datalen < OR_CONN_LOWWATER) { + if (datalen < or_conn_lowwatermark()) { /* Let the scheduler know */ scheduler_channel_wants_writes(TLS_CHAN_TO_BASE(conn->chan)); } @@ -660,9 +661,9 @@ connection_or_num_cells_writeable(or_connection_t *conn) * used to trigger when to start writing after we've stopped. */ datalen = connection_get_outbuf_len(TO_CONN(conn)); - if (datalen < OR_CONN_HIGHWATER) { + if (datalen < or_conn_highwatermark()) { cell_network_size = get_cell_network_size(conn->wide_circ_ids); - n = CEIL_DIV(OR_CONN_HIGHWATER - datalen, cell_network_size); + n = CEIL_DIV(or_conn_highwatermark() - datalen, cell_network_size); } return n; diff --git a/src/core/or/or.h b/src/core/or/or.h index ad82130301..392a848ee7 100644 --- a/src/core/or/or.h +++ b/src/core/or/or.h @@ -594,18 +594,6 @@ typedef struct or_handshake_state_t or_handshake_state_t; /** Length of Extended ORPort connection identifier. */ #define EXT_OR_CONN_ID_LEN DIGEST_LEN /* 20 */ -/* - * OR_CONN_HIGHWATER and OR_CONN_LOWWATER moved from connection_or.c so - * channeltls.c can see them too. 
- */ - -/** When adding cells to an OR connection's outbuf, keep adding until the - * outbuf is at least this long, or we run out of cells. */ -#define OR_CONN_HIGHWATER (32*1024) - -/** Add cells to an OR connection's outbuf whenever the outbuf's data length - * drops below this size. */ -#define OR_CONN_LOWWATER (16*1024) typedef struct connection_t connection_t; typedef struct control_connection_t control_connection_t; diff --git a/src/feature/nodelist/networkstatus.c b/src/feature/nodelist/networkstatus.c index 0138dff033..6867d8c98e 100644 --- a/src/feature/nodelist/networkstatus.c +++ b/src/feature/nodelist/networkstatus.c @@ -45,6 +45,7 @@ #include "core/or/channel.h" #include "core/or/channelpadding.h" #include "core/or/circuitpadding.h" +#include "core/or/congestion_control_common.h" #include "core/or/congestion_control_flow.h" #include "core/or/circuitmux.h" #include "core/or/circuitmux_ewma.h" @@ -1700,6 +1701,7 @@ notify_after_networkstatus_changes(void) channelpadding_new_consensus_params(c); circpad_new_consensus_params(c); router_new_consensus_params(c); + congestion_control_new_consensus_params(c); flow_control_new_consensus_params(c); /* Maintenance of our L2 guard list */ diff --git a/src/test/test_channeltls.c b/src/test/test_channeltls.c index 5219c86097..ca7fee2c53 100644 --- a/src/test/test_channeltls.c +++ b/src/test/test_channeltls.c @@ -20,6 +20,7 @@ #include "lib/tls/tortls.h" #include "core/or/or_connection_st.h" +#include "core/or/congestion_control_common.h" /* Test suite stuff */ #include "test/test.h" @@ -155,7 +156,7 @@ test_channeltls_num_bytes_queued(void *arg) * - 2 cells. */ n = ch->num_cells_writeable(ch); - tt_int_op(n, OP_EQ, CEIL_DIV(OR_CONN_HIGHWATER, 512) - 2); + tt_int_op(n, OP_EQ, CEIL_DIV(or_conn_highwatermark(), 512) - 2); UNMOCK(buf_datalen); tlschan_buf_datalen_mock_target = NULL; tlschan_buf_datalen_mock_size = 0; From e9038dc5f2fe6a62af1a32e1e4c51d8b894998e1 Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Tue, 28 Sep 2021 15:17:34 +0000 Subject: [PATCH 15/18] Add a max cwnd consensus parameter and clamp. 
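
The window cap is controlled by a new "cc_cwnd_max" consensus parameter
(default INT32_MAX, minimum 500). After any of the congestion control
algorithms processes a SENDME, the resulting congestion window is clamped
to it, roughly (simplified; the exact version in the hunk below also logs
a rate-limited notice before clamping):

  if (cc->cwnd > cwnd_max) {
    cc->cwnd = cwnd_max;
  }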
--- src/core/or/congestion_control_common.c | 29 +++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c index 393b79459d..4449d9bfd5 100644 --- a/src/core/or/congestion_control_common.c +++ b/src/core/or/congestion_control_common.c @@ -33,6 +33,7 @@ #define SENDME_INC_DFLT (50) #define CWND_MIN_DFLT (MAX(100, SENDME_INC_DFLT)) +#define CWND_MAX_DFLT (INT32_MAX) #define CWND_INC_DFLT (50) @@ -82,6 +83,14 @@ congestion_control_new_consensus_params(const networkstatus_t *ns) OR_CONN_LOWWATER_DFLT, OR_CONN_LOWWATER_MIN, OR_CONN_LOWWATER_MAX); + +#define CWND_MAX_MIN 500 +#define CWND_MAX_MAX (INT32_MAX) + cwnd_max = + networkstatus_get_param(NULL, "cc_cwnd_max", + CWND_MAX_DFLT, + CWND_MAX_MIN, + CWND_MAX_MAX); } /** @@ -963,20 +972,32 @@ congestion_control_dispatch_cc_alg(congestion_control_t *cc, const circuit_t *circ, const crypt_path_t *layer_hint) { + int ret = -END_CIRC_REASON_INTERNAL; switch (cc->cc_alg) { case CC_ALG_WESTWOOD: - return congestion_control_westwood_process_sendme(cc, circ, layer_hint); + ret = congestion_control_westwood_process_sendme(cc, circ, layer_hint); + break; case CC_ALG_VEGAS: - return congestion_control_vegas_process_sendme(cc, circ, layer_hint); + ret = congestion_control_vegas_process_sendme(cc, circ, layer_hint); + break; case CC_ALG_NOLA: - return congestion_control_nola_process_sendme(cc, circ, layer_hint); + ret = congestion_control_nola_process_sendme(cc, circ, layer_hint); + break; case CC_ALG_SENDME: default: tor_assert(0); } - return -END_CIRC_REASON_INTERNAL; + if (cc->cwnd > cwnd_max) { + static ratelim_t cwnd_limit = RATELIM_INIT(60); + log_fn_ratelim(&cwnd_limit, LOG_NOTICE, LD_CIRC, + "Congestion control cwnd %"PRIu64" exceeds max %d, clamping.", + cc->cwnd, cwnd_max); + cc->cwnd = cwnd_max; + } + + return ret; } From 322f213210d64de3db4ff0486e391378062e5a61 Mon Sep 17 00:00:00 2001 From: Mike Perry Date: Wed, 29 Sep 2021 02:12:56 +0000 Subject: [PATCH 16/18] Turn cell queue watermark limits into consensus params. This allows us to have quicker reaction to blocked orconns as a congestion signal. --- src/core/or/congestion_control_common.c | 20 ++++++++++++++++++++ src/core/or/congestion_control_common.h | 20 +++++++++++++++++++- src/core/or/relay.c | 15 ++++----------- 3 files changed, 43 insertions(+), 12 deletions(-) diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c index 4449d9bfd5..be25258249 100644 --- a/src/core/or/congestion_control_common.c +++ b/src/core/or/congestion_control_common.c @@ -50,6 +50,9 @@ #define OR_CONN_HIGHWATER_DFLT (32*1024) #define OR_CONN_LOWWATER_DFLT (16*1024) +#define CELL_QUEUE_LOW_DFLT (10) +#define CELL_QUEUE_HIGH_DFLT (256) + static uint64_t congestion_control_update_circuit_rtt(congestion_control_t *, uint64_t); static bool congestion_control_update_circuit_bdp(congestion_control_t *, @@ -61,6 +64,9 @@ static uint32_t cwnd_max = CWND_MAX_DFLT; uint32_t or_conn_highwater = OR_CONN_HIGHWATER_DFLT; uint32_t or_conn_lowwater = OR_CONN_LOWWATER_DFLT; +int32_t cell_queue_high = CELL_QUEUE_HIGH_DFLT; +int32_t cell_queue_low = CELL_QUEUE_LOW_DFLT; + /** * Update global congestion control related consensus parameter values, * every consensus update. 
@@ -68,6 +74,20 @@ uint32_t or_conn_lowwater = OR_CONN_LOWWATER_DFLT; void congestion_control_new_consensus_params(const networkstatus_t *ns) { +#define CELL_QUEUE_HIGH_MIN (1) +#define CELL_QUEUE_HIGH_MAX (1000) + cell_queue_high = networkstatus_get_param(ns, "cellq_high", + CELL_QUEUE_HIGH_DFLT, + CELL_QUEUE_HIGH_MIN, + CELL_QUEUE_HIGH_MAX); + +#define CELL_QUEUE_LOW_MIN (1) +#define CELL_QUEUE_LOW_MAX (1000) + cell_queue_low = networkstatus_get_param(ns, "cellq_low", + CELL_QUEUE_LOW_DFLT, + CELL_QUEUE_LOW_MIN, + CELL_QUEUE_LOW_MAX); + #define OR_CONN_HIGHWATER_MIN (CELL_PAYLOAD_SIZE) #define OR_CONN_HIGHWATER_MAX (INT32_MAX) or_conn_highwater = diff --git a/src/core/or/congestion_control_common.h b/src/core/or/congestion_control_common.h index 02fee78013..01dbc1ceb4 100644 --- a/src/core/or/congestion_control_common.h +++ b/src/core/or/congestion_control_common.h @@ -43,10 +43,12 @@ bool is_monotime_clock_reliable(void); void congestion_control_new_consensus_params(const networkstatus_t *ns); -/* Ugh, C.. these two are private. Use the getter instead, when +/* Ugh, C.. these four are private. Use the getter instead, when * external to the congestion control code. */ extern uint32_t or_conn_highwater; extern uint32_t or_conn_lowwater; +extern int32_t cell_queue_high; +extern int32_t cell_queue_low; /** Stop writing on an orconn when its outbuf is this large */ static inline uint32_t @@ -62,6 +64,22 @@ or_conn_lowwatermark(void) return or_conn_lowwater; } +/** Stop reading on edge connections when we have this many cells + * waiting on the appropriate queue. */ +static inline int32_t +cell_queue_highwatermark(void) +{ + return cell_queue_high; +} + +/** Start reading from edge connections again when we get down to this many + * cells. */ +static inline int32_t +cell_queue_lowwatermark(void) +{ + return cell_queue_low; +} + /** * Compute an N-count EWMA, aka N-EWMA. N-EWMA is defined as: * EWMA = alpha*value + (1-alpha)*EWMA_prev diff --git a/src/core/or/relay.c b/src/core/or/relay.c index 0e889eb348..a2123f991c 100644 --- a/src/core/or/relay.c +++ b/src/core/or/relay.c @@ -117,13 +117,6 @@ static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ, node_t *node, const tor_addr_t *addr); -/** Stop reading on edge connections when we have this many cells - * waiting on the appropriate queue. */ -#define CELL_QUEUE_HIGHWATER_SIZE 256 -/** Start reading from edge connections again when we get down to this many - * cells. */ -#define CELL_QUEUE_LOWWATER_SIZE 64 - /** Stats: how many relay cells have originated at this hop, or have * been relayed onward (not recognized at this hop)? */ @@ -2400,8 +2393,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); cells_on_queue = or_circ->p_chan_cells.n; } - if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package) - max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue; + if (cell_queue_highwatermark() - cells_on_queue < max_to_package) + max_to_package = cell_queue_highwatermark() - cells_on_queue; /* Once we used to start listening on the streams in the order they * appeared in the linked list. That leads to starvation on the @@ -3121,7 +3114,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max)) /* Is the cell queue low enough to unblock all the streams that are waiting * to write to this circuit? 
*/ - if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE) + if (streams_blocked && queue->n <= cell_queue_lowwatermark()) set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */ /* If n_flushed < max still, loop around and pick another circuit */ @@ -3239,7 +3232,7 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, /* If we have too many cells on the circuit, we should stop reading from * the edge streams for a while. */ - if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE) + if (!streams_blocked && queue->n >= cell_queue_highwatermark()) set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */ if (streams_blocked && fromstream) { From bfd69655afb2700bc9bf56333fdfe20b0a9b974b Mon Sep 17 00:00:00 2001 From: David Goulet Date: Mon, 4 Oct 2021 10:32:59 -0400 Subject: [PATCH 17/18] cc: Add comments and clean up some syntax Signed-off-by: David Goulet --- src/core/or/congestion_control_common.c | 45 ++++++++++++++++--------- src/core/or/congestion_control_flow.c | 41 ++++++++++++++-------- 2 files changed, 57 insertions(+), 29 deletions(-) diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c index be25258249..0919f037db 100644 --- a/src/core/or/congestion_control_common.c +++ b/src/core/or/congestion_control_common.c @@ -26,30 +26,45 @@ #include "lib/time/compat_time.h" #include "feature/nodelist/networkstatus.h" -/* Consensus parameter defaults */ +/* Consensus parameter defaults. + * + * More details for each of the parameters can be found in proposal 324, + * section 6.5 including tuning notes. */ #define CIRCWINDOW_INIT (500) - -#define CWND_INC_PCT_SS_DFLT (100) - -#define SENDME_INC_DFLT (50) -#define CWND_MIN_DFLT (MAX(100, SENDME_INC_DFLT)) -#define CWND_MAX_DFLT (INT32_MAX) +#define SENDME_INC_DFLT (50) #define CWND_INC_DFLT (50) - +#define CWND_INC_PCT_SS_DFLT (100) #define CWND_INC_RATE_DFLT (1) +#define CWND_MAX_DFLT (INT32_MAX) +#define CWND_MIN_DFLT (MAX(100, SENDME_INC_DFLT)) +#define BWE_SENDME_MIN_DFLT (5) +#define EWMA_CWND_COUNT_DFLT (2) + +/* BDP algorithms for each congestion control algorithms use the piecewise + * estimattor. See section 3.1.4 of proposal 324. */ #define WESTWOOD_BDP_ALG BDP_ALG_PIECEWISE #define VEGAS_BDP_MIX_ALG BDP_ALG_PIECEWISE #define NOLA_BDP_ALG BDP_ALG_PIECEWISE -#define EWMA_CWND_COUNT_DFLT 2 - -#define BWE_SENDME_MIN_DFLT 5 - +/* Indicate OR connection buffer limitations used to stop or start accepting + * cells in its outbuf. + * + * These watermarks are historical to tor in a sense that they've been used + * almost from the genesis point. And were likely defined to fit the bounds of + * TLS records of 16KB which would be around 32 cells. + * + * These are defaults of the consensus parameter "orconn_high" and "orconn_low" + * values. */ #define OR_CONN_HIGHWATER_DFLT (32*1024) #define OR_CONN_LOWWATER_DFLT (16*1024) +/* Low and high values of circuit cell queue sizes. They are used to tell when + * to start or stop reading on the streams attached on the circuit. + * + * These are defaults of the consensus parameters "cellq_high" and "cellq_low". + */ #define CELL_QUEUE_LOW_DFLT (10) #define CELL_QUEUE_HIGH_DFLT (256) @@ -60,12 +75,12 @@ static bool congestion_control_update_circuit_bdp(congestion_control_t *, const crypt_path_t *, uint64_t, uint64_t); +/* Consensus parameters cached. The non static ones are extern. 
*/ static uint32_t cwnd_max = CWND_MAX_DFLT; -uint32_t or_conn_highwater = OR_CONN_HIGHWATER_DFLT; -uint32_t or_conn_lowwater = OR_CONN_LOWWATER_DFLT; - int32_t cell_queue_high = CELL_QUEUE_HIGH_DFLT; int32_t cell_queue_low = CELL_QUEUE_LOW_DFLT; +uint32_t or_conn_highwater = OR_CONN_HIGHWATER_DFLT; +uint32_t or_conn_lowwater = OR_CONN_LOWWATER_DFLT; /** * Update global congestion control related consensus parameter values, diff --git a/src/core/or/congestion_control_flow.c b/src/core/or/congestion_control_flow.c index 6742bb38bb..0c2baa96e2 100644 --- a/src/core/or/congestion_control_flow.c +++ b/src/core/or/congestion_control_flow.c @@ -32,28 +32,34 @@ static uint32_t xoff_client; static uint32_t xoff_exit; static uint32_t xon_change_pct; -static uint32_t xon_rate_bytes; static uint32_t xon_ewma_cnt; +static uint32_t xon_rate_bytes; -/** In normal operation, we can get a burst of up to 32 cells before - * returning to libevent to flush the outbuf. This is a heuristic from - * hardcoded values and strange logic in connection_bucket_get_share(). */ +/* In normal operation, we can get a burst of up to 32 cells before returning + * to libevent to flush the outbuf. This is a heuristic from hardcoded values + * and strange logic in connection_bucket_get_share(). */ #define MAX_EXPECTED_CELL_BURST 32 -/** - * The following three are for dropmark rate limiting. They define when - * we scale down our XON, XOFF, and xmit byte counts. Early scaling - * is beneficial because it limits the ability of spurious XON/XOFF - * to be sent after large amounts of data without XON/XOFF. At these - * limits, after 10MB of data (or more), an adversary can only inject - * (log2(10MB)-log2(200*500))*100 ~= 1000 cells of fake XOFF/XON before - * the xmit byte * count will be halved enough to triggering a limit. */ +/* The following three are for dropmark rate limiting. They define when we + * scale down our XON, XOFF, and xmit byte counts. Early scaling is beneficial + * because it limits the ability of spurious XON/XOFF to be sent after large + * amounts of data without XON/XOFF. At these limits, after 10MB of data (or + * more), an adversary can only inject (log2(10MB)-log2(200*500))*100 ~= 1000 + * cells of fake XOFF/XON before the xmit byte count will be halved enough to + * triggering a limit. */ #define XON_COUNT_SCALE_AT 200 #define XOFF_COUNT_SCALE_AT 200 #define ONE_MEGABYTE (UINT64_C(1) << 20) -#define TOTAL_XMIT_SCALE_AT 10*ONE_MEGABYTE +#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE) -static const congestion_control_t * +/** + * Return the congestion control object of the given edge connection. + * + * Returns NULL if the edge connection doesn't have a cpath_layer or not + * attached to a circuit. But also if the cpath_layer or circuit doesn't have a + * congestion control object. + */ +static inline const congestion_control_t * edge_get_ccontrol(const edge_connection_t *edge) { if (edge->cpath_layer) @@ -64,6 +70,13 @@ edge_get_ccontrol(const edge_connection_t *edge) return NULL; } +/** + * Update global congestion control related consensus parameter values, every + * consensus update. + * + * More details for each of the parameters can be found in proposal 324, + * section 6.5 including tuning notes. 
+ */ void flow_control_new_consensus_params(const networkstatus_t *ns) { From 7005046bd27968069dd56cad3fc66d644c7f517b Mon Sep 17 00:00:00 2001 From: David Goulet Date: Mon, 4 Oct 2021 10:40:18 -0400 Subject: [PATCH 18/18] changes: Add file for ticket 40450 (prop324) Closes #40450 Signed-off-by: David Goulet --- changes/ticket40450 | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changes/ticket40450 diff --git a/changes/ticket40450 b/changes/ticket40450 new file mode 100644 index 0000000000..6753bd04f5 --- /dev/null +++ b/changes/ticket40450 @@ -0,0 +1,3 @@ + o Major features (congestion control): + - Implement support for flow control over congestion controlled circuits. + This work comes from proposal 324. Closes ticket 40450.