Another attempt to fix #33467. RP by default keeps compatibility, there is new constructor to enable the interrupt() behaviour. As requested by msandor, the throughput of WizardDescriptor RP is higher than one release41_fixes_base
authorjtulach@netbeans.org
Sun, 03 Jul 2005 13:38:59 +0000
changeset 418b65d466cdb6
parent 40 96fd1e07fd06
child 42 89b022b5a00e
Another attempt to fix #33467. RP by default keeps compatibility, there is new constructor to enable the interrupt() behaviour. As requested by msandor, the throughput of WizardDescriptor RP is higher than one
openide.util/apichanges.xml
openide.util/src/org/openide/util/RequestProcessor.java
openide.util/test/unit/src/org/openide/util/RequestProcessorTest.java
     1.1 --- a/openide.util/apichanges.xml	Fri Jun 24 16:35:43 2005 +0000
     1.2 +++ b/openide.util/apichanges.xml	Sun Jul 03 13:38:59 2005 +0000
     1.3 @@ -31,7 +31,11 @@
     1.4        <compatibility addition="yes" modification="yes" binary="compatible" source="compatible" semantic="compatible" deprecation="no" deletion="no"/>
     1.5        <description>
     1.6            When one calls <a href="@TOP@/org/openide/util/RequestProcessor.Task.html">RequestProcessor.Task</a>.cancel(), 
     1.7 -          the running thread gets interrupted.
     1.8 +          the running thread gets interrupted if the 
     1.9 +          <a href="@TOP@/org/openide/util/RequestProcessor.html#RequestProcessor(java.lang.String,%20int,%20boolean)">
    1.10 +          RequestProcessor(string, int, boolean) 
    1.11 +          </a> 
    1.12 +          constructor is used.
    1.13            There always was a way how to cancel not yet running Task, 
    1.14            but if the task was already running, one was out of luck. Since now
    1.15            the thread running the task is interrupted and the Runnable can check 
     2.1 --- a/openide.util/src/org/openide/util/RequestProcessor.java	Fri Jun 24 16:35:43 2005 +0000
     2.2 +++ b/openide.util/src/org/openide/util/RequestProcessor.java	Sun Jul 03 13:38:59 2005 +0000
     2.3 @@ -64,6 +64,24 @@
     2.4   * <CODE>RequestProcessor</CODE> instance with limited throughput (probably
     2.5   * set to 1), the IDE would try to run all your requests in parallel otherwise.
     2.6   *
     2.7 + * <P>
     2.8 + * Since version 6.3 there is a conditional support for interruption of long running tasks.
     2.9 + * There always was a way how to cancel not yet running task using {@link RequestProcessor.Task#cancel }
    2.10 + * but if the task was already running, one was out of luck. Since version 6.3
    2.11 + * the thread running the task is interrupted and the Runnable can check for that
    2.12 + * and terminate its execution sooner. In the runnable one shall check for 
    2.13 + * thread interruption (done from {@link RequestProcessor.Task#cancel }) and 
    2.14 + * if true, return immediatelly as in this example:
    2.15 + * <PRE>
    2.16 + * public void run () {
    2.17 + *     while (veryLongTimeLook) {
    2.18 + *       doAPieceOfIt ();
    2.19 + *
    2.20 + *       if (Thread.interrupted ()) return;
    2.21 + *     }
    2.22 + * }
    2.23 + * </PRE>
    2.24 + * 
    2.25   * @author Petr Nejedly, Jaroslav Tulach
    2.26   */
    2.27  public final class RequestProcessor {
    2.28 @@ -112,6 +130,9 @@
    2.29      /** The maximal number of processors that can perform the requests sent
    2.30       * to this RequestProcessors. If 1, all the requests are serialized. */
    2.31      private int throughput;
    2.32 +    
    2.33 +    /** support for interrupts or not? */
    2.34 +    private boolean interruptThread;
    2.35  
    2.36      /** Creates new RequestProcessor with automatically assigned unique name. */
    2.37      public RequestProcessor() {
    2.38 @@ -131,10 +152,41 @@
    2.39       * @since OpenAPI version 2.12
    2.40       */
    2.41      public RequestProcessor(String name, int throughput) {
    2.42 +        this(name, throughput, false);
    2.43 +    }
    2.44 +
    2.45 +    /** Creates a new named RequestProcessor with defined throughput which 
    2.46 +     * can support interruption of the thread the processor runs in.
    2.47 +     * There always was a way how to cancel not yet running task using {@link RequestProcessor.Task#cancel }
    2.48 +     * but if the task was already running, one was out of luck. With this
    2.49 +     * constructor one can create a {@link RequestProcessor} which threads
    2.50 +     * thread running tasks are interrupted and the Runnable can check for that
    2.51 +     * and terminate its execution sooner. In the runnable one shall check for 
    2.52 +     * thread interruption (done from {@link RequestProcessor.Task#cancel }) and 
    2.53 +     * if true, return immediatelly as in this example:
    2.54 +     * <PRE>
    2.55 +     * public void run () {
    2.56 +     *     while (veryLongTimeLook) {
    2.57 +     *       doAPieceOfIt ();
    2.58 +     *
    2.59 +     *       if (Thread.interrupted ()) return;
    2.60 +     *     }
    2.61 +     * }
    2.62 +     * </PRE>
    2.63 +     *
    2.64 +     * @param name the name to use for the request processor thread
    2.65 +     * @param throughput the maximal count of requests allowed to run in parallel
    2.66 +     * @param interruptThread true if {@link RequestProcessor.Task#cancel} shall interrupt the thread
    2.67 +     *
    2.68 +     * @since 6.3
    2.69 +     */
    2.70 +    public RequestProcessor(String name, int throughput, boolean interruptThread) {
    2.71          this.throughput = throughput;
    2.72          this.name = (name != null) ? name : ("OpenIDE-request-processor-" + (counter++));
    2.73 +        this.interruptThread = interruptThread;
    2.74      }
    2.75 -
    2.76 +    
    2.77 +    
    2.78      /** The getter for the shared instance of the <CODE>RequestProcessor</CODE>.
    2.79       *
    2.80       * @return an instance of RequestProcessor that is capable of performing
    2.81 @@ -364,21 +416,19 @@
    2.82      }
    2.83  
    2.84      Task askForWork(Processor worker, String debug) {
    2.85 -        synchronized (processorLock) {
    2.86 -            if (stopped || queue.isEmpty()) { // no more work in this burst, return him
    2.87 -                processors.remove(worker);
    2.88 -                Processor.put(worker, debug);
    2.89 -                running--;
    2.90 +        if (stopped || queue.isEmpty()) { // no more work in this burst, return him
    2.91 +            processors.remove(worker);
    2.92 +            Processor.put(worker, debug);
    2.93 +            running--;
    2.94  
    2.95 -                return null;
    2.96 -            } else { // we have some work for the worker, pass it
    2.97 +            return null;
    2.98 +        } else { // we have some work for the worker, pass it
    2.99  
   2.100 -                Item i = (Item) queue.remove(0);
   2.101 -                Task t = i.getTask();
   2.102 -                i.clear();
   2.103 +            Item i = (Item) queue.remove(0);
   2.104 +            Task t = i.getTask();
   2.105 +            i.clear(worker);
   2.106  
   2.107 -                return t;
   2.108 -            }
   2.109 +            return t;
   2.110          }
   2.111      }
   2.112  
   2.113 @@ -458,7 +508,7 @@
   2.114                  notifyRunning();
   2.115  
   2.116                  if (item != null) {
   2.117 -                    item.clear();
   2.118 +                    item.clear(null);
   2.119                  }
   2.120  
   2.121                  item = new Item(this, RequestProcessor.this);
   2.122 @@ -490,7 +540,19 @@
   2.123          */
   2.124          public boolean cancel() {
   2.125              synchronized (processorLock) {
   2.126 -                boolean success = (item == null) ? false : item.clear();
   2.127 +                boolean success;
   2.128 +
   2.129 +                if (item == null) {
   2.130 +                    success = false;
   2.131 +                } else {
   2.132 +                    Processor p = item.getProcessor();
   2.133 +                    success = item.clear(null);
   2.134 +
   2.135 +                    if (p != null) {
   2.136 +                        p.interruptTask(this, RequestProcessor.this);
   2.137 +                        item = null;
   2.138 +                    }
   2.139 +                }
   2.140  
   2.141                  if (success) {
   2.142                      notifyFinished(); // mark it as finished
   2.143 @@ -541,17 +603,17 @@
   2.144          */
   2.145          public void waitFinished() {
   2.146              if (isRequestProcessorThread()) { //System.err.println("Task.waitFinished on " + this + " from other task in RP: " + Thread.currentThread().getName());
   2.147 -
   2.148                  boolean toRun;
   2.149  
   2.150                  synchronized (processorLock) {
   2.151                      // correct line:    toRun = (item == null) ? !isFinished (): (item.clear() && !isFinished ());
   2.152                      // the same:        toRun = !isFinished () && (item == null ? true : item.clear ());
   2.153 -                    toRun = !isFinished() && ((item == null) || item.clear());
   2.154 +                    toRun = !isFinished() && ((item == null) || item.clear(null));
   2.155                  }
   2.156  
   2.157                  if (toRun) { //System.err.println("    ## running it synchronously");
   2.158 -                    run();
   2.159 +                    Processor processor = (Processor)Thread.currentThread();
   2.160 +                    processor.doEvaluate (this, processorLock, RequestProcessor.this);
   2.161                  } else { // it is already running in other thread of this RP
   2.162  
   2.163                      if (lastThread != Thread.currentThread()) {
   2.164 @@ -586,7 +648,7 @@
   2.165                  boolean toRun;
   2.166  
   2.167                  synchronized (processorLock) {
   2.168 -                    toRun = !isFinished() && ((item == null) || item.clear());
   2.169 +                    toRun = !isFinished() && ((item == null) || item.clear(null));
   2.170                  }
   2.171  
   2.172                  if (toRun) {
   2.173 @@ -614,7 +676,7 @@
   2.174      /* One item representing the task pending in the pending queue */
   2.175      private static class Item extends Exception {
   2.176          private final RequestProcessor owner;
   2.177 -        private Task action;
   2.178 +        private Object action;
   2.179          private boolean enqueued;
   2.180  
   2.181          Item(Task task, RequestProcessor rp) {
   2.182 @@ -624,22 +686,30 @@
   2.183          }
   2.184  
   2.185          Task getTask() {
   2.186 -            return action;
   2.187 +            Object a = action;
   2.188 +
   2.189 +            return (a instanceof Task) ? (Task) a : null;
   2.190          }
   2.191  
   2.192          /** Annulate this request iff still possible.
   2.193           * @returns true if it was possible to skip this item, false
   2.194           * if the item was/is already processed */
   2.195 -        boolean clear() {
   2.196 +        boolean clear(Processor processor) {
   2.197              synchronized (owner.processorLock) {
   2.198 -                action = null;
   2.199 +                action = processor;
   2.200  
   2.201                  return enqueued ? owner.queue.remove(this) : true;
   2.202              }
   2.203          }
   2.204  
   2.205 +        Processor getProcessor() {
   2.206 +            Object a = action;
   2.207 +
   2.208 +            return (a instanceof Processor) ? (Processor) a : null;
   2.209 +        }
   2.210 +
   2.211          int getPriority() {
   2.212 -            return action.getPriority();
   2.213 +            return getTask().getPriority();
   2.214          }
   2.215  
   2.216          public Throwable fillInStackTrace() {
   2.217 @@ -669,6 +739,9 @@
   2.218  
   2.219          //private Item task;
   2.220          private RequestProcessor source;
   2.221 +
   2.222 +        /** task we are working on */
   2.223 +        private RequestProcessor.Task todo;
   2.224          private boolean idle = true;
   2.225  
   2.226          /** Waiting lock */
   2.227 @@ -771,7 +844,6 @@
   2.228                      }
   2.229                  }
   2.230  
   2.231 -                Task todo;
   2.232                  String debug = null;
   2.233  
   2.234                  ErrorManager em = logger();
   2.235 @@ -782,8 +854,12 @@
   2.236                  }
   2.237  
   2.238                  // while we have something to do
   2.239 -                while ((todo = current.askForWork(this, debug)) != null) {
   2.240 -                    // if(todo != null) {
   2.241 +                for (;;) {
   2.242 +                    // need the same sync as interruptTask
   2.243 +                    synchronized (current.processorLock) {
   2.244 +                        todo = current.askForWork(this, debug);
   2.245 +                        if (todo == null) break;
   2.246 +                    }
   2.247                      setPrio(todo.getPriority());
   2.248  
   2.249                      try {
   2.250 @@ -813,10 +889,14 @@
   2.251                          doNotify(todo, t);
   2.252                      }
   2.253  
   2.254 -                    // to improve GC
   2.255 -                    todo = null;
   2.256 -
   2.257 -                    // }
   2.258 +                    // need the same sync as interruptTask
   2.259 +                    synchronized (current.processorLock) {
   2.260 +                        // to improve GC
   2.261 +                        todo = null;
   2.262 +                        // and to clear any possible interrupted state
   2.263 +                        // set by calling Task.cancel ()
   2.264 +                        Thread.interrupted();
   2.265 +                    }
   2.266                  }
   2.267  
   2.268                  if (loggable) {
   2.269 @@ -824,6 +904,41 @@
   2.270                  }
   2.271              }
   2.272          }
   2.273 +        
   2.274 +        /** Evaluates given task directly.
   2.275 +         */
   2.276 +        final void doEvaluate (Task t, Object processorLock, RequestProcessor src) {
   2.277 +            Task previous = todo;
   2.278 +            boolean interrupted = Thread.interrupted();
   2.279 +            try {
   2.280 +                todo = t;
   2.281 +                t.run ();
   2.282 +            } finally {
   2.283 +                synchronized (processorLock) {
   2.284 +                    todo = previous;
   2.285 +                    if (interrupted || todo.item == null) {
   2.286 +                        if (src.interruptThread) {
   2.287 +                            // reinterrupt the thread if it was interrupted and
   2.288 +                            // we support interrupts
   2.289 +                            Thread.currentThread().interrupt();
   2.290 +                        }
   2.291 +                    }
   2.292 +                }
   2.293 +            }
   2.294 +        }
   2.295 +
   2.296 +        /** Called under the processorLock */
   2.297 +        public void interruptTask(Task t, RequestProcessor src) {
   2.298 +            if (t != todo) {
   2.299 +                // not running this task so
   2.300 +                return;
   2.301 +            }
   2.302 +            
   2.303 +            if (src.interruptThread) {
   2.304 +                // otherwise interrupt this thread
   2.305 +                interrupt();
   2.306 +            }
   2.307 +        }
   2.308  
   2.309          /** @see "#20467" */
   2.310          private static void doNotify(RequestProcessor.Task todo, Throwable ex) {
     3.1 --- a/openide.util/test/unit/src/org/openide/util/RequestProcessorTest.java	Fri Jun 24 16:35:43 2005 +0000
     3.2 +++ b/openide.util/test/unit/src/org/openide/util/RequestProcessorTest.java	Sun Jul 03 13:38:59 2005 +0000
     3.3 @@ -690,7 +690,7 @@
     3.4      }
     3.5      
     3.6      public void testCancelInterruptsTheRunningThread () throws Exception {
     3.7 -        RequestProcessor rp = new RequestProcessor ("Cancellable");
     3.8 +        RequestProcessor rp = new RequestProcessor ("Cancellable", 1, true);
     3.9          
    3.10          class R implements Runnable {
    3.11              public boolean checkBefore;
    3.12 @@ -750,9 +750,72 @@
    3.13              assertTrue ("But interupted after", r.checkAfter);
    3.14          }
    3.15      }
    3.16 +
    3.17 +    public void testCancelDoesNotInterruptTheRunningThread () throws Exception {
    3.18 +        RequestProcessor rp = new RequestProcessor ("Not Cancellable", 1, false);
    3.19 +        
    3.20 +        class R implements Runnable {
    3.21 +            public boolean checkBefore;
    3.22 +            public boolean checkAfter;
    3.23 +            public boolean interrupted;
    3.24 +            
    3.25 +            public synchronized void run () {
    3.26 +                checkBefore = Thread.interrupted();
    3.27 +                
    3.28 +                notifyAll ();
    3.29 +                
    3.30 +                try {
    3.31 +                    wait ();
    3.32 +                    interrupted = false;
    3.33 +                } catch (InterruptedException ex) {
    3.34 +                    interrupted = true;
    3.35 +                }
    3.36 +                
    3.37 +                notifyAll ();
    3.38 +                
    3.39 +                try {
    3.40 +                    wait ();
    3.41 +                } catch (InterruptedException ex) {
    3.42 +                }
    3.43 +                
    3.44 +                checkAfter = Thread.interrupted();
    3.45 +                
    3.46 +                notifyAll ();
    3.47 +            }
    3.48 +        }
    3.49 +        
    3.50 +        R r = new R ();
    3.51 +        synchronized (r) {
    3.52 +            RequestProcessor.Task t = rp.post (r);
    3.53 +            r.wait ();
    3.54 +            assertTrue ("The task is already running", !t.cancel ());
    3.55 +            r.notifyAll ();
    3.56 +            r.wait ();
    3.57 +            r.notifyAll ();
    3.58 +            r.wait ();
    3.59 +            assertFalse ("The task has not been interrupted", r.interrupted);
    3.60 +            assertTrue ("Not before", !r.checkBefore);
    3.61 +            assertTrue ("Not after - as the notification was thru InterruptedException", !r.checkAfter);
    3.62 +        }
    3.63 +        
    3.64 +        // interrupt after the task has finished
    3.65 +        r = new R ();
    3.66 +        synchronized (r) {
    3.67 +            RequestProcessor.Task t = rp.post (r);
    3.68 +            r.wait ();
    3.69 +            r.notifyAll ();
    3.70 +            r.wait ();
    3.71 +            assertTrue ("The task is already running", !t.cancel ());
    3.72 +            r.notifyAll ();
    3.73 +            r.wait ();
    3.74 +            assertTrue ("The task has not been interrupted by exception", !r.interrupted);
    3.75 +            assertFalse ("Not interupted before", r.checkBefore);
    3.76 +            assertFalse ("Not interupted after", r.checkAfter);
    3.77 +        }
    3.78 +    }
    3.79      
    3.80      public void testInterruptedStatusIsClearedBetweenTwoTaskExecution () throws Exception {
    3.81 -        RequestProcessor rp = new RequestProcessor ("testInterruptedStatusIsClearedBetweenTwoTaskExecution");
    3.82 +        RequestProcessor rp = new RequestProcessor ("testInterruptedStatusIsClearedBetweenTwoTaskExecution", 1, true);
    3.83          
    3.84          final RequestProcessor.Task[] task = new RequestProcessor.Task[1];
    3.85          // test interrupted status is cleared after task ends
    3.86 @@ -797,7 +860,7 @@
    3.87      }
    3.88      
    3.89      public void testInterruptedStatusWorksInInversedTasks() throws Exception {
    3.90 -        RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasks");
    3.91 +        RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasks", 1, true);
    3.92          
    3.93          class Fail implements Runnable {
    3.94              public Fail (String n) {
    3.95 @@ -874,7 +937,7 @@
    3.96      }
    3.97  
    3.98      public void testInterruptedStatusWorksInInversedTasksWhenInterruptedSoon() throws Exception {
    3.99 -        RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasksWhenInterruptedSoon");
   3.100 +        RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasksWhenInterruptedSoon", 1, true);
   3.101          
   3.102          class Fail implements Runnable {
   3.103              public RequestProcessor.Task wait;