Skip to content

Commit 0e5f056

Browse files
authored
Merge pull request #240 from CovenantSQL/feature/beta_trace
Improve database query performance
2 parents c910822 + 19eb4fe commit 0e5f056

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+2629
-2130
lines changed

alltest.sh

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,15 @@ set -o errexit
44
set -o pipefail
55
set -o nounset
66

7-
test::package() {
8-
local package="${1:-notset}"
9-
10-
if [[ "${package}" == "notset" ]]; then
11-
&>2 echo "empty package name"
12-
exit 1
13-
fi
14-
15-
local coverage_file="${package//\//.}.cover.out"
16-
echo "[TEST] package=${package}, coverage=${coverage_file}"
17-
go test -tags "$UNITTESTTAGS" -race -failfast -parallel 16 -cpu 16 -coverpkg="github.com/CovenantSQL/CovenantSQL/..." -coverprofile "${coverage_file}" "${package}"
18-
}
19-
207
main() {
218
make clean
229
make -j6 bp miner observer
2310

24-
# test package by package
25-
for package in $(go list ./... | grep -v "/vendor/"); do
26-
test::package "${package}"
27-
done
11+
go test -tags "$UNITTESTTAGS" -race -failfast -parallel 16 -cpu 16 -coverprofile main.cover.out $(go list ./... | grep -v CovenantSQL/api)
12+
go test -tags "$UNITTESTTAGS" -race -failfast -parallel 16 -cpu 16 -coverpkg ./api/...,./rpc/jsonrpc -coverprofile api.cover.out ./api/...
2813

2914
set -x
30-
gocovmerge *.cover.out $(find cmd -name "*.cover.out") | grep -F -v '_gen.go' > coverage.txt && rm -f *.cover.out
15+
gocovmerge main.cover.out api.cover.out $(find cmd -name "*.cover.out") | grep -F -v '_gen.go' > coverage.txt && rm -f *.cover.out
3116
bash <(curl -s https://codecov.io/bash)
3217

3318
# some benchmarks

client/conn.go

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/CovenantSQL/CovenantSQL/rpc"
3232
"github.com/CovenantSQL/CovenantSQL/types"
3333
"github.com/CovenantSQL/CovenantSQL/utils/log"
34+
"github.com/CovenantSQL/CovenantSQL/utils/trace"
3435
"github.com/pkg/errors"
3536
)
3637

@@ -150,7 +151,7 @@ ackWorkerLoop:
150151
oneTime.Do(func() {
151152
pc = rpc.NewPersistentCaller(c.pCaller.TargetID)
152153
})
153-
if err = ack.Sign(c.parent.privKey, false); err != nil {
154+
if err = ack.Sign(c.parent.privKey); err != nil {
154155
log.WithField("target", pc.TargetID).WithError(err).Error("failed to sign ack")
155156
continue
156157
}
@@ -164,7 +165,7 @@ ackWorkerLoop:
164165
}
165166

166167
if pc != nil {
167-
pc.CloseStream()
168+
pc.Close()
168169
}
169170

170171
log.Debug("ack worker quiting")
@@ -173,7 +174,7 @@ ackWorkerLoop:
173174
func (c *pconn) close() error {
174175
c.stopAckWorkers()
175176
if c.pCaller != nil {
176-
c.pCaller.CloseStream()
177+
c.pCaller.Close()
177178
}
178179
return nil
179180
}
@@ -237,6 +238,8 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, e
237238

238239
// ExecContext implements the driver.ExecerContext.ExecContext method.
239240
func (c *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (result driver.Result, err error) {
241+
defer trace.StartRegion(ctx, "dbExec").End()
242+
240243
if atomic.LoadInt32(&c.closed) != 0 {
241244
err = driver.ErrBadConn
242245
return
@@ -246,7 +249,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
246249
sq := convertQuery(query, args)
247250

248251
var affectedRows, lastInsertID int64
249-
if affectedRows, lastInsertID, _, err = c.addQuery(types.WriteQuery, sq); err != nil {
252+
if affectedRows, lastInsertID, _, err = c.addQuery(ctx, types.WriteQuery, sq); err != nil {
250253
return
251254
}
252255

@@ -260,14 +263,16 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
260263

261264
// QueryContext implements the driver.QueryerContext.QueryContext method.
262265
func (c *conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (rows driver.Rows, err error) {
266+
defer trace.StartRegion(ctx, "dbQuery").End()
267+
263268
if atomic.LoadInt32(&c.closed) != 0 {
264269
err = driver.ErrBadConn
265270
return
266271
}
267272

268273
// TODO(xq262144): make use of the ctx argument
269274
sq := convertQuery(query, args)
270-
_, _, rows, err = c.addQuery(types.ReadQuery, sq)
275+
_, _, rows, err = c.addQuery(ctx, types.ReadQuery, sq)
271276

272277
return
273278
}
@@ -289,7 +294,7 @@ func (c *conn) Commit() (err error) {
289294

290295
if len(c.queries) > 0 {
291296
// send query
292-
if _, _, _, err = c.sendQuery(types.WriteQuery, c.queries); err != nil {
297+
if _, _, _, err = c.sendQuery(context.Background(), types.WriteQuery, c.queries); err != nil {
293298
return
294299
}
295300
}
@@ -319,7 +324,7 @@ func (c *conn) Rollback() error {
319324
return nil
320325
}
321326

322-
func (c *conn) addQuery(queryType types.QueryType, query *types.Query) (affectedRows int64, lastInsertID int64, rows driver.Rows, err error) {
327+
func (c *conn) addQuery(ctx context.Context, queryType types.QueryType, query *types.Query) (affectedRows int64, lastInsertID int64, rows driver.Rows, err error) {
323328
if c.inTransaction {
324329
// check query type, enqueue query
325330
if queryType == types.ReadQuery {
@@ -344,10 +349,10 @@ func (c *conn) addQuery(queryType types.QueryType, query *types.Query) (affected
344349
"args": query.Args,
345350
}).Debug("execute query")
346351

347-
return c.sendQuery(queryType, []types.Query{*query})
352+
return c.sendQuery(ctx, queryType, []types.Query{*query})
348353
}
349354

350-
func (c *conn) sendQuery(queryType types.QueryType, queries []types.Query) (affectedRows int64, lastInsertID int64, rows driver.Rows, err error) {
355+
func (c *conn) sendQuery(ctx context.Context, queryType types.QueryType, queries []types.Query) (affectedRows int64, lastInsertID int64, rows driver.Rows, err error) {
351356
var uc *pconn // peer connection used to execute the queries
352357

353358
uc = c.leader
@@ -399,11 +404,6 @@ func (c *conn) sendQuery(queryType types.QueryType, queries []types.Query) (affe
399404
if err = uc.pCaller.Call(route.DBSQuery.String(), req, &response); err != nil {
400405
return
401406
}
402-
403-
// verify response
404-
if err = response.Verify(); err != nil {
405-
return
406-
}
407407
rows = newRows(&response)
408408

409409
if queryType == types.WriteQuery {
@@ -412,15 +412,19 @@ func (c *conn) sendQuery(queryType types.QueryType, queries []types.Query) (affe
412412
}
413413

414414
// build ack
415-
uc.ackCh <- &types.Ack{
416-
Header: types.SignedAckHeader{
417-
AckHeader: types.AckHeader{
418-
Response: response.Header,
419-
NodeID: c.localNodeID,
420-
Timestamp: getLocalTime(),
415+
func() {
416+
defer trace.StartRegion(ctx, "ackEnqueue").End()
417+
uc.ackCh <- &types.Ack{
418+
Header: types.SignedAckHeader{
419+
AckHeader: types.AckHeader{
420+
Response: response.Header.ResponseHeader,
421+
ResponseHash: response.Header.Hash(),
422+
NodeID: c.localNodeID,
423+
Timestamp: getLocalTime(),
424+
},
421425
},
422-
},
423-
}
426+
}
427+
}()
424428

425429
return
426430
}

cmd/cql-minerd/dbms.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,17 @@ import (
3838

3939
var rootHash = hash.Hash{}
4040

41-
func startDBMS(server *rpc.Server) (dbms *worker.DBMS, err error) {
41+
func startDBMS(server *rpc.Server, onCreateDB func()) (dbms *worker.DBMS, err error) {
4242
if conf.GConf.Miner == nil {
4343
err = errors.New("invalid database config")
4444
return
4545
}
4646

4747
cfg := &worker.DBMSConfig{
48-
RootDir: conf.GConf.Miner.RootDir,
49-
Server: server,
50-
MaxReqTimeGap: conf.GConf.Miner.MaxReqTimeGap,
48+
RootDir: conf.GConf.Miner.RootDir,
49+
Server: server,
50+
MaxReqTimeGap: conf.GConf.Miner.MaxReqTimeGap,
51+
OnCreateDatabase: onCreateDB,
5152
}
5253

5354
if dbms, err = worker.NewDBMS(cfg); err != nil {

cmd/cql-minerd/integration_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/CovenantSQL/CovenantSQL/types"
4747
"github.com/CovenantSQL/CovenantSQL/utils"
4848
"github.com/CovenantSQL/CovenantSQL/utils/log"
49+
"github.com/CovenantSQL/CovenantSQL/utils/trace"
4950
sqlite3 "github.com/CovenantSQL/go-sqlite3-encrypt"
5051
. "github.com/smartystreets/goconvey/convey"
5152
yaml "gopkg.in/yaml.v2"
@@ -647,20 +648,25 @@ func benchDB(b *testing.B, db *sql.DB, createDB bool) {
647648
ii := atomic.AddInt64(&i, 1)
648649
index := ROWSTART + ii
649650
//start := time.Now()
650-
_, err = db.Exec("INSERT INTO "+TABLENAME+" ( k, v1 ) VALUES"+
651+
652+
ctx, task := trace.NewTask(context.Background(), "BenchInsert")
653+
654+
_, err = db.ExecContext(ctx, "INSERT INTO "+TABLENAME+" ( k, v1 ) VALUES"+
651655
"(?, ?)", index, ii,
652656
)
653657
//log.Warnf("insert index = %d %v", index, time.Since(start))
654658
for err != nil && err.Error() == sqlite3.ErrBusy.Error() {
655659
// retry forever
656660
log.Warnf("index = %d retried", index)
657-
_, err = db.Exec("INSERT INTO "+TABLENAME+" ( k, v1 ) VALUES"+
661+
_, err = db.ExecContext(ctx, "INSERT INTO "+TABLENAME+" ( k, v1 ) VALUES"+
658662
"(?, ?)", index, ii,
659663
)
660664
}
661665
if err != nil {
662666
b.Fatal(err)
663667
}
668+
669+
task.End()
664670
}
665671
})
666672
})
@@ -690,16 +696,19 @@ func benchDB(b *testing.B, db *sql.DB, createDB bool) {
690696
} else { //has data before ROWSTART
691697
index = rand.Int63n(count - 1)
692698
}
699+
700+
ctx, task := trace.NewTask(context.Background(), "BenchSelect")
693701
//log.Debugf("index = %d", index)
694702
//start := time.Now()
695-
row := db.QueryRow("SELECT v1 FROM "+TABLENAME+" WHERE k = ? LIMIT 1", index)
703+
row := db.QueryRowContext(ctx, "SELECT v1 FROM "+TABLENAME+" WHERE k = ? LIMIT 1", index)
696704
//log.Warnf("select index = %d %v", index, time.Since(start))
697705
var result []byte
698706
err = row.Scan(&result)
699707
if err != nil || (len(result) == 0) {
700708
log.Errorf("index = %d", index)
701709
b.Fatal(err)
702710
}
711+
task.End()
703712
}
704713
})
705714
})

cmd/cql-minerd/main.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,17 @@ import (
2626
"os"
2727
"os/signal"
2828
"runtime"
29-
30-
"github.com/CovenantSQL/CovenantSQL/metric"
31-
32-
//"runtime/trace"
3329
"syscall"
3430
"time"
3531

3632
"github.com/CovenantSQL/CovenantSQL/conf"
3733
"github.com/CovenantSQL/CovenantSQL/crypto/asymmetric"
3834
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
35+
"github.com/CovenantSQL/CovenantSQL/metric"
3936
"github.com/CovenantSQL/CovenantSQL/rpc"
4037
"github.com/CovenantSQL/CovenantSQL/utils"
4138
"github.com/CovenantSQL/CovenantSQL/utils/log"
39+
"github.com/CovenantSQL/CovenantSQL/utils/trace"
4240
"github.com/CovenantSQL/CovenantSQL/worker"
4341
graphite "github.com/cyberdelia/go-metrics-graphite"
4442
metrics "github.com/rcrowley/go-metrics"
@@ -199,11 +197,11 @@ func main() {
199197
}
200198
}
201199

200+
// start prometheus collector
201+
reg := metric.StartMetricCollector()
202+
202203
// start period provide service transaction generator
203204
go func() {
204-
// start prometheus collector
205-
reg := metric.StartMetricCollector()
206-
207205
tick := time.NewTicker(conf.GConf.Miner.ProvideServiceInterval)
208206
defer tick.Stop()
209207

@@ -220,7 +218,9 @@ func main() {
220218

221219
// start dbms
222220
var dbms *worker.DBMS
223-
if dbms, err = startDBMS(server); err != nil {
221+
if dbms, err = startDBMS(server, func() {
222+
sendProvideService(reg)
223+
}); err != nil {
224224
log.WithError(err).Fatal("start dbms failed")
225225
}
226226

@@ -257,22 +257,22 @@ func main() {
257257
go graphite.Graphite(metrics.DefaultRegistry, 5*time.Second, minerName, addr)
258258
}
259259

260-
//if traceFile != "" {
261-
// f, err := os.Create(traceFile)
262-
// if err != nil {
263-
// log.WithError(err).Fatal("failed to create trace output file")
264-
// }
265-
// defer func() {
266-
// if err := f.Close(); err != nil {
267-
// log.WithError(err).Fatal("failed to close trace file")
268-
// }
269-
// }()
270-
271-
// if err := trace.Start(f); err != nil {
272-
// log.WithError(err).Fatal("failed to start trace")
273-
// }
274-
// defer trace.Stop()
275-
//}
260+
if traceFile != "" {
261+
f, err := os.Create(traceFile)
262+
if err != nil {
263+
log.WithError(err).Fatal("failed to create trace output file")
264+
}
265+
defer func() {
266+
if err := f.Close(); err != nil {
267+
log.WithError(err).Fatal("failed to close trace file")
268+
}
269+
}()
270+
271+
if err := trace.Start(f); err != nil {
272+
log.WithError(err).Fatal("failed to start trace")
273+
}
274+
defer trace.Stop()
275+
}
276276

277277
<-signalCh
278278
utils.StopProfile()

0 commit comments

Comments
 (0)