Skip to content
Snippets Groups Projects
Select Git revision
  • dbickford/consolidate-version-script
  • main default protected
  • hhoerl/359825-fetch-sub-remotes
  • toon-use-new-git-clone
  • ajwalker/shell-wrap
  • cache-detect-bucket-location
  • k8s-promote-ff-retrieve-pod-warning-events
  • ggeorgiev/upgrade-ebpf
  • ajwalker/refactor-config-opts
  • bcarranza-use-correct-log_format-value
  • avonbertoldi/bump-go-to-1.23.5
  • 17-9-stable protected
  • axil-cherry-pick-hugo-shortcodes
  • fix/cyclonedx-upload-fail-non-english-dependency-path
  • 38586-windows-runner-does-not-capture-exit-code-0xffffffff-from-nuke-build-running-in-powershell-so
  • improve-concurrency-messages
  • ajwalker/runner-shell
  • ggeorgiev/trigger-k8s-wait-ubi
  • doc/clarify-allowed-pull-policies
  • doc-exclude-image
  • helpers/runner_wrapper/api/v0.1.0 protected
  • v17.9.0 protected
  • v17.8.3 protected
  • v17.8.2 protected
  • v17.8.1 protected
  • v17.7.1 protected
  • v17.8.0 protected
  • v17.6.1 protected
  • v17.7.0 protected
  • v17.5.5 protected
  • v17.6.0 protected
  • v17.5.4 protected
  • v17.5.3 protected
  • v16.11.4 protected
  • v17.0.3 protected
  • v17.5.2 protected
  • v17.4.2 protected
  • v17.3.3 protected
  • v17.2.3 protected
  • v17.5.1 protected
40 results

timestamper.go

timestamper.go 6.61 KiB
package timestamper

import (
	"bytes"
	"io"
	"math"
	"strconv"
	"time"
)

const (
	StdoutType StreamType = 'O'
	StderrType StreamType = 'E'

	PartialLineType LineType = '+'
	FullLineType    LineType = ' '

	hextable = "0123456789abcdef"

	// bufSize is the amount of data this implementation will buffer
	// when no newline character is found. It is _not_ the maximum line length
	// any consumer of the logs will receive.
	bufSize = 8 * 1024

	// fracs is the nanosecond length we append
	fracs = 6

	// additional bytes added to the format:
	// - nanosecond separator ('.')
	// - single byte for append flag
	additionalBytes = 2
)

type (
	StreamType byte
	LineType   byte
)

var (
	now = func() time.Time {
		return time.Now().UTC()
	}

	lineEscape = []byte("\n")
)

// Logger implements the standard io.Write interface and adds lightweight
// metadata in the form of:
// <date> <stream id><stream type><append flag><message>
//
// Where:
// - <date> is a RFC3339 Nano formatted date
// - <stream id> is a 2-digit hex encoded user provided stream identifier
// - <stream type> is either 'stdout' or 'stderr'
// - <append flag> is either ' ' (no-op) or '+' (append line to last line)
// - <message> is a user provided message.
//
// This format is intended to be well suited to CI/CD logs, where timed output
// can help determine the duration of executed commands.
//
// A new log line is emitted for each new-line character (\n) found within data
// provided to Write().
//
// A new log line is also emitted for the last carriage return (\r) in calls to
// Write() that don't contain a new-line character. Such lines are often used
// to display progress bars, so having them "flushed" to the underlying stream
// can help with live log viewing.
type Logger struct {
	buf bytes.Buffer
	w   io.Writer

	bufStream []byte
	timeLen   int
	timestamp bool
}

func New(w io.Writer, streamType StreamType, streamNumber uint8, timestamp bool) *Logger {
	l := &Logger{
		w:         w,
		timestamp: timestamp,
	}

	if timestamp {
		l.timeLen = len(time.Now().UTC().Format(time.RFC3339)) + fracs + additionalBytes
	}
	l.bufStream = make([]byte, l.timeLen+4)
	if timestamp {
		l.bufStream[l.timeLen-1] = ' '
	}
	l.bufStream[l.timeLen+0] = hextable[streamNumber>>4]
	l.bufStream[l.timeLen+1] = hextable[streamNumber&0x0f]
	l.bufStream[l.timeLen+2] = byte(streamType)
	l.bufStream[l.timeLen+3] = byte(FullLineType)

	return l
}

func (l *Logger) Write(p []byte) (n int, err error) {
	n, err = l.writeLines(p)
	if err != nil {
		return n, err
	}

	nn, err := l.writeCarriageReturns(p[n:])
	n += nn
	if err != nil {
		return n, err
	}

	nn, err = l.buffer(p[n:])
	n += nn
	return n, err
}

// buffer is used when we have input data that contains no newline character.
//
// l.buf is filled with data until either a newline character appears or
// we exceed bufSize. When we exceed the buffer size, we flush a new line
// and write the buffer to the underlying writer directly. To indicate that
// this has occurred, we then set the append flag for the next line to be
// written.
//
// Because we write the buffer to the underling writer when the bufSize has
// been exceeded, bufSize is not indicative of the maximum line length a
// consumer will receive, it's only used internally so that this implementation
// doesn't need to have an infinite sized buffer.
func (l *Logger) buffer(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}

	// if we exceed our buffer size, write directly to underlying writer
	// nolint:nestif
	if len(p)+l.buf.Len() > bufSize {
		if l.buf.Len() == 0 {
			if err := l.writeHeader(l.w); err != nil {
				return 0, err
			}
		}
		_, err := l.w.Write(l.buf.Bytes())
		if err != nil {
			return 0, err
		}
		l.buf.Reset()

		// ensure next write is a continuation
		l.bufStream[l.timeLen+3] = byte(PartialLineType)

		nn, err := l.w.Write(p)
		n += nn
		if err != nil {
			return n, err
		}

		_, err = l.w.Write(lineEscape)
		return n, err
	}

	// start new buffer
	if l.buf.Len() == 0 {
		if err := l.writeHeader(&l.buf); err != nil {
			return n, err
		}
	}

	// append to existing buffer
	return l.buf.Write(p)
}

func (l *Logger) writeLines(p []byte) (n int, err error) {
	idx := bytes.IndexByte(p, '\n')
	if idx == -1 {
		return n, err
	}

	if l.buf.Len() > 0 {
		_, err := l.w.Write(l.buf.Bytes())
		if err != nil {
			return 0, err
		}

		l.buf.Reset()

		nn, err := l.w.Write(p[:idx+1])
		n += nn
		if err != nil {
			return n, err
		}
	}

	for {
		idx := bytes.IndexByte(p[n:], '\n')
		if idx == -1 {
			return n, err
		}

		if err := l.writeHeader(l.w); err != nil {
			return n, err
		}

		nn, err := l.w.Write(p[n : n+idx+1])
		n += nn
		if err != nil {
			return n, err
		}
	}
}

func (l *Logger) writeCarriageReturns(p []byte) (n int, err error) {
	idx := bytes.LastIndexByte(p, '\r')
	if idx == -1 {
		return n, err
	}

	if l.buf.Len() > 0 {
		_, err := l.w.Write(l.buf.Bytes())
		if err != nil {
			return 0, err
		}

		l.buf.Reset()
	} else {
		if err := l.writeHeader(l.w); err != nil {
			return n, err
		}
	}

	// ensure next write is a continuation
	l.bufStream[l.timeLen+3] = byte(PartialLineType)

	nn, err := l.w.Write(p[n : n+idx+1])
	n += nn
	if err != nil {
		return n, err
	}

	_, err = l.w.Write(lineEscape)
	return n, err
}

func (l *Logger) writeHeader(w io.Writer) error {
	if l.timestamp {
		t := now()

		// time.RFC3339 doesn't add nanosecond precision, and time.RFC3339Nano doesn't
		// use a fixed length of precision. Whilst we could use a custom format, this
		// is slower, as Go as built-in optimizations for RFC3339. So here we use the
		// non-nano version, and then add nanoseconds to a fixed length. Fixed length
		// is important because it makes the logs easier for both a human and machine
		// to read.
		t.AppendFormat(l.bufStream[:0], time.RFC3339)

		// remove the 'Z'
		l.bufStream = l.bufStream[:l.timeLen-fracs-additionalBytes]
		l.bufStream[len(l.bufStream)-1] = '.' // replace 'Z' for '.'

		// ensure nanoseconds doesn't exceed our fracs precision
		nanos := t.Nanosecond() / int(math.Pow10(9-fracs))

		// add nanoseconds and append leading zeros
		leadingZeros := len(l.bufStream)
		l.bufStream = strconv.AppendInt(l.bufStream, int64(nanos), 10)
		leadingZeros = fracs - (len(l.bufStream) - leadingZeros)
		for i := 0; i < leadingZeros; i++ {
			l.bufStream = append(l.bufStream, '0')
		}

		// add 'Z' back
		l.bufStream = append(l.bufStream, 'Z')

		// expand back to full header size
		l.bufStream = l.bufStream[:l.timeLen+4]
	}
	_, err := w.Write(l.bufStream)

	l.bufStream[l.timeLen+3] = byte(FullLineType)

	return err
}

func (l *Logger) Close() error {
	if l.buf.Len() > 0 {
		l.buf.Write(lineEscape)
		_, err := l.w.Write(l.buf.Bytes())
		return err
	}
	return nil
}