-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathHeartBeat.da
93 lines (73 loc) · 3.61 KB
/
HeartBeat.da
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
Created on Sat Dec 1 18:17:36 2018
@author: risabh
"""
import time
# DistAlgo module-level configuration: request FIFO message channels and
# Lamport logical clocks for the processes defined in this module.
config(channel is fifo, clock is lamport)
class HeartBeat(process):
def setup(master:Master):
output('Setting up HeartBeat')
#Stores the set of heartBeats recieved in each call
self.heartBeatFromChunks = set()
#Stores the list of available chunkServers to which HeartBeat needs to poll
self.availableChunkServers = set()
#Map which stores the list of files send from chunkServer in a call
self.filesList = {}
self.master = master
self.counter=0
# Number of cycles in which we want garbage collection to happen
self.timeForGarbage =1
def sendHeartBeat():
'''
Helper method to send HeartBeat message to all chunkServers and await their
replies and then send it to master for updates.
'''
output('Inside Send Heart Beat')
self.heartBeatFromChunks = set()
nonAvailableChunkServers =set()
counter+=1
#Obatain the list of chunkServers from master
if len(self.availableChunkServers)==0:
send(('GET_AVAILABLE_CHUNK_SERVERS',), to = master)
await(len(self.availableChunkServers)!=0)
for chunkServer in self.availableChunkServers:
self.filesList[chunkServer] = []
# Send heartbeat message to all the chunkServers and await their replies
for chunkServer in self.availableChunkServers:
send(('HEARTBEAT',), to = chunkServer)
if await(len(self.availableChunkServers) == len(self.heartBeatFromChunks)):
output('All chunk servers are responding')
elif timeout(20):
output('Chunk Servers available', self.heartBeatFromChunks)
nonAvailableChunkServers = self.availableChunkServers - self.heartBeatFromChunks
output("Chunk servers not available", nonAvailableChunkServers)
for chunkServer in nonAvailableChunkServers:
self.filesList[chunkServer] = []
#Updates master with list of chunkServers which are not available
send(('UPDATE_AVAILABLE_CHUNK_SERVERS', nonAvailableChunkServers), to = master)
if counter% self.timeForGarbage==0:
#Send the list of file obtained from chunkServers to master to be checked for garbage collection
send(('DELETE_ORPHANED_CHUNKS', self.filesList), to=master)
for chunkServer in self.availableChunkServers:
self.filesList[chunkServer] = []
def receive(msg=('AVAILABLE_CHUNK_SERVERS', availableChunkServers), from_=master):
'''
Receive handler to get the availableChunkServers from master
'''
self.availableChunkServers = availableChunkServers
def receive(msg=('HEARTBEAT_ACKNOWLEDGED',files), from_=chunkServer):
'''
Receive handler for Heartbeat acknowledgement from the chunkServers and
adding the list of files sent from ChunkServer to be checked for existence
'''
output("Heart beat received from", chunkServer)
self.heartBeatFromChunks.add(chunkServer)
localFileList = []
for file in files:
localFileList.append(file[:-4])
self.filesList[chunkServer].extend(localFileList)
self.filesList[chunkServer] = list(set(self.filesList[chunkServer]))
def run():
while True:
sendHeartBeat()
time.sleep(10) #Running heartbeat process every 10 seconds