Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ class CoreDqlExtension extends ExtensionSpi {
protected var licenseManager: Option[LicenseManager] = None
protected val logger = org.slf4j.LoggerFactory.getLogger(getClass)

/** Story P0.6 -- the cap-hit collector captured at `initialize`. It is the SAME per-strategy
* `TelemetryCollector` that `GatewayApi.run` increments for `queriesTotal`, so a `QueryResults`
* cap-hit recorded here rides the existing `InstancePing` delta to the license-server. Defaults
* to `Noop` until initialized (or if no strategy carries a real collector). The closed
* `EnforcedDqlExtension` (priority < 100) extends this class and inherits the increment via
* the shared `capOrReject`, so the paid enforcement path counts cap-hits too.
*/
protected var capHitCollector: TelemetryCollector = TelemetryCollector.Noop

override def extensionId: String = "core-dql"
override def extensionName: String = "Core DQL Quotas"
override def version: String = "0.1.0"
Expand All @@ -70,6 +79,7 @@ class CoreDqlExtension extends ExtensionSpi {
): Either[String, Unit] = {
logger.info("🔌 Initializing Core DQL extension")
licenseManager = Some(licenseRefreshStrategy.licenseManager)
capHitCollector = licenseRefreshStrategy.telemetryCollector
Right(())
}

Expand Down Expand Up @@ -172,6 +182,9 @@ class CoreDqlExtension extends ExtensionSpi {
(single.limit, quota.maxQueryResults) match {
// (1) Explicit LIMIT over a finite quota → hard 402 (UNCHANGED).
case (Some(l), Some(max)) if max < l.limit =>
// Story P0.6 — the meter bit: record a QueryResults cap-hit BEFORE building the 402
// (side-effect only; the reject itself is unchanged — AC 8).
capHitCollector.incrementCapHit(TelemetryCollector.CapHitKind.QueryResults)
logger.warn(
s"⚠️ Query result limit (${l.limit}) exceeds license quota ($max)"
)
Expand All @@ -198,6 +211,12 @@ class CoreDqlExtension extends ExtensionSpi {
// and must NOT be re-routed to scroll (it would mishandle aggregation buckets). JOIN legs
// (ResultCapContext.isSuppressed) also skip the cap so the join input is not truncated.
case (None, Some(max)) if single.fields.nonEmpty && !ResultCapContext.isSuppressed =>
// Story P0.6 (OQ-2) — the no-LIMIT truncation IS the meter biting (non-fatally): count it
// as a QueryResults cap-hit, the SAME kind as the explicit-LIMIT 402 above. The cap is
// suppressed for JOIN legs (the `!isSuppressed` guard), so a per-leg input truncation is
// never counted — only a genuine single-index result cap. (JOIN-output truncation is
// counted on the Joins axis downstream, not here — no double-count.)
capHitCollector.incrementCapHit(TelemetryCollector.CapHitKind.QueryResults)
logger.info(
s"ℹ️ No LIMIT on single-index scroll query; capping the stream at license quota ($max rows) and flagging truncation"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import app.softnetwork.elastic.client._
import app.softnetwork.elastic.client.result._
import app.softnetwork.elastic.client.scroll.{ScrollConfig, ScrollMetrics}
import app.softnetwork.elastic.licensing._
import app.softnetwork.elastic.licensing.metrics.MetricsApi
import app.softnetwork.elastic.sql.parser.Parser
import app.softnetwork.elastic.sql.query.{SearchStatement, SelectStatement, SingleSearch}
import com.typesafe.config.ConfigFactory
Expand Down Expand Up @@ -63,11 +64,15 @@ class CoreDqlExtensionSpec extends AnyFlatSpec with Matchers {
override def licenseType: LicenseType = tier
}

private def strategy(mgr: LicenseManager): LicenseRefreshStrategy =
private def strategy(
mgr: LicenseManager,
collector: TelemetryCollector = TelemetryCollector.Noop
): LicenseRefreshStrategy =
new LicenseRefreshStrategy {
override def initialize(): LicenseKey = LicenseKey.Community
override def refresh(): Either[LicenseError, LicenseKey] = Left(RefreshNotSupported)
override def licenseManager: LicenseManager = mgr
override def telemetryCollector: TelemetryCollector = collector
}

/** Records every statement forwarded to the scroll / searchAsync seam plus the scroll config, so
Expand Down Expand Up @@ -283,4 +288,78 @@ class CoreDqlExtensionSpec extends AnyFlatSpec with Matchers {
client.scrolledConfig.get().maxDocuments shouldBe None
truncationOf(res) shouldBe None
}

// ---- Story P0.6: QueryResults cap-hit recorded on BOTH reject branches ----

behavior of "CoreDqlExtension cap-hit instrumentation (P0.6)"

private def runWithCollector(
sql: String,
quota: Quota,
tier: LicenseType,
suppress: Boolean = false
): (TelemetryCollector, ElasticResult[QueryResult]) = {
val parsed = Parser(sql) match {
case Right(s) => s
case Left(e) => fail(s"parse failed: ${e.msg}")
}
val collector = new TelemetryCollector
val client = new RecordingClient()
val ext = new CoreDqlExtension()
ext.initialize(ConfigFactory.empty(), strategy(managerWithQuota(quota, tier), collector))
val res =
if (suppress)
ResultCapContext.suppressed(Await.result(ext.execute(parsed, client), 5.seconds))
else Await.result(ext.execute(parsed, client), 5.seconds)
(collector, res)
}

private def capHits(c: TelemetryCollector): Map[String, Long] =
c.collect(MetricsApi.Noop).capHitsByKind

it should "increment the QueryResults cap-hit on the explicit-LIMIT 402 branch" in {
val (collector, res) =
runWithCollector("SELECT a FROM idx LIMIT 20000", Quota.Community, LicenseType.Community)
res shouldBe a[ElasticFailure]
res.asInstanceOf[ElasticFailure].elasticError.statusCode shouldBe Some(402)
capHits(collector)("max_query_results") shouldBe 1L
// no other bucket bumped
capHits(collector)("max_joins") shouldBe 0L
capHits(collector)("max_clusters") shouldBe 0L
capHits(collector)("max_materialized_views") shouldBe 0L
}

it should "increment the QueryResults cap-hit on the P0.5 no-LIMIT truncation branch (OQ-2)" in {
val (collector, res) =
runWithCollector("SELECT a, b FROM idx", Quota.Community, LicenseType.Community)
res shouldBe a[ElasticSuccess[_]]
truncationOf(res).map(_.truncated) shouldBe Some(true)
capHits(collector)("max_query_results") shouldBe 1L
}

it should "NOT increment any cap-hit when an explicit LIMIT is within quota" in {
val (collector, res) =
runWithCollector("SELECT a FROM idx LIMIT 50", Quota.Community, LicenseType.Community)
res shouldBe a[ElasticSuccess[_]]
capHits(collector).values.toSet shouldBe Set(0L)
}

it should "NOT increment a cap-hit when a no-LIMIT query is a suppressed JOIN leg" in {
val (collector, res) =
runWithCollector(
"SELECT a, b FROM idx",
Quota.Community,
LicenseType.Community,
suppress = true
)
res shouldBe a[ElasticSuccess[_]]
capHits(collector).values.toSet shouldBe Set(0L)
}

it should "NOT increment a cap-hit for an unlimited (Enterprise) no-LIMIT query" in {
val (collector, res) =
runWithCollector("SELECT a, b FROM idx", Quota.Enterprise, LicenseType.Enterprise)
res shouldBe a[ElasticSuccess[_]]
capHits(collector).values.toSet shouldBe Set(0L)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ import java.util.concurrent.atomic.AtomicLong
* `Map.empty` default below applies only to the bare/no-op construction, never to a `collect()` /
* `collectAndReset()` result. Story 15.2 reads `joinQueryCount` into the `InstancePing` proto
* `join_query_count` field.
*
* Story P0.6 -- `capHitsByKind` is the per-quota-type cap-hit DELTA, with the SAME per-interval
* semantics as `joinQueryByRow` (NOT cumulative like `queriesTotal`): it counts how many times a
* license quota REJECTED an operation in the interval, keyed by the four quota types
* (`max_materialized_views`, `max_query_results`, `max_joins`, `max_clusters`). This is the launch
* "is the meter biting?" leading indicator (PRD §15.1 cap-hits). Like the JOIN buckets it ALWAYS
* carries all four keys on a COLLECTED snapshot (a zero is an explicit `0`, never an absent entry)
* so the wire shape stays stable for the daily ping; the `Map.empty` default applies only to the
* bare/no-op construction. Story P0.6 reads these into the `InstancePing` proto `capHitMax*`
* fields (14-17).
*/
case class TelemetryData(
queriesTotal: Long = 0,
Expand All @@ -48,7 +58,8 @@ case class TelemetryData(
clusterVersion: Option[String] = None,
aggregatedMetrics: AggregatedMetrics = AggregatedMetrics.empty,
joinQueryCount: Long = 0,
joinQueryByRow: Map[String, Long] = Map.empty
joinQueryByRow: Map[String, Long] = Map.empty,
capHitsByKind: Map[String, Long] = Map.empty
)

/** Mutable telemetry collector with atomic counters.
Expand All @@ -70,6 +81,12 @@ class TelemetryCollector {
private val _joinPassthrough = new AtomicLong(0L)
private val _joinCrossCluster = new AtomicLong(0L)
private val _joinCoordinator = new AtomicLong(0L)
// Story P0.6 -- per-interval cap-hit DELTA buckets, one per quota type. Same lock-free pattern as
// the JOIN buckets: independent AtomicLong so the increment never contends with collect/reset.
private val _capMv = new AtomicLong(0L)
private val _capResults = new AtomicLong(0L)
private val _capJoins = new AtomicLong(0L)
private val _capClusters = new AtomicLong(0L)
private val clusterInfoLock = new AnyRef
private var _mvsActive: Int = 0
private var _clustersConnected: Int = 0
Expand Down Expand Up @@ -106,6 +123,36 @@ class TelemetryCollector {
(p + x + c, Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c))
}

/** Record one license cap-hit (a quota REJECTED an operation), attributed to its quota type
* (Story P0.6). Real-time, lock-free (AtomicLong); the four buckets are independent so the
* increment never contends with collect/collectAndReset. Called at each of the four reject sites
* (CoreDqlExtension / MaterializedViewExtension / JoinPlanner / JoinLicenseGuard), regardless of
* whether that reject surfaces as an HTTP 402, a `Left(String)`, or a startup exit -- this is a
* SEMANTIC cap-hit, not an HTTP-status filter (PRD §15.1 / P0.6 OQ-1).
*/
def incrementCapHit(kind: TelemetryCollector.CapHitKind): Unit = {
val _ = (kind match {
case TelemetryCollector.CapHitKind.MaterializedViews => _capMv
case TelemetryCollector.CapHitKind.QueryResults => _capResults
case TelemetryCollector.CapHitKind.Joins => _capJoins
case TelemetryCollector.CapHitKind.Clusters => _capClusters
}).incrementAndGet()
}

/** Snapshot+reset ONLY the cap-hit delta (read+zero with `getAndSet`). Used by the scheduled tick
* when `config.telemetryEnabled` is false (so the buckets still reset, mirroring AC 5a) and by
* the clean-shutdown flush (mirroring AC 5b / P0.6 OQ-8 -- the `maxClusters` cap-hit happens on
* a startup CrashLoop, so its delta MUST be flushed before `sys.exit`). ALWAYS returns all four
* keys (an explicit `0`, never an absent entry) so the wire shape stays stable for the daily
* ping, exactly like `collectAndResetJoinCounts`.
*/
def collectAndResetCapHits(): Map[String, Long] = Map(
"max_materialized_views" -> _capMv.getAndSet(0L),
"max_query_results" -> _capResults.getAndSet(0L),
"max_joins" -> _capJoins.getAndSet(0L),
"max_clusters" -> _capClusters.getAndSet(0L)
)

def setMvsActive(count: Int): Unit = clusterInfoLock.synchronized { _mvsActive = count }

def setClustersConnected(count: Int): Unit = clusterInfoLock.synchronized {
Expand All @@ -124,6 +171,14 @@ class TelemetryCollector {

// --- Read methods (called by AutoRefreshStrategy.doScheduleRefresh) ---

/** Read-only snapshot of the cap-hit buckets (all four keys, NO reset). Used by `collect`. */
private def readCapHits(): Map[String, Long] = Map(
"max_materialized_views" -> _capMv.get(),
"max_query_results" -> _capResults.get(),
"max_joins" -> _capJoins.get(),
"max_clusters" -> _capClusters.get()
)

def collect(metrics: MetricsApi): TelemetryData = clusterInfoLock.synchronized {
// JOIN buckets: read with .get() -- collect NEVER resets (interval read without flush).
val p = _joinPassthrough.get()
Expand All @@ -138,7 +193,9 @@ class TelemetryCollector {
clusterVersion = _clusterVersion,
aggregatedMetrics = metrics.getAggregatedMetrics,
joinQueryCount = p + x + c,
joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c)
joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c),
// Cap-hit buckets: read with .get() -- collect NEVER resets (interval read without flush).
capHitsByKind = readCapHits()
)
}

Expand All @@ -160,7 +217,9 @@ class TelemetryCollector {
clusterVersion = _clusterVersion,
aggregatedMetrics = metrics.collectAndResetAggregatedMetrics,
joinQueryCount = p + x + c,
joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c)
joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c),
// Cap-hit buckets: read+zero with .getAndSet(0L) -- per-interval delta; this IS the flush.
capHitsByKind = collectAndResetCapHits()
)
}
}
Expand Down Expand Up @@ -189,6 +248,37 @@ object TelemetryCollector {
case object Coordinator extends JoinRow
}

/** The license quota type a cap-hit is attributed to (Story P0.6). Each rejected operation
* increments exactly one bucket. The four types map 1:1 to the four enforced quotas; the `key`
* is the snake-case wire/column name carried to the license-server `instance_ping` cap-hit
* columns.
*/
sealed trait CapHitKind { def key: String }
object CapHitKind {

/** `maxMaterializedViews` -- `MaterializedViewExtension` CREATE over quota (the quota-exceeded
* 402 branch ONLY; NOT the feature-absent path -- P0.6 OQ-7).
*/
case object MaterializedViews extends CapHitKind { val key = "max_materialized_views" }

/** `maxQueryResults` -- `CoreDqlExtension` single-index result boundary: BOTH the
* explicit-LIMIT 402 reject AND the P0.5 no-LIMIT truncation/cappedScroll bite (P0.6 OQ-2).
*/
case object QueryResults extends CapHitKind { val key = "max_query_results" }

/** `maxJoins` -- `JoinPlanner.plan()` cross-index JOIN-count reject (`Left(String)`, NOT a
* 402).
*/
case object Joins extends CapHitKind { val key = "max_joins" }

/** `maxClusters` -- `JoinLicenseGuard` federation cluster-count reject (startup `Left`/exit or
* `checkClusterLimit` `Left(QuotaExceeded)`, NOT a 402).
*/
case object Clusters extends CapHitKind { val key = "max_clusters" }

val values: Seq[CapHitKind] = Seq(MaterializedViews, QueryResults, Joins, Clusters)
}

/** Default collector returning zero-valued data. Used when telemetry is disabled or no runtime
* wires a real collector. Write methods are no-ops to prevent accidental mutation of the shared
* singleton.
Expand All @@ -198,6 +288,9 @@ object TelemetryCollector {
override def incrementJoin(row: JoinRow): Unit = ()
override def collectAndResetJoinCounts(): (Long, Map[String, Long]) =
(0L, Map("passthrough" -> 0L, "cross_cluster" -> 0L, "coordinator" -> 0L))
override def incrementCapHit(kind: CapHitKind): Unit = ()
override def collectAndResetCapHits(): Map[String, Long] =
CapHitKind.values.map(_.key -> 0L).toMap
override def setMvsActive(count: Int): Unit = ()
override def setClustersConnected(count: Int): Unit = ()
override def setClusterInfo(
Expand Down
Loading
Loading