Welcome to Sisyphus’s documentation!

Motivation

Sisyphus was created as a framework for easily rerunning experiments. It should make it simple to come back to an old experiment and see exactly which commands were executed to get the final result. Sisyphus also provides an organized way to share a workflow, e.g. how to set up a complete translation system from start to end, or just to reuse parts of it. This is done by creating a graph which connects the outputs of jobs (calculations on some input files given some parameters) with other jobs. The connections between these jobs are either files or simple Python objects.

Installation

Sisyphus requires a Python 3.5 installation with the following additional libraries:
  • sudo pip3 install psutil
  • sudo pip3 install ipython
Optional if web interface should be used:
  • sudo pip3 install flask
Optional to compile documentation:
  • sudo pip3 install Sphinx
  • sudo pip3 install sphinx_rtd_theme
Optional if virtual file system should be used:
  • sudo pip3 install fusepy
  • sudo addgroup $USER fuse # depending on your system

QuickStart

To run Sisyphus you need to set up an experiment folder that contains all needed files (see Structure). An example directory is given in the example folder. To start this toy setup, run:

../sis manager

You will get something similar to:

[2018-06-15 16:31:50,488] INFO: Add target result to jobs (used for more informativ output, disable with SHOW_JOB_TARGETS=False)
[2018-06-15 16:31:50,796] INFO: runnable: Job< workdir: work/parallel/LineSpliter.AVSubx1baWqKyMx35c> <target: result>
[2018-06-15 16:31:50,796] INFO: runnable(1) waiting(1)
Print verbose overview (v), start manager (y), or exit (n)?

Start the computation by pressing y. You can stop the manager again at any time by pressing CTRL-C. Sisyphus will show you which processes are currently running. For more information about the processes, check the web interface. It can be started with the http option:

../sis manager --http 8080

This will start a local web server at the given port. Visit it by going to http://localhost:8080. Once the final output is computed it will appear in the output folder. In this example, the output is just a random text file.

Structure

A Sisyphus experiment folder consists mainly of 5 things:
  • the config.py file or config folder
  • the settings.py file
  • the recipe folder
  • the work folder
  • and the output folder (created automatically)

The recipe folder

The recipe folder contains python files which describe what commands are executed in which order and how they are linked together. A typical file starts with this line:

from sisyphus import *
which sets up the Sisyphus environment by importing/creating:
  • Job, the base class for all jobs created in a Sisyphus setup. A job takes some parameters and files as input and creates some other files as output. It represents a node in the workflow graph.
  • Task, the subelements of a Job. Each job runs one or more Tasks to create its actual outputs.
  • Path, used to reference files directly. Path objects are also created as outputs of Jobs. They can be seen as edges in the workflow graph.
  • tk, short for toolkit. Contains commands to communicate with Sisyphus.

A workflow in a Python file is created by connecting these jobs via Path objects. This is usually done by a function which serves as a template. Outputs of the workflow graph are registered with Sisyphus via the `tk.register_output('name', path)` function. These files will be linked into the output folder once the job responsible for creating them has finished.

config folder

The config folder describes which experiments should be run, e.g.:

from sisyphus import *

from recipe import tools
head = tools.Head(Path('input/file')).out
tk.register_output('head_of_input_file', head)

This imports the module tools from the recipe folder and runs the job Head with a given input file and registers the result as output.

work folder

The work folder stores all files created during the experiment. This folder should point to a directory with a lot of available space. The whole folder can be deleted after an experiment is done, since everything can be recomputed, assuming your experiments are deterministic.

settings.py

Contains all settings that should be changed globally for the whole setup, usually a description of the work engine that should be used. You can probably just copy the last one you used. A detailed overview of all settings can be found in the Settings section. Example:

def engine():
    """ Create engine object used to submit jobs. The simplest setup just creates a local
    engine starting all jobs on the local machine e.g.:

        from sisyphus.localengine import LocalEngine
        return LocalEngine(cpu=8)

    The usually recommended version is to use a local and a normal grid engine. The EngineSelector
    can be used to schedule tasks on different engines. The main intuition was to have an engine for
    very small jobs that don't need to be scheduled on a large grid engine (e.g. counting lines of a file).

    Note: the engines should only be imported locally inside the function to avoid circular imports

    :return: engine
    """
    # Example of local engine:
    from sisyphus.localengine import LocalEngine
    return LocalEngine(cpu=4)

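    # Example of how to use the engine selector; normally the 'long' engine would be a grid engine, e.g. SGE.
    # (Unreachable while the return statement above is active; keep only one of the two examples.)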
    from sisyphus.engine import EngineSelector
    from sisyphus.son_of_grid_engine import SonOfGridEngine
    return EngineSelector(engines={'short': LocalEngine(cpu=8),
                               'long': SonOfGridEngine(default_rqmt={'cpu' : 1, 'mem' : '1G', 'gpu' : 0, 'time' : 1, })},
                      default_engine='long')

# Wait this long before marking a job as finished to allow network
# filesystems to synchronize; can be reduced if only the local engine and filesystem are used.
WAIT_PERIOD_JOB_FS_SYNC = 30

# How often Sisyphus checks for finished jobs
WAIT_PERIOD_BETWEEN_CHECKS = 30

# Disable automatic job directory clean up
JOB_AUTO_CLEANUP = False

Job

Jobs are the most important objects to understand in a Sisyphus setup. A job defines an operation which creates a well-defined output given the same inputs. The outputs of a job are normally the inputs to other jobs or are defined as outputs of the Sisyphus setup. Sisyphus will automatically figure out which jobs need to be run in which order to create all requested outputs. If two jobs with exactly the same inputs are created, Sisyphus assumes they are equal, since by definition they should produce the same output. They will be grouped together and run only once, which can reduce the number of calculations dramatically. Each job gets its own clean work directory to work in and an output directory to place its finished calculations in. A simple job looks like this:

class CountVocab(Job):

    def __init__(self, text): # takes text as input parameter, all inputs for this job need to be listed in the __init__ function
        self.text = text
        self.out = self.output_path('counts.gz') # the output file of this job

    def run(self): # this function will be run by the task, see below
        # the actual bash command, everything placed in {name} will be replaced by property with the same name of this
        # object, e.g. self.name
        self.sh("zcat -f {text} | tr ' ' '\n' | awk 'NF' | sort | uniq -c | sort -g | gzip > {out}")

    def tasks(self): # function that will be called to request all tasks from this job, expects an iterable
        # request to run the function 'run', with requirements of 2GB memory and 2 hours of time.
        yield Task('run', rqmt={'mem': 2, 'time': 2})
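
The grouping of identical jobs described above can be pictured with a small stand-alone sketch. It does not use Sisyphus: `toy_job_id` and its hashing scheme are hypothetical and only illustrate why two jobs built from the same class and the same inputs can share one work directory and run once.

```python
import hashlib


def toy_job_id(class_name, **inputs):
    """Hypothetical helper: derive a stable ID from the job class and its inputs."""
    # Sort the inputs so that keyword order does not change the ID.
    description = repr(sorted(inputs.items()))
    digest = hashlib.sha256(description.encode("utf-8")).hexdigest()[:12]
    return "{}.{}".format(class_name, digest)


# Two requests with identical inputs map to the same ID.
first = toy_job_id("CountVocab", text="data/corpus.gz")
second = toy_job_id("CountVocab", text="data/corpus.gz")
other = toy_job_id("CountVocab", text="data/other.gz")

# A set keeps one entry per distinct job, so only two jobs would be scheduled.
scheduled = {first, second, other}
print(len(scheduled))
```

The work directory names in the log output above (class name followed by a short hash) follow the same general idea, although the real hash function may differ from this sketch.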

Task

A task defines which function of a job should be run with which arguments and which resources should be requested. A job can have multiple tasks. All tasks are executed one after another. A possible setup with multiple tasks is a setup task, a worker task which is run on multiple computers, and a finalize task to collect the results of all worker tasks.

sis command

Sisyphus is started by running the sis command in its folder. The main mode of this tool is sis manager, or sis m for short. It parses the config and submits the required jobs to the cluster. As long as it is running, the manager periodically checks which jobs have finished and submits all jobs that have become runnable. If you stop the manager (using Ctrl-C) no further jobs are submitted, but jobs already submitted to the cluster will continue.

FAQ

Dryrun a Job without engine

For debugging it is often annoying to submit a job to the engine just to see it crash shortly after. If the job directory has already been created, just run the command that would be executed by Sisyphus manually:

sis worker work/path/to/job name_of_method

If the job directory does not exist yet, switch into console mode to set up the job:

sis console

In the console you need to find the job and pass it to tk.run_job to automatically set it up and run it:

# find and save job:
In [1]: tk.sis_graph.find('LineSp', mode='job')
Out[1]: [Job< workdir: work/parallel/LineSpliter.AVSubx1baWqKyMx35c>]

In [2]: j = tk.sis_graph.find('LineSp', mode='job')[0]

# setup and run job:
In [3]: tk.run_job(j, 'run', 1)
# If only the job is given it will run the first task:
In [4]: tk.run_job(j)

Using a relative Path in recipe folder

It is nice to have small scripts directly next to the recipe calls. This can be easily achieved by initializing a relative path like this:

RelPath = tk.setup_path(__package__)

All Path objects created using RelPath will be relative to the current recipe file.

Remove finished jobs and their descendants

You can remove a job with all jobs depending on it from the Sisyphus console using the sisyphus.toolkit.remove_job_and_descendants() method. This is useful if a job definition changed and everything depending on it should be rerun:

# find and save job:
In [1]: tk.sis_graph.find('LineSp', mode='job')
Out[1]: [Job< workdir: work/parallel/LineSpliter.AVSubx1baWqKyMx35c>]

In [2]: jobs = tk.sis_graph.find('LineSp', mode='job')

# delete these jobs with all jobs depending on them
In [3]: tk.remove_job_and_descendants(jobs)

API

Toolkit

This module contains helper methods used in the console or in a Job. Use tk.<name of function>? for more help.

Useful examples:

# Find job or path:
j = tk.sis_graph.find('LineSplitter')
# Find only job:
j = tk.sis_graph.find('LineSplitter', mode='job')
# Find only path:
j = tk.sis_graph.find('LineSplitter', mode='path')

# Rerun tasks depending on a given file/job:
tk.remove_job_and_descendants(tk.sis_graph.find('hitchhiker'))

# Setup job:
tk.setup_job_directory(j)
# run job:
tk.run_job(j)

# Reload start up config:
tk.reload_config(config_files)
# Reload config file:
tk.reload_config('path/to/config/file/or/directory')
# Reload all recipe files
reload_recipes()
# Load job from job directory:
tk.load_job('path/to/job/directory')

# Import jobs from other work directory
tk.import_work_directory(['path/to/other/work'], mode='copy')

# Print short job summary
tk.job_info(j)

# Cleanup work directory (use with caution):
tk.cleaner(clean_job_dir=True, clean_work_dir=True, mode='remove')
exception sisyphus.toolkit.BlockedWorkflow[source]
class sisyphus.toolkit.Object[source]

Simple helper class to create Objects without adding code

class sisyphus.toolkit.RelPath(origin, hash_overwrite=None)[source]

Creates an object that will create a Path object relative to the current module if called

sisyphus.toolkit.bundle_to_str(bundle)[source]

Convert bundle of objects into a space separated list

sisyphus.toolkit.cached_engine(cache=[])[source]

Returns a cached version, for internal usage

sisyphus.toolkit.cleaner(clean_job_dir: bool = False, clean_work_dir: bool = False, mode: str = 'dryrun', keep_value: int = 0, only_remove_current_graph: bool = False)[source]

Free wasted disk space. Creates a list of all possible paths in the current setup and deletes all directories that are not part of the current graph. In addition it can clean up directories of finished jobs by deleting the work directory, zipping the log files and removing status files.

Check the keep value of each job: if the job has a lower value than the given one and is no longer needed to compute another job, it will be removed. Each job has a default value of 50.

Parameters:
  • clean_job_dir(bool) – Clean up job directories by zipping as much as possible into a tar archive, also delete the work directory (depending on global setting) and remove status files. Set mode to ‘remove’ for cleaning.
  • clean_work_dir(bool) – Scan the work directory for files and directories not part of the graph
  • mode(str) – Possible values: dryrun, move, remove
  • keep_value(int) – Delete all jobs with a lower value.
  • only_remove_current_graph(bool) – Only remove files from the current graph.
sisyphus.toolkit.compare_graph(obj1, obj2, traceback=None, visited=None)[source]

Compares two objects and shows traceback to first found difference

Parameters:
  • obj1 (Job/Path) – Object1 to compare
  • obj2 (Job/Path) – Object2 which is compared to Object1
  • traceback – Used for recursion, leave blank
  • visited – Used for recursion, leave blank
Returns:

traceback

sisyphus.toolkit.dump(obj: Any, filename: str)[source]

Dumps object using pickle in zipped file, creates directory if needed

Parameters:
  • obj – Object to pickle
  • filename (str) – Path to pickled file
sisyphus.toolkit.export_graph(output_file: Optional[str] = None)[source]

Needs more testing

Parameters:output_file
Returns:
sisyphus.toolkit.import_work_directory(directories: Union[str, List[str]], mode='dryrun')[source]

Link or copy finished jobs from other work directories.

Parameters:
  • directories (str) – Path to other work directories
  • mode (str) – How to import job directories. Options: (copy, symlink, dryrun)
sisyphus.toolkit.input_path(path: Union[sisyphus.job_path.Path, str]) → sisyphus.job_path.Path[source]

Ensures a given input is a Path. Strings are automatically converted into Path objects

Parameters:path – path that should be checked
Returns:Path object
sisyphus.toolkit.job_info(job: sisyphus.job.Job)[source]

Prints information about given job to stdout

Parameters:job(Job)
sisyphus.toolkit.load_file(path: str) → Any[source]

Load object from pickled file, works with zipped and unzipped files

Parameters:path (str) – Path to pickled file
Returns:Unpickled object
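
A minimal stand-alone sketch of the dump/load pair described above, using only the standard library. The names `toy_dump` and `toy_load_file` are hypothetical, and the use of gzip as the compression format is an assumption; the sketch only shows how a loader can accept both zipped and unzipped pickle files.

```python
import gzip
import os
import pickle
import tempfile


def toy_dump(obj, filename):
    """Pickle obj into a gzip-compressed file, creating the directory if needed."""
    directory = os.path.dirname(filename)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with gzip.open(filename, "wb") as handle:
        pickle.dump(obj, handle)


def toy_load_file(path):
    """Unpickle from path, accepting gzip-compressed and plain pickle files."""
    with open(path, "rb") as handle:
        is_gzip = handle.read(2) == b"\x1f\x8b"  # gzip magic number
    opener = gzip.open if is_gzip else open
    with opener(path, "rb") as handle:
        return pickle.load(handle)


with tempfile.TemporaryDirectory() as tmp:
    zipped_path = os.path.join(tmp, "nested", "dir", "data.pkl.gz")
    plain_path = os.path.join(tmp, "plain.pkl")
    toy_dump({"answer": 42}, zipped_path)
    with open(plain_path, "wb") as handle:
        pickle.dump([1, 2, 3], handle)
    restored_zipped = toy_load_file(zipped_path)
    restored_plain = toy_load_file(plain_path)

print(restored_zipped, restored_plain)
```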
sisyphus.toolkit.load_job(path: str) → sisyphus.job.Job[source]

Load job from job directory even if it is already cleaned up

Parameters:path(str) – Path to job directory
Return (Job):
sisyphus.toolkit.migrate_graph(input_file=None, work_source=None, mode='dryrun')[source]

Migrate the graph from the provided graph file to the current graph

Parameters:
  • input_file (str) – path to the graph file
  • work_source (str|None) – path to the work folder, if None use the local work folder
  • mode (str) – dryrun, link, copy, move, move_and_link, hardlink_or_copy, hardlink_or_link, the default is dryrun
Returns:

class sisyphus.toolkit.mktemp[source]

Object to be used by the with statement. Creates a temporary file that will be deleted at exit. Can be used like this:

with mktemp() as temp:
    # do stuff with temp
    with open(temp, 'w') as f:
        f.write('foo')

    # the file is closed (and flushed) before it is read again
    with open(temp, 'r') as f:
        foo = f.read()
# temp file is deleted
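
For readers who want to see the pattern behind such a helper, here is a stand-alone sketch built on the standard library. `toy_mktemp` is a hypothetical stand-in and not the Sisyphus implementation; it only demonstrates a context manager that hands out a temporary file name and removes the file on exit.

```python
import contextlib
import os
import tempfile


@contextlib.contextmanager
def toy_mktemp(prefix="sis_"):
    """Yield the name of a temporary file and delete it when the block exits."""
    handle, name = tempfile.mkstemp(prefix=prefix)
    os.close(handle)  # only the file name is handed out
    try:
        yield name
    finally:
        if os.path.exists(name):
            os.remove(name)


with toy_mktemp() as temp:
    with open(temp, "w") as f:
        f.write("foo")
    with open(temp) as f:
        content = f.read()
    existed_inside = os.path.exists(temp)

exists_after = os.path.exists(temp)
print(content, existed_inside, exists_after)
```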
sisyphus.toolkit.register_output(name, value, export_graph=False)[source]
Parameters:
  • name (str) –
  • value (Path) –
  • export_graph (bool) –
sisyphus.toolkit.reload_config(config_files: List[str] = [])[source]

Reset state, reload old config files, and load given config_files

Parameters:config_files ([str, ..]) –
sisyphus.toolkit.reload_module(module)[source]

Shortcut to reload module, keep sis_graph if toolkit is reloaded

Parameters:module – Module to reload
Returns:
sisyphus.toolkit.reload_recipes()[source]

Reload all recipes

sisyphus.toolkit.remove_job_and_descendants(jobs: Union[str, sisyphus.job_path.Path, sisyphus.job.Job, List[Union[str, sisyphus.job_path.Path, sisyphus.job.Job]]], mode: str = 'remove') → bool[source]

Remove all jobs that depend on the given jobs/paths.

Parameters:
  • jobs (List[Job|Path]) – These jobs and all jobs depending on them will be removed
  • mode (string) – run mode (remove, move, dryrun)
sisyphus.toolkit.replace_graph_objects(current, mapping=None, replace_function=None)[source]

This function takes a given graph and creates a new graph where every object listed in mapping is replaced.

Parameters:
  • current – current graph
  • mapping – [(old_object, new_object), …]
  • replace_function – how an object will be replaced, defaults to using the mapping

returns: New graph

sisyphus.toolkit.run(obj: Any, quiet: bool = False)[source]

Run and setup all jobs that are contained inside object and all jobs that are necessary.

Parameters:
  • obj
  • quiet – Do not forward job output to stdout
Returns:

sisyphus.toolkit.run_job(job: sisyphus.job.Job, task_name: str = None, task_id: int = 1, force_resume: bool = False)[source]

Run job directly in console window.

Parameters:
  • job (Job) – Job with tasks to run
  • task_name (str) – which task should run, default: The first listed task
  • task_id (int) – which task_id should be used, default: 1
  • force_resume (bool) – Force resume of job in error state
sisyphus.toolkit.setup_job_directory(job: sisyphus.job.Job)[source]

Setup the work directory of the given job.

Parameters:job (Job|Path) – Job which needs work directory
sisyphus.toolkit.setup_path(package: str) → sisyphus.toolkit.RelPath[source]

Should be called like `rel_path = setup_path(__package__)`, which sets up RelPath to create Path objects relative to the current module.

Parameters:package (str) –
Return type:RelPath
sisyphus.toolkit.setup_script_mode()[source]

Use this function if you start Sisyphus from a recipe file, it will:

  1. set up logging level and prompt
  2. disable the wait periods
  3. disable unwanted warnings

You can run recipes directly by running something similar to this:

export SIS_RECIPE_PATH=/PATH/TO/RECIPE/DIR
# If sisyphus is not installed in your python path
export PYTHONPATH=/PATH/TO/SISYPHUS:$PYTHONPATH
# If you want to change the work directory:
export SIS_WORK_DIR=/PATH/TO/WORK/DIR
python3 $SIS_RECIPE_PATH/recipe/path_to_file script parameters

An example for the recipe:

import os
import argparse
from sisyphus import *
from recipe.eval import bleu

if __name__ == '__main__':
    tk.setup_script_mode()

    parser = argparse.ArgumentParser(description='Evaluate hypothesis')
    parser.add_argument('--hyp', help='hypothesis', required=True)
    parser.add_argument('--ref', help='reference', required=True)

    args = parser.parse_args()
    hyp = os.path.realpath(args.hyp)
    ref = os.path.realpath(args.ref)

    score = bleu(hyp, ref)

    tk.run(score, quiet=True)
    print(score.out)
sisyphus.toolkit.start_manager(job_engine=None, start_computations=False)[source]

Shortcut to start Manager

Parameters:
  • job_engine – Use this job engine, init own job engine if set to None
  • start_computations – Submit jobs directly
Returns:

Manager

sisyphus.toolkit.uncached_path(path)[source]
Parameters:path (Path|str) –
Return type:str
sisyphus.toolkit.zipped(filename: Union[sisyphus.job_path.Path, str]) → bool[source]

Check if given file is zipped

Parameters:filename (Path/str) – File to be checked
Return (bool):True if input file is zipped

Graph

class sisyphus.graph.SISGraph[source]

This graph contains all targets that need to be calculated and, through their dependencies, all required jobs. These jobs can be searched and modified using the provided functions. The most interesting functions are:

# Lists all jobs
jobs()
# Find jobs by matching substring
find(pattern)
# Execute function for all nodes
for_all_nodes(f)
# Dictionaries with jobs sorted by current status:
get_jobs_by_status()
add_target(target)[source]
Parameters:target (OutputTarget) –
find(pattern, mode='all')[source]

Returns a list with all jobs and paths that partly match the pattern

Parameters:
  • pattern(str) – Pattern to match
  • mode(str) – Select if jobs, paths or both should be returned. Possible values: all, path, job
Return ([Job/Path, …]):
 

List with all matching jobs/paths

for_all_nodes(f, nodes=None, bottom_up=False)[source]

Run function f for each node and its ancestors from the top down, and stop expanding a tree branch if the function returns False. Does not stop on None, to allow functions with no return value to run for every node.

Parameters:
  • f ((Job)->bool) – function will be executed for all nodes
  • nodes – all nodes that will be checked, defaults to all output nodes in graph
  • bottom_up (bool) – start with deepest nodes first, ignore return value of f
Returns:

set with all visited nodes
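
The pruning rule can be illustrated with a stand-alone traversal over a plain dictionary. `toy_for_all_nodes` and the example graph are hypothetical; in particular, counting a pruned node itself as visited is an assumption of this sketch.

```python
def toy_for_all_nodes(f, nodes, inputs_of):
    """Visit nodes and their ancestors top down; prune a branch when f returns False."""
    visited = set()
    stack = list(nodes)
    while stack:
        node = stack.pop()
        if node in visited:
            continue
        visited.add(node)
        # Only an explicit False stops the expansion; None keeps going.
        if f(node) is False:
            continue
        stack.extend(inputs_of.get(node, []))
    return visited


# Hypothetical dependency graph: each job maps to the jobs it takes input from.
inputs_of = {
    "score": ["translate", "reference"],
    "translate": ["train"],
    "train": ["corpus"],
}

seen = []
# list.append returns None, so every node is expanded.
everything = toy_for_all_nodes(seen.append, ["score"], inputs_of)
# Returning False for "translate" cuts off everything below it.
pruned = toy_for_all_nodes(lambda job: job != "translate", ["score"], inputs_of)
print(sorted(everything), sorted(pruned))
```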

get_job_from_path(path)[source]

The reverse function for get_path_to_all_nodes

get_jobs_by_status(nodes: Optional[List[T]] = None, engine: Optional = None, skip_finished: bool = False) → DefaultDict[str, List[sisyphus.job.Job]][source]

Return all jobs needed to finish output in dictionary with current status as key

Parameters:
  • nodes – all nodes that will be checked, defaults to all output nodes in graph
  • engine (sisyphus.engine.EngineBase) – Use job status of engine, ignore engine status if set to None (default: None)
  • skip_finished (bool) – Stop checking subtrees of finished nodes to save time
Return ({status1: [Job, …], status2: …}):
 

Dictionary with all jobs sorted by current state

Return type:

dict[str,list[Job]]

jobs()[source]
Return ([Job, …]):
 List with all jobs in graph
jobs_sorted()[source]

Yields jobs in an order so that for each job all jobs it depends on are already finished

Return (generator Node):
 jobs sorted by dependency
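
Such a dependency-respecting order is a topological order of the job graph. The following stand-alone sketch shows one way to produce it with a depth-first generator; `toy_jobs_sorted` and the example graph are hypothetical, and the graph is assumed to be acyclic.

```python
def toy_jobs_sorted(inputs_of):
    """Yield jobs so that every job appears after all jobs it depends on."""
    done = set()

    def visit(job):
        if job in done:
            return
        done.add(job)
        # Emit all dependencies first, then the job itself.
        for dependency in inputs_of.get(job, []):
            yield from visit(dependency)
        yield job

    for job in sorted(inputs_of):
        yield from visit(job)


# Hypothetical dependency graph: each job maps to the jobs it takes input from.
inputs_of = {
    "score": ["translate", "reference"],
    "translate": ["train"],
    "train": ["corpus"],
}

order = list(toy_jobs_sorted(inputs_of))
print(order)
```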
output

Deprecated: used for backwards compatibility, only supports path outputs

set_job_targets(engine=None)[source]

Add a target to all jobs (if possible) to have a more informative output

targets_dict
Returns:dict name -> target
Return type:dict[str,OutputTarget]
update_nodes()[source]

Update all nodes to get the most current dependency graph

Job

class sisyphus.job.Job[source]

Object to hold the job description. You derive your own job classes from this base class. All the arguments of __init__ will be taken into account for the hash. In your derived class, you need to overwrite the tasks method.

add_input(path)[source]
Parameters:path (Path) –
Returns:path
Return type:Path
classmethod hash(parsed_args)[source]
Parameters:parsed_args (dict[str]) –
Returns:hash for job given the arguments
Return type:str
info()[source]

Returns information about the currently running job to be displayed on the web interface and the manager view

Returns:string to be displayed or None if not available
Return type:str

job_id()[source]

Returns a unique string to identify this job

keep_value(value=None)[source]

Return keep_value, if value is given also set keep value

output_path(filename, directory=False, cached=False)[source]

Adds output path, if directory is true a directory will be created automatically.

Parameters:
  • filename (str) –
  • directory (bool) –
  • cached (bool) –
Return type:

Path

output_var(filename, pickle=False, backup=None)[source]

Adds output path which contains a python object, if directory is true a directory will be created automatically

path_available(path)[source]

Returns true if given path is available yet

Parameters:path – path to check
Returns:
set_attrs(attrs)[source]

Adds all attrs to self, used in constructor e.g.: self.set_attrs(locals())

set_default(name, value)[source]

Deprecated helper function, will be removed in the future. Don’t use it!

set_keep_value(value)[source]

Set keep value and return self

set_rqmt(task_name, rqmt)[source]

Overwrites the given requirements for this job

Parameters:
  • task_name (str) – Which task will be affected
  • rqmt – the new requirements
Returns:

sh(command, *args, **kwargs)[source]

Calls an external shell and replaces {args} with job inputs, outputs, args and executes the command
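
The placeholder replacement can be pictured with `str.format`. This is a sketch under the assumption that substitution works like Python string formatting over the job's attributes; `ToyJob` and `render` are hypothetical, and the command is only rendered here, not executed.

```python
class ToyJob:
    """Hypothetical stand-in for a job with one input and one output."""

    def __init__(self, text, out):
        self.text = text
        self.out = out

    def render(self, command, **kwargs):
        # Attributes of the object fill the {name} placeholders;
        # explicit keyword arguments take precedence.
        values = dict(vars(self))
        values.update(kwargs)
        return command.format(**values)


job = ToyJob("corpus.gz", "counts.gz")
rendered = job.render("zcat -f {text} | wc -l > {out}")
overridden = job.render("zcat -f {text} | wc -l > {out}", out="other.gz")
print(rendered)
```

With `str.format`, literal braces in a command (for example in an awk program) have to be doubled as `{{` and `}}`.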

tasks() → Iterator[sisyphus.task.Task][source]
Returns:yields Tasks
Return type:list[sisyphus.task.Task]
update()[source]

Run after all inputs are computed, allowing the job to analyse the given input and ask for additional inputs before running.

update_rqmt(task_name, rqmt)[source]

Updates the given requirements for this job, values not set in rqmt will not be affected.

Parameters:
  • task_name (str) – Which task will be affected
  • rqmt – the new requirements
Returns:
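
The difference between overwriting and updating requirements can be shown with plain dictionaries. This is one reading of the two descriptions above, not the library code; `toy_set_rqmt` and `toy_update_rqmt` are hypothetical helpers.

```python
def toy_set_rqmt(current, new):
    """Overwrite: the result contains only the new requirements."""
    return dict(new)


def toy_update_rqmt(current, new):
    """Update: keys missing from new keep their current values."""
    merged = dict(current)
    merged.update(new)
    return merged


current = {"cpu": 1, "mem": 2, "time": 2}
overwritten = toy_set_rqmt(current, {"mem": 8})
updated = toy_update_rqmt(current, {"mem": 8})
print(overwritten, updated)
```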

Task

class sisyphus.task.Task(start, resume=None, rqmt={}, args=[[]], mini_task=False, update_rqmt=None, parallel=0, tries=1, continuable=False)[source]

Object to hold information what function should be run with which requirements.

Settings

These settings can be overwritten via a settings.py file in the current directory, when sis is run.

sisyphus.global_settings.ALIAS_AND_OUTPUT_SUBDIR = ''

If set to a non-empty string aliases and outputs will be placed in a subdir. This is useful for setups with multiple configs

sisyphus.global_settings.ALIAS_DIR = 'alias'

Name alias directory

sisyphus.global_settings.AUTO_SET_JOB_INIT_ATTRIBUTES = False

Automatically set all input given to __init__ as attributes of the created job. Disabled by default since it tends to confuse new users reading the code.

sisyphus.global_settings.CLEANUP_ENVIRONMENT = True

Remove all environment variables to ensure the same environment between different users

sisyphus.global_settings.CLEAR_ERROR = False

Set to True to automatically clear jobs in error state and retry

sisyphus.global_settings.CONFIG_FUNCTION_DEFAULT = 'config.main'

Name of default function to call in config directory

sisyphus.global_settings.CONFIG_PATH = '.'

Path to the config directory, not including the directory name ‘config’

sisyphus.global_settings.DEFAULT_ENVIRONMENT_KEEP = {'CUDA_VISIBLE_DEVICES', 'HOME', 'PWD', 'SGE_STDERR_PATH', 'SGE_TASK_ID', 'TMP', 'TMPDIR', 'USER'}

Keep these environment variables if CLEANUP_ENVIRONMENT is set

sisyphus.global_settings.DEFAULT_ENVIRONMENT_SET = {'LANG': 'en_US.UTF-8', 'MKL_NUM_THREADS': 1, 'OMP_NUM_THREADS': 1, 'PATH': '/rbi/sge/bin:/rbi/sge/bin/lx-amd64:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin', 'SHELL': '/bin/bash'}

Set these environment variables if CLEANUP_ENVIRONMENT is set

sisyphus.global_settings.FILESYSTEM_CACHE_TIME = 30

How long the virtual file system should cache process states

sisyphus.global_settings.GRAPH_WORKER = 16

How many threads should update the graph in parallel, useful if the filesystem has a high latency

sisyphus.global_settings.JOB_AUTO_CLEANUP = True

Automatically clean up job directory after job has finished

sisyphus.global_settings.JOB_CLEANER_INTERVAL = 60

How often to check for finished jobs in seconds

sisyphus.global_settings.JOB_CLEANER_WORKER = 5

How many threads should be cleaning in parallel

sisyphus.global_settings.JOB_CLEANUP_KEEP_WORK = False

Whether the job-internal work directory should be kept or deleted during clean up

sisyphus.global_settings.JOB_DEFAULT_KEEP_VALUE = 50

Default value for job used by tk.cleaner to determine if a job should be removed or not

sisyphus.global_settings.JOB_MAX_NUMBER_OF_LOCKS = 100

How many locks can be used by all jobs (one lock per job). If there are more jobs than locks, locks are reused. This could lead to a slowdown, but the number of locks per process is limited

sisyphus.global_settings.MANAGER_SUBMIT_WORKER = 10

How many threads are used to setup the job directory and submit jobs

sisyphus.global_settings.MAX_SUBMIT_RETRIES = 3

How often sisyphus will try to resubmit a task to the engine before returning a RETRY_ERROR

sisyphus.global_settings.OUTPUT_DIR = 'output'

Name output directory

sisyphus.global_settings.PLOGGING_INTERVAL = 5

Seconds between checks how much memory and cpu a process is using

sisyphus.global_settings.PLOGGING_MIN_CHANGE = 0.1

Minimal relative change between log entries of used resources

sisyphus.global_settings.PLOGGING_QUIET = False

Suppress messages about process resources usage

sisyphus.global_settings.PLOGGING_UPDATE_FILE_PERIOD = 60

In which interval the process used resources file should be updated

sisyphus.global_settings.PRINT_ERROR = True

Print error messages of a job in the manager status field

sisyphus.global_settings.PRINT_ERROR_LINES = 40

Print that many last lines of error state log file

sisyphus.global_settings.PRINT_ERROR_TASKS = 1

Print detailed log of that many jobs in error state

sisyphus.global_settings.RECIPE_PATH = '.'

Path to the recipe directory, not including the directory name ‘recipe’

sisyphus.global_settings.SHOW_JOB_TARGETS = True

Show job targets on status screen, can significantly slow down startup time if many outputs are used

sisyphus.global_settings.SIS_COMMAND = ['/home/docs/checkouts/readthedocs.org/user_builds/sisyphus-workflow-manager/envs/stable/bin/python', '/home/docs/checkouts/readthedocs.org/user_builds/sisyphus-workflow-manager/envs/stable/bin/sphinx-build']

Which command should be called to start sisyphus, can be used to replace the python binary

sisyphus.global_settings.SIS_HASH(obj, length=12, chars='0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz')

Default function to hash jobs and objects

sisyphus.global_settings.TMP_PREFIX = '/tmp/sis_'

Directory used by tk.mktemp

sisyphus.global_settings.USE_SIGNAL_HANDLERS = False

Install signal handlers for debugging

sisyphus.global_settings.USE_VERBOSE_TRACEBACK = True

Use ipython traceback

sisyphus.global_settings.VERBOSE_TRACEBACK_TYPE = 'ipython'

The verbose traceback type. “ipython” or “better_exchook”

sisyphus.global_settings.VIS_ABSOLUTE_MERGE_THRESHOLD = 5

For http visualization, list job input as common input if it is shared between more than X jobs

sisyphus.global_settings.VIS_RELATIVE_MERGE_THRESHOLD = 0.25

For http visualization, list job input as common input if it is shared between more than X*(total jobs) jobs

sisyphus.global_settings.WAIT_PERIOD_BETWEEN_CHECKS = 30

How often should the manager check for finished jobs

sisyphus.global_settings.WAIT_PERIOD_CACHE = 20

Safety period to wait for actionable jobs to change status before running action

sisyphus.global_settings.WAIT_PERIOD_HTTP_RETRY_BIND = 10

How many seconds should be waited before retrying to bind to the desired port

sisyphus.global_settings.WAIT_PERIOD_JOB_CLEANUP = 10

How many seconds should be waited before cleaning up a finished job

sisyphus.global_settings.WAIT_PERIOD_JOB_FS_SYNC = 30

How many seconds should be waited before assuming a job is finished after the finished file is written to allow network file system to sync up

sisyphus.global_settings.WAIT_PERIOD_MTIME_OF_INPUTS = 60

How many seconds should all inputs be available before starting a job to avoid file system synchronization problems

sisyphus.global_settings.WAIT_PERIOD_QSTAT_PARSING = 15

How many seconds should be waited before retrying to parse a failed qstat output

sisyphus.global_settings.WAIT_PERIOD_SSH_TIMEOUT = 15

How many seconds should be waited before retrying a ssh connection

sisyphus.global_settings.WORK_DIR = 'work'

The work directory

sisyphus.global_settings.cached_engine(cache=[])[source]
Parameters:cache (list) –
Returns:engine (EngineBase)
sisyphus.global_settings.check_engine_limits(current_rqmt, task)[source]

Check if requested requirements break any hardware limits and reduce them. By default ignored, a possible check for limits could look like this:

current_rqmt['time'] = min(168, current_rqmt.get('time', 2))
if current_rqmt['time'] > 24:
    current_rqmt['mem'] = min(63, current_rqmt['mem'])
else:
    current_rqmt['mem'] = min(127, current_rqmt['mem'])
return current_rqmt
Parameters:
  • current_rqmt (dict[str]) – requirements currently requested
  • task (sisyphus.task.Task) – task that is handled
Returns:

requirements updated to engine limits

Return type:

dict[str]
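The snippet above is only a function body. A self-contained variant that can be run on its own might look like the sketch below. The limits are the illustrative numbers from the example; copying the dict and the fallback of 1 for a missing 'mem' entry are assumptions added here.

```python
def check_engine_limits(current_rqmt, task):
    """Clamp requested requirements to (illustrative) hardware limits."""
    rqmt = dict(current_rqmt)  # work on a copy so the caller's dict is untouched
    rqmt['time'] = min(168, rqmt.get('time', 2))
    # Long-running jobs get the smaller memory limit, as in the example above
    mem_limit = 63 if rqmt['time'] > 24 else 127
    rqmt['mem'] = min(mem_limit, rqmt.get('mem', 1))  # default of 1 is an assumption
    return rqmt


requested = {'cpu': 4, 'mem': 200, 'time': 300}
print(check_engine_limits(requested, task=None))
# prints {'cpu': 4, 'mem': 63, 'time': 168}
```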

sisyphus.global_settings.engine()[source]

Create engine object used to submit jobs. The simplest setup just creates a local engine starting all jobs on the local machine e.g.:

def engine():
    from sisyphus.localengine import LocalEngine
    return LocalEngine(cpus=8)

The usually recommended setup combines a local engine with a regular grid engine. The EngineSelector can be used to schedule tasks on different engines. The main idea is to have an engine for very small jobs that do not need to be scheduled on a large grid engine (e.g. counting the lines of a file). A setup using the EngineSelector would look like this:

def engine():
    # Import the engines locally inside the function to avoid circular imports
    from sisyphus.localengine import LocalEngine
    from sisyphus.engine import EngineSelector
    from sisyphus.son_of_grid_engine import SonOfGridEngine
    return EngineSelector(engines={'short': LocalEngine(cpus=4),
                                   'long': SonOfGridEngine(default_rqmt={'cpu': 1, 'mem': 1,
                                                                         'gpu': 0, 'time': 1})},
                          default_engine='long')

Note: the engines should only be imported locally inside the function to avoid circular imports

Returns:engine (EngineBase)
sisyphus.global_settings.file_caching(path)[source]

This function should be replaced to enable file caching, e.g. copy the given file to /var/tmp and return the new path. The default behaviour is to just pass on the given path.

Parameters:path (str) – Path to file that should be cached
Returns:path to cached file
Return type:str
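A replacement along the lines described above could be sketched as follows. The cache location, the hash-based file naming and the "copy only if missing or outdated" rule are assumptions made for this illustration, not behaviour prescribed by Sisyphus.

```python
import hashlib
import os
import shutil
import tempfile

# Hypothetical cache location; the text above suggests something like /var/tmp
CACHE_DIR = os.path.join(tempfile.gettempdir(), 'sisyphus_file_cache')


def file_caching(path):
    """Copy path into CACHE_DIR (if missing or outdated) and return the copy's path."""
    source = os.path.abspath(path)
    os.makedirs(CACHE_DIR, exist_ok=True)
    # Prefix with a hash of the full path so equal file names from different
    # directories do not collide in the cache
    prefix = hashlib.sha1(source.encode('utf-8')).hexdigest()[:12]
    cached = os.path.join(CACHE_DIR, prefix + '_' + os.path.basename(source))
    if not os.path.exists(cached) or os.path.getmtime(cached) < os.path.getmtime(source):
        shutil.copy2(source, cached)  # copy2 also preserves the modification time
    return cached
```

Calling the function twice for an unchanged file returns the same cached path without copying again.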
sisyphus.global_settings.update_engine_rqmt(initial_rqmt, last_usage)[source]

Update requirements after a job got interrupted.

Parameters:
  • initial_rqmt (dict[str]) – requirements that are requested first
  • last_usage (dict[str]) – information about the last usage by the task
Returns:

updated requirements

Return type:

dict[str]
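One possible policy is to double a limit whenever the interrupted run came close to it. The sketch below illustrates this. The keys 'used_time' and 'used_mem' in last_usage and the 90% threshold are hypothetical, since the actual contents of last_usage are not described here.

```python
def update_engine_rqmt(initial_rqmt, last_usage):
    """Double time and/or memory if the interrupted run came close to its limits."""
    updated = dict(initial_rqmt)
    requested_time = initial_rqmt.get('time', 1)
    requested_mem = initial_rqmt.get('mem', 1)
    # 'used_time' and 'used_mem' are hypothetical keys, in the same units as the request
    if last_usage.get('used_time', 0) >= 0.9 * requested_time:
        updated['time'] = requested_time * 2
    if last_usage.get('used_mem', 0) >= 0.9 * requested_mem:
        updated['mem'] = requested_mem * 2
    return updated
```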

sisyphus.global_settings.update_global_settings_from_env()[source]
Returns:nothing
sisyphus.global_settings.update_global_settings_from_file(filename)[source]
Parameters:filename (str) –
Returns:nothing
sisyphus.global_settings.update_global_settings_from_text(text, filename)[source]
Parameters:
  • text
  • filename (str) –
Returns:

nothing

Engines

class sisyphus.engine.EngineBase[source]

An engine manages the execution of jobs, e.g. locally, or in a queuing system like SGE.

get_job_used_resources(current_process)[source]

Should be overridden by a subclass if a better way to measure the used resources is available, e.g. cgroups. This function should only be used by the worker.

Parameters:
  • current_process (psutil.Process) –
  • engine_selector
get_rqmt(task, task_id, update=True)[source]

Get the requirements submitted for this task

Parameters:
static get_task_id(task_id)[source]

Gets task id either from args or the environment

init_worker(task)[source]

This method will be called before the task is started by the worker, e.g. SGE uses this method to link the SGE log file to the desired position.

Parameters:task
Returns:
job_state(job)[source]

Return current state of job

submit(task)[source]

Prepares all relevant commands and calls submit_call of the subclass to actually pass the job to the relevant engine

Parameters:task (sisyphus.task.Task) – Task to submit
Returns:None
submit_call(call, logpath, rqmt, name, task_name, task_ids)[source]
Parameters:
  • call (list[str]) –
  • logpath (str) –
  • rqmt (dict[str]) –
  • name (str) –
  • task_name (str) –
  • task_ids (list[int]) –
Returns:

ENGINE_NAME, submitted (list of (list of task ids, job id))

Return type:

str, list[(list[int],int)]
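To make the return value of submit_call concrete, the following sketch shows a function with the same parameters that only pretends to submit. The engine name 'example' and the job id are made up; a real engine would obtain the job id from its queuing system.

```python
def example_submit_call(call, logpath, rqmt, name, task_name, task_ids):
    """Pretend to submit all task ids as a single job and report the result."""
    engine_name = 'example'  # stands in for ENGINE_NAME
    job_id = 12345           # made-up job id
    submitted = [(list(task_ids), job_id)]
    return engine_name, submitted


engine_name, submitted = example_submit_call(
    call=['echo', 'hello'], logpath='log', rqmt={'cpu': 1},
    name='job', task_name='run', task_ids=[1, 2, 3])
for ids, job_id in submitted:
    print(engine_name, job_id, ids)
```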

class sisyphus.engine.EngineSelector(engines, default_engine)[source]

The EngineSelector engine wraps multiple other engines and schedules the jobs according to the requirements (rqmt).

Tasks with mini_task=True will use the engine selector ‘short’, so usually that engine should be specified as well.
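The selection step can be pictured with a small toy model. The classes ToyEngine and ToySelector below are hypothetical stand-ins, and the rule that an optional 'engine' entry in rqmt names the engine to use is an assumption made for this sketch, not a statement about the actual implementation.

```python
class ToyEngine:
    """Minimal placeholder for an engine; it only carries a label."""

    def __init__(self, label):
        self.label = label


class ToySelector:
    """Toy stand-in for an engine selector; not the Sisyphus implementation."""

    def __init__(self, engines, default_engine):
        assert default_engine in engines
        self.engines = engines
        self.default_engine = default_engine

    def get_used_engine(self, engine_selector):
        return self.engines[engine_selector]

    def get_used_engine_by_rqmt(self, rqmt):
        # Assumption: an optional 'engine' entry in rqmt names the engine to use
        return self.get_used_engine(rqmt.get('engine', self.default_engine))


selector = ToySelector({'short': ToyEngine('local'), 'long': ToyEngine('grid')},
                       default_engine='long')
print(selector.get_used_engine_by_rqmt({'cpu': 1}).label)
print(selector.get_used_engine_by_rqmt({'engine': 'short'}).label)
```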

for_all_engines(f)[source]

Tell all engines to stop

get_job_used_resources(current_process)[source]

Should be overridden by a subclass if a better way to measure the used resources is available, e.g. cgroups. This function should only be used by the worker.

Parameters:
  • current_process (psutil.Process) –
  • engine_selector
get_used_engine(engine_selector)[source]
Parameters:engine_selector (str) – name in self.engines
Return type:EngineBase
get_used_engine_by_rqmt(rqmt)[source]
Parameters:rqmt (dict[str]) –
Return type:EngineBase
stop_engine()[source]

Tell all engines to stop

submit_call(call, logpath, rqmt, name, task_name, task_ids)[source]
Parameters:
  • call (list[str]) –
  • logpath (str) –
  • rqmt (dict[str]) –
  • name (str) –
  • task_name (str) –
  • task_ids (list[int]) –
Returns:

ENGINE_NAME, submitted (list of (list of task ids, job id))

Return type:

str, list[(list[int],int)]

task_state(task, task_id)[source]

Return state of task

class sisyphus.localengine.LocalEngine(cpus=1, gpus=0, **kwargs)[source]

Simple engine to execute tasks locally. CPU and GPU are always checked; all other requirements are checked only if given during initialisation.

get_task_id(task_id)[source]

Gets task id either from args or the environment

start_task(task)[source]
Parameters:task (TaskQueueInstance) –
Return type:psutil.Process
submit_call(call, logpath, rqmt, name, task_name, task_ids)[source]
Parameters:
  • call (list[str]) –
  • logpath (str) –
  • rqmt (dict[str]) –
  • name (str) –
  • task_name (str) –
  • task_ids (list[int]) –
Returns:

ENGINE_NAME, submitted (list of (list of task ids, job id))

Return type:

str, list[(list[int],int)]

class sisyphus.localengine.TaskQueueInstance(call, logpath, rqmt, name, task_name, task_id)
call

Alias for field number 0

logpath

Alias for field number 1

name

Alias for field number 3

rqmt

Alias for field number 2

task_id

Alias for field number 5

task_name

Alias for field number 4
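The "Alias for field number" entries suggest that TaskQueueInstance is a named tuple. Under that assumption, an equivalent definition and the relation between field names and positions can be sketched as follows. The field values are made up for the example.

```python
from collections import namedtuple

# Field order taken from the class signature above
TaskQueueInstance = namedtuple(
    'TaskQueueInstance',
    ['call', 'logpath', 'rqmt', 'name', 'task_name', 'task_id'])

instance = TaskQueueInstance(
    call=['echo', 'hello'], logpath='log/run.1', rqmt={'cpu': 1},
    name='my_job', task_name='run', task_id=1)

# Each attribute is an alias for a position in the tuple
assert instance.name == instance[3]
assert instance.task_name == instance[4]
assert instance.task_id == instance[5]
```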

sisyphus.localengine.run_task(call, logpath)[source]

Simple function to run task

class sisyphus.localengine.sync_object(obj)[source]

Object to be used by the with statement to sync an object via queue e.g.:

self.sobj = sync_object({})
with self.sobj as sobj:
    sobj[7] = 9

# other process
with self.sobj as sobj:
    assert sobj == {7: 9}
class sisyphus.son_of_grid_engine.SonOfGridEngine(default_rqmt, gateway=None, auto_clean_eqw=True, ignore_jobs=[])[source]
get_logpath(logpath_base, task_name, task_id)[source]

Returns log file for the currently running task

static get_task_id(task_id)[source]

Gets task id either from args or the environment

init_worker(task)[source]

This method will be called before the task is started by the worker, e.g. SGE uses this method to link the SGE log file to the desired position.

Parameters:task
Returns:
queue_state()[source]

Returns a list with all currently running tasks in this queue

start_engine()[source]

No starting action required with the current implementation

stop_engine()[source]

No stopping action required with the current implementation

submit_call(call, logpath, rqmt, name, task_name, task_ids)[source]
Parameters:
  • call (list[str]) –
  • logpath (str) –
  • rqmt (dict[str]) –
  • name (str) –
  • task_name (str) –
  • task_ids (list[int]) –
Returns:

ENGINE_NAME, submitted (list of (list of task ids, job id))

Return type:

str, list[(list[int],int)]

submit_helper(call, logpath, rqmt, name, task_name, start_id, end_id, step_size)[source]
Parameters:
  • call (list[str]) –
  • logpath (str) –
  • rqmt (dict[str]) –
  • name (str) –
  • task_name (str) –
  • start_id (int) –
  • end_id (int) –
  • step_size (int) –
system_call(command, send_to_stdin=None)[source]
Parameters:
  • command (list[str]) – qsub command
  • send_to_stdin (str|None) – shell code, e.g. the command itself to execute
Returns:

stdout, stderr, retval

Return type:

list[bytes], list[bytes], int

task_state(task, task_id)[source]

Return task state:
  • ‘r’ == STATE_RUNNING
  • ‘qw’ == STATE_QUEUE
  • not found == STATE_UNKNOWN
  • everything else == STATE_QUEUE_ERROR
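The mapping above can be written out as a small function. In this sketch the STATE_* values are placeholders (the real constants are defined by Sisyphus), map_sge_state is a hypothetical helper, and None is used to represent a task that was not found in the queue.

```python
# Placeholder values; the real STATE_* constants are defined by Sisyphus
STATE_RUNNING = 'running'
STATE_QUEUE = 'queue'
STATE_UNKNOWN = 'unknown'
STATE_QUEUE_ERROR = 'queue_error'


def map_sge_state(sge_state):
    """Map an SGE state string (or None if the task was not found) to a state."""
    if sge_state is None:
        return STATE_UNKNOWN
    if sge_state == 'r':
        return STATE_RUNNING
    if sge_state == 'qw':
        return STATE_QUEUE
    return STATE_QUEUE_ERROR
```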

class sisyphus.son_of_grid_engine.TaskInfo(job_id, task_id, state)
job_id

Alias for field number 0

state

Alias for field number 2

task_id

Alias for field number 1

sisyphus.son_of_grid_engine.try_to_multiply(y, x, backup_value=None)[source]

Tries to convert y to float, multiply it by x, and convert it back to a rounded string. Returns backup_value if that fails, or y if backup_value is None.
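The described behaviour can be sketched as below. This is an illustration written from the description only, not the library's code. In particular, using Python's built-in round() for the rounding step is an assumption.

```python
def try_to_multiply_sketch(y, x, backup_value=None):
    """Multiply a numeric string by x and return the rounded result as a string."""
    try:
        return str(round(float(y) * x))
    except (TypeError, ValueError):
        # Conversion failed: fall back to backup_value, or to y itself if none is given
        return y if backup_value is None else backup_value


print(try_to_multiply_sketch('1.5', 2))         # prints 3
print(try_to_multiply_sketch('abc', 2))         # prints abc
print(try_to_multiply_sketch('abc', 2, '0'))    # prints 0
```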

class sisyphus.load_sharing_facility_engine.LoadSharingFacilityEngine(default_rqmt, gateway=None, auto_clean_eqw=True)[source]
static get_logpath(logpath_base, task_name, task_id, engine_selector=None)[source]

Returns log file for the currently running task

get_task_id(task_id)[source]

Gets task id either from args or the environment

queue_state()[source]

Returns a list with all currently running tasks in this queue

start_engine()[source]

No starting action required with the current implementation

stop_engine()[source]

No stopping action required with the current implementation

submit_call(call, logpath, rqmt, name, task_name, task_ids)[source]
Parameters:
  • call (list[str]) –
  • logpath (str) –
  • rqmt (dict[str]) –
  • name (str) –
  • task_name (str) –
  • task_ids (list[int]) –
Returns:

ENGINE_NAME, submitted (list of (list of task ids, job id))

Return type:

str, list[(list[int],int)]

task_state(task, task_id)[source]

Return task state:
  • ‘RUN’, ‘PROV’ == STATE_RUNNING
  • ‘PEND’, ‘WAIT’ == STATE_QUEUE
  • not found == STATE_UNKNOWN
  • everything else == STATE_QUEUE_ERROR

class sisyphus.load_sharing_facility_engine.TaskInfo(job_id, task_id, state)
job_id

Alias for field number 0

state

Alias for field number 2

task_id

Alias for field number 1

sisyphus.load_sharing_facility_engine.try_to_multiply(y, x, backup_value=None)[source]

Tries to convert y to float, multiply it by x, and convert it back to a rounded string. Returns backup_value if that fails, or y if backup_value is None.

Indices and tables