Skip to content

Commit

Permalink
Update and reenable TelemetryLogWriter unit tests.
Browse files Browse the repository at this point in the history
This was originally postponed to make the branch deadline.

Bug: 968640
Change-Id: I5066bd1f161e25428f872819bfc92bb16f4c7c35
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1668070
Commit-Queue: Erik Jensen <rkjnsn@chromium.org>
Reviewed-by: Yuwei Huang <yuweih@chromium.org>
Reviewed-by: Joe Downing <joedow@chromium.org>
Cr-Commit-Position: refs/heads/master@{#792002}
  • Loading branch information
Erik Jensen authored and Commit Bot committed Jul 27, 2020
1 parent 3e029cf commit 1ea0101
Show file tree
Hide file tree
Showing 20 changed files with 316 additions and 205 deletions.
5 changes: 1 addition & 4 deletions remoting/base/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,7 @@ source_set("unit_tests") {
"run_all_unittests.cc",
"running_samples_unittest.cc",
"session_options_unittest.cc",

# TODO(https://crbug.com/968640): Fix and reenable tests once we have some
# breathing room.
# "telemetry_log_writer_unittest.cc",
"telemetry_log_writer_unittest.cc",
"typed_buffer_unittest.cc",
"util_unittest.cc",
"weighted_samples_unittest.cc",
Expand Down
4 changes: 2 additions & 2 deletions remoting/base/grpc_support/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class MyClass {
// Requests will be silently dropped once the executor is destroyed, so it's
// safe to bind with raw pointers.
auto grpc_request = CreateGrpcAsyncUnaryRequest(
base::BindOnce(&HelloService::Stub::AsyncSayHello,
base::BindOnce(&HelloService::StubInterface::AsyncSayHello,
base::Unretained(stub_.get())),
request,
base::BindOnce(&MyClass::OnHelloResult,
Expand All @@ -37,7 +37,7 @@ class MyClass {
void StartHelloStream() {
StreamHelloRequest request;
auto grpc_Request = CreateGrpcAsyncServerStreamingRequest(
base::BindOnce(&HelloService::Stub::AsyncStreamHello,
base::BindOnce(&HelloService::StubInterface::AsyncStreamHello,
base::Unretained(stub_.get())),
request,
base::BindRepeating(&MyClass::OnHelloStreamMessage,
Expand Down
18 changes: 10 additions & 8 deletions remoting/base/grpc_support/grpc_async_executor_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ void GrpcAsyncExecutorTest::AsyncSendText(
EchoRequest request;
request.set_text(text);
auto grpc_request = CreateGrpcAsyncUnaryRequest(
base::BindOnce(&GrpcAsyncExecutorTestService::Stub::AsyncEcho,
base::BindOnce(&GrpcAsyncExecutorTestService::StubInterface::AsyncEcho,
base::Unretained(stub_.get())),
request, std::move(callback));
executor_->ExecuteRpc(std::move(grpc_request));
Expand All @@ -143,8 +143,9 @@ GrpcAsyncExecutorTest::StartEchoStreamOnExecutor(
request.set_text(request_text);
std::unique_ptr<ScopedGrpcServerStream> scoped_stream;
auto grpc_request = CreateGrpcAsyncServerStreamingRequest(
base::BindOnce(&GrpcAsyncExecutorTestService::Stub::AsyncStreamEcho,
base::Unretained(stub_.get())),
base::BindOnce(
&GrpcAsyncExecutorTestService::StubInterface::AsyncStreamEcho,
base::Unretained(stub_.get())),
request, std::move(on_channel_ready), on_incoming_msg,
std::move(on_channel_closed), &scoped_stream);
if (!deadline.is_null()) {
Expand Down Expand Up @@ -286,7 +287,7 @@ TEST_F(GrpcAsyncExecutorTest, UnaryRpcCanceledBeforeExecution) {
EchoRequest request;
request.set_text("Hello 1");
auto grpc_request = CreateGrpcAsyncUnaryRequest(
base::BindOnce(&GrpcAsyncExecutorTestService::Stub::AsyncEcho,
base::BindOnce(&GrpcAsyncExecutorTestService::StubInterface::AsyncEcho,
base::Unretained(stub_.get())),
request, base::BindOnce([](const grpc::Status&, const EchoResponse&) {
NOTREACHED();
Expand Down Expand Up @@ -335,8 +336,9 @@ TEST_F(GrpcAsyncExecutorTest, ServerStreamingRpcCanceledBeforeExecution) {
request.set_text("Hello 1");
std::unique_ptr<ScopedGrpcServerStream> scoped_stream_1;
auto grpc_request = CreateGrpcAsyncServerStreamingRequest(
base::BindOnce(&GrpcAsyncExecutorTestService::Stub::AsyncStreamEcho,
base::Unretained(stub_.get())),
base::BindOnce(
&GrpcAsyncExecutorTestService::StubInterface::AsyncStreamEcho,
base::Unretained(stub_.get())),
request, NotReachedClosure(), NotReachedStreamingCallback(),
NotReachedStatusCallback(), &scoped_stream_1);
scoped_stream_1.reset();
Expand Down Expand Up @@ -591,7 +593,7 @@ TEST_F(GrpcAsyncExecutorTest, StreamWithTwoExecutors_VerifyNoInterference) {
TEST_F(GrpcAsyncExecutorTest, ExecuteWithoutDeadline_DefaultDeadlineSet) {
EchoRequest request;
auto grpc_request = CreateGrpcAsyncUnaryRequest(
base::BindOnce(&GrpcAsyncExecutorTestService::Stub::AsyncEcho,
base::BindOnce(&GrpcAsyncExecutorTestService::StubInterface::AsyncEcho,
base::Unretained(stub_.get())),
request, base::BindOnce([](const grpc::Status&, const EchoResponse&) {
NOTREACHED();
Expand All @@ -610,7 +612,7 @@ TEST_F(GrpcAsyncExecutorTest, ExecuteWithDeadline_DeadlineNotChanged) {
constexpr base::TimeDelta kDeadlineEpsilon = base::TimeDelta::FromSeconds(1);
EchoRequest request;
auto grpc_request = CreateGrpcAsyncUnaryRequest(
base::BindOnce(&GrpcAsyncExecutorTestService::Stub::AsyncEcho,
base::BindOnce(&GrpcAsyncExecutorTestService::StubInterface::AsyncEcho,
base::Unretained(stub_.get())),
request, base::BindOnce([](const grpc::Status&, const EchoResponse&) {
NOTREACHED();
Expand Down
18 changes: 8 additions & 10 deletions remoting/base/grpc_support/grpc_async_server_streaming_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
namespace remoting {

template <typename RequestType, typename ResponseType>
using GrpcAsyncServerStreamingRpcFunction =
base::OnceCallback<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
grpc::ClientContext*,
const RequestType&,
grpc_impl::CompletionQueue*,
void*)>;
using GrpcAsyncServerStreamingRpcFunction = base::OnceCallback<std::unique_ptr<
grpc::ClientAsyncReaderInterface<ResponseType>>(grpc::ClientContext*,
const RequestType&,
grpc_impl::CompletionQueue*,
void*)>;

// GrpcAsyncRequest implementation for server streaming call. The object is
// first enqueued for starting the stream, then kept being re-enqueued to
Expand Down Expand Up @@ -112,9 +111,8 @@ class GrpcAsyncServerStreamingRequest
using OnIncomingMessageCallback =
base::RepeatingCallback<void(const ResponseType&)>;
using StartAndCreateReaderCallback =
base::OnceCallback<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
grpc_impl::CompletionQueue* cq,
void* event_tag)>;
base::OnceCallback<std::unique_ptr<grpc::ClientAsyncReaderInterface<
ResponseType>>(grpc_impl::CompletionQueue* cq, void* event_tag)>;

~GrpcAsyncServerStreamingRequest() override = default;

Expand Down Expand Up @@ -177,7 +175,7 @@ class GrpcAsyncServerStreamingRequest

StartAndCreateReaderCallback create_reader_callback_;
ResponseType response_;
std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> reader_;
std::unique_ptr<grpc::ClientAsyncReaderInterface<ResponseType>> reader_;
OnIncomingMessageCallback on_incoming_msg_;

DISALLOW_COPY_AND_ASSIGN(GrpcAsyncServerStreamingRequest);
Expand Down
6 changes: 3 additions & 3 deletions remoting/base/grpc_support/grpc_async_unary_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace remoting {

template <typename RequestType, typename ResponseType>
using GrpcAsyncUnaryRpcFunction = base::OnceCallback<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<ResponseType>>(
grpc::ClientContext*,
const RequestType&,
grpc_impl::CompletionQueue*)>;
Expand All @@ -36,7 +36,7 @@ template <typename ResponseType>
class GrpcAsyncUnaryRequest : public GrpcAsyncRequest {
public:
using StartAndCreateReaderCallback = base::OnceCallback<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<ResponseType>>(
grpc_impl::CompletionQueue*)>;

~GrpcAsyncUnaryRequest() override = default;
Expand Down Expand Up @@ -101,7 +101,7 @@ class GrpcAsyncUnaryRequest : public GrpcAsyncRequest {

StartAndCreateReaderCallback create_reader_cb_;
RunTaskCallback run_task_cb_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<ResponseType>>
response_reader_;
ResponseType response_;
GrpcAsyncUnaryRpcCallback<ResponseType> callback_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ TEST_F(GrpcAuthenticatedExecutorTest, VerifyExecuteRpcCallIsForwarded) {
auto request = CreateGrpcAsyncUnaryRequest(
base::BindOnce([](grpc::ClientContext*, const EchoRequest&,
grpc_impl::CompletionQueue*) {
return std::unique_ptr<grpc::ClientAsyncResponseReader<EchoResponse>>();
return std::unique_ptr<
grpc::ClientAsyncResponseReaderInterface<EchoResponse>>();
}),
EchoRequest(),
base::DoNothing::Once<const grpc::Status&, const EchoResponse&>());
Expand All @@ -96,7 +97,8 @@ TEST_F(GrpcAuthenticatedExecutorTest, CancelAuthenticatingRpcAndSendNewOne) {
auto request = CreateGrpcAsyncUnaryRequest(
base::BindOnce([](grpc::ClientContext*, const EchoRequest&,
grpc_impl::CompletionQueue*) {
return std::unique_ptr<grpc::ClientAsyncResponseReader<EchoResponse>>();
return std::unique_ptr<
grpc::ClientAsyncResponseReaderInterface<EchoResponse>>();
}),
EchoRequest(),
base::DoNothing::Once<const grpc::Status&, const EchoResponse&>());
Expand All @@ -123,7 +125,8 @@ TEST_F(GrpcAuthenticatedExecutorTest, CancelAuthenticatingRpcAndSendNewOne) {
request = CreateGrpcAsyncUnaryRequest(
base::BindOnce([](grpc::ClientContext*, const EchoRequest&,
grpc_impl::CompletionQueue*) {
return std::unique_ptr<grpc::ClientAsyncResponseReader<EchoResponse>>();
return std::unique_ptr<
grpc::ClientAsyncResponseReaderInterface<EchoResponse>>();
}),
EchoRequest(),
base::DoNothing::Once<const grpc::Status&, const EchoResponse&>());
Expand Down
1 change: 1 addition & 0 deletions remoting/base/grpc_test_support/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ source_set("grpc_test_support") {
testonly = true

sources = [
"fake_client_async_response_reader.h",
"grpc_async_test_server.cc",
"grpc_async_test_server.h",
"grpc_test_server.h",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef REMOTING_BASE_GRPC_TEST_SUPPORT_FAKE_CLIENT_ASYNC_RESPONSE_READER_H_
#define REMOTING_BASE_GRPC_TEST_SUPPORT_FAKE_CLIENT_ASYNC_RESPONSE_READER_H_

#include <utility>

#include "base/callback.h"
#include "third_party/grpc/src/include/grpc/support/time.h"
#include "third_party/grpc/src/include/grpcpp/alarm.h"
#include "third_party/grpc/src/include/grpcpp/completion_queue.h"
#include "third_party/grpc/src/include/grpcpp/support/async_unary_call.h"
#include "third_party/grpc/src/include/grpcpp/support/status.h"

namespace remoting {
namespace test {

// Converts asynchronous stub calls to synchronous stub calls. Useful when
// creating mock StubInterface implementations: only the synchronous ops need to
// be mocked, while the async ops can return an instance of this class.
template <typename Response>
class FakeClientAsyncResponseReader
: public grpc::ClientAsyncResponseReaderInterface<Response> {
public:
using SynchronousOp = base::OnceCallback<grpc::Status(Response* response)>;
FakeClientAsyncResponseReader(SynchronousOp synchronous_op,
grpc::CompletionQueue* completion_queue,
bool start)
: synchronous_op_(std::move(synchronous_op)),
completion_queue_(completion_queue),
started_(start) {}

~FakeClientAsyncResponseReader() override = default;

void StartCall() override {
DCHECK(!started_);
started_ = true;
}

void ReadInitialMetadata(void* tag) override {
alarm_.Set(completion_queue_, gpr_now(GPR_CLOCK_MONOTONIC), tag);
}

void Finish(Response* msg, grpc::Status* status, void* tag) override {
DCHECK(started_);
*status = std::move(synchronous_op_).Run(msg);
alarm_.Set(completion_queue_, gpr_now(GPR_CLOCK_MONOTONIC), tag);
}

private:
SynchronousOp synchronous_op_;
grpc::CompletionQueue* completion_queue_;
grpc::Alarm alarm_;
bool started_;
};

} // namespace test
} // namespace remoting

#endif // REMOTING_BASE_GRPC_TEST_SUPPORT_FAKE_CLIENT_ASYNC_RESPONSE_READER_H_
29 changes: 22 additions & 7 deletions remoting/base/telemetry_log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@ const int kMaxSendAttempts = 5;

TelemetryLogWriter::TelemetryLogWriter(
std::unique_ptr<OAuthTokenGetter> token_getter)
: token_getter_(std::move(token_getter)),
stub_(apis::v1::RemotingTelemetryService::NewStub(
CreateSslChannelForEndpoint(
ServiceUrls::GetInstance()->remoting_server_endpoint()))),
executor_(token_getter_.get()),
backoff_(&kBackoffPolicy) {
: TelemetryLogWriter(
std::move(token_getter),
apis::v1::RemotingTelemetryService::NewStub(
CreateSslChannelForEndpoint(
ServiceUrls::GetInstance()->remoting_server_endpoint()))) {
DETACH_FROM_THREAD(thread_checker_);
DCHECK(token_getter_);
}
Expand All @@ -72,6 +71,18 @@ void TelemetryLogWriter::Log(const ChromotingEvent& entry) {
SendPendingEntries();
}

TelemetryLogWriter::TelemetryLogWriter(
std::unique_ptr<OAuthTokenGetter> token_getter,
std::unique_ptr<apis::v1::RemotingTelemetryService::StubInterface> stub)
: token_getter_(std::move(token_getter)),
stub_(std::move(stub)),
executor_(token_getter_.get()),
backoff_(&kBackoffPolicy) {
DETACH_FROM_THREAD(thread_checker_);
DCHECK(token_getter_);
DCHECK(stub_);
}

void TelemetryLogWriter::SendPendingEntries() {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
if (!sending_entries_.empty() || pending_entries_.empty()) {
Expand All @@ -98,7 +109,7 @@ void TelemetryLogWriter::SendPendingEntries() {
void TelemetryLogWriter::DoSend(apis::v1::CreateEventRequest request) {
executor_.ExecuteRpc(CreateGrpcAsyncUnaryRequest(
base::BindOnce(
&apis::v1::RemotingTelemetryService::Stub::AsyncCreateEvent,
&apis::v1::RemotingTelemetryService::StubInterface::AsyncCreateEvent,
base::Unretained(stub_.get())),
request,
base::BindOnce(&TelemetryLogWriter::OnSendLogResult,
Expand Down Expand Up @@ -130,4 +141,8 @@ void TelemetryLogWriter::OnSendLogResult(
SendPendingEntries();
}

bool TelemetryLogWriter::IsIdleForTesting() {
return sending_entries_.empty() && pending_entries_.empty();
}

} // namespace remoting
13 changes: 11 additions & 2 deletions remoting/base/telemetry_log_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,31 @@ namespace remoting {
// unless otherwise noted.
class TelemetryLogWriter : public ChromotingEventLogWriter {
public:
TelemetryLogWriter(std::unique_ptr<OAuthTokenGetter> token_getter);
explicit TelemetryLogWriter(std::unique_ptr<OAuthTokenGetter> token_getter);

~TelemetryLogWriter() override;

// Push the log entry to the pending list and send out all the pending logs.
void Log(const ChromotingEvent& entry) override;

private:
// Used by unit tests to provide custom stub.
TelemetryLogWriter(
std::unique_ptr<OAuthTokenGetter> token_getter,
std::unique_ptr<apis::v1::RemotingTelemetryService::StubInterface> stub);

void SendPendingEntries();
void DoSend(apis::v1::CreateEventRequest request);
void OnSendLogResult(const grpc::Status& status,
const apis::v1::CreateEventResponse& response);

// Returns true if there are no events sending or pending.
bool IsIdleForTesting();

THREAD_CHECKER(thread_checker_);

std::unique_ptr<OAuthTokenGetter> token_getter_;
std::unique_ptr<apis::v1::RemotingTelemetryService::Stub> stub_;
std::unique_ptr<apis::v1::RemotingTelemetryService::StubInterface> stub_;
GrpcAuthenticatedExecutor executor_;
net::BackoffEntry backoff_;
base::OneShotTimer backoff_timer_;
Expand All @@ -60,6 +68,7 @@ class TelemetryLogWriter : public ChromotingEventLogWriter {
// These will be pushed back to pending_entries if error occurs.
base::circular_deque<ChromotingEvent> sending_entries_;

friend class TelemetryLogWriterTest;
DISALLOW_COPY_AND_ASSIGN(TelemetryLogWriter);
};

Expand Down
Loading

0 comments on commit 1ea0101

Please sign in to comment.