Merge pull request #4943 from chimp1984/add-tx-broadcast-to-mempool-explorer-api

Add tx broadcast to mempool explorer api
This commit is contained in:
sqrrm 2020-12-15 22:57:37 +01:00 committed by GitHub
commit bc60db39ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 316 additions and 32 deletions

View File

@ -27,6 +27,7 @@ import bisq.core.btc.nodes.LocalBitcoinNode;
import bisq.core.btc.setup.WalletsSetup;
import bisq.core.btc.wallet.BtcWalletService;
import bisq.core.btc.wallet.WalletsManager;
import bisq.core.btc.wallet.http.MemPoolSpaceTxBroadcaster;
import bisq.core.dao.governance.voteresult.VoteResultException;
import bisq.core.dao.state.unconfirmed.UnconfirmedBsqChangeOutputListService;
import bisq.core.locale.Res;
@ -41,6 +42,7 @@ import bisq.core.user.User;
import bisq.core.util.FormattingUtils;
import bisq.core.util.coin.CoinFormatter;
import bisq.network.Socks5ProxyProvider;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
@ -210,7 +212,8 @@ public class BisqSetup {
TorSetup torSetup,
@Named(FormattingUtils.BTC_FORMATTER_KEY) CoinFormatter formatter,
LocalBitcoinNode localBitcoinNode,
AppStartupState appStartupState) {
AppStartupState appStartupState,
Socks5ProxyProvider socks5ProxyProvider) {
this.domainInitialisation = domainInitialisation;
this.p2PNetworkSetup = p2PNetworkSetup;
this.walletAppSetup = walletAppSetup;
@ -230,6 +233,8 @@ public class BisqSetup {
this.formatter = formatter;
this.localBitcoinNode = localBitcoinNode;
this.appStartupState = appStartupState;
MemPoolSpaceTxBroadcaster.init(socks5ProxyProvider, preferences);
}
///////////////////////////////////////////////////////////////////////////////////////////

View File

@ -24,6 +24,7 @@ import bisq.core.btc.exceptions.WalletException;
import bisq.core.btc.model.AddressEntry;
import bisq.core.btc.model.AddressEntryList;
import bisq.core.btc.setup.WalletsSetup;
import bisq.core.btc.wallet.http.MemPoolSpaceTxBroadcaster;
import bisq.core.provider.fee.FeeService;
import bisq.core.user.Preferences;
@ -957,6 +958,10 @@ public class BtcWalletService extends WalletService {
try {
sendResult = wallet.sendCoins(sendRequest);
printTx("FeeEstimationTransaction", newTransaction);
// For better redundancy in case the broadcast via BitcoinJ fails we also
// publish the tx via mempool nodes.
MemPoolSpaceTxBroadcaster.broadcastTx(sendResult.tx);
} catch (InsufficientMoneyException e2) {
errorMessageHandler.handleErrorMessage("We did not get the correct fee calculated. " + (e2.missing != null ? e2.missing.toFriendlyString() : ""));
}
@ -1139,7 +1144,11 @@ public class BtcWalletService extends WalletService {
if (memo != null) {
sendResult.tx.setMemo(memo);
}
printTx("sendFunds", sendResult.tx);
// For better redundancy in case the broadcast via BitcoinJ fails we also
// publish the tx via mempool nodes.
MemPoolSpaceTxBroadcaster.broadcastTx(sendResult.tx);
return sendResult.tx.getTxId().toString();
}
@ -1160,6 +1169,11 @@ public class BtcWalletService extends WalletService {
sendResult.tx.setMemo(memo);
}
printTx("sendFunds", sendResult.tx);
// For better redundancy in case the broadcast via BitcoinJ fails we also
// publish the tx via mempool nodes.
MemPoolSpaceTxBroadcaster.broadcastTx(sendResult.tx);
return sendResult.tx;
}

View File

@ -19,6 +19,7 @@ package bisq.core.btc.wallet;
import bisq.core.btc.exceptions.TxBroadcastException;
import bisq.core.btc.exceptions.TxBroadcastTimeoutException;
import bisq.core.btc.wallet.http.MemPoolSpaceTxBroadcaster;
import bisq.common.Timer;
import bisq.common.UserThread;
@ -135,6 +136,10 @@ public class TxBroadcaster {
"the peerGroup.broadcastTransaction callback.", throwable)));
}
}, MoreExecutors.directExecutor());
// For better redundancy in case the broadcast via BitcoinJ fails we also
// publish the tx via mempool nodes.
MemPoolSpaceTxBroadcaster.broadcastTx(tx);
}
private static void stopAndRemoveTimer(String txId) {

View File

@ -23,6 +23,7 @@ import bisq.core.btc.listeners.AddressConfidenceListener;
import bisq.core.btc.listeners.BalanceListener;
import bisq.core.btc.listeners.TxConfidenceListener;
import bisq.core.btc.setup.WalletsSetup;
import bisq.core.btc.wallet.http.MemPoolSpaceTxBroadcaster;
import bisq.core.provider.fee.FeeService;
import bisq.core.user.Preferences;
@ -535,7 +536,12 @@ public abstract class WalletService {
sendRequest.aesKey = aesKey;
Wallet.SendResult sendResult = wallet.sendCoins(sendRequest);
printTx("empty btc wallet", sendResult.tx);
Futures.addCallback(sendResult.broadcastComplete, new FutureCallback<Transaction>() {
// For better redundancy in case the broadcast via BitcoinJ fails we also
// publish the tx via mempool nodes.
MemPoolSpaceTxBroadcaster.broadcastTx(sendResult.tx);
Futures.addCallback(sendResult.broadcastComplete, new FutureCallback<>() {
@Override
public void onSuccess(Transaction result) {
log.info("emptyBtcWallet onSuccess Transaction=" + result);

View File

@ -0,0 +1,146 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.core.btc.wallet.http;
import bisq.core.user.Preferences;
import bisq.network.Socks5ProxyProvider;
import bisq.network.http.HttpException;
import bisq.common.app.Version;
import bisq.common.config.Config;
import bisq.common.util.Utilities;
import org.bitcoinj.core.Transaction;
import org.bitcoinj.core.Utils;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j
public class MemPoolSpaceTxBroadcaster {
private static Socks5ProxyProvider socks5ProxyProvider;
private static Preferences preferences;
private static final ListeningExecutorService executorService = Utilities.getListeningExecutorService(
"MemPoolSpaceTxBroadcaster", 3, 5, 10 * 60);
public static void init(Socks5ProxyProvider socks5ProxyProvider,
Preferences preferences) {
MemPoolSpaceTxBroadcaster.socks5ProxyProvider = socks5ProxyProvider;
MemPoolSpaceTxBroadcaster.preferences = preferences;
}
public static void broadcastTx(Transaction tx) {
if (!Config.baseCurrencyNetwork().isMainnet()) {
log.info("MemPoolSpaceTxBroadcaster only supports mainnet");
return;
}
if (socks5ProxyProvider == null) {
log.warn("We got broadcastTx called before init was called.");
return;
}
String txIdToSend = tx.getTxId().toString();
String rawTx = Utils.HEX.encode(tx.bitcoinSerialize(true));
List<String> txBroadcastServices = new ArrayList<>(preferences.getDefaultTxBroadcastServices());
// Broadcast to first service
String serviceAddress = broadcastTx(txIdToSend, rawTx, txBroadcastServices);
if (serviceAddress != null) {
// Broadcast to second service
txBroadcastServices.remove(serviceAddress);
broadcastTx(txIdToSend, rawTx, txBroadcastServices);
}
}
@Nullable
private static String broadcastTx(String txIdToSend, String rawTx, List<String> txBroadcastServices) {
String serviceAddress = getRandomServiceAddress(txBroadcastServices);
if (serviceAddress == null) {
log.warn("We don't have a serviceAddress available. txBroadcastServices={}", txBroadcastServices);
return null;
}
broadcastTx(serviceAddress, txIdToSend, rawTx);
return serviceAddress;
}
private static void broadcastTx(String serviceAddress, String txIdToSend, String rawTx) {
TxBroadcastHttpClient httpClient = new TxBroadcastHttpClient(socks5ProxyProvider);
httpClient.setBaseUrl(serviceAddress);
httpClient.setIgnoreSocks5Proxy(false);
log.info("We broadcast rawTx {} to {}", rawTx, serviceAddress);
ListenableFuture<String> future = executorService.submit(() -> {
Thread.currentThread().setName("MemPoolSpaceTxBroadcaster @ " + serviceAddress);
return httpClient.post(rawTx, "User-Agent", "bisq/" + Version.VERSION);
});
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(String txId) {
if (txId.equals(txIdToSend)) {
log.info("Broadcast of raw tx with txId {} to {} was successful. rawTx={}",
txId, serviceAddress, rawTx);
} else {
log.error("The txId we got returned from the service does not match " +
"out tx of the sending tx. txId={}; txIdToSend={}",
txId, txIdToSend);
}
}
public void onFailure(@NotNull Throwable throwable) {
Throwable cause = throwable.getCause();
if (cause instanceof HttpException) {
int responseCode = ((HttpException) cause).getResponseCode();
String message = cause.getMessage();
// See all error codes at: https://github.com/bitcoin/bitcoin/blob/master/src/rpc/protocol.h
if (responseCode == 400 && message.contains("code\":-27")) {
log.info("Broadcast of raw tx to {} failed as transaction {} is already confirmed",
serviceAddress, txIdToSend);
} else {
log.info("Broadcast of raw tx to {} failed for transaction {}. responseCode={}, error={}",
serviceAddress, txIdToSend, responseCode, message);
}
} else {
log.warn("Broadcast of raw tx with txId {} to {} failed. Error={}",
txIdToSend, serviceAddress, throwable.toString());
}
}
}, MoreExecutors.directExecutor());
}
@Nullable
private static String getRandomServiceAddress(List<String> txBroadcastServices) {
List<String> list = checkNotNull(txBroadcastServices);
return !list.isEmpty() ? list.get(new Random().nextInt(list.size())) : null;
}
}

View File

@ -0,0 +1,32 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package bisq.core.btc.wallet.http;
import bisq.core.trade.txproof.AssetTxProofHttpClient;
import bisq.network.Socks5ProxyProvider;
import bisq.network.http.HttpClientImpl;
import lombok.extern.slf4j.Slf4j;
@Slf4j
class TxBroadcastHttpClient extends HttpClientImpl implements AssetTxProofHttpClient {
TxBroadcastHttpClient(Socks5ProxyProvider socks5ProxyProvider) {
super(socks5ProxyProvider);
}
}

View File

@ -131,6 +131,19 @@ public final class Preferences implements PersistedDataHost, BridgeAddressProvid
"devinxmrwu4jrfq2zmq5kqjpxb44hx7i7didebkwrtvmvygj4uuop2ad.onion" // @devinbileck
));
private static final ArrayList<String> TX_BROADCAST_SERVICES_CLEAR_NET = new ArrayList<>(Arrays.asList(
"https://mempool.space/api/tx", // @wiz
"https://mempool.emzy.de/api/tx", // @emzy
"https://mempool.bisq.services/api/tx" // @devinbileck
));
private static final ArrayList<String> TX_BROADCAST_SERVICES = new ArrayList<>(Arrays.asList(
"http://mempoolhqx4isw62xs7abwphsq7ldayuidyx2v2oethdhhj6mlo2r6ad.onion/api/tx", // @wiz
"http://mempool4t6mypeemozyterviq3i5de4kpoua65r3qkn5i3kknu5l2cad.onion/api/tx", // @emzy
"http://mempoolusb2f67qi7mz2it7n5e77a6komdzx6wftobcduxszkdfun2yd.onion/api/tx" // @devinbileck
));
public static final boolean USE_SYMMETRIC_SECURITY_DEPOSIT = true;
@ -912,6 +925,14 @@ public final class Preferences implements PersistedDataHost, BridgeAddressProvid
}
}
public List<String> getDefaultTxBroadcastServices() {
if (config.useLocalhostForP2P) {
return TX_BROADCAST_SERVICES_CLEAR_NET;
} else {
return TX_BROADCAST_SERVICES;
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private

View File

@ -22,6 +22,7 @@ import bisq.network.Socks5ProxyProvider;
import bisq.common.app.Version;
import bisq.common.util.Utilities;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
@ -30,6 +31,7 @@ import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
@ -43,10 +45,13 @@ import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -145,9 +150,9 @@ public class HttpClientImpl implements HttpClient {
@Nullable String headerKey,
@Nullable String headerValue) throws IOException {
long ts = System.currentTimeMillis();
String spec = baseUrl + param;
log.info("requestWithoutProxy: URL={}, httpMethod={}", spec, httpMethod);
log.info("requestWithoutProxy: URL={}, param={}, httpMethod={}", baseUrl, param, httpMethod);
try {
String spec = httpMethod == HttpMethod.GET ? baseUrl + param : baseUrl;
URL url = new URL(spec);
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod(httpMethod.name());
@ -158,23 +163,46 @@ public class HttpClientImpl implements HttpClient {
connection.setRequestProperty(headerKey, headerValue);
}
if (connection.getResponseCode() == 200) {
if (httpMethod == HttpMethod.POST) {
connection.setDoOutput(true);
connection.getOutputStream().write(param.getBytes(StandardCharsets.UTF_8));
}
int responseCode = connection.getResponseCode();
if (responseCode == 200) {
String response = convertInputStreamToString(connection.getInputStream());
log.info("Response for {} took {} ms. Data size:{}, response: {}",
spec,
log.info("Response from {} with param {} took {} ms. Data size:{}, response: {}",
baseUrl,
param,
System.currentTimeMillis() - ts,
Utilities.readableFileSize(response.getBytes().length),
Utilities.toTruncatedString(response));
return response;
} else {
String error = convertInputStreamToString(connection.getErrorStream());
connection.getErrorStream().close();
throw new HttpException(error);
InputStream errorStream = connection.getErrorStream();
if (errorStream != null) {
String error = convertInputStreamToString(errorStream);
errorStream.close();
log.info("Received errorMsg '{}' with responseCode {} from {}. Response took: {} ms. param: {}",
error,
responseCode,
baseUrl,
System.currentTimeMillis() - ts,
param);
throw new HttpException(error, responseCode);
} else {
log.info("Response with responseCode {} from {}. Response took: {} ms. param: {}",
responseCode,
baseUrl,
System.currentTimeMillis() - ts,
param);
throw new HttpException("Request failed", responseCode);
}
}
} catch (Throwable t) {
String message = "Error at requestWithoutProxy with URL: " + spec + ". Throwable=" + t.getMessage();
log.error(message);
throw new IOException(message);
String message = "Error at requestWithoutProxy with url " + baseUrl + " and param " + param +
". Throwable=" + t.getMessage();
throw new IOException(message, t);
} finally {
try {
if (connection != null) {
@ -195,8 +223,7 @@ public class HttpClientImpl implements HttpClient {
@Nullable String headerKey,
@Nullable String headerValue) throws IOException {
long ts = System.currentTimeMillis();
String uri = baseUrl + param;
log.info("requestWithoutProxy: uri={}, httpMethod={}", uri, httpMethod);
log.info("requestWithoutProxy: baseUrl={}, param={}, httpMethod={}", baseUrl, param, httpMethod);
// This code is adapted from:
// http://stackoverflow.com/a/25203021/5616248
@ -212,7 +239,7 @@ public class HttpClientImpl implements HttpClient {
new PoolingHttpClientConnectionManager(reg) :
new PoolingHttpClientConnectionManager(reg, new FakeDnsResolver());
try {
closeableHttpClient = HttpClients.custom().setConnectionManager(cm).build();
closeableHttpClient = checkNotNull(HttpClients.custom().setConnectionManager(cm).build());
InetSocketAddress socksAddress = new InetSocketAddress(socks5Proxy.getInetAddress(), socks5Proxy.getPort());
// remove me: Use this to test with system-wide Tor proxy, or change port for another proxy.
@ -221,23 +248,36 @@ public class HttpClientImpl implements HttpClient {
HttpClientContext context = HttpClientContext.create();
context.setAttribute("socks.address", socksAddress);
HttpUriRequest request = getHttpUriRequest(httpMethod, uri);
if (headerKey != null && headerValue != null)
HttpUriRequest request = getHttpUriRequest(httpMethod, baseUrl, param);
if (headerKey != null && headerValue != null) {
request.setHeader(headerKey, headerValue);
}
try (CloseableHttpResponse httpResponse = checkNotNull(closeableHttpClient).execute(request, context)) {
try (CloseableHttpResponse httpResponse = closeableHttpClient.execute(request, context)) {
String response = convertInputStreamToString(httpResponse.getEntity().getContent());
log.info("Response for {} took {} ms. Data size:{}, response: {}",
uri,
System.currentTimeMillis() - ts,
Utilities.readableFileSize(response.getBytes().length),
Utilities.toTruncatedString(response));
return response;
int statusCode = httpResponse.getStatusLine().getStatusCode();
if (statusCode == 200) {
log.info("Response from {} took {} ms. Data size:{}, response: {}, param: {}",
baseUrl,
System.currentTimeMillis() - ts,
Utilities.readableFileSize(response.getBytes().length),
Utilities.toTruncatedString(response),
param);
return response;
} else {
log.info("Received errorMsg '{}' with statusCode {} from {}. Response took: {} ms. param: {}",
response,
statusCode,
baseUrl,
System.currentTimeMillis() - ts,
param);
throw new HttpException(response, statusCode);
}
}
} catch (Throwable t) {
String message = "Error at doRequestWithProxy with URL: " + uri + ". Throwable=" + t.getMessage();
log.error(message);
throw new IOException(message);
String message = "Error at doRequestWithProxy with url " + baseUrl + " and param " + param +
". Throwable=" + t.getMessage();
throw new IOException(message, t);
} finally {
if (closeableHttpClient != null) {
closeableHttpClient.close();
@ -247,12 +287,17 @@ public class HttpClientImpl implements HttpClient {
}
}
private HttpUriRequest getHttpUriRequest(HttpMethod httpMethod, String uri) {
private HttpUriRequest getHttpUriRequest(HttpMethod httpMethod, String baseUrl, String param)
throws UnsupportedEncodingException {
switch (httpMethod) {
case GET:
return new HttpGet(uri);
return new HttpGet(baseUrl + param);
case POST:
return new HttpPost(uri);
HttpPost httpPost = new HttpPost(baseUrl);
HttpEntity httpEntity = new StringEntity(param);
httpPost.setEntity(httpEntity);
return httpPost;
default:
throw new IllegalArgumentException("HttpMethod not supported: " + httpMethod);
}

View File

@ -17,8 +17,18 @@
package bisq.network.http;
import lombok.Getter;
public class HttpException extends Exception {
@Getter
private int responseCode;
public HttpException(String message) {
super(message);
}
public HttpException(String message, int responseCode) {
super(message);
this.responseCode = responseCode;
}
}