Prop#329 streams: Handle stream usage with conflux

This adds utility functions to help stream block decisions, as well as cpath
layer_hint checks for stream cell acceptance, and syncing stream lists
for conflux circuits.

These functions are then called throughout the codebase to properly manage
conflux streams.
This commit is contained in:
Mike Perry 2023-04-02 21:06:20 +00:00
parent 21c861bfa3
commit 2f865b4bba
14 changed files with 596 additions and 107 deletions

View file

@ -2368,6 +2368,7 @@ circuit_about_to_free(circuit_t *circ)
if (! CIRCUIT_IS_ORIGIN(circ)) {
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
edge_connection_t *conn;
for (conn=or_circ->n_streams; conn; conn=conn->next_stream)
connection_edge_destroy(or_circ->p_circ_id, conn);
or_circ->n_streams = NULL;

View file

@ -78,6 +78,8 @@
#include "core/crypto/relay_crypto.h"
#include "feature/nodelist/nodelist.h"
#include "src/core/or/conflux_util.h"
#include "app/config/config.h"
static inline circpad_circuit_state_t circpad_circuit_state(
@ -251,8 +253,11 @@ circpad_marked_circuit_for_padding(circuit_t *circ, int reason)
* has shut down, but using the MaxCircuitDirtiness timer instead of
* the idle circuit timer (again, we want this because we're not
* supposed to look idle to Guard nodes that can see our lifespan). */
if (!circ->timestamp_dirty)
if (!circ->timestamp_dirty) {
circ->timestamp_dirty = approx_time();
if (circ->conflux && CIRCUIT_IS_ORIGIN(circ))
conflux_sync_circ_fields(circ->conflux, TO_ORIGIN_CIRCUIT(circ));
}
/* Take ownership of the circuit */
circuit_change_purpose(circ, CIRCUIT_PURPOSE_C_CIRCUIT_PADDING);

View file

@ -64,6 +64,7 @@
#include "lib/time/tvdiff.h"
#include "lib/trace/events.h"
#include "src/core/mainloop/mainloop.h"
#include "core/or/conflux.h"
#include "core/or/cpath_build_state_st.h"
#include "feature/dircommon/dir_connection_st.h"
@ -700,7 +701,6 @@ circuit_expire_building(void)
} else { /* circuit not open, consider recording failure as timeout */
int first_hop_succeeded = TO_ORIGIN_CIRCUIT(victim)->cpath &&
TO_ORIGIN_CIRCUIT(victim)->cpath->state == CPATH_STATE_OPEN;
if (TO_ORIGIN_CIRCUIT(victim)->p_streams != NULL) {
log_warn(LD_BUG, "Circuit %d (purpose %d, %s) has timed out, "
"yet has attached streams!",
@ -1351,6 +1351,7 @@ circuit_detach_stream(circuit_t *circ, edge_connection_t *conn)
int removed = 0;
if (conn == origin_circ->p_streams) {
origin_circ->p_streams = conn->next_stream;
conflux_update_p_streams(origin_circ, conn->next_stream);
removed = 1;
} else {
for (prevconn = origin_circ->p_streams;
@ -1383,10 +1384,12 @@ circuit_detach_stream(circuit_t *circ, edge_connection_t *conn)
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
if (conn == or_circ->n_streams) {
or_circ->n_streams = conn->next_stream;
conflux_update_n_streams(or_circ, conn->next_stream);
return;
}
if (conn == or_circ->resolving_streams) {
or_circ->resolving_streams = conn->next_stream;
conflux_update_resolving_streams(or_circ, conn->next_stream);
return;
}
@ -2556,7 +2559,13 @@ circuit_get_open_circ_or_launch(entry_connection_t *conn,
}
/** Return true iff <b>crypt_path</b> is one of the crypt_paths for
* <b>circ</b>. */
* <b>circ</b>.
*
* WARNING: This function only validates that the cpath is on the *current*
* circuit, for internal consistency checking. For codepaths involving streams,
* or cpaths or layer_hints that could be from a different circuit due to
* conflux, use edge_uses_cpath() or conflux_validate_source_hop() instead.
*/
static int
cpath_is_on_circuit(origin_circuit_t *circ, crypt_path_t *crypt_path)
{
@ -2594,6 +2603,7 @@ link_apconn_to_circ(entry_connection_t *apconn, origin_circuit_t *circ,
ENTRY_TO_EDGE_CONN(apconn)->on_circuit = TO_CIRCUIT(circ);
/* assert_connection_ok(conn, time(NULL)); */
circ->p_streams = ENTRY_TO_EDGE_CONN(apconn);
conflux_update_p_streams(circ, ENTRY_TO_EDGE_CONN(apconn));
if (connection_edge_is_rendezvous_stream(ENTRY_TO_EDGE_CONN(apconn))) {
/* We are attaching a stream to a rendezvous circuit. That means
@ -2733,6 +2743,9 @@ connection_ap_handshake_attach_chosen_circuit(entry_connection_t *conn,
/* When stream isolation is in use and controlled by an application
* we are willing to keep using the stream. */
circ->base_.timestamp_dirty = approx_time();
if (TO_CIRCUIT(circ)->conflux) {
conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ);
}
}
pathbias_count_use_attempt(circ);
@ -3103,6 +3116,10 @@ mark_circuit_unusable_for_new_conns(origin_circuit_t *circ)
circ->base_.timestamp_dirty -= options->MaxCircuitDirtiness;
circ->unusable_for_new_conns = 1;
if (TO_CIRCUIT(circ)->conflux) {
conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ);
}
}
/**

393
src/core/or/conflux_util.c Normal file
View file

@ -0,0 +1,393 @@
/* Copyright (c) 2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file conflux_util.c
* \brief Conflux utility functions for stream blocking and management.
*/
#define TOR_CONFLUX_PRIVATE
#include "core/or/or.h"
#include "core/or/circuit_st.h"
#include "core/or/sendme.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_st.h"
#include "core/or/circuitlist.h"
#include "core/or/origin_circuit_st.h"
#include "core/or/or_circuit_st.h"
#include "core/or/conflux.h"
#include "core/or/conflux_params.h"
#include "core/or/conflux_util.h"
#include "core/or/conflux_st.h"
#include "lib/time/compat_time.h"
#include "app/config/config.h"
/**
* This is a utility function that returns the package window circuit,
* regardless of if it has a conflux pair or not.
*/
int
circuit_get_package_window(circuit_t *circ,
const crypt_path_t *cpath)
{
if (circ->conflux) {
if (CIRCUIT_IS_ORIGIN(circ)) {
tor_assert_nonfatal(circ->purpose ==
CIRCUIT_PURPOSE_CONFLUX_LINKED);
}
circ = conflux_decide_next_circ(circ->conflux);
/* If conflux has no circuit to send on, the package window is 0. */
if (!circ) {
return 0;
}
/* If we are the origin, we need to get the last hop's cpath for
* congestion control information. */
if (CIRCUIT_IS_ORIGIN(circ)) {
cpath = CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev;
} else {
if (BUG(cpath != NULL)) {
log_warn(LD_BUG, "cpath is not NULL for non-origin circuit");
}
}
}
return congestion_control_get_package_window(circ, cpath);
}
/**
* Returns true if conflux can send a data cell.
*
* Used to decide if we should block streams or not, for
* proccess_sendme_cell(), circuit_resume_edge_reading(),
* circuit_consider_stop_edge_reading(), circuit_resume_edge_reading_helper(),
* channel_flush_from_first_active_circuit()
*/
bool
conflux_can_send(conflux_t *cfx)
{
const circuit_t *send_circ = conflux_decide_next_circ(cfx);
/* If we have a circuit, we can send */
if (send_circ) {
return true;
} else {
return false;
}
}
/**
* For a given conflux circuit, return the cpath of the destination.
*
* The cpath destination is the last hop of the circuit, or NULL if
* the circuit is a non-origin circuit.
*/
crypt_path_t *
conflux_get_destination_hop(circuit_t *circ)
{
if (BUG(!circ)) {
log_warn(LD_BUG, "No circuit to send on for conflux");
return NULL;
} else {
/* Conflux circuits always send multiplexed relay commands to
* to the last hop. (Non-multiplexed commands go on their
* original circuit and hop). */
if (CIRCUIT_IS_ORIGIN(circ)) {
return TO_ORIGIN_CIRCUIT(circ)->cpath->prev;
} else {
return NULL;
}
}
}
/**
* Validates that the source of a cell is from the last hop of the circuit
* for origin circuits, and that there are no further hops for non-origin
* circuits.
*/
bool
conflux_validate_source_hop(circuit_t *in_circ,
crypt_path_t *layer_hint)
{
crypt_path_t *dest = conflux_get_destination_hop(in_circ);
if (dest != layer_hint) {
log_warn(LD_CIRC, "Got conflux command from incorrect hop");
return false;
}
if (layer_hint == NULL) {
/* We should not have further hops attached to this circuit */
if (in_circ->n_chan) {
log_warn(LD_BUG, "Got conflux command on circuit with further hops");
return false;
}
}
return true;
}
/**
* Returns true if the edge connection uses the given cpath.
*
* If there is a conflux object, we inspect all the last hops of the conflux
* circuits.
*/
bool
edge_uses_cpath(const edge_connection_t *conn,
const crypt_path_t *cpath)
{
if (!conn->on_circuit)
return false;
if (CIRCUIT_IS_ORIGIN(conn->on_circuit)) {
if (conn->on_circuit->conflux) {
tor_assert_nonfatal(conn->on_circuit->purpose ==
CIRCUIT_PURPOSE_CONFLUX_LINKED);
/* If the circuit is an origin circuit with a conflux object, the cpath
* is valid if it came from any of the conflux circuit's last hops. */
CONFLUX_FOR_EACH_LEG_BEGIN(conn->on_circuit->conflux, leg) {
const origin_circuit_t *ocirc = CONST_TO_ORIGIN_CIRCUIT(leg->circ);
if (ocirc->cpath->prev == cpath) {
return true;
}
} CONFLUX_FOR_EACH_LEG_END(leg);
} else {
return cpath == conn->cpath_layer;
}
} else {
/* For non-origin circuits, cpath should be null */
return cpath == NULL;
}
return false;
}
/**
* Returns the max RTT for the circuit that carries this stream,
* as observed by congestion control. For conflux circuits,
* we return the max RTT across all circuits.
*/
uint64_t
edge_get_max_rtt(const edge_connection_t *stream)
{
if (!stream->on_circuit)
return 0;
if (stream->on_circuit->conflux) {
tor_assert_nonfatal(stream->on_circuit->purpose ==
CIRCUIT_PURPOSE_CONFLUX_LINKED);
/* Find the max rtt from the ccontrol object of each circuit. */
uint64_t max_rtt = 0;
CONFLUX_FOR_EACH_LEG_BEGIN(stream->on_circuit->conflux, leg) {
const congestion_control_t *cc = circuit_ccontrol(leg->circ);
if (cc->max_rtt_usec > max_rtt) {
max_rtt = cc->max_rtt_usec;
}
} CONFLUX_FOR_EACH_LEG_END(leg);
return max_rtt;
} else {
if (stream->on_circuit && stream->on_circuit->ccontrol)
return stream->on_circuit->ccontrol->max_rtt_usec;
else if (stream->cpath_layer && stream->cpath_layer->ccontrol)
return stream->cpath_layer->ccontrol->max_rtt_usec;
}
return 0;
}
/**
* Return true iff our decryption layer_hint is from the last hop
* in a circuit.
*/
bool
relay_crypt_from_last_hop(const origin_circuit_t *circ,
const crypt_path_t *layer_hint)
{
tor_assert(circ);
tor_assert(layer_hint);
tor_assert(circ->cpath);
if (TO_CIRCUIT(circ)->conflux) {
tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose ==
CIRCUIT_PURPOSE_CONFLUX_LINKED);
/* If we are a conflux circuit, we need to check if the layer_hint
* is from the last hop of any of the conflux circuits. */
CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
const origin_circuit_t *ocirc = CONST_TO_ORIGIN_CIRCUIT(leg->circ);
if (layer_hint == ocirc->cpath->prev) {
return true;
}
} CONFLUX_FOR_EACH_LEG_END(leg);
log_fn(LOG_PROTOCOL_WARN, LD_CIRC,
"Got unexpected relay data from intermediate hop");
return false;
} else {
if (layer_hint != circ->cpath->prev) {
log_fn(LOG_PROTOCOL_WARN, LD_CIRC,
"Got unexpected relay data from intermediate hop");
return false;
}
return true;
}
}
/**
* Update the head of the n_streams list on all circuits in the conflux
* set.
*/
void
conflux_update_p_streams(origin_circuit_t *circ, edge_connection_t *stream)
{
tor_assert(circ);
if (TO_CIRCUIT(circ)->conflux) {
tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose ==
CIRCUIT_PURPOSE_CONFLUX_LINKED);
CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
TO_ORIGIN_CIRCUIT(leg->circ)->p_streams = stream;
} CONFLUX_FOR_EACH_LEG_END(leg);
}
}
/**
* Sync the next_stream_id, timestamp_dirty, and circuit_idle_timeout
* fields of a conflux set to the values in a particular circuit.
*
* This is called upon link, and whenever one of these fields
* changes on ref_circ. The ref_circ values are copied to all
* other circuits in the conflux set.
*/
void
conflux_sync_circ_fields(conflux_t *cfx, origin_circuit_t *ref_circ)
{
tor_assert(cfx);
tor_assert(ref_circ);
CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
if (leg->circ == TO_CIRCUIT(ref_circ)) {
continue;
}
origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(leg->circ);
ocirc->next_stream_id = ref_circ->next_stream_id;
leg->circ->timestamp_dirty = TO_CIRCUIT(ref_circ)->timestamp_dirty;
ocirc->circuit_idle_timeout = ref_circ->circuit_idle_timeout;
ocirc->unusable_for_new_conns = ref_circ->unusable_for_new_conns;
} CONFLUX_FOR_EACH_LEG_END(leg);
}
/**
* Update the head of the n_streams list on all circuits in the conflux
* set.
*/
void
conflux_update_n_streams(or_circuit_t *circ, edge_connection_t *stream)
{
tor_assert(circ);
if (TO_CIRCUIT(circ)->conflux) {
CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
TO_OR_CIRCUIT(leg->circ)->n_streams = stream;
} CONFLUX_FOR_EACH_LEG_END(leg);
}
}
/**
* Update the head of the resolving_streams list on all circuits in the conflux
* set.
*/
void
conflux_update_resolving_streams(or_circuit_t *circ, edge_connection_t *stream)
{
tor_assert(circ);
if (TO_CIRCUIT(circ)->conflux) {
CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
TO_OR_CIRCUIT(leg->circ)->resolving_streams = stream;
} CONFLUX_FOR_EACH_LEG_END(leg);
}
}
/**
* Update the half_streams list on all circuits in the conflux
*/
void
conflux_update_half_streams(origin_circuit_t *circ, smartlist_t *half_streams)
{
tor_assert(circ);
if (TO_CIRCUIT(circ)->conflux) {
tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose ==
CIRCUIT_PURPOSE_CONFLUX_LINKED);
CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
TO_ORIGIN_CIRCUIT(leg->circ)->half_streams = half_streams;
} CONFLUX_FOR_EACH_LEG_END(leg);
}
}
/**
* Helper function that emits non-fatal asserts if the stream lists
* or next_stream_id is out of sync between any of the conflux legs.
*/
void
conflux_validate_stream_lists(const conflux_t *cfx)
{
const conflux_leg_t *first_leg = smartlist_get(cfx->legs, 0);
tor_assert(first_leg);
/* Compare the stream lists of the first leg to all other legs. */
if (CIRCUIT_IS_ORIGIN(first_leg->circ)) {
const origin_circuit_t *f_circ =
CONST_TO_ORIGIN_CIRCUIT(first_leg->circ);
CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
const origin_circuit_t *l_circ = CONST_TO_ORIGIN_CIRCUIT(leg->circ);
tor_assert_nonfatal(l_circ->p_streams == f_circ->p_streams);
tor_assert_nonfatal(l_circ->half_streams == f_circ->half_streams);
tor_assert_nonfatal(l_circ->next_stream_id == f_circ->next_stream_id);
} CONFLUX_FOR_EACH_LEG_END(leg);
} else {
const or_circuit_t *f_circ = CONST_TO_OR_CIRCUIT(first_leg->circ);
CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
const or_circuit_t *l_circ = CONST_TO_OR_CIRCUIT(leg->circ);
tor_assert_nonfatal(l_circ->n_streams == f_circ->n_streams);
tor_assert_nonfatal(l_circ->resolving_streams ==
f_circ->resolving_streams);
} CONFLUX_FOR_EACH_LEG_END(leg);
}
}
/**
* Validate the conflux set has two legs, and both circuits have
* no nonce, and for origin circuits, the purpose is CONFLUX_PURPOSE_LINKED.
*/
void
conflux_validate_legs(const conflux_t *cfx)
{
tor_assert(cfx);
// TODO-329-UDP: Eventually we want to allow three legs for the
// exit case, to allow reconnection of legs to hit an RTT target.
// For now, this validation helps find bugs.
if (BUG(smartlist_len(cfx->legs) > conflux_params_get_num_legs_set())) {
log_warn(LD_BUG, "Number of legs is above maximum of %d allowed: %d\n",
conflux_params_get_num_legs_set(), smartlist_len(cfx->legs));
}
CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
/* Ensure we have no pending nonce on the circ */
tor_assert_nonfatal(leg->circ->conflux_pending_nonce == NULL);
tor_assert_nonfatal(leg->circ->conflux != NULL);
if (CIRCUIT_IS_ORIGIN(leg->circ)) {
tor_assert_nonfatal(leg->circ->purpose ==
CIRCUIT_PURPOSE_CONFLUX_LINKED);
}
} CONFLUX_FOR_EACH_LEG_END(leg);
}

View file

@ -0,0 +1,59 @@
/* Copyright (c) 2023, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
* \file conflux_util.h
* \brief Header file for conflux_util.c.
**/
#ifndef TOR_CONFLUX_UTIL_H
#define TOR_CONFLUX_UTIL_H
/* Forward decls */
typedef struct edge_connection_t edge_connection_t;
typedef struct crypt_path_t crypt_path_t;
typedef struct origin_circuit_t origin_circuit_t;
typedef struct conflux_t conflux_t;
/* True iff the given circuit_t circ is conflux related. */
static inline bool
CIRCUIT_IS_CONFLUX(const circuit_t *circ)
{
if (circ->conflux_pending_nonce) {
if (CIRCUIT_IS_ORIGIN(circ))
tor_assert_nonfatal(circ->purpose == CIRCUIT_PURPOSE_CONFLUX_UNLINKED);
return true;
} else if (circ->conflux) {
if (CIRCUIT_IS_ORIGIN(circ))
tor_assert_nonfatal(circ->purpose == CIRCUIT_PURPOSE_CONFLUX_LINKED);
return true;
} else {
tor_assert_nonfatal(circ->purpose != CIRCUIT_PURPOSE_CONFLUX_LINKED);
tor_assert_nonfatal(circ->purpose != CIRCUIT_PURPOSE_CONFLUX_UNLINKED);
return false;
}
}
int circuit_get_package_window(circuit_t *circ,
const crypt_path_t *cpath);
bool conflux_can_send(conflux_t *cfx);
bool edge_uses_cpath(const edge_connection_t *conn,
const crypt_path_t *cpath);
crypt_path_t *conflux_get_destination_hop(circuit_t *circ);
bool conflux_validate_source_hop(circuit_t *in_circ,
crypt_path_t *layer_hint);
uint64_t edge_get_max_rtt(const edge_connection_t *stream);
bool relay_crypt_from_last_hop(const origin_circuit_t *circ,
const crypt_path_t *layer_hint);
void conflux_update_p_streams(origin_circuit_t *, edge_connection_t *);
void conflux_update_half_streams(origin_circuit_t *, smartlist_t *);
void conflux_update_n_streams(or_circuit_t *, edge_connection_t *);
void conflux_update_resolving_streams(or_circuit_t *, edge_connection_t *);
void conflux_sync_circ_fields(conflux_t *cfx, origin_circuit_t *ref_circ);
void conflux_validate_stream_lists(const conflux_t *cfx);
void conflux_validate_legs(const conflux_t *cfx);
#endif /* TOR_CONFLUX_UTIL_H */

View file

@ -24,6 +24,7 @@
#include "core/or/congestion_control_westwood.h"
#include "core/or/congestion_control_st.h"
#include "core/or/conflux.h"
#include "core/or/conflux_util.h"
#include "core/or/trace_probes_cc.h"
#include "lib/time/compat_time.h"
#include "feature/nodelist/networkstatus.h"
@ -703,7 +704,7 @@ circuit_has_active_streams(const circuit_t *circ,
if (conn->base_.marked_for_close)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
if (edge_uses_cpath(conn, layer_hint)) {
if (connection_get_inbuf_len(TO_CONN(conn)) > 0) {
log_info(LD_CIRC, "CC: More in edge inbuf...");
return 1;

View file

@ -28,6 +28,7 @@
#include "core/or/connection_st.h"
#include "core/or/cell_st.h"
#include "app/config/config.h"
#include "core/or/conflux_util.h"
/** Cache consensus parameters */
static uint32_t xoff_client;
@ -60,27 +61,6 @@ double cc_stats_flow_xon_outbuf_ma = 0;
#define ONE_MEGABYTE (UINT64_C(1) << 20)
#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
/**
* Return the congestion control object of the given edge connection.
*
* Returns NULL if the edge connection doesn't have a cpath_layer or not
* attached to a circuit. But also if the cpath_layer or circuit doesn't have a
* congestion control object.
*/
static inline const congestion_control_t *
edge_get_ccontrol(const edge_connection_t *edge)
{
congestion_control_t *ccontrol = NULL;
if (edge->on_circuit && edge->on_circuit->ccontrol) {
ccontrol = edge->on_circuit->ccontrol;
} else if (edge->cpath_layer && edge->cpath_layer->ccontrol) {
ccontrol = edge->cpath_layer->ccontrol;
}
return ccontrol;
}
/**
* Update global congestion control related consensus parameter values, every
* consensus update.
@ -265,13 +245,13 @@ circuit_process_stream_xoff(edge_connection_t *conn,
}
/* Make sure this XOFF came from the right hop */
if (layer_hint && layer_hint != conn->cpath_layer) {
if (!edge_uses_cpath(conn, layer_hint)) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got XOFF from wrong hop.");
return false;
}
if (edge_get_ccontrol(conn) == NULL) {
if (!edge_uses_flow_control(conn)) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got XOFF for non-congestion control circuit");
return false;
@ -359,13 +339,13 @@ circuit_process_stream_xon(edge_connection_t *conn,
}
/* Make sure this XON came from the right hop */
if (layer_hint && layer_hint != conn->cpath_layer) {
if (!edge_uses_cpath(conn, layer_hint)) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got XON from wrong hop.");
return false;
}
if (edge_get_ccontrol(conn) == NULL) {
if (!edge_uses_flow_control(conn)) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got XON for non-congestion control circuit");
return false;
@ -464,7 +444,7 @@ flow_control_decide_xoff(edge_connection_t *stream)
size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
uint32_t buffer_limit_xoff = 0;
if (BUG(edge_get_ccontrol(stream) == NULL)) {
if (BUG(!edge_uses_flow_control(stream))) {
log_err(LD_BUG, "Flow control called for non-congestion control circuit");
return -1;
}
@ -717,21 +697,6 @@ edge_uses_flow_control(const edge_connection_t *stream)
return ret;
}
/**
* Returns the max RTT for the circuit that carries this stream,
* as observed by congestion control.
*/
uint64_t
edge_get_max_rtt(const edge_connection_t *stream)
{
if (stream->on_circuit && stream->on_circuit->ccontrol)
return stream->on_circuit->ccontrol->max_rtt_usec;
else if (stream->cpath_layer && stream->cpath_layer->ccontrol)
return stream->cpath_layer->ccontrol->max_rtt_usec;
return 0;
}
/** Returns true if a connection is an edge conn that uses flow control */
bool
conn_uses_flow_control(connection_t *conn)

View file

@ -31,8 +31,6 @@ bool edge_uses_flow_control(const edge_connection_t *stream);
bool conn_uses_flow_control(connection_t *stream);
uint64_t edge_get_max_rtt(const edge_connection_t *);
/** Metricsport externs */
extern uint64_t cc_stats_flow_num_xoff_sent;
extern uint64_t cc_stats_flow_num_xon_sent;

View file

@ -70,6 +70,7 @@
#include "core/or/circuitpadding.h"
#include "core/or/connection_edge.h"
#include "core/or/congestion_control_flow.h"
#include "core/or/conflux_util.h"
#include "core/or/circuitstats.h"
#include "core/or/connection_or.h"
#include "core/or/extendinfo.h"
@ -628,6 +629,7 @@ connection_half_edge_add(const edge_connection_t *conn,
if (!circ->half_streams) {
circ->half_streams = smartlist_new();
conflux_update_half_streams(circ, circ->half_streams);
}
half_conn->stream_id = conn->stream_id;
@ -3102,6 +3104,10 @@ get_unique_stream_id_by_circ(origin_circuit_t *circ)
test_stream_id))
goto again;
if (TO_CIRCUIT(circ)->conflux) {
conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ);
}
return test_stream_id;
}

View file

@ -28,11 +28,15 @@ struct edge_connection_t {
* circuit? */
int deliver_window; /**< How many more relay cells can end at me? */
struct circuit_t *on_circuit; /**< The circuit (if any) that this edge
* connection is using. */
/** The circuit (if any) that this edge connection is using.
* Note that edges that use conflux should use the helpers
* in conflux_util.c instead of accessing this directly. */
struct circuit_t *on_circuit;
/** A pointer to which node in the circ this conn exits at. Set for AP
* connections and for hidden service exit connections. */
* connections and for hidden service exit connections.
* Note that edges that use conflux should use the helpers
* in conflux_util.c instead of accessing this directly. */
struct crypt_path_t *cpath_layer;
/* Hidden service connection identifier for edge connections. Used by the HS

View file

@ -35,10 +35,18 @@ struct or_circuit_t {
cell_queue_t p_chan_cells;
/** The channel that is previous in this circuit. */
channel_t *p_chan;
/** Linked list of Exit streams associated with this circuit. */
/** Linked list of Exit streams associated with this circuit.
*
* Note that any updates to this pointer must be followed with
* conflux_update_n_streams() to keep the other legs n_streams
* in sync. */
edge_connection_t *n_streams;
/** Linked list of Exit streams associated with this circuit that are
* still being resolved. */
* still being resolved.
*
* Just like with n_streams, any updates to this pointer must
* be followed with conflux_update_resolving_streams().
*/
edge_connection_t *resolving_streams;
/** Cryptographic state used for encrypting and authenticating relay

View file

@ -80,11 +80,18 @@ struct origin_circuit_t {
circuit_t base_;
/** Linked list of AP streams (or EXIT streams if hidden service)
* associated with this circuit. */
* associated with this circuit.
*
* Any updates to this pointer must be followed with
* conflux_update_p_streams(). */
edge_connection_t *p_streams;
/** Smartlist of half-closed streams (half_edge_t*) that still
* have pending activity */
* have pending activity.
*
* Any updates to this pointer must be followed with
* conflux_update_half_streams().
*/
smartlist_t *half_streams;
/** Bytes read on this circuit since last call to

View file

@ -122,7 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell,
edge_connection_t *conn,
crypt_path_t *layer_hint,
relay_header_t *rh);
static void set_block_state_for_streams(edge_connection_t *stream_list,
static void set_block_state_for_streams(circuit_t *circ,
edge_connection_t *stream_list,
int block, streamid_t stream_id);
/** Stats: how many relay cells have originated at this hop, or have
@ -455,7 +456,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell,
tmpconn=tmpconn->next_stream) {
if (rh.stream_id == tmpconn->stream_id &&
!tmpconn->base_.marked_for_close &&
tmpconn->cpath_layer == layer_hint) {
edge_uses_cpath(tmpconn, layer_hint)) {
log_debug(LD_APP,"found conn for stream %d.", rh.stream_id);
return tmpconn;
}
@ -1549,25 +1550,6 @@ connection_edge_process_relay_cell_not_open(
// return -1;
}
/**
* Return true iff our decryption layer_hint is from the last hop
* in a circuit.
*/
static bool
relay_crypt_from_last_hop(origin_circuit_t *circ, crypt_path_t *layer_hint)
{
tor_assert(circ);
tor_assert(layer_hint);
tor_assert(circ->cpath);
if (layer_hint != circ->cpath->prev) {
log_fn(LOG_PROTOCOL_WARN, LD_CIRC,
"Got unexpected relay data from intermediate hop");
return false;
}
return true;
}
/** Process a SENDME cell that arrived on <b>circ</b>. If it is a stream level
* cell, it is destined for the given <b>conn</b>. If it is a circuit level
* cell, it is destined for the <b>layer_hint</b>. The <b>domain</b> is the
@ -2454,6 +2436,15 @@ circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming");
return;
}
/* If we have a conflux negotiated, and it still can't send on
* any circuit, then do not resume sending. */
if (circ->conflux && !conflux_can_send(circ->conflux)) {
log_debug(layer_hint?LD_APP:LD_EXIT,
"Conflux can't send, not resuming edges");
return;
}
log_debug(layer_hint?LD_APP:LD_EXIT,"resuming");
if (CIRCUIT_IS_ORIGIN(circ))
@ -2487,20 +2478,6 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
return 0;
}
/* How many cells do we have space for? It will be the minimum of
* the number needed to exhaust the package window, and the minimum
* needed to fill the cell queue. */
max_to_package = congestion_control_get_package_window(circ, layer_hint);
if (CIRCUIT_IS_ORIGIN(circ)) {
cells_on_queue = circ->n_chan_cells.n;
} else {
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
cells_on_queue = or_circ->p_chan_cells.n;
}
if (cell_queue_highwatermark() - cells_on_queue < max_to_package)
max_to_package = cell_queue_highwatermark() - cells_on_queue;
/* Once we used to start listening on the streams in the order they
* appeared in the linked list. That leads to starvation on the
* streams that appeared later on the list, since the first streams
@ -2539,11 +2516,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
/* Activate reading starting from the chosen stream */
for (conn=chosen_stream; conn; conn = conn->next_stream) {
/* Start reading for the streams starting from here */
if (conn->base_.marked_for_close || conn->package_window <= 0 ||
conn->xoff_received)
if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
if (edge_uses_cpath(conn, layer_hint)) {
if (!conn->xoff_received) {
connection_start_reading(TO_CONN(conn));
}
if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
++n_packaging_streams;
@ -2551,11 +2530,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
}
/* Go back and do the ones we skipped, circular-style */
for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
if (conn->base_.marked_for_close || conn->package_window <= 0 ||
conn->xoff_received)
if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
connection_start_reading(TO_CONN(conn));
if (edge_uses_cpath(conn, layer_hint)) {
if (!conn->xoff_received) {
connection_start_reading(TO_CONN(conn));
}
if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
++n_packaging_streams;
@ -2567,6 +2548,32 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
again:
/* If we're using conflux, the circuit we decide to send on may change
* after we're sending. Get it again, and re-check package windows
* for it */
if (circ->conflux) {
if (circuit_consider_stop_edge_reading(circ, layer_hint))
return -1;
circ = conflux_decide_next_circ(circ->conflux);
/* Get the destination layer hint for this circuit */
layer_hint = conflux_get_destination_hop(circ);
}
/* How many cells do we have space for? It will be the minimum of
* the number needed to exhaust the package window, and the minimum
* needed to fill the cell queue. */
max_to_package = congestion_control_get_package_window(circ, layer_hint);
if (CIRCUIT_IS_ORIGIN(circ)) {
cells_on_queue = circ->n_chan_cells.n;
} else {
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
cells_on_queue = or_circ->p_chan_cells.n;
}
if (cell_queue_highwatermark() - cells_on_queue < max_to_package)
max_to_package = cell_queue_highwatermark() - cells_on_queue;
cells_per_conn = CEIL_DIV(max_to_package, n_packaging_streams);
packaged_this_round = 0;
@ -2580,7 +2587,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
for (conn=first_conn; conn; conn=conn->next_stream) {
if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
if (!layer_hint || conn->cpath_layer == layer_hint) {
if (edge_uses_cpath(conn, layer_hint)) {
int n = cells_per_conn, r;
/* handle whatever might still be on the inbuf */
r = connection_edge_package_raw_inbuf(conn, 1, &n);
@ -2638,7 +2645,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
log_debug(domain,"considering circ->package_window %d",
circ->package_window);
if (congestion_control_get_package_window(circ, layer_hint) <= 0) {
if (circuit_get_package_window(circ, layer_hint) <= 0) {
log_debug(domain,"yes, not-at-origin. stopped.");
for (conn = or_circ->n_streams; conn; conn=conn->next_stream)
connection_stop_reading(TO_CONN(conn));
@ -2649,11 +2656,11 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
/* else, layer hint is defined, use it */
log_debug(domain,"considering layer_hint->package_window %d",
layer_hint->package_window);
if (congestion_control_get_package_window(circ, layer_hint) <= 0) {
if (circuit_get_package_window(circ, layer_hint) <= 0) {
log_debug(domain,"yes, at-origin. stopped.");
for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn;
conn=conn->next_stream) {
if (conn->cpath_layer == layer_hint)
if (edge_uses_cpath(conn, layer_hint))
connection_stop_reading(TO_CONN(conn));
}
return 1;
@ -3029,7 +3036,7 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
edge = TO_OR_CIRCUIT(circ)->n_streams;
}
set_block_state_for_streams(edge, block, 0);
set_block_state_for_streams(circ, edge, block, 0);
}
/**
@ -3039,15 +3046,29 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
* in the stream list. If it is non-zero, only apply to that specific stream.
*/
static void
set_block_state_for_streams(edge_connection_t *stream_list, int block,
streamid_t stream_id)
set_block_state_for_streams(circuit_t *circ, edge_connection_t *stream_list,
int block, streamid_t stream_id)
{
/* If we have a conflux object, we need to examine its status before
* blocking and unblocking streams. */
if (circ->conflux) {
bool can_send = conflux_can_send(circ->conflux);
if (block && can_send) {
/* Don't actually block streams, since conflux can send*/
return;
} else if (!block && !can_send) {
/* Don't actually unblock streams, since conflux still can't send */
return;
}
}
for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) {
connection_t *conn = TO_CONN(edge);
if (stream_id && edge->stream_id != stream_id)
continue;
if (!conn->read_event) {
if (!conn->read_event || edge->xoff_received) {
/* This connection is a placeholder for something; probably a DNS
* request. It can't actually stop or start reading.*/
continue;
@ -3412,8 +3433,8 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
if (circ_blocked && fromstream) {
/* This edge connection is apparently not blocked; this can happen for
* new streams on a blocked circuit, for their CONNECTED response.
* block it now. */
set_block_state_for_streams(stream_list, 1, fromstream);
* block it now, unless we have conflux. */
set_block_state_for_streams(circ, stream_list, 1, fromstream);
}
update_circuit_on_cmux(circ, direction);

View file

@ -71,6 +71,7 @@
#include "core/or/edge_connection_st.h"
#include "core/or/or_circuit_st.h"
#include "core/or/conflux_util.h"
#include "ht.h"
@ -650,6 +651,7 @@ dns_resolve(edge_connection_t *exitconn)
* connected cell. */
exitconn->next_stream = oncirc->n_streams;
oncirc->n_streams = exitconn;
conflux_update_n_streams(oncirc, exitconn);
}
break;
case 0:
@ -658,6 +660,7 @@ dns_resolve(edge_connection_t *exitconn)
exitconn->base_.state = EXIT_CONN_STATE_RESOLVING;
exitconn->next_stream = oncirc->resolving_streams;
oncirc->resolving_streams = exitconn;
conflux_update_resolving_streams(oncirc, exitconn);
break;
case -2:
case -1:
@ -1234,6 +1237,7 @@ inform_pending_connections(cached_resolve_t *resolve)
pend->conn->next_stream = TO_OR_CIRCUIT(circ)->n_streams;
pend->conn->on_circuit = circ;
TO_OR_CIRCUIT(circ)->n_streams = pend->conn;
conflux_update_n_streams(TO_OR_CIRCUIT(circ), pend->conn);
connection_exit_connect(pend->conn);
} else {