136 lines
4.4 KiB
Python
136 lines
4.4 KiB
Python
# coding=utf-8
|
|
from __future__ import absolute_import
|
|
import threading
|
|
import cv2
|
|
import numpy as np
|
|
import requests
|
|
from time import sleep
|
|
import octoprint.plugin
|
|
|
|
|
|
class QrcodespoolswitcherPlugin(
|
|
octoprint.plugin.StartupPlugin,
|
|
octoprint.plugin.SettingsPlugin,
|
|
octoprint.plugin.AssetPlugin,
|
|
octoprint.plugin.TemplatePlugin,
|
|
octoprint.plugin.EventHandlerPlugin,
|
|
):
|
|
|
|
def __init__(self):
|
|
self.snapshot = ""
|
|
self.toolId = 0
|
|
self.selectedSpoolId = None
|
|
self.commitCurrentSpoolValues = True
|
|
self.frequency = 5
|
|
self.watching = False
|
|
|
|
def get_settings_defaults(self):
|
|
return {
|
|
"snapshot": "http://127.0.0.1:8080/?action=snapshot",
|
|
"toolId": 0,
|
|
"commitCurrentSpoolValues": True,
|
|
"frequency": 5,
|
|
}
|
|
|
|
def get_assets(self):
|
|
return {
|
|
"js": ["js/QRCodeSpoolSwitcher.js"],
|
|
"css": ["css/QRCodeSpoolSwitcher.css"],
|
|
"less": ["less/QRCodeSpoolSwitcher.less"],
|
|
}
|
|
|
|
def get_update_information(self):
|
|
return {
|
|
"QRCodeSpoolSwitcher": {
|
|
"displayName": "Qrcodespoolswitcher Plugin",
|
|
"displayVersion": self._plugin_version,
|
|
"type": "github_release",
|
|
"user": "mwalbeck",
|
|
"repo": "OctoPrint-Qrcodespoolswitcher",
|
|
"current": self._plugin_version,
|
|
"pip": "https://git.walbeck.it/mwalbeck/OctoPrint-Qrcodespoolswitcher/archive/{target_version}.zip",
|
|
}
|
|
}
|
|
|
|
def on_after_startup(self):
|
|
self.snapshot = self._settings.get(["snapshot"])
|
|
self.toolId = self._settings.get_int(["toolId"])
|
|
self.frequency = self._settings.get_int(["frequency"])
|
|
self.commitCurrentSpoolValues = self._settings.get_boolean(
|
|
["commitCurrentSpoolValues"]
|
|
)
|
|
|
|
self._logger.info("Starting QR code watcher")
|
|
watcher = threading.Thread(target=self.watch_for_qr)
|
|
watcher.start()
|
|
|
|
def on_event(self, event, payload):
|
|
if event == "plugin_spoolmanager_spool_selected":
|
|
if payload["toolId"] == self.toolId:
|
|
if payload["databaseId"] != self.selectedSpoolId:
|
|
self.selectedSpoolId = payload["databaseId"]
|
|
|
|
def watch_for_qr(self):
|
|
self.watching = True
|
|
qrcode_detector = cv2.QRCodeDetector()
|
|
|
|
while self.watching:
|
|
sleep(self.frequency)
|
|
try:
|
|
response = requests.get(self.snapshot, timeout=2)
|
|
response.raise_for_status()
|
|
except requests.RequestException as e:
|
|
self._logger.error(f"Failed to fetch image for QR code scaning: {e}")
|
|
continue
|
|
|
|
image_array = np.asarray(bytearray(response.content), dtype=np.uint8)
|
|
image = cv2.imdecode(image_array, cv2.IMREAD_UNCHANGED)
|
|
|
|
retval, decoded_info, points, straight_qrcode = (
|
|
qrcode_detector.detectAndDecodeMulti(image)
|
|
)
|
|
|
|
if retval:
|
|
self._logger.info(decoded_info)
|
|
|
|
for string in decoded_info:
|
|
if "SpoolManager/selectSpoolByQRCode" in string:
|
|
spoolId = int(string.split("/")[-1])
|
|
|
|
self._logger.info(f"Found spool {spoolId}")
|
|
|
|
if spoolId != self.selectedSpoolId:
|
|
self.select_spool(spoolId)
|
|
|
|
break
|
|
|
|
def select_spool(self, spoolId):
|
|
self.selectedSpoolId = spoolId
|
|
self._logger.info(f"Changed active spool to {spoolId}")
|
|
|
|
# payload = {
|
|
# "databaseId": spoolId,
|
|
# "toolIndex": self.toolId,
|
|
# "commitCurrentSpoolValues": self.commitCurrentSpoolValues,
|
|
# }
|
|
|
|
# try:
|
|
# requests.put(
|
|
# "http://localhost:5000/plugin/SpoolManager/selectSpool", json=payload
|
|
# )
|
|
# except requests.RequestException as e:
|
|
# self._logger.error(f"Error occured during request to change spool: {e}")
|
|
|
|
|
|
__plugin_name__ = "QRCodeSpoolSwitcher"
|
|
__plugin_pythoncompat__ = ">=3,<4"
|
|
|
|
|
|
def __plugin_load__():
|
|
global __plugin_implementation__
|
|
__plugin_implementation__ = QrcodespoolswitcherPlugin()
|
|
|
|
global __plugin_hooks__
|
|
__plugin_hooks__ = {
|
|
"octoprint.plugin.softwareupdate.check_config": __plugin_implementation__.get_update_information
|
|
}
|