tAdd binary fraction tree index. The old index code is still supported too.  Buildindex and checkindex need to be revisited, though they should be easy to adapt. - plan9port - [fork] Plan 9 from user space
 (HTM) git clone git://src.adamsgaard.dk/plan9port
 (DIR) Log
 (DIR) Files
 (DIR) Refs
 (DIR) README
 (DIR) LICENSE
       ---
 (DIR) commit 333c1dccc2f9af67b9c3d8513cca492d022fab4f
 (DIR) parent 9ffbb5adcaeec878d3b6db0f8b1f654e839b4689
 (HTM) Author: rsc <devnull@localhost>
       Date:   Sat, 13 Mar 2004 04:35:13 +0000
       
       Add binary fraction tree index.
       The old index code is still
       supported too.  Buildindex and
       checkindex need to be revisited,
       tthough they should be easy to adapt.
       
       Diffstat:
         M src/cmd/venti/arena.c               |      24 ++++++++++++++++--------
         M src/cmd/venti/clump.c               |       2 +-
         M src/cmd/venti/dat.h                 |       6 ++++++
         M src/cmd/venti/dcache.c              |      58 +++++++++++++++++++++++++------
         M src/cmd/venti/fmtindex.c            |       5 ++---
         M src/cmd/venti/httpd.c               |       6 ++++--
         M src/cmd/venti/index.c               |     175 +++++++++++++++++++++++++++----
         M src/cmd/venti/lumpqueue.c           |       3 +++
         M src/cmd/venti/part.c                |       2 +-
         M src/cmd/venti/stats.c               |       1 +
         M src/cmd/venti/syncarena.c           |       1 +
       
       11 files changed, 239 insertions(+), 44 deletions(-)
       ---
 (DIR) diff --git a/src/cmd/venti/arena.c b/src/cmd/venti/arena.c
       t@@ -468,18 +468,16 @@ ReadErr:
        int
        wbarena(Arena *arena)
        {
       -        ZBlock *b;
       +        DBlock *b;
                int bad;
        
       -        b = alloczblock(arena->blocksize, 1);
       -        if(b == nil){
       +        if((b = getdblock(arena->part, arena->base + arena->size, 0)) == nil){
                        logerr(EAdmin, "can't write arena trailer: %r");
       -///ZZZ add error message?
                        return -1;
                }
       -        bad = okarena(arena)<0 || packarena(arena, b->data)<0 || 
       -                writepart(arena->part, arena->base + arena->size, b->data, arena->blocksize)<0;
       -        freezblock(b);
       +        dirtydblock(b, DirtyArenaTrailer);
       +        bad = okarena(arena)<0 || packarena(arena, b->data)<0;
       +        putdblock(b);
                if(bad)
                        return -1;
                return 0;
       t@@ -502,6 +500,10 @@ wbarenahead(Arena *arena)
        ///ZZZ add error message?
                        return -1;
                }
       +        /*
       +         * this writepart is okay because it only happens
       +         * during initialization.
       +         */
                bad = packarenahead(&head, b->data)<0 ||
                      writepart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize)<0;
                freezblock(b);
       t@@ -582,6 +584,7 @@ okarena(Arena *arena)
        static CIBlock*
        getcib(Arena *arena, int clump, int writing, CIBlock *rock)
        {
       +        int read;
                CIBlock *cib;
                u32int block, off;
        
       t@@ -613,7 +616,12 @@ getcib(Arena *arena, int clump, int writing, CIBlock *rock)
        
                cib->block = block;
                cib->offset = off;
       -        cib->data = getdblock(arena->part, arena->base + arena->size - (block + 1) * arena->blocksize, arena->blocksize);
       +
       +        read = 1;
       +        if(writing && off == 0 && clump == arena->clumps-1)
       +                read = 0;
       +
       +        cib->data = getdblock(arena->part, arena->base + arena->size - (block + 1) * arena->blocksize, read);
                if(cib->data == nil)
                        return nil;
                return cib;
 (DIR) diff --git a/src/cmd/venti/clump.c b/src/cmd/venti/clump.c
       t@@ -46,7 +46,7 @@ if(0)print("storeclump %08x %p\n", mainindex->arenas[0], &cl);
                scorecp(cl.info.score, sc);
        
        if(0)print("whackblock %08x %p\n", mainindex->arenas[0], &cl);
       -        dsize = whackblock(&cb->data[ClumpSize], zb->data, size);
       +        dsize=0; // dsize = whackblock(&cb->data[ClumpSize], zb->data, size);
        if(0)print("whackedblock %08x %p\n", mainindex->arenas[0], &cl);
                if(dsize > 0 && dsize < size){
                        cl.encoding = ClumpECompress;
 (DIR) diff --git a/src/cmd/venti/dat.h b/src/cmd/venti/dat.h
       t@@ -124,6 +124,7 @@ enum
                DirtyIndex,
                DirtyIndexBitmap,
                DirtyArenaCib,
       +        DirtyArenaTrailer,
                DirtyMax,
        
                VentiZZZZZZZZ
       t@@ -355,6 +356,10 @@ struct Clump
         *        <field name="tabsize" val="s->tabsize" type="U32int"/>
         *        <field name="buckets" val="s->buckets" type="U32int"/>
         *        <field name="buckdiv" val="s->div" type="U32int"/>
       + *        <field name="bitblocks" val="s->div" type="U32int"/>
       + *        <field name="maxdepth" val="s->div" type="U32int"/>
       + *        <field name="bitkeylog" val="s->div" type="U32int"/>
       + *        <field name="bitkeymask" val="s->div" type="U32int"/>
         *        <array name="sect" val="&s->smap[i]" elems="s->nsects" type="Amap"/>
         *        <array name="amap" val="&s->amap[i]" elems="s->narenas" type="Amap"/>
         *        <array name="arena" val="s->arenas[i]" elems="s->narenas" type="Arena"/>
       t@@ -495,6 +500,7 @@ struct Stats
                long                indexreads;                /* index from disk */
                long                indexwreads;                /* for writing a new entry */
                long                indexareads;                /* for allocating an overflow block */
       +        long                indexsplits;                /* index block splits */
                long                diskwrites;                /* total disk writes */
                long                diskreads;                /* total disk reads */
                vlong                diskbwrites;                /* total disk bytes written */
 (DIR) diff --git a/src/cmd/venti/dcache.c b/src/cmd/venti/dcache.c
       t@@ -230,10 +230,38 @@ dirtydblock(DBlock *b, int dirty)
                int odirty;
                Part *p;
        
       -        rlock(&dcache.dirtylock);
       +
                assert(b->ref != 0);
       -        assert(b->dirtying == 0);
       -        b->dirtying = 1;
       +
       +        /*
       +          * Because we use an rlock to keep track of how
       +         * many blocks are being dirtied (and a wlock to
       +         * stop dirtying), we cannot try to dirty two blocks
       +         * at the same time in the same thread -- if the 
       +         * wlock happens between them, we get a deadlock.
       +         * 
       +         * The only place in the code where we need to
       +         * dirty multiple blocks at once is in splitiblock, when
       +         * we split an index block.  The new block has a dirty
       +         * flag of DirtyIndexSplit already, because it has to get
       +         * written to disk before the old block (DirtyIndex).
       +         * So when we see the DirtyIndexSplit block, we don't
       +         * acquire the rlock (and we don't set dirtying, so putdblock
       +         * won't drop the rlock).  This kludginess means that
       +         * the caller will have to be sure to putdblock on the 
       +         * new block before putdblock on the old block.
       +         */
       +        if(dirty == DirtyIndexSplit){
       +                /* caller should have another dirty block */
       +                assert(!canwlock(&dcache.dirtylock));
       +                /* this block should be clean */
       +                assert(b->dirtying == 0);
       +                assert(b->dirty == 0);
       +        }else{
       +                rlock(&dcache.dirtylock);
       +                assert(b->dirtying == 0);
       +                b->dirtying = 1;
       +        }
        
                qlock(&stats.lock);
                if(b->dirty)
       t@@ -247,12 +275,13 @@ dirtydblock(DBlock *b, int dirty)
                 * blocks.  Only clean blocks ever get marked DirtyIndexSplit,
                 * though, so we don't need the opposite conjunction here.
                 */
       +        odirty = b->dirty;
                if(b->dirty)
                        assert(b->dirty == dirty
                                || (b->dirty==DirtyIndexSplit && dirty==DirtyIndex));
       +        else
       +                b->dirty = dirty;
        
       -        odirty = b->dirty;
       -        b->dirty = dirty;
                p = b->part;
                if(p->writechan == nil){
        fprint(2, "allocate write proc for part %s\n", p->name);
       t@@ -304,6 +333,8 @@ bumpdblock(void)
                                break;
                }
        
       +        fprint(2, "bump %s at %llud\n", b->part->name, b->addr);
       +
                /*
                 * unchain the block
                 */
       t@@ -541,6 +572,7 @@ static void
        flushproc(void *v)
        {
                int i, j, n;
       +        ulong t0;
                DBlock *b, **write;
        
                USED(v);
       t@@ -551,7 +583,8 @@ flushproc(void *v)
                        rsleep(&dcache.flush);
                        qunlock(&dcache.lock);
        
       -                fprint(2, "flushing dcache\n");
       +                fprint(2, "flushing dcache: t=0 ms\n");
       +                t0 = nsec()/1000;
        
                        /*
                         * Because we don't record any dependencies at all, we must write out
       t@@ -568,10 +601,10 @@ flushproc(void *v)
                         * finishes with them.
                         */ 
                
       -                fprint(2, "flushproc: wlock\n");
       +                fprint(2, "flushproc: wlock at t=%lud\n", (ulong)(nsec()/1000) - t0);
                        wlock(&dcache.dirtylock);
        
       -                fprint(2, "flushproc: build list\n");
       +                fprint(2, "flushproc: build list at t=%lud\n", (ulong)(nsec()/1000) - t0);
                        write = dcache.write;
                        n = 0;
                        for(i=0; i<dcache.nblocks; i++){
       t@@ -583,12 +616,15 @@ flushproc(void *v)
                        qsort(write, n, sizeof(write[0]), writeblockcmp);
        
                        /* Write each stage of blocks out. */
       +                fprint(2, "flushproc: write blocks at t=%lud\n", (ulong)(nsec()/1000) - t0);
                        i = 0;
       -                for(j=1; j<DirtyMax; j++)
       +                for(j=1; j<DirtyMax; j++){
       +                        fprint(2, "flushproc: flush dirty %d at t=%lud\n", j, (ulong)(nsec()/1000) - t0);
                                i += parallelwrites(write+i, write+n, j);
       +                }
                        assert(i == n);
        
       -                fprint(2, "flushproc: update dirty bits\n");
       +                fprint(2, "flushproc: update dirty bits at t=%lud\n", (ulong)(nsec()/1000) - t0);
                        qlock(&dcache.lock);
                        for(i=0; i<n; i++){
                                b = write[i];
       t@@ -602,6 +638,8 @@ flushproc(void *v)
                        qunlock(&dcache.lock);
                        wunlock(&dcache.dirtylock);
        
       +                fprint(2, "flushproc: done at t=%lud\n", (ulong)(nsec()/1000) - t0);
       +
                        qlock(&stats.lock);
                        stats.dcacheflushes++;
                        stats.dcacheflushwrites += n;
 (DIR) diff --git a/src/cmd/venti/fmtindex.c b/src/cmd/venti/fmtindex.c
       t@@ -66,9 +66,6 @@ threadmain(int argc, char *argv[])
                        for(i = 0; i < ix->nsects; i++)
                                n += ix->sects[i]->blocks;
        
       -                if(ix->div < 100)
       -                        sysfatal("index divisor too coarse: use bigger block size");
       -
                        fprint(2, "using %ud buckets of %ud; div=%d\n", ix->buckets, n, ix->div);
                }
                amap = MKNZ(AMap, narenas);
       t@@ -105,6 +102,8 @@ threadmain(int argc, char *argv[])
                }
                fprint(2, "configured index=%s with arenas=%d and storage=%lld\n",
                        ix->name, n, addr - IndexBase);
       +        fprint(2, "\tbitblocks=%d maxdepth=%d buckets=%d\n",
       +                ix->bitblocks, ix->maxdepth, ix->buckets);
        
                ix->amap = amap;
                ix->arenas = arenas;
 (DIR) diff --git a/src/cmd/venti/httpd.c b/src/cmd/venti/httpd.c
       t@@ -258,6 +258,7 @@ estats(HConnect *c)
                hprint(hout, "index disk reads=%,ld\n", stats.indexreads);
                hprint(hout, "index disk reads for modify=%,ld\n", stats.indexwreads);
                hprint(hout, "index disk reads for allocation=%,ld\n", stats.indexareads);
       +        hprint(hout, "index block splits=%,ld\n", stats.indexsplits);
        
                hprint(hout, "index cache lookups=%,ld\n", stats.iclookups);
                hprint(hout, "index cache hits=%,ld %d%%\n", stats.ichits,
       t@@ -277,7 +278,8 @@ estats(HConnect *c)
        
                hprint(hout, "disk cache flushes=%,ld\n", stats.dcacheflushes);
                hprint(hout, "disk cache flush writes=%,ld (%,ld per flush)\n", 
       -                stats.dcacheflushwrites, stats.dcacheflushwrites/stats.dcacheflushes);
       +                stats.dcacheflushwrites,
       +                stats.dcacheflushwrites/(stats.dcacheflushes ? stats.dcacheflushes : 1));
        
                hprint(hout, "disk writes=%,ld\n", stats.diskwrites);
                hprint(hout, "disk bytes written=%,lld\n", stats.diskbwrites);
       t@@ -368,7 +370,7 @@ dindex(HConnect *c)
                        ix->name, ix->version, ix->blocksize, ix->tabsize);
                hprint(hout, "\tbuckets=%d div=%d\n", ix->buckets, ix->div);
                for(i = 0; i < ix->nsects; i++)
       -                hprint(hout, "\tsect=%s for buckets [%lld,%lld)\n", ix->smap[i].name, ix->smap[i].start, ix->smap[i].stop);
       +                hprint(hout, "\tsect=%s for buckets [%lld,%lld) buckmax=%d\n", ix->smap[i].name, ix->smap[i].start, ix->smap[i].stop, ix->sects[i]->buckmax);
                for(i = 0; i < ix->narenas; i++){
                        if(ix->arenas[i] != nil && ix->arenas[i]->clumps != 0){
                                hprint(hout, "arena=%s at index [%lld,%lld)\n\t", ix->amap[i].name, ix->amap[i].start, ix->amap[i].stop);
 (DIR) diff --git a/src/cmd/venti/index.c b/src/cmd/venti/index.c
       t@@ -116,6 +116,9 @@ static int        writebucket(ISect *is, u32int buck, IBucket *ib, DBlock *b);
        static int        okibucket(IBucket *ib, ISect *is);
        static int        initindex1(Index*);
        static ISect        *initisect1(ISect *is);
       +static int        splitiblock(Index *ix, DBlock *b, ISect *is, u32int buck, IBucket *ib);
       +
       +#define KEY(k,d)        ((d) ? (k)>>(32-(d)) : 0)
        
        //static QLock        indexlock;        //ZZZ
        
       t@@ -763,6 +766,7 @@ storeientry(Index *ix, IEntry *ie)
                stats.indexwreads++;
                qunlock(&stats.lock);
        
       +top:
                b = loadibucket(ix, ie->score, &is, &buck, &ib);
                if(b == nil)
                        return -1;
       t@@ -789,6 +793,18 @@ storeientry(Index *ix, IEntry *ie)
                        goto out;
                }
        
       +        /* block is full -- not supposed to happen in V1 */
       +        if(ix->version == IndexVersion1){
       +                seterr(EAdmin, "index bucket %ud on %s is full\n", buck, is->part->name);
       +                ok = -1;
       +                goto out;
       +        }
       +
       +        /* in V2, split the block and try again; splitiblock puts b */
       +        if(splitiblock(ix, b, is, buck, &ib) < 0)
       +                return -1;
       +        goto top;
       +
        out:
                putdblock(b);
                return ok;
       t@@ -797,7 +813,7 @@ out:
        static int
        writebucket(ISect *is, u32int buck, IBucket *ib, DBlock *b)
        {
       -        assert(b->dirty == DirtyIndex);
       +        assert(b->dirty == DirtyIndex || b->dirty == DirtyIndexSplit);
        
                if(buck >= is->blocks){
                        seterr(EAdmin, "index write out of bounds: %d >= %d\n",
       t@@ -919,7 +935,7 @@ indexsect0(Index *ix, u32int buck)
         * load the index block at bucket #buck
         */
        static DBlock*
       -loadibucket0(Index *ix, u32int buck, ISect **pis, u32int *pbuck, IBucket *ib)
       +loadibucket0(Index *ix, u32int buck, ISect **pis, u32int *pbuck, IBucket *ib, int read)
        {
                ISect *is;
                DBlock *b;
       t@@ -931,12 +947,15 @@ loadibucket0(Index *ix, u32int buck, ISect **pis, u32int *pbuck, IBucket *ib)
                }
        
                buck -= is->start;
       -        if((b = getdblock(is->part, is->blockbase + ((u64int)buck << is->blocklog), 1)) == nil)
       +        if((b = getdblock(is->part, is->blockbase + ((u64int)buck << is->blocklog), read)) == nil)
                        return nil;
        
       -        *pis = is;
       -        *pbuck = buck;
       -        unpackibucket(ib, b->data);
       +        if(pis)
       +                *pis = is;
       +        if(pbuck)
       +                *pbuck = buck;
       +        if(ib)
       +                unpackibucket(ib, b->data);
                return b;
        }
        
       t@@ -955,14 +974,16 @@ indexsect1(Index *ix, u8int *score)
        static DBlock*
        loadibucket1(Index *ix, u8int *score, ISect **pis, u32int *pbuck, IBucket *ib)
        {
       -        return loadibucket0(ix, hashbits(score, 32)/ix->div, pis, pbuck, ib);
       +        return loadibucket0(ix, hashbits(score, 32)/ix->div, pis, pbuck, ib, 1);
        }
        
        static u32int
        keytobuck(Index *ix, u32int key, int d)
        {
       -        /* clear all but top d bits */
       -        if(d != 32)
       +        /* clear all but top d bits; can't depend on boundary case shifts */
       +        if(d == 0)
       +                key = 0;
       +        else if(d != 32)
                        key &= ~((1<<(32-d))-1);
        
                /* truncate to maxdepth bits */
       t@@ -981,27 +1002,33 @@ static int
        bitmapop(Index *ix, u32int key, int d, int set)
        {
                DBlock *b;
       -        ISect *is;
       -        IBucket ib;
       -        u32int buck;
                int inuse;
       +        u32int key1, buck1;
        
                if(d >= ix->maxdepth)
                        return 0;
        
       +
                /* construct .xxx1 in bucket number format */
       -        key = keytobuck(ix, key, d) | (1<<(ix->maxdepth-d-1));
       +        key1 = key | (1<<(32-d-1));
       +        buck1 = keytobuck(ix, key1, d+1);
       +
       +if(0) fprint(2, "key %d/%0*ub key1 %d/%0*ub buck1 %08ux\n",
       +        d, d, KEY(key, d), d+1, d+1, KEY(key1, d+1), buck1);
        
       -        /* check whether key (now the bucket number for .xxx1) is in use */
       +        /* check whether buck1 is in use */
        
       -        if((b = loadibucket0(ix, key >> ix->bitkeylog, &is, &buck, &ib)) == nil){
       +        if((b = loadibucket0(ix, buck1 >> ix->bitkeylog, nil, nil, nil, 1)) == nil){
                        seterr(ECorrupt, "cannot load in-use bitmap block");
       +fprint(2, "loadibucket: %r\n");
                        return -1;
                }
       -        inuse = ((u32int*)b->data)[(key & ix->bitkeymask)>>5] & (1<<(key&31));
       +if(0) fprint(2, "buck1 %08ux bitkeymask %08ux bitkeylog %d\n", buck1, ix->bitkeymask, ix->bitkeylog);
       +        buck1 &= ix->bitkeymask;
       +        inuse = ((u32int*)b->data)[buck1>>5] & (1<<(buck1&31));
                if(set && !inuse){
                        dirtydblock(b, DirtyIndexBitmap);
       -                ((u32int*)b->data)[(key & ix->bitkeymask)>>5] |= (1<<(key&31));
       +                ((u32int*)b->data)[buck1>>5] |= (1<<(buck1&31));
                }
                putdblock(b);
                return inuse;
       t@@ -1073,13 +1100,15 @@ loadibucket2(Index *ix, u8int *score, ISect **pis, u32int *pbuck, IBucket *ib)
                                return nil;
                        }
        
       -                if((b = loadibucket0(ix, keytobuck(ix, key, d), pis, pbuck, ib)) == nil)
       +                if((b = loadibucket0(ix, keytobuck(ix, key, d), pis, pbuck, ib, 1)) == nil)
                                return nil;
        
                        if(ib->depth == d)
                                return b;
        
                        if(ib->depth < d){
       +                        fprint(2, "loaded block %ud for %d/%0*ub got %d/%0*ub\n",
       +                                *pbuck, d, d, KEY(key,d), ib->depth, ib->depth, KEY(key, ib->depth));
                                seterr(EBug, "index block has smaller depth than expected -- cannot happen");
                                putdblock(b);
                                return nil;
       t@@ -1087,8 +1116,12 @@ loadibucket2(Index *ix, u8int *score, ISect **pis, u32int *pbuck, IBucket *ib)
        
                        /*
                         * ib->depth > d, meaning the bitmap was out of date.
       -                 * fix the bitmap and try again.
       +                 * fix the bitmap and try again.  if we actually updated
       +                 * the bitmap in splitiblock, this would only happen if
       +                 * venti crashed at an inopportune moment.  but this way
       +                 * the code gets tested more.
                         */
       +if(0) fprint(2, "update bitmap: %d/%0*ub is split\n", d, d, KEY(key,d));
                        putdblock(b);
                        if(marksplit(ix, key, d) < 0)
                                return nil;
       t@@ -1097,6 +1130,110 @@ loadibucket2(Index *ix, u8int *score, ISect **pis, u32int *pbuck, IBucket *ib)
                return nil;
        }
        
       +static int
       +splitiblock(Index *ix, DBlock *b, ISect *is, u32int buck, IBucket *ib)
       +{
       +        int i, d;
       +        u8int *score;
       +        u32int buck0, buck1, key0, key1, key, dmask;
       +        DBlock *b0, *b1;
       +        IBucket ib0, ib1;
       +
       +        if(ib->depth == ix->maxdepth){
       +if(0) fprint(2, "depth=%d == maxdepth\n", ib->depth);
       +                seterr(EAdmin, "index bucket %ud on %s is full\n", buck, is->part->name);
       +                putdblock(b);
       +                return -1;
       +        }
       +
       +        buck = is->start+buck - ix->bitblocks;
       +        d = ib->depth+1;
       +        buck0 = buck;
       +        buck1 = buck0 | (1<<(ix->maxdepth-d));
       +        if(ix->maxdepth == 32){
       +                key0 = buck0;
       +                key1 = buck1;
       +        }else{
       +                key0 = buck0 << (32-ix->maxdepth);
       +                key1 = buck1 << (32-ix->maxdepth);
       +        }
       +        buck0 += ix->bitblocks;
       +        buck1 += ix->bitblocks;
       +        USED(buck0);
       +        USED(key1);
       +
       +        if(d == 32)
       +                dmask = TWID32;
       +        else
       +                dmask = TWID32 ^ ((1<<(32-d))-1);
       +
       +        /*
       +         * Since we hold the lock for b, the bitmap
       +         * thinks buck1 doesn't exist, and the bit
       +         * for buck1 can't be updated without first
       +         * locking and splitting b, it's safe to try to
       +         * acquire the lock on buck1 without dropping b.
       +         * No one else will go after it too.
       +         *
       +         * Also, none of the rest of the code ever locks
       +         * more than one block at a time, so it's okay if
       +         * we do.
       +         */
       +        if((b1 = loadibucket0(ix, buck1, nil, nil, &ib1, 0)) == nil){
       +                putdblock(b);
       +                return -1;
       +        }
       +        b0 = b;
       +        ib0 = *ib;
       +
       +        /*
       +         * Funny locking going on here -- see dirtydblock.
       +         * We must putdblock(b1) before putdblock(b0).
       +         */
       +        dirtydblock(b0, DirtyIndex);
       +        dirtydblock(b1, DirtyIndexSplit);
       +
       +        /*
       +         * Split the block contents.
       +         * The block is already sorted, so it's pretty easy:
       +         * the first part stays, the second part goes to b1.
       +         */
       +        ib0.n = 0;
       +        ib0.depth = d;
       +        ib1.n = 0;
       +        ib1.depth = d;
       +        for(i=0; i<ib->n; i++){
       +                score = ib->data+i*IEntrySize;
       +                key = hashbits(score, 32);
       +                if((key&dmask) != key0)
       +                        break;
       +        }
       +        ib0.n = i;
       +        ib1.n = ib->n - ib0.n;
       +        memmove(ib1.data, ib0.data+ib0.n*IEntrySize, ib1.n*IEntrySize);
       +if(0) fprint(2, "splitiblock %d in %d/%0*ub => %d in %d/%0*ub + %d in %d/%0*ub\n",
       +        ib->n, d-1, d-1, key0>>(32-d+1), 
       +        ib0.n, d, d, key0>>(32-d), 
       +        ib1.n, d, d, key1>>(32-d));
       +
       +        packibucket(&ib0, b0->data);
       +        packibucket(&ib1, b1->data);
       +
       +        /* order matters!  see comment above. */
       +        putdblock(b1);
       +        putdblock(b0);
       +
       +        /*
       +         * let the recovery code take care of updating the bitmap.
       +         */
       +
       +        qlock(&stats.lock);
       +        stats.indexsplits++;
       +        qunlock(&stats.lock);
       +
       +        return 0;
       +}
       +
        int
        indexsect(Index *ix, u8int *score)
        {
 (DIR) diff --git a/src/cmd/venti/lumpqueue.c b/src/cmd/venti/lumpqueue.c
       t@@ -102,6 +102,9 @@ flushqueue(void)
                int i;
                LumpQueue *q;
        
       +        if(!lumpqs)
       +                return;
       +
                qlock(&glk);
                gen++;
                qunlock(&glk);
 (DIR) diff --git a/src/cmd/venti/part.c b/src/cmd/venti/part.c
       t@@ -2,7 +2,7 @@
        #include "dat.h"
        #include "fns.h"
        
       -#define trace 0
       +#define trace 1
        
        u32int        maxblocksize;
        int        readonly;
 (DIR) diff --git a/src/cmd/venti/stats.c b/src/cmd/venti/stats.c
       t@@ -44,6 +44,7 @@ printstats(void)
                fprint(2, "index disk reads=%,ld\n", stats.indexreads);
                fprint(2, "index disk reads for modify=%,ld\n", stats.indexwreads);
                fprint(2, "index disk reads for allocation=%,ld\n", stats.indexareads);
       +        fprint(2, "index block splits=%,ld\n", stats.indexsplits);
        
                fprint(2, "index cache lookups=%,ld\n", stats.iclookups);
                fprint(2, "index cache hits=%,ld %d%%\n", stats.ichits,
 (DIR) diff --git a/src/cmd/venti/syncarena.c b/src/cmd/venti/syncarena.c
       t@@ -128,6 +128,7 @@ syncarena(Arena *arena, u32int n, int zok, int fix)
                                fprint(2, "can't flush arena directory cache: %r");
                                err |= SyncFixErr;
                        }
       +                flushdcache();
                }
        
                if(used != arena->used