Azure: Add scheduled refresh for storage credentials held by ADLSFileIO#15869
sachinnn99 wants to merge 1 commit into
synchronized (this) {
  if (clientCache == null) {
    clientCache = Maps.newConcurrentMap();
    scheduleCredentialRefresh();
I noticed a potential race condition here with the background timers. When the credentials refresh, the cache is set to null. The next call to client(location) rebuilds the cache and unconditionally calls scheduleCredentialRefresh() again, even if the background timer thread has already scheduled a refresh. Over the lifetime of a long-running Flink job, this could lead to an ever-growing number of duplicate timers. We probably need a check here, such as if (refreshFuture == null || refreshFuture.isDone()), before invoking scheduleCredentialRefresh();
DataLakeFileSystemClient client(ADLSLocation location) {
  if (clientCache == null) {
    synchronized (this) {
      if (clientCache == null) {
        clientCache = Maps.newConcurrentMap();
        if (refreshFuture == null || refreshFuture.isCancelled() || refreshFuture.isDone()) {
          scheduleCredentialRefresh();
        }
      }
    }
  }
  String cacheKey = location.host() + "/" + location.container().orElse("");
  return clientCache.computeIfAbsent(cacheKey, k -> buildClient(location));
}
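To make the guard concrete, here is a minimal, self-contained sketch of the "schedule only if nothing is pending" idea. GuardedRefreshScheduler, scheduleIfIdle, and scheduledCount are hypothetical names used only for illustration and are not part of ADLSFileIO. The no-op task stands in for the real refresh.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the FileIO; only the scheduling guard is modeled. */
final class GuardedRefreshScheduler implements AutoCloseable {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicInteger scheduledCount = new AtomicInteger();
  private ScheduledFuture<?> refreshFuture; // guarded by "this"

  /** Schedules a refresh only if none is pending; returns true if a new timer was created. */
  synchronized boolean scheduleIfIdle(long delayMillis) {
    // Future.isDone() is also true for cancelled futures, so one check covers both cases.
    if (refreshFuture != null && !refreshFuture.isDone()) {
      return false;
    }
    scheduledCount.incrementAndGet();
    // The no-op task is a placeholder for the real credential refresh.
    refreshFuture = executor.schedule(() -> {}, delayMillis, TimeUnit.MILLISECONDS);
    return true;
  }

  /** Number of timers actually created so far. */
  int scheduledCount() {
    return scheduledCount.get();
  }

  @Override
  public void close() {
    executor.shutdownNow();
  }
}
```

With this shape, repeated cache rebuilds can call scheduleIfIdle freely; only the first call while a timer is outstanding creates one.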
  return ImmutableList.copyOf(storageCredentials);
}

private void scheduleCredentialRefresh() {
What if Spark or Flink invokes client() before the credentials are set? In that case storageCredentials is null and storageCredentials.stream() will throw an NPE.
We need to add a null check here:
if (storageCredentials == null || storageCredentials.isEmpty()) {
  return;
}
private void scheduleCredentialRefresh() {
  storageCredentials.stream()
      .flatMap(cred -> cred.config().entrySet().stream())
Using flatMap and .startsWith() works, but it relies on a fuzzy match. If a future update introduces another configuration property that shares the same prefix (for example, something like adls.sas-token-expires-at-ms-buffer.accountname), this filter can match more than one entry. We could instead look up the exact key built from the credential's prefix.
private void scheduleCredentialRefresh() {
  if (storageCredentials == null || storageCredentials.isEmpty()) {
    return;
  }
  storageCredentials.stream()
      .map(
          cred ->
              cred.config().get(AzureProperties.ADLS_SAS_TOKEN_EXPIRES_AT_MS_PREFIX + cred.prefix()))
      .filter(Objects::nonNull)
      .map(expiresAtString -> java.time.Instant.ofEpochMilli(Long.parseLong(expiresAtString)))
      .min(Comparator.naturalOrder())
      .ifPresent(
          expiresAt -> {
            Instant prefetchAt = expiresAt.minus(5, java.time.temporal.ChronoUnit.MINUTES);
            long delay = Duration.between(Instant.now(), prefetchAt).toMillis();
            this.refreshFuture =
                executorService().schedule(this::refreshStorageCredentials, delay, TimeUnit.MILLISECONDS);
          });
}
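The lookup and delay arithmetic in the suggestion above can be exercised in isolation. The sketch below is only an illustration: RefreshDelay, Cred, and delayUntilRefresh are hypothetical names, the key prefix value is an assumption standing in for AzureProperties.ADLS_SAS_TOKEN_EXPIRES_AT_MS_PREFIX, and passing "now" as a parameter is a choice made here for testability. It also clamps a negative delay to zero, which the suggestion does not do explicitly.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

final class RefreshDelay {
  // Assumed value for illustration; the real constant lives in AzureProperties.
  static final String EXPIRES_AT_PREFIX = "adls.sas-token-expires-at-ms.";
  static final Duration PREFETCH_WINDOW = Duration.ofMinutes(5);

  /** Hypothetical minimal stand-in for a storage credential: a prefix plus its config. */
  static final class Cred {
    final String prefix;
    final Map<String, String> config;

    Cred(String prefix, Map<String, String> config) {
      this.prefix = prefix;
      this.config = config;
    }
  }

  /** Delay until the earliest expiry minus the prefetch window, or empty if no expiry is known. */
  static Optional<Duration> delayUntilRefresh(List<Cred> creds, Instant now) {
    if (creds == null || creds.isEmpty()) {
      return Optional.empty();
    }
    return creds.stream()
        // Exact key lookup: at most one value per credential, no prefix matching.
        .map(c -> c.config.get(EXPIRES_AT_PREFIX + c.prefix))
        .filter(Objects::nonNull)
        .map(s -> Instant.ofEpochMilli(Long.parseLong(s)))
        .min(Comparator.naturalOrder())
        .map(
            expiresAt -> {
              Duration delay = Duration.between(now, expiresAt.minus(PREFETCH_WINDOW));
              // Already inside the prefetch window: refresh right away.
              return delay.isNegative() ? Duration.ZERO : delay;
            });
  }
}
```

For a credential that expires 10 minutes from "now", this yields a 5-minute delay; a similarly named key such as the -buffer example is never consulted because only the exact key is read.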
try {
  List<StorageCredential> refreshed =
      vendedAdlsCredentialProvider.fetchCredentials().credentials().stream()
          .filter(c -> c.prefix().startsWith(ROOT_PREFIX))
Great catch here to add ROOT_PREFIX!
if (!refreshed.isEmpty() && !isResourceClosed.get()) {
  this.storageCredentials = Lists.newArrayList(refreshed);
  scheduleCredentialRefresh();
I noticed that clientCache isn't cleared after we update storageCredentials. If it's not cleared, the next call to client() will return a cached client built with the old, obsolete credentials. We need to invalidate clientCache here.
synchronized (this) {
  this.clientCache = null;
}
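As a rough illustration of the invalidation, the sketch below models the cache with plain strings in place of real clients. InvalidatingClientCache and onCredentialsRefreshed are hypothetical names, not part of ADLSFileIO. It reads the cache field into a local variable before use, an assumption made here so that a concurrent invalidation between the null check and computeIfAbsent cannot cause an NPE.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical cache holder; a String stands in for a client built from a credential. */
final class InvalidatingClientCache {
  private volatile Map<String, String> clientCache; // null means "rebuild on next use"
  private volatile String credential;

  InvalidatingClientCache(String initialCredential) {
    this.credential = initialCredential;
  }

  String client(String cacheKey) {
    // Read the field once so later steps operate on a stable, non-null reference.
    Map<String, String> cache = clientCache;
    if (cache == null) {
      synchronized (this) {
        cache = clientCache;
        if (cache == null) {
          cache = new ConcurrentHashMap<>();
          clientCache = cache;
        }
      }
    }
    String current = credential;
    return cache.computeIfAbsent(cacheKey, k -> "client(" + k + ", " + current + ")");
  }

  /** Swaps in the refreshed credential and drops every client built from the old one. */
  synchronized void onCredentialsRefreshed(String refreshedCredential) {
    this.credential = refreshedCredential;
    this.clientCache = null;
  }
}
```

After onCredentialsRefreshed runs, the next client() call rebuilds the entry from the new credential instead of returning the stale one.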
ajayvembu left a comment
Thanks for putting this together! I’ve been looking deeply into the ADLS credential refresh logic recently as well. I left a few comments on some edge cases regarding the Azure client cache invalidation and multithreaded timer scheduling. Hope this helps!
ad3a5a7 to c46b646
Thanks for the thorough review @ajayvembu! I've addressed all your comments in the latest push:
1. Race condition on timer scheduling: Added a guard to check
if (refreshFuture == null || refreshFuture.isCancelled() || refreshFuture.isDone()) {
  scheduleCredentialRefresh();
}
2. NPE guard on
3. Fuzzy
4. ROOT_PREFIX: Thanks!
5.
c46b646 to b9f3be7
b9f3be7 to c46b646
@sachinnn99
1fbe5b8 to 2e2dc99
Closes #15852.
The ADLSFileIO implementation never refreshes the credentials that it holds directly from the table load. The ADLS clients use a VendedAdlsCredentialProvider that internally refreshes via SimpleTokenCache, but those updates are not reflected back to the FileIO.
If an ADLSFileIO instance is serialized to remote workers, the credentials may have expired, triggering a thundering herd of immediate refresh requests.
This change addresses the problem by proactively updating the credentials in the FileIO so that only valid credentials are propagated to remote clients.
This applies the same change that was already made for S3FileIO in #15678 and GCSFileIO in #15696.