EL-UHF-RMT01 Menggunakan Python
Komunikasi antara komputer dengan EL-UHF-RMT01 dapat dilakukan dengan bantuan USB to TTL agar bisa mengirim ataupun menerima data melalui usb di komputer, data yang dikirim ataupun diterima dapat diakses secara program. Pada artikel ini modul RFID scanner akan diakses dengan bahasa pemrograman Python.
Persiapan
Sebelum masuk ke bagian code, siapkan dulu beberapa hal berikut:
- Dokumentasi protokol
- Python versi 3.9 atau lebih baru
- Library pyserial (
pip install pyserial) - PyCharm IDE
1. Koneksi
Sambungan EL-UHF-RMT01 dengan USB to TTL.

EL-UHF-RMT01 perlu dihubungkan ke USB to TTL sebelum melakukan komunikasi dengan komputer.
| EL-UHF-RMT01 | USB to TTL |
|---|---|
| VCC | 5V |
| TX | RX |
| RX | TX |
| EN | 3.3V |
| GND | GND |
2. Kode dan Penjelasan
1. command.py
Class ini akan mempermudah proses parsing data byte yang akan dikirim ke reader. List command yang dapat digunakan dapat dilihat di dokumentasi protokol.
Pada class ini terdapat kode untuk menghitung checksum, checksum didapat dari penjumlahan byte frame mulai dari Type hingga byte terakhir dari Parameter (data), hanya mengambil byte LSB saja.
HEADER: bytes = b'\xBB'
END: bytes = b'\x7E'
CMD_INVENTORY_SINGLE: bytes = b'\x22'
CMD_INVENTORY_MULTI_START: bytes = b'\x27'
CMD_INVENTORY_MULTI_STOP: bytes = b'\x28'
CMD_SET_BAUD_RATE: bytes = b'\x11'
CMD_GET_POWER: bytes = b'\xB7'
CMD_SET_POWER: bytes = b'\xB6'
CMD_SAVE_CONFIG: bytes = b'\x09'
CMD_SET_SELECT: bytes = b'\x0C'
CMD_GET_SELECT: bytes = b'\x0B'
CMD_SET_SELECT_MODE: bytes = b'\x12'
CMD_READ_MEMORY: bytes = b'\x39'
CMD_WRITE_MEMORY: bytes = b'\x49'
class Command:
def __init__(self, command: bytes, data: bytes | int | None = None):
self.command: bytes = command
self.data: bytes | int | None = data
if isinstance(data, int):
self.data = bytearray([data])
if data is None:
self.data = bytearray()
def serialize(self) -> bytes:
# Calculate CRC
crc_sum: int = sum((bytearray([0x00]) + self.command
+ len(self.data).to_bytes(2, byteorder="big") + self.data))
crc: bytes = (crc_sum & 0xFF).to_bytes(1, byteorder="big")
serialize: bytes = (HEADER + b'\x00' + self.command
+ len(self.data).to_bytes(2, byteorder="big") + self.data + crc + END)
return serialize2. response.py
Class Response bertujuan untuk parsing dari data bytes menjadi 1 class frame response.
from typing import Optional
MEMORY_BANKS: dict[int, str] = {
0: "Reserved",
1: "EPC",
2: "TID",
3: "User"
}
def hex_readable(data: bytes | int, bytes_separator: str = " ") -> str:
if isinstance(data, int):
return "{:02X}".format(data)
return bytes_separator.join("{:02X}".format(x) for x in data)
class Response:
def __init__(self, response_bytes: bytes):
self.response_bytes: bytes = response_bytes
self.header: bytes = int.to_bytes(response_bytes[0], byteorder="big")
self.frame_type: bytes = int.to_bytes(response_bytes[1], byteorder="big")
self.command: bytes = int.to_bytes(response_bytes[2], byteorder="big")
self.data_length: int = int.from_bytes(response_bytes[3:5], "big")
self.data: bytes = response_bytes[5:-2]
self.checksum: bytes = int.to_bytes(response_bytes[-2], byteorder="big")
self.end: bytes = int.to_bytes(response_bytes[-1], byteorder="big")
def __str__(self) -> str:
return_value = ''
value = '>>> START RESPONSE ================================'
return_value = f'{return_value}\n{value}'
value = f'RESPONSE >> {hex_readable(self.response_bytes)}' # Response
return_value = f'{return_value}\n{value}'
value = f'HEADER >> {hex_readable(self.header)}' # Header
return_value = f'{return_value}\n{value}'
value = f'TYPE >> {hex_readable(self.frame_type)}' # Type
return_value = f'{return_value}\n{value}'
value = f'COMMAND >> {hex_readable(self.command)}' # Command
return_value = f'{return_value}\n{value}'
if self.data:
value = f'DATA >> {hex_readable(self.data)}' # Data
return_value = f'{return_value}\n{value}'
value = f'CHECKSUM (CRC) >> {hex_readable(self.checksum)}' # Checksum (CRC)
return_value = f'{return_value}\n{value}'
value = f'END >> {hex_readable(self.end)}' # End
return_value = f'{return_value}\n{value}'
value = '>>> END RESPONSE ================================'
return_value = f'{return_value}\n{value}'
return return_value.strip()
class SelectParameter:
def __init__(self,
memory_bank: int,
mask: bytes,
target: int = 0,
action: int = 0,
start_address: int = 32,
truncate: bool = False,
mask_bit_length: Optional[int] = None
) -> None:
"""
Select parameter.
Args:
memory_bank (int): 0=Reserved, 1=EPC, 2=TID, 3=User.
mask (bytes): Mask data
target (int): Target session (0-7). Default 0 (S0).
action (int): Action flag (0-7). Default 0 (000).
start_address (int): Start address. Default 32 (0x20).
truncate (bool): Truncate, default: False.
mask_bit_length (Optional[int]): Mask length. If None, get from len(mask) * 8.
"""
self.memory_bank: int = memory_bank
self.mask: bytes = mask
self.target: int = target
self.action: int = action
self.start_address: int = start_address
self.truncate: bool = truncate
if mask_bit_length is not None:
self.mask_bit_length: int = mask_bit_length
else:
self.mask_bit_length: int = len(mask) * 8
@property
def memory_bank_label(self) -> str:
return MEMORY_BANKS.get(self.memory_bank, "Unknown")
@classmethod
def parse(cls, data: bytes) -> 'SelectParameter':
if len(data) < 7:
raise ValueError(f"Data too short: {len(data)} bytes (min 7 required)")
# 2. Parse Byte 0 (SelParam)
b0: int = data[0]
target: int = (b0 >> 5) & 0b111
action: int = (b0 >> 2) & 0b111
membank: int = b0 & 0b11
# 3. Parse Pointer (Byte 1-4, Big Endian)
start_address: int = int.from_bytes(data[1:5], byteorder='big')
# 4. Parse Length & Truncate
mask_len_bits: int = data[5]
is_truncate: bool = (data[6] != 0)
# 5. Parse Mask
mask_byte_count: int = (mask_len_bits + 7) // 8
mask_bytes: bytes = data[7: 7 + mask_byte_count]
return cls(
memory_bank=membank,
mask=mask_bytes,
target=target,
action=action,
start_address=start_address,
truncate=is_truncate,
mask_bit_length=mask_len_bits
)
def serialize(self) -> bytearray:
data: bytearray = bytearray()
# 1. Select parameter
sel_params: int = (self.target << 5) | (self.action << 2) | self.memory_bank
data.append(sel_params)
# 2. Start address / pointer (4 bytes)
data.extend(self.start_address.to_bytes(4, byteorder='big'))
# 3. Mask Length (Bits)
data.append(self.mask_bit_length)
# 4. Truncate
data.append(0x80 if self.truncate else 0x00)
# 5. Mask Data
data.extend(self.mask)
return data
def __repr__(self) -> str:
return (
f"SelectParameter(\n"
f" [Byte 0] Memory bank: {self.memory_bank} ({self.memory_bank_label}), "
f"Target: {self.target}, Action: {self.action}\n"
f" [Byte 1-4] Start address (bit): {self.start_address} (0x{self.start_address:08X})\n"
f" [Byte 5] Mask length: {self.mask_bit_length} bits\n"
f" [Byte 6] Truncate: {self.truncate}\n"
f" [Byte 7+] Mask: {hex_readable(self.mask)}\n"
f")"
)
class ReadMemoryResponse(Response):
def __init__(self, response_bytes: bytes) -> None:
super().__init__(response_bytes)
if not self.data:
raise RuntimeError("No data")
# If error
if int.from_bytes(self.command) == 0xFF:
if self.data[0] == 0x09:
raise RuntimeError("The tag is not in range.")
if self.data[0] == 0x16:
raise RuntimeError("Wrong access password.")
if self.data[0] == 0xA3:
raise RuntimeError("The Tag memory location does not exist, is too small, "
"or the tag does not support the specified EPC length.")
pc_epc_length: int = self.data[0]
self.pc: bytes = self.data[1:3]
self.epc: bytes = self.data[3:pc_epc_length+1]
self.read_data: bytes = self.data[pc_epc_length+1:]
def __str__(self) -> str:
return (
f"Read Memory(\n"
f" - PC : {hex_readable(self.pc)}\n"
f" - EPC : {hex_readable(self.epc)}\n"
f" - Data: {hex_readable(self.read_data)}\n"
f")"
)
class WriteMemoryResponse(Response):
def __init__(self, response_bytes: bytes) -> None:
super().__init__(response_bytes)
if not self.data:
raise RuntimeError("No data")
# If error
if int.from_bytes(self.command) == 0xFF:
if self.data[0] == 0x10:
raise RuntimeError("The tag is not in range.")
if self.data[0] == 0x16:
raise RuntimeError("Wrong access password.")
if self.data[0] == 0xB3:
raise RuntimeError("The Tag memory location does not exist, is too small, "
"or the tag does not support the specified EPC length.")
pc_epc_length: int = self.data[0]
self.pc: bytes = self.data[1:3]
self.epc: bytes = self.data[3:pc_epc_length+1]
self.result: int = self.data[pc_epc_length+1]
if self.result != 0x00:
raise RuntimeError("The result is not 0x00 (write failed).")
def __str__(self) -> str:
return (
f"Write Memory(\n"
f" - PC : {hex_readable(self.pc)}\n"
f" - EPC : {hex_readable(self.epc)}\n"
f" - Result: {'Write successfully.' if self.result == 0x00 else 'Write failed.' }\n"
f")"
)3. reader.py
Pada class Reader terdapat fungsi:
close(): Close serial port__send_request(...)&__get_response(): Private method proses kirim dan terima byte dari/ke reader.inventory_single(): Mengambil 1 data tag terdekatinventory_multiple_start(...): Memulai inventory data tag apa saja (banyak) di dekat readerinventory_multiple_stop(): Mengakhiri inventory (multi)save_config(...): Simpan reader settingsget_power()&set_power(...): Atur jarak / kekuatan readerset_baud_rate(...): Atur baud rate
Silahkan tambahkan method sesuai yang diinginkan mengikuti dokumentasi protokol yang sudah ada.
from typing import Iterator
from command import *
from response import *
import serial
class Reader:
def __init__(self, serial_port: str, baud_rate: int) -> None:
self.serial_port: str = serial_port
self.baud_rate: int = baud_rate
self.serial = serial.Serial(serial_port, baud_rate,
timeout=0.5, write_timeout=0.5)
def close(self) -> None:
self.serial.close()
def __send_request(self, command: Command) -> None:
self.serial.write(command.serialize())
def __get_response(self) -> bytes:
header: bytes = self.serial.read(1)
assert header == HEADER # Must equal to default header (0xBB)
header_frame: bytes = self.serial.read(4)
data_length: int = int.from_bytes(header_frame[-2:], "big")
data: bytes = self.serial.read(data_length)
crc_end: bytes = self.serial.read(2)
end: bytes = int.to_bytes(crc_end[-1], byteorder="big")
assert end == END
complete_frame: bytes = header + header_frame + data + crc_end
return complete_frame
def inventory_single(self) -> bytes | None:
"""
0x22 Single Inventory
:return: Data or none (no tag)
"""
command: Command = Command(CMD_INVENTORY_SINGLE)
self.__send_request(command)
response: Response = Response(self.__get_response())
if response.frame_type == b'\x01' and response.data == b'\x15':
return
assert response.frame_type == b'\x02' # Frame type ➜ 0x01: Response from the Interrogator to the Host Computer
return response.data
def inventory_multiple_start(self, count: int) -> Iterator[bytes]:
"""
0x27 Multiple Inventory
:param count: 0~65535
:return: yield data
"""
assert 0 <= count <= 65535, "Value must be between 0 and 65535, inclusive."
data: bytes = b'\x22' + count.to_bytes(2, byteorder="big")
command: Command = Command(CMD_INVENTORY_MULTI_START, data=data)
self.__send_request(command)
for _ in range(count):
response: Response = Response(self.__get_response())
if response.frame_type == b'\x01' and response.data == b'\x15':
continue
assert response.frame_type == b'\x02'
yield response.data
def inventory_multiple_stop(self) -> None:
"""
0x28 Multiple Inventory
:return:
"""
command: Command = Command(CMD_INVENTORY_MULTI_STOP)
self.__send_request(command)
self.__get_response()
def save_config(self, violate: bool) -> Response:
data: int = 0x00 if violate else 0x01
command: Command = Command(CMD_SAVE_CONFIG, data=data)
self.__send_request(command)
return Response(self.__get_response())
def get_power(self) -> int:
command: Command = Command(CMD_GET_POWER)
self.__send_request(command)
response: Response = Response(self.__get_response())
return int(int.from_bytes(response.data, byteorder='big') / 100)
def set_power(self, power: int) -> Response:
power: int = power * 100
data: bytes = power.to_bytes(2, byteorder="big")
command: Command = Command(CMD_SET_POWER, data=data)
self.__send_request(command)
return Response(self.__get_response())
def set_baud_rate(self, baud_rate: int) -> None:
baud_rate: int = int(baud_rate / 100)
data: bytes = baud_rate.to_bytes(2, byteorder="big")
command: Command = Command(CMD_SET_BAUD_RATE, data=data)
self.__send_request(command)
def get_select(self) -> SelectParameter:
command: Command = Command(CMD_GET_SELECT)
self.__send_request(command)
response: Response = Response(self.__get_response())
return SelectParameter.parse(response.data)
def set_select(self, parameter: SelectParameter) -> Response:
payload_data: bytes = parameter.serialize()
command: Command = Command(CMD_SET_SELECT, data=payload_data)
self.__send_request(command)
return Response(self.__get_response())
def read_tag_memory(
self,
memory_bank: int,
start_address: int, # In word
length: int, # In word
access_password: bytes = bytes(4),
) -> ReadMemoryResponse:
payload_data: bytearray = bytearray()
payload_data.extend(access_password)
payload_data.append(memory_bank)
payload_data.extend(start_address.to_bytes(2, byteorder="big"))
payload_data.extend(length.to_bytes(2, byteorder="big"))
command: Command = Command(CMD_READ_MEMORY, data=payload_data)
self.__send_request(command)
return ReadMemoryResponse(self.__get_response())
def write_tag_memory(
self,
memory_bank: int,
start_address: int, # In word
data: bytes,
access_password: bytes = bytes(4),
) -> WriteMemoryResponse:
payload_data: bytearray = bytearray()
payload_data.extend(access_password)
payload_data.append(memory_bank)
payload_data.extend(start_address.to_bytes(2, byteorder="big"))
payload_data.extend(int(len(data) / 2).to_bytes(2, byteorder="big"))
payload_data.extend(data)
command: Command = Command(CMD_WRITE_MEMORY, data=payload_data)
self.__send_request(command)
return WriteMemoryResponse(self.__get_response())4. main.py
File ini yang akan dijalankan pertama kali python main.py, kita akan memanggil class-class yang sudah dibuat.
Uncomment fungsi/kode yang ingin digunakan.
from time import sleep
from typing import Iterator
from response import hex_readable, Response, SelectParameter, ReadMemoryResponse, WriteMemoryResponse
from reader import Reader
# Windows: Replace '/dev/ttyUSB0' to 'COM1' (check Device Manager)
reader = Reader('/dev/ttyUSB0', 115200)
# # 1. Inventory - Single
tag: bytes | None = reader.inventory_single()
if isinstance(tag, bytes):
print(hex_readable(tag))
# # 2. Inventory - Multi
# count: int = 100
# try:
# tags: Iterator[bytes] = reader.inventory_multiple_start(count)
# for tag in tags:
# print(hex_readable(tag))
# except KeyboardInterrupt:
# reader.inventory_multiple_stop()
# # 3. 0xB7 Power
# response: Response = reader.set_power(21) # Must call save_config() for stored config in the module
# power: int = reader.get_power()
# print(f"Power: {power}")
# # 4. 0x11 Set baud rate
# NEW_BAUD_RATE: int = 115200
# reader.set_baud_rate(NEW_BAUD_RATE)
# reader.close()
# reader = Reader(SERIAL_PORT, NEW_BAUD_RATE)
# response_save_config: Response = reader.save_config(violate=False)
# print(response_save_config)
# 5. 0x0C Set Select (for read & write tag memory)
# Example: Select EPC: 11 22 33 44 55 66 77 88 99 00 AA BB
# mask: bytes = bytearray([0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0x00, 0xAA, 0xBB])
# select_parameter = SelectParameter(
# memory_bank=1, # EPC
# mask=mask, # EPC value
# start_address=32, # Start mask address (bit): 32 (or word 2)
# )
# response_set_select: Response = reader.set_select(parameter=select_parameter)
# # print(response_set_select)
#
# # 6. 0x0B Get Select
# response_get_select: SelectParameter = reader.get_select()
# print(response_get_select)
#
# # 7. 0x39 Read Memory (should run 0x0C first to select specific tag)
# for _ in range(20):
# try:
# response_read_memory: ReadMemoryResponse = reader.read_tag_memory(
# memory_bank=2, # TID
# start_address=0, # Start TID address (word): 0
# length=3, # Read 3 word
# )
# print(response_read_memory)
# break
# except RuntimeError as e:
# print(f"Error: {e} Retrying...")
# sleep(0.1)
#
# # 8. 0x49 Write Memory (should run 0x0C first to select specific tag)
# # Example: Memory bank: EPC
# # From: 11 22 33 44 55 66 77 88 99 00 AA BB
# # Become: "FF EE DD CC" 55 66 77 88 99 00 AA BB
# response_write_memory: WriteMemoryResponse = reader.write_tag_memory(
# memory_bank=1, # EPC
# start_address=2, # Start EPC address (word): 2
# data=bytearray([0xFF, 0xEE, 0xDD, 0xCC]),
# )
# print(response_write_memory)
reader.close()