Skip to content

Commit 9d10852

Browse files
authored
Kafka tailer integration (#129)
* Adding changes for kafka tailer integration * Deleted vendor directory
1 parent fb42b53 commit 9d10852

File tree

8 files changed

+418
-7
lines changed

8 files changed

+418
-7
lines changed

CONFIG.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,37 @@ input:
208208
209209
This configuration example may be found in the examples directory [here](example/config_logstash_http_input_ipv6.yml).
210210
211+
### Kafka Input Type
212+
213+
The `grok_exporter` is also capable of consuming log entries from Kafka. Currently, only plain-text encoded messages are supported.
214+
215+
```yaml
216+
input:
217+
type: kafka
218+
# Version corresponding to the kafka cluster
219+
kafka_version: 2.1.0
220+
221+
# The list of Kafka brokers in the Kafka cluster. Note that you need one grok_exporter instance per Kafka cluster if you plan to consume topics from multiple clusters.
222+
kafka_brokers:
223+
- localhost:9092
224+
225+
# The list of Kafka topics to consume from.
226+
kafka_topics:
227+
- grok_exporter_test
228+
229+
# The partition assignment strategy to use: one of range, roundrobin, or sticky (range by default).
230+
kafka_partition_assignor: range
231+
232+
# The name of the consumer group to register as on the broker. If not specified, the default is 'grok_exporter'
233+
kafka_consumer_group_name: grok_exporter
234+
235+
# Indicates whether the exporter should start consuming from the earliest (oldest) messages in the topic (true), or only from the most recent messages (false).
236+
kafka_consume_from_oldest: false
237+
```
238+
239+
This configuration example may be found in the examples directory [here](example/config-kafka.yml).
240+
241+
211242
imports Section
212243
---------------
213244

KAFKA_TESTING.md

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
## How to test the Kafka integration
2+
3+
1. Ensure you have Docker set up on your system.
4+
5+
2. Create the following config and save it as `docker-compose-kafka-single.yml`
6+
```
7+
---
8+
version: '2'
9+
10+
services:
11+
zookeeper:
12+
image: confluentinc/cp-zookeeper:latest
13+
hostname: zookeeper
14+
ports:
15+
- 32181:32181
16+
environment:
17+
ZOOKEEPER_CLIENT_PORT: 32181
18+
ZOOKEEPER_TICK_TIME: 2000
19+
extra_hosts:
20+
- "moby:127.0.0.1"
21+
- "localhost: 127.0.0.1"
22+
23+
kafka:
24+
image: confluentinc/cp-kafka:latest
25+
hostname: kafka
26+
ports:
27+
- 9092:9092
28+
depends_on:
29+
- zookeeper
30+
environment:
31+
KAFKA_BROKER_ID: 1
32+
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
33+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
34+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
35+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
36+
extra_hosts:
37+
- "moby:127.0.0.1"
38+
- "localhost: 127.0.0.1"
39+
```
40+
41+
3. Create the kafka cluster:
42+
```
43+
docker-compose -f docker-compose-kafka-single.yml up
44+
```
45+
46+
4. Create the necessary topic:
47+
```
48+
docker-compose -f docker-compose-kafka-single.yml exec kafka kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic grok_exporter_test
49+
```
50+
51+
5. Publish a sample test message:
52+
```
53+
docker-compose -f docker-compose-kafka-single.yml exec kafka bash -c "echo 'this is a test' | kafka-console-producer --request-required-acks 1 --broker-list localhost:9092 --topic grok_exporter_test"
54+
```
55+
56+
6. Given that the grok_exporter was properly configured and you're matching for a string in the message you've previously published to kafka, you should have matches that appear on the metrics page.

config/v3/configV3.go

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,24 @@ package v3
1616

1717
import (
1818
"fmt"
19-
v2 "github.com/fstab/grok_exporter/config/v2"
20-
"github.com/fstab/grok_exporter/tailer/glob"
21-
"github.com/fstab/grok_exporter/template"
22-
"gopkg.in/yaml.v2"
2319
"os"
20+
"regexp"
2421
"strconv"
2522
"strings"
2623
"time"
24+
25+
v2 "github.com/fstab/grok_exporter/config/v2"
26+
"github.com/fstab/grok_exporter/tailer/glob"
27+
"github.com/fstab/grok_exporter/template"
28+
"gopkg.in/yaml.v2"
2729
)
2830

2931
const (
3032
defaultRetentionCheckInterval = 53 * time.Second
3133
inputTypeStdin = "stdin"
3234
inputTypeFile = "file"
3335
inputTypeWebhook = "webhook"
36+
inputTypeKafka = "kafka"
3437
importMetricsType = "metrics"
3538
importPatternsType = "grok_patterns"
3639
)
@@ -102,6 +105,12 @@ type InputConfig struct {
102105
WebhookFormat string `yaml:"webhook_format,omitempty"`
103106
WebhookJsonSelector string `yaml:"webhook_json_selector,omitempty"`
104107
WebhookTextBulkSeparator string `yaml:"webhook_text_bulk_separator,omitempty"`
108+
KafkaVersion string `yaml:"kafka_version,omitempty"`
109+
KafkaBrokers []string `yaml:"kafka_brokers,omitempty"`
110+
KafkaTopics []string `yaml:"kafka_topics,omitempty"`
111+
KafkaPartitionAssignor string `yaml:"kafka_partition_assignor,omitempty"`
112+
KafkaConsumerGroupName string `yaml:"kafka_consumer_group_name,omitempty"`
113+
KafkaConsumeFromOldest bool `yaml:"kafka_consume_from_oldest,omitempty"`
105114
}
106115

107116
type GrokPatternsConfig []string
@@ -268,6 +277,19 @@ func (c *InputConfig) addDefaults() {
268277
c.WebhookTextBulkSeparator = "\n\n"
269278
}
270279
}
280+
if c.Type == inputTypeKafka {
281+
c.KafkaConsumeFromOldest = false
282+
283+
if c.KafkaPartitionAssignor == "" {
284+
c.KafkaPartitionAssignor = "range"
285+
}
286+
if c.KafkaVersion == "" {
287+
c.KafkaVersion = "2.1.0"
288+
}
289+
if c.KafkaConsumerGroupName == "" {
290+
c.KafkaConsumerGroupName = "grok_exporter"
291+
}
292+
}
271293
}
272294

273295
func (c *GrokPatternsConfig) addDefaults() {}
@@ -403,6 +425,26 @@ func (c *InputConfig) validate() error {
403425
if c.WebhookFormat == "text_bulk" && c.WebhookTextBulkSeparator == "" {
404426
return fmt.Errorf("invalid input configuration: 'input.webhook_text_bulk_separator' is required for input type \"webhook\" and webhook_format \"text_bulk\"")
405427
}
428+
case c.Type == inputTypeKafka:
429+
if len(c.KafkaBrokers) == 0 {
430+
return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_brokers' cannot be empty")
431+
}
432+
if len(c.KafkaTopics) == 0 {
433+
return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_topics' cannot be empty")
434+
}
435+
436+
matched, _ := regexp.MatchString(`^[0-9]\.[0-9]\.[0-9]$`, c.KafkaVersion)
437+
if !matched {
438+
return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_version' must a valid semantic version X.Y.Z")
439+
}
440+
441+
versionParts := strings.Split(c.KafkaVersion, ".")
442+
vMajor, vMajorErr := strconv.Atoi(versionParts[0])
443+
vMinor, vMinorErr := strconv.Atoi(versionParts[1])
444+
if vMajorErr != nil && vMinorErr != nil && vMajor < 1 && vMinor < 8 {
445+
return fmt.Errorf("invalid input configuration: Kafka 'input.kafka_version' must be >= 0.8.0")
446+
}
447+
406448
default:
407449
return fmt.Errorf("unsupported 'input.type': %v", c.Type)
408450
}

example/config-kafka.yml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
global:
2+
config_version: 3
3+
input:
4+
type: kafka
5+
kafka_version: 2.1.0
6+
kafka_brokers:
7+
- localhost:9092
8+
kafka_topics:
9+
- grok_exporter_test
10+
kafka_consumer_group_name: grok_exporter
11+
kafka_consume_from_oldest: true
12+
imports:
13+
- type: grok_patterns
14+
dir: ./logstash-patterns-core/patterns
15+
metrics:
16+
- type: counter
17+
name: test_strings
18+
help: Test string detected.
19+
match: 'test'
20+
labels:
21+
logfile: '{{base .logfile}}'
22+
server:
23+
protocol: http
24+
port: 9144
25+

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
module github.com/fstab/grok_exporter
22

33
require (
4+
github.com/Shopify/sarama v1.27.0
45
github.com/bitly/go-simplejson v0.5.0
56
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
7+
github.com/ofabry/go-callvis v0.6.1 // indirect
68
github.com/prometheus/client_golang v1.7.1
79
github.com/prometheus/client_model v0.2.0
810
github.com/prometheus/common v0.10.0
911
github.com/sirupsen/logrus v1.6.0
1012
golang.org/x/exp v0.0.0-20191227195350-da58074b4299
13+
golang.org/x/image v0.0.0-20200801110659-972c09e46d76 // indirect
14+
golang.org/x/tools v0.0.0-20200828161849-5deb26317202 // indirect
15+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
1116
gopkg.in/yaml.v2 v2.3.0
1217
)
1318

0 commit comments

Comments
 (0)