Skip to content
Commits on Source (2)
from pyrsvp.rsvpclient import RsvpClientSerial
from pyrsvp.rsvptypes import *
# To emulate serial ports run the following command:
# socat -d -d pty,raw,echo=0 pty,raw,echo=0
def test_cb(args : dict):
print(args)
if __name__ == "__main__":
client = RsvpClientSerial("/dev/pts/6", baud=9600)
client = RsvpClientSerial("/dev/pts/9", baud=9600)
args = [
RsvpInteger("ARG1", 1),
......@@ -18,6 +21,6 @@ if __name__ == "__main__":
]
try:
client.send_command("TEST", args, callback=test_cb)
print(client.send_command("TEST", args))
except Exception as ex:
print(f"[{type(ex).__name__}] Error: {ex}")
\ No newline at end of file
from pyrsvp.rsvpserver import RsvpServer, RsvpServerSerial
from pyrsvp.rsvptypes import *
# To emulate serial ports run the following command:
# socat -d -d pty,raw,echo=0 pty,raw,echo=0
@RsvpServer.Command("TEST")
def test(args : dict):
print(args)
......@@ -9,5 +12,5 @@ def test(args : dict):
if __name__ == "__main__":
srv = RsvpServerSerial("/dev/pts/5", baud=9600)
srv.run()
\ No newline at end of file
srv = RsvpServerSerial("/dev/pts/8", baud=9600)
srv.run(True)
\ No newline at end of file
from pyrsvp.rsvpclient import RsvpClientTcp, RsvpClientNotAck
from pyrsvp.rsvpclient import RsvpClientTcp
from pyrsvp.rsvptypes import *
......@@ -20,7 +20,7 @@ if __name__ == "__main__":
]
try:
print(client.send_command("START", args))
print(client.send_command(""))
except Exception as ex:
print(f"[{type(ex).__name__}] Error: {ex}")
\ No newline at end of file
......@@ -21,7 +21,7 @@ def start(values : dict) -> str:
print(values)
return RsvpServer.reply(True, [RsvpInteger("CODE", 1)])
return RsvpServer.reply(True)
if __name__ == "__main__":
......
[tool.poetry]
name = "pyrsvp"
version = "0.2.3"
version = "0.2.4"
description = "Implements RSVP communication protocol in Python"
authors = ["Mattia Gallacchi <mattia.gallacchi@he-arc.ch>"]
readme = "README.md"
......
......@@ -239,7 +239,7 @@ class RsvpServerSerial(RsvpServer):
self._running = False
self.ser.cancel_read()
def run(self):
def run(self, debug=False):
"""Run the Serial server"""
try:
......@@ -255,7 +255,7 @@ class RsvpServerSerial(RsvpServer):
while self._running:
try:
data = self.ser.readline().strip()
data = self.ser.readline()
except serial.SerialException as ex:
# Something went wrong probably an empty line was received
print(ex)
......@@ -264,7 +264,15 @@ class RsvpServerSerial(RsvpServer):
if len(data) == 0:
continue
if debug:
print(f"[DEBUG] Data: {data}")
data = data.strip()
reply = super()._parse_data(data.decode())
if debug:
print(f"[DEBUG] Reply: {reply.encode()}")
self.ser.write(reply.encode())
self.ser.close()
......@@ -263,7 +263,15 @@ def process_data_types(tokens: list[str]) -> dict:
values = {}
for token in tokens:
key, value = token.split("=", 1)
# Ignore tokens with not enough characters
if len(token) < 3:
continue
try:
key, value = token.split("=", 1)
except ValueError:
# Ignore failed splits
continue
# Find type
try:
......
......@@ -109,4 +109,9 @@ class TestTcpClient:
def test_wrong_reply(self, client: RsvpClientTcp):
with pytest.raises(RsvpNotAck):
client.send_command("STOP")
\ No newline at end of file
client.send_command("STOP")
def test_reply_string(self, client: RsvpClientTcp):
reply = client.send_command("START")
assert reply["STATUS"] == "PASS"
\ No newline at end of file