ScanOS/backend/app/scanner/scanner.py

140 lines
4.4 KiB
Python
Raw Permalink Normal View History

2023-09-21 14:56:01 +02:00
import gi, os, threading
from typing import List
from PIL import Image
from app.scanner.enums import Status
gi.require_version('Libinsane', '1.0')
from gi.repository import Libinsane, GObject # type: ignore
class __LibinsaneSilentLogger(GObject.GObject, Libinsane.Logger):
def do_log(self, lvl, msg):
return
Libinsane.register_logger(__LibinsaneSilentLogger())
class Page:
filename: str
size_bytes: int
class Scanner:
def __get_device_id(self):
"""
List local scanners and get the device id of the first found device.
:param self: Instance of this class
:returns: Device id of the first scan device
"""
devs = self.api.list_devices(Libinsane.DeviceLocations.LOCAL_ONLY)
return devs[0].get_dev_id()
def __raw_to_img(self, params, img_bytes):
"""
"""
fmt = params.get_format()
assert(fmt == Libinsane.ImgFormat.RAW_RGB_24)
(w, h) = (
params.get_width(),
int(len(img_bytes) / 3 / params.get_width())
)
return Image.frombuffer("RGB", (w, h), img_bytes, "raw", "RGB", 0, 1)
def __write_file(self, scan_params, data, page_index, last_file):
data = b"".join(data)
if scan_params.get_format() == Libinsane.ImgFormat.RAW_RGB_24:
filesize = len(data)
img = self.__raw_to_img(scan_params, data)
filename = f"out{page_index}.png"
img.save(os.path.join(self.storage_path, filename), format="PNG")
page = Page()
page.filename = filename
page.size_bytes = filesize
self.scanned_pages.append(page)
if last_file:
self.status = Status.DONE
def __set_defaults(self):
dev = self.api.get_device(self.device_id)
opts = dev.get_options()
opts = {opt.get_name(): opt for opt in opts}
opts["sleeptimer"].set_value(1)
opts["resolution"].set_value(200)
dev.close()
def __scan(self):
self.status = Status.RUNNING
source = self.api.get_device(self.device_id)
opts = source.get_options()
opts = {opt.get_name(): opt for opt in opts}
if opts["cover-open"].get_value() == True:
self.status = Status.ERR_COVER_OPEN
return
session = source.scan_start()
try:
page_index = 0
while not session.end_of_feed() and page_index < 50:
# Do not assume that all the pages will have the same size !
scan_params = session.get_scan_parameters()
img = []
while not session.end_of_page():
data = session.read_bytes(256 * 1024)
data = data.get_data()
img.append(data)
t = threading.Thread(target=self.__write_file, args=(scan_params, img, page_index, session.end_of_feed()))
t.start()
page_index += 1
if page_index == 0:
self.status = Status.ERR_NO_PAPER
finally:
session.cancel()
source.close()
def __init__(self, storage_path):
self.scanned_pages: List[Page] = []
self.storage_path = storage_path
self.status = Status.INITIALIZED
def preload(self):
os.environ["LIBINSANE_NORMALIZER_SAFE_DEFAULTS"] = "0"
self.api = Libinsane.Api.new_safebet()
self.device_id = self.__get_device_id()
self.__set_defaults()
self.status = Status.IDLE
def scan(self):
if self.status == Status.RUNNING:
raise RuntimeError("already_running")
if self.status == Status.INITIALIZED:
self.preload()
self.scanned_pages: List[Page] = []
t = threading.Thread(target=self.__scan)
t.start()
def get_status(self) -> Status:
return self.status
def get_pages(self) -> List[Page]:
return self.scanned_pages
def get_options(self):
dev = self.api.get_device(self.device_id)
opts = dev.get_options()
result = {}
for opt in opts:
try:
result[opt.get_name()] = opt.get_value()
except Exception:
continue
dev.close()
return result
def cleanup(self):
if self.status == Status.RUNNING:
raise RuntimeError("scan_running")
if self.status != Status.INITIALIZED:
self.api.cleanup()