Skip to content

Commit

Permalink
Instantiate AttachmentDownloader and use it in AttachmentServiceImpl
Browse files Browse the repository at this point in the history
GetOrDownloadAttachments creates state object that tracks set of attachments
for which requests are still pending. Once AttachmentStore::Read finishes 
AttachmentServiceImpl calls AttachmentDownloader to download attachments that 
were not found locally. 

Once all calls to Downloader finish consumer callback is called with results.

I used AttachmentIdSet in this code but didn't update AttachmentIdList to 
AttachmentIdSet in other places. I'd rather do it in separate change.

R=maniscalco@chromium.org
BUG=376073

Review URL: https://codereview.chromium.org/307783002

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@274164 0039d316-1c4b-4281-b951-d872f2087c98
  • Loading branch information
pavely@chromium.org committed Jun 2, 2014
1 parent ab0b115 commit ef72c18
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 17 deletions.
15 changes: 10 additions & 5 deletions chrome/browser/sync/profile_sync_components_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include "sync/api/attachments/attachment_service.h"
#include "sync/api/attachments/attachment_service_impl.h"
#include "sync/api/syncable_service.h"
#include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
#include "sync/internal_api/public/attachments/fake_attachment_store.h"
#include "sync/internal_api/public/attachments/fake_attachment_uploader.h"

Expand Down Expand Up @@ -570,19 +571,23 @@ base::WeakPtr<syncer::SyncableService> ProfileSyncComponentsFactoryImpl::
scoped_ptr<syncer::AttachmentService>
ProfileSyncComponentsFactoryImpl::CreateAttachmentService(
syncer::AttachmentService::Delegate* delegate) {
// TODO(maniscalco): Use a shared (one per profile) thread-safe instance of
// AttachmentUpload instead of creating a new one per AttachmentService (bug
// 369536).
// TODO(maniscalco): Use a shared (one per profile) thread-safe instances of
// AttachmentUploader and AttachmentDownloader instead of creating a new one
// per AttachmentService (bug 369536).
scoped_ptr<syncer::AttachmentUploader> attachment_uploader(
new syncer::FakeAttachmentUploader);
scoped_ptr<syncer::AttachmentDownloader> attachment_downloader(
new syncer::FakeAttachmentDownloader());

scoped_ptr<syncer::AttachmentStore> attachment_store(
new syncer::FakeAttachmentStore(
BrowserThread::GetMessageLoopProxyForThread(BrowserThread::FILE)));

scoped_ptr<syncer::AttachmentService> attachment_service(
new syncer::AttachmentServiceImpl(
attachment_store.Pass(), attachment_uploader.Pass(), delegate));
new syncer::AttachmentServiceImpl(attachment_store.Pass(),
attachment_uploader.Pass(),
attachment_downloader.Pass(),
delegate));

return attachment_service.Pass();
}
Expand Down
3 changes: 3 additions & 0 deletions components/sync_driver/generic_change_processor_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "sync/api/fake_syncable_service.h"
#include "sync/api/sync_change.h"
#include "sync/api/sync_merge_result.h"
#include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
#include "sync/internal_api/public/attachments/fake_attachment_store.h"
#include "sync/internal_api/public/attachments/fake_attachment_uploader.h"
#include "sync/internal_api/public/base/model_type.h"
Expand Down Expand Up @@ -51,6 +52,8 @@ MockAttachmentService::MockAttachmentService()
base::MessageLoopProxy::current())),
scoped_ptr<syncer::AttachmentUploader>(
new syncer::FakeAttachmentUploader),
scoped_ptr<syncer::AttachmentDownloader>(
new syncer::FakeAttachmentDownloader),
NULL) {
}

Expand Down
2 changes: 2 additions & 0 deletions sync/api/attachments/attachment_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#ifndef SYNC_API_ATTACHMENTS_ATTACHMENT_ID_H_
#define SYNC_API_ATTACHMENTS_ATTACHMENT_ID_H_

#include <set>
#include <string>
#include <vector>

Expand Down Expand Up @@ -65,6 +66,7 @@ class SYNC_EXPORT AttachmentId {
};

typedef std::vector<AttachmentId> AttachmentIdList;
typedef std::set<AttachmentId> AttachmentIdSet;

} // namespace syncer

Expand Down
147 changes: 136 additions & 11 deletions sync/api/attachments/attachment_service_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,113 @@
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "sync/api/attachments/attachment.h"
#include "sync/internal_api/public/attachments/fake_attachment_downloader.h"
#include "sync/internal_api/public/attachments/fake_attachment_store.h"
#include "sync/internal_api/public/attachments/fake_attachment_uploader.h"

namespace syncer {

// GetOrDownloadAttachments starts multiple parallel DownloadAttachment calls.
// GetOrDownloadState tracks completion of these calls and posts callback for
// consumer once all attachments are either retrieved or reported unavailable.
class AttachmentServiceImpl::GetOrDownloadState
: public base::RefCounted<GetOrDownloadState>,
public base::NonThreadSafe {
public:
// GetOrDownloadState gets parameter from values passed to
// AttachmentService::GetOrDownloadAttachments.
// |attachment_ids| is a list of attachmens to retrieve.
// |callback| will be posted on current thread when all attachments retrieved
// or confirmed unavailable.
GetOrDownloadState(const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback);

// Attachment was just retrieved. Add it to retrieved attachments.
void AddAttachment(const Attachment& attachment);

// Both reading from local store and downloading attachment failed.
// Add it to unavailable set.
void AddUnavailableAttachmentId(const AttachmentId& attachment_id);

private:
friend class base::RefCounted<GetOrDownloadState>;
virtual ~GetOrDownloadState();

// If all attachment requests completed then post callback to consumer with
// results.
void PostResultIfAllRequestsCompleted();

GetOrDownloadCallback callback_;

// Requests for these attachments are still in progress.
AttachmentIdSet in_progress_attachments_;

AttachmentIdSet unavailable_attachments_;
scoped_ptr<AttachmentMap> retrieved_attachments_;

DISALLOW_COPY_AND_ASSIGN(GetOrDownloadState);
};

AttachmentServiceImpl::GetOrDownloadState::GetOrDownloadState(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback)
: callback_(callback), retrieved_attachments_(new AttachmentMap()) {
std::copy(
attachment_ids.begin(),
attachment_ids.end(),
std::inserter(in_progress_attachments_, in_progress_attachments_.end()));
PostResultIfAllRequestsCompleted();
}

AttachmentServiceImpl::GetOrDownloadState::~GetOrDownloadState() {
DCHECK(CalledOnValidThread());
}

void AttachmentServiceImpl::GetOrDownloadState::AddAttachment(
const Attachment& attachment) {
DCHECK(CalledOnValidThread());
DCHECK(retrieved_attachments_->find(attachment.GetId()) ==
retrieved_attachments_->end());
retrieved_attachments_->insert(
std::make_pair(attachment.GetId(), attachment));
DCHECK(in_progress_attachments_.find(attachment.GetId()) !=
in_progress_attachments_.end());
in_progress_attachments_.erase(attachment.GetId());
PostResultIfAllRequestsCompleted();
}

void AttachmentServiceImpl::GetOrDownloadState::AddUnavailableAttachmentId(
const AttachmentId& attachment_id) {
DCHECK(CalledOnValidThread());
DCHECK(unavailable_attachments_.find(attachment_id) ==
unavailable_attachments_.end());
unavailable_attachments_.insert(attachment_id);
DCHECK(in_progress_attachments_.find(attachment_id) !=
in_progress_attachments_.end());
in_progress_attachments_.erase(attachment_id);
PostResultIfAllRequestsCompleted();
}

void
AttachmentServiceImpl::GetOrDownloadState::PostResultIfAllRequestsCompleted() {
if (in_progress_attachments_.empty()) {
// All requests completed. Let's notify consumer.
GetOrDownloadResult result =
unavailable_attachments_.empty() ? GET_SUCCESS : GET_UNSPECIFIED_ERROR;
base::MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(callback_, result, base::Passed(&retrieved_attachments_)));
}
}

AttachmentServiceImpl::AttachmentServiceImpl(
scoped_ptr<AttachmentStore> attachment_store,
scoped_ptr<AttachmentUploader> attachment_uploader,
scoped_ptr<AttachmentDownloader> attachment_downloader,
Delegate* delegate)
: attachment_store_(attachment_store.Pass()),
attachment_uploader_(attachment_uploader.Pass()),
attachment_downloader_(attachment_downloader.Pass()),
delegate_(delegate),
weak_ptr_factory_(this) {
DCHECK(CalledOnValidThread());
Expand All @@ -35,20 +131,26 @@ scoped_ptr<syncer::AttachmentService> AttachmentServiceImpl::CreateForTest() {
new syncer::FakeAttachmentStore(base::MessageLoopProxy::current()));
scoped_ptr<AttachmentUploader> attachment_uploader(
new FakeAttachmentUploader);
scoped_ptr<AttachmentDownloader> attachment_downloader(
new FakeAttachmentDownloader());
scoped_ptr<syncer::AttachmentService> attachment_service(
new syncer::AttachmentServiceImpl(
attachment_store.Pass(), attachment_uploader.Pass(), NULL));
new syncer::AttachmentServiceImpl(attachment_store.Pass(),
attachment_uploader.Pass(),
attachment_downloader.Pass(),
NULL));
return attachment_service.Pass();
}

void AttachmentServiceImpl::GetOrDownloadAttachments(
const AttachmentIdList& attachment_ids,
const GetOrDownloadCallback& callback) {
DCHECK(CalledOnValidThread());
scoped_refptr<GetOrDownloadState> state(
new GetOrDownloadState(attachment_ids, callback));
attachment_store_->Read(attachment_ids,
base::Bind(&AttachmentServiceImpl::ReadDone,
weak_ptr_factory_.GetWeakPtr(),
callback));
state));
}

void AttachmentServiceImpl::DropAttachments(
Expand Down Expand Up @@ -96,18 +198,29 @@ void AttachmentServiceImpl::OnSyncDataUpdate(
}

void AttachmentServiceImpl::ReadDone(
const GetOrDownloadCallback& callback,
const scoped_refptr<GetOrDownloadState>& state,
const AttachmentStore::Result& result,
scoped_ptr<AttachmentMap> attachments,
scoped_ptr<AttachmentIdList> unavailable_attachment_ids) {
AttachmentService::GetOrDownloadResult get_result =
AttachmentService::GET_UNSPECIFIED_ERROR;
if (result == AttachmentStore::SUCCESS) {
get_result = AttachmentService::GET_SUCCESS;
// Add read attachments to result.
for (AttachmentMap::const_iterator iter = attachments->begin();
iter != attachments->end();
++iter) {
state->AddAttachment(iter->second);
}
// Try to download locally unavailable attachments.
for (AttachmentIdList::const_iterator iter =
unavailable_attachment_ids->begin();
iter != unavailable_attachment_ids->end();
++iter) {
attachment_downloader_->DownloadAttachment(
*iter,
base::Bind(&AttachmentServiceImpl::DownloadDone,
weak_ptr_factory_.GetWeakPtr(),
state,
*iter));
;
}
// TODO(maniscalco): Deal with case where an error occurred (bug 361251).
base::MessageLoop::current()->PostTask(
FROM_HERE, base::Bind(callback, get_result, base::Passed(&attachments)));
}

void AttachmentServiceImpl::DropDone(const DropCallback& callback,
Expand Down Expand Up @@ -145,4 +258,16 @@ void AttachmentServiceImpl::UploadDone(
}
}

void AttachmentServiceImpl::DownloadDone(
const scoped_refptr<GetOrDownloadState>& state,
const AttachmentId& attachment_id,
const AttachmentDownloader::DownloadResult& result,
scoped_ptr<Attachment> attachment) {
if (result == AttachmentDownloader::DOWNLOAD_SUCCESS) {
state->AddAttachment(*attachment.get());
} else {
state->AddUnavailableAttachmentId(attachment_id);
}
}

} // namespace syncer
12 changes: 11 additions & 1 deletion sync/api/attachments/attachment_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
#ifndef SYNC_API_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_
#define SYNC_API_ATTACHMENTS_ATTACHMENT_SERVICE_IMPL_H_

#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/threading/non_thread_safe.h"
#include "sync/api/attachments/attachment_downloader.h"
#include "sync/api/attachments/attachment_service.h"
#include "sync/api/attachments/attachment_service_proxy.h"
#include "sync/api/attachments/attachment_store.h"
Expand All @@ -24,6 +26,7 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
// must be valid throughout AttachmentService lifetime.
AttachmentServiceImpl(scoped_ptr<AttachmentStore> attachment_store,
scoped_ptr<AttachmentUploader> attachment_uploader,
scoped_ptr<AttachmentDownloader> attachment_downloader,
Delegate* delegate);
virtual ~AttachmentServiceImpl();

Expand All @@ -43,7 +46,9 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
const SyncData& updated_sync_data) OVERRIDE;

private:
void ReadDone(const GetOrDownloadCallback& callback,
class GetOrDownloadState;

void ReadDone(const scoped_refptr<GetOrDownloadState>& state,
const AttachmentStore::Result& result,
scoped_ptr<AttachmentMap> attachments,
scoped_ptr<AttachmentIdList> unavailable_attachment_ids);
Expand All @@ -53,9 +58,14 @@ class SYNC_EXPORT AttachmentServiceImpl : public AttachmentService,
const AttachmentStore::Result& result);
void UploadDone(const AttachmentUploader::UploadResult& result,
const AttachmentId& attachment_id);
void DownloadDone(const scoped_refptr<GetOrDownloadState>& state,
const AttachmentId& attachment_id,
const AttachmentDownloader::DownloadResult& result,
scoped_ptr<Attachment> attachment);

const scoped_ptr<AttachmentStore> attachment_store_;
const scoped_ptr<AttachmentUploader> attachment_uploader_;
const scoped_ptr<AttachmentDownloader> attachment_downloader_;
Delegate* delegate_;
// Must be last data member.
base::WeakPtrFactory<AttachmentServiceImpl> weak_ptr_factory_;
Expand Down
Loading

0 comments on commit ef72c18

Please sign in to comment.