feat(inkless): POD-2395 Prune consolidated diskless offsets #587
viktorsomogyi wants to merge 11 commits into
Pull request overview
Adds support for pruning diskless WAL batch metadata once it has been consolidated to remote storage, wiring the pruning into the broker so diskless logs can advance their effective start offset and drop already-tiered batch metadata from the control plane.
Changes:
- Introduces a Postgres routine + control-plane job/API for pruning batches below the highest tiered offset and updating logs.diskless_start_offset.
- Adds a periodic broker-side pruner (ConsolidatedDisklessLogPruner) scheduled from ReplicaManager to invoke the control-plane pruning and update in-memory partition state.
- Updates fetch-path log start offset handling and expands test coverage for pruning + fetch overlay behavior.
Reviewed changes
Copilot reviewed 19 out of 136 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java | Exposes highestOffsetInRemoteStorage() publicly for cross-module access. |
| storage/inkless/src/test/java/io/aiven/inkless/control_plane/postgres/PruneBatchesBelowHighestTieredOffsetV1Test.java | New PG integration tests for pruning routine semantics. |
| storage/inkless/src/main/resources/db/migration/V12__Prune_diskless_batches.sql | Adds V12 types + pruning function in Postgres. |
| storage/inkless/src/main/jooq/org/jooq/generated/UDTs.java | jOOQ generated updates: adds prune UDT references and schema v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/ReleaseFileMergeWorkItemResponseV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/ReleaseFileMergeWorkItemResponseV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/ListOffsetsResponseV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/ListOffsetsRequestV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogResponseV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogRequestV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/InitDisklessLogProducerStateV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FindBatchesResponseV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FindBatchesRequestV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FileMergeWorkItemResponseV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FileMergeWorkItemResponseFileV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/FileMergeWorkItemResponseBatchV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/EnforceRetentionResponseV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/EnforceRetentionRequestV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/DeleteRecordsResponseV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/DeleteRecordsRequestV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitFileMergeWorkItemResponseV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitFileMergeWorkItemBatchV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitBatchResponseV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/CommitBatchRequestV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/BatchMetadataV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/records/BatchInfoV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/ReleaseFileMergeWorkItemResponseV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/ListOffsetsResponseV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/ListOffsetsRequestV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogResponseV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogRequestV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/InitDisklessLogProducerStateV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FindBatchesResponseV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FindBatchesRequestV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FileMergeWorkItemResponseV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FileMergeWorkItemResponseFileV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/FileMergeWorkItemResponseBatchV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/EnforceRetentionResponseV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/EnforceRetentionRequestV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/DeleteRecordsResponseV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/DeleteRecordsRequestV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitFileMergeWorkItemResponseV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitFileMergeWorkItemBatchV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitBatchResponseV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/CommitBatchRequestV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/BatchMetadataV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/paths/BatchInfoV1Path.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/ListOffsetsResponseV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/ListOffsetsRequestV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogResponseV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogRequestV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/InitDisklessLogProducerStateV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FindBatchesResponseV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FindBatchesRequestV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FileMergeWorkItemResponseV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FileMergeWorkItemResponseFileV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/FileMergeWorkItemResponseBatchV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/EnforceRetentionResponseV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/EnforceRetentionRequestV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/DeleteRecordsResponseV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/DeleteRecordsRequestV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitFileMergeWorkItemResponseV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitFileMergeWorkItemBatchV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitBatchResponseV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/CommitBatchRequestV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/BatchMetadataV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/udt/BatchInfoV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/ProducerStateRecord.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/LogsRecord.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/ListOffsetsV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/InitDisklessLogV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/GetFileMergeWorkItemV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FindBatchesV2Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FindBatchesV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FilesRecord.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FileMergeWorkItemsRecord.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/FileMergeWorkItemFilesRecord.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/EnforceRetentionV2Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/EnforceRetentionV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/DeleteRecordsV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/CommitFileV1Record.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/records/BatchesRecord.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/ProducerState.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/Logs.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/ListOffsetsV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/InitDisklessLogV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/GetFileMergeWorkItemV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/FindBatchesV2.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/FindBatchesV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/Files.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/FileMergeWorkItems.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/FileMergeWorkItemFiles.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/EnforceRetentionV2.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/EnforceRetentionV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/DeleteRecordsV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/CommitFileV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/tables/Batches.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/Tables.java | jOOQ generated updates: adds prune table-function wiring and schema v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/ReleaseFileMergeWorkItemV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/MarkFileToDeleteV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteTopicV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteFilesV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/DeleteBatchV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/CommitFileMergeWorkItemV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/routines/BatchTimestamp.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/Routines.java | jOOQ generated updates: adds prune routine wiring and schema v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/Keys.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/Indexes.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/ReleaseFileMergeWorkItemErrorV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/ListOffsetsResponseErrorV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/InitDisklessLogErrorV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/FindBatchesResponseErrorV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/FileStateT.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/FileReasonT.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/EnforceRetentionResponseErrorV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/DeleteRecordsResponseErrorV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/CommitFileMergeWorkItemErrorV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/enums/CommitBatchResponseErrorV1.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/Domains.java | jOOQ generated schema version bump to v12. |
| storage/inkless/src/main/jooq/org/jooq/generated/DefaultSchema.java | jOOQ generated updates: adds prune objects into schema model and schema v12. |
| storage/inkless/src/main/java/io/aiven/inkless/delete/PruneDisklessLogsResponse.java | New control-plane response record for pruning results. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/PruneDisklessLogsRequest.java | New control-plane request record for pruning inputs. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PruneDisklessLogsJob.java | New PG job calling prune routine and mapping results. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlaneMetrics.java | Adds metrics tracking for prune job latency. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/postgres/PostgresControlPlane.java | Exposes pruneDisklessLogs via Postgres control plane. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/MetadataView.java | Adds topic-id->name lookup and consolidating partition enumeration. |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/InMemoryControlPlane.java | Adds new API method (currently unimplemented). |
| storage/inkless/src/main/java/io/aiven/inkless/control_plane/ControlPlane.java | Adds new pruneDisklessLogs control-plane API. |
| core/src/test/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPointTest.scala | Adjusts/extends fetch overlay tests for logStartOffset + error behavior. |
| core/src/test/scala/io/aiven/inkless/consolidation/ConsolidatedDisklessLogPrunerTest.scala | New unit tests for pruner request building and update behavior. |
| core/src/test/java/kafka/server/InklessConsolidatedDisklessTopicsTest.java | Adds end-to-end assertions that control-plane WAL metadata is pruned post-tiering. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Schedules periodic consolidated-diskless pruning task. |
| core/src/main/scala/kafka/server/metadata/InklessMetadataView.scala | Implements new MetadataView APIs for topic name and consolidating partitions. |
| core/src/main/scala/kafka/cluster/Partition.scala | Adds volatile diskless start offset state + setters/getters. |
| core/src/main/scala/io/aiven/inkless/consolidation/DisklessLeaderEndPoint.scala | Overlays local partition logStartOffset into fetch response when appropriate. |
| core/src/main/scala/io/aiven/inkless/consolidation/ConsolidatedDisklessLogPruner.scala | New broker-side pruner invoking control plane and updating partitions. |
ORDER BY topic_id, partition
FOR UPDATE
Are these needed? There is already a FOR UPDATE earlier, and the results are filtered, so the ordering shouldn't be needed.
Well, it's there to catch an update made after the first query executed (if new log rows were added since). But maybe it would be better to create a temporary table from the first query's result and work on that? That way we may not need this, and hopefully it would be cleaner.
I ended up refactoring this into a temp table. I could get rid of the FOR UPDATE though 😄. Unfortunately, we still need to iterate through arg_requests to return unknown_topic_or_partition errors, so I can't really simplify it further.
I think I misunderstood the initial implementation and the FOR UPDATE wasn't redundant -- it's the lock that prevents concurrent mutations between your read and your UPDATE logs SET log_start_offset. Without it, there's a TOCTOU window. The ORDER BY on the locking query is also needed (deadlock prevention). Could you bring those back and add a comment on those? And given that FOR UPDATE must stay, the temp table becomes optional — a PERFORM ... FOR UPDATE + direct re-read from logs inside the loop achieves the same correctness without DDL overhead. Sorry for the confusion here.
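To illustrate the deadlock-prevention point only: the sketch below is a Java analogy, not the SQL routine from this PR. It assumes each `logs` row behaves like an independent lock and shows that sorting by (topic_id, partition) before locking gives every caller the same acquisition order, which is what the ORDER BY on the locking query provides. `OrderedRowLocksSketch` and `RowKey` are hypothetical names introduced for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical analogy: one in-memory lock per (topic_id, partition) row.
final class OrderedRowLocksSketch {
    /** Stand-in for a row key in the logs table. */
    record RowKey(String topicId, int partition) {}

    // Single global order, mirroring ORDER BY topic_id, partition.
    private static final Comparator<RowKey> LOCK_ORDER =
        Comparator.comparing(RowKey::topicId).thenComparingInt(RowKey::partition);

    private final Map<RowKey, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Locks every requested row in the global order and returns the order used. */
    List<RowKey> lockAll(List<RowKey> requested) {
        List<RowKey> ordered = new ArrayList<>(requested);
        ordered.sort(LOCK_ORDER);
        for (RowKey key : ordered) {
            locks.computeIfAbsent(key, k -> new ReentrantLock()).lock();
        }
        return ordered;
    }

    /** Releases the locks in reverse acquisition order. */
    void unlockAll(List<RowKey> ordered) {
        for (int i = ordered.size() - 1; i >= 0; i--) {
            locks.get(ordered.get(i)).unlock();
        }
    }
}
```

Two callers that request overlapping rows in different input orders still lock them in the same sequence, so neither can hold a row the other needs while waiting on a row the other already holds.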
replicaManager.getPartitionOrError(pruneDisklessLogsResponse.topicIdPartition.topicPartition) match {
  case Right(partition) =>
    val newDisklessLogStart = pruneDisklessLogsResponse.disklessLogStartOffset
    if (partition.getDisklessLogStartOffset() > newDisklessLogStart) {
@jeqo I put the monotonicity check here because placing it inside Partition would have been awkward: getDisklessLogStartOffset would go unused, and we would end up with a setter that could throw an exception (depending on how it handles an invalid new diskless start offset).
I have an upcoming PR where I can possibly further improve the handling of these kinds of issues and also add pre-consolidation truncation (because consolidating a migrated topic might be unsafe), so I might be able to address this more completely there.
What's the intended consumer of this field beyond the pruner's own monotonicity check? If it's just monotonicity enforcement, this should be:
// Self-enforcing: no external caller needs to know the contract
def advanceDisklessLogStartOffset(newOffset: Long): Boolean = {
  if (newOffset > disklessLogStartOffset) {
    disklessLogStartOffset = newOffset
    true
  } else {
    false
  }
}
Then the pruner becomes:
if (!partition.advanceDisklessLogStartOffset(newDisklessLogStart)) {
  logger.error("Diskless log start offset is non-monotonic...")
}
If there is a future reader (e.g., metrics, etc.), then a proper getter makes sense but should be named to signal it's a cached/approximate value (e.g., cachedDisklessLogStartOffset()), since the source of truth is the control plane. For fetch routing the control plane already handled this I think.
Instead of advanceDisklessLogStartOffset maybe something closer to Log's maybeUpdateHighWatermark, e.g. maybeUpdateDisklessLogStartOffset?
> If there is a future reader (e.g., metrics, etc.), then a proper getter makes sense but should be named to signal it's a cached/approximate value (e.g., cachedDisklessLogStartOffset()), since the source of truth is the control plane. For fetch routing the control plane already handled this I think.

Yeah, I previously thought there would be, but it turns out I won't need the cached diskless start offset. I want to add a reconciliation step as a prerequisite to consolidation, to eliminate cases where the classic log LEO isn't truncated yet. The classicToDiskless and diskless log start offsets might have been useful for this, but at that point there's no cached value yet, so I'd have to query the control plane. Anyway, this is a future issue, and since I won't end up using disklessLogStartOffset, I agree that your proposal makes more sense.
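For reference, a minimal Java sketch of the self-enforcing update discussed in this thread, using the maybeUpdateDisklessLogStartOffset name suggested above. It is not the code in this PR. It assumes the offset lives in an AtomicLong rather than the volatile field on Partition, so the compare and the write happen as one atomic step even with more than one writer. The class name and the initial value of -1 are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder standing in for the diskless start offset state on Partition.
final class DisklessStartOffsetSketch {
    // Assumption: -1 means "no offset reported by the control plane yet".
    private final AtomicLong disklessLogStartOffset = new AtomicLong(-1L);

    /**
     * Advances the offset only if newOffset is strictly greater than the current value.
     * Returns true when the offset moved, false when the update was rejected.
     */
    boolean maybeUpdateDisklessLogStartOffset(long newOffset) {
        while (true) {
            long current = disklessLogStartOffset.get();
            if (newOffset <= current) {
                return false; // equal or backwards: keep the existing value
            }
            if (disklessLogStartOffset.compareAndSet(current, newOffset)) {
                return true;
            }
            // Another writer changed the value between get and CAS; re-check.
        }
    }

    /** Cached value only; the control plane remains the source of truth. */
    long cachedDisklessLogStartOffset() {
        return disklessLogStartOffset.get();
    }
}
```

With a plain volatile field, the check-then-write in the proposal above is safe only if the pruner is the sole writer; the compare-and-set loop removes that assumption at the cost of a slightly longer method.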
Consolidated diskless log pruning will be introduced on the interface level too to define a common API for various control plane implementations.
Adds consolidation.cleanup.interval.ms to be able to control the log pruning interval. This will be used to start a periodic cleanup process to remove diskless batches that have been moved to tiered storage already.
ConsolidatedDisklessLogPruner implements the cleanup logic that is common to all control plane implementations and utilizes the control plane to start the cleanup process.
# Conflicts:
#   core/src/main/scala/kafka/server/ReplicaManager.scala
Adds the diskless pruner implementation in the Postgres control plane.
Adds extra verification to the consolidation test to verify that diskless pruning happens correctly.
Co-authored-by: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Diskless logs which have been already consolidated to the remote
tier should be removed from the coordinator and the WAL. This commit
adds the functionality to do that:
The pruning will be invoked as a periodic task in ReplicaManager
with a configurable cleanup period.
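
As a rough illustration of the periodic task described in this commit message, the sketch below runs a pruning callback at a fixed interval. It is a sketch under stated assumptions, not the scheduling code in ReplicaManager: it uses the JDK's ScheduledExecutorService instead of the broker's own scheduler, `PeriodicPrunerSketch` is a hypothetical name, and `intervalMs` stands in for the value of consolidation.cleanup.interval.ms.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper; the broker would use its own scheduler instead.
final class PeriodicPrunerSketch implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    /**
     * Runs pruneOnce every intervalMs, measured from the end of the previous round.
     * Runtime exceptions are caught here because an uncaught exception would
     * suppress all later executions of a scheduled task.
     */
    void start(Runnable pruneOnce, long intervalMs) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                pruneOnce.run();
            } catch (RuntimeException e) {
                System.err.println("Pruning round failed: " + e);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    /** Stops scheduling and interrupts a round that is still running. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A fixed delay rather than a fixed rate keeps rounds from piling up when one control-plane call takes longer than the interval.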