AsyncHttpClient ListenableFuture Source Code Walkthrough

Contents
  • ListenableFuture
  • CompletedFailure
  • NettyResponseFuture
    • done
    • abort
    • touch
    • addListener
    • toCompletableFuture
  • newNettyResponseFuture
  • Summary

      This article examines AsyncHttpClient's ListenableFuture interface and its implementations.

      ListenableFuture

      org/asynchttpclient/ListenableFuture.java

      public interface ListenableFuture<V> extends Future<V> {
        /**
         * Terminate and if there is no exception, mark this Future as done and release the internal lock.
         */
        void done();
        /**
         * Abort the current processing, and propagate the {@link Throwable} to the {@link AsyncHandler} or {@link Future}
         *
         * @param t the exception
         */
        void abort(Throwable t);
        /**
         * Touch the current instance to prevent external service to times out.
         */
        void touch();
        /**
         * Adds a listener and executor to the ListenableFuture.
         * The listener will be {@linkplain java.util.concurrent.Executor#execute(Runnable) passed
         * to the executor} for execution when the {@code Future}'s computation is
         * {@linkplain Future#isDone() complete}.
         * <br>
         * Executor can be <code>null</code>, in that case executor will be executed
         * in the thread where completion happens.
         * <br>
         * There is no guaranteed ordering of execution of listeners, they may get
         * called in the order they were added and they may get called out of order,
         * but any listener added through this method is guaranteed to be called once
         * the computation is complete.
         *
         * @param listener the listener to run when the computation is complete.
         * @param exec     the executor to run the listener in.
         * @return this Future
         */
        ListenableFuture<V> addListener(Runnable listener, Executor exec);
        CompletableFuture<V> toCompletableFuture();
        //......
      }
      ListenableFuture extends java.util.concurrent.Future and adds the methods done, abort, touch, addListener, and toCompletableFuture.

      CompletedFailure

      org/asynchttpclient/ListenableFuture.java

      class CompletedFailure<T> implements ListenableFuture<T> {
          private final ExecutionException e;
          public CompletedFailure(Throwable t) {
            e = new ExecutionException(t);
          }
          public CompletedFailure(String message, Throwable t) {
            e = new ExecutionException(message, t);
          }
          @Override
          public boolean cancel(boolean mayInterruptIfRunning) {
            return true;
          }
          @Override
          public boolean isCancelled() {
            return false;
          }
          @Override
          public boolean isDone() {
            return true;
          }
          @Override
          public T get() throws ExecutionException {
            throw e;
          }
          @Override
          public T get(long timeout, TimeUnit unit) throws ExecutionException {
            throw e;
          }
          @Override
          public void done() {
          }
          @Override
          public void abort(Throwable t) {
          }
          @Override
          public void touch() {
          }
          @Override
          public ListenableFuture<T> addListener(Runnable listener, Executor exec) {
            if (exec != null) {
              exec.execute(listener);
            } else {
              listener.run();
            }
            return this;
          }
          @Override
          public CompletableFuture<T> toCompletableFuture() {
            CompletableFuture<T> future = new CompletableFuture<>();
            future.completeExceptionally(e);
            return future;
          }
        }
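      CompletedFailure implements ListenableFuture as an already-failed future: cancel returns true, isCancelled returns false, isDone returns true, and both get methods throw the stored ExecutionException. done, abort, and touch are no-ops, and addListener runs the listener immediately (on exec if it is non-null, otherwise on the calling thread).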
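To make the "already failed" behaviour concrete, here is a minimal sketch that uses only the JDK. It is a stand-in, not the library class: the names FailedFutureSketch, failed, and causeMessage are hypothetical, and for simplicity it completes the future with the raw cause rather than with a pre-built ExecutionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class FailedFutureSketch {

  // Builds a future that is failed from the start (hypothetical helper).
  static <T> CompletableFuture<T> failed(Throwable cause) {
    CompletableFuture<T> future = new CompletableFuture<>();
    future.completeExceptionally(cause);
    return future;
  }

  // Returns the message of the cause carried by the ExecutionException from get().
  static String causeMessage(CompletableFuture<?> future) {
    try {
      future.get();
      return null; // not reached for a failed future
    } catch (ExecutionException e) {
      return e.getCause().getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  public static void main(String[] args) {
    CompletableFuture<String> future = failed(new IllegalStateException("connect refused"));
    System.out.println(future.isDone());                   // true
    System.out.println(future.isCompletedExceptionally()); // true
    System.out.println(causeMessage(future));              // connect refused
  }
}
```

A caller never blocks on such a future: get() fails right away, which is why an immediate listener invocation is a reasonable choice for this kind of class.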

      NettyResponseFuture

      org/asynchttpclient/netty/NettyResponseFuture.java

      public final class NettyResponseFuture<V> implements ListenableFuture<V> {
        private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater
                .newUpdater(NettyResponseFuture.class, "redirectCount");
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater
                .newUpdater(NettyResponseFuture.class, "currentRetry");
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_DONE_FIELD = AtomicIntegerFieldUpdater
                .newUpdater(NettyResponseFuture.class, "isDone");
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_CANCELLED_FIELD = AtomicIntegerFieldUpdater
                .newUpdater(NettyResponseFuture.class, "isCancelled");
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_AUTH_FIELD = AtomicIntegerFieldUpdater
                .newUpdater(NettyResponseFuture.class, "inAuth");
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_PROXY_AUTH_FIELD = AtomicIntegerFieldUpdater
                .newUpdater(NettyResponseFuture.class, "inProxyAuth");
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CONTENT_PROCESSED_FIELD = AtomicIntegerFieldUpdater
                .newUpdater(NettyResponseFuture.class, "contentProcessed");
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<NettyResponseFuture> ON_THROWABLE_CALLED_FIELD = AtomicIntegerFieldUpdater
                .newUpdater(NettyResponseFuture.class, "onThrowableCalled");
        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> TIMEOUTS_HOLDER_FIELD = AtomicReferenceFieldUpdater
                .newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> PARTITION_KEY_LOCK_FIELD = AtomicReferenceFieldUpdater
                .newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");
        private final long start = unpreciseMillisTime();
        private final ChannelPoolPartitioning connectionPoolPartitioning;
        private final ConnectionSemaphore connectionSemaphore;
        private final ProxyServer proxyServer;
        private final int maxRetry;
        private final CompletableFuture<V> future = new CompletableFuture<>();          
                //......
        @Override
        public V get() throws InterruptedException, ExecutionException {
          return future.get();
        }
        @Override
        public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException, ExecutionException {
          return future.get(l, tu);
        }          
      }
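      NettyResponseFuture implements ListenableFuture. It updates its state fields (isDone, isCancelled, contentProcessed, onThrowableCalled, and so on) through static AtomicIntegerFieldUpdater instances, and delegates both get methods to an internal CompletableFuture.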
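The field-updater pattern used for the state flags may be unfamiliar, so here is a small sketch of it in isolation. FlagHolder and markDone are hypothetical names; only the JDK's AtomicIntegerFieldUpdater is real API. The updater needs a volatile int field and looks it up by name.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

final class FlagHolder {

  // One static updater is shared by all instances, so each instance only
  // pays for a plain int field instead of a separate AtomicInteger object.
  private static final AtomicIntegerFieldUpdater<FlagHolder> IS_DONE_FIELD =
      AtomicIntegerFieldUpdater.newUpdater(FlagHolder.class, "isDone");

  // Must be a volatile int for the updater to accept it.
  private volatile int isDone;

  // Atomically sets the flag; returns true only for the first caller.
  boolean markDone() {
    return IS_DONE_FIELD.getAndSet(this, 1) == 0;
  }

  boolean isDone() {
    return isDone != 0;
  }

  public static void main(String[] args) {
    FlagHolder holder = new FlagHolder();
    System.out.println(holder.markDone()); // true
    System.out.println(holder.markDone()); // false
    System.out.println(holder.isDone());   // true
  }
}
```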

      done

      public final void done() {
          if (terminateAndExit())
            return;
          try {
            loadContent();
          } catch (ExecutionException ignored) {
          } catch (RuntimeException t) {
            future.completeExceptionally(t);
          } catch (Throwable t) {
            future.completeExceptionally(t);
            throw t;
          }
        }
        private boolean terminateAndExit() {
          releasePartitionKeyLock();
          cancelTimeouts();
          this.channel = null;
          this.reuseChannel = false;
          return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
        }  
      private void loadContent() throws ExecutionException {
          if (future.isDone()) {
            try {
              future.get();
            } catch (InterruptedException e) {
              throw new RuntimeException("unreachable", e);
            }
          }
          // No more retry
          CURRENT_RETRY_UPDATER.set(this, maxRetry);
          if (CONTENT_PROCESSED_FIELD.getAndSet(this, 1) == 0) {
            try {
              future.complete(asyncHandler.onCompleted());
            } catch (Throwable ex) {
              if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
                try {
                  try {
                    asyncHandler.onThrowable(ex);
                  } catch (Throwable t) {
                    LOGGER.debug("asyncHandler.onThrowable", t);
                  }
                } finally {
                  cancelTimeouts();
                }
              }
              future.completeExceptionally(ex);
            }
          }
          future.getNow(null);
        }
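      done returns immediately when terminateAndExit returns true, that is, when the future was already done or cancelled. Otherwise it calls loadContent, which first calls future.get() if the internal future is already done (so an earlier failure surfaces as an ExecutionException, which done ignores), then disables further retries and, the first time content is processed, completes the future with future.complete(asyncHandler.onCompleted()). If onCompleted throws, asyncHandler.onThrowable is invoked at most once and the future is completed exceptionally.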
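The "first caller wins" guard in done can be sketched with plain JDK types. This is a simplified illustration, not the library code: DoneSketch is a hypothetical class, an AtomicBoolean replaces the field updater, and a Supplier stands in for asyncHandler.onCompleted().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class DoneSketch<V> {
  private final CompletableFuture<V> future = new CompletableFuture<>();
  private final AtomicBoolean isDone = new AtomicBoolean();
  private final AtomicInteger handlerCalls = new AtomicInteger();
  private final Supplier<V> onCompleted; // hypothetical stand-in for asyncHandler.onCompleted()

  DoneSketch(Supplier<V> onCompleted) {
    this.onCompleted = onCompleted;
  }

  void done() {
    // Plays the role of terminateAndExit(): every later caller returns here.
    if (isDone.getAndSet(true)) {
      return;
    }
    try {
      handlerCalls.incrementAndGet();
      future.complete(onCompleted.get());
    } catch (RuntimeException ex) {
      // A failing completion callback fails the future instead of escaping.
      future.completeExceptionally(ex);
    }
  }

  CompletableFuture<V> future() {
    return future;
  }

  int handlerCalls() {
    return handlerCalls.get();
  }

  public static void main(String[] args) {
    DoneSketch<String> sketch = new DoneSketch<>(() -> "200 OK");
    sketch.done();
    sketch.done(); // ignored: the flag is already set
    System.out.println(sketch.future().getNow(null)); // 200 OK
    System.out.println(sketch.handlerCalls());        // 1
  }
}
```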

      abort

      public final void abort(final Throwable t) {
          if (terminateAndExit())
            return;
          future.completeExceptionally(t);
          if (ON_THROWABLE_CALLED_FIELD.compareAndSet(this, 0, 1)) {
            try {
              asyncHandler.onThrowable(t);
            } catch (Throwable te) {
              LOGGER.debug("asyncHandler.onThrowable", te);
            }
          }
        }
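      abort likewise returns immediately when terminateAndExit returns true. Otherwise it calls future.completeExceptionally(t) and then, if onThrowable has not been called yet, invokes asyncHandler.onThrowable(t); any exception thrown by that callback is only logged at debug level.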
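A reduced sketch of the abort path, again with JDK types only. AbortSketch is hypothetical, and a counter stands in for the call to asyncHandler.onThrowable(t). It shows that the first abort decides the outcome and that the error callback fires once.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class AbortSketch {
  private final CompletableFuture<String> future = new CompletableFuture<>();
  private final AtomicBoolean isDone = new AtomicBoolean();
  private final AtomicBoolean onThrowableCalled = new AtomicBoolean();
  private final AtomicInteger onThrowableCount = new AtomicInteger();

  void abort(Throwable t) {
    if (isDone.getAndSet(true)) {
      return; // already terminated by an earlier done or abort
    }
    future.completeExceptionally(t);
    // compareAndSet ensures the callback cannot run twice, even if another
    // code path also reports the failure.
    if (onThrowableCalled.compareAndSet(false, true)) {
      onThrowableCount.incrementAndGet(); // stand-in for asyncHandler.onThrowable(t)
    }
  }

  int onThrowableCount() {
    return onThrowableCount.get();
  }

  // Message of the failure, or null if the future has not failed (non-blocking).
  String failureMessage() {
    return future
        .handle((value, error) -> error == null ? null : error.getMessage())
        .getNow(null);
  }

  public static void main(String[] args) {
    AbortSketch sketch = new AbortSketch();
    sketch.abort(new IllegalStateException("channel closed"));
    sketch.abort(new IllegalStateException("second failure")); // ignored
    System.out.println(sketch.failureMessage());   // channel closed
    System.out.println(sketch.onThrowableCount()); // 1
  }
}
```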

      touch

      public void touch() {
          touch = unpreciseMillisTime();
        }
      touch stores the current timestamp (unpreciseMillisTime()) in the touch field.
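The article does not show where the touch timestamp is read. As an assumption, a timestamp like this is typically compared against a read timeout to decide whether a request has gone idle. The sketch below illustrates that idea only; TouchSketch, isIdleExpired, and the explicit clock parameters are all hypothetical.

```java
final class TouchSketch {

  // Last time any activity was seen, in milliseconds.
  private volatile long lastTouch;

  TouchSketch(long nowMillis) {
    this.lastTouch = nowMillis;
  }

  // Records activity; the clock value is passed in to keep the sketch deterministic.
  void touch(long nowMillis) {
    lastTouch = nowMillis;
  }

  // Assumed check: idle for at least readTimeoutMillis since the last touch.
  boolean isIdleExpired(long nowMillis, long readTimeoutMillis) {
    return nowMillis - lastTouch >= readTimeoutMillis;
  }

  public static void main(String[] args) {
    TouchSketch sketch = new TouchSketch(0L);
    System.out.println(sketch.isIdleExpired(5_000L, 3_000L)); // true
    sketch.touch(4_000L);                                     // activity resets the idle window
    System.out.println(sketch.isIdleExpired(5_000L, 3_000L)); // false
  }
}
```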

      addListener

      public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
          if (exec == null) {
            exec = Runnable::run;
          }
          future.whenCompleteAsync((r, v) -> listener.run(), exec);
          return this;
        }
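      addListener registers the listener with future.whenCompleteAsync((r, v) -> listener.run(), exec); when exec is null it falls back to Runnable::run, so the listener runs on the thread that completes the future.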
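The effect of the Runnable::run fallback can be observed with a plain CompletableFuture. In this sketch ListenerSketch and demo are hypothetical names, while addListener mirrors the logic of the method quoted above. With a null executor, the listener runs synchronously inside the call to complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class ListenerSketch {

  // Same shape as the addListener method above, applied to a bare CompletableFuture.
  static <V> void addListener(CompletableFuture<V> future, Runnable listener, Executor exec) {
    if (exec == null) {
      exec = Runnable::run; // "executor" that runs the task on the calling thread
    }
    future.whenCompleteAsync((result, error) -> listener.run(), exec);
  }

  // Records the order of events around completion.
  static List<String> demo() {
    List<String> events = new ArrayList<>();
    CompletableFuture<String> future = new CompletableFuture<>();
    addListener(future, () -> events.add("listener"), null);
    events.add("before complete");
    future.complete("body"); // the listener fires inside this call
    events.add("after complete");
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [before complete, listener, after complete]
  }
}
```

Passing a real executor instead of null moves the listener off the completing thread, which matters when that thread is an I/O event loop that should not be blocked.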

      toCompletableFuture

      public CompletableFuture<V> toCompletableFuture() {
          return future;
        }
      toCompletableFuture returns the internal future directly.
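Getting a CompletableFuture back is mainly useful for composition. The sketch below shows the kind of chaining this enables; ComposeSketch and bodyLength are hypothetical, and the CompletableFuture<String> parameter stands in for whatever toCompletableFuture() would return for a real request.

```java
import java.util.concurrent.CompletableFuture;

final class ComposeSketch {

  // Maps a response body to its length, falling back to -1 on failure.
  static CompletableFuture<Integer> bodyLength(CompletableFuture<String> responseFuture) {
    return responseFuture
        .thenApply(String::length)
        .exceptionally(error -> -1);
  }

  public static void main(String[] args) {
    CompletableFuture<String> ok = new CompletableFuture<>();
    CompletableFuture<Integer> length = bodyLength(ok);
    ok.complete("hello");
    System.out.println(length.join()); // 5

    CompletableFuture<String> failed = new CompletableFuture<>();
    CompletableFuture<Integer> fallback = bodyLength(failed);
    failed.completeExceptionally(new IllegalStateException("timeout"));
    System.out.println(fallback.join()); // -1
  }
}
```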

      newNettyResponseFuture

      org/asynchttpclient/netty/request/NettyRequestSender.java

      private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request,
                                                                  AsyncHandler<T> asyncHandler,
                                                                  NettyRequest nettyRequest,
                                                                  ProxyServer proxyServer) {
          NettyResponseFuture<T> future = new NettyResponseFuture<>(
                  request,
                  asyncHandler,
                  nettyRequest,
                  config.getMaxRequestRetry(),
                  request.getChannelPoolPartitioning(),
                  connectionSemaphore,
                  proxyServer);
          String expectHeader = request.getHeaders().get(EXPECT);
          if (HttpHeaderValues.CONTINUE.contentEqualsIgnoreCase(expectHeader))
            future.setDontWriteBodyBecauseExpectContinue(true);
          return future;
        }
        private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,
                                                                           AsyncHandler<T> asyncHandler,
                                                                           NettyResponseFuture<T> future,
                                                                           ProxyServer proxyServer,
                                                                           boolean performConnectRequest) {
          NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,
                  performConnectRequest);
          Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);
          return Channels.isChannelActive(channel)
                  ? sendRequestWithOpenChannel(newFuture, asyncHandler, channel)
                  : sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
        }
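      NettyRequestSender's newNettyResponseFuture creates the NettyResponseFuture and, when the request carries an Expect: 100-continue header, flags it via setDontWriteBodyBecauseExpectContinue(true). sendRequestWithCertainForceConnect then passes the NettyResponseFuture to sendRequestWithOpenChannel or sendRequestWithNewChannel, depending on whether an active channel is available.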

      Summary

      AsyncHttpClient's ListenableFuture extends java.util.concurrent.Future and adds the methods done, abort, touch, addListener, and toCompletableFuture. It has two implementations, CompletedFailure and NettyResponseFuture. NettyRequestSender creates the NettyResponseFuture and, in sendRequestWithCertainForceConnect, passes it to sendRequestWithOpenChannel or sendRequestWithNewChannel to send the request.