Commit d712ff58 authored by Erin Krengel's avatar Erin Krengel
Browse files

add comments

parent ea7a334e
......@@ -44,22 +44,29 @@ build-acceptance-test:
acceptance-test:
stage: acceptance-test
variables:
# Set the environment based on the pipeline, so we can create multiple ephemeral environments at once.
ENV: "test-$CI_PIPELINE_ID"
before_script:
- gcloud auth activate-service-account $CI_SVC_ACCOUNT_EMAIL --key-file="$GOOGLE_APPLICATION_CREDENTIALS"
script:
- cd infrastructure
- npm install
# Create a new stack for our new ephemeral environment.
- pulumi stack init rocore/$ENV-app
# Set the appropriate configuration for this new stack.
- pulumi config set DOCKER_TAG $DOCKER_TAG
- pulumi config set ENV $ENV
- pulumi config set gcp:project rocore-k8s
- pulumi config set gcp:zone us-west1-a
# The pulumi up command will spin up our GCP and K8s resources, including our acceptance tests since
# our environment begins with "test". We will skip the preview (e.g. manual verification of changes)
# since this is running in CI.
- pulumi up --skip-preview
after_script:
- gcloud auth activate-service-account $CI_SVC_ACCOUNT_EMAIL --key-file="$GOOGLE_APPLICATION_CREDENTIALS"
- cd infrastructure
- pulumi stack select rocore/$ENV-app
# Get the logs of our acceptance tests.
- kubectl logs -n rocore --selector=job-name=$(pulumi stack output acceptanceJobName)
- pulumi destroy --skip-preview -s rocore/$ENV-app
- echo "rocore/$ENV-app" | pulumi stack rm rocore/demo-app/$ENV-app
......
......@@ -29,6 +29,9 @@ type config struct {
var testConfig config
func TestMain(m *testing.M) {
// Set up our acceptance test configuration. These values are
// required, so we will use a helper function that exits if the
// environment variable is not set.
testConfig = config{
project: getConfigurationValue("PROJECT"),
serviceIP: getConfigurationValue("SERVICE_IP"),
......@@ -46,11 +49,14 @@ func TestSimpleHappyPath(t *testing.T) {
}
testMsg := map[string]string{"message": "Ruby is awesome!"}
bodyBytes, _ := json.Marshal(testMsg)
var objectName string
t.Run("posting a message returns 202", func(t *testing.T) {
url := fmt.Sprintf("http://%s/message", testConfig.serviceIP)
bodyBytes, err := json.Marshal(testMsg)
if err != nil {
t.Fatalf("unable to marshal test msg: %v", err)
}
resp, err := http.Post(url, "application/json", bytes.NewBuffer(bodyBytes))
if err != nil {
t.Fatalf("unable to hit endpoint: %v", err)
......@@ -91,7 +97,7 @@ func TestSimpleHappyPath(t *testing.T) {
}
if objectName != receivedMsg["name"] {
t.Fatalf("name mismatch. expected: %s, received: %s", objectName, receivedMsg["name"])
t.Fatalf("object name mismatch. expected: %s, received: %s", objectName, receivedMsg["name"])
}
})
......@@ -115,8 +121,7 @@ func TestSimpleHappyPath(t *testing.T) {
if !reflect.DeepEqual(testMsg, storedMsg) {
t.Fatalf("expected: %s, stored: %s", testMsg, storedMsg)
}
// Delete object so we can delete the bucket.
// Clean up after our test. Delete the object so we can delete the bucket.
err = bkt.Object(objectName).Delete(ctx)
if err != nil {
t.Logf("deleteFile: unable to delete data from bucket %q, file %q: %v", testConfig.bucketName, objectName, err)
......@@ -125,7 +130,7 @@ func TestSimpleHappyPath(t *testing.T) {
}
func TestSimpleSadPath(t *testing.T) {
t.Run("posting a incorrect message returns 400", func(t *testing.T) {
t.Run("posting a incorrectly formatted message returns 400", func(t *testing.T) {
bodyBytes, _ := json.Marshal("this wont unmarshal")
url := fmt.Sprintf("http://%s/message", testConfig.serviceIP)
resp, err := http.Post(url, "application/json", bytes.NewBuffer(bodyBytes))
......
import * as k8s from "@pulumi/kubernetes";
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as svcKey from "./svcKey";
interface AcceptanceConfig {
env: string;
......@@ -16,55 +17,54 @@ interface AcceptanceConfig {
}
export function SetupAcceptanceTests(config: AcceptanceConfig): k8s.batch.v1.Job {
const name = `${config.env}-demo-app-acc-test`
const name = `${config.env}-demo-app-acc-test`;
// Create subscription on the app's topic.
const subscription = new gcp.pubsub.Subscription(`${config.env}-rocore-subscription`, {
const subscription = new gcp.pubsub.Subscription(name, {
topic: config.topic.name,
});
// Give acceptance test svc account permission to subscribe from the subscription.
new gcp.pubsub.SubscriptionIAMMember(name, {
member: "serviceAccount:" + config.accSvcAccountEmail,
member: `serviceAccount:${config.accSvcAccountEmail}`,
role: "roles/pubsub.subscriber",
subscription: subscription.name,
})
});
// Give acceptance test svc account permission to read from bucket.
// Give acceptance test svc account object admin permissions for bucket.
// This is required so the acceptance test can delete the object created
// during our tests and therefore we can cleanly delete the bucket.
new gcp.storage.BucketIAMMember(name, {
member: "serviceAccount:" + config.accSvcAccountEmail,
member: `serviceAccount:${config.accSvcAccountEmail}`,
role: "roles/storage.objectAdmin",
bucket: config.bucket.name,
})
});
// Create a service account key for the acceptance test account.
const serviceAccountKey = new gcp.serviceAccount.Key(name, {
serviceAccountId: config.accSvcAccountId,
})
const serviceAccountKey = svcKey.getSvcKey(name, config.accSvcAccountId);
const decodedKey = serviceAccountKey.privateKey.apply(key => {
return Buffer.from(key, 'base64').toString('binary');;
});
// Create a K8s secret from the service account key to mount to our K8s job.
const secret = new k8s.core.v1.Secret(name, {
metadata: {
name: `${config.env}-google-cloud-key-acc-test`,
},
stringData: { "key.json": decodedKey },
stringData: { "key.json": serviceAccountKey },
}, { provider: config.clusterProvider });
// Create the job for our acceptance tests.
const jobName = `${name}-job`
// Create the job for our acceptance tests. The job will only run one time.
// By default, `pulumi up` will wait for the job to complete and only succeed
// if the container (aka our acceptance tests) exits successfully. This means we
// can simply run `pulumi up` to run our acceptance tests and determine if they
// succeeded or not.
const appLabels = { appClass: name };
return new k8s.batch.v1.Job(jobName,
return new k8s.batch.v1.Job(name,
{
metadata: {
labels: appLabels,
},
metadata: { labels: appLabels },
spec: {
template: {
spec: {
containers: [{
name: jobName,
name: name,
image: `rocore/demo-app-acceptance:${config.dockerTag}`,
imagePullPolicy: "Always",
env: [
......@@ -92,12 +92,8 @@ export function SetupAcceptanceTests(config: AcceptanceConfig): k8s.batch.v1.Job
},
{
provider: config.clusterProvider,
customTimeouts: {
create: "5m",
},
dependsOn: [
config.appDeployment,
]
customTimeouts: { create: "5m" },
dependsOn: [config.appDeployment]
},
);
}
......@@ -2,7 +2,9 @@ import * as k8s from "@pulumi/kubernetes";
import * as pulumi from "@pulumi/pulumi";
import * as gcp from "@pulumi/gcp";
import * as acceptance from "./acceptance";
import * as svcKey from "./svcKey";
// Get GCP project configuration value.
const gcpConfig = new pulumi.Config("gcp");
const project = gcpConfig.require("project");
......@@ -11,68 +13,71 @@ const config = new pulumi.Config("demo-app");
const ENV = config.require("ENV");
const DOCKER_TAG = config.require("DOCKER_TAG");
// Global Stack reference and outputs.
// Get the global stack reference and its outputs. These are our long-lived
// pieces of infrastructure such as our Kubernetes cluster and two service
// accounts. We have one service account that our application will use and
// one that our acceptance tests will use.
const globalStackRef = new pulumi.StackReference("rocore/global-infra/global-infra");
const kubeconfigOutput = globalStackRef.getOutput("kubeconfig");
const namespaceOutput = globalStackRef.getOutputSync("namespace");
let svcAccountOutput = globalStackRef.getOutputSync("testSvcAccount");
const accTestSvcAccount = globalStackRef.getOutputSync("acceptanceTestSvcAccount");
// By default, we're going to use a "test" service account for our application.
// If the environment is production, we will use the production service account.
if (ENV === "prod") {
svcAccountOutput = globalStackRef.getOutputSync("prodSvcAccount");
}
const name = ENV + "-demo-app";
// We prepend the environment to the name used for all
// resources. This allows us to run this same program multiple
// times in different environments without creating resources with
// duplicate names.
const name = `${ENV}-demo-app`;
// Create a Kubernetes provider from the global stack reference.
// We will use this make sure our K8s resources are created in the
// correct cluster and namespace.
const clusterProvider = new k8s.Provider(name, {
kubeconfig: kubeconfigOutput,
namespace: namespaceOutput.metadata.name,
});
// Create egress topic.
export const topic = new gcp.pubsub.Topic(name, {
name: name + "-topic",
export const topic = new gcp.pubsub.Topic(name);
// Give the Service permission to publish to the topic.
new gcp.pubsub.TopicIAMMember(name, {
member: `serviceAccount:${svcAccountOutput.email}`,
role: "roles/pubsub.publisher",
topic: topic.name,
});
// Create bucket.
export const bucket = new gcp.storage.Bucket(name + "-bucket", {
export const bucket = new gcp.storage.Bucket(name, {
location: "us-west1",
});
// Give the Service permission to create objects.
new gcp.storage.BucketIAMMember(name, {
member: "serviceAccount:" + svcAccountOutput.email,
member: `serviceAccount:${svcAccountOutput.email}`,
role: "roles/storage.objectCreator",
bucket: bucket.name,
})
// Give the Service permission tp publish to the topic.
new gcp.pubsub.TopicIAMMember(name, {
member: "serviceAccount:" + svcAccountOutput.email,
role: "roles/pubsub.publisher",
topic: topic.name,
})
});
// Create service account key.
const serviceAccountKey = new gcp.serviceAccount.Key(name, {
serviceAccountId: svcAccountOutput.id,
})
const serviceAccountKey = svcKey.getSvcKey(name, svcAccountOutput.id);
// Create a Kubernetes provider instance that uses our cluster from above.
const clusterProvider = new k8s.Provider(name, {
kubeconfig: kubeconfigOutput,
namespace: namespaceOutput.metadata.name,
});
// Create a secret from the key.
const decodedKey = serviceAccountKey.privateKey.apply(key => {
return Buffer.from(key, 'base64').toString('binary');;
});
// Create a K8s secret from the service account key to mount to our K8s deployment.
const secret = new k8s.core.v1.Secret(name, {
metadata: { name: ENV + "-google-cloud-key" },
stringData: { "key.json": decodedKey },
metadata: { name: `${ENV}-google-cloud-key` },
stringData: { "key.json": serviceAccountKey },
}, { provider: clusterProvider });
// Create a K8s Deployment for an application.
// Create a K8s Deployment for our application.
const appLabels = { appClass: name };
const deployment = new k8s.apps.v1.Deployment(name, {
metadata: {
labels: appLabels,
},
metadata: { labels: appLabels },
spec: {
replicas: 1,
selector: { matchLabels: appLabels },
......@@ -82,18 +87,22 @@ const deployment = new k8s.apps.v1.Deployment(name, {
containers: [
{
name: name,
image: "rocore/demo-app:" + DOCKER_TAG,
image: `rocore/demo-app:${DOCKER_TAG}`,
imagePullPolicy: "Always",
ports: [{ name: "http", containerPort: 8080 }],
env: [
{ name: "TOPIC", value: topic.name },
{ name: "BUCKET", value: bucket.name },
{ name: "PROJECT", value: project },
{ name: "GOOGLE_APPLICATION_CREDENTIALS", value: "/var/secrets/google/key.json" },
],
volumeMounts: [
{ name: "google-cloud-key", mountPath: "/var/secrets/google" }
{
name: "GOOGLE_APPLICATION_CREDENTIALS",
value: "/var/secrets/google/key.json"
},
],
volumeMounts: [{
name: "google-cloud-key",
mountPath: "/var/secrets/google"
}],
livenessProbe: {
httpGet: {
path: "/ping",
......@@ -112,41 +121,24 @@ const deployment = new k8s.apps.v1.Deployment(name, {
},
}
],
volumes: [
{
name: "google-cloud-key",
secret: { secretName: secret.metadata.name }
}
]
volumes: [{
name: "google-cloud-key",
secret: { secretName: secret.metadata.name }
}]
}
}
},
},
{
provider: clusterProvider,
customTimeouts: {
create: "2m",
update: "2m"
},
},
);
// Export the Deployment name.
export const deploymentName = deployment.metadata.name;
}, { provider: clusterProvider });
// Create a LoadBalancer Service for the K8s Deployment.
const service = new k8s.core.v1.Service(name, {
metadata: {
labels: appLabels,
},
metadata: { labels: appLabels },
spec: {
type: "ClusterIP",
ports: [{ port: 80, targetPort: 8080 }],
selector: appLabels,
},
},
{ provider: clusterProvider },
);
}, { provider: clusterProvider });
// If its a test environment, set up acceptance tests.
let job: k8s.batch.v1.Job | undefined;
......@@ -162,6 +154,9 @@ if (ENV.startsWith("test")) {
appDeployment: deployment,
appService: service,
clusterProvider: clusterProvider,
})
});
}
// Export the acceptance job name, so we can get the logs from our
// acceptance tests.
export const acceptanceJobName = job ? job.metadata.name : "unapplicable";
import * as gcp from "@pulumi/gcp";
import * as pulumi from "@pulumi/pulumi";
// getSvcKey creates a new key for the given GCP service account and
// returns its private key, decoded from the base64 form that GCP emits,
// so callers can use it directly (e.g. as Kubernetes secret data).
//
// `name` is the Pulumi resource name for the key; `svcAccountId` is the
// ID of the service account the key belongs to.
export function getSvcKey(name: string, svcAccountId: string): pulumi.Output<string> {
    // Create the service account key resource.
    const serviceAccountKey = new gcp.serviceAccount.Key(name, {
        serviceAccountId: svcAccountId,
    });

    // The private key is returned base64-encoded; decode it before
    // handing it back to consumers.
    return serviceAccountKey.privateKey.apply(key => {
        return Buffer.from(key, 'base64').toString('binary');
    });
}
\ No newline at end of file
......@@ -14,26 +14,19 @@ import (
func main() {
ctx := context.Background()
// Get configuration from environment variables.
// Get configuration from environment variables. These are
// required configuration values, so we will use a helper
// function to get the values and exit if a value is not set.
project := getConfigurationValue("PROJECT")
topicName := getConfigurationValue("TOPIC")
bucketName := getConfigurationValue("BUCKET")
// Set up the PubSub client.
pubsubClient, err := pubsub.NewClient(ctx, project)
if err != nil {
log.Fatalf("cannot create pubsub client: %v", err)
}
topic := pubsubClient.Topic(topicName)
// Set up the GCP resources we need.
topic := getPubSubTopic(ctx, project, topicName)
bkt := getBucketHandle(ctx, bucketName)
// Set up Storage bucket.
storageClient, err := storage.NewClient(ctx)
if err != nil {
log.Fatalf("cannot create storage client: %v", err)
}
bkt := storageClient.Bucket(bucketName)
// Create handlers.
// Create two handlers. One is a basic health endpoint and
// the other is the handler we will be testing.
mux := http.NewServeMux()
mux.HandleFunc("/ping", handlers.PingHandler)
mux.HandleFunc("/message", handlers.MessageHandler(topic, bkt))
......@@ -53,3 +46,19 @@ func getConfigurationValue(envVar string) string {
log.Printf("%s: %s", envVar, value)
return value
}
// getPubSubTopic returns a handle to the named PubSub topic in the given
// project. Client construction failures are fatal: this runs at startup,
// so we exit immediately rather than continue without messaging.
func getPubSubTopic(ctx context.Context, project, topicName string) *pubsub.Topic {
	client, err := pubsub.NewClient(ctx, project)
	if err != nil {
		log.Fatalf("cannot create pubsub client: %v", err)
	}
	return client.Topic(topicName)
}
// getBucketHandle returns a handle to the named Cloud Storage bucket.
// As with the PubSub helper, a client construction failure at startup
// is fatal.
func getBucketHandle(ctx context.Context, bucketName string) *storage.BucketHandle {
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatalf("cannot create storage client: %v", err)
	}
	return client.Bucket(bucketName)
}
......@@ -3,7 +3,6 @@ package handlers
import (
"context"
"encoding/json"
"io/ioutil"
"log"
"net/http"
......@@ -15,26 +14,29 @@ import (
"gitlab.com/rocore/demo-app/pkg/apitype"
)
// MessageHandler returns a handler for receiving messages.
// MessageHandler returns a handler that stores the messages it receives.
func MessageHandler(topic *pubsub.Topic, bkt *storage.BucketHandle) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusNotFound)
return
}
ctx := r.Context()
// Read the request body.
defer r.Body.Close()
msgData, err := ioutil.ReadAll(r.Body)
if err != nil {
handleServerError(w, err, "error retrieving body")
return
}
// Read inbound request.
// Unmarshal the request body bytes into the inbound request,
// which is a simple JSON object with a message field. If we
// cannot unmarshal the request, we return a 400 to the client.
var inboundRequest apitype.InboundRequest
err = json.Unmarshal(msgData, &inboundRequest)
if err != nil {
log.Printf("error unmarshalling inbound msg: %v", err)
w.WriteHeader(http.StatusBadRequest)
return
}
......@@ -46,7 +48,8 @@ func MessageHandler(topic *pubsub.Topic, bkt *storage.BucketHandle) func(http.Re
return
}
// Generate outbound message.
// Generate the outbound message which includes the
// object name where we stored the new message.
outboundMsg := apitype.NewMessageAlert{Name: fileName}
outboundBytes, err := json.Marshal(outboundMsg)
if err != nil {
......@@ -54,13 +57,14 @@ func MessageHandler(topic *pubsub.Topic, bkt *storage.BucketHandle) func(http.Re
return
}
// Send a notification about the new message.
// Send a notification that we received a new message.
err = writeToPubSub(ctx, topic, outboundBytes)
if err != nil {
handleServerError(w, err, "error writing to pubsub")
return
}
// Write the response to client.
w.WriteHeader(http.StatusAccepted)
w.Write(outboundBytes)
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment