Fix performance issue in BsqWalletService

The updateBsqWalletTransactions method got called at each block for all
transactions. During block download that wasted a lot of cpu and
led to stuck UI thread and lost connections.
The updateBsqBalance is not cheap (a few ms) and called for 100s of txs
at each block was very problematic.
Furthermore the listeners on the walletTransactions observableList got
triggered which made the situation worse.

We changed the observableList to a ArrayList and use a listener which
gets called after the list is updated.
We also make sure the onTransactionConfidenceChanged listener is not
calling updateBsqWalletTransactions if bsq parsing is not complete and
if the depth of the tx is > 1.
In the updateBsqWalletTransactions method we use a flag and a delay
to ensure that the updateBsqBalance is not called more then once
in 100 ms.

We changed also the getter to return a cloned version of the list to
avoid potential concurrent modification exceptions at clients.

Closes #3175
This commit is contained in:
chimp1984 2019-09-01 22:47:10 +02:00
parent 0a12676946
commit 6dafafd7b1
No known key found for this signature in database
GPG key ID: 9801B4EC591F90E3
5 changed files with 94 additions and 30 deletions

View file

@ -35,6 +35,8 @@ import bisq.core.dao.state.unconfirmed.UnconfirmedBsqChangeOutputListService;
import bisq.core.provider.fee.FeeService;
import bisq.core.user.Preferences;
import bisq.common.UserThread;
import org.bitcoinj.core.Address;
import org.bitcoinj.core.AddressFormatException;
import org.bitcoinj.core.BlockChain;
@ -57,15 +59,14 @@ import org.bitcoinj.wallet.listeners.AbstractWalletEventListener;
import javax.inject.Inject;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -80,12 +81,20 @@ import static org.bitcoinj.core.TransactionConfidence.ConfidenceType.PENDING;
@Slf4j
public class BsqWalletService extends WalletService implements DaoStateListener {
public interface WalletTransactionsChangeListener {
void onWalletTransactionsChange();
}
private final BsqCoinSelector bsqCoinSelector;
private final NonBsqCoinSelector nonBsqCoinSelector;
private final DaoStateService daoStateService;
private final UnconfirmedBsqChangeOutputListService unconfirmedBsqChangeOutputListService;
private final ObservableList<Transaction> walletTransactions = FXCollections.observableArrayList();
private final List<Transaction> walletTransactions = new ArrayList<>();
private final CopyOnWriteArraySet<BsqBalanceListener> bsqBalanceListeners = new CopyOnWriteArraySet<>();
private final List<WalletTransactionsChangeListener> walletTransactionsChangeListeners = new ArrayList<>();
private boolean updateBsqWalletTransactionsPending;
// balance of non BSQ satoshis
@Getter
@ -152,7 +161,13 @@ public class BsqWalletService extends WalletService implements DaoStateListener
@Override
public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) {
updateBsqWalletTransactions();
// We are only interested in updates from unconfirmed txs and confirmed txs at the
// time when it gets into a block. Otherwise we would get called
// updateBsqWalletTransactions for each tx as the block depth changes for all.
if (tx.getConfidence().getDepthInBlocks() <= 1 &&
daoStateService.isParseBlockChainComplete()) {
updateBsqWalletTransactions();
}
unconfirmedBsqChangeOutputListService.onTransactionConfidenceChanged(tx);
}
@ -215,6 +230,7 @@ public class BsqWalletService extends WalletService implements DaoStateListener
///////////////////////////////////////////////////////////////////////////////////////////
private void updateBsqBalance() {
long ts = System.currentTimeMillis();
unverifiedBalance = Coin.valueOf(
getTransactions(false).stream()
.filter(tx -> tx.getConfidence().getConfidenceType() == PENDING)
@ -246,7 +262,7 @@ public class BsqWalletService extends WalletService implements DaoStateListener
}
return false;
})
.mapToLong(in -> in != null ? in.getValue().value : 0)
.mapToLong(in -> in.getValue() != null ? in.getValue().value : 0)
.sum();
return outputs - lockedInputs;
})
@ -289,6 +305,7 @@ public class BsqWalletService extends WalletService implements DaoStateListener
bsqBalanceListeners.forEach(e -> e.onUpdateBalances(availableConfirmedBalance, availableNonBsqBalance, unverifiedBalance,
unconfirmedChangeBalance, lockedForVotingBalance, lockupBondsBalance, unlockingBondsBalance));
log.info("updateBsqBalance took {} ms", System.currentTimeMillis() - ts);
}
public void addBsqBalanceListener(BsqBalanceListener listener) {
@ -299,13 +316,21 @@ public class BsqWalletService extends WalletService implements DaoStateListener
bsqBalanceListeners.remove(listener);
}
public void addWalletTransactionsChangeListener(WalletTransactionsChangeListener listener) {
walletTransactionsChangeListeners.add(listener);
}
public void removeWalletTransactionsChangeListener(WalletTransactionsChangeListener listener) {
walletTransactionsChangeListeners.remove(listener);
}
///////////////////////////////////////////////////////////////////////////////////////////
// BSQ TransactionOutputs and Transactions
///////////////////////////////////////////////////////////////////////////////////////////
public ObservableList<Transaction> getWalletTransactions() {
return walletTransactions;
public List<Transaction> getClonedWalletTransactions() {
return new ArrayList<>(walletTransactions);
}
public Stream<Transaction> getPendingWalletTransactionsStream() {
@ -314,9 +339,21 @@ public class BsqWalletService extends WalletService implements DaoStateListener
}
private void updateBsqWalletTransactions() {
walletTransactions.setAll(getTransactions(false));
if (daoStateService.isParseBlockChainComplete()) {
updateBsqBalance();
// We get called updateBsqWalletTransactions multiple times from onWalletChanged, onTransactionConfidenceChanged
// and from onParseBlockCompleteAfterBatchProcessing. But as updateBsqBalance is an expensive operation we do
// not want to call it in a short interval series so we use a flag and a delay to not call it multiple times
// in a 100 ms period.
if (!updateBsqWalletTransactionsPending) {
updateBsqWalletTransactionsPending = true;
UserThread.runAfter(() -> {
walletTransactions.clear();
walletTransactions.addAll(getTransactions(false));
walletTransactionsChangeListeners.forEach(WalletTransactionsChangeListener::onWalletTransactionsChange);
updateBsqBalance();
updateBsqWalletTransactionsPending = false;
}, 100, TimeUnit.MILLISECONDS);
}
}
}
@ -434,7 +471,7 @@ public class BsqWalletService extends WalletService implements DaoStateListener
}
public Optional<Transaction> isWalletTransaction(String txId) {
return getWalletTransactions().stream().filter(e -> e.getHashAsString().equals(txId)).findAny();
return walletTransactions.stream().filter(e -> e.getHashAsString().equals(txId)).findAny();
}
@ -553,7 +590,10 @@ public class BsqWalletService extends WalletService implements DaoStateListener
return tx;
}
private void addInputsAndChangeOutputForTx(Transaction tx, Coin fee, BsqCoinSelector bsqCoinSelector, boolean requireChangeOutput)
private void addInputsAndChangeOutputForTx(Transaction tx,
Coin fee,
BsqCoinSelector bsqCoinSelector,
boolean requireChangeOutput)
throws InsufficientBsqException {
Coin requiredInput;
// If our fee is less then dust limit we increase it so we are sure to not get any dust output.

View file

@ -35,7 +35,6 @@ import org.bitcoinj.core.TransactionOutput;
import javax.inject.Inject;
import javafx.collections.FXCollections;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import java.util.Arrays;
@ -55,7 +54,8 @@ import lombok.extern.slf4j.Slf4j;
* unconfirmed txs.
*/
@Slf4j
public abstract class BondRepository<T extends Bond, R extends BondedAsset> implements DaoSetupService {
public abstract class BondRepository<T extends Bond, R extends BondedAsset> implements DaoSetupService,
BsqWalletService.WalletTransactionsChangeListener {
///////////////////////////////////////////////////////////////////////////////////////////
// Static
@ -161,7 +161,7 @@ public abstract class BondRepository<T extends Bond, R extends BondedAsset> impl
update();
}
});
bsqWalletService.getWalletTransactions().addListener((ListChangeListener<Transaction>) c -> update());
bsqWalletService.addWalletTransactionsChangeListener(this);
}
@Override
@ -170,6 +170,16 @@ public abstract class BondRepository<T extends Bond, R extends BondedAsset> impl
}
///////////////////////////////////////////////////////////////////////////////////////////
// BsqWalletService.WalletTransactionsChangeListener
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onWalletTransactionsChange() {
update();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
@ -195,6 +205,7 @@ public abstract class BondRepository<T extends Bond, R extends BondedAsset> impl
abstract protected Stream<R> getBondedAssetStream();
protected void update() {
log.debug("update");
getBondedAssetStream().forEach(bondedAsset -> {
String uid = bondedAsset.getUid();
bondByUidMap.putIfAbsent(uid, createBond(bondedAsset));

View file

@ -26,12 +26,9 @@ import bisq.core.dao.state.DaoStateListener;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import org.bitcoinj.core.Transaction;
import javax.inject.Inject;
import javafx.collections.FXCollections;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import java.util.Arrays;
@ -50,7 +47,7 @@ import lombok.extern.slf4j.Slf4j;
* unconfirmed txs.
*/
@Slf4j
public class MyBondedReputationRepository implements DaoSetupService {
public class MyBondedReputationRepository implements DaoSetupService, BsqWalletService.WalletTransactionsChangeListener {
private final DaoStateService daoStateService;
private final BsqWalletService bsqWalletService;
private final MyReputationListService myReputationListService;
@ -84,7 +81,7 @@ public class MyBondedReputationRepository implements DaoSetupService {
update();
}
});
bsqWalletService.getWalletTransactions().addListener((ListChangeListener<Transaction>) c -> update());
bsqWalletService.addWalletTransactionsChangeListener(this);
}
@Override
@ -92,11 +89,22 @@ public class MyBondedReputationRepository implements DaoSetupService {
}
///////////////////////////////////////////////////////////////////////////////////////////
// BsqWalletService.WalletTransactionsChangeListener
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onWalletTransactionsChange() {
update();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void update() {
log.debug("update");
// It can be that the same salt/hash is in several lockupTxs, so we use the bondByLockupTxIdMap to eliminate
// duplicates by the collection algorithm.
Map<String, MyBondedReputation> bondByLockupTxIdMap = new HashMap<>();

View file

@ -60,7 +60,7 @@ public class BondedRolesRepository extends BondRepository<BondedRole, Role> {
///////////////////////////////////////////////////////////////////////////////////////////
public boolean isMyRole(Role role) {
Set<String> myWalletTransactionIds = bsqWalletService.getWalletTransactions().stream()
Set<String> myWalletTransactionIds = bsqWalletService.getClonedWalletTransactions().stream()
.map(Transaction::getHashAsString)
.collect(Collectors.toSet());
return getAcceptedBondedRoleProposalStream()

View file

@ -71,13 +71,11 @@ import javafx.beans.property.ReadOnlyObjectWrapper;
import javafx.beans.value.ChangeListener;
import javafx.collections.FXCollections;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import javafx.collections.transformation.SortedList;
import javafx.util.Callback;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
@ -85,7 +83,8 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@FxmlView
public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBalanceListener, DaoStateListener {
public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBalanceListener, DaoStateListener,
BsqWalletService.WalletTransactionsChangeListener {
private TableView<BsqTxListItem> tableView;
@ -100,7 +99,6 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
private final ObservableList<BsqTxListItem> observableList = FXCollections.observableArrayList();
// Need to be DoubleProperty as we pass it as reference
private final SortedList<BsqTxListItem> sortedList = new SortedList<>(observableList);
private ListChangeListener<Transaction> walletBsqTransactionsListener;
private int gridRow = 0;
private Label chainHeightLabel;
private ProgressBar chainSyncIndicator;
@ -173,7 +171,6 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
VBox.setVgrow(tableView, Priority.ALWAYS);
root.getChildren().add(vBox);
walletBsqTransactionsListener = change -> updateList();
walletChainHeightListener = (observable, oldValue, newValue) -> {
walletChainHeight = bsqWalletService.getBestChainHeight();
onUpdateAnyChainHeight();
@ -183,7 +180,7 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
@Override
protected void activate() {
bsqBalanceUtil.activate();
bsqWalletService.getWalletTransactions().addListener(walletBsqTransactionsListener);
bsqWalletService.addWalletTransactionsChangeListener(this);
bsqWalletService.addBsqBalanceListener(this);
btcWalletService.getChainHeightProperty().addListener(walletChainHeightListener);
@ -207,7 +204,7 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
protected void deactivate() {
bsqBalanceUtil.deactivate();
sortedList.comparatorProperty().unbind();
bsqWalletService.getWalletTransactions().removeListener(walletBsqTransactionsListener);
bsqWalletService.removeWalletTransactionsChangeListener(this);
bsqWalletService.removeBsqBalanceListener(this);
btcWalletService.getChainHeightProperty().removeListener(walletChainHeightListener);
daoFacade.removeBsqStateListener(this);
@ -254,6 +251,15 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
}
}
///////////////////////////////////////////////////////////////////////////////////////////
// BsqWalletService.WalletTransactionsChangeListener
///////////////////////////////////////////////////////////////////////////////////////////
@Override
public void onWalletTransactionsChange() {
updateList();
}
///////////////////////////////////////////////////////////////////////////////////////////
// Private
@ -299,8 +305,7 @@ public class BsqTxView extends ActivatableView<GridPane, Void> implements BsqBal
private void updateList() {
observableList.forEach(BsqTxListItem::cleanup);
// copy list to avoid ConcurrentModificationException
final List<Transaction> walletTransactions = new ArrayList<>(bsqWalletService.getWalletTransactions());
List<Transaction> walletTransactions = bsqWalletService.getClonedWalletTransactions();
List<BsqTxListItem> items = walletTransactions.stream()
.map(transaction -> {
return new BsqTxListItem(transaction,