From 7fa11dcde6975625e837242226ced0f427406b13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20Sch=C3=BCtte?= Date: Fri, 20 Jul 2018 00:09:44 +0200 Subject: [PATCH] Implement recovery behavior for LocalTrajectoryUploader (#1287) --- .../cloud/internal/client_server_test.cc | 54 ++++++++- .../handlers/add_trajectory_handler.cc | 13 +- .../handlers/add_trajectory_handler_test.cc | 3 +- .../internal/local_trajectory_uploader.cc | 112 +++++++++++++++--- .../internal/local_trajectory_uploader.h | 7 +- .../testing/mock_local_trajectory_uploader.h | 2 +- cartographer/common/blocking_queue.h | 12 ++ .../internal/2d/local_slam_result_2d.cc | 11 +- .../mapping/internal/submap_controller.cc | 12 +- .../mapping/internal/submap_controller.h | 4 +- 10 files changed, 201 insertions(+), 29 deletions(-) diff --git a/cartographer/cloud/internal/client_server_test.cc b/cartographer/cloud/internal/client_server_test.cc index 6a1fcbd..58c0ff8 100644 --- a/cartographer/cloud/internal/client_server_test.cc +++ b/cartographer/cloud/internal/client_server_test.cc @@ -185,10 +185,10 @@ class ClientServerTest : public ::testing::Test { } void WaitForLocalSlamResultUploads(size_t size) { - std::unique_lock lock(local_slam_result_upload_mutex_); - local_slam_result_upload_condition_.wait(lock, [&] { - return stub_->pose_graph()->GetTrajectoryNodePoses().size() >= size; - }); + while (stub_->pose_graph()->GetTrajectoryNodePoses().size() < size) { + LOG(INFO) << stub_->pose_graph()->GetTrajectoryNodePoses().size(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } } proto::MapBuilderServerOptions map_builder_server_options_; @@ -483,6 +483,52 @@ TEST_F(ClientServerTest, LocalSlam2DWithUploadingServer) { server_->Shutdown(); } +TEST_F(ClientServerTest, LocalSlam2DUplinkServerRestarting) { + InitializeRealServer(); + server_->Start(); + InitializeStub(); + InitializeRealUploadingServer(); + uploading_server_->Start(); + InitializeStubForUploadingServer(); + int trajectory_id = stub_for_uploading_server_->AddTrajectoryBuilder( + {kRangeSensorId}, trajectory_builder_options_, + local_slam_result_callback_); + TrajectoryBuilderInterface* trajectory_stub = + stub_for_uploading_server_->GetTrajectoryBuilder(trajectory_id); + const auto measurements = mapping::testing::GenerateFakeRangeMeasurements( + kTravelDistance, kDuration, kTimeStep); + + // Insert half of the measurements. + for (unsigned int i = 0; i < measurements.size() / 2; ++i) { + trajectory_stub->AddSensorData(kRangeSensorId.id, measurements.at(i)); + } + WaitForLocalSlamResults(measurements.size() / 2); + WaitForLocalSlamResultUploads(number_of_insertion_results_); + + // Simulate a cloud server restart. + LOG(INFO) << "Simulating server restart."; + constexpr int kUplinkTrajectoryId = 0; + stub_->FinishTrajectory(kUplinkTrajectoryId); + server_->Shutdown(); + server_->WaitForShutdown(); + InitializeRealServer(); + server_->Start(); + InitializeStub(); + + // Insert the second half of the measurements. + for (unsigned int i = measurements.size() / 2; i < measurements.size(); ++i) { + trajectory_stub->AddSensorData(kRangeSensorId.id, measurements.at(i)); + } + + WaitForLocalSlamResults(measurements.size() / 2); + WaitForLocalSlamResultUploads(2); + stub_for_uploading_server_->FinishTrajectory(trajectory_id); + uploading_server_->Shutdown(); + uploading_server_->WaitForShutdown(); + server_->Shutdown(); + server_->WaitForShutdown(); +} + TEST_F(ClientServerTest, LoadState) { InitializeRealServer(); server_->Start(); diff --git a/cartographer/cloud/internal/handlers/add_trajectory_handler.cc b/cartographer/cloud/internal/handlers/add_trajectory_handler.cc index a496c35..af072af 100644 --- a/cartographer/cloud/internal/handlers/add_trajectory_handler.cc +++ b/cartographer/cloud/internal/handlers/add_trajectory_handler.cc @@ -60,10 +60,15 @@ void AddTrajectoryHandler::OnRequest( // Ignore initial poses in trajectory_builder_options. trajectory_builder_options.clear_initial_trajectory_pose(); - GetContext() - ->local_trajectory_uploader() - ->AddTrajectory(request.client_id(), trajectory_id, expected_sensor_ids, - trajectory_builder_options); + if (!GetContext() + ->local_trajectory_uploader() + ->AddTrajectory(request.client_id(), trajectory_id, + expected_sensor_ids, trajectory_builder_options)) { + LOG(ERROR) << "Failed to create trajectory in uplink: " << trajectory_id; + Finish(::grpc::Status(::grpc::INTERNAL, + "Failed to create trajectory in uplink")); + return; + } } auto response = common::make_unique(); diff --git a/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc b/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc index efc5047..c6d785e 100644 --- a/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc +++ b/cartographer/cloud/internal/handlers/add_trajectory_handler_test.cc @@ -135,7 +135,8 @@ TEST_F(AddTrajectoryHandlerTest, WithLocalSlamUploader) { EXPECT_CALL(*mock_local_trajectory_uploader_, AddTrajectory(Eq("CLIENT_ID"), Eq(13), ParseSensorIds(request), Truly(testing::BuildProtoPredicateEquals( - &upstream_trajectory_builder_options)))); + &upstream_trajectory_builder_options)))) + .WillOnce(Return(13)); test_server_->SendWrite(request); EXPECT_EQ(test_server_->response().trajectory_id(), 13); } diff --git a/cartographer/cloud/internal/local_trajectory_uploader.cc b/cartographer/cloud/internal/local_trajectory_uploader.cc index 26a010c..7fbec27 100644 --- a/cartographer/cloud/internal/local_trajectory_uploader.cc +++ b/cartographer/cloud/internal/local_trajectory_uploader.cc @@ -40,7 +40,35 @@ constexpr int kConnectionTimeoutInSeconds = 10; constexpr int kTokenRefreshIntervalInSeconds = 60; const common::Duration kPopTimeout = common::FromMilliseconds(100); +// This defines the '::grpc::StatusCode's that are considered unrecoverable +// errors and hence no retries will be attempted by the client. +const std::set<::grpc::StatusCode> kUnrecoverableStatusCodes = { + ::grpc::NOT_FOUND}; + +bool IsNewSubmap(const mapping::proto::Submap& submap) { + return (submap.has_submap_2d() && submap.submap_2d().num_range_data() == 1) || + (submap.has_submap_3d() && submap.submap_3d().num_range_data() == 1); +} + class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { + public: + struct TrajectoryInfo { + TrajectoryInfo() = default; + TrajectoryInfo( + const int uplink_trajectory_id, + const std::set& expected_sensor_ids, + const mapping::proto::TrajectoryBuilderOptions& trajectory_options, + const std::string& client_id) + : uplink_trajectory_id(uplink_trajectory_id), + expected_sensor_ids(expected_sensor_ids), + trajectory_options(trajectory_options), + client_id(client_id) {} + const int uplink_trajectory_id; + const std::set expected_sensor_ids; + const mapping::proto::TrajectoryBuilderOptions trajectory_options; + const std::string client_id; + }; + public: LocalTrajectoryUploader(const std::string& uplink_server_address, int batch_size, bool enable_ssl_encryption, @@ -54,13 +82,14 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { // complete. void Shutdown() final; - void AddTrajectory( + bool AddTrajectory( const std::string& client_id, int local_trajectory_id, const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) final; void FinishTrajectory(const std::string& client_id, int local_trajectory_id) final; void EnqueueSensorData(std::unique_ptr sensor_data) final; + void TryRecovery(); SensorId GetLocalSlamResultSensorId(int local_trajectory_id) const final { return SensorId{SensorId::SensorType::LOCAL_SLAM_RESULT, @@ -73,7 +102,7 @@ class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { std::shared_ptr<::grpc::Channel> client_channel_; int batch_size_; - std::map local_to_cloud_trajectory_id_map_; + std::map local_trajectory_id_to_trajectory_info_; common::BlockingQueue> send_queue_; bool shutting_down_ = false; std::unique_ptr upload_thread_; @@ -120,6 +149,39 @@ void LocalTrajectoryUploader::Shutdown() { upload_thread_->join(); } +void LocalTrajectoryUploader::TryRecovery() { + // Wind the sensor_data_queue forward to the next new submap. + while (true) { + proto::SensorData* sensor_data = + send_queue_.PeekWithTimeout(kPopTimeout); + if (sensor_data) { + CHECK_GE(sensor_data->local_slam_result_data().submaps_size(), 0); + if (sensor_data->sensor_data_case() == + proto::SensorData::kLocalSlamResultData && + sensor_data->local_slam_result_data().submaps_size() > 0 && + IsNewSubmap(sensor_data->local_slam_result_data().submaps( + sensor_data->local_slam_result_data().submaps_size() - 1))) { + break; + } else { + send_queue_.Pop(); + } + } + } + + // Attempt to recreate the trajectories. + const auto local_trajectory_id_to_trajectory_info = + local_trajectory_id_to_trajectory_info_; + local_trajectory_id_to_trajectory_info_.clear(); + for (const auto& entry : local_trajectory_id_to_trajectory_info) { + if (!AddTrajectory(entry.second.client_id, entry.first, + entry.second.expected_sensor_ids, + entry.second.trajectory_options)) { + LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt."; + return; + } + } +} + void LocalTrajectoryUploader::ProcessSendQueue() { LOG(INFO) << "Starting uploader thread."; proto::AddSensorDataBatchRequest batch_request; @@ -143,10 +205,18 @@ void LocalTrajectoryUploader::ProcessSendQueue() { if (batch_request.sensor_data_size() == batch_size_) { async_grpc::Client client( - client_channel_, async_grpc::CreateUnlimitedConstantDelayStrategy( - common::FromSeconds(1))); - CHECK(client.Write(batch_request)); + client_channel_, + async_grpc::CreateUnlimitedConstantDelayStrategy( + common::FromSeconds(1), kUnrecoverableStatusCodes)); + if (client.Write(batch_request)) { + LOG(INFO) << "Uploaded " << batch_request.ByteSize() + << " bytes of sensor data."; + batch_request.clear_sensor_data(); + continue; + } + // Unrecoverable error occurred. Attempt recovery. batch_request.clear_sensor_data(); + TryRecovery(); } } } @@ -154,12 +224,13 @@ void LocalTrajectoryUploader::ProcessSendQueue() { void LocalTrajectoryUploader::TranslateTrajectoryId( proto::SensorMetadata* sensor_metadata) { - int cloud_trajectory_id = - local_to_cloud_trajectory_id_map_.at(sensor_metadata->trajectory_id()); + int cloud_trajectory_id = local_trajectory_id_to_trajectory_info_ + .at(sensor_metadata->trajectory_id()) + .uplink_trajectory_id; sensor_metadata->set_trajectory_id(cloud_trajectory_id); } -void LocalTrajectoryUploader::AddTrajectory( +bool LocalTrajectoryUploader::AddTrajectory( const std::string& client_id, int local_trajectory_id, const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) { @@ -176,17 +247,30 @@ void LocalTrajectoryUploader::AddTrajectory( cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id)); async_grpc::Client client(client_channel_); ::grpc::Status status; - CHECK(client.Write(request, &status)) << status.error_message(); - CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 0); - local_to_cloud_trajectory_id_map_[local_trajectory_id] = - client.response().trajectory_id(); + if (!client.Write(request, &status)) { + LOG(ERROR) << status.error_message(); + return false; + } + LOG(INFO) << "Created trajectory for client_id: " << client_id + << " local trajectory_id: " << local_trajectory_id + << " uplink trajectory_id: " << client.response().trajectory_id(); + CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id), + 0); + local_trajectory_id_to_trajectory_info_.emplace( + std::piecewise_construct, std::forward_as_tuple(local_trajectory_id), + std::forward_as_tuple(client.response().trajectory_id(), + expected_sensor_ids, trajectory_options, + client_id)); + return true; } void LocalTrajectoryUploader::FinishTrajectory(const std::string& client_id, int local_trajectory_id) { - CHECK_EQ(local_to_cloud_trajectory_id_map_.count(local_trajectory_id), 1); + CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id), + 1); int cloud_trajectory_id = - local_to_cloud_trajectory_id_map_[local_trajectory_id]; + local_trajectory_id_to_trajectory_info_.at(local_trajectory_id) + .uplink_trajectory_id; proto::FinishTrajectoryRequest request; request.set_client_id(client_id); request.set_trajectory_id(cloud_trajectory_id); diff --git a/cartographer/cloud/internal/local_trajectory_uploader.h b/cartographer/cloud/internal/local_trajectory_uploader.h index 4c4a34c..f0cb6e2 100644 --- a/cartographer/cloud/internal/local_trajectory_uploader.h +++ b/cartographer/cloud/internal/local_trajectory_uploader.h @@ -44,13 +44,16 @@ class LocalTrajectoryUploaderInterface { // Enqueue an Add*DataRequest message to be uploaded. virtual void EnqueueSensorData( std::unique_ptr sensor_data) = 0; - virtual void AddTrajectory( + + // Creates a new trajectory with the specified settings in the uplink. A + // return value of 'false' indicates that the creation failed. + virtual bool AddTrajectory( const std::string& client_id, int local_trajectory_id, const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) = 0; + virtual void FinishTrajectory(const std::string& client_id, int local_trajectory_id) = 0; - virtual SensorId GetLocalSlamResultSensorId( int local_trajectory_id) const = 0; }; diff --git a/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h b/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h index 7754d8b..af8bc98 100644 --- a/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h +++ b/cartographer/cloud/internal/testing/mock_local_trajectory_uploader.h @@ -36,7 +36,7 @@ class MockLocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { MOCK_METHOD0(Start, void()); MOCK_METHOD0(Shutdown, void()); MOCK_METHOD4(AddTrajectory, - void(const std::string &, int, const std::set &, + bool(const std::string &, int, const std::set &, const mapping::proto::TrajectoryBuilderOptions &)); MOCK_METHOD2(FinishTrajectory, void(const std::string &, int)); MOCK_CONST_METHOD1(GetLocalSlamResultSensorId, SensorId(int)); diff --git a/cartographer/common/blocking_queue.h b/cartographer/common/blocking_queue.h index 3987002..ad2a641 100644 --- a/cartographer/common/blocking_queue.h +++ b/cartographer/common/blocking_queue.h @@ -87,6 +87,18 @@ class BlockingQueue { return t; } + // Like Peek, but can timeout. Returns nullptr in this case. + template + R* PeekWithTimeout(const common::Duration timeout) { + MutexLocker lock(&mutex_); + if (!lock.AwaitWithTimeout( + [this]() REQUIRES(mutex_) { return !QueueEmptyCondition(); }, + timeout)) { + return nullptr; + } + return deque_.front().get(); + } + // Returns the next value in the queue or nullptr if the queue is empty. // Maintains ownership. This assumes a member function get() that returns // a pointer to the given type R. diff --git a/cartographer/mapping/internal/2d/local_slam_result_2d.cc b/cartographer/mapping/internal/2d/local_slam_result_2d.cc index bd7c93e..2acd48f 100644 --- a/cartographer/mapping/internal/2d/local_slam_result_2d.cc +++ b/cartographer/mapping/internal/2d/local_slam_result_2d.cc @@ -33,7 +33,16 @@ void LocalSlamResult2D::AddToPoseGraph(int trajectory_id, CHECK(local_slam_result_data_.submaps(0).has_submap_2d()); std::vector> submaps; for (const auto& submap_proto : local_slam_result_data_.submaps()) { - submaps.push_back(submap_controller_->UpdateSubmap(submap_proto)); + auto submap_ptr = submap_controller_->UpdateSubmap(submap_proto); + if (submap_ptr) { + submaps.push_back(submap_ptr); + } else { + LOG(INFO) << "Ignoring submap"; + } + } + if (submaps.size() == 0) { + LOG(INFO) << "Ignoring node"; + return; } static_cast(pose_graph) ->AddNode(std::make_shared( diff --git a/cartographer/mapping/internal/submap_controller.cc b/cartographer/mapping/internal/submap_controller.cc index 64d39c8..03e4f8e 100644 --- a/cartographer/mapping/internal/submap_controller.cc +++ b/cartographer/mapping/internal/submap_controller.cc @@ -23,6 +23,11 @@ template <> std::shared_ptr SubmapController::CreateSubmap( const mapping::proto::Submap& proto) { + if (proto.submap_2d().num_range_data() != 1) { + LOG(WARNING) << "Refusing to create partially filled submap: " + << proto.submap_2d().num_range_data(); + return nullptr; + } return std::make_shared(proto.submap_2d(), &conversion_tables_); } @@ -31,8 +36,13 @@ template <> std::shared_ptr SubmapController::CreateSubmap( const mapping::proto::Submap& proto) { + if (proto.submap_3d().num_range_data() != 1) { + LOG(INFO) << "Refusing to create partially filled submap: " + << proto.submap_3d().num_range_data(); + return nullptr; + } return std::make_shared(proto.submap_3d()); } } // namespace mapping -} // namespace cartographer \ No newline at end of file +} // namespace cartographer diff --git a/cartographer/mapping/internal/submap_controller.h b/cartographer/mapping/internal/submap_controller.h index c3ad51d..4efdc8e 100644 --- a/cartographer/mapping/internal/submap_controller.h +++ b/cartographer/mapping/internal/submap_controller.h @@ -36,7 +36,9 @@ class SubmapController { auto submap_it = unfinished_submaps_.find(submap_id); if (submap_it == unfinished_submaps_.end()) { submap_ptr = CreateSubmap(proto); - unfinished_submaps_.Insert(submap_id, submap_ptr); + if (submap_ptr) { + unfinished_submaps_.Insert(submap_id, submap_ptr); + } return submap_ptr; } submap_ptr = submap_it->data;