# -*- coding: utf-8 -*-

"""
/***************************************************************************
 plTools
                                 A QGIS plugin
 The plugin adds a set of algorithms from plTools
 Generated by Plugin Builder: http://g-sherman.github.io/Qgis-Plugin-Builder/
                              -------------------
        begin                : 2024-01-10
        copyright            : (C) 2024 by Pawel Netzel
        email                : pawel@netzel.pl
 ***************************************************************************/

/***************************************************************************
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 ***************************************************************************/
"""

__author__ = 'Pawel Netzel'
__date__ = '2024-01-10'
__copyright__ = '(C) 2024 by Pawel Netzel'

# This will get replaced with a git SHA1 when you do a git archive

__revision__ = '$Format:%H$'

import os
import re
import subprocess
import platform
import sys
import stat
import glob

from qgis.PyQt.QtCore import QCoreApplication
from qgis.core import (Qgis,
                       QgsApplication,
                       QgsProcessing,
                       QgsProcessingAlgorithm,
                       QgsProcessingException,
                       QgsProcessingFeedback,
                       QgsSettings,
                       QgsMessageLog)
from osgeo import gdal


class plToolsBaseAlgorithm(QgsProcessingAlgorithm):
    """
    Common base for the plTools processing algorithms.

    Detects the host platform, locates the plTools installation through the
    PLTOOLSPATH environment variable and runs plTools command lines, streaming
    their console output to the processing feedback object.
    """

    # Oldest GDAL release accepted on Windows, as (major, minor, patch).
    MIN_WINDOWS_GDAL = (3, 0, 0)

    @staticmethod
    def _versionTuple(version):
        """
        Convert a dotted version string into a 3-tuple of integers.

        Only the leading numeric "major[.minor[.patch]]" part is used, so a
        suffix such as "dev" is ignored. Missing components count as 0 and a
        string without a leading number yields (0, 0, 0).
        """
        match = re.match(r'\s*(\d+)(?:\.(\d+))?(?:\.(\d+))?', str(version))
        if match is None:
            return (0, 0, 0)
        return tuple(int(part) if part else 0 for part in match.groups())

    @staticmethod
    def _applyBackspaces(line):
        """
        Resolve backspace characters in a line of console output.

        Each backspace is removed together with the character preceding it;
        a backspace at the very start of the line is simply dropped.
        """
        pos = line.find("\b")
        while pos > -1:
            if pos < 1:
                line = line[pos + 1:]
            else:
                line = line[:pos - 1] + line[pos + 1:]
            pos = line.find("\b")
        return line

    def initAlgorithm(self, config):
        """
        Check the system and locate the plTools installation.

        Records the platform flags, the GDAL version and the value of the
        PLTOOLSPATH environment variable on the instance. On Windows the
        command runner is set to 'pltoolsrun' inside PLTOOLSPATH.

        Raises QgsProcessingException when PLTOOLSPATH is not set, when the
        interpreter is not 64-bit, or when running on Windows with a GDAL
        older than 3.0.0.
        """
        # NOTE(review): 'libSubPath' is never assigned in this class, so this
        # guard is presumably controlled by subclasses - confirm.
        if not hasattr(self, 'libSubPath'):
            system = platform.system()
            self.isX64 = platform.architecture()[0] == '64bit'
            self.gdalVersion = gdal.__version__
            self.isOSX = system == 'Darwin'
            self.isLinux = system == 'Linux'
            self.isWindows = system == 'Windows'
            self.plToolsPath = os.getenv("PLTOOLSPATH")
            self.cmdRunner = ""
            if not self.plToolsPath:
                raise QgsProcessingException("The plTools package is not installed!")
            # Compare versions numerically: as plain strings '10.0.0' would
            # sort before '3.0.0'.
            gdalTooOld = (self._versionTuple(self.gdalVersion)
                          < self.MIN_WINDOWS_GDAL)
            if not self.isX64 or (self.isWindows and gdalTooOld):
                raise QgsProcessingException("This system is not supported by plTools")
            if self.isWindows:
                self.cmdRunner = os.path.join(self.plToolsPath, 'pltoolsrun')

    def displayName(self):
        """
        Returns the translated algorithm name, which should be used for any
        user-visible display of the algorithm name.
        """
        return self.tr(self.name())

    def group(self):
        """
        Returns the name of the group this algorithm belongs to. This string
        should be localised.
        """
        return self.tr(self.groupId())

    def tr(self, string):
        """Translate *string* in the 'Processing' translation context."""
        return QCoreApplication.translate('Processing', string)

    def _prepareGdalPath(self):
        """
        Make the GDAL executables reachable through PATH for child processes.

        On macOS with a gdalinfo bundled under the QGIS prefix, the bundled
        bin directory is put first in PATH and DYLD_LIBRARY_PATH is pointed
        at the bundled lib directory. Otherwise the directory stored in the
        '/GdalTools/gdalPath' setting, if any, is appended to PATH.
        """
        # PATH may be unset; treat that as empty instead of failing on None.
        envval = os.getenv('PATH') or ''

        if self.isOSX:
            prefix = QgsApplication.prefixPath()
            bundledBin = os.path.join(prefix, "bin")
            if os.path.isfile(os.path.join(bundledBin, "gdalinfo")):
                # Looks like there's a bundled gdal. Let's use it.
                # Prepend only when it is not already first, so PATH does not
                # grow on every run.
                if envval.split(os.pathsep)[0] != bundledBin:
                    os.environ['PATH'] = os.pathsep.join(
                        p for p in (bundledBin, envval) if p)
                os.environ['DYLD_LIBRARY_PATH'] = os.path.join(prefix, "lib")
                return

        # Other platforms should use default gdal finder codepath
        path = QgsSettings().value('/GdalTools/gdalPath', '') or ''
        # Skip an empty setting: an empty PATH entry means "current
        # directory" on POSIX systems.
        if path and path.lower() not in envval.lower().split(os.pathsep):
            envval = os.pathsep.join(p for p in (envval, path) if p)
            os.putenv('PATH', envval)

    def runplTools(self, commands, feedback=None):
        """
        Run a plTools command and forward its console output to *feedback*.

        :param commands: sequence with the executable and its arguments; it
            is not modified. On Windows the plTools runner is put in front.
        :param feedback: processing feedback object; a default
            QgsProcessingFeedback is created when None.
        :raises IOError: when starting or reading the process keeps failing
            with IOError after 5 retries.
        """
        if feedback is None:
            feedback = QgsProcessingFeedback()

        self._prepareGdalPath()

        # Work on a copy so the caller's list is left untouched.
        commands = list(commands)
        if self.isWindows:
            commands.insert(0, self.cmdRunner)

        fused_command = self.escapeAndJoin(commands)
        QgsMessageLog.logMessage(fused_command, 'Processing', Qgis.Info)
        feedback.pushInfo('plTools command:')
        feedback.pushCommandInfo(fused_command)
        feedback.pushInfo('plTools command output:')

        success = False
        retry_count = 0
        while not success:
            loglines = ['plTools execution console output']
            try:
                # The command is one pre-escaped string run through the shell.
                with subprocess.Popen(
                    fused_command,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stdin=subprocess.DEVNULL,
                    stderr=subprocess.STDOUT,
                    universal_newlines=True,
                ) as proc:
                    for line in proc.stdout:
                        line = self._applyBackspaces(line)
                        feedback.pushConsoleInfo(line)
                        loglines.append(line)
                success = True
                # Leaving the 'with' block waits for the process, so the exit
                # status is available here. Report it without aborting.
                if proc.returncode != 0:
                    feedback.reportError(
                        'plTools process returned error code {}'.format(
                            proc.returncode))
            except IOError as e:
                if retry_count < 5:
                    retry_count += 1
                else:
                    raise IOError(
                        str(e) + u'\nTried 5 times without success. Last iteration stopped after reading {} line(s).\nLast line(s):\n{}'.format(
                            len(loglines), u'\n'.join(loglines[-10:]))) from e

        QgsMessageLog.logMessage('\n'.join(loglines), 'Processing', Qgis.Info)

    def quoteString(self, param):
        """Return *param* wrapped in double quotes on Windows, else unchanged."""
        if self.isWindows:
            param = '"' + param + '"'
        return param

    def getEnv(self, name):
        """Return the value of environment variable *name*, or None if unset."""
        return os.getenv(name)

    def escapeAndJoin(self, strList):
        """
        Build a single command-line string from a list of arguments.

        Items are converted to str. An item that contains a space and does
        not start with '-' is wrapped in double quotes, with backslashes and
        double quotes escaped. Items are joined with single spaces and the
        result is stripped of surrounding whitespace.
        """
        parts = []
        for s in strList:
            if not isinstance(s, str):
                s = str(s)
            if s and s[0] != '-' and ' ' in s:
                s = '"' + s.replace('\\', '\\\\').replace('"', '\\"') + '"'
            parts.append(s)
        return ' '.join(parts).strip()