NewLine Serialization Method

Arduino Library

Initialization

NLSMSerial.init(115200);

Receive

uint8_t buffer[128];
uint16_t rx_size = NLSMSerial.receive(buffer, 128);

Transmit

uint8_t buffer[128];
uint16_t tx_size = 1;
NLSMSerial.transmit(buffer, tx_size);

Python

import serial

class NLSMSerial:
    END = b"\x0A"
    ESC = b"\x1B"
    ESC_END = b"\x1C"
    ESC_ESC = b"\x1D"
    def __init__(self, COM=None, baudrate=115200, timeout=0):
        self._COM = COM
        if not self._COM:
            self._COM = "COM6"
            
        self._ser = serial.Serial(port=self._COM, baudrate=baudrate, timeout=timeout)

    def transmit(self, buffer):
        index = 0
        while index < len(buffer):
            c = struct.pack("B", buffer[index])
            if c == NLSMSerial.END:
                self._ser.write(NLSMSerial.ESC)
                self._ser.write(NLSMSerial.ESC_END)
            elif c == NLSMSerial.ESC:
                self._ser.write(NLSMSerial.ESC)
                self._ser.write(NLSMSerial.ESC_ESC)
            else:
                self._ser.write(c)
            index += 1
        self._ser.write(NLSMSerial.END)

    def receive(self):
        c = b""
        buffer = b""
        while c != NLSMSerial.END:
            if c == NLSMSerial.ESC:
                c = self._ser.read(1)
                if c == NLSMSerial.ESC_END:
                    buffer += NLSMSerial.END
                elif c == NLSMSerial.ESC_ESC:
                    buffer += NLSMSerial.ESC
                else:
                    buffer += c
            else:
                buffer += c
            c = self._ser.read(1)
            if c == b"":
                return buffer
        return buffer

Example Usage: Echo

Arduino

#include "NLSMSerial.h"

uint8_t buffer[128];

void setup() {
  NLSMSerial.init(115200);
}

void loop() {
  uint16_t recv_size = NLSMSerial.receive(buffer, 128);
  NLSMSerial.transmit(buffer, recv_size);
}

Python

import struct
import logging
import json
import time

import random
import threading

import serial
import serial.tools.list_ports

class NLSMSerial:
    END = b"\x0A"
    ESC = b"\x1B"
    ESC_END = b"\x1C"
    ESC_ESC = b"\x1D"
    def __init__(self, COM=None, baudrate=115200, timeout=0):
        self._COM = COM
        if not self._COM:
            self._COM = "COM6"
            
        self._ser = serial.Serial(port=self._COM, baudrate=baudrate, timeout=timeout)

    def transmit(self, buffer):
        index = 0
        while index < len(buffer):
            c = struct.pack("B", buffer[index])
            if c == NLSMSerial.END:
                self._ser.write(NLSMSerial.ESC)
                self._ser.write(NLSMSerial.ESC_END)
            elif c == NLSMSerial.ESC:
                self._ser.write(NLSMSerial.ESC)
                self._ser.write(NLSMSerial.ESC_ESC)
            else:
                self._ser.write(c)
            index += 1
        self._ser.write(NLSMSerial.END)

    def receive(self):
        c = b""
        buffer = b""
        while c != NLSMSerial.END:
            if c == NLSMSerial.ESC:
                c = self._ser.read(1)
                if c == NLSMSerial.ESC_END:
                    buffer += NLSMSerial.END
                elif c == NLSMSerial.ESC_ESC:
                    buffer += NLSMSerial.ESC
                else:
                    buffer += c
            else:
                buffer += c
            c = self._ser.read(1)
            if c == b"":
                return buffer
        return buffer

rx_counter = 0
tx_counter = 0
recv_data = b""


def tx_handler():
    global tx_counter
    counter = 0
    while True:
        counter += 1
        
        buffer = b"hello world"+struct.pack("B", counter % 256)
        ser.transmit(buffer)
        tx_counter += 1


def rx_handler():
    global rx_counter
    global recv_data
    while True:
        recv = ser.receive()
        if not recv:
            continue
        recv_data = recv
        rx_counter += 1

ser = NLSMSerial(timeout=0.1)
time.sleep(2)
print("connected")

tx = threading.Thread(target=tx_handler)
rx = threading.Thread(target=rx_handler)
tx.start()
rx.start()

counter = 0
while True:
    print(tx_counter, rx_counter, tx_counter-rx_counter, recv_data)

    

Last updated