Skip to content

draft PR #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
*.log
bin/*
.vscode
external/curve-go-rpc/*
external/website/*
docker/pigeon
docker/website
3 changes: 0 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
[submodule "external/curve-go-rpc"]
path = external/curve-go-rpc
url = git@github.com:SeanHai/curve-go-rpc.git
[submodule "external/website"]
path = external/website
url = git@github.com:opencurve/curve-dashboard.git
6 changes: 3 additions & 3 deletions api/curvebs/agent/agent.go
Original file line number Diff line number Diff line change
@@ -23,8 +23,8 @@
package agent

import (
bshttp "github.com/opencurve/curve-manager/internal/http/curvebs"
metrics "github.com/opencurve/curve-manager/internal/metrics/core"
bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs"
"github.com/opencurve/curve-manager/internal/snapshotclone"
"github.com/opencurve/curve-manager/internal/storage"
"github.com/opencurve/pigeon"
@@ -75,7 +75,7 @@ func InitClients(logger *pigeon.Logger) error {
}

// init mds rpc client
bsrpc.Init(clusterAddrs.Addrs)
bshttp.Init(clusterAddrs.Addrs)

// init metric client
metrics.Init(clusterAddrs.Addrs)
@@ -86,7 +86,7 @@ func InitClients(logger *pigeon.Logger) error {
if currentClusterId <= 0 && clusterAddrs.ClusterId > 0 {
currentClusterId = clusterAddrs.ClusterId
return initAlerts(alertExpirationDays, logger)
}
}
if currentClusterId > 0 {
stopAlertTasks()
currentClusterId = clusterAddrs.ClusterId
6 changes: 3 additions & 3 deletions api/curvebs/agent/cluster.go
Original file line number Diff line number Diff line change
@@ -28,8 +28,8 @@ import (
comm "github.com/opencurve/curve-manager/api/common"
"github.com/opencurve/curve-manager/internal/common"
"github.com/opencurve/curve-manager/internal/errno"
bshttp "github.com/opencurve/curve-manager/internal/http/curvebs"
"github.com/opencurve/curve-manager/internal/metrics/bsmetric"
bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs"
"github.com/opencurve/pigeon"
)

@@ -47,7 +47,7 @@ type ClusterStatus struct {
func GetClusterSpace(l *pigeon.Logger, rId string) (interface{}, errno.Errno) {
result := Space{}
// get logical pools form mds
pools, err := bsrpc.GMdsClient.ListLogicalPool()
pools, err := bshttp.GMdsClient.ListLogicalPool()
if err != nil {
l.Error("GetClusterSpace bsrpc.ListLogicalPool failed",
pigeon.Field("error", err),
@@ -111,7 +111,7 @@ func GetClusterPerformance(r *pigeon.Request, start, end, interval uint64) (inte
func GetClusterStatus(l *pigeon.Logger, rId string) interface{} {
clusterStatus := ClusterStatus{}
// 1. get pool numbers in cluster
pools, err := bsrpc.GMdsClient.ListLogicalPool()
pools, err := bshttp.GMdsClient.ListLogicalPool()
if err != nil {
clusterStatus.Healthy = false
clusterStatus.PoolNum = 0
12 changes: 6 additions & 6 deletions api/curvebs/agent/copyset.go
Original file line number Diff line number Diff line change
@@ -29,8 +29,8 @@ import (

set "github.com/deckarep/golang-set/v2"
"github.com/opencurve/curve-manager/internal/common"
bshttp "github.com/opencurve/curve-manager/internal/http/curvebs"
"github.com/opencurve/curve-manager/internal/metrics/bsmetric"
bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs"
)

const (
@@ -121,7 +121,7 @@ func (cs *Copyset) updatePeerOfflineCopysets(csAddr string) error {
if err != nil {
return err
}
copysets, err := bsrpc.GMdsClient.GetCopySetsInChunkServer(item[0], uint32(port))
copysets, err := bshttp.GMdsClient.GetCopySetsInChunkServer(item[0], uint32(port))
if err != nil {
return fmt.Errorf("GetCopySetsInChunkServer failed, %s", err)
}
@@ -134,7 +134,7 @@ func (cs *Copyset) updatePeerOfflineCopysets(csAddr string) error {
if len(copysets) > 0 {
logicalPoolId = copysets[0].LogicalPoolId
}
memberInfo, err := bsrpc.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds)
memberInfo, err := bshttp.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds)
if err != nil {
return fmt.Errorf("GetChunkServerListInCopySets failed, %s", err)
}
@@ -184,7 +184,7 @@ func (cs *Copyset) ifChunkServerInCopysets(csAddr string, groupIds *set.Set[stri
logicalPoolId = getPoolIdFormGroupId(ngid)
copysetIds = append(copysetIds, getCopysetIdFromGroupId(ngid))
}
memberInfo, err := bsrpc.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds)
memberInfo, err := bshttp.GMdsClient.GetChunkServerListInCopySets(logicalPoolId, copysetIds)
if err != nil {
return nil, fmt.Errorf("GetChunkServerListInCopySets failed, %s", err)
}
@@ -365,7 +365,7 @@ func (cs *Copyset) checkCopysetsOnChunkServer(csAddr string, status []map[string

func (cs *Copyset) checkCopysetsWithMds() (bool, error) {
// get copysets in cluster
csInfos, err := bsrpc.GMdsClient.GetCopySetsInCluster()
csInfos, err := bshttp.GMdsClient.GetCopySetsInCluster()
if err != nil {
return false, fmt.Errorf("GetCopySetsInCluster failed, %s", err)
}
@@ -410,7 +410,7 @@ func (cs *Copyset) checkCopysetsWithMds() (bool, error) {
func (cs *Copyset) checkCopysetsInCluster() (bool, error) {
healthy := true
// 2.1 get chunkservers in cluster
chunkservers, err := bsrpc.GMdsClient.GetChunkServerInCluster()
chunkservers, err := bshttp.GMdsClient.GetChunkServerInCluster()
if err != nil {
return false, fmt.Errorf("GetChunkServerInCluster failed, %s", err)
}
7 changes: 3 additions & 4 deletions api/curvebs/agent/service_status.go
Original file line number Diff line number Diff line change
@@ -24,13 +24,12 @@ package agent

import (
"fmt"
bshttp "github.com/opencurve/curve-manager/internal/http/curvebs"

"github.com/SeanHai/curve-go-rpc/rpc/curvebs"
comm "github.com/opencurve/curve-manager/api/common"
"github.com/opencurve/curve-manager/internal/common"
"github.com/opencurve/curve-manager/internal/errno"
"github.com/opencurve/curve-manager/internal/metrics/bsmetric"
bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs"
"github.com/opencurve/pigeon"
)

@@ -91,7 +90,7 @@ func GetSnapShotCloneServerStatus(r *pigeon.Request) (interface{}, errno.Errno)
func GetChunkServerStatus(l *pigeon.Logger, rId string) (interface{}, errno.Errno) {
var result ChunkServerStatus
// get chunkserver form mds
chunkservers, err := bsrpc.GMdsClient.GetChunkServerInCluster()
chunkservers, err := bshttp.GMdsClient.GetChunkServerInCluster()
if err != nil {
l.Error("GetChunkServerStatus bsrpc.GetChunkServerInCluster failed",
pigeon.Field("error", err),
@@ -103,7 +102,7 @@ func GetChunkServerStatus(l *pigeon.Logger, rId string) (interface{}, errno.Errn
var endponits []string
for _, cs := range chunkservers {
endpoint := fmt.Sprintf("%s:%d", cs.HostIp, cs.Port)
if cs.OnlineStatus == curvebs.ONLINE_STATUS {
if cs.OnlineStatus == bshttp.ONLINE_STATUS {
online += 1
endponits = append(endponits, endpoint)
} else {
40 changes: 40 additions & 0 deletions api/curvebs/agent/test_mdsLeader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package agent

import (
"encoding/json"
"fmt"
"github.com/go-resty/resty/v2"
"github.com/opencurve/curve-manager/internal/common"
"net/url"
"testing"
)

func TestMdsClient_ListPhysicalPool_http(t *testing.T) {

}
func TestGetCurrentClusterServicesAddr() (clusterServicesAddr, error) {
ret := clusterServicesAddr{}
httpClient := common.GetHttpClient()
url := (&url.URL{
Scheme: "http",
Host: "127.0.0.1:11000",
Path: "/",
RawQuery: fmt.Sprintf("%s=%s", "method", CLUSTER_SERVICES_ADDRESS),
}).String()

resp, err := resty.NewWithClient(httpClient).R().
SetHeader("Connection", "Keep-Alive").
SetHeader("Content-Type", "application/json").
SetHeader("User-Agent", "Curve-Manager").
Execute("GET", url)
if err != nil {
return ret, fmt.Errorf("getClusterServicesAddr failed: %v", err)
}

respStruct := admHttpResponse{}
err = json.Unmarshal([]byte(resp.String()), &respStruct)
if err != nil {
return ret, fmt.Errorf("Unmarshal getClusterServicesAddr response failed, resp = %s, err = %v", resp.String(), err)
}
return respStruct.Data, nil
}
18 changes: 9 additions & 9 deletions api/curvebs/agent/topology.go
Original file line number Diff line number Diff line change
@@ -23,15 +23,15 @@
package agent

import (
"github.com/opencurve/curve-manager/internal/http/curvebs"
"sort"

"github.com/SeanHai/curve-go-rpc/rpc/curvebs"
comm "github.com/opencurve/curve-manager/api/common"
"github.com/opencurve/curve-manager/internal/common"
"github.com/opencurve/curve-manager/internal/errno"
bshttp "github.com/opencurve/curve-manager/internal/http/curvebs"
"github.com/opencurve/curve-manager/internal/metrics/bsmetric"
metricomm "github.com/opencurve/curve-manager/internal/metrics/common"
bsrpc "github.com/opencurve/curve-manager/internal/rpc/curvebs"
"github.com/opencurve/pigeon"
)

@@ -92,7 +92,7 @@ func listChunkServer(pools *[]Pool, size int) error {
for zIndex, zone := range pool.Zones {
for sIndex, server := range zone.Servers {
go func(id uint32, addr *Server) {
chunkservers, err := bsrpc.GMdsClient.ListChunkServer(id)
chunkservers, err := bshttp.GMdsClient.ListChunkServer(id)
ret <- common.QueryResult{
Key: addr,
Result: chunkservers,
@@ -125,7 +125,7 @@ func listZoneServer(pools *[]Pool, size int) error {
for pIndex, pool := range *pools {
for zIndex, zone := range pool.Zones {
go func(id uint32, addr *Zone) {
servers, err := bsrpc.GMdsClient.ListZoneServer(id)
servers, err := bshttp.GMdsClient.ListZoneServer(id)
ret <- common.QueryResult{
Key: addr,
Result: servers,
@@ -165,7 +165,7 @@ func listPoolZone(pools *[]Pool) error {
number := 0
for index, pool := range *pools {
go func(id uint32, addr *Pool) {
zones, err := bsrpc.GMdsClient.ListPoolZone(id)
zones, err := bshttp.GMdsClient.ListPoolZone(id)
ret <- common.QueryResult{
Key: addr,
Result: zones,
@@ -194,7 +194,7 @@ func listPoolZone(pools *[]Pool) error {

func getPoolSpace(pools *[]PoolInfo) error {
// get can be recycled space
_, recycledSize, err := bsrpc.GMdsClient.GetFileAllocatedSize(RECYCLEBIN_DIR)
_, recycledSize, err := bshttp.GMdsClient.GetFileAllocatedSize(RECYCLEBIN_DIR)
if err != nil {
return err
}
@@ -312,7 +312,7 @@ func sortTopology(pools []Pool) {
func ListLogicalPool(r *pigeon.Request) (interface{}, errno.Errno) {
result := []PoolInfo{}
// get info from mds
pools, err := bsrpc.GMdsClient.ListLogicalPool()
pools, err := bshttp.GMdsClient.ListLogicalPool()
if err != nil {
r.Logger().Error("ListLogicalPool bsrpc.ListLogicalPool failed",
pigeon.Field("error", err),
@@ -353,7 +353,7 @@ func ListLogicalPool(r *pigeon.Request) (interface{}, errno.Errno) {
}

func GetLogicalPool(r *pigeon.Request, poolId uint32, start, end, interval uint64) (interface{}, errno.Errno) {
pool, err := bsrpc.GMdsClient.GetLogicalPool(poolId)
pool, err := bshttp.GMdsClient.GetLogicalPool(poolId)
if err != nil {
r.Logger().Error("GetLogicalPool bsrpc.GetLogicalPool failed",
pigeon.Field("poolId", poolId),
@@ -411,7 +411,7 @@ func GetLogicalPool(r *pigeon.Request, poolId uint32, start, end, interval uint6

func ListTopology(r *pigeon.Request) (interface{}, errno.Errno) {
result := []Pool{}
logicalPools, err := bsrpc.GMdsClient.ListLogicalPool()
logicalPools, err := bshttp.GMdsClient.ListLogicalPool()
if err != nil {
r.Logger().Error("ListTopology bsrpc.ListLogicalPool failed",
pigeon.Field("error", err),
Loading