@@ -25,7 +25,7 @@ type partitions struct {
25
25
selected map [int ]bool
26
26
local map [int ]bool
27
27
remote map [int ][]string
28
- old map [int ][]string
28
+ disappeared map [int ][]string
29
29
numMissing int
30
30
ready chan bool
31
31
readyClosed bool
@@ -45,7 +45,7 @@ func watchPartitions(zkWatcher *zkWatcher, peers *peers, db, version string, num
45
45
replication : replication ,
46
46
local : make (map [int ]bool ),
47
47
remote : make (map [int ][]string ),
48
- old : make (map [int ][]string , 1024 ),
48
+ disappeared : make (map [int ][]string , 1024 ),
49
49
ready : make (chan bool ),
50
50
}
51
51
@@ -113,6 +113,19 @@ func (p *partitions) updateLocalPartitions(local map[int]bool) {
113
113
}
114
114
}
115
115
116
+ func (p * partitions ) deDupe (nodes []string ) []string {
117
+ found := map [string ]bool {}
118
+ dedupedNodes := make ([]string , len (nodes ))
119
+ for _ , node := range nodes {
120
+ if ! found [node ] {
121
+ found [node ] = true
122
+
123
+ dedupedNodes = append (dedupedNodes , node )
124
+ }
125
+ }
126
+ return dedupedNodes
127
+ }
128
+
116
129
func (p * partitions ) updateRemotePartitions (nodes []string ) {
117
130
if p .peers == nil {
118
131
return
@@ -131,23 +144,14 @@ func (p *partitions) updateRemotePartitions(nodes []string) {
131
144
}
132
145
}
133
146
134
- // Keep track of old peers in case zookeeper goes away.
135
147
for partitionId , partition := range remote {
136
148
newPartition := make ([]string , len (partition ))
137
149
copy (newPartition , partition )
138
150
139
- unDedupedPartition := append (newPartition , p .old [partitionId ]... )
140
- found := map [string ]bool {}
141
-
142
- // Shitty dedupe, iterate though the remote peers
143
- for _ , node := range unDedupedPartition {
144
- if ! found [node ] {
145
- found [node ] = true
146
- p .old [partitionId ] = append ([]string {node }, p .old [partitionId ]... )
147
- }
148
- }
149
- if len (p .old [partitionId ]) >= 1024 {
150
- p .old [partitionId ] = p .old [partitionId ][:1024 ]
151
+ unDedupedPartition := append (newPartition , p .disappeared [partitionId ]... )
152
+ p .disappeared [partitionId ] = p .deDupe (unDedupedPartition )
153
+ if len (p .disappeared [partitionId ]) >= 1024 {
154
+ p .disappeared [partitionId ] = p .disappeared [partitionId ][:1024 ]
151
155
}
152
156
}
153
157
@@ -243,23 +247,21 @@ func (p *partitions) partitionZKNode(partition int) string {
243
247
}
244
248
245
249
// getPeers returns the list of peers who have the given partition available.
246
- func (p * partitions ) getPeers (partition int ) []string {
247
- if p .peers == nil {
248
- return nil
249
- }
250
-
250
+ func (p * partitions ) getPeers (partition int ) ([]string , []string ) {
251
251
p .lock .RLock ()
252
252
defer p .lock .RUnlock ()
253
253
254
+ disappearedPeers := make ([]string , 1024 )
255
+ copy (disappearedPeers , p .disappeared [partition ])
256
+
257
+ if p .peers == nil {
258
+ return nil , disappearedPeers
259
+ }
260
+
254
261
peers := make ([]string , len (p .remote [partition ]))
255
262
copy (peers , p .remote [partition ])
256
263
257
- oldPeers := make ([]string , 1024 )
258
- copy (oldPeers , p .old [partition ])
259
- // Append old peers to peer list, in case of Zookeeper issues.
260
- peers = append (peers , oldPeers ... )
261
-
262
- return peers
264
+ return peers , disappearedPeers
263
265
}
264
266
265
267
// partitionId returns a string id for the given partition, to be used for the
0 commit comments