Source code for sisyphus.global_settings

"""
These settings can be overwritten via a ``settings.py`` file in the current directory, when ``sis`` is run.
"""

# Author: Jan-Thorsten Peter <peter@cs.rwth-aachen.de>

import logging
import os
import sys
from typing import Dict

import sisyphus.hash
from sisyphus.global_constants import *


def engine():
    """
    Create the engine object used to submit jobs. The simplest setup just creates a local engine
    starting all jobs on the local machine, e.g.::

        from sisyphus.localengine import LocalEngine
        return LocalEngine(cpu=8)

    The usually recommended version is to use a local and a normal grid engine. The EngineSelector
    can be used to schedule tasks on different engines. The main intuition was to have an engine for
    very small jobs that don't need to be scheduled on a large grid engine (e.g. counting lines of a file).

    A setup using the EngineSelector would look like this::

        from sisyphus.localengine import LocalEngine
        from sisyphus.engine import EngineSelector
        from sisyphus.son_of_grid_engine import SonOfGridEngine
        return EngineSelector(engines={'short': LocalEngine(cpu=4),
                                       'long': SonOfGridEngine(default_rqmt={'cpu': 1, 'mem': 1,
                                                                             'gpu': 0, 'time': 1})},
                              default_engine='long')

    Note: the engines should only be imported locally inside the function to avoid circular imports

    :return: engine (EngineBase)
    """
    import psutil

    cpu_count = psutil.cpu_count()

    if ENGINE_NOT_SETUP_WARNING:
        logging.info("No custom engine setup, using default engine: LocalEngine(cpu=%i, gpu=0)" % cpu_count)

    from sisyphus.localengine import LocalEngine

    return LocalEngine(cpu=cpu_count, gpu=0)
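A minimal ``settings.py`` overriding this default along the lines of the EngineSelector example above could look like the following sketch; the queue split and the resource defaults are illustrative, not requirements::

    # settings.py -- sketch only, adjust engines and default_rqmt to your cluster
    def engine():
        from sisyphus.localengine import LocalEngine
        from sisyphus.engine import EngineSelector
        from sisyphus.son_of_grid_engine import SonOfGridEngine

        return EngineSelector(
            engines={
                "short": LocalEngine(cpu=4),
                "long": SonOfGridEngine(default_rqmt={"cpu": 1, "mem": 1, "gpu": 0, "time": 1}),
            },
            default_engine="long",
        )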
def worker_wrapper(job, task_name, call):
    """
    All worker calls are passed through this function. This can be used, for example, to run the
    worker in a singularity environment::

        def worker_wrapper(job, task_name, call):
            return ['singularity_call'] + call
    """
    return call
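A slightly more concrete ``settings.py`` override could wrap every task in a Singularity container; the image path below is a placeholder and not part of sisyphus::

    # settings.py -- sketch; /path/to/image.sif is a hypothetical container image
    def worker_wrapper(job, task_name, call):
        return ["singularity", "exec", "/path/to/image.sif"] + call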
def update_engine_rqmt(last_rqmt: Dict, last_usage: Dict):
    """Update requirements after a job got interrupted, double limits if needed

    :param dict[str] last_rqmt: requirements that were requested for the previous run of this task
    :param dict[str] last_usage: information about the used resources of the previous run (mainly memory and time)
    :return: updated requirements
    :rtype: dict[str]
    """
    out = last_rqmt.copy()

    # Did it run out of time?
    requested_time = last_rqmt.get("time")
    used_time = last_usage.get("used_time", 0)
    if requested_time and requested_time - used_time < 0.1:
        out["time"] = requested_time * 2

    # Did it (nearly) break the memory limits?
    requested_memory = last_rqmt.get("mem")
    used_memory = last_usage.get("max", {}).get("rss", 0)
    if requested_memory and (last_usage.get("out_of_memory") or requested_memory - used_memory < 0.25):
        out["mem"] = requested_memory * 2

    return out
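As a usage sketch with made-up numbers: a task that was killed out of memory gets its memory requirement doubled, while a time limit that was not exhausted stays unchanged::

    # illustrative values only
    last_rqmt = {"cpu": 1, "mem": 4, "time": 8}
    last_usage = {"used_time": 2.5, "max": {"rss": 3.9}, "out_of_memory": True}
    assert update_engine_rqmt(last_rqmt, last_usage) == {"cpu": 1, "mem": 8, "time": 8}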
# noinspection PyUnusedLocal
def check_engine_limits(current_rqmt: Dict, task):
    """Check if the requested requirements exceed any hardware limits and reduce them if needed.

    Ignored by default; a possible check for limits could look like this::

        current_rqmt['time'] = min(168, current_rqmt.get('time', 2))
        if current_rqmt['time'] > 24:
            current_rqmt['mem'] = min(63, current_rqmt['mem'])
        else:
            current_rqmt['mem'] = min(127, current_rqmt['mem'])
        return current_rqmt

    :param dict[str] current_rqmt: requirements currently requested
    :param sisyphus.task.Task task: task that is handled
    :return: requirements updated to engine limits
    :rtype: dict[str]
    """
    return current_rqmt
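Placed in ``settings.py``, the docstring example becomes a complete override; the limits (168 hours, 63 and 127 GB) are taken from that example and are not universal::

    # settings.py -- sketch; adjust the limits to your cluster
    def check_engine_limits(current_rqmt, task):
        current_rqmt["time"] = min(168, current_rqmt.get("time", 2))
        if current_rqmt["time"] > 24:
            current_rqmt["mem"] = min(63, current_rqmt["mem"])
        else:
            current_rqmt["mem"] = min(127, current_rqmt["mem"])
        return current_rqmt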
def file_caching(path):
    """This function should be replaced to enable file caching, e.g. copy the given file to /var/tmp
    and return the new path. The default behaviour is to just pass on the given path.

    :param str path: Path to file that should be cached
    :return: path to cached file
    :rtype: str
    """
    logging.info("No file caching function set, simply keep given path: %s" % path)
    return path
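A possible ``settings.py`` replacement that caches into /var/tmp, as suggested in the docstring; the flat basename layout is an assumption and does not handle name collisions::

    # settings.py -- sketch; no collision handling, assumes /var/tmp is local and writable
    import os
    import shutil

    def file_caching(path):
        cached = os.path.join("/var/tmp", os.path.basename(path))
        if not os.path.exists(cached):
            shutil.copy(path, cached)
        return cached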
# Experimental settings
# Add tags attached to job to work path, currently not actively maintained
JOB_USE_TAGS_IN_PATH = False

# Link all computed outputs to this directory for easy sharing in a team
TEAM_SHARE_DIR = None  # If set, results will be linked to this directory

#: Automatically clean up job directory after job has finished
JOB_AUTO_CLEANUP = True
#: How often to check for finished jobs in seconds
JOB_CLEANER_INTERVAL = 60
#: How many threads should be cleaning in parallel
JOB_CLEANER_WORKER = 5
#: Whether the job-internal work directory should be kept or deleted during clean up
JOB_CLEANUP_KEEP_WORK = False
#: Default keep value for jobs, used by tk.cleaner to determine if a job should be removed or not
JOB_DEFAULT_KEEP_VALUE = 50
#: How many threads should update the graph in parallel, useful if the filesystem has a high latency
GRAPH_WORKER = 16
#: How many threads are used to set up the job directories and submit jobs
MANAGER_SUBMIT_WORKER = 10
#: How many locks can be used by all jobs (one lock per job). If there are more jobs than locks, locks are reused.
#: This could lead to a slowdown, but the number of locks per process is limited
JOB_MAX_NUMBER_OF_LOCKS = 100
#: How often sisyphus will try to resubmit a task to the engine before returning a RETRY_ERROR
MAX_SUBMIT_RETRIES = 3

#: Default function to hash jobs and objects
SIS_HASH = sisyphus.hash.short_hash

#: List of paths searched for loading config and recipe files. The module name should be part of the path, e.g.:
#: adding 'config' will cause Sisyphus to search the current directory for a folder named config and load modules
#: starting with config; other python files in the current directory will be ignored.
#: If the path ends with '/' everything inside it will be loaded, similar to adding it to PYTHONPATH.
#: Keep 'recipe' for legacy setups.
IMPORT_PATHS = ["config", "recipe", "recipe/"]

#: The work directory
WORK_DIR = "work"
# Name of the default config file if no config directory is found
CONFIG_FILE_DEFAULT = "config.py"
#: Name of the default function to call in the config directory
CONFIG_FUNCTION_DEFAULT = "%s.main" % CONFIG_PREFIX

#: Name of alias directory
ALIAS_DIR = "alias"
#: Name of output directory
OUTPUT_DIR = "output"

#: If set to a non-empty string, aliases and outputs will be placed in a subdir.
#: This is useful for setups with multiple configs
ALIAS_AND_OUTPUT_SUBDIR = ""

#: Show job targets on status screen, can significantly slow down startup time if many outputs are used
SHOW_JOB_TARGETS = True

#: How many seconds to wait before assuming a job is finished after the finished file is written,
#: to allow the network file system to sync up
WAIT_PERIOD_JOB_FS_SYNC = 30
#: How often the manager should check for finished jobs
WAIT_PERIOD_BETWEEN_CHECKS = 30
#: Safety period to wait for actionable jobs to change status before running action
WAIT_PERIOD_CACHE = 20
#: How many seconds to wait before retrying an ssh connection
WAIT_PERIOD_SSH_TIMEOUT = 15
#: How many seconds to wait before retrying to parse a failed qstat output
WAIT_PERIOD_QSTAT_PARSING = 15
#: How many seconds to wait before retrying to bind to the desired port
WAIT_PERIOD_HTTP_RETRY_BIND = 10
#: How many seconds to wait before cleaning up a finished job
WAIT_PERIOD_JOB_CLEANUP = 10
#: For how many seconds all inputs must be available before starting a job, to avoid file system synchronization problems
WAIT_PERIOD_MTIME_OF_INPUTS = 60

#: How long to wait for all inputs to be available in Task.run (https://github.com/rwth-i6/sisyphus/issues/159)
WAIT_PERIOD_FOR_INPUTS_AVAILABLE = 60
#: Fail when not all inputs are available after WAIT_PERIOD_FOR_INPUTS_AVAILABLE
TASK_INPUTS_MUST_BE_AVAILABLE = True

#: Set to True to automatically clean jobs in error state and retry
CLEAR_ERROR = False

#: Print error messages of a job in the manager status field
PRINT_ERROR = True
#: Print a detailed log of that many jobs in error state
PRINT_ERROR_TASKS = 1
#: Print that many last lines of the error state log file
PRINT_ERROR_LINES = 40
#: Print message for held jobs
PRINT_HOLD = True

#: Log for finished outputs
FINISHED_LOG = "log/finished.log"

#: Which command should be called to start sisyphus, can be used to replace the python binary
SIS_COMMAND = [sys.executable, sys.argv[0]]
# if the first argument is -m, the module name is missing
if sys.argv[0] == "-m":
    SIS_COMMAND += ["sisyphus"]

# Parameters to log the resources used by each task
#: Seconds between checks of how much memory and cpu a process is using
PLOGGING_INTERVAL = 5
#: Suppress messages about process resource usage
PLOGGING_QUIET = False
#: Minimal relative change between log entries of used resources
PLOGGING_MIN_CHANGE = 0.1
#: In which interval the process resource usage file should be updated
PLOGGING_UPDATE_FILE_PERIOD = 60

#: How long the virtual file system should cache process states
FILESYSTEM_CACHE_TIME = 30

#: Use ipython traceback
USE_VERBOSE_TRACEBACK = True
#: The verbose traceback type. "ipython" or "better_exchook"
VERBOSE_TRACEBACK_TYPE = "ipython"
#: Install signal handlers for debugging
USE_SIGNAL_HANDLERS = False

# Job setup options
#: Automatically set all inputs given to __init__ as attributes of the created job.
#: Disabled by default since it tends to confuse new users reading the code.
AUTO_SET_JOB_INIT_ATTRIBUTES = False

# Job environment
#: Remove all environment variables to ensure the same environment between different users
CLEANUP_ENVIRONMENT = True  # only Trump would say no!
#: Keep these environment variables if CLEANUP_ENVIRONMENT is set
DEFAULT_ENVIRONMENT_KEEP = {
    "CUDA_VISIBLE_DEVICES",
    "HOME",
    "PWD",
    "SGE_STDERR_PATH",
    "SGE_TASK_ID",
    "TMP",
    "TMPDIR",
    "USER",
}

#: Set these environment variables if CLEANUP_ENVIRONMENT is set
DEFAULT_ENVIRONMENT_SET = {
    "LANG": "en_US.UTF-8",
    "MKL_NUM_THREADS": 1,
    "OMP_NUM_THREADS": 1,
    "PATH": ":".join(
        [
            "/rbi/sge/bin",
            "/rbi/sge/bin/lx-amd64",
            "/usr/local/sbin",
            "/usr/local/bin",
            "/usr/sbin",
            "/usr/bin",
            "/sbin",
            "/bin",
            "/usr/games",
            "/usr/local/games",
            "/snap/bin",
        ]
    ),
    "SHELL": "/bin/bash",
}

#: Directory used by tk.mktemp
TMP_PREFIX = os.path.join(os.environ.get("TMPDIR", "/tmp"), "sis_")

# Visualization
#: For http visualization, list a job input as common input if it is shared between more than X*(total jobs) jobs
VIS_RELATIVE_MERGE_THRESHOLD = 0.25
#: For http visualization, list a job input as common input if it is shared between more than X jobs
VIS_ABSOLUTE_MERGE_THRESHOLD = 5
#: For http visualization, timeout to create the visual representation
VIS_TIMEOUT = 5
#: For http visualization, maximum number of nodes to show per view
VIS_MAX_NODES_PER_VIEW = 500
SHOW_VIS_NAME_IN_MANAGER = True

# Add stacktrace information with the specified depth, 0 for deactivation, None or -1 for full stack
JOB_ADD_STACKTRACE_WITH_DEPTH = 0

# Is enabled if tk.run is called
SKIP_IS_FINISHED_TIMEOUT = False

# Caching
#: If enabled, the results of finished jobs are cached in an extra file to reduce file system access
CACHE_FINISHED_RESULTS = False
#: Path used for CACHE_FINISHED_RESULTS
CACHE_FINISHED_RESULTS_PATH = "finished_results_cache.pkl"
#: Only cache results smaller than this in the central file (in bytes)
CACHE_FINISHED_RESULTS_MAX_SIZE = 1024

# Warnings
#: Warn if a config file is loaded without calling a function
WARNING_NO_FUNCTION_CALLED = True
#: Warn if an absolute path inside the current directory is created
WARNING_ABSPATH = True

# Prohibit resolving paths in graph construction
DELAYED_CHECK_FOR_WORKER = False

#: Changes repr conversions of Path to contain only the path instead of <Path /actual/path>
LEGACY_PATH_CONVERSION = False

#: Changes str and repr conversions of Variable to contain only the variable content if available.
#: Enabling it can cause bugs which are hard to find.
LEGACY_VARIABLE_CONVERSION = False

#: Raise an exception if a Variable is accessed which is not set yet and has no backup value
RAISE_VARIABLE_NOT_SET_EXCEPTION = True

# Parameters used for debugging or profiling
MEMORY_PROFILE_LOG = None

USE_UI = True

# Set to False to disable the warning about an unset engine
ENGINE_NOT_SETUP_WARNING = True

# Internal functions
ENVIRONMENT_SETTINGS = {}
ENVIRONMENT_SETTINGS_PREFIX = "SIS_"

#: Stores the content of all given settings files, allowing to log and recreate them if necessary
GLOBAL_SETTINGS_FILE_CONTENT = ""
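The constants above are what a ``settings.py`` usually overrides; a minimal example using only names defined in this module (the values themselves are arbitrary)::

    # settings.py -- sketch, values are arbitrary examples
    JOB_AUTO_CLEANUP = False
    WAIT_PERIOD_JOB_FS_SYNC = 10
    ALIAS_AND_OUTPUT_SUBDIR = "baseline"
    PRINT_ERROR_LINES = 80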
def update_global_settings_from_file(filename):
    """Loads the given settings file and updates the state of global_settings with its content

    :param str filename:
    :return: nothing
    """
    global GLOBAL_SETTINGS_FILE_CONTENT

    try:
        with open(filename, encoding="utf-8") as f:
            content = f.read()
    except FileNotFoundError as e:
        import logging

        logging.warning(f"Settings file '{filename}' does not exist, ignoring it ({e}).")
    else:
        exec(compile(content, filename, "exec"), globals())
        GLOBAL_SETTINGS_FILE_CONTENT += f"##### Settings file: {filename} #####\n{content}\n"
def update_global_settings_from_env():
    """Updates global_settings from environment variables

    :return: nothing
    """
    from ast import literal_eval

    global GLOBAL_SETTINGS_FILE_CONTENT

    content = []
    for k, v in os.environ.items():
        if k.startswith(ENVIRONMENT_SETTINGS_PREFIX):
            ENVIRONMENT_SETTINGS[k] = v
            k = k[len(ENVIRONMENT_SETTINGS_PREFIX) :]
            # Try to eval the parameter, if that is not possible use it as a string
            try:
                v = literal_eval(v)
            except Exception:
                pass
            if k == "IMPORT_PATHS" and isinstance(v, str):
                v = v.split(":")
            globals()[k] = v
            content.append(f"{k} = {repr(v)}\n")

    if content:
        GLOBAL_SETTINGS_FILE_CONTENT += "##### Settings from environment #####\n" + "".join(content)
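To illustrate the parsing above (normally these variables would be exported in the shell before ``sis`` is started; the chosen names and values are just examples)::

    # illustration only -- equivalent to exporting the variables before starting sis
    os.environ["SIS_GRAPH_WORKER"] = "8"               # literal_eval -> int 8
    os.environ["SIS_IMPORT_PATHS"] = "config:recipe/"  # stays a string, then split on ":"
    update_global_settings_from_env()
    assert GRAPH_WORKER == 8 and IMPORT_PATHS == ["config", "recipe/"]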
GLOBAL_SETTINGS_FILE = os.environ.get(
    ENVIRONMENT_SETTINGS_PREFIX + "GLOBAL_SETTINGS_FILE", GLOBAL_SETTINGS_FILE_DEFAULT
)

for settings_file in GLOBAL_SETTINGS_FILE.split(":"):
    if settings_file:
        update_global_settings_from_file(settings_file)

update_global_settings_from_env()

if AUTO_SET_JOB_INIT_ATTRIBUTES:
    logging.warning(
        "AUTO_SET_JOB_INIT_ATTRIBUTES is deprecated, please set the attributes manually "
        "you might want to use self.set_attrs(locals())"
    )