Skip to content

refactor(inkless): SharedState to own StorageBackend lifecycle#562

Open
jeqo wants to merge 5 commits into
mainfrom
jeqo/refactor-shared-state-v2
Open

refactor(inkless): SharedState to own StorageBackend lifecycle#562
jeqo wants to merge 5 commits into
mainfrom
jeqo/refactor-shared-state-v2

Conversation

@jeqo
Copy link
Copy Markdown
Contributor

@jeqo jeqo commented Apr 1, 2026

Summary

  • SharedState now creates and manages 4 dedicated StorageBackend instances (fetchStorage, maybeLaggingFetchStorage, produceStorage, backgroundStorage) instead of exposing a buildStorage() factory method
  • Centralizes resource lifecycle with proper cleanup on partial initialization failure and ordered shutdown
  • Consumers (Reader, Writer, FileCleaner, FileMerger) no longer close storage backends — SharedState owns their lifecycle

Changes

  • SharedState: private constructor, initialize() creates backends with try/catch cleanup, close() shuts down all backends in reverse creation order using Utils.closeQuietly
  • FetchHandler/Reader: use fetchStorage() and maybeLaggingFetchStorage() (Optional) from SharedState
  • AppendHandler/Writer: use produceStorage(), Writer no longer stores or closes a StorageBackend field
  • FileCleaner/FileMerger: use backgroundStorage() (shared, thread-safe)
  • New SharedStateTest: verifies close ordering, lagging-disabled path, and failure cleanup

Motivation

Previously each consumer called buildStorage() to create its own backend and was responsible for closing it. This led to:

  • Distributed ownership making it hard to reason about lifecycle
  • Risk of double-close or missed-close bugs
  • No cleanup on partial initialization failure

Test plan

  • All 900 inkless storage tests pass
  • New SharedStateTest covers close ordering, disabled lagging path, and partial init failure cleanup
  • CI passes

🤖 Generated with Claude Code

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors Inkless SharedState to centrally own the lifecycle of storage backends by eagerly creating dedicated StorageBackend instances for fetch/produce/background use, and updating consumers to stop creating/closing their own backends.

Changes:

  • SharedState.initialize() now creates and stores 4 backend instances (fetchStorage, optional maybeLaggingFetchStorage, produceStorage, backgroundStorage) and close() shuts them down.
  • Consumers (FetchHandler/Reader, AppendHandler/Writer, FileCleaner, FileMerger) are updated to use the new accessors and no longer close storage.
  • Adds SharedStateTest and updates multiple tests to reflect the new constructor/lifecycle behavior.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java Centralizes backend creation and lifecycle; adds dedicated backend accessors and new close behavior.
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchHandler.java Switches from buildStorage() to fetchStorage() / maybeLaggingFetchStorage().
storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java Accepts optional lagging backend and stops closing storage backends on close().
storage/inkless/src/main/java/io/aiven/inkless/produce/AppendHandler.java Uses produceStorage() instead of building a backend per consumer.
storage/inkless/src/main/java/io/aiven/inkless/produce/Writer.java Removes direct storage ownership/closing; relies on SharedState lifecycle.
storage/inkless/src/main/java/io/aiven/inkless/delete/FileCleaner.java Uses backgroundStorage() and stops closing storage in close().
storage/inkless/src/main/java/io/aiven/inkless/merge/FileMerger.java Uses backgroundStorage() and stops closing storage in close().
storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java New tests for shutdown ordering, lagging-disabled path, and init-failure cleanup.
storage/inkless/src/test/java/io/aiven/inkless/delete/DeleteRecordsInterceptorTest.java Updates test construction to avoid SharedState dependency.
storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java Adds config stubs needed for eager SharedState.initialize() backend creation.
storage/inkless/src/test/java/io/aiven/inkless/produce/WriterMockedTest.java Updates Writer construction to reflect removed storage parameter in test ctor.
storage/inkless/src/test/java/io/aiven/inkless/produce/WriterPropertyTest.java Updates Writer construction to reflect removed storage parameter in test ctor.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread storage/inkless/src/main/java/io/aiven/inkless/consume/Reader.java Outdated
Comment thread storage/inkless/src/test/java/io/aiven/inkless/merge/FileMergerMockedTest.java Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated no new comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@jeqo jeqo marked this pull request as ready for review April 2, 2026 18:43
@jeqo jeqo force-pushed the jeqo/refactor-shared-state-v2 branch from 02c2af6 to 1beeda2 Compare April 20, 2026 20:10
@jeqo jeqo requested a review from viktorsomogyi April 20, 2026 21:30
@jeqo jeqo force-pushed the jeqo/refactor-shared-state-v2 branch 3 times, most recently from 81a7708 to ba45b77 Compare April 30, 2026 17:33
@jeqo jeqo requested a review from EelisK May 18, 2026 08:18
Copy link
Copy Markdown
Member

@EelisK EelisK left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work, left a few comments and questions 👍

I think it could be good to have another pair of eyes validate the approach. I fear that SharedState could become a god object, which is not good either. Have we considered passing the storage dependencies directly via constructors to each component instead?

Comment thread storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java Outdated
Comment thread storage/inkless/src/main/java/io/aiven/inkless/common/SharedState.java Outdated
brokerTopicStats,
defaultTopicConfigs
);
} catch (Exception e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pretty broad catch statement 👀 is it possible to limit the scope to only the statements that can realistically raise exceptions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's a bit long. The other option is using Utils.closeAll within (yet another) try-catch but I'm not sure that's much clearer.
I'd try to move it into a new function but it would need to take parameters as resources are not the same and cannot be hidden.

Using closeAll would be around the same lines:

        } catch (Exception e) {
            try {
                Utils.closeAll(
                    backgroundStorage,
                    produceStorage,
                    laggingFetchStorage,
                    fetchStorage,
                    storageMetrics,
                    batchCoordinateCache,
                    objectCache
                );
            } catch (IOException ex) {
                throw new RuntimeException("Failed to close all resources when initializing SharedState", ex);
            }
            throw new RuntimeException("Failed to initialize SharedState", e);
        }

If you feel it's much readable I can to reconsider and use this approach.

// Enabling lagging consumer support requires a broker restart so that a new storage client can be created.
laggingFetchStorage = config.fetchLaggingConsumerThreadPoolSize() > 0 ? config.storage(storageMetrics) : null;
produceStorage = config.storage(storageMetrics);
backgroundStorage = config.storage(storageMetrics);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I think this is now shared between FileCleaner and FileMerger and could be problematic with multithreaded use, but it should be fine because FileMerger is not used and is pending removal?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok to share these resources when related to similar jobs -- in this case background jobs -- as these are thread-safe. Now that they are managed by SharedState, they don't need to race for who needs to close the resource.
Or do you mean another type of problem could be created here?

Comment thread storage/inkless/src/test/java/io/aiven/inkless/common/SharedStateTest.java Outdated
@jeqo
Copy link
Copy Markdown
Contributor Author

jeqo commented May 18, 2026

@EelisK good catch on the god object. Agree SharedState has high cohesion by design — it's the composition root for the inkless module. Its job is to own the lifecycle of shared infrastructure (storage backends, caches, metrics) and provide them to components.

When upstream this should be broken up and move the initialization into respective managers (e.g. ReplicaManager)

jeqo and others added 4 commits May 25, 2026 22:24
SharedState now creates and manages 4 dedicated StorageBackend instances
(fetch, laggingFetch, produce, background) instead of exposing a
buildStorage() factory method. This centralizes resource lifecycle,
eliminates double-close risks, and makes ownership explicit.

- Private constructor with typed storage backend parameters
- initialize() creates backends with try/catch cleanup on partial failure
- close() shuts down all backends in reverse creation order
- Consumers (Reader, Writer, FileCleaner, FileMerger) no longer close
  storage backends — SharedState owns their lifecycle

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ateTest.java

Co-authored-by: Eelis Kostiainen <eelis.kostiainen@aiven.io>
@jeqo jeqo force-pushed the jeqo/refactor-shared-state-v2 branch from f7dab86 to 586d0c1 Compare May 25, 2026 19:24
@jeqo jeqo requested a review from AnatolyPopov May 25, 2026 19:26
Gate lagging consumer fetcher on pool size in Reader constructor,
fail-fast with IllegalStateException if pool size > 0 but no storage
is provided. Fix DisklessSwitchInvariantsTest by explicitly disabling
lagging consumer feature.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants