View Javadoc
1   /*
2    * Copyright (C) 2008 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package com.google.common.util.concurrent;
18  
19  import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
20  
21  import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
22  
23  import java.lang.reflect.UndeclaredThrowableException;
24  import java.util.concurrent.CancellationException;
25  import java.util.concurrent.CountDownLatch;
26  import java.util.concurrent.ExecutionException;
27  
28  /**
29   * Unit tests for {@link Futures#transform(ListenableFuture, AsyncFunction)}.
30   *
31   * @author Nishant Thakkar
32   */
33  public class FuturesTransformAsyncFunctionTest
34      extends AbstractChainedListenableFutureTest<String> {
35    protected static final int SLOW_OUTPUT_VALID_INPUT_DATA = 2;
36    protected static final int SLOW_FUNC_VALID_INPUT_DATA = 3;
37    private static final String RESULT_DATA = "SUCCESS";
38  
39    private SettableFuture<String> outputFuture;
40    // Signals that the function is waiting to complete
41    private CountDownLatch funcIsWaitingLatch;
42    // Signals the function so it will complete
43    private CountDownLatch funcCompletionLatch;
44  
45    @Override protected ListenableFuture<String> buildChainingFuture(
46        ListenableFuture<Integer> inputFuture) {
47      outputFuture = SettableFuture.create();
48      funcIsWaitingLatch = new CountDownLatch(1);
49      funcCompletionLatch = new CountDownLatch(1);
50      return Futures.transform(inputFuture, new ChainingFunction());
51    }
52  
53    @Override protected String getSuccessfulResult() {
54      return RESULT_DATA;
55    }
56  
57    private class ChainingFunction implements AsyncFunction<Integer, String> {
58      @Override
59      public ListenableFuture<String> apply(Integer input) {
60        switch (input) {
61          case VALID_INPUT_DATA: outputFuture.set(RESULT_DATA); break;
62          case SLOW_OUTPUT_VALID_INPUT_DATA: break;  // do nothing to the result
63          case SLOW_FUNC_VALID_INPUT_DATA:
64            funcIsWaitingLatch.countDown();
65            awaitUninterruptibly(funcCompletionLatch);
66            break;
67          default: throw new UndeclaredThrowableException(EXCEPTION);
68        }
69        return outputFuture;
70      }
71    }
72  
73    public void testFutureGetThrowsFunctionException() throws Exception {
74      inputFuture.set(EXCEPTION_DATA);
75      listener.assertException(EXCEPTION);
76    }
77  
78    public void testFutureGetThrowsCancellationIfInputCancelled()
79        throws Exception {
80      inputFuture.cancel(true); // argument is ignored
81      try {
82        resultFuture.get();
83        fail("Result future must throw CancellationException"
84            + " if input future is cancelled.");
85      } catch (CancellationException expected) {}
86    }
87  
88    public void testFutureGetThrowsCancellationIfOutputCancelled()
89        throws Exception {
90      inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
91      outputFuture.cancel(true); // argument is ignored
92      try {
93        resultFuture.get();
94        fail("Result future must throw CancellationException"
95            + " if function output future is cancelled.");
96      } catch (CancellationException expected) {}
97    }
98  
99    public void testFutureCancelBeforeInputCompletion() throws Exception {
100     assertTrue(resultFuture.cancel(true));
101     assertTrue(resultFuture.isCancelled());
102     assertTrue(inputFuture.isCancelled());
103     assertFalse(outputFuture.isCancelled());
104     try {
105       resultFuture.get();
106       fail("Result future is cancelled and should have thrown a"
107           + " CancellationException");
108     } catch (CancellationException expected) {}
109   }
110 
111   public void testFutureCancellableBeforeOutputCompletion() throws Exception {
112     inputFuture.set(SLOW_OUTPUT_VALID_INPUT_DATA);
113     assertTrue(resultFuture.cancel(true));
114     assertTrue(resultFuture.isCancelled());
115     assertFalse(inputFuture.isCancelled());
116     assertTrue(outputFuture.isCancelled());
117     try {
118       resultFuture.get();
119       fail("Result future is cancelled and should have thrown a"
120           + " CancellationException");
121     } catch (CancellationException expected) {}
122   }
123 
124   public void testFutureCancellableBeforeFunctionCompletion() throws Exception {
125     // Set the result in a separate thread since this test runs the function
126     // (which will block) in the same thread.
127     new Thread() {
128       @Override
129       public void run() {
130         inputFuture.set(SLOW_FUNC_VALID_INPUT_DATA);
131       }
132     }.start();
133     funcIsWaitingLatch.await();
134 
135     assertTrue(resultFuture.cancel(true));
136     assertTrue(resultFuture.isCancelled());
137     assertFalse(inputFuture.isCancelled());
138     assertFalse(outputFuture.isCancelled());
139     try {
140       resultFuture.get();
141       fail("Result future is cancelled and should have thrown a"
142           + " CancellationException");
143     } catch (CancellationException expected) {}
144 
145     funcCompletionLatch.countDown();  // allow the function to complete
146     try {
147       outputFuture.get();
148       fail("The function output future is cancelled and should have thrown a"
149           + " CancellationException");
150     } catch (CancellationException expected) {}
151   }
152 
153   public void testFutureCancelAfterCompletion() throws Exception {
154     inputFuture.set(VALID_INPUT_DATA);
155     assertFalse(resultFuture.cancel(true));
156     assertFalse(resultFuture.isCancelled());
157     assertFalse(inputFuture.isCancelled());
158     assertFalse(outputFuture.isCancelled());
159     assertEquals(RESULT_DATA, resultFuture.get());
160   }
161 
162   public void testFutureGetThrowsRuntimeException() throws Exception {
163     BadFuture badInput = new BadFuture(Futures.immediateFuture(20));
164     ListenableFuture<String> chain = buildChainingFuture(badInput);
165     try {
166       chain.get();
167       fail("Future.get must throw an exception when the input future fails.");
168     } catch (ExecutionException e) {
169       assertSame(RuntimeException.class, e.getCause().getClass());
170     }
171   }
172 
173   /**
174    * Proxy to throw a {@link RuntimeException} out of the {@link #get()} method.
175    */
176   public static class BadFuture
177       extends SimpleForwardingListenableFuture<Integer> {
178     protected BadFuture(ListenableFuture<Integer> delegate) {
179       super(delegate);
180     }
181 
182     @Override
183     public Integer get() {
184       throw new RuntimeException("Oops");
185     }
186   }
187 }