diff --git a/cartographer/mapping/map_builder.cc b/cartographer/mapping/map_builder.cc index 91c3e36..a42ad8e 100644 --- a/cartographer/mapping/map_builder.cc +++ b/cartographer/mapping/map_builder.cc @@ -108,6 +108,10 @@ void MapBuilder::FinishTrajectory(const int trajectory_id) { sensor_collator_.FinishTrajectory(trajectory_id); } +int MapBuilder::GetBlockingTrajectoryId() const { + return sensor_collator_.GetBlockingTrajectoryId(); +} + int MapBuilder::GetTrajectoryId(const Submaps* const trajectory) const { const auto trajectory_id = trajectory_ids_.find(trajectory); CHECK(trajectory_id != trajectory_ids_.end()); diff --git a/cartographer/mapping/map_builder.h b/cartographer/mapping/map_builder.h index 2f87ee3..f2c7e1d 100644 --- a/cartographer/mapping/map_builder.h +++ b/cartographer/mapping/map_builder.h @@ -67,6 +67,11 @@ class MapBuilder { // i.e. no further sensor data is expected. void FinishTrajectory(int trajectory_id); + // Must only be called if at least one unfinished trajectory exists. Returns + // the ID of the trajectory that needs more data before the MapBuilder is + // unblocked. + int GetBlockingTrajectoryId() const; + // Returns the trajectory ID for 'trajectory'. int GetTrajectoryId(const mapping::Submaps* trajectory) const; diff --git a/cartographer/sensor/collator.cc b/cartographer/sensor/collator.cc index 14d440d..0d62a65 100644 --- a/cartographer/sensor/collator.cc +++ b/cartographer/sensor/collator.cc @@ -46,5 +46,9 @@ void Collator::AddSensorData(const int trajectory_id, const string& sensor_id, void Collator::Flush() { queue_.Flush(); } +int Collator::GetBlockingTrajectoryId() const { + return queue_.GetBlocker().trajectory_id; +} + } // namespace sensor } // namespace cartographer diff --git a/cartographer/sensor/collator.h b/cartographer/sensor/collator.h index 627aaef..b159a15 100644 --- a/cartographer/sensor/collator.h +++ b/cartographer/sensor/collator.h @@ -57,6 +57,11 @@ class Collator { // AddSensorData may not be called after Flush. void Flush(); + // Must only be called if at least one unfinished trajectory exists. Returns + // the ID of the trajectory that needs more data before the Collator is + // unblocked. + int GetBlockingTrajectoryId() const; + private: // Queue keys are a pair of trajectory ID and sensor identifier. OrderedMultiQueue queue_; diff --git a/cartographer/sensor/ordered_multi_queue.cc b/cartographer/sensor/ordered_multi_queue.cc index ad6ce33..fc0a28b 100644 --- a/cartographer/sensor/ordered_multi_queue.cc +++ b/cartographer/sensor/ordered_multi_queue.cc @@ -32,12 +32,12 @@ namespace { // for data. const int kMaxQueueSize = 500; +} // namespace + inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) { return out << '(' << key.trajectory_id << ", " << key.sensor_id << ')'; } -} // namespace - OrderedMultiQueue::OrderedMultiQueue() {} OrderedMultiQueue::~OrderedMultiQueue() { @@ -84,11 +84,16 @@ void OrderedMultiQueue::Flush() { } } +QueueKey OrderedMultiQueue::GetBlocker() const { + CHECK(!queues_.empty()); + return blocker_; +} + void OrderedMultiQueue::Dispatch() { while (true) { - Queue* next_queue = nullptr; const Data* next_data = nullptr; - int next_trajectory_id = -1; + Queue* next_queue = nullptr; + QueueKey next_queue_key; for (auto it = queues_.begin(); it != queues_.end();) { const auto* data = it->second.queue.Peek(); if (data == nullptr) { @@ -96,13 +101,13 @@ void OrderedMultiQueue::Dispatch() { queues_.erase(it++); continue; } - CannotMakeProgress(); + CannotMakeProgress(it->first); return; } if (next_data == nullptr || data->time < next_data->time) { next_data = data; next_queue = &it->second; - next_trajectory_id = it->first.trajectory_id; + next_queue_key = it->first; } CHECK_LE(last_dispatched_time_, next_data->time) << "Non-sorted data added to queue: '" << it->first << "'"; @@ -116,7 +121,7 @@ void OrderedMultiQueue::Dispatch() { // If we haven't dispatched any data for this trajectory yet, fast forward // all queues of this trajectory until a common start time has been reached. const common::Time common_start_time = - GetCommonStartTime(next_trajectory_id); + GetCommonStartTime(next_queue_key.trajectory_id); if (next_data->time >= common_start_time) { // Happy case, we are beyond the 'common_start_time' already. @@ -125,6 +130,7 @@ void OrderedMultiQueue::Dispatch() { } else if (next_queue->queue.Size() < 2) { if (!next_queue->finished) { // We cannot decide whether to drop or dispatch this yet. + CannotMakeProgress(next_queue_key); return; } last_dispatched_time_ = next_data->time; @@ -142,26 +148,16 @@ void OrderedMultiQueue::Dispatch() { } } -void OrderedMultiQueue::CannotMakeProgress() { +void OrderedMultiQueue::CannotMakeProgress(const QueueKey& queue_key) { + blocker_ = queue_key; for (auto& entry : queues_) { if (entry.second.queue.Size() > kMaxQueueSize) { - LOG_EVERY_N(WARNING, 60) << "Queues waiting for data: " - << EmptyQueuesDebugString(); + LOG_EVERY_N(WARNING, 60) << "Queue waiting for data: " << queue_key; return; } } } -string OrderedMultiQueue::EmptyQueuesDebugString() { - std::ostringstream empty_queues; - for (auto& entry : queues_) { - if (entry.second.queue.Size() == 0) { - empty_queues << (empty_queues.tellp() > 0 ? ", " : "") << entry.first; - } - } - return empty_queues.str(); -} - common::Time OrderedMultiQueue::GetCommonStartTime(const int trajectory_id) { auto emplace_result = common_start_time_per_trajectory_.emplace( trajectory_id, common::Time::min()); diff --git a/cartographer/sensor/ordered_multi_queue.h b/cartographer/sensor/ordered_multi_queue.h index 57f75d5..2cf83d2 100644 --- a/cartographer/sensor/ordered_multi_queue.h +++ b/cartographer/sensor/ordered_multi_queue.h @@ -69,6 +69,11 @@ class OrderedMultiQueue { // queues. void Flush(); + // Must only be called if at least one unfinished queue exists. Returns the + // key of a queue that needs more data before the OrderedMultiQueue can + // dispatch data. + QueueKey GetBlocker() const; + private: struct Queue { common::BlockingQueue> queue; @@ -77,8 +82,7 @@ class OrderedMultiQueue { }; void Dispatch(); - void CannotMakeProgress(); - string EmptyQueuesDebugString(); + void CannotMakeProgress(const QueueKey& queue_key); common::Time GetCommonStartTime(int trajectory_id); // Used to verify that values are dispatched in sorted order. @@ -86,6 +90,7 @@ class OrderedMultiQueue { std::map common_start_time_per_trajectory_; std::map queues_; + QueueKey blocker_; }; } // namespace sensor