Commit 4687a829 authored by Luciano Joublanc (DA)'s avatar Luciano Joublanc (DA)

Fix GRPC transport & derive paths from package names.

parent 31655f44
Pipeline #89514389 failed with stages
in 22 minutes and 10 seconds
......@@ -9,12 +9,16 @@ import fs2.Stream
import cats.effect.{ConcurrentEffect, ContextShift, Timer}
import org.http4s.client.Client
import org.http4s.headers.`Content-Type`
import org.http4s.MediaType
import org.http4s.{MediaType, ContentCoding}
import org.http4s.util.CaseInsensitiveString
import scala.language.postfixOps
import java.net.URI
class Http4sClientService[F[_]: ConcurrentEffect: ContextShift: Timer](
val channel: Client[F]
val channel: Client[F],
val baseUri: URI,
val encoding: ContentCoding = ContentCoding.identity
) extends Service[F] {
type Rpc = Rpc.type
......@@ -24,76 +28,147 @@ class Http4sClientService[F[_]: ConcurrentEffect: ContextShift: Timer](
import org.http4s._
import cats.instances.function._
import cats.syntax.profunctor._
import java.net.URI
implicit def nil: Case0.Aux[Rpc.type, HNil] = at { HNil }
implicit def unary[K <: Symbol, I: Codec, O: Codec](
implicit label: Witness.Aux[K]
): Case0.Aux[Rpc.type, FieldType[K, URI => I => F[O]]] = at {
field[K] {
bidirectionalStream[K, I, O]
.apply()
.rmap(
_.dimap(Stream.emit[F, I])(_.head.compile.lastOrError)
)
}
): Case0.Aux[Rpc.type, URI => FieldType[K, I => F[O]]] = at {
bidirectionalStream[K, I, O]
.apply()
.rmap { f =>
field[K] {
f.dimap(Stream.emit[F, I])(_.head.compile.lastOrError)
}
}
}
implicit def serverStream[K <: Symbol, I: Codec, O: Codec](
implicit label: Witness.Aux[K]
): Case0.Aux[Rpc.type, FieldType[K, URI => I => Stream[F, O]]] = at {
field[K] {
bidirectionalStream[K, I, O]
.apply()
.rmap(
_.lmap(Stream.emit[F, I])
)
}
): Case0.Aux[Rpc.type, URI => FieldType[K, I => Stream[F, O]]] = at {
bidirectionalStream[K, I, O]
.apply()
.rmap { f =>
field[K] {
f.lmap(Stream.emit[F, I])
}
}
}
implicit def clientStream[K <: Symbol, I: Codec, O: Codec](
implicit label: Witness.Aux[K]
): Case0.Aux[Rpc.type, FieldType[K, URI => Stream[F, I] => F[O]]] = at {
field[K] {
bidirectionalStream[K, I, O]
.apply()
.rmap(
_.rmap(_.head.compile.lastOrError)
)
}
): Case0.Aux[Rpc.type, URI => FieldType[K, Stream[F, I] => F[O]]] = at {
bidirectionalStream[K, I, O]
.apply()
.rmap { f =>
field[K] {
f.rmap(_.head.compile.lastOrError)
}
}
}
implicit def bidirectionalStream[K <: Symbol, I: Codec, O: Codec](
implicit label: Witness.Aux[K]
): Case0.Aux[Rpc.type, FieldType[K, URI => Stream[F, I] => Stream[F, O]]] =
at {
field[K] { baseUri => (is: Stream[F, I]) =>
): Case0.Aux[Rpc.type, URI => FieldType[K, Stream[F, I] => Stream[F, O]]] =
at { svcUri =>
field[K] { (is: Stream[F, I]) =>
val S = Stream.monadErrorInstance[F]
import S.{raiseError, fromEither, fromTry, whenA}
import S.{fromEither, fromTry}
for {
uri <- fromEither(
Uri.fromString(baseUri resolve ("/" + label.value.name) toString)
Uri.fromString(
baseUri resolve List(baseUri.getPath, svcUri, label.value.name)
.mkString("/") toString
)
)
appGrpc <- fromEither(MediaType.parse("application/grpc+proto"))
i <- is
bitsOut <- fromTry(
lengthPrefixedMessage(encoding, Codec[I]).encode(i).toTry
)
bytesOut = bitsOut.bytes
req = Request(
method = Method.POST,
uri = uri,
httpVersion = HttpVersion.`HTTP/2.0`,
body = fromTry(Codec[I] encode i toTry)
.flatMap(Stream emits _.bytes.toSeq)
).withContentType(
`Content-Type`(new MediaType("application", "grpc"))
)
headers = Headers of (
Header("TE", "trailers"),
Header("grpc-timeout", "5S"),
Header("grpc-encoding", encoding.renderString),
Header("grpc-accept-encoding", encoding.renderString),
Header("content-length", bytesOut.length.toString)
),
body = Stream.emits(bytesOut.toSeq).covary[F]
).withContentType(`Content-Type`(appGrpc))
rep <- channel stream req
_ <- whenA(rep.status != Status.Ok)(
raiseError(new Exception(rep.status.toString))
_ <- assertHeaderErrors(rep)
bitsIn <- Stream eval rep.body.compile.toChunk map (BitVector apply _.toArray)
repCoding <- fromEither(getResponseEncoding(rep))
o <- fromTry(
lengthPrefixedMessage(repCoding, Codec[O]) decodeValue bitsIn toTry
)
bits <- Stream eval rep.body.compile.toChunk map (BitVector apply _.toArray)
o <- fromTry(Codec[O] decodeValue bits toTry)
} yield o
}
}
/** Parses the headers to determine the content coding.
* returns `identity` if header is missing.
*/
protected def getResponseEncoding(
response: Response[F]
): Either[Throwable, ContentCoding] =
response.headers
.get(CaseInsensitiveString("grpc-encoding"))
.fold[Either[Throwable, ContentCoding]](Right(ContentCoding.identity))(
ContentCoding parse _.value
)
/** Side effecting stream that raises any errors in http response */
protected def assertHeaderErrors(rep: Response[F]): Stream[F, Unit] = {
import cats.instances.list._
import cats.instances.option._
import cats.syntax.alternative._
import cats.syntax.semigroup._
val S = Stream.monadErrorInstance[F]
import S.whenA
def parseErrorHeaders(h: Headers): Stream[F, Unit] =
Stream raiseError {
new Exception(
List(
h.get(CaseInsensitiveString("grpc-status")),
h.get(CaseInsensitiveString("grpc-message"))
).unite.map(_.toString) mkString "\n"
)
}
for {
trailerHeaders <- Stream eval rep.trailerHeaders
headers = rep.headers combine trailerHeaders
_ <- whenA(rep.status != Status.Ok)(parseErrorHeaders(headers))
_ <- whenA(
headers
.get(CaseInsensitiveString("grpc-status"))
.exists(_.value.toInt != 0)
)(parseErrorHeaders(headers))
} yield ()
}
}
/** Used to encode the GRPC payload.
* @see [[https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md Length-Prefixed-Message]] term.
*/
protected def lengthPrefixedMessage[A](
encoding: ContentCoding,
message: Codec[A]
): Codec[A] = {
import scodec.Err
import scodec.codecs.{uint32, variableSizeBytesLong, fail}
import scodec.codecs.literals.constantIntCodec
if (encoding matches ContentCoding.identity)
0x00 ~> variableSizeBytesLong(uint32, message)
else
fail(Err(s"lengthPrefixedMessage: currently $encoding not supported"))
}
}
package grpc
import shapeless.{HList, Poly0, LabelledGeneric}
import shapeless.ops.hlist.FillWith
import shapeless.{HList, Poly0, LabelledGeneric, Nat, Poly2}
import shapeless.ops.hlist.{FillWith, Mapped, Length, Fill, ZipWith}
import fs2.Stream
import java.net.URI
/** A protobuf service builder. Uses meta programming techniques to generate
* service RPC definitions based off the requested type signature. The service
* RPC's are represented by function literals inside a case class, of type:
*
* 1. `URI => I => F[O]` unary request.
* 2. `URI => I => Stream[F, O]` server-streaming request.
* 3. `URI => Stream[F, I] => F[O]` client-streaming request.
* 4. `URI => Stream[F, I] => Stream[F, O]` bidirectional streaming.
* 1. `I => F[O]` unary request.
* 2. `I => Stream[F, O]` server-streaming request.
* 3. `Stream[F, I] => F[O]` client-streaming request.
* 4. `Stream[F, I] => Stream[F, O]` bidirectional streaming.
*
* Which correcpond to the
* [[https://www.grpc.io/docs/guides/concepts/#rpc-life-cycle respective gRPC
......@@ -23,23 +25,36 @@ trait Service[F[_]] { self =>
type Rpc <: Poly0
def baseUri: URI
/** Create an instance of a protobuf service, tied to the specified `baseUri`.
*
* @tparam G Typically a `scala.Product` corresponding of the rpc calls of the
* protobuf service
*
* @param baseUri The name of each member is appended to this to resolve the
* target URI.
*/
def of[G[_[_]]] = new OfSyntax[G] {}
trait OfSyntax[G[_[_]]] {
def apply[L <: HList]()( //TODO: get rid off the pesky `()`
implicit labels: LabelledGeneric.Aux[G[F], L],
mkRpc: FillWith[self.Rpc, L]
): Stream[F, G[F]] =
Stream[F, G[F]](labels.from(mkRpc()))
/* This is pretty convoluted. It turns the case class into a list of I =>
* O, then prepends => URI to them, derives the URI from HasFqN, and
* partial applies it n.b. `Rpc` has methods of the form URI => I => O.
*/
def apply[L <: HList, M <: HList, C <: HList, N <: Nat]()( //TODO: get rid off the pesky `()`
implicit svcName: Service.HasFqn[G[F]],
labels: LabelledGeneric.Aux[G[F], L],
mapped: Mapped.Aux[L, ({ type λ[a] = URI => a })#λ, M],
mkRpc: FillWith[self.Rpc, M],
length: Length.Aux[L, N],
fqns: Fill.Aux[N, URI, C],
zipWith: ZipWith.Aux[M, C, Service.ApplyUrl.type, L]
): Stream[F, G[F]] = {
val uris = fqns(new URI(svcName.fqn))
val _ = mapped // supress warnings
val _ = length
Stream[F, G[F]](labels.from(zipWith(mkRpc(), uris)))
}
}
}
object Service {
......@@ -48,8 +63,9 @@ object Service {
import scala.jdk.CollectionConverters.MutableSeqHasAsJava
/** Creates a service builder backed by http4s + OkHttp client */
def apply[F[_]: ConcurrentEffect: ContextShift: Timer]
: Stream[F, Http4sClientService[F]] = //FIXME should return a `Client` rather than `Http4sClient` but that breaks.
def apply[F[_]: ConcurrentEffect: ContextShift: Timer](
baseUri: URI
): Stream[F, Http4sClientService[F]] = //FIXME should return a `Client` rather than `Http4sClient` but that breaks.
for {
blocker <- Stream resource cats.effect.Blocker[F]
builder = org.http4s.client.okhttp.OkHttpBuilder[F](
......@@ -61,5 +77,29 @@ object Service {
blocker
)
cli <- builder.stream
} yield new Http4sClientService(cli)
} yield new Http4sClientService(cli, baseUri)
/** Partially applies URI to fn of the form URI => I => O
* @return `I => O`
*/
protected object ApplyUrl extends Poly2 {
implicit def g[A]: Case.Aux[URI => A, java.net.URI, A] =
at { (g, uri) =>
g(uri)
}
}
/** Derive the fully qualified name of a type `A`.
* This is used to infer the path of the GRPC service from the packge name.
*/
protected trait HasFqn[A] {
def fqn: String
}
protected object HasFqn {
import scala.reflect.runtime.universe._
implicit def any[A: WeakTypeTag]: HasFqn[A] = new HasFqn[A] {
def fqn: String = weakTypeOf[A].typeSymbol.fullName
}
}
}
......@@ -473,13 +473,13 @@ trait Parser extends RegexParsers {
"(" ~> "stream".? ~ messageType <~ ")"
} ~ (("{" ~> { option | emptyStatement }.* <~ "}") | ";") map {
case name ~ (None ~ tpeIn) ~ (None ~ tpeOut) ~ _ =>
param"$name: java.net.URI => $tpeIn => F[$tpeOut]"
param"$name: $tpeIn => F[$tpeOut]"
case name ~ (None ~ tpeIn) ~ (Some(_) ~ tpeOut) ~ _ =>
param"$name: java.net.URI => $tpeIn => fs2.Stream[F, $tpeOut]"
param"$name: $tpeIn => fs2.Stream[F, $tpeOut]"
case name ~ (Some(_) ~ tpeIn) ~ (None ~ tpeOut) ~ _ =>
param"$name: java.net.URI => fs2.Stream[F, $tpeIn] => F[$tpeOut]"
param"$name: fs2.Stream[F, $tpeIn] => F[$tpeOut]"
case name ~ (Some(_) ~ tpeIn) ~ (Some(_) ~ tpeOut) ~ _ =>
param"$name: java.net.URI => fs2.Stream[F, $tpeIn] => fs2.Stream[F, $tpeOut]"
param"$name: fs2.Stream[F, $tpeIn] => fs2.Stream[F, $tpeOut]"
}
/** Proto file
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment