diff --git a/core/src/commonMain/kotlin/com/sunya/cdm/api/Group.kt b/core/src/commonMain/kotlin/com/sunya/cdm/api/Group.kt index c1c5ab1c..9408196e 100644 --- a/core/src/commonMain/kotlin/com/sunya/cdm/api/Group.kt +++ b/core/src/commonMain/kotlin/com/sunya/cdm/api/Group.kt @@ -4,6 +4,7 @@ package com.sunya.cdm.api import com.sunya.cdm.util.InternalLibraryApi import com.sunya.cdm.util.makeValidCdmObjectName +import io.github.oshai.kotlinlogging.KotlinLogging class Group(orgName : String, val typedefs : List, @@ -154,7 +155,7 @@ class Group(orgName : String, // add if vb name not already added fun addVariable(vb: Variable.Builder<*>) : Builder { if (vb.datatype == Datatype.REFERENCE) { - println("skip REFERENCE variable $vb") + logger.warn{"skip REFERENCE variable $vb"} return this } if (variables.find {it.name == vb.name } == null) { @@ -234,4 +235,8 @@ class Group(orgName : String, return Group(useName, typedefs, dimensions, attributes, variables, groups, parent) } } + + companion object { + val logger = KotlinLogging.logger("Group") + } } \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt b/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt index cc67eb5e..54420f02 100644 --- a/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt +++ b/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt @@ -1,7 +1,6 @@ package com.sunya.cdm.api import com.sunya.cdm.array.ArrayTyped -import com.sunya.cdm.iosp.ReadChunkConcurrent import com.sunya.cdm.util.CdmFullNames interface Netchdf : AutoCloseable { @@ -16,22 +15,23 @@ interface Netchdf : AutoCloseable { } // TODO I think the output type is not always the input type - fun readArrayData(v2: Variable, section: SectionPartial? = null) : ArrayTyped + fun readArrayData(v2: Variable, wantSection: SectionPartial? = null) : ArrayTyped - // iterate over all the chunks in section, order is arbitrary. - fun chunkIterator(v2: Variable, section: SectionPartial? = null, maxElements : Int? = null) : Iterator> + // iterate over all the chunks in section, order is arbitrary. TODO where is intersection with wantSection done ?? + fun chunkIterator(v2: Variable, wantSection: SectionPartial? = null, maxElements : Int? = null) : Iterator> + + fun readChunksConcurrent(v2: Variable, + lamda : (ArraySection<*>) -> Unit, + done : () -> Unit, + nthreads: Int? = null) { + TODO() + } } // the section describes the array chunk reletive to the variable's shape. -data class ArraySection(val array : ArrayTyped, val section : Section) - -// Experimental: read concurrently chunks of data, call back with lamda, order is arbitrary. -fun Netchdf.readChunksConcurrent(v2: Variable, - section: SectionPartial? = null, - maxElements : Int? = null, - nthreads: Int = 20, - lamda : (ArraySection) -> Unit) { - val reader = ReadChunkConcurrent() - val chunkIter = this.chunkIterator( v2, section, maxElements) - reader.readChunks(nthreads, chunkIter, lamda) +data class ArraySection(val array : ArrayTyped, val section : Section) { + fun intersect(wantSection: SectionPartial) : ArrayTyped { + // TODO + return array + } } \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/sunya/cdm/api/Section.kt b/core/src/commonMain/kotlin/com/sunya/cdm/api/Section.kt index 75dc29f2..2e23a4c6 100644 --- a/core/src/commonMain/kotlin/com/sunya/cdm/api/Section.kt +++ b/core/src/commonMain/kotlin/com/sunya/cdm/api/Section.kt @@ -37,6 +37,18 @@ data class Section(val ranges : List, val varShape : LongArray) result = 31 * result + varShape.contentHashCode() return result } + + override fun toString(): String { + return "Section(${ranges.show()}, shape=${shape.contentToString()}, totalElements=$totalElements, varShape=${varShape.contentToString()})" + } +} + +fun List.show() = buildString { + forEach { p: LongProgression -> + append("[${p.first}:${p.last}") + if (p.step != 1L) append(":${p.step}") + append("]") + } } /** A partially filled section of multidimensional array indices. */ diff --git a/core/src/commonMain/kotlin/com/sunya/cdm/iosp/ReadChunkConcurrent.kt b/core/src/commonMain/kotlin/com/sunya/cdm/iosp/ReadChunkConcurrent.kt deleted file mode 100644 index 41b0a439..00000000 --- a/core/src/commonMain/kotlin/com/sunya/cdm/iosp/ReadChunkConcurrent.kt +++ /dev/null @@ -1,51 +0,0 @@ -package com.sunya.cdm.iosp - -import com.sunya.cdm.api.ArraySection -import com.sunya.cdm.util.InternalLibraryApi - -import kotlinx.coroutines.* -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.channels.produce - -internal class ReadChunkConcurrent { - - fun readChunks(nthreads : Int, chunkIter : Iterator>, lamda : (ArraySection) -> Unit) { - - runBlocking { - val jobs = mutableListOf() - val chunkProducer = produceChunks(chunkIter) - repeat(nthreads) { - jobs.add( launchJob(it, chunkProducer, lamda) ) - } - - // wait for all jobs to be done, then close everything - joinAll(*jobs.toTypedArray()) - } - } - - private val allResults = mutableListOf() - private var count = 0 - private fun CoroutineScope.produceChunks(producer: Iterator>): ReceiveChannel> = - produce { - for (ballot in producer) { - send(ballot) - yield() - count++ - } - channel.close() - } - - private val mutex = Mutex() - - private fun CoroutineScope.launchJob( - id: Int, - input: ReceiveChannel>, - lamda: (ArraySection) -> Unit, - ) = launch(Dispatchers.Default) { - for (arraySection in input) { - lamda(arraySection) - yield() - } - } -} \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf4/Hdf4File.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf4/Hdf4File.kt index 035856e2..e51da22c 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf4/Hdf4File.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf4/Hdf4File.kt @@ -34,11 +34,11 @@ class Hdf4File(val filename : String) : Netchdf { override fun type() = header.type() override val size : Long get() = raf.size() - override fun readArrayData(v2: Variable, section: SectionPartial?): ArrayTyped { + override fun readArrayData(v2: Variable, wantSection: SectionPartial?): ArrayTyped { if (v2.nelems == 0L) { return ArrayEmpty(v2.shape.toIntArray(), v2.datatype) } - val filledSection = SectionPartial.fill(section, v2.shape) + val filledSection = SectionPartial.fill(wantSection, v2.shape) return if (v2.datatype == Datatype.COMPOUND) { readStructureDataArray(v2, filledSection) as ArrayTyped } else { diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1ext.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1data.kt similarity index 74% rename from core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1ext.kt rename to core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1data.kt index 78479d3f..be88f4d4 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1ext.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1data.kt @@ -7,22 +7,16 @@ import com.sunya.cdm.layout.Tiling import com.sunya.cdm.util.InternalLibraryApi import kotlin.collections.mutableListOf -/** a BTree1 that uses OpenFileExtended */ -internal class BTree1ext( +/** a BTree1 that uses OpenFileExtended and tracks its own tiling. */ +internal class BTree1data( val raf: OpenFileExtended, val rootNodeAddress: Long, - val nodeType : Int, // 0 = group/symbol table, 1 = raw data chunks varShape: LongArray, chunkShape: LongArray, ) { val tiling = Tiling(varShape, chunkShape) val ndimStorage = chunkShape.size - init { - // println(" BTreeNode varShape ${varShape.contentToString()} chunkShape ${chunkShape.contentToString()}") - require (nodeType == 1) - } - fun rootNode(): BTreeNode = BTreeNode(rootNodeAddress, null) // here both internal and leaf are the same structure @@ -33,7 +27,6 @@ internal class BTree1ext( private val leftAddress: Long private val rightAddress: Long - // type 1 val keys = mutableListOf() val values = mutableListOf() val children = mutableListOf() @@ -44,26 +37,34 @@ internal class BTree1ext( check(magic == "TREE") { "DataBTree doesnt start with TREE" } val type: Int = raf.readByte(state).toInt() - check(type == nodeType) { "DataBTree must be type $nodeType" } + check(type == 1) { "DataBTree must be type 1" } level = raf.readByte(state).toInt() // leaf nodes are level 0 nentries = raf.readShort(state).toInt() // number of children to which this node points leftAddress = raf.readOffset(state) rightAddress = raf.readOffset(state) - // println(" BTreeNode level $level nentries $nentries") - - for (idx in 0 until nentries) { + if (nentries == 0) { val chunkSize = raf.readInt(state) val filterMask = raf.readInt(state) val inner = LongArray(ndimStorage) { j -> raf.readLong(state) } val key = DataChunkKey(chunkSize, filterMask, inner) - val childPointer = raf.readAddress(state) // 4 or 8 bytes, then add fileOffset - if (level == 0) { - keys.add(inner) - values.add(DataChunkEntry1(level, this, idx, key, childPointer)) - } else { - children.add(BTreeNode(childPointer, this)) + val childPointer = raf.readAddress(state) + keys.add(inner) + values.add(DataChunkEntry1(this, key, childPointer)) + } else { + repeat(nentries) { + val chunkSize = raf.readInt(state) + val filterMask = raf.readInt(state) + val inner = LongArray(ndimStorage) { j -> raf.readLong(state) } + val key = DataChunkKey(chunkSize, filterMask, inner) + val childPointer = raf.readAddress(state) // 4 or 8 bytes, then add fileOffset + if (level == 0) { + keys.add(inner) + values.add(DataChunkEntry1( this, key, childPointer)) + } else { + children.add(BTreeNode(childPointer, this)) + } } } @@ -100,15 +101,15 @@ internal class BTree1ext( } // childAddress = data chunk (level 1) else a child node - data class DataChunkEntry1(val level : Int, val parent : BTreeNode, val idx : Int, val key : DataChunkKey, val childAddress : Long) : DataChunkIF { + data class DataChunkEntry1(val parent : BTreeNode, val key : DataChunkKey, val childAddress : Long) : DataChunkIF { override fun childAddress() = childAddress override fun offsets() = key.offsets - override fun isMissing() = (childAddress == -1L) + override fun isMissing() = (childAddress <= 0L) // may be 0 or -1 override fun chunkSize() = key.chunkSize override fun filterMask() = key.filterMask override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${key.offsets.contentToString()}" + - ", tile= ${tiling.tile(key.offsets).contentToString()} idx=$idx" + ", tile= ${tiling.tile(key.offsets).contentToString()}" } } diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2j.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2data.kt similarity index 99% rename from core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2j.kt rename to core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2data.kt index 343e5468..bab1967b 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2j.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2data.kt @@ -1,6 +1,5 @@ package com.sunya.netchdf.hdf5 - import com.sunya.cdm.api.computeSize import com.sunya.cdm.api.toIntArray import com.sunya.cdm.iosp.OpenFileIF @@ -15,7 +14,7 @@ import kotlin.math.pow @OptIn(InternalLibraryApi::class) /* Btree version 2, for data. From jhdf. */ -internal class BTree2j(private val h5: H5builder, owner: String, address: Long, storageDims: LongArray? = null) { // BTree2 +internal class BTree2data(private val h5: H5builder, owner: String, address: Long, storageDims: LongArray? = null) { // BTree2 val btreeType: Int private val nodeSize: Int // size in bytes of btree nodes private val recordSize: Int // size in bytes of btree records diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/ChunkedDataLayoutMessageV4.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/ChunkedDataLayoutMessageV4.kt index ebe3e44e..78153d6c 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/ChunkedDataLayoutMessageV4.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/ChunkedDataLayoutMessageV4.kt @@ -9,8 +9,6 @@ import com.sunya.cdm.util.InternalLibraryApi import io.github.oshai.kotlinlogging.KotlinLogging import kotlin.math.ceil -private val logger = KotlinLogging.logger("ChunkedDataLayoutMessageV4") - // DataLayoutMessage version 4, layout class 2 (chunked), chunkIndexingType 1-5 // jhdf @@ -213,6 +211,9 @@ internal class FixedArrayIndex(val h5: H5builder, val varShape: IntArray, val md fun chunkIterator() : Iterator = chunks.iterator() + companion object { + val logger = KotlinLogging.logger("ChunkedDataLayoutMessageV4") + } } ///////////////////////////////////////////////// diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/FractalHeap.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/FractalHeap.kt index 024bdab3..6d8c5636 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/FractalHeap.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/FractalHeap.kt @@ -174,12 +174,12 @@ internal class FractalHeap(private val h5: H5builder, forWho: String, address: L when (subtype) { 1, 2 -> { if (btreeHugeObjects == null) { // lazy - val local = BTree2j(h5, "FractalHeap btreeHugeObjects", btreeAddressHugeObjects) + val local = BTree2data(h5, "FractalHeap btreeHugeObjects", btreeAddressHugeObjects) require(local.btreeType == subtype) btreeHugeObjects = local.records } - val record1: BTree2j.Record1? = BTree2j.findRecord1byId(btreeHugeObjects!!, offset) + val record1: BTree2data.Record1? = BTree2data.findRecord1byId(btreeHugeObjects!!, offset) if (record1 == null) { throw RuntimeException("Cant find DHeapId=$offset") } @@ -385,7 +385,7 @@ internal class FractalHeap(private val h5: H5builder, forWho: String, address: L } companion object { - private val logger = KotlinLogging.logger("H5builder") + private val logger = KotlinLogging.logger("FractalHeap") var debugDetail = false var debugFractalHeap = false var debugPos = false diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/GroupSymbolTable.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/GroupSymbolTable.kt index 0d8032a5..fd9ffbcd 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/GroupSymbolTable.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/GroupSymbolTable.kt @@ -4,13 +4,14 @@ package com.sunya.netchdf.hdf5 import com.sunya.cdm.iosp.OpenFileState import com.sunya.cdm.util.InternalLibraryApi +import io.github.oshai.kotlinlogging.KotlinLogging -/** Wraps a BTree1New, when its used to store symbol table nodes for GroupOld. */ +/** Wraps a BTree1, when its used to store symbol table nodes for GroupOld. */ @OptIn(InternalLibraryApi::class) internal class GroupSymbolTable(val btreeAddress : Long) { fun symbolTableEntries(h5: H5builder): Iterable { - val btree = BTree1(h5, btreeAddress, 0) + val btree = BTreeSymbolTable(h5, btreeAddress) val symbols = mutableListOf() btree.readGroupEntries().forEach { readSymbolTableNode(h5, it.childAddress, symbols) @@ -33,10 +34,14 @@ internal class GroupSymbolTable(val btreeAddress : Long) { if (entry.objectHeaderAddress != 0L) { // skip zeroes, probably a bug in HDF5 file format or docs, or me symbols.add(entry) } else { - println(" BAD objectHeaderAddress==0 !! $entry") + logger.warn{" BAD objectHeaderAddress==0 !! $entry"} } } } + + companion object { + private val logger = KotlinLogging.logger("GroupSymbolTable") + } } // Level 1C - Symbol Table Entry @@ -101,4 +106,68 @@ internal data class SymbolTableEntry( init { require(dataSize == 32 || dataSize == 40) // sanity check } -} \ No newline at end of file +} + +internal class BTreeSymbolTable( + val h5: H5builder, + val rootNodeAddress: Long, +) { + fun readGroupEntries(): Iterator { + val root = Node(rootNodeAddress, null) + return if (root.level == 0) { + root.groupEntries.iterator() + } else { + val result = mutableListOf() + for (entry in root.groupEntries) { + readAllEntries(entry, root, result) + } + result.iterator() + } + } + + private fun readAllEntries(entry: GroupEntry, parent: Node, list: MutableList) { + val node = Node(entry.childAddress, parent) + if (node.level == 0) { + list.addAll(node.groupEntries) + } else { + for (nested in node.groupEntries) { + readAllEntries(nested, node, list) + } + } + } + + // here both internal and leaf are the same structure + // Btree nodes Level 1A1 - Version 1 B-trees + inner class Node(val address: Long, val parent: Node?) { + val level: Int + val nentries: Int + private val leftAddress: Long + private val rightAddress: Long + + // type 0 + val groupEntries = mutableListOf() + + init { + val state = OpenFileState(h5.getFileOffset(address), false) + val magic: String = h5.raf.readString(state, 4) + check(magic == "TREE") { "BTreeSymbolTable doesnt start with TREE" } + + val type: Int = h5.raf.readByte(state).toInt() + check(type == 0) { "BTreeSymbolTable must be node type 0" } + + level = h5.raf.readByte(state).toInt() // leaf nodes are level 0 + nentries = h5.raf.readShort(state).toInt() // number of children to which this node points + leftAddress = h5.readOffset(state) + rightAddress = h5.readOffset(state) + + repeat (nentries) { + val key = h5.readLength(state) // 4 or 8 bytes + val address = h5.readOffset(state) // 4 or 8 bytes + if (address > 0) groupEntries.add(GroupEntry(key, address)) + } + } + } + + /** @param key the byte offset into the local heap for the first object name in the subtree which that key describes. */ + data class GroupEntry(val key: Long, val childAddress: Long) +} diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5builder.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5builder.kt index 45cafd5b..5d3bb96f 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5builder.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5builder.kt @@ -11,6 +11,7 @@ import com.fleeksoft.charset.Charset import com.fleeksoft.charset.Charsets import com.sunya.cdm.array.makeString import com.sunya.cdm.util.InternalLibraryApi +import io.github.oshai.kotlinlogging.KotlinLogging private const val debugStart = false private const val debugSuperblock = false @@ -384,6 +385,55 @@ class H5builder( return size } + //////////////////////////////////////////////////////////////////////////////// + fun convertReferences(gb : Group.Builder) { + val refAtts = gb.attributes.filter{ it.datatype == Datatype.REFERENCE } + refAtts.forEach { att -> + val convertAtt = convertReferenceAttribute(att) + if (convertAtt != null) { + gb.addAttribute(convertAtt) + } + gb.attributes.remove(att) + } + + gb.variables.forEach{ vb -> + val refAtts = vb.attributes.filter{ it.datatype == Datatype.REFERENCE} + refAtts.forEach { att -> + val convertAtt = convertReferenceAttribute(att) + if (convertAtt != null) { + if (att.name == HDF5_DIMENSION_LIST) { + vb.dimNames = convertAtt.values as List + } else { + vb.addAttribute(convertAtt) + } + } + vb.attributes.remove(att) + } + } + gb.groups.forEach{ convertReferences(it) } + } + + fun convertReferenceAttribute(att : Attribute<*>) : Attribute<*>? { + val svalues = mutableListOf() + att.values.forEach { + val dsetId = it as Long + val pair = datasetMap[dsetId] + if (pair == null) { + logger.warn {"H5 cant find dataset reference for att $att in file ${raf.location()}"} + return null + } + val (gb, vb) = pair + val name = vb.fullname(gb) + svalues.add(name) + } + return Attribute(att.name, Datatype.STRING, svalues) + } + + fun openFileExtended(): OpenFileExtended { + val raf: OpenFileIF = OkioFile(this.raf.location()) + return OpenFileExtended(raf, this.isLengthLong, this.isOffsetLong, this.superblockStart) + } + companion object { // special attribute names in HDF5 const val HDF5_CLASS = "CLASS" @@ -426,50 +476,7 @@ class H5builder( * 3) all variables' dimensions have a dimension scale */ private const val KNOWN_FILTERS = 3 - } - - //////////////////////////////////////////////////////////////////////////////// - fun convertReferences(gb : Group.Builder) { - val refAtts = gb.attributes.filter{ it.datatype == Datatype.REFERENCE } - refAtts.forEach { att -> - val convertAtt = convertReferenceAttribute(att) - if (convertAtt != null) { - gb.addAttribute(convertAtt) - } - gb.attributes.remove(att) - } - - gb.variables.forEach{ vb -> - val refAtts = vb.attributes.filter{ it.datatype == Datatype.REFERENCE} - refAtts.forEach { att -> - val convertAtt = convertReferenceAttribute(att) - if (convertAtt != null) { - if (att.name == HDF5_DIMENSION_LIST) { - vb.dimNames = convertAtt.values as List - } else { - vb.addAttribute(convertAtt) - } - } - vb.attributes.remove(att) - } - } - gb.groups.forEach{ convertReferences(it) } - } - fun convertReferenceAttribute(att : Attribute<*>) : Attribute<*>? { - val svalues = mutableListOf() - att.values.forEach { - val dsetId = it as Long - val pair = datasetMap[dsetId] - if (pair == null) { - println("H5 cant find dataset reference for att $att") - return null - } - val (gb, vb) = pair - val name = vb.fullname(gb) - svalues.add(name) - } - return Attribute(att.name, Datatype.STRING, svalues) + val logger = KotlinLogging.logger("H5builder") } - } \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5readConcurrent.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkConcurrent.kt similarity index 79% rename from core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5readConcurrent.kt rename to core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkConcurrent.kt index 3f2b6ce8..ea56b658 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5readConcurrent.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkConcurrent.kt @@ -8,8 +8,6 @@ import com.sunya.cdm.api.Variable import com.sunya.cdm.api.computeSize import com.sunya.cdm.api.toIntArray import com.sunya.cdm.api.toLongArray -import com.sunya.cdm.iosp.OkioFile -import com.sunya.cdm.iosp.OpenFileIF import com.sunya.cdm.iosp.OpenFileState import com.sunya.cdm.layout.IndexSpace import com.sunya.cdm.layout.Tiling @@ -25,13 +23,15 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.yield -class H5readConcurrent(val h5file: Hdf5File, val v2: Variable<*>) { +class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) { val h5 = h5file.header + val rafext: OpenFileExtended = h5.openFileExtended() + val varShape = v2.shape val chunkShape: IntArray val tiling: Tiling val nchunks: Long - internal val rootNode: BTree1ext.BTreeNode + internal val rootNode: BTree1data.BTreeNode val rootAddress: Long init { @@ -42,38 +42,38 @@ class H5readConcurrent(val h5file: Hdf5File, val v2: Variable<*>) { chunkShape = mdl.chunkDims tiling = Tiling(varShape, chunkShape.toLongArray()) nchunks = tiling.tileShape.computeSize() - val h5 = h5file.header - - val rafext: OpenFileExtended = OpenFileExtended( - h5.raf, - h5.isLengthLong, h5.isOffsetLong, h5.superblockStart, - ) - val bTreeExt = BTree1ext(rafext, mdl.btreeAddress, 1, varShape, chunkShape.toLongArray()) + // its not obvious you actually need a seperate raf + val bTreeExt = BTree1data(rafext, mdl.btreeAddress, varShape, chunkShape.toLongArray()) rootNode = bTreeExt.rootNode() rootAddress = mdl.btreeAddress } - fun readChunks(nthreads: Int, lamda: (ArraySection<*>) -> Unit) { + // TODO section: SectionPartial + fun readChunks(nthreads: Int, lamda: (ArraySection<*>) -> Unit, done: () -> Unit) { runBlocking { val jobs = mutableListOf() - + val workers = mutableListOf() val chunkProducer = produceChunks(rootNode.asSequence()) repeat(nthreads) { - val worker = Worker(h5file.filename) + val worker = Worker() jobs.add( launchJob(worker, chunkProducer, lamda)) + workers.add(worker) } // wait for all jobs to be done, then close everything joinAll(*jobs.toTypedArray()) + workers.forEach { it.rafext.close() } } + rafext.close() + done() } private var count = 0 private fun CoroutineScope.produceChunks(producer: Sequence>): ReceiveChannel> = produce { - for (datatChunk in producer) { - send(datatChunk) + for (dataChunk in producer) { + send(dataChunk) yield() count++ } @@ -87,20 +87,16 @@ class H5readConcurrent(val h5file: Hdf5File, val v2: Variable<*>) { ) = launch(Dispatchers.Default) { for (pair: Pair in input) { val arraySection = worker.work(pair.second) - lamda(arraySection) + if (arraySection != null) lamda(arraySection) yield() } } - private inner class Worker(filename: String) { - private val raf: OpenFileIF = OkioFile(filename) - private val rafext: OpenFileExtended = OpenFileExtended( - raf, - h5.isLengthLong, h5.isOffsetLong, h5.superblockStart, - ) + private inner class Worker() { + val rafext: OpenFileExtended = h5.openFileExtended() // here we need a seperate raf // a thread-safe accessor of the btree - private val btree1 = BTree1ext(rafext, rootAddress, 1, varShape, chunkShape.toLongArray()) + // private val btree1 = BTree1data(rafext, rootAddress, varShape, chunkShape.toLongArray()) val vinfo: DataContainerVariable = v2.spObject as DataContainerVariable val h5type: H5TypeInfo @@ -119,9 +115,10 @@ class H5readConcurrent(val h5file: Hdf5File, val v2: Variable<*>) { state = OpenFileState(0L, h5type.isBE) } - fun work(dataChunk : DataChunkIF) : ArraySection<*> { - val dataSpace = IndexSpace(v2.rank, dataChunk.offsets(), vinfo.storageDims) + fun work(dataChunk : DataChunkIF) : ArraySection<*>? { + // TODO check if intersect with wantSection before reading in data + val dataSpace = IndexSpace(v2.rank, dataChunk.offsets(), vinfo.storageDims) val ba = if (dataChunk.isMissing()) { if (debugChunking) println(" missing ${dataChunk.show(tiling)}") val sizeBytes = dataSpace.totalElements * elemSize @@ -132,7 +129,7 @@ class H5readConcurrent(val h5file: Hdf5File, val v2: Variable<*>) { } else { if (debugChunking) println(" chunkIterator=${dataChunk.show(tiling)}") state.pos = dataChunk.childAddress() - val rawdata = h5.raf.readByteArray(state, dataChunk.chunkSize()) + val rawdata = rafext.readByteArray(state, dataChunk.chunkSize()) if (dataChunk.filterMask() == null) rawdata else filters.apply(rawdata, dataChunk.filterMask()!!) } diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkIterator.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkIterator.kt index 7a790a82..03571e81 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkIterator.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkIterator.kt @@ -2,7 +2,6 @@ package com.sunya.netchdf.hdf5 import com.sunya.cdm.api.* import com.sunya.cdm.array.ArrayTyped -import com.sunya.cdm.iosp.OpenFileIF import com.sunya.cdm.iosp.OpenFileState import com.sunya.cdm.layout.Chunker import com.sunya.cdm.layout.IndexSpace @@ -89,25 +88,3 @@ internal class H5chunkIterator(val h5 : H5builder, val v2: Variable, val w } } -// for H5readConcurrent -class OpenFileExtended(val delegate: OpenFileIF, - val isLengthLong: Boolean, - val isOffsetLong: Boolean, - val startingOffset: Long, ) : OpenFileIF by delegate { - - fun readLength(state : OpenFileState): Long { - return if (isLengthLong) delegate.readLong(state) else delegate.readInt(state).toLong() - } - - fun readOffset(state : OpenFileState): Long { - return if (isOffsetLong) delegate.readLong(state) else delegate.readInt(state).toLong() - } - - fun getFileOffset(address: Long): Long { - return startingOffset + address - } - - fun readAddress(state : OpenFileState): Long { - return getFileOffset(readOffset(state)) - } -} diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkReader.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkReader.kt index cfe66f5a..4f8d8bba 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkReader.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkReader.kt @@ -59,6 +59,7 @@ internal class H5chunkReader(val h5 : H5builder) { } } + // TODO can we use concurrent reading ?? internal fun readBtreeVer1(v2: Variable, wantSection: Section): ArrayTyped { val vinfo = v2.spObject as DataContainerVariable val h5type = vinfo.h5type diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5group.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5group.kt index 6e65ccf6..9a2cb9d4 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5group.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5group.kt @@ -27,7 +27,7 @@ internal fun H5builder.readH5Group(facade: DataObjectFacade): H5GroupBuilder? { facade.group = hashGroups[groupMessage.btreeAddress] if (facade.group != null && facade.parent != null) { if (facade.parent.isChildOf(facade.group!!)) { - println("Remove hard link to group that creates a loop = ${facade.group!!.name}") + H5builder.logger.warn{"Remove hard link to group that creates a loop = ${facade.group!!.name} in ${this.raf.location()}"} facade.group = null return null } @@ -59,11 +59,11 @@ internal fun H5builder.readGroupNew( check(btreeAddress >= 0) { "no valid btree for GroupNew with Fractal Heap" } // read in btree and all entries - val btree2j = BTree2j(this, parent.name, btreeAddress) + val btree2j = BTree2data(this, parent.name, btreeAddress) for (record in btree2j.records) { val heapId: ByteArray = when (btree2j.btreeType) { - 5 -> (record as BTree2j.Record5).heapId - 6 -> (record as BTree2j.Record6).heapId + 5 -> (record as BTree2data.Record5).heapId + 6 -> (record as BTree2data.Record6).heapId else -> throw RuntimeException("btree2 type ${btree2j.btreeType} mot supported") } @@ -138,7 +138,7 @@ internal fun H5builder.replaceSymbolicLinks(groupb: H5GroupBuilder) { } else if (dof.linkName != null) { // symbolic links val link: DataObjectFacade? = this.symlinkMap[dof.linkName] if (link == null) { - println(" WARNING Didnt find symbolic link=${dof.linkName} from ${dof.name}") + H5builder.logger.warn{"Didnt find symbolic link=${dof.linkName} from ${dof.name}"} objList.removeAt(count) continue } diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt index 1cd01aa1..b9a2812c 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt @@ -32,6 +32,9 @@ class Hdf5File(val filename : String, strict : Boolean = false) : Netchdf { override val size : Long get() = raf.size() fun layoutName(v: Variable<*>): String { + if (v.spObject is DataContainerAttribute) { + return("DataContainerAttribute") + } val vinfo = (v.spObject as DataContainerVariable) return if (vinfo.mdl != null) vinfo.mdl.javaClass.simpleName else "none" } @@ -94,7 +97,7 @@ class Hdf5File(val filename : String, strict : Boolean = false) : Netchdf { } else if (vinfo.mdl is DataLayoutBtreeVer2) { // H5chunkReader(header).readBtreeVer2j(v2, wantSection) - val index = BTree2j(header, v2.name, vinfo.dataPos, vinfo.storageDims) + val index = BTree2data(header, v2.name, vinfo.dataPos, vinfo.storageDims) H5chunkReader(header).readChunkedData(v2, wantSection, index.chunkIterator()) } else { @@ -106,7 +109,6 @@ class Hdf5File(val filename : String, strict : Boolean = false) : Netchdf { } } - // for Netchdf.readChunksConcurrent override fun chunkIterator(v2: Variable, section: SectionPartial?, maxElements : Int?) : Iterator> { if (v2.nelems == 0L) { return listOf>().iterator() @@ -116,11 +118,11 @@ class Hdf5File(val filename : String, strict : Boolean = false) : Netchdf { if (vinfo.onlyFillValue) { // fill value only, no data val tba = TypedByteArray(v2.datatype, vinfo.fillValue, 0, isBE = vinfo.h5type.isBE) - val single = ArraySection( ArraySingle(wantSection.shape.toIntArray(), v2.datatype, tba.get(0)), wantSection) + val single = ArraySection(ArraySingle(wantSection.shape.toIntArray(), v2.datatype, tba.get(0)), wantSection) return listOf(single).iterator() } - // TODO + // TODO can we use concurrent reading ?? return if (vinfo.mdl is DataLayoutBTreeVer1) { H5chunkIterator(header, v2, wantSection) } else { @@ -128,4 +130,36 @@ class Hdf5File(val filename : String, strict : Boolean = false) : Netchdf { } } + override fun readChunksConcurrent(v2: Variable, lamda : (ArraySection<*>) -> Unit, done : () -> Unit, nthreads: Int?) { + val reader = H5chunkConcurrent(this, v2) + // TODO default nthreads ?? + reader.readChunks(nthreads ?: 20, lamda, done = { done() }) + } + + /* + class Btree1chunkIterator(val hdfFile: Hdf5File, val v2: Variable<*>, val wantSection: SectionPartial?): AbstractIterator>() { + val reader = H5readConcurrent(hdfFile, v2) + val nthreads = 20 + var currElement: ArraySection<*>? = null // could be a queue ? or a stack with a limit + var done: Boolean = false + + init { + reader.readChunks(nthreads, + lamda = { it -> + if (currElement == null) currElement = it + }, + done = { done = true } + ) + } + + override fun computeNext() { + if (currElement != null) { + setNext( currElement!! ) + currElement = null + } else { + wait() + } + if (done) done() + } + } */ } \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/MessageHeader.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/MessageHeader.kt index 521a4e1d..b210ff8a 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/MessageHeader.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/MessageHeader.kt @@ -884,14 +884,14 @@ private fun H5builder.readAttributesFromInfoMessage2( val btreeAddress: Long = attributeOrderBtreeAddress ?: attributeNameBtreeAddress if (btreeAddress < 0 || fractalHeapAddress < 0) return emptyList() - val btree2j = BTree2j(this, "AttributeInfoMessage", btreeAddress) + val btree2j = BTree2data(this, "AttributeInfoMessage", btreeAddress) val fractalHeapj = FractalHeap(this, "AttributeInfoMessage", fractalHeapAddress) val attMessages = mutableListOf() for (record in btree2j.records) { val heapId: ByteArray = when (btree2j.btreeType) { - 8 -> (record as BTree2j.Record8).heapId - 9 -> (record as BTree2j.Record9).heapId + 8 -> (record as BTree2data.Record8).heapId + 9 -> (record as BTree2data.Record9).heapId else -> continue } diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/OpenFileExtended.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/OpenFileExtended.kt new file mode 100644 index 00000000..95ac5b41 --- /dev/null +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/OpenFileExtended.kt @@ -0,0 +1,29 @@ +package com.sunya.netchdf.hdf5 + +import com.sunya.cdm.iosp.OpenFileIF +import com.sunya.cdm.iosp.OpenFileState + +// for H5readConcurrent +class OpenFileExtended(val delegate: OpenFileIF, + val isLengthLong: Boolean, + val isOffsetLong: Boolean, + val startingOffset: Long, ) : OpenFileIF by delegate { + + fun readLength(state : OpenFileState): Long { + return if (isLengthLong) delegate.readLong(state) else delegate.readInt(state).toLong() + } + + fun readOffset(state : OpenFileState): Long { + return if (isOffsetLong) delegate.readLong(state) else delegate.readInt(state).toLong() + } + + fun getFileOffset(address: Long): Long { + return startingOffset + address + } + + fun readAddress(state : OpenFileState): Long { + return getFileOffset(readOffset(state)) + } + + override fun close() = delegate.close() +} \ No newline at end of file diff --git a/core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1extTest.kt b/core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1extTest.kt index 494e6267..6c185395 100644 --- a/core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1extTest.kt +++ b/core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1extTest.kt @@ -4,8 +4,6 @@ package com.sunya.netchdf.hdf5 import com.sunya.cdm.api.ArraySection import com.sunya.cdm.api.toLongArray -import com.sunya.cdm.iosp.OkioFile -import com.sunya.cdm.iosp.OpenFileIF import com.sunya.cdm.util.InternalLibraryApi import com.sunya.netchdf.testutil.nano import com.sunya.netchdf.testutil.testData @@ -29,11 +27,7 @@ class Btree1extTest { val myvar = myfile.rootGroup().allVariables().find { it.fullname() == varname } ?: throw RuntimeException("cant find $varname") - val raf: OpenFileIF = OkioFile(filename) - val rafext: OpenFileExtended = OpenFileExtended( - raf, - h5.isLengthLong, h5.isOffsetLong, h5.superblockStart, - ) + val rafext: OpenFileExtended = h5.openFileExtended() val varShape = myvar.shape @@ -44,7 +38,7 @@ class Btree1extTest { val chunkShape = mdl.chunkDims // a thread-safe accessor of the btree - val bTreeExt = BTree1ext(rafext, mdl.btreeAddress, 1, varShape, chunkShape.toLongArray()) + val bTreeExt = BTree1data(rafext, mdl.btreeAddress, varShape, chunkShape.toLongArray()) val rootNode = bTreeExt.rootNode() rootNode.asSequence().forEach { (key, value) -> println("Key: ${key}, Value: $value") } @@ -65,10 +59,11 @@ class Btree1extTest { for (nthreads in listOf(1, 2, 4, 8, 10, 16, 20, 24, 32, 40, 48)) { val time = measureNanoTime { - val reader = H5readConcurrent(myfile, myvar) - reader.readChunks(nthreads) { asect: ArraySection<*> -> + // fun readChunks(nthreads: Int, lamda: (ArraySection<*>) -> Unit, done: () -> Unit) { + val reader = H5chunkConcurrent(myfile, myvar) + reader.readChunks(nthreads, { asect: ArraySection<*> -> // println(" section = ${asect.section}") - } + }, { }, ) } println("$nthreads, ${time * nano}") } diff --git a/testclibs/src/main/kotlin/com/sunya/netchdf/hdf4Clib/H4ClibFile.kt b/testclibs/src/main/kotlin/com/sunya/netchdf/hdf4Clib/H4ClibFile.kt index 295e89c1..c08a3797 100644 --- a/testclibs/src/main/kotlin/com/sunya/netchdf/hdf4Clib/H4ClibFile.kt +++ b/testclibs/src/main/kotlin/com/sunya/netchdf/hdf4Clib/H4ClibFile.kt @@ -47,8 +47,8 @@ class Hdf4ClibFile(val filename: String) : Netchdf { } // LOOK SDreadchunk ?? - override fun readArrayData(v2: Variable, section: SectionPartial?): ArrayTyped { - return readArrayData(v2, SectionPartial.fill(section, v2.shape)) + override fun readArrayData(v2: Variable, wantSection: SectionPartial?): ArrayTyped { + return readArrayData(v2, SectionPartial.fill(wantSection, v2.shape)) } internal fun readArrayData(v2: Variable, filled: Section): ArrayTyped { @@ -81,12 +81,12 @@ class Hdf4ClibFile(val filename: String) : Netchdf { throw RuntimeException("cant read ${v2.name}") } - override fun chunkIterator(v2: Variable, section: SectionPartial?, maxElements : Int?): Iterator> { + override fun chunkIterator(v2: Variable, wantSection: SectionPartial?, maxElements : Int?): Iterator> { if (v2.nelems == 0L) { return listOf>().iterator() } - val filled = SectionPartial.fill(section, v2.shape) + val filled = SectionPartial.fill(wantSection, v2.shape) return HCmaxIterator(v2, filled, maxElements ?: 100_000) } diff --git a/testclibs/src/main/kotlin/com/sunya/netchdf/hdf5Clib/H5ClibFile.kt b/testclibs/src/main/kotlin/com/sunya/netchdf/hdf5Clib/H5ClibFile.kt index e49e4ea4..d98b84d5 100644 --- a/testclibs/src/main/kotlin/com/sunya/netchdf/hdf5Clib/H5ClibFile.kt +++ b/testclibs/src/main/kotlin/com/sunya/netchdf/hdf5Clib/H5ClibFile.kt @@ -55,8 +55,8 @@ class Hdf5ClibFile(val filename: String) : Netchdf { val status = H5Fclose(header.file_id) } - override fun readArrayData(v2: Variable, section: SectionPartial?): ArrayTyped { - return readArrayData(v2, SectionPartial.fill(section, v2.shape)) + override fun readArrayData(v2: Variable, wantSection: SectionPartial?): ArrayTyped { + return readArrayData(v2, SectionPartial.fill(wantSection, v2.shape)) } internal fun readArrayData(v2: Variable, fillSection: Section): ArrayTyped { @@ -140,8 +140,8 @@ class Hdf5ClibFile(val filename: String) : Netchdf { } } - override fun chunkIterator(v2: Variable, section: SectionPartial?, maxElements : Int?): Iterator> { - return H5CmaxIterator(v2, section, maxElements ?: 100_000) + override fun chunkIterator(v2: Variable, wantSection: SectionPartial?, maxElements : Int?): Iterator> { + return H5CmaxIterator(v2, wantSection, maxElements ?: 100_000) } private inner class H5CmaxIterator(val v2: Variable, section : SectionPartial?, maxElems: Int) : AbstractIterator>() { diff --git a/testclibs/src/main/kotlin/com/sunya/netchdf/netcdfClib/NClibFile.kt b/testclibs/src/main/kotlin/com/sunya/netchdf/netcdfClib/NClibFile.kt index df6f149c..0926af54 100644 --- a/testclibs/src/main/kotlin/com/sunya/netchdf/netcdfClib/NClibFile.kt +++ b/testclibs/src/main/kotlin/com/sunya/netchdf/netcdfClib/NClibFile.kt @@ -111,8 +111,8 @@ class NClibFile(val filename: String) : Netchdf { // NOOP } - override fun readArrayData(v2: Variable, section: SectionPartial?): ArrayTyped { - return readArrayData(v2, SectionPartial.fill(section, v2.shape)) + override fun readArrayData(v2: Variable, wantSection: SectionPartial?): ArrayTyped { + return readArrayData(v2, SectionPartial.fill(wantSection, v2.shape)) } internal fun readArrayData(v2: Variable, wantSection: Section): ArrayTyped { @@ -348,8 +348,8 @@ class NClibFile(val filename: String) : Netchdf { } } - override fun chunkIterator(v2: Variable, section: SectionPartial?, maxElements : Int?): Iterator> { - val filled = SectionPartial.fill(section, v2.shape) + override fun chunkIterator(v2: Variable, wantSection: SectionPartial?, maxElements : Int?): Iterator> { + val filled = SectionPartial.fill(wantSection, v2.shape) return NCmaxIterator(v2, filled, maxElements ?: 100_000) } diff --git a/testclibs/src/test/kotlin/com/sunya/netchdf/NetchdfClibTest.kt b/testclibs/src/test/kotlin/com/sunya/netchdf/NetchdfClibTest.kt index 4f2c8ade..ce8a7538 100644 --- a/testclibs/src/test/kotlin/com/sunya/netchdf/NetchdfClibTest.kt +++ b/testclibs/src/test/kotlin/com/sunya/netchdf/NetchdfClibTest.kt @@ -7,7 +7,6 @@ import com.sunya.netchdf.hdf4Clib.Hdf4ClibFile import com.sunya.netchdf.hdf5Clib.Hdf5ClibFile import com.sunya.netchdf.netcdfClib.NClibFile import com.sunya.netchdf.testfiles.* -import com.sunya.netchdf.testutils.AtomicDouble import com.sunya.netchdf.testutils.Stats import com.sunya.netchdf.testutils.testData import kotlin.test.* @@ -734,32 +733,29 @@ fun compareIterateNetchdf(myfile: Netchdf, cfile: Netchdf, varname: String?, sec } fun compareOneVarIterate(myvar: Variable<*>, myfile: Netchdf, cvar : Variable<*>, cfile: Netchdf, section: SectionPartial?) { - val sum = AtomicDouble(0.0) - sum.set(0.0) + var sum1 = 0.0 var countChunks = 0 val time1 = measureNanoTime { val chunkIter = myfile.chunkIterator(myvar) for (pair in chunkIter) { if (debugIter) println(" compareOneVarIterate myvar=${myvar.name} ${pair.section} = ${pair.array.shape.contentToString()}") - sumValues(pair.array, sum) + sum1 += sumValues(pair.array) countChunks++ } } Stats.of("netchdf", myfile.location(), "chunk").accum(time1, countChunks) - val sum1 = sum.get() - sum.set(0.0) + var sum2 = 0.0 countChunks = 0 val time2 = measureNanoTime { val chunkIter = cfile.chunkIterator(cvar) for (pair in chunkIter) { if (debugIter) println(" compareOneVarIterate cvar=${cvar.name} ${pair.section} = ${pair.array.shape.contentToString()}") - sumValues(pair.array, sum) + sum2 += sumValues(pair.array) countChunks++ } } Stats.of("nclib", cfile.location(), "chunk").accum(time2, countChunks) - val sum2 = sum.get() if (sum1.isFinite() && sum2.isFinite()) { assertTrue(nearlyEquals(sum1, sum2), "$sum1 != $sum2 sum2") @@ -793,9 +789,10 @@ fun compareOneVarIterate(myvar: Variable<*>, myfile: Netchdf, cvar : Variable<*> } } */ -fun sumValues(array : ArrayTyped<*>, sum : AtomicDouble) { +private fun sumValues(array : ArrayTyped<*>): Double { + var result = 0.0 if (array is ArraySingle || array is ArrayEmpty) { - return // test fillValue the same ?? + return result // test fillValue the same ?? } if (array.datatype.isNumber) { @@ -803,7 +800,7 @@ fun sumValues(array : ArrayTyped<*>, sum : AtomicDouble) { val number = (value as Number) val numberd: Double = number.toDouble() if (numberd.isFinite()) { - sum.getAndAdd(numberd) + result += numberd } } } else if (array.datatype.isIntegral) { @@ -818,10 +815,11 @@ fun sumValues(array : ArrayTyped<*>, sum : AtomicDouble) { val number = (useValue as Number) val numberd: Double = number.toDouble() if (numberd.isFinite()) { - sum.getAndAdd(numberd) + result += numberd } } } + return result } fun countArrayDiffs(array1 : ArrayTyped<*>, array2 : ArrayTyped<*>, showDiff : Int = 0) : Int { diff --git a/testfiles/src/main/kotlin/com/sunya/netchdf/testutils/AtomicDouble.kt b/testfiles/src/main/kotlin/com/sunya/netchdf/testutils/AtomicDouble.kt index c4853964..12fb96f1 100644 --- a/testfiles/src/main/kotlin/com/sunya/netchdf/testutils/AtomicDouble.kt +++ b/testfiles/src/main/kotlin/com/sunya/netchdf/testutils/AtomicDouble.kt @@ -1,10 +1,12 @@ package com.sunya.netchdf.testutils +import kotlin.concurrent.atomics.AtomicLong import kotlin.concurrent.atomics.AtomicReference import kotlin.concurrent.atomics.ExperimentalAtomicApi +// doesnt work, dunno why ?? @OptIn(ExperimentalAtomicApi::class) -class AtomicDouble(initialValue: Double) { +class AtomicDoubleNotWorking(initialValue: Double) { private val atomicReference = AtomicReference(initialValue) fun get(): Double = atomicReference.load() @@ -13,18 +15,47 @@ class AtomicDouble(initialValue: Double) { atomicReference.store(newValue) } + fun getAndAdd(delta: Double): Double { + var ntries = 0 + while (true) { + val currentVal = atomicReference.load() + val newValue = currentVal + delta + if (atomicReference.compareAndSet(currentVal, newValue)) { + return newValue + } + ntries++ + if (ntries > 100) + println(ntries) + } + } +} + +// AtomicLong with Bit Conversions: You can store the double or float value as its bit representation +// (e.g., using Double.doubleToLongBits() and Double.longBitsToDouble()) within an AtomicLong +// and perform atomic operations on the long value. +@OptIn(ExperimentalAtomicApi::class) +class AtomicDouble(initialValue: Double) { + private val atomicReference = AtomicLong(initialValue.toBits()) + + fun get(): Double = Double.fromBits(atomicReference.load()) + + fun set(newValue: Double) { + atomicReference.store(newValue.toBits()) + } + fun getAndAdd(delta: Double): Double { while (true) { - val current = atomicReference.load() - val new = current + delta - if (atomicReference.compareAndSet(current, new)) { - return current + val currentVal = atomicReference.load() + val newDouble = Double.fromBits(currentVal) + delta + val newValue = newDouble.toBits() + if (atomicReference.compareAndSet(currentVal, newValue)) { + return newDouble } } } } -// not quite ready +// not quite ready WHY NOT ? /* import kotlinx.atomicfu.* diff --git a/testfiles/src/test/kotlin/com/sunya/netchdf/CountVersions.kt b/testfiles/src/test/kotlin/com/sunya/netchdf/CountVersions.kt index 10739aa0..22472182 100644 --- a/testfiles/src/test/kotlin/com/sunya/netchdf/CountVersions.kt +++ b/testfiles/src/test/kotlin/com/sunya/netchdf/CountVersions.kt @@ -132,6 +132,52 @@ class CountVersions { .sortedBy { (_, value) -> value } .toMap() - sorted.keys.forEach{ println("${sorted[it]} == $it } files") } + sorted.keys.forEach{ println("${sorted[it]} == $it files") } } + + @Test + fun countLayoutTypes() { + fun h5files(): Iterator { + return sequenceOf( + N4Files.Companion.files().asSequence(), + H5Files.Companion.files().asSequence(), + NetchdfExtraFiles.Companion.files(false).asSequence(), + JhdfFiles.Companion.files().asSequence(), + ) + .flatten() + .iterator() + } + + val layoutCounts = mutableMapOf() + + h5files().forEach { filename -> + try { + openNetchdfFile(filename).use { ncfile -> + if (ncfile == null) { + println("Not a netchdf file=$filename ") + } else { + if (ncfile.type() in listOf("netcdf4", "hdf")) { + val hdf5File = ncfile as Hdf5File + ncfile.rootGroup().allVariables().forEach { v -> + val layout = hdf5File.layoutName(v) + val layoutCount = layoutCounts.getOrPut(layout) { LayoutCount() } + layoutCount.count++ + layoutCount.size += v.nelems + } + } + } + } + } catch (e: Throwable) { + e.printStackTrace() + } + } + + val sorted = layoutCounts.toList() + .sortedBy { (_, value) -> value.size } + .toMap() + + sorted.keys.forEach{ println("${sorted[it]} == '$it'") } + } + + data class LayoutCount(var count: Int = 0, var size: Long = 0) } \ No newline at end of file diff --git a/testfiles/src/test/kotlin/com/sunya/netchdf/NetchdfExtra.kt b/testfiles/src/test/kotlin/com/sunya/netchdf/NetchdfExtra.kt index 1a5b08f0..4a840d6f 100644 --- a/testfiles/src/test/kotlin/com/sunya/netchdf/NetchdfExtra.kt +++ b/testfiles/src/test/kotlin/com/sunya/netchdf/NetchdfExtra.kt @@ -5,8 +5,6 @@ import kotlin.test.* import com.sunya.netchdf.testutils.Stats import com.sunya.netchdf.testutils.readNetchdfData import com.sunya.netchdf.testutils.testData -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource // Compare header using cdl(!strict) with Netchdf and NetcdfClibFile // mostly fails in handling of types. nclib doesnt pass over all the types. @@ -47,24 +45,27 @@ class NetchdfExtra { } /////////////////////////////////////////////////////// - @ParameterizedTest - @MethodSource("files") - fun checkVersion(filename: String) { - openNetchdfFile(filename).use { ncfile -> - if (ncfile == null) { - println("Not a netchdf file=$filename ") - return + + @Test + fun checkVersion() { + files().forEach { filename -> + openNetchdfFile(filename).use { ncfile -> + if (ncfile == null) { + println("Not a netchdf file=$filename ") + return + } + println("${ncfile.type()} $filename ") + val paths = versions.getOrPut(ncfile.type()) { mutableListOf() } + paths.add(filename) } - println("${ncfile.type()} $filename ") - val paths = versions.getOrPut(ncfile.type()) { mutableListOf() } - paths.add(filename) } } - @ParameterizedTest - @MethodSource("files") - fun readNetchdfDataAll(filename: String) { - readNetchdfData(filename, null) + @Test + fun readNetchdfDataAll() { + files().forEach { filename -> + readNetchdfData(filename, null) + } } } \ No newline at end of file diff --git a/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt b/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt index 025ee945..1a6f6d06 100644 --- a/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt +++ b/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt @@ -1,17 +1,19 @@ +@file:OptIn(ExperimentalAtomicApi::class) + package com.sunya.netchdf.hdf5 -import com.sunya.cdm.api.ArraySection -import com.sunya.cdm.api.Datatype -import com.sunya.cdm.api.Netchdf +import com.sunya.cdm.util.nearlyEquals import com.sunya.cdm.api.Variable -import com.sunya.cdm.api.readChunksConcurrent import com.sunya.cdm.array.ArrayTyped import com.sunya.netchdf.testfiles.H5Files import com.sunya.netchdf.testutils.AtomicDouble import com.sunya.netchdf.testutils.Stats import com.sunya.netchdf.testutils.compareChunkReading +import com.sunya.netchdf.testutils.compareChunkReadingForVar import com.sunya.netchdf.testutils.testData import kotlin.collections.iterator +import kotlin.concurrent.atomics.AtomicInt +import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.system.measureNanoTime import kotlin.test.* @@ -39,7 +41,12 @@ class H5readConcurrentTest { } @Test - fun compareChunkIterateTest() { + fun compareChunkReadingProblem() { + compareChunkReading(testData + "cdmUnitTest/formats/hdf5/HIRDLS/HIRDLS2-AFGL_b027_na.he5", "/HDFEOS/SWATHS/HIRDLS/Data_Fields/Altitude") + } + + @Test + fun compareChunkReadingTest() { files().forEach { filename -> compareChunkReading(filename, null) } @@ -49,7 +56,7 @@ class H5readConcurrentTest { fun testH5readConcurrent() { val filename = "/home/all/testdata/cdmUnitTest/formats/netcdf4/hiig_forec_20140208.nc" val varname = "salt" - Hdf5File(filename).use { myfile -> + Hdf5File(filename).use { myfile : Hdf5File -> println("${myfile.type()} $filename ${myfile.size / 1000.0 / 1000.0} Mbytes") val myvar = myfile.rootGroup().allVariables().find { it.fullname() == varname } @@ -59,10 +66,8 @@ class H5readConcurrentTest { for (nthreads in listOf(1, 2, 4, 8, 10, 16, 20, 24, 32, 40, 48)) { val time = measureNanoTime { - val reader = H5readConcurrent(myfile, myvar) - reader.readChunks(nthreads) { asect: ArraySection<*> -> - // println(" section = ${asect.section}") - } + // fun readChunksConcurrent(v2: Variable, lamda : (ArraySection<*>) -> Unit, done : () -> Unit, nthreads: Int?) { + myfile.readChunksConcurrent(myvar, lamda = { it -> println(" section = ${it.section}") }, { }, nthreads) } println("$nthreads, ${time * nano}") } @@ -80,11 +85,11 @@ fun readH5concurrent(filename: String, varname: String? = null) { if (varname != null) { val myvar = myfile.rootGroup().allVariables().find { it.fullname() == varname } ?: throw RuntimeException("cant find $varname") - countChunks += testOneVarConcurrent(myfile, myvar) + countChunks += compareChunkReadingForVar(myfile, myvar) } else { myfile.rootGroup().allVariables().forEach { it -> if (it.datatype.isNumber) { - countChunks += testOneVarConcurrent(myfile, it) + countChunks += compareChunkReadingForVar(myfile, it) } } } @@ -94,59 +99,63 @@ fun readH5concurrent(filename: String, varname: String? = null) { } } -fun testOneVarConcurrent(myFile: Netchdf, myvar: Variable<*>): Int { - val filename = myFile.location().substringAfterLast('/') - val sum = AtomicDouble(0.0) - var countChunks = 0 +/* +fun testOneVarConcurrent(hdf5: Hdf5File, myvar: Variable<*>): Int { + val filename = hdf5.location().substringAfterLast('/') + + Stats.clear() + + var sumChunkIterator = 0.0 + var countChunkIterator = 0 val time1 = measureNanoTime { - val chunkIter = myFile.chunkIterator(myvar) + val chunkIter = hdf5.chunkIterator(myvar) for (pair in chunkIter) { // println(" ${pair.section} = ${pair.array.shape.contentToString()}") - sumValues(pair.array) - countChunks++ + sumChunkIterator += sumValues(pair.array) + countChunkIterator++ } } - val sum1 = sum.get() - Stats.of("serialSum", filename, "chunk").accum(time1, countChunks) + Stats.of("chunkIterator", filename, "chunk").accum(time1, countChunkIterator) - sum.set(0.0) - val time2 = measureNanoTime { - myFile.readChunksConcurrent(myvar, null) { sumValues(it.array) } - } - val sum2 = sum.get() - Stats.of("concurrentSum", filename, "chunk").accum(time2, countChunks) - - sum.set(0.0) + var sumArrayRead = 0.0 val time3 = measureNanoTime { - val arrayData = myFile.readArrayData(myvar, null) - sumValues(arrayData) + val arrayData = hdf5.readArrayData(myvar, null) + sumArrayRead += sumValues(arrayData) + } + Stats.of("readArrayData", filename, "chunk").accum(time3, 1) + assertTrue(nearlyEquals(sumChunkIterator, sumArrayRead), "sumChunkIterator $sumChunkIterator != $sumArrayRead sumArrayRead") + + val counta = AtomicInt(0) + val suma = AtomicDouble(0.0) + val layout = hdf5.layoutName(myvar) + if (layout == "DataLayoutBTreeVer1") { + val time2 = measureNanoTime { + hdf5.readChunksConcurrent(myvar, { it -> + suma.getAndAdd(sumValues(it.array)) + counta.fetchAndAdd(1) + }, done = { }) + } + Stats.of("concurrentSum", filename, "chunk").accum(time2, counta.load()) + val sumConcurrentRead = suma.get() + assertTrue(nearlyEquals(sumConcurrentRead, sumArrayRead), "sumConcurrentRead $sumConcurrentRead != $sumArrayRead sumArrayRead") } - val sum3 = sum.get() - Stats.of("regularSum", filename, "chunk").accum(time3, countChunks) - - println(" serialSum $time1") - println("concurrentSum $time2") - println(" regularSum $time3") Stats.show() - /* if (sum1.isFinite() && sum2.isFinite() && sum3.isFinite()) { - assertTrue(nearlyEquals(sum1, sum2), "$sum1 != $sum2 sum2") - assertTrue(nearlyEquals(sum1, sum3), "$sum1 != $sum3 sum3") + return countChunkIterator } - */ - return countChunks -} +fun sumValues(array: ArrayTyped<*>): Double { + var result = 0.0 + if (!array.datatype.isNumber) return result -var sum = AtomicDouble(0.0) -fun sumValues(array: ArrayTyped<*>) { - if (!array.datatype.isNumber) return - for (value in array) { + for (value in array) { val number = (value as Number) val numberd: Double = number.toDouble() if (numberd.isFinite()) { - sum.getAndAdd(numberd) + result += numberd } } + return result } +*/ \ No newline at end of file diff --git a/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/ReadChunkIterator.kt b/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/CompareChunkReading.kt similarity index 57% rename from testfiles/src/test/kotlin/com/sunya/netchdf/testutils/ReadChunkIterator.kt rename to testfiles/src/test/kotlin/com/sunya/netchdf/testutils/CompareChunkReading.kt index 476b9c06..e182d067 100644 --- a/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/ReadChunkIterator.kt +++ b/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/CompareChunkReading.kt @@ -1,17 +1,22 @@ +@file:OptIn(ExperimentalAtomicApi::class) + package com.sunya.netchdf.testutils import com.sunya.cdm.api.* import com.sunya.cdm.array.* import com.sunya.cdm.util.nearlyEquals +import com.sunya.netchdf.hdf5.Hdf5File import com.sunya.netchdf.openNetchdfFile import kotlin.collections.iterator +import kotlin.concurrent.atomics.AtomicInt +import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.system.measureNanoTime import kotlin.test.assertTrue ////////////////////////////////////////////////////////////////////////////////////// // compare reading data regular and through the chunkIterate API -fun compareChunkReading(filename: String, varname : String? = null, compare : Boolean = true) { +fun compareChunkReading(filename: String, varname : String? = null) { openNetchdfFile(filename).use { myfile -> if (myfile == null) { println("*** not a netchdf file = $filename") @@ -21,10 +26,10 @@ fun compareChunkReading(filename: String, varname : String? = null, compare : Bo var countChunks = 0 if (varname != null) { val myvar = myfile.rootGroup().allVariables().find { it.fullname() == varname } ?: throw RuntimeException("cant find $varname") - countChunks += compareChunkReadingForVar(myfile, myvar, compare) + countChunks += compareChunkReadingForVar(myfile, myvar) } else { myfile.rootGroup().allVariables().forEach { it -> - countChunks += compareChunkReadingForVar(myfile, it, compare) + countChunks += compareChunkReadingForVar(myfile, it) } } if (countChunks > 0) { @@ -33,8 +38,60 @@ fun compareChunkReading(filename: String, varname : String? = null, compare : Bo } } -// compare readArrayData with chunkIterator -private fun compareChunkReadingForVar(myFile: Netchdf, myvar: Variable<*>, compare : Boolean = true) : Int { +fun compareChunkReadingForVar(myfile: Netchdf, myvar: Variable<*>): Int { + val filename = myfile.location().substringAfterLast('/') + + Stats.clear() + + var sumChunkIterator = 0.0 + var countChunkIterator = 0 + val time1 = measureNanoTime { + val chunkIter = myfile.chunkIterator(myvar) + for (pair in chunkIter) { + // println(" ${pair.section} = ${pair.array.shape.contentToString()}") + sumChunkIterator += sumValues(pair.array) + countChunkIterator++ + } + } + Stats.of("chunkIterator", filename, "chunk").accum(time1, countChunkIterator) + + var sumArrayRead = 0.0 + val time3 = measureNanoTime { + val arrayData = myfile.readArrayData(myvar, null) + sumArrayRead += sumValues(arrayData) + } + Stats.of("readArrayData", filename, "chunk").accum(time3, 1) + assertTrue(nearlyEquals(sumChunkIterator, sumArrayRead), "sumChunkIterator $sumChunkIterator != $sumArrayRead sumArrayRead") + + if (myfile is Hdf5File) { + val hdf5 = myfile as Hdf5File + val counta = AtomicInt(0) + val suma = AtomicDouble(0.0) + val layout = hdf5.layoutName(myvar) + if (layout == "DataLayoutBTreeVer1") { + val time2 = measureNanoTime { + hdf5.readChunksConcurrent(myvar, { it -> + suma.getAndAdd(sumValues(it.array)) + counta.fetchAndAdd(1) + }, done = { }) + } + Stats.of("concurrentSum", filename, "chunk").accum(time2, counta.load()) + val sumConcurrentRead = suma.get() + assertTrue( + nearlyEquals(sumConcurrentRead, sumArrayRead), + "sumConcurrentRead $sumConcurrentRead != $sumArrayRead sumArrayRead" + ) + } + } + + Stats.show() + + return countChunkIterator +} + +/* compare readArrayData with chunkIterator +private fun compareChunkReadingForVar(myFile: Netchdf, myvar: Variable<*>, compare : Boolean = true): Int { + println(" compareChunkReadingForVar ${myvar.nameAndShape()}") val filename = myFile.location().substringAfterLast('/') val varBytes = myvar.nelems if (varBytes >= maxBytes) { @@ -42,32 +99,45 @@ private fun compareChunkReadingForVar(myFile: Netchdf, myvar: Variable<*>, compa return 0 } - val sumReadArray = AtomicDouble(0.0) - val sumArrayData = if (compare) { + var sumArrayData = 0.0 + if (compare) { val time3 = measureNanoTime { val arrayData = myFile.readArrayData(myvar, null) - sumValues(arrayData, sumReadArray) + sumArrayData = sumValues(arrayData) } Stats.of("readArrayData", filename, "chunk").accum(time3, 1) - sumReadArray.get() - } else 0.0 - - val sum2 = AtomicDouble(0.0) - var countChunks = 0 - val time1 = measureNanoTime { - myFile.readChunksConcurrent(myvar, null) { - countChunks++ - sumValues(it.array, sum2) } } - val sumChunkIterator = sum2.get() - if (compare) Stats.of("chunkIterator", filename, "chunk").accum(time1, countChunks) - if (compare && sumChunkIterator.isFinite() && sumArrayData.isFinite()) { - // println(" sumChunkIterator = $sumChunkIterator for ${myvar.nameAndShape()}") - assertTrue(nearlyEquals(sumArrayData, sumChunkIterator), "chunkIterator $sumChunkIterator != $sumArrayData sumArrayData") + var nchunks = 0 + + if (myFile is Hdf5File) { + val layout = myFile.layoutName(myvar) + if (layout == "DataLayoutBTreeVer1") { + val countChunks = AtomicInt(0) + val sumChunkIterator = AtomicDouble(0.0) + val time1 = measureNanoTime { + myFile.readChunksConcurrent( + myvar, + lamda = { + countChunks.fetchAndAdd(1) + sumChunkIterator.getAndAdd(sumValues(it.array)) + }, + done = {}, + 10 + ) + } + nchunks = countChunks.load() + val sumChunksConcurrent = sumChunkIterator.get() + Stats.of("chunkIterator", filename, "chunk").accum(time1, nchunks) + + assertTrue( + nearlyEquals(sumChunksConcurrent, sumArrayData), + "sumChunksConcurrent $sumChunksConcurrent != $sumArrayData sumArrayData for variable '${myvar.fullname()}'" + ) + } } - return countChunks -} + return nchunks +} */ ////////////////////////////////////////////////////////////////////////////////////// // compare reading data chunkIterate API with Netch and NC @@ -97,31 +167,29 @@ fun compareIterateWithNC(myfile: Netchdf, ncfile: Netchdf, varname: String?, sec } private fun compareOneVarIterate(myvar: Variable<*>, myfile: Netchdf, ncvar : Variable<*>, ncfile: Netchdf, section: SectionPartial?) { - val sum = AtomicDouble(0.0) var countChunks = 0 + var sum1 = 0.0 val time1 = measureNanoTime { val chunkIter = myfile.chunkIterator(myvar) for (pair in chunkIter) { // println(" ${pair.section} = ${pair.array.shape.contentToString()}") - sumValues(pair.array, sum) + sum1 += sumValues(pair.array) countChunks++ } } Stats.of("netchdf", myfile.location(), "chunk").accum(time1, countChunks) - val sum1 = sum.get() - sum.set(0.0) countChunks = 0 + var sum2 = 0.0 val time2 = measureNanoTime { val chunkIter = ncfile.chunkIterator(ncvar) for (pair in chunkIter) { // println(" ${pair.section} = ${pair.array.shape.contentToString()}") - sumValues(pair.array, sum) + sum2 += sumValues(pair.array) countChunks++ } } Stats.of("nclib", ncfile.location(), "chunk").accum(time2, countChunks) - val sum2 = sum.get() if (sum1.isFinite() && sum2.isFinite()) { assertTrue(nearlyEquals(sum1, sum2), "$sum1 != $sum2 sum2") @@ -130,9 +198,10 @@ private fun compareOneVarIterate(myvar: Variable<*>, myfile: Netchdf, ncvar : Va } /////////////////////////////////////////////////////////// -private fun sumValues(array : ArrayTyped<*>, sum : AtomicDouble) { +private fun sumValues(array : ArrayTyped<*>): Double { + var result = 0.0 if (array is ArraySingle || array is ArrayEmpty) { - return // test fillValue the same ?? + return result // test fillValue the same ?? } if (array.datatype.isNumber) { @@ -140,7 +209,7 @@ private fun sumValues(array : ArrayTyped<*>, sum : AtomicDouble) { val number = (value as Number) val numberd: Double = number.toDouble() if (numberd.isFinite()) { - sum.getAndAdd(numberd) + result += numberd } } } else if (array.datatype.isIntegral) { @@ -155,10 +224,11 @@ private fun sumValues(array : ArrayTyped<*>, sum : AtomicDouble) { val number = (useValue as Number) val numberd: Double = number.toDouble() if (numberd.isFinite()) { - sum.getAndAdd(numberd) + result += numberd } } } + return result }