Gossip Glomers: Broadcast
I've been playing around with the Gossip Glomers distributed systems challenges for fun and to get a more visceral feel for the CAP theorem. I've been using babashka, mostly to learn more about it, although I found that maelstrom (a system for running tests on toy distributed systems) really prefers a single binary. It grudgingly accepts a single file, but if you try to split your code into, say, a shared library and a script that uses it, it will be quite displeased. That complicates things a little, and of course makes it impossible to use, say, bb.edn as well. Oh well.
I have a working script for the no-partition state of Broadcast, which I'll reproduce below:
#!/usr/bin/env bb
;; The boilerplate! This should stay pretty much the same for the entire
;; maelstrom challenge. (Maelstrom really wants a single file binary, so I can't
;; put this in a separate file as a library.)
(ns broadcast
(:require [cheshire.core :as json]
[clojure.java.io :as io]
[clojure.set :as set]))
(defn log [& args]
(binding [*out* *err*]
(apply println args)))
;; Keeps track of the node ID
(def node-id (promise))
;; A registry of promises for req/resp
(def response-promises (atom {}))
(def message-id (atom 0))
(defn next-message-id []
(swap! message-id inc))
(def pending-broadcasts (atom {}))
;; Fundamentally, we're building something that will process lines coming in. It
;; will generate lines coming out, but we don't necessarily want to process an
;; input completely before output is generated.
;;
;; (For example, we might want to send a message and receive a response in the
;; process of handling another request.)
(defn- process-stdin
"Reads lines from stdin and calls the handler on each one in its own future"
[handler]
(->> *in*
io/reader
line-seq
(map #(json/parse-string % true))
(map #(future (handler %)))
doall))
;; Note that we're using stdin for two purposes:
;;
;; - new requests coming in
;; - responses to requests that we've sent out
;;
;; So we want our handler to either:
;; - call our handler, OR
;; - call a registered reply dispatch function
(defn dispatch-reply [reply-to req]
(let [p (get @response-promises reply-to)]
(cond
(nil? p) (throw (ex-info "No registered promise" {:reply-to reply-to :req req}))
(= "error" (get-in req [:body :type])) (deliver p (ex-info (:text (:body req)) req))
:else (deliver p (:body req)))))
(defn reply-dispatch-middleware [handler]
(fn [req]
(if-let [reply-to (get-in req [:body :in_reply_to])]
(do (dispatch-reply reply-to req) nil) ;; do not reply to replies!
(handler req))))
;; We need a way to send an outgoing message. This function just sends and
;; forgets.
(defn send! [to body]
(log :send {:to to :body body})
(println (json/generate-string
{:src @node-id
:dest to
:body body})))
;; This function sends an outgoing message, and synchronously returns the result
;; body. It throws an exception if no response is received in `timeout-ms`, or
;; if the response is an error.
(defn rpc! [to type body {:keys [timeout-ms]}]
  (let [message-id (next-message-id)
        body (merge body {:msg_id message-id
                          :type type})
        p (promise)]
    (swap! response-promises assoc message-id p)
    (send! to body)
    (let [result (deref p timeout-ms ::timeout)]
      (cond
        (= result ::timeout)
        (throw (ex-info (format "Timeout in %s" type) {:body body
                                                        :timeout-ms timeout-ms
                                                        :type type
                                                        :to to}))
        ;; `dispatch-reply` delivers error replies as exceptions; rethrow them.
        (instance? Throwable result)
        (throw result)
        ;; Otherwise hand the reply body back to the caller.
        :else result))))
;; This middleware allows our handler to return a response body and have it
;; automatically reply
(defn reply-middleware [handler]
(fn [req]
(let [response (handler req)]
(when response
(send! (:src req)
(assoc response :in_reply_to (get-in req [:body :msg_id])))))))
(defn log-middleware [handler]
(fn [msg]
(log :recv msg)
(let [result (handler msg)]
result)))
(defn body-snatcher-middleware [handler]
(fn [msg]
(handler (:body msg))))
(defn exception-middleware [handler]
(fn [req]
(try
(handler req)
(catch Exception e
{:type "error"
:code 13
:text (ex-message e)}))))
(defn make-handler [f]
(-> f body-snatcher-middleware log-middleware reply-dispatch-middleware exception-middleware reply-middleware))
(defn start! [handler]
  ;; `run!` is eager; a lazy `map` here would never force the derefs.
  (run! deref (process-stdin (make-handler handler))))
;; Finally, we'll define our handler - a multimethod so that we can dispatch on type:
(defmulti handler (fn [{type :type}] (keyword type)))
;; Processing the `init` request, we just reply with a body.
(defmethod handler :init [{id :node_id}]
(deliver node-id id)
{:type :init_ok})
;; Okay, boilerplate is out of the way.
;;
;; We now need to define a handler that will properly handle `read`,
;; `broadcast`, and `topology` type messages.
;;
;; Now we need a way to track the messages that we've received (the entire point
;; of the exercise!). We can just use an in-memory atom because maelstrom won't
;; crash nodes.
(def messages (atom #{}))
;; We also need a way to track our neighbors - maelstrom will tell us the
;; topology we're working with.
;;
;; My assumption - maybe wrong - is that maelstrom is guaranteed to do this
;; before we get any other messages.
(def topology (promise))
(defn neighbors [node]
(or (get @topology node)
(throw (ex-info "Missing neighbors for node!" {:node node}))))
(defmethod handler :topology [body]
(deliver topology (->> (:topology body)
(map (fn [[k v]] [(name k) (set v)]))
(into {})))
{:type :topology_ok})
;; Our `read` handler is very simple, just return the list of messages we've
;; received.
(defmethod handler :read [_]
{:type :read_ok
:messages (into [] @messages)})
;; Now, let's consider how our broadcast algorithm can work. Let's walk through
;; a few options. First, we could just rebroadcast every broadcast message we
;; receive! What would happen then?
;;
;; A receives a broadcast
;; A->B Broadcast
;; B->A Broadcast
;; A->B Broadcast
;; ...
;;
;; An infinite loop! Not great.
;;
;; Okay, another option: each node could rebroadcast, but include a set of
;; `seen` nodes that it appends itself to.
;;
;;
;; A receives a broadcast
;; A->B Broadcast, seen: `a`
;; A->C Broadcast, seen: `a`
;; B->C Broadcast, seen: `a`, `b`
;; C->B Broadcast, seen: `a`, `c`
;;
;;
;; This is much better. But note that if each node is connected to every other
;; node, a broadcast will result in at least N-1 messages being received by
;; every other node (where N is the number of nodes).
;;
;; Can we further reduce the number of unnecessary messages?
;;
;; Also, we should decide: what does broadcasting *mean*? Is a message
;; considered to be broadcast after it goes out to every node? To a single node?
;;
;; Let's make this implementation ignore network issues completely. We'll define
;; a successful broadcast as one that has gone out to all neighbors.
;;
;; So let's say we have the topology:
;;
;;
;; a: b,c
;; b: a,c
;; c: a,b
;;
;;
;;
;; A receives a broadcast
;; A->B broadcast
;; B receives a broadcast, seen: `a`
;; B->C broadcast
;; C receives a broadcast, seen: `a`, `b`
;; C responds with `broadcast_ok`, seen: `a`, `b`, `c`
;; B responds with `broadcast_ok`, seen: `a`, `b`, `c`
;; A responds with `broadcast_ok`, seen: `a`, `b`, `c`
;;
;;
;; Let's try this.
;;
;; We'll use an internal broadcast message type, because we can't add data (like
;; `seen`) to the maelstrom type.
(defmethod handler :internal/broadcast [{seen :seen message :message}]
(swap! messages conj message)
(loop [neighbors (neighbors @node-id)
seen-set (conj (set seen) @node-id)]
(cond
(empty? neighbors)
{:seen seen-set
:type :broadcast_ok}
(contains? seen-set (first neighbors))
(recur (next neighbors) seen-set)
:else
(let [{newly-seen :seen} (rpc! (first neighbors)
:internal/broadcast
{:seen seen-set :message message}
{:timeout-ms 1000})]
(recur (next neighbors)
;; `newly-seen` comes back as a JSON array (a vector), so coerce it to a set.
(set/union seen-set (set newly-seen)))))))
;; The actual handler just makes a synchronous RPC call to ourselves with the
;; `internal/broadcast` type.
(defmethod handler :broadcast [{message :message}]
(rpc! @node-id :internal/broadcast
{:message message
:seen #{}}
{:timeout-ms 1000})
{:type "broadcast_ok"})
(defmethod handler :echo [body]
(assoc body :type "echo_ok"))
(defn -main [& args]
(start! handler)
(Thread/sleep 100))
(-main)
So it's a pretty simple little script overall. The most complex bit is definitely the handling of outgoing request/responses, because we're using a single channel (stdin) for both incoming requests and responses to our requests.
For that, we have an atom that holds a registry of message ids pointing to promises. We have a middleware that checks to see if an incoming request is a reply to a message. If it is, it looks up the promise for that reply in the registry and delivers the body to that promise.
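To see that mechanism in isolation, here is a minimal standalone sketch. The names `pending`, `await-reply!`, and `on-message!` are made up for this illustration and are not part of the script above; the reply is simulated from another thread rather than read from stdin.

```clojure
;; Registry of msg-id -> promise, as in the script's `response-promises`.
(def pending (atom {}))

(defn await-reply!
  "Registers a promise under msg-id and returns it."
  [msg-id]
  (let [p (promise)]
    (swap! pending assoc msg-id p)
    p))

(defn on-message!
  "Routes an incoming body: a reply is delivered to its waiting promise,
  anything else is returned unchanged as a new request."
  [body]
  (if-let [p (get @pending (:in_reply_to body))]
    (do (deliver p body) :delivered)
    body))

;; Register interest in message 1, simulate its reply arriving on another
;; thread, then block (with a timeout) until the promise is delivered.
(def reply (await-reply! 1))
(future (on-message! {:type "echo_ok" :in_reply_to 1}))
(def result (deref reply 1000 ::timeout))
```

The caller blocks on its own promise, so many requests can be in flight at once while a single reader thread sorts replies from new requests.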
So - the problem with the current state of the application is pretty clear: when we receive a broadcast message, we are synchronously broadcasting the message to all unseen neighbors. They synchronously broadcast to all their unseen neighbors, who... etc.
We have ironclad consistency. But we don't have high availability (if one node goes down, they'll all be unable to broadcast) and we don't have partition tolerance (if any two neighbors can't talk to each other, no one will be able to broadcast).
What's the solution?
Well, the CAP theorem is going to give us a hint: it tells us that we can either have partition tolerance and availability or partition tolerance and consistency, but not both.
So, a few options that are in the possible solution space:
- we could simply respond with an error whenever the network is partitioned! This "solves" the problem, in that now we're unavailable whenever the network is partitioned. But it doesn't solve the problem in reality, because an implicit requirement of maelstrom is availability. Maybe this would be okay in some real life systems, though.
- we can accept inconsistent state and seek "eventual consistency" once the partition is healed.
So how do we do this?
One simple option is to just make our neighbor-broadcasts asynchronous and add retries.
- make a list of neighbors we need to broadcast to
- broadcast to them, with a retry + exponential backoff
Eventually, when the partition is healed, we'll tell the neighbor about our message.
This is going to be a little less efficient in terms of total number of messages than our current solution. Why? Because with a synchronous broadcast, we can tell the requester exactly who we broadcasted to. With async broadcast, we could tell the requester something - but since they aren't waiting for our response, there's less point.
So if our topology looks like
a: b,c
b: a,c
c: a,b
Then it should go something like:
A->B broadcast, seen: A
A->C broadcast, seen: A
B->C broadcast, seen: A, B
C->B broadcast, seen: A, C
Let's try it out.
First, we need to write a quick function to do automatic retries. As I do whenever I can when writing macros, I split the logic into two parts:
- a dead simple macro that I can almost guarantee is free of bugs just by looking at it
- a more complex function
This way, if we've done something silly, it's easier to debug.
We'll use a super simple exponential backoff. Try up to N times, sleeping between attempts for 1 second, 1.5 seconds, 2.25 seconds, 3.375 seconds, 5.0625 seconds, ...
(defn with-retry-policy-fn [tries-allowed current-try f]
  (try
    (f)
    (catch Exception e
      ;; current-try is zero-based, so this was the last allowed attempt once
      ;; (inc current-try) reaches tries-allowed.
      (if (>= (inc current-try) tries-allowed)
        (throw e)
        (do
          ;; Sleep 1000ms * 1.5^current-try, coerced to a long for Thread/sleep.
          (Thread/sleep (long (* 1000 (reduce * (repeat current-try 1.5)))))
          (with-retry-policy-fn tries-allowed (inc current-try) f))))))
(defmacro with-retry-policy [tries-allowed & forms]
  `(with-retry-policy-fn ~tries-allowed 0 (fn [] ~@forms)))
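As a quick sanity check on the backoff, the sleep expression can be pulled out into a pure function. `backoff-delays-ms` is a hypothetical helper for this illustration only; it reuses the same `(reduce * (repeat n 1.5))` arithmetic and truncates to whole milliseconds.

```clojure
(defn backoff-delays-ms
  "Returns the first n sleep durations in milliseconds, where the delay
  after zero-based attempt k is 1000 * 1.5^k."
  [n]
  (map (fn [attempt]
         (long (* 1000 (reduce * (repeat attempt 1.5)))))
       (range n)))

(backoff-delays-ms 5)
;; => (1000 1500 2250 3375 5062)
```

Note that the first delay is a full second, because an empty product is 1.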
Now we can define our async-broadcast handler. It's pretty simple: just add the message to our own persistent store (aka an atom with a set), and then asynchronously tell each of our neighbors about it, unless we know they've already heard. We'll use our retry macro to make sure we're "eventually" consistent.
(defmethod handler :internal/async-broadcast [{:keys [seen message]}]
  (swap! messages conj message)
  (let [seen-set (conj (set seen) @node-id)
        neighbors (neighbors @node-id)
        unseen-neighbors (set/difference neighbors seen-set)]
    (doseq [neighbor unseen-neighbors]
      (future
        (with-retry-policy 10
          (rpc! neighbor
                :internal/async-broadcast
                {:seen seen-set
                 :message message}
                {:timeout-ms 1000})))))
  {:accepted true})
Running maelstrom:
$ ./maelstrom/maelstrom test -w broadcast --bin ./broadcast.clj --node-count 5 --time-limit 20 --rate 10 --nemesis partition
...
Everything looks good! ヽ(‘ー`)ノ
Awesome!
Now, on to the next part of the challenge: Challenge #3d: Efficient Broadcast, Part I.
Efficiency... hmm. That's something we didn't consider too much in this implementation.
Each node is sending every broadcast to every neighbor.
Say our nodes are A-B-C-D and all are connected.
A gets a broadcast. It forwards it to B, C, and D.
It sends it:
- A->B
- A->C
- A->D
Then:
- B forwards A->B to C and D.
- C forwards A->C to B and D.
- D forwards A->D to B and C.
Then:
- C forwards the message from A->B->C to D.
- C forwards the message from A->D->C to B.
- B forwards the message from A->D->B to C.
- B forwards the message from A->C->B to D.
- D forwards the message from A->C->D to B.
- D forwards the message from A->B->D to C.
That's 15 messages to propagate a broadcast between our 4 nodes! This is excessive.
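The count above can be checked with a small simulation. This is a sketch under stated assumptions: `full-mesh` and `forward-count` are hypothetical helpers, only forwarded requests are counted (not their replies), and each forward carries the sender's `seen` set exactly as in the async handler.

```clojure
(defn full-mesh
  "Builds a topology map in which every node neighbors every other node."
  [nodes]
  (into {} (for [n nodes] [n (disj (set nodes) n)])))

(defn forward-count
  "Counts the forwards triggered when `node` receives a message carrying
  `seen`: one per unseen neighbor, plus whatever that neighbor forwards."
  [topology node seen]
  (let [seen-set (conj seen node)
        targets (remove seen-set (get topology node))]
    (reduce + 0 (map (fn [target]
                       (inc (forward-count topology target seen-set)))
                     targets))))

(forward-count (full-mesh ["a" "b" "c"]) "a" #{})
;; => 4
(forward-count (full-mesh ["a" "b" "c" "d"]) "a" #{})
;; => 15
```

The count grows quickly with the number of nodes: the same function gives 64 forwards for a five-node mesh.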
How can we cut this down?
One kind of silly option would be to adjust the topology so that, rather than connecting every node to every other node, we just connect all our nodes in a chain: A->B->C->D.
A gets a broadcast. It forwards it:
- A->B
- B->C
- C->D
Now we only have 3 messages!
Of course, the problem here is that we lose out on resiliency: if A only tells B, then if the network is partitioned and A can no longer talk to B, or if B goes down, we have an issue.
Here's another option:
- A gets a broadcast "0"
- A tells B: broadcast "0", but I am responsible for telling C/D.
- A tells C: broadcast "0", but I am responsible for telling B/D.
- A tells D: broadcast "0", but I am responsible for telling B/C.
Then we retry each of those messages.
;; define a promise to hold every node in the system
(def node-ids (promise))
;; Processing the `init` request, we now deliver the node-ids promise
(defmethod handler :init [{ids :node_ids id :node_id}]
  (deliver node-id id)
  (deliver node-ids (set ids))
  {:type :init_ok})
;; Send the messages to every other node, telling each of them that
;; *we* are responsible for telling the other nodes.
(defmethod handler :internal/fast-async-broadcast [{:keys [seen message]}]
  (swap! messages conj message)
  (let [seen-set (conj (set seen) @node-id)
        neighbors (disj @node-ids @node-id)
        unseen-neighbors (set/difference neighbors seen-set)]
    (doseq [neighbor unseen-neighbors]
      (future
        (with-retry-policy 10
          (rpc! neighbor
                :internal/fast-async-broadcast
                {:seen (set/union seen-set unseen-neighbors)
                 :message message}
                {:timeout-ms 1000})))))
  {:accepted true})
(defmethod handler :broadcast [{message :message}]
  (rpc! @node-id
        :internal/fast-async-broadcast
        {:message message}
        {:timeout-ms 1000})
  {:type "broadcast_ok"})
Running this, we get:
...
:servers {:send-count 42200,
:recv-count 42200,
:msg-count 42200,
:msgs-per-op 25.0},
...
:stable-latencies {0 0, 0.5 84, 0.95 98, 0.99 102, 1 107},
Not too bad - 25 messages per op, and a median latency under 100ms in the presence of constant 100ms latency!
Note that msgs-per-op is a bit confusing, for two reasons:
- on one hand, about half the ops are reads, which don't require any additional messages, so we should double the statistic to get the true number of messages per broadcast
- on the other hand, a request/response pair has two messages! So just remember: this isn't the number of requests! It's the number of messages.
So in our case, we're sending 25 messages per op total, which is about 50 messages per broadcast, which is about 25 requests per broadcast.
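Spelled out as arithmetic, and assuming (as above) that roughly half of all ops are broadcasts and that every request gets exactly one response:

```clojure
(def msgs-per-op 25.0)
;; Assumption: about half of the ops are broadcasts, the rest are reads.
(def broadcast-fraction 0.5)

;; Reads cost no server-to-server messages, so all of them belong to broadcasts.
(def msgs-per-broadcast (/ msgs-per-op broadcast-fraction))

;; Each request/response pair is two messages.
(def requests-per-broadcast (/ msgs-per-broadcast 2))

;; Cross-check against the handlers: one RPC to ourselves, plus one RPC to
;; each of the other 24 nodes in a 25-node cluster.
(def expected-requests (+ 1 (dec 25)))
```

Both routes land on 25 requests per broadcast, which suggests the statistic is being read correctly.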
Note that, if we introduce a network partition, things still work!
Okay, on to the next part of the challenge:
With the same node count of 25 and a message delay of 100ms, your challenge is to achieve the following performance metrics:
- Messages-per-operation is below 20
- Median latency is below 1 second
- Maximum latency is below 2 seconds
Okay, this is interesting: we are trying to reduce the number of messages per operation, but we're allowed to increase the median and worst case latency a bit! Note that the latency here is NOT the latency for the request/response itself. Instead:
These latencies are measured from the time a broadcast request was acknowledged to when it was last missing from a read on any node.
One interesting bit is that we now have messages per operation of <20, fewer than our number of nodes. That's kind of wild, right? To "complete" a broadcast, every node needs to know about the message that was broadcasted. So every node needs to receive the broadcast. So, one might think, the very best case scenario should be that we send 24 messages in order to broadcast one message.
How can we accomplish our goal despite this?
Let's think about some things we can do to reduce the number of messages.
- we could eliminate responses! Just fire and forget our broadcasts, baby! Unfortunately, then we can't determine whether we need to retry or not, so we either give up fault tolerance or just blindly retry everything, which, lol, doesn't reduce the number of messages.
- we could batch our requests! Instead of immediately sending the other nodes our message, just make a note: "I need to send message 123 to node n5 later." Then, every 5 seconds or so, send all pending messages to node n5.
- we could batch our responses: make requests fire-and-forget, but make a note: "I am expecting a response from node n5 that they received message 123." Every 5 seconds or so, each node sends all pending acknowledgements at once: "n0, I received messages 123, 124, 129, and 331."
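A rough model shows why batching requests can get under the "24 messages" floor. This is a back-of-the-envelope sketch, not a measurement: `batched-msgs-per-broadcast` is a hypothetical helper, and it assumes the node that received the broadcasts sends one batched request to each other node per flush, gets one response back for each, and never retries.

```clojure
(defn batched-msgs-per-broadcast
  "Messages per broadcast when `broadcasts-per-batch` broadcasts share one
  flush: (node-count - 1) requests plus as many responses, split evenly."
  [node-count broadcasts-per-batch]
  (/ (* 2.0 (dec node-count)) broadcasts-per-batch))

(batched-msgs-per-broadcast 25 1)
;; => 48.0
(batched-msgs-per-broadcast 25 4)
;; => 12.0
```

Under these assumptions the cost of a flush is fixed, so every extra broadcast that lands in the same batch lowers the per-broadcast cost. The price is latency: a message can wait up to a full flush interval before it goes out.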
Let's try batching requests first.
First, we need a way to track our pending broadcasts. We'll make it a map of node-id->set-of-messages.
(def pending-broadcasts (atom {}))
Our handler just records that we need to send the message later.
(defmethod handler :internal/batch-broadcast [{:keys [message]}]
  (swap! messages conj message)
  ;; Queue the message for every other node; the worker sends it later.
  (swap! pending-broadcasts
         (partial merge-with set/union)
         (zipmap (vec (disj @node-ids @node-id)) (repeat #{message})))
  {:accepted true})
We need another handler that will read multiple messages at once. It doesn't worry about rebroadcasting: for every broadcast, the original recipient is responsible for broadcasting to every node.
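(defmethod handler :broadcast/norecurse-multi [{new-messages :messages}]
  ;; `new-messages` arrives as a JSON array (a vector). `set/union` can return
  ;; a vector when given one, so use `into` to keep `messages` a set.
  (swap! messages into new-messages)
  {:done true})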
Of course, our :broadcast handler now needs to call our batched broadcast handler:
(defmethod handler :broadcast [{message :message}]
  (future (handler {:message message
                    :type "internal/batch-broadcast"}))
  {:type "broadcast_ok"})
Finally, we'll define a function to start a worker. Every 1.5 seconds, it'll send pending broadcasts to every other node:
(defn start-worker! []
  (loop []
    (Thread/sleep 1500)
    (doseq [[node messages] @pending-broadcasts]
      (swap! pending-broadcasts dissoc node)
      (future
        (with-retry-policy 10
          (rpc! node :broadcast/norecurse-multi {:messages messages} {:timeout-ms 2000}))))
    (recur)))
(Note the stupid dangerous thing we're doing above - removing the node from pending broadcasts after we get the messages. If another thread happened to be adding messages at the same moment we're looping through, which is *entirely possible*, we'll possibly remove the new messages without ever broadcasting them. This is pretty easily fixable, but for now I'm just going to leave it if it's not causing problems in practice.)
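One possible fix, sketched here rather than tested against maelstrom, is to take the whole pending map and empty the atom in a single atomic step with `reset-vals!` (available in Clojure 1.9+; I'm assuming your babashka version exposes it). `drain-pending!` and the sample `pending` atom are hypothetical names for this illustration.

```clojure
;; Stand-in for `pending-broadcasts`, pre-filled with some queued messages.
(def pending (atom {"n1" #{1 2} "n2" #{2}}))

(defn drain-pending!
  "Atomically swaps in an empty map and returns everything that was queued.
  Anything added after the swap stays queued for the next drain."
  [pending-atom]
  (let [[old-value _new-value] (reset-vals! pending-atom {})]
    old-value))

(def batch (drain-pending! pending))
;; batch now holds the old map, and @pending is {}
```

The worker loop could then iterate over `(drain-pending! pending-broadcasts)` instead of dereferencing the atom and calling `dissoc` per node. Messages whose RPC still fails after every retry would be dropped, just as in the current version.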
This yields messages-per-op of 13.9 (well below our limit of 20 messages per op), a median stable latency of 836ms, and a maximum stable latency of 1.6 seconds. All three are within our requirements!
:net {:all {:send-count 27500,
:recv-count 27500,
:msg-count 27500,
:msgs-per-op 15.951276},
:clients {:send-count 3548, :recv-count 3548, :msg-count 3548},
:servers {:send-count 23952,
:recv-count 23952,
:msg-count 23952,
:msgs-per-op 13.893271},
:valid? true},
...
:stable-latencies {0 0,
0.5 836,
0.95 1512,
0.99 1566,
1 1598},
It should be quite possible to improve these further, by batching responses as well.
We'll stop there for now. Next time maybe we can take on the next challenge.