Commit 7dd7f104 authored by Alexander Sporn's avatar Alexander Sporn

Added giota-compatible client

parent 8524245c
package powsrvio
import (
"crypto/tls"
"errors"
"fmt"
"io"
"sync"
"time"
"gitlab.com/powsrv.io/go/api"
"github.com/iotaledger/giota"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
const (
serverName string = "powsrv.io"
serverAddress string = "powsrv.io:443"
)
// PendingPowRequest contains the information about a request and the response channel for signaling
type PendingPowRequest struct {
powRequest *api.PowRequest
powResponseChan chan *api.PowResponse
}
// PowClient is the client that connects to the powsrv.io
type PowClient struct {
ReadTimeOutMs int64 // Timeout in ms to receive a response
APIKey string
Verbose bool // Print info messages
connection *grpc.ClientConn // Connection to the powsrv.io
connectionError error
connectionErrorOcurred chan bool
powStream api.Pow_DoPowClient // Stream for DoPow
requestID uint64
requestIDLock *sync.Mutex
outgoingReq chan PendingPowRequest // Outgoing Requests to the powsrv.io
pendingRequests map[uint64]PendingPowRequest
pendingRequestsLock *sync.Mutex
}
func (client *PowClient) Init() error {
var err error
creds := credentials.NewTLS(&tls.Config{
ServerName: serverName,
})
// Set up a connection to the server.
client.connection, err = grpc.Dial(serverAddress, grpc.WithTransportCredentials(creds))
if err != nil {
return err
}
client.connectionErrorOcurred = make(chan bool, 1)
client.requestID = 0
client.requestIDLock = &sync.Mutex{}
client.outgoingReq = make(chan PendingPowRequest)
client.pendingRequests = make(map[uint64]PendingPowRequest)
client.pendingRequestsLock = &sync.Mutex{}
md := metadata.Pairs("authorization", "powsrv-token "+client.APIKey)
ctx := metadata.NewOutgoingContext(context.Background(), md)
// Set up a RPC connection to the server.
rpcClient := api.NewPowClient(client.connection)
client.powStream, err = rpcClient.DoPow(ctx)
if err != nil {
return err
}
go client.handleConnection()
return nil
}
func (client *PowClient) Close() {
client.powStream.CloseSend()
client.connection.Close()
}
func (client *PowClient) printMsg(msg string) {
if client.Verbose {
fmt.Print(msg + "\n")
}
}
func (client *PowClient) handleConnection() {
if client.connection == nil {
panic("Connection not established")
}
if client.powStream == nil {
panic("RPC Stream not established")
}
client.printMsg("Start receiving responses...")
// Responses
go func() {
for {
incomingRes, err := client.powStream.Recv()
if err == io.EOF {
close(client.outgoingReq)
// read done.
client.printMsg("Connection to server closed")
client.connectionError = err
client.connectionErrorOcurred <- true
return
}
if err != nil {
close(client.outgoingReq)
client.printMsg(fmt.Sprintf("Failed to receive: %v", err))
client.connectionError = err
client.connectionErrorOcurred <- true
return
}
client.printMsg(fmt.Sprintf("Received response: %v", incomingRes))
client.pendingRequestsLock.Lock()
pendingPowRequest, ok := client.pendingRequests[incomingRes.Identifier]
if ok {
pendingPowRequest.powResponseChan <- incomingRes
delete(client.pendingRequests, incomingRes.Identifier)
}
client.pendingRequestsLock.Unlock()
}
}()
// Requests
for {
for pendingPowRequest := range client.outgoingReq {
client.pendingRequestsLock.Lock()
client.pendingRequests[pendingPowRequest.powRequest.Identifier] = pendingPowRequest
client.pendingRequestsLock.Unlock()
err := client.powStream.Send(pendingPowRequest.powRequest)
if err != nil {
client.printMsg(fmt.Sprintf("Failed to send a message: %v", err))
}
}
}
}
// PowFunc does the POW
func (client *PowClient) PowFunc(trytes giota.Trytes, minWeightMagnitude int) (result giota.Trytes, Error error) {
if client.connectionError != nil {
return "", client.connectionError
}
if (minWeightMagnitude < 0) || (minWeightMagnitude > 243) {
return "", fmt.Errorf("minWeightMagnitude out of range [0-243]: %v", minWeightMagnitude)
}
if client.connection == nil {
return "", errors.New("Connection not established")
}
if client.powStream == nil {
return "", errors.New("RPC Stream not established")
}
client.requestIDLock.Lock()
client.requestID++
reqID := client.requestID
client.requestIDLock.Unlock()
pendingPowRequest := PendingPowRequest{powRequest: &api.PowRequest{Identifier: reqID, Trytes: string(trytes), Mwm: uint32(minWeightMagnitude)}, powResponseChan: make(chan *api.PowResponse, 1)}
client.outgoingReq <- pendingPowRequest
select {
case powResponse := <-pendingPowRequest.powResponseChan:
// Got response
close(pendingPowRequest.powResponseChan)
result, err := giota.ToTrytes(powResponse.Nonce)
if err != nil {
return "", err
}
return result, err
case <-time.After(time.Duration(client.ReadTimeOutMs) * time.Millisecond):
// Timeout
client.pendingRequestsLock.Lock()
delete(client.pendingRequests, reqID)
client.pendingRequestsLock.Unlock()
client.printMsg(fmt.Sprintf("Receive timeout! ReqID: %v", reqID))
return "", errors.New("Receive timeout")
case <-client.connectionErrorOcurred:
return "", client.connectionError
}
}
package powsrvio
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/iotaledger/giota"
)
const (
TRYTE_CHARS string = "9ABCDEFGHIJKLMNOPQRSTUVWXYZ"
MWM int = 14
TestCount int = 10
WorkersCount int = 3
transaction string
)
var testCountSum = TestCount * WorkersCount
var attachedCount uint32
var ts time.Time
func printMsg(t *testing.T, msg string) {
println(msg)
t.Log(msg)
}
func runPowTest(t *testing.T, wg *sync.WaitGroup, id int, powClient *PowClient, count int) {
// test transaction data
randomTrytes := make([]rune, 256)
for i := 0; i < count; i++ {
for j := 0; j < 256; j++ {
randomTrytes[j] = rune(TRYTE_CHARS[rand.Intn(len(TRYTE_CHARS))])
}
data, err := giota.ToTrytes(string(randomTrytes) + transaction[256:])
if err != nil {
t.Error(err)
continue
}
printMsg(t, fmt.Sprintf("Worker %d started job %d\n", id, i))
response, err := powClient.PowFunc(data, MWM)
if err != nil {
t.Error(err)
continue
}
attachedCount++
avgTps := float32(attachedCount) / float32(time.Since(ts)/time.Second)
printMsg(t, fmt.Sprintf("Worker %d finished job %d with response: %v [%d/%d] attached! avg TPS: %0.2f\n", id, i, response, attachedCount, testCountSum, avgTps))
}
wg.Done()
}
func TestPOW(t *testing.T) {
var wg = &sync.WaitGroup{}
powClient := &PowClient{ReadTimeOutMs: 5000, Verbose: true}
powClient.Init()
defer powClient.Close()
ts = time.Now()
for worker := 0; worker < WorkersCount; worker++ {
wg.Add(1)
go runPowTest(t, wg, worker, powClient, TestCount)
}
wg.Wait()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment