Added tests to convert Hybrid BN to corresponding "likelihood" FG
parent
d4ee6997f7
commit
2d688a1986
|
@ -13,73 +13,152 @@ Author: Fan Jiang
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from gtsam.symbol_shorthand import C, X
|
from gtsam.symbol_shorthand import C, M, X, Z
|
||||||
from gtsam.utils.test_case import GtsamTestCase
|
from gtsam.utils.test_case import GtsamTestCase
|
||||||
|
|
||||||
import gtsam
|
import gtsam
|
||||||
|
from gtsam import (
|
||||||
|
DecisionTreeFactor,
|
||||||
|
DiscreteConditional,
|
||||||
|
DiscreteKeys,
|
||||||
|
GaussianConditional,
|
||||||
|
GaussianMixture,
|
||||||
|
GaussianMixtureFactor,
|
||||||
|
HybridGaussianFactorGraph,
|
||||||
|
JacobianFactor,
|
||||||
|
Ordering,
|
||||||
|
noiseModel,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestHybridGaussianFactorGraph(GtsamTestCase):
|
class TestHybridGaussianFactorGraph(GtsamTestCase):
|
||||||
"""Unit tests for HybridGaussianFactorGraph."""
|
"""Unit tests for HybridGaussianFactorGraph."""
|
||||||
|
|
||||||
def test_create(self):
|
def test_create(self):
|
||||||
"""Test construction of hybrid factor graph."""
|
"""Test construction of hybrid factor graph."""
|
||||||
noiseModel = gtsam.noiseModel.Unit.Create(3)
|
model = noiseModel.Unit.Create(3)
|
||||||
dk = gtsam.DiscreteKeys()
|
dk = DiscreteKeys()
|
||||||
dk.push_back((C(0), 2))
|
dk.push_back((C(0), 2))
|
||||||
|
|
||||||
jf1 = gtsam.JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)),
|
jf1 = JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)), model)
|
||||||
noiseModel)
|
jf2 = JacobianFactor(X(0), np.eye(3), np.ones((3, 1)), model)
|
||||||
jf2 = gtsam.JacobianFactor(X(0), np.eye(3), np.ones((3, 1)),
|
|
||||||
noiseModel)
|
|
||||||
|
|
||||||
gmf = gtsam.GaussianMixtureFactor.FromFactors([X(0)], dk, [jf1, jf2])
|
gmf = GaussianMixtureFactor([X(0)], dk, [jf1, jf2])
|
||||||
|
|
||||||
hfg = gtsam.HybridGaussianFactorGraph()
|
hfg = HybridGaussianFactorGraph()
|
||||||
hfg.add(jf1)
|
hfg.push_back(jf1)
|
||||||
hfg.add(jf2)
|
hfg.push_back(jf2)
|
||||||
hfg.push_back(gmf)
|
hfg.push_back(gmf)
|
||||||
|
|
||||||
hbn = hfg.eliminateSequential(
|
hbn = hfg.eliminateSequential(
|
||||||
gtsam.Ordering.ColamdConstrainedLastHybridGaussianFactorGraph(
|
Ordering.ColamdConstrainedLastHybridGaussianFactorGraph(hfg, [C(0)])
|
||||||
hfg, [C(0)]))
|
)
|
||||||
|
|
||||||
self.assertEqual(hbn.size(), 2)
|
self.assertEqual(hbn.size(), 2)
|
||||||
|
|
||||||
mixture = hbn.at(0).inner()
|
mixture = hbn.at(0).inner()
|
||||||
self.assertIsInstance(mixture, gtsam.GaussianMixture)
|
self.assertIsInstance(mixture, GaussianMixture)
|
||||||
self.assertEqual(len(mixture.keys()), 2)
|
self.assertEqual(len(mixture.keys()), 2)
|
||||||
|
|
||||||
discrete_conditional = hbn.at(hbn.size() - 1).inner()
|
discrete_conditional = hbn.at(hbn.size() - 1).inner()
|
||||||
self.assertIsInstance(discrete_conditional, gtsam.DiscreteConditional)
|
self.assertIsInstance(discrete_conditional, DiscreteConditional)
|
||||||
|
|
||||||
def test_optimize(self):
|
def test_optimize(self):
|
||||||
"""Test construction of hybrid factor graph."""
|
"""Test construction of hybrid factor graph."""
|
||||||
noiseModel = gtsam.noiseModel.Unit.Create(3)
|
model = noiseModel.Unit.Create(3)
|
||||||
dk = gtsam.DiscreteKeys()
|
dk = DiscreteKeys()
|
||||||
dk.push_back((C(0), 2))
|
dk.push_back((C(0), 2))
|
||||||
|
|
||||||
jf1 = gtsam.JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)),
|
jf1 = JacobianFactor(X(0), np.eye(3), np.zeros((3, 1)), model)
|
||||||
noiseModel)
|
jf2 = JacobianFactor(X(0), np.eye(3), np.ones((3, 1)), model)
|
||||||
jf2 = gtsam.JacobianFactor(X(0), np.eye(3), np.ones((3, 1)),
|
|
||||||
noiseModel)
|
|
||||||
|
|
||||||
gmf = gtsam.GaussianMixtureFactor.FromFactors([X(0)], dk, [jf1, jf2])
|
gmf = GaussianMixtureFactor([X(0)], dk, [jf1, jf2])
|
||||||
|
|
||||||
hfg = gtsam.HybridGaussianFactorGraph()
|
hfg = HybridGaussianFactorGraph()
|
||||||
hfg.add(jf1)
|
hfg.push_back(jf1)
|
||||||
hfg.add(jf2)
|
hfg.push_back(jf2)
|
||||||
hfg.push_back(gmf)
|
hfg.push_back(gmf)
|
||||||
|
|
||||||
dtf = gtsam.DecisionTreeFactor([(C(0), 2)], "0 1")
|
dtf = gtsam.DecisionTreeFactor([(C(0), 2)], "0 1")
|
||||||
hfg.add(dtf)
|
hfg.push_back(dtf)
|
||||||
|
|
||||||
hbn = hfg.eliminateSequential(
|
hbn = hfg.eliminateSequential(
|
||||||
gtsam.Ordering.ColamdConstrainedLastHybridGaussianFactorGraph(
|
Ordering.ColamdConstrainedLastHybridGaussianFactorGraph(hfg, [C(0)])
|
||||||
hfg, [C(0)]))
|
)
|
||||||
|
|
||||||
hv = hbn.optimize()
|
hv = hbn.optimize()
|
||||||
self.assertEqual(hv.atDiscrete(C(0)), 1)
|
self.assertEqual(hv.atDiscrete(C(0)), 1)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def tiny(num_measurements: int = 1):
|
||||||
|
"""Create a tiny two variable hybrid model."""
|
||||||
|
# Create hybrid Bayes net.
|
||||||
|
bayesNet = gtsam.HybridBayesNet()
|
||||||
|
|
||||||
|
# Create mode key: 0 is low-noise, 1 is high-noise.
|
||||||
|
modeKey = M(0)
|
||||||
|
mode = (modeKey, 2)
|
||||||
|
|
||||||
|
# Create Gaussian mixture Z(0) = X(0) + noise for each measurement.
|
||||||
|
I = np.eye(1)
|
||||||
|
keys = DiscreteKeys()
|
||||||
|
keys.push_back(mode)
|
||||||
|
for i in range(num_measurements):
|
||||||
|
conditional0 = GaussianConditional.FromMeanAndStddev(
|
||||||
|
Z(i), I, X(0), [0], sigma=0.5
|
||||||
|
)
|
||||||
|
conditional1 = GaussianConditional.FromMeanAndStddev(
|
||||||
|
Z(i), I, X(0), [0], sigma=3
|
||||||
|
)
|
||||||
|
bayesNet.emplaceMixture([Z(i)], [X(0)], keys, [conditional0, conditional1])
|
||||||
|
|
||||||
|
# Create prior on X(0).
|
||||||
|
prior_on_x0 = GaussianConditional.FromMeanAndStddev(X(0), [5.0], 5.0)
|
||||||
|
bayesNet.addGaussian(prior_on_x0)
|
||||||
|
|
||||||
|
# Add prior on mode.
|
||||||
|
bayesNet.emplaceDiscrete(mode, "1/1")
|
||||||
|
|
||||||
|
return bayesNet
|
||||||
|
|
||||||
|
def test_tiny(self):
|
||||||
|
"""Test a tiny two variable hybrid model."""
|
||||||
|
bayesNet = self.tiny()
|
||||||
|
sample = bayesNet.sample()
|
||||||
|
# print(sample)
|
||||||
|
|
||||||
|
# Create a factor graph from the Bayes net with sampled measurements.
|
||||||
|
fg = HybridGaussianFactorGraph()
|
||||||
|
conditional = bayesNet.atMixture(0)
|
||||||
|
measurement = gtsam.VectorValues()
|
||||||
|
measurement.insert(Z(0), sample.at(Z(0)))
|
||||||
|
factor = conditional.likelihood(measurement)
|
||||||
|
fg.push_back(factor)
|
||||||
|
fg.push_back(bayesNet.atGaussian(1))
|
||||||
|
fg.push_back(bayesNet.atDiscrete(2))
|
||||||
|
|
||||||
|
self.assertEqual(fg.size(), 3)
|
||||||
|
|
||||||
|
def test_tiny2(self):
|
||||||
|
"""Test a tiny two variable hybrid model, with 2 measurements."""
|
||||||
|
# Create the Bayes net and sample from it.
|
||||||
|
bayesNet = self.tiny(num_measurements=2)
|
||||||
|
sample = bayesNet.sample()
|
||||||
|
# print(sample)
|
||||||
|
|
||||||
|
# Create a factor graph from the Bayes net with sampled measurements.
|
||||||
|
fg = HybridGaussianFactorGraph()
|
||||||
|
for i in range(2):
|
||||||
|
conditional = bayesNet.atMixture(i)
|
||||||
|
measurement = gtsam.VectorValues()
|
||||||
|
measurement.insert(Z(i), sample.at(Z(i)))
|
||||||
|
factor = conditional.likelihood(measurement)
|
||||||
|
fg.push_back(factor)
|
||||||
|
fg.push_back(bayesNet.atGaussian(2))
|
||||||
|
fg.push_back(bayesNet.atDiscrete(3))
|
||||||
|
|
||||||
|
self.assertEqual(fg.size(), 4)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Loading…
Reference in New Issue