Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/@types/AccessList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,10 @@
export interface AccessList {
[chainId: string]: string[]
}

export interface AccessListUser {
wallet: string
tokenId: number
block: number
txId: string
}
10 changes: 10 additions & 0 deletions src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ export interface FindPeerCommand extends Command {
export interface GetP2PPeersCommand extends Command {}
export interface GetP2PNetworkStatsCommand extends Command {}

export interface GetAccessListCommand extends Command {
chainId: number
contractAddress: string
}

export interface SearchAccessListCommand extends Command {
wallet: string
chainId?: number
}

export interface SignedCommand extends Command {
nonce: string
signature: string
Expand Down
8 changes: 7 additions & 1 deletion src/components/Indexer/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import {
ExchangeActivatedEventProcessor,
ExchangeDeactivatedEventProcessor,
ExchangeRateChangedEventProcessor,
NewAccessListEventProcessor,
AddressAddedEventProcessor,
AddressRemovedEventProcessor,
ProcessorConstructor
} from './processors/index.js'
import { findEventByKey } from './utils.js'
Expand All @@ -36,7 +39,10 @@ const EVENT_PROCESSOR_MAP: Record<string, ProcessorConstructor> = {
[EVENTS.EXCHANGE_CREATED]: ExchangeCreatedEventProcessor,
[EVENTS.EXCHANGE_ACTIVATED]: ExchangeActivatedEventProcessor,
[EVENTS.EXCHANGE_DEACTIVATED]: ExchangeDeactivatedEventProcessor,
[EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor
[EVENTS.EXCHANGE_RATE_CHANGED]: ExchangeRateChangedEventProcessor,
[EVENTS.NEW_ACCESS_LIST]: NewAccessListEventProcessor,
[EVENTS.ADDRESS_ADDED]: AddressAddedEventProcessor,
[EVENTS.ADDRESS_REMOVED]: AddressRemovedEventProcessor
}

const processorInstances = new Map<string, BaseEventProcessor>()
Expand Down
48 changes: 48 additions & 0 deletions src/components/Indexer/processors/AddressAddedEventProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { ethers, Signer, FallbackProvider, Interface } from 'ethers'
import { INDEXER_LOGGER } from '../../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
import { BaseEventProcessor } from './BaseProcessor.js'
import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' }

const accessListInterface = new Interface(AccessList.abi)

export class AddressAddedEventProcessor extends BaseEventProcessor {
async processEvent(
event: ethers.Log,
chainId: number,
signer: Signer,
provider: FallbackProvider
): Promise<any> {
try {
const decoded = accessListInterface.parseLog({
topics: Array.from(event.topics),
data: event.data
})
if (!decoded) return null

const wallet = decoded.args[0].toString().toLowerCase()
const tokenId = Number(decoded.args[1])
const contractAddress = event.address.toLowerCase()

const { accessList } = await this.getDatabase()
const result = await accessList.addUser(chainId, contractAddress, {
wallet,
tokenId,
block: event.blockNumber,
txId: event.transactionHash
})

INDEXER_LOGGER.logMessage(
`[AddressAdded] ${wallet} (tokenId=${tokenId}) added to ${contractAddress} on chain ${chainId}`
)
return result
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error processing AddressAdded event: ${err.message}`,
true
)
return null
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { ethers, Signer, FallbackProvider, Interface } from 'ethers'
import { INDEXER_LOGGER } from '../../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
import { BaseEventProcessor } from './BaseProcessor.js'
import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' }

const accessListInterface = new Interface(AccessList.abi)

export class AddressRemovedEventProcessor extends BaseEventProcessor {
async processEvent(
event: ethers.Log,
chainId: number,
signer: Signer,
provider: FallbackProvider
): Promise<any> {
try {
const decoded = accessListInterface.parseLog({
topics: Array.from(event.topics),
data: event.data
})
if (!decoded) return null

const tokenId = Number(decoded.args[0])
const contractAddress = event.address.toLowerCase()

const { accessList } = await this.getDatabase()
const result = await accessList.removeUserByTokenId(
chainId,
contractAddress,
tokenId
)

INDEXER_LOGGER.logMessage(
`[AddressRemoved] tokenId=${tokenId} removed from ${contractAddress} on chain ${chainId}`
)
return result
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error processing AddressRemoved event: ${err.message}`,
true
)
return null
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { ethers, Signer, FallbackProvider, Interface } from 'ethers'
import { INDEXER_LOGGER } from '../../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
import { BaseEventProcessor } from './BaseProcessor.js'
import AccessListFactory from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessListFactory.sol/AccessListFactory.json' with { type: 'json' }
import AccessList from '@oceanprotocol/contracts/artifacts/contracts/accesslists/AccessList.sol/AccessList.json' with { type: 'json' }

const factoryInterface = new Interface(AccessListFactory.abi)

export class NewAccessListEventProcessor extends BaseEventProcessor {
async processEvent(
event: ethers.Log,
chainId: number,
signer: Signer,
provider: FallbackProvider
): Promise<any> {
try {
const decoded = factoryInterface.parseLog({
topics: Array.from(event.topics),
data: event.data
})
if (!decoded) return null

const contractAddress = decoded.args[0].toString().toLowerCase()

let transferable = false
let name: string | undefined
let symbol: string | undefined
try {
const accessListContract = new ethers.Contract(
contractAddress,
AccessList.abi,
provider
)
const [transferableRaw, nameRaw, symbolRaw] = await Promise.all([
accessListContract.transferable(),
accessListContract.name(),
accessListContract.symbol()
])
transferable = Boolean(transferableRaw)
name = nameRaw
symbol = symbolRaw
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_WARN,
`Failed to read on-chain metadata for ${contractAddress}: ${err.message}`
)
}

const { accessList } = await this.getDatabase()
const result = await accessList.create(
chainId,
contractAddress,
transferable,
event.blockNumber,
event.transactionHash,
name,
symbol
)

INDEXER_LOGGER.logMessage(
`[NewAccessList] Indexed access list ${contractAddress} on chain ${chainId} (name=${name}, symbol=${symbol}, transferable=${transferable})`
)
return result
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error processing NewAccessList event: ${err.message}`,
true
)
return null
}
}
}
3 changes: 3 additions & 0 deletions src/components/Indexer/processors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ export * from './MetadataEventProcessor.js'
export * from './MetadataStateEventProcessor.js'
export * from './OrderReusedEventProcessor.js'
export * from './OrderStartedEventProcessor.js'
export * from './NewAccessListEventProcessor.js'
export * from './AddressAddedEventProcessor.js'
export * from './AddressRemovedEventProcessor.js'
export * from './BaseProcessor.js'

export type ProcessorConstructor = new (
Expand Down
84 changes: 84 additions & 0 deletions src/components/core/handler/accessListHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { CommandHandler } from './handler.js'
import { P2PCommandResponse } from '../../../@types/OceanNode.js'
import {
GetAccessListCommand,
SearchAccessListCommand
} from '../../../@types/commands.js'
import { Readable } from 'stream'
import { isAddress } from 'ethers'
import {
ValidateParams,
validateCommandParameters
} from '../../httpRoutes/validateCommands.js'
import { CORE_LOGGER } from '../../../utils/logging/common.js'

export class GetAccessListHandler extends CommandHandler {
validate(command: GetAccessListCommand): ValidateParams {
return validateCommandParameters(command, ['chainId', 'contractAddress'])
}

async handle(task: GetAccessListCommand): Promise<P2PCommandResponse> {
const checks = await this.verifyParamsAndRateLimits(task)
if (checks.status.httpStatus !== 200 || checks.status.error !== null) {
return checks
}
try {
const db = await this.getOceanNode().getDatabase()
const doc = await db.accessList.retrieve(
Number(task.chainId),
String(task.contractAddress)
)
if (!doc) {
return {
stream: null,
status: { httpStatus: 404, error: 'AccessList not found' }
}
}
return {
stream: Readable.from(JSON.stringify(doc)),
status: { httpStatus: 200 }
}
} catch (error) {
CORE_LOGGER.error(`GetAccessListHandler error: ${error.message}`)
return {
stream: null,
status: { httpStatus: 500, error: 'Unknown error: ' + error.message }
}
}
}
}

export class SearchAccessListHandler extends CommandHandler {
validate(command: SearchAccessListCommand): ValidateParams {
return validateCommandParameters(command, ['wallet'])
}

async handle(task: SearchAccessListCommand): Promise<P2PCommandResponse> {
const checks = await this.verifyParamsAndRateLimits(task)
if (checks.status.httpStatus !== 200 || checks.status.error !== null) {
return checks
}
try {
const walletString = String(task.wallet)
if (!isAddress(walletString)) {
return {
stream: null,
status: { httpStatus: 400, error: 'Invalid wallet address' }
}
}
const db = await this.getOceanNode().getDatabase()
const chainId = task.chainId !== undefined ? Number(task.chainId) : undefined
const docs = await db.accessList.searchByWallet(walletString, chainId)
return {
stream: Readable.from(JSON.stringify(docs ?? [])),
status: { httpStatus: 200 }
}
} catch (error) {
CORE_LOGGER.error(`SearchAccessListHandler error: ${error.message}`)
return {
stream: null,
status: { httpStatus: 500, error: 'Unknown error: ' + error.message }
}
}
}
}
9 changes: 9 additions & 0 deletions src/components/core/handler/coreHandlersRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import {
PersistentStorageListFilesHandler,
PersistentStorageUploadFileHandler
} from './persistentStorage.js'
import { GetAccessListHandler, SearchAccessListHandler } from './accessListHandler.js'

export type HandlerRegistry = {
handlerName: string // name of the handler
Expand Down Expand Up @@ -199,6 +200,14 @@ export class CoreHandlersRegistry {
PROTOCOL_COMMANDS.PERSISTENT_STORAGE_DELETE_FILE,
new PersistentStorageDeleteFileHandler(node)
)
this.registerCoreHandler(
PROTOCOL_COMMANDS.GET_ACCESS_LIST,
new GetAccessListHandler(node)
)
this.registerCoreHandler(
PROTOCOL_COMMANDS.SEARCH_ACCESS_LIST,
new SearchAccessListHandler(node)
)
}

public static getInstance(
Expand Down
29 changes: 29 additions & 0 deletions src/components/database/BaseDatabase.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Schema } from '.'
import { OceanNodeDBConfig } from '../../@types'
import { AccessListUser } from '../../@types/AccessList.js'
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import { DATABASE_LOGGER } from '../../utils/logging/common.js'
import { ElasticsearchSchema } from './ElasticSchemas.js'
Expand Down Expand Up @@ -38,6 +39,34 @@ export abstract class AbstractIndexerDatabase extends AbstractDatabase {
abstract delete(network: number): Promise<any>
}

export abstract class AbstractAccessListDatabase extends AbstractDatabase {
abstract create(
chainId: number,
contractAddress: string,
transferable: boolean,
block: number,
txId: string,
name?: string,
symbol?: string
): Promise<any>

abstract retrieve(chainId: number, contractAddress: string): Promise<any>
abstract addUser(
chainId: number,
contractAddress: string,
user: AccessListUser
): Promise<any>

abstract removeUserByTokenId(
chainId: number,
contractAddress: string,
tokenId: number
): Promise<any>

abstract searchByWallet(wallet: string, chainId?: number): Promise<any[]>
abstract delete(chainId: number, contractAddress: string): Promise<any>
}

export abstract class AbstractLogDatabase extends AbstractDatabase {
abstract insertLog(logEntry: Record<string, any>): Promise<any>
abstract retrieveLog(id: string): Promise<Record<string, any> | null>
Expand Down
Loading
Loading