diff --git a/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt b/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt index 5c07dd2a..cc67eb5e 100644 --- a/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt +++ b/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt @@ -18,6 +18,7 @@ interface Netchdf : AutoCloseable { // TODO I think the output type is not always the input type fun readArrayData(v2: Variable, section: SectionPartial? = null) : ArrayTyped + // iterate over all the chunks in section, order is arbitrary. fun chunkIterator(v2: Variable, section: SectionPartial? = null, maxElements : Int? = null) : Iterator> } @@ -25,8 +26,11 @@ interface Netchdf : AutoCloseable { data class ArraySection(val array : ArrayTyped, val section : Section) // Experimental: read concurrently chunks of data, call back with lamda, order is arbitrary. -fun Netchdf.chunkConcurrent(v2: Variable, section: SectionPartial? = null, maxElements : Int? = null, nthreads: Int = 20, - lamda : (ArraySection) -> Unit) { +fun Netchdf.readChunksConcurrent(v2: Variable, + section: SectionPartial? = null, + maxElements : Int? = null, + nthreads: Int = 20, + lamda : (ArraySection) -> Unit) { val reader = ReadChunkConcurrent() val chunkIter = this.chunkIterator( v2, section, maxElements) reader.readChunks(nthreads, chunkIter, lamda) diff --git a/core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt b/core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt index 31fece19..4c4291c5 100644 --- a/core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt +++ b/core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt @@ -6,38 +6,38 @@ import kotlin.math.min /** * A Tiling divides a multidimensional index space into tiles. * Indices are points in the original multidimensional index space. - * Tiles are points in the tiled space. + * Tiles are points in the "tiled space" ~ varShape / chunkShape * Each tile has the same index size, given by chunk. * * Allows to efficiently find the data chunks that cover an arbitrary section of the variable's index space. * - * @param varshape the variable's shape - * @param chunk actual data storage has this shape. May be larger than the shape, last dim ignored if rank > varshape. + * @param varShape the variable's shape + * @param chunkShape actual data storage has this shape. May be larger than the shape, last dim ignored if rank > varshape. */ -class Tiling(varshape: LongArray, chunkIn: LongArray) { - val chunk = chunkIn.copyOf() +class Tiling(varShape: LongArray, chunkShape: LongArray) { + val chunk = chunkShape.copyOf() val rank: Int val tileShape : LongArray // overall shape of the dataset's tile space - private val indexShape : LongArray // overall shape of the dataset's index space - may be larger than actual variable shape - private val strider : LongArray // for computing tile index + val indexShape : LongArray // overall shape of the dataset's index space - may be larger than actual variable shape + val tileStrider : LongArray // for computing tile index init { // convenient to allow tileSize to have (an) extra dimension at the end // to accommodate hdf5 storage, which has the element size - require(varshape.size <= chunk.size) - rank = varshape.size + require(varShape.size <= chunk.size) + rank = varShape.size this.indexShape = LongArray(rank) for (i in 0 until rank) { - this.indexShape[i] = max(varshape[i], chunk[i]) + this.indexShape[i] = max(varShape[i], chunk[i]) } this.tileShape = LongArray(rank) for (i in 0 until rank) { tileShape[i] = (this.indexShape[i] + chunk[i] - 1) / chunk[i] } - strider = LongArray(rank) + tileStrider = LongArray(rank) var accumStride = 1L for (k in rank - 1 downTo 0) { - strider[k] = accumStride + tileStrider[k] = accumStride accumStride *= tileShape[k] } } @@ -57,27 +57,43 @@ class Tiling(varshape: LongArray, chunkIn: LongArray) { return tile } - /** Compute the minimum index of a tile, inverse of tile(). + /** Compute the left upper index of a tile, inverse of tile(). * This will match a key in the datachunk btree, up to rank dimensions */ fun index(tile: LongArray): LongArray { return LongArray(rank) { idx -> tile[idx] * chunk[idx] } } /** - * Get order based on which tile the pt belongs to - * LOOK you could do this without using the tile + * Get order based on which tile the index pt belongs to. + * This is the linear ordering of the tile. * - * @param pt index point + * @param index index point * @return order number based on which tile the pt belongs to */ - fun order(pt: LongArray): Long { - val tile = tile(pt) + fun order(index: LongArray): Long { + val tile = tile(index) var order = 0L - val useRank = min(rank, pt.size) // eg varlen (datatype 9) has mismatch - for (i in 0 until useRank) order += strider[i] * tile[i] + val useRank = min(rank, index.size) // eg varlen (datatype 9) has mismatch + for (i in 0 until useRank) order += tileStrider[i] * tile[i] return order } + /** inverse of order() */ + fun orderToIndex(order: Long) : LongArray { + // calculate tile + val tile = LongArray(rank) + var rem = order + + for (k in 0 until rank) { + tile[k] = rem / tileStrider[k] + rem = rem - (tile[k] * tileStrider[k]) + } + print("tile $order = ${tile.contentToString()}") + + // convert to index + return index(tile) + } + /** * Create an ordering of index points based on which tile the point is in. * diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1ext.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1ext.kt new file mode 100644 index 00000000..78479d3f --- /dev/null +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1ext.kt @@ -0,0 +1,114 @@ +@file:OptIn(InternalLibraryApi::class) + +package com.sunya.netchdf.hdf5 + +import com.sunya.cdm.iosp.OpenFileState +import com.sunya.cdm.layout.Tiling +import com.sunya.cdm.util.InternalLibraryApi +import kotlin.collections.mutableListOf + +/** a BTree1 that uses OpenFileExtended */ +internal class BTree1ext( + val raf: OpenFileExtended, + val rootNodeAddress: Long, + val nodeType : Int, // 0 = group/symbol table, 1 = raw data chunks + varShape: LongArray, + chunkShape: LongArray, +) { + val tiling = Tiling(varShape, chunkShape) + val ndimStorage = chunkShape.size + + init { + // println(" BTreeNode varShape ${varShape.contentToString()} chunkShape ${chunkShape.contentToString()}") + require (nodeType == 1) + } + + fun rootNode(): BTreeNode = BTreeNode(rootNodeAddress, null) + + // here both internal and leaf are the same structure + // Btree nodes Level 1A1 - Version 1 B-trees + inner class BTreeNode(val address: Long, val parent: BTreeNode?) { + val level: Int + val nentries: Int + private val leftAddress: Long + private val rightAddress: Long + + // type 1 + val keys = mutableListOf() + val values = mutableListOf() + val children = mutableListOf() + + init { + val state = OpenFileState(raf.getFileOffset(address), false) + val magic: String = raf.readString(state, 4) + check(magic == "TREE") { "DataBTree doesnt start with TREE" } + + val type: Int = raf.readByte(state).toInt() + check(type == nodeType) { "DataBTree must be type $nodeType" } + + level = raf.readByte(state).toInt() // leaf nodes are level 0 + nentries = raf.readShort(state).toInt() // number of children to which this node points + leftAddress = raf.readOffset(state) + rightAddress = raf.readOffset(state) + + // println(" BTreeNode level $level nentries $nentries") + + for (idx in 0 until nentries) { + val chunkSize = raf.readInt(state) + val filterMask = raf.readInt(state) + val inner = LongArray(ndimStorage) { j -> raf.readLong(state) } + val key = DataChunkKey(chunkSize, filterMask, inner) + val childPointer = raf.readAddress(state) // 4 or 8 bytes, then add fileOffset + if (level == 0) { + keys.add(inner) + values.add(DataChunkEntry1(level, this, idx, key, childPointer)) + } else { + children.add(BTreeNode(childPointer, this)) + } + } + + // note there may be unused entries, "All nodes of a particular type of tree have the same maximum degree, + // but most nodes will point to less than that number of children"" + } + + // return only the leaf nodes, in any order + fun asSequence(): Sequence> = sequence { + // Handle child nodes recursively (in-order traversal) + if (children.isNotEmpty()) { + children.forEachIndexed { index, childNode -> + yieldAll(childNode.asSequence()) // Yield all elements from the child + } + } else { // If it's a leaf node (no children) + keys.forEachIndexed { index, key -> + yield(tiling.order(key) to values[index]) // Yield all key-value pairs + } + } + } + } + + data class DataChunkKey(val chunkSize: Int, val filterMask : Int, val offsets: LongArray) { + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is DataChunkKey) return false + if (!offsets.contentEquals(other.offsets)) return false + return true + } + + override fun hashCode(): Int { + return offsets.contentHashCode() + } + } + + // childAddress = data chunk (level 1) else a child node + data class DataChunkEntry1(val level : Int, val parent : BTreeNode, val idx : Int, val key : DataChunkKey, val childAddress : Long) : DataChunkIF { + override fun childAddress() = childAddress + override fun offsets() = key.offsets + override fun isMissing() = (childAddress == -1L) + override fun chunkSize() = key.chunkSize + override fun filterMask() = key.filterMask + + override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${key.offsets.contentToString()}" + + ", tile= ${tiling.tile(key.offsets).contentToString()} idx=$idx" + } +} + diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2j.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2j.kt index b7d6021c..343e5468 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2j.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree2j.kt @@ -98,10 +98,6 @@ internal class BTree2j(private val h5: H5builder, owner: String, address: Long, readRecords(childAddress, depth - 1, numberOfChildRecords, totalNumberOfChildRecords) } } - - // bb.limit(bb.position() + 4); - // bb.rewind(); - // ChecksumUtils.validateChecksum(bb); } // heroic jhdf diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5TiledData1.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5TiledData1.kt index e23fb11c..3cf97b3a 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5TiledData1.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5TiledData1.kt @@ -4,7 +4,7 @@ import com.sunya.cdm.layout.IndexSpace import com.sunya.cdm.layout.IndexND import com.sunya.cdm.layout.Tiling -/** wraps BTree1 and BTree2 to handle iterating through tiled data (aka chunked data) */ +/** wraps BTree1 to handle iterating through tiled data (aka chunked data) */ internal class H5TiledData1(val btree : BTree1, val varShape: LongArray, val chunkShape: LongArray) { private val check = true private val debug = false diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5builder.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5builder.kt index e83ed993..45cafd5b 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5builder.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5builder.kt @@ -32,7 +32,7 @@ class H5builder( ) { val raf: OpenFileIF - private val superblockStart: Long // may be offset for arbitrary metadata + val superblockStart: Long // may be offset for arbitrary metadata var sizeOffsets: Int = 0 var sizeLengths: Int = 0 var sizeHeapId = 0 diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkIterator.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkIterator.kt index 5ece17c5..7a790a82 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkIterator.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkIterator.kt @@ -2,6 +2,7 @@ package com.sunya.netchdf.hdf5 import com.sunya.cdm.api.* import com.sunya.cdm.array.ArrayTyped +import com.sunya.cdm.iosp.OpenFileIF import com.sunya.cdm.iosp.OpenFileState import com.sunya.cdm.layout.Chunker import com.sunya.cdm.layout.IndexSpace @@ -9,7 +10,7 @@ import com.sunya.cdm.layout.transferMissingNelems import com.sunya.cdm.util.InternalLibraryApi // TODO assumes BTree1, could it include BTree2? any chunked reader ? -// only used in Netchdf.chunkConcurrent +// only used in Netchdf.readChunksConcurrent @OptIn(InternalLibraryApi::class) internal class H5chunkIterator(val h5 : H5builder, val v2: Variable, val wantSection : Section) : AbstractIterator>() { @@ -31,8 +32,8 @@ internal class H5chunkIterator(val h5 : H5builder, val v2: Variable, val w elemSize = vinfo.storageDims[vinfo.storageDims.size - 1].toInt() // last one is always the elements size datatype = h5type.datatype() - val btreeNew = BTree1(h5, vinfo.dataPos, 1, vinfo.storageDims.size) - tiledData = H5TiledData1(btreeNew, v2.shape, vinfo.storageDims) + val btree1 = BTree1(h5, vinfo.dataPos, 1, vinfo.storageDims.size) + tiledData = H5TiledData1(btree1, v2.shape, vinfo.storageDims) filters = FilterPipeline(v2.name, vinfo.mfp, h5type.isBE) if (debugChunking) println(" H5chunkIterator tiles=${tiledData.tiling}") @@ -87,3 +88,26 @@ internal class H5chunkIterator(val h5 : H5builder, val v2: Variable, val w return ArraySection(array, intersectSpace.section(v2.shape)) // LOOK use space instead of Section ?? } } + +// for H5readConcurrent +class OpenFileExtended(val delegate: OpenFileIF, + val isLengthLong: Boolean, + val isOffsetLong: Boolean, + val startingOffset: Long, ) : OpenFileIF by delegate { + + fun readLength(state : OpenFileState): Long { + return if (isLengthLong) delegate.readLong(state) else delegate.readInt(state).toLong() + } + + fun readOffset(state : OpenFileState): Long { + return if (isOffsetLong) delegate.readLong(state) else delegate.readInt(state).toLong() + } + + fun getFileOffset(address: Long): Long { + return startingOffset + address + } + + fun readAddress(state : OpenFileState): Long { + return getFileOffset(readOffset(state)) + } +} diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkReader.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkReader.kt index d5d0d73d..cfe66f5a 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkReader.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkReader.kt @@ -73,12 +73,12 @@ internal class H5chunkReader(val h5 : H5builder) { } val ba = ByteArray(sizeBytes.toInt()) - val btree = if (vinfo.mdl is DataLayoutBTreeVer1) + val btree1 = if (vinfo.mdl is DataLayoutBTreeVer1) BTree1(h5, vinfo.dataPos, 1, vinfo.storageDims.size) else throw RuntimeException("Unsupprted mdl ${vinfo.mdl}") - val tiledData = H5TiledData1(btree, v2.shape, vinfo.storageDims) + val tiledData = H5TiledData1(btree1, v2.shape, vinfo.storageDims) val filters = FilterPipeline(v2.name, vinfo.mfp, vinfo.h5type.isBE) if (debugChunking) println(" readChunkedData tiles=${tiledData.tiling}") diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5readConcurrent.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5readConcurrent.kt new file mode 100644 index 00000000..3f2b6ce8 --- /dev/null +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5readConcurrent.kt @@ -0,0 +1,150 @@ +@file:OptIn(InternalLibraryApi::class) + +package com.sunya.netchdf.hdf5 + +import com.sunya.cdm.api.ArraySection +import com.sunya.cdm.api.Datatype +import com.sunya.cdm.api.Variable +import com.sunya.cdm.api.computeSize +import com.sunya.cdm.api.toIntArray +import com.sunya.cdm.api.toLongArray +import com.sunya.cdm.iosp.OkioFile +import com.sunya.cdm.iosp.OpenFileIF +import com.sunya.cdm.iosp.OpenFileState +import com.sunya.cdm.layout.IndexSpace +import com.sunya.cdm.layout.Tiling +import com.sunya.cdm.layout.transferMissingNelems +import com.sunya.cdm.util.InternalLibraryApi +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.yield + +class H5readConcurrent(val h5file: Hdf5File, val v2: Variable<*>) { + val h5 = h5file.header + val varShape = v2.shape + val chunkShape: IntArray + val tiling: Tiling + val nchunks: Long + internal val rootNode: BTree1ext.BTreeNode + val rootAddress: Long + + init { + require(v2.spObject is DataContainerVariable) + val vinfo = v2.spObject + require(vinfo.mdl is DataLayoutBTreeVer1) + val mdl = vinfo.mdl + chunkShape = mdl.chunkDims + tiling = Tiling(varShape, chunkShape.toLongArray()) + nchunks = tiling.tileShape.computeSize() + val h5 = h5file.header + + val rafext: OpenFileExtended = OpenFileExtended( + h5.raf, + h5.isLengthLong, h5.isOffsetLong, h5.superblockStart, + ) + + val bTreeExt = BTree1ext(rafext, mdl.btreeAddress, 1, varShape, chunkShape.toLongArray()) + rootNode = bTreeExt.rootNode() + rootAddress = mdl.btreeAddress + } + + fun readChunks(nthreads: Int, lamda: (ArraySection<*>) -> Unit) { + runBlocking { + val jobs = mutableListOf() + + val chunkProducer = produceChunks(rootNode.asSequence()) + repeat(nthreads) { + val worker = Worker(h5file.filename) + jobs.add( launchJob(worker, chunkProducer, lamda)) + } + + // wait for all jobs to be done, then close everything + joinAll(*jobs.toTypedArray()) + } + } + + private var count = 0 + private fun CoroutineScope.produceChunks(producer: Sequence>): ReceiveChannel> = + produce { + for (datatChunk in producer) { + send(datatChunk) + yield() + count++ + } + channel.close() + } + + private fun CoroutineScope.launchJob( + worker: Worker, + input: ReceiveChannel>, + lamda: (ArraySection<*>) -> Unit, + ) = launch(Dispatchers.Default) { + for (pair: Pair in input) { + val arraySection = worker.work(pair.second) + lamda(arraySection) + yield() + } + } + + private inner class Worker(filename: String) { + private val raf: OpenFileIF = OkioFile(filename) + private val rafext: OpenFileExtended = OpenFileExtended( + raf, + h5.isLengthLong, h5.isOffsetLong, h5.superblockStart, + ) + + // a thread-safe accessor of the btree + private val btree1 = BTree1ext(rafext, rootAddress, 1, varShape, chunkShape.toLongArray()) + + val vinfo: DataContainerVariable = v2.spObject as DataContainerVariable + val h5type: H5TypeInfo + val elemSize: Int + val datatype: Datatype<*> + val filters: FilterPipeline + val state: OpenFileState + + init { + h5type = vinfo.h5type + elemSize = vinfo.storageDims[vinfo.storageDims.size - 1].toInt() // last one is always the elements size + datatype = h5type.datatype() + + filters = FilterPipeline(v2.name, vinfo.mfp, h5type.isBE) + + state = OpenFileState(0L, h5type.isBE) + } + + fun work(dataChunk : DataChunkIF) : ArraySection<*> { + val dataSpace = IndexSpace(v2.rank, dataChunk.offsets(), vinfo.storageDims) + + val ba = if (dataChunk.isMissing()) { + if (debugChunking) println(" missing ${dataChunk.show(tiling)}") + val sizeBytes = dataSpace.totalElements * elemSize + val bbmissing = ByteArray(sizeBytes.toInt()) + transferMissingNelems(vinfo.fillValue, dataSpace.totalElements.toInt(), bbmissing, 0) + if (debugChunking) println(" missing transfer ${dataSpace.totalElements} fillValue=${vinfo.fillValue}") + bbmissing + } else { + if (debugChunking) println(" chunkIterator=${dataChunk.show(tiling)}") + state.pos = dataChunk.childAddress() + val rawdata = h5.raf.readByteArray(state, dataChunk.chunkSize()) + if (dataChunk.filterMask() == null) rawdata else filters.apply(rawdata, dataChunk.filterMask()!!) + } + + val array = if (h5type.datatype5 == Datatype5.Vlen) { + // internal fun H5builder.processVlenIntoArray(h5type: H5TypeInfo, shape: IntArray, ba: ByteArray, nelems: Int, elemSize : Int): ArrayTyped { + h5.processVlenIntoArray(h5type, dataSpace.shape.toIntArray(), ba, dataSpace.totalElements.toInt(), elemSize) + } else { + h5.processDataIntoArray(ba, h5type.isBE, datatype, dataSpace.shape.toIntArray(), h5type, elemSize) + } + + return ArraySection(array, dataSpace.section(v2.shape)) + } + } + val debugChunking = false +} \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt index ae8b367d..4adf7a5d 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt @@ -101,7 +101,7 @@ class Hdf5File(val filename : String, strict : Boolean = false) : Netchdf { } } - // Netchdf.chunkConcurrent + // for Netchdf.readChunksConcurrent override fun chunkIterator(v2: Variable, section: SectionPartial?, maxElements : Int?) : Iterator> { if (v2.nelems == 0L) { return listOf>().iterator() @@ -115,6 +115,7 @@ class Hdf5File(val filename : String, strict : Boolean = false) : Netchdf { return listOf(single).iterator() } + // TODO return if (vinfo.mdl is DataLayoutBTreeVer1) { H5chunkIterator(header, v2, wantSection) } else { diff --git a/core/src/commonTest/kotlin/com/sunya/cdm/layout/TestTiling.kt b/core/src/commonTest/kotlin/com/sunya/cdm/layout/TestTiling.kt index f635d84f..2fbaf455 100644 --- a/core/src/commonTest/kotlin/com/sunya/cdm/layout/TestTiling.kt +++ b/core/src/commonTest/kotlin/com/sunya/cdm/layout/TestTiling.kt @@ -1,6 +1,7 @@ package com.sunya.cdm.layout import com.sunya.cdm.api.TestSection +import com.sunya.cdm.api.computeSize import kotlin.test.* import kotlin.test.assertEquals import kotlin.test.assertTrue @@ -8,6 +9,82 @@ import kotlin.test.assertTrue /** Test [com.sunya.cdm.layout.Tiling] */ class TestTiling { + @Test + fun basics() { + val varshape = longArrayOf(4, 6, 20) + val chunk = longArrayOf(1, 3, 20) + val tiling = Tiling(varshape, chunk) + + assertEquals(3, tiling.rank) // max(var, chunk) + checkEquals(longArrayOf(4, 6, 20), tiling.indexShape) // max(var, chunk) + checkEquals(longArrayOf(4, 2, 1), tiling.tileShape) // indexShape / chunk + checkEquals(longArrayOf(2, 1, 1), tiling.tileStrider) // accumStride *= tileShape[k] + + val tiling2 = Tiling(longArrayOf(10, 100, 1000), + longArrayOf(2, 4, 20)) + + assertEquals(3, tiling2.rank) // max(var, chunk) + checkEquals(longArrayOf(10, 100, 1000), tiling2.indexShape) // max(var, chunk) + checkEquals(longArrayOf(5, 25, 50), tiling2.tileShape) // indexShape / chunk + checkEquals(longArrayOf(1250, 50, 1), tiling2.tileStrider) // accumStride *= tileShape[k] + } + + @Test + fun uneven() { + val varshape = longArrayOf(4, 6, 25) + val chunk = longArrayOf(1, 3, 20) + val tiling = Tiling(varshape, chunk) + + assertEquals(3, tiling.rank) + checkEquals(longArrayOf(4, 6, 25), tiling.indexShape) // max(var, chunk) + checkEquals(longArrayOf(4, 2, 2), tiling.tileShape) // indexShape / chunk + checkEquals(longArrayOf(4, 2, 1), tiling.tileStrider) // accumStride *= tileShape[k] + } + + @Test + fun chunkbigger() { + val varshape = longArrayOf(4, 6, 20) + val chunk = longArrayOf(1, 3, 22) + val tiling = Tiling(varshape, chunk) + + assertEquals(3, tiling.rank) + checkEquals(longArrayOf(4, 6, 22), tiling.indexShape) // max(var, chunk) + checkEquals(longArrayOf(4, 2, 1), tiling.tileShape) // indexShape / chunk + checkEquals(longArrayOf(2, 1, 1), tiling.tileStrider) // accumStride *= tileShape[k] + } + + @Test + fun chunkextradim() { + val varshape = longArrayOf(4, 6, 20) + val chunk = longArrayOf(1, 3, 22, 12) + val tiling = Tiling(varshape, chunk) + + assertEquals(3, tiling.rank) + checkEquals(longArrayOf(4, 6, 22), tiling.indexShape) // max(var, chunk) + checkEquals(longArrayOf(4, 2, 1), tiling.tileShape) // indexShape / chunk + checkEquals(longArrayOf(2, 1, 1), tiling.tileStrider) // accumStride *= tileShape[k] + } + + @Test + fun testOrder() { + val varshape = longArrayOf(4, 6, 20) + val chunk = longArrayOf(1, 3, 5) + val tiling = Tiling(varshape, chunk) + + assertEquals(3, tiling.rank) // max(var, chunk) + checkEquals(longArrayOf(4, 6, 20), tiling.indexShape) // max(var, chunk) + checkEquals(longArrayOf(4, 2, 4), tiling.tileShape) // indexShape / chunk + checkEquals(longArrayOf(8, 4, 1), tiling.tileStrider) // accumStride *= tileShape[k] + + val ntiles = tiling.tileShape.computeSize() + + for (order in 0 until ntiles) { + val index = tiling.orderToIndex(order) + println(" index $order = ${index.contentToString()}") + assertEquals( order, tiling.order(index)) + } + } + @Test fun testTiling() { val varshape = longArrayOf(4, 6, 20) diff --git a/core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1extTest.kt b/core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1extTest.kt new file mode 100644 index 00000000..974fc46d --- /dev/null +++ b/core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1extTest.kt @@ -0,0 +1,58 @@ +@file:OptIn(InternalLibraryApi::class) + +package com.sunya.netchdf.hdf5 + +import com.sunya.cdm.api.computeSize +import com.sunya.cdm.api.toLongArray +import com.sunya.cdm.iosp.OkioFile +import com.sunya.cdm.iosp.OpenFileIF +import com.sunya.cdm.layout.Tiling +import com.sunya.cdm.util.InternalLibraryApi +import kotlin.test.Test + +class Btree1extTest { + + @Test + fun testBTree1ext() { + val filename = "/home/all/testdata/cdmUnitTest/formats/netcdf4/hiig_forec_20140208.nc" + val varname = "salt" + Hdf5File(filename).use { myfile -> + println("${myfile.type()} $filename ${myfile.size / 1000.0 / 1000.0} Mbytes") + // println(myfile.cdl()) + + val h5 = myfile.header + + var countChunks = 0 + val myvar = myfile.rootGroup().allVariables().find { it.fullname() == varname } + ?: throw RuntimeException("cant find $varname") + + val raf: OpenFileIF = OkioFile(filename) + val rafext: OpenFileExtended = OpenFileExtended( + raf, + h5.isLengthLong, h5.isOffsetLong, h5.superblockStart, + ) + + val varShape = myvar.shape + + require(myvar.spObject is DataContainerVariable) + val vinfo = myvar.spObject + require(vinfo.mdl is DataLayoutBTreeVer1) + val mdl = vinfo.mdl + val chunkShape = mdl.chunkDims + val tiling = Tiling(varShape, chunkShape.toLongArray()) + val nchunks = tiling.tileShape.computeSize() + + // a thread-safe accessor of the btree + // val raf: OpenFileExtended, + // val rootNodeAddress: Long, + // val nodeType : Int, // 0 = group/symbol table, 1 = raw data chunks + // varShape: LongArray, + // chunkShape: LongArray, + val bTreeExt = BTree1ext(rafext, mdl.btreeAddress, 1, varShape, chunkShape.toLongArray()) + val rootNode = bTreeExt.rootNode() + + rootNode.asSequence().forEach { (key, value) -> println("Key: ${key}, Value: $value") } + } + } + +} diff --git a/testfiles/src/test/kotlin/com/sunya/netchdf/CountVersions.kt b/testfiles/src/test/kotlin/com/sunya/netchdf/CountVersions.kt index d5998228..5feafab5 100644 --- a/testfiles/src/test/kotlin/com/sunya/netchdf/CountVersions.kt +++ b/testfiles/src/test/kotlin/com/sunya/netchdf/CountVersions.kt @@ -29,7 +29,7 @@ class CountVersions { val filenames = mutableMapOf>() val showAllFiles = false - fun afterAll() { + fun showVersions() { println("*** nfiles = ${filenames.size}") var dups = 0 filenames.keys.sorted().forEach { @@ -74,6 +74,58 @@ class CountVersions { } } - afterAll() + showVersions() + } + + @Test + fun sortFileSizes() { + val fileSizes = mutableMapOf() + + files().forEach { filename -> + try { + openNetchdfFile(filename).use { ncfile -> + if (ncfile == null) { + println("Not a netchdf file=$filename ") + } else { + fileSizes[filename ] = ncfile.size + } + } + } catch (e: Throwable) { + e.printStackTrace() + } + } + + val sorted = fileSizes.toList() + .sortedBy { (_, value) -> value } + .toMap() + + sorted.keys.forEach{ println("${sorted[it]} == $it } files") } + } + + @Test + fun sortVarSizes() { + val varSizes = mutableMapOf() + + files().forEach { filename -> + try { + openNetchdfFile(filename).use { ncfile -> + if (ncfile == null) { + println("Not a netchdf file=$filename ") + } else { + ncfile.rootGroup().allVariables().forEach { v -> + varSizes["$filename#${v.name}" ] = v.nelems + } + } + } + } catch (e: Throwable) { + e.printStackTrace() + } + } + + val sorted = varSizes.toList() + .sortedBy { (_, value) -> value } + .toMap() + + sorted.keys.forEach{ println("${sorted[it]} == $it } files") } } } \ No newline at end of file diff --git a/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt b/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt index e87cb8c8..6dcbcabe 100644 --- a/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt +++ b/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt @@ -1,13 +1,15 @@ package com.sunya.netchdf.hdf5 +import com.sunya.cdm.api.ArraySection +import com.sunya.cdm.api.Datatype import com.sunya.cdm.api.Netchdf import com.sunya.cdm.api.Variable -import com.sunya.cdm.api.chunkConcurrent +import com.sunya.cdm.api.readChunksConcurrent import com.sunya.cdm.array.ArrayTyped import com.sunya.netchdf.testfiles.H5Files import com.sunya.netchdf.testutils.AtomicDouble import com.sunya.netchdf.testutils.Stats -import com.sunya.netchdf.testutils.compareNetchIterate +import com.sunya.netchdf.testutils.compareChunkReading import com.sunya.netchdf.testutils.testData import kotlin.collections.iterator import kotlin.system.measureNanoTime @@ -25,29 +27,56 @@ class H5readConcurrentTest { } @Test - fun problemWrf() { - readH5concurrent(testData + "cdmUnitTest/formats/hdf5/wrf/wrf_out_par.h5") + fun sanity() { + readH5concurrent(testData + "cdmUnitTest/formats/netcdf4/hiig_forec_20140208.nc", "salt") } @Test - fun testReadIterate() { + fun testReadConcurrent() { files().forEach { filename -> - compareNetchIterate(filename, null) + readH5concurrent(filename, null) } } @Test - fun testReadConcurrent() { + fun compareChunkIterateTest() { files().forEach { filename -> - readH5concurrent(filename, null) + compareChunkReading(filename, null) + } + } + + + @Test + fun testH5readConcurrent() { + val filename = "/home/all/testdata/cdmUnitTest/formats/netcdf4/hiig_forec_20140208.nc" + val varname = "salt" + Hdf5File(filename).use { myfile -> + println("${myfile.type()} $filename ${myfile.size / 1000.0 / 1000.0} Mbytes") + + val myvar = myfile.rootGroup().allVariables().find { it.fullname() == varname } + ?: throw RuntimeException("cant find $varname") + + println("nthreads, time in secs") + + for (nthreads in listOf(1, 2, 4, 8, 10, 16, 20, 24, 32, 40, 48)) { + val time = measureNanoTime { + val reader = H5readConcurrent(myfile, myvar) + reader.readChunks(nthreads) { asect: ArraySection<*> -> + // println(" section = ${asect.section}") + } + } + println("$nthreads, ${time * nano}") + } } } } +val nano = 1.0e-9 fun readH5concurrent(filename: String, varname: String? = null) { Hdf5File(filename).use { myfile -> println("${myfile.type()} $filename ${myfile.size / 1000.0 / 1000.0} Mbytes") + println(myfile.cdl()) var countChunks = 0 if (varname != null) { val myvar = myfile.rootGroup().allVariables().find { it.fullname() == varname } @@ -83,7 +112,7 @@ fun testOneVarConcurrent(myFile: Netchdf, myvar: Variable<*>): Int { sum.set(0.0) val time2 = measureNanoTime { - myFile.chunkConcurrent(myvar, null) { sumValues(it.array) } + myFile.readChunksConcurrent(myvar, null) { sumValues(it.array) } } val sum2 = sum.get() Stats.of("concurrentSum", filename, "chunk").accum(time2, countChunks) @@ -96,6 +125,12 @@ fun testOneVarConcurrent(myFile: Netchdf, myvar: Variable<*>): Int { val sum3 = sum.get() Stats.of("regularSum", filename, "chunk").accum(time3, countChunks) + println(" serialSum $time1") + println("concurrentSum $time2") + println(" regularSum $time3") + + Stats.show() + /* if (sum1.isFinite() && sum2.isFinite() && sum3.isFinite()) { assertTrue(nearlyEquals(sum1, sum2), "$sum1 != $sum2 sum2") assertTrue(nearlyEquals(sum1, sum3), "$sum1 != $sum3 sum3") @@ -107,8 +142,8 @@ fun testOneVarConcurrent(myFile: Netchdf, myvar: Variable<*>): Int { var sum = AtomicDouble(0.0) fun sumValues(array: ArrayTyped<*>) { - if (!array.datatype.isNumber or true) return - for (value in array) { + if (!array.datatype.isNumber) return + for (value in array) { val number = (value as Number) val numberd: Double = number.toDouble() if (numberd.isFinite()) { diff --git a/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/ReadIterator.kt b/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/ReadChunkIterator.kt similarity index 90% rename from testfiles/src/test/kotlin/com/sunya/netchdf/testutils/ReadIterator.kt rename to testfiles/src/test/kotlin/com/sunya/netchdf/testutils/ReadChunkIterator.kt index f12d03ea..476b9c06 100644 --- a/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/ReadIterator.kt +++ b/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/ReadChunkIterator.kt @@ -11,7 +11,7 @@ import kotlin.test.assertTrue ////////////////////////////////////////////////////////////////////////////////////// // compare reading data regular and through the chunkIterate API -fun compareNetchIterate(filename: String, varname : String? = null, compare : Boolean = true) { +fun compareChunkReading(filename: String, varname : String? = null, compare : Boolean = true) { openNetchdfFile(filename).use { myfile -> if (myfile == null) { println("*** not a netchdf file = $filename") @@ -21,10 +21,10 @@ fun compareNetchIterate(filename: String, varname : String? = null, compare : Bo var countChunks = 0 if (varname != null) { val myvar = myfile.rootGroup().allVariables().find { it.fullname() == varname } ?: throw RuntimeException("cant find $varname") - countChunks += compareOneVarIterate(myfile, myvar, compare) + countChunks += compareChunkReadingForVar(myfile, myvar, compare) } else { myfile.rootGroup().allVariables().forEach { it -> - countChunks += compareOneVarIterate(myfile, it, compare) + countChunks += compareChunkReadingForVar(myfile, it, compare) } } if (countChunks > 0) { @@ -34,7 +34,7 @@ fun compareNetchIterate(filename: String, varname : String? = null, compare : Bo } // compare readArrayData with chunkIterator -private fun compareOneVarIterate(myFile: Netchdf, myvar: Variable<*>, compare : Boolean = true) : Int { +private fun compareChunkReadingForVar(myFile: Netchdf, myvar: Variable<*>, compare : Boolean = true) : Int { val filename = myFile.location().substringAfterLast('/') val varBytes = myvar.nelems if (varBytes >= maxBytes) { @@ -42,25 +42,22 @@ private fun compareOneVarIterate(myFile: Netchdf, myvar: Variable<*>, compare : return 0 } - val sum1 = AtomicDouble(0.0) + val sumReadArray = AtomicDouble(0.0) val sumArrayData = if (compare) { val time3 = measureNanoTime { val arrayData = myFile.readArrayData(myvar, null) - sumValues(arrayData, sum1) + sumValues(arrayData, sumReadArray) } Stats.of("readArrayData", filename, "chunk").accum(time3, 1) - sum1.get() + sumReadArray.get() } else 0.0 val sum2 = AtomicDouble(0.0) var countChunks = 0 val time1 = measureNanoTime { - val chunkIter = myFile.chunkIterator(myvar) - for (pair in chunkIter) { - // println(" ${pair.section} = ${pair.array.shape.contentToString()}") - sumValues(pair.array, sum2) + myFile.readChunksConcurrent(myvar, null) { countChunks++ - } + sumValues(it.array, sum2) } } val sumChunkIterator = sum2.get() if (compare) Stats.of("chunkIterator", filename, "chunk").accum(time1, countChunks)