View Javadoc
1   /*
2    * Copyright (C) 2013 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
20  
21  import com.google.caliper.AfterExperiment;
22  import com.google.caliper.BeforeExperiment;
23  import com.google.caliper.Benchmark;
24  import com.google.caliper.Param;
25  import com.google.caliper.api.Footprint;
26  import com.google.caliper.api.VmOptions;
27  import com.google.common.base.Preconditions;
28  import com.google.common.collect.Lists;
29  
30  import java.util.Queue;
31  import java.util.concurrent.ArrayBlockingQueue;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.Executor;
34  import java.util.concurrent.ExecutorService;
35  import java.util.concurrent.ThreadPoolExecutor;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.logging.Level;
39  import java.util.logging.Logger;
40  
41  import javax.annotation.Nullable;
42  import javax.annotation.concurrent.GuardedBy;
43  
44  /**
45   * Benchmarks for {@link ExecutionList}.
46   */
47  @VmOptions({"-Xms3g", "-Xmx3g"})
48  public class ExecutionListBenchmark {
49    private static final int NUM_THREADS = 10;  // make a param?
50  
51    // simple interface to wrap our two implementations.
52    interface ExecutionListWrapper {
53      void add(Runnable runnable, Executor executor);
54      void execute();
55      /** Returns the underlying implementation, useful for the Footprint benchmark. */
56      Object getImpl();
57    }
58  
59    enum Impl {
60      NEW {
61        @Override ExecutionListWrapper newExecutionList() {
62          return new ExecutionListWrapper() {
63            final ExecutionList list = new ExecutionList();
64            @Override public void add(Runnable runnable, Executor executor) {
65              list.add(runnable, executor);
66            }
67  
68            @Override public void execute() {
69              list.execute();
70            }
71  
72            @Override public Object getImpl() {
73              return list;
74            }
75          };
76        }
77      },
78      NEW_WITH_CAS {
79        @Override ExecutionListWrapper newExecutionList() {
80          return new ExecutionListWrapper() {
81            final ExecutionListCAS list = new ExecutionListCAS();
82            @Override public void add(Runnable runnable, Executor executor) {
83              list.add(runnable, executor);
84            }
85  
86            @Override public void execute() {
87              list.execute();
88            }
89  
90            @Override public Object getImpl() {
91              return list;
92            }
93          };
94        }
95      },
96      NEW_WITH_QUEUE {
97        @Override ExecutionListWrapper newExecutionList() {
98          return new ExecutionListWrapper() {
99            final NewExecutionListQueue list = new NewExecutionListQueue();
100           @Override public void add(Runnable runnable, Executor executor) {
101             list.add(runnable, executor);
102           }
103 
104           @Override public void execute() {
105             list.execute();
106           }
107 
108           @Override public Object getImpl() {
109             return list;
110           }
111         };
112       }
113     },
114     NEW_WITHOUT_REVERSE {
115       @Override ExecutionListWrapper newExecutionList() {
116         return new ExecutionListWrapper() {
117           final NewExecutionListWithoutReverse list = new NewExecutionListWithoutReverse();
118           @Override public void add(Runnable runnable, Executor executor) {
119             list.add(runnable, executor);
120           }
121 
122           @Override public void execute() {
123             list.execute();
124           }
125 
126           @Override public Object getImpl() {
127             return list;
128           }
129         };
130       }
131     },
132     OLD {
133       @Override ExecutionListWrapper newExecutionList() {
134         return new ExecutionListWrapper() {
135           final OldExecutionList list = new OldExecutionList();
136           @Override public void add(Runnable runnable, Executor executor) {
137             list.add(runnable, executor);
138           }
139 
140           @Override public void execute() {
141             list.execute();
142           }
143 
144           @Override public Object getImpl() {
145             return list;
146           }
147         };
148       }
149     };
150     abstract ExecutionListWrapper newExecutionList();
151   }
152 
153   private ExecutorService executorService;
154   private CountDownLatch listenerLatch;
155   private ExecutionListWrapper list;
156 
157   @Param Impl impl;
158   @Param({"1", "5", "10"}) int numListeners;
159 
160   private final Runnable listener = new Runnable() {
161     @Override public void run() {
162       listenerLatch.countDown();
163     }
164   };
165 
166   @BeforeExperiment void setUp() throws Exception {
167     executorService = new ThreadPoolExecutor(NUM_THREADS,
168         NUM_THREADS,
169         Long.MAX_VALUE,
170         TimeUnit.SECONDS,
171         new ArrayBlockingQueue<Runnable>(1000));
172     final AtomicInteger integer = new AtomicInteger();
173     // Execute a bunch of tasks to ensure that our threads are allocated and hot
174     for (int i = 0; i < NUM_THREADS * 10; i++) {
175       executorService.submit(new Runnable() {
176         @Override public void run() {
177           integer.getAndIncrement();
178         }});
179     }
180   }
181 
182   @AfterExperiment void tearDown() throws Exception {
183     executorService.shutdown();
184   }
185 
186   @Footprint(exclude = {Runnable.class, Executor.class})
187   public Object measureSize() {
188     list = impl.newExecutionList();
189     for (int i = 0; i < numListeners; i++) {
190       list.add(listener, directExecutor());
191     }
192     return list.getImpl();
193   }
194 
195   @Benchmark int addThenExecute_singleThreaded(int reps) {
196     int returnValue = 0;
197     for (int i = 0; i < reps; i++) {
198       list = impl.newExecutionList();
199       listenerLatch = new CountDownLatch(numListeners);
200       for (int j = 0; j < numListeners; j++) {
201         list.add(listener, directExecutor());
202         returnValue += listenerLatch.getCount();
203       }
204       list.execute();
205       returnValue += listenerLatch.getCount();
206     }
207     return returnValue;
208   }
209 
210   @Benchmark int executeThenAdd_singleThreaded(int reps) {
211     int returnValue = 0;
212     for (int i = 0; i < reps; i++) {
213       list = impl.newExecutionList();
214       list.execute();
215       listenerLatch = new CountDownLatch(numListeners);
216       for (int j = 0; j < numListeners; j++) {
217         list.add(listener, directExecutor());
218         returnValue += listenerLatch.getCount();
219       }
220       returnValue += listenerLatch.getCount();
221     }
222     return returnValue;
223   }
224 
225   private final Runnable executeTask = new Runnable() {
226     @Override public void run() {
227       list.execute();
228     }
229   };
230 
231   @Benchmark int addThenExecute_multiThreaded(final int reps) throws InterruptedException {
232     Runnable addTask = new Runnable() {
233       @Override public void run() {
234         for (int i = 0; i < numListeners; i++) {
235           list.add(listener, directExecutor());
236         }
237       }
238     };
239     int returnValue = 0;
240     for (int i = 0; i < reps; i++) {
241       list = impl.newExecutionList();
242       listenerLatch = new CountDownLatch(numListeners * NUM_THREADS);
243       for (int j = 0; j < NUM_THREADS; j++) {
244         executorService.submit(addTask);
245       }
246       executorService.submit(executeTask);
247       returnValue = (int) listenerLatch.getCount();
248       listenerLatch.await();
249     }
250     return returnValue;
251   }
252 
253   @Benchmark int executeThenAdd_multiThreaded(final int reps) throws InterruptedException {
254     Runnable addTask = new Runnable() {
255       @Override public void run() {
256         for (int i = 0; i < numListeners; i++) {
257           list.add(listener, directExecutor());
258         }
259       }
260     };
261     int returnValue = 0;
262     for (int i = 0; i < reps; i++) {
263       list = impl.newExecutionList();
264       listenerLatch = new CountDownLatch(numListeners * NUM_THREADS);
265       executorService.submit(executeTask);
266       for (int j = 0; j < NUM_THREADS; j++) {
267         executorService.submit(addTask);
268       }
269       returnValue = (int) listenerLatch.getCount();
270       listenerLatch.await();
271     }
272     return returnValue;
273   }
274 
275   // This is the old implementation of ExecutionList using a LinkedList.
276   private static final class OldExecutionList {
277     static final Logger log = Logger.getLogger(OldExecutionList.class.getName());
278     final Queue<OldExecutionList.RunnableExecutorPair> runnables = Lists.newLinkedList();
279     boolean executed = false;
280 
281     public void add(Runnable runnable, Executor executor) {
282       Preconditions.checkNotNull(runnable, "Runnable was null.");
283       Preconditions.checkNotNull(executor, "Executor was null.");
284 
285       boolean executeImmediate = false;
286 
287       synchronized (runnables) {
288         if (!executed) {
289           runnables.add(new RunnableExecutorPair(runnable, executor));
290         } else {
291           executeImmediate = true;
292         }
293       }
294 
295       if (executeImmediate) {
296         new RunnableExecutorPair(runnable, executor).execute();
297       }
298     }
299 
300     public void execute() {
301       synchronized (runnables) {
302         if (executed) {
303           return;
304         }
305         executed = true;
306       }
307 
308       while (!runnables.isEmpty()) {
309         runnables.poll().execute();
310       }
311     }
312 
313     private static class RunnableExecutorPair {
314       final Runnable runnable;
315       final Executor executor;
316 
317       RunnableExecutorPair(Runnable runnable, Executor executor) {
318         this.runnable = runnable;
319         this.executor = executor;
320       }
321 
322       void execute() {
323         try {
324           executor.execute(runnable);
325         } catch (RuntimeException e) {
326           log.log(Level.SEVERE, "RuntimeException while executing runnable "
327               + runnable + " with executor " + executor, e);
328         }
329       }
330     }
331   }
332 
333   // A version of the execution list that doesn't reverse the stack in execute().
334   private static final class NewExecutionListWithoutReverse {
335     static final Logger log = Logger.getLogger(NewExecutionListWithoutReverse.class.getName());
336 
337     @GuardedBy("this")
338     private RunnableExecutorPair runnables;
339     @GuardedBy("this")
340     private boolean executed;
341 
342     public void add(Runnable runnable, Executor executor) {
343       Preconditions.checkNotNull(runnable, "Runnable was null.");
344       Preconditions.checkNotNull(executor, "Executor was null.");
345 
346       synchronized (this) {
347         if (!executed) {
348           runnables = new RunnableExecutorPair(runnable, executor, runnables);
349           return;
350         }
351       }
352       executeListener(runnable, executor);
353     }
354 
355     public void execute() {
356       RunnableExecutorPair list;
357       synchronized (this) {
358         if (executed) {
359           return;
360         }
361         executed = true;
362         list = runnables;
363         runnables = null;  // allow GC to free listeners even if this stays around for a while.
364       }
365       while (list != null) {
366         executeListener(list.runnable, list.executor);
367         list = list.next;
368       }
369     }
370 
371     private static void executeListener(Runnable runnable, Executor executor) {
372       try {
373         executor.execute(runnable);
374       } catch (RuntimeException e) {
375         log.log(Level.SEVERE, "RuntimeException while executing runnable "
376             + runnable + " with executor " + executor, e);
377       }
378     }
379 
380     private static final class RunnableExecutorPair {
381       final Runnable runnable;
382       final Executor executor;
383       @Nullable RunnableExecutorPair next;
384 
385       RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
386         this.runnable = runnable;
387         this.executor = executor;
388         this.next = next;
389       }
390     }
391   }
392 
393   // A version of the ExecutionList that uses an explicit tail pointer to keep the nodes in order
394   // rather than flipping the stack in execute().
395   private static final class NewExecutionListQueue {
396     static final Logger log = Logger.getLogger(NewExecutionListQueue.class.getName());
397 
398     @GuardedBy("this")
399     private RunnableExecutorPair head;
400     @GuardedBy("this")
401     private RunnableExecutorPair tail;
402     @GuardedBy("this")
403     private boolean executed;
404 
405     public void add(Runnable runnable, Executor executor) {
406       Preconditions.checkNotNull(runnable, "Runnable was null.");
407       Preconditions.checkNotNull(executor, "Executor was null.");
408 
409       synchronized (this) {
410         if (!executed) {
411           RunnableExecutorPair newTail = new RunnableExecutorPair(runnable, executor);
412           if (head == null) {
413             head = newTail;
414             tail = newTail;
415           } else {
416             tail.next = newTail;
417             tail = newTail;
418           }
419           return;
420         }
421       }
422       executeListener(runnable, executor);
423     }
424 
425     public void execute() {
426       RunnableExecutorPair list;
427       synchronized (this) {
428         if (executed) {
429           return;
430         }
431         executed = true;
432         list = head;
433         head = null;  // allow GC to free listeners even if this stays around for a while.
434         tail = null;
435       }
436       while (list != null) {
437         executeListener(list.runnable, list.executor);
438         list = list.next;
439       }
440     }
441 
442     private static void executeListener(Runnable runnable, Executor executor) {
443       try {
444         executor.execute(runnable);
445       } catch (RuntimeException e) {
446         log.log(Level.SEVERE, "RuntimeException while executing runnable "
447             + runnable + " with executor " + executor, e);
448       }
449     }
450 
451     private static final class RunnableExecutorPair {
452       Runnable runnable;
453       Executor executor;
454       @Nullable RunnableExecutorPair next;
455 
456       RunnableExecutorPair(Runnable runnable, Executor executor) {
457         this.runnable = runnable;
458         this.executor = executor;
459       }
460     }
461   }
462 
463   // A version of the list that uses compare and swap to manage the stack without locks.
464   private static final class ExecutionListCAS {
465     static final Logger log = Logger.getLogger(ExecutionListCAS.class.getName());
466 
467     private static final sun.misc.Unsafe UNSAFE;
468     private static final long HEAD_OFFSET;
469 
470     /**
471      * A special instance of {@link RunnableExecutorPair} that is used as a sentinel value for the
472      * bottom of the stack.
473      */
474     private static final RunnableExecutorPair NULL_PAIR = new RunnableExecutorPair(null, null);
475 
476     static {
477       try {
478         UNSAFE = getUnsafe();
479         HEAD_OFFSET = UNSAFE.objectFieldOffset(ExecutionListCAS.class.getDeclaredField("head"));
480       } catch (Exception ex) {
481         throw new Error(ex);
482       }
483     }
484 
485     /**
486      * TODO(user):  This was copied verbatim from Striped64.java... standardize this?
487      */
488     private static sun.misc.Unsafe getUnsafe() {
489         try {
490             return sun.misc.Unsafe.getUnsafe();
491         } catch (SecurityException tryReflectionInstead) {}
492         try {
493             return java.security.AccessController.doPrivileged
494             (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
495                 @Override public sun.misc.Unsafe run() throws Exception {
496                     Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
497                     for (java.lang.reflect.Field f : k.getDeclaredFields()) {
498                         f.setAccessible(true);
499                         Object x = f.get(null);
500                         if (k.isInstance(x))
501                             return k.cast(x);
502                     }
503                     throw new NoSuchFieldError("the Unsafe");
504                 }});
505         } catch (java.security.PrivilegedActionException e) {
506             throw new RuntimeException("Could not initialize intrinsics",
507                                        e.getCause());
508         }
509     }
510     private volatile RunnableExecutorPair head = NULL_PAIR;
511 
512     public void add(Runnable runnable, Executor executor) {
513       Preconditions.checkNotNull(runnable, "Runnable was null.");
514       Preconditions.checkNotNull(executor, "Executor was null.");
515 
516       RunnableExecutorPair newHead = new RunnableExecutorPair(runnable, executor);
517       RunnableExecutorPair oldHead;
518       do {
519         oldHead = head;
520         if (oldHead == null) {
521           // If runnables == null then execute() has been called so we should just execute our
522           // listener immediately.
523           newHead.execute();
524           return;
525         }
526         // Try to make newHead the new head of the stack at runnables.
527         newHead.next = oldHead;
528       } while (!UNSAFE.compareAndSwapObject(this, HEAD_OFFSET, oldHead, newHead));
529     }
530 
531     public void execute() {
532       RunnableExecutorPair stack;
533       do {
534         stack = head;
535         if (stack == null) {
536           // If head == null then execute() has been called so we should just return
537           return;
538         }
539         // try to swap null into head.
540       } while (!UNSAFE.compareAndSwapObject(this, HEAD_OFFSET, stack, null));
541 
542       RunnableExecutorPair reversedStack = null;
543       while (stack != NULL_PAIR) {
544         RunnableExecutorPair head = stack;
545         stack = stack.next;
546         head.next = reversedStack;
547         reversedStack = head;
548       }
549       stack = reversedStack;
550       while (stack != null) {
551         stack.execute();
552         stack = stack.next;
553       }
554     }
555 
556     private static class RunnableExecutorPair {
557       final Runnable runnable;
558       final Executor executor;
559       // Volatile because this is written on one thread and read on another with no synchronization.
560       @Nullable volatile RunnableExecutorPair next;
561 
562       RunnableExecutorPair(Runnable runnable, Executor executor) {
563         this.runnable = runnable;
564         this.executor = executor;
565       }
566 
567       void execute() {
568         try {
569           executor.execute(runnable);
570         } catch (RuntimeException e) {
571           log.log(Level.SEVERE, "RuntimeException while executing runnable "
572               + runnable + " with executor " + executor, e);
573         }
574       }
575     }
576   }
577 }