Refactor the directory spool implementation

The old implementation had duplicated code in a bunch of places, and
it interspersed spool-management with resource management.  The new
implementation should make it easier to add new resource types and
maintain the spooling code.

Closing ticket 21651.
This commit is contained in:
Nick Mathewson 2017-03-13 15:38:20 -04:00
parent 8587f663ee
commit 8aa69a1b69
10 changed files with 657 additions and 528 deletions

View File

@ -0,0 +1,7 @@
o Code simplification and refactoring:
- The logic that directory caches use to spool request to clients,
serving them one part at a time so as not to allocate too much memory,
has been refactored for consistency. Previously there was a separate
spooling implementation per type of spoolable data. Now there
is one common spooling implementation, with extensible data types.
Closes ticket 21651.

View File

@ -629,12 +629,12 @@ connection_free_(connection_t *conn)
tor_free(dir_conn->requested_resource); tor_free(dir_conn->requested_resource);
tor_zlib_free(dir_conn->zlib_state); tor_zlib_free(dir_conn->zlib_state);
if (dir_conn->fingerprint_stack) { if (dir_conn->spool) {
SMARTLIST_FOREACH(dir_conn->fingerprint_stack, char *, cp, tor_free(cp)); SMARTLIST_FOREACH(dir_conn->spool, spooled_resource_t *, spooled,
smartlist_free(dir_conn->fingerprint_stack); spooled_resource_free(spooled));
smartlist_free(dir_conn->spool);
} }
cached_dir_decref(dir_conn->cached_dir);
rend_data_free(dir_conn->rend_data); rend_data_free(dir_conn->rend_data);
if (dir_conn->guard_state) { if (dir_conn->guard_state) {
/* Cancel before freeing, if it's still there. */ /* Cancel before freeing, if it's still there. */

View File

@ -1368,7 +1368,7 @@ directory_initiate_command_rend(const tor_addr_port_t *or_addr_port,
/** Return true iff anything we say on <b>conn</b> is being encrypted before /** Return true iff anything we say on <b>conn</b> is being encrypted before
* we send it to the client/server. */ * we send it to the client/server. */
int int
connection_dir_is_encrypted(dir_connection_t *conn) connection_dir_is_encrypted(const dir_connection_t *conn)
{ {
/* Right now it's sufficient to see if conn is or has been linked, since /* Right now it's sufficient to see if conn is or has been linked, since
* the only thing it could be linked to is an edge connection on a * the only thing it could be linked to is an edge connection on a
@ -3061,10 +3061,10 @@ handle_get_current_consensus(dir_connection_t *conn,
const char *url = args->url; const char *url = args->url;
const int compressed = args->compressed; const int compressed = args->compressed;
const time_t if_modified_since = args->if_modified_since; const time_t if_modified_since = args->if_modified_since;
int clear_spool = 0;
{ {
/* v3 network status fetch. */ /* v3 network status fetch. */
smartlist_t *dir_fps = smartlist_new();
long lifetime = NETWORKSTATUS_CACHE_LIFETIME; long lifetime = NETWORKSTATUS_CACHE_LIFETIME;
networkstatus_t *v; networkstatus_t *v;
@ -3098,7 +3098,6 @@ handle_get_current_consensus(dir_connection_t *conn,
if (v && !networkstatus_consensus_reasonably_live(v, now)) { if (v && !networkstatus_consensus_reasonably_live(v, now)) {
write_http_status_line(conn, 404, "Consensus is too old"); write_http_status_line(conn, 404, "Consensus is too old");
warn_consensus_is_too_old(v, flavor, now); warn_consensus_is_too_old(v, flavor, now);
smartlist_free(dir_fps);
geoip_note_ns_response(GEOIP_REJECT_NOT_FOUND); geoip_note_ns_response(GEOIP_REJECT_NOT_FOUND);
tor_free(flavor); tor_free(flavor);
goto done; goto done;
@ -3108,51 +3107,54 @@ handle_get_current_consensus(dir_connection_t *conn,
!client_likes_consensus(v, want_fps)) { !client_likes_consensus(v, want_fps)) {
write_http_status_line(conn, 404, "Consensus not signed by sufficient " write_http_status_line(conn, 404, "Consensus not signed by sufficient "
"number of requested authorities"); "number of requested authorities");
smartlist_free(dir_fps);
geoip_note_ns_response(GEOIP_REJECT_NOT_ENOUGH_SIGS); geoip_note_ns_response(GEOIP_REJECT_NOT_ENOUGH_SIGS);
tor_free(flavor); tor_free(flavor);
goto done; goto done;
} }
conn->spool = smartlist_new();
clear_spool = 1;
{ {
char *fp = tor_malloc_zero(DIGEST_LEN); spooled_resource_t *spooled;
if (flavor) if (flavor)
strlcpy(fp, flavor, DIGEST_LEN); spooled = spooled_resource_new(DIR_SPOOL_NETWORKSTATUS,
(uint8_t*)flavor, strlen(flavor));
else
spooled = spooled_resource_new(DIR_SPOOL_NETWORKSTATUS,
NULL, 0);
tor_free(flavor); tor_free(flavor);
smartlist_add(dir_fps, fp); smartlist_add(conn->spool, spooled);
} }
lifetime = (v && v->fresh_until > now) ? v->fresh_until - now : 0; lifetime = (v && v->fresh_until > now) ? v->fresh_until - now : 0;
if (!smartlist_len(dir_fps)) { /* we failed to create/cache cp */ if (!smartlist_len(conn->spool)) { /* we failed to create/cache cp */
write_http_status_line(conn, 503, "Network status object unavailable"); write_http_status_line(conn, 503, "Network status object unavailable");
smartlist_free(dir_fps);
geoip_note_ns_response(GEOIP_REJECT_UNAVAILABLE); geoip_note_ns_response(GEOIP_REJECT_UNAVAILABLE);
goto done; goto done;
} }
if (!dirserv_remove_old_statuses(dir_fps, if_modified_since)) { size_t size_guess = 0;
int n_expired = 0;
dirserv_spool_remove_missing_and_guess_size(conn, if_modified_since,
compressed,
&size_guess,
&n_expired);
if (!smartlist_len(conn->spool) && !n_expired) {
write_http_status_line(conn, 404, "Not found"); write_http_status_line(conn, 404, "Not found");
SMARTLIST_FOREACH(dir_fps, char *, cp, tor_free(cp));
smartlist_free(dir_fps);
geoip_note_ns_response(GEOIP_REJECT_NOT_FOUND); geoip_note_ns_response(GEOIP_REJECT_NOT_FOUND);
goto done; goto done;
} else if (!smartlist_len(dir_fps)) { } else if (!smartlist_len(conn->spool)) {
write_http_status_line(conn, 304, "Not modified"); write_http_status_line(conn, 304, "Not modified");
SMARTLIST_FOREACH(dir_fps, char *, cp, tor_free(cp));
smartlist_free(dir_fps);
geoip_note_ns_response(GEOIP_REJECT_NOT_MODIFIED); geoip_note_ns_response(GEOIP_REJECT_NOT_MODIFIED);
goto done; goto done;
} }
size_t dlen = dirserv_estimate_data_size(dir_fps, 0, compressed); if (global_write_bucket_low(TO_CONN(conn), size_guess, 2)) {
if (global_write_bucket_low(TO_CONN(conn), dlen, 2)) {
log_debug(LD_DIRSERV, log_debug(LD_DIRSERV,
"Client asked for network status lists, but we've been " "Client asked for network status lists, but we've been "
"writing too many bytes lately. Sending 503 Dir busy."); "writing too many bytes lately. Sending 503 Dir busy.");
write_http_status_line(conn, 503, "Directory busy, try again later"); write_http_status_line(conn, 503, "Directory busy, try again later");
SMARTLIST_FOREACH(dir_fps, char *, fp, tor_free(fp));
smartlist_free(dir_fps);
geoip_note_ns_response(GEOIP_REJECT_BUSY); geoip_note_ns_response(GEOIP_REJECT_BUSY);
goto done; goto done;
} }
@ -3166,25 +3168,27 @@ handle_get_current_consensus(dir_connection_t *conn,
/* Note that a request for a network status has started, so that we /* Note that a request for a network status has started, so that we
* can measure the download time later on. */ * can measure the download time later on. */
if (conn->dirreq_id) if (conn->dirreq_id)
geoip_start_dirreq(conn->dirreq_id, dlen, DIRREQ_TUNNELED); geoip_start_dirreq(conn->dirreq_id, size_guess, DIRREQ_TUNNELED);
else else
geoip_start_dirreq(TO_CONN(conn)->global_identifier, dlen, geoip_start_dirreq(TO_CONN(conn)->global_identifier, size_guess,
DIRREQ_DIRECT); DIRREQ_DIRECT);
} }
clear_spool = 0;
write_http_response_header(conn, -1, compressed, write_http_response_header(conn, -1, compressed,
smartlist_len(dir_fps) == 1 ? lifetime : 0); smartlist_len(conn->spool) == 1 ? lifetime : 0);
conn->fingerprint_stack = dir_fps;
if (! compressed) if (! compressed)
conn->zlib_state = tor_zlib_new(0, ZLIB_METHOD, HIGH_COMPRESSION); conn->zlib_state = tor_zlib_new(0, ZLIB_METHOD, HIGH_COMPRESSION);
/* Prime the connection with some data. */ /* Prime the connection with some data. */
conn->dir_spool_src = DIR_SPOOL_NETWORKSTATUS;
connection_dirserv_flushed_some(conn); connection_dirserv_flushed_some(conn);
goto done; goto done;
} }
done: done:
if (clear_spool) {
dir_conn_clear_spool(conn);
}
return 0; return 0;
} }
@ -3302,43 +3306,45 @@ handle_get_microdesc(dir_connection_t *conn, const get_handler_args_t *args)
{ {
const char *url = args->url; const char *url = args->url;
const int compressed = args->compressed; const int compressed = args->compressed;
int clear_spool = 1;
{ {
smartlist_t *fps = smartlist_new(); conn->spool = smartlist_new();
dir_split_resource_into_fingerprints(url+strlen("/tor/micro/d/"), dir_split_resource_into_spoolable(url+strlen("/tor/micro/d/"),
fps, NULL, DIR_SPOOL_MICRODESC,
conn->spool, NULL,
DSR_DIGEST256|DSR_BASE64|DSR_SORT_UNIQ); DSR_DIGEST256|DSR_BASE64|DSR_SORT_UNIQ);
if (!dirserv_have_any_microdesc(fps)) { size_t size_guess = 0;
dirserv_spool_remove_missing_and_guess_size(conn, 0, compressed,
&size_guess, NULL);
if (smartlist_len(conn->spool) == 0) {
write_http_status_line(conn, 404, "Not found"); write_http_status_line(conn, 404, "Not found");
SMARTLIST_FOREACH(fps, char *, fp, tor_free(fp));
smartlist_free(fps);
goto done; goto done;
} }
size_t dlen = dirserv_estimate_microdesc_size(fps, compressed); if (global_write_bucket_low(TO_CONN(conn), size_guess, 2)) {
if (global_write_bucket_low(TO_CONN(conn), dlen, 2)) {
log_info(LD_DIRSERV, log_info(LD_DIRSERV,
"Client asked for server descriptors, but we've been " "Client asked for server descriptors, but we've been "
"writing too many bytes lately. Sending 503 Dir busy."); "writing too many bytes lately. Sending 503 Dir busy.");
write_http_status_line(conn, 503, "Directory busy, try again later"); write_http_status_line(conn, 503, "Directory busy, try again later");
SMARTLIST_FOREACH(fps, char *, fp, tor_free(fp));
smartlist_free(fps);
goto done; goto done;
} }
clear_spool = 0;
write_http_response_header(conn, -1, compressed, MICRODESC_CACHE_LIFETIME); write_http_response_header(conn, -1, compressed, MICRODESC_CACHE_LIFETIME);
conn->dir_spool_src = DIR_SPOOL_MICRODESC;
conn->fingerprint_stack = fps;
if (compressed) if (compressed)
conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD, conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD,
choose_compression_level(dlen)); choose_compression_level(size_guess));
connection_dirserv_flushed_some(conn); connection_dirserv_flushed_some(conn);
goto done; goto done;
} }
done: done:
if (clear_spool) {
dir_conn_clear_spool(conn);
}
return 0; return 0;
} }
@ -3350,69 +3356,88 @@ handle_get_descriptor(dir_connection_t *conn, const get_handler_args_t *args)
const char *url = args->url; const char *url = args->url;
const int compressed = args->compressed; const int compressed = args->compressed;
const or_options_t *options = get_options(); const or_options_t *options = get_options();
int clear_spool = 1;
if (!strcmpstart(url,"/tor/server/") || if (!strcmpstart(url,"/tor/server/") ||
(!options->BridgeAuthoritativeDir && (!options->BridgeAuthoritativeDir &&
!options->BridgeRelay && !strcmpstart(url,"/tor/extra/"))) { !options->BridgeRelay && !strcmpstart(url,"/tor/extra/"))) {
size_t dlen;
int res; int res;
const char *msg; const char *msg = NULL;
int cache_lifetime = 0; int cache_lifetime = 0;
int is_extra = !strcmpstart(url,"/tor/extra/"); int is_extra = !strcmpstart(url,"/tor/extra/");
url += is_extra ? strlen("/tor/extra/") : strlen("/tor/server/"); url += is_extra ? strlen("/tor/extra/") : strlen("/tor/server/");
conn->fingerprint_stack = smartlist_new(); dir_spool_source_t source;
res = dirserv_get_routerdesc_fingerprints(conn->fingerprint_stack, url, time_t publish_cutoff = 0;
&msg, if (!strcmpstart(url, "d/")) {
!connection_dir_is_encrypted(conn), source =
is_extra);
if (!strcmpstart(url, "fp/")) {
if (smartlist_len(conn->fingerprint_stack) == 1)
cache_lifetime = ROUTERDESC_CACHE_LIFETIME;
} else if (!strcmpstart(url, "authority")) {
cache_lifetime = ROUTERDESC_CACHE_LIFETIME;
} else if (!strcmpstart(url, "all")) {
cache_lifetime = FULL_DIR_CACHE_LIFETIME;
} else if (!strcmpstart(url, "d/")) {
if (smartlist_len(conn->fingerprint_stack) == 1)
cache_lifetime = ROUTERDESC_BY_DIGEST_CACHE_LIFETIME;
}
if (!strcmpstart(url, "d/"))
conn->dir_spool_src =
is_extra ? DIR_SPOOL_EXTRA_BY_DIGEST : DIR_SPOOL_SERVER_BY_DIGEST; is_extra ? DIR_SPOOL_EXTRA_BY_DIGEST : DIR_SPOOL_SERVER_BY_DIGEST;
else } else {
conn->dir_spool_src = source =
is_extra ? DIR_SPOOL_EXTRA_BY_FP : DIR_SPOOL_SERVER_BY_FP; is_extra ? DIR_SPOOL_EXTRA_BY_FP : DIR_SPOOL_SERVER_BY_FP;
/* We only want to apply a publish cutoff when we're requesting
if (!dirserv_have_any_serverdesc(conn->fingerprint_stack, * resources by fingerprint. */
conn->dir_spool_src)) { publish_cutoff = time(NULL) - ROUTER_MAX_AGE_TO_PUBLISH;
res = -1;
msg = "Not found";
} }
if (res < 0) conn->spool = smartlist_new();
res = dirserv_get_routerdesc_spool(conn->spool, url,
source,
connection_dir_is_encrypted(conn),
&msg);
if (!strcmpstart(url, "all")) {
cache_lifetime = FULL_DIR_CACHE_LIFETIME;
} else if (smartlist_len(conn->spool) == 1) {
cache_lifetime = ROUTERDESC_BY_DIGEST_CACHE_LIFETIME;
}
size_t size_guess = 0;
int n_expired = 0;
dirserv_spool_remove_missing_and_guess_size(conn, publish_cutoff,
compressed, &size_guess,
&n_expired);
/* If we are the bridge authority and the descriptor is a bridge
* descriptor, remember that we served this descriptor for desc stats. */
/* XXXX it's a bit of a kludge to have this here. */
if (get_options()->BridgeAuthoritativeDir &&
source == DIR_SPOOL_SERVER_BY_FP) {
SMARTLIST_FOREACH_BEGIN(conn->spool, spooled_resource_t *, spooled) {
const routerinfo_t *router =
router_get_by_id_digest((const char *)spooled->digest);
/* router can be NULL here when the bridge auth is asked for its own
* descriptor. */
if (router && router->purpose == ROUTER_PURPOSE_BRIDGE)
rep_hist_note_desc_served(router->cache_info.identity_digest);
} SMARTLIST_FOREACH_END(spooled);
}
if (res < 0 || size_guess == 0 || smartlist_len(conn->spool) == 0) {
if (msg == NULL)
msg = "Not found";
write_http_status_line(conn, 404, msg); write_http_status_line(conn, 404, msg);
else { } else {
dlen = dirserv_estimate_data_size(conn->fingerprint_stack, if (global_write_bucket_low(TO_CONN(conn), size_guess, 2)) {
1, compressed);
if (global_write_bucket_low(TO_CONN(conn), dlen, 2)) {
log_info(LD_DIRSERV, log_info(LD_DIRSERV,
"Client asked for server descriptors, but we've been " "Client asked for server descriptors, but we've been "
"writing too many bytes lately. Sending 503 Dir busy."); "writing too many bytes lately. Sending 503 Dir busy.");
write_http_status_line(conn, 503, "Directory busy, try again later"); write_http_status_line(conn, 503, "Directory busy, try again later");
conn->dir_spool_src = DIR_SPOOL_NONE; dir_conn_clear_spool(conn);
goto done; goto done;
} }
write_http_response_header(conn, -1, compressed, cache_lifetime); write_http_response_header(conn, -1, compressed, cache_lifetime);
if (compressed) if (compressed)
conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD, conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD,
choose_compression_level(dlen)); choose_compression_level(size_guess));
clear_spool = 0;
/* Prime the connection with some data. */ /* Prime the connection with some data. */
connection_dirserv_flushed_some(conn); connection_dirserv_flushed_some(conn);
} }
goto done; goto done;
} }
done: done:
return 0; if (clear_spool)
dir_conn_clear_spool(conn);
return 0;
} }
/** Helper function for GET /tor/keys/... /** Helper function for GET /tor/keys/...
@ -3936,7 +3961,7 @@ connection_dir_finished_flushing(dir_connection_t *conn)
conn->base_.state = DIR_CONN_STATE_CLIENT_READING; conn->base_.state = DIR_CONN_STATE_CLIENT_READING;
return 0; return 0;
case DIR_CONN_STATE_SERVER_WRITING: case DIR_CONN_STATE_SERVER_WRITING:
if (conn->dir_spool_src != DIR_SPOOL_NONE) { if (conn->spool) {
log_warn(LD_BUG, "Emptied a dirserv buffer, but it's still spooling!"); log_warn(LD_BUG, "Emptied a dirserv buffer, but it's still spooling!");
connection_mark_for_close(TO_CONN(conn)); connection_mark_for_close(TO_CONN(conn));
} else { } else {
@ -4619,3 +4644,34 @@ dir_split_resource_into_fingerprints(const char *resource,
return 0; return 0;
} }
/** As dir_split_resource_into_fingerprints, but instead fills
* <b>spool_out</b> with a list of spoolable_resource_t for the resource
* identified through <b>source</b>. */
int
dir_split_resource_into_spoolable(const char *resource,
dir_spool_source_t source,
smartlist_t *spool_out,
int *compressed_out,
int flags)
{
smartlist_t *fingerprints = smartlist_new();
tor_assert(flags & (DSR_HEX|DSR_BASE64));
const size_t digest_len =
(flags & DSR_DIGEST256) ? DIGEST256_LEN : DIGEST_LEN;
int r = dir_split_resource_into_fingerprints(resource, fingerprints,
compressed_out, flags);
/* This is not a very efficient implementation XXXX */
SMARTLIST_FOREACH_BEGIN(fingerprints, uint8_t *, digest) {
spooled_resource_t *spooled =
spooled_resource_new(source, digest, digest_len);
if (spooled)
smartlist_add(spool_out, spooled);
tor_free(digest);
} SMARTLIST_FOREACH_END(digest);
smartlist_free(fingerprints);
return r;
}

View File

@ -66,7 +66,7 @@ void directory_initiate_command_routerstatus_rend(const routerstatus_t *status,
int parse_http_response(const char *headers, int *code, time_t *date, int parse_http_response(const char *headers, int *code, time_t *date,
compress_method_t *compression, char **response); compress_method_t *compression, char **response);
int connection_dir_is_encrypted(dir_connection_t *conn); int connection_dir_is_encrypted(const dir_connection_t *conn);
int connection_dir_reached_eof(dir_connection_t *conn); int connection_dir_reached_eof(dir_connection_t *conn);
int connection_dir_process_inbuf(dir_connection_t *conn); int connection_dir_process_inbuf(dir_connection_t *conn);
int connection_dir_finished_flushing(dir_connection_t *conn); int connection_dir_finished_flushing(dir_connection_t *conn);
@ -88,7 +88,12 @@ void directory_initiate_command(const tor_addr_t *or_addr, uint16_t or_port,
int dir_split_resource_into_fingerprints(const char *resource, int dir_split_resource_into_fingerprints(const char *resource,
smartlist_t *fp_out, int *compressed_out, smartlist_t *fp_out, int *compressed_out,
int flags); int flags);
enum dir_spool_source_t;
int dir_split_resource_into_spoolable(const char *resource,
enum dir_spool_source_t source,
smartlist_t *spool_out,
int *compressed_out,
int flags);
int dir_split_resource_into_fingerprint_pairs(const char *res, int dir_split_resource_into_fingerprint_pairs(const char *res,
smartlist_t *pairs_out); smartlist_t *pairs_out);
char *directory_dump_request_log(void); char *directory_dump_request_log(void);

View File

@ -81,14 +81,23 @@ dirserv_get_status_impl(const char *fp, const char *nickname,
int severity); int severity);
static void clear_cached_dir(cached_dir_t *d); static void clear_cached_dir(cached_dir_t *d);
static const signed_descriptor_t *get_signed_descriptor_by_fp( static const signed_descriptor_t *get_signed_descriptor_by_fp(
const char *fp, const uint8_t *fp,
int extrainfo, int extrainfo);
time_t publish_cutoff);
static was_router_added_t dirserv_add_extrainfo(extrainfo_t *ei, static was_router_added_t dirserv_add_extrainfo(extrainfo_t *ei,
const char **msg); const char **msg);
static uint32_t dirserv_get_bandwidth_for_router_kb(const routerinfo_t *ri); static uint32_t dirserv_get_bandwidth_for_router_kb(const routerinfo_t *ri);
static uint32_t dirserv_get_credible_bandwidth_kb(const routerinfo_t *ri); static uint32_t dirserv_get_credible_bandwidth_kb(const routerinfo_t *ri);
static int spooled_resource_lookup_body(const spooled_resource_t *spooled,
int conn_is_encrypted,
const uint8_t **body_out,
size_t *size_out,
time_t *published_out);
static cached_dir_t *spooled_resource_lookup_cached_dir(
const spooled_resource_t *spooled,
time_t *published_out);
static cached_dir_t *lookup_cached_dir_by_fp(const uint8_t *fp);
/************** Fingerprint handling code ************/ /************** Fingerprint handling code ************/
/* 1 Historically used to indicate Named */ /* 1 Historically used to indicate Named */
@ -3037,58 +3046,61 @@ dirserv_generate_networkstatus_vote_obj(crypto_pk_t *private_key,
* requests, adds identity digests. * requests, adds identity digests.
*/ */
int int
dirserv_get_routerdesc_fingerprints(smartlist_t *fps_out, const char *key, dirserv_get_routerdesc_spool(smartlist_t *spool_out,
const char **msg, int for_unencrypted_conn, const char *key,
int is_extrainfo) dir_spool_source_t source,
int conn_is_encrypted,
const char **msg_out)
{ {
int by_id = 1; *msg_out = NULL;
*msg = NULL;
if (!strcmp(key, "all")) { if (!strcmp(key, "all")) {
routerlist_t *rl = router_get_routerlist(); const routerlist_t *rl = router_get_routerlist();
SMARTLIST_FOREACH(rl->routers, routerinfo_t *, r, SMARTLIST_FOREACH_BEGIN(rl->routers, const routerinfo_t *, r) {
smartlist_add(fps_out, spooled_resource_t *spooled;
tor_memdup(r->cache_info.identity_digest, DIGEST_LEN))); spooled = spooled_resource_new(source,
/* Treat "all" requests as if they were unencrypted */ (const uint8_t *)r->cache_info.identity_digest,
for_unencrypted_conn = 1; DIGEST_LEN);
/* Treat "all" requests as if they were unencrypted */
conn_is_encrypted = 0;
smartlist_add(spool_out, spooled);
} SMARTLIST_FOREACH_END(r);
} else if (!strcmp(key, "authority")) { } else if (!strcmp(key, "authority")) {
const routerinfo_t *ri = router_get_my_routerinfo(); const routerinfo_t *ri = router_get_my_routerinfo();
if (ri) if (ri)
smartlist_add(fps_out, smartlist_add(spool_out,
tor_memdup(ri->cache_info.identity_digest, DIGEST_LEN)); spooled_resource_new(source,
(const uint8_t *)ri->cache_info.identity_digest,
DIGEST_LEN));
} else if (!strcmpstart(key, "d/")) { } else if (!strcmpstart(key, "d/")) {
by_id = 0;
key += strlen("d/"); key += strlen("d/");
dir_split_resource_into_fingerprints(key, fps_out, NULL, dir_split_resource_into_spoolable(key, source, spool_out, NULL,
DSR_HEX|DSR_SORT_UNIQ); DSR_HEX|DSR_SORT_UNIQ);
} else if (!strcmpstart(key, "fp/")) { } else if (!strcmpstart(key, "fp/")) {
key += strlen("fp/"); key += strlen("fp/");
dir_split_resource_into_fingerprints(key, fps_out, NULL, dir_split_resource_into_spoolable(key, source, spool_out, NULL,
DSR_HEX|DSR_SORT_UNIQ); DSR_HEX|DSR_SORT_UNIQ);
} else { } else {
*msg = "Key not recognized"; *msg_out = "Not found";
return -1; return -1;
} }
if (for_unencrypted_conn) { if (! conn_is_encrypted) {
/* Remove anything that insists it not be sent unencrypted. */ /* Remove anything that insists it not be sent unencrypted. */
SMARTLIST_FOREACH_BEGIN(fps_out, char *, cp) { SMARTLIST_FOREACH_BEGIN(spool_out, spooled_resource_t *, spooled) {
const signed_descriptor_t *sd; const uint8_t *body = NULL;
if (by_id) size_t bodylen = 0;
sd = get_signed_descriptor_by_fp(cp,is_extrainfo,0); int r = spooled_resource_lookup_body(spooled, conn_is_encrypted,
else if (is_extrainfo) &body, &bodylen, NULL);
sd = extrainfo_get_by_descriptor_digest(cp); if (r < 0 || body == NULL || bodylen == 0) {
else SMARTLIST_DEL_CURRENT(spool_out, spooled);
sd = router_get_by_descriptor_digest(cp); spooled_resource_free(spooled);
if (sd && !sd->send_unencrypted) { }
tor_free(cp); } SMARTLIST_FOREACH_END(spooled);
SMARTLIST_DEL_CURRENT(fps_out, cp);
}
} SMARTLIST_FOREACH_END(cp);
} }
if (!smartlist_len(fps_out)) { if (!smartlist_len(spool_out)) {
*msg = "Servers unavailable"; *msg_out = "Servers unavailable";
return -1; return -1;
} }
return 0; return 0;
@ -3352,45 +3364,351 @@ dirserv_test_reachability(time_t now)
ctr = (ctr + 1) % REACHABILITY_MODULO_PER_TEST; /* increment ctr */ ctr = (ctr + 1) % REACHABILITY_MODULO_PER_TEST; /* increment ctr */
} }
/* ==========
* Spooling code.
* ========== */
spooled_resource_t *
spooled_resource_new(dir_spool_source_t source,
const uint8_t *digest, size_t digestlen)
{
spooled_resource_t *spooled = tor_malloc_zero(sizeof(spooled_resource_t));
spooled->spool_source = source;
switch (source) {
case DIR_SPOOL_NETWORKSTATUS:
spooled->spool_eagerly = 0;
break;
case DIR_SPOOL_SERVER_BY_DIGEST:
case DIR_SPOOL_SERVER_BY_FP:
case DIR_SPOOL_EXTRA_BY_DIGEST:
case DIR_SPOOL_EXTRA_BY_FP:
case DIR_SPOOL_MICRODESC:
default:
spooled->spool_eagerly = 1;
break;
}
tor_assert(digestlen <= sizeof(spooled->digest));
if (digest)
memcpy(spooled->digest, digest, digestlen);
return spooled;
}
/** Release all storage held by <b>spooled</b>. */
void
spooled_resource_free(spooled_resource_t *spooled)
{
if (spooled == NULL)
return;
if (spooled->cached_dir_ref) {
cached_dir_decref(spooled->cached_dir_ref);
}
tor_free(spooled);
}
/** When spooling data from a cached_dir_t object, we always add
* at least this much. */
#define DIRSERV_CACHED_DIR_CHUNK_SIZE 8192
/** Return an compression ratio for compressing objects from <b>source</b>.
*/
static double
estimate_compression_ratio(dir_spool_source_t source)
{
/* We should put in better estimates here, depending on the number of
objects and their type */
(void) source;
return 0.5;
}
/** Return an estimated number of bytes needed for transmitting the
* resource in <b>spooled</b> on <b>conn</b>
*
* As a convenient side-effect, set *<b>published_out</b> to the resource's
* publication time.
*/
static size_t
spooled_resource_estimate_size(const spooled_resource_t *spooled,
dir_connection_t *conn,
int compressed,
time_t *published_out)
{
if (spooled->spool_eagerly) {
const uint8_t *body = NULL;
size_t bodylen = 0;
int r = spooled_resource_lookup_body(spooled,
connection_dir_is_encrypted(conn),
&body, &bodylen,
published_out);
if (r == -1 || body == NULL || bodylen == 0)
return 0;
if (compressed) {
double ratio = estimate_compression_ratio(spooled->spool_source);
bodylen = (size_t)(bodylen * ratio);
}
return bodylen;
} else {
cached_dir_t *cached;
if (spooled->cached_dir_ref) {
cached = spooled->cached_dir_ref;
} else {
cached = spooled_resource_lookup_cached_dir(spooled,
published_out);
}
if (cached == NULL) {
return 0;
}
size_t result = compressed ? cached->dir_z_len : cached->dir_len;
return result;
}
}
/** Return code for spooled_resource_flush_some */
typedef enum {
SRFS_ERR = -1,
SRFS_MORE = 0,
SRFS_DONE
} spooled_resource_flush_status_t;
/** Flush some or all of the bytes from <b>spooled</b> onto <b>conn</b>.
* Return SRFS_ERR on error, SRFS_MORE if there are more bytes to flush from
* this spooled resource, or SRFS_DONE if we are done flushing this spooled
* resource.
*/
static spooled_resource_flush_status_t
spooled_resource_flush_some(spooled_resource_t *spooled,
dir_connection_t *conn)
{
if (spooled->spool_eagerly) {
/* Spool_eagerly resources are sent all-at-once. */
const uint8_t *body = NULL;
size_t bodylen = 0;
int r = spooled_resource_lookup_body(spooled,
connection_dir_is_encrypted(conn),
&body, &bodylen, NULL);
if (r == -1 || body == NULL || bodylen == 0) {
/* Absent objects count as "done". */
return SRFS_DONE;
}
if (conn->zlib_state) {
connection_write_to_buf_zlib((const char*)body, bodylen, conn, 0);
} else {
connection_write_to_buf((const char*)body, bodylen, TO_CONN(conn));
}
return SRFS_DONE;
} else {
cached_dir_t *cached = spooled->cached_dir_ref;
if (cached == NULL) {
/* The cached_dir_t hasn't been materialized yet. So let's look it up. */
cached = spooled->cached_dir_ref =
spooled_resource_lookup_cached_dir(spooled, NULL);
if (!cached) {
/* Absent objects count as done. */
return SRFS_DONE;
}
++cached->refcnt;
tor_assert_nonfatal(spooled->cached_dir_offset == 0);
}
/* How many bytes left to flush? */
int64_t remaining = 0;
remaining = cached->dir_z_len - spooled->cached_dir_offset;
if (BUG(remaining < 0))
return SRFS_ERR;
ssize_t bytes = MIN(DIRSERV_CACHED_DIR_CHUNK_SIZE, remaining);
if (conn->zlib_state) {
connection_write_to_buf_zlib(cached->dir_z + spooled->cached_dir_offset,
bytes, conn, 0);
} else {
connection_write_to_buf(cached->dir_z + spooled->cached_dir_offset,
bytes, TO_CONN(conn));
}
spooled->cached_dir_offset += bytes;
if (spooled->cached_dir_offset >= (off_t)cached->dir_z_len) {
return SRFS_DONE;
} else {
return SRFS_MORE;
}
}
}
/** Helper: find the cached_dir_t for a spooled_resource_t, for
* sending it to <b>conn</b>. Set *<b>published_out</b>, if provided,
* to the published time of the cached_dir_t.
*
* DOES NOT increase the reference count on the result. Callers must do that
* themselves if they mean to hang on to it.
*/
static cached_dir_t *
spooled_resource_lookup_cached_dir(const spooled_resource_t *spooled,
time_t *published_out)
{
tor_assert(spooled->spool_eagerly == 0);
cached_dir_t *d = lookup_cached_dir_by_fp(spooled->digest);
if (d != NULL) {
if (published_out)
*published_out = d->published;
}
return d;
}
/** Helper: Look up the body for an eagerly-served spooled_resource. If
* <b>conn_is_encrypted</b> is false, don't look up any resource that
* shouldn't be sent over an unencrypted connection. On success, set
* <b>body_out</b>, <b>size_out</b>, and <b>published_out</b> to refer
* to the resource's body, size, and publication date, and return 0.
* On failure return -1. */
static int
spooled_resource_lookup_body(const spooled_resource_t *spooled,
int conn_is_encrypted,
const uint8_t **body_out,
size_t *size_out,
time_t *published_out)
{
tor_assert(spooled->spool_eagerly == 1);
const signed_descriptor_t *sd = NULL;
switch (spooled->spool_source) {
case DIR_SPOOL_EXTRA_BY_FP: {
sd = get_signed_descriptor_by_fp(spooled->digest, 1);
break;
}
case DIR_SPOOL_SERVER_BY_FP: {
sd = get_signed_descriptor_by_fp(spooled->digest, 0);
break;
}
case DIR_SPOOL_SERVER_BY_DIGEST: {
sd = router_get_by_descriptor_digest((const char *)spooled->digest);
break;
}
case DIR_SPOOL_EXTRA_BY_DIGEST: {
sd = extrainfo_get_by_descriptor_digest((const char *)spooled->digest);
break;
}
case DIR_SPOOL_MICRODESC: {
microdesc_t *md = microdesc_cache_lookup_by_digest256(
get_microdesc_cache(),
(const char *)spooled->digest);
if (! md || ! md->body) {
return -1;
}
*body_out = (const uint8_t *)md->body;
*size_out = md->bodylen;
if (published_out)
*published_out = TIME_MAX;
return 0;
}
case DIR_SPOOL_NETWORKSTATUS:
default:
/* LCOV_EXCL_START */
tor_assert_nonfatal_unreached();
return -1;
/* LCOV_EXCL_STOP */
}
/* If we get here, then we tried to set "sd" to a signed_descriptor_t. */
if (sd == NULL) {
return -1;
}
if (sd->send_unencrypted == 0 && ! conn_is_encrypted) {
/* we did this check once before (so we could have an accurate size
* estimate and maybe send a 404 if somebody asked for only bridges on
* a connection), but we need to do it again in case a previously
* unknown bridge descriptor has shown up between then and now. */
return -1;
}
*body_out = (const uint8_t *) signed_descriptor_get_body(sd);
*size_out = sd->signed_descriptor_len;
if (published_out)
*published_out = sd->published_on;
return 0;
}
/** Given a fingerprint <b>fp</b> which is either set if we're looking for a /** Given a fingerprint <b>fp</b> which is either set if we're looking for a
* v2 status, or zeroes if we're looking for a v3 status, or a NUL-padded * v2 status, or zeroes if we're looking for a v3 status, or a NUL-padded
* flavor name if we want a flavored v3 status, return a pointer to the * flavor name if we want a flavored v3 status, return a pointer to the
* appropriate cached dir object, or NULL if there isn't one available. */ * appropriate cached dir object, or NULL if there isn't one available. */
static cached_dir_t * static cached_dir_t *
lookup_cached_dir_by_fp(const char *fp) lookup_cached_dir_by_fp(const uint8_t *fp)
{ {
cached_dir_t *d = NULL; cached_dir_t *d = NULL;
if (tor_digest_is_zero(fp) && cached_consensuses) { if (tor_digest_is_zero((const char *)fp) && cached_consensuses) {
d = strmap_get(cached_consensuses, "ns"); d = strmap_get(cached_consensuses, "ns");
} else if (memchr(fp, '\0', DIGEST_LEN) && cached_consensuses && } else if (memchr(fp, '\0', DIGEST_LEN) && cached_consensuses) {
(d = strmap_get(cached_consensuses, fp))) { /* this here interface is a nasty hack: we're shoving a flavor into
/* this here interface is a nasty hack XXXX */; * a digest field. */
d = strmap_get(cached_consensuses, (const char *)fp);
} }
return d; return d;
} }
/** Remove from <b>fps</b> every networkstatus key where both /** Try to guess the number of bytes that will be needed to send the
* a) we have a networkstatus document and * spooled objects for <b>conn</b>'s outgoing spool. In the process,
* b) it is not newer than <b>cutoff</b>. * remove every element of the spool that refers to an absent object, or
* * which was published earlier than <b>cutoff</b>. Set *<b>size_out</b>
* Return 1 if any items were present at all; else return 0. * to the number of bytes, and *<b>n_expired_out</b> to the number of
*/ * objects removed for being too old. */
int void
dirserv_remove_old_statuses(smartlist_t *fps, time_t cutoff) dirserv_spool_remove_missing_and_guess_size(dir_connection_t *conn,
time_t cutoff,
int compression,
uint64_t *size_out,
int *n_expired_out)
{ {
int found_any = 0; if (BUG(!conn))
SMARTLIST_FOREACH_BEGIN(fps, char *, digest) { return;
cached_dir_t *d = lookup_cached_dir_by_fp(digest);
if (!d)
continue;
found_any = 1;
if (d->published <= cutoff) {
tor_free(digest);
SMARTLIST_DEL_CURRENT(fps, digest);
}
} SMARTLIST_FOREACH_END(digest);
return found_any; smartlist_t *spool = conn->spool;
if (!spool) {
if (size_out)
*size_out = 0;
if (n_expired_out)
*n_expired_out = 0;
return;
}
int n_expired = 0;
uint64_t total = 0;
SMARTLIST_FOREACH_BEGIN(spool, spooled_resource_t *, spooled) {
time_t published = TIME_MAX;
size_t sz = spooled_resource_estimate_size(spooled, conn,
compression, &published);
if (published < cutoff) {
++n_expired;
SMARTLIST_DEL_CURRENT(spool, spooled);
spooled_resource_free(spooled);
} else if (sz == 0) {
SMARTLIST_DEL_CURRENT(spool, spooled);
spooled_resource_free(spooled);
} else {
total += sz;
}
} SMARTLIST_FOREACH_END(spooled);
if (size_out)
*size_out = total;
if (n_expired_out)
*n_expired_out = n_expired;
}
/** Helper: used to sort a connection's spool. */
static int
dirserv_spool_sort_comparison_(const void **a_, const void **b_)
{
const spooled_resource_t *a = *a_;
const spooled_resource_t *b = *b_;
return fast_memcmp(a->digest, b->digest, sizeof(a->digest));
}
/** Sort all the entries in <b>conn</b> by digest. */
void
dirserv_spool_sort(dir_connection_t *conn)
{
if (conn->spool == NULL)
return;
smartlist_sort(conn->spool, dirserv_spool_sort_comparison_);
} }
/** Return the cache-info for identity fingerprint <b>fp</b>, or /** Return the cache-info for identity fingerprint <b>fp</b>, or
@ -3398,18 +3716,16 @@ dirserv_remove_old_statuses(smartlist_t *fps, time_t cutoff)
* NULL if not found or if the descriptor is older than * NULL if not found or if the descriptor is older than
* <b>publish_cutoff</b>. */ * <b>publish_cutoff</b>. */
static const signed_descriptor_t * static const signed_descriptor_t *
get_signed_descriptor_by_fp(const char *fp, int extrainfo, get_signed_descriptor_by_fp(const uint8_t *fp, int extrainfo)
time_t publish_cutoff)
{ {
if (router_digest_is_me(fp)) { if (router_digest_is_me((const char *)fp)) {
if (extrainfo) if (extrainfo)
return &(router_get_my_extrainfo()->cache_info); return &(router_get_my_extrainfo()->cache_info);
else else
return &(router_get_my_routerinfo()->cache_info); return &(router_get_my_routerinfo()->cache_info);
} else { } else {
const routerinfo_t *ri = router_get_by_id_digest(fp); const routerinfo_t *ri = router_get_by_id_digest((const char *)fp);
if (ri && if (ri) {
ri->cache_info.published_on > publish_cutoff) {
if (extrainfo) if (extrainfo)
return extrainfo_get_by_descriptor_digest( return extrainfo_get_by_descriptor_digest(
ri->cache_info.extra_info_digest); ri->cache_info.extra_info_digest);
@ -3420,346 +3736,66 @@ get_signed_descriptor_by_fp(const char *fp, int extrainfo,
return NULL; return NULL;
} }
/** Return true iff we have any of the documents (extrainfo or routerdesc)
* specified by the fingerprints in <b>fps</b> and <b>spool_src</b>. Used to
* decide whether to send a 404. */
int
dirserv_have_any_serverdesc(smartlist_t *fps, int spool_src)
{
time_t publish_cutoff = time(NULL)-ROUTER_MAX_AGE_TO_PUBLISH;
SMARTLIST_FOREACH_BEGIN(fps, const char *, fp) {
switch (spool_src)
{
case DIR_SPOOL_EXTRA_BY_DIGEST:
if (extrainfo_get_by_descriptor_digest(fp)) return 1;
break;
case DIR_SPOOL_SERVER_BY_DIGEST:
if (router_get_by_descriptor_digest(fp)) return 1;
break;
case DIR_SPOOL_EXTRA_BY_FP:
case DIR_SPOOL_SERVER_BY_FP:
if (get_signed_descriptor_by_fp(fp,
spool_src == DIR_SPOOL_EXTRA_BY_FP, publish_cutoff))
return 1;
break;
}
} SMARTLIST_FOREACH_END(fp);
return 0;
}
/** Return true iff any of the 256-bit elements in <b>fps</b> is the digest of
* a microdescriptor we have. */
int
dirserv_have_any_microdesc(const smartlist_t *fps)
{
microdesc_cache_t *cache = get_microdesc_cache();
SMARTLIST_FOREACH(fps, const char *, fp,
if (microdesc_cache_lookup_by_digest256(cache, fp))
return 1);
return 0;
}
/** Return an approximate estimate of the number of bytes that will
* be needed to transmit the server descriptors (if is_serverdescs --
* they can be either d/ or fp/ queries) or networkstatus objects (if
* !is_serverdescs) listed in <b>fps</b>. If <b>compressed</b> is set,
* we guess how large the data will be after compression.
*
* The return value is an estimate; it might be larger or smaller.
**/
size_t
dirserv_estimate_data_size(smartlist_t *fps, int is_serverdescs,
int compressed)
{
size_t result;
tor_assert(fps);
if (is_serverdescs) {
int n = smartlist_len(fps);
const routerinfo_t *me = router_get_my_routerinfo();
result = (me?me->cache_info.signed_descriptor_len:2048) * n;
if (compressed)
result /= 2; /* observed compressibility is between 35 and 55%. */
} else {
result = 0;
SMARTLIST_FOREACH(fps, const char *, digest, {
cached_dir_t *dir = lookup_cached_dir_by_fp(digest);
if (dir)
result += compressed ? dir->dir_z_len : dir->dir_len;
});
}
return result;
}
/** Given a list of microdescriptor hashes, guess how many bytes will be
* needed to transmit them, and return the guess. */
size_t
dirserv_estimate_microdesc_size(const smartlist_t *fps, int compressed)
{
size_t result = smartlist_len(fps) * microdesc_average_size(NULL);
if (compressed)
result /= 2;
return result;
}
/** When we're spooling data onto our outbuf, add more whenever we dip /** When we're spooling data onto our outbuf, add more whenever we dip
* below this threshold. */ * below this threshold. */
#define DIRSERV_BUFFER_MIN 16384 #define DIRSERV_BUFFER_MIN 16384
/** Spooling helper: called when we have no more data to spool to <b>conn</b>.
* Flushes any remaining data to be (un)compressed, and changes the spool
* source to NONE. Returns 0 on success, negative on failure. */
static int
connection_dirserv_finish_spooling(dir_connection_t *conn)
{
if (conn->zlib_state) {
connection_write_to_buf_zlib("", 0, conn, 1);
tor_zlib_free(conn->zlib_state);
conn->zlib_state = NULL;
}
conn->dir_spool_src = DIR_SPOOL_NONE;
return 0;
}
/** Spooling helper: called when we're sending a bunch of server descriptors,
* and the outbuf has become too empty. Pulls some entries from
* fingerprint_stack, and writes the corresponding servers onto outbuf. If we
* run out of entries, flushes the zlib state and sets the spool source to
* NONE. Returns 0 on success, negative on failure.
*/
static int
connection_dirserv_add_servers_to_outbuf(dir_connection_t *conn)
{
int by_fp = (conn->dir_spool_src == DIR_SPOOL_SERVER_BY_FP ||
conn->dir_spool_src == DIR_SPOOL_EXTRA_BY_FP);
int extra = (conn->dir_spool_src == DIR_SPOOL_EXTRA_BY_FP ||
conn->dir_spool_src == DIR_SPOOL_EXTRA_BY_DIGEST);
time_t publish_cutoff = time(NULL)-ROUTER_MAX_AGE_TO_PUBLISH;
const or_options_t *options = get_options();
while (smartlist_len(conn->fingerprint_stack) &&
connection_get_outbuf_len(TO_CONN(conn)) < DIRSERV_BUFFER_MIN) {
const char *body;
char *fp = smartlist_pop_last(conn->fingerprint_stack);
const signed_descriptor_t *sd = NULL;
if (by_fp) {
sd = get_signed_descriptor_by_fp(fp, extra, publish_cutoff);
} else {
sd = extra ? extrainfo_get_by_descriptor_digest(fp)
: router_get_by_descriptor_digest(fp);
}
tor_free(fp);
if (!sd)
continue;
if (!connection_dir_is_encrypted(conn) && !sd->send_unencrypted) {
/* we did this check once before (so we could have an accurate size
* estimate and maybe send a 404 if somebody asked for only bridges on a
* connection), but we need to do it again in case a previously
* unknown bridge descriptor has shown up between then and now. */
continue;
}
/** If we are the bridge authority and the descriptor is a bridge
* descriptor, remember that we served this descriptor for desc stats. */
if (options->BridgeAuthoritativeDir && by_fp) {
const routerinfo_t *router =
router_get_by_id_digest(sd->identity_digest);
/* router can be NULL here when the bridge auth is asked for its own
* descriptor. */
if (router && router->purpose == ROUTER_PURPOSE_BRIDGE)
rep_hist_note_desc_served(sd->identity_digest);
}
body = signed_descriptor_get_body(sd);
if (conn->zlib_state) {
int last = ! smartlist_len(conn->fingerprint_stack);
connection_write_to_buf_zlib(body, sd->signed_descriptor_len, conn,
last);
if (last) {
tor_zlib_free(conn->zlib_state);
conn->zlib_state = NULL;
}
} else {
connection_write_to_buf(body,
sd->signed_descriptor_len,
TO_CONN(conn));
}
}
if (!smartlist_len(conn->fingerprint_stack)) {
/* We just wrote the last one; finish up. */
if (conn->zlib_state) {
connection_write_to_buf_zlib("", 0, conn, 1);
tor_zlib_free(conn->zlib_state);
conn->zlib_state = NULL;
}
conn->dir_spool_src = DIR_SPOOL_NONE;
smartlist_free(conn->fingerprint_stack);
conn->fingerprint_stack = NULL;
}
return 0;
}
/** Spooling helper: called when we're sending a bunch of microdescriptors,
* and the outbuf has become too empty. Pulls some entries from
* fingerprint_stack, and writes the corresponding microdescs onto outbuf. If
* we run out of entries, flushes the zlib state and sets the spool source to
* NONE. Returns 0 on success, negative on failure.
*/
static int
connection_dirserv_add_microdescs_to_outbuf(dir_connection_t *conn)
{
microdesc_cache_t *cache = get_microdesc_cache();
while (smartlist_len(conn->fingerprint_stack) &&
connection_get_outbuf_len(TO_CONN(conn)) < DIRSERV_BUFFER_MIN) {
char *fp256 = smartlist_pop_last(conn->fingerprint_stack);
microdesc_t *md = microdesc_cache_lookup_by_digest256(cache, fp256);
tor_free(fp256);
if (!md || !md->body)
continue;
if (conn->zlib_state) {
int last = !smartlist_len(conn->fingerprint_stack);
connection_write_to_buf_zlib(md->body, md->bodylen, conn, last);
if (last) {
tor_zlib_free(conn->zlib_state);
conn->zlib_state = NULL;
}
} else {
connection_write_to_buf(md->body, md->bodylen, TO_CONN(conn));
}
}
if (!smartlist_len(conn->fingerprint_stack)) {
if (conn->zlib_state) {
connection_write_to_buf_zlib("", 0, conn, 1);
tor_zlib_free(conn->zlib_state);
conn->zlib_state = NULL;
}
conn->dir_spool_src = DIR_SPOOL_NONE;
smartlist_free(conn->fingerprint_stack);
conn->fingerprint_stack = NULL;
}
return 0;
}
/** Spooling helper: Called when we're sending a directory or networkstatus,
* and the outbuf has become too empty. Pulls some bytes from
* <b>conn</b>-\>cached_dir-\>dir_z, uncompresses them if appropriate, and
* puts them on the outbuf. If we run out of entries, flushes the zlib state
* and sets the spool source to NONE. Returns 0 on success, negative on
* failure. */
static int
connection_dirserv_add_dir_bytes_to_outbuf(dir_connection_t *conn)
{
ssize_t bytes;
int64_t remaining;
bytes = DIRSERV_BUFFER_MIN - connection_get_outbuf_len(TO_CONN(conn));
tor_assert(bytes > 0);
tor_assert(conn->cached_dir);
if (bytes < 8192)
bytes = 8192;
remaining = conn->cached_dir->dir_z_len - conn->cached_dir_offset;
if (BUG(remaining < 0)) {
remaining = 0;
}
if (bytes > remaining) {
bytes = (ssize_t) remaining;
if (BUG(bytes < 0))
return -1;
}
if (conn->zlib_state) {
connection_write_to_buf_zlib(
conn->cached_dir->dir_z + conn->cached_dir_offset,
bytes, conn, bytes == remaining);
} else {
connection_write_to_buf(conn->cached_dir->dir_z + conn->cached_dir_offset,
bytes, TO_CONN(conn));
}
conn->cached_dir_offset += bytes;
if (conn->cached_dir_offset >= (off_t)conn->cached_dir->dir_z_len) {
/* We just wrote the last one; finish up. */
connection_dirserv_finish_spooling(conn);
cached_dir_decref(conn->cached_dir);
conn->cached_dir = NULL;
}
return 0;
}
/** Spooling helper: Called when we're spooling networkstatus objects on
* <b>conn</b>, and the outbuf has become too empty. If the current
* networkstatus object (in <b>conn</b>-\>cached_dir) has more data, pull data
* from there. Otherwise, pop the next fingerprint from fingerprint_stack,
* and start spooling the next networkstatus. (A digest of all 0 bytes is
* treated as a request for the current consensus.) If we run out of entries,
* flushes the zlib state and sets the spool source to NONE. Returns 0 on
* success, negative on failure. */
static int
connection_dirserv_add_networkstatus_bytes_to_outbuf(dir_connection_t *conn)
{
while (connection_get_outbuf_len(TO_CONN(conn)) < DIRSERV_BUFFER_MIN) {
if (conn->cached_dir) {
int uncompressing = (conn->zlib_state != NULL);
int r = connection_dirserv_add_dir_bytes_to_outbuf(conn);
if (conn->dir_spool_src == DIR_SPOOL_NONE) {
/* add_dir_bytes thinks we're done with the cached_dir. But we
* may have more cached_dirs! */
conn->dir_spool_src = DIR_SPOOL_NETWORKSTATUS;
/* This bit is tricky. If we were uncompressing the last
* networkstatus, we may need to make a new zlib object to
* uncompress the next one. */
if (uncompressing && ! conn->zlib_state &&
conn->fingerprint_stack &&
smartlist_len(conn->fingerprint_stack)) {
conn->zlib_state = tor_zlib_new(0, ZLIB_METHOD, HIGH_COMPRESSION);
}
}
if (r) return r;
} else if (conn->fingerprint_stack &&
smartlist_len(conn->fingerprint_stack)) {
/* Add another networkstatus; start serving it. */
char *fp = smartlist_pop_last(conn->fingerprint_stack);
cached_dir_t *d = lookup_cached_dir_by_fp(fp);
tor_free(fp);
if (d) {
++d->refcnt;
conn->cached_dir = d;
conn->cached_dir_offset = 0;
}
} else {
connection_dirserv_finish_spooling(conn);
smartlist_free(conn->fingerprint_stack);
conn->fingerprint_stack = NULL;
return 0;
}
}
return 0;
}
/** Called whenever we have flushed some directory data in state /** Called whenever we have flushed some directory data in state
* SERVER_WRITING. */ * SERVER_WRITING. */
int int
connection_dirserv_flushed_some(dir_connection_t *conn) connection_dirserv_flushed_some(dir_connection_t *conn)
{ {
tor_assert(conn->base_.state == DIR_CONN_STATE_SERVER_WRITING); tor_assert(conn->base_.state == DIR_CONN_STATE_SERVER_WRITING);
if (conn->spool == NULL)
if (connection_get_outbuf_len(TO_CONN(conn)) >= DIRSERV_BUFFER_MIN)
return 0; return 0;
switch (conn->dir_spool_src) { while (connection_get_outbuf_len(TO_CONN(conn)) < DIRSERV_BUFFER_MIN &&
case DIR_SPOOL_EXTRA_BY_DIGEST: smartlist_len(conn->spool)) {
case DIR_SPOOL_EXTRA_BY_FP: spooled_resource_t *spooled =
case DIR_SPOOL_SERVER_BY_DIGEST: smartlist_get(conn->spool, smartlist_len(conn->spool)-1);
case DIR_SPOOL_SERVER_BY_FP: spooled_resource_flush_status_t status;
return connection_dirserv_add_servers_to_outbuf(conn); status = spooled_resource_flush_some(spooled, conn);
case DIR_SPOOL_MICRODESC: if (status == SRFS_ERR) {
return connection_dirserv_add_microdescs_to_outbuf(conn); return -1;
case DIR_SPOOL_NETWORKSTATUS: } else if (status == SRFS_MORE) {
return connection_dirserv_add_networkstatus_bytes_to_outbuf(conn);
case DIR_SPOOL_NONE:
default:
return 0; return 0;
}
tor_assert(status == SRFS_DONE);
/* If we're here, we're done flushing this resource. */
tor_assert(smartlist_pop_last(conn->spool) == spooled);
spooled_resource_free(spooled);
} }
if (smartlist_len(conn->spool) > 0) {
/* We're still spooling something. */
return 0;
}
/* If we get here, we're done. */
smartlist_free(conn->spool);
conn->spool = NULL;
if (conn->zlib_state) {
/* Flush the zlib state: there could be more bytes pending in there, and
* we don't want to omit bytes. */
connection_write_to_buf_zlib("", 0, conn, 1);
tor_zlib_free(conn->zlib_state);
conn->zlib_state = NULL;
}
return 0;
}
/** Remove every element from <b>conn</b>'s outgoing spool, and delete
* the spool. */
void
dir_conn_clear_spool(dir_connection_t *conn)
{
if (!conn || ! conn->spool)
return;
SMARTLIST_FOREACH(conn->spool, spooled_resource_t *, s,
spooled_resource_free(s));
smartlist_free(conn->spool);
conn->spool = NULL;
} }
/** Return true iff <b>line</b> is a valid RecommendedPackages line. /** Return true iff <b>line</b> is a valid RecommendedPackages line.

View File

@ -32,6 +32,53 @@
/** Maximum allowable length of a version line in a networkstatus. */ /** Maximum allowable length of a version line in a networkstatus. */
#define MAX_V_LINE_LEN 128 #define MAX_V_LINE_LEN 128
/** Ways to convert a spoolable_resource_t to a bunch of bytes. */
typedef enum dir_spool_source_t {
DIR_SPOOL_SERVER_BY_DIGEST=1, DIR_SPOOL_SERVER_BY_FP,
DIR_SPOOL_EXTRA_BY_DIGEST, DIR_SPOOL_EXTRA_BY_FP,
DIR_SPOOL_MICRODESC,
DIR_SPOOL_NETWORKSTATUS,
} dir_spool_source_t;
#define dir_spool_source_bitfield_t ENUM_BF(dir_spool_source_t)
/** Object to remember the identity of an object that we are spooling,
* or about to spool, in response to a directory request.
*
* (Why do we spool? Because some directory responses are very large,
* and we don't want to just shove the complete answer into the output
* buffer: that would take a ridiculous amount of RAM.)
*
* If the spooled resource is relatively small (like microdescriptors,
* descriptors, etc), we look them up by ID as needed, and add the whole
* thing onto the output buffer at once. If the spooled reseource is
* big (like networkstatus documents), we reference-count it, and add it
* a few K at a time.
*/
typedef struct spooled_resource_t {
/**
* If true, we add the entire object to the outbuf. If false,
* we spool the object a few K at a time.
*/
unsigned spool_eagerly : 1;
/**
* Tells us what kind of object to get, and how to look it up.
*/
dir_spool_source_bitfield_t spool_source : 7;
/**
* Tells us the specific object to spool.
*/
uint8_t digest[DIGEST256_LEN];
/**
* A large object that we're spooling. Holds a reference count. Only
* used when spool_eagerly is false.
*/
struct cached_dir_t *cached_dir_ref;
/**
* The current offset into cached_dir. Only used when spool_eagerly is
* false */
off_t cached_dir_offset;
} spooled_resource_t;
int connection_dirserv_flushed_some(dir_connection_t *conn); int connection_dirserv_flushed_some(dir_connection_t *conn);
int dirserv_add_own_fingerprint(crypto_pk_t *pk); int dirserv_add_own_fingerprint(crypto_pk_t *pk);
@ -65,10 +112,10 @@ void dirserv_set_cached_consensus_networkstatus(const char *consensus,
const common_digests_t *digests, const common_digests_t *digests,
time_t published); time_t published);
void dirserv_clear_old_networkstatuses(time_t cutoff); void dirserv_clear_old_networkstatuses(time_t cutoff);
int dirserv_get_routerdesc_fingerprints(smartlist_t *fps_out, const char *key, int dirserv_get_routerdesc_spool(smartlist_t *spools_out, const char *key,
const char **msg, dir_spool_source_t source,
int for_unencrypted_conn, int conn_is_encrytped,
int is_extrainfo); const char **msg_out);
int dirserv_get_routerdescs(smartlist_t *descs_out, const char *key, int dirserv_get_routerdescs(smartlist_t *descs_out, const char *key,
const char **msg); const char **msg);
void dirserv_orconn_tls_done(const tor_addr_t *addr, void dirserv_orconn_tls_done(const tor_addr_t *addr,
@ -89,13 +136,6 @@ void dirserv_set_node_flags_from_authoritative_status(node_t *node,
uint32_t authstatus); uint32_t authstatus);
int dirserv_would_reject_router(const routerstatus_t *rs); int dirserv_would_reject_router(const routerstatus_t *rs);
int dirserv_remove_old_statuses(smartlist_t *fps, time_t cutoff);
int dirserv_have_any_serverdesc(smartlist_t *fps, int spool_src);
int dirserv_have_any_microdesc(const smartlist_t *fps);
size_t dirserv_estimate_data_size(smartlist_t *fps, int is_serverdescs,
int compressed);
size_t dirserv_estimate_microdesc_size(const smartlist_t *fps, int compressed);
char *routerstatus_format_entry( char *routerstatus_format_entry(
const routerstatus_t *rs, const routerstatus_t *rs,
const char *version, const char *version,
@ -141,5 +181,17 @@ int dirserv_read_measured_bandwidths(const char *from_file,
int dirserv_read_guardfraction_file(const char *fname, int dirserv_read_guardfraction_file(const char *fname,
smartlist_t *vote_routerstatuses); smartlist_t *vote_routerstatuses);
spooled_resource_t *spooled_resource_new(dir_spool_source_t source,
const uint8_t *digest,
size_t digestlen);
void spooled_resource_free(spooled_resource_t *spooled);
void dirserv_spool_remove_missing_and_guess_size(dir_connection_t *conn,
time_t cutoff,
int compression,
uint64_t *size_out,
int *n_expired_out);
void dirserv_spool_sort(dir_connection_t *conn);
void dir_conn_clear_spool(dir_connection_t *conn);
#endif #endif

View File

@ -804,18 +804,6 @@ microdesc_cache_lookup_by_digest256(microdesc_cache_t *cache, const char *d)
return md; return md;
} }
/** Return the mean size of decriptors added to <b>cache</b> since it was last
* cleared. Used to estimate the size of large downloads. */
size_t
microdesc_average_size(microdesc_cache_t *cache)
{
if (!cache)
cache = get_microdesc_cache();
if (!cache->n_seen)
return 512;
return (size_t)(cache->total_len_seen / cache->n_seen);
}
/** Return a smartlist of all the sha256 digest of the microdescriptors that /** Return a smartlist of all the sha256 digest of the microdescriptors that
* are listed in <b>ns</b> but not present in <b>cache</b>. Returns pointers * are listed in <b>ns</b> but not present in <b>cache</b>. Returns pointers
* to internals of <b>ns</b>; you should not free the members of the resulting * to internals of <b>ns</b>; you should not free the members of the resulting

View File

@ -32,8 +32,6 @@ void microdesc_cache_clear(microdesc_cache_t *cache);
microdesc_t *microdesc_cache_lookup_by_digest256(microdesc_cache_t *cache, microdesc_t *microdesc_cache_lookup_by_digest256(microdesc_cache_t *cache,
const char *d); const char *d);
size_t microdesc_average_size(microdesc_cache_t *cache);
smartlist_t *microdesc_list_missing_digest256(networkstatus_t *ns, smartlist_t *microdesc_list_missing_digest256(networkstatus_t *ns,
microdesc_cache_t *cache, microdesc_cache_t *cache,
int downloadable_only, int downloadable_only,

View File

@ -1732,14 +1732,6 @@ typedef struct entry_connection_t {
unsigned int is_socks_socket:1; unsigned int is_socks_socket:1;
} entry_connection_t; } entry_connection_t;
typedef enum {
DIR_SPOOL_NONE=0, DIR_SPOOL_SERVER_BY_DIGEST, DIR_SPOOL_SERVER_BY_FP,
DIR_SPOOL_EXTRA_BY_DIGEST, DIR_SPOOL_EXTRA_BY_FP,
DIR_SPOOL_NETWORKSTATUS,
DIR_SPOOL_MICRODESC, /* NOTE: if we add another entry, add another bit. */
} dir_spool_source_t;
#define dir_spool_source_bitfield_t ENUM_BF(dir_spool_source_t)
/** Subtype of connection_t for an "directory connection" -- that is, an HTTP /** Subtype of connection_t for an "directory connection" -- that is, an HTTP
* connection to retrieve or serve directory material. */ * connection to retrieve or serve directory material. */
typedef struct dir_connection_t { typedef struct dir_connection_t {
@ -1754,21 +1746,13 @@ typedef struct dir_connection_t {
char *requested_resource; char *requested_resource;
unsigned int dirconn_direct:1; /**< Is this dirconn direct, or via Tor? */ unsigned int dirconn_direct:1; /**< Is this dirconn direct, or via Tor? */
/* Used only for server sides of some dir connections, to implement
* "spooling" of directory material to the outbuf. Otherwise, we'd have
* to append everything to the outbuf in one enormous chunk. */
/** What exactly are we spooling right now? */
dir_spool_source_bitfield_t dir_spool_src : 3;
/** If we're fetching descriptors, what router purpose shall we assign /** If we're fetching descriptors, what router purpose shall we assign
* to them? */ * to them? */
uint8_t router_purpose; uint8_t router_purpose;
/** List of fingerprints for networkstatuses or descriptors to be spooled. */
smartlist_t *fingerprint_stack; /** List of spooled_resource_t for objects that we're spooling. We use
/** A cached_dir_t object that we're currently spooling out */ * it from back to front. */
struct cached_dir_t *cached_dir; smartlist_t *spool;
/** The current offset into cached_dir. */
off_t cached_dir_offset;
/** The zlib object doing on-the-fly compression for spooled data. */ /** The zlib object doing on-the-fly compression for spooled data. */
tor_zlib_state_t *zlib_state; tor_zlib_state_t *zlib_state;

View File

@ -743,7 +743,7 @@ test_dir_handle_get_server_descriptors_not_found(void* data)
NULL, NULL, 1, 0); NULL, NULL, 1, 0);
tt_str_op(NOT_FOUND, OP_EQ, header); tt_str_op(NOT_FOUND, OP_EQ, header);
tt_int_op(conn->dir_spool_src, OP_EQ, DIR_SPOOL_SERVER_BY_FP); tt_ptr_op(conn->spool, OP_EQ, NULL);
done: done:
UNMOCK(connection_write_to_buf_impl_); UNMOCK(connection_write_to_buf_impl_);
@ -773,6 +773,7 @@ test_dir_handle_get_server_descriptors_all(void* data)
tt_int_op(smartlist_len(our_routerlist->routers), OP_GE, 1); tt_int_op(smartlist_len(our_routerlist->routers), OP_GE, 1);
mock_routerinfo = smartlist_get(our_routerlist->routers, 0); mock_routerinfo = smartlist_get(our_routerlist->routers, 0);
set_server_identity_key(mock_routerinfo->identity_pkey); set_server_identity_key(mock_routerinfo->identity_pkey);
mock_routerinfo->cache_info.published_on = time(NULL);
/* Treat "all" requests as if they were unencrypted */ /* Treat "all" requests as if they were unencrypted */
mock_routerinfo->cache_info.send_unencrypted = 1; mock_routerinfo->cache_info.send_unencrypted = 1;
@ -787,7 +788,7 @@ test_dir_handle_get_server_descriptors_all(void* data)
//which is smaller than that by annotation_len bytes //which is smaller than that by annotation_len bytes
fetch_from_buf_http(TO_CONN(conn)->outbuf, &header, MAX_HEADERS_SIZE, fetch_from_buf_http(TO_CONN(conn)->outbuf, &header, MAX_HEADERS_SIZE,
&body, &body_used, &body, &body_used,
mock_routerinfo->cache_info.signed_descriptor_len+1, 0); 1024*1024, 0);
tt_assert(header); tt_assert(header);
tt_assert(body); tt_assert(body);
@ -803,7 +804,7 @@ test_dir_handle_get_server_descriptors_all(void* data)
tt_str_op(body, OP_EQ, mock_routerinfo->cache_info.signed_descriptor_body + tt_str_op(body, OP_EQ, mock_routerinfo->cache_info.signed_descriptor_body +
mock_routerinfo->cache_info.annotations_len); mock_routerinfo->cache_info.annotations_len);
tt_int_op(conn->dir_spool_src, OP_EQ, DIR_SPOOL_NONE); tt_ptr_op(conn->spool, OP_EQ, NULL);
done: done:
NS_UNMOCK(router_get_my_routerinfo); NS_UNMOCK(router_get_my_routerinfo);
@ -882,6 +883,7 @@ test_dir_handle_get_server_descriptors_authority(void* data)
mock_routerinfo->cache_info.signed_descriptor_len = mock_routerinfo->cache_info.signed_descriptor_len =
strlen(TEST_DESCRIPTOR) - annotation_len;; strlen(TEST_DESCRIPTOR) - annotation_len;;
mock_routerinfo->cache_info.annotations_len = annotation_len; mock_routerinfo->cache_info.annotations_len = annotation_len;
mock_routerinfo->cache_info.published_on = time(NULL);
conn = new_dir_conn(); conn = new_dir_conn();
@ -904,7 +906,7 @@ test_dir_handle_get_server_descriptors_authority(void* data)
tt_int_op(body_used, OP_EQ, strlen(body)); tt_int_op(body_used, OP_EQ, strlen(body));
tt_str_op(body, OP_EQ, TEST_DESCRIPTOR + annotation_len); tt_str_op(body, OP_EQ, TEST_DESCRIPTOR + annotation_len);
tt_int_op(conn->dir_spool_src, OP_EQ, DIR_SPOOL_NONE); tt_ptr_op(conn->spool, OP_EQ, NULL);
done: done:
NS_UNMOCK(router_get_my_routerinfo); NS_UNMOCK(router_get_my_routerinfo);
@ -946,6 +948,7 @@ test_dir_handle_get_server_descriptors_fp(void* data)
mock_routerinfo->cache_info.signed_descriptor_len = mock_routerinfo->cache_info.signed_descriptor_len =
strlen(TEST_DESCRIPTOR) - annotation_len; strlen(TEST_DESCRIPTOR) - annotation_len;
mock_routerinfo->cache_info.annotations_len = annotation_len; mock_routerinfo->cache_info.annotations_len = annotation_len;
mock_routerinfo->cache_info.published_on = time(NULL);
conn = new_dir_conn(); conn = new_dir_conn();
@ -975,7 +978,7 @@ test_dir_handle_get_server_descriptors_fp(void* data)
tt_int_op(body_used, OP_EQ, strlen(body)); tt_int_op(body_used, OP_EQ, strlen(body));
tt_str_op(body, OP_EQ, TEST_DESCRIPTOR + annotation_len); tt_str_op(body, OP_EQ, TEST_DESCRIPTOR + annotation_len);
tt_int_op(conn->dir_spool_src, OP_EQ, DIR_SPOOL_NONE); tt_ptr_op(conn->spool, OP_EQ, NULL);
done: done:
NS_UNMOCK(router_get_my_routerinfo); NS_UNMOCK(router_get_my_routerinfo);
@ -1041,7 +1044,7 @@ test_dir_handle_get_server_descriptors_d(void* data)
tt_str_op(body, OP_EQ, router->cache_info.signed_descriptor_body + tt_str_op(body, OP_EQ, router->cache_info.signed_descriptor_body +
router->cache_info.annotations_len); router->cache_info.annotations_len);
tt_int_op(conn->dir_spool_src, OP_EQ, DIR_SPOOL_NONE); tt_ptr_op(conn->spool, OP_EQ, NULL);
done: done:
UNMOCK(connection_write_to_buf_impl_); UNMOCK(connection_write_to_buf_impl_);
@ -1096,7 +1099,7 @@ test_dir_handle_get_server_descriptors_busy(void* data)
tt_assert(header); tt_assert(header);
tt_str_op(SERVER_BUSY, OP_EQ, header); tt_str_op(SERVER_BUSY, OP_EQ, header);
tt_int_op(conn->dir_spool_src, OP_EQ, DIR_SPOOL_NONE); tt_ptr_op(conn->spool, OP_EQ, NULL);
done: done:
UNMOCK(get_options); UNMOCK(get_options);