Merge pull request #6547 from alvasw/make_proto_output_stream_thread_safe

Make ProtoOutputStream thread-safe
This commit is contained in:
Alejandro García 2023-01-31 19:09:24 +00:00 committed by GitHub
commit 9b61966e07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 88 deletions

View File

@ -142,7 +142,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
private final ConnectionStatistics connectionStatistics;
// set in init
private SynchronizedProtoOutputStream protoOutputStream;
private ProtoOutputStream protoOutputStream;
// mutable data, set from other threads but not changed internally.
@Getter
@ -198,7 +198,7 @@ public class Connection implements HasCapabilities, Runnable, MessageListener {
// When you construct an ObjectInputStream, in the constructor the class attempts to read a header that
// the associated ObjectOutputStream on the other end of the connection has written.
// It will not return until that header has been read.
protoOutputStream = new SynchronizedProtoOutputStream(socket.getOutputStream(), statistic);
protoOutputStream = new ProtoOutputStream(socket.getOutputStream(), statistic);
protoInputStream = socket.getInputStream();
// We create a thread for handling inputStream data
singleThreadExecutor.submit(this);

View File

@ -24,37 +24,65 @@ import bisq.common.proto.network.NetworkEnvelope;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;
@NotThreadSafe
@ThreadSafe
class ProtoOutputStream {
private static final Logger log = LoggerFactory.getLogger(ProtoOutputStream.class);
private final OutputStream outputStream;
private final Statistic statistic;
private final AtomicBoolean isConnectionActive = new AtomicBoolean(true);
private final Lock lock = new ReentrantLock();
ProtoOutputStream(OutputStream outputStream, Statistic statistic) {
this.outputStream = outputStream;
this.statistic = statistic;
}
void writeEnvelope(NetworkEnvelope envelope) {
lock.lock();
try {
writeEnvelopeOrThrow(envelope);
} catch (IOException e) {
if (!isConnectionActive.get()) {
// Connection was closed by us.
return;
}
log.error("Failed to write envelope", e);
throw new BisqRuntimeException("Failed to write envelope", e);
} finally {
lock.unlock();
}
}
void onConnectionShutdown() {
isConnectionActive.set(false);
boolean acquiredLock = tryToAcquireLock();
if (!acquiredLock) {
return;
}
try {
outputStream.close();
} catch (Throwable t) {
log.error("Failed to close connection", t);
} finally {
lock.unlock();
}
}
@ -74,4 +102,13 @@ class ProtoOutputStream {
statistic.updateLastActivityTimestamp();
}
}
private boolean tryToAcquireLock() {
long shutdownTimeout = Connection.getShutdownTimeout();
try {
return lock.tryLock(shutdownTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return false;
}
}
}

View File

@ -1,84 +0,0 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.network.p2p.network;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.util.Utilities;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.OutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
class SynchronizedProtoOutputStream extends ProtoOutputStream {
private static final Logger log = LoggerFactory.getLogger(SynchronizedProtoOutputStream.class);
private final ExecutorService executorService;
SynchronizedProtoOutputStream(OutputStream delegate, Statistic statistic) {
super(delegate, statistic);
this.executorService = Utilities.getSingleThreadExecutor(this.getClass());
}
@Override
void writeEnvelope(NetworkEnvelope envelope) {
Future<?> future = executorService.submit(() -> super.writeEnvelope(envelope));
try {
future.get();
} catch (InterruptedException e) {
Thread currentThread = Thread.currentThread();
currentThread.interrupt();
String msg = "Thread " + currentThread + " was interrupted. InterruptedException=" + e;
log.error(msg);
throw new BisqRuntimeException(msg, e);
} catch (ExecutionException e) {
String msg = "Failed to write envelope. ExecutionException " + e;
log.error(msg);
throw new BisqRuntimeException(msg, e);
}
}
void onConnectionShutdown() {
try {
// ProtoOutputStream is not thread-safe that's why try to close the stream
// on the same thread first.
executorService.submit(super::onConnectionShutdown);
//noinspection UnstableApiUsage
boolean terminatedSuccessfully = MoreExecutors.shutdownAndAwaitTermination(
executorService, Connection.getShutdownTimeout() * 2L, TimeUnit.MILLISECONDS);
if (!terminatedSuccessfully) {
super.onConnectionShutdown();
}
} catch (Throwable t) {
log.error("Failed to handle connection shutdown. Throwable={}", t.toString());
}
}
}