package me.madhead.playgrounds.kafka.connect

import com.fasterxml.jackson.databind.ObjectMapper
import com.saladevs.rxsse.RxSSE
import com.saladevs.rxsse.ServerSentEvent
import io.reactivex.Flowable
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers
import io.reactivex.subscribers.DisposableSubscriber
import me.madhead.playgrounds.kafka.wikipedia.RecentChange
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.source.SourceRecord
import org.apache.kafka.connect.source.SourceTask
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.TimeUnit

/**
 * Kafka Connect [SourceTask] that consumes Wikipedia recent-change events from a
 * Server-Sent Events (SSE) stream and emits each one as a [SourceRecord].
 *
 * The SSE stream is consumed on RxJava schedulers. Events are handed to the Connect
 * polling thread through a [SynchronousQueue]. The subscriber requests one event at a
 * time and only requests the next once [poll] has taken the current one. Events that
 * pile up upstream while the hand-off is busy are dropped with a warning
 * (`onBackpressureDrop`).
 *
 * Records are keyed by `meta.id` (empty string when absent). The value is the event
 * serialized back to JSON bytes with Jackson.
 */
class WikipediaSourceTask : SourceTask() {
    private lateinit var topic: String
    private lateinit var streamUrl: String
    private lateinit var recentChanges: Disposable
    private val mapper = ObjectMapper()

    /** Hand-off point between the RxJava subscriber thread and the Connect polling thread. */
    private val rendezvous = SynchronousQueue<RecentChange>()

    private val recentChangesSubscriber = object : DisposableSubscriber<RecentChange>() {
        override fun onStart() {
            request(1)
        }

        override fun onNext(t: RecentChange?) {
            if (t == null) {
                request(1)
                return
            }
            try {
                // Use a timed offer instead of put(): once stop() disposes this subscriber,
                // poll() is no longer called. An untimed put() could then park this
                // scheduler thread indefinitely.
                while (!isDisposed) {
                    if (rendezvous.offer(t, HANDOFF_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                        request(1)
                        return
                    }
                }
            } catch (e: InterruptedException) {
                // Restore the interrupt flag and stop consuming, rather than letting the
                // exception escape onNext into RxJava's global error handler.
                Thread.currentThread().interrupt()
                cancel()
            }
        }

        override fun onComplete() {
            logger.info("SSE stream completed")
        }

        override fun onError(t: Throwable?) {
            logger.error("Error in SSE stream", t)
        }
    }

    override fun version(): String = WikipediaSourceConnector.VERSION

    /**
     * Reads the task configuration and subscribes to the SSE stream.
     *
     * @throws ConfigException if the topic or the stream URL property is missing.
     */
    override fun start(props: Map<String, String>) {
        topic = props[WikipediaSourceConnector.TOPIC_CONFIG] ?: throw ConfigException("Topic must be set")
        streamUrl = props[WikipediaSourceConnector.STREAM_URL_CONFIG] ?: throw ConfigException("SSE URL must be set")
        recentChanges = RxSSE()
                .connectTo(streamUrl)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .flatMap(::sse2model)
                .observeOn(Schedulers.computation())
                .doOnNext { logger.debug("Next SSE: {}", it?.meta?.id) }
                .onBackpressureDrop { logger.warn("Dropping SSE: {}", it?.meta?.id) }
                // Reconnect after a pause instead of resubscribing immediately in a tight loop,
                // and log why the stream failed instead of swallowing the error.
                .retryWhen { errors ->
                    errors
                            .doOnNext { error ->
                                logger.warn("SSE stream failed, reconnecting in {} s", RETRY_DELAY_SECONDS, error)
                            }
                            .delay(RETRY_DELAY_SECONDS, TimeUnit.SECONDS)
                }
                .observeOn(Schedulers.io())
                .subscribeWith(recentChangesSubscriber)

        logger.info("Started WikipediaSourceTask with topic '{}'", topic)
    }

    /**
     * Returns at most one record.
     *
     * Waits up to one second for the next event from the stream and returns an empty
     * list if none arrives.
     */
    override fun poll(): List<SourceRecord> {
        val event = rendezvous.poll(1, TimeUnit.SECONDS) ?: return emptyList()
        val record = SourceRecord(
                /* sourcePartition */ mapOf("domain" to event.meta?.domain),
                /* sourceOffset */ mapOf("dt" to event.meta?.dt?.time),
                /* topic */ topic,
                /* partition */ null,
                /* keySchema */ Schema.STRING_SCHEMA,
                /* key */ event.meta?.id ?: "",
                /* valueSchema */ Schema.BYTES_SCHEMA,
                /* value */ mapper.writeValueAsBytes(event)
        )

        logger.debug("Producing a record: {}", record)

        return listOf(record)
    }

    /** Disposes the SSE subscription; safe to call even if [start] failed early. */
    override fun stop() {
        // start() may have thrown (e.g. a ConfigException) before the subscription and
        // topic were assigned. Touching the lateinit properties would then throw.
        if (!::recentChanges.isInitialized) {
            logger.info("Stopping WikipediaSourceTask that was never started")
            return
        }
        recentChanges.dispose()

        logger.info("Stopped WikipediaSourceTask with topic '{}'", topic)
    }

    /**
     * Parses one SSE payload into a [RecentChange].
     *
     * @return a single-element [Flowable], or an empty one when the payload cannot be
     *   parsed. Unparseable events are skipped rather than failing the stream.
     */
    private fun sse2model(event: ServerSentEvent): Flowable<RecentChange> = try {
        Flowable.just(mapper.readValue(event.data, RecentChange::class.java))
    } catch (e: Exception) {
        logger.debug("Skipping SSE that could not be parsed as a RecentChange", e)
        Flowable.empty()
    }

    companion object {
        private val logger: Logger = LoggerFactory.getLogger(WikipediaSourceTask::class.java)

        /** Pause between SSE reconnection attempts. */
        private const val RETRY_DELAY_SECONDS = 5L

        /** How long a single hand-off attempt waits before re-checking for disposal. */
        private const val HANDOFF_TIMEOUT_SECONDS = 1L
    }
}