529 lines
20 KiB
Python
Executable File
529 lines
20 KiB
Python
Executable File
#!/usr/bin/python2
|
|
#
|
|
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""Tool to validate code in prod branch before pushing to lab.
|
|
|
|
The script runs push_to_prod suite to verify code in prod branch is ready to be
|
|
pushed. Link to design document:
|
|
https://docs.google.com/a/google.com/document/d/1JMz0xS3fZRSHMpFkkKAL_rxsdbNZomhHbC3B8L71uuI/edit
|
|
|
|
To verify if prod branch can be pushed to lab, run following command in
|
|
chromeos-staging-master2.hot server:
|
|
/usr/local/autotest/site_utils/test_push.py -e someone@company.com
|
|
|
|
The script uses latest gandof stable build as test build by default.
|
|
|
|
"""
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
import argparse
|
|
import ast
|
|
import datetime
|
|
import getpass
|
|
import multiprocessing
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from six.moves import urllib
|
|
|
|
import common
|
|
try:
|
|
from autotest_lib.frontend import setup_django_environment
|
|
from autotest_lib.frontend.afe import models
|
|
from autotest_lib.frontend.afe import rpc_utils
|
|
except ImportError:
|
|
# Unittest may not have Django database configured and will fail to import.
|
|
pass
|
|
from autotest_lib.client.common_lib import global_config
|
|
from autotest_lib.client.common_lib import priorities
|
|
from autotest_lib.client.common_lib.cros import retry
|
|
from autotest_lib.frontend.afe import rpc_client_lib
|
|
from autotest_lib.server import constants
|
|
from autotest_lib.server import site_utils
|
|
from autotest_lib.server import utils
|
|
from autotest_lib.server.cros import provision
|
|
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
|
|
from autotest_lib.site_utils import test_push_common
|
|
|
|
AUTOTEST_DIR=common.autotest_dir
|
|
CONFIG = global_config.global_config
|
|
|
|
AFE = frontend_wrappers.RetryingAFE(timeout_min=0.5, delay_sec=2)
|
|
TKO = frontend_wrappers.RetryingTKO(timeout_min=0.1, delay_sec=10)
|
|
|
|
MAIL_FROM = 'chromeos-test@google.com'
|
|
BUILD_REGEX = 'R[\d]+-[\d]+\.[\d]+\.[\d]+'
|
|
RUN_SUITE_COMMAND = 'run_suite.py'
|
|
PUSH_TO_PROD_SUITE = 'push_to_prod'
|
|
DUMMY_SUITE = 'dummy'
|
|
DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB = 30
|
|
IMAGE_BUCKET = CONFIG.get_config_value('CROS', 'image_storage_server')
|
|
DEFAULT_NUM_DUTS = (
|
|
('gandof', 4),
|
|
('quawks', 2),
|
|
)
|
|
|
|
SUITE_JOB_START_INFO_REGEX = ('^.*Created suite job:.*'
|
|
'tab_id=view_job&object_id=(\d+)$')
|
|
|
|
URL_HOST = CONFIG.get_config_value('SERVER', 'hostname', type=str)
|
|
URL_PATTERN = CONFIG.get_config_value('CROS', 'log_url_pattern', type=str)
|
|
|
|
# Some test could be extra / missing or have mismatched results for various
|
|
# reasons. Add such test in this list and explain the reason.
|
|
_IGNORED_TESTS = [
|
|
# test_push uses a stable image build to test, which is quite behind ToT.
|
|
# The following expectations are correct at ToT, but need to be ignored
|
|
# until stable image is recent enough.
|
|
|
|
# TODO(pprabhu): Remove once R70 is stable.
|
|
'dummy_Fail.RetrySuccess',
|
|
'dummy_Fail.RetryFail',
|
|
]
|
|
|
|
# Multiprocessing proxy objects that are used to share data between background
|
|
# suite-running processes and main process. The multiprocessing-compatible
|
|
# versions are initialized in _main.
|
|
_run_suite_output = []
|
|
_all_suite_ids = []
|
|
|
|
DEFAULT_SERVICE_RESPAWN_LIMIT = 2
|
|
|
|
|
|
class TestPushException(Exception):
|
|
"""Exception to be raised when the test to push to prod failed."""
|
|
pass
|
|
|
|
@retry.retry(TestPushException, timeout_min=5, delay_sec=30)
|
|
def check_dut_inventory(required_num_duts, pool):
|
|
"""Check DUT inventory for each board in the pool specified..
|
|
|
|
@param required_num_duts: a dict specifying the number of DUT each platform
|
|
requires in order to finish push tests.
|
|
@param pool: the pool used by test_push.
|
|
@raise TestPushException: if number of DUTs are less than the requirement.
|
|
"""
|
|
print('Checking DUT inventory...')
|
|
pool_label = constants.Labels.POOL_PREFIX + pool
|
|
hosts = AFE.run('get_hosts', status='Ready', locked=False)
|
|
hosts = [h for h in hosts if pool_label in h.get('labels', [])]
|
|
platforms = [host['platform'] for host in hosts]
|
|
current_inventory = {p : platforms.count(p) for p in platforms}
|
|
error_msg = ''
|
|
for platform, req_num in required_num_duts.items():
|
|
curr_num = current_inventory.get(platform, 0)
|
|
if curr_num < req_num:
|
|
error_msg += ('\nRequire %d %s DUTs in pool: %s, only %d are Ready'
|
|
' now' % (req_num, platform, pool, curr_num))
|
|
if error_msg:
|
|
raise TestPushException('Not enough DUTs to run push tests. %s' %
|
|
error_msg)
|
|
|
|
|
|
def powerwash_dut_to_test_repair(hostname, timeout):
|
|
"""Powerwash dut to test repair workflow.
|
|
|
|
@param hostname: hostname of the dut.
|
|
@param timeout: seconds of the powerwash test to hit timeout.
|
|
@raise TestPushException: if DUT fail to run the test.
|
|
"""
|
|
t = models.Test.objects.get(name='platform_Powerwash')
|
|
c = utils.read_file(os.path.join(AUTOTEST_DIR, t.path))
|
|
job_id = rpc_utils.create_job_common(
|
|
'powerwash', priority=priorities.Priority.SUPER,
|
|
control_type='Server', control_file=c, hosts=[hostname])
|
|
|
|
end = time.time() + timeout
|
|
while not TKO.get_job_test_statuses_from_db(job_id):
|
|
if time.time() >= end:
|
|
AFE.run('abort_host_queue_entries', job=job_id)
|
|
raise TestPushException(
|
|
'Powerwash test on %s timeout after %ds, abort it.' %
|
|
(hostname, timeout))
|
|
time.sleep(10)
|
|
verify_test_results(job_id,
|
|
test_push_common.EXPECTED_TEST_RESULTS_POWERWASH)
|
|
# Kick off verify, verify will fail and a repair should be triggered.
|
|
AFE.reverify_hosts(hostnames=[hostname])
|
|
|
|
|
|
def reverify_all_push_duts():
|
|
"""Reverify all the push DUTs."""
|
|
print('Reverifying all DUTs.')
|
|
hosts = [h.hostname for h in AFE.get_hosts()]
|
|
AFE.reverify_hosts(hostnames=hosts)
|
|
|
|
|
|
def parse_arguments(argv):
|
|
"""Parse arguments for test_push tool.
|
|
|
|
@param argv Argument vector, as for `sys.argv`, including the
|
|
command name in `argv[0]`.
|
|
@return: Parsed arguments.
|
|
|
|
"""
|
|
parser = argparse.ArgumentParser(prog=argv[0])
|
|
parser.add_argument('-b', '--board', dest='board', default='gandof',
|
|
help='Default is gandof.')
|
|
parser.add_argument('-sb', '--shard_board', dest='shard_board',
|
|
default='quawks',
|
|
help='Default is quawks.')
|
|
parser.add_argument('-i', '--build', dest='build', default=None,
|
|
help='Default is the latest stale build of given '
|
|
'board. Must be a stable build, otherwise AU test '
|
|
'will fail. (ex: gandolf-release/R54-8743.25.0)')
|
|
parser.add_argument('-si', '--shard_build', dest='shard_build', default=None,
|
|
help='Default is the latest stable build of given '
|
|
'board. Must be a stable build, otherwise AU test '
|
|
'will fail.')
|
|
parser.add_argument('-p', '--pool', dest='pool', default='bvt')
|
|
parser.add_argument('-t', '--timeout_min', dest='timeout_min', type=int,
|
|
default=DEFAULT_TIMEOUT_MIN_FOR_SUITE_JOB,
|
|
help='Time in mins to wait before abort the jobs we '
|
|
'are waiting on. Only for the asynchronous suites '
|
|
'triggered by create_and_return flag.')
|
|
parser.add_argument('-ud', '--num_duts', dest='num_duts',
|
|
default=dict(DEFAULT_NUM_DUTS),
|
|
type=ast.literal_eval,
|
|
help="Python dict literal that specifies the required"
|
|
" number of DUTs for each board. E.g {'gandof':4}")
|
|
parser.add_argument('-c', '--continue_on_failure', action='store_true',
|
|
dest='continue_on_failure',
|
|
help='All tests continue to run when there is failure')
|
|
parser.add_argument('-sl', '--service_respawn_limit', type=int,
|
|
default=DEFAULT_SERVICE_RESPAWN_LIMIT,
|
|
help='If a service crashes more than this, the test '
|
|
'push is considered failed.')
|
|
|
|
arguments = parser.parse_args(argv[1:])
|
|
|
|
# Get latest stable build as default build.
|
|
version_map = AFE.get_stable_version_map(AFE.CROS_IMAGE_TYPE)
|
|
if not arguments.build:
|
|
arguments.build = version_map.get_image_name(arguments.board)
|
|
if not arguments.shard_build:
|
|
arguments.shard_build = version_map.get_image_name(
|
|
arguments.shard_board)
|
|
return arguments
|
|
|
|
|
|
def do_run_suite(suite_name, arguments, use_shard=False,
|
|
create_and_return=False):
|
|
"""Call run_suite to run a suite job, and return the suite job id.
|
|
|
|
The script waits the suite job to finish before returning the suite job id.
|
|
Also it will echo the run_suite output to stdout.
|
|
|
|
@param suite_name: Name of a suite, e.g., dummy.
|
|
@param arguments: Arguments for run_suite command.
|
|
@param use_shard: If true, suite is scheduled for shard board.
|
|
@param create_and_return: If True, run_suite just creates the suite, print
|
|
the job id, then finish immediately.
|
|
|
|
@return: Suite job ID.
|
|
|
|
"""
|
|
if use_shard:
|
|
board = arguments.shard_board
|
|
build = arguments.shard_build
|
|
else:
|
|
board = arguments.board
|
|
build = arguments.build
|
|
|
|
# Remove cros-version label to force provision.
|
|
hosts = AFE.get_hosts(label=constants.Labels.BOARD_PREFIX+board,
|
|
locked=False)
|
|
for host in hosts:
|
|
labels_to_remove = [
|
|
l for l in host.labels
|
|
if l.startswith(provision.CROS_VERSION_PREFIX)]
|
|
if labels_to_remove:
|
|
AFE.run('host_remove_labels', id=host.id, labels=labels_to_remove)
|
|
|
|
# Test repair work flow on shards, powerwash test will timeout after 7m.
|
|
if use_shard and not create_and_return:
|
|
powerwash_dut_to_test_repair(host.hostname, timeout=420)
|
|
|
|
current_dir = os.path.dirname(os.path.realpath(__file__))
|
|
cmd = [os.path.join(current_dir, RUN_SUITE_COMMAND),
|
|
'-s', suite_name,
|
|
'-b', board,
|
|
'-i', build,
|
|
'-p', arguments.pool,
|
|
'--minimum_duts', str(arguments.num_duts[board])]
|
|
if create_and_return:
|
|
cmd += ['-c']
|
|
|
|
suite_job_id = None
|
|
|
|
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT)
|
|
|
|
while True:
|
|
line = proc.stdout.readline()
|
|
|
|
# Break when run_suite process completed.
|
|
if not line and proc.poll() != None:
|
|
break
|
|
print(line.rstrip())
|
|
_run_suite_output.append(line.rstrip())
|
|
|
|
if not suite_job_id:
|
|
m = re.match(SUITE_JOB_START_INFO_REGEX, line)
|
|
if m and m.group(1):
|
|
suite_job_id = int(m.group(1))
|
|
_all_suite_ids.append(suite_job_id)
|
|
|
|
if not suite_job_id:
|
|
raise TestPushException('Failed to retrieve suite job ID.')
|
|
|
|
# If create_and_return specified, wait for the suite to finish.
|
|
if create_and_return:
|
|
end = time.time() + arguments.timeout_min * 60
|
|
while not AFE.get_jobs(id=suite_job_id, finished=True):
|
|
if time.time() < end:
|
|
time.sleep(10)
|
|
else:
|
|
AFE.run('abort_host_queue_entries', job=suite_job_id)
|
|
raise TestPushException(
|
|
'Asynchronous suite triggered by create_and_return '
|
|
'flag has timed out after %d mins. Aborting it.' %
|
|
arguments.timeout_min)
|
|
|
|
print('Suite job %s is completed.' % suite_job_id)
|
|
return suite_job_id
|
|
|
|
|
|
def check_dut_image(build, suite_job_id):
|
|
"""Confirm all DUTs used for the suite are imaged to expected build.
|
|
|
|
@param build: Expected build to be imaged.
|
|
@param suite_job_id: job ID of the suite job.
|
|
@raise TestPushException: If a DUT does not have expected build imaged.
|
|
"""
|
|
print('Checking image installed in DUTs...')
|
|
job_ids = [job.id for job in
|
|
models.Job.objects.filter(parent_job_id=suite_job_id)]
|
|
hqes = [models.HostQueueEntry.objects.filter(job_id=job_id)[0]
|
|
for job_id in job_ids]
|
|
hostnames = set([hqe.host.hostname for hqe in hqes])
|
|
for hostname in hostnames:
|
|
found_build = site_utils.get_build_from_afe(hostname, AFE)
|
|
if found_build != build:
|
|
raise TestPushException('DUT is not imaged properly. Host %s has '
|
|
'build %s, while build %s is expected.' %
|
|
(hostname, found_build, build))
|
|
|
|
|
|
def test_suite(suite_name, expected_results, arguments, use_shard=False,
|
|
create_and_return=False):
|
|
"""Call run_suite to start a suite job and verify results.
|
|
|
|
@param suite_name: Name of a suite, e.g., dummy
|
|
@param expected_results: A dictionary of test name to test result.
|
|
@param arguments: Arguments for run_suite command.
|
|
@param use_shard: If true, suite is scheduled for shard board.
|
|
@param create_and_return: If True, run_suite just creates the suite, print
|
|
the job id, then finish immediately.
|
|
"""
|
|
suite_job_id = do_run_suite(suite_name, arguments, use_shard,
|
|
create_and_return)
|
|
|
|
# Confirm all DUTs used for the suite are imaged to expected build.
|
|
# hqe.host_id for jobs running in shard is not synced back to master db,
|
|
# therefore, skip verifying dut build for jobs running in shard.
|
|
build_expected = arguments.build
|
|
if not use_shard:
|
|
check_dut_image(build_expected, suite_job_id)
|
|
|
|
# Verify test results are the expected results.
|
|
verify_test_results(suite_job_id, expected_results)
|
|
|
|
|
|
def verify_test_results(job_id, expected_results):
|
|
"""Verify the test results with the expected results.
|
|
|
|
@param job_id: id of the running jobs. For suite job, it is suite_job_id.
|
|
@param expected_results: A dictionary of test name to test result.
|
|
@raise TestPushException: If verify fails.
|
|
"""
|
|
print('Comparing test results...')
|
|
test_views = site_utils.get_test_views_from_tko(job_id, TKO)
|
|
summary = test_push_common.summarize_push(test_views, expected_results,
|
|
_IGNORED_TESTS)
|
|
|
|
# Test link to log can be loaded.
|
|
job_name = '%s-%s' % (job_id, getpass.getuser())
|
|
log_link = URL_PATTERN % (rpc_client_lib.add_protocol(URL_HOST), job_name)
|
|
try:
|
|
urllib.request.urlopen(log_link).read()
|
|
except urllib.error.URLError:
|
|
summary.append('Failed to load page for link to log: %s.' % log_link)
|
|
|
|
if summary:
|
|
raise TestPushException('\n'.join(summary))
|
|
|
|
def test_suite_wrapper(queue, suite_name, expected_results, arguments,
|
|
use_shard=False, create_and_return=False):
|
|
"""Wrapper to call test_suite. Handle exception and pipe it to parent
|
|
process.
|
|
|
|
@param queue: Queue to save exception to be accessed by parent process.
|
|
@param suite_name: Name of a suite, e.g., dummy
|
|
@param expected_results: A dictionary of test name to test result.
|
|
@param arguments: Arguments for run_suite command.
|
|
@param use_shard: If true, suite is scheduled for shard board.
|
|
@param create_and_return: If True, run_suite just creates the suite, print
|
|
the job id, then finish immediately.
|
|
"""
|
|
try:
|
|
test_suite(suite_name, expected_results, arguments, use_shard,
|
|
create_and_return)
|
|
except Exception:
|
|
# Store the whole exc_info leads to a PicklingError.
|
|
except_type, except_value, tb = sys.exc_info()
|
|
queue.put((except_type, except_value, traceback.extract_tb(tb)))
|
|
|
|
|
|
def check_queue(queue):
|
|
"""Check the queue for any exception being raised.
|
|
|
|
@param queue: Queue used to store exception for parent process to access.
|
|
@raise: Any exception found in the queue.
|
|
"""
|
|
if queue.empty():
|
|
return
|
|
exc_info = queue.get()
|
|
# Raise the exception with original backtrace.
|
|
print('Original stack trace of the exception:\n%s' % exc_info[2])
|
|
raise exc_info[0](exc_info[1])
|
|
|
|
|
|
def _run_test_suites(arguments):
|
|
"""Run the actual tests that comprise the test_push."""
|
|
# Use daemon flag will kill child processes when parent process fails.
|
|
use_daemon = not arguments.continue_on_failure
|
|
queue = multiprocessing.Queue()
|
|
|
|
push_to_prod_suite = multiprocessing.Process(
|
|
target=test_suite_wrapper,
|
|
args=(queue, PUSH_TO_PROD_SUITE,
|
|
test_push_common.EXPECTED_TEST_RESULTS, arguments))
|
|
push_to_prod_suite.daemon = use_daemon
|
|
push_to_prod_suite.start()
|
|
|
|
# suite test with --create_and_return flag
|
|
asynchronous_suite = multiprocessing.Process(
|
|
target=test_suite_wrapper,
|
|
args=(queue, DUMMY_SUITE,
|
|
test_push_common.EXPECTED_TEST_RESULTS_DUMMY,
|
|
arguments, True, True))
|
|
asynchronous_suite.daemon = True
|
|
asynchronous_suite.start()
|
|
|
|
while push_to_prod_suite.is_alive() or asynchronous_suite.is_alive():
|
|
check_queue(queue)
|
|
time.sleep(5)
|
|
check_queue(queue)
|
|
push_to_prod_suite.join()
|
|
asynchronous_suite.join()
|
|
|
|
|
|
def check_service_crash(respawn_limit, start_time):
|
|
"""Check whether scheduler or host_scheduler crash during testing.
|
|
|
|
Since the testing push is kicked off at the beginning of a given hour, the way
|
|
to check whether a service is crashed is to check whether the times of the
|
|
service being respawn during testing push is over the respawn_limit.
|
|
|
|
@param respawn_limit: The maximum number of times the service is allowed to
|
|
be respawn.
|
|
@param start_time: The time that testing push is kicked off.
|
|
"""
|
|
def _parse(filename_prefix, filename):
|
|
"""Helper method to parse the time of the log.
|
|
|
|
@param filename_prefix: The prefix of the filename.
|
|
@param filename: The name of the log file.
|
|
"""
|
|
return datetime.datetime.strptime(filename[len(filename_prefix):],
|
|
"%Y-%m-%d-%H.%M.%S")
|
|
|
|
services = ['scheduler', 'host_scheduler']
|
|
logs = os.listdir('%s/logs/' % AUTOTEST_DIR)
|
|
curr_time = datetime.datetime.now()
|
|
|
|
error_msg = ''
|
|
for service in services:
|
|
log_prefix = '%s.log.' % service
|
|
respawn_count = sum(1 for l in logs if l.startswith(log_prefix)
|
|
and start_time <= _parse(log_prefix, l) <= curr_time)
|
|
|
|
if respawn_count > respawn_limit:
|
|
error_msg += ('%s has been respawned %s times during testing push at %s. '
|
|
'It is very likely crashed. Please check!\n' %
|
|
(service, respawn_count,
|
|
start_time.strftime("%Y-%m-%d-%H")))
|
|
if error_msg:
|
|
raise TestPushException(error_msg)
|
|
|
|
|
|
_SUCCESS_MSG = """
|
|
All staging tests completed successfully.
|
|
|
|
Instructions for pushing to prod are available at
|
|
https://goto.google.com/autotest-to-prod
|
|
"""
|
|
|
|
|
|
def _main(arguments):
|
|
"""Run test and promote repo branches if tests succeed.
|
|
|
|
@param arguments: command line arguments.
|
|
"""
|
|
|
|
# TODO Use chromite.lib.parallel.Manager instead, to workaround the
|
|
# too-long-tmp-path problem.
|
|
mpmanager = multiprocessing.Manager()
|
|
# These are globals used by other functions in this module to communicate
|
|
# back from worker processes.
|
|
global _run_suite_output
|
|
_run_suite_output = mpmanager.list()
|
|
global _all_suite_ids
|
|
_all_suite_ids = mpmanager.list()
|
|
|
|
try:
|
|
start_time = datetime.datetime.now()
|
|
reverify_all_push_duts()
|
|
time.sleep(15) # Wait for the verify test to start.
|
|
check_dut_inventory(arguments.num_duts, arguments.pool)
|
|
_run_test_suites(arguments)
|
|
check_service_crash(arguments.service_respawn_limit, start_time)
|
|
print(_SUCCESS_MSG)
|
|
except Exception:
|
|
# Abort running jobs unless flagged to continue when there is a failure.
|
|
if not arguments.continue_on_failure:
|
|
for suite_id in _all_suite_ids:
|
|
if AFE.get_jobs(id=suite_id, finished=False):
|
|
AFE.run('abort_host_queue_entries', job=suite_id)
|
|
raise
|
|
|
|
|
|
def main():
|
|
"""Entry point."""
|
|
arguments = parse_arguments(sys.argv)
|
|
_main(arguments)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|