Class ConcurrentQ<I,​R>

  • Type Parameters:
    I - The type of the items to be processed.
    R - The type of objects resulting from processing an item; if you don't care about the return value, then make this value whatever you want, like Object or the same value as ConcurrentQ and return null from QCallback.process(Object, TaskMonitor).

    public class ConcurrentQ<I,​R>
    extends java.lang.Object
    A queue for easily scheduling tasks to be run in parallel (or sequentially) via a thread pool. This class provides a clean separation of items that need to be processed from the algorithm that does the processing, making it easy to parallelize the processing of multiple items. Further, you can control the maximum number of items that can be processed concurrently. This is useful to throttle operations that may starve the other threads in the system. You may also control how many items get placed into the queue at one time, blocking if some threshold is exceeded.

    Examples:


    Put and Forget:

     QCallback callback = new AbstractQCallback() {
         public RESULT process(ITEM item, TaskMonitor monitor) {
             // do work here...
         }
     };
     
     ConcurrentQBuilder builder = new ConcurrentQBuilder();
     builder.setThreadPoolName("Thread Pool Name");
     concurrentQ = builder.getQueue(callback);
     ...
     ...
     concurrentQ.add(item); // where item is one of the instances of ITEM
     
     

    Put Items and Handle Results in Any Order as They Available:

     QCallback callback = new AbstractQCallback() {
         public RESULT process(ITEM item, TaskMonitor monitor) {
             // do work here...
         }
     };
     
     QItemListener itemListener = new QItemListener() {
         public void itemProcessed(QResult result) {
             RESULT result = result.getResult();
                 // work on my result...
             }
     };
     
     ConcurrentQBuilder builder = new ConcurrentQBuilder();
     builder.setThreadPoolName("Thread Pool Name");
     builder.setListener(itemListener);
     concurrentQ = builder.build(callback);
     ...
     ...
     concurrentQ.add(item); // where item is one of the instances of ITEM
     concurrentQ.add(item);
     concurrentQ.add(item);
     
     

    Put Items and Handle Results When All Items Have Been Processed:

     QCallback callback = new AbstractQCallback() {
         public RESULT process(ITEM item, TaskMonitor monitor) {
             // do work here...
         }
     };
    
     ConcurrentQBuilder builder = new ConcurrentQBuilder();
     builder.setThreadPoolName("Thread Pool Name");
     builder.setCollectResults(true);
     concurrentQ = builder.getQueue(callback);
     ...
     ...
     concurrentQ.add(item); // where item is one of the instances of ITEM
     concurrentQ.add(item);
     concurrentQ.add(item);
     ...
     List<QResult<I, R>> results = concurrentQ.waitForResults();
     // process the results...
     
     

    Put Items, Blocking While Full, and Handle Results in Any Order as They Available:

     QCallback callback = new AbstractQCallback() {
         public RESULT process(ITEM item, TaskMonitor monitor) {
             // do work here...
         }
     };
    
     QItemListener itemListener = new QItemListener() {
         public void itemProcessed(QResult result) {
             RESULT result = result.getResult();
                 // work on my result...
             }
     };
     
     ConcurrentQBuilder builder = new ConcurrentQBuilder();
            builder.setThreadPoolName("Thread Pool Name");
     builder.setQueue(new LinkedBlockingQueue(100));
     concurrentQ = builder.getQueue(callback);
     ...
     ...
     Iterator iterator = <get an iterator for 1000s of items somewhere>
     concurrentQ.offer(iterator); // this call will block when the queue fills up (100 items or more)
     
     

    • Constructor Summary

      Constructors 
      Constructor Description
      ConcurrentQ​(QCallback<I,​R> callback, java.util.Queue<I> queue, GThreadPool threadPool, QItemListener<I,​R> listener, boolean collectResults, int maxInProgress, boolean jobsReportProgress)
      Creates a ConcurrentQ that will process at most maxInProgress items at a time, regardless of how many threads are available in the GThreadPool.
      ConcurrentQ​(java.lang.String name, QCallback<I,​R> callback)
      Creates a ConcurrentQ that will process as many items as the given threadPool can handle at one time.
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      void add​(I item)
      Adds the item to this queue for concurrent processing.
      void add​(java.util.Collection<I> items)
      Adds the list of items to this queue for concurrent processing.
      void add​(java.util.Iterator<I> iterator)
      Adds the items of the given iterator to this queue for concurrent processing.
      void addProgressListener​(QProgressListener<I> listener)
      Adds a progress listener for this queue.
      java.util.List<I> cancelAllTasks​(boolean interruptRunningTasks)
      Cancels the processing of currently scheduled items in this queue.
      void cancelScheduledJobs()  
      void dispose()
      Cancels all running tasks and disposes of the internal thread pool if it is a private pool.
      boolean isEmpty()
      Returns true if this queue has no items waiting to be processed or currently being processed.
      void offer​(java.util.Iterator<I> iterator)
      Allows clients to use a bounded queue (such as a LinkedBlockingQueue to control how many items get placed into this queue at one time.
      void removeProgressListener​(QProgressListener<I> listener)
      Removes a progress listener from this queue.
      java.util.List<I> removeUnscheduledJobs()  
      void setMonitor​(TaskMonitor monitor, boolean cancelClearsAllItems)
      Sets the monitor to use with this queue.
      QResult<I,​R> waitForNextResult()
      Wait until at least one result is available and then return the first result.
      java.util.Collection<QResult<I,​R>> waitForResults()
      Waits until all scheduled items have been completed or cancelled and returns a list of QResults if this queue has been told to collect results.
      java.util.Collection<QResult<I,​R>> waitForResults​(long timeout, java.util.concurrent.TimeUnit unit)
      Waits up to the specified time for scheduled jobs to complete.
      void waitUntilDone()
      Waits until all items have been processed OR an Exception happens during the processing of ANY item.
      boolean waitUntilDone​(long timeout, java.util.concurrent.TimeUnit unit)  
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • ConcurrentQ

        public ConcurrentQ​(java.lang.String name,
                           QCallback<I,​R> callback)
        Creates a ConcurrentQ that will process as many items as the given threadPool can handle at one time.
        Parameters:
        name - The name of the thread pool that will be created by this constructor.
        callback - the QWorker object that will be used to process items concurrently.
      • ConcurrentQ

        public ConcurrentQ​(QCallback<I,​R> callback,
                           java.util.Queue<I> queue,
                           GThreadPool threadPool,
                           QItemListener<I,​R> listener,
                           boolean collectResults,
                           int maxInProgress,
                           boolean jobsReportProgress)
        Creates a ConcurrentQ that will process at most maxInProgress items at a time, regardless of how many threads are available in the GThreadPool.
        Parameters:
        callback - the QWorker object that will be used to process items concurrently.
        queue - the internal storage queue to use in this concurrent queue.
        threadPool - the GThreadPool to used for providing the threads for concurrent processing.
        listener - An optional QItemListener that will be called back with results when the item has been processed.
        collectResults - specifies if this queue should collect the results as items are processed so they can be returned in a waitForResults() call.
        maxInProgress - specifies the maximum number of items that can be process at a time. If this is set to 0, then this queue will attempt to execute as many items at a time as there are threads in the given threadPool. Setting this parameter to 1 will have the effect of guaranteeing that all times are processed one at a time in the order they were submitted. Any other positive value will run that many items concurrently, up to the number of available threads.
        jobsReportProgress - true signals that jobs wish to report progress via their task monitor. The default is false, which triggers this queue to report an overall progress for each job that is processed. False is a good default for clients that have a finite number of jobs to be done.
    • Method Detail

      • addProgressListener

        public void addProgressListener​(QProgressListener<I> listener)
        Adds a progress listener for this queue. All the progress and messages reported by a QWorker will be routed to these listener.
        Parameters:
        listener - the listener for receiving progress and message notifications.
      • removeProgressListener

        public void removeProgressListener​(QProgressListener<I> listener)
        Removes a progress listener from this queue. All the progress and messages reported by a QWorker will be routed to this listener.
        Parameters:
        listener - the listener for receiving progress and message notifications.
      • setMonitor

        public void setMonitor​(TaskMonitor monitor,
                               boolean cancelClearsAllItems)
        Sets the monitor to use with this queue.
        Parameters:
        monitor - the monitor to attache to this queue
        cancelClearsAllItems - if true, cancelling the monitor will cancel all items currently being processed by a thread and clear the scheduled items that haven't yet run. If false, only the items currently being processed will be cancelled.
      • add

        public void add​(java.util.Collection<I> items)
        Adds the list of items to this queue for concurrent processing.
        Parameters:
        items - the items to be scheduled for concurrent processing
      • add

        public void add​(java.util.Iterator<I> iterator)
        Adds the items of the given iterator to this queue for concurrent processing.
        Parameters:
        iterator - an iterator from which the items to be scheduled for concurrent processing will be taken.
      • offer

        public void offer​(java.util.Iterator<I> iterator)
                   throws java.lang.InterruptedException
        Allows clients to use a bounded queue (such as a LinkedBlockingQueue to control how many items get placed into this queue at one time. Calling the add methods will place all items into the queue, which for a large number of items, can consume a large amount of memory. This method will block once the queue at maximum capacity, continuing to add new items as existing items on the queue are processed.

        To enable blocking on the queue when it is full, construct this ConcurrentQ with an instance of BlockingQueue.

        Parameters:
        iterator - An iterator from which items will be taken.
        Throws:
        java.lang.InterruptedException - if this queue is interrupted while waiting to add more items
      • add

        public void add​(I item)
        Adds the item to this queue for concurrent processing.
        Parameters:
        item - the item to be scheduled for concurrent processing.
      • isEmpty

        public boolean isEmpty()
        Returns true if this queue has no items waiting to be processed or currently being processed.
        Returns:
        true if this queue has no items waiting to be processed or currently being processed.
      • waitForResults

        public java.util.Collection<QResult<I,​R>> waitForResults()
                                                                throws java.lang.InterruptedException
        Waits until all scheduled items have been completed or cancelled and returns a list of QResults if this queue has been told to collect results.

        You can still call this method to wait for items to be processed, even if you did not specify to collect results. In that case, the list returned will be empty.

        Returns:
        the list of QResult objects that have all the results of the completed jobs.
        Throws:
        java.lang.InterruptedException - if this call was interrupted--Note: this interruption only happens if the calling thread cannot acquire the lock. If the thread is interrupted while waiting for results, then it will try again.
      • waitForNextResult

        public QResult<I,​R> waitForNextResult()
                                             throws java.lang.InterruptedException
        Wait until at least one result is available and then return the first result.
        Returns:
        the first available result
        Throws:
        java.lang.InterruptedException - if interrupted while waiting for a result
      • waitUntilDone

        public void waitUntilDone()
                           throws java.lang.InterruptedException,
                                  java.lang.Exception
        Waits until all items have been processed OR an Exception happens during the processing of ANY item.

        Note: If an exception does occur then the remaining items in the queue will be cleared and all current items will be cancelled.

        If you wish for processing to continue for remaining items when any item encounters an exception, then you should instead use waitForResults(). That method will return all results, both with and without exceptions, which you can then process, including checking for exceptions. Note that to use waitForResults() to examine exceptions, you must have created this queue with collectResults as true.

        Throws:
        java.lang.InterruptedException - if interrupted while waiting for a result
        java.lang.Exception - any exception encountered while processing an item (this will cancel all items in the queue).
      • waitForResults

        public java.util.Collection<QResult<I,​R>> waitForResults​(long timeout,
                                                                       java.util.concurrent.TimeUnit unit)
                                                                throws java.lang.InterruptedException
        Waits up to the specified time for scheduled jobs to complete. The results of all completed jobs will be returned if this queue has been told to collect results. At the time that this returns, there may still be work to process. The returned list will contain as much work as has been processed when the wait has finished. Repeated calls to this method will not return results from previous waits.

        You can still call this method to wait for items to be processed, even if you did not specify to collect results. In that case, the list returned will be empty.

        Returns:
        the list of QResult objects that have all the results of the completed jobs.
        Throws:
        java.lang.InterruptedException - if this call was interrupted.
      • cancelAllTasks

        public java.util.List<I> cancelAllTasks​(boolean interruptRunningTasks)
        Cancels the processing of currently scheduled items in this queue. Any items that haven't yet been scheduled on the threadPool are returned immediately from this call. Items that are currently being processed will be cancelled and those results will be available on the next waitForResults() call and also if there is a QItemListener, it will be called with the QResult. There is no guarantee that scheduled tasks will terminate any time soon. If they check the isCancelled() state of their QMonitor, it will be true. Setting the interruptRunningTasks to true, will result in a thread interrupt to any currently running task which might be useful if the task perform waiting operations like I/O.
        Parameters:
        interruptRunningTasks - if true, an attempt will be made to interrupt any currently processing thread.
        Returns:
        a list of all items that have not yet been queued to the threadPool.
      • removeUnscheduledJobs

        public java.util.List<I> removeUnscheduledJobs()
      • cancelScheduledJobs

        public void cancelScheduledJobs()
      • dispose

        public void dispose()
        Cancels all running tasks and disposes of the internal thread pool if it is a private pool.
      • waitUntilDone

        public boolean waitUntilDone​(long timeout,
                                     java.util.concurrent.TimeUnit unit)
                              throws java.lang.InterruptedException
        Throws:
        java.lang.InterruptedException