Source code for sisyphus.engine

# Author: Jan-Thorsten Peter <peter@cs.rwth-aachen.de>

import collections
import copy
import logging
import os
import psutil
from ast import literal_eval

import sisyphus.global_settings as gs
import sisyphus.tools as tools


class EngineBase:
    """
    An engine manages the execution of jobs, e.g. locally, or in a queuing system like SGE.
    """

    @staticmethod
    def get_task_id(task_id):
        """Gets task id either from args or the environment"""
        raise NotImplementedError

    def task_state(self, task, task_id):
        raise NotImplementedError

    def start_engine(self):
        raise NotImplementedError

    def stop_engine(self):
        raise NotImplementedError

    def reset_cache(self):
        raise NotImplementedError

    def submit_call(self, call, logpath, rqmt, name, task_name, task_ids):
        """
        :param list[str] call:
        :param str logpath:
        :param dict[str] rqmt:
        :param str name:
        :param str task_name:
        :param list[int] task_ids:
        :return: ENGINE_NAME, submitted (list of (list of task ids, job id))
        :rtype: (str, list[(list[int],str|int)])
        """
        raise NotImplementedError

    def get_default_rqmt(self, task):
        raise NotImplementedError

    def get_used_engine(self, engine_selector):
        return self

    @tools.cache_result()
    def get_submit_history(self, task):
        submit_log = os.path.relpath(task.path(gs.ENGINE_SUBMIT))
        id_to_rqmt = collections.defaultdict(list)
        if os.path.isfile(submit_log):
            if hasattr(task, "submit_history_cache"):
                last_update, id_to_rqmt = task.submit_history_cache
            else:
                last_update = None
            current_mtime = os.path.getmtime(submit_log)
            if not last_update or last_update != current_mtime:
                with open(submit_log) as submit_file:
                    for line in submit_file:
                        ids, r = literal_eval(line)  # list of ids, submitted requirement for this id
                        for i in ids:
                            id_to_rqmt[i].append(r)
                task.submit_history_cache = (current_mtime, id_to_rqmt)
        return id_to_rqmt

    def add_defaults_to_rqmt(self, task, rqmt):
        s = copy.deepcopy(self.get_default_rqmt(task))
        s.update(rqmt)
        return s

    def get_rqmt(self, task, task_id, update=True):
        """
        Get the requirements submitted for this task

        :param sisyphus.task.Task task:
        :param int task_id:
        :param bool update:
        """
        rqmt = task.rqmt()
        # find last requirements
        rqmt = self.add_defaults_to_rqmt(task, rqmt)
        rqmt_hist = self.get_submit_history(task)[task_id]
        if rqmt_hist and rqmt_hist[0] == rqmt:
            # job has been submitted before and the rqmt given by the recipe did not change
            rqmt.update(rqmt_hist[-1])

        if update:
            rqmt = task.update_rqmt(rqmt, task_id)

        if "mem" in rqmt:
            rqmt["mem"] = tools.str_to_GB(rqmt["mem"])
        if "time" in rqmt:
            rqmt["time"] = tools.str_to_hours(rqmt["time"])
        return gs.check_engine_limits(rqmt, task)

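    # Illustrative sketch (not part of this module): how the pieces of get_rqmt fit
    # together for one task_id, using made-up plain dicts instead of a real Task.
    # All concrete values below are assumptions.
    #
    #   defaults    = {"cpu": 1, "mem": 1, "time": 1}     # get_default_rqmt(task)
    #   recipe_rqmt = {"mem": 4, "time": 2}               # task.rqmt()
    #   rqmt        = {**defaults, **recipe_rqmt}         # add_defaults_to_rqmt
    #   history     = [rqmt, {**rqmt, "mem": 8}]          # get_submit_history(task)[task_id]
    #   if history and history[0] == rqmt:
    #       rqmt.update(history[-1])                      # reuse the last submitted rqmt (mem == 8)
    #   # afterwards "mem"/"time" are normalized to GB/hours and checked against
    #   # gs.check_engine_limits before submission.
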
    def job_state(self, job):
        """Return current state of job"""
        if not job._sis_setup():
            if job._sis_runnable():
                return gs.STATE_RUNNABLE
            else:
                return gs.STATE_WAITING
        elif job._sis_finished():
            return gs.STATE_FINISHED

        for task in job._sis_tasks():
            if not task.finished():
                return self.task_state(task)
        return gs.STATE_FINISHED

    def get_job_used_resources(self, current_process):
        """
        Should be overwritten by a subclass if a better way to measure the used resources
        is available, e.g. cgroups. This function should only be used by the worker.

        :param psutil.Process current_process:
        """
        d = {}
        # get memory usage and convert it to GB
        mem_info = current_process.memory_info()
        d["rss"] = mem_info.rss / 1024**3
        d["vms"] = mem_info.vms / 1024**3
        d["cpu"] = current_process.cpu_percent()
        for child in current_process.children(recursive=True):
            try:
                mem_info = child.memory_info()
                d["rss"] += mem_info.rss / 1024**3
                d["vms"] += mem_info.vms / 1024**3
                d["cpu"] += child.cpu_percent()
            except psutil.NoSuchProcess:
                # Quietly continue if job disappeared
                continue
        return d

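    # Illustrative sketch (not part of this module): how a worker could query its own
    # resource usage. "SomeEngine" is a hypothetical EngineBase subclass; the returned
    # dict only contains the keys set above ("rss", "vms", "cpu").
    #
    #   import psutil
    #   engine = SomeEngine()
    #   usage = engine.get_job_used_resources(psutil.Process())
    #   print("rss=%.2fGB vms=%.2fGB cpu=%.1f%%" % (usage["rss"], usage["vms"], usage["cpu"]))
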
    def submit(self, task):
        """Prepares all relevant commands and calls submit_call of the subclass to actually
        pass the job to the relevant engine

        :param sisyphus.task.Task task: Task to submit
        :return: None
        """
        call = task.get_worker_call()
        logpath = os.path.relpath(task.path(gs.JOB_LOG_ENGINE))
        task_ids = [
            task_id
            for task_id in task.task_ids()
            if task.state(self, task_id, True) in [gs.STATE_RUNNABLE, gs.STATE_INTERRUPTED_RESUMABLE]
        ]

        # update rqmts and collect them
        rqmt_to_ids = {}
        for task_id in task_ids:
            rqmt = self.get_rqmt(task, task_id)
            key = tools.sis_hash(rqmt)
            if key not in rqmt_to_ids:
                rqmt_to_ids[key] = (rqmt, set())
            rqmt_, ids = rqmt_to_ids[key]
            assert task_id not in ids
            assert rqmt == rqmt_
            ids.add(task_id)

        # the actual job submitting part
        submit_log = os.path.relpath(task.path(gs.ENGINE_SUBMIT))
        for rqmt_key, (rqmt, task_ids) in rqmt_to_ids.items():
            task_ids = sorted(task_ids)
            logging.info("Submit to queue: %s %s %s" % (str(task.path()), task.name(), str(task_ids)))
            engine_name, engine_info = self.submit_call(call, logpath, rqmt, task.task_name(), task.name(), task_ids)
            logging.debug("Command: (%s) Task ids: (%s)" % (" ".join(call), " ".join([str(i) for i in task_ids])))
            logging.debug("Requirements: %s" % (str(rqmt)))

            submit_info = rqmt.copy()
            submit_info["engine_info"] = engine_info
            submit_info["engine_name"] = engine_name
            with open(submit_log, "a") as submit_file:
                submit_file.write("%s\n" % str((task_ids, submit_info)))
        task.reset_cache()

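    # Illustrative sketch (not part of this module): one line of the ENGINE_SUBMIT log as
    # written above and read back by get_submit_history via literal_eval. The concrete
    # values and the engine name are assumptions; the shape follows submit_call's rtype.
    #
    #   ([1, 2, 3], {'cpu': 1, 'mem': 4.0, 'time': 2.0,
    #                'engine_info': [([1, 2, 3], 4242)], 'engine_name': 'sge'})
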
    def init_worker(self, task):
        """
        This method will be called before the task is started by the worker,
        e.g. SGE uses this method to link the SGE log file to the desired position.

        :param task:
        :return:
        """
        pass

class EngineSelector(EngineBase):
    """
    The EngineSelector engine wraps multiple other engines and schedules the jobs
    according to the requirements (rqmt).

    Tasks with mini_task=True will use the engine selector 'short',
    so usually that engine should be specified as well.
    """

    def __init__(self, engines, default_engine):
        """
        :param dict[str,EngineBase] engines:
        :param str default_engine:
        """
        assert isinstance(default_engine, str), "default_engine must be a string: %r" % (default_engine,)
        for k, v in engines.items():
            assert isinstance(k, str) and isinstance(v, EngineBase), (
                "engines must only contain strings as keys and Engines as value: (%r, %r)" % (k, v)
            )
        self.engines = engines
        self.default_engine = default_engine

    def get_used_engine(self, engine_selector):
        """
        :param str engine_selector: name in self.engines
        :rtype: EngineBase
        """
        assert engine_selector in self.engines, "Unknown engine selector: %r" % engine_selector
        return self.engines[engine_selector].get_used_engine(engine_selector)

    def get_used_engine_by_rqmt(self, rqmt):
        """
        :param dict[str] rqmt:
        :rtype: EngineBase
        """
        engine_selector = rqmt.get("engine", self.default_engine)
        return self.get_used_engine(engine_selector)

    def get_job_used_resources(self, current_process):
        raise NotImplementedError("Use the engine returned by get_used_engine instead")

    def task_state(self, task, task_id):
        """Return state of task"""
        return self.get_used_engine_by_rqmt(task.rqmt()).task_state(task, task_id)

    def for_all_engines(self, f):
        """Call f once for each unique engine"""
        visited = set()
        for engine in self.engines.values():
            eid = id(engine)
            if eid not in visited:
                visited.add(eid)
                f(engine)

    def start_engine(self):
        self.for_all_engines(lambda e: e.start_engine())

    def stop_engine(self):
        """Tell all engines to stop"""
        self.for_all_engines(lambda e: e.stop_engine())

    def reset_cache(self):
        self.for_all_engines(lambda e: e.reset_cache())

    # def get_task_id(self, task_id):
    #     # This function should only be used by the worker
    #     assert NotImplementedError, "Used active engine first via get_used_engine first"
    #     """ Gets task id either from args or the environment"""
    #     if task_id is not None:
    #         # task id passed via argument
    #         return task_id
    #     return self.get_used_engine(engine_selector).get_task_id(task_id, engine_selector)

    # def get_logpath(self, logpath, task_name, task_id):
    #     """ Returns log file for the currently running task """
    #     return self.get_used_engine(engine_selector).get_logpath(logpath, task_name, task_id, engine_selector)

    def submit_call(self, call, logpath, rqmt, name, task_name, task_ids):
        engine_selector = rqmt.get("engine", self.default_engine)

        # update call to contain selected engine
        new_call = []
        added = False
        for i in call:
            new_call.append(i)
            if not added and i == gs.CMD_WORKER:
                new_call.append("--engine")
                new_call.append(engine_selector)
                added = True
        return self.get_used_engine(engine_selector).submit_call(new_call, logpath, rqmt, name, task_name, task_ids)

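    # Illustrative sketch (not part of this module): how the worker call is rewritten
    # above before being forwarded. Assuming gs.CMD_WORKER == "worker" and a call shaped
    # like the one below (both assumptions), the loop turns
    #
    #   ["./sis", "worker", "work/path/to/job", "run"]
    # into
    #   ["./sis", "worker", "--engine", "short", "work/path/to/job", "run"]
    #
    # so the worker later asks the selector for the same engine it was submitted with.
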
    def get_default_rqmt(self, task):
        return self.get_used_engine_by_rqmt(task.rqmt()).get_default_rqmt(task)
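

# Illustrative sketch (not part of this module): wiring an EngineSelector from two
# hypothetical engine instances. "LocalEngine" and its arguments are assumptions;
# only the selection logic mirrors the class above.
#
#   engine = EngineSelector(
#       engines={"short": LocalEngine(cpus=4), "long": LocalEngine(cpus=32)},
#       default_engine="long",
#   )
#   engine.get_used_engine_by_rqmt({"cpu": 1, "mem": 1})           # -> the "long" engine
#   engine.get_used_engine_by_rqmt({"engine": "short", "cpu": 1})  # -> the "short" engine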