diff --git a/channeld/channel.c b/channeld/channel.c index d9ed24a5b..d1a1684fc 100644 --- a/channeld/channel.c +++ b/channeld/channel.c @@ -112,11 +112,6 @@ struct peer { const u8 *peer_outmsg; size_t peer_outoff; -#if DEVELOPER - /* Sabotage fd after sending next msg. */ - bool post_sabotage; -#endif - /* Messages from master / gossipd: we queue them since we * might be waiting for a specific reply. */ struct msg_queue from_master, from_gossipd; @@ -178,6 +173,84 @@ static void *tal_arr_append_(void **p, size_t size) } #define tal_arr_append(p) tal_arr_append_((void **)(p), sizeof(**(p))) +static void do_peer_write(struct peer *peer) +{ + int r; + size_t len = tal_len(peer->peer_outmsg); + + r = write(PEER_FD, peer->peer_outmsg + peer->peer_outoff, + len - peer->peer_outoff); + if (r < 0) + status_failed(STATUS_FAIL_PEER_IO, + "Peer write failed: %s", strerror(errno)); + + peer->peer_outoff += r; + if (peer->peer_outoff == len) + peer->peer_outmsg = tal_free(peer->peer_outmsg); +} + +static bool peer_write_pending(struct peer *peer) +{ + const u8 *msg; + + if (peer->peer_outmsg) + return true; + + msg = msg_dequeue(&peer->peer_out); + if (!msg) + return false; + + status_trace("peer_out %s", wire_type_name(fromwire_peektype(msg))); + peer->peer_outmsg = cryptomsg_encrypt_msg(peer, &peer->cs, take(msg)); + peer->peer_outoff = 0; + return true; +} + +/* Synchronous flush of all pending packets. */ +static void flush_peer_out(struct peer *peer) +{ + while (peer_write_pending(peer)) + do_peer_write(peer); +} + +static void enqueue_peer_msg(struct peer *peer, const u8 *msg TAKES) +{ +#if DEVELOPER + enum dev_disconnect d = dev_disconnect(fromwire_peektype(msg)); + + /* We want to effect this exact packet, so flush any pending. */ + if (d != DEV_DISCONNECT_NORMAL) + flush_peer_out(peer); + + switch (d) { + case DEV_DISCONNECT_BEFORE: + /* Fail immediately. */ + dev_sabotage_fd(PEER_FD); + msg_enqueue(&peer->peer_out, msg); + flush_peer_out(peer); + /* Should not return */ + abort(); + case DEV_DISCONNECT_DROPPKT: + tal_free(msg); + /* Fail next time we try to do something. */ + dev_sabotage_fd(PEER_FD); + return; + case DEV_DISCONNECT_AFTER: + msg_enqueue(&peer->peer_out, msg); + flush_peer_out(peer); + dev_sabotage_fd(PEER_FD); + return; + case DEV_DISCONNECT_BLACKHOLE: + msg_enqueue(&peer->peer_out, msg); + dev_blackhole_fd(PEER_FD); + return; + case DEV_DISCONNECT_NORMAL: + break; + } +#endif + msg_enqueue(&peer->peer_out, msg); +} + static void gossip_in(struct peer *peer, const u8 *msg) { u8 *gossip; @@ -192,7 +265,7 @@ static void gossip_in(struct peer *peer, const u8 *msg) if (type == WIRE_CHANNEL_ANNOUNCEMENT || type == WIRE_CHANNEL_UPDATE || type == WIRE_NODE_ANNOUNCEMENT) - msg_enqueue(&peer->peer_out, gossip); + enqueue_peer_msg(peer, gossip); else status_failed(STATUS_FAIL_GOSSIP_IO, "Got bad message type %s from gossipd: %s", @@ -287,7 +360,7 @@ static void send_announcement_signatures(struct peer *peer) tmpctx, &peer->channel_id, &peer->short_channel_ids[LOCAL], &peer->announcement_node_sigs[LOCAL], &peer->announcement_bitcoin_sigs[LOCAL]); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); tal_free(tmpctx); } @@ -612,7 +685,7 @@ static void maybe_send_shutdown(struct peer *peer) msg = towire_shutdown(peer, &peer->channel_id, peer->unsent_shutdown_scriptpubkey); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); peer->unsent_shutdown_scriptpubkey = tal_free(peer->unsent_shutdown_scriptpubkey); peer->shutdown_sent[LOCAL] = true; @@ -804,7 +877,7 @@ static void send_commit(struct peer *peer) feerate, max); msg = towire_update_fee(peer, &peer->channel_id, feerate); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); } /* BOLT #2: @@ -846,7 +919,7 @@ static void send_commit(struct peer *peer) msg = towire_commitment_signed(peer, &peer->channel_id, &peer->next_commit_sigs->commit_sig, peer->next_commit_sigs->htlc_sigs); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); peer->next_commit_sigs = tal_free(peer->next_commit_sigs); maybe_send_shutdown(peer); @@ -918,7 +991,7 @@ static void send_revocation(struct peer *peer) start_commit_timer(peer); } - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); } static u8 *got_commitsig_msg(const tal_t *ctx, @@ -1389,7 +1462,7 @@ static void handle_ping(struct peer *peer, const u8 *msg) : "nothing"); if (pong) - msg_enqueue(&peer->peer_out, take(pong)); + enqueue_peer_msg(peer, take(pong)); } static void handle_pong(struct peer *peer, const u8 *pong) @@ -1541,7 +1614,7 @@ static void resend_revoke(struct peer *peer) { /* Current commit is peer->next_index[LOCAL]-1, revoke prior */ u8 *msg = make_revocation_msg(peer, peer->next_index[LOCAL]-2); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); } static void send_fail_or_fulfill(struct peer *peer, const struct htlc *h) @@ -1566,7 +1639,7 @@ static void send_fail_or_fulfill(struct peer *peer, const struct htlc *h) &peer->channel_id, "HTLC %"PRIu64" state %s not failed/fulfilled", h->id, htlc_state_name(h->state)); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); } static void resend_commitment(struct peer *peer, const struct changed_htlc *last) @@ -1607,7 +1680,7 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last abs_locktime_to_blocks( &h->expiry), h->routing); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); } else if (h->state == SENT_REMOVE_COMMIT) { send_fail_or_fulfill(peer, h); } @@ -1617,7 +1690,7 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last if (peer->channel->funder == LOCAL) { msg = towire_update_fee(peer, &peer->channel_id, channel_feerate(peer->channel, REMOTE)); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); } /* Re-send the commitment_signed itself. */ @@ -1625,7 +1698,7 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last msg = towire_commitment_signed(peer, &peer->channel_id, &commit_sigs->commit_sig, commit_sigs->htlc_sigs); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); tal_free(commit_sigs); assert(peer->revocations_received == peer->next_index[REMOTE] - 2); @@ -1703,7 +1776,7 @@ again: msg = towire_funding_locked(peer, &peer->channel_id, &next_per_commit_point); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); } /* Note: next_index is the index of the current commit we're working @@ -1807,7 +1880,7 @@ again: * disable flag */ cupdate = create_channel_update(peer, peer, false); wire_sync_write(GOSSIP_FD, cupdate); - msg_enqueue(&peer->peer_out, take(cupdate)); + enqueue_peer_msg(peer, take(cupdate)); /* Corner case: we will get upset with them if they send * commitment_signed with no changes. But it could be that we sent a @@ -1835,7 +1908,7 @@ static void handle_funding_locked(struct peer *peer, const u8 *msg) type_to_string(trc, struct pubkey, &next_per_commit_point)); msg = towire_funding_locked(peer, &peer->channel_id, &next_per_commit_point); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); peer->funding_locked[LOCAL] = true; if (peer->funding_locked[REMOTE]) { @@ -1893,7 +1966,7 @@ static void handle_offer_htlc(struct peer *peer, const u8 *inmsg) peer->htlc_id, amount_msat, &payment_hash, cltv_expiry, onion_routing_packet); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); peer->funding_locked[LOCAL] = true; start_commit_timer(peer); /* Tell the master. */ @@ -1983,7 +2056,7 @@ static void handle_preimage(struct peer *peer, const u8 *inmsg) case CHANNEL_ERR_REMOVE_OK: msg = towire_update_fulfill_htlc(peer, &peer->channel_id, id, &preimage); - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); start_commit_timer(peer); return; /* These shouldn't happen, because any offered HTLC (which would give @@ -2149,7 +2222,7 @@ static void handle_fail(struct peer *peer, const u8 *inmsg) msg = towire_update_fail_htlc(peer, &peer->channel_id, id, reply); } - msg_enqueue(&peer->peer_out, take(msg)); + enqueue_peer_msg(peer, take(msg)); start_commit_timer(peer); return; case CHANNEL_ERR_NO_SUCH_ID: @@ -2175,7 +2248,7 @@ static void handle_ping_cmd(struct peer *peer, const u8 *inmsg) if (tal_len(ping) > 65535) status_failed(STATUS_FAIL_MASTER_IO, "Oversize channel_ping"); - msg_enqueue(&peer->peer_out, take(ping)); + enqueue_peer_msg(peer, take(ping)); status_trace("sending ping expecting %sresponse", num_pong_bytes >= 65532 ? "no " : ""); @@ -2400,79 +2473,15 @@ static void init_channel(struct peer *peer) /* If we have a funding_signed message, send that immediately */ if (funding_signed) - msg_enqueue(&peer->peer_out, take(funding_signed)); + enqueue_peer_msg(peer, take(funding_signed)); tal_free(msg); } -#ifndef TESTING -static void do_peer_write(struct peer *peer) -{ - int r; - size_t len = tal_len(peer->peer_outmsg); - - r = write(PEER_FD, peer->peer_outmsg + peer->peer_outoff, - len - peer->peer_outoff); - if (r < 0) - status_failed(STATUS_FAIL_PEER_IO, - "Peer write failed: %s", strerror(errno)); - - peer->peer_outoff += r; - if (peer->peer_outoff == len) { - peer->peer_outmsg = tal_free(peer->peer_outmsg); -#if DEVELOPER - if (peer->post_sabotage) - dev_sabotage_fd(PEER_FD); -#endif - } -} - -static bool peer_write_pending(struct peer *peer) -{ - const u8 *msg; - - if (peer->peer_outmsg) - return true; - - msg = msg_dequeue(&peer->peer_out); - if (!msg) - return false; - -#if DEVELOPER - peer->post_sabotage = false; - - switch (dev_disconnect(fromwire_peektype(msg))) { - case DEV_DISCONNECT_BEFORE: - dev_sabotage_fd(PEER_FD); - break; - case DEV_DISCONNECT_DROPPKT: - tal_free(msg); - peer->post_sabotage = true; - peer->peer_outmsg = NULL; - peer->peer_outoff = 0; - return true; - case DEV_DISCONNECT_AFTER: - peer->post_sabotage = true; - break; - case DEV_DISCONNECT_BLACKHOLE: - dev_blackhole_fd(PEER_FD); - break; - case DEV_DISCONNECT_NORMAL: - break; - } -#endif - - status_trace("peer_out %s", wire_type_name(fromwire_peektype(msg))); - peer->peer_outmsg = cryptomsg_encrypt_msg(peer, &peer->cs, take(msg)); - peer->peer_outoff = 0; - return true; -} - static void send_shutdown_complete(struct peer *peer) { /* Push out any incomplete messages to peer. */ - while (peer_write_pending(peer)) - do_peer_write(peer); + flush_peer_out(peer); /* Now we can tell master shutdown is complete. */ wire_sync_write(MASTER_FD, @@ -2513,9 +2522,6 @@ int main(int argc, char *argv[]) msg_queue_init(&peer->peer_out, peer); peer->peer_outmsg = NULL; peer->peer_outoff = 0; -#if DEVELOPER - peer->post_sabotage = false; -#endif peer->next_commit_sigs = NULL; peer->shutdown_sent[LOCAL] = false; @@ -2637,4 +2643,3 @@ int main(int argc, char *argv[]) return 0; } -#endif /* TESTING */