...
 
Commits (95)
......@@ -7,3 +7,5 @@ maker.db
maker.yaml
maker.log
data
.docker_cache
Dockerfile
......@@ -4,6 +4,7 @@
/maker.yaml
/maker.log
/maker
/maker.pem
*-packr.go
*~
/dist
......
## ChangeLog
### 0.4.0 - 2019-04-06
- New command line option to set the data directory where the
maker.yaml, maker.db and log files are stored. This replaces the
options to set the config filename and log
filename. https://gitlab.com/crankykernel/maker/issues/38
- Store the configuration and database in a fixed location to avoid
having to move the files over upgrades. On Linux this directory is
~/.makertradingtool and on Windows it is
%appdata%\MakerTradingTool. It can be modified using the command
line option above to change the data
directory. https://gitlab.com/crankykernel/maker/issues/31
- Filter display of active trades to all, open closed.
https://gitlab.com/crankykernel/maker/issues/32
- Add TLS support. TLS support can be enabled with the `--tls` command
line option, or by binding to non-localhost.
https://gitlab.com/crankykernel/maker/issues/3
- Add authentication support. The `--auth` command line option is used
to enable authentication and will auto generate a strong password.
https://gitlab.com/crankykernel/maker/issues/1
- Move AccountInfo request (Binance) from the UI to the server so the
UI doesn't have to make any authenticated requests to
Binance. Solves the issue where the server on a VPS may have good
time sync, but the browser machine
doesn't. https://gitlab.com/crankykernel/maker/issues/52
- Provide visiable health status in the
UI. https://gitlab.com/crankykernel/maker/issues/42
- Add simple Binance balance view.
- Paginate trade history view. Too many trades can harm performance,
server side and in the browser.
- Highlight the BNB balance when
low. https://gitlab.com/crankykernel/maker/issues/44
[Full Changelog](https://gitlab.com/crankykernel/maker/compare/0.3.2...0.4.0)
### 0.3.2 - 2019-02-01
- By default hide API key/secret input, but add a checkbox to enable
it to be shown.
......
FROM centos:7
RUN yum -y install epel-release
RUN yum -y install \
git \
make \
gcc \
zip \
which \
patch
ENV NODE_VERSION 10.15.0
RUN cd /usr/local && curl -L -o - https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.gz | tar zxf - --strip-components=1
ENV GO_VERSION 1.12
ENV GO_ROOT /usr/local/go
RUN cd /usr/local && curl -L -o - https://dl.google.com/go/go${GO_VERSION}.linux-amd64.tar.gz | tar zxf -
ENV PATH ${GO_ROOT}/bin:$PATH
WORKDIR /src
COPY / .
RUN make install-deps && make
FROM centos:7
WORKDIR /app
COPY --from=0 /src/maker .
ENTRYPOINT ["/app/maker", "server"]
......@@ -18,11 +18,18 @@ clean:
cd webapp && $(MAKE) $@
cd go && $(MAKE) $@
rm -rf dist
find . -name \*~ -delete
distclean: clean
cd webapp && $(MAKE) $@
cd go && $(MAKE) $@
gofmt:
cd go && $(MAKE) $@
docker:
docker build -t crankykernel/maker:latest .
#
# Release building.
#
......
......@@ -18,8 +18,7 @@ ENV NODE_VERSION 10.15.0
RUN cd /usr/local && \
curl -L -o - https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.gz | tar zxf - --strip-components=1
ENV GO_VERSION 1.11.4
ENV GO_VERSION 1.12
RUN cd /usr/local && \
curl -L -o - https://dl.google.com/go/go${GO_VERSION}.linux-amd64.tar.gz | tar zxf -
......
......@@ -4,10 +4,13 @@ trap 'echo "Killing background jobs..."; kill $(jobs -p)' EXIT
(cd webapp && make update-version && npm start) &
args="$@"
export RACE="-race"
while true; do
find go -name \*.go | grep -v packr | \
entr -d -r sh -c "(cd go && make) && ./go/maker server"
entr -d -r sh -c "(cd go && make) && ./go/maker server ${args}"
done
kill $(jobs -p)
......@@ -21,16 +21,19 @@ $(APP): EXE := $(shell go env GOEXE)
$(APP): BIN ?= $(APP)$(EXE)
$(APP): DIR ?= .
$(APP):
test -e ../webapp/dist && $(GOPATH)/bin/packr -v -z || true
CGO_ENABLED=1 go build -o $(DIR)/$(BIN) \
test -e ../webapp/dist && GO111MODULE=on $(GOPATH)/bin/packr2 -v
CGO_ENABLED=1 go build $(RACE) -o $(DIR)/$(BIN) \
-ldflags "$(LDFLAGS)" \
-tags "$(GO_TAGS)"
install-deps:
go get -u github.com/gobuffalo/packr/...
go mod download
go get github.com/gobuffalo/packr/v2/packr2
gofmt:
go fmt ./...
clean:
$(GOPATH)/bin/packr2 clean
rm -f $(APP)
find . -name \*~ -delete
find . -name \*-packr.go -delete
......
// Copyright (C) 2019 Cranky Kernel
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package auth
import (
"bytes"
"encoding/hex"
"fmt"
"golang.org/x/crypto/argon2"
"math/rand"
"strings"
)
const (
TYPE_ARGON2ID = "argon2id"
)
const PASSWORD_TYPE = TYPE_ARGON2ID
const SALT_SIZE = 16
func genSalt() ([]byte, error) {
return genRandom(SALT_SIZE)
}
func genRandom(size int) ([]byte, error) {
bytes := make([]byte, size)
_, err := rand.Read(bytes)
return bytes, err
}
func encode(input []byte) string {
return hex.EncodeToString(input)
}
func decode(input string) ([]byte, error) {
return hex.DecodeString(input)
}
func CheckPassword(password string, encodedPassword string) (bool, error) {
passwordType, salt, passwordHash, err := DecodePassword(encodedPassword)
if err != nil {
return false, err
}
switch passwordType {
case TYPE_ARGON2ID:
hash := argon2.IDKey([]byte(password), salt, 1, 64*1024, 4, 32)
if bytes.Equal(hash, passwordHash) {
return true, nil
}
}
return false, nil
}
func DecodePassword(input string) (passwordType string, salt []byte, password []byte, err error) {
parts := strings.Split(input, "$")
if len(parts) != 3 {
err = fmt.Errorf("invalid encoded password")
} else {
passwordType = parts[0]
salt, err = decode(parts[1])
password, err = decode(parts[2])
}
return passwordType, salt, password, err
}
func EncodePassword(password string) (string, error) {
salt, err := genSalt()
if err != nil {
return "", err
}
encoded := argon2.IDKey([]byte(password), salt, 1, 64*1024, 4, 32)
return fmt.Sprintf("%s$%s$%s",
TYPE_ARGON2ID, encode(salt), encode(encoded)), nil
}
package auth
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestArgon2(t *testing.T) {
assert := assert.New(t)
encoded, err := EncodePassword("password")
assert.Nil(err)
passwordType, salt, password, err := DecodePassword(encoded)
assert.Nil(err)
assert.Equal(PASSWORD_TYPE, passwordType)
assert.NotEmpty(salt)
assert.NotEmpty(password)
ok, err := CheckPassword("password", encoded)
assert.Nil(err)
assert.True(ok)
ok, err = CheckPassword("password1", encoded)
assert.Nil(err)
assert.False(ok)
}
......@@ -17,28 +17,28 @@ package binanceex
import (
"fmt"
"gitlab.com/crankykernel/cryptotrader/binance"
"github.com/crankykernel/binanceapi-go"
"gitlab.com/crankykernel/maker/go/log"
"gitlab.com/crankykernel/maker/go/types"
"gitlab.com/crankykernel/maker/go/util"
)
type BinancePriceService struct {
anonymousClient *binance.RestClient
exchangeInfoService *binance.ExchangeInfoService
exchangeInfoService *ExchangeInfoService
client *binanceapi.RestClient
}
func NewBinancePriceService(exchangeInfoService *binance.ExchangeInfoService) *BinancePriceService {
func NewBinancePriceService(exchangeInfoService *ExchangeInfoService) *BinancePriceService {
return &BinancePriceService{
anonymousClient: binance.NewAnonymousClient(),
exchangeInfoService: exchangeInfoService,
client: binanceapi.NewRestClient(),
}
}
// GetLastPrice gets the most current close price from Binance using the REST
// API.
func (s *BinancePriceService) GetLastPrice(symbol string) (float64, error) {
ticker, err := s.anonymousClient.GetPriceTicker(symbol)
ticker, err := s.client.GetPriceTicker(symbol)
if err != nil {
return 0, err
}
......@@ -48,7 +48,7 @@ func (s *BinancePriceService) GetLastPrice(symbol string) (float64, error) {
// GetBestBidPrice gets the most current best bid price from Binance using
// the REST API.
func (s *BinancePriceService) GetBestBidPrice(symbol string) (float64, error) {
ticker, err := s.anonymousClient.GetOrderBookTicker(symbol)
ticker, err := s.client.GetBookTicker(symbol)
if err != nil {
return 0, err
}
......@@ -58,7 +58,7 @@ func (s *BinancePriceService) GetBestBidPrice(symbol string) (float64, error) {
// GetBestBidPrice gets the most current best bid price from Binance using
// the REST API.
func (s *BinancePriceService) GetBestAskPrice(symbol string) (float64, error) {
ticker, err := s.anonymousClient.GetOrderBookTicker(symbol)
ticker, err := s.client.GetBookTicker(symbol)
if err != nil {
return 0, err
}
......
// Copyright (C) 2018 Cranky Kernel
// Copyright (C) 2018-2019 Cranky Kernel
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
......@@ -16,14 +16,15 @@
package binanceex
import (
"sync"
"time"
"gitlab.com/crankykernel/cryptotrader/binance"
"github.com/gorilla/websocket"
"strings"
"encoding/json"
"github.com/crankykernel/binanceapi-go"
"gitlab.com/crankykernel/maker/go/clientnotificationservice"
"gitlab.com/crankykernel/maker/go/config"
"gitlab.com/crankykernel/maker/go/healthservice"
"gitlab.com/crankykernel/maker/go/log"
"strings"
"sync"
"time"
)
type StreamEventType string
......@@ -33,171 +34,180 @@ const (
EventTypeOutboundAccountInfo StreamEventType = "outboundAccountInfo"
)
type ListenKeyWrapper struct {
lock sync.Mutex
listenKey string
}
func NewListenKeyWrapper() *ListenKeyWrapper {
return &ListenKeyWrapper{}
}
func (k *ListenKeyWrapper) Set(listenKey string) {
k.lock.Lock()
defer k.lock.Unlock()
k.listenKey = listenKey
}
func (k *ListenKeyWrapper) Get() string {
k.lock.Lock()
defer k.lock.Unlock()
return k.listenKey
}
type UserStreamEvent struct {
EventType StreamEventType
EventTime time.Time
OutboundAccountInfo binance.StreamOutboundAccountInfo
ExecutionReport binance.StreamExecutionReport
OutboundAccountInfo binanceapi.StreamOutboundAccountInfo
ExecutionReport binanceapi.StreamExecutionReport
Raw []byte
}
type BinanceUserDataStream struct {
Subscribers map[chan *UserStreamEvent]bool
lock sync.RWMutex
Subscribers map[chan *UserStreamEvent]string
lock sync.RWMutex
listenKey *ListenKeyWrapper
notificationService *clientnotificationservice.Service
healthService *healthservice.Service
}
func NewBinanceUserDataStream() *BinanceUserDataStream {
func NewBinanceUserDataStream(notificationService *clientnotificationservice.Service,
healthService *healthservice.Service) *BinanceUserDataStream {
return &BinanceUserDataStream{
Subscribers: make(map[chan *UserStreamEvent]bool),
Subscribers: make(map[chan *UserStreamEvent]string),
listenKey: NewListenKeyWrapper(),
notificationService: notificationService,
healthService: healthService,
}
}
func (b *BinanceUserDataStream) Subscribe() (chan *UserStreamEvent) {
func (b *BinanceUserDataStream) Subscribe(name string) chan *UserStreamEvent {
b.lock.Lock()
defer b.lock.Unlock()
channel := make(chan *UserStreamEvent)
b.Subscribers[channel] = true
channel := make(chan *UserStreamEvent, 3)
b.Subscribers[channel] = name
return channel
}
func (b *BinanceUserDataStream) Unsubscribe(channel chan *UserStreamEvent) {
b.lock.Lock()
defer b.lock.Unlock()
b.Subscribers[channel] = false
delete(b.Subscribers, channel)
}
func (b *BinanceUserDataStream) Run() {
b.DoRun()
func (b *BinanceUserDataStream) ListenKeyRefreshLoop() {
for {
time.Sleep(time.Minute)
listenKey := b.listenKey.Get()
if listenKey == "" {
log.Debugf("No Binance user stream key set, will not refresh")
} else {
log.Debugf("Refreshing Binance user stream listen key")
client := GetBinanceRestClient()
if err := client.PutUserStreamKeepAlive(listenKey); err != nil {
log.WithError(err).Errorf("Failed to send Binance user stream keep alive.")
}
}
}
}
func (b *BinanceUserDataStream) DoRun() {
intervalDuration := time.Minute
intervalChannel := make(chan bool)
lastPong := time.Now()
func (b *BinanceUserDataStream) Run() {
configChannel := config.Subscribe()
go func() {
for {
time.Sleep(intervalDuration)
select {
case intervalChannel <- true:
default:
log.Errorf("Failed to send OK to interval channel.")
}
}
}()
go b.ListenKeyRefreshLoop()
goto Start
Fail:
select {
case intervalChannel <- false:
default:
}
b.listenKey.Set("")
b.notificationService.Broadcast(
clientnotificationservice.NewNotice(
clientnotificationservice.LevelError,
"Failed to connect to Binance user socket").
WithData(map[string]interface{}{
"binanceUserSocketState": "failed",
}))
b.healthService.Update(func(state *healthservice.State) {
state.BinanceUserSocketState = "connection failed"
})
time.Sleep(time.Second)
Start:
apiKey := config.GetString("binance.api.key")
// Wait for key to be set if needed.
if apiKey == "" {
log.Infof("Binance API key not set. Waiting for configuration update.")
<-configChannel
goto Start
}
restClient := binance.NewAuthenticatedClient(
config.GetString("binance.api.key"), "")
for {
// First we have to get the user stream listen key.
listenKey, err := GetBinanceRestClient().GetUserDataStream()
if err != nil {
log.WithError(err).Error("Failed to get Binance user stream key. Retyring.")
goto Fail
} else {
log.WithFields(log.Fields{}).Debugf("Acquired Binance user stream listen key")
}
// First we have to get the user stream listen key.
listenKey, err := restClient.GetUserDataStream()
if err != nil {
log.WithError(err).Error("Failed to get Binance user stream key. Retyring.")
goto Fail
}
userStream, err := binanceapi.OpenSingleStream(listenKey)
if err != nil {
log.WithError(err).Errorf("Failed to open Binance user stream")
goto Fail
}
b.listenKey.Set(listenKey)
log.Infof("Connected to Binance user stream websocket.")
userStream.Conn.SetPongHandler(func(appData string) error {
log.WithFields(log.Fields{
"data": appData,
}).Debugf("Received Binance user stream pong")
return nil
})
b.notificationService.Broadcast(
clientnotificationservice.NewNotice(
clientnotificationservice.LevelInfo,
"Connected to Binance user data stream.").WithData(map[string]interface{}{
"binanceUserSocketState": "ok",
}))
b.healthService.Update(func(state *healthservice.State) {
state.BinanceUserSocketState = "ok"
})
userStream, err := binance.OpenStream(listenKey)
for {
message, err := userStream.Next()
if err != nil {
log.Printf("Failed to open user data stream: %v", err)
log.WithError(err).Errorf("Failed to read next Binance user stream message")
goto Fail
}
log.Infof("Connected to Binance user stream websocket.");
userStream.Conn.SetPongHandler(func(appData string) error {
lastPong = time.Now()
return nil
})
go func() {
for {
if time.Now().Sub(lastPong) > intervalDuration*2 {
log.Errorf("Last user stream PONG received %v ago.", intervalDuration)
userStream.Close()
return
}
if err := userStream.Conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
log.WithError(err).Error("Failed to send user stream PING message.")
userStream.Close()
return
}
err := restClient.PutUserStreamKeepAlive(listenKey)
if err != nil {
log.WithError(err).Error("Failed to send user stream keep alive.")
userStream.Close()
return
}
select {
case ok := <-intervalChannel:
if !ok {
log.Infof("Binance user stream PING loop exiting.")
return
}
case <-configChannel:
log.Infof("Binance user stream: received configuration update.")
userStream.Close()
return
}
streamEvent := UserStreamEvent{}
streamEvent.Raw = message
switch {
case strings.HasPrefix(string(message), `{"e":"executionReport",`):
var orderUpdate binanceapi.StreamExecutionReport
if err := json.Unmarshal(message, &orderUpdate); err != nil {
log.WithError(err).Error("Failed to decode user stream executionReport message.")
continue
}
}()
for {
_, message, err := userStream.Next()
if err != nil {
log.WithError(err).Error("Failed to read next user stream message.")
goto Fail
}
streamEvent := UserStreamEvent{}
streamEvent.Raw = message
switch {
case strings.HasPrefix(string(message), `{"e":"executionReport",`):
var orderUpdate binance.StreamExecutionReport
if err := json.Unmarshal(message, &orderUpdate); err != nil {
log.WithError(err).Error("Failed to decode user stream executionReport message.")
continue
}
streamEvent.EventType = StreamEventType(orderUpdate.EventType)
streamEvent.EventTime = time.Unix(0, orderUpdate.EventTimeMillis*int64(time.Millisecond))
streamEvent.ExecutionReport = orderUpdate
case strings.HasPrefix(string(message), `{"e":"outboundAccountInfo",`):
if err := json.Unmarshal(message, &streamEvent.OutboundAccountInfo); err != nil {
log.WithError(err).Error("Failed to decode user stream outboundAccountInfo message.")
continue
}
streamEvent.EventType = StreamEventType(streamEvent.OutboundAccountInfo.EventType)
streamEvent.EventTime = time.Unix(0, streamEvent.OutboundAccountInfo.EventTimeMillis*int64(time.Millisecond))
streamEvent.EventType = StreamEventType(orderUpdate.EventType)
streamEvent.EventTime = time.Unix(0, orderUpdate.EventTimeMillis*int64(time.Millisecond))
streamEvent.ExecutionReport = orderUpdate
case strings.HasPrefix(string(message), `{"e":"outboundAccountInfo",`):
if err := json.Unmarshal(message, &streamEvent.OutboundAccountInfo); err != nil {
log.WithError(err).Error("Failed to decode user stream outboundAccountInfo message.")
continue
}
streamEvent.EventType = StreamEventType(streamEvent.OutboundAccountInfo.EventType)
streamEvent.EventTime = time.Unix(0, streamEvent.OutboundAccountInfo.EventTimeMillis*int64(time.Millisecond))
}
for channel := range b.Subscribers {
channel <- &streamEvent
b.lock.RLock()
for channel := range b.Subscribers {
select {
case channel <- &streamEvent:
}
}
b.lock.RUnlock()
}
}
......@@ -16,13 +16,14 @@
package binanceex
import (
"gitlab.com/crankykernel/cryptotrader/binance"
"github.com/crankykernel/binanceapi-go"
"gitlab.com/crankykernel/maker/go/config"
)
func GetBinanceRestClient() *binance.RestClient {
restClient := binance.NewAuthenticatedClient(
func GetBinanceRestClient() *binanceapi.RestClient {
client := binanceapi.NewRestClient().WithAuth(
config.GetString("binance.api.key"),
config.GetString("binance.api.secret"))
return restClient
return client
}
// The MIT License (MIT)
//
// Copyright (c) 2018-2019 Cranky Kernel
//
// Permission is hereby granted, free of charge, to any person
// obtaining a copy of this software and associated documentation
// files (the "Software"), to deal in the Software without
// restriction, including without limitation the rights to use, copy,
// modify, merge, publish, distribute, sublicense, and/or sell copies
// of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
// BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package binanceex
import (
"fmt"
"github.com/crankykernel/binanceapi-go"
"gitlab.com/crankykernel/maker/go/log"
"sync"
)
type SymbolInfo struct {
TickSize float64
StepSize float64
MinNotional float64
}
type ExchangeInfoService struct {
Symbols map[string]SymbolInfo
lock sync.RWMutex
}
func NewExchangeInfoService() *ExchangeInfoService {
return &ExchangeInfoService{
Symbols: make(map[string]SymbolInfo),
}
}
func (s *ExchangeInfoService) Update() error {
exchangeInfo, err := binanceapi.NewRestClient().GetExchangeInfo()
if err != nil {
return err
}
s.lock.Lock()
defer s.lock.Unlock()
for _, symbol := range exchangeInfo.Symbols {
symbolInfo := SymbolInfo{}
for _, filter := range symbol.Filters {
switch filter.FilterType {
case "PRICE_FILTER":
symbolInfo.TickSize = filter.TickSize
case "MIN_NOTIONAL":
symbolInfo.MinNotional = filter.MinNotional
case "LOT_SIZE":
symbolInfo.StepSize = filter.StepSize
}
}
s.Symbols[symbol.Symbol] = symbolInfo
}
log.WithFields(log.Fields{
"symbols": len(s.Symbols),
}).Infof("Binance exchange info service updated")
return nil
}
// GetSymbol returns the symbol info object for the requested symbol.
func (s *ExchangeInfoService) GetSymbol(symbol string) (info SymbolInfo, err error) {
s.lock.RLock()
defer s.lock.RUnlock()
info, ok := s.Symbols[symbol]
if !ok {
return info, fmt.Errorf("symbol not found")
}
return info, nil
}
// GetTickSize returns the tick size for the requested symbol.
func (s *ExchangeInfoService) GetTickSize(symbol string) (float64, error) {
s.lock.RLock()
defer s.lock.RUnlock()
symbolInfo, ok := s.Symbols[symbol]
if !ok {
return 0, fmt.Errorf("symbol not found")
}
return symbolInfo.TickSize, nil
}
// GetMinNotional returns the minimum notional value for the requested symbol.
func (s *ExchangeInfoService) GetMinNotional(symbol string) (float64, error) {
s.lock.RLock()
defer s.lock.Unlock()
symbolInfo, ok := s.Symbols[symbol]
if !ok {
return 0, fmt.Errorf("symbol not found")
}
return symbolInfo.MinNotional, nil
}
// GetStepSize returns the step size for the requested symbol.
func (s *ExchangeInfoService) GetStepSize(symbol string) (float64, error) {
s.lock.RLock()
defer s.lock.RUnlock()
symbolInfo, ok := s.Symbols[symbol]
if !ok {
return 0, fmt.Errorf("symbol not found")
}
return symbolInfo.StepSize, nil
}
// Copyright (C) 2018-2019 Cranky Kernel
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package binanceex
import (
"encoding/json"
"fmt"
"gitlab.com/crankykernel/cryptotrader/binance"
"gitlab.com/crankykernel/maker/go/log"
"strings"
"sync"
"time"
)
type TickerStreamChannel chan binance.StreamTicker24
type TickerStreamManager struct {
mutex sync.RWMutex
subscriptions map[TickerStreamChannel]bool
streams map[string]*binance.StreamClient
streamCount map[string]int
}
func NewTickerStreamManager() *TickerStreamManager {
return &TickerStreamManager{
subscriptions: make(map[TickerStreamChannel]bool),
streams: make(map[string]*binance.StreamClient),
streamCount: make(map[string]int),
}
}
func (m *TickerStreamManager) Subscribe() TickerStreamChannel {
m.mutex.Lock()
defer m.mutex.Unlock()
channel := make(TickerStreamChannel)
m.subscriptions[channel] = true
return channel
}
func (m *TickerStreamManager) Unsubscribe(channel TickerStreamChannel) {
m.mutex.Lock()
defer m.mutex.Unlock()
if _, exists := m.subscriptions[channel]; !exists {
log.Errorf("Attempt to unsubscribe non existing channel")
}
m.subscriptions[channel] = false
delete(m.subscriptions, channel)
}
func (m *TickerStreamManager) AddTicker(ticker string) {
m.mutex.Lock()
defer m.mutex.Unlock()
_, exists := m.streamCount[ticker]
if exists {
m.streamCount[ticker] += 1
return
}
m.streamCount[ticker] = 1
go m.runStream(ticker)
}
func (m *TickerStreamManager) RemoveTicker(ticker string) {
m.mutex.Lock()
defer m.mutex.Unlock()
count, exists := m.streamCount[ticker]
if !exists {
return
}
if count > 1 {
m.streamCount[ticker] -= 1
} else {
delete(m.streamCount, ticker)
}
}
func (m *TickerStreamManager) streamRefCount(name string) int {
m.mutex.RLock()
defer m.mutex.RUnlock()
count, exists := m.streamCount[name]
if exists {
return count
}
return 0
}
func (m *TickerStreamManager) runStream(name string) {
Retry:
if m.streamRefCount(name) == 0 {
return
}
streamName := fmt.Sprintf("%s@ticker", strings.ToLower(name))
stream, err := binance.OpenStream(streamName)
if err != nil {
log.WithError(err).
WithField("stream", streamName).
Errorf("Failed to open ticker stream")
time.Sleep(1 * time.Second)
goto Retry
}
for {
_, payload, err := stream.Next()
if err != nil {
log.WithError(err).
WithField("stream", streamName).
Errorf("Failed to read ticker stream message")
stream.Close()
time.Sleep(1 * time.Second)
goto Retry
}
// Check if we still have subscribers.
count := m.streamRefCount(name)
if count == 0 {
log.WithFields(log.Fields{
"tickerStream": name,
}).Infof("Ticker reference count is zero, disconnected stream")
stream.Close()
return
}
var ticker binance.StreamTicker24
if err := json.Unmarshal(payload, &ticker); err != nil {
log.WithError(err).WithFields(log.Fields{
"name": name,
}).Errorf("Failed to decode ticker stream message")
continue
}
m.mutex.RLock()
for channel := range m.subscriptions {
channel <- ticker
}
m.mutex.RUnlock()
}
}
......@@ -18,52 +18,51 @@ package binanceex
import (
"encoding/json"
"fmt"
"gitlab.com/crankykernel/cryptotrader/binance"
"github.com/crankykernel/binanceapi-go"
"gitlab.com/crankykernel/maker/go/log"
"strings"
"sync"
"time"
)
type TradeStreamChannel chan *binance.StreamAggTrade
type TradeStreamChannel chan binanceapi.StreamAggTrade
type TradeStreamManager struct {
mutex sync.RWMutex
subscriptions map[TradeStreamChannel]bool
streams map[string]*binance.StreamClient
lock sync.RWMutex
subscriptions map[TradeStreamChannel]string
streams map[string]*binanceapi.Stream
streamCount map[string]int
}
func NewXTradeStreamManager() *TradeStreamManager {
func NewTradeStreamManager() *TradeStreamManager {
return &TradeStreamManager{
subscriptions: make(map[TradeStreamChannel]bool),
streams: make(map[string]*binance.StreamClient),
subscriptions: make(map[TradeStreamChannel]string),
streams: make(map[string]*binanceapi.Stream),
streamCount: make(map[string]int),
}
}
func (m *TradeStreamManager) Subscribe() TradeStreamChannel {
m.mutex.Lock()
defer m.mutex.Unlock()
channel := make(TradeStreamChannel)
m.subscriptions[channel] = true
func (m *TradeStreamManager) Subscribe(name string) TradeStreamChannel {
m.lock.Lock()
defer m.lock.Unlock()
channel := make(TradeStreamChannel, 128)
m.subscriptions[channel] = name
return channel
}
func (m *TradeStreamManager) Unsubscribe(channel TradeStreamChannel) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.lock.Lock()
defer m.lock.Unlock()
if _, exists := m.subscriptions[channel]; !exists {
log.Errorf("Attempt to unsubscribe non existing channel")
}
m.subscriptions[channel] = false
delete(m.subscriptions, channel)
}
func (m *TradeStreamManager) AddSymbol(symbol string) {
m.lock.Lock()
defer m.lock.Unlock()
symbol = strings.ToLower(symbol)
m.mutex.Lock()
defer m.mutex.Unlock()
_, exists := m.streamCount[symbol]
if exists {
m.streamCount[symbol] += 1
......@@ -74,9 +73,9 @@ func (m *TradeStreamManager) AddSymbol(symbol string) {
}
func (m *TradeStreamManager) RemoveSymbol(symbol string) {
m.lock.Lock()
defer m.lock.Unlock()
symbol = strings.ToLower(symbol)
m.mutex.Lock()
defer m.mutex.Unlock()
count, exists := m.streamCount[symbol]
if !exists {
return
......@@ -89,8 +88,8 @@ func (m *TradeStreamManager) RemoveSymbol(symbol string) {
}
func (m *TradeStreamManager) streamRefCount(name string) int {
m.mutex.RLock()
defer m.mutex.RUnlock()
m.lock.RLock()
defer m.lock.RUnlock()
count, exists := m.streamCount[name]
if exists {
return count
......@@ -104,7 +103,7 @@ Retry:
return
}
streamName := fmt.Sprintf("%s@aggTrade", strings.ToLower(name))
stream, err := binance.OpenStream(streamName)
stream, err := binanceapi.OpenSingleStream(streamName)
if err != nil {
log.WithError(err).
WithField("stream", streamName).
......@@ -112,8 +111,11 @@ Retry:
time.Sleep(1 * time.Second)
goto Retry
}
log.WithFields(log.Fields{
"symbol": name,
}).Infof("Connected to trade Binance aggTrade stream")
for {
_, payload, err := stream.Next()
payload, err := stream.Next()
if err != nil {
log.WithError(err).
WithField("stream", streamName).
......@@ -133,7 +135,7 @@ Retry:
return
}
var trade binance.StreamAggTrade
var trade binanceapi.StreamAggTrade
if err := json.Unmarshal(payload, &trade); err != nil {
log.WithError(err).WithFields(log.Fields{
"name": name,
......@@ -141,10 +143,15 @@ Retry:
continue
}
m.mutex.RLock()
m.lock.RLock()
for channel := range m.subscriptions {
channel <- &trade
select {
case channel <- trade:
default:
log.Warnf("Failed to send Binance trade to channel [%s], would block",
m.subscriptions[channel])
}
}
m.mutex.RUnlock()
m.lock.RUnlock()
}
}
// Copyright (C) 2018 Cranky Kernel
// Copyright (C) 2018-2019 Cranky Kernel
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
......@@ -13,46 +13,54 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package server
package clientnotificationservice
import "sync"
type ClientNoticeLevel string
type Level string
const ClientNoticeLevelWarning = "warning"
const LevelInfo = "info"
const LevelWarning = "warning"
const LevelError = "error"
type ClientNotice struct {
Level ClientNoticeLevel `json:"level"`
Message string `json:"message"`
type Notice struct {
Level Level `json:"level"`
Message string `json:"message"`
Data map[string]interface{} `json:"data"`
}
func NewClientNotice(level ClientNoticeLevel, msg string) ClientNotice {
return ClientNotice{
Level: level,
func NewNotice(level Level, msg string) *Notice {
return &Notice{
Level: level,
Message: msg,
}
}
type ClientNoticeService struct {
lock sync.RWMutex
subscribers map[chan ClientNotice]bool
func (n *Notice) WithData(data map[string]interface{}) *Notice {
n.Data = data
return n
}
func NewClientNoticeService() *ClientNoticeService {
return &ClientNoticeService{
subscribers: make(map[chan ClientNotice]bool),
type Service struct {
lock sync.RWMutex
subscribers map[chan *Notice]bool
}
func New() *Service {
return &Service{
subscribers: make(map[chan *Notice]bool),
}
}
func (s *ClientNoticeService) Subscribe() chan ClientNotice {
func (s *Service) Subscribe() chan *Notice {
s.lock.Lock()
defer s.lock.Unlock()
channel := make(chan ClientNotice)
channel := make(chan *Notice)
s.subscribers[channel] = true
return channel
}
func (s *ClientNoticeService) Unsubscribe(channel chan ClientNotice) {
func (s *Service) Unsubscribe(channel chan *Notice) {
s.lock.Lock()
defer s.lock.Unlock()
if _, exists := s.subscribers[channel]; !exists {
......@@ -61,10 +69,10 @@ func (s *ClientNoticeService) Unsubscribe(channel chan ClientNotice) {
delete(s.subscribers, channel)
}
func (s *ClientNoticeService) Broadcast(notice ClientNotice) {
func (s *Service) Broadcast(notice *Notice) {
s.lock.RLock()
defer s.lock.RUnlock()
for channel := range s.subscribers {
channel <- notice
}
s.lock.RUnlock()
}
\ No newline at end of file
}
// Copyright (C) 2018 Cranky Kernel
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"github.com/spf13/cobra"
"gitlab.com/crankykernel/maker/go/gencert"
)
var opts gencert.Flags
var gencertCmd = &cobra.Command{