788 lines
31 KiB
C++
788 lines
31 KiB
C++
// Copyright 2021 The Pigweed Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
|
// use this file except in compliance with the License. You may obtain a copy of
|
|
// the License at
|
|
//
|
|
// https://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
// License for the specific language governing permissions and limitations under
|
|
// the License.
|
|
|
|
#include "pw_log_rpc/log_service.h"
|
|
|
|
#include <array>
|
|
#include <cstdint>
|
|
#include <limits>
|
|
|
|
#include "gtest/gtest.h"
|
|
#include "pw_assert/check.h"
|
|
#include "pw_bytes/endian.h"
|
|
#include "pw_containers/vector.h"
|
|
#include "pw_log/log.h"
|
|
#include "pw_log/proto/log.pwpb.h"
|
|
#include "pw_log/proto_utils.h"
|
|
#include "pw_log_rpc/log_filter.h"
|
|
#include "pw_log_rpc_private/test_utils.h"
|
|
#include "pw_log_tokenized/metadata.h"
|
|
#include "pw_protobuf/bytes_utils.h"
|
|
#include "pw_protobuf/decoder.h"
|
|
#include "pw_result/result.h"
|
|
#include "pw_rpc/channel.h"
|
|
#include "pw_rpc/raw/fake_channel_output.h"
|
|
#include "pw_rpc/raw/test_method_context.h"
|
|
#include "pw_sync/mutex.h"
|
|
|
|
namespace pw::log_rpc {
|
|
namespace {
|
|
|
|
using log::pw_rpc::raw::Logs;
|
|
|
|
#define LOG_SERVICE_METHOD_CONTEXT \
|
|
PW_RAW_TEST_METHOD_CONTEXT(LogService, Listen, 10)
|
|
|
|
constexpr size_t kMaxMessageSize = 50;
|
|
constexpr size_t kMaxLogEntrySize =
|
|
RpcLogDrain::kMinEntryBufferSize + kMaxMessageSize;
|
|
static_assert(RpcLogDrain::kMinEntryBufferSize < kMaxLogEntrySize);
|
|
constexpr size_t kMultiSinkBufferSize = kMaxLogEntrySize * 10;
|
|
constexpr size_t kMaxDrains = 3;
|
|
constexpr char kMessage[] = "message";
|
|
// A message small enough to fit encoded in
|
|
// LogServiceTest::entry_encode_buffer_ but large enough to not fit in
|
|
// LogServiceTest::small_buffer_.
|
|
constexpr char kLongMessage[] =
|
|
"This is a long log message that will be dropped.";
|
|
static_assert(sizeof(kLongMessage) < kMaxMessageSize);
|
|
static_assert(sizeof(kLongMessage) + RpcLogDrain::kMinEntrySizeWithoutPayload >
|
|
RpcLogDrain::kMinEntryBufferSize);
|
|
std::array<std::byte, 1> rpc_request_buffer;
|
|
constexpr auto kSampleMetadata =
|
|
log_tokenized::Metadata::Set<PW_LOG_LEVEL_INFO, 123, 0x03, __LINE__>();
|
|
constexpr auto kDropMessageMetadata =
|
|
log_tokenized::Metadata::Set<0, 0, 0, 0>();
|
|
constexpr int64_t kSampleTimestamp = 1000;
|
|
|
|
// `LogServiceTest` sets up a logging environment for testing with a
|
|
// `MultiSink` for log entries, and multiple `RpcLogDrain`s for consuming such
|
|
// log entries. It includes methods to add log entries to the `MultiSink`, and
|
|
// buffers for encoding and retrieving log entries. Tests can choose how many
|
|
// entries to add to the multisink, and which drain to use.
|
|
class LogServiceTest : public ::testing::Test {
|
|
public:
|
|
LogServiceTest() : multisink_(multisink_buffer_), drain_map_(drains_) {
|
|
for (auto& drain : drain_map_.drains()) {
|
|
multisink_.AttachDrain(drain);
|
|
}
|
|
}
|
|
|
|
void AddLogEntries(size_t log_count,
|
|
std::string_view message,
|
|
log_tokenized::Metadata metadata,
|
|
int64_t timestamp) {
|
|
for (size_t i = 0; i < log_count; ++i) {
|
|
ASSERT_TRUE(AddLogEntry(message, metadata, timestamp).ok());
|
|
}
|
|
}
|
|
|
|
StatusWithSize AddLogEntry(std::string_view message,
|
|
log_tokenized::Metadata metadata,
|
|
int64_t timestamp) {
|
|
Result<ConstByteSpan> encoded_log_result =
|
|
log::EncodeTokenizedLog(metadata,
|
|
std::as_bytes(std::span(message)),
|
|
timestamp,
|
|
/*thread_name=*/{},
|
|
entry_encode_buffer_);
|
|
PW_TRY_WITH_SIZE(encoded_log_result.status());
|
|
multisink_.HandleEntry(encoded_log_result.value());
|
|
return StatusWithSize(encoded_log_result.value().size());
|
|
}
|
|
|
|
protected:
|
|
std::array<std::byte, kMultiSinkBufferSize> multisink_buffer_;
|
|
multisink::MultiSink multisink_;
|
|
RpcLogDrainMap drain_map_;
|
|
std::array<std::byte, kMaxLogEntrySize> entry_encode_buffer_;
|
|
static constexpr size_t kMaxFilterRules = 3;
|
|
std::array<Filter::Rule, kMaxFilterRules> rules1_;
|
|
std::array<Filter::Rule, kMaxFilterRules> rules2_;
|
|
std::array<Filter::Rule, kMaxFilterRules> rules3_;
|
|
static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id1_{
|
|
std::byte(65), std::byte(66), std::byte(67), std::byte(0)};
|
|
static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id2_{
|
|
std::byte(68), std::byte(69), std::byte(70), std::byte(0)};
|
|
static constexpr std::array<std::byte, cfg::kMaxFilterIdBytes> filter_id3_{
|
|
std::byte(71), std::byte(72), std::byte(73), std::byte(0)};
|
|
std::array<Filter, kMaxDrains> filters_ = {
|
|
Filter(filter_id1_, rules1_),
|
|
Filter(filter_id2_, rules2_),
|
|
Filter(filter_id3_, rules3_),
|
|
};
|
|
|
|
// Drain Buffers
|
|
std::array<std::byte, kMaxLogEntrySize> drain_buffer1_;
|
|
std::array<std::byte, kMaxLogEntrySize> drain_buffer2_;
|
|
std::array<std::byte, RpcLogDrain::kMinEntryBufferSize> small_buffer_;
|
|
static constexpr uint32_t kIgnoreWriterErrorsDrainId = 1;
|
|
static constexpr uint32_t kCloseWriterOnErrorDrainId = 2;
|
|
static constexpr uint32_t kSmallBufferDrainId = 3;
|
|
sync::Mutex shared_mutex_;
|
|
std::array<RpcLogDrain, kMaxDrains> drains_{
|
|
RpcLogDrain(kIgnoreWriterErrorsDrainId,
|
|
drain_buffer1_,
|
|
shared_mutex_,
|
|
RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
|
|
&filters_[0]),
|
|
RpcLogDrain(kCloseWriterOnErrorDrainId,
|
|
drain_buffer2_,
|
|
shared_mutex_,
|
|
RpcLogDrain::LogDrainErrorHandling::kCloseStreamOnWriterError,
|
|
&filters_[1]),
|
|
RpcLogDrain(kSmallBufferDrainId,
|
|
small_buffer_,
|
|
shared_mutex_,
|
|
RpcLogDrain::LogDrainErrorHandling::kIgnoreWriterErrors,
|
|
&filters_[2]),
|
|
};
|
|
|
|
std::array<std::byte, 128> encoding_buffer_ = {};
|
|
};
|
|
|
|
TEST_F(LogServiceTest, AssignWriter) {
|
|
// Drains don't have writers.
|
|
for (auto& drain : drain_map_.drains()) {
|
|
EXPECT_EQ(drain.Flush(encoding_buffer_), Status::Unavailable());
|
|
}
|
|
|
|
// Create context directed to drain with ID 1.
|
|
RpcLogDrain& active_drain = drains_[0];
|
|
const uint32_t drain_channel_id = active_drain.channel_id();
|
|
LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
|
|
context.set_channel_id(drain_channel_id);
|
|
|
|
// Call RPC, which sets the drain's writer.
|
|
context.call(rpc_request_buffer);
|
|
EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
|
|
|
|
// Other drains are still missing writers.
|
|
for (auto& drain : drain_map_.drains()) {
|
|
if (drain.channel_id() != drain_channel_id) {
|
|
EXPECT_EQ(drain.Flush(encoding_buffer_), Status::Unavailable());
|
|
}
|
|
}
|
|
|
|
// Calling an ongoing log stream must not change the active drain's
|
|
// writer, and the second writer must not get any responses.
|
|
LOG_SERVICE_METHOD_CONTEXT second_call_context(drain_map_);
|
|
second_call_context.set_channel_id(drain_channel_id);
|
|
second_call_context.call(rpc_request_buffer);
|
|
EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
|
|
ASSERT_TRUE(second_call_context.done());
|
|
EXPECT_EQ(second_call_context.responses().size(), 0u);
|
|
|
|
// Setting a new writer on a closed stream is allowed.
|
|
ASSERT_EQ(active_drain.Close(), OkStatus());
|
|
LOG_SERVICE_METHOD_CONTEXT third_call_context(drain_map_);
|
|
third_call_context.set_channel_id(drain_channel_id);
|
|
third_call_context.call(rpc_request_buffer);
|
|
EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
|
|
ASSERT_FALSE(third_call_context.done());
|
|
EXPECT_EQ(third_call_context.responses().size(), 0u);
|
|
EXPECT_EQ(active_drain.Close(), OkStatus());
|
|
}
|
|
|
|
TEST_F(LogServiceTest, StartAndEndStream) {
|
|
RpcLogDrain& active_drain = drains_[2];
|
|
const uint32_t drain_channel_id = active_drain.channel_id();
|
|
LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
|
|
context.set_channel_id(drain_channel_id);
|
|
|
|
// Add log entries.
|
|
const size_t total_entries = 10;
|
|
AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp);
|
|
|
|
// Request logs.
|
|
context.call(rpc_request_buffer);
|
|
EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
|
|
|
|
// Not done until the stream is finished.
|
|
ASSERT_FALSE(context.done());
|
|
EXPECT_EQ(OkStatus(), active_drain.Close());
|
|
ASSERT_TRUE(context.done());
|
|
|
|
EXPECT_EQ(context.status(), OkStatus());
|
|
// There is at least 1 response with multiple log entries packed.
|
|
EXPECT_GE(context.responses().size(), 1u);
|
|
|
|
// Verify data in responses.
|
|
Vector<TestLogEntry, total_entries> expected_messages;
|
|
for (size_t i = 0; i < total_entries; ++i) {
|
|
expected_messages.push_back({.metadata = kSampleMetadata,
|
|
.timestamp = kSampleTimestamp,
|
|
.tokenized_data = std::as_bytes(
|
|
std::span(std::string_view(kMessage)))});
|
|
}
|
|
size_t entries_found = 0;
|
|
uint32_t drop_count_found = 0;
|
|
for (auto& response : context.responses()) {
|
|
protobuf::Decoder entry_decoder(response);
|
|
VerifyLogEntries(entry_decoder,
|
|
expected_messages,
|
|
entries_found,
|
|
entries_found,
|
|
drop_count_found);
|
|
}
|
|
EXPECT_EQ(entries_found, total_entries);
|
|
EXPECT_EQ(drop_count_found, 0u);
|
|
}
|
|
|
|
TEST_F(LogServiceTest, HandleDropped) {
|
|
RpcLogDrain& active_drain = drains_[0];
|
|
const uint32_t drain_channel_id = active_drain.channel_id();
|
|
LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
|
|
context.set_channel_id(drain_channel_id);
|
|
|
|
// Add log entries.
|
|
const size_t total_entries = 5;
|
|
const size_t entries_before_drop = 1;
|
|
const uint32_t total_drop_count = 2;
|
|
|
|
// Force a drop entry in between entries.
|
|
AddLogEntries(
|
|
entries_before_drop, kMessage, kSampleMetadata, kSampleTimestamp);
|
|
multisink_.HandleDropped(total_drop_count);
|
|
AddLogEntries(total_entries - entries_before_drop,
|
|
kMessage,
|
|
kSampleMetadata,
|
|
kSampleTimestamp);
|
|
|
|
// Request logs.
|
|
context.call(rpc_request_buffer);
|
|
EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
|
|
EXPECT_EQ(OkStatus(), active_drain.Close());
|
|
ASSERT_EQ(context.status(), OkStatus());
|
|
// There is at least 1 response with multiple log entries packed.
|
|
ASSERT_GE(context.responses().size(), 1u);
|
|
|
|
Vector<TestLogEntry, total_entries + 1> expected_messages;
|
|
size_t i = 0;
|
|
for (; i < entries_before_drop; ++i) {
|
|
expected_messages.push_back({.metadata = kSampleMetadata,
|
|
.timestamp = kSampleTimestamp,
|
|
.tokenized_data = std::as_bytes(
|
|
std::span(std::string_view(kMessage)))});
|
|
}
|
|
expected_messages.push_back(
|
|
{.metadata = kDropMessageMetadata,
|
|
.dropped = total_drop_count,
|
|
.tokenized_data = std::as_bytes(
|
|
std::span(std::string_view(RpcLogDrain::kIngressErrorMessage)))});
|
|
for (; i < total_entries; ++i) {
|
|
expected_messages.push_back({.metadata = kSampleMetadata,
|
|
.timestamp = kSampleTimestamp,
|
|
.tokenized_data = std::as_bytes(
|
|
std::span(std::string_view(kMessage)))});
|
|
}
|
|
|
|
// Verify data in responses.
|
|
size_t entries_found = 0;
|
|
uint32_t drop_count_found = 0;
|
|
for (auto& response : context.responses()) {
|
|
protobuf::Decoder entry_decoder(response);
|
|
VerifyLogEntries(entry_decoder,
|
|
expected_messages,
|
|
entries_found,
|
|
entries_found,
|
|
drop_count_found);
|
|
}
|
|
EXPECT_EQ(entries_found, total_entries);
|
|
EXPECT_EQ(drop_count_found, total_drop_count);
|
|
}
|
|
|
|
TEST_F(LogServiceTest, HandleDroppedBetweenFilteredOutLogs) {
|
|
RpcLogDrain& active_drain = drains_[0];
|
|
const uint32_t drain_channel_id = active_drain.channel_id();
|
|
LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
|
|
context.set_channel_id(drain_channel_id);
|
|
// Set filter to drop INFO+ and keep DEBUG logs
|
|
rules1_[0].action = Filter::Rule::Action::kDrop;
|
|
rules1_[0].level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL;
|
|
|
|
// Add log entries.
|
|
const size_t total_entries = 5;
|
|
const uint32_t total_drop_count = total_entries - 1;
|
|
|
|
// Force a drop entry in between entries that will be filtered out.
|
|
for (size_t i = 1; i < total_entries; ++i) {
|
|
ASSERT_EQ(
|
|
OkStatus(),
|
|
AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp).status());
|
|
multisink_.HandleDropped(1);
|
|
}
|
|
// Add message that won't be filtered out.
|
|
constexpr auto metadata =
|
|
log_tokenized::Metadata::Set<PW_LOG_LEVEL_DEBUG, 0, 0, __LINE__>();
|
|
ASSERT_EQ(OkStatus(),
|
|
AddLogEntry(kMessage, metadata, kSampleTimestamp).status());
|
|
|
|
// Request logs.
|
|
context.call(rpc_request_buffer);
|
|
EXPECT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
|
|
EXPECT_EQ(OkStatus(), active_drain.Close());
|
|
ASSERT_EQ(context.status(), OkStatus());
|
|
// There is at least 1 response with multiple log entries packed.
|
|
ASSERT_GE(context.responses().size(), 1u);
|
|
|
|
Vector<TestLogEntry, 2> expected_messages;
|
|
expected_messages.push_back(
|
|
{.metadata = kDropMessageMetadata,
|
|
.dropped = total_drop_count,
|
|
.tokenized_data = std::as_bytes(
|
|
std::span(std::string_view(RpcLogDrain::kIngressErrorMessage)))});
|
|
expected_messages.push_back(
|
|
{.metadata = metadata,
|
|
.timestamp = kSampleTimestamp,
|
|
.tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
|
|
|
|
// Verify data in responses.
|
|
size_t entries_found = 0;
|
|
uint32_t drop_count_found = 0;
|
|
for (auto& response : context.responses()) {
|
|
protobuf::Decoder entry_decoder(response);
|
|
VerifyLogEntries(entry_decoder,
|
|
expected_messages,
|
|
entries_found,
|
|
entries_found,
|
|
drop_count_found);
|
|
}
|
|
EXPECT_EQ(entries_found, 1u);
|
|
EXPECT_EQ(drop_count_found, total_drop_count);
|
|
}
|
|
|
|
TEST_F(LogServiceTest, HandleSmallLogEntryBuffer) {
|
|
LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
|
|
context.set_channel_id(kSmallBufferDrainId);
|
|
auto small_buffer_drain =
|
|
drain_map_.GetDrainFromChannelId(kSmallBufferDrainId);
|
|
ASSERT_TRUE(small_buffer_drain.ok());
|
|
|
|
// Add long entries that don't fit the drain's log entry buffer, except for
|
|
// one, since drop count messages are only sent when a log entry can be sent.
|
|
const size_t total_entries = 5;
|
|
const uint32_t total_drop_count = total_entries - 1;
|
|
AddLogEntries(
|
|
total_drop_count, kLongMessage, kSampleMetadata, kSampleTimestamp);
|
|
EXPECT_EQ(OkStatus(),
|
|
AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp).status());
|
|
|
|
// Request logs.
|
|
context.call(rpc_request_buffer);
|
|
EXPECT_EQ(small_buffer_drain.value()->Flush(encoding_buffer_), OkStatus());
|
|
EXPECT_EQ(small_buffer_drain.value()->Close(), OkStatus());
|
|
ASSERT_EQ(context.status(), OkStatus());
|
|
ASSERT_EQ(context.responses().size(), 1u);
|
|
|
|
Vector<TestLogEntry, total_entries + 1> expected_messages;
|
|
expected_messages.push_back(
|
|
{.metadata = kDropMessageMetadata,
|
|
.dropped = total_drop_count,
|
|
.tokenized_data = std::as_bytes(std::span(
|
|
std::string_view(RpcLogDrain::kSmallStackBufferErrorMessage)))});
|
|
expected_messages.push_back(
|
|
{.metadata = kSampleMetadata,
|
|
.timestamp = kSampleTimestamp,
|
|
.tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))});
|
|
|
|
// Expect one drop message with the total drop count, and the only message
|
|
// that fits the buffer.
|
|
size_t entries_found = 0;
|
|
uint32_t drop_count_found = 0;
|
|
for (auto& response : context.responses()) {
|
|
protobuf::Decoder entry_decoder(response);
|
|
VerifyLogEntries(entry_decoder,
|
|
expected_messages,
|
|
entries_found,
|
|
entries_found,
|
|
drop_count_found);
|
|
}
|
|
EXPECT_EQ(entries_found, 1u);
|
|
EXPECT_EQ(drop_count_found, total_drop_count);
|
|
}
|
|
|
|
TEST_F(LogServiceTest, FlushDrainWithoutMultisink) {
|
|
auto& detached_drain = drains_[0];
|
|
multisink_.DetachDrain(detached_drain);
|
|
LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
|
|
context.set_channel_id(detached_drain.channel_id());
|
|
|
|
// Add log entries.
|
|
const size_t total_entries = 5;
|
|
AddLogEntries(total_entries, kMessage, kSampleMetadata, kSampleTimestamp);
|
|
// Request logs.
|
|
context.call(rpc_request_buffer);
|
|
EXPECT_EQ(detached_drain.Close(), OkStatus());
|
|
ASSERT_EQ(context.status(), OkStatus());
|
|
EXPECT_EQ(context.responses().size(), 0u);
|
|
}
|
|
|
|
TEST_F(LogServiceTest, LargeLogEntry) {
|
|
const TestLogEntry expected_entry{
|
|
.metadata =
|
|
log_tokenized::Metadata::Set<PW_LOG_LEVEL_WARN,
|
|
(1 << PW_LOG_TOKENIZED_MODULE_BITS) - 1,
|
|
(1 << PW_LOG_TOKENIZED_FLAG_BITS) - 1,
|
|
(1 << PW_LOG_TOKENIZED_LINE_BITS) - 1>(),
|
|
.timestamp = std::numeric_limits<int64_t>::max(),
|
|
.tokenized_data = std::as_bytes(std::span(kMessage)),
|
|
};
|
|
|
|
// Add entry to multisink.
|
|
log::LogEntry::MemoryEncoder encoder(entry_encode_buffer_);
|
|
ASSERT_EQ(encoder.WriteMessage(expected_entry.tokenized_data), OkStatus());
|
|
ASSERT_EQ(encoder.WriteLineLevel(
|
|
(expected_entry.metadata.level() & PW_LOG_LEVEL_BITMASK) |
|
|
((expected_entry.metadata.line_number() << PW_LOG_LEVEL_BITS) &
|
|
~PW_LOG_LEVEL_BITMASK)),
|
|
OkStatus());
|
|
ASSERT_EQ(encoder.WriteFlags(expected_entry.metadata.flags()), OkStatus());
|
|
ASSERT_EQ(encoder.WriteTimestamp(expected_entry.timestamp), OkStatus());
|
|
const uint32_t little_endian_module = bytes::ConvertOrderTo(
|
|
std::endian::little, expected_entry.metadata.module());
|
|
ASSERT_EQ(
|
|
encoder.WriteModule(std::as_bytes(std::span(&little_endian_module, 1))),
|
|
OkStatus());
|
|
ASSERT_EQ(encoder.status(), OkStatus());
|
|
multisink_.HandleEntry(encoder);
|
|
|
|
// Start log stream.
|
|
RpcLogDrain& active_drain = drains_[0];
|
|
const uint32_t drain_channel_id = active_drain.channel_id();
|
|
LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
|
|
context.set_channel_id(drain_channel_id);
|
|
context.call(rpc_request_buffer);
|
|
ASSERT_EQ(active_drain.Flush(encoding_buffer_), OkStatus());
|
|
EXPECT_EQ(OkStatus(), active_drain.Close());
|
|
ASSERT_EQ(context.status(), OkStatus());
|
|
ASSERT_EQ(context.responses().size(), 1u);
|
|
|
|
// Verify message.
|
|
protobuf::Decoder entries_decoder(context.responses()[0]);
|
|
ASSERT_TRUE(entries_decoder.Next().ok());
|
|
ConstByteSpan entry;
|
|
EXPECT_TRUE(entries_decoder.ReadBytes(&entry).ok());
|
|
protobuf::Decoder entry_decoder(entry);
|
|
uint32_t drop_count = 0;
|
|
VerifyLogEntry(entry_decoder, expected_entry, drop_count);
|
|
EXPECT_EQ(drop_count, 0u);
|
|
}
|
|
|
|
TEST_F(LogServiceTest, InterruptedLogStreamSendsDropCount) {
|
|
const uint32_t drain_channel_id = kCloseWriterOnErrorDrainId;
|
|
auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
|
|
ASSERT_TRUE(drain.ok());
|
|
|
|
LogService log_service(drain_map_);
|
|
const size_t max_packets = 10;
|
|
rpc::RawFakeChannelOutput<10, 512> output;
|
|
rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
|
|
rpc::Server server(std::span(&channel, 1));
|
|
|
|
// Add as many entries needed to have multiple packets send.
|
|
StatusWithSize status =
|
|
AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp);
|
|
ASSERT_TRUE(status.ok());
|
|
|
|
const uint32_t max_messages_per_response =
|
|
encoding_buffer_.size() / status.size();
|
|
// Send less packets than the max to avoid crashes.
|
|
const uint32_t packets_sent = max_packets / 2;
|
|
const size_t total_entries = packets_sent * max_messages_per_response;
|
|
const size_t max_entries = 50;
|
|
// Check we can test all these entries.
|
|
ASSERT_GE(max_entries, total_entries);
|
|
AddLogEntries(total_entries - 1, kMessage, kSampleMetadata, kSampleTimestamp);
|
|
|
|
// Interrupt log stream with an error.
|
|
const uint32_t successful_packets_sent = packets_sent / 2;
|
|
output.set_send_status(Status::Unavailable(), successful_packets_sent);
|
|
|
|
// Request logs.
|
|
rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
|
|
server, drain_channel_id, log_service);
|
|
EXPECT_EQ(drain.value()->Open(writer), OkStatus());
|
|
// This drain closes on errors.
|
|
EXPECT_EQ(drain.value()->Flush(encoding_buffer_), Status::Aborted());
|
|
EXPECT_TRUE(output.done());
|
|
|
|
// Make sure not all packets were sent.
|
|
ASSERT_EQ(output.payloads<Logs::Listen>().size(), successful_packets_sent);
|
|
|
|
// Verify data in responses.
|
|
Vector<TestLogEntry, max_entries> expected_messages;
|
|
for (size_t i = 0; i < total_entries; ++i) {
|
|
expected_messages.push_back({.metadata = kSampleMetadata,
|
|
.timestamp = kSampleTimestamp,
|
|
.tokenized_data = std::as_bytes(
|
|
std::span(std::string_view(kMessage)))});
|
|
}
|
|
size_t entries_found = 0;
|
|
uint32_t drop_count_found = 0;
|
|
for (auto& response : output.payloads<Logs::Listen>()) {
|
|
protobuf::Decoder entry_decoder(response);
|
|
VerifyLogEntries(entry_decoder,
|
|
expected_messages,
|
|
entries_found,
|
|
entries_found,
|
|
drop_count_found);
|
|
}
|
|
|
|
// Verify that not all the entries were sent.
|
|
EXPECT_LT(entries_found, total_entries);
|
|
// The drain closes on errors, thus the drop count is reported on the next
|
|
// call to Flush.
|
|
EXPECT_EQ(drop_count_found, 0u);
|
|
|
|
// Reset channel output and resume log stream with a new writer.
|
|
output.clear();
|
|
writer = rpc::RawServerWriter::Open<Logs::Listen>(
|
|
server, drain_channel_id, log_service);
|
|
EXPECT_EQ(drain.value()->Open(writer), OkStatus());
|
|
EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
|
|
|
|
// One full packet was dropped. Since all messages are the same length,
|
|
// there are entries_found / successful_packets_sent per packet.
|
|
const uint32_t total_drop_count = entries_found / successful_packets_sent;
|
|
Vector<TestLogEntry, max_entries> expected_messages_after_reset;
|
|
expected_messages_after_reset.push_back(
|
|
{.metadata = kDropMessageMetadata,
|
|
.dropped = total_drop_count,
|
|
.tokenized_data = std::as_bytes(
|
|
std::span(std::string_view(RpcLogDrain::kWriterErrorMessage)))});
|
|
|
|
const uint32_t remaining_entries = total_entries - total_drop_count;
|
|
for (size_t i = 0; i < remaining_entries; ++i) {
|
|
expected_messages_after_reset.push_back(
|
|
{.metadata = kSampleMetadata,
|
|
.timestamp = kSampleTimestamp,
|
|
.tokenized_data =
|
|
std::as_bytes(std::span(std::string_view(kMessage)))});
|
|
}
|
|
|
|
size_t entries_found_after_reset = 0;
|
|
for (auto& response : output.payloads<Logs::Listen>()) {
|
|
protobuf::Decoder entry_decoder(response);
|
|
uint32_t expected_sequence_id =
|
|
entries_found + entries_found_after_reset + total_drop_count;
|
|
VerifyLogEntries(entry_decoder,
|
|
expected_messages_after_reset,
|
|
expected_sequence_id,
|
|
entries_found_after_reset,
|
|
drop_count_found);
|
|
}
|
|
EXPECT_EQ(entries_found + entries_found_after_reset, remaining_entries);
|
|
EXPECT_EQ(drop_count_found, total_drop_count);
|
|
}
|
|
|
|
TEST_F(LogServiceTest, InterruptedLogStreamIgnoresErrors) {
|
|
const uint32_t drain_channel_id = kIgnoreWriterErrorsDrainId;
|
|
auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
|
|
ASSERT_TRUE(drain.ok());
|
|
|
|
LogService log_service(drain_map_);
|
|
const size_t max_packets = 20;
|
|
rpc::RawFakeChannelOutput<max_packets, 512> output;
|
|
rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
|
|
rpc::Server server(std::span(&channel, 1));
|
|
|
|
// Add as many entries needed to have multiple packets send.
|
|
StatusWithSize status =
|
|
AddLogEntry(kMessage, kSampleMetadata, kSampleTimestamp);
|
|
ASSERT_TRUE(status.ok());
|
|
|
|
const uint32_t max_messages_per_response =
|
|
encoding_buffer_.size() / status.size();
|
|
// Send less packets than the max to avoid crashes.
|
|
const uint32_t packets_sent = 4;
|
|
const size_t total_entries = packets_sent * max_messages_per_response;
|
|
const size_t max_entries = 50;
|
|
// Check we can test all these entries.
|
|
ASSERT_GT(max_entries, total_entries);
|
|
AddLogEntries(total_entries - 1, kMessage, kSampleMetadata, kSampleTimestamp);
|
|
|
|
// Interrupt log stream with an error.
|
|
const uint32_t error_on_packet_count = packets_sent / 2;
|
|
output.set_send_status(Status::Unavailable(), error_on_packet_count);
|
|
|
|
// Request logs.
|
|
rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
|
|
server, drain_channel_id, log_service);
|
|
EXPECT_EQ(drain.value()->Open(writer), OkStatus());
|
|
// This drain ignores errors.
|
|
EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
|
|
EXPECT_FALSE(output.done());
|
|
|
|
// Make sure some packets were sent.
|
|
ASSERT_GT(output.payloads<Logs::Listen>().size(), 0u);
|
|
|
|
// Verify that not all the entries were sent.
|
|
size_t entries_found = 0;
|
|
for (auto& response : output.payloads<Logs::Listen>()) {
|
|
protobuf::Decoder entry_decoder(response);
|
|
entries_found += CountLogEntries(entry_decoder);
|
|
}
|
|
ASSERT_LT(entries_found, total_entries);
|
|
|
|
// Verify that all messages were sent.
|
|
const uint32_t total_drop_count = total_entries - entries_found;
|
|
Vector<TestLogEntry, max_entries> expected_messages;
|
|
for (size_t i = 0; i < entries_found; ++i) {
|
|
expected_messages.push_back({.metadata = kSampleMetadata,
|
|
.timestamp = kSampleTimestamp,
|
|
.tokenized_data = std::as_bytes(
|
|
std::span(std::string_view(kMessage)))});
|
|
}
|
|
|
|
entries_found = 0;
|
|
uint32_t drop_count_found = 0;
|
|
uint32_t i = 0;
|
|
for (; i < error_on_packet_count; ++i) {
|
|
protobuf::Decoder entry_decoder(output.payloads<Logs::Listen>()[i]);
|
|
VerifyLogEntries(entry_decoder,
|
|
expected_messages,
|
|
entries_found,
|
|
entries_found,
|
|
drop_count_found);
|
|
}
|
|
for (; i < output.payloads<Logs::Listen>().size(); ++i) {
|
|
protobuf::Decoder entry_decoder(output.payloads<Logs::Listen>()[i]);
|
|
VerifyLogEntries(entry_decoder,
|
|
expected_messages,
|
|
entries_found + total_drop_count,
|
|
entries_found,
|
|
drop_count_found);
|
|
}
|
|
// This drain ignores errors and thus doesn't report drops on its own.
|
|
EXPECT_EQ(drop_count_found, 0u);
|
|
|
|
// More calls to flush with errors will not affect this stubborn drain.
|
|
const size_t previous_stream_packet_count =
|
|
output.payloads<Logs::Listen>().size();
|
|
output.set_send_status(Status::Unavailable());
|
|
EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
|
|
EXPECT_FALSE(output.done());
|
|
ASSERT_EQ(output.payloads<Logs::Listen>().size(),
|
|
previous_stream_packet_count);
|
|
|
|
output.clear();
|
|
EXPECT_EQ(drain.value()->Close(), OkStatus());
|
|
EXPECT_TRUE(output.done());
|
|
}
|
|
|
|
TEST_F(LogServiceTest, FilterLogs) {
|
|
// Add a variety of logs.
|
|
const uint32_t module = 0xcafe;
|
|
const uint32_t flags = 0x02;
|
|
const uint32_t line_number = 100;
|
|
const auto debug_metadata = log_tokenized::Metadata::
|
|
Set<PW_LOG_LEVEL_DEBUG, module, flags, line_number>();
|
|
ASSERT_TRUE(AddLogEntry(kMessage, debug_metadata, kSampleTimestamp).ok());
|
|
const auto info_metadata = log_tokenized::Metadata::
|
|
Set<PW_LOG_LEVEL_INFO, module, flags, line_number>();
|
|
ASSERT_TRUE(AddLogEntry(kMessage, info_metadata, kSampleTimestamp).ok());
|
|
const auto warn_metadata = log_tokenized::Metadata::
|
|
Set<PW_LOG_LEVEL_WARN, module, flags, line_number>();
|
|
ASSERT_TRUE(AddLogEntry(kMessage, warn_metadata, kSampleTimestamp).ok());
|
|
const auto error_metadata = log_tokenized::Metadata::
|
|
Set<PW_LOG_LEVEL_ERROR, module, flags, line_number>();
|
|
ASSERT_TRUE(AddLogEntry(kMessage, error_metadata, kSampleTimestamp).ok());
|
|
const auto different_flags_metadata = log_tokenized::Metadata::
|
|
Set<PW_LOG_LEVEL_ERROR, module, 0x01, line_number>();
|
|
ASSERT_TRUE(
|
|
AddLogEntry(kMessage, different_flags_metadata, kSampleTimestamp).ok());
|
|
const auto different_module_metadata = log_tokenized::Metadata::
|
|
Set<PW_LOG_LEVEL_ERROR, 0xabcd, flags, line_number>();
|
|
ASSERT_TRUE(
|
|
AddLogEntry(kMessage, different_module_metadata, kSampleTimestamp).ok());
|
|
|
|
Vector<TestLogEntry, 3> expected_messages{
|
|
{.metadata = info_metadata,
|
|
.timestamp = kSampleTimestamp,
|
|
.tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
|
|
{.metadata = warn_metadata,
|
|
.timestamp = kSampleTimestamp,
|
|
.tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
|
|
{.metadata = error_metadata,
|
|
.timestamp = kSampleTimestamp,
|
|
.tokenized_data = std::as_bytes(std::span(std::string_view(kMessage)))},
|
|
};
|
|
|
|
// Set up filter rules for drain at drains_[1].
|
|
RpcLogDrain& drain = drains_[1];
|
|
for (auto& rule : rules2_) {
|
|
rule = {};
|
|
}
|
|
const auto module_little_endian =
|
|
bytes::CopyInOrder<uint32_t>(std::endian::little, module);
|
|
rules2_[0] = {
|
|
.action = Filter::Rule::Action::kKeep,
|
|
.level_greater_than_or_equal = log::FilterRule::Level::INFO_LEVEL,
|
|
.any_flags_set = flags,
|
|
.module_equals{module_little_endian.begin(), module_little_endian.end()}};
|
|
rules2_[1] = {
|
|
.action = Filter::Rule::Action::kDrop,
|
|
.level_greater_than_or_equal = log::FilterRule::Level::ANY_LEVEL,
|
|
.any_flags_set = 0,
|
|
.module_equals{},
|
|
};
|
|
|
|
// Request logs.
|
|
LOG_SERVICE_METHOD_CONTEXT context(drain_map_);
|
|
context.set_channel_id(drain.channel_id());
|
|
context.call({});
|
|
ASSERT_EQ(drain.Flush(encoding_buffer_), OkStatus());
|
|
|
|
size_t entries_found = 0;
|
|
uint32_t drop_count_found = 0;
|
|
for (auto& response : context.responses()) {
|
|
protobuf::Decoder entry_decoder(response);
|
|
VerifyLogEntries(entry_decoder,
|
|
expected_messages,
|
|
entries_found,
|
|
entries_found,
|
|
drop_count_found);
|
|
}
|
|
EXPECT_EQ(entries_found, 3u);
|
|
EXPECT_EQ(drop_count_found, 0u);
|
|
}
|
|
|
|
TEST_F(LogServiceTest, ReopenClosedLogStreamWithAcquiredBuffer) {
|
|
const uint32_t drain_channel_id = kCloseWriterOnErrorDrainId;
|
|
auto drain = drain_map_.GetDrainFromChannelId(drain_channel_id);
|
|
ASSERT_TRUE(drain.ok());
|
|
|
|
LogService log_service(drain_map_);
|
|
rpc::RawFakeChannelOutput<10, 512> output;
|
|
rpc::Channel channel(rpc::Channel::Create<drain_channel_id>(&output));
|
|
rpc::Server server(std::span(&channel, 1));
|
|
|
|
// Request logs.
|
|
rpc::RawServerWriter writer = rpc::RawServerWriter::Open<Logs::Listen>(
|
|
server, drain_channel_id, log_service);
|
|
EXPECT_EQ(drain.value()->Open(writer), OkStatus());
|
|
// This drain closes on errors.
|
|
EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
|
|
|
|
// Request log stream with a new writer.
|
|
writer = rpc::RawServerWriter::Open<Logs::Listen>(
|
|
server, drain_channel_id, log_service);
|
|
EXPECT_EQ(drain.value()->Open(writer), OkStatus());
|
|
EXPECT_EQ(drain.value()->Flush(encoding_buffer_), OkStatus());
|
|
}
|
|
|
|
} // namespace
|
|
} // namespace pw::log_rpc
|