Skip to content

Prevent emitting duplicated messages to event subscribers.

Bogdan Tabor requested to merge patch/ADAPT-2313 into master

CHANGES

  • introduced duplicate variable to control which messages consumed from kafka server are emitted to subscribers.

TESTING 1st part of log below shows how message (IXI/803) is not emitted after adapter restart as operator waited for .topics.json update before adapter restart.

automation-platform    | 2022-08-05T10:54:47.287Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:492, message=[ 'PROCESSING NEW MESSAGE ON TOPIC: t1 PARTITION: 0 WITH OFFSET: 803' ]
automation-platform    | 2022-08-05T10:54:47.292Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:564, message=[
automation-platform    |   'Message IXI being passed as it does meet the filter condition: IXI'
automation-platform    | ]
automation-platform    | 2022-08-05T10:54:47.293Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:580, message=[ 'Emitting: IXI TO t1' ]

<ADAPTER RESTART>

automation-platform    | 2022-08-05T10:55:17.827Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:492, message=[ 'PROCESSING NEW MESSAGE ON TOPIC: t1 PARTITION: 0 WITH OFFSET: 803' ]
automation-platform    | 2022-08-05T10:55:17.832Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:564, message=[
automation-platform    |   'Message IXI being passed as it does meet the filter condition: IXI'
automation-platform    | ]
automation-platform    | 2022-08-05T10:55:17.834Z - warn: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:573, message=[ 'Emitting: IXI/803 SKIPPED, duplicated message' ]

2nd part of log shows how mesage (PIXI/805) is emitted twice (before and after adapter restart) as operator restarted adapter before interval_time elapse.

automation-platform    | 2022-08-05T10:56:17.568Z - debug: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:387, message=[ 'Update .topics.json file' ]
automation-platform    | 2022-08-05T10:56:27.160Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:492, message=[ 'PROCESSING NEW MESSAGE ON TOPIC: t1 PARTITION: 0 WITH OFFSET: 805' ]
automation-platform    | 2022-08-05T10:56:27.161Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:564, message=[
automation-platform    |   'Message PIXI being passed as it does meet the filter condition: PIXI'
automation-platform    | ]
automation-platform    | 2022-08-05T10:56:27.162Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:580, message=[ 'Emitting: PIXI TO t1' ]

<ADAPTER RESTART>

automation-platform    | 2022-08-05T10:56:37.623Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:492, message=[ 'PROCESSING NEW MESSAGE ON TOPIC: t1 PARTITION: 0 WITH OFFSET: 805' ]
automation-platform    | 2022-08-05T10:56:37.625Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:564, message=[
automation-platform    |   'Message PIXI being passed as it does meet the filter condition: PIXI'
automation-platform    | ]
automation-platform    | 2022-08-05T10:56:37.627Z - info: audit_id=undefined, origin=/opt/itential/automation-platform/node_modules/@itentialopensource/adapter-kafka/adapter.js:580, message=[ 'Emitting: PIXI TO t1' ]

FULL LOG: duplicate_IAP.log

Edited by Bogdan Tabor

Merge request reports