From 04409f202d2f6c9ceb3619aa8f19adb013336e5f Mon Sep 17 00:00:00 2001 From: Roger Dingledine Date: Wed, 13 Dec 2006 07:08:36 +0000 Subject: [PATCH] reenable write limiting. nick finally convinced me this was a smart move. more todo sub-items remain for it. svn:r9101 --- doc/TODO | 9 ++- src/or/connection.c | 151 ++++++++++++++++++++++++----------------- src/or/connection_or.c | 2 +- src/or/main.c | 2 +- src/or/or.h | 10 +-- 5 files changed, 103 insertions(+), 71 deletions(-) diff --git a/doc/TODO b/doc/TODO index ea40f01009..96f452f975 100644 --- a/doc/TODO +++ b/doc/TODO @@ -216,9 +216,12 @@ Deferred from 0.1.2.x: - Improvements to bandwidth counting R - look into "uncounting" bytes spent on local connections, so we can bandwidthrate but still have fast downloads. -R - "bandwidth classes", for incoming vs initiated-here conns. -d - Write limiting; separate token bucket for write - - Write-limit directory responses (need to research) +R - "bandwidth classes", for incoming vs initiated-here conns, + and to give dir conns lower priority. + . Write limiting; separate token bucket for write + - preemptively give a 503 to some dir requests + - per-conn write buckets + - separate config options for read vs write limiting - Directory guards - RAM use in directory authorities. - Memory use improvements: diff --git a/src/or/connection.c b/src/or/connection.c index ea7bc71706..7f3f34e8da 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -19,7 +19,7 @@ static connection_t *connection_create_listener(const char *listenaddress, static int connection_init_accepted_conn(connection_t *conn, uint8_t listener_type); static int connection_handle_listener_read(connection_t *conn, int new_type); -static int connection_receiver_bucket_should_increase(or_connection_t *conn); +static int connection_read_bucket_should_increase(or_connection_t *conn); static int connection_finished_flushing(connection_t *conn); static int connection_flushed_some(connection_t *conn); static int connection_finished_connecting(connection_t *conn); @@ -1103,59 +1103,55 @@ retry_all_listeners(int force, smartlist_t *replaced_conns, extern int global_read_bucket, global_write_bucket; -/** How many bytes at most can we read onto this connection? */ static int -connection_bucket_read_limit(connection_t *conn) +connection_bucket_round_robin(int base, int global_bucket, int conn_bucket) { int at_most; - int base = connection_speaks_cells(conn) ? - CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE; /* Do a rudimentary round-robin so one circuit can't hog a connection. * Pick at most 32 cells, at least 4 cells if possible, and if we're in * the middle pick 1/8 of the available bandwidth. */ - at_most = global_read_bucket / 8; + at_most = global_bucket / 8; at_most -= (at_most % base); /* round down */ if (at_most > 32*base) /* 16 KB */ at_most = 32*base; else if (at_most < 4*base) /* 2 KB */ at_most = 4*base; - if (at_most > global_read_bucket) - at_most = global_read_bucket; + if (at_most > global_bucket) + at_most = global_bucket; - if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { - or_connection_t *or_conn = TO_OR_CONN(conn); - if (at_most > or_conn->receiver_bucket) - at_most = or_conn->receiver_bucket; - } + if (conn_bucket >= 0 && at_most > conn_bucket) + at_most = conn_bucket; if (at_most < 0) return 0; return at_most; } +/** How many bytes at most can we read onto this connection? */ +static int +connection_bucket_read_limit(connection_t *conn) +{ + int base = connection_speaks_cells(conn) ? + CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE; + int conn_bucket = -1; + if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { + or_connection_t *or_conn = TO_OR_CONN(conn); + conn_bucket = or_conn->read_bucket; + } + return connection_bucket_round_robin(base, global_read_bucket, conn_bucket); +} + /** How many bytes at most can we write onto this connection? */ int connection_bucket_write_limit(connection_t *conn) { - unsigned int at_most; + int base = connection_speaks_cells(conn) ? + CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE; - /* do a rudimentary round-robin so one circuit can't hog a connection */ - if (connection_speaks_cells(conn)) { - at_most = 32*(CELL_NETWORK_SIZE); - } else { - at_most = 32*(RELAY_PAYLOAD_SIZE); - } - - if (at_most > conn->outbuf_flushlen) - at_most = conn->outbuf_flushlen; - -#if 0 /* don't enable til we actually do write limiting */ - if (at_most > global_write_bucket) - at_most = global_write_bucket; -#endif - return at_most; + return connection_bucket_round_robin(base, global_write_bucket, + conn->outbuf_flushlen); } /** Return 1 if the global write bucket has no bytes in it, @@ -1172,31 +1168,56 @@ connection_read_bucket_decrement(connection_t *conn, int num_read) { global_read_bucket -= num_read; if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) { - TO_OR_CONN(conn)->receiver_bucket -= num_read; + TO_OR_CONN(conn)->read_bucket -= num_read; } } -/** If we have exhausted our global read bucket, or the read bucket for conn, +/** If we have exhausted our global buckets, or the buckets for conn, * stop reading. */ static void -connection_consider_empty_buckets(connection_t *conn) +connection_consider_empty_read_buckets(connection_t *conn) { if (global_read_bucket <= 0) { - LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,"global bucket exhausted. Pausing.")); + LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, + "global read bucket exhausted. Pausing.")); conn->wants_to_read = 1; connection_stop_reading(conn); return; } if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN && - TO_OR_CONN(conn)->receiver_bucket <= 0) { + TO_OR_CONN(conn)->read_bucket <= 0) { LOG_FN_CONN(conn, - (LOG_DEBUG,LD_NET,"receiver bucket exhausted. Pausing.")); + (LOG_DEBUG,LD_NET,"read bucket exhausted. Pausing.")); conn->wants_to_read = 1; connection_stop_reading(conn); } } +/** If we have exhausted our global buckets, or the buckets for conn, + * stop writing. */ +static void +connection_consider_empty_write_buckets(connection_t *conn) +{ + if (global_write_bucket <= 0) { + LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, + "global write bucket exhausted. Pausing.")); + conn->wants_to_write = 1; + connection_stop_writing(conn); + return; + } +#if 0 + if (connection_speaks_cells(conn) && + conn->state == OR_CONN_STATE_OPEN && + TO_OR_CONN(conn)->write_bucket <= 0) { + LOG_FN_CONN(conn, + (LOG_DEBUG,LD_NET,"write bucket exhausted. Pausing.")); + conn->wants_to_write = 1; + connection_stop_writing(conn); + } +#endif +} + /** Initialize the global read bucket to options->BandwidthBurst. */ void connection_bucket_init(void) @@ -1209,24 +1230,25 @@ connection_bucket_init(void) /** A second has rolled over; increment buckets appropriately. */ void -connection_bucket_refill(struct timeval *now) +connection_bucket_refill(int seconds_elapsed) { int i, n; connection_t *conn; connection_t **carray; or_options_t *options = get_options(); - /* Not used, but it should be! We might have rolled over more than one - * second! XXXX */ - (void) now; /* refill the global buckets */ if (global_read_bucket < (int)options->BandwidthBurst) { - global_read_bucket += (int)options->BandwidthRate; - log_debug(LD_NET,"global_read_bucket now %d.", global_read_bucket); + global_read_bucket += (int)options->BandwidthRate*seconds_elapsed; + if (global_read_bucket > (int)options->BandwidthBurst) + global_read_bucket = (int)options->BandwidthBurst; + log(LOG_DEBUG, LD_NET,"global_read_bucket now %d.", global_read_bucket); } if (global_write_bucket < (int)options->BandwidthBurst) { - global_write_bucket += (int)options->BandwidthRate; - log_debug(LD_NET,"global_write_bucket now %d.", global_write_bucket); + global_write_bucket += (int)options->BandwidthRate*seconds_elapsed; + if (global_write_bucket > (int)options->BandwidthBurst) + global_write_bucket = (int)options->BandwidthBurst; + log(LOG_DEBUG, LD_NET,"global_write_bucket now %d.", global_write_bucket); } /* refill the per-connection buckets */ @@ -1236,30 +1258,32 @@ connection_bucket_refill(struct timeval *now) if (connection_speaks_cells(conn)) { or_connection_t *or_conn = TO_OR_CONN(conn); - if (connection_receiver_bucket_should_increase(or_conn)) { - or_conn->receiver_bucket += or_conn->bandwidthrate; - if (or_conn->receiver_bucket > or_conn->bandwidthburst) - or_conn->receiver_bucket = or_conn->bandwidthburst; + if (connection_read_bucket_should_increase(or_conn)) { + or_conn->read_bucket += or_conn->bandwidthrate*seconds_elapsed; + if (or_conn->read_bucket > or_conn->bandwidthburst) + or_conn->read_bucket = or_conn->bandwidthburst; //log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, - // conn->receiver_bucket); + // conn->read_bucket); } } if (conn->wants_to_read == 1 /* it's marked to turn reading back on now */ && global_read_bucket > 0 /* and we're allowed to read */ - && global_write_bucket > 0 /* and we're allowed to write (XXXX, - * not the best place to check this.) */ && (!connection_speaks_cells(conn) || conn->state != OR_CONN_STATE_OPEN || - TO_OR_CONN(conn)->receiver_bucket > 0)) { - /* and either a non-cell conn or a cell conn with non-empty bucket */ - LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,"waking up conn (fd %d)",conn->s)); + TO_OR_CONN(conn)->read_bucket > 0)) { + /* and either a non-cell conn or a cell conn with non-empty bucket */ + LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, + "waking up conn (fd %d) for read",conn->s)); conn->wants_to_read = 0; connection_start_reading(conn); - if (conn->wants_to_write == 1) { - conn->wants_to_write = 0; - connection_start_writing(conn); - } + } + if (conn->wants_to_write == 1 && + global_write_bucket > 0) { /* and we're allowed to write */ + LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET, + "waking up conn (fd %d) for write",conn->s)); + conn->wants_to_write = 0; + connection_start_writing(conn); } } } @@ -1268,13 +1292,13 @@ connection_bucket_refill(struct timeval *now) * should add another pile of tokens to it? */ static int -connection_receiver_bucket_should_increase(or_connection_t *conn) +connection_read_bucket_should_increase(or_connection_t *conn) { tor_assert(conn); if (conn->_base.state != OR_CONN_STATE_OPEN) return 0; /* only open connections play the rate limiting game */ - if (conn->receiver_bucket >= conn->bandwidthburst) + if (conn->read_bucket >= conn->bandwidthburst) return 0; return 1; @@ -1470,7 +1494,7 @@ connection_read_to_buf(connection_t *conn, int *max_to_read) /* Call even if result is 0, since the global read bucket may * have reached 0 on a different conn, and this guy needs to * know to stop reading. */ - connection_consider_empty_buckets(conn); + connection_consider_empty_read_buckets(conn); return 0; } @@ -1644,8 +1668,13 @@ connection_handle_write(connection_t *conn) /* already marked */ return -1; } + return 0; } + /* Call even if result is 0, since the global write bucket may + * have reached 0 on a different conn, and this guy needs to + * know to stop writing. */ + connection_consider_empty_write_buckets(conn); return 0; } @@ -2267,7 +2296,7 @@ assert_connection_ok(connection_t *conn, time_t now) * gave a bad cert/etc, then we won't have assigned bandwidth, * yet it will be open. -RD */ -// tor_assert(conn->receiver_bucket >= 0); +// tor_assert(conn->read_bucket >= 0); } // tor_assert(conn->addr && conn->port); tor_assert(conn->address); diff --git a/src/or/connection_or.c b/src/or/connection_or.c index 7f6f2be958..6518be5753 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -315,7 +315,7 @@ connection_or_init_conn_from_address(or_connection_t *conn, or_options_t *options = get_options(); routerinfo_t *r = router_get_by_digest(id_digest); conn->bandwidthrate = (int)options->BandwidthRate; - conn->receiver_bucket = conn->bandwidthburst = (int)options->BandwidthBurst; + conn->read_bucket = conn->bandwidthburst = (int)options->BandwidthBurst; connection_or_set_identity_digest(conn, id_digest); conn->_base.addr = addr; conn->_base.port = port; diff --git a/src/or/main.c b/src/or/main.c index 163cf188da..d134bb3928 100644 --- a/src/or/main.c +++ b/src/or/main.c @@ -994,7 +994,7 @@ second_elapsed_callback(int fd, short event, void *args) accounting_add_bytes(bytes_read, bytes_written, seconds_elapsed); control_event_bandwidth_used((uint32_t)bytes_read,(uint32_t)bytes_written); - connection_bucket_refill(&now); + connection_bucket_refill(seconds_elapsed); stats_prev_global_read_bucket = global_read_bucket; stats_prev_global_write_bucket = global_write_bucket; diff --git a/src/or/or.h b/src/or/or.h index 2bcc9fdca2..78355b3071 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -723,12 +723,12 @@ typedef struct or_connection_t { time_t timestamp_lastempty; /**< When was the outbuf last completely empty?*/ - /* bandwidth* and receiver_bucket only used by ORs in OPEN state: */ + /* bandwidth* and read_bucket only used by ORs in OPEN state: */ int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */ int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */ - int receiver_bucket; /**< When this hits 0, stop receiving. Every second we - * add 'bandwidthrate' to this, capping it at - * bandwidthburst. (OPEN ORs only) */ + int read_bucket; /**< When this hits 0, stop receiving. Every second we + * add 'bandwidthrate' to this, capping it at + * bandwidthburst. (OPEN ORs only) */ circ_id_type_t circ_id_type; /**< When we send CREATE cells along this * connection, which half of the space should * we use? */ @@ -1968,7 +1968,7 @@ int retry_all_listeners(int force, smartlist_t *replaced_conns, int connection_bucket_write_limit(connection_t *conn); int global_write_bucket_empty(void); void connection_bucket_init(void); -void connection_bucket_refill(struct timeval *now); +void connection_bucket_refill(int seconds_elapsed); int connection_handle_read(connection_t *conn);