Prevent emitting duplicated messages to event subscribers.
CHANGES
- introduced
duplicate
variable to control which messages consumed from kafka server are emitted to subscribers.
TESTING 1st part of log below shows how message (IXI/803) is not emitted after adapter restart as operator waited for .topics.json update before adapter restart.
automation-platform | 2022-08-05T10:54:47.287Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:492, message=[ 'PROCESSING NEW MESSAGE ON TOPIC: t1 PARTITION: 0 WITH OFFSET: 803' ]
automation-platform | 2022-08-05T10:54:47.292Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:564, message=[
automation-platform | 'Message IXI being passed as it does meet the filter condition: IXI'
automation-platform | ]
automation-platform | 2022-08-05T10:54:47.293Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:580, message=[ 'Emitting: IXI TO t1' ]
<ADAPTER RESTART>
automation-platform | 2022-08-05T10:55:17.827Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:492, message=[ 'PROCESSING NEW MESSAGE ON TOPIC: t1 PARTITION: 0 WITH OFFSET: 803' ]
automation-platform | 2022-08-05T10:55:17.832Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:564, message=[
automation-platform | 'Message IXI being passed as it does meet the filter condition: IXI'
automation-platform | ]
automation-platform | 2022-08-05T10:55:17.834Z - warn: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:573, message=[ 'Emitting: IXI/803 SKIPPED, duplicated message' ]
2nd part of log shows how mesage (PIXI/805) is emitted twice (before and after adapter restart) as operator restarted adapter before interval_time elapse.
automation-platform | 2022-08-05T10:56:17.568Z - debug: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:387, message=[ 'Update .topics.json file' ]
automation-platform | 2022-08-05T10:56:27.160Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:492, message=[ 'PROCESSING NEW MESSAGE ON TOPIC: t1 PARTITION: 0 WITH OFFSET: 805' ]
automation-platform | 2022-08-05T10:56:27.161Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:564, message=[
automation-platform | 'Message PIXI being passed as it does meet the filter condition: PIXI'
automation-platform | ]
automation-platform | 2022-08-05T10:56:27.162Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:580, message=[ 'Emitting: PIXI TO t1' ]
<ADAPTER RESTART>
automation-platform | 2022-08-05T10:56:37.623Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:492, message=[ 'PROCESSING NEW MESSAGE ON TOPIC: t1 PARTITION: 0 WITH OFFSET: 805' ]
automation-platform | 2022-08-05T10:56:37.625Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:564, message=[
automation-platform | 'Message PIXI being passed as it does meet the filter condition: PIXI'
automation-platform | ]
automation-platform | 2022-08-05T10:56:37.627Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:580, message=[ 'Emitting: PIXI TO t1' ]
FULL LOG: duplicate_IAP.log
Edited by Bogdan Tabor