diff --git a/cleanupDB.sh b/cleanupDB.sh index a14cbd9bd..caf5dc037 100755 --- a/cleanupDB.sh +++ b/cleanupDB.sh @@ -7,4 +7,5 @@ cd ${PROJECT_DIR} && find . -name '*.db-shm' -exec rm -f {} \; cd ${PROJECT_DIR} && find . -name '*.db-wal' -exec rm -f {} \; cd ${PROJECT_DIR} && find . -name 'db.meta' -exec rm -f {} \; cd ${PROJECT_DIR} && find . -name 'public.keystore' -exec rm -f {} \; -cd ${PROJECT_DIR} && find . -name '*.public.keystore' -exec rm -f {} \; \ No newline at end of file +cd ${PROJECT_DIR} && find . -name '*.public.keystore' -exec rm -f {} \; +cd ${PROJECT_DIR} && find . -type d -name '*.ldb' -prune -exec rm -rf {} \; diff --git a/cmd/cql-minerd/integration_test.go b/cmd/cql-minerd/integration_test.go index 6c62ddc67..61fbf14e5 100644 --- a/cmd/cql-minerd/integration_test.go +++ b/cmd/cql-minerd/integration_test.go @@ -72,7 +72,6 @@ func startNodes() { log.Fatalf("wait for port ready timeout: %v", err) } - utils.CleanupDB() // start 3bps var cmd *utils.CMD if cmd, err = utils.RunCommandNB( diff --git a/cmd/cql-observer/observation_test.go b/cmd/cql-observer/observation_test.go index 6d16e7285..4db8dc2d9 100644 --- a/cmd/cql-observer/observation_test.go +++ b/cmd/cql-observer/observation_test.go @@ -73,8 +73,6 @@ func startNodes() { log.Fatalf("wait for port ready timeout: %v", err) } - utils.CleanupDB() - // start 3bps var cmd *utils.CMD if cmd, err = utils.RunCommandNB( diff --git a/sqlchain/chain.go b/sqlchain/chain.go index 894461792..8984fd8aa 100644 --- a/sqlchain/chain.go +++ b/sqlchain/chain.go @@ -37,7 +37,6 @@ import ( "github.com/CovenantSQL/CovenantSQL/utils" "github.com/CovenantSQL/CovenantSQL/utils/log" wt "github.com/CovenantSQL/CovenantSQL/worker/types" - "github.com/coreos/bbolt" "github.com/pkg/errors" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" @@ -45,14 +44,12 @@ import ( ) var ( - metaBucket = [4]byte{0x0, 0x0, 0x0, 0x0} - metaStateKey = []byte("covenantsql-state") - metaBlockIndexBucket = 
[]byte("covenantsql-block-index-bucket") - metaHeightIndexBucket = []byte("covenantsql-query-height-index-bucket") - metaRequestIndexBucket = []byte("covenantsql-query-request-index-bucket") - metaResponseIndexBucket = []byte("covenantsql-query-response-index-bucket") - metaAckIndexBucket = [4]byte{'Q', 'A', 'C', 'K'} - leveldbConf = opt.Options{} + metaState = [4]byte{'S', 'T', 'A', 'T'} + metaBlockIndex = [4]byte{'B', 'L', 'C', 'K'} + metaRequestIndex = [4]byte{'R', 'E', 'Q', 'U'} + metaResponseIndex = [4]byte{'R', 'E', 'S', 'P'} + metaAckIndex = [4]byte{'Q', 'A', 'C', 'K'} + leveldbConf = opt.Options{} ) func init() { @@ -74,10 +71,28 @@ func keyToHeight(k []byte) int32 { return int32(binary.BigEndian.Uint32(k)) } +// keyWithSymbolToHeight converts a height back from a key(ack/resp/req/block) in bytes. +// ack key: +// ['Q', 'A', 'C', 'K', height, hash] +// resp key: +// ['R', 'E', 'S', 'P', height, hash] +// req key: +// ['R', 'E', 'Q', 'U', height, hash] +// block key: +// ['B', 'L', 'C', 'K', height, hash] +func keyWithSymbolToHeight(k []byte) int32 { + if len(k) < 8 { + return -1 + } + return int32(binary.BigEndian.Uint32(k[4:])) +} + // Chain represents a sql-chain. type Chain struct { - db *bolt.DB - ldb *leveldb.DB + // bdb stores state and block + bdb *leveldb.DB + // tdb stores ack/request/response + tdb *leveldb.DB bi *blockIndex qi *queryIndex cl *rpc.Caller @@ -106,7 +121,7 @@ func NewChain(c *Config) (chain *Chain, err error) { // TODO(leventeliu): this is a rough solution, you may also want to clean database file and // force rebuilding. 
var fi os.FileInfo - if fi, err = os.Stat(c.DataFile); err == nil && fi.Mode().IsRegular() { + if fi, err = os.Stat(c.DataFile + "-block-state.ldb"); err == nil && fi.Mode().IsDir() { return LoadChain(c) } @@ -115,42 +130,26 @@ func NewChain(c *Config) (chain *Chain, err error) { return } - // Open DB file - db, err := bolt.Open(c.DataFile, 0600, nil) + // Open LevelDB for block and state + bdbFile := c.DataFile + "-block-state.ldb" + bdb, err := leveldb.OpenFile(bdbFile, &leveldbConf) if err != nil { + err = errors.Wrapf(err, "open leveldb %s", bdbFile) return } - // Create buckets for chain meta - if err = db.Update(func(tx *bolt.Tx) (err error) { - bucket, err := tx.CreateBucketIfNotExists(metaBucket[:]) - - if err != nil { - return - } - - if _, err = bucket.CreateBucketIfNotExists(metaBlockIndexBucket); err != nil { - return - } - - _, err = bucket.CreateBucketIfNotExists(metaHeightIndexBucket) - return - }); err != nil { - return - } - - // Open LevelDB - ldbFile := c.DataFile + ".ldb" - ldb, err := leveldb.OpenFile(ldbFile, &leveldbConf) + // Open LevelDB for ack/request/response + tdbFile := c.DataFile + "-ack-req-resp.ldb" + tdb, err := leveldb.OpenFile(tdbFile, &leveldbConf) if err != nil { - err = errors.Wrapf(err, "open leveldb %s", ldbFile) + err = errors.Wrapf(err, "open leveldb %s", tdbFile) return } // Create chain state chain = &Chain{ - db: db, - ldb: ldb, + bdb: bdb, + tdb: tdb, bi: newBlockIndex(c), qi: newQueryIndex(), cl: rpc.NewCaller(), @@ -176,24 +175,27 @@ func NewChain(c *Config) (chain *Chain, err error) { // LoadChain loads the chain state from the specified database and rebuilds a memory index. 
func LoadChain(c *Config) (chain *Chain, err error) { - // Open DB file - db, err := bolt.Open(c.DataFile, 0600, nil) + + // Open LevelDB for block and state + bdbFile := c.DataFile + "-block-state.ldb" + bdb, err := leveldb.OpenFile(bdbFile, &leveldbConf) if err != nil { + err = errors.Wrapf(err, "open leveldb %s", bdbFile) return } - // Open LevelDB - ldbFile := c.DataFile + ".ldb" - ldb, err := leveldb.OpenFile(ldbFile, &leveldbConf) + // Open LevelDB for ack/request/response + tdbFile := c.DataFile + "-ack-req-resp.ldb" + tdb, err := leveldb.OpenFile(tdbFile, &leveldbConf) if err != nil { - err = errors.Wrapf(err, "open leveldb %s", ldbFile) + err = errors.Wrapf(err, "open leveldb %s", tdbFile) return } // Create chain state chain = &Chain{ - db: db, - ldb: ldb, + bdb: bdb, + tdb: tdb, bi: newBlockIndex(c), qi: newQueryIndex(), cl: rpc.NewCaller(), @@ -210,145 +212,133 @@ func LoadChain(c *Config) (chain *Chain, err error) { replCh: make(chan struct{}), } - err = chain.db.View(func(tx *bolt.Tx) (err error) { - // Read state struct - meta := tx.Bucket(metaBucket[:]) - metaEnc := meta.Get(metaStateKey) - if metaEnc == nil { - return ErrMetaStateNotFound - } - st := &state{} - if err = utils.DecodeMsgPack(metaEnc, st); err != nil { - return err + // Read state struct + stateEnc, err := chain.bdb.Get(metaState[:], nil) + if err != nil { + return nil, err + } + st := &state{} + if err = utils.DecodeMsgPack(stateEnc, st); err != nil { + return nil, err + } + + log.WithFields(log.Fields{ + "peer": chain.rt.getPeerInfoString(), + "state": st, + }).Debug("Loading state from database") + + // Read blocks and rebuild memory index + var last *blockNode + var index int32 + // TODO(lambda): select a better init length + nodes := make([]blockNode, 100) + blockIter := chain.bdb.NewIterator(util.BytesPrefix(metaBlockIndex[:]), nil) + defer blockIter.Release() + for blockIter.Next() { + k := blockIter.Key() + v := blockIter.Value() + + block := &ct.Block{} + + if err = 
utils.DecodeMsgPack(v, block); err != nil { + err = errors.Wrapf(err, "block height %d, key index %s", keyWithSymbolToHeight(k), string(k)) + return } log.WithFields(log.Fields{ "peer": chain.rt.getPeerInfoString(), - "state": st, - }).Debug("Loading state from database") - - // Read blocks and rebuild memory index - var last *blockNode - var index int32 - blocks := meta.Bucket(metaBlockIndexBucket) - nodes := make([]blockNode, blocks.Stats().KeyN) + "block": block.BlockHash().String(), + }).Debug("Loading block from database") + parent := (*blockNode)(nil) - if err = blocks.ForEach(func(k, v []byte) (err error) { - block := &ct.Block{} - - if err = utils.DecodeMsgPack(v, block); err != nil { + if last == nil { + if err = block.VerifyAsGenesis(); err != nil { return } - log.WithFields(log.Fields{ - "peer": chain.rt.getPeerInfoString(), - "block": block.BlockHash().String(), - }).Debug("Loading block from database") - parent := (*blockNode)(nil) - - if last == nil { - if err = block.VerifyAsGenesis(); err != nil { - return - } - - // Set constant fields from genesis block - chain.rt.setGenesis(block) - } else if block.ParentHash().IsEqual(&last.hash) { - if err = block.Verify(); err != nil { - return - } + // Set constant fields from genesis block + chain.rt.setGenesis(block) + } else if block.ParentHash().IsEqual(&last.hash) { + if err = block.Verify(); err != nil { + return + } - parent = last - } else { - parent = chain.bi.lookupNode(block.ParentHash()) + parent = last + } else { + parent = chain.bi.lookupNode(block.ParentHash()) - if parent == nil { - return ErrParentNotFound - } + if parent == nil { + return nil, ErrParentNotFound } + } + + height := chain.rt.getHeightFromTime(block.Timestamp()) + nodes[index].initBlockNode(height, block, parent) + chain.bi.addBlock(&nodes[index]) + last = &nodes[index] + index++ + } + if err = blockIter.Error(); err != nil { + err = errors.Wrap(err, "load block") + return + } - height := 
chain.rt.getHeightFromTime(block.Timestamp()) - nodes[index].initBlockNode(height, block, parent) - chain.bi.addBlock(&nodes[index]) - last = &nodes[index] - index++ + // Set chain state + st.node = last + chain.rt.setHead(st) + + // Read queries and rebuild memory index + respIter := chain.tdb.NewIterator(util.BytesPrefix(metaResponseIndex[:]), nil) + defer respIter.Release() + for respIter.Next() { + k := respIter.Key() + v := respIter.Value() + h := keyWithSymbolToHeight(k) + var resp = &wt.SignedResponseHeader{} + if err = utils.DecodeMsgPack(v, resp); err != nil { + err = errors.Wrapf(err, "load resp, height %d, index %s", h, string(k)) return - }); err != nil { + } + log.WithFields(log.Fields{ + "height": h, + "header": resp.HeaderHash.String(), + }).Debug("Loaded new resp header") + err = chain.qi.addResponse(h, resp) + if err != nil { + err = errors.Wrapf(err, "load resp, height %d, hash %s", h, resp.HeaderHash.String()) return } + } + if err = respIter.Error(); err != nil { + err = errors.Wrap(err, "load resp") + return + } - // Set chain state - st.node = last - chain.rt.setHead(st) - - // Read queries and rebuild memory index - heights := meta.Bucket(metaHeightIndexBucket) - - if err = heights.ForEach(func(k, v []byte) (err error) { - h := keyToHeight(k) - - if resps := heights.Bucket(k).Bucket( - metaResponseIndexBucket); resps != nil { - if err = resps.ForEach(func(k []byte, v []byte) (err error) { - var resp = &wt.SignedResponseHeader{} - if err = utils.DecodeMsgPack(v, resp); err != nil { - return - } - log.WithFields(log.Fields{ - "height": h, - "header": resp.HeaderHash.String(), - }).Debug("Loaded new resp header") - return chain.qi.addResponse(h, resp) - }); err != nil { - return - } - } - - ldbKey := make([]byte, 0, len(metaAckIndexBucket)+len(k)+hash.HashSize) - ldbKey = append(append(ldbKey, metaAckIndexBucket[:]...), k...) 
- iter := ldb.NewIterator(util.BytesPrefix(ldbKey), nil) - defer iter.Release() - for iter.Next() { - var ack = &wt.SignedAckHeader{} - if err = utils.DecodeMsgPack(iter.Value(), ack); err != nil { - return - } - log.WithFields(log.Fields{ - "height": h, - "header": ack.HeaderHash.String(), - }).Debug("Loaded new ack header") - return chain.qi.addAck(h, ack) - } - err = iter.Error() - if err != nil { - err = errors.Wrap(err, "load new ack header") - return - } - - //acks := heights.Bucket(k).Bucket(metaAckIndexBucket) - //if acks != nil { - // if err = acks.ForEach(func(k []byte, v []byte) (err error) { - // var ack = &wt.SignedAckHeader{} - // if err = utils.DecodeMsgPack(v, ack); err != nil { - // return - // } - // log.WithFields(log.Fields{ - // "height": h, - // "header": ack.HeaderHash.String(), - // }).Debug("Loaded new ack header") - // return chain.qi.addAck(h, ack) - // }); err != nil { - // return - // } - //} - + ackIter := chain.tdb.NewIterator(util.BytesPrefix(metaAckIndex[:]), nil) + defer ackIter.Release() + for ackIter.Next() { + k := ackIter.Key() + v := ackIter.Value() + h := keyWithSymbolToHeight(k) + var ack = &wt.SignedAckHeader{} + if err = utils.DecodeMsgPack(v, ack); err != nil { + err = errors.Wrapf(err, "load ack, height %d, index %s", h, string(k)) + return - }); err != nil { + } + log.WithFields(log.Fields{ + "height": h, + "header": ack.HeaderHash.String(), + }).Debug("Loaded new ack header") + err = chain.qi.addAck(h, ack) + if err != nil { + err = errors.Wrapf(err, "load ack, height %d, hash %s", h, ack.HeaderHash.String()) + return } - + } + if err = ackIter.Error(); err != nil { + err = errors.Wrap(err, "load ack") return - }) + } return } @@ -374,21 +364,26 @@ func (c *Chain) pushBlock(b *ct.Block) (err error) { } // Update in transaction - err = c.db.Update(func(tx *bolt.Tx) (err error) { - if err = tx.Bucket(metaBucket[:]).Put(metaStateKey, encState.Bytes()); err != nil { - return - } - - if err = 
tx.Bucket(metaBucket[:]).Bucket(metaBlockIndexBucket).Put( - node.indexKey(), encBlock.Bytes()); err != nil { - return - } - - c.rt.setHead(st) - c.bi.addBlock(node) - c.qi.setSignedBlock(h, b) + t, err := c.bdb.OpenTransaction() + if err = t.Put(metaState[:], encState.Bytes(), nil); err != nil { + err = errors.Wrapf(err, "put %s", string(metaState[:])) + t.Discard() + return + } + blockKey := utils.ConcatAll(metaBlockIndex[:], node.indexKey()) + if err = t.Put(blockKey, encBlock.Bytes(), nil); err != nil { + err = errors.Wrapf(err, "put %s", string(node.indexKey())) + t.Discard() return - }) + } + if err = t.Commit(); err != nil { + err = errors.Wrapf(err, "commit error") + t.Discard() + return + } + c.rt.setHead(st) + c.bi.addBlock(node) + c.qi.setSignedBlock(h, b) if err == nil { log.WithFields(log.Fields{ @@ -413,27 +408,6 @@ func (c *Chain) pushBlock(b *ct.Block) (err error) { return } -func ensureHeight(tx *bolt.Tx, k []byte) (hb *bolt.Bucket, err error) { - b := tx.Bucket(metaBucket[:]).Bucket(metaHeightIndexBucket) - - if hb = b.Bucket(k); hb == nil { - // Create and initialize bucket in new height - if hb, err = b.CreateBucketIfNotExists(k); err != nil { - return - } - - if _, err = hb.CreateBucketIfNotExists(metaRequestIndexBucket); err != nil { - return - } - - if _, err = hb.CreateBucketIfNotExists(metaResponseIndexBucket); err != nil { - return - } - } - - return -} - // pushResponedQuery pushes a responsed, signed and verified query into the chain. 
func (c *Chain) pushResponedQuery(resp *wt.SignedResponseHeader) (err error) { h := c.rt.getHeightFromTime(resp.Request.Timestamp) @@ -444,25 +418,23 @@ func (c *Chain) pushResponedQuery(resp *wt.SignedResponseHeader) (err error) { return } - return c.db.Update(func(tx *bolt.Tx) (err error) { - heightBucket, err := ensureHeight(tx, k) - - if err != nil { - return - } + tdbKey := utils.ConcatAll(metaResponseIndex[:], k, resp.HeaderHash[:]) + if err = c.tdb.Put(tdbKey, enc.Bytes(), nil); err != nil { + err = errors.Wrapf(err, "put response %d %s", h, resp.HeaderHash.String()) + return + } - if err = heightBucket.Bucket(metaResponseIndexBucket).Put( - resp.HeaderHash[:], enc.Bytes()); err != nil { - return - } + if err = c.qi.addResponse(h, resp); err != nil { + err = errors.Wrapf(err, "add resp h %d hash %s", h, resp.HeaderHash) + return err + } - // Always put memory changes which will not be affected by rollback after DB operations - return c.qi.addResponse(h, resp) - }) + return } // pushAckedQuery pushes a acknowledged, signed and verified query into the chain. func (c *Chain) pushAckedQuery(ack *wt.SignedAckHeader) (err error) { + log.Debugf("push ack %s", ack.HeaderHash.String()) h := c.rt.getHeightFromTime(ack.SignedResponseHeader().Timestamp) k := heightToKey(h) var enc *bytes.Buffer @@ -471,27 +443,19 @@ func (c *Chain) pushAckedQuery(ack *wt.SignedAckHeader) (err error) { return } - return c.db.Update(func(tx *bolt.Tx) (err error) { - _, err = ensureHeight(tx, k) - if err != nil { - return - } + tdbKey := utils.ConcatAll(metaAckIndex[:], k, ack.HeaderHash[:]) - ldbKey := make([]byte, 0, len(metaAckIndexBucket)+len(k)+hash.HashSize) - ldbKey = append(append(append(ldbKey, metaAckIndexBucket[:]...), k...), ack.HeaderHash[:]...) 
- err = c.ldb.Put(ldbKey, enc.Bytes(), nil) - if err != nil { - err = errors.Wrapf(err, "put %s %d %s", string(metaAckIndexBucket[:]), h, ack.HeaderHash) - return - } + if err = c.tdb.Put(tdbKey, enc.Bytes(), nil); err != nil { + err = errors.Wrapf(err, "put ack %d %s", h, ack.HeaderHash.String()) + return + } - // Always put memory changes which will not be affected by rollback after DB operations - if err = c.qi.addAck(h, ack); err != nil { - return - } + if err = c.qi.addAck(h, ack); err != nil { + err = errors.Wrapf(err, "add ack h %d hash %s", h, ack.HeaderHash) + return err + } - return - }) + return } // produceBlock prepares, signs and advises the pending block to the orther peers. @@ -876,14 +840,13 @@ func (c *Chain) Stop() (err error) { "peer": c.rt.getPeerInfoString(), "time": c.rt.getChainTimeString(), }).Debug("Chain service stopped") - // Close database file - err = c.db.Close() + // Close LevelDB file + err = c.bdb.Close() log.WithFields(log.Fields{ "peer": c.rt.getPeerInfoString(), "time": c.rt.getChainTimeString(), }).Debug("Chain database closed") - // Close LevelDB file - err = c.ldb.Close() + err = c.tdb.Close() log.WithFields(log.Fields{ "peer": c.rt.getPeerInfoString(), "time": c.rt.getChainTimeString(), @@ -894,15 +857,20 @@ func (c *Chain) Stop() (err error) { // FetchBlock fetches the block at specified height from local cache. 
func (c *Chain) FetchBlock(height int32) (b *ct.Block, err error) { if n := c.rt.getHead().node.ancestor(height); n != nil { - k := n.indexKey() - err = c.db.View(func(tx *bolt.Tx) (err error) { - if v := tx.Bucket(metaBucket[:]).Bucket(metaBlockIndexBucket).Get(k); v != nil { - b = &ct.Block{} - err = utils.DecodeMsgPack(v, b) - } + k := utils.ConcatAll(metaBlockIndex[:], n.indexKey()) + var v []byte + v, err = c.bdb.Get(k, nil) + if err != nil { + err = errors.Wrapf(err, "fetch block %s", string(k)) + return + } + b = &ct.Block{} + err = utils.DecodeMsgPack(v, b) + if err != nil { + err = errors.Wrapf(err, "fetch block %s", string(k)) return - }) + } } return @@ -912,33 +880,31 @@ func (c *Chain) FetchBlock(height int32) (b *ct.Block, err error) { func (c *Chain) FetchAckedQuery(height int32, header *hash.Hash) ( ack *wt.SignedAckHeader, err error, ) { - if ack, err = c.qi.getAck(height, header); err == nil && ack != nil { - return - } - err = c.db.View(func(tx *bolt.Tx) (err error) { - var hb = tx.Bucket(metaBucket[:]).Bucket(metaHeightIndexBucket) + if ack, err = c.qi.getAck(height, header); err != nil || ack == nil { for h := height - c.rt.queryTTL - 1; h <= height; h++ { k := heightToKey(h) - if ab := hb.Bucket(heightToKey(h)); ab != nil { - ldbKey := make([]byte, 0, len(metaAckIndexBucket)+len(k)+hash.HashSize) - ldbKey = append(append(append(ldbKey, metaAckIndexBucket[:]...), k...), header[:]...) 
- v, _ := c.ldb.Get(ldbKey, nil) - //v := ab.Bucket(metaAckIndexBucket).Get(header[:]) - if v != nil { - var dec = &wt.SignedAckHeader{} - if err = utils.DecodeMsgPack(v, dec); err != nil { - return - } - ack = dec - break + ackKey := utils.ConcatAll(metaAckIndex[:], k, header[:]) + var v []byte + if v, err = c.tdb.Get(ackKey, nil); err != nil { + // if err == leveldb.ErrNotFound, just loop for next h + if err != leveldb.ErrNotFound { + err = errors.Wrapf(err, "fetch ack in height %d hash %s", h, header.String()) + return } + } else { + var dec = &wt.SignedAckHeader{} + if err = utils.DecodeMsgPack(v, dec); err != nil { + err = errors.Wrapf(err, "fetch ack in height %d hash %s", h, header.String()) + return + } + ack = dec + break } } - if ack == nil { - err = ErrAckQueryNotFound - } - return - }) + } + if ack == nil { + err = errors.Wrapf(ErrAckQueryNotFound, "fetch ack not found") + } return } @@ -1079,10 +1045,12 @@ func (c *Chain) CheckAndPushNewBlock(block *ct.Block) (err error) { func (c *Chain) VerifyAndPushResponsedQuery(resp *wt.SignedResponseHeader) (err error) { // TODO(leventeliu): check resp. if c.rt.queryTimeIsExpired(resp.Timestamp) { - return ErrQueryExpired + err = errors.Wrapf(ErrQueryExpired, "Verify response query, min valid height %d, response height %d", c.rt.getMinValidHeight(), c.rt.getHeightFromTime(resp.Timestamp)) + return } if err = resp.Verify(); err != nil { + err = errors.Wrapf(err, "verify response header") return } @@ -1093,7 +1061,8 @@ func (c *Chain) VerifyAndPushResponsedQuery(resp *wt.SignedResponseHeader) (err func (c *Chain) VerifyAndPushAckedQuery(ack *wt.SignedAckHeader) (err error) { // TODO(leventeliu): check ack. 
if c.rt.queryTimeIsExpired(ack.SignedResponseHeader().Timestamp) { - return ErrQueryExpired + err = errors.Wrapf(ErrQueryExpired, "Verify ack query, min valid height %d, ack height %d", c.rt.getMinValidHeight(), c.rt.getHeightFromTime(ack.Timestamp)) + return } if err = ack.Verify(); err != nil { diff --git a/sqlchain/chain_test.go b/sqlchain/chain_test.go index 907129cb0..5afd1f227 100644 --- a/sqlchain/chain_test.go +++ b/sqlchain/chain_test.go @@ -184,6 +184,8 @@ func TestMultiChain(t *testing.T) { config: config, chain: chain, } + + } // Create a master BP for RPC test diff --git a/sqlchain/queryindex.go b/sqlchain/queryindex.go index 7a8a28fd0..27fa77ca9 100644 --- a/sqlchain/queryindex.go +++ b/sqlchain/queryindex.go @@ -19,6 +19,7 @@ package sqlchain // TODO(leventeliu): use pooled objects to speed up this index. import ( + "github.com/pkg/errors" "sync" "github.com/CovenantSQL/CovenantSQL/crypto/hash" @@ -482,7 +483,7 @@ func (i *queryIndex) checkAckFromBlock(h int32, b *hash.Hash, ack *hash.Hash) ( l := i.getBarrier() if h < l { - err = ErrQueryExpired + err = errors.Wrapf(ErrQueryExpired, "check Ack, height %d, barrier %d", h, l) return } @@ -527,7 +528,7 @@ func (i *queryIndex) getAck(h int32, header *hash.Hash) (ack *wt.SignedAckHeader b := i.getBarrier() if h < b { - err = ErrQueryExpired + err = errors.Wrapf(ErrQueryExpired, "get Ack, height %d, barrier %d", h, b) return } diff --git a/sqlchain/queryindex_test.go b/sqlchain/queryindex_test.go index 23d6fb0b2..609b55b35 100644 --- a/sqlchain/queryindex_test.go +++ b/sqlchain/queryindex_test.go @@ -17,6 +17,7 @@ package sqlchain import ( + "github.com/pkg/errors" "math/rand" "reflect" "testing" @@ -130,7 +131,7 @@ func TestCheckAckFromBlock(t *testing.T) { if _, err := qi.checkAckFromBlock( 0, b1.BlockHash(), b1.Queries[0], - ); err != ErrQueryExpired { + ); errors.Cause(err) != ErrQueryExpired { t.Fatalf("Unexpected error: %v", err) } @@ -248,11 +249,11 @@ func TestGetAck(t *testing.T) { qi := 
newQueryIndex() qh := &hash.Hash{} - if _, err := qi.getAck(-1, qh); err != ErrQueryExpired { + if _, err := qi.getAck(-1, qh); errors.Cause(err) != ErrQueryExpired { t.Fatalf("Unexpected error: %v", err) } - if _, err := qi.getAck(0, qh); err != ErrQueryNotCached { + if _, err := qi.getAck(0, qh); errors.Cause(err) != ErrQueryNotCached { t.Fatalf("Unexpected error: %v", err) } } diff --git a/test/GNTE/GNTE b/test/GNTE/GNTE index 956452ace..51de88917 160000 --- a/test/GNTE/GNTE +++ b/test/GNTE/GNTE @@ -1 +1 @@ -Subproject commit 956452aceb2afdeb3b72ea041aebf1bb367ede56 +Subproject commit 51de889172de377ec104900b9566ef8c8510525e diff --git a/utils/bytes.go b/utils/bytes.go new file mode 100644 index 000000000..5f0b77459 --- /dev/null +++ b/utils/bytes.go @@ -0,0 +1,33 @@ +/* + * Copyright 2018 The CovenantSQL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +// ConcatAll concatenate several bytes slice into one. +func ConcatAll(args ...[]byte) []byte { + var bLen int + for i := range args { + bLen += len(args[i]) + } + + key := make([]byte, bLen) + position := 0 + for i := range args { + copy(key[position:], args[i]) + position += len(args[i]) + } + return key +} diff --git a/utils/bytes_test.go b/utils/bytes_test.go new file mode 100644 index 000000000..9276681de --- /dev/null +++ b/utils/bytes_test.go @@ -0,0 +1,45 @@ +/* + * Copyright 2018 The CovenantSQL Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "testing" + + "github.com/CovenantSQL/CovenantSQL/utils/log" + . "github.com/smartystreets/goconvey/convey" +) + +func TestNewLevelDBKey(t *testing.T) { + Convey("new bytes", t, func() { + log.SetLevel(log.DebugLevel) + So(ConcatAll(nil), ShouldResemble, []byte{}) + So(ConcatAll([]byte{}), ShouldResemble, []byte{}) + So(ConcatAll([]byte{'0'}, []byte{'1'}), ShouldResemble, []byte{'0', '1'}) + So(ConcatAll([]byte{'0'}, nil), ShouldResemble, []byte{'0'}) + So(ConcatAll(nil, []byte{'0'}), ShouldResemble, []byte{'0'}) + So(ConcatAll([]byte{'0', '1', '2', '3'}, []byte{'a', 'b', 'c', 'd', 'e'}, []byte{'x', 'y', 'z'}), + ShouldResemble, []byte{'0', '1', '2', '3', 'a', 'b', 'c', 'd', 'e', 'x', 'y', 'z'}) + So(ConcatAll([]byte{'0', '1', '2', '3'}, nil, []byte{'x', 'y', 'z'}), + ShouldResemble, []byte{'0', '1', '2', '3', 'x', 'y', 'z'}) + So(ConcatAll([]byte{'0', '1', '2', '3'}, []byte{}, []byte{'x', 'y', 'z'}), + ShouldResemble, []byte{'0', '1', '2', '3', 'x', 'y', 'z'}) + So(ConcatAll(nil, []byte{'0', '1', '2', '3'}, nil, []byte{'x', 'y', 'z'}), + ShouldResemble, []byte{'0', '1', '2', '3', 'x', 'y', 'z'}) + So(ConcatAll([]byte{}, []byte{'0', '1', '2', '3'}, nil, []byte{'x', 'y', 'z'}, nil), + ShouldResemble, []byte{'0', '1', '2', '3', 'x', 'y', 'z'}) + }) +}