import logging
import multiprocessing
import threading
import queue
import os
import socket
import subprocess
import psutil
import time
from sisyphus.engine import EngineBase
from sisyphus import tools
import sisyphus.global_settings as gs
from collections import namedtuple
from ast import literal_eval
ENGINE_NAME = "local"

# we are only using cpu so far anyway...
# default task requirements: 1 CPU, 1 GB of memory, 1 hour of run time
LOCAL_DEFAULTS = {
    "cpu": 1,
    "mem": 1,
    "time": 1,
}
def run_task(call, logpath):
"""Simple function to run task"""
with open(logpath, "a") as logfile:
subprocess.check_call(call, stdout=logfile, stderr=logfile)
# name is the unique name combining job id and task name; task_name is only the task name
TaskQueueInstance = namedtuple("TaskQueueInstance", ["call", "logpath", "rqmt", "name", "task_name", "task_id"])
def get_process_logging_path(task_path, task_name, task_id):
path = os.path.join(task_path, gs.PLOGGING_FILE)
path = "%s.%s.%i" % (path, task_name, task_id)
return path
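# Illustrative example for get_process_logging_path (hypothetical values, assuming
# gs.PLOGGING_FILE == "usage"):
#     get_process_logging_path("work/job/engine", "run", 1) -> "work/job/engine/usage.run.1"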
class sync_object(object):
"""Object to be used by the with statement to sync an object via queue
e.g.::
self.sobj = sync_object({})
with self.sobj as sobj:
sobj[7] = 9
# other process
with self.sobj as sobj:
            assert sobj == {7: 9}
"""
def __init__(self, obj):
self.queue = queue.Queue(1)
self.obj = obj
self.queue.put(obj)
def __enter__(self):
self.obj = self.queue.get()
return self.obj
    def __exit__(self, exc_type, exc_value, traceback):
self.queue.put(self.obj)
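# Design note on sync_object: the queue.Queue of size one acts as a hand-over lock;
# __enter__ blocks in get() until the previous holder has put() the object back, so the
# body of the with statement always has exclusive access to the wrapped object.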
class LocalEngine(threading.Thread, EngineBase):
"""Simple engine to execute running tasks locally.
CPU and GPU are always checked, all other requirements only if given during initialisation.
"""
def __init__(self, cpus=1, gpus=0, available_gpus="", **kwargs):
"""The parameter cpus and gpus are kept for backwards compatibility, if cpu and gpu are given
they will overwrite the values of cpus and gpus.
:param int cpus: number of CPUs that can be used
:param int gpus: number of GPUs that can be used
:param **kwargs: other consumable resources e.g. mem (in GB)
"""
# resources
self.lock = threading.Lock()
        # There is a mismatch between the requested resource names (cpus/gpus) and the
        # internal names (cpu/gpu). Keep the old names for backwards compatibility, but
        # internal names given via kwargs will overwrite the default values.
self.max_resources = {"cpu": cpus, "gpu": gpus}
self.max_resources.update(kwargs)
self.free_resources = self.max_resources.copy()
        assert gpus == 0 or len(available_gpus.split(",")) == gpus, "available_gpus must name one device per GPU"
self.available_gpus = {g: True for g in available_gpus.split(",") if g != ""}
self.running_subprocess = []
self.started = False
self.running = multiprocessing.Value("B", 1) # set to 0 to stop engine
threading.Thread.__init__(self)
def start_engine(self):
if self.started:
return
# control input
self.runnable_tasks = sync_object([])
self.waiting_tasks = sync_object({})
# control output / which tasks are currently running
self.running_tasks = sync_object({})
self.start()
self.started = True
def get_default_rqmt(self, task):
return LOCAL_DEFAULTS
def start_task(self, task, selected_gpus):
"""
:param TaskQueueInstance task:
:param str selected_gpus:
:rtype: psutil.Process
"""
# Start new task
call = task.call[:]
env = os.environ.copy()
env["CUDA_VISIBLE_DEVICES"] = selected_gpus
        # start_new_session=True detaches the task from the manager's process group, so it
        # is not killed by signals sent to the manager (e.g. Ctrl-C)
        sp = subprocess.Popen(call, env=env, start_new_session=True)
self.running_subprocess.append(sp)
pid = sp.pid
process = psutil.Process(pid)
return process
def check_finished_tasks(self):
with self.lock:
logging.debug("Check for finished subprocesses")
still_running = []
            # Poll the subprocesses started by this engine and keep only those that are
            # still running; poll() also reaps finished children
for p in self.running_subprocess:
if p.poll() is None:
still_running.append(p)
self.running_subprocess = still_running
logging.debug("Check for finished tasks")
with self.running_tasks as running_tasks:
for process, task, _ in list(running_tasks.values()):
logging.debug(
"Task state: %s %i PID: %s %s"
% (task.task_name, task.task_id, process.pid, process.is_running())
)
if not process.is_running():
self.task_done(running_tasks, task)
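    # enough_free_resources, reserve_resources and release_resources below implement a
    # simple counting scheme. Illustrative example (hypothetical numbers): with
    # max_resources == {"cpu": 4, "gpu": 1} and free_resources == {"cpu": 1, "gpu": 1},
    # a task with rqmt == {"cpu": 2} does not fit, while rqmt == {"cpu": 1, "gpu": 1}
    # does; reserving the latter decreases the free counters, marks one entry of
    # available_gpus as busy, and returns its name for CUDA_VISIBLE_DEVICES.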
def enough_free_resources(self, rqmt):
for key, max_available in self.max_resources.items():
free = self.free_resources[key]
requested = rqmt.get(key, 0)
if max_available < requested:
                logging.warning(
                    "Requested resources exceed the maximum available resources, "
                    "this task can never be started locally\n"
                    "Available resources %s\n"
                    "Requested resources %s" % (self.max_resources, rqmt)
                )
if free < requested:
return False
return True
def reserve_resources(self, rqmt, selected_devices=None):
self.free_resources = {key: free - rqmt.get(key, 0) for key, free in self.free_resources.items()}
for key, max_available in self.max_resources.items():
free = self.free_resources[key]
assert 0 <= free <= max_available
# reserve specific GPUs
if selected_devices is not None:
if selected_devices != "":
for name in selected_devices.split(","):
self.available_gpus[name] = False
else:
selected_devices = []
for name, free in self.available_gpus.items():
if len(selected_devices) == rqmt.get("gpu", 0):
break
if free:
self.available_gpus[name] = False
selected_devices.append(name)
assert len(selected_devices) == rqmt.get("gpu", 0)
return ",".join(selected_devices)
def release_resources(self, rqmt, selected_devices):
self.free_resources = {key: free + rqmt.get(key, 0) for key, free in self.free_resources.items()}
for key, max_available in self.max_resources.items():
free = self.free_resources[key]
assert 0 <= free <= max_available
if selected_devices != "":
for name in selected_devices.split(","):
self.available_gpus[name] = True
@tools.default_handle_exception_interrupt_main_thread
def run(self):
try:
while self.running.value:
self.check_finished_tasks()
wait = True # wait if no new job is started
# check runnable tasks
logging.debug("Check for new tasks (Free resources %s)" % self.free_resources)
# get object for synchronisation
with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
runnable_task_idx = 0
# run next task if the capacities are available
while runnable_task_idx < len(runnable_tasks):
next_task = runnable_tasks[runnable_task_idx]
with self.running_tasks as running_tasks:
# if enough free resources => run job
if self.enough_free_resources(next_task.rqmt):
selected_gpus = self.reserve_resources(next_task.rqmt)
name = (next_task.name, next_task.task_id)
logging.debug("Start task %s" % str(name))
try:
del waiting_tasks[name]
except KeyError:
logging.warning(
"Could not delete %s from waiting queue. "
"This should not happen! Probably a bug..." % str(name)
)
# Start job:
process = self.start_task(next_task, selected_gpus)
running_tasks[name] = (process, next_task, selected_gpus)
del runnable_tasks[runnable_task_idx]
wait = False
else:
runnable_task_idx += 1
if wait:
# check only once per second for new jobs
# if no job has been started
time.sleep(1)
except KeyboardInterrupt:
# KeyboardInterrupt is handled in manager
pass
def stop_engine(self):
logging.debug("Got stop signal")
        self.running.value = 0
self.check_finished_tasks()
with self.running_tasks as running_tasks:
if len(running_tasks) > 0:
logging.warning("Still running tasks in local engine: %i" % len(running_tasks))
for (task_name, task_id), value in running_tasks.items():
logging.warning(" Running task: %s %i PID: %s" % (task_name, task_id, value[0].pid))
def submit_call(self, call, logpath, rqmt, name, task_name, task_ids):
if rqmt.get("multi_node_slots", None):
raise NotImplementedError("Multi-node slots are not implemented for local engine")
# run one thread for each task id
for task_id in task_ids:
call_with_id = call[:] + [str(task_id)]
call_with_id += ["--redirect_output"]
task = TaskQueueInstance(call_with_id, logpath, rqmt, name, task_name, task_id)
with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
runnable_tasks.append(task)
waiting_tasks[(name, task_id)] = task
return ENGINE_NAME, socket.gethostname()
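    # Illustrative use of submit_call (hypothetical arguments): submit_call(worker_call,
    # logpath, rqmt, "job_id.run", "run", [1, 2]) queues two tasks, each with its task id
    # and "--redirect_output" appended to the worker call, and returns ("local", <hostname>)
    # so the manager knows which engine and host accepted the job.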
def task_done(self, running_tasks, task):
name = (task.name, task.task_id)
selected_gpus = running_tasks[name][2]
logging.debug("Task Done %s" % str(name))
try:
del running_tasks[name]
except KeyError:
            logging.warning(
                "Could not delete %s from the running tasks. This should not happen! Probably a bug..." % str(name)
            )
# release used resources
self.release_resources(task.rqmt, selected_gpus)
def task_state(self, task, task_id):
name = task.task_name()
task_name = (name, task_id)
# Check waiting tasks
with self.waiting_tasks as waiting_tasks:
if task_name in waiting_tasks:
return gs.STATE_QUEUE
# Check running tasks
with self.running_tasks as running_tasks:
if task_name in running_tasks:
return gs.STATE_RUNNING
if self.try_to_recover_task(task, task_id):
return gs.STATE_RUNNING
else:
return gs.STATE_UNKNOWN
def reset_cache(self):
# the local engine needs no cache
pass
def try_to_recover_task(self, task, task_id):
process_logging_filename = task.get_process_logging_path(task_id)
if not os.path.isfile(process_logging_filename):
# Nothing to do here
return False
# Check if task is already running
try:
with open(process_logging_filename) as f:
d = literal_eval(f.read())
pid = d["pid"]
process = psutil.Process(pid)
# Recover instance
rqmt = d["requested_resources"]
logpath = os.path.relpath(task.path(gs.JOB_LOG_ENGINE))
call_with_id = task.get_worker_call(task_id)
name = task.task_name()
task_name = task.name()
task_instance = TaskQueueInstance(call_with_id, logpath, rqmt, name, task_name, task_id)
if call_with_id[1:] != process.cmdline()[1:]:
logging.warning("Job %s changed, recovering it anyway." % name)
logging.debug("Job changed: %i %s %s" % (pid, process.cmdline(), task_instance.call))
with self.running_tasks as running_tasks:
name = (task_instance.name, task_id)
used_gpus = process.environ().get("CUDA_VISIBLE_DEVICES", "")
running_tasks[name] = (process, task_instance, used_gpus)
self.reserve_resources(rqmt, selected_devices=used_gpus)
logging.debug("Loaded job: %i %s %s" % (pid, process.cmdline(), task_instance.call))
return True
except Exception as e:
logging.debug("Failed to load running job: %s" % e)
return False
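    # Illustrative content of the process logging file that try_to_recover_task expects
    # (hypothetical values): {"pid": 12345, "requested_resources": {"cpu": 1, "mem": 1, "time": 1}}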
def get_task_id(self, task_id):
if task_id is not None:
# task id passed via argument
return task_id
        logging.warning(
            "Job in local engine started without task_id, "
            "the worker was probably started manually. Continuing with task_id=1"
        )
return 1
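# Usage sketch (an assumption for illustration, not part of this module): in a sisyphus
# settings.py the engine is typically provided via an engine() function, e.g.
#
#     def engine():
#         from sisyphus.localengine import LocalEngine
#         return LocalEngine(cpus=8, gpus=1, available_gpus="0", mem=32)
#
# mem is passed through **kwargs and becomes an additional consumable resource next to
# cpu and gpu.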