I am sure there is a better way to present code on this wiki. Sorry.
Please don't judge this code - I am NOT a professional DEV, and I really dislike python
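This code takes one optional command-line argument: the hostname of a local (or otherwise different) MQTT broker.
If it is specified, every decoded message is also published to that broker as JSON, with localTopicPrefix prepended to the original topic.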
For a private channel, specify your channel name and key in the code.
If someone wants to make some improvements to this code, please do.
The first thing I would do is put the property names into a list and iterate over it, so that this code becomes much smaller.
Also, I am sure there is a way to get the app type (port number name) out of the meshtastic library - I have just hard-coded the numbers.
To install the required Python libraries:
mkdir python-venv-meshtastic
python3 -m venv python-venv-meshtastic/
source python-venv-meshtastic/bin/activate #activate this python environment
pip3 install --upgrade pytap2
pip3 install --upgrade meshtastic
pip3 install --upgrade paho-mqtt cryptography #the script below also imports these two packages
python --version
meshtastic --get lora
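You should be able to copy the code below straight out of this page (restore the line breaks and indentation if the wiki has collapsed them).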
""" based off https://github.com/pdxlocations/Meshtastic-Python-Examples """
import base64
import json
import sys

import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from meshtastic import protocols
from meshtastic.protobuf import mqtt_pb2, mesh_pb2

# argv[1] if you pass a servername for a local/different MQTT broker to publish the decoded messages to
# Hostname of an optional local/different MQTT broker, taken from the first
# command-line argument. An empty string means "do not republish".
localMQTTserver = sys.argv[1] if len(sys.argv) > 1 else ""
# Prepended to the original topic when republishing to the local broker.
localTopicPrefix = "mshlocal/"

# Broker the script subscribes to, and the credentials it logs in with.
BROKER = "mqtt.meshtastic.org"
USER = "meshdev"
PASS = "large4cats"
PORT = 1883

# Topic filters to subscribe to. Alternatives are kept for reference:
# TOPICS = ["msh/US/2/e/PKI/#","msh/US/2/e/LongFast/#"]
# TOPICS = ["msh/ANZ/2/e/#"]
TOPICS = ["msh/ANZ/2/e/LongFast/#"]
# TOPICS = ["msh/ANZ/2/e//#"]  # uncomment this line if you want to decode your private channel
# NOTE(review): the private-channel example above presumably needs your
# channel name between "e/" and "/#" - confirm before using it.

# Base64-encoded channel key handed to decrypt_packet().
KEY = "AQ=="
# KEY = ""  # uncomment this line if you want to decode your private channel
# "AQ==" is replaced with the full-length key below; any other key is used as given.
if KEY == "AQ==":
    KEY = "1PG7OiApB1nwvP+rz05pAQ=="
def on_connect(client, userdata, flags, rc):
    """Callback when the client connects to the broker.

    A return code of 0 is treated as success: every filter in TOPICS is
    subscribed to and reported. Any other return code is only printed.
    """
    if rc != 0:
        print(f"Failed to connect, return code {rc}")
        return

    print("Connected to MQTT broker!")
    for topic_filter in TOPICS:
        client.subscribe(topic_filter)
        print(f"Subscribed to topic: {topic_filter}")


# Callback when a message is received
def _scale_1e7(text):
    """Parse an integer ``*_i`` coordinate and divide it by 10000000."""
    return int(text) / 10000000


# App name reported in the JSON output for each port number this script flattens.
_APP_NAMES = {
    1: 'TEXT_MESSAGE_APP',
    3: 'POSITION_APP',
    4: 'NODEINFO_APP',
    67: 'TELEMETRY_APP',
}

# Field tables: (JSON key, field name in the protobuf text dump, converter).
_TELEMETRY_TIME_FIELD = (
    ('time', 'time', str),
)
_TELEMETRY_METRIC_FIELDS = (
    ('temperature', 'temperature', float),
    ('barometric_pressure', 'barometric_pressure', float),
    ('relative_humidity', 'relative_humidity', float),
    ('battery_level', 'battery_level', int),
    ('voltage', 'voltage', float),
    ('channel_utilization', 'channel_utilization', float),
    ('air_util_tx', 'air_util_tx', float),
    ('uptime_seconds', 'uptime_seconds', int),
)
_POSITION_FIELDS = (
    ('time', 'time', str),
    ('latitude', 'latitude_i', _scale_1e7),
    ('longitude', 'longitude_i', _scale_1e7),
    ('altitude', 'altitude', int),
    ('location_source', 'location_source', str),
    ('ground_speed', 'ground_speed', int),
    ('ground_track', 'ground_track', int),
    ('precision_bits', 'precision_bits', int),
)
_NODEINFO_FIELDS = (
    ('id', 'id', str),
    ('short_name', 'short_name', str),
    ('long_name', 'long_name', str),
    ('macaddr', 'macaddr', str),
    ('hw_model', 'hw_model', str),
    ('public_key', 'public_key', str),
    ('request_id', 'request_id', str),
)


def _closes_quote(token, is_first):
    """Return True if ``token`` ends a double-quoted string value."""
    if not token.endswith('"') or (is_first and len(token) == 1):
        return False
    body = token[:-1]
    # An odd number of backslashes before the quote means the quote is escaped.
    trailing_backslashes = len(body) - len(body.rstrip('\\'))
    return trailing_backslashes % 2 == 0


def _field_text(tokens, name):
    """Return the value that follows the exact token ``name:``, or None if absent.

    Matching the whole token (not a substring) keeps e.g. ``time`` from being
    confused with ``uptime_seconds`` or ``id`` with ``request_id``. A quoted
    value that was split on spaces is re-joined; the surrounding quotes are
    kept, so single-word values look exactly as they did before.
    """
    try:
        start = tokens.index(name + ':') + 1
    except ValueError:
        return None
    if start >= len(tokens):
        return None
    if not tokens[start].startswith('"'):
        return tokens[start]
    end = start
    while end < len(tokens) - 1 and not _closes_quote(tokens[end], end == start):
        end += 1
    return ' '.join(tokens[start:end + 1])


def _copy_fields(tokens, specs, out):
    """Copy every field listed in ``specs`` that is present in ``tokens`` into ``out``."""
    for json_key, field_name, convert in specs:
        text = _field_text(tokens, field_name)
        if text is None:
            continue
        try:
            out[json_key] = convert(text)
        except ValueError:
            # Value is not in the expected numeric form; leave the key out.
            continue


def on_message(client, userdata, msg):
    """Callback when a message is received.

    Parses ``msg.payload`` as a ServiceEnvelope, decrypts the packet with KEY
    when it is still encrypted, flattens the fields of text, position,
    node-info and telemetry payloads into a dict, prints it, and - when
    localMQTTserver is set - republishes it as JSON under
    ``localTopicPrefix + msg.topic``.

    Failures on a single message (unparseable envelope or payload, local
    broker unreachable) are printed and the message is skipped or only
    printed. NOTE(review): paho-mqtt is understood to re-raise callback
    exceptions by default, which would end loop_forever() - confirm against
    the installed version.
    """
    se = mqtt_pb2.ServiceEnvelope()
    try:
        se.ParseFromString(msg.payload)
    except Exception as e:  # the protobuf DecodeError type is not imported by this file
        print(f"Skipping undecodable message on {msg.topic}: {e}")
        return
    decoded_mp = se.packet

    # Try to decrypt the payload if it is encrypted
    if decoded_mp.HasField("encrypted") and not decoded_mp.HasField("decoded"):
        decoded_data = decrypt_packet(decoded_mp, KEY)
        if decoded_data is None:
            print("Decryption failed; retaining original encrypted payload")
        else:
            decoded_mp.decoded.CopyFrom(decoded_data)

    # Attempt to process the decrypted or encrypted payload
    portNumInt = decoded_mp.decoded.portnum if decoded_mp.HasField("decoded") else None
    handler = protocols.get(portNumInt) if portNumInt else None
    pb = None
    if handler is not None and handler.protobufFactory is not None:
        pb = handler.protobufFactory()
        try:
            pb.ParseFromString(decoded_mp.decoded.payload)
        except Exception as e:  # same reason as above: DecodeError is not in scope
            print(f"Could not parse payload for portnum {portNumInt}: {e}")
            pb = None

    if pb is not None:
        # Single-line text dump of the parsed protobuf; the field values are read from it.
        payload_text = str(pb).replace('\n', ' ').replace('\r', ' ').strip()
    else:
        # No protobuf factory for this port (e.g. plain text): use the raw bytes.
        payload_text = decoded_mp.decoded.payload.decode("utf-8", errors="replace")

    sender = getattr(decoded_mp, 'from', None)  # 'from' is a keyword, so getattr is required
    jsonOut = {
        'from': sender,
        'to': decoded_mp.to,
        'channel': decoded_mp.channel,
        'id': decoded_mp.id,
        'rx_time': decoded_mp.rx_time,
        'rx_snr': decoded_mp.rx_snr,
        'rx_rssi': decoded_mp.rx_rssi,
        'hop_limit': decoded_mp.hop_limit,
        'hop_start': decoded_mp.hop_start,
    }

    portnum = decoded_mp.decoded.portnum
    jsonPayload = {'portnum': portnum, 'bitfield': decoded_mp.decoded.bitfield}
    if portnum in _APP_NAMES:
        jsonPayload['app'] = _APP_NAMES[portnum]

    if portnum == 1:
        jsonPayload['payload'] = payload_text
    elif portnum == 67:
        tokens = payload_text.split()
        _copy_fields(tokens, _TELEMETRY_TIME_FIELD, jsonPayload)
        if 'device_metrics' in tokens:
            jsonPayload['type'] = 'device_metrics'
        elif 'environment_metrics' in tokens:
            jsonPayload['type'] = 'environment_metrics'
        _copy_fields(tokens, _TELEMETRY_METRIC_FIELDS, jsonPayload)
    elif portnum == 3:
        _copy_fields(payload_text.split(), _POSITION_FIELDS, jsonPayload)
    elif portnum == 4:
        _copy_fields(payload_text.split(), _NODEINFO_FIELDS, jsonPayload)

    jsonOut['payload'] = jsonPayload

    if localMQTTserver:
        try:
            publish.single(localTopicPrefix + msg.topic, json.dumps(jsonOut), hostname=localMQTTserver)
        except OSError as e:
            # Local broker unreachable: keep decoding and printing.
            print(f"Could not publish to local broker {localMQTTserver}: {e}")
    print(msg.topic, ' ', jsonOut)
def decrypt_packet(mp, key):
    """Decrypt ``mp.encrypted`` with AES in CTR mode and parse it as ``mesh_pb2.Data``.

    ``key`` is the base64-encoded AES key. The 16-byte CTR nonce is the
    packet ``id`` followed by the ``from`` value, each written as 8
    little-endian bytes.

    Returns the parsed Data message, or None if any step raises (bad key
    encoding, missing attribute, cipher error, unparseable plaintext).
    """
    try:
        aes_key = base64.b64decode(key.encode('ascii'))
        # 'from' is a Python keyword, so both nonce parts are read with getattr.
        nonce = b"".join(
            getattr(mp, attr).to_bytes(8, "little") for attr in ("id", "from")
        )
        decryptor = Cipher(
            algorithms.AES(aes_key), modes.CTR(nonce), backend=default_backend()
        ).decryptor()
        plaintext = decryptor.update(mp.encrypted) + decryptor.finalize()
        data = mesh_pb2.Data()
        data.ParseFromString(plaintext)
    except Exception:
        # Best effort: the caller treats None as "decryption failed".
        return None
    return data
# Create the client, register the callbacks, then block in the network loop.
# on_connect/on_message use the (client, userdata, flags, rc) style signatures,
# which paho-mqtt 2.x calls callback API VERSION1; request it explicitly when
# the installed paho-mqtt exposes the enum, and fall back to the 1.x
# constructor otherwise.
if hasattr(mqtt, "CallbackAPIVersion"):
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
else:
    client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(USER, PASS)
try:
    client.connect(BROKER, PORT, keepalive=60)
    client.loop_forever()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the script; leave the broker cleanly.
    client.disconnect()
except Exception as e:
    print(f"mqtt client.connect(BROKER, PORT, keepalive=60): An error occurred: {e}")