1 #include <android-base/logging.h>
2 #include <android/native_window.h>
3 #include <benchmark/benchmark.h>
4 #include <binder/IPCThreadState.h>
5 #include <binder/IServiceManager.h>
6 #include <dvr/dvr_api.h>
7 #include <gui/BufferItem.h>
8 #include <gui/BufferItemConsumer.h>
9 #include <gui/Surface.h>
10 #include <private/dvr/epoll_file_descriptor.h>
11 #include <utils/Trace.h>
12 
13 #include <chrono>
14 #include <functional>
15 #include <iostream>
16 #include <thread>
17 #include <vector>
18 
19 #include <dlfcn.h>
20 #include <poll.h>
21 #include <sys/epoll.h>
22 #include <sys/wait.h>
23 
24 // Use ALWAYS at the tag level. Control is performed manually during command
25 // line processing.
26 #ifdef ATRACE_TAG
27 #undef ATRACE_TAG
28 #endif
29 #define ATRACE_TAG ATRACE_TAG_ALWAYS
30 
31 using namespace android;
32 using ::benchmark::State;
33 
34 static const String16 kBinderService = String16("bufferTransport");
35 static const uint32_t kBufferWidth = 100;
36 static const uint32_t kBufferHeight = 1;
37 static const uint32_t kBufferFormat = HAL_PIXEL_FORMAT_BLOB;
38 static const uint64_t kBufferUsage =
39     GRALLOC_USAGE_SW_READ_OFTEN | GRALLOC_USAGE_SW_WRITE_OFTEN;
40 static const uint32_t kBufferLayer = 1;
41 static const int kMaxAcquiredImages = 1;
42 static const int kQueueDepth = 2;  // We are double buffering for this test.
43 static const size_t kMaxQueueCounts = 128;
44 static const int kInvalidFence = -1;
45 
46 enum BufferTransportServiceCode {
47   CREATE_BUFFER_QUEUE = IBinder::FIRST_CALL_TRANSACTION,
48 };
49 
// A binder service that mimics a compositor that consumes buffers. It provides
// one Binder interface to create a new Surface for the buffer producer to
// write into, while the service itself carries out no-op buffer consumption by
// acquiring and then immediately releasing each buffer.
54 class BufferTransportService : public BBinder {
55  public:
56   BufferTransportService() = default;
57   ~BufferTransportService() = default;
58 
onTransact(uint32_t code,const Parcel & data,Parcel * reply,uint32_t flags=0)59   virtual status_t onTransact(uint32_t code, const Parcel& data, Parcel* reply,
60                               uint32_t flags = 0) {
61     (void)flags;
62     (void)data;
63     switch (code) {
64       case CREATE_BUFFER_QUEUE: {
65         auto new_queue = std::make_shared<BufferQueueHolder>(this);
66         reply->writeStrongBinder(
67             IGraphicBufferProducer::asBinder(new_queue->producer));
68         buffer_queues_.push_back(new_queue);
69         return OK;
70       }
71       default:
72         return UNKNOWN_TRANSACTION;
73     };
74   }
75 
76  private:
77   struct FrameListener : public ConsumerBase::FrameAvailableListener {
78    public:
FrameListenerBufferTransportService::FrameListener79     FrameListener(BufferTransportService* /*service*/,
80                   sp<BufferItemConsumer> buffer_item_consumer)
81         : buffer_item_consumer_(buffer_item_consumer) {}
82 
onFrameAvailableBufferTransportService::FrameListener83     void onFrameAvailable(const BufferItem& /*item*/) override {
84       BufferItem buffer;
85       status_t ret = 0;
86       {
87         ATRACE_NAME("AcquireBuffer");
88         ret = buffer_item_consumer_->acquireBuffer(&buffer, /*presentWhen=*/0,
89                                                    /*waitForFence=*/false);
90       }
91 
92       if (ret != OK) {
93         LOG(ERROR) << "Failed to acquire next buffer.";
94         return;
95       }
96 
97       {
98         ATRACE_NAME("ReleaseBuffer");
99         ret = buffer_item_consumer_->releaseBuffer(buffer);
100       }
101 
102       if (ret != OK) {
103         LOG(ERROR) << "Failed to release buffer.";
104         return;
105       }
106     }
107 
108    private:
109     sp<BufferItemConsumer> buffer_item_consumer_;
110   };
111 
112   struct BufferQueueHolder {
BufferQueueHolderBufferTransportService::BufferQueueHolder113     explicit BufferQueueHolder(BufferTransportService* service) {
114       BufferQueue::createBufferQueue(&producer, &consumer);
115 
116       sp<BufferItemConsumer> buffer_item_consumer =
117           new BufferItemConsumer(consumer, kBufferUsage, kMaxAcquiredImages,
118                                  /*controlledByApp=*/true);
119       buffer_item_consumer->setName(String8("BinderBufferTransport"));
120       frame_listener_ = new FrameListener(service, buffer_item_consumer);
121       buffer_item_consumer->setFrameAvailableListener(frame_listener_);
122     }
123 
124     sp<IGraphicBufferProducer> producer;
125     sp<IGraphicBufferConsumer> consumer;
126 
127    private:
128     sp<FrameListener> frame_listener_;
129   };
130 
131   std::vector<std::shared_ptr<BufferQueueHolder>> buffer_queues_;
132 };
133 
// A virtual interface that abstracts the common BufferQueue operations, so
// that the test suite can use the same test case to drive different types of
// transport backends.
137 class BufferTransport {
138  public:
~BufferTransport()139   virtual ~BufferTransport() {}
140 
141   virtual int Start() = 0;
142   virtual sp<Surface> CreateSurface() = 0;
143 };
144 
// Binder-based buffer transport backend.
//
// On Start() the transport looks up the Binder server, which runs in a
// separate process (forked in main()) and actually consumes the buffers.
// On CreateSurface() a new Binder BufferQueue will be created, for which the
// service holds the concrete binder node of the IGraphicBufferProducer while
// sending the binder proxy to the client. In other words, the producer side
// operations are carried out in the client process while the consumer side
// operations are carried out within the BufferTransportService's own process.
154 class BinderBufferTransport : public BufferTransport {
155  public:
BinderBufferTransport()156   BinderBufferTransport() {}
157 
Start()158   int Start() override {
159     sp<IServiceManager> sm = defaultServiceManager();
160     service_ = sm->getService(kBinderService);
161     if (service_ == nullptr) {
162       LOG(ERROR) << "Failed to get the benchmark service.";
163       return -EIO;
164     }
165 
166     LOG(INFO) << "Binder server is ready for client.";
167     return 0;
168   }
169 
CreateSurface()170   sp<Surface> CreateSurface() override {
171     Parcel data;
172     Parcel reply;
173     int error = service_->transact(CREATE_BUFFER_QUEUE, data, &reply);
174     if (error != OK) {
175       LOG(ERROR) << "Failed to get buffer queue over binder.";
176       return nullptr;
177     }
178 
179     sp<IBinder> binder;
180     error = reply.readNullableStrongBinder(&binder);
181     if (error != OK) {
182       LOG(ERROR) << "Failed to get IGraphicBufferProducer over binder.";
183       return nullptr;
184     }
185 
186     auto producer = interface_cast<IGraphicBufferProducer>(binder);
187     if (producer == nullptr) {
188       LOG(ERROR) << "Failed to get IGraphicBufferProducer over binder.";
189       return nullptr;
190     }
191 
192     sp<Surface> surface = new Surface(producer, /*controlledByApp=*/true);
193 
194     // Set buffer dimension.
195     ANativeWindow* window = static_cast<ANativeWindow*>(surface.get());
196     ANativeWindow_setBuffersGeometry(window, kBufferWidth, kBufferHeight,
197                                      kBufferFormat);
198 
199     return surface;
200   }
201 
202  private:
203   sp<IBinder> service_;
204 };
205 
206 class DvrApi {
207  public:
DvrApi()208   DvrApi() {
209     handle_ = dlopen("libdvr.google.so", RTLD_NOW | RTLD_LOCAL);
210     CHECK(handle_);
211 
212     auto dvr_get_api =
213         reinterpret_cast<decltype(&dvrGetApi)>(dlsym(handle_, "dvrGetApi"));
214     int ret = dvr_get_api(&api_, sizeof(api_), /*version=*/1);
215 
216     CHECK(ret == 0);
217   }
218 
~DvrApi()219   ~DvrApi() { dlclose(handle_); }
220 
Api()221   const DvrApi_v1& Api() { return api_; }
222 
223  private:
224   void* handle_ = nullptr;
225   DvrApi_v1 api_;
226 };
227 
// BufferHub/PDX-based buffer transport.
//
// On Start() a new thread will be spawned to run an epoll polling loop which
// mimics the behavior of a compositor. Similar to the Binder-based backend,
// the buffer available handler is also a no-op: the buffer gets acquired and
// released immediately.
// On CreateSurface() a pair of dvr::ProducerQueue and dvr::ConsumerQueue will
// be created. The epoll thread holds on to the consumer queue and dequeues
// buffers from it, while the producer queue will be wrapped in a Surface and
// returned to the test suite.
238 class BufferHubTransport : public BufferTransport {
239  public:
~BufferHubTransport()240   virtual ~BufferHubTransport() {
241     stopped_.store(true);
242     if (reader_thread_.joinable()) {
243       reader_thread_.join();
244     }
245   }
246 
Start()247   int Start() override {
248     int ret = epoll_fd_.Create();
249     if (ret < 0) {
250       LOG(ERROR) << "Failed to create epoll fd: %s", strerror(-ret);
251       return -1;
252     }
253 
254     // Create the reader thread.
255     reader_thread_ = std::thread([this]() {
256       int ret = dvr_.Api().PerformanceSetSchedulerPolicy(0, "graphics");
257       if (ret < 0) {
258         LOG(ERROR) << "Failed to set scheduler policy, ret=" << ret;
259         return;
260       }
261 
262       stopped_.store(false);
263       LOG(INFO) << "Reader Thread Running...";
264 
265       while (!stopped_.load()) {
266         std::array<epoll_event, kMaxQueueCounts> events;
267 
268         // Don't sleep forever so that we will have a chance to wake up.
269         const int ret = epoll_fd_.Wait(events.data(), events.size(),
270                                        /*timeout=*/100);
271         if (ret < 0) {
272           LOG(ERROR) << "Error polling consumer queues.";
273           continue;
274         }
275         if (ret == 0) {
276           continue;
277         }
278 
279         const int num_events = ret;
280         for (int i = 0; i < num_events; i++) {
281           uint32_t index = events[i].data.u32;
282           dvr_.Api().ReadBufferQueueHandleEvents(
283               buffer_queues_[index]->GetReadQueue());
284         }
285       }
286 
287       LOG(INFO) << "Reader Thread Exiting...";
288     });
289 
290     return 0;
291   }
292 
CreateSurface()293   sp<Surface> CreateSurface() override {
294     auto new_queue = std::make_shared<BufferQueueHolder>();
295     if (!new_queue->IsReady()) {
296       LOG(ERROR) << "Failed to create BufferHub-based BufferQueue.";
297       return nullptr;
298     }
299 
300     // Set buffer dimension.
301     ANativeWindow_setBuffersGeometry(new_queue->GetSurface(), kBufferWidth,
302                                      kBufferHeight, kBufferFormat);
303 
304     // Use the next position as buffer_queue index.
305     uint32_t index = buffer_queues_.size();
306     epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u32 = index}};
307     int queue_fd =
308         dvr_.Api().ReadBufferQueueGetEventFd(new_queue->GetReadQueue());
309     const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, queue_fd, &event);
310     if (ret < 0) {
311       LOG(ERROR) << "Failed to track consumer queue: " << strerror(-ret)
312                  << ", consumer queue fd: " << queue_fd;
313       return nullptr;
314     }
315 
316     buffer_queues_.push_back(new_queue);
317     ANativeWindow_acquire(new_queue->GetSurface());
318     return static_cast<Surface*>(new_queue->GetSurface());
319   }
320 
321  private:
322   struct BufferQueueHolder {
BufferQueueHolderBufferHubTransport::BufferQueueHolder323     BufferQueueHolder() {
324       int ret = 0;
325       ret = dvr_.Api().WriteBufferQueueCreate(
326           kBufferWidth, kBufferHeight, kBufferFormat, kBufferLayer,
327           kBufferUsage, 0, sizeof(DvrNativeBufferMetadata), &write_queue_);
328       if (ret < 0) {
329         LOG(ERROR) << "Failed to create write buffer queue, ret=" << ret;
330         return;
331       }
332 
333       ret = dvr_.Api().WriteBufferQueueCreateReadQueue(write_queue_,
334                                                        &read_queue_);
335       if (ret < 0) {
336         LOG(ERROR) << "Failed to create read buffer queue, ret=" << ret;
337         return;
338       }
339 
340       ret = dvr_.Api().ReadBufferQueueSetBufferAvailableCallback(
341           read_queue_, BufferAvailableCallback, this);
342       if (ret < 0) {
343         LOG(ERROR) << "Failed to create buffer available callback, ret=" << ret;
344         return;
345       }
346 
347       ret =
348           dvr_.Api().WriteBufferQueueGetANativeWindow(write_queue_, &surface_);
349       if (ret < 0) {
350         LOG(ERROR) << "Failed to create surface, ret=" << ret;
351         return;
352       }
353     }
354 
BufferAvailableCallbackBufferHubTransport::BufferQueueHolder355     static void BufferAvailableCallback(void* context) {
356       BufferQueueHolder* thiz = static_cast<BufferQueueHolder*>(context);
357       thiz->HandleBufferAvailable();
358     }
359 
GetReadQueueBufferHubTransport::BufferQueueHolder360     DvrReadBufferQueue* GetReadQueue() { return read_queue_; }
361 
GetSurfaceBufferHubTransport::BufferQueueHolder362     ANativeWindow* GetSurface() { return surface_; }
363 
IsReadyBufferHubTransport::BufferQueueHolder364     bool IsReady() {
365       return write_queue_ != nullptr && read_queue_ != nullptr &&
366              surface_ != nullptr;
367     }
368 
HandleBufferAvailableBufferHubTransport::BufferQueueHolder369     void HandleBufferAvailable() {
370       int ret = 0;
371       DvrNativeBufferMetadata meta;
372       DvrReadBuffer* buffer = nullptr;
373       DvrNativeBufferMetadata metadata;
374       int acquire_fence = kInvalidFence;
375 
376       {
377         ATRACE_NAME("AcquireBuffer");
378         ret = dvr_.Api().ReadBufferQueueAcquireBuffer(
379             read_queue_, 0, &buffer, &metadata, &acquire_fence);
380       }
381       if (ret < 0) {
382         LOG(ERROR) << "Failed to acquire consumer buffer, error: " << ret;
383         return;
384       }
385 
386       if (buffer != nullptr) {
387         ATRACE_NAME("ReleaseBuffer");
388         ret = dvr_.Api().ReadBufferQueueReleaseBuffer(read_queue_, buffer,
389                                                       &meta, kInvalidFence);
390       }
391       if (ret < 0) {
392         LOG(ERROR) << "Failed to release consumer buffer, error: " << ret;
393       }
394     }
395 
396    private:
397     DvrWriteBufferQueue* write_queue_ = nullptr;
398     DvrReadBufferQueue* read_queue_ = nullptr;
399     ANativeWindow* surface_ = nullptr;
400   };
401 
402   static DvrApi dvr_;
403   std::atomic<bool> stopped_;
404   std::thread reader_thread_;
405 
406   dvr::EpollFileDescriptor epoll_fd_;
407   std::vector<std::shared_ptr<BufferQueueHolder>> buffer_queues_;
408 };
409 
410 DvrApi BufferHubTransport::dvr_ = {};
411 
// Transport backends under test. The numeric value is the benchmark argument
// (state.range(0)) that selects the backend.
enum TransportType {
  kBinderBufferTransport = 0,
  kBufferHubTransport = 1,
};
416 
// Main test suite, which supports two transport backends: 1) BinderBufferQueue,
// 2) BufferHubQueue. The test case drives the producer end of both transport
// backends by queuing buffers into the buffer queue using the ANativeWindow
// API.
420 class BufferTransportBenchmark : public ::benchmark::Fixture {
421  public:
SetUp(State & state)422   void SetUp(State& state) override {
423     if (state.thread_index == 0) {
424       const int transport = state.range(0);
425       switch (transport) {
426         case kBinderBufferTransport:
427           transport_.reset(new BinderBufferTransport);
428           break;
429         case kBufferHubTransport:
430           transport_.reset(new BufferHubTransport);
431           break;
432         default:
433           CHECK(false) << "Unknown test case.";
434           break;
435       }
436 
437       CHECK(transport_);
438       const int ret = transport_->Start();
439       CHECK_EQ(ret, 0);
440 
441       LOG(INFO) << "Transport backend running, transport=" << transport << ".";
442 
443       // Create surfaces for each thread.
444       surfaces_.resize(state.threads);
445       for (int i = 0; i < state.threads; i++) {
446         // Common setup every thread needs.
447         surfaces_[i] = transport_->CreateSurface();
448         CHECK(surfaces_[i]);
449 
450         LOG(INFO) << "Surface initialized on thread " << i << ".";
451       }
452     }
453   }
454 
TearDown(State & state)455   void TearDown(State& state) override {
456     if (state.thread_index == 0) {
457       surfaces_.clear();
458       transport_.reset();
459       LOG(INFO) << "Tear down benchmark.";
460     }
461   }
462 
463  protected:
464   std::unique_ptr<BufferTransport> transport_;
465   std::vector<sp<Surface>> surfaces_;
466 };
467 
// Producer-side benchmark: every thread repeatedly locks (gains) a buffer
// from its own surface and posts it back, timing both steps separately. The
// per-iteration averages are reported as the counters "gain_buffer_us",
// "post_buffer_us" and "producer_us" (their sum).
BENCHMARK_DEFINE_F(BufferTransportBenchmark, Producers)(State& state) {
  using Clock = std::chrono::high_resolution_clock;
  using Microseconds = std::chrono::duration<double, std::micro>;

  ANativeWindow* window = nullptr;
  ANativeWindow_Buffer buffer;
  int32_t error = 0;
  double total_gain_buffer_us = 0;
  double total_post_buffer_us = 0;
  int iterations = 0;

  while (state.KeepRunning()) {
    // The surface is looked up lazily on the first iteration because
    // |surfaces_| is filled in by thread 0 only.
    // NOTE(review): presumably the first KeepRunning() call synchronizes the
    // threads after SetUp(); confirm before hoisting this out of the loop.
    if (window == nullptr) {
      CHECK(surfaces_[state.thread_index]);
      window = static_cast<ANativeWindow*>(surfaces_[state.thread_index].get());

      // Lock buffers a couple time from the queue, so that we have the buffer
      // allocated.
      for (int i = 0; i < kQueueDepth; i++) {
        error = ANativeWindow_lock(window, &buffer,
                                   /*inOutDirtyBounds=*/nullptr);
        CHECK_EQ(error, 0);
        error = ANativeWindow_unlockAndPost(window);
        CHECK_EQ(error, 0);
      }
    }

    {
      ATRACE_NAME("GainBuffer");
      const auto begin = Clock::now();
      error = ANativeWindow_lock(window, &buffer,
                                 /*inOutDirtyBounds=*/nullptr);
      const auto end = Clock::now();
      total_gain_buffer_us += Microseconds(end - begin).count();
    }
    CHECK_EQ(error, 0);

    {
      ATRACE_NAME("PostBuffer");
      const auto begin = Clock::now();
      error = ANativeWindow_unlockAndPost(window);
      const auto end = Clock::now();
      total_post_buffer_us += Microseconds(end - begin).count();
    }
    CHECK_EQ(error, 0);

    iterations++;
  }

  // If the loop body never ran, report zeros instead of dividing by zero
  // (which would yield NaN counters).
  const double divisor = iterations > 0 ? static_cast<double>(iterations) : 1.0;

  state.counters["gain_buffer_us"] = ::benchmark::Counter(
      total_gain_buffer_us / divisor, ::benchmark::Counter::kAvgThreads);
  state.counters["post_buffer_us"] = ::benchmark::Counter(
      total_post_buffer_us / divisor, ::benchmark::Counter::kAvgThreads);
  state.counters["producer_us"] = ::benchmark::Counter(
      (total_gain_buffer_us + total_post_buffer_us) / divisor,
      ::benchmark::Counter::kAvgThreads);
}
524 
525 BENCHMARK_REGISTER_F(BufferTransportBenchmark, Producers)
526     ->Unit(::benchmark::kMicrosecond)
527     ->Ranges({{kBinderBufferTransport, kBufferHubTransport}})
528     ->ThreadRange(1, 32);
529 
runBinderServer()530 static void runBinderServer() {
531   ProcessState::self()->setThreadPoolMaxThreadCount(0);
532   ProcessState::self()->startThreadPool();
533 
534   sp<IServiceManager> sm = defaultServiceManager();
535   sp<BufferTransportService> service = new BufferTransportService;
536   sm->addService(kBinderService, service, false);
537 
538   LOG(INFO) << "Binder server running...";
539 
540   while (true) {
541     int stat, retval;
542     retval = wait(&stat);
543     if (retval == -1 && errno == ECHILD) {
544       break;
545     }
546   }
547 
548   LOG(INFO) << "Service Exiting...";
549 }
550 
// To run the binder-based benchmark, use:
// adb shell buffer_transport_benchmark \
//   --benchmark_filter="BufferTransportBenchmark/Producers/0/"
//
// To run the bufferhub-based benchmark, use:
// adb shell buffer_transport_benchmark \
//   --benchmark_filter="BufferTransportBenchmark/Producers/1/"
main(int argc,char ** argv)558 int main(int argc, char** argv) {
559   bool tracing_enabled = false;
560 
561   // Parse arguments in addition to "--benchmark_filter" paramters.
562   for (int i = 1; i < argc; i++) {
563     if (std::string(argv[i]) == "--help") {
564       std::cout << "Usage: binderThroughputTest [OPTIONS]" << std::endl;
565       std::cout << "\t--trace: Enable systrace logging." << std::endl;
566       return 0;
567     }
568     if (std::string(argv[i]) == "--trace") {
569       tracing_enabled = true;
570       continue;
571     }
572   }
573 
574   // Setup ATRACE/systrace based on command line.
575   atrace_setup();
576   atrace_set_tracing_enabled(tracing_enabled);
577 
578   pid_t pid = fork();
579   if (pid == 0) {
580     // Child, i.e. the client side.
581     ProcessState::self()->startThreadPool();
582 
583     ::benchmark::Initialize(&argc, argv);
584     ::benchmark::RunSpecifiedBenchmarks();
585   } else {
586     LOG(INFO) << "Benchmark process pid: " << pid;
587     runBinderServer();
588   }
589 }
590