Commit ca478d9e authored by Michael Evenchick's avatar Michael Evenchick

add support for defining and subscribing to kafka topics --- always

parent e22d0caf
Pipeline #159344370 passed with stages
in 3 minutes and 38 seconds
......@@ -138,13 +138,53 @@ class Kafka extends EventEmitterCl {
this.id = prongid;
myid = prongid;
// put topics from file into memory
this.topicsEvents = {};
const topicsFile = path.join(__dirname, '/.topics.json');
// if the file does not exist - error
if (fs.existsSync(topicsFile)) {
this.topicsEvents = JSON.parse(fs.readFileSync(topicsFile, 'utf-8'));
}
// if there are topics in the properties
if (this.props && this.props.topics) {
let needRestart = false;
for (let t = 0; t < this.props.topics.length; t += 1) {
if (!mytopics.includes(this.props.topics[t])) {
pjson.topics[this.props.topics[t]] = {};
needRestart = true;
if (typeof this.props.topics[t] === 'string') {
if (!mytopics.includes(this.props.topics[t])) {
pjson.topics[this.props.topics[t]] = {};
needRestart = true;
}
} else if (typeof this.props.topics[t] === 'object') {
const topName = this.props.topics[t].name;
if (!mytopics.includes(topName)) {
pjson.topics[topName] = {};
needRestart = true;
}
// if the topic is always subscribed to --
if (this.props.topics[t].always) {
// need to add topics from properties into memory
if (Object.hasOwnProperty.call(this.topicsEvents, topName)) {
// increase the number of subscribers so we are always subscribed
this.topicsEvents[topName].subscribers = 999999;
} else {
// create tne topic information in memory
this.topicsEvents[topName] = {
subscribers: 999999,
offset: 0,
partitions: 0,
avro: 'NO'
};
}
// if avro, set avro on topic
if (this.props.topics[t].avro) {
// turn avro to true on topic
this.topicsEvents[topName].avro = 'YES';
}
}
}
}
if (needRestart) {
......@@ -186,15 +226,6 @@ class Kafka extends EventEmitterCl {
const errorData = JSON.parse(fs.readFileSync(errorFile, 'utf-8'));
({ errors } = errorData);
// put topics in memory
this.topicsEvents = {};
const topicsFile = path.join(__dirname, '/.topics.json');
// if the file does not exist - error
if (fs.existsSync(topicsFile)) {
this.topicsEvents = JSON.parse(fs.readFileSync(topicsFile, 'utf-8'));
}
// rewrite the topics file for persistence
if (this.props && this.props.stub === false) {
const intTime = this.props.interval_time || 30000;
......@@ -209,7 +240,7 @@ class Kafka extends EventEmitterCl {
*
* @function connect
*/
async connect() {
connect() {
const meth = 'adapter-connect';
const origin = `${this.id}-${meth}`;
log.trace(origin);
......@@ -223,27 +254,28 @@ class Kafka extends EventEmitterCl {
const combinedProps = this.props.client || {};
combinedProps.kafkaHost = this.props.host.concat(':', this.props.port);
this.KafkaClient = new kafka.KafkaClient(combinedProps);
this.producer = new kafka.Producer(this.KafkaClient, this.props.producer || {});
// get the topics
const consumerPayloads = [];
const keys = Object.keys(this.topicsEvents);
keys.forEach((item) => {
consumerPayloads.push({
topic: item,
offset: this.topicsEvents[item].offset || 0,
partitions: this.topicsEvents[item].partitions || 0
});
});
// create the consumer
this.consumer = new kafka.Consumer(this.KafkaClient, consumerPayloads, this.props.consumer || {});
this.KafkaClient.on('ready', () => {
this.emit('ONLINE', {
id: this.id
});
log.info('EMITTED ONLINE');
this.producer = new kafka.Producer(this.KafkaClient, this.props.producer || {});
// get the topics
const consumerPayloads = [];
const keys = Object.keys(this.topicsEvents);
keys.forEach((item) => {
consumerPayloads.push({
topic: item,
offset: this.topicsEvents[item].offset || 0,
partition: this.topicsEvents[item].partitions || 0
});
});
// create the consumer
this.consumer = new kafka.Consumer(this.KafkaClient, consumerPayloads, this.props.consumer || {});
// consume the messages
this.consumer.on('error', (serr) => {
log.error(`Consumer Error: ${serr}`);
......@@ -558,7 +590,9 @@ class Kafka extends EventEmitterCl {
// if topic list has this topic
if (Object.hasOwnProperty.call(this.topicsEvents, item)) {
// increase the number of subscribers
this.topicsEvents[item].subscribers += 1;
if (this.topicsEvents[item].subscribers >= 999999) {
this.topicsEvents[item].subscribers += 1;
}
// can only set the offset and partitions if no one else listening
if ((partition && partition !== this.topicsEvents[item].partitions)
......@@ -659,7 +693,9 @@ class Kafka extends EventEmitterCl {
// if topic list has this topic
if (Object.hasOwnProperty.call(this.topicsEvents, item)) {
// increase the number of subscribers
this.topicsEvents[item].subscribers += 1;
if (this.topicsEvents[item].subscribers >= 999999) {
this.topicsEvents[item].subscribers += 1;
}
// can only set the offset and partitions if no one else listening
if ((partition && partition !== this.topicsEvents[item].partitions)
......@@ -758,7 +794,9 @@ class Kafka extends EventEmitterCl {
// if topic list has this topic
if (Object.hasOwnProperty.call(this.topicsEvents, item)) {
// reduce the number of subscribers
this.topicsEvents[item].subscribers -= 1;
if (this.topicsEvents[item].subscribers >= 999999) {
this.topicsEvents[item].subscribers -= 1;
}
// if event list is empty, remove the topic object
if (this.topicsEvents[item].subscribers <= 0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment