Commit 615ab886 authored by Arthur Del Esposte's avatar Arthur Del Esposte

Add ActuatorCommandUpdater Worker

parent dfb1552c
Pipeline #9210043 passed with stage
in 3 minutes 48 seconds
require 'bunny'
require 'rubygems'
require 'json'
require 'mongoid'
require "#{File.dirname(__FILE__)}/../models/actuator_command"
class ActuatorCommandUpdater
TOPIC = 'resource.actuate.status'
QUEUE = 'actuator_controller.resource.actuation.status'
def initialize(consumers_size = 1, thread_pool = 1)
@consumers_size = consumers_size
@consumers = []
@channel = $conn.create_channel(nil, thread_pool)
@channel.prefetch(2)
@topic = @channel.topic(TOPIC)
@queue = @channel.queue(QUEUE, durable: true, auto_delete: false)
end
def perform
@queue.bind(@topic, routing_key: '#')
@consumers_size.times do
@consumers << @queue.subscribe(block: false) do |delivery_info, properties, body|
begin
json = JSON.parse(body)
command = ::ActuatorCommand.find(json['command_id'])
command.status = json['status']
command.save!
WORKERS_LOGGER.info("ActuatorCommandUpdater::CommandUpdated - command=#{command_id}&status=#{status}")
rescue StandardError => e
WORKERS_LOGGER.error("ActuatorCommandUpdater::CommandNotUpdated - #{e.message}")
end
end
end
end
def cancel
@consumers.each do |consumer|
@consumer.cancel
end
@channel.close
end
end
......@@ -12,4 +12,7 @@ if Rails.env.development? || Rails.env.production?
resource_updater_worker = ResourceUpdater.new(1, 1)
resource_updater_worker.perform
actuator_command_updater_worker = ActuatorCommandUpdater.new(1, 1)
actuator_command_updater_worker.perform
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment