Source code for sisyphus.global_settings


"""
These settings can be overwritten via a ``settings.py`` file in the current directory, when ``sis`` is run.
"""

# Author: Jan-Thorsten Peter <peter@cs.rwth-aachen.de>

import sys
import logging
import sisyphus.hash
# noinspection PyUnresolvedReferences
from sisyphus.global_constants import *
# noinspection PyUnresolvedReferences
import os


def engine():
    """
    Create the engine object used to submit jobs.

    The simplest setup just creates a local engine starting all jobs on the local machine, e.g.::

        from sisyphus.localengine import LocalEngine
        return LocalEngine(cpu=8)

    The usually recommended version is to use a local and a normal grid engine. The EngineSelector
    can be used to schedule tasks on different engines. The main intuition was to have an engine for
    very small jobs that don't need to be scheduled on a large grid engine (e.g. counting lines of a file).

    A setup using the EngineSelector would look like this::

        from sisyphus.localengine import LocalEngine
        from sisyphus.engine import EngineSelector
        from sisyphus.son_of_grid_engine import SonOfGridEngine
        return EngineSelector(engines={'short': LocalEngine(cpu=4),
                                       'long': SonOfGridEngine(default_rqmt={'cpu': 1, 'mem': 1,
                                                                             'gpu': 0, 'time': 1})},
                              default_engine='long')

    Note: the engines should only be imported locally inside the function to avoid circular imports

    :return: engine (EngineBase)
    """
    import psutil

    # psutil.cpu_count() returns None if the number of CPUs cannot be determined.
    # Fall back to a single core so that neither the '%i' formatting below
    # nor LocalEngine is handed None.
    cpu_count = psutil.cpu_count() or 1

    if ENGINE_NOT_SETUP_WARNING:
        logging.info('No custom engine setup, using default engine: LocalEngine(cpu=%i, gpu=0)' % cpu_count)

    # imported locally to avoid circular imports
    from sisyphus.localengine import LocalEngine
    return LocalEngine(cpu=cpu_count, gpu=0)
def update_engine_rqmt(initial_rqmt, last_usage):
    """
    Compute the requirements for the next attempt of a task that got interrupted.

    The 'time' and 'mem' limits of the interrupted run are doubled if the run was flagged as
    out of memory or came close to the respective limit, but they are never set below the
    initially requested values once doubled.

    :param dict[str] initial_rqmt: requirements that are requested first
    :param dict[str] last_usage: information about the last usage by the task
    :return: updated requirements
    :rtype: dict[str]
    """
    # What the interrupted run had asked for, falling back to the initial request (or 1)
    previous_request = last_usage.get('requested_resources', {})
    time_limit = previous_request.get('time', initial_rqmt.get('time', 1))
    memory_limit = previous_request.get('mem', initial_rqmt.get('mem', 1))

    # What the interrupted run actually consumed
    time_spent = last_usage.get('used_time', 0)
    peak_memory = last_usage.get('max', {}).get('rss', 0)

    # Double the time limit if the run ended within 0.1 of it
    if time_limit - time_spent < 0.1:
        time_limit = max(initial_rqmt.get('time', 0), time_limit * 2)

    # Double the memory limit if the run was flagged out of memory or ended within 0.25 of it
    if last_usage.get('out_of_memory') or memory_limit - peak_memory < 0.25:
        memory_limit = max(initial_rqmt.get('mem', 0), memory_limit * 2)

    # Start from the initial request, overlay the previous request, then the new limits
    updated = initial_rqmt.copy()
    updated.update(previous_request)
    updated.update(time=time_limit, mem=memory_limit)
    return updated
# noinspection PyUnusedLocal
def check_engine_limits(current_rqmt, task):
    """
    Hook to clamp the requested requirements to the hardware limits of the engine.

    The default implementation performs no check and hands the requirements back unchanged.
    A possible check for limits could look like this::

        current_rqmt['time'] = min(168, current_rqmt.get('time', 2))
        if current_rqmt['time'] > 24:
            current_rqmt['mem'] = min(63, current_rqmt['mem'])
        else:
            current_rqmt['mem'] = min(127, current_rqmt['mem'])
        return current_rqmt

    :param dict[str] current_rqmt: requirements currently requested
    :param sisyphus.task.Task task: task that is handled
    :return: requirements updated to engine limits
    :rtype: dict[str]
    """
    # No limits are enforced by default; `task` is intentionally unused here
    return current_rqmt
def file_caching(path):
    """
    Placeholder for a file caching function, meant to be replaced to enable file caching,
    e.g. by copying the given file to /var/tmp and returning the new path.

    The default behaviour only logs that no caching function is set and passes on the given path.

    :param str path: Path to file that should be cached
    :return: path to cached file
    :rtype: str
    """
    message = 'No file caching function set, simply keep given path: %s' % path
    logging.info(message)
    return path
# Experimental settings

# Log the last time a job output was used, currently not actively maintained
ENABLE_LAST_USAGE = False

# Add the tags attached to a job to its work path, currently not actively maintained
JOB_USE_TAGS_IN_PATH = False

# Start a kernel to connect to remotely, currently not actively maintained
START_KERNEL = False

# Link all computed outputs to this directory for easy sharing in a team.
# If set, results will be linked to this directory.
TEAM_SHARE_DIR = None

#: Automatically clean up the job directory after the job has finished
JOB_AUTO_CLEANUP = True

#: How often to check for finished jobs, in seconds
JOB_CLEANER_INTERVAL = 60

#: How many threads should be cleaning in parallel
JOB_CLEANER_WORKER = 5

#: Whether the job internal work directory should be kept or deleted during clean up
JOB_CLEANUP_KEEP_WORK = False

#: Default value for a job, used by tk.cleaner to determine if a job should be removed or not
JOB_DEFAULT_KEEP_VALUE = 50

#: How many threads should update the graph in parallel, useful if the filesystem has a high latency
GRAPH_WORKER = 16

#: How many threads are used to set up the job directory and submit jobs
MANAGER_SUBMIT_WORKER = 10

#: How many locks can be used by all jobs (one lock per job). If there are more jobs than locks, locks are reused.
#: This could lead to a slowdown, but the number of locks per process is limited
JOB_MAX_NUMBER_OF_LOCKS = 100

#: How often sisyphus will try to resubmit a task to the engine before returning a RETRY_ERROR
MAX_SUBMIT_RETRIES = 3

#: Default function to hash jobs and objects
SIS_HASH = sisyphus.hash.short_hash

#: Path to the config directory, not including the directory name 'config'
CONFIG_PATH = '.'

#: Path to the recipe directory, not including the directory name 'recipe'
RECIPE_PATH = '.'
#: The work directory
WORK_DIR = 'work'

# Name of the default config file if no config directory is found
CONFIG_FILE_DEFAULT = "config.py"

#: Name of the default function to call in the config directory
CONFIG_FUNCTION_DEFAULT = "%s.main" % CONFIG_PREFIX

#: Name of the alias directory
ALIAS_DIR = 'alias'

#: Name of the output directory
OUTPUT_DIR = 'output'

#: If set to a non-empty string, aliases and outputs will be placed in a subdir.
#: This is useful for setups with multiple configs
ALIAS_AND_OUTPUT_SUBDIR = ''

#: Show job targets on the status screen, can significantly slow down startup time if many outputs are used
SHOW_JOB_TARGETS = True

#: How many seconds should be waited before assuming a job is finished after the finished file is written,
#: to allow the network file system to sync up
WAIT_PERIOD_JOB_FS_SYNC = 30

#: How often the manager should check for finished jobs
WAIT_PERIOD_BETWEEN_CHECKS = 30

#: Safety period to wait for actionable jobs to change status before running an action
WAIT_PERIOD_CACHE = 20

#: How many seconds should be waited before retrying a ssh connection
WAIT_PERIOD_SSH_TIMEOUT = 15

#: How many seconds should be waited before retrying to parse a failed qstat output
WAIT_PERIOD_QSTAT_PARSING = 15

#: How many seconds should be waited before retrying to bind to the desired port
WAIT_PERIOD_HTTP_RETRY_BIND = 10

#: How many seconds should be waited before cleaning up a finished job
WAIT_PERIOD_JOB_CLEANUP = 10

#: How many seconds all inputs should be available before starting a job,
#: to avoid file system synchronization problems
WAIT_PERIOD_MTIME_OF_INPUTS = 60

#: Set to True to automatically clean jobs in error state and retry
CLEAR_ERROR = False

#: Print error messages of a job in the manager status field
PRINT_ERROR = True

#: Print a detailed log for that many jobs in error state
PRINT_ERROR_TASKS = 1

#: Print that many last lines of the error state log file
PRINT_ERROR_LINES = 40

#: Which command should be called to start sisyphus, can be used to replace the python binary
SIS_COMMAND = [sys.executable, sys.argv[0]]
# If the first argument is -m, the module name is missing and has to be added
if sys.argv[0] == '-m':
    SIS_COMMAND.append('sisyphus')

# Parameters to log the resources used by each task

#: Seconds between checks of how much memory and cpu a process is using
PLOGGING_INTERVAL = 5

#: Suppress messages about process resource usage
PLOGGING_QUIET = False

#: Minimal relative change between log entries of used resources
PLOGGING_MIN_CHANGE = 0.1

#: In which interval the file with the resources used by the process should be updated
PLOGGING_UPDATE_FILE_PERIOD = 60

#: How long the virtual file system should cache process states
FILESYSTEM_CACHE_TIME = 30

#: Use ipython traceback
USE_VERBOSE_TRACEBACK = True

#: The verbose traceback type. "ipython" or "better_exchook"
VERBOSE_TRACEBACK_TYPE = "ipython"

#: Install signal handlers for debugging
USE_SIGNAL_HANDLERS = False

# Job setup options

#: Automatically set all inputs given to __init__ as attributes of the created job.
#: Disabled by default since it tends to confuse new users reading the code.
AUTO_SET_JOB_INIT_ATTRIBUTES = False

# Job environment

#: Remove all environment variables to ensure the same environment between different users
CLEANUP_ENVIRONMENT = True
#: Environment variables that are kept if CLEANUP_ENVIRONMENT is set
DEFAULT_ENVIRONMENT_KEEP = {
    'CUDA_VISIBLE_DEVICES',
    'HOME',
    'PWD',
    'SGE_STDERR_PATH',
    'SGE_TASK_ID',
    'TMP',
    'TMPDIR',
    'USER',
}

#: Environment variables that are set if CLEANUP_ENVIRONMENT is set
DEFAULT_ENVIRONMENT_SET = {
    'LANG': 'en_US.UTF-8',
    'MKL_NUM_THREADS': 1,
    'OMP_NUM_THREADS': 1,
    'PATH': ':'.join([
        '/rbi/sge/bin',
        '/rbi/sge/bin/lx-amd64',
        '/usr/local/sbin',
        '/usr/local/bin',
        '/usr/sbin',
        '/usr/bin',
        '/sbin',
        '/bin',
        '/usr/games',
        '/usr/local/games',
        '/snap/bin',
    ]),
    'SHELL': '/bin/bash',
}

#: Directory used by tk.mktemp; placed below $TMPDIR if that is set, otherwise below /tmp
TMP_PREFIX = os.path.join(os.environ.get('TMPDIR', '/tmp'), 'sis_')

# Visualization

#: For http visualization, list a job input as common input if it is shared between more than X*(total jobs) jobs
VIS_RELATIVE_MERGE_THRESHOLD = 0.25

#: For http visualization, list a job input as common input if it is shared between more than X jobs
VIS_ABSOLUTE_MERGE_THRESHOLD = 5

SHOW_VIS_NAME_IN_MANAGER = True

# Add stacktrace information with the specified depth, 0 for deactivation, None or -1 for the full stack
JOB_ADD_STACKTRACE_WITH_DEPTH = 0

# Is enabled if tk.run is called
SKIP_IS_FINISHED_TIMEOUT = False

# Internal functions
def update_global_settings_from_text(text, filename):
    """
    Execute the given settings code and copy every name it leaves behind into the globals of this module.

    NOTE: the code is run via exec() without any sandboxing, it must come from a trusted source.

    :param text: settings source, handed to compile()
    :param str filename: name used for compiling (shows up in tracebacks) and as __file__ / __name__
        of the executed code
    :return: nothing
    """
    # Nearly empty namespace the settings code is executed in
    namespace = {
        '__builtins__': globals()['__builtins__'],
        '__file__': filename,
        '__name__': filename,
        '__package__': None,
        '__doc__': None,
    }
    # These entries were provided by us and must not be copied back into the module
    reserved_names = tuple(namespace)

    # The settings code may want to modify the default environment set/dict in place.
    # They are added after the reserved names were recorded, so they are copied back as well.
    namespace['DEFAULT_ENVIRONMENT_KEEP'] = DEFAULT_ENVIRONMENT_KEEP
    namespace['DEFAULT_ENVIRONMENT_SET'] = DEFAULT_ENVIRONMENT_SET

    # Compiling with the file name first gives a readable traceback
    code = compile(text, filename, "exec")
    exec(code, namespace)

    globals().update((name, value) for name, value in namespace.items() if name not in reserved_names)

    if AUTO_SET_JOB_INIT_ATTRIBUTES:
        import logging
        logging.warning('AUTO_SET_JOB_INIT_ATTRIBUTES is deprecated, please set the attributes manually '
                        'you might want to use self.set_attrs(locals())')
def update_global_settings_from_file(filename):
    """
    Read the settings file and apply its content via update_global_settings_from_text.

    The file name is stored in GLOBAL_SETTINGS_FILE and the content that was read in
    GLOBAL_SETTINGS_FILE_CONTENT. A missing file is treated like an empty one, any other
    error while reading is passed on to the caller.

    :param str filename: path of the settings file
    :return: nothing
    """
    globals()['GLOBAL_SETTINGS_FILE'] = filename

    try:
        with open(filename, encoding='utf-8') as f:
            content = f.read()
    except FileNotFoundError:
        # A settings file is optional, continue with empty settings if it doesn't exist
        content = ''

    globals()['GLOBAL_SETTINGS_FILE_CONTENT'] = content
    update_global_settings_from_text(content, filename)
# Starts out empty; nothing in this module fills it
GLOBAL_SETTINGS_COMMANDLINE = []

# Raw values of all environment variables picked up by update_global_settings_from_env,
# stored under their full variable name
ENVIRONMENT_SETTINGS = {}

# Environment variables starting with this prefix are treated as settings
ENVIRONMENT_SETTINGS_PREFIX = 'SIS_'
def update_global_settings_from_env():
    """
    Turn every environment variable starting with ENVIRONMENT_SETTINGS_PREFIX into a global setting.

    The raw value is recorded in ENVIRONMENT_SETTINGS under the full variable name. The setting itself
    is stored under the name without the prefix, parsed as a Python literal if possible and kept
    as string otherwise.

    :return: nothing
    """
    from ast import literal_eval

    for env_name, raw_value in os.environ.items():
        if not env_name.startswith(ENVIRONMENT_SETTINGS_PREFIX):
            continue

        ENVIRONMENT_SETTINGS[env_name] = raw_value
        setting_name = env_name[len(ENVIRONMENT_SETTINGS_PREFIX):]

        # Deliberately best effort: anything that is not a valid literal stays a string
        try:
            value = literal_eval(raw_value)
        except Exception:
            value = raw_value

        globals()[setting_name] = value
# noinspection PyDefaultArgument
def cached_engine(cache=[]):
    """
    Return the engine created by engine(), creating it only once. For internal usage.

    The mutable default argument is used on purpose, it keeps the created engine between calls.

    :param list cache: list holding the cached engine as first element once it was created
    :return: engine (EngineBase)
    """
    if cache:
        return cache[0]

    # Nothing cached yet: create the engine, check its type and remember it
    new_engine = engine()
    from sisyphus.engine import EngineBase
    assert isinstance(new_engine, EngineBase)
    cache.append(new_engine)
    return new_engine  # returned directly for better type hinting
# Parameters used for debugging or profiling
MEMORY_PROFILE_LOG = None

USE_UI = True

# Set to False to disable the warning about an unset engine
ENGINE_NOT_SETUP_WARNING = True

# Apply the overrides when this module is loaded: environment variables first,
# then the settings file, so values from the file replace values from the environment.
update_global_settings_from_env()
update_global_settings_from_file(GLOBAL_SETTINGS_FILE_DEFAULT)