From 84ab97807168e119d8e2c391e46a7317b86f6f84 Mon Sep 17 00:00:00 2001 From: Peter Karalekas Date: Sat, 18 Nov 2023 12:07:46 -0800 Subject: [PATCH] Pass target-roots to `CHECK-PRIORITY` (#48) * Pass the debug? flag when sowing nodes * Pull one-root cluster aborting check into the root collection step * Pass target-roots to CHECK-PRIORITY * Don't forget to HALT... * Or to pop off the data frame... --- src/dryad.lisp | 3 +- src/operations/multireweight.lisp | 91 ++++++++++++++++--------------- 2 files changed, 50 insertions(+), 44 deletions(-) diff --git a/src/dryad.lisp b/src/dryad.lisp index ddeb576..fd7f85e 100644 --- a/src/dryad.lisp +++ b/src/dryad.lisp @@ -56,7 +56,8 @@ NOTE: In the basic implementation, these messages must be waiting for the DRYAD (let* ((node-id (message-sow-id message)) (node-process (spawn-process (dryad-node-class dryad) :dryad (process-public-address dryad) - :id node-id)) + :id node-id + :debug? (process-debug? dryad))) (node-address (process-public-address node-process))) (schedule node-process now) (setf (gethash node-address (dryad-ids dryad)) node-id diff --git a/src/operations/multireweight.lisp b/src/operations/multireweight.lisp index 83469ee..816232e 100644 --- a/src/operations/multireweight.lisp +++ b/src/operations/multireweight.lisp @@ -51,15 +51,7 @@ ;;; (define-process-upkeep ((supervisor supervisor) now) (START-MULTIREWEIGHT pong) - "Sets up the multireweight procedure. - -1. Collect the mutually held roots for the `HOLD-CLUSTER' -2. Lock the `HOLD-CLUSTER' and check the rootiness of each root. -3. Change the pingability of the cluster to `:SOFT'. -4. Scan the `HOLD-CLUSTER' for the best external rec to use for reweighting. -5. Reweight the `HOLD-CLUSTER' according to the recommendation. -6. Check to see if the `HOLD-CLUSTER' should be rewound, and do so if need be. -7. Unlock the targets and tear down transient state." + "Sets up the multireweight procedure by first collecting mutually-held roots, which form the `HOLD-CLUSTER'." ;; NOTE: we couldn't call MAKE-PONG even if we wanted to, since we don't have ;; access to the underlying node's Lisp object (or its type). (push (make-data-frame-multireweight :internal-pong nil) @@ -67,49 +59,62 @@ (with-slots (root-bucket source-root) pong (setf root-bucket (remove-duplicates root-bucket :test #'address=)) (process-continuation supervisor - `(CONVERGECAST-COLLECT-ROOTS ,(list source-root) ,root-bucket) - `(CHECK-PRIORITY ,source-root) - `(START-INNER-MULTIREWEIGHT) + `(CONVERGECAST-COLLECT-ROOTS ,source-root ,root-bucket) `(FINISH-MULTIREWEIGHT) `(HALT)))) (define-process-upkeep ((supervisor supervisor) now) - (CONVERGECAST-COLLECT-ROOTS cluster roots) - "Recursively collects the `HELD-BY-ROOTS' values of `ROOTS' to determine the set of roots that are participating in this `HOLD' cluster (meaning that they are mutually held by each other), starting with a base `CLUSTER' of just the `SOURCE-ROOT'. If any replies are NIL, we abort." - (with-slots (hold-cluster) (peek (process-data-stack supervisor)) - (flet ((payload-constructor () - (make-message-convergecast-collect-roots :hold-cluster cluster))) - (with-replies (replies :returned? returned?) - (send-message-batch #'payload-constructor roots) - (when (some #'null replies) - (log-entry :entry-type 'aborting-multireweight - :reason 'root-collection-failed - :hold-cluster cluster - :held-by-roots roots) - (setf (process-lockable-aborting? supervisor) t) - (finish-with-scheduling)) - (setf hold-cluster (reduce #'address-union (list* cluster replies))))))) - -(define-process-upkeep ((supervisor supervisor) now) (CHECK-PRIORITY original-root) - "Confirm that, of the roots in the hold cluster, we have priority to act. Namely, we have priority when our `ORIGINAL-ROOT' carries the minimum ID of all the roots in the cluster." - (with-slots (hold-cluster) (peek (process-data-stack supervisor)) - ;; don't bother _multi_reweighting if we're in a cluster of 1. - (when (endp (rest hold-cluster)) - (log-entry :entry-type 'aborting-multireweight - :reason 'cluster-of-one - :hold-cluster hold-cluster) - (setf (process-lockable-aborting? supervisor) t) - (finish-with-futures)) - (sync-rpc (make-message-id-query) - (original-id original-root) + (CONVERGECAST-COLLECT-ROOTS source-root root-bucket) + "Recursively collects the `HELD-BY-ROOTS' values of `ROOT-BUCKET' to determine the set of roots that are participating in this `HOLD' cluster (meaning that they are mutually held by each other), starting with a base `cluster' of just the `SOURCE-ROOT'. If any replies are NIL, we abort." + (let ((cluster (list source-root))) + (with-slots (hold-cluster) (peek (process-data-stack supervisor)) + (flet ((payload-constructor () + (make-message-convergecast-collect-roots :hold-cluster cluster))) + (with-replies (replies :returned? returned?) + (send-message-batch #'payload-constructor root-bucket) + (when (some #'null replies) + (log-entry :entry-type 'aborting-multireweight + :reason 'root-collection-failed + :hold-cluster cluster + :held-by-roots root-bucket) + (setf (process-lockable-aborting? supervisor) t) + (finish-with-scheduling)) + (setf hold-cluster (reduce #'address-union (list* cluster replies))) + ;; don't bother _multi_reweighting if we're in a cluster of 1. + (when (endp (rest hold-cluster)) + (log-entry :entry-type 'aborting-multireweight + :reason 'cluster-of-one + :hold-cluster hold-cluster) + (setf (process-lockable-aborting? supervisor) t) + (finish-with-futures)) + ;; otherwise, push the next set of commands onto the stack + (process-continuation supervisor + `(CHECK-PRIORITY ,source-root ,hold-cluster) + `(START-INNER-MULTIREWEIGHT))))))) + +(define-process-upkeep ((supervisor supervisor) now) + (CHECK-PRIORITY source-root target-roots) + "Confirm that, of the roots in the hold cluster, we have priority to act. Namely, we have priority when our `SOURCE-ROOT' carries the minimum ID (i.e. coordinate) of all the roots in the `hold-cluster' (passed as `TARGET-ROOTS')." + (let ((hold-cluster target-roots)) + (sync-rpc (make-message-id-query) (source-id source-root) (with-replies (replies) (send-message-batch #'make-message-id-query hold-cluster) (let ((cluster-id (reduce #'min-id replies))) - (unless (equalp original-id (min-id original-id cluster-id)) + (unless (equalp source-id (min-id source-id cluster-id)) (setf (process-lockable-aborting? supervisor) t))))))) (define-process-upkeep ((supervisor supervisor) now) (START-INNER-MULTIREWEIGHT) - "This is the start of the \"critical segment\", where it begins to be impossible to rewind partway through the modifications we're about to make." + "This is the start of the \"critical segment\", where it begins to be impossible to rewind partway through the modifications we're about to make. + +1. Lock the `HOLD-CLUSTER'. +2. Check that each root in the `HOLD-CLUSTER' is still a root. +3. Change the pingability of the cluster to `:SOFT'. +4. Scan the `HOLD-CLUSTER' for the best external rec to use for reweighting. +5. Change the pingability of the cluster to `:NONE'. +6. Reweight the `HOLD-CLUSTER' according to the recommendation. +7. Change the pingability of the cluster to `:SOFT'. +8. Check to see if the `HOLD-CLUSTER' should be rewound, and do so if need be. +9. Unlock the targets and tear down transient state." (with-slots (hold-cluster) (peek (process-data-stack supervisor)) (cond ((not (process-lockable-aborting? supervisor)) @@ -122,7 +127,7 @@ `(MULTIREWEIGHT-BROADCAST-REWEIGHT ,hold-cluster) `(BROADCAST-PINGABILITY ,hold-cluster :SOFT) `(MULTIREWEIGHT-CHECK-REWINDING ,hold-cluster) - `(BROADCAST-UNLOCK))) ; don't destroy trees + `(BROADCAST-UNLOCK))) ; don't destroy trees (t (log-entry :entry-type 'aborting-multireweight :reason 'previously-aborted