EL-UHF-RMT01 Menggunakan Python

Komunikasi antara komputer dengan EL-UHF-RMT01 dapat dilakukan dengan bantuan USB to TTL agar bisa mengirim ataupun menerima data melalui usb di komputer, data yang dikirim ataupun diterima dapat diakses secara program. Pada artikel ini modul RFID scanner akan diakses dengan bahasa pemrograman Python.

Persiapan

Sebelum masuk ke bagian code, siapkan dulu beberapa hal berikut:

1. Koneksi

Sambungan EL-UHF-RMT01 dengan USB to TTL.

EL-UHF-RMT01 dengan USB to TTL

EL-UHF-RMT01 perlu dihubungkan ke USB to TTL sebelum melakukan komunikasi dengan komputer.

EL-UHF-RMT01USB to TTL
VCC5V
TXRX
RXTX
EN3.3V
GNDGND

2. Kode dan Penjelasan

1. command.py

Class ini akan mempermudah proses parsing data byte yang akan dikirim ke reader. List command yang dapat digunakan dapat dilihat di dokumentasi protokol.

Pada class ini terdapat kode untuk menghitung checksum, checksum didapat dari penjumlahan byte frame mulai dari Type hingga byte terakhir dari Parameter (data), hanya mengambil byte LSB saja.

HEADER: bytes = b'\xBB'
END: bytes = b'\x7E'
CMD_INVENTORY_SINGLE: bytes = b'\x22'
CMD_INVENTORY_MULTI_START: bytes = b'\x27'
CMD_INVENTORY_MULTI_STOP: bytes = b'\x28'
CMD_SET_BAUD_RATE: bytes = b'\x11'
CMD_GET_POWER: bytes = b'\xB7'
CMD_SET_POWER: bytes = b'\xB6'
CMD_SAVE_CONFIG: bytes = b'\x09'
CMD_SET_SELECT: bytes = b'\x0C'
CMD_GET_SELECT: bytes = b'\x0B'
CMD_SET_SELECT_MODE: bytes = b'\x12'
CMD_READ_MEMORY: bytes = b'\x39'
CMD_WRITE_MEMORY: bytes = b'\x49'
 
 
class Command:
    def __init__(self, command: bytes, data: bytes | int | None = None):
        self.command: bytes = command
        self.data: bytes | int | None = data
        if isinstance(data, int):
            self.data = bytearray([data])
        if data is None:
            self.data = bytearray()
 
    def serialize(self) -> bytes:
        # Calculate CRC
        crc_sum: int = sum((bytearray([0x00]) + self.command
                            + len(self.data).to_bytes(2, byteorder="big") + self.data))
        crc: bytes = (crc_sum & 0xFF).to_bytes(1, byteorder="big")
        serialize: bytes = (HEADER + b'\x00' + self.command
                            + len(self.data).to_bytes(2, byteorder="big") + self.data + crc + END)
        return serialize

2. response.py

Class Response bertujuan untuk parsing dari data bytes menjadi 1 class frame response.

from typing import Optional
 
 
MEMORY_BANKS: dict[int, str] = {
    0: "Reserved",
    1: "EPC",
    2: "TID",
    3: "User"
}
 
 
def hex_readable(data: bytes | int, bytes_separator: str = " ") -> str:
    if isinstance(data, int):
        return "{:02X}".format(data)
    return bytes_separator.join("{:02X}".format(x) for x in data)
 
 
class Response:
    def __init__(self, response_bytes: bytes):
        self.response_bytes: bytes = response_bytes
        self.header: bytes = int.to_bytes(response_bytes[0], byteorder="big")
        self.frame_type: bytes = int.to_bytes(response_bytes[1], byteorder="big")
        self.command: bytes = int.to_bytes(response_bytes[2], byteorder="big")
        self.data_length: int = int.from_bytes(response_bytes[3:5], "big")
        self.data: bytes = response_bytes[5:-2]
        self.checksum: bytes = int.to_bytes(response_bytes[-2], byteorder="big")
        self.end: bytes = int.to_bytes(response_bytes[-1], byteorder="big")
 
    def __str__(self) -> str:
        return_value = ''
        value = '>>> START RESPONSE ================================'
        return_value = f'{return_value}\n{value}'
        value = f'RESPONSE       >> {hex_readable(self.response_bytes)}'  # Response
        return_value = f'{return_value}\n{value}'
        value = f'HEADER         >> {hex_readable(self.header)}'  # Header
        return_value = f'{return_value}\n{value}'
        value = f'TYPE           >> {hex_readable(self.frame_type)}'  # Type
        return_value = f'{return_value}\n{value}'
        value = f'COMMAND        >> {hex_readable(self.command)}'  # Command
        return_value = f'{return_value}\n{value}'
        if self.data:
            value = f'DATA           >> {hex_readable(self.data)}'  # Data
            return_value = f'{return_value}\n{value}'
        value = f'CHECKSUM (CRC) >> {hex_readable(self.checksum)}'  # Checksum (CRC)
        return_value = f'{return_value}\n{value}'
        value = f'END            >> {hex_readable(self.end)}'  # End
        return_value = f'{return_value}\n{value}'
        value = '>>> END RESPONSE   ================================'
        return_value = f'{return_value}\n{value}'
        return return_value.strip()
 
 
class SelectParameter:
    def __init__(self,
                 memory_bank: int,
                 mask: bytes,
                 target: int = 0,
                 action: int = 0,
                 start_address: int = 32,
                 truncate: bool = False,
                 mask_bit_length: Optional[int] = None
                 ) -> None:
        """
        Select parameter.
 
        Args:
            memory_bank (int): 0=Reserved, 1=EPC, 2=TID, 3=User.
            mask (bytes): Mask data
            target (int): Target session (0-7). Default 0 (S0).
            action (int): Action flag (0-7). Default 0 (000).
            start_address (int): Start address. Default 32 (0x20).
            truncate (bool): Truncate, default: False.
            mask_bit_length (Optional[int]): Mask length. If None, get from len(mask) * 8.
        """
        self.memory_bank: int = memory_bank
        self.mask: bytes = mask
        self.target: int = target
        self.action: int = action
        self.start_address: int = start_address
        self.truncate: bool = truncate
 
        if mask_bit_length is not None:
            self.mask_bit_length: int = mask_bit_length
        else:
            self.mask_bit_length: int = len(mask) * 8
 
    @property
    def memory_bank_label(self) -> str:
        return MEMORY_BANKS.get(self.memory_bank, "Unknown")
 
    @classmethod
    def parse(cls, data: bytes) -> 'SelectParameter':
        if len(data) < 7:
            raise ValueError(f"Data too short: {len(data)} bytes (min 7 required)")
 
        # 2. Parse Byte 0 (SelParam)
        b0: int = data[0]
        target: int = (b0 >> 5) & 0b111
        action: int = (b0 >> 2) & 0b111
        membank: int = b0 & 0b11
 
        # 3. Parse Pointer (Byte 1-4, Big Endian)
        start_address: int = int.from_bytes(data[1:5], byteorder='big')
 
        # 4. Parse Length & Truncate
        mask_len_bits: int = data[5]
        is_truncate: bool = (data[6] != 0)
 
        # 5. Parse Mask
        mask_byte_count: int = (mask_len_bits + 7) // 8
        mask_bytes: bytes = data[7: 7 + mask_byte_count]
 
        return cls(
            memory_bank=membank,
            mask=mask_bytes,
            target=target,
            action=action,
            start_address=start_address,
            truncate=is_truncate,
            mask_bit_length=mask_len_bits
        )
 
    def serialize(self) -> bytearray:
        data: bytearray = bytearray()
 
        # 1. Select parameter
        sel_params: int = (self.target << 5) | (self.action << 2) | self.memory_bank
        data.append(sel_params)
 
        # 2. Start address / pointer (4 bytes)
        data.extend(self.start_address.to_bytes(4, byteorder='big'))
 
        # 3. Mask Length (Bits)
        data.append(self.mask_bit_length)
 
        # 4. Truncate
        data.append(0x80 if self.truncate else 0x00)
 
        # 5. Mask Data
        data.extend(self.mask)
 
        return data
 
    def __repr__(self) -> str:
        return (
            f"SelectParameter(\n"
            f"  [Byte 0] Memory bank: {self.memory_bank} ({self.memory_bank_label}), "
            f"Target: {self.target}, Action: {self.action}\n"
            f"  [Byte 1-4] Start address (bit): {self.start_address} (0x{self.start_address:08X})\n"
            f"  [Byte 5] Mask length: {self.mask_bit_length} bits\n"
            f"  [Byte 6] Truncate: {self.truncate}\n"
            f"  [Byte 7+] Mask: {hex_readable(self.mask)}\n"
            f")"
        )
 
 
class ReadMemoryResponse(Response):
    def __init__(self, response_bytes: bytes) -> None:
        super().__init__(response_bytes)
 
        if not self.data:
            raise RuntimeError("No data")
 
        # If error
        if int.from_bytes(self.command) == 0xFF:
            if self.data[0] == 0x09:
                raise RuntimeError("The tag is not in range.")
            if self.data[0] == 0x16:
                raise RuntimeError("Wrong access password.")
            if self.data[0] == 0xA3:
                raise RuntimeError("The Tag memory location does not exist, is too small, "
                                   "or the tag does not support the specified EPC length.")
 
        pc_epc_length: int = self.data[0]
        self.pc: bytes = self.data[1:3]
        self.epc: bytes = self.data[3:pc_epc_length+1]
        self.read_data: bytes = self.data[pc_epc_length+1:]
 
    def __str__(self) -> str:
        return (
            f"Read Memory(\n"
            f" - PC  : {hex_readable(self.pc)}\n"
            f" - EPC : {hex_readable(self.epc)}\n"
            f" - Data: {hex_readable(self.read_data)}\n"
            f")"
        )
 
 
class WriteMemoryResponse(Response):
    def __init__(self, response_bytes: bytes) -> None:
        super().__init__(response_bytes)
 
        if not self.data:
            raise RuntimeError("No data")
 
        # If error
        if int.from_bytes(self.command) == 0xFF:
            if self.data[0] == 0x10:
                raise RuntimeError("The tag is not in range.")
            if self.data[0] == 0x16:
                raise RuntimeError("Wrong access password.")
            if self.data[0] == 0xB3:
                raise RuntimeError("The Tag memory location does not exist, is too small, "
                                   "or the tag does not support the specified EPC length.")
 
        pc_epc_length: int = self.data[0]
        self.pc: bytes = self.data[1:3]
        self.epc: bytes = self.data[3:pc_epc_length+1]
        self.result: int = self.data[pc_epc_length+1]
 
        if self.result != 0x00:
            raise RuntimeError("The result is not 0x00 (write failed).")
 
    def __str__(self) -> str:
        return (
            f"Write Memory(\n"
            f" - PC    : {hex_readable(self.pc)}\n"
            f" - EPC   : {hex_readable(self.epc)}\n"
            f" - Result: {'Write successfully.' if self.result == 0x00 else 'Write failed.' }\n"
            f")"
        )

3. reader.py

Pada class Reader terdapat fungsi:

  1. close(): Close serial port
  2. __send_request(...) & __get_response(): Private method proses kirim dan terima byte dari/ke reader.
  3. inventory_single(): Mengambil 1 data tag terdekat
  4. inventory_multiple_start(...): Memulai inventory data tag apa saja (banyak) di dekat reader
  5. inventory_multiple_stop(): Mengakhiri inventory (multi)
  6. save_config(...): Simpan reader settings
  7. get_power() & set_power(...): Atur jarak / kekuatan reader
  8. set_baud_rate(...): Atur baud rate

Silahkan tambahkan method sesuai yang diinginkan mengikuti dokumentasi protokol yang sudah ada.

from typing import Iterator
from command import *
from response import *
import serial
 
 
class Reader:
    def __init__(self, serial_port: str, baud_rate: int) -> None:
        self.serial_port: str = serial_port
        self.baud_rate: int = baud_rate
        self.serial = serial.Serial(serial_port, baud_rate,
                                    timeout=0.5, write_timeout=0.5)
 
    def close(self) -> None:
        self.serial.close()
 
    def __send_request(self, command: Command) -> None:
        self.serial.write(command.serialize())
 
    def __get_response(self) -> bytes:
        header: bytes = self.serial.read(1)
        assert header == HEADER  # Must equal to default header (0xBB)
        header_frame: bytes = self.serial.read(4)
        data_length: int = int.from_bytes(header_frame[-2:], "big")
        data: bytes = self.serial.read(data_length)
        crc_end: bytes = self.serial.read(2)
        end: bytes = int.to_bytes(crc_end[-1], byteorder="big")
        assert end == END
        complete_frame: bytes = header + header_frame + data + crc_end
        return complete_frame
 
    def inventory_single(self) -> bytes | None:
        """
        0x22 Single Inventory
        :return: Data or none (no tag)
        """
        command: Command = Command(CMD_INVENTORY_SINGLE)
        self.__send_request(command)
        response: Response = Response(self.__get_response())
        if response.frame_type == b'\x01' and response.data == b'\x15':
            return
        assert response.frame_type == b'\x02'  # Frame type ➜ 0x01: Response from the Interrogator to the Host Computer
        return response.data
 
    def inventory_multiple_start(self, count: int) -> Iterator[bytes]:
        """
        0x27 Multiple Inventory
        :param count: 0~65535
        :return: yield data
        """
        assert 0 <= count <= 65535, "Value must be between 0 and 65535, inclusive."
        data: bytes = b'\x22' + count.to_bytes(2, byteorder="big")
        command: Command = Command(CMD_INVENTORY_MULTI_START, data=data)
        self.__send_request(command)
        for _ in range(count):
            response: Response = Response(self.__get_response())
            if response.frame_type == b'\x01' and response.data == b'\x15':
                continue
            assert response.frame_type == b'\x02'
            yield response.data
 
    def inventory_multiple_stop(self) -> None:
        """
        0x28 Multiple Inventory
        :return:
        """
        command: Command = Command(CMD_INVENTORY_MULTI_STOP)
        self.__send_request(command)
        self.__get_response()
 
    def save_config(self, violate: bool) -> Response:
        data: int = 0x00 if violate else 0x01
        command: Command = Command(CMD_SAVE_CONFIG, data=data)
        self.__send_request(command)
        return Response(self.__get_response())
 
    def get_power(self) -> int:
        command: Command = Command(CMD_GET_POWER)
        self.__send_request(command)
        response: Response = Response(self.__get_response())
        return int(int.from_bytes(response.data, byteorder='big') / 100)
 
    def set_power(self, power: int) -> Response:
        power: int = power * 100
        data: bytes = power.to_bytes(2, byteorder="big")
        command: Command = Command(CMD_SET_POWER, data=data)
        self.__send_request(command)
        return Response(self.__get_response())
 
    def set_baud_rate(self, baud_rate: int) -> None:
        baud_rate: int = int(baud_rate / 100)
        data: bytes = baud_rate.to_bytes(2, byteorder="big")
        command: Command = Command(CMD_SET_BAUD_RATE, data=data)
        self.__send_request(command)
 
    def get_select(self) -> SelectParameter:
        command: Command = Command(CMD_GET_SELECT)
        self.__send_request(command)
 
        response: Response = Response(self.__get_response())
        return SelectParameter.parse(response.data)
 
    def set_select(self, parameter: SelectParameter) -> Response:
        payload_data: bytes = parameter.serialize()
        command: Command = Command(CMD_SET_SELECT, data=payload_data)
 
        self.__send_request(command)
 
        return Response(self.__get_response())
 
    def read_tag_memory(
            self,
            memory_bank: int,
            start_address: int, # In word
            length: int, # In word
            access_password: bytes = bytes(4),
    ) -> ReadMemoryResponse:
        payload_data: bytearray = bytearray()
        payload_data.extend(access_password)
        payload_data.append(memory_bank)
        payload_data.extend(start_address.to_bytes(2, byteorder="big"))
        payload_data.extend(length.to_bytes(2, byteorder="big"))
        command: Command = Command(CMD_READ_MEMORY, data=payload_data)
        self.__send_request(command)
 
        return ReadMemoryResponse(self.__get_response())
 
    def write_tag_memory(
            self,
            memory_bank: int,
            start_address: int, # In word
            data: bytes,
            access_password: bytes = bytes(4),
    ) -> WriteMemoryResponse:
        payload_data: bytearray = bytearray()
        payload_data.extend(access_password)
        payload_data.append(memory_bank)
        payload_data.extend(start_address.to_bytes(2, byteorder="big"))
        payload_data.extend(int(len(data) / 2).to_bytes(2, byteorder="big"))
        payload_data.extend(data)
        command: Command = Command(CMD_WRITE_MEMORY, data=payload_data)
        self.__send_request(command)
 
        return WriteMemoryResponse(self.__get_response())

4. main.py

File ini yang akan dijalankan pertama kali python main.py, kita akan memanggil class-class yang sudah dibuat.

Uncomment fungsi/kode yang ingin digunakan.

from time import sleep
from typing import Iterator
 
from response import hex_readable, Response, SelectParameter, ReadMemoryResponse, WriteMemoryResponse
from reader import Reader
 
# Windows: Replace '/dev/ttyUSB0' to 'COM1' (check Device Manager)
reader = Reader('/dev/ttyUSB0', 115200)
 
# # 1. Inventory - Single
tag: bytes | None = reader.inventory_single()
if isinstance(tag, bytes):
    print(hex_readable(tag))
 
# # 2. Inventory - Multi
# count: int = 100
# try:
#     tags: Iterator[bytes] = reader.inventory_multiple_start(count)
#     for tag in tags:
#         print(hex_readable(tag))
# except KeyboardInterrupt:
#     reader.inventory_multiple_stop()
 
# # 3. 0xB7 Power
# response: Response = reader.set_power(21) # Must call save_config() for stored config in the module
# power: int = reader.get_power()
# print(f"Power: {power}")
 
# # 4. 0x11 Set baud rate
# NEW_BAUD_RATE: int = 115200
# reader.set_baud_rate(NEW_BAUD_RATE)
# reader.close()
# reader = Reader(SERIAL_PORT, NEW_BAUD_RATE)
# response_save_config: Response = reader.save_config(violate=False)
# print(response_save_config)
 
# 5. 0x0C Set Select (for read & write tag memory)
# Example: Select EPC: 11 22 33 44 55 66 77 88 99 00 AA BB
# mask: bytes = bytearray([0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0x00, 0xAA, 0xBB])
# select_parameter = SelectParameter(
#     memory_bank=1,    # EPC
#     mask=mask,        # EPC value
#     start_address=32, # Start mask address (bit): 32 (or word 2)
# )
# response_set_select: Response = reader.set_select(parameter=select_parameter)
# # print(response_set_select)
#
# # 6. 0x0B Get Select
# response_get_select: SelectParameter = reader.get_select()
# print(response_get_select)
#
# # 7. 0x39 Read Memory (should run 0x0C first to select specific tag)
# for _ in range(20):
#     try:
#         response_read_memory: ReadMemoryResponse = reader.read_tag_memory(
#             memory_bank=2,   # TID
#             start_address=0, # Start TID address (word): 0
#             length=3,        # Read 3 word
#         )
#         print(response_read_memory)
#         break
#     except RuntimeError as e:
#         print(f"Error: {e} Retrying...")
#     sleep(0.1)
#
# # 8. 0x49 Write Memory (should run 0x0C first to select specific tag)
# # Example: Memory bank: EPC
# # From: 11 22 33 44 55 66 77 88 99 00 AA BB
# # Become: "FF EE DD CC" 55 66 77 88 99 00 AA BB
# response_write_memory: WriteMemoryResponse = reader.write_tag_memory(
#             memory_bank=1,   # EPC
#             start_address=2, # Start EPC address (word): 2
#             data=bytearray([0xFF, 0xEE, 0xDD, 0xCC]),
#         )
# print(response_write_memory)
 
 
reader.close()

Video