Compare commits

...

24 Commits

Author SHA1 Message Date
47c0236129 update image in readme 2025-12-06 23:41:16 +01:00
56aa4a2721 Merge pull request 'Add Checkbox connection type (SATA/NVME) to label' (#22) from feature/drive-type-marker into main
Reviewed-on: #22
2025-12-06 23:38:30 +01:00
731a00a644 reHDDPrinter reads and handle conn type 2025-12-06 23:29:05 +01:00
b85ff21648 add connection_type to dummies 2025-12-06 23:16:33 +01:00
b545f9e326 Merge branch 'main' into feature/drive-type-marker 2025-12-06 23:06:05 +01:00
5853eca6d5 Merge pull request 'Dummy sender and receiver for IPC queue' (#21) from feature/ipc-dummy into main
Reviewed-on: #21
2025-12-06 23:04:55 +01:00
6a747cb127 fix creation of queue on dummy 2025-12-06 23:04:00 +01:00
da4fbd09f0 ipc api 2025-12-05 22:17:05 +01:00
b3eaafbbe5 print checkbox 2025-12-05 21:41:52 +01:00
6dda384428 add drive type to api and data model 2025-12-05 21:21:18 +01:00
d6c4b24149 fix ipc api 2025-06-22 15:08:54 +02:00
c10b1cd342 Merge pull request 'Shrink label size to allow support for nvme drives' (#19) from feature/nvme-label into main
Reviewed-on: #19
2025-06-22 13:34:16 +02:00
e06d10caee shorten qrcode data 2025-06-22 13:32:56 +02:00
880ff8c6e3 increase font size 2025-06-22 12:56:40 +02:00
6ec782609c shrink more 2025-06-22 12:54:48 +02:00
cbf4c3c273 shrink label 2025-06-22 12:43:13 +02:00
954bed9e56 Merge pull request 'wait for IPC creation' (#18) from fix/ipc-startup into main
Reviewed-on: #18
2025-06-15 18:36:41 +02:00
ad2da922db wait for IPC creation 2025-06-15 18:35:33 +02:00
6b3dee9864 Merge pull request 'feature/native-ipc' (#17) from feature/native-ipc into main
Reviewed-on: #17
2025-06-15 18:05:02 +02:00
211bf80b91 use libc IPC 2025-06-15 18:03:51 +02:00
0b94ac2a3b try native ipc 2025-06-08 22:59:52 +02:00
7b4dfebbdc cleanup 2025-06-08 20:21:42 +02:00
63ba1e8d1d Merge branch 'wait_for_printer' 2023-01-19 19:56:20 +01:00
5bf2ab8b2e cut SN at start 2023-01-19 19:54:34 +01:00
9 changed files with 882 additions and 248 deletions

1
.gitignore vendored
View File

@ -174,3 +174,4 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ #.idea/
dummy_sender/dummy_sender

View File

@ -4,11 +4,11 @@
- Receive data from [reHDD](https://git.mosad.xyz/localhorst/reHDD) via IPC message queue. - Receive data from [reHDD](https://git.mosad.xyz/localhorst/reHDD) via IPC message queue.
## Example Label ## Example Label
![Screenshot of label](https://git.mosad.xyz/localhorst/reHDDPrinter/raw/commit/977baf27db7d7460cac0be9eea6b545afbd118d0/output.png "Example Label ") ![Screenshot of label](https://git.mosad.xyz/localhorst/reHDDPrinter/raw/branch/main/output.png "Example Label ")
## Install ## ## Install ##
`pip install qrcode sysv-ipc pycstruct brother-ql` `pip install qrcode brother-ql`
``` ```
cd /root/ cd /root/
@ -37,5 +37,20 @@ systemctl enable --now /lib/systemd/system/reHDDPrinter.service
see https://github.com/pklaus/brother_ql for details for printer access see https://github.com/pklaus/brother_ql for details for printer access
## Test IPC msg queue
### Dummy Sender
```
cd dummy_sender
clear && g++ -Wall main.cpp -o dummy_sender
clear && ./dummy_sender
```
### Dummy Receiver
```
clear && python ./dummy_receiver.py
```
### Clear IPC mgs queue
```
clear && bash ./cleanup_queues.py
```

16
cleanup_queue.sh Normal file
View File

@ -0,0 +1,16 @@
#!/bin/bash
# Cleanup script to remove the IPC message queue
MSG_QUEUE_KEY="0x1b11193c0"
echo "Removing message queue with key: $MSG_QUEUE_KEY"
ipcrm -Q $MSG_QUEUE_KEY 2>/dev/null
if [ $? -eq 0 ]; then
echo "Message queue removed successfully"
else
echo "No message queue found or already removed"
fi
echo "Current message queues:"
ipcs -q

View File

@ -1,53 +1,188 @@
import sysv_ipc #pip install sysv-ipc #!/usr/bin/env python3
import pycstruct #pip install pycstruct # -*- coding: utf-8 -*-
"""Author: Hendrik Schutter, localhorst@mosad.xyz
Date of creation: 2025/12/05
Date of last modification: 2025/12/05
"""
import ctypes
import os
import time
import argparse
# Constants
STR_BUFFER_SIZE = 64
MSG_QUEUE_KEY = 0x1B11193C0
IPC_CREAT = 0o1000
terminate = False
class TDriveData(ctypes.Structure):
_fields_ = [
("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveHours", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveCycles", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveErrors", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveShredTimestamp", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveShredDuration", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveCapacity", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveState", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveConnectionType", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveModelFamily", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveModelName", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveSerialnumber", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveReHddVersion", ctypes.c_char * STR_BUFFER_SIZE),
]
str_buffer_size = 64 class TMsgQueueData(ctypes.Structure):
msg_queue_key = 0x1B11193C0 _fields_ = [
("msg_queue_type", ctypes.c_long),
("driveData", TDriveData),
]
try:
mq = sysv_ipc.MessageQueue(msg_queue_key, sysv_ipc.IPC_CREAT)
# IPC bindings - enable errno support
libc = ctypes.CDLL("libc.so.6", use_errno=True)
msgget = libc.msgget
msgrcv = libc.msgrcv
msgget.argtypes = [ctypes.c_int, ctypes.c_int]
msgget.restype = ctypes.c_int
msgrcv.argtypes = [
ctypes.c_int,
ctypes.POINTER(TMsgQueueData),
ctypes.c_size_t,
ctypes.c_long,
ctypes.c_int,
]
msgrcv.restype = ctypes.c_ssize_t
def create_drive_objects(drive_info):
"""Convert dictionary to layouter-compatible DriveData and ReHddInfo objects"""
drive = layouter.DriveData(
drive_index=int(drive_info["driveIndex"]),
drive_state=drive_info["driveState"],
modelfamily=drive_info["driveModelFamily"],
modelname=drive_info["driveModelName"],
capacity=int(drive_info["driveCapacity"]),
serialnumber=drive_info["driveSerialnumber"],
power_on_hours=int(drive_info["driveHours"]),
power_cycle=int(drive_info["driveCycles"]),
smart_error_count=int(drive_info["driveErrors"]),
shred_timestamp=int(drive_info["driveShredTimestamp"]),
shred_duration=int(drive_info["driveShredDuration"]),
connection_type=drive_info["driveConnectionType"],
)
rehdd_info = layouter.ReHddInfo(
link="https://git.mosad.xyz/localhorst/reHDD",
version=drive_info["driveReHddVersion"],
)
return drive, rehdd_info
def worker(queue_id, test_mode=False):
try:
while not terminate:
time.sleep(3)
if test_mode:
drive_info = {
"driveIndex": "42",
"driveHours": 44,
"driveCycles": 45,
"driveErrors": 43,
"driveShredTimestamp": int(time.time()),
"driveShredDuration": 0,
"driveCapacity": 42,
"driveState": "shredded",
"driveConnectionType": "sata",
"driveModelFamily": "modelFamily",
"driveModelName": "modelName",
"driveSerialnumber": "serial",
"driveReHddVersion": "V1.1.2",
}
else:
msg = TMsgQueueData()
print("Waiting for message from queue...")
# Calculate message size - must match C++ side: sizeof(t_msgQueueData) - sizeof(long)
# This is the size of the data portion (excluding msg_queue_type)
msg_size = ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long)
print(f"Message size to receive: {msg_size} bytes")
result = msgrcv(
queue_id,
ctypes.byref(msg),
msg_size,
0, # msg type (0 = get first message)
0, # flags (0 = blocking)
)
if result == -1:
err = ctypes.get_errno()
print(
f"Error reading from message queue: {os.strerror(err)} (errno: {err})"
)
break
print(f"Received {result} bytes from queue")
d = msg.driveData
drive_info = {
"driveIndex": d.caDriveIndex.decode().strip("\x00"),
"driveHours": int(d.caDriveHours.decode().strip("\x00")),
"driveCycles": int(d.caDriveCycles.decode().strip("\x00")),
"driveErrors": int(d.caDriveErrors.decode().strip("\x00")),
"driveShredTimestamp": int(
d.caDriveShredTimestamp.decode().strip("\x00")
),
"driveShredDuration": int(
d.caDriveShredDuration.decode().strip("\x00")
),
"driveCapacity": int(d.caDriveCapacity.decode().strip("\x00")),
"driveState": d.caDriveState.decode().strip("\x00"),
"driveConnectionType": d.caDriveConnectionType.decode().strip("\x00"),
"driveModelFamily": d.caDriveModelFamily.decode().strip("\x00"),
"driveModelName": d.caDriveModelName.decode().strip("\x00"),
"driveSerialnumber": d.caDriveSerialnumber.decode().strip("\x00"),
"driveReHddVersion": d.caDriveReHddVersion.decode().strip("\x00"),
}
print(f"Received Drive Data: {drive_info}")
except Exception as e:
import traceback
print(f"Worker encountered an error: {e}")
traceback.print_exc()
def main():
while True: while True:
message, mtype = mq.receive() try:
print("") # Create or connect to the message queue with IPC_CREAT flag
#print("*** New message received ***") # This matches the C++ sender's flags (IPC_CREAT | 0666)
# print(f"Raw message: {message}") queue_id = msgget(MSG_QUEUE_KEY, IPC_CREAT | 0o666)
if queue_id == -1:
err = ctypes.get_errno()
raise RuntimeError(
f"Failed to create/connect to the message queue: {os.strerror(err)}"
)
#uint8_t u8DriveIndex; print(f"Successfully connected to message queue (ID: {queue_id})")
#uint32_t u32DriveHours; worker(queue_id)
#uint32_t u32DriveCycles; except Exception as e:
#uint32_t u32DriveError; import traceback
#uint64_t u64DriveShredTimestamp;
#uint64_t u64DriveShredDuration; print(f"Main process encountered an error: {e}")
#uint64_t u64DriveCapacity; traceback.print_exc()
#char caDriveState[STR_BUFFER_SIZE]; time.sleep(30)
#char caDriveModelFamiliy[STR_BUFFER_SIZE];
#char caDriveModelName[STR_BUFFER_SIZE];
#char caDriveSerialnumber[STR_BUFFER_SIZE];
driveData = pycstruct.StructDef()
driveData.add('utf-8', 'driveIndex', length=str_buffer_size)
driveData.add('utf-8', 'driveHours', length=str_buffer_size)
driveData.add('utf-8', 'driveCycles', length=str_buffer_size)
driveData.add('utf-8', 'driveErrors', length=str_buffer_size)
driveData.add('utf-8', 'driveShredTimestamp', length=str_buffer_size)
driveData.add('utf-8', 'driveShredDuration', length=str_buffer_size)
driveData.add('utf-8', 'driveCapacity', length=str_buffer_size)
driveData.add('utf-8', 'driveState', length=str_buffer_size)
driveData.add('utf-8', 'driveModelFamiliy', length=str_buffer_size)
driveData.add('utf-8', 'driveModelModel', length=str_buffer_size)
driveData.add('utf-8', 'driveSerialnumber', length=str_buffer_size)
driveData.add('utf-8', 'driveReHddVersion', length=str_buffer_size)
# Dictionary representation
result = driveData.deserialize(message)
print('Dictionary object:')
print(str(result))
except sysv_ipc.ExistentialError:
print("ERROR: message queue creation failed")
if __name__ == "__main__":
main()

61
dummy_sender/main.cpp Normal file
View File

@ -0,0 +1,61 @@
/**
* @file main.cpp
* @brief Send drive data to printer service using ipc msg queue
* @author Hendrik Schutter
* @date 06.12.2025
*/
#include "main.h"
#define REHDD_VERSION "V99.99.99"
/**
* \brief app entry point
* \param void
* \return Status-Code
*/
int main(void)
{
int msqid;
std::cout << "Dummy sender for IPC queue" << std::endl;
if (-1 == (msqid = msgget((key_t)IPC_MSG_QUEUE_KEY, IPC_CREAT | 0666)))
{
std::cout << "Printer: Create msg queue failed! Error: " << strerror(errno) << std::endl;
return EXIT_FAILURE;
}
else
{
std::cout << "Printer: Created/connected to msg queue (ID: " << msqid << ")" << std::endl;
}
t_msgQueueData msgQueueData;
msgQueueData.msg_queue_type = 1;
sprintf(msgQueueData.driveData.caDriveIndex, "%i", 0);
sprintf(msgQueueData.driveData.caDriveState, "shredded");
strcpy(msgQueueData.driveData.caDriveModelFamily, "Toshiba 2.5 HDD MK..65GSSX");
strcpy(msgQueueData.driveData.caDriveModelName, "TOSHIBA MK3265GSDX");
sprintf(msgQueueData.driveData.caDriveCapacity, "%li", 343597383000LU);
strcpy(msgQueueData.driveData.caDriveSerialnumber, "YG6742U56UDRL123456789ABCDEFGJKL");
sprintf(msgQueueData.driveData.caDriveHours, "%i", 7074);
sprintf(msgQueueData.driveData.caDriveCycles, "%i", 4792);
sprintf(msgQueueData.driveData.caDriveErrors, "%i", 1);
sprintf(msgQueueData.driveData.caDriveShredTimestamp, "%li", 71718LU);
sprintf(msgQueueData.driveData.caDriveShredDuration, "%li", 81718LU);
strcpy(msgQueueData.driveData.caDriveConnectionType, "sata");
sprintf(msgQueueData.driveData.caDriveReHddVersion, REHDD_VERSION);
std::cout << "Sending message to queue..." << std::endl;
if (-1 == msgsnd(msqid, &msgQueueData, sizeof(t_msgQueueData) - sizeof(long), 0))
{
std::cout << "Printer: Send msg queue failed! Error: " << strerror(errno) << std::endl;
return EXIT_FAILURE;
}
else
{
std::cout << "Printer: print triggered successfully" << std::endl;
}
return EXIT_SUCCESS;
}

45
dummy_sender/main.h Normal file
View File

@ -0,0 +1,45 @@
/**
* @file main.h
* @brief Send drive data to printer service using ipc msg queue
* @author Hendrik Schutter
* @date 06.12.2025
*/
#ifndef PRINTER_H_
#define PRINTER_H_
#include <sys/ipc.h>
#include <sys/msg.h>
#include <cstring>
#include <sstream>
#include <iostream>
#include <cerrno>
#define STR_BUFFER_SIZE 64U
#define IPC_MSG_QUEUE_KEY 0x1B11193C0
typedef struct
{
char caDriveIndex[STR_BUFFER_SIZE];
char caDriveHours[STR_BUFFER_SIZE];
char caDriveCycles[STR_BUFFER_SIZE];
char caDriveErrors[STR_BUFFER_SIZE];
char caDriveShredTimestamp[STR_BUFFER_SIZE];
char caDriveShredDuration[STR_BUFFER_SIZE];
char caDriveCapacity[STR_BUFFER_SIZE];
char caDriveState[STR_BUFFER_SIZE];
char caDriveConnectionType[STR_BUFFER_SIZE];
char caDriveModelFamily[STR_BUFFER_SIZE];
char caDriveModelName[STR_BUFFER_SIZE];
char caDriveSerialnumber[STR_BUFFER_SIZE];
char caDriveReHddVersion[STR_BUFFER_SIZE];
} t_driveData;
typedef struct
{
long msg_queue_type;
t_driveData driveData;
} t_msgQueueData;
#endif // PRINTER_H_

View File

@ -1,9 +1,10 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, localhorst@mosad.xyz """
Date of creation: 2022/11/16 Author: Hendrik Schutter, localhorst@mosad.xyz
Date of last modification: 2022/11/23 Date of creation: 2022/11/16
Date of last modification: 2025/06/08
""" """
import re import re
@ -11,197 +12,400 @@ import dataclasses
import glob import glob
import datetime import datetime
import json import json
import logging
import qrcode import qrcode
from PIL import Image from PIL import Image, ImageFont, ImageDraw
from PIL import ImageFont
from PIL import ImageDraw # Constants
FONT_PATH = "/usr/share/fonts"
DEFAULT_FONT_REGULAR = "DejaVuSans.ttf"
DEFAULT_FONT_BOLD = "DejaVuSans-Bold.ttf"
OUTPUT_WIDTH = 696 # px
OUTPUT_HEIGHT = 190 # px
TEXT_X_OFFSET = 190 # px
QR_CODE_SIZE = 179 # px
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
@dataclasses.dataclass @dataclasses.dataclass
class DriveData: class DriveData:
drive_index: int drive_index: int
drive_state: str #none, deleted, shredded drive_state: str
modelfamiliy: str drive_connection_type: str
modelfamily: str
modelname: str modelname: str
capacity: int #in bytes capacity: int
serialnumber: str serialnumber: str
power_on_hours: int #in hours power_on_hours: int
power_cycle: int power_cycle: int
smart_error_count: int smart_error_count: int
shred_timestamp: int #unix timestamp shred_timestamp: int
shred_duration: int #in seconds shred_duration: int
@dataclasses.dataclass
class DriveDataJson:
state: str
contype: str
fam: str
name: str
cap: int
sn: str
poh: int
pc: int
err: int
time: int
dur: int
@dataclasses.dataclass @dataclasses.dataclass
class DriveDataPrintable: class DriveDataPrintable:
modelfamiliy: str #max lenght 25 connectiontype: str
modelname: str #max lenght 25 modelfamily: str
capacity: str #max lenght 25, in human-readable format with unit (GB/TB) modelname: str
serialnumber: str #max lenght 25 capacity: str
power_on_hours: str #max lenght 25, in hours and days and years serialnumber: str
power_cycle: str #max lenght 25 power_on_hours: str
smart_error_count: str #max lenght 25 power_cycle: str
shred_timestamp: str #max lenght 25, human-readable smart_error_count: str
shred_duration: str #max lenght 25, human-readable shred_timestamp: str
shred_duration: str
@dataclasses.dataclass @dataclasses.dataclass
class ReHddInfo: class ReHddInfo:
link: str link: str
version: str version: str
@dataclasses.dataclass @dataclasses.dataclass
class DriveDataJson: class QrDataJson:
drive: DriveData drive: DriveDataJson
rehdd: ReHddInfo rehdd: ReHddInfo
def get_font_path_regular():
path = "/usr/share/fonts"
files = glob.glob(path + "/**/DejaVuSans.ttf", recursive = True)
return files[0]
def get_font_path_bold(): def find_font_path(font_name):
path = "/usr/share/fonts" """Finds the full path of the specified font."""
files = glob.glob(path + "/**/DejaVuSans-Bold.ttf", recursive = True) try:
return files[0] files = glob.glob(f"{FONT_PATH}/**/{font_name}", recursive=True)
return files[0] if files else None
except Exception as e:
logging.error(f"Error locating font {font_name}: {e}")
return None
def human_readable_capacity_1024(size, decimal_places=0):
for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']: def human_readable_capacity(size, decimal_places=0, base=1000):
if size < 1024.0 or unit == 'PiB': """Converts bytes to a human-readable string."""
break units = (
size /= 1024.0 ["B", "KB", "MB", "GB", "TB", "PB"]
if base == 1000
else ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
)
for unit in units:
if size < base or unit == units[-1]:
return f"{size:.{decimal_places}f} {unit}" return f"{size:.{decimal_places}f} {unit}"
size /= base
def human_readable_capacity_1000(size, decimal_places=0):
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
if size < 1000.0 or unit == 'PB':
break
size /= 1000.0
return f"{size:.{decimal_places}f} {unit}"
def human_readable_power_on_hours(hours, decimal_places=2): def human_readable_power_on_hours(hours):
return str(hours) + "h or " + str(int(hours/24)) + "d or " + str("{:.2f}".format(float(hours/24/365))) + "y" """Converts hours to human-readable string in hours, days, and years."""
return (
str(hours)
+ "h or "
+ str(int(hours / 24))
+ "d or "
+ str("{:.2f}".format(float(hours / 24 / 365)))
+ "y"
)
def cut_string(max_lenght, data):
if (len(data) > max_lenght): def cut_string(max_length, data, direction="end"):
return data[0:(max_lenght-4)] + " ..." """Trims a string to the maximum length, adding ellipses if necessary."""
else: if len(data) > max_length:
return (
f"{data[:max_length-4]} ..."
if direction == "end"
else f"... {data[-(max_length-4):]}"
)
return data return data
def format_to_printable(drive): def format_to_printable(drive):
return DriveDataPrintable( return DriveDataPrintable(
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelfamiliy)),\ drive.drive_connection_type,
cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelname)),\ cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelfamily), "end"),
cut_string(20, human_readable_capacity_1000(drive.capacity)),\ cut_string(20, re.sub(r"[^a-zA-Z0-9. ]", "", drive.modelname), "end"),
cut_string(16, re.sub(r"[^a-zA-Z0-9.-_]", "", drive.serialnumber)),\ cut_string(20, human_readable_capacity(drive.capacity), "end"),
cut_string(30, human_readable_power_on_hours(drive.power_on_hours)),\ cut_string(20, re.sub(r"[^a-zA-Z0-9.-_]", "", drive.serialnumber), "start"),
cut_string(10, str(drive.power_cycle)),\ cut_string(30, human_readable_power_on_hours(drive.power_on_hours), "end"),
cut_string(10, str(drive.smart_error_count)),\ cut_string(10, str(drive.power_cycle), "end"),
cut_string(30, datetime.datetime.utcfromtimestamp(drive.shred_timestamp).strftime('%Y-%m-%d %H:%M:%S')),\ cut_string(10, str(drive.smart_error_count), "end"),
cut_string(30, str(datetime.timedelta(seconds = drive.shred_duration)))) cut_string(
30,
datetime.datetime.fromtimestamp(
drive.shred_timestamp, datetime.UTC
).strftime("%Y-%m-%d %H:%M:%S"),
"end",
),
cut_string(30, str(datetime.timedelta(seconds=drive.shred_duration)), "end"),
)
def draw_text(drawable, printable_data, text_x_offset): def draw_text(drawable, printable_data, font_regular, font_bold, font_bold_bigger):
try: """Draws formatted text with Cycles and Errors on one row."""
font_file_regular = get_font_path_regular() y_start = 4
font_file_bold = get_font_path_bold() line_height = 26
except Exception as ex: label_x = TEXT_X_OFFSET
print("unable to find font: " + str(ex)) value_offset = 115
return right_field_spacing = 200 # Horizontal gap between Cycles and Errors
x_capacity = 520
y_capacity = 142
x_connection_type = 600
y_connection_type = y_start
y_spacing_connection_type = 25
font_size = 20
text_y_offset = 10
text_y_offset_increment = 25
value_colum_x_offset = 120
drawable.text((text_x_offset, text_y_offset), printable_data.serialnumber,(0),font=ImageFont.truetype(font_file_bold, 30)) # Serial Number
text_y_offset += 40 drawable.text((label_x, y_start), "Serial:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y_start),
printable_data.serialnumber,
fill=0,
font=font_bold,
)
drawable.text((text_x_offset, text_y_offset), "Familiy: ",(0),font=ImageFont.truetype(font_file_bold, font_size)) y1 = y_start + line_height
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.modelfamiliy,(0),font=ImageFont.truetype(font_file_regular, font_size)) y2 = y1 + line_height
text_y_offset += text_y_offset_increment y3 = y2 + line_height
drawable.text((text_x_offset, text_y_offset), "Model: ",(0),font=ImageFont.truetype(font_file_bold, font_size)) y4 = y3 + line_height
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.modelname,(0),font=ImageFont.truetype(font_file_regular, font_size)) y5 = y4 + line_height
text_y_offset += text_y_offset_increment y6 = y5 + line_height
drawable.text((text_x_offset, text_y_offset), "Hours: " ,(0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.power_on_hours,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Cycles: ",(0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.power_cycle,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Errors: ", (0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.smart_error_count,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Shred on: ",(0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.shred_timestamp,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), "Duration: " ,(0),font=ImageFont.truetype(font_file_bold, font_size))
drawable.text((text_x_offset+value_colum_x_offset, text_y_offset), printable_data.shred_duration,(0),font=ImageFont.truetype(font_file_regular, font_size))
text_y_offset += text_y_offset_increment
drawable.text((text_x_offset, text_y_offset), printable_data.capacity,(0),font=ImageFont.truetype(font_file_bold, font_size*3)) # Left-Aligned Fields (One per row)
drawable.text((label_x, y1), "Family:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y1),
printable_data.modelfamily,
fill=0,
font=font_regular,
)
drawable.text((label_x, y2), "Model:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y2),
printable_data.modelname,
fill=0,
font=font_regular,
)
drawable.text((label_x, y3), "Hours:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y3),
printable_data.power_on_hours,
fill=0,
font=font_regular,
)
# Cycles and Errors on the same line
drawable.text((label_x, y4), "Cycles:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y4),
printable_data.power_cycle,
fill=0,
font=font_regular,
)
drawable.text(
(label_x + right_field_spacing, y4), "Errors:", fill=0, font=font_bold
)
drawable.text(
(label_x + right_field_spacing + value_offset, y4),
printable_data.smart_error_count,
fill=0,
font=font_regular,
)
# Continue remaining fields
drawable.text((label_x, y5), "Shred on:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y5),
printable_data.shred_timestamp,
fill=0,
font=font_regular,
)
drawable.text((label_x, y6), "Duration:", fill=0, font=font_bold)
drawable.text(
(label_x + value_offset, y6),
printable_data.shred_duration,
fill=0,
font=font_regular,
)
# Capacity at the bottom
drawable.text(
(x_capacity, y_capacity),
printable_data.capacity,
fill=0,
font=font_bold_bigger,
)
if (printable_data.connectiontype == "sata"):
drawable.text(
(x_connection_type, y_connection_type),
"⬤ SATA",
fill=0,
font=font_regular,
)
drawable.text(
(x_connection_type, y_connection_type+y_spacing_connection_type),
"◯ NVME",
fill=0,
font=font_regular,
)
elif (printable_data.connectiontype == "nvme"):
drawable.text(
(x_connection_type, y_connection_type),
"◯ SATA",
fill=0,
font=font_regular,
)
drawable.text(
(x_connection_type, y_connection_type+y_spacing_connection_type),
"⬤ NVME",
fill=0,
font=font_regular,
)
else:
drawable.text(
(x_connection_type, y_connection_type),
"◯ SATA",
fill=0,
font=font_regular,
)
drawable.text(
(x_connection_type, y_connection_type+y_spacing_connection_type),
"◯ NVME",
fill=0,
font=font_regular,
)
def draw_outline(drawable, margin, width, output_width, output_height):
#upper
drawable.line([(margin,margin), (output_width -margin ,margin)], fill=None, width=width, joint=None)
#left
drawable.line([(margin,margin), (margin ,output_height-margin)], fill=None, width=width, joint=None)
#right
drawable.line([(output_width-margin,margin), (output_width-margin ,output_height-margin)], fill=None, width=width, joint=None)
#lower
drawable.line([(margin,output_height - margin), (output_width-margin,output_height -margin)], fill=None, width=width, joint=None)
def draw_qr_code(image, data): def draw_qr_code(image, data):
qr_img = qrcode.make(data) """
qr_img.thumbnail((291, 291), Image.Resampling.LANCZOS) Draws a QR code on the provided image at a specific location.
image.paste(qr_img, (7, 7))
Parameters:
image (Image.Image): The target image.
data (str): The data to encode in the QR code.
"""
# Generate the QR code
qr_img = qrcode.make(data)
qr_img = qr_img.convert("1") # Ensure QR code is in binary (black/white)
# Remove white border
bbox = qr_img.getbbox() # Get the bounding box of the non-white area
qr_img = qr_img.crop(bbox)
# Resize to desired size
qr_img = qr_img.resize((QR_CODE_SIZE, QR_CODE_SIZE), Image.Resampling.LANCZOS)
# Paste the QR code onto the image
region = (5, 5, 5 + QR_CODE_SIZE, 5 + QR_CODE_SIZE)
image.paste(qr_img, box=region)
def draw_outline(drawable, margin, width, output_width, output_height):
"""
Draws a rectangular outline on the drawable canvas.
Parameters:
drawable (ImageDraw.Draw): The drawable canvas.
margin (int): The margin from the edges of the image.
width (int): The width of the outline.
output_width (int): The total width of the image.
output_height (int): The total height of the image.
"""
# Define the four corners of the rectangle, adjusting for line width
top_left = (margin, margin)
top_right = (output_width - margin - width, margin)
bottom_left = (margin, output_height - margin - width)
bottom_right = (output_width - margin - width, output_height - margin - width)
# Draw the outline lines with adjusted coordinates
lines = [
(top_left, top_right), # Top edge
(top_left, bottom_left), # Left edge
(top_right, bottom_right), # Right edge
(bottom_left, bottom_right), # Bottom edge
]
for line in lines:
drawable.line(line, fill=0, width=width)
def generate_image(drive, rehdd_info, output_file): def generate_image(drive, rehdd_info, output_file):
output_width = 696 #in px set by used paper """Generates an image containing drive data and a QR code."""
output_height = 300 #in px
text_x_offset= 300 #in px
qr_data = DriveDataJson(drive, rehdd_info)
try: try:
json_qr_daten = json.dumps(dataclasses.asdict(qr_data))
except Exception as ex:
print("unable to generate json: " + str(ex))
return
try: drive_json = DriveDataJson(
state=drive.drive_state,
contype=drive.drive_connection_type,
fam=drive.modelfamily,
name=drive.modelname,
cap=drive.capacity,
sn=drive.serialnumber,
poh=drive.power_on_hours,
pc=drive.power_cycle,
err=drive.smart_error_count,
time=int(drive.shred_timestamp),
dur=drive.shred_duration,
)
qr_data = json.dumps(dataclasses.asdict(QrDataJson(drive_json, rehdd_info)))
printable_data = format_to_printable(drive) printable_data = format_to_printable(drive)
except Exception as ex: except Exception as e:
print("unable to format data: " + str(ex)) logging.error(f"Error preparing data: {e}")
return return
#print(printable_data) output_image = Image.new("1", (OUTPUT_WIDTH, OUTPUT_HEIGHT), "white")
#create black and white (binary) image with white background
output_image = Image.new('1', (output_width, output_height), "white")
#create draw pane
draw = ImageDraw.Draw(output_image) draw = ImageDraw.Draw(output_image)
draw_outline(draw, 1, 4, output_width, output_height) font_regular = ImageFont.truetype(find_font_path(DEFAULT_FONT_REGULAR), 20)
draw_text(draw, printable_data, text_x_offset) font_bold = ImageFont.truetype(find_font_path(DEFAULT_FONT_BOLD), 20)
draw_qr_code(output_image, str(json_qr_daten).replace(" ", "")) font_bold_bigger = ImageFont.truetype(find_font_path(DEFAULT_FONT_BOLD), 42)
draw_outline(draw, 0, 3, OUTPUT_WIDTH + 1, OUTPUT_HEIGHT + 1)
draw_text(draw, printable_data, font_regular, font_bold, font_bold_bigger)
draw_qr_code(output_image, qr_data)
try:
output_image.save(output_file) output_image.save(output_file)
logging.info(f"Image saved to {output_file}")
except Exception as e:
logging.error(f"Error saving image: {e}")
def main(): def main():
rehdd_info = ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", "bV0.2.2")
rehdd_info = ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", "bV0.2.2") # read this from rehdd process
temp_drive = DriveData( temp_drive = DriveData(
drive_index=0,\ drive_index=0,
drive_state="shredded",\ drive_connection_type="sata",
modelfamiliy="Toshiba 2.5\\ HDD MK..65GSSX",\ drive_state="shredded",
modelname="TOSHIBA MK3265GSDX",\ modelfamily='Toshiba 2.5" HDD MK..65GSSX',
capacity=343597383680,\ modelname="TOSHIBA MK3265GSDX",
serialnumber="YG6742U56UDRL123",\ capacity=343597383000,
power_on_hours=7074,\ serialnumber="YG6742U56UDRL123456789ABCDEFGJKL",
power_cycle=4792,\ power_on_hours=7074,
smart_error_count=1,\ power_cycle=4792,
shred_timestamp=1647937421,\ smart_error_count=1,
shred_duration=81718) shred_timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(),
shred_duration=81718,
)
generate_image(temp_drive, rehdd_info, "output.png") generate_image(temp_drive, rehdd_info, "output.png")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.4 KiB

After

Width:  |  Height:  |  Size: 4.6 KiB

View File

@ -1,77 +1,234 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, localhorst@mosad.xyz """Author: Hendrik Schutter, localhorst@mosad.xyz
Date of creation: 2022/11/23 Date of creation: 2022/11/23
Date of last modification: 2022/11/23 Date of last modification: 2025/06/15
""" """
import sysv_ipc import ctypes
import pycstruct
import os import os
import time import time
import signal
import argparse
import warnings
import logging
from PIL import Image
from brother_ql.brother_ql_create import create_label from brother_ql.brother_ql_create import create_label
from brother_ql.raster import BrotherQLRaster from brother_ql.raster import BrotherQLRaster
import layouter import layouter
str_buffer_size = 64 #keep this synchronous to reHDD # Suppress deprecation and printer warnings
msg_queue_key = 0x1B11193C0 #keep this synchronous to reHDD warnings.filterwarnings("ignore", category=DeprecationWarning)
logging.getLogger("brother_ql").setLevel(logging.ERROR)
# Constants
STR_BUFFER_SIZE = 64
MSG_QUEUE_KEY = 0x1B11193C0
IPC_CREAT = 0o1000
file_name = "output.png" file_name = "output.png"
printer_path = "/dev/usb/lp0" printer_path = "/dev/usb/lp0"
def get_struct_format(): terminate = False
#keep this synchronous to struct in reHDD
driveData = pycstruct.StructDef()
driveData.add('utf-8', 'driveIndex', length=str_buffer_size)
driveData.add('utf-8', 'driveHours', length=str_buffer_size)
driveData.add('utf-8', 'driveCycles', length=str_buffer_size)
driveData.add('utf-8', 'driveErrors', length=str_buffer_size)
driveData.add('utf-8', 'driveShredTimestamp', length=str_buffer_size)
driveData.add('utf-8', 'driveShredDuration', length=str_buffer_size)
driveData.add('utf-8', 'driveCapacity', length=str_buffer_size)
driveData.add('utf-8', 'driveState', length=str_buffer_size)
driveData.add('utf-8', 'driveModelFamiliy', length=str_buffer_size)
driveData.add('utf-8', 'driveModelName', length=str_buffer_size)
driveData.add('utf-8', 'driveSerialnumber', length=str_buffer_size)
driveData.add('utf-8', 'driveReHddVersion', length=str_buffer_size)
return driveData
def main():
try:
mq = sysv_ipc.MessageQueue(msg_queue_key, sysv_ipc.IPC_CREAT)
while True: class TDriveData(ctypes.Structure):
message, mtype = mq.receive() _fields_ = [
driveData = get_struct_format().deserialize(message) ("caDriveIndex", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveHours", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveCycles", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveErrors", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveShredTimestamp", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveShredDuration", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveCapacity", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveState", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveConnectionType", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveModelFamily", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveModelName", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveSerialnumber", ctypes.c_char * STR_BUFFER_SIZE),
("caDriveReHddVersion", ctypes.c_char * STR_BUFFER_SIZE),
]
rehdd_info = layouter.ReHddInfo("https://git.mosad.xyz/localhorst/reHDD", driveData['driveReHddVersion'])
drive = layouter.DriveData(
drive_index=int(driveData['driveIndex']),\
drive_state=str(driveData['driveState']),\
modelfamiliy=str(driveData['driveModelFamiliy']),\
modelname=str(driveData['driveModelName']),\
capacity=int(driveData['driveCapacity']),\
serialnumber=str(driveData['driveSerialnumber']),\
power_on_hours=int(driveData['driveHours']),\
power_cycle=int(driveData['driveCycles']),\
smart_error_count=int(driveData['driveErrors']),\
shred_timestamp=int(driveData['driveShredTimestamp']),\
shred_duration=int(driveData['driveShredDuration']))
while(not os.path.exists(printer_path)): class TMsgQueueData(ctypes.Structure):
_fields_ = [
("msg_queue_type", ctypes.c_long),
("driveData", TDriveData),
]
# IPC bindings
libc = ctypes.CDLL("libc.so.6")
msgget = libc.msgget
msgrcv = libc.msgrcv
msgget.argtypes = [ctypes.c_int, ctypes.c_int]
msgget.restype = ctypes.c_int
msgrcv.argtypes = [
ctypes.c_int,
ctypes.POINTER(TMsgQueueData),
ctypes.c_size_t,
ctypes.c_long,
ctypes.c_int,
]
msgrcv.restype = ctypes.c_ssize_t
def signal_handler(signum, frame):
global terminate
print(f"Signal {signum} received, terminating...")
terminate = True
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def wait_for_printer():
while not os.path.exists(printer_path):
print("Printer not found, waiting ...") print("Printer not found, waiting ...")
time.sleep(30) #sleep 30 time.sleep(30)
return True
layouter.generate_image(drive, rehdd_info, file_name)
def create_drive_objects(drive_info):
"""Convert dictionary to layouter-compatible DriveData and ReHddInfo objects"""
drive = layouter.DriveData(
drive_index=int(drive_info["driveIndex"]),
drive_state=drive_info["driveState"],
modelfamily=drive_info["driveModelFamily"],
modelname=drive_info["driveModelName"],
capacity=int(drive_info["driveCapacity"]),
serialnumber=drive_info["driveSerialnumber"],
power_on_hours=int(drive_info["driveHours"]),
power_cycle=int(drive_info["driveCycles"]),
smart_error_count=int(drive_info["driveErrors"]),
shred_timestamp=int(drive_info["driveShredTimestamp"]),
shred_duration=int(drive_info["driveShredDuration"]),
drive_connection_type=drive_info["driveConnectionType"],
)
rehdd_info = layouter.ReHddInfo(
link="https://git.mosad.xyz/localhorst/reHDD",
version=drive_info["driveReHddVersion"],
)
return drive, rehdd_info
def worker(queue_id, test_mode=False):
try:
while not terminate:
time.sleep(3)
if test_mode:
drive_info = {
"driveIndex": "42",
"driveHours": 44,
"driveCycles": 45,
"driveErrors": 43,
"driveShredTimestamp": int(time.time()),
"driveShredDuration": 0,
"driveCapacity": 42,
"driveState": "shredded",
"driveConnectionType": "sata",
"driveModelFamily": "modelFamily",
"driveModelName": "modelName",
"driveSerialnumber": "serial",
"driveConnectionType": "sata",
"driveReHddVersion": "V1.1.2",
}
else:
msg = TMsgQueueData()
result = msgrcv(
queue_id,
ctypes.byref(msg),
ctypes.sizeof(TMsgQueueData) - ctypes.sizeof(ctypes.c_long),
0,
0,
)
if result == -1:
err = ctypes.get_errno()
print(f"Error reading from message queue: {os.strerror(err)}")
break
d = msg.driveData
drive_info = {
"driveIndex": d.caDriveIndex.decode().strip("\x00"),
"driveHours": int(d.caDriveHours.decode().strip("\x00")),
"driveCycles": int(d.caDriveCycles.decode().strip("\x00")),
"driveErrors": int(d.caDriveErrors.decode().strip("\x00")),
"driveShredTimestamp": int(
d.caDriveShredTimestamp.decode().strip("\x00")
),
"driveShredDuration": int(
d.caDriveShredDuration.decode().strip("\x00")
),
"driveCapacity": int(d.caDriveCapacity.decode().strip("\x00")),
"driveState": d.caDriveState.decode().strip("\x00"),
"driveConnectionType": d.caDriveConnectionType.decode().strip(
"\x00"
),
"driveModelFamily": d.caDriveModelFamily.decode().strip("\x00"),
"driveModelName": d.caDriveModelName.decode().strip("\x00"),
"driveSerialnumber": d.caDriveSerialnumber.decode().strip("\x00"),
"driveConnectionType": d.caDriveConnectionType.decode().strip(
"\x00"
),
"driveReHddVersion": d.caDriveReHddVersion.decode().strip("\x00"),
}
print(f"Received Drive Data: {drive_info}")
drive_obj, rehdd_info = create_drive_objects(drive_info)
layouter.generate_image(drive_obj, rehdd_info, file_name)
if wait_for_printer():
qlr = BrotherQLRaster("QL-570") qlr = BrotherQLRaster("QL-570")
create_label(qlr, file_name, '62') image = Image.open(file_name)
create_label(qlr, image, "62")
with open(printer_path, 'wb') as file: with open(printer_path, "wb") as file:
file.write(qlr.data) file.write(qlr.data)
os.remove(file_name) os.remove(file_name)
except sysv_ipc.ExistentialError: else:
print("ERROR: message queue creation failed") print("Skipping printing due to printer unavailability.")
if test_mode:
break
except Exception as e:
print(f"Worker encountered an error: {e}")
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--test", action="store_true", help="Run in test mode with fake data"
)
args = parser.parse_args()
if args.test:
print("Running in test mode.")
worker(None, test_mode=True)
return
while True:
try:
# Create or connect to the message queue with IPC_CREAT flag
# This matches the C++ sender's flags (IPC_CREAT | 0666)
queue_id = msgget(MSG_QUEUE_KEY, IPC_CREAT | 0o666)
if queue_id == -1:
err = ctypes.get_errno()
raise RuntimeError(
f"Failed to create/connect to the message queue: {os.strerror(err)}"
)
print(f"Successfully connected to message queue (ID: {queue_id})")
worker(queue_id)
except Exception as e:
print(f"Main process encountered an error: {e}")
time.sleep(30)
if __name__ == "__main__": if __name__ == "__main__":
main() main()