Skip to content

Commit

Permalink
Introduce audio::LoopbackStream implementation.
Browse files Browse the repository at this point in the history
The LoopbackStream is an audio::InputStream that provides the result of
looping-back and mixing-together all current and future audio output
streams belonging to the same "group." An example of this would be for
tab capture, where all output streams from one tab are recorded and then
re-mixed into a single capture stream.

Bug: 824019
Change-Id: I3374182798452bbf9d8688e318dc0546d3630f5e
Reviewed-on: https://chromium-review.googlesource.com/1041661
Commit-Queue: Yuri Wiitala <miu@chromium.org>
Reviewed-by: Olga Sharonova <olka@chromium.org>
Reviewed-by: Max Morin <maxmorin@chromium.org>
Cr-Commit-Position: refs/heads/master@{#557440}
  • Loading branch information
miu-chromium authored and Commit Bot committed May 10, 2018
1 parent b97a264 commit 6905515
Show file tree
Hide file tree
Showing 4 changed files with 1,021 additions and 0 deletions.
3 changes: 3 additions & 0 deletions services/audio/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ source_set("lib") {
"input_stream.h",
"local_muter.cc",
"local_muter.h",
"loopback_stream.cc",
"loopback_stream.h",
"output_controller.cc",
"output_controller.h",
"output_stream.cc",
Expand Down Expand Up @@ -82,6 +84,7 @@ source_set("tests") {
"group_coordinator_unittest.cc",
"input_stream_unittest.cc",
"local_muter_unittest.cc",
"loopback_stream_unittest.cc",
"output_controller_unittest.cc",
"output_stream_unittest.cc",
"snooper_node_unittest.cc",
Expand Down
356 changes: 356 additions & 0 deletions services/audio/loopback_stream.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,356 @@
// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "services/audio/loopback_stream.h"

#include <algorithm>
#include <string>

#include "base/bind.h"
#include "base/stl_util.h"
#include "base/sync_socket.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/time/default_tick_clock.h"
#include "media/base/audio_bus.h"
#include "media/base/vector_math.h"
#include "mojo/public/cpp/system/buffer.h"
#include "mojo/public/cpp/system/platform_handle.h"

namespace audio {

// static
constexpr double LoopbackStream::kMaxVolume;

// static
constexpr base::TimeDelta LoopbackStream::kCaptureDelay;

LoopbackStream::LoopbackStream(
CreatedCallback created_callback,
BindingLostCallback binding_lost_callback,
scoped_refptr<base::SequencedTaskRunner> flow_task_runner,
media::mojom::AudioInputStreamRequest request,
media::mojom::AudioInputStreamClientPtr client,
media::mojom::AudioInputStreamObserverPtr observer,
const media::AudioParameters& params,
uint32_t shared_memory_count,
GroupCoordinator* coordinator,
const base::UnguessableToken& group_id)
: binding_lost_callback_(std::move(binding_lost_callback)),
binding_(this, std::move(request)),
client_(std::move(client)),
observer_(std::move(observer)),
coordinator_(coordinator),
group_id_(group_id),
network_(nullptr, base::OnTaskRunnerDeleter(flow_task_runner)),
weak_factory_(this) {
DCHECK(coordinator_);

// Generate an error and shut down automatically whenever any of the mojo
// bindings is closed.
binding_.set_connection_error_handler(
base::BindOnce(&LoopbackStream::OnError, base::Unretained(this)));
client_.set_connection_error_handler(
base::BindOnce(&LoopbackStream::OnError, base::Unretained(this)));
observer_.set_connection_error_handler(
base::BindOnce(&LoopbackStream::OnError, base::Unretained(this)));

// As of this writing, only machines older than about 10 years won't be able
// to produce high-resolution timestamps. In order to avoid adding extra
// complexity to the implementation, simply refuse to operate without that
// basic level of hardware support.
//
// Construct the components of the AudioDataPipe, for delivering the data to
// the consumer. If successful, create the FlowNetwork too.
if (base::TimeTicks::IsHighResolution()) {
base::CancelableSyncSocket foreign_socket;
std::unique_ptr<media::AudioInputSyncWriter> writer =
media::AudioInputSyncWriter::Create(
base::BindRepeating(
[](const std::string& message) { VLOG(1) << message; }),
shared_memory_count, params, &foreign_socket);
if (writer) {
const base::SharedMemory* memory = writer->shared_memory();
base::SharedMemoryHandle foreign_memory_handle =
memory->GetReadOnlyHandle();
mojo::ScopedSharedBufferHandle buffer_handle;
mojo::ScopedHandle socket_handle;
if (base::SharedMemory::IsHandleValid(foreign_memory_handle)) {
buffer_handle = mojo::WrapSharedMemoryHandle(
foreign_memory_handle, memory->requested_size(),
mojo::UnwrappedSharedMemoryHandleProtection::kReadOnly);
socket_handle = mojo::WrapPlatformFile(foreign_socket.Release());
if (buffer_handle.is_valid() && socket_handle.is_valid()) {
std::move(created_callback)
.Run({base::in_place, std::move(buffer_handle),
std::move(socket_handle)});
network_.reset(new FlowNetwork(std::move(flow_task_runner), params,
std::move(writer)));
return; // Success!
}
}
}
} else /* if (!base::TimeTicks::IsHighResolution()) */ {
LOG(ERROR) << "Refusing to start loop-back because this machine cannot "
"provide high-resolution timestamps.";
}

// If this point is reached, either the TimeTicks clock is not high resolution
// or one or more AudioDataPipe components failed to initialize. Report the
// error.
std::move(created_callback).Run(nullptr);
OnError();
}

LoopbackStream::~LoopbackStream() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

if (network_) {
if (network_->is_started()) {
coordinator_->RemoveObserver(group_id_, this);
for (GroupMember* member : coordinator_->GetCurrentMembers(group_id_)) {
OnMemberLeftGroup(member);
}
}
DCHECK(snoopers_.empty());
}
}

void LoopbackStream::Record() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

if (!network_ || network_->is_started()) {
return;
}

// Begin snooping on all group members. This will set up the mixer network
// and begin accumulating audio data in the Snoopers' buffers.
DCHECK(snoopers_.empty());
for (GroupMember* member : coordinator_->GetCurrentMembers(group_id_)) {
OnMemberJoinedGroup(member);
}
coordinator_->AddObserver(group_id_, this);

// Start the data flow.
network_->Start();

if (observer_) {
observer_->DidStartRecording();
}
}

void LoopbackStream::SetVolume(double volume) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

if (!std::isfinite(volume) || volume < 0.0) {
mojo::ReportBadMessage("Invalid volume");
OnError();
return;
}

if (network_) {
network_->SetVolume(std::min(volume, kMaxVolume));
}
}

void LoopbackStream::OnMemberJoinedGroup(GroupMember* member) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

if (!network_) {
return;
}

const media::AudioParameters& input_params = member->GetAudioParameters();
const auto emplace_result = snoopers_.emplace(
std::piecewise_construct, std::forward_as_tuple(member),
std::forward_as_tuple(input_params, network_->output_params()));
DCHECK(emplace_result.second); // There was no pre-existing map entry.
SnooperNode* const snooper = &(emplace_result.first->second);
member->StartSnooping(snooper);
network_->AddInput(snooper);
}

void LoopbackStream::OnMemberLeftGroup(GroupMember* member) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

if (!network_) {
return;
}

const auto snoop_it = snoopers_.find(member);
DCHECK(snoop_it != snoopers_.end());
SnooperNode* const snooper = &(snoop_it->second);
member->StopSnooping(snooper);
network_->RemoveInput(snooper);
snoopers_.erase(snoop_it);
}

void LoopbackStream::OnError() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

if (!binding_lost_callback_) {
return; // OnError() was already called.
}

binding_.Close();
if (client_) {
client_->OnError();
client_.reset();
}
observer_.reset();

// Post a task to run the BindingLostCallback, since this method can be called
// from the constructor.
base::SequencedTaskRunnerHandle::Get()->PostTask(
FROM_HERE,
base::BindOnce(
[](base::WeakPtr<LoopbackStream> weak_self,
BindingLostCallback callback) {
if (auto* self = weak_self.get()) {
std::move(callback).Run(self);
}
},
weak_factory_.GetWeakPtr(), std::move(binding_lost_callback_)));

// No need to shut down anything else, as the destructor will run soon and
// take care of the rest.
}

LoopbackStream::FlowNetwork::FlowNetwork(
scoped_refptr<base::SequencedTaskRunner> flow_task_runner,
const media::AudioParameters& output_params,
std::unique_ptr<media::AudioInputSyncWriter> writer)
: clock_(base::DefaultTickClock::GetInstance()),
flow_task_runner_(flow_task_runner),
output_params_(output_params),
writer_(std::move(writer)),
mix_bus_(media::AudioBus::Create(output_params_)) {}

void LoopbackStream::FlowNetwork::AddInput(SnooperNode* node) {
DCHECK_CALLED_ON_VALID_SEQUENCE(control_sequence_);

base::AutoLock scoped_lock(lock_);
DCHECK(!base::ContainsValue(inputs_, node));
inputs_.push_back(node);
}

void LoopbackStream::FlowNetwork::RemoveInput(SnooperNode* node) {
DCHECK_CALLED_ON_VALID_SEQUENCE(control_sequence_);

base::AutoLock scoped_lock(lock_);
const auto it = std::find(inputs_.begin(), inputs_.end(), node);
DCHECK(it != inputs_.end());
inputs_.erase(it);
}

void LoopbackStream::FlowNetwork::SetVolume(double volume) {
DCHECK_CALLED_ON_VALID_SEQUENCE(control_sequence_);

base::AutoLock scoped_lock(lock_);
volume_ = volume;
}

void LoopbackStream::FlowNetwork::Start() {
DCHECK_CALLED_ON_VALID_SEQUENCE(control_sequence_);
DCHECK(!is_started());

timer_.emplace(clock_);
timer_->SetTaskRunner(flow_task_runner_);
// Note: GenerateMoreAudio() will schedule the timer.

first_generate_time_ = clock_->NowTicks();
frames_elapsed_ = 0;
next_generate_time_ = first_generate_time_;

flow_task_runner_->PostTask(
FROM_HERE,
// Unretained is safe because the destructor will always be invoked from a
// task that runs afterwards.
base::BindOnce(&FlowNetwork::GenerateMoreAudio, base::Unretained(this)));
}

LoopbackStream::FlowNetwork::~FlowNetwork() {
DCHECK(flow_task_runner_->RunsTasksInCurrentSequence());
DCHECK(inputs_.empty());
}

void LoopbackStream::FlowNetwork::GenerateMoreAudio() {
DCHECK(flow_task_runner_->RunsTasksInCurrentSequence());

// Always generate audio from the recent past, to prevent buffer underruns
// in the inputs.
const base::TimeTicks delayed_capture_time =
next_generate_time_ - kCaptureDelay;

// Drive the audio flows from the SnooperNodes and produce the single result
// stream. Hold the lock during this part of the process to prevent any of the
// control methods from altering the configuration of the network.
double output_volume;
{
base::AutoLock scoped_lock(lock_);
output_volume = volume_;

// Render the audio from each input, apply this stream's volume setting by
// scaling the data, then mix it all together to form a single audio
// signal. If there are no snoopers, just render silence.
auto it = inputs_.begin();
if (it == inputs_.end()) {
mix_bus_->Zero();
} else {
// Render the first input's signal directly into |mix_bus_|.
(*it)->Render(delayed_capture_time, mix_bus_.get());
mix_bus_->Scale(volume_);

// Render each successive input's signal into |transfer_bus_|, and then
// mix it into |mix_bus_|.
++it;
if (it != inputs_.end()) {
if (!transfer_bus_) {
transfer_bus_ = media::AudioBus::Create(output_params_);
}
do {
(*it)->Render(delayed_capture_time, transfer_bus_.get());
for (int ch = 0; ch < transfer_bus_->channels(); ++ch) {
media::vector_math::FMAC(transfer_bus_->channel(ch), volume_,
transfer_bus_->frames(),
mix_bus_->channel(ch));
}
++it;
} while (it != inputs_.end());
}
}
}

// Insert the result into the AudioDataPipe.
writer_->Write(mix_bus_.get(), output_volume, false, delayed_capture_time);

// Determine when to generate more audio again. This is done by advancing the
// frame count by one interval's worth, then computing the TimeTicks
// corresponding to the new frame count. Also, check the clock to detect when
// the user's machine is overloaded and the output needs to skip forward one
// or more intervals.
const int frames_per_buffer = mix_bus_->frames();
frames_elapsed_ += frames_per_buffer;
const base::TimeTicks now = clock_->NowTicks();
const int64_t required_frames_elapsed =
(now - first_generate_time_).InMicroseconds() *
output_params_.sample_rate() / base::Time::kMicrosecondsPerSecond;
if (frames_elapsed_ < required_frames_elapsed) {
// Audio generation has fallen behind. Skip ahead to the next interval.
frames_elapsed_ = ((required_frames_elapsed + frames_per_buffer - 1) /
frames_per_buffer) *
frames_per_buffer;
}
next_generate_time_ =
first_generate_time_ +
base::TimeDelta::FromMicroseconds(frames_elapsed_ *
base::Time::kMicrosecondsPerSecond /
output_params_.sample_rate());

// Use the OneShotTimer to call this method again at the desired time.
DCHECK_GE(next_generate_time_ - now, base::TimeDelta());
timer_->Start(FROM_HERE, next_generate_time_ - now, this,
&FlowNetwork::GenerateMoreAudio);
}

} // namespace audio
Loading

0 comments on commit 6905515

Please sign in to comment.