Skip to content

Use attribute lock ATTR_BY_KERNEL for push_event

Mateusz C requested to merge 573-no-sync-attribute-lock into develop

Closes #573 (closed)

Second approach to the problem, now using a built-in omni_mutex lock protecting each attribute on push_events. No changes to CppTango are requires, only a boost layer of PyTango.

Considerations:

  • read attribute was coliding with push_events, write attribute is not colliding
  • omniorb docs say that when one thread tries to lock the same mutex, the behaviour is undefined. Coroutines are only using one thread, but after discussing with Anton, we think that the case we acquire twice the same lock in the same thread will not happen.
DS to test it
import argparse
import asyncio
import queue
import threading
import time
import numpy as np
import tango
from tango import (
    GreenMode,
    AttrQuality,
    AttrWriteType,
    DevState,
    DebugIt,
    EnsureOmniThread,
)
from tango.server import Device, attribute, command
from tango.test_context import DeviceTestContext

EVENT_PUBLISHER_ERROR_BACKOFF_TIME_SEC = 1.0


class MyBaseDevice(Device):

    def __init__(self, *args):
        super().__init__(*args)
        self._last_data = 0.0
        self._publisher = threading.Thread(
            target=self._publisher_thread, name="publisher"
        )
        self._publisher.daemon = True
        self._running = False

    def _publisher_thread(self):
        with EnsureOmniThread():
            attr_name = "unset"
            data = None
            while self._running:
                try:
                    self._last_data = np.random.rand()
                    super().push_change_event("H22", self._last_data)
                except Exception as exc:
                    self.error_stream(
                        f"General exception in publisher thread for attr {attr_name}, "
                        f"value {data}: {exc}"
                    )
                    time.sleep(EVENT_PUBLISHER_ERROR_BACKOFF_TIME_SEC)

    @DebugIt()
    @command
    def Start(self):
        self._running = True
        self._publisher.start()
        self.set_state(DevState.RUNNING)

    @attribute(dtype=float)
    def H22(self):
        print("--------------------------------------- H22")
        return self._last_data

    @H22.write
    def H22(self, value):
        print(f"Writing H22: {value}")
        self._last_data = value



class MyDeviceAsyncio(MyBaseDevice):
    green_mode = GreenMode.Asyncio


class MyDeviceSync(MyBaseDevice):
    green_mode = GreenMode.Synchronous


# Inherit from either MyDeviceSync or MyDeviceAsyncio
class MyDevice(MyDeviceAsyncio):
    pass


def run_server(parsed_args, *args):
    MyDevice.run_server([parsed_args.instance] + list(args))


def run_client(parsed_args, *args):
    proxy = tango.DeviceProxy(parsed_args.device)
    start_and_read(proxy, parsed_args.sleep)


def run_writer(parsed_args, *args):
    proxy = tango.DeviceProxy(parsed_args.device)
    proxy.Start()
    while True:
        value = np.random.rand()
        print(f"wrinting H22: {value}")
        proxy.write_attribute("H22", value)
        time.sleep(parsed_args.sleep)

def run_test(parsed_args, *args):
    with DeviceTestContext(MyDevice, process=True) as proxy:
        start_and_read(proxy, parsed_args.sleep)


def start_and_read(proxy, sleep):
    proxy.Start()
    while True:
        attributes = proxy.read_attributes(["H22"])
        for attr in attributes:
            print(f"Reading {attr.name}: {attr.value}")
        time.sleep(sleep)


def parse_args():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True, help="valid subcommands")
    # server subcommand
    parser_server = subparsers.add_parser("server", help="run the server")
    parser_server.add_argument("instance", help="Tango instance name")
    parser_server.set_defaults(func=run_server)
    # client subcommand
    parser_client = subparsers.add_parser("client", help="run the client")
    parser_client.add_argument("device", help="Tango device name")
    parser_client.add_argument(
        "--sleep",
        type=float,
        help="sleep between read in seconds [default: 2.0]",
        default=2.0,
    )
    parser_client.set_defaults(func=run_client)
    # writer subcommand
    parser_client = subparsers.add_parser("writer", help="run the client writing the attribute")
    parser_client.add_argument("device", help="Tango device name")
    parser_client.add_argument(
        "--sleep",
        type=float,
        help="sleep between read in seconds [default: 2.0]",
        default=2.0,
    )
    parser_client.set_defaults(func=run_writer)
    # test subcommand
    parser_test = subparsers.add_parser(
        "test", help="run both the server and client (using DeviceTestContext)"
    )
    parser_test.add_argument(
        "--sleep",
        type=float,
        help="sleep between read in seconds [default: 2.0]",
        default=2,
    )
    parser_test.set_defaults(func=run_test)
    args, unknown = parser.parse_known_args()
    return (args, unknown)


def main():
    args, unknown = parse_args()
    args.func(args, *unknown)


if __name__ == "__main__":
    main()
commands to test it
python read_dyn.py server dyn
python read_dyn.py client my/dyn/1
Edited by Anton Joubert

Merge request reports