AsyncHttpClient RequestFilter请求筛选源码解读
目录
- 序
- RequestFilter
- ThrottleRequestFilter
- ReleasePermitOnComplete
- preProcessRequest
- executeRequest
- 小结
序
本文主要研究一下AsyncHttpClient的RequestFilter
RequestFilter
org/asynchttpclient/filter/RequestFilter.Java
/** * A Filter interface that gets invoked before making an actual request. */ public interface RequestFilter { /** * An {@link org.asynchttpclient.AsyncHttpClient} will invoke {@link RequestFilter#filter} and will use the * returned {@link FilterContext#getRequest()} and {@link FilterContext#getAsyncHandler()} to continue the request * processing. * * @param ctx a {@link FilterContext} * @param <T> the handler result type * @return {@link FilterContext}. The {@link FilterContext} instance may not the same as the original one. * @throws FilterException to interrupt the filter processing. */ <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException; }
RequestFilter定义了filter方法
ThrottleRequestFilter
org/asynchttpclient/filter/ThrottleRequestFilter.java
/** * A {@link org.asynchttpclient.filter.RequestFilter} throttles requests and block when the number of permits is reached, * waiting for the response to arrives before executing the next request. */ public class ThrottleRequestFilter implements RequestFilter { private statipythonc final Logger logger = LoggerFactory.getLogger(ThrottleRequestFilter.class); private final Semaphore available; private final int maxWait; public ThrottleRequestFilter(int maxConnections) { this(maxConnections, Integer.MAX_VALUE); } public ThrottleRequestFilter(int maxConnections, int maxWait) { this(maxConnections, maxWait, false); } public ThrottleRequestFilter(int maxConnections, int maxWait, boolean fair) { this.maxWait = maxWait; available = new Semaphore(maxConnections, fair); } /** * {@inheritDoc} */ @Override public <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException { try { if (logger.isDebugEnabled()) { logger.debug("Current Throttling Status {}", available.availablePermits()); } if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) { throw new FilterException(String.format("No slot available for processing Request %s with AsyncHandler %s", ctx.getRequest(), ctx.getAsyncHandler())); } } catch (InterruptedException e) { throw new FilterException(String.format("Interrupted Request %s with AsyncHandler %s", ctx.getRequest(), ctx.getAsyncHandler())); } return new FilterContext.FilterContextBuilder<>(ctx) .asyncHandler(ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available)) .build(); } }
ThrottleRequestFilter实现了RequestFilter接口,它使用Semaphore来对request进行限流,限流不通过抛出FilterException,若通过则通过ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available)包装一下asyncHandler以释放信号量ReleasePermitOnComplete
ReleasePermitOnComplete
org/asynchttpclient/filter/ReleasePermitOnComplete.java
/** * Wrapper for {@link AsyncHandler}s to release a permit on {@link编程客栈 AsyncHandler#onCompleted()}. This is done via a dynamic proxy to preserve all interfaces of the wrapped handler. */ public class ReleasePermitOnComplete { /** * Wrap handler to release the permit of the semaphore on {@link AsyncHandler#onCompleted()}. * * @param handler the handler to be wrapped * @param available the Semaphore to be released when the wrapped handler is completed * @param <T> the handler result type * @return the wrapped handler */ @SuppressWarnings("unchecked") public static <T> AsyncHandler<T> wrap(final AsyncHandler<T> handler, final Semaphore available) { Class<?> handlerClass = handler.getClass(); ClassLoader classLoader = handlerClass.getClassLoader(); Class<?>[] interfaces = allInterfaces(handlerClass); return (AsyncHandler<T>) Proxy.newproxyInstance(classLoader, interfaces, (proxy, method, args) -> { try { return method.invoke(handler, args); } finally { switch (method.getName()) { case "onCompleted": case "onThrowable": available.release(); default: } } }); } //...... }
ReleasePermitOnComplete的wrap对原来的handler进行代理,在finally里头执行available.release()
preProcessRequest
org/asynchttpclient/DefaultAsyncHttpClient.java
/** * Configure and execute the associated {@link RequestFilter}. This class * may decorate the {@link Request} and {@link AsyncHandler} * * @param fc {@link FilterContext} * @return {@link FilterContext} */ private <T> FilterContext<T> preProcessRequest(FilterContext<T> fc) throws FilterException { for (RequestFilter asyncFilter : config.getRequestFilters()) { fc = asyncFilter.filter(fc); assertNotNull(fc, "filterContext"); } Request request = fc.getRequest(); if (fc.getAsyncHandler() instanceof ResumableAsyncHandler) { request = ResumableAsyncHandler.class.cast(fc.getAsyncHandler()).adjustRequestRange(request); } if (request.getRangeOffset() != 0) { RequestBuilder builder = new RequestBuilder(request); builder.setHeader("Range", "bytes=" + request.getRangeOffset() + "-"); request = builder.build(); } fc = new FilterContext.FilterContextBuilder<>(fc).requestjavascript(request).build(); return fc; }
DefaultAsyncHttpClient的preProcessRequest方法遍历config.getRequestFilters(),挨个执行asyncFilter.filter(fc)
executeRequest
org/asynchttpclient/DefaultAsyncHttpClient.java
public <T> ListenableFuture<T> executeRequest(Request request, AsyncHandler<编程;T> handler) { if (config.getCookieStore() != null) { try { List<Cookie> cookies = config.getCookieStore().get(request.getUri()); if (!cookies.isEmpty()) { RequestBuilder requestBuilder = new RequestBuilder(request); for (Cookie cookie : cookies) { requestBuilder.addOrReplaceCookie(cookie); } request = requestBuilder.build(); } } catch (Exception e) { handler.onThrowable(e); return new ListenableFuture.CompletedFailure<>("Failed to set cookies of request", e); } } if (noRequestFilters) { return execute(request, handler); } else { FilterContext<T> fc = new FilterContext.FilterContextBuilder<T>().asyncHandler(handler).request(request).build(); try { fc = preProcessRequest(fc); } catch (Exception e) { handler.onThrowable(e); return new ListenableFuture.CompletedFailure<>("preProcessRequest failed", e); } return execute(fc.getRequest(), fc.getAsyncHandler()); } }
executeRequest方法对于noRequestFilters为false会执行preProcessRequest
小结
AsyncHttpClient的RequestFilter定义了filter方法,它有一个实现类为ThrottleRequestFilter,使用信号量用于对请求进行限流;
DefaultArOXDmsyncHttpClient的executeRequest方法对于noRequestFilters为false会执行preProcessRequest,而preProcessRequest方法遍历config.getRequestFilters(),挨个执行asyncFilter.filter(fc)。
以上就是AsyncHttpClient RequestFilter请求筛选源码解读的详细内容,更多关于AsyncHttpClient RequestFilter的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论