From f3526bd2528eea91ad68bb9451cc2d5993296469 Mon Sep 17 00:00:00 2001 From: Wolfgang Hess Date: Tue, 6 Dec 2016 14:09:44 +0100 Subject: [PATCH] Move the OrderedMultiQueue implementation out of the header. (#152) --- .../mapping/trajectory_connectivity_test.cc | 20 +- cartographer/mapping_2d/ray_casting.cc | 5 +- .../scan_matching/interpolated_grid_test.cc | 9 +- .../scan_matching/precomputation_grid_test.cc | 7 +- cartographer/mapping_3d/sparse_pose_graph.cc | 16 +- cartographer/sensor/CMakeLists.txt | 3 + cartographer/sensor/ordered_multi_queue.cc | 173 ++++++++++++++++++ cartographer/sensor/ordered_multi_queue.h | 169 ++--------------- 8 files changed, 226 insertions(+), 176 deletions(-) create mode 100644 cartographer/sensor/ordered_multi_queue.cc diff --git a/cartographer/mapping/trajectory_connectivity_test.cc b/cartographer/mapping/trajectory_connectivity_test.cc index 7b2c780..15a7a75 100644 --- a/cartographer/mapping/trajectory_connectivity_test.cc +++ b/cartographer/mapping/trajectory_connectivity_test.cc @@ -110,10 +110,12 @@ TEST_F(TrajectoryConnectivityTest, ConnectedComponents) { five_cluster = &connections[0]; } for (int i = 0; i <= 9; ++i) { - EXPECT_EQ(i <= 4, std::find(zero_cluster->begin(), zero_cluster->end(), - trajectory(i)) != zero_cluster->end()); - EXPECT_EQ(i > 4, std::find(five_cluster->begin(), five_cluster->end(), - trajectory(i)) != five_cluster->end()); + EXPECT_EQ(i <= 4, + std::find(zero_cluster->begin(), zero_cluster->end(), + trajectory(i)) != zero_cluster->end()); + EXPECT_EQ(i > 4, + std::find(five_cluster->begin(), five_cluster->end(), + trajectory(i)) != five_cluster->end()); } } @@ -121,12 +123,14 @@ TEST_F(TrajectoryConnectivityTest, ConnectionCount) { for (int i = 0; i < 10; ++i) { trajectory_connectivity_.Connect(trajectory(0), trajectory(1)); // Permute the arguments to check invariance. - EXPECT_EQ(i + 1, trajectory_connectivity_.ConnectionCount(trajectory(1), - trajectory(0))); + EXPECT_EQ( + i + 1, + trajectory_connectivity_.ConnectionCount(trajectory(1), trajectory(0))); } for (int i = 1; i < 9; ++i) { - EXPECT_EQ(0, trajectory_connectivity_.ConnectionCount(trajectory(i), - trajectory(i + 1))); + EXPECT_EQ(0, + trajectory_connectivity_.ConnectionCount(trajectory(i), + trajectory(i + 1))); } } diff --git a/cartographer/mapping_2d/ray_casting.cc b/cartographer/mapping_2d/ray_casting.cc index e78c689..7131eec 100644 --- a/cartographer/mapping_2d/ray_casting.cc +++ b/cartographer/mapping_2d/ray_casting.cc @@ -175,8 +175,9 @@ void CastRays(const sensor::LaserFan& laser_fan, const MapLimits& limits, // Finally, compute and add empty rays based on missing echos in the scan. for (const Eigen::Vector3f& missing_echo : laser_fan.misses) { - CastRay(begin, superscaled_limits.GetXYIndexOfCellContainingPoint( - missing_echo.x(), missing_echo.y()), + CastRay(begin, + superscaled_limits.GetXYIndexOfCellContainingPoint( + missing_echo.x(), missing_echo.y()), miss_visitor); } } diff --git a/cartographer/mapping_3d/scan_matching/interpolated_grid_test.cc b/cartographer/mapping_3d/scan_matching/interpolated_grid_test.cc index e7c2478..8bd1f63 100644 --- a/cartographer/mapping_3d/scan_matching/interpolated_grid_test.cc +++ b/cartographer/mapping_3d/scan_matching/interpolated_grid_test.cc @@ -75,10 +75,11 @@ TEST_F(InterpolatedGridTest, MonotonicBehaviorBetweenGridPointsInX) { for (double sample = kSampleStep; sample < hybrid_grid_.resolution() - 2 * kSampleStep; sample += kSampleStep) { - EXPECT_LT(0., grid_difference * (interpolated_grid_.GetProbability( - x + sample + kSampleStep, y, z) - - interpolated_grid_.GetProbability( - x + sample, y, z))); + EXPECT_LT(0., + grid_difference * + (interpolated_grid_.GetProbability( + x + sample + kSampleStep, y, z) - + interpolated_grid_.GetProbability(x + sample, y, z))); } } } diff --git a/cartographer/mapping_3d/scan_matching/precomputation_grid_test.cc b/cartographer/mapping_3d/scan_matching/precomputation_grid_test.cc index 1ff78c7..894fa2f 100644 --- a/cartographer/mapping_3d/scan_matching/precomputation_grid_test.cc +++ b/cartographer/mapping_3d/scan_matching/precomputation_grid_test.cc @@ -61,9 +61,10 @@ TEST(PrecomputedGridGeneratorTest, TestAgainstNaiveAlgorithm) { for (int dx = 0; dx < width; ++dx) { for (int dy = 0; dy < width; ++dy) { for (int dz = 0; dz < width; ++dz) { - max_probability = std::max( - max_probability, hybrid_grid.GetProbability( - Eigen::Array3i(x + dx, y + dy, z + dz))); + max_probability = + std::max(max_probability, + hybrid_grid.GetProbability( + Eigen::Array3i(x + dx, y + dy, z + dz))); } } } diff --git a/cartographer/mapping_3d/sparse_pose_graph.cc b/cartographer/mapping_3d/sparse_pose_graph.cc index 2c8fd7a..a7dc1c2 100644 --- a/cartographer/mapping_3d/sparse_pose_graph.cc +++ b/cartographer/mapping_3d/sparse_pose_graph.cc @@ -214,14 +214,14 @@ void SparsePoseGraph::ComputeConstraintsForScan( // Unchanged covariance as (submap <- map) is a translation. const transform::Rigid3d constraint_transform = submap->local_pose().inverse() * pose; - constraints_.push_back( - Constraint{submap_index, - scan_index, - {constraint_transform, - common::ComputeSpdMatrixSqrtInverse( - covariance, options_.constraint_builder_options() - .lower_covariance_eigenvalue_bound())}, - Constraint::INTRA_SUBMAP}); + constraints_.push_back(Constraint{ + submap_index, + scan_index, + {constraint_transform, common::ComputeSpdMatrixSqrtInverse( + covariance, + options_.constraint_builder_options() + .lower_covariance_eigenvalue_bound())}, + Constraint::INTRA_SUBMAP}); } CHECK_LT(submap_states_.size(), std::numeric_limits::max()); diff --git a/cartographer/sensor/CMakeLists.txt b/cartographer/sensor/CMakeLists.txt index 74330df..cdb67f2 100644 --- a/cartographer/sensor/CMakeLists.txt +++ b/cartographer/sensor/CMakeLists.txt @@ -78,6 +78,9 @@ google_library(sensor_laser ) google_library(sensor_ordered_multi_queue + USES_GLOG + SRCS + ordered_multi_queue.cc HDRS ordered_multi_queue.h DEPENDS diff --git a/cartographer/sensor/ordered_multi_queue.cc b/cartographer/sensor/ordered_multi_queue.cc new file mode 100644 index 0000000..3b4f70f --- /dev/null +++ b/cartographer/sensor/ordered_multi_queue.cc @@ -0,0 +1,173 @@ +/* + * Copyright 2016 The Cartographer Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cartographer/sensor/ordered_multi_queue.h" + +#include +#include +#include + +#include "cartographer/common/make_unique.h" +#include "glog/logging.h" + +namespace cartographer { +namespace sensor { + +namespace { + +// Number of items that can be queued up before we log which queues are waiting +// for data. +const int kMaxQueueSize = 500; + +inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) { + return out << '(' << key.trajectory_id << ", " << key.sensor_id << ')'; +} + +} // namespace + +OrderedMultiQueue::OrderedMultiQueue() {} + +OrderedMultiQueue::~OrderedMultiQueue() { + for (auto& entry : queues_) { + CHECK(entry.second.finished); + } +} + +void OrderedMultiQueue::AddQueue(const QueueKey& queue_key, Callback callback) { + CHECK_EQ(queues_.count(queue_key), 0); + queues_[queue_key].callback = callback; +} + +void OrderedMultiQueue::MarkQueueAsFinished(const QueueKey& queue_key) { + auto it = queues_.find(queue_key); + CHECK(it != queues_.end()) << "Did not find '" << queue_key << "'."; + auto& queue = it->second; + CHECK(!queue.finished); + queue.finished = true; + Dispatch(); +} + +void OrderedMultiQueue::Add(const QueueKey& queue_key, + std::unique_ptr data) { + auto it = queues_.find(queue_key); + if (it == queues_.end()) { + LOG_EVERY_N(WARNING, 1000) << "Ignored data for queue: '" << queue_key + << "'"; + return; + } + it->second.queue.Push(std::move(data)); + Dispatch(); +} + +void OrderedMultiQueue::Flush() { + std::vector unfinished_queues; + for (auto& entry : queues_) { + if (!entry.second.finished) { + unfinished_queues.push_back(entry.first); + } + } + for (auto& unfinished_queue : unfinished_queues) { + MarkQueueAsFinished(unfinished_queue); + } +} + +void OrderedMultiQueue::Dispatch() { + while (true) { + Queue* next_queue = nullptr; + const Data* next_data = nullptr; + for (auto it = queues_.begin(); it != queues_.end();) { + auto& queue = it->second.queue; + const auto* data = queue.Peek(); + if (data == nullptr) { + if (it->second.finished) { + queues_.erase(it++); + continue; + } + CannotMakeProgress(); + return; + } + if (next_data == nullptr || + std::forward_as_tuple(data->time, it->first) < + std::forward_as_tuple(next_data->time, it->first)) { + next_data = data; + next_queue = &it->second; + } + CHECK_LE(last_dispatched_time_, next_data->time) + << "Non-sorted data added to queue: '" << it->first << "'"; + ++it; + } + if (next_data == nullptr) { + CHECK(queues_.empty()); + return; + } + + // If we haven't dispatched any data yet, fast forward all queues until a + // common start time has been reached. + if (common_start_time_ == common::Time::min()) { + for (auto& entry : queues_) { + common_start_time_ = + std::max(common_start_time_, entry.second.queue.Peek()->time); + } + LOG(INFO) << "All sensor data is available starting at '" + << common_start_time_ << "'."; + } + + if (next_data->time >= common_start_time_) { + // Happy case, we are beyond the 'common_start_time_' already. + last_dispatched_time_ = next_data->time; + next_queue->callback(next_queue->queue.Pop()); + } else if (next_queue->queue.Size() < 2) { + if (!next_queue->finished) { + // We cannot decide whether to drop or dispatch this yet. + return; + } + last_dispatched_time_ = next_data->time; + next_queue->callback(next_queue->queue.Pop()); + } else { + // We take a peek at the time after next data. If it also is not beyond + // 'common_start_time_' we drop 'next_data', otherwise we just found the + // first packet to dispatch from this queue. + std::unique_ptr next_data_owner = next_queue->queue.Pop(); + if (next_queue->queue.Peek()->time > common_start_time_) { + last_dispatched_time_ = next_data->time; + next_queue->callback(std::move(next_data_owner)); + } + } + } +} + +void OrderedMultiQueue::CannotMakeProgress() { + for (auto& entry : queues_) { + if (entry.second.queue.Size() > kMaxQueueSize) { + LOG_EVERY_N(WARNING, 60) << "Queues waiting for data: " + << EmptyQueuesDebugString(); + return; + } + } +} + +string OrderedMultiQueue::EmptyQueuesDebugString() { + std::ostringstream empty_queues; + for (auto& entry : queues_) { + if (entry.second.queue.Size() == 0) { + empty_queues << (empty_queues.tellp() > 0 ? ", " : "") << entry.first; + } + } + return empty_queues.str(); +} + +} // namespace sensor +} // namespace cartographer diff --git a/cartographer/sensor/ordered_multi_queue.h b/cartographer/sensor/ordered_multi_queue.h index 08b4bc3..55b7481 100644 --- a/cartographer/sensor/ordered_multi_queue.h +++ b/cartographer/sensor/ordered_multi_queue.h @@ -17,14 +17,13 @@ #ifndef CARTOGRAPHER_SENSOR_ORDERED_MULTI_QUEUE_H_ #define CARTOGRAPHER_SENSOR_ORDERED_MULTI_QUEUE_H_ -#include +#include #include #include -#include -#include +#include +#include #include "cartographer/common/blocking_queue.h" -#include "cartographer/common/make_unique.h" #include "cartographer/common/port.h" #include "cartographer/common/time.h" #include "cartographer/sensor/data.h" @@ -32,10 +31,6 @@ namespace cartographer { namespace sensor { -// Number of items that can be queued up before we log which queues are waiting -// for data. -const int kMaxQueueSize = 500; - struct QueueKey { int trajectory_id; string sensor_id; @@ -46,10 +41,6 @@ struct QueueKey { } }; -inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) { - return out << '(' << key.trajectory_id << ", " << key.sensor_id << ')'; -} - // Maintains multiple queues of sorted sensor data and dispatches it in merge // sorted order. It will wait to see at least one value for each unfinished // queue before dispatching the next time ordered value across all queues. @@ -59,50 +50,24 @@ class OrderedMultiQueue { public: using Callback = std::function)>; - OrderedMultiQueue() {} + OrderedMultiQueue(); + ~OrderedMultiQueue(); - ~OrderedMultiQueue() { - for (auto& entry : queues_) { - CHECK(entry.second.finished); - } - } + // Adds a new queue with key 'queue_key' which must not already exist. + // 'callback' will be called whenever data from this queue can be dispatched. + void AddQueue(const QueueKey& queue_key, Callback callback); - void AddQueue(const QueueKey& queue_key, Callback callback) { - CHECK(FindOrNull(queue_key) == nullptr); - queues_[queue_key].callback = callback; - } + // Marks a queue as finished, i.e. no further data can be added. The queue + // will be removed once the last piece of data from it has been dispatched. + void MarkQueueAsFinished(const QueueKey& queue_key); - void MarkQueueAsFinished(const QueueKey& queue_key) { - auto& queue = FindOrDie(queue_key); - CHECK(!queue.finished); - queue.finished = true; - Dispatch(); - } - - void Add(const QueueKey& queue_key, std::unique_ptr data) { - auto* queue = FindOrNull(queue_key); - if (queue == nullptr) { - LOG_EVERY_N(WARNING, 1000) << "Ignored data for queue: '" << queue_key - << "'"; - return; - } - queue->queue.Push(std::move(data)); - Dispatch(); - } + // Adds 'data' to a queue with the given 'queue_key'. Data must be added + // sorted per queue. + void Add(const QueueKey& queue_key, std::unique_ptr data); // Dispatches all remaining values in sorted order and removes the underlying // queues. - void Flush() { - std::vector unfinished_queues; - for (auto& entry : queues_) { - if (!entry.second.finished) { - unfinished_queues.push_back(entry.first); - } - } - for (auto& unfinished_queue : unfinished_queues) { - MarkQueueAsFinished(unfinished_queue); - } - } + void Flush(); private: struct Queue { @@ -111,107 +76,9 @@ class OrderedMultiQueue { bool finished = false; }; - // Returns the queue with 'key' or LOG(FATAL). - Queue& FindOrDie(const QueueKey& key) { - auto it = queues_.find(key); - CHECK(it != queues_.end()) << "Did not find '" << key << "'."; - return it->second; - } - - // Returns the queue with 'key' or nullptr. - Queue* FindOrNull(const QueueKey& key) { - auto it = queues_.find(key); - if (it == queues_.end()) { - return nullptr; - } - return &it->second; - } - - void Dispatch() { - while (true) { - Queue* next_queue = nullptr; - const Data* next_data = nullptr; - for (auto it = queues_.begin(); it != queues_.end();) { - auto& queue = it->second.queue; - const auto* data = queue.Peek(); - if (data == nullptr) { - if (it->second.finished) { - queues_.erase(it++); - continue; - } - CannotMakeProgress(); - return; - } - if (next_data == nullptr || - std::forward_as_tuple(data->time, it->first) < - std::forward_as_tuple(next_data->time, it->first)) { - next_data = data; - next_queue = &it->second; - } - CHECK_LE(last_dispatched_time_, next_data->time) - << "Non-sorted data added to queue: '" << it->first << "'"; - ++it; - } - if (next_data == nullptr) { - CHECK(queues_.empty()); - return; - } - - // If we haven't dispatched any data yet, fast forward all queues until a - // common start time has been reached. - if (common_start_time_ == common::Time::min()) { - for (auto& entry : queues_) { - common_start_time_ = std::max(common_start_time_, - entry.second.queue.Peek()->time); - } - LOG(INFO) << "All sensor data is available starting at '" - << common_start_time_ << "'."; - } - - if (next_data->time >= common_start_time_) { - // Happy case, we are beyond the 'common_start_time_' already. - last_dispatched_time_ = next_data->time; - next_queue->callback(next_queue->queue.Pop()); - } else if (next_queue->queue.Size() < 2) { - if (!next_queue->finished) { - // We cannot decide whether to drop or dispatch this yet. - return; - } - last_dispatched_time_ = next_data->time; - next_queue->callback(next_queue->queue.Pop()); - } else { - // We take a peek at the time after next data. If it also is not beyond - // 'common_start_time_' we drop 'next_data', otherwise we just found the - // first packet to dispatch from this queue. - std::unique_ptr next_data_owner = next_queue->queue.Pop(); - if (next_queue->queue.Peek()->time > common_start_time_) { - last_dispatched_time_ = next_data->time; - next_queue->callback(std::move(next_data_owner)); - } - } - } - } - - // Called when not all necessary queues are filled to dispatch messages. - void CannotMakeProgress() { - for (auto& entry : queues_) { - if (entry.second.queue.Size() > kMaxQueueSize) { - LOG_EVERY_N(WARNING, 60) << "Queues waiting for data: " - << EmptyQueuesDebugString(); - return; - } - } - } - - string EmptyQueuesDebugString() { - std::ostringstream empty_queues; - for (auto& entry : queues_) { - if (entry.second.queue.Size() == 0) { - empty_queues << (empty_queues.tellp() > 0 ? ", " : "") << entry.first; - } - } - return empty_queues.str(); - } + void Dispatch(); + void CannotMakeProgress(); + string EmptyQueuesDebugString(); // Used to verify that values are dispatched in sorted order. common::Time last_dispatched_time_ = common::Time::min();