# Dependencies
- Akka
- Akka.FSharp
- Akka.Core
# Implementation
let system = System.create "Project2" (Configuration.load())
# Fully Connected Actors
let createFullActors (num: int) =
spawn system ("Actor" + num.ToString())
(fun mailbox ->
// Gossip count: number of times being called.
// Pushsum cnt: number of times not change consecutively.
let mutable count = 0
let mutable s = num |> double
let mutable w = 1 |> double
let mutable cnt = 0
let buildTime = Diagnostics.Stopwatch()
buildTime.Start()
let rec loop() = actor {
let! message = mailbox.Receive()
let sender = mailbox.Sender()
match box message with
| :? int as msg ->
printfn "%i" msg
| :? string as msg ->
printfn "%s" msg
| :? GossipMessage as msg ->
count <- count + 1
let mutable breakcnt = msg.breakcnt
let mutable nodeToBreak = msg.breaknode
let mutable curMsg = msg.totalmsg
curMsg <- curMsg + 1
// Uncomment for Period Failure Mode
// Suppose the nodes turned off/on every 100 messages, X = 100
// If breakcnt % X -> change boolean breakchange.
// That is from breakchange = false -> breakchange -> true or vise versa.
// If breakchange = true, then randomly pick a node to shut down
// Otherwise set shut down node = 0, meaning no node is shut down.
//
// if breakcnt % bin = 0 then
// if breakchange = true then
// breakchange <- false
// else
// breakchange <- true
// if breakchange = true then
//
// else
//
//
if breakcnt = 5 then
nodeToBreak <- msg.destination
//printfn "------------------"
printfn "%i" nodeToBreak
//printfn "------------------"
breakcnt <- breakcnt + 1
//printfn "Actor%i" msg.destination
if isGossipTerminate(count) then
printfn "msg: %i" curMsg
printfn "gossip_time: %ims" buildTime.ElapsedMilliseconds
else
//printfn $"{msg}"
let mutable destination = 0
if nodeToBreak = 0 then
destination <- Random().Next(1, msg.numNodes + 1)
while destination = msg.destination do
destination <- Random().Next(1, msg.numNodes + 1)
else
destination <- Random().Next(1, msg.numNodes + 1)
while destination = msg.destination || destination = nodeToBreak do
destination <- Random().Next(1, msg.numNodes + 1)
let newMessage = {breakcnt = breakcnt; breaknode = nodeToBreak;breakchange = msg.breakchange; totalmsg = curMsg; length = msg.length; numNodes = msg.numNodes; topology = msg.topology; algorithm = msg.algorithm; destination = destination}
let destActor = system.ActorSelection("akka://Project2/user/Actor" + destination.ToString())
destActor <! newMessage
| :? PushSumMessage as msg ->
let ns = s + msg.s
let nw = w + msg.w
let mutable curMsg = msg.totalmsg
curMsg <- curMsg + 1
if abs((double)(s/w) - (double)(ns/nw)) < (double)(10.0**(-10.0)) then
cnt <- cnt + 1
else
cnt <- 0
if cnt = 3 then
printfn "%i" curMsg
printfn "time: %ims" buildTime.ElapsedMilliseconds
else
s <- ns
w <- nw
let mutable breakcnt = msg.breakcnt
let mutable nodeToBreak = msg.breaknode
if breakcnt = 3 then
nodeToBreak <- msg.destination
breakcnt <- breakcnt + 1
let mutable destination = 0
if nodeToBreak = 0 then
destination <- Random().Next(1, msg.numNodes + 1)
while destination = msg.destination do
destination <- Random().Next(1, msg.numNodes + 1)
else
destination <- Random().Next(1, msg.numNodes + 1)
while destination = msg.destination || destination = nodeToBreak do
destination <- Random().Next(1, msg.numNodes + 1)
let newMessage = {s = s/2.0; w = w/2.0; breakcnt = breakcnt; breaknode = nodeToBreak; totalmsg = curMsg; length = msg.length; numNodes = msg.numNodes; topology = msg.topology; algorithm = msg.algorithm; destination = destination}
s <- s/2.0
w <- w/2.0
let destActor = system.ActorSelection("akka://Project2/user/Actor" + destination.ToString())
Threading.Thread.Sleep(10)
destActor <! newMessage
| _ -> ()
return! loop()
}
loop()
)
# Line Actors
let createLineActors (num: int) =
spawn system ("Actor" + num.ToString())
(fun mailbox ->
// Gossip count: number of times being called.
// Pushsum cnt: number of times not change consecutively.
let mutable count = 0
let mutable s = num |> double
let mutable w = 1 |> double
let mutable cnt = 0
let buildTime = Diagnostics.Stopwatch()
buildTime.Start()
let rec loop() = actor {
let! message = mailbox.Receive()
let sender = mailbox.Sender()
match box message with
| :? int as msg ->
printfn "%i" msg
| :? string as msg ->
printfn "%s" msg
| :? GossipMessage as msg ->
count <- count + 1
let mutable breakcnt = msg.breakcnt
let mutable nodeToBreak = msg.breaknode
let mutable curMsg = msg.totalmsg
let mutable breakchange = msg.breakchange
curMsg <- curMsg + 1
//if curMsg % 10 = 0 then
// if breakchange = true then
// breakchange <- false
// nodeToBreak <- 0
// printfn "all connected"
// else
// breakchange <- true
// nodeToBreak <- msg.destination
// printfn "break: %i" nodeToBreak
if breakcnt = 3 then
nodeToBreak <- msg.destination
printfn "------------------"
printfn "%i" nodeToBreak
printfn "------------------"
breakcnt <- breakcnt + 1
//printfn "Actor%i" msg.destination
if isGossipTerminate(count) then
printfn "msg: %i" curMsg
printfn "gossip_time: %ims" buildTime.ElapsedMilliseconds
else
//printfn $"{msg}"
let mutable valid = true
let mutable destination = 0
if breakchange = false then
if msg.destination = 1 then
destination <- 2
elif msg.destination = msg.numNodes then
destination <- msg.numNodes - 1
else
let mutable dirs = Random().Next(1, 3)
if dirs = 1 then
destination <- msg.destination + 1
else
destination <- msg.destination - 1
else
if msg.destination = 1 then
if nodeToBreak <> 2 then
destination <- 2
else
printfn "%i" curMsg
printfn "force quit time: %ims" buildTime.ElapsedMilliseconds
valid <- false
elif msg.destination = msg.numNodes then
if nodeToBreak <> msg.numNodes - 1 then
destination <- msg.numNodes - 1
else
printfn "%i" curMsg
printfn "force quit time: %ims" buildTime.ElapsedMilliseconds
valid <- false
else
let mutable dirs = Random().Next(1, 3)
if dirs = 1 then
if nodeToBreak <> msg.destination + 1 then
destination <- msg.destination + 1
else
destination <- msg.destination - 1
else
if nodeToBreak <> msg.destination - 1 then
destination <- msg.destination - 1
else
destination <- msg.destination + 1
if valid = true then
printfn "%i" destination
let newMessage = {breakcnt = breakcnt; breaknode = nodeToBreak; breakchange = msg.breakchange; totalmsg = curMsg; length = msg.length; numNodes = msg.numNodes; topology = msg.topology; algorithm = msg.algorithm; destination = destination}
//printfn $"{newMessage}"
let destActor = system.ActorSelection("akka://Project2/user/Actor" + destination.ToString())
Threading.Thread.Sleep(10)
destActor <! newMessage
| :? PushSumMessage as msg ->
let mutable breakcnt = msg.breakcnt
let mutable nodeToBreak = msg.breaknode
let mutable curMsg = msg.totalmsg
curMsg <- curMsg + 1
if breakcnt = 20 then
nodeToBreak <- msg.destination
//printfn "------------------"
printfn "%i" nodeToBreak
//printfn "------------------"
breakcnt <- breakcnt + 1
let ns = s + msg.s
let nw = w + msg.w
if abs((double)(s/w) - (double)(ns/nw)) < (double)(10.0**(-10.0)) then
cnt <- cnt + 1
else
cnt <- 0
if cnt = 3 then
printfn "msg: %i" curMsg
printfn "time: %ims" buildTime.ElapsedMilliseconds
else
s <- ns
w <- nw
//printfn "Actor%i" msg.destination
let mutable destination = 0
let mutable valid = true
if nodeToBreak = 0 then
if msg.destination = 1 then
destination <- 2
elif msg.destination = msg.numNodes then
destination <- msg.numNodes - 1
else
let mutable dirs = Random().Next(1, 3)
if dirs = 1 then
destination <- msg.destination + 1
else
destination <- msg.destination - 1
else
if msg.destination = 1 then
if nodeToBreak <> 2 then
destination <- 2
else
printfn "%i" curMsg
printfn "force quit time: %ims" buildTime.ElapsedMilliseconds
valid <- false
elif msg.destination = msg.numNodes then
if nodeToBreak <> msg.numNodes - 1 then
destination <- msg.numNodes - 1
else
printfn "%i" curMsg
printfn "force quit time: %ims" buildTime.ElapsedMilliseconds
valid <- false
else
let mutable dirs = Random().Next(1, 3)
if dirs = 1 then
if nodeToBreak <> msg.destination + 1 then
destination <- msg.destination + 1
else
destination <- msg.destination - 1
else
if nodeToBreak <> msg.destination - 1 then
destination <- msg.destination - 1
else
destination <- msg.destination + 1
if valid = true then
let newMessage = {s = s/2.0; w = w/2.0; breakcnt = breakcnt; breaknode = nodeToBreak; totalmsg = curMsg; length = msg.length; numNodes = msg.numNodes; topology = msg.topology; algorithm = msg.algorithm; destination = destination}
//printfn $"{newMessage}"
s <- s/2.0
w <- w/2.0
let destActor = system.ActorSelection("akka://Project2/user/Actor" + destination.ToString())
//Threading.Thread.Sleep(1000)
destActor <! newMessage
| _ -> ()
return! loop()
}
loop()
)
# 3D Actors
let create3DActors (num: int) =
spawn system ("Actor" + num.ToString())
(fun mailbox ->
// Gossip count: number of times being called.
// Pushsum cnt: number of times not change consecutively.
let mutable count = 0
let mutable s = num |> double
let mutable w = 1 |> double
let mutable cnt = 0
let buildTime = new Diagnostics.Stopwatch()
buildTime.Start()
let rec loop() = actor {
let! message = mailbox.Receive()
let sender = mailbox.Sender()
match box message with
| :? int as msg ->
printfn "%i" msg
| :? string as msg ->
printfn "%s" msg
| :? GossipMessage as msg ->
count <- count + 1
let mutable breakcnt = msg.breakcnt
let mutable nodeToBreak = msg.breaknode
let mutable curMsg = msg.totalmsg
let mutable breakchange = msg.breakchange
curMsg <- curMsg + 1
if curMsg % 10 = 0 then
if breakchange = true then
breakchange <- false
nodeToBreak <- 0
printfn "all connected"
else
breakchange <- true
nodeToBreak <- msg.destination
printfn "break: %i" nodeToBreak
printfn "%i" nodeToBreak
//if breakchange = then
// nodeToBreak <- msg.destination
//printfn "------------------"
//printfn "%i" nodeToBreak
//printfn "------------------"
breakcnt <- breakcnt + 1
//printfn "Actor%i" msg.destination
if isGossipTerminate(count) then
printfn "%i" curMsg
printfn "time: %ims" buildTime.ElapsedMilliseconds
else
//printfn $"{msg}"
let mutable destination = 0
let length = msg.length
let node = msg.destination
let mutable dirs = 0
// 1 -> lower layer; 2 -> upper layer; 3 -> left; 4 -> right;
// 5 -> previous row; 6 -> next row;
if breakchange = false then
dirs <- Random().Next(1, 7)
if dirs = 1 then
if node <= length * length then
destination <- node + length * length
else
destination <- node - length * length
elif dirs = 2 then
if node > length * length * (length - 1) then
destination <- node - length * length
else
destination <- node + length * length
elif dirs = 3 then
if node % length = 1 then
destination <- node + 1
else destination <- node - 1
elif dirs = 4 then
if node % length = 0 then
destination <- node - 1
else
destination <- node + 1
elif dirs = 5 then
if node % (length * length) > 0 && node % (length * length) <= length then
destination <- node + length
else
destination <- node - length
else
if node % (length * length) > length * (length - 1) || node % (length * length) = 0 then
destination <- node - length
else
destination <- node + length
else
let mutable find = false
while find = false do
dirs <- Random().Next(1, 7)
if dirs = 1 then
if node <= length * length then
destination <- node + length * length
else
destination <- node - length * length
elif dirs = 2 then
if node > length * length * (length - 1) then
destination <- node - length * length
else
destination <- node + length * length
elif dirs = 3 then
if node % length = 1 then
destination <- node + 1
else destination <- node - 1
elif dirs = 4 then
if node % length = 0 then
destination <- node - 1
else
destination <- node + 1
elif dirs = 5 then
if node % (length * length) > 0 && node % (length * length) <= length then
destination <- node + length
else
destination <- node - length
else
if node % (length * length) > length * (length - 1) || node % (length * length) = 0 then
destination <- node - length
else
destination <- node + length
if destination <> nodeToBreak then
find <- true
let newMessage = {breakcnt = breakcnt; breaknode = nodeToBreak; breakchange = breakchange;totalmsg = curMsg;length = msg.length; numNodes = msg.numNodes; topology = msg.topology; algorithm = msg.algorithm; destination = destination}
//printfn $"{newMessage}"
printfn "dest:%i" destination
let destActor = system.ActorSelection("akka://Project2/user/Actor" + destination.ToString())
//Threading.Thread.Sleep(1000)
destActor <! newMessage
| :? PushSumMessage as msg ->
let ns = s + msg.s
let nw = w + msg.w
let mutable breakcnt = msg.breakcnt
let mutable nodeToBreak = msg.breaknode
let mutable curMsg = msg.totalmsg
curMsg <- curMsg + 1
if breakcnt = 20 then
nodeToBreak <- msg.destination
//printfn "------------------"
//printfn "%i" nodeToBreak
//printfn "------------------"
breakcnt <- breakcnt + 1
if abs((double)(s/w) - (double)(ns/nw)) < (double)(10.0**(-10.0)) then
cnt <- cnt + 1
else
cnt <- 0
if cnt = 3 then
printfn "%i" curMsg
printfn "time: %ims" buildTime.ElapsedMilliseconds
else
s <- ns
w <- nw
printfn "Actor%i" msg.destination
let mutable destination = 0
let length = msg.length
let node = msg.destination
let mutable dirs = 0
if nodeToBreak = 0 then
dirs <- Random().Next(1, 7)
// 1 -> lower layer; 2 -> upper layer; 3 -> left; 4 -> right;
// 5 -> previous row; 6 -> next row;
if dirs = 1 then
if node <= length * length then
destination <- node + length * length
else
destination <- node - length * length
elif dirs = 2 then
if node > length * length * (length - 1) then
destination <- node - length * length
else
destination <- node + length * length
elif dirs = 3 then
if node % length = 1 then
destination <- node + 1
else destination <- node - 1
elif dirs = 4 then
if node % length = 0 then
destination <- node - 1
else
destination <- node + 1
elif dirs = 5 then
if node % (length * length) > 0 && node % (length * length) <= length then
destination <- node + length
else
destination <- node - length
else
if node % (length * length) > length * (length - 1) || node % (length * length) = 0 then
destination <- node - length
else
destination <- node + length
else
let mutable find = false
while find = false do
dirs <- Random().Next(1, 7)
// 1 -> lower layer; 2 -> upper layer; 3 -> left; 4 -> right;
// 5 -> previous row; 6 -> next row;
if dirs = 1 then
if node <= length * length then
destination <- node + length * length
else
destination <- node - length * length
elif dirs = 2 then
if node > length * length * (length - 1) then
destination <- node - length * length
else
destination <- node + length * length
elif dirs = 3 then
if node % length = 1 then
destination <- node + 1
else destination <- node - 1
elif dirs = 4 then
if node % length = 0 then
destination <- node - 1
else
destination <- node + 1
elif dirs = 5 then
if node % (length * length) > 0 && node % (length * length) <= length then
destination <- node + length
else
destination <- node - length
else
if node % (length * length) > length * (length - 1) || node % (length * length) = 0 then
destination <- node - length
else
destination <- node + length
if destination <> nodeToBreak then
find <- true
let newMessage = {s = s/2.0; w = w/2.0; breakcnt = breakcnt; breaknode = nodeToBreak; totalmsg = curMsg; length = msg.length; numNodes = msg.numNodes; topology = msg.topology; algorithm = msg.algorithm; destination = destination}
//printfn $"{newMessage}"
s <- s/2.0
w <- w/2.0
let destActor = system.ActorSelection("akka://Project2/user/Actor" + destination.ToString())
//Threading.Thread.Sleep(1000)
destActor <! newMessage
| _ -> ()
return! loop()
}
loop()
)
# Test
- Push Sum Protocol
- Gossip Protocol