1 /*
2  * Copyright 2017 Google Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package trebuchet.io
18 
19 import kotlin.sequences.iterator
20 
/**
 * Presents the buffers pulled from [source] as one continuous, globally indexed byte range.
 *
 * Buffers are fetched on demand (see [loadIndex] and [iter]) and wrapped in [Window]s. Once more
 * than two windows are held and the data from the second window onward spans more than
 * [keepLoadedSize], the oldest window is dropped, so only `startIndex..endIndex` stays addressable.
 *
 * @property source supplier of buffers; a `null` result from `next()` is treated as end of input.
 * @property keepLoadedSize amount of data that must remain loaded behind the oldest window before
 * that window may be released.
 */
class StreamingReader(val source: BufferProducer, val keepLoadedSize: Int = 8096) : GenericByteBuffer {
    /** Windows currently held, ordered by ascending global index. */
    val windows = mutableListOf<Window>()

    /** Optional callback invoked with each window right after it has been dropped from [windows]. */
    var onWindowReleased: ((Window) -> Unit)? = null

    /** Global index of the first byte of the oldest window still held. */
    var startIndex: Long = 0
        private set

    /** Global index of the last byte loaded so far; `-1` until the first buffer arrives. */
    var endIndex: Long = -1
        private set

    /** Set once [source] has returned `null`. */
    var reachedEof: Boolean = false
        private set

    /**
     * Returns the byte at global [index].
     *
     * @throws IndexOutOfBoundsException if no held window covers [index].
     */
    override operator fun get(index: Long): Byte = windowFor(index)[index]

    /** Number of bytes in `startIndex..endIndex`. */
    override val length: Long
        get() = endIndex - startIndex + 1

    /**
     * Finds the held window that covers global index [i].
     *
     * @throws IndexOutOfBoundsException if [i] is not covered by any window in [windows].
     */
    fun windowFor(i: Long): Window {
        // Index-based scan: this sits on the per-byte `get` path, so no iterator is allocated.
        for (wi in windows.indices) {
            val candidate = windows[wi]
            if (i in candidate.globalStartIndex..candidate.globalEndIndex) {
                return candidate
            }
        }
        throw IndexOutOfBoundsException("$i not in range $startIndex..$endIndex")
    }

    /**
     * Pulls buffers from [source] until [index] has been loaded or the source is exhausted.
     *
     * @return `false` if the source ran out before [index] was reached, otherwise whether
     * `index <= endIndex`. The result is only compared against [endIndex]; it does not check
     * that [index] is still at or above [startIndex].
     */
    fun loadIndex(index: Long): Boolean {
        while (endIndex < index && !reachedEof) {
            val nextBuffer = source.next()
            if (nextBuffer == null) {
                reachedEof = true
                return false
            }
            addBuffer(nextBuffer)
        }
        return index <= endIndex
    }

    /**
     * Iterates over the data from global index [startIndex] onward, one slice per window.
     *
     * Already-held windows are visited first, then further buffers are pulled from [source]
     * (and registered as windows) until it returns `null`. In both phases a slice lying entirely
     * before [startIndex] is skipped and the slice containing [startIndex] is trimmed to begin
     * there.
     *
     * NOTE(review): the first phase walks [windows] directly, so loading more data through this
     * reader while that phase is suspended between elements is presumably unsafe — confirm
     * against callers.
     *
     * @param startIndex global index of the first byte to include.
     */
    fun iter(startIndex: Long = 0L): Iterator<DataSlice> {
        // Alias for the parameter, which shadows the `startIndex` property inside this function.
        val from = startIndex
        return iterator {
            for (win in windows) {
                if (from <= win.globalStartIndex) {
                    yield(win.slice)
                } else if (from <= win.globalEndIndex) {
                    yield(win.slice.slice((from - win.globalStartIndex).toInt()))
                }
            }

            while (!reachedEof) {
                val nextBuffer = source.next()
                if (nextBuffer == null) {
                    reachedEof = true
                } else {
                    // Global index the new buffer will start at; after addBuffer, endIndex is
                    // the global index of its last byte.
                    val bufferStart = endIndex + 1
                    addBuffer(nextBuffer)

                    // This variable is a workaround for an apparent bug in the Kotlin
                    // type system that causes it to handle type inference around yield
                    // statements incorrectly.
                    val loaded: DataSlice = nextBuffer

                    // Apply the same cut-off as for the pre-loaded windows so that data before
                    // the requested start is never handed out.
                    if (from <= bufferStart) {
                        yield(loaded)
                    } else if (from <= endIndex) {
                        yield(loaded.slice((from - bufferStart).toInt()))
                    }
                }
            }
        }
    }

    /**
     * Appends [buffer] as a new window directly after [endIndex] and, if the retention rule
     * allows it, releases the oldest window and notifies [onWindowReleased].
     */
    private fun addBuffer(buffer: DataSlice) {
        val newEndIndex = endIndex + buffer.length
        windows.add(Window(buffer, endIndex + 1, newEndIndex))
        endIndex = newEndIndex

        // Release at most one window per call, and never go below two held windows.
        if (windows.size > 2 && endIndex - windows[1].globalStartIndex > keepLoadedSize) {
            val released = windows.removeAt(0)
            startIndex = windows[0].globalStartIndex
            onWindowReleased?.invoke(released)
        }
    }

    /**
     * A loaded [slice] together with the inclusive global index range it occupies.
     */
    class Window(val slice: DataSlice, val globalStartIndex: Long, val globalEndIndex: Long) {
        /** Returns the byte at global index [i], translated to an offset within [slice]. */
        @Suppress("NOTHING_TO_INLINE")
        inline operator fun get(i: Long): Byte = slice[(i - globalStartIndex).toInt()]
    }

    /**
     * Copies the bytes at global indices [lineStartIndex]..[lineEndIndex] (inclusive) into
     * [tmpBuffer], starting at offset 0. Copying stops early once [tmpBuffer] is full.
     *
     * @throws IndexOutOfBoundsException if an index that needs to be copied is not covered by
     * any held window.
     */
    fun copyTo(tmpBuffer: ByteArray, lineStartIndex: Long, lineEndIndex: Long) {
        var srcIndex = lineStartIndex
        var dstIndex = 0
        while (srcIndex <= lineEndIndex && dstIndex < tmpBuffer.size) {
            // Look the window up once, then drain it before searching again.
            val window = windowFor(srcIndex)
            while (srcIndex <= window.globalEndIndex && dstIndex < tmpBuffer.size) {
                tmpBuffer[dstIndex++] = window[srcIndex++]
            }
        }
    }
}