# Source code for polo.threads.thread

import os
import time

from PyQt5 import QtWidgets
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtGui import QBrush, QColor, QIcon, QPixmap
from PyQt5.QtWidgets import *

from polo import BLANK_IMAGE, make_default_logger
from polo.utils.math_utils import best_aspect_ratio, get_cell_image_dims

logger = make_default_logger(__name__)


class thread(QThread):
    '''Very basic wrapper class around :class:`QThread` class. Should be
    inherited by a more specific class and then the `run` method can be
    overwritten to provide functionality. Whatever code is in the
    :meth:`~polo.threads.thread.thread.run` method will be executed when
    :meth:`~polo.threads.thread.thread.start` is called. The
    :meth:`~polo.threads.thread.thread.run` method should not be called
    explicitly.

    :param parent: parent widget, defaults to None
    :type parent: QWidget, optional
    '''

    def __init__(self, parent=None):
        super(thread, self).__init__(parent=parent)
        # Flipped to True by __del__ just before it waits on the thread.
        # Nothing in this module reads the flag.
        self.exiting = False

    def __del__(self):
        '''Mark the thread as exiting, then block until it has finished.'''
        # The attribute was previously misspelled `existing`, so the
        # `exiting` flag (the name QuickThread also uses) was never set here.
        self.exiting = True
        self.wait()

    def run(self):
        '''Placeholder body meant to be overridden by subclasses.

        :return: The :class:`NotImplementedError` class itself (it is
                 returned, not raised).
        '''
        # NOTE(review): returning the exception class instead of raising it
        # looks unintentional, but raising here would surface as an unhandled
        # exception on the worker thread, so the behavior is left as-is.
        return NotImplementedError
class QuickThread(thread):
    '''QuickThreads are very similar to thread objects except instead of
    you writing code that would be executed by the `run` method directly,
    the function that the `QuickThread` will execute is passed as an
    argument to the `__init__`. Any arguments that the passed function
    requires are passed as key word arguments. Once the thread finishes,
    the value returned by the passed function is stored in the
    `QuickThread`'s :attr:`polo.threads.thread.QuickThread.result`
    attribute. If the function raises, the caught exception is stored in
    that attribute instead.

    .. highlight:: python
    .. code-block:: python

        my_func = lambda x, y: x + y
        x, y = 40, 60
        my_thread = QuickThread(job_func=my_func, x=x, y=y)
        # set up the thread with my_func and the args we want to pass
        my_thread.start()
        # my_thread.result will = 100 (x + y)

    :param job_func: Function to execute on the thread
    :type job_func: func
    :param parent: parent widget, defaults to None
    :type parent: QWidget, optional
    :param kwargs: Keyword arguments forwarded to `job_func` when the
                   thread runs
    '''

    def __init__(self, job_func, parent=None, **kwargs):
        super(QuickThread, self).__init__(parent=parent)
        self.job_func = job_func
        # Copy so later changes to the caller's mapping cannot alter the call.
        self.func_args = dict(kwargs)
        # Holds job_func's return value, or the exception it raised.
        self.result = None
        # Flipped to True by __del__ just before it waits on the thread.
        self.exiting = False
        logger.debug('Created {} job: {}'.format(self, self.job_func))

    def __del__(self):
        '''Mark the thread as exiting, then block until it has finished.'''
        self.exiting = True
        self.wait()

    def run(self):
        '''Call `job_func` with the stored keyword arguments.

        The return value is stored in `self.result`. Any :class:`Exception`
        raised by the job is logged and stored in `self.result` rather than
        propagated.
        '''
        try:
            self.result = self.job_func(**self.func_args)
            logger.debug('{} completed job'.format(self))
        except Exception as e:
            # Previously assigned to a throwaway local named `result`, which
            # left self.result as None and hid the failure from the caller.
            self.result = e
            logger.error('Caught {} calling {} on {}'.format(
                e, self.job_func, self
            ))
class ClassificationThread(thread):
    '''Thread that is specifically for classifying images using the MARCO
    model. This is a very CPU intensive process so it cannot be run on the
    GUI thread.

    :param run_object: Run who's images are to be classified
    :type run_object: Run or HWIRun
    :param parent: parent widget, defaults to None
    :type parent: QWidget, optional
    '''
    # Emitted with the 1-based position of the image just processed, and
    # with 0 if classification fails.
    change_value = pyqtSignal(int)
    # Emitted with (seconds spent on the current image, images remaining).
    estimated_time = pyqtSignal(float, int)

    def __init__(self, run_object, parent=None):
        super(ClassificationThread, self).__init__(parent)
        self.classification_run = run_object
        # Set to the caught exception if `run` fails; None otherwise.
        self.exceptions = None
        logger.debug('Created classification thread {}'.format(self))

    def run(self):
        '''Method that actually does the classification work.

        Iterates over the run's `images`, calling `classify_image` on every
        image that is truthy and not a placeholder, and emits
        :const:`change_value` with the 1-based position of the image.

        On every fifth iteration (indices 0, 5, 10, ...) the
        :const:`estimated_time` signal is emitted with two values: the time
        in seconds spent on the current image and the number of images that
        remain to be processed.

        When the loop completes, the run's `has_been_machine_classified`
        attribute is set to True and the total duration is logged. If
        anything raises, :const:`change_value` is emitted with 0, the error
        is logged and the exception is stored in `self.exceptions`.
        '''
        try:
            images = self.classification_run.images
            start_time = time.time()
            for i, image in enumerate(images):
                image_start = time.time()
                if image and not image.is_placeholder:
                    image.classify_image()
                # NOTE(review): progress is emitted for every slot, skipped
                # placeholders included; confirm against the original layout.
                self.change_value.emit(i + 1)
                if i % 5 == 0:
                    self.estimated_time.emit(
                        time.time() - image_start, len(images) - (i + 1))
            end_time = time.time()
            self.classification_run.has_been_machine_classified = True
            # The closing parenthesis of round() used to sit before the `2`,
            # so minutes were rounded to an integer and the 2 was ignored.
            logger.debug(
                'Classified {} images in {} minutes'.format(
                    len(images),
                    round((end_time - start_time) / 60, 2))
            )
        except Exception as e:
            self.change_value.emit(0)  # reset the progress bar
            logger.error('Caught {} at {}'.format(e, self.run))
            self.exceptions = e
class FTPDownloadThread(thread):
    '''Worker thread that pulls a list of files from a remote FTP server
    into a local directory.

    :param ftp_connection: FTP connection object to download files from
    :type ftp_connection: :class:`FTP`
    :param file_paths: List absolute filepaths on the FTP server to download
    :type file_paths: list
    :param save_dir_path: Path on the local machine to store all
                          downloaded files in
    :type save_dir_path: str or Path
    :param parent: parent widget, defaults to None
    :type parent: QWidget, optional
    '''
    # Emitted with the 0-based index of the file that was just retrieved.
    file_downloaded = pyqtSignal(int)
    # Emitted with the local path the file was written to.
    download_path = pyqtSignal(str)

    def __init__(self, ftp_connection, file_paths, save_dir_path,
                 parent=None):
        super(FTPDownloadThread, self).__init__(parent)
        self.ftp = ftp_connection
        self.file_paths = file_paths
        self.save_dir_path = save_dir_path
        # Set to the caught exception if `run` fails; None otherwise.
        self.exceptions = None
        logger.debug('Created {}'.format(self))

    def run(self):
        '''Retrieve each remote file in turn with a ``RETR`` command.

        Every file is written to `save_dir_path` under its remote basename.
        After each transfer :const:`file_downloaded` and
        :const:`download_path` are emitted. When the connection object is
        falsy a warning is logged for that file instead. Any exception
        stops the loop, is logged and is stored in `self.exceptions`.
        '''
        try:
            for index, remote_path in enumerate(self.file_paths):
                if not self.ftp:
                    logger.warning('Attempted FTP download with object type {}'.format(
                        type(self.ftp)
                    ))
                    continue

                file_name = os.path.basename(str(remote_path))
                target_path = os.path.join(str(self.save_dir_path), file_name)
                with open(target_path, 'wb') as target_file:
                    command = 'RETR {}'.format(remote_path)
                    reply = self.ftp.retrbinary(command, target_file.write)
                    logger.debug('{} returned exit status of {}'.format(
                        command, reply))
                    self.file_downloaded.emit(index)
                    self.download_path.emit(target_path)
        except Exception as e:
            logger.error('Caught {} calling {}'.format(e, self.run))
            self.exceptions = e