
Add PRE_JOB_RANK to CondorClassAdScheduler #25

Open
HerrHorizontal opened this issue Jun 15, 2021 · 4 comments
Labels
enhancement New feature or request

Comments

HerrHorizontal (Contributor) commented Jun 15, 2021

The current implementation of the CondorClassAdScheduler

```python
# Excerpt; imports of the project's helper classes are omitted.
class CondorClassadJobScheduler(JobScheduler):
    """
    Goal of the htcondor job scheduler is to have a scheduler that somehow
    mimics how htcondor does schedule jobs.
    Htcondor does scheduling based on a priority queue. The priorities itself
    are managed by operators of htcondor.
    So different instances can apparently behave very different.
    In this case a priority queue that sorts job slots
    by increasing cost is built. The scheduler checks if a job either
    exactly fits a slot or if it does fit into it several times. The cost for
    putting a job at a given slot is given by the amount of resources that
    might remain unallocated.
    """

    def __init__(
        self,
        job_queue,
        machine_ad: str = machine_ad_defaults,
        job_ad: str = job_ad_defaults,
        pre_job_rank: str = "0",
        interval: float = 60,
        autocluster: bool = False,
    ):
        """
        Initializes the CondorClassadJobScheduler
        :param job_queue: queue of jobs that are scheduled in the following simulation
        :param machine_ad: ClassAd that is used with every drone
        :param job_ad: ClassAd that is used with every job
        :param pre_job_rank: ClassAd attribute that all drones are sorted by
        :param interval: time between scheduling cycles
        :param autocluster: could be used to decide whether to use autoclusters
        """
        self._stream_queue = job_queue
        self._drones: RankedClusters[Drone] = RankedNonClusters(
            quantization=quantization_defaults, ranking=parse(pre_job_rank)
        )
        self.interval = interval
        self.job_queue = JobQueue()
        self._collecting = True
        self._processing = Resources(jobs=0)

        # temporary solution
        self._wrapped_classads = WeakKeyDictionary()
        self._machine_classad = parse(machine_ad)
        self._job_classad = parse(job_ad)

    @property
    def drone_list(self) -> Iterator[Drone]:
        """
        Takes an iterator over the WrappedClassAd objects of drones known to the
        scheduler, extracts the drones and returns an iterator over the drone objects.
        :return:
        """
        for cluster in self._drones.clusters():
            for drone in cluster:
                yield drone._wrapped

    def register_drone(self, drone: Drone):
        """
        Provides the drones with the drone ClassAd, combines both into one object and
        adds the resulting WrappedClassAd object to the drones known to the scheduler as
        well as the dictionary containing all WrappedClassAd objects the scheduler
        works with.
        :param drone:
        """
        wrapped_drone = WrappedClassAd(classad=self._machine_classad, wrapped=drone)
        self._drones.add(wrapped_drone)
        self._wrapped_classads[drone] = wrapped_drone

    def unregister_drone(self, drone: Drone):
        """
        Remove a drone's representation from the scheduler's scope.
        :param drone:
        :return:
        """
        drone_wrapper = self._wrapped_classads[drone]
        self._drones.remove(drone_wrapper)

    def update_drone(self, drone: Drone):
        """
        Update a drone's representation in the scheduler scope.
        :param drone:
        :return:
        """
        drone_wrapper = self._wrapped_classads[drone]
        self._drones.update(drone_wrapper)

    async def run(self):
        """
        Runs the scheduler's functionality. Once executed, the scheduler starts up and
        begins to add the jobs that are
        :return:
        """
        async with Scope() as scope:
            scope.do(self._collect_jobs())
            async for _ in interval(self.interval):
                await self._schedule_jobs()
                if (
                    not self._collecting
                    and not self.job_queue
                    and self._processing.levels.jobs == 0
                ):
                    break

    @staticmethod
    def _match_job(
        job: ClassAd, pre_job_clusters: Iterator[List[Set[WrappedClassAd[Drone]]]]
    ):
        """
        Tries to find a match for the transferred job among the available drones.
        :param job: job to match
        :param pre_job_clusters: list of clusters of wrapped drones that are
            presorted by a clustering mechanism of RankedAutoClusters/RankedNonClusters
            that mimics the HTCondor NEGOTIATOR_PRE_JOB_RANK, short prejobrank. The
            clusters contain drones that are considered to be equivalent with respect
            to all Requirements and Ranks that are used during the matchmaking process.
            This mimics the Autoclustering functionality of HTCondor.
            [[highest prejobrank {autocluster}, {autocluster}], ...,
             [lowest prejobrank {autocluster}, {autocluster}]]
        :return: drone that is the best match for the job

        The matching is performed in several steps:
        1. The job's requirements are evaluated and only drones that meet them are
           considered further. A drone of every autocluster is extracted from
           pre_job_clusters and if it meets the job's requirements it is not removed
           from pre_job_clusters.
        2. The autoclusters that are equivalent with respect to the prejobrank are
           then sorted by the job's rank expression. The resulting format of
           pre_job_clusters is
           [[(highest prejobrank, highest jobrank) {autocluster} {autocluster},
             ..., (highest prejobrank, lowest jobrank) {autocluster}], ...]
        3. The resulting pre_job_clusters are then iterated and the drone with the
           highest (prejobrank, jobrank) whose requirements are also compatible with
           the job is returned as best match.
        """

        def debug_evaluate(expr, my, target=None):
            """
            Reimplementation of the classad packages evaluate function. Having it
            here enables developers to inspect the ClassAd evaluation process more
            closely and to add debug output if necessary.
            :param expr:
            :param my:
            :param target:
            :return:
            """
            if type(expr) is str:
                expr = my[expr]
            result = expr.evaluate(my=my, target=target)
            return result

        # Step 1: drop autoclusters whose representative fails the job's Requirements
        if job["Requirements"] != Undefined():
            pre_job_clusters_tmp = []
            for cluster_group in pre_job_clusters:
                cluster_group_tmp = []
                for cluster in cluster_group:
                    if debug_evaluate(
                        "Requirements", my=job, target=next(iter(cluster))
                    ):
                        cluster_group_tmp.append(cluster)
                pre_job_clusters_tmp.append(cluster_group_tmp)
            pre_job_clusters = pre_job_clusters_tmp

        # Step 2: within each prejobrank group, sort autoclusters by the job's Rank
        if job["Rank"] != Undefined():
            pre_job_clusters_tmp = []
            for cluster_group in pre_job_clusters:
                pre_job_clusters_tmp.append(
                    sorted(
                        cluster_group,
                        key=lambda cluster: (
                            debug_evaluate("Rank", my=job, target=next(iter(cluster))),
                            random.random(),
                        ),
                        reverse=True,
                    )
                )
            pre_job_clusters = pre_job_clusters_tmp

        # Step 3: return the first drone whose own Requirements accept the job
        for cluster_group in pre_job_clusters:
            # TODO: if we have POST_JOB_RANK, collect *all* matches of a group
            for cluster in cluster_group:
                for drone in cluster:
                    if drone["Requirements"] == Undefined() or drone.evaluate(
                        "Requirements", my=drone, target=job
                    ):
                        return drone
        raise NoMatch()

    async def _schedule_jobs(self):
        """
        Handles the scheduling of jobs. Tries to match the jobs in the job queue to
        available resources. This occurs in several steps.
        1. The list of drones known to the scheduler is copied. The copy can then be
           used to keep track of the drones' available resources while matching jobs,
           as the jobs allocate resources on the original drones before being
           processed but not during scheduling.
        2. The jobs in the job queue are matched to (the copied) resources iteratively.
           The actual matching is performed by the `_match_job` method that returns the
           most suitable drone unless no drone is compatible with the job's
           requirements. If a match was found, the resources requested by the job are
           allocated on the matched drone. If no resources remain unallocated after the
           last job's allocation, the matching process is ended for this scheduler
           interval.
        3. After the job matching is finished, the matched jobs are removed from the
           job queue in reverse order, as the index of a job in the job queue changes
           once a job with a lower index is removed from the queue.
        4. The matched jobs' execution is triggered.
        """
        # Pre Job Rank is the same for all jobs
        # Use a copy to allow temporary "remainder after match" estimates
        if self._drones.empty():
            return
        pre_job_drones = self._drones.copy()
        matches: List[
            Tuple[int, WrappedClassAd[CachingJob], WrappedClassAd[Drone]]
        ] = []
        for queue_index, candidate_job in enumerate(self.job_queue):
            try:
                pre_job_drones.lookup(candidate_job._wrapped)
                matched_drone = self._match_job(
                    candidate_job, pre_job_drones.cluster_groups()
                )
            except NoMatch:
                candidate_job._wrapped.failed_matches += 1
                continue
            else:
                matches.append((queue_index, candidate_job, matched_drone))
                for key, value in candidate_job._wrapped.resources.items():
                    matched_drone._temp[key] = (
                        matched_drone._temp.get(
                            key,
                            matched_drone._wrapped.theoretical_available_resources[key],
                        )
                        - value
                    )
                pre_job_drones.update(matched_drone)
                # monitoring/coordination stuff
                if (
                    candidate_job._wrapped._total_input_data
                    and matched_drone._wrapped.cached_data
                ):
                    candidate_job._wrapped._cached_data = (
                        matched_drone._wrapped.cached_data
                    )
            if pre_job_drones.empty():
                break
        if not matches:
            return
        # TODO: optimize for few matches, many matches, all matches
        for queue_index, _, _ in reversed(matches):
            del self.job_queue[queue_index]
        for _, job, drone in matches:
            drone.clear_temporary_resources()
            await self._execute_job(job=job, drone=drone)
        await sampling_required.put(self)
        # NOTE: Is this correct? Triggers once instead of for each job
        await sampling_required.put(self.job_queue)
        await sampling_required.put(UserDemand(len(self.job_queue)))

    async def _execute_job(self, job: WrappedClassAd, drone: WrappedClassAd):
        """
        Schedules a job on a drone by extracting both objects from the
        respective WrappedClassAd and using the drone's scheduling functionality
        :param job:
        :param drone:
        """
        wrapped_job = job._wrapped
        wrapped_drone = drone._wrapped
        await wrapped_drone.schedule_job(wrapped_job)

    async def _collect_jobs(self):
        """
        Combines jobs that are imported from the simulation's job config with a job
        ClassAd and adds the resulting WrappedClassAd objects to the scheduler's job
        queue.
        """
        async for job in self._stream_queue:
            wrapped_job = WrappedClassAd(classad=self._job_classad, wrapped=job)
            self._wrapped_classads[job] = wrapped_job
            self.job_queue.append(wrapped_job)
            await self._processing.increase(jobs=1)
            # TODO: logging happens with each job
            # TODO: job queue to the outside now contains wrapped classads...
            await sampling_required.put(self.job_queue)
            await sampling_required.put(UserDemand(len(self.job_queue)))
        self._collecting = False

    async def job_finished(self, job):
        """
        Handles the impact of finishing jobs on the scheduler. If the job is completed
        successfully, the amount of running jobs matched by the current scheduler
        instance is reduced. If the job is not finished successfully,
        it is resubmitted to the scheduler's job queue.
        :param job:
        """
        if job.successful:
            await self._processing.decrease(jobs=1)
        else:
            self.job_queue.append(self._wrapped_classads[job])
```
does not consider the PRE_JOB_RANK in the negotiation process. Currently, the PRE_JOB_RANK is assumed to be a ClassAd expression that is evaluated in the context of Drones only, not of CachingJobs, as this improves the runtime of the scheduling process.
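To make the drone-only assumption concrete, here is a minimal sketch with plain dicts instead of ClassAds. It groups drones once by a drone-only pre-job rank and then, per job, mimics steps 1 to 3 of `_match_job`. The helper names `presort_drones` and `match_job`, the dict layout, and the `cpus`/`memory` attributes are hypothetical; autoclustering, the drone-side Requirements check and the random tie-breaking are left out.

```python
from itertools import groupby
from typing import Callable, Dict, List

# Hypothetical stand-ins: drones are plain dicts, job expressions are
# callables evaluated against a single drone.
Drone = Dict[str, float]
Expr = Callable[[Drone], float]


def presort_drones(drones: List[Drone], pre_job_rank: Expr) -> List[List[Drone]]:
    """Group drones by a drone-only PRE_JOB_RANK, highest group first.

    This runs once per scheduling cycle and does not depend on any job."""
    ordered = sorted(drones, key=pre_job_rank, reverse=True)
    return [list(group) for _, group in groupby(ordered, key=pre_job_rank)]


def match_job(requirements: Expr, rank: Expr, groups: List[List[Drone]]) -> Drone:
    """Mimic steps 1-3 of _match_job on the presorted groups."""
    for group in groups:
        # Step 1: keep only drones that satisfy the job's requirements.
        candidates = [drone for drone in group if requirements(drone)]
        if candidates:
            # Steps 2-3: the job's rank only orders drones within one group.
            return max(candidates, key=rank)
    raise LookupError("no matching drone")


drones = [
    {"id": 1, "cpus": 8, "memory": 16},
    {"id": 2, "cpus": 4, "memory": 32},
    {"id": 3, "cpus": 8, "memory": 8},
]
groups = presort_drones(drones, pre_job_rank=lambda d: d["cpus"])
best = match_job(lambda d: d["memory"] >= 8, lambda d: d["memory"], groups)
```

Here `best` is drone 1: drone 2 has the most memory, but it sits in a lower pre-job-rank group, and the ordering of the groups never changes from job to job.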

As with the RANK, the ClassAd mechanism for the PRE_JOB_RANK could also be evaluated in the context of all available components, including CachingJobs, instead of the current drone-only ranking:

```python
self._drones: RankedClusters[Drone] = RankedNonClusters(
    quantization=quantization_defaults, ranking=parse(pre_job_rank)
)
```
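A job-aware PRE_JOB_RANK could look roughly like the sketch below. It is not the project's API: `match_job_with_pre_job_rank` and the `fits`, `packing` and `by_memory` expressions are hypothetical, and the "leave the fewest cores idle" rank is only an illustrative choice. Because the rank takes the (job, drone) pair, the same drones are ordered differently for different jobs, so no ordering can be cached across jobs.

```python
from typing import Callable, Dict, List

# Hypothetical stand-ins: jobs and drones are plain dicts, and every
# expression is a callable evaluated on a (job, drone) pair.
Ad = Dict[str, float]
PairExpr = Callable[[Ad, Ad], float]


def match_job_with_pre_job_rank(
    job: Ad,
    drones: List[Ad],
    requirements: PairExpr,
    pre_job_rank: PairExpr,
    rank: PairExpr,
) -> Ad:
    """Pick the best drone when PRE_JOB_RANK may reference job attributes.

    The pre-job rank depends on the (job, drone) pair, so no ordering can be
    computed up front: every drone is re-evaluated for every job."""
    candidates = [drone for drone in drones if requirements(job, drone)]
    if not candidates:
        raise LookupError("no matching drone")
    # Lexicographic key: PRE_JOB_RANK decides first, the job's RANK breaks ties.
    return max(
        candidates,
        key=lambda drone: (pre_job_rank(job, drone), rank(job, drone)),
    )


drones = [
    {"id": 1, "cpus": 8, "memory": 16},
    {"id": 2, "cpus": 4, "memory": 32},
    {"id": 3, "cpus": 2, "memory": 64},
]


def fits(job: Ad, drone: Ad) -> bool:
    return drone["cpus"] >= job["cpus"]


def packing(job: Ad, drone: Ad) -> float:
    # Prefer the drone that leaves the fewest cores idle after the match.
    return -(drone["cpus"] - job["cpus"])


def by_memory(job: Ad, drone: Ad) -> float:
    return drone["memory"]
```

With these expressions a 4-core job goes to drone 2 and a 1-core job to drone 3. The best drone therefore depends on the job itself, which is exactly what the drone-only presorting cannot express.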

As this would heavily impact the performance of the simulation, it might be done in a separate class. Renaming the current CondorClassAdScheduler might also be reasonable, to make clear that it does not implement the whole functionality of the original HTCondor scheduler.

HerrHorizontal added the enhancement label Jun 15, 2021
eileen-kuehn (Member) commented:

Would you please clarify what you mean by modifying the PRE_JOB_RANK during the negotiation process? Each negotiator has a predefined PRE_JOB_RANK that cannot be modified during negotiation.

HerrHorizontal (Contributor, Author) commented:

Following the description in https://htcondor.readthedocs.io/en/latest/admin-manual/configuration-macros.html?highlight=NEGOTIATOR_PRE_JOB_RANK#condor-negotiator-configuration-file-entries and https://htcondor.readthedocs.io/en/latest/admin-manual/user-priorities-negotiation.html?highlight=NEGOTIATOR_PRE_JOB_RANK#negotiation, the PRE_JOB_RANK is simply a rank that takes precedence over the user-defined RANK in the matching process. As @tfesenbecker points out in her thesis, it would be useful to be able to set the PRE_JOB_RANK via ClassAds, as is already possible for the RANK.

Maybe I have just misread the code and this is already possible?
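The precedence described in the linked negotiation documentation can be modelled as a lexicographic sort key. HTCondor compares NEGOTIATOR_PRE_JOB_RANK first, then the job's RANK, then NEGOTIATOR_POST_JOB_RANK, and then applies further preemption-related criteria, which are omitted here. The sketch below is a simplified model with hypothetical names (`MatchKey`, `order_candidates`) and precomputed per-drone values. It does not reproduce HTCondor's implementation.

```python
from typing import Callable, Dict, List, NamedTuple

Drone = Dict[str, float]  # hypothetical stand-in for a drone ClassAd
Expr = Callable[[Drone], float]


class MatchKey(NamedTuple):
    """Sort key for one candidate drone; tuples compare field by field."""

    pre_job_rank: float  # pool-wide NEGOTIATOR_PRE_JOB_RANK
    job_rank: float  # the job's own RANK
    post_job_rank: float  # pool-wide NEGOTIATOR_POST_JOB_RANK


def order_candidates(drones: List[Drone], pre: Expr, rank: Expr, post: Expr) -> List[Drone]:
    """Return drones best-first; a lower level only matters on a tie above it."""
    return sorted(
        drones,
        key=lambda d: MatchKey(pre(d), rank(d), post(d)),
        reverse=True,
    )


drones = [
    {"id": 1, "pre": 1, "rank": 5, "post": 0},
    {"id": 2, "pre": 2, "rank": 0, "post": 0},
    {"id": 3, "pre": 1, "rank": 5, "post": 9},
]
ordered = order_candidates(
    drones,
    pre=lambda d: d["pre"],
    rank=lambda d: d["rank"],
    post=lambda d: d["post"],
)
```

Drone 2 comes first despite its job rank of 0, because its pre-job rank is higher. Drones 1 and 3 tie on both pre-job rank and job rank, so the post-job rank decides between them.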

eileen-kuehn (Member) commented Jun 21, 2021

We already considered implementing this. The issue is that we currently use a predefined ordering to keep the complexity of the scheduling process bounded. If we add the PRE_JOB_RANK properly, we have to evaluate it for each job and sort accordingly before continuing with everything else. This would have a huge impact on the simulation time.
We plan to provide another type of scheduler that properly implements the PRE_JOB_RANK, but at a much higher cost than the currently implemented scheduler.
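The cost difference can be illustrated with a toy model that counts pre-job-rank evaluations in one scheduling cycle. `count_pre_job_rank_evaluations` is a hypothetical helper, and the model deliberately ignores autoclustering and the re-insertion of a drone after each match.

```python
from typing import Optional


def count_pre_job_rank_evaluations(num_jobs: int, num_drones: int, job_aware: bool) -> int:
    """Count PRE_JOB_RANK evaluations in one scheduling cycle (toy model)."""
    calls = 0

    def pre_job_rank(job: Optional[int], drone: int) -> float:
        nonlocal calls
        calls += 1
        return float(drone)  # placeholder expression; only the call count matters

    drones = list(range(num_drones))
    if job_aware:
        # The rank depends on the (job, drone) pair: re-sort for every job.
        for job in range(num_jobs):
            sorted(drones, key=lambda d: pre_job_rank(job, d), reverse=True)
    else:
        # Drone-only rank: one ordering per cycle, shared by all jobs.
        sorted(drones, key=lambda d: pre_job_rank(None, d), reverse=True)
    return calls
```

For 1000 queued jobs and 500 drones, this gives 500 evaluations per cycle with the drone-only rank and 500,000 with a job-aware rank. The job-aware variant also needs one sort per job instead of one per cycle.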

If you have a concrete idea of what you want to do with the PRE_JOB_RANK, we can prioritise this issue.

HerrHorizontal (Contributor, Author) commented:

Actually, @tfesenbecker already suggested studying this in her thesis, but was not able to do so because of the current implementation. However, I don't see this as a high priority at the moment.

eileen-kuehn changed the title from "[Scheduler] CondorClassAdScheduler doesn't allow modification of PRE_JOB_RANK" to "Add PRE_JOB_RANK to CondorClassAdScheduler" on Jun 22, 2021