import os
import logging
import traceback
import sys
import time
import subprocess as sp
from ast import literal_eval
import sisyphus.tools as tools
import sisyphus.global_settings as gs
class Task(object):
    """
    Object to hold information what function should be run with which requirements.

    A task refers to a method of a job by name (``start`` / ``resume``) and
    splits its argument list over up to ``parallel`` task ids (1-based).
    """

    def __init__(self, start, resume=None, rqmt=None, args=None, mini_task=False,
                 update_rqmt=None, parallel=0, tries=1, continuable=False):
        """
        :param str start: name of the function which will be executed on start
        :param str resume: name of the function which will be executed on resume, often set equal to start
        :param dict[str] rqmt: job requirements, defaults to an empty dict
            Might contain:
            "cpu": number of cpus
            "gpu": number of gpus
            "mem": amount of memory, in GB
            "time": amount of time, in hours
        :param list[list[object]|object] args: job arguments, defaults to ``[[]]`` (one call without arguments)
        :param bool mini_task: will be run on engine for short jobs if True
        :param (dict[str],dict[str])->dict[str] update_rqmt: function to update job requirements for interrupted jobs
        :param int parallel: the max. number of jobs to submit to a queue, defaults to the number of args
        :param int tries: how often this task is resubmitted after failure
        :param bool continuable: If set to True this task will not set a finished marker, useful for tasks that can be
            continued for arbitrarily long, e.g. adding more epochs to neural network training
        """
        # Fresh objects per instance: a shared default dict would be polluted
        # by the mini_task assignment below and leak into every later Task.
        if rqmt is None:
            rqmt = {}
        if args is None:
            args = [[]]

        self._start = start
        self._resume = resume
        self._rqmt = rqmt
        if mini_task:
            self._rqmt['engine'] = 'short'
        self._update_rqmt = update_rqmt if update_rqmt else gs.update_engine_rqmt
        self._args = list(args)
        self._parallel = len(self._args) if parallel == 0 else parallel
        self.mini_task = mini_task
        self.reset_cache()
        self.last_state = None
        self.tries = tries
        self.continuable = continuable

    def __repr__(self):
        return "<Task %r job=%r>" % (self._start, getattr(self, "_job", None))

    def __str__(self):
        return "Task < workdir(%s) name(%s) ids(%s) >" % (
            self.path(), self.name(), ','.join(str(i) for i in self.task_ids()))

    def reset_cache(self):
        """ Drop all cached states, the next call of :func:`state` queries them again """
        self._state_cache = {}
        self._state_cache_time = {}

    def set_job(self, job):
        """
        Attach this task to a job and verify that the start/resume functions exist on it.

        :param sisyphus.job.Job job:
        :raises AttributeError: if the job has no attribute with the start or resume name
        """
        self._job = job
        for name in self._start, self._resume:
            if name is None:
                continue
            try:
                getattr(self._job, name)
            except AttributeError:
                logging.critical("Trying to create a task with an invalid function name")
                logging.critical("Job name: %s", job)
                logging.critical("Function name: %s", name)
                raise

    def get_f(self, name):
        """ Return the attribute `name` of the attached job """
        return getattr(self._job, name)

    def task_ids(self):
        """
        :return: list with all valid task ids
        :rtype: list[int]
        """
        return list(range(1, self._parallel + 1))

    def rqmt(self):
        """
        :return: the requirements of this task, 'mem' and 'time' are converted in place
            via tools.str_to_GB and tools.str_to_hours if present
        :rtype: dict[str]
        """
        rqmt = self._rqmt() if callable(self._rqmt) else self._rqmt
        # Ensure that the requested memory is a float representing GB
        if 'mem' in rqmt:
            rqmt['mem'] = tools.str_to_GB(rqmt['mem'])
        if 'time' in rqmt:
            rqmt['time'] = tools.str_to_hours(rqmt['time'])
        return rqmt

    def name(self):
        """ The task name is the name of its start function """
        return self._start

    def resumeable(self):
        """ True if a resume function was given """
        return self._resume is not None

    @staticmethod
    def _stop_logging_thread(logging_thread, out_of_memory=False):
        """ Stop the logging thread if there is one, optionally flagging it as out of memory first """
        if logging_thread is None:
            return
        if out_of_memory:
            logging_thread.out_of_memory = True
        logging_thread.stop()

    def run(self, task_id, resume_job=False, logging_thread=None):
        """
        This function is executed to run this job.

        :param int task_id:
        :param bool resume_job:
        :param sisyphus.worker.LoggingThread|None logging_thread: stopped once the task ended, may be None
        """
        logging.debug("Task name: %s id: %s", self.name(), task_id)
        job = self._job

        logging.info("Start Job: %s Task: %s", job, self.name())
        logging.info("Inputs:")
        for i in self._job._sis_inputs:
            logging.info(str(i))

            # each input must be at least X seconds old
            # if an input file is too young it may not be synced in a network filesystem yet
            try:
                input_age = time.time() - os.stat(i.get_path()).st_mtime
                time.sleep(max(0, gs.WAIT_PERIOD_MTIME_OF_INPUTS - input_age))
            except FileNotFoundError:
                logging.warning('Input path does not exist: %s' % i.get_path())

            if i.creator and gs.ENABLE_LAST_USAGE:
                # mark that input was used
                try:
                    os.unlink(os.path.join(i.creator, gs.JOB_LAST_USER, os.getlogin()))
                except OSError as e:
                    if e.errno not in (2, 13):
                        # 2: file not found
                        # 13: permission denied
                        raise e

                try:
                    user_path = os.path.join(i.creator, gs.JOB_LAST_USER, os.getlogin())
                    os.symlink(os.path.abspath(job._sis_path()), user_path)
                    os.chmod(user_path, 0o775)
                except OSError as e:
                    if e.errno not in (2, 13, 17):
                        # 2: file not found
                        # 13: permission denied
                        # 17: file exists
                        raise e

        tools.get_system_informations(sys.stdout)
        sys.stdout.flush()

        try:
            if not resume_job:
                task = self._start
            elif self._resume is not None:
                task = self._resume
            else:
                task = self._start
                logging.warning('No resume function set (changed tasks after job was initialized?) '
                                'Fallback to normal start function: %s' % task)
            assert task is not None, "Error loading task"

            # save current directory and change into work directory
            with tools.execute_in_dir(self.path(gs.JOB_WORK_DIR)):
                f = getattr(self._job, task)

                # get job arguments
                for arg_id in self._get_arg_idx_for_task_id(task_id):
                    args = self._args[arg_id]
                    if not isinstance(args, (list, tuple)):
                        args = [args]
                    logging.info("-" * 60)
                    logging.info("Starting subtask for arg id: %d args: %s" % (arg_id, str(args)))
                    logging.info("-" * 60)
                    f(*args)
        except sp.CalledProcessError as e:
            killed = e.returncode == 137
            if killed:
                # TODO move this into engine class
                logging.error("Command got killed by SGE (probably out of memory):")
            else:
                logging.error("Executed command failed:")
            logging.error("Cmd: %s", e.cmd)
            logging.error("Args: %s", e.args)
            logging.error("Return-Code: %s", e.returncode)
            self._stop_logging_thread(logging_thread, out_of_memory=killed)
            self.error(task_id, True)
        except Exception:
            # Job failed
            logging.error("Job failed, traceback:")
            sys.excepthook(*sys.exc_info())
            self._stop_logging_thread(logging_thread)
            self.error(task_id, True)
            # TODO handle failed job
        else:
            # Job finished normally
            self._stop_logging_thread(logging_thread)
            if not self.continuable:
                self.finished(task_id, True)
            sys.stdout.flush()
            sys.stderr.flush()
            logging.info("Job finished successful")

    def task_name(self):
        """ :return: job id and task name joined by a dot """
        return '%s.%s' % (self._job._sis_id(), self.name())

    def path(self, path_type=None, task_id=None):
        """
        :param path_type: kind of path, the task name is appended unless it is
            None, gs.JOB_WORK_DIR, gs.JOB_SAVE or gs.JOB_LOG_ENGINE
        :param int|None task_id:
        :return: result of the job's _sis_path for the given type and task id
        """
        if path_type not in (None, gs.JOB_WORK_DIR, gs.JOB_SAVE, gs.JOB_LOG_ENGINE):
            path_type = '%s.%s' % (path_type, self.name())
        return self._job._sis_path(path_type, task_id)

    def check_state(self, state, task_id=None, update=None, combine=all, minimal_time_since_change=0):
        """
        :param state: name of state
        :param int|list[int]|None task_id: defaults to all task ids
        :param bool|None update: if not None change state to this value
        :param combine: how states are combined, e.g. only finished if all jobs are finished => all,
            error state is true if one or more has the error flag => any
        :param minimal_time_since_change: only true if state change is at least that old
        :return: if this state is currently set or not
        :rtype: bool
        """
        if task_id is None:
            task_id = self.task_ids()
        return self._job._sis_file_logging(state + '.' + self.name(), task_id, update=update,
                                           combine=combine, minimal_file_age=minimal_time_since_change)

    def finished(self, task_id=None, update=None) -> bool:
        """
        :param int|list[int]|None task_id:
        :param bool|None update: if not None change the finished state to this value
        :return: True if all requested task ids are marked as finished
        """
        minimal_time_since_change = 0
        if not gs.SKIP_IS_FINISHED_TIMEOUT:
            minimal_time_since_change = gs.WAIT_PERIOD_JOB_FS_SYNC + gs.WAIT_PERIOD_JOB_CLEANUP
        return bool(self.check_state(gs.STATE_FINISHED, task_id, update=update, combine=all,
                                     minimal_time_since_change=minimal_time_since_change))

    def error(self, task_id=None, update=None):
        """
        :param int|list[int]|None task_id:
        :param bool|None update:
        :return: true if job or task is in error state.
        :rtype: bool
        """
        if update:
            # set error flag
            self.check_state(gs.STATE_ERROR, task_id, update=update, combine=any)
            return True

        if isinstance(task_id, int):
            task_ids = [task_id]
        elif task_id is None:
            task_ids = self.task_ids()
        elif isinstance(task_id, list):
            task_ids = task_id
        else:
            raise Exception("unexpected task_id %r" % (task_id,))
        assert isinstance(task_ids, list)

        for task_id in task_ids:
            error_file = self._job._sis_path(gs.STATE_ERROR + '.' + self.name(), task_id)
            error_file = os.path.realpath(error_file)
            if not os.path.isfile(error_file):
                continue

            # task is in error state:
            # move log file and remove error file if an unused try is left
            log_file = self._job._sis_path(gs.JOB_LOG + '.' + self.name(), task_id)
            for i in range(1, self.tries):
                new_name = "%s.error.%02i" % (log_file, i)
                if not os.path.isfile(new_name):
                    if os.path.isfile(log_file):
                        os.rename(log_file, new_name)
                    os.remove(error_file)
                    break

            if os.path.isfile(error_file):
                # task is still in error state
                return True
        return False

    def started(self, task_id=None):
        """ True if job execution has started, i.e. the log file of this task exists """
        return os.path.isfile(self.path(gs.JOB_LOG, task_id))

    def print_error(self, lines=0):
        """
        Log all task ids in error state and print their log files.

        :param int lines: if larger than 0 print only that many lines from the end of each log file
        """
        for task_id in self.task_ids():
            if not self.error(task_id):
                continue
            logging.error("Job: %s Task: %s %s" % (self._job._sis_id(), self.name(), task_id))
            logpath = self.path(gs.JOB_LOG, task_id)
            if os.path.exists(logpath):
                with open(logpath) as log:
                    logging.error("Logfile:")
                    print()
                    if lines > 0:
                        print("".join(log.readlines()[-lines:]), end='')
                    else:
                        print(log.read())

    def state(self, engine, task_id=None, force=False):
        """
        Cached version of the task state, the cache entry is refreshed if it is at least 10 seconds old.

        :param engine: engine to ask for the task state, may be None
        :param int|None task_id:
        :param bool force: ignore the cache age and always refresh
        :return: state of this task
        """
        if force or time.time() - self._state_cache_time.get(task_id, -20) >= 10:
            self._state_cache[task_id] = self._get_state(engine, task_id)
            self._state_cache_time[task_id] = time.time()
        return self._state_cache[task_id]

    def _get_state(self, engine, task_id=None):
        """ Store return of helper as value as last state """
        self.last_state = self._get_state_helper(engine, task_id)
        return self.last_state

    def _get_state_helper(self, engine, task_id=None):
        """ Return state of this task given by external code """
        # Handling external states
        if self.finished(task_id):
            return gs.STATE_FINISHED
        if self.error(task_id):
            return gs.STATE_ERROR

        # Task is not finished and not in error state, time to check the engine
        if task_id is None:
            # Check all task_id of this task, return the 'worst' state
            engine_states = [self.state(engine, i) for i in self.task_ids()]
            for engine_state in (gs.STATE_ERROR,
                                 gs.STATE_QUEUE_ERROR,
                                 gs.STATE_INTERRUPTED,
                                 gs.STATE_RUNNABLE,
                                 gs.STATE_QUEUE,
                                 gs.STATE_RUNNING,
                                 gs.STATE_RETRY_ERROR,
                                 gs.STATE_FINISHED):
                if engine_state in engine_states:
                    return engine_state
            logging.critical("Could not determine state of task: %s" % str(engine_states))
            # This code point should be unreachable; raise explicitly since
            # a plain assert would be stripped when running with -O
            raise AssertionError("Could not determine state of task: %s" % str(engine_states))

        # check state for the given task id
        valid_engine_states = (gs.STATE_QUEUE, gs.STATE_QUEUE_ERROR, gs.STATE_RUNNING, gs.STATE_UNKNOWN)
        if engine is None:
            # without an engine there is nothing to query and no cache to reset
            engine_state = gs.STATE_UNKNOWN
        else:
            engine_state = engine.task_state(self, task_id)
            assert engine_state in valid_engine_states

            # force cache update to avoid caching problems if last state was not also UNKNOWN
            if engine_state == gs.STATE_UNKNOWN and self.last_state and \
                    self.last_state != gs.STATE_UNKNOWN and self.started(task_id):
                engine.reset_cache()
                engine_state = engine.task_state(self, task_id)
                assert engine_state in valid_engine_states

        if engine_state != gs.STATE_UNKNOWN:
            if engine_state == gs.STATE_RUNNING and self.running(task_id) is False:
                # Warn if job is running but doesn't update logging file anymore
                logging.warning('Job marked as running but logging file has not been updated: '
                                '%s assume it is running' % str(self._job))
            return engine_state

        if not self.started(task_id):
            return gs.STATE_RUNNABLE

        # check again if it finished or crashed while retrieving the state
        if self.finished(task_id):
            return gs.STATE_FINISHED
        if self.error(task_id):
            return gs.STATE_ERROR
        # job logging file got updated recently, assume job is still running.
        # used to avoid wrongly marking jobs as interrupted due to slow filesystem updates
        if self.running(task_id):
            return gs.STATE_RUNNING

        history = [] if engine is None else engine.get_submit_history(self)
        if history and len(history[task_id]) > gs.MAX_SUBMIT_RETRIES:
            # More than gs.MAX_SUBMIT_RETRIES tries to run this task, something is wrong
            return gs.STATE_RETRY_ERROR
        # Task was started, but isn't running anymore => interrupted
        return gs.STATE_INTERRUPTED

    def running(self, task_id):
        """
        :return: True if usage file changed recently, None if usage file doesn't exist, False otherwise
        """
        usage_file = self.get_process_logging_path(task_id)
        maximal_file_age = gs.WAIT_PERIOD_JOB_FS_SYNC + gs.PLOGGING_UPDATE_FILE_PERIOD + gs.WAIT_PERIOD_JOB_CLEANUP
        if not os.path.isfile(usage_file):
            return None
        return maximal_file_age > time.time() - os.path.getmtime(usage_file)

    def _get_arg_idx_for_task_id(self, task_id):
        """
        Indices into the argument list handled by the given task id. The arguments are split into
        consecutive chunks, the first ``len(args) % parallel`` task ids get one argument more.

        :param int task_id:
        :rtype: range
        """
        assert task_id > 0, "this function assumes task_ids start at 1"
        nargs = len(self._args)
        chunk_size = nargs // self._parallel
        overflow = nargs % self._parallel
        if task_id - 1 < overflow:
            start = (chunk_size + 1) * (task_id - 1)
            return range(start, start + chunk_size + 1)
        start = (chunk_size + 1) * overflow + chunk_size * (task_id - 1 - overflow)
        return range(start, start + chunk_size)

    def update_rqmt(self, initial_rqmt, submit_history, task_id):
        """
        Update task requirements of interrupted job

        :param dict[str] initial_rqmt: requirements of the last submission, must contain 'mem' and 'time'
        :param submit_history: not used by this method
        :param int task_id:
        :return: new requirements, or a normalized copy of initial_rqmt if no usable usage file exists
        :rtype: dict[str]
        """
        initial_rqmt = initial_rqmt.copy()
        initial_rqmt['mem'] = tools.str_to_GB(initial_rqmt['mem'])
        initial_rqmt['time'] = tools.str_to_hours(initial_rqmt['time'])

        usage_file = self.get_process_logging_path(task_id)
        try:
            with open(usage_file) as usage:
                last_usage = literal_eval(usage.read())
        except (SyntaxError, ValueError, IOError):
            # we don't know anything if no usage file is written or is invalid, just reuse last rqmts
            return initial_rqmt

        rresources = last_usage['requested_resources']
        if 'mem' in rresources:
            rresources['mem'] = tools.str_to_GB(rresources['mem'])
        if 'time' in rresources:
            rresources['time'] = tools.str_to_hours(rresources['time'])

        new_rqmt = self._update_rqmt(initial_rqmt=initial_rqmt, last_usage=last_usage)
        new_rqmt = gs.check_engine_limits(new_rqmt, self)
        return new_rqmt

    def get_process_logging_path(self, task_id):
        """ :return: absolute path of the usage (process logging) file of the given task id """
        return self._job._sis_path(gs.PLOGGING_FILE + '.' + self.name(), task_id, abspath=True)

    def get_worker_call(self, task_id=None):
        """
        :param int|None task_id: appended as last argument if given
        :return: command line to start the worker for this task
        :rtype: list[str]
        """
        if isinstance(gs.SIS_COMMAND, list):
            call = gs.SIS_COMMAND[:]
        else:
            call = gs.SIS_COMMAND.split()
        call += [gs.CMD_WORKER, os.path.relpath(self.path()), self.name()]
        if task_id is not None:
            call.append(str(task_id))
        return call