Skip to content

Commit

Permalink
Revert 235213 "android: forwader2: Simplify Forwarder implementa..."
Browse files Browse the repository at this point in the history
> android: forwader2: Simplify Forwarder implementation.
> 
> This patch does several things to the Forwarder class used in the 'forwarder2'
> tool:
> 
> - Add a Stop() method to gently ask the forwarder to shut-down. This stops
>   the forwarder from listening from incoming data, but keeps it running
>   until it could flush its buffer properly, to avoid packet loss.
> 
> - Simplify / refactor the buffer management code inside a Forwarder
>   instance, to make the state of its buffer, and their transitions,
>   more explict.
> 
> BUG=313809
> R=pliard@chromium.org
> 
> Review URL: https://codereview.chromium.org/61793013

TBR=digit@chromium.org

Review URL: https://codereview.chromium.org/70193013

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@235729 0039d316-1c4b-4281-b951-d872f2087c98
  • Loading branch information
pliard@chromium.org committed Nov 18, 2013
1 parent 3e00d6a commit 715d972
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 204 deletions.
261 changes: 59 additions & 202 deletions tools/android/forwarder2/forwarder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,206 +8,77 @@
#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/posix/eintr_wrapper.h"
#include "base/single_thread_task_runner.h"
#include "base/threading/thread.h"
#include "tools/android/forwarder2/pipe_notifier.h"
#include "tools/android/forwarder2/socket.h"

namespace forwarder2 {
namespace {

// Helper class to buffer reads and writes from one socket to another.
// Each implements a small buffer connected two one input socket, and
// one output socket.
//
// socket_from_ ---> [BufferedCopier] ---> socket_to_
//
// These objects are used in a pair to handle duplex traffic, as in:
//
// ------> [BufferedCopier_1] --->
// / \
// socket_1 * * socket_2
// \ /
// <------ [BufferedCopier_2] <----
//
// When a BufferedCopier is in the READING state (see below), it only listens
// to events on its input socket, and won't detect when its output socket
// disconnects. To work around this, its peer will call its Close() method
// when that happens.

class BufferedCopier {
public:
// Possible states:
// READING - Empty buffer and Waiting for input.
// WRITING - Data in buffer, and waiting for output.
// CLOSING - Like WRITING, but do not try to read after that.
// CLOSED - Completely closed.
//
// State transitions are:
//
// T01: READING ---[receive data]---> WRITING
// T02: READING ---[error on input socket]---> CLOSED
// T03: READING ---[Close() call]---> CLOSED
//
// T04: WRITING ---[write partial data]---> WRITING
// T05: WRITING ---[write all data]----> READING
// T06: WRITING ---[error on output socket]----> CLOSED
// T07: WRITING ---[Close() call]---> CLOSING
//
// T08: CLOSING ---[write partial data]---> CLOSING
// T09: CLOSING ---[write all data]----> CLOSED
// T10: CLOSING ---[Close() call]---> CLOSING
// T11: CLOSING ---[error on output socket] ---> CLOSED
//
enum State {
STATE_READING = 0,
STATE_WRITING = 1,
STATE_CLOSING = 2,
STATE_CLOSED = 3,
};

// Does NOT own the pointers.
BufferedCopier(Socket* socket_from, Socket* socket_to)
BufferedCopier(Socket* socket_from,
Socket* socket_to)
: socket_from_(socket_from),
socket_to_(socket_to),
bytes_read_(0),
write_offset_(0),
peer_(NULL),
state_(STATE_READING) {}

// Sets the 'peer_' field pointing to the other BufferedCopier in a pair.
void SetPeer(BufferedCopier* peer) { peer_ = peer; }

// Gently asks to close a buffer. Called either by the peer or the forwarder.
void Close() {
switch (state_) {
case STATE_READING:
state_ = STATE_CLOSED; // T03
break;
case STATE_WRITING:
state_ = STATE_CLOSING; // T07
break;
case STATE_CLOSING:
break; // T10
case STATE_CLOSED:
;
}
write_offset_(0) {
}

// Call this before select(). This updates |read_fds|,
// |write_fds| and |max_fd| appropriately *if* the buffer isn't closed.
void PrepareSelect(fd_set* read_fds, fd_set* write_fds, int* max_fd) {
int fd;
switch (state_) {
case STATE_READING:
DCHECK(bytes_read_ == 0);
DCHECK(write_offset_ == 0);
fd = socket_from_->fd();
if (fd < 0) {
ForceClose(); // T02
return;
}
FD_SET(fd, read_fds);
break;
bool AddToReadSet(fd_set* read_fds) {
if (bytes_read_ == 0)
return socket_from_->AddFdToSet(read_fds);
return false;
}

case STATE_WRITING:
case STATE_CLOSING:
DCHECK(bytes_read_ > 0);
DCHECK(write_offset_ < bytes_read_);
fd = socket_to_->fd();
if (fd < 0) {
ForceClose(); // T06
return;
}
FD_SET(fd, write_fds);
break;
bool AddToWriteSet(fd_set* write_fds) {
if (write_offset_ < bytes_read_)
return socket_to_->AddFdToSet(write_fds);
return false;
}

case STATE_CLOSED:
return;
bool TryRead(const fd_set& read_fds) {
if (!socket_from_->IsFdInSet(read_fds))
return false;
if (bytes_read_ != 0) // Can't read.
return false;
int ret = socket_from_->Read(buffer_, kBufferSize);
if (ret > 0) {
bytes_read_ = ret;
return true;
}
*max_fd = std::max(*max_fd, fd);
return false;
}

// Call this after a select() call to operate over the buffer.
void ProcessSelect(const fd_set& read_fds, const fd_set& write_fds) {
int fd, ret;
switch (state_) {
case STATE_READING:
fd = socket_from_->fd();
if (fd < 0) {
state_ = STATE_CLOSED; // T02
return;
}
if (!FD_ISSET(fd, &read_fds))
return;

ret = socket_from_->NonBlockingRead(buffer_, kBufferSize);
if (ret <= 0) {
ForceClose(); // T02
return;
}
bytes_read_ = ret;
write_offset_ = 0;
state_ = STATE_WRITING; // T01
break;

case STATE_WRITING:
case STATE_CLOSING:
fd = socket_to_->fd();
if (fd < 0) {
ForceClose(); // T06 + T11
return;
}
if (!FD_ISSET(fd, &write_fds))
return;

ret = socket_to_->NonBlockingWrite(buffer_ + write_offset_,
bytes_read_ - write_offset_);
if (ret <= 0) {
ForceClose(); // T06 + T11
return;
}

write_offset_ += ret;
if (write_offset_ < bytes_read_)
return; // T08 + T04

bool TryWrite(const fd_set& write_fds) {
if (!socket_to_->IsFdInSet(write_fds))
return false;
if (write_offset_ >= bytes_read_) // Nothing to write.
return false;
int ret = socket_to_->Write(buffer_ + write_offset_,
bytes_read_ - write_offset_);
if (ret > 0) {
write_offset_ += ret;
if (write_offset_ == bytes_read_) {
write_offset_ = 0;
bytes_read_ = 0;
if (state_ == STATE_CLOSING) {
ForceClose(); // T09
return;
}
state_ = STATE_READING; // T05
break;

case STATE_CLOSED:
;
}
return true;
}
return false;
}

private:
// Internal method used to close the buffer and notify the peer, if any.
void ForceClose() {
if (peer_) {
peer_->Close();
peer_ = NULL;
}
state_ = STATE_CLOSED;
}

// Not owned.
Socket* socket_from_;
Socket* socket_to_;

// A big buffer to let the file-over-http bridge work more like real file.
// A big buffer to let our file-over-http bridge work more like real file.
static const int kBufferSize = 1024 * 128;
int bytes_read_;
int write_offset_;
BufferedCopier* peer_;
State state_;
char buffer_[kBufferSize];

DISALLOW_COPY_AND_ASSIGN(BufferedCopier);
Expand All @@ -223,13 +94,12 @@ class BufferedCopier {
// created it.
class Forwarder {
public:
// Create a new Forwarder instance. |socket1| and |socket2| are the two socket
// endpoints.
Forwarder(scoped_ptr<Socket> socket1, scoped_ptr<Socket> socket2)
: socket1_(socket1.Pass()),
socket2_(socket2.Pass()),
destructor_runner_(base::MessageLoopProxy::current()),
thread_("ForwarderThread") {}
thread_("ForwarderThread") {
}

void Start() {
thread_.Start();
Expand All @@ -238,54 +108,42 @@ class Forwarder {
base::Bind(&Forwarder::ThreadHandler, base::Unretained(this)));
}

void Stop() { deletion_notifier_.Notify(); }

private:
void ThreadHandler() {
const int nfds = Socket::GetHighestFileDescriptor(*socket1_, *socket2_) + 1;
fd_set read_fds;
fd_set write_fds;

// Copy from socket1 to socket2
BufferedCopier buffer1(socket1_.get(), socket2_.get());

// Copy from socket2 to socket1
BufferedCopier buffer2(socket2_.get(), socket1_.get());

buffer1.SetPeer(&buffer2);
buffer2.SetPeer(&buffer1);

for (;;) {
bool run = true;
while (run) {
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);

int max_fd = -1;
buffer1.PrepareSelect(&read_fds, &write_fds, &max_fd);
buffer2.PrepareSelect(&read_fds, &write_fds, &max_fd);
buffer1.AddToReadSet(&read_fds);
buffer2.AddToReadSet(&read_fds);
buffer1.AddToWriteSet(&write_fds);
buffer2.AddToWriteSet(&write_fds);

if (max_fd < 0) {
// Both buffers are closed. Exit immediately.
break;
}

const int deletion_fd = deletion_notifier_.receiver_fd();
if (deletion_fd >= 0) {
FD_SET(deletion_fd, &read_fds);
max_fd = std::max(max_fd, deletion_fd);
}

if (HANDLE_EINTR(select(max_fd + 1, &read_fds, &write_fds, NULL, NULL)) <=
0) {
if (HANDLE_EINTR(select(nfds, &read_fds, &write_fds, NULL, NULL)) <= 0) {
PLOG(ERROR) << "select";
break;
}

buffer1.ProcessSelect(read_fds, write_fds);
buffer2.ProcessSelect(read_fds, write_fds);

if (deletion_fd >= 0 && FD_ISSET(deletion_fd, &read_fds)) {
buffer1.Close();
buffer2.Close();
}
// When a socket in the read set closes the connection, select() returns
// with that socket descriptor set as "ready to read". When we call
// TryRead() below, it will return false, but the while loop will continue
// to run until all the write operations are finished, to make sure the
// buffers are completely flushed out.

// Keep running while we have some operation to do.
run = buffer1.TryRead(read_fds);
run = run || buffer2.TryRead(read_fds);
run = run || buffer1.TryWrite(write_fds);
run = run || buffer2.TryWrite(write_fds);
}

// Note that the thread that |destruction_runner_| runs tasks on could be
Expand All @@ -294,13 +152,12 @@ class Forwarder {
socket1_.reset();
socket2_.reset();

// Ensure the object is destroyed on the thread that created it.
// Note that base::Thread must be destroyed on the thread it was created on.
destructor_runner_->DeleteSoon(FROM_HERE, this);
}

scoped_ptr<Socket> socket1_;
scoped_ptr<Socket> socket2_;
PipeNotifier deletion_notifier_;
scoped_refptr<base::SingleThreadTaskRunner> destructor_runner_;
base::Thread thread_;
};
Expand Down
1 change: 1 addition & 0 deletions tools/android/forwarder2/forwarder.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define TOOLS_ANDROID_FORWARDER2_FORWARDER_H_

#include "base/memory/scoped_ptr.h"
#include "base/threading/thread.h"

namespace forwarder2 {

Expand Down
18 changes: 18 additions & 0 deletions tools/android/forwarder2/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,19 @@ int Socket::GetPort() {
return port_;
}

bool Socket::IsFdInSet(const fd_set& fds) const {
if (IsClosed())
return false;
return FD_ISSET(socket_, &fds);
}

bool Socket::AddFdToSet(fd_set* fds) const {
if (IsClosed())
return false;
FD_SET(socket_, fds);
return true;
}

int Socket::ReadNumBytes(void* buffer, size_t num_bytes) {
int bytes_read = 0;
int ret = 1;
Expand Down Expand Up @@ -428,6 +441,11 @@ bool Socket::WaitForEvent(EventType type, int timeout_secs) {
return !event_was_fired;
}

// static
int Socket::GetHighestFileDescriptor(const Socket& s1, const Socket& s2) {
return std::max(s1.socket_, s2.socket_);
}

// static
pid_t Socket::GetUnixDomainSocketProcessOwner(const std::string& path) {
Socket socket;
Expand Down
Loading

0 comments on commit 715d972

Please sign in to comment.