Skip to content

Fix memory exhausting issue #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ func (c *conn) sendQuery(queryType types.QueryType, queries []types.Query) (affe
return
}

c.pCaller = rpc.NewPersistentCaller(peers.Leader)
var response types.Response
if err = c.pCaller.Call(route.DBSQuery.String(), req, &response); err != nil {
return
Expand Down
15 changes: 15 additions & 0 deletions cmd/cql-minerd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -443,6 +444,13 @@ func benchDB(b *testing.B, db *sql.DB, createDB bool) {
})
})

routineCount := runtime.NumGoroutine()
if routineCount > 100 {
b.Errorf("go routine count: %d", routineCount)
} else {
log.Infof("go routine count: %d", routineCount)
}

rowCount := db.QueryRow("SELECT COUNT(1) FROM " + TABLENAME)
var count int64
err = rowCount.Scan(&count)
Expand Down Expand Up @@ -473,6 +481,13 @@ func benchDB(b *testing.B, db *sql.DB, createDB bool) {
})
})

routineCount = runtime.NumGoroutine()
if routineCount > 100 {
b.Errorf("go routine count: %d", routineCount)
} else {
log.Infof("go routine count: %d", routineCount)
}

//row := db.QueryRow("SELECT nonIndexedColumn FROM test LIMIT 1")

//var result int
Expand Down
6 changes: 1 addition & 5 deletions route/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,10 @@ const (
SQLCAdviseNewBlock
// SQLCAdviseBinLog is usd by sqlchain to advise binlog between adjacent node
SQLCAdviseBinLog
// SQLCAdviseResponsedQuery is used by sqlchain to advice response query between adjacent node
// SQLCAdviseAckedQuery is used by sqlchain to advice response query between adjacent node
SQLCAdviseAckedQuery
// SQLCFetchBlock is used by sqlchain to fetch block from adjacent nodes
SQLCFetchBlock
// SQLCFetchAckedQuery is used by sqlchain to fetch response ack from adjacent nodes
SQLCFetchAckedQuery
// SQLCSignBilling is used by sqlchain to response billing signature for periodic billing request
SQLCSignBilling
// SQLCLaunchBilling is used by blockproducer to trigger the billing process in sqlchain
Expand Down Expand Up @@ -179,8 +177,6 @@ func (s RemoteFunc) String() string {
return "SQLC.AdviseAckedQuery"
case SQLCFetchBlock:
return "SQLC.FetchBlock"
case SQLCFetchAckedQuery:
return "SQLC.FetchAckedQuery"
case SQLCSignBilling:
return "SQLC.SignBilling"
case SQLCLaunchBilling:
Expand Down
19 changes: 18 additions & 1 deletion sqlchain/ackindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ package sqlchain

import (
"sync"
"sync/atomic"

"github.com/CovenantSQL/CovenantSQL/types"
"github.com/CovenantSQL/CovenantSQL/utils/log"
"github.com/pkg/errors"
)

var (
// Global atomic counters for stats
multiIndexCount int32
responseCount int32
ackTrackerCount int32
)

type ackTracker struct {
resp *types.SignedResponseHeader
ack *types.SignedAckHeader
Expand All @@ -48,6 +56,7 @@ func (i *multiAckIndex) addResponse(resp *types.SignedResponseHeader) (err error
return
}
i.ri[key] = resp
atomic.AddInt32(&responseCount, 1)
return
}

Expand All @@ -70,16 +79,19 @@ func (i *multiAckIndex) register(ack *types.SignedAckHeader) (err error) {
resp: resp,
ack: ack,
}
atomic.AddInt32(&responseCount, -1)
atomic.AddInt32(&ackTrackerCount, 1)
return
}

func (i *multiAckIndex) remove(ack *types.SignedAckHeader) (err error) {
var key = ack.SignedRequestHeader().GetQueryKey()
log.Debugf("Removing key %s <-- ack %s", &key, ack.Hash())
log.Debugf("Removing key %s -x- ack %s", &key, ack.Hash())
i.Lock()
defer i.Unlock()
if _, ok := i.ri[key]; ok {
delete(i.ri, key)
atomic.AddInt32(&responseCount, -1)
return
}
if oack, ok := i.qi[key]; ok {
Expand All @@ -89,6 +101,7 @@ func (i *multiAckIndex) remove(ack *types.SignedAckHeader) (err error) {
return
}
delete(i.qi, key)
atomic.AddInt32(&ackTrackerCount, -1)
return
}
err = errors.Wrapf(ErrQueryNotFound, "remove key %s -x- ack %s", &key, ack.Hash())
Expand Down Expand Up @@ -162,6 +175,7 @@ func (i *ackIndex) load(h int32) (mi *multiAckIndex, err error) {
qi: make(map[types.QueryKey]*ackTracker),
}
i.hi[h] = mi
atomic.AddInt32(&multiIndexCount, 1)
}
return
}
Expand All @@ -180,7 +194,10 @@ func (i *ackIndex) advance(h int32) {
// Record expired and not acknowledged queries
for _, v := range dl {
v.expire()
atomic.AddInt32(&responseCount, int32(-len(v.ri)))
atomic.AddInt32(&ackTrackerCount, int32(-len(v.qi)))
}
atomic.AddInt32(&multiIndexCount, int32(-len(dl)))
}

func (i *ackIndex) addResponse(h int32, resp *types.SignedResponseHeader) (err error) {
Expand Down
9 changes: 2 additions & 7 deletions sqlchain/blockindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,14 @@ func (n *blockNode) indexKey() (key []byte) {
}

type blockIndex struct {
cfg *Config

mu sync.RWMutex
index map[hash.Hash]*blockNode
}

func newBlockIndex(cfg *Config) (index *blockIndex) {
index = &blockIndex{
cfg: cfg,
func newBlockIndex() (index *blockIndex) {
return &blockIndex{
index: make(map[hash.Hash]*blockNode),
}

return index
}

func (i *blockIndex) addBlock(newBlock *blockNode) {
Expand Down
6 changes: 2 additions & 4 deletions sqlchain/blockindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ func TestInitBlockNode(t *testing.T) {
}

func TestAncestor(t *testing.T) {
cfg := &Config{}
index := newBlockIndex(cfg)
index := newBlockIndex()
parent := (*blockNode)(nil)

for h, b := range testBlocks {
Expand Down Expand Up @@ -142,8 +141,7 @@ func TestAncestor(t *testing.T) {
}

func TestIndex(t *testing.T) {
cfg := &Config{}
index := newBlockIndex(cfg)
index := newBlockIndex()
parent := (*blockNode)(nil)

for h, b := range testBlocks {
Expand Down
Loading