View Javadoc
1   /*
2    * Copyright (C) 2008 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import com.google.common.collect.ImmutableList;
20  import com.google.common.collect.Lists;
21  import com.google.common.collect.Queues;
22  
23  import junit.framework.TestCase;
24  
25  import java.util.List;
26  import java.util.Queue;
27  import java.util.concurrent.CyclicBarrier;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.RejectedExecutionException;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  /**
37   * Tests {@link SerializingExecutor}.
38   *
39   * @author JJ Furman
40   */
41  public class SerializingExecutorTest extends TestCase {
42    private static class FakeExecutor implements Executor {
43      Queue<Runnable> tasks = Queues.newArrayDeque();
44      @Override public void execute(Runnable command) {
45        tasks.add(command);
46      }
47  
48      boolean hasNext() {
49        return !tasks.isEmpty();
50      }
51  
52      void runNext() {
53        assertTrue("expected at least one task to run", hasNext());
54        tasks.remove().run();
55      }
56  
57    }
58    private FakeExecutor fakePool;
59    private SerializingExecutor e;
60  
61    @Override
62    public void setUp() {
63      fakePool = new FakeExecutor();
64      e = new SerializingExecutor(fakePool);
65    }
66  
67    public void testSerializingNullExecutor_fails() {
68      try {
69        new SerializingExecutor(null);
70        fail("Should have failed with NullPointerException.");
71      } catch (NullPointerException expected) {
72      }
73    }
74  
75    public void testBasics() {
76      final AtomicInteger totalCalls = new AtomicInteger();
77      Runnable intCounter = new Runnable() {
78        @Override
79        public void run() {
80          totalCalls.incrementAndGet();
81        }
82      };
83  
84      assertFalse(fakePool.hasNext());
85      e.execute(intCounter);
86      assertTrue(fakePool.hasNext());
87      e.execute(intCounter);
88      assertEquals(0, totalCalls.get());
89      fakePool.runNext(); // run just 1 sub task...
90      assertEquals(2, totalCalls.get());
91      assertFalse(fakePool.hasNext());
92  
93      // Check that execute can be safely repeated
94      e.execute(intCounter);
95      e.execute(intCounter);
96      e.execute(intCounter);
97      assertEquals(2, totalCalls.get());
98      fakePool.runNext();
99      assertEquals(5, totalCalls.get());
100     assertFalse(fakePool.hasNext());
101   }
102 
103   public void testOrdering() {
104     final List<Integer> callOrder = Lists.newArrayList();
105 
106     class FakeOp implements Runnable {
107       final int op;
108 
109       FakeOp(int op) {
110         this.op = op;
111       }
112 
113       @Override
114       public void run() {
115         callOrder.add(op);
116       }
117     }
118 
119     e.execute(new FakeOp(0));
120     e.execute(new FakeOp(1));
121     e.execute(new FakeOp(2));
122     fakePool.runNext();
123 
124     assertEquals(ImmutableList.of(0, 1, 2), callOrder);
125   }
126 
127   public void testExceptions() {
128 
129     final AtomicInteger numCalls = new AtomicInteger();
130 
131     Runnable runMe = new Runnable() {
132       @Override
133       public void run() {
134         numCalls.incrementAndGet();
135         throw new RuntimeException("FAKE EXCEPTION!");
136       }
137     };
138 
139     e.execute(runMe);
140     e.execute(runMe);
141     fakePool.runNext();
142 
143     assertEquals(2, numCalls.get());
144   }
145 
146   public void testDelegateRejection() {
147     final AtomicInteger numCalls = new AtomicInteger();
148     final AtomicBoolean reject = new AtomicBoolean(true);
149     final SerializingExecutor executor = new SerializingExecutor(
150         new Executor() {
151           @Override public void execute(Runnable r) {
152             if (reject.get()) {
153               throw new RejectedExecutionException();
154             }
155             r.run();
156           }
157         });
158     Runnable task = new Runnable() {
159       @Override
160       public void run() {
161         numCalls.incrementAndGet();
162       }
163     };
164     try {
165       executor.execute(task);
166       fail();
167     } catch (RejectedExecutionException expected) {}
168     assertEquals(0, numCalls.get());
169     reject.set(false);
170     executor.execute(task);
171     assertEquals(2, numCalls.get());
172   }
173 
174   public void testTaskThrowsError() throws Exception {
175     class MyError extends Error {}
176     final CyclicBarrier barrier = new CyclicBarrier(2);
177     // we need to make sure the error gets thrown on a different thread.
178     ExecutorService service = Executors.newSingleThreadExecutor();
179     try {
180       final SerializingExecutor executor = new SerializingExecutor(service);
181       Runnable errorTask = new Runnable() {
182         @Override
183         public void run() {
184           throw new MyError();
185         }
186       };
187       Runnable barrierTask = new Runnable() {
188         @Override
189         public void run() {
190           try {
191             barrier.await();
192           } catch (Exception e) {
193             throw new RuntimeException(e);
194           }
195         }
196       };
197       executor.execute(errorTask);
198       service.execute(barrierTask);  // submit directly to the service
199       // the barrier task runs after the error task so we know that the error has been observed by
200       // SerializingExecutor by the time the barrier is satified
201       barrier.await(10, TimeUnit.SECONDS);
202       executor.execute(barrierTask);
203       // timeout means the second task wasn't even tried
204       barrier.await(10, TimeUnit.SECONDS);
205     } finally {
206       service.shutdown();
207     }
208   }
209 }