From b36121be32b715172a93f15582a947333f10dc2d Mon Sep 17 00:00:00 2001 From: lambda Date: Tue, 30 Oct 2018 22:24:25 +0800 Subject: [PATCH 1/8] Change boltdb to lldb --- sqlchain/chain.go | 439 ++++++++++++++++++++++---------------------- utils/bytes.go | 29 +++ utils/bytes_test.go | 34 ++++ 3 files changed, 285 insertions(+), 217 deletions(-) create mode 100644 utils/bytes.go create mode 100644 utils/bytes_test.go diff --git a/sqlchain/chain.go b/sqlchain/chain.go index e65b433f7..472e55f88 100644 --- a/sqlchain/chain.go +++ b/sqlchain/chain.go @@ -46,12 +46,17 @@ import ( var ( metaBucket = [4]byte{0x0, 0x0, 0x0, 0x0} + metaState = [4]byte{'S', 'T', 'A', 'T'} metaStateKey = []byte("covenantsql-state") metaBlockIndexBucket = []byte("covenantsql-block-index-bucket") + metaBlockIndex = [4]byte{'B', 'L', 'C', 'K'} metaHeightIndexBucket = []byte("covenantsql-query-height-index-bucket") metaRequestIndexBucket = []byte("covenantsql-query-request-index-bucket") + metaRequestIndex = [4]byte{'R', 'E', 'Q', 'U'} metaResponseIndexBucket = []byte("covenantsql-query-response-index-bucket") + metaResponseIndex = [4]byte{'R', 'E', 'S', 'P'} metaAckIndexBucket = [4]byte{'Q', 'A', 'C', 'K'} + metaAckIndex = [4]byte{'Q', 'A', 'C', 'K'} leveldbConf = opt.Options{} ) @@ -74,10 +79,28 @@ func keyToHeight(k []byte) int32 { return int32(binary.BigEndian.Uint32(k)) } +// keyWithSymbolToHeight converts a height back from a key(ack/resp/req/block) in bytes. +// ack key: +// ['Q', 'A', 'C', 'K', height, hash] +// resp key: +// ['R', 'E', 'S', 'P', height, hash] +// req key: +// ['R', 'E', 'Q', 'U', height, hash] +// block key: +// ['B', 'L', 'C', 'K', height, hash] +func keyWithSymbolToHeight(k []byte) int32 { + if len(k) < 8 { + return -1 + } + return int32(binary.BigEndian.Uint32(k[4:])) +} + // Chain represents a sql-chain. type Chain struct { - db *bolt.DB - ldb *leveldb.DB + // bdb stores state and block + bdb *leveldb.DB + // tdb stores ack/request/response + tdb *leveldb.DB bi *blockIndex qi *queryIndex cl *rpc.Caller @@ -139,18 +162,26 @@ func NewChain(c *Config) (chain *Chain, err error) { return } - // Open LevelDB - ldbFile := c.DataFile + ".ldb" - ldb, err := leveldb.OpenFile(ldbFile, &leveldbConf) + // Open LevelDB for block and state + bdbFile := c.DataFile + "-block-state.ldb" + bdb, err := leveldb.OpenFile(bdbFile, &leveldbConf) + if err != nil { + err = errors.Wrapf(err, "open leveldb %s", bdbFile) + return + } + + // Open LevelDB for ack/request/response + tdbFile := c.DataFile + "-ack-req-resp.ldb" + tdb, err := leveldb.OpenFile(tdbFile, &leveldbConf) if err != nil { - err = errors.Wrapf(err, "open leveldb %s", ldbFile) + err = errors.Wrapf(err, "open leveldb %s", tdbFile) return } // Create chain state chain = &Chain{ - db: db, - ldb: ldb, + bdb: bdb, + tdb: tdb, bi: newBlockIndex(c), qi: newQueryIndex(), cl: rpc.NewCaller(), @@ -176,24 +207,27 @@ func NewChain(c *Config) (chain *Chain, err error) { // LoadChain loads the chain state from the specified database and rebuilds a memory index. 
func LoadChain(c *Config) (chain *Chain, err error) { - // Open DB file - db, err := bolt.Open(c.DataFile, 0600, nil) + + // Open LevelDB for block and state + bdbFile := c.DataFile + "-block-state.ldb" + bdb, err := leveldb.OpenFile(bdbFile, &leveldbConf) if err != nil { + err = errors.Wrapf(err, "open leveldb %s", bdbFile) return } - // Open LevelDB - ldbFile := c.DataFile + ".ldb" - ldb, err := leveldb.OpenFile(ldbFile, &leveldbConf) + // Open LevelDB for ack/request/response + tdbFile := c.DataFile + "-ack-req-resp.ldb" + tdb, err := leveldb.OpenFile(tdbFile, &leveldbConf) if err != nil { - err = errors.Wrapf(err, "open leveldb %s", ldbFile) + err = errors.Wrapf(err, "open leveldb %s", tdbFile) return } // Create chain state chain = &Chain{ - db: db, - ldb: ldb, + bdb: bdb, + tdb: tdb, bi: newBlockIndex(c), qi: newQueryIndex(), cl: rpc.NewCaller(), @@ -210,145 +244,133 @@ func LoadChain(c *Config) (chain *Chain, err error) { replCh: make(chan struct{}), } - err = chain.db.View(func(tx *bolt.Tx) (err error) { - // Read state struct - meta := tx.Bucket(metaBucket[:]) - metaEnc := meta.Get(metaStateKey) - if metaEnc == nil { - return ErrMetaStateNotFound - } - st := &state{} - if err = utils.DecodeMsgPack(metaEnc, st); err != nil { - return err + // Read state struct + stateEnc, err := chain.bdb.Get(metaState[:], nil) + if err != nil { + return nil, err + } + st := &state{} + if err = utils.DecodeMsgPack(stateEnc, st); err != nil { + return nil, err + } + + log.WithFields(log.Fields{ + "peer": chain.rt.getPeerInfoString(), + "state": st, + }).Debug("Loading state from database") + + // Read blocks and rebuild memory index + var last *blockNode + var index int32 + // TODO(lambda): select a better init length + nodes := make([]blockNode, 100) + blockIter := chain.bdb.NewIterator(util.BytesPrefix(metaBlockIndex[:]), nil) + defer blockIter.Release() + for blockIter.Next() { + k := blockIter.Key() + v := blockIter.Value() + + block := &ct.Block{} + + if err = utils.DecodeMsgPack(v, block); err != nil { + err = errors.Wrapf(err, "block height %d, key index %s", keyWithSymbolToHeight(k), string(k)) + return } log.WithFields(log.Fields{ "peer": chain.rt.getPeerInfoString(), - "state": st, - }).Debug("Loading state from database") - - // Read blocks and rebuild memory index - var last *blockNode - var index int32 - blocks := meta.Bucket(metaBlockIndexBucket) - nodes := make([]blockNode, blocks.Stats().KeyN) - - if err = blocks.ForEach(func(k, v []byte) (err error) { - block := &ct.Block{} + "block": block.BlockHash().String(), + }).Debug("Loading block from database") + parent := (*blockNode)(nil) - if err = utils.DecodeMsgPack(v, block); err != nil { + if last == nil { + if err = block.VerifyAsGenesis(); err != nil { return } - log.WithFields(log.Fields{ - "peer": chain.rt.getPeerInfoString(), - "block": block.BlockHash().String(), - }).Debug("Loading block from database") - parent := (*blockNode)(nil) - - if last == nil { - if err = block.VerifyAsGenesis(); err != nil { - return - } - - // Set constant fields from genesis block - chain.rt.setGenesis(block) - } else if block.ParentHash().IsEqual(&last.hash) { - if err = block.Verify(); err != nil { - return - } + // Set constant fields from genesis block + chain.rt.setGenesis(block) + } else if block.ParentHash().IsEqual(&last.hash) { + if err = block.Verify(); err != nil { + return + } - parent = last - } else { - parent = chain.bi.lookupNode(block.ParentHash()) + parent = last + } else { + parent = chain.bi.lookupNode(block.ParentHash()) - if 
parent == nil { - return ErrParentNotFound - } + if parent == nil { + return nil, ErrParentNotFound } + } + + height := chain.rt.getHeightFromTime(block.Timestamp()) + nodes[index].initBlockNode(height, block, parent) + chain.bi.addBlock(&nodes[index]) + last = &nodes[index] + index++ + } + if err = blockIter.Error(); err != nil { + err = errors.Wrap(err, "load block") + return + } - height := chain.rt.getHeightFromTime(block.Timestamp()) - nodes[index].initBlockNode(height, block, parent) - chain.bi.addBlock(&nodes[index]) - last = &nodes[index] - index++ + // Set chain state + st.node = last + chain.rt.setHead(st) + + // Read queries and rebuild memory index + respIter := chain.tdb.NewIterator(util.BytesPrefix(metaResponseIndex[:]), nil) + defer respIter.Release() + for respIter.Next() { + k := respIter.Key() + v := respIter.Value() + h := keyWithSymbolToHeight(k) + var resp = &wt.SignedResponseHeader{} + if err = utils.DecodeMsgPack(v, resp); err != nil { + err = errors.Wrapf(err, "load resp, height %d, index %s", h, string(k)) return - }); err != nil { + } + log.WithFields(log.Fields{ + "height": h, + "header": resp.HeaderHash.String(), + }).Debug("Loaded new resp header") + err = chain.qi.addResponse(h, resp) + if err != nil { + err = errors.Wrapf(err, "load resp, height %d, hash %s", h, resp.HeaderHash.String()) return } + } + if err = respIter.Error(); err != nil { + err = errors.Wrap(err, "load resp") + return + } - // Set chain state - st.node = last - chain.rt.setHead(st) - - // Read queries and rebuild memory index - heights := meta.Bucket(metaHeightIndexBucket) - - if err = heights.ForEach(func(k, v []byte) (err error) { - h := keyToHeight(k) - - if resps := heights.Bucket(k).Bucket( - metaResponseIndexBucket); resps != nil { - if err = resps.ForEach(func(k []byte, v []byte) (err error) { - var resp = &wt.SignedResponseHeader{} - if err = utils.DecodeMsgPack(v, resp); err != nil { - return - } - log.WithFields(log.Fields{ - "height": h, - "header": resp.HeaderHash.String(), - }).Debug("Loaded new resp header") - return chain.qi.addResponse(h, resp) - }); err != nil { - return - } - } - - ldbKey := make([]byte, 0, len(metaAckIndexBucket)+len(k)+hash.HashSize) - ldbKey = append(append(ldbKey, metaAckIndexBucket[:]...), k...) 
- iter := ldb.NewIterator(util.BytesPrefix(ldbKey), nil) - defer iter.Release() - for iter.Next() { - var ack = &wt.SignedAckHeader{} - if err = utils.DecodeMsgPack(iter.Value(), ack); err != nil { - return - } - log.WithFields(log.Fields{ - "height": h, - "header": ack.HeaderHash.String(), - }).Debug("Loaded new ack header") - return chain.qi.addAck(h, ack) - } - err = iter.Error() - if err != nil { - err = errors.Wrap(err, "load new ack header") - return - } - - //acks := heights.Bucket(k).Bucket(metaAckIndexBucket) - //if acks != nil { - // if err = acks.ForEach(func(k []byte, v []byte) (err error) { - // var ack = &wt.SignedAckHeader{} - // if err = utils.DecodeMsgPack(v, ack); err != nil { - // return - // } - // log.WithFields(log.Fields{ - // "height": h, - // "header": ack.HeaderHash.String(), - // }).Debug("Loaded new ack header") - // return chain.qi.addAck(h, ack) - // }); err != nil { - // return - // } - //} - + ackIter := chain.tdb.NewIterator(util.BytesPrefix(metaAckIndex[:]), nil) + defer ackIter.Release() + for ackIter.Next() { + k := ackIter.Key() + v := ackIter.Value() + h := keyWithSymbolToHeight(k) + var ack = &wt.SignedAckHeader{} + if err = utils.DecodeMsgPack(v, ack); err != nil { + err = errors.Wrapf(err, "load ack, height %d, index %s", h, string(k)) return - }); err != nil { + } + log.WithFields(log.Fields{ + "height": h, + "header": ack.HeaderHash.String(), + }).Debug("Loaded new ack header") + err = chain.qi.addAck(h, ack) + if err != nil { + err = errors.Wrapf(err, "load ack, height %d, hash %s", h, ack.HeaderHash.String()) return } - + } + if err = respIter.Error(); err != nil { + err = errors.Wrap(err, "load ack") return - }) + } return } @@ -374,21 +396,25 @@ func (c *Chain) pushBlock(b *ct.Block) (err error) { } // Update in transaction - err = c.db.Update(func(tx *bolt.Tx) (err error) { - if err = tx.Bucket(metaBucket[:]).Put(metaStateKey, encState.Bytes()); err != nil { - return - } - - if err = tx.Bucket(metaBucket[:]).Bucket(metaBlockIndexBucket).Put( - node.indexKey(), encBlock.Bytes()); err != nil { - return - } - - c.rt.setHead(st) - c.bi.addBlock(node) - c.qi.setSignedBlock(h, b) + batch := new(leveldb.Batch) + batch.Put(metaState[:], encState.Bytes()) + t, err := c.bdb.OpenTransaction() + if err = t.Put(metaState[:], encState.Bytes(), nil); err != nil { + err = errors.Wrapf(err, "put %s", string(metaState[:])) + return + } + blockKey := utils.ConcatAll(metaBlockIndex[:], node.indexKey()) + if err = t.Put(blockKey, encBlock.Bytes(), nil); err != nil { + err = errors.Wrapf(err, "put %s", string(node.indexKey())) + return + } + if err = t.Commit(); err != nil { + err = errors.Wrapf(err, "commit error") return - }) + } + c.rt.setHead(st) + c.bi.addBlock(node) + c.qi.setSignedBlock(h, b) if err == nil { log.WithFields(log.Fields{ @@ -444,21 +470,13 @@ func (c *Chain) pushResponedQuery(resp *wt.SignedResponseHeader) (err error) { return } - return c.db.Update(func(tx *bolt.Tx) (err error) { - heightBucket, err := ensureHeight(tx, k) - - if err != nil { - return - } - - if err = heightBucket.Bucket(metaResponseIndexBucket).Put( - resp.HeaderHash[:], enc.Bytes()); err != nil { - return - } + tdbKey := utils.ConcatAll(metaResponseIndex[:], k, resp.HeaderHash[:]) + if err = c.tdb.Put(tdbKey, enc.Bytes(), nil); err != nil { + err = errors.Wrapf(err, "put response %d %s", h, resp.HeaderHash.String()) + return + } - // Always put memory changes which will not be affected by rollback after DB operations - return c.qi.addResponse(h, resp) - }) + return } // 
pushAckedQuery pushes a acknowledged, signed and verified query into the chain. @@ -471,28 +489,14 @@ func (c *Chain) pushAckedQuery(ack *wt.SignedAckHeader) (err error) { return } - return c.db.Update(func(tx *bolt.Tx) (err error) { - _, err = ensureHeight(tx, k) - if err != nil { - return - } - - ldbKey := make([]byte, 0, len(metaAckIndexBucket)+len(k)+hash.HashSize) - ldbKey = append(append(append(ldbKey, metaAckIndexBucket[:]...), k...), ack.HeaderHash[:]...) - err = c.ldb.Put(ldbKey, enc.Bytes(), nil) - //err = b.Bucket(metaAckIndexBucket).Put(ack.HeaderHash[:], enc.Bytes()) - if err != nil { - err = errors.Wrapf(err, "put %s %d %s", string(metaAckIndexBucket[:]), h, ack.HeaderHash) - return - } - - // Always put memory changes which will not be affected by rollback after DB operations - if err = c.qi.addAck(h, ack); err != nil { - return - } + tdbKey := utils.ConcatAll(metaAckIndex[:], k, ack.HeaderHash[:]) + if err = c.tdb.Put(tdbKey, enc.Bytes(), nil); err != nil { + err = errors.Wrapf(err, "put response %d %s", h, ack.HeaderHash.String()) return - }) + } + + return } // produceBlock prepares, signs and advises the pending block to the orther peers. @@ -877,14 +881,13 @@ func (c *Chain) Stop() (err error) { "peer": c.rt.getPeerInfoString(), "time": c.rt.getChainTimeString(), }).Debug("Chain service stopped") - // Close database file - err = c.db.Close() + // Close LevelDB file + err = c.bdb.Close() log.WithFields(log.Fields{ "peer": c.rt.getPeerInfoString(), "time": c.rt.getChainTimeString(), }).Debug("Chain database closed") - // Close LevelDB file - err = c.ldb.Close() + err = c.tdb.Close() log.WithFields(log.Fields{ "peer": c.rt.getPeerInfoString(), "time": c.rt.getChainTimeString(), @@ -895,15 +898,19 @@ func (c *Chain) Stop() (err error) { // FetchBlock fetches the block at specified height from local cache. func (c *Chain) FetchBlock(height int32) (b *ct.Block, err error) { if n := c.rt.getHead().node.ancestor(height); n != nil { - k := n.indexKey() - err = c.db.View(func(tx *bolt.Tx) (err error) { - if v := tx.Bucket(metaBucket[:]).Bucket(metaBlockIndexBucket).Get(k); v != nil { - b = &ct.Block{} - err = utils.DecodeMsgPack(v, b) - } + k := utils.ConcatAll(metaBlockIndex[:], n.indexKey()) + v, err := c.bdb.Get(k, nil) + if err != nil { + err = errors.Wrapf(err, "fetch block %s", string(k)) + return nil, err + } - return - }) + b = &ct.Block{} + err = utils.DecodeMsgPack(v, b) + if err != nil { + err = errors.Wrapf(err, "fetch block %s", string(k)) + return nil, err + } } return @@ -916,30 +923,28 @@ func (c *Chain) FetchAckedQuery(height int32, header *hash.Hash) ( if ack, err = c.qi.getAck(height, header); err == nil && ack != nil { return } - err = c.db.View(func(tx *bolt.Tx) (err error) { - var hb = tx.Bucket(metaBucket[:]).Bucket(metaHeightIndexBucket) - for h := height - c.rt.queryTTL - 1; h <= height; h++ { - k := heightToKey(h) - if ab := hb.Bucket(heightToKey(h)); ab != nil { - ldbKey := make([]byte, 0, len(metaAckIndexBucket)+len(k)+hash.HashSize) - ldbKey = append(append(append(ldbKey, metaAckIndexBucket[:]...), k...), header[:]...) 
- v, _ := c.ldb.Get(ldbKey, nil) - //v := ab.Bucket(metaAckIndexBucket).Get(header[:]) - if v != nil { - var dec = &wt.SignedAckHeader{} - if err = utils.DecodeMsgPack(v, dec); err != nil { - return - } - ack = dec - break - } + for h := height - c.rt.queryTTL - 1; h <= height; h++ { + k := heightToKey(h) + ackKey := utils.ConcatAll(metaAckIndex[:], k, header[:]) + v, err := c.tdb.Get(ackKey, nil) + if err != nil { + if err != leveldb.ErrNotFound { + err = errors.Wrapf(err, "fetch ack in height %d hash %s", h, header.String()) + return nil, err } + } else { + var dec = &wt.SignedAckHeader{} + if err = utils.DecodeMsgPack(v, dec); err != nil { + err = errors.Wrapf(err, "fetch ack in height %d hash %s", h, header.String()) + return nil, err + } + ack = dec + break } - if ack == nil { - err = ErrAckQueryNotFound - } - return - }) + } + if ack == nil { + err = ErrAckQueryNotFound + } return } diff --git a/utils/bytes.go b/utils/bytes.go new file mode 100644 index 000000000..651cbc415 --- /dev/null +++ b/utils/bytes.go @@ -0,0 +1,29 @@ +/* + * Copyright 2018 The CovenantSQL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import "bytes" + +// ConcatAll concatenate several bytes slice into one. +func ConcatAll(args ...[]byte) []byte { + key := bytes.NewBuffer([]byte{}) + + for i := range args { + key.Write(args[i]) + } + return key.Bytes() +} diff --git a/utils/bytes_test.go b/utils/bytes_test.go new file mode 100644 index 000000000..ada5ec537 --- /dev/null +++ b/utils/bytes_test.go @@ -0,0 +1,34 @@ +/* + * Copyright 2018 The CovenantSQL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "github.com/CovenantSQL/CovenantSQL/utils/log" + . 
"github.com/smartystreets/goconvey/convey" + "testing" +) + +func TestNewLevelDBKey(t *testing.T) { + Convey("new bytes", t, func() { + log.SetLevel(log.DebugLevel) + So(ConcatAll(nil), ShouldResemble, []byte{}) + So(ConcatAll([]byte{}), ShouldResemble, []byte{}) + So(ConcatAll([]byte{'0'}, []byte{'1'}), ShouldResemble, []byte{'0', '1'}) + So(ConcatAll([]byte{'0'}, nil), ShouldResemble, []byte{'0'}) + So(ConcatAll(nil, []byte{'0'}), ShouldResemble, []byte{'0'}) + }) +} \ No newline at end of file From 8bf6d8a83dfe19364386b5a3df5fc4e2ef9bcb1e Mon Sep 17 00:00:00 2001 From: lambda Date: Wed, 31 Oct 2018 00:06:05 +0800 Subject: [PATCH 2/8] fix query index problem --- cleanupDB.sh | 3 +- cmd/cql-minerd/main.go | 2 +- sqlchain/chain.go | 83 ++++++++++-------------------------------- utils/bytes_test.go | 5 ++- 4 files changed, 26 insertions(+), 67 deletions(-) diff --git a/cleanupDB.sh b/cleanupDB.sh index a14cbd9bd..49ebc51b9 100755 --- a/cleanupDB.sh +++ b/cleanupDB.sh @@ -7,4 +7,5 @@ cd ${PROJECT_DIR} && find . -name '*.db-shm' -exec rm -f {} \; cd ${PROJECT_DIR} && find . -name '*.db-wal' -exec rm -f {} \; cd ${PROJECT_DIR} && find . -name 'db.meta' -exec rm -f {} \; cd ${PROJECT_DIR} && find . -name 'public.keystore' -exec rm -f {} \; -cd ${PROJECT_DIR} && find . -name '*.public.keystore' -exec rm -f {} \; \ No newline at end of file +cd ${PROJECT_DIR} && find . -name '*.public.keystore' -exec rm -f {} \; +cd ${PROJECT_DIR} && find . -name '*.ldb' -exec rm -rf {} \; diff --git a/cmd/cql-minerd/main.go b/cmd/cql-minerd/main.go index ab094181b..4072bc8a7 100644 --- a/cmd/cql-minerd/main.go +++ b/cmd/cql-minerd/main.go @@ -120,7 +120,7 @@ func initLogs() { func main() { // set random rand.Seed(time.Now().UnixNano()) - log.SetLevel(log.InfoLevel) + log.SetLevel(log.DebugLevel) flag.Parse() flag.Visit(func(f *flag.Flag) { log.Infof("Args %#v : %#v", f.Name, f.Value) diff --git a/sqlchain/chain.go b/sqlchain/chain.go index 0a676d0f5..3dbd84b07 100644 --- a/sqlchain/chain.go +++ b/sqlchain/chain.go @@ -37,7 +37,6 @@ import ( "github.com/CovenantSQL/CovenantSQL/utils" "github.com/CovenantSQL/CovenantSQL/utils/log" wt "github.com/CovenantSQL/CovenantSQL/worker/types" - "github.com/coreos/bbolt" "github.com/pkg/errors" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" @@ -45,19 +44,12 @@ import ( ) var ( - metaBucket = [4]byte{0x0, 0x0, 0x0, 0x0} - metaState = [4]byte{'S', 'T', 'A', 'T'} - metaStateKey = []byte("covenantsql-state") - metaBlockIndexBucket = []byte("covenantsql-block-index-bucket") - metaBlockIndex = [4]byte{'B', 'L', 'C', 'K'} - metaHeightIndexBucket = []byte("covenantsql-query-height-index-bucket") - metaRequestIndexBucket = []byte("covenantsql-query-request-index-bucket") - metaRequestIndex = [4]byte{'R', 'E', 'Q', 'U'} - metaResponseIndexBucket = []byte("covenantsql-query-response-index-bucket") + metaState = [4]byte{'S', 'T', 'A', 'T'} + metaBlockIndex = [4]byte{'B', 'L', 'C', 'K'} + metaRequestIndex = [4]byte{'R', 'E', 'Q', 'U'} metaResponseIndex = [4]byte{'R', 'E', 'S', 'P'} - metaAckIndexBucket = [4]byte{'Q', 'A', 'C', 'K'} - metaAckIndex = [4]byte{'Q', 'A', 'C', 'K'} - leveldbConf = opt.Options{} + metaAckIndex = [4]byte{'Q', 'A', 'C', 'K'} + leveldbConf = opt.Options{} ) func init() { @@ -138,30 +130,6 @@ func NewChain(c *Config) (chain *Chain, err error) { return } - // Open DB file - db, err := bolt.Open(c.DataFile, 0600, nil) - if err != nil { - return - } - - // Create buckets for chain meta - if err = db.Update(func(tx *bolt.Tx) (err error) 
{ - bucket, err := tx.CreateBucketIfNotExists(metaBucket[:]) - - if err != nil { - return - } - - if _, err = bucket.CreateBucketIfNotExists(metaBlockIndexBucket); err != nil { - return - } - - _, err = bucket.CreateBucketIfNotExists(metaHeightIndexBucket) - return - }); err != nil { - return - } - // Open LevelDB for block and state bdbFile := c.DataFile + "-block-state.ldb" bdb, err := leveldb.OpenFile(bdbFile, &leveldbConf) @@ -180,8 +148,8 @@ func NewChain(c *Config) (chain *Chain, err error) { // Create chain state chain = &Chain{ - bdb: bdb, - tdb: tdb, + bdb: bdb, + tdb: tdb, bi: newBlockIndex(c), qi: newQueryIndex(), cl: rpc.NewCaller(), @@ -226,8 +194,8 @@ func LoadChain(c *Config) (chain *Chain, err error) { // Create chain state chain = &Chain{ - bdb: bdb, - tdb: tdb, + bdb: bdb, + tdb: tdb, bi: newBlockIndex(c), qi: newQueryIndex(), cl: rpc.NewCaller(), @@ -439,27 +407,6 @@ func (c *Chain) pushBlock(b *ct.Block) (err error) { return } -func ensureHeight(tx *bolt.Tx, k []byte) (hb *bolt.Bucket, err error) { - b := tx.Bucket(metaBucket[:]).Bucket(metaHeightIndexBucket) - - if hb = b.Bucket(k); hb == nil { - // Create and initialize bucket in new height - if hb, err = b.CreateBucketIfNotExists(k); err != nil { - return - } - - if _, err = hb.CreateBucketIfNotExists(metaRequestIndexBucket); err != nil { - return - } - - if _, err = hb.CreateBucketIfNotExists(metaResponseIndexBucket); err != nil { - return - } - } - - return -} - // pushResponedQuery pushes a responsed, signed and verified query into the chain. func (c *Chain) pushResponedQuery(resp *wt.SignedResponseHeader) (err error) { h := c.rt.getHeightFromTime(resp.Request.Timestamp) @@ -476,6 +423,11 @@ func (c *Chain) pushResponedQuery(resp *wt.SignedResponseHeader) (err error) { return } + if err = c.qi.addResponse(h, resp); err != nil { + err = errors.Wrapf(err, "add resp h %d hash %s", h, resp.HeaderHash) + return err + } + return } @@ -492,10 +444,15 @@ func (c *Chain) pushAckedQuery(ack *wt.SignedAckHeader) (err error) { tdbKey := utils.ConcatAll(metaAckIndex[:], k, ack.HeaderHash[:]) if err = c.tdb.Put(tdbKey, enc.Bytes(), nil); err != nil { - err = errors.Wrapf(err, "put response %d %s", h, ack.HeaderHash.String()) + err = errors.Wrapf(err, "put ack %d %s", h, ack.HeaderHash.String()) return } + if err = c.qi.addAck(h, ack); err != nil { + err = errors.Wrapf(err, "add ack h %d hash %s", h, ack.HeaderHash) + return err + } + return } diff --git a/utils/bytes_test.go b/utils/bytes_test.go index ada5ec537..3d7a45e66 100644 --- a/utils/bytes_test.go +++ b/utils/bytes_test.go @@ -17,9 +17,10 @@ package utils import ( + "testing" + "github.com/CovenantSQL/CovenantSQL/utils/log" . 
"github.com/smartystreets/goconvey/convey" - "testing" ) func TestNewLevelDBKey(t *testing.T) { @@ -31,4 +32,4 @@ func TestNewLevelDBKey(t *testing.T) { So(ConcatAll([]byte{'0'}, nil), ShouldResemble, []byte{'0'}) So(ConcatAll(nil, []byte{'0'}), ShouldResemble, []byte{'0'}) }) -} \ No newline at end of file +} From 7a888982deb28cb82ca6c7ca1fef96417c73e5cb Mon Sep 17 00:00:00 2001 From: lambda Date: Wed, 31 Oct 2018 11:53:08 +0800 Subject: [PATCH 3/8] fix rollback bug --- sqlchain/chain.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sqlchain/chain.go b/sqlchain/chain.go index 3dbd84b07..d2a961419 100644 --- a/sqlchain/chain.go +++ b/sqlchain/chain.go @@ -364,20 +364,21 @@ func (c *Chain) pushBlock(b *ct.Block) (err error) { } // Update in transaction - batch := new(leveldb.Batch) - batch.Put(metaState[:], encState.Bytes()) t, err := c.bdb.OpenTransaction() if err = t.Put(metaState[:], encState.Bytes(), nil); err != nil { err = errors.Wrapf(err, "put %s", string(metaState[:])) + t.Discard() return } blockKey := utils.ConcatAll(metaBlockIndex[:], node.indexKey()) if err = t.Put(blockKey, encBlock.Bytes(), nil); err != nil { err = errors.Wrapf(err, "put %s", string(node.indexKey())) + t.Discard() return } if err = t.Commit(); err != nil { err = errors.Wrapf(err, "commit error") + t.Discard() return } c.rt.setHead(st) @@ -433,6 +434,7 @@ func (c *Chain) pushResponedQuery(resp *wt.SignedResponseHeader) (err error) { // pushAckedQuery pushes a acknowledged, signed and verified query into the chain. func (c *Chain) pushAckedQuery(ack *wt.SignedAckHeader) (err error) { + log.Debugf("push ack %s", ack.HeaderHash.String()) h := c.rt.getHeightFromTime(ack.SignedResponseHeader().Timestamp) k := heightToKey(h) var enc *bytes.Buffer From 7f3657de05dabf44056b03fb4fb479e091d98ad4 Mon Sep 17 00:00:00 2001 From: lambda Date: Wed, 31 Oct 2018 15:33:32 +0800 Subject: [PATCH 4/8] Fix error shadowed bug --- cleanupDB.sh | 2 +- sqlchain/chain.go | 54 ++++++++++++++++++++----------------- sqlchain/queryindex.go | 5 ++-- sqlchain/queryindex_test.go | 7 ++--- 4 files changed, 37 insertions(+), 31 deletions(-) diff --git a/cleanupDB.sh b/cleanupDB.sh index 49ebc51b9..caf5dc037 100755 --- a/cleanupDB.sh +++ b/cleanupDB.sh @@ -8,4 +8,4 @@ cd ${PROJECT_DIR} && find . -name '*.db-wal' -exec rm -f {} \; cd ${PROJECT_DIR} && find . -name 'db.meta' -exec rm -f {} \; cd ${PROJECT_DIR} && find . -name 'public.keystore' -exec rm -f {} \; cd ${PROJECT_DIR} && find . -name '*.public.keystore' -exec rm -f {} \; -cd ${PROJECT_DIR} && find . -name '*.ldb' -exec rm -rf {} \; +cd ${PROJECT_DIR} && find . 
-type d -name '*.ldb' -prune -exec rm -rf {} \; diff --git a/sqlchain/chain.go b/sqlchain/chain.go index d2a961419..e1ccff89e 100644 --- a/sqlchain/chain.go +++ b/sqlchain/chain.go @@ -858,17 +858,18 @@ func (c *Chain) Stop() (err error) { func (c *Chain) FetchBlock(height int32) (b *ct.Block, err error) { if n := c.rt.getHead().node.ancestor(height); n != nil { k := utils.ConcatAll(metaBlockIndex[:], n.indexKey()) - v, err := c.bdb.Get(k, nil) + var v []byte + v, err = c.bdb.Get(k, nil) if err != nil { err = errors.Wrapf(err, "fetch block %s", string(k)) - return nil, err + return } b = &ct.Block{} err = utils.DecodeMsgPack(v, b) if err != nil { err = errors.Wrapf(err, "fetch block %s", string(k)) - return nil, err + return } } @@ -879,30 +880,30 @@ func (c *Chain) FetchBlock(height int32) (b *ct.Block, err error) { func (c *Chain) FetchAckedQuery(height int32, header *hash.Hash) ( ack *wt.SignedAckHeader, err error, ) { - if ack, err = c.qi.getAck(height, header); err == nil && ack != nil { - return - } - for h := height - c.rt.queryTTL - 1; h <= height; h++ { - k := heightToKey(h) - ackKey := utils.ConcatAll(metaAckIndex[:], k, header[:]) - v, err := c.tdb.Get(ackKey, nil) - if err != nil { - if err != leveldb.ErrNotFound { - err = errors.Wrapf(err, "fetch ack in height %d hash %s", h, header.String()) - return nil, err - } - } else { - var dec = &wt.SignedAckHeader{} - if err = utils.DecodeMsgPack(v, dec); err != nil { - err = errors.Wrapf(err, "fetch ack in height %d hash %s", h, header.String()) - return nil, err + if ack, err = c.qi.getAck(height, header); err != nil || ack == nil { + for h := height - c.rt.queryTTL - 1; h <= height; h++ { + k := heightToKey(h) + ackKey := utils.ConcatAll(metaAckIndex[:], k, header[:]) + var v []byte + if v, err = c.tdb.Get(ackKey, nil); err != nil { + // if err == leveldb.ErrNotFound, just loop for next h + if err != leveldb.ErrNotFound { + err = errors.Wrapf(err, "fetch ack in height %d hash %s", h, header.String()) + return + } + } else { + var dec = &wt.SignedAckHeader{} + if err = utils.DecodeMsgPack(v, dec); err != nil { + err = errors.Wrapf(err, "fetch ack in height %d hash %s", h, header.String()) + return + } + ack = dec + break } - ack = dec - break } } if ack == nil { - err = ErrAckQueryNotFound + err = errors.Wrapf(ErrAckQueryNotFound, "fetch ack not found") } return } @@ -1044,10 +1045,12 @@ func (c *Chain) CheckAndPushNewBlock(block *ct.Block) (err error) { func (c *Chain) VerifyAndPushResponsedQuery(resp *wt.SignedResponseHeader) (err error) { // TODO(leventeliu): check resp. if c.rt.queryTimeIsExpired(resp.Timestamp) { - return ErrQueryExpired + err = errors.Wrapf(ErrQueryExpired, "Verify response query, min valid height %d, response height %d", c.rt.getMinValidHeight(), c.rt.getHeightFromTime(resp.Timestamp)) + return } if err = resp.Verify(); err != nil { + err = errors.Wrapf(err, "") return } @@ -1058,7 +1061,8 @@ func (c *Chain) VerifyAndPushResponsedQuery(resp *wt.SignedResponseHeader) (err func (c *Chain) VerifyAndPushAckedQuery(ack *wt.SignedAckHeader) (err error) { // TODO(leventeliu): check ack. 
if c.rt.queryTimeIsExpired(ack.SignedResponseHeader().Timestamp) { - return ErrQueryExpired + err = errors.Wrapf(ErrQueryExpired, "Verify ack query, min valid height %d, ack height %d", c.rt.getMinValidHeight(), c.rt.getHeightFromTime(ack.Timestamp)) + return } if err = ack.Verify(); err != nil { diff --git a/sqlchain/queryindex.go b/sqlchain/queryindex.go index 7a8a28fd0..27fa77ca9 100644 --- a/sqlchain/queryindex.go +++ b/sqlchain/queryindex.go @@ -19,6 +19,7 @@ package sqlchain // TODO(leventeliu): use pooled objects to speed up this index. import ( + "github.com/pkg/errors" "sync" "github.com/CovenantSQL/CovenantSQL/crypto/hash" @@ -482,7 +483,7 @@ func (i *queryIndex) checkAckFromBlock(h int32, b *hash.Hash, ack *hash.Hash) ( l := i.getBarrier() if h < l { - err = ErrQueryExpired + err = errors.Wrapf(ErrQueryExpired, "check Ack, height %d, barrier %d", h, l) return } @@ -527,7 +528,7 @@ func (i *queryIndex) getAck(h int32, header *hash.Hash) (ack *wt.SignedAckHeader b := i.getBarrier() if h < b { - err = ErrQueryExpired + err = errors.Wrapf(ErrQueryExpired, "get Ack, height %d, barrier %d", h, b) return } diff --git a/sqlchain/queryindex_test.go b/sqlchain/queryindex_test.go index 23d6fb0b2..609b55b35 100644 --- a/sqlchain/queryindex_test.go +++ b/sqlchain/queryindex_test.go @@ -17,6 +17,7 @@ package sqlchain import ( + "github.com/pkg/errors" "math/rand" "reflect" "testing" @@ -130,7 +131,7 @@ func TestCheckAckFromBlock(t *testing.T) { if _, err := qi.checkAckFromBlock( 0, b1.BlockHash(), b1.Queries[0], - ); err != ErrQueryExpired { + ); errors.Cause(err) != ErrQueryExpired { t.Fatalf("Unexpected error: %v", err) } @@ -248,11 +249,11 @@ func TestGetAck(t *testing.T) { qi := newQueryIndex() qh := &hash.Hash{} - if _, err := qi.getAck(-1, qh); err != ErrQueryExpired { + if _, err := qi.getAck(-1, qh); errors.Cause(err) != ErrQueryExpired { t.Fatalf("Unexpected error: %v", err) } - if _, err := qi.getAck(0, qh); err != ErrQueryNotCached { + if _, err := qi.getAck(0, qh); errors.Cause(err) != ErrQueryNotCached { t.Fatalf("Unexpected error: %v", err) } } From 846e872743856df9abd697fd3ac4509ac09c308c Mon Sep 17 00:00:00 2001 From: lambda Date: Wed, 31 Oct 2018 15:53:19 +0800 Subject: [PATCH 5/8] Modify concatAll and debug level --- cmd/cql-minerd/main.go | 2 +- utils/bytes.go | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/cmd/cql-minerd/main.go b/cmd/cql-minerd/main.go index 4072bc8a7..ab094181b 100644 --- a/cmd/cql-minerd/main.go +++ b/cmd/cql-minerd/main.go @@ -120,7 +120,7 @@ func initLogs() { func main() { // set random rand.Seed(time.Now().UnixNano()) - log.SetLevel(log.DebugLevel) + log.SetLevel(log.InfoLevel) flag.Parse() flag.Visit(func(f *flag.Flag) { log.Infof("Args %#v : %#v", f.Name, f.Value) diff --git a/utils/bytes.go b/utils/bytes.go index 651cbc415..f85d85e17 100644 --- a/utils/bytes.go +++ b/utils/bytes.go @@ -16,14 +16,18 @@ package utils -import "bytes" - // ConcatAll concatenate several bytes slice into one. 
func ConcatAll(args ...[]byte) []byte { - key := bytes.NewBuffer([]byte{}) + var bLen int + for i := range args { + bLen += len(args[i]) + } + key := make([]byte, bLen) + bLen = 0 for i := range args { - key.Write(args[i]) + copy(key[bLen:], args[i]) + bLen += len(args[i]) } - return key.Bytes() + return key } From a163a77c2cdb840ba6f353d6c15b985705ca0d09 Mon Sep 17 00:00:00 2001 From: auxten Date: Wed, 31 Oct 2018 22:16:36 +0800 Subject: [PATCH 6/8] Remove CleanupDB in test case --- cmd/cql-minerd/integration_test.go | 1 - cmd/cql-observer/observation_test.go | 2 -- 2 files changed, 3 deletions(-) diff --git a/cmd/cql-minerd/integration_test.go b/cmd/cql-minerd/integration_test.go index 6c62ddc67..61fbf14e5 100644 --- a/cmd/cql-minerd/integration_test.go +++ b/cmd/cql-minerd/integration_test.go @@ -72,7 +72,6 @@ func startNodes() { log.Fatalf("wait for port ready timeout: %v", err) } - utils.CleanupDB() // start 3bps var cmd *utils.CMD if cmd, err = utils.RunCommandNB( diff --git a/cmd/cql-observer/observation_test.go b/cmd/cql-observer/observation_test.go index 6d16e7285..4db8dc2d9 100644 --- a/cmd/cql-observer/observation_test.go +++ b/cmd/cql-observer/observation_test.go @@ -73,8 +73,6 @@ func startNodes() { log.Fatalf("wait for port ready timeout: %v", err) } - utils.CleanupDB() - // start 3bps var cmd *utils.CMD if cmd, err = utils.RunCommandNB( From 0b008c6b1a7ea4758afa2fa87df3e48742e872dc Mon Sep 17 00:00:00 2001 From: lambda Date: Thu, 1 Nov 2018 12:32:02 +0800 Subject: [PATCH 7/8] Add more code coverage --- sqlchain/chain.go | 2 +- sqlchain/chain_test.go | 2 ++ test/GNTE/GNTE | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sqlchain/chain.go b/sqlchain/chain.go index d57c975d1..8984fd8aa 100644 --- a/sqlchain/chain.go +++ b/sqlchain/chain.go @@ -121,7 +121,7 @@ func NewChain(c *Config) (chain *Chain, err error) { // TODO(leventeliu): this is a rough solution, you may also want to clean database file and // force rebuilding. 
var fi os.FileInfo - if fi, err = os.Stat(c.DataFile); err == nil && fi.Mode().IsRegular() { + if fi, err = os.Stat(c.DataFile + "-block-state.ldb"); err == nil && fi.Mode().IsDir() { return LoadChain(c) } diff --git a/sqlchain/chain_test.go b/sqlchain/chain_test.go index 907129cb0..5afd1f227 100644 --- a/sqlchain/chain_test.go +++ b/sqlchain/chain_test.go @@ -184,6 +184,8 @@ func TestMultiChain(t *testing.T) { config: config, chain: chain, } + + } // Create a master BP for RPC test diff --git a/test/GNTE/GNTE b/test/GNTE/GNTE index 956452ace..51de88917 160000 --- a/test/GNTE/GNTE +++ b/test/GNTE/GNTE @@ -1 +1 @@ -Subproject commit 956452aceb2afdeb3b72ea041aebf1bb367ede56 +Subproject commit 51de889172de377ec104900b9566ef8c8510525e From 7c01bdd2e74409fe503879a33ce96197f61b15d0 Mon Sep 17 00:00:00 2001 From: lambda Date: Thu, 1 Nov 2018 13:07:25 +0800 Subject: [PATCH 8/8] Add test case and state new var in utils/bytes.go --- utils/bytes.go | 6 +++--- utils/bytes_test.go | 10 ++++++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/utils/bytes.go b/utils/bytes.go index f85d85e17..5f0b77459 100644 --- a/utils/bytes.go +++ b/utils/bytes.go @@ -24,10 +24,10 @@ func ConcatAll(args ...[]byte) []byte { } key := make([]byte, bLen) - bLen = 0 + position := 0 for i := range args { - copy(key[bLen:], args[i]) - bLen += len(args[i]) + copy(key[position:], args[i]) + position += len(args[i]) } return key } diff --git a/utils/bytes_test.go b/utils/bytes_test.go index 3d7a45e66..9276681de 100644 --- a/utils/bytes_test.go +++ b/utils/bytes_test.go @@ -31,5 +31,15 @@ func TestNewLevelDBKey(t *testing.T) { So(ConcatAll([]byte{'0'}, []byte{'1'}), ShouldResemble, []byte{'0', '1'}) So(ConcatAll([]byte{'0'}, nil), ShouldResemble, []byte{'0'}) So(ConcatAll(nil, []byte{'0'}), ShouldResemble, []byte{'0'}) + So(ConcatAll([]byte{'0', '1', '2', '3'}, []byte{'a', 'b', 'c', 'd', 'e'}, []byte{'x', 'y', 'z'}), + ShouldResemble, []byte{'0', '1', '2', '3', 'a', 'b', 'c', 'd', 'e', 'x', 'y', 'z'}) + So(ConcatAll([]byte{'0', '1', '2', '3'}, nil, []byte{'x', 'y', 'z'}), + ShouldResemble, []byte{'0', '1', '2', '3', 'x', 'y', 'z'}) + So(ConcatAll([]byte{'0', '1', '2', '3'}, []byte{}, []byte{'x', 'y', 'z'}), + ShouldResemble, []byte{'0', '1', '2', '3','x', 'y', 'z'}) + So(ConcatAll(nil, []byte{'0', '1', '2', '3'}, nil, []byte{'x', 'y', 'z'}), + ShouldResemble, []byte{'0', '1', '2', '3', 'x', 'y', 'z'}) + So(ConcatAll([]byte{}, []byte{'0', '1', '2', '3'}, nil, []byte{'x', 'y', 'z'}, nil), + ShouldResemble, []byte{'0', '1', '2', '3', 'x', 'y', 'z'}) }) }
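
The patches above replace per-bucket BoltDB storage with flat LevelDB keys of the form [4-byte prefix][4-byte big-endian height][header hash], as documented in the keyWithSymbolToHeight comment of PATCH 1/8. Below is a minimal, self-contained Go sketch of that encode/decode round trip, for illustration only; heightToKey and concatAll are reconstructed here from the documented layout and from utils.ConcatAll, and are not the exact repository code.

package main

import (
	"encoding/binary"
	"fmt"
)

// Ack key prefix as used in sqlchain/chain.go.
var metaAckIndex = [4]byte{'Q', 'A', 'C', 'K'}

// heightToKey encodes an int32 height as 4 big-endian bytes
// (assumed layout, matching keyToHeight/keyWithSymbolToHeight).
func heightToKey(h int32) []byte {
	k := make([]byte, 4)
	binary.BigEndian.PutUint32(k, uint32(h))
	return k
}

// concatAll mirrors utils.ConcatAll: concatenates byte slices into one key.
func concatAll(args ...[]byte) []byte {
	var n int
	for _, a := range args {
		n += len(a)
	}
	out := make([]byte, 0, n)
	for _, a := range args {
		out = append(out, a...)
	}
	return out
}

// keyWithSymbolToHeight recovers the height from a prefixed key:
// [4-byte prefix][4-byte big-endian height][hash...].
func keyWithSymbolToHeight(k []byte) int32 {
	if len(k) < 8 {
		return -1
	}
	return int32(binary.BigEndian.Uint32(k[4:]))
}

func main() {
	hash := []byte{0xde, 0xad, 0xbe, 0xef} // stand-in for a 32-byte header hash
	key := concatAll(metaAckIndex[:], heightToKey(42), hash)
	fmt.Printf("key=% x height=%d\n", key, keyWithSymbolToHeight(key)) // prints height=42
}

A key built this way is what the patches pass to tdb.Put and later scan back with util.BytesPrefix(metaAckIndex[:]), so a single LevelDB iteration replaces the nested height/ack buckets that BoltDB needed.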