qio.c (23670B)
1 #include "u.h" 2 #include "lib.h" 3 #include "mem.h" 4 #include "dat.h" 5 #include "fns.h" 6 #include "error.h" 7 8 static ulong padblockcnt; 9 static ulong concatblockcnt; 10 static ulong pullupblockcnt; 11 static ulong copyblockcnt; 12 static ulong consumecnt; 13 static ulong producecnt; 14 static ulong qcopycnt; 15 16 static int debugging; 17 18 #define QDEBUG if(0) 19 20 /* 21 * IO queues 22 */ 23 24 struct Queue 25 { 26 Lock lk; 27 28 Block* bfirst; /* buffer */ 29 Block* blast; 30 31 int len; /* bytes allocated to queue */ 32 int dlen; /* data bytes in queue */ 33 int limit; /* max bytes in queue */ 34 int inilim; /* initial limit */ 35 int state; 36 int noblock; /* true if writes return immediately when q full */ 37 int eof; /* number of eofs read by user */ 38 39 void (*kick)(void*); /* restart output */ 40 void (*bypass)(void*, Block*); /* bypass queue altogether */ 41 void* arg; /* argument to kick */ 42 43 QLock rlock; /* mutex for reading processes */ 44 Rendez rr; /* process waiting to read */ 45 QLock wlock; /* mutex for writing processes */ 46 Rendez wr; /* process waiting to write */ 47 48 char err[ERRMAX]; 49 }; 50 51 enum 52 { 53 Maxatomic = 64*1024, 54 }; 55 56 uint qiomaxatomic = Maxatomic; 57 58 void 59 ixsummary(void) 60 { 61 debugging ^= 1; 62 iallocsummary(); 63 print("pad %lud, concat %lud, pullup %lud, copy %lud\n", 64 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt); 65 print("consume %lud, produce %lud, qcopy %lud\n", 66 consumecnt, producecnt, qcopycnt); 67 } 68 69 /* 70 * free a list of blocks 71 */ 72 void 73 freeblist(Block *b) 74 { 75 Block *next; 76 77 for(; b != 0; b = next){ 78 next = b->next; 79 if(b->ref == 1) 80 b->next = nil; 81 freeb(b); 82 } 83 } 84 85 /* 86 * pad a block to the front (or the back if size is negative) 87 */ 88 Block* 89 padblock(Block *bp, int size) 90 { 91 int n; 92 Block *nbp; 93 94 QDEBUG checkb(bp, "padblock 1"); 95 if(size >= 0){ 96 if(bp->rp - bp->base >= size){ 97 bp->rp -= size; 98 return bp; 99 } 100 101 if(bp->next) 102 panic("padblock %#p", getcallerpc(&bp)); 103 n = BLEN(bp); 104 padblockcnt++; 105 nbp = allocb(size+n); 106 nbp->rp += size; 107 nbp->wp = nbp->rp; 108 memmove(nbp->wp, bp->rp, n); 109 nbp->wp += n; 110 freeb(bp); 111 nbp->rp -= size; 112 } else { 113 size = -size; 114 115 if(bp->next) 116 panic("padblock %#p", getcallerpc(&bp)); 117 118 if(bp->lim - bp->wp >= size) 119 return bp; 120 121 n = BLEN(bp); 122 padblockcnt++; 123 nbp = allocb(size+n); 124 memmove(nbp->wp, bp->rp, n); 125 nbp->wp += n; 126 freeb(bp); 127 } 128 QDEBUG checkb(nbp, "padblock 1"); 129 return nbp; 130 } 131 132 /* 133 * return count of bytes in a string of blocks 134 */ 135 int 136 blocklen(Block *bp) 137 { 138 int len; 139 140 len = 0; 141 while(bp) { 142 len += BLEN(bp); 143 bp = bp->next; 144 } 145 return len; 146 } 147 148 /* 149 * return count of space in blocks 150 */ 151 int 152 blockalloclen(Block *bp) 153 { 154 int len; 155 156 len = 0; 157 while(bp) { 158 len += BALLOC(bp); 159 bp = bp->next; 160 } 161 return len; 162 } 163 164 /* 165 * copy the string of blocks into 166 * a single block and free the string 167 */ 168 Block* 169 concatblock(Block *bp) 170 { 171 int len; 172 Block *nb, *f; 173 174 if(bp->next == 0) 175 return bp; 176 177 nb = allocb(blocklen(bp)); 178 for(f = bp; f; f = f->next) { 179 len = BLEN(f); 180 memmove(nb->wp, f->rp, len); 181 nb->wp += len; 182 } 183 concatblockcnt += BLEN(nb); 184 freeblist(bp); 185 QDEBUG checkb(nb, "concatblock 1"); 186 return nb; 187 } 188 189 /* 190 * make sure the first block has at least n bytes 191 */ 192 Block* 193 pullupblock(Block *bp, int n) 194 { 195 int i; 196 Block *nbp; 197 198 /* 199 * this should almost always be true, it's 200 * just to avoid every caller checking. 201 */ 202 if(BLEN(bp) >= n) 203 return bp; 204 205 /* 206 * if not enough room in the first block, 207 * add another to the front of the list. 208 */ 209 if(bp->lim - bp->rp < n){ 210 nbp = allocb(n); 211 nbp->next = bp; 212 bp = nbp; 213 } 214 215 /* 216 * copy bytes from the trailing blocks into the first 217 */ 218 n -= BLEN(bp); 219 while((nbp = bp->next)){ 220 i = BLEN(nbp); 221 if(i > n) { 222 memmove(bp->wp, nbp->rp, n); 223 pullupblockcnt++; 224 bp->wp += n; 225 nbp->rp += n; 226 QDEBUG checkb(bp, "pullupblock 1"); 227 return bp; 228 } else { 229 /* shouldn't happen but why crash if it does */ 230 if(i < 0){ 231 print("pullup negative length packet, called from %#p\n", 232 getcallerpc(&bp)); 233 i = 0; 234 } 235 memmove(bp->wp, nbp->rp, i); 236 pullupblockcnt++; 237 bp->wp += i; 238 bp->next = nbp->next; 239 nbp->next = 0; 240 freeb(nbp); 241 n -= i; 242 if(n == 0){ 243 QDEBUG checkb(bp, "pullupblock 2"); 244 return bp; 245 } 246 } 247 } 248 freeb(bp); 249 return 0; 250 } 251 252 /* 253 * make sure the first block has at least n bytes 254 */ 255 Block* 256 pullupqueue(Queue *q, int n) 257 { 258 Block *b; 259 260 if(BLEN(q->bfirst) >= n) 261 return q->bfirst; 262 q->bfirst = pullupblock(q->bfirst, n); 263 for(b = q->bfirst; b != nil && b->next != nil; b = b->next) 264 ; 265 q->blast = b; 266 return q->bfirst; 267 } 268 269 /* 270 * trim to len bytes starting at offset 271 */ 272 Block * 273 trimblock(Block *bp, int offset, int len) 274 { 275 ulong l; 276 Block *nb, *startb; 277 278 QDEBUG checkb(bp, "trimblock 1"); 279 if(blocklen(bp) < offset+len) { 280 freeblist(bp); 281 return nil; 282 } 283 284 while((l = BLEN(bp)) < offset) { 285 offset -= l; 286 nb = bp->next; 287 bp->next = nil; 288 freeb(bp); 289 bp = nb; 290 } 291 292 startb = bp; 293 bp->rp += offset; 294 295 while((l = BLEN(bp)) < len) { 296 len -= l; 297 bp = bp->next; 298 } 299 300 bp->wp -= (BLEN(bp) - len); 301 302 if(bp->next) { 303 freeblist(bp->next); 304 bp->next = nil; 305 } 306 307 return startb; 308 } 309 310 /* 311 * copy 'count' bytes into a new block 312 */ 313 Block* 314 copyblock(Block *bp, int count) 315 { 316 int l; 317 Block *nbp; 318 319 QDEBUG checkb(bp, "copyblock 0"); 320 nbp = allocb(count); 321 for(; count > 0 && bp != 0; bp = bp->next){ 322 l = BLEN(bp); 323 if(l > count) 324 l = count; 325 memmove(nbp->wp, bp->rp, l); 326 nbp->wp += l; 327 count -= l; 328 } 329 if(count > 0){ 330 memset(nbp->wp, 0, count); 331 nbp->wp += count; 332 } 333 copyblockcnt++; 334 QDEBUG checkb(nbp, "copyblock 1"); 335 336 return nbp; 337 } 338 339 Block* 340 adjustblock(Block* bp, int len) 341 { 342 int n; 343 Block *nbp; 344 345 if(len < 0){ 346 freeb(bp); 347 return nil; 348 } 349 350 if(bp->rp+len > bp->lim){ 351 nbp = copyblock(bp, len); 352 freeblist(bp); 353 QDEBUG checkb(nbp, "adjustblock 1"); 354 355 return nbp; 356 } 357 358 n = BLEN(bp); 359 if(len > n) 360 memset(bp->wp, 0, len-n); 361 bp->wp = bp->rp+len; 362 QDEBUG checkb(bp, "adjustblock 2"); 363 364 return bp; 365 } 366 367 368 /* 369 * throw away up to count bytes from a 370 * list of blocks. Return count of bytes 371 * thrown away. 372 */ 373 int 374 pullblock(Block **bph, int count) 375 { 376 Block *bp; 377 int n, bytes; 378 379 bytes = 0; 380 if(bph == nil) 381 return 0; 382 383 while(*bph != nil && count != 0) { 384 bp = *bph; 385 n = BLEN(bp); 386 if(count < n) 387 n = count; 388 bytes += n; 389 count -= n; 390 bp->rp += n; 391 QDEBUG checkb(bp, "pullblock "); 392 if(BLEN(bp) == 0) { 393 *bph = bp->next; 394 bp->next = nil; 395 freeb(bp); 396 } 397 } 398 return bytes; 399 } 400 401 /* 402 * get next block from a queue, return null if nothing there 403 */ 404 Block* 405 qget(Queue *q) 406 { 407 int dowakeup; 408 Block *b; 409 410 /* sync with qwrite */ 411 ilock(&q->lk); 412 413 b = q->bfirst; 414 if(b == nil){ 415 q->state |= Qstarve; 416 iunlock(&q->lk); 417 return nil; 418 } 419 q->bfirst = b->next; 420 b->next = 0; 421 q->len -= BALLOC(b); 422 q->dlen -= BLEN(b); 423 QDEBUG checkb(b, "qget"); 424 425 /* if writer flow controlled, restart */ 426 if((q->state & Qflow) && q->len < q->limit/2){ 427 q->state &= ~Qflow; 428 dowakeup = 1; 429 } else 430 dowakeup = 0; 431 432 iunlock(&q->lk); 433 434 if(dowakeup) 435 wakeup(&q->wr); 436 437 return b; 438 } 439 440 /* 441 * throw away the next 'len' bytes in the queue 442 */ 443 int 444 qdiscard(Queue *q, int len) 445 { 446 Block *b; 447 int dowakeup, n, sofar; 448 449 ilock(&q->lk); 450 for(sofar = 0; sofar < len; sofar += n){ 451 b = q->bfirst; 452 if(b == nil) 453 break; 454 QDEBUG checkb(b, "qdiscard"); 455 n = BLEN(b); 456 if(n <= len - sofar){ 457 q->bfirst = b->next; 458 b->next = 0; 459 q->len -= BALLOC(b); 460 q->dlen -= BLEN(b); 461 freeb(b); 462 } else { 463 n = len - sofar; 464 b->rp += n; 465 q->dlen -= n; 466 } 467 } 468 469 /* 470 * if writer flow controlled, restart 471 * 472 * This used to be 473 * q->len < q->limit/2 474 * but it slows down tcp too much for certain write sizes. 475 * I really don't understand it completely. It may be 476 * due to the queue draining so fast that the transmission 477 * stalls waiting for the app to produce more data. - presotto 478 */ 479 if((q->state & Qflow) && q->len < q->limit){ 480 q->state &= ~Qflow; 481 dowakeup = 1; 482 } else 483 dowakeup = 0; 484 485 iunlock(&q->lk); 486 487 if(dowakeup) 488 wakeup(&q->wr); 489 490 return sofar; 491 } 492 493 /* 494 * Interrupt level copy out of a queue, return # bytes copied. 495 */ 496 int 497 qconsume(Queue *q, void *vp, int len) 498 { 499 Block *b; 500 int n, dowakeup; 501 uchar *p = vp; 502 Block *tofree = nil; 503 504 /* sync with qwrite */ 505 ilock(&q->lk); 506 507 for(;;) { 508 b = q->bfirst; 509 if(b == 0){ 510 q->state |= Qstarve; 511 iunlock(&q->lk); 512 return -1; 513 } 514 QDEBUG checkb(b, "qconsume 1"); 515 516 n = BLEN(b); 517 if(n > 0) 518 break; 519 q->bfirst = b->next; 520 q->len -= BALLOC(b); 521 522 /* remember to free this */ 523 b->next = tofree; 524 tofree = b; 525 }; 526 527 if(n < len) 528 len = n; 529 memmove(p, b->rp, len); 530 consumecnt += n; 531 b->rp += len; 532 q->dlen -= len; 533 534 /* discard the block if we're done with it */ 535 if((q->state & Qmsg) || len == n){ 536 q->bfirst = b->next; 537 b->next = 0; 538 q->len -= BALLOC(b); 539 q->dlen -= BLEN(b); 540 541 /* remember to free this */ 542 b->next = tofree; 543 tofree = b; 544 } 545 546 /* if writer flow controlled, restart */ 547 if((q->state & Qflow) && q->len < q->limit/2){ 548 q->state &= ~Qflow; 549 dowakeup = 1; 550 } else 551 dowakeup = 0; 552 553 iunlock(&q->lk); 554 555 if(dowakeup) 556 wakeup(&q->wr); 557 558 if(tofree != nil) 559 freeblist(tofree); 560 561 return len; 562 } 563 564 int 565 qpass(Queue *q, Block *b) 566 { 567 int dlen, len, dowakeup; 568 569 /* sync with qread */ 570 dowakeup = 0; 571 ilock(&q->lk); 572 if(q->len >= q->limit){ 573 freeblist(b); 574 iunlock(&q->lk); 575 return -1; 576 } 577 if(q->state & Qclosed){ 578 len = BALLOC(b); 579 freeblist(b); 580 iunlock(&q->lk); 581 return len; 582 } 583 584 /* add buffer to queue */ 585 if(q->bfirst) 586 q->blast->next = b; 587 else 588 q->bfirst = b; 589 len = BALLOC(b); 590 dlen = BLEN(b); 591 QDEBUG checkb(b, "qpass"); 592 while(b->next){ 593 b = b->next; 594 QDEBUG checkb(b, "qpass"); 595 len += BALLOC(b); 596 dlen += BLEN(b); 597 } 598 q->blast = b; 599 q->len += len; 600 q->dlen += dlen; 601 602 if(q->len >= q->limit/2) 603 q->state |= Qflow; 604 605 if(q->state & Qstarve){ 606 q->state &= ~Qstarve; 607 dowakeup = 1; 608 } 609 iunlock(&q->lk); 610 611 if(dowakeup) 612 wakeup(&q->rr); 613 614 return len; 615 } 616 617 int 618 qpassnolim(Queue *q, Block *b) 619 { 620 int dlen, len, dowakeup; 621 622 /* sync with qread */ 623 dowakeup = 0; 624 ilock(&q->lk); 625 626 if(q->state & Qclosed){ 627 freeblist(b); 628 iunlock(&q->lk); 629 return BALLOC(b); 630 } 631 632 /* add buffer to queue */ 633 if(q->bfirst) 634 q->blast->next = b; 635 else 636 q->bfirst = b; 637 len = BALLOC(b); 638 dlen = BLEN(b); 639 QDEBUG checkb(b, "qpass"); 640 while(b->next){ 641 b = b->next; 642 QDEBUG checkb(b, "qpass"); 643 len += BALLOC(b); 644 dlen += BLEN(b); 645 } 646 q->blast = b; 647 q->len += len; 648 q->dlen += dlen; 649 650 if(q->len >= q->limit/2) 651 q->state |= Qflow; 652 653 if(q->state & Qstarve){ 654 q->state &= ~Qstarve; 655 dowakeup = 1; 656 } 657 iunlock(&q->lk); 658 659 if(dowakeup) 660 wakeup(&q->rr); 661 662 return len; 663 } 664 665 /* 666 * if the allocated space is way out of line with the used 667 * space, reallocate to a smaller block 668 */ 669 Block* 670 packblock(Block *bp) 671 { 672 Block **l, *nbp; 673 int n; 674 675 for(l = &bp; *l; l = &(*l)->next){ 676 nbp = *l; 677 n = BLEN(nbp); 678 if((n<<2) < BALLOC(nbp)){ 679 *l = allocb(n); 680 memmove((*l)->wp, nbp->rp, n); 681 (*l)->wp += n; 682 (*l)->next = nbp->next; 683 freeb(nbp); 684 } 685 } 686 687 return bp; 688 } 689 690 int 691 qproduce(Queue *q, void *vp, int len) 692 { 693 Block *b; 694 int dowakeup; 695 uchar *p = vp; 696 697 /* sync with qread */ 698 dowakeup = 0; 699 ilock(&q->lk); 700 701 /* no waiting receivers, room in buffer? */ 702 if(q->len >= q->limit){ 703 q->state |= Qflow; 704 iunlock(&q->lk); 705 return -1; 706 } 707 708 /* save in buffer */ 709 b = iallocb(len); 710 if(b == 0){ 711 iunlock(&q->lk); 712 return 0; 713 } 714 memmove(b->wp, p, len); 715 producecnt += len; 716 b->wp += len; 717 if(q->bfirst) 718 q->blast->next = b; 719 else 720 q->bfirst = b; 721 q->blast = b; 722 /* b->next = 0; done by iallocb() */ 723 q->len += BALLOC(b); 724 q->dlen += BLEN(b); 725 QDEBUG checkb(b, "qproduce"); 726 727 if(q->state & Qstarve){ 728 q->state &= ~Qstarve; 729 dowakeup = 1; 730 } 731 732 if(q->len >= q->limit) 733 q->state |= Qflow; 734 iunlock(&q->lk); 735 736 if(dowakeup) 737 wakeup(&q->rr); 738 739 return len; 740 } 741 742 /* 743 * copy from offset in the queue 744 */ 745 Block* 746 qcopy(Queue *q, int len, ulong offset) 747 { 748 int sofar; 749 int n; 750 Block *b, *nb; 751 uchar *p; 752 753 nb = allocb(len); 754 755 ilock(&q->lk); 756 757 /* go to offset */ 758 b = q->bfirst; 759 for(sofar = 0; ; sofar += n){ 760 if(b == nil){ 761 iunlock(&q->lk); 762 return nb; 763 } 764 n = BLEN(b); 765 if(sofar + n > offset){ 766 p = b->rp + offset - sofar; 767 n -= offset - sofar; 768 break; 769 } 770 QDEBUG checkb(b, "qcopy"); 771 b = b->next; 772 } 773 774 /* copy bytes from there */ 775 for(sofar = 0; sofar < len;){ 776 if(n > len - sofar) 777 n = len - sofar; 778 memmove(nb->wp, p, n); 779 qcopycnt += n; 780 sofar += n; 781 nb->wp += n; 782 b = b->next; 783 if(b == nil) 784 break; 785 n = BLEN(b); 786 p = b->rp; 787 } 788 iunlock(&q->lk); 789 790 return nb; 791 } 792 793 /* 794 * called by non-interrupt code 795 */ 796 Queue* 797 qopen(int limit, int msg, void (*kick)(void*), void *arg) 798 { 799 Queue *q; 800 801 q = malloc(sizeof(Queue)); 802 if(q == 0) 803 return 0; 804 805 q->limit = q->inilim = limit; 806 q->kick = kick; 807 q->arg = arg; 808 q->state = msg; 809 810 q->state |= Qstarve; 811 q->eof = 0; 812 q->noblock = 0; 813 814 return q; 815 } 816 817 /* open a queue to be bypassed */ 818 Queue* 819 qbypass(void (*bypass)(void*, Block*), void *arg) 820 { 821 Queue *q; 822 823 q = malloc(sizeof(Queue)); 824 if(q == 0) 825 return 0; 826 827 q->limit = 0; 828 q->arg = arg; 829 q->bypass = bypass; 830 q->state = 0; 831 832 return q; 833 } 834 835 static int 836 notempty(void *a) 837 { 838 Queue *q = a; 839 840 return (q->state & Qclosed) || q->bfirst != 0; 841 } 842 843 /* 844 * wait for the queue to be non-empty or closed. 845 * called with q ilocked. 846 */ 847 static int 848 qwait(Queue *q) 849 { 850 /* wait for data */ 851 for(;;){ 852 if(q->bfirst != nil) 853 break; 854 855 if(q->state & Qclosed){ 856 if(++q->eof > 3) 857 return -1; 858 if(*q->err && strcmp(q->err, Ehungup) != 0) 859 return -1; 860 return 0; 861 } 862 863 q->state |= Qstarve; /* flag requesting producer to wake me */ 864 iunlock(&q->lk); 865 sleep(&q->rr, notempty, q); 866 ilock(&q->lk); 867 } 868 return 1; 869 } 870 871 /* 872 * add a block list to a queue 873 */ 874 void 875 qaddlist(Queue *q, Block *b) 876 { 877 /* queue the block */ 878 if(q->bfirst) 879 q->blast->next = b; 880 else 881 q->bfirst = b; 882 q->len += blockalloclen(b); 883 q->dlen += blocklen(b); 884 while(b->next) 885 b = b->next; 886 q->blast = b; 887 } 888 889 /* 890 * called with q ilocked 891 */ 892 Block* 893 qremove(Queue *q) 894 { 895 Block *b; 896 897 b = q->bfirst; 898 if(b == nil) 899 return nil; 900 q->bfirst = b->next; 901 b->next = nil; 902 q->dlen -= BLEN(b); 903 q->len -= BALLOC(b); 904 QDEBUG checkb(b, "qremove"); 905 return b; 906 } 907 908 /* 909 * copy the contents of a string of blocks into 910 * memory. emptied blocks are freed. return 911 * pointer to first unconsumed block. 912 */ 913 Block* 914 bl2mem(uchar *p, Block *b, int n) 915 { 916 int i; 917 Block *next; 918 919 for(; b != nil; b = next){ 920 i = BLEN(b); 921 if(i > n){ 922 memmove(p, b->rp, n); 923 b->rp += n; 924 return b; 925 } 926 memmove(p, b->rp, i); 927 n -= i; 928 p += i; 929 b->rp += i; 930 next = b->next; 931 freeb(b); 932 } 933 return nil; 934 } 935 936 /* 937 * copy the contents of memory into a string of blocks. 938 * return nil on error. 939 */ 940 Block* 941 mem2bl(uchar *p, int len) 942 { 943 int n; 944 Block *b, *first, **l; 945 946 first = nil; 947 l = &first; 948 if(waserror()){ 949 freeblist(first); 950 nexterror(); 951 } 952 do { 953 n = len; 954 if(n > Maxatomic) 955 n = Maxatomic; 956 957 *l = b = allocb(n); 958 memmove(b->wp, p, n); 959 b->wp += n; 960 p += n; 961 len -= n; 962 l = &b->next; 963 } while(len > 0); 964 poperror(); 965 966 return first; 967 } 968 969 /* 970 * put a block back to the front of the queue 971 * called with q ilocked 972 */ 973 void 974 qputback(Queue *q, Block *b) 975 { 976 b->next = q->bfirst; 977 if(q->bfirst == nil) 978 q->blast = b; 979 q->bfirst = b; 980 q->len += BALLOC(b); 981 q->dlen += BLEN(b); 982 } 983 984 /* 985 * flow control, get producer going again 986 * called with q ilocked 987 */ 988 static void 989 qwakeup_iunlock(Queue *q) 990 { 991 int dowakeup = 0; 992 993 /* if writer flow controlled, restart */ 994 if((q->state & Qflow) && q->len < q->limit/2){ 995 q->state &= ~Qflow; 996 dowakeup = 1; 997 } 998 999 iunlock(&q->lk); 1000 1001 /* wakeup flow controlled writers */ 1002 if(dowakeup){ 1003 if(q->kick) 1004 q->kick(q->arg); 1005 wakeup(&q->wr); 1006 } 1007 } 1008 1009 /* 1010 * get next block from a queue (up to a limit) 1011 */ 1012 Block* 1013 qbread(Queue *q, int len) 1014 { 1015 Block *b, *nb; 1016 int n; 1017 1018 qlock(&q->rlock); 1019 if(waserror()){ 1020 qunlock(&q->rlock); 1021 nexterror(); 1022 } 1023 1024 ilock(&q->lk); 1025 switch(qwait(q)){ 1026 case 0: 1027 /* queue closed */ 1028 iunlock(&q->lk); 1029 qunlock(&q->rlock); 1030 poperror(); 1031 return nil; 1032 case -1: 1033 /* multiple reads on a closed queue */ 1034 iunlock(&q->lk); 1035 error(q->err); 1036 } 1037 1038 /* if we get here, there's at least one block in the queue */ 1039 b = qremove(q); 1040 n = BLEN(b); 1041 1042 /* split block if it's too big and this is not a message queue */ 1043 nb = b; 1044 if(n > len){ 1045 if((q->state&Qmsg) == 0){ 1046 n -= len; 1047 b = allocb(n); 1048 memmove(b->wp, nb->rp+len, n); 1049 b->wp += n; 1050 qputback(q, b); 1051 } 1052 nb->wp = nb->rp + len; 1053 } 1054 1055 /* restart producer */ 1056 qwakeup_iunlock(q); 1057 1058 poperror(); 1059 qunlock(&q->rlock); 1060 return nb; 1061 } 1062 1063 /* 1064 * read a queue. if no data is queued, post a Block 1065 * and wait on its Rendez. 1066 */ 1067 long 1068 qread(Queue *q, void *vp, int len) 1069 { 1070 Block *b, *first, **l; 1071 int m, n; 1072 1073 qlock(&q->rlock); 1074 if(waserror()){ 1075 qunlock(&q->rlock); 1076 nexterror(); 1077 } 1078 1079 ilock(&q->lk); 1080 again: 1081 switch(qwait(q)){ 1082 case 0: 1083 /* queue closed */ 1084 iunlock(&q->lk); 1085 qunlock(&q->rlock); 1086 poperror(); 1087 return 0; 1088 case -1: 1089 /* multiple reads on a closed queue */ 1090 iunlock(&q->lk); 1091 error(q->err); 1092 } 1093 1094 /* if we get here, there's at least one block in the queue */ 1095 if(q->state & Qcoalesce){ 1096 /* when coalescing, 0 length blocks just go away */ 1097 b = q->bfirst; 1098 if(BLEN(b) <= 0){ 1099 freeb(qremove(q)); 1100 goto again; 1101 } 1102 1103 /* grab the first block plus as many 1104 * following blocks as will completely 1105 * fit in the read. 1106 */ 1107 n = 0; 1108 l = &first; 1109 m = BLEN(b); 1110 for(;;) { 1111 *l = qremove(q); 1112 l = &b->next; 1113 n += m; 1114 1115 b = q->bfirst; 1116 if(b == nil) 1117 break; 1118 m = BLEN(b); 1119 if(n+m > len) 1120 break; 1121 } 1122 } else { 1123 first = qremove(q); 1124 n = BLEN(first); 1125 } 1126 1127 /* copy to user space outside of the ilock */ 1128 iunlock(&q->lk); 1129 b = bl2mem(vp, first, len); 1130 ilock(&q->lk); 1131 1132 /* take care of any left over partial block */ 1133 if(b != nil){ 1134 n -= BLEN(b); 1135 if(q->state & Qmsg) 1136 freeb(b); 1137 else 1138 qputback(q, b); 1139 } 1140 1141 /* restart producer */ 1142 qwakeup_iunlock(q); 1143 1144 poperror(); 1145 qunlock(&q->rlock); 1146 return n; 1147 } 1148 1149 static int 1150 qnotfull(void *a) 1151 { 1152 Queue *q = a; 1153 1154 return q->len < q->limit || (q->state & Qclosed); 1155 } 1156 1157 ulong noblockcnt; 1158 1159 /* 1160 * add a block to a queue obeying flow control 1161 */ 1162 long 1163 qbwrite(Queue *q, Block *b) 1164 { 1165 int n, dowakeup; 1166 Proc *p; 1167 1168 n = BLEN(b); 1169 1170 if(q->bypass){ 1171 (*q->bypass)(q->arg, b); 1172 return n; 1173 } 1174 1175 dowakeup = 0; 1176 qlock(&q->wlock); 1177 if(waserror()){ 1178 if(b != nil) 1179 freeb(b); 1180 qunlock(&q->wlock); 1181 nexterror(); 1182 } 1183 1184 ilock(&q->lk); 1185 1186 /* give up if the queue is closed */ 1187 if(q->state & Qclosed){ 1188 iunlock(&q->lk); 1189 error(q->err); 1190 } 1191 1192 /* if nonblocking, don't queue over the limit */ 1193 if(q->len >= q->limit){ 1194 if(q->noblock){ 1195 iunlock(&q->lk); 1196 freeb(b); 1197 noblockcnt += n; 1198 qunlock(&q->wlock); 1199 poperror(); 1200 return n; 1201 } 1202 } 1203 1204 /* queue the block */ 1205 if(q->bfirst) 1206 q->blast->next = b; 1207 else 1208 q->bfirst = b; 1209 q->blast = b; 1210 b->next = 0; 1211 q->len += BALLOC(b); 1212 q->dlen += n; 1213 QDEBUG checkb(b, "qbwrite"); 1214 b = nil; 1215 1216 /* make sure other end gets awakened */ 1217 if(q->state & Qstarve){ 1218 q->state &= ~Qstarve; 1219 dowakeup = 1; 1220 } 1221 iunlock(&q->lk); 1222 1223 /* get output going again */ 1224 if(q->kick && (dowakeup || (q->state&Qkick))) 1225 q->kick(q->arg); 1226 1227 /* wakeup anyone consuming at the other end */ 1228 if(dowakeup){ 1229 p = wakeup(&q->rr); 1230 1231 /* if we just wokeup a higher priority process, let it run */ 1232 if(p != nil && p->priority > up->priority) 1233 sched(); 1234 } 1235 1236 /* 1237 * flow control, wait for queue to get below the limit 1238 * before allowing the process to continue and queue 1239 * more. We do this here so that postnote can only 1240 * interrupt us after the data has been queued. This 1241 * means that things like 9p flushes and ssl messages 1242 * will not be disrupted by software interrupts. 1243 * 1244 * Note - this is moderately dangerous since a process 1245 * that keeps getting interrupted and rewriting will 1246 * queue infinite crud. 1247 */ 1248 for(;;){ 1249 if(q->noblock || qnotfull(q)) 1250 break; 1251 1252 ilock(&q->lk); 1253 q->state |= Qflow; 1254 iunlock(&q->lk); 1255 sleep(&q->wr, qnotfull, q); 1256 } 1257 USED(b); 1258 1259 qunlock(&q->wlock); 1260 poperror(); 1261 return n; 1262 } 1263 1264 /* 1265 * write to a queue. only Maxatomic bytes at a time is atomic. 1266 */ 1267 int 1268 qwrite(Queue *q, void *vp, int len) 1269 { 1270 int n, sofar; 1271 Block *b; 1272 uchar *p = vp; 1273 1274 QDEBUG if(!islo()) 1275 print("qwrite hi %#p\n", getcallerpc(&q)); 1276 1277 sofar = 0; 1278 do { 1279 n = len-sofar; 1280 if(n > Maxatomic) 1281 n = Maxatomic; 1282 1283 b = allocb(n); 1284 if(waserror()){ 1285 freeb(b); 1286 nexterror(); 1287 } 1288 memmove(b->wp, p+sofar, n); 1289 poperror(); 1290 b->wp += n; 1291 1292 qbwrite(q, b); 1293 1294 sofar += n; 1295 } while(sofar < len && (q->state & Qmsg) == 0); 1296 1297 return len; 1298 } 1299 1300 /* 1301 * used by print() to write to a queue. Since we may be splhi or not in 1302 * a process, don't qlock. 1303 * 1304 * this routine merges adjacent blocks if block n+1 will fit into 1305 * the free space of block n. 1306 */ 1307 int 1308 qiwrite(Queue *q, void *vp, int len) 1309 { 1310 int n, sofar, dowakeup; 1311 Block *b; 1312 uchar *p = vp; 1313 1314 dowakeup = 0; 1315 1316 sofar = 0; 1317 do { 1318 n = len-sofar; 1319 if(n > Maxatomic) 1320 n = Maxatomic; 1321 1322 b = iallocb(n); 1323 if(b == nil) 1324 break; 1325 memmove(b->wp, p+sofar, n); 1326 b->wp += n; 1327 1328 ilock(&q->lk); 1329 1330 /* we use an artificially high limit for kernel prints since anything 1331 * over the limit gets dropped 1332 */ 1333 if(q->dlen >= 16*1024){ 1334 iunlock(&q->lk); 1335 freeb(b); 1336 break; 1337 } 1338 1339 QDEBUG checkb(b, "qiwrite"); 1340 if(q->bfirst) 1341 q->blast->next = b; 1342 else 1343 q->bfirst = b; 1344 q->blast = b; 1345 q->len += BALLOC(b); 1346 q->dlen += n; 1347 1348 if(q->state & Qstarve){ 1349 q->state &= ~Qstarve; 1350 dowakeup = 1; 1351 } 1352 1353 iunlock(&q->lk); 1354 1355 if(dowakeup){ 1356 if(q->kick) 1357 q->kick(q->arg); 1358 wakeup(&q->rr); 1359 } 1360 1361 sofar += n; 1362 } while(sofar < len && (q->state & Qmsg) == 0); 1363 1364 return sofar; 1365 } 1366 1367 /* 1368 * be extremely careful when calling this, 1369 * as there is no reference accounting 1370 */ 1371 void 1372 qfree(Queue *q) 1373 { 1374 qclose(q); 1375 free(q); 1376 } 1377 1378 /* 1379 * Mark a queue as closed. No further IO is permitted. 1380 * All blocks are released. 1381 */ 1382 void 1383 qclose(Queue *q) 1384 { 1385 Block *bfirst; 1386 1387 if(q == nil) 1388 return; 1389 1390 /* mark it */ 1391 ilock(&q->lk); 1392 q->state |= Qclosed; 1393 q->state &= ~(Qflow|Qstarve); 1394 strcpy(q->err, Ehungup); 1395 bfirst = q->bfirst; 1396 q->bfirst = 0; 1397 q->len = 0; 1398 q->dlen = 0; 1399 q->noblock = 0; 1400 iunlock(&q->lk); 1401 1402 /* free queued blocks */ 1403 freeblist(bfirst); 1404 1405 /* wake up readers/writers */ 1406 wakeup(&q->rr); 1407 wakeup(&q->wr); 1408 } 1409 1410 /* 1411 * Mark a queue as closed. Wakeup any readers. Don't remove queued 1412 * blocks. 1413 */ 1414 void 1415 qhangup(Queue *q, char *msg) 1416 { 1417 /* mark it */ 1418 ilock(&q->lk); 1419 q->state |= Qclosed; 1420 if(msg == 0 || *msg == 0) 1421 strcpy(q->err, Ehungup); 1422 else 1423 strncpy(q->err, msg, ERRMAX-1); 1424 iunlock(&q->lk); 1425 1426 /* wake up readers/writers */ 1427 wakeup(&q->rr); 1428 wakeup(&q->wr); 1429 } 1430 1431 /* 1432 * return non-zero if the q is hungup 1433 */ 1434 int 1435 qisclosed(Queue *q) 1436 { 1437 return q->state & Qclosed; 1438 } 1439 1440 /* 1441 * mark a queue as no longer hung up 1442 */ 1443 void 1444 qreopen(Queue *q) 1445 { 1446 ilock(&q->lk); 1447 q->state &= ~Qclosed; 1448 q->state |= Qstarve; 1449 q->eof = 0; 1450 q->limit = q->inilim; 1451 iunlock(&q->lk); 1452 } 1453 1454 /* 1455 * return bytes queued 1456 */ 1457 int 1458 qlen(Queue *q) 1459 { 1460 return q->dlen; 1461 } 1462 1463 /* 1464 * return space remaining before flow control 1465 */ 1466 int 1467 qwindow(Queue *q) 1468 { 1469 int l; 1470 1471 l = q->limit - q->len; 1472 if(l < 0) 1473 l = 0; 1474 return l; 1475 } 1476 1477 /* 1478 * return true if we can read without blocking 1479 */ 1480 int 1481 qcanread(Queue *q) 1482 { 1483 return q->bfirst!=0; 1484 } 1485 1486 /* 1487 * change queue limit 1488 */ 1489 void 1490 qsetlimit(Queue *q, int limit) 1491 { 1492 q->limit = limit; 1493 } 1494 1495 /* 1496 * set blocking/nonblocking 1497 */ 1498 void 1499 qnoblock(Queue *q, int onoff) 1500 { 1501 q->noblock = onoff; 1502 } 1503 1504 /* 1505 * flush the output queue 1506 */ 1507 void 1508 qflush(Queue *q) 1509 { 1510 Block *bfirst; 1511 1512 /* mark it */ 1513 ilock(&q->lk); 1514 bfirst = q->bfirst; 1515 q->bfirst = 0; 1516 q->len = 0; 1517 q->dlen = 0; 1518 iunlock(&q->lk); 1519 1520 /* free queued blocks */ 1521 freeblist(bfirst); 1522 1523 /* wake up readers/writers */ 1524 wakeup(&q->wr); 1525 } 1526 1527 int 1528 qfull(Queue *q) 1529 { 1530 return q->state & Qflow; 1531 } 1532 1533 int 1534 qstate(Queue *q) 1535 { 1536 return q->state; 1537 }
