diff --git a/apps/sim/hooks/queries/kb/connectors.ts b/apps/sim/hooks/queries/kb/connectors.ts
index ae53f271640..c8a528cccc7 100644
--- a/apps/sim/hooks/queries/kb/connectors.ts
+++ b/apps/sim/hooks/queries/kb/connectors.ts
@@ -12,7 +12,7 @@ export interface ConnectorData {
sourceConfig: Record
syncMode: string
syncIntervalMinutes: number
- status: 'active' | 'paused' | 'syncing' | 'error'
+ status: 'active' | 'paused' | 'syncing' | 'error' | 'disabled'
lastSyncAt: string | null
lastSyncError: string | null
lastSyncDocCount: number | null
diff --git a/apps/sim/lib/knowledge/connectors/sync-engine.ts b/apps/sim/lib/knowledge/connectors/sync-engine.ts
index 0b545516a3b..24ab9210f5e 100644
--- a/apps/sim/lib/knowledge/connectors/sync-engine.ts
+++ b/apps/sim/lib/knowledge/connectors/sync-engine.ts
@@ -46,6 +46,7 @@ const MAX_PAGES = 500
const MAX_SAFE_TITLE_LENGTH = 200
const STALE_PROCESSING_MINUTES = 45
const RETRY_WINDOW_DAYS = 7
+const MAX_CONSECUTIVE_FAILURES = 10
/** Sanitizes a document title for use in S3 storage keys. */
function sanitizeStorageTitle(title: string): string {
@@ -230,7 +231,7 @@ async function resolveAccessToken(
connector: { credentialId: string | null; encryptedApiKey: string | null },
connectorConfig: { auth: ConnectorAuthConfig },
userId: string
-): Promise {
+): Promise {
if (connectorConfig.auth.mode === 'apiKey') {
if (!connector.encryptedApiKey) {
throw new Error('API key connector is missing encrypted API key')
@@ -243,11 +244,22 @@ async function resolveAccessToken(
throw new Error('OAuth connector is missing credential ID')
}
- return refreshAccessTokenIfNeeded(
- connector.credentialId,
- userId,
- `sync-${connector.credentialId}`
- )
+ const requestId = `sync-${connector.credentialId}`
+ const token = await refreshAccessTokenIfNeeded(connector.credentialId, userId, requestId)
+
+ if (!token) {
+ logger.error(`[${requestId}] refreshAccessTokenIfNeeded returned null`, {
+ credentialId: connector.credentialId,
+ userId,
+ authMode: connectorConfig.auth.mode,
+ authProvider: connectorConfig.auth.provider,
+ })
+ throw new Error(
+ `Failed to obtain access token for credential ${connector.credentialId} (provider: ${connectorConfig.auth.provider})`
+ )
+ }
+
+ return token
}
/**
@@ -305,12 +317,6 @@ export async function executeSync(
const userId = kbRows[0].userId
const sourceConfig = connector.sourceConfig as Record
- let accessToken = await resolveAccessToken(connector, connectorConfig, userId)
-
- if (!accessToken) {
- throw new Error('Failed to obtain access token')
- }
-
const lockResult = await db
.update(knowledgeConnector)
.set({ status: 'syncing', updatedAt: new Date() })
@@ -341,6 +347,8 @@ export async function executeSync(
let syncExitedCleanly = false
try {
+ let accessToken = await resolveAccessToken(connector, connectorConfig, userId)
+
const externalDocs: ExternalDocument[] = []
let cursor: string | undefined
let hasMore = true
@@ -357,8 +365,7 @@ export async function executeSync(
for (let pageNum = 0; hasMore && pageNum < MAX_PAGES; pageNum++) {
if (pageNum > 0 && connectorConfig.auth.mode === 'oauth') {
- const refreshed = await resolveAccessToken(connector, connectorConfig, userId)
- if (refreshed) accessToken = refreshed
+ accessToken = await resolveAccessToken(connector, connectorConfig, userId)
}
const page = await connectorConfig.listDocuments(
@@ -496,8 +503,7 @@ export async function executeSync(
if (deferredOps.length > 0) {
if (connectorConfig.auth.mode === 'oauth') {
- const refreshed = await resolveAccessToken(connector, connectorConfig, userId)
- if (refreshed) accessToken = refreshed
+ accessToken = await resolveAccessToken(connector, connectorConfig, userId)
}
const hydrated = await Promise.allSettled(
@@ -789,15 +795,25 @@ export async function executeSync(
const now = new Date()
const failures = (connector.consecutiveFailures ?? 0) + 1
+ const disabled = failures >= MAX_CONSECUTIVE_FAILURES
const backoffMinutes = Math.min(failures * 30, 1440)
- const nextSync = new Date(now.getTime() + backoffMinutes * 60 * 1000)
+ const nextSync = disabled ? null : new Date(now.getTime() + backoffMinutes * 60 * 1000)
+
+ if (disabled) {
+ logger.warn('Connector disabled after repeated failures', {
+ connectorId,
+ consecutiveFailures: failures,
+ })
+ }
await db
.update(knowledgeConnector)
.set({
- status: 'error',
+ status: disabled ? 'disabled' : 'error',
lastSyncAt: now,
- lastSyncError: errorMessage,
+ lastSyncError: disabled
+ ? 'Connector disabled after repeated sync failures. Please reconnect.'
+ : errorMessage,
nextSyncAt: nextSync,
consecutiveFailures: failures,
updatedAt: now,