Refactored protobuffer AddressEntryList and PeerList

This commit is contained in:
Manfred Karrer 2017-05-13 21:21:40 +02:00
parent f8be0f2c2e
commit 07cd0b5f00
11 changed files with 156 additions and 135 deletions

View File

@ -61,5 +61,4 @@ public class PersistableList<T extends PersistablePayload> implements Persistabl
public Message toProtoMessage() {
return toProto.apply(list);
}
}

View File

@ -20,32 +20,25 @@ package io.bisq.common.proto.persistable;
import com.google.common.collect.Lists;
import com.google.protobuf.Message;
import io.bisq.generated.protobuffer.PB;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.*;
import org.springframework.util.CollectionUtils;
import java.util.List;
@EqualsAndHashCode
@AllArgsConstructor
@NoArgsConstructor
public class PersistableViewPath implements PersistableEnvelope {
@Getter
@Setter
public class PersistableViewPath implements PersistableEnvelope {
private List<String> viewPath = Lists.newArrayList();
@Override
public Message toProtoMessage() {
return CollectionUtils.isEmpty(viewPath) ?
PB.PersistableEnvelope.newBuilder()
.setViewPathAsString(PB.ViewPathAsString.newBuilder())
.build()
:
PB.PersistableEnvelope.newBuilder()
.setViewPathAsString(PB.ViewPathAsString.newBuilder()
.addAllViewPath(viewPath))
.build();
final PB.ViewPathAsString.Builder builder = PB.ViewPathAsString.newBuilder();
if (!CollectionUtils.isEmpty(viewPath))
builder.addAllViewPath(viewPath);
return PB.PersistableEnvelope.newBuilder().setViewPathAsString(builder).build();
}
public static PersistableEnvelope fromProto(PB.ViewPathAsString proto) {

View File

@ -19,7 +19,6 @@ package io.bisq.common.storage;
import com.google.common.util.concurrent.CycleDetectingLockFactory;
import io.bisq.common.UserThread;
import io.bisq.common.io.LookAheadObjectInputStream;
import io.bisq.common.proto.persistable.PersistableEnvelope;
import io.bisq.common.proto.persistable.PersistenceProtoResolver;
import io.bisq.common.util.Utilities;
@ -28,7 +27,6 @@ import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -44,7 +42,7 @@ public class FileManager<T extends PersistableEnvelope> {
private final AtomicBoolean savePending;
private final long delay;
private final Callable<Void> saveFileTask;
private T serializable;
private T persistable;
private final PersistenceProtoResolver persistenceProtoResolver;
private final ReentrantLock writeLock = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.THROW).newReentrantLock("writeLock");
@ -70,7 +68,7 @@ public class FileManager<T extends PersistableEnvelope> {
// Some other scheduled request already beat us to it.
return null;
}
saveNowInternal(serializable);
saveNowInternal(persistable);
return null;
};
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
@ -86,19 +84,19 @@ public class FileManager<T extends PersistableEnvelope> {
/**
* Actually write the wallet file to disk, using an atomic rename when possible. Runs on the current thread.
*/
public void saveNow(T serializable) {
saveNowInternal(serializable);
public void saveNow(T persistable) {
saveNowInternal(persistable);
}
/**
* Queues up a save in the background. Useful for not very important wallet changes.
*/
public void saveLater(T serializable) {
saveLater(serializable, delay);
public void saveLater(T persistable) {
saveLater(persistable, delay);
}
public void saveLater(T serializable, long delayInMilli) {
this.serializable = serializable;
public void saveLater(T persistable, long delayInMilli) {
this.persistable = persistable;
if (savePending.getAndSet(true))
return; // Already pending.
@ -108,21 +106,17 @@ public class FileManager<T extends PersistableEnvelope> {
public synchronized T read(File file) throws IOException, ClassNotFoundException {
log.debug("read" + file);
Optional<PersistableEnvelope> persistable = Optional.empty();
try (final FileInputStream fileInputStream = new FileInputStream(file)) {
persistable = persistenceProtoResolver.fromProto(PB.PersistableEnvelope.parseDelimitedFrom(fileInputStream));
return (T) persistenceProtoResolver.fromProto(PB.PersistableEnvelope.parseDelimitedFrom(fileInputStream));
} catch (Throwable t) {
log.error("Exception at proto read: " + t.getMessage() + " " + file.getName());
//if(DevEnv.DEV_MODE)
throw new RuntimeException("Exception at proto read: " + t.getMessage() + " " + file.getName());
}
if (persistable.isPresent()) {
log.info("Reading DiscEnvelope: {}", persistable.get().getClass());
//noinspection unchecked
return (T) persistable.get();
}
try (final FileInputStream fileInputStream = new FileInputStream(file);
/*try (final FileInputStream fileInputStream = new FileInputStream(file);
final ObjectInputStream objectInputStream = new LookAheadObjectInputStream(fileInputStream, false)) {
//noinspection unchecked
log.warn("Still using Serializable storing for file: {}", file);
@ -130,7 +124,7 @@ public class FileManager<T extends PersistableEnvelope> {
} catch (Throwable t) {
log.error("Exception at read: " + t.getMessage());
throw t;
}
}*/
}
public synchronized void removeFile(String fileName) {
@ -195,11 +189,12 @@ public class FileManager<T extends PersistableEnvelope> {
ObjectOutputStream objectOutputStream = null;
PrintWriter printWriter = null;
// is it a protobuffer thing?
log.error("persistable.class " + persistable.getClass().getSimpleName());
log.error("persistable " + persistable);
PB.PersistableEnvelope protoPersistable = null;
try {
protoPersistable = (PB.PersistableEnvelope) persistable.toProtoMessage();
log.error("protoPersistable " + protoPersistable);
} catch (Throwable e) {
log.debug("Not protobufferable: {}, {}, {}", persistable.getClass().getSimpleName(), storageFile, e.getStackTrace());
}

View File

@ -64,7 +64,7 @@ public class Storage<T extends PersistableEnvelope> {
private final File dir;
private FileManager<T> fileManager;
private File storageFile;
private T serializable;
private T persistable;
private String fileName;
private int numMaxBackupFiles = 10;
private final PersistenceProtoResolver persistenceProtoResolver;
@ -102,13 +102,13 @@ public class Storage<T extends PersistableEnvelope> {
}
@Nullable
public T initAndGetPersisted(T serializable) {
return initAndGetPersisted(serializable, serializable.getClass().getSimpleName());
public T initAndGetPersisted(T persistable) {
return initAndGetPersisted(persistable, persistable.getClass().getSimpleName());
}
@Nullable
public T initAndGetPersisted(T serializable, String fileName) {
this.serializable = serializable;
public T initAndGetPersisted(T persistable, String fileName) {
this.persistable = persistable;
this.fileName = fileName;
storageFile = new File(dir, fileName);
fileManager = new FileManager<>(dir, storageFile, 600, persistenceProtoResolver);
@ -117,11 +117,11 @@ public class Storage<T extends PersistableEnvelope> {
}
public void queueUpForSave() {
queueUpForSave(serializable);
queueUpForSave(persistable);
}
public void queueUpForSave(long delayInMilli) {
queueUpForSave(serializable, delayInMilli);
queueUpForSave(persistable, delayInMilli);
}
public void setNumMaxBackupFiles(int numMaxBackupFiles) {
@ -129,25 +129,25 @@ public class Storage<T extends PersistableEnvelope> {
}
// Save delayed and on a background thread
public void queueUpForSave(T serializable) {
if (serializable != null) {
public void queueUpForSave(T persistable) {
if (persistable != null) {
log.trace("save " + fileName);
checkNotNull(storageFile, "storageFile = null. Call setupFileStorage before using read/write.");
fileManager.saveLater(serializable);
fileManager.saveLater(persistable);
} else {
log.trace("queueUpForSave called but no serializable set");
log.trace("queueUpForSave called but no persistable set");
}
}
public void queueUpForSave(T serializable, long delayInMilli) {
if (serializable != null) {
public void queueUpForSave(T persistable, long delayInMilli) {
if (persistable != null) {
log.trace("save " + fileName);
checkNotNull(storageFile, "storageFile = null. Call setupFileStorage before using read/write.");
fileManager.saveLater(serializable, delayInMilli);
fileManager.saveLater(persistable, delayInMilli);
} else {
log.trace("queueUpForSave called but no serializable set");
log.trace("queueUpForSave called but no persistable set");
}
}

View File

@ -800,7 +800,7 @@ message PersistableEnvelope {
oneof message {
AddressEntryList address_entry_list = 1;
ViewPathAsString view_path_as_string = 2;
PeersList peers_list = 3;
PeerList peer_list = 3;
Preferences preferences = 4;
UserPayload user_payload = 5;
CompensationRequestPayload compensation_request_payload = 6;
@ -1155,8 +1155,8 @@ message Transaction {
bytes hash = 1;
}
message PeersList {
repeated Peer peers = 1;
message PeerList {
repeated Peer peer = 1;
}
message PersistedEntryMap {

View File

@ -67,7 +67,6 @@ public final class AddressEntry implements PersistablePayload {
@Getter
private final byte[] pubKeyHash;
@Nullable
private long coinLockedInMultiSig;
@Nullable
@ -96,12 +95,16 @@ public final class AddressEntry implements PersistablePayload {
pubKeyHash = keyPair.getPubKeyHash();
}
// called from Resolver
public AddressEntry(byte[] pubKey,
///////////////////////////////////////////////////////////////////////////////////////////
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////
private AddressEntry(byte[] pubKey,
byte[] pubKeyHash,
Context context,
@Nullable String offerId,
@Nullable Coin coinLockedInMultiSig) {
Coin coinLockedInMultiSig) {
this.pubKey = pubKey;
this.pubKeyHash = pubKeyHash;
this.context = context;
@ -109,6 +112,30 @@ public final class AddressEntry implements PersistablePayload {
this.coinLockedInMultiSig = coinLockedInMultiSig.value;
}
public static AddressEntry fromProto(PB.AddressEntry proto) {
return new AddressEntry(proto.getPubKey().toByteArray(),
proto.getPubKeyHash().toByteArray(),
AddressEntry.Context.valueOf(proto.getContext().name()),
proto.getOfferId().isEmpty() ? null : proto.getOfferId(),
Coin.valueOf(proto.getCoinLockedInMultiSig()));
}
@Override
public Message toProtoMessage() {
PB.AddressEntry.Builder builder = PB.AddressEntry.newBuilder()
.setPubKey(ByteString.copyFrom(pubKey))
.setPubKeyHash(ByteString.copyFrom(pubKeyHash))
.setContext(PB.AddressEntry.Context.valueOf(context.name()))
.setCoinLockedInMultiSig(coinLockedInMultiSig);
Optional.ofNullable(offerId).ifPresent(builder::setOfferId);
return builder.build();
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
// Set after wallet is ready
public void setDeterministicKey(DeterministicKey deterministicKey) {
this.keyPair = deterministicKey;
@ -127,11 +154,6 @@ public final class AddressEntry implements PersistablePayload {
this.coinLockedInMultiSig = coinLockedInMultiSig.value;
}
///////////////////////////////////////////////////////////////////////////////////////////
// Getters
///////////////////////////////////////////////////////////////////////////////////////////
// For display we usually only display the first 8 characters.
@Nullable
public String getShortOfferId() {
@ -176,15 +198,4 @@ public final class AddressEntry implements PersistablePayload {
", address=" + getAddressString() +
'}';
}
@Override
public Message toProtoMessage() {
PB.AddressEntry.Builder builder = PB.AddressEntry.newBuilder()
.setContext(PB.AddressEntry.Context.valueOf(context.name()))
.setPubKey(ByteString.copyFrom(pubKey))
.setCoinLockedInMultiSig(coinLockedInMultiSig)
.setPubKeyHash(ByteString.copyFrom(pubKeyHash));
Optional.ofNullable(offerId).ifPresent(builder::setOfferId);
return builder.build();
}
}

View File

@ -23,7 +23,6 @@ import io.bisq.common.proto.persistable.PersistableEnvelope;
import io.bisq.common.storage.Storage;
import io.bisq.generated.protobuffer.PB;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.bitcoinj.crypto.DeterministicKey;
@ -40,18 +39,33 @@ import java.util.stream.Stream;
@ToString
@Slf4j
public final class AddressEntryList implements PersistableEnvelope {
final transient private Storage<AddressEntryList> storage;
transient private Storage<AddressEntryList> storage;
transient private Wallet wallet;
@Getter
private List<AddressEntry> list = new ArrayList<>();
@Setter
private boolean doPersist;
@Inject
public AddressEntryList(Storage<AddressEntryList> storage) {
this.storage = storage;
}
///////////////////////////////////////////////////////////////////////////////////////////
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////
private AddressEntryList(List<AddressEntry> list) {
this.list = list;
}
public static AddressEntryList fromProto(PB.AddressEntryList proto) {
return new AddressEntryList(proto.getAddressEntryList().stream().map(AddressEntry::fromProto).collect(Collectors.toList()));
}
///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////
public void onWalletReady(Wallet wallet) {
this.wallet = wallet;
@ -66,7 +80,6 @@ public final class AddressEntryList implements PersistableEnvelope {
}
}
} else {
doPersist = true;
add(new AddressEntry(wallet.freshReceiveKey(), AddressEntry.Context.ARBITRATOR));
persist();
}
@ -110,7 +123,6 @@ public final class AddressEntryList implements PersistableEnvelope {
}
public void persist() {
if (doPersist)
storage.queueUpForSave(50);
}
@ -123,4 +135,5 @@ public final class AddressEntryList implements PersistableEnvelope {
.build();
return build;
}
}

View File

@ -8,7 +8,6 @@ import io.bisq.common.proto.persistable.PersistableList;
import io.bisq.common.proto.persistable.PersistableViewPath;
import io.bisq.common.proto.persistable.PersistenceProtoResolver;
import io.bisq.common.storage.Storage;
import io.bisq.core.btc.AddressEntry;
import io.bisq.core.btc.AddressEntryList;
import io.bisq.core.btc.wallet.BtcWalletService;
import io.bisq.core.dao.compensation.CompensationRequestPayload;
@ -21,10 +20,9 @@ import io.bisq.core.user.BlockChainExplorer;
import io.bisq.core.user.Preferences;
import io.bisq.core.user.UserPayload;
import io.bisq.generated.protobuffer.PB;
import io.bisq.network.p2p.peers.peerexchange.Peer;
import io.bisq.network.p2p.peers.peerexchange.PeerList;
import io.bisq.network.p2p.storage.SequenceNumberMap;
import lombok.extern.slf4j.Slf4j;
import org.bitcoinj.core.Coin;
import javax.inject.Inject;
import javax.inject.Named;
@ -34,7 +32,6 @@ import java.util.stream.Collectors;
@Slf4j
public class CorePersistenceProtoResolver extends CoreProtoResolver implements PersistenceProtoResolver {
private final Provider<AddressEntryList> addressEntryListProvider;
private final Provider<Preferences> preferencesProvider;
private final Storage<TradableList<OpenOffer>> openOfferStorage;
private final Storage<TradableList<BuyerAsMakerTrade>> buyerAsMakerTradeStorage;
@ -45,11 +42,9 @@ public class CorePersistenceProtoResolver extends CoreProtoResolver implements P
@Inject
public CorePersistenceProtoResolver(Provider<Preferences> preferencesProvider,
Provider<AddressEntryList> addressEntryListProvider,
Provider<BtcWalletService> btcWalletService,
@Named(Storage.STORAGE_DIR) File storageDir) {
this.preferencesProvider = preferencesProvider;
this.addressEntryListProvider = addressEntryListProvider;
this.btcWalletService = btcWalletService;
openOfferStorage = new Storage<>(storageDir, this);
@ -65,13 +60,13 @@ public class CorePersistenceProtoResolver extends CoreProtoResolver implements P
switch (proto.getMessageCase()) {
case ADDRESS_ENTRY_LIST:
return fillAddressEntryList(proto, addressEntryListProvider.get());
return AddressEntryList.fromProto(proto.getAddressEntryList());
case VIEW_PATH_AS_STRING:
return PersistableViewPath.fromProto(proto.getViewPathAsString());
case TRADABLE_LIST:
return getTradableList(proto.getTradableList());
case PEERS_LIST:
return getPeersList(proto.getPeersList());
case PEER_LIST:
return PeerList.fromProto(proto.getPeerList());
case COMPENSATION_REQUEST_PAYLOAD:
// TODO There will be another object for PersistableEnvelope
return CompensationRequestPayload.fromProto(proto.getCompensationRequestPayload());
@ -98,11 +93,6 @@ public class CorePersistenceProtoResolver extends CoreProtoResolver implements P
.map(TradeStatistics::fromProto).collect(Collectors.toList()));
}
private PersistableEnvelope getPeersList(PB.PeersList envelope) {
return new PersistableList<>(envelope.getPeersList().stream().map(Peer::fromProto)
.collect(Collectors.toList()));
}
private Preferences fillPreferences(PB.PersistableEnvelope envelope, Preferences preferences) {
final PB.Preferences env = envelope.getPreferences();
preferences.setUserLanguage(env.getUserLanguage());
@ -160,17 +150,4 @@ public class CorePersistenceProtoResolver extends CoreProtoResolver implements P
preferences.setDoPersist(true);
return preferences;
}
private AddressEntryList fillAddressEntryList(PB.PersistableEnvelope envelope, AddressEntryList addressEntryList) {
envelope.getAddressEntryList().getAddressEntryList().stream().forEach(addressEntry -> {
final AddressEntry entry = new AddressEntry(addressEntry.getPubKey().toByteArray(),
addressEntry.getPubKeyHash().toByteArray(),
AddressEntry.Context.valueOf(addressEntry.getContext().name()),
addressEntry.getOfferId(),
Coin.valueOf(addressEntry.getCoinLockedInMultiSig()));
addressEntryList.addAddressEntry(entry);
});
addressEntryList.setDoPersist(true);
return addressEntryList;
}
}

View File

@ -1,18 +1,15 @@
package io.bisq.network.p2p.peers;
import com.google.protobuf.Message;
import io.bisq.common.Clock;
import io.bisq.common.Timer;
import io.bisq.common.UserThread;
import io.bisq.common.app.Log;
import io.bisq.common.proto.ProtoCollectionUtil;
import io.bisq.common.proto.persistable.PersistableList;
import io.bisq.common.proto.persistable.PersistenceProtoResolver;
import io.bisq.common.storage.Storage;
import io.bisq.generated.protobuffer.PB;
import io.bisq.network.p2p.NodeAddress;
import io.bisq.network.p2p.network.*;
import io.bisq.network.p2p.peers.peerexchange.Peer;
import io.bisq.network.p2p.peers.peerexchange.PeerList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -21,7 +18,6 @@ import java.io.File;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
public class PeerManager implements ConnectionListener {
@ -78,7 +74,7 @@ public class PeerManager implements ConnectionListener {
private final NetworkNode networkNode;
private final Clock clock;
private final Set<NodeAddress> seedNodeAddresses;
private final Storage<PersistableList<Peer>> dbStorage;
private final Storage<PeerList> dbStorage;
private final HashSet<Peer> persistedPeers = new HashSet<>();
private final Set<Peer> reportedPeers = new HashSet<>();
@ -101,10 +97,10 @@ public class PeerManager implements ConnectionListener {
this.seedNodeAddresses = new HashSet<>(seedNodeAddresses);
networkNode.addConnectionListener(this);
dbStorage = new Storage<>(storageDir, persistenceProtoResolver);
PersistableList persistedList = dbStorage.initAndGetPersistedWithFileName("PersistedPeers");
if (persistedList != null) {
log.debug("We have persisted reported list. persistedList.size()=" + persistedList.getList().size());
this.persistedPeers.addAll(persistedList.getList());
PeerList persistedPeerList = dbStorage.initAndGetPersistedWithFileName("PeerList");
if (persistedPeerList != null) {
log.debug("We have persisted reported list. persistedPeerList.size()=" + persistedPeerList.getList().size());
this.persistedPeers.addAll(persistedPeerList.getList());
}
// we check if app was idle for more then 5 sec.
@ -394,7 +390,7 @@ public class PeerManager implements ConnectionListener {
persistedPeers.addAll(reportedPeersToAdd);
purgePersistedPeersIfExceeds();
if (dbStorage != null)
dbStorage.queueUpForSave(new PersistableList(persistedPeers, getListToProto()), 2000); // We clone it to avoid ConcurrentModificationExceptions at save
dbStorage.queueUpForSave(new PeerList(new ArrayList<>(persistedPeers)), 2000);
printReportedPeers();
} else {
@ -459,10 +455,8 @@ public class PeerManager implements ConnectionListener {
if (persistedPeers.contains(persistedPeer)) {
persistedPeers.remove(persistedPeer);
if (dbStorage != null) {
PersistableList serializable = new PersistableList(persistedPeers, getListToProto());
dbStorage.queueUpForSave(serializable, 2000);
}
if (dbStorage != null)
dbStorage.queueUpForSave(new PeerList(new ArrayList<>(persistedPeers)), 2000);
return true;
} else {
@ -631,10 +625,4 @@ public class PeerManager implements ConnectionListener {
log.debug(result.toString());
}
}
private Function<List<Peer>, Message> getListToProto() {
return (List<Peer> list) -> {
return PB.PersistableEnvelope.newBuilder().setPeersList(PB.PeersList.newBuilder().addAllPeers(ProtoCollectionUtil.collectionToProto(list))).build();
};
}
}

View File

@ -0,0 +1,46 @@
/*
* This file is part of bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package io.bisq.network.p2p.peers.peerexchange;
import com.google.protobuf.Message;
import io.bisq.common.proto.persistable.PersistableEnvelope;
import io.bisq.common.proto.persistable.PersistableList;
import io.bisq.generated.protobuffer.PB;
import java.util.List;
import java.util.stream.Collectors;
public class PeerList extends PersistableList<Peer> {
public PeerList(List<Peer> list) {
super(list);
}
@Override
public Message toProtoMessage() {
return PB.PersistableEnvelope.newBuilder()
.setPeerList(PB.PeerList.newBuilder()
.addAllPeer(getList().stream().map(Peer::toProtoMessage).collect(Collectors.toList())))
.build();
}
public static PersistableEnvelope fromProto(PB.PeerList proto) {
return new PeerList(proto.getPeerList().stream().map(Peer::fromProto)
.collect(Collectors.toList()));
}
}

View File

@ -16,7 +16,6 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.security.*;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -207,7 +206,7 @@ public class TestUtils {
}
@Override
public Optional<PersistableEnvelope> fromProto(PB.PersistableEnvelope persistable) {
public PersistableEnvelope fromProto(PB.PersistableEnvelope persistable) {
return null;
}
};