Verified Commit 197eb1aa authored by Thomas Ives's avatar Thomas Ives
Browse files

src/server: Rename eventsupplier to EventSupplier

Also, split out ZmqEventSupplier.h and NotifdEventSupplier.h header
files and rename the implementation files to ZmqEventSupplier.cpp and
NotifdEventSupplier.cpp.
parent d9a0f05e
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -15,9 +15,9 @@
#include <tango/common/utils/assert.h>
#include <tango/common/utils/event_system_info.h>
#include <tango/common/utils/type_info.h>
#include <tango/server/eventsupplier.h>
#include <tango/server/Except.h>
#include <tango/server/SeqVec.h>
#include <tango/server/ZmqEventSupplier.h>

#include <algorithm>

+3 −1
Original line number Diff line number Diff line
@@ -41,7 +41,9 @@ set(SERVER_HEADERS UserDefaultAttrProp.h
    dserverclass.h
    DeviceEventSubscriptionState.h
    DServerSignal.h
    eventsupplier.h
    EventSupplier.h
    NotifdEventSupplier.h
    ZmqEventSupplier.h
    Except.h
    FwdAttr.h
    FwdAttribute.h
+7 −272
Original line number Diff line number Diff line
/*
 * SPDX-FileCopyrightText: 2003 Copyright contributors to the cppTango project
 * SPDX-FileCopyrightText: 2004 Copyright contributors to the cppTango project
 *
 * SPDX-License-Identifier: LGPL-3.0-or-later
 */
@@ -7,46 +7,21 @@
#ifndef TANGO_SERVER_EVENTSUPPLIER_H
#define TANGO_SERVER_EVENTSUPPLIER_H

#include <tango/common/omnithread_wrapper.h>
#include <tango/server/BlackBox.h>
#include <tango/server/DeviceImpl.h>
#include <tango/server/Except.h>
#include <tango/server/tango_clock.h>
#include <tango/server/Util.h>

#if defined(_TG_WINDOWS_) && defined(_USRDLL) && !defined(_TANGO_LIB)
  #define USE_stub_in_nt_dll
#endif

#include <COS/CosNotification.hh>
#include <COS/CosNotifyChannelAdmin.hh>
#include <COS/CosNotifyComm.hh>
#include <zmq.hpp>

#if defined(_TG_WINDOWS_) && defined(_USRDLL) && !defined(_TANGO_LIB)
  #undef USE_stub_in_nt_dll
#endif

#include <tango/client/eventconsumer.h>
#include <tango/common/omnithread_wrapper.h>
#ifndef _TG_WINDOWS_
  #include <sys/time.h>
#endif

#include <chrono>
#include <list>
#include <string>
#include <vector>

namespace Tango
{

typedef struct _NotifService
{
    CosNotifyChannelAdmin::SupplierAdmin_var SupAdm;
    CosNotifyChannelAdmin::ProxyID pID;
    CosNotifyChannelAdmin::ProxyConsumer_var ProCon;
    CosNotifyChannelAdmin::StructuredProxyPushConsumer_var StrProPush;
    CosNotifyChannelAdmin::EventChannelFactory_var EveChaFac;
    CosNotifyChannelAdmin::EventChannel_var EveCha;
    std::string ec_ior;
} NotifService;

//---------------------------------------------------------------------
//
//              EventSupplier base class
@@ -194,246 +169,6 @@ class EventSupplier
    bool one_subscription_cmd{false};
};

//---------------------------------------------------------------------
//
//              NotifdEventSupplier class
//
//---------------------------------------------------------------------

class NotifdEventSupplier : public EventSupplier, public POA_CosNotifyComm::StructuredPushSupplier
{
  public:
    static NotifdEventSupplier *create(CORBA::ORB_var, std::string, Util *);
    void connect();
    void disconnect_structured_push_supplier() override;
    void disconnect_from_notifd();
    void subscription_change(const CosNotification::EventTypeSeq &added,
                             const CosNotification::EventTypeSeq &deled) override;

    void push_heartbeat_event() override;

    std::string &get_event_channel_ior()
    {
        return event_channel_ior;
    }

    void file_db_svr();

    //------------------ Push event -------------------------------

    void push_event(DeviceImpl *,
                    std::string,
                    const std::vector<std::string> &,
                    const std::vector<double> &,
                    const std::vector<std::string> &,
                    const std::vector<long> &,
                    const struct SuppliedEventData &,
                    const std::string &,
                    DevFailed *,
                    bool) override;

  protected:
    NotifdEventSupplier(CORBA::ORB_var,
                        CosNotifyChannelAdmin::SupplierAdmin_var,
                        CosNotifyChannelAdmin::ProxyID,
                        CosNotifyChannelAdmin::ProxyConsumer_var,
                        CosNotifyChannelAdmin::StructuredProxyPushConsumer_var,
                        CosNotifyChannelAdmin::EventChannelFactory_var,
                        CosNotifyChannelAdmin::EventChannel_var,
                        std::string &,
                        Util *);

  private:
    static NotifdEventSupplier *_instance;
    CosNotifyChannelAdmin::EventChannel_var eventChannel;
    CosNotifyChannelAdmin::SupplierAdmin_var supplierAdmin;
    CosNotifyChannelAdmin::ProxyID proxyId;
    CosNotifyChannelAdmin::ProxyConsumer_var proxyConsumer;
    CosNotifyChannelAdmin::StructuredProxyPushConsumer_var structuredProxyPushConsumer;
    CosNotifyChannelAdmin::EventChannelFactory_var eventChannelFactory;
    CORBA::ORB_var orb_;

    std::string event_channel_ior;

    void reconnect_notifd();
    static void connect_to_notifd(NotifService &, const CORBA::ORB_var &, const std::string &, Util *);
};

//---------------------------------------------------------------------
//
//              ZmqEventSupplier class
//
//---------------------------------------------------------------------

#define LARGE_DATA_THRESHOLD 2048
#define LARGE_DATA_THRESHOLD_ENCODED LARGE_DATA_THRESHOLD * 4

class ZmqEventSupplier : public EventSupplier
{
  public:
    static ZmqEventSupplier *create(Util *);
    ~ZmqEventSupplier() override;

    //------------------ Push event -------------------------------

    void push_heartbeat_event() override;
    void push_event(DeviceImpl *,
                    std::string,
                    const std::vector<std::string> &,
                    const std::vector<double> &,
                    const std::vector<std::string> &,
                    const std::vector<long> &,
                    const struct SuppliedEventData &,
                    const std::string &,
                    DevFailed *,
                    bool) override;

    std::string &get_heartbeat_endpoint()
    {
        return heartbeat_endpoint;
    }

    std::string &get_event_endpoint()
    {
        return event_endpoint;
    }

    std::vector<std::string> &get_alternate_heartbeat_endpoint()
    {
        return alternate_h_endpoint;
    }

    std::vector<std::string> &get_alternate_event_endpoint()
    {
        return alternate_e_endpoint;
    }

    void create_event_socket();
    void create_mcast_event_socket(const std::string &, const std::string &, int, bool);
    bool is_event_mcast(const std::string &);
    std::string &get_mcast_event_endpoint(const std::string &);
    void init_event_cptr(const std::string &event_name);

    size_t get_mcast_event_nb()
    {
        return event_mcast.size();
    }

    bool update_connected_client(client_addr *);

    void set_double_send()
    {
        double_send++;
        double_send_heartbeat = true;
    }

    int get_zmq_release()
    {
        return zmq_release;
    }

    int get_calling_th()
    {
        return calling_th;
    }

    std::string create_full_event_name(DeviceImpl *device_impl,
                                       const std::string &event_type,
                                       const std::string &obj_name_lower,
                                       bool intr_change);

    void query_event_system(std::ostream &os);
    static void enable_perf_mon(Tango::DevBoolean enabled);

  protected:
    ZmqEventSupplier(Util *);

  private:
    static ZmqEventSupplier *_instance;

    struct McastSocketPub
    {
        std::string endpoint;
        zmq::socket_t *pub_socket;
        bool local_client;
        bool double_send;
    };

    struct ConnectedClient
    {
        client_addr clnt;
        std::chrono::steady_clock::time_point date;
    };

    zmq::context_t zmq_context;                        // ZMQ context
    zmq::socket_t *heartbeat_pub_sock;                 // heartbeat publisher socket
    zmq::socket_t *event_pub_sock{nullptr};            // events publisher socket
    std::map<std::string, McastSocketPub> event_mcast; // multicast socket(s) map
                                                       // The key is the full event name
                                                       // ie: tango://kidiboo.esrf.fr:1000/dev/test/10/att.change

    std::string heartbeat_endpoint;     // heartbeat publisher endpoint
    std::string host_ip;                // Host IP address
    std::vector<std::string> alt_ip;    // Host alternate IP addresses
    std::string heartbeat_event_name;   // The event name used for the heartbeat
    ZmqCallInfo heartbeat_call;         // The heartbeat call info
    cdrMemoryStream heartbeat_call_cdr; //
    TangoCdrMemoryStream data_call_cdr;
    std::string event_name;
    std::vector<std::string> alternate_h_endpoint; // Alternate heartbeat endpoint (host with several NIC)

    zmq::message_t endian_mess;             // Zmq messages
    zmq::message_t endian_mess_2;           //
    zmq::message_t endian_mess_heartbeat;   //
    zmq::message_t endian_mess_heartbeat_2; //
    zmq::message_t heartbeat_call_mess;     //
    zmq::message_t heartbeat_call_mess_2;   //

    unsigned char host_endian; // the host endianess

    bool ip_specified;          // The user has specified an IP address
    bool name_specified{false}; // The user has specified a name as IP address
    std::string user_ip;        // The specified IP address

    std::string event_endpoint;                    // event publisher endpoint
    std::vector<std::string> alternate_e_endpoint; // Alternate event endpoint (host with several NIC)

    std::map<std::string, unsigned int> event_cptr; // event counter map

    std::list<ConnectedClient> con_client; // Connected clients
    int double_send{0};                    // Double send ctr
    bool double_send_heartbeat{false};

    int zmq_release; // ZMQ lib release

    int calling_th;

    void tango_bind(zmq::socket_t *, std::string &);
    unsigned char test_endian();
    void create_mcast_socket(const std::string &, int, McastSocketPub &);
    size_t get_blob_data_nb(DevVarPipeDataEltArray &);
    size_t get_data_elt_data_nb(DevPipeDataElt &);
    std::string ctr_event_name;

    /// Rebuild the heartbeat event name
    void rebuild_heartbeat_event_name();
};

//
// Yet another evil macro! But sometimes quite usefull
//

#define GET_SEQ(A, B, C, D, E)                        \
    do                                                \
    {                                                 \
        A = true;                                     \
        B = &E->value.C();                            \
        if(archive == true)                           \
            D = &attr.prev_archive_event.value_4.C(); \
        else                                          \
            D = &attr.prev_change_event.value_4.C();  \
    } while(false)

} // namespace Tango

#endif // TANGO_SERVER_EVENTSUPPLIER_H
#endif
+107 −0
Original line number Diff line number Diff line
/*
 * SPDX-FileCopyrightText: 2004 Copyright contributors to the cppTango project
 *
 * SPDX-License-Identifier: LGPL-3.0-or-later
 */

#ifndef TANGO_SERVER_NOTIFDEVENTSUPPLIER_H
#define TANGO_SERVER_NOTIFDEVENTSUPPLIER_H

#include <tango/server/EventSupplier.h>

#if defined(_TG_WINDOWS_) && defined(_USRDLL) && !defined(_TANGO_LIB)
  #define USE_stub_in_nt_dll
#endif

#include <COS/CosNotification.hh>
#include <COS/CosNotifyChannelAdmin.hh>
#include <COS/CosNotifyComm.hh>

#if defined(_TG_WINDOWS_) && defined(_USRDLL) && !defined(_TANGO_LIB)
  #undef USE_stub_in_nt_dll
#endif

#include <string>
#include <vector>

namespace Tango
{

typedef struct _NotifService
{
    CosNotifyChannelAdmin::SupplierAdmin_var SupAdm;
    CosNotifyChannelAdmin::ProxyID pID;
    CosNotifyChannelAdmin::ProxyConsumer_var ProCon;
    CosNotifyChannelAdmin::StructuredProxyPushConsumer_var StrProPush;
    CosNotifyChannelAdmin::EventChannelFactory_var EveChaFac;
    CosNotifyChannelAdmin::EventChannel_var EveCha;
    std::string ec_ior;
} NotifService;

//---------------------------------------------------------------------
//
//              NotifdEventSupplier class
//
//---------------------------------------------------------------------

class NotifdEventSupplier : public EventSupplier, public POA_CosNotifyComm::StructuredPushSupplier
{
  public:
    static NotifdEventSupplier *create(CORBA::ORB_var, std::string, Util *);
    void connect();
    void disconnect_structured_push_supplier() override;
    void disconnect_from_notifd();
    void subscription_change(const CosNotification::EventTypeSeq &added,
                             const CosNotification::EventTypeSeq &deled) override;

    void push_heartbeat_event() override;

    std::string &get_event_channel_ior()
    {
        return event_channel_ior;
    }

    void file_db_svr();

    //------------------ Push event -------------------------------

    void push_event(DeviceImpl *,
                    std::string,
                    const std::vector<std::string> &,
                    const std::vector<double> &,
                    const std::vector<std::string> &,
                    const std::vector<long> &,
                    const struct SuppliedEventData &,
                    const std::string &,
                    DevFailed *,
                    bool) override;

  protected:
    NotifdEventSupplier(CORBA::ORB_var,
                        CosNotifyChannelAdmin::SupplierAdmin_var,
                        CosNotifyChannelAdmin::ProxyID,
                        CosNotifyChannelAdmin::ProxyConsumer_var,
                        CosNotifyChannelAdmin::StructuredProxyPushConsumer_var,
                        CosNotifyChannelAdmin::EventChannelFactory_var,
                        CosNotifyChannelAdmin::EventChannel_var,
                        std::string &,
                        Util *);

  private:
    static NotifdEventSupplier *_instance;
    CosNotifyChannelAdmin::EventChannel_var eventChannel;
    CosNotifyChannelAdmin::SupplierAdmin_var supplierAdmin;
    CosNotifyChannelAdmin::ProxyID proxyId;
    CosNotifyChannelAdmin::ProxyConsumer_var proxyConsumer;
    CosNotifyChannelAdmin::StructuredProxyPushConsumer_var structuredProxyPushConsumer;
    CosNotifyChannelAdmin::EventChannelFactory_var eventChannelFactory;
    CORBA::ORB_var orb_;

    std::string event_channel_ior;

    void reconnect_notifd();
    static void connect_to_notifd(NotifService &, const CORBA::ORB_var &, const std::string &, Util *);
};
} // namespace Tango

#endif
+185 −0
Original line number Diff line number Diff line
/*
 * SPDX-FileCopyrightText: 2004 Copyright contributors to the cppTango project
 *
 * SPDX-License-Identifier: LGPL-3.0-or-later
 */

#ifndef TANGO_SERVER_ZMQEVENTSUPPLIER_H
#define TANGO_SERVER_ZMQEVENTSUPPLIER_H

#include <tango/client/eventconsumer.h>
#include <tango/server/EventSupplier.h>

#include <zmq.hpp>

#include <list>
#include <map>
#include <string>
#include <vector>

namespace Tango
{

//---------------------------------------------------------------------
//
//              ZmqEventSupplier class
//
//---------------------------------------------------------------------

#define LARGE_DATA_THRESHOLD 2048
#define LARGE_DATA_THRESHOLD_ENCODED LARGE_DATA_THRESHOLD * 4

class ZmqEventSupplier : public EventSupplier
{
  public:
    static ZmqEventSupplier *create(Util *);
    ~ZmqEventSupplier() override;

    //------------------ Push event -------------------------------

    void push_heartbeat_event() override;
    void push_event(DeviceImpl *,
                    std::string,
                    const std::vector<std::string> &,
                    const std::vector<double> &,
                    const std::vector<std::string> &,
                    const std::vector<long> &,
                    const struct SuppliedEventData &,
                    const std::string &,
                    DevFailed *,
                    bool) override;

    std::string &get_heartbeat_endpoint()
    {
        return heartbeat_endpoint;
    }

    std::string &get_event_endpoint()
    {
        return event_endpoint;
    }

    std::vector<std::string> &get_alternate_heartbeat_endpoint()
    {
        return alternate_h_endpoint;
    }

    std::vector<std::string> &get_alternate_event_endpoint()
    {
        return alternate_e_endpoint;
    }

    void create_event_socket();
    void create_mcast_event_socket(const std::string &, const std::string &, int, bool);
    bool is_event_mcast(const std::string &);
    std::string &get_mcast_event_endpoint(const std::string &);
    void init_event_cptr(const std::string &event_name);

    size_t get_mcast_event_nb()
    {
        return event_mcast.size();
    }

    bool update_connected_client(client_addr *);

    void set_double_send()
    {
        double_send++;
        double_send_heartbeat = true;
    }

    int get_zmq_release()
    {
        return zmq_release;
    }

    int get_calling_th()
    {
        return calling_th;
    }

    std::string create_full_event_name(DeviceImpl *device_impl,
                                       const std::string &event_type,
                                       const std::string &obj_name_lower,
                                       bool intr_change);

    void query_event_system(std::ostream &os);
    static void enable_perf_mon(Tango::DevBoolean enabled);

  protected:
    ZmqEventSupplier(Util *);

  private:
    static ZmqEventSupplier *_instance;

    struct McastSocketPub
    {
        std::string endpoint;
        zmq::socket_t *pub_socket;
        bool local_client;
        bool double_send;
    };

    struct ConnectedClient
    {
        client_addr clnt;
        std::chrono::steady_clock::time_point date;
    };

    zmq::context_t zmq_context;                        // ZMQ context
    zmq::socket_t *heartbeat_pub_sock;                 // heartbeat publisher socket
    zmq::socket_t *event_pub_sock{nullptr};            // events publisher socket
    std::map<std::string, McastSocketPub> event_mcast; // multicast socket(s) map
                                                       // The key is the full event name
                                                       // ie: tango://kidiboo.esrf.fr:1000/dev/test/10/att.change

    std::string heartbeat_endpoint;     // heartbeat publisher endpoint
    std::string host_ip;                // Host IP address
    std::vector<std::string> alt_ip;    // Host alternate IP addresses
    std::string heartbeat_event_name;   // The event name used for the heartbeat
    ZmqCallInfo heartbeat_call;         // The heartbeat call info
    cdrMemoryStream heartbeat_call_cdr; //
    TangoCdrMemoryStream data_call_cdr;
    std::string event_name;
    std::vector<std::string> alternate_h_endpoint; // Alternate heartbeat endpoint (host with several NIC)

    zmq::message_t endian_mess;             // Zmq messages
    zmq::message_t endian_mess_2;           //
    zmq::message_t endian_mess_heartbeat;   //
    zmq::message_t endian_mess_heartbeat_2; //
    zmq::message_t heartbeat_call_mess;     //
    zmq::message_t heartbeat_call_mess_2;   //

    unsigned char host_endian; // the host endianess

    bool ip_specified;          // The user has specified an IP address
    bool name_specified{false}; // The user has specified a name as IP address
    std::string user_ip;        // The specified IP address

    std::string event_endpoint;                    // event publisher endpoint
    std::vector<std::string> alternate_e_endpoint; // Alternate event endpoint (host with several NIC)

    std::map<std::string, unsigned int> event_cptr; // event counter map

    std::list<ConnectedClient> con_client; // Connected clients
    int double_send{0};                    // Double send ctr
    bool double_send_heartbeat{false};

    int zmq_release; // ZMQ lib release

    int calling_th;

    void tango_bind(zmq::socket_t *, std::string &);
    unsigned char test_endian();
    void create_mcast_socket(const std::string &, int, McastSocketPub &);
    size_t get_blob_data_nb(DevVarPipeDataEltArray &);
    size_t get_data_elt_data_nb(DevPipeDataElt &);
    std::string ctr_event_name;

    /// Rebuild the heartbeat event name
    void rebuild_heartbeat_event_name();
};

} // namespace Tango

#endif
Loading