1548 lines
55 KiB
C++
1548 lines
55 KiB
C++
// Copyright 2022 The Pigweed Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
|
// use this file except in compliance with the License. You may obtain a copy of
|
|
// the License at
|
|
//
|
|
// https://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
// License for the specific language governing permissions and limitations under
|
|
// the License.
|
|
|
|
#include "pw_transfer/transfer.h"
|
|
|
|
#include "gtest/gtest.h"
|
|
#include "pw_bytes/array.h"
|
|
#include "pw_rpc/raw/test_method_context.h"
|
|
#include "pw_rpc/thread_testing.h"
|
|
#include "pw_thread/thread.h"
|
|
#include "pw_thread_stl/options.h"
|
|
#include "pw_transfer/transfer.pwpb.h"
|
|
#include "pw_transfer_private/chunk_testing.h"
|
|
|
|
namespace pw::transfer::test {
|
|
namespace {
|
|
|
|
using namespace std::chrono_literals;
|
|
|
|
PW_MODIFY_DIAGNOSTICS_PUSH();
|
|
PW_MODIFY_DIAGNOSTIC(ignored, "-Wmissing-field-initializers");
|
|
|
|
// TODO(frolv): Have a generic way to obtain a thread for testing on any system.
|
|
thread::Options& TransferThreadOptions() {
|
|
static thread::stl::Options options;
|
|
return options;
|
|
}
|
|
|
|
using internal::Chunk;
|
|
|
|
class TestMemoryReader : public stream::SeekableReader {
|
|
public:
|
|
constexpr TestMemoryReader(std::span<const std::byte> data)
|
|
: memory_reader_(data) {}
|
|
|
|
Status DoSeek(ptrdiff_t offset, Whence origin) override {
|
|
if (seek_status.ok()) {
|
|
return memory_reader_.Seek(offset, origin);
|
|
}
|
|
return seek_status;
|
|
}
|
|
|
|
StatusWithSize DoRead(ByteSpan dest) final {
|
|
if (!read_status.ok()) {
|
|
return StatusWithSize(read_status, 0);
|
|
}
|
|
|
|
auto result = memory_reader_.Read(dest);
|
|
return result.ok() ? StatusWithSize(result->size())
|
|
: StatusWithSize(result.status(), 0);
|
|
}
|
|
|
|
Status seek_status;
|
|
Status read_status;
|
|
|
|
private:
|
|
stream::MemoryReader memory_reader_;
|
|
};
|
|
|
|
class SimpleReadTransfer final : public ReadOnlyHandler {
|
|
public:
|
|
SimpleReadTransfer(uint32_t transfer_id, ConstByteSpan data)
|
|
: ReadOnlyHandler(transfer_id),
|
|
prepare_read_called(false),
|
|
finalize_read_called(false),
|
|
finalize_read_status(Status::Unknown()),
|
|
reader_(data) {}
|
|
|
|
Status PrepareRead() final {
|
|
prepare_read_called = true;
|
|
|
|
if (!prepare_read_return_status.ok()) {
|
|
return prepare_read_return_status;
|
|
}
|
|
|
|
EXPECT_EQ(reader_.seek_status, reader_.Seek(0));
|
|
set_reader(reader_);
|
|
return OkStatus();
|
|
}
|
|
|
|
void FinalizeRead(Status status) final {
|
|
finalize_read_called = true;
|
|
finalize_read_status = status;
|
|
}
|
|
|
|
void set_seek_status(Status status) { reader_.seek_status = status; }
|
|
void set_read_status(Status status) { reader_.read_status = status; }
|
|
|
|
bool prepare_read_called;
|
|
bool finalize_read_called;
|
|
Status prepare_read_return_status;
|
|
Status finalize_read_status;
|
|
|
|
private:
|
|
TestMemoryReader reader_;
|
|
};
|
|
|
|
constexpr auto kData = bytes::Initialized<32>([](size_t i) { return i; });
|
|
|
|
class ReadTransfer : public ::testing::Test {
|
|
protected:
|
|
ReadTransfer(size_t max_chunk_size_bytes = 64)
|
|
: handler_(3, kData),
|
|
transfer_thread_(std::span(data_buffer_).first(max_chunk_size_bytes),
|
|
encode_buffer_),
|
|
ctx_(transfer_thread_, 64),
|
|
system_thread_(TransferThreadOptions(), transfer_thread_) {
|
|
ctx_.service().RegisterHandler(handler_);
|
|
|
|
ASSERT_FALSE(handler_.prepare_read_called);
|
|
ASSERT_FALSE(handler_.finalize_read_called);
|
|
|
|
ctx_.call(); // Open the read stream
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
}
|
|
|
|
~ReadTransfer() {
|
|
transfer_thread_.Terminate();
|
|
system_thread_.join();
|
|
}
|
|
|
|
SimpleReadTransfer handler_;
|
|
Thread<1, 1> transfer_thread_;
|
|
PW_RAW_TEST_METHOD_CONTEXT(TransferService, Read) ctx_;
|
|
thread::Thread system_thread_;
|
|
std::array<std::byte, 64> data_buffer_;
|
|
std::array<std::byte, 64> encode_buffer_;
|
|
};
|
|
|
|
TEST_F(ReadTransfer, SingleChunk) {
|
|
rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.window_end_offset = 64,
|
|
.pending_bytes = 64,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
});
|
|
|
|
EXPECT_TRUE(handler_.prepare_read_called);
|
|
EXPECT_FALSE(handler_.finalize_read_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
|
|
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
|
|
|
|
// First chunk should have all the read data.
|
|
EXPECT_EQ(c0.transfer_id, 3u);
|
|
EXPECT_EQ(c0.offset, 0u);
|
|
ASSERT_EQ(c0.data.size(), kData.size());
|
|
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
|
|
|
|
// Second chunk should be empty and set remaining_bytes = 0.
|
|
EXPECT_EQ(c1.transfer_id, 3u);
|
|
EXPECT_EQ(c1.data.size(), 0u);
|
|
ASSERT_TRUE(c1.remaining_bytes.has_value());
|
|
EXPECT_EQ(c1.remaining_bytes.value(), 0u);
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, PendingBytes_SingleChunk) {
|
|
rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.pending_bytes = 64,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
});
|
|
|
|
EXPECT_TRUE(handler_.prepare_read_called);
|
|
EXPECT_FALSE(handler_.finalize_read_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
|
|
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
|
|
|
|
// First chunk should have all the read data.
|
|
EXPECT_EQ(c0.transfer_id, 3u);
|
|
EXPECT_EQ(c0.offset, 0u);
|
|
ASSERT_EQ(c0.data.size(), kData.size());
|
|
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
|
|
|
|
// Second chunk should be empty and set remaining_bytes = 0.
|
|
EXPECT_EQ(c1.transfer_id, 3u);
|
|
EXPECT_EQ(c1.data.size(), 0u);
|
|
ASSERT_TRUE(c1.remaining_bytes.has_value());
|
|
EXPECT_EQ(c1.remaining_bytes.value(), 0u);
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, MultiChunk) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.window_end_offset = 16,
|
|
.pending_bytes = 16,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_read_called);
|
|
EXPECT_FALSE(handler_.finalize_read_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
|
|
|
|
EXPECT_EQ(c0.transfer_id, 3u);
|
|
EXPECT_EQ(c0.offset, 0u);
|
|
ASSERT_EQ(c0.data.size(), 16u);
|
|
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
|
|
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3,
|
|
.window_end_offset = 32,
|
|
.pending_bytes = 16,
|
|
.offset = 16,
|
|
.type = Chunk::Type::kParametersContinue}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
|
|
|
|
EXPECT_EQ(c1.transfer_id, 3u);
|
|
EXPECT_EQ(c1.offset, 16u);
|
|
ASSERT_EQ(c1.data.size(), 16u);
|
|
EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 16, c1.data.size()), 0);
|
|
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3,
|
|
.window_end_offset = 48,
|
|
.pending_bytes = 16,
|
|
.offset = 32,
|
|
.type = Chunk::Type::kParametersContinue}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 3u);
|
|
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
|
|
|
|
EXPECT_EQ(c2.transfer_id, 3u);
|
|
EXPECT_EQ(c2.data.size(), 0u);
|
|
ASSERT_TRUE(c2.remaining_bytes.has_value());
|
|
EXPECT_EQ(c2.remaining_bytes.value(), 0u);
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, MultiChunk_RepeatedContinuePackets) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.window_end_offset = 16,
|
|
.pending_bytes = 16,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
const auto continue_chunk =
|
|
EncodeChunk({.transfer_id = 3,
|
|
.window_end_offset = 24,
|
|
.pending_bytes = 8,
|
|
.offset = 16,
|
|
.type = Chunk::Type::kParametersContinue});
|
|
ctx_.SendClientStream(continue_chunk);
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
// Resend the CONTINUE packets that don't actually advance the window.
|
|
for (int i = 0; i < 3; ++i) {
|
|
ctx_.SendClientStream(continue_chunk);
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
}
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u); // Only sent one packet
|
|
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
|
|
|
|
EXPECT_EQ(c1.transfer_id, 3u);
|
|
EXPECT_EQ(c1.offset, 16u);
|
|
ASSERT_EQ(c1.data.size(), 8u);
|
|
EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 16, c1.data.size()), 0);
|
|
}
|
|
|
|
TEST_F(ReadTransfer, PendingBytes_MultiChunk) {
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_read_called);
|
|
EXPECT_FALSE(handler_.finalize_read_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
|
|
|
|
EXPECT_EQ(c0.transfer_id, 3u);
|
|
EXPECT_EQ(c0.offset, 0u);
|
|
ASSERT_EQ(c0.data.size(), 16u);
|
|
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
|
|
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 16}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
|
|
|
|
EXPECT_EQ(c1.transfer_id, 3u);
|
|
EXPECT_EQ(c1.offset, 16u);
|
|
ASSERT_EQ(c1.data.size(), 16u);
|
|
EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 16, c1.data.size()), 0);
|
|
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 32}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 3u);
|
|
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
|
|
|
|
EXPECT_EQ(c2.transfer_id, 3u);
|
|
EXPECT_EQ(c2.data.size(), 0u);
|
|
ASSERT_TRUE(c2.remaining_bytes.has_value());
|
|
EXPECT_EQ(c2.remaining_bytes.value(), 0u);
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, OutOfOrder_SeekingSupported) {
|
|
rpc::test::WaitForPackets(ctx_.output(), 4, [this] {
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
Chunk chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_TRUE(std::equal(
|
|
&kData[0], &kData[16], chunk.data.begin(), chunk.data.end()));
|
|
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3, .pending_bytes = 8, .offset = 2}));
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_TRUE(std::equal(
|
|
&kData[2], &kData[10], chunk.data.begin(), chunk.data.end()));
|
|
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3, .pending_bytes = 64, .offset = 17}));
|
|
});
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 4u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[2]);
|
|
EXPECT_TRUE(std::equal(
|
|
&kData[17], kData.end(), chunk.data.begin(), chunk.data.end()));
|
|
}
|
|
|
|
TEST_F(ReadTransfer, OutOfOrder_SeekingNotSupported_EndsWithUnimplemented) {
|
|
handler_.set_seek_status(Status::Unimplemented());
|
|
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 0}));
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3, .pending_bytes = 8, .offset = 2}));
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.status, Status::Unimplemented());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, MaxChunkSize_Client) {
|
|
rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.pending_bytes = 64,
|
|
.max_chunk_size_bytes = 8,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
});
|
|
|
|
EXPECT_TRUE(handler_.prepare_read_called);
|
|
EXPECT_FALSE(handler_.finalize_read_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 5u);
|
|
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
|
|
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
|
|
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
|
|
Chunk c3 = DecodeChunk(ctx_.responses()[3]);
|
|
Chunk c4 = DecodeChunk(ctx_.responses()[4]);
|
|
|
|
EXPECT_EQ(c0.transfer_id, 3u);
|
|
EXPECT_EQ(c0.offset, 0u);
|
|
ASSERT_EQ(c0.data.size(), 8u);
|
|
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
|
|
|
|
EXPECT_EQ(c1.transfer_id, 3u);
|
|
EXPECT_EQ(c1.offset, 8u);
|
|
ASSERT_EQ(c1.data.size(), 8u);
|
|
EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 8, c1.data.size()), 0);
|
|
|
|
EXPECT_EQ(c2.transfer_id, 3u);
|
|
EXPECT_EQ(c2.offset, 16u);
|
|
ASSERT_EQ(c2.data.size(), 8u);
|
|
EXPECT_EQ(std::memcmp(c2.data.data(), kData.data() + 16, c2.data.size()), 0);
|
|
|
|
EXPECT_EQ(c3.transfer_id, 3u);
|
|
EXPECT_EQ(c3.offset, 24u);
|
|
ASSERT_EQ(c3.data.size(), 8u);
|
|
EXPECT_EQ(std::memcmp(c3.data.data(), kData.data() + 24, c3.data.size()), 0);
|
|
|
|
EXPECT_EQ(c4.transfer_id, 3u);
|
|
EXPECT_EQ(c4.data.size(), 0u);
|
|
ASSERT_TRUE(c4.remaining_bytes.has_value());
|
|
EXPECT_EQ(c4.remaining_bytes.value(), 0u);
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, HandlerIsClearedAfterTransfer) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.window_end_offset = 64,
|
|
.pending_bytes = 64,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
ASSERT_TRUE(handler_.prepare_read_called);
|
|
ASSERT_TRUE(handler_.finalize_read_called);
|
|
ASSERT_EQ(OkStatus(), handler_.finalize_read_status);
|
|
|
|
// Now, clear state and start a second transfer
|
|
handler_.prepare_read_return_status = Status::FailedPrecondition();
|
|
handler_.prepare_read_called = false;
|
|
handler_.finalize_read_called = false;
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.window_end_offset = 64,
|
|
.pending_bytes = 64,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
// Prepare failed, so the handler should not have been stored in the context,
|
|
// and finalize should not have been called.
|
|
ASSERT_TRUE(handler_.prepare_read_called);
|
|
ASSERT_FALSE(handler_.finalize_read_called);
|
|
}
|
|
|
|
class ReadTransferMaxChunkSize8 : public ReadTransfer {
|
|
protected:
|
|
ReadTransferMaxChunkSize8() : ReadTransfer(/*max_chunk_size_bytes=*/8) {}
|
|
};
|
|
|
|
TEST_F(ReadTransferMaxChunkSize8, MaxChunkSize_Server) {
|
|
// Client asks for max 16-byte chunks, but service places a limit of 8 bytes.
|
|
rpc::test::WaitForPackets(ctx_.output(), 5, [this] {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.pending_bytes = 64,
|
|
.max_chunk_size_bytes = 16,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
});
|
|
|
|
EXPECT_TRUE(handler_.prepare_read_called);
|
|
EXPECT_FALSE(handler_.finalize_read_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 5u);
|
|
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
|
|
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
|
|
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
|
|
Chunk c3 = DecodeChunk(ctx_.responses()[3]);
|
|
Chunk c4 = DecodeChunk(ctx_.responses()[4]);
|
|
|
|
EXPECT_EQ(c0.transfer_id, 3u);
|
|
EXPECT_EQ(c0.offset, 0u);
|
|
ASSERT_EQ(c0.data.size(), 8u);
|
|
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
|
|
|
|
EXPECT_EQ(c1.transfer_id, 3u);
|
|
EXPECT_EQ(c1.offset, 8u);
|
|
ASSERT_EQ(c1.data.size(), 8u);
|
|
EXPECT_EQ(std::memcmp(c1.data.data(), kData.data() + 8, c1.data.size()), 0);
|
|
|
|
EXPECT_EQ(c2.transfer_id, 3u);
|
|
EXPECT_EQ(c2.offset, 16u);
|
|
ASSERT_EQ(c2.data.size(), 8u);
|
|
EXPECT_EQ(std::memcmp(c2.data.data(), kData.data() + 16, c2.data.size()), 0);
|
|
|
|
EXPECT_EQ(c3.transfer_id, 3u);
|
|
EXPECT_EQ(c3.offset, 24u);
|
|
ASSERT_EQ(c3.data.size(), 8u);
|
|
EXPECT_EQ(std::memcmp(c3.data.data(), kData.data() + 24, c3.data.size()), 0);
|
|
|
|
EXPECT_EQ(c4.transfer_id, 3u);
|
|
EXPECT_EQ(c4.data.size(), 0u);
|
|
ASSERT_TRUE(c4.remaining_bytes.has_value());
|
|
EXPECT_EQ(c4.remaining_bytes.value(), 0u);
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, ClientError) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.pending_bytes = 16,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_read_called);
|
|
EXPECT_FALSE(handler_.finalize_read_called);
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
// Send client error.
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3, .status = Status::OutOfRange()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
EXPECT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, Status::OutOfRange());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, MalformedParametersChunk) {
|
|
// pending_bytes is required in a parameters chunk.
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_read_called);
|
|
EXPECT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, Status::InvalidArgument());
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 3u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), Status::InvalidArgument());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, UnregisteredHandler) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 11,
|
|
.pending_bytes = 32,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 11u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), Status::NotFound());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, IgnoresNonPendingTransfers) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .offset = 3}));
|
|
ctx_.SendClientStream(EncodeChunk(
|
|
{.transfer_id = 3, .offset = 0, .data = std::span(kData).first(10)}));
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
// Only start transfer for initial packet.
|
|
EXPECT_FALSE(handler_.prepare_read_called);
|
|
EXPECT_FALSE(handler_.finalize_read_called);
|
|
}
|
|
|
|
TEST_F(ReadTransfer, AbortAndRestartIfInitialPacketIsReceived) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.pending_bytes = 16,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
EXPECT_TRUE(handler_.prepare_read_called);
|
|
EXPECT_FALSE(handler_.finalize_read_called);
|
|
handler_.prepare_read_called = false; // Reset so can check if called again.
|
|
|
|
ctx_.SendClientStream( // Resend starting chunk
|
|
EncodeChunk({.transfer_id = 3,
|
|
.pending_bytes = 16,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
|
|
EXPECT_TRUE(handler_.prepare_read_called);
|
|
EXPECT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, Status::Aborted());
|
|
handler_.finalize_read_called = false; // Reset so can check later
|
|
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3, .pending_bytes = 16, .offset = 16}));
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 3u);
|
|
EXPECT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, ZeroPendingBytesWithRemainingData_Aborts) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.pending_bytes = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
ASSERT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, Status::ResourceExhausted());
|
|
|
|
Chunk chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.status, Status::ResourceExhausted());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, ZeroPendingBytesNoRemainingData_Completes) {
|
|
// Make the next read appear to be the end of the stream.
|
|
handler_.set_read_status(Status::OutOfRange());
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.pending_bytes = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
Chunk chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.transfer_id, 3u);
|
|
EXPECT_EQ(chunk.remaining_bytes, 0u);
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
ASSERT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(ReadTransfer, SendsErrorIfChunkIsReceivedInCompletedState) {
|
|
rpc::test::WaitForPackets(ctx_.output(), 2, [this] {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3,
|
|
.pending_bytes = 64,
|
|
.offset = 0,
|
|
.type = Chunk::Type::kTransferStart}));
|
|
});
|
|
|
|
EXPECT_TRUE(handler_.prepare_read_called);
|
|
EXPECT_FALSE(handler_.finalize_read_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
Chunk c0 = DecodeChunk(ctx_.responses()[0]);
|
|
Chunk c1 = DecodeChunk(ctx_.responses()[1]);
|
|
|
|
// First chunk should have all the read data.
|
|
EXPECT_EQ(c0.transfer_id, 3u);
|
|
EXPECT_EQ(c0.offset, 0u);
|
|
ASSERT_EQ(c0.data.size(), kData.size());
|
|
EXPECT_EQ(std::memcmp(c0.data.data(), kData.data(), c0.data.size()), 0);
|
|
|
|
// Second chunk should be empty and set remaining_bytes = 0.
|
|
EXPECT_EQ(c1.transfer_id, 3u);
|
|
EXPECT_EQ(c1.data.size(), 0u);
|
|
ASSERT_TRUE(c1.remaining_bytes.has_value());
|
|
EXPECT_EQ(c1.remaining_bytes.value(), 0u);
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 3, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.finalize_read_called);
|
|
EXPECT_EQ(handler_.finalize_read_status, OkStatus());
|
|
|
|
// At this point the transfer should be in a completed state. Send a
|
|
// non-initial chunk as a continuation of the transfer.
|
|
handler_.finalize_read_called = false;
|
|
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 3, .pending_bytes = 48, .offset = 16}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 3u);
|
|
|
|
Chunk c2 = DecodeChunk(ctx_.responses()[2]);
|
|
ASSERT_TRUE(c2.status.has_value());
|
|
EXPECT_EQ(c2.status.value(), Status::FailedPrecondition());
|
|
|
|
// FinalizeRead should not be called again.
|
|
EXPECT_FALSE(handler_.finalize_read_called);
|
|
}
|
|
|
|
class SimpleWriteTransfer final : public WriteOnlyHandler {
|
|
public:
|
|
SimpleWriteTransfer(uint32_t transfer_id, ByteSpan data)
|
|
: WriteOnlyHandler(transfer_id),
|
|
prepare_write_called(false),
|
|
finalize_write_called(false),
|
|
finalize_write_status(Status::Unknown()),
|
|
writer_(data) {}
|
|
|
|
Status PrepareWrite() final {
|
|
EXPECT_EQ(OkStatus(), writer_.Seek(0));
|
|
set_writer(writer_);
|
|
prepare_write_called = true;
|
|
return OkStatus();
|
|
}
|
|
|
|
Status FinalizeWrite(Status status) final {
|
|
finalize_write_called = true;
|
|
finalize_write_status = status;
|
|
return finalize_write_return_status_;
|
|
}
|
|
|
|
void set_finalize_write_return(Status status) {
|
|
finalize_write_return_status_ = status;
|
|
}
|
|
|
|
bool prepare_write_called;
|
|
bool finalize_write_called;
|
|
Status finalize_write_status;
|
|
|
|
private:
|
|
Status finalize_write_return_status_;
|
|
stream::MemoryWriter writer_;
|
|
};
|
|
|
|
class WriteTransfer : public ::testing::Test {
|
|
protected:
|
|
WriteTransfer(size_t max_bytes_to_receive = 64)
|
|
: buffer{},
|
|
handler_(7, buffer),
|
|
transfer_thread_(data_buffer_, encode_buffer_),
|
|
system_thread_(TransferThreadOptions(), transfer_thread_),
|
|
ctx_(transfer_thread_,
|
|
max_bytes_to_receive,
|
|
// Use a long timeout to avoid accidentally triggering timeouts.
|
|
std::chrono::minutes(1)) {
|
|
ctx_.service().RegisterHandler(handler_);
|
|
|
|
ASSERT_FALSE(handler_.prepare_write_called);
|
|
ASSERT_FALSE(handler_.finalize_write_called);
|
|
|
|
ctx_.call(); // Open the write stream
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
}
|
|
|
|
~WriteTransfer() {
|
|
transfer_thread_.Terminate();
|
|
system_thread_.join();
|
|
}
|
|
|
|
std::array<std::byte, kData.size()> buffer;
|
|
SimpleWriteTransfer handler_;
|
|
|
|
Thread<1, 1> transfer_thread_;
|
|
thread::Thread system_thread_;
|
|
std::array<std::byte, 64> data_buffer_;
|
|
std::array<std::byte, 64> encode_buffer_;
|
|
PW_RAW_TEST_METHOD_CONTEXT(TransferService, Write) ctx_;
|
|
};
|
|
|
|
TEST_F(WriteTransfer, SingleChunk) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_write_called);
|
|
EXPECT_FALSE(handler_.finalize_write_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
|
|
ASSERT_TRUE(chunk.max_chunk_size_bytes.has_value());
|
|
EXPECT_EQ(chunk.max_chunk_size_bytes.value(), 37u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 0,
|
|
.data = std::span(kData),
|
|
.remaining_bytes = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
chunk = DecodeChunk(ctx_.responses()[1]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), OkStatus());
|
|
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
|
|
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
|
|
}
|
|
|
|
TEST_F(WriteTransfer, FinalizeFails) {
|
|
// Return an error when FinalizeWrite is called.
|
|
handler_.set_finalize_write_return(Status::FailedPrecondition());
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 0,
|
|
.data = std::span(kData),
|
|
.remaining_bytes = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[1]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), Status::DataLoss());
|
|
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(WriteTransfer, SendingFinalPacketFails) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ctx_.output().set_send_status(Status::Unknown());
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 0,
|
|
.data = std::span(kData),
|
|
.remaining_bytes = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
// Should only have sent the transfer parameters.
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
|
|
ASSERT_TRUE(chunk.max_chunk_size_bytes.has_value());
|
|
EXPECT_EQ(chunk.max_chunk_size_bytes.value(), 37u);
|
|
|
|
// When FinalizeWrite() was called, the transfer was considered successful.
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(WriteTransfer, MultiChunk) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_write_called);
|
|
EXPECT_FALSE(handler_.finalize_write_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(8)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 8,
|
|
.data = std::span(kData).subspan(8),
|
|
.remaining_bytes = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
chunk = DecodeChunk(ctx_.responses()[1]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), OkStatus());
|
|
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
|
|
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
|
|
}
|
|
|
|
TEST_F(WriteTransfer, WriteFailsOnRetry) {
|
|
// Skip one packet to fail on a retry.
|
|
ctx_.output().set_send_status(Status::FailedPrecondition(), 1);
|
|
|
|
// Wait for 3 packets: initial params, retry attempt, final error
|
|
rpc::test::WaitForPackets(ctx_.output(), 3, [this] {
|
|
// Send only one client packet so the service times out.
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.SimulateServerTimeout(7); // Time out to trigger retry
|
|
});
|
|
|
|
// Attempted to send 3 packets, but the 2nd packet was dropped.
|
|
// Check that the last packet is an INTERNAL error from the RPC write failure.
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[1]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), Status::Internal());
|
|
}
|
|
|
|
TEST_F(WriteTransfer, TimeoutInRecoveryState) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 0u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
|
|
|
|
constexpr std::span data(kData);
|
|
|
|
ctx_.SendClientStream<64>(
|
|
EncodeChunk({.transfer_id = 7, .offset = 0, .data = data.first(8)}));
|
|
|
|
// Skip offset 8 to enter a recovery state.
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 12, .data = data.subspan(12, 4)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
// Recovery parameters should be sent for offset 8.
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 8u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 24u);
|
|
|
|
// Timeout while in the recovery state.
|
|
transfer_thread_.SimulateServerTimeout(7);
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
// Same recovery parameters should be re-sent.
|
|
ASSERT_EQ(ctx_.total_responses(), 3u);
|
|
chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 8u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 24u);
|
|
}
|
|
|
|
TEST_F(WriteTransfer, ExtendWindow) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_write_called);
|
|
EXPECT_FALSE(handler_.finalize_write_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.window_end_offset, 32u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
|
|
|
|
// Window starts at 32 bytes and should extend when half of that is sent.
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(4)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 4, .data = std::span(kData).subspan(4, 4)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 8, .data = std::span(kData).subspan(8, 4)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
ctx_.SendClientStream<64>(
|
|
EncodeChunk({.transfer_id = 7,
|
|
.offset = 12,
|
|
.data = std::span(kData).subspan(12, 4)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
|
|
// Extend parameters chunk.
|
|
chunk = DecodeChunk(ctx_.responses()[1]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.window_end_offset, 32u);
|
|
EXPECT_EQ(chunk.type, Chunk::Type::kParametersContinue);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 16,
|
|
.data = std::span(kData).subspan(16),
|
|
.remaining_bytes = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 3u);
|
|
chunk = DecodeChunk(ctx_.responses()[2]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), OkStatus());
|
|
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
|
|
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
|
|
}
|
|
|
|
class WriteTransferMaxBytes16 : public WriteTransfer {
|
|
protected:
|
|
WriteTransferMaxBytes16() : WriteTransfer(/*max_bytes_to_receive=*/16) {}
|
|
};
|
|
|
|
TEST_F(WriteTransfer, TransmitterReducesWindow) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_write_called);
|
|
EXPECT_FALSE(handler_.finalize_write_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.window_end_offset, 32u);
|
|
|
|
// Send only 12 bytes and set that as the new end offset.
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.window_end_offset = 12,
|
|
.offset = 0,
|
|
.data = std::span(kData).first(12)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
|
|
// Receiver should respond immediately with a retransmit chunk as the end of
|
|
// the window has been reached.
|
|
chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 12u);
|
|
EXPECT_EQ(chunk.window_end_offset, 32u);
|
|
EXPECT_EQ(chunk.type, Chunk::Type::kParametersRetransmit);
|
|
}
|
|
|
|
TEST_F(WriteTransfer, TransmitterExtendsWindow_TerminatesWithInvalid) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_write_called);
|
|
EXPECT_FALSE(handler_.finalize_write_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.window_end_offset, 32u);
|
|
|
|
// Send only 12 bytes and set that as the new end offset.
|
|
ctx_.SendClientStream<64>(
|
|
EncodeChunk({.transfer_id = 7,
|
|
// Larger window end offset than the receiver's.
|
|
.window_end_offset = 48,
|
|
.offset = 0,
|
|
.data = std::span(kData).first(16)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
|
|
chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), Status::Internal());
|
|
}
|
|
|
|
TEST_F(WriteTransferMaxBytes16, MultipleParameters) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_write_called);
|
|
EXPECT_FALSE(handler_.finalize_write_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(8)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
chunk = DecodeChunk(ctx_.responses()[1]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 8u);
|
|
EXPECT_EQ(chunk.window_end_offset, 24u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 8, .data = std::span(kData).subspan(8, 8)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 3u);
|
|
chunk = DecodeChunk(ctx_.responses()[2]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 16u);
|
|
EXPECT_EQ(chunk.window_end_offset, 32u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
|
|
|
|
ctx_.SendClientStream<64>(
|
|
EncodeChunk({.transfer_id = 7,
|
|
.offset = 16,
|
|
.data = std::span(kData).subspan(16, 8)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 4u);
|
|
chunk = DecodeChunk(ctx_.responses()[3]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 24u);
|
|
EXPECT_EQ(chunk.window_end_offset, 32u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 8u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 24,
|
|
.data = std::span(kData).subspan(24),
|
|
.remaining_bytes = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 5u);
|
|
chunk = DecodeChunk(ctx_.responses()[4]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), OkStatus());
|
|
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
|
|
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
|
|
}
|
|
|
|
TEST_F(WriteTransferMaxBytes16, SetsDefaultPendingBytes) {
|
|
// Default max bytes is smaller than buffer.
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
|
|
}
|
|
|
|
TEST_F(WriteTransfer, SetsWriterPendingBytes) {
|
|
// Buffer is smaller than constructor's default max bytes.
|
|
std::array<std::byte, 8> small_buffer = {};
|
|
|
|
SimpleWriteTransfer handler_(987, small_buffer);
|
|
ctx_.service().RegisterHandler(handler_);
|
|
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 987}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 987u);
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 8u);
|
|
}
|
|
|
|
TEST_F(WriteTransfer, UnexpectedOffset) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_write_called);
|
|
EXPECT_FALSE(handler_.finalize_write_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 0u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(8)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 4, // incorrect
|
|
.data = std::span(kData).subspan(16),
|
|
.remaining_bytes = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
chunk = DecodeChunk(ctx_.responses()[1]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 8u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 24u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 8, // correct
|
|
.data = std::span(kData).subspan(8),
|
|
.remaining_bytes = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 3u);
|
|
chunk = DecodeChunk(ctx_.responses()[2]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), OkStatus());
|
|
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
|
|
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
|
|
}
|
|
|
|
TEST_F(WriteTransferMaxBytes16, TooMuchData) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_write_called);
|
|
EXPECT_FALSE(handler_.finalize_write_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
|
|
|
|
// pending_bytes = 16 but send 24
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(24)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
chunk = DecodeChunk(ctx_.responses()[1]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), Status::Internal());
|
|
}
|
|
|
|
TEST_F(WriteTransfer, UnregisteredHandler) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 999}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 999u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), Status::NotFound());
|
|
}
|
|
|
|
TEST_F(WriteTransfer, ClientError) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_write_called);
|
|
EXPECT_FALSE(handler_.finalize_write_called);
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 32u);
|
|
|
|
ctx_.SendClientStream<64>(
|
|
EncodeChunk({.transfer_id = 7, .status = Status::DataLoss()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, Status::DataLoss());
|
|
}
|
|
|
|
TEST_F(WriteTransfer, OnlySendParametersUpdateOnceAfterDrop) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
constexpr std::span data(kData);
|
|
ctx_.SendClientStream<64>(
|
|
EncodeChunk({.transfer_id = 7, .offset = 0, .data = data.first(1)}));
|
|
|
|
// Drop offset 1, then send the rest of the data.
|
|
for (uint32_t i = 2; i < kData.size(); ++i) {
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = i, .data = data.subspan(i, 1)}));
|
|
}
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 1u);
|
|
|
|
// Send the remaining data and the final status.
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 1,
|
|
.data = data.subspan(1, 31),
|
|
.status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(WriteTransfer, ResendParametersIfSentRepeatedChunkDuringRecovery) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
constexpr std::span data(kData);
|
|
|
|
// Skip offset 0, then send the rest of the data.
|
|
for (uint32_t i = 1; i < kData.size(); ++i) {
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = i, .data = data.subspan(i, 1)}));
|
|
}
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u); // Resent transfer parameters once.
|
|
|
|
const auto last_chunk = EncodeChunk(
|
|
{.transfer_id = 7, .offset = kData.size() - 1, .data = data.last(1)});
|
|
ctx_.SendClientStream<64>(last_chunk);
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
// Resent transfer parameters since the packet is repeated
|
|
ASSERT_EQ(ctx_.total_responses(), 3u);
|
|
|
|
ctx_.SendClientStream<64>(last_chunk);
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 4u);
|
|
|
|
Chunk chunk = DecodeChunk(ctx_.responses().back());
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 0u);
|
|
EXPECT_TRUE(chunk.pending_bytes.has_value());
|
|
|
|
// Resumes normal operation when correct offset is sent.
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 0, .data = kData, .status = OkStatus()}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
|
|
}
|
|
|
|
TEST_F(WriteTransfer, ResendsStatusIfClientRetriesAfterStatusChunk) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 0,
|
|
.data = std::span(kData),
|
|
.remaining_bytes = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses().back());
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), OkStatus());
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 0,
|
|
.data = std::span(kData),
|
|
.remaining_bytes = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 3u);
|
|
chunk = DecodeChunk(ctx_.responses().back());
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), OkStatus());
|
|
}
|
|
|
|
TEST_F(WriteTransfer, IgnoresNonPendingTransfers) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7, .offset = 3}));
|
|
ctx_.SendClientStream(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(10)}));
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7, .status = OkStatus()}));
|
|
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
// Only start transfer for initial packet.
|
|
EXPECT_FALSE(handler_.prepare_write_called);
|
|
EXPECT_FALSE(handler_.finalize_write_called);
|
|
}
|
|
|
|
TEST_F(WriteTransfer, AbortAndRestartIfInitialPacketIsReceived) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(8)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
|
|
ASSERT_TRUE(handler_.prepare_write_called);
|
|
ASSERT_FALSE(handler_.finalize_write_called);
|
|
handler_.prepare_write_called = false; // Reset to check it's called again.
|
|
|
|
// Simulate client disappearing then restarting the transfer.
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_write_called);
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, Status::Aborted());
|
|
|
|
handler_.finalize_write_called = false; // Reset to check it's called again.
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk({.transfer_id = 7,
|
|
.offset = 0,
|
|
.data = std::span(kData),
|
|
.remaining_bytes = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 3u);
|
|
|
|
EXPECT_TRUE(handler_.finalize_write_called);
|
|
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
|
|
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
|
|
}
|
|
|
|
class SometimesUnavailableReadHandler final : public ReadOnlyHandler {
|
|
public:
|
|
SometimesUnavailableReadHandler(uint32_t transfer_id, ConstByteSpan data)
|
|
: ReadOnlyHandler(transfer_id), reader_(data), call_count_(0) {}
|
|
|
|
Status PrepareRead() final {
|
|
if ((call_count_++ % 2) == 0) {
|
|
return Status::Unavailable();
|
|
}
|
|
|
|
set_reader(reader_);
|
|
return OkStatus();
|
|
}
|
|
|
|
private:
|
|
stream::MemoryReader reader_;
|
|
int call_count_;
|
|
};
|
|
|
|
TEST_F(ReadTransfer, PrepareError) {
|
|
SometimesUnavailableReadHandler unavailable_handler(88, kData);
|
|
ctx_.service().RegisterHandler(unavailable_handler);
|
|
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 88, .pending_bytes = 128, .offset = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 88u);
|
|
ASSERT_TRUE(chunk.status.has_value());
|
|
EXPECT_EQ(chunk.status.value(), Status::DataLoss());
|
|
|
|
// Try starting the transfer again. It should work this time.
|
|
// TODO(frolv): This won't work until completion ACKs are supported.
|
|
if (false) {
|
|
ctx_.SendClientStream(
|
|
EncodeChunk({.transfer_id = 88, .pending_bytes = 128, .offset = 0}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
chunk = DecodeChunk(ctx_.responses()[1]);
|
|
EXPECT_EQ(chunk.transfer_id, 88u);
|
|
ASSERT_EQ(chunk.data.size(), kData.size());
|
|
EXPECT_EQ(std::memcmp(chunk.data.data(), kData.data(), chunk.data.size()),
|
|
0);
|
|
}
|
|
}
|
|
|
|
TEST_F(WriteTransferMaxBytes16, Service_SetMaxPendingBytes) {
|
|
ctx_.SendClientStream(EncodeChunk({.transfer_id = 7}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
EXPECT_TRUE(handler_.prepare_write_called);
|
|
EXPECT_FALSE(handler_.finalize_write_called);
|
|
|
|
// First parameters chunk has default pending bytes of 16.
|
|
ASSERT_EQ(ctx_.total_responses(), 1u);
|
|
Chunk chunk = DecodeChunk(ctx_.responses()[0]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 16u);
|
|
|
|
// Update the pending bytes value.
|
|
ctx_.service().set_max_pending_bytes(12);
|
|
|
|
ctx_.SendClientStream<64>(EncodeChunk(
|
|
{.transfer_id = 7, .offset = 0, .data = std::span(kData).first(8)}));
|
|
transfer_thread_.WaitUntilEventIsProcessed();
|
|
|
|
// Second parameters chunk should use the new max pending bytes.
|
|
ASSERT_EQ(ctx_.total_responses(), 2u);
|
|
chunk = DecodeChunk(ctx_.responses()[1]);
|
|
EXPECT_EQ(chunk.transfer_id, 7u);
|
|
EXPECT_EQ(chunk.offset, 8u);
|
|
EXPECT_EQ(chunk.window_end_offset, 20u);
|
|
ASSERT_TRUE(chunk.pending_bytes.has_value());
|
|
EXPECT_EQ(chunk.pending_bytes.value(), 12u);
|
|
}
|
|
|
|
PW_MODIFY_DIAGNOSTICS_POP();
|
|
|
|
} // namespace
|
|
} // namespace pw::transfer::test
|