@@ -18,15 +18,21 @@ package log
18
18
import (
19
19
"fmt"
20
20
"strings"
21
+ "time"
21
22
22
23
"github.com/nathanaelle/syslog5424/v2"
24
+ circuit "github.com/rubyist/circuitbreaker"
23
25
"github.com/sirupsen/logrus"
24
26
)
25
27
26
28
type RFC5424Hook struct {
27
29
syslog * syslog5424.Syslog
28
30
sender * syslog5424.Sender
29
31
msgID string
32
+
33
+ // Use a circuit breaker to pause sending messages to the syslog target
34
+ // in the presence of connection errors.
35
+ cb * circuit.Breaker
30
36
}
31
37
32
38
func (r RFC5424Hook ) Levels () []logrus.Level {
@@ -58,7 +64,13 @@ func (r RFC5424Hook) Fire(entry *logrus.Entry) (err error) {
58
64
59
65
msg := strings .Join (messages , " " )
60
66
61
- r .syslog .Channel (sev ).Msgid (r .msgID ).Log (msg )
67
+ // Do not perform any action unless the circuit breaker is either closed (reset), or is ready to retry.
68
+ if r .cb .Ready () {
69
+ r .syslog .Channel (sev ).Msgid (r .msgID ).Log (msg )
70
+ // Register any call as successful to enable automatic resets.
71
+ // Failures are registered asynchronously by the goroutine that consumes errors from the corresponding channel.
72
+ r .cb .Success ()
73
+ }
62
74
63
75
return
64
76
}
@@ -74,7 +86,9 @@ func NewRFC5424Hook(opts Target) (logrus.Hook, error) {
74
86
return nil , err
75
87
}
76
88
77
- slConn , _ , err := syslog5424 .Dial (opts .SyslogProto , opts .SyslogAddr )
89
+ // syslog5424.Dial() returns an error channel, which needs to be drained
90
+ // in order to avoid blocking.
91
+ slConn , errCh , err := syslog5424 .Dial (opts .SyslogProto , opts .SyslogAddr )
78
92
if err != nil {
79
93
return nil , err
80
94
}
@@ -84,10 +98,64 @@ func NewRFC5424Hook(opts Target) (logrus.Hook, error) {
84
98
return nil , err
85
99
}
86
100
87
- return & RFC5424Hook {syslog : syslogServer , sender : slConn , msgID : opts .SyslogMsgID }, nil
101
+ r := & RFC5424Hook {
102
+ syslog : syslogServer , sender : slConn , msgID : opts .SyslogMsgID ,
103
+ // We can change the circuit breaker settings as desired - including making
104
+ // them configurable and/or dynamically adjustable based on runtime conditions.
105
+ //
106
+ // Please note, however, that a 3-failure threshold breaker with default settings
107
+ // was found to work well with varying load and different states of a log target.
108
+ // Specifically, the breaker will remain tripped when sending messages to the target
109
+ // that is consistently failing, and will reset quickly when delivery begins to succeed.
110
+ cb : circuit .NewThresholdBreaker (3 ),
111
+ }
112
+
113
+ // A signal channel that is used to stop the goroutine reporting on circuit breaker state changes.
114
+ doneCh := make (chan struct {})
115
+
116
+ // Consume errors from errCh until it is closed.
117
+ go func () {
118
+ for {
119
+ err , ok := <- errCh
120
+ if err != nil {
121
+ r .cb .Fail () // Register a failure with the circuit breaker.
122
+ }
123
+ if ! ok {
124
+ close (doneCh )
125
+ return
126
+ }
127
+ }
128
+ }()
129
+
130
+ // Report on circuit breaker state changes.
131
+ cbStateCh := r .cb .Subscribe ()
132
+ go func () {
133
+ for {
134
+ select {
135
+ case e , ok := <- cbStateCh :
136
+ if ! ok {
137
+ return
138
+ }
139
+ var state string
140
+ switch e {
141
+ case circuit .BreakerTripped :
142
+ state = "too many connection errors, log delivery is stopped until this improves"
143
+ case circuit .BreakerReset :
144
+ state = "resuming log delivery"
145
+ default :
146
+ continue
147
+ }
148
+ fmt .Println (time .Now ().Format (time .RFC3339 ), "syslog target" , opts .SyslogAddr , "(" + opts .SyslogTag + "):" , state )
149
+ case <- doneCh :
150
+ return
151
+ }
152
+ }
153
+ }()
154
+
155
+ return r , nil
88
156
}
89
157
90
158
func (r RFC5424Hook ) Close () error {
91
- r .sender .End ()
159
+ r .sender .End () // This will also close errCh returned by syslog.Dial() in NewRFC5424Hook(), causing related goroutines to exit.
92
160
return nil
93
161
}
0 commit comments