923 lines
30 KiB
TypeScript
923 lines
30 KiB
TypeScript
// Copyright 2021 The Pigweed Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
|
// use this file except in compliance with the License. You may obtain a copy of
|
|
// the License at
|
|
//
|
|
// https://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
// License for the specific language governing permissions and limitations under
|
|
// the License.
|
|
|
|
/* eslint-env browser, jasmine */
|
|
import 'jasmine';
|
|
|
|
import {Status} from '@pigweed/pw_status';
|
|
import {MessageCreator} from '@pigweed/pw_protobuf_compiler';
|
|
import {Message} from 'google-protobuf';
|
|
import {
|
|
PacketType,
|
|
RpcPacket,
|
|
} from 'packet_proto_tspb/packet_proto_tspb_pb/pw_rpc/internal/packet_pb';
|
|
import {ProtoCollection} from 'rpc_proto_collection/generated/ts_proto_collection';
|
|
import {
|
|
Request,
|
|
Response,
|
|
} from 'test_protos_tspb/test_protos_tspb_pb/pw_rpc/ts/test_pb';
|
|
|
|
import {Client} from './client';
|
|
import {Channel, Method} from './descriptors';
|
|
import {
|
|
BidirectionalStreamingMethodStub,
|
|
ClientStreamingMethodStub,
|
|
ServerStreamingMethodStub,
|
|
UnaryMethodStub,
|
|
} from './method';
|
|
import * as packets from './packets';
|
|
|
|
const TEST_PROTO_PATH = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin';
|
|
|
|
describe('Client', () => {
|
|
let protoCollection: ProtoCollection;
|
|
let client: Client;
|
|
let lastPacketSent: RpcPacket;
|
|
|
|
beforeEach(() => {
|
|
protoCollection = new ProtoCollection();
|
|
const channels = [new Channel(1, savePacket), new Channel(5)];
|
|
client = Client.fromProtoSet(channels, protoCollection);
|
|
});
|
|
|
|
function savePacket(packetBytes: Uint8Array): void {
|
|
lastPacketSent = RpcPacket.deserializeBinary(packetBytes);
|
|
}
|
|
|
|
it('channel returns undefined for empty list', () => {
|
|
const channels = Array<Channel>();
|
|
const emptyChannelClient = Client.fromProtoSet(channels, protoCollection);
|
|
expect(emptyChannelClient.channel()).toBeUndefined();
|
|
});
|
|
|
|
it('fetches channel or returns undefined', () => {
|
|
expect(client.channel(1)!.channel.id).toEqual(1);
|
|
expect(client.channel(5)!.channel.id).toEqual(5);
|
|
expect(client.channel()!.channel.id).toEqual(1);
|
|
expect(client.channel(2)).toBeUndefined();
|
|
});
|
|
|
|
it('ChannelClient fetches method by name', () => {
|
|
const channel = client.channel()!;
|
|
const stub = channel.methodStub('pw.rpc.test1.TheTestService.SomeUnary')!;
|
|
expect(stub.method.name).toEqual('SomeUnary');
|
|
});
|
|
|
|
it('ChannelClient for unknown name returns undefined', () => {
|
|
const channel = client.channel()!;
|
|
expect(channel.methodStub('')).toBeUndefined();
|
|
expect(
|
|
channel.methodStub('pw.rpc.test1.Garbage.SomeUnary')
|
|
).toBeUndefined();
|
|
expect(
|
|
channel.methodStub('pw.rpc.test1.TheTestService.Garbage')
|
|
).toBeUndefined();
|
|
});
|
|
|
|
it('processPacket with invalid proto data', () => {
|
|
const textEncoder = new TextEncoder();
|
|
const data = textEncoder.encode('NOT a packet!');
|
|
expect(client.processPacket(data)).toEqual(Status.DATA_LOSS);
|
|
});
|
|
|
|
it('processPacket not for client', () => {
|
|
const packet = new RpcPacket();
|
|
packet.setType(PacketType.REQUEST);
|
|
const processStatus = client.processPacket(packet.serializeBinary());
|
|
expect(processStatus).toEqual(Status.INVALID_ARGUMENT);
|
|
});
|
|
|
|
it('processPacket for unrecognized channel', () => {
|
|
const packet = packets.encodeResponse([123, 456, 789], new Request());
|
|
expect(client.processPacket(packet)).toEqual(Status.NOT_FOUND);
|
|
});
|
|
|
|
it('processPacket for unrecognized service', () => {
|
|
const packet = packets.encodeResponse([1, 456, 789], new Request());
|
|
const status = client.processPacket(packet);
|
|
expect(client.processPacket(packet)).toEqual(Status.OK);
|
|
|
|
expect(lastPacketSent.getChannelId()).toEqual(1);
|
|
expect(lastPacketSent.getServiceId()).toEqual(456);
|
|
expect(lastPacketSent.getMethodId()).toEqual(789);
|
|
expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
|
|
expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND);
|
|
});
|
|
|
|
it('processPacket for unrecognized method', () => {
|
|
const service = client.services.values().next().value;
|
|
|
|
const packet = packets.encodeResponse([1, service.id, 789], new Request());
|
|
const status = client.processPacket(packet);
|
|
expect(client.processPacket(packet)).toEqual(Status.OK);
|
|
|
|
expect(lastPacketSent.getChannelId()).toEqual(1);
|
|
expect(lastPacketSent.getServiceId()).toEqual(service.id);
|
|
expect(lastPacketSent.getMethodId()).toEqual(789);
|
|
expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
|
|
expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND);
|
|
});
|
|
|
|
it('processPacket for non-pending method', () => {
|
|
const service = client.services.values().next().value;
|
|
const method = service.methods.values().next().value;
|
|
|
|
const packet = packets.encodeResponse(
|
|
[1, service.id, method.id],
|
|
new Request()
|
|
);
|
|
const status = client.processPacket(packet);
|
|
expect(client.processPacket(packet)).toEqual(Status.OK);
|
|
|
|
expect(lastPacketSent.getChannelId()).toEqual(1);
|
|
expect(lastPacketSent.getServiceId()).toEqual(service.id);
|
|
expect(lastPacketSent.getMethodId()).toEqual(method.id);
|
|
expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
|
|
expect(lastPacketSent.getStatus()).toEqual(Status.FAILED_PRECONDITION);
|
|
});
|
|
});
|
|
|
|
describe('RPC', () => {
|
|
let protoCollection: ProtoCollection;
|
|
let client: Client;
|
|
let lastPacketSent: RpcPacket | undefined;
|
|
let requests: RpcPacket[] = [];
|
|
let nextPackets: [Uint8Array, Status][] = [];
|
|
let responseLock = false;
|
|
let sendResponsesAfterPackets = 0;
|
|
let outputException: Error | undefined;
|
|
|
|
beforeEach(async () => {
|
|
protoCollection = new ProtoCollection();
|
|
const channels = [new Channel(1, handlePacket), new Channel(2, () => {})];
|
|
client = Client.fromProtoSet(channels, protoCollection);
|
|
lastPacketSent = undefined;
|
|
requests = [];
|
|
nextPackets = [];
|
|
responseLock = false;
|
|
sendResponsesAfterPackets = 0;
|
|
outputException = undefined;
|
|
});
|
|
|
|
function newRequest(magicNumber = 123): Message {
|
|
const request = new Request();
|
|
request.setMagicNumber(magicNumber);
|
|
return request;
|
|
}
|
|
|
|
function newResponse(payload = '._.'): Message {
|
|
const response = new Response();
|
|
response.setPayload(payload);
|
|
return response;
|
|
}
|
|
|
|
function enqueueResponse(
|
|
channelId: number,
|
|
method: Method,
|
|
status: Status,
|
|
response?: Message
|
|
) {
|
|
const packet = new RpcPacket();
|
|
packet.setType(PacketType.RESPONSE);
|
|
packet.setChannelId(channelId);
|
|
packet.setServiceId(method.service.id);
|
|
packet.setMethodId(method.id);
|
|
packet.setStatus(status);
|
|
if (response === undefined) {
|
|
packet.setPayload(new Uint8Array());
|
|
} else {
|
|
packet.setPayload(response.serializeBinary());
|
|
}
|
|
nextPackets.push([packet.serializeBinary(), Status.OK]);
|
|
}
|
|
|
|
function enqueueServerStream(
|
|
channelId: number,
|
|
method: Method,
|
|
response: Message,
|
|
status: Status = Status.OK
|
|
) {
|
|
const packet = new RpcPacket();
|
|
packet.setType(PacketType.SERVER_STREAM);
|
|
packet.setChannelId(channelId);
|
|
packet.setServiceId(method.service.id);
|
|
packet.setMethodId(method.id);
|
|
packet.setPayload(response.serializeBinary());
|
|
packet.setStatus(status);
|
|
nextPackets.push([packet.serializeBinary(), status]);
|
|
}
|
|
|
|
function enqueueError(
|
|
channelId: number,
|
|
method: Method,
|
|
status: Status,
|
|
processStatus: Status
|
|
) {
|
|
const packet = new RpcPacket();
|
|
packet.setType(PacketType.SERVER_ERROR);
|
|
packet.setChannelId(channelId);
|
|
packet.setServiceId(method.service.id);
|
|
packet.setMethodId(method.id);
|
|
packet.setStatus(status);
|
|
|
|
nextPackets.push([packet.serializeBinary(), processStatus]);
|
|
}
|
|
|
|
function lastRequest(): RpcPacket {
|
|
if (requests.length == 0) {
|
|
throw Error('Tried to fetch request from empty list');
|
|
}
|
|
return requests[requests.length - 1];
|
|
}
|
|
|
|
function sentPayload(messageType: typeof Message): any {
|
|
return messageType.deserializeBinary(lastRequest().getPayload_asU8());
|
|
}
|
|
|
|
function handlePacket(data: Uint8Array): void {
|
|
if (outputException !== undefined) {
|
|
throw outputException;
|
|
}
|
|
requests.push(packets.decode(data));
|
|
|
|
if (sendResponsesAfterPackets > 1) {
|
|
sendResponsesAfterPackets -= 1;
|
|
return;
|
|
}
|
|
|
|
processEnqueuedPackets();
|
|
}
|
|
|
|
function processEnqueuedPackets(): void {
|
|
// Avoid infinite recursion when processing a packet causes another packet
|
|
// to send.
|
|
responseLock = true;
|
|
for (const [packet, status] of nextPackets) {
|
|
expect(client.processPacket(packet)).toEqual(status);
|
|
}
|
|
nextPackets = [];
|
|
responseLock = false;
|
|
}
|
|
|
|
describe('Unary', () => {
|
|
let unaryStub: UnaryMethodStub;
|
|
|
|
beforeEach(async () => {
|
|
unaryStub = client
|
|
.channel()
|
|
?.methodStub(
|
|
'pw.rpc.test1.TheTestService.SomeUnary'
|
|
)! as UnaryMethodStub;
|
|
});
|
|
|
|
it('blocking call', async () => {
|
|
for (let i = 0; i < 3; i++) {
|
|
enqueueResponse(
|
|
1,
|
|
unaryStub.method,
|
|
Status.ABORTED,
|
|
newResponse('0_o')
|
|
);
|
|
const [status, response] = await unaryStub.call(newRequest(6));
|
|
|
|
expect(sentPayload(Request).getMagicNumber()).toEqual(6);
|
|
expect(status).toEqual(Status.ABORTED);
|
|
expect(response).toEqual(newResponse('0_o'));
|
|
}
|
|
});
|
|
|
|
it('nonblocking call', () => {
|
|
for (let i = 0; i < 3; i++) {
|
|
const response = newResponse('hello world');
|
|
enqueueResponse(1, unaryStub.method, Status.ABORTED, response);
|
|
|
|
const onNext = jasmine.createSpy();
|
|
const onCompleted = jasmine.createSpy();
|
|
const onError = jasmine.createSpy();
|
|
const call = unaryStub.invoke(
|
|
newRequest(5),
|
|
onNext,
|
|
onCompleted,
|
|
onError
|
|
);
|
|
|
|
expect(sentPayload(Request).getMagicNumber()).toEqual(5);
|
|
expect(onNext).toHaveBeenCalledOnceWith(response);
|
|
expect(onError).not.toHaveBeenCalled();
|
|
expect(onCompleted).toHaveBeenCalledOnceWith(Status.ABORTED);
|
|
}
|
|
});
|
|
|
|
it('open', () => {
|
|
outputException = Error('Error should be ignored');
|
|
|
|
for (let i = 0; i < 3; i++) {
|
|
const response = newResponse('hello world');
|
|
enqueueResponse(1, unaryStub.method, Status.ABORTED, response);
|
|
|
|
const onNext = jasmine.createSpy();
|
|
const onCompleted = jasmine.createSpy();
|
|
const onError = jasmine.createSpy();
|
|
unaryStub.open(newRequest(5), onNext, onCompleted, onError);
|
|
expect(requests).toHaveSize(0);
|
|
|
|
processEnqueuedPackets();
|
|
|
|
expect(onNext).toHaveBeenCalledOnceWith(response);
|
|
expect(onError).not.toHaveBeenCalled();
|
|
expect(onCompleted).toHaveBeenCalledOnceWith(Status.ABORTED);
|
|
}
|
|
});
|
|
|
|
it('blocking server error', async () => {
|
|
for (let i = 0; i < 3; i++) {
|
|
enqueueError(1, unaryStub.method, Status.NOT_FOUND, Status.OK);
|
|
|
|
try {
|
|
await unaryStub.call(newRequest());
|
|
fail('call expected to fail');
|
|
} catch (e: any) {
|
|
expect(e.status).toBe(Status.NOT_FOUND);
|
|
}
|
|
}
|
|
});
|
|
|
|
it('nonblocking call cancel', () => {
|
|
for (let i = 0; i < 3; i++) {
|
|
const onNext = jasmine.createSpy();
|
|
const call = unaryStub.invoke(newRequest(), onNext);
|
|
|
|
expect(requests.length).toBeGreaterThan(0);
|
|
requests = [];
|
|
|
|
expect(call.cancel()).toBeTrue();
|
|
expect(call.cancel()).toBeFalse();
|
|
expect(onNext).not.toHaveBeenCalled();
|
|
}
|
|
});
|
|
|
|
it('blocking call with timeout', async () => {
|
|
try {
|
|
await unaryStub.call(newRequest(), 10);
|
|
fail('Promise should not be resolve');
|
|
} catch (err: any) {
|
|
expect(err.timeoutMs).toEqual(10);
|
|
}
|
|
});
|
|
|
|
it('nonblocking duplicate calls first is cancelled', () => {
|
|
const firstCall = unaryStub.invoke(newRequest());
|
|
expect(firstCall.completed).toBeFalse();
|
|
|
|
const secondCall = unaryStub.invoke(newRequest());
|
|
expect(firstCall.error).toEqual(Status.CANCELLED);
|
|
expect(secondCall.completed).toBeFalse();
|
|
});
|
|
|
|
it('nonblocking exception in callback', () => {
|
|
const errorCallback = () => {
|
|
throw Error('Something went wrong!');
|
|
};
|
|
|
|
enqueueResponse(1, unaryStub.method, Status.OK);
|
|
const call = unaryStub.invoke(newRequest(), errorCallback);
|
|
expect(call.callbackException!.name).toEqual('Error');
|
|
expect(call.callbackException!.message).toEqual('Something went wrong!');
|
|
});
|
|
});
|
|
|
|
describe('ServerStreaming', () => {
|
|
let serverStreaming: ServerStreamingMethodStub;
|
|
|
|
beforeEach(async () => {
|
|
serverStreaming = client
|
|
.channel()
|
|
?.methodStub(
|
|
'pw.rpc.test1.TheTestService.SomeServerStreaming'
|
|
)! as ServerStreamingMethodStub;
|
|
});
|
|
|
|
it('non-blocking call', () => {
|
|
const response1 = newResponse('!!!');
|
|
const response2 = newResponse('?');
|
|
|
|
for (let i = 0; i < 3; i++) {
|
|
enqueueServerStream(1, serverStreaming.method, response1);
|
|
enqueueServerStream(1, serverStreaming.method, response2);
|
|
enqueueResponse(1, serverStreaming.method, Status.ABORTED);
|
|
|
|
const onNext = jasmine.createSpy();
|
|
const onCompleted = jasmine.createSpy();
|
|
const onError = jasmine.createSpy();
|
|
serverStreaming.invoke(newRequest(4), onNext, onCompleted, onError);
|
|
|
|
expect(onNext).toHaveBeenCalledWith(response1);
|
|
expect(onNext).toHaveBeenCalledWith(response2);
|
|
expect(onError).not.toHaveBeenCalled();
|
|
expect(onCompleted).toHaveBeenCalledOnceWith(Status.ABORTED);
|
|
|
|
expect(
|
|
sentPayload(serverStreaming.method.requestType).getMagicNumber()
|
|
).toEqual(4);
|
|
}
|
|
});
|
|
|
|
it('open', () => {
|
|
outputException = Error('Error should be ignored');
|
|
const response1 = newResponse('!!!');
|
|
const response2 = newResponse('?');
|
|
|
|
for (let i = 0; i < 3; i++) {
|
|
enqueueServerStream(1, serverStreaming.method, response1);
|
|
enqueueServerStream(1, serverStreaming.method, response2);
|
|
enqueueResponse(1, serverStreaming.method, Status.ABORTED);
|
|
|
|
const onNext = jasmine.createSpy();
|
|
const onCompleted = jasmine.createSpy();
|
|
const onError = jasmine.createSpy();
|
|
const call = serverStreaming.open(
|
|
newRequest(3),
|
|
onNext,
|
|
onCompleted,
|
|
onError
|
|
);
|
|
|
|
expect(requests).toHaveSize(0);
|
|
processEnqueuedPackets();
|
|
|
|
expect(onNext).toHaveBeenCalledWith(response1);
|
|
expect(onNext).toHaveBeenCalledWith(response2);
|
|
expect(onError).not.toHaveBeenCalled();
|
|
expect(onCompleted).toHaveBeenCalledOnceWith(Status.ABORTED);
|
|
}
|
|
});
|
|
|
|
it('blocking timeout', async () => {
|
|
try {
|
|
await serverStreaming.call(newRequest(), 10);
|
|
fail('Promise should not be resolve');
|
|
} catch (err: any) {
|
|
expect(err.timeoutMs).toEqual(10);
|
|
}
|
|
});
|
|
|
|
it('non-blocking cancel', () => {
|
|
const testResponse = newResponse('!!!');
|
|
enqueueServerStream(1, serverStreaming.method, testResponse);
|
|
|
|
const onNext = jasmine.createSpy();
|
|
const onCompleted = jasmine.createSpy();
|
|
const onError = jasmine.createSpy();
|
|
let call = serverStreaming.invoke(newRequest(3), onNext);
|
|
expect(onNext).toHaveBeenCalledOnceWith(testResponse);
|
|
|
|
onNext.calls.reset();
|
|
|
|
call.cancel();
|
|
expect(lastRequest().getType()).toEqual(PacketType.CLIENT_ERROR);
|
|
expect(lastRequest().getStatus()).toEqual(Status.CANCELLED);
|
|
|
|
// Ensure the RPC can be called after being cancelled.
|
|
enqueueServerStream(1, serverStreaming.method, testResponse);
|
|
enqueueResponse(1, serverStreaming.method, Status.OK);
|
|
call = serverStreaming.invoke(newRequest(), onNext, onCompleted, onError);
|
|
expect(onNext).toHaveBeenCalledWith(testResponse);
|
|
expect(onError).not.toHaveBeenCalled();
|
|
expect(onCompleted).toHaveBeenCalledOnceWith(Status.OK);
|
|
});
|
|
});
|
|
|
|
describe('ClientStreaming', () => {
|
|
let clientStreaming: ClientStreamingMethodStub;
|
|
|
|
beforeEach(async () => {
|
|
clientStreaming = client
|
|
.channel()
|
|
?.methodStub(
|
|
'pw.rpc.test1.TheTestService.SomeClientStreaming'
|
|
)! as ClientStreamingMethodStub;
|
|
});
|
|
|
|
it('non-blocking call', () => {
|
|
const testResponse = newResponse('-.-');
|
|
|
|
for (let i = 0; i < 3; i++) {
|
|
const onNext = jasmine.createSpy();
|
|
const stream = clientStreaming.invoke(onNext);
|
|
expect(stream.completed).toBeFalse();
|
|
|
|
stream.send(newRequest(31));
|
|
expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
|
|
expect(sentPayload(Request).getMagicNumber()).toEqual(31);
|
|
expect(stream.completed).toBeFalse();
|
|
|
|
// Enqueue the server response to be sent after the next message.
|
|
enqueueResponse(1, clientStreaming.method, Status.OK, testResponse);
|
|
|
|
stream.send(newRequest(32));
|
|
expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
|
|
expect(sentPayload(Request).getMagicNumber()).toEqual(32);
|
|
|
|
expect(onNext).toHaveBeenCalledOnceWith(testResponse);
|
|
expect(stream.completed).toBeTrue();
|
|
expect(stream.status).toEqual(Status.OK);
|
|
expect(stream.error).toBeUndefined();
|
|
}
|
|
});
|
|
|
|
it('open', () => {
|
|
outputException = Error('Error should be ignored');
|
|
const response = newResponse('!!!');
|
|
|
|
for (let i = 0; i < 3; i++) {
|
|
enqueueResponse(1, clientStreaming.method, Status.OK, response);
|
|
|
|
const onNext = jasmine.createSpy();
|
|
const onCompleted = jasmine.createSpy();
|
|
const onError = jasmine.createSpy();
|
|
const call = clientStreaming.open(onNext, onCompleted, onError);
|
|
expect(requests).toHaveSize(0);
|
|
|
|
processEnqueuedPackets();
|
|
|
|
expect(onNext).toHaveBeenCalledWith(response);
|
|
expect(onError).not.toHaveBeenCalled();
|
|
expect(onCompleted).toHaveBeenCalledOnceWith(Status.OK);
|
|
}
|
|
});
|
|
|
|
it('blocking timeout', async () => {
|
|
try {
|
|
await clientStreaming.call([newRequest()], 10);
|
|
fail('Promise should not be resolve');
|
|
} catch (err: any) {
|
|
expect(err.timeoutMs).toEqual(10);
|
|
}
|
|
});
|
|
|
|
it('non-blocking call ended by client', () => {
|
|
const testResponse = newResponse('0.o');
|
|
for (let i = 0; i < 3; i++) {
|
|
const onNext = jasmine.createSpy();
|
|
const stream = clientStreaming.invoke(onNext);
|
|
expect(stream.completed).toBeFalse();
|
|
|
|
stream.send(newRequest(31));
|
|
expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
|
|
expect(sentPayload(Request).getMagicNumber()).toEqual(31);
|
|
expect(stream.completed).toBeFalse();
|
|
|
|
// Enqueue the server response to be sent after the next message.
|
|
enqueueResponse(1, clientStreaming.method, Status.OK, testResponse);
|
|
|
|
stream.finishAndWait();
|
|
expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM_END);
|
|
|
|
expect(onNext).toHaveBeenCalledOnceWith(testResponse);
|
|
expect(stream.completed).toBeTrue();
|
|
expect(stream.status).toEqual(Status.OK);
|
|
expect(stream.error).toBeUndefined();
|
|
}
|
|
});
|
|
|
|
it('non-blocking call cancelled', () => {
|
|
for (let i = 0; i < 3; i++) {
|
|
const stream = clientStreaming.invoke();
|
|
stream.send(newRequest());
|
|
|
|
expect(stream.cancel()).toBeTrue();
|
|
expect(lastRequest().getType()).toEqual(PacketType.CLIENT_ERROR);
|
|
expect(lastRequest().getStatus()).toEqual(Status.CANCELLED);
|
|
expect(stream.cancel()).toBeFalse();
|
|
expect(stream.completed).toBeTrue();
|
|
expect(stream.error).toEqual(Status.CANCELLED);
|
|
}
|
|
});
|
|
|
|
it('non-blocking call server error', async () => {
|
|
for (let i = 0; i < 3; i++) {
|
|
const stream = clientStreaming.invoke();
|
|
enqueueError(
|
|
1,
|
|
clientStreaming.method,
|
|
Status.INVALID_ARGUMENT,
|
|
Status.OK
|
|
);
|
|
|
|
stream.send(newRequest());
|
|
|
|
await stream
|
|
.finishAndWait()
|
|
.then(() => {
|
|
fail('Promise should not be resolved');
|
|
})
|
|
.catch(reason => {
|
|
expect(reason.status).toEqual(Status.INVALID_ARGUMENT);
|
|
});
|
|
}
|
|
});
|
|
|
|
it('non-blocking call server error after stream end', async () => {
|
|
for (let i = 0; i < 3; i++) {
|
|
const stream = clientStreaming.invoke();
|
|
// Error will be sent in response to the CLIENT_STREAM_END packet.
|
|
enqueueError(
|
|
1,
|
|
clientStreaming.method,
|
|
Status.INVALID_ARGUMENT,
|
|
Status.OK
|
|
);
|
|
|
|
await stream
|
|
.finishAndWait()
|
|
.then(() => {
|
|
fail('Promise should not be resolved');
|
|
})
|
|
.catch(reason => {
|
|
expect(reason.status).toEqual(Status.INVALID_ARGUMENT);
|
|
});
|
|
}
|
|
});
|
|
|
|
it('non-blocking call send after cancelled', () => {
|
|
const stream = clientStreaming.invoke();
|
|
expect(stream.cancel()).toBeTrue();
|
|
|
|
expect(() => stream.send(newRequest())).toThrowMatching(
|
|
error => error.status === Status.CANCELLED
|
|
);
|
|
});
|
|
|
|
it('non-blocking finish after completed', async () => {
|
|
const enqueuedResponse = newResponse('?!');
|
|
enqueueResponse(
|
|
1,
|
|
clientStreaming.method,
|
|
Status.UNAVAILABLE,
|
|
enqueuedResponse
|
|
);
|
|
|
|
const stream = clientStreaming.invoke();
|
|
const result = await stream.finishAndWait();
|
|
expect(result[1]).toEqual([enqueuedResponse]);
|
|
|
|
expect(await stream.finishAndWait()).toEqual(result);
|
|
expect(await stream.finishAndWait()).toEqual(result);
|
|
});
|
|
|
|
it('non-blocking finish after error', async () => {
|
|
enqueueError(1, clientStreaming.method, Status.UNAVAILABLE, Status.OK);
|
|
const stream = clientStreaming.invoke();
|
|
|
|
for (let i = 0; i < 3; i++) {
|
|
await stream
|
|
.finishAndWait()
|
|
.then(() => {
|
|
fail('Promise should not be resolved');
|
|
})
|
|
.catch(reason => {
|
|
expect(reason.status).toEqual(Status.UNAVAILABLE);
|
|
expect(stream.error).toEqual(Status.UNAVAILABLE);
|
|
expect(stream.response).toBeUndefined();
|
|
});
|
|
}
|
|
});
|
|
|
|
it('non-blocking duplicate calls first is cancelled', () => {
|
|
const firstCall = clientStreaming.invoke();
|
|
expect(firstCall.completed).toBeFalse();
|
|
|
|
const secondCall = clientStreaming.invoke();
|
|
expect(firstCall.error).toEqual(Status.CANCELLED);
|
|
expect(secondCall.completed).toBeFalse();
|
|
});
|
|
});
|
|
|
|
describe('BidirectionalStreaming', () => {
|
|
let bidiStreaming: BidirectionalStreamingMethodStub;
|
|
|
|
beforeEach(async () => {
|
|
bidiStreaming = client
|
|
.channel()
|
|
?.methodStub(
|
|
'pw.rpc.test1.TheTestService.SomeBidiStreaming'
|
|
)! as BidirectionalStreamingMethodStub;
|
|
});
|
|
|
|
it('blocking call', async () => {
|
|
const testRequests = [newRequest(123), newRequest(456)];
|
|
|
|
sendResponsesAfterPackets = 3;
|
|
enqueueResponse(1, bidiStreaming.method, Status.NOT_FOUND);
|
|
|
|
const results = await bidiStreaming.call(testRequests);
|
|
expect(results[0]).toEqual(Status.NOT_FOUND);
|
|
expect(results[1]).toEqual([]);
|
|
});
|
|
|
|
it('blocking server error', async () => {
|
|
const testRequests = [newRequest(123)];
|
|
enqueueError(1, bidiStreaming.method, Status.NOT_FOUND, Status.OK);
|
|
|
|
await bidiStreaming
|
|
.call(testRequests)
|
|
.then(() => {
|
|
fail('Promise should not be resolved');
|
|
})
|
|
.catch(reason => {
|
|
expect(reason.status).toEqual(Status.NOT_FOUND);
|
|
});
|
|
});
|
|
|
|
it('non-blocking call', () => {
|
|
const rep1 = newResponse('!!!');
|
|
const rep2 = newResponse('?');
|
|
|
|
for (let i = 0; i < 3; i++) {
|
|
const testResponses: Array<Message> = [];
|
|
const stream = bidiStreaming.invoke(response => {
|
|
testResponses.push(response);
|
|
});
|
|
expect(stream.completed).toBeFalse();
|
|
|
|
stream.send(newRequest(55));
|
|
expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
|
|
expect(sentPayload(Request).getMagicNumber()).toEqual(55);
|
|
expect(stream.completed).toBeFalse();
|
|
expect(testResponses).toEqual([]);
|
|
|
|
enqueueServerStream(1, bidiStreaming.method, rep1);
|
|
enqueueServerStream(1, bidiStreaming.method, rep2);
|
|
|
|
stream.send(newRequest(66));
|
|
expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
|
|
expect(sentPayload(Request).getMagicNumber()).toEqual(66);
|
|
expect(stream.completed).toBeFalse();
|
|
expect(testResponses).toEqual([rep1, rep2]);
|
|
|
|
enqueueResponse(1, bidiStreaming.method, Status.OK);
|
|
|
|
stream.send(newRequest(77));
|
|
expect(stream.completed).toBeTrue();
|
|
expect(testResponses).toEqual([rep1, rep2]);
|
|
expect(stream.status).toEqual(Status.OK);
|
|
expect(stream.error).toBeUndefined();
|
|
}
|
|
});
|
|
|
|
it('open', () => {
|
|
outputException = Error('Error should be ignored');
|
|
const response1 = newResponse('!!!');
|
|
const response2 = newResponse('?');
|
|
|
|
for (let i = 0; i < 3; i++) {
|
|
enqueueServerStream(1, bidiStreaming.method, response1);
|
|
enqueueServerStream(1, bidiStreaming.method, response2);
|
|
enqueueResponse(1, bidiStreaming.method, Status.OK);
|
|
|
|
const onNext = jasmine.createSpy();
|
|
const onCompleted = jasmine.createSpy();
|
|
const onError = jasmine.createSpy();
|
|
const call = bidiStreaming.open(onNext, onCompleted, onError);
|
|
expect(requests).toHaveSize(0);
|
|
|
|
processEnqueuedPackets();
|
|
|
|
expect(onNext).toHaveBeenCalledWith(response1);
|
|
expect(onNext).toHaveBeenCalledWith(response2);
|
|
expect(onError).not.toHaveBeenCalled();
|
|
expect(onCompleted).toHaveBeenCalledOnceWith(Status.OK);
|
|
}
|
|
});
|
|
|
|
it('blocking timeout', async () => {
|
|
try {
|
|
await bidiStreaming.call([newRequest()], 10);
|
|
fail('Promise should not be resolve');
|
|
} catch (err: any) {
|
|
expect(err.timeoutMs).toEqual(10);
|
|
}
|
|
});
|
|
|
|
it('non-blocking server error', async () => {
|
|
const response = newResponse('!!!');
|
|
|
|
for (let i = 0; i < 3; i++) {
|
|
const testResponses: Array<Message> = [];
|
|
const stream = bidiStreaming.invoke(response => {
|
|
testResponses.push(response);
|
|
});
|
|
expect(stream.completed).toBeFalse();
|
|
|
|
enqueueServerStream(1, bidiStreaming.method, response);
|
|
|
|
stream.send(newRequest(55));
|
|
expect(stream.completed).toBeFalse();
|
|
expect(testResponses).toEqual([response]);
|
|
|
|
enqueueError(1, bidiStreaming.method, Status.OUT_OF_RANGE, Status.OK);
|
|
|
|
stream.send(newRequest(999));
|
|
expect(stream.completed).toBeTrue();
|
|
expect(testResponses).toEqual([response]);
|
|
expect(stream.status).toBeUndefined();
|
|
expect(stream.error).toEqual(Status.OUT_OF_RANGE);
|
|
|
|
await stream
|
|
.finishAndWait()
|
|
.then(() => {
|
|
fail('Promise should not be resolved');
|
|
})
|
|
.catch(reason => {
|
|
expect(reason.status).toEqual(Status.OUT_OF_RANGE);
|
|
});
|
|
}
|
|
});
|
|
it('non-blocking server error after stream end', async () => {
|
|
for (let i = 0; i < 3; i++) {
|
|
const stream = bidiStreaming.invoke();
|
|
|
|
// Error is sent in response to CLIENT_STREAM_END packet.
|
|
enqueueError(
|
|
1,
|
|
bidiStreaming.method,
|
|
Status.INVALID_ARGUMENT,
|
|
Status.OK
|
|
);
|
|
|
|
await stream
|
|
.finishAndWait()
|
|
.then(() => {
|
|
fail('Promise should not be resolved');
|
|
})
|
|
.catch(reason => {
|
|
expect(reason.status).toEqual(Status.INVALID_ARGUMENT);
|
|
});
|
|
}
|
|
});
|
|
|
|
it('non-blocking send after cancelled', async () => {
|
|
const stream = bidiStreaming.invoke();
|
|
expect(stream.cancel()).toBeTrue();
|
|
|
|
try {
|
|
stream.send(newRequest());
|
|
fail('send should have failed');
|
|
} catch (e: any) {
|
|
expect(e.status).toBe(Status.CANCELLED);
|
|
}
|
|
});
|
|
|
|
it('non-blocking finish after completed', async () => {
|
|
const response = newResponse('!?');
|
|
enqueueServerStream(1, bidiStreaming.method, response);
|
|
enqueueResponse(1, bidiStreaming.method, Status.UNAVAILABLE);
|
|
|
|
const stream = bidiStreaming.invoke();
|
|
const result = await stream.finishAndWait();
|
|
expect(result[1]).toEqual([response]);
|
|
|
|
expect(await stream.finishAndWait()).toEqual(result);
|
|
expect(await stream.finishAndWait()).toEqual(result);
|
|
});
|
|
|
|
it('non-blocking finish after error', async () => {
|
|
const response = newResponse('!?');
|
|
enqueueServerStream(1, bidiStreaming.method, response);
|
|
enqueueError(1, bidiStreaming.method, Status.UNAVAILABLE, Status.OK);
|
|
|
|
const stream = bidiStreaming.invoke();
|
|
|
|
for (let i = 0; i < 3; i++) {
|
|
await stream
|
|
.finishAndWait()
|
|
.then(() => {
|
|
fail('Promise should not be resolved');
|
|
})
|
|
.catch(reason => {
|
|
expect(reason.status).toEqual(Status.UNAVAILABLE);
|
|
expect(stream.error).toEqual(Status.UNAVAILABLE);
|
|
});
|
|
}
|
|
});
|
|
it('non-blocking duplicate calls first is cancelled', () => {
|
|
const firstCall = bidiStreaming.invoke();
|
|
expect(firstCall.completed).toBeFalse();
|
|
const secondCall = bidiStreaming.invoke();
|
|
expect(firstCall.error).toEqual(Status.CANCELLED);
|
|
expect(secondCall.completed).toBeFalse();
|
|
});
|
|
});
|
|
});
|