1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <algorithm>
20 #include <memory>
21 #include <span>
22 
23 #include <android-base/logging.h>
24 
25 #include <brotli/decode.h>
26 #include <brotli/encode.h>
27 #include <lz4frame.h>
28 #include <zstd.h>
29 
30 #include "types.h"
31 
32 enum class DecodeResult {
33     Error,
34     Done,
35     NeedInput,
36     MoreOutput,
37 };
38 
39 enum class EncodeResult {
40     Error,
41     Done,
42     NeedInput,
43     MoreOutput,
44 };
45 
46 struct Decoder {
AppendDecoder47     void Append(Block&& block) { input_buffer_.append(std::move(block)); }
FinishDecoder48     bool Finish() {
49         bool old = std::exchange(finished_, true);
50         if (old) {
51             LOG(FATAL) << "Decoder::Finish called while already finished?";
52             return false;
53         }
54         return true;
55     }
56 
57     virtual DecodeResult Decode(std::span<char>* output) = 0;
58 
59   protected:
DecoderDecoder60     Decoder(std::span<char> output_buffer) : output_buffer_(output_buffer) {}
61     ~Decoder() = default;
62 
63     bool finished_ = false;
64     IOVector input_buffer_;
65     std::span<char> output_buffer_;
66 };
67 
68 struct Encoder {
AppendEncoder69     void Append(Block input) { input_buffer_.append(std::move(input)); }
FinishEncoder70     bool Finish() {
71         bool old = std::exchange(finished_, true);
72         if (old) {
73             LOG(FATAL) << "Decoder::Finish called while already finished?";
74             return false;
75         }
76         return true;
77     }
78 
79     virtual EncodeResult Encode(Block* output) = 0;
80 
81   protected:
EncoderEncoder82     explicit Encoder(size_t output_block_size) : output_block_size_(output_block_size) {}
83     ~Encoder() = default;
84 
85     const size_t output_block_size_;
86     bool finished_ = false;
87     IOVector input_buffer_;
88 };
89 
90 struct NullDecoder final : public Decoder {
NullDecoderfinal91     explicit NullDecoder(std::span<char> output_buffer) : Decoder(output_buffer) {}
92 
Decodefinal93     DecodeResult Decode(std::span<char>* output) final {
94         size_t available_out = output_buffer_.size();
95         void* p = output_buffer_.data();
96         while (available_out > 0 && !input_buffer_.empty()) {
97             size_t len = std::min(available_out, input_buffer_.front_size());
98             p = mempcpy(p, input_buffer_.front_data(), len);
99             available_out -= len;
100             input_buffer_.drop_front(len);
101         }
102         *output = std::span(output_buffer_.data(), static_cast<char*>(p));
103         if (input_buffer_.empty()) {
104             return finished_ ? DecodeResult::Done : DecodeResult::NeedInput;
105         }
106         return DecodeResult::MoreOutput;
107     }
108 };
109 
110 struct NullEncoder final : public Encoder {
NullEncoderfinal111     explicit NullEncoder(size_t output_block_size) : Encoder(output_block_size) {}
112 
Encodefinal113     EncodeResult Encode(Block* output) final {
114         output->clear();
115         output->resize(output_block_size_);
116 
117         size_t available_out = output->size();
118         void* p = output->data();
119 
120         while (available_out > 0 && !input_buffer_.empty()) {
121             size_t len = std::min(available_out, input_buffer_.front_size());
122             p = mempcpy(p, input_buffer_.front_data(), len);
123             available_out -= len;
124             input_buffer_.drop_front(len);
125         }
126 
127         output->resize(output->size() - available_out);
128 
129         if (input_buffer_.empty()) {
130             return finished_ ? EncodeResult::Done : EncodeResult::NeedInput;
131         }
132         return EncodeResult::MoreOutput;
133     }
134 };
135 
136 struct BrotliDecoder final : public Decoder {
BrotliDecoderfinal137     explicit BrotliDecoder(std::span<char> output_buffer)
138         : Decoder(output_buffer),
139           decoder_(BrotliDecoderCreateInstance(nullptr, nullptr, nullptr),
140                    BrotliDecoderDestroyInstance) {}
141 
Decodefinal142     DecodeResult Decode(std::span<char>* output) final {
143         size_t available_in = input_buffer_.front_size();
144         const uint8_t* next_in = reinterpret_cast<const uint8_t*>(input_buffer_.front_data());
145 
146         size_t available_out = output_buffer_.size();
147         uint8_t* next_out = reinterpret_cast<uint8_t*>(output_buffer_.data());
148 
149         BrotliDecoderResult r = BrotliDecoderDecompressStream(
150                 decoder_.get(), &available_in, &next_in, &available_out, &next_out, nullptr);
151 
152         size_t bytes_consumed = input_buffer_.front_size() - available_in;
153         input_buffer_.drop_front(bytes_consumed);
154 
155         size_t bytes_emitted = output_buffer_.size() - available_out;
156         *output = std::span<char>(output_buffer_.data(), bytes_emitted);
157 
158         switch (r) {
159             case BROTLI_DECODER_RESULT_SUCCESS:
160                 // We need to wait for ID_DONE from the other end.
161                 return finished_ ? DecodeResult::Done : DecodeResult::NeedInput;
162             case BROTLI_DECODER_RESULT_ERROR:
163                 return DecodeResult::Error;
164             case BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
165                 // Brotli guarantees as one of its invariants that if it returns NEEDS_MORE_INPUT,
166                 // it will consume the entire input buffer passed in, so we don't have to worry
167                 // about bytes left over in the front block with more input remaining.
168                 return DecodeResult::NeedInput;
169             case BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT:
170                 return DecodeResult::MoreOutput;
171         }
172     }
173 
174   private:
175     std::unique_ptr<BrotliDecoderState, void (*)(BrotliDecoderState*)> decoder_;
176 };
177 
178 struct BrotliEncoder final : public Encoder {
BrotliEncoderfinal179     explicit BrotliEncoder(size_t output_block_size)
180         : Encoder(output_block_size),
181           output_block_(output_block_size_),
182           output_bytes_left_(output_block_size_),
183           encoder_(BrotliEncoderCreateInstance(nullptr, nullptr, nullptr),
184                    BrotliEncoderDestroyInstance) {
185         BrotliEncoderSetParameter(encoder_.get(), BROTLI_PARAM_QUALITY, 1);
186     }
187 
Encodefinal188     EncodeResult Encode(Block* output) final {
189         output->clear();
190 
191         while (true) {
192             size_t available_in = input_buffer_.front_size();
193             const uint8_t* next_in = reinterpret_cast<const uint8_t*>(input_buffer_.front_data());
194 
195             size_t available_out = output_bytes_left_;
196             uint8_t* next_out = reinterpret_cast<uint8_t*>(
197                     output_block_.data() + (output_block_size_ - output_bytes_left_));
198 
199             BrotliEncoderOperation op = BROTLI_OPERATION_PROCESS;
200             if (finished_) {
201                 op = BROTLI_OPERATION_FINISH;
202             }
203 
204             if (!BrotliEncoderCompressStream(encoder_.get(), op, &available_in, &next_in,
205                                              &available_out, &next_out, nullptr)) {
206                 return EncodeResult::Error;
207             }
208 
209             size_t bytes_consumed = input_buffer_.front_size() - available_in;
210             input_buffer_.drop_front(bytes_consumed);
211 
212             output_bytes_left_ = available_out;
213 
214             if (BrotliEncoderIsFinished(encoder_.get())) {
215                 output_block_.resize(output_block_size_ - output_bytes_left_);
216                 *output = std::move(output_block_);
217                 return EncodeResult::Done;
218             } else if (output_bytes_left_ == 0) {
219                 *output = std::move(output_block_);
220                 output_block_.resize(output_block_size_);
221                 output_bytes_left_ = output_block_size_;
222                 return EncodeResult::MoreOutput;
223             } else if (input_buffer_.empty()) {
224                 return EncodeResult::NeedInput;
225             }
226         }
227     }
228 
229   private:
230     Block output_block_;
231     size_t output_bytes_left_;
232     std::unique_ptr<BrotliEncoderState, void (*)(BrotliEncoderState*)> encoder_;
233 };
234 
235 struct LZ4Decoder final : public Decoder {
LZ4Decoderfinal236     explicit LZ4Decoder(std::span<char> output_buffer)
237         : Decoder(output_buffer), decoder_(nullptr, nullptr) {
238         LZ4F_dctx* dctx;
239         if (LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION) != 0) {
240             LOG(FATAL) << "failed to initialize LZ4 decompression context";
241         }
242         decoder_ = std::unique_ptr<LZ4F_dctx, decltype(&LZ4F_freeDecompressionContext)>(
243                 dctx, LZ4F_freeDecompressionContext);
244     }
245 
Decodefinal246     DecodeResult Decode(std::span<char>* output) final {
247         size_t available_in = input_buffer_.front_size();
248         const char* next_in = input_buffer_.front_data();
249 
250         size_t available_out = output_buffer_.size();
251         char* next_out = output_buffer_.data();
252 
253         size_t rc = LZ4F_decompress(decoder_.get(), next_out, &available_out, next_in,
254                                     &available_in, nullptr);
255         if (LZ4F_isError(rc)) {
256             LOG(ERROR) << "LZ4F_decompress failed: " << LZ4F_getErrorName(rc);
257             return DecodeResult::Error;
258         }
259 
260         input_buffer_.drop_front(available_in);
261 
262         if (rc == 0) {
263             if (!input_buffer_.empty()) {
264                 LOG(ERROR) << "LZ4 stream hit end before reading all data";
265                 return DecodeResult::Error;
266             }
267             lz4_done_ = true;
268         }
269 
270         *output = std::span<char>(output_buffer_.data(), available_out);
271 
272         if (finished_) {
273             return input_buffer_.empty() && lz4_done_ ? DecodeResult::Done
274                                                       : DecodeResult::MoreOutput;
275         }
276 
277         return DecodeResult::NeedInput;
278     }
279 
280   private:
281     bool lz4_done_ = false;
282     std::unique_ptr<LZ4F_dctx, LZ4F_errorCode_t (*)(LZ4F_dctx*)> decoder_;
283 };
284 
285 struct LZ4Encoder final : public Encoder {
LZ4Encoderfinal286     explicit LZ4Encoder(size_t output_block_size)
287         : Encoder(output_block_size), encoder_(nullptr, nullptr) {
288         LZ4F_cctx* cctx;
289         if (LZ4F_createCompressionContext(&cctx, LZ4F_VERSION) != 0) {
290             LOG(FATAL) << "failed to initialize LZ4 compression context";
291         }
292         encoder_ = std::unique_ptr<LZ4F_cctx, decltype(&LZ4F_freeCompressionContext)>(
293                 cctx, LZ4F_freeCompressionContext);
294         Block header(LZ4F_HEADER_SIZE_MAX);
295         size_t rc = LZ4F_compressBegin(encoder_.get(), header.data(), header.size(), nullptr);
296         if (LZ4F_isError(rc)) {
297             LOG(FATAL) << "LZ4F_compressBegin failed: %s", LZ4F_getErrorName(rc);
298         }
299         header.resize(rc);
300         output_buffer_.append(std::move(header));
301     }
302 
303     // As an optimization, only emit a block if we have an entire output block ready, or we're done.
OutputReadyfinal304     bool OutputReady() const {
305         return output_buffer_.size() >= output_block_size_ || lz4_finalized_;
306     }
307 
308     // TODO: Switch the output type to IOVector to remove a copy?
Encodefinal309     EncodeResult Encode(Block* output) final {
310         size_t available_in = input_buffer_.front_size();
311         const char* next_in = input_buffer_.front_data();
312 
313         // LZ4 makes no guarantees about being able to recover from trying to compress with an
314         // insufficiently large output buffer. LZ4F_compressBound tells us how much buffer we
315         // need to compress a given number of bytes, but the smallest value seems to be bigger
316         // than SYNC_DATA_MAX, so we need to buffer ourselves.
317 
318         // Input size chosen to be a local maximum for LZ4F_compressBound (i.e. the block size).
319         constexpr size_t max_input_size = 65536;
320         const size_t encode_block_size = LZ4F_compressBound(max_input_size, nullptr);
321 
322         if (available_in != 0) {
323             if (lz4_finalized_) {
324                 LOG(ERROR) << "LZ4Encoder received data after Finish?";
325                 return EncodeResult::Error;
326             }
327 
328             available_in = std::min(available_in, max_input_size);
329 
330             Block encode_block(encode_block_size);
331             size_t available_out = encode_block.capacity();
332             char* next_out = encode_block.data();
333 
334             size_t rc = LZ4F_compressUpdate(encoder_.get(), next_out, available_out, next_in,
335                                             available_in, nullptr);
336             if (LZ4F_isError(rc)) {
337                 LOG(ERROR) << "LZ4F_compressUpdate failed: " << LZ4F_getErrorName(rc);
338                 return EncodeResult::Error;
339             }
340 
341             input_buffer_.drop_front(available_in);
342 
343             available_out -= rc;
344             next_out += rc;
345 
346             encode_block.resize(encode_block_size - available_out);
347             output_buffer_.append(std::move(encode_block));
348         }
349 
350         if (finished_ && !lz4_finalized_) {
351             lz4_finalized_ = true;
352 
353             Block final_block(encode_block_size + 4);
354             size_t rc = LZ4F_compressEnd(encoder_.get(), final_block.data(), final_block.size(),
355                                          nullptr);
356             if (LZ4F_isError(rc)) {
357                 LOG(ERROR) << "LZ4F_compressEnd failed: " << LZ4F_getErrorName(rc);
358                 return EncodeResult::Error;
359             }
360 
361             final_block.resize(rc);
362             output_buffer_.append(std::move(final_block));
363         }
364 
365         if (OutputReady()) {
366             size_t len = std::min(output_block_size_, output_buffer_.size());
367             *output = output_buffer_.take_front(len).coalesce();
368         } else {
369             output->clear();
370         }
371 
372         if (lz4_finalized_ && output_buffer_.empty()) {
373             return EncodeResult::Done;
374         } else if (OutputReady()) {
375             return EncodeResult::MoreOutput;
376         }
377         return EncodeResult::NeedInput;
378     }
379 
380   private:
381     bool lz4_finalized_ = false;
382     std::unique_ptr<LZ4F_cctx, LZ4F_errorCode_t (*)(LZ4F_cctx*)> encoder_;
383     IOVector output_buffer_;
384 };
385 
386 struct ZstdDecoder final : public Decoder {
ZstdDecoderfinal387     explicit ZstdDecoder(std::span<char> output_buffer)
388         : Decoder(output_buffer), decoder_(ZSTD_createDStream(), ZSTD_freeDStream) {
389         if (!decoder_) {
390             LOG(FATAL) << "failed to initialize Zstd decompression context";
391         }
392     }
393 
Decodefinal394     DecodeResult Decode(std::span<char>* output) final {
395         ZSTD_inBuffer in;
396         in.src = input_buffer_.front_data();
397         in.size = input_buffer_.front_size();
398         in.pos = 0;
399 
400         ZSTD_outBuffer out;
401         out.dst = output_buffer_.data();
402         // The standard specifies size() as returning size_t, but our current version of
403         // libc++ returns a signed value instead.
404         out.size = static_cast<size_t>(output_buffer_.size());
405         out.pos = 0;
406 
407         size_t rc = ZSTD_decompressStream(decoder_.get(), &out, &in);
408         if (ZSTD_isError(rc)) {
409             LOG(ERROR) << "ZSTD_decompressStream failed: " << ZSTD_getErrorName(rc);
410             return DecodeResult::Error;
411         }
412 
413         input_buffer_.drop_front(in.pos);
414         if (rc == 0) {
415             if (!input_buffer_.empty()) {
416                 LOG(ERROR) << "Zstd stream hit end before reading all data";
417                 return DecodeResult::Error;
418             }
419             zstd_done_ = true;
420         }
421 
422         *output = std::span<char>(output_buffer_.data(), out.pos);
423 
424         if (finished_) {
425             return input_buffer_.empty() && zstd_done_ ? DecodeResult::Done
426                                                        : DecodeResult::MoreOutput;
427         }
428         return DecodeResult::NeedInput;
429     }
430 
431   private:
432     bool zstd_done_ = false;
433     std::unique_ptr<ZSTD_DStream, size_t (*)(ZSTD_DStream*)> decoder_;
434 };
435 
436 struct ZstdEncoder final : public Encoder {
ZstdEncoderfinal437     explicit ZstdEncoder(size_t output_block_size)
438         : Encoder(output_block_size), encoder_(ZSTD_createCStream(), ZSTD_freeCStream) {
439         if (!encoder_) {
440             LOG(FATAL) << "failed to initialize Zstd compression context";
441         }
442         ZSTD_CCtx_setParameter(encoder_.get(), ZSTD_c_compressionLevel, 1);
443     }
444 
Encodefinal445     EncodeResult Encode(Block* output) final {
446         ZSTD_inBuffer in;
447         in.src = input_buffer_.front_data();
448         in.size = input_buffer_.front_size();
449         in.pos = 0;
450 
451         output->resize(output_block_size_);
452 
453         ZSTD_outBuffer out;
454         out.dst = output->data();
455         out.size = static_cast<size_t>(output->size());
456         out.pos = 0;
457 
458         ZSTD_EndDirective end_directive = finished_ ? ZSTD_e_end : ZSTD_e_continue;
459         size_t rc = ZSTD_compressStream2(encoder_.get(), &out, &in, end_directive);
460         if (ZSTD_isError(rc)) {
461             LOG(ERROR) << "ZSTD_compressStream2 failed: " << ZSTD_getErrorName(rc);
462             return EncodeResult::Error;
463         }
464 
465         input_buffer_.drop_front(in.pos);
466         output->resize(out.pos);
467 
468         if (rc == 0) {
469             // Zstd finished flushing its data.
470             if (finished_) {
471                 if (!input_buffer_.empty()) {
472                     LOG(ERROR) << "ZSTD_compressStream2 finished early";
473                     return EncodeResult::Error;
474                 }
475                 return EncodeResult::Done;
476             } else {
477                 return input_buffer_.empty() ? EncodeResult::NeedInput : EncodeResult::MoreOutput;
478             }
479         } else {
480             return EncodeResult::MoreOutput;
481         }
482     }
483 
484   private:
485     std::unique_ptr<ZSTD_CStream, size_t (*)(ZSTD_CStream*)> encoder_;
486 };
487