Commit b833aaeb authored by Gallacchi Mattia's avatar Gallacchi Mattia
Browse files

First commit

parents
Loading
Loading
Loading
Loading

.gitignore

0 → 100644
+4 −0
Original line number Diff line number Diff line
# Python
__pycache__/
dist/
 No newline at end of file

README.md

0 → 100644
+2 −0
Original line number Diff line number Diff line
# MS210 python driver
 No newline at end of file

main.py

0 → 100644
+29 −0
Original line number Diff line number Diff line
from ms210.driver import MS210

channels = ["IR", "R", "B", "G"]

if "__main__" == __name__:

    dev = MS210()
    value = 0
    index = 1
    current_channel = channels[index]

    try:
        while True:
            
            dev.set_value(current_channel, value)
            value += 50

            if value > 1000:
                value = 0
                dev.set_value(current_channel, value)
                index = index + 1 if index < 3 else 1
                current_channel = channels[index]
                

    except KeyboardInterrupt:
        pass

    print("Exit")
 No newline at end of file

ms210/__init__.py

0 → 100644
+0 −0

Empty file added.

ms210/driver.py

0 → 100644
+112 −0
Original line number Diff line number Diff line
import serial
from typing import Literal
from dataclasses import dataclass

_BAUD = 19200
_CHANNELS = ["IR", "R", "B", "G"]
_MAX_VALUE = 1000
_MIN_VALUE = 0

@dataclass
class Channel:
    name : str
    index : int
    value : int

class MS210InitFailed(Exception):
    pass

class MS210:

    def __init__(self, port: str = "/dev/ttyUSB0"):
        """Constructor of the MS210 object

        Parameters
        ----------
        port : str
            COM port of the Serial device. Windows = "COM<X>", Linux = "/dev/ttyUSB<X>"

        Raises
        ------
        MS210InitFailed if device not found 
        """
        
        self._ser = serial.Serial()
        self._ser.port = port
        self._ser.baudrate = _BAUD
        
        self.channels = []
        for index, channel in enumerate(_CHANNELS):
            self.channels.append(Channel(channel, index, 0))

        try:
            self._ser.open()
        except serial.SerialException as ex:
            raise MS210InitFailed from ex

        # Read current values
        success, msg = self.__get_current_values()
        if not success:
            raise MS210InitFailed(msg)

    def __del__(self):

        for channel in _CHANNELS:
            self.set_value(channel, 0)
            
        self._ser.close()

    def __check_limits(self, value: int) -> bool:

        if value > _MAX_VALUE:
            return False
        
        if value < _MIN_VALUE:
            return False
        
        return True

    def __get_current_values(self) -> tuple[bool, str]:

        buf = ""
        
        for channel in self.channels:
            try:
                self._ser.write(f"R{channel.index}".encode())
            except serial.SerialException as ex:
                return (False, ex.__str__())

            try:
                buf = self._ser.read(8)
            except serial.SerialException as ex:
                return (False, ex.__str__())
            
            channel.value = int(buf[2:].decode().strip())

        return (True, "OK")
    
    def get_value(self, channel: Literal["IR", "R", "B", "G"]) -> int:
        return [x for x in self.channels if x.name == channel][0].value

    def set_value(self, channel: Literal["IR", "R", "B", "G"], value : int = 0) -> tuple[bool, str]:

        if not self.__check_limits(value):
            return (False, f"Value {value} is out of bound. {_MIN_VALUE} < value < {_MAX_VALUE}")
        
        _channel = [x for x in self.channels if x.name == channel][0]

        cmd = f"S{_channel.index}{value:04d}"

        try:
            self._ser.write(cmd.encode())
        except serial.SerialException as ex:
            return (False, ex.__str__())
        try:
            buf = self._ser.read(8)
        except serial.SerialException as ex:
            return (False, ex.__str__())

        _channel.value = int(buf[2:].decode().strip())

        return (True, "OK")
 No newline at end of file