@@ -250,7 +250,7 @@ manapi::net::worker::shared_conn manapi::net::worker::TCP::accept (ev::shared_tc
250
250
return ;
251
251
252
252
if (nread < 0 ) {
253
- if (errno == EAGAIN || errno == EWOULDBLOCK )
253
+ if (nread == ev::FS_EAGAIN )
254
254
return ;
255
255
256
256
/* maybe EOF */
@@ -461,7 +461,7 @@ ssize_t manapi::net::worker::TCP::sync_write_ex(const worker::shared_conn &conn,
461
461
rhs = connection->watcher ->try_write (buff, nbuff);
462
462
463
463
if (rhs < 0 )
464
- if (errno == EAGAIN || errno == EWOULDBLOCK )
464
+ if (rhs == ev::FS_EAGAIN )
465
465
rhs = 0 ;
466
466
else
467
467
return -1 ;
@@ -569,115 +569,119 @@ int manapi::net::worker::TCP::event_flags(const shared_conn & conn) {
569
569
int manapi::net::worker::TCP::flush_write_ (const worker::shared_conn &connection, bool flush) {
570
570
auto conn = connection->as <connection_interface>();
571
571
572
- while ((conn->top ->cur_send_size >= this ->config_ ->max_merge_buffer_stack )
573
- // || ((conn->top->cur_send_size == this->config_->max_merge_buffer_stack) && (conn->top->send.last_deque->buffer.size() == conn->top->send.deque_cursor))
574
- || (flush && conn->top ->cur_send_size )) {
572
+ if (conn->top ->cur_send_size ) {
573
+ std::cout << " flush " << flush << " " <<(bool )conn->top ->cur_send_size << " " << (bool )conn->top ->send .deque << " \n " ;
574
+ while (conn->top ->cur_send_size && ((conn->top ->cur_send_size >= this ->config_ ->max_merge_buffer_stack )
575
+ // || ((conn->top->cur_send_size == this->config_->max_merge_buffer_stack) && (conn->top->send.last_deque->buffer.size() == conn->top->send.deque_cursor))
576
+ || (flush))) {
575
577
576
- std::unique_ptr<ev::buff_t , ev::buffer_deleter> s;
577
- s.reset (new ev::buff_t [conn->top ->cur_send_size ]);
578
+ std::unique_ptr<ev::buff_t , ev::buffer_deleter> s;
579
+ s.reset (new ev::buff_t [conn->top ->cur_send_size ]);
578
580
579
- auto const buffptr = s.get ();
580
- std::unique_ptr<buffer_deque> sent = std::move (conn->top ->send .deque );
581
- auto current = sent.get ();
582
- ssize_t request = 0 ;
581
+ auto const buffptr = s.get ();
582
+ std::unique_ptr<buffer_deque> sent = std::move (conn->top ->send .deque );
583
+ auto current = sent.get ();
584
+ ssize_t request = 0 ;
583
585
584
- for (int i = 0 ; i < conn->top ->cur_send_size ; i++) {
585
- auto &object = current->buffer ;
586
+ for (int i = 0 ; i < conn->top ->cur_send_size ; i++) {
587
+ auto &object = current->buffer ;
586
588
587
- if (current == conn->top ->send .last_deque ) {
588
- conn->top ->send .last_deque = nullptr ;
589
- object.resize (conn->top ->send .deque_cursor );
590
- conn->top ->send .deque_cursor = 0 ;
591
- }
592
-
593
- if (conn->top ->send .deque_current ) {
594
- object.shift_add (conn->top ->send .deque_current );
595
- conn->top ->send .deque_current = 0 ;
596
- }
597
-
598
- buffptr[i].base = object.data ();
599
- buffptr[i].len = object.size ();
589
+ if (current == conn->top ->send .last_deque ) {
590
+ conn->top ->send .last_deque = nullptr ;
591
+ object.resize (conn->top ->send .deque_cursor );
592
+ conn->top ->send .deque_cursor = 0 ;
593
+ }
600
594
601
- request += buffptr[i].len ;
595
+ if (conn->top ->send .deque_current ) {
596
+ object.shift_add (conn->top ->send .deque_current );
597
+ conn->top ->send .deque_current = 0 ;
598
+ }
602
599
603
- if (i + 1 != conn->top ->cur_send_size )
604
- current = current->next .get ();
605
- }
600
+ buffptr[i].base = object.data ();
601
+ buffptr[i].len = object.size ();
606
602
607
- if (current && current->next )
608
- conn->top ->send .deque = std::move (current->next );
603
+ request += buffptr[i].len ;
609
604
610
- ssize_t rhs = conn->watcher ->try_write (buffptr, conn->top ->cur_send_size );
611
- if (request == rhs) {
612
- conn->top ->send_size -= conn->top ->cur_send_size ;
613
- conn->top ->cur_send_size = 0 ;
614
- }
615
- else {
616
- if (rhs < 0 ) {
617
- if (errno == EAGAIN || errno == EWOULDBLOCK)
618
- rhs = 0 ;
619
- else {
620
- conn->top ->send_size -= conn->top ->cur_send_size ;
621
- conn->top ->cur_send_size = 0 ;
622
- return CONN_IO_ERROR;
623
- }
605
+ if (i + 1 != conn->top ->cur_send_size )
606
+ current = current->next .get ();
624
607
}
625
608
609
+ if (current && current->next )
610
+ conn->top ->send .deque = std::move (current->next );
626
611
627
- uint32_t cursor = 0 ;
628
- while (cursor != conn->top ->cur_send_size
629
- && rhs >= buffptr[cursor].len ) {
630
- rhs -= static_cast <ssize_t >(buffptr[cursor].len );
631
- sent = std::move (sent->next );
632
- cursor++;
612
+ ssize_t rhs = conn->watcher ->try_write (buffptr, conn->top ->cur_send_size );
613
+ if (request == rhs) {
614
+ conn->top ->send_size -= conn->top ->cur_send_size ;
615
+ conn->top ->cur_send_size = 0 ;
633
616
}
617
+ else {
618
+ if (rhs < 0 ) {
619
+ if (rhs == ev::FS_EAGAIN)
620
+ rhs = 0 ;
621
+ else {
622
+ conn->top ->send_size -= conn->top ->cur_send_size ;
623
+ conn->top ->cur_send_size = 0 ;
624
+ return CONN_IO_ERROR;
625
+ }
626
+ }
634
627
635
- conn->top ->cur_send_size -= cursor;
636
- conn->top ->send_size -= cursor;
637
-
638
- if (rhs && sent) {
639
- sent->buffer .shift_add (rhs);
640
- buffptr[cursor].base += rhs;
641
- buffptr[cursor].len -= rhs;
642
- }
643
628
644
- if (conn->top ->cur_send_size >= this ->config_ ->max_merge_buffer_stack ) {
645
- auto w = manapi::async::current ()->eventloop ()
646
- ->create_watcher_write (conn->watcher .get (), [connection, b = std::move (sent), s = std::move (s)]
647
- (std::shared_ptr<ev::write> &w, int status)
648
- mutable -> void {
649
- auto conn = connection->as <connection_interface>();
629
+ uint32_t cursor = 0 ;
630
+ while (cursor != conn->top ->cur_send_size
631
+ && rhs >= buffptr[cursor].len ) {
632
+ rhs -= static_cast <ssize_t >(buffptr[cursor].len );
633
+ sent = std::move (sent->next );
634
+ cursor++;
635
+ }
650
636
651
- conn->top ->send_size -= w->custom ()->nbufs ;
652
- s.reset ();
653
- b.reset ();
637
+ conn->top ->cur_send_size -= cursor;
638
+ conn->top ->send_size -= cursor;
654
639
655
- if (status) {
656
- /* error */
657
- conn->status |= ev::DISCONNECT;
658
- connection->cancellation .cancel ();
640
+ if (rhs && sent) {
641
+ sent->buffer .shift_add (rhs);
642
+ buffptr[cursor].base += rhs;
643
+ buffptr[cursor].len -= rhs;
644
+ }
659
645
660
- if (conn->ev_callback )
661
- conn->ev_callback ->operator ()(connection, ev::DISCONNECT, nullptr , 0 , nullptr );
662
- }
663
- else {
664
- if (conn->status & ev::WRITE && conn->ev_callback )
665
- conn->ev_callback ->operator ()(connection, ev::WRITE, nullptr , 0 , nullptr );
666
- }
646
+ if (conn->top ->cur_send_size
647
+ && (conn->top ->cur_send_size >= this ->config_ ->max_merge_buffer_stack || flush)) {
648
+ auto w = manapi::async::current ()->eventloop ()
649
+ ->create_watcher_write (conn->watcher .get (), [connection, b = std::move (sent), s = std::move (s)]
650
+ (std::shared_ptr<ev::write> &w, int status)
651
+ mutable -> void {
652
+ auto conn = connection->as <connection_interface>();
653
+
654
+ conn->top ->send_size -= w->custom ()->nbufs ;
655
+ s.reset ();
656
+ b.reset ();
657
+
658
+ if (status) {
659
+ /* error */
660
+ conn->status |= ev::DISCONNECT;
661
+ connection->cancellation .cancel ();
662
+
663
+ if (conn->ev_callback )
664
+ conn->ev_callback ->operator ()(connection, ev::DISCONNECT, nullptr , 0 , nullptr );
665
+ }
666
+ else {
667
+ if (conn->status & ev::WRITE && conn->ev_callback )
668
+ conn->ev_callback ->operator ()(connection, ev::WRITE, nullptr , 0 , nullptr );
669
+ }
667
670
668
- manapi::async::current ()->eventloop ()->stop_watcher (w);
669
- }, s.get () + cursor, conn->top ->cur_send_size /* nbuf */ );
671
+ manapi::async::current ()->eventloop ()->stop_watcher (w);
672
+ }, s.get () + cursor, conn->top ->cur_send_size /* nbuf */ );
670
673
671
- conn->top ->cur_send_size = 0 ;
672
- }
673
- else {
674
- if (sent) {
675
- assert (current);
676
- if (conn->top ->send .last_deque ) {
677
- current->next = std::move (conn->top ->send .deque );
674
+ conn->top ->cur_send_size = 0 ;
675
+ }
676
+ else {
677
+ if (sent) {
678
+ assert (current);
679
+ if (conn->top ->send .last_deque ) {
680
+ current->next = std::move (conn->top ->send .deque );
681
+ }
682
+ conn->top ->send .deque = std::move (sent);
683
+ conn->top ->send .last_deque = current;
678
684
}
679
- conn->top ->send .deque = std::move (sent);
680
- conn->top ->send .last_deque = current;
681
685
}
682
686
}
683
687
}
0 commit comments