API

Toolkit

This module contains helper methods used in the console or in a Job. Use tk.<name of function>? for more help.

Useful examples:

# Find job or path:
j = tk.sis_graph.find('LineSplitter')
# Find only job:
j = tk.sis_graph.find('LineSplitter', mode='job')
# Find only path:
j = tk.sis_graph.find('LineSplitter', mode='path')

# Rerun tasks depending on a given file/job:
tk.remove_job_and_descendants(tk.sis_graph.find('hitchhiker'))

# Setup job:
tk.setup_job_directory(j)
# run job:
tk.run_job(j)

# Reload start up config:
tk.reload_config(config_files)
# Reload config file:
tk.reload_config('path/to/config/file/or/directory')
# Reload all recipe files
tk.reload_recipes()
# Load job from job directory:
tk.load_job('path/to/job/directory')

# Import jobs from other work directory
tk.import_work_directory(['path/to/other/work'], mode='copy')

# Print short job summary
tk.job_info(j)

# List functions to cleanup work directory:
tk.cleaner?
exception sisyphus.toolkit.BlockedWorkflow[source]
class sisyphus.toolkit.Object[source]

Simple helper class for creating objects without writing extra code

class sisyphus.toolkit.RelPath(origin, hash_overwrite=None)[source]

Creates an object that, when called, creates a Path object relative to the current module

sisyphus.toolkit.bundle_to_str(bundle)[source]

Convert a bundle of objects into a space-separated string

sisyphus.toolkit.cached_engine(cache=[])[source]

Returns a cached engine, for internal usage

sisyphus.toolkit.compare_graph(obj1, obj2, traceback=None, visited=None)[source]

Compares two objects and shows a traceback to the first difference found

Parameters:
  • obj1 (Job/Path) – first object to compare

  • obj2 (Job/Path) – second object, compared against obj1

  • traceback – used for recursion, leave blank

  • visited – used for recursion, leave blank

Returns:

traceback

sisyphus.toolkit.dump(obj: Any, filename: str)[source]

Dumps an object using pickle into a zipped file; creates the directory if needed

Parameters:
  • obj – object to pickle

  • filename (str) – path to pickled file

sisyphus.toolkit.export_graph(output_file: str | None = None)[source]

Needs more testing

Parameters:

output_file

Returns:

sisyphus.toolkit.import_work_directory(directories: str | List[str], mode='dryrun', use_alias=False)[source]

Link or copy finished jobs from other work directories.

Parameters:
  • directories (str|List[str]) – paths to the other work directories

  • mode (str) – How to import job directories. Options: (copy, symlink, hardlink, dryrun)

sisyphus.toolkit.input_path(path: Path | str) Path[source]

Ensures a given input is a Path. Strings are automatically converted into Path objects

Parameters:

path – path that should be checked

Returns:

Path object

sisyphus.toolkit.job_info(job: Job)[source]

Prints information about given job to stdout

Parameters:

job (Job)

sisyphus.toolkit.load_file(path: str) Any[source]

Load object from pickled file, works with zipped and unzipped files

Parameters:

path (str) – path to pickled file

Returns:

unpickled object
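
A minimal round-trip sketch using dump and load_file; the object and file name are illustrative:

import sisyphus.toolkit as tk

# dump pickles and compresses the object, creating parent directories if needed
tk.dump({'bleu': 27.3}, 'stats/scores.pkl.gz')
# load_file handles both zipped and unzipped pickle files
scores = tk.load_file('stats/scores.pkl.gz')
assert scores['bleu'] == 27.3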

sisyphus.toolkit.load_job(path: str) Job[source]

Load job from job directory even if it is already cleaned up

Parameters:

path (str) – path to job directory

Return (Job):

sisyphus.toolkit.migrate_graph(input_file=None, work_source=None, mode='dryrun')[source]

Migrate the graph from the provided graph file to the current graph

Parameters:
  • input_file (str) – path to the graph file

  • work_source (str|None) – path to the work folder, if None use the local work folder

  • mode (str) – dryrun, link, copy, move, move_and_link, hardlink_or_copy, hardlink_or_link, the default is dryrun

Returns:

class sisyphus.toolkit.mktemp[source]

Object to be used with the with statement. Creates a temporary file that will be deleted at exit. Can be used like this:

with mktemp() as temp:
    # do stuff with temp
    with open(temp, 'w') as f:
        f.write('foo')

    with open(temp, 'r') as f:
        foo = f.read()
# temp file is deleted
sisyphus.toolkit.register_output(name, value, export_graph=False)[source]
Parameters:
  • name (str)

  • value (Path)

  • export_graph (bool)
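
register_output registers the given Path as an output target under the given name. A short usage sketch (score_job is an illustrative job object):

# make the job's output available as a named target in the output directory
tk.register_output('eval/bleu_score', score_job.out)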

sisyphus.toolkit.reload_module(module)[source]

Shortcut to reload a module; keeps sis_graph if the toolkit itself is reloaded

Parameters:

module – Module to reload

Returns:

sisyphus.toolkit.reload_recipes()[source]

Reload all recipes

sisyphus.toolkit.remove_job_and_descendants(jobs: str | AbstractPath | Job | List[str | AbstractPath | Job], mode: str = 'remove') bool[source]

Remove all jobs that depend on the given jobs/paths.

Parameters:
  • jobs (List[Job|Path]) – these jobs and all jobs that depend on them will be removed

  • mode (string) – run mode (remove, move, dryrun)

sisyphus.toolkit.replace_graph_objects(current, mapping=None, replace_function=None)[source]

This function takes a given graph and creates a new graph where every object listed in mapping is replaced.

Parameters:
  • current – current graph

  • mapping – list of replacements: [(old_object, new_object), …]

  • replace_function – how an object will be replaced; defaults to using the mapping

Returns:

New graph

sisyphus.toolkit.run(obj: Any, quiet: bool = False)[source]

Set up and run all jobs contained inside the given object, as well as all jobs they depend on.

Parameters:
  • obj

  • quiet – Do not forward job output to stdout

Returns:

sisyphus.toolkit.run_job(job: Job, task_name: str | None = None, task_id: int = 1, force_resume: bool = False)[source]

Run job directly in console window.

Parameters:
  • job (Job) – Job with tasks to run

  • task_name (str) – which task should run, default: the first listed task

  • task_id (int) – which task_id should be used, default: 1

  • force_resume (bool) – Force resume of job in error state

sisyphus.toolkit.running_in_worker()[source]

Returns True if this code is run inside the worker

sisyphus.toolkit.setup_job_directory(job: Job)[source]

Setup the work directory of the given job.

Parameters:

job (Job|Path) – Job whose work directory should be set up

sisyphus.toolkit.setup_path(package: str) RelPath[source]

Should be called like `rel_path = setup_path(__package__)`, which sets up a RelPath to create Path objects relative to the current module.

Parameters:

package (str)

Return type:

RelPath
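
A short sketch of the resulting RelPath in use (the file name is illustrative):

# in recipe/eval/__init__.py
rel_path = setup_path(__package__)
# yields a Path pointing to a file next to the current module
scorer = rel_path('scripts/score.py')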

sisyphus.toolkit.setup_script_mode()[source]

Use this function if you start sisyphus from a recipe file. It will:

  1. set up the logging level and prompt

  2. disable the wait periods

  3. disable unwanted warnings

You can run recipes directly by running something similar to this:

export SIS_RECIPE_PATH=/PATH/TO/RECIPE/DIR
# If sisyphus is not installed in your python path
export PYTHONPATH=/PATH/TO/SISYPHUS:$PYTHONPATH
# If you want to change the work directory:
export SIS_WORK_DIR=/PATH/TO/WORK/DIR
python3 $SIS_RECIPE_PATH/recipe/path_to_file script parameters

An example for the recipe:

import os
import argparse
from sisyphus import *
from recipe.eval import bleu

if __name__ == '__main__':
    tk.setup_script_mode()

    parser = argparse.ArgumentParser(description='Evaluate hypothesis')
    parser.add_argument('--hyp', help='hypothesis', required=True)
    parser.add_argument('--ref', help='reference', required=True)

    args = parser.parse_args()
    hyp = os.path.realpath(args.hyp)
    ref = os.path.realpath(args.ref)

    score = bleu(hyp, ref)

    tk.run(score, quiet=True)
    print(score.out)
sisyphus.toolkit.show_jobs_in_webserver(port: int, jobs: List[Job])[source]

Start a web server on the given port that displays the given loaded jobs

sisyphus.toolkit.start_manager(job_engine=None, start_computations=False)[source]

Shortcut to start Manager

Parameters:
  • job_engine – Use this job engine; initializes its own job engine if set to None

  • start_computations – Submit jobs directly

Returns:

Manager

sisyphus.toolkit.uncached_path(path)[source]
Parameters:

path (Path|str)

Return type:

str

sisyphus.toolkit.zipped(filename: Path | str) bool[source]

Check if given file is zipped

Parameters:

filename (Path/str) – File to be checked

Return (bool):

True if input file is zipped

Graph

class sisyphus.graph.SISGraph[source]

This graph contains all targets that need to be calculated and, through their dependencies, all required jobs. These jobs can be searched and modified using the provided functions. The most interesting functions are:

# Lists all jobs
jobs()
# Find jobs by matching substring
find(pattern)
# Execute function for all nodes
for_all_nodes(f)
# Dictionary with jobs grouped by current status:
get_jobs_by_status()
add_target(target)[source]
Parameters:

target (OutputTarget)

find(pattern, mode='all')[source]

Returns a list with all jobs and paths that partially match the pattern

Parameters:
  • pattern (str) – Pattern to match

  • mode (str) – Select if jobs, paths or both should be returned. Possible values: all, path, job

Return ([Job/Path, …]):

List with all matching jobs/paths

for_all_nodes(f, nodes=None, bottom_up=False)[source]

Run function f for each node and its ancestors, from the top down; stop expanding a tree branch if the function returns False. Does not stop on None, so functions with no return value run for every node.

Parameters:
  • f ((Job)->bool) – function will be executed for all nodes

  • nodes – all nodes that will be checked, defaults to all output nodes in graph

  • bottom_up (bool) – start with deepest nodes first, ignore return value of f

Returns:

set with all visited nodes
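
A sketch of using for_all_nodes to collect matching jobs (the substring is illustrative):

found = []

def collect(job):
    if 'LineSplitter' in job.job_id():
        found.append(job)
    return True  # returning False would stop expanding this branch

visited = tk.sis_graph.for_all_nodes(collect)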

get_job_from_path(path)[source]

The reverse function for get_path_to_all_nodes

get_jobs_by_status(nodes: List | None = None, engine: Optional = None, skip_finished: bool = False) DefaultDict[str, List[Job]][source]

Return all jobs needed to finish the outputs, in a dictionary with the current status as key

Parameters:
  • nodes – all nodes that will be checked, defaults to all output nodes in graph

  • engine (sisyphus.engine.EngineBase) – Use the job status from this engine, ignore the engine status if set to None (default: None)

  • skip_finished (bool) – Stop checking subtrees of finished nodes to save time

Return ({status1: [Job, …], status2: …}):

Dictionary with all jobs sorted by current state

Return type:

dict[str,list[Job]]
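
A usage sketch; 'error' stands for whichever status key is of interest:

by_status = tk.sis_graph.get_jobs_by_status()
for job in by_status.get('error', []):
    print(job.job_id())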

jobs()[source]
Return ([Job, …]):

List with all jobs in graph

jobs_sorted()[source]

Yields jobs in an order such that, for each job, all jobs it depends on have already been yielded

Return (generator Node):

jobs sorted by dependency

property output

Deprecated: kept for backwards compatibility, only supports path outputs

set_job_targets(engine=None)[source]

Add a target to all jobs (if possible) to have a more informative output

property targets_dict
Returns:

dict name -> target

Return type:

dict[str,OutputTarget]

update_nodes()[source]

Update all nodes to get the most current dependency graph

Job

class sisyphus.job.Job(*args, **kwargs)[source]

Object to hold the job descriptions.

You derive your own job classes from this base class.

All the arguments of __init__ will be taken into account for the hash. In your derived class, you need to overwrite the tasks method.
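
A minimal sketch of a derived job (names are illustrative):

from sisyphus import Job, Task

class WordCountJob(Job):
    def __init__(self, text_file):
        # all __init__ arguments are taken into account for the hash
        self.text_file = text_file
        self.out = self.output_path('word_count')

    def tasks(self):
        # overwrite tasks() to describe what should run
        yield Task('run', mini_task=True)

    def run(self):
        # {text_file} and {out} are replaced with the job's attributes, see sh() below
        self.sh('wc -w {text_file} > {out}')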

add_input(path)[source]
Parameters:

path (AbstractPath)

Returns:

path

Return type:

AbstractPath

classmethod hash(parsed_args)[source]
Parameters:

parsed_args (dict[str])

Returns:

hash for job given the arguments

Return type:

str

hold()[source]

A job set to hold will not be started, but all required jobs will be run.

info()[source]

Returns information about the currently running job to be displayed on the web interface and the manager view.

Returns:

string to be displayed, or None if not available

Return type:

str

job_id()[source]

Returns a unique string to identify this job

keep_value(value=None)[source]

Return keep_value; if value is given, also set the keep value

output_path(filename, directory=False, cached=False)[source]

Adds an output path; if directory is True, a directory will be created automatically.

Parameters:
  • filename (str)

  • directory (bool)

  • cached (bool)

Return type:

Path

output_var(filename, pickle=False, backup=None)[source]

Adds an output path which contains a Python object

path_available(path)[source]

Returns True if the given path is available

Parameters:

path – path to check

Returns:

set_attrs(attrs)[source]

Adds all attrs to self; used in the constructor, e.g.: self.set_attrs(locals())

set_default(name, value)[source]

Deprecated helper function, will be removed in the future. Don’t use it!

set_env(key: str, value: str, *, verbatim: bool = True)[source]

Set environment variable. This environment var will be set at job startup in the worker.

Parameters:
  • key – variable name

  • value

  • verbatim – True: set it as-is; False: use string.Template(value).substitute(orig_env)
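
A short sketch (the variable names are illustrative):

# set as-is
self.set_env('OMP_NUM_THREADS', '4')
# substitute from the original environment via string.Template
self.set_env('TOOL_HOME', '${HOME}/tools', verbatim=False)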

set_keep_value(value)[source]

Set keep value and return self

set_rqmt(task_name, rqmt)[source]

Overwrites the given requirements for this job

Parameters:
  • task_name (str) – Which task will be affected

  • rqmt – the new requirements

Returns:

sh(command, *args, **kwargs)[source]

Calls an external shell, replacing {arg} placeholders with the job's inputs, outputs and args, and executes the command
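
For example, inside a job with attributes text_file and out (illustrative names):

self.sh('sort {text_file} > {out}')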

tasks() Iterator[Task][source]
Returns:

yields Tasks

Return type:

Iterator[sisyphus.task.Task]

update()[source]

Run after all inputs are computed, allowing the job to analyse the given input and ask for additional inputs before running.

update_rqmt(task_name, rqmt)[source]

Updates the given requirements for this job, values not set in rqmt will not be affected.

Parameters:
  • task_name (str) – Which task will be affected

  • rqmt – the new requirements

Returns:

Task

class sisyphus.task.Task(start, resume=None, rqmt=None, args=None, mini_task=False, update_rqmt=None, parallel=0, tries=1, continuable=False)[source]

Object holding the information on which function should be run and with which requirements.
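
A sketch of constructing a Task inside a job's tasks() method; the rqmt keys follow the engine examples further down:

def tasks(self):
    # 'run' names the job method to execute; rqmt requests resources from the engine
    yield Task('run', rqmt={'cpu': 1, 'mem': 4, 'time': 2})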

Settings

These settings can be overwritten via a settings.py file in the current directory when sisyphus is run.
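
A minimal settings.py sketch overriding some of the values listed below:

# settings.py
WAIT_PERIOD_BETWEEN_CHECKS = 10  # check for finished jobs more often
JOB_AUTO_CLEANUP = False         # keep job directories after jobs finish
PRINT_ERROR_LINES = 80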

sisyphus.global_settings.ALIAS_AND_OUTPUT_SUBDIR = ''

If set to a non-empty string aliases and outputs will be placed in a subdir. This is useful for setups with multiple configs

sisyphus.global_settings.ALIAS_DIR = 'alias'

Name of the alias directory

sisyphus.global_settings.AUTO_SET_JOB_INIT_ATTRIBUTES = False

Automatically set all input given to __init__ as attributes of the created job. Disabled by default since it tends to confuse new users reading the code.

sisyphus.global_settings.CACHE_FINISHED_RESULTS = False

If enabled, the results of finished jobs are cached in an extra file to reduce file system access

sisyphus.global_settings.CACHE_FINISHED_RESULTS_MAX_SIZE = 1024

Only cache results smaller than this (in bytes) in the central file

sisyphus.global_settings.CACHE_FINISHED_RESULTS_PATH = 'finished_results_cache.pkl'

Path used for CACHE_FINISHED_RESULTS

sisyphus.global_settings.CLEANUP_ENVIRONMENT = True

Remove all environment variables to ensure the same environment between different users

sisyphus.global_settings.CLEAR_ERROR = False

Set to True to automatically clear jobs in error state and retry

sisyphus.global_settings.CONFIG_FUNCTION_DEFAULT = 'config.main'

Name of default function to call in config directory

sisyphus.global_settings.DEFAULT_ENVIRONMENT_KEEP = {'CUDA_VISIBLE_DEVICES', 'HOME', 'PWD', 'SGE_STDERR_PATH', 'SGE_TASK_ID', 'TMP', 'TMPDIR', 'USER'}

Keep these environment variables if CLEANUP_ENVIRONMENT is set

sisyphus.global_settings.DEFAULT_ENVIRONMENT_SET = {'LANG': 'en_US.UTF-8', 'MKL_NUM_THREADS': 1, 'OMP_NUM_THREADS': 1, 'PATH': '/rbi/sge/bin:/rbi/sge/bin/lx-amd64:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin', 'SHELL': '/bin/bash'}

Set these environment variables if CLEANUP_ENVIRONMENT is set

sisyphus.global_settings.FILESYSTEM_CACHE_TIME = 30

How long the virtual file system should cache process states

sisyphus.global_settings.FINISHED_LOG = 'log/finished.log'

Log for finished outputs.

sisyphus.global_settings.GLOBAL_SETTINGS_FILE_CONTENT = ''

Stores the content of all given settings files, allowing them to be logged and recreated if necessary

sisyphus.global_settings.GRAPH_WORKER = 16

How many threads should update the graph in parallel, useful if the filesystem has a high latency

sisyphus.global_settings.IMPORT_PATHS = ['config', 'recipe', 'recipe/']

List of paths searched for loading config and recipe files. The module name should be part of the path, e.g. adding ‘config’ will cause Sisyphus to search the current directory for a folder named config and load modules starting with config; other python files in the current directory will be ignored. If the path ends with ‘/’, everything inside it will be loaded, similar to adding it to PYTHONPATH. Keep ‘recipe’ for legacy setups.

sisyphus.global_settings.JOB_AUTO_CLEANUP = True

Automatically clean up job directory after job has finished

sisyphus.global_settings.JOB_CLEANER_INTERVAL = 60

How often to check for finished jobs in seconds

sisyphus.global_settings.JOB_CLEANER_WORKER = 5

How many threads should be cleaning in parallel

sisyphus.global_settings.JOB_CLEANUP_KEEP_WORK = False

Whether the job-internal work directory should be kept or deleted during cleanup

sisyphus.global_settings.JOB_DEFAULT_KEEP_VALUE = 50

Default value for job used by tk.cleaner to determine if a job should be removed or not

sisyphus.global_settings.JOB_MAX_NUMBER_OF_LOCKS = 100

How many locks can be used by all jobs (one lock per job). If there are more jobs than locks, locks are reused. This could lead to a slowdown, but keeps the number of locks per process limited.

sisyphus.global_settings.LEGACY_PATH_CONVERSION = False

Changes repr conversions of Path to contain only the path instead of <Path /actual/path>.

sisyphus.global_settings.LEGACY_VARIABLE_CONVERSION = False

Changes str and repr conversions of Variable to contain only the variable content if available. Enabling it can cause bugs which are hard to find.

sisyphus.global_settings.MANAGER_SUBMIT_WORKER = 10

How many threads are used to setup the job directory and submit jobs

sisyphus.global_settings.MAX_SUBMIT_RETRIES = 3

How often sisyphus will try to resubmit a task to the engine before returning a RETRY_ERROR

sisyphus.global_settings.OUTPUT_DIR = 'output'

Name of the output directory

sisyphus.global_settings.PLOGGING_INTERVAL = 5

Seconds between checks of how much memory and cpu a process is using

sisyphus.global_settings.PLOGGING_MIN_CHANGE = 0.1

Minimal relative change between log entries of used resources

sisyphus.global_settings.PLOGGING_QUIET = False

Suppress messages about process resources usage

sisyphus.global_settings.PLOGGING_UPDATE_FILE_PERIOD = 60

Interval in which the file with the used process resources should be updated

sisyphus.global_settings.PRINT_ERROR = True

Print error messages of a job in the manager status field

sisyphus.global_settings.PRINT_ERROR_LINES = 40

Print this many last lines of the error state log file

sisyphus.global_settings.PRINT_ERROR_TASKS = 1

Print a detailed log for this many jobs in error state

sisyphus.global_settings.PRINT_HOLD = True

Print message for held jobs

sisyphus.global_settings.RAISE_VARIABLE_NOT_SET_EXCEPTION = True

Raise an exception if a Variable is accessed which is not set yet and has no backup value

sisyphus.global_settings.SHOW_JOB_TARGETS = True

Show job targets on status screen, can significantly slow down startup time if many outputs are used

sisyphus.global_settings.SIS_COMMAND

Which command should be called to start sisyphus, can be used to replace the python binary

sisyphus.global_settings.SIS_HASH(obj, length=12, chars='0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz')

Default function to hash jobs and objects

sisyphus.global_settings.TASK_INPUTS_MUST_BE_AVAILABLE = True

Fail when not all inputs are available after WAIT_PERIOD_FOR_INPUTS_AVAILABLE

sisyphus.global_settings.TMP_PREFIX = '/tmp/sis_'

Directory used by tk.mktemp

sisyphus.global_settings.USE_SIGNAL_HANDLERS = False

Install signal handlers for debugging

sisyphus.global_settings.USE_VERBOSE_TRACEBACK = True

Use ipython traceback

sisyphus.global_settings.VERBOSE_TRACEBACK_TYPE = 'ipython'

The verbose traceback type. “ipython” or “better_exchook”

sisyphus.global_settings.VIS_ABSOLUTE_MERGE_THRESHOLD = 5

For http visualization, list a job input as common input if it is shared between more than X jobs

sisyphus.global_settings.VIS_MAX_NODES_PER_VIEW = 500

For http visualization, maximum number of nodes to show per view

sisyphus.global_settings.VIS_RELATIVE_MERGE_THRESHOLD = 0.25

For http visualization, list a job input as common input if it is shared between more than X*(total jobs) jobs

sisyphus.global_settings.VIS_TIMEOUT = 5

For http visualization, timeout for creating the visual representation

sisyphus.global_settings.WAIT_PERIOD_BETWEEN_CHECKS = 30

How often the manager should check for finished jobs

sisyphus.global_settings.WAIT_PERIOD_CACHE = 20

Safety period to wait for actionable jobs to change status before running action

sisyphus.global_settings.WAIT_PERIOD_FOR_INPUTS_AVAILABLE = 60

How long to wait for all inputs to be available in Task.run (https://github.com/rwth-i6/sisyphus/issues/159)

sisyphus.global_settings.WAIT_PERIOD_HTTP_RETRY_BIND = 10

How many seconds should be waited before retrying to bind to the desired port

sisyphus.global_settings.WAIT_PERIOD_JOB_CLEANUP = 10

How many seconds should be waited before cleaning up a finished job

sisyphus.global_settings.WAIT_PERIOD_JOB_FS_SYNC = 30

How many seconds should be waited before assuming a job is finished after the finished file is written, to allow the network file system to sync up

sisyphus.global_settings.WAIT_PERIOD_MTIME_OF_INPUTS = 60

How many seconds should all inputs be available before starting a job to avoid file system synchronization problems

sisyphus.global_settings.WAIT_PERIOD_QSTAT_PARSING = 15

How many seconds should be waited before retrying to parse a failed qstat output

sisyphus.global_settings.WAIT_PERIOD_SSH_TIMEOUT = 15

How many seconds should be waited before retrying an ssh connection

sisyphus.global_settings.WARNING_ABSPATH = True

Warn if an absolute path inside the current directory is created

sisyphus.global_settings.WARNING_NO_FUNCTION_CALLED = True

Warn if a config file is loaded without calling a function

sisyphus.global_settings.WORK_DIR = 'work'

The work directory

sisyphus.global_settings.check_engine_limits(current_rqmt: Dict, task)[source]

Check if the requested requirements exceed any hardware limits and reduce them accordingly. Ignored by default; a possible check for limits could look like this:

current_rqmt['time'] = min(168, current_rqmt.get('time', 2))
if current_rqmt['time'] > 24:
    current_rqmt['mem'] = min(63, current_rqmt['mem'])
else:
    current_rqmt['mem'] = min(127, current_rqmt['mem'])
return current_rqmt
Parameters:
  • current_rqmt (dict[str]) – requirements currently requested

  • task (sisyphus.task.Task) – task that is handled

Returns:

requirements updated to engine limits

Return type:

dict[str]

sisyphus.global_settings.engine()[source]

Create engine object used to submit jobs. The simplest setup just creates a local engine starting all jobs on the local machine e.g.:

from sisyphus.localengine import LocalEngine
return LocalEngine(cpu=8)

The usually recommended setup uses a local engine together with a normal grid engine. The EngineSelector can be used to schedule tasks on different engines. The main intuition was to have an engine for very small jobs that don’t need to be scheduled on a large grid engine (e.g. counting the lines of a file). A setup using the EngineSelector would look like this:

from sisyphus.localengine import LocalEngine
from sisyphus.engine import EngineSelector
from sisyphus.son_of_grid_engine import SonOfGridEngine
return EngineSelector(engines={'short': LocalEngine(cpu=4),
                               'long': SonOfGridEngine(default_rqmt={'cpu': 1, 'mem': 1,
                                                                     'gpu': 0, 'time': 1})},
                      default_engine='long')

Note: the engines should only be imported locally inside the function to avoid circular imports

Returns:

engine (EngineBase)

sisyphus.global_settings.file_caching(path)[source]

This function can be replaced to enable file caching, e.g. copy the given file to /var/tmp and return the new path. The default behaviour is to just pass on the given path.

Parameters:

path (str) – Path to file that should be cached

Returns:

path to cached file

Return type:

str
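
A sketch of a possible replacement that caches files in node-local storage, as suggested above (the cache directory is illustrative):

import os
import shutil

def file_caching(path):
    # derive a flat name under /var/tmp and copy the file only once
    cached = os.path.join('/var/tmp', path.lstrip('/').replace('/', '_'))
    if not os.path.exists(cached):
        shutil.copy(path, cached)
    return cached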

sisyphus.global_settings.update_engine_rqmt(last_rqmt: Dict, last_usage: Dict)[source]

Update requirements after a job got interrupted, double limits if needed

Parameters:
  • last_rqmt (dict[str]) – requirements that were requested for the previous run of this task

  • last_usage (dict[str]) – information about the used resources of previous run (mainly memory and time)

Returns:

updated requirements

Return type:

dict[str]

sisyphus.global_settings.update_global_settings_from_env()[source]

Updates global_settings from environment variables.

sisyphus.global_settings.update_global_settings_from_file(filename)[source]

Loads a settings file and updates the state of global_settings with its content

Parameters:

filename (str)

Returns:

nothing

sisyphus.global_settings.worker_wrapper(job, task_name, call)[source]

All worker calls are passed through this function. This can be used, for example, to run the worker in a singularity environment:

def worker_wrapper(job, task_name, call):
    return ['singularity_call'] + call

Engines

class sisyphus.engine.EngineBase[source]

An engine manages the execution of jobs, e.g. locally, or in a queuing system like SGE.

get_job_used_resources(current_process)[source]

Should be overwritten by subclass if a better way to measure the used resources is available, e.g. cgroups. This function should only be used by the worker.

Parameters:
  • current_process (psutil.Process)

get_rqmt(task, task_id, update=True)[source]

Get the requirements submitted for this task

static get_task_id(task_id)[source]

Gets task id either from args or the environment

init_worker(task)[source]

This method will be called before the task is started by the worker; e.g. SGE uses this method to link the SGE log file to the desired position.

Parameters:

task

Returns:

job_state(job)[source]

Return current state of job

submit(task)[source]

Prepares all relevant commands and calls submit_call of the subclass to actually pass the job to the relevant engine

Parameters:

task (sisyphus.task.Task) – Task to submit

Returns:

None

submit_call(call, logpath, rqmt, name, task_name, task_ids)[source]
Parameters:
  • call (list[str])

  • logpath (str)

  • rqmt (dict[str])

  • name (str)

  • task_name (str)

  • task_ids (list[int])

Returns:

ENGINE_NAME, submitted (list of (list of task ids, job id))

Return type:

(str, list[(list[int],str|int)])

class sisyphus.engine.EngineSelector(engines, default_engine)[source]

The EngineSelector engine wraps multiple other engines and schedules the jobs according to the requirements (rqmt).

Tasks with mini_task=True will use the engine selector ‘short’, so usually that engine should be specified as well.

for_all_engines(f)[source]

Run the given function f on all engines

get_job_used_resources(current_process)[source]

Should be overwritten by subclass if a better way to measure the used resources is available, e.g. cgroups. This function should only be used by the worker.

Parameters:
  • current_process (psutil.Process)

get_used_engine(engine_selector)[source]
Parameters:

engine_selector (str) – name in self.engines

Return type:

EngineBase

get_used_engine_by_rqmt(rqmt)[source]
Parameters:

rqmt (dict[str])

Return type:

EngineBase

stop_engine()[source]

Tell all engines to stop

submit_call(call, logpath, rqmt, name, task_name, task_ids)[source]
Parameters:
  • call (list[str])

  • logpath (str)

  • rqmt (dict[str])

  • name (str)

  • task_name (str)

  • task_ids (list[int])

Returns:

ENGINE_NAME, submitted (list of (list of task ids, job id))

Return type:

(str, list[(list[int],str|int)])

task_state(task, task_id)[source]

Return state of task

class sisyphus.localengine.LocalEngine(cpus=1, gpus=0, available_gpus='', **kwargs)[source]

Simple engine to execute tasks locally. CPU and GPU are always checked; all other requirements only if given during initialisation.

get_task_id(task_id)[source]

Gets task id either from args or the environment

run(**kwargs)

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

start_task(task, selected_gpus)[source]
Return type:

psutil.Process

submit_call(call, logpath, rqmt, name, task_name, task_ids)[source]
Parameters:
  • call (list[str])

  • logpath (str)

  • rqmt (dict[str])

  • name (str)

  • task_name (str)

  • task_ids (list[int])

Returns:

ENGINE_NAME, submitted (list of (list of task ids, job id))

Return type:

(str, list[(list[int],str|int)])

class sisyphus.localengine.TaskQueueInstance(call, logpath, rqmt, name, task_name, task_id)
call

Alias for field number 0

logpath

Alias for field number 1

name

Alias for field number 3

rqmt

Alias for field number 2

task_id

Alias for field number 5

task_name

Alias for field number 4

sisyphus.localengine.run_task(call, logpath)[source]

Simple function to run a task

class sisyphus.localengine.sync_object(obj)[source]

Object to be used with the with statement to sync an object via a queue, e.g.:

self.sobj = sync_object({})
with self.sobj as sobj:
    sobj[7] = 9

# other process
with self.sobj as sobj:
    assert( sobj == { 7: 9 } )
class sisyphus.son_of_grid_engine.SonOfGridEngine(default_rqmt, gateway=None, auto_clean_eqw=True, ignore_jobs=None, pe_name='mpi')[source]
get_logpath(logpath_base, task_name, task_id)[source]

Returns log file for the currently running task

static get_task_id(task_id)[source]

Gets task id either from args or the environment

init_worker(task)[source]

This method will be called before the task is started by the worker; e.g. SGE uses this method to link the SGE log file to the desired position.

Parameters:

task

Returns:

queue_state()[source]

Returns a list with all currently running tasks in this queue

start_engine()[source]

No starting action required with the current implementation

stop_engine()[source]

No stopping action required with the current implementation

submit_call(call, logpath, rqmt, name, task_name, task_ids)[source]
Parameters:
  • call (list[str])

  • logpath (str)

  • rqmt (dict[str])

  • name (str)

  • task_name (str)

  • task_ids (list[int])

Returns:

ENGINE_NAME, submitted (list of (list of task ids, job id))

Return type:

(str, list[(list[int],str)])

submit_helper(call, logpath, rqmt, name, task_name, start_id, end_id, step_size)[source]
Parameters:
  • call (list[str])

  • logpath (str)

  • rqmt (dict[str])

  • name (str)

  • task_name (str)

  • start_id (int)

  • end_id (int)

  • step_size (int)

Return type:

str|None

system_call(command, send_to_stdin=None)[source]
Parameters:
  • command (list[str]) – qsub command

  • send_to_stdin (str|None) – shell code, e.g. the command itself to execute

Returns:

stdout, stderr, retval

Return type:

list[bytes], list[bytes], int

task_state(task, task_id)[source]

Return task state:
  • ‘r’ == STATE_RUNNING

  • ‘qw’ == STATE_QUEUE

  • not found == STATE_UNKNOWN

  • everything else == STATE_QUEUE_ERROR

class sisyphus.son_of_grid_engine.TaskInfo(job_id, task_id, state)
job_id

Alias for field number 0

state

Alias for field number 2

task_id

Alias for field number 1

sisyphus.son_of_grid_engine.escape_name(name)[source]
Parameters:

name (str)

Return type:

str

sisyphus.son_of_grid_engine.try_to_multiply(y, x, backup_value=None)[source]

Tries to convert y to float, multiply it by x, and convert the result back to a rounded string. Returns backup_value if this fails; returns y if backup_value is None.

Parameters:
  • y (str)

  • x (int|float)

  • backup_value (str|None)

Return type:

str

class sisyphus.load_sharing_facility_engine.LoadSharingFacilityEngine(default_rqmt, gateway=None, auto_clean_eqw=True)[source]
static get_logpath(logpath_base, task_name, task_id, engine_selector=None)[source]

Returns log file for the currently running task

get_task_id(task_id)[source]

Gets task id either from args or the environment

queue_state()[source]

Returns a list with all currently running tasks in this queue

start_engine()[source]

No starting action required with the current implementation

stop_engine()[source]

No stopping action required with the current implementation

submit_call(call, logpath, rqmt, name, task_name, task_ids)[source]
Parameters:
  • call (list[str])

  • logpath (str)

  • rqmt (dict[str])

  • name (str)

  • task_name (str)

  • task_ids (list[int])

Returns:

ENGINE_NAME, submitted (list of (list of task ids, job id))

Return type:

(str, list[(list[int],str|int)])

task_state(task, task_id)[source]

Return task state:
  • ‘RUN’, ‘PROV’ == STATE_RUNNING

  • ‘PEND’, ‘WAIT’ == STATE_QUEUE

  • not found == STATE_UNKNOWN

  • everything else == STATE_QUEUE_ERROR

class sisyphus.load_sharing_facility_engine.TaskInfo(job_id, task_id, state)
job_id

Alias for field number 0

state

Alias for field number 2

task_id

Alias for field number 1

sisyphus.load_sharing_facility_engine.try_to_multiply(y, x, backup_value=None)[source]

Tries to convert y to float, multiply it by x, and convert the result back to a rounded string. Returns backup_value if this fails; returns y if backup_value is None.