Verified Commit 9a950cb5 authored by madhead's avatar madhead

Yield the thread periodically

parent b94d4cd8
......@@ -27,4 +27,5 @@
.. Go and check the `wiki` connector at http://localhost:8083/connectors
.. Go and check the `wiki` connector details at http://localhost:8083/connectors/wiki
. Check the topic:
.. `cd kafka_2.12-2.5.0`
.. `./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --property print.key=true --topic wiki`
......@@ -15,6 +15,7 @@ import org.apache.kafka.connect.source.SourceTask
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.TimeUnit
class WikipediaSourceTask : SourceTask() {
companion object {
......@@ -57,8 +58,8 @@ class WikipediaSourceTask : SourceTask() {
.observeOn(Schedulers.io())
.flatMap(::sse2model)
.observeOn(Schedulers.computation())
.doOnNext { logger.debug("Next SSE event: {}", it?.meta?.id) }
.onBackpressureDrop { logger.warn("Dropping SSE event: {}", it?.meta?.id) }
.doOnNext { logger.debug("Next SSE: {}", it?.meta?.id) }
.onBackpressureDrop { logger.warn("Dropping SSE: {}", it?.meta?.id) }
.retry()
.observeOn(Schedulers.io())
.subscribeWith(recentChangesSubscriber)
......@@ -67,23 +68,24 @@ class WikipediaSourceTask : SourceTask() {
}
override fun poll(): List<SourceRecord> {
val event = rendezvous.take()
val record = SourceRecord(
/* sourcePartition */ mapOf("domain" to event.meta?.domain),
/* sourceOffset */ mapOf("dt" to event.meta?.dt?.time),
/* topic */ topic,
/* partition */ null,
/* keySchema */ Schema.STRING_SCHEMA,
/* key */ event.meta?.id ?: "",
/* valueSchema */ Schema.BYTES_SCHEMA,
/* value */ mapper.writeValueAsBytes(event)
)
return rendezvous.poll(1, TimeUnit.SECONDS)?.let { event ->
val record = SourceRecord(
/* sourcePartition */ mapOf("domain" to event.meta?.domain),
/* sourceOffset */ mapOf("dt" to event.meta?.dt?.time),
/* topic */ topic,
/* partition */ null,
/* keySchema */ Schema.STRING_SCHEMA,
/* key */ event.meta?.id ?: "",
/* valueSchema */ Schema.BYTES_SCHEMA,
/* value */ mapper.writeValueAsBytes(event)
)
logger.debug("Producing a record: {}", record)
logger.debug("Producing a record: {}", record)
return listOf(
record
)
listOf(
record
)
} ?: emptyList()
}
override fun stop() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment