Skip to content

Kayak performance improvement refactor #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Nov 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
670bd0a
Kayak performance refactor
Nov 1, 2018
79ce086
Add logs
Nov 5, 2018
da3308e
Increase commit window and support commit window in follower
Nov 5, 2018
a185ed9
Update runtime
Nov 6, 2018
5011864
CallerHook disabled in Warning Info Debug
Nov 6, 2018
6de61a4
Remove log in tracker
Nov 6, 2018
3f20b3f
No -traceFile in performance test
Nov 6, 2018
cadfa74
Comment single test
Nov 6, 2018
dcc8104
Add more verbose logging
Nov 6, 2018
3a2bf5b
Remove bin/*.test from image
Nov 6, 2018
1fbbe08
Use covenantsql instead of covenantsql.io for docker hub org name
Nov 6, 2018
0aa1739
Add covenantsql explorer to docker-compose
ggicci Nov 6, 2018
f45f374
Update grep command to support binary log grepping
Nov 6, 2018
557b06c
Fix makefile for docker build
Nov 6, 2018
3a7ff27
Add docker push version
Nov 6, 2018
8b7465f
Fix compilation failure bug
Nov 6, 2018
4436400
Fix docker-compose version
Nov 6, 2018
39cc93e
NodeID.UnmarshalBinary process 64 bytes NodeID for backward compatible
Nov 6, 2018
3194c18
Update hash-upgrade binary logic
Nov 6, 2018
a0f79b3
Update hotfix binary to pretty print convert result
Nov 6, 2018
0f9c3d3
Update hash-upgrade binary to pretty print json instance config
Nov 6, 2018
9ddf6de
Fix typo in log import
Nov 5, 2018
bdc401c
Fix bad import in gdpaverage.go
Nov 6, 2018
04b4cce
Add test service pem for docker-compose
Nov 6, 2018
e659592
Fix empty time encode/decode and hash verification bug
Nov 6, 2018
1debdf4
Adapter use test/service/node_adapter conf in docker-compose
Nov 6, 2018
eaeb233
If CryptoConn.Conn is nil, Close just return
Nov 7, 2018
32093c6
For benchtest
Nov 7, 2018
3934eee
Clean up chain.db and observer.db when start
Nov 7, 2018
d989f11
Remove unused delete statement
Nov 7, 2018
2544402
Goimports & golint
Nov 7, 2018
ef4ce2f
Remove useless code
Nov 8, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ FROM golang:1.11-stretch as builder
WORKDIR /go/src/github.com/CovenantSQL/CovenantSQL
COPY . .
RUN CGO_ENABLED=1 GOOS=linux GOLDFLAGS="-linkmode external -extldflags -static" ./build.sh
RUN rm -f bin/*.test

# Stage: runner
FROM alpine:3.7
Expand Down
41 changes: 3 additions & 38 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
default: build

IMAGE := covenantsql.io/covenantsql
IMAGE := covenantsql/covenantsql
GIT_COMMIT ?= $(shell git rev-parse --short HEAD)
GIT_DIRTY ?= $(shell test -n "`git status --porcelain`" && echo "+CHANGES" || true)
GIT_DESCRIBE ?= $(shell git describe --tags --always)
Expand Down Expand Up @@ -38,4 +38,8 @@ start:
logs:
docker-compose logs -f --tail=10

.PHONY: status build save start logs
push:
docker push $(IMAGE):$(VERSION)
docker push $(IMAGE):latest

.PHONY: status build save start logs push
2 changes: 2 additions & 0 deletions bin/docker-entry.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ miner)
exec /app/cql-minerd -config "${COVENANT_CONF}"
;;
blockproducer)
rm -f /app/node_*/chain.db
exec /app/cqld -config "${COVENANT_CONF}"
;;
observer)
rm -f /app/node_observer/observer.db
exec /app/cql-observer -config "${COVENANT_CONF}" "${@}"
;;
adapter)
Expand Down
12 changes: 6 additions & 6 deletions blockproducer/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (c *Chain) produceBlock(now time.Time) error {
peers := c.rt.getPeers()
wg := &sync.WaitGroup{}
for _, s := range peers.Servers {
if !s.ID.IsEqual(&c.rt.nodeID) {
if !s.IsEqual(&c.rt.nodeID) {
wg.Add(1)
go func(id proto.NodeID) {
defer wg.Done()
Expand All @@ -399,7 +399,7 @@ func (c *Chain) produceBlock(now time.Time) error {
"node": id,
}).Debug("success advising block")
}
}(s.ID)
}(s)
}
}

Expand Down Expand Up @@ -723,12 +723,12 @@ func (c *Chain) syncHead() {
succ := false

for i, s := range peers.Servers {
if !s.ID.IsEqual(&c.rt.nodeID) {
err = c.cl.CallNode(s.ID, route.MCCFetchBlock.String(), req, resp)
if !s.IsEqual(&c.rt.nodeID) {
err = c.cl.CallNode(s, route.MCCFetchBlock.String(), req, resp)
if err != nil || resp.Block == nil {
log.WithFields(log.Fields{
"peer": c.rt.getPeerInfoString(),
"remote": fmt.Sprintf("[%d/%d] %s", i, len(peers.Servers), s.ID),
"remote": fmt.Sprintf("[%d/%d] %s", i, len(peers.Servers), s),
"curr_turn": c.rt.getNextTurn(),
"head_height": c.rt.getHead().getHeight(),
"head_block": c.rt.getHead().getHeader().String(),
Expand All @@ -738,7 +738,7 @@ func (c *Chain) syncHead() {
c.blocksFromRPC <- resp.Block
log.WithFields(log.Fields{
"peer": c.rt.getPeerInfoString(),
"remote": fmt.Sprintf("[%d/%d] %s", i, len(peers.Servers), s.ID),
"remote": fmt.Sprintf("[%d/%d] %s", i, len(peers.Servers), s),
"curr_turn": c.rt.getNextTurn(),
"head_height": c.rt.getHead().getHeight(),
"head_block": c.rt.getHead().getHeader().String(),
Expand Down
18 changes: 11 additions & 7 deletions blockproducer/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
pi "github.com/CovenantSQL/CovenantSQL/blockproducer/interfaces"
pt "github.com/CovenantSQL/CovenantSQL/blockproducer/types"
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
"github.com/CovenantSQL/CovenantSQL/kayak"
"github.com/CovenantSQL/CovenantSQL/pow/cpuminer"
"github.com/CovenantSQL/CovenantSQL/proto"
"github.com/CovenantSQL/CovenantSQL/route"
Expand Down Expand Up @@ -69,7 +68,7 @@ func TestChain(t *testing.T) {
So(err, ShouldBeNil)
_, peers, err := createTestPeersWithPrivKeys(priv, testPeersNumber)

cfg := NewConfig(genesis, fl.Name(), rpcServer, peers, peers.Servers[0].ID, testPeriod, testTick)
cfg := NewConfig(genesis, fl.Name(), rpcServer, peers, peers.Servers[0], testPeriod, testTick)
chain, err := NewChain(cfg)
So(err, ShouldBeNil)
ao, ok := chain.ms.readonly.accounts[testAddress1]
Expand Down Expand Up @@ -199,7 +198,7 @@ func TestMultiNode(t *testing.T) {
}

var nis []cpuminer.NonceInfo
var peers *kayak.Peers
var peers *proto.Peers
peerInited := false
for i := range chains {
// create tmp file
Expand All @@ -219,13 +218,13 @@ func TestMultiNode(t *testing.T) {
So(err, ShouldBeNil)

for i, p := range peers.Servers {
t.Logf("Peer #%d: %s", i, p.ID)
t.Logf("Peer #%d: %s", i, p)
}

peerInited = true
}

cfg := NewConfig(genesis, fl.Name(), server, peers, peers.Servers[i].ID, testPeriod, testTick)
cfg := NewConfig(genesis, fl.Name(), server, peers, peers.Servers[i], testPeriod, testTick)

// init chain
chains[i], err = NewChain(cfg)
Expand All @@ -235,8 +234,13 @@ func TestMultiNode(t *testing.T) {
pub, err := kms.GetLocalPublicKey()
So(err, ShouldBeNil)
node := proto.Node{
ID: peers.Servers[i].ID,
Role: peers.Servers[i].Role,
ID: peers.Servers[i],
Role: func(peers *proto.Peers, i int) proto.ServerRole {
if peers.Leader.IsEqual(&peers.Servers[i]) {
return proto.Leader
}
return proto.Follower
}(peers, i),
Addr: server.Listener.Addr().String(),
PublicKey: pub,
Nonce: nis[i].Nonce,
Expand Down
5 changes: 2 additions & 3 deletions blockproducer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/CovenantSQL/CovenantSQL/blockproducer/types"
"github.com/CovenantSQL/CovenantSQL/kayak"
"github.com/CovenantSQL/CovenantSQL/proto"
"github.com/CovenantSQL/CovenantSQL/rpc"
)
Expand All @@ -37,7 +36,7 @@ type Config struct {

Server *rpc.Server

Peers *kayak.Peers
Peers *proto.Peers
NodeID proto.NodeID

Period time.Duration
Expand All @@ -46,7 +45,7 @@ type Config struct {

// NewConfig creates new config.
func NewConfig(genesis *types.Block, dataFile string,
server *rpc.Server, peers *kayak.Peers,
server *rpc.Server, peers *proto.Peers,
nodeID proto.NodeID, period time.Duration, tick time.Duration) *Config {
config := Config{
Genesis: genesis,
Expand Down
65 changes: 11 additions & 54 deletions blockproducer/db_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/CovenantSQL/CovenantSQL/crypto/asymmetric"
"github.com/CovenantSQL/CovenantSQL/crypto/hash"
"github.com/CovenantSQL/CovenantSQL/crypto/kms"
"github.com/CovenantSQL/CovenantSQL/kayak"
"github.com/CovenantSQL/CovenantSQL/metric"
"github.com/CovenantSQL/CovenantSQL/pow/cpuminer"
"github.com/CovenantSQL/CovenantSQL/proto"
Expand Down Expand Up @@ -92,7 +91,7 @@ func (s *DBService) CreateDatabase(req *CreateDatabaseRequest, resp *CreateDatab
log.WithField("db", dbID).Debug("generated database id")

// allocate nodes
var peers *kayak.Peers
var peers *proto.Peers
if peers, err = s.allocateNodes(0, dbID, req.Header.ResourceMeta); err != nil {
return
}
Expand Down Expand Up @@ -139,7 +138,7 @@ func (s *DBService) CreateDatabase(req *CreateDatabaseRequest, resp *CreateDatab
return
}

if err = s.batchSendSvcReq(initSvcReq, rollbackReq, s.peersToNodes(peers)); err != nil {
if err = s.batchSendSvcReq(initSvcReq, rollbackReq, peers.Servers); err != nil {
return
}

Expand Down Expand Up @@ -208,7 +207,7 @@ func (s *DBService) DropDatabase(req *DropDatabaseRequest, resp *DropDatabaseRes
return
}

if err = s.batchSendSvcReq(dropDBSvcReq, nil, s.peersToNodes(instanceMeta.Peers)); err != nil {
if err = s.batchSendSvcReq(dropDBSvcReq, nil, instanceMeta.Peers.Servers); err != nil {
return
}

Expand Down Expand Up @@ -324,7 +323,7 @@ func (s *DBService) generateDatabaseID(reqNodeID *proto.RawNodeID) (dbID proto.D
}
}

func (s *DBService) allocateNodes(lastTerm uint64, dbID proto.DatabaseID, resourceMeta wt.ResourceMeta) (peers *kayak.Peers, err error) {
func (s *DBService) allocateNodes(lastTerm uint64, dbID proto.DatabaseID, resourceMeta wt.ResourceMeta) (peers *proto.Peers, err error) {
curRange := int(resourceMeta.Node)
excludeNodes := make(map[proto.NodeID]bool)
var allocated []allocatedNode
Expand Down Expand Up @@ -444,7 +443,7 @@ func (s *DBService) allocateNodes(lastTerm uint64, dbID proto.DatabaseID, resour
}

// build peers
return s.buildPeers(lastTerm+1, nodes, nodeAllocated)
return s.buildPeers(lastTerm+1, nodeAllocated)
}

curRange += int(resourceMeta.Node)
Expand Down Expand Up @@ -479,54 +478,26 @@ func (s *DBService) getMetric(metric metric.MetricMap, keys []string) (value uin
return
}

func (s *DBService) buildPeers(term uint64, nodes []proto.Node, allocated []proto.NodeID) (peers *kayak.Peers, err error) {
func (s *DBService) buildPeers(term uint64, allocated []proto.NodeID) (peers *proto.Peers, err error) {
log.WithFields(log.Fields{
"term": term,
"nodes": allocated,
}).Debug("build peers for term/nodes")

// get local private key
var pubKey *asymmetric.PublicKey
if pubKey, err = kms.GetLocalPublicKey(); err != nil {
return
}

var privKey *asymmetric.PrivateKey
if privKey, err = kms.GetLocalPrivateKey(); err != nil {
return
}

// get allocated node info
allocatedMap := make(map[proto.NodeID]bool)

for _, nodeID := range allocated {
allocatedMap[nodeID] = true
}

allocatedNodes := make([]proto.Node, 0, len(allocated))

for _, node := range nodes {
if allocatedMap[node.ID] {
allocatedNodes = append(allocatedNodes, node)
}
}

peers = &kayak.Peers{
Term: term,
PubKey: pubKey,
Servers: make([]*kayak.Server, len(allocated)),
}

for idx, node := range allocatedNodes {
peers.Servers[idx] = &kayak.Server{
Role: proto.Follower,
ID: node.ID,
PubKey: node.PublicKey,
}
peers = &proto.Peers{
PeersHeader: proto.PeersHeader{
Term: term,
Servers: allocated,
},
}

// choose the first node as leader, allocateNodes sort the allocated node list by memory size
peers.Servers[0].Role = proto.Leader
peers.Leader = peers.Servers[0]

// sign the peers structure
Expand Down Expand Up @@ -591,17 +562,3 @@ func (s *DBService) batchSendSingleSvcReq(req *wt.UpdateService, nodes []proto.N

return
}

func (s *DBService) peersToNodes(peers *kayak.Peers) (nodes []proto.NodeID) {
if peers == nil {
return
}

nodes = make([]proto.NodeID, 0, len(peers.Servers))

for _, s := range peers.Servers {
nodes = append(nodes, s.ID)
}

return
}
Loading