t9pserve: fix races causing assert(c->nmsg == 0) to fail. - plan9port - [fork] Plan 9 from user space
 (HTM) git clone git://src.adamsgaard.dk/plan9port
 (DIR) Log
 (DIR) Files
 (DIR) Refs
 (DIR) README
 (DIR) LICENSE
       ---
 (DIR) commit 841d71b5c6be4851572a60c5a9f9dd239ea69e56
 (DIR) parent 1f61c0914d5f42054b075b6dc5296894de2d25ab
 (HTM) Author: Russ Cox <rsc@swtch.com>
       Date:   Wed, 22 Aug 2007 08:55:08 -0400
       
       9pserve: fix races causing assert(c->nmsg == 0) to fail.
       
       1. Could happen that connoutthread sends c->outq a nil
       just before the regular input handler sends c->outq a real message.
       When the connoutthread gets the nil it will free c->outq,
       leaving the real message unprocessed.
       
       2. Could happen that the outputthread writes a message
       body to the remote 9P server and then a response comes
       in and then the connection gets torn down, all before the
       outputthread manages to call msgput(m).
       Thanks to David Swasey for identifying this scenario.
       
       Also change yield() loop into explicit communication.
       
       Also remove dead code involving hungup queues.
       
       Diffstat:
         M src/cmd/9pserve.c                   |      62 ++++++++++++++++++-------------
       
       1 file changed, 36 insertions(+), 26 deletions(-)
       ---
 (DIR) diff --git a/src/cmd/9pserve.c b/src/cmd/9pserve.c
       t@@ -43,6 +43,7 @@ struct Msg
        {
                Conn *c;
                int internal;
       +        int sync;
                int ref;
                int ctag;
                int tag;
       t@@ -73,6 +74,7 @@ struct Conn
                Hash *fid[NHASH];
                Queue *outq;
                Queue *inq;
       +        Channel *outqdead;
                int dotu;
        };
        
       t@@ -288,6 +290,7 @@ listenthread(void *arg)
                        c->internal = chancreate(sizeof(void*), 0);
                        c->inq = qalloc();
                        c->outq = qalloc();
       +                c->outqdead = chancreate(sizeof(void*), 0);
                        if(verbose) fprint(2, "%T incoming call on %s\n", c->dir);
                        threadcreate(connthread, c, STACK);
                }        
       t@@ -348,7 +351,7 @@ connthread(void *arg)
                int i, fd;
                Conn *c;
                Hash *h, *hnext;
       -        Msg *m, *om, *mm;
       +        Msg *m, *om, *mm, sync;
                Fid *f;
                Ioproc *io;
        
       t@@ -519,15 +522,11 @@ connthread(void *arg)
        
                if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd);
        
       -        /* flush the output queue */
       -        sendq(c->outq, nil);
       -        while(c->outq != nil)
       -                yield();
       -
                /* flush all outstanding messages */
                for(i=0; i<NHASH; i++){
       -                for(h=c->tag[i]; h; h=hnext){
       +                while((h = c->tag[i]) != nil){
                                om = h->v;
       +                        msgincref(om); /* for us */
                                m = msgnew(0);
                                m->internal = 1;
                                m->c = c;
       t@@ -543,12 +542,31 @@ connthread(void *arg)
                                assert(mm == m);
                                msgput(m);        /* got from recvp */
                                msgput(m);        /* got from msgnew */
       -                        msgput(om);        /* got from hash table */
       -                        hnext = h->next;
       -                        free(h);
       +                        if(delhash(c->tag, om->tag, om) == 0)
       +                                msgput(om);        /* got from hash table */
       +                        msgput(om);        /* got from msgincref */
                        }
                }
        
       +        /*
       +         * outputthread has written all its messages
       +         * to the remote connection (because we've gotten all the replies!),
       +         * but it might not have gotten a chance to msgput
       +         * the very last one.  sync up to make sure.
       +         */
       +        memset(&sync, 0, sizeof sync);
       +        sync.sync = 1;
       +        sync.c = c;
       +        sendq(outq, &sync);
       +        recvp(c->outqdead);
       +
       +        /* should be no messages left anywhere. */
       +        assert(c->nmsg == 0);
       +
       +        /* everything is quiet; can close the local output queue. */
       +        sendq(c->outq, nil);
       +        recvp(c->outqdead);
       +
                /* clunk all outstanding fids */
                for(i=0; i<NHASH; i++){
                        for(h=c->fid[i]; h; h=hnext){
       t@@ -765,15 +783,13 @@ connoutthread(void *arg)
                char *ename;
                int err;
                Conn *c;
       -        Queue *outq;
                Msg *m, *om;
                Ioproc *io;
        
                c = arg;
       -        outq = c->outq;
                io = ioproc();
                threadsetname("connout %s", c->dir);
       -        while((m = recvq(outq)) != nil){
       +        while((m = recvq(c->outq)) != nil){
                        err = m->tx.type+1 != m->rx.type;
                        if(!err && m->isopenfd)
                                if(xopenfd(m) < 0)
       t@@ -843,8 +859,9 @@ connoutthread(void *arg)
                                nbsendp(c->inc, 0);
                }
                closeioproc(io);
       -        free(outq);
       +        free(c->outq);
                c->outq = nil;
       +        sendp(c->outqdead, nil);
        }
        
        void
       t@@ -857,6 +874,10 @@ outputthread(void *arg)
                io = ioproc();
                threadsetname("output");
                while((m = recvq(outq)) != nil){
       +                if(m->sync){
       +                        sendp(m->c->outqdead, nil);
       +                        continue;
       +                }
                        if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx);
                        rewritehdr(&m->tx, m->tpkt);
                        if(mwrite9p(io, 1, m->tpkt) < 0)
       t@@ -1148,7 +1169,6 @@ struct Qel
        
        struct Queue
        {
       -        int hungup;
                QLock lk;
                Rendez r;
                Qel *head;
       t@@ -1174,12 +1194,6 @@ sendq(Queue *q, void *p)
        
                e = emalloc(sizeof(Qel));
                qlock(&q->lk);
       -        if(q->hungup){
       -                free(e);
       -                werrstr("hungup queue");
       -                qunlock(&q->lk);
       -                return -1;
       -        }
                e->p = p;
                e->next = nil;
                if(q->head == nil)
       t@@ -1199,12 +1213,8 @@ recvq(Queue *q)
                Qel *e;
        
                qlock(&q->lk);
       -        while(q->head == nil && !q->hungup)
       +        while(q->head == nil)
                        rsleep(&q->r);
       -        if(q->hungup){
       -                qunlock(&q->lk);
       -                return nil;
       -        }
                e = q->head;
                q->head = e->next;
                qunlock(&q->lk);