diff --git a/client/conn.go b/client/conn.go index e79f0cb07..eeec4d1db 100644 --- a/client/conn.go +++ b/client/conn.go @@ -348,7 +348,6 @@ func (c *conn) sendQuery(queryType types.QueryType, queries []types.Query) (affe return } - c.pCaller = rpc.NewPersistentCaller(peers.Leader) var response types.Response if err = c.pCaller.Call(route.DBSQuery.String(), req, &response); err != nil { return diff --git a/cmd/cql-minerd/integration_test.go b/cmd/cql-minerd/integration_test.go index 4d16529d8..93935400b 100644 --- a/cmd/cql-minerd/integration_test.go +++ b/cmd/cql-minerd/integration_test.go @@ -27,6 +27,7 @@ import ( "os" "os/exec" "path/filepath" + "runtime" "sync" "sync/atomic" "syscall" @@ -443,6 +444,13 @@ func benchDB(b *testing.B, db *sql.DB, createDB bool) { }) }) + routineCount := runtime.NumGoroutine() + if routineCount > 100 { + b.Errorf("go routine count: %d", routineCount) + } else { + log.Infof("go routine count: %d", routineCount) + } + rowCount := db.QueryRow("SELECT COUNT(1) FROM " + TABLENAME) var count int64 err = rowCount.Scan(&count) @@ -473,6 +481,13 @@ func benchDB(b *testing.B, db *sql.DB, createDB bool) { }) }) + routineCount = runtime.NumGoroutine() + if routineCount > 100 { + b.Errorf("go routine count: %d", routineCount) + } else { + log.Infof("go routine count: %d", routineCount) + } + //row := db.QueryRow("SELECT nonIndexedColumn FROM test LIMIT 1") //var result int diff --git a/route/acl.go b/route/acl.go index dd6d4ab69..9664f7e3c 100644 --- a/route/acl.go +++ b/route/acl.go @@ -91,12 +91,10 @@ const ( SQLCAdviseNewBlock // SQLCAdviseBinLog is usd by sqlchain to advise binlog between adjacent node SQLCAdviseBinLog - // SQLCAdviseResponsedQuery is used by sqlchain to advice response query between adjacent node + // SQLCAdviseAckedQuery is used by sqlchain to advice response query between adjacent node SQLCAdviseAckedQuery // SQLCFetchBlock is used by sqlchain to fetch block from adjacent nodes SQLCFetchBlock - // SQLCFetchAckedQuery is used by sqlchain to fetch response ack from adjacent nodes - SQLCFetchAckedQuery // SQLCSignBilling is used by sqlchain to response billing signature for periodic billing request SQLCSignBilling // SQLCLaunchBilling is used by blockproducer to trigger the billing process in sqlchain @@ -179,8 +177,6 @@ func (s RemoteFunc) String() string { return "SQLC.AdviseAckedQuery" case SQLCFetchBlock: return "SQLC.FetchBlock" - case SQLCFetchAckedQuery: - return "SQLC.FetchAckedQuery" case SQLCSignBilling: return "SQLC.SignBilling" case SQLCLaunchBilling: diff --git a/sqlchain/ackindex.go b/sqlchain/ackindex.go index 13bc3c9fa..8e84d8bf5 100644 --- a/sqlchain/ackindex.go +++ b/sqlchain/ackindex.go @@ -18,12 +18,20 @@ package sqlchain import ( "sync" + "sync/atomic" "github.com/CovenantSQL/CovenantSQL/types" "github.com/CovenantSQL/CovenantSQL/utils/log" "github.com/pkg/errors" ) +var ( + // Global atomic counters for stats + multiIndexCount int32 + responseCount int32 + ackTrackerCount int32 +) + type ackTracker struct { resp *types.SignedResponseHeader ack *types.SignedAckHeader @@ -48,6 +56,7 @@ func (i *multiAckIndex) addResponse(resp *types.SignedResponseHeader) (err error return } i.ri[key] = resp + atomic.AddInt32(&responseCount, 1) return } @@ -70,16 +79,19 @@ func (i *multiAckIndex) register(ack *types.SignedAckHeader) (err error) { resp: resp, ack: ack, } + atomic.AddInt32(&responseCount, -1) + atomic.AddInt32(&ackTrackerCount, 1) return } func (i *multiAckIndex) remove(ack *types.SignedAckHeader) (err error) { var key = ack.SignedRequestHeader().GetQueryKey() - log.Debugf("Removing key %s <-- ack %s", &key, ack.Hash()) + log.Debugf("Removing key %s -x- ack %s", &key, ack.Hash()) i.Lock() defer i.Unlock() if _, ok := i.ri[key]; ok { delete(i.ri, key) + atomic.AddInt32(&responseCount, -1) return } if oack, ok := i.qi[key]; ok { @@ -89,6 +101,7 @@ func (i *multiAckIndex) remove(ack *types.SignedAckHeader) (err error) { return } delete(i.qi, key) + atomic.AddInt32(&ackTrackerCount, -1) return } err = errors.Wrapf(ErrQueryNotFound, "remove key %s -x- ack %s", &key, ack.Hash()) @@ -162,6 +175,7 @@ func (i *ackIndex) load(h int32) (mi *multiAckIndex, err error) { qi: make(map[types.QueryKey]*ackTracker), } i.hi[h] = mi + atomic.AddInt32(&multiIndexCount, 1) } return } @@ -180,7 +194,10 @@ func (i *ackIndex) advance(h int32) { // Record expired and not acknowledged queries for _, v := range dl { v.expire() + atomic.AddInt32(&responseCount, int32(-len(v.ri))) + atomic.AddInt32(&ackTrackerCount, int32(-len(v.qi))) } + atomic.AddInt32(&multiIndexCount, int32(-len(dl))) } func (i *ackIndex) addResponse(h int32, resp *types.SignedResponseHeader) (err error) { diff --git a/sqlchain/blockindex.go b/sqlchain/blockindex.go index 96d773bef..04bf7d97f 100644 --- a/sqlchain/blockindex.go +++ b/sqlchain/blockindex.go @@ -85,19 +85,14 @@ func (n *blockNode) indexKey() (key []byte) { } type blockIndex struct { - cfg *Config - mu sync.RWMutex index map[hash.Hash]*blockNode } -func newBlockIndex(cfg *Config) (index *blockIndex) { - index = &blockIndex{ - cfg: cfg, +func newBlockIndex() (index *blockIndex) { + return &blockIndex{ index: make(map[hash.Hash]*blockNode), } - - return index } func (i *blockIndex) addBlock(newBlock *blockNode) { diff --git a/sqlchain/blockindex_test.go b/sqlchain/blockindex_test.go index 9c5c0c39d..e372980c8 100644 --- a/sqlchain/blockindex_test.go +++ b/sqlchain/blockindex_test.go @@ -108,8 +108,7 @@ func TestInitBlockNode(t *testing.T) { } func TestAncestor(t *testing.T) { - cfg := &Config{} - index := newBlockIndex(cfg) + index := newBlockIndex() parent := (*blockNode)(nil) for h, b := range testBlocks { @@ -142,8 +141,7 @@ func TestAncestor(t *testing.T) { } func TestIndex(t *testing.T) { - cfg := &Config{} - index := newBlockIndex(cfg) + index := newBlockIndex() parent := (*blockNode)(nil) for h, b := range testBlocks { diff --git a/sqlchain/chain.go b/sqlchain/chain.go index f2d79a851..a95f81c45 100644 --- a/sqlchain/chain.go +++ b/sqlchain/chain.go @@ -21,13 +21,14 @@ import ( "encoding/binary" "fmt" "os" + rt "runtime" "sync" + "sync/atomic" "time" pt "github.com/CovenantSQL/CovenantSQL/blockproducer/types" "github.com/CovenantSQL/CovenantSQL/crypto" "github.com/CovenantSQL/CovenantSQL/crypto/asymmetric" - "github.com/CovenantSQL/CovenantSQL/crypto/hash" "github.com/CovenantSQL/CovenantSQL/crypto/kms" "github.com/CovenantSQL/CovenantSQL/proto" "github.com/CovenantSQL/CovenantSQL/route" @@ -45,7 +46,7 @@ import ( ) const ( - minBlockCacheTTL = int32(100) + minBlockCacheTTL = int32(30) ) var ( @@ -55,6 +56,9 @@ var ( metaResponseIndex = [4]byte{'R', 'E', 'S', 'P'} metaAckIndex = [4]byte{'Q', 'A', 'C', 'K'} leveldbConf = opt.Options{} + + // Atomic counters for stats + cachedBlockCount int32 ) func init() { @@ -62,6 +66,13 @@ func init() { leveldbConf.Compression = opt.SnappyCompression } +func statBlock(b *types.Block) { + atomic.AddInt32(&cachedBlockCount, 1) + rt.SetFinalizer(b, func(_ *types.Block) { + atomic.AddInt32(&cachedBlockCount, -1) + }) +} + // heightToKey converts a height in int32 to a key in bytes. func heightToKey(h int32) (key []byte) { key = make([]byte, 4) @@ -97,7 +108,6 @@ type Chain struct { // tdb stores ack/request/response tdb *leveldb.DB bi *blockIndex - qi *queryIndex // TODO(leventeliu): remove query index. ai *ackIndex st *x.State cl *rpc.Caller @@ -188,8 +198,7 @@ func NewChain(c *Config) (chain *Chain, err error) { chain = &Chain{ bdb: bdb, tdb: tdb, - bi: newBlockIndex(c), - qi: newQueryIndex(), + bi: newBlockIndex(), ai: newAckIndex(), st: state, cl: rpc.NewCaller(), @@ -259,8 +268,7 @@ func LoadChain(c *Config) (chain *Chain, err error) { chain = &Chain{ bdb: bdb, tdb: tdb, - bi: newBlockIndex(c), - qi: newQueryIndex(), + bi: newBlockIndex(), ai: newAckIndex(), st: xstate, cl: rpc.NewCaller(), @@ -381,11 +389,6 @@ func LoadChain(c *Config) (chain *Chain, err error) { "height": h, "header": resp.Hash().String(), }).Debug("Loaded new resp header") - err = chain.qi.addResponse(h, resp) - if err != nil { - err = errors.Wrapf(err, "load resp, height %d, hash %s", h, resp.Hash().String()) - return - } } if err = respIter.Error(); err != nil { err = errors.Wrap(err, "load resp") @@ -407,11 +410,6 @@ func LoadChain(c *Config) (chain *Chain, err error) { "height": h, "header": ack.Hash().String(), }).Debug("Loaded new ack header") - err = chain.qi.addAck(h, ack) - if err != nil { - err = errors.Wrapf(err, "load ack, height %d, hash %s", h, ack.Hash().String()) - return - } } if err = respIter.Error(); err != nil { err = errors.Wrap(err, "load ack") @@ -461,7 +459,6 @@ func (c *Chain) pushBlock(b *types.Block) (err error) { } c.rt.setHead(st) c.bi.addBlock(node) - c.qi.setSignedBlock(h, b) // Keep track of the queries from the new block var ierr error @@ -526,11 +523,6 @@ func (c *Chain) pushAckedQuery(ack *types.SignedAckHeader) (err error) { return } - if err = c.qi.addAck(h, ack); err != nil { - err = errors.Wrapf(err, "add ack %v at height %d", ack.Hash(), h) - return - } - if err = c.register(ack); err != nil { err = errors.Wrapf(err, "register ack %v at height %d", ack.Hash(), h) return @@ -563,6 +555,7 @@ func (c *Chain) produceBlockV2(now time.Time) (err error) { QueryTxs: make([]*types.QueryAsTx, len(qts)), Acks: c.ai.acks(c.rt.getHeightFromTime(now)), } + statBlock(block) for i, v := range qts { // TODO(leventeliu): maybe block waiting at a ready channel instead? for !v.Ready() { @@ -668,6 +661,7 @@ func (c *Chain) syncHead() { }).WithError(err).Debug( "Failed to fetch block from peer") } else { + statBlock(resp.Block) c.blocks <- resp.Block log.WithFields(log.Fields{ "peer": c.rt.getPeerInfoString(), @@ -700,9 +694,9 @@ func (c *Chain) syncHead() { // runCurrentTurn does the check and runs block producing if its my turn. func (c *Chain) runCurrentTurn(now time.Time) { defer func() { + c.stat() c.pruneBlockCache() c.rt.setNextTurn() - c.qi.advanceBarrier(c.rt.getMinValidHeight()) c.ai.advance(c.rt.getMinValidHeight()) // Info the block processing goroutine that the chain height has grown, so please return // any stashed blocks for further check. @@ -969,6 +963,7 @@ func (c *Chain) FetchBlock(height int32) (b *types.Block, err error) { } b = &types.Block{} + statBlock(b) err = utils.DecodeMsgPack(v, b) if err != nil { err = errors.Wrapf(err, "fetch block %s", string(k)) @@ -979,84 +974,6 @@ func (c *Chain) FetchBlock(height int32) (b *types.Block, err error) { return } -// FetchAckedQuery fetches the acknowledged query from local cache. -func (c *Chain) FetchAckedQuery(height int32, header *hash.Hash) ( - ack *types.SignedAckHeader, err error, -) { - if ack, err = c.qi.getAck(height, header); err != nil || ack == nil { - for h := height - c.rt.queryTTL - 1; h <= height; h++ { - k := heightToKey(h) - ackKey := utils.ConcatAll(metaAckIndex[:], k, header[:]) - var v []byte - if v, err = c.tdb.Get(ackKey, nil); err != nil { - // if err == leveldb.ErrNotFound, just loop for next h - if err != leveldb.ErrNotFound { - err = errors.Wrapf(err, "fetch ack in height %d hash %s", h, header.String()) - return - } - } else { - var dec = &types.SignedAckHeader{} - if err = utils.DecodeMsgPack(v, dec); err != nil { - err = errors.Wrapf(err, "fetch ack in height %d hash %s", h, header.String()) - return - } - ack = dec - break - } - } - } - if ack == nil { - err = errors.Wrapf(ErrAckQueryNotFound, "fetch ack not found") - } - return -} - -// syncAckedQuery uses RPC call to synchronize an acknowledged query from a remote node. -func (c *Chain) syncAckedQuery(height int32, header *hash.Hash, id proto.NodeID) ( - ack *types.SignedAckHeader, err error, -) { - req := &MuxFetchAckedQueryReq{ - Envelope: proto.Envelope{ - // TODO(leventeliu): Add fields. - }, - DatabaseID: c.rt.databaseID, - FetchAckedQueryReq: FetchAckedQueryReq{ - Height: height, - SignedAckedHash: header, - }, - } - resp := &MuxFetchAckedQueryResp{} - - if err = c.cl.CallNode(id, route.SQLCFetchAckedQuery.String(), req, resp); err != nil { - log.WithFields(log.Fields{ - "peer": c.rt.getPeerInfoString(), - "time": c.rt.getChainTimeString(), - }).WithError(err).Error( - "Failed to fetch acked query") - return - } - - if err = c.VerifyAndPushAckedQuery(resp.Ack); err != nil { - return - } - - ack = resp.Ack - return -} - -// queryOrSyncAckedQuery tries to query an acknowledged query from local index, and also tries to -// synchronize it from a remote node if not found locally. -func (c *Chain) queryOrSyncAckedQuery(height int32, header *hash.Hash, id proto.NodeID) ( - ack *types.SignedAckHeader, err error, -) { - if ack, err = c.FetchAckedQuery( - height, header, - ); (err == nil && ack != nil) || id == c.rt.getServer() { - return - } - return c.syncAckedQuery(height, header, id) -} - // CheckAndPushNewBlock implements ChainRPCServer.CheckAndPushNewBlock. func (c *Chain) CheckAndPushNewBlock(block *types.Block) (err error) { height := c.rt.getHeightFromTime(block.Timestamp()) @@ -1122,25 +1039,6 @@ func (c *Chain) CheckAndPushNewBlock(block *types.Block) (err error) { // ... // } - //// Check queries - //for _, q := range block.Queries { - // var ok bool - - // if ok, err = c.qi.checkAckFromBlock(height, block.BlockHash(), q); err != nil { - // return - // } - - // if !ok { - // if _, err = c.syncAckedQuery(height, q, block.Producer()); err != nil { - // return - // } - - // if _, err = c.qi.checkAckFromBlock(height, block.BlockHash(), q); err != nil { - // return - // } - // } - //} - // Replicate local state from the new block if err = c.st.ReplayBlock(block); err != nil { return @@ -1180,7 +1078,6 @@ func (c *Chain) getBilling(low, high int32) (req *pt.BillingRequest, err error) var ( n *blockNode addr proto.AccountAddress - ack *types.SignedAckHeader lowBlock, highBlock *types.Block billings = make(map[proto.AccountAddress]*proto.AddrAndGas) ) @@ -1220,22 +1117,17 @@ func (c *Chain) getBilling(low, high int32) (req *pt.BillingRequest, err error) } for _, v := range n.block.Acks { - ackHash := v.Hash() - if ack, err = c.queryOrSyncAckedQuery(n.height, &ackHash, n.block.Producer()); err != nil { - return - } - - if addr, err = crypto.PubKeyHash(ack.SignedResponseHeader().Signee); err != nil { + if addr, err = crypto.PubKeyHash(v.SignedResponseHeader().Signee); err != nil { return } if billing, ok := billings[addr]; ok { - billing.GasAmount += c.rt.price[ack.SignedRequestHeader().QueryType] * - ack.SignedRequestHeader().BatchCount + billing.GasAmount += c.rt.price[v.SignedRequestHeader().QueryType] * + v.SignedRequestHeader().BatchCount } else { billings[addr] = &proto.AddrAndGas{ AccountAddress: addr, - RawNodeID: *ack.SignedResponseHeader().NodeID.ToRawNodeID(), + RawNodeID: *v.SignedResponseHeader().NodeID.ToRawNodeID(), GasAmount: c.rt.producingReward, } } @@ -1550,3 +1442,22 @@ func (c *Chain) pruneBlockCache() { head.block = nil } } + +func (c *Chain) stat() { + var ( + ic = atomic.LoadInt32(&multiIndexCount) + rc = atomic.LoadInt32(&responseCount) + tc = atomic.LoadInt32(&ackTrackerCount) + bc = atomic.LoadInt32(&cachedBlockCount) + ) + // Print chain stats + log.WithFields(log.Fields{ + "database_id": c.rt.databaseID, + "multiIndex_count": ic, + "response_header_count": rc, + "query_tracker_count": tc, + "cached_block_count": bc, + }).Info("Chain mem stats") + // Print xeno stats + c.st.Stat(c.rt.databaseID) +} diff --git a/sqlchain/chain_test.go b/sqlchain/chain_test.go index 10c515c99..798854417 100644 --- a/sqlchain/chain_test.go +++ b/sqlchain/chain_test.go @@ -308,8 +308,10 @@ func TestMultiChain(t *testing.T) { i, c.rt.getPeerInfoString()) continue } - t.Logf("Checking block %v at height %d in peer %s", - node.block.BlockHash(), i, c.rt.getPeerInfoString()) + if node.block != nil { + t.Logf("Checking block %v at height %d in peer %s", + node.block.BlockHash(), i, c.rt.getPeerInfoString()) + } } }(v.chain) } diff --git a/sqlchain/mux.go b/sqlchain/mux.go index 8ea2f56f5..7dea6dfea 100644 --- a/sqlchain/mux.go +++ b/sqlchain/mux.go @@ -103,20 +103,6 @@ type MuxFetchBlockResp struct { FetchBlockResp } -// MuxFetchAckedQueryReq defines a request of the FetchAckedQuery RPC method. -type MuxFetchAckedQueryReq struct { - proto.Envelope - proto.DatabaseID - FetchAckedQueryReq -} - -// MuxFetchAckedQueryResp defines a request of the FetchAckedQuery RPC method. -type MuxFetchAckedQueryResp struct { - proto.Envelope - proto.DatabaseID - FetchAckedQueryResp -} - // MuxSignBillingReq defines a request of the SignBilling RPC method. type MuxSignBillingReq struct { proto.Envelope @@ -219,19 +205,6 @@ func (s *MuxService) FetchBlock(req *MuxFetchBlockReq, resp *MuxFetchBlockResp) return ErrUnknownMuxRequest } -// FetchAckedQuery is the RPC method to fetch a known block from the target server. -func (s *MuxService) FetchAckedQuery( - req *MuxFetchAckedQueryReq, resp *MuxFetchAckedQueryResp) (err error) { - if v, ok := s.serviceMap.Load(req.DatabaseID); ok { - resp.Envelope = req.Envelope - resp.DatabaseID = req.DatabaseID - return v.(*ChainRPCService).FetchAckedQuery( - &req.FetchAckedQueryReq, &resp.FetchAckedQueryResp) - } - - return ErrUnknownMuxRequest -} - // SignBilling is the RPC method to get signature for a billing request from the target server. func (s *MuxService) SignBilling(req *MuxSignBillingReq, resp *MuxSignBillingResp) (err error) { if v, ok := s.serviceMap.Load(req.DatabaseID); ok { diff --git a/sqlchain/queryindex.go b/sqlchain/queryindex.go deleted file mode 100644 index cd792becf..000000000 --- a/sqlchain/queryindex.go +++ /dev/null @@ -1,576 +0,0 @@ -/* - * Copyright 2018 The CovenantSQL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sqlchain - -// TODO(leventeliu): use pooled objects to speed up this index. - -import ( - "sync" - - "github.com/CovenantSQL/CovenantSQL/crypto/hash" - "github.com/CovenantSQL/CovenantSQL/types" - "github.com/CovenantSQL/CovenantSQL/utils/log" - "github.com/pkg/errors" -) - -var ( - placeHolder = &hash.Hash{} -) - -// requestTracker defines a tracker of a particular database query request. -// We use it to track and update queries in this index system. -type requestTracker struct { - // TODO(leventeliu): maybe we don't need them to be "signed" here. Given that the response or - // Ack is already verified, simply use Header. - response *types.SignedResponseHeader - ack *types.SignedAckHeader - // signedBlock is the hash of the block in the currently best chain which contains this query. - signedBlock *hash.Hash -} - -// queryTracker defines a tracker of a particular database query. It may contain multiple queries -// to differe workers. -type queryTracker struct { - firstAck *requestTracker - queries []*requestTracker -} - -// newQueryTracker returns a new queryTracker reference. -func newQueryTracker() *queryTracker { - return &queryTracker{ - // TODO(leventeliu): set appropriate capacity. - firstAck: nil, - queries: make([]*requestTracker, 0, 10), - } -} - -// updateAck updates the query tracker with a verified SignedAckHeader. -func (s *requestTracker) updateAck(ack *types.SignedAckHeader) (isNew bool, err error) { - if s.ack == nil { - // A later Ack can overwrite the original Response setting - *s = requestTracker{ - response: ack.SignedResponseHeader(), - ack: ack, - } - - isNew = true - } else if s.ack.Hash() != ack.Hash() { - // This may happen when a client sends multiple acknowledgements for a same query (same - // response header hash) - err = ErrMultipleAckOfResponse - } // else it's same as s.Ack, let's try not to overwrite it - - return -} - -// hashIndex defines a requestTracker index using hash as key. -type hashIndex map[hash.Hash]*requestTracker - -// seqIndex defines a queryTracker index using sequence number as key. -type seqIndex map[types.QueryKey]*queryTracker - -// ensure returns the *queryTracker associated with the given key. It creates a new item if the -// key doesn't exist. -func (i seqIndex) ensure(k types.QueryKey) (v *queryTracker) { - var ok bool - - if v, ok = i[k]; !ok { - v = newQueryTracker() - i[k] = v - } - - return -} - -// multiIndex defines a combination of multiple indexes. -// -// Index layout is as following: -// -// respIndex +----------------+ -// +---------------------------+->| requestTracker | +---------------------------+ -// | ... | | | | +-response |------>| signedresponseheader | -// +--------+ | | | +-ack (nil) | | +-ResponseHeader | -// | hash#1 |-----+ | | +-... | | | +-SignedRequestHeader | -// +--------+ | +----------------+ | | | +-RequestHeader | -// | ... | | | | | | +-... | -// +--------+ +------------------+ | | | | | +-SeqNo: seq#0 | -// | hash#3 |-----+ +->| queryTracker | | | | | | +-... | -// +--------+ | | | +-firstAck (nil) | | | | | +-Hash: hash#0 | -// | ... | | | | +-queries | | | | | +-Signee ====> pubk#0 | -// +--------+ | | | +-[0] |--+ | | | +-Signature => sign#0 | -// | hash#6 |--+ | | | +-... | | | +-... | -// +--------+ | | | +------------------+ | +-Hash: hash#1 | -// | ... | | | | | +-Signee ====> pubk#1 | -// | | | | +-Signature => sign#1 | -// | | | +---------------------------+ -// | | | +----------------+ -// | +-------------+---------+-+--->| requestTracker | -// | | | | | | +-response |----+ +-------------------------------+ -// ackindex | | | | | | +-ack |----|->| SignedAckHeader | -// | | | | | | +-... | | | +-AckHeader | -// | ... | | | | | | +----------------+ +->| | +-SignedResponseHeader | -// +--------+ | | | | | | | | +-ResponseHeader | -// | hash#4 |--|----------------+ | | | | | | +-SignedRequestHeader | -// +--------+ | | | | | | | | | +-RequestHeader | -// | ... | | | | | | | | | | | +-... | -// | | | | | | | | | | +-SeqNo: seq#1 | -// | | | | | | | | | | +-... | -// | | | | | | | | | +-Hash: hash#2 | -// | | | | | | | | | +-Signee ====> pubk#2 | -// | | | | | | | | | +-Signature => sign#2 | -// seqIndex | | | | +----------------+ | | | | +-... | -// +------------------------------+->| requestTracker | | | | +-Hash: hash#3 | -// | ... | | | | | | +-response |---+ | | | +-signee ====> pubk#3 | -// +--------+ | | | | | +-ack (nil) | | | | | +-Signature => sign#3 | -// | seq#0 |--------+ | | | | +-... | | | | +-... | -// +--------+ | | | +----------------+ | | +-Hash: hash#4 | -// | ... | | | | | | +-Signee ====> pubk#2 | -// +--------+ +--------------+ | | | | | +-Signature => sign#4 | -// | seq#1 |---------->| queryTracker | | | | | +-------------------------------+ -// +--------+ | +-firstAck |--+ | | | -// | ... | | +-queries | | | | -// | +-[0] |----+ | | -// | +-[1] |------+ | +---------------------------+ -// | +-... | +-->| SignedResponseHeader | -// +--------------+ | +-ResponseHeader | -// | | +-SignedRequestHeader | -// | | | +-RequestHeader | -// | | | | +-... | -// | | | | +-SeqNo: seq#1 | -// | | | | +-... | -// | | | +-Hash: hash#5 | -// | | | +-Signee ====> pubk#5 | -// | | | +-Signature => sign#5 | -// | | +-... | -// | +-Hash: hash#6 | -// | +-Signee ====> pubk#6 | -// | +-Signature => sign#6 | -// +---------------------------+ -// -type multiIndex struct { - sync.Mutex - respIndex, ackIndex hashIndex - seqIndex -} - -// newMultiIndex returns a new multiIndex reference. -func newMultiIndex() *multiIndex { - return &multiIndex{ - respIndex: make(map[hash.Hash]*requestTracker), - ackIndex: make(map[hash.Hash]*requestTracker), - seqIndex: make(map[types.QueryKey]*queryTracker), - } -} - -// addResponse adds the responsed query to the index. -func (i *multiIndex) addResponse(resp *types.SignedResponseHeader) (err error) { - i.Lock() - defer i.Unlock() - - if v, ok := i.respIndex[resp.Hash()]; ok { - if v == nil || v.response == nil { - // TODO(leventeliu): consider to panic. - err = ErrCorruptedIndex - return - } - - // Given that `resp` is already verified by user, its header should be deeply equal to - // v.response.ResponseHeader. - // Considering that we may allow a node to update its key pair on-the-fly, just overwrite - // this response. - v.response = resp - return - } - - // Create new item - s := &requestTracker{ - response: resp, - } - - i.respIndex[resp.Hash()] = s - q := i.seqIndex.ensure(resp.Request.GetQueryKey()) - q.queries = append(q.queries, s) - - return nil -} - -// addAck adds the acknowledged query to the index. -func (i *multiIndex) addAck(ack *types.SignedAckHeader) (err error) { - i.Lock() - defer i.Unlock() - var v *requestTracker - var ok bool - q := i.seqIndex.ensure(ack.SignedRequestHeader().GetQueryKey()) - - if v, ok = i.respIndex[ack.ResponseHash()]; ok { - if v == nil || v.response == nil { - // TODO(leventeliu): consider to panic. - err = ErrCorruptedIndex - return - } - - // Add hash -> ack index anyway, so that we can find the request tracker later, even if - // there is a earlier acknowledgement for the same request - i.ackIndex[ack.Hash()] = v - - // This also updates the item indexed by ackIndex and seqIndex - var isNew bool - - if isNew, err = v.updateAck(ack); err != nil { - return - } - - if isNew { - q.queries = append(q.queries, v) - } - } else { - // Build new queryTracker and update both indexes - v = &requestTracker{ - response: ack.SignedResponseHeader(), - ack: ack, - } - - i.respIndex[ack.ResponseHash()] = v - i.ackIndex[ack.Hash()] = v - q.queries = append(q.queries, v) - } - - // TODO(leventeliu): - // This query has multiple signed acknowledgements. It may be caused by a network problem. - // We will keep the first ack counted anyway. But, should we report it to someone? - if q.firstAck == nil { - q.firstAck = v - } else if q.firstAck.ack.Hash() != ack.Hash() { - err = ErrMultipleAckOfSeqNo - } - - return -} - -func (i *multiIndex) getAck(header *hash.Hash) (ack *types.SignedAckHeader, ok bool) { - i.Lock() - defer i.Unlock() - - var t *requestTracker - if t, ok = i.ackIndex[*header]; ok { - ack = t.ack - } - - return -} - -// setSignedBlock sets the signed block of the acknowledged query. -func (i *multiIndex) setSignedBlock(blockHash *hash.Hash, ackHeaderHash *hash.Hash) { - i.Lock() - defer i.Unlock() - - if v, ok := i.ackIndex[*ackHeaderHash]; ok { - v.signedBlock = blockHash - } -} - -// resetSignedBlock resets the signed block of the acknowledged query. -func (i *multiIndex) resetSignedBlock(blockHash *hash.Hash, ackHeaderHash *hash.Hash) { - i.Lock() - defer i.Unlock() - - if v, ok := i.ackIndex[*ackHeaderHash]; ok { - // TODO(leventeliu): check if v.signedBlock equals blockHash. - v.signedBlock = nil - } -} - -// checkBeforeExpire checks the index and does some necessary work before it expires. -func (i *multiIndex) checkBeforeExpire() { - i.Lock() - defer i.Unlock() - - for _, q := range i.seqIndex { - if ack := q.firstAck; ack == nil { - // TODO(leventeliu): - // This query is not acknowledged and expires now. - } else if ack.signedBlock == nil || ack.signedBlock == placeHolder { - // TODO(leventeliu): - // This query was acknowledged normally but collectors didn't pack it in any block. - // There is definitely something wrong with them. - } - - for _, s := range q.queries { - if s != q.firstAck { - // TODO(leventeliu): so these guys lost the competition in this query. Should we - // do something about it? - } - } - } -} - -// checkAckFromBlock checks a acknowledged query from a block in this index. -func (i *multiIndex) checkAckFromBlock(b *hash.Hash, ack *hash.Hash) (isKnown bool, err error) { - i.Lock() - defer i.Unlock() - - // Check acknowledgement - q, isKnown := i.ackIndex[*ack] - - if !isKnown { - return - } - - if q.signedBlock != nil && !q.signedBlock.IsEqual(b) { - err = ErrQuerySignedByAnotherBlock - log.WithFields(log.Fields{ - "query": ack.String(), - "block": b.String(), - "signed_block": q.signedBlock.String(), - }).WithError(err).Error( - "Failed to check acknowledgement from block") - return - } - - qs := i.seqIndex[q.ack.SignedRequestHeader().GetQueryKey()] - - // Check it as a first acknowledgement - if i.respIndex[q.response.Hash()] != q || qs == nil || qs.firstAck == nil { - err = ErrCorruptedIndex - return - } - - // If `q` is not considered first acknowledgement of this query locally - if qs.firstAck != q { - if qs.firstAck.signedBlock != nil { - err = ErrQuerySignedByAnotherBlock - log.WithFields(log.Fields{ - "query": ack.String(), - "block": b.String(), - "signed_block": func() string { - if q.signedBlock != nil { - return q.signedBlock.String() - } - return "nil" - }(), - }).WithError(err).Error( - "Failed to check acknowledgement from block") - return - } - - // But if the acknowledgement is not signed yet, it is also acceptable to promote another - // acknowledgement - qs.firstAck = q - } - - return -} - -// markAndCollectUnsignedAcks marks and collects all the unsigned acknowledgements in the index. -func (i *multiIndex) markAndCollectUnsignedAcks(qs *[]*hash.Hash) { - i.Lock() - defer i.Unlock() - - for _, q := range i.seqIndex { - if ack := q.firstAck; ack != nil && ack.signedBlock == nil { - ack.signedBlock = placeHolder - ackHash := ack.ack.Hash() - *qs = append(*qs, &ackHash) - } - } -} - -// heightIndex defines a MultiIndex index using height as key. -type heightIndex struct { - sync.Mutex - index map[int32]*multiIndex -} - -// ensureHeight returns the *MultiIndex associated with the given height. It creates a new item if -// the key doesn't exist. -func (i *heightIndex) ensureHeight(h int32) (v *multiIndex) { - i.Lock() - defer i.Unlock() - v, ok := i.index[h] - - if !ok { - v = newMultiIndex() - i.index[h] = v - } - - return -} - -// ensureRange creates new *multiIndex items associated within the given height range [l, h) for -// those don't exist. -func (i *heightIndex) ensureRange(l, h int32) { - i.Lock() - defer i.Unlock() - - for x := l; x < h; x++ { - if _, ok := i.index[x]; !ok { - i.index[x] = newMultiIndex() - } - } -} - -func (i *heightIndex) get(k int32) (v *multiIndex, ok bool) { - i.Lock() - defer i.Unlock() - v, ok = i.index[k] - return -} - -func (i *heightIndex) del(k int32) { - i.Lock() - defer i.Unlock() - delete(i.index, k) -} - -// queryIndex defines a query index maintainer. -type queryIndex struct { - heightIndex *heightIndex - - sync.Mutex - barrier int32 -} - -func (i *queryIndex) getBarrier() int32 { - i.Lock() - defer i.Unlock() - return i.barrier -} - -func (i *queryIndex) setBarrier(b int32) { - i.Lock() - defer i.Unlock() - i.barrier = b -} - -// newQueryIndex returns a new queryIndex reference. -func newQueryIndex() *queryIndex { - return &queryIndex{ - heightIndex: &heightIndex{ - index: make(map[int32]*multiIndex), - }, - } -} - -// addResponse adds the responsed query to the index. -func (i *queryIndex) addResponse(h int32, resp *types.SignedResponseHeader) error { - // TODO(leventeliu): we should ensure that the Request uses coordinated timestamp, instead of - // any client local time. - return i.heightIndex.ensureHeight(h).addResponse(resp) -} - -// addAck adds the acknowledged query to the index. -func (i *queryIndex) addAck(h int32, ack *types.SignedAckHeader) error { - return i.heightIndex.ensureHeight(h).addAck(ack) -} - -// checkAckFromBlock checks a acknowledged query from a block at the given height. -func (i *queryIndex) checkAckFromBlock(h int32, b *hash.Hash, ack *hash.Hash) ( - isKnown bool, err error) { - l := i.getBarrier() - - if h < l { - err = errors.Wrapf(ErrQueryExpired, "check Ack, height %d, barrier %d", h, l) - return - } - - for x := l; x <= h; x++ { - if hi, ok := i.heightIndex.get(x); ok { - if isKnown, err = hi.checkAckFromBlock(b, ack); err != nil || isKnown { - return - } - } - } - - return -} - -// setSignedBlock updates the signed block in index for the acknowledged queries in the block. -func (i *queryIndex) setSignedBlock(h int32, block *types.Block) { - b := i.getBarrier() - - for _, v := range block.Acks { - for x := b; x <= h; x++ { - if hi, ok := i.heightIndex.get(x); ok { - ackHash := v.Hash() - hi.setSignedBlock(block.BlockHash(), &ackHash) - } - } - } -} - -func (i *queryIndex) resetSignedBlock(h int32, block *types.Block) { - b := i.getBarrier() - - for _, v := range block.Acks { - for x := b; x <= h; x++ { - if hi, ok := i.heightIndex.get(x); ok { - ackHash := v.Hash() - hi.resetSignedBlock(block.BlockHash(), &ackHash) - } - } - } -} - -// getAck gets the acknowledged queries from the index. -func (i *queryIndex) getAck(h int32, header *hash.Hash) (ack *types.SignedAckHeader, err error) { - b := i.getBarrier() - - if h < b { - err = errors.Wrapf(ErrQueryExpired, "get Ack, height %d, barrier %d", h, b) - return - } - - for x := b; x <= h; x++ { - if hi, ok := i.heightIndex.get(x); ok { - if ack, ok = hi.getAck(header); ok { - return - } - } - } - - err = ErrQueryNotCached - return -} - -// advanceBarrier moves barrier to given height. All buckets lower than this height will be set as -// expired, and all the queries which are not packed in these buckets will be reported. -func (i *queryIndex) advanceBarrier(height int32) { - b := i.getBarrier() - i.setBarrier(height) - - for x := b; x < height; x++ { - if hi, ok := i.heightIndex.get(x); ok { - hi.checkBeforeExpire() - i.heightIndex.del(x) - } - } -} - -// markAndCollectUnsignedAcks marks and collects all the unsigned acknowledgements which can be -// signed by a block at the given height. -func (i *queryIndex) markAndCollectUnsignedAcks(height int32) (qs []*hash.Hash) { - b := i.getBarrier() - qs = make([]*hash.Hash, 0, 1024) - - for x := b; x < height; x++ { - if hi, ok := i.heightIndex.get(x); ok { - hi.markAndCollectUnsignedAcks(&qs) - } - } - - return -} diff --git a/sqlchain/queryindex_test.go b/sqlchain/queryindex_test.go deleted file mode 100644 index a1889a3fa..000000000 --- a/sqlchain/queryindex_test.go +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Copyright 2018 The CovenantSQL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sqlchain - -import ( - "testing" - - "github.com/CovenantSQL/CovenantSQL/crypto/hash" - "github.com/pkg/errors" -) - -const ( - testBucketNumber = 10 - testQueryNumberPerHeight = 10 - testClientNumber = 10 - testWorkerNumber = 10 - testQueryWorkerNumber = 3 -) - -func (i *heightIndex) mustGet(k int32) *multiIndex { - i.Lock() - defer i.Unlock() - return i.index[k] -} - -func TestCorruptedIndex(t *testing.T) { - ack, err := createRandomNodesAndAck() - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - resp := ack.SignedResponseHeader() - - // Create index - qi := newQueryIndex() - - if err = qi.addResponse(0, resp); err != nil { - t.Fatalf("Error occurred: %v", err) - } - - if err = qi.addAck(0, ack); err != nil { - t.Fatalf("Error occurred: %v", err) - } - - // Test repeatedly add - if err = qi.addResponse(0, resp); err != nil { - t.Fatalf("Error occurred: %v", err) - } - - if err = qi.addAck(0, ack); err != nil { - t.Fatalf("Error occurred: %v", err) - } - - // Test corrupted index - qi.heightIndex.mustGet(0).respIndex[resp.Hash()].response = nil - - if err = qi.addResponse(0, resp); err != ErrCorruptedIndex { - t.Fatalf("Unexpected error: %v", err) - } - - if err = qi.addAck(0, ack); err != ErrCorruptedIndex { - t.Fatalf("Unexpected error: %v", err) - } - - qi.heightIndex.mustGet(0).respIndex[resp.Hash()] = nil - - if err = qi.addResponse(0, resp); err != ErrCorruptedIndex { - t.Fatalf("Unexpected error: %v", err) - } - - if err = qi.addAck(0, ack); err != ErrCorruptedIndex { - t.Fatalf("Unexpected error: %v", err) - } -} - -func TestSingleAck(t *testing.T) { - ack, err := createRandomNodesAndAck() - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - qi := newQueryIndex() - - if err = qi.addAck(0, ack); err != nil { - t.Fatalf("Error occurred: %v", err) - } - - // Check not signed ack - qi.heightIndex.mustGet(0).checkBeforeExpire() -} - -func TestEnsureRange(t *testing.T) { - qi := newQueryIndex() - qi.heightIndex.ensureRange(0, 10) - - for i := 0; i < 10; i++ { - if _, ok := qi.heightIndex.get(int32(i)); !ok { - t.Fatalf("Failed to ensure height %d", i) - } - } -} - -func TestCheckAckFromBlock(t *testing.T) { - var height int32 = 10 - qi := newQueryIndex() - qi.advanceBarrier(height) - b1, err := createRandomBlock(genesisHash, false) - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - ackHash := b1.Acks[0].Hash() - if _, err := qi.checkAckFromBlock( - 0, b1.BlockHash(), &ackHash, - ); errors.Cause(err) != ErrQueryExpired { - t.Fatalf("Unexpected error: %v", err) - } - - if isKnown, err := qi.checkAckFromBlock( - height, b1.BlockHash(), &ackHash, - ); err != nil { - t.Fatalf("Error occurred: %v", err) - } else if isKnown { - t.Fatal("Unexpected result: index should not know this query") - } - - // Create a group of query for test - cli, err := newRandomNode() - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - worker1, err := newRandomNode() - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - worker2, err := newRandomNode() - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - req, err := createRandomQueryRequest(cli) - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - resp1, err := createRandomQueryResponseWithRequest(req, worker1) - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - ack1, err := createRandomQueryAckWithResponse(resp1, cli) - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - resp2, err := createRandomQueryResponseWithRequest(req, worker2) - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - ack2, err := createRandomQueryAckWithResponse(resp2, cli) - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - // Test a query signed by another block - if err = qi.addAck(height, ack1); err != nil { - t.Fatalf("Error occurred: %v", err) - } - - if err = qi.addAck(height, ack2); err != ErrMultipleAckOfSeqNo { - t.Fatalf("Unexpected error: %v", err) - } - - b2, err := createRandomBlock(genesisHash, false) - - if err != nil { - t.Fatalf("Error occurred: %v", err) - } - - b1.Acks[0] = ack1 - b2.Acks[0] = ack2 - ack1Hash := ack1.Hash() - qi.setSignedBlock(height, b1) - - if _, err := qi.checkAckFromBlock( - height, b2.BlockHash(), &ack1Hash, - ); err != ErrQuerySignedByAnotherBlock { - t.Fatalf("Unexpected error: %v", err) - } - - // Test checking same ack signed by another block - ack2Hash := ack2.Hash() - b2.Acks[0] = ack2 - - if _, err = qi.checkAckFromBlock( - height, b2.BlockHash(), &ack2Hash, - ); err != ErrQuerySignedByAnotherBlock { - t.Fatalf("Unexpected error: %v", err) - } - - // Revert index state for the first block, and test checking again - qi.heightIndex.mustGet(height).seqIndex[req.GetQueryKey()].firstAck.signedBlock = nil - - if _, err = qi.checkAckFromBlock( - height, b2.BlockHash(), &ack2Hash, - ); err != nil { - t.Fatalf("Error occurred: %v", err) - } - - // Test corrupted index - qi.heightIndex.mustGet(height).seqIndex[req.GetQueryKey()] = nil - - if _, err = qi.checkAckFromBlock( - height, b2.BlockHash(), &ack2Hash, - ); err != ErrCorruptedIndex { - t.Fatalf("Unexpected error: %v", err) - } -} - -func TestGetAck(t *testing.T) { - qi := newQueryIndex() - qh := &hash.Hash{} - - if _, err := qi.getAck(-1, qh); errors.Cause(err) != ErrQueryExpired { - t.Fatalf("Unexpected error: %v", err) - } - - if _, err := qi.getAck(0, qh); errors.Cause(err) != ErrQueryNotCached { - t.Fatalf("Unexpected error: %v", err) - } -} diff --git a/sqlchain/rpc.go b/sqlchain/rpc.go index c30029053..5ed518267 100644 --- a/sqlchain/rpc.go +++ b/sqlchain/rpc.go @@ -67,17 +67,6 @@ type FetchBlockResp struct { Block *types.Block } -// FetchAckedQueryReq defines a request of the FetchAckedQuery RPC method. -type FetchAckedQueryReq struct { - Height int32 - SignedAckedHash *hash.Hash -} - -// FetchAckedQueryResp defines a request of the FetchAckedQuery RPC method. -type FetchAckedQueryResp struct { - Ack *types.SignedAckHeader -} - // SignBillingReq defines a request of the SignBilling RPC method. type SignBillingReq struct { pt.BillingRequest @@ -143,13 +132,6 @@ func (s *ChainRPCService) FetchBlock(req *FetchBlockReq, resp *FetchBlockResp) ( return } -// FetchAckedQuery is the RPC method to fetch a known block from the target server. -func (s *ChainRPCService) FetchAckedQuery(req *FetchAckedQueryReq, resp *FetchAckedQueryResp, -) (err error) { - resp.Ack, err = s.chain.FetchAckedQuery(req.Height, req.SignedAckedHash) - return -} - // SignBilling is the RPC method to get signature for a billing request from the target server. func (s *ChainRPCService) SignBilling(req *SignBillingReq, resp *SignBillingResp) (err error) { resp.HeaderHash = req.BillingRequest.RequestHash diff --git a/xenomint/pool.go b/xenomint/pool.go index de17f3abb..38a1f6511 100644 --- a/xenomint/pool.go +++ b/xenomint/pool.go @@ -18,6 +18,7 @@ package xenomint import ( "sync" + "sync/atomic" "github.com/CovenantSQL/CovenantSQL/crypto/hash" "github.com/CovenantSQL/CovenantSQL/types" @@ -51,6 +52,9 @@ type pool struct { // Succeeded queries and their index queries []*QueryTracker index map[uint64]int + // Atomic counters for stats + failedRequestCount int32 + trackerCount int32 } func newPool() *pool { @@ -65,11 +69,13 @@ func (p *pool) enqueue(sp uint64, q *QueryTracker) { var pos = len(p.queries) p.queries = append(p.queries, q) p.index[sp] = pos + atomic.StoreInt32(&p.trackerCount, int32(len(p.queries))) return } func (p *pool) setFailed(req *types.Request) { p.failed[req.Header.Hash()] = req + atomic.StoreInt32(&p.failedRequestCount, int32(len(p.failed))) } func (p *pool) failedList() (reqs []*types.Request) { @@ -82,6 +88,7 @@ func (p *pool) failedList() (reqs []*types.Request) { func (p *pool) removeFailed(req *types.Request) { delete(p.failed, req.Header.Hash()) + atomic.StoreInt32(&p.failedRequestCount, int32(len(p.failed))) } func (p *pool) match(sp uint64, req *types.Request) bool { @@ -130,4 +137,5 @@ func (p *pool) truncate(sp uint64) { } p.index = ni p.queries = p.queries[pos+1:] + atomic.StoreInt32(&p.trackerCount, int32(len(p.queries))) } diff --git a/xenomint/state.go b/xenomint/state.go index 48283c21d..8599cf012 100644 --- a/xenomint/state.go +++ b/xenomint/state.go @@ -615,3 +615,21 @@ func (s *State) Replay(req *types.Request, resp *types.Response) (err error) { } return } + +// Stat prints the statistic message of the State object. +func (s *State) Stat(id proto.DatabaseID) { + var ( + p = func() *pool { + s.RLock() + defer s.RUnlock() + return s.pool + }() + fc = atomic.LoadInt32(&p.failedRequestCount) + tc = atomic.LoadInt32(&p.trackerCount) + ) + log.WithFields(log.Fields{ + "database_id": id, + "pooled_fail_request_count": fc, + "pooled_query_tracker": tc, + }).Info("Xeno pool stats") +} diff --git a/xenomint/state_test.go b/xenomint/state_test.go index 4a05d4945..783142a15 100644 --- a/xenomint/state_test.go +++ b/xenomint/state_test.go @@ -36,6 +36,7 @@ import ( func TestState(t *testing.T) { Convey("Given a chain state object", t, func() { var ( + id1 = proto.DatabaseID("db-x1") fl1 = path.Join(testingDataDir, fmt.Sprint(t.Name(), "x1")) fl2 = path.Join(testingDataDir, fmt.Sprint(t.Name(), "x2")) st1, st2 *State @@ -156,6 +157,7 @@ func TestState(t *testing.T) { })) So(err, ShouldNotBeNil) So(resp, ShouldBeNil) + st1.Stat(id1) err = st1.Replay(buildRequest(types.WriteQuery, []types.Query{ buildQuery(`XXXXXX INTO t1 (k, v) VALUES (?, ?)`, values[0]...), }), &types.Response{ @@ -171,6 +173,7 @@ func TestState(t *testing.T) { })) So(err, ShouldNotBeNil) So(resp, ShouldBeNil) + st1.Stat(id1) err = st1.Replay(buildRequest(types.WriteQuery, []types.Query{ buildQuery(`INSERT INTO t2 (k, v) VALUES (?, ?)`, values[0]...), }), &types.Response{ @@ -181,21 +184,25 @@ func TestState(t *testing.T) { }, }) So(err, ShouldNotBeNil) + st1.Stat(id1) _, resp, err = st1.Query(buildRequest(types.ReadQuery, []types.Query{ buildQuery(`XXXXXX v FROM t1`), })) So(err, ShouldNotBeNil) So(resp, ShouldBeNil) + st1.Stat(id1) _, resp, err = st1.Query(buildRequest(types.ReadQuery, []types.Query{ buildQuery(`SELECT v FROM t2`), })) So(err, ShouldNotBeNil) So(resp, ShouldBeNil) + st1.Stat(id1) _, resp, err = st1.read(buildRequest(types.ReadQuery, []types.Query{ buildQuery(`SELECT v FROM t2`), })) So(err, ShouldNotBeNil) So(resp, ShouldBeNil) + st1.Stat(id1) }) Convey("The state should work properly with reading/writing queries", func() { _, resp, err = st1.Query(buildRequest(types.WriteQuery, []types.Query{ @@ -213,6 +220,7 @@ func TestState(t *testing.T) { DeclTypes: []string{"TEXT"}, Rows: []types.ResponseRow{{Values: values[0][1:]}}, }) + st1.Stat(id1) _, resp, err = st1.Query(buildRequest(types.WriteQuery, []types.Query{ buildQuery(`INSERT INTO t1 (k, v) VALUES (?, ?)`, values[1]...), @@ -236,6 +244,7 @@ INSERT INTO t1 (k, v) VALUES (?, ?)`, concat(values[2:4])...), {Values: values[3][1:]}, }, }) + st1.Stat(id1) _, resp, err = st1.Query(buildRequest(types.ReadQuery, []types.Query{ buildQuery(`SELECT * FROM t1`), @@ -251,6 +260,7 @@ INSERT INTO t1 (k, v) VALUES (?, ?)`, concat(values[2:4])...), {Values: values[3][:]}, }, }) + st1.Stat(id1) // Test show statements _, resp, err = st1.Query(buildRequest(types.ReadQuery, []types.Query{ @@ -273,6 +283,7 @@ INSERT INTO t1 (k, v) VALUES (?, ?)`, concat(values[2:4])...), })) So(err, ShouldBeNil) So(resp, ShouldNotBeNil) + st1.Stat(id1) // Also test a non-transaction read implementation _, resp, err = st1.read(buildRequest(types.ReadQuery, []types.Query{ @@ -289,6 +300,7 @@ INSERT INTO t1 (k, v) VALUES (?, ?)`, concat(values[2:4])...), {Values: values[3][:]}, }, }) + st1.Stat(id1) }) Convey("The state should skip read query while replaying", func() { err = st1.Replay(buildRequest(types.ReadQuery, []types.Query{