Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

wvstream.cc

00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * Unified support for streams, that is, sequences of bytes that may or
00006  * may not be ready for read/write at any given time.
00007  * 
00008  * We provide typical read and write routines, as well as a select() function
00009  * for each stream.
00010  */
00011 #include <time.h>
00012 #include <sys/types.h>
00013 #include <assert.h>
00014 #define __WVSTREAM_UNIT_TEST 1
00015 #include "wvstream.h"
00016 #include "wvtimeutils.h"
00017 #include "wvcont.h"
00018 
00019 #ifdef _WIN32
00020 #define ENOBUFS WSAENOBUFS
00021 #undef errno
00022 #define errno GetLastError()
00023 #ifdef __GNUC__
00024 #include <sys/socket.h>
00025 #endif
00026 #include "streams.h"
00027 #else
00028 #include <errno.h>
00029 #endif
00030 
00031 // enable this to add some read/write trace messages (this can be VERY
00032 // verbose)
00033 #if 0
00034 # ifndef _MSC_VER
00035 #  define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
00036 # else
00037 #  define TRACE printf
00038 # endif
00039 #else
00040 # ifndef _MSC_VER
00041 #  define TRACE(x, y...)
00042 # else
00043 #  define TRACE
00044 # endif
00045 #endif
00046 
00047 WvStream *WvStream::globalstream = NULL;
      // When non-NULL, globalstream is pre-/post-selected alongside any
      // other stream that performs a "forceable" select, and gets its
      // callback() run if it reported itself ready.
00048 
      // Interface table for WvStream, listing IObject and IWvStream.
      // NOTE(review): the UUID_MAP_* macros are defined elsewhere; presumably
      // they declare which interfaces this class answers to -- confirm.
00049 UUID_MAP_BEGIN(WvStream)
00050   UUID_MAP_ENTRY(IObject)
00051   UUID_MAP_ENTRY(IWvStream)
00052   UUID_MAP_END
00053 
00054 
00055 WvStream::WvStream():
00056     read_requires_writable(NULL),
00057     write_requires_readable(NULL),
00058     uses_continue_select(false),
00059     personal_stack_size(65536),
00060     alarm_was_ticking(false),
00061     stop_read(false),
00062     stop_write(false),
00063     closed(false),
00064     userdata(NULL),
00065     readcb(this, &WvStream::legacy_callback),
00066     max_outbuf_size(0),
00067     outbuf_delayed_flush(false),
00068     is_auto_flush(true),
00069     want_to_flush(true),
00070     is_flushing(false),
00071     queue_min(0),
00072     autoclose_time(0),
00073     alarm_time(wvtime_zero),
00074     last_alarm_check(wvtime_zero)
00075 {
          // A new stream starts open, with reading and writing enabled,
          // auto-flush on, no output buffer limit (max_outbuf_size == 0),
          // and neither an alarm nor an autoclose pending.  Only the read
          // callback is bound here (to legacy_callback); the write, except
          // and close callbacks are not set by this initializer list.
00076     TRACE("Creating wvstream %p\n", this);
00077     
00078 #ifdef _WIN32
          // Request Winsock 2.0.  The result is only checked by assert(), so
          // a build with NDEBUG defined ignores a startup failure.
00079     WSAData wsaData;
00080     int result = WSAStartup(MAKEWORD(2,0), &wsaData); 
00081     assert(result == 0);
00082 #endif
00083 }
00084 
00085 
00086 // FIXME: interfaces (IWvStream) shouldn't have implementations!
00087 IWvStream::IWvStream()
00088 {
          // intentionally empty: the interface has no state to set up
00089 }
00090 
00091 
00092 IWvStream::~IWvStream()
00093 {
          // intentionally empty: the interface has no state to tear down
00094 }
00095 
00096 
00097 WvStream::~WvStream()
00098 {
          // Close the stream (which flushes and fires the close callback),
          // then drop any continuation context that is still around.
00099     TRACE("destroying %p\n", this);
00100     close();
00101     
00102     // if this assertion fails, then uses_continue_select is true, but you
00103     // didn't call terminate_continue_select() or close() before destroying
00104     // your object.  Shame on you!
00105     assert(!uses_continue_select || !call_ctx);
00106     
00107     call_ctx = 0; // finish running the suspended callback, if any
00108     TRACE("done destroying %p\n", this);
00109 }
00110 
00111 
00112 void WvStream::close()
00113 {
          // Give buffered output up to two seconds to drain, mark the stream
          // closed, then fire the close callback (if one is set) exactly once.
00114     TRACE("flushing in wvstream...\n");
00115     flush(2000); // fixme: should not hardcode this stuff
00116     TRACE("(flushed)\n");
00117 
00118     closed = true;
00119 
          // NOTE: call_ctx is deliberately left alone here.  Destroying it
          // would explode when close() is called from *inside* a continuable
          // callback, so that is left to the destructor and to
          // terminate_continue_select().
00120     if (!closecb)
00121         return;
00122 
          // detach the callback before invoking it so it can never run twice
00123     IWvStreamCallback notify = closecb;
00124     closecb = 0;
00125     notify(*this);
00126 }
00131 
00132 
00133 void WvStream::autoforward(WvStream &s)
00134 {
          // Install autoforward_callback with &s as its userdata, so whatever
          // is read from this stream gets written to s.  s is also recorded
          // in read_requires_writable.
00135     setcallback(autoforward_callback, &s);
00136     read_requires_writable = &s;
00137 }
00138 
00139 
00140 void WvStream::noautoforward()
00141 {
          // Undo autoforward(): clear the user callback and its userdata and
          // reset read_requires_writable.
00142     setcallback(0, NULL);
00143     read_requires_writable = NULL;
00144 }
00145 
00146 
00147 void WvStream::autoforward_callback(WvStream &s, void *userdata)
00148 {
          // Copy up to 1024 bytes from s to the destination stream, which is
          // passed in through userdata (see autoforward()).  The return value
          // of write() is ignored, so a short write is not retried here.
00149     char chunk[1024];
00150     const size_t got = s.read(chunk, sizeof(chunk));
00151     static_cast<WvStream *>(userdata)->write(chunk, got);
00152 }
00157 
00158 
00159 void WvStream::_callback()
00160 {
          // Run execute() first, then pass the event on to the user callback
          // installed with setcallback(), if there is one.
00161     execute();
00162     if (!callfunc) return;
00163     callfunc(*this, userdata);
00164 }
00165 
00166 
00167 void *WvStream::_callwrap(void *)
00168 {
          // Adapter with a void*(void*) shape so that _callback() can be
          // wrapped in a WvCont (see callback()).  The argument is ignored
          // and the result is always NULL.
00169     _callback();
00170     return NULL;
00171 }
00172 
00173 
00174 void WvStream::callback()
00175 {
          // Entry point for dispatching an event on this stream.  Records
          // whether the alarm was what fired, then runs _callback(), either
          // directly or inside a WvCont when uses_continue_select is set.
00176     TRACE("(?)");
00177     
00178     // if the alarm has gone off and we're calling callback... good!
00179     if (alarm_remaining() == 0)
00180     {
00181         alarm_time = wvtime_zero;
00182         alarm_was_ticking = true;
00183     }
00184     else
00185         alarm_was_ticking = false;
00186     
          // a continuation needs a stack of at least 1024 bytes
00187     assert(!uses_continue_select || personal_stack_size >= 1024);
00188 
      // Set TEST_CONTINUES_HARSHLY to 1 to run every stream through WvCont.
00189 #define TEST_CONTINUES_HARSHLY 0
00190 #if TEST_CONTINUES_HARSHLY
00191 #ifndef _WIN32
00192 # warning "Using WvCont for *all* streams for testing!"
00193 #endif
00194     if (1)
00195 #else
00196     if (uses_continue_select && personal_stack_size >= 1024)
00197 #endif
00198     {
00199         if (!call_ctx) // no context exists yet!
00200         {
00201             call_ctx = WvCont(WvCallback<void*,void*>
00202                               (this, &WvStream::_callwrap),
00203                               personal_stack_size);
00204         }
00205         
              // start, or resume, the continuation
00206         call_ctx(NULL);
00207     }
00208     else
00209         _callback();
00210 
00211     // NOTE: the comment that used to sit here described an assertion
00212     // checking that a derived class's virtual execute() chained all the
00213     // way back up to WvStream::execute().  No such assertion is present
00214     // in this function any more, so nothing enforces that here; derived
00215     // classes should still call their parent's execute().
00216 }
00217 
00218 
00219 bool WvStream::isok() const
00220 {
          // usable only while not closed and no error has been recorded
00221     return closed ? false : WvErrorBase::isok();
00222 }
00223 
00224 
00225 void WvStream::seterr(int _errnum)
00226 {
          // The first error wins: once an error is recorded, later ones are
          // ignored.  Recording an error also closes the stream.
00227     const bool already_failed = !!geterr();
00228     if (already_failed)
00229         return;
00230     WvErrorBase::seterr(_errnum);
00231     close();
00232 }
00233 
00234 
00235 size_t WvStream::read(WvBuf &outbuf, size_t count)
00236 {
          // Buffer-based read layered on the raw-pointer read(): read at most
          // count bytes (clamped to the free space in outbuf) into a scratch
          // buffer, then merge what arrived into outbuf.  Returns the number
          // of bytes actually read.
00237     const size_t room = outbuf.free();
00238     const size_t want = (count > room) ? room : count;
00239 
00240     WvDynBuf staging;
00241     unsigned char *dest = staging.alloc(want);
00242     const size_t got = read(dest, want);
00243     staging.unalloc(want - got); // hand back the part we did not fill
00244     outbuf.merge(staging);
00245     return got;
00246 }
00249 
00250 
00251 size_t WvStream::write(WvBuf &inbuf, size_t count)
00252 {
          // Buffer-based write layered on the raw-pointer write(): take at
          // most count bytes (clamped to what inbuf holds), write them, and
          // push back whatever was not accepted.  Returns the bytes written.
00253     const size_t pending = inbuf.used();
00254     const size_t want = (count > pending) ? pending : count;
00255 
00256     const unsigned char *bytes = inbuf.get(want);
00257     const size_t sent = write(bytes, want);
00258     inbuf.unget(want - sent);
00259     return sent;
00260 }
00262 
00263 
00264 size_t WvStream::read(void *buf, size_t count)
00265 {
          // Read up to count bytes into buf and return how many were stored.
          //
          // While fewer than queue_min bytes are queued, incoming data is
          // pulled into inbuf and nothing is returned (the result is 0) until
          // the minimum is reached.  Once it is, data comes from inbuf if it
          // holds any, otherwise straight from uread().
00266     size_t bufu, i;
00267     unsigned char *newbuf;
00268 
00269     bufu = inbuf.used();
00270     if (bufu < queue_min)
00271     {
              // try to top the queue up to the requested minimum
00272         newbuf = inbuf.alloc(queue_min - bufu);
00273         i = uread(newbuf, queue_min - bufu);
00274         inbuf.unalloc(queue_min - bufu - i);
00275         
00276         bufu = inbuf.used();
00277     }
00278     
00279     if (bufu < queue_min)
00280     {
              // still short: keep what we have queued and report nothing yet
00281         maybe_autoclose();
00282         return 0;
00283     }
00284         
00285     // if buffer is empty, do a hard read
00286     if (!bufu)
00287         bufu = uread(buf, count);
00288     else
00289     {
00290         // otherwise just read from the buffer
00291         if (bufu > count)
00292             bufu = count;
00293     
00294         memcpy(buf, inbuf.get(bufu), bufu);
00295     }
00296     
          // print the pointer with %p and the sizes as unsigned long: casting
          // 'this' to unsigned int truncates on 64-bit targets, and %d does
          // not match size_t.
00297     TRACE("read  obj %p, bytes %lu/%lu\n",
                (void *)this, (unsigned long)bufu, (unsigned long)count);
00298     maybe_autoclose();
00299     return bufu;
00300 }
00301 
00302 
00303 size_t WvStream::write(const void *buf, size_t count)
00304 {
          // Write up to count bytes from buf and return how many were
          // accepted, whether sent immediately or queued in outbuf.
          //
          // Nothing is accepted (the result is 0) if the stream is not ok,
          // buf is NULL, count is 0, or writing has been stopped.  When
          // max_outbuf_size is non-zero, at most that many bytes are kept
          // queued and the remainder is refused, so the result may be less
          // than count.
00305     if (!isok() || !buf || !count || stop_write) return 0;
00306     
00307     size_t wrote = 0;
00308     if (!outbuf_delayed_flush && !outbuf.used())
00309     {
              // nothing queued ahead of us: try to send directly first
00310         wrote = uwrite(buf, count);
00311         count -= wrote;
00312         buf = (const unsigned char *)buf + wrote;
00313         // if (!count) return wrote; // short circuit if no buffering needed
00314     }
00315     if (max_outbuf_size != 0)
00316     {
              // The buffer may already hold more than the limit, so the
              // subtraction is guarded: an unguarded size_t difference would
              // wrap around to a huge value and disable the limit.
00317         size_t queued = outbuf.used();
              size_t canbuffer = 
                  queued < max_outbuf_size ? max_outbuf_size - queued : 0;
00318         if (count > canbuffer)
00319             count = canbuffer; // can't write the whole amount
00320     }
00321     if (count != 0)
00322     {
00323         outbuf.put(buf, count);
00324         wrote += count;
00325     }
00326 
00327     if (should_flush())
00328     {
00329         if (is_auto_flush)
00330             flush(0);
00331         else 
00332             flush_outbuf(0);
00333     }
00334 
00335     return wrote;
00336 }
00337 
00338 
00339 void WvStream::noread()
00340 {
          // Flag the read side as stopped; the stream closes itself once the
          // write side is stopped too and both buffers are empty.
00341     stop_read = true;
00342     maybe_autoclose();
00343 }
00344 
00345 
00346 void WvStream::nowrite()
00347 {
          // Flag the write side as stopped (write() then accepts nothing); the
          // stream closes itself once the read side is stopped too and both
          // buffers are empty.
00348     stop_write = true;
00349     maybe_autoclose();
00350 }
00351 
00352 
00353 void WvStream::maybe_autoclose()
00354 {
          // Close only when both directions are stopped, both buffers are
          // empty, and the stream has not been closed already.
00355     if (!(stop_read && stop_write) || outbuf.used() || inbuf.used()) return;
00356     if (!closed) close();
00357 }
00358 
00359 
00360 bool WvStream::isreadable()
00361 {
          // zero-timeout poll for readability; a dead stream is never readable
00362     return isok() ? select(0, true, false, false) : false;
00363 }
00364 
00365 
00366 bool WvStream::iswritable()
00367 {
          // zero-timeout poll for writability; never true once writing has
          // been stopped or the stream is no longer ok
00368     return (stop_write || !isok()) ? false : select(0, false, true, false);
00369 }
00370 
00371 
00372 char *WvStream::blocking_getline(time_t wait_msec, int separator,
00373                                  int readahead)
00374 {
          // Return the next separator-terminated line from the stream, or
          // NULL if none became available.
          //
          // wait_msec: 0 polls once, a positive value is an overall deadline
          //            in milliseconds, a negative value never times out in
          //            this function (it is passed through to select()).
          // separator: terminating byte, 0..255 (asserted below).
          // readahead: how many bytes to request from uread() per attempt.
          //
          // The returned pointer refers to storage inside inbuf, with the
          // separator overwritten by a NUL.  If the stream stops being ok or
          // reading was stopped before a separator arrived, whatever is
          // buffered is returned NUL-terminated instead.
          // NOTE(review): the lifetime of the returned pointer depends on
          // when inbuf reuses that storage -- confirm against the buffer
          // class.
00375     // in fact, a separator of 0 would probably work fine.  Unfortunately,
00376     // the parameters of getline() changed recently to not include
00377     // wait_msec, so people keep trying to pass 0/-1 wait_msec in as the
00378     // separator.  Stop them now, before they get confused.
00379     // --mrwise - had to reenable 0 so that unit tests would pass when
00380     // merging, bug 11133
00381     //assert(separator != 0);
00382     assert(separator >= 0);
00383     assert(separator <= 255);
00384     
00385     //assert(uses_continue_select || wait_msec == 0);
00386 
          // timeout_time is only assigned, and only read, when wait_msec > 0
00387     struct timeval timeout_time;
00388     if (wait_msec > 0)
00389         timeout_time = msecadd(wvtime(), wait_msec);
00390     
00391     maybe_autoclose();
00392 
00393     // if we get here, we either want to wait a bit or there is data
00394     // available.
00395     while (isok())
00396     {
00397         // fprintf(stderr, "(inbuf used = %d)\n", inbuf.used()); fflush(stderr);
00398         queuemin(0);
00399     
00400         // if there is a newline already, we have enough data.
00401         if (inbuf.strchr(separator) > 0)
00402             break;
00403         else if (!isok() || stop_read)    // uh oh, stream is in trouble.
00404             break;
00405 
00406         // make select not return true until more data is available
00407         queuemin(inbuf.used() + 1);
00408 
00409         // compute remaining timeout
00410         if (wait_msec > 0)
00411         {
00412             wait_msec = msecdiff(timeout_time, wvtime());
00413             if (wait_msec < 0)
00414                 wait_msec = 0;
00415         }
00416         
00417         // FIXME: this is blocking_getline.  It shouldn't
00418         // call continue_select()!
00419         bool hasdata;
00420         if (wait_msec != 0 && uses_continue_select)
00421             hasdata = continue_select(wait_msec);
00422         else
00423             hasdata = select(wait_msec, true, false);
00424         
00425         if (!isok())
00426             break;
00427 
00428         if (hasdata)
00429         {
00430             // read a few bytes
00431             WvDynBuf tmp;
00432             unsigned char *buf = tmp.alloc(readahead);
00433             size_t len = uread(buf, readahead);
00434             tmp.unalloc(readahead - len);
00435             inbuf.merge(tmp);
00436             hasdata = len > 0; // enough?
00437         }
00438 
00439         if (!isok())
00440             break;
00441         
              // nothing arrived and no time is left: give up, leaving any
              // partial line in inbuf for a later call
00442         if (!hasdata && wait_msec == 0)
00443             return NULL; // handle timeout
00444     }
00445     if (!inbuf.used())
00446         return NULL;
00447 
00448     // return the appropriate data
00449     size_t i = 0;
00450     i = inbuf.strchr(separator);
00451     if (i > 0) {
              // i counts the bytes up to and including the separator, so the
              // separator itself sits at offset i - 1
00452         char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
00453         assert(eol);
00454         *eol = 0;
00455         return const_cast<char*>((const char *)inbuf.get(i));
00456     } else {
00457         // handle "EOF without newline" condition
00458         // FIXME: it's very silly that buffers can't return editable
00459         // char* arrays.
00460         inbuf.alloc(1)[0] = 0; // null-terminate it
00461         return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
00462     }
00463 }
00464 
00465 
00466 char *WvStream::continue_getline(time_t wait_msec, int separator,
00467                                  int readahead)
00468 {
          // Placeholder: not implemented.  With assertions enabled this
          // aborts on the first line; with NDEBUG defined it returns NULL.
          // All three parameters are unused.
00469     assert(false && "not implemented, come back later!");
00470     assert(uses_continue_select);
00471     return NULL;
00472 }
00473 
00474 
00475 void WvStream::drain()
00476 {
          // Read and discard input for as long as the stream reports itself
          // readable.
00477     char scratch[1024];
00478     for (; isreadable(); read(scratch, sizeof(scratch)))
00479         ; // nothing else to do: the data is simply thrown away
00480 }
00481 
00482 
00483 bool WvStream::flush(time_t msec_timeout)
00484 {
          // Flush internal buffers first, then our own outbuf, each with the
          // given timeout.  Returns true only when both report they are done.
          // A nested call made while a flush is already running does nothing
          // and returns false.
00485     if (is_flushing)
00486         return false;
00487     TRACE("%p flush starts\n", this);
00488 
00489     want_to_flush = true;
00490     is_flushing = true;
00491     bool done = false;
00492     if (flush_internal(msec_timeout))      // any other internal buffers
00493         done = flush_outbuf(msec_timeout); // our own outbuf
00494     is_flushing = false;
00495     TRACE("flush stops (%d)\n", done);
00496     return done;
00497 }
00498 
00499 
00500 bool WvStream::should_flush()
00501 {
          // want_to_flush is set by flush() and cleared by flush_outbuf() when
          // delayed flushing is on and the output buffer was found empty
00502     return want_to_flush;
00503 }
00504 
00505 
00506 bool WvStream::flush_outbuf(time_t msec_timeout)
00507 {
          // Try to push the contents of outbuf out through uwrite(), then
          // handle a pending flush_then_close() request.
          //
          // msec_timeout: 0 makes a single write attempt and never calls
          //               select(); a positive value keeps retrying until
          //               roughly that many milliseconds have passed.
          // Returns true when outbuf was observed empty (see notes below).
          //
          // NOTE(review): with a negative msec_timeout neither break in the
          // loop can fire and select() is never called, so uwrite() is
          // retried back to back until outbuf drains or the stream fails --
          // confirm that this busy loop is intended.
          // NOTE(review): outbuf_was_used is only refreshed at the bottom of
          // the loop, so when the loop exits through a break (always the case
          // for msec_timeout == 0) the return value reflects the buffer state
          // *before* the last write attempt.
00508     TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
00509     bool outbuf_was_used = outbuf.used();
00510     
00511     // do-nothing shortcut for speed
00512     // FIXME: definitely makes a "measurable" difference...
00513     //   but is it worth the risk?
00514     if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
00515     {
00516         maybe_autoclose();
00517         return true;
00518     }
00519     
00520     WvTime stoptime = msecadd(wvtime(), msec_timeout);
00521     
00522     // flush outbuf
00523     while (outbuf_was_used && isok())
00524     {
00525 //      fprintf(stderr, "%p: fd:%d/%d, used:%d\n", 
00526 //              this, getrfd(), getwfd(), outbuf.used());
00527         
00528         size_t attempt = outbuf.used();
00529         size_t real = uwrite(outbuf.get(attempt), attempt);
00530         
00531         // WARNING: uwrite() may have messed up our outbuf!
00532         // This probably only happens if uwrite() closed the stream because
00533         // of an error, so we'll check isok().
00534         if (isok() && real < attempt)
00535         {
                  // put the unwritten tail back at the front of outbuf
00536             TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
00537             assert(outbuf.ungettable() >= attempt - real);
00538             outbuf.unget(attempt - real);
00539         }
00540         
00541         // since post_select() can call us, and select() calls post_select(),
00542         // we need to be careful not to call select() if we don't need to!
00543         // post_select() will only call us with msec_timeout==0, and we don't
00544         // need to do select() in that case anyway.
00545         if (!msec_timeout)
00546             break;
              // each select() is given the full msec_timeout rather than the
              // time remaining; stoptime is what bounds further iterations
00547         if (msec_timeout >= 0 
00548           && (stoptime < wvtime() || !select(msec_timeout, false, true)))
00549             break;
00550         
00551         outbuf_was_used = outbuf.used();
00552     }
00553 
00554     // handle autoclose
00555     if (autoclose_time && isok())
00556     {
00557         time_t now = time(NULL);
00558         TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n", 
00559               this, now - autoclose_time, outbuf.used());
              // close once everything has gone out, or once the deadline set
              // by flush_then_close() has passed
00560         if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
00561         {
00562             autoclose_time = 0; // avoid infinite recursion!
00563             close();
00564         }
00565     }
00566 
00567     TRACE("flush_outbuf: after autoclose chunk\n");
00568     if (outbuf_delayed_flush && !outbuf_was_used)
00569         want_to_flush = false;
00570     
00571     TRACE("flush_outbuf: now isok=%d\n", isok());
00572 
00573     // if we can't flush the outbuf, at least empty it!
00574     if (outbuf_was_used && !isok())
00575         outbuf.zap();
00576 
00577     maybe_autoclose();
00578     TRACE("flush_outbuf stops\n");
00579     
00580     return !outbuf_was_used;
00581 }
00582 
00583 
00584 bool WvStream::flush_internal(time_t msec_timeout)
00585 {
          // The base class has no internal buffers besides outbuf, so there
          // is never anything left to flush here; msec_timeout is unused.
00586     // once outbuf emptied, that's it for most streams
00587     return true;
00588 }
00589 
00590 
00591 int WvStream::getrfd() const
00592 {
          // the base class always reports -1 for its read descriptor
00593     return -1;
00594 }
00595 
00596 
00597 int WvStream::getwfd() const
00598 {
          // the base class always reports -1 for its write descriptor
00599     return -1;
00600 }
00601 
00602 
00603 void WvStream::flush_then_close(int msec_timeout)
00604 {
          // Arm the autoclose deadline and start a non-blocking flush.  The
          // close itself happens later, in flush_outbuf(), once the output
          // has drained or the deadline has passed.  The timeout is rounded
          // up to whole seconds because the deadline is kept as a time_t.
00605     const time_t started = time(NULL);
00606     const int whole_secs = (msec_timeout + 999) / 1000;
00607     autoclose_time = started + whole_secs;
00608 
00609     TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n", 
00610             this, outbuf.used(), autoclose_time - started);
00611 
00612     // We could close right here as a fast path, but we deliberately don't:
00613     // flush_then_close() is for obscure situations, and always deferring
00614     // the close makes a caller that forgets to select() afterwards break
00615     // consistently rather than only some of the time.
00616     flush(0);
00617 }
00618 
00619 
00620 bool WvStream::pre_select(SelectInfo &si)
00621 {
          // Prepare si for a select() pass.  Returns true when the stream is
          // already known to be ready: the alarm has expired, or enough
          // buffered input is waiting.  Otherwise si.msec_timeout may be
          // shortened so that select() wakes up in time for the alarm.
00622     maybe_autoclose();
00623 
00624     const time_t alarmleft = alarm_remaining();
00625 
00626     if (!si.inherit_request)
00627     {
00628         if (alarmleft == 0)
00629             return true; // alarm has rung
00630 
              // add the events our own callbacks are interested in
00631         si.wants.readable |= readcb;
00632         si.wants.writable |= writecb;
00633         si.wants.isexception |= exceptcb;
00634     }
00635 
00636     // handle read-ahead buffering
00637     if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00638         return true; // already ready
00639 
          // an alarm is pending: don't let select() sleep past it
00640     if (alarmleft >= 0)
00641     {
00642         const bool alarm_is_sooner = 
00643             (alarmleft < si.msec_timeout || si.msec_timeout < 0);
00644         if (alarm_is_sooner)
00645             si.msec_timeout = alarmleft + 10;
00646     }
00647     return false;
00648 }
00644 
00645 
00646 bool WvStream::post_select(SelectInfo &si)
00647 {
          // Runs after select(): flush opportunistically, then report whether
          // the alarm has gone off.  The alarm only counts when the request
          // was not inherited.
00648     // FIXME: need sane buffer flush support for non FD-based streams
00649     // FIXME: need read_requires_writable and write_requires_readable
00650     //        support for non FD-based streams
00651 
00652     // flush(0) never calls select() (only a non-zero timeout might), so
00653     // calling it from here cannot recurse back into us.
00654     if (should_flush())
00655         flush(0);
00656 
00657     const bool alarm_ticked = !si.inherit_request && alarm_remaining() == 0;
00658     return alarm_ticked;
00659 }
00660 
00661 
00662 bool WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
00663     bool readable, bool writable, bool isexcept, bool forceable)
00664 {
          // Initialise si for one select() pass and run pre_select() on this
          // stream (and on globalstream, for forceable selects).
          //
          // forceable: when true, the wanted events come from the installed
          //            callbacks and the readable/writable/isexcept arguments
          //            are ignored; when false, the arguments are used as
          //            given and the request is marked as inherited.
          // Returns pre_select()'s verdict for this stream, or false if the
          // stream is not ok.
00665     FD_ZERO(&si.read);
00666     FD_ZERO(&si.write);
00667     FD_ZERO(&si.except);
00668     
00669     if (forceable)
00670     {
00671         si.wants.readable = readcb;
00672         si.wants.writable = writecb;
00673         si.wants.isexception = exceptcb;
00674     }
00675     else
00676     {
00677         si.wants.readable = readable;
00678         si.wants.writable = writable;
00679         si.wants.isexception = isexcept;
00680     }
00681     
00682     si.max_fd = -1;
00683     si.msec_timeout = msec_timeout;
00684     si.inherit_request = ! forceable;
00685     si.global_sure = false;
00686 
00687     if (!isok()) return false;
00688 
00689     bool sure = pre_select(si);
00690     if (globalstream && forceable && (globalstream != this))
00691     {
              // globalstream is cleared around the call so that a select()
              // issued from inside it does not come back here
00692         WvStream *s = globalstream;
00693         globalstream = NULL; // prevent recursion
00694         si.global_sure = s->xpre_select(si, SelectRequest(false, false, false));
00695         globalstream = s;
00696     }
          // something is ready already: the real select() must not block
00697     if (sure || si.global_sure)
00698         si.msec_timeout = 0;
00699     return sure;
00700 }
00701 
00702 
00703 int WvStream::_do_select(SelectInfo &si)
00704 {
          // Perform the actual ::select() call described by si and return its
          // result.  A negative si.msec_timeout means "wait forever"; tv is
          // filled in regardless but only passed when the timeout is >= 0.
00705     // prepare timeout
00706     timeval tv;
00707     tv.tv_sec = si.msec_timeout / 1000;
00708     tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00709     
00710 #ifdef _WIN32
00711     // selecting on an empty set of sockets doesn't cause a delay in win32.
          // NOTE(review): the result of socket() is not checked, and the
          // socket is later released with ::close() -- confirm that close()
          // (possibly supplied by streams.h) really frees a Winsock SOCKET.
00712     SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
00713     FD_SET(fakefd, &si.except);
00714 #endif    
00715     
00716     // block
00717     int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00718         si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00719 
00720     // handle errors.
00721     //   EAGAIN and EINTR don't matter because they're totally normal.
00722     //   ENOBUFS is hopefully transient.
00723     //   EBADF is kind of gross and might imply that something is wrong,
00724     //      but it happens sometimes...
00725     if (sel < 0 
00726       && errno != EAGAIN && errno != EINTR 
00727       && errno != EBADF
00728       && errno != ENOBUFS
00729       )
00730     {
              // any other failure is recorded on the stream, which closes it
00731         seterr(errno);
00732     }
00733 #ifdef _WIN32
00734     ::close(fakefd);
00735 #endif
00736     TRACE("select() returned %d\n", sel);
00737     return sel;
00738 }
00739 
00740 
00741 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
00742 {
          // Run post_select() on this stream and, for forceable selects, on
          // globalstream as well.  Returns this stream's own verdict; the
          // global one is accumulated into si.global_sure.
00743     if (!isok())
00744         return false;
00745 
00746     const bool sure = post_select(si);
00747     const bool poll_global =
00748         globalstream && forceable && (globalstream != this);
00749     if (!poll_global)
00750         return sure;
00751 
00752     WvStream *saved = globalstream;
00753     globalstream = NULL; // prevent recursion
00754     const bool global_ready =
              saved->xpost_select(si, SelectRequest(false, false, false));
00755     si.global_sure = global_ready || si.global_sure;
          globalstream = saved;
          return sure;
      }
00756 
00757 
00758 bool WvStream::_select(time_t msec_timeout,
00759     bool readable, bool writable, bool isexcept, bool forceable)
00760 {
          // One complete select cycle: build the SelectInfo, call select(),
          // process the results, and finally run globalstream's callback if
          // it turned out to be ready.  Returns true when this stream is
          // ready, and false if it is (or becomes) not ok.
00761     SelectInfo si;
00762     bool sure = _build_selectinfo(si, msec_timeout,
00763                                   readable, writable, isexcept, forceable);
00764     
00765     if (!isok())
00766         return false;
00767     
00768     // the eternal question: if 'sure' is true already, do we need to do the
00769     // rest of this stuff?  If we do, it might increase fairness a bit, but
00770     // it encourages select()ing when we know something fishy has happened -
00771     // when a stream is !isok() in a list, for example, pre_select() returns
00772     // true.  If that's the case, our SelectInfo structure might not be
00773     // quite right (eg. it might be selecting on invalid fds).  That doesn't
00774     // sound *too* bad, so let's go for the fairness.
00775 
00776     int sel = _do_select(si);
          // _process_selectinfo() is on the left of || so that it always runs,
          // even when 'sure' is already true
00777     if (sel >= 0)
00778         sure = _process_selectinfo(si, forceable) || sure; // note the order
00779     if (si.global_sure && globalstream && forceable && (globalstream != this))
00780         globalstream->callback();
00781     return sure;
00782 }
00783 
00784 
00785 IWvStream::SelectRequest WvStream::get_select_request()
00786 {
          // an event is requested exactly when its callback is set
00787     return IWvStream::SelectRequest(readcb, writecb, exceptcb);
00788 }
00789 
00790 
00791 void WvStream::force_select(bool readable, bool writable, bool isexception)
00792 {
          // For each flag that is true, bind the matching callback slot to
          // legacy_callback, replacing whatever was installed there.  Slots
          // whose flag is false are left untouched.
00793     if (readable)
00794         readcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00795     if (writable)
00796         writecb = IWvStreamCallback(this, &WvStream::legacy_callback);
00797     if (isexception)
00798         exceptcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00799 }
00800 
00801 
00802 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
00803 {
          // For each flag that is true, clear the matching callback slot,
          // whatever callback it held.  Slots whose flag is false are left
          // untouched.
00804     if (readable)
00805         readcb = 0;
00806     if (writable)
00807         writecb = 0;
00808     if (isexception)
00809         exceptcb = 0;
00810 }
00811 
00812 
00813 void WvStream::alarm(time_t msec_timeout)
00814 {
          // A negative timeout cancels the alarm; anything else (re)arms it
          // to go off msec_timeout milliseconds from now.
00815     if (msec_timeout < 0)
00816         alarm_time = wvtime_zero;
00817     else
00818         alarm_time = msecadd(wvtime(), msec_timeout);
00819 }
00820 
00821 
00822 time_t WvStream::alarm_remaining()
00823 {
          // Milliseconds left until the alarm goes off: 0 when it is already
          // due, -1 when no alarm is set (alarm_time.tv_sec == 0).
00824     if (!alarm_time.tv_sec)
00825         return -1;
00826 
00827     WvTime current = wvtime();
00828 
          // Time just plain goes backwards on some systems.  When it does,
          // move the alarm back by the size of the jump; no warning is
          // printed.
00829     if (current < last_alarm_check)
00830         alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, current));
00831 
00832     last_alarm_check = current;
00833 
00834     const time_t left = msecdiff(alarm_time, current);
00835     return (left < 0) ? 0 : left;
00836 }
00851 
00852 
00853 bool WvStream::continue_select(time_t msec_timeout)
00854 {
          // Suspend the current continuable callback until the stream is
          // called back again or msec_timeout milliseconds have passed
          // (negative: no timeout).  Returns false only when the wake-up was
          // caused by the alarm and a zero-timeout select() finds nothing
          // ready.  Must be called from inside a callback running under
          // WvCont (both conditions are asserted).
00855     assert(uses_continue_select);
00856     
00857     // if this assertion triggers, you probably tried to do continue_select()
00858     // while inside terminate_continue_select().
00859     assert(call_ctx);
00860     
          // A single call is enough: alarm() arms the timer for values >= 0
          // and clears it for negative ones.
00864     alarm(msec_timeout);
00865     WvCont::yield();
00866     alarm(-1); // cancel the still-pending alarm, or it might go off later!
00867     
00868     // when we get here, someone has jumped back into our task.
00869     // We have to select(0) here because it's possible that the alarm was 
00870     // ticking _and_ data was available.  This is aggravated especially if
00871     // msec_timeout was zero.  Note that running select() here isn't
00872     // inefficient, because if the alarm was expired then pre_select()
00873     // returned true anyway and short-circuited the previous select().
00874     TRACE("hello-%p\n", this);
00875     return !alarm_was_ticking || select(0, readcb, writecb, exceptcb);
00876 }
00877 
00878 
00879 void WvStream::terminate_continue_select()
00880 {
          // Close the stream first, then drop the continuation context.
00881     close();
00882     call_ctx = 0; // destroy the context, if necessary
00883 }
00884 
00885 
00886 const WvAddr *WvStream::src() const
00887 {
          // the base class has no address to report
00888     return NULL;
00889 }
00890 
00891 
00892 void WvStream::setcallback(WvStreamCallback _callfunc, void *_userdata)
00893 { 
          // Install the user callback and the opaque pointer handed to it.
          // Any continuation still in progress for the previous callback is
          // discarded.
00894     callfunc = _callfunc;
00895     userdata = _userdata;
00896     call_ctx = 0; // delete any in-progress WvCont
00897 }
00898 
00899 
00900 void WvStream::legacy_callback(IWvStream& s)
00901 {
          // Bridge from the IWvStreamCallback slots to the older
          // callfunc/userdata callback: run execute(), then the user callback
          // if one is set.  The s argument is not used.
00902     execute();
00903     if (!callfunc) return;
00904     callfunc(*this, userdata);
00905 }
00906 
00907 
00908 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
00909 {
          // swap in the new read callback and hand the old one back
00910     IWvStreamCallback previous = readcb;
00911     readcb = _callback;
00912     return previous;
00913 }
00916 
00917 
00918 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
00919 {
          // swap in the new write callback and hand the old one back
00920     IWvStreamCallback previous = writecb;
00921     writecb = _callback;
00922     return previous;
00923 }
00926 
00927 
00928 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
00929 {
          // swap in the new exception callback and hand the old one back
00930     IWvStreamCallback previous = exceptcb;
00931     exceptcb = _callback;
00932     return previous;
00933 }
00936 
00937 
00938 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
00939 {
          // Install the callback to run when the stream closes and return the
          // one it replaces.  If the stream is already not ok, nothing is
          // stored; the new callback (if set) is invoked right away instead.
00940     IWvStreamCallback previous = closecb;
00941     if (isok())
00942     {
00943         closecb = _callback;
00944         return previous;
00945     }
00946     // already closed?  notify immediately!
00947     closecb = 0;
00948     if (!!_callback)
00949         _callback(*this);
00950     return previous;
00951 }
00952 
00953 
00954 void WvStream::unread(WvBuf &unreadbuf, size_t count)
00955 {
          // Push count bytes from unreadbuf back in front of whatever is
          // already queued in inbuf, so the next read sees them first.
00956     WvDynBuf rebuilt;
00957     rebuilt.merge(unreadbuf, count); // the returned bytes go first...
00958     rebuilt.merge(inbuf);            // ...followed by the old contents
00959     inbuf.zap();
00960     inbuf.merge(rebuilt);
00961 }

Generated on Tue May 24 05:22:39 2005 for WvStreams by doxygen 1.2.15