Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions meshtastic/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,13 @@ def onConnected(interface):
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).resetNodeDb()

if args.add_contact:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).addContactURL(
args.add_contact
)

if args.sendtext:
closeNow = True
channelIndex = mt_config.channel_index or 0
Expand Down Expand Up @@ -1074,6 +1081,20 @@ def setSimpleConfig(modem_preset):
else:
print("Install pyqrcode to view a QR code printed to terminal.")

if args.contact_qr:
closeNow = True
url = interface.getNode(args.dest, True, **getNode_kwargs).getContactURL(
args.contact_qr,
should_ignore=args.contact_ignore,
manually_verified=args.contact_verified,
)
print(f"Contact URL: {url}")
if pyqrcode is not None:
qr = pyqrcode.create(url)
print(qr.terminal())
else:
print("Install pyqrcode to view a QR code printed to terminal.")

log_set: Optional = None # type: ignore[annotation-unchecked]
# we need to keep a reference to the logset so it doesn't get GCed early

Expand Down Expand Up @@ -1858,6 +1879,24 @@ def addChannelConfigArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPa
action="store_true",
)

group.add_argument(
"--contact-qr",
help="Display a QR code for a node's contact data. "
"Use the node ID with a '!' or '0x' prefix or the node number. "
"Also shows the shareable contact URL.",
metavar="!xxxxxxxx",
)
group.add_argument(
"--contact-verified",
help="Set the IS_KEY_MANUALLY_VERIFIED bit in the generated contact URL",
action="store_true",
)
group.add_argument(
"--contact-ignore",
help="Mark this contact as blocked/ignored in the generated contact URL",
action="store_true",
)

group.add_argument(
"--ch-enable",
help="Enable the specified channel. Use --ch-add instead whenever possible.",
Expand Down Expand Up @@ -2095,6 +2134,13 @@ def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars
action="store_true",
)

group.add_argument(
"--add-contact",
help="Add a contact (User) to the NodeDB from a shareable URL. "
"Example: https://meshtastic.org/v/#<base64>",
metavar="URL",
)

group.add_argument(
"--set-time",
help="Set the time to the provided unix epoch timestamp, or the system's current time if omitted or 0.",
Expand Down
66 changes: 66 additions & 0 deletions meshtastic/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,46 @@ def getURL(self, includeAll: bool = True):
s = s.replace("=", "").replace("+", "-").replace("/", "_")
return f"https://meshtastic.org/e/#{s}"

def getContactURL(self, node_id: Union[int, str], should_ignore: bool = False, manually_verified: bool = False):
"""Generate a shareable contact URL for the specified node"""
nodeNum = to_node_num(node_id)

node = self.iface.nodesByNum.get(nodeNum)
if not node or not node.get("user"):
our_exit(f"Warning: Node {node_id} not found in NodeDB")

contact = admin_pb2.SharedContact()
contact.node_num = nodeNum

u = node["user"]
if u.get("id"):
contact.user.id = u["id"]
if u.get("macaddr"):
contact.user.macaddr = base64.b64decode(u["macaddr"])
Comment on lines +397 to +398
if u.get("longName"):
contact.user.long_name = u["longName"]
if u.get("shortName"):
contact.user.short_name = u["shortName"]
if u.get("hwModel") and u["hwModel"] != "UNSET":
contact.user.hw_model = mesh_pb2.HardwareModel.Value(u["hwModel"])
if u.get("role"):
contact.user.role = config_pb2.Config.DeviceConfig.Role.Value(u["role"])
if u.get("publicKey"):
contact.user.public_key = base64.b64decode(u["publicKey"])
if u.get("isLicensed"):
contact.user.is_licensed = u["isLicensed"]
if u.get("isUnmessagable") is not None:
contact.user.is_unmessagable = u["isUnmessagable"]
if should_ignore:
contact.should_ignore = True
if manually_verified:
contact.manually_verified = True

data = contact.SerializeToString()
s = base64.urlsafe_b64encode(data).decode("ascii")
s = s.replace("=", "").replace("+", "-").replace("/", "_")
return f"https://meshtastic.org/v/#{s}"

def setURL(self, url: str, addOnly: bool = False):
"""Set mesh network URL"""
if self.localConfig is None or self.channels is None:
Expand Down Expand Up @@ -445,6 +485,32 @@ def setURL(self, url: str, addOnly: bool = False):
self.ensureSessionKey()
self._sendAdmin(p)

def addContactURL(self, url: str):
"""Add a contact (User) to the NodeDB from a shareable URL"""
self.ensureSessionKey()

splitURL = url.split("/#")
if len(splitURL) == 1:
our_exit(f"Warning: Invalid URL '{url}'")
b64 = splitURL[-1]

missing_padding = len(b64) % 4
if missing_padding:
b64 += "=" * (4 - missing_padding)

decodedURL = base64.urlsafe_b64decode(b64)
contact = admin_pb2.SharedContact()
contact.ParseFromString(decodedURL)
Comment on lines +501 to +503

p = admin_pb2.AdminMessage()
p.add_contact.CopyFrom(contact)

if self == self.iface.localNode:
onResponse = None
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)

def onResponseRequestRingtone(self, p):
"""Handle the response packet for requesting ringtone part 1"""
logger.debug(f"onResponseRequestRingtone() p:{p}")
Expand Down
50 changes: 50 additions & 0 deletions meshtastic/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2998,6 +2998,56 @@ def test_remove_ignored_node():
main()

mocked_node.removeIgnored.assert_called_once_with("!12345678")

@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_add_contact_url():
"""Test --add-contact with a shareable URL"""
url = "https://meshtastic.org/v/#CKqkvZgIElEKCSE4MzBmNTIyYRIQUm9hZHJ1bm5lciBSaWRnZRoEUktTTiIGAAAAAAAAKAk4AkIgRxo_Fw_ergQIhRqBbrHasLYy3gU-Ay8hrhu4OVnIPQc=" # pylint: disable=line-too-long
sys.argv = ["", "--add-contact", url]
mt_config.args = sys.argv
mocked_node = MagicMock(autospec=Node)
iface = MagicMock(autospec=SerialInterface)
iface.getNode.return_value = mocked_node
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface):
main()

mocked_node.addContactURL.assert_called_once_with(url)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_contact_qr():
"""Test --contact-qr with a node ID"""
sys.argv = ["", "--contact-qr", "!830f522a"]
mt_config.args = sys.argv
mocked_node = MagicMock(autospec=Node)
iface = MagicMock(autospec=SerialInterface)
iface.getNode.return_value = mocked_node
mocked_node.iface = iface
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface):
main()

mocked_node.getContactURL.assert_called_once_with("!830f522a", should_ignore=False, manually_verified=False)
mocked_node.getContactURL.reset_mock()


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_contact_qr_with_flags():
"""Test --contact-qr with --contact-verified and --contact-ignore"""
sys.argv = ["", "--contact-qr", "!830f522a", "--contact-verified", "--contact-ignore"]
mt_config.args = sys.argv
mocked_node = MagicMock(autospec=Node)
iface = MagicMock(autospec=SerialInterface)
iface.getNode.return_value = mocked_node
mocked_node.iface = iface
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface):
main()

mocked_node.getContactURL.assert_called_once_with("!830f522a", should_ignore=True, manually_verified=True)


@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_main_set_owner_whitespace_only(capsys):
Expand Down
53 changes: 52 additions & 1 deletion meshtastic/tests/test_node.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""Meshtastic unit tests for node.py"""
# pylint: disable=C0302

import base64
import logging
import re
from unittest.mock import MagicMock, patch

import pytest

from ..protobuf import admin_pb2, localonly_pb2, config_pb2
from ..protobuf import admin_pb2, localonly_pb2, config_pb2, mesh_pb2
from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611
from ..node import Node
from ..serial_interface import SerialInterface
Expand Down Expand Up @@ -339,6 +340,56 @@ def test_setURL_valid_URL_but_no_settings(capsys):
assert err == ""


@pytest.mark.unit
def test_contact_url_roundtrip():
"""Verify that contact URL generation and parsing is fully reversible"""
def encode_url(contact):
data = contact.SerializeToString()
s = base64.urlsafe_b64encode(data).decode("ascii")
s = s.replace("=", "").replace("+", "-").replace("/", "_")
return f"https://meshtastic.org/v/#{s}"

def decode_url(url):
b64 = url.split("/#")[-1]
missing_padding = len(b64) % 4
if missing_padding:
b64 += "=" * (4 - missing_padding)
decoded = base64.urlsafe_b64decode(b64)
contact = admin_pb2.SharedContact()
contact.ParseFromString(decoded)
return contact

original = admin_pb2.SharedContact()
original.node_num = 2198819370
original.user.id = "!830f522a"
original.user.long_name = "Roadrunner Ridge"
original.user.short_name = "RKSN"
original.user.macaddr = b'\x00\x00\x00\x00\x00\x00'
original.user.hw_model = mesh_pb2.HardwareModel.Value("RAK4631")
original.user.role = mesh_pb2.User.DESCRIPTOR.fields_by_name['role'].enum_type.values_by_name["ROUTER"].number
original.user.public_key = bytes.fromhex("471a3f170fdeae0408851a816eb1dab0b632de053e032f21ae1bb83959c83d07")
original.user.is_licensed = True
original.user.is_unmessagable = False
original.should_ignore = True
original.manually_verified = True

url = encode_url(original)
parsed = decode_url(url)

assert parsed.node_num == original.node_num
assert parsed.user.id == original.user.id
assert parsed.user.long_name == original.user.long_name
assert parsed.user.short_name == original.user.short_name
assert parsed.user.macaddr == original.user.macaddr
assert parsed.user.hw_model == original.user.hw_model
assert parsed.user.role == original.user.role
assert parsed.user.public_key == original.user.public_key
assert parsed.user.is_licensed == original.user.is_licensed
assert parsed.user.is_unmessagable == original.user.is_unmessagable
assert parsed.should_ignore == original.should_ignore
assert parsed.manually_verified == original.manually_verified


# TODO
# @pytest.mark.unit
# def test_showChannels(capsys):
Expand Down
Loading