Commit 7314f0c5 authored by Erin Krengel's avatar Erin Krengel
Browse files

change outbound msg + make svc smaller

parent 05b0d1be
FROM golang:1.13.1
FROM golang:1.13.1 as build-env
WORKDIR /app
......@@ -6,5 +6,8 @@ COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o demo-app .
EXPOSE 8080
ENTRYPOINT ["./demo-app"]
FROM scratch
COPY --from=build-env /app/demo-app /go/bin/demo-app
COPY --from=build-env /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
EXPOSE 8080
ENTRYPOINT ["/go/bin/demo-app"]
\ No newline at end of file
......@@ -2,6 +2,10 @@ FROM golang:1.13.1
WORKDIR /app
COPY go.mod .
COPY go.sum .
RUN go mod download
COPY . .
ENTRYPOINT go test -tags=acceptance -v
......@@ -17,16 +17,21 @@ import (
"cloud.google.com/go/pubsub"
"cloud.google.com/go/storage"
uuid "github.com/satori/go.uuid"
)
// OutboundMessage is the message sent out to notify consumers
// of the new object.
type OutboundMessage struct {
ObjectID string `json:"objectId"`
}
func TestSimpleHappyPath(t *testing.T) {
project := getConfigurationValue("PROJECT")
serviceIP := getConfigurationValue("SERVICE_IP")
bucketName := getConfigurationValue("BUCKET")
subscriptionName := getConfigurationValue("SUBSCRIPTION")
ctx, cancelCtx := context.WithTimeout(context.Background(), 3*time.Minute)
ctx, cancelCtx := context.WithTimeout(context.Background(), 2*time.Minute)
log.Println("starting pubsub client")
pubsubClient, err := pubsub.NewClient(ctx, project)
......@@ -34,12 +39,10 @@ func TestSimpleHappyPath(t *testing.T) {
log.Fatalf("cannot create pubsub client: %v", err)
}
testMsg := map[string]string{
"key": uuid.NewV4().String(),
"data": "ruby is awesome",
}
testMsg := map[string]string{"data": "Ruby is awesome!"}
bodyBytes, _ := json.Marshal(testMsg)
var objectID string
t.Run("posting a message returns 202", func(t *testing.T) {
url := fmt.Sprintf("http://%s/message", serviceIP)
resp, err := http.Post(url, "application/json", bytes.NewBuffer(bodyBytes))
......@@ -50,21 +53,55 @@ func TestSimpleHappyPath(t *testing.T) {
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("expected a 202 response code, received : %d", resp.StatusCode)
}
defer resp.Body.Close()
rawBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("unable read body: %v", err)
}
var body OutboundMessage
err = json.Unmarshal(rawBody, &body)
if err != nil {
t.Fatalf("unable unmarshal response body: %v", err)
}
objectID = body.ObjectID
})
t.Run("message is sent to PubSub topic", func(t *testing.T) {
sub := pubsubClient.Subscription(subscriptionName)
var receivedMsg OutboundMessage
err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
err = json.Unmarshal(m.Data, &receivedMsg)
if err != nil {
t.Fatalf("unable to unmarshal received msg: %v", err)
}
m.Ack()
cancelCtx()
})
if err != nil {
t.Fatalf("unable to receive from subscription: %v", err)
}
if objectID != receivedMsg.ObjectID {
t.Fatalf("objectId mismatch. expected: %s, received: %s", objectID, receivedMsg.ObjectID)
}
})
t.Run("message is stored in bucket", func(t *testing.T) {
ctx = context.Background()
client, err := storage.NewClient(ctx)
bkt := client.Bucket(bucketName)
fileName := testMsg["key"]
reader, err := bkt.Object(fileName).NewReader(ctx)
reader, err := bkt.Object(objectID).NewReader(ctx)
if err != nil {
t.Fatalf("readFile: unable to open file from bucket %q, file %q: %v", bucketName, fileName, err)
t.Fatalf("readFile: unable to open file from bucket %q, file %q: %v", bucketName, objectID, err)
}
defer reader.Close()
bytes, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatalf("readFile: unable to read data from bucket %q, file %q: %v", bucketName, fileName, err)
t.Fatalf("readFile: unable to read data from bucket %q, file %q: %v", bucketName, objectID, err)
}
storedMsg := make(map[string]string)
......@@ -74,29 +111,12 @@ func TestSimpleHappyPath(t *testing.T) {
}
// Delete object so we can delete the bucket.
err = bkt.Object(fileName).Delete(ctx)
err = bkt.Object(objectID).Delete(ctx)
if err != nil {
t.Logf("deleteFile: unable to delete data from bucket %q, file %q: %v", bucketName, fileName, err)
t.Logf("deleteFile: unable to delete data from bucket %q, file %q: %v", bucketName, objectID, err)
}
})
t.Run("message is sent to PubSub topic", func(t *testing.T) {
sub := pubsubClient.Subscription(subscriptionName)
receivedMsg := make(map[string]string)
err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
json.Unmarshal(m.Data, &receivedMsg)
m.Ack()
cancelCtx()
})
if err != nil {
t.Fatalf("unable to receive from subscription: %v", err)
}
if !reflect.DeepEqual(testMsg, receivedMsg) {
t.Fatalf("expected: %s, received: %s", testMsg, receivedMsg)
}
})
}
func TestSimpleSadPath(t *testing.T) {
......
......@@ -8,6 +8,6 @@ require (
cloud.google.com/go/storage v1.0.0
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/pkg/errors v0.8.1
github.com/satori/go.uuid v1.2.0 // indirect
github.com/satori/go.uuid v1.2.0
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a // indirect
)
......@@ -31,7 +31,9 @@ const topic = new gcp.pubsub.Topic(name, {
export const topicName = topic.name;
// Create egress topic.
export const bucket = new gcp.storage.Bucket(name + "-bucket");
export const bucket = new gcp.storage.Bucket(name + "-bucket", {
location: "us-west1",
});
new gcp.storage.BucketIAMMember(name, {
member: "serviceAccount:" + svcAccountOutput.email,
......
package apitype
// Message is the struct the message handler expects.
type Message struct {
Key string `json:"key"`
// InboundMessage is the struct the message handler expects.
type InboundMessage struct {
Data string `json:"data"`
}
// OutboundMessage is the message sent out to notify consumers
// of the new object.
type OutboundMessage struct {
ObjectID string `json:"objectId"`
}
......@@ -11,6 +11,7 @@ import (
"cloud.google.com/go/pubsub"
"cloud.google.com/go/storage"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"gitlab.com/rocore/demo-app/pkg/apitype"
)
......@@ -32,57 +33,63 @@ func MessageHandler(topic *pubsub.Topic, bkt *storage.BucketHandle) func(http.Re
return
}
var incomingMsg apitype.Message
err = json.Unmarshal(msgData, &incomingMsg)
var inboundMsg apitype.InboundMessage
err = json.Unmarshal(msgData, &inboundMsg)
if err != nil {
log.Printf("error unmarshalling inbound msg: %v", err)
w.WriteHeader(http.StatusBadRequest)
return
}
err = writeToBucket(ctx, bkt, incomingMsg)
fileName, err := writeToBucket(ctx, bkt, inboundMsg)
if err != nil {
log.Printf("error writing to bucket: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
err = writeToPubSub(ctx, topic, incomingMsg)
outboundBytes, err := writeToPubSub(ctx, topic, fileName)
if err != nil {
log.Printf("error writing to pubsub: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusAccepted)
w.Write(msgData)
w.Write(outboundBytes)
}
}
func writeToBucket(ctx context.Context, bkt *storage.BucketHandle, incomingMsg apitype.Message) error {
func writeToBucket(ctx context.Context, bkt *storage.BucketHandle, inboundMsg apitype.InboundMessage) (string, error) {
log.Println("writing to bucket")
obj := bkt.Object(incomingMsg.Key)
fileName := uuid.NewV4().String()
obj := bkt.Object(fileName)
w := obj.NewWriter(ctx)
defer w.Close()
bytes, err := json.Marshal(incomingMsg)
bytes, err := json.Marshal(inboundMsg)
if err != nil {
return errors.Wrap(err, "unable to marshal data")
return "", errors.Wrap(err, "unable to marshal data")
}
if _, err := w.Write(bytes); err != nil {
return errors.Wrap(err, "unable to write data to bucket")
return "", errors.Wrap(err, "unable to write data to bucket")
}
return nil
return fileName, nil
}
func writeToPubSub(ctx context.Context, topic *pubsub.Topic, incomingMsg apitype.Message) error {
func writeToPubSub(ctx context.Context, topic *pubsub.Topic, fileName string) ([]byte, error) {
log.Println("writing to PubSub")
bytes, err := json.Marshal(incomingMsg)
outboundMsg := apitype.OutboundMessage{ObjectID: fileName}
log.Printf("send msg: %v", outboundMsg)
bytes, err := json.Marshal(outboundMsg)
if err != nil {
return errors.Wrap(err, "unable to marshal data")
return nil, errors.Wrap(err, "unable to marshal outbound message")
}
msg := pubsub.Message{Data: bytes}
result := topic.Publish(ctx, &msg)
if _, err := result.Get(ctx); err != nil {
return errors.Wrap(err, "unable to write data to PubSub")
return nil, errors.Wrap(err, "unable to write data to PubSub")
}
return nil
return bytes, nil
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment