qio.c - vx32 - Local 9vx git repository for patches.
 (HTM) git clone git://r-36.net/vx32
 (DIR) Log
 (DIR) Files
 (DIR) Refs
       ---
       qio.c (23670B)
       ---
            1 #include        "u.h"
            2 #include        "lib.h"
            3 #include        "mem.h"
            4 #include        "dat.h"
            5 #include        "fns.h"
            6 #include        "error.h"
            7 
            8 static ulong padblockcnt;
            9 static ulong concatblockcnt;
           10 static ulong pullupblockcnt;
           11 static ulong copyblockcnt;
           12 static ulong consumecnt;
           13 static ulong producecnt;
           14 static ulong qcopycnt;
           15 
           16 static int debugging;
           17 
           18 #define QDEBUG        if(0)
           19 
           20 /*
           21  *  IO queues
           22  */
           23 
           24 struct Queue
           25 {
           26         Lock lk;
           27 
           28         Block*        bfirst;                /* buffer */
           29         Block*        blast;
           30 
           31         int        len;                /* bytes allocated to queue */
           32         int        dlen;                /* data bytes in queue */
           33         int        limit;                /* max bytes in queue */
           34         int        inilim;                /* initial limit */
           35         int        state;
           36         int        noblock;        /* true if writes return immediately when q full */
           37         int        eof;                /* number of eofs read by user */
           38 
           39         void        (*kick)(void*);        /* restart output */
           40         void        (*bypass)(void*, Block*);        /* bypass queue altogether */
           41         void*        arg;                /* argument to kick */
           42 
           43         QLock        rlock;                /* mutex for reading processes */
           44         Rendez        rr;                /* process waiting to read */
           45         QLock        wlock;                /* mutex for writing processes */
           46         Rendez        wr;                /* process waiting to write */
           47 
           48         char        err[ERRMAX];
           49 };
           50 
           51 enum
           52 {
           53         Maxatomic        = 64*1024,
           54 };
           55 
           56 uint        qiomaxatomic = Maxatomic;
           57 
           58 void
           59 ixsummary(void)
           60 {
           61         debugging ^= 1;
           62         iallocsummary();
           63         print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
           64                 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
           65         print("consume %lud, produce %lud, qcopy %lud\n",
           66                 consumecnt, producecnt, qcopycnt);
           67 }
           68 
           69 /*
           70  *  free a list of blocks
           71  */
           72 void
           73 freeblist(Block *b)
           74 {
           75         Block *next;
           76 
           77         for(; b != 0; b = next){
           78                 next = b->next;
           79                 if(b->ref == 1)
           80                         b->next = nil;
           81                 freeb(b);
           82         }
           83 }
           84 
           85 /*
           86  *  pad a block to the front (or the back if size is negative)
           87  */
           88 Block*
           89 padblock(Block *bp, int size)
           90 {
           91         int n;
           92         Block *nbp;
           93 
           94         QDEBUG checkb(bp, "padblock 1");
           95         if(size >= 0){
           96                 if(bp->rp - bp->base >= size){
           97                         bp->rp -= size;
           98                         return bp;
           99                 }
          100 
          101                 if(bp->next)
          102                         panic("padblock %#p", getcallerpc(&bp));
          103                 n = BLEN(bp);
          104                 padblockcnt++;
          105                 nbp = allocb(size+n);
          106                 nbp->rp += size;
          107                 nbp->wp = nbp->rp;
          108                 memmove(nbp->wp, bp->rp, n);
          109                 nbp->wp += n;
          110                 freeb(bp);
          111                 nbp->rp -= size;
          112         } else {
          113                 size = -size;
          114 
          115                 if(bp->next)
          116                         panic("padblock %#p", getcallerpc(&bp));
          117 
          118                 if(bp->lim - bp->wp >= size)
          119                         return bp;
          120 
          121                 n = BLEN(bp);
          122                 padblockcnt++;
          123                 nbp = allocb(size+n);
          124                 memmove(nbp->wp, bp->rp, n);
          125                 nbp->wp += n;
          126                 freeb(bp);
          127         }
          128         QDEBUG checkb(nbp, "padblock 1");
          129         return nbp;
          130 }
          131 
          132 /*
          133  *  return count of bytes in a string of blocks
          134  */
          135 int
          136 blocklen(Block *bp)
          137 {
          138         int len;
          139 
          140         len = 0;
          141         while(bp) {
          142                 len += BLEN(bp);
          143                 bp = bp->next;
          144         }
          145         return len;
          146 }
          147 
          148 /*
          149  * return count of space in blocks
          150  */
          151 int
          152 blockalloclen(Block *bp)
          153 {
          154         int len;
          155 
          156         len = 0;
          157         while(bp) {
          158                 len += BALLOC(bp);
          159                 bp = bp->next;
          160         }
          161         return len;
          162 }
          163 
          164 /*
          165  *  copy the  string of blocks into
          166  *  a single block and free the string
          167  */
          168 Block*
          169 concatblock(Block *bp)
          170 {
          171         int len;
          172         Block *nb, *f;
          173 
          174         if(bp->next == 0)
          175                 return bp;
          176 
          177         nb = allocb(blocklen(bp));
          178         for(f = bp; f; f = f->next) {
          179                 len = BLEN(f);
          180                 memmove(nb->wp, f->rp, len);
          181                 nb->wp += len;
          182         }
          183         concatblockcnt += BLEN(nb);
          184         freeblist(bp);
          185         QDEBUG checkb(nb, "concatblock 1");
          186         return nb;
          187 }
          188 
          189 /*
          190  *  make sure the first block has at least n bytes
          191  */
          192 Block*
          193 pullupblock(Block *bp, int n)
          194 {
          195         int i;
          196         Block *nbp;
          197 
          198         /*
          199          *  this should almost always be true, it's
          200          *  just to avoid every caller checking.
          201          */
          202         if(BLEN(bp) >= n)
          203                 return bp;
          204 
          205         /*
          206          *  if not enough room in the first block,
          207          *  add another to the front of the list.
          208          */
          209         if(bp->lim - bp->rp < n){
          210                 nbp = allocb(n);
          211                 nbp->next = bp;
          212                 bp = nbp;
          213         }
          214 
          215         /*
          216          *  copy bytes from the trailing blocks into the first
          217          */
          218         n -= BLEN(bp);
          219         while((nbp = bp->next)){
          220                 i = BLEN(nbp);
          221                 if(i > n) {
          222                         memmove(bp->wp, nbp->rp, n);
          223                         pullupblockcnt++;
          224                         bp->wp += n;
          225                         nbp->rp += n;
          226                         QDEBUG checkb(bp, "pullupblock 1");
          227                         return bp;
          228                 } else {
          229                         /* shouldn't happen but why crash if it does */
          230                         if(i < 0){
          231                                 print("pullup negative length packet, called from %#p\n",
          232                                         getcallerpc(&bp));
          233                                 i = 0;
          234                         }
          235                         memmove(bp->wp, nbp->rp, i);
          236                         pullupblockcnt++;
          237                         bp->wp += i;
          238                         bp->next = nbp->next;
          239                         nbp->next = 0;
          240                         freeb(nbp);
          241                         n -= i;
          242                         if(n == 0){
          243                                 QDEBUG checkb(bp, "pullupblock 2");
          244                                 return bp;
          245                         }
          246                 }
          247         }
          248         freeb(bp);
          249         return 0;
          250 }
          251 
          252 /*
          253  *  make sure the first block has at least n bytes
          254  */
          255 Block*
          256 pullupqueue(Queue *q, int n)
          257 {
          258         Block *b;
          259 
          260         if(BLEN(q->bfirst) >= n)
          261                 return q->bfirst;
          262         q->bfirst = pullupblock(q->bfirst, n);
          263         for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
          264                 ;
          265         q->blast = b;
          266         return q->bfirst;
          267 }
          268 
          269 /*
          270  *  trim to len bytes starting at offset
          271  */
          272 Block *
          273 trimblock(Block *bp, int offset, int len)
          274 {
          275         ulong l;
          276         Block *nb, *startb;
          277 
          278         QDEBUG checkb(bp, "trimblock 1");
          279         if(blocklen(bp) < offset+len) {
          280                 freeblist(bp);
          281                 return nil;
          282         }
          283 
          284         while((l = BLEN(bp)) < offset) {
          285                 offset -= l;
          286                 nb = bp->next;
          287                 bp->next = nil;
          288                 freeb(bp);
          289                 bp = nb;
          290         }
          291 
          292         startb = bp;
          293         bp->rp += offset;
          294 
          295         while((l = BLEN(bp)) < len) {
          296                 len -= l;
          297                 bp = bp->next;
          298         }
          299 
          300         bp->wp -= (BLEN(bp) - len);
          301 
          302         if(bp->next) {
          303                 freeblist(bp->next);
          304                 bp->next = nil;
          305         }
          306 
          307         return startb;
          308 }
          309 
          310 /*
          311  *  copy 'count' bytes into a new block
          312  */
          313 Block*
          314 copyblock(Block *bp, int count)
          315 {
          316         int l;
          317         Block *nbp;
          318 
          319         QDEBUG checkb(bp, "copyblock 0");
          320         nbp = allocb(count);
          321         for(; count > 0 && bp != 0; bp = bp->next){
          322                 l = BLEN(bp);
          323                 if(l > count)
          324                         l = count;
          325                 memmove(nbp->wp, bp->rp, l);
          326                 nbp->wp += l;
          327                 count -= l;
          328         }
          329         if(count > 0){
          330                 memset(nbp->wp, 0, count);
          331                 nbp->wp += count;
          332         }
          333         copyblockcnt++;
          334         QDEBUG checkb(nbp, "copyblock 1");
          335 
          336         return nbp;
          337 }
          338 
          339 Block*
          340 adjustblock(Block* bp, int len)
          341 {
          342         int n;
          343         Block *nbp;
          344 
          345         if(len < 0){
          346                 freeb(bp);
          347                 return nil;
          348         }
          349 
          350         if(bp->rp+len > bp->lim){
          351                 nbp = copyblock(bp, len);
          352                 freeblist(bp);
          353                 QDEBUG checkb(nbp, "adjustblock 1");
          354 
          355                 return nbp;
          356         }
          357 
          358         n = BLEN(bp);
          359         if(len > n)
          360                 memset(bp->wp, 0, len-n);
          361         bp->wp = bp->rp+len;
          362         QDEBUG checkb(bp, "adjustblock 2");
          363 
          364         return bp;
          365 }
          366 
          367 
          368 /*
          369  *  throw away up to count bytes from a
          370  *  list of blocks.  Return count of bytes
          371  *  thrown away.
          372  */
          373 int
          374 pullblock(Block **bph, int count)
          375 {
          376         Block *bp;
          377         int n, bytes;
          378 
          379         bytes = 0;
          380         if(bph == nil)
          381                 return 0;
          382 
          383         while(*bph != nil && count != 0) {
          384                 bp = *bph;
          385                 n = BLEN(bp);
          386                 if(count < n)
          387                         n = count;
          388                 bytes += n;
          389                 count -= n;
          390                 bp->rp += n;
          391                 QDEBUG checkb(bp, "pullblock ");
          392                 if(BLEN(bp) == 0) {
          393                         *bph = bp->next;
          394                         bp->next = nil;
          395                         freeb(bp);
          396                 }
          397         }
          398         return bytes;
          399 }
          400 
          401 /*
          402  *  get next block from a queue, return null if nothing there
          403  */
          404 Block*
          405 qget(Queue *q)
          406 {
          407         int dowakeup;
          408         Block *b;
          409 
          410         /* sync with qwrite */
          411         ilock(&q->lk);
          412 
          413         b = q->bfirst;
          414         if(b == nil){
          415                 q->state |= Qstarve;
          416                 iunlock(&q->lk);
          417                 return nil;
          418         }
          419         q->bfirst = b->next;
          420         b->next = 0;
          421         q->len -= BALLOC(b);
          422         q->dlen -= BLEN(b);
          423         QDEBUG checkb(b, "qget");
          424 
          425         /* if writer flow controlled, restart */
          426         if((q->state & Qflow) && q->len < q->limit/2){
          427                 q->state &= ~Qflow;
          428                 dowakeup = 1;
          429         } else
          430                 dowakeup = 0;
          431 
          432         iunlock(&q->lk);
          433 
          434         if(dowakeup)
          435                 wakeup(&q->wr);
          436 
          437         return b;
          438 }
          439 
          440 /*
          441  *  throw away the next 'len' bytes in the queue
          442  */
          443 int
          444 qdiscard(Queue *q, int len)
          445 {
          446         Block *b;
          447         int dowakeup, n, sofar;
          448 
          449         ilock(&q->lk);
          450         for(sofar = 0; sofar < len; sofar += n){
          451                 b = q->bfirst;
          452                 if(b == nil)
          453                         break;
          454                 QDEBUG checkb(b, "qdiscard");
          455                 n = BLEN(b);
          456                 if(n <= len - sofar){
          457                         q->bfirst = b->next;
          458                         b->next = 0;
          459                         q->len -= BALLOC(b);
          460                         q->dlen -= BLEN(b);
          461                         freeb(b);
          462                 } else {
          463                         n = len - sofar;
          464                         b->rp += n;
          465                         q->dlen -= n;
          466                 }
          467         }
          468 
          469         /*
          470          *  if writer flow controlled, restart
          471          *
          472          *  This used to be
          473          *        q->len < q->limit/2
          474          *  but it slows down tcp too much for certain write sizes.
          475          *  I really don't understand it completely.  It may be
          476          *  due to the queue draining so fast that the transmission
          477          *  stalls waiting for the app to produce more data.  - presotto
          478          */
          479         if((q->state & Qflow) && q->len < q->limit){
          480                 q->state &= ~Qflow;
          481                 dowakeup = 1;
          482         } else
          483                 dowakeup = 0;
          484 
          485         iunlock(&q->lk);
          486 
          487         if(dowakeup)
          488                 wakeup(&q->wr);
          489 
          490         return sofar;
          491 }
          492 
          493 /*
          494  *  Interrupt level copy out of a queue, return # bytes copied.
          495  */
          496 int
          497 qconsume(Queue *q, void *vp, int len)
          498 {
          499         Block *b;
          500         int n, dowakeup;
          501         uchar *p = vp;
          502         Block *tofree = nil;
          503 
          504         /* sync with qwrite */
          505         ilock(&q->lk);
          506 
          507         for(;;) {
          508                 b = q->bfirst;
          509                 if(b == 0){
          510                         q->state |= Qstarve;
          511                         iunlock(&q->lk);
          512                         return -1;
          513                 }
          514                 QDEBUG checkb(b, "qconsume 1");
          515 
          516                 n = BLEN(b);
          517                 if(n > 0)
          518                         break;
          519                 q->bfirst = b->next;
          520                 q->len -= BALLOC(b);
          521 
          522                 /* remember to free this */
          523                 b->next = tofree;
          524                 tofree = b;
          525         };
          526 
          527         if(n < len)
          528                 len = n;
          529         memmove(p, b->rp, len);
          530         consumecnt += n;
          531         b->rp += len;
          532         q->dlen -= len;
          533 
          534         /* discard the block if we're done with it */
          535         if((q->state & Qmsg) || len == n){
          536                 q->bfirst = b->next;
          537                 b->next = 0;
          538                 q->len -= BALLOC(b);
          539                 q->dlen -= BLEN(b);
          540 
          541                 /* remember to free this */
          542                 b->next = tofree;
          543                 tofree = b;
          544         }
          545 
          546         /* if writer flow controlled, restart */
          547         if((q->state & Qflow) && q->len < q->limit/2){
          548                 q->state &= ~Qflow;
          549                 dowakeup = 1;
          550         } else
          551                 dowakeup = 0;
          552 
          553         iunlock(&q->lk);
          554 
          555         if(dowakeup)
          556                 wakeup(&q->wr);
          557 
          558         if(tofree != nil)
          559                 freeblist(tofree);
          560 
          561         return len;
          562 }
          563 
          564 int
          565 qpass(Queue *q, Block *b)
          566 {
          567         int dlen, len, dowakeup;
          568 
          569         /* sync with qread */
          570         dowakeup = 0;
          571         ilock(&q->lk);
          572         if(q->len >= q->limit){
          573                 freeblist(b);
          574                 iunlock(&q->lk);
          575                 return -1;
          576         }
          577         if(q->state & Qclosed){
          578                 len = BALLOC(b);
          579                 freeblist(b);
          580                 iunlock(&q->lk);
          581                 return len;
          582         }
          583 
          584         /* add buffer to queue */
          585         if(q->bfirst)
          586                 q->blast->next = b;
          587         else
          588                 q->bfirst = b;
          589         len = BALLOC(b);
          590         dlen = BLEN(b);
          591         QDEBUG checkb(b, "qpass");
          592         while(b->next){
          593                 b = b->next;
          594                 QDEBUG checkb(b, "qpass");
          595                 len += BALLOC(b);
          596                 dlen += BLEN(b);
          597         }
          598         q->blast = b;
          599         q->len += len;
          600         q->dlen += dlen;
          601 
          602         if(q->len >= q->limit/2)
          603                 q->state |= Qflow;
          604 
          605         if(q->state & Qstarve){
          606                 q->state &= ~Qstarve;
          607                 dowakeup = 1;
          608         }
          609         iunlock(&q->lk);
          610 
          611         if(dowakeup)
          612                 wakeup(&q->rr);
          613 
          614         return len;
          615 }
          616 
          617 int
          618 qpassnolim(Queue *q, Block *b)
          619 {
          620         int dlen, len, dowakeup;
          621 
          622         /* sync with qread */
          623         dowakeup = 0;
          624         ilock(&q->lk);
          625 
          626         if(q->state & Qclosed){
          627                 freeblist(b);
          628                 iunlock(&q->lk);
          629                 return BALLOC(b);
          630         }
          631 
          632         /* add buffer to queue */
          633         if(q->bfirst)
          634                 q->blast->next = b;
          635         else
          636                 q->bfirst = b;
          637         len = BALLOC(b);
          638         dlen = BLEN(b);
          639         QDEBUG checkb(b, "qpass");
          640         while(b->next){
          641                 b = b->next;
          642                 QDEBUG checkb(b, "qpass");
          643                 len += BALLOC(b);
          644                 dlen += BLEN(b);
          645         }
          646         q->blast = b;
          647         q->len += len;
          648         q->dlen += dlen;
          649 
          650         if(q->len >= q->limit/2)
          651                 q->state |= Qflow;
          652 
          653         if(q->state & Qstarve){
          654                 q->state &= ~Qstarve;
          655                 dowakeup = 1;
          656         }
          657         iunlock(&q->lk);
          658 
          659         if(dowakeup)
          660                 wakeup(&q->rr);
          661 
          662         return len;
          663 }
          664 
          665 /*
          666  *  if the allocated space is way out of line with the used
          667  *  space, reallocate to a smaller block
          668  */
          669 Block*
          670 packblock(Block *bp)
          671 {
          672         Block **l, *nbp;
          673         int n;
          674 
          675         for(l = &bp; *l; l = &(*l)->next){
          676                 nbp = *l;
          677                 n = BLEN(nbp);
          678                 if((n<<2) < BALLOC(nbp)){
          679                         *l = allocb(n);
          680                         memmove((*l)->wp, nbp->rp, n);
          681                         (*l)->wp += n;
          682                         (*l)->next = nbp->next;
          683                         freeb(nbp);
          684                 }
          685         }
          686 
          687         return bp;
          688 }
          689 
          690 int
          691 qproduce(Queue *q, void *vp, int len)
          692 {
          693         Block *b;
          694         int dowakeup;
          695         uchar *p = vp;
          696 
          697         /* sync with qread */
          698         dowakeup = 0;
          699         ilock(&q->lk);
          700 
          701         /* no waiting receivers, room in buffer? */
          702         if(q->len >= q->limit){
          703                 q->state |= Qflow;
          704                 iunlock(&q->lk);
          705                 return -1;
          706         }
          707 
          708         /* save in buffer */
          709         b = iallocb(len);
          710         if(b == 0){
          711                 iunlock(&q->lk);
          712                 return 0;
          713         }
          714         memmove(b->wp, p, len);
          715         producecnt += len;
          716         b->wp += len;
          717         if(q->bfirst)
          718                 q->blast->next = b;
          719         else
          720                 q->bfirst = b;
          721         q->blast = b;
          722         /* b->next = 0; done by iallocb() */
          723         q->len += BALLOC(b);
          724         q->dlen += BLEN(b);
          725         QDEBUG checkb(b, "qproduce");
          726 
          727         if(q->state & Qstarve){
          728                 q->state &= ~Qstarve;
          729                 dowakeup = 1;
          730         }
          731 
          732         if(q->len >= q->limit)
          733                 q->state |= Qflow;
          734         iunlock(&q->lk);
          735 
          736         if(dowakeup)
          737                 wakeup(&q->rr);
          738 
          739         return len;
          740 }
          741 
          742 /*
          743  *  copy from offset in the queue
          744  */
          745 Block*
          746 qcopy(Queue *q, int len, ulong offset)
          747 {
          748         int sofar;
          749         int n;
          750         Block *b, *nb;
          751         uchar *p;
          752 
          753         nb = allocb(len);
          754 
          755         ilock(&q->lk);
          756 
          757         /* go to offset */
          758         b = q->bfirst;
          759         for(sofar = 0; ; sofar += n){
          760                 if(b == nil){
          761                         iunlock(&q->lk);
          762                         return nb;
          763                 }
          764                 n = BLEN(b);
          765                 if(sofar + n > offset){
          766                         p = b->rp + offset - sofar;
          767                         n -= offset - sofar;
          768                         break;
          769                 }
          770                 QDEBUG checkb(b, "qcopy");
          771                 b = b->next;
          772         }
          773 
          774         /* copy bytes from there */
          775         for(sofar = 0; sofar < len;){
          776                 if(n > len - sofar)
          777                         n = len - sofar;
          778                 memmove(nb->wp, p, n);
          779                 qcopycnt += n;
          780                 sofar += n;
          781                 nb->wp += n;
          782                 b = b->next;
          783                 if(b == nil)
          784                         break;
          785                 n = BLEN(b);
          786                 p = b->rp;
          787         }
          788         iunlock(&q->lk);
          789 
          790         return nb;
          791 }
          792 
          793 /*
          794  *  called by non-interrupt code
          795  */
          796 Queue*
          797 qopen(int limit, int msg, void (*kick)(void*), void *arg)
          798 {
          799         Queue *q;
          800 
          801         q = malloc(sizeof(Queue));
          802         if(q == 0)
          803                 return 0;
          804 
          805         q->limit = q->inilim = limit;
          806         q->kick = kick;
          807         q->arg = arg;
          808         q->state = msg;
          809         
          810         q->state |= Qstarve;
          811         q->eof = 0;
          812         q->noblock = 0;
          813 
          814         return q;
          815 }
          816 
          817 /* open a queue to be bypassed */
          818 Queue*
          819 qbypass(void (*bypass)(void*, Block*), void *arg)
          820 {
          821         Queue *q;
          822 
          823         q = malloc(sizeof(Queue));
          824         if(q == 0)
          825                 return 0;
          826 
          827         q->limit = 0;
          828         q->arg = arg;
          829         q->bypass = bypass;
          830         q->state = 0;
          831 
          832         return q;
          833 }
          834 
          835 static int
          836 notempty(void *a)
          837 {
          838         Queue *q = a;
          839 
          840         return (q->state & Qclosed) || q->bfirst != 0;
          841 }
          842 
          843 /*
          844  *  wait for the queue to be non-empty or closed.
          845  *  called with q ilocked.
          846  */
          847 static int
          848 qwait(Queue *q)
          849 {
          850         /* wait for data */
          851         for(;;){
          852                 if(q->bfirst != nil)
          853                         break;
          854 
          855                 if(q->state & Qclosed){
          856                         if(++q->eof > 3)
          857                                 return -1;
          858                         if(*q->err && strcmp(q->err, Ehungup) != 0)
          859                                 return -1;
          860                         return 0;
          861                 }
          862 
          863                 q->state |= Qstarve;        /* flag requesting producer to wake me */
          864                 iunlock(&q->lk);
          865                 sleep(&q->rr, notempty, q);
          866                 ilock(&q->lk);
          867         }
          868         return 1;
          869 }
          870 
          871 /*
          872  * add a block list to a queue
          873  */
          874 void
          875 qaddlist(Queue *q, Block *b)
          876 {
          877         /* queue the block */
          878         if(q->bfirst)
          879                 q->blast->next = b;
          880         else
          881                 q->bfirst = b;
          882         q->len += blockalloclen(b);
          883         q->dlen += blocklen(b);
          884         while(b->next)
          885                 b = b->next;
          886         q->blast = b;
          887 }
          888 
          889 /*
          890  *  called with q ilocked
          891  */
          892 Block*
          893 qremove(Queue *q)
          894 {
          895         Block *b;
          896 
          897         b = q->bfirst;
          898         if(b == nil)
          899                 return nil;
          900         q->bfirst = b->next;
          901         b->next = nil;
          902         q->dlen -= BLEN(b);
          903         q->len -= BALLOC(b);
          904         QDEBUG checkb(b, "qremove");
          905         return b;
          906 }
          907 
          908 /*
          909  *  copy the contents of a string of blocks into
          910  *  memory.  emptied blocks are freed.  return
          911  *  pointer to first unconsumed block.
          912  */
          913 Block*
          914 bl2mem(uchar *p, Block *b, int n)
          915 {
          916         int i;
          917         Block *next;
          918 
          919         for(; b != nil; b = next){
          920                 i = BLEN(b);
          921                 if(i > n){
          922                         memmove(p, b->rp, n);
          923                         b->rp += n;
          924                         return b;
          925                 }
          926                 memmove(p, b->rp, i);
          927                 n -= i;
          928                 p += i;
          929                 b->rp += i;
          930                 next = b->next;
          931                 freeb(b);
          932         }
          933         return nil;
          934 }
          935 
          936 /*
          937  *  copy the contents of memory into a string of blocks.
          938  *  return nil on error.
          939  */
          940 Block*
          941 mem2bl(uchar *p, int len)
          942 {
          943         int n;
          944         Block *b, *first, **l;
          945 
          946         first = nil;
          947         l = &first;
          948         if(waserror()){
          949                 freeblist(first);
          950                 nexterror();
          951         }
          952         do {
          953                 n = len;
          954                 if(n > Maxatomic)
          955                         n = Maxatomic;
          956 
          957                 *l = b = allocb(n);
          958                 memmove(b->wp, p, n);
          959                 b->wp += n;
          960                 p += n;
          961                 len -= n;
          962                 l = &b->next;
          963         } while(len > 0);
          964         poperror();
          965 
          966         return first;
          967 }
          968 
          969 /*
          970  *  put a block back to the front of the queue
          971  *  called with q ilocked
          972  */
          973 void
          974 qputback(Queue *q, Block *b)
          975 {
          976         b->next = q->bfirst;
          977         if(q->bfirst == nil)
          978                 q->blast = b;
          979         q->bfirst = b;
          980         q->len += BALLOC(b);
          981         q->dlen += BLEN(b);
          982 }
          983 
          984 /*
          985  *  flow control, get producer going again
          986  *  called with q ilocked
          987  */
          988 static void
          989 qwakeup_iunlock(Queue *q)
          990 {
          991         int dowakeup = 0;
          992 
          993         /* if writer flow controlled, restart */
          994         if((q->state & Qflow) && q->len < q->limit/2){
          995                 q->state &= ~Qflow;
          996                 dowakeup = 1;
          997         }
          998 
          999         iunlock(&q->lk);
         1000 
         1001         /* wakeup flow controlled writers */
         1002         if(dowakeup){
         1003                 if(q->kick)
         1004                         q->kick(q->arg);
         1005                 wakeup(&q->wr);
         1006         }
         1007 }
         1008 
         1009 /*
         1010  *  get next block from a queue (up to a limit)
         1011  */
         1012 Block*
         1013 qbread(Queue *q, int len)
         1014 {
         1015         Block *b, *nb;
         1016         int n;
         1017 
         1018         qlock(&q->rlock);
         1019         if(waserror()){
         1020                 qunlock(&q->rlock);
         1021                 nexterror();
         1022         }
         1023 
         1024         ilock(&q->lk);
         1025         switch(qwait(q)){
         1026         case 0:
         1027                 /* queue closed */
         1028                 iunlock(&q->lk);
         1029                 qunlock(&q->rlock);
         1030                 poperror();
         1031                 return nil;
         1032         case -1:
         1033                 /* multiple reads on a closed queue */
         1034                 iunlock(&q->lk);
         1035                 error(q->err);
         1036         }
         1037 
         1038         /* if we get here, there's at least one block in the queue */
         1039         b = qremove(q);
         1040         n = BLEN(b);
         1041 
         1042         /* split block if it's too big and this is not a message queue */
         1043         nb = b;
         1044         if(n > len){
         1045                 if((q->state&Qmsg) == 0){
         1046                         n -= len;
         1047                         b = allocb(n);
         1048                         memmove(b->wp, nb->rp+len, n);
         1049                         b->wp += n;
         1050                         qputback(q, b);
         1051                 }
         1052                 nb->wp = nb->rp + len;
         1053         }
         1054 
         1055         /* restart producer */
         1056         qwakeup_iunlock(q);
         1057 
         1058         poperror();
         1059         qunlock(&q->rlock);
         1060         return nb;
         1061 }
         1062 
         1063 /*
         1064  *  read a queue.  if no data is queued, post a Block
         1065  *  and wait on its Rendez.
         1066  */
         1067 long
         1068 qread(Queue *q, void *vp, int len)
         1069 {
         1070         Block *b, *first, **l;
         1071         int m, n;
         1072 
         1073         qlock(&q->rlock);
         1074         if(waserror()){
         1075                 qunlock(&q->rlock);
         1076                 nexterror();
         1077         }
         1078 
         1079         ilock(&q->lk);
         1080 again:
         1081         switch(qwait(q)){
         1082         case 0:
         1083                 /* queue closed */
         1084                 iunlock(&q->lk);
         1085                 qunlock(&q->rlock);
         1086                 poperror();
         1087                 return 0;
         1088         case -1:
         1089                 /* multiple reads on a closed queue */
         1090                 iunlock(&q->lk);
         1091                 error(q->err);
         1092         }
         1093 
         1094         /* if we get here, there's at least one block in the queue */
         1095         if(q->state & Qcoalesce){
         1096                 /* when coalescing, 0 length blocks just go away */
         1097                 b = q->bfirst;
         1098                 if(BLEN(b) <= 0){
         1099                         freeb(qremove(q));
         1100                         goto again;
         1101                 }
         1102 
         1103                 /*  grab the first block plus as many
         1104                  *  following blocks as will completely
         1105                  *  fit in the read.
         1106                  */
         1107                 n = 0;
         1108                 l = &first;
         1109                 m = BLEN(b);
         1110                 for(;;) {
         1111                         *l = qremove(q);
         1112                         l = &b->next;
         1113                         n += m;
         1114 
         1115                         b = q->bfirst;
         1116                         if(b == nil)
         1117                                 break;
         1118                         m = BLEN(b);
         1119                         if(n+m > len)
         1120                                 break;
         1121                 }
         1122         } else {
         1123                 first = qremove(q);
         1124                 n = BLEN(first);
         1125         }
         1126 
         1127         /* copy to user space outside of the ilock */
         1128         iunlock(&q->lk);
         1129         b = bl2mem(vp, first, len);
         1130         ilock(&q->lk);
         1131 
         1132         /* take care of any left over partial block */
         1133         if(b != nil){
         1134                 n -= BLEN(b);
         1135                 if(q->state & Qmsg)
         1136                         freeb(b);
         1137                 else
         1138                         qputback(q, b);
         1139         }
         1140 
         1141         /* restart producer */
         1142         qwakeup_iunlock(q);
         1143 
         1144         poperror();
         1145         qunlock(&q->rlock);
         1146         return n;
         1147 }
         1148 
         1149 static int
         1150 qnotfull(void *a)
         1151 {
         1152         Queue *q = a;
         1153 
         1154         return q->len < q->limit || (q->state & Qclosed);
         1155 }
         1156 
         1157 ulong noblockcnt;
         1158 
         1159 /*
         1160  *  add a block to a queue obeying flow control
         1161  */
         1162 long
         1163 qbwrite(Queue *q, Block *b)
         1164 {
         1165         int n, dowakeup;
         1166         Proc *p;
         1167 
         1168         n = BLEN(b);
         1169 
         1170         if(q->bypass){
         1171                 (*q->bypass)(q->arg, b);
         1172                 return n;
         1173         }
         1174 
         1175         dowakeup = 0;
         1176         qlock(&q->wlock);
         1177         if(waserror()){
         1178                 if(b != nil)
         1179                         freeb(b);
         1180                 qunlock(&q->wlock);
         1181                 nexterror();
         1182         }
         1183 
         1184         ilock(&q->lk);
         1185 
         1186         /* give up if the queue is closed */
         1187         if(q->state & Qclosed){
         1188                 iunlock(&q->lk);
         1189                 error(q->err);
         1190         }
         1191 
         1192         /* if nonblocking, don't queue over the limit */
         1193         if(q->len >= q->limit){
         1194                 if(q->noblock){
         1195                         iunlock(&q->lk);
         1196                         freeb(b);
         1197                         noblockcnt += n;
         1198                         qunlock(&q->wlock);
         1199                         poperror();
         1200                         return n;
         1201                 }
         1202         }
         1203 
         1204         /* queue the block */
         1205         if(q->bfirst)
         1206                 q->blast->next = b;
         1207         else
         1208                 q->bfirst = b;
         1209         q->blast = b;
         1210         b->next = 0;
         1211         q->len += BALLOC(b);
         1212         q->dlen += n;
         1213         QDEBUG checkb(b, "qbwrite");
         1214         b = nil;
         1215 
         1216         /* make sure other end gets awakened */
         1217         if(q->state & Qstarve){
         1218                 q->state &= ~Qstarve;
         1219                 dowakeup = 1;
         1220         }
         1221         iunlock(&q->lk);
         1222 
         1223         /*  get output going again */
         1224         if(q->kick && (dowakeup || (q->state&Qkick)))
         1225                 q->kick(q->arg);
         1226 
         1227         /* wakeup anyone consuming at the other end */
         1228         if(dowakeup){
         1229                 p = wakeup(&q->rr);
         1230 
         1231                 /* if we just wokeup a higher priority process, let it run */
         1232                 if(p != nil && p->priority > up->priority)
         1233                         sched();
         1234         }
         1235 
         1236         /*
         1237          *  flow control, wait for queue to get below the limit
         1238          *  before allowing the process to continue and queue
         1239          *  more.  We do this here so that postnote can only
         1240          *  interrupt us after the data has been queued.  This
         1241          *  means that things like 9p flushes and ssl messages
         1242          *  will not be disrupted by software interrupts.
         1243          *
         1244          *  Note - this is moderately dangerous since a process
         1245          *  that keeps getting interrupted and rewriting will
         1246          *  queue infinite crud.
         1247          */
         1248         for(;;){
         1249                 if(q->noblock || qnotfull(q))
         1250                         break;
         1251 
         1252                 ilock(&q->lk);
         1253                 q->state |= Qflow;
         1254                 iunlock(&q->lk);
         1255                 sleep(&q->wr, qnotfull, q);
         1256         }
         1257         USED(b);
         1258 
         1259         qunlock(&q->wlock);
         1260         poperror();
         1261         return n;
         1262 }
         1263 
         1264 /*
         1265  *  write to a queue.  only Maxatomic bytes at a time is atomic.
         1266  */
         1267 int
         1268 qwrite(Queue *q, void *vp, int len)
         1269 {
         1270         int n, sofar;
         1271         Block *b;
         1272         uchar *p = vp;
         1273 
         1274         QDEBUG if(!islo())
         1275                 print("qwrite hi %#p\n", getcallerpc(&q));
         1276 
         1277         sofar = 0;
         1278         do {
         1279                 n = len-sofar;
         1280                 if(n > Maxatomic)
         1281                         n = Maxatomic;
         1282 
         1283                 b = allocb(n);
         1284                 if(waserror()){
         1285                         freeb(b);
         1286                         nexterror();
         1287                 }
         1288                 memmove(b->wp, p+sofar, n);
         1289                 poperror();
         1290                 b->wp += n;
         1291 
         1292                 qbwrite(q, b);
         1293 
         1294                 sofar += n;
         1295         } while(sofar < len && (q->state & Qmsg) == 0);
         1296 
         1297         return len;
         1298 }
         1299 
         1300 /*
         1301  *  used by print() to write to a queue.  Since we may be splhi or not in
         1302  *  a process, don't qlock.
         1303  *
         1304  *  this routine merges adjacent blocks if block n+1 will fit into
         1305  *  the free space of block n.
         1306  */
         1307 int
         1308 qiwrite(Queue *q, void *vp, int len)
         1309 {
         1310         int n, sofar, dowakeup;
         1311         Block *b;
         1312         uchar *p = vp;
         1313 
         1314         dowakeup = 0;
         1315 
         1316         sofar = 0;
         1317         do {
         1318                 n = len-sofar;
         1319                 if(n > Maxatomic)
         1320                         n = Maxatomic;
         1321 
         1322                 b = iallocb(n);
         1323                 if(b == nil)
         1324                         break;
         1325                 memmove(b->wp, p+sofar, n);
         1326                 b->wp += n;
         1327 
         1328                 ilock(&q->lk);
         1329 
         1330                 /* we use an artificially high limit for kernel prints since anything
         1331                  * over the limit gets dropped
         1332                  */
         1333                 if(q->dlen >= 16*1024){
         1334                         iunlock(&q->lk);
         1335                         freeb(b);
         1336                         break;
         1337                 }
         1338 
         1339                 QDEBUG checkb(b, "qiwrite");
         1340                 if(q->bfirst)
         1341                         q->blast->next = b;
         1342                 else
         1343                         q->bfirst = b;
         1344                 q->blast = b;
         1345                 q->len += BALLOC(b);
         1346                 q->dlen += n;
         1347 
         1348                 if(q->state & Qstarve){
         1349                         q->state &= ~Qstarve;
         1350                         dowakeup = 1;
         1351                 }
         1352 
         1353                 iunlock(&q->lk);
         1354 
         1355                 if(dowakeup){
         1356                         if(q->kick)
         1357                                 q->kick(q->arg);
         1358                         wakeup(&q->rr);
         1359                 }
         1360 
         1361                 sofar += n;
         1362         } while(sofar < len && (q->state & Qmsg) == 0);
         1363 
         1364         return sofar;
         1365 }
         1366 
         1367 /*
         1368  *  be extremely careful when calling this,
         1369  *  as there is no reference accounting
         1370  */
         1371 void
         1372 qfree(Queue *q)
         1373 {
         1374         qclose(q);
         1375         free(q);
         1376 }
         1377 
         1378 /*
         1379  *  Mark a queue as closed.  No further IO is permitted.
         1380  *  All blocks are released.
         1381  */
         1382 void
         1383 qclose(Queue *q)
         1384 {
         1385         Block *bfirst;
         1386 
         1387         if(q == nil)
         1388                 return;
         1389 
         1390         /* mark it */
         1391         ilock(&q->lk);
         1392         q->state |= Qclosed;
         1393         q->state &= ~(Qflow|Qstarve);
         1394         strcpy(q->err, Ehungup);
         1395         bfirst = q->bfirst;
         1396         q->bfirst = 0;
         1397         q->len = 0;
         1398         q->dlen = 0;
         1399         q->noblock = 0;
         1400         iunlock(&q->lk);
         1401 
         1402         /* free queued blocks */
         1403         freeblist(bfirst);
         1404 
         1405         /* wake up readers/writers */
         1406         wakeup(&q->rr);
         1407         wakeup(&q->wr);
         1408 }
         1409 
         1410 /*
         1411  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
         1412  *  blocks.
         1413  */
         1414 void
         1415 qhangup(Queue *q, char *msg)
         1416 {
         1417         /* mark it */
         1418         ilock(&q->lk);
         1419         q->state |= Qclosed;
         1420         if(msg == 0 || *msg == 0)
         1421                 strcpy(q->err, Ehungup);
         1422         else
         1423                 strncpy(q->err, msg, ERRMAX-1);
         1424         iunlock(&q->lk);
         1425 
         1426         /* wake up readers/writers */
         1427         wakeup(&q->rr);
         1428         wakeup(&q->wr);
         1429 }
         1430 
         1431 /*
         1432  *  return non-zero if the q is hungup
         1433  */
         1434 int
         1435 qisclosed(Queue *q)
         1436 {
         1437         return q->state & Qclosed;
         1438 }
         1439 
         1440 /*
         1441  *  mark a queue as no longer hung up
         1442  */
         1443 void
         1444 qreopen(Queue *q)
         1445 {
         1446         ilock(&q->lk);
         1447         q->state &= ~Qclosed;
         1448         q->state |= Qstarve;
         1449         q->eof = 0;
         1450         q->limit = q->inilim;
         1451         iunlock(&q->lk);
         1452 }
         1453 
         1454 /*
         1455  *  return bytes queued
         1456  */
         1457 int
         1458 qlen(Queue *q)
         1459 {
         1460         return q->dlen;
         1461 }
         1462 
         1463 /*
         1464  * return space remaining before flow control
         1465  */
         1466 int
         1467 qwindow(Queue *q)
         1468 {
         1469         int l;
         1470 
         1471         l = q->limit - q->len;
         1472         if(l < 0)
         1473                 l = 0;
         1474         return l;
         1475 }
         1476 
         1477 /*
         1478  *  return true if we can read without blocking
         1479  */
         1480 int
         1481 qcanread(Queue *q)
         1482 {
         1483         return q->bfirst!=0;
         1484 }
         1485 
         1486 /*
         1487  *  change queue limit
         1488  */
         1489 void
         1490 qsetlimit(Queue *q, int limit)
         1491 {
         1492         q->limit = limit;
         1493 }
         1494 
         1495 /*
         1496  *  set blocking/nonblocking
         1497  */
         1498 void
         1499 qnoblock(Queue *q, int onoff)
         1500 {
         1501         q->noblock = onoff;
         1502 }
         1503 
         1504 /*
         1505  *  flush the output queue
         1506  */
         1507 void
         1508 qflush(Queue *q)
         1509 {
         1510         Block *bfirst;
         1511 
         1512         /* mark it */
         1513         ilock(&q->lk);
         1514         bfirst = q->bfirst;
         1515         q->bfirst = 0;
         1516         q->len = 0;
         1517         q->dlen = 0;
         1518         iunlock(&q->lk);
         1519 
         1520         /* free queued blocks */
         1521         freeblist(bfirst);
         1522 
         1523         /* wake up readers/writers */
         1524         wakeup(&q->wr);
         1525 }
         1526 
         1527 int
         1528 qfull(Queue *q)
         1529 {
         1530         return q->state & Qflow;
         1531 }
         1532 
         1533 int
         1534 qstate(Queue *q)
         1535 {
         1536         return q->state;
         1537 }