Compare commits

...

21 Commits

Author SHA1 Message Date
47c0236129 update image in readme 2025-12-06 23:41:16 +01:00
56aa4a2721 Merge pull request 'Add Checkbox connection type (SATA/NVME) to label' (#22) from feature/drive-type-marker into main
Reviewed-on: #22
2025-12-06 23:38:30 +01:00
731a00a644 reHDDPrinter reads and handle conn type 2025-12-06 23:29:05 +01:00
b85ff21648 add connection_type to dummies 2025-12-06 23:16:33 +01:00
b545f9e326 Merge branch 'main' into feature/drive-type-marker 2025-12-06 23:06:05 +01:00
5853eca6d5 Merge pull request 'Dummy sender and receiver for IPC queue' (#21) from feature/ipc-dummy into main
Reviewed-on: #21
2025-12-06 23:04:55 +01:00
6a747cb127 fix creation of queue on dummy 2025-12-06 23:04:00 +01:00
da4fbd09f0 ipc api 2025-12-05 22:17:05 +01:00
b3eaafbbe5 print checkbox 2025-12-05 21:41:52 +01:00
6dda384428 add drive type to api and data model 2025-12-05 21:21:18 +01:00
d6c4b24149 fix ipc api 2025-06-22 15:08:54 +02:00
c10b1cd342 Merge pull request 'Shrink label size to allow support for nvme drives' (#19) from feature/nvme-label into main
Reviewed-on: #19
2025-06-22 13:34:16 +02:00
e06d10caee shorten qrcode data 2025-06-22 13:32:56 +02:00
880ff8c6e3 increase font size 2025-06-22 12:56:40 +02:00
6ec782609c shrink more 2025-06-22 12:54:48 +02:00
cbf4c3c273 shrink label 2025-06-22 12:43:13 +02:00
954bed9e56 Merge pull request 'wait for IPC creation' (#18) from fix/ipc-startup into main
Reviewed-on: #18
2025-06-15 18:36:41 +02:00
ad2da922db wait for IPC creation 2025-06-15 18:35:33 +02:00
6b3dee9864 Merge pull request 'feature/native-ipc' (#17) from feature/native-ipc into main
Reviewed-on: #17
2025-06-15 18:05:02 +02:00
211bf80b91 use libc IPC 2025-06-15 18:03:51 +02:00
0b94ac2a3b try native ipc 2025-06-08 22:59:52 +02:00
9 changed files with 706 additions and 141 deletions

1
.gitignore vendored
View File

@ -174,3 +174,4 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
dummy_sender/dummy_sender

View File

@ -4,11 +4,11 @@
- Receive data from [reHDD](https://git.mosad.xyz/localhorst/reHDD) via IPC message queue.
## Example Label
![Screenshot of label](https://git.mosad.xyz/localhorst/reHDDPrinter/raw/commit/977baf27db7d7460cac0be9eea6b545afbd118d0/output.png "Example Label ")
![Screenshot of label](https://git.mosad.xyz/localhorst/reHDDPrinter/raw/branch/main/output.png "Example Label ")
## Install ##
`pip install qrcode sysv-ipc pycstruct brother-ql`
`pip install qrcode brother-ql`
```
cd /root/
@ -37,5 +37,20 @@ systemctl enable --now /lib/systemd/system/reHDDPrinter.service
see https://github.com/pklaus/brother_ql for details for printer access
## Test IPC msg queue
### Dummy Sender
```
cd dummy_sender
clear && g++ -Wall main.cpp -o dummy_sender
clear && ./dummy_sender
```
### Dummy Receiver
```
clear && python ./dummy_receiver.py
```
### Clear IPC mgs queue
```
clear && bash ./cleanup_queues.py
```

16
cleanup_queue.sh Normal file
View File

@ -0,0 +1,16 @@
#!/bin/bash
# Cleanup script to remove the IPC message queue
MSG_QUEUE_KEY="0x1b11193c0"
echo "Removing message queue with key: $MSG_QUEUE_KEY"
ipcrm -Q $MSG_QUEUE_KEY 2>/dev/null
if [ $? -eq 0 ]; then
echo "Message queue removed successfully"
else
echo "No message queue found or already removed"
fi
echo "Current message queues:"
ipcs -q

View File

@ -1,53 +1,188 @@
import sysv_ipc #pip install sysv-ipc
import pycstruct #pip install pycstruct
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Author: Hendrik Schutter, localhorst@mosad.xyz
Date of creation: 2025/12/05
Date of last modification: 2025/12/05
"""
import ctypes
import os
import time
import argparse
# Constants
STR_BUFFER_SIZE = 64
MSG_QUEUE_KEY = 0x1B11193C0
IPC_CREAT = 0o1000
terminate = False
class TDriveData(ctypes.Structure):
_fields_ = [
("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveHours", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveCycles", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveErrors", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveShredTimestamp", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveShredDuration", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveCapacity", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveState", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveConnectionType", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveModelFamily", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveModelName", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveSerialnumber", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveReHddVersion", ctypes.c_char * STR_BUFFER_SIZE),
]
str_buffer_size = 64
msg_queue_key = 0x1B11193C0
class TMsgQueueData(ctypes.Structure):
_fields_ = [
("msg_queue_type", ctypes.c_long),
("driveData", TDriveData),
]
try:
mq = sysv_ipc.MessageQueue(msg_queue_key, sysv_ipc.IPC_CREAT)
# IPC bindings - enable errno support
libc = ctypes.CDLL("libc.so.6", use_errno=True)
msgget = libc.msgget
msgrcv = libc.msgrcv
msgget.argtypes = [ctypes.c_int, ctypes.c_int]
msgget.restype = ctypes.c_int
msgrcv.argtypes = [
ctypes.c_int,
ctypes.POINTER(TMsgQueueData),
ctypes.c_size_t,
ctypes.c_long,
ctypes.c_int,
]
msgrcv.restype = ctypes.c_ssize_t
def create_drive_objects(drive_info):
"""Convert dictionary to layouter-compatible DriveData and ReHddInfo objects"""
drive = layouter.DriveData(
drive_index=int(drive_info["driveIndex"]),
drive_state=drive_info["driveState"],
modelfamily=drive_info["driveModelFamily"],
modelname=drive_info["driveModelName"],
capacity=int(drive_info["driveCapacity"]),
serialnumber=drive_info["driveSerialnumber"],
power_on_hours=int(drive_info["driveHours"]),
power_cycle=int(drive_info["driveCycles"]),
smart_error_count=int(drive_info["driveErrors"]),
shred_timestamp=int(drive_info["driveShredTimestamp"]),
shred_duration=int(drive_info["driveShredDuration"]),
connection_type=drive_info["driveConnectionType"],
)
rehdd_info = layouter.ReHddInfo(
link="https://git.mosad.xyz/localhorst/reHDD",
version=drive_info["driveReHddVersion"],
)
return drive, rehdd_info
def worker(queue_id, test_mode=False):
try:
while not terminate:
time.sleep(3)
if test_mode:
drive_info = {
"driveIndex": "42",
"driveHours": 44,
"driveCycles": 45,
"driveErrors": 43,
"driveShredTimestamp": int(time.time()),
"driveShredDuration": 0,
"driveCapacity": 42,
"driveState": "shredded",
"driveConnectionType": "sata",
"driveModelFamily": "modelFamily",
"driveModelName": "modelName",
"driveSerialnumber": "serial",
"driveReHddVersion": "V1.1.2",
}
else:
msg = TMsgQueueData()
print("Waiting for message from queue...")
# Calculate message size - must match C++ side: sizeof(t_msgQueueData) - sizeof(long)
# This is the size of the data portion (excluding msg_queue_type)
msg_size = ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long)
print(f"Message size to receive: {msg_size} bytes")
result = msgrcv(
queue_id,
ctypes.byref(msg),
msg_size,
0, # msg type (0 = get first message)
0, # flags (0 = blocking)
)
if result == -1:
err = ctypes.get_errno()
print(
f"Error reading from message queue: {os.strerror(err)} (errno: {err})"
)
break
print(f"Received {result} bytes from queue")
d = msg.driveData
drive_info = {
"driveIndex": d.caDriveIndex.decode().strip("\x00"),
"driveHours": int(d.caDriveHours.decode().strip("\x00")),
"driveCycles": int(d.caDriveCycles.decode().strip("\x00")),
"driveErrors": int(d.caDriveErrors.decode().strip("\x00")),
"driveShredTimestamp": int(
d.caDriveShredTimestamp.decode().strip("\x00")
),
"driveShredDuration": int(
d.caDriveShredDuration.decode().strip("\x00")
),
"driveCapacity": int(d.caDriveCapacity.decode().strip("\x00")),
"driveState": d.caDriveState.decode().strip("\x00"),
"driveConnectionType": d.caDriveConnectionType.decode().strip("\x00"),
"driveModelFamily": d.caDriveModelFamily.decode().strip("\x00"),
"driveModelName": d.caDriveModelName.decode().strip("\x00"),
"driveSerialnumber": d.caDriveSerialnumber.decode().strip("\x00"),
"driveReHddVersion": d.caDriveReHddVersion.decode().strip("\x00"),
}
print(f"Received Drive Data: {drive_info}")
except Exception as e:
import traceback
print(f"Worker encountered an error: {e}")
traceback.print_exc()
def main():
while True:
message, mtype = mq.receive()
print("")
#print("*** New message received ***")
# print(f"Raw message: {message}")
try:
# Create or connect to the message queue with IPC_CREAT flag
# This matches the C++ sender's flags (IPC_CREAT | 0666)
queue_id = msgget(MSG_QUEUE_KEY, IPC_CREAT | 0o666)
if queue_id == -1:
err = ctypes.get_errno()
raise RuntimeError(
f"Failed to create/connect to the message queue: {os.strerror(err)}"
)
#uint8_t u8DriveIndex;
#uint32_t u32DriveHours;
#uint32_t u32DriveCycles;
#uint32_t u32DriveError;
#uint64_t u64DriveShredTimestamp;
#uint64_t u64DriveShredDuration;
#uint64_t u64DriveCapacity;
#char caDriveState[STR_BUFFER_SIZE];
#char caDriveModelFamiliy[STR_BUFFER_SIZE];
#char caDriveModelName[STR_BUFFER_SIZE];
#char caDriveSerialnumber[STR_BUFFER_SIZE];
driveData = pycstruct.StructDef()
driveData.add('utf-8', 'driveIndex', length=str_buffer_size)
driveData.add('utf-8', 'driveHours', length=str_buffer_size)
driveData.add('utf-8', 'driveCycles', length=str_buffer_size)
driveData.add('utf-8', 'driveErrors', length=str_buffer_size)
driveData.add('utf-8', 'driveShredTimestamp', length=str_buffer_size)
driveData.add('utf-8', 'driveShredDuration', length=str_buffer_size)
driveData.add('utf-8', 'driveCapacity', length=str_buffer_size)
driveData.add('utf-8', 'driveState', length=str_buffer_size)
driveData.add('utf-8', 'driveModelFamiliy', length=str_buffer_size)
driveData.add('utf-8', 'driveModelModel', length=str_buffer_size)
driveData.add('utf-8', 'driveSerialnumber', length=str_buffer_size)
driveData.add('utf-8', 'driveReHddVersion', length=str_buffer_size)
# Dictionary representation
result = driveData.deserialize(message)
print('Dictionary object:')
print(str(result))
except sysv_ipc.ExistentialError:
print("ERROR: message queue creation failed")
print(f"Successfully connected to message queue (ID: {queue_id})")
worker(queue_id)
except Exception as e:
import traceback
print(f"Main process encountered an error: {e}")
traceback.print_exc()
time.sleep(30)
if __name__ == "__main__":
main()

61
dummy_sender/main.cpp Normal file
View File

@ -0,0 +1,61 @@
/**
* @file main.cpp
* @brief Send drive data to printer service using ipc msg queue
* @author Hendrik Schutter
* @date 06.12.2025
*/
#include "main.h"
#define REHDD_VERSION "V99.99.99"
/**
* \brief app entry point
* \param void
* \return Status-Code
*/
int main(void)
{
int msqid;
std::cout << "Dummy sender for IPC queue" << std::endl;
if (-1 == (msqid = msgget((key_t)IPC_MSG_QUEUE_KEY, IPC_CREAT | 0666)))
{
std::cout << "Printer: Create msg queue failed! Error: " << strerror(errno) << std::endl;
return EXIT_FAILURE;
}
else
{
std::cout << "Printer: Created/connected to msg queue (ID: " << msqid << ")" << std::endl;
}
t_msgQueueData msgQueueData;
msgQueueData.msg_queue_type = 1;
sprintf(msgQueueData.driveData.caDriveIndex, "%i", 0);
sprintf(msgQueueData.driveData.caDriveState, "shredded");
strcpy(msgQueueData.driveData.caDriveModelFamily, "Toshiba 2.5 HDD MK..65GSSX");
strcpy(msgQueueData.driveData.caDriveModelName, "TOSHIBA MK3265GSDX");
sprintf(msgQueueData.driveData.caDriveCapacity, "%li", 343597383000LU);
strcpy(msgQueueData.driveData.caDriveSerialnumber, "YG6742U56UDRL123456789ABCDEFGJKL");
sprintf(msgQueueData.driveData.caDriveHours, "%i", 7074);
sprintf(msgQueueData.driveData.caDriveCycles, "%i", 4792);
sprintf(msgQueueData.driveData.caDriveErrors, "%i", 1);
sprintf(msgQueueData.driveData.caDriveShredTimestamp, "%li", 71718LU);
sprintf(msgQueueData.driveData.caDriveShredDuration, "%li", 81718LU);
strcpy(msgQueueData.driveData.caDriveConnectionType, "sata");
sprintf(msgQueueData.driveData.caDriveReHddVersion, REHDD_VERSION);
std::cout << "Sending message to queue..." << std::endl;
if (-1 == msgsnd(msqid, &msgQueueData, sizeof(t_msgQueueData) - sizeof(long), 0))
{
std::cout << "Printer: Send msg queue failed! Error: " << strerror(errno) << std::endl;
return EXIT_FAILURE;
}
else
{
std::cout << "Printer: print triggered successfully" << std::endl;
}
return EXIT_SUCCESS;
}

45
dummy_sender/main.h Normal file
View File

@ -0,0 +1,45 @@
/**
* @file main.h
* @brief Send drive data to printer service using ipc msg queue
* @author Hendrik Schutter
* @date 06.12.2025
*/
#ifndef PRINTER_H_
#define PRINTER_H_
#include <sys/ipc.h>
#include <sys/msg.h>
#include <cstring>
#include <sstream>
#include <iostream>
#include <cerrno>
#define STR_BUFFER_SIZE 64U
#define IPC_MSG_QUEUE_KEY 0x1B11193C0
typedef struct
{
char caDriveIndex[STR_BUFFER_SIZE];
char caDriveHours[STR_BUFFER_SIZE];
char caDriveCycles[STR_BUFFER_SIZE];
char caDriveErrors[STR_BUFFER_SIZE];
char caDriveShredTimestamp[STR_BUFFER_SIZE];
char caDriveShredDuration[STR_BUFFER_SIZE];
char caDriveCapacity[STR_BUFFER_SIZE];
char caDriveState[STR_BUFFER_SIZE];
char caDriveConnectionType[STR_BUFFER_SIZE];
char caDriveModelFamily[STR_BUFFER_SIZE];
char caDriveModelName[STR_BUFFER_SIZE];
char caDriveSerialnumber[STR_BUFFER_SIZE];
char caDriveReHddVersion[STR_BUFFER_SIZE];
} t_driveData;
typedef struct
{
long msg_queue_type;
t_driveData driveData;
} t_msgQueueData;
#endif // PRINTER_H_

View File

@ -21,10 +21,9 @@ FONT_PATH = "/usr/share/fonts"
DEFAULT_FONT_REGULAR = "DejaVuSans.ttf"
DEFAULT_FONT_BOLD = "DejaVuSans-Bold.ttf"
OUTPUT_WIDTH = 696 # px
OUTPUT_HEIGHT = 300 # px
TEXT_X_OFFSET = 300 # px
QR_CODE_SIZE = 289 # px
OUTPUT_HEIGHT = 190 # px
TEXT_X_OFFSET = 190 # px
QR_CODE_SIZE = 179 # px
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
@ -35,6 +34,7 @@ logging.basicConfig(
class DriveData:
drive_index: int
drive_state: str
drive_connection_type: str
modelfamily: str
modelname: str
capacity: int
@ -46,8 +46,24 @@ class DriveData:
shred_duration: int
@dataclasses.dataclass
class DriveDataJson:
state: str
contype: str
fam: str
name: str
cap: int
sn: str
poh: int
pc: int
err: int
time: int
dur: int
@dataclasses.dataclass
class DriveDataPrintable:
connectiontype: str
modelfamily: str
modelname: str
capacity: str
@ -66,8 +82,8 @@ class ReHddInfo:
@dataclasses.dataclass
class DriveDataJson:
drive: DriveData
class QrDataJson:
drive: DriveDataJson
rehdd: ReHddInfo
@ -119,10 +135,11 @@ def cut_string(max_length, data, direction="end"):
def format_to_printable(drive):
return DriveDataPrintable(
drive.drive_connection_type,
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelfamily), "end"),
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelname), "end"),
cut_string(20, human_readable_capacity(drive.capacity), "end"),
cut_string(16, re.sub(r"[^a-zA-Z0-9.-_]", "", drive.serialnumber), "start"),
cut_string(20, re.sub(r"[^a-zA-Z0-9.-_]", "", drive.serialnumber), "start"),
cut_string(30, human_readable_power_on_hours(drive.power_on_hours), "end"),
cut_string(10, str(drive.power_cycle), "end"),
cut_string(10, str(drive.smart_error_count), "end"),
@ -136,45 +153,148 @@ def format_to_printable(drive):
cut_string(30, str(datetime.timedelta(seconds=drive.shred_duration)), "end"),
)
def draw_text(drawable, printable_data, font_regular, font_bold, font_bold_bigger):
"""Draws the formatted text onto the image."""
text_y_offset = 10
value_column_x_offset = 120
font_size = 20
"""Draws formatted text with Cycles and Errors on one row."""
y_start = 4
line_height = 26
label_x = TEXT_X_OFFSET
value_offset = 115
right_field_spacing = 200 # Horizontal gap between Cycles and Errors
x_capacity = 520
y_capacity = 142
x_connection_type = 600
y_connection_type = y_start
y_spacing_connection_type = 25
# Serial Number
drawable.text((label_x, y_start), "Serial:", fill=0, font=font_bold)
drawable.text(
(TEXT_X_OFFSET, text_y_offset), printable_data.serialnumber, (0), font=font_bold
(label_x + value_offset, y_start),
printable_data.serialnumber,
fill=0,
font=font_bold,
)
text_y_offset += 25
fields = [
("Family:", printable_data.modelfamily, font_regular),
("Model:", printable_data.modelname, font_regular),
("Hours:", printable_data.power_on_hours, font_regular),
("Cycles:", printable_data.power_cycle, font_regular),
("Errors:", printable_data.smart_error_count, font_regular),
("Shred on:", printable_data.shred_timestamp, font_regular),
("Duration:", printable_data.shred_duration, font_regular),
]
y1 = y_start + line_height
y2 = y1 + line_height
y3 = y2 + line_height
y4 = y3 + line_height
y5 = y4 + line_height
y6 = y5 + line_height
for label, value, font in fields:
drawable.text((TEXT_X_OFFSET, text_y_offset), label, fill=0, font=font_bold)
drawable.text(
(TEXT_X_OFFSET + value_column_x_offset, text_y_offset),
value,
fill=0,
font=font,
)
text_y_offset += 25
# Left-Aligned Fields (One per row)
drawable.text((label_x, y1), "Family:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y1),
printable_data.modelfamily,
fill=0,
font=font_regular,
)
drawable.text((label_x, y2), "Model:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y2),
printable_data.modelname,
fill=0,
font=font_regular,
)
drawable.text((label_x, y3), "Hours:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y3),
printable_data.power_on_hours,
fill=0,
font=font_regular,
)
# Cycles and Errors on the same line
drawable.text((label_x, y4), "Cycles:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y4),
printable_data.power_cycle,
fill=0,
font=font_regular,
)
drawable.text(
(TEXT_X_OFFSET, text_y_offset),
(label_x + right_field_spacing, y4), "Errors:", fill=0, font=font_bold
)
drawable.text(
(label_x + right_field_spacing + value_offset, y4),
printable_data.smart_error_count,
fill=0,
font=font_regular,
)
# Continue remaining fields
drawable.text((label_x, y5), "Shred on:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y5),
printable_data.shred_timestamp,
fill=0,
font=font_regular,
)
drawable.text((label_x, y6), "Duration:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y6),
printable_data.shred_duration,
fill=0,
font=font_regular,
)
# Capacity at the bottom
drawable.text(
(x_capacity, y_capacity),
printable_data.capacity,
(0),
fill=0,
font=font_bold_bigger,
)
if (printable_data.connectiontype == "sata"):
drawable.text(
(x_connection_type, y_connection_type),
"⬤ SATA",
fill=0,
font=font_regular,
)
drawable.text(
(x_connection_type, y_connection_type+y_spacing_connection_type),
"◯ NVME",
fill=0,
font=font_regular,
)
elif (printable_data.connectiontype == "nvme"):
drawable.text(
(x_connection_type, y_connection_type),
"◯ SATA",
fill=0,
font=font_regular,
)
drawable.text(
(x_connection_type, y_connection_type+y_spacing_connection_type),
"⬤ NVME",
fill=0,
font=font_regular,
)
else:
drawable.text(
(x_connection_type, y_connection_type),
"◯ SATA",
fill=0,
font=font_regular,
)
drawable.text(
(x_connection_type, y_connection_type+y_spacing_connection_type),
"◯ NVME",
fill=0,
font=font_regular,
)
def draw_qr_code(image, data):
"""
@ -199,7 +319,6 @@ def draw_qr_code(image, data):
region = (5, 5, 5 + QR_CODE_SIZE, 5 + QR_CODE_SIZE)
image.paste(qr_img, box=region)
def draw_outline(drawable, margin, width, output_width, output_height):
"""
Draws a rectangular outline on the drawable canvas.
@ -228,11 +347,25 @@ def draw_outline(drawable, margin, width, output_width, output_height):
for line in lines:
drawable.line(line, fill=0, width=width)
def generate_image(drive, rehdd_info, output_file):
"""Generates an image containing drive data and a QR code."""
try:
qr_data = json.dumps(dataclasses.asdict(DriveDataJson(drive, rehdd_info)))
drive_json = DriveDataJson(
state=drive.drive_state,
contype=drive.drive_connection_type,
fam=drive.modelfamily,
name=drive.modelname,
cap=drive.capacity,
sn=drive.serialnumber,
poh=drive.power_on_hours,
pc=drive.power_cycle,
err=drive.smart_error_count,
time=int(drive.shred_timestamp),
dur=drive.shred_duration,
)
qr_data = json.dumps(dataclasses.asdict(QrDataJson(drive_json, rehdd_info)))
printable_data = format_to_printable(drive)
except Exception as e:
logging.error(f"Error preparing data: {e}")
@ -243,9 +376,9 @@ def generate_image(drive, rehdd_info, output_file):
font_regular = ImageFont.truetype(find_font_path(DEFAULT_FONT_REGULAR), 20)
font_bold = ImageFont.truetype(find_font_path(DEFAULT_FONT_BOLD), 20)
font_bold_bigger = ImageFont.truetype(find_font_path(DEFAULT_FONT_BOLD), 60)
font_bold_bigger = ImageFont.truetype(find_font_path(DEFAULT_FONT_BOLD), 42)
draw_outline(draw, 1, 4, OUTPUT_WIDTH, OUTPUT_HEIGHT)
draw_outline(draw, 0, 3, OUTPUT_WIDTH + 1, OUTPUT_HEIGHT + 1)
draw_text(draw, printable_data, font_regular, font_bold, font_bold_bigger)
draw_qr_code(output_image, qr_data)
@ -261,10 +394,11 @@ def main():
temp_drive = DriveData(
drive_index=0,
drive_connection_type="sata",
drive_state="shredded",
modelfamily='Toshiba 2.5" HDD MK..65GSSX',
modelname="TOSHIBA MK3265GSDX",
capacity=343597383680,
capacity=343597383000,
serialnumber="YG6742U56UDRL123456789ABCDEFGJKL",
power_on_hours=7074,
power_cycle=4792,
@ -275,5 +409,6 @@ def main():
generate_image(temp_drive, rehdd_info, "output.png")
if __name__ == "__main__":
main()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.5 KiB

After

Width:  |  Height:  |  Size: 4.6 KiB

View File

@ -1,77 +1,234 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, localhorst@mosad.xyz
Date of creation: 2022/11/23
Date of last modification: 2022/11/23
"""Author: Hendrik Schutter, localhorst@mosad.xyz
Date of creation: 2022/11/23
Date of last modification: 2025/06/15
"""
import sysv_ipc
import pycstruct
import ctypes
import os
import time
import signal
import argparse
import warnings
import logging
from PIL import Image
from brother_ql.brother_ql_create import create_label
from brother_ql.raster import BrotherQLRaster
import layouter
str_buffer_size = 64 #keep this synchronous to reHDD
msg_queue_key = 0x1B11193C0 #keep this synchronous to reHDD
# Suppress deprecation and printer warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
logging.getLogger("brother_ql").setLevel(logging.ERROR)
# Constants
STR_BUFFER_SIZE = 64
MSG_QUEUE_KEY = 0x1B11193C0
IPC_CREAT = 0o1000
file_name = "output.png"
printer_path = "/dev/usb/lp0"
def get_struct_format():
#keep this synchronous to struct in reHDD
driveData = pycstruct.StructDef()
driveData.add('utf-8', 'driveIndex', length=str_buffer_size)
driveData.add('utf-8', 'driveHours', length=str_buffer_size)
driveData.add('utf-8', 'driveCycles', length=str_buffer_size)
driveData.add('utf-8', 'driveErrors', length=str_buffer_size)
driveData.add('utf-8', 'driveShredTimestamp', length=str_buffer_size)
driveData.add('utf-8', 'driveShredDuration', length=str_buffer_size)
driveData.add('utf-8', 'driveCapacity', length=str_buffer_size)
driveData.add('utf-8', 'driveState', length=str_buffer_size)
driveData.add('utf-8', 'driveModelFamiliy', length=str_buffer_size)
driveData.add('utf-8', 'driveModelName', length=str_buffer_size)
driveData.add('utf-8', 'driveSerialnumber', length=str_buffer_size)
driveData.add('utf-8', 'driveReHddVersion', length=str_buffer_size)
return driveData
terminate = False
class TDriveData(ctypes.Structure):
_fields_ = [
("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveHours", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveCycles", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveErrors", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveShredTimestamp", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveShredDuration", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveCapacity", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveState", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveConnectionType", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveModelFamily", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveModelName", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveSerialnumber", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveReHddVersion", ctypes.c_char * STR_BUFFER_SIZE),
]
class TMsgQueueData(ctypes.Structure):
_fields_ = [
("msg_queue_type", ctypes.c_long),
("driveData", TDriveData),
]
# IPC bindings
libc = ctypes.CDLL("libc.so.6")
msgget = libc.msgget
msgrcv = libc.msgrcv
msgget.argtypes = [ctypes.c_int, ctypes.c_int]
msgget.restype = ctypes.c_int
msgrcv.argtypes = [
ctypes.c_int,
ctypes.POINTER(TMsgQueueData),
ctypes.c_size_t,
ctypes.c_long,
ctypes.c_int,
]
msgrcv.restype = ctypes.c_ssize_t
def signal_handler(signum, frame):
global terminate
print(f"Signal {signum} received, terminating...")
terminate = True
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def wait_for_printer():
while not os.path.exists(printer_path):
print("Printer not found, waiting ...")
time.sleep(30)
return True
def create_drive_objects(drive_info):
"""Convert dictionary to layouter-compatible DriveData and ReHddInfo objects"""
drive = layouter.DriveData(
drive_index=int(drive_info["driveIndex"]),
drive_state=drive_info["driveState"],
modelfamily=drive_info["driveModelFamily"],
modelname=drive_info["driveModelName"],
capacity=int(drive_info["driveCapacity"]),
serialnumber=drive_info["driveSerialnumber"],
power_on_hours=int(drive_info["driveHours"]),
power_cycle=int(drive_info["driveCycles"]),
smart_error_count=int(drive_info["driveErrors"]),
shred_timestamp=int(drive_info["driveShredTimestamp"]),
shred_duration=int(drive_info["driveShredDuration"]),
drive_connection_type=drive_info["driveConnectionType"],
)
rehdd_info = layouter.ReHddInfo(
link="https://git.mosad.xyz/localhorst/reHDD",
version=drive_info["driveReHddVersion"],
)
return drive, rehdd_info
def worker(queue_id, test_mode=False):
try:
while not terminate:
time.sleep(3)
if test_mode:
drive_info = {
"driveIndex": "42",
"driveHours": 44,
"driveCycles": 45,
"driveErrors": 43,
"driveShredTimestamp": int(time.time()),
"driveShredDuration": 0,
"driveCapacity": 42,
"driveState": "shredded",
"driveConnectionType": "sata",
"driveModelFamily": "modelFamily",
"driveModelName": "modelName",
"driveSerialnumber": "serial",
"driveConnectionType": "sata",
"driveReHddVersion": "V1.1.2",
}
else:
msg = TMsgQueueData()
result = msgrcv(
queue_id,
ctypes.byref(msg),
ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long),
0,
0,
)
if result == -1:
err = ctypes.get_errno()
print(f"Error reading from message queue: {os.strerror(err)}")
break
d = msg.driveData
drive_info = {
"driveIndex": d.caDriveIndex.decode().strip("\x00"),
"driveHours": int(d.caDriveHours.decode().strip("\x00")),
"driveCycles": int(d.caDriveCycles.decode().strip("\x00")),
"driveErrors": int(d.caDriveErrors.decode().strip("\x00")),
"driveShredTimestamp": int(
d.caDriveShredTimestamp.decode().strip("\x00")
),
"driveShredDuration": int(
d.caDriveShredDuration.decode().strip("\x00")
),
"driveCapacity": int(d.caDriveCapacity.decode().strip("\x00")),
"driveState": d.caDriveState.decode().strip("\x00"),
"driveConnectionType": d.caDriveConnectionType.decode().strip(
"\x00"
),
"driveModelFamily": d.caDriveModelFamily.decode().strip("\x00"),
"driveModelName": d.caDriveModelName.decode().strip("\x00"),
"driveSerialnumber": d.caDriveSerialnumber.decode().strip("\x00"),
"driveConnectionType": d.caDriveConnectionType.decode().strip(
"\x00"
),
"driveReHddVersion": d.caDriveReHddVersion.decode().strip("\x00"),
}
print(f"Received Drive Data: {drive_info}")
drive_obj, rehdd_info = create_drive_objects(drive_info)
layouter.generate_image(drive_obj, rehdd_info, file_name)
if wait_for_printer():
qlr = BrotherQLRaster("QL-570")
image = Image.open(file_name)
create_label(qlr, image, "62")
with open(printer_path, "wb") as file:
file.write(qlr.data)
os.remove(file_name)
else:
print("Skipping printing due to printer unavailability.")
if test_mode:
break
except Exception as e:
print(f"Worker encountered an error: {e}")
def main():
try:
mq = sysv_ipc.MessageQueue(msg_queue_key, sysv_ipc.IPC_CREAT)
parser = argparse.ArgumentParser()
parser.add_argument(
"--test", action="store_true", help="Run in test mode with fake data"
)
args = parser.parse_args()
while True:
message, mtype = mq.receive()
driveData = get_struct_format().deserialize(message)
if args.test:
print("Running in test mode.")
worker(None, test_mode=True)
return
while True:
try:
# Create or connect to the message queue with IPC_CREAT flag
# This matches the C++ sender's flags (IPC_CREAT | 0666)
queue_id = msgget(MSG_QUEUE_KEY, IPC_CREAT | 0o666)
if queue_id == -1:
err = ctypes.get_errno()
raise RuntimeError(
f"Failed to create/connect to the message queue: {os.strerror(err)}"
)
rehdd_info = layouter.ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", driveData['driveReHddVersion'])
drive = layouter.DriveData(
drive_index=int(driveData['driveIndex']),\
drive_state=str(driveData['driveState']),\
modelfamiliy=str(driveData['driveModelFamiliy']),\
modelname=str(driveData['driveModelName']),\
capacity=int(driveData['driveCapacity']),\
serialnumber=str(driveData['driveSerialnumber']),\
power_on_hours=int(driveData['driveHours']),\
power_cycle=int(driveData['driveCycles']),\
smart_error_count=int(driveData['driveErrors']),\
shred_timestamp=int(driveData['driveShredTimestamp']),\
shred_duration=int(driveData['driveShredDuration']))
print(f"Successfully connected to message queue (ID: {queue_id})")
worker(queue_id)
except Exception as e:
print(f"Main process encountered an error: {e}")
time.sleep(30)
while(not os.path.exists(printer_path)):
print("Printer not found, waiting ...")
time.sleep(30) #sleep 30
layouter.generate_image(drive, rehdd_info, file_name)
qlr = BrotherQLRaster("QL-570")
create_label(qlr, file_name, '62')
with open(printer_path, 'wb') as file:
file.write(qlr.data)
os.remove(file_name)
except sysv_ipc.ExistentialError:
print("ERROR: message queue creation failed")
if __name__ == "__main__":
main()