From 871656b18d7139cdca19148d42615c719997ccb5 Mon Sep 17 00:00:00 2001 From: Alva Swanson Date: Mon, 30 Jan 2023 16:50:06 +0100 Subject: [PATCH] Make ProtoOutputStream thread-safe Before SynchronizedProtoOutputStream created a new thread and made a blocking call to ProtoOutputStream.writeEnvelope(...). Each connection had an instance of SynchronizedProtoOutputStream, so we had a redundant thread per connection. This change makes ProtoBufOutputStream thread-safe and reduces the number of thread per connection as a side effect. --- .../bisq/network/p2p/network/Connection.java | 4 +- .../p2p/network/ProtoOutputStream.java | 41 ++++++++- .../SynchronizedProtoOutputStream.java | 84 ------------------- 3 files changed, 41 insertions(+), 88 deletions(-) delete mode 100644 p2p/src/main/java/bisq/network/p2p/network/SynchronizedProtoOutputStream.java diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index 5b8a44198b..da434fcd72 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -142,7 +142,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { private final ConnectionStatistics connectionStatistics; // set in init - private SynchronizedProtoOutputStream protoOutputStream; + private ProtoOutputStream protoOutputStream; // mutable data, set from other threads but not changed internally. @Getter @@ -198,7 +198,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener { // When you construct an ObjectInputStream, in the constructor the class attempts to read a header that // the associated ObjectOutputStream on the other end of the connection has written. // It will not return until that header has been read. - protoOutputStream = new SynchronizedProtoOutputStream(socket.getOutputStream(), statistic); + protoOutputStream = new ProtoOutputStream(socket.getOutputStream(), statistic); protoInputStream = socket.getInputStream(); // We create a thread for handling inputStream data singleThreadExecutor.submit(this); diff --git a/p2p/src/main/java/bisq/network/p2p/network/ProtoOutputStream.java b/p2p/src/main/java/bisq/network/p2p/network/ProtoOutputStream.java index 9a15e0df19..b18ceed1f8 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/ProtoOutputStream.java +++ b/p2p/src/main/java/bisq/network/p2p/network/ProtoOutputStream.java @@ -24,37 +24,65 @@ import bisq.common.proto.network.NetworkEnvelope; import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.NotThreadSafe; +import javax.annotation.concurrent.ThreadSafe; -@NotThreadSafe +@ThreadSafe class ProtoOutputStream { private static final Logger log = LoggerFactory.getLogger(ProtoOutputStream.class); private final OutputStream outputStream; private final Statistic statistic; + private final AtomicBoolean isConnectionActive = new AtomicBoolean(true); + private final Lock lock = new ReentrantLock(); + ProtoOutputStream(OutputStream outputStream, Statistic statistic) { this.outputStream = outputStream; this.statistic = statistic; } void writeEnvelope(NetworkEnvelope envelope) { + lock.lock(); + try { writeEnvelopeOrThrow(envelope); } catch (IOException e) { + if (!isConnectionActive.get()) { + // Connection was closed by us. + return; + } + log.error("Failed to write envelope", e); throw new BisqRuntimeException("Failed to write envelope", e); + + } finally { + lock.unlock(); } } void onConnectionShutdown() { + isConnectionActive.set(false); + + boolean acquiredLock = tryToAcquireLock(); + if (!acquiredLock) { + return; + } + try { outputStream.close(); } catch (Throwable t) { log.error("Failed to close connection", t); + + } finally { + lock.unlock(); } } @@ -74,4 +102,13 @@ class ProtoOutputStream { statistic.updateLastActivityTimestamp(); } } + + private boolean tryToAcquireLock() { + long shutdownTimeout = Connection.getShutdownTimeout(); + try { + return lock.tryLock(shutdownTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return false; + } + } } diff --git a/p2p/src/main/java/bisq/network/p2p/network/SynchronizedProtoOutputStream.java b/p2p/src/main/java/bisq/network/p2p/network/SynchronizedProtoOutputStream.java deleted file mode 100644 index 725367c163..0000000000 --- a/p2p/src/main/java/bisq/network/p2p/network/SynchronizedProtoOutputStream.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.network.p2p.network; - -import bisq.common.proto.network.NetworkEnvelope; -import bisq.common.util.Utilities; - -import com.google.common.util.concurrent.MoreExecutors; - -import java.io.OutputStream; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.concurrent.ThreadSafe; - -@ThreadSafe -class SynchronizedProtoOutputStream extends ProtoOutputStream { - private static final Logger log = LoggerFactory.getLogger(SynchronizedProtoOutputStream.class); - - private final ExecutorService executorService; - - SynchronizedProtoOutputStream(OutputStream delegate, Statistic statistic) { - super(delegate, statistic); - this.executorService = Utilities.getSingleThreadExecutor(this.getClass()); - } - - @Override - void writeEnvelope(NetworkEnvelope envelope) { - Future future = executorService.submit(() -> super.writeEnvelope(envelope)); - try { - future.get(); - } catch (InterruptedException e) { - Thread currentThread = Thread.currentThread(); - currentThread.interrupt(); - String msg = "Thread " + currentThread + " was interrupted. InterruptedException=" + e; - log.error(msg); - throw new BisqRuntimeException(msg, e); - } catch (ExecutionException e) { - String msg = "Failed to write envelope. ExecutionException " + e; - log.error(msg); - throw new BisqRuntimeException(msg, e); - } - } - - void onConnectionShutdown() { - try { - // ProtoOutputStream is not thread-safe that's why try to close the stream - // on the same thread first. - executorService.submit(super::onConnectionShutdown); - - //noinspection UnstableApiUsage - boolean terminatedSuccessfully = MoreExecutors.shutdownAndAwaitTermination( - executorService, Connection.getShutdownTimeout() * 2L, TimeUnit.MILLISECONDS); - - if (!terminatedSuccessfully) { - super.onConnectionShutdown(); - } - - } catch (Throwable t) { - log.error("Failed to handle connection shutdown. Throwable={}", t.toString()); - } - } -}