diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecRuntime.java index 4fd510b8c3..d1757e4cc0 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/AsyncExecRuntime.java @@ -175,6 +175,18 @@ Cancellable execute( */ void markConnectionNonReusable(); + /** + * Returns the route that has already been established by the connection pool, + * or {@code null} if route completion is not handled at the pool level. + * + * @return the established route, or {@code null}. + * + * @since 5.7 + */ + default HttpRoute getEstablishedRoute() { + return null; + } + /** * Forks this runtime for parallel execution. * diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java index d8dd016339..bd7a0422f9 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java @@ -46,7 +46,6 @@ import org.apache.hc.client5.http.auth.AuthenticationException; import org.apache.hc.client5.http.auth.ChallengeType; import org.apache.hc.client5.http.auth.MalformedChallengeException; -import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.auth.AuthCacheKeeper; import org.apache.hc.client5.http.impl.auth.AuthenticationHandler; import org.apache.hc.client5.http.impl.routing.BasicRouteDirector; @@ -250,6 +249,15 @@ public void cancelled() { public void completed(final AsyncExecRuntime execRuntime) { final HttpHost proxy = route.getProxyHost(); tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled()); + if (route.isTunnelled() && execRuntime.getEstablishedRoute() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} tunnel to target already established by connection pool", exchangeId); + } + tracker.tunnelTarget(false); + if (route.isLayered()) { + tracker.layerProtocol(route.isSecure()); + } + } if (LOG.isDebugEnabled()) { LOG.debug("{} connected to proxy", exchangeId); } @@ -519,31 +527,8 @@ private boolean needAuthentication( final HttpHost proxy, final HttpResponse response, final HttpClientContext context) throws AuthenticationException, MalformedChallengeException { - final RequestConfig config = context.getRequestConfigOrDefault(); - if (config.isAuthenticationEnabled()) { - final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context); - final boolean proxyMutualAuthRequired = authenticator.isChallengeExpected(proxyAuthExchange); - - if (authCacheKeeper != null) { - if (proxyAuthRequested) { - authCacheKeeper.updateOnChallenge(proxy, null, proxyAuthExchange, context); - } else { - authCacheKeeper.updateOnNoChallenge(proxy, null, proxyAuthExchange, context); - } - } - - if (proxyAuthRequested || proxyMutualAuthRequired) { - final boolean updated = authenticator.handleResponse(proxy, ChallengeType.PROXY, response, - proxyAuthStrategy, proxyAuthExchange, context); - - if (authCacheKeeper != null) { - authCacheKeeper.updateOnResponse(proxy, null, proxyAuthExchange, context); - } - - return updated; - } - } - return false; + return authenticator.needProxyAuthentication( + proxyAuthExchange, proxy, response, proxyAuthStrategy, authCacheKeeper, context); } private void proceedConnected( diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java index 74648da764..13c4bd1974 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java @@ -840,9 +840,12 @@ public CloseableHttpAsyncClient build() { new H2AsyncMainClientExec(httpProcessor), ChainElement.MAIN_TRANSPORT.name()); + final HttpProcessor proxyConnectHttpProcessor = + new DefaultHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)); + execChainDefinition.addFirst( new AsyncConnectExec( - new DefaultHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)), + proxyConnectHttpProcessor, proxyAuthStrategyCopy, schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE, authCachingDisabled), @@ -971,7 +974,21 @@ public CloseableHttpAsyncClient build() { } final MultihomeConnectionInitiator connectionInitiator = new MultihomeConnectionInitiator(ioReactor, dnsResolver); - final InternalH2ConnPool connPool = new InternalH2ConnPool(connectionInitiator, host -> null, tlsStrategyCopy); + final H2RouteOperator routeOperator = new H2RouteOperator( + tlsStrategyCopy, + new H2TunnelProtocolStarter(h2Config, charCodingConfig), + proxyConnectHttpProcessor, + proxyAuthStrategyCopy, + schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE, + authCachingDisabled, + authSchemeRegistryCopy, + credentialsProviderCopy, + defaultRequestConfig); + final InternalH2ConnPool connPool = new InternalH2ConnPool( + connectionInitiator, + host -> null, + tlsStrategyCopy, + routeOperator); connPool.setConnectionConfigResolver(connectionConfigResolver); List closeablesCopy = closeables != null ? new ArrayList<>(closeables) : null; diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2OverH2TunnelExchangeHandler.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2OverH2TunnelExchangeHandler.java new file mode 100644 index 0000000000..7b6b7bdc80 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2OverH2TunnelExchangeHandler.java @@ -0,0 +1,287 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ConnectionClosedException; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpRequestInterceptor; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.StreamControl; +import org.apache.hc.core5.http.impl.BasicEntityDetails; +import org.apache.hc.core5.http.message.BasicHttpRequest; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.net.NamedEndpoint; +import org.apache.hc.core5.net.URIAuthority; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.IOEventHandlerFactory; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.reactor.ProtocolIOSession; +import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer; +import org.apache.hc.core5.util.Timeout; + +/** + * Exchange handler that establishes an HTTP/2 CONNECT tunnel and exposes + * the resulting data stream as a {@link ProtocolIOSession}. + * + * @since 5.7 + */ +final class H2OverH2TunnelExchangeHandler implements AsyncClientExchangeHandler { + + private final IOSession physicalSession; + private final NamedEndpoint targetEndpoint; + private final Timeout connectTimeout; + private final boolean secure; + private final TlsStrategy tlsStrategy; + private final HttpRequestInterceptor connectRequestInterceptor; + private final IOEventHandlerFactory protocolStarter; + private final FutureCallback callback; + + private final AtomicBoolean done; + + private volatile DataStreamChannel dataChannel; + private volatile CapacityChannel capacityChannel; + private volatile StreamControl streamControl; + private volatile H2TunnelProtocolIOSession tunnelSession; + + H2OverH2TunnelExchangeHandler( + final IOSession physicalSession, + final NamedEndpoint targetEndpoint, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final HttpRequestInterceptor connectRequestInterceptor, + final IOEventHandlerFactory protocolStarter, + final FutureCallback callback) { + this.physicalSession = physicalSession; + this.targetEndpoint = targetEndpoint; + this.connectTimeout = connectTimeout; + this.secure = secure; + this.tlsStrategy = tlsStrategy; + this.connectRequestInterceptor = connectRequestInterceptor; + this.protocolStarter = protocolStarter; + this.callback = callback; + this.done = new AtomicBoolean(false); + } + + void initiated(final StreamControl streamControl) { + this.streamControl = streamControl; + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null) { + tunnel.bindStreamControl(streamControl); + } + } + + @Override + public void releaseResources() { + } + + @Override + public void failed(final Exception cause) { + fail(cause); + } + + @Override + public void cancel() { + fail(new ConnectionClosedException("Tunnel setup cancelled")); + } + + @Override + public void produceRequest(final RequestChannel requestChannel, final HttpContext context) throws HttpException, IOException { + final HttpRequest connectRequest = new BasicHttpRequest(Method.CONNECT.name(), (String) null); + connectRequest.setAuthority(new URIAuthority(targetEndpoint)); + if (connectRequestInterceptor != null) { + connectRequestInterceptor.process(connectRequest, null, context); + } + requestChannel.sendRequest(connectRequest, new BasicEntityDetails(-1, null), context); + } + + @Override + public int available() { + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + return tunnel != null ? tunnel.available() : 0; + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + this.dataChannel = channel; + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null) { + tunnel.attachChannel(channel); + tunnel.onOutputReady(); + } + } + + @Override + public void consumeInformation(final HttpResponse response, final HttpContext context) { + } + + @Override + public void consumeResponse( + final HttpResponse response, + final EntityDetails entityDetails, + final HttpContext context) throws HttpException, IOException { + + final int status = response.getCode(); + if (status < 200 || status >= 300) { + throw new TunnelRefusedException(response); + } + + if (entityDetails == null) { + throw new HttpException("CONNECT response does not provide a tunneled data stream"); + } + + if (this.tunnelSession != null) { + return; + } + + final H2TunnelProtocolIOSession tunnel = + new H2TunnelProtocolIOSession(physicalSession, targetEndpoint, connectTimeout, streamControl); + + final DataStreamChannel currentChannel = this.dataChannel; + if (currentChannel != null) { + tunnel.attachChannel(currentChannel); + } + final CapacityChannel currentCapacity = this.capacityChannel; + if (currentCapacity != null) { + tunnel.updateCapacityChannel(currentCapacity); + } + this.tunnelSession = tunnel; + + if (secure) { + tlsStrategy.upgrade( + tunnel, + targetEndpoint, + null, + connectTimeout, + new FutureCallback() { + + @Override + public void completed(final TransportSecurityLayer transportSecurityLayer) { + try { + startProtocol(tunnel); + complete(tunnel); + } catch (final Exception ex) { + fail(ex); + } + } + + @Override + public void failed(final Exception ex) { + fail(ex); + } + + @Override + public void cancelled() { + fail(new ConnectionClosedException("Tunnel TLS upgrade cancelled")); + } + }); + } else { + startProtocol(tunnel); + complete(tunnel); + } + } + + private void startProtocol(final H2TunnelProtocolIOSession tunnel) throws IOException { + if (protocolStarter == null) { + return; + } + final IOEventHandler protocolHandler = protocolStarter.createHandler(tunnel, null); + tunnel.upgrade(protocolHandler); + protocolHandler.connected(tunnel); + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + this.capacityChannel = capacityChannel; + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null) { + tunnel.updateCapacityChannel(capacityChannel); + } + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null && src != null && src.hasRemaining()) { + tunnel.onInput(src); + } + } + + @Override + public void streamEnd(final List trailers) { + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null) { + tunnel.onRemoteStreamEnd(); + } else { + closeTransport(CloseMode.GRACEFUL); + if (done.compareAndSet(false, true) && callback != null) { + callback.failed(new ConnectionClosedException("Tunnel stream closed before establishment")); + } + } + } + + private void closeTransport(final CloseMode closeMode) { + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null) { + tunnel.close(closeMode); + return; + } + final StreamControl currentStreamControl = this.streamControl; + if (currentStreamControl != null) { + currentStreamControl.cancel(); + } + } + + private void fail(final Exception cause) { + closeTransport(CloseMode.IMMEDIATE); + if (done.compareAndSet(false, true) && callback != null) { + callback.failed(cause); + } + } + + private void complete(final H2TunnelProtocolIOSession tunnel) { + if (done.compareAndSet(false, true) && callback != null) { + callback.completed(tunnel); + } + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2OverH2TunnelSupport.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2OverH2TunnelSupport.java new file mode 100644 index 0000000000..5d27eab216 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2OverH2TunnelSupport.java @@ -0,0 +1,223 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpRequestInterceptor; +import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.net.NamedEndpoint; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandlerFactory; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.reactor.ProtocolIOSession; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Timeout; + +/** + * Helper for establishing HTTP/2 CONNECT tunnels through HTTP/2 proxies. + *

+ * Multiplexing-safe: tunnel close affects only the CONNECT stream, + * not the underlying physical HTTP/2 connection. + *

+ *

+ * This helper does not handle proxy authentication (407 retries). + * That responsibility belongs to client implementations that maintain + * authentication state. + *

+ * + * @since 5.7 + */ +@Contract(threading = ThreadingBehavior.STATELESS) +public final class H2OverH2TunnelSupport { + + private H2OverH2TunnelSupport() { + } + + /** + * Establishes a CONNECT tunnel and returns the resulting {@link ProtocolIOSession} + * via the callback. The tunnel session supports optional TLS upgrade. + * + * @param proxySession the HTTP/2 session to the proxy + * @param target the target endpoint for the CONNECT request authority + * @param connectTimeout timeout for the CONNECT handshake + * @param secure whether to upgrade the tunnel to TLS after establishment + * @param tlsStrategy TLS strategy for the upgrade (required when {@code secure} is true) + * @param callback completion callback receiving the tunnel session + * @since 5.7 + */ + public static void establish( + final IOSession proxySession, + final NamedEndpoint target, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final FutureCallback callback) { + establishInternal(proxySession, target, connectTimeout, secure, tlsStrategy, null, null, callback); + } + + /** + * Establishes a CONNECT tunnel with a request interceptor for injecting + * headers (e.g. proxy authentication) into the CONNECT request. + * + * @param proxySession the HTTP/2 session to the proxy + * @param target the target endpoint for the CONNECT request authority + * @param connectTimeout timeout for the CONNECT handshake + * @param secure whether to upgrade the tunnel to TLS after establishment + * @param tlsStrategy TLS strategy for the upgrade (required when {@code secure} is true) + * @param connectRequestInterceptor interceptor applied to the CONNECT request before sending + * @param callback completion callback receiving the tunnel session + * @since 5.7 + */ + public static void establish( + final IOSession proxySession, + final NamedEndpoint target, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final HttpRequestInterceptor connectRequestInterceptor, + final FutureCallback callback) { + establishInternal(proxySession, target, connectTimeout, secure, tlsStrategy, connectRequestInterceptor, null, callback); + } + + /** + * Establishes a CONNECT tunnel and starts a protocol handler inside it. + * The protocol starter factory creates an {@link org.apache.hc.core5.reactor.IOEventHandler} + * that is connected to the tunnel session immediately after establishment. + * + * @param proxySession the HTTP/2 session to the proxy + * @param target the target endpoint for the CONNECT request authority + * @param connectTimeout timeout for the CONNECT handshake + * @param secure whether to upgrade the tunnel to TLS after establishment + * @param tlsStrategy TLS strategy for the upgrade (required when {@code secure} is true) + * @param protocolStarter factory for the protocol handler to run inside the tunnel + * @param callback completion callback receiving the tunnel session + * @since 5.7 + */ + public static void establish( + final IOSession proxySession, + final NamedEndpoint target, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final IOEventHandlerFactory protocolStarter, + final FutureCallback callback) { + establish(proxySession, target, connectTimeout, secure, tlsStrategy, null, protocolStarter, callback); + } + + /** + * Establishes a CONNECT tunnel with both a request interceptor and a protocol starter. + * This is the most general overload combining proxy authentication support with + * automatic protocol initialization inside the tunnel. + * + * @param proxySession the HTTP/2 session to the proxy + * @param target the target endpoint for the CONNECT request authority + * @param connectTimeout timeout for the CONNECT handshake + * @param secure whether to upgrade the tunnel to TLS after establishment + * @param tlsStrategy TLS strategy for the upgrade (required when {@code secure} is true) + * @param connectRequestInterceptor interceptor applied to the CONNECT request before sending + * @param protocolStarter factory for the protocol handler to run inside the tunnel + * @param callback completion callback receiving the tunnel session + * @since 5.7 + */ + public static void establish( + final IOSession proxySession, + final NamedEndpoint target, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final HttpRequestInterceptor connectRequestInterceptor, + final IOEventHandlerFactory protocolStarter, + final FutureCallback callback) { + + final FutureCallback adapter = callback != null ? new FutureCallback() { + + @Override + public void completed(final ProtocolIOSession result) { + callback.completed(result); + } + + @Override + public void failed(final Exception ex) { + callback.failed(ex); + } + + @Override + public void cancelled() { + callback.cancelled(); + } + + } : null; + + establishInternal(proxySession, target, connectTimeout, secure, tlsStrategy, connectRequestInterceptor, protocolStarter, adapter); + } + + private static void establishInternal( + final IOSession proxySession, + final NamedEndpoint target, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final HttpRequestInterceptor connectRequestInterceptor, + final IOEventHandlerFactory protocolStarter, + final FutureCallback callback) { + + Args.notNull(proxySession, "Proxy I/O session"); + Args.notNull(target, "Tunnel target endpoint"); + if (secure) { + Args.notNull(tlsStrategy, "TLS strategy"); + } + + final H2OverH2TunnelExchangeHandler exchangeHandler = new H2OverH2TunnelExchangeHandler( + proxySession, + target, + connectTimeout, + secure, + tlsStrategy, + connectRequestInterceptor, + protocolStarter, + callback); + + proxySession.enqueue( + new RequestExecutionCommand( + exchangeHandler, + null, + HttpCoreContext.create(), + exchangeHandler::initiated), + Command.Priority.NORMAL); + } + + static void closeQuietly(final ProtocolIOSession session) { + if (session != null) { + session.close(CloseMode.IMMEDIATE); + } + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2RouteOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2RouteOperator.java new file mode 100644 index 0000000000..6eaa38048f --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2RouteOperator.java @@ -0,0 +1,255 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import org.apache.hc.client5.http.AuthenticationStrategy; +import org.apache.hc.client5.http.HttpRoute; +import org.apache.hc.client5.http.SchemePortResolver; +import org.apache.hc.client5.http.auth.AuthExchange; +import org.apache.hc.client5.http.auth.AuthSchemeFactory; +import org.apache.hc.client5.http.auth.AuthenticationException; +import org.apache.hc.client5.http.auth.ChallengeType; +import org.apache.hc.client5.http.auth.CredentialsProvider; +import org.apache.hc.client5.http.auth.MalformedChallengeException; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.auth.AuthCacheKeeper; +import org.apache.hc.client5.http.impl.auth.AuthenticationHandler; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.config.Lookup; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.http.protocol.HttpProcessor; +import org.apache.hc.core5.net.NamedEndpoint; +import org.apache.hc.core5.reactor.IOEventHandlerFactory; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.util.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Completes an HTTP/2 route by establishing a CONNECT tunnel through the proxy + * and optionally upgrading to TLS. Handles proxy authentication with bounded retry. + * + * @since 5.7 + */ +@Internal +final class H2RouteOperator { + + private static final Logger LOG = LoggerFactory.getLogger(H2RouteOperator.class); + private static final int MAX_TUNNEL_AUTH_ATTEMPTS = 3; + + private final TlsStrategy tlsStrategy; + private final IOEventHandlerFactory tunnelProtocolStarter; + private final HttpProcessor proxyHttpProcessor; + private final AuthenticationStrategy proxyAuthStrategy; + private final AuthenticationHandler authenticator; + private final AuthCacheKeeper authCacheKeeper; + private final Lookup authSchemeRegistry; + private final CredentialsProvider credentialsProvider; + private final RequestConfig defaultRequestConfig; + + H2RouteOperator( + final TlsStrategy tlsStrategy, + final IOEventHandlerFactory tunnelProtocolStarter) { + this(tlsStrategy, tunnelProtocolStarter, null, null, null, true, null, null, null); + } + + H2RouteOperator( + final TlsStrategy tlsStrategy, + final IOEventHandlerFactory tunnelProtocolStarter, + final HttpProcessor proxyHttpProcessor, + final AuthenticationStrategy proxyAuthStrategy, + final SchemePortResolver schemePortResolver, + final boolean authCachingDisabled, + final Lookup authSchemeRegistry, + final CredentialsProvider credentialsProvider, + final RequestConfig defaultRequestConfig) { + this.tlsStrategy = tlsStrategy; + this.tunnelProtocolStarter = tunnelProtocolStarter; + this.proxyHttpProcessor = proxyHttpProcessor; + this.proxyAuthStrategy = proxyAuthStrategy; + this.authenticator = proxyHttpProcessor != null && proxyAuthStrategy != null + ? new AuthenticationHandler() : null; + this.authCacheKeeper = proxyHttpProcessor != null && proxyAuthStrategy != null + && !authCachingDisabled && schemePortResolver != null + ? new AuthCacheKeeper(schemePortResolver) + : null; + this.authSchemeRegistry = authSchemeRegistry; + this.credentialsProvider = credentialsProvider; + this.defaultRequestConfig = defaultRequestConfig != null ? defaultRequestConfig : RequestConfig.DEFAULT; + } + + void completeRoute( + final HttpRoute route, + final Timeout connectTimeout, + final IOSession ioSession, + final FutureCallback callback) { + if (!route.isTunnelled()) { + callback.completed(ioSession); + return; + } + if (route.getHopCount() > 2) { + callback.failed(new HttpException("Proxy chains are not supported for HTTP/2 CONNECT tunneling")); + return; + } + if (tunnelProtocolStarter == null) { + callback.failed(new IllegalStateException("HTTP/2 tunnel protocol starter not configured")); + return; + } + if (route.isLayered() && tlsStrategy == null) { + callback.failed(new IllegalStateException("TLS strategy not configured")); + return; + } + final NamedEndpoint targetEndpoint = route.getTargetName() != null + ? route.getTargetName() : route.getTargetHost(); + final HttpHost proxy = route.getProxyHost(); + if (LOG.isDebugEnabled()) { + LOG.debug("{} establishing H2 tunnel to {} via {}", ioSession.getId(), targetEndpoint, proxy); + } + if (proxy != null && proxyHttpProcessor != null && proxyAuthStrategy != null && authenticator != null) { + establishTunnelWithAuth(route, ioSession, targetEndpoint, proxy, connectTimeout, callback); + } else { + H2OverH2TunnelSupport.establish( + ioSession, + targetEndpoint, + connectTimeout, + route.isLayered(), + tlsStrategy, + tunnelProtocolStarter, + callback); + } + } + + private void establishTunnelWithAuth( + final HttpRoute route, + final IOSession ioSession, + final NamedEndpoint targetEndpoint, + final HttpHost proxy, + final Timeout connectTimeout, + final FutureCallback callback) { + final HttpClientContext tunnelContext = HttpClientContext.create(); + if (authSchemeRegistry != null) { + tunnelContext.setAuthSchemeRegistry(authSchemeRegistry); + } + if (credentialsProvider != null) { + tunnelContext.setCredentialsProvider(credentialsProvider); + } + tunnelContext.setRequestConfig(defaultRequestConfig); + + final AuthExchange proxyAuthExchange = tunnelContext.getAuthExchange(proxy); + if (authCacheKeeper != null) { + authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, tunnelContext); + } + establishTunnelWithAuthAttempt( + route, ioSession, targetEndpoint, proxy, connectTimeout, + callback, tunnelContext, proxyAuthExchange, 1); + } + + private void establishTunnelWithAuthAttempt( + final HttpRoute route, + final IOSession ioSession, + final NamedEndpoint targetEndpoint, + final HttpHost proxy, + final Timeout connectTimeout, + final FutureCallback callback, + final HttpClientContext tunnelContext, + final AuthExchange proxyAuthExchange, + final int attemptCount) { + H2OverH2TunnelSupport.establish( + ioSession, + targetEndpoint, + connectTimeout, + route.isLayered(), + tlsStrategy, + (request, entityDetails, context) -> { + proxyHttpProcessor.process(request, null, tunnelContext); + authenticator.addAuthResponse(proxy, ChallengeType.PROXY, request, proxyAuthExchange, tunnelContext); + }, + tunnelProtocolStarter, + new FutureCallback() { + + @Override + public void completed(final IOSession result) { + callback.completed(result); + } + + @Override + public void failed(final Exception ex) { + if (!(ex instanceof TunnelRefusedException)) { + callback.failed(ex); + return; + } + final TunnelRefusedException tunnelRefusedException = (TunnelRefusedException) ex; + final HttpResponse response = tunnelRefusedException.getResponse(); + if (response.getCode() != HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED + || attemptCount >= MAX_TUNNEL_AUTH_ATTEMPTS) { + callback.failed(ex); + return; + } + try { + proxyHttpProcessor.process(response, null, tunnelContext); + final boolean retry = needAuthentication( + proxyAuthExchange, proxy, response, tunnelContext); + if (retry) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} tunnel auth challenge from {}; attempt {}/{}", + ioSession.getId(), proxy, attemptCount, MAX_TUNNEL_AUTH_ATTEMPTS); + } + establishTunnelWithAuthAttempt( + route, ioSession, targetEndpoint, proxy, connectTimeout, + callback, tunnelContext, proxyAuthExchange, attemptCount + 1); + } else { + callback.failed(ex); + } + } catch (final Exception ioEx) { + callback.failed(ioEx); + } + } + + @Override + public void cancelled() { + callback.cancelled(); + } + + }); + } + + private boolean needAuthentication( + final AuthExchange proxyAuthExchange, + final HttpHost proxy, + final HttpResponse response, + final HttpClientContext context) throws AuthenticationException, MalformedChallengeException { + return authenticator.needProxyAuthentication( + proxyAuthExchange, proxy, response, proxyAuthStrategy, authCacheKeeper, context); + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2TunnelProtocolIOSession.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2TunnelProtocolIOSession.java new file mode 100644 index 0000000000..6d483a2e02 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2TunnelProtocolIOSession.java @@ -0,0 +1,386 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; + +import org.apache.hc.core5.concurrent.CallbackContribution; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.StreamControl; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.net.NamedEndpoint; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.reactor.ProtocolIOSession; +import org.apache.hc.core5.reactor.ssl.SSLBufferMode; +import org.apache.hc.core5.reactor.ssl.SSLIOSession; +import org.apache.hc.core5.reactor.ssl.SSLMode; +import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer; +import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier; +import org.apache.hc.core5.reactor.ssl.TlsDetails; +import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer; +import org.apache.hc.core5.util.Timeout; + +/** + * {@link ProtocolIOSession} backed by a single HTTP/2 CONNECT stream. + *

+ * Supports optional TLS upgrade via {@link #startTls} for establishing + * secure tunnels to the target endpoint. + *

+ * + * @since 5.7 + */ +final class H2TunnelProtocolIOSession implements ProtocolIOSession { + + private static final IOEventHandler NOOP_HANDLER = new IOEventHandler() { + + @Override + public void connected(final IOSession session) { + } + + @Override + public void inputReady(final IOSession session, final ByteBuffer src) { + } + + @Override + public void outputReady(final IOSession session) { + } + + @Override + public void timeout(final IOSession session, final Timeout timeout) { + } + + @Override + public void exception(final IOSession session, final Exception cause) { + } + + @Override + public void disconnected(final IOSession session) { + } + }; + + private final NamedEndpoint initialEndpoint; + private final H2TunnelRawIOSession ioSession; + private final AtomicReference tlsSessionRef; + private final AtomicReference currentSessionRef; + + H2TunnelProtocolIOSession( + final IOSession physicalSession, + final NamedEndpoint initialEndpoint, + final Timeout socketTimeout, + final StreamControl streamControl) { + this.initialEndpoint = initialEndpoint; + this.ioSession = new H2TunnelRawIOSession(physicalSession, socketTimeout, streamControl); + this.tlsSessionRef = new AtomicReference<>(); + this.currentSessionRef = new AtomicReference<>(ioSession); + this.ioSession.upgrade(NOOP_HANDLER); + } + + void bindStreamControl(final StreamControl streamControl) { + ioSession.bindStreamControl(streamControl); + } + + void attachChannel(final DataStreamChannel channel) { + ioSession.attachChannel(channel); + } + + void updateCapacityChannel(final CapacityChannel capacityChannel) throws IOException { + ioSession.updateCapacityChannel(capacityChannel); + } + + int available() { + return ioSession.available(); + } + + void onInput(final ByteBuffer src) throws IOException { + final ByteBuffer handlerSrc = src != null ? src.asReadOnlyBuffer() : null; + + ioSession.appendInput(src); + + final IOSession currentSession = currentSessionRef.get(); + final IOEventHandler handler = currentSession.getHandler(); + if (handler != null) { + handler.inputReady(currentSession, handlerSrc); + if (handlerSrc != null) { + final int consumed = handlerSrc.position(); + if (consumed > 0) { + ioSession.discardInbound(consumed); + } + } + } + + if (ioSession.available() > 0) { + ioSession.requestOutput(); + } + } + + void onOutputReady() throws IOException { + final IOSession currentSession = currentSessionRef.get(); + final IOEventHandler handler = currentSession.getHandler(); + if (handler != null) { + handler.outputReady(currentSession); + } + ioSession.flushOutput(); + if (ioSession.available() > 0) { + ioSession.requestOutput(); + } + } + + void onRemoteStreamEnd() { + ioSession.onRemoteStreamEnd(); + final IOSession currentSession = currentSessionRef.get(); + final IOEventHandler handler = currentSession.getHandler(); + if (handler != null) { + handler.disconnected(currentSession); + } + } + + @Override + public NamedEndpoint getInitialEndpoint() { + return initialEndpoint; + } + + @Override + public IOEventHandler getHandler() { + return currentSessionRef.get().getHandler(); + } + + @Override + public void upgrade(final IOEventHandler handler) { + currentSessionRef.get().upgrade(handler); + } + + @Override + public Lock getLock() { + return ioSession.getLock(); + } + + @Override + public void enqueue(final Command command, final Command.Priority priority) { + currentSessionRef.get().enqueue(command, priority); + } + + @Override + public boolean hasCommands() { + return currentSessionRef.get().hasCommands(); + } + + @Override + public Command poll() { + return currentSessionRef.get().poll(); + } + + @Override + public ByteChannel channel() { + return currentSessionRef.get().channel(); + } + + @Override + public SocketAddress getRemoteAddress() { + return ioSession.getRemoteAddress(); + } + + @Override + public SocketAddress getLocalAddress() { + return ioSession.getLocalAddress(); + } + + @Override + public int getEventMask() { + return currentSessionRef.get().getEventMask(); + } + + @Override + public void setEventMask(final int ops) { + currentSessionRef.get().setEventMask(ops); + } + + @Override + public void setEvent(final int op) { + currentSessionRef.get().setEvent(op); + } + + @Override + public void clearEvent(final int op) { + currentSessionRef.get().clearEvent(op); + } + + @Override + public void close() { + close(CloseMode.GRACEFUL); + } + + @Override + public void close(final CloseMode closeMode) { + if (closeMode == CloseMode.IMMEDIATE) { + ioSession.close(closeMode); + } else { + currentSessionRef.get().close(closeMode); + } + } + + @Override + public Status getStatus() { + return currentSessionRef.get().getStatus(); + } + + @Override + public Timeout getSocketTimeout() { + return ioSession.getSocketTimeout(); + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + ioSession.setSocketTimeout(timeout); + } + + @Override + public long getLastReadTime() { + return ioSession.getLastReadTime(); + } + + @Override + public long getLastWriteTime() { + return ioSession.getLastWriteTime(); + } + + @Override + public long getLastEventTime() { + return ioSession.getLastEventTime(); + } + + @Override + public void updateReadTime() { + ioSession.updateReadTime(); + } + + @Override + public void updateWriteTime() { + ioSession.updateWriteTime(); + } + + @Override + public int read(final ByteBuffer dst) throws IOException { + return currentSessionRef.get().read(dst); + } + + @Override + public int write(final ByteBuffer src) throws IOException { + return currentSessionRef.get().write(src); + } + + @Override + public boolean isOpen() { + return currentSessionRef.get().isOpen(); + } + + @Override + public String getId() { + return ioSession.getId(); + } + + @Override + public void startTls( + final SSLContext sslContext, + final NamedEndpoint endpoint, + final SSLBufferMode sslBufferMode, + final SSLSessionInitializer initializer, + final SSLSessionVerifier verifier, + final Timeout handshakeTimeout) { + startTls(sslContext, endpoint, sslBufferMode, initializer, verifier, handshakeTimeout, null); + } + + @Override + public void startTls( + final SSLContext sslContext, + final NamedEndpoint endpoint, + final SSLBufferMode sslBufferMode, + final SSLSessionInitializer initializer, + final SSLSessionVerifier verifier, + final Timeout handshakeTimeout, + final FutureCallback callback) { + + final SSLIOSession sslioSession = new SSLIOSession( + endpoint != null ? endpoint : initialEndpoint, + ioSession, + SSLMode.CLIENT, + sslContext, + sslBufferMode, + initializer, + verifier, + handshakeTimeout, + null, + null, + new CallbackContribution(callback) { + + @Override + public void completed(final SSLSession sslSession) { + if (callback != null) { + callback.completed(H2TunnelProtocolIOSession.this); + } + } + + }); + + if (tlsSessionRef.compareAndSet(null, sslioSession)) { + currentSessionRef.set(sslioSession); + } else { + throw new IllegalStateException("TLS already activated"); + } + + try { + sslioSession.beginHandshake(this); + } catch (final Exception ex) { + if (callback != null) { + callback.failed(ex); + } + close(CloseMode.IMMEDIATE); + } + } + + @Override + public TlsDetails getTlsDetails() { + final SSLIOSession sslIoSession = tlsSessionRef.get(); + return sslIoSession != null ? sslIoSession.getTlsDetails() : null; + } + + @Override + public int getPendingCommandCount() { + return currentSessionRef.get().getPendingCommandCount(); + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2TunnelProtocolStarter.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2TunnelProtocolStarter.java new file mode 100644 index 0000000000..cbad416933 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2TunnelProtocolStarter.java @@ -0,0 +1,74 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.impl.async; + +import org.apache.hc.core5.http.config.CharCodingConfig; +import org.apache.hc.core5.http.protocol.HttpProcessorBuilder; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.impl.nio.ClientH2PrefaceHandler; +import org.apache.hc.core5.http2.impl.nio.ClientH2StreamMultiplexerFactory; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.IOEventHandlerFactory; +import org.apache.hc.core5.reactor.ProtocolIOSession; + +/** + * Minimal {@link IOEventHandlerFactory} for starting HTTP/2 client protocol + * inside a CONNECT tunnel session. + *

+ * Unlike {@link H2AsyncClientProtocolStarter}, this factory does not + * install push consumer handling, frame/header logging listeners, or + * exception callbacks. Those concerns belong to the outer proxy + * connection, not the tunneled target connection. + *

+ * + * @since 5.7 + */ +final class H2TunnelProtocolStarter implements IOEventHandlerFactory { + + private final H2Config h2Config; + private final CharCodingConfig charCodingConfig; + + H2TunnelProtocolStarter( + final H2Config h2Config, + final CharCodingConfig charCodingConfig) { + this.h2Config = h2Config != null ? h2Config : H2Config.DEFAULT; + this.charCodingConfig = charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT; + } + + @Override + public IOEventHandler createHandler(final ProtocolIOSession ioSession, final Object attachment) { + final ClientH2StreamMultiplexerFactory multiplexerFactory = new ClientH2StreamMultiplexerFactory( + HttpProcessorBuilder.create().build(), + null, + h2Config, + charCodingConfig, + null); + return new ClientH2PrefaceHandler(ioSession, multiplexerFactory, false, null); + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2TunnelRawIOSession.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2TunnelRawIOSession.java new file mode 100644 index 0000000000..038392fe24 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2TunnelRawIOSession.java @@ -0,0 +1,698 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.SelectionKey; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hc.core5.http.StreamControl; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.util.Timeout; + +/** + * Raw tunnel {@link IOSession} implementation with bounded buffering, + * capacity flow control and stream-scoped close semantics. + *

+ * Closing this session cancels only the CONNECT stream, + * not the underlying physical HTTP/2 connection. + *

+ * + * @since 5.7 + */ +final class H2TunnelRawIOSession implements IOSession { + + private static final int INBOUND_BUFFER_LIMIT = 64 * 1024; + private static final int OUTBOUND_BUFFER_LIMIT = 64 * 1024; + + private final IOSession physicalSession; + private final String id; + private final Lock lock; + + private final Deque commandQueue; + private final Deque inboundQueue; + private final Deque outboundQueue; + + private final AtomicReference handlerRef; + private final AtomicReference dataChannelRef; + private final AtomicReference streamControlRef; + + private CapacityChannel capacityChannel; + private Timeout socketTimeout; + private int eventMask; + private Status status; + + private int inboundBytes; + private int outboundBytes; + private int consumedBytesSinceUpdate; + + private boolean capacityInitialized; + private boolean localEndStreamSent; + private boolean remoteEndStream; + + private long lastReadTime; + private long lastWriteTime; + private long lastEventTime; + + H2TunnelRawIOSession( + final IOSession physicalSession, + final Timeout socketTimeout, + final StreamControl streamControl) { + this.physicalSession = physicalSession; + this.id = physicalSession.getId() + "-h2-tunnel"; + this.lock = new ReentrantLock(); + this.commandQueue = new ArrayDeque<>(); + this.inboundQueue = new ArrayDeque<>(); + this.outboundQueue = new ArrayDeque<>(); + this.handlerRef = new AtomicReference<>(); + this.dataChannelRef = new AtomicReference<>(); + this.streamControlRef = new AtomicReference<>(streamControl); + + this.capacityChannel = null; + this.socketTimeout = socketTimeout; + this.eventMask = SelectionKey.OP_READ; + this.status = Status.ACTIVE; + + this.capacityInitialized = false; + this.localEndStreamSent = false; + this.remoteEndStream = false; + + final long now = System.currentTimeMillis(); + this.lastReadTime = now; + this.lastWriteTime = now; + this.lastEventTime = now; + } + + void bindStreamControl(final StreamControl streamControl) { + streamControlRef.compareAndSet(null, streamControl); + } + + void attachChannel(final DataStreamChannel channel) { + dataChannelRef.set(channel); + } + + void updateCapacityChannel(final CapacityChannel capacityChannel) throws IOException { + int update = 0; + lock.lock(); + try { + this.capacityChannel = capacityChannel; + if (!capacityInitialized) { + update += Math.max(0, INBOUND_BUFFER_LIMIT - inboundBytes); + capacityInitialized = true; + } + if (consumedBytesSinceUpdate > 0) { + update += consumedBytesSinceUpdate; + consumedBytesSinceUpdate = 0; + } + } finally { + lock.unlock(); + } + if (update > 0) { + capacityChannel.update(update); + } + } + + void onRemoteStreamEnd() { + lock.lock(); + try { + remoteEndStream = true; + if (status == Status.ACTIVE) { + status = Status.CLOSING; + } + if (localEndStreamSent) { + status = Status.CLOSED; + } + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + } + + void requestOutput() { + final DataStreamChannel dataChannel = dataChannelRef.get(); + if (dataChannel != null) { + dataChannel.requestOutput(); + } + } + + int available() { + lock.lock(); + try { + if (outboundBytes > 0) { + return outboundBytes; + } + if (!localEndStreamSent && status == Status.CLOSING) { + return 1; + } + if (!commandQueue.isEmpty() || (eventMask & SelectionKey.OP_WRITE) != 0) { + return 1; + } + return 0; + } finally { + lock.unlock(); + } + } + + void appendInput(final ByteBuffer src) throws IOException { + if (src == null || !src.hasRemaining()) { + return; + } + lock.lock(); + try { + if (status == Status.CLOSED) { + return; + } + final int remaining = src.remaining(); + final int freeSpace = INBOUND_BUFFER_LIMIT - inboundBytes; + if (remaining > freeSpace) { + throw new IOException("Tunnel inbound buffer overflow"); + } + final byte[] data = new byte[remaining]; + src.get(data); + inboundQueue.addLast(ByteBuffer.wrap(data)); + inboundBytes += data.length; + final long now = System.currentTimeMillis(); + lastReadTime = now; + lastEventTime = now; + } finally { + lock.unlock(); + } + } + + void discardInbound(final int bytes) throws IOException { + if (bytes <= 0) { + return; + } + int remaining = bytes; + int update = 0; + CapacityChannel currentCapacityChannel = null; + + lock.lock(); + try { + while (remaining > 0) { + final ByteBuffer buffer = inboundQueue.peekFirst(); + if (buffer == null) { + break; + } + final int chunk = Math.min(remaining, buffer.remaining()); + if (chunk <= 0) { + break; + } + buffer.position(buffer.position() + chunk); + remaining -= chunk; + inboundBytes -= chunk; + if (!buffer.hasRemaining()) { + inboundQueue.pollFirst(); + } + consumedBytesSinceUpdate += chunk; + } + if (capacityChannel != null && consumedBytesSinceUpdate > 0) { + currentCapacityChannel = capacityChannel; + update = consumedBytesSinceUpdate; + consumedBytesSinceUpdate = 0; + } + } finally { + lock.unlock(); + } + + if (currentCapacityChannel != null && update > 0) { + currentCapacityChannel.update(update); + } + } + + void flushOutput() throws IOException { + final DataStreamChannel dataChannel = dataChannelRef.get(); + if (dataChannel == null) { + return; + } + + boolean sendEndStream = false; + + lock.lock(); + try { + for (; ; ) { + final ByteBuffer buffer = outboundQueue.peekFirst(); + if (buffer == null) { + break; + } + final int bytesWritten = dataChannel.write(buffer); + if (bytesWritten <= 0) { + break; + } + outboundBytes -= bytesWritten; + if (!buffer.hasRemaining()) { + outboundQueue.pollFirst(); + } + final long now = System.currentTimeMillis(); + lastWriteTime = now; + lastEventTime = now; + } + if (!localEndStreamSent && status == Status.CLOSING && outboundQueue.isEmpty()) { + localEndStreamSent = true; + sendEndStream = true; + } + } finally { + lock.unlock(); + } + + if (sendEndStream) { + try { + dataChannel.endStream(null); + } finally { + lock.lock(); + try { + if (remoteEndStream) { + status = Status.CLOSED; + } + } finally { + lock.unlock(); + } + } + } + } + + private void cancelPendingCommands() { + for (final Command command : commandQueue) { + command.cancel(); + } + commandQueue.clear(); + } + + private void cancelStream() { + final StreamControl streamControl = streamControlRef.get(); + if (streamControl != null) { + streamControl.cancel(); + } + } + + @Override + public IOEventHandler getHandler() { + return handlerRef.get(); + } + + @Override + public void upgrade(final IOEventHandler handler) { + handlerRef.set(handler); + } + + @Override + public Lock getLock() { + return lock; + } + + @Override + public void enqueue(final Command command, final Command.Priority priority) { + if (command == null) { + return; + } + lock.lock(); + try { + if (status != Status.ACTIVE) { + command.cancel(); + return; + } + if (priority == Command.Priority.IMMEDIATE) { + commandQueue.addFirst(command); + } else { + commandQueue.addLast(command); + } + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + requestOutput(); + } + + @Override + public boolean hasCommands() { + lock.lock(); + try { + return !commandQueue.isEmpty(); + } finally { + lock.unlock(); + } + } + + @Override + public Command poll() { + lock.lock(); + try { + return commandQueue.pollFirst(); + } finally { + lock.unlock(); + } + } + + @Override + public ByteChannel channel() { + return this; + } + + @Override + public SocketAddress getRemoteAddress() { + return physicalSession.getRemoteAddress(); + } + + @Override + public SocketAddress getLocalAddress() { + return physicalSession.getLocalAddress(); + } + + @Override + public int getEventMask() { + lock.lock(); + try { + return eventMask; + } finally { + lock.unlock(); + } + } + + @Override + public void setEventMask(final int ops) { + final boolean wantOutput = (ops & SelectionKey.OP_WRITE) != 0; + lock.lock(); + try { + eventMask = ops; + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + if (wantOutput) { + requestOutput(); + } + } + + @Override + public void setEvent(final int op) { + final boolean wantOutput = (op & SelectionKey.OP_WRITE) != 0; + lock.lock(); + try { + eventMask |= op; + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + if (wantOutput) { + requestOutput(); + } + } + + @Override + public void clearEvent(final int op) { + lock.lock(); + try { + eventMask &= ~op; + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + } + + @Override + public void close() { + close(CloseMode.GRACEFUL); + } + + @Override + public void close(final CloseMode closeMode) { + boolean cancel = false; + + lock.lock(); + try { + if (status == Status.CLOSED) { + return; + } + if (closeMode == CloseMode.IMMEDIATE) { + status = Status.CLOSED; + localEndStreamSent = true; + cancelPendingCommands(); + inboundQueue.clear(); + inboundBytes = 0; + outboundQueue.clear(); + outboundBytes = 0; + consumedBytesSinceUpdate = 0; + cancel = true; + } else { + status = Status.CLOSING; + if (dataChannelRef.get() == null && outboundBytes == 0) { + status = Status.CLOSED; + localEndStreamSent = true; + cancel = true; + } + } + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + + if (cancel) { + cancelStream(); + } else { + requestOutput(); + } + } + + @Override + public Status getStatus() { + lock.lock(); + try { + return status; + } finally { + lock.unlock(); + } + } + + @Override + public Timeout getSocketTimeout() { + lock.lock(); + try { + return socketTimeout; + } finally { + lock.unlock(); + } + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + lock.lock(); + try { + socketTimeout = timeout; + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + } + + @Override + public long getLastReadTime() { + lock.lock(); + try { + return lastReadTime; + } finally { + lock.unlock(); + } + } + + @Override + public long getLastWriteTime() { + lock.lock(); + try { + return lastWriteTime; + } finally { + lock.unlock(); + } + } + + @Override + public long getLastEventTime() { + lock.lock(); + try { + return lastEventTime; + } finally { + lock.unlock(); + } + } + + @Override + public void updateReadTime() { + final long now = System.currentTimeMillis(); + lock.lock(); + try { + lastReadTime = now; + lastEventTime = now; + } finally { + lock.unlock(); + } + } + + @Override + public void updateWriteTime() { + final long now = System.currentTimeMillis(); + lock.lock(); + try { + lastWriteTime = now; + lastEventTime = now; + } finally { + lock.unlock(); + } + } + + @Override + public int read(final ByteBuffer dst) throws IOException { + int total = 0; + int update = 0; + CapacityChannel currentCapacityChannel = null; + + lock.lock(); + try { + if (inboundQueue.isEmpty()) { + return remoteEndStream || status == Status.CLOSED ? -1 : 0; + } + while (dst.hasRemaining()) { + final ByteBuffer buffer = inboundQueue.peekFirst(); + if (buffer == null) { + break; + } + final int chunk = Math.min(dst.remaining(), buffer.remaining()); + if (chunk <= 0) { + break; + } + + if (buffer.hasArray()) { + final int pos = buffer.position(); + dst.put(buffer.array(), buffer.arrayOffset() + pos, chunk); + buffer.position(pos + chunk); + } else { + for (int i = 0; i < chunk; i++) { + dst.put(buffer.get()); + } + } + + total += chunk; + inboundBytes -= chunk; + if (!buffer.hasRemaining()) { + inboundQueue.pollFirst(); + } + } + + if (total > 0) { + consumedBytesSinceUpdate += total; + final long now = System.currentTimeMillis(); + lastReadTime = now; + lastEventTime = now; + if (capacityChannel != null && consumedBytesSinceUpdate > 0) { + currentCapacityChannel = capacityChannel; + update = consumedBytesSinceUpdate; + consumedBytesSinceUpdate = 0; + } + } + } finally { + lock.unlock(); + } + + if (currentCapacityChannel != null && update > 0) { + currentCapacityChannel.update(update); + } + return total; + } + + @Override + public int write(final ByteBuffer src) { + if (src == null || !src.hasRemaining()) { + return 0; + } + int bytesAccepted = 0; + + lock.lock(); + try { + if (status != Status.ACTIVE) { + return 0; + } + final int freeSpace = OUTBOUND_BUFFER_LIMIT - outboundBytes; + if (freeSpace <= 0) { + return 0; + } + bytesAccepted = Math.min(src.remaining(), freeSpace); + if (bytesAccepted <= 0) { + return 0; + } + + final byte[] data = new byte[bytesAccepted]; + src.get(data); + + outboundQueue.addLast(ByteBuffer.wrap(data)); + outboundBytes += bytesAccepted; + + final long now = System.currentTimeMillis(); + lastWriteTime = now; + lastEventTime = now; + } finally { + lock.unlock(); + } + + requestOutput(); + return bytesAccepted; + } + + @Override + public boolean isOpen() { + lock.lock(); + try { + return status != Status.CLOSED && physicalSession.isOpen(); + } finally { + lock.unlock(); + } + } + + @Override + public String getId() { + return id; + } + + @Override + public int getPendingCommandCount() { + lock.lock(); + try { + return commandQueue.size(); + } finally { + lock.unlock(); + } + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java index 7afd03ccf0..a419c6360b 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java @@ -276,12 +276,16 @@ public static MinimalHttpAsyncClient createMinimal(final AsyncClientConnectionMa private static MinimalH2AsyncClient createMinimalHttp2AsyncClientImpl( final IOEventHandlerFactory eventHandlerFactory, final AsyncPushConsumerRegistry pushConsumerRegistry, + final H2Config h2Config, + final CharCodingConfig charCodingConfig, final IOReactorConfig ioReactorConfig, final DnsResolver dnsResolver, final TlsStrategy tlsStrategy) { return new MinimalH2AsyncClient( eventHandlerFactory, pushConsumerRegistry, + h2Config, + charCodingConfig, ioReactorConfig, new DefaultThreadFactory("httpclient-main", true), new DefaultThreadFactory("httpclient-dispatch", true), @@ -307,6 +311,8 @@ public static MinimalH2AsyncClient createHttp2Minimal( CharCodingConfig.DEFAULT, LoggingExceptionCallback.INSTANCE), pushConsumerRegistry, + h2Config, + CharCodingConfig.DEFAULT, ioReactorConfig, dnsResolver, tlsStrategy); diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java index 4d4c056124..9934c10fc0 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java @@ -108,11 +108,7 @@ HttpRoute determineRoute( final HttpHost httpHost, final HttpRequest request, final HttpClientContext clientContext) throws HttpException { - final HttpRoute route = routePlanner.determineRoute(httpHost, request, clientContext); - if (route.isTunnelled()) { - throw new HttpException("HTTP/2 tunneling not supported"); - } - return route; + return routePlanner.determineRoute(httpHost, request, clientContext); } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java index 6e2fd565db..4607765834 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java @@ -188,6 +188,12 @@ Endpoint ensureValid() { return endpoint; } + @Override + public HttpRoute getEstablishedRoute() { + final Endpoint endpoint = sessionRef.get(); + return endpoint != null ? endpoint.route : null; + } + @Override public Cancellable connectEndpoint( final HttpClientContext context, diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2ConnPool.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2ConnPool.java index 97c6981ff7..46c15c1ae7 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2ConnPool.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2ConnPool.java @@ -51,6 +51,8 @@ import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer; import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class InternalH2ConnPool implements ModalCloseable { @@ -58,10 +60,23 @@ class InternalH2ConnPool implements ModalCloseable { private volatile Resolver connectionConfigResolver; - InternalH2ConnPool(final ConnectionInitiator connectionInitiator, - final Resolver addressResolver, - final TlsStrategy tlsStrategy) { - this.sessionPool = new SessionPool(connectionInitiator, addressResolver, tlsStrategy); + InternalH2ConnPool( + final ConnectionInitiator connectionInitiator, + final Resolver addressResolver, + final TlsStrategy tlsStrategy) { + this(connectionInitiator, addressResolver, tlsStrategy, null); + } + + InternalH2ConnPool( + final ConnectionInitiator connectionInitiator, + final Resolver addressResolver, + final TlsStrategy tlsStrategy, + final H2RouteOperator routeOperator) { + this.sessionPool = new SessionPool( + connectionInitiator, + addressResolver, + tlsStrategy, + routeOperator); } @Override @@ -74,9 +89,10 @@ public void close() { sessionPool.close(); } - private ConnectionConfig resolveConnectionConfig(final HttpHost httpHost) { + private ConnectionConfig resolveConnectionConfig(final HttpRoute route) { + final HttpHost firstHop = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost(); final Resolver resolver = this.connectionConfigResolver; - final ConnectionConfig connectionConfig = resolver != null ? resolver.resolve(httpHost) : null; + final ConnectionConfig connectionConfig = resolver != null ? resolver.resolve(firstHop) : null; return connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT; } @@ -84,7 +100,7 @@ public Future getSession( final HttpRoute route, final Timeout connectTimeout, final FutureCallback callback) { - final ConnectionConfig connectionConfig = resolveConnectionConfig(route.getTargetHost()); + final ConnectionConfig connectionConfig = resolveConnectionConfig(route); return sessionPool.getSession( route, connectTimeout != null ? connectTimeout : connectionConfig.getConnectTimeout(), @@ -118,32 +134,41 @@ public void setValidateAfterInactivity(final TimeValue timeValue) { sessionPool.validateAfterInactivity = timeValue; } - static class SessionPool extends AbstractIOSessionPool { + private static final Logger LOG = LoggerFactory.getLogger(InternalH2ConnPool.class); + private final ConnectionInitiator connectionInitiator; private final Resolver addressResolver; private final TlsStrategy tlsStrategy; + private final H2RouteOperator routeOperator; private volatile TimeValue validateAfterInactivity = TimeValue.NEG_ONE_MILLISECOND; - SessionPool(final ConnectionInitiator connectionInitiator, - final Resolver addressResolver, - final TlsStrategy tlsStrategy) { + SessionPool( + final ConnectionInitiator connectionInitiator, + final Resolver addressResolver, + final TlsStrategy tlsStrategy, + final H2RouteOperator routeOperator) { this.connectionInitiator = connectionInitiator; this.addressResolver = addressResolver; this.tlsStrategy = tlsStrategy; + this.routeOperator = routeOperator; } @Override - protected Future connectSession(final HttpRoute route, - final Timeout connectTimeout, - final FutureCallback callback) { + protected Future connectSession( + final HttpRoute route, + final Timeout connectTimeout, + final FutureCallback callback) { + final HttpHost proxy = route.getProxyHost(); final HttpHost target = route.getTargetHost(); + final HttpHost firstHop = proxy != null ? proxy : target; + final NamedEndpoint firstHopName = proxy == null && route.getTargetName() != null ? route.getTargetName() : firstHop; final InetSocketAddress localAddress = route.getLocalSocketAddress(); - final InetSocketAddress remoteAddress = addressResolver.resolve(target); + final InetSocketAddress remoteAddress = addressResolver.resolve(firstHop); return connectionInitiator.connect( - target, + firstHopName, remoteAddress, localAddress, connectTimeout, @@ -153,34 +178,46 @@ protected Future connectSession(final HttpRoute route, @Override public void completed(final IOSession ioSession) { if (tlsStrategy != null - && URIScheme.HTTPS.same(target.getSchemeName()) + && URIScheme.HTTPS.same(firstHop.getSchemeName()) && ioSession instanceof TransportSecurityLayer) { - final NamedEndpoint tlsName = route.getTargetName() != null ? route.getTargetName() : target; tlsStrategy.upgrade( (TransportSecurityLayer) ioSession, - tlsName, + firstHopName, null, connectTimeout, new CallbackContribution(callback) { @Override public void completed(final TransportSecurityLayer transportSecurityLayer) { - callback.completed(ioSession); + completeRoute(route, connectTimeout, ioSession, callback); } }); ioSession.setSocketTimeout(connectTimeout); } else { - callback.completed(ioSession); + completeRoute(route, connectTimeout, ioSession, callback); } } }); } + private void completeRoute( + final HttpRoute route, + final Timeout connectTimeout, + final IOSession ioSession, + final FutureCallback callback) { + if (routeOperator != null) { + routeOperator.completeRoute(route, connectTimeout, ioSession, callback); + } else { + callback.completed(ioSession); + } + } + @Override - protected void validateSession(final IOSession ioSession, - final Callback callback) { + protected void validateSession( + final IOSession ioSession, + final Callback callback) { if (ioSession.isOpen()) { final TimeValue timeValue = validateAfterInactivity; if (TimeValue.isNonNegative(timeValue)) { @@ -202,8 +239,9 @@ protected void validateSession(final IOSession ioSession, } @Override - protected void closeSession(final IOSession ioSession, - final CloseMode closeMode) { + protected void closeSession( + final IOSession ioSession, + final CloseMode closeMode) { if (closeMode == CloseMode.GRACEFUL) { ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL); } else { diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalH2AsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalH2AsyncClient.java index 6ec4c8615e..75c4a870ac 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalH2AsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalH2AsyncClient.java @@ -54,6 +54,7 @@ import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.URIScheme; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.CapacityChannel; @@ -61,9 +62,11 @@ import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.nio.RequestChannel; import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; +import org.apache.hc.core5.http.config.CharCodingConfig; import org.apache.hc.core5.http.nio.command.ShutdownCommand; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.reactor.Command; import org.apache.hc.core5.reactor.ConnectionInitiator; @@ -79,8 +82,7 @@ /** * Minimal implementation of HTTP/2 only {@link CloseableHttpAsyncClient}. This client * is optimized for HTTP/2 multiplexing message transport and does not support advanced - * HTTP protocol functionality such as request execution via a proxy, state management, - * authentication and request redirects. + * HTTP protocol functionality such as state management, authentication and request redirects. *

* Concurrent message exchanges with the same connection route executed by * this client will get automatically multiplexed over a single physical HTTP/2 @@ -99,6 +101,8 @@ public final class MinimalH2AsyncClient extends AbstractMinimalHttpAsyncClientBa MinimalH2AsyncClient( final IOEventHandlerFactory eventHandlerFactory, final AsyncPushConsumerRegistry pushConsumerRegistry, + final H2Config h2Config, + final CharCodingConfig charCodingConfig, final IOReactorConfig reactorConfig, final ThreadFactory threadFactory, final ThreadFactory workerThreadFactory, @@ -115,7 +119,13 @@ public final class MinimalH2AsyncClient extends AbstractMinimalHttpAsyncClientBa pushConsumerRegistry, threadFactory); this.connectionInitiator = new MultihomeConnectionInitiator(getConnectionInitiator(), dnsResolver); - this.connPool = new InternalH2ConnPool(this.connectionInitiator, object -> null, tlsStrategy); + this.connPool = new InternalH2ConnPool( + this.connectionInitiator, + object -> null, + tlsStrategy, + new H2RouteOperator( + tlsStrategy, + new H2TunnelProtocolStarter(h2Config, charCodingConfig))); } @Override @@ -144,8 +154,14 @@ public Cancellable execute( final Timeout connectTimeout = requestConfig.getConnectTimeout(); final Timeout responseTimeout = requestConfig.getResponseTimeout(); final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority()); + final HttpHost proxy = requestConfig.getProxy(); + final HttpRoute route = proxy != null ? new HttpRoute( + target, + null, + proxy, + URIScheme.HTTPS.same(target.getSchemeName())) : new HttpRoute(target); - final Future sessionFuture = connPool.getSession(new HttpRoute(target), connectTimeout, + final Future sessionFuture = connPool.getSession(route, connectTimeout, new FutureCallback() { @Override diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/TunnelRefusedException.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/TunnelRefusedException.java new file mode 100644 index 0000000000..3852b2b97b --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/TunnelRefusedException.java @@ -0,0 +1,86 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.message.BasicHttpResponse; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.util.Args; + +/** + * Exception indicating CONNECT tunnel refusal by the proxy. + * + * @since 5.7 + */ +public final class TunnelRefusedException extends HttpException { + + private static final long serialVersionUID = 1L; + + private final HttpResponse response; + + /** + * Creates a new exception from the proxy response that refused the tunnel. + * + * @param response the non-2xx CONNECT response from the proxy + * @since 5.7 + */ + public TunnelRefusedException(final HttpResponse response) { + super("Tunnel refused: " + new StatusLine(Args.notNull(response, "Response"))); + this.response = copy(response); + } + + /** + * Returns a defensive copy of the proxy response that refused the tunnel. + * + * @return the proxy response + * @since 5.7 + */ + public HttpResponse getResponse() { + return response; + } + + /** + * Returns the HTTP status code of the proxy response. + * + * @return the status code (e.g. 407 for proxy authentication required) + * @since 5.7 + */ + public int getStatusCode() { + return response.getCode(); + } + + private static HttpResponse copy(final HttpResponse response) { + final BasicHttpResponse copy = new BasicHttpResponse(response.getCode(), response.getReasonPhrase()); + copy.setVersion(response.getVersion()); + for (final Header header : response.getHeaders()) { + copy.addHeader(header); + } + return copy; + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/auth/AuthenticationHandler.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/auth/AuthenticationHandler.java index fc7e4a22dd..fee973d15e 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/auth/AuthenticationHandler.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/auth/AuthenticationHandler.java @@ -43,6 +43,7 @@ import org.apache.hc.client5.http.auth.ChallengeType; import org.apache.hc.client5.http.auth.CredentialsProvider; import org.apache.hc.client5.http.auth.MalformedChallengeException; +import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Internal; @@ -362,6 +363,47 @@ public boolean handleResponse( return false; } + /** + * Determines whether proxy authentication is needed for the given response, + * updating the {@link AuthExchange} and auth cache state as appropriate. + * + * @since 5.7 + */ + public boolean needProxyAuthentication( + final AuthExchange proxyAuthExchange, + final HttpHost proxy, + final HttpResponse response, + final AuthenticationStrategy proxyAuthStrategy, + final AuthCacheKeeper authCacheKeeper, + final HttpClientContext context) throws AuthenticationException, MalformedChallengeException { + final RequestConfig config = context.getRequestConfigOrDefault(); + if (config.isAuthenticationEnabled()) { + final boolean proxyAuthRequested = isChallenged( + proxy, ChallengeType.PROXY, response, proxyAuthExchange, context); + final boolean proxyMutualAuthRequired = isChallengeExpected(proxyAuthExchange); + + if (authCacheKeeper != null) { + if (proxyAuthRequested) { + authCacheKeeper.updateOnChallenge(proxy, null, proxyAuthExchange, context); + } else { + authCacheKeeper.updateOnNoChallenge(proxy, null, proxyAuthExchange, context); + } + } + + if (proxyAuthRequested || proxyMutualAuthRequired) { + final boolean updated = handleResponse( + proxy, ChallengeType.PROXY, response, proxyAuthStrategy, proxyAuthExchange, context); + + if (authCacheKeeper != null) { + authCacheKeeper.updateOnResponse(proxy, null, proxyAuthExchange, context); + } + + return updated; + } + } + return false; + } + /** * Generates a response to the authentication challenge based on the actual {@link AuthExchange} state * and adds it to the given {@link HttpRequest} message . diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2ViaH2ProxyTunnel.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2ViaH2ProxyTunnel.java new file mode 100644 index 0000000000..e32b599728 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientH2ViaH2ProxyTunnel.java @@ -0,0 +1,150 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples; + +import java.io.File; +import java.util.concurrent.CountDownLatch; + +import javax.net.ssl.SSLContext; + +import org.apache.hc.client5.http.HttpRoute; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.client5.http.routing.HttpRoutePlanner; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.ssl.SSLContexts; + +/** + * Full example of pure HTTP/2 client execution through an HTTP/2 proxy tunnel. + * + *

+ * Requirements: + *

+ *
    + *
  • Proxy endpoint speaks HTTP/2.
  • + *
  • Proxy supports CONNECT for the requested target.
  • + *
  • Target endpoint supports HTTP/2.
  • + *
+ * + *

+ * This example configures a tunneled and layered route: + * {@code client -> (h2) proxy -> CONNECT tunnel -> TLS -> (h2) target}. + *

+ */ +public class AsyncClientH2ViaH2ProxyTunnel { + + private static TlsStrategy createTlsStrategy() throws Exception { + final String trustStore = System.getProperty("h2.truststore"); + if (trustStore == null || trustStore.isEmpty()) { + return new H2ClientTlsStrategy(); + } + final String trustStorePassword = System.getProperty("h2.truststore.password", "changeit"); + final SSLContext sslContext = SSLContexts.custom() + .loadTrustMaterial(new File(trustStore), trustStorePassword.toCharArray()) + .build(); + return new H2ClientTlsStrategy(sslContext); + } + + public static void main(final String[] args) throws Exception { + final String proxyScheme = System.getProperty("h2.proxy.scheme", "http"); + final String proxyHost = System.getProperty("h2.proxy.host", "localhost"); + final int proxyPort = Integer.parseInt(System.getProperty("h2.proxy.port", "8080")); + final String targetScheme = System.getProperty("h2.target.scheme", "https"); + final String targetHost = System.getProperty("h2.target.host", "origin"); + final int targetPort = Integer.parseInt(System.getProperty("h2.target.port", "9443")); + final String[] requestUris = System.getProperty("h2.paths", "/").split(","); + + final HttpHost proxy = new HttpHost(proxyScheme, proxyHost, proxyPort); + final HttpHost target = new HttpHost(targetScheme, targetHost, targetPort); + + final HttpRoutePlanner routePlanner = (final HttpHost routeTarget, final org.apache.hc.core5.http.protocol.HttpContext context) -> + new HttpRoute(routeTarget, null, proxy, URIScheme.HTTPS.same(routeTarget.getSchemeName())); + final TlsStrategy tlsStrategy = createTlsStrategy(); + + try (CloseableHttpAsyncClient client = H2AsyncClientBuilder.create() + .setRoutePlanner(routePlanner) + .setTlsStrategy(tlsStrategy) + .build()) { + + client.start(); + + final CountDownLatch latch = new CountDownLatch(requestUris.length); + + for (final String requestUri : requestUris) { + final String normalizedRequestUri = requestUri.trim(); + final SimpleHttpRequest request = SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath(normalizedRequestUri) + .build(); + final HttpClientContext clientContext = HttpClientContext.create(); + + client.execute( + SimpleRequestProducer.create(request), + SimpleResponseConsumer.create(), + clientContext, + new FutureCallback() { + + @Override + public void completed(final SimpleHttpResponse response) { + latch.countDown(); + System.out.println(request + " -> " + new StatusLine(response)); + System.out.println("Protocol: " + clientContext.getProtocolVersion()); + System.out.println(response.getBodyText()); + } + + @Override + public void failed(final Exception ex) { + latch.countDown(); + System.out.println(request + " -> " + ex); + } + + @Override + public void cancelled() { + latch.countDown(); + System.out.println(request + " cancelled"); + } + + }); + } + + latch.await(); + client.close(CloseMode.GRACEFUL); + } + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestH2OverH2TunnelSupport.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestH2OverH2TunnelSupport.java new file mode 100644 index 0000000000..3d80b9c0b9 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestH2OverH2TunnelSupport.java @@ -0,0 +1,718 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.StreamControl; +import org.apache.hc.core5.http.message.BasicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.net.NamedEndpoint; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.IOEventHandlerFactory; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.reactor.ProtocolIOSession; +import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestH2OverH2TunnelSupport { + + @Test + void testEstablishBuildsH2ConnectForAuthorityAndCompletes() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final HttpHost target = new HttpHost("https", "example.org", 443); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish(session, target, Timeout.ofSeconds(1), false, null, callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNull(callback.failed); + Assertions.assertFalse(callback.cancelled); + Assertions.assertNotNull(session.capturedRequest); + Assertions.assertEquals("CONNECT", session.capturedRequest.getMethod()); + Assertions.assertEquals("example.org", session.capturedRequest.getAuthority().getHostName()); + Assertions.assertEquals(443, session.capturedRequest.getAuthority().getPort()); + Assertions.assertNull(session.capturedRequest.getScheme()); + Assertions.assertNull(session.capturedRequest.getPath()); + } + + @Test + void testEstablishAppliesConnectRequestInterceptor() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("https", "example.org", 443), + Timeout.ofSeconds(1), + false, + null, + (request, entityDetails, context) -> request.addHeader("Proxy-Authorization", "Basic dGVzdDp0ZXN0"), + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNotNull(session.capturedRequest); + Assertions.assertEquals("Basic dGVzdDp0ZXN0", session.capturedRequest.getFirstHeader("Proxy-Authorization").getValue()); + } + + @Test + void testEstablishFailsOnRefusedTunnel() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED, false); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("https", "example.org", 443), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertFalse(callback.completed); + Assertions.assertNotNull(callback.failed); + Assertions.assertInstanceOf(TunnelRefusedException.class, callback.failed); + Assertions.assertEquals( + HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED, + ((TunnelRefusedException) callback.failed).getStatusCode()); + } + + @Test + void testEstablishFailsWhenConnectResponseHasNoTunnelStream() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, false); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("https", "example.org", 443), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertFalse(callback.completed); + Assertions.assertNotNull(callback.failed); + } + + @Test + void testEstablishWithProtocolStarterInvokesConnectedAndSeesInputBuffer() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true, false, true, false); + final RecordingProtocolStarter protocolStarter = new RecordingProtocolStarter(); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + protocolStarter, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNull(callback.failed); + Assertions.assertTrue(protocolStarter.connectedCalled); + Assertions.assertTrue(protocolStarter.inputBufferSeen); + } + + @Test + void testEstablishSecureTunnelUsesTlsStrategy() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final RecordingTlsStrategy tlsStrategy = new RecordingTlsStrategy(); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("https", "example.org", 443), + Timeout.ofSeconds(1), + true, + tlsStrategy, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNull(callback.failed); + Assertions.assertTrue(tlsStrategy.invoked); + } + + @Test + void testClosingTunnelDoesNotClosePhysicalSession() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNotNull(callback.result); + callback.result.close(CloseMode.IMMEDIATE); + Assertions.assertTrue(session.isOpen(), "Closing tunnel session must not close physical HTTP/2 connection"); + } + + @Test + void testTunnelImmediateCloseCancelsStreamControlWhenPresent() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true, false, false, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNotNull(callback.result); + callback.result.close(CloseMode.IMMEDIATE); + + Assertions.assertTrue(session.streamCancelCalled.get(), "IMMEDIATE close must cancel the CONNECT stream"); + Assertions.assertTrue(session.isOpen(), "Cancelling tunnel stream must not close physical HTTP/2 connection"); + } + + @Test + void testTunnelWriteBufferIsBounded() throws Exception { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNotNull(callback.result); + final int payloadSize = 1024 * 1024; + final ByteBuffer src = ByteBuffer.allocate(payloadSize); + final int written = callback.result.write(src); + Assertions.assertTrue(written > 0); + Assertions.assertTrue(written < payloadSize, "Outbound writes must be bounded by tunnel buffer capacity"); + } + + @Test + void testCapacityUpdateIsNotUnbounded() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true, true, false, false); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertTrue(session.lastCapacityUpdate > 0, "Tunnel setup should publish initial bounded capacity"); + Assertions.assertTrue(session.lastCapacityUpdate < Integer.MAX_VALUE, "Capacity must not be unbounded"); + } + + @Test + void testGracefulCloseEndsStreamAfterDrain() throws Exception { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNotNull(callback.result); + + final ByteBuffer src = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}); + final int written = callback.result.write(src); + Assertions.assertEquals(5, written); + + callback.result.close(CloseMode.GRACEFUL); + + // Drive output flush: test runs in same package -> can call package-private method. + ((H2TunnelProtocolIOSession) callback.result).onOutputReady(); + + Assertions.assertTrue(session.endStreamCalled.get(), "GRACEFUL close must end the CONNECT stream after draining"); + Assertions.assertTrue(session.isOpen(), "Ending tunnel stream must not close physical HTTP/2 connection"); + } + + @Test + void testStreamEndAfterEstablishedTunnelDoesNotFireFailure() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true, false, false, false, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertTrue(callback.completed, "Tunnel should complete successfully"); + Assertions.assertNull(callback.failed, "streamEnd after established tunnel must not overwrite with failure"); + } + + @Test + void testStreamEndBeforeTunnelEstablishedFiresFailure() { + // 200 but no entity details -> consumeResponse throws -> fail() fires + // then streamEnd arrives on the already-failed handler + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, false, false, false, false, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertFalse(callback.completed); + Assertions.assertNotNull(callback.failed, "streamEnd with no tunnel must report failure"); + } + + static class RecordingCallback implements FutureCallback { + + volatile boolean completed; + volatile boolean cancelled; + volatile Exception failed; + volatile T result; + + @Override + public void completed(final T result) { + this.completed = true; + this.result = result; + } + + @Override + public void failed(final Exception ex) { + this.failed = ex; + } + + @Override + public void cancelled() { + this.cancelled = true; + } + } + + static class RecordingProtocolStarter implements IOEventHandlerFactory { + + volatile boolean connectedCalled; + volatile boolean inputBufferSeen; + + @Override + public IOEventHandler createHandler(final ProtocolIOSession ioSession, final Object attachment) { + return new IOEventHandler() { + + @Override + public void connected(final IOSession session) { + connectedCalled = true; + } + + @Override + public void inputReady(final IOSession session, final ByteBuffer src) { + if (src != null && src.hasRemaining()) { + inputBufferSeen = true; + } + } + + @Override + public void outputReady(final IOSession session) { + } + + @Override + public void timeout(final IOSession session, final Timeout timeout) { + } + + @Override + public void exception(final IOSession session, final Exception cause) { + } + + @Override + public void disconnected(final IOSession session) { + } + }; + } + } + + static class RecordingTlsStrategy implements TlsStrategy { + + volatile boolean invoked; + + @Override + public boolean upgrade( + final TransportSecurityLayer sessionLayer, + final HttpHost host, + final SocketAddress localAddress, + final SocketAddress remoteAddress, + final Object attachment, + final Timeout handshakeTimeout) { + invoked = true; + return true; + } + + @Override + public void upgrade( + final TransportSecurityLayer sessionLayer, + final NamedEndpoint endpoint, + final Object attachment, + final Timeout handshakeTimeout, + final FutureCallback callback) { + invoked = true; + if (callback != null) { + callback.completed(sessionLayer); + } + } + } + + static class ScriptedProxySession implements IOSession { + + private final int responseCode; + private final boolean withTunnelStream; + private final boolean signalCapacity; + private final boolean emitInputData; + private final boolean provideStreamControl; + private final boolean sendStreamEnd; + + private final Lock lock; + private Timeout socketTimeout; + + HttpRequest capturedRequest; + volatile int lastCapacityUpdate; + + private final AtomicBoolean open; + final AtomicBoolean streamCancelCalled; + final AtomicBoolean endStreamCalled; + + ScriptedProxySession(final int responseCode, final boolean withTunnelStream) { + this(responseCode, withTunnelStream, false, false, false, false); + } + + ScriptedProxySession(final int responseCode, final boolean withTunnelStream, final boolean signalCapacity) { + this(responseCode, withTunnelStream, signalCapacity, false, false, false); + } + + ScriptedProxySession( + final int responseCode, + final boolean withTunnelStream, + final boolean signalCapacity, + final boolean emitInputData, + final boolean provideStreamControl) { + this(responseCode, withTunnelStream, signalCapacity, emitInputData, provideStreamControl, false); + } + + ScriptedProxySession( + final int responseCode, + final boolean withTunnelStream, + final boolean signalCapacity, + final boolean emitInputData, + final boolean provideStreamControl, + final boolean sendStreamEnd) { + this.responseCode = responseCode; + this.withTunnelStream = withTunnelStream; + this.signalCapacity = signalCapacity; + this.emitInputData = emitInputData; + this.provideStreamControl = provideStreamControl; + this.sendStreamEnd = sendStreamEnd; + + this.lock = new ReentrantLock(); + this.socketTimeout = Timeout.ofSeconds(30); + this.lastCapacityUpdate = -1; + this.open = new AtomicBoolean(true); + this.streamCancelCalled = new AtomicBoolean(false); + this.endStreamCalled = new AtomicBoolean(false); + } + + @Override + public IOEventHandler getHandler() { + return null; + } + + @Override + public void upgrade(final IOEventHandler handler) { + } + + @Override + public Lock getLock() { + return lock; + } + + @Override + public void enqueue(final Command command, final Command.Priority priority) { + if (!(command instanceof RequestExecutionCommand)) { + return; + } + + final RequestExecutionCommand requestExecutionCommand = (RequestExecutionCommand) command; + final AsyncClientExchangeHandler exchangeHandler = requestExecutionCommand.getExchangeHandler(); + final org.apache.hc.core5.http.protocol.HttpContext context = requestExecutionCommand.getContext(); + + try { + if (provideStreamControl && exchangeHandler instanceof H2OverH2TunnelExchangeHandler) { + final StreamControl streamControl = (StreamControl) Proxy.newProxyInstance( + StreamControl.class.getClassLoader(), + new Class[]{StreamControl.class}, + (proxy, method, args) -> { + if ("cancel".equals(method.getName())) { + streamCancelCalled.set(true); + return method.getReturnType() == Boolean.TYPE ? Boolean.TRUE : null; + } + return defaultValue(method); + }); + + ((H2OverH2TunnelExchangeHandler) exchangeHandler).initiated(streamControl); + } + + exchangeHandler.produceRequest((RequestChannel) (request, entityDetails, requestContext) -> capturedRequest = request, context); + + if (signalCapacity) { + exchangeHandler.updateCapacity((CapacityChannel) increment -> lastCapacityUpdate = increment); + } + + final EntityDetails responseEntityDetails = + withTunnelStream ? new org.apache.hc.core5.http.impl.BasicEntityDetails(-1, null) : null; + exchangeHandler.consumeResponse(new BasicHttpResponse(responseCode), responseEntityDetails, context); + + if (withTunnelStream && responseCode == HttpStatus.SC_OK) { + exchangeHandler.produce(new DataStreamChannel() { + + @Override + public void requestOutput() { + } + + @Override + public int write(final ByteBuffer src) { + final int remaining = src != null ? src.remaining() : 0; + if (remaining > 0 && src != null) { + final byte[] tmp = new byte[remaining]; + src.get(tmp); + } + return remaining; + } + + @Override + public void endStream() throws IOException { + + } + + @Override + public void endStream(final java.util.List trailers) { + endStreamCalled.set(true); + } + + }); + + if (emitInputData) { + exchangeHandler.consume(ByteBuffer.wrap(new byte[]{1, 2, 3})); + } + } + + if (sendStreamEnd) { + exchangeHandler.streamEnd(null); + } + } catch (final Exception ex) { + exchangeHandler.failed(ex); + } + } + + private static Object defaultValue(final Method method) { + final Class rt = method.getReturnType(); + if (rt == Void.TYPE) { + return null; + } + if (rt == Boolean.TYPE) { + return false; + } + if (rt == Integer.TYPE) { + return 0; + } + if (rt == Long.TYPE) { + return 0L; + } + if (rt == Short.TYPE) { + return (short) 0; + } + if (rt == Byte.TYPE) { + return (byte) 0; + } + if (rt == Character.TYPE) { + return (char) 0; + } + if (rt == Float.TYPE) { + return 0f; + } + if (rt == Double.TYPE) { + return 0d; + } + return null; + } + + @Override + public boolean hasCommands() { + return false; + } + + @Override + public Command poll() { + return null; + } + + @Override + public ByteChannel channel() { + return this; + } + + @Override + public SocketAddress getRemoteAddress() { + return null; + } + + @Override + public SocketAddress getLocalAddress() { + return null; + } + + @Override + public int getEventMask() { + return 0; + } + + @Override + public void setEventMask(final int ops) { + } + + @Override + public void setEvent(final int op) { + } + + @Override + public void clearEvent(final int op) { + } + + @Override + public void close() { + open.set(false); + } + + @Override + public void close(final CloseMode closeMode) { + open.set(false); + } + + @Override + public Status getStatus() { + return open.get() ? Status.ACTIVE : Status.CLOSED; + } + + @Override + public Timeout getSocketTimeout() { + return socketTimeout; + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + this.socketTimeout = timeout; + } + + @Override + public long getLastReadTime() { + return 0; + } + + @Override + public long getLastWriteTime() { + return 0; + } + + @Override + public long getLastEventTime() { + return 0; + } + + @Override + public void updateReadTime() { + } + + @Override + public void updateWriteTime() { + } + + @Override + public int read(final ByteBuffer dst) { + return 0; + } + + @Override + public int write(final ByteBuffer src) { + final int remaining = src != null ? src.remaining() : 0; + if (remaining > 0) { + final byte[] tmp = new byte[remaining]; + src.get(tmp); + } + return remaining; + } + + @Override + public boolean isOpen() { + return open.get(); + } + + @Override + public String getId() { + return "proxy-session"; + } + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestH2TunnelProtocolStarter.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestH2TunnelProtocolStarter.java new file mode 100644 index 0000000000..c73a54da0b --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestH2TunnelProtocolStarter.java @@ -0,0 +1,83 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import org.apache.hc.core5.http.config.CharCodingConfig; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.impl.nio.ClientH2PrefaceHandler; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.ProtocolIOSession; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +class TestH2TunnelProtocolStarter { + + @Test + void testCreatesMinimalH2HandlerWithoutPushOrLogging() { + final H2TunnelProtocolStarter starter = new H2TunnelProtocolStarter( + H2Config.DEFAULT, CharCodingConfig.DEFAULT); + final ProtocolIOSession ioSession = Mockito.mock(ProtocolIOSession.class); + Mockito.when(ioSession.getId()).thenReturn("test-tunnel"); + + final IOEventHandler handler = starter.createHandler(ioSession, null); + + Assertions.assertNotNull(handler); + Assertions.assertInstanceOf(ClientH2PrefaceHandler.class, handler); + } + + @Test + void testDefaultsWhenNullConfig() { + final H2TunnelProtocolStarter starter = new H2TunnelProtocolStarter(null, null); + final ProtocolIOSession ioSession = Mockito.mock(ProtocolIOSession.class); + Mockito.when(ioSession.getId()).thenReturn("test-tunnel-null"); + + final IOEventHandler handler = starter.createHandler(ioSession, null); + + Assertions.assertNotNull(handler); + Assertions.assertInstanceOf(ClientH2PrefaceHandler.class, handler); + } + + @Test + void testCustomH2ConfigIsRespected() { + final H2Config customConfig = H2Config.custom() + .setMaxFrameSize(32768) + .setInitialWindowSize(128 * 1024) + .setPushEnabled(false) + .build(); + final H2TunnelProtocolStarter starter = new H2TunnelProtocolStarter( + customConfig, CharCodingConfig.DEFAULT); + final ProtocolIOSession ioSession = Mockito.mock(ProtocolIOSession.class); + Mockito.when(ioSession.getId()).thenReturn("test-tunnel-custom"); + + final IOEventHandler handler = starter.createHandler(ioSession, null); + + Assertions.assertNotNull(handler); + Assertions.assertInstanceOf(ClientH2PrefaceHandler.class, handler); + } + +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestH2TunnelRawIOSession.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestH2TunnelRawIOSession.java new file mode 100644 index 0000000000..4b72e9c7c2 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestH2TunnelRawIOSession.java @@ -0,0 +1,359 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import java.io.IOException; +import java.lang.reflect.Proxy; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hc.core5.http.StreamControl; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestH2TunnelRawIOSession { + + @Test + void testCapacityInitialUpdateIsBounded() throws Exception { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null); + + final AtomicInteger last = new AtomicInteger(0); + raw.updateCapacityChannel(new CapacityChannel() { + @Override + public void update(final int increment) { + last.set(increment); + } + }); + + Assertions.assertTrue(last.get() > 0); + Assertions.assertTrue(last.get() < Integer.MAX_VALUE); + } + + @Test + void testAppendInputOverflowFails() throws Exception { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null); + + // INBOUND_BUFFER_LIMIT is 64k in the implementation; overflow by 1. + final ByteBuffer tooBig = ByteBuffer.allocate(64 * 1024 + 1); + Assertions.assertThrows(IOException.class, () -> raw.appendInput(tooBig)); + } + + @Test + void testReadTriggersCapacityUpdateOnConsumption() throws Exception { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null); + + final AtomicInteger last = new AtomicInteger(-1); + raw.updateCapacityChannel(new CapacityChannel() { + @Override + public void update(final int increment) { + last.set(increment); + } + }); + + raw.appendInput(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})); + + final ByteBuffer dst = ByteBuffer.allocate(4); + final int n = raw.read(dst); + Assertions.assertEquals(4, n); + // Capacity update should publish the consumed bytes (4), not unbounded. + Assertions.assertEquals(4, last.get()); + } + + @Test + void testWriteIsBounded() { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null); + + final ByteBuffer src = ByteBuffer.allocate(1024 * 1024); + final int n = raw.write(src); + Assertions.assertTrue(n > 0); + Assertions.assertTrue(n < 1024 * 1024); + Assertions.assertEquals(64 * 1024, n, "Expected OUTBOUND_BUFFER_LIMIT (64k) write bound"); + } + + @Test + void testImmediateCloseCancelsStreamControlButNotPhysicalSession() { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + + final AtomicBoolean cancelled = new AtomicBoolean(false); + final StreamControl streamControl = (StreamControl) Proxy.newProxyInstance( + StreamControl.class.getClassLoader(), + new Class[]{StreamControl.class}, + (proxy, method, args) -> { + final String name = method.getName(); + final Class rt = method.getReturnType(); + + if ("cancel".equals(name)) { + cancelled.set(true); + // IMPORTANT: cancel() may return boolean (Cancellable) + return rt == Boolean.TYPE ? Boolean.TRUE : null; + } + + if (rt == Void.TYPE) { + return null; + } + if (rt == Boolean.TYPE) { + return false; + } + if (rt == Integer.TYPE) { + return 0; + } + if (rt == Long.TYPE) { + return 0L; + } + if (rt == Short.TYPE) { + return (short) 0; + } + if (rt == Byte.TYPE) { + return (byte) 0; + } + if (rt == Character.TYPE) { + return (char) 0; + } + if (rt == Float.TYPE) { + return 0f; + } + if (rt == Double.TYPE) { + return 0d; + } + return null; + }); + + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), streamControl); + raw.close(CloseMode.IMMEDIATE); + + Assertions.assertTrue(cancelled.get(), "IMMEDIATE close must cancel stream"); + Assertions.assertTrue(physical.isOpen(), "Tunnel close must not close physical HTTP/2 connection"); + } + + @Test + void testGracefulCloseEndsStreamAfterDrain() throws Exception { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + + final AtomicBoolean endStreamCalled = new AtomicBoolean(false); + rawAttachChannel(physical, endStreamCalled); + + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null); + raw.attachChannel(physical.dataChannel); + + // Put some outbound bytes + final ByteBuffer src = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}); + final int written = raw.write(src); + Assertions.assertEquals(5, written); + + raw.close(CloseMode.GRACEFUL); + raw.flushOutput(); + + Assertions.assertTrue(endStreamCalled.get(), "GRACEFUL close must endStream once outbound drained"); + } + + private static void rawAttachChannel(final DummyPhysicalSession physical, final AtomicBoolean endStreamCalled) { + physical.dataChannel = new DataStreamChannel() { + + @Override + public void requestOutput() { + } + + @Override + public int write(final ByteBuffer src) { + final int remaining = src != null ? src.remaining() : 0; + if (remaining > 0 && src != null) { + final byte[] tmp = new byte[remaining]; + src.get(tmp); + } + return remaining; + } + + @Override + public void endStream() throws IOException { + + } + + @Override + public void endStream(final java.util.List trailers) { + endStreamCalled.set(true); + } + }; + } + + static final class DummyPhysicalSession implements IOSession { + + private final Lock lock = new ReentrantLock(); + private volatile boolean open = true; + private volatile Timeout socketTimeout = Timeout.ofSeconds(30); + + volatile DataStreamChannel dataChannel; + + @Override + public IOEventHandler getHandler() { + return null; + } + + @Override + public void upgrade(final IOEventHandler handler) { + } + + @Override + public Lock getLock() { + return lock; + } + + @Override + public void enqueue(final Command command, final Command.Priority priority) { + } + + @Override + public boolean hasCommands() { + return false; + } + + @Override + public Command poll() { + return null; + } + + @Override + public ByteChannel channel() { + return this; + } + + @Override + public SocketAddress getRemoteAddress() { + return null; + } + + @Override + public SocketAddress getLocalAddress() { + return null; + } + + @Override + public int getEventMask() { + return 0; + } + + @Override + public void setEventMask(final int ops) { + } + + @Override + public void setEvent(final int op) { + } + + @Override + public void clearEvent(final int op) { + } + + @Override + public void close() { + open = false; + } + + @Override + public void close(final CloseMode closeMode) { + open = false; + } + + @Override + public Status getStatus() { + return open ? Status.ACTIVE : Status.CLOSED; + } + + @Override + public Timeout getSocketTimeout() { + return socketTimeout; + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + socketTimeout = timeout; + } + + @Override + public long getLastReadTime() { + return 0; + } + + @Override + public long getLastWriteTime() { + return 0; + } + + @Override + public long getLastEventTime() { + return 0; + } + + @Override + public void updateReadTime() { + } + + @Override + public void updateWriteTime() { + } + + @Override + public int read(final ByteBuffer dst) { + return 0; + } + + @Override + public int write(final ByteBuffer src) { + final int remaining = src != null ? src.remaining() : 0; + if (remaining > 0 && src != null) { + final byte[] tmp = new byte[remaining]; + src.get(tmp); + } + return remaining; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public String getId() { + return "dummy-physical"; + } + } +}