Commit fd6239cc authored by Richard W.M. Jones's avatar Richard W.M. Jones
Browse files

Implement async nbd_aio operations in nbdublk

(Will be folded into previous commit)
parent 908ca30b
Pipeline #623041539 failed with stages
in 8 minutes and 50 seconds
......@@ -412,6 +412,7 @@ main (int argc, char *argv[])
data.tgt_type = "nbd";
data.tgt_ops = &tgt_type;
data.flags = 0;
data.ublksrv_flags = UBLKSRV_F_NEED_EVENTFD;
dev = ublksrv_ctrl_init (&data);
if (!dev) {
......
......@@ -22,34 +22,70 @@
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include <time.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#ifdef HAVE_STDATOMIC_H
#include <stdatomic.h>
#else
/* Rely on ints being atomic enough on the platform. */
#define _Atomic /**/
#endif
#include <ublksrv.h>
#include <libnbd.h>
#include "ispowerof2.h"
#include "vector.h"
#include "nbdublk.h"
/* Per-thread information. */
/* Number of seconds to wait for commands to complete when closing the dev. */
#define RELEASE_TIMEOUT 5
/* Thread model:
*
* There are two threads per NBD connection. One thread
* ('io_uring_thread') handles the io_uring traffic. The other thread
* ('nbd_work_thread') handles the NBD asynchronous commands for that
* connection.
*
* The thread_info entry is shared between each pair of threads.
*/
struct thread_info {
struct ublksrv_dev *dev;
size_t thread_num;
pthread_t thread;
size_t i; /* index into nbd.ptr[], also q_id */
pthread_t io_uring_thread;
pthread_t nbd_work_thread;
/* This counts the number of commands in flight. The condition is
* used to allow the operations thread to process commands when
* in_flight goes from 0 -> 1. This is roughly equivalent to
* nbd_aio_in_flight, but we need to count it ourselves in order to
* use the condition.
*/
_Atomic size_t in_flight;
pthread_mutex_t in_flight_mutex;
pthread_cond_t in_flight_cond;
};
DEFINE_VECTOR_TYPE(thread_infos, struct thread_info)
static thread_infos thread_info;
static pthread_barrier_t barrier;
static char jbuf[4096];
static pthread_mutex_t jbuf_lock = PTHREAD_MUTEX_INITIALIZER;
static void *
io_thread (void *vpinfo)
io_uring_thread (void *vpinfo)
{
struct thread_info *thread_info = vpinfo;
struct ublksrv_dev *dev = thread_info->dev;
const unsigned dev_id = dev->ctrl_dev->dev_info.dev_id;
const size_t q_id = thread_info->thread_num;
const size_t q_id = thread_info->i;
struct ublksrv_queue *q;
int r;
......@@ -87,6 +123,37 @@ io_thread (void *vpinfo)
return NULL;
}
/* NBD work thread: one per NBD connection (paired with an
 * io_uring_thread, sharing one struct thread_info).
 *
 * Sleeps until the io_uring thread submits at least one NBD command
 * (in_flight goes 0 -> 1, signalled on in_flight_cond), then drives
 * the libnbd event loop with nbd_poll until the in-flight count drops
 * back to zero.  Runs forever; never returns.
 */
static void *
nbd_work_thread (void *vpinfo)
{
struct thread_info *thread_info = vpinfo;
const size_t i = thread_info->i;  /* index into nbd.ptr[], also the q_id */
struct nbd_handle *h = nbd.ptr[i];
/* Signal to the main thread that we have initialized. */
pthread_barrier_wait (&barrier);
while (1) {
/* Sleep until at least one command is in flight. */
pthread_mutex_lock (&thread_info->in_flight_mutex);
while (thread_info->in_flight == 0)
pthread_cond_wait (&thread_info->in_flight_cond,
&thread_info->in_flight_mutex);
pthread_mutex_unlock (&thread_info->in_flight_mutex);
/* Dispatch work while there are commands in flight.
 * Reading in_flight unlocked here is safe because it is _Atomic;
 * it is decremented by the completion callback (command_completed)
 * which runs while we service the handle via nbd_poll.
 */
while (thread_info->in_flight > 0) {
if (nbd_poll (h, -1) == -1) {
/* A poll failure is unrecoverable for this connection. */
fprintf (stderr, "%s\n", nbd_get_error ());
exit (EXIT_FAILURE);
}
}
}
/*NOTREACHED*/
return NULL;
}
static int
set_parameters (struct ublksrv_ctrl_dev *ctrl_dev,
const struct ublksrv_dev *dev)
......@@ -132,24 +199,37 @@ int
start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
{
const struct ublksrv_ctrl_dev_info *dinfo = &ctrl_dev->dev_info;
struct thread_info *thread_info;
struct ublksrv_dev *dev;
size_t i;
int r;
time_t st;
assert (dinfo->nr_hw_queues == connections);
assert (nbd.len == connections);
if (verbose)
fprintf (stderr, "%s: starting daemon\n", "nbdublk");
r = ublksrv_ctrl_get_affinity(ctrl_dev);
if (r < 0) {
/* This barrier is used to ensure all NBD work threads have started
* up before we proceed to start the device.
*/
r = pthread_barrier_init (&barrier, NULL, nbd.len + 1);
if (r != 0) {
errno = r;
perror ("ublksrv_ctrl_get_affinity");
perror ("nbdublk: pthread_barrier_init");
return -1;
}
thread_info = calloc (dinfo->nr_hw_queues, sizeof (struct thread_info));
if (thread_info == NULL) {
perror ("calloc");
/* Reserve space for the thread_info. */
if (thread_infos_reserve (&thread_info, nbd.len) == -1) {
perror ("realloc");
return -1;
}
r = ublksrv_ctrl_get_affinity(ctrl_dev);
if (r < 0) {
errno = r;
perror ("ublksrv_ctrl_get_affinity");
return -1;
}
......@@ -164,20 +244,38 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
return -1;
}
/* Create the io threads. */
for (i = 0; i < dinfo->nr_hw_queues; ++i) {
thread_info[i].dev = dev;
thread_info[i].thread_num = i;
r = pthread_create (&thread_info[i].thread, NULL,
io_thread, &thread_info[i]);
/* Create the threads. */
for (i = 0; i < nbd.len; ++i) {
/* Note this cannot fail because of previous reserve. */
thread_infos_append (&thread_info,
(struct thread_info)
{ .dev = dev, .i = i, .in_flight = 0 });
r = pthread_mutex_init (&thread_info.ptr[i].in_flight_mutex, NULL);
if (r != 0)
goto bad_pthread;
r = pthread_cond_init (&thread_info.ptr[i].in_flight_cond, NULL);
if (r != 0)
goto bad_pthread;
r = pthread_create (&thread_info.ptr[i].io_uring_thread, NULL,
io_uring_thread, &thread_info.ptr[i]);
if (r != 0)
goto bad_pthread;
r = pthread_create (&thread_info.ptr[i].nbd_work_thread, NULL,
nbd_work_thread, &thread_info.ptr[i]);
if (r != 0) {
bad_pthread:
errno = r;
perror ("pthread_create");
perror ("nbdublk: pthread");
ublksrv_dev_deinit (dev);
return -1;
}
}
/* Wait on the barrier to ensure all NBD work threads are up. */
pthread_barrier_wait (&barrier);
pthread_barrier_destroy (&barrier);
if (set_parameters (ctrl_dev, dev) == -1) {
ublksrv_dev_deinit (dev);
return -1;
......@@ -195,12 +293,32 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
ublksrv_ctrl_get_info (ctrl_dev);
ublksrv_ctrl_dump (ctrl_dev, jbuf);
/* Wait for threads to exit. */
for (i = 0; i < dinfo->nr_hw_queues; ++i)
pthread_join (thread_info[i].thread, NULL);
/* Wait for io_uring threads to exit. */
for (i = 0; i < nbd.len; ++i)
pthread_join (thread_info.ptr[i].io_uring_thread, NULL);
/* Wait until a timeout while there are NBD commands in flight. */
time (&st);
while (time (NULL) - st <= RELEASE_TIMEOUT) {
for (i = 0; i < nbd.len; ++i) {
if (thread_info.ptr[i].in_flight > 0)
break;
}
if (i == nbd.len) /* no commands in flight */
break;
/* Signal to the operations threads to work. */
for (i = 0; i < nbd.len; ++i) {
pthread_mutex_lock (&thread_info.ptr[i].in_flight_mutex);
pthread_cond_signal (&thread_info.ptr[i].in_flight_cond);
pthread_mutex_unlock (&thread_info.ptr[i].in_flight_mutex);
}
sleep (1);
}
ublksrv_dev_deinit (dev);
free (thread_info);
//thread_infos_reset (&thread_info);
return 0;
}
......@@ -230,6 +348,82 @@ init_tgt (struct ublksrv_dev *dev, int type, int argc, char *argv[])
return 0;
}
/* Command completion. */
struct completion {
struct ublksrv_queue *q;
int tag;
int res; /* The normal return value, if the command completes OK. */
};
DEFINE_VECTOR_TYPE(completions, struct completion)
static pthread_mutex_t completed_commands_lock = PTHREAD_MUTEX_INITIALIZER;
static completions completed_commands;
/* libnbd completion callback, invoked when an asynchronous NBD
 * command finishes (from within nbd_poll on the NBD work thread —
 * NOTE(review): confirm callbacks are never invoked from another
 * context).
 *
 * vpdata is the struct completion allocated in handle_io_async;
 * *error is the NBD error for the command (0 on success).
 *
 * Returns 1 so that libnbd retires the command automatically.
 */
static int
command_completed (void *vpdata, int *error)
{
  struct completion *completion = vpdata;
  struct ublksrv_queue *q = completion->q;

  if (verbose)
    fprintf (stderr,
             "%s: command_completed: tag=%d q_id = %d res=%d error=%d\n",
             "nbdublk", completion->tag, q->q_id, completion->res, *error);

  /* If the command failed, override the normal result. */
  if (*error != 0)
    completion->res = *error;

  /* One fewer command in flight on this queue. */
  assert (thread_info.ptr[q->q_id].in_flight >= 1);
  thread_info.ptr[q->q_id].in_flight--;

  /* Copy the command to the list of completed commands.
   *
   * Note *completion is freed by the .free handler that we added to
   * this completion callback.
   *
   * Fix: the append's return value was previously ignored.  If it
   * failed (out of memory) the completion was silently lost and the
   * corresponding ublk request would never be retired, hanging the
   * device.  Treat allocation failure as fatal, matching the malloc
   * failure policy in handle_io_async.
   */
  pthread_mutex_lock (&completed_commands_lock);
  if (completions_append (&completed_commands, *completion) == -1)
    abort ();
  pthread_mutex_unlock (&completed_commands_lock);

  /* Signal io_uring thread that the command has been completed.
   * It will call us back in a different thread on ->handle_event
   * and we can finally complete the command(s) there.
   */
  ublksrv_queue_send_event (q);

  /* Retire the NBD command. */
  return 1;
}
/* Retire one or more completed commands. */
static void
complete_all_commands (void)
{
size_t i, len = completed_commands.len;
for (i = 0; i < len; ++i) {
struct completion *completion = &completed_commands.ptr[i];
ublksrv_complete_io (completion->q, completion->tag, completion->res);
}
completions_reset (&completed_commands);
}
/* ->handle_event callback, invoked on the io_uring thread after
 * command_completed calls ublksrv_queue_send_event.  Retires all
 * queued completions, then acknowledges the event.
 */
static void
handle_event (struct ublksrv_queue *q)
{
if (verbose)
fprintf (stderr, "%s: handle_event: q_id = %d\n", "nbdublk", q->q_id);
/* completed_commands is shared with command_completed (which runs on
 * the NBD work threads), so drain it under its lock.
 */
pthread_mutex_lock (&completed_commands_lock);
complete_all_commands ();
pthread_mutex_unlock (&completed_commands_lock);
ublksrv_queue_handled_event (q);
}
/* Start a single command. */
static int
handle_io_async (struct ublksrv_queue *q, int tag)
{
......@@ -241,47 +435,55 @@ handle_io_async (struct ublksrv_queue *q, int tag)
const size_t q_id = q->q_id; /* also the NBD handle number */
struct nbd_handle *h = nbd.ptr[q_id];
uint32_t nbd_flags = 0;
int r, res;
int64_t r;
nbd_completion_callback cb;
struct completion *completion;
if (verbose)
fprintf (stderr, "%s: handle_io_async: tag = %d q_id = %zu\n",
"nbdublk", tag, q_id);
/* XXX reimplement this using asynch operations */
/* Set up a completion callback and its user data. */
completion = malloc (sizeof *completion);
if (completion == NULL) abort ();
completion->q = q;
completion->tag = tag;
completion->res = iod->nr_sectors << 9;
cb.callback = command_completed;
cb.user_data = completion;
cb.free = free;
switch (op) {
case UBLK_IO_OP_READ:
r = nbd_pread (h, (void *) iod->addr, iod->nr_sectors << 9,
iod->start_sector << 9, 0);
r = nbd_aio_pread (h, (void *) iod->addr, iod->nr_sectors << 9,
iod->start_sector << 9, cb, 0);
if (r == -1) {
fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
res = - (nbd_get_errno () ? : EINVAL);
ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
return 0;
}
else
res = iod->nr_sectors << 9; /* NBD always does complete op. */
break;
case UBLK_IO_OP_WRITE:
if (fua && can_fua)
nbd_flags |= LIBNBD_CMD_FLAG_FUA;
r = nbd_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9,
iod->start_sector << 9, nbd_flags);
r = nbd_aio_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9,
iod->start_sector << 9, cb, nbd_flags);
if (r == -1) {
fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
res = - (nbd_get_errno () ? : EINVAL);
ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
return 0;
}
else
res = iod->nr_sectors << 9; /* NBD always does complete op. */
break;
case UBLK_IO_OP_FLUSH:
r = nbd_flush (h, 0);
if (r == -1) {
fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
res = - (nbd_get_errno () ? : EINVAL);
ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
return 0;
}
else
res = 0;
break;
case UBLK_IO_OP_DISCARD:
......@@ -291,10 +493,9 @@ handle_io_async (struct ublksrv_queue *q, int tag)
r = nbd_trim (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
if (r == -1) {
fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
res = - (nbd_get_errno () ? : EINVAL);
ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
return 0;
}
else
res = iod->nr_sectors << 9;
break;
case UBLK_IO_OP_WRITE_ZEROES:
......@@ -307,19 +508,22 @@ handle_io_async (struct ublksrv_queue *q, int tag)
r = nbd_zero (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
if (r == -1) {
fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
res = - (nbd_get_errno () ? : EINVAL);
ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
return 0;
}
else
res = iod->nr_sectors << 9;
break;
default:
fprintf (stderr, "%s: unknown operation %u\n", "nbdublk", op);
res = -ENOTSUP;
break;
ublksrv_complete_io (q, tag, -ENOTSUP);
return 0;
}
ublksrv_complete_io (q, tag, res);
/* Make sure the corresponding NBD worker sees the command. */
pthread_mutex_lock (&thread_info.ptr[q_id].in_flight_mutex);
thread_info.ptr[q_id].in_flight++;
pthread_cond_signal (&thread_info.ptr[q_id].in_flight_cond);
pthread_mutex_unlock (&thread_info.ptr[q_id].in_flight_mutex);
return 0;
}
......@@ -329,4 +533,5 @@ struct ublksrv_tgt_type tgt_type = {
.name = "nbd",
.init_tgt = init_tgt,
.handle_io_async = handle_io_async,
.handle_event = handle_event,
};
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment