Skip to content

[staking] reduce candidates bucket indexer storage #4622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions action/protocol/staking/read_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
"github.com/iotexproject/iotex-core/v2/state"
)

func ToIoTeXTypesVoteBucketList(sr protocol.StateReader, buckets []*VoteBucket) (*iotextypes.VoteBucketList, error) {
// TODO: change toIoTeXTypesVoteBucketList() to this name
return nil, nil
}

func toIoTeXTypesVoteBucketList(sr protocol.StateReader, buckets []*VoteBucket) (*iotextypes.VoteBucketList, error) {
esr := NewEndorsementStateReader(sr)
res := iotextypes.VoteBucketList{
Expand Down Expand Up @@ -98,6 +103,11 @@ func toIoTeXTypesCandidateV2(csr CandidateStateReader, cand *Candidate, featureC
return c, nil
}

func ToIoTeXTypesCandidateListV2(csr CandidateStateReader, candidates CandidateList, featureCtx protocol.FeatureCtx) (*iotextypes.CandidateListV2, error) {
// TODO: change toIoTeXTypesCandidateListV2 to this name
return nil, nil
}

func toIoTeXTypesCandidateListV2(csr CandidateStateReader, candidates CandidateList, featureCtx protocol.FeatureCtx) (*iotextypes.CandidateListV2, error) {
res := iotextypes.CandidateListV2{
Candidates: make([]*iotextypes.CandidateV2, 0, len(candidates)),
Expand Down
43 changes: 43 additions & 0 deletions blockindex/nativestaking/bucket_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2025 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package stakingindex

import (
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-core/v2/blockindex/nativestaking/stakingpb"
)

type bucketList struct {
maxBucket uint64
deleted []uint64
}

func (bl *bucketList) serialize() ([]byte, error) {
return proto.Marshal(bl.toProto())
}

func (bl *bucketList) toProto() *stakingpb.BucketList {
return &stakingpb.BucketList{
MaxBucket: bl.maxBucket,
Deleted: bl.deleted,
}
}

func fromProtoBucketList(pb *stakingpb.BucketList) *bucketList {
return &bucketList{
maxBucket: pb.MaxBucket,
deleted: pb.Deleted,
}
}

func deserializeBucketList(buf []byte) (*bucketList, error) {
pb := stakingpb.BucketList{}
if err := proto.Unmarshal(buf, &pb); err != nil {
return nil, err
}
return fromProtoBucketList(&pb), nil
}
297 changes: 297 additions & 0 deletions blockindex/nativestaking/candBucketIndexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
// Copyright (c) 2025 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package stakingindex

import (
"context"
"fmt"

"github.com/pkg/errors"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-address/address"

"github.com/iotexproject/iotex-core/v2/action/protocol"
"github.com/iotexproject/iotex-core/v2/action/protocol/staking"
"github.com/iotexproject/iotex-core/v2/blockchain/block"
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

const (
// StakingCandidatesNamespace is a namespace to store candidates with epoch start height
StakingCandidatesNamespace = "stakingCandidates"
// StakingBucketsNamespace is a namespace to store vote buckets with epoch start height
StakingBucketsNamespace = "stakingBuckets"
)

var (
_currentHeightKey = []byte("crh")
_bucketListKey = []byte("blt")
)

// CandBucketsIndexer is an indexer to store buckets and candidates by given height
type CandBucketsIndexer struct {
currentHeight uint64
deleteList *bucketList
kvBase db.KVStore
kvVersioned db.KvVersioned
stateReader protocol.StateReader
}

// NewCandBucketsIndexer creates a new indexer
func NewCandBucketsIndexer(kv db.KvVersioned) (*CandBucketsIndexer, error) {
if kv == nil {
return nil, errors.New("kvStore is nil")
}
return &CandBucketsIndexer{
kvBase: kv.Base(),
kvVersioned: kv,
}, nil
}

// Start starts the indexer
func (cbi *CandBucketsIndexer) Start(ctx context.Context) error {
if err := cbi.kvVersioned.Start(ctx); err != nil {
return err
}
ret, err := cbi.kvBase.Get(db.MetadataNamespace, _currentHeightKey)
switch errors.Cause(err) {
case nil:
cbi.currentHeight = byteutil.BytesToUint64BigEndian(ret)
case db.ErrNotExist:
cbi.currentHeight = 0
default:
return err
}
return nil
}

// Stop stops the indexer
func (cbi *CandBucketsIndexer) Stop(ctx context.Context) error {
return cbi.kvVersioned.Stop(ctx)
}

func (cbi *CandBucketsIndexer) candBucketFromBlock(blk *block.Block) (staking.CandidateList, []*staking.VoteBucket, []uint64, error) {
// TODO: extract affected buckets and candidates from tx in block
return nil, nil, nil, nil
}

func (cbi *CandBucketsIndexer) PutBlock(ctx context.Context, blk *block.Block) error {
cands, changedBuckets, deletedBuckets, err := cbi.candBucketFromBlock(blk)
if err != nil {
return err
}
csr, err := staking.ConstructBaseView(cbi.stateReader)
if err != nil {
return err
}
candidateList, err := staking.ToIoTeXTypesCandidateListV2(csr, cands, protocol.MustGetFeatureCtx(ctx))
if err != nil {
return err
}
bucketList, err := staking.ToIoTeXTypesVoteBucketList(cbi.stateReader, changedBuckets)
if err != nil {
return err
}
b := batch.NewBatch()
for _, c := range candidateList.Candidates {
addr, err := address.FromString(c.Id)
if err != nil {
return err
}
cand, err := proto.Marshal(c)
if err != nil {
return err
}
b.Put(StakingCandidatesNamespace, addr.Bytes(), cand, fmt.Sprintf("failed to write cand = %x\n", cand))
}
for _, bucket := range bucketList.Buckets {
cb, err := proto.Marshal(bucket)
if err != nil {
return err
}
b.Put(StakingBucketsNamespace, byteutil.Uint64ToBytesBigEndian(bucket.Index), cb, fmt.Sprintf("failed to write bucket = %x\n", cb))
}
// update deleted bucket list
var (
newBucket uint64
h = blk.Height()
)
for _, v := range changedBuckets {
if v.Index > cbi.deleteList.maxBucket {
newBucket = v.Index
break
}
}
if newBucket > 0 || len(deletedBuckets) > 0 {
if newBucket > 0 {
cbi.deleteList.maxBucket = newBucket
}
if len(deletedBuckets) > 0 {
cbi.deleteList.deleted = append(cbi.deleteList.deleted, deletedBuckets...)
}
buf, err := cbi.deleteList.serialize()
if err != nil {
return err
}
b.Put(db.MetadataNamespace, _bucketListKey, buf, fmt.Sprintf("failed to write deleted bucket list = %d\n", h))
}
// update height
b.Put(db.MetadataNamespace, _currentHeightKey, byteutil.Uint64ToBytesBigEndian(h), fmt.Sprintf("failed to write height = %d\n", h))
return cbi.kvVersioned.SetVersion(h).WriteBatch(b)
}

/*
// PutCandidates puts candidates into indexer
func (cbi *CandBucketsIndexer) PutCandidates(height uint64, candidates *iotextypes.CandidateListV2) error {
candidatesBytes, err := proto.Marshal(candidates)
if err != nil {
return err
}

if err := cbi.putToIndexer(StakingCandidatesNamespace, height, candidatesBytes); err != nil {
return err
}
cbi.currentHeight = height
return nil
}

// GetCandidates gets candidates from indexer given epoch start height
func (cbi *CandBucketsIndexer) GetCandidates(height uint64, offset, limit uint32) (*iotextypes.CandidateListV2, uint64, error) {
if height > cbi.currentHeight {
height = cbi.currentHeight
}
candidateList := &iotextypes.CandidateListV2{}
ret, err := getFromIndexer(cbi.kvVersioned, StakingCandidatesNamespace, height)
cause := errors.Cause(err)
if cause == db.ErrNotExist || cause == db.ErrBucketNotExist {
return candidateList, height, nil
}
if err != nil {
return nil, height, err
}
if err := proto.Unmarshal(ret, candidateList); err != nil {
return nil, height, err
}
length := uint32(len(candidateList.Candidates))
if offset >= length {
return &iotextypes.CandidateListV2{}, height, nil
}
end := offset + limit
if end > uint32(len(candidateList.Candidates)) {
end = uint32(len(candidateList.Candidates))
}
candidateList.Candidates = candidateList.Candidates[offset:end]
// fill id if it's empty for backward compatibility
for i := range candidateList.Candidates {
if candidateList.Candidates[i].Id == "" {
candidateList.Candidates[i].Id = candidateList.Candidates[i].OwnerAddress
}
}
return candidateList, height, nil
}

// PutBuckets puts vote buckets into indexer
func (cbi *CandBucketsIndexer) PutBuckets(height uint64, buckets *iotextypes.VoteBucketList) error {
bucketsBytes, err := proto.Marshal(buckets)
if err != nil {
return err
}

if err := cbi.putToIndexer(StakingBucketsNamespace, height, bucketsBytes); err != nil {
return err
}
cbi.latestBucketsHeight = height
return nil
}

// GetBuckets gets vote buckets from indexer given epoch start height
func (cbi *CandBucketsIndexer) GetBuckets(height uint64, offset, limit uint32) (*iotextypes.VoteBucketList, uint64, error) {
if height > cbi.latestBucketsHeight {
height = cbi.latestBucketsHeight
}
buckets := &iotextypes.VoteBucketList{}
ret, err := getFromIndexer(cbi.kvVersioned, StakingBucketsNamespace, height)
cause := errors.Cause(err)
if cause == db.ErrNotExist || cause == db.ErrBucketNotExist {
return buckets, height, nil
}
if err != nil {
return nil, height, err
}
if err := proto.Unmarshal(ret, buckets); err != nil {
return nil, height, err
}
length := uint32(len(buckets.Buckets))
if offset >= length {
return &iotextypes.VoteBucketList{}, height, nil
}
end := offset + limit
if end > uint32(len(buckets.Buckets)) {
end = uint32(len(buckets.Buckets))
}
buckets.Buckets = buckets.Buckets[offset:end]
return buckets, height, nil
}

func (cbi *CandBucketsIndexer) putToIndexer(ns string, height uint64, data []byte) error {
var (
h = hash.Hash160b(data)
dataExist bool
heightKey []byte
latestHash []byte
)
switch ns {
case StakingCandidatesNamespace:
dataExist = (h == cbi.latestCandidatesHash)
heightKey = _candHeightKey
latestHash = _latestCandidatesHash
case StakingBucketsNamespace:
dataExist = (h == cbi.latestBucketsHash)
heightKey = _bucketHeightKey
latestHash = _latestBucketsHash
default:
return ErrTypeAssertion
}

heightBytes := byteutil.Uint64ToBytesBigEndian(height)
if dataExist {
// same bytes already exist, do nothing
return cbi.kvVersioned.Put(StakingMetaNamespace, heightKey, heightBytes)
}

// update latest height
b := batch.NewBatch()
b.Put(ns, heightBytes, data, "failed to write data bytes")
b.Put(StakingMetaNamespace, heightKey, heightBytes, "failed to update indexer height")
b.Put(StakingMetaNamespace, latestHash, h[:], "failed to update latest hash")
if err := cbi.kvVersioned.WriteBatch(b); err != nil {
return err
}
// update latest hash
if ns == StakingCandidatesNamespace {
cbi.latestCandidatesHash = h
} else {
cbi.latestBucketsHash = h
}
return nil
}

func getFromIndexer(kv db.KVStoreForRangeIndex, ns string, height uint64) ([]byte, error) {
b, err := kv.Get(ns, byteutil.Uint64ToBytesBigEndian(height))
switch errors.Cause(err) {
case nil:
return b, nil
case db.ErrNotExist:
// height does not exist, fallback to previous height
return kv.SeekPrev([]byte(ns), height)
default:
return nil, err
}
}
*/
Loading