Azure: Add scheduled refresh for storage credentials held by ADLSFileIO#15869

Open
sachinnn99 wants to merge 1 commit into
apache:mainfrom
sachinnn99:adls-credential-refresh

Conversation

Contributor

@sachinnn99 sachinnn99 commented Apr 2, 2026

Closes #15852.

The ADLSFileIO implementation never refreshes the credentials it receives at table load. The ADLS clients use a VendedAdlsCredentialProvider that refreshes internally via SimpleTokenCache, but those updates are not reflected back to the FileIO.

If an ADLSFileIO instance is serialized to remote workers, the credentials it carries may already be expired, triggering a thundering herd of immediate refresh requests.

This change addresses this problem by proactively updating the credentials in the FileIO so that only valid credentials are propagated to remote clients.

This applies the same change that was already made for S3FileIO in #15678 and GCSFileIO in #15696.

      synchronized (this) {
        if (clientCache == null) {
          clientCache = Maps.newConcurrentMap();
          scheduleCredentialRefresh();

I noticed a potential race condition here with the background timers. When the credentials refresh, the cache is nullified. The next time client(location) is called, it rebuilds the cache and unconditionally calls scheduleCredentialRefresh() again, even if the background timer thread has already scheduled a refresh. Over the lifetime of a long-running Flink job, this could lead to an exponentially growing number of scheduled timers. We probably need a check here, such as if (refreshFuture == null || refreshFuture.isDone()), before invoking scheduleCredentialRefresh():

DataLakeFileSystemClient client(ADLSLocation location) {
    if (clientCache == null) {
      synchronized (this) {
        if (clientCache == null) {
          clientCache = Maps.newConcurrentMap();
          if (refreshFuture == null || refreshFuture.isCancelled() || refreshFuture.isDone()) {
            scheduleCredentialRefresh();
          }
        }
      }
    }
    String cacheKey = location.host() + "/" + location.container().orElse("");
    return clientCache.computeIfAbsent(cacheKey, k -> buildClient(location));
  }
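The guard suggested above can be exercised in isolation. The following is a minimal sketch, not the PR's code: `RefreshGuardSketch` and `scheduleIfIdle` are hypothetical names, and the refresh task is passed in as a plain `Runnable`. It only shows that a second scheduling attempt is skipped while an earlier one is still pending.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the FileIO; only the scheduling guard is modeled. */
final class RefreshGuardSketch {
  private final ScheduledExecutorService executor;
  private final AtomicInteger scheduledCount = new AtomicInteger();
  private ScheduledFuture<?> refreshFuture; // guarded by "this"

  RefreshGuardSketch(ScheduledExecutorService executor) {
    this.executor = executor;
  }

  /** Schedules the task only if no earlier refresh is still pending; returns whether it did. */
  synchronized boolean scheduleIfIdle(Runnable refreshTask, long delayMillis) {
    // Future.isDone() also returns true for cancelled futures,
    // so a separate isCancelled() check is not needed here.
    if (refreshFuture != null && !refreshFuture.isDone()) {
      return false;
    }
    scheduledCount.incrementAndGet();
    refreshFuture = executor.schedule(refreshTask, delayMillis, TimeUnit.MILLISECONDS);
    return true;
  }

  /** Number of refresh tasks that were actually handed to the executor. */
  int scheduledCount() {
    return scheduledCount.get();
  }
}
```

Calling `scheduleIfIdle` twice in a row with a long delay schedules a single task; the second call returns `false` because the first future is neither done nor cancelled.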

    return ImmutableList.copyOf(storageCredentials);
  }

  private void scheduleCredentialRefresh() {

What if Spark or Flink invokes client() before the credentials are set? In that case storageCredentials is null, and storageCredentials.stream() will throw an NPE.
We need to add a null check here:

    if (storageCredentials == null || storageCredentials.isEmpty()) {
      return;
    }


  private void scheduleCredentialRefresh() {
    storageCredentials.stream()
        .flatMap(cred -> cred.config().entrySet().stream())

Using flatMap and .startsWith() works, but it relies on a fuzzy match. If a future update introduces another configuration property that shares the same prefix (for example, something like adls.sas-token-expires-at-ms-buffer.accountname), this filter could match more than one entry. We could build the key from the credential prefix instead:

  private void scheduleCredentialRefresh() {
    if (storageCredentials == null || storageCredentials.isEmpty()) {
      return;
    }

    storageCredentials.stream()
        .map(
            cred ->
                cred.config().get(AzureProperties.ADLS_SAS_TOKEN_EXPIRES_AT_MS_PREFIX + cred.prefix()))
        .filter(Objects::nonNull)
        .map(expiresAtString -> Instant.ofEpochMilli(Long.parseLong(expiresAtString)))
        .min(Comparator.naturalOrder())
        .ifPresent(
            expiresAt -> {
              Instant prefetchAt = expiresAt.minus(5, ChronoUnit.MINUTES);
              long delay = Duration.between(Instant.now(), prefetchAt).toMillis();
              this.refreshFuture =
                  executorService().schedule(this::refreshStorageCredentials, delay, TimeUnit.MILLISECONDS);
            });
  }
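The delay arithmetic in the suggestion above can be checked separately from the FileIO. This is a minimal sketch under stated assumptions: `RefreshDelaySketch` and `refreshDelay` are hypothetical names, the five-minute window is taken from the snippet, and `now` is passed in so the result is deterministic. A negative delay is clamped to zero for clarity; `ScheduledExecutorService.schedule` already treats zero and negative delays as requests for immediate execution, so the clamp only makes that intent explicit.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical helper; models only the "refresh N minutes before the earliest expiry" arithmetic. */
final class RefreshDelaySketch {
  // Assumption: same 5-minute prefetch window as in the review suggestion.
  static final Duration PREFETCH_WINDOW = Duration.ofMinutes(5);

  /**
   * Returns the delay until PREFETCH_WINDOW before the earliest expiry,
   * clamped to zero, or empty if no expiry value is present.
   */
  static Optional<Duration> refreshDelay(List<String> expiresAtMillis, Instant now) {
    return expiresAtMillis.stream()
        .filter(Objects::nonNull)
        .map(value -> Instant.ofEpochMilli(Long.parseLong(value)))
        .min(Comparator.naturalOrder())
        .map(expiresAt -> Duration.between(now, expiresAt.minus(PREFETCH_WINDOW)))
        .map(delay -> delay.isNegative() ? Duration.ZERO : delay);
  }
}
```

For example, with `now` at epoch millisecond 1,000,000 and expiries at 1,600,000 and 2,000,000, the earliest expiry is 600 seconds away, so the computed delay is 300 seconds. An expiry that is already inside the prefetch window yields `Duration.ZERO`.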

    try {
      List<StorageCredential> refreshed =
          vendedAdlsCredentialProvider.fetchCredentials().credentials().stream()
              .filter(c -> c.prefix().startsWith(ROOT_PREFIX))

Great catch here to add ROOT_PREFIX!


      if (!refreshed.isEmpty() && !isResourceClosed.get()) {
        this.storageCredentials = Lists.newArrayList(refreshed);
        scheduleCredentialRefresh();

I noticed that clientCache isn't cleared after we update storageCredentials. If it isn't cleared, the next call to client() will return a cached client that was built with the obsolete credentials. We need to invalidate clientCache here:

synchronized (this) {
    this.clientCache = null;
}
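To illustrate why the invalidation matters, here is a minimal sketch with hypothetical names (`CacheInvalidationSketch`, a `String` token standing in for `storageCredentials`, and a `String` standing in for the built client). It is not the ADLSFileIO code. The sketch also copies the cache reference into a local variable before using it, on the assumption that a refresh may null the field between the null check and the lookup.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of a client cache that must be rebuilt after a credential refresh. */
final class CacheInvalidationSketch {
  private volatile String sasToken;                 // stand-in for storageCredentials
  private volatile Map<String, String> clientCache; // stand-in for the client cache

  CacheInvalidationSketch(String initialToken) {
    this.sasToken = initialToken;
  }

  /** Returns the cached "client" for the key, building it with the current token if absent. */
  String client(String cacheKey) {
    Map<String, String> cache = clientCache; // local copy: the field may be nulled concurrently
    if (cache == null) {
      synchronized (this) {
        cache = clientCache;
        if (cache == null) {
          cache = new ConcurrentHashMap<>();
          clientCache = cache;
        }
      }
    }
    return cache.computeIfAbsent(cacheKey, key -> "client(" + key + ", " + sasToken + ")");
  }

  /** Swaps in the refreshed token and drops the cache so new clients pick it up. */
  synchronized void refresh(String newToken) {
    this.sasToken = newToken;
    this.clientCache = null;
  }
}
```

After `refresh("t2")`, a call to `client("a")` builds a new entry with `t2`. Without the `clientCache = null` line, the entry built with the first token would keep being returned.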


@ajayvembu ajayvembu left a comment


Thanks for putting this together! I’ve been looking deeply into the ADLS credential refresh logic recently as well. I left a few comments on some edge cases regarding the Azure client cache invalidation and multithreaded timer scheduling. Hope this helps!

@sachinnn99 sachinnn99 force-pushed the adls-credential-refresh branch 2 times, most recently from ad3a5a7 to c46b646 Compare May 2, 2026 11:40
@sachinnn99
Contributor Author

Thanks for the thorough review @ajayvembu! I've addressed all your comments in the latest push:

1. Race condition on timer scheduling — Added a guard to check refreshFuture state before scheduling:

if (refreshFuture == null || refreshFuture.isCancelled() || refreshFuture.isDone()) {
    scheduleCredentialRefresh();
}

2. NPE guard on storageCredentials — Added a null/empty check at the top of scheduleCredentialRefresh().

3. Fuzzy startsWith() match — Good catch on the fuzzy match concern. I went with a slightly different approach than suggested — cred.prefix() is the full ABFS URI (e.g., abfss://container@account1.dfs.core.windows.net/dir), not the storage account name, so using it directly as the key suffix wouldn't match the config keys (which use just the account name like adls.sas-token-expires-at-ms.account1). Instead, I extract the account via new ADLSLocation(cred.prefix()).storageAccount() for an exact key lookup, which is consistent with how VendedAdlsCredentialProvider.sasTokenFromProperties() constructs the same keys.

4. ROOT_PREFIX — Thanks!

5. clientCache invalidation after refresh — Added synchronized (this) { this.clientCache = null; } after updating storageCredentials in refreshStorageCredentials().
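The exact-key lookup described in item 3 can be sketched as follows. This is an illustration only: the PR uses `new ADLSLocation(cred.prefix()).storageAccount()`, whereas this self-contained version parses the URI with `java.net.URI` and assumes the account name is the first label of the host, as in the example key above. `ExpiryKeyLookupSketch` and its methods are hypothetical, and the prefix string is copied from the example key rather than read from `AzureProperties`.

```java
import java.net.URI;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper contrasting an exact key lookup with a startsWith() scan. */
final class ExpiryKeyLookupSketch {
  // Assumption: taken from the example key in the discussion, not read from AzureProperties.
  static final String EXPIRES_AT_PREFIX = "adls.sas-token-expires-at-ms.";

  /** Extracts "account1" from abfss://container@account1.dfs.core.windows.net/dir. */
  static String accountName(String abfsUri) {
    String host = URI.create(abfsUri).getHost();
    if (host == null) {
      throw new IllegalArgumentException("No host in URI: " + abfsUri);
    }
    int firstDot = host.indexOf('.');
    return firstDot < 0 ? host : host.substring(0, firstDot);
  }

  /** Looks up the expiry for one credential by its exact config key. */
  static Optional<String> expiresAt(Map<String, String> config, String credentialPrefix) {
    return Optional.ofNullable(config.get(EXPIRES_AT_PREFIX + accountName(credentialPrefix)));
  }
}
```

With a config that holds both `adls.sas-token-expires-at-ms.account1` and a look-alike such as `adls.sas-token-expires-at-ms-buffer.account1`, the exact lookup returns only the first value, which is the ambiguity the `startsWith()` filter could not rule out.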

@sachinnn99 sachinnn99 force-pushed the adls-credential-refresh branch from c46b646 to b9f3be7 Compare May 2, 2026 22:14
@sachinnn99 sachinnn99 changed the title Azure: Add scheduled credential refresh for ADLSFileIO Azure: Add scheduled refresh for storage credentials held by ADLSFileIO May 2, 2026
@sachinnn99 sachinnn99 force-pushed the adls-credential-refresh branch from b9f3be7 to c46b646 Compare May 2, 2026 22:27
@ajayvembu

@sachinnn99
Thanks for making it more robust. All the changes are good now.

@sachinnn99 sachinnn99 force-pushed the adls-credential-refresh branch from 1fbe5b8 to 2e2dc99 Compare May 9, 2026 15:07

Development

Successfully merging this pull request may close these issues.

[feature request] Add scheduled refresh for storage credentials held by ADLSFileIO

2 participants