diff --git a/src/@types/AccessList.ts b/src/@types/AccessList.ts index 242b991d1..88b521710 100644 --- a/src/@types/AccessList.ts +++ b/src/@types/AccessList.ts @@ -4,3 +4,10 @@ export interface AccessList { [chainId: string]: string[] } + +export interface AccessListUser { + wallet: string + tokenId: number + block: number + txId: string +} diff --git a/src/@types/commands.ts b/src/@types/commands.ts index 6ed0f76f4..cbeb6977f 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -29,6 +29,16 @@ export interface FindPeerCommand extends Command { export interface GetP2PPeersCommand extends Command {} export interface GetP2PNetworkStatsCommand extends Command {} +export interface GetAccessListCommand extends Command { + chainId: number + contractAddress: string +} + +export interface SearchAccessListCommand extends Command { + wallet: string + chainId?: number +} + export interface SignedCommand extends Command { nonce: string signature: string diff --git a/src/components/Indexer/processor.ts b/src/components/Indexer/processor.ts index 8dbbe1d10..98f9d2723 100644 --- a/src/components/Indexer/processor.ts +++ b/src/components/Indexer/processor.ts @@ -17,6 +17,9 @@ import { ExchangeActivatedEventProcessor, ExchangeDeactivatedEventProcessor, ExchangeRateChangedEventProcessor, + NewAccessListEventProcessor, + AddressAddedEventProcessor, + AddressRemovedEventProcessor, ProcessorConstructor } from './processors/index.js' import { findEventByKey } from './utils.js' @@ -36,7 +39,10 @@ const EVENT_PROCESSOR_MAP: Record = { [EVENTS.EXCHANGE_CREATED]: ExchangeCreatedEventProcessor, [EVENTS.EXCHANGE_ACTIVATED]: ExchangeActivatedEventProcessor, [EVENTS.EXCHANGE_DEACTIVATED]: ExchangeDeactivatedEventProcessor, - [EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor + [EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor, + [EVENTS.NEW_ACCESS_LIST]: NewAccessListEventProcessor, + [EVENTS.ADDRESS_ADDED]: AddressAddedEventProcessor, + [EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor } const processorInstances = new Map() diff --git a/src/components/Indexer/processors/AddressAddedEventProcessor.ts b/src/components/Indexer/processors/AddressAddedEventProcessor.ts new file mode 100644 index 000000000..a14dd511e --- /dev/null +++ b/src/components/Indexer/processors/AddressAddedEventProcessor.ts @@ -0,0 +1,48 @@ +import { ethers, Signer, FallbackProvider, Interface } from 'ethers' +import { INDEXER_LOGGER } from '../../../utils/logging/common.js' +import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js' +import { BaseEventProcessor } from './BaseProcessor.js' +import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' } + +const accessListInterface = new Interface(AccessList.abi) + +export class AddressAddedEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: FallbackProvider + ): Promise { + try { + const decoded = accessListInterface.parseLog({ + topics: Array.from(event.topics), + data: event.data + }) + if (!decoded) return null + + const wallet = decoded.args[0].toString().toLowerCase() + const tokenId = Number(decoded.args[1]) + const contractAddress = event.address.toLowerCase() + + const { accessList } = await this.getDatabase() + const result = await accessList.addUser(chainId, contractAddress, { + wallet, + tokenId, + block: event.blockNumber, + txId: event.transactionHash + }) + + INDEXER_LOGGER.logMessage( + `[AddressAdded] ${wallet} (tokenId=${tokenId}) added to ${contractAddress} on chain ${chainId}` + ) + return result + } catch (err) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Error processing AddressAdded event: ${err.message}`, + true + ) + return null + } + } +} diff --git a/src/components/Indexer/processors/AddressRemovedEventProcessor.ts b/src/components/Indexer/processors/AddressRemovedEventProcessor.ts new file mode 100644 index 000000000..923c1dad1 --- /dev/null +++ b/src/components/Indexer/processors/AddressRemovedEventProcessor.ts @@ -0,0 +1,46 @@ +import { ethers, Signer, FallbackProvider, Interface } from 'ethers' +import { INDEXER_LOGGER } from '../../../utils/logging/common.js' +import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js' +import { BaseEventProcessor } from './BaseProcessor.js' +import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' } + +const accessListInterface = new Interface(AccessList.abi) + +export class AddressRemovedEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: FallbackProvider + ): Promise { + try { + const decoded = accessListInterface.parseLog({ + topics: Array.from(event.topics), + data: event.data + }) + if (!decoded) return null + + const tokenId = Number(decoded.args[0]) + const contractAddress = event.address.toLowerCase() + + const { accessList } = await this.getDatabase() + const result = await accessList.removeUserByTokenId( + chainId, + contractAddress, + tokenId + ) + + INDEXER_LOGGER.logMessage( + `[AddressRemoved] tokenId=${tokenId} removed from ${contractAddress} on chain ${chainId}` + ) + return result + } catch (err) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Error processing AddressRemoved event: ${err.message}`, + true + ) + return null + } + } +} diff --git a/src/components/Indexer/processors/NewAccessListEventProcessor.ts b/src/components/Indexer/processors/NewAccessListEventProcessor.ts new file mode 100644 index 000000000..a28ee8de5 --- /dev/null +++ b/src/components/Indexer/processors/NewAccessListEventProcessor.ts @@ -0,0 +1,74 @@ +import { ethers, Signer, FallbackProvider, Interface } from 'ethers' +import { INDEXER_LOGGER } from '../../../utils/logging/common.js' +import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js' +import { BaseEventProcessor } from './BaseProcessor.js' +import AccessListFactory from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessListFactory.sol/AccessListFactory.json' with { type: 'json' } +import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' } + +const factoryInterface = new Interface(AccessListFactory.abi) + +export class NewAccessListEventProcessor extends BaseEventProcessor { + async processEvent( + event: ethers.Log, + chainId: number, + signer: Signer, + provider: FallbackProvider + ): Promise { + try { + const decoded = factoryInterface.parseLog({ + topics: Array.from(event.topics), + data: event.data + }) + if (!decoded) return null + + const contractAddress = decoded.args[0].toString().toLowerCase() + + let transferable = false + let name: string | undefined + let symbol: string | undefined + try { + const accessListContract = new ethers.Contract( + contractAddress, + AccessList.abi, + provider + ) + const [transferableRaw, nameRaw, symbolRaw] = await Promise.all([ + accessListContract.transferable(), + accessListContract.name(), + accessListContract.symbol() + ]) + transferable = Boolean(transferableRaw) + name = nameRaw + symbol = symbolRaw + } catch (err) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_WARN, + `Failed to read on-chain metadata for ${contractAddress}: ${err.message}` + ) + } + + const { accessList } = await this.getDatabase() + const result = await accessList.create( + chainId, + contractAddress, + transferable, + event.blockNumber, + event.transactionHash, + name, + symbol + ) + + INDEXER_LOGGER.logMessage( + `[NewAccessList] Indexed access list ${contractAddress} on chain ${chainId} (name=${name}, symbol=${symbol}, transferable=${transferable})` + ) + return result + } catch (err) { + INDEXER_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Error processing NewAccessList event: ${err.message}`, + true + ) + return null + } + } +} diff --git a/src/components/Indexer/processors/index.ts b/src/components/Indexer/processors/index.ts index 912ab6aa2..418adeedb 100644 --- a/src/components/Indexer/processors/index.ts +++ b/src/components/Indexer/processors/index.ts @@ -12,6 +12,9 @@ export * from './MetadataEventProcessor.js' export * from './MetadataStateEventProcessor.js' export * from './OrderReusedEventProcessor.js' export * from './OrderStartedEventProcessor.js' +export * from './NewAccessListEventProcessor.js' +export * from './AddressAddedEventProcessor.js' +export * from './AddressRemovedEventProcessor.js' export * from './BaseProcessor.js' export type ProcessorConstructor = new ( diff --git a/src/components/core/handler/accessListHandler.ts b/src/components/core/handler/accessListHandler.ts new file mode 100644 index 000000000..e99a4baa3 --- /dev/null +++ b/src/components/core/handler/accessListHandler.ts @@ -0,0 +1,84 @@ +import { CommandHandler } from './handler.js' +import { P2PCommandResponse } from '../../../@types/OceanNode.js' +import { + GetAccessListCommand, + SearchAccessListCommand +} from '../../../@types/commands.js' +import { Readable } from 'stream' +import { isAddress } from 'ethers' +import { + ValidateParams, + validateCommandParameters +} from '../../httpRoutes/validateCommands.js' +import { CORE_LOGGER } from '../../../utils/logging/common.js' + +export class GetAccessListHandler extends CommandHandler { + validate(command: GetAccessListCommand): ValidateParams { + return validateCommandParameters(command, ['chainId', 'contractAddress']) + } + + async handle(task: GetAccessListCommand): Promise { + const checks = await this.verifyParamsAndRateLimits(task) + if (checks.status.httpStatus !== 200 || checks.status.error !== null) { + return checks + } + try { + const db = await this.getOceanNode().getDatabase() + const doc = await db.accessList.retrieve( + Number(task.chainId), + String(task.contractAddress) + ) + if (!doc) { + return { + stream: null, + status: { httpStatus: 404, error: 'AccessList not found' } + } + } + return { + stream: Readable.from(JSON.stringify(doc)), + status: { httpStatus: 200 } + } + } catch (error) { + CORE_LOGGER.error(`GetAccessListHandler error: ${error.message}`) + return { + stream: null, + status: { httpStatus: 500, error: 'Unknown error: ' + error.message } + } + } + } +} + +export class SearchAccessListHandler extends CommandHandler { + validate(command: SearchAccessListCommand): ValidateParams { + return validateCommandParameters(command, ['wallet']) + } + + async handle(task: SearchAccessListCommand): Promise { + const checks = await this.verifyParamsAndRateLimits(task) + if (checks.status.httpStatus !== 200 || checks.status.error !== null) { + return checks + } + try { + const walletString = String(task.wallet) + if (!isAddress(walletString)) { + return { + stream: null, + status: { httpStatus: 400, error: 'Invalid wallet address' } + } + } + const db = await this.getOceanNode().getDatabase() + const chainId = task.chainId !== undefined ? Number(task.chainId) : undefined + const docs = await db.accessList.searchByWallet(walletString, chainId) + return { + stream: Readable.from(JSON.stringify(docs ?? [])), + status: { httpStatus: 200 } + } + } catch (error) { + CORE_LOGGER.error(`SearchAccessListHandler error: ${error.message}`) + return { + stream: null, + status: { httpStatus: 500, error: 'Unknown error: ' + error.message } + } + } + } +} diff --git a/src/components/core/handler/coreHandlersRegistry.ts b/src/components/core/handler/coreHandlersRegistry.ts index 531f7f1a9..e492a01a6 100644 --- a/src/components/core/handler/coreHandlersRegistry.ts +++ b/src/components/core/handler/coreHandlersRegistry.ts @@ -55,6 +55,7 @@ import { PersistentStorageListFilesHandler, PersistentStorageUploadFileHandler } from './persistentStorage.js' +import { GetAccessListHandler, SearchAccessListHandler } from './accessListHandler.js' export type HandlerRegistry = { handlerName: string // name of the handler @@ -199,6 +200,14 @@ export class CoreHandlersRegistry { PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, new PersistentStorageDeleteFileHandler(node) ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.GET_ACCESS_LIST, + new GetAccessListHandler(node) + ) + this.registerCoreHandler( + PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST, + new SearchAccessListHandler(node) + ) } public static getInstance( diff --git a/src/components/database/BaseDatabase.ts b/src/components/database/BaseDatabase.ts index 3fddb074f..1214e9488 100644 --- a/src/components/database/BaseDatabase.ts +++ b/src/components/database/BaseDatabase.ts @@ -1,5 +1,6 @@ import { Schema } from '.' import { OceanNodeDBConfig } from '../../@types' +import { AccessListUser } from '../../@types/AccessList.js' import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js' import { DATABASE_LOGGER } from '../../utils/logging/common.js' import { ElasticsearchSchema } from './ElasticSchemas.js' @@ -38,6 +39,34 @@ export abstract class AbstractIndexerDatabase extends AbstractDatabase { abstract delete(network: number): Promise } +export abstract class AbstractAccessListDatabase extends AbstractDatabase { + abstract create( + chainId: number, + contractAddress: string, + transferable: boolean, + block: number, + txId: string, + name?: string, + symbol?: string + ): Promise + + abstract retrieve(chainId: number, contractAddress: string): Promise + abstract addUser( + chainId: number, + contractAddress: string, + user: AccessListUser + ): Promise + + abstract removeUserByTokenId( + chainId: number, + contractAddress: string, + tokenId: number + ): Promise + + abstract searchByWallet(wallet: string, chainId?: number): Promise + abstract delete(chainId: number, contractAddress: string): Promise +} + export abstract class AbstractLogDatabase extends AbstractDatabase { abstract insertLog(logEntry: Record): Promise abstract retrieveLog(id: string): Promise | null> diff --git a/src/components/database/DatabaseFactory.ts b/src/components/database/DatabaseFactory.ts index 4d5d35603..cb6dbcb2c 100644 --- a/src/components/database/DatabaseFactory.ts +++ b/src/components/database/DatabaseFactory.ts @@ -1,5 +1,6 @@ import { OceanNodeDBConfig } from '../../@types' import { + AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, AbstractIndexerDatabase, @@ -7,6 +8,7 @@ import { AbstractOrderDatabase } from './BaseDatabase.js' import { + ElasticsearchAccessListDatabase, ElasticsearchDdoDatabase, ElasticsearchDdoStateDatabase, ElasticsearchIndexerDatabase, @@ -15,6 +17,7 @@ import { } from './ElasticSearchDatabase.js' import { typesenseSchemas } from './TypesenseSchemas.js' import { + TypesenseAccessListDatabase, TypesenseDdoDatabase, TypesenseDdoStateDatabase, TypesenseIndexerDatabase, @@ -45,7 +48,9 @@ export class DatabaseFactory { new ElasticsearchOrderDatabase(config, elasticSchemas.orderSchema), ddoState: (config: OceanNodeDBConfig) => new ElasticsearchDdoStateDatabase(config), ddoStateQuery: () => new ElasticSearchDdoStateQuery(), - metadataQuery: () => new ElasticSearchMetadataQuery() + metadataQuery: () => new ElasticSearchMetadataQuery(), + accessList: (config: OceanNodeDBConfig) => + new ElasticsearchAccessListDatabase(config) }, typesense: { ddo: (config: OceanNodeDBConfig) => @@ -59,7 +64,9 @@ export class DatabaseFactory { ddoState: (config: OceanNodeDBConfig) => new TypesenseDdoStateDatabase(config, typesenseSchemas.ddoStateSchema), ddoStateQuery: () => new TypesenseDdoStateQuery(), - metadataQuery: () => new TypesenseMetadataQuery() + metadataQuery: () => new TypesenseMetadataQuery(), + accessList: (config: OceanNodeDBConfig) => + new TypesenseAccessListDatabase(config, typesenseSchemas.accessListSchema) } } @@ -129,4 +136,10 @@ export class DatabaseFactory { static async createConfigDatabase(): Promise { return await new SQLLiteConfigDatabase() } + + static createAccessListDatabase( + config: OceanNodeDBConfig + ): Promise { + return this.createDatabase('accessList', config) + } } diff --git a/src/components/database/ElasticSchemas.ts b/src/components/database/ElasticSchemas.ts index 6f3b5b6f9..09462ca31 100644 --- a/src/components/database/ElasticSchemas.ts +++ b/src/components/database/ElasticSchemas.ts @@ -94,6 +94,7 @@ export type ElasticsearchSchemas = { logSchemas: ElasticsearchSchema orderSchema: ElasticsearchSchema ddoStateSchema: ElasticsearchSchema + accessListSchema: ElasticsearchSchema } const ddoSchemas = readElasticsearchJsonSchemas() @@ -164,5 +165,30 @@ export const elasticSchemas: ElasticsearchSchemas = { } } } + }, + accessListSchema: { + index: 'access_list', + body: { + mappings: { + properties: { + chainId: { type: 'integer' }, + contractAddress: { type: 'keyword' }, + name: { type: 'keyword' }, + symbol: { type: 'keyword' }, + transferable: { type: 'boolean' }, + users: { + type: 'nested', + properties: { + wallet: { type: 'keyword' }, + tokenId: { type: 'long' }, + block: { type: 'long' }, + txId: { type: 'keyword' } + } + }, + deploymentBlock: { type: 'long' }, + deploymentTxId: { type: 'keyword' } + } + } + } } } diff --git a/src/components/database/ElasticSearchDatabase.ts b/src/components/database/ElasticSearchDatabase.ts index 104b0fa1d..a5f3d88df 100644 --- a/src/components/database/ElasticSearchDatabase.ts +++ b/src/components/database/ElasticSearchDatabase.ts @@ -1,11 +1,13 @@ import { Client } from '@elastic/elasticsearch' import { + AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, AbstractIndexerDatabase, AbstractLogDatabase, AbstractOrderDatabase } from './BaseDatabase.js' +import { AccessListUser } from '../../@types/AccessList.js' import { createElasticsearchClientWithRetry } from './ElasticsearchConfigHelper.js' import { OceanNodeDBConfig } from '../../@types' import { ElasticsearchSchema } from './ElasticSchemas.js' @@ -1021,6 +1023,277 @@ export class ElasticsearchLogDatabase extends AbstractLogDatabase { } } +export class ElasticsearchAccessListDatabase extends AbstractAccessListDatabase { + private client: Client + private index: string + + constructor(config: OceanNodeDBConfig) { + super(config) + this.index = 'access_list' + + return (async (): Promise => { + this.client = await createElasticsearchClientWithRetry(config) + await this.initializeIndex() + return this + })() as unknown as ElasticsearchAccessListDatabase + } + + private docId(chainId: number, contractAddress: string): string { + return `${chainId}-${contractAddress.toLowerCase()}` + } + + private async initializeIndex() { + try { + const indexExists = await this.client.indices.exists({ index: this.index }) + if (!indexExists) { + await this.client.indices.create({ + index: this.index, + body: { + mappings: { + properties: { + chainId: { type: 'integer' }, + contractAddress: { type: 'keyword' }, + name: { type: 'keyword' }, + symbol: { type: 'keyword' }, + transferable: { type: 'boolean' }, + users: { + type: 'nested', + properties: { + wallet: { type: 'keyword' }, + tokenId: { type: 'long' }, + block: { type: 'long' }, + txId: { type: 'keyword' } + } + }, + deploymentBlock: { type: 'long' }, + deploymentTxId: { type: 'keyword' } + } + } + } + }) + } + } catch (e) { + DATABASE_LOGGER.error(e.message) + } + } + + async create( + chainId: number, + contractAddress: string, + transferable: boolean, + block: number, + txId: string, + name?: string, + symbol?: string + ) { + const id = this.docId(chainId, contractAddress) + const lowerContract = contractAddress.toLowerCase() + try { + await this.client.update({ + index: this.index, + id, + body: { + script: { + source: ` + ctx._source.transferable = params.transferable; + ctx._source.deploymentBlock = params.block; + ctx._source.deploymentTxId = params.txId; + if (params.name != null) { ctx._source.name = params.name; } + if (params.symbol != null) { ctx._source.symbol = params.symbol; } + `, + lang: 'painless', + params: { transferable, block, txId, name, symbol } + }, + upsert: { + chainId, + contractAddress: lowerContract, + name, + symbol, + transferable, + users: [], + deploymentBlock: block, + deploymentTxId: txId + } + }, + refresh: 'wait_for' + }) + return { id } + } catch (error) { + const errorMsg = `Error when upserting access list ${id}: ${error.message}` + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } + + async retrieve(chainId: number, contractAddress: string) { + const id = this.docId(chainId, contractAddress) + try { + const result = await this.client.get({ + index: this.index, + id, + refresh: true + }) + return result._source + } catch (error) { + if (error?.meta?.statusCode === 404) { + return null + } + const errorMsg = `Error when retrieving access list ${id}: ${error.message}` + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } + + async addUser(chainId: number, contractAddress: string, user: AccessListUser) { + const id = this.docId(chainId, contractAddress) + const lowerContract = contractAddress.toLowerCase() + const normalized: AccessListUser = { ...user, wallet: user.wallet.toLowerCase() } + try { + await this.client.update({ + index: this.index, + id, + body: { + script: { + source: ` + if (ctx._source.users == null) { ctx._source.users = []; } + boolean exists = false; + for (int i = 0; i < ctx._source.users.length; i++) { + if (ctx._source.users[i].tokenId == params.user.tokenId) { exists = true; break; } + } + if (!exists) { ctx._source.users.add(params.user); } + `, + lang: 'painless', + params: { user: normalized } + }, + upsert: { + chainId, + contractAddress: lowerContract, + transferable: false, + users: [normalized] + } + }, + refresh: 'wait_for' + }) + return { id } + } catch (error) { + const errorMsg = `Error when adding user ${normalized.wallet} to access list ${id}: ${error.message}` + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } + + async removeUserByTokenId(chainId: number, contractAddress: string, tokenId: number) { + const id = this.docId(chainId, contractAddress) + try { + await this.client.update({ + index: this.index, + id, + body: { + script: { + source: ` + if (ctx._source.users != null) { + ctx._source.users.removeIf(u -> u.tokenId == params.tokenId); + } + `, + lang: 'painless', + params: { tokenId } + } + }, + refresh: 'wait_for' + }) + return { id } + } catch (error) { + if (error?.meta?.statusCode === 404) { + DATABASE_LOGGER.logMessageWithEmoji( + `AddressRemoved on missing access list ${id} (tokenId=${tokenId}); ignoring.`, + true, + GENERIC_EMOJIS.EMOJI_CHECK_MARK, + LOG_LEVELS_STR.LEVEL_WARN + ) + return null + } + const errorMsg = `Error when removing tokenId ${tokenId} from access list ${id}: ${error.message}` + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } + + async searchByWallet(wallet: string, chainId?: number): Promise { + const lowerWallet = wallet.toLowerCase() + const filters: any[] = [ + { + nested: { + path: 'users', + query: { term: { 'users.wallet': lowerWallet } } + } + } + ] + if (chainId !== undefined) { + filters.push({ term: { chainId } }) + } + try { + const result = await this.client.search({ + index: this.index, + size: 250, + body: { + query: { bool: { must: filters } } + } + } as any) + return result.hits.hits.map((h: any) => h._source) + } catch (error) { + const errorMsg = `Error when searching access lists by wallet ${lowerWallet}: ${error.message}` + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return [] + } + } + + async delete(chainId: number, contractAddress: string) { + const id = this.docId(chainId, contractAddress) + try { + await this.client.delete({ + index: this.index, + id, + refresh: 'wait_for' + }) + return { id } + } catch (error) { + const errorMsg = `Error when deleting access list ${id}: ${error.message}` + DATABASE_LOGGER.logMessageWithEmoji( + errorMsg, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + return null + } + } +} + /** * Make DB agnostic APIs. The response should be similar, no matter what DB engine is used * Normalizes the document responses to match same kind of typesense ones diff --git a/src/components/database/TypesenseDatabase.ts b/src/components/database/TypesenseDatabase.ts index 4129fdf02..b11054252 100644 --- a/src/components/database/TypesenseDatabase.ts +++ b/src/components/database/TypesenseDatabase.ts @@ -7,12 +7,14 @@ import { DATABASE_LOGGER } from '../../utils/logging/common.js' import { ENVIRONMENT_VARIABLES, TYPESENSE_HITS_CAP } from '../../utils/constants.js' import { + AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, AbstractIndexerDatabase, AbstractLogDatabase, AbstractOrderDatabase } from './BaseDatabase.js' +import { AccessListUser } from '../../@types/AccessList.js' import { validateDDO } from '../../utils/asset.js' import { DDOManager } from '@oceanprotocol/ddo-js' @@ -934,3 +936,175 @@ export class TypesenseLogDatabase extends AbstractLogDatabase { } } } + +export class TypesenseAccessListDatabase extends AbstractAccessListDatabase { + private provider: Typesense + + constructor(config: OceanNodeDBConfig, schema: TypesenseSchema) { + super(config, schema) + return (async (): Promise => { + this.provider = new Typesense({ + ...convertTypesenseConfig(this.config.url), + logger: DATABASE_LOGGER + }) + try { + await this.provider.collections(this.schema.name).retrieve() + } catch (error) { + if (error instanceof TypesenseError && error.httpStatus === 404) { + await this.provider.collections().create(this.schema) + } + } + return this + })() as unknown as TypesenseAccessListDatabase + } + + private docId(chainId: number, contractAddress: string): string { + return `${chainId}-${contractAddress.toLowerCase()}` + } + + async create( + chainId: number, + contractAddress: string, + transferable: boolean, + block: number, + txId: string, + name?: string, + symbol?: string + ) { + const id = this.docId(chainId, contractAddress) + const lowerContract = contractAddress.toLowerCase() + try { + const existing: any = await this.retrieve(chainId, contractAddress) + const doc = { + id, + chainId, + contractAddress: lowerContract, + name, + symbol, + transferable, + users: existing?.users ?? [], + deploymentBlock: block, + deploymentTxId: txId + } + if (existing) { + return await this.provider + .collections(this.schema.name) + .documents() + .update(id, doc) + } + return await this.provider.collections(this.schema.name).documents().create(doc) + } catch (error) { + this.logError(`upserting access list ${id}`, error) + return null + } + } + + async retrieve(chainId: number, contractAddress: string) { + const id = this.docId(chainId, contractAddress) + try { + const doc: any = await this.provider + .collections(this.schema.name) + .documents() + .retrieve(id) + return stripId(doc) + } catch (error) { + if (error instanceof TypesenseError && error.httpStatus === 404) { + return null + } + this.logError(`retrieving access list ${id}`, error) + return null + } + } + + async addUser(chainId: number, contractAddress: string, user: AccessListUser) { + const id = this.docId(chainId, contractAddress) + const lowerContract = contractAddress.toLowerCase() + const normalized: AccessListUser = { ...user, wallet: user.wallet.toLowerCase() } + try { + const existing: any = await this.retrieve(chainId, contractAddress) + const users: AccessListUser[] = existing?.users ?? [] + const exists = users.some((u) => u.tokenId === normalized.tokenId) + const nextUsers = exists ? users : [...users, normalized] + if (existing) { + return await this.provider + .collections(this.schema.name) + .documents() + .update(id, { users: nextUsers }) + } + return await this.provider.collections(this.schema.name).documents().create({ + id, + chainId, + contractAddress: lowerContract, + transferable: false, + users: nextUsers + }) + } catch (error) { + this.logError(`adding user ${normalized.wallet} to access list ${id}`, error) + return null + } + } + + async removeUserByTokenId(chainId: number, contractAddress: string, tokenId: number) { + const id = this.docId(chainId, contractAddress) + try { + const existing: any = await this.retrieve(chainId, contractAddress) + if (!existing) return null + const nextUsers = (existing.users ?? []).filter( + (u: AccessListUser) => u.tokenId !== tokenId + ) + return await this.provider + .collections(this.schema.name) + .documents() + .update(id, { users: nextUsers }) + } catch (error) { + this.logError(`removing tokenId ${tokenId} from access list ${id}`, error) + return null + } + } + + async searchByWallet(wallet: string, chainId?: number): Promise { + const lowerWallet = wallet.toLowerCase() + try { + const filterParts = [`users.wallet:=${lowerWallet}`] + if (chainId !== undefined) filterParts.push(`chainId:=${chainId}`) + const result = await this.provider + .collections(this.schema.name) + .documents() + .search({ + q: '*', + query_by: 'contractAddress', + filter_by: filterParts.join(' && '), + per_page: 250 + }) + return (result.hits ?? []).map((h: any) => stripId(h.document)) + } catch (error) { + this.logError(`searching access lists by wallet ${lowerWallet}`, error) + return [] + } + } + + async delete(chainId: number, contractAddress: string) { + const id = this.docId(chainId, contractAddress) + try { + return await this.provider.collections(this.schema.name).documents().delete(id) + } catch (error) { + this.logError(`deleting access list ${id}`, error) + return null + } + } + + private logError(action: string, error: any) { + DATABASE_LOGGER.logMessageWithEmoji( + `Error when ${action}: ${error?.message ?? error}`, + true, + GENERIC_EMOJIS.EMOJI_CROSS_MARK, + LOG_LEVELS_STR.LEVEL_ERROR + ) + } +} + +function stripId(doc: any): any { + if (!doc) return doc + const { id: _id, ...rest } = doc + return rest +} diff --git a/src/components/database/TypesenseSchemas.ts b/src/components/database/TypesenseSchemas.ts index 0cdf7ea5e..f928922c1 100644 --- a/src/components/database/TypesenseSchemas.ts +++ b/src/components/database/TypesenseSchemas.ts @@ -53,6 +53,7 @@ export type TypesenseSchemas = { logSchemas: TypesenseSchema orderSchema: TypesenseSchema ddoStateSchema: TypesenseSchema + accessListSchema: TypesenseSchema } const ddoSchemas = readJsonSchemas() export const typesenseSchemas: TypesenseSchemas = { @@ -126,5 +127,21 @@ export const typesenseSchemas: TypesenseSchemas = { { name: 'valid', type: 'bool' }, { name: 'error', type: 'string' } ] + }, + accessListSchema: { + name: 'access_list', + enable_nested_fields: true, + fields: [ + { name: 'chainId', type: 'int64' }, + { name: 'contractAddress', type: 'string' }, + { name: 'name', type: 'string', optional: true }, + { name: 'symbol', type: 'string', optional: true }, + { name: 'transferable', type: 'bool' }, + { name: 'users', type: 'object[]', optional: true }, + { name: 'users.wallet', type: 'string[]', optional: true, facet: true }, + { name: 'users.tokenId', type: 'int64[]', optional: true }, + { name: 'deploymentBlock', type: 'int64', optional: true }, + { name: 'deploymentTxId', type: 'string', optional: true } + ] } } diff --git a/src/components/database/index.ts b/src/components/database/index.ts index 6b0da5250..3c42f468d 100644 --- a/src/components/database/index.ts +++ b/src/components/database/index.ts @@ -6,6 +6,7 @@ import { } from '../../utils/logging/Logger.js' import { DATABASE_LOGGER } from '../../utils/logging/common.js' import { + AbstractAccessListDatabase, AbstractDdoDatabase, AbstractDdoStateDatabase, AbstractIndexerDatabase, @@ -29,6 +30,7 @@ export class Database { logs: AbstractLogDatabase order: AbstractOrderDatabase ddoState: AbstractDdoStateDatabase + accessList: AbstractAccessListDatabase sqliteConfig: SQLLiteConfigDatabase c2d: C2DDatabase authToken: AuthTokenDatabase @@ -101,6 +103,13 @@ export class Database { DATABASE_LOGGER.error(`DDO State database initialization failed: ${error}`) return null } + + try { + db.accessList = await DatabaseFactory.createAccessListDatabase(config) + } catch (error) { + DATABASE_LOGGER.error(`AccessList database initialization failed: ${error}`) + return null + } } else { DATABASE_LOGGER.info( 'Invalid DB URL. Only Nonce, C2D, Auth Token and Config Databases are initialized.' diff --git a/src/components/httpRoutes/accessList.ts b/src/components/httpRoutes/accessList.ts new file mode 100644 index 000000000..8a3fc9440 --- /dev/null +++ b/src/components/httpRoutes/accessList.ts @@ -0,0 +1,65 @@ +import express, { Request, Response } from 'express' +import { Readable } from 'stream' +import { isAddress } from 'ethers' +import { + GetAccessListHandler, + SearchAccessListHandler +} from '../core/handler/accessListHandler.js' +import { PROTOCOL_COMMANDS } from '../../utils/constants.js' +import { streamToString } from '../../utils/util.js' + +export const accessListRoutes = express.Router() + +accessListRoutes.get( + '/api/services/accesslists', + async (req: Request, res: Response): Promise => { + const { wallet } = req.query + if (typeof wallet !== 'string' || !wallet) { + res.status(400).send('Missing required query param: wallet') + return + } + if (!isAddress(wallet)) { + res.status(400).send('Invalid wallet address') + return + } + const chainIdQuery = req.query.chainId + let chainId: number | undefined + if (chainIdQuery !== undefined) { + chainId = Number(chainIdQuery) + if (Number.isNaN(chainId)) { + res.status(400).send('chainId must be a number') + return + } + } + const result = await new SearchAccessListHandler(req.oceanNode).handle({ + command: PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST, + wallet, + chainId, + caller: req.caller + }) + if (result.stream) { + const data = JSON.parse(await streamToString(result.stream as Readable)) + res.json(data) + } else { + res.status(result.status.httpStatus).send(result.status.error) + } + } +) + +accessListRoutes.get( + '/api/services/accesslists/:chainId/:contractAddress', + async (req: Request, res: Response): Promise => { + const result = await new GetAccessListHandler(req.oceanNode).handle({ + command: PROTOCOL_COMMANDS.GET_ACCESS_LIST, + chainId: Number(req.params.chainId), + contractAddress: req.params.contractAddress, + caller: req.caller + }) + if (result.stream) { + const data = JSON.parse(await streamToString(result.stream as Readable)) + res.json(data) + } else { + res.status(result.status.httpStatus).send(result.status.error) + } + } +) diff --git a/src/components/httpRoutes/index.ts b/src/components/httpRoutes/index.ts index ad4c0f3dc..0706f3cba 100644 --- a/src/components/httpRoutes/index.ts +++ b/src/components/httpRoutes/index.ts @@ -15,6 +15,7 @@ import { PolicyServerPassthroughRoute } from './policyServer.js' import { authRoutes } from './auth.js' import { adminConfigRoutes } from './adminConfig.js' import { persistentStorageRoutes } from './persistentStorage.js' +import { accessListRoutes } from './accessList.js' export * from './getOceanPeers.js' export * from './auth.js' @@ -64,6 +65,8 @@ httpRoutes.use(authRoutes) httpRoutes.use(adminConfigRoutes) // persistent storage routes httpRoutes.use(persistentStorageRoutes) +// access list routes +httpRoutes.use(accessListRoutes) export function getAllServiceEndpoints() { httpRoutes.stack.forEach(addMapping.bind(null, [])) diff --git a/src/test/integration/accessListEvents.test.ts b/src/test/integration/accessListEvents.test.ts new file mode 100644 index 000000000..ba73bd48b --- /dev/null +++ b/src/test/integration/accessListEvents.test.ts @@ -0,0 +1,438 @@ +import { expect } from 'chai' +import { JsonRpcProvider, Signer } from 'ethers' +import { homedir } from 'os' +import { Readable } from 'stream' +import AccessListFactory from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessListFactory.sol/AccessListFactory.json' with { type: 'json' } +import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' } +import { Database } from '../../components/database/index.js' +import { OceanIndexer } from '../../components/Indexer/index.js' +import { OceanNode } from '../../OceanNode.js' +import { RPCS } from '../../@types/blockchain.js' +import { + DEVELOPMENT_CHAIN_ID, + getOceanArtifactsAdresses, + getOceanArtifactsAdressesByChainId +} from '../../utils/address.js' +import { ENVIRONMENT_VARIABLES, PROTOCOL_COMMANDS } from '../../utils/constants.js' +import { getConfiguration } from '../../utils/config.js' +import { streamToString } from '../../utils/util.js' +import { + buildEnvOverrideConfig, + DEFAULT_TEST_TIMEOUT, + getMockSupportedNetworks, + OverrideEnvConfig, + setupEnvironment, + tearDownEnvironment +} from '../utils/utils.js' +import { deployAccessListContract, getContract } from '../utils/contracts.js' +import { waitForCondition } from './testUtils.js' +import { + GetAccessListHandler, + SearchAccessListHandler +} from '../../components/core/handler/accessListHandler.js' + +describe('********** AccessList event indexing', function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 4) + + let database: Database + let oceanNode: OceanNode + let provider: JsonRpcProvider + let owner: Signer + let factoryAddress: string + let indexer: OceanIndexer + const chainId = DEVELOPMENT_CHAIN_ID + const mockSupportedNetworks: RPCS = getMockSupportedNetworks() + let previousConfiguration: OverrideEnvConfig[] + + before(async () => { + previousConfiguration = await setupEnvironment( + null, + buildEnvOverrideConfig( + [ + ENVIRONMENT_VARIABLES.RPCS, + ENVIRONMENT_VARIABLES.INDEXER_NETWORKS, + ENVIRONMENT_VARIABLES.PRIVATE_KEY, + ENVIRONMENT_VARIABLES.ADDRESS_FILE + ], + [ + JSON.stringify(mockSupportedNetworks), + JSON.stringify([DEVELOPMENT_CHAIN_ID]), + '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', + `${homedir}/.ocean/ocean-contracts/artifacts/address.json` + ] + ) + ) + + const config = await getConfiguration(true) + database = await Database.init(config.dbConfig) + + const oldIndexer = OceanNode.getInstance(config, database).getIndexer() + if (oldIndexer) { + await oldIndexer.stopAllChainIndexers() + } + oceanNode = OceanNode.getInstance( + config, + database, + null, + null, + null, + null, + null, + true + ) + let artifactsAddresses = getOceanArtifactsAdressesByChainId(DEVELOPMENT_CHAIN_ID) + if (!artifactsAddresses) { + artifactsAddresses = getOceanArtifactsAdresses().development + } + factoryAddress = artifactsAddresses.AccessListFactory + + provider = new JsonRpcProvider('http://127.0.0.1:8545') + owner = (await provider.getSigner(0)) as Signer + + // Skip historical replay: the hardhat chain accumulates AccessList events + // across test runs. Pin the indexer to the current head so it only sees + // events emitted by THIS suite. + const headBlock = await provider.getBlockNumber() + await database.indexer.update(chainId, headBlock) + + indexer = new OceanIndexer(database, config, oceanNode.blockchainRegistry) + oceanNode.addIndexer(indexer) + }) + + after(async () => { + if (oceanNode) await oceanNode.tearDownAll() + await tearDownEnvironment(previousConfiguration) + }) + + it('factory deploy with no initial users creates an indexed document', async () => { + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'EmptyList', + 'EMPTY', + false, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr, 'deployment failed').to.be.a('string') + + const doc: any = await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) + }, DEFAULT_TEST_TIMEOUT * 2) + + expect(doc, 'document was not indexed in time').to.not.equal(null) + expect(doc.contractAddress).to.equal(deployedAddr!.toLowerCase()) + expect(doc.transferable).to.equal(false) + expect(Array.isArray(doc.users)).to.equal(true) + expect(doc.users.length).to.equal(0) + }) + + it('factory deploy with initial users records every AddressAdded', async () => { + const wallets = [ + await (await provider.getSigner(2)).getAddress(), + await (await provider.getSigner(3)).getAddress(), + await (await provider.getSigner(4)).getAddress() + ] + const tokenURIs = wallets.map(() => 'https://oceanprotocol.com/nft/') + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'PrefilledList', + 'PRE', + false, + await owner.getAddress(), + wallets, + tokenURIs + ) + expect(deployedAddr, 'deployment failed').to.be.a('string') + + const doc: any = await waitForCondition(async () => { + const d = await database.accessList.retrieve(chainId, deployedAddr!) + return d && d.users && d.users.length === wallets.length ? d : null + }, DEFAULT_TEST_TIMEOUT * 3) + expect(doc, 'doc with all initial users not indexed in time').to.not.equal(null) + + const indexedWallets = doc.users.map((u: any) => u.wallet) + for (const w of wallets) { + expect(indexedWallets).to.include(w.toLowerCase()) + } + for (const u of doc.users) { + expect(u.tokenId).to.be.a('number') + expect(u.block).to.be.a('number').and.greaterThan(0) + expect(u.txId).to.be.a('string') + } + }) + + it('mint adds a user; burn removes by tokenId', async () => { + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'MutableList', + 'MUT', + false, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) + }, DEFAULT_TEST_TIMEOUT * 2) + + const accessListContract = getContract(deployedAddr!, AccessList.abi, owner) + const newWalletSigner = await provider.getSigner(5) + const newWallet = await newWalletSigner.getAddress() + + const mintTx = await accessListContract.mint(newWallet, 'https://example/nft') + await mintTx.wait() + + const docAfterMint: any = await waitForCondition(async () => { + const d: any = await database.accessList.retrieve(chainId, deployedAddr!) + return d && d.users.some((u: any) => u.wallet === newWallet.toLowerCase()) + ? d + : null + }, DEFAULT_TEST_TIMEOUT * 2) + expect(docAfterMint).to.not.equal(null) + const minted = docAfterMint.users.find( + (u: any) => u.wallet === newWallet.toLowerCase() + ) + expect(minted).to.not.equal(undefined) + const { tokenId } = minted + + const burnTx = await accessListContract.burn(tokenId) + await burnTx.wait() + + const docAfterBurn: any = await waitForCondition(async () => { + const d: any = await database.accessList.retrieve(chainId, deployedAddr!) + return d && !d.users.some((u: any) => u.tokenId === tokenId) ? d : null + }, DEFAULT_TEST_TIMEOUT * 2) + expect(docAfterBurn).to.not.equal(null) + expect(docAfterBurn.users.some((u: any) => u.tokenId === tokenId)).to.equal(false) + }) + + it('searchByWallet returns AccessLists containing the wallet', async () => { + const wallet = await (await provider.getSigner(6)).getAddress() + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'SearchableList', + 'SCH', + false, + await owner.getAddress(), + [wallet], + ['https://example/nft'] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + const d: any = await database.accessList.retrieve(chainId, deployedAddr!) + return d && d.users.length === 1 ? d : null + }, DEFAULT_TEST_TIMEOUT * 3) + + const matched = await waitForCondition(async () => { + const results = await database.accessList.searchByWallet(wallet, chainId) + return ( + results.find((r: any) => r.contractAddress === deployedAddr!.toLowerCase()) ?? + null + ) + }, DEFAULT_TEST_TIMEOUT * 3) + expect(matched, 'wallet not found in any access list').to.not.equal(null) + }) + + it('addUser is idempotent when the same tokenId is replayed', async () => { + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'IdempotentList', + 'IDM', + false, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) + }, DEFAULT_TEST_TIMEOUT * 2) + + const sameUser = { + wallet: '0x' + 'a'.repeat(40), + tokenId: 999, + block: 1, + txId: '0xdeadbeef' + } + + await database.accessList.addUser(chainId, deployedAddr!, sameUser) + await database.accessList.addUser(chainId, deployedAddr!, sameUser) + + const doc: any = await database.accessList.retrieve(chainId, deployedAddr!) + const matches = doc.users.filter((u: any) => u.tokenId === sameUser.tokenId) + expect(matches.length).to.equal(1) + }) + + it('transferable: true is recorded on the doc', async () => { + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'TransferableList', + 'TRF', + true, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr).to.be.a('string') + + const doc: any = await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) + }, DEFAULT_TEST_TIMEOUT * 2) + expect(doc).to.not.equal(null) + expect(doc.transferable).to.equal(true) + }) + + it('lastIndexedBlock advances after access list events', async () => { + const before = await database.indexer.retrieve(chainId) + const beforeBlock = before?.lastIndexedBlock ?? 0 + + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'CursorList', + 'CUR', + false, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) + }, DEFAULT_TEST_TIMEOUT * 2) + + const after = await waitForCondition(async () => { + const cur = await database.indexer.retrieve(chainId) + return cur && cur.lastIndexedBlock > beforeBlock ? cur : null + }, DEFAULT_TEST_TIMEOUT * 2) + expect(after, 'indexer cursor did not advance').to.not.equal(null) + expect(after.lastIndexedBlock).to.be.greaterThan(beforeBlock) + }) + + it('GetAccessListHandler returns the indexed doc', async () => { + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'HandlerGetList', + 'HGET', + false, + await owner.getAddress(), + [], + [] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + return await database.accessList.retrieve(chainId, deployedAddr!) + }, DEFAULT_TEST_TIMEOUT * 2) + + const result = await new GetAccessListHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.GET_ACCESS_LIST, + chainId, + contractAddress: deployedAddr! + }) + expect(result.status.httpStatus).to.equal(200) + expect(result.stream).to.not.equal(null) + const doc = JSON.parse(await streamToString(result.stream as Readable)) + expect(doc.contractAddress).to.equal(deployedAddr!.toLowerCase()) + }) + + it('GetAccessListHandler returns 404 for an unknown contract', async () => { + const result = await new GetAccessListHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.GET_ACCESS_LIST, + chainId, + contractAddress: '0x' + 'd'.repeat(40) + }) + expect(result.status.httpStatus).to.equal(404) + expect(result.stream).to.equal(null) + }) + + it('SearchAccessListHandler without chainId returns matches across all chains', async () => { + const wallet = await (await provider.getSigner(9)).getAddress() + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'CrossChainList', + 'CCL', + false, + await owner.getAddress(), + [wallet], + ['https://example/nft'] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + const d: any = await database.accessList.retrieve(chainId, deployedAddr!) + return d && d.users.length === 1 ? d : null + }, DEFAULT_TEST_TIMEOUT * 3) + + const matched = await waitForCondition(async () => { + const result = await new SearchAccessListHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST, + wallet + }) + if (result.status.httpStatus !== 200 || !result.stream) return null + const docs = JSON.parse(await streamToString(result.stream as Readable)) + return ( + docs.find((d: any) => d.contractAddress === deployedAddr!.toLowerCase()) ?? null + ) + }, DEFAULT_TEST_TIMEOUT * 3) + expect(matched, 'wallet not found via cross-chain handler').to.not.equal(null) + }) + + it('SearchAccessListHandler returns docs containing a wallet', async () => { + const wallet = await (await provider.getSigner(8)).getAddress() + const deployedAddr = await deployAccessListContract( + owner, + factoryAddress, + AccessListFactory.abi, + 'HandlerSearchList', + 'HSRC', + false, + await owner.getAddress(), + [wallet], + ['https://example/nft'] + ) + expect(deployedAddr).to.be.a('string') + + await waitForCondition(async () => { + const d: any = await database.accessList.retrieve(chainId, deployedAddr!) + return d && d.users.length === 1 ? d : null + }, DEFAULT_TEST_TIMEOUT * 3) + + const matched = await waitForCondition(async () => { + const result = await new SearchAccessListHandler(oceanNode).handle({ + command: PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST, + wallet, + chainId + }) + if (result.status.httpStatus !== 200 || !result.stream) return null + const docs = JSON.parse(await streamToString(result.stream as Readable)) + return ( + docs.find((d: any) => d.contractAddress === deployedAddr!.toLowerCase()) ?? null + ) + }, DEFAULT_TEST_TIMEOUT * 3) + expect(matched, 'wallet not found via handler').to.not.equal(null) + }) +}) diff --git a/src/test/integration/testUtils.ts b/src/test/integration/testUtils.ts index 19e4c570c..3a8503dcb 100644 --- a/src/test/integration/testUtils.ts +++ b/src/test/integration/testUtils.ts @@ -84,6 +84,28 @@ export const waitToIndex = async ( }) } +/** + * Polls a predicate until it returns truthy or the timeout elapses. + * Returns the predicate's truthy value, or null on timeout. + */ +export async function waitForCondition( + predicate: () => Promise, + timeoutMs: number = DEFAULT_TEST_TIMEOUT, + intervalMs: number = 1000 +): Promise { + const start = Date.now() + while (Date.now() - start < timeoutMs) { + try { + const result = await predicate() + if (result) return result + } catch (_e) { + // ignore and retry + } + await new Promise((resolve) => setTimeout(resolve, intervalMs)) + } + return null +} + export async function signMessage( message: string, address: string, diff --git a/src/utils/constants.ts b/src/utils/constants.ts index 7150e37b0..fb3fa1d83 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -44,7 +44,9 @@ export const PROTOCOL_COMMANDS = { PERSISTENT_STORAGE_LIST_FILES: 'persistentStorageListFiles', PERSISTENT_STORAGE_UPLOAD_FILE: 'persistentStorageUploadFile', PERSISTENT_STORAGE_GET_FILE_OBJECT: 'persistentStorageGetFileObject', - PERSISTENT_STORAGE_DELETE_FILE: 'persistentStorageDeleteFile' + PERSISTENT_STORAGE_DELETE_FILE: 'persistentStorageDeleteFile', + GET_ACCESS_LIST: 'getAccessList', + SEARCH_ACCESS_LIST: 'searchAccessList' } // more visible, keep then close to make sure we always update both export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [ @@ -90,7 +92,9 @@ export const SUPPORTED_PROTOCOL_COMMANDS: string[] = [ PROTOCOL_COMMANDS.PERSISTENT_STORAGE_LIST_FILES, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_UPLOAD_FILE, PROTOCOL_COMMANDS.PERSISTENT_STORAGE_GET_FILE_OBJECT, - PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE + PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE, + PROTOCOL_COMMANDS.GET_ACCESS_LIST, + PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST ] export const MetadataStates = { @@ -115,7 +119,10 @@ export const EVENTS = { DISPENSER_ACTIVATED: 'DispenserActivated', DISPENSER_DEACTIVATED: 'DispenserDeactivated', EXCHANGE_ACTIVATED: 'ExchangeActivated', - EXCHANGE_DEACTIVATED: 'ExchangeDeactivated' + EXCHANGE_DEACTIVATED: 'ExchangeDeactivated', + ADDRESS_ADDED: 'AddressAdded', + ADDRESS_REMOVED: 'AddressRemoved', + NEW_ACCESS_LIST: 'NewAccessList' } export const INDEXER_CRAWLING_EVENTS = { @@ -185,6 +192,18 @@ export const EVENT_HASHES: Hashes = { '0x03da9148e1de78fba22de63c573465562ebf6ef878a1d3ea83790a560229984c': { type: EVENTS.EXCHANGE_DEACTIVATED, text: 'ExchangeDeactivated(bytes32,address)' + }, + '0x9cc987676e7d63379f176ea50df0ae8d2d9d1141d1231d4ce15b5965f73c9430': { + type: EVENTS.ADDRESS_ADDED, + text: 'AddressAdded(address,uint256)' + }, + '0xb1e731889e7185f2cc895a86c70cded99d77ab8ecea58ab5abe5d43b084f51ae': { + type: EVENTS.ADDRESS_REMOVED, + text: 'AddressRemoved(uint256)' + }, + '0xd65bc8e3024bbad886df74eea79b6e118b7fbcffe1f3f98054e5a6b98dc83891': { + type: EVENTS.NEW_ACCESS_LIST, + text: 'NewAccessList(address,address)' } }