Skip to content

Commit c371d8c

Browse files
committed
rafthttp: version enforcement on rafthttp messages
This PR sets etcd version and min cluster version in request header, and let server check version compatibility. rafthttp server will reject any message from peer with incompatible version(too low version or too high version), and print out warning logs.
1 parent 8825af4 commit c371d8c

File tree

9 files changed

+290
-122
lines changed

9 files changed

+290
-122
lines changed

rafthttp/http.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package rafthttp
1616

1717
import (
18+
"errors"
1819
"io/ioutil"
1920
"log"
2021
"net/http"
@@ -34,6 +35,9 @@ const (
3435
var (
3536
RaftPrefix = "/raft"
3637
RaftStreamPrefix = path.Join(RaftPrefix, "stream")
38+
39+
errIncompatibleVersion = errors.New("incompatible version")
40+
errClusterIDMismatch = errors.New("cluster ID mismatch")
3741
)
3842

3943
func NewHandler(r Raft, cid types.ID) http.Handler {
@@ -72,13 +76,19 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
7276
return
7377
}
7478

79+
if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil {
80+
log.Printf("rafthttp: request received was ignored (%v)", err)
81+
http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed)
82+
return
83+
}
84+
7585
wcid := h.cid.String()
7686
w.Header().Set("X-Etcd-Cluster-ID", wcid)
7787

7888
gcid := r.Header.Get("X-Etcd-Cluster-ID")
7989
if gcid != wcid {
8090
log.Printf("rafthttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
81-
http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed)
91+
http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed)
8292
return
8393
}
8494

@@ -126,17 +136,23 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
126136
return
127137
}
128138

139+
w.Header().Set("X-Server-Version", version.Version)
140+
141+
if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil {
142+
log.Printf("rafthttp: request received was ignored (%v)", err)
143+
http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed)
144+
return
145+
}
146+
129147
wcid := h.cid.String()
130148
w.Header().Set("X-Etcd-Cluster-ID", wcid)
131149

132150
if gcid := r.Header.Get("X-Etcd-Cluster-ID"); gcid != wcid {
133151
log.Printf("rafthttp: streaming request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
134-
http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed)
152+
http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed)
135153
return
136154
}
137155

138-
w.Header().Add("X-Server-Version", version.Version)
139-
140156
var t streamType
141157
switch path.Dir(r.URL.Path) {
142158
// backward compatibility

rafthttp/peer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
124124
r: r,
125125
msgAppWriter: startStreamWriter(to, fs, r),
126126
writer: startStreamWriter(to, fs, r),
127-
pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc),
127+
pipeline: newPipeline(tr, picker, local, to, cid, fs, r, errorc),
128128
sendc: make(chan raftpb.Message),
129129
recvc: make(chan raftpb.Message, recvBufSize),
130130
propc: make(chan raftpb.Message, maxPendingProposals),

rafthttp/pipeline.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ package rafthttp
1717
import (
1818
"bytes"
1919
"fmt"
20+
"io/ioutil"
2021
"log"
2122
"net/http"
23+
"strings"
2224
"sync"
2325
"time"
2426

@@ -27,6 +29,7 @@ import (
2729
"github.com/coreos/etcd/pkg/types"
2830
"github.com/coreos/etcd/raft"
2931
"github.com/coreos/etcd/raft/raftpb"
32+
"github.com/coreos/etcd/version"
3033
)
3134

3235
const (
@@ -39,8 +42,8 @@ const (
3942
)
4043

4144
type pipeline struct {
42-
id types.ID
43-
cid types.ID
45+
from, to types.ID
46+
cid types.ID
4447

4548
tr http.RoundTripper
4649
picker *urlPicker
@@ -58,9 +61,10 @@ type pipeline struct {
5861
errored error
5962
}
6063

61-
func newPipeline(tr http.RoundTripper, picker *urlPicker, id, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
64+
func newPipeline(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
6265
p := &pipeline{
63-
id: id,
66+
from: from,
67+
to: to,
6468
cid: cid,
6569
tr: tr,
6670
picker: picker,
@@ -94,11 +98,11 @@ func (p *pipeline) handle() {
9498
reportSentFailure(pipelineMsg, m)
9599

96100
if p.errored == nil || p.errored.Error() != err.Error() {
97-
log.Printf("pipeline: error posting to %s: %v", p.id, err)
101+
log.Printf("pipeline: error posting to %s: %v", p.to, err)
98102
p.errored = err
99103
}
100104
if p.active {
101-
log.Printf("pipeline: the connection with %s became inactive", p.id)
105+
log.Printf("pipeline: the connection with %s became inactive", p.to)
102106
p.active = false
103107
}
104108
if m.Type == raftpb.MsgApp && p.fs != nil {
@@ -110,7 +114,7 @@ func (p *pipeline) handle() {
110114
}
111115
} else {
112116
if !p.active {
113-
log.Printf("pipeline: the connection with %s became active", p.id)
117+
log.Printf("pipeline: the connection with %s became active", p.to)
114118
p.active = true
115119
p.errored = nil
116120
}
@@ -138,19 +142,35 @@ func (p *pipeline) post(data []byte) error {
138142
return err
139143
}
140144
req.Header.Set("Content-Type", "application/protobuf")
145+
req.Header.Set("X-Server-From", p.from.String())
146+
req.Header.Set("X-Server-Version", version.Version)
147+
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
141148
req.Header.Set("X-Etcd-Cluster-ID", p.cid.String())
142149
resp, err := p.tr.RoundTrip(req)
143150
if err != nil {
144151
p.picker.unreachable(u)
145152
return err
146153
}
154+
b, err := ioutil.ReadAll(resp.Body)
155+
if err != nil {
156+
p.picker.unreachable(u)
157+
return err
158+
}
147159
resp.Body.Close()
148160

149161
switch resp.StatusCode {
150162
case http.StatusPreconditionFailed:
151-
log.Printf("rafthttp: request sent was ignored due to cluster ID mismatch (remote[%s]:%s, local:%s)",
152-
uu.Host, resp.Header.Get("X-Etcd-Cluster-ID"), p.cid)
153-
return fmt.Errorf("cluster ID mismatch")
163+
switch strings.TrimSuffix(string(b), "\n") {
164+
case errIncompatibleVersion.Error():
165+
log.Printf("rafthttp: request sent was ignored by peer %s (server version incompatible)", p.to)
166+
return errIncompatibleVersion
167+
case errClusterIDMismatch.Error():
168+
log.Printf("rafthttp: request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
169+
p.to, resp.Header.Get("X-Etcd-Cluster-ID"), p.cid)
170+
return errClusterIDMismatch
171+
default:
172+
return fmt.Errorf("unhandled error %q when precondition failed", string(b))
173+
}
154174
case http.StatusForbidden:
155175
err := fmt.Errorf("the member has been permanently removed from the cluster")
156176
select {

rafthttp/pipeline_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package rafthttp
1616

1717
import (
1818
"errors"
19+
"io"
1920
"io/ioutil"
2021
"net/http"
2122
"sync"
@@ -25,6 +26,7 @@ import (
2526
"github.com/coreos/etcd/pkg/testutil"
2627
"github.com/coreos/etcd/pkg/types"
2728
"github.com/coreos/etcd/raft/raftpb"
29+
"github.com/coreos/etcd/version"
2830
)
2931

3032
// TestPipelineSend tests that pipeline could send data using roundtripper
@@ -33,7 +35,7 @@ func TestPipelineSend(t *testing.T) {
3335
tr := &roundTripperRecorder{}
3436
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
3537
fs := &stats.FollowerStats{}
36-
p := newPipeline(tr, picker, types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
38+
p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
3739

3840
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
3941
p.stop()
@@ -52,7 +54,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) {
5254
tr := newRoundTripperBlocker()
5355
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
5456
fs := &stats.FollowerStats{}
55-
p := newPipeline(tr, picker, types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
57+
p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
5658

5759
// keep the sender busy and make the buffer full
5860
// nothing can go out as we block the sender
@@ -92,7 +94,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) {
9294
func TestPipelineSendFailed(t *testing.T) {
9395
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
9496
fs := &stats.FollowerStats{}
95-
p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
97+
p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil)
9698

9799
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
98100
p.stop()
@@ -107,7 +109,7 @@ func TestPipelineSendFailed(t *testing.T) {
107109
func TestPipelinePost(t *testing.T) {
108110
tr := &roundTripperRecorder{}
109111
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
110-
p := newPipeline(tr, picker, types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
112+
p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, nil)
111113
if err := p.post([]byte("some data")); err != nil {
112114
t.Fatalf("unexpect post error: %v", err)
113115
}
@@ -122,6 +124,12 @@ func TestPipelinePost(t *testing.T) {
122124
if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" {
123125
t.Errorf("content type = %s, want %s", g, "application/protobuf")
124126
}
127+
if g := tr.Request().Header.Get("X-Server-Version"); g != version.Version {
128+
t.Errorf("version = %s, want %s", g, version.Version)
129+
}
130+
if g := tr.Request().Header.Get("X-Min-Cluster-Version"); g != version.MinClusterVersion {
131+
t.Errorf("min version = %s, want %s", g, version.MinClusterVersion)
132+
}
125133
if g := tr.Request().Header.Get("X-Etcd-Cluster-ID"); g != "1" {
126134
t.Errorf("cluster id = %s, want %s", g, "1")
127135
}
@@ -148,7 +156,7 @@ func TestPipelinePostBad(t *testing.T) {
148156
}
149157
for i, tt := range tests {
150158
picker := mustNewURLPicker(t, []string{tt.u})
151-
p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error))
159+
p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error))
152160
err := p.post([]byte("some data"))
153161
p.stop()
154162

@@ -169,7 +177,7 @@ func TestPipelinePostErrorc(t *testing.T) {
169177
for i, tt := range tests {
170178
picker := mustNewURLPicker(t, []string{tt.u})
171179
errorc := make(chan error, 1)
172-
p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc)
180+
p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc)
173181
p.post([]byte("some data"))
174182
p.stop()
175183
select {
@@ -227,5 +235,5 @@ func (t *roundTripperRecorder) Request() *http.Request {
227235

228236
type nopReadCloser struct{}
229237

230-
func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil }
238+
func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, io.EOF }
231239
func (n *nopReadCloser) Close() error { return nil }

rafthttp/remote.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID,
3131
picker := newURLPicker(urls)
3232
return &remote{
3333
id: to,
34-
pipeline: newPipeline(tr, picker, to, cid, nil, r, errorc),
34+
pipeline: newPipeline(tr, picker, local, to, cid, nil, r, errorc),
3535
}
3636
}
3737

rafthttp/stream.go

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ package rafthttp
1717
import (
1818
"fmt"
1919
"io"
20+
"io/ioutil"
2021
"log"
2122
"net"
2223
"net/http"
2324
"path"
2425
"strconv"
26+
"strings"
2527
"sync"
2628
"time"
2729

@@ -389,6 +391,9 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
389391
cr.picker.unreachable(u)
390392
return nil, fmt.Errorf("new request to %s error: %v", u, err)
391393
}
394+
req.Header.Set("X-Server-From", cr.from.String())
395+
req.Header.Set("X-Server-Version", version.Version)
396+
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
392397
req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String())
393398
req.Header.Set("X-Raft-To", cr.to.String())
394399
if t == streamTypeMsgApp {
@@ -425,10 +430,24 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
425430
resp.Body.Close()
426431
return nil, fmt.Errorf("local member has not been added to the peer list of member %s", cr.to)
427432
case http.StatusPreconditionFailed:
433+
b, err := ioutil.ReadAll(resp.Body)
434+
if err != nil {
435+
cr.picker.unreachable(u)
436+
return nil, err
437+
}
428438
resp.Body.Close()
429-
log.Printf("rafthttp: request sent was ignored due to cluster ID mismatch (remote[%s]:%s, local:%s)",
430-
uu.Host, resp.Header.Get("X-Etcd-Cluster-ID"), cr.cid)
431-
return nil, fmt.Errorf("cluster ID mismatch")
439+
440+
switch strings.TrimSuffix(string(b), "\n") {
441+
case errIncompatibleVersion.Error():
442+
log.Printf("rafthttp: request sent was ignored by peer %s (server version incompatible)", cr.to)
443+
return nil, errIncompatibleVersion
444+
case errClusterIDMismatch.Error():
445+
log.Printf("rafthttp: request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
446+
cr.to, resp.Header.Get("X-Etcd-Cluster-ID"), cr.cid)
447+
return nil, errClusterIDMismatch
448+
default:
449+
return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
450+
}
432451
default:
433452
resp.Body.Close()
434453
return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
@@ -457,32 +476,6 @@ func isClosedConnectionError(err error) bool {
457476
return ok && operr.Err.Error() == "use of closed network connection"
458477
}
459478

460-
// serverVersion returns the version from the given header.
461-
func serverVersion(h http.Header) *semver.Version {
462-
verStr := h.Get("X-Server-Version")
463-
// backward compatibility with etcd 2.0
464-
if verStr == "" {
465-
verStr = "2.0.0"
466-
}
467-
return semver.Must(semver.NewVersion(verStr))
468-
}
469-
470-
// compareMajorMinorVersion returns an integer comparing two versions based on
471-
// their major and minor version. The result will be 0 if a==b, -1 if a < b,
472-
// and 1 if a > b.
473-
func compareMajorMinorVersion(a, b *semver.Version) int {
474-
na := &semver.Version{Major: a.Major, Minor: a.Minor}
475-
nb := &semver.Version{Major: b.Major, Minor: b.Minor}
476-
switch {
477-
case na.LessThan(*nb):
478-
return -1
479-
case nb.LessThan(*na):
480-
return 1
481-
default:
482-
return 0
483-
}
484-
}
485-
486479
// checkStreamSupport checks whether the stream type is supported in the
487480
// given version.
488481
func checkStreamSupport(v *semver.Version, t streamType) bool {

0 commit comments

Comments
 (0)