Commit 39b072c6 authored by Pascal J. Bourguignon's avatar Pascal J. Bourguignon

Removed a gate-signal not-empty. Debugging.

parent 27e272fd
......@@ -83,7 +83,8 @@ with a synchronized queue in the middle.
"bordeaux-threads"
"com.informatimago.clext.closer-weak") ; weak hash-tables are needed for gate.
:components ((:file "gate")
(:file "pipe-stream" :depends-on ("gate")))
(:file "debug")
(:file "pipe-stream" :depends-on ("gate" "debug")))
#+adsf3 :in-order-to #+adsf3 ((test-op (test-op "com.informatimago.clext.pipe-stream.test")))
#+asdf-unicode :encoding #+asdf-unicode :utf-8)
......
;;;; -*- mode:lisp;coding:utf-8 -*-
;;;;**************************************************************************
;;;;FILE: com.informatimago.clext.pipe-stream.test.asd
;;;;LANGUAGE: Common-Lisp
;;;;SYSTEM: Common-Lisp
;;;;USER-INTERFACE: NONE
;;;;DESCRIPTION
;;;;
;;;; ASD file to test the com.informatimago.clext.pipe-stream library.
;;;;
;;;;AUTHORS
;;;; <PJB> Pascal J. Bourguignon <pjb@informatimago.com>
;;;;MODIFICATIONS
;;;; 2015-09-29 <PJB> Created.
;;;;BUGS
;;;;LEGAL
;;;; AGPL3
;;;;
;;;; Copyright Pascal J. Bourguignon 2015 - 2015
;;;;
;;;; This program is free software: you can redistribute it and/or modify
;;;; it under the terms of the GNU Affero General Public License as published by
;;;; the Free Software Foundation, either version 3 of the License, or
;;;; (at your option) any later version.
;;;;
;;;; This program is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;;;; GNU Affero General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Affero General Public License
;;;; along with this program. If not, see <http://www.gnu.org/licenses/>.
;;;;**************************************************************************
(asdf:defsystem "com.informatimago.clext.pipe-stream.test"
;; system attributes:
:description "Informatimago Common Lisp Extensions: Pipe-Streams - Tests."
:long-description "
Tests the pipe-streams.
"
:author "Pascal J. Bourguignon <pjb@informatimago.com>"
:maintainer "Pascal J. Bourguignon <pjb@informatimago.com>"
:licence "AGPL3"
;; component attributes:
:version "1.0.0"
:properties ((#:author-email . "pjb@informatimago.com")
(#:date . "Automn 2015")
((#:albert #:output-dir) . "/tmp/documentation/com.informatimago.clext/")
((#:albert #:formats) . ("docbook"))
((#:albert #:docbook #:template) . "book")
((#:albert #:docbook #:bgcolor) . "white")
((#:albert #:docbook #:textcolor) . "black"))
#+asdf-unicode :encoding #+asdf-unicode :utf-8
:depends-on ("com.informatimago.common-lisp.cesarum"
"com.informatimago.clext.pipe-stream")
:components ((:file "pipe-stream-test"))
#+asdf3 :perform #+asdf3 (asdf:test-op
(o s)
(let ((*package* (find-package "COM.INFORMATIMAGO.CLEXT.PIPE-STREAM")))
(uiop:symbol-call "COM.INFORMATIMAGO.CLEXT.PIPE-STREAM.TEST" "TEST/ALL"))))
;;;; THE END ;;;;
(defpackage "COM.INFORMATIMAGO.CLEXT.DEBUG"
(:use "COMMON-LISP"
"BORDEAUX-THREADS")
(:shadow "WITH-LOCK-HELD")
(:export "TR" "WITH-LOCK-HELD"))
(in-package "COM.INFORMATIMAGO.CLEXT.DEBUG")
(defvar *tr-lock* (make-lock "trace"))
(defvar *tr-output* *standard-output*)
(defun tr (fc &rest a)
(bt:with-lock-held (*tr-lock*)
(format *tr-output* "~&~30A: ~?~&" (thread-name (current-thread)) fc a)))
(defmacro with-lock-held ((place) &body body)
(let ((lock (gensym)))
`(let ((,lock ,place))
(tr "will acquire lock ~A" (ccl:lock-name ,lock))
(unwind-protect
(bt:with-lock-held (,lock)
(tr "acquired lock ~A" (ccl:lock-name ,lock))
,@body)
(tr "released lock ~A" (ccl:lock-name ,lock))))))
;;;; THE END ;;;;
......@@ -193,55 +193,24 @@ the gate signal is forgotten.
(make-condition-variable
:name (format nil "~A/gate" (thread-name thread)))))))
#+debug-gate
(progn
(defvar *tr-lock* (make-lock "trace"))
(defvar *tr-output* *standard-output*)
(defun tr (fc &rest a)
(with-lock-held (*tr-lock*)
(format *tr-output* "~&~30A: ~?~&" (thread-name (current-thread)) fc a))))
#-debug-gate
(defmacro tr (&rest ignored)
(declare (ignore ignored))
'nil)
(defun condition-variable-name (x) x)
(defun gate-wait (gate lock)
(tr "(gate-wait ~A enter" (gate-name gate))
(let ((my-var (%get-thread-condition-variable (current-thread))))
(tr " gate-wait ~A will acquire lock ~A" (gate-name gate) (ccl:lock-name (gate-lock gate)))
(with-lock-held ((gate-lock gate))
(tr " gate-wait ~A has lock ~A" (gate-name gate) (ccl:lock-name (gate-lock gate)))
(push my-var (gate-waiting-threads gate))
(tr " gate-wait ~A releases lock ~A" (gate-name gate) (ccl:lock-name lock))
(release-lock lock)
(tr " gate-wait ~A will wait condition ~A" (gate-name gate) (condition-variable-name my-var))
(unwind-protect
(condition-wait my-var (gate-lock gate))
(tr " gate-wait ~A got notified" (gate-name gate))
(tr " gate-wait ~A will acquire lock ~A" (gate-name gate) (ccl:lock-name lock))
(acquire-lock lock)
(tr " gate-wait ~A acquired lock ~A" (gate-name gate) (ccl:lock-name lock)))))
(tr " gate-wait ~A released lock ~A" (gate-name gate) (ccl:lock-name (gate-lock gate)))
(tr " gate-wait ~A exits)" (gate-name gate)))
(acquire-lock lock)))))
(defun gate-signal (gate)
(tr "(gate-signal ~A enter" (gate-name gate))
(tr " gate-signal ~A will acquire lock ~A" (gate-name gate) (ccl:lock-name (gate-lock gate)))
(loop :until (acquire-lock (gate-lock gate) nil))
(unwind-protect
(progn
(tr " gate-signal ~A acquired lock ~A" (gate-name gate) (ccl:lock-name (gate-lock gate)))
;; (mapc (function condition-notify) (gate-waiting-threads gate))
(mapc (lambda (var)
(tr " gate-signal will notify condition ~A" (condition-variable-name var))
(condition-notify var))
(gate-waiting-threads gate))
(mapc (function condition-notify) (gate-waiting-threads gate))
(setf (gate-waiting-threads gate) '()))
(release-lock (gate-lock gate)))
(tr " gate-signal ~A released lock ~A" (gate-name gate) (ccl:lock-name (gate-lock gate)))
(tr " gate-signal ~A exits)" (gate-name gate)))
(release-lock (gate-lock gate))))
;;;; THE END ;;;;
......@@ -327,11 +327,11 @@ publie en 1962 par MIT Press, un des maitres-livres de l'Informatique.
(*debug-io* . ,*debug-io*)
(*query-io* . ,*query-io*)
#+debug-gate (*tr-output* . ,*trace-output*)))
(producer (funcall (or (third (assoc out-kind makers))
(producer (funcall (or (second (assoc out-kind makers))
(error "Invalid out-kind ~S, expected one of ~{~S~^ ~}"
out-kind (mapcar (function first) makers)))
output buffer-size))
(consumer (funcall (or (second (assoc out-kind makers))
(consumer (funcall (or (third (assoc in-kind makers))
(error "Invalid in-kind ~S, expected one of ~{~S~^ ~}"
in-kind (mapcar (function first) makers)))
input buffer-size))
......@@ -354,14 +354,14 @@ publie en 1962 par MIT Press, un des maitres-livres de l'Informatique.
(define-test test/character-io (pipe-kind out-kind in-kind &key debug)
(test/io pipe-kind out-kind in-kind debug 'character "character"
'((:char make-character-input make-character-output)
(:line make-line-input make-line-output )
(:sequence make-string-input make-string-output ))))
'((:char make-character-output make-character-input )
(:line make-line-output make-line-input )
(:sequence make-string-output make-string-input ))))
(define-test test/binary-io (pipe-kind out-kind in-kind &key debug)
(test/io pipe-kind out-kind in-kind debug 'octet "binary"
'((:byte make-binary-input make-binary-output )
(:sequence make-sequence-input make-sequence-output))))
'((:byte make-binary-output make-binary-input )
(:sequence make-sequence-output make-sequence-input ))))
(define-test test/all ()
(loop
......
......@@ -49,29 +49,19 @@
"TRIVIAL-GRAY-STREAMS"
"BORDEAUX-THREADS"
"COM.INFORMATIMAGO.CLEXT.GATE")
#+debug (:shadowing-import-from "COM.INFORMATIMAGO.CLEXT.DEBUG" "WITH-LOCK-HELD")
#+debug (:use "COM.INFORMATIMAGO.CLEXT.DEBUG")
(:export "MAKE-PIPE" "PIPE" "PIPE-INPUT-STREAM" "PIPE-OUTPUT-STREAM" "PIPE-ELEMENT-TYPE"
"REOPEN-PIPE"
"PIPE-CHARACTER-INPUT-STREAM"
"PIPE-CHARACTER-OUTPUT-STREAM"
"PIPE-BINARY-INPUT-STREAM"
"PIPE-BINARY-OUTPUT-STREAM")
(:import-from "COM.INFORMATIMAGO.CLEXT.GATE" "TR")
(:shadow "WITH-LOCK-HELD"))
"PIPE-BINARY-OUTPUT-STREAM"))
(in-package "COM.INFORMATIMAGO.CLEXT.PIPE")
(declaim (declaration stepper))
(defmacro with-lock-held ((place) &body body)
(let ((lock (gensym)))
`(let ((,lock ,place))
(tr "will acquire lock ~A" (ccl:lock-name ,lock))
(unwind-protect
(bt:with-lock-held (,lock)
(tr "acquired lock ~A" (ccl:lock-name ,lock))
,@body)
(tr "released lock ~A" (ccl:lock-name ,lock))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;
;;; Public interface:
......@@ -245,23 +235,14 @@ RETURN: A new PIPE.
(defgeneric %pipe-fullp (pipe)
(:documentation "Whether the PIPE is full."))
(defgeneric %pipe-enqueue-element (pipe element)
(:documentation "
DO: Enqueues the ELEMENT into the PIPE.
PRE: (not (%pipe-fullp pipe))
"))
(defgeneric %pipe-dequeue-element (pipe)
(:documentation "
DO: Dequeues an element from the PIPE.
PRE: (not (%pipe-emptyp pipe))
RETURN: The dequeued element from the PIPE.
"))
(defgeneric %pipe-peek-element (pipe)
(defgeneric %pipe-peek-or-dequeue (pipe peek)
(:documentation "
PRE: (not (%pipe-emptyp pipe))
RETURN: The first element in the PIPE.
PEEK: When true, then peek, else dequeue.
RETURN: RETURNP: whether we have a result;
RESULT: the result of peeking or dequeing;
SIGNAL: whether not-full condition should be signaled.
"))
......@@ -385,10 +366,13 @@ when it's the case, end-of-file is detected upon reading on an empty pipe.")
(defmethod pipe-enqueue-element ((pipe buffered-pipe) element)
(with-lock-held ((lock pipe))
(tr "pipe-enqueue-element: wait until (not (%pipe-fullp pipe))")
(loop :while (%pipe-fullp pipe)
:do (gate-wait (not-full pipe) (lock pipe)))
(tr "pipe-enqueue-element: tail=~S bufsize=~S" (tail pipe) (length (buffer pipe)))
(setf (aref (buffer pipe) (tail pipe)) element)
(mod-incf (tail pipe) (length (buffer pipe))))
(mod-incf (tail pipe) (length (buffer pipe)))
(tr "pipe-enqueue-element: head=~S tail=~S" (head pipe) (tail pipe)))
(gate-signal (not-empty pipe)))
(defmethod pipe-enqueue-sequence ((pipe buffered-pipe) sequence start end)
......@@ -397,9 +381,11 @@ when it's the case, end-of-file is detected upon reading on an empty pipe.")
(loop
:while (< start end)
:do (with-lock-held ((lock pipe))
(tr "pipe-enqueue-sequence: wait until (not (%pipe-fullp pipe))")
(loop :while (%pipe-fullp pipe)
:do (gate-wait (not-full pipe) (lock pipe)))
(when (%pipe-emptyp pipe)
(tr "pipe-enqueue-sequence: pipe is empty")
(setf (head pipe) 0
(tail pipe) 0))
;; [_____head-------tail__________]
......@@ -412,15 +398,14 @@ when it's the case, end-of-file is detected upon reading on an empty pipe.")
(1- (head pipe)))
(+ dsts seqlen)))
(len (- dste dsts)))
(tr "pipe-enqueue-sequence: dsts=~S dste=~S start=~S len=~S buflen=~S" dsts dste start len buflen)
(replace (buffer pipe) sequence :start1 dsts :end1 dste :start2 start)
(incf start len)
(mod-incf (tail pipe) buflen len))
(gate-signal (not-empty pipe)))))
(mod-incf (tail pipe) buflen len)
(tr "pipe-enqueue-sequence: head=~S tail=~S start=~S" (head pipe) (tail pipe) start)))
(gate-signal (not-empty pipe))))
sequence)
;; %pipe-emptyp (= (head pipe) (tail pipe))
;; %pipe-fullp (= (head pipe) (mod-plus (tail pipe) (length (buffer pipe)) 1))
(defmethod pipe-dequeue-sequence ((pipe buffered-pipe) sequence start end)
(assert (<= 0 start end (length sequence)))
(let ((buflen (length (buffer pipe))))
......@@ -452,29 +437,30 @@ when it's the case, end-of-file is detected upon reading on an empty pipe.")
(with-output-to-string (out)
(dolist (chunk (nreverse chunks))
(write-string chunk out)))))
(values (concatenate-chunks)
(loop :named collect
:do (with-lock-held ((lock pipe))
(when (%wait-not-empty-or-closed pipe)
(return-from collect t #|=eof|#))
;; [_____head-------tail__________]
;; [-------tail_______head--------]
;; read what we can:
(let* ((srcs (head pipe))
(srce (if (<= (head pipe) (tail pipe))
(tail pipe)
buflen))
(pos (position element (buffer pipe) :start srcs :end srce)))
(if pos
(progn
(push (subseq (buffer pipe) srcs pos) chunks)
(mod-incf (head pipe) buflen (- (1+ pos) srcs))
(return-from collect nil #|=newline|#))
(progn
(push (subseq (buffer pipe) srcs srce) chunks)
(mod-incf (head pipe) buflen (- srce srcs))))))
(gate-signal (not-full pipe))
:finally (gate-signal (not-full pipe)))))))
(loop :named collect
:do (with-lock-held ((lock pipe))
(when (%wait-not-empty-or-closed pipe)
(return-from collect (values (concatenate-chunks)
t #|=eof|#)))
;; [_____head-------tail__________]
;; [-------tail_______head--------]
;; read what we can:
(let* ((srcs (head pipe))
(srce (if (<= (head pipe) (tail pipe))
(tail pipe)
buflen))
(pos (position element (buffer pipe) :start srcs :end srce)))
(if pos
(progn
(push (subseq (buffer pipe) srcs pos) chunks)
(mod-incf (head pipe) buflen (- (1+ pos) srcs))
(return-from collect (values (concatenate-chunks)
nil #|=newline|#)))
(progn
(push (subseq (buffer pipe) srcs srce) chunks)
(mod-incf (head pipe) buflen (- srce srcs))))))
(gate-signal (not-full pipe))
:finally (gate-signal (not-full pipe))))))
......@@ -676,8 +662,7 @@ when it's the case, end-of-file is detected upon reading on an empty pipe.")
(end (or end (length string)))
(nlp (position #\newline string :start start :end end :from-end t)))
(unless (sunk-pipe-p pipe)
(pipe-enqueue-sequence pipe string start end)
(gate-signal (not-empty pipe)))
(pipe-enqueue-sequence pipe string start end))
(if nlp
(setf (column stream) (- end nlp))
(incf (column stream) (- end start))))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment