Make PaymentChannel.ServerConnection.paymentIncrease asynchronous.

This commit is contained in:
Martin Zachrison 2014-09-23 14:43:41 +02:00 committed by Mike Hearn
parent 798c341eb1
commit 036f0bec27
8 changed files with 53 additions and 20 deletions

View File

@ -3,6 +3,7 @@ package com.google.bitcoin.jni;
import com.google.bitcoin.core.*;
import com.google.bitcoin.protocols.channels.PaymentChannelCloseException;
import com.google.bitcoin.protocols.channels.ServerConnectionEventHandler;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
/**
@ -17,7 +18,7 @@ public class NativePaymentChannelServerConnectionEventHandler extends ServerConn
public native void channelOpen(Sha256Hash channelId);
@Override
public native ByteString paymentIncrease(Coin by, Coin to, ByteString info);
public native ListenableFuture<ByteString> paymentIncrease(Coin by, Coin to, ByteString info);
@Override
public native void channelClosed(PaymentChannelCloseException.CloseReason reason);

View File

@ -61,6 +61,9 @@ public class PaymentChannelCloseException extends Exception {
/** The connection was closed without an ERROR/CLOSE message */
CONNECTION_CLOSED,
/** The server failed processing an UpdatePayment message */
UPDATE_PAYMENT_FAILED,
}
private final CloseReason error;

View File

@ -21,6 +21,7 @@ import com.google.bitcoin.protocols.channels.PaymentChannelCloseException.CloseR
import com.google.bitcoin.utils.Threading;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import net.jcip.annotations.GuardedBy;
import org.bitcoin.paymentchannel.Protos;
@ -103,10 +104,10 @@ public class PaymentChannelServer {
* @param by The increase in total payment
* @param to The new total payment to us (not including fees which may be required to claim the payment)
* @param info Information about this payment increase, used to extend this protocol.
* @return An ack message that will be included in the PaymentAck message to the client. Use null for no ack message.
* @return A future that completes with the ack message that will be included in the PaymentAck message to the client. Use null for no ack message.
*/
@Nullable
public ByteString paymentIncrease(Coin by, Coin to, @Nullable ByteString info);
public ListenableFuture<ByteString> paymentIncrease(Coin by, Coin to, @Nullable ByteString info);
}
private final ServerConnection conn;
@ -368,17 +369,32 @@ public class PaymentChannelServer {
boolean stillUsable = state.incrementPayment(refundSize, msg.getSignature().toByteArray());
Coin bestPaymentChange = state.getBestValueToMe().subtract(lastBestPayment);
ByteString ackInfo = null;
ListenableFuture<ByteString> ackInfoFuture = null;
if (bestPaymentChange.signum() > 0) {
ByteString info = (msg.hasInfo()) ? msg.getInfo() : null;
ackInfo = conn.paymentIncrease(bestPaymentChange, state.getBestValueToMe(), info);
ackInfoFuture = conn.paymentIncrease(bestPaymentChange, state.getBestValueToMe(), info);
}
if (sendAck) {
Protos.TwoWayChannelMessage.Builder ack = Protos.TwoWayChannelMessage.newBuilder();
final Protos.TwoWayChannelMessage.Builder ack = Protos.TwoWayChannelMessage.newBuilder();
ack.setType(Protos.TwoWayChannelMessage.MessageType.PAYMENT_ACK);
if (ackInfo != null) ack.setPaymentAck(ack.getPaymentAckBuilder().setInfo(ackInfo));
conn.sendToClient(ack.build());
if (ackInfoFuture == null) {
conn.sendToClient(ack.build());
} else {
Futures.addCallback(ackInfoFuture, new FutureCallback<ByteString>() {
@Override
public void onSuccess(@Nullable ByteString result) {
if (result != null) ack.setPaymentAck(ack.getPaymentAckBuilder().setInfo(result));
conn.sendToClient(ack.build());
}
@Override
public void onFailure(Throwable t) {
log.info("Failed retrieving paymentIncrease info future");
error("Failed processing payment update", Protos.Error.ErrorCode.OTHER, CloseReason.UPDATE_PAYMENT_FAILED);
}
});
}
}
if (!stillUsable) {
@ -551,7 +567,7 @@ public class PaymentChannelServer {
* <p>Closes the connection by generating a settle message for the client and calls
* {@link ServerConnection#destroyConnection(CloseReason)}. Note that this does not broadcast
* the payment transaction and the client may still resume the same channel if they reconnect</p>
*
* <p>
* <p>Note that {@link PaymentChannelServer#connectionClosed()} must still be called after the connection fully
* closes.</p>
*/

View File

@ -25,6 +25,7 @@ import com.google.bitcoin.net.NioServer;
import com.google.bitcoin.net.ProtobufParser;
import com.google.bitcoin.net.StreamParserFactory;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import org.bitcoin.paymentchannel.Protos;
@ -83,7 +84,7 @@ public class PaymentChannelServerListener {
eventHandler.channelOpen(contractHash);
}
@Override public ByteString paymentIncrease(Coin by, Coin to, @Nullable ByteString info) {
@Override public ListenableFuture<ByteString> paymentIncrease(Coin by, Coin to, @Nullable ByteString info) {
return eventHandler.paymentIncrease(by, to, info);
}
});

View File

@ -20,6 +20,7 @@ import com.google.bitcoin.core.Coin;
import com.google.bitcoin.core.Sha256Hash;
import com.google.bitcoin.net.ProtobufParser;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import org.bitcoin.paymentchannel.Protos;
@ -74,7 +75,7 @@ public abstract class ServerConnectionEventHandler {
* @return acknowledgment information to be sent to the client.
*/
@Nullable
public abstract ByteString paymentIncrease(Coin by, Coin to, ByteString info);
public abstract ListenableFuture<ByteString> paymentIncrease(Coin by, Coin to, ByteString info);
/**
* <p>Called when the channel was closed for some reason. May be called without a call to

View File

@ -21,6 +21,7 @@ import com.google.bitcoin.store.WalletProtobufSerializer;
import com.google.bitcoin.testing.TestWithWallet;
import com.google.bitcoin.utils.Threading;
import com.google.bitcoin.wallet.WalletFiles;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
@ -35,6 +36,7 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@ -131,9 +133,9 @@ public class ChannelConnectionTest extends TestWithWallet {
}
@Override
public ByteString paymentIncrease(Coin by, Coin to, ByteString info) {
public ListenableFuture<ByteString> paymentIncrease(Coin by, Coin to, ByteString info) {
q.add(new ChannelTestUtils.UpdatePair(to, info));
return null;
return Futures.immediateFuture(info);
}
@Override
@ -174,11 +176,16 @@ public class ChannelConnectionTest extends TestWithWallet {
Thread.sleep(1250); // No timeouts once the channel is open
Coin amount = client.state().getValueSpent();
q.take().assertPair(amount, null);
ByteString[] infos = new ByteString[]{null, ByteString.copyFromUtf8("one"),ByteString.copyFromUtf8("two")};
for (ByteString info : infos) {
client.incrementPayment(CENT, info).get();
for (String info : new String[] {null, "one", "two"} ) {
final ByteString bytes = (info==null) ? null :ByteString.copyFromUtf8(info);
final PaymentIncrementAck ack = client.incrementPayment(CENT, bytes).get();
if (info != null) {
final ByteString ackInfo = ack.getInfo();
assertNotNull("Ack info is null", ackInfo);
assertEquals("Ack info differs ", info, ackInfo.toStringUtf8());
}
amount = amount.add(CENT);
q.take().assertPair(amount, info);
q.take().assertPair(amount, bytes);
}
latch.await();

View File

@ -5,6 +5,9 @@ import com.google.bitcoin.core.Sha256Hash;
import com.google.bitcoin.core.TransactionBroadcaster;
import com.google.bitcoin.core.Wallet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import org.bitcoin.paymentchannel.Protos;
@ -37,9 +40,9 @@ public class ChannelTestUtils {
}
@Override
public ByteString paymentIncrease(Coin by, Coin to, @Nullable ByteString info) {
public ListenableFuture<ByteString> paymentIncrease(Coin by, Coin to, @Nullable ByteString info) {
q.add(new UpdatePair(to, info));
return ByteString.copyFromUtf8(by.toPlainString());
return Futures.immediateFuture(ByteString.copyFromUtf8(by.toPlainString()));
}
public Protos.TwoWayChannelMessage getNextMsg() throws InterruptedException {

View File

@ -28,6 +28,7 @@ import com.google.bitcoin.protocols.channels.*;
import com.google.bitcoin.utils.BriefLogFormatter;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import org.slf4j.LoggerFactory;
@ -102,7 +103,7 @@ public class ExamplePaymentChannelServer implements PaymentChannelServerListener
}
@Override
public ByteString paymentIncrease(Coin by, Coin to, ByteString info) {
public ListenableFuture<ByteString> paymentIncrease(Coin by, Coin to, ByteString info) {
log.info("Client {} paid increased payment by {} for a total of " + to.toString(), clientAddress, by);
return null;
}