/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define TRACE_TAG INCREMENTAL

#include "incremental_server.h"

#include <android-base/endian.h>
#include <android-base/strings.h>
#include <inttypes.h>
#include <lz4.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <array>
#include <deque>
#include <fstream>
#include <thread>
#include <type_traits>
#include <unordered_set>

#include "adb.h"
#include "adb_io.h"
#include "adb_trace.h"
#include "adb_unique_fd.h"
#include "adb_utils.h"
#include "incremental_utils.h"
#include "sysdeps.h"

namespace incremental {

static constexpr int kHashesPerBlock = kBlockSize / kDigestSize;
static constexpr int kCompressedSizeMax = kBlockSize * 0.95;
static constexpr int8_t kTypeData = 0;
static constexpr int8_t kTypeHash = 1;
static constexpr int8_t kCompressionNone = 0;
static constexpr int8_t kCompressionLZ4 = 1;
static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize));
static constexpr auto kReadBufferSize = 128 * 1024;
static constexpr int kPollTimeoutMillis = 300000;  // 5 minutes

using BlockSize = int16_t;
using FileId = int16_t;
using BlockIdx = int32_t;
using NumBlocks = int32_t;
using BlockType = int8_t;
using CompressionType = int8_t;
using RequestType = int16_t;
using ChunkHeader = int32_t;
using MagicType = uint32_t;

static constexpr MagicType INCR = 0x494e4352;  // LE INCR

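// Requests received from the device. Each request on the wire is the INCR magic followed by a
// RequestCommand (see below), with all fields big-endian. SERVING_COMPLETE reports that the
// device has all the data it needs (serving continues), BLOCK_MISSING asks for one data block
// to be sent ASAP, PREFETCH starts streaming a whole file, and DESTROY shuts the server down.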
static constexpr RequestType SERVING_COMPLETE = 0;
static constexpr RequestType BLOCK_MISSING = 1;
static constexpr RequestType PREFETCH = 2;
static constexpr RequestType DESTROY = 3;

static constexpr inline int64_t roundDownToBlockOffset(int64_t val) {
    return val & ~(kBlockSize - 1);
}

static constexpr inline int64_t roundUpToBlockOffset(int64_t val) {
    return roundDownToBlockOffset(val + kBlockSize - 1);
}

static constexpr inline NumBlocks numBytesToNumBlocks(int64_t bytes) {
    return roundUpToBlockOffset(bytes) / kBlockSize;
}

static constexpr inline off64_t blockIndexToOffset(BlockIdx blockIdx) {
    return static_cast<off64_t>(blockIdx) * kBlockSize;
}

template <typename T>
static inline constexpr T toBigEndian(T t) {
    using unsigned_type = std::make_unsigned_t<T>;
    if constexpr (std::is_same_v<T, int16_t>) {
        return htobe16(static_cast<unsigned_type>(t));
    } else if constexpr (std::is_same_v<T, int32_t>) {
        return htobe32(static_cast<unsigned_type>(t));
    } else if constexpr (std::is_same_v<T, int64_t>) {
        return htobe64(static_cast<unsigned_type>(t));
    } else {
        return t;
    }
}

template <typename T>
static inline constexpr T readBigEndian(void* data) {
    using unsigned_type = std::make_unsigned_t<T>;
    if constexpr (std::is_same_v<T, int16_t>) {
        return static_cast<T>(be16toh(*reinterpret_cast<unsigned_type*>(data)));
    } else if constexpr (std::is_same_v<T, int32_t>) {
        return static_cast<T>(be32toh(*reinterpret_cast<unsigned_type*>(data)));
    } else if constexpr (std::is_same_v<T, int64_t>) {
        return static_cast<T>(be64toh(*reinterpret_cast<unsigned_type*>(data)));
    } else {
        return T();
    }
}

// Received from device
// !Does not include magic!
struct RequestCommand {
    RequestType request_type;  // 2 bytes
    FileId file_id;            // 2 bytes
    union {
        BlockIdx block_idx;
        NumBlocks num_blocks;
    };  // 4 bytes
} __attribute__((packed));

// Placed before actual data bytes of each block
struct ResponseHeader {
    FileId file_id;                    // 2 bytes
    BlockType block_type;              // 1 byte
    CompressionType compression_type;  // 1 byte
    BlockIdx block_idx;                // 4 bytes
    BlockSize block_size;              // 2 bytes

    static constexpr size_t responseSizeFor(size_t dataSize) {
        return dataSize + sizeof(ResponseHeader);
    }
} __attribute__((packed));

template <size_t Size = kBlockSize>
struct BlockBuffer {
    ResponseHeader header;
    char data[Size];
} __attribute__((packed));

// Holds streaming state for a file
class File {
  public:
    // Plain file
    File(const char* filepath, FileId id, int64_t size, unique_fd fd, int64_t tree_offset,
         unique_fd tree_fd)
        : File(filepath, id, size, tree_offset) {
        this->fd_ = std::move(fd);
        this->tree_fd_ = std::move(tree_fd);
        priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size);
    }
    int64_t ReadDataBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed) const {
        int64_t bytes_read = -1;
        const off64_t offsetStart = blockIndexToOffset(block_idx);
        bytes_read = adb_pread(fd_, buf, kBlockSize, offsetStart);
        return bytes_read;
    }
    int64_t ReadTreeBlock(BlockIdx block_idx, void* buf) const {
        int64_t bytes_read = -1;
        const off64_t offsetStart = tree_offset_ + blockIndexToOffset(block_idx);
        bytes_read = adb_pread(tree_fd_, buf, kBlockSize, offsetStart);
        return bytes_read;
    }

    const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; }

    bool hasTree() const { return tree_fd_.ok(); }

    std::vector<bool> sentBlocks;
    NumBlocks sentBlocksCount = 0;

    std::vector<bool> sentTreeBlocks;

    const char* const filepath;
    const FileId id;
    const int64_t size;

  private:
    File(const char* filepath, FileId id, int64_t size, int64_t tree_offset)
        : filepath(filepath), id(id), size(size), tree_offset_(tree_offset) {
        sentBlocks.resize(numBytesToNumBlocks(size));
        sentTreeBlocks.resize(verity_tree_blocks_for_file(size));
    }
    unique_fd fd_;
    std::vector<BlockIdx> priority_blocks_;

    unique_fd tree_fd_;
    const int64_t tree_offset_;
};

class IncrementalServer {
  public:
    IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files)
        : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) {
        buffer_.reserve(kReadBufferSize);
        pendingBlocksBuffer_.resize(kChunkFlushSize + 2 * kBlockSize);
        pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
    }

    bool Serve();

  private:
    struct PrefetchState {
        const File* file;
        BlockIdx overallIndex = 0;
        BlockIdx overallEnd = 0;
        BlockIdx priorityIndex = 0;

        explicit PrefetchState(const File& f, BlockIdx start, int count)
            : file(&f),
              overallIndex(start),
              overallEnd(std::min<BlockIdx>(start + count, f.sentBlocks.size())) {}

        explicit PrefetchState(const File& f)
            : PrefetchState(f, 0, (BlockIdx)f.sentBlocks.size()) {}

        bool done() const {
            const bool overallSent = (overallIndex >= overallEnd);
            if (file->PriorityBlocks().empty()) {
                return overallSent;
            }
            return overallSent && (priorityIndex >= (BlockIdx)file->PriorityBlocks().size());
        }
    };

    bool SkipToRequest(void* buffer, size_t* size, bool blocking);
    std::optional<RequestCommand> ReadRequest(bool blocking);

    void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); }

    enum class SendResult { Sent, Skipped, Error };
    SendResult SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush = false);

    bool SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx);
    bool SendTreeBlocksForDataBlock(FileId fileId, BlockIdx blockIdx);

    bool SendDone();
    void RunPrefetching();

    void Send(const void* data, size_t size, bool flush);
    void Flush();
    using TimePoint = decltype(std::chrono::high_resolution_clock::now());
    bool ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent);

    unique_fd const adb_fd_;
    unique_fd const output_fd_;
    std::vector<File> files_;

    // Incoming data buffer.
    std::vector<char> buffer_;

    std::deque<PrefetchState> prefetches_;
    int compressed_ = 0, uncompressed_ = 0;
    long long sentSize_ = 0;

    static constexpr auto kChunkFlushSize = 31 * kBlockSize;

    std::vector<char> pendingBlocksBuffer_;
    char* pendingBlocks_ = nullptr;

    // True when client notifies that all the data has been received
    bool servingComplete_ = false;
};

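// The stream coming back from the device interleaves regular output with INCR-framed requests.
// SkipToRequest() forwards everything up to the next INCR magic to output_fd_, and copies out
// the *size request bytes that follow it once they are fully buffered. When a poll times out it
// returns success with *size == 0; once serving is complete, a blocking timeout ends the loop
// instead.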
bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) {
    while (true) {
        // Looking for INCR magic.
        bool magic_found = false;
        int bcur = 0;
        int bsize = buffer_.size();
        for (bcur = 0; bcur + 4 < bsize; ++bcur) {
            uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur));
            if (magic == INCR) {
                magic_found = true;
                break;
            }
        }

        if (bcur > 0) {
            // output the rest.
            (void)WriteFdExactly(output_fd_, buffer_.data(), bcur);
            erase_buffer_head(bcur);
        }

        if (magic_found && buffer_.size() >= *size + sizeof(INCR)) {
            // fine, return
            memcpy(buffer, buffer_.data() + sizeof(INCR), *size);
            erase_buffer_head(*size + sizeof(INCR));
            return true;
        }

        adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0};
        auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0);

        if (res != 1) {
            auto err = errno;
            (void)WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
            if (res < 0) {
                D("Failed to poll: %s", strerror(err));
                return false;
            }
            if (blocking) {
                fprintf(stderr, "Timed out waiting for data from device.\n");
            }
            if (blocking && servingComplete_) {
                // Timed out waiting for the client. Serving is complete, so quit.
                return false;
            }
            *size = 0;
            return true;
        }

        bsize = buffer_.size();
        buffer_.resize(kReadBufferSize);
        int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize);
        if (r > 0) {
            buffer_.resize(bsize + r);
            continue;
        }

        D("Failed to read from fd %d: %d. Exit", adb_fd_.get(), errno);
        break;
    }
    // socket is closed. print remaining messages
    WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
    return false;
}

std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) {
    uint8_t commandBuf[sizeof(RequestCommand)];
    auto size = sizeof(commandBuf);
    if (!SkipToRequest(&commandBuf, &size, blocking)) {
        return {{DESTROY}};
    }
    if (size < sizeof(RequestCommand)) {
        return {};
    }
    RequestCommand request;
    request.request_type = readBigEndian<RequestType>(&commandBuf[0]);
    request.file_id = readBigEndian<FileId>(&commandBuf[2]);
    request.block_idx = readBigEndian<BlockIdx>(&commandBuf[4]);
    return request;
}

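// The verity tree blocks in the signature file are laid out with the internal hash nodes first
// and the leaf hash blocks last. For every data block we send the single leaf block that covers
// it; the internal nodes are sent just once, together with the first leaf block, so the device
// can verify the whole chain.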
bool IncrementalServer::SendTreeBlocksForDataBlock(const FileId fileId, const BlockIdx blockIdx) {
    auto& file = files_[fileId];
    if (!file.hasTree()) {
        return true;
    }
    const int32_t data_block_count = numBytesToNumBlocks(file.size);

    const int32_t total_nodes_count(file.sentTreeBlocks.size());
    const int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;

    const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count;

    // Leaf level, sending only 1 block.
    const int32_t leaf_idx = leaf_nodes_offset + blockIdx / kHashesPerBlock;
    if (file.sentTreeBlocks[leaf_idx]) {
        return true;
    }
    if (!SendTreeBlock(fileId, blockIdx, leaf_idx)) {
        return false;
    }
    file.sentTreeBlocks[leaf_idx] = true;

    // Non-leaf, sending EVERYTHING. This should be done only once.
    if (leaf_nodes_offset == 0 || file.sentTreeBlocks[0]) {
        return true;
    }

    for (int32_t i = 0; i < leaf_nodes_offset; ++i) {
        if (!SendTreeBlock(fileId, blockIdx, i)) {
            return false;
        }
        file.sentTreeBlocks[i] = true;
    }
    return true;
}

bool IncrementalServer::SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx) {
    const auto& file = files_[fileId];

    BlockBuffer buffer;
    const int64_t bytesRead = file.ReadTreeBlock(blockIdx, buffer.data);
    if (bytesRead <= 0) {
        fprintf(stderr, "Failed to get data for %s.idsig at blockIdx=%d.\n", file.filepath,
                blockIdx);
        return false;
    }

    buffer.header.compression_type = kCompressionNone;
    buffer.header.block_type = kTypeHash;
    buffer.header.file_id = toBigEndian(fileId);
    buffer.header.block_size = toBigEndian(int16_t(bytesRead));
    buffer.header.block_idx = toBigEndian(blockIdx);

    Send(&buffer, ResponseHeader::responseSizeFor(bytesRead), /*flush=*/false);

    return true;
}

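// Data blocks are LZ4-compressed on the fly unless ReadDataBlock() reported the block as
// already zip-compressed; the compressed form is only used when it stays under
// kCompressedSizeMax (~95% of a block), otherwise the raw block is sent as-is.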
auto IncrementalServer::SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult {
    auto& file = files_[fileId];
    if (blockIdx >= static_cast<long>(file.sentBlocks.size())) {
        // may happen as we schedule some extra blocks for reported page misses
        D("Skipped reading file %s at block %" PRId32 " (past end).", file.filepath, blockIdx);
        return SendResult::Skipped;
    }
    if (file.sentBlocks[blockIdx]) {
        return SendResult::Skipped;
    }

    if (!SendTreeBlocksForDataBlock(fileId, blockIdx)) {
        return SendResult::Error;
    }

    BlockBuffer raw;
    bool isZipCompressed = false;
    const int64_t bytesRead = file.ReadDataBlock(blockIdx, raw.data, &isZipCompressed);
    if (bytesRead < 0) {
        fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%d).\n", file.filepath, blockIdx,
                errno);
        return SendResult::Error;
    }

    BlockBuffer<kCompressBound> compressed;
    int16_t compressedSize = 0;
    if (!isZipCompressed) {
        compressedSize = LZ4_compress_default(raw.data, compressed.data, bytesRead, kCompressBound);
    }
    int16_t blockSize;
    ResponseHeader* header;
    if (compressedSize > 0 && compressedSize < kCompressedSizeMax) {
        ++compressed_;
        blockSize = compressedSize;
        header = &compressed.header;
        header->compression_type = kCompressionLZ4;
    } else {
        ++uncompressed_;
        blockSize = bytesRead;
        header = &raw.header;
        header->compression_type = kCompressionNone;
    }

    header->block_type = kTypeData;
    header->file_id = toBigEndian(fileId);
    header->block_size = toBigEndian(blockSize);
    header->block_idx = toBigEndian(blockIdx);

    file.sentBlocks[blockIdx] = true;
    file.sentBlocksCount += 1;
    Send(header, ResponseHeader::responseSizeFor(blockSize), flush);

    return SendResult::Sent;
}

bool IncrementalServer::SendDone() {
    ResponseHeader header;
    header.file_id = -1;
    header.block_type = 0;
    header.compression_type = 0;
    header.block_idx = 0;
    header.block_size = 0;
    Send(&header, sizeof(header), true);
    return true;
}

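// Sends at most kPrefetchBlocksPerIteration blocks per call, working on the front of the
// prefetch queue: a file's priority blocks (as computed by PriorityBlocksForFile()) are
// drained first, then the remaining range is streamed sequentially.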
void IncrementalServer::RunPrefetching() {
    constexpr auto kPrefetchBlocksPerIteration = 128;

    int blocksToSend = kPrefetchBlocksPerIteration;
    while (!prefetches_.empty() && blocksToSend > 0) {
        auto& prefetch = prefetches_.front();
        const auto& file = *prefetch.file;
        const auto& priority_blocks = file.PriorityBlocks();
        if (!priority_blocks.empty()) {
            for (auto& i = prefetch.priorityIndex;
                 blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) {
                if (auto res = SendDataBlock(file.id, priority_blocks[i]);
                    res == SendResult::Sent) {
                    --blocksToSend;
                } else if (res == SendResult::Error) {
                    fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i);
                }
            }
        }
        for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) {
            if (auto res = SendDataBlock(file.id, i); res == SendResult::Sent) {
                --blocksToSend;
            } else if (res == SendResult::Error) {
                fprintf(stderr, "Failed to send block %" PRId32 "\n", i);
            }
        }
        if (prefetch.done()) {
            prefetches_.pop_front();
        }
    }
}

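// Outgoing blocks are batched in pendingBlocksBuffer_: Send() appends after the reserved
// ChunkHeader slot, and Flush() stamps the accumulated payload size (big-endian) into that
// slot and writes the whole chunk to adb_fd_ once kChunkFlushSize is exceeded or a flush is
// requested explicitly.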
void IncrementalServer::Send(const void* data, size_t size, bool flush) {
    pendingBlocks_ = std::copy_n(static_cast<const char*>(data), size, pendingBlocks_);
    if (flush || pendingBlocks_ - pendingBlocksBuffer_.data() > kChunkFlushSize) {
        Flush();
    }
}

void IncrementalServer::Flush() {
    auto dataBytes = pendingBlocks_ - (pendingBlocksBuffer_.data() + sizeof(ChunkHeader));
    if (dataBytes == 0) {
        return;
    }

    *(ChunkHeader*)pendingBlocksBuffer_.data() = toBigEndian<int32_t>(dataBytes);
    auto totalBytes = sizeof(ChunkHeader) + dataBytes;
    if (!WriteFdExactly(adb_fd_, pendingBlocksBuffer_.data(), totalBytes)) {
        fprintf(stderr, "Failed to write %d bytes\n", int(totalBytes));
    }
    sentSize_ += totalBytes;
    pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
}

bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount,
                                        int missesSent) {
    servingComplete_ = true;
    using namespace std::chrono;
    auto endTime = high_resolution_clock::now();
    D("Streaming completed.\n"
      "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: "
      "%d, mb: %.3f\n"
      "Total time taken: %.3fms",
      missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0,
      duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0);
    return true;
}

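// Main serving loop: after the initial OKAY handshake, requests are read (blocking only when
// there is no prefetch work left), dispatched by request_type, and interleaved with
// RunPrefetching(). A single "done" header (file_id == -1) is sent once every block of every
// file has been delivered.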
bool IncrementalServer::Serve() {
    // Initial handshake to verify connection is still alive
    if (!SendOkay(adb_fd_)) {
        fprintf(stderr, "Connection is dead. Abort.\n");
        return false;
    }

    std::unordered_set<FileId> prefetchedFiles;
    bool doneSent = false;
    int missesCount = 0;
    int missesSent = 0;

    using namespace std::chrono;
    std::optional<TimePoint> startTime;

    while (true) {
        if (!doneSent && prefetches_.empty() &&
            std::all_of(files_.begin(), files_.end(), [](const File& f) {
                return f.sentBlocksCount == NumBlocks(f.sentBlocks.size());
            })) {
            fprintf(stderr, "All files should be loaded. Notifying the device.\n");
            SendDone();
            doneSent = true;
        }

        const bool blocking = prefetches_.empty();
        if (blocking) {
            // We have no idea how long the blocking call will take, so flush whatever is still
            // unsent.
            Flush();
        }
        auto request = ReadRequest(blocking);

        if (!startTime) {
            startTime = high_resolution_clock::now();
        }

        if (request) {
            FileId fileId = request->file_id;
            BlockIdx blockIdx = request->block_idx;

            switch (request->request_type) {
                case DESTROY: {
                    // Stop everything.
                    return true;
                }
                case SERVING_COMPLETE: {
                    // Not stopping the server here.
                    ServingComplete(startTime, missesCount, missesSent);
                    break;
                }
                case BLOCK_MISSING: {
                    ++missesCount;
                    // Send a single block ASAP.
                    if (fileId < 0 || fileId >= (FileId)files_.size() || blockIdx < 0 ||
                        blockIdx >= (BlockIdx)files_[fileId].sentBlocks.size()) {
                        fprintf(stderr,
                                "Received invalid data request for file_id %" PRId16
                                " block_idx %" PRId32 ".\n",
                                fileId, blockIdx);
                        break;
                    }

                    if (VLOG_IS_ON(INCREMENTAL)) {
                        auto& file = files_[fileId];
                        auto posP = std::find(file.PriorityBlocks().begin(),
                                              file.PriorityBlocks().end(), blockIdx);
                        D("\tMISSING BLOCK: reading file %d block %04d (in priority: %d of %d)",
                          (int)fileId, (int)blockIdx,
                          posP == file.PriorityBlocks().end()
                                  ? -1
                                  : int(posP - file.PriorityBlocks().begin()),
                          int(file.PriorityBlocks().size()));
                    }

                    if (auto res = SendDataBlock(fileId, blockIdx, true);
                        res == SendResult::Error) {
                        fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx);
                    } else if (res == SendResult::Sent) {
                        ++missesSent;
                        // Make sure we send more pages from this place onward, in case the OS is
                        // reading a bigger block.
                        prefetches_.emplace_front(files_[fileId], blockIdx + 1, 7);
                    }
                    break;
                }
                case PREFETCH: {
                    // Start prefetching for a file
                    if (fileId < 0) {
                        fprintf(stderr,
                                "Received invalid prefetch request for file_id %" PRId16 "\n",
                                fileId);
                        break;
                    }
                    if (!prefetchedFiles.insert(fileId).second) {
                        fprintf(stderr,
                                "Received duplicate prefetch request for file_id %" PRId16 "\n",
                                fileId);
                        break;
                    }
                    D("Received prefetch request for file_id %" PRId16 ".", fileId);
                    prefetches_.emplace_back(files_[fileId]);
                    break;
                }
                default:
                    fprintf(stderr, "Invalid request %" PRId16 ",%" PRId16 ",%" PRId32 ".\n",
                            request->request_type, fileId, blockIdx);
                    break;
            }
        }

        RunPrefetching();
    }
}

static std::pair<unique_fd, int64_t> open_fd(const char* filepath) {
    struct stat st;
    if (stat(filepath, &st)) {
        error_exit("inc-server: failed to stat input file '%s'.", filepath);
    }

    unique_fd fd(adb_open(filepath, O_RDONLY));
    if (fd < 0) {
        error_exit("inc-server: failed to open file '%s'.", filepath);
    }

    return {std::move(fd), st.st_size};
}

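// Opens the companion signature file (filepath + IDSIG), skips its headers to find the verity
// tree, and validates that the tree size matches what verity_tree_size_for_file() expects for
// this file size.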
static std::pair<unique_fd, int64_t> open_signature(int64_t file_size, const char* filepath) {
    std::string signature_file(filepath);
    signature_file += IDSIG;

    unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY));
    if (fd < 0) {
        D("No signature file found for '%s'('%s')", filepath, signature_file.c_str());
        return {};
    }

    auto [tree_offset, tree_size] = skip_id_sig_headers(fd);
    if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) {
        error_exit("Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n",
                   signature_file.c_str(), (long long)tree_size, (long long)expected);
    }

    int32_t data_block_count = numBytesToNumBlocks(file_size);
    int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
    D("Verity tree loaded: %s, tree size: %d (%d blocks, %d leaves)", signature_file.c_str(),
      int(tree_size), int(numBytesToNumBlocks(tree_size)), int(leaf_nodes_count));

    return {std::move(fd), tree_offset};
}

bool serve(int connection_fd, int output_fd, int argc, const char** argv) {
    auto connection_ufd = unique_fd(connection_fd);
    auto output_ufd = unique_fd(output_fd);
    if (argc <= 0) {
        error_exit("inc-server: must specify at least one file.");
    }

    std::vector<File> files;
    files.reserve(argc);
    for (int i = 0; i < argc; ++i) {
        auto filepath = argv[i];

        auto [file_fd, file_size] = open_fd(filepath);
        auto [sign_fd, sign_offset] = open_signature(file_size, filepath);

        files.emplace_back(filepath, i, file_size, std::move(file_fd), sign_offset,
                           std::move(sign_fd));
    }

    IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files));
    printf("Serving...\n");
    fclose(stdin);
    fclose(stdout);
    return server.Serve();
}

}  // namespace incremental