ScanOS/backend/app/scanner/scanner.py

165 lines
5.6 KiB
Python
Raw Permalink Normal View History

2024-07-07 00:30:13 +02:00
import gi, os, threading, logging
from typing import List, Optional
2023-09-21 14:56:01 +02:00
from PIL import Image
2024-07-07 00:30:13 +02:00
from app.scanner.enums import Status, PageStatus
from app.scanner.tesseract import Tesseract
from app.scanner.processing import correct_image
#from app.backends.common import create_pdf, ocr_pdf
2023-09-21 14:56:01 +02:00
gi.require_version('Libinsane', '1.0')
from gi.repository import Libinsane, GObject # type: ignore
class __LibinsaneSilentLogger(GObject.GObject, Libinsane.Logger):
    """Libinsane logger implementation that discards every message."""

    def do_log(self, lvl, msg):
        """Drop the record: nothing is printed, stored or forwarded."""
        return None


# Register the no-op logger at import time so Libinsane's log output is
# routed to it (and therefore silenced).
Libinsane.register_logger(__LibinsaneSilentLogger())
class Page:
    """Bookkeeping record for one scanned page.

    Instances are created without arguments; the scanner fills in the
    attributes while the page is being processed.
    """

    # File name of the stored image; stays None until the image was written.
    filename: Optional[str] = None

    # Length in bytes of the page data received from the scanner.
    # Declared without a default: it must be assigned before it is read.
    size_bytes: int

    # Processing state of this page.
    # Declared without a default: it must be assigned before it is read.
    status: PageStatus
2023-09-21 14:56:01 +02:00
class Scanner:
    """Drive the first locally attached scanner through Libinsane.

    ``scan()`` starts a background thread that reads pages from the device.
    Every page is then post-processed (rotation via Tesseract, JPEG
    encoding) on its own worker thread. Progress is exposed through
    ``get_status()`` and ``get_pages()``.
    """

    # Upper bound on the number of pages read from the feeder in one run.
    _MAX_PAGES = 50

    # Number of bytes requested from the scan session per read call.
    _READ_CHUNK_SIZE = 256 * 1024

    # Device options applied by preload(), in this order.
    # NOTE(review): units/meaning of the values (e.g. resolution, page-height)
    # are defined by the scanner driver -- confirm against the device.
    _DEFAULT_OPTIONS = (
        ("sleeptimer", 1),
        ("resolution", 200),
        ("swcrop", True),
        ("swdeskew", True),
        ("page-height", 300),
        ("mode", "Color"),
    )

    def __init__(self, storage_path, logger = logging.getLogger()):
        """Create an idle scanner wrapper; no device is touched yet.

        :param storage_path: Directory the page images are written to
        :param logger: Logger used for progress and error messages
        """
        self.scanned_pages: List[Page] = []
        self.logger = logger
        self.tesseract = Tesseract(logger)
        self.storage_path = storage_path
        # Filled in by preload(); defined here so the attributes always exist.
        self.api = None
        self.device_id = None
        self.status = Status.INITIALIZED

    def __get_device_id(self):
        """
        List local scanners and get the device id of the first found device.
        :param self: Instance of this class
        :returns: Device id of the first scan device
        :raises IndexError: If no local scan device was found
        """
        devs = self.api.list_devices(Libinsane.DeviceLocations.LOCAL_ONLY)
        if not devs:
            # Same exception type as the former bare ``devs[0]``, but with a
            # log entry and a message that names the actual problem.
            self.logger.error("No local scanner found")
            raise IndexError("no_scanner_found")
        dev_id = devs[0].get_dev_id()
        self.logger.info("Using device: %s", dev_id)
        return dev_id

    def __raw_to_img(self, params, img_bytes):
        """
        Build a PIL image from raw 24 bit RGB scanner data.
        :param params: Scan parameters providing image format and width
        :param img_bytes: Raw pixel data, three bytes per pixel
        :returns: RGB PIL image
        :raises ValueError: If the scan format is not RAW_RGB_24
        """
        fmt = params.get_format()
        # Explicit check instead of ``assert``: asserts vanish under ``-O``.
        if fmt != Libinsane.ImgFormat.RAW_RGB_24:
            raise ValueError(f"unsupported image format: {fmt}")
        width = params.get_width()
        # The height is derived from the amount of data actually received.
        # Integer arithmetic avoids float rounding on large buffers.
        height = len(img_bytes) // (3 * width)
        return Image.frombuffer(
            "RGB", (width, height), img_bytes, "raw", "RGB", 0, 1
        )

    def __write_file(self, scan_params, data, page_index, last_file):
        """
        Post-process one scanned page and store it as JPEG (worker thread).
        :param scan_params: Scan parameters of the page
        :param data: List of byte chunks read from the scan session
        :param page_index: Zero based index of the page, used in the file name
        :param last_file: True if this is the final page of the scan run
        """
        try:
            raw = b"".join(data)
            if scan_params.get_format() != Libinsane.ImgFormat.RAW_RGB_24:
                self.logger.warning(
                    "Skipping page %s: unsupported image format", page_index
                )
                return

            page = Page()
            page.status = PageStatus.PROCESSING
            page.size_bytes = len(raw)
            self.scanned_pages.append(page)

            img = self.__raw_to_img(scan_params, raw)
            img = self.tesseract.rotate_img(img)
            filename = f"out{page_index}.jpeg"
            img_path = os.path.join(self.storage_path, filename)
            img.save(img_path, format="jpeg", quality=95)
            # NOTE: image correction (correct_image) is currently disabled.

            # ``page`` is the very object stored in scanned_pages, so updating
            # it in place is enough. Re-assigning by index could raise
            # IndexError or overwrite a different page if the list was reset
            # or filled out of order.
            page.filename = filename
            page.status = PageStatus.DONE
        except Exception:
            # NOTE(review): no failure member of PageStatus is visible here,
            # so a failed page keeps the PROCESSING state -- confirm.
            self.logger.exception("Processing page %s failed", page_index)
            raise
        finally:
            # Always finish the run, otherwise a failure on the last page
            # would leave the scanner in RUNNING state forever.
            # NOTE: PDF creation / OCR of the whole run is currently disabled.
            if last_file:
                self.status = Status.DONE

    def __set_defaults(self):
        """
        Apply the default scan options to the device.
        """
        dev = self.api.get_device(self.device_id)
        try:
            opts = {opt.get_name(): opt for opt in dev.get_options()}
            for name, value in self._DEFAULT_OPTIONS:
                opts[name].set_value(value)
        finally:
            # Release the device handle even if an option could not be set.
            dev.close()

    def __read_pages(self, session):
        """
        Read all pages of a scan session and start one worker per page.
        :param session: Running Libinsane scan session
        :returns: Number of pages read
        """
        page_index = 0
        while not session.end_of_feed() and page_index < self._MAX_PAGES:
            self.logger.info("Processing page %s", page_index)
            # Do not assume that all the pages will have the same size
            scan_params = session.get_scan_parameters()
            chunks = []
            while not session.end_of_page():
                chunks.append(
                    session.read_bytes(self._READ_CHUNK_SIZE).get_data()
                )
            # The page that hits the page cap is the last one as well;
            # without this the run would never be reported as DONE.
            last_file = (
                session.end_of_feed() or page_index + 1 >= self._MAX_PAGES
            )
            worker = threading.Thread(
                target=self.__write_file,
                args=(scan_params, chunks, page_index, last_file),
            )
            worker.start()
            page_index += 1
        return page_index

    def __scan(self):
        """
        Run one complete scan (body of the background thread).

        Sets the status to ERR_COVER_OPEN or ERR_NO_PAPER when scanning is
        not possible. DONE is set by the worker of the last page.
        """
        self.logger.info("Scan requested")
        self.status = Status.RUNNING
        try:
            source = self.api.get_device(self.device_id)
            try:
                opts = {opt.get_name(): opt for opt in source.get_options()}
                if opts["cover-open"].get_value():
                    self.logger.warning("Cover open. Can't scan.")
                    self.status = Status.ERR_COVER_OPEN
                    return
                self.logger.info("Starting scan...")
                session = source.scan_start()
                try:
                    if self.__read_pages(session) == 0:
                        self.logger.warning("No paper. Nothing to scan.")
                        self.status = Status.ERR_NO_PAPER
                finally:
                    session.cancel()
            finally:
                # Also reached on the cover-open return, so the device handle
                # is never leaked.
                source.close()
        except Exception:
            self.logger.exception("Scan failed")
            # NOTE(review): no generic error member of Status is visible
            # here. Fall back to IDLE so the scanner is not stuck in RUNNING
            # and a new scan can be requested -- confirm.
            if self.status == Status.RUNNING:
                self.status = Status.IDLE
            raise

    def preload(self):
        """
        Initialise Libinsane, select the scan device and apply the defaults.
        :raises IndexError: If no local scan device was found
        """
        # NOTE(review): presumably switches off Libinsane's "safe defaults"
        # normalizer so the options set below are used -- confirm.
        os.environ["LIBINSANE_NORMALIZER_SAFE_DEFAULTS"] = "0"
        api = Libinsane.Api.new_safebet()
        self.api = api
        try:
            self.device_id = self.__get_device_id()
            self.__set_defaults()
        except Exception:
            # Do not keep a half initialised API around: the status stays
            # INITIALIZED, so the next scan() would create yet another one.
            self.api = None
            self.device_id = None
            api.cleanup()
            raise
        self.status = Status.IDLE

    def scan(self):
        """
        Start a scan in a background thread and return immediately.
        :raises RuntimeError: "already_running" if a scan is in progress
        """
        if self.status == Status.RUNNING:
            raise RuntimeError("already_running")
        if self.status == Status.INITIALIZED:
            self.preload()
        self.scanned_pages = []
        # Mark the run as started before the thread exists, so a second call
        # issued right away is rejected instead of starting a parallel scan.
        self.status = Status.RUNNING
        threading.Thread(target=self.__scan).start()

    def get_status(self) -> Status:
        """Return the current scanner status."""
        return self.status

    def get_pages(self) -> List[Page]:
        """Return the pages of the current (or most recent) scan run."""
        return self.scanned_pages

    def get_options(self):
        """
        Read all options of the scan device.
        :returns: Dict mapping option name to its current value
        """
        dev = self.api.get_device(self.device_id)
        try:
            result = {}
            for opt in dev.get_options():
                try:
                    result[opt.get_name()] = opt.get_value()
                except Exception:
                    # Best effort: options that cannot be read are left out.
                    continue
            return result
        finally:
            dev.close()

    def cleanup(self):
        """
        Release the Libinsane API.
        :raises RuntimeError: "scan_running" if a scan is in progress
        """
        if self.status == Status.RUNNING:
            raise RuntimeError("scan_running")
        if self.status != Status.INITIALIZED:
            self.api.cleanup()
            # Back to the initial state: a later scan() preloads again
            # instead of using the released API.
            self.api = None
            self.device_id = None
            self.status = Status.INITIALIZED