00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include <time.h>
00012 #include <sys/types.h>
00013 #include <assert.h>
00014 #define __WVSTREAM_UNIT_TEST 1
00015 #include "wvstream.h"
00016 #include "wvtimeutils.h"
00017 #include "wvcont.h"
00018
00019 #ifdef _WIN32
00020 #define ENOBUFS WSAENOBUFS
00021 #undef errno
00022 #define errno GetLastError()
00023 #ifdef __GNUC__
00024 #include <sys/socket.h>
00025 #endif
00026 #include "streams.h"
00027 #else
00028 #include <errno.h>
00029 #endif
00030
00031
00032
// Debug tracing helper.  Change the "#if 0" below to "#if 1" to enable it:
// TRACE() then prints to stderr (or goes through printf on MSVC).  When
// disabled, TRACE() expands to nothing.
#if 0
# ifndef _MSC_VER
// Wrapped in do { } while (0) so the two calls form a single statement;
// otherwise "if (x) TRACE(...); else ..." would not parse as intended.
#  define TRACE(x, y...) \
    do { fprintf(stderr, x, ## y); fflush(stderr); } while (0)
# else
#  define TRACE printf
# endif
#else
# ifndef _MSC_VER
#  define TRACE(x, y...)
# else
#  define TRACE
# endif
#endif
00046
// Definition of the static WvStream::globalstream pointer; it starts out
// NULL.  The select helpers in this file consult it whenever it is set.
WvStream *WvStream::globalstream = NULL;

// UUID map for WvStream, listing IObject and IWvStream.
// NOTE(review): presumably this declares which interfaces a WvStream can be
// queried for -- confirm against the UUID_MAP_* macro definitions.
UUID_MAP_BEGIN(WvStream)
  UUID_MAP_ENTRY(IObject)
  UUID_MAP_ENTRY(IWvStream)
  UUID_MAP_END
00053
00054
00055 WvStream::WvStream():
00056 read_requires_writable(NULL),
00057 write_requires_readable(NULL),
00058 uses_continue_select(false),
00059 personal_stack_size(65536),
00060 alarm_was_ticking(false),
00061 stop_read(false),
00062 stop_write(false),
00063 closed(false),
00064 userdata(NULL),
00065 readcb(this, &WvStream::legacy_callback),
00066 max_outbuf_size(0),
00067 outbuf_delayed_flush(false),
00068 is_auto_flush(true),
00069 want_to_flush(true),
00070 is_flushing(false),
00071 queue_min(0),
00072 autoclose_time(0),
00073 alarm_time(wvtime_zero),
00074 last_alarm_check(wvtime_zero)
00075 {
00076 TRACE("Creating wvstream %p\n", this);
00077
00078 #ifdef _WIN32
00079 WSAData wsaData;
00080 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
00081 assert(result == 0);
00082 #endif
00083 }
00084
00085
00086
00087 IWvStream::IWvStream()
00088 {
00089 }
00090
00091
00092 IWvStream::~IWvStream()
00093 {
00094 }
00095
00096
00097 WvStream::~WvStream()
00098 {
00099 TRACE("destroying %p\n", this);
00100 close();
00101
00102
00103
00104
00105 assert(!uses_continue_select || !call_ctx);
00106
00107 call_ctx = 0;
00108 TRACE("done destroying %p\n", this);
00109 }
00110
00111
00112 void WvStream::close()
00113 {
00114 TRACE("flushing in wvstream...\n");
00115 flush(2000);
00116 TRACE("(flushed)\n");
00117
00118 closed = true;
00119
00120 if (!!closecb)
00121 {
00122 IWvStreamCallback cb = closecb;
00123 closecb = 0;
00124 cb(*this);
00125 }
00126
00127
00128
00129
00130 }
00131
00132
00133 void WvStream::autoforward(WvStream &s)
00134 {
00135 setcallback(autoforward_callback, &s);
00136 read_requires_writable = &s;
00137 }
00138
00139
00140 void WvStream::noautoforward()
00141 {
00142 setcallback(0, NULL);
00143 read_requires_writable = NULL;
00144 }
00145
00146
00147 void WvStream::autoforward_callback(WvStream &s, void *userdata)
00148 {
00149 WvStream &s2 = *(WvStream *)userdata;
00150 char buf[1024];
00151 size_t len;
00152
00153 len = s.read(buf, sizeof(buf));
00154
00155 s2.write(buf, len);
00156 }
00157
00158
00159 void WvStream::_callback()
00160 {
00161 execute();
00162 if (!! callfunc)
00163 callfunc(*this, userdata);
00164 }
00165
00166
00167 void *WvStream::_callwrap(void *)
00168 {
00169 _callback();
00170 return NULL;
00171 }
00172
00173
00174 void WvStream::callback()
00175 {
00176 TRACE("(?)");
00177
00178
00179 if (alarm_remaining() == 0)
00180 {
00181 alarm_time = wvtime_zero;
00182 alarm_was_ticking = true;
00183 }
00184 else
00185 alarm_was_ticking = false;
00186
00187 assert(!uses_continue_select || personal_stack_size >= 1024);
00188
00189 #define TEST_CONTINUES_HARSHLY 0
00190 #if TEST_CONTINUES_HARSHLY
00191 #ifndef _WIN32
00192 # warning "Using WvCont for *all* streams for testing!"
00193 #endif
00194 if (1)
00195 #else
00196 if (uses_continue_select && personal_stack_size >= 1024)
00197 #endif
00198 {
00199 if (!call_ctx)
00200 {
00201 call_ctx = WvCont(WvCallback<void*,void*>
00202 (this, &WvStream::_callwrap),
00203 personal_stack_size);
00204 }
00205
00206 call_ctx(NULL);
00207 }
00208 else
00209 _callback();
00210
00211
00212
00213
00214
00215
00216 }
00217
00218
00219 bool WvStream::isok() const
00220 {
00221 return !closed && WvErrorBase::isok();
00222 }
00223
00224
00225 void WvStream::seterr(int _errnum)
00226 {
00227 if (!geterr())
00228 {
00229 WvErrorBase::seterr(_errnum);
00230 close();
00231 }
00232 }
00233
00234
00235 size_t WvStream::read(WvBuf &outbuf, size_t count)
00236 {
00237
00238 size_t free = outbuf.free();
00239 if (count > free)
00240 count = free;
00241
00242 WvDynBuf tmp;
00243 unsigned char *buf = tmp.alloc(count);
00244 size_t len = read(buf, count);
00245 tmp.unalloc(count - len);
00246 outbuf.merge(tmp);
00247 return len;
00248 }
00249
00250
00251 size_t WvStream::write(WvBuf &inbuf, size_t count)
00252 {
00253
00254 size_t avail = inbuf.used();
00255 if (count > avail)
00256 count = avail;
00257 const unsigned char *buf = inbuf.get(count);
00258 size_t len = write(buf, count);
00259 inbuf.unget(count - len);
00260 return len;
00261 }
00262
00263
00264 size_t WvStream::read(void *buf, size_t count)
00265 {
00266 size_t bufu, i;
00267 unsigned char *newbuf;
00268
00269 bufu = inbuf.used();
00270 if (bufu < queue_min)
00271 {
00272 newbuf = inbuf.alloc(queue_min - bufu);
00273 i = uread(newbuf, queue_min - bufu);
00274 inbuf.unalloc(queue_min - bufu - i);
00275
00276 bufu = inbuf.used();
00277 }
00278
00279 if (bufu < queue_min)
00280 {
00281 maybe_autoclose();
00282 return 0;
00283 }
00284
00285
00286 if (!bufu)
00287 bufu = uread(buf, count);
00288 else
00289 {
00290
00291 if (bufu > count)
00292 bufu = count;
00293
00294 memcpy(buf, inbuf.get(bufu), bufu);
00295 }
00296
00297 TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
00298 maybe_autoclose();
00299 return bufu;
00300 }
00301
00302
00303 size_t WvStream::write(const void *buf, size_t count)
00304 {
00305 if (!isok() || !buf || !count || stop_write) return 0;
00306
00307 size_t wrote = 0;
00308 if (!outbuf_delayed_flush && !outbuf.used())
00309 {
00310 wrote = uwrite(buf, count);
00311 count -= wrote;
00312 buf = (const unsigned char *)buf + wrote;
00313
00314 }
00315 if (max_outbuf_size != 0)
00316 {
00317 size_t canbuffer = max_outbuf_size - outbuf.used();
00318 if (count > canbuffer)
00319 count = canbuffer;
00320 }
00321 if (count != 0)
00322 {
00323 outbuf.put(buf, count);
00324 wrote += count;
00325 }
00326
00327 if (should_flush())
00328 {
00329 if (is_auto_flush)
00330 flush(0);
00331 else
00332 flush_outbuf(0);
00333 }
00334
00335 return wrote;
00336 }
00337
00338
00339 void WvStream::noread()
00340 {
00341 stop_read = true;
00342 maybe_autoclose();
00343 }
00344
00345
00346 void WvStream::nowrite()
00347 {
00348 stop_write = true;
00349 maybe_autoclose();
00350 }
00351
00352
00353 void WvStream::maybe_autoclose()
00354 {
00355 if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
00356 close();
00357 }
00358
00359
00360 bool WvStream::isreadable()
00361 {
00362 return isok() && select(0, true, false, false);
00363 }
00364
00365
00366 bool WvStream::iswritable()
00367 {
00368 return !stop_write && isok() && select(0, false, true, false);
00369 }
00370
00371
00372 char *WvStream::blocking_getline(time_t wait_msec, int separator,
00373 int readahead)
00374 {
00375
00376
00377
00378
00379
00380
00381
00382 assert(separator >= 0);
00383 assert(separator <= 255);
00384
00385
00386
00387 struct timeval timeout_time;
00388 if (wait_msec > 0)
00389 timeout_time = msecadd(wvtime(), wait_msec);
00390
00391 maybe_autoclose();
00392
00393
00394
00395 while (isok())
00396 {
00397
00398 queuemin(0);
00399
00400
00401 if (inbuf.strchr(separator) > 0)
00402 break;
00403 else if (!isok() || stop_read)
00404 break;
00405
00406
00407 queuemin(inbuf.used() + 1);
00408
00409
00410 if (wait_msec > 0)
00411 {
00412 wait_msec = msecdiff(timeout_time, wvtime());
00413 if (wait_msec < 0)
00414 wait_msec = 0;
00415 }
00416
00417
00418
00419 bool hasdata;
00420 if (wait_msec != 0 && uses_continue_select)
00421 hasdata = continue_select(wait_msec);
00422 else
00423 hasdata = select(wait_msec, true, false);
00424
00425 if (!isok())
00426 break;
00427
00428 if (hasdata)
00429 {
00430
00431 WvDynBuf tmp;
00432 unsigned char *buf = tmp.alloc(readahead);
00433 size_t len = uread(buf, readahead);
00434 tmp.unalloc(readahead - len);
00435 inbuf.merge(tmp);
00436 hasdata = len > 0;
00437 }
00438
00439 if (!isok())
00440 break;
00441
00442 if (!hasdata && wait_msec == 0)
00443 return NULL;
00444 }
00445 if (!inbuf.used())
00446 return NULL;
00447
00448
00449 size_t i = 0;
00450 i = inbuf.strchr(separator);
00451 if (i > 0) {
00452 char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
00453 assert(eol);
00454 *eol = 0;
00455 return const_cast<char*>((const char *)inbuf.get(i));
00456 } else {
00457
00458
00459
00460 inbuf.alloc(1)[0] = 0;
00461 return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
00462 }
00463 }
00464
00465
00466 char *WvStream::continue_getline(time_t wait_msec, int separator,
00467 int readahead)
00468 {
00469 assert(false && "not implemented, come back later!");
00470 assert(uses_continue_select);
00471 return NULL;
00472 }
00473
00474
00475 void WvStream::drain()
00476 {
00477 char buf[1024];
00478 while (isreadable())
00479 read(buf, sizeof(buf));
00480 }
00481
00482
00483 bool WvStream::flush(time_t msec_timeout)
00484 {
00485 if (is_flushing) return false;
00486
00487 TRACE("%p flush starts\n", this);
00488
00489 is_flushing = true;
00490 want_to_flush = true;
00491 bool done = flush_internal(msec_timeout)
00492 && flush_outbuf(msec_timeout);
00493 is_flushing = false;
00494
00495 TRACE("flush stops (%d)\n", done);
00496 return done;
00497 }
00498
00499
00500 bool WvStream::should_flush()
00501 {
00502 return want_to_flush;
00503 }
00504
00505
00506 bool WvStream::flush_outbuf(time_t msec_timeout)
00507 {
00508 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
00509 bool outbuf_was_used = outbuf.used();
00510
00511
00512
00513
00514 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
00515 {
00516 maybe_autoclose();
00517 return true;
00518 }
00519
00520 WvTime stoptime = msecadd(wvtime(), msec_timeout);
00521
00522
00523 while (outbuf_was_used && isok())
00524 {
00525
00526
00527
00528 size_t attempt = outbuf.used();
00529 size_t real = uwrite(outbuf.get(attempt), attempt);
00530
00531
00532
00533
00534 if (isok() && real < attempt)
00535 {
00536 TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
00537 assert(outbuf.ungettable() >= attempt - real);
00538 outbuf.unget(attempt - real);
00539 }
00540
00541
00542
00543
00544
00545 if (!msec_timeout)
00546 break;
00547 if (msec_timeout >= 0
00548 && (stoptime < wvtime() || !select(msec_timeout, false, true)))
00549 break;
00550
00551 outbuf_was_used = outbuf.used();
00552 }
00553
00554
00555 if (autoclose_time && isok())
00556 {
00557 time_t now = time(NULL);
00558 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
00559 this, now - autoclose_time, outbuf.used());
00560 if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
00561 {
00562 autoclose_time = 0;
00563 close();
00564 }
00565 }
00566
00567 TRACE("flush_outbuf: after autoclose chunk\n");
00568 if (outbuf_delayed_flush && !outbuf_was_used)
00569 want_to_flush = false;
00570
00571 TRACE("flush_outbuf: now isok=%d\n", isok());
00572
00573
00574 if (outbuf_was_used && !isok())
00575 outbuf.zap();
00576
00577 maybe_autoclose();
00578 TRACE("flush_outbuf stops\n");
00579
00580 return !outbuf_was_used;
00581 }
00582
00583
00584 bool WvStream::flush_internal(time_t msec_timeout)
00585 {
00586
00587 return true;
00588 }
00589
00590
00591 int WvStream::getrfd() const
00592 {
00593 return -1;
00594 }
00595
00596
00597 int WvStream::getwfd() const
00598 {
00599 return -1;
00600 }
00601
00602
00603 void WvStream::flush_then_close(int msec_timeout)
00604 {
00605 time_t now = time(NULL);
00606 autoclose_time = now + (msec_timeout + 999) / 1000;
00607
00608 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
00609 this, outbuf.used(), autoclose_time - now);
00610
00611
00612
00613
00614
00615
00616 flush(0);
00617 }
00618
00619
00620 bool WvStream::pre_select(SelectInfo &si)
00621 {
00622 maybe_autoclose();
00623
00624 time_t alarmleft = alarm_remaining();
00625
00626 if (!si.inherit_request && alarmleft == 0)
00627 return true;
00628
00629 if (!si.inherit_request)
00630 {
00631 si.wants.readable |= readcb;
00632 si.wants.writable |= writecb;
00633 si.wants.isexception |= exceptcb;
00634 }
00635
00636
00637 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00638 return true;
00639 if (alarmleft >= 0
00640 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00641 si.msec_timeout = alarmleft + 10;
00642 return false;
00643 }
00644
00645
00646 bool WvStream::post_select(SelectInfo &si)
00647 {
00648
00649
00650
00651
00652
00653
00654 if (should_flush())
00655 flush(0);
00656 if (!si.inherit_request && alarm_remaining() == 0)
00657 return true;
00658 return false;
00659 }
00660
00661
00662 bool WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
00663 bool readable, bool writable, bool isexcept, bool forceable)
00664 {
00665 FD_ZERO(&si.read);
00666 FD_ZERO(&si.write);
00667 FD_ZERO(&si.except);
00668
00669 if (forceable)
00670 {
00671 si.wants.readable = readcb;
00672 si.wants.writable = writecb;
00673 si.wants.isexception = exceptcb;
00674 }
00675 else
00676 {
00677 si.wants.readable = readable;
00678 si.wants.writable = writable;
00679 si.wants.isexception = isexcept;
00680 }
00681
00682 si.max_fd = -1;
00683 si.msec_timeout = msec_timeout;
00684 si.inherit_request = ! forceable;
00685 si.global_sure = false;
00686
00687 if (!isok()) return false;
00688
00689 bool sure = pre_select(si);
00690 if (globalstream && forceable && (globalstream != this))
00691 {
00692 WvStream *s = globalstream;
00693 globalstream = NULL;
00694 si.global_sure = s->xpre_select(si, SelectRequest(false, false, false));
00695 globalstream = s;
00696 }
00697 if (sure || si.global_sure)
00698 si.msec_timeout = 0;
00699 return sure;
00700 }
00701
00702
00703 int WvStream::_do_select(SelectInfo &si)
00704 {
00705
00706 timeval tv;
00707 tv.tv_sec = si.msec_timeout / 1000;
00708 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00709
00710 #ifdef _WIN32
00711
00712 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
00713 FD_SET(fakefd, &si.except);
00714 #endif
00715
00716
00717 int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00718 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00719
00720
00721
00722
00723
00724
00725 if (sel < 0
00726 && errno != EAGAIN && errno != EINTR
00727 && errno != EBADF
00728 && errno != ENOBUFS
00729 )
00730 {
00731 seterr(errno);
00732 }
00733 #ifdef _WIN32
00734 ::close(fakefd);
00735 #endif
00736 TRACE("select() returned %d\n", sel);
00737 return sel;
00738 }
00739
00740
00741 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
00742 {
00743 if (!isok()) return false;
00744
00745 bool sure = post_select(si);
00746 if (globalstream && forceable && (globalstream != this))
00747 {
00748 WvStream *s = globalstream;
00749 globalstream = NULL;
00750 si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
00751 || si.global_sure;
00752 globalstream = s;
00753 }
00754 return sure;
00755 }
00756
00757
00758 bool WvStream::_select(time_t msec_timeout,
00759 bool readable, bool writable, bool isexcept, bool forceable)
00760 {
00761 SelectInfo si;
00762 bool sure = _build_selectinfo(si, msec_timeout,
00763 readable, writable, isexcept, forceable);
00764
00765 if (!isok())
00766 return false;
00767
00768
00769
00770
00771
00772
00773
00774
00775
00776 int sel = _do_select(si);
00777 if (sel >= 0)
00778 sure = _process_selectinfo(si, forceable) || sure;
00779 if (si.global_sure && globalstream && forceable && (globalstream != this))
00780 globalstream->callback();
00781 return sure;
00782 }
00783
00784
00785 IWvStream::SelectRequest WvStream::get_select_request()
00786 {
00787 return IWvStream::SelectRequest(readcb, writecb, exceptcb);
00788 }
00789
00790
00791 void WvStream::force_select(bool readable, bool writable, bool isexception)
00792 {
00793 if (readable)
00794 readcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00795 if (writable)
00796 writecb = IWvStreamCallback(this, &WvStream::legacy_callback);
00797 if (isexception)
00798 exceptcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00799 }
00800
00801
00802 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
00803 {
00804 if (readable)
00805 readcb = 0;
00806 if (writable)
00807 writecb = 0;
00808 if (isexception)
00809 exceptcb = 0;
00810 }
00811
00812
00813 void WvStream::alarm(time_t msec_timeout)
00814 {
00815 if (msec_timeout >= 0)
00816 alarm_time = msecadd(wvtime(), msec_timeout);
00817 else
00818 alarm_time = wvtime_zero;
00819 }
00820
00821
00822 time_t WvStream::alarm_remaining()
00823 {
00824 if (alarm_time.tv_sec)
00825 {
00826 WvTime now = wvtime();
00827
00828
00829 if (now < last_alarm_check)
00830 {
00831 #if 0 // okay, I give up. Time just plain goes backwards on some systems.
00832
00833 if (msecdiff(last_alarm_check, now) > 200)
00834 fprintf(stderr, " ************* TIME WENT BACKWARDS! "
00835 "(%ld:%ld %ld:%ld)\n",
00836 last_alarm_check.tv_sec, last_alarm_check.tv_usec,
00837 now.tv_sec, now.tv_usec);
00838 #endif
00839 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
00840 }
00841
00842 last_alarm_check = now;
00843
00844 time_t remaining = msecdiff(alarm_time, now);
00845 if (remaining < 0)
00846 remaining = 0;
00847 return remaining;
00848 }
00849 return -1;
00850 }
00851
00852
00853 bool WvStream::continue_select(time_t msec_timeout)
00854 {
00855 assert(uses_continue_select);
00856
00857
00858
00859 assert(call_ctx);
00860
00861 if (msec_timeout >= 0)
00862 alarm(msec_timeout);
00863
00864 alarm(msec_timeout);
00865 WvCont::yield();
00866 alarm(-1);
00867
00868
00869
00870
00871
00872
00873
00874 TRACE("hello-%p\n", this);
00875 return !alarm_was_ticking || select(0, readcb, writecb, exceptcb);
00876 }
00877
00878
00879 void WvStream::terminate_continue_select()
00880 {
00881 close();
00882 call_ctx = 0;
00883 }
00884
00885
00886 const WvAddr *WvStream::src() const
00887 {
00888 return NULL;
00889 }
00890
00891
00892 void WvStream::setcallback(WvStreamCallback _callfunc, void *_userdata)
00893 {
00894 callfunc = _callfunc;
00895 userdata = _userdata;
00896 call_ctx = 0;
00897 }
00898
00899
00900 void WvStream::legacy_callback(IWvStream& s)
00901 {
00902 execute();
00903 if (!! callfunc)
00904 callfunc(*this, userdata);
00905 }
00906
00907
00908 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
00909 {
00910 IWvStreamCallback tmp = readcb;
00911
00912 readcb = _callback;
00913
00914 return tmp;
00915 }
00916
00917
00918 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
00919 {
00920 IWvStreamCallback tmp = writecb;
00921
00922 writecb = _callback;
00923
00924 return tmp;
00925 }
00926
00927
00928 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
00929 {
00930 IWvStreamCallback tmp = exceptcb;
00931
00932 exceptcb = _callback;
00933
00934 return tmp;
00935 }
00936
00937
00938 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
00939 {
00940 IWvStreamCallback tmp = closecb;
00941 if (isok())
00942 closecb = _callback;
00943 else
00944 {
00945
00946 closecb = 0;
00947 if (!!_callback)
00948 _callback(*this);
00949 }
00950 return tmp;
00951 }
00952
00953
00954 void WvStream::unread(WvBuf &unreadbuf, size_t count)
00955 {
00956 WvDynBuf tmp;
00957 tmp.merge(unreadbuf, count);
00958 tmp.merge(inbuf);
00959 inbuf.zap();
00960 inbuf.merge(tmp);
00961 }