@@ -33,6 +33,7 @@ type Partitions struct {
33
33
selected map [int ]bool
34
34
local map [int ]bool
35
35
remote map [int ][]string
36
+ disappeared map [int ][]string
36
37
numMissing int
37
38
readyClosed bool
38
39
shouldAdvertise bool
@@ -55,6 +56,7 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu
55
56
replication : replication ,
56
57
local : make (map [int ]bool ),
57
58
remote : make (map [int ][]string ),
59
+ disappeared : make (map [int ][]string , 1024 ),
58
60
}
59
61
60
62
p .pickLocal ()
@@ -69,6 +71,19 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu
69
71
return p
70
72
}
71
73
74
+ // Dedupelicates elements in a slice of strings.
75
+ func dedupe (nodes []string ) []string {
76
+ found := map [string ]bool {}
77
+ dedupedNodes := make ([]string , 0 , len (nodes ))
78
+ for _ , node := range nodes {
79
+ if ! found [node ] {
80
+ found [node ] = true
81
+ dedupedNodes = append (dedupedNodes , node )
82
+ }
83
+ }
84
+ return dedupedNodes
85
+ }
86
+
72
87
// pickLocal selects which partitions are local by iterating through
73
88
// them all, and checking the hashring to see if this peer is one of the
74
89
// replicas.
@@ -107,17 +122,20 @@ func (p *Partitions) sync(updates chan []string) {
107
122
}
108
123
109
124
// FindPeers returns the list of peers who have the given partition available.
110
- func (p * Partitions ) FindPeers (partition int ) []string {
111
- if p .peers == nil {
112
- return nil
113
- }
114
-
125
+ func (p * Partitions ) FindPeers (partition int ) ([]string , []string ) {
115
126
p .lock .RLock ()
116
127
defer p .lock .RUnlock ()
117
128
129
+ disappearedPeers := make ([]string , 1024 )
130
+ copy (disappearedPeers , p .disappeared [partition ])
131
+
132
+ if p .peers == nil {
133
+ return nil , disappearedPeers
134
+ }
135
+
118
136
peers := make ([]string , len (p .remote [partition ]))
119
137
copy (peers , p .remote [partition ])
120
- return peers
138
+ return peers , disappearedPeers
121
139
}
122
140
123
141
// Update updates the list of local partitions to the given list.
@@ -228,6 +246,25 @@ func (p *Partitions) updateRemote(nodes []string) {
228
246
}
229
247
}
230
248
249
+ for partitionId , partition := range p .remote {
250
+ disappearedPeers := make ([]string , len (partition ))
251
+ for _ , oldPeer := range partition {
252
+ found := false
253
+ for _ , newPeer := range remote [partitionId ] {
254
+ if newPeer == oldPeer {
255
+ found = true
256
+ }
257
+ }
258
+ if ! found {
259
+ disappearedPeers = append (disappearedPeers , oldPeer )
260
+ }
261
+ }
262
+ p .disappeared [partitionId ] = dedupe (append (disappearedPeers , p .disappeared [partitionId ]... ))
263
+ if len (p .disappeared [partitionId ]) >= 1024 {
264
+ p .disappeared [partitionId ] = p .disappeared [partitionId ][:1024 ]
265
+ }
266
+ }
267
+
231
268
p .remote = remote
232
269
p .updateMissing ()
233
270
}
0 commit comments