Commit b4e4fb8f authored by Pascal J. Bourguignon's avatar Pascal J. Bourguignon

Added pipe-stream.

parent dd9653b7
;;;; -*- mode:lisp;coding:utf-8 -*-
;;;;**************************************************************************
;;;;FILE: com.informatimago.clext.pipe-stream.asd
;;;;LANGUAGE: Common-Lisp
;;;;SYSTEM: Common-Lisp
;;;;USER-INTERFACE: NONE
;;;;DESCRIPTION
;;;;
;;;; ASD file to load the com.informatimago.clext.pipe-stream library.
;;;;
;;;;AUTHORS
;;;; <PJB> Pascal J. Bourguignon <pjb@informatimago.com>
;;;;MODIFICATIONS
;;;; 2015-09-12 <PJB> Created this .asd file.
;;;;BUGS
;;;;LEGAL
;;;; AGPL3
;;;;
;;;; Copyright Pascal J. Bourguignon 2015 - 2015
;;;;
;;;; This program is free software: you can redistribute it and/or modify
;;;; it under the terms of the GNU Affero General Public License as published by
;;;; the Free Software Foundation, either version 3 of the License, or
;;;; (at your option) any later version.
;;;;
;;;; This program is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;;;; GNU Affero General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Affero General Public License
;;;; along with this program. If not, see http://www.gnu.org/licenses/
;;;;**************************************************************************
#+mocl
(asdf:defsystem "com.informatimago.clext.pipe-stream"
;; system attributes:
:description "Dummy Informatimago Common Lisp Extensions: Pipe-Streams."
:long-description "
This system would use TRIVIAL-GRAY-STREAMS, which is not available on MOCL.
"
:author "Pascal J. Bourguignon <pjb@informatimago.com>"
:maintainer "Pascal J. Bourguignon <pjb@informatimago.com>"
:licence "AGPL3"
;; component attributes:
:version "1.0.4"
:properties ((#:author-email . "pjb@informatimago.com")
(#:date . "Summer 2015")
((#:albert #:output-dir) . "/tmp/documentation/com.informatimago.clext/")
((#:albert #:formats) . ("docbook"))
((#:albert #:docbook #:template) . "book")
((#:albert #:docbook #:bgcolor) . "white")
((#:albert #:docbook #:textcolor) . "black"))
#+asdf-unicode :encoding #+asdf-unicode :utf-8
:depends-on ()
:components ())
#-mocl
(asdf:defsystem "com.informatimago.clext.pipe-stream"
;; system attributes:
:description "Informatimago Common Lisp Extensions: Pipe-Streams."
:long-description "
This system provides PIPE-STREAMS, a pair of input and output stream
with a synchronized queue in the middle.
"
:author "Pascal J. Bourguignon <pjb@informatimago.com>"
:maintainer "Pascal J. Bourguignon <pjb@informatimago.com>"
:licence "AGPL3"
;; component attributes:
:version "1.0.0"
:properties ((#:author-email . "pjb@informatimago.com")
(#:date . "Summer 2015")
((#:albert #:output-dir) . "/tmp/documentation/com.informatimago.clext/")
((#:albert #:formats) . ("docbook"))
((#:albert #:docbook #:template) . "book")
((#:albert #:docbook #:bgcolor) . "white")
((#:albert #:docbook #:textcolor) . "black"))
:depends-on ("trivial-gray-streams"
"bordeaux-threads")
:components ((:file "pipe-stream"))
#+adsf3 :in-order-to #+adsf3 ((test-op (test-op "com.informatimago.clext.pipe-stream.test")))
#+asdf-unicode :encoding #+asdf-unicode :utf-8)
;;;; THE END ;;;;
;;;; -*- mode:lisp;coding:utf-8 -*-
;;;;**************************************************************************
;;;;FILE: pipe-stream-test.lisp
;;;;LANGUAGE: Common-Lisp
;;;;SYSTEM: Common-Lisp
;;;;USER-INTERFACE: NONE
;;;;DESCRIPTION
;;;;
;;;; Tests the pipe streams.
;;;;
;;;;AUTHORS
;;;; <PJB> Pascal J. Bourguignon <pjb@informatimago.com>
;;;;MODIFICATIONS
;;;; 2015-09-13 <PJB> Created.
;;;;BUGS
;;;;LEGAL
;;;; AGPL3
;;;;
;;;; Copyright Pascal J. Bourguignon 2015 - 2015
;;;;
;;;; This program is free software: you can redistribute it and/or modify
;;;; it under the terms of the GNU Affero General Public License as published by
;;;; the Free Software Foundation, either version 3 of the License, or
;;;; (at your option) any later version.
;;;;
;;;; This program is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;;;; GNU Affero General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Affero General Public License
;;;; along with this program. If not, see <http://www.gnu.org/licenses/>.
;;;;**************************************************************************
(defpackage "COM.INFORMATIMAGO.CLEXT.PIPE.TEST"
(:use "COMMON-LISP"
"BORDEAUX-THREADS"
"COM.INFORMATIMAGO.COMMON-LISP.CESARUM.SIMPLE-TEST"
"COM.INFORMATIMAGO.CLEXT.PIPE")
(:export "TEST/ALL"))
(in-package "COM.INFORMATIMAGO.CLEXT.PIPE.TEST")
#-(and)
(:export "MAKE-PIPE" "PIPE" "PIPE-INPUT-STREAM" "PIPE-OUTPUT-STREAM" "PIPE-ELEMENT-TYPE"
"PIPE-CHARACTER-INPUT-STREAM"
"PIPE-CHARACTER-OUTPUT-STREAM"
"PIPE-BINARY-INPUT-STREAM"
"PIPE-BINARY-OUTPUT-STREAM")
(deftype octet () '(unsigned-byte 8))
(define-test test/character-io (pipe-kind out-kind in-kind)
(let* ((buffer-size (ecase pipe-kind
(:queued nil)
(:buffered 133)))
(pipe (make-pipe :element-type 'character
:buffer-size buffer-size))
(output (pipe-output-stream pipe))
(input (pipe-input-stream pipe))
(producer (make-thread
(ecase out-kind
(:char (make-character-output output buffer-size))
(:line (make-line-output output buffer-size))
(:sequence (make-string-output output buffer-size)))
:named "test/character-io/producer"))
(consumer (make-thread
(ecase in-kind
(:char (make-character-input input buffer-size))
(:line (make-line-input input buffer-size))
(:sequence (make-string-input input buffer-size)))
:named "test/character-io/consumer")))
(join-thread producer)
(join-thread consomer)))
(define-test test/binary-io (pipe-kind out-kind in-kind)
(let* ((buffer-size (ecase pipe-kind
(:queued nil)
(:buffered 133)))
(pipe (make-pipe :element-type 'octet
:buffer-size buffer-size))
(output (pipe-output-stream pipe))
(input (pipe-input-stream pipe))
(producer (make-thread
(ecase out-kind
(:byte (make-binary-output output buffer-size))
(:sequence (make-sequence-output output buffer-size)))
:named "test/binary-io/producer"))
(consumer (make-thread
(ecase in-kind
(:byte (make-binary-input input buffer-size))
(:sequence (make-sequence-input input buffer-size)))
:named "test/binary-io/consumer")))
(join-thread producer)
(join-thread consomer)))
(define-test test/all ()
(loop
:for pipe :in '(:queued :buffered)
:do (loop
:for out :in '(:char :line :sequence)
:do (loop
:for in :in '(:char :line :sequence)
:do (test/character-io pipe out in))))
(loop
:for pipe :in '(:queued :buffered)
:do (loop
:for out :in '(:byte :sequence)
:do (loop
:for in :in '(:byte :sequence)
:do (test/binary-io pipe out in)))))
;;;; THE END ;;;;
;;;; -*- mode:lisp;coding:utf-8 -*-
;;;;**************************************************************************
;;;;FILE: pipe-stream.lisp
;;;;LANGUAGE: Common-Lisp
;;;;SYSTEM: Common-Lisp
;;;;USER-INTERFACE: NONE
;;;;DESCRIPTION
;;;;
;;;; Implements a pipe stream using Gray streams and bordeaux-threads.
;;;;
;;;; The data written to the pipe-stream is queued (if a maximum
;;;; queue-size is specified for the stream, then the writing
;;;; thread may block if the buffer is full).
;;;;
;;;; The data queued can be read from the pipe-stream. If the
;;;; queue is empty, then the reading stream may block (unless it
;;;; used listen, read-char-no-hang, etc).
;;;;
;;;; When the stream is closed, one can still read from it until
;;;; the EOF is reached.
;;;;
;;;;AUTHORS
;;;; <PJB> Pascal J. Bourguignon <pjb@informatimago.com>
;;;;MODIFICATIONS
;;;; 2015-09-12 <PJB> Created.
;;;;BUGS
;;;;LEGAL
;;;; AGPL3
;;;;
;;;; Copyright Pascal J. Bourguignon 2015 - 2015
;;;;
;;;; This program is free software: you can redistribute it and/or modify
;;;; it under the terms of the GNU Affero General Public License as published by
;;;; the Free Software Foundation, either version 3 of the License, or
;;;; (at your option) any later version.
;;;;
;;;; This program is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;;;; GNU Affero General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Affero General Public License
;;;; along with this program. If not, see <http://www.gnu.org/licenses/>.
;;;;**************************************************************************
(defpackage "COM.INFORMATIMAGO.CLEXT.PIPE"
(:use "COMMON-LISP"
"TRIVIAL-GRAY-STREAMS"
"BORDEAUX-THREADS")
(:export "MAKE-PIPE" "PIPE" "PIPE-INPUT-STREAM" "PIPE-OUTPUT-STREAM" "PIPE-ELEMENT-TYPE"
"PIPE-CHARACTER-INPUT-STREAM"
"PIPE-CHARACTER-OUTPUT-STREAM"
"PIPE-BINARY-INPUT-STREAM"
"PIPE-BINARY-OUTPUT-STREAM"))
(in-package "COM.INFORMATIMAGO.CLEXT.PIPE")
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;
;;; Public interface:
;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defgeneric pipe-element-type (pipe)
(:documentation "RETURN: the pipe ELEMENT-TYPE."))
(defgeneric pipe-input-stream (pipe)
(:documentation "RETURN: the pipe input stream."))
(defgeneric pipe-output-stream (pipe)
(:documentation "RETURN: the pipe output stream."))
(defclass pipe ()
((element-type :reader pipe-element-type :initarg :element-type)
(input-stream :reader pipe-input-stream)
(output-stream :reader pipe-output-stream))
;; Data is written to the pipe-output-stream and then enqueued into the pipe.
;; Data is dequeued from the pipe, and then read from the pipe-input-stream.
(:documentation "A PIPE is a synchronized queue accessed
from an input stream and an output stream."))
(defmethod initialize-instance ((pipe pipe) &key &allow-other-keys)
(if (eql (class-of pipe) (find-class 'pipe))
(error "~S is an abstract class, use ~S to create pipes."
'pipe 'make-pipe)
(call-next-method)))
#-(and)
(define-condition pipe-error (error)
((pipe :initarg :pipe :reader pipe-error-pipe)
(sequence :initarg :sequence :initform nil :reader pipe-error-sequence)
(start :initarg :start :initform nil :reader pipe-error-start)
(end :initarg :end :initform nil :reader pipe-error-end)
(format-control :initarg :format-control :initform "" :reader pipe-error-format-control)
(format-arguments :initarg :format-arguments :initform nil :reader pipe-error-format-arguments))
(:report (lambda (condition stream)
(format stream "~?"
(pipe-error-format-control condition)
(pipe-error-format-arguments condition)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;
;;; Private implementation:
;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defclass generic-pipe (pipe)
((lock :reader lock :initform (bt:make-lock "pipe"))
(not-empty :reader not-empty :initform (bt:make-condition-variable :name "pipe/not-empty"))
(head :accessor head :initarg :head)
(tail :accessor tail :initarg :tail)))
(defmethod initialize-instance ((pipe generic-pipe) &key &allow-other-keys)
(if (eql (class-of pipe) (find-class 'generic-pipe))
(error "~S is an abstract class, use ~S to create pipes."
'generic-pipe 'make-pipe)
(call-next-method)))
(defclass buffered-pipe (generic-pipe)
((not-full :reader not-full :initform (bt:make-condition-variable :name "pipe/not-full"))
(buffer :reader buffer :initarg :buffer))
;; Otherwise buffer is a vector of element-type, and head
;; and tail are indices in that vector. In that case, not-full
;; is a condition.
;; New elements are enqueued on tail; old elements are dequeued
;; from head.
(:documentation "A pipe with a fixed-size circular buffer."))
(defclass queued-pipe (generic-pipe)
()
;; When buffer is NIL then head is a list of cells, with
;; tail refering the last cons in that list. In this case,
;; not-full is NIL.
;; New elements are enqueued on tail; old elements are dequeued
;; from head.
(:documentation "A pipe with a variable length queue of blocks."))
(defclass pipe-stream ()
((pipe :initarg :pipe :reader pipe-stream-pipe)))
(defclass pipe-character-input-stream (pipe-stream fundamental-character-input-stream)
((column :initform 0 :accessor column)))
(defclass pipe-character-output-stream (pipe-stream fundamental-character-output-stream)
((column :initform 0 :accessor column)))
(defclass pipe-binary-input-stream (pipe-stream fundamental-binary-input-stream)
())
(defclass pipe-binary-output-stream (pipe-stream fundamental-binary-output-stream)
())
(defun make-pipe (&key (element-type 'character) buffer-size)
"
RETURN: A new PIPE.
"
(let ((pipe (if buffer-size
(make-instance 'buffered-pipe :element-type element-type
:buffer (make-array (1+ buffer-size) :element-type element-type)
:head 0 :tail 0)
(make-instance 'queued-pipe :element-type element-type
:head nil :tail nil))))
(multiple-value-bind (input-class output-class)
(if (subtypep element-type 'character)
(values 'pipe-character-input-stream 'pipe-character-output-stream)
(values 'pipe-binary-input-stream 'pipe-binary-output-stream))
(setf (slot-value pipe 'input-stream) (make-instance input-class :pipe pipe :element-type element-type)
(slot-value pipe 'output-stream) (make-instance output-class :pipe pipe :element-type element-type)))
pipe))
(defgeneric %pipe-emptyp (pipe)
(:documentation "Whether the PIPE is empty."))
(defgeneric %pipe-fullp (pipe)
(:documentation "Whether the PIPE is full."))
(defgeneric %pipe-enqueue-element (pipe element)
(:documentation "
DO: Enqueues the ELEMENT into the PIPE.
PRE: (not (%pipe-fullp pipe))
"))
(defgeneric %pipe-dequeue-element (pipe)
(:documentation "
DO: Dequeues an element from the PIPE.
PRE: (not (%pipe-emptyp pipe))
RETURN: The dequeued element from the PIPE.
"))
(defgeneric %pipe-peek-element (pipe)
(:documentation "
PRE: (not (%pipe-emptyp pipe))
RETURN: The first element in the PIPE.
"))
(defgeneric pipe-enqueue-element (pipe element)
(:documentation "
Enqueues the ELEMENT into the PIPE.
Blocks if the pipe is full.
"))
(defgeneric pipe-dequeue-element (pipe)
(:documentation "
Dequeues and returns the ELEMENT from the PIPE.
If the pipe is empty,
then returns :EOF if the output stream is closed
or else blocks.
"))
(defgeneric pipe-peek-element (pipe)
(:documentation "
Returns the first ELEMENT from the PIPE,
or NIL if the queue is empty,
or :EOF if the output-stream is closed.
"))
(defgeneric pipe-enqueue-sequence (pipe sequence start end)
(:documentation "
If (- end start) is more than the buffer-size then signal an error
else if there's enough space in the buffer, for (- end start) elements,
then enqueues the subseq of SEQUENCE between START and END,
else blocks until there's enough space.
"))
(defgeneric pipe-dequeue-sequence (pipe sequence start end)
(:documentation "
If (- end start) is more than the buffer-size then signal an error
else if there's more than (- end start) elements in the buffer,
then dequeues them into the subseq of SEQUENCE between START and END,
else blocks until there's enough data available.
"))
(defgeneric pipe-dequeue-until-element (pipe element)
(:documentation "
Dequeues elements from the buffer until an element equal to element is found.
Returns a sequence containing all the dequeued elements.
"))
(defgeneric sunk-pipe-p (pipe)
(:documentation "Whether the reading stream has been closed;
when it's the case, no more data is written to the pipe.")
(:method ((pipe pipe))
(not (open-stream-p (pipe-input-stream pipe)))))
(defgeneric closed-pipe-p (pipe)
(:documentation "Whether the writing stream has been closed;
when it's the case, end-of-file is detected upon reading on an empty pipe.")
(:method ((pipe pipe))
(not (open-stream-p (pipe-output-stream pipe)))))
;;; for circular buffers:
(defun mod-plus (value modulo &rest arguments)
(mod (apply (function +) value arguments) modulo))
(defun mod-minus (value modulo &rest arguments)
(mod (apply (function -) value arguments) modulo))
(define-modify-macro mod-incf (modulo &optional (increment 1)) mod-plus)
(define-modify-macro mod-decf (modulo &optional (decrement 1)) mod-minus)
(defmethod %pipe-emptyp ((pipe buffered-pipe))
(= (head pipe) (tail pipe)))
(defmethod %pipe-fullp ((pipe buffered-pipe))
(= (head pipe) (mod-incf (tail pipe) (length (buffer pipe)))))
;;; for queue lists, we enqueue blocks made of a start index and a sequence.
(declaim (inline make-block block-start block-sequence block-empty-p block-full-p
block-pop-char block-peek-char block-push-char))
(defun make-block (buffer) (cons 0 (copy-seq buffer)))
(defun block-start (blk) (car blk))
(defun (setf block-start) (new-value blk) (setf (car blk) new-value))
(defun block-sequence (blk) (cdr blk))
(defun block-empty-p (blk) (>= (block-start blk) (length (block-sequence blk))))
(defun block-full-p (blk) (zerop (block-start blk)))
(defun block-pop-char (blk)
(assert (not (block-empty-p blk)))
(prog1 (aref (block-sequence blk) (block-start blk))
(incf (block-start blk))))
(defun block-peek-char (blk)
(assert (not (block-empty-p blk)))
(aref (block-sequence blk) (block-start blk)))
(defun block-push-char (blk ch)
(assert (not (block-full-p blk)))
(decf (block-start blk))
(setf (aref (block-sequence blk) (block-start blk)) ch))
(defmethod %pipe-emptyp ((pipe queued-pipe))
(null (head pipe)))
(defmethod %pipe-fullp ((pipe queued-pipe))
nil)
(defun %wait-not-empty-or-closed (pipe)
(loop :while (%pipe-emptyp pipe)
:do (if (closed-pipe-p pipe)
(return :eof)
(condition-wait (not-empty pipe) (lock pipe)))
:finally (return nil)))
(defmethod pipe-enqueue-element ((pipe buffered-pipe) element)
(with-lock-held ((lock pipe))
(loop :while (%pipe-fullp pipe)
:do (condition-wait (not-full pipe) (lock pipe)))
(setf (aref (buffer pipe) (tail pipe)) element)
(mod-incf (tail pipe) (length (buffer pipe)))
(condition-notify (not-empty pipe))))
(defmethod pipe-enqueue-sequence ((pipe buffered-pipe) sequence start end)
(assert (<= 0 start end (length sequence)))
(let ((buflen (length (buffer pipe))))
(with-lock-held ((lock pipe))
(loop
:while (< start end)
:do (loop :while (%pipe-fullp pipe)
:do (condition-wait (not-full pipe) (lock pipe)))
;; [_____head-------tail__________]
;; [-------tail_______head--------]
;; write what we can:
(let* ((seqlen (- end start))
(dsts (tail pipe))
(dste (min (if (< (head pipe) (tail pipe))
buflen
(1- (head pipe)))
(+ dsts seqlen)))
(len (- dste dsts)))
(replace (buffer pipe) sequence :start1 dsts :end1 dste :start2 start)
(incf start len)
(mod-incf (tail pipe) buflen len)
(condition-notify (not-empty pipe))))))
sequence)
(defmethod pipe-dequeue-sequence ((pipe buffered-pipe) sequence start end)
(assert (<= 0 start end (length sequence)))
(let ((buflen (length (buffer pipe))))
(with-lock-held ((lock pipe))
(loop
:while (< start end)
:do (when (%wait-not-empty-or-closed pipe)
(return (values sequence start)))
;; [_____head-------tail__________]
;; [-------tail_______head--------]
;; read what we can:
(let* ((seqlen (- end start))
(srcs (head pipe))
(srce (min (if (< (head pipe) (tail pipe))
(tail pipe)
buflen)
(+ srcs seqlen)))
(len (- srce srcs)))
(replace sequence (buffer pipe) :start2 srcs :end2 srce :start1 start)
(incf start len)
(mod-incf (head pipe) buflen len)
(condition-notify (not-full pipe)))
:finally (return (values sequence start))))))
(defmethod pipe-dequeue-until-element ((pipe buffered-pipe) element)
(let ((buflen (length (buffer pipe)))
(chunks '()))
(flet ((concatenate-chunks ()
(with-output-to-string (out)
(dolist (chunk (nreverse chunks))
(write-string chunk out)))))
(with-lock-held ((lock pipe))
(loop
(when (%wait-not-empty-or-closed pipe)
(return (values (concatenate-chunks) nil)))
;; [_____head-------tail__________]
;; [-------tail_______head--------]
;; read what we can:
(let* ((srcs (head pipe))
(srce (if (< (head pipe) (tail pipe))
(tail pipe)
buflen))
(pos (position element (buffer pipe) :start srcs :end srce)))
(if pos
(progn
(push (subseq (buffer pipe) srcs pos) chunks)
(mod-incf (head pipe) buflen (- (1+ pos) srcs))
(condition-notify (not-full pipe))
(return (values (concatenate-chunks) t)))
(progn
(push (subseq (buffer pipe) srcs srce) chunks)
(mod-incf (head pipe) buflen (- srce srcs))
(condition-notify (not-full pipe))))))))))
(defmethod pipe-enqueue-element ((pipe queued-pipe) element)
(pipe-enqueue-sequence pipe (vector element) 0 1))
(defmethod pipe-enqueue-sequence ((pipe queued-pipe) sequence start end)
(let ((blkl (list (make-block (subseq sequence start end)))))
(with-lock-held ((lock pipe))
(if (tail pipe)
(setf (cdr (tail pipe)) blkl
(tail pipe) (cdr (tail pipe)))
(setf (head pipe)
(setf (tail pipe) blkl)))
(condition-notify (not-empty pipe)))))
(defmethod pipe-dequeue-sequence ((pipe queued-pipe) sequence start end)
(assert (<= 0 start end (length sequence)))
(with-lock-held ((lock pipe))
(loop
:while (< start end)
:do (when (%wait-not-empty-or-closed pipe)
(return (values sequence start)))
;; read what we can:
(let ((blk (car (head pipe))))
(if (block-empty-p blk)
(if (eql (head pipe) (tail pipe))
(setf (head pipe) nil
(tail pipe) nil)
(pop (head pipe)))
(let* ((seqlen (- end start))
(srcs (block-start blk))
(srce (min (+ seqlen srcs)
(length (block-sequence blk))))
(len (- srce srcs)))
(replace sequence (block-sequence blk)
:start1 start :start2 (block-start blk) :end2 srce)
(incf start len)
(incf (block-start blk) len))))
:finally (return (values sequence start)))))
(defmethod pipe-dequeue-until-element ((pipe queued-pipe) element)
(let ((chunks '()))
(flet ((concatenate-chunks ()
(with-output-to-string (out)
(dolist (chunk (nreverse chunks))
(write-string chunk out)))))
(with-lock-held ((lock pipe))
(loop
(when (%wait-not-empty-or-closed pipe)
(return (values (concatenate-chunks) nil)))
;; read what we can:
(let ((blk (car (head pipe))))
(if (block-empty-p blk)
(if (eql (head pipe) (tail pipe))
(setf (head pipe) nil
(tail pipe) nil)
(pop (head pipe)))
(let* ((srcs (block-start blk))
(srce (length (block-sequence blk)))
(pos (position element (block-sequence blk) :start srcs :end srce)))
(if pos
(progn (push (subseq (block-sequence blk) (block-start blk) pos) chunks)
(setf (block-start blk) (1+ pos))
(return (values (concatenate-chunks) nil)))
(progn (push (subseq (block-sequence blk) (block-start blk)) chunks)
(setf (block-start blk) srce)))))))))))
;;; Peek or dequeue element.
(defmethod %pipe-peek-or-dequeue ((pipe buffered-pipe) peek)
(values t (prog1 (aref (buffer pipe) (head pipe))
(unless peek
(mod-incf (head pipe) (length (buffer pipe)))
(condition-notify (not-full pipe))))))
(defmethod %pipe-peek-or-dequeue ((pipe queued-pipe) peek)
(let ((blk (car (head pipe))))
(if (block-empty-p blk)
(progn
(if (eql (head pipe) (tail pipe))
(setf (head pipe) nil
(tail pipe) nil)
(pop (head pipe)))
(values nil))
(values t (if peek
(block-peek-char blk)
(block-pop-char blk))))))
(defun %peek-or-dequeue (pipe peek no-hang)
(with-lock-held ((lock pipe))
(loop
(if (%pipe-emptyp pipe)
(if (closed-pipe-p pipe)
(return :eof)
(if no-hang
(return nil)
(condition-wait (not-empty pipe) (lock pipe))))
(multiple-value-bind (returnp result) (%pipe-peek-or-dequeue pipe peek)
(when returnp
(return result)))))))
(defmethod pipe-dequeue-element ((pipe actual-pipe))
(%peek-or-dequeue pipe nil nil))