gsc_pgo: online and offline PGO #2587
Conversation
Convert the memory2 Recorder from thread/disposable rx subscriptions to manual async callbacks via process_observable, and let pose_setter_for methods be async (awaited in _resolve_pose). Update the fastlio and go2 recorders accordingly.
Raise TypeError at decoration time if a non-async function is decorated, and always await the setter in _resolve_pose.
…imos into jeff/fix/pose_setter_for
…kitti, voxel_map, module_loading)
process_observable gains an optional on_drop callback fired once per message dropped by the dispatcher's single-slot LATEST mailbox. The Recorder uses it to count dropped frames per stream and log a throttled warning, so a slow sink no longer loses data silently.
Greptile SummaryThis PR ports the PGO / loop-closure stack into the
Confidence Score: 3/5The default run of post_process.py fails immediately with ModuleNotFoundError due to the wrong import path for make_rrd; two prior-round bugs in DbTfSql (OperationalError on a tf-less store, IndexError on empty graph list) remain unresolved. post_process.py default invocation hits dimos/navigation/jnav/components/loop_closure/gsc_pgo/scripts/post_process.py (broken make_rrd import), dimos/memory2/db_tf_sql.py (two open bugs from prior review rounds) Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
subgraph Online["Online (live robot)"]
PGO["PGO NativeModule\n(C++ iSAM2 + PCL ICP)"]
DN["tf_deformation_nodes\nOut[DeformationNode]"]
TF["PubSubTF / LCMTF\nDeformationBuffer"]
PGO -->|"corrected keyframe poses"| DN
DN -->|"receive_deformation()"| TF
TF -->|"loop-closure-corrected\nget() / lookup()"| Consumers["Consumers\n(SemanticSearch, nav, etc.)"]
end
subgraph Offline["Offline (post_process.py)"]
PP["post_process.py\nStage 1: GTSAM tag PGO\nStage 2: ICP refinement"]
DeformStream["gt_tf_deformation_nodes\nstream in mem2.db"]
PP -->|"write raw + optimized\nkeyframe poses"| DeformStream
end
subgraph Memory2["memory2 / SqliteStore"]
DbTfSql["DbTfSql.get()\n(indexed SQL + deformation)"]
DeformStream -->|"_load_deformation_ids()\n_edge_delta()"| DbTfSql
DbTfSql -->|"loop-closure-corrected\nTransform"| Consumers2["Consumers\n(SemanticSearch, eval)"]
end
Recorder["Recorder\n(memory2.Recorder)"] -->|"tf stream\ntf_deformation_nodes stream"| SqliteDB["mem2.db\nSQLite"]
SqliteDB --> DbTfSql
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
flowchart TD
subgraph Online["Online (live robot)"]
PGO["PGO NativeModule\n(C++ iSAM2 + PCL ICP)"]
DN["tf_deformation_nodes\nOut[DeformationNode]"]
TF["PubSubTF / LCMTF\nDeformationBuffer"]
PGO -->|"corrected keyframe poses"| DN
DN -->|"receive_deformation()"| TF
TF -->|"loop-closure-corrected\nget() / lookup()"| Consumers["Consumers\n(SemanticSearch, nav, etc.)"]
end
subgraph Offline["Offline (post_process.py)"]
PP["post_process.py\nStage 1: GTSAM tag PGO\nStage 2: ICP refinement"]
DeformStream["gt_tf_deformation_nodes\nstream in mem2.db"]
PP -->|"write raw + optimized\nkeyframe poses"| DeformStream
end
subgraph Memory2["memory2 / SqliteStore"]
DbTfSql["DbTfSql.get()\n(indexed SQL + deformation)"]
DeformStream -->|"_load_deformation_ids()\n_edge_delta()"| DbTfSql
DbTfSql -->|"loop-closure-corrected\nTransform"| Consumers2["Consumers\n(SemanticSearch, eval)"]
end
Recorder["Recorder\n(memory2.Recorder)"] -->|"tf stream\ntf_deformation_nodes stream"| SqliteDB["mem2.db\nSQLite"]
SqliteDB --> DbTfSql
Reviews (9): Last reviewed commit: "test(memory2): skip DbTfSql tests when s..." | Re-trigger Greptile |
Codecov Report❌ Patch coverage is @@ Coverage Diff @@
## main #2587 +/- ##
==========================================
+ Coverage 71.85% 71.89% +0.04%
==========================================
Files 891 909 +18
Lines 80512 81939 +1427
Branches 7331 7487 +156
==========================================
+ Hits 57849 58909 +1060
- Misses 20726 21093 +367
Partials 1937 1937
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 1 file with indirect coverage changes 🚀 New features to boost your workflow:
|
…eff/feat/jnav_pgo
…ose cached stores
…ms) so add_april imports resolve
Replace per-frame static/bracket queries with a single UNION of per-frame LIMIT-1 subqueries joined to the blob store. Each subquery is a direct (child_frame, ts) range seek via a new composite index, so the warm lookup is one query (graph served from RAM) at the same wall-time as the per-row DbTf (~126us) instead of ~4 queries.
…k-row deep tree) + bench
Replace the per-row tf-tree DbTf with the graph-stream implementation (formerly DbTf2): topology change-log + child_frame-tagged rows, in-RAM graph cache with a query-per-lookup fallback, latched statics, and disjoint multi-robot graphs. - merge db_tf2.py into db_tf.py; DbTf2 -> DbTf; keep transform_matrix + write_tf_tree (now one-transform-per-row so the reader can tag/range-query it) and the non-sqlite RAM fallback. - recorder writes only tf_graph now (no per-row tree). - trim tests to four integration cases (interp vs full-load, latched static, mid-run re-parent, disjoint multi-robot); remove db_tf2 tests + bench scaffold.
The topology change-log was a raw sqlite side-table written directly by
TfGraphWriter. Promote it to a real memory2 stream ("<tf>_graph") so it's
discoverable via list_streams(), replayable, and uses the standard codec/blob
machinery like every other recorded stream.
- new TfGraph payload (recording-internal, pickle-codec); TfGraphWriter appends
snapshots via store.stream() instead of raw INSERTs.
- DbTf reads the graph through the stream API (count + order_by + time_range)
instead of raw SQL; migration build_graph_stream appends to the stream.
- recorder records the graph stream for all store types, not just sqlite.
- drop the raw-table helpers (ensure_graph_table/_graph_table).
The loop-closure eval's raw baseline (tag agreement, voxel re-anchoring, drift recovery) now composes the robot pose from the recording's tf (world->base_link, sampled at each odom time) when the recording has tf; recordings without tf fall back to the odom stream's stored poses. Adds --world-frame/--body-frame, records pose_source in the summary, and bumps EVAL_VERSION to invalidate cached cells.
# Conflicts: # dimos/hardware/sensors/lidar/fastlio2/recorder.py # dimos/memory2/module.py # dimos/robot/all_blueprints.py # dimos/robot/unitree/go2/blueprints/smart/unitree_go2.py
…igned mid360_link Keep the physical sensor pose (position + 44deg tilt) as mid360_mount, and add a counter-rotated mid360_link at the same position whose orientation is un-tilted (gravity-aligned) — the frame the gravity-anchored LIO scans live in. Preserves the physical mount geometry while giving a level data frame.
…360_link->mid360_gravity
- remove unused write_tf_tree; move transform_matrix into post_process (its only use) - drop GRAPH_STREAM_SUFFIX indirection (one '<tf>_graph' stream, name derived inline) - inline TfGraphWriter into the recorder and build_graph_stream into DbTf._ensure_built - name the pose-equality rounding constant; namespace the child-frame index name - fix: order graph snapshots by (ts, id) so same-timestamp topology changes resolve to the complete snapshot; raise the in-RAM cache threshold for fixed multi-frame rigs
Static mount frames are re-published onto the regular tf stream by the rig's
StaticTfPublisher (5Hz), so the recorder already captures them via the tf
subscription; the separate hardcoded Topic('/tf_static') had no publisher. Record
tf through the one tf stream; static-vs-dynamic is recovered offline by DbTf's
graph build (pose-constancy).
…ed LCM topic Add an optional tf_static: In[TFMessage] port (a future system publishes latched mount/extrinsic transforms there). Folded into the 'tf' stream + flagged static in tf_graph; excluded from the generic per-port recording. Subscribed only when wired (guards the unconnected transport), so it's a no-op until something publishes.
…rames) Add DeformationNode msg (id, tf_id=FNV-1a-64(frame_from|frame_to), pose) and a Recorder tf_deformation_nodes In-port recorded by default (no-op when unconnected, like tf_static). Basis for loop-closure-corrected tf at query time.
Autoconnects to the Recorder's tf_deformation_nodes In. Binary publishes to it once the pinned rev is bumped (gated on pushing the gsc_pgo C++ change).
For each edge in the resolved chain, hash (parent|child) -> tf_id; if the deformation stream has matching keyframes, take the ones bracketing the query time, compute each node's current ∘ inv(original) delta, linear-blend-skin between them, and apply on the parent side of that edge. Edges with no match (and recordings with no deformation stream) take the normal path at zero cost.
…nto jeff/feat/jnav_pgo
…rrection Split db_tf into DbTfLive (in-RAM buffer, all stores) and DbTfSql (indexed per-lookup queries + deformation correction, sqlite only). SemanticSearch now resolves a match's world pose at query time via store.tf so it reflects loop closure. PubSubTF gains a deformation_topic that corrects live tf.get from a tf_deformation_nodes stream. Recorder only clears streams whose source is connected (pcap_to_db no longer wipes an existing tf tree). Drop unused Marker msg.
Detect AprilTags over the camera stream when the raw tag stream is absent, so a fresh recording only needs post_process (derive distance/view-angle from the tag pose, defaulting unknown speeds to pass). Persist the PGO's keyframe deformation (gt_tf_deformation_nodes) and optimized pose_graph as real typed streams so a deformation-aware tf.get can replay the loop-closure correction offline.
eval.py's odom-pose baseline was replaced by tf_pose_samples, but eval_ground_truth_tag.py still imported the deleted odometry_pose7_lookup. Build the raw pose7 lookup from tf_pose_samples + pose7_lookup, mirroring evaluate(). Fixes the broken import (full-repo mypy).
| tf_tag = "json_extract(tags, '$.tf_id')" | ||
| conn = self._connection() | ||
| lo = conn.execute( | ||
| f"SELECT {id_tag}, ts FROM {stream} WHERE {tf_tag} = ? AND ts <= ? " | ||
| "ORDER BY ts DESC, id DESC LIMIT 1", | ||
| (str(tf_id), query_time), | ||
| ).fetchone() | ||
| hi = conn.execute( |
There was a problem hiding this comment.
_graph_at throws IndexError on an empty _graph_in_ram list
When _ensure_built() finds zero tf rows (empty tf table), it returns without building the graph stream. _load_graph_if_small() then sets _graph_in_ram = [] (because _graph_count() returns 0 and 0 < 64). The next _graph_at(t) call runs bisect.bisect_right([], t) - 1 = -1, falls into the index < 0 branch, and executes return self._graph_in_ram[0][1] — an IndexError on an empty list. The DbTf.get() protocol contract is to return None when no transform exists, not raise. The fix is to guard the "earliest" path: if not self._graph_in_ram: return None before the index access.
| (frame, kind) -> blob bytes.""" | ||
| cf = "json_extract(tags, '$.child_frame')" | ||
| # One UNION of per-frame, index-served LIMIT-1 subqueries: each is a direct | ||
| # (child_frame, ts) range seek — far cheaper than a window-function scan, and | ||
| # still a single round-trip. | ||
| parts: list[str] = [] | ||
| params: list[Any] = [] | ||
|
|
||
| def pick(frame: str, kind: str, where_ts: str, order: str) -> None: | ||
| parts.append( | ||
| f"SELECT ? AS cf, ? AS kind, " | ||
| f'(SELECT id FROM "tf" WHERE {cf} = ?{where_ts} ORDER BY ts {order} LIMIT 1) AS id' | ||
| ) | ||
| params.extend([frame, kind, frame]) | ||
|
|
||
| for frame in dynamic: | ||
| pick(frame, "lo", " AND ts <= ?", "DESC") | ||
| params.append(query_time) | ||
| pick(frame, "hi", " AND ts >= ?", "ASC") | ||
| params.append(query_time) |
There was a problem hiding this comment.
_ensure_built throws OperationalError when the tf table is absent
_ensure_built() immediately executes SELECT count(*) FROM "tf" without first checking whether the "tf" stream (and thus its table) actually exists in the recording. If a SqliteStore has no tf stream at all, this throws sqlite3.OperationalError: no such table: tf. By contrast, has_transforms() correctly guards with if "tf" not in set(self._store.list_streams()): return False. Any caller that invokes store.tf.get(...) on a tf-less recording — without first calling has_transforms() — gets an uncaught exception rather than the None the DbTf protocol promises.
test_db_tf.py instantiates SqliteStore directly (not via the sqlite_store fixture), so it bypassed conftest's _SKIP_SQLITE_VEC guard and failed on the ubuntu-24.04-arm runner (vec0.so wrong ELF class). Add a module-level skipif mirroring memory2/conftest.py so these 6 tests skip instead of fail there.
| if OPEN_RRD and WHAT in ("lidar", "both"): | ||
| import subprocess | ||
|
|
||
| from dimos.navigation.jnav.components.loop_closure.gsc_pgo import make_rrd |
There was a problem hiding this comment.
make_rrd lives at gsc_pgo/scripts/make_rrd.py, but this import resolves to gsc_pgo/make_rrd.py — a path that does not exist. Since OPEN_RRD is True and WHAT defaults to "both", the default invocation (python post_process.py --rec=...) will fail here with ModuleNotFoundError.
| from dimos.navigation.jnav.components.loop_closure.gsc_pgo import make_rrd | |
| from dimos.navigation.jnav.components.loop_closure.gsc_pgo.scripts import make_rrd |
gsc_pgo, Ports the PGO / loop-closure stack into the new
jnavlayout, plus a tf-tree feature formemory2stores.