From 92fa1782f383da45ddcd87aa68a1047ee4c76259 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Sch=C3=BCtte?= Date: Thu, 1 Feb 2018 22:31:33 +0100 Subject: [PATCH] Implement RetryStrategies and use for AddTrajectory (#880) --- cartographer_grpc/framework/client.h | 95 ++++++++++++------- cartographer_grpc/framework/retry.cc | 90 ++++++++++++++++++ cartographer_grpc/framework/retry.h | 51 ++++++++++ cartographer_grpc/mapping/map_builder_stub.cc | 5 +- 4 files changed, 207 insertions(+), 34 deletions(-) create mode 100644 cartographer_grpc/framework/retry.cc create mode 100644 cartographer_grpc/framework/retry.h diff --git a/cartographer_grpc/framework/client.h b/cartographer_grpc/framework/client.h index 3057073..52b3c63 100644 --- a/cartographer_grpc/framework/client.h +++ b/cartographer_grpc/framework/client.h @@ -17,6 +17,7 @@ #ifndef CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_H #define CARTOGRAPHER_GRPC_FRAMEWORK_CLIENT_H +#include "cartographer_grpc/framework/retry.h" #include "cartographer_grpc/framework/rpc_handler_interface.h" #include "cartographer_grpc/framework/type_traits.h" #include "grpc++/grpc++.h" @@ -29,8 +30,26 @@ namespace framework { template class Client { public: + Client(std::shared_ptr channel, RetryStrategy retry_strategy) + : channel_(channel), + client_context_( + cartographer::common::make_unique()), + rpc_method_name_( + RpcHandlerInterface::Instantiate()->method_name()), + rpc_method_(rpc_method_name_.c_str(), + RpcType::value, + channel_), + retry_strategy_(retry_strategy) { + CHECK(!retry_strategy || + rpc_method_.method_type() == ::grpc::internal::RpcMethod::NORMAL_RPC) + << "Retry is currently only support for NORMAL_RPC."; + } + Client(std::shared_ptr channel) : channel_(channel), + client_context_( + cartographer::common::make_unique()), rpc_method_name_( RpcHandlerInterface::Instantiate()->method_name()), rpc_method_(rpc_method_name_.c_str(), @@ -38,7 +57,7 @@ class Client { typename RpcHandlerType::OutgoingType>::value, channel_) {} - bool Read(typename RpcHandlerType::ResponseType* response) { + bool Read(typename RpcHandlerType::ResponseType *response) { switch (rpc_method_.method_type()) { case grpc::internal::RpcMethod::BIDI_STREAMING: InstantiateClientReaderWriterIfNeeded(); @@ -51,21 +70,11 @@ class Client { } } - bool Write(const typename RpcHandlerType::RequestType& request) { - switch (rpc_method_.method_type()) { - case grpc::internal::RpcMethod::NORMAL_RPC: - return MakeBlockingUnaryCall(request, &response_).ok(); - case grpc::internal::RpcMethod::CLIENT_STREAMING: - InstantiateClientWriterIfNeeded(); - return client_writer_->Write(request); - case grpc::internal::RpcMethod::BIDI_STREAMING: - InstantiateClientReaderWriterIfNeeded(); - return client_reader_writer_->Write(request); - case grpc::internal::RpcMethod::SERVER_STREAMING: - InstantiateClientReader(request); - return true; - } - LOG(FATAL) << "Not reached."; + bool Write(const typename RpcHandlerType::RequestType &request) { + return RetryWithStrategy( + retry_strategy_, + std::bind(&Client::WriteImpl, this, request), + std::bind(&Client::Reset, this)); } bool WritesDone() { @@ -97,7 +106,7 @@ class Client { } } - const typename RpcHandlerType::ResponseType& response() { + const typename RpcHandlerType::ResponseType &response() { CHECK(rpc_method_.method_type() == grpc::internal::RpcMethod::NORMAL_RPC || rpc_method_.method_type() == grpc::internal::RpcMethod::CLIENT_STREAMING); @@ -105,16 +114,36 @@ class Client { } private: + void Reset() { + client_context_ = cartographer::common::make_unique(); + } + + bool WriteImpl(const typename RpcHandlerType::RequestType &request) { + switch (rpc_method_.method_type()) { + case grpc::internal::RpcMethod::NORMAL_RPC: + return MakeBlockingUnaryCall(request, &response_).ok(); + case grpc::internal::RpcMethod::CLIENT_STREAMING: + InstantiateClientWriterIfNeeded(); + return client_writer_->Write(request); + case grpc::internal::RpcMethod::BIDI_STREAMING: + InstantiateClientReaderWriterIfNeeded(); + return client_reader_writer_->Write(request); + case grpc::internal::RpcMethod::SERVER_STREAMING: + InstantiateClientReader(request); + return true; + } + LOG(FATAL) << "Not reached."; + } + void InstantiateClientWriterIfNeeded() { CHECK_EQ(rpc_method_.method_type(), grpc::internal::RpcMethod::CLIENT_STREAMING); if (!client_writer_) { client_writer_.reset( - grpc::internal::ClientWriterFactory< - typename RpcHandlerType::RequestType>::Create(channel_.get(), - rpc_method_, - &client_context_, - &response_)); + grpc::internal:: + ClientWriterFactory::Create( + channel_.get(), rpc_method_, client_context_.get(), + &response_)); } } @@ -127,32 +156,31 @@ class Client { typename RpcHandlerType::RequestType, typename RpcHandlerType::ResponseType>::Create(channel_.get(), rpc_method_, - &client_context_)); + client_context_ + .get())); } } void InstantiateClientReader( - const typename RpcHandlerType::RequestType& request) { + const typename RpcHandlerType::RequestType &request) { CHECK_EQ(rpc_method_.method_type(), grpc::internal::RpcMethod::SERVER_STREAMING); client_reader_.reset( - grpc::internal::ClientReaderFactory< - typename RpcHandlerType::ResponseType>::Create(channel_.get(), - rpc_method_, - &client_context_, - request)); + grpc::internal:: + ClientReaderFactory::Create( + channel_.get(), rpc_method_, client_context_.get(), request)); } grpc::Status MakeBlockingUnaryCall( - const typename RpcHandlerType::RequestType& request, - typename RpcHandlerType::ResponseType* response) { + const typename RpcHandlerType::RequestType &request, + typename RpcHandlerType::ResponseType *response) { CHECK_EQ(rpc_method_.method_type(), grpc::internal::RpcMethod::NORMAL_RPC); return ::grpc::internal::BlockingUnaryCall( - channel_.get(), rpc_method_, &client_context_, request, response); + channel_.get(), rpc_method_, client_context_.get(), request, response); } std::shared_ptr channel_; - grpc::ClientContext client_context_; + std::unique_ptr client_context_; const std::string rpc_method_name_; const ::grpc::internal::RpcMethod rpc_method_; @@ -165,6 +193,7 @@ class Client { std::unique_ptr> client_reader_; typename RpcHandlerType::ResponseType response_; + RetryStrategy retry_strategy_; }; } // namespace framework diff --git a/cartographer_grpc/framework/retry.cc b/cartographer_grpc/framework/retry.cc new file mode 100644 index 0000000..dc962fe --- /dev/null +++ b/cartographer_grpc/framework/retry.cc @@ -0,0 +1,90 @@ +/* + * Copyright 2018 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "cartographer_grpc/framework/retry.h" +#include "glog/logging.h" + +namespace cartographer_grpc { +namespace framework { + +RetryStrategy CreateRetryStrategy(RetryIndicator retry_indicator, + RetryDelayCalculator retry_delay_calculator) { + return [retry_indicator, retry_delay_calculator](int failed_attempts) { + if (!retry_indicator(failed_attempts)) { + return optional(); + } + return optional(retry_delay_calculator(failed_attempts)); + }; +} + +RetryIndicator CreateLimitedRetryIndicator(int max_attempts) { + return [max_attempts](int failed_attempts) { + return failed_attempts < max_attempts; + }; +} + +RetryDelayCalculator CreateBackoffDelayCalculator(Duration min_delay, + float backoff_factor) { + return [min_delay, backoff_factor](int failed_attempts) -> Duration { + CHECK_GE(failed_attempts, 0); + using cartographer::common::FromSeconds; + using cartographer::common::ToSeconds; + return FromSeconds(std::pow(backoff_factor, failed_attempts - 1) * + ToSeconds(min_delay)); + }; +} + +RetryStrategy CreateLimitedBackoffStrategy(Duration min_delay, + float backoff_factor, + int max_attempts) { + return CreateRetryStrategy( + CreateLimitedRetryIndicator(max_attempts), + CreateBackoffDelayCalculator(min_delay, backoff_factor)); +} + +bool RetryWithStrategy(RetryStrategy retry_strategy, std::function op, + std::function reset) { + optional delay; + int failed_attemps = 0; + for (;;) { + if (op()) { + return true; + } + if (!retry_strategy) { + return false; + } + delay = retry_strategy(++failed_attemps); + if (!delay.has_value()) { + break; + } + LOG(INFO) << "Retrying after " + << std::chrono::duration_cast( + delay.value()) + .count() + << " milliseconds."; + std::this_thread::sleep_for(delay.value()); + if (reset) { + reset(); + } + } + return false; +} + +} // namespace framework +} // namespace cartographer_grpc diff --git a/cartographer_grpc/framework/retry.h b/cartographer_grpc/framework/retry.h new file mode 100644 index 0000000..33182af --- /dev/null +++ b/cartographer_grpc/framework/retry.h @@ -0,0 +1,51 @@ +/* + * Copyright 2018 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CARTOGRAPHER_GRPC_FRAMEWORK_RETRY_H +#define CARTOGRAPHER_GRPC_FRAMEWORK_RETRY_H + +#include "cartographer/common/optional.h" +#include "cartographer/common/time.h" +#include "grpc++/grpc++.h" + +namespace cartographer_grpc { +namespace framework { + +using cartographer::common::Duration; +using cartographer::common::optional; + +using RetryStrategy = + std::function(int /* failed_attempts */)>; +using RetryIndicator = std::function; +using RetryDelayCalculator = std::function; + +RetryStrategy CreateRetryStrategy(RetryIndicator retry_indicator, + RetryDelayCalculator retry_delay_calculator); + +RetryIndicator CreateLimitedRetryIndicator(int max_attempts); +RetryDelayCalculator CreateBackoffDelayCalculator(Duration min_delay, + float backoff_factor); +RetryStrategy CreateLimitedBackoffStrategy(Duration min_delay, + float backoff_factor, + int max_attempts); + +bool RetryWithStrategy(RetryStrategy retry_strategy, std::function op, + std::function reset = nullptr); + +} // namespace framework +} // namespace cartographer_grpc + +#endif // CARTOGRAPHER_GRPC_FRAMEWORK_RETRY_H diff --git a/cartographer_grpc/mapping/map_builder_stub.cc b/cartographer_grpc/mapping/map_builder_stub.cc index 52af0ef..1b021e2 100644 --- a/cartographer_grpc/mapping/map_builder_stub.cc +++ b/cartographer_grpc/mapping/map_builder_stub.cc @@ -42,7 +42,10 @@ int MapBuilderStub::AddTrajectoryBuilder( for (const auto& sensor_id : expected_sensor_ids) { *request.add_expected_sensor_ids() = sensor::ToProto(sensor_id); } - framework::Client client(client_channel_); + framework::Client client( + client_channel_, + framework::CreateLimitedBackoffStrategy( + cartographer::common::FromMilliseconds(100), 2.f, 5)); CHECK(client.Write(request)); // Construct trajectory builder stub.