Ticket #732: Add pub sub server sample

parent c3ae1e40
Pipeline #160162625 failed with stages
in 23 minutes and 35 seconds
# SPDX-License-Identifier: Apache-2.0
cmake_minimum_required(VERSION 3.13.1)
include($ENV{ZEPHYR_BASE}/cmake/app/boilerplate.cmake NO_POLICY_SCOPE)
project(pubsub_test_server)
add_definitions (-DWITH_STATIC_SECURITY_DATA=1 -DPUBSUB_STATIC_CONFIG=1 -DWITH_NANO_EXTENDED=1)
add_definitions (-DMY_IP_MASK="255.255.255.0" -DMY_IP_ETH0="192.0.2.10" -DMY_IP_GW="192.0.2.2" -DMY_IP6_ETH0="2001:db8::2" -DMY_IP6_MASK="ffff:ffff:")
add_definitions (-DPUBLISHER_ADDRESS="232.1.2.100:4840" -DSUBSCRIBER_ADDRESS="232.1.2.101:4840")
add_definitions (-DPUBLISH_PERIOD=1000 -DSUBSCRIBE_PERIOD=1000)
# add_definitions (-DDEBUG_PUBSUB_SCHEDULER_INFO=1)
# Add TEST include directory
SET(app_headers ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_include_directories(app PRIVATE ${app_headers})
# =====================
# MBEDTLS SPECIFIC DEFS
# =====================
# custom config file
SET(mbedtls_config ${CMAKE_CURRENT_SOURCE_DIR}/src/tls_config)
target_include_directories(app PRIVATE ${tls_config})
zephyr_include_directories(${mbedtls_config})
FILE(GLOB tls_files ${CMAKE_CURRENT_SOURCE_DIR}/src/tls_config/*.c)
target_sources(app PRIVATE ${tls_files})
# ============================
# END OF MBEDTLS SPECIFIC DEFS
# ============================
# Add PUBSUB APPS includes directories
FILE(GLOB app_headers_pubsub_src_apps_pubsub_server ${CMAKE_CURRENT_SOURCE_DIR}/src/pubsub_server)
target_include_directories(app PRIVATE ${app_headers_pubsub_src_apps_pubsub_server})
# Add PUBSUB APPS sources directories
FILE(GLOB app_sources_pubsub_src_apps_pubsub_server_helpers_c ${CMAKE_CURRENT_SOURCE_DIR}/src/pubsub_server/helpers.c)
target_sources(app PRIVATE ${app_sources_pubsub_src_apps_pubsub_server_helpers_c})
FILE(GLOB app_sources_pubsub_src_apps_pubsub_server_pubsub_c ${CMAKE_CURRENT_SOURCE_DIR}/src/pubsub_server/pubsub.c)
target_sources(app PRIVATE ${app_sources_pubsub_src_apps_pubsub_server_pubsub_c})
FILE(GLOB app_sources_pubsub_src_apps_pubsub_server_server_c ${CMAKE_CURRENT_SOURCE_DIR}/src/pubsub_server/server.c)
target_sources(app PRIVATE ${app_sources_pubsub_src_apps_pubsub_server_server_c})
# Add TEST source directory
FILE(GLOB app_sources ${CMAKE_CURRENT_SOURCE_DIR}/src/*.c)
target_sources(app PRIVATE ${app_sources})
# Add other path.
set(gen_dir ${ZEPHYR_BINARY_DIR}/include/generated/)
CONFIG_NET_L2_ETHERNET=y
CONFIG_NET_QEMU_ETHERNET=y
CONFIG_ETH_E1000=y
CONFIG_PCIE=y
#CONFIG_ETHERNET_LOG_LEVEL_DBG=y
# General config
CONFIG_NEWLIB_LIBC=y
CONFIG_MAIN_STACK_SIZE=8192
CONFIG_SOCKS=y
CONFIG_POSIX_MAX_FDS=16
CONFIG_INIT_STACKS=y
# Networking config
CONFIG_NETWORKING=y
CONFIG_NET_IPV4=y
CONFIG_NET_TCP=y
CONFIG_NET_SOCKETS=y
CONFIG_NET_SOCKETS_POSIX_NAMES=y
CONFIG_NET_SOCKETS_POLL_MAX=5
CONFIG_NET_L2_DUMMY=y
CONFIG_NET_L2_ETHERNET=y
CONFIG_NET_LOOPBACK=y
CONFIG_NET_ARP=y
CONFIG_NET_MGMT=y
CONFIG_DNS_RESOLVER=y
CONFIG_NET_DEFAULT_IF_ETHERNET=y
CONFIG_NET_MAX_CONN=16
CONFIG_NET_TX_STACK_SIZE=1500
CONFIG_NET_IF_MAX_IPV4_COUNT=2
CONFIG_NET_IF_MCAST_IPV4_ADDR_COUNT=16
CONFIG_NET_BUF_DATA_SIZE=512
# Network driver config
CONFIG_TEST_RANDOM_GENERATOR=y
# Network address config
CONFIG_NET_CONFIG_SETTINGS=y
CONFIG_NET_CONFIG_SETTINGS=y
CONFIG_NET_CONFIG_NEED_IPV4=y
CONFIG_NET_CONFIG_MY_IPV4_ADDR="192.0.2.10"
CONFIG_NET_CONFIG_MY_IPV4_NETMASK="255.255.255.0"
CONFIG_NET_CONFIG_MY_IPV4_GW="192.0.2.2"
# MbedTLS
CONFIG_MBEDTLS=y
CONFIG_MBEDTLS_DTLS=y
CONFIG_MBEDTLS_GENPRIME_ENABLED=y
CONFIG_MBEDTLS_HEAP_SIZE=60000
CONFIG_MBEDTLS_HMAC_DRBG_ENABLED=y
CONFIG_MBEDTLS_ENTROPY_ENABLED=y
CONFIG_MBEDTLS_ENABLE_HEAP=y
CONFIG_MBEDTLS_KEY_EXCHANGE_PSK_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_RSA_PSK_ENABLED=y
CONFIG_MBEDTLS_PEM_CERTIFICATE_FORMAT=y
CONFIG_MBEDTLS_SSL_EXPORT_KEYS=y
CONFIG_MBEDTLS_SSL_MAX_CONTENT_LEN=2048
CONFIG_MBEDTLS_TLS_VERSION_1_0=y
CONFIG_MBEDTLS_TLS_VERSION_1_1=y
CONFIG_MBEDTLS_USER_CONFIG_ENABLE=y
CONFIG_MBEDTLS_USER_CONFIG_FILE="config_custom_mbedtls.h"
# S2OPC
CONFIG_S2OPC=y
/*
* network_init.c
*
* Created on: 17 f�vr. 2020
* Author: nottin
*/
#include "network_init.h"
#include <errno.h>
#include <inttypes.h>
#include <kernel.h>
#include <limits.h>
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <kernel.h>
#include <net/net_ip.h>
#include <net/socket.h>
#include "net/ethernet.h"
#include "net/net_if.h"
#include "sopc_mutexes.h"
#ifndef MY_IP_LB
#define MY_IP_LB ((const char*) ("127.0.0.1"))
#endif
#ifndef MY_IP_ETH0
#define MY_IP_ETH0 ((const char*) ("192.168.1.102"))
#endif
#ifndef MY_IP_MASK
#define MY_IP_MASK ((const char*) ("255.255.255.0"))
#endif
#ifndef MY_IP_GW
#define MY_IP_GW ((const char*) ("192.168.1.111"))
#endif
typedef enum E_NETWORK_CONFIG_STATUS
{
NETWORK_CONFIG_STATUS_NOT_INITIALIZED,
NETWORK_CONFIG_STATUS_INITIALIZING,
NETWORK_CONFIG_STATUS_INITIALIZED,
NETWORK_CONFIG_STATUS_SIZE = INT32_MAX
} eNetworkConfigStatus;
/* Max socket based on max connections allowed by zephyr */
#ifdef CONFIG_NET_MAX_CONN
#define MAX_ZEPHYR_SOCKET (CONFIG_NET_MAX_CONN - 2)
#else
#define MAX_ZEPHYR_SOCKET 4
#endif
static volatile eNetworkConfigStatus priv_P_SOCKET_networkConfigStatus = NETWORK_CONFIG_STATUS_NOT_INITIALIZED;
static Mutex priv_lockL2; // tabMCast protection
// *** Weak p_sockets functions definitions ***
bool Network_Initialize(void)
{
uint32_t nwStatus = 0;
printk("\r\nNetwork initialize called\r\n");
do
{
nwStatus = __sync_val_compare_and_swap(&priv_P_SOCKET_networkConfigStatus, //
NETWORK_CONFIG_STATUS_NOT_INITIALIZED, // Not initialized
NETWORK_CONFIG_STATUS_INITIALIZING); // Initializing
if (0 == nwStatus)
{
assert(SOPC_STATUS_OK == Mutex_Initialization(&priv_lockL2));
struct net_if* ptrNetIf = NULL;
struct in_addr addressLoopBack;
struct in_addr addressLoopBackNetMask;
struct in_addr addressInterfaceEth;
struct in_addr addressInterfaceEthMask;
struct in_addr addressInterfaceEthGtw;
net_addr_pton(AF_INET, MY_IP_LB, (void*) &addressLoopBack);
net_addr_pton(AF_INET, MY_IP_MASK, (void*) &addressLoopBackNetMask);
net_addr_pton(AF_INET, MY_IP_ETH0, (void*) &addressInterfaceEth);
net_addr_pton(AF_INET, MY_IP_MASK, (void*) &addressInterfaceEthMask);
net_addr_pton(AF_INET, MY_IP_GW, (void*) &addressInterfaceEthGtw);
if (NULL == net_if_ipv4_addr_lookup(&addressLoopBack, &ptrNetIf))
{
#if defined(CONFIG_NET_L2_DUMMY)
ptrNetIf = net_if_get_first_by_type(&NET_L2_GET_NAME(DUMMY));
#else
#error "CONFIG_NET_L2_DUMMY should be defined."
#endif
assert(NULL != ptrNetIf);
assert(NULL != net_if_ipv4_addr_add(ptrNetIf, &addressLoopBack, NET_ADDR_MANUAL, 0));
net_if_ipv4_set_netmask(ptrNetIf, &addressLoopBackNetMask);
}
if (NULL == net_if_ipv4_addr_lookup(&addressInterfaceEth, &ptrNetIf))
{
#if defined(CONFIG_NET_L2_ETHERNET)
ptrNetIf = net_if_get_first_by_type(&NET_L2_GET_NAME(ETHERNET));
#else
#error "CONFIG_NET_L2_ETHERNET should be defined"
#endif
assert(NULL != ptrNetIf);
assert(NULL != net_if_ipv4_addr_add(ptrNetIf, &addressInterfaceEth, NET_ADDR_MANUAL, 0));
net_if_ipv4_set_gw(ptrNetIf, &addressInterfaceEthGtw);
net_if_ipv4_set_netmask(ptrNetIf, &addressInterfaceEthMask);
}
priv_P_SOCKET_networkConfigStatus = NETWORK_CONFIG_STATUS_INITIALIZED;
nwStatus = NETWORK_CONFIG_STATUS_INITIALIZED;
}
else
{
// Initializing on going, yield and retry
if (NETWORK_CONFIG_STATUS_INITIALIZING == nwStatus)
{
k_yield();
}
}
} while (NETWORK_CONFIG_STATUS_INITIALIZING == nwStatus);
// Initialized or not initialized
if (NETWORK_CONFIG_STATUS_INITIALIZED == nwStatus)
{
return true;
}
else
{
return false;
}
}
#ifndef __NETWORK_INIT_H__
#define __NETWORK_INIT_H__
#include <stdbool.h>
bool Network_Initialize(void);
#endif /* __NETWORK_INIT_H__ */
\ No newline at end of file
/* Copyright (C) Systerel SAS 2019, all rights reserved. */
#include "pubsub_config_static.h"
#include <assert.h>
#include <stdbool.h>
#include <string.h>
#ifndef PUBLISH_PERIOD
#define PUBLISH_PERIOD 1000
#endif
#ifndef SUBSCRIBE_PERIOD
#define SUBSCRIBE_PERIOD 1000
#endif
#ifndef PUBLISHER_ADDRESS
#define PUBLISHER_ADDRESS "232.1.2.100:4840"
#endif
#ifndef SUBSCRIBER_ADDRESS
#define SUBSCRIBER_ADDRESS "232.1.2.101:4840"
#endif
static SOPC_DataSetWriter* SOPC_PubSubConfig_SetPubMessageAt(SOPC_PubSubConnection* connection,
uint16_t index,
uint16_t messageId,
uint32_t version,
uint64_t interval)
{
SOPC_WriterGroup* group = SOPC_PubSubConnection_Get_WriterGroup_At(connection, index);
SOPC_WriterGroup_Set_Id(group, messageId);
SOPC_WriterGroup_Set_Version(group, version);
SOPC_WriterGroup_Set_PublishingInterval(group, interval);
// Create one DataSet Writer
SOPC_WriterGroup_Allocate_DataSetWriter_Array(group, 1);
SOPC_DataSetWriter* writer = SOPC_WriterGroup_Get_DataSetWriter_At(group, 0);
SOPC_DataSetWriter_Set_Id(writer, messageId);
return writer;
}
static SOPC_PublishedDataSet* SOPC_PubSubConfig_InitDataSet(SOPC_PubSubConfiguration* config,
uint16_t dataSetIndex,
SOPC_DataSetWriter* writer,
uint16_t nbVar)
{
SOPC_PublishedDataSet* dataset = SOPC_PubSubConfiguration_Get_PublishedDataSet_At(config, dataSetIndex);
SOPC_PublishedDataSet_Init(dataset, SOPC_PublishedDataItemsDataType, nbVar);
SOPC_DataSetWriter_Set_DataSet(writer, dataset);
return dataset;
}
static void SOPC_PubSubConfig_SetPubVariableAt(SOPC_PublishedDataSet* dataset,
uint16_t index,
char* strNodeId,
SOPC_BuiltinId builtinType)
{
SOPC_FieldMetaData* fieldmetadata = SOPC_PublishedDataSet_Get_FieldMetaData_At(dataset, index);
SOPC_FieldMetaData_Set_ValueRank(fieldmetadata, -1);
SOPC_FieldMetaData_Set_BuiltinType(fieldmetadata, builtinType);
SOPC_PublishedVariable* publishedVar = SOPC_FieldMetaData_Get_PublishedVariable(fieldmetadata);
assert(NULL != publishedVar);
SOPC_NodeId* nodeId = SOPC_NodeId_FromCString(strNodeId, (int32_t) strlen(strNodeId));
assert(NULL != nodeId);
SOPC_PublishedVariable_Set_NodeId(publishedVar, nodeId);
SOPC_PublishedVariable_Set_AttributeId(publishedVar,
13); // Value => AttributeId=13
}
static SOPC_DataSetReader* SOPC_PubSubConfig_SetSubMessageAt(SOPC_PubSubConnection* connection,
uint16_t index,
uint32_t publisherId,
uint16_t messageId,
uint32_t version,
uint64_t interval)
{
SOPC_ReaderGroup* readerGroup = SOPC_PubSubConnection_Get_ReaderGroup_At(connection, index);
assert(readerGroup != NULL);
bool allocSuccess = SOPC_ReaderGroup_Allocate_DataSetReader_Array(readerGroup, 1);
if (allocSuccess)
{
SOPC_DataSetReader* reader = SOPC_ReaderGroup_Get_DataSetReader_At(readerGroup, 0);
assert(reader != NULL);
SOPC_DataSetReader_Set_WriterGroupVersion(reader, version);
SOPC_DataSetReader_Set_WriterGroupId(reader, messageId);
SOPC_DataSetReader_Set_DataSetWriterId(reader,
messageId); // Same as WriterGroup
SOPC_DataSetReader_Set_ReceiveTimeout(reader, 2 * interval);
SOPC_DataSetReader_Set_PublisherId_UInteger(reader, publisherId);
return reader;
}
return NULL;
}
static bool SOPC_PubSubConfig_SetSubNbVariables(SOPC_DataSetReader* reader, uint16_t nbVar)
{
return SOPC_DataSetReader_Allocate_FieldMetaData_Array(reader, SOPC_TargetVariablesDataType, nbVar);
}
static void SOPC_PubSubConfig_SetSubVariableAt(SOPC_DataSetReader* reader,
uint16_t index,
char* strNodeId,
SOPC_BuiltinId builtinType)
{
SOPC_FieldMetaData* fieldmetadata = SOPC_DataSetReader_Get_FieldMetaData_At(reader, index);
assert(fieldmetadata != NULL);
/* fieldmetadata: type the field */
SOPC_FieldMetaData_Set_ValueRank(fieldmetadata, -1);
SOPC_FieldMetaData_Set_BuiltinType(fieldmetadata, builtinType);
/* FieldTarget: link to the source/target data */
SOPC_FieldTarget* fieldTarget = SOPC_FieldMetaData_Get_TargetVariable(fieldmetadata);
assert(fieldTarget != NULL);
SOPC_NodeId* nodeId = SOPC_NodeId_FromCString(strNodeId, (int32_t) strlen(strNodeId));
SOPC_FieldTarget_Set_NodeId(fieldTarget, nodeId);
SOPC_FieldTarget_Set_AttributeId(fieldTarget, 13); // Value => AttributeId=13
}
SOPC_PubSubConfiguration* SOPC_PubSubConfig_GetStatic()
{
bool alloc = true;
SOPC_PubSubConfiguration* config = SOPC_PubSubConfiguration_Create();
SOPC_PubSubConnection* connection;
/* 1 connection pub */
alloc = SOPC_PubSubConfiguration_Allocate_PubConnection_Array(config, 1);
/** connection pub 0 **/
if (alloc)
{
// Set publisher id and address
connection = SOPC_PubSubConfiguration_Get_PubConnection_At(config, 0);
SOPC_PubSubConnection_Set_PublisherId_UInteger(connection, 42);
alloc = SOPC_PubSubConnection_Set_Address(connection, "opc.udp://" PUBLISHER_ADDRESS);
}
if (alloc)
{
// 2 pub messages
alloc = SOPC_PubSubConnection_Allocate_WriterGroup_Array(connection, 2);
}
if (alloc)
{
// 2 published data sets
alloc = SOPC_PubSubConfiguration_Allocate_PublishedDataSet_Array(config, 2);
}
/*** Pub Message 14 ***/
SOPC_DataSetWriter* writer = NULL;
if (alloc)
{
writer = SOPC_PubSubConfig_SetPubMessageAt(connection, 0, 14, 1, PUBLISH_PERIOD);
alloc = NULL != writer;
}
SOPC_PublishedDataSet* dataset = NULL;
if (alloc)
{
dataset = SOPC_PubSubConfig_InitDataSet(config, 0, writer, 2);
alloc = NULL != dataset;
}
if (alloc)
{
SOPC_PubSubConfig_SetPubVariableAt(dataset, 0, "ns=1;s=PubBool", SOPC_Boolean_Id);
SOPC_PubSubConfig_SetPubVariableAt(dataset, 1, "ns=1;s=PubString", SOPC_String_Id);
}
/*** Pub Message 15 ***/
if (alloc)
{
writer = SOPC_PubSubConfig_SetPubMessageAt(connection, 1, 15, 1, PUBLISH_PERIOD);
alloc = NULL != writer;
}
if (alloc)
{
dataset = SOPC_PubSubConfig_InitDataSet(config, 1, writer, 2);
alloc = NULL != dataset;
}
if (alloc)
{
SOPC_PubSubConfig_SetPubVariableAt(dataset, 0, "ns=1;s=PubInt", SOPC_Int64_Id);
SOPC_PubSubConfig_SetPubVariableAt(dataset, 1, "ns=1;s=PubUInt", SOPC_UInt64_Id);
}
/* 1 connection Sub */
alloc = SOPC_PubSubConfiguration_Allocate_SubConnection_Array(config, 1);
/** connection sub 0 **/
if (alloc)
{
// Set subscriber id and address
connection = SOPC_PubSubConfiguration_Get_SubConnection_At(config, 0);
alloc = SOPC_PubSubConnection_Set_Address(connection, "opc.udp://" SUBSCRIBER_ADDRESS);
}
if (alloc)
{
// 2 sub messages
alloc = SOPC_PubSubConnection_Allocate_ReaderGroup_Array(connection, 2);
}
SOPC_DataSetReader* reader = NULL;
/*** Sub Message 14 ***/
if (alloc)
{
reader = SOPC_PubSubConfig_SetSubMessageAt(connection, 0, 42, 14, 1, SUBSCRIBE_PERIOD);
alloc = NULL != reader;
}
if (alloc)
{
alloc = SOPC_PubSubConfig_SetSubNbVariables(reader, 2);
}
if (alloc)
{
SOPC_PubSubConfig_SetSubVariableAt(reader, 0, "ns=1;s=SubBool", SOPC_Boolean_Id);
SOPC_PubSubConfig_SetSubVariableAt(reader, 1, "ns=1;s=SubString", SOPC_String_Id);
}
/*** Sub Message 15 ***/
if (alloc)
{
reader = SOPC_PubSubConfig_SetSubMessageAt(connection, 1, 42, 15, 1, SUBSCRIBE_PERIOD);
alloc = NULL != reader;
}
if (alloc)
{
alloc = SOPC_PubSubConfig_SetSubNbVariables(reader, 2);
}
if (alloc)
{
SOPC_PubSubConfig_SetSubVariableAt(reader, 0, "ns=1;s=SubInt", SOPC_Int64_Id);
SOPC_PubSubConfig_SetSubVariableAt(reader, 1, "ns=1;s=SubUInt", SOPC_UInt64_Id);
}
if (!alloc)
{
SOPC_PubSubConfiguration_Delete(config);
return NULL;
}
return config;
}
/*
* Licensed to Systerel under one or more contributor license
* agreements. See the NOTICE file distributed with this work
* for additional information regarding copyright ownership.
* Systerel licenses this file to you under the Apache
* License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/** Hard-coded configuration */
#ifndef CONFIG_H_
#define CONFIG_H_
#define xstr(s) str(s)
#define str(s) #s
/* Server Configuration */
#define SERVER_CERT_PATH "./server_public/server_2k_cert.der"
#define SERVER_KEY_PATH "./server_private/server_2k_key.pem"
#define CA_CERT_PATH "./trusted/cacert.der"
#define CA_CRL_PATH "./revoked/cacrl.der"
#define DEFAULT_ENDPOINT_URL "opc.tcp://localhost:4841"
#define APPLICATION_URI "urn:S2OPC:localhost"
#define PRODUCT_URI "urn:S2OPC:localhost"
#define SERVER_DESCRIPTION "S2OPC Server + PubSub"
#define LOG_PATH "./logs/"
#define ADDRESS_SPACE_PATH "./pubsub_server.xml"
#define SLEEP_TIMEOUT 100
#define NODEID_PUBSUB_STATUS "ns=1;s=PubSubStatus"
#define NODEID_PUBSUB_CONFIG "ns=1;s=PubSubConfiguration"
#define NODEID_PUBSUB_COMMAND "ns=1;s=PubSubStartStop"
#define SYNCHRONOUS_READ_TIMEOUT 10000
extern char* ENDPOINT_URL;
/* Sub Configuration */
#define PUBSUB_CONFIG_PATH "./config_pubsub_server.xml"
#endif /* CONFIG_H_ */
/*
* Licensed to Systerel under one or more contributor license
* agreements. See the NOTICE file distributed with this work
* for additional information regarding copyright ownership.
* Systerel licenses this file to you under the Apache
* License, Version 2.0 (the "License"); you may not use this