diff --git a/http-core/src/main/mima-filters/2.0.x.backwards.excludes/num-overflow.excludes b/http-core/src/main/mima-filters/2.0.x.backwards.excludes/num-overflow.excludes new file mode 100644 index 000000000..511213069 --- /dev/null +++ b/http-core/src/main/mima-filters/2.0.x.backwards.excludes/num-overflow.excludes @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# changes made to better apply number overflow edge cases +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.increaseWindow") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.streamId") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.increaseWindow") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStreamImpl.increaseWindow") diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala index 2ad364dd3..f6f942956 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Demux.scala @@ -355,7 +355,9 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, frame match { case WindowUpdateFrame(streamId, increment) if streamId == 0 /* else fall through to StreamFrameEvent */ => - multiplexer.updateConnectionLevelWindow(increment) + if (!multiplexer.updateConnectionLevelWindow(increment)) + pushGOAWAY(ErrorCode.FLOW_CONTROL_ERROR, + "WINDOW_UPDATE would exceed maximum connection-level flow-control window size") case p: PriorityFrame => multiplexer.updatePriority(p) case s: StreamFrameEvent => if (!terminating) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala index 3a7f77b3e..60c3ad996 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Multiplexer.scala @@ -33,7 +33,7 @@ import scala.collection.mutable @InternalApi private[http2] trait Http2Multiplexer { def pushControlFrame(frame: FrameEvent): Unit - def updateConnectionLevelWindow(increment: Int): Unit + def updateConnectionLevelWindow(increment: Int): Boolean def updateMaxFrameSize(newMaxFrameSize: Int): Unit def updateDefaultWindow(newDefaultWindow: Int): Unit def updatePriority(priorityFrame: PriorityFrame): Unit @@ -110,10 +110,16 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage override def pushControlFrame(frame: FrameEvent): Unit = updateState(_.pushControlFrame(frame)) - def updateConnectionLevelWindow(increment: Int): Unit = { - connectionWindowLeft += increment - debug(s"Updating outgoing connection window by $increment to $connectionWindowLeft") - updateState(_.connectionWindowAvailable()) + def updateConnectionLevelWindow(increment: Int): Boolean = { + val newWindow = connectionWindowLeft.toLong + increment + if (newWindow > Http2Protocol.MaxWindowSize) { + false + } else { + connectionWindowLeft = newWindow.toInt + debug(s"Updating outgoing connection window by $increment to $connectionWindowLeft") + updateState(_.connectionWindowAvailable()) + true + } } override def updateMaxFrameSize(newMaxFrameSize: Int): Unit = currentMaxFrameSize = newMaxFrameSize override def updateDefaultWindow(newDefaultWindow: Int): Unit = { diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala index f81ea6ebb..daaf3f0be 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2Protocol.scala @@ -54,6 +54,17 @@ private[http] object Http2Protocol { */ final val InitialWindowSize = 65535 + /** + * The maximum value of a flow-control window size as defined by the specification. + * + * See https://tools.ietf.org/html/rfc7540#section-6.9.1: + * A sender MUST NOT allow a flow-control window to exceed 2^31-1 octets. + * If a sender receives a WINDOW_UPDATE that causes a flow-control window to + * exceed this maximum, it MUST treat this as a connection or stream error + * (Section 5.4) of type FLOW_CONTROL_ERROR. + */ + final val MaxWindowSize: Int = Int.MaxValue + /** * The initial frame size for both incoming and outgoing frames as defined by the * specification. diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala index e17ca4dc3..a0763edbf 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2StreamHandling.scala @@ -380,8 +380,12 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper extends ReceivingDataWithBuffer(HalfClosedRemoteWaitingForOutgoingStream(extraInitialWindow)) { override def handleOutgoingCreated( outStream: OutStream, correlationAttributes: Map[AttributeKey[_], _]): StreamState = { - outStream.increaseWindow(extraInitialWindow) - Open(buffer, outStream) + if (outStream.increaseWindow(extraInitialWindow)) Open(buffer, outStream) + else { + outStream.cancelStream() + multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, ErrorCode.FLOW_CONTROL_ERROR)) + Closed + } } override def handleOutgoingCreatedAndFinished(correlationAttributes: Map[AttributeKey[_], _]): StreamState = HalfClosedLocal(buffer) @@ -421,8 +425,11 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper } def increaseWindow(delta: Int): StreamState = { - outStream.increaseWindow(delta) - this + if (outStream.increaseWindow(delta)) this + else { + multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, ErrorCode.FLOW_CONTROL_ERROR)) + Closed + } } } @@ -531,8 +538,12 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper outStream.cancelStream() } override def incrementWindow(delta: Int): StreamState = { - outStream.increaseWindow(delta) - this + if (outStream.increaseWindow(delta)) this + else { + outStream.cancelStream() + multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, ErrorCode.FLOW_CONTROL_ERROR)) + Closed + } } } @@ -553,8 +564,12 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper override def handleOutgoingCreated( outStream: OutStream, correlationAttributes: Map[AttributeKey[_], _]): StreamState = { - outStream.increaseWindow(extraInitialWindow) - HalfClosedRemoteSendingData(outStream) + if (outStream.increaseWindow(extraInitialWindow)) HalfClosedRemoteSendingData(outStream) + else { + outStream.cancelStream() + multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, ErrorCode.FLOW_CONTROL_ERROR)) + Closed + } } override def handleOutgoingCreatedAndFinished(correlationAttributes: Map[AttributeKey[_], _]): StreamState = Closed } @@ -700,11 +715,12 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper } trait OutStream { + def streamId: Int def canSend: Boolean def cancelStream(): Unit def endStreamIfPossible(): Option[FrameEvent] def nextFrame(maxBytesToSend: Int): DataFrame - def increaseWindow(delta: Int): Unit + def increaseWindow(delta: Int): Boolean def isDone: Boolean } object OutStream { @@ -816,11 +832,17 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper } def bufferedBytes: Int = buffer.length - override def increaseWindow(increment: Int): Unit = if (increment >= 0) { - outboundWindowLeft += increment - debug(s"Updating window for $streamId by $increment to $outboundWindowLeft buffered bytes: $bufferedBytes") - enqueueIfPossible() - } + override def increaseWindow(increment: Int): Boolean = if (increment >= 0) { + val newWindow = outboundWindowLeft.toLong + increment + if (newWindow > Http2Protocol.MaxWindowSize) { + false + } else { + outboundWindowLeft = newWindow.toInt + debug(s"Updating window for $streamId by $increment to $outboundWindowLeft buffered bytes: $bufferedBytes") + enqueueIfPossible() + true + } + } else true // external callbacks, need to make sure that potential stream state changing events are run through the state machine override def onPush(): Unit = { diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala index 3b2977217..883cf8ffe 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/parsing/SpecializedHeaderValueParsers.scala @@ -37,9 +37,12 @@ private[parsing] object SpecializedHeaderValueParsers { : (HttpHeader, Int) = { @tailrec def recurse(ix: Int = valueStart, result: Long = 0): (HttpHeader, Int) = { val c = byteChar(input, ix) - if (result < 0) fail("`Content-Length` header value must not exceed 63-bit integer range") - else if (DIGIT(c)) recurse(ix + 1, result * 10 + c - '0') - else if (WSP(c)) recurse(ix + 1, result) + if (DIGIT(c)) { + val digit = c - '0' + if (result > (Long.MaxValue - digit) / 10) + fail("`Content-Length` header value must not exceed 63-bit integer range") + else recurse(ix + 1, result * 10 + digit) + } else if (WSP(c)) recurse(ix + 1, result) else if (c == '\r' && byteAt(input, ix + 1) == LF_BYTE) (`Content-Length`(result), ix + 2) else if (c == '\n') (`Content-Length`(result), ix + 1) else fail("Illegal `Content-Length` header value") diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala index f4963bcd4..b63370719 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/util/Timestamp.scala @@ -24,7 +24,7 @@ import scala.concurrent.duration._ * We mark it private[http] because we don't want to support it as public API. */ @InternalApi -private[http] class Timestamp private (val timestampNanos: Long) extends AnyVal { +private[http] class Timestamp private[util] (val timestampNanos: Long) extends AnyVal { def +(period: Duration): Timestamp = if (isNever) this @@ -49,6 +49,6 @@ private[http] object Timestamp { def never: Timestamp = new Timestamp(Long.MaxValue) implicit object Ordering extends Ordering[Timestamp] { - def compare(x: Timestamp, y: Timestamp): Int = math.signum(x.timestampNanos - y.timestampNanos).toInt + def compare(x: Timestamp, y: Timestamp): Int = java.lang.Long.compare(x.timestampNanos, y.timestampNanos) } } diff --git a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala index 1671d6249..2e28d92e4 100644 --- a/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala +++ b/http-core/src/test/scala/org/apache/pekko/http/impl/engine/parsing/ContentLengthHeaderParserSpec.scala @@ -37,6 +37,8 @@ abstract class ContentLengthHeaderParserSpec(mode: String, newLine: String) exte a[ParsingException] should be thrownBy parse("9223372036854775808") // Long.MaxValue + 1 a[ParsingException] should be thrownBy parse("92233720368547758070") // Long.MaxValue * 10 which is 0 taken overflow into account a[ParsingException] should be thrownBy parse("92233720368547758080") // (Long.MaxValue + 1) * 10 which is 0 taken overflow into account + // overflow that wraps to a small positive value (was not caught by the old `result < 0` check) + a[ParsingException] should be thrownBy parse("18446744073709551634") // ~2^64+18, wraps to 18 in signed Long } } diff --git a/http-core/src/test/scala/org/apache/pekko/http/impl/util/TimestampSpec.scala b/http-core/src/test/scala/org/apache/pekko/http/impl/util/TimestampSpec.scala new file mode 100644 index 000000000..6b3630c32 --- /dev/null +++ b/http-core/src/test/scala/org/apache/pekko/http/impl/util/TimestampSpec.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.http.impl.util + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class TimestampSpec extends AnyWordSpec with Matchers { + + "Timestamp.Ordering" should { + "correctly order two normal timestamps" in { + val t1 = new Timestamp(100L) + val t2 = new Timestamp(200L) + Timestamp.Ordering.compare(t1, t2) should be < 0 + Timestamp.Ordering.compare(t2, t1) should be > 0 + Timestamp.Ordering.compare(t1, t1) shouldEqual 0 + } + + "correctly order Timestamp.never against a normal timestamp" in { + val normal = new Timestamp(System.nanoTime()) + val never = Timestamp.never + Timestamp.Ordering.compare(normal, never) should be < 0 + Timestamp.Ordering.compare(never, normal) should be > 0 + Timestamp.Ordering.compare(never, never) shouldEqual 0 + } + + "not overflow when comparing Long.MaxValue (never) with Long.MinValue" in { + // Subtraction Long.MaxValue - Long.MinValue would overflow in signed arithmetic; + // using Long.compare avoids this. + val tMax = new Timestamp(Long.MaxValue) + val tMin = new Timestamp(Long.MinValue) + Timestamp.Ordering.compare(tMax, tMin) should be > 0 + Timestamp.Ordering.compare(tMin, tMax) should be < 0 + } + + "not overflow when comparing values near Long.MaxValue and Long.MinValue" in { + val tAlmostMax = new Timestamp(Long.MaxValue - 1) + val tAlmostMin = new Timestamp(Long.MinValue + 1) + Timestamp.Ordering.compare(tAlmostMax, tAlmostMin) should be > 0 + Timestamp.Ordering.compare(tAlmostMin, tAlmostMax) should be < 0 + } + } +} diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala index 7a8846435..9acb61233 100644 --- a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ServerSpec.scala @@ -1061,6 +1061,17 @@ class Http2ServerSpec extends Http2SpecWithMaterializer(""" network.expectDecodedHEADERS(streamId = TheStreamId, endStream = true).headers should be( immutable.Seq(RawHeader("grpc-status", "10"))) }) + + "reject stream-level WINDOW_UPDATE that would overflow the flow-control window with FLOW_CONTROL_ERROR" + .inAssertAllStagesStopped( + new WaitingForResponseDataSetup { + // fill the stream-level window up to Int.MaxValue + network.sendWINDOW_UPDATE(TheStreamId, Int.MaxValue - Http2Protocol.InitialWindowSize) + // one more increment must trigger RST_STREAM with FLOW_CONTROL_ERROR; + // use sendFrame directly to bypass the test-side window tracking assertion + network.sendFrame(WindowUpdateFrame(TheStreamId, 1)) + network.expectRST_STREAM(TheStreamId, ErrorCode.FLOW_CONTROL_ERROR) + }) } "support multiple concurrent substreams" should { @@ -1274,6 +1285,18 @@ class Http2ServerSpec extends Http2SpecWithMaterializer(""" network.expectRST_STREAM(1, ErrorCode.PROTOCOL_ERROR) }) + "reject connection-level WINDOW_UPDATE that would overflow the flow-control window with FLOW_CONTROL_ERROR" + .inAssertAllStagesStopped( + new TestSetup with RequestResponseProbes { + // fill the connection-level window up to Int.MaxValue + network.sendWINDOW_UPDATE(0, Int.MaxValue - Http2Protocol.InitialWindowSize) + // one more increment must trigger GOAWAY with FLOW_CONTROL_ERROR; + // use sendFrame directly to bypass the test-side window tracking assertion + network.sendFrame(WindowUpdateFrame(0, 1)) + val (_, errorCode) = network.expectGOAWAY() + errorCode should ===(ErrorCode.FLOW_CONTROL_ERROR) + }) + "backpressure incoming frames when outgoing control frame buffer fills".inAssertAllStagesStopped( new TestSetup with HandlerFunctionSupport { override def settings: ServerSettings =