254 lines
8.0 KiB
Python
254 lines
8.0 KiB
Python
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import logging
|
|
import time
|
|
|
|
from autotest_lib.client.bin import utils
|
|
from autotest_lib.client.common_lib import error
|
|
from autotest_lib.server import test
|
|
|
|
_MODEM_WAIT_DELAY = 120
|
|
|
|
NO_MODEM_STATE_AVAILABLE = 'FAILED TO GET MODEM STATE'
|
|
|
|
# TODO(harpreet / benchan): Modify the modem script to report modem health.
|
|
# crbug.com/352351
|
|
MM_MODEM_STATE_FAILED = '-1'
|
|
MM_MODEM_STATE_UNKNOWN = '0'
|
|
MM_MODEM_STATE_INITIALIZING = '1'
|
|
MM_MODEM_STATE_LOCKED = '2'
|
|
MM_MODEM_STATE_DISABLED = '3'
|
|
MM_MODEM_STATE_DISABLING = '4'
|
|
MM_MODEM_STATE_ENABLING = '5'
|
|
MM_MODEM_STATE_ENABLED = '6'
|
|
MM_MODEM_STATE_SEARCHING = '7'
|
|
MM_MODEM_STATE_REGISTERED = '8'
|
|
MM_MODEM_STATE_DISCONNECTING = '9'
|
|
MM_MODEM_STATE_CONNECTING = '10'
|
|
MM_MODEM_STATE_CONNECTED = '11'
|
|
|
|
GOBI_MODEM_STATE_UNKNOWN = '0'
|
|
GOBI_MODEM_STATE_DISABLED = '10'
|
|
GOBI_MODEM_STATE_DISABLING = '20'
|
|
GOBI_MODEM_STATE_ENABLING = '30'
|
|
GOBI_MODEM_STATE_ENABLED = '40'
|
|
GOBI_MODEM_STATE_SEARCHING = '50'
|
|
GOBI_MODEM_STATE_REGISTERED = '60'
|
|
GOBI_MODEM_STATE_DISCONNECTING = '70'
|
|
GOBI_MODEM_STATE_CONNECTING = '80'
|
|
GOBI_MODEM_STATE_CONNECTED = '90'
|
|
|
|
ENABLED_MODEM_STATES = [
|
|
MM_MODEM_STATE_ENABLED,
|
|
GOBI_MODEM_STATE_ENABLED
|
|
]
|
|
|
|
MM_STABLE_MODEM_STATES = [
|
|
MM_MODEM_STATE_DISABLED,
|
|
MM_MODEM_STATE_REGISTERED,
|
|
MM_MODEM_STATE_CONNECTED,
|
|
]
|
|
|
|
GOBI_STABLE_MODEM_STATES = [
|
|
GOBI_MODEM_STATE_DISABLED,
|
|
GOBI_MODEM_STATE_REGISTERED,
|
|
GOBI_MODEM_STATE_CONNECTED
|
|
]
|
|
|
|
class cellular_StaleModemReboot(test.test):
|
|
"""
|
|
Uses servo to cold reboot the device if modem is not available or is not in
|
|
testable state.
|
|
|
|
The test attempts to get modem status by running the 'modem status' command
|
|
on the DUT. If it is unsuccessful in getting the modem status or the modem
|
|
is in a bad state, it will try to reboot the DUT.
|
|
|
|
"""
|
|
|
|
version = 1
|
|
|
|
def _modem_state_to_string(self, state, is_gobi):
|
|
"""Takes the numerical modem state and returns associated state name.
|
|
|
|
@param state: The state of the modem on the device.
|
|
@param is_gobi: True if the device has a gobi modem.
|
|
@return MODEM_STATE_STRINGS as the actual name of the state associated
|
|
with the numeric value.
|
|
|
|
"""
|
|
if not state:
|
|
return NO_MODEM_STATE_AVAILABLE
|
|
|
|
if is_gobi:
|
|
MODEM_STATE_STRINGS = [
|
|
'UNKNOWN',
|
|
'DISABLED',
|
|
'DISABLING',
|
|
'ENABLING',
|
|
'ENABLED',
|
|
'SEARCHING',
|
|
'REGISTERED',
|
|
'DISCONNECTING',
|
|
'CONNECTING',
|
|
'CONNECTED'
|
|
]
|
|
return MODEM_STATE_STRINGS[int(state[:1])]
|
|
|
|
MODEM_STATE_STRINGS = [
|
|
'FAILED',
|
|
'UNKNOWN',
|
|
'INITIALIZING',
|
|
'LOCKED',
|
|
'DISABLED',
|
|
'DISABLING',
|
|
'ENABLING',
|
|
'ENABLED',
|
|
'SEARCHING',
|
|
'REGISTERED',
|
|
'DISCONNECTING',
|
|
'CONNECTING',
|
|
'CONNECTED'
|
|
]
|
|
return MODEM_STATE_STRINGS[int(state) + 1]
|
|
|
|
|
|
def _format_modem_status(self, modem_status):
|
|
"""Formats the modem status data and inserts it into a dictionary.
|
|
|
|
@param modem_status: Command line output of 'modem status'.
|
|
@return modem status dictionary
|
|
|
|
"""
|
|
|
|
modem_state = ''
|
|
modem_status_dict = {}
|
|
|
|
if not modem_status:
|
|
return None
|
|
|
|
lines = modem_status.splitlines()
|
|
|
|
for item in lines:
|
|
columns = item.split(':')
|
|
columns = [x.strip() for x in columns]
|
|
if len(columns) > 1:
|
|
modem_status_dict[columns[0]] = columns[1]
|
|
else:
|
|
modem_status_dict[columns[0]] = ''
|
|
|
|
return modem_status_dict
|
|
|
|
|
|
def _get_modem_status(self):
|
|
"""Gets the status of the modem by running 'modem status' command.
|
|
|
|
@return modem_status_dict: is the dictionary of all the lines retuned
|
|
by modem status command.
|
|
|
|
"""
|
|
try:
|
|
modem_status = self._client.run('modem status').stdout.strip()
|
|
modem_status_dict = self._format_modem_status(modem_status)
|
|
return modem_status_dict
|
|
except error.AutoservRunError as e:
|
|
logging.debug("AutoservRunError is: %s", e)
|
|
return None
|
|
|
|
|
|
def _get_modem_state(self):
|
|
modem_status_dict = self._get_modem_status()
|
|
|
|
if not modem_status_dict:
|
|
return None
|
|
|
|
return modem_status_dict.get('State')
|
|
|
|
|
|
def _cold_reset_dut(self, boot_id):
|
|
self._client.power_off_via_servo()
|
|
self._client.power_on_via_servo()
|
|
time.sleep(self._servo.BOOT_DELAY)
|
|
self._client.wait_for_restart(old_boot_id=boot_id)
|
|
self._wait_for_stable_modem_state()
|
|
|
|
|
|
def _wait_for_stable_modem_state(self):
|
|
"""
|
|
Wait for a maximum of _MODEM_WAIT_DELAY seconds for the modem to get
|
|
into stable state. Also, because we do not want the test to terminate
|
|
in case there is an exception, we are catching and logging the exception
|
|
so the test can continue with rebooting the device again as needed.
|
|
|
|
"""
|
|
try:
|
|
utils.poll_for_condition(
|
|
lambda: self._get_modem_state() in self.STABLE_MODEM_STATES,
|
|
exception=utils.TimeoutError('Modem not in stable state '
|
|
'after %s seconds.' %
|
|
_MODEM_WAIT_DELAY),
|
|
timeout=_MODEM_WAIT_DELAY,
|
|
sleep_interval=5)
|
|
except utils.TimeoutError as e:
|
|
logging.debug("Stable modem state TimeoutError is: %s", e)
|
|
|
|
|
|
def run_once(self, host, tries=2, expect_auto_registration=True):
|
|
"""
|
|
Runs the test.
|
|
|
|
@param host: A host object representing the DUT.
|
|
@param tries: Maximum number of times test will try to reboot the DUT.
|
|
Default number of tries is 2, which is set in the control file.
|
|
@param expect_auto_registration: To be used with an exceptional modem
|
|
that does not auto-register by passing False from modem specific
|
|
control file.
|
|
@raise error.TestFail if modem cannot be brought to a testable stated.
|
|
|
|
"""
|
|
|
|
self._client = host
|
|
self._servo = host.servo
|
|
|
|
if not self._servo:
|
|
logging.info('Host %s does not have a servo.', host.hostname)
|
|
return
|
|
|
|
self.STABLE_MODEM_STATES = MM_STABLE_MODEM_STATES
|
|
gobi = False
|
|
if 'gobi' in self._client.run('modem status').stdout.strip().lower():
|
|
self.STABLE_MODEM_STATES = GOBI_STABLE_MODEM_STATES
|
|
gobi = True
|
|
|
|
if not expect_auto_registration:
|
|
self.STABLE_MODEM_STATES.extend(ENABLED_MODEM_STATES)
|
|
|
|
original_modem_state = self._get_modem_state()
|
|
|
|
logging.info('Modem state before reboot on host %s: %s',
|
|
host.hostname,
|
|
self._modem_state_to_string(original_modem_state, gobi))
|
|
|
|
boot_id = self._client.get_boot_id()
|
|
|
|
num_tries = 0
|
|
|
|
while True:
|
|
self._cold_reset_dut(boot_id)
|
|
new_modem_state = self._get_modem_state()
|
|
if new_modem_state in self.STABLE_MODEM_STATES:
|
|
logging.info('Modem is in testable state: %s',
|
|
self._modem_state_to_string(new_modem_state, gobi))
|
|
break
|
|
if new_modem_state == MM_MODEM_STATE_LOCKED:
|
|
raise error.TestFail('Modem in locked state.')
|
|
if num_tries == tries:
|
|
logging.info('Modem still in bad state after %s reboot tries '
|
|
'on host %s. Modem state: %s ',
|
|
tries+1, host.hostname,
|
|
self._modem_state_to_string(new_modem_state, gobi))
|
|
raise error.TestFail('Modem is not in testable state')
|
|
num_tries += 1
|