Refactor M3U8 segment handling to avoid loading the entire segment into memory

This commit is contained in:
Cuong-Tran
2026-05-31 15:31:49 +07:00
parent fdb44891bc
commit b39f14d702
2 changed files with 79 additions and 71 deletions

View File

@@ -13,8 +13,6 @@ import org.nanohttpd.protocols.http.response.Response
import org.nanohttpd.protocols.http.response.Response.newChunkedResponse
import org.nanohttpd.protocols.http.response.Response.newFixedLengthResponse
import org.nanohttpd.protocols.http.response.Status
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.net.URLEncoder
@@ -98,11 +96,13 @@ class M3u8HttpServer(
}
private fun handleSegmentRequest(session: IHTTPSession): Response {
val url = session.parameters["url"]?.first()
val url = session.parameters["url"]?.firstOrNull()
val headers = extractHeadersFromSession(session)
return handleSegment(url, headers)
}
fun handleSegment(url: String?, headers: Map<String, String> = emptyMap()): Response {
Log.d(tag, "Processing segment request for URL: $url")
Log.d(tag, "Headers: $headers")
if (url.isNullOrBlank()) {
Log.w(tag, "Missing URL parameter in segment request")
@@ -110,10 +110,16 @@ class M3u8HttpServer(
}
return try {
Log.d(tag, "Starting segment processing for: $url")
val segmentData = runBlocking { processSegmentUrl(url, headers) }
Log.d(tag, "Segment processing completed successfully, data size: ${segmentData.size} bytes")
val inputStream = ByteArrayInputStream(segmentData)
val requestBuilder = Request.Builder().url(url)
headers.forEach { (key, value) ->
requestBuilder.addHeader(key, value)
}
val response = client.newCall(requestBuilder.build()).execute()
if (!response.isSuccessful) {
response.close()
throw IOException("Failed to fetch segment: ${response.code}")
}
val inputStream = AutoDetectingInputStream(response, response.body.byteStream())
newChunkedResponse(Status.OK, "video/mp2t", inputStream)
} catch (e: Exception) {
Log.e(tag, "Error processing segment: ${e.message}", e)
@@ -121,6 +127,69 @@ class M3u8HttpServer(
}
}
private class AutoDetectingInputStream(
private val response: okhttp3.Response,
private val delegate: java.io.InputStream
) : java.io.InputStream() {
private var detected = false
private var prefixBuffer: ByteArray? = null
private var prefixPointer = 0
private var prefixLength = 0
private fun ensureDetected() {
if (detected) return
detected = true
val buffer = ByteArray(4096)
var bytesRead = 0
while (bytesRead < buffer.size) {
val read = delegate.read(buffer, bytesRead, buffer.size - bytesRead)
if (read == -1) break
bytesRead += read
}
if (bytesRead > 0) {
val skipBytes = AutoDetector.detectSkipBytes(buffer.copyOf(bytesRead))
prefixBuffer = buffer
prefixPointer = skipBytes
prefixLength = bytesRead
}
}
override fun read(): Int {
ensureDetected()
val buf = prefixBuffer
if (buf != null && prefixPointer < prefixLength) {
val b = buf[prefixPointer].toInt() and 0xFF
prefixPointer++
if (prefixPointer >= prefixLength) {
prefixBuffer = null
}
return b
}
return delegate.read()
}
override fun read(b: ByteArray, off: Int, len: Int): Int {
ensureDetected()
val buf = prefixBuffer
if (buf != null && prefixPointer < prefixLength) {
val available = prefixLength - prefixPointer
val toCopy = minOf(len, available)
System.arraycopy(buf, prefixPointer, b, off, toCopy)
prefixPointer += toCopy
if (prefixPointer >= prefixLength) {
prefixBuffer = null
}
return toCopy
}
return delegate.read(b, off, len)
}
override fun close() {
delegate.close()
response.close()
}
}
private fun handleHealthRequest(): Response {
Log.d(tag, "Health check requested")
val status = getHealthStatus()
@@ -169,21 +238,6 @@ class M3u8HttpServer(
}
}
/**
* Process segment with automatic detection
*/
suspend fun processSegmentUrl(url: String, headers: Map<String, String> = emptyMap()): ByteArray = withContext(Dispatchers.IO) {
try {
Log.d(tag, "Fetching segment from: $url with headers: $headers")
val segmentData = fetchSegmentWithAutoDetection(url, headers)
Log.d(tag, "Segment processing completed, final size: ${segmentData.size} bytes")
segmentData
} catch (e: Exception) {
Log.e(tag, "Error processing segment URL: ${e.message}", e)
throw IOException("Error processing segment: ${e.message}")
}
}
/**
* Health check
*/
@@ -221,53 +275,6 @@ class M3u8HttpServer(
}
}
private suspend fun fetchSegmentWithAutoDetection(url: String, headers: Map<String, String> = emptyMap()): ByteArray = withContext(Dispatchers.IO) {
Log.d(tag, "Making HTTP request to fetch segment with headers: $headers")
val requestBuilder = Request.Builder().url(url)
headers.forEach { (key, value) ->
requestBuilder.addHeader(key, value)
}
val request = requestBuilder.build()
client.newCall(request).execute().use { response ->
Log.d(tag, "Segment HTTP response code: ${response.code}")
if (!response.isSuccessful) {
Log.e(tag, "Failed to fetch segment, HTTP code: ${response.code}")
throw IOException("Failed to fetch segment: ${response.code}")
}
val inputStream = response.body.byteStream()
val outputStream = ByteArrayOutputStream()
// Read first 4KB to detect format
val buffer = ByteArray(4096)
val bytesRead = inputStream.read(buffer)
Log.d(tag, "Read $bytesRead bytes from segment for format detection")
if (bytesRead > 0) {
val skipBytes = AutoDetector.detectSkipBytes(buffer.copyOf(bytesRead))
Log.d(tag, "AutoDetector determined skip bytes: $skipBytes")
// Write data from detected offset
val validBytes = bytesRead - skipBytes
outputStream.write(buffer, skipBytes, validBytes)
Log.d(tag, "Wrote $validBytes bytes from detected offset")
// Copy remaining data
val remainingBytes = inputStream.copyTo(outputStream)
Log.d(tag, "Copied $remainingBytes remaining bytes")
}
inputStream.close()
val finalData = outputStream.toByteArray()
outputStream.close()
Log.d(tag, "Final segment data size: ${finalData.size} bytes")
finalData
}
}
/**
* Creates a local M3U8 URL by encoding the original URL and redirecting to the local server.
* It can either be segment URL or a direct M3U8 URL (not a playlist).

View File

@@ -2,6 +2,7 @@ package aniyomi.lib.m3u8server
import android.util.Log
import okhttp3.OkHttpClient
import org.nanohttpd.protocols.http.response.Response
/**
* M3U8 Server manager to facilitate usage
@@ -67,7 +68,7 @@ class M3u8ServerManager(
* @param headers Optional headers to use for the request
* @return Processed segment data
*/
suspend fun processSegmentUrl(segmentUrl: String, headers: Map<String, String> = emptyMap()): ByteArray? = server?.processSegmentUrl(segmentUrl, headers)
fun processSegmentUrl(segmentUrl: String, headers: Map<String, String> = emptyMap()): Response? = server?.handleSegment(segmentUrl, headers)
/**
* Gets server information