From 3cc823b646cb1fcaa6176568118153876313aff5 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Fri, 15 Sep 2023 12:15:14 +0530 Subject: [PATCH 1/2] Use get batch api badger --- go.mod | 5 +-- go.sum | 8 ++-- posting/list_test.go | 32 ++++++++++++++++ posting/lists.go | 91 +++++++++++++++++++++++++++++++++++--------- worker/task.go | 18 +++++++-- 5 files changed, 125 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index d1a22a5d6ab..2013f595122 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/IBM/sarama v1.45.0 github.com/Masterminds/semver/v3 v3.3.1 github.com/blevesearch/bleve/v2 v2.4.4 - github.com/dgraph-io/badger/v4 v4.5.1 + github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e github.com/dgraph-io/dgo/v240 v240.1.0 github.com/dgraph-io/gqlgen v0.13.2 github.com/dgraph-io/gqlparser/v2 v2.2.2 @@ -98,7 +98,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/google/flatbuffers v25.1.24+incompatible // indirect + github.com/google/flatbuffers v25.2.10+incompatible // indirect github.com/google/pprof v0.0.0-20250128161936-077ca0a936bf // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -156,7 +156,6 @@ require ( go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect - go.opentelemetry.io/otel/sdk v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/time v0.9.0 // indirect diff --git a/go.sum b/go.sum index d6f17102d01..e17aed5fcbf 100644 --- a/go.sum +++ b/go.sum @@ -127,8 +127,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/badger/v4 v4.5.1 h1:7DCIXrQjo1LKmM96YD+hLVJ2EEsyyoWxJfpdd56HLps= -github.com/dgraph-io/badger/v4 v4.5.1/go.mod h1:qn3Be0j3TfV4kPbVoK0arXCD1/nr1ftth6sbL5jxdoA= +github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e h1:sZmnvDqloFjehWjr6f/G5O8ANbhenwSYdkGxkTR2Bww= +github.com/dgraph-io/badger/v4 v4.5.2-0.20250218121059-0faedd88140e/go.mod h1:aSwx/bXKT3/WRl9rn2BrTU+tfRQlFPKlOsqRTdcpHB8= github.com/dgraph-io/dgo/v240 v240.1.0 h1:xd8z9kEXDWOAblaLJ2HLg2tXD6ngMQwq3ehLUS7GKNg= github.com/dgraph-io/dgo/v240 v240.1.0/go.mod h1:r8WASETKfodzKqThSAhhTNIzcEMychArKKlZXQufWuA= github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM= @@ -267,8 +267,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/codesearch v1.2.0 h1:VlyAH+AntnIbGGArOUs6sEBdPVwYvf1e8Uw3/TC77cA= github.com/google/codesearch v1.2.0/go.mod h1:9wQjQDVAP7Mvt96tw1KqVeXncdBLOWUYdxRiHlsG6Xc= -github.com/google/flatbuffers v25.1.24+incompatible h1:4wPqL3K7GzBd1CwyhSd3usxLKOaJN/AC6puCca6Jm7o= -github.com/google/flatbuffers v25.1.24+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q= +github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= diff --git a/posting/list_test.go b/posting/list_test.go index d8d999f228a..62973d82e44 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -493,7 +493,10 @@ func TestAddMutation_mrjn1(t *testing.T) { func TestReadSingleValue(t *testing.T) { defer setMaxListSize(maxListSize) maxListSize = math.MaxInt32 +<<<<<<< HEAD require.Equal(t, nil, pstore.DropAll()) +======= +>>>>>>> 6fbd525d2 (Use get batch api badger) // We call pl.Iterate and then stop iterating in the first loop when we are reading // single values. This test confirms that the two functions, getFirst from this file @@ -502,6 +505,7 @@ func TestReadSingleValue(t *testing.T) { key := x.DataKey(x.GalaxyAttr("value"), 1240) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) +<<<<<<< HEAD N := uint64(10000) for i := uint64(2); i <= N; i += 2 { edge := &pb.DirectedEdge{ @@ -514,6 +518,19 @@ func TestReadSingleValue(t *testing.T) { kData := ol.getMutation(i + 1) writer := NewTxnWriter(pstore) if err := writer.SetAt(key, kData, BitDeltaPosting, i+1); err != nil { +======= + N := int(10000) + for i := 2; i <= N; i += 2 { + edge := &pb.DirectedEdge{ + Value: []byte("ho hey there" + strconv.Itoa(i)), + } + txn := Txn{StartTs: uint64(i)} + addMutationHelper(t, ol, edge, Set, &txn) + require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1)) + kData := ol.getMutation(uint64(i)) + writer := NewTxnWriter(pstore) + if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil { +>>>>>>> 6fbd525d2 (Use get batch api badger) require.NoError(t, err) } writer.Flush() @@ -523,12 +540,16 @@ func TestReadSingleValue(t *testing.T) { kvs, err := ol.Rollup(nil, txn.StartTs-3) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) +<<<<<<< HEAD // Delete item from global cache before reading, as we are not updating the cache in the test memoryLayer.del(key) +======= +>>>>>>> 6fbd525d2 (Use get batch api badger) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) } +<<<<<<< HEAD j := uint64(3) if j < ol.minTs { j = ol.minTs @@ -538,6 +559,17 @@ func TestReadSingleValue(t *testing.T) { k, err := tx.cache.GetSinglePosting(key) require.NoError(t, err) checkValue(t, ol, string(k.Postings[0].Value), j) +======= + j := 2 + if j < int(ol.minTs) { + j = int(ol.minTs) + } + for ; j < i+6; j++ { + tx := NewTxn(uint64(j)) + k, err := tx.cache.GetSinglePosting(key) + require.NoError(t, err) + checkValue(t, ol, string(k.Postings[0].Value), uint64(j)) +>>>>>>> 6fbd525d2 (Use get batch api badger) } } } diff --git a/posting/lists.go b/posting/lists.go index 977b0413063..4370544a2a4 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -323,32 +323,32 @@ func (lc *LocalCache) readPostingListAt(key []byte) (*pb.PostingList, error) { return pl, err } -// GetSinglePosting retrieves the cached version of the first item in the list associated with the -// given key. This is used for retrieving the value of a scalar predicats. -func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { - // This would return an error if there is some data in the local cache, but we couldn't read it. - getListFromLocalCache := func() (*pb.PostingList, error) { - lc.RLock() - - pl := &pb.PostingList{} - if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { - err := proto.Unmarshal(delta, pl) - lc.RUnlock() - return pl, err - } +func (lc *LocalCache) GetSinglePostingFromLocalCache(key []byte) (*pb.PostingList, error) { + lc.RLock() - l := lc.plists[string(key)] + pl := &pb.PostingList{} + if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { + err := proto.Unmarshal(delta, pl) lc.RUnlock() + return pl, err + } - if l != nil { - return l.StaticValue(lc.startTs) - } + l := lc.plists[string(key)] + lc.RUnlock() - return nil, nil + if l != nil { + return l.StaticValue(lc.startTs) } + return nil, nil +} + +// GetSinglePosting retrieves the cached version of the first item in the list associated with the +// given key. This is used for retrieving the value of a scalar predicats. +func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { + // This would return an error if there is some data in the local cache, but we couldn't read it. getPostings := func() (*pb.PostingList, error) { - pl, err := getListFromLocalCache() + pl, err := lc.GetSinglePostingFromLocalCache(key) // If both pl and err are empty, that means that there was no data in local cache, hence we should // read the data from badger. if pl != nil || err != nil { @@ -381,6 +381,59 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { return pl, nil } +func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, error) { + results := make([]*pb.PostingList, len(keys)) + remaining_keys := make([][]byte, 0) + for i, key := range keys { + if pl, err := lc.GetSinglePostingFromLocalCache(key); pl != nil && err != nil { + results[i] = pl + } else { + remaining_keys = append(remaining_keys, key) + } + } + + txn := pstore.NewTransactionAt(lc.startTs, false) + items, err := txn.GetBatch(remaining_keys) + if err != nil { + fmt.Println(err, keys) + return nil, err + } + idx := 0 + + for i := 0; i < len(results); i++ { + if results[i] != nil { + continue + } + pl := &pb.PostingList{} + err = items[idx].Value(func(val []byte) error { + if err := proto.Unmarshal(val, pl); err != nil { + return err + } + return nil + }) + idx += 1 + results[i] = pl + } + + for i := 0; i < len(results); i++ { + pl := results[i] + idx := 0 + for _, postings := range pl.Postings { + if hasDeleteAll(postings) { + return nil, nil + } + if postings.Op != Del { + pl.Postings[idx] = postings + idx++ + } + } + pl.Postings = pl.Postings[:idx] + results[i] = pl + } + + return results, err +} + // Get retrieves the cached version of the list associated with the given key. func (lc *LocalCache) Get(key []byte) (*List, error) { return lc.getInternal(key, true) diff --git a/worker/task.go b/worker/task.go index 0b5c067d4ff..62f7a097ab8 100644 --- a/worker/task.go +++ b/worker/task.go @@ -423,6 +423,7 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er out := &pb.Result{} outputs[start/width] = out + cache := make([]*pb.PostingList, 0) for i := start; i < end; i++ { select { case <-ctx.Done(): @@ -437,9 +438,20 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored if !getMultiplePosting { - pl, err := qs.cache.GetSinglePosting(key) - if err != nil { - return err + if len(cache) == 0 { + keys := make([][]byte, 10) + keys[0] = key + for j := i + 1; j < i+10 && j < end; j++ { + keys[j-i] = x.DataKey(q.Attr, q.UidList.Uids[j]) + } + cache, err = qs.cache.GetBatchSinglePosting(keys) + if err != nil { + return err + } + } + pl := cache[0] + if len(cache) > 1 { + cache = cache[1:] } if pl == nil || len(pl.Postings) == 0 { out.UidMatrix = append(out.UidMatrix, &pb.List{}) From fbb5863500a3d51f5004cb4e15a14bf182d79433 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 18 Feb 2025 18:51:45 +0530 Subject: [PATCH 2/2] fxied test --- posting/list_test.go | 32 -------------------------------- 1 file changed, 32 deletions(-) diff --git a/posting/list_test.go b/posting/list_test.go index 62973d82e44..d8d999f228a 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -493,10 +493,7 @@ func TestAddMutation_mrjn1(t *testing.T) { func TestReadSingleValue(t *testing.T) { defer setMaxListSize(maxListSize) maxListSize = math.MaxInt32 -<<<<<<< HEAD require.Equal(t, nil, pstore.DropAll()) -======= ->>>>>>> 6fbd525d2 (Use get batch api badger) // We call pl.Iterate and then stop iterating in the first loop when we are reading // single values. This test confirms that the two functions, getFirst from this file @@ -505,7 +502,6 @@ func TestReadSingleValue(t *testing.T) { key := x.DataKey(x.GalaxyAttr("value"), 1240) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) -<<<<<<< HEAD N := uint64(10000) for i := uint64(2); i <= N; i += 2 { edge := &pb.DirectedEdge{ @@ -518,19 +514,6 @@ func TestReadSingleValue(t *testing.T) { kData := ol.getMutation(i + 1) writer := NewTxnWriter(pstore) if err := writer.SetAt(key, kData, BitDeltaPosting, i+1); err != nil { -======= - N := int(10000) - for i := 2; i <= N; i += 2 { - edge := &pb.DirectedEdge{ - Value: []byte("ho hey there" + strconv.Itoa(i)), - } - txn := Txn{StartTs: uint64(i)} - addMutationHelper(t, ol, edge, Set, &txn) - require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1)) - kData := ol.getMutation(uint64(i)) - writer := NewTxnWriter(pstore) - if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil { ->>>>>>> 6fbd525d2 (Use get batch api badger) require.NoError(t, err) } writer.Flush() @@ -540,16 +523,12 @@ func TestReadSingleValue(t *testing.T) { kvs, err := ol.Rollup(nil, txn.StartTs-3) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) -<<<<<<< HEAD // Delete item from global cache before reading, as we are not updating the cache in the test memoryLayer.del(key) -======= ->>>>>>> 6fbd525d2 (Use get batch api badger) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) } -<<<<<<< HEAD j := uint64(3) if j < ol.minTs { j = ol.minTs @@ -559,17 +538,6 @@ func TestReadSingleValue(t *testing.T) { k, err := tx.cache.GetSinglePosting(key) require.NoError(t, err) checkValue(t, ol, string(k.Postings[0].Value), j) -======= - j := 2 - if j < int(ol.minTs) { - j = int(ol.minTs) - } - for ; j < i+6; j++ { - tx := NewTxn(uint64(j)) - k, err := tx.cache.GetSinglePosting(key) - require.NoError(t, err) - checkValue(t, ol, string(k.Postings[0].Value), uint64(j)) ->>>>>>> 6fbd525d2 (Use get batch api badger) } } }