Another attempt to fix #33467. RP by default keeps compatibility, there is new constructor to enable the interrupt() behaviour. As requested by msandor, the throughput of WizardDescriptor RP is higher than one
1.1 --- a/openide.util/apichanges.xml Fri Jun 24 16:35:43 2005 +0000
1.2 +++ b/openide.util/apichanges.xml Sun Jul 03 13:38:59 2005 +0000
1.3 @@ -31,7 +31,11 @@
1.4 <compatibility addition="yes" modification="yes" binary="compatible" source="compatible" semantic="compatible" deprecation="no" deletion="no"/>
1.5 <description>
1.6 When one calls <a href="@TOP@/org/openide/util/RequestProcessor.Task.html">RequestProcessor.Task</a>.cancel(),
1.7 - the running thread gets interrupted.
1.8 + the running thread gets interrupted if the
1.9 + <a href="@TOP@/org/openide/util/RequestProcessor.html#RequestProcessor(java.lang.String,%20int,%20boolean)">
1.10 + RequestProcessor(string, int, boolean)
1.11 + </a>
1.12 + constructor is used.
1.13 There always was a way how to cancel not yet running Task,
1.14 but if the task was already running, one was out of luck. Since now
1.15 the thread running the task is interrupted and the Runnable can check
2.1 --- a/openide.util/src/org/openide/util/RequestProcessor.java Fri Jun 24 16:35:43 2005 +0000
2.2 +++ b/openide.util/src/org/openide/util/RequestProcessor.java Sun Jul 03 13:38:59 2005 +0000
2.3 @@ -64,6 +64,24 @@
2.4 * <CODE>RequestProcessor</CODE> instance with limited throughput (probably
2.5 * set to 1), the IDE would try to run all your requests in parallel otherwise.
2.6 *
2.7 + * <P>
2.8 + * Since version 6.3 there is a conditional support for interruption of long running tasks.
2.9 + * There always was a way how to cancel not yet running task using {@link RequestProcessor.Task#cancel }
2.10 + * but if the task was already running, one was out of luck. Since version 6.3
2.11 + * the thread running the task is interrupted and the Runnable can check for that
2.12 + * and terminate its execution sooner. In the runnable one shall check for
2.13 + * thread interruption (done from {@link RequestProcessor.Task#cancel }) and
2.14 + * if true, return immediatelly as in this example:
2.15 + * <PRE>
2.16 + * public void run () {
2.17 + * while (veryLongTimeLook) {
2.18 + * doAPieceOfIt ();
2.19 + *
2.20 + * if (Thread.interrupted ()) return;
2.21 + * }
2.22 + * }
2.23 + * </PRE>
2.24 + *
2.25 * @author Petr Nejedly, Jaroslav Tulach
2.26 */
2.27 public final class RequestProcessor {
2.28 @@ -112,6 +130,9 @@
2.29 /** The maximal number of processors that can perform the requests sent
2.30 * to this RequestProcessors. If 1, all the requests are serialized. */
2.31 private int throughput;
2.32 +
2.33 + /** support for interrupts or not? */
2.34 + private boolean interruptThread;
2.35
2.36 /** Creates new RequestProcessor with automatically assigned unique name. */
2.37 public RequestProcessor() {
2.38 @@ -131,10 +152,41 @@
2.39 * @since OpenAPI version 2.12
2.40 */
2.41 public RequestProcessor(String name, int throughput) {
2.42 + this(name, throughput, false);
2.43 + }
2.44 +
2.45 + /** Creates a new named RequestProcessor with defined throughput which
2.46 + * can support interruption of the thread the processor runs in.
2.47 + * There always was a way how to cancel not yet running task using {@link RequestProcessor.Task#cancel }
2.48 + * but if the task was already running, one was out of luck. With this
2.49 + * constructor one can create a {@link RequestProcessor} which threads
2.50 + * thread running tasks are interrupted and the Runnable can check for that
2.51 + * and terminate its execution sooner. In the runnable one shall check for
2.52 + * thread interruption (done from {@link RequestProcessor.Task#cancel }) and
2.53 + * if true, return immediatelly as in this example:
2.54 + * <PRE>
2.55 + * public void run () {
2.56 + * while (veryLongTimeLook) {
2.57 + * doAPieceOfIt ();
2.58 + *
2.59 + * if (Thread.interrupted ()) return;
2.60 + * }
2.61 + * }
2.62 + * </PRE>
2.63 + *
2.64 + * @param name the name to use for the request processor thread
2.65 + * @param throughput the maximal count of requests allowed to run in parallel
2.66 + * @param interruptThread true if {@link RequestProcessor.Task#cancel} shall interrupt the thread
2.67 + *
2.68 + * @since 6.3
2.69 + */
2.70 + public RequestProcessor(String name, int throughput, boolean interruptThread) {
2.71 this.throughput = throughput;
2.72 this.name = (name != null) ? name : ("OpenIDE-request-processor-" + (counter++));
2.73 + this.interruptThread = interruptThread;
2.74 }
2.75 -
2.76 +
2.77 +
2.78 /** The getter for the shared instance of the <CODE>RequestProcessor</CODE>.
2.79 *
2.80 * @return an instance of RequestProcessor that is capable of performing
2.81 @@ -364,21 +416,19 @@
2.82 }
2.83
2.84 Task askForWork(Processor worker, String debug) {
2.85 - synchronized (processorLock) {
2.86 - if (stopped || queue.isEmpty()) { // no more work in this burst, return him
2.87 - processors.remove(worker);
2.88 - Processor.put(worker, debug);
2.89 - running--;
2.90 + if (stopped || queue.isEmpty()) { // no more work in this burst, return him
2.91 + processors.remove(worker);
2.92 + Processor.put(worker, debug);
2.93 + running--;
2.94
2.95 - return null;
2.96 - } else { // we have some work for the worker, pass it
2.97 + return null;
2.98 + } else { // we have some work for the worker, pass it
2.99
2.100 - Item i = (Item) queue.remove(0);
2.101 - Task t = i.getTask();
2.102 - i.clear();
2.103 + Item i = (Item) queue.remove(0);
2.104 + Task t = i.getTask();
2.105 + i.clear(worker);
2.106
2.107 - return t;
2.108 - }
2.109 + return t;
2.110 }
2.111 }
2.112
2.113 @@ -458,7 +508,7 @@
2.114 notifyRunning();
2.115
2.116 if (item != null) {
2.117 - item.clear();
2.118 + item.clear(null);
2.119 }
2.120
2.121 item = new Item(this, RequestProcessor.this);
2.122 @@ -490,7 +540,19 @@
2.123 */
2.124 public boolean cancel() {
2.125 synchronized (processorLock) {
2.126 - boolean success = (item == null) ? false : item.clear();
2.127 + boolean success;
2.128 +
2.129 + if (item == null) {
2.130 + success = false;
2.131 + } else {
2.132 + Processor p = item.getProcessor();
2.133 + success = item.clear(null);
2.134 +
2.135 + if (p != null) {
2.136 + p.interruptTask(this, RequestProcessor.this);
2.137 + item = null;
2.138 + }
2.139 + }
2.140
2.141 if (success) {
2.142 notifyFinished(); // mark it as finished
2.143 @@ -541,17 +603,17 @@
2.144 */
2.145 public void waitFinished() {
2.146 if (isRequestProcessorThread()) { //System.err.println("Task.waitFinished on " + this + " from other task in RP: " + Thread.currentThread().getName());
2.147 -
2.148 boolean toRun;
2.149
2.150 synchronized (processorLock) {
2.151 // correct line: toRun = (item == null) ? !isFinished (): (item.clear() && !isFinished ());
2.152 // the same: toRun = !isFinished () && (item == null ? true : item.clear ());
2.153 - toRun = !isFinished() && ((item == null) || item.clear());
2.154 + toRun = !isFinished() && ((item == null) || item.clear(null));
2.155 }
2.156
2.157 if (toRun) { //System.err.println(" ## running it synchronously");
2.158 - run();
2.159 + Processor processor = (Processor)Thread.currentThread();
2.160 + processor.doEvaluate (this, processorLock, RequestProcessor.this);
2.161 } else { // it is already running in other thread of this RP
2.162
2.163 if (lastThread != Thread.currentThread()) {
2.164 @@ -586,7 +648,7 @@
2.165 boolean toRun;
2.166
2.167 synchronized (processorLock) {
2.168 - toRun = !isFinished() && ((item == null) || item.clear());
2.169 + toRun = !isFinished() && ((item == null) || item.clear(null));
2.170 }
2.171
2.172 if (toRun) {
2.173 @@ -614,7 +676,7 @@
2.174 /* One item representing the task pending in the pending queue */
2.175 private static class Item extends Exception {
2.176 private final RequestProcessor owner;
2.177 - private Task action;
2.178 + private Object action;
2.179 private boolean enqueued;
2.180
2.181 Item(Task task, RequestProcessor rp) {
2.182 @@ -624,22 +686,30 @@
2.183 }
2.184
2.185 Task getTask() {
2.186 - return action;
2.187 + Object a = action;
2.188 +
2.189 + return (a instanceof Task) ? (Task) a : null;
2.190 }
2.191
2.192 /** Annulate this request iff still possible.
2.193 * @returns true if it was possible to skip this item, false
2.194 * if the item was/is already processed */
2.195 - boolean clear() {
2.196 + boolean clear(Processor processor) {
2.197 synchronized (owner.processorLock) {
2.198 - action = null;
2.199 + action = processor;
2.200
2.201 return enqueued ? owner.queue.remove(this) : true;
2.202 }
2.203 }
2.204
2.205 + Processor getProcessor() {
2.206 + Object a = action;
2.207 +
2.208 + return (a instanceof Processor) ? (Processor) a : null;
2.209 + }
2.210 +
2.211 int getPriority() {
2.212 - return action.getPriority();
2.213 + return getTask().getPriority();
2.214 }
2.215
2.216 public Throwable fillInStackTrace() {
2.217 @@ -669,6 +739,9 @@
2.218
2.219 //private Item task;
2.220 private RequestProcessor source;
2.221 +
2.222 + /** task we are working on */
2.223 + private RequestProcessor.Task todo;
2.224 private boolean idle = true;
2.225
2.226 /** Waiting lock */
2.227 @@ -771,7 +844,6 @@
2.228 }
2.229 }
2.230
2.231 - Task todo;
2.232 String debug = null;
2.233
2.234 ErrorManager em = logger();
2.235 @@ -782,8 +854,12 @@
2.236 }
2.237
2.238 // while we have something to do
2.239 - while ((todo = current.askForWork(this, debug)) != null) {
2.240 - // if(todo != null) {
2.241 + for (;;) {
2.242 + // need the same sync as interruptTask
2.243 + synchronized (current.processorLock) {
2.244 + todo = current.askForWork(this, debug);
2.245 + if (todo == null) break;
2.246 + }
2.247 setPrio(todo.getPriority());
2.248
2.249 try {
2.250 @@ -813,10 +889,14 @@
2.251 doNotify(todo, t);
2.252 }
2.253
2.254 - // to improve GC
2.255 - todo = null;
2.256 -
2.257 - // }
2.258 + // need the same sync as interruptTask
2.259 + synchronized (current.processorLock) {
2.260 + // to improve GC
2.261 + todo = null;
2.262 + // and to clear any possible interrupted state
2.263 + // set by calling Task.cancel ()
2.264 + Thread.interrupted();
2.265 + }
2.266 }
2.267
2.268 if (loggable) {
2.269 @@ -824,6 +904,41 @@
2.270 }
2.271 }
2.272 }
2.273 +
2.274 + /** Evaluates given task directly.
2.275 + */
2.276 + final void doEvaluate (Task t, Object processorLock, RequestProcessor src) {
2.277 + Task previous = todo;
2.278 + boolean interrupted = Thread.interrupted();
2.279 + try {
2.280 + todo = t;
2.281 + t.run ();
2.282 + } finally {
2.283 + synchronized (processorLock) {
2.284 + todo = previous;
2.285 + if (interrupted || todo.item == null) {
2.286 + if (src.interruptThread) {
2.287 + // reinterrupt the thread if it was interrupted and
2.288 + // we support interrupts
2.289 + Thread.currentThread().interrupt();
2.290 + }
2.291 + }
2.292 + }
2.293 + }
2.294 + }
2.295 +
2.296 + /** Called under the processorLock */
2.297 + public void interruptTask(Task t, RequestProcessor src) {
2.298 + if (t != todo) {
2.299 + // not running this task so
2.300 + return;
2.301 + }
2.302 +
2.303 + if (src.interruptThread) {
2.304 + // otherwise interrupt this thread
2.305 + interrupt();
2.306 + }
2.307 + }
2.308
2.309 /** @see "#20467" */
2.310 private static void doNotify(RequestProcessor.Task todo, Throwable ex) {
3.1 --- a/openide.util/test/unit/src/org/openide/util/RequestProcessorTest.java Fri Jun 24 16:35:43 2005 +0000
3.2 +++ b/openide.util/test/unit/src/org/openide/util/RequestProcessorTest.java Sun Jul 03 13:38:59 2005 +0000
3.3 @@ -690,7 +690,7 @@
3.4 }
3.5
3.6 public void testCancelInterruptsTheRunningThread () throws Exception {
3.7 - RequestProcessor rp = new RequestProcessor ("Cancellable");
3.8 + RequestProcessor rp = new RequestProcessor ("Cancellable", 1, true);
3.9
3.10 class R implements Runnable {
3.11 public boolean checkBefore;
3.12 @@ -750,9 +750,72 @@
3.13 assertTrue ("But interupted after", r.checkAfter);
3.14 }
3.15 }
3.16 +
3.17 + public void testCancelDoesNotInterruptTheRunningThread () throws Exception {
3.18 + RequestProcessor rp = new RequestProcessor ("Not Cancellable", 1, false);
3.19 +
3.20 + class R implements Runnable {
3.21 + public boolean checkBefore;
3.22 + public boolean checkAfter;
3.23 + public boolean interrupted;
3.24 +
3.25 + public synchronized void run () {
3.26 + checkBefore = Thread.interrupted();
3.27 +
3.28 + notifyAll ();
3.29 +
3.30 + try {
3.31 + wait ();
3.32 + interrupted = false;
3.33 + } catch (InterruptedException ex) {
3.34 + interrupted = true;
3.35 + }
3.36 +
3.37 + notifyAll ();
3.38 +
3.39 + try {
3.40 + wait ();
3.41 + } catch (InterruptedException ex) {
3.42 + }
3.43 +
3.44 + checkAfter = Thread.interrupted();
3.45 +
3.46 + notifyAll ();
3.47 + }
3.48 + }
3.49 +
3.50 + R r = new R ();
3.51 + synchronized (r) {
3.52 + RequestProcessor.Task t = rp.post (r);
3.53 + r.wait ();
3.54 + assertTrue ("The task is already running", !t.cancel ());
3.55 + r.notifyAll ();
3.56 + r.wait ();
3.57 + r.notifyAll ();
3.58 + r.wait ();
3.59 + assertFalse ("The task has not been interrupted", r.interrupted);
3.60 + assertTrue ("Not before", !r.checkBefore);
3.61 + assertTrue ("Not after - as the notification was thru InterruptedException", !r.checkAfter);
3.62 + }
3.63 +
3.64 + // interrupt after the task has finished
3.65 + r = new R ();
3.66 + synchronized (r) {
3.67 + RequestProcessor.Task t = rp.post (r);
3.68 + r.wait ();
3.69 + r.notifyAll ();
3.70 + r.wait ();
3.71 + assertTrue ("The task is already running", !t.cancel ());
3.72 + r.notifyAll ();
3.73 + r.wait ();
3.74 + assertTrue ("The task has not been interrupted by exception", !r.interrupted);
3.75 + assertFalse ("Not interupted before", r.checkBefore);
3.76 + assertFalse ("Not interupted after", r.checkAfter);
3.77 + }
3.78 + }
3.79
3.80 public void testInterruptedStatusIsClearedBetweenTwoTaskExecution () throws Exception {
3.81 - RequestProcessor rp = new RequestProcessor ("testInterruptedStatusIsClearedBetweenTwoTaskExecution");
3.82 + RequestProcessor rp = new RequestProcessor ("testInterruptedStatusIsClearedBetweenTwoTaskExecution", 1, true);
3.83
3.84 final RequestProcessor.Task[] task = new RequestProcessor.Task[1];
3.85 // test interrupted status is cleared after task ends
3.86 @@ -797,7 +860,7 @@
3.87 }
3.88
3.89 public void testInterruptedStatusWorksInInversedTasks() throws Exception {
3.90 - RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasks");
3.91 + RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasks", 1, true);
3.92
3.93 class Fail implements Runnable {
3.94 public Fail (String n) {
3.95 @@ -874,7 +937,7 @@
3.96 }
3.97
3.98 public void testInterruptedStatusWorksInInversedTasksWhenInterruptedSoon() throws Exception {
3.99 - RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasksWhenInterruptedSoon");
3.100 + RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasksWhenInterruptedSoon", 1, true);
3.101
3.102 class Fail implements Runnable {
3.103 public RequestProcessor.Task wait;