diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 8b866def4..051a7a8e8 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -552,6 +552,13 @@ def onConnected(interface): waitForAckNak = True interface.getNode(args.dest, False, **getNode_kwargs).resetNodeDb() + if args.add_contact: + closeNow = True + waitForAckNak = True + interface.getNode(args.dest, False, **getNode_kwargs).addContactURL( + args.add_contact + ) + if args.sendtext: closeNow = True channelIndex = mt_config.channel_index or 0 @@ -1074,6 +1081,20 @@ def setSimpleConfig(modem_preset): else: print("Install pyqrcode to view a QR code printed to terminal.") + if args.contact_qr: + closeNow = True + url = interface.getNode(args.dest, True, **getNode_kwargs).getContactURL( + args.contact_qr, + should_ignore=args.contact_ignore, + manually_verified=args.contact_verified, + ) + print(f"Contact URL: {url}") + if pyqrcode is not None: + qr = pyqrcode.create(url) + print(qr.terminal()) + else: + print("Install pyqrcode to view a QR code printed to terminal.") + log_set: Optional = None # type: ignore[annotation-unchecked] # we need to keep a reference to the logset so it doesn't get GCed early @@ -1858,6 +1879,24 @@ def addChannelConfigArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPa action="store_true", ) + group.add_argument( + "--contact-qr", + help="Display a QR code for a node's contact data. " + "Use the node ID with a '!' or '0x' prefix or the node number. " + "Also shows the shareable contact URL.", + metavar="!xxxxxxxx", + ) + group.add_argument( + "--contact-verified", + help="Set the IS_KEY_MANUALLY_VERIFIED bit in the generated contact URL", + action="store_true", + ) + group.add_argument( + "--contact-ignore", + help="Mark this contact as blocked/ignored in the generated contact URL", + action="store_true", + ) + group.add_argument( "--ch-enable", help="Enable the specified channel. Use --ch-add instead whenever possible.", @@ -2095,6 +2134,13 @@ def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars action="store_true", ) + group.add_argument( + "--add-contact", + help="Add a contact (User) to the NodeDB from a shareable URL. " + "Example: https://meshtastic.org/v/#", + metavar="URL", + ) + group.add_argument( "--set-time", help="Set the time to the provided unix epoch timestamp, or the system's current time if omitted or 0.", diff --git a/meshtastic/node.py b/meshtastic/node.py index b18eff072..3554cc15b 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -380,6 +380,46 @@ def getURL(self, includeAll: bool = True): s = s.replace("=", "").replace("+", "-").replace("/", "_") return f"https://meshtastic.org/e/#{s}" + def getContactURL(self, node_id: Union[int, str], should_ignore: bool = False, manually_verified: bool = False): + """Generate a shareable contact URL for the specified node""" + nodeNum = to_node_num(node_id) + + node = self.iface.nodesByNum.get(nodeNum) + if not node or not node.get("user"): + our_exit(f"Warning: Node {node_id} not found in NodeDB") + + contact = admin_pb2.SharedContact() + contact.node_num = nodeNum + + u = node["user"] + if u.get("id"): + contact.user.id = u["id"] + if u.get("macaddr"): + contact.user.macaddr = base64.b64decode(u["macaddr"]) + if u.get("longName"): + contact.user.long_name = u["longName"] + if u.get("shortName"): + contact.user.short_name = u["shortName"] + if u.get("hwModel") and u["hwModel"] != "UNSET": + contact.user.hw_model = mesh_pb2.HardwareModel.Value(u["hwModel"]) + if u.get("role"): + contact.user.role = config_pb2.Config.DeviceConfig.Role.Value(u["role"]) + if u.get("publicKey"): + contact.user.public_key = base64.b64decode(u["publicKey"]) + if u.get("isLicensed"): + contact.user.is_licensed = u["isLicensed"] + if u.get("isUnmessagable") is not None: + contact.user.is_unmessagable = u["isUnmessagable"] + if should_ignore: + contact.should_ignore = True + if manually_verified: + contact.manually_verified = True + + data = contact.SerializeToString() + s = base64.urlsafe_b64encode(data).decode("ascii") + s = s.replace("=", "").replace("+", "-").replace("/", "_") + return f"https://meshtastic.org/v/#{s}" + def setURL(self, url: str, addOnly: bool = False): """Set mesh network URL""" if self.localConfig is None or self.channels is None: @@ -445,6 +485,32 @@ def setURL(self, url: str, addOnly: bool = False): self.ensureSessionKey() self._sendAdmin(p) + def addContactURL(self, url: str): + """Add a contact (User) to the NodeDB from a shareable URL""" + self.ensureSessionKey() + + splitURL = url.split("/#") + if len(splitURL) == 1: + our_exit(f"Warning: Invalid URL '{url}'") + b64 = splitURL[-1] + + missing_padding = len(b64) % 4 + if missing_padding: + b64 += "=" * (4 - missing_padding) + + decodedURL = base64.urlsafe_b64decode(b64) + contact = admin_pb2.SharedContact() + contact.ParseFromString(decodedURL) + + p = admin_pb2.AdminMessage() + p.add_contact.CopyFrom(contact) + + if self == self.iface.localNode: + onResponse = None + else: + onResponse = self.onAckNak + return self._sendAdmin(p, onResponse=onResponse) + def onResponseRequestRingtone(self, p): """Handle the response packet for requesting ringtone part 1""" logger.debug(f"onResponseRequestRingtone() p:{p}") diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index 60f352084..772717cb6 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -2998,6 +2998,56 @@ def test_remove_ignored_node(): main() mocked_node.removeIgnored.assert_called_once_with("!12345678") + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_add_contact_url(): + """Test --add-contact with a shareable URL""" + url = "https://meshtastic.org/v/#CKqkvZgIElEKCSE4MzBmNTIyYRIQUm9hZHJ1bm5lciBSaWRnZRoEUktTTiIGAAAAAAAAKAk4AkIgRxo_Fw_ergQIhRqBbrHasLYy3gU-Ay8hrhu4OVnIPQc=" # pylint: disable=line-too-long + sys.argv = ["", "--add-contact", url] + mt_config.args = sys.argv + mocked_node = MagicMock(autospec=Node) + iface = MagicMock(autospec=SerialInterface) + iface.getNode.return_value = mocked_node + with patch("meshtastic.serial_interface.SerialInterface", return_value=iface): + main() + + mocked_node.addContactURL.assert_called_once_with(url) + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_contact_qr(): + """Test --contact-qr with a node ID""" + sys.argv = ["", "--contact-qr", "!830f522a"] + mt_config.args = sys.argv + mocked_node = MagicMock(autospec=Node) + iface = MagicMock(autospec=SerialInterface) + iface.getNode.return_value = mocked_node + mocked_node.iface = iface + with patch("meshtastic.serial_interface.SerialInterface", return_value=iface): + main() + + mocked_node.getContactURL.assert_called_once_with("!830f522a", should_ignore=False, manually_verified=False) + mocked_node.getContactURL.reset_mock() + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_contact_qr_with_flags(): + """Test --contact-qr with --contact-verified and --contact-ignore""" + sys.argv = ["", "--contact-qr", "!830f522a", "--contact-verified", "--contact-ignore"] + mt_config.args = sys.argv + mocked_node = MagicMock(autospec=Node) + iface = MagicMock(autospec=SerialInterface) + iface.getNode.return_value = mocked_node + mocked_node.iface = iface + with patch("meshtastic.serial_interface.SerialInterface", return_value=iface): + main() + + mocked_node.getContactURL.assert_called_once_with("!830f522a", should_ignore=True, manually_verified=True) + + @pytest.mark.unit @pytest.mark.usefixtures("reset_mt_config") def test_main_set_owner_whitespace_only(capsys): diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index bd0644dec..7065fc03e 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -1,13 +1,14 @@ """Meshtastic unit tests for node.py""" # pylint: disable=C0302 +import base64 import logging import re from unittest.mock import MagicMock, patch import pytest -from ..protobuf import admin_pb2, localonly_pb2, config_pb2 +from ..protobuf import admin_pb2, localonly_pb2, config_pb2, mesh_pb2 from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611 from ..node import Node from ..serial_interface import SerialInterface @@ -339,6 +340,56 @@ def test_setURL_valid_URL_but_no_settings(capsys): assert err == "" +@pytest.mark.unit +def test_contact_url_roundtrip(): + """Verify that contact URL generation and parsing is fully reversible""" + def encode_url(contact): + data = contact.SerializeToString() + s = base64.urlsafe_b64encode(data).decode("ascii") + s = s.replace("=", "").replace("+", "-").replace("/", "_") + return f"https://meshtastic.org/v/#{s}" + + def decode_url(url): + b64 = url.split("/#")[-1] + missing_padding = len(b64) % 4 + if missing_padding: + b64 += "=" * (4 - missing_padding) + decoded = base64.urlsafe_b64decode(b64) + contact = admin_pb2.SharedContact() + contact.ParseFromString(decoded) + return contact + + original = admin_pb2.SharedContact() + original.node_num = 2198819370 + original.user.id = "!830f522a" + original.user.long_name = "Roadrunner Ridge" + original.user.short_name = "RKSN" + original.user.macaddr = b'\x00\x00\x00\x00\x00\x00' + original.user.hw_model = mesh_pb2.HardwareModel.Value("RAK4631") + original.user.role = mesh_pb2.User.DESCRIPTOR.fields_by_name['role'].enum_type.values_by_name["ROUTER"].number + original.user.public_key = bytes.fromhex("471a3f170fdeae0408851a816eb1dab0b632de053e032f21ae1bb83959c83d07") + original.user.is_licensed = True + original.user.is_unmessagable = False + original.should_ignore = True + original.manually_verified = True + + url = encode_url(original) + parsed = decode_url(url) + + assert parsed.node_num == original.node_num + assert parsed.user.id == original.user.id + assert parsed.user.long_name == original.user.long_name + assert parsed.user.short_name == original.user.short_name + assert parsed.user.macaddr == original.user.macaddr + assert parsed.user.hw_model == original.user.hw_model + assert parsed.user.role == original.user.role + assert parsed.user.public_key == original.user.public_key + assert parsed.user.is_licensed == original.user.is_licensed + assert parsed.user.is_unmessagable == original.user.is_unmessagable + assert parsed.should_ignore == original.should_ignore + assert parsed.manually_verified == original.manually_verified + + # TODO # @pytest.mark.unit # def test_showChannels(capsys):