Commit dff4689f authored by Jonas L.'s avatar Jonas L.

Refactor concurrency limit of the project image downloader

parent 263dd7c7
......@@ -12,6 +12,8 @@ import de.determapp.android.content.projectdata.Project
import de.determapp.android.ui.viewer.ProjectSpec
import de.determapp.android.util.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.channels.consumeEach
import okhttp3.HttpUrl
import okhttp3.Request
import okio.Okio
......@@ -67,54 +69,60 @@ fun updateProjectImages(context: Context, baseUrl: String, project: Project, res
})
}
// FIXME: this would not limit the concurrency correctly when downloads are non blocking
runBlocking (context = newFixedThreadPoolContext(NUM_OF_PARALLEL_DOWNLOADS, "image downloading")) {
cancellationSignal.setOnCancelListener {
this.coroutineContext.cancel()
}
runBlocking {
val job = Job()
newFiles.forEach {
processedFile ->
cancellationSignal.setOnCancelListener { job.cancel() }
async {
val requestedFile = processedFile.getByResolution(resolution)
withContext(job + Dispatchers.IO) {
val pipe = Channel<Image>()
if (BuildConfig.DEBUG) {
Log.d(LOG_TAG, "start download of ${requestedFile.filename}")
}
repeat(NUM_OF_PARALLEL_DOWNLOADS) { worker ->
async {
pipe.consumeEach { processedFile ->
val requestedFile = processedFile.getByResolution(resolution)
val tempFileName = ContentStorage.generateId()
val tempFile = File(imageDirectory, tempFileName)
if (BuildConfig.DEBUG) {
Log.d(LOG_TAG, "start download of ${requestedFile.filename} at $worker")
}
Http.uncachedClient.newCall(
Request.Builder()
.url(HttpUrl.parse(baseUrl)!!.resolve("./image/" + requestedFile.filename)!!)
.build()
).execute().use { response ->
if (!response.isSuccessful) {
throw IOException("request failed")
}
val tempFileName = ContentStorage.generateId()
val tempFile = File(imageDirectory, tempFileName)
// write response to temp file
Okio.buffer(Okio.sink(tempFile)).use { tempFileSink ->
tempFileSink.writeAll(response.body()!!.source())
}
}
Http.uncachedClient.newCall(
Request.Builder()
.url(HttpUrl.parse(baseUrl)!!.resolve("./image/" + requestedFile.filename)!!)
.build()
).execute().use { response ->
if (!response.isSuccessful) {
throw IOException("request failed")
}
// write response to temp file
Okio.buffer(Okio.sink(tempFile)).use { tempFileSink ->
tempFileSink.writeAll(response.body()!!.source())
}
}
// validate the temp file
ImageValidator.assertImageValid(requestedFile, tempFile)
// validate the temp file
ImageValidator.assertImageValid(requestedFile, tempFile)
// rename to final name
tempFile.renameTo(File(imageDirectory, requestedFile.filename))
// rename to final name
tempFile.renameTo(File(imageDirectory, requestedFile.filename))
// update the progress
processedFiles.add(processedFile)
updateProgress()
// update the progress
processedFiles.add(processedFile)
updateProgress()
if (BuildConfig.DEBUG) {
Log.d(LOG_TAG, "finished download of ${requestedFile.filename}")
if (BuildConfig.DEBUG) {
Log.d(LOG_TAG, "finished download of ${requestedFile.filename} at $worker")
}
}
}
}
newFiles.forEach { pipe.send(it) }
pipe.close()
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment