Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# changes made to better apply number overflow edge cases
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.increaseWindow")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.streamId")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStream.increaseWindow")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.http.impl.engine.http2.Http2StreamHandling#OutStreamImpl.increaseWindow")
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,9 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings,
frame match {
case WindowUpdateFrame(streamId, increment)
if streamId == 0 /* else fall through to StreamFrameEvent */ =>
multiplexer.updateConnectionLevelWindow(increment)
if (!multiplexer.updateConnectionLevelWindow(increment))
pushGOAWAY(ErrorCode.FLOW_CONTROL_ERROR,
"WINDOW_UPDATE would exceed maximum connection-level flow-control window size")
case p: PriorityFrame => multiplexer.updatePriority(p)
case s: StreamFrameEvent =>
if (!terminating)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.collection.mutable
@InternalApi
private[http2] trait Http2Multiplexer {
def pushControlFrame(frame: FrameEvent): Unit
def updateConnectionLevelWindow(increment: Int): Unit
def updateConnectionLevelWindow(increment: Int): Boolean
def updateMaxFrameSize(newMaxFrameSize: Int): Unit
def updateDefaultWindow(newDefaultWindow: Int): Unit
def updatePriority(priorityFrame: PriorityFrame): Unit
Expand Down Expand Up @@ -110,10 +110,16 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage

override def pushControlFrame(frame: FrameEvent): Unit = updateState(_.pushControlFrame(frame))

def updateConnectionLevelWindow(increment: Int): Unit = {
connectionWindowLeft += increment
debug(s"Updating outgoing connection window by $increment to $connectionWindowLeft")
updateState(_.connectionWindowAvailable())
def updateConnectionLevelWindow(increment: Int): Boolean = {
val newWindow = connectionWindowLeft.toLong + increment
if (newWindow > Http2Protocol.MaxWindowSize) {
false
} else {
connectionWindowLeft = newWindow.toInt
debug(s"Updating outgoing connection window by $increment to $connectionWindowLeft")
updateState(_.connectionWindowAvailable())
true
}
}
override def updateMaxFrameSize(newMaxFrameSize: Int): Unit = currentMaxFrameSize = newMaxFrameSize
override def updateDefaultWindow(newDefaultWindow: Int): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ private[http] object Http2Protocol {
*/
final val InitialWindowSize = 65535

/**
* The maximum value of a flow-control window size as defined by the specification.
*
* See https://tools.ietf.org/html/rfc7540#section-6.9.1:
* A sender MUST NOT allow a flow-control window to exceed 2^31-1 octets.
* If a sender receives a WINDOW_UPDATE that causes a flow-control window to
* exceed this maximum, it MUST treat this as a connection or stream error
* (Section 5.4) of type FLOW_CONTROL_ERROR.
*/
final val MaxWindowSize: Int = Int.MaxValue

/**
* The initial frame size for both incoming and outgoing frames as defined by the
* specification.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,12 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
extends ReceivingDataWithBuffer(HalfClosedRemoteWaitingForOutgoingStream(extraInitialWindow)) {
override def handleOutgoingCreated(
outStream: OutStream, correlationAttributes: Map[AttributeKey[_], _]): StreamState = {
outStream.increaseWindow(extraInitialWindow)
Open(buffer, outStream)
if (outStream.increaseWindow(extraInitialWindow)) Open(buffer, outStream)
else {
outStream.cancelStream()
multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, ErrorCode.FLOW_CONTROL_ERROR))
Closed
}
}
override def handleOutgoingCreatedAndFinished(correlationAttributes: Map[AttributeKey[_], _]): StreamState =
HalfClosedLocal(buffer)
Expand Down Expand Up @@ -421,8 +425,11 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
}

def increaseWindow(delta: Int): StreamState = {
outStream.increaseWindow(delta)
this
if (outStream.increaseWindow(delta)) this
else {
multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, ErrorCode.FLOW_CONTROL_ERROR))
Closed
}
}
}

Expand Down Expand Up @@ -531,8 +538,12 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
outStream.cancelStream()
}
override def incrementWindow(delta: Int): StreamState = {
outStream.increaseWindow(delta)
this
if (outStream.increaseWindow(delta)) this
else {
outStream.cancelStream()
multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, ErrorCode.FLOW_CONTROL_ERROR))
Closed
}
}
}

Expand All @@ -553,8 +564,12 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper

override def handleOutgoingCreated(
outStream: OutStream, correlationAttributes: Map[AttributeKey[_], _]): StreamState = {
outStream.increaseWindow(extraInitialWindow)
HalfClosedRemoteSendingData(outStream)
if (outStream.increaseWindow(extraInitialWindow)) HalfClosedRemoteSendingData(outStream)
else {
outStream.cancelStream()
multiplexer.pushControlFrame(RstStreamFrame(outStream.streamId, ErrorCode.FLOW_CONTROL_ERROR))
Closed
}
}
override def handleOutgoingCreatedAndFinished(correlationAttributes: Map[AttributeKey[_], _]): StreamState = Closed
}
Expand Down Expand Up @@ -700,11 +715,12 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
}

trait OutStream {
def streamId: Int
def canSend: Boolean
def cancelStream(): Unit
def endStreamIfPossible(): Option[FrameEvent]
def nextFrame(maxBytesToSend: Int): DataFrame
def increaseWindow(delta: Int): Unit
def increaseWindow(delta: Int): Boolean
def isDone: Boolean
}
object OutStream {
Expand Down Expand Up @@ -816,11 +832,17 @@ private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper
}
def bufferedBytes: Int = buffer.length

override def increaseWindow(increment: Int): Unit = if (increment >= 0) {
outboundWindowLeft += increment
debug(s"Updating window for $streamId by $increment to $outboundWindowLeft buffered bytes: $bufferedBytes")
enqueueIfPossible()
}
override def increaseWindow(increment: Int): Boolean = if (increment >= 0) {
val newWindow = outboundWindowLeft.toLong + increment
if (newWindow > Http2Protocol.MaxWindowSize) {
false
} else {
outboundWindowLeft = newWindow.toInt
debug(s"Updating window for $streamId by $increment to $outboundWindowLeft buffered bytes: $bufferedBytes")
enqueueIfPossible()
true
}
} else true

// external callbacks, need to make sure that potential stream state changing events are run through the state machine
override def onPush(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ private[parsing] object SpecializedHeaderValueParsers {
: (HttpHeader, Int) = {
@tailrec def recurse(ix: Int = valueStart, result: Long = 0): (HttpHeader, Int) = {
val c = byteChar(input, ix)
if (result < 0) fail("`Content-Length` header value must not exceed 63-bit integer range")
else if (DIGIT(c)) recurse(ix + 1, result * 10 + c - '0')
else if (WSP(c)) recurse(ix + 1, result)
if (DIGIT(c)) {
val digit = c - '0'
if (result > (Long.MaxValue - digit) / 10)
fail("`Content-Length` header value must not exceed 63-bit integer range")
else recurse(ix + 1, result * 10 + digit)
} else if (WSP(c)) recurse(ix + 1, result)
else if (c == '\r' && byteAt(input, ix + 1) == LF_BYTE) (`Content-Length`(result), ix + 2)
else if (c == '\n') (`Content-Length`(result), ix + 1)
else fail("Illegal `Content-Length` header value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.concurrent.duration._
* We mark it private[http] because we don't want to support it as public API.
*/
@InternalApi
private[http] class Timestamp private (val timestampNanos: Long) extends AnyVal {
private[http] class Timestamp private[util] (val timestampNanos: Long) extends AnyVal {

def +(period: Duration): Timestamp =
if (isNever) this
Expand All @@ -49,6 +49,6 @@ private[http] object Timestamp {
def never: Timestamp = new Timestamp(Long.MaxValue)

implicit object Ordering extends Ordering[Timestamp] {
def compare(x: Timestamp, y: Timestamp): Int = math.signum(x.timestampNanos - y.timestampNanos).toInt
def compare(x: Timestamp, y: Timestamp): Int = java.lang.Long.compare(x.timestampNanos, y.timestampNanos)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ abstract class ContentLengthHeaderParserSpec(mode: String, newLine: String) exte
a[ParsingException] should be thrownBy parse("9223372036854775808") // Long.MaxValue + 1
a[ParsingException] should be thrownBy parse("92233720368547758070") // Long.MaxValue * 10 which is 0 taken overflow into account
a[ParsingException] should be thrownBy parse("92233720368547758080") // (Long.MaxValue + 1) * 10 which is 0 taken overflow into account
// overflow that wraps to a small positive value (was not caught by the old `result < 0` check)
a[ParsingException] should be thrownBy parse("18446744073709551634") // ~2^64+18, wraps to 18 in signed Long
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.http.impl.util

import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class TimestampSpec extends AnyWordSpec with Matchers {

"Timestamp.Ordering" should {
"correctly order two normal timestamps" in {
val t1 = new Timestamp(100L)
val t2 = new Timestamp(200L)
Timestamp.Ordering.compare(t1, t2) should be < 0
Timestamp.Ordering.compare(t2, t1) should be > 0
Timestamp.Ordering.compare(t1, t1) shouldEqual 0
}

"correctly order Timestamp.never against a normal timestamp" in {
val normal = new Timestamp(System.nanoTime())
val never = Timestamp.never
Timestamp.Ordering.compare(normal, never) should be < 0
Timestamp.Ordering.compare(never, normal) should be > 0
Timestamp.Ordering.compare(never, never) shouldEqual 0
}

"not overflow when comparing Long.MaxValue (never) with Long.MinValue" in {
// Subtraction Long.MaxValue - Long.MinValue would overflow in signed arithmetic;
// using Long.compare avoids this.
val tMax = new Timestamp(Long.MaxValue)
val tMin = new Timestamp(Long.MinValue)
Timestamp.Ordering.compare(tMax, tMin) should be > 0
Timestamp.Ordering.compare(tMin, tMax) should be < 0
}

"not overflow when comparing values near Long.MaxValue and Long.MinValue" in {
val tAlmostMax = new Timestamp(Long.MaxValue - 1)
val tAlmostMin = new Timestamp(Long.MinValue + 1)
Timestamp.Ordering.compare(tAlmostMax, tAlmostMin) should be > 0
Timestamp.Ordering.compare(tAlmostMin, tAlmostMax) should be < 0
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,17 @@ class Http2ServerSpec extends Http2SpecWithMaterializer("""
network.expectDecodedHEADERS(streamId = TheStreamId, endStream = true).headers should be(
immutable.Seq(RawHeader("grpc-status", "10")))
})

"reject stream-level WINDOW_UPDATE that would overflow the flow-control window with FLOW_CONTROL_ERROR"
.inAssertAllStagesStopped(
new WaitingForResponseDataSetup {
// fill the stream-level window up to Int.MaxValue
network.sendWINDOW_UPDATE(TheStreamId, Int.MaxValue - Http2Protocol.InitialWindowSize)
// one more increment must trigger RST_STREAM with FLOW_CONTROL_ERROR;
// use sendFrame directly to bypass the test-side window tracking assertion
network.sendFrame(WindowUpdateFrame(TheStreamId, 1))
network.expectRST_STREAM(TheStreamId, ErrorCode.FLOW_CONTROL_ERROR)
})
}

"support multiple concurrent substreams" should {
Expand Down Expand Up @@ -1274,6 +1285,18 @@ class Http2ServerSpec extends Http2SpecWithMaterializer("""
network.expectRST_STREAM(1, ErrorCode.PROTOCOL_ERROR)
})

"reject connection-level WINDOW_UPDATE that would overflow the flow-control window with FLOW_CONTROL_ERROR"
.inAssertAllStagesStopped(
new TestSetup with RequestResponseProbes {
// fill the connection-level window up to Int.MaxValue
network.sendWINDOW_UPDATE(0, Int.MaxValue - Http2Protocol.InitialWindowSize)
// one more increment must trigger GOAWAY with FLOW_CONTROL_ERROR;
// use sendFrame directly to bypass the test-side window tracking assertion
network.sendFrame(WindowUpdateFrame(0, 1))
val (_, errorCode) = network.expectGOAWAY()
errorCode should ===(ErrorCode.FLOW_CONTROL_ERROR)
})

"backpressure incoming frames when outgoing control frame buffer fills".inAssertAllStagesStopped(
new TestSetup with HandlerFunctionSupport {
override def settings: ServerSettings =
Expand Down