1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define TRACE_TAG INCREMENTAL
18 
19 #include "incremental_server.h"
20 
21 #include <android-base/endian.h>
22 #include <android-base/strings.h>
23 #include <inttypes.h>
24 #include <lz4.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 
30 #include <array>
31 #include <deque>
32 #include <fstream>
33 #include <thread>
34 #include <type_traits>
35 #include <unordered_set>
36 
37 #include "adb.h"
38 #include "adb_io.h"
39 #include "adb_trace.h"
40 #include "adb_unique_fd.h"
41 #include "adb_utils.h"
42 #include "incremental_utils.h"
43 #include "sysdeps.h"
44 
45 namespace incremental {
46 
47 static constexpr int kHashesPerBlock = kBlockSize / kDigestSize;
48 static constexpr int kCompressedSizeMax = kBlockSize * 0.95;
49 static constexpr int8_t kTypeData = 0;
50 static constexpr int8_t kTypeHash = 1;
51 static constexpr int8_t kCompressionNone = 0;
52 static constexpr int8_t kCompressionLZ4 = 1;
53 static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize));
54 static constexpr auto kReadBufferSize = 128 * 1024;
55 static constexpr int kPollTimeoutMillis = 300000;  // 5 minutes
56 
57 using BlockSize = int16_t;
58 using FileId = int16_t;
59 using BlockIdx = int32_t;
60 using NumBlocks = int32_t;
61 using BlockType = int8_t;
62 using CompressionType = int8_t;
63 using RequestType = int16_t;
64 using ChunkHeader = int32_t;
65 using MagicType = uint32_t;
66 
67 static constexpr MagicType INCR = 0x494e4352;  // LE INCR
68 
69 static constexpr RequestType SERVING_COMPLETE = 0;
70 static constexpr RequestType BLOCK_MISSING = 1;
71 static constexpr RequestType PREFETCH = 2;
72 static constexpr RequestType DESTROY = 3;
73 
roundDownToBlockOffset(int64_t val)74 static constexpr inline int64_t roundDownToBlockOffset(int64_t val) {
75     return val & ~(kBlockSize - 1);
76 }
77 
roundUpToBlockOffset(int64_t val)78 static constexpr inline int64_t roundUpToBlockOffset(int64_t val) {
79     return roundDownToBlockOffset(val + kBlockSize - 1);
80 }
81 
numBytesToNumBlocks(int64_t bytes)82 static constexpr inline NumBlocks numBytesToNumBlocks(int64_t bytes) {
83     return roundUpToBlockOffset(bytes) / kBlockSize;
84 }
85 
blockIndexToOffset(BlockIdx blockIdx)86 static constexpr inline off64_t blockIndexToOffset(BlockIdx blockIdx) {
87     return static_cast<off64_t>(blockIdx) * kBlockSize;
88 }
89 
90 template <typename T>
toBigEndian(T t)91 static inline constexpr T toBigEndian(T t) {
92     using unsigned_type = std::make_unsigned_t<T>;
93     if constexpr (std::is_same_v<T, int16_t>) {
94         return htobe16(static_cast<unsigned_type>(t));
95     } else if constexpr (std::is_same_v<T, int32_t>) {
96         return htobe32(static_cast<unsigned_type>(t));
97     } else if constexpr (std::is_same_v<T, int64_t>) {
98         return htobe64(static_cast<unsigned_type>(t));
99     } else {
100         return t;
101     }
102 }
103 
104 template <typename T>
readBigEndian(void * data)105 static inline constexpr T readBigEndian(void* data) {
106     using unsigned_type = std::make_unsigned_t<T>;
107     if constexpr (std::is_same_v<T, int16_t>) {
108         return static_cast<T>(be16toh(*reinterpret_cast<unsigned_type*>(data)));
109     } else if constexpr (std::is_same_v<T, int32_t>) {
110         return static_cast<T>(be32toh(*reinterpret_cast<unsigned_type*>(data)));
111     } else if constexpr (std::is_same_v<T, int64_t>) {
112         return static_cast<T>(be64toh(*reinterpret_cast<unsigned_type*>(data)));
113     } else {
114         return T();
115     }
116 }
117 
118 // Received from device
119 // !Does not include magic!
120 struct RequestCommand {
121     RequestType request_type;  // 2 bytes
122     FileId file_id;            // 2 bytes
123     union {
124         BlockIdx block_idx;
125         NumBlocks num_blocks;
126     };  // 4 bytes
127 } __attribute__((packed));
128 
129 // Placed before actual data bytes of each block
130 struct ResponseHeader {
131     FileId file_id;                    // 2 bytes
132     BlockType block_type;              // 1 byte
133     CompressionType compression_type;  // 1 byte
134     BlockIdx block_idx;                // 4 bytes
135     BlockSize block_size;              // 2 bytes
136 
responseSizeForincremental::ResponseHeader137     static constexpr size_t responseSizeFor(size_t dataSize) {
138         return dataSize + sizeof(ResponseHeader);
139     }
140 } __attribute__((packed));
141 
142 template <size_t Size = kBlockSize>
143 struct BlockBuffer {
144     ResponseHeader header;
145     char data[Size];
146 } __attribute__((packed));
147 
148 // Holds streaming state for a file
149 class File {
150   public:
151     // Plain file
File(const char * filepath,FileId id,int64_t size,unique_fd fd,int64_t tree_offset,unique_fd tree_fd)152     File(const char* filepath, FileId id, int64_t size, unique_fd fd, int64_t tree_offset,
153          unique_fd tree_fd)
154         : File(filepath, id, size, tree_offset) {
155         this->fd_ = std::move(fd);
156         this->tree_fd_ = std::move(tree_fd);
157         priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size);
158     }
ReadDataBlock(BlockIdx block_idx,void * buf,bool * is_zip_compressed) const159     int64_t ReadDataBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed) const {
160         int64_t bytes_read = -1;
161         const off64_t offsetStart = blockIndexToOffset(block_idx);
162         bytes_read = adb_pread(fd_, buf, kBlockSize, offsetStart);
163         return bytes_read;
164     }
ReadTreeBlock(BlockIdx block_idx,void * buf) const165     int64_t ReadTreeBlock(BlockIdx block_idx, void* buf) const {
166         int64_t bytes_read = -1;
167         const off64_t offsetStart = tree_offset_ + blockIndexToOffset(block_idx);
168         bytes_read = adb_pread(tree_fd_, buf, kBlockSize, offsetStart);
169         return bytes_read;
170     }
171 
PriorityBlocks() const172     const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; }
173 
hasTree() const174     bool hasTree() const { return tree_fd_.ok(); }
175 
176     std::vector<bool> sentBlocks;
177     NumBlocks sentBlocksCount = 0;
178 
179     std::vector<bool> sentTreeBlocks;
180 
181     const char* const filepath;
182     const FileId id;
183     const int64_t size;
184 
185   private:
File(const char * filepath,FileId id,int64_t size,int64_t tree_offset)186     File(const char* filepath, FileId id, int64_t size, int64_t tree_offset)
187         : filepath(filepath), id(id), size(size), tree_offset_(tree_offset) {
188         sentBlocks.resize(numBytesToNumBlocks(size));
189         sentTreeBlocks.resize(verity_tree_blocks_for_file(size));
190     }
191     unique_fd fd_;
192     std::vector<BlockIdx> priority_blocks_;
193 
194     unique_fd tree_fd_;
195     const int64_t tree_offset_;
196 };
197 
198 class IncrementalServer {
199   public:
IncrementalServer(unique_fd adb_fd,unique_fd output_fd,std::vector<File> files)200     IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files)
201         : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) {
202         buffer_.reserve(kReadBufferSize);
203         pendingBlocksBuffer_.resize(kChunkFlushSize + 2 * kBlockSize);
204         pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
205     }
206 
207     bool Serve();
208 
209   private:
210     struct PrefetchState {
211         const File* file;
212         BlockIdx overallIndex = 0;
213         BlockIdx overallEnd = 0;
214         BlockIdx priorityIndex = 0;
215 
PrefetchStateincremental::IncrementalServer::PrefetchState216         explicit PrefetchState(const File& f, BlockIdx start, int count)
217             : file(&f),
218               overallIndex(start),
219               overallEnd(std::min<BlockIdx>(start + count, f.sentBlocks.size())) {}
220 
PrefetchStateincremental::IncrementalServer::PrefetchState221         explicit PrefetchState(const File& f)
222             : PrefetchState(f, 0, (BlockIdx)f.sentBlocks.size()) {}
223 
doneincremental::IncrementalServer::PrefetchState224         bool done() const {
225             const bool overallSent = (overallIndex >= overallEnd);
226             if (file->PriorityBlocks().empty()) {
227                 return overallSent;
228             }
229             return overallSent && (priorityIndex >= (BlockIdx)file->PriorityBlocks().size());
230         }
231     };
232 
233     bool SkipToRequest(void* buffer, size_t* size, bool blocking);
234     std::optional<RequestCommand> ReadRequest(bool blocking);
235 
erase_buffer_head(int count)236     void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); }
237 
238     enum class SendResult { Sent, Skipped, Error };
239     SendResult SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush = false);
240 
241     bool SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx);
242     bool SendTreeBlocksForDataBlock(FileId fileId, BlockIdx blockIdx);
243 
244     bool SendDone();
245     void RunPrefetching();
246 
247     void Send(const void* data, size_t size, bool flush);
248     void Flush();
249     using TimePoint = decltype(std::chrono::high_resolution_clock::now());
250     bool ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent);
251 
252     unique_fd const adb_fd_;
253     unique_fd const output_fd_;
254     std::vector<File> files_;
255 
256     // Incoming data buffer.
257     std::vector<char> buffer_;
258 
259     std::deque<PrefetchState> prefetches_;
260     int compressed_ = 0, uncompressed_ = 0;
261     long long sentSize_ = 0;
262 
263     static constexpr auto kChunkFlushSize = 31 * kBlockSize;
264 
265     std::vector<char> pendingBlocksBuffer_;
266     char* pendingBlocks_ = nullptr;
267 
268     // True when client notifies that all the data has been received
269     bool servingComplete_ = false;
270 };
271 
SkipToRequest(void * buffer,size_t * size,bool blocking)272 bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) {
273     while (true) {
274         // Looking for INCR magic.
275         bool magic_found = false;
276         int bcur = 0;
277         int bsize = buffer_.size();
278         for (bcur = 0; bcur + 4 < bsize; ++bcur) {
279             uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur));
280             if (magic == INCR) {
281                 magic_found = true;
282                 break;
283             }
284         }
285 
286         if (bcur > 0) {
287             // output the rest.
288             (void)WriteFdExactly(output_fd_, buffer_.data(), bcur);
289             erase_buffer_head(bcur);
290         }
291 
292         if (magic_found && buffer_.size() >= *size + sizeof(INCR)) {
293             // fine, return
294             memcpy(buffer, buffer_.data() + sizeof(INCR), *size);
295             erase_buffer_head(*size + sizeof(INCR));
296             return true;
297         }
298 
299         adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0};
300         auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0);
301 
302         if (res != 1) {
303             auto err = errno;
304             (void)WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
305             if (res < 0) {
306                 D("Failed to poll: %s", strerror(err));
307                 return false;
308             }
309             if (blocking) {
310                 fprintf(stderr, "Timed out waiting for data from device.\n");
311             }
312             if (blocking && servingComplete_) {
313                 // timeout waiting from client. Serving is complete, so quit.
314                 return false;
315             }
316             *size = 0;
317             return true;
318         }
319 
320         bsize = buffer_.size();
321         buffer_.resize(kReadBufferSize);
322         int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize);
323         if (r > 0) {
324             buffer_.resize(bsize + r);
325             continue;
326         }
327 
328         D("Failed to read from fd %d: %d. Exit", adb_fd_.get(), errno);
329         break;
330     }
331     // socket is closed. print remaining messages
332     WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
333     return false;
334 }
335 
ReadRequest(bool blocking)336 std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) {
337     uint8_t commandBuf[sizeof(RequestCommand)];
338     auto size = sizeof(commandBuf);
339     if (!SkipToRequest(&commandBuf, &size, blocking)) {
340         return {{DESTROY}};
341     }
342     if (size < sizeof(RequestCommand)) {
343         return {};
344     }
345     RequestCommand request;
346     request.request_type = readBigEndian<RequestType>(&commandBuf[0]);
347     request.file_id = readBigEndian<FileId>(&commandBuf[2]);
348     request.block_idx = readBigEndian<BlockIdx>(&commandBuf[4]);
349     return request;
350 }
351 
SendTreeBlocksForDataBlock(const FileId fileId,const BlockIdx blockIdx)352 bool IncrementalServer::SendTreeBlocksForDataBlock(const FileId fileId, const BlockIdx blockIdx) {
353     auto& file = files_[fileId];
354     if (!file.hasTree()) {
355         return true;
356     }
357     const int32_t data_block_count = numBytesToNumBlocks(file.size);
358 
359     const int32_t total_nodes_count(file.sentTreeBlocks.size());
360     const int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
361 
362     const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count;
363 
364     // Leaf level, sending only 1 block.
365     const int32_t leaf_idx = leaf_nodes_offset + blockIdx / kHashesPerBlock;
366     if (file.sentTreeBlocks[leaf_idx]) {
367         return true;
368     }
369     if (!SendTreeBlock(fileId, blockIdx, leaf_idx)) {
370         return false;
371     }
372     file.sentTreeBlocks[leaf_idx] = true;
373 
374     // Non-leaf, sending EVERYTHING. This should be done only once.
375     if (leaf_nodes_offset == 0 || file.sentTreeBlocks[0]) {
376         return true;
377     }
378 
379     for (int32_t i = 0; i < leaf_nodes_offset; ++i) {
380         if (!SendTreeBlock(fileId, blockIdx, i)) {
381             return false;
382         }
383         file.sentTreeBlocks[i] = true;
384     }
385     return true;
386 }
387 
SendTreeBlock(FileId fileId,int32_t fileBlockIdx,BlockIdx blockIdx)388 bool IncrementalServer::SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx) {
389     const auto& file = files_[fileId];
390 
391     BlockBuffer buffer;
392     const int64_t bytesRead = file.ReadTreeBlock(blockIdx, buffer.data);
393     if (bytesRead <= 0) {
394         fprintf(stderr, "Failed to get data for %s.idsig at blockIdx=%d.\n", file.filepath,
395                 blockIdx);
396         return false;
397     }
398 
399     buffer.header.compression_type = kCompressionNone;
400     buffer.header.block_type = kTypeHash;
401     buffer.header.file_id = toBigEndian(fileId);
402     buffer.header.block_size = toBigEndian(int16_t(bytesRead));
403     buffer.header.block_idx = toBigEndian(blockIdx);
404 
405     Send(&buffer, ResponseHeader::responseSizeFor(bytesRead), /*flush=*/false);
406 
407     return true;
408 }
409 
SendDataBlock(FileId fileId,BlockIdx blockIdx,bool flush)410 auto IncrementalServer::SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult {
411     auto& file = files_[fileId];
412     if (blockIdx >= static_cast<long>(file.sentBlocks.size())) {
413         // may happen as we schedule some extra blocks for reported page misses
414         D("Skipped reading file %s at block %" PRId32 " (past end).", file.filepath, blockIdx);
415         return SendResult::Skipped;
416     }
417     if (file.sentBlocks[blockIdx]) {
418         return SendResult::Skipped;
419     }
420 
421     if (!SendTreeBlocksForDataBlock(fileId, blockIdx)) {
422         return SendResult::Error;
423     }
424 
425     BlockBuffer raw;
426     bool isZipCompressed = false;
427     const int64_t bytesRead = file.ReadDataBlock(blockIdx, raw.data, &isZipCompressed);
428     if (bytesRead < 0) {
429         fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%d).\n", file.filepath, blockIdx,
430                 errno);
431         return SendResult::Error;
432     }
433 
434     BlockBuffer<kCompressBound> compressed;
435     int16_t compressedSize = 0;
436     if (!isZipCompressed) {
437         compressedSize = LZ4_compress_default(raw.data, compressed.data, bytesRead, kCompressBound);
438     }
439     int16_t blockSize;
440     ResponseHeader* header;
441     if (compressedSize > 0 && compressedSize < kCompressedSizeMax) {
442         ++compressed_;
443         blockSize = compressedSize;
444         header = &compressed.header;
445         header->compression_type = kCompressionLZ4;
446     } else {
447         ++uncompressed_;
448         blockSize = bytesRead;
449         header = &raw.header;
450         header->compression_type = kCompressionNone;
451     }
452 
453     header->block_type = kTypeData;
454     header->file_id = toBigEndian(fileId);
455     header->block_size = toBigEndian(blockSize);
456     header->block_idx = toBigEndian(blockIdx);
457 
458     file.sentBlocks[blockIdx] = true;
459     file.sentBlocksCount += 1;
460     Send(header, ResponseHeader::responseSizeFor(blockSize), flush);
461 
462     return SendResult::Sent;
463 }
464 
SendDone()465 bool IncrementalServer::SendDone() {
466     ResponseHeader header;
467     header.file_id = -1;
468     header.block_type = 0;
469     header.compression_type = 0;
470     header.block_idx = 0;
471     header.block_size = 0;
472     Send(&header, sizeof(header), true);
473     return true;
474 }
475 
RunPrefetching()476 void IncrementalServer::RunPrefetching() {
477     constexpr auto kPrefetchBlocksPerIteration = 128;
478 
479     int blocksToSend = kPrefetchBlocksPerIteration;
480     while (!prefetches_.empty() && blocksToSend > 0) {
481         auto& prefetch = prefetches_.front();
482         const auto& file = *prefetch.file;
483         const auto& priority_blocks = file.PriorityBlocks();
484         if (!priority_blocks.empty()) {
485             for (auto& i = prefetch.priorityIndex;
486                  blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) {
487                 if (auto res = SendDataBlock(file.id, priority_blocks[i]);
488                     res == SendResult::Sent) {
489                     --blocksToSend;
490                 } else if (res == SendResult::Error) {
491                     fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i);
492                 }
493             }
494         }
495         for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) {
496             if (auto res = SendDataBlock(file.id, i); res == SendResult::Sent) {
497                 --blocksToSend;
498             } else if (res == SendResult::Error) {
499                 fprintf(stderr, "Failed to send block %" PRId32 "\n", i);
500             }
501         }
502         if (prefetch.done()) {
503             prefetches_.pop_front();
504         }
505     }
506 }
507 
Send(const void * data,size_t size,bool flush)508 void IncrementalServer::Send(const void* data, size_t size, bool flush) {
509     pendingBlocks_ = std::copy_n(static_cast<const char*>(data), size, pendingBlocks_);
510     if (flush || pendingBlocks_ - pendingBlocksBuffer_.data() > kChunkFlushSize) {
511         Flush();
512     }
513 }
514 
Flush()515 void IncrementalServer::Flush() {
516     auto dataBytes = pendingBlocks_ - (pendingBlocksBuffer_.data() + sizeof(ChunkHeader));
517     if (dataBytes == 0) {
518         return;
519     }
520 
521     *(ChunkHeader*)pendingBlocksBuffer_.data() = toBigEndian<int32_t>(dataBytes);
522     auto totalBytes = sizeof(ChunkHeader) + dataBytes;
523     if (!WriteFdExactly(adb_fd_, pendingBlocksBuffer_.data(), totalBytes)) {
524         fprintf(stderr, "Failed to write %d bytes\n", int(totalBytes));
525     }
526     sentSize_ += totalBytes;
527     pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
528 }
529 
ServingComplete(std::optional<TimePoint> startTime,int missesCount,int missesSent)530 bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount,
531                                         int missesSent) {
532     servingComplete_ = true;
533     using namespace std::chrono;
534     auto endTime = high_resolution_clock::now();
535     D("Streaming completed.\n"
536       "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: "
537       "%d, mb: %.3f\n"
538       "Total time taken: %.3fms",
539       missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0,
540       duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0);
541     return true;
542 }
543 
Serve()544 bool IncrementalServer::Serve() {
545     // Initial handshake to verify connection is still alive
546     if (!SendOkay(adb_fd_)) {
547         fprintf(stderr, "Connection is dead. Abort.\n");
548         return false;
549     }
550 
551     std::unordered_set<FileId> prefetchedFiles;
552     bool doneSent = false;
553     int missesCount = 0;
554     int missesSent = 0;
555 
556     using namespace std::chrono;
557     std::optional<TimePoint> startTime;
558 
559     while (true) {
560         if (!doneSent && prefetches_.empty() &&
561             std::all_of(files_.begin(), files_.end(), [](const File& f) {
562                 return f.sentBlocksCount == NumBlocks(f.sentBlocks.size());
563             })) {
564             fprintf(stderr, "All files should be loaded. Notifying the device.\n");
565             SendDone();
566             doneSent = true;
567         }
568 
569         const bool blocking = prefetches_.empty();
570         if (blocking) {
571             // We've no idea how long the blocking call is, so let's flush whatever is still unsent.
572             Flush();
573         }
574         auto request = ReadRequest(blocking);
575 
576         if (!startTime) {
577             startTime = high_resolution_clock::now();
578         }
579 
580         if (request) {
581             FileId fileId = request->file_id;
582             BlockIdx blockIdx = request->block_idx;
583 
584             switch (request->request_type) {
585                 case DESTROY: {
586                     // Stop everything.
587                     return true;
588                 }
589                 case SERVING_COMPLETE: {
590                     // Not stopping the server here.
591                     ServingComplete(startTime, missesCount, missesSent);
592                     break;
593                 }
594                 case BLOCK_MISSING: {
595                     ++missesCount;
596                     // Sends one single block ASAP.
597                     if (fileId < 0 || fileId >= (FileId)files_.size() || blockIdx < 0 ||
598                         blockIdx >= (BlockIdx)files_[fileId].sentBlocks.size()) {
599                         fprintf(stderr,
600                                 "Received invalid data request for file_id %" PRId16
601                                 " block_idx %" PRId32 ".\n",
602                                 fileId, blockIdx);
603                         break;
604                     }
605 
606                     if (VLOG_IS_ON(INCREMENTAL)) {
607                         auto& file = files_[fileId];
608                         auto posP = std::find(file.PriorityBlocks().begin(),
609                                               file.PriorityBlocks().end(), blockIdx);
610                         D("\tMISSING BLOCK: reading file %d block %04d (in priority: %d of %d)",
611                           (int)fileId, (int)blockIdx,
612                           posP == file.PriorityBlocks().end()
613                                   ? -1
614                                   : int(posP - file.PriorityBlocks().begin()),
615                           int(file.PriorityBlocks().size()));
616                     }
617 
618                     if (auto res = SendDataBlock(fileId, blockIdx, true);
619                         res == SendResult::Error) {
620                         fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx);
621                     } else if (res == SendResult::Sent) {
622                         ++missesSent;
623                         // Make sure we send more pages from this place onward, in case if the OS is
624                         // reading a bigger block.
625                         prefetches_.emplace_front(files_[fileId], blockIdx + 1, 7);
626                     }
627                     break;
628                 }
629                 case PREFETCH: {
630                     // Start prefetching for a file
631                     if (fileId < 0) {
632                         fprintf(stderr,
633                                 "Received invalid prefetch request for file_id %" PRId16 "\n",
634                                 fileId);
635                         break;
636                     }
637                     if (!prefetchedFiles.insert(fileId).second) {
638                         fprintf(stderr,
639                                 "Received duplicate prefetch request for file_id %" PRId16 "\n",
640                                 fileId);
641                         break;
642                     }
643                     D("Received prefetch request for file_id %" PRId16 ".", fileId);
644                     prefetches_.emplace_back(files_[fileId]);
645                     break;
646                 }
647                 default:
648                     fprintf(stderr, "Invalid request %" PRId16 ",%" PRId16 ",%" PRId32 ".\n",
649                             request->request_type, fileId, blockIdx);
650                     break;
651             }
652         }
653 
654         RunPrefetching();
655     }
656 }
657 
open_fd(const char * filepath)658 static std::pair<unique_fd, int64_t> open_fd(const char* filepath) {
659     struct stat st;
660     if (stat(filepath, &st)) {
661         error_exit("inc-server: failed to stat input file '%s'.", filepath);
662     }
663 
664     unique_fd fd(adb_open(filepath, O_RDONLY));
665     if (fd < 0) {
666         error_exit("inc-server: failed to open file '%s'.", filepath);
667     }
668 
669     return {std::move(fd), st.st_size};
670 }
671 
open_signature(int64_t file_size,const char * filepath)672 static std::pair<unique_fd, int64_t> open_signature(int64_t file_size, const char* filepath) {
673     std::string signature_file(filepath);
674     signature_file += IDSIG;
675 
676     unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY));
677     if (fd < 0) {
678         D("No signature file found for '%s'('%s')", filepath, signature_file.c_str());
679         return {};
680     }
681 
682     auto [tree_offset, tree_size] = skip_id_sig_headers(fd);
683     if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) {
684         error_exit("Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n",
685                    signature_file.c_str(), (long long)tree_size, (long long)expected);
686     }
687 
688     int32_t data_block_count = numBytesToNumBlocks(file_size);
689     int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
690     D("Verity tree loaded: %s, tree size: %d (%d blocks, %d leafs)", signature_file.c_str(),
691       int(tree_size), int(numBytesToNumBlocks(tree_size)), int(leaf_nodes_count));
692 
693     return {std::move(fd), tree_offset};
694 }
695 
serve(int connection_fd,int output_fd,int argc,const char ** argv)696 bool serve(int connection_fd, int output_fd, int argc, const char** argv) {
697     auto connection_ufd = unique_fd(connection_fd);
698     auto output_ufd = unique_fd(output_fd);
699     if (argc <= 0) {
700         error_exit("inc-server: must specify at least one file.");
701     }
702 
703     std::vector<File> files;
704     files.reserve(argc);
705     for (int i = 0; i < argc; ++i) {
706         auto filepath = argv[i];
707 
708         auto [file_fd, file_size] = open_fd(filepath);
709         auto [sign_fd, sign_offset] = open_signature(file_size, filepath);
710 
711         files.emplace_back(filepath, i, file_size, std::move(file_fd), sign_offset,
712                            std::move(sign_fd));
713     }
714 
715     IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files));
716     printf("Serving...\n");
717     fclose(stdin);
718     fclose(stdout);
719     return server.Serve();
720 }
721 
722 }  // namespace incremental
723