Skip to content

Commit 1193c84

Browse files
authored
fix(live): fix how xidmap stores value along with upsertPredicate (#9309)
1 parent 60ddeb7 commit 1193c84

File tree

2 files changed

+68
-17
lines changed

2 files changed

+68
-17
lines changed

dgraph/cmd/live/batch.go

+58-17
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"time"
1818

1919
"github.com/dgryski/go-farm"
20+
"github.com/dustin/go-humanize"
2021
"github.com/dustin/go-humanize/english"
2122
"google.golang.org/grpc"
2223
"google.golang.org/grpc/codes"
@@ -70,6 +71,9 @@ type loader struct {
7071
// To get time elapsed
7172
start time.Time
7273

74+
inflight int32
75+
conc int32
76+
7377
conflicts map[uint64]struct{}
7478
uidsLock sync.RWMutex
7579

@@ -156,6 +160,7 @@ func (l *loader) infinitelyRetry(req *request) {
156160
}
157161

158162
func (l *loader) mutate(req *request) error {
163+
atomic.AddInt32(&l.inflight, 1)
159164
txn := l.dc.NewTxn()
160165
req.CommitNow = true
161166
request := &api.Request{
@@ -168,6 +173,7 @@ func (l *loader) mutate(req *request) error {
168173

169174
func (l *loader) request(req *request) {
170175
atomic.AddUint64(&l.reqNum, 1)
176+
defer atomic.AddInt32(&l.inflight, -1)
171177
err := l.mutate(req)
172178
if err == nil {
173179
atomic.AddUint64(&l.nquads, uint64(len(req.Set)))
@@ -390,23 +396,52 @@ func (l *loader) deregister(req *request) {
390396
// caller functions.
391397
func (l *loader) makeRequests() {
392398
defer l.requestsWg.Done()
399+
atomic.AddInt32(&l.conc, 1)
400+
defer atomic.AddInt32(&l.conc, -1)
393401

394402
buffer := make([]*request, 0, l.opts.bufferSize)
395-
drain := func(maxSize int) {
396-
for len(buffer) > maxSize {
397-
i := 0
398-
for _, req := range buffer {
399-
// If there is no conflict in req, we will use it
400-
// and then it would shift all the other reqs in buffer
401-
if !l.addConflictKeys(req) {
402-
buffer[i] = req
403-
i++
404-
continue
405-
}
406-
// Req will no longer be part of a buffer
403+
var loops int
404+
drain := func() {
405+
i := 0
406+
for _, req := range buffer {
407+
loops++
408+
// If there is no conflict in req, we will use it
409+
// and then it would shift all the other reqs in buffer
410+
if !l.addConflictKeys(req) {
411+
buffer[i] = req
412+
i++
413+
continue
414+
}
415+
// Req will no longer be part of a buffer
416+
l.request(req)
417+
}
418+
buffer = buffer[:i]
419+
}
420+
421+
t := time.NewTicker(5 * time.Second)
422+
defer t.Stop()
423+
424+
loop:
425+
for {
426+
select {
427+
case req, ok := <-l.reqs:
428+
if !ok {
429+
break loop
430+
}
431+
req.conflicts = l.conflictKeysForReq(req)
432+
if l.addConflictKeys(req) {
407433
l.request(req)
434+
} else {
435+
buffer = append(buffer, req)
436+
}
437+
438+
case <-t.C:
439+
for {
440+
drain()
441+
if len(buffer) < l.opts.bufferSize {
442+
break
443+
}
408444
}
409-
buffer = buffer[:i]
410445
}
411446
}
412447

@@ -417,10 +452,13 @@ func (l *loader) makeRequests() {
417452
} else {
418453
buffer = append(buffer, req)
419454
}
420-
drain(l.opts.bufferSize - 1)
455+
456+
drain()
457+
time.Sleep(100 * time.Millisecond)
421458
}
459+
fmt.Printf("Looped %d times over buffered requests.\n", loops)
422460

423-
drain(0)
461+
drain()
424462
}
425463

426464
func (l *loader) printCounters() {
@@ -434,8 +472,11 @@ func (l *loader) printCounters() {
434472
rate := float64(counter.Nquads-last.Nquads) / period.Seconds()
435473
elapsed := time.Since(start).Round(time.Second)
436474
timestamp := time.Now().Format("15:04:05Z0700")
437-
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %d N-Quads/s [last 5s]: %5.0f Aborts: %d\n",
438-
timestamp, x.FixedDuration(elapsed), counter.TxnsDone, counter.Nquads, rate, counter.Aborts)
475+
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s"+
476+
" Inflight: %2d/%2d Aborts: %d\n",
477+
timestamp, x.FixedDuration(elapsed), counter.TxnsDone,
478+
humanize.Comma(int64(counter.Nquads)), humanize.Comma(int64(rate)),
479+
atomic.LoadInt32(&l.inflight), atomic.LoadInt32(&l.conc), counter.Aborts)
439480
last = counter
440481
}
441482
}

xidmap/xidmap.go

+10
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,16 @@ func (m *XidMap) SetUid(xid string, uid uint64) {
215215
sh.Lock()
216216
defer sh.Unlock()
217217
sh.tree.Set(farm.Fingerprint64([]byte(xid)), uid)
218+
if m.writer != nil {
219+
var uidBuf [8]byte
220+
binary.BigEndian.PutUint64(uidBuf[:], uid)
221+
m.kvBuf = append(m.kvBuf, kv{key: []byte(xid), value: uidBuf[:]})
222+
223+
if len(m.kvBuf) == 64 {
224+
m.kvChan <- m.kvBuf
225+
m.kvBuf = make([]kv, 0, 64)
226+
}
227+
}
218228
}
219229

220230
func (m *XidMap) dbWriter() {

0 commit comments

Comments
 (0)