new stxo struct and more db methods

I'm getting away from having both in-ram and on-disk stores for the transaction store data.
it should all be on disk, it's safer that way.  It might be slower but this will not
process many txs / second anyway.
This commit is contained in:
Tadge Dryja 2016-01-27 01:24:16 -08:00
parent 851d3533e5
commit d9afd623eb
4 changed files with 225 additions and 49 deletions

View File

@ -222,10 +222,19 @@ func (s *SPVCon) AskForTx(txid wire.ShaHash) {
// We don't have it in our header file so when we get it we do both operations:
// appending and checking the header, and checking spv proofs
func (s *SPVCon) AskForBlock(hsh wire.ShaHash) {
fmt.Printf("mBlockQueue len %d\n", len(s.mBlockQueue))
// wait until all mblocks are done before adding
for len(s.mBlockQueue) != 0 {
// fmt.Printf("mBlockQueue len %d\n", len(s.mBlockQueue))
}
gdata := wire.NewMsgGetData()
inv := wire.NewInvVect(wire.InvTypeFilteredBlock, &hsh)
gdata.AddInvVect(inv)
// TODO - wait until headers are sync'd before checking height
info, err := s.headerFile.Stat() // get
if err != nil {
log.Fatal(err) // crash if header file disappears
@ -437,7 +446,11 @@ func (s *SPVCon) AskForMerkBlocks(current, last int32) error {
}
fmt.Printf("will request merkleblocks %d to %d\n", current, last)
// track number of utxos
track := len(s.TS.Utxos)
track, err := s.TS.NumUtxos()
if err != nil {
return err
}
// create initial filter
filt, err := s.TS.GimmeFilter()
if err != nil {
@ -454,15 +467,20 @@ func (s *SPVCon) AskForMerkBlocks(current, last int32) error {
// loop through all heights where we want merkleblocks.
for current < last {
// check if we need to update filter... diff of 5 utxos...?
if track < len(s.TS.Utxos)-4 || track > len(s.TS.Utxos)+4 {
track = len(s.TS.Utxos)
nTrack, err := s.TS.NumUtxos()
if err != nil {
return err
}
if track < nTrack-4 || track > nTrack+4 {
track = nTrack
filt, err := s.TS.GimmeFilter()
if err != nil {
return err
}
s.SendFilter(filt)
fmt.Printf("sent filter %x\n", filt.MsgFilterLoad().Filter)
fmt.Printf("sent %d byte filter\n", len(filt.MsgFilterLoad().Filter))
}
// load header from file

View File

@ -54,18 +54,14 @@ func (s *SPVCon) incomingMessageHandler() {
log.Printf("Rejected! cmd: %s code: %s tx: %s reason: %s",
m.Cmd, m.Code.String(), m.Hash.String(), m.Reason)
case *wire.MsgInv:
log.Printf("got inv. Contains:\n")
go s.InvHandler(m)
case *wire.MsgNotFound:
log.Printf("Got not found response from remote:")
for i, thing := range m.InvList {
log.Printf("\t%d)%s : %s",
i, thing.Type.String(), thing.Hash.String())
if thing.Type == wire.InvTypeTx { // new tx, ingest
s.TS.OKTxids[thing.Hash] = 0 // unconfirmed
s.AskForTx(thing.Hash)
}
if thing.Type == wire.InvTypeBlock { // new block, ingest
s.AskForBlock(thing.Hash)
}
log.Printf("\t$d) %s: %s", i, thing.Type, thing.Hash)
}
default:
log.Printf("Got unknown message type %s\n", m.Command())
}
@ -86,3 +82,18 @@ func (s *SPVCon) outgoingMessageHandler() {
}
return
}
func (s *SPVCon) InvHandler(m *wire.MsgInv) {
log.Printf("got inv. Contains:\n")
for i, thing := range m.InvList {
log.Printf("\t%d)%s : %s",
i, thing.Type.String(), thing.Hash.String())
if thing.Type == wire.InvTypeTx { // new tx, ingest
s.TS.OKTxids[thing.Hash] = 0 // unconfirmed
s.AskForTx(thing.Hash)
}
if thing.Type == wire.InvTypeBlock { // new block, ingest
s.AskForBlock(thing.Hash)
}
}
}

View File

@ -32,12 +32,21 @@ type TxStore struct {
}
type Utxo struct { // cash money.
Op wire.OutPoint // where
// all the info needed to spend
AtHeight int32 // block height where this tx was confirmed, 0 for unconf
KeyIdx uint32 // index for private key needed to sign / spend
Value int64 // higher is better
Op wire.OutPoint // where
// IsCoinbase bool // can't spend for a while
}
// Stxo is a utxo that has moved on.
type Stxo struct {
Utxo // when it used to be a utxo
SpendHeight int32 // height at which it met its demise
SpendTxid wire.ShaHash // the tx that consumed it
}
type MyAdr struct { // an address I have the private key for
@ -79,7 +88,12 @@ func (t *TxStore) GimmeFilter() (*bloom.Filter, error) {
return nil, fmt.Errorf("no addresses to filter for")
}
// add addresses to look for incoming
elem := uint32(len(t.Adrs) + len(t.Utxos))
nutxo, err := t.NumUtxos()
if err != nil {
return nil, err
}
elem := uint32(len(t.Adrs)) + nutxo
f := bloom.NewFilter(elem, 0, 0.001, wire.BloomUpdateAll)
for _, a := range t.Adrs {
f.Add(a.PkhAdr.ScriptAddress())
@ -121,7 +135,7 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error {
newTxid := tx.TxSha()
var hits uint32 // how many outputs of this tx are ours
var acq int64 // total acquirement from this tx
// check if any of the tx's outputs match my adrs
// check if any of the tx's outputs match my known outpoints
for i, out := range tx.TxOut { // in each output of tx
dup := false // start by assuming its new until found duplicate
newOp := wire.NewOutPoint(&newTxid, uint32(i))
@ -133,7 +147,7 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error {
fmt.Printf(" %s is dupe\t", newOp.String())
u.AtHeight = height // ONLY difference is height
// save modified utxo to db, overwriting old one
err := u.SaveToDB(t.StateDB)
err := t.SaveUtxo(u)
if err != nil {
return err
}
@ -145,15 +159,15 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error {
// when it matches an address, just go to the next outpoint
continue
}
// check if this is a new txout matching one of my addresses
for _, a := range t.Adrs { // compare to each adr we have
// check for full script to eliminate false positives
aPKscript, err := txscript.PayToAddrScript(a.PkhAdr)
if err != nil {
return err
}
// already checked for dupes, this must be a new outpoint
if bytes.Equal(out.PkScript, aPKscript) { // hit
// already checked for dupes, so this must be a new outpoint
var newu Utxo
newu.AtHeight = height
newu.KeyIdx = a.KeyIdx
@ -163,7 +177,7 @@ func (t *TxStore) AbsorbTx(tx *wire.MsgTx, height int32) error {
newop.Hash = tx.TxSha()
newop.Index = uint32(i)
newu.Op = newop
err = newu.SaveToDB(t.StateDB)
err = t.SaveUtxo(&newu)
if err != nil {
return err
}
@ -193,7 +207,7 @@ func (t *TxStore) ExpellTx(tx *wire.MsgTx, height int32) error {
if OutPointsEqual(myutxo.Op, in.PreviousOutPoint) {
hits++
loss += myutxo.Value
err := t.MarkSpent(&myutxo.Op, height, tx)
err := t.MarkSpent(*myutxo, height, tx)
if err != nil {
return err
}

View File

@ -14,7 +14,8 @@ import (
var (
BKTUtxos = []byte("DuffelBag") // leave the rest to collect interest
BKTOld = []byte("SpentTxs") // for bookkeeping
BKTStxos = []byte("SpentTxs") // for bookkeeping
BKTTxns = []byte("Txns") // all txs we care about, for replays
BKTState = []byte("MiscState") // last state of DB
KEYNumKeys = []byte("NumKeys") // number of keys used
@ -27,16 +28,20 @@ func (ts *TxStore) OpenDB(filename string) error {
return err
}
// create buckets if they're not already there
return ts.StateDB.Update(func(tx *bolt.Tx) error {
_, err = tx.CreateBucketIfNotExists(BKTUtxos)
return ts.StateDB.Update(func(btx *bolt.Tx) error {
_, err = btx.CreateBucketIfNotExists(BKTUtxos)
if err != nil {
return err
}
_, err = tx.CreateBucketIfNotExists(BKTOld)
_, err = btx.CreateBucketIfNotExists(BKTStxos)
if err != nil {
return err
}
_, err = tx.CreateBucketIfNotExists(BKTState)
_, err = btx.CreateBucketIfNotExists(BKTTxns)
if err != nil {
return err
}
_, err = btx.CreateBucketIfNotExists(BKTState)
if err != nil {
return err
}
@ -69,8 +74,8 @@ func (ts *TxStore) NewAdr() (*btcutil.AddressPubKeyHash, error) {
}
// write to db file
err = ts.StateDB.Update(func(tx *bolt.Tx) error {
stt := tx.Bucket(BKTState)
err = ts.StateDB.Update(func(btx *bolt.Tx) error {
stt := btx.Bucket(BKTState)
return stt.Put(KEYNumKeys, buf.Bytes())
})
if err != nil {
@ -81,6 +86,24 @@ func (ts *TxStore) NewAdr() (*btcutil.AddressPubKeyHash, error) {
return newAdr, nil
}
// NumUtxos returns the number of utxos in the DB.
func (ts *TxStore) NumUtxos() (uint32, error) {
var n uint32
err := ts.StateDB.View(func(btx *bolt.Tx) error {
duf := btx.Bucket(BKTUtxos)
if duf == nil {
return fmt.Errorf("no duffel bag")
}
stats := duf.Stats()
n = uint32(stats.KeyN)
return nil
})
if err != nil {
return 0, err
}
return n, nil
}
// PopulateAdrs just puts a bunch of adrs in ram; it doesn't touch the DB
func (ts *TxStore) PopulateAdrs(lastKey uint32) error {
for k := uint32(0); k < lastKey; k++ {
@ -101,10 +124,9 @@ func (ts *TxStore) PopulateAdrs(lastKey uint32) error {
}
// SaveToDB write a utxo to disk, overwriting an old utxo of the same outpoint
func (u *Utxo) SaveToDB(dbx *bolt.DB) error {
err := dbx.Update(func(tx *bolt.Tx) error {
duf := tx.Bucket(BKTUtxos)
func (ts *TxStore) SaveUtxo(u *Utxo) error {
err := ts.StateDB.Update(func(btx *bolt.Tx) error {
duf := btx.Bucket(BKTUtxos)
b, err := u.ToBytes()
if err != nil {
return err
@ -124,26 +146,47 @@ func (u *Utxo) SaveToDB(dbx *bolt.DB) error {
return nil
}
func (ts *TxStore) MarkSpent(op *wire.OutPoint, h int32, stx *wire.MsgTx) error {
func (ts *TxStore) MarkSpent(ut Utxo, h int32, stx *wire.MsgTx) error {
// we write in key = outpoint (32 hash, 4 index)
// value = spending txid
// if we care about the spending tx we can store that in another bucket.
return ts.StateDB.Update(func(tx *bolt.Tx) error {
old := tx.Bucket(BKTOld)
opb, err := outPointToBytes(op)
var st Stxo
st.Utxo = ut
st.SpendHeight = h
st.SpendTxid = stx.TxSha()
return ts.StateDB.Update(func(btx *bolt.Tx) error {
duf := btx.Bucket(BKTUtxos)
old := btx.Bucket(BKTStxos)
txns := btx.Bucket(BKTTxns)
opb, err := outPointToBytes(&st.Op)
if err != nil {
return err
}
var buf bytes.Buffer
err = binary.Write(&buf, binary.BigEndian, h)
err = duf.Delete(opb) // not utxo anymore
if err != nil {
return err
}
stxb, err := st.ToBytes()
if err != nil {
return err
}
err = old.Put(opb, stxb) // write k:v outpoint:stxo bytes
if err != nil {
return err
}
// store spending tx
sha := stx.TxSha()
err = old.Put(opb, sha.Bytes()) // write k:v outpoint:txid
if err != nil {
return err
}
var buf bytes.Buffer
stx.Serialize(&buf)
txns.Put(sha.Bytes(), buf.Bytes())
return nil
})
}
@ -155,16 +198,16 @@ func (ts *TxStore) LoadFromDB() error {
if ts.rootPrivKey == nil {
return fmt.Errorf("LoadFromDB needs rootPrivKey loaded")
}
return ts.StateDB.View(func(tx *bolt.Tx) error {
duf := tx.Bucket(BKTUtxos)
return ts.StateDB.View(func(btx *bolt.Tx) error {
duf := btx.Bucket(BKTUtxos)
if duf == nil {
return fmt.Errorf("no duffel bag")
}
spent := tx.Bucket(BKTOld)
spent := btx.Bucket(BKTStxos)
if spent == nil {
return fmt.Errorf("no spenttx bucket")
}
state := tx.Bucket(BKTState)
state := btx.Bucket(BKTState)
if state == nil {
return fmt.Errorf("no state bucket")
}
@ -268,8 +311,8 @@ func UtxoFromBytes(b []byte) (Utxo, error) {
return u, fmt.Errorf("nil input slice")
}
buf := bytes.NewBuffer(b)
if buf.Len() < 52 { // minimum 52 bytes with no pkscript
return u, fmt.Errorf("Got %d bytes for sender, expect > 52", buf.Len())
if buf.Len() < 52 { // utxos are 52 bytes
return u, fmt.Errorf("Got %d bytes for utxo, expect 52", buf.Len())
}
// read 32 byte txid
err := u.Op.Hash.SetBytes(buf.Next(32))
@ -298,3 +341,93 @@ func UtxoFromBytes(b []byte) (Utxo, error) {
}
return u, nil
}
// ToBytes turns an Stxo into some bytes.
// outpoint txid, outpoint idx, height, key idx, amt, spendheight, spendtxid
func (s *Stxo) ToBytes() ([]byte, error) {
var buf bytes.Buffer
// write 32 byte txid of the utxo
_, err := buf.Write(s.Op.Hash.Bytes())
if err != nil {
return nil, err
}
// write 4 byte outpoint index within the tx to spend
err = binary.Write(&buf, binary.BigEndian, s.Op.Index)
if err != nil {
return nil, err
}
// write 4 byte height of utxo
err = binary.Write(&buf, binary.BigEndian, s.AtHeight)
if err != nil {
return nil, err
}
// write 4 byte key index of utxo
err = binary.Write(&buf, binary.BigEndian, s.KeyIdx)
if err != nil {
return nil, err
}
// write 8 byte amount of money at the utxo
err = binary.Write(&buf, binary.BigEndian, s.Value)
if err != nil {
return nil, err
}
// write 4 byte height where the txo was spent
err = binary.Write(&buf, binary.BigEndian, s.SpendHeight)
if err != nil {
return nil, err
}
// write 32 byte txid of the spending transaction
_, err = buf.Write(s.SpendTxid.Bytes())
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// StxoFromBytes turns bytes into a Stxo.
func StxoFromBytes(b []byte) (Stxo, error) {
var s Stxo
if b == nil {
return s, fmt.Errorf("nil input slice")
}
buf := bytes.NewBuffer(b)
if buf.Len() < 88 { // stxos are 88 bytes
return s, fmt.Errorf("Got %d bytes for stxo, expect 88", buf.Len())
}
// read 32 byte txid
err := s.Op.Hash.SetBytes(buf.Next(32))
if err != nil {
return s, err
}
// read 4 byte outpoint index within the tx to spend
err = binary.Read(buf, binary.BigEndian, &s.Op.Index)
if err != nil {
return s, err
}
// read 4 byte height of utxo
err = binary.Read(buf, binary.BigEndian, &s.AtHeight)
if err != nil {
return s, err
}
// read 4 byte key index of utxo
err = binary.Read(buf, binary.BigEndian, &s.KeyIdx)
if err != nil {
return s, err
}
// read 8 byte amount of money at the utxo
err = binary.Read(buf, binary.BigEndian, &s.Value)
if err != nil {
return s, err
}
// read 4 byte spend height
err = binary.Read(buf, binary.BigEndian, &s.SpendHeight)
if err != nil {
return s, err
}
// read 32 byte txid
err = s.SpendTxid.SetBytes(buf.Next(32))
if err != nil {
return s, err
}
return s, nil
}