Skip to content

Commit

Permalink
Pass target-roots to CHECK-PRIORITY (#48)
Browse files Browse the repository at this point in the history
* Pass the debug? flag when sowing nodes

* Pull one-root cluster aborting check into the root collection step

* Pass target-roots to CHECK-PRIORITY

* Don't forget to HALT...

* Or to pop off the data frame...
  • Loading branch information
karalekas authored Nov 18, 2023
1 parent 46410c0 commit 84ab978
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 44 deletions.
3 changes: 2 additions & 1 deletion src/dryad.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ NOTE: In the basic implementation, these messages must be waiting for the DRYAD
(let* ((node-id (message-sow-id message))
(node-process (spawn-process (dryad-node-class dryad)
:dryad (process-public-address dryad)
:id node-id))
:id node-id
:debug? (process-debug? dryad)))
(node-address (process-public-address node-process)))
(schedule node-process now)
(setf (gethash node-address (dryad-ids dryad)) node-id
Expand Down
91 changes: 48 additions & 43 deletions src/operations/multireweight.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -51,65 +51,70 @@
;;;

(define-process-upkeep ((supervisor supervisor) now) (START-MULTIREWEIGHT pong)
"Sets up the multireweight procedure.
1. Collect the mutually held roots for the `HOLD-CLUSTER'
2. Lock the `HOLD-CLUSTER' and check the rootiness of each root.
3. Change the pingability of the cluster to `:SOFT'.
4. Scan the `HOLD-CLUSTER' for the best external rec to use for reweighting.
5. Reweight the `HOLD-CLUSTER' according to the recommendation.
6. Check to see if the `HOLD-CLUSTER' should be rewound, and do so if need be.
7. Unlock the targets and tear down transient state."
"Sets up the multireweight procedure by first collecting mutually-held roots, which form the `HOLD-CLUSTER'."
;; NOTE: we couldn't call MAKE-PONG even if we wanted to, since we don't have
;; access to the underlying node's Lisp object (or its type).
(push (make-data-frame-multireweight :internal-pong nil)
(process-data-stack supervisor))
(with-slots (root-bucket source-root) pong
(setf root-bucket (remove-duplicates root-bucket :test #'address=))
(process-continuation supervisor
`(CONVERGECAST-COLLECT-ROOTS ,(list source-root) ,root-bucket)
`(CHECK-PRIORITY ,source-root)
`(START-INNER-MULTIREWEIGHT)
`(CONVERGECAST-COLLECT-ROOTS ,source-root ,root-bucket)
`(FINISH-MULTIREWEIGHT)
`(HALT))))

(define-process-upkeep ((supervisor supervisor) now)
(CONVERGECAST-COLLECT-ROOTS cluster roots)
"Recursively collects the `HELD-BY-ROOTS' values of `ROOTS' to determine the set of roots that are participating in this `HOLD' cluster (meaning that they are mutually held by each other), starting with a base `CLUSTER' of just the `SOURCE-ROOT'. If any replies are NIL, we abort."
(with-slots (hold-cluster) (peek (process-data-stack supervisor))
(flet ((payload-constructor ()
(make-message-convergecast-collect-roots :hold-cluster cluster)))
(with-replies (replies :returned? returned?)
(send-message-batch #'payload-constructor roots)
(when (some #'null replies)
(log-entry :entry-type 'aborting-multireweight
:reason 'root-collection-failed
:hold-cluster cluster
:held-by-roots roots)
(setf (process-lockable-aborting? supervisor) t)
(finish-with-scheduling))
(setf hold-cluster (reduce #'address-union (list* cluster replies)))))))

(define-process-upkeep ((supervisor supervisor) now) (CHECK-PRIORITY original-root)
"Confirm that, of the roots in the hold cluster, we have priority to act. Namely, we have priority when our `ORIGINAL-ROOT' carries the minimum ID of all the roots in the cluster."
(with-slots (hold-cluster) (peek (process-data-stack supervisor))
;; don't bother _multi_reweighting if we're in a cluster of 1.
(when (endp (rest hold-cluster))
(log-entry :entry-type 'aborting-multireweight
:reason 'cluster-of-one
:hold-cluster hold-cluster)
(setf (process-lockable-aborting? supervisor) t)
(finish-with-futures))
(sync-rpc (make-message-id-query)
(original-id original-root)
(CONVERGECAST-COLLECT-ROOTS source-root root-bucket)
"Recursively collects the `HELD-BY-ROOTS' values of `ROOT-BUCKET' to determine the set of roots that are participating in this `HOLD' cluster (meaning that they are mutually held by each other), starting with a base `cluster' of just the `SOURCE-ROOT'. If any replies are NIL, we abort."
(let ((cluster (list source-root)))
(with-slots (hold-cluster) (peek (process-data-stack supervisor))
(flet ((payload-constructor ()
(make-message-convergecast-collect-roots :hold-cluster cluster)))
(with-replies (replies :returned? returned?)
(send-message-batch #'payload-constructor root-bucket)
(when (some #'null replies)
(log-entry :entry-type 'aborting-multireweight
:reason 'root-collection-failed
:hold-cluster cluster
:held-by-roots root-bucket)
(setf (process-lockable-aborting? supervisor) t)
(finish-with-scheduling))
(setf hold-cluster (reduce #'address-union (list* cluster replies)))
;; don't bother _multi_reweighting if we're in a cluster of 1.
(when (endp (rest hold-cluster))
(log-entry :entry-type 'aborting-multireweight
:reason 'cluster-of-one
:hold-cluster hold-cluster)
(setf (process-lockable-aborting? supervisor) t)
(finish-with-futures))
;; otherwise, push the next set of commands onto the stack
(process-continuation supervisor
`(CHECK-PRIORITY ,source-root ,hold-cluster)
`(START-INNER-MULTIREWEIGHT)))))))

(define-process-upkeep ((supervisor supervisor) now)
(CHECK-PRIORITY source-root target-roots)
"Confirm that, of the roots in the hold cluster, we have priority to act. Namely, we have priority when our `SOURCE-ROOT' carries the minimum ID (i.e. coordinate) of all the roots in the `hold-cluster' (passed as `TARGET-ROOTS')."
(let ((hold-cluster target-roots))
(sync-rpc (make-message-id-query) (source-id source-root)
(with-replies (replies)
(send-message-batch #'make-message-id-query hold-cluster)
(let ((cluster-id (reduce #'min-id replies)))
(unless (equalp original-id (min-id original-id cluster-id))
(unless (equalp source-id (min-id source-id cluster-id))
(setf (process-lockable-aborting? supervisor) t)))))))

(define-process-upkeep ((supervisor supervisor) now) (START-INNER-MULTIREWEIGHT)
"This is the start of the \"critical segment\", where it begins to be impossible to rewind partway through the modifications we're about to make."
"This is the start of the \"critical segment\", where it begins to be impossible to rewind partway through the modifications we're about to make.
1. Lock the `HOLD-CLUSTER'.
2. Check that each root in the `HOLD-CLUSTER' is still a root.
3. Change the pingability of the cluster to `:SOFT'.
4. Scan the `HOLD-CLUSTER' for the best external rec to use for reweighting.
5. Change the pingability of the cluster to `:NONE'.
6. Reweight the `HOLD-CLUSTER' according to the recommendation.
7. Change the pingability of the cluster to `:SOFT'.
8. Check to see if the `HOLD-CLUSTER' should be rewound, and do so if need be.
9. Unlock the targets and tear down transient state."
(with-slots (hold-cluster) (peek (process-data-stack supervisor))
(cond
((not (process-lockable-aborting? supervisor))
Expand All @@ -122,7 +127,7 @@
`(MULTIREWEIGHT-BROADCAST-REWEIGHT ,hold-cluster)
`(BROADCAST-PINGABILITY ,hold-cluster :SOFT)
`(MULTIREWEIGHT-CHECK-REWINDING ,hold-cluster)
`(BROADCAST-UNLOCK))) ; don't destroy trees
`(BROADCAST-UNLOCK))) ; don't destroy trees
(t
(log-entry :entry-type 'aborting-multireweight
:reason 'previously-aborted
Expand Down

0 comments on commit 84ab978

Please sign in to comment.