Skip to content

Commit 899d495

Browse files
committed
publish total shards on a node
Signed-off-by: avinash kumar <avee137@gmail.com>
1 parent 4b049f8 commit 899d495

File tree

3 files changed

+198
-0
lines changed

3 files changed

+198
-0
lines changed

collector/shards.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Copyright 2021 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
package collector
14+
15+
import (
16+
"encoding/json"
17+
"fmt"
18+
"net/http"
19+
"net/url"
20+
"path"
21+
22+
"github.com/go-kit/kit/log"
23+
"github.com/go-kit/kit/log/level"
24+
"github.com/prometheus/client_golang/prometheus"
25+
)
26+
27+
var (
28+
defaultNodeShardLabels = []string{"node"}
29+
30+
defaultNodeShardLabelValues = func(node string) []string {
31+
return []string{
32+
node,
33+
}
34+
}
35+
)
36+
37+
// Shards information struct
38+
type Shards struct {
39+
logger log.Logger
40+
client *http.Client
41+
url *url.URL
42+
43+
nodeShardMetrics []*nodeShardMetric
44+
jsonParseFailures prometheus.Counter
45+
}
46+
47+
// NodeShard Information per node struct
48+
type NodeShard struct {
49+
node string
50+
shards int64
51+
}
52+
53+
type nodeShardMetric struct {
54+
Type prometheus.ValueType
55+
Desc *prometheus.Desc
56+
Value func(shards float64) float64
57+
Labels func(node string) []string
58+
}
59+
60+
// NewShards defines Shards Prometheus metrics
61+
func NewShards(logger log.Logger, client *http.Client, url *url.URL) *Shards {
62+
return &Shards{
63+
logger: logger,
64+
client: client,
65+
url: url,
66+
67+
nodeShardMetrics: []*nodeShardMetric{
68+
{
69+
Type: prometheus.GaugeValue,
70+
Desc: prometheus.NewDesc(
71+
prometheus.BuildFQName(namespace, "node_shards", "total"),
72+
"Total shards per node",
73+
defaultNodeShardLabels, nil,
74+
),
75+
Value: func(shards float64) float64 {
76+
return shards
77+
},
78+
Labels: defaultNodeShardLabelValues,
79+
}},
80+
81+
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
82+
Name: prometheus.BuildFQName(namespace, "node_shards", "json_parse_failures"),
83+
Help: "Number of errors while parsing JSON.",
84+
}),
85+
}
86+
}
87+
88+
// Describe Shards
89+
func (s *Shards) Describe(ch chan<- *prometheus.Desc) {
90+
ch <- s.jsonParseFailures.Desc()
91+
92+
for _, metric := range s.nodeShardMetrics {
93+
ch <- metric.Desc
94+
}
95+
}
96+
97+
func (s *Shards) getAndParseURL(u *url.URL, data interface{}) error {
98+
res, err := s.client.Get(u.String())
99+
if err != nil {
100+
return fmt.Errorf("failed to get from %s://%s:%s%s: %s",
101+
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
102+
}
103+
104+
defer func() {
105+
err = res.Body.Close()
106+
if err != nil {
107+
_ = level.Warn(s.logger).Log(
108+
"msg", "failed to close http.Client",
109+
"err", err,
110+
)
111+
}
112+
}()
113+
114+
if res.StatusCode != http.StatusOK {
115+
return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
116+
}
117+
118+
if err := json.NewDecoder(res.Body).Decode(data); err != nil {
119+
s.jsonParseFailures.Inc()
120+
return err
121+
}
122+
return nil
123+
}
124+
125+
func (s *Shards) fetchAndDecodeShards() ([]ShardResponse, error) {
126+
127+
u := *s.url
128+
u.Path = path.Join(u.Path, "/_cat/shards")
129+
q := u.Query()
130+
q.Set("format", "json")
131+
u.RawQuery = q.Encode()
132+
u.RawPath = q.Encode()
133+
var sfr []ShardResponse
134+
err := s.getAndParseURL(&u, &sfr)
135+
if err != nil {
136+
return sfr, err
137+
}
138+
return sfr, err
139+
}
140+
141+
// Collect number of shards on each nodes
142+
func (s *Shards) Collect(ch chan<- prometheus.Metric) {
143+
144+
defer func() {
145+
ch <- s.jsonParseFailures
146+
}()
147+
148+
sr, err := s.fetchAndDecodeShards()
149+
if err != nil {
150+
_ = level.Warn(s.logger).Log(
151+
"msg", "failed to fetch and decode cluster settings stats",
152+
"err", err,
153+
)
154+
return
155+
}
156+
157+
nodeShards := make(map[string]float64)
158+
159+
for _, shard := range sr {
160+
if val, ok := nodeShards[shard.Node]; ok {
161+
nodeShards[shard.Node] = val + 1
162+
} else {
163+
nodeShards[shard.Node] = 1
164+
}
165+
}
166+
167+
for node, shards := range nodeShards {
168+
for _, metric := range s.nodeShardMetrics {
169+
ch <- prometheus.MustNewConstMetric(
170+
metric.Desc,
171+
metric.Type,
172+
metric.Value(shards),
173+
metric.Labels(node)...,
174+
)
175+
}
176+
}
177+
}

collector/shards_response.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright 2021 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
package collector
14+
15+
// ShardResponse is a representation of an Elasticsearch Shard
16+
type ShardResponse struct {
17+
Index string `json:"index"`
18+
Shard string `json:"shard"`
19+
Node string `json:"node"`
20+
}

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ func main() {
167167
prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))
168168

169169
if *esExportIndices || *esExportShards {
170+
prometheus.MustRegister(collector.NewShards(logger, httpClient, esURL))
170171
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards)
171172
prometheus.MustRegister(iC)
172173
if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {

0 commit comments

Comments
 (0)