import logging
import multiprocessing
import threading
import queue
import os
import socket
import subprocess
from typing import List
import psutil
import time
from sisyphus.engine import EngineBase
from sisyphus import tools
import sisyphus.global_settings as gs
from collections import namedtuple
from ast import literal_eval
# Engine identifier reported back by LocalEngine.submit_call.
ENGINE_NAME = "local"

# Default requirements handed out by LocalEngine.get_default_rqmt.
# Original note: "we are only using cpu so far anyway..." - i.e. mem/time are
# listed for completeness rather than because they are enforced here.
LOCAL_DEFAULTS = dict(
    cpu=1,
    mem=1,
    time=1,
)
[docs]
def run_task(call, logpath):
    """Run ``call`` and append everything it prints to ``logpath``.

    Both stdout and stderr of the child are written to the same file, which is
    opened in append mode so earlier log content is kept.

    :param call: command to execute, in any form accepted by ``subprocess``
    :param str logpath: log file to append the child's output to
    :raises subprocess.CalledProcessError: if the command exits with a non-zero status
    """
    with open(logpath, mode="a") as sink:
        subprocess.check_call(args=call, stdout=sink, stderr=sink)
# A single queued task instance.
#   call      - argv list used to start the worker process
#   logpath   - log path supplied by the submitter / recovery code
#   rqmt      - requested resources, a dict such as {"cpu": 1, "gpu": 0}
#   name      - unique name combining the job id and the task name
#   task_name - the task name only
#   task_id   - id of this instance of the task
TaskQueueInstance = namedtuple("TaskQueueInstance", "call logpath rqmt name task_name task_id")
def get_process_logging_path(task_path, task_name, task_id):
    """Build the path of the process-logging file for one task instance.

    The result is ``<task_path>/<gs.PLOGGING_FILE>.<task_name>.<task_id>``, with
    ``task_id`` rendered through ``%i`` (integer formatting).

    :param str task_path: directory the file lives in
    :param task_name: name of the task
    :param int task_id: id of the task instance
    :rtype: str
    """
    base = os.path.join(task_path, gs.PLOGGING_FILE)
    return ".".join((base, str(task_name), "%i" % task_id))
[docs]
class sync_object(object):
    """Guard an object so only one thread at a time can use it inside a ``with`` block.

    The object is parked in a ``queue.Queue`` of size one. Entering the context
    takes it out of the queue (blocking while another holder has it); leaving the
    context puts it back, also when the body raised.

    ``queue.Queue`` is a thread-level primitive, so this synchronises threads of
    one process. It is not re-entrant: a nested ``with`` on the same instance in
    the same thread blocks forever.

    e.g.::

        self.sobj = sync_object({})
        with self.sobj as sobj:
            sobj[7] = 9

        # another thread
        with self.sobj as sobj:
            assert sobj == {7: 9}
    """

    def __init__(self, obj):
        self.queue = queue.Queue(maxsize=1)
        self.obj = obj
        self.queue.put(obj)

    def __enter__(self):
        # Blocks until the current holder (if any) has handed the object back.
        held = self.queue.get()
        self.obj = held
        return held

    def __exit__(self, type, value, traceback):
        # Return the object; returning None lets any exception from the body propagate.
        self.queue.put(self.obj)
[docs]
class LocalEngine(threading.Thread, EngineBase):
    """Simple engine to execute running tasks locally.

    CPU and GPU are always checked, all other requirements only if given during initialisation.

    The engine is a thread: ``start_engine`` creates the shared task containers and starts
    ``run``, which repeatedly reaps finished tasks and launches runnable tasks whose
    requirements fit into the free resources.

    Synchronisation: the task containers are ``sync_object`` instances and are acquired in
    the order ``waiting_tasks`` -> ``runnable_tasks`` -> ``running_tasks`` (``self.lock`` is
    taken before ``running_tasks`` in ``check_finished_tasks``). Keep this order when adding
    code to avoid deadlocks. ``free_resources`` and ``available_gpus`` are only modified while
    ``running_tasks`` is held.
    """

    def __init__(self, cpus=1, gpus=0, available_gpus="", **kwargs):
        """The parameter cpus and gpus are kept for backwards compatibility, if cpu and gpu are given
        they will overwrite the values of cpus and gpus.

        :param int cpus: number of CPUs that can be used
        :param int gpus: number of GPUs that can be used
        :param str available_gpus: comma separated GPU device names (e.g. "0,1"); must contain
            exactly ``gpus`` entries unless ``gpus`` is 0
        :param kwargs: other consumable resources e.g. mem (in GB)
        """
        # resources
        self.lock = threading.Lock()
        # There is a mismatch between the requested resources names (?pus) and internal name (?pu)
        # Keep old naming for backwards compatibility, but internal name will overwrite default values
        self.max_resources = {"cpu": cpus, "gpu": gpus}
        self.max_resources.update(kwargs)
        self.free_resources = self.max_resources.copy()
        # NOTE(review): "".split(",") == [""], so gpus=1 with available_gpus="" passes this assert
        # although no device name is registered below; a GPU task would then fail the assert in
        # reserve_resources. Confirm whether that configuration should be rejected here.
        assert gpus == 0 or len(available_gpus.split(",")) == gpus
        # device name -> True while the device is not assigned to a task
        self.available_gpus = {g: True for g in available_gpus.split(",") if g != ""}
        # Popen objects started by this process; polled in check_finished_tasks to reap them
        self.running_subprocess = []
        self.started = False
        self.running = multiprocessing.Value("B", 1)  # set to 0 to stop engine
        threading.Thread.__init__(self)

    def start_engine(self):
        """Create the shared task containers and start the engine thread (only once)."""
        if self.started:
            return
        # control input
        self.runnable_tasks = sync_object([])
        self.waiting_tasks = sync_object({})
        # control output / which tasks are currently running
        self.running_tasks = sync_object({})
        self.start()
        self.started = True

    def get_default_rqmt(self, task):
        """Return the default requirements for ``task`` (the same for every task).

        A fresh copy of LOCAL_DEFAULTS is returned so a caller that updates the result
        cannot change the defaults seen by all later tasks.
        """
        return dict(LOCAL_DEFAULTS)

    def start_task(self, task, selected_gpus):
        """Start the worker process of ``task`` restricted to ``selected_gpus``.

        :param TaskQueueInstance task:
        :param str selected_gpus: value for CUDA_VISIBLE_DEVICES (comma separated, may be "")
        :rtype: psutil.Process
        """
        # Start new task
        call = task.call[:]
        env = os.environ.copy()
        env["CUDA_VISIBLE_DEVICES"] = selected_gpus
        # NOTE(review): start_new_session presumably keeps signals sent to the manager's
        # process group (e.g. Ctrl-C) away from the task - confirm.
        sp = subprocess.Popen(call, env=env, start_new_session=True)
        self.running_subprocess.append(sp)
        pid = sp.pid
        process = psutil.Process(pid)
        return process

    def check_finished_tasks(self):
        """Reap exited child processes and mark tasks whose process stopped as done."""
        with self.lock:
            logging.debug("Check for finished subprocesses")
            # Polling reaps children started by this process. This must happen before the
            # is_running() checks below: an exited but unreaped child is a zombie, which
            # psutil still reports as running.
            still_running = []
            # Let jobs started in this process finish
            for p in self.running_subprocess:
                if p.poll() is None:
                    still_running.append(p)
            self.running_subprocess = still_running

            logging.debug("Check for finished tasks")
            with self.running_tasks as running_tasks:
                # iterate over a copy, task_done removes entries from running_tasks
                for process, task, _ in list(running_tasks.values()):
                    logging.debug(
                        "Task state: %s %i PID: %s %s"
                        % (task.task_name, task.task_id, process.pid, process.is_running())
                    )
                    if not process.is_running():
                        self.task_done(running_tasks, task)

    def enough_free_resources(self, rqmt):
        """Return True if every resource in max_resources has enough free capacity for ``rqmt``.

        Requirement keys the engine does not know are ignored. A request larger than the
        maximum only logs a warning; such a task can never be started.
        """
        for key, max_available in self.max_resources.items():
            free = self.free_resources[key]
            requested = rqmt.get(key, 0)
            if max_available < requested:
                logging.warning(
                    "Requested resources are higher than maximal available resources\n"
                    "Available resources %s\n"
                    "Requested resources %s" % (self.max_resources, rqmt)
                )
            if free < requested:
                return False
        return True

    def reserve_resources(self, rqmt, selected_devices=None):
        """Subtract ``rqmt`` from the free resources and mark GPUs as in use.

        :param dict rqmt: requested resources
        :param str|None selected_devices: comma separated GPU names to mark as used (as found
            for a recovered process); if None, ``rqmt["gpu"]`` free GPUs are picked
        :return: comma separated names of the reserved GPUs
        :rtype: str
        """
        # NOTE: free_resources is already updated when the assert below fires.
        self.free_resources = {key: free - rqmt.get(key, 0) for key, free in self.free_resources.items()}
        for key, max_available in self.max_resources.items():
            free = self.free_resources[key]
            assert 0 <= free <= max_available

        # reserve specific GPUs
        if selected_devices is not None:
            if selected_devices != "":
                for name in selected_devices.split(","):
                    self.available_gpus[name] = False
            # Already a comma separated string; joining it again would split its characters.
            return selected_devices

        selected = []
        for name, free in self.available_gpus.items():
            if len(selected) == rqmt.get("gpu", 0):
                break
            if free:
                self.available_gpus[name] = False
                selected.append(name)
        assert len(selected) == rqmt.get("gpu", 0)
        return ",".join(selected)

    def release_resources(self, rqmt, selected_devices):
        """Give the resources of ``rqmt`` and the GPUs in ``selected_devices`` back."""
        self.free_resources = {key: free + rqmt.get(key, 0) for key, free in self.free_resources.items()}
        for key, max_available in self.max_resources.items():
            free = self.free_resources[key]
            assert 0 <= free <= max_available

        if selected_devices != "":
            for name in selected_devices.split(","):
                self.available_gpus[name] = True

    @tools.default_handle_exception_interrupt_main_thread
    def run(self):
        """Engine main loop: reap finished tasks, start runnable tasks that fit, repeat."""
        try:
            while self.running.value:
                self.check_finished_tasks()

                wait = True  # wait if no new job is started
                # check runnable tasks
                logging.debug("Check for new tasks (Free resources %s)" % self.free_resources)
                # get object for synchronisation
                with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
                    runnable_task_idx = 0
                    # run next task if the capacities are available
                    while runnable_task_idx < len(runnable_tasks):
                        next_task = runnable_tasks[runnable_task_idx]
                        with self.running_tasks as running_tasks:
                            # if enough free resources => run job
                            if self.enough_free_resources(next_task.rqmt):
                                selected_gpus = self.reserve_resources(next_task.rqmt)
                                name = (next_task.name, next_task.task_id)
                                logging.debug("Start task %s" % str(name))
                                try:
                                    del waiting_tasks[name]
                                except KeyError:
                                    logging.warning(
                                        "Could not delete %s from waiting queue. "
                                        "This should not happen! Probably a bug..." % str(name)
                                    )

                                # Start job:
                                process = self.start_task(next_task, selected_gpus)
                                running_tasks[name] = (process, next_task, selected_gpus)
                                # the following task moves into this index, so do not advance it
                                del runnable_tasks[runnable_task_idx]
                                wait = False
                            else:
                                runnable_task_idx += 1

                if wait:
                    # check only once per second for new jobs
                    # if no job has been started
                    time.sleep(1)
        except KeyboardInterrupt:
            # KeyboardInterrupt is handled in manager
            pass

    def stop_engine(self):
        """Signal the engine thread to stop and log tasks that are still running.

        Running tasks are not killed. Calling this before start_engine only clears the
        running flag, since the task containers do not exist yet.
        """
        logging.debug("Got stop signal")
        self.running.value = False
        if not self.started:
            # running_tasks is only created in start_engine
            return
        self.check_finished_tasks()
        with self.running_tasks as running_tasks:
            if len(running_tasks) > 0:
                logging.warning("Still running tasks in local engine: %i" % len(running_tasks))
                for (task_name, task_id), value in running_tasks.items():
                    logging.warning(" Running task: %s %i PID: %s" % (task_name, task_id, value[0].pid))

    def submit_call(self, call, logpath, rqmt, name, task_name, task_ids):
        """Queue one task instance per id in ``task_ids``; ``run`` starts them when resources allow.

        :return: (engine name, hostname)
        :raises NotImplementedError: if ``rqmt`` asks for multi-node slots
        """
        if rqmt.get("multi_node_slots", None):
            raise NotImplementedError("Multi-node slots are not implemented for local engine")

        # run one thread for each task id
        for task_id in task_ids:
            call_with_id = call[:] + [str(task_id)]
            call_with_id += ["--redirect_output"]

            task = TaskQueueInstance(call_with_id, logpath, rqmt, name, task_name, task_id)
            with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks:
                runnable_tasks.append(task)
                waiting_tasks[(name, task_id)] = task

        return ENGINE_NAME, socket.gethostname()

    def task_done(self, running_tasks, task):
        """Remove a finished task from ``running_tasks`` and release its resources.

        :param dict running_tasks: the unwrapped running_tasks dict; the caller must hold it
        :param TaskQueueInstance task: the finished task
        """
        name = (task.name, task.task_id)
        logging.debug("Task Done %s" % str(name))
        try:
            _, _, selected_gpus = running_tasks.pop(name)
        except KeyError:
            # Without an entry the reserved GPUs are unknown, so nothing is released.
            logging.warning(
                "Could not delete %s from running queue. This should not happen! Probably a bug..." % str(name)
            )
            return

        # release used resources
        self.release_resources(task.rqmt, selected_gpus)

    def task_state(self, task, task_id):
        """Return STATE_QUEUE, STATE_RUNNING or STATE_UNKNOWN for the given task instance."""
        name = task.task_name()
        task_name = (name, task_id)

        # Check waiting tasks
        with self.waiting_tasks as waiting_tasks:
            if task_name in waiting_tasks:
                return gs.STATE_QUEUE

        # Check running tasks
        with self.running_tasks as running_tasks:
            if task_name in running_tasks:
                return gs.STATE_RUNNING

        # Must stay outside the block above: try_to_recover_task acquires running_tasks
        # itself and sync_object is not re-entrant.
        if self.try_to_recover_task(task, task_id):
            return gs.STATE_RUNNING
        else:
            return gs.STATE_UNKNOWN

    def reset_cache(self):
        # the local engine needs no cache
        pass

    def try_to_recover_task(self, task, task_id):
        """Re-attach to a task process that is still running (e.g. from an earlier manager run).

        Reads the pid and requested resources from the task's process-logging file, registers
        the process in running_tasks and reserves its resources.

        :return: True if the task was recovered, False otherwise (all errors are logged at debug level)
        """
        process_logging_filename = task.get_process_logging_path(task_id)
        if not os.path.isfile(process_logging_filename):
            # Nothing to do here
            return False

        # Check if task is already running
        try:
            with open(process_logging_filename) as f:
                d = literal_eval(f.read())
            pid = d["pid"]
            process = psutil.Process(pid)

            # Recover instance
            rqmt = d["requested_resources"]
            logpath = os.path.relpath(task.path(gs.JOB_LOG_ENGINE))
            call_with_id = task.get_worker_call(task_id)

            name = task.task_name()
            task_name = task.name()
            task_instance = TaskQueueInstance(call_with_id, logpath, rqmt, name, task_name, task_id)

            # element 0 (the executable) is not compared
            if call_with_id[1:] != process.cmdline()[1:]:
                logging.warning("Job %s changed, recovering it anyway." % name)
                logging.debug("Job changed: %i %s %s" % (pid, process.cmdline(), task_instance.call))

            with self.running_tasks as running_tasks:
                name = (task_instance.name, task_id)
                used_gpus = process.environ().get("CUDA_VISIBLE_DEVICES", "")
                running_tasks[name] = (process, task_instance, used_gpus)
                self.reserve_resources(rqmt, selected_devices=used_gpus)
            logging.debug("Loaded job: %i %s %s" % (pid, process.cmdline(), task_instance.call))
            return True
        except Exception as e:
            # Deliberately best effort: any failure just means the task is not recovered.
            logging.debug("Failed to load running job: %s" % e)
            return False

    def get_task_id(self, task_id):
        """Return ``task_id``, or 1 (with a warning) if the worker was started without one."""
        if task_id is not None:
            # task id passed via argument
            return task_id
        logging.warning(
            "Job in local engine started without task_id, worker is probably started manualy. Continue with task_id=1"
        )
        return 1

    def get_job_node_hostnames(self) -> List[str]:
        """
        :return: we are running on `[socket.gethostname()]` since this is a local engine.
        """
        try:
            return [socket.gethostname()]
        except socket.error:
            return ["localhost"]