/* * Copyright (C) 2020 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define TRACE_TAG INCREMENTAL #include "incremental_server.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "adb.h" #include "adb_io.h" #include "adb_trace.h" #include "adb_unique_fd.h" #include "adb_utils.h" #include "incremental_utils.h" #include "sysdeps.h" namespace incremental { static constexpr int kHashesPerBlock = kBlockSize / kDigestSize; static constexpr int kCompressedSizeMax = kBlockSize * 0.95; static constexpr int8_t kTypeData = 0; static constexpr int8_t kTypeHash = 1; static constexpr int8_t kCompressionNone = 0; static constexpr int8_t kCompressionLZ4 = 1; static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize)); static constexpr auto kReadBufferSize = 128 * 1024; static constexpr int kPollTimeoutMillis = 300000; // 5 minutes using BlockSize = int16_t; using FileId = int16_t; using BlockIdx = int32_t; using NumBlocks = int32_t; using BlockType = int8_t; using CompressionType = int8_t; using RequestType = int16_t; using ChunkHeader = int32_t; using MagicType = uint32_t; static constexpr MagicType INCR = 0x494e4352; // LE INCR static constexpr RequestType SERVING_COMPLETE = 0; static constexpr RequestType BLOCK_MISSING = 1; static constexpr RequestType PREFETCH = 2; static constexpr RequestType DESTROY = 3; static constexpr inline int64_t roundDownToBlockOffset(int64_t val) { return val & ~(kBlockSize - 1); } static constexpr inline int64_t roundUpToBlockOffset(int64_t val) { return roundDownToBlockOffset(val + kBlockSize - 1); } static constexpr inline NumBlocks numBytesToNumBlocks(int64_t bytes) { return roundUpToBlockOffset(bytes) / kBlockSize; } static constexpr inline off64_t blockIndexToOffset(BlockIdx blockIdx) { return static_cast(blockIdx) * kBlockSize; } template static inline constexpr T toBigEndian(T t) { using unsigned_type = std::make_unsigned_t; if constexpr (std::is_same_v) { return htobe16(static_cast(t)); } else if constexpr (std::is_same_v) { return htobe32(static_cast(t)); } else if constexpr (std::is_same_v) { return htobe64(static_cast(t)); } else { return t; } } template static inline constexpr T readBigEndian(void* data) { using unsigned_type = std::make_unsigned_t; if constexpr (std::is_same_v) { return static_cast(be16toh(*reinterpret_cast(data))); } else if constexpr (std::is_same_v) { return static_cast(be32toh(*reinterpret_cast(data))); } else if constexpr (std::is_same_v) { return static_cast(be64toh(*reinterpret_cast(data))); } else { return T(); } } // Received from device // !Does not include magic! struct RequestCommand { RequestType request_type; // 2 bytes FileId file_id; // 2 bytes union { BlockIdx block_idx; NumBlocks num_blocks; }; // 4 bytes } __attribute__((packed)); // Placed before actual data bytes of each block struct ResponseHeader { FileId file_id; // 2 bytes BlockType block_type; // 1 byte CompressionType compression_type; // 1 byte BlockIdx block_idx; // 4 bytes BlockSize block_size; // 2 bytes static constexpr size_t responseSizeFor(size_t dataSize) { return dataSize + sizeof(ResponseHeader); } } __attribute__((packed)); template struct BlockBuffer { ResponseHeader header; char data[Size]; } __attribute__((packed)); // Holds streaming state for a file class File { public: // Plain file File(const char* filepath, FileId id, int64_t size, unique_fd fd, int64_t tree_offset, unique_fd tree_fd) : File(filepath, id, size, tree_offset) { this->fd_ = std::move(fd); this->tree_fd_ = std::move(tree_fd); priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size); } int64_t ReadDataBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed) const { int64_t bytes_read = -1; const off64_t offsetStart = blockIndexToOffset(block_idx); bytes_read = adb_pread(fd_, buf, kBlockSize, offsetStart); return bytes_read; } int64_t ReadTreeBlock(BlockIdx block_idx, void* buf) const { int64_t bytes_read = -1; const off64_t offsetStart = tree_offset_ + blockIndexToOffset(block_idx); bytes_read = adb_pread(tree_fd_, buf, kBlockSize, offsetStart); return bytes_read; } const std::vector& PriorityBlocks() const { return priority_blocks_; } bool hasTree() const { return tree_fd_.ok(); } std::vector sentBlocks; NumBlocks sentBlocksCount = 0; std::vector sentTreeBlocks; const char* const filepath; const FileId id; const int64_t size; private: File(const char* filepath, FileId id, int64_t size, int64_t tree_offset) : filepath(filepath), id(id), size(size), tree_offset_(tree_offset) { sentBlocks.resize(numBytesToNumBlocks(size)); sentTreeBlocks.resize(verity_tree_blocks_for_file(size)); } unique_fd fd_; std::vector priority_blocks_; unique_fd tree_fd_; const int64_t tree_offset_; }; class IncrementalServer { public: IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector files) : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) { buffer_.reserve(kReadBufferSize); pendingBlocksBuffer_.resize(kChunkFlushSize + 2 * kBlockSize); pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader); } bool Serve(); private: struct PrefetchState { const File* file; BlockIdx overallIndex = 0; BlockIdx overallEnd = 0; BlockIdx priorityIndex = 0; explicit PrefetchState(const File& f, BlockIdx start, int count) : file(&f), overallIndex(start), overallEnd(std::min(start + count, f.sentBlocks.size())) {} explicit PrefetchState(const File& f) : PrefetchState(f, 0, (BlockIdx)f.sentBlocks.size()) {} bool done() const { const bool overallSent = (overallIndex >= overallEnd); if (file->PriorityBlocks().empty()) { return overallSent; } return overallSent && (priorityIndex >= (BlockIdx)file->PriorityBlocks().size()); } }; bool SkipToRequest(void* buffer, size_t* size, bool blocking); std::optional ReadRequest(bool blocking); void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); } enum class SendResult { Sent, Skipped, Error }; SendResult SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush = false); bool SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx); bool SendTreeBlocksForDataBlock(FileId fileId, BlockIdx blockIdx); bool SendDone(); void RunPrefetching(); void Send(const void* data, size_t size, bool flush); void Flush(); using TimePoint = decltype(std::chrono::high_resolution_clock::now()); bool ServingComplete(std::optional startTime, int missesCount, int missesSent); unique_fd const adb_fd_; unique_fd const output_fd_; std::vector files_; // Incoming data buffer. std::vector buffer_; std::deque prefetches_; int compressed_ = 0, uncompressed_ = 0; long long sentSize_ = 0; static constexpr auto kChunkFlushSize = 31 * kBlockSize; std::vector pendingBlocksBuffer_; char* pendingBlocks_ = nullptr; // True when client notifies that all the data has been received bool servingComplete_ = false; }; bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) { while (true) { // Looking for INCR magic. bool magic_found = false; int bcur = 0; int bsize = buffer_.size(); for (bcur = 0; bcur + 4 < bsize; ++bcur) { uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur)); if (magic == INCR) { magic_found = true; break; } } if (bcur > 0) { // output the rest. (void)WriteFdExactly(output_fd_, buffer_.data(), bcur); erase_buffer_head(bcur); } if (magic_found && buffer_.size() >= *size + sizeof(INCR)) { // fine, return memcpy(buffer, buffer_.data() + sizeof(INCR), *size); erase_buffer_head(*size + sizeof(INCR)); return true; } adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0}; auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0); if (res != 1) { auto err = errno; (void)WriteFdExactly(output_fd_, buffer_.data(), buffer_.size()); if (res < 0) { D("Failed to poll: %s", strerror(err)); return false; } if (blocking) { fprintf(stderr, "Timed out waiting for data from device.\n"); } if (blocking && servingComplete_) { // timeout waiting from client. Serving is complete, so quit. return false; } *size = 0; return true; } bsize = buffer_.size(); buffer_.resize(kReadBufferSize); int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize); if (r > 0) { buffer_.resize(bsize + r); continue; } D("Failed to read from fd %d: %d. Exit", adb_fd_.get(), errno); break; } // socket is closed. print remaining messages WriteFdExactly(output_fd_, buffer_.data(), buffer_.size()); return false; } std::optional IncrementalServer::ReadRequest(bool blocking) { uint8_t commandBuf[sizeof(RequestCommand)]; auto size = sizeof(commandBuf); if (!SkipToRequest(&commandBuf, &size, blocking)) { return {{DESTROY}}; } if (size < sizeof(RequestCommand)) { return {}; } RequestCommand request; request.request_type = readBigEndian(&commandBuf[0]); request.file_id = readBigEndian(&commandBuf[2]); request.block_idx = readBigEndian(&commandBuf[4]); return request; } bool IncrementalServer::SendTreeBlocksForDataBlock(const FileId fileId, const BlockIdx blockIdx) { auto& file = files_[fileId]; if (!file.hasTree()) { return true; } const int32_t data_block_count = numBytesToNumBlocks(file.size); const int32_t total_nodes_count(file.sentTreeBlocks.size()); const int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock; const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count; // Leaf level, sending only 1 block. const int32_t leaf_idx = leaf_nodes_offset + blockIdx / kHashesPerBlock; if (file.sentTreeBlocks[leaf_idx]) { return true; } if (!SendTreeBlock(fileId, blockIdx, leaf_idx)) { return false; } file.sentTreeBlocks[leaf_idx] = true; // Non-leaf, sending EVERYTHING. This should be done only once. if (leaf_nodes_offset == 0 || file.sentTreeBlocks[0]) { return true; } for (int32_t i = 0; i < leaf_nodes_offset; ++i) { if (!SendTreeBlock(fileId, blockIdx, i)) { return false; } file.sentTreeBlocks[i] = true; } return true; } bool IncrementalServer::SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx) { const auto& file = files_[fileId]; BlockBuffer buffer; const int64_t bytesRead = file.ReadTreeBlock(blockIdx, buffer.data); if (bytesRead <= 0) { fprintf(stderr, "Failed to get data for %s.idsig at blockIdx=%d.\n", file.filepath, blockIdx); return false; } buffer.header.compression_type = kCompressionNone; buffer.header.block_type = kTypeHash; buffer.header.file_id = toBigEndian(fileId); buffer.header.block_size = toBigEndian(int16_t(bytesRead)); buffer.header.block_idx = toBigEndian(blockIdx); Send(&buffer, ResponseHeader::responseSizeFor(bytesRead), /*flush=*/false); return true; } auto IncrementalServer::SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult { auto& file = files_[fileId]; if (blockIdx >= static_cast(file.sentBlocks.size())) { // may happen as we schedule some extra blocks for reported page misses D("Skipped reading file %s at block %" PRId32 " (past end).", file.filepath, blockIdx); return SendResult::Skipped; } if (file.sentBlocks[blockIdx]) { return SendResult::Skipped; } if (!SendTreeBlocksForDataBlock(fileId, blockIdx)) { return SendResult::Error; } BlockBuffer raw; bool isZipCompressed = false; const int64_t bytesRead = file.ReadDataBlock(blockIdx, raw.data, &isZipCompressed); if (bytesRead < 0) { fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%d).\n", file.filepath, blockIdx, errno); return SendResult::Error; } BlockBuffer compressed; int16_t compressedSize = 0; if (!isZipCompressed) { compressedSize = LZ4_compress_default(raw.data, compressed.data, bytesRead, kCompressBound); } int16_t blockSize; ResponseHeader* header; if (compressedSize > 0 && compressedSize < kCompressedSizeMax) { ++compressed_; blockSize = compressedSize; header = &compressed.header; header->compression_type = kCompressionLZ4; } else { ++uncompressed_; blockSize = bytesRead; header = &raw.header; header->compression_type = kCompressionNone; } header->block_type = kTypeData; header->file_id = toBigEndian(fileId); header->block_size = toBigEndian(blockSize); header->block_idx = toBigEndian(blockIdx); file.sentBlocks[blockIdx] = true; file.sentBlocksCount += 1; Send(header, ResponseHeader::responseSizeFor(blockSize), flush); return SendResult::Sent; } bool IncrementalServer::SendDone() { ResponseHeader header; header.file_id = -1; header.block_type = 0; header.compression_type = 0; header.block_idx = 0; header.block_size = 0; Send(&header, sizeof(header), true); return true; } void IncrementalServer::RunPrefetching() { constexpr auto kPrefetchBlocksPerIteration = 128; int blocksToSend = kPrefetchBlocksPerIteration; while (!prefetches_.empty() && blocksToSend > 0) { auto& prefetch = prefetches_.front(); const auto& file = *prefetch.file; const auto& priority_blocks = file.PriorityBlocks(); if (!priority_blocks.empty()) { for (auto& i = prefetch.priorityIndex; blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) { if (auto res = SendDataBlock(file.id, priority_blocks[i]); res == SendResult::Sent) { --blocksToSend; } else if (res == SendResult::Error) { fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i); } } } for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) { if (auto res = SendDataBlock(file.id, i); res == SendResult::Sent) { --blocksToSend; } else if (res == SendResult::Error) { fprintf(stderr, "Failed to send block %" PRId32 "\n", i); } } if (prefetch.done()) { prefetches_.pop_front(); } } } void IncrementalServer::Send(const void* data, size_t size, bool flush) { pendingBlocks_ = std::copy_n(static_cast(data), size, pendingBlocks_); if (flush || pendingBlocks_ - pendingBlocksBuffer_.data() > kChunkFlushSize) { Flush(); } } void IncrementalServer::Flush() { auto dataBytes = pendingBlocks_ - (pendingBlocksBuffer_.data() + sizeof(ChunkHeader)); if (dataBytes == 0) { return; } *(ChunkHeader*)pendingBlocksBuffer_.data() = toBigEndian(dataBytes); auto totalBytes = sizeof(ChunkHeader) + dataBytes; if (!WriteFdExactly(adb_fd_, pendingBlocksBuffer_.data(), totalBytes)) { fprintf(stderr, "Failed to write %d bytes\n", int(totalBytes)); } sentSize_ += totalBytes; pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader); } bool IncrementalServer::ServingComplete(std::optional startTime, int missesCount, int missesSent) { servingComplete_ = true; using namespace std::chrono; auto endTime = high_resolution_clock::now(); D("Streaming completed.\n" "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: " "%d, mb: %.3f\n" "Total time taken: %.3fms", missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0, duration_cast(endTime - (startTime ? *startTime : endTime)).count() / 1000.0); return true; } bool IncrementalServer::Serve() { // Initial handshake to verify connection is still alive if (!SendOkay(adb_fd_)) { fprintf(stderr, "Connection is dead. Abort.\n"); return false; } std::unordered_set prefetchedFiles; bool doneSent = false; int missesCount = 0; int missesSent = 0; using namespace std::chrono; std::optional startTime; while (true) { if (!doneSent && prefetches_.empty() && std::all_of(files_.begin(), files_.end(), [](const File& f) { return f.sentBlocksCount == NumBlocks(f.sentBlocks.size()); })) { fprintf(stderr, "All files should be loaded. Notifying the device.\n"); SendDone(); doneSent = true; } const bool blocking = prefetches_.empty(); if (blocking) { // We've no idea how long the blocking call is, so let's flush whatever is still unsent. Flush(); } auto request = ReadRequest(blocking); if (!startTime) { startTime = high_resolution_clock::now(); } if (request) { FileId fileId = request->file_id; BlockIdx blockIdx = request->block_idx; switch (request->request_type) { case DESTROY: { // Stop everything. return true; } case SERVING_COMPLETE: { // Not stopping the server here. ServingComplete(startTime, missesCount, missesSent); break; } case BLOCK_MISSING: { ++missesCount; // Sends one single block ASAP. if (fileId < 0 || fileId >= (FileId)files_.size() || blockIdx < 0 || blockIdx >= (BlockIdx)files_[fileId].sentBlocks.size()) { fprintf(stderr, "Received invalid data request for file_id %" PRId16 " block_idx %" PRId32 ".\n", fileId, blockIdx); break; } if (VLOG_IS_ON(INCREMENTAL)) { auto& file = files_[fileId]; auto posP = std::find(file.PriorityBlocks().begin(), file.PriorityBlocks().end(), blockIdx); D("\tMISSING BLOCK: reading file %d block %04d (in priority: %d of %d)", (int)fileId, (int)blockIdx, posP == file.PriorityBlocks().end() ? -1 : int(posP - file.PriorityBlocks().begin()), int(file.PriorityBlocks().size())); } if (auto res = SendDataBlock(fileId, blockIdx, true); res == SendResult::Error) { fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx); } else if (res == SendResult::Sent) { ++missesSent; // Make sure we send more pages from this place onward, in case if the OS is // reading a bigger block. prefetches_.emplace_front(files_[fileId], blockIdx + 1, 7); } break; } case PREFETCH: { // Start prefetching for a file if (fileId < 0) { fprintf(stderr, "Received invalid prefetch request for file_id %" PRId16 "\n", fileId); break; } if (!prefetchedFiles.insert(fileId).second) { fprintf(stderr, "Received duplicate prefetch request for file_id %" PRId16 "\n", fileId); break; } D("Received prefetch request for file_id %" PRId16 ".", fileId); prefetches_.emplace_back(files_[fileId]); break; } default: fprintf(stderr, "Invalid request %" PRId16 ",%" PRId16 ",%" PRId32 ".\n", request->request_type, fileId, blockIdx); break; } } RunPrefetching(); } } static std::pair open_fd(const char* filepath) { struct stat st; if (stat(filepath, &st)) { error_exit("inc-server: failed to stat input file '%s'.", filepath); } unique_fd fd(adb_open(filepath, O_RDONLY)); if (fd < 0) { error_exit("inc-server: failed to open file '%s'.", filepath); } return {std::move(fd), st.st_size}; } static std::pair open_signature(int64_t file_size, const char* filepath) { std::string signature_file(filepath); signature_file += IDSIG; unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY)); if (fd < 0) { D("No signature file found for '%s'('%s')", filepath, signature_file.c_str()); return {}; } auto [tree_offset, tree_size] = skip_id_sig_headers(fd); if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) { error_exit("Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n", signature_file.c_str(), (long long)tree_size, (long long)expected); } int32_t data_block_count = numBytesToNumBlocks(file_size); int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock; D("Verity tree loaded: %s, tree size: %d (%d blocks, %d leafs)", signature_file.c_str(), int(tree_size), int(numBytesToNumBlocks(tree_size)), int(leaf_nodes_count)); return {std::move(fd), tree_offset}; } bool serve(int connection_fd, int output_fd, int argc, const char** argv) { auto connection_ufd = unique_fd(connection_fd); auto output_ufd = unique_fd(output_fd); if (argc <= 0) { error_exit("inc-server: must specify at least one file."); } std::vector files; files.reserve(argc); for (int i = 0; i < argc; ++i) { auto filepath = argv[i]; auto [file_fd, file_size] = open_fd(filepath); auto [sign_fd, sign_offset] = open_signature(file_size, filepath); files.emplace_back(filepath, i, file_size, std::move(file_fd), sign_offset, std::move(sign_fd)); } IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files)); printf("Serving...\n"); fclose(stdin); fclose(stdout); return server.Serve(); } } // namespace incremental