View Javadoc
1   /*
2    * Copyright (C) 2011 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
20  import com.google.common.util.concurrent.Service.State;
21  
22  import junit.framework.TestCase;
23  
24  import java.util.concurrent.CyclicBarrier;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.Future;
28  import java.util.concurrent.ScheduledExecutorService;
29  import java.util.concurrent.ScheduledFuture;
30  import java.util.concurrent.ScheduledThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  /**
37   * Unit test for {@link AbstractScheduledService}.
38   *
39   * @author Luke Sandberg
40   */
41  
42  public class AbstractScheduledServiceTest extends TestCase {
43  
44    volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
45    volatile ScheduledFuture<?> future = null;
46  
47    volatile boolean atFixedRateCalled = false;
48    volatile boolean withFixedDelayCalled = false;
49    volatile boolean scheduleCalled = false;
50  
51    final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
52      @Override
53      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
54          long delay, TimeUnit unit) {
55        return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
56      }
57    };
58  
59    public void testServiceStartStop() throws Exception {
60      NullService service = new NullService();
61      service.startAsync().awaitRunning();
62      assertFalse(future.isDone());
63      service.stopAsync().awaitTerminated();
64      assertTrue(future.isCancelled());
65    }
66  
67    private class NullService extends AbstractScheduledService {
68      @Override protected void runOneIteration() throws Exception {}
69      @Override protected Scheduler scheduler() { return configuration; }
70      @Override protected ScheduledExecutorService executor() { return executor; }
71    }
72  
73    public void testFailOnExceptionFromRun() throws Exception {
74      TestService service = new TestService();
75      service.runException = new Exception();
76      service.startAsync().awaitRunning();
77      service.runFirstBarrier.await();
78      service.runSecondBarrier.await();
79      try {
80        future.get();
81        fail();
82      } catch (ExecutionException e) {
83        // An execution exception holds a runtime exception (from throwables.propogate) that holds our
84        // original exception.
85        assertEquals(service.runException, e.getCause().getCause());
86      }
87      assertEquals(service.state(), Service.State.FAILED);
88    }
89  
90    public void testFailOnExceptionFromStartUp() {
91      TestService service = new TestService();
92      service.startUpException = new Exception();
93      try {
94        service.startAsync().awaitRunning();
95        fail();
96      } catch (IllegalStateException e) {
97        assertEquals(service.startUpException, e.getCause());
98      }
99      assertEquals(0, service.numberOfTimesRunCalled.get());
100     assertEquals(Service.State.FAILED, service.state());
101   }
102 
103   public void testFailOnExceptionFromShutDown() throws Exception {
104     TestService service = new TestService();
105     service.shutDownException = new Exception();
106     service.startAsync().awaitRunning();
107     service.runFirstBarrier.await();
108     service.stopAsync();
109     service.runSecondBarrier.await();
110     try {
111       service.awaitTerminated();
112       fail();
113     } catch (IllegalStateException e) {
114       assertEquals(service.shutDownException, e.getCause());
115     }
116     assertEquals(Service.State.FAILED, service.state());
117   }
118 
119   public void testRunOneIterationCalledMultipleTimes() throws Exception {
120     TestService service = new TestService();
121     service.startAsync().awaitRunning();
122     for (int i = 1; i < 10; i++) {
123       service.runFirstBarrier.await();
124       assertEquals(i, service.numberOfTimesRunCalled.get());
125       service.runSecondBarrier.await();
126     }
127     service.runFirstBarrier.await();
128     service.stopAsync();
129     service.runSecondBarrier.await();
130     service.stopAsync().awaitTerminated();
131   }
132 
133   public void testExecutorOnlyCalledOnce() throws Exception {
134     TestService service = new TestService();
135     service.startAsync().awaitRunning();
136     // It should be called once during startup.
137     assertEquals(1, service.numberOfTimesExecutorCalled.get());
138     for (int i = 1; i < 10; i++) {
139       service.runFirstBarrier.await();
140       assertEquals(i, service.numberOfTimesRunCalled.get());
141       service.runSecondBarrier.await();
142     }
143     service.runFirstBarrier.await();
144     service.stopAsync();
145     service.runSecondBarrier.await();
146     service.stopAsync().awaitTerminated();
147     // Only called once overall.
148     assertEquals(1, service.numberOfTimesExecutorCalled.get());
149   }
150 
151   public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
152     final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
153     AbstractScheduledService service = new AbstractScheduledService() {
154       @Override protected void runOneIteration() throws Exception {}
155 
156       @Override protected ScheduledExecutorService executor() {
157         executor.set(super.executor());
158         return executor.get();
159       }
160 
161       @Override protected Scheduler scheduler() {
162         return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
163       }
164     };
165 
166     service.startAsync();
167     assertFalse(service.executor().isShutdown());
168     service.awaitRunning();
169     service.stopAsync();
170     service.awaitTerminated();
171     assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
172   }
173 
174   public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
175     final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
176     AbstractScheduledService service = new AbstractScheduledService() {
177       @Override protected void startUp() throws Exception {
178         throw new Exception("Failed");
179       }
180 
181       @Override protected void runOneIteration() throws Exception {}
182 
183       @Override protected ScheduledExecutorService executor() {
184         executor.set(super.executor());
185         return executor.get();
186       }
187 
188       @Override protected Scheduler scheduler() {
189         return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
190       }
191     };
192 
193     try {
194       service.startAsync().awaitRunning();
195       fail("Expected service to fail during startup");
196     } catch (IllegalStateException expected) {}
197 
198     assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
199   }
200 
201   public void testSchedulerOnlyCalledOnce() throws Exception {
202     TestService service = new TestService();
203     service.startAsync().awaitRunning();
204     // It should be called once during startup.
205     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
206     for (int i = 1; i < 10; i++) {
207       service.runFirstBarrier.await();
208       assertEquals(i, service.numberOfTimesRunCalled.get());
209       service.runSecondBarrier.await();
210     }
211     service.runFirstBarrier.await();
212     service.stopAsync();
213     service.runSecondBarrier.await();
214     service.awaitTerminated();
215     // Only called once overall.
216     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
217   }
218 
219   private class TestService extends AbstractScheduledService {
220     CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
221     CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
222 
223     volatile boolean startUpCalled = false;
224     volatile boolean shutDownCalled = false;
225     AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
226     AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
227     AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
228     volatile Exception runException = null;
229     volatile Exception startUpException = null;
230     volatile Exception shutDownException = null;
231 
232     @Override
233     protected void runOneIteration() throws Exception {
234       assertTrue(startUpCalled);
235       assertFalse(shutDownCalled);
236       numberOfTimesRunCalled.incrementAndGet();
237       assertEquals(State.RUNNING, state());
238       runFirstBarrier.await();
239       runSecondBarrier.await();
240       if (runException != null) {
241         throw runException;
242       }
243     }
244 
245     @Override
246     protected void startUp() throws Exception {
247       assertFalse(startUpCalled);
248       assertFalse(shutDownCalled);
249       startUpCalled = true;
250       assertEquals(State.STARTING, state());
251       if (startUpException != null) {
252         throw startUpException;
253       }
254     }
255 
256     @Override
257     protected void shutDown() throws Exception {
258       assertTrue(startUpCalled);
259       assertFalse(shutDownCalled);
260       shutDownCalled = true;
261       if (shutDownException != null) {
262         throw shutDownException;
263       }
264     }
265 
266     @Override
267     protected ScheduledExecutorService executor() {
268       numberOfTimesExecutorCalled.incrementAndGet();
269       return executor;
270     }
271 
272     @Override
273     protected Scheduler scheduler() {
274       numberOfTimesSchedulerCalled.incrementAndGet();
275       return configuration;
276     }
277   }
278 
279   public static class SchedulerTest extends TestCase {
280     // These constants are arbitrary and just used to make sure that the correct method is called
281     // with the correct parameters.
282     private static final int initialDelay = 10;
283     private static final int delay = 20;
284     private static final TimeUnit unit = TimeUnit.MILLISECONDS;
285 
286     // Unique runnable object used for comparison.
287     final Runnable testRunnable = new Runnable() {@Override public void run() {}};
288     boolean called = false;
289 
290     private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
291         long delay, TimeUnit unit) {
292       assertFalse(called);  // only called once.
293       called = true;
294       assertEquals(SchedulerTest.initialDelay, initialDelay);
295       assertEquals(SchedulerTest.delay, delay);
296       assertEquals(SchedulerTest.unit, unit);
297       assertEquals(testRunnable, command);
298     }
299 
300     public void testFixedRateSchedule() {
301       Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
302       schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
303         @Override
304         public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
305             long period, TimeUnit unit) {
306           assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
307           return null;
308         }
309       }, testRunnable);
310       assertTrue(called);
311     }
312 
313     public void testFixedDelaySchedule() {
314       Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
315       schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
316         @Override
317         public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
318             long delay, TimeUnit unit) {
319           assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
320           return null;
321         }
322       }, testRunnable);
323       assertTrue(called);
324     }
325 
326     private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
327       public AtomicInteger scheduleCounter = new AtomicInteger(0);
328       @Override
329       protected Schedule getNextSchedule() throws Exception {
330         scheduleCounter.incrementAndGet();
331         return new Schedule(0, TimeUnit.SECONDS);
332       }
333     }
334 
335     public void testCustomSchedule_startStop() throws Exception {
336       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
337       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
338       final AtomicBoolean shouldWait = new AtomicBoolean(true);
339       Runnable task = new Runnable() {
340         @Override public void run() {
341           try {
342             if (shouldWait.get()) {
343               firstBarrier.await();
344               secondBarrier.await();
345             }
346           } catch (Exception e) {
347             throw new RuntimeException(e);
348           }
349         }
350       };
351       TestCustomScheduler scheduler = new TestCustomScheduler();
352       Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
353       firstBarrier.await();
354       assertEquals(1, scheduler.scheduleCounter.get());
355       secondBarrier.await();
356       firstBarrier.await();
357       assertEquals(2, scheduler.scheduleCounter.get());
358       shouldWait.set(false);
359       secondBarrier.await();
360       future.cancel(false);
361     }
362 
363     public void testCustomSchedulerServiceStop() throws Exception {
364       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
365       service.startAsync().awaitRunning();
366       service.firstBarrier.await();
367       assertEquals(1, service.numIterations.get());
368       service.stopAsync();
369       service.secondBarrier.await();
370       service.awaitTerminated();
371       // Sleep for a while just to ensure that our task wasn't called again.
372       Thread.sleep(unit.toMillis(3 * delay));
373       assertEquals(1, service.numIterations.get());
374     }
375 
376     public void testBig() throws Exception {
377       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
378         @Override protected Scheduler scheduler() {
379           return new AbstractScheduledService.CustomScheduler() {
380             @Override
381             protected Schedule getNextSchedule() throws Exception {
382               // Explicitly yield to increase the probability of a pathological scheduling.
383               Thread.yield();
384               return new Schedule(0, TimeUnit.SECONDS);
385             }
386           };
387         }
388       };
389       service.useBarriers = false;
390       service.startAsync().awaitRunning();
391       Thread.sleep(50);
392       service.useBarriers = true;
393       service.firstBarrier.await();
394       int numIterations = service.numIterations.get();
395       service.stopAsync();
396       service.secondBarrier.await();
397       service.awaitTerminated();
398       assertEquals(numIterations, service.numIterations.get());
399     }
400 
401     private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
402       final AtomicInteger numIterations = new AtomicInteger(0);
403       volatile boolean useBarriers = true;
404       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
405       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
406 
407       @Override protected void runOneIteration() throws Exception {
408         numIterations.incrementAndGet();
409         if (useBarriers) {
410           firstBarrier.await();
411           secondBarrier.await();
412         }
413       }
414 
415       @Override protected ScheduledExecutorService executor() {
416         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
417         return Executors.newScheduledThreadPool(10);
418       }
419 
420       @Override protected void startUp() throws Exception {}
421 
422       @Override protected void shutDown() throws Exception {}
423 
424       @Override protected Scheduler scheduler() {
425         return new CustomScheduler() {
426           @Override
427           protected Schedule getNextSchedule() throws Exception {
428             return new Schedule(delay, unit);
429           }};
430       }
431     }
432 
433     public void testCustomSchedulerFailure() throws Exception {
434       TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
435       service.startAsync().awaitRunning();
436       for (int i = 1; i < 4; i++) {
437         service.firstBarrier.await();
438         assertEquals(i, service.numIterations.get());
439         service.secondBarrier.await();
440       }
441       Thread.sleep(1000);
442       try {
443         service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
444         fail();
445       } catch (IllegalStateException e) {
446         assertEquals(State.FAILED, service.state());
447       }
448     }
449 
450     private static class TestFailingCustomScheduledService extends AbstractScheduledService {
451       final AtomicInteger numIterations = new AtomicInteger(0);
452       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
453       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
454 
455       @Override protected void runOneIteration() throws Exception {
456         numIterations.incrementAndGet();
457         firstBarrier.await();
458         secondBarrier.await();
459       }
460 
461       @Override protected ScheduledExecutorService executor() {
462         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
463         return Executors.newScheduledThreadPool(10);
464       }
465 
466       @Override protected Scheduler scheduler() {
467         return new CustomScheduler() {
468           @Override
469           protected Schedule getNextSchedule() throws Exception {
470             if (numIterations.get() > 2) {
471               throw new IllegalStateException("Failed");
472             }
473             return new Schedule(delay, unit);
474           }};
475       }
476     }
477   }
478 }