Commit 517298d1 authored by Hanspeter Portner's avatar Hanspeter Portner

Merge commit '904309f8'

parents 9bc4f298 904309f8
Pipeline #22079748 failed with stages
in 5 minutes and 33 seconds
......@@ -86,14 +86,15 @@ struct _LV2_OSC_Stream {
int protocol;
bool server;
bool slip;
bool connected;
int sock;
int fd;
LV2_OSC_Address self;
LV2_OSC_Address peer;
const LV2_OSC_Driver *driv;
void *data;
uint8_t tx_buf [8092];
uint8_t rx_buf [8092];
uint8_t tx_buf [0x4000];
uint8_t rx_buf [0x4000];
size_t rx_off;
};
......@@ -173,8 +174,6 @@ lv2_osc_stream_init(LV2_OSC_Stream *stream, const char *url,
const char *iface = NULL;
const char *service = NULL;
char *colon = strrchr(ptr, ':');
// optional IPv6
if(ptr[0] == '[')
{
......@@ -380,7 +379,10 @@ lv2_osc_stream_init(LV2_OSC_Stream *stream, const char *url,
}
else // client
{
connect(stream->sock, &stream->peer.in, stream->peer.len);
if(connect(stream->sock, &stream->peer.in, stream->peer.len) == 0)
{
stream->connected = true;
}
}
}
else
......@@ -505,7 +507,10 @@ lv2_osc_stream_init(LV2_OSC_Stream *stream, const char *url,
}
else // client
{
connect(stream->sock, &stream->peer.in, stream->peer.len);
if(connect(stream->sock, &stream->peer.in, stream->peer.len) == 0)
{
stream->connected = true;
}
}
}
else
......@@ -533,7 +538,7 @@ fail:
if(stream->sock >= 0)
{
close(stream->sock);
stream->sock = 0;
stream->sock = -1;
}
return -1;
......@@ -642,99 +647,157 @@ lv2_osc_slip_decode_inline(uint8_t *dst, size_t len, size_t *size)
}
static LV2_OSC_Enum
lv2_osc_stream_run(LV2_OSC_Stream *stream)
_lv2_osc_stream_run_udp(LV2_OSC_Stream *stream)
{
LV2_OSC_Enum ev = LV2_OSC_NONE;
// handle connections
if( (stream->socket_type == SOCK_STREAM)
&& (stream->server)
&& (stream->fd <= 0)) // no peer
// send everything
if(stream->peer.len) // has a peer
{
stream->peer.len = sizeof(stream->peer.in);
stream->fd = accept(stream->sock, &stream->peer.in, &stream->peer.len);
const uint8_t *buf;
size_t tosend;
if(stream->fd > 0)
while( (buf = stream->driv->read_req(stream->data, &tosend)) )
{
const int flag = 1;
const int sendbuff = LV2_OSC_STREAM_SNDBUF;
const int recvbuff = LV2_OSC_STREAM_RCVBUF;
const ssize_t sent = sendto(stream->sock, buf, tosend, 0,
&stream->peer.in, stream->peer.len);
if(fcntl(stream->fd, F_SETFL, O_NONBLOCK) == -1)
if(sent == -1)
{
fprintf(stderr, "%s: fcntl failed\n", __func__);
}
if( (errno == EAGAIN) || (errno == EWOULDBLOCK) )
{
// full queue
break;
}
if(setsockopt(stream->fd, stream->protocol,
TCP_NODELAY, &flag, sizeof(int)) != 0)
{
fprintf(stderr, "%s: setsockopt failed\n", __func__);
fprintf(stderr, "%s: sendto: %s\n", __func__, strerror(errno));
break;
}
if(setsockopt(stream->sock, SOL_SOCKET,
SO_KEEPALIVE, &flag, sizeof(int)) != 0)
else if(sent != (ssize_t)tosend)
{
fprintf(stderr, "%s: setsockopt failed\n", __func__);
fprintf(stderr, "%s: only sent %zi of %zu bytes", __func__, sent, tosend);
break;
}
if(setsockopt(stream->fd, SOL_SOCKET,
SO_SNDBUF, &sendbuff, sizeof(int))== -1)
stream->driv->read_adv(stream->data);
ev |= LV2_OSC_SEND;
}
}
// recv everything
{
uint8_t *buf;
size_t max_len;
while( (buf = stream->driv->write_req(stream->data,
LV2_OSC_STREAM_REQBUF, &max_len)) )
{
struct sockaddr in;
socklen_t in_len = sizeof(in);
const ssize_t recvd = recvfrom(stream->sock, buf, max_len, 0,
&in, &in_len);
if(recvd == -1)
{
fprintf(stderr, "%s: setsockopt failed\n", __func__);
}
if( (errno == EAGAIN) || (errno == EWOULDBLOCK) )
{
// empty queue
break;
}
if(setsockopt(stream->fd, SOL_SOCKET,
SO_RCVBUF, &recvbuff, sizeof(int))== -1)
fprintf(stderr, "%s: recv: %s\n", __func__, strerror(errno));
break;
}
else if(recvd == 0)
{
fprintf(stderr, "%s: setsockopt failed\n", __func__);
// peer has shut down
break;
}
//FIXME ev |=
stream->peer.len = in_len;
stream->peer.in = in;
stream->driv->write_adv(stream->data, recvd);
ev |= LV2_OSC_RECV;
}
}
// send everything
if(stream->socket_type == SOCK_DGRAM)
return ev;
}
static LV2_OSC_Enum
_lv2_osc_stream_run_tcp(LV2_OSC_Stream *stream)
{
LV2_OSC_Enum ev = LV2_OSC_NONE;
// handle connections
if(!stream->connected) // no peer
{
if(stream->peer.len) // has a peer
if(stream->server)
{
const uint8_t *buf;
size_t tosend;
stream->peer.len = sizeof(stream->peer.in);
stream->fd = accept(stream->sock, &stream->peer.in, &stream->peer.len);
while( (buf = stream->driv->read_req(stream->data, &tosend)) )
if(stream->fd >= 0)
{
const ssize_t sent = sendto(stream->sock, buf, tosend, 0,
&stream->peer.in, stream->peer.len);
const int flag = 1;
const int sendbuff = LV2_OSC_STREAM_SNDBUF;
const int recvbuff = LV2_OSC_STREAM_RCVBUF;
if(sent == -1)
if(fcntl(stream->fd, F_SETFL, O_NONBLOCK) == -1)
{
if( (errno = EAGAIN) || (errno == EWOULDBLOCK) )
{
// full queue
break;
}
fprintf(stderr, "%s: fcntl failed\n", __func__);
}
fprintf(stderr, "%s: sendto: %s\n", __func__, strerror(errno));
break;
if(setsockopt(stream->fd, stream->protocol,
TCP_NODELAY, &flag, sizeof(int)) != 0)
{
fprintf(stderr, "%s: setsockopt failed\n", __func__);
}
else if(sent != (ssize_t)tosend)
if(setsockopt(stream->sock, SOL_SOCKET,
SO_KEEPALIVE, &flag, sizeof(int)) != 0)
{
fprintf(stderr, "%s: only sent %zi of %zu bytes", __func__, sent, tosend);
break;
fprintf(stderr, "%s: setsockopt failed\n", __func__);
}
stream->driv->read_adv(stream->data);
ev |= LV2_OSC_SEND;
if(setsockopt(stream->fd, SOL_SOCKET,
SO_SNDBUF, &sendbuff, sizeof(int))== -1)
{
fprintf(stderr, "%s: setsockopt failed\n", __func__);
}
if(setsockopt(stream->fd, SOL_SOCKET,
SO_RCVBUF, &recvbuff, sizeof(int))== -1)
{
fprintf(stderr, "%s: setsockopt failed\n", __func__);
}
stream->connected = true;
//fprintf(stderr, "%s: orderly accept\n", __func__);
//FIXME ev |=
}
}
else
{
if(connect(stream->sock, &stream->peer.in, stream->peer.len) == 0)
{
stream->connected = true;
//fprintf(stderr, "%s: orderly (re)connect\n", __func__);
//FIXME ev |=
}
}
}
else if(stream->socket_type == SOCK_STREAM)
// send everything
if(stream->connected)
{
const int fd = stream->server
? stream->fd
: stream->sock;
if(fd > 0)
if(fd >= 0)
{
const uint8_t *buf;
size_t tosend;
......@@ -779,23 +842,17 @@ lv2_osc_stream_run(LV2_OSC_Stream *stream)
{
if( (errno == EAGAIN) || (errno == EWOULDBLOCK) )
{
// empty queue
break;
}
else if(errno == ECONNRESET)
else if(stream->server)
{
if(stream->server)
{
// peer has shut down
close(stream->fd);
stream->fd = 0;
break;
}
else
{
assert(false); //FIXME reconnect
}
// peer has shut down
close(stream->fd);
stream->fd = -1;
}
stream->connected = false;
fprintf(stderr, "%s: send: %s\n", __func__, strerror(errno));
break;
}
......@@ -812,51 +869,13 @@ lv2_osc_stream_run(LV2_OSC_Stream *stream)
}
// recv everything
if(stream->socket_type == SOCK_DGRAM)
{
uint8_t *buf;
size_t max_len;
while( (buf = stream->driv->write_req(stream->data,
LV2_OSC_STREAM_REQBUF, &max_len)) )
{
struct sockaddr in;
socklen_t in_len = sizeof(in);
const ssize_t recvd = recvfrom(stream->sock, buf, max_len, 0,
&in, &in_len);
if(recvd == -1)
{
if( (errno == EAGAIN) || (errno == EWOULDBLOCK) )
{
// empty queue
break;
}
fprintf(stderr, "%s: recv: %s\n", __func__, strerror(errno));
break;
}
else if(recvd == 0)
{
// peer has shut down
break;
}
stream->peer.len = in_len;
stream->peer.in = in;
stream->driv->write_adv(stream->data, recvd);
ev |= LV2_OSC_RECV;
}
}
else if(stream->socket_type == SOCK_STREAM)
if(stream->connected)
{
const int fd = stream->server
? stream->fd
: stream->sock;
if(fd > 0)
if(fd >= 0)
{
if(stream->slip) // SLIP framed
{
......@@ -872,29 +891,28 @@ lv2_osc_stream_run(LV2_OSC_Stream *stream)
// empty queue
break;
}
else if(errno == ECONNRESET)
else if(stream->server)
{
if(stream->server)
{
// peer has shut down
close(stream->fd);
stream->fd = 0;
break;
}
else
{
assert(false); //FIXME reconnect
}
// peer has shut down
close(stream->fd);
stream->fd = -1;
}
fprintf(stderr, "%s: recv: %s\n", __func__, strerror(errno));
stream->connected = false;
fprintf(stderr, "%s: recv(slip): %s\n", __func__, strerror(errno));
break;
}
else if( (recvd == 0) && stream->server)
else if(recvd == 0)
{
// peer has shut down
close(stream->fd);
stream->fd = 0;
if(stream->server)
{
// peer has shut down
close(stream->fd);
stream->fd = -1;
}
stream->connected = false;
//fprintf(stderr, "%s: recv(slip): %s\n", __func__, "orderly shutdown");
break;
}
......@@ -904,11 +922,11 @@ lv2_osc_stream_run(LV2_OSC_Stream *stream)
while(recvd > 0)
{
size_t size;
const size_t parsed = lv2_osc_slip_decode_inline(ptr, recvd, &size);
size_t parsed = lv2_osc_slip_decode_inline(ptr, recvd, &size);
if(size) // dispatch
{
uint8_t *buf ;
uint8_t *buf;
if( (buf = stream->driv->write_req(stream->data, size, NULL)) )
{
......@@ -919,7 +937,8 @@ lv2_osc_stream_run(LV2_OSC_Stream *stream)
}
else
{
fprintf(stderr, "%s: write buffer overflow", __func__);
parsed = 0;
fprintf(stderr, "%s: write buffer overflow\n", __func__);
}
}
......@@ -962,37 +981,35 @@ lv2_osc_stream_run(LV2_OSC_Stream *stream)
prefix = ntohl(prefix); //FIXME check prefix <= max_len
recvd = recv(fd, buf, prefix, 0);
}
if(recvd == -1)
else if(recvd == -1)
{
if( (errno == EAGAIN) || (errno == EWOULDBLOCK) )
{
// empty queue
break;
}
else if(errno == ECONNRESET)
else if(stream->server)
{
if(stream->server)
{
// peer has shut down
close(stream->fd);
stream->fd = 0;
break;
}
else
{
assert(false); //FIXME reconnect
}
// peer has shut down
close(stream->fd);
stream->fd = -1;
}
fprintf(stderr, "%s: recv: %s\n", __func__, strerror(errno));
stream->connected = false;
fprintf(stderr, "%s: recv(prefix): %s\n", __func__, strerror(errno));
break;
}
else if( (recvd == 0) && stream->server)
else if(recvd == 0)
{
// peer has shut down
close(stream->fd);
stream->fd = 0;
if(stream->server)
{
// peer has shut down
close(stream->fd);
stream->fd = -1;
}
stream->connected = false;
//fprintf(stderr, "%s: recv(prefix): %s\n", __func__, "orderly shutdown");
break;
}
......@@ -1006,19 +1023,39 @@ lv2_osc_stream_run(LV2_OSC_Stream *stream)
return ev;
}
static LV2_OSC_Enum
lv2_osc_stream_run(LV2_OSC_Stream *stream)
{
LV2_OSC_Enum ev = LV2_OSC_NONE;
switch(stream->socket_type)
{
case SOCK_DGRAM:
{
ev |= _lv2_osc_stream_run_udp(stream);
} break;
case SOCK_STREAM:
{
ev |= _lv2_osc_stream_run_tcp(stream);
} break;
}
return ev;
}
static int
lv2_osc_stream_deinit(LV2_OSC_Stream *stream)
{
if(stream->fd >= 0)
{
close(stream->fd);
stream->fd = 0;
stream->fd = -1;
}
if(stream->sock >= 0)
{
close(stream->sock);
stream->sock = 0;
stream->sock = -1;
}
return 0;
......
......@@ -32,6 +32,10 @@
extern "C" {
#endif
#ifndef __unused
# define __unused __attribute__((unused))
#endif
#undef LV2_ATOM_TUPLE_FOREACH // there is a bug in LV2 1.10.0
#define LV2_ATOM_TUPLE_FOREACH(tuple, iter) \
for (LV2_Atom* (iter) = lv2_atom_tuple_begin(tuple); \
......@@ -203,7 +207,8 @@ lv2_osc_argument_type(LV2_OSC_URID *osc_urid, const LV2_Atom *atom)
}
static inline const LV2_Atom *
lv2_osc_int32_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, int32_t *i)
lv2_osc_int32_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom,
int32_t *i)
{
assert(i);
*i = ((const LV2_Atom_Int *)atom)->body;
......@@ -212,7 +217,8 @@ lv2_osc_int32_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, int32_t *i)
}
static inline const LV2_Atom *
lv2_osc_float_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, float *f)
lv2_osc_float_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom,
float *f)
{
assert(f);
*f = ((const LV2_Atom_Float *)atom)->body;
......@@ -221,7 +227,8 @@ lv2_osc_float_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, float *f)
}
static inline const LV2_Atom *
lv2_osc_string_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, const char **s)
lv2_osc_string_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom,
const char **s)
{
assert(s);
*s = LV2_ATOM_BODY_CONST(atom);
......@@ -230,8 +237,8 @@ lv2_osc_string_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, const char **s)
}
static inline const LV2_Atom *
lv2_osc_blob_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, uint32_t *size,
const uint8_t **b)
lv2_osc_blob_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom,
uint32_t *size, const uint8_t **b)
{
assert(size && b);
*size = atom->size;
......@@ -241,7 +248,8 @@ lv2_osc_blob_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, uint32_t *size,
}
static inline const LV2_Atom *
lv2_osc_int64_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, int64_t *h)
lv2_osc_int64_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom,
int64_t *h)
{
assert(h);
*h = ((const LV2_Atom_Long *)atom)->body;
......@@ -250,7 +258,8 @@ lv2_osc_int64_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, int64_t *h)
}
static inline const LV2_Atom *
lv2_osc_double_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, double *d)
lv2_osc_double_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom,
double *d)
{
assert(d);
*d = ((const LV2_Atom_Double *)atom)->body;
......@@ -289,31 +298,32 @@ lv2_osc_timetag_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom,
}
static inline const LV2_Atom *
lv2_osc_true_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom)
lv2_osc_true_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom)
{
return lv2_atom_tuple_next(atom);
}
static inline const LV2_Atom *
lv2_osc_false_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom)
lv2_osc_false_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom)
{
return lv2_atom_tuple_next(atom);
}
static inline const LV2_Atom *
lv2_osc_nil_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom)
lv2_osc_nil_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom)
{
return lv2_atom_tuple_next(atom);
}
static inline const LV2_Atom *
lv2_osc_impulse_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom)
lv2_osc_impulse_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom)
{
return lv2_atom_tuple_next(atom);
}
static inline const LV2_Atom *
lv2_osc_symbol_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, LV2_URID *S)
lv2_osc_symbol_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom,
LV2_URID *S)
{
assert(S);
*S = ((const LV2_Atom_URID *)atom)->body;
......@@ -322,8 +332,8 @@ lv2_osc_symbol_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, LV2_URID *S)
}
static inline const LV2_Atom *
lv2_osc_midi_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, uint32_t *size,
const uint8_t **m)
lv2_osc_midi_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom,
uint32_t *size, const uint8_t **m)
{
assert(size && m);
*size = atom->size;
......@@ -333,7 +343,7 @@ lv2_osc_midi_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, uint32_t *size,
}
static inline const LV2_Atom *
lv2_osc_char_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, char *c)
lv2_osc_char_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom, char *c)
{
assert(c);
const char *str = LV2_ATOM_CONTENTS_CONST(LV2_Atom_Literal, atom);
......@@ -343,7 +353,7 @@ lv2_osc_char_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom, char *c)
}
static inline const LV2_Atom *
lv2_osc_rgba_get(LV2_OSC_URID *osc_urid, const LV2_Atom *atom,
lv2_osc_rgba_get(LV2_OSC_URID *osc_urid __unused, const LV2_Atom *atom,
uint8_t *r, uint8_t *g, uint8_t *b, uint8_t *a)
{
assert(r && g && b && a);
......
CC ?= gcc
#C_FLAGS ?= -I../ -Wall -Wextra -Wpedantic $(shell pkg-config --cflags lv2) \
# -fprofile-arcs -ftest-coverage
C_FLAGS ?= -I../ $(shell pkg-config --cflags lv2) \
C_FLAGS ?= -I../ -Wall -Wextra -Wpedantic $(shell pkg-config --cflags lv2) \
-fprofile-arcs -ftest-coverage
all: osc_test
osc_test: osc_test.c ../osc.lv2/*.h
$(CC) -std=gnu11 -g -o $@ $< $(C_FLAGS) -lpthread
./$@
gcov $<
check: osc_test
./$<
gcov $<.c
clean:
rm -f osc_test *.gcov *.gc* vgcore.*
......@@ -2,6 +2,7 @@
#include <string.h>
#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <osc.lv2/osc.h>
#include <osc.lv2/reader.h>
......@@ -9,7 +10,7 @@
#include <osc.lv2/forge.h>
#include <osc.lv2/stream.h>
#define BUF_SIZE 8192
#define BUF_SIZE 0x100000
#define MAX_URIDS 512
typedef void (*test_t)(LV2_OSC_Writer *writer);
......@@ -588,12 +589,21 @@ static const LV2_OSC_Driver driv = {
.read_adv = _read_adv
};
#define COUNT 1024
#define COUNT 128
typedef struct _pair_t pair_t;
struct _pair_t {
const char *server;
const char *client;
bool lossy;
};
static void *
_thread_1(void *data)
{
const char *uri = data;
const pair_t *pair = data;
const char *uri = pair->server;
LV2_OSC_Stream stream;
stash_t stash [2];
......@@ -605,9 +615,11 @@ _thread_1(void *data)
assert(lv2_osc_stream_init(&stream, uri, &driv, stash) == 0);
time_t t0 = time(NULL);
unsigned count = 0;
while(true)
{
const time_t t1 = time(NULL);
const LV2_OSC_Enum ev = lv2_osc_stream_run(&stream);
if(ev & LV2_OSC_RECV)
......@@ -648,31 +660,44 @@ _thread_1(void *data)
_stash_read_adv(&stash[0]);
}
t0 = t1;
}
if(count >= COUNT)
{
break;