// Unit tests for BroadcastRing: shared helpers, trait configurations, the
// typed-test fixture, and the first typed tests (Geometry, PutGet).
//
// NOTE(review): this section was recovered from a copy in which line breaks
// were collapsed and everything between angle brackets (include targets,
// template parameter lists, template arguments) was lost. Those pieces were
// rebuilt from how each name is used in this file. Items that could not be
// derived from usage alone carry their own NOTE(review) below; confirm them
// against the upstream file.
#include "libbroadcastring/broadcast_ring.h"

// NOTE(review): <stdlib.h> is a best guess for the first stripped include.
#include <stdint.h>
#include <stdlib.h>
#include <string.h>  // memset, memcmp, memcpy
#include <sys/mman.h>  // mmap, mprotect, munmap
#include <unistd.h>  // sysconf

#include <algorithm>  // std::min
#include <atomic>
#include <memory>
#include <thread>  // NOLINT
#include <tuple>  // std::tie

#include <gtest/gtest.h>

namespace android {
namespace dvr {
namespace {

// N bytes of storage with 8-byte alignment. Because of alignas(8),
// sizeof(Aligned<N>) is N rounded up to a multiple of 8.
template <size_t N>
struct alignas(8) Aligned {
  char v[N];
};

// Test record type: a byte buffer whose size equals sizeof(Aligned<N>), with
// helpers to fill, compare and truncate it.
template <size_t N>
struct alignas(8) Sized {
  // Default-constructed records are all zero bytes.
  Sized() { Clear(); }
  // Constructs a record with every byte set to |c|.
  explicit Sized(char c) { Fill(c); }

  char v[sizeof(Aligned<N>)];

  void Clear() { memset(v, 0, sizeof(v)); }
  void Fill(char c) { memset(v, c, sizeof(v)); }

  // Returns a record whose i-th byte is (c + i) converted to char, so that
  // different byte positions hold different values.
  static Sized Pattern(uint8_t c) {
    Sized sized;
    for (size_t i = 0; i < sizeof(v); ++i) {
      sized.v[i] = static_cast<char>(c + i);
    }
    return sized;
  }

  // Bytewise equality. The static_assert guarantees the object has no bytes
  // outside |v|, so comparing |v| compares the whole record.
  bool operator==(const Sized& right) const {
    static_assert(sizeof(*this) == sizeof(v), "Size mismatch");
    return !memcmp(v, right.v, sizeof(v));
  }

  // Returns a SmallerSized record holding the leading bytes of this record.
  // SmallerSized must not be larger than this type.
  template <typename SmallerSized>
  SmallerSized Truncate() const {
    SmallerSized val;
    static_assert(sizeof(val.v) <= sizeof(v), "Cannot truncate to larger size");
    memcpy(val.v, v, sizeof(val.v));
    return val;
  }
};

// Converts a loop counter into a fill byte (the conversion to char wraps for
// values outside the char range).
char FillChar(int val) { return static_cast<char>(val); }

// Heap buffer standing in for a shared-memory mapping in tests that do not
// need a real mmap.
struct FakeMmap {
  explicit FakeMmap(size_t size) : size(size), data(new char[size]) {}
  size_t size;
  std::unique_ptr<char[]> data;
  void* mmap() { return static_cast<void*>(data.get()); }
};

// Allocates backing storage for |count| records, creates a ring in it, stores
// the ring in |*ring| and returns the storage. The caller must keep the
// returned FakeMmap alive for as long as the ring is used.
template <typename Ring>
FakeMmap CreateRing(Ring* ring, uint32_t count) {
  FakeMmap mmap(Ring::MemorySize(count));
  *ring = Ring::Create(mmap.mmap(), mmap.size, count);
  return mmap;
}

// Common trait constants consumed by BroadcastRing. Both derived trait types
// below pass every argument explicitly, so no defaults are declared here.
template <typename RecordType, bool StaticSize, uint32_t StaticCount,
          uint32_t MaxReserved, uint32_t MinAvailable>
struct Traits {
  using Record = RecordType;
  static constexpr bool kUseStaticRecordSize = StaticSize;
  static constexpr uint32_t kStaticRecordCount = StaticCount;
  static constexpr uint32_t kMaxReservedRecords = MaxReserved;
  static constexpr uint32_t kMinAvailableRecords = MinAvailable;
  static constexpr uint32_t kMinRecordCount = MaxReserved + MinAvailable;
};

// Traits for rings whose record count is chosen at creation time
// (static count fixed to 0).
//
// NOTE(review): the default MaxReserved/MinAvailable values (1 and 7) are not
// recoverable from this file; they were chosen so that the default minimum
// count is 8. Confirm against upstream.
template <typename RecordType, bool StaticSize = false,
          uint32_t MaxReserved = 1, uint32_t MinAvailable = 7>
struct TraitsDynamic
    : public Traits<RecordType, StaticSize, 0, MaxReserved, MinAvailable> {
  using Ring = BroadcastRing<RecordType, TraitsDynamic>;
  static uint32_t MinCount() { return MaxReserved + MinAvailable; }
};

// Traits for rings whose record count is fixed at compile time.
// The MaxReserved/MinAvailable defaults must sum to at most 1 because
// Static_8_8x1 below uses a single-record ring.
template <typename RecordType, uint32_t StaticCount, bool StaticSize = true,
          uint32_t MaxReserved = 1, uint32_t MinAvailable = 0>
struct TraitsStatic : public Traits<RecordType, StaticSize, StaticCount,
                                    MaxReserved, MinAvailable> {
  using Ring = BroadcastRing<RecordType, TraitsStatic>;
  static uint32_t MinCount() { return StaticCount; }
};

// Alias naming: <Dynamic|Static>_<record bytes>_<size>x<count>, where N / M
// mean "not fixed at compile time". A trailing _AplusB gives
// MaxReserved = A and MinAvailable = B.
using Dynamic_8_NxM = TraitsDynamic<Sized<8>>;
using Dynamic_16_NxM = TraitsDynamic<Sized<16>>;
using Dynamic_32_NxM = TraitsDynamic<Sized<32>>;
using Dynamic_32_32xM = TraitsDynamic<Sized<32>, true>;
using Dynamic_16_NxM_1plus0 = TraitsDynamic<Sized<16>, false, 1, 0>;
using Dynamic_16_NxM_1plus1 = TraitsDynamic<Sized<16>, false, 1, 1>;
using Dynamic_16_NxM_5plus11 = TraitsDynamic<Sized<16>, false, 5, 11>;
using Dynamic_256_NxM_1plus0 = TraitsDynamic<Sized<256>, false, 1, 0>;
using Static_8_8x1 = TraitsStatic<Sized<8>, 1>;
using Static_8_8x16 = TraitsStatic<Sized<8>, 16>;
using Static_16_16x8 = TraitsStatic<Sized<16>, 8>;
using Static_16_16x16 = TraitsStatic<Sized<16>, 16>;
using Static_16_16x32 = TraitsStatic<Sized<16>, 32>;
using Static_32_Nx8 = TraitsStatic<Sized<32>, 8, false>;

// Trait configurations the typed tests are instantiated with.
// NOTE(review): the original list contents were lost; this lists every alias
// declared above. Confirm against upstream.
using TraitsList = ::testing::Types<
    Dynamic_8_NxM, Dynamic_16_NxM, Dynamic_32_NxM, Dynamic_32_32xM,
    Dynamic_16_NxM_1plus0, Dynamic_16_NxM_1plus1, Dynamic_16_NxM_5plus11,
    Dynamic_256_NxM_1plus0, Static_8_8x1, Static_8_8x16, Static_16_16x8,
    Static_16_16x16, Static_16_16x32, Static_32_Nx8>;

}  // namespace

// Typed-test fixture; TypeParam is one of the trait types in TraitsList.
template <typename T>
class BroadcastRingTest : public ::testing::Test {};

TYPED_TEST_CASE(BroadcastRingTest, TraitsList);

// A freshly created ring reports the record count it was created with and a
// record size equal to sizeof(Record).
TYPED_TEST(BroadcastRingTest, Geometry) {
  using Record = typename TypeParam::Record;
  using Ring = typename TypeParam::Ring;
  Ring ring;
  auto mmap = CreateRing(&ring, Ring::Traits::MinCount());
  EXPECT_EQ(Ring::Traits::MinCount(), ring.record_count());
  EXPECT_EQ(sizeof(Record), ring.record_size());
}

// Get() fails on an empty ring, succeeds for the one record that was Put(),
// and fails for the sequence after it. Failed reads must leave both the
// sequence and the output record untouched.
TYPED_TEST(BroadcastRingTest, PutGet) {
  using Record = typename TypeParam::Record;
  using Ring = typename TypeParam::Ring;
  Ring ring;
  auto mmap = CreateRing(&ring, Ring::Traits::MinCount());
  const uint32_t oldest_sequence_at_start = ring.GetOldestSequence();
  const uint32_t next_sequence_at_start = ring.GetNextSequence();
  {
    // Nothing has been written yet.
    uint32_t sequence = oldest_sequence_at_start;
    Record record;
    EXPECT_FALSE(ring.Get(&sequence, &record));
    EXPECT_EQ(oldest_sequence_at_start, sequence);
    EXPECT_EQ(Record(), record);
  }
  const Record original_record(0x1a);
  ring.Put(original_record);
  {
    // The record just written is readable at the sequence it was given.
    uint32_t sequence = next_sequence_at_start;
    Record record;
    EXPECT_TRUE(ring.Get(&sequence, &record));
    EXPECT_EQ(next_sequence_at_start, sequence);
    EXPECT_EQ(original_record, record);
  }
  {
    // The following sequence has not been written.
    uint32_t sequence = next_sequence_at_start + 1;
    Record record;
    EXPECT_FALSE(ring.Get(&sequence, &record));
    EXPECT_EQ(next_sequence_at_start + 1, sequence);
    EXPECT_EQ(Record(), record);
  }
}

TYPED_TEST(BroadcastRingTest, FillOnce) { using Record = typename TypeParam::Record; using Ring = typename TypeParam::Ring; Ring ring; auto mmap = CreateRing(&ring, Ring::Traits::MinCount()); const uint32_t next_sequence_at_start = ring.GetNextSequence(); for (uint32_t i = 0; i < ring.record_count(); ++i) ring.Put(Record(FillChar(i))); for (uint32_t i = 0; i < ring.record_count(); ++i) { const uint32_t expected_sequence = next_sequence_at_start + i; const Record expected_record(FillChar(i)); { uint32_t sequence = ring.GetOldestSequence() + i; Record record; EXPECT_TRUE(ring.Get(&sequence, &record)); EXPECT_EQ(expected_sequence, sequence); EXPECT_EQ(expected_record, record); } } { uint32_t sequence = ring.GetOldestSequence() + ring.record_count(); Record record; EXPECT_FALSE(ring.Get(&sequence, &record)); } } TYPED_TEST(BroadcastRingTest, FillTwice) { using Record = typename TypeParam::Record; using Ring = typename TypeParam::Ring; Ring ring; auto mmap = CreateRing(&ring, Ring::Traits::MinCount()); const uint32_t next_sequence_at_start = ring.GetNextSequence(); for (uint32_t i = 0; i < 2 * ring.record_count(); ++i) { const Record newest_record(FillChar(i)); ring.Put(newest_record); const uint32_t newest_sequence = next_sequence_at_start + i; const uint32_t records_available = std::min(i + 1, ring.record_count()); const uint32_t oldest_sequence = newest_sequence - records_available + 1; EXPECT_EQ(newest_sequence, ring.GetNewestSequence()); EXPECT_EQ(oldest_sequence, ring.GetOldestSequence()); EXPECT_EQ(newest_sequence + 1, ring.GetNextSequence()); for (uint32_t j = 0; j < records_available; ++j) { const uint32_t sequence_jth_newest = newest_sequence - j; const Record record_jth_newest(FillChar(i - j)); { uint32_t sequence = sequence_jth_newest; Record record; EXPECT_TRUE(ring.Get(&sequence, &record)); EXPECT_EQ(sequence_jth_newest, sequence); EXPECT_EQ(record_jth_newest, record); } { uint32_t sequence = sequence_jth_newest; Record record; 
EXPECT_TRUE(ring.GetNewest(&sequence, &record)); EXPECT_EQ(newest_sequence, sequence); EXPECT_EQ(newest_record, record); } } const Record oldest_record( FillChar(i + (oldest_sequence - newest_sequence))); const uint32_t sequence_0th_overwritten = oldest_sequence - 1; const uint32_t sequence_0th_future = newest_sequence + 1; const uint32_t sequence_1st_future = newest_sequence + 2; { uint32_t sequence = sequence_0th_overwritten; Record record; EXPECT_TRUE(ring.Get(&sequence, &record)); EXPECT_EQ(oldest_sequence, sequence); EXPECT_EQ(oldest_record, record); } { uint32_t sequence = sequence_0th_overwritten; Record record; EXPECT_TRUE(ring.GetNewest(&sequence, &record)); EXPECT_EQ(newest_sequence, sequence); EXPECT_EQ(newest_record, record); } { uint32_t sequence = sequence_0th_future; Record record; EXPECT_FALSE(ring.Get(&sequence, &record)); EXPECT_EQ(sequence_0th_future, sequence); EXPECT_EQ(Record(), record); } { uint32_t sequence = sequence_0th_future; Record record; EXPECT_FALSE(ring.GetNewest(&sequence, &record)); EXPECT_EQ(sequence_0th_future, sequence); EXPECT_EQ(Record(), record); } { uint32_t sequence = sequence_1st_future; Record record; EXPECT_TRUE(ring.Get(&sequence, &record)); EXPECT_EQ(oldest_sequence, sequence); EXPECT_EQ(oldest_record, record); } { uint32_t sequence = sequence_1st_future; Record record; EXPECT_TRUE(ring.GetNewest(&sequence, &record)); EXPECT_EQ(newest_sequence, sequence); EXPECT_EQ(newest_record, record); } } } TYPED_TEST(BroadcastRingTest, Import) { using Record = typename TypeParam::Record; using Ring = typename TypeParam::Ring; Ring ring; auto mmap = CreateRing(&ring, Ring::Traits::MinCount()); const uint32_t sequence_0 = ring.GetNextSequence(); const uint32_t sequence_1 = ring.GetNextSequence() + 1; const Record record_0 = Record::Pattern(0x00); const Record record_1 = Record::Pattern(0x80); ring.Put(record_0); ring.Put(record_1); { Ring imported_ring; bool import_ok; std::tie(imported_ring, import_ok) = Ring::Import(mmap.mmap(), 
mmap.size); EXPECT_TRUE(import_ok); EXPECT_EQ(ring.record_size(), imported_ring.record_size()); EXPECT_EQ(ring.record_count(), imported_ring.record_count()); if (ring.record_count() != 1) { uint32_t sequence = sequence_0; Record imported_record; EXPECT_TRUE(imported_ring.Get(&sequence, &imported_record)); EXPECT_EQ(sequence_0, sequence); EXPECT_EQ(record_0, imported_record); } { uint32_t sequence = sequence_1; Record imported_record; EXPECT_TRUE(imported_ring.Get(&sequence, &imported_record)); EXPECT_EQ(sequence_1, sequence); EXPECT_EQ(record_1, imported_record); } } } TEST(BroadcastRingTest, ShouldFailImportIfStaticSizeMismatch) { using OriginalRing = typename Static_16_16x16::Ring; using RecordSizeMismatchRing = typename Static_8_8x16::Ring; using RecordCountMismatchRing = typename Static_16_16x8::Ring; OriginalRing original_ring; auto mmap = CreateRing(&original_ring, OriginalRing::Traits::MinCount()); { using ImportedRing = RecordSizeMismatchRing; ImportedRing imported_ring; bool import_ok; std::tie(imported_ring, import_ok) = ImportedRing::Import(mmap.mmap(), mmap.size); EXPECT_FALSE(import_ok); auto mmap_imported = CreateRing(&imported_ring, ImportedRing::Traits::MinCount()); EXPECT_NE(original_ring.record_size(), imported_ring.record_size()); EXPECT_EQ(original_ring.record_count(), imported_ring.record_count()); } { using ImportedRing = RecordCountMismatchRing; ImportedRing imported_ring; bool import_ok; std::tie(imported_ring, import_ok) = ImportedRing::Import(mmap.mmap(), mmap.size); EXPECT_FALSE(import_ok); auto mmap_imported = CreateRing(&imported_ring, ImportedRing::Traits::MinCount()); EXPECT_EQ(original_ring.record_size(), imported_ring.record_size()); EXPECT_NE(original_ring.record_count(), imported_ring.record_count()); } } TEST(BroadcastRingTest, ShouldFailImportIfDynamicSizeGrows) { using OriginalRing = typename Dynamic_8_NxM::Ring; using RecordSizeGrowsRing = typename Dynamic_16_NxM::Ring; OriginalRing original_ring; auto mmap = 
CreateRing(&original_ring, OriginalRing::Traits::MinCount()); { using ImportedRing = RecordSizeGrowsRing; ImportedRing imported_ring; bool import_ok; std::tie(imported_ring, import_ok) = ImportedRing::Import(mmap.mmap(), mmap.size); EXPECT_FALSE(import_ok); auto mmap_imported = CreateRing(&imported_ring, ImportedRing::Traits::MinCount()); EXPECT_LT(original_ring.record_size(), imported_ring.record_size()); EXPECT_EQ(original_ring.record_count(), imported_ring.record_count()); } } TEST(BroadcastRingTest, ShouldFailImportIfCountTooSmall) { using OriginalRing = typename Dynamic_16_NxM_1plus0::Ring; using MinCountRing = typename Dynamic_16_NxM_1plus1::Ring; OriginalRing original_ring; auto mmap = CreateRing(&original_ring, OriginalRing::Traits::MinCount()); { using ImportedRing = MinCountRing; ImportedRing imported_ring; bool import_ok; std::tie(imported_ring, import_ok) = ImportedRing::Import(mmap.mmap(), mmap.size); EXPECT_FALSE(import_ok); auto mmap_imported = CreateRing(&imported_ring, ImportedRing::Traits::MinCount()); EXPECT_EQ(original_ring.record_size(), imported_ring.record_size()); EXPECT_LT(original_ring.record_count(), imported_ring.record_count()); } } TEST(BroadcastRingTest, ShouldFailImportIfMmapTooSmall) { using OriginalRing = typename Dynamic_16_NxM::Ring; OriginalRing original_ring; auto mmap = CreateRing(&original_ring, OriginalRing::Traits::MinCount()); { using ImportedRing = OriginalRing; ImportedRing imported_ring; bool import_ok; const size_t kMinSize = ImportedRing::MemorySize(original_ring.record_count()); std::tie(imported_ring, import_ok) = ImportedRing::Import(mmap.mmap(), 0); EXPECT_FALSE(import_ok); std::tie(imported_ring, import_ok) = ImportedRing::Import(mmap.mmap(), kMinSize - 1); EXPECT_FALSE(import_ok); std::tie(imported_ring, import_ok) = ImportedRing::Import(mmap.mmap(), kMinSize); EXPECT_TRUE(import_ok); EXPECT_EQ(original_ring.record_size(), imported_ring.record_size()); EXPECT_EQ(original_ring.record_count(), 
imported_ring.record_count()); } } TEST(BroadcastRingTest, ShouldImportIfDynamicSizeShrinks) { using OriginalRing = typename Dynamic_16_NxM::Ring; using RecordSizeShrinksRing = typename Dynamic_8_NxM::Ring; OriginalRing original_ring; auto mmap = CreateRing(&original_ring, OriginalRing::Traits::MinCount()); using OriginalRecord = typename OriginalRing::Record; const uint32_t original_sequence_0 = original_ring.GetNextSequence(); const uint32_t original_sequence_1 = original_ring.GetNextSequence() + 1; const OriginalRecord original_record_0 = OriginalRecord::Pattern(0x00); const OriginalRecord original_record_1 = OriginalRecord::Pattern(0x80); original_ring.Put(original_record_0); original_ring.Put(original_record_1); { using ImportedRing = RecordSizeShrinksRing; using ImportedRecord = typename ImportedRing::Record; ImportedRing imported_ring; bool import_ok; std::tie(imported_ring, import_ok) = ImportedRing::Import(mmap.mmap(), mmap.size); EXPECT_TRUE(import_ok); EXPECT_EQ(original_ring.record_size(), imported_ring.record_size()); EXPECT_EQ(original_ring.record_count(), imported_ring.record_count()); EXPECT_GT(sizeof(OriginalRecord), sizeof(ImportedRecord)); { uint32_t sequence = original_sequence_0; ImportedRecord shrunk_record; EXPECT_TRUE(imported_ring.Get(&sequence, &shrunk_record)); EXPECT_EQ(original_sequence_0, sequence); EXPECT_EQ(original_record_0.Truncate(), shrunk_record); } { uint32_t sequence = original_sequence_1; ImportedRecord shrunk_record; EXPECT_TRUE(imported_ring.Get(&sequence, &shrunk_record)); EXPECT_EQ(original_sequence_1, sequence); EXPECT_EQ(original_record_1.Truncate(), shrunk_record); } } } TEST(BroadcastRingTest, ShouldImportIfCompatibleDynamicToStatic) { using OriginalRing = typename Dynamic_16_NxM::Ring; using ImportedRing = typename Static_16_16x16::Ring; using OriginalRecord = typename OriginalRing::Record; using ImportedRecord = typename ImportedRing::Record; using StaticRing = ImportedRing; OriginalRing original_ring; auto mmap = 
CreateRing(&original_ring, StaticRing::Traits::MinCount()); const uint32_t original_sequence_0 = original_ring.GetNextSequence(); const uint32_t original_sequence_1 = original_ring.GetNextSequence() + 1; const OriginalRecord original_record_0 = OriginalRecord::Pattern(0x00); const OriginalRecord original_record_1 = OriginalRecord::Pattern(0x80); original_ring.Put(original_record_0); original_ring.Put(original_record_1); { ImportedRing imported_ring; bool import_ok; std::tie(imported_ring, import_ok) = ImportedRing::Import(mmap.mmap(), mmap.size); EXPECT_TRUE(import_ok); EXPECT_EQ(original_ring.record_size(), imported_ring.record_size()); EXPECT_EQ(original_ring.record_count(), imported_ring.record_count()); { uint32_t sequence = original_sequence_0; ImportedRecord imported_record; EXPECT_TRUE(imported_ring.Get(&sequence, &imported_record)); EXPECT_EQ(original_sequence_0, sequence); EXPECT_EQ(original_record_0, imported_record); } { uint32_t sequence = original_sequence_1; ImportedRecord imported_record; EXPECT_TRUE(imported_ring.Get(&sequence, &imported_record)); EXPECT_EQ(original_sequence_1, sequence); EXPECT_EQ(original_record_1, imported_record); } } } TEST(BroadcastRingTest, ShouldImportIfCompatibleStaticToDynamic) { using OriginalRing = typename Static_16_16x16::Ring; using ImportedRing = typename Dynamic_16_NxM::Ring; using OriginalRecord = typename OriginalRing::Record; using ImportedRecord = typename ImportedRing::Record; using StaticRing = OriginalRing; OriginalRing original_ring; auto mmap = CreateRing(&original_ring, StaticRing::Traits::MinCount()); const uint32_t original_sequence_0 = original_ring.GetNextSequence(); const uint32_t original_sequence_1 = original_ring.GetNextSequence() + 1; const OriginalRecord original_record_0 = OriginalRecord::Pattern(0x00); const OriginalRecord original_record_1 = OriginalRecord::Pattern(0x80); original_ring.Put(original_record_0); original_ring.Put(original_record_1); { ImportedRing imported_ring; bool import_ok; 
std::tie(imported_ring, import_ok) = ImportedRing::Import(mmap.mmap(), mmap.size); EXPECT_TRUE(import_ok); EXPECT_EQ(original_ring.record_size(), imported_ring.record_size()); EXPECT_EQ(original_ring.record_count(), imported_ring.record_count()); { uint32_t sequence = original_sequence_0; ImportedRecord imported_record; EXPECT_TRUE(imported_ring.Get(&sequence, &imported_record)); EXPECT_EQ(original_sequence_0, sequence); EXPECT_EQ(original_record_0, imported_record); } { uint32_t sequence = original_sequence_1; ImportedRecord imported_record; EXPECT_TRUE(imported_ring.Get(&sequence, &imported_record)); EXPECT_EQ(original_sequence_1, sequence); EXPECT_EQ(original_record_1, imported_record); } } } TEST(BroadcastRingTest, ShouldImportIfReadonlyMmap) { using Ring = Dynamic_32_NxM::Ring; using Record = Ring::Record; uint32_t record_count = Ring::Traits::MinCount(); size_t ring_size = Ring::MemorySize(record_count); size_t page_size = sysconf(_SC_PAGESIZE); size_t mmap_size = (ring_size + (page_size - 1)) & ~(page_size - 1); ASSERT_GE(mmap_size, ring_size); void* mmap_base = mmap(nullptr, mmap_size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); ASSERT_NE(MAP_FAILED, mmap_base); Ring ring = Ring::Create(mmap_base, mmap_size, record_count); for (uint32_t i = 0; i < record_count; ++i) ring.Put(Record(FillChar(i))); ASSERT_EQ(0, mprotect(mmap_base, mmap_size, PROT_READ)); { Ring imported_ring; bool import_ok; std::tie(imported_ring, import_ok) = Ring::Import(mmap_base, mmap_size); EXPECT_TRUE(import_ok); EXPECT_EQ(ring.record_size(), imported_ring.record_size()); EXPECT_EQ(ring.record_count(), imported_ring.record_count()); uint32_t oldest_sequence = imported_ring.GetOldestSequence(); for (uint32_t i = 0; i < record_count; ++i) { uint32_t sequence = oldest_sequence + i; Record record; EXPECT_TRUE(imported_ring.Get(&sequence, &record)); EXPECT_EQ(Record(FillChar(i)), record); } } ASSERT_EQ(0, munmap(mmap_base, mmap_size)); } TEST(BroadcastRingTest, 
ShouldDieIfPutReadonlyMmap) { using Ring = Dynamic_32_NxM::Ring; using Record = Ring::Record; uint32_t record_count = Ring::Traits::MinCount(); size_t ring_size = Ring::MemorySize(record_count); size_t page_size = sysconf(_SC_PAGESIZE); size_t mmap_size = (ring_size + (page_size - 1)) & ~(page_size - 1); ASSERT_GE(mmap_size, ring_size); void* mmap_base = mmap(nullptr, mmap_size, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0); ASSERT_NE(MAP_FAILED, mmap_base); Ring ring = Ring::Create(mmap_base, mmap_size, record_count); for (uint32_t i = 0; i < record_count; ++i) ring.Put(Record(FillChar(i))); ASSERT_EQ(0, mprotect(mmap_base, mmap_size, PROT_READ)); EXPECT_DEATH_IF_SUPPORTED({ ring.Put(Record(7)); }, ""); ASSERT_EQ(0, munmap(mmap_base, mmap_size)); } TEST(BroadcastRingTest, ShouldDieIfCreationMmapTooSmall) { using Ring = Dynamic_32_NxM::Ring; using Record = Ring::Record; uint32_t record_count = Ring::Traits::MinCount(); size_t ring_size = Ring::MemorySize(record_count); FakeMmap mmap(ring_size); EXPECT_DEATH_IF_SUPPORTED({ Ring ring = Ring::Create(mmap.mmap(), ring_size - 1, record_count); }, ""); Ring ring = Ring::Create(mmap.mmap(), ring_size, record_count); ring.Put(Record(3)); { uint32_t sequence = ring.GetNewestSequence(); Record record; EXPECT_TRUE(ring.Get(&sequence, &record)); EXPECT_EQ(Record(3), record); } } TEST(BroadcastRingTest, ShouldDieIfCreationMmapMisaligned) { using Ring = Static_8_8x1::Ring; using Record = Ring::Record; constexpr int kAlign = Ring::mmap_alignment(); constexpr int kMisalign = kAlign / 2; size_t ring_size = Ring::MemorySize(); std::unique_ptr buf(new char[ring_size + kMisalign]); EXPECT_DEATH_IF_SUPPORTED( { Ring ring = Ring::Create(buf.get() + kMisalign, ring_size); }, ""); Ring ring = Ring::Create(buf.get(), ring_size); ring.Put(Record(3)); { uint32_t sequence = ring.GetNewestSequence(); Record record; EXPECT_TRUE(ring.Get(&sequence, &record)); EXPECT_EQ(Record(3), record); } } template std::unique_ptr 
CopyTask(std::atomic* quit, void* in_base, size_t in_size, void* out_base, size_t out_size) { return std::unique_ptr( new std::thread([quit, in_base, in_size, out_base, out_size]() { using Record = typename Ring::Record; bool import_ok; Ring in_ring; Ring out_ring; std::tie(in_ring, import_ok) = Ring::Import(in_base, in_size); ASSERT_TRUE(import_ok); std::tie(out_ring, import_ok) = Ring::Import(out_base, out_size); ASSERT_TRUE(import_ok); uint32_t sequence = in_ring.GetOldestSequence(); while (!std::atomic_load_explicit(quit, std::memory_order_relaxed)) { Record record; if (in_ring.Get(&sequence, &record)) { out_ring.Put(record); sequence++; } } })); } TEST(BroadcastRingTest, ThreadedCopySingle) { using Ring = Dynamic_32_NxM::Ring; using Record = Ring::Record; Ring in_ring; auto in_mmap = CreateRing(&in_ring, Ring::Traits::MinCount()); Ring out_ring; auto out_mmap = CreateRing(&out_ring, Ring::Traits::MinCount()); std::atomic quit(false); std::unique_ptr copy_task = CopyTask( &quit, out_mmap.mmap(), out_mmap.size, in_mmap.mmap(), in_mmap.size); const Record out_record(0x1c); out_ring.Put(out_record); uint32_t in_sequence = in_ring.GetOldestSequence(); Record in_record; while (!in_ring.Get(&in_sequence, &in_record)) { // Do nothing. 
} EXPECT_EQ(out_record, in_record); std::atomic_store_explicit(&quit, true, std::memory_order_relaxed); copy_task->join(); } TEST(BroadcastRingTest, ThreadedCopyLossless) { using Ring = Dynamic_32_NxM::Ring; using Record = Ring::Record; Ring in_ring; auto in_mmap = CreateRing(&in_ring, Ring::Traits::MinCount()); Ring out_ring; auto out_mmap = CreateRing(&out_ring, Ring::Traits::MinCount()); std::atomic quit(false); std::unique_ptr copy_task = CopyTask( &quit, out_mmap.mmap(), out_mmap.size, in_mmap.mmap(), in_mmap.size); constexpr uint32_t kRecordsToProcess = 10000; uint32_t out_records = 0; uint32_t in_records = 0; uint32_t in_sequence = in_ring.GetNextSequence(); while (out_records < kRecordsToProcess || in_records < kRecordsToProcess) { if (out_records < kRecordsToProcess && out_records - in_records < out_ring.record_count()) { const Record out_record(FillChar(out_records)); out_ring.Put(out_record); out_records++; } Record in_record; while (in_ring.Get(&in_sequence, &in_record)) { EXPECT_EQ(Record(FillChar(in_records)), in_record); in_records++; in_sequence++; } } EXPECT_EQ(kRecordsToProcess, out_records); EXPECT_EQ(kRecordsToProcess, in_records); std::atomic_store_explicit(&quit, true, std::memory_order_relaxed); copy_task->join(); } TEST(BroadcastRingTest, ThreadedCopyLossy) { using Ring = Dynamic_32_NxM::Ring; using Record = Ring::Record; Ring in_ring; auto in_mmap = CreateRing(&in_ring, Ring::Traits::MinCount()); Ring out_ring; auto out_mmap = CreateRing(&out_ring, Ring::Traits::MinCount()); std::atomic quit(false); std::unique_ptr copy_task = CopyTask( &quit, out_mmap.mmap(), out_mmap.size, in_mmap.mmap(), in_mmap.size); constexpr uint32_t kRecordsToProcess = 100000; uint32_t out_records = 0; uint32_t in_records = 0; uint32_t in_sequence = in_ring.GetNextSequence(); while (out_records < kRecordsToProcess) { const Record out_record(FillChar(out_records)); out_ring.Put(out_record); out_records++; Record in_record; if (in_ring.GetNewest(&in_sequence, 
&in_record)) { EXPECT_EQ(Record(in_record.v[0]), in_record); in_records++; in_sequence++; } } EXPECT_EQ(kRecordsToProcess, out_records); EXPECT_GE(kRecordsToProcess, in_records); std::atomic_store_explicit(&quit, true, std::memory_order_relaxed); copy_task->join(); } template std::unique_ptr CheckFillTask(std::atomic* quit, void* in_base, size_t in_size) { return std::unique_ptr( new std::thread([quit, in_base, in_size]() { using Record = typename Ring::Record; bool import_ok; Ring in_ring; std::tie(in_ring, import_ok) = Ring::Import(in_base, in_size); ASSERT_TRUE(import_ok); uint32_t sequence = in_ring.GetOldestSequence(); while (!std::atomic_load_explicit(quit, std::memory_order_relaxed)) { Record record; if (in_ring.Get(&sequence, &record)) { ASSERT_EQ(Record(record.v[0]), record); sequence++; } } })); } template void ThreadedOverwriteTorture() { using Record = typename Ring::Record; // Maximize overwrites by having few records. const int kMinRecordCount = 1; const int kMaxRecordCount = 4; for (int count = kMinRecordCount; count <= kMaxRecordCount; count *= 2) { Ring out_ring; auto out_mmap = CreateRing(&out_ring, count); std::atomic quit(false); std::unique_ptr check_task = CheckFillTask(&quit, out_mmap.mmap(), out_mmap.size); constexpr int kIterations = 10000; for (int i = 0; i < kIterations; ++i) { const Record record(FillChar(i)); out_ring.Put(record); } std::atomic_store_explicit(&quit, true, std::memory_order_relaxed); check_task->join(); } } TEST(BroadcastRingTest, ThreadedOverwriteTortureSmall) { ThreadedOverwriteTorture(); } TEST(BroadcastRingTest, ThreadedOverwriteTortureLarge) { ThreadedOverwriteTorture(); } } // namespace dvr } // namespace android