/*
 * Copyright (c) 2015-2017 Hanspeter Portner (dev@open-music-kontrollers.ch)
 *
 * This is free software: you can redistribute it and/or modify
 * it under the terms of the Artistic License 2.0 as published by
 * The Perl Foundation.
 *
 * This source is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Artistic License 2.0 for more details.
 *
 * You should have received a copy of the Artistic License 2.0
 * along the source as a COPYING file. If not, obtain it from
 * http://www.perlfoundation.org/artistic_license_2_0.
 */

#ifndef _VARCHUNK_H
#define _VARCHUNK_H

#ifdef __cplusplus
extern "C" {
#endif

#include <stdlib.h>
#include <stdint.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <assert.h>

#if !defined(_WIN32)
#	include <sys/mman.h> // mlock
#endif

/*****************************************************************************
 * API START
 *****************************************************************************/

typedef struct _varchunk_t varchunk_t;

static inline bool
varchunk_is_lock_free(void);

static inline size_t
varchunk_body_size(size_t minimum);

static inline varchunk_t *
varchunk_new(size_t minimum, bool release_and_acquire);

static inline void
varchunk_free(varchunk_t *varchunk);

static inline void
varchunk_init(varchunk_t *varchunk, size_t body_size, bool release_and_acquire);

static inline void *
varchunk_write_request_max(varchunk_t *varchunk, size_t minimum, size_t *maximum);

static inline void *
varchunk_write_request(varchunk_t *varchunk, size_t minimum);

static inline void
varchunk_write_advance(varchunk_t *varchunk, size_t written);

static inline const void *
varchunk_read_request(varchunk_t *varchunk, size_t *toread);

static inline void
varchunk_read_advance(varchunk_t *varchunk);

/*****************************************************************************
 * API END
 *****************************************************************************/
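
/*
 * Typical usage, as a minimal sketch (the payload and sizes are arbitrary;
 * one producer thread writes, one consumer thread reads):
 *
 *   varchunk_t *rb = varchunk_new(8192, true);
 *
 *   // producer thread
 *   void *dst;
 *   if( (dst = varchunk_write_request(rb, sizeof(uint32_t))) )
 *   {
 *     *(uint32_t *)dst = 42;
 *     varchunk_write_advance(rb, sizeof(uint32_t));
 *   }
 *
 *   // consumer thread
 *   size_t toread;
 *   const void *src;
 *   if( (src = varchunk_read_request(rb, &toread)) )
 *   {
 *     // ... process toread bytes at src ...
 *     varchunk_read_advance(rb);
 *   }
 *
 *   varchunk_free(rb);
 */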

#define VARCHUNK_PAD(SIZE) ( ( (size_t)(SIZE) + 7U ) & ( ~(size_t)7U ) )
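// e.g. VARCHUNK_PAD(13) == 16 and VARCHUNK_PAD(16) == 16: chunk bodies are
// padded to a multiple of 8 bytes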

typedef struct _varchunk_elmnt_t varchunk_elmnt_t;

struct _varchunk_elmnt_t {
	uint32_t size;
	uint32_t gap;
};
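// every chunk in the ring is prefixed by one such 8-byte header; a header
// with gap != 0 marks unusable space at the end of the buffer which the
// consumer skips instead of reading it as a chunk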

struct _varchunk_t {
	size_t size;
	size_t mask;
	size_t rsvd;
	size_t gapd;

	memory_order acquire;
	memory_order release;

	atomic_size_t head;
	atomic_size_t tail;

	uint8_t buf [] __attribute__((aligned(sizeof(varchunk_elmnt_t))));
};

static inline bool
varchunk_is_lock_free(void)
{
	varchunk_t varchunk;

	return atomic_is_lock_free(&varchunk.head)
		&& atomic_is_lock_free(&varchunk.tail);
}
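// real-time users may want to verify lock-freedom once at startup, e.g.:
//	assert(varchunk_is_lock_free());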

static inline size_t
varchunk_body_size(size_t minimum)
{
	size_t size = 1;
	while(size < minimum)
		size <<= 1; // round up to the next power of 2
	return size;
}
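// e.g. varchunk_body_size(4096) == 4096, varchunk_body_size(5000) == 8192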

static inline void
varchunk_init(varchunk_t *varchunk, size_t body_size, bool release_and_acquire)
{
	varchunk->acquire = release_and_acquire
		? memory_order_acquire
		: memory_order_relaxed;
	varchunk->release = release_and_acquire
		? memory_order_release
		: memory_order_relaxed;

	atomic_init(&varchunk->head, 0);
	atomic_init(&varchunk->tail, 0);

	varchunk->size = body_size;
	varchunk->mask = varchunk->size - 1;
}
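// with release_and_acquire == false all head/tail accesses are relaxed; this
// presumably only suffices when producer and consumer already synchronize by
// other means (e.g. both run on the same thread)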

static inline varchunk_t *
varchunk_new(size_t minimum, bool release_and_acquire)
{
	varchunk_t *varchunk = NULL;

	const size_t body_size = varchunk_body_size(minimum);
	const size_t total_size = sizeof(varchunk_t) + body_size;

#if defined(_WIN32)
	varchunk = _aligned_malloc(total_size, sizeof(varchunk_elmnt_t));
#else
	if(posix_memalign((void **)&varchunk, sizeof(varchunk_elmnt_t), total_size))
		varchunk = NULL;
	else
		mlock(varchunk, total_size); // prevent memory from being paged out to disk
#endif

	if(varchunk)
		varchunk_init(varchunk, body_size, release_and_acquire);

	return varchunk;
}

static inline void
varchunk_free(varchunk_t *varchunk)
{
	if(varchunk)
	{
#if defined(_WIN32)
		_aligned_free(varchunk); // memory from _aligned_malloc needs _aligned_free
#else
		munlock(varchunk->buf, varchunk->size);
		free(varchunk);
#endif
	}
}

static inline void
_varchunk_write_advance_raw(varchunk_t *varchunk, size_t head, size_t written)
{
	// only the producer is allowed to advance the write head
	const size_t new_head = (head + written) & varchunk->mask;
	atomic_store_explicit(&varchunk->head, new_head, varchunk->release);
}

static inline void *
varchunk_write_request_max(varchunk_t *varchunk, size_t minimum, size_t *maximum)
{
	assert(varchunk);

	size_t space; // size of writable buffer
	size_t end; // virtual end of writable buffer
	const size_t head = atomic_load_explicit(&varchunk->head, memory_order_relaxed); // read head
	const size_t tail = atomic_load_explicit(&varchunk->tail, varchunk->acquire); // read tail (the consumer may modify it at any time)
	const size_t padded = 2*sizeof(varchunk_elmnt_t) + VARCHUNK_PAD(minimum);

	// calculate writable space
	if(head > tail)
		space = ((tail - head + varchunk->size) & varchunk->mask) - 1;
	else if(head < tail)
		space = (tail - head) - 1;
	else // head == tail
		space = varchunk->size - 1;
	end = head + space;

	if(end > varchunk->size) // available region wraps over at end of buffer
	{
		// get first part of available buffer
		uint8_t *buf1 = varchunk->buf + head;
		const size_t len1 = varchunk->size - head;

		if(len1 < padded) // not enough space left on first part of buffer
		{
			// get second part of available buffer
			uint8_t *buf2 = varchunk->buf;
			const size_t len2 = end & varchunk->mask;

			if(len2 < padded) // not enough space left on second part of buffer, either
			{
				varchunk->rsvd = 0;
				varchunk->gapd = 0;
				if(maximum)
					*maximum = varchunk->rsvd;
				return NULL;
			}
			else // enough space left on second part of buffer, use it!
			{
				varchunk->rsvd = len2;
				varchunk->gapd = len1;
				if(maximum)
					*maximum = varchunk->rsvd;
				return buf2 + sizeof(varchunk_elmnt_t);
			}
		}
		else // enough space left on first part of buffer, use it!
		{
			varchunk->rsvd = len1;
			varchunk->gapd = 0;
			if(maximum)
				*maximum = varchunk->rsvd;
			return buf1 + sizeof(varchunk_elmnt_t);
		}
	}
	else // available region is contiguous
	{
		uint8_t *buf = varchunk->buf + head;

		if(space < padded) // no space left on contiguous buffer
		{
			varchunk->rsvd = 0;
			varchunk->gapd = 0;
			if(maximum)
				*maximum = varchunk->rsvd;
			return NULL;
		}
		else // enough space left on contiguous buffer, use it!
		{
			varchunk->rsvd = space;
			varchunk->gapd = 0;
			if(maximum)
				*maximum = varchunk->rsvd;
			return buf + sizeof(varchunk_elmnt_t);
		}
	}
}
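// note: `padded` in the function above reserves room for two element headers,
// apparently so that a wrapping write can always fit both a gap element and
// the chunk header itself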

static inline void *
varchunk_write_request(varchunk_t *varchunk, size_t minimum)
{
	return varchunk_write_request_max(varchunk, minimum, NULL);
}

static inline void
varchunk_write_advance(varchunk_t *varchunk, size_t written)
{
	assert(varchunk);
	// fail loudly if the caller tries to write more than was reserved
	assert(written <= varchunk->rsvd);

	// write elmnt header at head
	const size_t head = atomic_load_explicit(&varchunk->head, memory_order_relaxed);
	if(varchunk->gapd > 0)
	{
		// fill end of first buffer with gap
		varchunk_elmnt_t *elmnt = (varchunk_elmnt_t *)(varchunk->buf + head);
		elmnt->size = varchunk->gapd - sizeof(varchunk_elmnt_t);
		elmnt->gap = 1;

		// fill written element header
		elmnt = (void *)varchunk->buf;
		elmnt->size = written;
		elmnt->gap = 0;
	}
	else // varchunk->gapd == 0
	{
		// fill written element header
		varchunk_elmnt_t *elmnt = (varchunk_elmnt_t *)(varchunk->buf + head);
		elmnt->size = written;
		elmnt->gap = 0;
	}

	// advance write head
	_varchunk_write_advance_raw(varchunk, head,
		varchunk->gapd + sizeof(varchunk_elmnt_t) + VARCHUNK_PAD(written));
}

static inline void
_varchunk_read_advance_raw(varchunk_t *varchunk, size_t tail, size_t read)
{
	// only the consumer is allowed to advance the read tail
	const size_t new_tail = (tail + read) & varchunk->mask;
	atomic_store_explicit(&varchunk->tail, new_tail, varchunk->release);
}

static inline const void *
varchunk_read_request(varchunk_t *varchunk, size_t *toread)
{
	assert(varchunk);
	size_t space; // size of available buffer
	const size_t tail = atomic_load_explicit(&varchunk->tail, memory_order_relaxed); // read tail
	const size_t head = atomic_load_explicit(&varchunk->head, varchunk->acquire); // read head (the producer may modify it at any time)

	// calculate readable space
	if(head > tail)
		space = head - tail;
	else
		space = (head - tail + varchunk->size) & varchunk->mask;

	if(space > 0) // there may be chunks available for reading
	{
		const size_t end = tail + space; // virtual end of available buffer

		if(end > varchunk->size) // available buffer wraps around at end
		{
			// first part of available buffer
			const uint8_t *buf1 = varchunk->buf + tail;
			const size_t len1 = varchunk->size - tail;
			const varchunk_elmnt_t *elmnt = (const varchunk_elmnt_t *)buf1;

			if(elmnt->gap) // gap elmnt?
			{
				// skip gap
				_varchunk_read_advance_raw(varchunk, tail, len1);

				// second part of available buffer
				const uint8_t *buf2 = varchunk->buf;
				// there will always be at least one element after a gap
				elmnt = (const varchunk_elmnt_t *)buf2;

				*toread = elmnt->size;
				return buf2 + sizeof(varchunk_elmnt_t);
			}
			else // valid chunk, use it!
			{
				*toread = elmnt->size;
				return buf1 + sizeof(varchunk_elmnt_t);
			}
		}
		else // available buffer is contiguous
		{
			// get buffer
			const uint8_t *buf = varchunk->buf + tail;
			const varchunk_elmnt_t *elmnt = (const varchunk_elmnt_t *)buf;

			*toread = elmnt->size;
			return buf + sizeof(varchunk_elmnt_t);
		}
	}
	else // no chunks available aka empty buffer
	{
		*toread = 0;
		return NULL;
	}
}

static inline void
varchunk_read_advance(varchunk_t *varchunk)
{
	assert(varchunk);
	// get elmnt header from tail (for size)
	const size_t tail = atomic_load_explicit(&varchunk->tail, memory_order_relaxed);
	const varchunk_elmnt_t *elmnt = (const varchunk_elmnt_t *)(varchunk->buf + tail);

	// advance read tail
	_varchunk_read_advance_raw(varchunk, tail,
		sizeof(varchunk_elmnt_t) + VARCHUNK_PAD(elmnt->size));
}

#undef VARCHUNK_PAD

#ifdef __cplusplus
}
#endif

#endif //_VARCHUNK_H