1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from acts.controllers.fuchsia_lib.base_lib import BaseLib 18 19 20class FuchsiaAvdtpLib(BaseLib): 21 def __init__(self, addr, tc, client_id): 22 self.address = addr 23 self.test_counter = tc 24 self.client_id = client_id 25 26 def init(self, role): 27 """Initializes the ProfileServerFacade's proxy object. 28 29 No operations for SDP can be performed until this is initialized. 30 31 Returns: 32 Dictionary, None if success, error if error. 33 """ 34 test_cmd = "avdtp_facade.AvdtpInit" 35 36 test_args = {"role": role} 37 test_id = self.build_id(self.test_counter) 38 self.test_counter += 1 39 40 return self.send_command(test_id, test_cmd, test_args) 41 42 def getConnectedPeers(self): 43 """Gets the AVDTP connected peers. 44 45 Returns: 46 Dictionary, None if success, error if error. 47 """ 48 test_cmd = "avdtp_facade.AvdtpGetConnectedPeers" 49 test_args = {} 50 test_id = self.build_id(self.test_counter) 51 self.test_counter += 1 52 53 return self.send_command(test_id, test_cmd, test_args) 54 55 def setConfiguration(self, peer_id): 56 """Sends the AVDTP command to input peer_id: set configuration 57 58 Args: 59 peer_id: The peer id to send the AVDTP command to. 60 61 Returns: 62 Dictionary, None if success, error if error. 63 """ 64 test_cmd = "avdtp_facade.AvdtpSetConfiguration" 65 test_args = {"identifier": peer_id} 66 test_id = self.build_id(self.test_counter) 67 self.test_counter += 1 68 69 return self.send_command(test_id, test_cmd, test_args) 70 71 def getConfiguration(self, peer_id): 72 """Sends the AVDTP command to input peer_id: get configuration 73 74 Args: 75 peer_id: The peer id to send the AVDTP command to. 76 77 Returns: 78 Dictionary, None if success, error if error. 79 """ 80 test_cmd = "avdtp_facade.AvdtpGetConfiguration" 81 test_args = {"identifier": peer_id} 82 test_id = self.build_id(self.test_counter) 83 self.test_counter += 1 84 85 return self.send_command(test_id, test_cmd, test_args) 86 87 def getCapabilities(self, peer_id): 88 """Sends the AVDTP command to input peer_id: get capabilities 89 90 Args: 91 peer_id: The peer id to send the AVDTP command to. 92 93 Returns: 94 Dictionary, None if success, error if error. 95 """ 96 test_cmd = "avdtp_facade.AvdtpGetCapabilities" 97 test_args = {"identifier": peer_id} 98 test_id = self.build_id(self.test_counter) 99 self.test_counter += 1 100 101 return self.send_command(test_id, test_cmd, test_args) 102 103 def getAllCapabilities(self, peer_id): 104 """Sends the AVDTP command to input peer_id: get all capabilities 105 106 Args: 107 peer_id: The peer id to send the AVDTP command to. 108 109 Returns: 110 Dictionary, None if success, error if error. 111 """ 112 test_cmd = "avdtp_facade.AvdtpGetAllCapabilities" 113 test_args = {"identifier": peer_id} 114 test_id = self.build_id(self.test_counter) 115 self.test_counter += 1 116 117 return self.send_command(test_id, test_cmd, test_args) 118 119 def reconfigureStream(self, peer_id): 120 """Sends the AVDTP command to input peer_id: reconfigure stream 121 122 Args: 123 peer_id: The peer id to send the AVDTP command to. 124 125 Returns: 126 Dictionary, None if success, error if error. 127 """ 128 test_cmd = "avdtp_facade.AvdtpReconfigureStream" 129 test_args = {"identifier": peer_id} 130 test_id = self.build_id(self.test_counter) 131 self.test_counter += 1 132 133 return self.send_command(test_id, test_cmd, test_args) 134 135 def suspendStream(self, peer_id): 136 """Sends the AVDTP command to input peer_id: suspend stream 137 Args: 138 peer_id: The peer id to send the AVDTP command to. 139 140 Returns: 141 Dictionary, None if success, error if error. 142 """ 143 test_cmd = "avdtp_facade.AvdtpSuspendStream" 144 test_args = {"identifier": peer_id} 145 test_id = self.build_id(self.test_counter) 146 self.test_counter += 1 147 148 return self.send_command(test_id, test_cmd, test_args) 149 150 def suspendAndReconfigure(self, peer_id): 151 """Sends the AVDTP command to input peer_id: suspend and reconfigure 152 153 Args: 154 peer_id: The peer id to send the AVDTP command to. 155 156 Returns: 157 Dictionary, None if success, error if error. 158 """ 159 test_cmd = "avdtp_facade.AvdtpSuspendAndReconfigure" 160 test_args = {"identifier": peer_id} 161 test_id = self.build_id(self.test_counter) 162 self.test_counter += 1 163 164 return self.send_command(test_id, test_cmd, test_args) 165 166 def releaseStream(self, peer_id): 167 """Sends the AVDTP command to input peer_id: release stream 168 169 Args: 170 peer_id: The peer id to send the AVDTP command to. 171 172 Returns: 173 Dictionary, None if success, error if error. 174 """ 175 test_cmd = "avdtp_facade.AvdtpReleaseStream" 176 test_args = {"identifier": peer_id} 177 test_id = self.build_id(self.test_counter) 178 self.test_counter += 1 179 180 return self.send_command(test_id, test_cmd, test_args) 181 182 def establishStream(self, peer_id): 183 """Sends the AVDTP command to input peer_id: establish stream 184 185 Args: 186 peer_id: The peer id to send the AVDTP command to. 187 188 Returns: 189 Dictionary, None if success, error if error. 190 """ 191 test_cmd = "avdtp_facade.AvdtpEstablishStream" 192 test_args = {"identifier": peer_id} 193 test_id = self.build_id(self.test_counter) 194 self.test_counter += 1 195 196 return self.send_command(test_id, test_cmd, test_args) 197 198 def startStream(self, peer_id): 199 """Sends the AVDTP command to input peer_id: start stream 200 201 Args: 202 peer_id: The peer id to send the AVDTP command to. 203 204 Returns: 205 Dictionary, None if success, error if error. 206 """ 207 test_cmd = "avdtp_facade.AvdtpStartStream" 208 test_args = {"identifier": peer_id} 209 test_id = self.build_id(self.test_counter) 210 self.test_counter += 1 211 212 return self.send_command(test_id, test_cmd, test_args) 213 214 def abortStream(self, peer_id): 215 """Sends the AVDTP command to input peer_id: abort stream 216 217 Args: 218 peer_id: The peer id to send the AVDTP command to. 219 220 Returns: 221 Dictionary, None if success, error if error. 222 """ 223 test_cmd = "avdtp_facade.AvdtpAbortStream" 224 test_args = {"identifier": peer_id} 225 test_id = self.build_id(self.test_counter) 226 self.test_counter += 1 227 228 return self.send_command(test_id, test_cmd, test_args) 229 230 def establishStream(self, peer_id): 231 """Sends the AVDTP command to input peer_id: establish stream 232 233 Args: 234 peer_id: The peer id to send the AVDTP command to. 235 236 Returns: 237 Dictionary, None if success, error if error. 238 """ 239 test_cmd = "avdtp_facade.AvdtpEstablishStream" 240 test_args = {"identifier": peer_id} 241 test_id = self.build_id(self.test_counter) 242 self.test_counter += 1 243 244 return self.send_command(test_id, test_cmd, test_args) 245 246 def removeService(self): 247 """Removes the AVDTP service from the Fuchsia device 248 249 Returns: 250 Dictionary, None if success, error if error. 251 """ 252 test_cmd = "avdtp_facade.AvdtpRemoveService" 253 test_args = {} 254 test_id = self.build_id(self.test_counter) 255 self.test_counter += 1 256 257 return self.send_command(test_id, test_cmd, test_args) 258