qio.c - vx32 - Local 9vx git repository for patches.
(HTM) git clone git://r-36.net/vx32
(DIR) Log
(DIR) Files
(DIR) Refs
---
qio.c (23670B)
---
1 #include "u.h"
2 #include "lib.h"
3 #include "mem.h"
4 #include "dat.h"
5 #include "fns.h"
6 #include "error.h"
7
8 static ulong padblockcnt;
9 static ulong concatblockcnt;
10 static ulong pullupblockcnt;
11 static ulong copyblockcnt;
12 static ulong consumecnt;
13 static ulong producecnt;
14 static ulong qcopycnt;
15
16 static int debugging;
17
18 #define QDEBUG if(0)
19
20 /*
21 * IO queues
22 */
23
24 struct Queue
25 {
26 Lock lk;
27
28 Block* bfirst; /* buffer */
29 Block* blast;
30
31 int len; /* bytes allocated to queue */
32 int dlen; /* data bytes in queue */
33 int limit; /* max bytes in queue */
34 int inilim; /* initial limit */
35 int state;
36 int noblock; /* true if writes return immediately when q full */
37 int eof; /* number of eofs read by user */
38
39 void (*kick)(void*); /* restart output */
40 void (*bypass)(void*, Block*); /* bypass queue altogether */
41 void* arg; /* argument to kick */
42
43 QLock rlock; /* mutex for reading processes */
44 Rendez rr; /* process waiting to read */
45 QLock wlock; /* mutex for writing processes */
46 Rendez wr; /* process waiting to write */
47
48 char err[ERRMAX];
49 };
50
51 enum
52 {
53 Maxatomic = 64*1024,
54 };
55
56 uint qiomaxatomic = Maxatomic;
57
58 void
59 ixsummary(void)
60 {
61 debugging ^= 1;
62 iallocsummary();
63 print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
64 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
65 print("consume %lud, produce %lud, qcopy %lud\n",
66 consumecnt, producecnt, qcopycnt);
67 }
68
69 /*
70 * free a list of blocks
71 */
72 void
73 freeblist(Block *b)
74 {
75 Block *next;
76
77 for(; b != 0; b = next){
78 next = b->next;
79 if(b->ref == 1)
80 b->next = nil;
81 freeb(b);
82 }
83 }
84
85 /*
86 * pad a block to the front (or the back if size is negative)
87 */
88 Block*
89 padblock(Block *bp, int size)
90 {
91 int n;
92 Block *nbp;
93
94 QDEBUG checkb(bp, "padblock 1");
95 if(size >= 0){
96 if(bp->rp - bp->base >= size){
97 bp->rp -= size;
98 return bp;
99 }
100
101 if(bp->next)
102 panic("padblock %#p", getcallerpc(&bp));
103 n = BLEN(bp);
104 padblockcnt++;
105 nbp = allocb(size+n);
106 nbp->rp += size;
107 nbp->wp = nbp->rp;
108 memmove(nbp->wp, bp->rp, n);
109 nbp->wp += n;
110 freeb(bp);
111 nbp->rp -= size;
112 } else {
113 size = -size;
114
115 if(bp->next)
116 panic("padblock %#p", getcallerpc(&bp));
117
118 if(bp->lim - bp->wp >= size)
119 return bp;
120
121 n = BLEN(bp);
122 padblockcnt++;
123 nbp = allocb(size+n);
124 memmove(nbp->wp, bp->rp, n);
125 nbp->wp += n;
126 freeb(bp);
127 }
128 QDEBUG checkb(nbp, "padblock 1");
129 return nbp;
130 }
131
132 /*
133 * return count of bytes in a string of blocks
134 */
135 int
136 blocklen(Block *bp)
137 {
138 int len;
139
140 len = 0;
141 while(bp) {
142 len += BLEN(bp);
143 bp = bp->next;
144 }
145 return len;
146 }
147
148 /*
149 * return count of space in blocks
150 */
151 int
152 blockalloclen(Block *bp)
153 {
154 int len;
155
156 len = 0;
157 while(bp) {
158 len += BALLOC(bp);
159 bp = bp->next;
160 }
161 return len;
162 }
163
164 /*
165 * copy the string of blocks into
166 * a single block and free the string
167 */
168 Block*
169 concatblock(Block *bp)
170 {
171 int len;
172 Block *nb, *f;
173
174 if(bp->next == 0)
175 return bp;
176
177 nb = allocb(blocklen(bp));
178 for(f = bp; f; f = f->next) {
179 len = BLEN(f);
180 memmove(nb->wp, f->rp, len);
181 nb->wp += len;
182 }
183 concatblockcnt += BLEN(nb);
184 freeblist(bp);
185 QDEBUG checkb(nb, "concatblock 1");
186 return nb;
187 }
188
189 /*
190 * make sure the first block has at least n bytes
191 */
192 Block*
193 pullupblock(Block *bp, int n)
194 {
195 int i;
196 Block *nbp;
197
198 /*
199 * this should almost always be true, it's
200 * just to avoid every caller checking.
201 */
202 if(BLEN(bp) >= n)
203 return bp;
204
205 /*
206 * if not enough room in the first block,
207 * add another to the front of the list.
208 */
209 if(bp->lim - bp->rp < n){
210 nbp = allocb(n);
211 nbp->next = bp;
212 bp = nbp;
213 }
214
215 /*
216 * copy bytes from the trailing blocks into the first
217 */
218 n -= BLEN(bp);
219 while((nbp = bp->next)){
220 i = BLEN(nbp);
221 if(i > n) {
222 memmove(bp->wp, nbp->rp, n);
223 pullupblockcnt++;
224 bp->wp += n;
225 nbp->rp += n;
226 QDEBUG checkb(bp, "pullupblock 1");
227 return bp;
228 } else {
229 /* shouldn't happen but why crash if it does */
230 if(i < 0){
231 print("pullup negative length packet, called from %#p\n",
232 getcallerpc(&bp));
233 i = 0;
234 }
235 memmove(bp->wp, nbp->rp, i);
236 pullupblockcnt++;
237 bp->wp += i;
238 bp->next = nbp->next;
239 nbp->next = 0;
240 freeb(nbp);
241 n -= i;
242 if(n == 0){
243 QDEBUG checkb(bp, "pullupblock 2");
244 return bp;
245 }
246 }
247 }
248 freeb(bp);
249 return 0;
250 }
251
252 /*
253 * make sure the first block has at least n bytes
254 */
255 Block*
256 pullupqueue(Queue *q, int n)
257 {
258 Block *b;
259
260 if(BLEN(q->bfirst) >= n)
261 return q->bfirst;
262 q->bfirst = pullupblock(q->bfirst, n);
263 for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
264 ;
265 q->blast = b;
266 return q->bfirst;
267 }
268
269 /*
270 * trim to len bytes starting at offset
271 */
272 Block *
273 trimblock(Block *bp, int offset, int len)
274 {
275 ulong l;
276 Block *nb, *startb;
277
278 QDEBUG checkb(bp, "trimblock 1");
279 if(blocklen(bp) < offset+len) {
280 freeblist(bp);
281 return nil;
282 }
283
284 while((l = BLEN(bp)) < offset) {
285 offset -= l;
286 nb = bp->next;
287 bp->next = nil;
288 freeb(bp);
289 bp = nb;
290 }
291
292 startb = bp;
293 bp->rp += offset;
294
295 while((l = BLEN(bp)) < len) {
296 len -= l;
297 bp = bp->next;
298 }
299
300 bp->wp -= (BLEN(bp) - len);
301
302 if(bp->next) {
303 freeblist(bp->next);
304 bp->next = nil;
305 }
306
307 return startb;
308 }
309
310 /*
311 * copy 'count' bytes into a new block
312 */
313 Block*
314 copyblock(Block *bp, int count)
315 {
316 int l;
317 Block *nbp;
318
319 QDEBUG checkb(bp, "copyblock 0");
320 nbp = allocb(count);
321 for(; count > 0 && bp != 0; bp = bp->next){
322 l = BLEN(bp);
323 if(l > count)
324 l = count;
325 memmove(nbp->wp, bp->rp, l);
326 nbp->wp += l;
327 count -= l;
328 }
329 if(count > 0){
330 memset(nbp->wp, 0, count);
331 nbp->wp += count;
332 }
333 copyblockcnt++;
334 QDEBUG checkb(nbp, "copyblock 1");
335
336 return nbp;
337 }
338
339 Block*
340 adjustblock(Block* bp, int len)
341 {
342 int n;
343 Block *nbp;
344
345 if(len < 0){
346 freeb(bp);
347 return nil;
348 }
349
350 if(bp->rp+len > bp->lim){
351 nbp = copyblock(bp, len);
352 freeblist(bp);
353 QDEBUG checkb(nbp, "adjustblock 1");
354
355 return nbp;
356 }
357
358 n = BLEN(bp);
359 if(len > n)
360 memset(bp->wp, 0, len-n);
361 bp->wp = bp->rp+len;
362 QDEBUG checkb(bp, "adjustblock 2");
363
364 return bp;
365 }
366
367
368 /*
369 * throw away up to count bytes from a
370 * list of blocks. Return count of bytes
371 * thrown away.
372 */
373 int
374 pullblock(Block **bph, int count)
375 {
376 Block *bp;
377 int n, bytes;
378
379 bytes = 0;
380 if(bph == nil)
381 return 0;
382
383 while(*bph != nil && count != 0) {
384 bp = *bph;
385 n = BLEN(bp);
386 if(count < n)
387 n = count;
388 bytes += n;
389 count -= n;
390 bp->rp += n;
391 QDEBUG checkb(bp, "pullblock ");
392 if(BLEN(bp) == 0) {
393 *bph = bp->next;
394 bp->next = nil;
395 freeb(bp);
396 }
397 }
398 return bytes;
399 }
400
401 /*
402 * get next block from a queue, return null if nothing there
403 */
404 Block*
405 qget(Queue *q)
406 {
407 int dowakeup;
408 Block *b;
409
410 /* sync with qwrite */
411 ilock(&q->lk);
412
413 b = q->bfirst;
414 if(b == nil){
415 q->state |= Qstarve;
416 iunlock(&q->lk);
417 return nil;
418 }
419 q->bfirst = b->next;
420 b->next = 0;
421 q->len -= BALLOC(b);
422 q->dlen -= BLEN(b);
423 QDEBUG checkb(b, "qget");
424
425 /* if writer flow controlled, restart */
426 if((q->state & Qflow) && q->len < q->limit/2){
427 q->state &= ~Qflow;
428 dowakeup = 1;
429 } else
430 dowakeup = 0;
431
432 iunlock(&q->lk);
433
434 if(dowakeup)
435 wakeup(&q->wr);
436
437 return b;
438 }
439
440 /*
441 * throw away the next 'len' bytes in the queue
442 */
443 int
444 qdiscard(Queue *q, int len)
445 {
446 Block *b;
447 int dowakeup, n, sofar;
448
449 ilock(&q->lk);
450 for(sofar = 0; sofar < len; sofar += n){
451 b = q->bfirst;
452 if(b == nil)
453 break;
454 QDEBUG checkb(b, "qdiscard");
455 n = BLEN(b);
456 if(n <= len - sofar){
457 q->bfirst = b->next;
458 b->next = 0;
459 q->len -= BALLOC(b);
460 q->dlen -= BLEN(b);
461 freeb(b);
462 } else {
463 n = len - sofar;
464 b->rp += n;
465 q->dlen -= n;
466 }
467 }
468
469 /*
470 * if writer flow controlled, restart
471 *
472 * This used to be
473 * q->len < q->limit/2
474 * but it slows down tcp too much for certain write sizes.
475 * I really don't understand it completely. It may be
476 * due to the queue draining so fast that the transmission
477 * stalls waiting for the app to produce more data. - presotto
478 */
479 if((q->state & Qflow) && q->len < q->limit){
480 q->state &= ~Qflow;
481 dowakeup = 1;
482 } else
483 dowakeup = 0;
484
485 iunlock(&q->lk);
486
487 if(dowakeup)
488 wakeup(&q->wr);
489
490 return sofar;
491 }
492
493 /*
494 * Interrupt level copy out of a queue, return # bytes copied.
495 */
496 int
497 qconsume(Queue *q, void *vp, int len)
498 {
499 Block *b;
500 int n, dowakeup;
501 uchar *p = vp;
502 Block *tofree = nil;
503
504 /* sync with qwrite */
505 ilock(&q->lk);
506
507 for(;;) {
508 b = q->bfirst;
509 if(b == 0){
510 q->state |= Qstarve;
511 iunlock(&q->lk);
512 return -1;
513 }
514 QDEBUG checkb(b, "qconsume 1");
515
516 n = BLEN(b);
517 if(n > 0)
518 break;
519 q->bfirst = b->next;
520 q->len -= BALLOC(b);
521
522 /* remember to free this */
523 b->next = tofree;
524 tofree = b;
525 };
526
527 if(n < len)
528 len = n;
529 memmove(p, b->rp, len);
530 consumecnt += n;
531 b->rp += len;
532 q->dlen -= len;
533
534 /* discard the block if we're done with it */
535 if((q->state & Qmsg) || len == n){
536 q->bfirst = b->next;
537 b->next = 0;
538 q->len -= BALLOC(b);
539 q->dlen -= BLEN(b);
540
541 /* remember to free this */
542 b->next = tofree;
543 tofree = b;
544 }
545
546 /* if writer flow controlled, restart */
547 if((q->state & Qflow) && q->len < q->limit/2){
548 q->state &= ~Qflow;
549 dowakeup = 1;
550 } else
551 dowakeup = 0;
552
553 iunlock(&q->lk);
554
555 if(dowakeup)
556 wakeup(&q->wr);
557
558 if(tofree != nil)
559 freeblist(tofree);
560
561 return len;
562 }
563
564 int
565 qpass(Queue *q, Block *b)
566 {
567 int dlen, len, dowakeup;
568
569 /* sync with qread */
570 dowakeup = 0;
571 ilock(&q->lk);
572 if(q->len >= q->limit){
573 freeblist(b);
574 iunlock(&q->lk);
575 return -1;
576 }
577 if(q->state & Qclosed){
578 len = BALLOC(b);
579 freeblist(b);
580 iunlock(&q->lk);
581 return len;
582 }
583
584 /* add buffer to queue */
585 if(q->bfirst)
586 q->blast->next = b;
587 else
588 q->bfirst = b;
589 len = BALLOC(b);
590 dlen = BLEN(b);
591 QDEBUG checkb(b, "qpass");
592 while(b->next){
593 b = b->next;
594 QDEBUG checkb(b, "qpass");
595 len += BALLOC(b);
596 dlen += BLEN(b);
597 }
598 q->blast = b;
599 q->len += len;
600 q->dlen += dlen;
601
602 if(q->len >= q->limit/2)
603 q->state |= Qflow;
604
605 if(q->state & Qstarve){
606 q->state &= ~Qstarve;
607 dowakeup = 1;
608 }
609 iunlock(&q->lk);
610
611 if(dowakeup)
612 wakeup(&q->rr);
613
614 return len;
615 }
616
617 int
618 qpassnolim(Queue *q, Block *b)
619 {
620 int dlen, len, dowakeup;
621
622 /* sync with qread */
623 dowakeup = 0;
624 ilock(&q->lk);
625
626 if(q->state & Qclosed){
627 freeblist(b);
628 iunlock(&q->lk);
629 return BALLOC(b);
630 }
631
632 /* add buffer to queue */
633 if(q->bfirst)
634 q->blast->next = b;
635 else
636 q->bfirst = b;
637 len = BALLOC(b);
638 dlen = BLEN(b);
639 QDEBUG checkb(b, "qpass");
640 while(b->next){
641 b = b->next;
642 QDEBUG checkb(b, "qpass");
643 len += BALLOC(b);
644 dlen += BLEN(b);
645 }
646 q->blast = b;
647 q->len += len;
648 q->dlen += dlen;
649
650 if(q->len >= q->limit/2)
651 q->state |= Qflow;
652
653 if(q->state & Qstarve){
654 q->state &= ~Qstarve;
655 dowakeup = 1;
656 }
657 iunlock(&q->lk);
658
659 if(dowakeup)
660 wakeup(&q->rr);
661
662 return len;
663 }
664
665 /*
666 * if the allocated space is way out of line with the used
667 * space, reallocate to a smaller block
668 */
669 Block*
670 packblock(Block *bp)
671 {
672 Block **l, *nbp;
673 int n;
674
675 for(l = &bp; *l; l = &(*l)->next){
676 nbp = *l;
677 n = BLEN(nbp);
678 if((n<<2) < BALLOC(nbp)){
679 *l = allocb(n);
680 memmove((*l)->wp, nbp->rp, n);
681 (*l)->wp += n;
682 (*l)->next = nbp->next;
683 freeb(nbp);
684 }
685 }
686
687 return bp;
688 }
689
690 int
691 qproduce(Queue *q, void *vp, int len)
692 {
693 Block *b;
694 int dowakeup;
695 uchar *p = vp;
696
697 /* sync with qread */
698 dowakeup = 0;
699 ilock(&q->lk);
700
701 /* no waiting receivers, room in buffer? */
702 if(q->len >= q->limit){
703 q->state |= Qflow;
704 iunlock(&q->lk);
705 return -1;
706 }
707
708 /* save in buffer */
709 b = iallocb(len);
710 if(b == 0){
711 iunlock(&q->lk);
712 return 0;
713 }
714 memmove(b->wp, p, len);
715 producecnt += len;
716 b->wp += len;
717 if(q->bfirst)
718 q->blast->next = b;
719 else
720 q->bfirst = b;
721 q->blast = b;
722 /* b->next = 0; done by iallocb() */
723 q->len += BALLOC(b);
724 q->dlen += BLEN(b);
725 QDEBUG checkb(b, "qproduce");
726
727 if(q->state & Qstarve){
728 q->state &= ~Qstarve;
729 dowakeup = 1;
730 }
731
732 if(q->len >= q->limit)
733 q->state |= Qflow;
734 iunlock(&q->lk);
735
736 if(dowakeup)
737 wakeup(&q->rr);
738
739 return len;
740 }
741
742 /*
743 * copy from offset in the queue
744 */
745 Block*
746 qcopy(Queue *q, int len, ulong offset)
747 {
748 int sofar;
749 int n;
750 Block *b, *nb;
751 uchar *p;
752
753 nb = allocb(len);
754
755 ilock(&q->lk);
756
757 /* go to offset */
758 b = q->bfirst;
759 for(sofar = 0; ; sofar += n){
760 if(b == nil){
761 iunlock(&q->lk);
762 return nb;
763 }
764 n = BLEN(b);
765 if(sofar + n > offset){
766 p = b->rp + offset - sofar;
767 n -= offset - sofar;
768 break;
769 }
770 QDEBUG checkb(b, "qcopy");
771 b = b->next;
772 }
773
774 /* copy bytes from there */
775 for(sofar = 0; sofar < len;){
776 if(n > len - sofar)
777 n = len - sofar;
778 memmove(nb->wp, p, n);
779 qcopycnt += n;
780 sofar += n;
781 nb->wp += n;
782 b = b->next;
783 if(b == nil)
784 break;
785 n = BLEN(b);
786 p = b->rp;
787 }
788 iunlock(&q->lk);
789
790 return nb;
791 }
792
793 /*
794 * called by non-interrupt code
795 */
796 Queue*
797 qopen(int limit, int msg, void (*kick)(void*), void *arg)
798 {
799 Queue *q;
800
801 q = malloc(sizeof(Queue));
802 if(q == 0)
803 return 0;
804
805 q->limit = q->inilim = limit;
806 q->kick = kick;
807 q->arg = arg;
808 q->state = msg;
809
810 q->state |= Qstarve;
811 q->eof = 0;
812 q->noblock = 0;
813
814 return q;
815 }
816
817 /* open a queue to be bypassed */
818 Queue*
819 qbypass(void (*bypass)(void*, Block*), void *arg)
820 {
821 Queue *q;
822
823 q = malloc(sizeof(Queue));
824 if(q == 0)
825 return 0;
826
827 q->limit = 0;
828 q->arg = arg;
829 q->bypass = bypass;
830 q->state = 0;
831
832 return q;
833 }
834
835 static int
836 notempty(void *a)
837 {
838 Queue *q = a;
839
840 return (q->state & Qclosed) || q->bfirst != 0;
841 }
842
843 /*
844 * wait for the queue to be non-empty or closed.
845 * called with q ilocked.
846 */
847 static int
848 qwait(Queue *q)
849 {
850 /* wait for data */
851 for(;;){
852 if(q->bfirst != nil)
853 break;
854
855 if(q->state & Qclosed){
856 if(++q->eof > 3)
857 return -1;
858 if(*q->err && strcmp(q->err, Ehungup) != 0)
859 return -1;
860 return 0;
861 }
862
863 q->state |= Qstarve; /* flag requesting producer to wake me */
864 iunlock(&q->lk);
865 sleep(&q->rr, notempty, q);
866 ilock(&q->lk);
867 }
868 return 1;
869 }
870
871 /*
872 * add a block list to a queue
873 */
874 void
875 qaddlist(Queue *q, Block *b)
876 {
877 /* queue the block */
878 if(q->bfirst)
879 q->blast->next = b;
880 else
881 q->bfirst = b;
882 q->len += blockalloclen(b);
883 q->dlen += blocklen(b);
884 while(b->next)
885 b = b->next;
886 q->blast = b;
887 }
888
889 /*
890 * called with q ilocked
891 */
892 Block*
893 qremove(Queue *q)
894 {
895 Block *b;
896
897 b = q->bfirst;
898 if(b == nil)
899 return nil;
900 q->bfirst = b->next;
901 b->next = nil;
902 q->dlen -= BLEN(b);
903 q->len -= BALLOC(b);
904 QDEBUG checkb(b, "qremove");
905 return b;
906 }
907
908 /*
909 * copy the contents of a string of blocks into
910 * memory. emptied blocks are freed. return
911 * pointer to first unconsumed block.
912 */
913 Block*
914 bl2mem(uchar *p, Block *b, int n)
915 {
916 int i;
917 Block *next;
918
919 for(; b != nil; b = next){
920 i = BLEN(b);
921 if(i > n){
922 memmove(p, b->rp, n);
923 b->rp += n;
924 return b;
925 }
926 memmove(p, b->rp, i);
927 n -= i;
928 p += i;
929 b->rp += i;
930 next = b->next;
931 freeb(b);
932 }
933 return nil;
934 }
935
936 /*
937 * copy the contents of memory into a string of blocks.
938 * return nil on error.
939 */
940 Block*
941 mem2bl(uchar *p, int len)
942 {
943 int n;
944 Block *b, *first, **l;
945
946 first = nil;
947 l = &first;
948 if(waserror()){
949 freeblist(first);
950 nexterror();
951 }
952 do {
953 n = len;
954 if(n > Maxatomic)
955 n = Maxatomic;
956
957 *l = b = allocb(n);
958 memmove(b->wp, p, n);
959 b->wp += n;
960 p += n;
961 len -= n;
962 l = &b->next;
963 } while(len > 0);
964 poperror();
965
966 return first;
967 }
968
969 /*
970 * put a block back to the front of the queue
971 * called with q ilocked
972 */
973 void
974 qputback(Queue *q, Block *b)
975 {
976 b->next = q->bfirst;
977 if(q->bfirst == nil)
978 q->blast = b;
979 q->bfirst = b;
980 q->len += BALLOC(b);
981 q->dlen += BLEN(b);
982 }
983
984 /*
985 * flow control, get producer going again
986 * called with q ilocked
987 */
988 static void
989 qwakeup_iunlock(Queue *q)
990 {
991 int dowakeup = 0;
992
993 /* if writer flow controlled, restart */
994 if((q->state & Qflow) && q->len < q->limit/2){
995 q->state &= ~Qflow;
996 dowakeup = 1;
997 }
998
999 iunlock(&q->lk);
1000
1001 /* wakeup flow controlled writers */
1002 if(dowakeup){
1003 if(q->kick)
1004 q->kick(q->arg);
1005 wakeup(&q->wr);
1006 }
1007 }
1008
1009 /*
1010 * get next block from a queue (up to a limit)
1011 */
1012 Block*
1013 qbread(Queue *q, int len)
1014 {
1015 Block *b, *nb;
1016 int n;
1017
1018 qlock(&q->rlock);
1019 if(waserror()){
1020 qunlock(&q->rlock);
1021 nexterror();
1022 }
1023
1024 ilock(&q->lk);
1025 switch(qwait(q)){
1026 case 0:
1027 /* queue closed */
1028 iunlock(&q->lk);
1029 qunlock(&q->rlock);
1030 poperror();
1031 return nil;
1032 case -1:
1033 /* multiple reads on a closed queue */
1034 iunlock(&q->lk);
1035 error(q->err);
1036 }
1037
1038 /* if we get here, there's at least one block in the queue */
1039 b = qremove(q);
1040 n = BLEN(b);
1041
1042 /* split block if it's too big and this is not a message queue */
1043 nb = b;
1044 if(n > len){
1045 if((q->state&Qmsg) == 0){
1046 n -= len;
1047 b = allocb(n);
1048 memmove(b->wp, nb->rp+len, n);
1049 b->wp += n;
1050 qputback(q, b);
1051 }
1052 nb->wp = nb->rp + len;
1053 }
1054
1055 /* restart producer */
1056 qwakeup_iunlock(q);
1057
1058 poperror();
1059 qunlock(&q->rlock);
1060 return nb;
1061 }
1062
1063 /*
1064 * read a queue. if no data is queued, post a Block
1065 * and wait on its Rendez.
1066 */
1067 long
1068 qread(Queue *q, void *vp, int len)
1069 {
1070 Block *b, *first, **l;
1071 int m, n;
1072
1073 qlock(&q->rlock);
1074 if(waserror()){
1075 qunlock(&q->rlock);
1076 nexterror();
1077 }
1078
1079 ilock(&q->lk);
1080 again:
1081 switch(qwait(q)){
1082 case 0:
1083 /* queue closed */
1084 iunlock(&q->lk);
1085 qunlock(&q->rlock);
1086 poperror();
1087 return 0;
1088 case -1:
1089 /* multiple reads on a closed queue */
1090 iunlock(&q->lk);
1091 error(q->err);
1092 }
1093
1094 /* if we get here, there's at least one block in the queue */
1095 if(q->state & Qcoalesce){
1096 /* when coalescing, 0 length blocks just go away */
1097 b = q->bfirst;
1098 if(BLEN(b) <= 0){
1099 freeb(qremove(q));
1100 goto again;
1101 }
1102
1103 /* grab the first block plus as many
1104 * following blocks as will completely
1105 * fit in the read.
1106 */
1107 n = 0;
1108 l = &first;
1109 m = BLEN(b);
1110 for(;;) {
1111 *l = qremove(q);
1112 l = &b->next;
1113 n += m;
1114
1115 b = q->bfirst;
1116 if(b == nil)
1117 break;
1118 m = BLEN(b);
1119 if(n+m > len)
1120 break;
1121 }
1122 } else {
1123 first = qremove(q);
1124 n = BLEN(first);
1125 }
1126
1127 /* copy to user space outside of the ilock */
1128 iunlock(&q->lk);
1129 b = bl2mem(vp, first, len);
1130 ilock(&q->lk);
1131
1132 /* take care of any left over partial block */
1133 if(b != nil){
1134 n -= BLEN(b);
1135 if(q->state & Qmsg)
1136 freeb(b);
1137 else
1138 qputback(q, b);
1139 }
1140
1141 /* restart producer */
1142 qwakeup_iunlock(q);
1143
1144 poperror();
1145 qunlock(&q->rlock);
1146 return n;
1147 }
1148
1149 static int
1150 qnotfull(void *a)
1151 {
1152 Queue *q = a;
1153
1154 return q->len < q->limit || (q->state & Qclosed);
1155 }
1156
1157 ulong noblockcnt;
1158
1159 /*
1160 * add a block to a queue obeying flow control
1161 */
1162 long
1163 qbwrite(Queue *q, Block *b)
1164 {
1165 int n, dowakeup;
1166 Proc *p;
1167
1168 n = BLEN(b);
1169
1170 if(q->bypass){
1171 (*q->bypass)(q->arg, b);
1172 return n;
1173 }
1174
1175 dowakeup = 0;
1176 qlock(&q->wlock);
1177 if(waserror()){
1178 if(b != nil)
1179 freeb(b);
1180 qunlock(&q->wlock);
1181 nexterror();
1182 }
1183
1184 ilock(&q->lk);
1185
1186 /* give up if the queue is closed */
1187 if(q->state & Qclosed){
1188 iunlock(&q->lk);
1189 error(q->err);
1190 }
1191
1192 /* if nonblocking, don't queue over the limit */
1193 if(q->len >= q->limit){
1194 if(q->noblock){
1195 iunlock(&q->lk);
1196 freeb(b);
1197 noblockcnt += n;
1198 qunlock(&q->wlock);
1199 poperror();
1200 return n;
1201 }
1202 }
1203
1204 /* queue the block */
1205 if(q->bfirst)
1206 q->blast->next = b;
1207 else
1208 q->bfirst = b;
1209 q->blast = b;
1210 b->next = 0;
1211 q->len += BALLOC(b);
1212 q->dlen += n;
1213 QDEBUG checkb(b, "qbwrite");
1214 b = nil;
1215
1216 /* make sure other end gets awakened */
1217 if(q->state & Qstarve){
1218 q->state &= ~Qstarve;
1219 dowakeup = 1;
1220 }
1221 iunlock(&q->lk);
1222
1223 /* get output going again */
1224 if(q->kick && (dowakeup || (q->state&Qkick)))
1225 q->kick(q->arg);
1226
1227 /* wakeup anyone consuming at the other end */
1228 if(dowakeup){
1229 p = wakeup(&q->rr);
1230
1231 /* if we just wokeup a higher priority process, let it run */
1232 if(p != nil && p->priority > up->priority)
1233 sched();
1234 }
1235
1236 /*
1237 * flow control, wait for queue to get below the limit
1238 * before allowing the process to continue and queue
1239 * more. We do this here so that postnote can only
1240 * interrupt us after the data has been queued. This
1241 * means that things like 9p flushes and ssl messages
1242 * will not be disrupted by software interrupts.
1243 *
1244 * Note - this is moderately dangerous since a process
1245 * that keeps getting interrupted and rewriting will
1246 * queue infinite crud.
1247 */
1248 for(;;){
1249 if(q->noblock || qnotfull(q))
1250 break;
1251
1252 ilock(&q->lk);
1253 q->state |= Qflow;
1254 iunlock(&q->lk);
1255 sleep(&q->wr, qnotfull, q);
1256 }
1257 USED(b);
1258
1259 qunlock(&q->wlock);
1260 poperror();
1261 return n;
1262 }
1263
1264 /*
1265 * write to a queue. only Maxatomic bytes at a time is atomic.
1266 */
1267 int
1268 qwrite(Queue *q, void *vp, int len)
1269 {
1270 int n, sofar;
1271 Block *b;
1272 uchar *p = vp;
1273
1274 QDEBUG if(!islo())
1275 print("qwrite hi %#p\n", getcallerpc(&q));
1276
1277 sofar = 0;
1278 do {
1279 n = len-sofar;
1280 if(n > Maxatomic)
1281 n = Maxatomic;
1282
1283 b = allocb(n);
1284 if(waserror()){
1285 freeb(b);
1286 nexterror();
1287 }
1288 memmove(b->wp, p+sofar, n);
1289 poperror();
1290 b->wp += n;
1291
1292 qbwrite(q, b);
1293
1294 sofar += n;
1295 } while(sofar < len && (q->state & Qmsg) == 0);
1296
1297 return len;
1298 }
1299
1300 /*
1301 * used by print() to write to a queue. Since we may be splhi or not in
1302 * a process, don't qlock.
1303 *
1304 * this routine merges adjacent blocks if block n+1 will fit into
1305 * the free space of block n.
1306 */
1307 int
1308 qiwrite(Queue *q, void *vp, int len)
1309 {
1310 int n, sofar, dowakeup;
1311 Block *b;
1312 uchar *p = vp;
1313
1314 dowakeup = 0;
1315
1316 sofar = 0;
1317 do {
1318 n = len-sofar;
1319 if(n > Maxatomic)
1320 n = Maxatomic;
1321
1322 b = iallocb(n);
1323 if(b == nil)
1324 break;
1325 memmove(b->wp, p+sofar, n);
1326 b->wp += n;
1327
1328 ilock(&q->lk);
1329
1330 /* we use an artificially high limit for kernel prints since anything
1331 * over the limit gets dropped
1332 */
1333 if(q->dlen >= 16*1024){
1334 iunlock(&q->lk);
1335 freeb(b);
1336 break;
1337 }
1338
1339 QDEBUG checkb(b, "qiwrite");
1340 if(q->bfirst)
1341 q->blast->next = b;
1342 else
1343 q->bfirst = b;
1344 q->blast = b;
1345 q->len += BALLOC(b);
1346 q->dlen += n;
1347
1348 if(q->state & Qstarve){
1349 q->state &= ~Qstarve;
1350 dowakeup = 1;
1351 }
1352
1353 iunlock(&q->lk);
1354
1355 if(dowakeup){
1356 if(q->kick)
1357 q->kick(q->arg);
1358 wakeup(&q->rr);
1359 }
1360
1361 sofar += n;
1362 } while(sofar < len && (q->state & Qmsg) == 0);
1363
1364 return sofar;
1365 }
1366
1367 /*
1368 * be extremely careful when calling this,
1369 * as there is no reference accounting
1370 */
1371 void
1372 qfree(Queue *q)
1373 {
1374 qclose(q);
1375 free(q);
1376 }
1377
1378 /*
1379 * Mark a queue as closed. No further IO is permitted.
1380 * All blocks are released.
1381 */
1382 void
1383 qclose(Queue *q)
1384 {
1385 Block *bfirst;
1386
1387 if(q == nil)
1388 return;
1389
1390 /* mark it */
1391 ilock(&q->lk);
1392 q->state |= Qclosed;
1393 q->state &= ~(Qflow|Qstarve);
1394 strcpy(q->err, Ehungup);
1395 bfirst = q->bfirst;
1396 q->bfirst = 0;
1397 q->len = 0;
1398 q->dlen = 0;
1399 q->noblock = 0;
1400 iunlock(&q->lk);
1401
1402 /* free queued blocks */
1403 freeblist(bfirst);
1404
1405 /* wake up readers/writers */
1406 wakeup(&q->rr);
1407 wakeup(&q->wr);
1408 }
1409
1410 /*
1411 * Mark a queue as closed. Wakeup any readers. Don't remove queued
1412 * blocks.
1413 */
1414 void
1415 qhangup(Queue *q, char *msg)
1416 {
1417 /* mark it */
1418 ilock(&q->lk);
1419 q->state |= Qclosed;
1420 if(msg == 0 || *msg == 0)
1421 strcpy(q->err, Ehungup);
1422 else
1423 strncpy(q->err, msg, ERRMAX-1);
1424 iunlock(&q->lk);
1425
1426 /* wake up readers/writers */
1427 wakeup(&q->rr);
1428 wakeup(&q->wr);
1429 }
1430
1431 /*
1432 * return non-zero if the q is hungup
1433 */
1434 int
1435 qisclosed(Queue *q)
1436 {
1437 return q->state & Qclosed;
1438 }
1439
1440 /*
1441 * mark a queue as no longer hung up
1442 */
1443 void
1444 qreopen(Queue *q)
1445 {
1446 ilock(&q->lk);
1447 q->state &= ~Qclosed;
1448 q->state |= Qstarve;
1449 q->eof = 0;
1450 q->limit = q->inilim;
1451 iunlock(&q->lk);
1452 }
1453
1454 /*
1455 * return bytes queued
1456 */
1457 int
1458 qlen(Queue *q)
1459 {
1460 return q->dlen;
1461 }
1462
1463 /*
1464 * return space remaining before flow control
1465 */
1466 int
1467 qwindow(Queue *q)
1468 {
1469 int l;
1470
1471 l = q->limit - q->len;
1472 if(l < 0)
1473 l = 0;
1474 return l;
1475 }
1476
1477 /*
1478 * return true if we can read without blocking
1479 */
1480 int
1481 qcanread(Queue *q)
1482 {
1483 return q->bfirst!=0;
1484 }
1485
1486 /*
1487 * change queue limit
1488 */
1489 void
1490 qsetlimit(Queue *q, int limit)
1491 {
1492 q->limit = limit;
1493 }
1494
1495 /*
1496 * set blocking/nonblocking
1497 */
1498 void
1499 qnoblock(Queue *q, int onoff)
1500 {
1501 q->noblock = onoff;
1502 }
1503
1504 /*
1505 * flush the output queue
1506 */
1507 void
1508 qflush(Queue *q)
1509 {
1510 Block *bfirst;
1511
1512 /* mark it */
1513 ilock(&q->lk);
1514 bfirst = q->bfirst;
1515 q->bfirst = 0;
1516 q->len = 0;
1517 q->dlen = 0;
1518 iunlock(&q->lk);
1519
1520 /* free queued blocks */
1521 freeblist(bfirst);
1522
1523 /* wake up readers/writers */
1524 wakeup(&q->wr);
1525 }
1526
1527 int
1528 qfull(Queue *q)
1529 {
1530 return q->state & Qflow;
1531 }
1532
1533 int
1534 qstate(Queue *q)
1535 {
1536 return q->state;
1537 }