Refactor the streaming compression code.

This patch refactors our streaming compression code to allow us to
extend it with non-zlib/non-gzip based compression schemas.

See https://bugs.torproject.org/21663
This commit is contained in:
Alexander Færøy 2017-04-17 14:57:37 +02:00
parent 44cb86adbe
commit 3c4459bcbf
No known key found for this signature in database
GPG key ID: E15081D5D3C3DB53
11 changed files with 93 additions and 87 deletions

View file

@ -399,9 +399,9 @@ detect_compression_method(const char *in, size_t in_len)
}
}
/** Internal state for an incremental zlib compression/decompression. The
* body of this struct is not exposed. */
struct tor_zlib_state_t {
/** Internal state for an incremental compression/decompression. The body of
* this struct is not exposed. */
struct tor_compress_state_t {
struct z_stream_s stream; /**< The zlib stream */
int compress; /**< True if we are compressing; false if we are inflating */
@ -414,14 +414,13 @@ struct tor_zlib_state_t {
size_t allocation;
};
/** Construct and return a tor_zlib_state_t object using <b>method</b>. If
* <b>compress</b>, it's for compression; otherwise it's for
* decompression. */
tor_zlib_state_t *
tor_zlib_new(int compress_, compress_method_t method,
compression_level_t compression_level)
/** Construct and return a tor_compress_state_t object using <b>method</b>. If
* <b>compress</b>, it's for compression; otherwise it's for decompression. */
tor_compress_state_t *
tor_compress_new(int compress_, compress_method_t method,
compression_level_t compression_level)
{
tor_zlib_state_t *out;
tor_compress_state_t *out;
int bits, memlevel;
if (! compress_) {
@ -430,7 +429,7 @@ tor_zlib_new(int compress_, compress_method_t method,
compression_level = HIGH_COMPRESSION;
}
out = tor_malloc_zero(sizeof(tor_zlib_state_t));
out = tor_malloc_zero(sizeof(tor_compress_state_t));
out->stream.zalloc = Z_NULL;
out->stream.zfree = Z_NULL;
out->stream.opaque = NULL;
@ -462,16 +461,17 @@ tor_zlib_new(int compress_, compress_method_t method,
* to *<b>out</b>, adjusting the values as we go. If <b>finish</b> is true,
* we've reached the end of the input.
*
* Return TOR_ZLIB_DONE if we've finished the entire compression/decompression.
* Return TOR_ZLIB_OK if we're processed everything from the input.
* Return TOR_ZLIB_BUF_FULL if we're out of space on <b>out</b>.
* Return TOR_ZLIB_ERR if the stream is corrupt.
* Return TOR_COMPRESS_DONE if we've finished the entire
* compression/decompression.
* Return TOR_COMPRESS_OK if we're processed everything from the input.
* Return TOR_COMPRESS_BUFFER_FULL if we're out of space on <b>out</b>.
* Return TOR_COMPRESS_ERROR if the stream is corrupt.
*/
tor_zlib_output_t
tor_zlib_process(tor_zlib_state_t *state,
char **out, size_t *out_len,
const char **in, size_t *in_len,
int finish)
tor_compress_output_t
tor_compress_process(tor_compress_state_t *state,
char **out, size_t *out_len,
const char **in, size_t *in_len,
int finish)
{
int err;
tor_assert(*in_len <= UINT_MAX);
@ -498,31 +498,31 @@ tor_zlib_process(tor_zlib_state_t *state,
if (! state->compress &&
is_compression_bomb(state->input_so_far, state->output_so_far)) {
log_warn(LD_DIR, "Possible zlib bomb; abandoning stream.");
return TOR_ZLIB_ERR;
return TOR_COMPRESS_ERROR;
}
switch (err)
{
case Z_STREAM_END:
return TOR_ZLIB_DONE;
return TOR_COMPRESS_DONE;
case Z_BUF_ERROR:
if (state->stream.avail_in == 0 && !finish)
return TOR_ZLIB_OK;
return TOR_ZLIB_BUF_FULL;
return TOR_COMPRESS_OK;
return TOR_COMPRESS_BUFFER_FULL;
case Z_OK:
if (state->stream.avail_out == 0 || finish)
return TOR_ZLIB_BUF_FULL;
return TOR_ZLIB_OK;
return TOR_COMPRESS_BUFFER_FULL;
return TOR_COMPRESS_OK;
default:
log_warn(LD_GENERAL, "Gzip returned an error: %s",
state->stream.msg ? state->stream.msg : "<no message>");
return TOR_ZLIB_ERR;
return TOR_COMPRESS_ERROR;
}
}
/** Deallocate <b>state</b>. */
void
tor_zlib_free(tor_zlib_state_t *state)
tor_compress_free(tor_compress_state_t *state)
{
if (!state)
return;
@ -553,7 +553,7 @@ tor_zlib_state_size_precalc(int inflate_, int windowbits, int memlevel)
that is, 32K for windowBits=15 (default value) plus a few kilobytes
for small objects."
*/
return sizeof(tor_zlib_state_t) + sizeof(struct z_stream_s) +
return sizeof(tor_compress_state_t) + sizeof(struct z_stream_s) +
(1 << 15) + A_FEW_KILOBYTES;
} else {
/* Also from zconf.h:
@ -562,7 +562,7 @@ tor_zlib_state_size_precalc(int inflate_, int windowbits, int memlevel)
(1 << (windowBits+2)) + (1 << (memLevel+9))
... plus a few kilobytes for small objects."
*/
return sizeof(tor_zlib_state_t) + sizeof(struct z_stream_s) +
return sizeof(tor_compress_state_t) + sizeof(struct z_stream_s) +
(1 << (windowbits + 2)) + (1 << (memlevel + 9)) + A_FEW_KILOBYTES;
}
#undef A_FEW_KILOBYTES
@ -570,7 +570,7 @@ tor_zlib_state_size_precalc(int inflate_, int windowbits, int memlevel)
/** Return the approximate number of bytes allocated for <b>state</b>. */
size_t
tor_zlib_state_size(const tor_zlib_state_t *state)
tor_compress_state_size(const tor_compress_state_t *state)
{
return state->allocation;
}

View file

@ -46,23 +46,27 @@ tor_zlib_get_header_version_str(void);
compress_method_t detect_compression_method(const char *in, size_t in_len);
/** Return values from tor_zlib_process; see that function's documentation for
* details. */
/** Return values from tor_compress_process; see that function's documentation
* for details. */
typedef enum {
TOR_ZLIB_OK, TOR_ZLIB_DONE, TOR_ZLIB_BUF_FULL, TOR_ZLIB_ERR
} tor_zlib_output_t;
TOR_COMPRESS_OK,
TOR_COMPRESS_DONE,
TOR_COMPRESS_BUFFER_FULL,
TOR_COMPRESS_ERROR
} tor_compress_output_t;
/** Internal state for an incremental zlib compression/decompression. */
typedef struct tor_zlib_state_t tor_zlib_state_t;
tor_zlib_state_t *tor_zlib_new(int compress, compress_method_t method,
compression_level_t level);
typedef struct tor_compress_state_t tor_compress_state_t;
tor_compress_state_t *tor_compress_new(int compress,
compress_method_t method,
compression_level_t level);
tor_zlib_output_t tor_zlib_process(tor_zlib_state_t *state,
char **out, size_t *out_len,
const char **in, size_t *in_len,
int finish);
void tor_zlib_free(tor_zlib_state_t *state);
tor_compress_output_t tor_compress_process(tor_compress_state_t *state,
char **out, size_t *out_len,
const char **in, size_t *in_len,
int finish);
void tor_compress_free(tor_compress_state_t *state);
size_t tor_zlib_state_size(const tor_zlib_state_t *state);
size_t tor_compress_state_size(const tor_compress_state_t *state);
size_t tor_zlib_get_total_allocation(void);
#endif

View file

@ -2088,11 +2088,11 @@ fetch_from_buf_line(buf_t *buf, char *data_out, size_t *data_len)
}
/** Compress on uncompress the <b>data_len</b> bytes in <b>data</b> using the
* zlib state <b>state</b>, appending the result to <b>buf</b>. If
* compression state <b>state</b>, appending the result to <b>buf</b>. If
* <b>done</b> is true, flush the data in the state and finish the
* compression/uncompression. Return -1 on failure, 0 on success. */
int
write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
write_to_buf_zlib(buf_t *buf, tor_compress_state_t *state,
const char *data, size_t data_len,
int done)
{
@ -2108,20 +2108,22 @@ write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
}
next = CHUNK_WRITE_PTR(buf->tail);
avail = old_avail = CHUNK_REMAINING_CAPACITY(buf->tail);
switch (tor_zlib_process(state, &next, &avail, &data, &data_len, done)) {
case TOR_ZLIB_DONE:
switch (tor_compress_process(state, &next, &avail,
&data, &data_len, done)) {
case TOR_COMPRESS_DONE:
over = 1;
break;
case TOR_ZLIB_ERR:
case TOR_COMPRESS_ERROR:
return -1;
case TOR_ZLIB_OK:
case TOR_COMPRESS_OK:
if (data_len == 0)
over = 1;
break;
case TOR_ZLIB_BUF_FULL:
case TOR_COMPRESS_BUFFER_FULL:
if (avail) {
/* Zlib says we need more room (ZLIB_BUF_FULL). Start a new chunk
* automatically, whether were going to or not. */
/* The compression module says we need more room
* (TOR_COMPRESS_BUFFER_FULL). Start a new chunk automatically,
* whether were going to or not. */
need_new_chunk = 1;
}
break;

View file

@ -36,7 +36,7 @@ int flush_buf(tor_socket_t s, buf_t *buf, size_t sz, size_t *buf_flushlen);
int flush_buf_tls(tor_tls_t *tls, buf_t *buf, size_t sz, size_t *buf_flushlen);
int write_to_buf(const char *string, size_t string_len, buf_t *buf);
int write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
int write_to_buf_zlib(buf_t *buf, tor_compress_state_t *state,
const char *data, size_t data_len, int done);
int move_buf_to_buf(buf_t *buf_out, buf_t *buf_in, size_t *buf_flushlen);
int fetch_from_buf(char *string, size_t string_len, buf_t *buf);

View file

@ -1991,8 +1991,8 @@ single_conn_free_bytes(connection_t *conn)
if (conn->type == CONN_TYPE_DIR) {
dir_connection_t *dir_conn = TO_DIR_CONN(conn);
if (dir_conn->zlib_state) {
result += tor_zlib_state_size(dir_conn->zlib_state);
tor_zlib_free(dir_conn->zlib_state);
result += tor_compress_state_size(dir_conn->zlib_state);
tor_compress_free(dir_conn->zlib_state);
dir_conn->zlib_state = NULL;
}
}

View file

@ -628,7 +628,7 @@ connection_free_(connection_t *conn)
dir_connection_t *dir_conn = TO_DIR_CONN(conn);
tor_free(dir_conn->requested_resource);
tor_zlib_free(dir_conn->zlib_state);
tor_compress_free(dir_conn->zlib_state);
if (dir_conn->spool) {
SMARTLIST_FOREACH(dir_conn->spool, spooled_resource_t *, spooled,
spooled_resource_free(spooled));

View file

@ -3178,7 +3178,7 @@ handle_get_current_consensus(dir_connection_t *conn,
write_http_response_header(conn, -1, compressed,
smartlist_len(conn->spool) == 1 ? lifetime : 0);
if (! compressed)
conn->zlib_state = tor_zlib_new(0, ZLIB_METHOD, HIGH_COMPRESSION);
conn->zlib_state = tor_compress_new(0, ZLIB_METHOD, HIGH_COMPRESSION);
/* Prime the connection with some data. */
const int initial_flush_result = connection_dirserv_flushed_some(conn);
@ -3276,8 +3276,8 @@ handle_get_status_vote(dir_connection_t *conn, const get_handler_args_t *args)
if (smartlist_len(items)) {
if (compressed) {
conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD,
choose_compression_level(estimated_len));
conn->zlib_state = tor_compress_new(1, ZLIB_METHOD,
choose_compression_level(estimated_len));
SMARTLIST_FOREACH(items, const char *, c,
connection_write_to_buf_zlib(c, strlen(c), conn, 0));
connection_write_to_buf_zlib("", 0, conn, 1);
@ -3335,7 +3335,7 @@ handle_get_microdesc(dir_connection_t *conn, const get_handler_args_t *args)
write_http_response_header(conn, -1, compressed, MICRODESC_CACHE_LIFETIME);
if (compressed)
conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD,
conn->zlib_state = tor_compress_new(1, ZLIB_METHOD,
choose_compression_level(size_guess));
const int initial_flush_result = connection_dirserv_flushed_some(conn);
@ -3428,7 +3428,7 @@ handle_get_descriptor(dir_connection_t *conn, const get_handler_args_t *args)
}
write_http_response_header(conn, -1, compressed, cache_lifetime);
if (compressed)
conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD,
conn->zlib_state = tor_compress_new(1, ZLIB_METHOD,
choose_compression_level(size_guess));
clear_spool = 0;
/* Prime the connection with some data. */
@ -3519,8 +3519,8 @@ handle_get_keys(dir_connection_t *conn, const get_handler_args_t *args)
write_http_response_header(conn, compressed?-1:len, compressed, 60*60);
if (compressed) {
conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD,
choose_compression_level(len));
conn->zlib_state = tor_compress_new(1, ZLIB_METHOD,
choose_compression_level(len));
SMARTLIST_FOREACH(certs, authority_cert_t *, c,
connection_write_to_buf_zlib(c->cache_info.signed_descriptor_body,
c->cache_info.signed_descriptor_len,

View file

@ -3792,7 +3792,7 @@ connection_dirserv_flushed_some(dir_connection_t *conn)
/* Flush the zlib state: there could be more bytes pending in there, and
* we don't want to omit bytes. */
connection_write_to_buf_zlib("", 0, conn, 1);
tor_zlib_free(conn->zlib_state);
tor_compress_free(conn->zlib_state);
conn->zlib_state = NULL;
}
return 0;

View file

@ -1773,8 +1773,8 @@ typedef struct dir_connection_t {
/** List of spooled_resource_t for objects that we're spooling. We use
* it from back to front. */
smartlist_t *spool;
/** The zlib object doing on-the-fly compression for spooled data. */
tor_zlib_state_t *zlib_state;
/** The compression object doing on-the-fly compression for spooled data. */
tor_compress_state_t *zlib_state;
/** What rendezvous service are we querying for? */
rend_data_t *rend_data;

View file

@ -584,12 +584,12 @@ test_buffers_zlib_impl(int finalize_with_nil)
char *contents = NULL;
char *expanded = NULL;
buf_t *buf = NULL;
tor_zlib_state_t *zlib_state = NULL;
tor_compress_state_t *zlib_state = NULL;
size_t out_len, in_len;
int done;
buf = buf_new_with_capacity(128); /* will round up */
zlib_state = tor_zlib_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
zlib_state = tor_compress_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
msg = tor_malloc(512);
crypto_rand(msg, 512);
@ -621,7 +621,7 @@ test_buffers_zlib_impl(int finalize_with_nil)
done:
buf_free(buf);
tor_zlib_free(zlib_state);
tor_compress_free(zlib_state);
tor_free(contents);
tor_free(expanded);
tor_free(msg);
@ -647,7 +647,7 @@ test_buffers_zlib_fin_at_chunk_end(void *arg)
char *contents = NULL;
char *expanded = NULL;
buf_t *buf = NULL;
tor_zlib_state_t *zlib_state = NULL;
tor_compress_state_t *zlib_state = NULL;
size_t out_len, in_len;
size_t sz, headerjunk;
(void) arg;
@ -666,7 +666,7 @@ test_buffers_zlib_fin_at_chunk_end(void *arg)
tt_uint_op(buf->head->datalen, OP_EQ, headerjunk);
tt_uint_op(buf_datalen(buf), OP_EQ, headerjunk);
/* Write an empty string, with finalization on. */
zlib_state = tor_zlib_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
zlib_state = tor_compress_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
tt_int_op(write_to_buf_zlib(buf, zlib_state, "", 0, 1), OP_EQ, 0);
in_len = buf_datalen(buf);
@ -687,7 +687,7 @@ test_buffers_zlib_fin_at_chunk_end(void *arg)
done:
buf_free(buf);
tor_zlib_free(zlib_state);
tor_compress_free(zlib_state);
tor_free(contents);
tor_free(expanded);
tor_free(msg);

View file

@ -2249,7 +2249,7 @@ test_util_gzip(void *arg)
char *buf1=NULL, *buf2=NULL, *buf3=NULL, *cp1, *cp2;
const char *ccp2;
size_t len1, len2;
tor_zlib_state_t *state = NULL;
tor_compress_state_t *state = NULL;
(void)arg;
buf1 = tor_strdup("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZAAAAAAAAAAAAAAAAAAAZ");
@ -2320,21 +2320,21 @@ test_util_gzip(void *arg)
tor_free(buf1);
tor_free(buf2);
tor_free(buf3);
state = tor_zlib_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
state = tor_compress_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
tt_assert(state);
cp1 = buf1 = tor_malloc(1024);
len1 = 1024;
ccp2 = "ABCDEFGHIJABCDEFGHIJ";
len2 = 21;
tt_assert(tor_zlib_process(state, &cp1, &len1, &ccp2, &len2, 0)
== TOR_ZLIB_OK);
tt_int_op(tor_compress_process(state, &cp1, &len1, &ccp2, &len2, 0),
OP_EQ, TOR_COMPRESS_OK);
tt_int_op(0, OP_EQ, len2); /* Make sure we compressed it all. */
tt_assert(cp1 > buf1);
len2 = 0;
cp2 = cp1;
tt_assert(tor_zlib_process(state, &cp1, &len1, &ccp2, &len2, 1)
== TOR_ZLIB_DONE);
tt_int_op(tor_compress_process(state, &cp1, &len1, &ccp2, &len2, 1),
OP_EQ, TOR_COMPRESS_DONE);
tt_int_op(0, OP_EQ, len2);
tt_assert(cp1 > cp2); /* Make sure we really added something. */
@ -2346,7 +2346,7 @@ test_util_gzip(void *arg)
done:
if (state)
tor_zlib_free(state);
tor_compress_free(state);
tor_free(buf2);
tor_free(buf3);
tor_free(buf1);
@ -2364,7 +2364,7 @@ test_util_gzip_compression_bomb(void *arg)
char *one_mb = tor_malloc_zero(one_million);
char *result = NULL;
size_t result_len = 0;
tor_zlib_state_t *state = NULL;
tor_compress_state_t *state = NULL;
/* Make sure we can't produce a compression bomb */
setup_full_capture_of_logs(LOG_WARN);
@ -2386,22 +2386,22 @@ test_util_gzip_compression_bomb(void *arg)
ZLIB_METHOD, 0, LOG_WARN));
/* Now try streaming that. */
state = tor_zlib_new(0, ZLIB_METHOD, HIGH_COMPRESSION);
tor_zlib_output_t r;
state = tor_compress_new(0, ZLIB_METHOD, HIGH_COMPRESSION);
tor_compress_output_t r;
const char *inp = compression_bomb;
size_t inlen = 1039;
do {
char *outp = one_mb;
size_t outleft = 4096; /* small on purpose */
r = tor_zlib_process(state, &outp, &outleft, &inp, &inlen, 0);
r = tor_compress_process(state, &outp, &outleft, &inp, &inlen, 0);
tt_int_op(inlen, OP_NE, 0);
} while (r == TOR_ZLIB_BUF_FULL);
} while (r == TOR_COMPRESS_BUFFER_FULL);
tt_int_op(r, OP_EQ, TOR_ZLIB_ERR);
tt_int_op(r, OP_EQ, TOR_COMPRESS_ERROR);
done:
tor_free(one_mb);
tor_zlib_free(state);
tor_compress_free(state);
}
/** Run unit tests for mmap() wrapper functionality. */