diff --git a/cartographer/sensor/collator_test.cc b/cartographer/sensor/collator_test.cc index f7b7cce..4b6c18e 100644 --- a/cartographer/sensor/collator_test.cc +++ b/cartographer/sensor/collator_test.cc @@ -31,6 +31,7 @@ namespace { TEST(Collator, Ordering) { const std::array kSensorId = { {"horizontal_laser", "vertical_laser", "imu", "odometry"}}; + Data zero(common::FromUniversal(0), sensor::LaserFan{}); Data first(common::FromUniversal(100), sensor::LaserFan{}); Data second(common::FromUniversal(200), sensor::LaserFan{}); Data third(common::FromUniversal(300), Data::Imu{}); @@ -47,6 +48,17 @@ TEST(Collator, Ordering) { }); constexpr int kTrajectoryId = 0; + + // Establish a common start time. + collator.AddSensorData(kTrajectoryId, kSensorId[0], + common::make_unique(zero)); + collator.AddSensorData(kTrajectoryId, kSensorId[1], + common::make_unique(zero)); + collator.AddSensorData(kTrajectoryId, kSensorId[2], + common::make_unique(zero)); + collator.AddSensorData(kTrajectoryId, kSensorId[3], + common::make_unique(zero)); + collator.AddSensorData(kTrajectoryId, kSensorId[0], common::make_unique(first)); collator.AddSensorData(kTrajectoryId, kSensorId[3], @@ -60,22 +72,22 @@ TEST(Collator, Ordering) { collator.AddSensorData(kTrajectoryId, kSensorId[2], common::make_unique(third)); - ASSERT_EQ(3, received.size()); - EXPECT_EQ(100, common::ToUniversal(received[0].second.time)); - EXPECT_EQ(kSensorId[0], received[0].first); - EXPECT_EQ(200, common::ToUniversal(received[1].second.time)); - EXPECT_EQ(kSensorId[1], received[1].first); - EXPECT_EQ(300, common::ToUniversal(received[2].second.time)); - EXPECT_EQ(kSensorId[2], received[2].first); + ASSERT_EQ(7, received.size()); + EXPECT_EQ(100, common::ToUniversal(received[4].second.time)); + EXPECT_EQ(kSensorId[0], received[4].first); + EXPECT_EQ(200, common::ToUniversal(received[5].second.time)); + EXPECT_EQ(kSensorId[1], received[5].first); + EXPECT_EQ(300, common::ToUniversal(received[6].second.time)); + EXPECT_EQ(kSensorId[2], received[6].first); collator.Flush(); - ASSERT_EQ(6, received.size()); - EXPECT_EQ(kSensorId[0], received[3].first); - EXPECT_EQ(500, common::ToUniversal(received[4].second.time)); - EXPECT_EQ(kSensorId[1], received[4].first); - EXPECT_EQ(600, common::ToUniversal(received[5].second.time)); - EXPECT_EQ(kSensorId[3], received[5].first); + ASSERT_EQ(10, received.size()); + EXPECT_EQ(kSensorId[0], received[7].first); + EXPECT_EQ(500, common::ToUniversal(received[8].second.time)); + EXPECT_EQ(kSensorId[1], received[8].first); + EXPECT_EQ(600, common::ToUniversal(received[9].second.time)); + EXPECT_EQ(kSensorId[3], received[9].first); } } // namespace diff --git a/cartographer/sensor/ordered_multi_queue.h b/cartographer/sensor/ordered_multi_queue.h index 2a7cc29..30d7092 100644 --- a/cartographer/sensor/ordered_multi_queue.h +++ b/cartographer/sensor/ordered_multi_queue.h @@ -32,7 +32,8 @@ namespace cartographer { namespace sensor { -// Number of items that can be queued up before we LOG(WARNING). +// Number of items that can be queued up before we log which queues are waiting +// for data. const int kMaxQueueSize = 500; struct QueueKey { @@ -78,10 +79,6 @@ class OrderedMultiQueue { Dispatch(); } - bool HasQueue(const QueueKey& queue_key) { - return queues_.count(queue_key) != 0; - } - void Add(const QueueKey& queue_key, std::unique_ptr data) { auto* queue = FindOrNull(queue_key); if (queue == nullptr) { @@ -151,7 +148,7 @@ class OrderedMultiQueue { next_data = data; next_queue = &it->second; } - CHECK_LE(last_dispatched_key_, next_data->time) + CHECK_LE(last_dispatched_time_, next_data->time) << "Non-sorted data added to queue: '" << it->first << "'"; ++it; } @@ -159,7 +156,24 @@ class OrderedMultiQueue { CHECK(queues_.empty()); return; } - last_dispatched_key_ = next_data->time; + + // If we haven't dispatched any data yet, fast forward all queues until a + // common start time has been reached. + if (common_start_time_ == common::Time::min()) { + for (auto& entry : queues_) { + common_start_time_ = + std::max(common_start_time_, entry.second.queue.Peek()->time); + } + LOG(INFO) << "All sensor data is available starting at '" + << common_start_time_ << "'."; + } + + if (next_data->time < common_start_time_) { + next_queue->queue.Pop(); + continue; + } + + last_dispatched_time_ = next_data->time; next_queue->callback(next_queue->queue.Pop()); } } @@ -186,7 +200,8 @@ class OrderedMultiQueue { } // Used to verify that values are dispatched in sorted order. - common::Time last_dispatched_key_ = common::Time::min(); + common::Time last_dispatched_time_ = common::Time::min(); + common::Time common_start_time_ = common::Time::min(); std::map queues_; }; diff --git a/cartographer/sensor/ordered_multi_queue_test.cc b/cartographer/sensor/ordered_multi_queue_test.cc index 85bd50b..874d2a6 100644 --- a/cartographer/sensor/ordered_multi_queue_test.cc +++ b/cartographer/sensor/ordered_multi_queue_test.cc @@ -27,15 +27,16 @@ namespace { class OrderedMultiQueueTest : public ::testing::Test { protected: - const QueueKey kFirst{1, "foo"}; - const QueueKey kSecond{1, "bar"}; - const QueueKey kThird{2, "bar"}; + // These are keys are chosen so that they sort first, second, third. + const QueueKey kFirst{1, "a"}; + const QueueKey kSecond{1, "b"}; + const QueueKey kThird{2, "b"}; void SetUp() { for (const auto& queue_key : {kFirst, kSecond, kThird}) { queue_.AddQueue(queue_key, [this](std::unique_ptr data) { if (!values_.empty()) { - EXPECT_GT(data->time, values_.back().time); + EXPECT_GE(data->time, values_.back().time); } values_.push_back(*data); }); @@ -53,23 +54,26 @@ class OrderedMultiQueueTest : public ::testing::Test { }; TEST_F(OrderedMultiQueueTest, Ordering) { + queue_.Add(kFirst, MakeImu(0)); queue_.Add(kFirst, MakeImu(4)); queue_.Add(kFirst, MakeImu(5)); queue_.Add(kFirst, MakeImu(6)); EXPECT_TRUE(values_.empty()); + queue_.Add(kSecond, MakeImu(0)); queue_.Add(kSecond, MakeImu(1)); EXPECT_TRUE(values_.empty()); + queue_.Add(kThird, MakeImu(0)); queue_.Add(kThird, MakeImu(2)); - EXPECT_EQ(values_.size(), 1); + EXPECT_EQ(values_.size(), 4); queue_.Add(kSecond, MakeImu(3)); - EXPECT_EQ(values_.size(), 2); + EXPECT_EQ(values_.size(), 5); queue_.Add(kSecond, MakeImu(7)); queue_.Add(kThird, MakeImu(8)); queue_.Flush(); - EXPECT_EQ(8, values_.size()); - for (size_t i = 0; i < values_.size(); ++i) { - EXPECT_EQ(i + 1, common::ToUniversal(values_[i].time)); + EXPECT_EQ(11, values_.size()); + for (size_t i = 0; i < values_.size() - 1; ++i) { + EXPECT_LE(values_[i].time, values_[i + 1].time); } }