Commit 0872cf32 authored by Gallacchi Mattia's avatar Gallacchi Mattia
Browse files

Fix pytest and resole error

parent b86f7b50
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
[tool.poetry]
name = "pyrsvp"
version = "0.2.1"
version = "0.2.2"
description = "Implements RSVP communication protocol in Python"
authors = ["Mattia Gallacchi <mattia.gallacchi@he-arc.ch>"]
readme = "README.md"
+29 −19
Original line number Diff line number Diff line
@@ -12,10 +12,14 @@ class RsvpClientConnectionFailed(Exception):
    """Exception for client connection"""


class RsvpClientNotAck(Exception):
class RsvpNotAck(Exception):
    """Exception when server doesn't acknowledge"""


class RsvpMalformedReply(Exception):
    """Reply is malformed"""


class RsvpTimeoutExpired(Exception):
    """Received or send timeout"""

@@ -72,9 +76,15 @@ class RsvpClient:
        tokens = resp.split(";")

        cmd_reply = tokens[0]
        try:
            cmd, status = cmd_reply.split("=")
        except ValueError:
            raise RsvpMalformedReply(f"Malformed reply string: {resp}")

        if len(tokens) < 2:
            if status == "NACK":
                raise RsvpNotAck()

            return {}

        # You expect to have a REPLY-{cmd_name}
@@ -82,14 +92,14 @@ class RsvpClient:
            match status:
                case "NACK":
                    _, err_str = tokens[1].split("=")
                    raise RsvpClientNotAck(err_str)
                    raise RsvpNotAck(err_str)
                case "ACK":
                    tokens = tokens[1:]
                    return process_data_types(tokens)
                case _:
                    raise RsvpClientNotAck(f"Unknown reply {cmd_name}={status}")
                    raise RsvpMalformedReply(f"Unknown reply {cmd_name}={status}")
        else:
            raise RsvpClientNotAck(
            raise RsvpNotAck(
                f"Command {cmd_name} not included in reply: {cmd}. Server reply is malformed"
            )

@@ -159,8 +169,6 @@ class RsvpClientTcp(RsvpClient):
        Raises
        ------
        RsvpTimeoutExpired : If while sending or receiving data the timeout expires
        RsvpClientConnectionFailed : If it fails to connect to the server
        RsvpClientNotAck: If the server didn't acknowledged
        RsvpClientInvalidCallback: If something is wrong with the callback
        """

@@ -209,6 +217,15 @@ class RsvpClientSerial(RsvpClient):
        self.ser.port = port
        self.ser.baudrate = baud
        self.ser.write_timeout = 1.0
        self.ser.timeout = 1.0

        try:
            self.ser.open()
        except serial.SerialException as ex:
            raise RsvpClientConnectionFailed(ex.__str__()) from ex

    def __del__(self):
        self.ser.close()

    @typechecked
    def send_command(
@@ -232,31 +249,24 @@ class RsvpClientSerial(RsvpClient):
        Raises
        ------
        RsvpTimeoutExpired : If while sending or receiving data the timeout expires
        RsvpClientNotAck: If the server didn't acknowledged
        RsvpNotAck: If the server didn't acknowledged
        RsvpClientInvalidCallback: If something is wrong with the callback
        """

        try:
            self.ser.open()
        except Exception as ex:
            raise ex

        cmd = self._build_command(cmd_name, args)

        try:
            self.ser.write(cmd.encode())
        except Exception as ex:
            raise ex
        except serial.SerialTimeoutException as ex:
            raise RsvpTimeoutExpired(ex.__str__()) from ex

        try:
            data = self.ser.readline().strip()
        except Exception as ex:
            raise ex
        except serial.SerialTimeoutException as ex:
            raise RsvpTimeoutExpired(ex.__str__()) from ex

        data = self._check_response(cmd_name, data.decode())

        self.ser.close()

        if callback:
            try:
                callback(data)
+8 −2
Original line number Diff line number Diff line
@@ -227,6 +227,7 @@ class RsvpServerSerial(RsvpServer):
        self.ser.port = port
        self.ser.baudrate = baud
        self._running = False
        self.ser.timeout = 1
        signal.signal(signal.SIGINT, self.__sig_handler)

    def print_commands(self):
@@ -253,12 +254,17 @@ class RsvpServerSerial(RsvpServer):
        )

        while self._running:
            try:
                data = self.ser.readline().strip()
            except serial.SerialException as ex:
                # Something went wrong probably an empty line was received
                print(ex)
                continue

            if len(data) == 0:
                continue

            reply = super().parse_data(data.decode())
            reply = super()._parse_data(data.decode())
            self.ser.write(reply.encode())

        self.ser.close()
+7 −29
Original line number Diff line number Diff line
import pytest
from pyrsvp.rsvpserver import RsvpServer, RsvpServerTcp
from pyrsvp.rsvptypes import *
import multiprocessing

@RsvpServer.Command("START")
def start(a: int, b: float, c: bool, d: str, e: dict, f: list) -> str:
    
    print(f"a: {a}, b: {b}, c: {c}, d: {d}, e: {e}, f: {f}")
    return RsvpServer.reply_ack("START", [RsvpString("STATUS", "PASS")])

@RsvpServer.Command("STOP")
def stop() -> str:
    
    return RsvpServer.reply_ack("START")

@RsvpServer.Command("ERROR")
def error() -> str:

    return RsvpServer.reply_not_ack("ERROR", "TEST ERROR HANDLING")

def server():
    srv = RsvpServerTcp()
    print("Server start")
    srv.run()

@pytest.fixture(scope="session", autouse=True)
def session_start():
@@ -31,8 +7,10 @@ def session_start():
    before performing collection and entering the run test loop.
    """

    p = multiprocessing.Process(target=server)
    p.start()
    yield
    p.terminate()
    # p = multiprocessing.Process(target=tcp_server)
    # p.start()
    # p1 = multiprocessing.Process(target=serial_server)
    # p1.start()
    # yield
    # p.terminate()
    # p1.terminate()
 No newline at end of file
+112 −0
Original line number Diff line number Diff line
import pytest

from pyrsvp.rsvpclient import RsvpClientTcp, RsvpClientNotAck
from pyrsvp.rsvpserver import RsvpServerTcp, RsvpServer
from pyrsvp.rsvpclient import *
from pyrsvp.rsvptypes import *
import multiprocessing

@RsvpServer.Command("START")
def start(values: dict) -> str:
    
    print(values)
    return RsvpServer.reply(True, [RsvpString("STATUS", "PASS")])

@RsvpServer.Command("STOP")
def stop(values: dict) -> str:
    
    return RsvpServer.reply(False)

@RsvpServer.Command("ERROR")
def error(values: dict) -> str:

    return RsvpServer.reply(False, [RsvpString("ERROR", "TEST ERROR HANDLING")])

@RsvpServer.Command("NOACKNACK")
def no_ack_nack(values: dict) -> str:

    return ""

@RsvpServer.Command("MISSINGACKNACK")
def missing_ack_nack(values: dict) -> str:

    return "MISSING_ACK_NAK"

@RsvpServer.Command("NACK")
def nack(values: dict) -> str:

    return RsvpServer.reply(False)

def tcp_server():
    srv = RsvpServerTcp()
    print("Tcp Server start")
    srv.run()

@pytest.fixture(scope="module", autouse=True)
def module_start():

    p = multiprocessing.Process(target=tcp_server)
    p.start()
    yield
    p.terminate()


@pytest.fixture(scope="module")
def client() -> RsvpClientTcp:
@@ -35,19 +82,31 @@ def too_few_args() -> list[RsvpBaseType]:

    return args

class TestTcpServer:
class TestTcpClient:

    def test_bad_reply(self, client : RsvpClientTcp):

        with pytest.raises(RsvpMalformedReply) as ex:
            client.send_command("NOACKNACK")
            client.send_command("MISSINGACKNACK")
        

    def test_nack(self, client : RsvpClientTcp):

        with pytest.raises(RsvpNotAck):
            client.send_command("NACK")

    def test_error_handling(self, client):
    def test_error_handling(self, client : RsvpClientTcp):

        with pytest.raises(RsvpClientNotAck):
        with pytest.raises(RsvpNotAck):
            client.send_command("ERROR")

    def test_wrong_arg_type(self, client):
    def test_wrong_arg_type(self, client: RsvpClientTcp):

        with pytest.raises(RsvpClientNotAck):
        with pytest.raises(RsvpWrongTypeException):
            client.send_command("WRONG_TYPE", args=[RsvpInteger("INT", "Hello")])

    def test_wrong_reply(self, client):
    def test_wrong_reply(self, client: RsvpClientTcp):

        with pytest.raises(RsvpClientNotAck):
        with pytest.raises(RsvpNotAck):
            client.send_command("STOP")
 No newline at end of file
Loading