import gi, os, threading, logging from typing import List, Optional from PIL import Image from app.scanner.enums import Status, PageStatus from app.scanner.tesseract import Tesseract from app.scanner.processing import correct_image #from app.backends.common import create_pdf, ocr_pdf gi.require_version('Libinsane', '1.0') from gi.repository import Libinsane, GObject # type: ignore class __LibinsaneSilentLogger(GObject.GObject, Libinsane.Logger): def do_log(self, lvl, msg): return Libinsane.register_logger(__LibinsaneSilentLogger()) class Page: filename: Optional[str] = None size_bytes: int status: PageStatus class Scanner: def __init__(self, storage_path, logger = logging.getLogger()): self.scanned_pages: List[Page] = [] self.logger = logger self.tesseract = Tesseract(logger) self.storage_path = storage_path self.status = Status.INITIALIZED def __get_device_id(self): """ List local scanners and get the device id of the first found device. :param self: Instance of this class :returns: Device id of the first scan device """ devs = self.api.list_devices(Libinsane.DeviceLocations.LOCAL_ONLY) self.logger.info("Using device: %s", devs[0].get_dev_id()) return devs[0].get_dev_id() def __raw_to_img(self, params, img_bytes): """ """ fmt = params.get_format() assert(fmt == Libinsane.ImgFormat.RAW_RGB_24) (w, h) = ( params.get_width(), int(len(img_bytes) / 3 / params.get_width()) ) return Image.frombuffer("RGB", (w, h), img_bytes, "raw", "RGB", 0, 1) def __write_file(self, scan_params, data, page_index, last_file): data = b"".join(data) if scan_params.get_format() == Libinsane.ImgFormat.RAW_RGB_24: filesize = len(data) page = Page() page.status = PageStatus.PROCESSING page.size_bytes = filesize self.scanned_pages.append(page) img = self.__raw_to_img(scan_params, data) filename = f"out{page_index}.jpeg" img = self.tesseract.rotate_img(img) img_path = os.path.join(self.storage_path, filename) img.save(img_path, format="jpeg", quality=95) #correct_image(img_path) page.filename = filename page.status = PageStatus.DONE self.scanned_pages[page_index] = page if last_file: #self.tesseract.create_pdf(scanner=self) #ocr_pdf() self.status = Status.DONE def __set_defaults(self): dev = self.api.get_device(self.device_id) opts = dev.get_options() opts = {opt.get_name(): opt for opt in opts} opts["sleeptimer"].set_value(1) opts["resolution"].set_value(200) opts["swcrop"].set_value(True) opts["swdeskew"].set_value(True) opts["page-height"].set_value(300) opts["mode"].set_value("Color") dev.close() def __scan(self): self.logger.info("Scan requested") self.status = Status.RUNNING source = self.api.get_device(self.device_id) opts = source.get_options() opts = {opt.get_name(): opt for opt in opts} if opts["cover-open"].get_value() == True: self.logger.warn("Cover open. Can't scan.") self.status = Status.ERR_COVER_OPEN return self.logger.info("Starting scan...") session = source.scan_start() try: page_index = 0 while not session.end_of_feed() and page_index < 50: self.logger.info("Processing page %s", page_index) # Do not assume that all the pages will have the same size scan_params = session.get_scan_parameters() img = [] while not session.end_of_page(): data = session.read_bytes(256 * 1024) data = data.get_data() img.append(data) t = threading.Thread(target=self.__write_file, args=(scan_params, img, page_index, session.end_of_feed())) t.start() page_index += 1 if page_index == 0: self.logger.warn("No paper. Nothing to scan.") self.status = Status.ERR_NO_PAPER finally: session.cancel() source.close() def preload(self): os.environ["LIBINSANE_NORMALIZER_SAFE_DEFAULTS"] = "0" self.api = Libinsane.Api.new_safebet() self.device_id = self.__get_device_id() self.__set_defaults() self.status = Status.IDLE def scan(self): if self.status == Status.RUNNING: raise RuntimeError("already_running") if self.status == Status.INITIALIZED: self.preload() self.scanned_pages: List[Page] = [] t = threading.Thread(target=self.__scan) t.start() def get_status(self) -> Status: return self.status def get_pages(self) -> List[Page]: return self.scanned_pages def get_options(self): dev = self.api.get_device(self.device_id) opts = dev.get_options() result = {} for opt in opts: try: result[opt.get_name()] = opt.get_value() except Exception: continue dev.close() return result def cleanup(self): if self.status == Status.RUNNING: raise RuntimeError("scan_running") if self.status != Status.INITIALIZED: self.api.cleanup()