399 lines
13 KiB
Python
Executable File
399 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Copyright 2020 The Pigweed Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
|
# use this file except in compliance with the License. You may obtain a copy of
|
|
# the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations under
|
|
# the License.
|
|
"""Tests compiling and importing Python protos on the fly."""
|
|
|
|
from pathlib import Path
|
|
import tempfile
|
|
import unittest
|
|
|
|
from pw_protobuf_compiler import python_protos
|
|
from pw_protobuf_compiler.python_protos import bytes_repr, proto_repr
|
|
|
|
PROTO_1 = """\
|
|
syntax = "proto3";
|
|
|
|
package pw.protobuf_compiler.test1;
|
|
|
|
message SomeMessage {
|
|
uint32 magic_number = 1;
|
|
}
|
|
|
|
message AnotherMessage {
|
|
enum Result {
|
|
FAILED = 0;
|
|
FAILED_MISERABLY = 1;
|
|
I_DONT_WANT_TO_TALK_ABOUT_IT = 2;
|
|
}
|
|
|
|
Result result = 1;
|
|
string payload = 2;
|
|
}
|
|
|
|
service PublicService {
|
|
rpc Unary(SomeMessage) returns (AnotherMessage) {}
|
|
rpc ServerStreaming(SomeMessage) returns (stream AnotherMessage) {}
|
|
rpc ClientStreaming(stream SomeMessage) returns (AnotherMessage) {}
|
|
rpc BidiStreaming(stream SomeMessage) returns (stream AnotherMessage) {}
|
|
}
|
|
"""
|
|
|
|
PROTO_2 = """\
|
|
syntax = "proto2";
|
|
|
|
package pw.protobuf_compiler.test2;
|
|
|
|
message Request {
|
|
optional float magic_number = 1;
|
|
}
|
|
|
|
message Response {
|
|
}
|
|
|
|
service Alpha {
|
|
rpc Unary(Request) returns (Response) {}
|
|
}
|
|
|
|
service Bravo {
|
|
rpc BidiStreaming(stream Request) returns (stream Response) {}
|
|
}
|
|
"""
|
|
|
|
PROTO_3 = """\
|
|
syntax = "proto3";
|
|
|
|
package pw.protobuf_compiler.test2;
|
|
|
|
enum Greeting {
|
|
YO = 0;
|
|
HI = 1;
|
|
}
|
|
|
|
message Hello {
|
|
repeated int64 value = 1;
|
|
Greeting hi = 2;
|
|
}
|
|
|
|
message NestingMessage {
|
|
message NestedMessage {
|
|
message NestedNestedMessage {
|
|
int32 nested_nested_field = 1;
|
|
}
|
|
|
|
NestedNestedMessage nested_nested_message = 1;
|
|
}
|
|
|
|
NestedMessage nested_message = 1;
|
|
}
|
|
"""
|
|
|
|
|
|
class TestCompileAndImport(unittest.TestCase):
|
|
"""Test compiling and importing."""
|
|
def setUp(self):
|
|
self._proto_dir = tempfile.TemporaryDirectory(prefix='proto_test')
|
|
self._protos = []
|
|
|
|
for i, contents in enumerate([PROTO_1, PROTO_2, PROTO_3], 1):
|
|
self._protos.append(Path(self._proto_dir.name, f'test_{i}.proto'))
|
|
self._protos[-1].write_text(contents)
|
|
|
|
def tearDown(self):
|
|
self._proto_dir.cleanup()
|
|
|
|
def test_compile_to_temp_dir_and_import(self):
|
|
modules = {
|
|
m.DESCRIPTOR.name: m
|
|
for m in python_protos.compile_and_import(self._protos)
|
|
}
|
|
self.assertEqual(3, len(modules))
|
|
|
|
# Make sure the protobuf modules contain what we expect them to.
|
|
mod = modules['test_1.proto']
|
|
self.assertEqual(
|
|
4, len(mod.DESCRIPTOR.services_by_name['PublicService'].methods))
|
|
|
|
mod = modules['test_2.proto']
|
|
self.assertEqual(mod.Request(magic_number=1.5).magic_number, 1.5)
|
|
self.assertEqual(2, len(mod.DESCRIPTOR.services_by_name))
|
|
|
|
mod = modules['test_3.proto']
|
|
self.assertEqual(mod.Hello(value=[123, 456]).value, [123, 456])
|
|
|
|
|
|
class TestProtoLibrary(TestCompileAndImport):
|
|
"""Tests the Library class."""
|
|
def setUp(self):
|
|
super().setUp()
|
|
self._library = python_protos.Library(
|
|
python_protos.compile_and_import(self._protos))
|
|
|
|
def test_packages_can_access_messages(self):
|
|
msg = self._library.packages.pw.protobuf_compiler.test1.SomeMessage
|
|
self.assertEqual(msg(magic_number=123).magic_number, 123)
|
|
|
|
def test_packages_finds_across_modules(self):
|
|
msg = self._library.packages.pw.protobuf_compiler.test2.Request
|
|
self.assertEqual(msg(magic_number=50).magic_number, 50)
|
|
|
|
val = self._library.packages.pw.protobuf_compiler.test2.YO
|
|
self.assertEqual(val, 0)
|
|
|
|
def test_packages_invalid_name(self):
|
|
with self.assertRaises(AttributeError):
|
|
_ = self._library.packages.nothing
|
|
|
|
with self.assertRaises(AttributeError):
|
|
_ = self._library.packages.pw.NOT_HERE
|
|
|
|
with self.assertRaises(AttributeError):
|
|
_ = self._library.packages.pw.protobuf_compiler.test1.NotARealMsg
|
|
|
|
def test_access_modules_by_package(self):
|
|
test1 = self._library.modules_by_package['pw.protobuf_compiler.test1']
|
|
self.assertEqual(len(test1), 1)
|
|
self.assertEqual(test1[0].AnotherMessage.Result.Value('FAILED'), 0)
|
|
|
|
test2 = self._library.modules_by_package['pw.protobuf_compiler.test2']
|
|
self.assertEqual(len(test2), 2)
|
|
|
|
def test_access_modules_by_package_unknown(self):
|
|
with self.assertRaises(KeyError):
|
|
_ = self._library.modules_by_package['pw.not_real']
|
|
|
|
def test_library_from_strings(self):
|
|
# Replace the package to avoid conflicts with the other proto imports
|
|
new_protos = [
|
|
p.replace('pw.protobuf_compiler', 'proto.library.test')
|
|
for p in [PROTO_1, PROTO_2, PROTO_3]
|
|
]
|
|
|
|
library = python_protos.Library.from_strings(new_protos)
|
|
|
|
# Make sure we can safely import the same proto contents multiple times.
|
|
library = python_protos.Library.from_strings(new_protos)
|
|
|
|
msg = library.packages.proto.library.test.test2.Request
|
|
self.assertEqual(msg(magic_number=50).magic_number, 50)
|
|
|
|
val = library.packages.proto.library.test.test2.YO
|
|
self.assertEqual(val, 0)
|
|
|
|
def test_access_nested_packages_by_name(self):
|
|
self.assertIs(self._library.packages['pw.protobuf_compiler.test1'],
|
|
self._library.packages.pw.protobuf_compiler.test1)
|
|
self.assertIs(self._library.packages.pw['protobuf_compiler.test1'],
|
|
self._library.packages.pw.protobuf_compiler.test1)
|
|
self.assertIs(self._library.packages.pw.protobuf_compiler['test1'],
|
|
self._library.packages.pw.protobuf_compiler.test1)
|
|
|
|
def test_access_nested_packages_by_name_unknown_package(self):
|
|
with self.assertRaises(KeyError):
|
|
_ = self._library.packages['']
|
|
|
|
with self.assertRaises(KeyError):
|
|
_ = self._library.packages['.']
|
|
|
|
with self.assertRaises(KeyError):
|
|
_ = self._library.packages['protobuf_compiler.test1']
|
|
|
|
with self.assertRaises(KeyError):
|
|
_ = self._library.packages.pw['pw.protobuf_compiler.test1']
|
|
|
|
with self.assertRaises(KeyError):
|
|
_ = self._library.packages.pw.protobuf_compiler['not here']
|
|
|
|
def test_messages(self):
|
|
protos = self._library.packages.pw.protobuf_compiler
|
|
self.assertEqual(
|
|
set(self._library.messages()), {
|
|
protos.test1.SomeMessage,
|
|
protos.test1.AnotherMessage,
|
|
protos.test2.Request,
|
|
protos.test2.Response,
|
|
protos.test2.Hello,
|
|
protos.test2.NestingMessage,
|
|
protos.test2.NestingMessage.NestedMessage,
|
|
protos.test2.NestingMessage.NestedMessage.NestedNestedMessage,
|
|
})
|
|
|
|
|
|
PROTO_FOR_REPR = """\
|
|
syntax = "proto3";
|
|
|
|
package pw.test3;
|
|
|
|
enum Enum {
|
|
ZERO = 0;
|
|
ONE = 1;
|
|
}
|
|
|
|
message Nested {
|
|
repeated int64 value = 1;
|
|
Enum an_enum = 2;
|
|
}
|
|
|
|
message Message {
|
|
Nested message = 1;
|
|
repeated Nested repeated_message = 2;
|
|
|
|
fixed32 regular_int = 3;
|
|
optional int64 optional_int = 4;
|
|
repeated int32 repeated_int = 5;
|
|
|
|
bytes regular_bytes = 6;
|
|
optional bytes optional_bytes = 7;
|
|
repeated bytes repeated_bytes = 8;
|
|
|
|
string regular_string = 9;
|
|
optional string optional_string = 10;
|
|
repeated string repeated_string = 11;
|
|
|
|
Enum regular_enum = 12;
|
|
optional Enum optional_enum = 13;
|
|
repeated Enum repeated_enum = 14;
|
|
|
|
oneof oneof_test {
|
|
string oneof_1 = 15;
|
|
int32 oneof_2 = 16;
|
|
float oneof_3 = 17;
|
|
}
|
|
|
|
map<string, Nested> mapping = 18;
|
|
}
|
|
"""
|
|
|
|
|
|
class TestProtoRepr(unittest.TestCase):
|
|
"""Tests printing protobufs."""
|
|
def setUp(self):
|
|
protos = python_protos.Library.from_strings(PROTO_FOR_REPR)
|
|
self.enum = protos.packages.pw.test3.Enum
|
|
self.nested = protos.packages.pw.test3.Nested
|
|
self.message = protos.packages.pw.test3.Message
|
|
|
|
def test_empty(self):
|
|
self.assertEqual('pw.test3.Nested()', proto_repr(self.nested()))
|
|
self.assertEqual('pw.test3.Message()', proto_repr(self.message()))
|
|
|
|
def test_int_fields(self):
|
|
self.assertEqual(
|
|
'pw.test3.Message('
|
|
'regular_int=999, '
|
|
'optional_int=-1, '
|
|
'repeated_int=[0, 1, 2])',
|
|
proto_repr(
|
|
self.message(repeated_int=[0, 1, 2],
|
|
regular_int=999,
|
|
optional_int=-1)))
|
|
|
|
def test_bytes_fields(self):
|
|
self.assertEqual(
|
|
'pw.test3.Message('
|
|
r"regular_bytes=b'\xFE\xED\xBE\xEF', "
|
|
r"optional_bytes=b'', "
|
|
r"repeated_bytes=[b'Hello\'\'\''])",
|
|
proto_repr(
|
|
self.message(
|
|
regular_bytes=b'\xfe\xed\xbe\xef',
|
|
optional_bytes=b'',
|
|
repeated_bytes=[b"Hello'''"],
|
|
)))
|
|
|
|
def test_string_fields(self):
|
|
self.assertEqual(
|
|
'pw.test3.Message('
|
|
"regular_string='hi', "
|
|
"optional_string='', "
|
|
'repeated_string=["\'"])',
|
|
proto_repr(
|
|
self.message(
|
|
regular_string='hi',
|
|
optional_string='',
|
|
repeated_string=[b"'"],
|
|
)))
|
|
|
|
def test_enum_fields(self):
|
|
self.assertEqual('pw.test3.Nested(an_enum=pw.test3.Enum.ONE)',
|
|
proto_repr(self.nested(an_enum=1)))
|
|
self.assertEqual('pw.test3.Message(optional_enum=pw.test3.Enum.ONE)',
|
|
proto_repr(self.message(optional_enum=self.enum.ONE)))
|
|
self.assertEqual(
|
|
'pw.test3.Message(repeated_enum='
|
|
'[pw.test3.Enum.ONE, pw.test3.Enum.ONE, pw.test3.Enum.ZERO])',
|
|
proto_repr(self.message(repeated_enum=[1, 1, 0])))
|
|
|
|
def test_message_fields(self):
|
|
self.assertEqual(
|
|
'pw.test3.Message(message=pw.test3.Nested(value=[123]))',
|
|
proto_repr(self.message(message=self.nested(value=[123]))))
|
|
self.assertEqual(
|
|
'pw.test3.Message('
|
|
'repeated_message=[pw.test3.Nested(value=[123]), '
|
|
'pw.test3.Nested()])',
|
|
proto_repr(
|
|
self.message(
|
|
repeated_message=[self.nested(
|
|
value=[123]), self.nested()])))
|
|
|
|
def test_optional_shown_if_set_to_default(self):
|
|
self.assertEqual(
|
|
"pw.test3.Message("
|
|
"optional_int=0, optional_bytes=b'', optional_string='', "
|
|
"optional_enum=pw.test3.Enum.ZERO)",
|
|
proto_repr(
|
|
self.message(optional_int=0,
|
|
optional_bytes=b'',
|
|
optional_string='',
|
|
optional_enum=0)))
|
|
|
|
def test_oneof(self):
|
|
self.assertEqual(proto_repr(self.message(oneof_1='test')),
|
|
"pw.test3.Message(oneof_1='test')")
|
|
self.assertEqual(proto_repr(self.message(oneof_2=123)),
|
|
"pw.test3.Message(oneof_2=123)")
|
|
self.assertEqual(proto_repr(self.message(oneof_3=123)),
|
|
"pw.test3.Message(oneof_3=123.0)")
|
|
|
|
msg = self.message(oneof_1='test')
|
|
msg.oneof_2 = 99
|
|
self.assertEqual(proto_repr(msg), "pw.test3.Message(oneof_2=99)")
|
|
|
|
def test_map(self):
|
|
msg = self.message()
|
|
msg.mapping['zero'].MergeFrom(self.nested())
|
|
msg.mapping['one'].MergeFrom(
|
|
self.nested(an_enum=self.enum.ONE, value=[1]))
|
|
|
|
result = proto_repr(msg)
|
|
self.assertRegex(result, r'^pw.test3.Message\(mapping={.*}\)$')
|
|
self.assertIn("'zero': pw.test3.Nested()", result)
|
|
self.assertIn(
|
|
"'one': pw.test3.Nested(value=[1], an_enum=pw.test3.Enum.ONE)",
|
|
result)
|
|
|
|
def test_bytes_repr(self):
|
|
self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef'),
|
|
r"b'\xFE\xED\xBE\xEF'")
|
|
self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef123'),
|
|
r"b'\xFE\xED\xBE\xEF\x31\x32\x33'")
|
|
self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef1234'),
|
|
r"b'\xFE\xED\xBE\xEF1234'")
|
|
self.assertEqual(bytes_repr(b'\xfe\xed\xbe\xef12345'),
|
|
r"b'\xFE\xED\xBE\xEF12345'")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|