ScanOS/backend/app/scanner/scanner.py

165 lines
5.6 KiB
Python

import gi, os, threading, logging
from typing import List, Optional
from PIL import Image
from app.scanner.enums import Status, PageStatus
from app.scanner.tesseract import Tesseract
from app.scanner.processing import correct_image
#from app.backends.common import create_pdf, ocr_pdf
gi.require_version('Libinsane', '1.0')
from gi.repository import Libinsane, GObject # type: ignore
class __LibinsaneSilentLogger(GObject.GObject, Libinsane.Logger):
    """Libinsane logger implementation that drops every message."""

    def do_log(self, lvl, msg):
        """
        Ignore a log record.

        :param lvl: Log level of the record (unused)
        :param msg: Log message (unused)
        :returns: None
        """
        return None


# Register an instance of the no-op logger with Libinsane.
Libinsane.register_logger(__LibinsaneSilentLogger())
class Page:
    """Bookkeeping record for a single scanned page."""

    # Name of the stored image file; None until a file name is assigned.
    filename: Optional[str] = None
    # Size of the page data in bytes (annotation only, no class-level default).
    size_bytes: int
    # Processing state of the page (annotation only, no class-level default).
    status: PageStatus
class Scanner:
    """
    Drive a Libinsane scan device and store every scanned page as a JPEG.

    A scan runs in a background thread started by :meth:`scan`; progress is
    observed through :meth:`get_status` and :meth:`get_pages`. The Libinsane
    API handle (``self.api``) and ``self.device_id`` only exist after
    :meth:`preload` has run.
    """

    # Upper bound on the number of pages read from the feeder per scan.
    MAX_PAGES = 50
    # Number of bytes requested from the scan session per read call.
    READ_CHUNK_BYTES = 256 * 1024
    # Option values applied to the device by ``__set_defaults``, in order.
    DEFAULT_OPTIONS = (
        ("sleeptimer", 1),
        ("resolution", 200),
        ("swcrop", True),
        ("swdeskew", True),
        ("page-height", 300),
        ("mode", "Color"),
    )

    def __init__(self, storage_path, logger = logging.getLogger()):
        """
        Create a scanner that is not yet bound to a device.

        :param storage_path: Directory the page images are written to
        :param logger: Logger used for all messages of this instance
        """
        self.scanned_pages: List[Page] = []
        self.logger = logger
        self.tesseract = Tesseract(logger)
        self.storage_path = storage_path
        self.status = Status.INITIALIZED

    def __get_device_id(self):
        """
        List local scanners and get the device id of the first found device.

        :returns: Device id of the first scan device
        :raises IndexError: If the device list is empty
        """
        devs = self.api.list_devices(Libinsane.DeviceLocations.LOCAL_ONLY)
        device_id = devs[0].get_dev_id()
        self.logger.info("Using device: %s", device_id)
        return device_id

    def __raw_to_img(self, params, img_bytes):
        """
        Wrap raw 24-bit RGB scan data in a PIL image.

        :param params: Scan parameters of the page (format and width are read)
        :param img_bytes: Raw pixel data, three bytes per pixel
        :returns: RGB image built from ``img_bytes``
        :raises ValueError: If the format is not ``RAW_RGB_24``
        """
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if params.get_format() != Libinsane.ImgFormat.RAW_RGB_24:
            raise ValueError("unsupported_image_format")
        width = params.get_width()
        # The height is derived from the amount of data actually received
        # instead of being read from the scan parameters.
        height = len(img_bytes) // (3 * width)
        return Image.frombuffer(
            "RGB", (width, height), img_bytes, "raw", "RGB", 0, 1
        )

    def __register_page(self, size_bytes):
        """
        Append a new page in ``PROCESSING`` state to the page list.

        :param size_bytes: Size of the raw page data in bytes
        :returns: The newly created page entry
        """
        page = Page()
        page.status = PageStatus.PROCESSING
        page.size_bytes = size_bytes
        self.scanned_pages.append(page)
        return page

    def __write_file(self, scan_params, data, page_index, last_file, page=None):
        """
        Convert one raw page to an auto-rotated JPEG in the storage directory.

        Runs in a worker thread. Failures are logged rather than raised so a
        broken page cannot take the status handling down with it.

        :param scan_params: Scan parameters captured for this page
        :param data: Iterable of raw byte chunks making up the page
        :param page_index: Zero-based page number, used for the file name
        :param last_file: When true, the scanner status is set to ``DONE``
            once this page has been handled. ``__scan`` passes ``False`` and
            signals completion itself after joining all workers.
        :param page: Page entry registered in advance by the caller. When
            omitted, the entry is created here, in which case the order of
            the page list depends on thread scheduling.
        """
        try:
            # Pages in any other format are skipped, as before.
            if scan_params.get_format() == Libinsane.ImgFormat.RAW_RGB_24:
                raw = b"".join(data)
                if page is None:
                    page = self.__register_page(len(raw))
                img = self.tesseract.rotate_img(
                    self.__raw_to_img(scan_params, raw)
                )
                filename = f"out{page_index}.jpeg"
                img_path = os.path.join(self.storage_path, filename)
                img.save(img_path, format="jpeg", quality=95)
                # TODO: post-processing via correct_image(img_path) is
                # currently disabled.
                page.filename = filename
                page.status = PageStatus.DONE
        except Exception:
            # NOTE(review): no failure member of PageStatus is visible here,
            # so a failed page keeps its PROCESSING state.
            self.logger.exception("Failed to process page %s", page_index)
        finally:
            if last_file:
                self.status = Status.DONE

    def __set_defaults(self):
        """
        Apply ``DEFAULT_OPTIONS`` to the scan device.

        :raises KeyError: If the device does not offer one of the options
        """
        dev = self.api.get_device(self.device_id)
        try:
            opts = {opt.get_name(): opt for opt in dev.get_options()}
            for name, value in self.DEFAULT_OPTIONS:
                opts[name].set_value(value)
        finally:
            # Release the device even when an option is missing or rejected.
            dev.close()

    def __feed_pages(self, workers):
        """
        Read all pages from the feeder and start one worker thread per page.

        The device and the scan session are always released before this
        method returns or raises.

        :param workers: List that receives every started worker thread; it
            is filled incrementally so the caller can join the threads even
            if this method raises midway
        :returns: The status the scan ends with (``ERR_COVER_OPEN``,
            ``ERR_NO_PAPER`` or ``DONE``)
        """
        source = self.api.get_device(self.device_id)
        try:
            opts = {opt.get_name(): opt for opt in source.get_options()}
            # Equality (not truthiness) is kept on purpose: the exact type of
            # the option value is whatever Libinsane hands back.
            if opts["cover-open"].get_value() == True:  # noqa: E712
                self.logger.warning("Cover open. Can't scan.")
                return Status.ERR_COVER_OPEN

            self.logger.info("Starting scan...")
            session = source.scan_start()
            try:
                page_index = 0
                while (not session.end_of_feed()
                       and page_index < self.MAX_PAGES):
                    self.logger.info("Processing page %s", page_index)
                    # Do not assume that all the pages will have the same size
                    scan_params = session.get_scan_parameters()
                    chunks = []
                    while not session.end_of_page():
                        chunk = session.read_bytes(self.READ_CHUNK_BYTES)
                        chunks.append(chunk.get_data())

                    if scan_params.get_format() == Libinsane.ImgFormat.RAW_RGB_24:
                        # Register the page here, in feed order, so the page
                        # list does not depend on worker thread scheduling.
                        page = self.__register_page(
                            sum(len(chunk) for chunk in chunks)
                        )
                        worker = threading.Thread(
                            target=self.__write_file,
                            args=(scan_params, chunks, page_index, False, page),
                        )
                        worker.start()
                        workers.append(worker)
                    else:
                        self.logger.warning(
                            "Skipping page %s: unsupported image format",
                            page_index,
                        )
                    page_index += 1
            finally:
                session.cancel()
        finally:
            source.close()

        if page_index == 0:
            self.logger.warning("No paper. Nothing to scan.")
            return Status.ERR_NO_PAPER
        # TODO: PDF creation / OCR (tesseract.create_pdf, ocr_pdf) is
        # currently disabled.
        return Status.DONE

    def __scan(self):
        """
        Thread body of a scan: feed pages, wait for the workers, set status.

        The final status is written only after every page worker has
        finished. If the scan aborts with an unexpected error, the error is
        logged and the status falls back to ``IDLE`` so that a new scan can
        be started.
        """
        self.logger.info("Scan requested")
        self.status = Status.RUNNING
        workers = []
        # NOTE(review): no generic error status is visible in this file, so
        # IDLE (the state preload() leaves behind) is used as the fallback.
        outcome = Status.IDLE
        try:
            outcome = self.__feed_pages(workers)
        except Exception:
            self.logger.exception("Scan aborted by an unexpected error")
        finally:
            for worker in workers:
                worker.join()
            self.status = outcome

    def preload(self):
        """
        Initialize Libinsane, pick the first local device and apply defaults.

        Sets ``self.api`` and ``self.device_id`` and moves the status to
        ``IDLE``.
        """
        os.environ["LIBINSANE_NORMALIZER_SAFE_DEFAULTS"] = "0"
        self.api = Libinsane.Api.new_safebet()
        self.device_id = self.__get_device_id()
        self.__set_defaults()
        self.status = Status.IDLE

    def scan(self):
        """
        Start a scan in a background thread.

        Runs :meth:`preload` first when the scanner is still in its initial
        state and discards the page list of any previous scan.

        :raises RuntimeError: ``already_running`` if a scan is in progress
        """
        if self.status == Status.RUNNING:
            raise RuntimeError("already_running")
        if self.status == Status.INITIALIZED:
            self.preload()
        # Mark the scanner busy before the thread starts; this narrows the
        # window in which a second call could start a second scan.
        self.status = Status.RUNNING
        self.scanned_pages = []
        threading.Thread(target=self.__scan).start()

    def get_status(self) -> Status:
        """
        :returns: Current scanner status
        """
        return self.status

    def get_pages(self) -> List[Page]:
        """
        :returns: Page entries of the current or most recent scan
        """
        return self.scanned_pages

    def get_options(self):
        """
        Read the current value of every device option that can be read.

        Requires :meth:`preload` to have run, since it uses ``self.api``.

        :returns: Dict mapping option name to its current value
        """
        dev = self.api.get_device(self.device_id)
        try:
            result = {}
            for opt in dev.get_options():
                try:
                    result[opt.get_name()] = opt.get_value()
                except Exception:
                    # Best effort: options whose value cannot be read are
                    # left out of the result.
                    self.logger.debug(
                        "Skipping an option whose value could not be read",
                        exc_info=True,
                    )
            return result
        finally:
            dev.close()

    def cleanup(self):
        """
        Release the Libinsane API handle if one was created.

        :raises RuntimeError: ``scan_running`` if a scan is in progress
        """
        if self.status == Status.RUNNING:
            raise RuntimeError("scan_running")
        if self.status != Status.INITIALIZED:
            self.api.cleanup()