Add params to newCachedThreadPool method.

Use executor at httpClient builder.
Use httpClient.sendAsync.
Add keep-alive header.
Add RejectedExecutionHandler.

Signed-off-by: HenrikJannsen <boilingfrog@gmx.com>
This commit is contained in:
HenrikJannsen 2022-12-11 17:06:01 -05:00
parent 21c6c52ec8
commit 6addd27a33
No known key found for this signature in database
GPG Key ID: 02AA2BAE387C8307
2 changed files with 32 additions and 28 deletions

View File

@ -63,6 +63,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@ -136,10 +137,16 @@ public class Utilities {
return executor;
}
public static ExecutorService newCachedThreadPool(int maximumPoolSize) {
return new ThreadPoolExecutor(0, maximumPoolSize,
60, TimeUnit.SECONDS,
new SynchronousQueue<>());
public static ExecutorService newCachedThreadPool(int maximumPoolSize,
long keepAliveTime,
TimeUnit timeUnit,
RejectedExecutionHandler rejectedExecutionHandler) {
return new ThreadPoolExecutor(0,
maximumPoolSize,
keepAliveTime,
timeUnit,
new SynchronousQueue<>(),
rejectedExecutionHandler);
}
@SuppressWarnings("SameParameterValue")

View File

@ -54,12 +54,9 @@ import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@ -122,8 +119,11 @@ public class SeedNodeReportingService {
this.maxConnections = maxConnections;
this.seedNodeReportingServerUrl = seedNodeReportingServerUrl;
executor = Utilities.newCachedThreadPool(5);
httpClient = HttpClient.newHttpClient();
executor = Utilities.newCachedThreadPool(5,
8,
TimeUnit.MINUTES,
(runnable, executor) -> log.error("Execution was rejected. We skip the {} task.", runnable.toString()));
httpClient = HttpClient.newBuilder().executor(executor).build();
heartBeatTimer = UserThread.runPeriodically(this::sendHeartBeat, HEART_BEAT_DELAY_SEC);
@ -246,27 +246,24 @@ public class SeedNodeReportingService {
private void sendReportingItems(ReportingItems reportingItems) {
try {
CompletableFuture.runAsync(() -> {
log.info("Send report to monitor server: {}", reportingItems.toString());
// We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system.
byte[] protoMessageAsBytes = reportingItems.toProtoMessageAsBytes();
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(seedNodeReportingServerUrl))
.POST(HttpRequest.BodyPublishers.ofByteArray(protoMessageAsBytes))
.header("User-Agent", getMyAddress())
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
log.error("Response error message: {}", response);
}
} catch (IOException e) {
log.warn("IOException at sending reporting. {}", e.getMessage());
} catch (InterruptedException e) {
throw new RuntimeException(e);
log.info("Send report to monitor server: {}", reportingItems.toString());
// We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system.
byte[] protoMessageAsBytes = reportingItems.toProtoMessageAsBytes();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(seedNodeReportingServerUrl))
.POST(HttpRequest.BodyPublishers.ofByteArray(protoMessageAsBytes))
.header("User-Agent", getMyAddress())
.header("Connection", "keep-alive")
.build();
httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).whenComplete((response, throwable) -> {
if (throwable != null) {
log.warn("Exception at sending reporting data. {}", throwable.getMessage());
} else if (response.statusCode() != 200) {
log.error("Response error message: {}", response);
}
}, executor);
});
} catch (Throwable t) {
// RejectedExecutionException is thrown if we exceed our pool size.
log.error("Did not send reportingItems {} because of exception {}", reportingItems, t.toString());
}
}