Skip to content

[api][runtime] Extend agent skill loading repository.#655

Merged
wenjin272 merged 3 commits into
apache:mainfrom
wenjin272:skill-repo-extending
May 20, 2026
Merged

[api][runtime] Extend agent skill loading repository.#655
wenjin272 merged 3 commits into
apache:mainfrom
wenjin272:skill-repo-extending

Conversation

@wenjin272
Copy link
Copy Markdown
Collaborator

@wenjin272 wenjin272 commented May 11, 2026

Linked issue: #592, #593, #594

Purpose of change

Extend the way Flink-Agents loads external agent skills:

  • load agent skills from remote urls (only http/https point to a zip)
  • load agent skills from python package
  • load agent skills from java classpath

Tests

ut

API

Yes, add from_xxx to Skills.

Documentation

  • doc-needed
  • doc-not-needed
  • doc-included

@github-actions github-actions Bot added doc-needed Your PR changes impact docs. fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. priority/major Default priority of the PR or issue. and removed doc-needed Your PR changes impact docs. labels May 11, 2026
@wenjin272 wenjin272 changed the title Skill repo extending [api][runtime] Extend agent skill loading repository. May 11, 2026
@github-actions github-actions Bot added doc-needed Your PR changes impact docs. and removed doc-needed Your PR changes impact docs. labels May 11, 2026
@wenjin272 wenjin272 force-pushed the skill-repo-extending branch from f448700 to f2d4239 Compare May 12, 2026 04:00
@wenjin272 wenjin272 requested a review from wzhero1 May 12, 2026 05:57
@wzhero1
Copy link
Copy Markdown
Collaborator

wzhero1 commented May 13, 2026

Thanks for the PR. Left some comments as below.

* <p>The zip is downloaded to a temp file and extracted into a process-local temp directory
* (cleaned up at JVM exit). The downloaded zip itself is removed once extraction completes.
*/
public class URLSkillRepository extends FileSystemSkillRepository {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inheritance is being used as composition.

A URL repo is not logically a FileSystem repo — it fetches over the network, then delegates. Side effects: the constructor performs network I/O before super(...), the original url is lost for diagnostics, there's no close() lifecycle, and the fetch step can't be mocked alone.

Suggest a SkillSource { Path materialize() } interface composed into one MaterializingSkillRepository. Same applies to ClasspathSkillRepository and PackageSkillRepository.

private List<String> paths;
private final List<String> paths;
private final List<String> urls;
private final List<String> classpathResources;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a new source today forces 5 parallel changes (Skills field + factory + XxxSkillRepository + SkillManager for-loop + AgentPlan merge branch).

Suggest a discriminated union — single sources: List<SkillSourceSpec(scheme, location)> field + a scheme → handler registry. New source = one registration, no other file touched. Bonus: cross-language ser/de becomes uniform.

Wire-format-affecting; much cheaper to fix before 0.3.0 ships.

Comment thread python/flink_agents/api/skills.py Outdated

# http(s) URLs. Each URL must point to a ``.zip`` whose top level is
# the baseDir.
urls: List[str] = Field(default_factory=list)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cross-language deserialization silently drops fields. Python writes packages → Java drops it → 0 skills loaded, no warning. Mirror for Java classpathResources → Python.

Short term: emit a WARN on unrecognized source fields during deserialization. Long term: unify under one cross-language scheme (see the sources discriminator suggested on Skills.java).

if ("jar".equals(protocol)) {
return SkillMaterializer.extractClasspathFromJar(url, resource);
}
throw new IllegalArgumentException(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Flink fat-jar / nested-jar deployment is not covered.

Flink user JARs typically use Maven Shade or jar-in-jar; getResource("skills") returns protocols like jar:nested: that throw right here. findInUrlClassLoader also assumes URLClassLoader, not Flink's ChildFirstClassLoader.

Issue #593 explicitly targets this scenario. Before merge: (1) an end-to-end test on a Flink mini-cluster, (2) use ClassLoader.getResources() + getResourceAsStream instead of dispatching on URL protocol.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the significance of highlighting URLClassLoader and ChildFirstClassLoader here? I understand that ChildFirstClassLoader is also a subclass of URLClassLoader.

Since we set the contextClassLoader to Flink's UserCodeClassloader before executing a JavaAction, it is used in most scenarios. However, there was an issue with the initialization of the skill-managed path in Python Actions. I have modified the code to explicitly pass the UserCodeClassloader.

Typically, I understand that the Flink user JAR does not use the jar-in-jar approach.

getResourceAsStream requires a file path as input, but the resource here is typically a directory name, so getResourceAsStream cannot be used.

@@ -44,7 +47,7 @@ public class SkillManager {

public SkillManager(Skills config) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constructor is synchronous, blocking, and not recoverable. loadAll() fetches sources serially; one slow URL stalls startup, one failure kills the whole SkillManager, and repos is final so there's no hot reload.

Follow-up: parallel materialization (CompletableFuture), fail-soft mode with a failure list, lazy load on first getSkill(), optional reloadSource() API.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behavior will be optimized in subsequent iterations.

}
}

private void registerRepo(SkillRepository repo) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate skill names overwrite silently with no log. Multi-source name collisions go through skills.put() and the later registration wins — no WARN, no error. Users debugging "why doesn't my SKILL.md take effect" get no diagnostic.

Minimum: WARN with both origins. Better: document the paths → urls → classpath precedence. (Naming the source concretely needs the SkillOrigin change on AgentSkill.)

// Fallback for URLClassLoader: JARs without explicit directory entries will not return
// a URL for a prefix via getResource(). Scan the classloader's URLs directly to find
// any JAR entry that starts with the resource prefix.
url = findInUrlClassLoader(resource, classLoader);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

findInUrlClassLoader silently uses only the first matching jar. Multiple plugin jars with the same prefix → every jar after the first is dropped, no log. Minimum: log the choice. Ideally merge across all matches, or fail loudly when more than one matches.

Also assumes URLClassLoader, not Flink's ChildFirstClassLoader — see the nested-jar thread on materialize.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think merge across all matches makes sense.


@JsonCreator
public Skills(@JsonProperty("paths") List<String> paths) {
public Skills(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Source-incompatible constructor change. Wire compat is fine; but new Skills(List.of("/data/skills")) no longer compiles. Either keep a @Deprecated 1-arg constructor that forwards, or call it out in the 0.3.0 release notes.

If Skills later migrates to a single sources field (see the discriminator thread), this signature changes again — better to break once.

org.apache.flink.agents.api.resource.ResourceContext
.fromGetResource((n, t) -> null));
assertEquals(java.util.List.of("/tmp/skill-d"), merged.getPaths());
assertEquals(List.of("/tmp/skill-d"), merged.getPaths());
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge-coverage gap. AgentPlan.addSkills now merges three LinkedHashSets but the test only asserts on getPaths(). Add cases for overlapping urls, overlapping classpathResources, and a mixed three-source case asserting all three end up in the merged result.

Comment thread python/flink_agents/api/skills.py Outdated
return cls(urls=list(urls))

@classmethod
def from_package(cls, package: str, resource: str) -> Skills:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API asymmetry. Java fromClasspath(String...) is varargs; Python from_package(package, resource) is single-pair, so two packages require two @skills functions.

Make it symmetric: def from_package(cls, *pairs: Tuple[str, str]). See also the cross-language drop thread on the packages field.

wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 14, 2026
…RL prefix.

Merge the two-pass zip-slip-validate-then-extract into one pass, and
extract the magic 4 in extractClasspathFromJar as JAR_URL_PREFIX_LEN.
Addresses review comments on PR apache#655.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 14, 2026
#1.

Replaces Skills's parallel paths/urls/classpathResources/packages fields
with a single sources: List<SkillSourceSpec> + a scheme-keyed registry.
Reduces "add a new source" from 5 touchpoints to 1, also closes review
apache#3, apache#13, apache#16, apache#17 along the way. Java/Python in lockstep; pre-0.3.0
wire-format break, no deprecation shims.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 14, 2026
…inator + registry.

Skills now carries a single sources: List<SkillSourceSpec> field instead
of paths / urls / classpathResources (Java) and paths / urls / packages
(Python). Each spec is {scheme, params}; loading dispatches through a
scheme-keyed SkillSourceRegistry. Adding a new source = one register
call plus one SkillRepository, rather than the previous five parallel
touchpoints (Skills field + factory + repo class + SkillManager loop +
AgentPlan merge branch).

Java registers local / url / classpath; Python registers local / url /
package. Cross-language unsupported schemes fail loud at load time
(replaces the previous silent field drop).

Also picks up:
* from_package on Python becomes varargs (closes review apache#17).
* AgentPlanDeclareSkillsTest covers url / classpath / mixed-scheme dedup
  (closes review apache#16).

Addresses PR apache#655 review #1; one-shot wire-format break before 0.3.0
ships, no deprecation shims (decision per design spec).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 14, 2026
…emp-dir leak.

Per PR apache#655 review apache#10: every URL / classpath / zip materialization
registered a JVM shutdown hook that survived past the owning SkillManager,
so Flink failover (task redeployed but JVM not exiting) leaked hooks and
temp dirs monotonically.

Fix is a close-on-operator-close chain that mirrors the pattern already
used by other Resources:
* SkillMaterializer.extractZipSafely / extractClasspathFromJar now return
  a Materialized AutoCloseable handle holding the temp dir + hook; close()
  is idempotent and tolerates a JVM already in shutdown.
* SkillRepository extends AutoCloseable (default no-op); the repo impls
  that own a Materialized close it.
* SkillManager implements AutoCloseable, closing every owned repo
  (identity-dedup since multiple skill names can share a repo).
* ResourceContextImpl implements AutoCloseable, closing its lazily-cached
  SkillManager; ensureSkillManager() now closes the previous instance on
  config swap.
* Java aligned with Python: ResourceCache gains setResourceContext +
  reuses the injected ctx in getResource (replacing the per-call
  new ResourceContextImpl), and close() cascades to ctx -> SkillManager.
  ActionExecutionOperator injects one ctx; the two ctx instances at
  lines 683 / 759 (and the one in PythonMCPResourceDiscovery) now share
  the cache's ctx.
* Python mirrors the same shape: Materialized class with
  atexit.unregister-able cleanup; SkillRepository.close default no-op;
  SkillManager close + context-manager; ResourceContextImpl.close;
  ResourceCache.close cascades to ctx.
* PackageSkillRepository also restructures its as_file atexit lambda
  into a bound method so close() can unregister it.

Tests: SkillMaterializerTest gains closeRemovesTempDirAndDeregistersHook
and borrowedMaterializedDoesNotRemoveDir; SkillManagerTest gains
closeReleasesUrlRepoTempDir; ResourceCacheTest gains
testGetResourceWithoutContextThrows + the existing two cases wire up the
context via a new helper. Python: test_materialize.py rewritten around
Materialized; test_manager.py adds test_close_releases_url_repo_temp_dir.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 14, 2026
Previously ClasspathSkillRepository.findInUrlClassLoader returned the
first matching JAR and silently dropped the rest. When a deployment had
multiple plugin JARs each contributing skills under the same prefix
(plugin-a.jar -> skills/github, plugin-b.jar -> skills/email, ...),
only one plugin's skills loaded with no log.

Fix:
* findAllMatches replaces findInUrlClassLoader. Returns the full list
  of distinct URLs from ClassLoader.getResources(...) plus the
  URLClassLoader-getURLs() fallback scan (kept for JARs without
  explicit directory entries).
* SkillMaterializer gains extractClasspathFromJars(List<URL>, prefix):
  merges entries from every JAR into a single temp dir + single
  shutdown hook. Same-path collisions log WARN and last-write-wins.
  extractClasspathFromJar(URL, prefix) becomes a thin wrapper.
* ClasspathSkillRepository.materialize dispatches by URL protocol:
  - all jar: -> extractClasspathFromJars (merged)
  - file: only -> existing single-source path (borrowed dir / zip)
  - mixed -> WARN, file URL wins (rare; multi-classpath-root case)
  Multi-jar merge logs INFO; mixed/file-multi logs WARN.

Tests:
* SkillMaterializerTest: extractClasspathFromJarsMergesEntries,
  extractClasspathFromJarsLastWriteWinsOnCollision.
* ClasspathSkillRepositoryTest: loadFromMultipleJarsMergesSkills,
  loadFromMultipleJarsWithCollisionLastWins.
* Existing single-jar / directory / missing tests unchanged and pass.

Addresses PR apache#655 review apache#12.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 14, 2026
…ame override.

Addresses PR apache#655 review apache#6 and apache#11 together (their fixes are coupled —
apache#11's "WARN with both origins" depends on apache#6 making origin queryable).

* New SkillOrigin (Java + Python) carries the scheme + location of the
  source that produced a skill. scheme mirrors the SkillSourceSpec
  scheme; location is a human-readable identifier (path / URL /
  classpath resource name / package+resource).
* AgentSkill gains a nullable `origin` field + setter. SkillManager
  attaches it during registerRepo (origin built from the
  SkillSourceSpec the repo came from).
* registerRepo logs WARN when a put replaces an existing skill of the
  same name, naming both the new origin and the previous one — closing
  the silent-override path from review apache#11.
* SkillManager's parallel `repos` map is left in place; it exists for
  getSkillDir(s) / resolveResourcePath (filesystem-backed paths), not
  for origin, and is orthogonal to this change. Review #2's
  composition refactor would clean it up.
* loadedAt (suggested in review apache#6) intentionally omitted — current
  loadAll constructs all skills in one synchronous batch so timestamps
  are uninformative; revisit when hot-reload (review apache#5) lands.

Tests:
* SkillOriginTest: toString format + equality.
* SkillManagerTest: origin attached after load; duplicate name —
  last-write-wins and the surviving skill carries the second origin
  (the WARN log itself isn't asserted; verified via origin check).
* Python mirrors.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@wenjin272
Copy link
Copy Markdown
Collaborator Author

Hi, @wzhero1, Ty for your thorough review.

I fixed some design issues and bugs, including

  • Skills rewritten as a single discriminated sources: List<SkillSourceSpec(scheme, params)> + SkillSourceRegistry. Uniform Java/Python wire format.
  • URL / Classpath / Package / FileSystem repos are now sibling SkillRepository impls composing a shared SkillDirectoryReader. No more inheritance-as-composition.
  • SkillOrigin(scheme, location) attached to every AgentSkill at registration.
  • SkillRepository extends AutoCloseable; SkillManager.close() cascades through ResourceContextImpl on operator close (failover-safe).
  • Duplicate skill names now LOG.warn with both origins; last-write-wins kept.
  • All matching classpath JARs merged via extractClasspathFromJars; same-path collisions WARN.
  • Single-pass zip extract; JAR_URL_PREFIX_LEN named constant.

The remaining issues related to robustness and security will be iterated on in subsequent phases.

wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 14, 2026
…RL prefix.

Merge the two-pass zip-slip-validate-then-extract into one pass, and
extract the magic 4 in extractClasspathFromJar as JAR_URL_PREFIX_LEN.
Addresses review comments on PR apache#655.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 14, 2026
…inator + registry.

Skills now carries a single sources: List<SkillSourceSpec> field instead
of paths / urls / classpathResources (Java) and paths / urls / packages
(Python). Each spec is {scheme, params}; loading dispatches through a
scheme-keyed SkillSourceRegistry. Adding a new source = one register
call plus one SkillRepository, rather than the previous five parallel
touchpoints (Skills field + factory + repo class + SkillManager loop +
AgentPlan merge branch).

Java registers local / url / classpath; Python registers local / url /
package. Cross-language unsupported schemes fail loud at load time
(replaces the previous silent field drop).

Also picks up:
* from_package on Python becomes varargs (closes review apache#17).
* AgentPlanDeclareSkillsTest covers url / classpath / mixed-scheme dedup
  (closes review apache#16).

Addresses PR apache#655 review #1; one-shot wire-format break before 0.3.0
ships, no deprecation shims (decision per design spec).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 14, 2026
…emp-dir leak.

Per PR apache#655 review apache#10: every URL / classpath / zip materialization
registered a JVM shutdown hook that survived past the owning SkillManager,
so Flink failover (task redeployed but JVM not exiting) leaked hooks and
temp dirs monotonically.

Fix is a close-on-operator-close chain that mirrors the pattern already
used by other Resources:
* SkillMaterializer.extractZipSafely / extractClasspathFromJar now return
  a Materialized AutoCloseable handle holding the temp dir + hook; close()
  is idempotent and tolerates a JVM already in shutdown.
* SkillRepository extends AutoCloseable (default no-op); the repo impls
  that own a Materialized close it.
* SkillManager implements AutoCloseable, closing every owned repo
  (identity-dedup since multiple skill names can share a repo).
* ResourceContextImpl implements AutoCloseable, closing its lazily-cached
  SkillManager; ensureSkillManager() now closes the previous instance on
  config swap.
* Java aligned with Python: ResourceCache gains setResourceContext +
  reuses the injected ctx in getResource (replacing the per-call
  new ResourceContextImpl), and close() cascades to ctx -> SkillManager.
  ActionExecutionOperator injects one ctx; the two ctx instances at
  lines 683 / 759 (and the one in PythonMCPResourceDiscovery) now share
  the cache's ctx.
* Python mirrors the same shape: Materialized class with
  atexit.unregister-able cleanup; SkillRepository.close default no-op;
  SkillManager close + context-manager; ResourceContextImpl.close;
  ResourceCache.close cascades to ctx.
* PackageSkillRepository also restructures its as_file atexit lambda
  into a bound method so close() can unregister it.

Tests: SkillMaterializerTest gains closeRemovesTempDirAndDeregistersHook
and borrowedMaterializedDoesNotRemoveDir; SkillManagerTest gains
closeReleasesUrlRepoTempDir; ResourceCacheTest gains
testGetResourceWithoutContextThrows + the existing two cases wire up the
context via a new helper. Python: test_materialize.py rewritten around
Materialized; test_manager.py adds test_close_releases_url_repo_temp_dir.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

fix rebase conflict
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 14, 2026
Previously ClasspathSkillRepository.findInUrlClassLoader returned the
first matching JAR and silently dropped the rest. When a deployment had
multiple plugin JARs each contributing skills under the same prefix
(plugin-a.jar -> skills/github, plugin-b.jar -> skills/email, ...),
only one plugin's skills loaded with no log.

Fix:
* findAllMatches replaces findInUrlClassLoader. Returns the full list
  of distinct URLs from ClassLoader.getResources(...) plus the
  URLClassLoader-getURLs() fallback scan (kept for JARs without
  explicit directory entries).
* SkillMaterializer gains extractClasspathFromJars(List<URL>, prefix):
  merges entries from every JAR into a single temp dir + single
  shutdown hook. Same-path collisions log WARN and last-write-wins.
  extractClasspathFromJar(URL, prefix) becomes a thin wrapper.
* ClasspathSkillRepository.materialize dispatches by URL protocol:
  - all jar: -> extractClasspathFromJars (merged)
  - file: only -> existing single-source path (borrowed dir / zip)
  - mixed -> WARN, file URL wins (rare; multi-classpath-root case)
  Multi-jar merge logs INFO; mixed/file-multi logs WARN.

Tests:
* SkillMaterializerTest: extractClasspathFromJarsMergesEntries,
  extractClasspathFromJarsLastWriteWinsOnCollision.
* ClasspathSkillRepositoryTest: loadFromMultipleJarsMergesSkills,
  loadFromMultipleJarsWithCollisionLastWins.
* Existing single-jar / directory / missing tests unchanged and pass.

Addresses PR apache#655 review apache#12.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 14, 2026
…ame override.

Addresses PR apache#655 review apache#6 and apache#11 together (their fixes are coupled —
apache#11's "WARN with both origins" depends on apache#6 making origin queryable).

* New SkillOrigin (Java + Python) carries the scheme + location of the
  source that produced a skill. scheme mirrors the SkillSourceSpec
  scheme; location is a human-readable identifier (path / URL /
  classpath resource name / package+resource).
* AgentSkill gains a nullable `origin` field + setter. SkillManager
  attaches it during registerRepo (origin built from the
  SkillSourceSpec the repo came from).
* registerRepo logs WARN when a put replaces an existing skill of the
  same name, naming both the new origin and the previous one — closing
  the silent-override path from review apache#11.
* SkillManager's parallel `repos` map is left in place; it exists for
  getSkillDir(s) / resolveResourcePath (filesystem-backed paths), not
  for origin, and is orthogonal to this change. Review #2's
  composition refactor would clean it up.
* loadedAt (suggested in review apache#6) intentionally omitted — current
  loadAll constructs all skills in one synchronous batch so timestamps
  are uninformative; revisit when hot-reload (review apache#5) lands.

Tests:
* SkillOriginTest: toString format + equality.
* SkillManagerTest: origin attached after load; duplicate name —
  last-write-wins and the surviving skill carries the second origin
  (the WARN log itself isn't asserted; verified via origin check).
* Python mirrors.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@wenjin272 wenjin272 requested a review from wzhero1 May 14, 2026 16:49
wzhero1

This comment was marked as duplicate.

Copy link
Copy Markdown
Collaborator

@wzhero1 wzhero1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Round-2 follow-up review against the new commits (485573ad6d58d8). Per-comment scope: lifecycle / cross-language consistency / API hygiene gaps left after the prior round, plus one architectural smell carried into the registry refactor. None of these block merge individually; the constructor-leak (#5) and the non-deterministic merge order (#7) are the most worth addressing this PR.

Comment thread runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java Outdated
@Nullable private volatile Map<String, String> resources;
@Nullable private Supplier<Map<String, String>> resourceLoader;
private volatile boolean activated;
@Nullable private volatile SkillOrigin origin;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java/Python serialization surface for origin is asymmetric.

Python's AgentSkill extends BaseModel, so origin is in default model_dump() output. Java's AgentSkill is a POJO with no Jackson annotation — any future debug/checkpoint/Pemja serialization path silently drops it on Java while Python emits it. Same class of cross-language drop A3 fixed at the Skills envelope.

Add @JsonProperty("origin") + @JsonInclude(NON_NULL). Near-zero cost now; cost grows once AgentSkill travels on the wire.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a closer look and decided to skip this for now. Grep'd every AgentSkill reference in the repo (Java + Python) — no serialization path uses it today, and Jackson's
default bean introspection would emit getOrigin() as "origin" automatically even without the annotation, so the "silently drops" failure mode doesn't actually fire under
the project's default ObjectMapper.

Also worth noting: I ran Python's model_dump() and it emits "origin": null by default (pydantic v2 doesn't drop None fields), so @JsonInclude(NON_NULL) would have made
the two sides byte-asymmetric. I'd rather design the schema (NON_NULL vs null, resources handling, @JsonCreator) when there's an actual wire path with concrete
requirements. Happy to revisit if you have a specific path in mind.

Comment thread runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java Outdated
Comment thread plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java Outdated
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 19, 2026
…RL prefix.

Merge the two-pass zip-slip-validate-then-extract into one pass, and
extract the magic 4 in extractClasspathFromJar as JAR_URL_PREFIX_LEN.
Addresses review comments on PR apache#655.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 19, 2026
…inator + registry.

Skills now carries a single sources: List<SkillSourceSpec> field instead
of paths / urls / classpathResources (Java) and paths / urls / packages
(Python). Each spec is {scheme, params}; loading dispatches through a
scheme-keyed SkillSourceRegistry. Adding a new source = one register
call plus one SkillRepository, rather than the previous five parallel
touchpoints (Skills field + factory + repo class + SkillManager loop +
AgentPlan merge branch).

Java registers local / url / classpath; Python registers local / url /
package. Cross-language unsupported schemes fail loud at load time
(replaces the previous silent field drop).

Also picks up:
* from_package on Python becomes varargs (closes review apache#17).
* AgentPlanDeclareSkillsTest covers url / classpath / mixed-scheme dedup
  (closes review apache#16).

Addresses PR apache#655 review #1; one-shot wire-format break before 0.3.0
ships, no deprecation shims (decision per design spec).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 19, 2026
…emp-dir leak.

Per PR apache#655 review apache#10: every URL / classpath / zip materialization
registered a JVM shutdown hook that survived past the owning SkillManager,
so Flink failover (task redeployed but JVM not exiting) leaked hooks and
temp dirs monotonically.

Fix is a close-on-operator-close chain that mirrors the pattern already
used by other Resources:
* SkillMaterializer.extractZipSafely / extractClasspathFromJar now return
  a Materialized AutoCloseable handle holding the temp dir + hook; close()
  is idempotent and tolerates a JVM already in shutdown.
* SkillRepository extends AutoCloseable (default no-op); the repo impls
  that own a Materialized close it.
* SkillManager implements AutoCloseable, closing every owned repo
  (identity-dedup since multiple skill names can share a repo).
* ResourceContextImpl implements AutoCloseable, closing its lazily-cached
  SkillManager; ensureSkillManager() now closes the previous instance on
  config swap.
* Java aligned with Python: ResourceCache gains setResourceContext +
  reuses the injected ctx in getResource (replacing the per-call
  new ResourceContextImpl), and close() cascades to ctx -> SkillManager.
  ActionExecutionOperator injects one ctx; the two ctx instances at
  lines 683 / 759 (and the one in PythonMCPResourceDiscovery) now share
  the cache's ctx.
* Python mirrors the same shape: Materialized class with
  atexit.unregister-able cleanup; SkillRepository.close default no-op;
  SkillManager close + context-manager; ResourceContextImpl.close;
  ResourceCache.close cascades to ctx.
* PackageSkillRepository also restructures its as_file atexit lambda
  into a bound method so close() can unregister it.

Tests: SkillMaterializerTest gains closeRemovesTempDirAndDeregistersHook
and borrowedMaterializedDoesNotRemoveDir; SkillManagerTest gains
closeReleasesUrlRepoTempDir; ResourceCacheTest gains
testGetResourceWithoutContextThrows + the existing two cases wire up the
context via a new helper. Python: test_materialize.py rewritten around
Materialized; test_manager.py adds test_close_releases_url_repo_temp_dir.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

fix rebase conflict
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 19, 2026
Previously ClasspathSkillRepository.findInUrlClassLoader returned the
first matching JAR and silently dropped the rest. When a deployment had
multiple plugin JARs each contributing skills under the same prefix
(plugin-a.jar -> skills/github, plugin-b.jar -> skills/email, ...),
only one plugin's skills loaded with no log.

Fix:
* findAllMatches replaces findInUrlClassLoader. Returns the full list
  of distinct URLs from ClassLoader.getResources(...) plus the
  URLClassLoader-getURLs() fallback scan (kept for JARs without
  explicit directory entries).
* SkillMaterializer gains extractClasspathFromJars(List<URL>, prefix):
  merges entries from every JAR into a single temp dir + single
  shutdown hook. Same-path collisions log WARN and last-write-wins.
  extractClasspathFromJar(URL, prefix) becomes a thin wrapper.
* ClasspathSkillRepository.materialize dispatches by URL protocol:
  - all jar: -> extractClasspathFromJars (merged)
  - file: only -> existing single-source path (borrowed dir / zip)
  - mixed -> WARN, file URL wins (rare; multi-classpath-root case)
  Multi-jar merge logs INFO; mixed/file-multi logs WARN.

Tests:
* SkillMaterializerTest: extractClasspathFromJarsMergesEntries,
  extractClasspathFromJarsLastWriteWinsOnCollision.
* ClasspathSkillRepositoryTest: loadFromMultipleJarsMergesSkills,
  loadFromMultipleJarsWithCollisionLastWins.
* Existing single-jar / directory / missing tests unchanged and pass.

Addresses PR apache#655 review apache#12.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@wenjin272 wenjin272 force-pushed the skill-repo-extending branch from d6d58d8 to 0c892b5 Compare May 19, 2026 08:26
wenjin272 added a commit to wenjin272/flink-agents that referenced this pull request May 19, 2026
…ame override.

Addresses PR apache#655 review apache#6 and apache#11 together (their fixes are coupled —
apache#11's "WARN with both origins" depends on apache#6 making origin queryable).

* New SkillOrigin (Java + Python) carries the scheme + location of the
  source that produced a skill. scheme mirrors the SkillSourceSpec
  scheme; location is a human-readable identifier (path / URL /
  classpath resource name / package+resource).
* AgentSkill gains a nullable `origin` field + setter. SkillManager
  attaches it during registerRepo (origin built from the
  SkillSourceSpec the repo came from).
* registerRepo logs WARN when a put replaces an existing skill of the
  same name, naming both the new origin and the previous one — closing
  the silent-override path from review apache#11.
* SkillManager's parallel `repos` map is left in place; it exists for
  getSkillDir(s) / resolveResourcePath (filesystem-backed paths), not
  for origin, and is orthogonal to this change. Review #2's
  composition refactor would clean it up.
* loadedAt (suggested in review apache#6) intentionally omitted — current
  loadAll constructs all skills in one synchronous batch so timestamps
  are uninformative; revisit when hot-reload (review apache#5) lands.

Tests:
* SkillOriginTest: toString format + equality.
* SkillManagerTest: origin attached after load; duplicate name —
  last-write-wins and the surviving skill carries the second origin
  (the WARN log itself isn't asserted; verified via origin check).
* Python mirrors.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@wenjin272 wenjin272 force-pushed the skill-repo-extending branch from 0c892b5 to 0db2cc8 Compare May 19, 2026 08:35
@wenjin272
Copy link
Copy Markdown
Collaborator Author

wenjin272 commented May 19, 2026

Hi, @wzhero1. Ty for the careful review.

I have updated and addressed the second round comments:

  • owned rename + 5× delegating boilerplate: the three repos collapsed into AbstractMaterializedSkillRepository (Java) / MaterializedSkillRepository (Python); the
    field becomes materialization on the base. Python's PackageSkillRepository simplified to own its tempdir (no more as_file ctx + atexit two-step). — 68dc94b

  • originOf scheme ladder: moved onto SkillSourceHandler.describeLocation; built-ins register both functions together. Dead case "package" is gone. — 85caed1

  • Partial-load repo leak + close() swallow: constructor closes already-registered repos before propagating; close cascade aligned with ResourceCache.close()
    (first-exception rethrow + suppressed). — 47360d2

  • @Skills merge non-determinism: addSkills sorts keys lexically before merging; mixed-scheme test re-asserts as List to lock the order. — dcab835

  • AgentSkill JSON asymmetry: not taking — see inline reply.

Six of seven taken; cross-language symmetry preserved on every fix.

try:
resources[relative_path] = file_path.read_text()
except UnicodeDecodeError:
resources[relative_path] = f"base64: {file_path.read_bytes()}"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch is not base64-encoding — binary resources are silently corrupted.

resources[relative_path] = f"base64: {file_path.read_bytes()}"

The f-string returns the bytes repr (literal "base64: b'\\x89PNG...'"), not encoded data. base64.b64decode(value.removeprefix("base64:")) will raise on any real payload. Java sibling uses Base64.getEncoder().encodeToString(...), so the cross-language contract diverges silently and every binary resource read via AgentSkill.get_resource() on Python is unusable.

Fix: "base64:" + base64.b64encode(file_path.read_bytes()).decode("ascii"). Worth adding a PNG round-trip test through both languages — none exists today.

self._skills[skill.name] = skill
self._repos[skill.name] = repo
raise RuntimeError(msg) from e
self._register_repo(repo, _origin_of(spec))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial-load leak: a failed source leaves earlier-opened repos uncleaned.

Java's loadAll() now wraps each open() in try/catch and closes already-opened repos on failure (round-2 follow-up). Python here still just re-raises — if source #3 of 5 fails, sources #1 and #2 keep their extracted temp dirs and atexit handlers for the interpreter's lifetime. In a long-lived TaskManager Python worker every failed reload leaks.

Same try/finally + first-exception pattern as the Java side: collect opened repos in a list, close them on the failure path before re-raising.

origin,
previous.getOrigin() == null ? "<unknown>" : previous.getOrigin());
}
repos.put(skillName, repo);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate skill name overwrites the repo-index entry; the displaced repo is orphaned from closeRepos(). Bug exists in both languages.

repos.put(skillName, repo);   // overwrites previous repo ref

closeRepos() iterates repos.values(), so when two sources contribute skill "foo" the displaced repo is no longer reachable — its temp dir survives until JVM exit, only the shutdown hook ever fires. The recently-added close-cascade plumbing can't help because the orphan is never visited. Python mirrors the same shape at python/.../skill_manager.py:177.

Fix: track opened repos in a separate List<SkillRepository> and iterate that in closeRepos() (dedup with IdentityHashMap).

@wenjin272
Copy link
Copy Markdown
Collaborator Author

Hi, @wzhero1, thanks for round three. All three taken:

  • Python base64 silently corrupting binary resources: spot on — f-string was inlining the bytes repr, not real base64. Switched to
    base64.b64encode(...).decode("ascii"). The single-line startsWith("base64: ") assertion is exactly how this hid for a cycle; both Java and Python tests now b64-decode the
    value and assert the round-trip against a PNG signature + non-UTF-8 byte sequence. — e795648
  • Python partial-load leak: Python _load_skills now closes already-opened repos before re-raising, mirroring the Java fix from round two. — aa7c978
  • Duplicate-name repo orphan in close(): confirmed on both sides — the skill-name → repo map drops the displaced repo's reference, so iterating it on close orphaned the
    earlier repo. Added an openedRepos/_opened_repos list (identity / id() dedup) and changed close to iterate that. New test on each side: two sources contributing the
    same skill name, both repos must be closed. — 8b10d3a

@wzhero1
Copy link
Copy Markdown
Collaborator

wzhero1 commented May 20, 2026

approved

@wenjin272 wenjin272 force-pushed the skill-repo-extending branch 2 times, most recently from 27d45e1 to dbebf0e Compare May 20, 2026 08:27
@wenjin272 wenjin272 force-pushed the skill-repo-extending branch from dbebf0e to daa6c7a Compare May 20, 2026 09:40
@wenjin272 wenjin272 merged commit 2fa1a3d into apache:main May 20, 2026
44 of 46 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-needed Your PR changes impact docs. fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. priority/major Default priority of the PR or issue.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants