refactor(inkless): SharedState to own StorageBackend lifecycle#562
refactor(inkless): SharedState to own StorageBackend lifecycle#562jeqo wants to merge 5 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR refactors Inkless SharedState to centrally own the lifecycle of storage backends by eagerly creating dedicated StorageBackend instances for fetch/produce/background use, and updating consumers to stop creating/closing their own backends.
Changes:
SharedState.initialize()now creates and stores 4 backend instances (fetchStorage, optionalmaybeLaggingFetchStorage,produceStorage,backgroundStorage) andclose()shuts them down.- Consumers (
FetchHandler/Reader,AppendHandler/Writer,FileCleaner,FileMerger) are updated to use the new accessors and no longer close storage. - Adds
SharedStateTestand updates multiple tests to reflect the new constructor/lifecycle behavior.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java | Centralizes backend creation and lifecycle; adds dedicated backend accessors and new close behavior. |
| storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java | Switches from buildStorage() to fetchStorage() / maybeLaggingFetchStorage(). |
| storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java | Accepts optional lagging backend and stops closing storage backends on close(). |
| storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java | Uses produceStorage() instead of building a backend per consumer. |
| storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java | Removes direct storage ownership/closing; relies on SharedState lifecycle. |
| storage/inkless/src/main/java/io/aiven/inkless/delete/FileCleaner.java | Uses backgroundStorage() and stops closing storage in close(). |
| storage/inkless/src/main/java/io/aiven/inkless/merge/FileMerger.java | Uses backgroundStorage() and stops closing storage in close(). |
| storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java | New tests for shutdown ordering, lagging-disabled path, and init-failure cleanup. |
| storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java | Updates test construction to avoid SharedState dependency. |
| storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java | Adds config stubs needed for eager SharedState.initialize() backend creation. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java | Updates Writer construction to reflect removed storage parameter in test ctor. |
| storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java | Updates Writer construction to reflect removed storage parameter in test ctor. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
02c2af6 to
1beeda2
Compare
81a7708 to
ba45b77
Compare
EelisK
left a comment
There was a problem hiding this comment.
Nice work, left a few comments and questions 👍
I think it could be good to have another pair of eyes validate the approach. I fear that SharedState could become a god object, which is not good either. Have we considered passing the storage dependencies directly via constructors to each component instead?
| brokerTopicStats, | ||
| defaultTopicConfigs | ||
| ); | ||
| } catch (Exception e) { |
There was a problem hiding this comment.
This is a pretty broad catch statement 👀 is it possible to limit the scope to only the statements that can realistically raise exceptions?
There was a problem hiding this comment.
Yeah, it's a bit long. The other option is using Utils.closeAll within (yet another) try-catch but I'm not sure that's much clearer.
I'd try to move it into a new function but it would need to take parameters as resources are not the same and cannot be hidden.
Using closeAll would be around the same lines:
} catch (Exception e) {
try {
Utils.closeAll(
backgroundStorage,
produceStorage,
laggingFetchStorage,
fetchStorage,
storageMetrics,
batchCoordinateCache,
objectCache
);
} catch (IOException ex) {
throw new RuntimeException("Failed to close all resources when initializing SharedState", ex);
}
throw new RuntimeException("Failed to initialize SharedState", e);
}If you feel it's much readable I can to reconsider and use this approach.
| // Enabling lagging consumer support requires a broker restart so that a new storage client can be created. | ||
| laggingFetchStorage = config.fetchLaggingConsumerThreadPoolSize() > 0 ? config.storage(storageMetrics) : null; | ||
| produceStorage = config.storage(storageMetrics); | ||
| backgroundStorage = config.storage(storageMetrics); |
There was a problem hiding this comment.
Note: I think this is now shared between FileCleaner and FileMerger and could be problematic with multithreaded use, but it should be fine because FileMerger is not used and is pending removal?
There was a problem hiding this comment.
It's ok to share these resources when related to similar jobs -- in this case background jobs -- as these are thread-safe. Now that they are managed by SharedState, they don't need to race for who needs to close the resource.
Or do you mean another type of problem could be created here?
|
@EelisK good catch on the god object. Agree SharedState has high cohesion by design — it's the composition root for the inkless module. Its job is to own the lifecycle of shared infrastructure (storage backends, caches, metrics) and provide them to components. When upstream this should be broken up and move the initialization into respective managers (e.g. ReplicaManager) |
SharedState now creates and manages 4 dedicated StorageBackend instances (fetch, laggingFetch, produce, background) instead of exposing a buildStorage() factory method. This centralizes resource lifecycle, eliminates double-close risks, and makes ownership explicit. - Private constructor with typed storage backend parameters - initialize() creates backends with try/catch cleanup on partial failure - close() shuts down all backends in reverse creation order - Consumers (Reader, Writer, FileCleaner, FileMerger) no longer close storage backends — SharedState owns their lifecycle Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ateTest.java Co-authored-by: Eelis Kostiainen <eelis.kostiainen@aiven.io>
f7dab86 to
586d0c1
Compare
Gate lagging consumer fetcher on pool size in Reader constructor, fail-fast with IllegalStateException if pool size > 0 but no storage is provided. Fix DisklessSwitchInvariantsTest by explicitly disabling lagging consumer feature. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
SharedStatenow creates and manages 4 dedicatedStorageBackendinstances (fetchStorage,maybeLaggingFetchStorage,produceStorage,backgroundStorage) instead of exposing abuildStorage()factory methodReader,Writer,FileCleaner,FileMerger) no longer close storage backends —SharedStateowns their lifecycleChanges
initialize()creates backends with try/catch cleanup,close()shuts down all backends in reverse creation order usingUtils.closeQuietlyfetchStorage()andmaybeLaggingFetchStorage()(Optional) from SharedStateproduceStorage(), Writer no longer stores or closes a StorageBackend fieldbackgroundStorage()(shared, thread-safe)SharedStateTest: verifies close ordering, lagging-disabled path, and failure cleanupMotivation
Previously each consumer called
buildStorage()to create its own backend and was responsible for closing it. This led to:Test plan
SharedStateTestcovers close ordering, disabled lagging path, and partial init failure cleanup🤖 Generated with Claude Code