270 lines
8.7 KiB
Python
270 lines
8.7 KiB
Python
# Lint as: python2, python3
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
|
|
|
|
import sys, os, signal, time, six.moves.cPickle, logging
|
|
|
|
from autotest_lib.client.common_lib import error, utils
|
|
from autotest_lib.client.common_lib.cros import retry
|
|
from six.moves import zip
|
|
|
|
|
|
# Logging manager used to redirect subcommand output. Entry points that use
# subcommand must assign their logging manager here to get log redirection
# for subcommands; while it stays None, subcommand.redirect_output() is a
# no-op.
logging_manager_object = None
|
|
|
|
|
|
def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    Every task is forked first; the parent then waits for each task in turn
    and unpickles the value (or exception) its child sent back.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
            The deadline is shared by all tasks, but each task is always
            granted at least one more second to finish.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the tasks is
            returned. [default: False]

    @returns None, or (when return_results is True) a list holding one
            result or exception per task, in tasklist order.
    @raises error.AutoservError: if any task failed and return_results is
            False.
    """
    run_error = False
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    results = []
    for task in tasklist:
        if timeout:
            # Never hand a task a zero/negative timeout, even once the shared
            # deadline has passed.
            remaining_timeout = max(endtime - time.time(), 1)
        wait_error = None
        try:
            status = task.fork_waitfor(timeout=remaining_timeout)
        except error.AutoservSubcommandError as e:
            run_error = True
            wait_error = e
        else:
            if status != 0:
                run_error = True

        # NOTE(review): the child writes its whole pickle before exiting, but
        # the pipe is only read here, after the child has been reaped. A
        # payload larger than the OS pipe buffer therefore blocks the child
        # until the timeout kills it (or forever without a timeout).
        try:
            results.append(six.moves.cPickle.load(task.result_pickle))
        except EOFError as eof:
            # The child exited (e.g. it was killed on timeout) without sending
            # anything back. Record why instead of aborting the whole run and
            # leaving the remaining tasks unreaped.
            run_error = True
            results.append(wait_error if wait_error is not None else eof)
        finally:
            task.result_pickle.close()

    if return_results:
        return results
    elif run_error:
        message = 'One or more subcommands failed:\n'
        for task, result in zip(tasklist, results):
            message += 'task: %s returned/raised: %r\n' % (task, result)
        raise error.AutoservError(message)
|
|
|
|
|
|
def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x),
                    log=True, timeout=None, return_results=False):
    """
    Run function once per element of arglist, in parallel subcommands.

    Each element in the arglist is used to create a subcommand object,
    where that arg is used both to name a result subdirectory and as the
    single argument passed to "function". Those subcommand objects are
    then executed in parallel.

    NOTE: As an optimization, if arglist has exactly one element a
    subcommand is not used: function is called directly in this process.
    In that case no result sub-directory is created, timeout is not
    enforced, and (without return_results) an exception raised by function
    propagates unchanged instead of being wrapped in an AutoservError.

    @param function: A callable to run in parallel once per arg in arglist.
    @param arglist: An iterable of arguments to be used one per subcommand
            (e.g. a list, tuple or set of machines).
    @param subdir_name_constructor: A function that returns a name for the
            result sub-directory created per subcommand.
            Signature is:
                subdir_name_constructor(arg)
            where arg is the argument passed to function.
    @param log: If True, output will be written to output in a subdirectory
            named after each subcommand's arg.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the function
            called on each arg is returned. [default: False]

    @returns None or a list of results/exceptions.
    """
    # Materialize once so sets and one-shot iterators work with the len()
    # and indexing below (a one-element set is not subscriptable).
    arglist = list(arglist)
    if not arglist:
        logging.warning('parallel_simple was called with an empty arglist, '
                        'did you forget to pass in a list of machines?')

    # Bypass the multithreading if only one machine.
    if len(arglist) == 1:
        arg = arglist[0]
        if return_results:
            try:
                result = function(arg)
            except Exception as e:
                return [e]
            return [result]
        else:
            function(arg)
            return

    subcommands = []
    for arg in arglist:
        args = [arg]
        subdir = subdir_name_constructor(arg) if log else None
        subcommands.append(subcommand(function, args, subdir))
    return parallel(subcommands, timeout, return_results=return_results)
|
|
|
|
|
|
class subcommand(object):
    """
    A function call executed in a forked child process.

    The child's return value (or the exception it raised) is pickled back
    to the parent through a pipe, which the parent reads via
    self.result_pickle.
    """
    # Class-wide hook registries shared by every subcommand instance; see
    # register_fork_hook() and register_join_hook().
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir=None):
        """
        @param func: The callable to run in the child; called as func(*args).
        @param args: Sequence of positional arguments for func.
        @param subdir: Optional directory to log results in. It, and a
                'debug' directory inside it, are created if missing, and the
                child chdirs into it before running func.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.pid = None
        self.returncode = None
        # Read end of the result pipe; set in the parent by fork_start().
        self.result_pickle = None

    def __str__(self):
        return 'subcommand(func=%s, args=%s, subdir=%s)' % (
                self.func, self.args, self.subdir)

    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. The hook is called as hook(subcommand_instance). """
        cls.fork_hooks.append(hook)

    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process just
        before it terminates (joins to the parent). The hook is called as
        hook(subcommand_instance). """
        cls.join_hooks.append(hook)

    def redirect_output(self):
        """
        Ask the module-level logging_manager_object (if one is set) to
        tee/redirect debug logs into self.debug, tagged with the basename of
        self.subdir. Does nothing when this subcommand has no subdir.
        """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)

    def fork_start(self):
        """
        Fork a child process that runs self.func(*self.args).

        In the parent this sets self.pid and self.result_pickle (the read end
        of the result pipe, opened in binary mode) and returns immediately.

        The child never returns. It runs the fork hooks and func, then pickles
        func's return value (or the exception it raised) into the pipe. It
        then runs the join hooks and exits via os._exit(): status 0 when func
        succeeded and its result was sent, 1 otherwise.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        self.pid = os.fork()

        if self.pid:  # I am the parent
            os.close(w)
            # Pickles are bytes; a text-mode file object breaks cPickle.load()
            # on Python 3.
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return: whatever fails
        # below, the finally clause exits the process rather than letting an
        # exception unwind into the parent's code.
        os.close(r)
        exit_code = 1
        try:
            signal.signal(signal.SIGTERM, signal.SIG_DFL)  # clear handler
            if self.subdir:
                os.chdir(self.subdir)
            self.redirect_output()

            # A buffered binary file object retries short writes, unlike a
            # single os.write() call.
            with os.fdopen(w, 'wb') as result_pipe:
                try:
                    for hook in self.fork_hooks:
                        hook(self)
                    result = self.func(*self.args)
                    result_pipe.write(six.moves.cPickle.dumps(
                            result, six.moves.cPickle.HIGHEST_PROTOCOL))
                    result_pipe.flush()
                    exit_code = 0
                except Exception as e:
                    logging.exception('function failed')
                    exit_code = 1
                    result_pipe.write(six.moves.cPickle.dumps(
                            e, six.moves.cPickle.HIGHEST_PROTOCOL))

            for hook in self.join_hooks:
                hook(self)
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)

    def _handle_exitstatus(self, sts):
        """
        Record the child's exit status and report failures.

        This is partially borrowed from subprocess.Popen.

        @param sts: Raw status as returned by os.waitpid().
        @raises error.AutoservSubcommandError: if the child exited non-zero
                or was killed by a signal.
        @raises RuntimeError: if sts is neither a signal nor an exit status.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print()
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    # Echo the log verbatim: every line already ends in '\n'.
                    with open(stderr_file) as stderr_log:
                        for line in stderr_log:
                            print(line, end='')
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)

    def poll(self):
        """
        Return the child's return code, or None if it has not exited yet.

        Never blocks. Errors from os.waitpid() are ignored, as in
        subprocess.Popen, from which this is borrowed.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                pass
        return self.returncode

    def wait(self):
        """
        Block until the child exits and return its return code.

        This is borrowed from subprocess.Popen.

        @raises error.AutoservSubcommandError: if the child did not exit 0
                (raised from _handle_exitstatus on the first reap).
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode

    def fork_waitfor(self, timeout=None):
        """
        Wait for the child to exit, killing it if it outlives timeout.

        @param timeout: Seconds to wait; None or 0 waits indefinitely.
        @returns the child's return code.
        """
        if not timeout:
            return self.wait()
        else:
            # NOTE(review): assumes retry.timeout() returns
            # (timed_out, value) with value None when the call timed out.
            _, result = retry.timeout(self.wait, timeout_sec=timeout)

            if result is None:
                # NOTE(review): if utils.nuke_pid() itself reaps the child,
                # the wait() below would fail with ECHILD -- confirm.
                utils.nuke_pid(self.pid)
                print("subcommand failed pid %d" % self.pid)
                print("%s" % (self.func,))
                print("timeout after %ds" % timeout)
                print()
                result = self.wait()

            return result
|