257 lines
7.8 KiB
C++
257 lines
7.8 KiB
C++
// Copyright (C) 2021 The Android Open Source Project
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
#include "OutputStreamImpl.h"
|
|
|
|
#include <aidl/device/google/atv/audio_proxy/MessageQueueFlag.h>
|
|
#include <aidl/device/google/atv/audio_proxy/PresentationPosition.h>
|
|
#include <android-base/logging.h>
|
|
#include <time.h>
|
|
|
|
#include "AudioProxyClientError.h"
|
|
#include "AudioProxyStreamOut.h"
|
|
|
|
using aidl::device::google::atv::audio_proxy::MessageQueueFlag;
|
|
using aidl::device::google::atv::audio_proxy::PresentationPosition;
|
|
using android::status_t;
|
|
|
|
namespace audio_proxy {
|
|
namespace {
|
|
// Largest data buffer, in bytes, that prepareForWriting will accept:
// 2^30 bytes (1 GiB).
constexpr uint32_t kMaxBufferSize = uint32_t{1} << 30;
|
|
|
|
// Deleter used for the smart pointers that hold the data MQ's EventFlag.
// A null pointer is ignored. A failing EventFlag::deleteEventFlag call is
// logged and otherwise not reported to the caller.
void deleteEventFlag(EventFlag* obj) {
  if (obj == nullptr) {
    return;
  }

  const status_t result = EventFlag::deleteEventFlag(&obj);
  if (result != 0) {
    LOG(ERROR) << "write MQ event flag deletion error: " << strerror(-result);
  }
}
|
|
|
|
class WriteThread : public Thread {
|
|
public:
|
|
// WriteThread's lifespan never exceeds StreamOut's lifespan.
|
|
WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
|
|
OutputStreamImpl::DataMQ* dataMQ,
|
|
OutputStreamImpl::StatusMQ* statusMQ, EventFlag* eventFlag);
|
|
|
|
~WriteThread() override;
|
|
|
|
private:
|
|
bool threadLoop() override;
|
|
|
|
PresentationPosition doGetPresentationPosition();
|
|
int64_t doWrite();
|
|
|
|
std::atomic<bool>* const mStop;
|
|
AudioProxyStreamOut* mStream;
|
|
OutputStreamImpl::DataMQ* const mDataMQ;
|
|
OutputStreamImpl::StatusMQ* const mStatusMQ;
|
|
EventFlag* const mEventFlag;
|
|
const std::unique_ptr<int8_t[]> mBuffer;
|
|
};
|
|
|
|
// Stores the borrowed pointers and allocates a scratch buffer with one int8_t
// per quantum of the data MQ (getQuantumCount()).
WriteThread::WriteThread(std::atomic<bool>* stop, AudioProxyStreamOut* stream,
                         OutputStreamImpl::DataMQ* dataMQ,
                         OutputStreamImpl::StatusMQ* statusMQ,
                         EventFlag* eventFlag)
    : Thread(false /*canCallJava*/),
      mStop(stop),
      mStream(stream),
      mDataMQ(dataMQ),
      mStatusMQ(statusMQ),
      mEventFlag(eventFlag),
      // Sized from the constructor argument, which is the same queue that
      // mDataMQ points at.
      mBuffer(new int8_t[dataMQ->getQuantumCount()]) {
}
|
|
|
|
// Nothing to release explicitly: the raw pointer members are not deleted here
// and mBuffer is a unique_ptr that frees itself.
WriteThread::~WriteThread() = default;
|
|
|
|
int64_t WriteThread::doWrite() {
|
|
const size_t availToRead = mDataMQ->availableToRead();
|
|
if (availToRead == 0) {
|
|
return 0;
|
|
}
|
|
|
|
if (!mDataMQ->read(&mBuffer[0], availToRead)) {
|
|
return 0;
|
|
}
|
|
|
|
return mStream->write(&mBuffer[0], availToRead);
|
|
}
|
|
|
|
// Asks the stream to fill in the frames and timestamp fields of a fresh
// PresentationPosition and returns it by value.
PresentationPosition WriteThread::doGetPresentationPosition() {
  PresentationPosition result;
  mStream->getPresentationPosition(&result.frames, &result.timestamp);
  return result;
}
|
|
|
|
bool WriteThread::threadLoop() {
|
|
// This implementation doesn't return control back to the Thread until the
|
|
// parent thread decides to stop, as the Thread uses mutexes, and this can
|
|
// lead to priority inversion.
|
|
while (!std::atomic_load_explicit(mStop, std::memory_order_acquire)) {
|
|
uint32_t efState = 0;
|
|
mEventFlag->wait(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY),
|
|
&efState);
|
|
if (!(efState & static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY))) {
|
|
continue; // Nothing to do.
|
|
}
|
|
|
|
WriteStatus status;
|
|
status.written = doWrite();
|
|
status.position = doGetPresentationPosition();
|
|
|
|
if (!mStatusMQ->write(&status)) {
|
|
LOG(ERROR) << "status message queue write failed.";
|
|
}
|
|
mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_FULL));
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
} // namespace
|
|
|
|
// Takes ownership of |stream|. The event flag starts out empty but already
// carries deleteEventFlag as its deleter.
OutputStreamImpl::OutputStreamImpl(std::unique_ptr<AudioProxyStreamOut> stream)
    : mStream(std::move(stream)),
      mEventFlag(nullptr, deleteEventFlag) {
}
|
|
|
|
// Requests the write thread to stop, joins it if it was ever created, and
// only afterwards releases the event flag.
OutputStreamImpl::~OutputStreamImpl() {
  closeImpl();

  if (mWriteThread.get() != nullptr) {
    const status_t joinResult = mWriteThread->join();
    if (joinResult != 0) {
      LOG(ERROR) << "write thread exit error: " << strerror(-joinResult);
    }
  }

  // Released after the join above, so the write thread has finished by now.
  mEventFlag.reset();
}
|
|
|
|
// Forwards the standby request to the wrapped stream and always reports
// success.
ndk::ScopedAStatus OutputStreamImpl::standby() {
  mStream->standby();

  return ndk::ScopedAStatus::ok();
}
|
|
|
|
// Binder entry point for closing the stream; all the work is in closeImpl().
ndk::ScopedAStatus OutputStreamImpl::close() {
  return closeImpl();
}
|
|
|
|
// Sets the stop flag for the write thread and, when an event flag exists,
// raises NOT_EMPTY on it. Calls after the first one do nothing. Always
// returns ok.
ndk::ScopedAStatus OutputStreamImpl::closeImpl() {
  // only this thread writes
  const bool alreadyStopped = mStopWriteThread.load(std::memory_order_relaxed);

  if (!alreadyStopped) {
    mStopWriteThread.store(true, std::memory_order_release);

    // Raised after the flag is stored, so a write thread waiting on NOT_EMPTY
    // re-checks the stop flag.
    if (mEventFlag) {
      mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY));
    }
  }

  return ndk::ScopedAStatus::ok();
}
|
|
|
|
// Creates the data/status message queues, the event flag bound to the data
// queue and the write thread, then hands duplicated queue descriptors back to
// the caller.
//
// frameSize, framesCount: both must be strictly positive and their product
//     must not exceed kMaxBufferSize; the product is the element count of the
//     data queue.
// dataMQDesc, statusMQDesc: out-parameters, written only on success.
//
// Returns ok on success. Returns ERROR_INVALID_ARGS when called a second time
// or when the size arguments are rejected, and ERROR_FMQ_CREATION_FAILURE when
// a queue, the event flag or the write thread cannot be set up. On failure no
// member state is modified.
ndk::ScopedAStatus OutputStreamImpl::prepareForWriting(
    int32_t frameSize, int32_t framesCount, DataMQDesc* dataMQDesc,
    StatusMQDesc* statusMQDesc) {
  if (mDataMQ) {
    LOG(ERROR) << "the client attempted to call prepareForWriting twice.";
    return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
  }

  // Both arguments are signed. Reject zero AND negative values here so they
  // are reported as invalid instead of slipping into the unsigned size check
  // below and being misreported as "Buffer too big".
  if (frameSize <= 0 || framesCount <= 0) {
    LOG(ERROR) << "Invalid frameSize (" << frameSize << ") or framesCount ("
               << framesCount << ")";
    return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
  }

  // Both values are known to be positive, so the casts are lossless. The
  // division form avoids overflowing frameSize * framesCount.
  if (static_cast<uint32_t>(frameSize) >
      kMaxBufferSize / static_cast<uint32_t>(framesCount)) {
    LOG(ERROR) << "Buffer too big: " << frameSize << "*" << framesCount
               << " bytes > MAX_BUFFER_SIZE (" << kMaxBufferSize << ")";
    return ndk::ScopedAStatus::fromServiceSpecificError(ERROR_INVALID_ARGS);
  }

  // The product is bounded by kMaxBufferSize (checked above), so the int
  // multiplication cannot overflow.
  auto dataMQ =
      std::make_unique<DataMQ>(frameSize * framesCount, true /* EventFlag */);
  if (!dataMQ->isValid()) {
    LOG(ERROR) << "data MQ is invalid";
    return ndk::ScopedAStatus::fromServiceSpecificError(
        ERROR_FMQ_CREATION_FAILURE);
  }

  auto statusMQ = std::make_unique<StatusMQ>(1);
  if (!statusMQ->isValid()) {
    LOG(ERROR) << "status MQ is invalid";
    return ndk::ScopedAStatus::fromServiceSpecificError(
        ERROR_FMQ_CREATION_FAILURE);
  }

  EventFlag* rawEventFlag = nullptr;
  status_t status =
      EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
  // Wrap immediately so the flag is released on every early return below.
  std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
                                                         deleteEventFlag);
  if (status != ::android::OK || !eventFlag) {
    LOG(ERROR) << "failed creating event flag for data MQ: "
               << strerror(-status);
    return ndk::ScopedAStatus::fromServiceSpecificError(
        ERROR_FMQ_CREATION_FAILURE);
  }

  // The thread only borrows these pointers; the owning smart pointers are
  // moved into members below without changing the pointed-to objects.
  sp<WriteThread> writeThread =
      new WriteThread(&mStopWriteThread, mStream.get(), dataMQ.get(),
                      statusMQ.get(), eventFlag.get());
  status = writeThread->run("writer", ::android::PRIORITY_URGENT_AUDIO);
  if (status != ::android::OK) {
    LOG(ERROR) << "failed to start writer thread: " << strerror(-status);
    return ndk::ScopedAStatus::fromServiceSpecificError(
        ERROR_FMQ_CREATION_FAILURE);
  }

  // Everything succeeded: commit the new objects to member state.
  mDataMQ = std::move(dataMQ);
  mStatusMQ = std::move(statusMQ);
  mEventFlag = std::move(eventFlag);
  mWriteThread = std::move(writeThread);

  *dataMQDesc = mDataMQ->dupeDesc();
  *statusMQDesc = mStatusMQ->dupeDesc();

  return ndk::ScopedAStatus::ok();
}
|
|
|
|
// Forwards the pause request to the wrapped stream and always reports
// success.
ndk::ScopedAStatus OutputStreamImpl::pause() {
  mStream->pause();

  return ndk::ScopedAStatus::ok();
}
|
|
|
|
// Forwards the resume request to the wrapped stream and always reports
// success.
ndk::ScopedAStatus OutputStreamImpl::resume() {
  mStream->resume();

  return ndk::ScopedAStatus::ok();
}
|
|
|
|
// Forwards the drain request, including the requested drain |type|, to the
// wrapped stream and always reports success.
ndk::ScopedAStatus OutputStreamImpl::drain(AudioDrain type) {
  mStream->drain(type);

  return ndk::ScopedAStatus::ok();
}
|
|
|
|
// Forwards the flush request to the wrapped stream and always reports
// success.
ndk::ScopedAStatus OutputStreamImpl::flush() {
  mStream->flush();

  return ndk::ScopedAStatus::ok();
}
|
|
|
|
// Forwards the |left| and |right| volume values unchanged to the wrapped
// stream and always reports success.
ndk::ScopedAStatus OutputStreamImpl::setVolume(float left, float right) {
  mStream->setVolume(left, right);

  return ndk::ScopedAStatus::ok();
}
|
|
|
|
} // namespace audio_proxy
|