1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""Stream music through connected device from phone test implementation."""
17import acts
18import os
19import shutil
20import time
21
22import acts.test_utils.coex.audio_test_utils as atu
23import acts.test_utils.bt.bt_test_utils as btutils
24from acts import asserts
25from acts.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory
26from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
27
28PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music'
29INIT_ATTEN = 0
30WAIT_TIME = 1
31
32
33class A2dpBaseTest(BluetoothBaseTest):
34    """Stream audio file over desired Bluetooth codec configurations.
35
36    Audio file should be a sine wave. Other audio files will not work for the
37    test analysis metrics.
38
39    Device under test is Android phone, connected to headset with a controller
40    that can generate a BluetoothHandsfreeAbstractDevice from test_utils.
41    abstract_devices.bluetooth_handsfree_abstract_device.
42    BuetoothHandsfreeAbstractDeviceFactory.
43    """
44    def setup_class(self):
45
46        super().setup_class()
47        self.dut = self.android_devices[0]
48        req_params = ['audio_params', 'music_files']
49        #'audio_params' is a dict, contains the audio device type, audio streaming
50        #settings such as volumn, duration, audio recording parameters such as
51        #channel, sampling rate/width, and thdn parameters for audio processing
52        self.unpack_userparams(req_params)
53        # Find music file and push it to the dut
54        music_src = self.music_files[0]
55        music_dest = PHONE_MUSIC_FILE_DIRECTORY
56        success = self.dut.push_system_file(music_src, music_dest)
57        if success:
58            self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY,
59                                           os.path.basename(music_src))
60        # Initialize media_control class
61        self.media = btutils.MediaControlOverSl4a(self.dut, self.music_file)
62        # Set attenuator to minimum attenuation
63        if hasattr(self, 'attenuators'):
64            self.attenuator = self.attenuators[0]
65            self.attenuator.set_atten(INIT_ATTEN)
66        # Create the BTOE(Bluetooth-Other-End) device object
67        bt_devices = self.user_params.get('bt_devices', [])
68        if bt_devices:
69            attr, idx = bt_devices.split(':')
70            self.bt_device_controller = getattr(self, attr)[int(idx)]
71            self.bt_device = bt_factory().generate(self.bt_device_controller)
72        else:
73            self.log.error('No BT devices config is provided!')
74
75    def teardown_class(self):
76
77        super().teardown_class()
78        if hasattr(self, 'media'):
79            self.media.stop()
80        if hasattr(self, 'attenuator'):
81            self.attenuator.set_atten(INIT_ATTEN)
82        self.dut.droid.bluetoothFactoryReset()
83        self.bt_device.reset()
84        self.bt_device.power_off()
85        btutils.disable_bluetooth(self.dut.droid)
86
87    def setup_test(self):
88
89        super().setup_test()
90        # Initialize audio capture devices
91        self.audio_device = atu.get_audio_capture_device(
92            self.bt_device_controller, self.audio_params)
93        # Reset BT to factory defaults
94        self.dut.droid.bluetoothFactoryReset()
95        self.bt_device.reset()
96        self.bt_device.power_on()
97        btutils.enable_bluetooth(self.dut.droid, self.dut.ed)
98        btutils.connect_phone_to_headset(self.dut, self.bt_device, 60)
99        vol = self.dut.droid.getMaxMediaVolume() * self.audio_params['volume']
100        self.dut.droid.setMediaVolume(0)
101        time.sleep(1)
102        self.dut.droid.setMediaVolume(int(vol))
103
104    def teardown_test(self):
105
106        super().teardown_test()
107        self.dut.droid.bluetoothFactoryReset()
108        self.media.stop()
109        # Set Attenuator to the initial attenuation
110        if hasattr(self, 'attenuator'):
111            self.attenuator.set_atten(INIT_ATTEN)
112        self.bt_device.reset()
113        self.bt_device.power_off()
114        btutils.disable_bluetooth(self.dut.droid)
115
116    def play_and_record_audio(self, duration):
117        """Play and record audio for a set duration.
118
119        Args:
120            duration: duration in seconds for music playing
121        Returns:
122            audio_captured: captured audio file path
123        """
124
125        self.log.info('Play and record audio for {} second'.format(duration))
126        self.media.play()
127        self.audio_device.start()
128        time.sleep(duration + WAIT_TIME)
129        audio_captured = self.audio_device.stop()
130        self.media.stop()
131        self.log.info('Audio play and record stopped')
132        asserts.assert_true(audio_captured, 'Audio not recorded')
133        return audio_captured
134
135    def _get_bt_link_metrics(self):
136        """Get bt link metrics such as rssi and tx pwls.
137
138        Returns:
139            rssi_master: master rssi
140            pwl_master: master tx pwl
141            rssi_slave: slave rssi
142        """
143
144        self.media.play()
145        # Get master rssi and power level
146        rssi_master = btutils.get_bt_metric(self.dut)['rssi']
147        pwl_master = btutils.get_bt_metric(self.dut)['pwlv']
148        # Get slave rssi if possible
149        if isinstance(self.bt_device_controller,
150                      acts.controllers.android_device.AndroidDevice):
151            rssi_slave = btutils.get_bt_rssi(self.bt_device_controller)
152        else:
153            rssi_slave = None
154        self.media.stop()
155        return [rssi_master, pwl_master, rssi_slave]
156
157    def run_thdn_analysis(self, audio_captured, tag):
158        """Calculate Total Harmonic Distortion plus Noise for latest recording.
159
160        Store result in self.metrics.
161
162        Args:
163            audio_captured: the captured audio file
164        Returns:
165            thdn: thdn value in a list
166        """
167        # Calculate Total Harmonic Distortion + Noise
168        audio_result = atu.AudioCaptureResult(audio_captured,
169                                              self.audio_params)
170        thdn = audio_result.THDN(**self.audio_params['thdn_params'])
171        file_name = tag + os.path.basename(audio_result.path)
172        file_new = os.path.join(os.path.dirname(audio_result.path), file_name)
173        shutil.copyfile(audio_result.path, file_new)
174        for ch_no, t in enumerate(thdn):
175            self.log.info('THD+N for channel %s: %.4f%%' % (ch_no, t * 100))
176        return thdn
177
178    def run_anomaly_detection(self, audio_captured):
179        """Detect anomalies in latest recording.
180
181        Store result in self.metrics.
182
183        Args:
184            audio_captured: the captured audio file
185        Returns:
186            anom: anom detected in the captured file
187        """
188        # Detect Anomalies
189        audio_result = atu.AudioCaptureResult(audio_captured)
190        anom = audio_result.detect_anomalies(
191            **self.audio_params['anomaly_params'])
192        num_anom = 0
193        for ch_no, anomalies in enumerate(anom):
194            if anomalies:
195                for anomaly in anomalies:
196                    num_anom += 1
197                    start, end = anomaly
198                    self.log.warning(
199                        'Anomaly on channel {} at {}:{}. Duration '
200                        '{} sec'.format(ch_no, start // 60, start % 60,
201                                        end - start))
202        else:
203            self.log.info('%i anomalies detected.' % num_anom)
204        return anom
205