1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.google.common.util.concurrent;
18
19 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
20
21 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
22
23 import java.lang.reflect.UndeclaredThrowableException;
24 import java.util.concurrent.CancellationException;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutionException;
27
28
29
30
31
32
33 public class FuturesTransformAsyncFunctionTest
34 extends AbstractChainedListenableFutureTest<String> {
35 protected static final int SLOW_OUTPUT_VALID_INPUT_DATA = 2;
36 protected static final int SLOW_FUNC_VALID_INPUT_DATA = 3;
37 private static final String RESULT_DATA = "SUCCESS";
38
39 private SettableFuture<String> outputFuture;
40
41 private CountDownLatch funcIsWaitingLatch;
42
43 private CountDownLatch funcCompletionLatch;
44
45 @Override protected ListenableFuture<String> buildChainingFuture(
46 ListenableFuture<Integer> inputFuture) {
47 outputFuture = SettableFuture.create();
48 funcIsWaitingLatch = new CountDownLatch(1);
49 funcCompletionLatch = new CountDownLatch(1);
50 return Futures.transform(inputFuture, new ChainingFunction());
51 }
52
53 @Override protected String getSuccessfulResult() {
54 return RESULT_DATA;
55 }
56
57 private class ChainingFunction implements AsyncFunction<Integer, String> {
58 @Override
59 public ListenableFuture<String> apply(Integer input) {
60 switch (input) {
61 case VALID_INPUT_DATA: outputFuture.set(RESULT_DATA); break;
62 case SLOW_OUTPUT_VALID_INPUT_DATA: break;
63 case SLOW_FUNC_VALID_INPUT_DATA:
64 funcIsWaitingLatch.countDown();
65 awaitUninterruptibly(funcCompletionLatch);
66 break;
67 default: throw new UndeclaredThrowableException(EXCEPTION);
68 }
69 return outputFuture;
70 }
71 }
72
73 public void testFutureGetThrowsFunctionException() throws Exception {
74 inputFuture.set(EXCEPTION_DATA);
75 listener.assertException(EXCEPTION);
76 }
77
78 public void testFutureGetThrowsCancellationIfInputCancelled()
79 throws Exception {
80 inputFuture.cancel(true);
81 try {
82 resultFuture.get();
83 fail("Result future must throw CancellationException"
84 + " if input future is cancelled.");
85 } catch (CancellationException expected) {}
86 }
87
88 public void testFutureGetThrowsCancellationIfOutputCancelled()
89 throws Exception {
90 inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
91 outputFuture.cancel(true);
92 try {
93 resultFuture.get();
94 fail("Result future must throw CancellationException"
95 + " if function output future is cancelled.");
96 } catch (CancellationException expected) {}
97 }
98
99 public void testFutureCancelBeforeInputCompletion() throws Exception {
100 assertTrue(resultFuture.cancel(true));
101 assertTrue(resultFuture.isCancelled());
102 assertTrue(inputFuture.isCancelled());
103 assertFalse(outputFuture.isCancelled());
104 try {
105 resultFuture.get();
106 fail("Result future is cancelled and should have thrown a"
107 + " CancellationException");
108 } catch (CancellationException expected) {}
109 }
110
111 public void testFutureCancellableBeforeOutputCompletion() throws Exception {
112 inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
113 assertTrue(resultFuture.cancel(true));
114 assertTrue(resultFuture.isCancelled());
115 assertFalse(inputFuture.isCancelled());
116 assertTrue(outputFuture.isCancelled());
117 try {
118 resultFuture.get();
119 fail("Result future is cancelled and should have thrown a"
120 + " CancellationException");
121 } catch (CancellationException expected) {}
122 }
123
124 public void testFutureCancellableBeforeFunctionCompletion() throws Exception {
125
126
127 new Thread() {
128 @Override
129 public void run() {
130 inputFuture.set(SLOW_FUNC_VALID_INPUT_DATA);
131 }
132 }.start();
133 funcIsWaitingLatch.await();
134
135 assertTrue(resultFuture.cancel(true));
136 assertTrue(resultFuture.isCancelled());
137 assertFalse(inputFuture.isCancelled());
138 assertFalse(outputFuture.isCancelled());
139 try {
140 resultFuture.get();
141 fail("Result future is cancelled and should have thrown a"
142 + " CancellationException");
143 } catch (CancellationException expected) {}
144
145 funcCompletionLatch.countDown();
146 try {
147 outputFuture.get();
148 fail("The function output future is cancelled and should have thrown a"
149 + " CancellationException");
150 } catch (CancellationException expected) {}
151 }
152
153 public void testFutureCancelAfterCompletion() throws Exception {
154 inputFuture.set(VALID_INPUT_DATA);
155 assertFalse(resultFuture.cancel(true));
156 assertFalse(resultFuture.isCancelled());
157 assertFalse(inputFuture.isCancelled());
158 assertFalse(outputFuture.isCancelled());
159 assertEquals(RESULT_DATA, resultFuture.get());
160 }
161
162 public void testFutureGetThrowsRuntimeException() throws Exception {
163 BadFuture badInput = new BadFuture(Futures.immediateFuture(20));
164 ListenableFuture<String> chain = buildChainingFuture(badInput);
165 try {
166 chain.get();
167 fail("Future.get must throw an exception when the input future fails.");
168 } catch (ExecutionException e) {
169 assertSame(RuntimeException.class, e.getCause().getClass());
170 }
171 }
172
173
174
175
176 public static class BadFuture
177 extends SimpleForwardingListenableFuture<Integer> {
178 protected BadFuture(ListenableFuture<Integer> delegate) {
179 super(delegate);
180 }
181
182 @Override
183 public Integer get() {
184 throw new RuntimeException("Oops");
185 }
186 }
187 }