diff --git a/cmd/kepler/main.go b/cmd/kepler/main.go index 32b1cac551..2902aeb990 100644 --- a/cmd/kepler/main.go +++ b/cmd/kepler/main.go @@ -155,8 +155,19 @@ func createServices(logger *slog.Logger, cfg *config.Config) ([]service.Service, apiServer := server.NewAPIServer( server.WithLogger(logger), ) + + collectors, err := prometheus.CreateCollectors( + pm, + prometheus.WithLogger(logger), + prometheus.WithProcFSPath(cfg.Host.ProcFS), + ) // TODO: enable exporters based on config / flags - promExporter := prometheus.NewExporter(pm, apiServer, prometheus.WithLogger(logger)) + promExporter := prometheus.NewExporter( + pm, + apiServer, + prometheus.WithLogger(logger), + prometheus.WithCollectors(collectors), + ) return []service.Service{ promExporter, diff --git a/internal/exporter/prometheus/collectors/build_info.go b/internal/exporter/prometheus/collectors/build_info.go index 2033833fcc..5145eaca93 100644 --- a/internal/exporter/prometheus/collectors/build_info.go +++ b/internal/exporter/prometheus/collectors/build_info.go @@ -4,7 +4,7 @@ package collectors import ( - "github.com/prometheus/client_golang/prometheus" + prom "github.com/prometheus/client_golang/prometheus" "github.com/sustainable-computing-io/kepler/internal/version" ) @@ -14,13 +14,13 @@ const ( ) type BuildInfoCollector struct { - buildInfo *prometheus.GaugeVec + buildInfo *prom.GaugeVec } // NewBuildInfoCollector creates a new collector for build information func NewBuildInfoCollector() *BuildInfoCollector { - buildInfo := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + buildInfo := prom.NewGaugeVec( + prom.GaugeOpts{ Namespace: namespace, Subsystem: buildSubsystem, Name: "info", @@ -34,11 +34,11 @@ func NewBuildInfoCollector() *BuildInfoCollector { } } -func (c *BuildInfoCollector) Describe(ch chan<- *prometheus.Desc) { +func (c *BuildInfoCollector) Describe(ch chan<- *prom.Desc) { c.buildInfo.Describe(ch) } -func (c *BuildInfoCollector) Collect(ch chan<- prometheus.Metric) { +func (c *BuildInfoCollector) Collect(ch chan<- prom.Metric) { info := version.Info() c.buildInfo.WithLabelValues( diff --git a/internal/exporter/prometheus/collectors/cpuinfo.go b/internal/exporter/prometheus/collectors/cpuinfo.go new file mode 100644 index 0000000000..d43684298f --- /dev/null +++ b/internal/exporter/prometheus/collectors/cpuinfo.go @@ -0,0 +1,89 @@ +// SPDX-FileCopyrightText: 2025 The Kepler Authors +// SPDX-License-Identifier: Apache-2.0 + +package collectors + +import ( + "fmt" + "sync" + + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/procfs" +) + +// procFS is an interface for CPUInfo. +type procFS interface { + CPUInfo() ([]procfs.CPUInfo, error) +} + +type realProcFS struct { + fs procfs.FS +} + +func (r *realProcFS) CPUInfo() ([]procfs.CPUInfo, error) { + return r.fs.CPUInfo() +} + +func newProcFS(mountPoint string) (procFS, error) { + fs, err := procfs.NewFS(mountPoint) + if err != nil { + return nil, err + } + return &realProcFS{fs: fs}, nil +} + +// cpuInfoCollector collects CPU info metrics from procfs. +type cpuInfoCollector struct { + sync.Mutex + + fs procFS + desc *prom.Desc +} + +// NewCPUInfoCollector creates a CPUInfoCollector using a procfs mount path. +func NewCPUInfoCollector(procPath string) (*cpuInfoCollector, error) { + fs, err := newProcFS(procPath) + if err != nil { + return nil, fmt.Errorf("creating procfs failed: %w", err) + } + return newCPUInfoCollectorWithFS(fs), nil +} + +// newCPUInfoCollectorWithFS injects a procFS interface +func newCPUInfoCollectorWithFS(fs procFS) *cpuInfoCollector { + return &cpuInfoCollector{ + fs: fs, + desc: prom.NewDesc( + prom.BuildFQName(namespace, "", "cpu_info"), + "CPU information from procfs", + []string{"processor", "vendor_id", "model_name", "physical_id", "core_id"}, + nil, + ), + } +} + +func (c *cpuInfoCollector) Describe(ch chan<- *prom.Desc) { + ch <- c.desc +} + +func (c *cpuInfoCollector) Collect(ch chan<- prom.Metric) { + c.Lock() + defer c.Unlock() + + cpuInfos, err := c.fs.CPUInfo() + if err != nil { + return + } + for _, ci := range cpuInfos { + ch <- prom.MustNewConstMetric( + c.desc, + prom.GaugeValue, + 1, + fmt.Sprintf("%d", ci.Processor), + ci.VendorID, + ci.ModelName, + ci.PhysicalID, + ci.CoreID, + ) + } +} diff --git a/internal/exporter/prometheus/collectors/cpuinfo_test.go b/internal/exporter/prometheus/collectors/cpuinfo_test.go new file mode 100644 index 0000000000..36c159fad2 --- /dev/null +++ b/internal/exporter/prometheus/collectors/cpuinfo_test.go @@ -0,0 +1,189 @@ +// SPDX-FileCopyrightText: 2025 The Kepler Authors +// SPDX-License-Identifier: Apache-2.0 + +package collectors + +import ( + "errors" + "sync" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/procfs" + "github.com/stretchr/testify/assert" +) + +// mockProcFS is a mock implementation of the procFS interface for testing. +type mockProcFS struct { + cpuInfoFunc func() ([]procfs.CPUInfo, error) +} + +func (m *mockProcFS) CPUInfo() ([]procfs.CPUInfo, error) { + return m.cpuInfoFunc() +} + +// sampleCPUInfo returns a sample CPUInfo slice for testing. +func sampleCPUInfo() []procfs.CPUInfo { + return []procfs.CPUInfo{ + { + Processor: 0, + VendorID: "GenuineIntel", + ModelName: "Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz", + PhysicalID: "0", + CoreID: "0", + }, + { + Processor: 1, + VendorID: "GenuineIntel", + ModelName: "Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz", + PhysicalID: "0", + CoreID: "1", + }, + } +} + +func expectedLabels() map[string]string { + return map[string]string{ + "processor": "", + "vendor_id": "", + "model_name": "", + "physical_id": "", + "core_id": "", + } +} + +// TestNewCPUInfoCollector tests the creation of a new CPUInfoCollector. +func TestNewCPUInfoCollector(t *testing.T) { + // Test successful creation with a mock procfs + collector, err := NewCPUInfoCollector("/proc") + assert.NoError(t, err) + assert.NotNil(t, collector) + assert.NotNil(t, collector.fs) + assert.NotNil(t, collector.desc) +} + +// TestNewCPUInfoCollectorWithFS tests the creation with an injected procFS. +func TestNewCPUInfoCollectorWithFS(t *testing.T) { + mockFS := &mockProcFS{ + cpuInfoFunc: func() ([]procfs.CPUInfo, error) { + return sampleCPUInfo(), nil + }, + } + collector := newCPUInfoCollectorWithFS(mockFS) + assert.NotNil(t, collector) + assert.Equal(t, mockFS, collector.fs) + assert.NotNil(t, collector.desc) + assert.Contains(t, collector.desc.String(), "kepler_cpu_info") + assert.Contains(t, collector.desc.String(), "variableLabels: {processor,vendor_id,model_name,physical_id,core_id}") +} + +// TestCPUInfoCollector_Describe tests the Describe method. +func TestCPUInfoCollector_Describe(t *testing.T) { + mockFS := &mockProcFS{ + cpuInfoFunc: func() ([]procfs.CPUInfo, error) { + return sampleCPUInfo(), nil + }, + } + collector := newCPUInfoCollectorWithFS(mockFS) + + ch := make(chan *prometheus.Desc, 1) + collector.Describe(ch) + close(ch) + + desc := <-ch + assert.Equal(t, collector.desc, desc) +} + +// TestCPUInfoCollector_Collect_Success tests the Collect method with valid CPU info. +func TestCPUInfoCollector_Collect_Success(t *testing.T) { + mockFS := &mockProcFS{ + cpuInfoFunc: func() ([]procfs.CPUInfo, error) { + return sampleCPUInfo(), nil + }, + } + collector := newCPUInfoCollectorWithFS(mockFS) + + ch := make(chan prometheus.Metric, 10) + collector.Collect(ch) + close(ch) + + var metrics []prometheus.Metric + for m := range ch { + metrics = append(metrics, m) + } + + assert.Len(t, metrics, 2, "expected two CPU info metrics") + + el := expectedLabels() + + for _, m := range metrics { + dtoMetric := &dto.Metric{} + err := m.Write(dtoMetric) + assert.NoError(t, err) + assert.NotNil(t, dtoMetric.Gauge) + assert.NotNil(t, dtoMetric.Gauge.Value) + assert.Equal(t, 1.0, *dtoMetric.Gauge.Value) + assert.NotNil(t, dtoMetric.Label) + for _, l := range dtoMetric.Label { + assert.NotNil(t, l.Name) + delete(el, *l.Name) + } + } + assert.Empty(t, el, "all expected labels not received") +} + +// TestCPUInfoCollector_Collect_Error tests the Collect method when CPUInfo fails. +func TestCPUInfoCollector_Collect_Error(t *testing.T) { + mockFS := &mockProcFS{ + cpuInfoFunc: func() ([]procfs.CPUInfo, error) { + return nil, errors.New("failed to read CPU info") + }, + } + collector := newCPUInfoCollectorWithFS(mockFS) + + ch := make(chan prometheus.Metric, 10) + collector.Collect(ch) + close(ch) + + var metrics []prometheus.Metric + for m := range ch { + metrics = append(metrics, m) + } + + assert.Len(t, metrics, 0, "expected no metrics on error") +} + +// TestCPUInfoCollector_Collect_Concurrency tests concurrent calls to Collect. +func TestCPUInfoCollector_Collect_Concurrency(t *testing.T) { + mockFS := &mockProcFS{ + cpuInfoFunc: func() ([]procfs.CPUInfo, error) { + return sampleCPUInfo(), nil + }, + } + collector := newCPUInfoCollectorWithFS(mockFS) + + const numGoroutines = 10 + var wg sync.WaitGroup + ch := make(chan prometheus.Metric, numGoroutines*len(sampleCPUInfo())) + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + collector.Collect(ch) + }() + } + + wg.Wait() + close(ch) + + var metrics []prometheus.Metric + for m := range ch { + metrics = append(metrics, m) + } + + // Expect numGoroutines * number of CPUs metrics + expectedMetrics := numGoroutines * len(sampleCPUInfo()) + assert.Equal(t, expectedMetrics, len(metrics), "expected metrics from all goroutines") +} diff --git a/internal/exporter/prometheus/prometheus.go b/internal/exporter/prometheus/prometheus.go index 41efe18391..80cdde74f0 100644 --- a/internal/exporter/prometheus/prometheus.go +++ b/internal/exporter/prometheus/prometheus.go @@ -29,6 +29,8 @@ type APIRegistry interface { type Opts struct { logger *slog.Logger debugCollectors map[string]bool + collectors map[string]prom.Collector + procfs string } // DefaultOpts() returns a new Opts with defaults set @@ -38,6 +40,7 @@ func DefaultOpts() Opts { debugCollectors: map[string]bool{ "go": true, }, + collectors: map[string]prom.Collector{}, } } @@ -60,6 +63,18 @@ func WithDebugCollectors(c *[]string) OptionFn { } } +func WithProcFSPath(procfs string) OptionFn { + return func(o *Opts) { + o.procfs = procfs + } +} + +func WithCollectors(c map[string]prom.Collector) OptionFn { + return func(o *Opts) { + o.collectors = c + } +} + // Exporter exports power data to Prometheus type Exporter struct { logger *slog.Logger @@ -67,6 +82,7 @@ type Exporter struct { registry *prom.Registry server APIRegistry debugCollectors map[string]bool + collectors map[string]prom.Collector } var _ Service = (*Exporter)(nil) @@ -78,15 +94,16 @@ func NewExporter(pm Monitor, s APIRegistry, applyOpts ...OptionFn) *Exporter { apply(&opts) } - monitor := &Exporter{ + exporter := &Exporter{ monitor: pm, server: s, logger: opts.logger.With("service", "prometheus"), debugCollectors: opts.debugCollectors, + collectors: opts.collectors, registry: prom.NewRegistry(), } - return monitor + return exporter } func collectorForName(name string) (prom.Collector, error) { @@ -100,6 +117,26 @@ func collectorForName(name string) (prom.Collector, error) { } } +func CreateCollectors(pm Monitor, applyOpts ...OptionFn) (map[string]prom.Collector, error) { + opts := Opts{ + logger: slog.Default(), + procfs: "/proc", + } + for _, apply := range applyOpts { + apply(&opts) + } + collectors := map[string]prom.Collector{ + "build_info": collectors.NewBuildInfoCollector(), + "power": collector.NewPowerCollector(pm, opts.logger), + } + cpuInfoCollector, err := collector.NewCPUInfoCollector(opts.procfs) + if err != nil { + return nil, err + } + collectors["cpu_info"] = cpuInfoCollector + return collectors, nil +} + // Start implements Exporter.Start func (e *Exporter) Start(ctx context.Context) error { e.logger.Info("Starting Prometheus exporter") @@ -114,13 +151,10 @@ func (e *Exporter) Start(ctx context.Context) error { e.registry.MustRegister(collector) } - // Register build info collector - buildInfoCollector := collector.NewBuildInfoCollector() - e.registry.MustRegister(buildInfoCollector) - - // Register power collector - powerCollector := collector.NewPowerCollector(e.monitor, e.logger) - e.registry.MustRegister(powerCollector) + for name, collector := range e.collectors { + e.logger.Info("Enabling collector", "collector", name) + e.registry.MustRegister(collector) + } err := e.server.Register("/metrics", "Metrics", "Prometheus metrics", promhttp.HandlerFor( diff --git a/internal/exporter/prometheus/prometheus_test.go b/internal/exporter/prometheus/prometheus_test.go index 64007827a3..dba4a76716 100644 --- a/internal/exporter/prometheus/prometheus_test.go +++ b/internal/exporter/prometheus/prometheus_test.go @@ -322,16 +322,17 @@ func TestDefaultOpts(t *testing.T) { func TestExporter_Integration(t *testing.T) { mockMonitor := &MockMonitor{} - mockMonitor.On("DataChannel").Return(make(<-chan struct{})) mockRegistry := &MockAPIRegistry{} mockRegistry.On("Register", "/metrics", "Metrics", "Prometheus metrics", mock.Anything).Return(nil) - // Create exporter with both collectors + dummyCollector := prom.CollectorFunc(func(ch chan<- prom.Metric) {}) + // Create exporter with dummyCollector exporter := NewExporter( mockMonitor, mockRegistry, WithDebugCollectors(&[]string{"go", "process"}), + WithCollectors(map[string]prom.Collector{"dummy": dummyCollector}), ) // Set up a cancellable context @@ -364,3 +365,22 @@ func TestExporter_Integration(t *testing.T) { err := exporter.Stop() assert.NoError(t, err) } + +func TestExporter_CreateCollectors(t *testing.T) { + mockMonitor := &MockMonitor{} + mockMonitor.On("DataChannel").Return(make(<-chan struct{})) + + // create Collectors + coll, err := CreateCollectors( + mockMonitor, + WithLogger(slog.Default()), + WithProcFSPath("/proc"), + ) + time.Sleep(50 * time.Millisecond) + + // Verify mocks + mockMonitor.AssertExpectations(t) + + assert.NoError(t, err) + assert.Len(t, coll, 3) +}