BlockFileLoader: refactor to use single-file Block iterator internally

* Replace iterator that iterates all blocks in a file list with iterator
  that iterates all blocks in a single file
* Rewrite stream() to use flatMap() over the stream of files, mapping each file
  to a stream of its blocks (internally using BlockFileIterator)
* Rewrite iterator() to use stream()

This is a step towards a standalone capability to iterate/stream the blocks
in a single .dat file. It is also a step towards higher-performance
implementations that can use multiple threads.
This commit is contained in:
Sean Gilligan 2023-08-13 16:28:05 -07:00 committed by Andreas Schildbach
parent 18126e368b
commit 7776f7c907

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2012 Matt Corallo. * Copyright by the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -33,6 +33,8 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
@ -43,7 +45,7 @@ import static org.bitcoinj.base.internal.Preconditions.checkArgument;
* blocks together. Importing block data with this tool can be a lot faster than syncing over the network, if you * blocks together. Importing block data with this tool can be a lot faster than syncing over the network, if you
* have the files available.</p> * have the files available.</p>
* *
* <p>In order to comply with {@link Iterable}, {@link BlockIterator} swallows a lot of {@link IOException}s, which may result in a few * <p>In order to comply with {@link Iterable}, {@link BlockFileIterator} swallows a lot of {@link IOException}s, which may result in a few
* blocks being missed followed by a huge set of orphan blocks.</p> * blocks being missed followed by a huge set of orphan blocks.</p>
* *
* <p>To blindly import all files which can be found in Bitcoin Core (version 0.8 or higher) datadir automatically, * <p>To blindly import all files which can be found in Bitcoin Core (version 0.8 or higher) datadir automatically,
@ -111,14 +113,17 @@ public class BlockFileLoader implements Iterable<Block> {
} }
public class BlockIterator implements Iterator<Block> { /**
private final Iterator<File> fileIt; * Iterates all the blocks in a single block file.
private File file = null; */
private FileInputStream currentFileStream = null; public class BlockFileIterator implements Iterator<Block> {
private Block nextBlock = null; private final File file;
private final FileInputStream currentFileStream;
private org.bitcoinj.core.Block nextBlock = null;
public BlockIterator(List<File> fileList) { public BlockFileIterator(File blockFile) throws FileNotFoundException {
this.fileIt = fileList.iterator(); this.file = blockFile;
currentFileStream = new FileInputStream(blockFile);
} }
@Override @Override
@ -138,69 +143,41 @@ public class BlockFileLoader implements Iterable<Block> {
} }
private void loadNextBlock() { private void loadNextBlock() {
while (true) { try {
try { if (currentFileStream.available() < 1) {
if (!fileIt.hasNext() && (currentFileStream == null || currentFileStream.available() < 1)) nextBlock = null;
break; return;
} catch (IOException e) {
currentFileStream = null;
if (!fileIt.hasNext())
break;
} }
while (true) { int nextChar = currentFileStream.read();
try { while (nextChar != -1) {
if (currentFileStream != null && currentFileStream.available() > 0) if (nextChar != ((packetMagic >>> 24) & 0xff)) {
break;
} catch (IOException e1) {
currentFileStream = null;
}
if (!fileIt.hasNext()) {
nextBlock = null;
currentFileStream = null;
return;
}
file = fileIt.next();
try {
currentFileStream = new FileInputStream(file);
} catch (FileNotFoundException e) {
currentFileStream = null;
}
}
try {
int nextChar = currentFileStream.read();
while (nextChar != -1) {
if (nextChar != ((packetMagic >>> 24) & 0xff)) {
nextChar = currentFileStream.read();
continue;
}
nextChar = currentFileStream.read(); nextChar = currentFileStream.read();
if (nextChar != ((packetMagic >>> 16) & 0xff))
continue;
nextChar = currentFileStream.read();
if (nextChar != ((packetMagic >>> 8) & 0xff))
continue;
nextChar = currentFileStream.read();
if (nextChar == (packetMagic & 0xff))
break;
}
byte[] bytes = new byte[4];
currentFileStream.read(bytes, 0, 4);
long size = ByteUtils.readUint32(bytes, 0);
bytes = new byte[(int) size];
currentFileStream.read(bytes, 0, (int) size);
try {
nextBlock = serializer.makeBlock(ByteBuffer.wrap(bytes));
} catch (ProtocolException e) {
nextBlock = null;
continue; continue;
} catch (Exception e) {
throw new RuntimeException("unexpected problem with block in " + file, e);
} }
break; nextChar = currentFileStream.read();
} catch (IOException e) { if (nextChar != ((packetMagic >>> 16) & 0xff))
currentFileStream = null; continue;
continue; nextChar = currentFileStream.read();
if (nextChar != ((packetMagic >>> 8) & 0xff))
continue;
nextChar = currentFileStream.read();
if (nextChar == (packetMagic & 0xff))
break;
} }
byte[] bytes = new byte[4];
currentFileStream.read(bytes, 0, 4);
long size = ByteUtils.readUint32(bytes, 0);
bytes = new byte[(int) size];
currentFileStream.read(bytes, 0, (int) size);
try {
nextBlock = serializer.makeBlock(ByteBuffer.wrap(bytes));
} catch (ProtocolException e) {
nextBlock = null;
} catch (Exception e) {
throw new RuntimeException("unexpected problem with block in " + file, e);
}
} catch (IOException e) {
nextBlock = null;
} }
} }
@ -212,10 +189,25 @@ public class BlockFileLoader implements Iterable<Block> {
@Override @Override
public Iterator<Block> iterator() { public Iterator<Block> iterator() {
return new BlockIterator(files); return stream().iterator();
} }
public Stream<Block> stream() { public Stream<Block> stream() {
return StreamSupport.stream(spliterator(), false); return files.stream()
.flatMap(this::fileBlockStream);
}
protected Stream<Block> fileBlockStream(File file) {
return StreamSupport.stream(fileBlockSpliterator(file), false);
}
protected Spliterator<Block> fileBlockSpliterator(File file) {
try {
Iterator<Block> iterator = new BlockFileIterator(file);
int characteristics = Spliterator.DISTINCT | Spliterator.ORDERED;
return Spliterators.spliteratorUnknownSize(iterator, characteristics);
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
} }
} }