Implemented SynchronizedProtoOutputStream

This commit is contained in:
Sergey Rozhnov 2018-02-02 15:55:20 +04:00
parent 5fc7364973
commit 98eabf84fa
3 changed files with 56 additions and 5 deletions

View file

@ -0,0 +1,7 @@
package io.bisq.network.p2p.network;
class BisqRuntimeException extends RuntimeException {
BisqRuntimeException(String message, Throwable cause) {
super(message, cause);
}
}

View file

@ -1,23 +1,40 @@
package io.bisq.network.p2p.network;
import io.bisq.common.proto.network.NetworkEnvelope;
import io.bisq.generated.protobuffer.PB;
import io.bisq.network.p2p.peers.keepalive.messages.KeepAliveMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.io.OutputStream;
@NotThreadSafe
class ProtoOutputStream {
private final OutputStream deleage;
private static final Logger log = LoggerFactory.getLogger(ProtoOutputStream.class);
private final OutputStream delegate;
private final Statistic statistic;
ProtoOutputStream(OutputStream deleage, Statistic statistic) {
this.deleage = deleage;
ProtoOutputStream(OutputStream delegate, Statistic statistic) {
this.delegate = delegate;
this.statistic = statistic;
}
void writeEnvelope(NetworkEnvelope envelope) {
try {
writeEnvelopeOrThrow(envelope);
} catch (IOException e) {
log.error("Failed to write envelope", e);
throw new BisqRuntimeException("Failed to write envelope", e);
}
}
private void writeEnvelopeOrThrow(NetworkEnvelope envelope) throws IOException {
PB.NetworkEnvelope proto = envelope.toProtoNetworkEnvelope();
proto.writeDelimitedTo(deleage);
deleage.flush();
proto.writeDelimitedTo(delegate);
delegate.flush();
statistic.addSentBytes(proto.getSerializedSize());
statistic.addSentMessage(envelope);

View file

@ -0,0 +1,27 @@
package io.bisq.network.p2p.network;
import io.bisq.common.proto.network.NetworkEnvelope;
import javax.annotation.concurrent.ThreadSafe;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ThreadSafe
class SynchronizedProtoOutputStream extends ProtoOutputStream {
private final ExecutorService executorService;
SynchronizedProtoOutputStream(OutputStream delegate, Statistic statistic) {
super(delegate, statistic);
this.executorService = Executors.newSingleThreadExecutor();
}
@Override
void writeEnvelope(NetworkEnvelope envelope) {
executorService.submit(() -> super.writeEnvelope(envelope));
}
void onConnectionShutdown() {
executorService.shutdownNow();
}
}