Commit 85f57eff authored by Daniel P. Berrange's avatar Daniel P. Berrange

Add stream sendall/recvall functions

Signed-off-by: 's avatarDaniel P. Berrange <berrange@redhat.com>
parent 2242e0b9
......@@ -62,8 +62,6 @@ var (
"virStreamEventAddCallback",
"virStreamEventRemoveCallback",
"virStreamEventUpdateCallback",
"virStreamRecvAll",
"virStreamSendAll",
/* Wrapped in domain_events_cfuncs.go instead */
"virConnectDomainEventRegister",
......
......@@ -29,6 +29,8 @@
package libvirt
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"strings"
......@@ -1312,6 +1314,156 @@ func TestStorageVolUploadDownload(t *testing.T) {
}
}
func TestStorageVolUploadDownloadCallbacks(t *testing.T) {
conn, err := NewConnect("lxc:///")
if err != nil {
t.Error(err)
return
}
defer func() {
if res, _ := conn.CloseConnection(); res != 0 {
t.Errorf("CloseConnection() == %d, expected 0", res)
}
}()
poolPath, err := ioutil.TempDir("", "default-pool-test-1")
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(poolPath)
pool, err := conn.StoragePoolDefineXML(`<pool type='dir'>
<name>default-pool-test-1</name>
<target>
<path>`+poolPath+`</path>
</target>
</pool>`, 0)
defer func() {
pool.Undefine()
pool.Free()
}()
if err := pool.Create(0); err != nil {
t.Error(err)
return
}
defer pool.Destroy()
vol, err := pool.StorageVolCreateXML(testStorageVolXML("", poolPath), 0)
if err != nil {
t.Error(err)
return
}
defer func() {
vol.Delete(STORAGE_VOL_DELETE_NORMAL)
vol.Free()
}()
input := make([]byte, 1024*1024)
for i := 0; i < len(input); i++ {
input[i] = (byte)(((i % 256) ^ (i / 256)) % 256)
}
// write above data to the vol
// 1. create a stream
stream, err := conn.NewStream(0)
if err != nil {
t.Fatal(err)
}
defer func() {
stream.Free()
}()
// 2. set it up to upload from stream
if err := vol.Upload(stream, 0, uint64(len(input)), 0); err != nil {
stream.Abort()
t.Fatal(err)
}
sent := 0
source := func(stream *Stream, nbytes int) ([]byte, error) {
tosend := nbytes
if tosend > (len(input) - sent) {
tosend = len(input) - sent
}
if tosend == 0 {
return []byte{}, nil
}
data := input[sent : sent+tosend]
sent += tosend
return data, nil
}
// 3. do the actual writing
if err := stream.SendAll(source); err != nil {
t.Fatal(err)
}
if sent != len(input) {
t.Fatal("Wanted %d but only sent %d bytes",
len(input), sent)
}
// 4. finish!
if err := stream.Finish(); err != nil {
t.Fatal(err)
}
// read back the data
// 1. create a stream
downStream, err := conn.NewStream(0)
if err != nil {
t.Fatal(err)
}
defer func() {
downStream.Free()
}()
// 2. set it up to download from stream
if err := vol.Download(downStream, 0, uint64(len(input)), 0); err != nil {
downStream.Abort()
t.Fatal(err)
}
// 3. do the actual reading
output := make([]byte, len(input))
got := 0
sink := func(st *Stream, data []byte) (int, error) {
toget := len(data)
if (got + toget) > len(output) {
toget = len(output) - got
}
if toget == 0 {
return 0, fmt.Errorf("Output buffer is full")
}
target := output[got : got+toget]
copied := copy(target, data)
got += copied
return copied, nil
}
if err := downStream.RecvAll(sink); err != nil {
t.Fatal(err)
}
if got != len(input) {
t.Fatalf("Wanted %d but only received %d bytes",
len(input), got)
}
// 4. finish!
if err := downStream.Finish(); err != nil {
t.Fatal(err)
}
if !bytes.Equal(input, output) {
t.Fatal("Input and output arrays are different")
}
}
/*func TestDomainMemoryStats(t *testing.T) {
conn, err := NewConnect("lxc:///")
if err != nil {
......
......@@ -31,6 +31,7 @@ package libvirt
#include <libvirt/libvirt.h>
#include <libvirt/virterror.h>
#include <stdlib.h>
#include "stream_cfuncs.h"
*/
import "C"
import (
......@@ -107,3 +108,83 @@ func (v *Stream) Send(p []byte) (int, error) {
return int(n), nil
}
type StreamSinkFunc func(*Stream, []byte) (int, error)
//export streamSinkCallback
func streamSinkCallback(stream C.virStreamPtr, cdata *C.char, nbytes C.size_t, callbackID int) int {
callbackFunc := getCallbackId(callbackID)
callback, ok := callbackFunc.(StreamSinkFunc)
if !ok {
panic("Incorrect stream sink func callback")
}
data := make([]byte, int(nbytes))
for i := 0; i < int(nbytes); i++ {
cdatabyte := (*C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(cdata)) + (unsafe.Sizeof(*cdata) * uintptr(i))))
data[i] = (byte)(*cdatabyte)
}
retnbytes, err := callback(&Stream{ptr: stream}, data)
if err != nil {
return -1
}
return retnbytes
}
func (v *Stream) RecvAll(handler StreamSinkFunc) error {
callbackID := registerCallbackId(handler)
ret := C.virStreamRecvAll_cgo(v.ptr, (C.int)(callbackID))
freeCallbackId(callbackID)
if ret == -1 {
return GetLastError()
}
return nil
}
type StreamSourceFunc func(*Stream, int) ([]byte, error)
//export streamSourceCallback
func streamSourceCallback(stream C.virStreamPtr, cdata *C.char, nbytes C.size_t, callbackID int) int {
callbackFunc := getCallbackId(callbackID)
callback, ok := callbackFunc.(StreamSourceFunc)
if !ok {
panic("Incorrect stream sink func callback")
}
data, err := callback(&Stream{ptr: stream}, (int)(nbytes))
if err != nil {
return -1
}
nretbytes := int(nbytes)
if len(data) < nretbytes {
nretbytes = len(data)
}
for i := 0; i < nretbytes; i++ {
cdatabyte := (*C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(cdata)) + (unsafe.Sizeof(*cdata) * uintptr(i))))
*cdatabyte = (C.char)(data[i])
}
return nretbytes
}
func (v *Stream) SendAll(handler StreamSourceFunc) error {
callbackID := registerCallbackId(handler)
ret := C.virStreamSendAll_cgo(v.ptr, (C.int)(callbackID))
freeCallbackId(callbackID)
if ret == -1 {
return GetLastError()
}
return nil
}
/*
* This file is part of the libvirt-go project
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* Copyright (c) 2013 Alex Zorin
* Copyright (C) 2016 Red Hat, Inc.
*
*/
package libvirt
/*
#cgo pkg-config: libvirt
#include <libvirt/libvirt.h>
#include <libvirt/virterror.h>
#include "stream_cfuncs.h"
int streamSourceCallback(virStreamPtr st, char *cdata, size_t nbytes, int callbackID);
int streamSinkCallback(virStreamPtr st, const char *cdata, size_t nbytes, int callbackID);
static int streamSourceCallbackHelper(virStreamPtr st, char *data, size_t nbytes, void *opaque)
{
int *callbackID = opaque;
return streamSourceCallback(st, data, nbytes, *callbackID);
}
static int streamSinkCallbackHelper(virStreamPtr st, const char *data, size_t nbytes, void *opaque)
{
int *callbackID = opaque;
return streamSinkCallback(st, data, nbytes, *callbackID);
}
int virStreamSendAll_cgo(virStreamPtr st, int callbackID)
{
return virStreamSendAll(st, streamSourceCallbackHelper, &callbackID);
}
int virStreamRecvAll_cgo(virStreamPtr st, int callbackID)
{
return virStreamRecvAll(st, streamSinkCallbackHelper, &callbackID);
}
*/
import "C"
/*
* This file is part of the libvirt-go project
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* Copyright (c) 2013 Alex Zorin
* Copyright (C) 2016 Red Hat, Inc.
*
*/
#ifndef LIBVIRT_GO_STREAM_CFUNCS_H__
#define LIBVIRT_GO_STREAM_CFUNCS_H__
int virStreamSendAll_cgo(virStreamPtr st, int callbackID);
int virStreamRecvAll_cgo(virStreamPtr st, int callbackID);
#endif /* LIBVIRT_GO_STREAM_CFUNCS_H__ */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment