diff --git a/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java b/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java index 216dc4ed67..e6e9a1d913 100644 --- a/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java @@ -277,6 +277,62 @@ public interface AsyncHttpClientConfig { */ boolean isFilterInsecureCipherSuites(); + /** + * @return true if HTTP/2 is enabled (negotiated via ALPN for HTTPS connections) + */ + default boolean isHttp2Enabled() { + return true; + } + + /** + * @return the HTTP/2 initial window size in bytes, defaults to 65535 + */ + default int getHttp2InitialWindowSize() { + return 65_535; + } + + /** + * @return the HTTP/2 max frame size in bytes, must be between 16384 and 16777215 per RFC 7540 §4.2 + */ + default int getHttp2MaxFrameSize() { + return 16_384; + } + + /** + * @return the HTTP/2 HPACK header table size in bytes, defaults to 4096 + */ + default int getHttp2HeaderTableSize() { + return 4_096; + } + + /** + * @return the HTTP/2 max header list size in bytes, defaults to 8192 + */ + default int getHttp2MaxHeaderListSize() { + return 8_192; + } + + /** + * @return the HTTP/2 max concurrent streams per connection, -1 means unlimited (server-controlled) + */ + default int getHttp2MaxConcurrentStreams() { + return -1; + } + + /** + * @return the interval between HTTP/2 PING keepalive frames, {@link Duration#ZERO} disables pinging + */ + default Duration getHttp2PingInterval() { + return Duration.ZERO; + } + + /** + * @return true if cleartext HTTP/2 (h2c) via prior knowledge is enabled for non-TLS connections + */ + default boolean isHttp2CleartextEnabled() { + return false; + } + /** * @return the size of the SSL session cache, 0 means using the default value */ diff --git a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java index 1c7dbf37f8..4500d0a24f 100644 --- a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java +++ b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java @@ -101,6 +101,13 @@ import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultUserAgent; import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultValidateResponseHeaders; import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultWebSocketMaxBufferSize; +import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2CleartextEnabled; +import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2HeaderTableSize; +import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2InitialWindowSize; +import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2MaxConcurrentStreams; +import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2MaxFrameSize; +import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2MaxHeaderListSize; +import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2PingInterval; import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultWebSocketMaxFrameSize; /** @@ -166,6 +173,14 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig { private final int sslSessionTimeout; private final @Nullable SslContext sslContext; private final @Nullable SslEngineFactory sslEngineFactory; + private final boolean http2Enabled; + private final int http2InitialWindowSize; + private final int http2MaxFrameSize; + private final int http2HeaderTableSize; + private final int http2MaxHeaderListSize; + private final int http2MaxConcurrentStreams; + private final Duration http2PingInterval; + private final boolean http2CleartextEnabled; // filters private final List requestFilters; @@ -253,6 +268,14 @@ private DefaultAsyncHttpClientConfig(// http int sslSessionTimeout, @Nullable SslContext sslContext, @Nullable SslEngineFactory sslEngineFactory, + boolean http2Enabled, + int http2InitialWindowSize, + int http2MaxFrameSize, + int http2HeaderTableSize, + int http2MaxHeaderListSize, + int http2MaxConcurrentStreams, + Duration http2PingInterval, + boolean http2CleartextEnabled, // filters List requestFilters, @@ -348,6 +371,14 @@ private DefaultAsyncHttpClientConfig(// http this.sslSessionTimeout = sslSessionTimeout; this.sslContext = sslContext; this.sslEngineFactory = sslEngineFactory; + this.http2Enabled = http2Enabled; + this.http2InitialWindowSize = http2InitialWindowSize; + this.http2MaxFrameSize = http2MaxFrameSize; + this.http2HeaderTableSize = http2HeaderTableSize; + this.http2MaxHeaderListSize = http2MaxHeaderListSize; + this.http2MaxConcurrentStreams = http2MaxConcurrentStreams; + this.http2PingInterval = http2PingInterval; + this.http2CleartextEnabled = http2CleartextEnabled; // filters this.requestFilters = requestFilters; @@ -382,6 +413,14 @@ private DefaultAsyncHttpClientConfig(// http throw new IllegalArgumentException("Native Transport must be enabled to use Epoll Native Transport only"); } + if (http2MaxFrameSize < 16384 || http2MaxFrameSize > 16777215) { + throw new IllegalArgumentException("HTTP/2 max frame size must be between 16384 and 16777215 per RFC 7540 §4.2"); + } + + if (http2InitialWindowSize < 0) { + throw new IllegalArgumentException("HTTP/2 initial window size must be non-negative"); + } + this.allocator = allocator; this.nettyTimer = nettyTimer; this.threadFactory = threadFactory; @@ -608,6 +647,46 @@ public boolean isFilterInsecureCipherSuites() { return filterInsecureCipherSuites; } + @Override + public boolean isHttp2Enabled() { + return http2Enabled; + } + + @Override + public int getHttp2InitialWindowSize() { + return http2InitialWindowSize; + } + + @Override + public int getHttp2MaxFrameSize() { + return http2MaxFrameSize; + } + + @Override + public int getHttp2HeaderTableSize() { + return http2HeaderTableSize; + } + + @Override + public int getHttp2MaxHeaderListSize() { + return http2MaxHeaderListSize; + } + + @Override + public int getHttp2MaxConcurrentStreams() { + return http2MaxConcurrentStreams; + } + + @Override + public Duration getHttp2PingInterval() { + return http2PingInterval; + } + + @Override + public boolean isHttp2CleartextEnabled() { + return http2CleartextEnabled; + } + @Override public int getSslSessionCacheSize() { return sslSessionCacheSize; @@ -847,6 +926,14 @@ public static class Builder { private int sslSessionTimeout = defaultSslSessionTimeout(); private @Nullable SslContext sslContext; private @Nullable SslEngineFactory sslEngineFactory; + private boolean http2Enabled = true; + private int http2InitialWindowSize = defaultHttp2InitialWindowSize(); + private int http2MaxFrameSize = defaultHttp2MaxFrameSize(); + private int http2HeaderTableSize = defaultHttp2HeaderTableSize(); + private int http2MaxHeaderListSize = defaultHttp2MaxHeaderListSize(); + private int http2MaxConcurrentStreams = defaultHttp2MaxConcurrentStreams(); + private Duration http2PingInterval = defaultHttp2PingInterval(); + private boolean http2CleartextEnabled = defaultHttp2CleartextEnabled(); // cookie store private CookieStore cookieStore = new ThreadSafeCookieStore(); @@ -939,6 +1026,14 @@ public Builder(AsyncHttpClientConfig config) { sslSessionTimeout = config.getSslSessionTimeout(); sslContext = config.getSslContext(); sslEngineFactory = config.getSslEngineFactory(); + http2Enabled = config.isHttp2Enabled(); + http2InitialWindowSize = config.getHttp2InitialWindowSize(); + http2MaxFrameSize = config.getHttp2MaxFrameSize(); + http2HeaderTableSize = config.getHttp2HeaderTableSize(); + http2MaxHeaderListSize = config.getHttp2MaxHeaderListSize(); + http2MaxConcurrentStreams = config.getHttp2MaxConcurrentStreams(); + http2PingInterval = config.getHttp2PingInterval(); + http2CleartextEnabled = config.isHttp2CleartextEnabled(); // filters requestFilters.addAll(config.getRequestFilters()); @@ -1254,6 +1349,46 @@ public Builder setSslEngineFactory(SslEngineFactory sslEngineFactory) { return this; } + public Builder setHttp2Enabled(boolean http2Enabled) { + this.http2Enabled = http2Enabled; + return this; + } + + public Builder setHttp2InitialWindowSize(int http2InitialWindowSize) { + this.http2InitialWindowSize = http2InitialWindowSize; + return this; + } + + public Builder setHttp2MaxFrameSize(int http2MaxFrameSize) { + this.http2MaxFrameSize = http2MaxFrameSize; + return this; + } + + public Builder setHttp2HeaderTableSize(int http2HeaderTableSize) { + this.http2HeaderTableSize = http2HeaderTableSize; + return this; + } + + public Builder setHttp2MaxHeaderListSize(int http2MaxHeaderListSize) { + this.http2MaxHeaderListSize = http2MaxHeaderListSize; + return this; + } + + public Builder setHttp2MaxConcurrentStreams(int http2MaxConcurrentStreams) { + this.http2MaxConcurrentStreams = http2MaxConcurrentStreams; + return this; + } + + public Builder setHttp2PingInterval(Duration http2PingInterval) { + this.http2PingInterval = http2PingInterval; + return this; + } + + public Builder setHttp2CleartextEnabled(boolean http2CleartextEnabled) { + this.http2CleartextEnabled = http2CleartextEnabled; + return this; + } + // filters public Builder addRequestFilter(RequestFilter requestFilter) { requestFilters.add(requestFilter); @@ -1486,6 +1621,14 @@ public DefaultAsyncHttpClientConfig build() { sslSessionTimeout, sslContext, sslEngineFactory, + http2Enabled, + http2InitialWindowSize, + http2MaxFrameSize, + http2HeaderTableSize, + http2MaxHeaderListSize, + http2MaxConcurrentStreams, + http2PingInterval, + http2CleartextEnabled, requestFilters.isEmpty() ? Collections.emptyList() : Collections.unmodifiableList(requestFilters), responseFilters.isEmpty() ? Collections.emptyList() : Collections.unmodifiableList(responseFilters), ioExceptionFilters.isEmpty() ? Collections.emptyList() : Collections.unmodifiableList(ioExceptionFilters), diff --git a/client/src/main/java/org/asynchttpclient/HttpProtocol.java b/client/src/main/java/org/asynchttpclient/HttpProtocol.java new file mode 100644 index 0000000000..12e31789de --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/HttpProtocol.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient; + +/** + * HTTP protocol version used for a request/response exchange. + */ +public enum HttpProtocol { + + HTTP_1_0("HTTP/1.0"), + HTTP_1_1("HTTP/1.1"), + HTTP_2("HTTP/2"); + + private final String text; + + HttpProtocol(String text) { + this.text = text; + } + + /** + * @return the protocol version string (e.g. "HTTP/1.1", "HTTP/2") + */ + public String getText() { + return text; + } + + @Override + public String toString() { + return text; + } +} diff --git a/client/src/main/java/org/asynchttpclient/Response.java b/client/src/main/java/org/asynchttpclient/Response.java index 220d989b09..77512094d2 100644 --- a/client/src/main/java/org/asynchttpclient/Response.java +++ b/client/src/main/java/org/asynchttpclient/Response.java @@ -169,6 +169,15 @@ public interface Response { */ boolean hasResponseBody(); + /** + * Return the HTTP protocol version used for this response. + * + * @return the protocol, defaults to {@link HttpProtocol#HTTP_1_1} + */ + default HttpProtocol getProtocol() { + return HttpProtocol.HTTP_1_1; + } + /** * Get the remote address that the client initiated the request to. * diff --git a/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java b/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java index 3596c67a92..4f97926bdb 100644 --- a/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java +++ b/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java @@ -83,6 +83,13 @@ public final class AsyncHttpClientConfigDefaults { public static final String HASHED_WHEEL_TIMER_TICK_DURATION = "hashedWheelTimerTickDuration"; public static final String HASHED_WHEEL_TIMER_SIZE = "hashedWheelTimerSize"; public static final String EXPIRED_COOKIE_EVICTION_DELAY = "expiredCookieEvictionDelay"; + public static final String HTTP2_INITIAL_WINDOW_SIZE_CONFIG = "http2InitialWindowSize"; + public static final String HTTP2_MAX_FRAME_SIZE_CONFIG = "http2MaxFrameSize"; + public static final String HTTP2_HEADER_TABLE_SIZE_CONFIG = "http2HeaderTableSize"; + public static final String HTTP2_MAX_HEADER_LIST_SIZE_CONFIG = "http2MaxHeaderListSize"; + public static final String HTTP2_MAX_CONCURRENT_STREAMS_CONFIG = "http2MaxConcurrentStreams"; + public static final String HTTP2_PING_INTERVAL_CONFIG = "http2PingInterval"; + public static final String HTTP2_CLEARTEXT_ENABLED_CONFIG = "http2CleartextEnabled"; public static final String AHC_VERSION; @@ -332,4 +339,32 @@ public static int defaultHashedWheelTimerSize() { public static int defaultExpiredCookieEvictionDelay() { return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + EXPIRED_COOKIE_EVICTION_DELAY); } + + public static int defaultHttp2InitialWindowSize() { + return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_INITIAL_WINDOW_SIZE_CONFIG); + } + + public static int defaultHttp2MaxFrameSize() { + return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_MAX_FRAME_SIZE_CONFIG); + } + + public static int defaultHttp2HeaderTableSize() { + return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_HEADER_TABLE_SIZE_CONFIG); + } + + public static int defaultHttp2MaxHeaderListSize() { + return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_MAX_HEADER_LIST_SIZE_CONFIG); + } + + public static int defaultHttp2MaxConcurrentStreams() { + return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_MAX_CONCURRENT_STREAMS_CONFIG); + } + + public static Duration defaultHttp2PingInterval() { + return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getDuration(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_PING_INTERVAL_CONFIG); + } + + public static boolean defaultHttp2CleartextEnabled() { + return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getBoolean(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_CLEARTEXT_ENABLED_CONFIG); + } } diff --git a/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java b/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java index 61fb15161c..31c7cb2f04 100755 --- a/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java +++ b/client/src/main/java/org/asynchttpclient/netty/NettyResponse.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.cookie.ClientCookieDecoder; import io.netty.handler.codec.http.cookie.Cookie; +import org.asynchttpclient.HttpProtocol; import org.asynchttpclient.HttpResponseBodyPart; import org.asynchttpclient.HttpResponseStatus; import org.asynchttpclient.Response; @@ -158,6 +159,20 @@ public List getCookies() { } + @Override + public HttpProtocol getProtocol() { + if (status == null) { + return HttpProtocol.HTTP_1_1; + } + int major = status.getProtocolMajorVersion(); + if (major == 2) { + return HttpProtocol.HTTP_2; + } else if (status.getProtocolMinorVersion() == 0) { + return HttpProtocol.HTTP_1_0; + } + return HttpProtocol.HTTP_1_1; + } + @Override public boolean hasResponseStatus() { return status != null; @@ -223,6 +238,7 @@ public InputStream getResponseBodyAsStream() { public String toString() { StringBuilder sb = new StringBuilder(); sb.append(getClass().getSimpleName()).append(" {\n") + .append("\tprotocol=").append(getProtocol()).append('\n') .append("\tstatusCode=").append(getStatusCode()).append('\n') .append("\theaders=\n"); diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index e59daadd1c..0af03dfd41 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -20,6 +20,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -34,6 +36,15 @@ import io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder; import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; +import io.netty.handler.codec.http2.DefaultHttp2ResetFrame; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.codec.http2.Http2GoAwayFrame; +import io.netty.handler.codec.http2.Http2SettingsFrame; +import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.proxy.ProxyHandler; @@ -41,6 +52,7 @@ import io.netty.handler.proxy.Socks5ProxyHandler; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.resolver.NameResolver; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; @@ -61,6 +73,8 @@ import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.OnLastHttpContentCallback; import org.asynchttpclient.netty.handler.AsyncHttpClientHandler; +import org.asynchttpclient.netty.handler.Http2Handler; +import org.asynchttpclient.netty.handler.Http2PingHandler; import org.asynchttpclient.netty.handler.HttpHandler; import org.asynchttpclient.netty.handler.WebSocketHandler; import org.asynchttpclient.netty.request.NettyRequestSender; @@ -77,6 +91,7 @@ import java.net.InetSocketAddress; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -96,6 +111,9 @@ public class ChannelManager { public static final String AHC_HTTP_HANDLER = "ahc-http"; public static final String AHC_WS_HANDLER = "ahc-ws"; public static final String LOGGING_HANDLER = "logging"; + public static final String HTTP2_FRAME_CODEC = "http2-frame-codec"; + public static final String HTTP2_MULTIPLEX = "http2-multiplex"; + public static final String AHC_HTTP2_HANDLER = "ahc-http2"; private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class); private final AsyncHttpClientConfig config; private final SslEngineFactory sslEngineFactory; @@ -107,8 +125,10 @@ public class ChannelManager { private final ChannelPool channelPool; private final ChannelGroup openChannels; + private final ConcurrentHashMap http2Connections = new ConcurrentHashMap<>(); private AsyncHttpClientHandler wsHandler; + private Http2Handler http2Handler; private boolean isInstanceof(Object object, String name) { final Class clazz; @@ -239,6 +259,7 @@ private static Bootstrap newBootstrap(ChannelFactory channelF public void configureBootstraps(NettyRequestSender requestSender) { final AsyncHttpClientHandler httpHandler = new HttpHandler(config, this, requestSender); wsHandler = new WebSocketHandler(config, this, requestSender); + http2Handler = new Http2Handler(config, this, requestSender); httpBootstrap.handler(new ChannelInitializer() { @Override @@ -321,6 +342,59 @@ public final void tryToOfferChannelToPool(Channel channel, AsyncHandler async } } + /** + * Registers an HTTP/2 connection in the registry for the given partition key. + * The connection stays in the registry (not the regular pool) to allow multiplexing — + * multiple requests can share the same connection concurrently. + */ + public void registerHttp2Connection(Object partitionKey, Channel channel) { + Http2ConnectionState state = channel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get(); + if (state != null) { + state.setPartitionKey(partitionKey); + } + http2Connections.put(partitionKey, channel); + // Auto-remove from registry when the connection closes + channel.closeFuture().addListener(future -> removeHttp2Connection(partitionKey, channel)); + } + + /** + * Removes an HTTP/2 connection from the registry, but only if it's the currently registered + * connection for that partition key (avoids removing a replacement connection). + */ + public void removeHttp2Connection(Object partitionKey, Channel channel) { + http2Connections.remove(partitionKey, channel); + } + + /** + * Returns an active, non-draining HTTP/2 connection for the given partition key, or {@code null}. + * Unlike the regular pool, this does NOT remove the connection — it remains available for + * concurrent multiplexed requests. + */ + public Channel pollHttp2Connection(Object partitionKey) { + Channel channel = http2Connections.get(partitionKey); + if (channel == null) { + return null; + } + if (!channel.isActive()) { + http2Connections.remove(partitionKey, channel); + return null; + } + Http2ConnectionState state = channel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get(); + if (state != null && state.isDraining()) { + return null; + } + return channel; + } + + /** + * Polls for an HTTP/2 connection by URI/virtualHost/proxy, using the same partition key logic + * as the regular pool. Returns the connection without removing it from the registry. + */ + public Channel pollHttp2(Uri uri, String virtualHost, ProxyServer proxy, ChannelPoolPartitioning connectionPoolPartitioning) { + Object partitionKey = connectionPoolPartitioning.getPartitionKey(uri, virtualHost, proxy); + return pollHttp2Connection(partitionKey); + } + public Channel poll(Uri uri, String virtualHost, ProxyServer proxy, ChannelPoolPartitioning connectionPoolPartitioning) { Object partitionKey = connectionPoolPartitioning.getPartitionKey(uri, virtualHost, proxy); return channelPool.poll(partitionKey); @@ -331,6 +405,7 @@ public void removeAll(Channel connection) { } private void doClose() { + http2Connections.clear(); ChannelGroupFuture groupFuture = openChannels.close(); channelPool.destroy(); groupFuture.addListener(future -> sslEngineFactory.destroy()); @@ -549,6 +624,135 @@ protected void initChannel(Channel channel) throws Exception { return promise; } + /** + * Checks whether the given channel is an HTTP/2 connection (i.e. has the HTTP/2 multiplex handler installed). + */ + public static boolean isHttp2(Channel channel) { + return channel.pipeline().get(HTTP2_MULTIPLEX) != null; + } + + /** + * Checks whether the given channel is an HTTP/2 stream child channel. + * Stream channels are single-use and don't support HTTP/1.1 operations like draining or pipeline modification. + */ + public static boolean isHttp2StreamChannel(Channel channel) { + return channel instanceof Http2StreamChannel; + } + + /** + * Returns the shared {@link Http2Handler} instance for use with stream child channels. + */ + public Http2Handler getHttp2Handler() { + return http2Handler; + } + + /** + * Upgrades the pipeline from HTTP/1.1 to HTTP/2 after ALPN negotiates "h2". + * Removes HTTP/1.1 handlers and adds {@link Http2FrameCodec} + {@link Http2MultiplexHandler}. + * The per-stream {@link Http2Handler} is added separately on each stream child channel. + */ + public void upgradePipelineToHttp2(ChannelPipeline pipeline) { + // Remove HTTP/1.1 specific handlers + if (pipeline.get(HTTP_CLIENT_CODEC) != null) { + pipeline.remove(HTTP_CLIENT_CODEC); + } + if (pipeline.get(INFLATER_HANDLER) != null) { + pipeline.remove(INFLATER_HANDLER); + } + if (pipeline.get(CHUNKED_WRITER_HANDLER) != null) { + pipeline.remove(CHUNKED_WRITER_HANDLER); + } + if (pipeline.get(AHC_HTTP_HANDLER) != null) { + pipeline.remove(AHC_HTTP_HANDLER); + } + + // Add HTTP/2 frame codec (handles connection preface, SETTINGS, PING, flow control, etc.) + Http2Settings settings = new Http2Settings() + .initialWindowSize(config.getHttp2InitialWindowSize()) + .maxFrameSize(config.getHttp2MaxFrameSize()) + .headerTableSize(config.getHttp2HeaderTableSize()) + .maxHeaderListSize(config.getHttp2MaxHeaderListSize()); + + Http2FrameCodec frameCodec = Http2FrameCodecBuilder.forClient() + .initialSettings(settings) + .build(); + + // Http2MultiplexHandler creates a child channel per HTTP/2 stream. + // Server-push streams are rejected with RST_STREAM(REFUSED_STREAM). + Http2MultiplexHandler multiplexHandler = new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + // Reject server push by sending RST_STREAM(REFUSED_STREAM) + ch.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.REFUSED_STREAM)) + .addListener(f -> ch.close()); + } + }); + + pipeline.addLast(HTTP2_FRAME_CODEC, frameCodec); + pipeline.addLast(HTTP2_MULTIPLEX, multiplexHandler); + + // Attach HTTP/2 connection state for MAX_CONCURRENT_STREAMS tracking and GOAWAY draining + Http2ConnectionState state = new Http2ConnectionState(); + int configMaxStreams = config.getHttp2MaxConcurrentStreams(); + if (configMaxStreams > 0) { + state.updateMaxConcurrentStreams(configMaxStreams); + } + pipeline.channel().attr(Http2ConnectionState.HTTP2_STATE_KEY).set(state); + + // Install SETTINGS listener to update MAX_CONCURRENT_STREAMS from server + pipeline.addLast("http2-settings-listener", new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Http2SettingsFrame) { + Http2SettingsFrame settingsFrame = (Http2SettingsFrame) msg; + Long maxStreams = settingsFrame.settings().maxConcurrentStreams(); + if (maxStreams != null) { + Http2ConnectionState connState = ctx.channel().attr(Http2ConnectionState.HTTP2_STATE_KEY).get(); + if (connState != null) { + connState.updateMaxConcurrentStreams(maxStreams.intValue()); + } + } + } + ctx.fireChannelRead(msg); + } + }); + + // Install GOAWAY handler on the parent channel to mark the connection as draining + // and remove it from the HTTP/2 registry. GOAWAY is a connection-level frame that + // arrives on the parent channel, not on stream child channels. + pipeline.addLast("http2-goaway-listener", new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Http2GoAwayFrame) { + Http2GoAwayFrame goAwayFrame = (Http2GoAwayFrame) msg; + int lastStreamId = goAwayFrame.lastStreamId(); + Http2ConnectionState connState = ctx.channel().attr(Http2ConnectionState.HTTP2_STATE_KEY).get(); + if (connState != null) { + connState.setDraining(lastStreamId); + Object pk = connState.getPartitionKey(); + if (pk != null) { + removeHttp2Connection(pk, ctx.channel()); + } + } + LOGGER.debug("HTTP/2 GOAWAY received on {}, lastStreamId={}, errorCode={}", + ctx.channel(), lastStreamId, goAwayFrame.errorCode()); + // Close the connection when no more active streams + if (connState != null && connState.getActiveStreams() <= 0) { + closeChannel(ctx.channel()); + } + } + ctx.fireChannelRead(msg); + } + }); + + // Install PING handler for keepalive if configured + long pingIntervalMs = config.getHttp2PingInterval().toMillis(); + if (pingIntervalMs > 0) { + pipeline.addLast("http2-idle-state", new IdleStateHandler(0, 0, pingIntervalMs, TimeUnit.MILLISECONDS)); + pipeline.addLast("http2-ping", new Http2PingHandler()); + } + } + public void upgradePipelineForWebSockets(ChannelPipeline pipeline) { pipeline.addAfter(HTTP_CLIENT_CODEC, WS_ENCODER_HANDLER, new WebSocket08FrameEncoder(true)); pipeline.addAfter(WS_ENCODER_HANDLER, WS_DECODER_HANDLER, new WebSocket08FrameDecoder(false, @@ -610,4 +814,8 @@ public ClientStats getClientStats() { public boolean isOpen() { return channelPool.isOpen(); } -} \ No newline at end of file + + public boolean isHttp2CleartextEnabled() { + return config.isHttp2Enabled() && config.isHttp2CleartextEnabled(); + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java b/client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java new file mode 100644 index 0000000000..3911c45d72 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient.netty.channel; + +import io.netty.util.AttributeKey; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tracks per-connection HTTP/2 state: active stream count, max concurrent streams, + * draining status (from GOAWAY), and pending stream openers. + */ +public class Http2ConnectionState { + + public static final AttributeKey HTTP2_STATE_KEY = + AttributeKey.valueOf("http2ConnectionState"); + + private final AtomicInteger activeStreams = new AtomicInteger(0); + private volatile int maxConcurrentStreams = Integer.MAX_VALUE; + private final AtomicBoolean draining = new AtomicBoolean(false); + private volatile int lastGoAwayStreamId = Integer.MAX_VALUE; + private final ConcurrentLinkedQueue pendingOpeners = new ConcurrentLinkedQueue<>(); + private volatile Object partitionKey; + + public boolean tryAcquireStream() { + if (draining.get()) { + return false; + } + while (true) { + int current = activeStreams.get(); + if (current >= maxConcurrentStreams) { + return false; + } + if (activeStreams.compareAndSet(current, current + 1)) { + return true; + } + } + } + + public void releaseStream() { + activeStreams.decrementAndGet(); + // Try to dequeue and run a pending opener + Runnable pending = pendingOpeners.poll(); + if (pending != null) { + pending.run(); + } + } + + public void addPendingOpener(Runnable opener) { + pendingOpeners.add(opener); + // Re-check in case a stream was released between the failed tryAcquire and this enqueue + if (tryAcquireStream()) { + Runnable dequeued = pendingOpeners.poll(); + if (dequeued != null) { + dequeued.run(); + } else { + releaseStream(); + } + } + } + + public void updateMaxConcurrentStreams(int maxConcurrentStreams) { + this.maxConcurrentStreams = maxConcurrentStreams; + } + + public int getMaxConcurrentStreams() { + return maxConcurrentStreams; + } + + public int getActiveStreams() { + return activeStreams.get(); + } + + public boolean isDraining() { + return draining.get(); + } + + public void setDraining(int lastStreamId) { + this.lastGoAwayStreamId = lastStreamId; + this.draining.set(true); + } + + public int getLastGoAwayStreamId() { + return lastGoAwayStreamId; + } + + public void setPartitionKey(Object partitionKey) { + this.partitionKey = partitionKey; + } + + public Object getPartitionKey() { + return partitionKey; + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java index a1d61177eb..4f5612223e 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java @@ -17,12 +17,14 @@ import io.netty.channel.Channel; import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.SslHandler; import org.asynchttpclient.AsyncHandler; import org.asynchttpclient.Request; import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.SimpleFutureListener; import org.asynchttpclient.netty.future.StackTraceInspector; +import org.asynchttpclient.channel.ChannelPoolPartitioning; import org.asynchttpclient.netty.request.NettyRequestSender; import org.asynchttpclient.netty.timeout.TimeoutsHolder; import org.asynchttpclient.proxy.ProxyServer; @@ -80,14 +82,9 @@ private void writeRequest(Channel channel) { } public void onSuccess(Channel channel, InetSocketAddress remoteAddress) { - if (connectionSemaphore != null) { - // transfer lock from future to channel - Object partitionKeyLock = future.takePartitionKeyLock(); - - if (partitionKeyLock != null) { - channel.closeFuture().addListener(future -> connectionSemaphore.releaseChannelLock(partitionKeyLock)); - } - } + // Take the semaphore lock from the future. For HTTP/1.1, we'll transfer it to channel.closeFuture(). + // For HTTP/2, we release it immediately after ALPN negotiation since the connection is multiplexed. + final Object partitionKeyLock = (connectionSemaphore != null) ? future.takePartitionKeyLock() : null; Channels.setActiveToken(channel); TimeoutsHolder timeoutsHolder = future.getTimeoutsHolder(); @@ -139,6 +136,7 @@ protected void onSuccess(Channel value) { return; } // After SSL handshake to proxy, continue with normal proxy request + attachSemaphoreToChannelClose(channel, partitionKeyLock); writeRequest(channel); } @@ -185,6 +183,15 @@ protected void onSuccess(Channel value) { NettyConnectListener.this.onFailure(channel, e); return; } + // Detect ALPN-negotiated protocol and upgrade pipeline to HTTP/2 if "h2" was selected + String alpnProtocol = sslHandler.applicationProtocol(); + if (ApplicationProtocolNames.HTTP_2.equals(alpnProtocol)) { + channelManager.upgradePipelineToHttp2(channel.pipeline()); + registerHttp2AndReleaseSemaphore(channel); + releaseSemaphoreImmediately(partitionKeyLock); + } else { + attachSemaphoreToChannelClose(channel, partitionKeyLock); + } writeRequest(channel); } @@ -202,10 +209,51 @@ protected void onFailure(Throwable cause) { }); } else { + // h2c (cleartext HTTP/2 prior knowledge): upgrade to HTTP/2 without TLS + if (!uri.isSecured() && channelManager.isHttp2CleartextEnabled()) { + channelManager.upgradePipelineToHttp2(channel.pipeline()); + registerHttp2AndReleaseSemaphore(channel); + releaseSemaphoreImmediately(partitionKeyLock); + } else { + attachSemaphoreToChannelClose(channel, partitionKeyLock); + } writeRequest(channel); } } + /** + * Attaches the semaphore lock to the channel's close future (HTTP/1.1 behavior). + * The semaphore slot is released when the connection closes. + */ + private void attachSemaphoreToChannelClose(Channel channel, Object partitionKeyLock) { + if (connectionSemaphore != null && partitionKeyLock != null) { + channel.closeFuture().addListener(f -> connectionSemaphore.releaseChannelLock(partitionKeyLock)); + } + } + + /** + * Releases the semaphore lock immediately (HTTP/2 behavior). + * HTTP/2 connections are multiplexed, so the semaphore should not be held + * for the lifetime of the connection. + */ + private void releaseSemaphoreImmediately(Object partitionKeyLock) { + if (connectionSemaphore != null && partitionKeyLock != null) { + connectionSemaphore.releaseChannelLock(partitionKeyLock); + } + } + + /** + * Registers the HTTP/2 connection in the channel manager's H2 registry. + */ + private void registerHttp2AndReleaseSemaphore(Channel channel) { + Request request = future.getTargetRequest(); + Uri uri = request.getUri(); + ProxyServer proxy = future.getProxyServer(); + ChannelPoolPartitioning partitioning = request.getChannelPoolPartitioning(); + Object partitionKey = partitioning.getPartitionKey(uri, request.getVirtualHost(), proxy); + channelManager.registerHttp2Connection(partitionKey, channel); + } + public void onFailure(Channel channel, Throwable cause) { // beware, channel can be null diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/Http2ContentDecompressor.java b/client/src/main/java/org/asynchttpclient/netty/handler/Http2ContentDecompressor.java new file mode 100644 index 0000000000..b53c8fe658 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/handler/Http2ContentDecompressor.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient.netty.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.compression.JdkZlibDecoder; +import io.netty.handler.codec.compression.ZlibWrapper; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2HeadersFrame; + +/** + * HTTP/2 content decompressor that transparently decompresses gzip/deflate response bodies. + * Installed on stream child channels when automatic decompression is enabled. + *

+ * Uses Netty's {@link JdkZlibDecoder} via an {@link EmbeddedChannel} for streaming decompression, + * forwarding decompressed data frames as they arrive rather than buffering the entire response. + */ +public class Http2ContentDecompressor extends ChannelInboundHandlerAdapter { + + private final boolean keepEncodingHeader; + private EmbeddedChannel decompressor; + + public Http2ContentDecompressor(boolean keepEncodingHeader) { + this.keepEncodingHeader = keepEncodingHeader; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Http2HeadersFrame) { + Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg; + CharSequence contentEncoding = headersFrame.headers().get("content-encoding"); + if (contentEncoding != null) { + String enc = contentEncoding.toString().toLowerCase(); + if (enc.contains("gzip") || enc.contains("deflate")) { + ZlibWrapper wrapper = enc.contains("gzip") ? ZlibWrapper.GZIP : ZlibWrapper.ZLIB_OR_NONE; + decompressor = new EmbeddedChannel(false, new JdkZlibDecoder(wrapper)); + if (!keepEncodingHeader) { + headersFrame.headers().remove("content-encoding"); + } + headersFrame.headers().remove("content-length"); + } + } + ctx.fireChannelRead(msg); + } else if (msg instanceof Http2DataFrame && decompressor != null) { + Http2DataFrame dataFrame = (Http2DataFrame) msg; + ByteBuf content = dataFrame.content(); + boolean endStream = dataFrame.isEndStream(); + + if (content.isReadable()) { + decompressor.writeInbound(content.retain()); + } + + // Release the original frame + dataFrame.release(); + + // Read all decompressed output from the embedded channel + CompositeByteBuf decompressed = ctx.alloc().compositeBuffer(); + ByteBuf decoded; + while ((decoded = decompressor.readInbound()) != null) { + decompressed.addComponent(true, decoded); + } + + if (endStream) { + decompressor.finish(); + while ((decoded = decompressor.readInbound()) != null) { + decompressed.addComponent(true, decoded); + } + releaseDecompressor(); + } + + if (decompressed.isReadable() || endStream) { + ctx.fireChannelRead(new DefaultHttp2DataFrame(decompressed, endStream)); + } else { + decompressed.release(); + } + } else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + releaseDecompressor(); + } + + private void releaseDecompressor() { + if (decompressor != null) { + decompressor.finishAndReleaseAll(); + decompressor = null; + } + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java b/client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java new file mode 100644 index 0000000000..7bac415373 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java @@ -0,0 +1,310 @@ +/* + * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient.netty.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2GoAwayFrame; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.handler.codec.http2.Http2ResetFrame; +import io.netty.handler.codec.http2.Http2StreamChannel; +import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.AsyncHandler.State; +import org.asynchttpclient.AsyncHttpClientConfig; +import org.asynchttpclient.HttpResponseBodyPart; +import org.asynchttpclient.netty.NettyResponseFuture; +import org.asynchttpclient.netty.NettyResponseStatus; +import org.asynchttpclient.netty.channel.ChannelManager; +import org.asynchttpclient.netty.channel.Http2ConnectionState; +import org.asynchttpclient.netty.request.NettyRequestSender; + +import java.io.IOException; + +/** + * HTTP/2 channel handler for stream child channels created by {@link io.netty.handler.codec.http2.Http2MultiplexHandler}. + *

+ * Each HTTP/2 stream is represented as a child channel. This handler is attached to each stream child channel + * and processes {@link Http2HeadersFrame} (response status + headers) and {@link Http2DataFrame} (response body) + * frames directly for maximum performance — no HTTP/1.1 object conversion overhead. + *

+ * Follows the same structure as {@link HttpHandler} and reuses the same interceptor chain, + * body part factory, and lifecycle methods from {@link AsyncHttpClientHandler}. + */ +@Sharable +public final class Http2Handler extends AsyncHttpClientHandler { + + private static final HttpVersion HTTP_2 = new HttpVersion("HTTP", 2, 0, true); + + public Http2Handler(AsyncHttpClientConfig config, ChannelManager channelManager, NettyRequestSender requestSender) { + super(config, channelManager, requestSender); + } + + /** + * Handles incoming frames on the HTTP/2 stream child channel. + * Dispatches to the appropriate handler based on frame type. + */ + @Override + public void handleRead(final Channel channel, final NettyResponseFuture future, final Object e) throws Exception { + if (future.isDone()) { + channelManager.closeChannel(channel); + return; + } + + AsyncHandler handler = future.getAsyncHandler(); + try { + if (e instanceof Http2HeadersFrame) { + Http2HeadersFrame headersFrame = (Http2HeadersFrame) e; + if (headersFrame.headers().status() != null) { + handleHttp2HeadersFrame(headersFrame, channel, future, handler); + } else { + handleHttp2TrailingHeadersFrame(headersFrame, channel, future, handler); + } + } else if (e instanceof Http2DataFrame) { + handleHttp2DataFrame((Http2DataFrame) e, channel, future, handler); + } else if (e instanceof Http2ResetFrame) { + handleHttp2ResetFrame((Http2ResetFrame) e, channel, future); + } else if (e instanceof Http2GoAwayFrame) { + handleHttp2GoAwayFrame((Http2GoAwayFrame) e, channel, future); + } + } catch (Exception t) { + if (hasIOExceptionFilters && t instanceof IOException + && requestSender.applyIoExceptionFiltersAndReplayRequest(future, (IOException) t, channel)) { + return; + } + readFailed(channel, future, t); + throw t; + } + } + + /** + * Processes an HTTP/2 HEADERS frame, which carries the response status and headers. + * Builds a synthetic {@link HttpResponse} from the HTTP/2 pseudo-headers so the existing + * interceptor chain can be reused without modification. + */ + private void handleHttp2HeadersFrame(Http2HeadersFrame headersFrame, Channel channel, + NettyResponseFuture future, AsyncHandler handler) throws Exception { + Http2Headers h2Headers = headersFrame.headers(); + + // Extract :status pseudo-header and convert to HTTP status + CharSequence statusValue = h2Headers.status(); + int statusCode = statusValue != null ? Integer.parseInt(statusValue.toString()) : 200; + HttpResponseStatus nettyStatus = HttpResponseStatus.valueOf(statusCode); + + // Build HTTP/1.1-style headers, skipping HTTP/2 pseudo-headers (start with ':') + HttpHeaders responseHeaders = new DefaultHttpHeaders(); + h2Headers.forEach(entry -> { + CharSequence name = entry.getKey(); + if (name.length() > 0 && name.charAt(0) != ':') { + responseHeaders.add(name, entry.getValue()); + } + }); + + // Build a synthetic HttpResponse so the existing interceptor chain can be reused unchanged + HttpResponse syntheticResponse = new DefaultHttpResponse(HTTP_2, nettyStatus, responseHeaders); + + // Respect user's keepAlive config; only multiplex/pool if keepAlive is enabled + future.setKeepAlive(config.isKeepAlive()); + + NettyResponseStatus status = new NettyResponseStatus(future.getUri(), syntheticResponse, channel); + + if (!interceptors.exitAfterIntercept(channel, future, handler, syntheticResponse, status, responseHeaders)) { + boolean abort = handler.onStatusReceived(status) == State.ABORT; + if (!abort) { + abort = handler.onHeadersReceived(responseHeaders) == State.ABORT; + } + if (abort) { + finishUpdate(future, channel, false); + return; + } + } + + // If headers frame also ends the stream (no body), finish the response + if (headersFrame.isEndStream()) { + finishUpdate(future, channel, false); + } + } + + /** + * Processes an HTTP/2 DATA frame, which carries response body bytes. + * Passes body content directly to {@link AsyncHandler#onBodyPartReceived} using the + * configured {@link org.asynchttpclient.ResponseBodyPartFactory} — same as HTTP/1.1. + */ + private void handleHttp2DataFrame(Http2DataFrame dataFrame, Channel channel, + NettyResponseFuture future, AsyncHandler handler) throws Exception { + boolean last = dataFrame.isEndStream(); + ByteBuf data = dataFrame.content(); + + if (data.isReadable() || last) { + HttpResponseBodyPart bodyPart = config.getResponseBodyPartFactory().newResponseBodyPart(data, last); + boolean abort = handler.onBodyPartReceived(bodyPart) == State.ABORT; + if (abort || last) { + finishUpdate(future, channel, false); + } + } + } + + /** + * Processes trailing HTTP/2 HEADERS frame (no :status pseudo-header), which carries trailer headers + * sent after the DATA frames. Delegates to {@link AsyncHandler#onTrailingHeadersReceived}. + */ + private void handleHttp2TrailingHeadersFrame(Http2HeadersFrame headersFrame, Channel channel, + NettyResponseFuture future, AsyncHandler handler) throws Exception { + Http2Headers h2Headers = headersFrame.headers(); + + HttpHeaders trailingHeaders = new DefaultHttpHeaders(); + h2Headers.forEach(entry -> { + CharSequence name = entry.getKey(); + if (name.length() > 0 && name.charAt(0) != ':') { + trailingHeaders.add(name, entry.getValue()); + } + }); + + boolean abort = false; + if (!trailingHeaders.isEmpty()) { + abort = handler.onTrailingHeadersReceived(trailingHeaders) == State.ABORT; + } + + if (abort || headersFrame.isEndStream()) { + finishUpdate(future, channel, false); + } + } + + /** + * Processes an HTTP/2 RST_STREAM frame, which indicates the server aborted the stream. + */ + private void handleHttp2ResetFrame(Http2ResetFrame resetFrame, Channel channel, NettyResponseFuture future) { + long errorCode = resetFrame.errorCode(); + readFailed(channel, future, new IOException("HTTP/2 stream reset by server, error code: " + errorCode)); + } + + /** + * Processes an HTTP/2 GOAWAY frame, which indicates the server is shutting down the connection. + * The parent connection is removed from the pool to prevent new streams from being created on it. + * The current stream's future is failed so the request can be retried on a new connection. + */ + private void handleHttp2GoAwayFrame(Http2GoAwayFrame goAwayFrame, Channel channel, NettyResponseFuture future) { + long errorCode = goAwayFrame.errorCode(); + int lastStreamId = goAwayFrame.lastStreamId(); + + // Remove the parent connection from the HTTP/2 registry so no new streams are opened on it + Channel parentChannel = (channel instanceof Http2StreamChannel) + ? ((Http2StreamChannel) channel).parent() + : channel; + + // Mark the connection as draining and remove from registry + Http2ConnectionState state = parentChannel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get(); + if (state != null) { + state.setDraining(lastStreamId); + Object partitionKey = state.getPartitionKey(); + if (partitionKey != null) { + channelManager.removeHttp2Connection(partitionKey, parentChannel); + } + } + + // Check if this stream's ID is within the allowed range + if (channel instanceof Http2StreamChannel) { + int streamId = ((Http2StreamChannel) channel).stream().id(); + if (streamId <= lastStreamId) { + // This stream is allowed to complete — don't fail it + return; + } + } + + readFailed(channel, future, new IOException("HTTP/2 connection GOAWAY received, error code: " + errorCode + + ", lastStreamId: " + lastStreamId)); + } + + /** + * Overrides the base {@link AsyncHttpClientHandler#finishUpdate} to correctly handle HTTP/2 + * connection pooling. HTTP/2 stream channels are single-use — after the stream completes, + * it must be closed. The reusable resource is the parent TCP connection channel, which is + * offered back to the pool so future requests can open new streams on the same connection. + * + * @param future the completed request future + * @param streamChannel the stream child channel (single-use, will be closed) + * @param close if {@code true}, close the parent connection entirely rather than pooling it + */ + @Override + void finishUpdate(NettyResponseFuture future, Channel streamChannel, boolean close) { + future.cancelTimeouts(); + + // Stream channels are single-use in HTTP/2 — close the stream + streamChannel.close(); + + // The parent HTTP/2 connection stays in the HTTP/2 registry (not the regular pool) + // to allow concurrent multiplexed requests. We only need to release the stream count. + Channel parentChannel = (streamChannel instanceof Http2StreamChannel) + ? ((Http2StreamChannel) streamChannel).parent() + : null; + + if (parentChannel != null) { + Http2ConnectionState state = parentChannel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get(); + if (state != null) { + state.releaseStream(); + + // If connection is draining and no more active streams, close it + if (state.isDraining() && state.getActiveStreams() <= 0) { + channelManager.closeChannel(parentChannel); + } + } + + // Fire onConnectionOffer to maintain event lifecycle contract + try { + future.getAsyncHandler().onConnectionOffer(parentChannel); + } catch (Exception e) { + logger.error("onConnectionOffer crashed", e); + } + } + + // If close was requested, close the parent connection entirely + if (close && parentChannel != null) { + channelManager.closeChannel(parentChannel); + } + + try { + future.done(); + } catch (Exception t) { + logger.debug(t.getMessage(), t); + } + } + + private void readFailed(Channel channel, NettyResponseFuture future, Throwable t) { + try { + requestSender.abort(channel, future, t); + } catch (Exception abortException) { + logger.debug("Abort failed", abortException); + } finally { + finishUpdate(future, channel, true); + } + } + + @Override + public void handleException(NettyResponseFuture future, Throwable error) { + } + + @Override + public void handleChannelInactive(NettyResponseFuture future) { + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/Http2PingHandler.java b/client/src/main/java/org/asynchttpclient/netty/handler/Http2PingHandler.java new file mode 100644 index 0000000000..e33f5e0ac2 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/handler/Http2PingHandler.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient.netty.handler; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.DefaultHttp2PingFrame; +import io.netty.handler.codec.http2.Http2PingFrame; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Sends HTTP/2 PING frames when the connection is idle and closes the connection + * if no PING ACK is received within the timeout period. + */ +public class Http2PingHandler extends ChannelDuplexHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(Http2PingHandler.class); + private static final long PING_ACK_TIMEOUT_MS = 5000; + + private boolean waitingForPingAck; + private ScheduledFuture pingAckTimeoutFuture; + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent idleEvent = (IdleStateEvent) evt; + if (idleEvent.state() == IdleState.ALL_IDLE && !waitingForPingAck) { + waitingForPingAck = true; + LOGGER.debug("Sending HTTP/2 PING on idle connection {}", ctx.channel()); + ctx.writeAndFlush(new DefaultHttp2PingFrame(System.nanoTime(), false)); + + pingAckTimeoutFuture = ctx.executor().schedule(() -> { + if (waitingForPingAck) { + LOGGER.debug("PING ACK timeout on connection {}, closing", ctx.channel()); + ctx.close(); + } + }, PING_ACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + } + super.userEventTriggered(ctx, evt); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Http2PingFrame) { + Http2PingFrame pingFrame = (Http2PingFrame) msg; + if (pingFrame.ack()) { + waitingForPingAck = false; + if (pingAckTimeoutFuture != null) { + pingAckTimeoutFuture.cancel(false); + pingAckTimeoutFuture = null; + } + LOGGER.debug("Received PING ACK on connection {}", ctx.channel()); + return; // consume the PING ACK + } + } + super.channelRead(ctx, msg); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + if (pingAckTimeoutFuture != null) { + pingAckTimeoutFuture.cancel(false); + pingAckTimeoutFuture = null; + } + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/ConnectSuccessInterceptor.java b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/ConnectSuccessInterceptor.java index bf64e59099..696da92720 100644 --- a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/ConnectSuccessInterceptor.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/ConnectSuccessInterceptor.java @@ -16,6 +16,7 @@ package org.asynchttpclient.netty.handler.intercept; import io.netty.channel.Channel; +import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.util.concurrent.Future; import org.asynchttpclient.Request; import org.asynchttpclient.netty.NettyResponseFuture; @@ -40,6 +41,12 @@ public class ConnectSuccessInterceptor { } public boolean exitAfterHandlingConnect(Channel channel, NettyResponseFuture future, Request request, ProxyServer proxyServer) { + // CONNECT tunneling is an HTTP/1.1 concept — it should never occur on HTTP/2 stream channels. + if (channel instanceof Http2StreamChannel) { + LOGGER.warn("CONNECT success on HTTP/2 stream channel is unexpected — ignoring"); + return false; + } + if (future.isKeepAlive()) { future.attachChannel(channel, true); } diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Continue100Interceptor.java b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Continue100Interceptor.java index aadd7f980a..5c39688587 100644 --- a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Continue100Interceptor.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Continue100Interceptor.java @@ -16,6 +16,7 @@ package org.asynchttpclient.netty.handler.intercept; import io.netty.channel.Channel; +import io.netty.handler.codec.http2.Http2StreamChannel; import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.OnLastHttpContentCallback; import org.asynchttpclient.netty.channel.Channels; @@ -32,14 +33,21 @@ class Continue100Interceptor { public boolean exitAfterHandling100(final Channel channel, final NettyResponseFuture future) { future.setHeadersAlreadyWrittenOnContinue(true); future.setDontWriteBodyBecauseExpectContinue(false); - // directly send the body - Channels.setAttribute(channel, new OnLastHttpContentCallback(future) { - @Override - public void call() { - Channels.setAttribute(channel, future); - requestSender.writeRequest(future, channel); - } - }); + + if (channel instanceof Http2StreamChannel) { + // HTTP/2 stream channels don't produce LastHttpContent. + // Directly write the body on the stream channel. + requestSender.writeRequest(future, channel); + } else { + // HTTP/1.1: wait for LastHttpContent before sending the body + Channels.setAttribute(channel, new OnLastHttpContentCallback(future) { + @Override + public void call() { + Channels.setAttribute(channel, future); + requestSender.writeRequest(future, channel); + } + }); + } return true; } } diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/ProxyUnauthorized407Interceptor.java b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/ProxyUnauthorized407Interceptor.java index b30f6bbd94..89c6c7ec0c 100644 --- a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/ProxyUnauthorized407Interceptor.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/ProxyUnauthorized407Interceptor.java @@ -29,6 +29,7 @@ import org.asynchttpclient.netty.channel.ChannelManager; import org.asynchttpclient.netty.channel.ChannelState; import org.asynchttpclient.netty.request.NettyRequestSender; +import io.netty.handler.codec.http2.Http2StreamChannel; import org.asynchttpclient.ntlm.NtlmEngine; import org.asynchttpclient.proxy.ProxyServer; import org.asynchttpclient.spnego.SpnegoEngine; @@ -170,7 +171,11 @@ public boolean exitAfterHandling407(Channel channel, NettyResponseFuture futu final Request nextRequest = nextRequestBuilder.build(); LOGGER.debug("Sending proxy authentication to {}", request.getUri()); - if (future.isKeepAlive() + if (channel instanceof Http2StreamChannel) { + // HTTP/2 stream channels are single-use — close the stream and send the auth retry. + channel.close(); + requestSender.sendNextRequest(nextRequest, future); + } else if (future.isKeepAlive() && !HttpUtil.isTransferEncodingChunked(httpRequest) && !HttpUtil.isTransferEncodingChunked(response)) { future.setConnectAllowed(true); diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java index 40628a7e51..01bbb265b8 100644 --- a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Redirect30xInterceptor.java @@ -30,6 +30,7 @@ import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.channel.ChannelManager; import org.asynchttpclient.netty.request.NettyRequestSender; +import io.netty.handler.codec.http2.Http2StreamChannel; import org.asynchttpclient.uri.Uri; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,7 +160,12 @@ public boolean exitAfterHandlingRedirect(Channel channel, NettyResponseFuture LOGGER.debug("Sending redirect to {}", newUri); - if (future.isKeepAlive() && !HttpUtil.isTransferEncodingChunked(response)) { + if (channel instanceof Http2StreamChannel) { + // HTTP/2 stream channels are single-use and close immediately after the response. + // No draining needed — just close the stream and send the next request. + channel.close(); + requestSender.sendNextRequest(nextRequest, future); + } else if (future.isKeepAlive() && !HttpUtil.isTransferEncodingChunked(response)) { if (sameBase) { future.setReuseChannel(true); // we can't directly send the next request because we still have to received LastContent diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Unauthorized401Interceptor.java b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Unauthorized401Interceptor.java index cb89f70b83..4b12460ac8 100644 --- a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Unauthorized401Interceptor.java +++ b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Unauthorized401Interceptor.java @@ -28,6 +28,7 @@ import org.asynchttpclient.netty.channel.ChannelManager; import org.asynchttpclient.netty.channel.ChannelState; import org.asynchttpclient.netty.request.NettyRequestSender; +import io.netty.handler.codec.http2.Http2StreamChannel; import org.asynchttpclient.ntlm.NtlmEngine; import org.asynchttpclient.spnego.SpnegoEngine; import org.asynchttpclient.spnego.SpnegoEngineException; @@ -162,7 +163,11 @@ public boolean exitAfterHandling401(Channel channel, NettyResponseFuture futu final Request nextRequest = future.getCurrentRequest().toBuilder().setHeaders(requestHeaders).build(); LOGGER.debug("Sending authentication to {}", request.getUri()); - if (future.isKeepAlive() && !HttpUtil.isTransferEncodingChunked(httpRequest) && !HttpUtil.isTransferEncodingChunked(response)) { + if (channel instanceof Http2StreamChannel) { + // HTTP/2 stream channels are single-use — close the stream and send the auth retry. + channel.close(); + requestSender.sendNextRequest(nextRequest, future); + } else if (future.isKeepAlive() && !HttpUtil.isTransferEncodingChunked(httpRequest) && !HttpUtil.isTransferEncodingChunked(response)) { future.setReuseChannel(true); requestSender.drainChannelAndExecuteNextRequest(channel, future, nextRequest); } else { diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index c929d35e27..b60fd8af45 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -16,15 +16,25 @@ package org.asynchttpclient.netty.request; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelProgressivePromise; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; +import io.netty.util.ReferenceCountUtil; import io.netty.util.Timer; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -49,9 +59,13 @@ import org.asynchttpclient.netty.channel.ChannelState; import org.asynchttpclient.netty.channel.Channels; import org.asynchttpclient.netty.channel.ConnectionSemaphore; +import org.asynchttpclient.netty.channel.Http2ConnectionState; import org.asynchttpclient.netty.channel.DefaultConnectionSemaphoreFactory; import org.asynchttpclient.netty.channel.NettyChannelConnector; import org.asynchttpclient.netty.channel.NettyConnectListener; +import org.asynchttpclient.netty.handler.Http2ContentDecompressor; +import org.asynchttpclient.netty.request.body.NettyBody; +import org.asynchttpclient.netty.request.body.NettyDirectBody; import org.asynchttpclient.netty.timeout.TimeoutsHolder; import org.asynchttpclient.proxy.ProxyServer; import org.asynchttpclient.proxy.ProxyType; @@ -65,14 +79,17 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.List; +import java.util.Set; import static io.netty.handler.codec.http.HttpHeaderNames.EXPECT; import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; +import static java.util.Set.of; import static org.asynchttpclient.util.AuthenticatorUtils.perConnectionAuthorizationHeader; import static org.asynchttpclient.util.AuthenticatorUtils.perConnectionProxyAuthorizationHeader; import static org.asynchttpclient.util.HttpConstants.Methods.CONNECT; import static org.asynchttpclient.util.HttpConstants.Methods.GET; +import static org.asynchttpclient.util.HttpUtils.hostHeader; import static org.asynchttpclient.util.MiscUtils.getCause; import static org.asynchttpclient.util.ProxyUtils.getProxyServer; @@ -298,7 +315,19 @@ private ListenableFuture sendRequestWithNewChannel(Request request, Proxy // Do not throw an exception when we need an extra connection for a // redirect. - future.acquirePartitionLockLazily(); + try { + future.acquirePartitionLockLazily(); + } catch (IOException semaphoreException) { + // If HTTP/2 is enabled, another thread may be establishing an H2 connection. + // Poll the H2 registry with brief retries before giving up. + if (config.isHttp2Enabled()) { + Channel h2Channel = waitForHttp2Connection(request, proxy); + if (h2Channel != null) { + return sendRequestWithOpenChannel(future, asyncHandler, h2Channel); + } + } + throw semaphoreException; + } } catch (Throwable t) { abort(null, future, getCause(t)); // exit and don't try to resolve address @@ -375,18 +404,31 @@ private NettyResponseFuture newNettyResponseFuture(Request request, Async return future; } - public void writeRequest(NettyResponseFuture future, Channel channel) { - NettyRequest nettyRequest = future.getNettyRequest(); - HttpRequest httpRequest = nettyRequest.getHttpRequest(); - AsyncHandler asyncHandler = future.getAsyncHandler(); + /** + * HTTP/2 connection-specific headers that must NOT be forwarded as per RFC 7540 §8.1.2.2. + * These are HTTP/1.1 connection-specific headers that have no meaning in HTTP/2. + */ + private static final Set HTTP2_EXCLUDED_HEADERS = of( + "connection", "keep-alive", "proxy-connection", "transfer-encoding", "upgrade", "host" + ); - // if the channel is dead because it was pooled and the remote server decided to - // close it, + public void writeRequest(NettyResponseFuture future, Channel channel) { + // if the channel is dead because it was pooled and the remote server decided to close it, // we just let it go and the channelInactive do its work if (!Channels.isChannelActive(channel)) { return; } + // Route to HTTP/2 path if the parent channel has the HTTP/2 multiplex handler installed + if (ChannelManager.isHttp2(channel)) { + writeHttp2Request(future, channel); + return; + } + + NettyRequest nettyRequest = future.getNettyRequest(); + HttpRequest httpRequest = nettyRequest.getHttpRequest(); + AsyncHandler asyncHandler = future.getAsyncHandler(); + try { if (asyncHandler instanceof TransferCompletionHandler) { configureTransferAdapter(asyncHandler, httpRequest); @@ -431,6 +473,153 @@ public void writeRequest(NettyResponseFuture future, Channel channel) { } } + /** + * Opens a new HTTP/2 stream child channel on the given parent connection channel and writes the request + * as HTTP/2 frames ({@link DefaultHttp2HeadersFrame} + optional {@link DefaultHttp2DataFrame}). + * The stream child channel has the {@link org.asynchttpclient.netty.handler.Http2Handler} installed + * and the {@link NettyResponseFuture} attached to it, mirroring the HTTP/1.1 channel model. + */ + private void writeHttp2Request(NettyResponseFuture future, Channel parentChannel) { + Http2ConnectionState state = parentChannel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get(); + Runnable openStream = () -> openHttp2Stream(future, parentChannel); + + if (state != null && !state.tryAcquireStream()) { + if (state.isDraining()) { + // Connection is draining from GOAWAY — fail the future so it retries on a new connection. + // Don't close the parent channel since it may still have active streams. + future.abort(new java.io.IOException("HTTP/2 connection is draining (GOAWAY received)")); + return; + } + // Queue for later when a stream slot opens up + state.addPendingOpener(openStream); + return; + } + openStream.run(); + } + + private void openHttp2Stream(NettyResponseFuture future, Channel parentChannel) { + new Http2StreamChannelBootstrap(parentChannel) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(Http2StreamChannel streamCh) { + if (config.isEnableAutomaticDecompression()) { + streamCh.pipeline().addLast("http2-decompressor", + new Http2ContentDecompressor(config.isKeepEncodingHeader())); + } + streamCh.pipeline().addLast(channelManager.getHttp2Handler()); + } + }) + .open() + .addListener((Future f) -> { + if (f.isSuccess()) { + Http2StreamChannel streamChannel = f.getNow(); + channelManager.registerOpenChannel(streamChannel); + Channels.setAttribute(streamChannel, future); + future.attachChannel(streamChannel, false); + try { + AsyncHandler asyncHandler = future.getAsyncHandler(); + try { + asyncHandler.onRequestSend(future.getNettyRequest()); + } catch (Exception e) { + LOGGER.error("onRequestSend crashed", e); + abort(streamChannel, future, e); + return; + } + + if (asyncHandler instanceof TransferCompletionHandler) { + configureTransferAdapter(asyncHandler, future.getNettyRequest().getHttpRequest()); + } + + sendHttp2Frames(future, streamChannel); + scheduleReadTimeout(future); + } catch (Exception e) { + LOGGER.error("Can't write HTTP/2 request", e); + abort(streamChannel, future, e); + } + } else { + abort(parentChannel, future, f.cause()); + } + }); + } + + /** + * Builds and writes HTTP/2 frames for the given request on the stream child channel. + *

+ * Manually assembles {@link DefaultHttp2Headers} with HTTP/2 pseudo-headers (:method, :path, + * :scheme, :authority) plus all regular request headers, then writes them as a + * {@link DefaultHttp2HeadersFrame}. If the request has a body, writes it as a + * {@link DefaultHttp2DataFrame} with {@code endStream=true}. + *

+ * Currently supports in-memory bodies ({@link DefaultFullHttpRequest} content and + * {@link org.asynchttpclient.netty.request.body.NettyDirectBody}). Streaming bodies + * (file uploads, input streams) are not yet supported over HTTP/2. + */ + private void sendHttp2Frames(NettyResponseFuture future, Http2StreamChannel streamChannel) throws IOException { + NettyRequest nettyRequest = future.getNettyRequest(); + HttpRequest httpRequest = nettyRequest.getHttpRequest(); + Uri uri = future.getUri(); + + try { + // Build HTTP/2 pseudo-headers + regular headers + Http2Headers h2Headers = new DefaultHttp2Headers() + .method(httpRequest.method().name()) + .path(uri.getNonEmptyPath() + (uri.getQuery() != null ? "?" + uri.getQuery() : "")) + .scheme(uri.getScheme()) + .authority(hostHeader(uri)); + + // Copy HTTP/1.1 headers, skipping connection-specific ones that are forbidden in HTTP/2. + // RFC 7540 §8.1.2 requires all header field names to be lowercase in HTTP/2. + httpRequest.headers().forEach(entry -> { + String name = entry.getKey().toLowerCase(); + if (!HTTP2_EXCLUDED_HEADERS.contains(name)) { + h2Headers.add(name, entry.getValue()); + } + }); + + // Determine if we have a body to write. + // Support both DefaultFullHttpRequest (inline content) and NettyDirectBody (byte array/buffer bodies). + ByteBuf bodyBuf = null; + if (httpRequest instanceof DefaultFullHttpRequest) { + ByteBuf content = ((DefaultFullHttpRequest) httpRequest).content(); + if (content != null && content.isReadable()) { + bodyBuf = content; + } + } + + NettyBody nettyBody = nettyRequest.getBody(); + if (bodyBuf == null && nettyBody != null) { + if (nettyBody instanceof NettyDirectBody) { + ByteBuf directBuf = ((NettyDirectBody) nettyBody).byteBuf(); + if (directBuf != null && directBuf.isReadable()) { + bodyBuf = directBuf; + } + } + } + + // Determine if we have a streaming body that needs writeHttp2() + boolean hasStreamingBody = bodyBuf == null && nettyBody != null && !(nettyBody instanceof NettyDirectBody); + boolean hasBody = bodyBuf != null || hasStreamingBody; + + // Write HEADERS frame (endStream=true when there is no body) + streamChannel.write(new DefaultHttp2HeadersFrame(h2Headers, !hasBody)); + + if (hasStreamingBody) { + streamChannel.flush(); + nettyBody.writeHttp2(streamChannel, future); + } else if (bodyBuf != null) { + // Write DATA frame with endStream=true — body is sent as a single frame + streamChannel.write(new DefaultHttp2DataFrame(bodyBuf.retainedDuplicate(), true)); + streamChannel.flush(); + } else { + streamChannel.flush(); + } + } finally { + // Release the original HTTP/1.1 request — in the HTTP/2 path it is not written to the channel, + // so we must release it manually to avoid leaking its content ByteBuf. + ReferenceCountUtil.release(httpRequest); + } + } + private static void configureTransferAdapter(AsyncHandler handler, HttpRequest httpRequest) { HttpHeaders h = new DefaultHttpHeaders().set(httpRequest.headers()); ((TransferCompletionHandler) handler).headers(h); @@ -554,6 +743,31 @@ private static void validateWebSocketRequest(Request request, AsyncHandler as } } + /** + * Waits briefly for an HTTP/2 connection to appear in the registry. + * Used when the semaphore blocks a new connection but another thread is establishing + * an HTTP/2 connection that this request can multiplex onto. + */ + private Channel waitForHttp2Connection(Request request, ProxyServer proxy) { + Uri uri = request.getUri(); + String virtualHost = request.getVirtualHost(); + long deadline = System.nanoTime() + config.getConnectTimeout().toNanos(); + + while (System.nanoTime() < deadline) { + Channel h2Channel = channelManager.pollHttp2(uri, virtualHost, proxy, request.getChannelPoolPartitioning()); + if (h2Channel != null) { + return h2Channel; + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } + return null; + } + private Channel pollPooledChannel(Request request, ProxyServer proxy, AsyncHandler asyncHandler) { try { asyncHandler.onConnectionPoolAttempt(); @@ -563,6 +777,15 @@ private Channel pollPooledChannel(Request request, ProxyServer proxy, AsyncHandl Uri uri = request.getUri(); String virtualHost = request.getVirtualHost(); + + // Check HTTP/2 connection registry first — these connections support multiplexing + // and are not removed from the registry on poll (unlike the regular pool) + Channel h2Channel = channelManager.pollHttp2(uri, virtualHost, proxy, request.getChannelPoolPartitioning()); + if (h2Channel != null) { + LOGGER.debug("Using HTTP/2 multiplexed Channel '{}' for '{}' to '{}'", h2Channel, request.getMethod(), uri); + return h2Channel; + } + final Channel channel = channelManager.poll(uri, virtualHost, proxy, request.getChannelPoolPartitioning()); if (channel != null) { diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBody.java index f38ef3939d..1d9c50e7ef 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBody.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBody.java @@ -16,6 +16,7 @@ package org.asynchttpclient.netty.request.body; import io.netty.channel.Channel; +import io.netty.handler.codec.http2.Http2StreamChannel; import org.asynchttpclient.netty.NettyResponseFuture; import java.io.IOException; @@ -29,4 +30,10 @@ default CharSequence getContentTypeOverride() { } void write(Channel channel, NettyResponseFuture future) throws IOException; + + default void writeHttp2(Http2StreamChannel channel, NettyResponseFuture future) throws IOException { + throw new UnsupportedOperationException( + "Streaming request bodies (" + getClass().getSimpleName() + + ") are not yet supported over HTTP/2. Use an in-memory body or disable HTTP/2."); + } } diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java index efe337bfe8..b4145ef077 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java @@ -15,20 +15,26 @@ */ package org.asynchttpclient.netty.request.body; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelProgressiveFuture; import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.stream.ChunkedWriteHandler; import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.channel.ChannelManager; import org.asynchttpclient.netty.request.WriteProgressListener; import org.asynchttpclient.request.body.Body; +import org.asynchttpclient.request.body.Body.BodyState; import org.asynchttpclient.request.body.RandomAccessBody; import org.asynchttpclient.request.body.generator.BodyGenerator; import org.asynchttpclient.request.body.generator.FeedListener; import org.asynchttpclient.request.body.generator.FeedableBodyGenerator; +import java.io.IOException; + import static org.asynchttpclient.util.MiscUtils.closeSilently; public class NettyBodyBody implements NettyBody { @@ -86,4 +92,35 @@ public void operationComplete(ChannelProgressiveFuture cf) { }); channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise()); } + + @Override + public void writeHttp2(Http2StreamChannel channel, NettyResponseFuture future) throws IOException { + try { + ByteBuf buf = channel.alloc().buffer(8192); + ByteBuf pending = null; + while (true) { + buf.clear(); + BodyState state = body.transferTo(buf); + if (buf.isReadable()) { + if (pending != null) { + channel.write(new DefaultHttp2DataFrame(pending, false)); + } + pending = buf; + buf = channel.alloc().buffer(8192); + } + if (state == BodyState.STOP) { + break; + } + } + buf.release(); + if (pending != null) { + channel.write(new DefaultHttp2DataFrame(pending, true)); + } else { + channel.write(new DefaultHttp2DataFrame(channel.alloc().buffer(0), true)); + } + channel.flush(); + } finally { + closeSilently(body); + } + } } diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java index a3c40322dc..01fb5d175a 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java @@ -15,9 +15,12 @@ */ package org.asynchttpclient.netty.request.body; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.DefaultFileRegion; import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.stream.ChunkedNioFile; import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.netty.NettyResponseFuture; @@ -27,6 +30,7 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class NettyFileBody implements NettyBody { @@ -71,4 +75,28 @@ public void write(Channel channel, NettyResponseFuture future) throws IOExcep .addListener(new WriteProgressListener(future, false, length)); channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise()); } + + @Override + public void writeHttp2(Http2StreamChannel channel, NettyResponseFuture future) throws IOException { + int chunkSize = config.getChunkedFileChunkSize(); + try (RandomAccessFile raf = new RandomAccessFile(file, "r"); + FileChannel fileChannel = raf.getChannel()) { + long remaining = length; + long pos = offset; + while (remaining > 0) { + int toRead = (int) Math.min(chunkSize, remaining); + ByteBuf buf = channel.alloc().buffer(toRead); + int read = buf.writeBytes(fileChannel, pos, toRead); + if (read <= 0) { + buf.release(); + break; + } + remaining -= read; + pos += read; + boolean last = remaining <= 0; + channel.write(new DefaultHttp2DataFrame(buf, last)); + } + channel.flush(); + } + } } diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java index 4dba9d951a..61517b6516 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java @@ -15,9 +15,12 @@ */ package org.asynchttpclient.netty.request.body; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelProgressiveFuture; import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.stream.ChunkedStream; import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.request.WriteProgressListener; @@ -79,4 +82,42 @@ public void operationComplete(ChannelProgressiveFuture cf) { }); channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, channel.voidPromise()); } + + @Override + public void writeHttp2(Http2StreamChannel channel, NettyResponseFuture future) throws IOException { + final InputStream is = inputStream; + + if (future.isStreamConsumed()) { + if (is.markSupported()) { + is.reset(); + } else { + LOGGER.warn("Stream has already been consumed and cannot be reset"); + return; + } + } else { + future.setStreamConsumed(true); + } + + try { + // Read all data into chunks, then send with last frame having endStream=true + byte[] buffer = new byte[8192]; + ByteBuf pending = null; + int read; + while ((read = is.read(buffer)) != -1) { + if (pending != null) { + channel.write(new DefaultHttp2DataFrame(pending, false)); + } + pending = channel.alloc().buffer(read); + pending.writeBytes(buffer, 0, read); + } + if (pending != null) { + channel.write(new DefaultHttp2DataFrame(pending, true)); + } else { + channel.write(new DefaultHttp2DataFrame(channel.alloc().buffer(0), true)); + } + channel.flush(); + } finally { + closeSilently(is); + } + } } diff --git a/client/src/main/java/org/asynchttpclient/netty/ssl/DefaultSslEngineFactory.java b/client/src/main/java/org/asynchttpclient/netty/ssl/DefaultSslEngineFactory.java index 323b75d5d2..da0feae8cd 100644 --- a/client/src/main/java/org/asynchttpclient/netty/ssl/DefaultSslEngineFactory.java +++ b/client/src/main/java/org/asynchttpclient/netty/ssl/DefaultSslEngineFactory.java @@ -16,6 +16,8 @@ package org.asynchttpclient.netty.ssl; import io.netty.buffer.ByteBufAllocator; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.IdentityCipherSuiteFilter; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; @@ -61,6 +63,15 @@ private SslContext buildSslContext(AsyncHttpClientConfig config) throws SSLExcep sslContextBuilder.endpointIdentificationAlgorithm( config.isDisableHttpsEndpointIdentificationAlgorithm() ? "" : "HTTPS"); + if (config.isHttp2Enabled()) { + sslContextBuilder.applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2, + ApplicationProtocolNames.HTTP_1_1)); + } + return configureSslContextBuilder(sslContextBuilder).build(); } diff --git a/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties b/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties index f74127c23d..9a9e06c994 100644 --- a/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties +++ b/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties @@ -55,3 +55,10 @@ org.asynchttpclient.ioThreadsCount=-1 org.asynchttpclient.hashedWheelTimerTickDuration=100 org.asynchttpclient.hashedWheelTimerSize=512 org.asynchttpclient.expiredCookieEvictionDelay=30000 +org.asynchttpclient.http2InitialWindowSize=65535 +org.asynchttpclient.http2MaxFrameSize=16384 +org.asynchttpclient.http2HeaderTableSize=4096 +org.asynchttpclient.http2MaxHeaderListSize=8192 +org.asynchttpclient.http2MaxConcurrentStreams=-1 +org.asynchttpclient.http2PingInterval=PT0S +org.asynchttpclient.http2CleartextEnabled=false diff --git a/client/src/test/java/org/asynchttpclient/BasicHttp2Test.java b/client/src/test/java/org/asynchttpclient/BasicHttp2Test.java new file mode 100644 index 0000000000..b3daaa9589 --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/BasicHttp2Test.java @@ -0,0 +1,1202 @@ +/* + * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.QueryStringDecoder; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.DefaultHttp2ResetFrame; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.util.ReferenceCountUtil; +import io.netty.handler.codec.http2.Http2Headers; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ApplicationProtocolNames; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import io.netty.util.concurrent.GlobalEventExecutor; +import org.asynchttpclient.test.EventCollectingHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.asynchttpclient.Dsl.asyncHttpClient; +import static org.asynchttpclient.Dsl.config; +import static org.asynchttpclient.test.TestUtils.AsyncCompletionHandlerAdapter; +import static org.asynchttpclient.util.DateUtils.unpreciseMillisTime; +import static org.asynchttpclient.util.ThrowableUtil.unknownStackTrace; +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration tests for HTTP/2 support using a self-contained Netty-based HTTP/2 test server. + *

+ * The embedded server uses {@link Http2FrameCodecBuilder} and {@link Http2MultiplexHandler} on + * the server side, and tests verify that the client correctly: + *

    + *
  • Negotiates HTTP/2 via ALPN
  • + *
  • Sends requests as HTTP/2 frames ({@link Http2HeadersFrame} + {@link Http2DataFrame})
  • + *
  • Receives responses and delivers them via the normal {@link AsyncHandler} callback sequence
  • + *
  • Correctly multiplexes concurrent requests over a single connection
  • + *
  • Falls back to HTTP/1.1 when HTTP/2 is disabled
  • + *
+ */ +public class BasicHttp2Test { + + // Event constants (from HttpTest/EventCollectingHandler) + private static final String COMPLETED_EVENT = "Completed"; + private static final String STATUS_RECEIVED_EVENT = "StatusReceived"; + private static final String HEADERS_RECEIVED_EVENT = "HeadersReceived"; + private static final String HEADERS_WRITTEN_EVENT = "HeadersWritten"; + private static final String CONNECTION_OPEN_EVENT = "ConnectionOpen"; + private static final String HOSTNAME_RESOLUTION_EVENT = "HostnameResolution"; + private static final String HOSTNAME_RESOLUTION_SUCCESS_EVENT = "HostnameResolutionSuccess"; + private static final String CONNECTION_SUCCESS_EVENT = "ConnectionSuccess"; + private static final String TLS_HANDSHAKE_EVENT = "TlsHandshake"; + private static final String TLS_HANDSHAKE_SUCCESS_EVENT = "TlsHandshakeSuccess"; + private static final String CONNECTION_POOL_EVENT = "ConnectionPool"; + private static final String CONNECTION_OFFER_EVENT = "ConnectionOffer"; + private static final String REQUEST_SEND_EVENT = "RequestSend"; + + private NioEventLoopGroup serverGroup; + private Channel serverChannel; + private ChannelGroup serverChildChannels; + private SslContext serverSslCtx; + private int serverPort; + + /** + * Path-routing HTTP/2 server handler that supports multiple test scenarios. + */ + private static final class Http2TestServerHandler extends SimpleChannelInboundHandler { + private Http2Headers requestHeaders; + private final List bodyChunks = new ArrayList<>(); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof Http2HeadersFrame) { + Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg; + this.requestHeaders = headersFrame.headers(); + if (headersFrame.isEndStream()) { + routeRequest(ctx, Unpooled.EMPTY_BUFFER); + } + } else if (msg instanceof Http2DataFrame) { + Http2DataFrame dataFrame = (Http2DataFrame) msg; + bodyChunks.add(dataFrame.content().retain()); + if (dataFrame.isEndStream()) { + int totalBytes = bodyChunks.stream().mapToInt(ByteBuf::readableBytes).sum(); + ByteBuf combined = ctx.alloc().buffer(totalBytes); + bodyChunks.forEach(chunk -> { + combined.writeBytes(chunk); + chunk.release(); + }); + bodyChunks.clear(); + routeRequest(ctx, combined); + } + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + releaseBodyChunks(); + super.channelInactive(ctx); + } + + private void releaseBodyChunks() { + for (ByteBuf chunk : bodyChunks) { + if (chunk.refCnt() > 0) { + chunk.release(); + } + } + bodyChunks.clear(); + } + + private void routeRequest(ChannelHandlerContext ctx, ByteBuf body) { + String path = requestHeaders.path() != null ? requestHeaders.path().toString() : "/"; + String method = requestHeaders.method() != null ? requestHeaders.method().toString() : "GET"; + + // Strip query string for routing + String queryString = null; + int qIdx = path.indexOf('?'); + String routePath = path; + if (qIdx >= 0) { + queryString = path.substring(qIdx + 1); + routePath = path.substring(0, qIdx); + } + + if (routePath.equals("/ok")) { + ReferenceCountUtil.safeRelease(body); + sendSimpleResponse(ctx, "200", Unpooled.EMPTY_BUFFER, null); + } else if (routePath.startsWith("/status/")) { + String statusCode = routePath.substring("/status/".length()); + ReferenceCountUtil.safeRelease(body); + sendSimpleResponse(ctx, statusCode, Unpooled.EMPTY_BUFFER, null); + } else if (routePath.startsWith("/delay/")) { + long millis = Long.parseLong(routePath.substring("/delay/".length())); + ReferenceCountUtil.safeRelease(body); + ctx.executor().schedule(() -> { + if (ctx.channel().isActive()) { + sendSimpleResponse(ctx, "200", Unpooled.EMPTY_BUFFER, null); + } + }, millis, TimeUnit.MILLISECONDS); + } else if (routePath.startsWith("/redirect/")) { + int count = Integer.parseInt(routePath.substring("/redirect/".length())); + ReferenceCountUtil.safeRelease(body); + Http2Headers responseHeaders = new DefaultHttp2Headers().status("302"); + if (count > 0) { + responseHeaders.add("location", "/redirect/" + (count - 1)); + } else { + responseHeaders.status("200"); + } + ctx.write(new DefaultHttp2HeadersFrame(responseHeaders, true)); + ctx.flush(); + } else if (routePath.equals("/head")) { + ReferenceCountUtil.safeRelease(body); + Http2Headers responseHeaders = new DefaultHttp2Headers() + .status("200") + .add(HttpHeaderNames.CONTENT_LENGTH, "100"); + if ("HEAD".equalsIgnoreCase(method)) { + ctx.write(new DefaultHttp2HeadersFrame(responseHeaders, true)); + ctx.flush(); + } else { + sendSimpleResponse(ctx, "200", Unpooled.EMPTY_BUFFER, null); + } + } else if (routePath.equals("/options")) { + ReferenceCountUtil.safeRelease(body); + Http2Headers responseHeaders = new DefaultHttp2Headers() + .status("200") + .add("allow", "GET,HEAD,POST,OPTIONS,TRACE"); + ctx.write(new DefaultHttp2HeadersFrame(responseHeaders, true)); + ctx.flush(); + } else if (routePath.equals("/cookies")) { + ReferenceCountUtil.safeRelease(body); + Http2Headers responseHeaders = new DefaultHttp2Headers().status("200"); + CharSequence cookieHeader = requestHeaders.get("cookie"); + if (cookieHeader != null) { + String[] cookies = cookieHeader.toString().split(";\\s*"); + for (String cookie : cookies) { + responseHeaders.add("set-cookie", cookie.trim()); + } + } + ctx.write(new DefaultHttp2HeadersFrame(responseHeaders, true)); + ctx.flush(); + } else if (routePath.equals("/reset")) { + ReferenceCountUtil.safeRelease(body); + ctx.writeAndFlush(new DefaultHttp2ResetFrame(Http2Error.INTERNAL_ERROR)); + } else { + // Default: echo handler — takes ownership of body via writeResponse + sendEchoResponse(ctx, body, path, routePath, queryString, method); + } + } + + private void sendEchoResponse(ChannelHandlerContext ctx, ByteBuf body, String fullPath, + String routePath, String queryString, String method) { + Http2Headers responseHeaders = new DefaultHttp2Headers().status("200"); + + // Echo Content-Type + if (requestHeaders.get(CONTENT_TYPE) != null) { + responseHeaders.add(CONTENT_TYPE, requestHeaders.get(CONTENT_TYPE)); + } + + // Echo path info + responseHeaders.add("x-pathinfo", routePath); + + // Echo query string + if (queryString != null) { + responseHeaders.add("x-querystring", queryString); + } + + // Echo request headers as X-{name} + for (Map.Entry entry : requestHeaders) { + String name = entry.getKey().toString(); + // Skip pseudo-headers + if (!name.startsWith(":")) { + responseHeaders.add("x-" + name, entry.getValue()); + } + } + + // Handle OPTIONS + if ("OPTIONS".equalsIgnoreCase(method)) { + responseHeaders.add("allow", "GET,HEAD,POST,OPTIONS,TRACE"); + } + + // Parse form parameters from body if content-type is form-urlencoded + CharSequence contentType = requestHeaders.get(CONTENT_TYPE); + if (contentType != null && contentType.toString().contains("application/x-www-form-urlencoded") + && body.isReadable()) { + String bodyStr = body.toString(UTF_8); + QueryStringDecoder decoder = new QueryStringDecoder("?" + bodyStr); + for (Map.Entry> entry : decoder.parameters().entrySet()) { + String value = entry.getValue().get(0); + responseHeaders.add("x-" + entry.getKey(), + URLEncoder.encode(value, UTF_8)); + } + } + + // Handle cookies + CharSequence cookieHeader = requestHeaders.get("cookie"); + if (cookieHeader != null) { + String[] cookies = cookieHeader.toString().split(";\\s*"); + for (String cookie : cookies) { + responseHeaders.add("set-cookie", cookie.trim()); + } + } + + responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(body.readableBytes())); + writeResponse(ctx, responseHeaders, body); + } + + private void sendSimpleResponse(ChannelHandlerContext ctx, String status, ByteBuf body, + Map extraHeaders) { + Http2Headers responseHeaders = new DefaultHttp2Headers() + .status(status) + .add(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(body.readableBytes())); + if (extraHeaders != null) { + extraHeaders.forEach(responseHeaders::add); + } + writeResponse(ctx, responseHeaders, body); + } + + private void writeResponse(ChannelHandlerContext ctx, Http2Headers responseHeaders, ByteBuf body) { + boolean hasBody = body.isReadable(); + ctx.write(new DefaultHttp2HeadersFrame(responseHeaders, !hasBody)); + if (hasBody) { + ctx.writeAndFlush(new DefaultHttp2DataFrame(body, true)).addListener(f -> { + if (!f.isSuccess() && body.refCnt() > 0) { + body.release(); + } + }); + } else { + ctx.flush(); + ReferenceCountUtil.safeRelease(body); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + } + + @BeforeEach + public void startServer() throws Exception { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + + serverSslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + .applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT, + ApplicationProtocolNames.HTTP_2, + ApplicationProtocolNames.HTTP_1_1)) + .build(); + + serverGroup = new NioEventLoopGroup(1); + serverChildChannels = new DefaultChannelGroup("http2-test-server", GlobalEventExecutor.INSTANCE); + + ServerBootstrap b = new ServerBootstrap() + .group(serverGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + serverChildChannels.add(ch); + ch.pipeline() + .addLast("ssl", serverSslCtx.newHandler(ch.alloc())) + .addLast(Http2FrameCodecBuilder.forServer().build()) + .addLast(new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Http2StreamChannel streamCh) { + streamCh.pipeline().addLast(new Http2TestServerHandler()); + } + })); + } + }); + + serverChannel = b.bind(0).sync().channel(); + serverPort = ((java.net.InetSocketAddress) serverChannel.localAddress()).getPort(); + } + + @AfterEach + public void stopServer() throws InterruptedException { + if (serverChildChannels != null) { + serverChildChannels.close().sync(); + } + if (serverChannel != null) { + serverChannel.close().sync(); + } + if (serverGroup != null) { + serverGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS).sync(); + } + ReferenceCountUtil.release(serverSslCtx); + } + + private String httpsUrl(String path) { + return "https://localhost:" + serverPort + path; + } + + /** + * Creates an AHC client configured to trust self-signed certs (for testing) with HTTP/2 enabled. + */ + private AsyncHttpClient http2Client() { + return asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true)); + } + + /** + * Creates an AHC client with HTTP/2 disabled (forced HTTP/1.1 fallback). + */ + private AsyncHttpClient http1Client() { + return asyncHttpClient(config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(false)); + } + + /** + * Creates an AHC client with custom config + trust manager + HTTP/2. + */ + private AsyncHttpClient http2ClientWithConfig(Consumer customizer) { + DefaultAsyncHttpClientConfig.Builder builder = config() + .setUseInsecureTrustManager(true) + .setHttp2Enabled(true); + customizer.accept(builder); + return asyncHttpClient(builder); + } + + /** + * Creates an AHC client with a specific request timeout. + */ + private AsyncHttpClient http2ClientWithTimeout(int requestTimeoutMs) { + return http2ClientWithConfig(b -> b.setRequestTimeout(Duration.ofMillis(requestTimeoutMs))); + } + + /** + * Creates an AHC client configured for redirect tests. + */ + private AsyncHttpClient http2ClientWithRedirects(int maxRedirects) { + return http2ClientWithConfig(b -> b.setMaxRedirects(maxRedirects).setFollowRedirect(true)); + } + + // ------------------------------------------------------------------------- + // Existing test cases + // ------------------------------------------------------------------------- + + @Test + public void simpleGetOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareGet(httpsUrl("/hello")) + .execute() + .get(30, SECONDS); + + assertNotNull(response); + assertEquals(200, response.getStatusCode()); + } + } + + @Test + public void postStringBodyOverHttp2() throws Exception { + String body = "Hello HTTP/2 world!"; + try (AsyncHttpClient client = http2Client()) { + Response response = client.preparePost(httpsUrl("/echo")) + .setBody(body) + .setHeader(CONTENT_TYPE, "text/plain") + .execute() + .get(30, SECONDS); + + assertNotNull(response); + assertEquals(200, response.getStatusCode()); + assertEquals(body, response.getResponseBody()); + } + } + + @Test + public void postByteArrayBodyOverHttp2() throws Exception { + byte[] body = "Binary data over HTTP/2".getBytes(StandardCharsets.UTF_8); + try (AsyncHttpClient client = http2Client()) { + Response response = client.preparePost(httpsUrl("/echo")) + .setBody(body) + .setHeader(CONTENT_TYPE, "application/octet-stream") + .execute() + .get(30, SECONDS); + + assertNotNull(response); + assertEquals(200, response.getStatusCode()); + assertArrayEquals(body, response.getResponseBodyAsBytes()); + } + } + + @Test + public void largeBodyOverHttp2() throws Exception { + // 64KB body to test DATA frame handling + byte[] body = new byte[64 * 1024]; + for (int i = 0; i < body.length; i++) { + body[i] = (byte) (i % 256); + } + try (AsyncHttpClient client = http2Client()) { + Response response = client.preparePost(httpsUrl("/echo")) + .setBody(body) + .setHeader(CONTENT_TYPE, "application/octet-stream") + .execute() + .get(30, SECONDS); + + assertNotNull(response); + assertEquals(200, response.getStatusCode()); + assertArrayEquals(body, response.getResponseBodyAsBytes()); + } + } + + @Test + public void multipleSequentialRequestsOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + for (int i = 0; i < 5; i++) { + String body = "Request " + i; + Response response = client.preparePost(httpsUrl("/echo")) + .setBody(body) + .setHeader(CONTENT_TYPE, "text/plain") + .execute() + .get(30, SECONDS); + + assertNotNull(response); + assertEquals(200, response.getStatusCode()); + assertEquals(body, response.getResponseBody()); + } + } + } + + @Test + public void multipleConcurrentRequestsOverHttp2() throws Exception { + int numRequests = 10; + CountDownLatch latch = new CountDownLatch(numRequests); + AtomicInteger successCount = new AtomicInteger(0); + AtomicReference error = new AtomicReference<>(); + + try (AsyncHttpClient client = http2Client()) { + List> futures = new ArrayList<>(); + for (int i = 0; i < numRequests; i++) { + String body = "Concurrent request " + i; + CompletableFuture future = client.preparePost(httpsUrl("/echo")) + .setBody(body) + .setHeader(CONTENT_TYPE, "text/plain") + .execute() + .toCompletableFuture() + .whenComplete((r, t) -> { + if (t != null) { + error.compareAndSet(null, t); + } else { + successCount.incrementAndGet(); + } + latch.countDown(); + }); + futures.add(future); + } + + assertTrue(latch.await(30, SECONDS), "Timed out waiting for concurrent requests"); + assertNull(error.get(), "Unexpected error: " + error.get()); + assertEquals(numRequests, successCount.get()); + } + } + + @Test + public void http2HeadersContainPseudoHeaders() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareGet(httpsUrl("/headers-check")) + .addHeader("X-Custom-Header", "test-value") + .execute() + .get(30, SECONDS); + + assertNotNull(response); + assertEquals(200, response.getStatusCode()); + } + } + + @Test + public void http2ResponseReportsCorrectProtocol() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareGet(httpsUrl("/hello")) + .execute() + .get(30, SECONDS); + + assertNotNull(response); + assertEquals(200, response.getStatusCode()); + assertEquals(HttpProtocol.HTTP_2, response.getProtocol(), + "Response should report HTTP/2 protocol"); + } + } + + @Test + public void http2DisabledFallsBackToHttp11() throws Exception { + try (AsyncHttpClient client = http1Client()) { + assertNotNull(client); + } + } + + @Test + public void http2IsEnabledByDefault() { + AsyncHttpClientConfig defaultConfig = config().build(); + assertTrue(defaultConfig.isHttp2Enabled(), + "HTTP/2 should be enabled by default"); + } + + @Test + public void http2CanBeDisabledViaConfig() { + AsyncHttpClientConfig configWithHttp2Disabled = config() + .setHttp2Enabled(false) + .build(); + assertFalse(configWithHttp2Disabled.isHttp2Enabled(), + "HTTP/2 should be disabled when setHttp2Enabled(false) is called"); + } + + // ------------------------------------------------------------------------- + // Basic request/response tests (mirrored from BasicHttpTest) + // ------------------------------------------------------------------------- + + @Test + public void getRootUrlOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareGet(httpsUrl("/ok")) + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + } + } + + @Test + public void getResponseBodyOverHttp2() throws Exception { + String body = "Hello World"; + try (AsyncHttpClient client = http2Client()) { + Response response = client.preparePost(httpsUrl("/echo")) + .setBody(body) + .setHeader(CONTENT_TYPE, "text/plain") + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + assertEquals(body, response.getResponseBody()); + } + } + + @Test + public void getEmptyBodyOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareGet(httpsUrl("/ok")) + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + assertTrue(response.getResponseBody().isEmpty()); + } + } + + @Test + public void getEmptyBodyNotifiesHandlerOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + final AtomicBoolean handlerWasNotified = new AtomicBoolean(); + + client.prepareGet(httpsUrl("/ok")).execute(new AsyncCompletionHandlerAdapter() { + @Override + public Response onCompleted(Response response) { + assertEquals(200, response.getStatusCode()); + handlerWasNotified.set(true); + return response; + } + }).get(30, SECONDS); + + assertTrue(handlerWasNotified.get()); + } + } + + @Test + public void headHasEmptyBodyOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareHead(httpsUrl("/head")) + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + assertTrue(response.getResponseBody().isEmpty()); + } + } + + @Test + public void defaultRequestBodyEncodingIsUtf8OverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.preparePost(httpsUrl("/echo")) + .setBody("\u017D\u017D\u017D\u017D\u017D\u017D") + .execute() + .get(30, SECONDS); + + assertArrayEquals(response.getResponseBodyAsBytes(), + "\u017D\u017D\u017D\u017D\u017D\u017D".getBytes(UTF_8)); + } + } + + // ------------------------------------------------------------------------- + // Path and query string tests + // ------------------------------------------------------------------------- + + @Test + public void getUrlWithPathWithoutQueryOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareGet(httpsUrl("/foo/bar")) + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + assertEquals("/foo/bar", response.getHeader("X-PathInfo")); + } + } + + @Test + public void getUrlWithPathWithQueryOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareGet(httpsUrl("/foo/bar?q=+%20x")) + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + assertEquals("/foo/bar", response.getHeader("X-PathInfo")); + assertNotNull(response.getHeader("X-QueryString")); + } + } + + @Test + public void getUrlWithPathWithQueryParamsOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareGet(httpsUrl("/foo/bar")) + .addQueryParam("q", "a b") + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + assertNotNull(response.getHeader("X-QueryString")); + } + } + + @Test + public void getProperPathAndQueryStringOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareGet(httpsUrl("/foo/bar?foo=bar")) + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + assertNotNull(response.getHeader("X-PathInfo")); + assertNotNull(response.getHeader("X-QueryString")); + } + } + + // ------------------------------------------------------------------------- + // Headers and cookies tests + // ------------------------------------------------------------------------- + + @Test + public void getWithHeadersOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareGet(httpsUrl("/echo")) + .addHeader("Test1", "Test1") + .addHeader("Test2", "Test2") + .addHeader("Test3", "Test3") + .addHeader("Test4", "Test4") + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + for (int i = 1; i < 5; i++) { + assertEquals("Test" + i, response.getHeader("X-test" + i)); + } + } + } + + @Test + public void postWithHeadersAndFormParamsOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Map> m = new HashMap<>(); + for (int i = 0; i < 5; i++) { + m.put("param_" + i, Collections.singletonList("value_" + i)); + } + + Response response = client.preparePost(httpsUrl("/echo")) + .setHeader(CONTENT_TYPE, HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED) + .setFormParams(m) + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + for (int i = 0; i < 5; i++) { + assertEquals("value_" + i, + URLDecoder.decode(response.getHeader("X-param_" + i), UTF_8)); + } + } + } + + @Test + public void postChineseCharOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + String chineseChar = "\u662F"; + + Map> m = new HashMap<>(); + m.put("param", Collections.singletonList(chineseChar)); + + Response response = client.preparePost(httpsUrl("/echo")) + .setHeader(CONTENT_TYPE, HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED) + .setFormParams(m) + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + String value = URLDecoder.decode(response.getHeader("X-param"), UTF_8); + assertEquals(chineseChar, value); + } + } + + @Test + public void getWithCookiesOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareGet(httpsUrl("/cookies")) + .addHeader("cookie", "foo=value") + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + String setCookie = response.getHeader("set-cookie"); + assertNotNull(setCookie); + assertTrue(setCookie.contains("foo=value")); + } + } + + @Test + public void postFormParametersAsBodyStringOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 5; i++) { + sb.append("param_").append(i).append("=value_").append(i).append('&'); + } + sb.setLength(sb.length() - 1); + + Response response = client.preparePost(httpsUrl("/echo")) + .setHeader(CONTENT_TYPE, HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED) + .setBody(sb.toString()) + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + for (int i = 0; i < 5; i++) { + assertEquals("value_" + i, + URLDecoder.decode(response.getHeader("X-param_" + i), UTF_8)); + } + } + } + + // ------------------------------------------------------------------------- + // Timeout and cancellation tests + // ------------------------------------------------------------------------- + + @Test + public void cancelledFutureThrowsCancellationExceptionOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Future future = client.prepareGet(httpsUrl("/delay/5000")) + .execute(new AsyncCompletionHandlerAdapter() { + @Override + public void onThrowable(Throwable t) { + } + }); + future.cancel(true); + assertThrows(CancellationException.class, () -> future.get(30, SECONDS)); + } + } + + @Test + public void futureTimeOutThrowsTimeoutExceptionOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Future future = client.prepareGet(httpsUrl("/delay/5000")) + .execute(new AsyncCompletionHandlerAdapter() { + @Override + public void onThrowable(Throwable t) { + } + }); + + assertThrows(TimeoutException.class, () -> future.get(2, SECONDS)); + } + } + + @Test + public void configTimeoutNotifiesOnThrowableAndFutureOverHttp2() throws Exception { + try (AsyncHttpClient client = http2ClientWithTimeout(1000)) { + final AtomicBoolean onCompletedWasNotified = new AtomicBoolean(); + final AtomicBoolean onThrowableWasNotifiedWithTimeoutException = new AtomicBoolean(); + final CountDownLatch latch = new CountDownLatch(1); + + Future whenResponse = client.prepareGet(httpsUrl("/delay/5000")) + .execute(new AsyncCompletionHandlerAdapter() { + @Override + public Response onCompleted(Response response) { + onCompletedWasNotified.set(true); + latch.countDown(); + return response; + } + + @Override + public void onThrowable(Throwable t) { + onThrowableWasNotifiedWithTimeoutException.set(t instanceof TimeoutException); + latch.countDown(); + } + }); + + if (!latch.await(30, SECONDS)) { + fail("Timed out"); + } + + assertFalse(onCompletedWasNotified.get()); + assertTrue(onThrowableWasNotifiedWithTimeoutException.get()); + + assertThrows(ExecutionException.class, () -> whenResponse.get(30, SECONDS)); + } + } + + @Test + public void configRequestTimeoutHappensInDueTimeOverHttp2() throws Exception { + try (AsyncHttpClient client = http2ClientWithTimeout(1000)) { + long start = unpreciseMillisTime(); + try { + client.prepareGet(httpsUrl("/delay/2000")).execute().get(); + fail("Should have thrown"); + } catch (ExecutionException ex) { + final long elapsedTime = unpreciseMillisTime() - start; + assertTrue(elapsedTime >= 1_000 && elapsedTime <= 1_500, + "Elapsed time was " + elapsedTime + "ms"); + } + } + } + + @Test + public void cancellingFutureNotifiesOnThrowableWithCancellationExceptionOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + CountDownLatch latch = new CountDownLatch(1); + + Future future = client.preparePost(httpsUrl("/delay/2000")) + .setBody("Body") + .execute(new AsyncCompletionHandlerAdapter() { + @Override + public void onThrowable(Throwable t) { + if (t instanceof CancellationException) { + latch.countDown(); + } + } + }); + + future.cancel(true); + if (!latch.await(30, SECONDS)) { + fail("Timed out"); + } + } + } + + // ------------------------------------------------------------------------- + // Handler exception notification tests + // ------------------------------------------------------------------------- + + @Test + public void exceptionInOnCompletedGetNotifiedToOnThrowableOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference message = new AtomicReference<>(); + + client.prepareGet(httpsUrl("/ok")).execute(new AsyncCompletionHandlerAdapter() { + @Override + public Response onCompleted(Response response) { + throw unknownStackTrace(new IllegalStateException("FOO"), + BasicHttp2Test.class, "exceptionInOnCompletedGetNotifiedToOnThrowableOverHttp2"); + } + + @Override + public void onThrowable(Throwable t) { + message.set(t.getMessage()); + latch.countDown(); + } + }); + + if (!latch.await(30, SECONDS)) { + fail("Timed out"); + } + + assertEquals("FOO", message.get()); + } + } + + @Test + public void exceptionInOnCompletedGetNotifiedToFutureOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Future whenResponse = client.prepareGet(httpsUrl("/ok")) + .execute(new AsyncCompletionHandlerAdapter() { + @Override + public Response onCompleted(Response response) { + throw unknownStackTrace(new IllegalStateException("FOO"), + BasicHttp2Test.class, "exceptionInOnCompletedGetNotifiedToFutureOverHttp2"); + } + + @Override + public void onThrowable(Throwable t) { + } + }); + + try { + whenResponse.get(30, SECONDS); + fail("Should have thrown"); + } catch (ExecutionException e) { + assertInstanceOf(IllegalStateException.class, e.getCause()); + } + } + } + + // ------------------------------------------------------------------------- + // Redirects and methods tests + // ------------------------------------------------------------------------- + + @Test + public void reachingMaxRedirectThrowsMaxRedirectExceptionOverHttp2() throws Exception { + try (AsyncHttpClient client = http2ClientWithRedirects(1)) { + try { + client.prepareGet(httpsUrl("/redirect/3")) + .execute(new AsyncCompletionHandlerAdapter() { + @Override + public Response onCompleted(Response response) { + fail("Should not be here"); + return response; + } + + @Override + public void onThrowable(Throwable t) { + } + }).get(30, SECONDS); + fail("Should have thrown"); + } catch (ExecutionException e) { + assertInstanceOf(org.asynchttpclient.handler.MaxRedirectException.class, e.getCause()); + } + } + } + + @Test + public void optionsIsSupportedOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response = client.prepareOptions(httpsUrl("/options")) + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + assertEquals("GET,HEAD,POST,OPTIONS,TRACE", response.getHeader("allow")); + } + } + + // ------------------------------------------------------------------------- + // Connection events tests + // ------------------------------------------------------------------------- + + @Test + public void newConnectionEventsAreFiredOverHttp2() throws Exception { + try (AsyncHttpClient client = http2Client()) { + EventCollectingHandler handler = new EventCollectingHandler(); + client.prepareGet(httpsUrl("/ok")).execute(handler).get(30, SECONDS); + handler.waitForCompletion(30, SECONDS); + + Object[] expectedEvents = { + CONNECTION_POOL_EVENT, + HOSTNAME_RESOLUTION_EVENT, + HOSTNAME_RESOLUTION_SUCCESS_EVENT, + CONNECTION_OPEN_EVENT, + CONNECTION_SUCCESS_EVENT, + TLS_HANDSHAKE_EVENT, + TLS_HANDSHAKE_SUCCESS_EVENT, + REQUEST_SEND_EVENT, + STATUS_RECEIVED_EVENT, + HEADERS_RECEIVED_EVENT, + CONNECTION_OFFER_EVENT, + COMPLETED_EVENT}; + + assertArrayEquals(expectedEvents, handler.firedEvents.toArray(), + "Got " + Arrays.toString(handler.firedEvents.toArray())); + } + } + + // ------------------------------------------------------------------------- + // HTTP/2-specific tests + // ------------------------------------------------------------------------- + + @Test + public void http2ErrorStatusCodesAreReported() throws Exception { + try (AsyncHttpClient client = http2Client()) { + Response response404 = client.prepareGet(httpsUrl("/status/404")) + .execute() + .get(30, SECONDS); + assertEquals(404, response404.getStatusCode()); + + Response response500 = client.prepareGet(httpsUrl("/status/500")) + .execute() + .get(30, SECONDS); + assertEquals(500, response500.getStatusCode()); + } + } + + @Test + public void http2StreamResetIsHandledGracefully() throws Exception { + try (AsyncHttpClient client = http2ClientWithTimeout(5000)) { + try { + client.prepareGet(httpsUrl("/reset")) + .execute() + .get(10, SECONDS); + fail("Should have thrown"); + } catch (ExecutionException e) { + assertNotNull(e.getCause()); + } + } + } + + @Test + public void postByteBodyOverHttp2() throws Exception { + byte[] bodyBytes = "Hello from byte array body".getBytes(UTF_8); + try (AsyncHttpClient client = http2Client()) { + Response response = client.preparePost(httpsUrl("/echo")) + .setHeader(CONTENT_TYPE, "application/octet-stream") + .setBody(bodyBytes) + .execute() + .get(30, SECONDS); + + assertEquals(200, response.getStatusCode()); + assertArrayEquals(bodyBytes, response.getResponseBodyAsBytes()); + } + } + + // ------------------------------------------------------------------------- + // HTTP/2 multiplexing and connection management tests + // ------------------------------------------------------------------------- + + @Test + public void http2MultiplexesConcurrentRequestsOnSingleConnection() throws Exception { + try (AsyncHttpClient client = http2ClientWithConfig(b -> b.setMaxConnectionsPerHost(1))) { + int concurrentRequests = 10; + CountDownLatch latch = new CountDownLatch(concurrentRequests); + AtomicInteger successCount = new AtomicInteger(0); + AtomicReference firstError = new AtomicReference<>(); + + // Fire off concurrent requests — with maxConnectionsPerHost=1 and HTTP/1.1, + // these would block waiting for the single connection. With HTTP/2 multiplexing, + // they should all complete on the same connection concurrently. + for (int i = 0; i < concurrentRequests; i++) { + final int idx = i; + client.prepareGet(httpsUrl("/delay/100")) + .execute(new AsyncCompletionHandlerBase() { + @Override + public Response onCompleted(Response response) throws Exception { + if (response.getStatusCode() == 200) { + successCount.incrementAndGet(); + } + latch.countDown(); + return response; + } + + @Override + public void onThrowable(Throwable t) { + firstError.compareAndSet(null, t); + latch.countDown(); + } + }); + } + + assertTrue(latch.await(30, SECONDS), "All requests should complete within 30s"); + assertNull(firstError.get(), "No errors expected, got: " + firstError.get()); + assertEquals(concurrentRequests, successCount.get(), + "All concurrent requests should succeed via HTTP/2 multiplexing"); + } + } + + @Test + public void http2ConnectionIsReusedAcrossSequentialRequests() throws Exception { + try (AsyncHttpClient client = http2Client()) { + // First request — establishes the HTTP/2 connection + Response response1 = client.prepareGet(httpsUrl("/ok")).execute().get(30, SECONDS); + assertEquals(200, response1.getStatusCode()); + + // Second request — should reuse the same HTTP/2 connection from the registry + EventCollectingHandler handler = new EventCollectingHandler(); + Response response2 = client.prepareGet(httpsUrl("/ok")).execute(handler).get(30, SECONDS); + assertEquals(200, response2.getStatusCode()); + handler.waitForCompletion(30, SECONDS); + + // The second request should hit the connection pool (HTTP/2 registry) and NOT + // open a new connection — no DNS resolution, no TLS handshake + var events = handler.firedEvents; + assertTrue(events.contains(CONNECTION_POOL_EVENT), "Should attempt pool lookup"); + assertFalse(events.contains(HOSTNAME_RESOLUTION_EVENT), + "Should NOT resolve hostname for reused H2 connection"); + assertFalse(events.contains(TLS_HANDSHAKE_EVENT), + "Should NOT do TLS handshake for reused H2 connection"); + } + } + + @Test + public void http2SequentialRequestsWithMaxConnectionsPerHostOne() throws Exception { + // Verify that with maxConnectionsPerHost=1, sequential HTTP/2 requests don't deadlock + try (AsyncHttpClient client = http2ClientWithConfig(b -> b.setMaxConnectionsPerHost(1))) { + for (int i = 0; i < 5; i++) { + Response response = client.prepareGet(httpsUrl("/ok")).execute().get(30, SECONDS); + assertEquals(200, response.getStatusCode()); + } + } + } +} diff --git a/client/src/test/java/org/asynchttpclient/LargeResponseTest.java b/client/src/test/java/org/asynchttpclient/LargeResponseTest.java index df0558f566..5651ebb84b 100644 --- a/client/src/test/java/org/asynchttpclient/LargeResponseTest.java +++ b/client/src/test/java/org/asynchttpclient/LargeResponseTest.java @@ -46,7 +46,7 @@ public class LargeResponseTest { private static final int textSize = 4 * 1024; private static final byte[] textBytes = "z".repeat(textSize).getBytes(StandardCharsets.UTF_8); - private static final long responseSize = ((long)textSize) * (1_500_000L); + private static final long responseSize = ((long)textSize) * (100_000L); private static HttpServer HTTP_SERVER; diff --git a/client/src/test/java/org/asynchttpclient/channel/MaxTotalConnectionTest.java b/client/src/test/java/org/asynchttpclient/channel/MaxTotalConnectionTest.java index 6094f5bdb9..345ce9818f 100644 --- a/client/src/test/java/org/asynchttpclient/channel/MaxTotalConnectionTest.java +++ b/client/src/test/java/org/asynchttpclient/channel/MaxTotalConnectionTest.java @@ -40,7 +40,7 @@ public class MaxTotalConnectionTest extends AbstractBasicTest { @RepeatedIfExceptionsTest(repeats = 5) public void testMaxTotalConnectionsExceedingException() throws IOException { - String[] urls = {"https://google.com", "https://github.com"}; + String[] urls = {getTargetUrl(), String.format("http://localhost:%d/foo/test", port2)}; AsyncHttpClientConfig config = config() .setConnectTimeout(Duration.ofSeconds(1)) @@ -76,7 +76,7 @@ public void testMaxTotalConnectionsExceedingException() throws IOException { @RepeatedIfExceptionsTest(repeats = 5) public void testMaxTotalConnections() throws Exception { - String[] urls = {"https://www.google.com", "https://www.youtube.com"}; + String[] urls = {getTargetUrl(), String.format("http://localhost:%d/foo/test", port2)}; final CountDownLatch latch = new CountDownLatch(2); final AtomicReference ex = new AtomicReference<>(); diff --git a/client/src/test/java/org/asynchttpclient/proxy/HttpsProxyTestcontainersIntegrationTest.java b/client/src/test/java/org/asynchttpclient/proxy/HttpsProxyTestcontainersIntegrationTest.java index e915e86663..40b0f6ee3e 100644 --- a/client/src/test/java/org/asynchttpclient/proxy/HttpsProxyTestcontainersIntegrationTest.java +++ b/client/src/test/java/org/asynchttpclient/proxy/HttpsProxyTestcontainersIntegrationTest.java @@ -144,6 +144,8 @@ public void testHttpProxyToHttpsTarget() throws Exception { .setProxyType(ProxyType.HTTP) .build()) .setUseInsecureTrustManager(true) + // HTTP/2 ALPN upgrade after proxy CONNECT tunnel is not yet supported + .setHttp2Enabled(false) .setConnectTimeout(Duration.ofMillis(10000)) .setRequestTimeout(Duration.ofMillis(30000)) .build(); @@ -165,6 +167,8 @@ public void testHttpsProxyToHttpsTarget() throws Exception { .setProxyType(ProxyType.HTTPS) .build()) .setUseInsecureTrustManager(true) + // HTTP/2 ALPN upgrade after proxy CONNECT tunnel is not yet supported + .setHttp2Enabled(false) .setConnectTimeout(Duration.ofMillis(10000)) .setRequestTimeout(Duration.ofMillis(30000)) .build(); diff --git a/pom.xml b/pom.xml index f7d4807e9d..a089bd8723 100644 --- a/pom.xml +++ b/pom.xml @@ -130,6 +130,12 @@ ${netty.version} + + io.netty + netty-codec-http2 + ${netty.version} + + io.netty netty-codec