1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.google.common.util.concurrent;
18
19 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
20
21 import com.google.caliper.AfterExperiment;
22 import com.google.caliper.BeforeExperiment;
23 import com.google.caliper.Benchmark;
24 import com.google.caliper.Param;
25 import com.google.caliper.api.Footprint;
26 import com.google.caliper.api.VmOptions;
27 import com.google.common.base.Preconditions;
28 import com.google.common.collect.Lists;
29
30 import java.util.Queue;
31 import java.util.concurrent.ArrayBlockingQueue;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicInteger;
38 import java.util.logging.Level;
39 import java.util.logging.Logger;
40
41 import javax.annotation.Nullable;
42 import javax.annotation.concurrent.GuardedBy;
43
44
45
46
47 @VmOptions({"-Xms3g", "-Xmx3g"})
48 public class ExecutionListBenchmark {
49 private static final int NUM_THREADS = 10;
50
51
52 interface ExecutionListWrapper {
53 void add(Runnable runnable, Executor executor);
54 void execute();
55
56 Object getImpl();
57 }
58
59 enum Impl {
60 NEW {
61 @Override ExecutionListWrapper newExecutionList() {
62 return new ExecutionListWrapper() {
63 final ExecutionList list = new ExecutionList();
64 @Override public void add(Runnable runnable, Executor executor) {
65 list.add(runnable, executor);
66 }
67
68 @Override public void execute() {
69 list.execute();
70 }
71
72 @Override public Object getImpl() {
73 return list;
74 }
75 };
76 }
77 },
78 NEW_WITH_CAS {
79 @Override ExecutionListWrapper newExecutionList() {
80 return new ExecutionListWrapper() {
81 final ExecutionListCAS list = new ExecutionListCAS();
82 @Override public void add(Runnable runnable, Executor executor) {
83 list.add(runnable, executor);
84 }
85
86 @Override public void execute() {
87 list.execute();
88 }
89
90 @Override public Object getImpl() {
91 return list;
92 }
93 };
94 }
95 },
96 NEW_WITH_QUEUE {
97 @Override ExecutionListWrapper newExecutionList() {
98 return new ExecutionListWrapper() {
99 final NewExecutionListQueue list = new NewExecutionListQueue();
100 @Override public void add(Runnable runnable, Executor executor) {
101 list.add(runnable, executor);
102 }
103
104 @Override public void execute() {
105 list.execute();
106 }
107
108 @Override public Object getImpl() {
109 return list;
110 }
111 };
112 }
113 },
114 NEW_WITHOUT_REVERSE {
115 @Override ExecutionListWrapper newExecutionList() {
116 return new ExecutionListWrapper() {
117 final NewExecutionListWithoutReverse list = new NewExecutionListWithoutReverse();
118 @Override public void add(Runnable runnable, Executor executor) {
119 list.add(runnable, executor);
120 }
121
122 @Override public void execute() {
123 list.execute();
124 }
125
126 @Override public Object getImpl() {
127 return list;
128 }
129 };
130 }
131 },
132 OLD {
133 @Override ExecutionListWrapper newExecutionList() {
134 return new ExecutionListWrapper() {
135 final OldExecutionList list = new OldExecutionList();
136 @Override public void add(Runnable runnable, Executor executor) {
137 list.add(runnable, executor);
138 }
139
140 @Override public void execute() {
141 list.execute();
142 }
143
144 @Override public Object getImpl() {
145 return list;
146 }
147 };
148 }
149 };
150 abstract ExecutionListWrapper newExecutionList();
151 }
152
153 private ExecutorService executorService;
154 private CountDownLatch listenerLatch;
155 private ExecutionListWrapper list;
156
157 @Param Impl impl;
158 @Param({"1", "5", "10"}) int numListeners;
159
160 private final Runnable listener = new Runnable() {
161 @Override public void run() {
162 listenerLatch.countDown();
163 }
164 };
165
166 @BeforeExperiment void setUp() throws Exception {
167 executorService = new ThreadPoolExecutor(NUM_THREADS,
168 NUM_THREADS,
169 Long.MAX_VALUE,
170 TimeUnit.SECONDS,
171 new ArrayBlockingQueue<Runnable>(1000));
172 final AtomicInteger integer = new AtomicInteger();
173
174 for (int i = 0; i < NUM_THREADS * 10; i++) {
175 executorService.submit(new Runnable() {
176 @Override public void run() {
177 integer.getAndIncrement();
178 }});
179 }
180 }
181
182 @AfterExperiment void tearDown() throws Exception {
183 executorService.shutdown();
184 }
185
186 @Footprint(exclude = {Runnable.class, Executor.class})
187 public Object measureSize() {
188 list = impl.newExecutionList();
189 for (int i = 0; i < numListeners; i++) {
190 list.add(listener, directExecutor());
191 }
192 return list.getImpl();
193 }
194
195 @Benchmark int addThenExecute_singleThreaded(int reps) {
196 int returnValue = 0;
197 for (int i = 0; i < reps; i++) {
198 list = impl.newExecutionList();
199 listenerLatch = new CountDownLatch(numListeners);
200 for (int j = 0; j < numListeners; j++) {
201 list.add(listener, directExecutor());
202 returnValue += listenerLatch.getCount();
203 }
204 list.execute();
205 returnValue += listenerLatch.getCount();
206 }
207 return returnValue;
208 }
209
210 @Benchmark int executeThenAdd_singleThreaded(int reps) {
211 int returnValue = 0;
212 for (int i = 0; i < reps; i++) {
213 list = impl.newExecutionList();
214 list.execute();
215 listenerLatch = new CountDownLatch(numListeners);
216 for (int j = 0; j < numListeners; j++) {
217 list.add(listener, directExecutor());
218 returnValue += listenerLatch.getCount();
219 }
220 returnValue += listenerLatch.getCount();
221 }
222 return returnValue;
223 }
224
225 private final Runnable executeTask = new Runnable() {
226 @Override public void run() {
227 list.execute();
228 }
229 };
230
231 @Benchmark int addThenExecute_multiThreaded(final int reps) throws InterruptedException {
232 Runnable addTask = new Runnable() {
233 @Override public void run() {
234 for (int i = 0; i < numListeners; i++) {
235 list.add(listener, directExecutor());
236 }
237 }
238 };
239 int returnValue = 0;
240 for (int i = 0; i < reps; i++) {
241 list = impl.newExecutionList();
242 listenerLatch = new CountDownLatch(numListeners * NUM_THREADS);
243 for (int j = 0; j < NUM_THREADS; j++) {
244 executorService.submit(addTask);
245 }
246 executorService.submit(executeTask);
247 returnValue = (int) listenerLatch.getCount();
248 listenerLatch.await();
249 }
250 return returnValue;
251 }
252
253 @Benchmark int executeThenAdd_multiThreaded(final int reps) throws InterruptedException {
254 Runnable addTask = new Runnable() {
255 @Override public void run() {
256 for (int i = 0; i < numListeners; i++) {
257 list.add(listener, directExecutor());
258 }
259 }
260 };
261 int returnValue = 0;
262 for (int i = 0; i < reps; i++) {
263 list = impl.newExecutionList();
264 listenerLatch = new CountDownLatch(numListeners * NUM_THREADS);
265 executorService.submit(executeTask);
266 for (int j = 0; j < NUM_THREADS; j++) {
267 executorService.submit(addTask);
268 }
269 returnValue = (int) listenerLatch.getCount();
270 listenerLatch.await();
271 }
272 return returnValue;
273 }
274
275
276 private static final class OldExecutionList {
277 static final Logger log = Logger.getLogger(OldExecutionList.class.getName());
278 final Queue<OldExecutionList.RunnableExecutorPair> runnables = Lists.newLinkedList();
279 boolean executed = false;
280
281 public void add(Runnable runnable, Executor executor) {
282 Preconditions.checkNotNull(runnable, "Runnable was null.");
283 Preconditions.checkNotNull(executor, "Executor was null.");
284
285 boolean executeImmediate = false;
286
287 synchronized (runnables) {
288 if (!executed) {
289 runnables.add(new RunnableExecutorPair(runnable, executor));
290 } else {
291 executeImmediate = true;
292 }
293 }
294
295 if (executeImmediate) {
296 new RunnableExecutorPair(runnable, executor).execute();
297 }
298 }
299
300 public void execute() {
301 synchronized (runnables) {
302 if (executed) {
303 return;
304 }
305 executed = true;
306 }
307
308 while (!runnables.isEmpty()) {
309 runnables.poll().execute();
310 }
311 }
312
313 private static class RunnableExecutorPair {
314 final Runnable runnable;
315 final Executor executor;
316
317 RunnableExecutorPair(Runnable runnable, Executor executor) {
318 this.runnable = runnable;
319 this.executor = executor;
320 }
321
322 void execute() {
323 try {
324 executor.execute(runnable);
325 } catch (RuntimeException e) {
326 log.log(Level.SEVERE, "RuntimeException while executing runnable "
327 + runnable + " with executor " + executor, e);
328 }
329 }
330 }
331 }
332
333
334 private static final class NewExecutionListWithoutReverse {
335 static final Logger log = Logger.getLogger(NewExecutionListWithoutReverse.class.getName());
336
337 @GuardedBy("this")
338 private RunnableExecutorPair runnables;
339 @GuardedBy("this")
340 private boolean executed;
341
342 public void add(Runnable runnable, Executor executor) {
343 Preconditions.checkNotNull(runnable, "Runnable was null.");
344 Preconditions.checkNotNull(executor, "Executor was null.");
345
346 synchronized (this) {
347 if (!executed) {
348 runnables = new RunnableExecutorPair(runnable, executor, runnables);
349 return;
350 }
351 }
352 executeListener(runnable, executor);
353 }
354
355 public void execute() {
356 RunnableExecutorPair list;
357 synchronized (this) {
358 if (executed) {
359 return;
360 }
361 executed = true;
362 list = runnables;
363 runnables = null;
364 }
365 while (list != null) {
366 executeListener(list.runnable, list.executor);
367 list = list.next;
368 }
369 }
370
371 private static void executeListener(Runnable runnable, Executor executor) {
372 try {
373 executor.execute(runnable);
374 } catch (RuntimeException e) {
375 log.log(Level.SEVERE, "RuntimeException while executing runnable "
376 + runnable + " with executor " + executor, e);
377 }
378 }
379
380 private static final class RunnableExecutorPair {
381 final Runnable runnable;
382 final Executor executor;
383 @Nullable RunnableExecutorPair next;
384
385 RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
386 this.runnable = runnable;
387 this.executor = executor;
388 this.next = next;
389 }
390 }
391 }
392
393
394
395 private static final class NewExecutionListQueue {
396 static final Logger log = Logger.getLogger(NewExecutionListQueue.class.getName());
397
398 @GuardedBy("this")
399 private RunnableExecutorPair head;
400 @GuardedBy("this")
401 private RunnableExecutorPair tail;
402 @GuardedBy("this")
403 private boolean executed;
404
405 public void add(Runnable runnable, Executor executor) {
406 Preconditions.checkNotNull(runnable, "Runnable was null.");
407 Preconditions.checkNotNull(executor, "Executor was null.");
408
409 synchronized (this) {
410 if (!executed) {
411 RunnableExecutorPair newTail = new RunnableExecutorPair(runnable, executor);
412 if (head == null) {
413 head = newTail;
414 tail = newTail;
415 } else {
416 tail.next = newTail;
417 tail = newTail;
418 }
419 return;
420 }
421 }
422 executeListener(runnable, executor);
423 }
424
425 public void execute() {
426 RunnableExecutorPair list;
427 synchronized (this) {
428 if (executed) {
429 return;
430 }
431 executed = true;
432 list = head;
433 head = null;
434 tail = null;
435 }
436 while (list != null) {
437 executeListener(list.runnable, list.executor);
438 list = list.next;
439 }
440 }
441
442 private static void executeListener(Runnable runnable, Executor executor) {
443 try {
444 executor.execute(runnable);
445 } catch (RuntimeException e) {
446 log.log(Level.SEVERE, "RuntimeException while executing runnable "
447 + runnable + " with executor " + executor, e);
448 }
449 }
450
451 private static final class RunnableExecutorPair {
452 Runnable runnable;
453 Executor executor;
454 @Nullable RunnableExecutorPair next;
455
456 RunnableExecutorPair(Runnable runnable, Executor executor) {
457 this.runnable = runnable;
458 this.executor = executor;
459 }
460 }
461 }
462
463
464 private static final class ExecutionListCAS {
465 static final Logger log = Logger.getLogger(ExecutionListCAS.class.getName());
466
467 private static final sun.misc.Unsafe UNSAFE;
468 private static final long HEAD_OFFSET;
469
470
471
472
473
474 private static final RunnableExecutorPair NULL_PAIR = new RunnableExecutorPair(null, null);
475
476 static {
477 try {
478 UNSAFE = getUnsafe();
479 HEAD_OFFSET = UNSAFE.objectFieldOffset(ExecutionListCAS.class.getDeclaredField("head"));
480 } catch (Exception ex) {
481 throw new Error(ex);
482 }
483 }
484
485
486
487
488 private static sun.misc.Unsafe getUnsafe() {
489 try {
490 return sun.misc.Unsafe.getUnsafe();
491 } catch (SecurityException tryReflectionInstead) {}
492 try {
493 return java.security.AccessController.doPrivileged
494 (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
495 @Override public sun.misc.Unsafe run() throws Exception {
496 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
497 for (java.lang.reflect.Field f : k.getDeclaredFields()) {
498 f.setAccessible(true);
499 Object x = f.get(null);
500 if (k.isInstance(x))
501 return k.cast(x);
502 }
503 throw new NoSuchFieldError("the Unsafe");
504 }});
505 } catch (java.security.PrivilegedActionException e) {
506 throw new RuntimeException("Could not initialize intrinsics",
507 e.getCause());
508 }
509 }
510 private volatile RunnableExecutorPair head = NULL_PAIR;
511
512 public void add(Runnable runnable, Executor executor) {
513 Preconditions.checkNotNull(runnable, "Runnable was null.");
514 Preconditions.checkNotNull(executor, "Executor was null.");
515
516 RunnableExecutorPair newHead = new RunnableExecutorPair(runnable, executor);
517 RunnableExecutorPair oldHead;
518 do {
519 oldHead = head;
520 if (oldHead == null) {
521
522
523 newHead.execute();
524 return;
525 }
526
527 newHead.next = oldHead;
528 } while (!UNSAFE.compareAndSwapObject(this, HEAD_OFFSET, oldHead, newHead));
529 }
530
531 public void execute() {
532 RunnableExecutorPair stack;
533 do {
534 stack = head;
535 if (stack == null) {
536
537 return;
538 }
539
540 } while (!UNSAFE.compareAndSwapObject(this, HEAD_OFFSET, stack, null));
541
542 RunnableExecutorPair reversedStack = null;
543 while (stack != NULL_PAIR) {
544 RunnableExecutorPair head = stack;
545 stack = stack.next;
546 head.next = reversedStack;
547 reversedStack = head;
548 }
549 stack = reversedStack;
550 while (stack != null) {
551 stack.execute();
552 stack = stack.next;
553 }
554 }
555
556 private static class RunnableExecutorPair {
557 final Runnable runnable;
558 final Executor executor;
559
560 @Nullable volatile RunnableExecutorPair next;
561
562 RunnableExecutorPair(Runnable runnable, Executor executor) {
563 this.runnable = runnable;
564 this.executor = executor;
565 }
566
567 void execute() {
568 try {
569 executor.execute(runnable);
570 } catch (RuntimeException e) {
571 log.log(Level.SEVERE, "RuntimeException while executing runnable "
572 + runnable + " with executor " + executor, e);
573 }
574 }
575 }
576 }
577 }