Improve RPC decoding performance with streaming codecs
The current decoding scheme relies on a naive retry loop. Specifically, in resto's src/client.ml, the following code is inefficient:
    | Some chunk ->
        let buffer = Buffer.create 2048 in
        let output = Service.output_encoding service in
        let rec loop = function
          | None ->
              on_close ();
              Lwt.return_unit
          | Some chunk -> (
              Buffer.add_string buffer chunk;
              let data = Buffer.contents buffer in
              log.log ~media output `OK (lazy (Lwt.return chunk))
              >>= fun () ->
              match media.destruct output data with
              | Ok body ->
                  Buffer.reset buffer;
                  on_chunk body;
                  Lwt_stream.get stream >>= loop
              | Error _msg -> Lwt_stream.get stream >>= loop )
        in
        ignore (loop (Some chunk) : unit Lwt.t);
        Lwt.return
          (`Ok
            (Some
               (fun () ->
                 ignore
                   ( Lwt_stream.junk_while (fun _ -> true) stream
                     : unit Lwt.t );
                 ()))) ) )
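TL;DR: it reads a chunk and attempts to decode it; if decoding fails, it reads another chunk, appends it to the buffer, and tries again, and so on until decoding succeeds. Every attempt copies the whole buffer (Buffer.contents) and decodes it from the start, so the work for a single element grows quadratically with the number of chunks it is split across.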
This code is necessary because the streaming server may split each element of the stream across multiple chunks of data. (It does, however, guarantee that separate elements are sent in separate (sets of) chunks.)
A better approach would be to have a streaming interface when decoding. This means that the media-types in resto should carry a destruct_stream field to complement the existing destruct one.
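To make the idea concrete, here is a minimal, self-contained sketch of what a resumable decoder could look like. Everything in it is an assumption made for illustration: the `step` type (loosely modelled on Jsonm's `Await` idea), the signature given to `destruct_stream`, the toy `<length>:<payload>` wire format, and the `decode_all` driver, which uses a plain list of chunks in place of `Lwt_stream`. It is not resto's API and not a proposed final design.

```ocaml
(* Hypothetical outcome of feeding one chunk to a streaming decoder. *)
type 'a step =
  | Done of 'a                    (* a full element has been decoded *)
  | Await of (string -> 'a step)  (* more input needed: feed the next chunk *)
  | Failed of string              (* the input is malformed *)

(* Toy wire format, assumed for this sketch only: "<decimal length>:<payload>".
   The decoder keeps its state in the [Await] continuation, so each byte is
   examined once no matter how the element is split into chunks. *)
let destruct_stream : string -> string step =
  (* Phase 2: collect the [need] remaining payload bytes into [acc]. *)
  let rec payload acc need chunk =
    let len = String.length chunk in
    if len > need then
      (* Relies on the server guarantee that elements never share a chunk. *)
      Failed "trailing bytes after element"
    else begin
      Buffer.add_string acc chunk;
      if len = need then Done (Buffer.contents acc)
      else Await (payload acc (need - len))
    end
  in
  (* Phase 1: read the decimal length prefix up to ':'. *)
  let rec header n chunk =
    let len = String.length chunk in
    let rec go n i =
      if i >= len then Await (header n)
      else
        match chunk.[i] with
        | '0' .. '9' as c -> go ((n * 10) + Char.code c - Char.code '0') (i + 1)
        | ':' ->
            payload (Buffer.create 64) n (String.sub chunk (i + 1) (len - i - 1))
        | c -> Failed (Printf.sprintf "unexpected character %C in length prefix" c)
    in
    go n 0
  in
  header 0

(* Hypothetical driver replacing the retry loop: each chunk is handed to the
   current continuation exactly once, and the decoder is reset after each
   decoded element. *)
let decode_all (chunks : string list) : (string list, string) result =
  let rec run decoded decoder = function
    | [] -> Ok (List.rev decoded)
    | chunk :: rest -> (
        match decoder chunk with
        | Done v -> run (v :: decoded) destruct_stream rest
        | Await k -> run decoded k rest
        | Failed msg -> Error msg)
  in
  run [] destruct_stream chunks
```

With this shape, a chunk that does not complete an element returns `Await` instead of `Error`, so genuine decoding errors are no longer indistinguishable from "not enough data yet", and no chunk is ever decoded twice. For example, `decode_all ["5:he"; "llo"]` decodes the single element `"hello"` across two chunks.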
Reading material:
- Jsonm streaming interface (with Await) (the solution should not necessarily be "use Jsonm" or "use the same interface as Jsonm", but it's a good point of reference for what can be done),
- Data-encoding's Binary_stream_reader,
- src/lib_rpc_http/media_type.ml
Edited by Raphaël Proust