1431 lines
54 KiB
Python
Executable File
1431 lines
54 KiB
Python
Executable File
#!/usr/bin/python2
|
|
|
|
# pylint: disable=missing-docstring
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
import logging
|
|
import os
|
|
import shutil
|
|
from six.moves import range
|
|
import stat
|
|
import tempfile
|
|
import unittest
|
|
|
|
import common
|
|
from autotest_lib.client.common_lib import base_job, error
|
|
|
|
|
|
class stub_job_directory(object):
|
|
"""
|
|
Stub job_directory class, for replacing the job._job_directory factory.
|
|
Just creates a job_directory object without any of the actual directory
|
|
checks. When given None it creates a temporary name (but not an actual
|
|
temporary directory).
|
|
"""
|
|
def __init__(self, path, is_writable=False):
|
|
# path=None and is_writable=False is always an error
|
|
assert path or is_writable
|
|
|
|
if path is None and is_writable:
|
|
self.path = tempfile.mktemp()
|
|
else:
|
|
self.path = path
|
|
|
|
|
|
class stub_job_state(base_job.job_state):
|
|
"""
|
|
Stub job state class, for replacing the job._job_state factory.
|
|
Doesn't actually provide any persistence, just the state handling.
|
|
"""
|
|
def __init__(self):
|
|
self._state = {}
|
|
self._backing_file_lock = None
|
|
|
|
def read_from_file(self, file_path):
|
|
pass
|
|
|
|
def write_to_file(self, file_path):
|
|
pass
|
|
|
|
def set_backing_file(self, file_path):
|
|
pass
|
|
|
|
def _read_from_backing_file(self):
|
|
pass
|
|
|
|
def _write_to_backing_file(self):
|
|
pass
|
|
|
|
def _lock_backing_file(self):
|
|
pass
|
|
|
|
def _unlock_backing_file(self):
|
|
pass
|
|
|
|
|
|
class test_init(unittest.TestCase):
|
|
class generic_tests(object):
|
|
"""
|
|
Generic tests for any implementation of __init__.
|
|
|
|
Expectations:
|
|
A self.job attribute where self.job is a __new__'ed instance of
|
|
the job class to be tested, but not yet __init__'ed.
|
|
|
|
A self.call_init method that will config the appropriate mocks
|
|
and then call job.__init__. It should undo any mocks it created
|
|
afterwards.
|
|
"""
|
|
|
|
PUBLIC_ATTRIBUTES = set([
|
|
# standard directories
|
|
'autodir', 'clientdir', 'serverdir', 'resultdir', 'pkgdir',
|
|
'tmpdir', 'testdir', 'site_testdir', 'bindir',
|
|
'profdir', 'toolsdir',
|
|
|
|
# other special attributes
|
|
'args', 'automatic_test_tag', 'control',
|
|
'default_profile_only', 'drop_caches',
|
|
'drop_caches_between_iterations', 'harness', 'hosts',
|
|
'logging', 'machines', 'num_tests_failed', 'num_tests_run',
|
|
'pkgmgr', 'profilers', 'resultdir', 'run_test_cleanup',
|
|
'sysinfo', 'tag', 'user', 'use_sequence_number',
|
|
'warning_loggers', 'warning_manager', 'label',
|
|
'parent_job_id', 'in_lab', 'machine_dict_list',
|
|
'max_result_size_KB', 'fast'
|
|
])
|
|
|
|
OPTIONAL_ATTRIBUTES = set([
|
|
'serverdir',
|
|
|
|
'automatic_test_tag', 'control', 'harness', 'num_tests_run',
|
|
'num_tests_failed', 'tag', 'warning_manager', 'warning_loggers',
|
|
'label', 'parent_job_id', 'max_result_size_KB', 'fast'
|
|
])
|
|
|
|
OPTIONAL_ATTRIBUTES_DEVICE_ERROR = set(['failed_with_device_error'])
|
|
|
|
def test_public_attributes_initialized(self):
|
|
# only the known public attributes should be there after __init__
|
|
self.call_init()
|
|
public_attributes = set(attr for attr in dir(self.job)
|
|
if not attr.startswith('_')
|
|
and not callable(getattr(self.job, attr)))
|
|
expected_attributes = self.PUBLIC_ATTRIBUTES
|
|
missing_attributes = expected_attributes - public_attributes
|
|
self.assertEqual(missing_attributes, set([]),
|
|
'Missing attributes: %s' %
|
|
', '.join(sorted(missing_attributes)))
|
|
extra_attributes = (public_attributes - expected_attributes -
|
|
self.OPTIONAL_ATTRIBUTES_DEVICE_ERROR)
|
|
self.assertEqual(extra_attributes, set([]),
|
|
'Extra public attributes found: %s' %
|
|
', '.join(sorted(extra_attributes)))
|
|
|
|
|
|
def test_required_attributes_not_none(self):
|
|
required_attributes = (self.PUBLIC_ATTRIBUTES -
|
|
self.OPTIONAL_ATTRIBUTES)
|
|
self.call_init()
|
|
for attribute in required_attributes:
|
|
self.assertNotEqual(getattr(self.job, attribute, None), None,
|
|
'job.%s is None but is not optional'
|
|
% attribute)
|
|
|
|
|
|
class test_find_base_directories(unittest.TestCase):
|
|
class generic_tests(object):
|
|
"""
|
|
Generic tests for any implementation of _find_base_directories.
|
|
|
|
Expectations:
|
|
A self.job attribute where self.job is an instance of the job
|
|
class to be tested.
|
|
"""
|
|
def test_autodir_is_not_none(self):
|
|
auto, client, server = self.job._find_base_directories()
|
|
self.assertNotEqual(auto, None)
|
|
|
|
|
|
def test_clientdir_is_not_none(self):
|
|
auto, client, server = self.job._find_base_directories()
|
|
self.assertNotEqual(client, None)
|
|
|
|
|
|
class test_initialize_dir_properties(unittest.TestCase):
|
|
def make_job(self, autodir, server):
|
|
job = base_job.base_job.__new__(base_job.base_job)
|
|
job._job_directory = stub_job_directory
|
|
job._autodir = stub_job_directory(autodir)
|
|
if server:
|
|
job._clientdir = stub_job_directory(
|
|
os.path.join(autodir, 'client'))
|
|
job._serverdir = stub_job_directory(
|
|
os.path.join(autodir, 'server'))
|
|
else:
|
|
job._clientdir = stub_job_directory(job.autodir)
|
|
job._serverdir = None
|
|
return job
|
|
|
|
|
|
def setUp(self):
|
|
self.cjob = self.make_job('/atest/client', False)
|
|
self.sjob = self.make_job('/atest', True)
|
|
|
|
|
|
def test_always_client_dirs(self):
|
|
self.cjob._initialize_dir_properties()
|
|
self.sjob._initialize_dir_properties()
|
|
|
|
# check all the always-client dir properties
|
|
self.assertEqual(self.cjob.bindir, self.sjob.bindir)
|
|
self.assertEqual(self.cjob.profdir, self.sjob.profdir)
|
|
self.assertEqual(self.cjob.pkgdir, self.sjob.pkgdir)
|
|
|
|
|
|
def test_dynamic_dirs(self):
|
|
self.cjob._initialize_dir_properties()
|
|
self.sjob._initialize_dir_properties()
|
|
|
|
# check all the context-specifc dir properties
|
|
self.assert_(self.cjob.tmpdir.startswith('/atest/client'))
|
|
self.assert_(self.cjob.testdir.startswith('/atest/client'))
|
|
self.assert_(self.cjob.site_testdir.startswith('/atest/client'))
|
|
self.assertEqual(self.sjob.tmpdir, tempfile.gettempdir())
|
|
self.assert_(self.sjob.testdir.startswith('/atest/server'))
|
|
self.assert_(self.sjob.site_testdir.startswith('/atest/server'))
|
|
|
|
|
|
class test_execution_context(unittest.TestCase):
|
|
def setUp(self):
|
|
clientdir = os.path.abspath(os.path.join(__file__, '..', '..'))
|
|
self.resultdir = tempfile.mkdtemp(suffix='unittest')
|
|
self.job = base_job.base_job.__new__(base_job.base_job)
|
|
self.job._find_base_directories = lambda: (clientdir, clientdir, None)
|
|
self.job._find_resultdir = lambda *_: self.resultdir
|
|
self.job.__init__()
|
|
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.resultdir, ignore_errors=True)
|
|
|
|
|
|
def test_pop_fails_without_push(self):
|
|
self.assertRaises(IndexError, self.job.pop_execution_context)
|
|
|
|
|
|
def test_push_changes_to_subdir(self):
|
|
sub1 = os.path.join(self.resultdir, 'sub1')
|
|
os.mkdir(sub1)
|
|
self.job.push_execution_context('sub1')
|
|
self.assertEqual(self.job.resultdir, sub1)
|
|
|
|
|
|
def test_push_creates_subdir(self):
|
|
sub2 = os.path.join(self.resultdir, 'sub2')
|
|
self.job.push_execution_context('sub2')
|
|
self.assertEqual(self.job.resultdir, sub2)
|
|
self.assert_(os.path.exists(sub2))
|
|
|
|
|
|
def test_push_handles_absolute_paths(self):
|
|
otherresults = tempfile.mkdtemp(suffix='unittest')
|
|
try:
|
|
self.job.push_execution_context(otherresults)
|
|
self.assertEqual(self.job.resultdir, otherresults)
|
|
finally:
|
|
shutil.rmtree(otherresults, ignore_errors=True)
|
|
|
|
|
|
def test_pop_restores_context(self):
|
|
sub3 = os.path.join(self.resultdir, 'sub3')
|
|
self.job.push_execution_context('sub3')
|
|
self.assertEqual(self.job.resultdir, sub3)
|
|
self.job.pop_execution_context()
|
|
self.assertEqual(self.job.resultdir, self.resultdir)
|
|
|
|
|
|
def test_push_and_pop_are_fifo(self):
|
|
sub4 = os.path.join(self.resultdir, 'sub4')
|
|
subsub = os.path.join(sub4, 'subsub')
|
|
self.job.push_execution_context('sub4')
|
|
self.assertEqual(self.job.resultdir, sub4)
|
|
self.job.push_execution_context('subsub')
|
|
self.assertEqual(self.job.resultdir, subsub)
|
|
self.job.pop_execution_context()
|
|
self.assertEqual(self.job.resultdir, sub4)
|
|
self.job.pop_execution_context()
|
|
self.assertEqual(self.job.resultdir, self.resultdir)
|
|
|
|
|
|
class test_job_directory(unittest.TestCase):
|
|
def setUp(self):
|
|
self.testdir = tempfile.mkdtemp(suffix='unittest')
|
|
self.original_wd = os.getcwd()
|
|
os.chdir(self.testdir)
|
|
|
|
|
|
def tearDown(self):
|
|
os.chdir(self.original_wd)
|
|
shutil.rmtree(self.testdir, ignore_errors=True)
|
|
|
|
|
|
def test_passes_if_dir_exists(self):
|
|
os.mkdir('testing')
|
|
self.assert_(os.path.isdir('testing'))
|
|
jd = base_job.job_directory('testing')
|
|
self.assert_(os.path.isdir('testing'))
|
|
|
|
|
|
def test_fails_if_not_writable_and_dir_doesnt_exist(self):
|
|
self.assert_(not os.path.isdir('testing2'))
|
|
self.assertRaises(base_job.job_directory.MissingDirectoryException,
|
|
base_job.job_directory, 'testing2')
|
|
|
|
|
|
def test_fails_if_file_already_exists(self):
|
|
open('testing3', 'w').close()
|
|
self.assert_(os.path.isfile('testing3'))
|
|
self.assertRaises(base_job.job_directory.MissingDirectoryException,
|
|
base_job.job_directory, 'testing3')
|
|
|
|
|
|
def test_passes_if_writable_and_dir_exists(self):
|
|
os.mkdir('testing4')
|
|
self.assert_(os.path.isdir('testing4'))
|
|
jd = base_job.job_directory('testing4', True)
|
|
self.assert_(os.path.isdir('testing4'))
|
|
|
|
|
|
def test_creates_dir_if_writable_and_dir_doesnt_exist(self):
|
|
self.assert_(not os.path.isdir('testing5'))
|
|
jd = base_job.job_directory('testing5', True)
|
|
self.assert_(os.path.isdir('testing5'))
|
|
|
|
|
|
def test_recursive_creates_dir_if_writable_and_dir_doesnt_exist(self):
|
|
self.assert_(not os.path.isdir('testing6'))
|
|
base_job.job_directory('testing6/subdir', True)
|
|
self.assert_(os.path.isdir('testing6/subdir'))
|
|
|
|
|
|
def test_fails_if_writable_and_file_exists(self):
|
|
open('testing7', 'w').close()
|
|
self.assert_(os.path.isfile('testing7'))
|
|
self.assert_(not os.path.isdir('testing7'))
|
|
self.assertRaises(base_job.job_directory.UncreatableDirectoryException,
|
|
base_job.job_directory, 'testing7', True)
|
|
|
|
|
|
def test_fails_if_writable_and_no_permission_to_create(self):
|
|
os.mkdir('testing8', 0o555)
|
|
self.assert_(os.path.isdir('testing8'))
|
|
self.assertRaises(base_job.job_directory.UncreatableDirectoryException,
|
|
base_job.job_directory, 'testing8/subdir', True)
|
|
|
|
|
|
def test_passes_if_not_is_writable_and_dir_not_writable(self):
|
|
os.mkdir('testing9', 0o555)
|
|
self.assert_(os.path.isdir('testing9'))
|
|
self.assert_(not os.access('testing9', os.W_OK))
|
|
jd = base_job.job_directory('testing9')
|
|
|
|
|
|
def test_fails_if_is_writable_but_dir_not_writable(self):
|
|
os.mkdir('testing10', 0o555)
|
|
self.assert_(os.path.isdir('testing10'))
|
|
self.assert_(not os.access('testing10', os.W_OK))
|
|
self.assertRaises(base_job.job_directory.UnwritableDirectoryException,
|
|
base_job.job_directory, 'testing10', True)
|
|
|
|
|
|
def test_fails_if_no_path_and_not_writable(self):
|
|
self.assertRaises(base_job.job_directory.MissingDirectoryException,
|
|
base_job.job_directory, None)
|
|
|
|
|
|
def test_no_path_and_and_not_writable_creates_tempdir(self):
|
|
jd = base_job.job_directory(None, True)
|
|
self.assert_(os.path.isdir(jd.path))
|
|
self.assert_(os.access(jd.path, os.W_OK))
|
|
temp_path = jd.path
|
|
del jd
|
|
self.assert_(not os.path.isdir(temp_path))
|
|
|
|
|
|
class test_job_state(unittest.TestCase):
|
|
def setUp(self):
|
|
self.state = base_job.job_state()
|
|
|
|
|
|
def test_undefined_name_returns_key_error(self):
|
|
self.assertRaises(KeyError, self.state.get, 'ns1', 'missing_name')
|
|
|
|
|
|
def test_undefined_name_returns_default(self):
|
|
self.assertEqual(42, self.state.get('ns2', 'missing_name', default=42))
|
|
|
|
|
|
def test_none_is_valid_default(self):
|
|
self.assertEqual(None, self.state.get('ns3', 'missing_name',
|
|
default=None))
|
|
|
|
|
|
def test_get_returns_set_values(self):
|
|
self.state.set('ns4', 'name1', 50)
|
|
self.assertEqual(50, self.state.get('ns4', 'name1'))
|
|
|
|
|
|
def test_get_ignores_default_when_value_is_defined(self):
|
|
self.state.set('ns5', 'name2', 55)
|
|
self.assertEqual(55, self.state.get('ns5', 'name2', default=45))
|
|
|
|
|
|
def test_set_only_sets_one_value(self):
|
|
self.state.set('ns6', 'name3', 50)
|
|
self.assertEqual(50, self.state.get('ns6', 'name3'))
|
|
self.assertRaises(KeyError, self.state.get, 'ns6', 'name4')
|
|
|
|
|
|
def test_set_works_with_multiple_names(self):
|
|
self.state.set('ns7', 'name5', 60)
|
|
self.state.set('ns7', 'name6', 70)
|
|
self.assertEquals(60, self.state.get('ns7', 'name5'))
|
|
self.assertEquals(70, self.state.get('ns7', 'name6'))
|
|
|
|
|
|
def test_multiple_sets_override_each_other(self):
|
|
self.state.set('ns8', 'name7', 10)
|
|
self.state.set('ns8', 'name7', 25)
|
|
self.assertEquals(25, self.state.get('ns8', 'name7'))
|
|
|
|
|
|
def test_get_with_default_does_not_set(self):
|
|
self.assertEquals(100, self.state.get('ns9', 'name8', default=100))
|
|
self.assertRaises(KeyError, self.state.get, 'ns9', 'name8')
|
|
|
|
|
|
def test_set_in_one_namespace_ignores_other(self):
|
|
self.state.set('ns10', 'name9', 200)
|
|
self.assertEquals(200, self.state.get('ns10', 'name9'))
|
|
self.assertRaises(KeyError, self.state.get, 'ns11', 'name9')
|
|
|
|
|
|
def test_namespaces_do_not_collide(self):
|
|
self.state.set('ns12', 'name10', 250)
|
|
self.state.set('ns13', 'name10', -150)
|
|
self.assertEquals(-150, self.state.get('ns13', 'name10'))
|
|
self.assertEquals(250, self.state.get('ns12', 'name10'))
|
|
|
|
|
|
def test_discard_does_nothing_on_undefined_namespace(self):
|
|
self.state.discard('missing_ns', 'missing')
|
|
self.assertRaises(KeyError, self.state.get, 'missing_ns', 'missing')
|
|
|
|
|
|
def test_discard_does_nothing_on_missing_name(self):
|
|
self.state.set('ns14', 'name20', 111)
|
|
self.state.discard('ns14', 'missing')
|
|
self.assertEqual(111, self.state.get('ns14', 'name20'))
|
|
self.assertRaises(KeyError, self.state.get, 'ns14', 'missing')
|
|
|
|
|
|
def test_discard_deletes_name(self):
|
|
self.state.set('ns15', 'name21', 4567)
|
|
self.assertEqual(4567, self.state.get('ns15', 'name21'))
|
|
self.state.discard('ns15', 'name21')
|
|
self.assertRaises(KeyError, self.state.get, 'ns15', 'name21')
|
|
|
|
|
|
def test_discard_doesnt_touch_other_values(self):
|
|
self.state.set('ns16_1', 'name22', 'val1')
|
|
self.state.set('ns16_1', 'name23', 'val2')
|
|
self.state.set('ns16_2', 'name23', 'val3')
|
|
self.assertEqual('val1', self.state.get('ns16_1', 'name22'))
|
|
self.assertEqual('val3', self.state.get('ns16_2', 'name23'))
|
|
self.state.discard('ns16_1', 'name23')
|
|
self.assertEqual('val1', self.state.get('ns16_1', 'name22'))
|
|
self.assertEqual('val3', self.state.get('ns16_2', 'name23'))
|
|
|
|
|
|
def test_has_is_true_for_all_set_values(self):
|
|
self.state.set('ns17_1', 'name24', 1)
|
|
self.state.set('ns17_1', 'name25', 2)
|
|
self.state.set('ns17_2', 'name25', 3)
|
|
self.assert_(self.state.has('ns17_1', 'name24'))
|
|
self.assert_(self.state.has('ns17_1', 'name25'))
|
|
self.assert_(self.state.has('ns17_2', 'name25'))
|
|
|
|
|
|
def test_has_is_false_for_all_unset_values(self):
|
|
self.state.set('ns18_1', 'name26', 1)
|
|
self.state.set('ns18_1', 'name27', 2)
|
|
self.state.set('ns18_2', 'name27', 3)
|
|
self.assert_(not self.state.has('ns18_2', 'name26'))
|
|
|
|
|
|
def test_discard_namespace_drops_all_values(self):
|
|
self.state.set('ns19', 'var1', 10)
|
|
self.state.set('ns19', 'var3', 100)
|
|
self.state.discard_namespace('ns19')
|
|
self.assertRaises(KeyError, self.state.get, 'ns19', 'var1')
|
|
self.assertRaises(KeyError, self.state.get, 'ns19', 'var3')
|
|
|
|
|
|
def test_discard_namespace_works_on_missing_namespace(self):
|
|
self.state.discard_namespace('missing_ns')
|
|
|
|
|
|
def test_discard_namespace_doesnt_touch_other_values(self):
|
|
self.state.set('ns20', 'var1', 20)
|
|
self.state.set('ns20', 'var2', 200)
|
|
self.state.set('ns21', 'var2', 21)
|
|
self.state.discard_namespace('ns20')
|
|
self.assertEqual(21, self.state.get('ns21', 'var2'))
|
|
|
|
|
|
# run the same tests as test_job_state, but with a backing file turned on
|
|
# also adds some tests to check that each method is persistent
|
|
class test_job_state_with_backing_file(test_job_state):
|
|
def setUp(self):
|
|
self.backing_file = tempfile.mktemp()
|
|
self.state = base_job.job_state()
|
|
self.state.set_backing_file(self.backing_file)
|
|
|
|
|
|
def tearDown(self):
|
|
if os.path.exists(self.backing_file):
|
|
os.remove(self.backing_file)
|
|
|
|
|
|
def test_set_is_persistent(self):
|
|
self.state.set('persist', 'var', 'value')
|
|
written_state = base_job.job_state()
|
|
written_state.read_from_file(self.backing_file)
|
|
self.assertEqual('value', written_state.get('persist', 'var'))
|
|
|
|
|
|
def test_discard_is_persistent(self):
|
|
self.state.set('persist', 'var', 'value')
|
|
self.state.discard('persist', 'var')
|
|
written_state = base_job.job_state()
|
|
written_state.read_from_file(self.backing_file)
|
|
self.assertRaises(KeyError, written_state.get, 'persist', 'var')
|
|
|
|
|
|
def test_discard_namespace_is_persistent(self):
|
|
self.state.set('persist', 'var', 'value')
|
|
self.state.discard_namespace('persist')
|
|
written_state = base_job.job_state()
|
|
written_state.read_from_file(self.backing_file)
|
|
self.assertRaises(KeyError, written_state.get, 'persist', 'var')
|
|
|
|
|
|
class test_job_state_read_write_file(unittest.TestCase):
|
|
def setUp(self):
|
|
self.testdir = tempfile.mkdtemp(suffix='unittest')
|
|
self.original_wd = os.getcwd()
|
|
os.chdir(self.testdir)
|
|
|
|
|
|
def tearDown(self):
|
|
os.chdir(self.original_wd)
|
|
shutil.rmtree(self.testdir, ignore_errors=True)
|
|
|
|
|
|
def test_write_read_transfers_all_state(self):
|
|
state1 = base_job.job_state()
|
|
state1.set('ns1', 'var0', 50)
|
|
state1.set('ns2', 'var10', 100)
|
|
state1.write_to_file('transfer_file')
|
|
state2 = base_job.job_state()
|
|
self.assertRaises(KeyError, state2.get, 'ns1', 'var0')
|
|
self.assertRaises(KeyError, state2.get, 'ns2', 'var10')
|
|
state2.read_from_file('transfer_file')
|
|
self.assertEqual(50, state2.get('ns1', 'var0'))
|
|
self.assertEqual(100, state2.get('ns2', 'var10'))
|
|
|
|
|
|
def test_read_overwrites_in_memory(self):
|
|
state = base_job.job_state()
|
|
state.set('ns', 'myvar', 'hello')
|
|
state.write_to_file('backup')
|
|
state.set('ns', 'myvar', 'goodbye')
|
|
self.assertEqual('goodbye', state.get('ns', 'myvar'))
|
|
state.read_from_file('backup')
|
|
self.assertEqual('hello', state.get('ns', 'myvar'))
|
|
|
|
|
|
def test_read_updates_persistent_file(self):
|
|
state1 = base_job.job_state()
|
|
state1.set('ns', 'var1', 'value1')
|
|
state1.write_to_file('to_be_read')
|
|
state2 = base_job.job_state()
|
|
state2.set_backing_file('backing_file')
|
|
state2.set('ns', 'var2', 'value2')
|
|
state2.read_from_file('to_be_read')
|
|
state2.set_backing_file(None)
|
|
state3 = base_job.job_state()
|
|
state3.read_from_file('backing_file')
|
|
self.assertEqual('value1', state3.get('ns', 'var1'))
|
|
self.assertEqual('value2', state3.get('ns', 'var2'))
|
|
|
|
|
|
def test_read_without_merge(self):
|
|
state = base_job.job_state()
|
|
state.set('ns', 'myvar1', 'hello')
|
|
state.write_to_file('backup')
|
|
state.discard('ns', 'myvar1')
|
|
state.set('ns', 'myvar2', 'goodbye')
|
|
self.assertFalse(state.has('ns', 'myvar1'))
|
|
self.assertEqual('goodbye', state.get('ns', 'myvar2'))
|
|
state.read_from_file('backup', merge=False)
|
|
self.assertEqual('hello', state.get('ns', 'myvar1'))
|
|
self.assertFalse(state.has('ns', 'myvar2'))
|
|
|
|
|
|
class test_job_state_set_backing_file(unittest.TestCase):
|
|
def setUp(self):
|
|
self.testdir = tempfile.mkdtemp(suffix='unittest')
|
|
self.original_wd = os.getcwd()
|
|
os.chdir(self.testdir)
|
|
|
|
|
|
def tearDown(self):
|
|
os.chdir(self.original_wd)
|
|
shutil.rmtree(self.testdir, ignore_errors=True)
|
|
|
|
|
|
def test_writes_to_file(self):
|
|
state = base_job.job_state()
|
|
state.set_backing_file('outfile1')
|
|
self.assert_(os.path.exists('outfile1'))
|
|
|
|
|
|
def test_set_backing_file_updates_existing_file(self):
|
|
state1 = base_job.job_state()
|
|
state1.set_backing_file('second_file')
|
|
state1.set('ns0', 'var1x', 100)
|
|
state1.set_backing_file(None)
|
|
state2 = base_job.job_state()
|
|
state2.set_backing_file('first_file')
|
|
state2.set('ns0', 'var0x', 0)
|
|
state2.set_backing_file('second_file')
|
|
state2.set_backing_file(None)
|
|
state3 = base_job.job_state()
|
|
state3.read_from_file('second_file')
|
|
self.assertEqual(0, state3.get('ns0', 'var0x'))
|
|
self.assertEqual(100, state3.get('ns0', 'var1x'))
|
|
|
|
|
|
def test_set_backing_file_does_not_overwrite_previous_backing_file(self):
|
|
state1 = base_job.job_state()
|
|
state1.set_backing_file('second_file')
|
|
state1.set('ns0', 'var1y', 10)
|
|
state1.set_backing_file(None)
|
|
state2 = base_job.job_state()
|
|
state2.set_backing_file('first_file')
|
|
state2.set('ns0', 'var0y', -10)
|
|
state2.set_backing_file('second_file')
|
|
state2.set_backing_file(None)
|
|
state3 = base_job.job_state()
|
|
state3.read_from_file('first_file')
|
|
self.assertEqual(-10, state3.get('ns0', 'var0y'))
|
|
self.assertRaises(KeyError, state3.get, 'ns0', 'var1y')
|
|
|
|
|
|
def test_writes_stop_after_backing_file_removed(self):
|
|
state = base_job.job_state()
|
|
state.set('ns', 'var1', 'value1')
|
|
state.set_backing_file('outfile2')
|
|
state.set_backing_file(None)
|
|
os.remove('outfile2')
|
|
state.set('n2', 'var2', 'value2')
|
|
self.assert_(not os.path.exists('outfile2'))
|
|
|
|
|
|
def test_written_files_can_be_reloaded(self):
|
|
state1 = base_job.job_state()
|
|
state1.set_backing_file('outfile3')
|
|
state1.set('n3', 'var1', 67)
|
|
state1.set_backing_file(None)
|
|
state2 = base_job.job_state()
|
|
self.assertRaises(KeyError, state2.get, 'n3', 'var1')
|
|
state2.set_backing_file('outfile3')
|
|
self.assertEqual(67, state2.get('n3', 'var1'))
|
|
|
|
|
|
def test_backing_file_overrides_in_memory_values(self):
|
|
state1 = base_job.job_state()
|
|
state1.set_backing_file('outfile4')
|
|
state1.set('n4', 'var1', 42)
|
|
state1.set_backing_file(None)
|
|
state2 = base_job.job_state()
|
|
state2.set('n4', 'var1', 430)
|
|
self.assertEqual(430, state2.get('n4', 'var1'))
|
|
state2.set_backing_file('outfile4')
|
|
self.assertEqual(42, state2.get('n4', 'var1'))
|
|
|
|
|
|
def test_backing_file_only_overrides_values_it_defines(self):
|
|
state1 = base_job.job_state()
|
|
state1.set_backing_file('outfile5')
|
|
state1.set('n5', 'var1', 123)
|
|
state1.set_backing_file(None)
|
|
state2 = base_job.job_state()
|
|
state2.set('n5', 'var2', 456)
|
|
state2.set_backing_file('outfile5')
|
|
self.assertEqual(123, state2.get('n5', 'var1'))
|
|
self.assertEqual(456, state2.get('n5', 'var2'))
|
|
|
|
|
|
def test_shared_backing_file_propagates_state_to_get(self):
|
|
state1 = base_job.job_state()
|
|
state1.set_backing_file('outfile6')
|
|
state2 = base_job.job_state()
|
|
state2.set_backing_file('outfile6')
|
|
self.assertRaises(KeyError, state1.get, 'n6', 'shared1')
|
|
self.assertRaises(KeyError, state2.get, 'n6', 'shared1')
|
|
state1.set('n6', 'shared1', 345)
|
|
self.assertEqual(345, state1.get('n6', 'shared1'))
|
|
self.assertEqual(345, state2.get('n6', 'shared1'))
|
|
|
|
|
|
def test_shared_backing_file_propagates_state_to_has(self):
|
|
state1 = base_job.job_state()
|
|
state1.set_backing_file('outfile7')
|
|
state2 = base_job.job_state()
|
|
state2.set_backing_file('outfile7')
|
|
self.assertFalse(state1.has('n6', 'shared2'))
|
|
self.assertFalse(state2.has('n6', 'shared2'))
|
|
state1.set('n6', 'shared2', 'hello')
|
|
self.assertTrue(state1.has('n6', 'shared2'))
|
|
self.assertTrue(state2.has('n6', 'shared2'))
|
|
|
|
|
|
def test_shared_backing_file_propagates_state_from_discard(self):
|
|
state1 = base_job.job_state()
|
|
state1.set_backing_file('outfile8')
|
|
state1.set('n6', 'shared3', 10000)
|
|
state2 = base_job.job_state()
|
|
state2.set_backing_file('outfile8')
|
|
self.assertEqual(10000, state1.get('n6', 'shared3'))
|
|
self.assertEqual(10000, state2.get('n6', 'shared3'))
|
|
state1.discard('n6', 'shared3')
|
|
self.assertRaises(KeyError, state1.get, 'n6', 'shared3')
|
|
self.assertRaises(KeyError, state2.get, 'n6', 'shared3')
|
|
|
|
|
|
def test_shared_backing_file_propagates_state_from_discard_namespace(self):
|
|
state1 = base_job.job_state()
|
|
state1.set_backing_file('outfile9')
|
|
state1.set('n7', 'shared4', -1)
|
|
state1.set('n7', 'shared5', -2)
|
|
state2 = base_job.job_state()
|
|
state2.set_backing_file('outfile9')
|
|
self.assertEqual(-1, state1.get('n7', 'shared4'))
|
|
self.assertEqual(-1, state2.get('n7', 'shared4'))
|
|
self.assertEqual(-2, state1.get('n7', 'shared5'))
|
|
self.assertEqual(-2, state2.get('n7', 'shared5'))
|
|
state1.discard_namespace('n7')
|
|
self.assertRaises(KeyError, state1.get, 'n7', 'shared4')
|
|
self.assertRaises(KeyError, state2.get, 'n7', 'shared4')
|
|
self.assertRaises(KeyError, state1.get, 'n7', 'shared5')
|
|
self.assertRaises(KeyError, state2.get, 'n7', 'shared5')
|
|
|
|
|
|
class test_job_state_backing_file_locking(unittest.TestCase):
|
|
def setUp(self):
|
|
self.testdir = tempfile.mkdtemp(suffix='unittest')
|
|
self.original_wd = os.getcwd()
|
|
os.chdir(self.testdir)
|
|
|
|
# create a job_state object with stub read_* and write_* methods
|
|
# to check that a lock is always held during a call to them
|
|
ut_self = self
|
|
class mocked_job_state(base_job.job_state):
|
|
def read_from_file(self, file_path, merge=True):
|
|
if self._backing_file and file_path == self._backing_file:
|
|
ut_self.assertNotEqual(None, self._backing_file_lock)
|
|
return super(mocked_job_state, self).read_from_file(
|
|
file_path, merge=True)
|
|
def write_to_file(self, file_path):
|
|
if self._backing_file and file_path == self._backing_file:
|
|
ut_self.assertNotEqual(None, self._backing_file_lock)
|
|
return super(mocked_job_state, self).write_to_file(file_path)
|
|
self.state = mocked_job_state()
|
|
self.state.set_backing_file('backing_file')
|
|
|
|
|
|
def tearDown(self):
|
|
os.chdir(self.original_wd)
|
|
shutil.rmtree(self.testdir, ignore_errors=True)
|
|
|
|
|
|
def test_set(self):
|
|
self.state.set('ns1', 'var1', 100)
|
|
|
|
|
|
def test_get_missing(self):
|
|
self.assertRaises(KeyError, self.state.get, 'ns2', 'var2')
|
|
|
|
|
|
def test_get_present(self):
|
|
self.state.set('ns3', 'var3', 333)
|
|
self.assertEqual(333, self.state.get('ns3', 'var3'))
|
|
|
|
|
|
def test_set_backing_file(self):
|
|
self.state.set_backing_file('some_other_file')
|
|
|
|
|
|
def test_has_missing(self):
|
|
self.assertFalse(self.state.has('ns4', 'var4'))
|
|
|
|
|
|
def test_has_present(self):
|
|
self.state.set('ns5', 'var5', 55555)
|
|
self.assertTrue(self.state.has('ns5', 'var5'))
|
|
|
|
|
|
def test_discard_missing(self):
|
|
self.state.discard('ns6', 'var6')
|
|
|
|
|
|
def test_discard_present(self):
|
|
self.state.set('ns7', 'var7', -777)
|
|
self.state.discard('ns7', 'var7')
|
|
|
|
|
|
def test_discard_missing_namespace(self):
|
|
self.state.discard_namespace('ns8')
|
|
|
|
|
|
def test_discard_present_namespace(self):
|
|
self.state.set('ns8', 'var8', 80)
|
|
self.state.set('ns8', 'var8.1', 81)
|
|
self.state.discard_namespace('ns8')
|
|
|
|
|
|
def test_disable_backing_file(self):
|
|
self.state.set_backing_file(None)
|
|
|
|
|
|
def test_change_backing_file(self):
|
|
self.state.set_backing_file('another_backing_file')
|
|
|
|
|
|
def test_read_from_a_non_backing_file(self):
|
|
state = base_job.job_state()
|
|
state.set('ns9', 'var9', 9999)
|
|
state.write_to_file('non_backing_file')
|
|
self.state.read_from_file('non_backing_file')
|
|
|
|
|
|
def test_write_to_a_non_backing_file(self):
|
|
self.state.write_to_file('non_backing_file')
|
|
|
|
|
|
class test_job_state_property_factory(unittest.TestCase):
|
|
def setUp(self):
|
|
class job_stub(object):
|
|
pass
|
|
self.job_class = job_stub
|
|
self.job = job_stub()
|
|
self.state = base_job.job_state()
|
|
self.job.stateobj = self.state
|
|
|
|
|
|
def test_properties_are_readwrite(self):
|
|
self.job_class.testprop1 = base_job.job_state.property_factory(
|
|
'stateobj', 'testprop1', 1)
|
|
self.job.testprop1 = 'testvalue'
|
|
self.assertEqual('testvalue', self.job.testprop1)
|
|
|
|
|
|
def test_properties_use_default_if_not_initialized(self):
|
|
self.job_class.testprop2 = base_job.job_state.property_factory(
|
|
'stateobj', 'testprop2', 'abc123')
|
|
self.assertEqual('abc123', self.job.testprop2)
|
|
|
|
|
|
def test_properties_do_not_collisde(self):
|
|
self.job_class.testprop3 = base_job.job_state.property_factory(
|
|
'stateobj', 'testprop3', 2)
|
|
self.job_class.testprop4 = base_job.job_state.property_factory(
|
|
'stateobj', 'testprop4', 3)
|
|
self.job.testprop3 = 500
|
|
self.job.testprop4 = '1000'
|
|
self.assertEqual(500, self.job.testprop3)
|
|
self.assertEqual('1000', self.job.testprop4)
|
|
|
|
|
|
def test_properties_do_not_collide_across_different_state_objects(self):
|
|
self.job_class.testprop5 = base_job.job_state.property_factory(
|
|
'stateobj', 'testprop5', 55)
|
|
self.job.auxstateobj = base_job.job_state()
|
|
self.job_class.auxtestprop5 = base_job.job_state.property_factory(
|
|
'auxstateobj', 'testprop5', 600)
|
|
self.job.auxtestprop5 = 700
|
|
self.assertEqual(55, self.job.testprop5)
|
|
self.assertEqual(700, self.job.auxtestprop5)
|
|
|
|
|
|
def test_properties_do_not_collide_across_different_job_objects(self):
|
|
self.job_class.testprop6 = base_job.job_state.property_factory(
|
|
'stateobj', 'testprop6', 'defaultval')
|
|
job1 = self.job
|
|
job2 = self.job_class()
|
|
job2.stateobj = base_job.job_state()
|
|
job1.testprop6 = 'notdefaultval'
|
|
self.assertEqual('notdefaultval', job1.testprop6)
|
|
self.assertEqual('defaultval', job2.testprop6)
|
|
job2.testprop6 = 'job2val'
|
|
self.assertEqual('notdefaultval', job1.testprop6)
|
|
self.assertEqual('job2val', job2.testprop6)
|
|
|
|
def test_properties_in_different_namespaces_do_not_collide(self):
|
|
self.job_class.ns1 = base_job.job_state.property_factory(
|
|
'stateobj', 'attribute', 'default1', namespace='ns1')
|
|
self.job_class.ns2 = base_job.job_state.property_factory(
|
|
'stateobj', 'attribute', 'default2', namespace='ns2')
|
|
self.assertEqual('default1', self.job.ns1)
|
|
self.assertEqual('default2', self.job.ns2)
|
|
self.job.ns1 = 'notdefault'
|
|
self.job.ns2 = 'alsonotdefault'
|
|
self.assertEqual('notdefault', self.job.ns1)
|
|
self.assertEqual('alsonotdefault', self.job.ns2)
|
|
|
|
|
|
class test_status_log_entry(unittest.TestCase):
|
|
def test_accepts_valid_status_code(self):
|
|
base_job.status_log_entry('GOOD', None, None, '', None)
|
|
base_job.status_log_entry('FAIL', None, None, '', None)
|
|
base_job.status_log_entry('ABORT', None, None, '', None)
|
|
|
|
|
|
def test_accepts_valid_start_status_code(self):
|
|
base_job.status_log_entry('START', None, None, '', None)
|
|
|
|
|
|
def test_accepts_valid_end_status_code(self):
|
|
base_job.status_log_entry('END GOOD', None, None, '', None)
|
|
base_job.status_log_entry('END FAIL', None, None, '', None)
|
|
base_job.status_log_entry('END ABORT', None, None, '', None)
|
|
|
|
|
|
def test_rejects_invalid_status_code(self):
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'FAKE', None, None, '', None)
|
|
|
|
|
|
def test_rejects_invalid_start_status_code(self):
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'START GOOD', None, None, '', None)
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'START FAIL', None, None, '', None)
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'START ABORT', None, None, '', None)
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'START FAKE', None, None, '', None)
|
|
|
|
|
|
def test_rejects_invalid_end_status_code(self):
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'END FAKE', None, None, '', None)
|
|
|
|
|
|
def test_accepts_valid_subdir(self):
|
|
base_job.status_log_entry('GOOD', 'subdir', None, '', None)
|
|
base_job.status_log_entry('FAIL', 'good.subdir', None, '', None)
|
|
|
|
|
|
def test_rejects_bad_subdir(self):
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'GOOD', 'bad.subdir\t', None, '', None)
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'GOOD', 'bad.subdir\t', None, '', None)
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'GOOD', 'bad.subdir\t', None, '', None)
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'GOOD', 'bad.subdir\t', None, '', None)
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'GOOD', 'bad.subdir\t', None, '', None)
|
|
|
|
|
|
def test_accepts_valid_operation(self):
|
|
base_job.status_log_entry('GOOD', None, 'build', '', None)
|
|
base_job.status_log_entry('FAIL', None, 'clean', '', None)
|
|
|
|
|
|
def test_rejects_bad_operation(self):
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'GOOD', None, 'bad.operation\n', '', None)
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'GOOD', None, 'bad.\voperation', '', None)
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'GOOD', None, 'bad.\foperation', '', None)
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'GOOD', None, 'bad\r.operation', '', None)
|
|
self.assertRaises(ValueError, base_job.status_log_entry,
|
|
'GOOD', None, '\tbad.operation', '', None)
|
|
|
|
|
|
def test_simple_message(self):
|
|
base_job.status_log_entry('ERROR', None, None, 'simple error message',
|
|
None)
|
|
|
|
|
|
def test_message_split_into_multiple_lines(self):
|
|
def make_entry(msg):
|
|
return base_job.status_log_entry('GOOD', None, None, msg, None)
|
|
base_job.status_log_entry('ABORT', None, None, 'first line\nsecond',
|
|
None)
|
|
|
|
|
|
def test_message_with_tabs(self):
|
|
base_job.status_log_entry('GOOD', None, None, '\tindent\tagain', None)
|
|
|
|
|
|
def test_message_with_custom_fields(self):
|
|
base_job.status_log_entry('GOOD', None, None, 'my message',
|
|
{'key1': 'blah', 'key2': 'blahblah'})
|
|
|
|
|
|
def assertRendered(self, rendered, status, subdir, operation, msg,
|
|
extra_fields, timestamp):
|
|
parts = rendered.split('\t')
|
|
self.assertEqual(parts[0], status)
|
|
self.assertEqual(parts[1], subdir)
|
|
self.assertEqual(parts[2], operation)
|
|
self.assertEqual(parts[-1], msg)
|
|
fields = dict(f.split('=', 1) for f in parts[3:-1])
|
|
self.assertEqual(int(fields['timestamp']), timestamp)
|
|
self.assert_('localtime' in fields) # too flaky to do an exact check
|
|
del fields['timestamp']
|
|
del fields['localtime']
|
|
self.assertEqual(fields, extra_fields)
|
|
|
|
|
|
def test_base_render(self):
|
|
entry = base_job.status_log_entry('GOOD', None, None, 'message1', None,
|
|
timestamp=1)
|
|
self.assertRendered(entry.render(), 'GOOD', '----', '----', 'message1',
|
|
{}, 1)
|
|
|
|
|
|
def test_subdir_render(self):
|
|
entry = base_job.status_log_entry('FAIL', 'sub', None, 'message2', None,
|
|
timestamp=2)
|
|
self.assertRendered(entry.render(), 'FAIL', 'sub', '----', 'message2',
|
|
{}, 2)
|
|
|
|
|
|
def test_operation_render(self):
|
|
entry = base_job.status_log_entry('ABORT', None, 'myop', 'message3',
|
|
None, timestamp=4)
|
|
self.assertRendered(entry.render(), 'ABORT', '----', 'myop', 'message3',
|
|
{}, 4)
|
|
|
|
|
|
def test_fields_render(self):
|
|
custom_fields = {'custom1': 'foo', 'custom2': 'bar'}
|
|
entry = base_job.status_log_entry('WARN', None, None, 'message4',
|
|
custom_fields, timestamp=8)
|
|
self.assertRendered(entry.render(), 'WARN', '----', '----', 'message4',
|
|
custom_fields, 8)
|
|
|
|
|
|
def assertEntryEqual(self, lhs, rhs):
|
|
self.assertEqual(
|
|
(lhs.status_code, lhs.subdir, lhs.operation, lhs.fields, lhs.message),
|
|
(rhs.status_code, rhs.subdir, rhs.operation, rhs.fields, rhs.message))
|
|
|
|
|
|
def test_base_parse(self):
|
|
entry = base_job.status_log_entry(
|
|
'GOOD', None, None, 'message', {'field1': 'x', 'field2': 'y'},
|
|
timestamp=16)
|
|
parsed_entry = base_job.status_log_entry.parse(
|
|
'GOOD\t----\t----\tfield1=x\tfield2=y\ttimestamp=16\tmessage\n')
|
|
self.assertEntryEqual(entry, parsed_entry)
|
|
|
|
|
|
def test_subdir_parse(self):
|
|
entry = base_job.status_log_entry(
|
|
'FAIL', 'sub', None, 'message', {'field1': 'x', 'field2': 'y'},
|
|
timestamp=32)
|
|
parsed_entry = base_job.status_log_entry.parse(
|
|
'FAIL\tsub\t----\tfield1=x\tfield2=y\ttimestamp=32\tmessage\n')
|
|
self.assertEntryEqual(entry, parsed_entry)
|
|
|
|
|
|
def test_operation_parse(self):
|
|
entry = base_job.status_log_entry(
|
|
'ABORT', None, 'myop', 'message', {'field1': 'x', 'field2': 'y'},
|
|
timestamp=64)
|
|
parsed_entry = base_job.status_log_entry.parse(
|
|
'ABORT\t----\tmyop\tfield1=x\tfield2=y\ttimestamp=64\tmessage\n')
|
|
self.assertEntryEqual(entry, parsed_entry)
|
|
|
|
|
|
def test_extra_lines_parse(self):
|
|
parsed_entry = base_job.status_log_entry.parse(
|
|
' This is a non-status line, line in a traceback\n')
|
|
self.assertEqual(None, parsed_entry)
|
|
|
|
|
|
class test_status_logger(unittest.TestCase):
|
|
def setUp(self):
|
|
self.testdir = tempfile.mkdtemp(suffix='unittest')
|
|
self.original_wd = os.getcwd()
|
|
os.chdir(self.testdir)
|
|
|
|
class stub_job(object):
|
|
resultdir = self.testdir
|
|
self.job = stub_job() # need to hold a reference to the job
|
|
class stub_indenter(object):
|
|
def __init__(self):
|
|
self.indent = 0
|
|
def increment(self):
|
|
self.indent += 1
|
|
def decrement(self):
|
|
self.indent -= 1
|
|
self.indenter = stub_indenter()
|
|
self.logger = base_job.status_logger(self.job, self.indenter)
|
|
|
|
|
|
def make_dummy_entry(self, rendered_text, start=False, end=False,
|
|
subdir=None):
|
|
"""Helper to make a dummy status log entry with custom rendered text.
|
|
|
|
Helpful when validating the logging since it lets the test control
|
|
the rendered text and so it doesn't depend on the exact formatting
|
|
of a "real" status log entry.
|
|
|
|
@param rendred_text: The value to return when rendering the entry.
|
|
@param start: An optional value indicating if this should be the start
|
|
of a nested group.
|
|
@param end: An optional value indicating if this should be the end
|
|
of a nested group.
|
|
@param subdir: An optional value to use for the entry subdir field.
|
|
|
|
@return: A dummy status log entry object with the given subdir field
|
|
and a render implementation that returns rendered_text.
|
|
"""
|
|
assert not start or not end # real entries would never be both
|
|
class dummy_entry(object):
|
|
def is_start(self):
|
|
return start
|
|
def is_end(self):
|
|
return end
|
|
def render(self):
|
|
return rendered_text
|
|
entry = dummy_entry()
|
|
entry.subdir = subdir
|
|
return entry
|
|
|
|
|
|
def test_render_includes_indent(self):
|
|
entry = self.make_dummy_entry('LINE0')
|
|
self.assertEqual('LINE0', self.logger.render_entry(entry))
|
|
self.indenter.increment()
|
|
self.indenter.increment()
|
|
self.assertEqual('\t\tLINE0', self.logger.render_entry(entry))
|
|
|
|
|
|
def test_render_handles_start(self):
|
|
entry = self.make_dummy_entry('LINE10', start=True)
|
|
self.indenter.increment()
|
|
self.assertEqual('\tLINE10', self.logger.render_entry(entry))
|
|
|
|
|
|
def test_render_handles_end(self):
|
|
entry = self.make_dummy_entry('LINE20', end=True)
|
|
self.indenter.increment()
|
|
self.indenter.increment()
|
|
self.indenter.increment()
|
|
self.assertEqual('\t\tLINE20', self.logger.render_entry(entry))
|
|
|
|
|
|
def test_writes_toplevel_log(self):
|
|
entries = [self.make_dummy_entry('LINE%d' % x) for x in range(3)]
|
|
for entry in entries:
|
|
self.logger.record_entry(entry)
|
|
self.assertEqual('LINE0\nLINE1\nLINE2\n', open('status').read())
|
|
|
|
|
|
def test_uses_given_filenames(self):
|
|
os.mkdir('sub')
|
|
self.logger = base_job.status_logger(self.job, self.indenter,
|
|
global_filename='global.log',
|
|
subdir_filename='subdir.log')
|
|
self.logger.record_entry(self.make_dummy_entry('LINE1', subdir='sub'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE2', subdir='sub'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE3'))
|
|
|
|
self.assertEqual('LINE1\nLINE2\nLINE3\n', open('global.log').read())
|
|
self.assertEqual('LINE1\nLINE2\n', open('sub/subdir.log').read())
|
|
|
|
self.assertFalse(os.path.exists('status'))
|
|
self.assertFalse(os.path.exists('sub/status'))
|
|
self.assertFalse(os.path.exists('subdir.log'))
|
|
self.assertFalse(os.path.exists('sub/global.log'))
|
|
|
|
|
|
def test_filenames_are_mutable(self):
|
|
os.mkdir('sub2')
|
|
self.logger = base_job.status_logger(self.job, self.indenter,
|
|
global_filename='global.log',
|
|
subdir_filename='subdir.log')
|
|
self.logger.record_entry(self.make_dummy_entry('LINE1', subdir='sub2'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE2'))
|
|
self.logger.global_filename = 'global.log2'
|
|
self.logger.subdir_filename = 'subdir.log2'
|
|
self.logger.record_entry(self.make_dummy_entry('LINE3', subdir='sub2'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE4'))
|
|
|
|
self.assertEqual('LINE1\nLINE2\n', open('global.log').read())
|
|
self.assertEqual('LINE1\n', open('sub2/subdir.log').read())
|
|
self.assertEqual('LINE3\nLINE4\n', open('global.log2').read())
|
|
self.assertEqual('LINE3\n', open('sub2/subdir.log2').read())
|
|
|
|
|
|
def test_writes_subdir_logs(self):
|
|
os.mkdir('abc')
|
|
os.mkdir('123')
|
|
self.logger.record_entry(self.make_dummy_entry('LINE1'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE2', subdir='abc'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE3', subdir='abc'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE4', subdir='123'))
|
|
|
|
self.assertEqual('LINE1\nLINE2\nLINE3\nLINE4\n', open('status').read())
|
|
self.assertEqual('LINE2\nLINE3\n', open('abc/status').read())
|
|
self.assertEqual('LINE4\n', open('123/status').read())
|
|
|
|
|
|
def test_writes_no_subdir_when_disabled(self):
|
|
os.mkdir('sub')
|
|
self.logger.record_entry(self.make_dummy_entry('LINE1'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE2', subdir='sub'))
|
|
self.logger.record_entry(self.make_dummy_entry(
|
|
'LINE3', subdir='sub_nowrite'), log_in_subdir=False)
|
|
self.logger.record_entry(self.make_dummy_entry('LINE4', subdir='sub'))
|
|
|
|
self.assertEqual('LINE1\nLINE2\nLINE3\nLINE4\n', open('status').read())
|
|
self.assertEqual('LINE2\nLINE4\n', open('sub/status').read())
|
|
self.assert_(not os.path.exists('sub_nowrite/status'))
|
|
|
|
|
|
def test_indentation(self):
|
|
self.logger.record_entry(self.make_dummy_entry('LINE1', start=True))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE2'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE3', start=True))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE4'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE5'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE6', end=True))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE7', end=True))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE8'))
|
|
|
|
expected_log = ('LINE1\n\tLINE2\n\tLINE3\n\t\tLINE4\n\t\tLINE5\n'
|
|
'\tLINE6\nLINE7\nLINE8\n')
|
|
self.assertEqual(expected_log, open('status').read())
|
|
|
|
|
|
def test_multiline_indent(self):
|
|
self.logger.record_entry(self.make_dummy_entry('LINE1\n blah\n'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE2', start=True))
|
|
self.logger.record_entry(
|
|
self.make_dummy_entry('LINE3\n blah\n two\n'))
|
|
self.logger.record_entry(self.make_dummy_entry('LINE4', end=True))
|
|
|
|
expected_log = ('LINE1\n blah\nLINE2\n'
|
|
'\tLINE3\n blah\n two\nLINE4\n')
|
|
self.assertEqual(expected_log, open('status').read())
|
|
|
|
|
|
def test_hook_is_called(self):
|
|
entries = [self.make_dummy_entry('LINE%d' % x) for x in range(5)]
|
|
recorded_entries = []
|
|
def hook(entry):
|
|
recorded_entries.append(entry)
|
|
self.logger = base_job.status_logger(self.job, self.indenter,
|
|
record_hook=hook)
|
|
for entry in entries:
|
|
self.logger.record_entry(entry)
|
|
self.assertEqual(entries, recorded_entries)
|
|
|
|
|
|
def tearDown(self):
|
|
os.chdir(self.original_wd)
|
|
shutil.rmtree(self.testdir, ignore_errors=True)
|
|
|
|
|
|
class test_job_tags(unittest.TestCase):
|
|
def setUp(self):
|
|
class stub_job(base_job.base_job):
|
|
_job_directory = stub_job_directory
|
|
@classmethod
|
|
def _find_base_directories(cls):
|
|
return '/autodir', '/autodir/client', '/autodir/server'
|
|
def _find_resultdir(self):
|
|
return '/autodir/results'
|
|
self.job = stub_job()
|
|
|
|
|
|
def test_default_with_no_args_means_no_tags(self):
|
|
self.assertEqual(('testname', 'testname', ''),
|
|
self.job._build_tagged_test_name('testname', {}))
|
|
self.assertEqual(('othername', 'othername', ''),
|
|
self.job._build_tagged_test_name('othername', {}))
|
|
|
|
|
|
def test_tag_argument_appended(self):
|
|
self.assertEqual(
|
|
('test1.mytag', 'test1.mytag', 'mytag'),
|
|
self.job._build_tagged_test_name('test1', {'tag': 'mytag'}))
|
|
|
|
|
|
def test_turning_on_use_sequence_adds_sequence_tags(self):
|
|
self.job.use_sequence_number = True
|
|
self.assertEqual(
|
|
('test2._01_', 'test2._01_', '_01_'),
|
|
self.job._build_tagged_test_name('test2', {}))
|
|
self.assertEqual(
|
|
('test2._02_', 'test2._02_', '_02_'),
|
|
self.job._build_tagged_test_name('test2', {}))
|
|
self.assertEqual(
|
|
('test3._03_', 'test3._03_', '_03_'),
|
|
self.job._build_tagged_test_name('test3', {}))
|
|
|
|
|
|
def test_adding_automatic_test_tag_automatically_tags(self):
|
|
self.job.automatic_test_tag = 'autotag'
|
|
self.assertEqual(
|
|
('test4.autotag', 'test4.autotag', 'autotag'),
|
|
self.job._build_tagged_test_name('test4', {}))
|
|
|
|
|
|
def test_none_automatic_test_tag_turns_off_tagging(self):
|
|
self.job.automatic_test_tag = 'autotag'
|
|
self.assertEqual(
|
|
('test5.autotag', 'test5.autotag', 'autotag'),
|
|
self.job._build_tagged_test_name('test5', {}))
|
|
self.job.automatic_test_tag = None
|
|
self.assertEqual(
|
|
('test5', 'test5', ''),
|
|
self.job._build_tagged_test_name('test5', {}))
|
|
|
|
|
|
def test_empty_automatic_test_tag_turns_off_tagging(self):
|
|
self.job.automatic_test_tag = 'autotag'
|
|
self.assertEqual(
|
|
('test6.autotag', 'test6.autotag', 'autotag'),
|
|
self.job._build_tagged_test_name('test6', {}))
|
|
self.job.automatic_test_tag = ''
|
|
self.assertEqual(
|
|
('test6', 'test6', ''),
|
|
self.job._build_tagged_test_name('test6', {}))
|
|
|
|
|
|
def test_subdir_tag_modifies_subdir_and_tag_only(self):
|
|
self.assertEqual(
|
|
('test7', 'test7.subdirtag', 'subdirtag'),
|
|
self.job._build_tagged_test_name('test7',
|
|
{'subdir_tag': 'subdirtag'}))
|
|
|
|
|
|
def test_all_tag_components_together(self):
|
|
self.job.use_sequence_number = True
|
|
self.job.automatic_test_tag = 'auto'
|
|
expected = ('test8.tag._01_.auto',
|
|
'test8.tag._01_.auto.subdir',
|
|
'tag._01_.auto.subdir')
|
|
actual = self.job._build_tagged_test_name(
|
|
'test8', {'tag': 'tag', 'subdir_tag': 'subdir'})
|
|
self.assertEqual(expected, actual)
|
|
|
|
|
|
def test_subtest_with_master_test_path_and_subdir(self):
|
|
self.assertEqual(
|
|
('test9', 'subtestdir/test9.subdirtag', 'subdirtag'),
|
|
self.job._build_tagged_test_name('test9',
|
|
{'main_testpath': 'subtestdir',
|
|
'subdir_tag': 'subdirtag'}))
|
|
|
|
|
|
def test_subtest_all_tag_components_together_subdir(self):
|
|
self.job.use_sequence_number = True
|
|
self.job.automatic_test_tag = 'auto'
|
|
expected = ('test10.tag._01_.auto',
|
|
'subtestdir/test10.tag._01_.auto.subdir',
|
|
'tag._01_.auto.subdir')
|
|
actual = self.job._build_tagged_test_name(
|
|
'test10', {'tag': 'tag', 'subdir_tag': 'subdir',
|
|
'main_testpath': 'subtestdir'})
|
|
self.assertEqual(expected, actual)
|
|
|
|
|
|
class test_make_outputdir(unittest.TestCase):
|
|
def setUp(self):
|
|
self.resultdir = tempfile.mkdtemp(suffix='unittest')
|
|
class stub_job(base_job.base_job):
|
|
@classmethod
|
|
def _find_base_directories(cls):
|
|
return '/autodir', '/autodir/client', '/autodir/server'
|
|
@classmethod
|
|
def _find_resultdir(cls):
|
|
return self.resultdir
|
|
|
|
# stub out _job_directory for creation only
|
|
stub_job._job_directory = stub_job_directory
|
|
self.job = stub_job()
|
|
del stub_job._job_directory
|
|
|
|
# stub out logging.exception
|
|
self.original_exception = logging.exception
|
|
logging.exception = lambda *args, **dargs: None
|
|
|
|
self.original_wd = os.getcwd()
|
|
os.chdir(self.resultdir)
|
|
|
|
|
|
def tearDown(self):
|
|
logging.exception = self.original_exception
|
|
os.chdir(self.original_wd)
|
|
shutil.rmtree(self.resultdir, ignore_errors=True)
|
|
|
|
|
|
def test_raises_test_error_if_outputdir_exists(self):
|
|
os.mkdir('subdir1')
|
|
self.assert_(os.path.exists('subdir1'))
|
|
self.assertRaises(error.TestError, self.job._make_test_outputdir,
|
|
'subdir1')
|
|
|
|
|
|
def test_raises_test_error_if_outputdir_uncreatable(self):
|
|
os.chmod(self.resultdir, stat.S_IRUSR | stat.S_IXUSR)
|
|
self.assert_(not os.path.exists('subdir2'))
|
|
self.assertRaises(OSError, os.mkdir, 'subdir2')
|
|
self.assertRaises(error.TestError, self.job._make_test_outputdir,
|
|
'subdir2')
|
|
self.assert_(not os.path.exists('subdir2'))
|
|
|
|
|
|
def test_creates_writable_directory(self):
|
|
self.assert_(not os.path.exists('subdir3'))
|
|
self.job._make_test_outputdir('subdir3')
|
|
self.assert_(os.path.isdir('subdir3'))
|
|
|
|
# we can write to the directory afterwards
|
|
self.assert_(not os.path.exists('subdir3/testfile'))
|
|
open('subdir3/testfile', 'w').close()
|
|
self.assert_(os.path.isfile('subdir3/testfile'))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|