Commit fdf984d1 authored by Raphaël Proust's avatar Raphaël Proust Committed by Romain

Stdlib: fixes and doc for Lwt utilities

Lwt_dropbox and Lwt_pipe:
- General documentation improvements
- More consistent behaviour for timeout promise canceling
parent 48099a83
......@@ -49,7 +49,7 @@ let put dropbox elt =
dropbox.data <- Some elt ;
notify_put dropbox )
let peek dropbox = dropbox.data
let peek dropbox = if dropbox.closed then raise Closed else dropbox.data
let close dropbox =
if not dropbox.closed then (
......@@ -59,11 +59,11 @@ let close dropbox =
let wait_put ~timeout dropbox =
match dropbox.put_waiter with
| Some (waiter, _wakener) ->
Lwt.choose [timeout; Lwt.protected waiter]
Lwt.pick [timeout; Lwt.protected waiter]
| None ->
let (waiter, wakener) = Lwt.wait () in
dropbox.put_waiter <- Some (waiter, wakener) ;
Lwt.choose [timeout; Lwt.protected waiter]
Lwt.pick [timeout; Lwt.protected waiter]
let rec take dropbox =
match dropbox.data with
......
......@@ -28,34 +28,46 @@
(** Type of dropbox holding a value of type ['a] *)
type 'a t
(** The exception returned when trying to access a 'closed' dropbox. *)
exception Closed
(** Create an empty dropbox. *)
val create : unit -> 'a t
(** Put an element inside the dropbox. If the dropbox was already
containing an element, the old element is replaced by the new one.
(** [put t e] puts the element [e] inside the dropbox [t]. If the dropbox
already held an element, the old element is discarded and replaced by the
new one.
@raise [Closed] if the dropbox has been closed with [close]. *)
@raise [Close] if [close t] has been called. *)
val put : 'a t -> 'a -> unit
(** Wait until the dropbox contains an element, then returns the elements.
The elements is removed from the dropbox.
(** [take t] is a promise that resolves as soon as an element is held by [t].
The element is removed from [t] when the promise resolves.
@raise [Closed] if the dropbox has been closed with [close] and is empty. *)
If [t] already holds an element when [take t] is called, the promise
resolves immediately. Otherwise, the promise resolves when an element is
[put] there.
@raise [Close] if [close t] has been called. *)
val take : 'a t -> 'a Lwt.t
(** Like [take] except that it returns [None] after [timeout seconds]
if the dropbox is still empty. *)
(** [take_with_timeout timeout t] behaves like [take t] except that it returns
[None] if [timeout] resolves before an element is [put].
Note that [timeout] is canceled (i.e., fails with [Canceled]) if an element
is [put] in time (or if one is already present).
@raise [Close] if [close t] has been called. *)
val take_with_timeout : unit Lwt.t -> 'a t -> 'a option Lwt.t
(** Read the current element of the dropbox without removing it. It
immediately returns [None] if the dropbox is empty. *)
val peek : 'a t -> 'a option
(** [peek t] is [Some e] if [t] holds [e] and [None] if [t] does not hold any
element.
(** The exception returned when trying to access a 'closed' dropbox. *)
exception Closed
@raise [Close] if [close t] has been called. *)
val peek : 'a t -> 'a option
(** Close the dropbox. It terminates all the waiting reader with the
exception [Closed]. All further read or write will also immediately
(** [close t] closes the dropbox [t]. It terminates all the waiting reader with
the exception [Closed]. All further read or write will also immediately
fail with [Closed], except if the dropbox is not empty when
[close] is called. In that case, a single (and last) [take] will
succeed. *)
......
......@@ -147,7 +147,7 @@ let rec pop_with_timeout timeout q =
if q.closed then (Lwt.cancel timeout ; Lwt.fail Closed)
else
let waiter = wait_push q in
Lwt.choose [timeout; Lwt.protected waiter]
Lwt.pick [timeout; Lwt.protected waiter]
>>= fun () -> pop_with_timeout timeout q
else Lwt.return_none
......
......@@ -25,7 +25,7 @@
(** Data queues similar to the [Pipe] module in Jane Street's [Async]
library. They are implemented with [Queue]s, limited in size, and
use lwt primitives for concurrent access. *)
use Lwt primitives for concurrent access. *)
(** Type of queues holding values of type ['a]. *)
type 'a t
......@@ -37,32 +37,35 @@ type 'a t
When no [size] argument is provided, the queue is unbounded. *)
val create : ?size:int * ('a -> int) -> unit -> 'a t
(** [push q v] is a thread that blocks while [q] contains more
(** [push q v] is a promise that blocks while [q] contains more
than [size] elements, then adds [v] at the end of [q]. *)
val push : 'a t -> 'a -> unit Lwt.t
(** [pop q] is a thread that blocks while [q] is empty, then
(** [pop q] is a promise that blocks while [q] is empty, then
removes and returns the first element in [q]. *)
val pop : 'a t -> 'a Lwt.t
(** [pop t q] is a thread that blocks while [q] is empty, then
removes and returns the first element [v] in [q] and
to return [Some v], unless no message could be popped
in [t] seconds, in which case it returns [None].
(** [pop_with_timeout t q] is a promise that blocks while [q] is empty, then
removes the first element [v] in [q] and returns [Some v].
If no message can be popped before [t] resolves, it returns [None].
As concurrent readers are allowed, [None] does not
necessarily mean that no value has been pushed. *)
necessarily mean that no value has been pushed.
[t] is canceled (i.e., it fails with [Canceled]) if an element is returned.
*)
val pop_with_timeout : unit Lwt.t -> 'a t -> 'a option Lwt.t
(** [pop_all q] is a thread that blocks while [q] is empty, then
removes and returns all the element in [q] (in the order they
(** [pop_all q] is a promise that blocks while [q] is empty, then
removes and returns all the elements in [q] (in the order in which they
were inserted). *)
val pop_all : 'a t -> 'a list Lwt.t
(** [pop_all_now q] returns all the element in [q] (in the order they
were inserted), or [[]] if [q] is empty. *)
(** [pop_all_now q] removes and returns all the element in [q] (in the order in
which they were inserted). If [q] is empty, [[]] is returned. *)
val pop_all_now : 'a t -> 'a list
(** [peek] is like [pop] except it does not removes the first
(** [peek q] returns the same value as [pop q] but does not remove the first
element. *)
val peek : 'a t -> 'a Lwt.t
......@@ -70,32 +73,36 @@ val peek : 'a t -> 'a Lwt.t
or [[]] if empty. *)
val peek_all : 'a t -> 'a list
(** [values_available] is like [peek] but it ignores the value
returned. *)
(** [values_available q] is a promise that blocks while [q] is empty. *)
val values_available : 'a t -> unit Lwt.t
(** [push_now q v] adds [v] at the ends of [q] immediately and returns
[false] if [q] is currently full, [true] otherwise. *)
(** [push_now q v] either
- adds [v] at the ends of [q] immediately and returns [true], or
- if [q] is full, returns [false]. *)
val push_now : 'a t -> 'a -> bool
exception Full
(** [push_now q v] adds [v] at the ends of [q] immediately or
raise [Full] if [q] is currently full. *)
(** [push_now q v] either
- adds [v] at the ends of [q] immediately, or
- if [q] is full, raises [Full].
@raise [Full] if [q] does not have enough space to hold [v]. *)
val push_now_exn : 'a t -> 'a -> unit
(** [safe_push_now q v] may adds [v] at the ends of [q]. It does
nothing if the queue is fulled or closed. *)
(** [safe_push_now q v] may or may not add [v] at the ends of [q]. *)
val safe_push_now : 'a t -> 'a -> unit
(** [pop_now q] maybe removes and returns the first element in [q] if
(** [pop_now q] may remove and return the first element in [q] if
[q] contains at least one element. *)
val pop_now : 'a t -> 'a option
exception Empty
(** [pop_now_exn q] removes and returns the first element in [q] if
[q] contains at least one element, or raise [Empty] otherwise. *)
[q] contains at least one element, or raise [Empty] otherwise.
@raise [Empty] if [q] holds no elements. *)
val pop_now_exn : 'a t -> 'a
(** [length q] is the number of elements in [q]. *)
......@@ -104,7 +111,7 @@ val length : 'a t -> int
(** [is_empty q] is [true] if [q] is empty, [false] otherwise. *)
val is_empty : 'a t -> bool
(** [empty q] returns when [q] becomes empty. *)
(** [empty q] is a promise that resolves when [q] becomes empty. *)
val empty : 'a t -> unit Lwt.t
(** [iter q ~f] pops all elements of [q] and applies [f] on them. *)
......@@ -112,14 +119,14 @@ val iter : 'a t -> f:('a -> unit Lwt.t) -> unit Lwt.t
exception Closed
(** [close q] the write end of [q]:
(** [close q] the write-end of [q]:
* Future write attempts will fail with [Closed].
* If there are reads blocked, they will unblock and fail with [Closed].
* Future read attempts will drain the data until there is no data left.
Thus, after a pipe has been closed, reads never block.
Close is idempotent.
The [close] function is idempotent.
*)
val close : 'a t -> unit
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment