1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from datetime import datetime, timedelta 18import logging 19from threading import Timer 20import time 21import traceback 22 23from mobly import asserts 24 25from acts import signals 26from acts.base_test import BaseTestClass 27 28from bluetooth_packets_python3 import hci_packets 29from bluetooth_packets_python3 import l2cap_packets 30from cert.event_stream import EventStream, FilteringEventStream 31from cert.truth import assertThat 32from cert.metadata import metadata 33from cert.behavior import when, wait_until 34from cert.behavior import IHasBehaviors 35from cert.behavior import anything 36from cert.behavior import SingleArgumentBehavior 37from cert.behavior import ReplyStage 38 39 40class BogusProto: 41 42 class BogusType: 43 44 def __init__(self): 45 self.name = "BogusProto" 46 self.is_extension = False 47 self.cpp_type = False 48 49 def type(self): 50 return 'BogusRpc' 51 52 def label(self): 53 return "label" 54 55 class BogusDescriptor: 56 57 def __init__(self, name): 58 self.full_name = name 59 60 def __init__(self, value): 61 self.value_ = value 62 self.DESCRIPTOR = BogusProto.BogusDescriptor(str(value)) 63 64 def __str__(self): 65 return "BogusRpc value = " + str(self.value_) 66 67 def ListFields(self): 68 for field in [BogusProto.BogusType()]: 69 yield [field, self.value_] 70 71 72class FetchEvents: 73 74 def __init__(self, events, delay_ms): 75 self.events_ = events 76 self.sleep_time_ = (delay_ms * 1.0) / 1000 77 self.index_ = 0 78 self.done_ = False 79 self.then_ = datetime.now() 80 81 def __iter__(self): 82 for event in self.events_: 83 time.sleep(self.sleep_time_) 84 if self.done_: 85 return 86 logging.debug("yielding %d" % event) 87 yield BogusProto(event) 88 89 def done(self): 90 return self.done_ 91 92 def cancel(self): 93 logging.debug("cancel") 94 self.done_ = True 95 return None 96 97 98class TestBehaviors(object): 99 100 def __init__(self, parent): 101 self.test_request_behavior = SingleArgumentBehavior(lambda: TestBehaviors.TestRequestReplyStage(parent)) 102 103 def test_request(self, matcher): 104 return self.test_request_behavior.begin(matcher) 105 106 class TestRequestReplyStage(ReplyStage): 107 108 def __init__(self, parent): 109 self._parent = parent 110 111 def increment_count(self): 112 self._commit(lambda obj: self._increment_count(obj)) 113 return self 114 115 def _increment_count(self, obj): 116 self._parent.count += 1 117 self._parent.captured.append(obj) 118 119 120class ObjectWithBehaviors(IHasBehaviors): 121 122 def __init__(self): 123 self.behaviors = TestBehaviors(self) 124 self.count = 0 125 self.captured = [] 126 self.unhandled_count = 0 127 128 def get_behaviors(self): 129 return self.behaviors 130 131 def increment_unhandled(self): 132 self.unhandled_count += 1 133 134 135class CertSelfTest(BaseTestClass): 136 137 def setup_test(self): 138 return True 139 140 def teardown_test(self): 141 return True 142 143 def test_assert_occurs_at_least_passes(self): 144 with EventStream(FetchEvents(events=[1, 2, 3, 1, 2, 3], delay_ms=40)) as event_stream: 145 event_stream.assert_event_occurs( 146 lambda data: data.value_ == 1, timeout=timedelta(milliseconds=300), at_least_times=2) 147 148 def test_assert_occurs_passes(self): 149 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 150 event_stream.assert_event_occurs(lambda data: data.value_ == 1, timeout=timedelta(seconds=1)) 151 152 def test_assert_occurs_fails(self): 153 try: 154 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 155 event_stream.assert_event_occurs(lambda data: data.value_ == 4, timeout=timedelta(seconds=1)) 156 except Exception as e: 157 logging.debug(e) 158 return True # Failed as expected 159 return False 160 161 def test_assert_occurs_at_most_passes(self): 162 with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: 163 event_stream.assert_event_occurs_at_most( 164 lambda data: data.value_ < 4, timeout=timedelta(seconds=1), at_most_times=3) 165 166 def test_assert_occurs_at_most_fails(self): 167 try: 168 with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: 169 event_stream.assert_event_occurs_at_most( 170 lambda data: data.value_ > 1, timeout=timedelta(seconds=1), at_most_times=2) 171 except Exception as e: 172 logging.debug(e) 173 return True # Failed as expected 174 return False 175 176 def test_skip_a_test(self): 177 asserts.skip("Skipping this test because it's blocked by b/xyz") 178 assert False 179 180 def test_nested_packets(self): 181 handle = 123 182 inside = hci_packets.ReadScanEnableBuilder() 183 logging.debug(inside.Serialize()) 184 logging.debug("building outside") 185 outside = hci_packets.AclPacketBuilder(handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, 186 hci_packets.BroadcastFlag.POINT_TO_POINT, inside) 187 logging.debug(outside.Serialize()) 188 logging.debug("Done!") 189 190 def test_l2cap_config_options(self): 191 mtu_opt = l2cap_packets.MtuConfigurationOption() 192 mtu_opt.mtu = 123 193 fcs_opt = l2cap_packets.FrameCheckSequenceOption() 194 fcs_opt.fcs_type = l2cap_packets.FcsType.DEFAULT 195 request = l2cap_packets.ConfigurationRequestBuilder( 196 0x1d, # Command ID 197 0xc1d, # Channel ID 198 l2cap_packets.Continuation.END, 199 [mtu_opt, fcs_opt]) 200 request_b_frame = l2cap_packets.BasicFrameBuilder(0x01, request) 201 handle = 123 202 wrapped = hci_packets.AclPacketBuilder(handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, 203 hci_packets.BroadcastFlag.POINT_TO_POINT, request_b_frame) 204 # Size is ACL (4) + L2CAP (4) + Configure (8) + MTU (4) + FCS (3) 205 asserts.assert_true(len(wrapped.Serialize()) == 23, "Packet serialized incorrectly") 206 207 def test_assertThat_boolean_success(self): 208 assertThat(True).isTrue() 209 assertThat(False).isFalse() 210 211 def test_assertThat_boolean_falseIsTrue(self): 212 try: 213 assertThat(False).isTrue() 214 except Exception as e: 215 return True 216 return False 217 218 def test_assertThat_boolean_trueIsFalse(self): 219 try: 220 assertThat(True).isFalse() 221 except Exception as e: 222 return True 223 return False 224 225 def test_assertThat_object_success(self): 226 assertThat("this").isEqualTo("this") 227 assertThat("this").isNotEqualTo("that") 228 assertThat(None).isNone() 229 assertThat("this").isNotNone() 230 231 def test_assertThat_object_isEqualToFails(self): 232 try: 233 assertThat("this").isEqualTo("that") 234 except Exception as e: 235 return True 236 return False 237 238 def test_assertThat_object_isNotEqualToFails(self): 239 try: 240 assertThat("this").isNotEqualTo("this") 241 except Exception as e: 242 return True 243 return False 244 245 def test_assertThat_object_isNoneFails(self): 246 try: 247 assertThat("this").isNone() 248 except Exception as e: 249 return True 250 return False 251 252 def test_assertThat_object_isNotNoneFails(self): 253 try: 254 assertThat(None).isNotNone() 255 except Exception as e: 256 return True 257 return False 258 259 def test_assertThat_eventStream_emits_passes(self): 260 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 261 assertThat(event_stream).emits(lambda data: data.value_ == 1) 262 263 def test_assertThat_eventStream_emits_then_passes(self): 264 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 265 assertThat(event_stream).emits(lambda data: data.value_ == 1).then(lambda data: data.value_ == 3) 266 267 def test_assertThat_eventStream_emits_fails(self): 268 try: 269 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 270 assertThat(event_stream).emits(lambda data: data.value_ == 4) 271 except Exception as e: 272 logging.debug(e) 273 return True # Failed as expected 274 return False 275 276 def test_assertThat_eventStream_emits_then_fails(self): 277 try: 278 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 279 assertThat(event_stream).emits(lambda data: data.value_ == 1).emits(lambda data: data.value_ == 4) 280 except Exception as e: 281 logging.debug(e) 282 return True # Failed as expected 283 return False 284 285 def test_assertThat_eventStream_emitsInOrder_passes(self): 286 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 287 assertThat(event_stream).emits(lambda data: data.value_ == 1, lambda data: data.value_ == 2).inOrder() 288 289 def test_assertThat_eventStream_emitsInAnyOrder_passes(self): 290 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 291 assertThat(event_stream).emits( 292 lambda data: data.value_ == 2, 293 lambda data: data.value_ == 1).inAnyOrder().then(lambda data: data.value_ == 3) 294 295 def test_assertThat_eventStream_emitsInOrder_fails(self): 296 try: 297 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 298 assertThat(event_stream).emits(lambda data: data.value_ == 2, lambda data: data.value_ == 1).inOrder() 299 except Exception as e: 300 logging.debug(e) 301 return True # Failed as expected 302 return False 303 304 def test_assertThat_eventStream_emitsInAnyOrder_fails(self): 305 try: 306 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 307 assertThat(event_stream).emits(lambda data: data.value_ == 4, 308 lambda data: data.value_ == 1).inAnyOrder() 309 except Exception as e: 310 logging.debug(e) 311 return True # Failed as expected 312 return False 313 314 def test_assertThat_emitsNone_passes(self): 315 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 316 assertThat(event_stream).emitsNone( 317 lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15)).thenNone( 318 lambda data: data.value_ == 5, timeout=timedelta(seconds=0.15)) 319 320 def test_assertThat_emitsNone_passes_after_1_second(self): 321 with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=400)) as event_stream: 322 assertThat(event_stream).emitsNone(lambda data: data.value_ == 4, timeout=timedelta(seconds=1)) 323 324 def test_assertThat_emitsNone_fails(self): 325 try: 326 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 327 assertThat(event_stream).emitsNone(lambda data: data.value_ == 2, timeout=timedelta(seconds=1)) 328 except Exception as e: 329 logging.debug(e) 330 return True # Failed as expected 331 return False 332 333 def test_assertThat_emitsNone_zero_passes(self): 334 with EventStream(FetchEvents(events=[], delay_ms=50)) as event_stream: 335 assertThat(event_stream).emitsNone(timeout=timedelta(milliseconds=10)).thenNone( 336 timeout=timedelta(milliseconds=10)) 337 338 def test_assertThat_emitsNone_zero_passes_after_one_second(self): 339 with EventStream(FetchEvents([1], delay_ms=1500)) as event_stream: 340 assertThat(event_stream).emitsNone(timeout=timedelta(seconds=1.0)) 341 342 def test_assertThat_emitsNone_zero_fails(self): 343 try: 344 with EventStream(FetchEvents(events=[17], delay_ms=50)) as event_stream: 345 assertThat(event_stream).emitsNone(timeout=timedelta(seconds=1)) 346 except Exception as e: 347 logging.debug(e) 348 return True # Failed as expected 349 return False 350 351 def test_filtering_event_stream_none_filter_function(self): 352 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 353 filtered_event_stream = FilteringEventStream(event_stream, None) 354 assertThat(filtered_event_stream)\ 355 .emits(lambda data: data.value_ == 1)\ 356 .then(lambda data: data.value_ == 3) 357 358 def test_metadata_empty(self): 359 360 @metadata() 361 def simple_pass_test(arg): 362 pass 363 364 try: 365 simple_pass_test(1) 366 except signals.TestFailure: 367 pass 368 except Exception as e: 369 asserts.fail("@metadata() should only raise signals.TestFailure, " 370 "but raised %s with msg %s instead" % (e.__class__.__name__, str(e))) 371 else: 372 asserts.fail("@metadata() should not work") 373 374 def test_metadata_empty_no_function_call(self): 375 376 @metadata 377 def simple_pass_test(arg): 378 pass 379 380 try: 381 simple_pass_test(1) 382 except signals.TestFailure: 383 pass 384 except Exception as e: 385 asserts.fail("@metadata should only raise signals.TestFailure, " 386 "but raised %s with msg %s instead" % (e.__class__.__name__, str(e))) 387 else: 388 asserts.fail("@metadata should not work") 389 390 def test_metadata_pts_missing_id(self): 391 392 @metadata(pts_test_name="Hello world") 393 def simple_pass_test(arg): 394 pass 395 396 try: 397 simple_pass_test(1) 398 except signals.TestFailure: 399 pass 400 except Exception as e: 401 asserts.fail("should only raise signals.TestFailure, " 402 "but raised %s with msg %s instead" % (e.__class__.__name__, str(e))) 403 else: 404 asserts.fail("missing pts_test_id should not work") 405 406 def test_metadata_pts_missing_name(self): 407 408 @metadata(pts_test_id="A/B/C") 409 def simple_pass_test(arg): 410 pass 411 412 try: 413 simple_pass_test(1) 414 except signals.TestFailure: 415 pass 416 except Exception as e: 417 asserts.fail("should only raise signals.TestFailure, " 418 "but raised %s with msg %s instead" % (e.__class__.__name__, str(e))) 419 else: 420 asserts.fail("missing pts_test_name should not work") 421 422 def test_metadata_pts_test_id_and_description(self): 423 424 @metadata(pts_test_id="A/B/C", pts_test_name="Hello world") 425 def simple_pass_test(arg): 426 pass 427 428 try: 429 simple_pass_test(1) 430 except signals.TestPass as e: 431 asserts.assert_true("pts_test_id" in e.extras, msg=("pts_test_id not in extra: %s" % str(e.extras))) 432 asserts.assert_equal(e.extras["pts_test_id"], "A/B/C") 433 asserts.assert_true("pts_test_name" in e.extras, msg=("pts_test_name not in extra: %s" % str(e.extras))) 434 asserts.assert_equal(e.extras["pts_test_name"], "Hello world") 435 else: 436 asserts.fail("Must throw an exception using @metadata decorator") 437 438 def test_metadata_test_with_exception_stacktrace(self): 439 440 @metadata(pts_test_id="A/B/C", pts_test_name="Hello world") 441 def simple_fail_test(failure_argument): 442 raise ValueError(failure_argument) 443 444 try: 445 simple_fail_test("BEEFBEEF") 446 except signals.TestError as e: 447 asserts.assert_true("pts_test_id" in e.extras, msg=("pts_test_id not in extra: %s" % str(e.extras))) 448 asserts.assert_equal(e.extras["pts_test_id"], "A/B/C") 449 asserts.assert_true("pts_test_name" in e.extras, msg=("pts_test_name not in extra: %s" % str(e.extras))) 450 asserts.assert_equal(e.extras["pts_test_name"], "Hello world") 451 trace_str = traceback.format_exc() 452 asserts.assert_true( 453 "raise ValueError(failure_argument)" in trace_str, 454 msg="Failed test method not in error stack trace: %s" % trace_str) 455 else: 456 asserts.fail("Must throw an exception using @metadata decorator") 457 458 def test_fluent_behavior_simple(self): 459 thing = ObjectWithBehaviors() 460 461 when(thing).test_request(anything()).then().increment_count() 462 463 thing.behaviors.test_request_behavior.run("A") 464 465 assertThat(thing.count).isEqualTo(1) 466 assertThat(thing.captured).isEqualTo(["A"]) 467 468 def test_fluent_behavior__then_single__captures_one(self): 469 thing = ObjectWithBehaviors() 470 471 thing.behaviors.test_request_behavior.set_default_to_ignore() 472 473 when(thing).test_request(anything()).then().increment_count() 474 475 thing.behaviors.test_request_behavior.run("A") 476 thing.behaviors.test_request_behavior.run("A") 477 thing.behaviors.test_request_behavior.run("A") 478 479 assertThat(thing.count).isEqualTo(1) 480 assertThat(thing.captured).isEqualTo(["A"]) 481 482 def test_fluent_behavior__then_times__captures_all(self): 483 thing = ObjectWithBehaviors() 484 485 when(thing).test_request(anything()).then(times=3).increment_count() 486 487 thing.behaviors.test_request_behavior.run("A") 488 thing.behaviors.test_request_behavior.run("B") 489 thing.behaviors.test_request_behavior.run("C") 490 491 assertThat(thing.count).isEqualTo(3) 492 assertThat(thing.captured).isEqualTo(["A", "B", "C"]) 493 494 def test_fluent_behavior__always__captures_all(self): 495 thing = ObjectWithBehaviors() 496 497 when(thing).test_request(anything()).always().increment_count() 498 499 thing.behaviors.test_request_behavior.run("A") 500 thing.behaviors.test_request_behavior.run("B") 501 thing.behaviors.test_request_behavior.run("C") 502 503 assertThat(thing.count).isEqualTo(3) 504 assertThat(thing.captured).isEqualTo(["A", "B", "C"]) 505 506 def test_fluent_behavior__matcher__captures_relevant(self): 507 thing = ObjectWithBehaviors() 508 thing.behaviors.test_request_behavior.set_default_to_ignore() 509 510 when(thing).test_request(lambda obj: obj == "B").always().increment_count() 511 512 thing.behaviors.test_request_behavior.run("A") 513 thing.behaviors.test_request_behavior.run("B") 514 thing.behaviors.test_request_behavior.run("C") 515 516 assertThat(thing.count).isEqualTo(1) 517 assertThat(thing.captured).isEqualTo(["B"]) 518 519 def test_fluent_behavior__then_repeated__captures_relevant(self): 520 thing = ObjectWithBehaviors() 521 thing.behaviors.test_request_behavior.set_default_to_ignore() 522 523 when(thing).test_request(anything()).then().increment_count().increment_count() 524 525 thing.behaviors.test_request_behavior.run("A") 526 thing.behaviors.test_request_behavior.run("B") 527 thing.behaviors.test_request_behavior.run("A") 528 529 assertThat(thing.count).isEqualTo(2) 530 assertThat(thing.captured).isEqualTo(["A", "B"]) 531 532 def test_fluent_behavior__fallback__captures_relevant(self): 533 thing = ObjectWithBehaviors() 534 thing.behaviors.test_request_behavior.set_default_to_ignore() 535 536 when(thing).test_request(lambda obj: obj == "B").then(times=1).increment_count() 537 when(thing).test_request(lambda obj: obj == "C").always().increment_count() 538 539 thing.behaviors.test_request_behavior.run("A") 540 thing.behaviors.test_request_behavior.run("B") 541 thing.behaviors.test_request_behavior.run("A") 542 thing.behaviors.test_request_behavior.run("C") 543 thing.behaviors.test_request_behavior.run("B") 544 thing.behaviors.test_request_behavior.run("C") 545 546 assertThat(thing.count).isEqualTo(3) 547 assertThat(thing.captured).isEqualTo(["B", "C", "C"]) 548 549 def test_fluent_behavior__default_unhandled_crash(self): 550 thing = ObjectWithBehaviors() 551 552 when(thing).test_request(anything()).then().increment_count() 553 554 thing.behaviors.test_request_behavior.run("A") 555 try: 556 thing.behaviors.test_request_behavior.run("A") 557 except Exception as e: 558 logging.debug(e) 559 return True # Failed as expected 560 return False 561 562 def test_fluent_behavior__set_default_works(self): 563 thing = ObjectWithBehaviors() 564 thing.behaviors.test_request_behavior.set_default(lambda obj: thing.increment_unhandled()) 565 566 when(thing).test_request(anything()).then().increment_count() 567 568 thing.behaviors.test_request_behavior.run("A") 569 thing.behaviors.test_request_behavior.run("A") 570 assertThat(thing.unhandled_count).isEqualTo(1) 571 572 def test_fluent_behavior__wait_until_done(self): 573 thing = ObjectWithBehaviors() 574 is_a = lambda obj: obj == "A" 575 when(thing).test_request(is_a).then().increment_count() 576 577 closure = lambda: thing.behaviors.test_request_behavior.run("A") 578 t = Timer(0.5, closure) 579 t.start() 580 581 wait_until(thing).test_request(is_a).times(1) 582 assertThat(thing.count).isEqualTo(1) 583 assertThat(thing.captured).isEqualTo(["A"]) 584 585 def test_fluent_behavior__wait_until_done_different_lambda(self): 586 thing = ObjectWithBehaviors() 587 when(thing).test_request(lambda obj: obj == "A").then().increment_count() 588 589 closure = lambda: thing.behaviors.test_request_behavior.run("A") 590 t = Timer(0.5, closure) 591 t.start() 592 593 wait_until(thing).test_request(lambda obj: obj == "A").times(1) 594 assertThat(thing.count).isEqualTo(1) 595 assertThat(thing.captured).isEqualTo(["A"]) 596 597 def test_fluent_behavior__wait_until_done_anything(self): 598 thing = ObjectWithBehaviors() 599 when(thing).test_request(lambda obj: obj == "A").then().increment_count() 600 601 closure = lambda: thing.behaviors.test_request_behavior.run("A") 602 t = Timer(0.5, closure) 603 t.start() 604 605 wait_until(thing).test_request(anything()).times(1) 606 assertThat(thing.count).isEqualTo(1) 607 assertThat(thing.captured).isEqualTo(["A"]) 608 609 def test_fluent_behavior__wait_until_done_not_happened(self): 610 thing = ObjectWithBehaviors() 611 thing.behaviors.test_request_behavior.set_default_to_ignore() 612 when(thing).test_request(lambda obj: obj == "A").then().increment_count() 613 614 closure = lambda: thing.behaviors.test_request_behavior.run("B") 615 t = Timer(0.5, closure) 616 t.start() 617 assertThat(wait_until(thing).test_request(lambda obj: obj == "A").times(1)).isFalse() 618 619 def test_fluent_behavior__wait_until_done_with_default(self): 620 thing = ObjectWithBehaviors() 621 thing.behaviors.test_request_behavior.set_default(lambda obj: thing.increment_unhandled()) 622 623 closure = lambda: thing.behaviors.test_request_behavior.run("A") 624 t = Timer(0.5, closure) 625 t.start() 626 627 wait_until(thing).test_request(anything()).times(1) 628 assertThat(thing.unhandled_count).isEqualTo(1) 629 630 def test_fluent_behavior__wait_until_done_two_events_AA(self): 631 thing = ObjectWithBehaviors() 632 when(thing).test_request(lambda obj: obj == "A").then().increment_count().increment_count() 633 634 closure1 = lambda: thing.behaviors.test_request_behavior.run("A") 635 t1 = Timer(0.5, closure1) 636 t1.start() 637 closure2 = lambda: thing.behaviors.test_request_behavior.run("A") 638 t2 = Timer(0.5, closure2) 639 t2.start() 640 641 wait_until(thing).test_request(lambda obj: obj == "A").times(2) 642 assertThat(thing.count).isEqualTo(2) 643 assertThat(thing.captured).isEqualTo(["A", "A"]) 644 645 def test_fluent_behavior__wait_until_done_two_events_AB(self): 646 thing = ObjectWithBehaviors() 647 when(thing).test_request(anything()).always().increment_count() 648 649 closure1 = lambda: thing.behaviors.test_request_behavior.run("A") 650 t1 = Timer(0.5, closure1) 651 t1.start() 652 closure2 = lambda: thing.behaviors.test_request_behavior.run("B") 653 t2 = Timer(1, closure2) 654 t2.start() 655 656 wait_until(thing).test_request(anything()).times(2) 657 assertThat(thing.count).isEqualTo(2) 658 assertThat(thing.captured).isEqualTo(["A", "B"]) 659 660 def test_fluent_behavior__wait_until_done_only_one_event_is_done(self): 661 thing = ObjectWithBehaviors() 662 when(thing).test_request(anything()).always().increment_count() 663 664 closure1 = lambda: thing.behaviors.test_request_behavior.run("A") 665 t1 = Timer(1, closure1) 666 t1.start() 667 closure2 = lambda: thing.behaviors.test_request_behavior.run("B") 668 t2 = Timer(3, closure2) 669 t2.start() 670 assertThat(wait_until(thing).test_request(lambda obj: obj == "A").times(2)).isFalse() 671