android13/external/avb/test/at_auth_unlock_unittest.py

520 lines
20 KiB
Python

#!/usr/bin/env python
#
# Copyright 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for at_auth_unlock."""
import argparse
import filecmp
import os
import shutil
import subprocess
import unittest
from at_auth_unlock import *
from Crypto.PublicKey import RSA
from unittest.mock import patch
def dataPath(file):
return os.path.join(os.path.dirname(__file__), 'data', file)
DATA_FILE_PIK_CERTIFICATE = dataPath('atx_pik_certificate.bin')
DATA_FILE_PUK_CERTIFICATE = dataPath('atx_puk_certificate.bin')
DATA_FILE_PUK_KEY = dataPath('testkey_atx_puk.pem')
DATA_FILE_UNLOCK_CHALLENGE = dataPath('atx_unlock_challenge.bin')
DATA_FILE_UNLOCK_CREDENTIAL = dataPath('atx_unlock_credential.bin')
def createTempZip(contents):
tempzip = tempfile.NamedTemporaryFile()
with zipfile.ZipFile(tempzip, 'w') as zip:
for arcname in contents:
zip.write(contents[arcname], arcname)
return tempzip
def validUnlockCredsZip():
return createTempZip({
'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
'puk_v1.pem': DATA_FILE_PUK_KEY
})
class UnlockCredentialsTest(unittest.TestCase):
def testFromValidZipArchive(self):
with validUnlockCredsZip() as zip:
creds = UnlockCredentials.from_credential_archive(zip)
self.assertIsNotNone(creds.intermediate_cert)
self.assertIsNotNone(creds.unlock_cert)
self.assertIsNotNone(creds.unlock_key)
def testFromInvalidZipArchive(self):
with self.assertRaises(zipfile.BadZipfile):
UnlockCredentials.from_credential_archive(DATA_FILE_PUK_KEY)
def testFromArchiveMissingPikCertificate(self):
with createTempZip({
'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
'puk_v1.pem': DATA_FILE_PUK_KEY
}) as zip:
with self.assertRaises(ValueError):
UnlockCredentials.from_credential_archive(zip)
def testFromArchiveMissingPukCertificate(self):
with createTempZip({
'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
'puk_v1.pem': DATA_FILE_PUK_KEY
}) as zip:
with self.assertRaises(ValueError):
UnlockCredentials.from_credential_archive(zip)
def testFromArchiveMissingPuk(self):
with createTempZip({
'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
}) as zip:
with self.assertRaises(ValueError):
UnlockCredentials.from_credential_archive(zip)
def testFromArchiveMultiplePikCertificates(self):
with createTempZip({
'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
'pik_certificate_v2.bin': DATA_FILE_PIK_CERTIFICATE,
'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
'puk_v1.pem': DATA_FILE_PUK_KEY
}) as zip:
with self.assertRaises(ValueError):
UnlockCredentials.from_credential_archive(zip)
def testFromArchiveMultiplePukCertificates(self):
with createTempZip({
'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
'puk_certificate_v2.bin': DATA_FILE_PUK_CERTIFICATE,
'puk_v1.pem': DATA_FILE_PUK_KEY
}) as zip:
with self.assertRaises(ValueError):
UnlockCredentials.from_credential_archive(zip)
def testFromArchiveMultiplePuks(self):
with createTempZip({
'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE,
'puk_v1.pem': DATA_FILE_PUK_KEY,
'puk_v2.pem': DATA_FILE_PUK_KEY
}) as zip:
with self.assertRaises(ValueError):
UnlockCredentials.from_credential_archive(zip)
def testFromFiles(self):
creds = UnlockCredentials(
intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE,
unlock_cert_file=DATA_FILE_PUK_CERTIFICATE,
unlock_key_file=DATA_FILE_PUK_KEY)
self.assertIsNotNone(creds.intermediate_cert)
self.assertIsNotNone(creds.unlock_cert)
self.assertIsNotNone(creds.unlock_key)
def testInvalidPuk(self):
with self.assertRaises(ValueError):
UnlockCredentials(
intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE,
unlock_cert_file=DATA_FILE_PUK_CERTIFICATE,
unlock_key_file=DATA_FILE_PUK_CERTIFICATE)
def testPukNotPrivateKey(self):
tempdir = tempfile.mkdtemp()
try:
with open(DATA_FILE_PUK_KEY, 'rb') as f:
key = RSA.importKey(f.read())
pubkey = os.path.join(tempdir, 'pubkey.pub')
with open(pubkey, 'wb') as f:
f.write(key.publickey().exportKey())
with self.assertRaises(ValueError):
UnlockCredentials(
intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE,
unlock_cert_file=DATA_FILE_PUK_CERTIFICATE,
unlock_key_file=pubkey)
finally:
shutil.rmtree(tempdir)
def testWrongSizeCerts(self):
pik_cert = DATA_FILE_PIK_CERTIFICATE
tempdir = tempfile.mkdtemp()
try:
# Copy a valid cert and truncate a single byte from the end to create a
# too-short cert.
shortfile = os.path.join(tempdir, 'shortfile.bin')
shutil.copy2(pik_cert, shortfile)
with open(shortfile, 'ab') as f:
f.seek(-1, os.SEEK_END)
f.truncate()
with self.assertRaises(ValueError):
creds = UnlockCredentials(
intermediate_cert_file=shortfile,
unlock_cert_file=DATA_FILE_PUK_CERTIFICATE,
unlock_key_file=DATA_FILE_PUK_KEY)
with self.assertRaises(ValueError):
creds = UnlockCredentials(
intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE,
unlock_cert_file=shortfile,
unlock_key_file=DATA_FILE_PUK_KEY)
# Copy a valid cert and append an arbitrary byte on the end to create a
# too-long cert.
longfile = os.path.join(tempdir, 'longfile.bin')
shutil.copy2(pik_cert, longfile)
with open(longfile, 'ab') as f:
f.write(b'\0')
with self.assertRaises(ValueError):
creds = UnlockCredentials(
intermediate_cert_file=longfile,
unlock_cert_file=DATA_FILE_PUK_CERTIFICATE,
unlock_key_file=DATA_FILE_PUK_KEY)
with self.assertRaises(ValueError):
creds = UnlockCredentials(
intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE,
unlock_cert_file=longfile,
unlock_key_file=DATA_FILE_PUK_KEY)
finally:
shutil.rmtree(tempdir)
def writeFullUnlockChallenge(out_file, product_id_hash=None):
"""Helper function to create a file with a full AvbAtxUnlockChallenge struct.
Arguments:
product_id_hash: [optional] 32 byte value to include in the challenge as the
SHA256 hash of the product ID. If not provided, will default to the
product ID hash from the subject of DATA_FILE_PUK_CERTIFICATE.
"""
if product_id_hash is None:
with open(DATA_FILE_PUK_CERTIFICATE, 'rb') as f:
product_id_hash = GetAtxCertificateSubject(f.read())
assert len(product_id_hash) == 32
with open(out_file, 'wb') as out:
out.write(struct.pack('<I', 1))
out.write(product_id_hash)
with open(DATA_FILE_UNLOCK_CHALLENGE, 'rb') as f:
out.write(f.read())
class MakeAtxUnlockCredentialTest(unittest.TestCase):
def testCredentialIsCorrect(self):
with validUnlockCredsZip() as zip:
creds = UnlockCredentials.from_credential_archive(zip)
tempdir = tempfile.mkdtemp()
try:
challenge_file = os.path.join(tempdir, 'challenge')
writeFullUnlockChallenge(challenge_file)
challenge = UnlockChallenge(challenge_file)
out_cred = os.path.join(tempdir, 'credential')
# Compare unlock credential generated by function with one generated
# using 'avbtool make_atx_unlock_credential', to check correctness.
MakeAtxUnlockCredential(creds, challenge, out_cred)
self.assertTrue(filecmp.cmp(out_cred, DATA_FILE_UNLOCK_CREDENTIAL))
finally:
shutil.rmtree(tempdir)
def testWrongChallengeSize(self):
with validUnlockCredsZip() as zip:
creds = UnlockCredentials.from_credential_archive(zip)
tempdir = tempfile.mkdtemp()
try:
out_cred = os.path.join(tempdir, 'credential')
# The bundled unlock challenge is just the 16 byte challenge, not the
# full AvbAtxUnlockChallenge like this expects.
with self.assertRaises(ValueError):
challenge = UnlockChallenge(DATA_FILE_UNLOCK_CHALLENGE)
MakeAtxUnlockCredential(creds, challenge, out_cred)
finally:
shutil.rmtree(tempdir)
def makeFastbootCommandFake(testcase,
expect_serial=None,
error_on_command_number=None,
product_id_hash=None,
stay_locked=False):
"""Construct a fake fastboot command handler, to be used with unitttest.mock.Mock.side_effect.
This can be used to create a callable that acts as a fake for a real device
responding to the fastboot commands involved in an authenticated unlock. The
returned callback is intended to be used with unittest.mock.Mock.side_effect.
There are a number of optional arguments here that can be used to customize
the behavior of the fake for a specific test.
Arguments:
testcase: unittest.TestCase object for the associated test
expect_serial: [optional] Expect (and assert) that the fastboot command
specifies a specific device serial to communicate with.
error_on_command_number: [optional] Return a fastboot error (non-zero exit
code) on the nth (0-based) command handled.
stay_locked: [optional] Make the fake report that the device is still locked
after an otherwise successful unlock attempt.
"""
def handler(args, *extraArgs, **kwargs):
if error_on_command_number is not None:
handler.command_counter += 1
if handler.command_counter - 1 == error_on_command_number:
raise subprocess.CalledProcessError(
returncode=1, cmd=args, output=b'Fake: ERROR')
testcase.assertEqual(args.pop(0), 'fastboot')
if expect_serial is not None:
# This is a bit fragile in that, in reality, fastboot allows '-s SERIAL'
# to not just be the first arguments, but it works for this use case.
testcase.assertEqual(args.pop(0), '-s')
testcase.assertEqual(args.pop(0), expect_serial)
if args[0:2] == ['oem', 'at-get-vboot-unlock-challenge']:
handler.challenge_staged = True
elif args[0] == 'get_staged':
if not handler.challenge_staged:
raise subprocess.CalledProcessError(
returncode=1, cmd=args, output=b'Fake: No data staged')
writeFullUnlockChallenge(args[1], product_id_hash=product_id_hash)
handler.challenge_staged = False
elif args[0] == 'stage':
handler.staged_file = args[1]
elif args[0:2] == ['oem', 'at-unlock-vboot']:
if handler.staged_file is None:
raise subprocess.CalledProcessError(
returncode=1, cmd=args, output=b'Fake: No unlock credential staged')
# Validate the unlock credential as if this were a test key locked device,
# which implies tests that want a successful unlock need to be set up to
# use DATA_FILE_PUK_KEY to sign the challenge. Credentials generated using
# other keys will be properly rejected.
if not filecmp.cmp(handler.staged_file, DATA_FILE_UNLOCK_CREDENTIAL):
raise subprocess.CalledProcessError(
returncode=1, cmd=args, output=b'Fake: Incorrect unlock credential')
handler.locked = True if stay_locked else False
elif args[0:2] == ['getvar', 'at-vboot-state']:
return b'avb-locked: ' + (b'1' if handler.locked else b'0')
return b'Fake: OK'
handler.command_counter = 0
handler.challenge_staged = False
handler.staged_file = None
handler.locked = True
return handler
class AuthenticatedUnlockTest(unittest.TestCase):
@patch('subprocess.check_output')
def testUnlockWithZipArchive(self, mock_subp_check_output):
with validUnlockCredsZip() as zip:
mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
self.assertEqual(main([zip.name]), 0)
self.assertNotEqual(mock_subp_check_output.call_count, 0)
@patch('subprocess.check_output')
def testUnlockDeviceBySerial(self, mock_subp_check_output):
with validUnlockCredsZip() as zip:
SERIAL = 'abcde12345'
mock_subp_check_output.side_effect = makeFastbootCommandFake(
self, expect_serial=SERIAL)
self.assertEqual(main([zip.name, '-s', SERIAL]), 0)
self.assertNotEqual(mock_subp_check_output.call_count, 0)
@patch('subprocess.check_output')
def testUnlockWithIndividualFiles(self, mock_subp_check_output):
mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
self.assertEqual(
main([
'--pik_cert', DATA_FILE_PIK_CERTIFICATE, '--puk_cert',
DATA_FILE_PUK_CERTIFICATE, '--puk', DATA_FILE_PUK_KEY
]), 0)
self.assertNotEqual(mock_subp_check_output.call_count, 0)
@patch('subprocess.check_output')
def testFastbootError(self, mock_subp_check_output):
"""Verify that errors are handled properly if fastboot commands error out."""
with validUnlockCredsZip() as zip:
for n in range(5):
mock_subp_check_output.reset_mock()
mock_subp_check_output.side_effect = makeFastbootCommandFake(
self, error_on_command_number=n)
self.assertNotEqual(main([zip.name]), 0)
self.assertNotEqual(mock_subp_check_output.call_count, 0)
@patch('subprocess.check_output')
def testDoesntActuallyUnlock(self, mock_subp_check_output):
"""Verify fails if fake set to not actually unlock."""
with validUnlockCredsZip() as zip:
mock_subp_check_output.side_effect = makeFastbootCommandFake(
self, stay_locked=True)
self.assertNotEqual(main([zip.name]), 0)
self.assertNotEqual(mock_subp_check_output.call_count, 0)
@patch('subprocess.check_output')
def testNoCredentialsMatchDeviceProductID(self, mock_subp_check_output):
"""Test two cases where fake responds with a challenge that has a product ID hash which doesn't match the credentials used."""
# Case 1: Change the product ID hash that the fake responds with.
with validUnlockCredsZip() as zip:
mock_subp_check_output.side_effect = makeFastbootCommandFake(
self, product_id_hash=b'\x00' * 32)
self.assertNotEqual(main([zip.name]), 0)
self.assertNotEqual(mock_subp_check_output.call_count, 0)
# Case 2: Use credentials with a different product ID.
with createTempZip({
'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
# Note: PIK cert used as PUK cert so subject (i.e. product ID hash) is
# different
'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
'puk_v1.pem': DATA_FILE_PUK_KEY
}) as zip:
mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
self.assertNotEqual(main([zip.name]), 0)
self.assertNotEqual(mock_subp_check_output.call_count, 0)
@patch('subprocess.check_output')
def testMatchingCredentialSelectedFromZipArchives(self,
mock_subp_check_output):
"""Test correct credential based on product ID hash used if multiple provided directly through arguments."""
with validUnlockCredsZip() as correctCreds, createTempZip({
'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
# Note: PIK cert used as PUK cert so subject (i.e. product ID hash)
# doesn't match
'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
'puk_v1.pem': DATA_FILE_PUK_KEY
}) as wrongCreds:
mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
self.assertEqual(main([wrongCreds.name, correctCreds.name]), 0)
self.assertNotEqual(mock_subp_check_output.call_count, 0)
@patch('subprocess.check_output')
def testMatchingCredentialSelectedFromDirectory(self, mock_subp_check_output):
"""Test correct credential based on product ID hash used if multiple provided indirectly through a directory argument."""
with validUnlockCredsZip() as correctCreds, createTempZip({
'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
# Note: PIK cert used as PUK cert so subject (i.e. product ID hash)
# doesn't match
'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
'puk_v1.pem': DATA_FILE_PUK_KEY
}) as wrongCreds:
tempdir = tempfile.mkdtemp()
try:
shutil.copy2(correctCreds.name, tempdir)
shutil.copy2(wrongCreds.name, tempdir)
mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
self.assertEqual(main([tempdir]), 0)
self.assertNotEqual(mock_subp_check_output.call_count, 0)
finally:
shutil.rmtree(tempdir)
@patch('subprocess.check_output')
def testMatchingCredentialSelectedFromEither(self, mock_subp_check_output):
"""Test correct credential based on product ID hash used if arguments give some combination of file and directory arguments."""
with validUnlockCredsZip() as correctCreds, createTempZip({
'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
# Note: PIK cert used as PUK cert so subject (i.e. product ID hash)
# doesn't match
'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE,
'puk_v1.pem': DATA_FILE_PUK_KEY
}) as wrongCreds:
# Case 1: Correct creds in directory, wrong in file arg
tempdir = tempfile.mkdtemp()
try:
shutil.copy2(correctCreds.name, tempdir)
mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
self.assertEqual(main([wrongCreds.name, tempdir]), 0)
self.assertNotEqual(mock_subp_check_output.call_count, 0)
finally:
shutil.rmtree(tempdir)
# Case 2: Correct creds in file arg, wrong in directory
tempdir = tempfile.mkdtemp()
try:
shutil.copy2(wrongCreds.name, tempdir)
mock_subp_check_output.side_effect = makeFastbootCommandFake(self)
self.assertEqual(main([tempdir, correctCreds.name]), 0)
self.assertNotEqual(mock_subp_check_output.call_count, 0)
# Case 2: Correct creds in file arg, wrong in directory
finally:
shutil.rmtree(tempdir)
@patch('argparse.ArgumentParser.error')
def testArgparseDirectoryWithNoCredentials(self, mock_parser_error):
"""Test """
tempdir = tempfile.mkdtemp()
try:
# Make sure random files are ignored.
with open(os.path.join(tempdir, 'so_random'), 'w') as f:
f.write("I'm a random file")
mock_parser_error.side_effect = ValueError('ArgumentParser.error')
with self.assertRaises(ValueError):
main([tempdir])
self.assertEqual(mock_parser_error.call_count, 1)
finally:
shutil.rmtree(tempdir)
@patch('argparse.ArgumentParser.error')
def testArgparseMutualExclusionArchiveAndFiles(self, mock_parser_error):
mock_parser_error.side_effect = ValueError('ArgumentParser.error')
with self.assertRaises(ValueError):
main(['dummy.zip', '--pik_cert', DATA_FILE_PIK_CERTIFICATE])
self.assertEqual(mock_parser_error.call_count, 1)
@patch('argparse.ArgumentParser.error')
def testArgparseMutualInclusionOfFileArgs(self, mock_parser_error):
mock_parser_error.side_effect = ValueError('ArgumentParser.error')
with self.assertRaises(ValueError):
main(['--pik_cert', 'pik_cert.bin', '--puk_cert', 'puk_cert.bin'])
self.assertEqual(mock_parser_error.call_count, 1)
mock_parser_error.reset_mock()
with self.assertRaises(ValueError):
main(['--pik_cert', 'pik_cert.bin', '--puk', 'puk.pem'])
self.assertEqual(mock_parser_error.call_count, 1)
mock_parser_error.reset_mock()
with self.assertRaises(ValueError):
main(['--puk_cert', 'puk_cert.bin', '--puk', 'puk.pem'])
self.assertEqual(mock_parser_error.call_count, 1)
@patch('argparse.ArgumentParser.error')
def testArgparseMissingBundleAndFiles(self, mock_parser_error):
mock_parser_error.side_effect = ValueError('ArgumentParser.error')
with self.assertRaises(ValueError):
main(['-s', '1234abcd'])
self.assertEqual(mock_parser_error.call_count, 1)
if __name__ == '__main__':
unittest.main(verbosity=3)