memory2 tf service#2707
Conversation
Unify replay tf with the live service: StreamTF(MultiTBuffer, TFSpec) mirrors PubSubTF, pulling windows from a recorded tf stream on demand instead of receiving pushed messages. Lookups span buffer_size backward (or an explicit time_tolerance) plus forward_tolerance ahead; a cache miss prefetches cache_span past the query window and evicts everything first, so chronological replay costs one db query per cache_span. - TFLookup protocol (read side) + mypy conformance checks - get_pose hoisted from PubSubTF to TFSpec - MultiTBuffer: None tolerance resolves to buffer_size explicitly - map global: registration via tf stream (never Observation poses), --frame auto-detects world/map/odom via probe lookups, fail-fast when the cloud frame can't be resolved - tf lookup tests consolidated into a grid: live MultiTBuffer vs StreamTF over memory/sqlite stores
|
Preview deployment for your docs. Learn more about Mintlify Previews.
💡 Tip: Enable Workflows to automatically generate PRs for you. |
Codecov Report❌ Patch coverage is @@ Coverage Diff @@
## main #2707 +/- ##
==========================================
+ Coverage 72.05% 72.07% +0.01%
==========================================
Files 893 894 +1
Lines 80728 80867 +139
Branches 7310 7332 +22
==========================================
+ Hits 58165 58281 +116
- Misses 20614 20628 +14
- Partials 1949 1958 +9
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 3 files with indirect coverage changes 🚀 New features to boost your workflow:
|
Greptile SummaryThis PR introduces
Confidence Score: 4/5Safe to merge for recordings where the tf stream has no temporal gaps; recordings with tf gaps during flight may silently produce holes in the accumulated map due to the dedup/registration inconsistency. The dimos/mapping/utils/cli/map.py — specifically the interaction between Important Files Changed
Sequence Diagram%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant CLI as map.py (main)
participant STF as StreamTF
participant Store as Stream[TFMessage]
participant Acc as _accumulate
CLI->>Store: stream(lidar_stream, PointCloud2)
CLI->>STF: StreamTF.from_store(store)
CLI->>STF: get(world, cloud_frame, first_obs.ts) [probe]
STF->>STF: _ensure(tp-back, tp+fwd)
STF->>Store: stream.at(center, radius)
Store-->>STF: observations with TFMessages
STF->>STF: "receive_transform(*transforms)"
STF-->>CLI: Transform or None [probe result]
loop dedup frames
CLI->>STF: _position(obs) → tf lookup
STF-->>CLI: (x,y,z) or None
Note over CLI: fallback to obs.pose if tf=None
end
CLI->>Acc: "_accumulate(kept, register=_register)"
loop kept frames
Acc->>STF: register(obs) → buf.get(world, frame, ts)
STF-->>Acc: Transform or None
alt Transform available
Acc->>Acc: apply transform to cloud
else None
Acc->>Acc: skip frame (continue)
end
end
Acc-->>CLI: PointCloud2 (global map)
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant CLI as map.py (main)
participant STF as StreamTF
participant Store as Stream[TFMessage]
participant Acc as _accumulate
CLI->>Store: stream(lidar_stream, PointCloud2)
CLI->>STF: StreamTF.from_store(store)
CLI->>STF: get(world, cloud_frame, first_obs.ts) [probe]
STF->>STF: _ensure(tp-back, tp+fwd)
STF->>Store: stream.at(center, radius)
Store-->>STF: observations with TFMessages
STF->>STF: "receive_transform(*transforms)"
STF-->>CLI: Transform or None [probe result]
loop dedup frames
CLI->>STF: _position(obs) → tf lookup
STF-->>CLI: (x,y,z) or None
Note over CLI: fallback to obs.pose if tf=None
end
CLI->>Acc: "_accumulate(kept, register=_register)"
loop kept frames
Acc->>STF: register(obs) → buf.get(world, frame, ts)
STF-->>Acc: Transform or None
alt Transform available
Acc->>Acc: apply transform to cloud
else None
Acc->>Acc: skip frame (continue)
end
end
Acc-->>CLI: PointCloud2 (global map)
Reviews (2): Last reviewed commit: "Merge branch 'feat/ivan/memtf' of github..." | Re-trigger Greptile |
| if stream is not None: | ||
| kwargs["stream"] = stream | ||
| TFSpec.__init__(self, **kwargs) | ||
| MultiTBuffer.__init__(self, buffer_size=math.inf) |
There was a problem hiding this comment.
Why infinity? Is it wise to never evict? Was this meant to be `self.config.cache_span?
| def _load(self, lo: float, hi: float) -> None: | ||
| for obs in self.stream.at((lo + hi) / 2, (hi - lo) / 2): | ||
| self.receive_transform(*obs.data.transforms) | ||
| self._covered = (lo, hi) |
There was a problem hiding this comment.
Don't you need to use self._cv for modifying self._covered too?
|
|
||
| if TYPE_CHECKING: | ||
| # mypy conformance check: StreamTF satisfies the read-side tf protocol. | ||
| _lookup_impl: TFLookup = cast("StreamTF", None) |
There was a problem hiding this comment.
Is this used anywhere?
|
|
||
| if TYPE_CHECKING: | ||
| # mypy conformance checks: the live services satisfy the read-side protocol. | ||
| _lookup_impls: tuple[type[TFLookup], ...] = (MultiTBuffer, PubSubTF) |
There was a problem hiding this comment.
Is this used anywhere?
we converted recordings to sensor frame, mem now needs a tf service for global mapping