/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "host/commands/cvd/server.h"

// NOTE(review): in this copy of the file the targets of the ten includes
// below, and the template arguments of several types further down (Result,
// std::vector, std::shared_ptr, std::make_shared, fruit::Component,
// fruit::Injector, getMultibindings, get), appear to have been stripped.
// They are left untouched here; restore them from version control.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

#include "cvd_server.pb.h"

#include "common/libs/fs/shared_buf.h"
#include "common/libs/fs/shared_fd.h"
#include "common/libs/fs/shared_select.h"
#include "common/libs/utils/files.h"
#include "common/libs/utils/flag_parser.h"
#include "common/libs/utils/result.h"
#include "common/libs/utils/shared_fd_flag.h"
#include "common/libs/utils/subprocess.h"
#include "host/commands/cvd/epoll_loop.h"
#include "host/commands/cvd/scope_guard.h"
#include "host/commands/cvd/server_constants.h"
#include "host/libs/config/cuttlefish_config.h"
#include "host/libs/config/known_paths.h"

namespace cuttlefish {

// Builds the per-request injection component: binds the given server and
// instance manager instances and installs the acloud, cvd command, shutdown
// and version components.
static fruit::Component<> RequestComponent(CvdServer* server,
                                           InstanceManager* instance_manager) {
  return fruit::createComponent()
      .bindInstance(*server)
      .bindInstance(*instance_manager)
      .install(AcloudCommandComponent)
      .install(cvdCommandComponent)
      .install(cvdShutdownComponent)
      .install(cvdVersionComponent);
}

// Number of worker threads started by the CvdServer constructor.
static constexpr int kNumThreads = 10;

// Starts kNumThreads worker threads. Each worker repeatedly asks the epoll
// pool to handle one event until running_ becomes false, logging (but not
// aborting on) handler errors.
CvdServer::CvdServer(EpollPool& epoll_pool, InstanceManager& instance_manager)
    : epoll_pool_(epoll_pool),
      instance_manager_(instance_manager),
      running_(true) {
  std::scoped_lock lock(threads_mutex_);
  for (auto i = 0; i < kNumThreads; i++) {
    threads_.emplace_back([this]() {
      while (running_) {
        auto result = epoll_pool_.HandleEvent();
        if (!result.ok()) {
          LOG(ERROR) << "Epoll worker error:\n" << result.error();
        }
      }
      // This worker is exiting; poke the pool so another worker wakes up and
      // notices that running_ is false as well.
      auto wakeup = BestEffortWakeup();
      CHECK(wakeup.ok()) << wakeup.error().message();
    });
  }
}

// Clears running_, wakes a worker so the shutdown cascades through the pool,
// then joins every worker thread.
CvdServer::~CvdServer() {
  running_ = false;
  auto wakeup = BestEffortWakeup();
  CHECK(wakeup.ok()) << wakeup.error().message();
  Join();
}

// Registers an already-signalled eventfd with the epoll pool, using a no-op
// callback, so that a worker blocked in HandleEvent() returns.
Result CvdServer::BestEffortWakeup() {
  // This attempts to cascade through the responder threads, forcing them
  // to wake up and see that running_ is false, then exit and wake up
  // further threads.
  auto eventfd = SharedFD::Event();
  CF_EXPECT(eventfd->IsOpen(), eventfd->StrError());
  CF_EXPECT(eventfd->EventfdWrite(1) == 0, eventfd->StrError());

  auto cb = [](EpollEvent) -> Result { return {}; };
  CF_EXPECT(epoll_pool_.Register(eventfd, EPOLLIN, cb));
  return {};
}

// Stops the server: clears running_, then drains ongoing_requests_,
// interrupting each request's handler and joining the worker thread that is
// serving it (unless that worker is the calling thread).
void CvdServer::Stop() {
  {
    // running_ is cleared under ongoing_requests_mutex_ so that
    // HandleRequest(), which checks it under the same mutex, cannot add a new
    // request after this point.
    std::lock_guard lock(ongoing_requests_mutex_);
    running_ = false;
  }
  while (true) {
    std::shared_ptr request;
    {
      std::lock_guard lock(ongoing_requests_mutex_);
      if (ongoing_requests_.empty()) {
        break;
      }
      auto it = ongoing_requests_.begin();
      request = *it;
      ongoing_requests_.erase(it);
    }
    {
      std::lock_guard lock(request->mutex);
      if (request->handler == nullptr) {
        // The request already finished; nothing to interrupt or wait for.
        continue;
      }
      request->handler->Interrupt();
    }
    std::scoped_lock lock(threads_mutex_);
    for (auto& thread : threads_) {
      auto current_thread = thread.get_id() == std::this_thread::get_id();
      auto matching_thread = thread.get_id() == request->thread_id;
      // A thread must not join itself, hence the current_thread check.
      if (!current_thread && matching_thread && thread.joinable()) {
        thread.join();
      }
    }
  }
}

// Blocks until every joinable worker thread has exited.
void CvdServer::Join() {
  for (auto& thread : threads_) {
    if (thread.joinable()) {
      thread.join();
    }
  }
}

// Returns the single handler in `handlers` whose CanHandle() accepts
// `request`. Fails if CanHandle() reports an error for any handler, or if the
// number of accepting handlers is not exactly one.
static Result RequestHandler(const RequestWithStdio& request,
                             const std::vector& handlers) {
  std::vector compatible_handlers;
  for (auto& handler : handlers) {
    if (CF_EXPECT(handler->CanHandle(request))) {
      compatible_handlers.push_back(handler);
    }
  }
  CF_EXPECT(compatible_handlers.size() == 1,
            "Expected exactly one handler for message, found "
                << compatible_handlers.size());
  return compatible_handlers[0];
}

// Registers `server_fd` with the epoll pool so that readability on it is
// dispatched to AcceptClient().
Result CvdServer::StartServer(SharedFD server_fd) {
  auto cb = [this](EpollEvent ev) -> Result {
    CF_EXPECT(AcceptClient(ev));
    return {};
  };
  CF_EXPECT(epoll_pool_.Register(server_fd, EPOLLIN, cb));
  return {};
}

// Accepts one client connection on the listening fd in `event`, registers the
// new client fd for HandleMessage(), and re-registers the listening fd for the
// next connection. Any failure stops the whole server.
Result CvdServer::AcceptClient(EpollEvent event) {
  ScopeGuard stop_on_failure([this] { Stop(); });

  CF_EXPECT(event.events & EPOLLIN);
  auto client_fd = SharedFD::Accept(*event.fd);
  CF_EXPECT(client_fd->IsOpen(), client_fd->StrError());
  auto client_cb = [this](EpollEvent ev) -> Result {
    CF_EXPECT(HandleMessage(ev));
    return {};
  };
  CF_EXPECT(epoll_pool_.Register(client_fd, EPOLLIN, client_cb));

  // NOTE(review): the listening fd is registered again after every accept,
  // which suggests registrations are one-shot -- confirm against EpollPool.
  auto self_cb = [this](EpollEvent ev) -> Result {
    CF_EXPECT(AcceptClient(ev));
    return {};
  };
  CF_EXPECT(epoll_pool_.Register(event.fd, EPOLLIN, self_cb));

  stop_on_failure.Cancel();
  return {};
}

// Reads one request from the client fd in `event`, dispatches it through
// HandleRequest(), and writes the response back. On success the client fd is
// re-registered for its next message; on hang-up, end-of-file, or any failure
// the client fd is removed from the epoll pool instead.
Result CvdServer::HandleMessage(EpollEvent event) {
  // Removes the client from the pool on every exit path except the fully
  // successful one, which cancels the guard below.
  ScopeGuard abandon_client([this, event] { epoll_pool_.Remove(event.fd); });

  if (event.events & EPOLLHUP) {
    // Client went away; abandon_client drops the registration.
    return {};
  }

  CF_EXPECT(event.events & EPOLLIN);
  auto request = CF_EXPECT(GetRequest(event.fd));
  if (!request) {
    // End-of-file / client went away; abandon_client drops the registration.
    return {};
  }

  auto response = HandleRequest(*request, event.fd);
  if (!response.ok()) {
    cvd::Response failure_message;
    failure_message.mutable_status()->set_code(cvd::Status::INTERNAL);
    failure_message.mutable_status()->set_message(response.error().message());
    CF_EXPECT(SendResponse(event.fd, failure_message));
    return {};  // Error already sent to the client, don't repeat on the server
  }
  CF_EXPECT(SendResponse(event.fd, *response));

  auto self_cb = [this](EpollEvent ev) -> Result {
    CF_EXPECT(HandleMessage(ev));
    return {};
  };
  CF_EXPECT(epoll_pool_.Register(event.fd, EPOLLIN, self_cb));

  abandon_client.Cancel();
  return {};
}

// Picks the handler for `request`, records the request as ongoing so Stop()
// can interrupt it, runs the handler, and returns its response. While the
// handler runs, a hang-up on `client` triggers the handler's Interrupt().
// Returns a default-constructed result without running the handler if the
// server is already stopping.
Result CvdServer::HandleRequest(RequestWithStdio request, SharedFD client) {
  fruit::Injector<> injector(RequestComponent, this, &instance_manager_);
  auto possible_handlers = injector.getMultibindings();

  // Even if the interrupt callback outlives the request handler, it'll only
  // hold on to this struct which will be cleaned out when the request handler
  // exits.
  auto shared = std::make_shared();
  shared->handler = CF_EXPECT(RequestHandler(request, possible_handlers));
  shared->thread_id = std::this_thread::get_id();

  // Clean the struct out on *every* exit path, including CF_EXPECT failures
  // below. Otherwise the interrupt callback or Stop() could still see a
  // non-null handler after this function has returned. The handlers are
  // presumably owned by `injector`, which is destroyed after this guard runs.
  ScopeGuard clear_handler([shared] {
    std::lock_guard lock(shared->mutex);
    shared->handler = nullptr;
  });

  {
    std::lock_guard lock(ongoing_requests_mutex_);
    if (running_) {
      ongoing_requests_.insert(shared);
    } else {
      // We're executing concurrently with a Stop() call.
      return {};
    }
  }
  ScopeGuard remove_ongoing_request([this, shared] {
    std::lock_guard lock(ongoing_requests_mutex_);
    ongoing_requests_.erase(shared);
  });

  auto interrupt_cb = [shared](EpollEvent) -> Result {
    std::lock_guard lock(shared->mutex);
    CF_EXPECT(shared->handler != nullptr);
    CF_EXPECT(shared->handler->Interrupt());
    return {};
  };
  CF_EXPECT(epoll_pool_.Register(client, EPOLLHUP, interrupt_cb));

  auto response = CF_EXPECT(shared->handler->Handle(request));
  {
    // Clear the handler before dropping the interrupt registration, so a
    // late hang-up event cannot interrupt a handler that already finished.
    std::lock_guard lock(shared->mutex);
    shared->handler = nullptr;
  }
  CF_EXPECT(epoll_pool_.Remove(client));  // Delete interrupt handler

  return response;
}

// Top-level injection component for the server process; installs the epoll
// loop component.
static fruit::Component ServerComponent() {
  return fruit::createComponent()
      .install(EpollLoopComponent);
}

// Entry point of the server process: ignores SIGPIPE, validates `server_fd`,
// builds the server through the injector, starts listening on `server_fd`,
// and blocks until all worker threads have exited. Returns 0 on normal exit.
Result CvdServerMain(SharedFD server_fd) {
  LOG(INFO) << "Starting server";

  signal(SIGPIPE, SIG_IGN);

  CF_EXPECT(server_fd->IsOpen(), "Did not receive a valid cvd_server fd");

  fruit::Injector injector(ServerComponent);
  CvdServer& server = injector.get();
  // Propagate a registration failure instead of waiting in Join() on a server
  // that never started listening.
  CF_EXPECT(server.StartServer(server_fd));
  server.Join();

  return 0;
}

}  // namespace cuttlefish