1 /* 2 * Copyright (C) 2020 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.car.connecteddevice.ble; 18 19 import static com.android.car.connecteddevice.BleStreamProtos.BleOperationProto.OperationType; 20 import static com.android.car.connecteddevice.BleStreamProtos.BlePacketProto.BlePacket; 21 import static com.android.car.connecteddevice.BleStreamProtos.VersionExchangeProto.BleVersionExchange; 22 import static com.android.car.connecteddevice.util.SafeLog.logd; 23 import static com.android.car.connecteddevice.util.SafeLog.loge; 24 import static com.android.car.connecteddevice.util.SafeLog.logw; 25 26 import android.annotation.NonNull; 27 import android.annotation.Nullable; 28 import android.bluetooth.BluetoothDevice; 29 import android.bluetooth.BluetoothGattCharacteristic; 30 import android.os.Handler; 31 import android.os.Looper; 32 33 import com.android.car.connecteddevice.BleStreamProtos.BleDeviceMessageProto.BleDeviceMessage; 34 import com.android.car.connecteddevice.util.ByteUtils; 35 import com.android.car.protobuf.ByteString; 36 import com.android.car.protobuf.InvalidProtocolBufferException; 37 import com.android.internal.annotations.VisibleForTesting; 38 39 import java.io.ByteArrayOutputStream; 40 import java.io.IOException; 41 import java.util.ArrayDeque; 42 import java.util.HashMap; 43 import java.util.List; 44 import java.util.UUID; 45 import java.util.concurrent.atomic.AtomicBoolean; 46 import java.util.concurrent.atomic.AtomicInteger; 47 import java.util.concurrent.atomic.AtomicLong; 48 49 /** BLE message stream to a device. */ 50 class BleDeviceMessageStream { 51 52 private static final String TAG = "BleDeviceMessageStream"; 53 54 // Only version 2 of the messaging and version 1 of the security supported. 55 private static final int MESSAGING_VERSION = 2; 56 private static final int SECURITY_VERSION = 1; 57 58 /* 59 * During bandwidth testing, it was discovered that allowing the stream to send as fast as it 60 * can blocked outgoing notifications from being received by the connected device. Adding a 61 * throttle to the outgoing messages alleviated this block and allowed both sides to 62 * send/receive in parallel successfully. 63 */ 64 private static final long THROTTLE_DEFAULT_MS = 10L; 65 private static final long THROTTLE_WAIT_MS = 75L; 66 67 private final ArrayDeque<BlePacket> mPacketQueue = new ArrayDeque<>(); 68 69 private final HashMap<Integer, ByteArrayOutputStream> mPendingData = 70 new HashMap<>(); 71 72 private final MessageIdGenerator mMessageIdGenerator = new MessageIdGenerator(); 73 74 private final Handler mHandler = new Handler(Looper.getMainLooper()); 75 76 private final AtomicBoolean mIsVersionExchanged = new AtomicBoolean(false); 77 78 private final AtomicBoolean mIsSendingInProgress = new AtomicBoolean(false); 79 80 private final AtomicLong mThrottleDelay = new AtomicLong(THROTTLE_DEFAULT_MS); 81 82 private final BlePeripheralManager mBlePeripheralManager; 83 84 private final BluetoothDevice mDevice; 85 86 private final BluetoothGattCharacteristic mWriteCharacteristic; 87 88 private final BluetoothGattCharacteristic mReadCharacteristic; 89 90 private MessageReceivedListener mMessageReceivedListener; 91 92 private MessageReceivedErrorListener mMessageReceivedErrorListener; 93 94 /* 95 * This initial value is 20 because BLE has a default write of 23 bytes. However, 3 bytes are 96 * subtracted due to bytes being reserved for the command type and attribute ID. 97 */ 98 private int mMaxWriteSize = 20; 99 BleDeviceMessageStream(@onNull BlePeripheralManager blePeripheralManager, @NonNull BluetoothDevice device, @NonNull BluetoothGattCharacteristic writeCharacteristic, @NonNull BluetoothGattCharacteristic readCharacteristic)100 BleDeviceMessageStream(@NonNull BlePeripheralManager blePeripheralManager, 101 @NonNull BluetoothDevice device, 102 @NonNull BluetoothGattCharacteristic writeCharacteristic, 103 @NonNull BluetoothGattCharacteristic readCharacteristic) { 104 mBlePeripheralManager = blePeripheralManager; 105 mDevice = device; 106 mWriteCharacteristic = writeCharacteristic; 107 mReadCharacteristic = readCharacteristic; 108 mBlePeripheralManager.addOnCharacteristicWriteListener(this::onCharacteristicWrite); 109 mBlePeripheralManager.addOnCharacteristicReadListener(this::onCharacteristicRead); 110 } 111 112 /** 113 * Writes the given message to the write characteristic of this stream with operation type 114 * {@code CLIENT_MESSAGE}. 115 * 116 * This method will handle the chunking of messages based on the max write size. 117 * 118 * @param deviceMessage The data object contains recipient, isPayloadEncrypted and message. 119 */ writeMessage(@onNull DeviceMessage deviceMessage)120 void writeMessage(@NonNull DeviceMessage deviceMessage) { 121 writeMessage(deviceMessage, OperationType.CLIENT_MESSAGE); 122 } 123 124 /** 125 * Writes the given message to the write characteristic of this stream. 126 * 127 * This method will handle the chunking of messages based on the max write size. If it is 128 * a handshake message, the message recipient should be {@code null} and it cannot be 129 * encrypted. 130 * 131 * @param deviceMessage The data object contains recipient, isPayloadEncrypted and message. 132 * @param operationType The {@link OperationType} of this message. 133 */ writeMessage(@onNull DeviceMessage deviceMessage, OperationType operationType)134 void writeMessage(@NonNull DeviceMessage deviceMessage, OperationType operationType) { 135 logd(TAG, "Writing message to device: " + mDevice.getAddress() + "."); 136 BleDeviceMessage.Builder builder = BleDeviceMessage.newBuilder() 137 .setOperation(operationType) 138 .setIsPayloadEncrypted(deviceMessage.isMessageEncrypted()) 139 .setPayload(ByteString.copyFrom(deviceMessage.getMessage())); 140 141 UUID recipient = deviceMessage.getRecipient(); 142 if (recipient != null) { 143 builder.setRecipient(ByteString.copyFrom(ByteUtils.uuidToBytes(recipient))); 144 } 145 146 BleDeviceMessage bleDeviceMessage = builder.build(); 147 byte[] rawBytes = bleDeviceMessage.toByteArray(); 148 List<BlePacket> blePackets; 149 try { 150 blePackets = BlePacketFactory.makeBlePackets(rawBytes, mMessageIdGenerator.next(), 151 mMaxWriteSize); 152 } catch (BlePacketFactoryException e) { 153 loge(TAG, "Error while creating message packets.", e); 154 return; 155 } 156 mPacketQueue.addAll(blePackets); 157 writeNextMessageInQueue(); 158 } 159 writeNextMessageInQueue()160 private void writeNextMessageInQueue() { 161 mHandler.postDelayed(() -> { 162 if (mPacketQueue.isEmpty()) { 163 logd(TAG, "No more packets to send."); 164 return; 165 } 166 if (mIsSendingInProgress.get()) { 167 logd(TAG, "Unable to send packet at this time."); 168 return; 169 } 170 171 mIsSendingInProgress.set(true); 172 BlePacket packet = mPacketQueue.remove(); 173 logd(TAG, "Writing packet " + packet.getPacketNumber() + " of " 174 + packet.getTotalPackets() + " for " + packet.getMessageId() + "."); 175 mWriteCharacteristic.setValue(packet.toByteArray()); 176 mBlePeripheralManager.notifyCharacteristicChanged(mDevice, mWriteCharacteristic, 177 /* confirm = */ false); 178 }, mThrottleDelay.get()); 179 } 180 onCharacteristicRead(@onNull BluetoothDevice device)181 private void onCharacteristicRead(@NonNull BluetoothDevice device) { 182 if (!mDevice.equals(device)) { 183 logw(TAG, "Received a read notification from a device (" + device.getAddress() 184 + ") that is not the expected device (" + mDevice.getAddress() + ") registered " 185 + "to this stream. Ignoring."); 186 return; 187 } 188 189 logd(TAG, "Releasing lock on characteristic."); 190 mIsSendingInProgress.set(false); 191 writeNextMessageInQueue(); 192 } 193 onCharacteristicWrite(@onNull BluetoothDevice device, @NonNull BluetoothGattCharacteristic characteristic, @NonNull byte[] value)194 private void onCharacteristicWrite(@NonNull BluetoothDevice device, 195 @NonNull BluetoothGattCharacteristic characteristic, @NonNull byte[] value) { 196 logd(TAG, "Received a message from a device (" + device.getAddress() + ")."); 197 if (!mDevice.equals(device)) { 198 logw(TAG, "Received a message from a device (" + device.getAddress() + ") that is not " 199 + "the expected device (" + mDevice.getAddress() + ") registered to this " 200 + "stream. Ignoring."); 201 return; 202 } 203 204 if (!characteristic.getUuid().equals(mReadCharacteristic.getUuid())) { 205 logw(TAG, "Received a write to a characteristic (" + characteristic.getUuid() + ") that" 206 + " is not the expected UUID (" + mReadCharacteristic.getUuid() + "). " 207 + "Ignoring."); 208 return; 209 } 210 211 if (!mIsVersionExchanged.get()) { 212 processVersionExchange(device, value); 213 return; 214 } 215 216 BlePacket packet; 217 try { 218 packet = BlePacket.parseFrom(value); 219 } catch (InvalidProtocolBufferException e) { 220 loge(TAG, "Can not parse Ble packet from client.", e); 221 if (mMessageReceivedErrorListener != null) { 222 mMessageReceivedErrorListener.onMessageReceivedError(e); 223 } 224 return; 225 } 226 processPacket(packet); 227 } 228 processVersionExchange(@onNull BluetoothDevice device, @NonNull byte[] value)229 private void processVersionExchange(@NonNull BluetoothDevice device, @NonNull byte[] value) { 230 BleVersionExchange versionExchange; 231 try { 232 versionExchange = BleVersionExchange.parseFrom(value); 233 } catch (InvalidProtocolBufferException e) { 234 loge(TAG, "Could not parse version exchange message", e); 235 if (mMessageReceivedErrorListener != null) { 236 mMessageReceivedErrorListener.onMessageReceivedError(e); 237 } 238 return; 239 } 240 int minMessagingVersion = versionExchange.getMinSupportedMessagingVersion(); 241 int maxMessagingVersion = versionExchange.getMaxSupportedMessagingVersion(); 242 int minSecurityVersion = versionExchange.getMinSupportedSecurityVersion(); 243 int maxSecurityVersion = versionExchange.getMaxSupportedSecurityVersion(); 244 if (minMessagingVersion > MESSAGING_VERSION || maxMessagingVersion < MESSAGING_VERSION 245 || minSecurityVersion > SECURITY_VERSION || maxSecurityVersion < SECURITY_VERSION) { 246 loge(TAG, "Unsupported message version for min " + minMessagingVersion + " and max " 247 + maxMessagingVersion + " or security version for " + minSecurityVersion 248 + " and max " + maxSecurityVersion + "."); 249 if (mMessageReceivedErrorListener != null) { 250 mMessageReceivedErrorListener.onMessageReceivedError( 251 new IllegalStateException("Unsupported version.")); 252 } 253 return; 254 } 255 256 BleVersionExchange headunitVersion = BleVersionExchange.newBuilder() 257 .setMinSupportedMessagingVersion(MESSAGING_VERSION) 258 .setMaxSupportedMessagingVersion(MESSAGING_VERSION) 259 .setMinSupportedSecurityVersion(SECURITY_VERSION) 260 .setMaxSupportedSecurityVersion(SECURITY_VERSION) 261 .build(); 262 mWriteCharacteristic.setValue(headunitVersion.toByteArray()); 263 mBlePeripheralManager.notifyCharacteristicChanged(device, mWriteCharacteristic, 264 /* confirm = */ false); 265 mIsVersionExchanged.set(true); 266 logd(TAG, "Sent supported version to the phone."); 267 } 268 269 @VisibleForTesting processPacket(@onNull BlePacket packet)270 void processPacket(@NonNull BlePacket packet) { 271 // Messages are coming in. Need to throttle outgoing messages to allow outgoing 272 // notifications to make it to the device. 273 mThrottleDelay.set(THROTTLE_WAIT_MS); 274 275 int messageId = packet.getMessageId(); 276 ByteArrayOutputStream currentPayloadStream = 277 mPendingData.getOrDefault(messageId, new ByteArrayOutputStream()); 278 mPendingData.putIfAbsent(messageId, currentPayloadStream); 279 280 byte[] payload = packet.getPayload().toByteArray(); 281 try { 282 currentPayloadStream.write(payload); 283 } catch (IOException e) { 284 loge(TAG, "Error writing packet to stream.", e); 285 if (mMessageReceivedErrorListener != null) { 286 mMessageReceivedErrorListener.onMessageReceivedError(e); 287 } 288 return; 289 } 290 logd(TAG, "Parsed packet " + packet.getPacketNumber() + " of " 291 + packet.getTotalPackets() + " for message " + messageId + ". Writing " 292 + payload.length + "."); 293 294 if (packet.getPacketNumber() != packet.getTotalPackets()) { 295 return; 296 } 297 298 byte[] messageBytes = currentPayloadStream.toByteArray(); 299 mPendingData.remove(messageId); 300 301 // All message packets received. Resetting throttle back to default until next message 302 // started. 303 mThrottleDelay.set(THROTTLE_DEFAULT_MS); 304 305 logd(TAG, "Received complete device message " + messageId + " of " + messageBytes.length 306 + " bytes."); 307 BleDeviceMessage message; 308 try { 309 message = BleDeviceMessage.parseFrom(messageBytes); 310 } catch (InvalidProtocolBufferException e) { 311 loge(TAG, "Cannot parse device message from client.", e); 312 if (mMessageReceivedErrorListener != null) { 313 mMessageReceivedErrorListener.onMessageReceivedError(e); 314 } 315 return; 316 } 317 318 DeviceMessage deviceMessage = new DeviceMessage( 319 ByteUtils.bytesToUUID(message.getRecipient().toByteArray()), 320 message.getIsPayloadEncrypted(), message.getPayload().toByteArray()); 321 if (mMessageReceivedListener != null) { 322 mMessageReceivedListener.onMessageReceived(deviceMessage, message.getOperation()); 323 } 324 } 325 326 /** The maximum amount of bytes that can be written over BLE. */ setMaxWriteSize(int maxWriteSize)327 void setMaxWriteSize(int maxWriteSize) { 328 mMaxWriteSize = maxWriteSize; 329 } 330 331 /** 332 * Set the given listener to be notified when a new message was received from the 333 * client. If listener is {@code null}, clear. 334 */ setMessageReceivedListener(@ullable MessageReceivedListener listener)335 void setMessageReceivedListener(@Nullable MessageReceivedListener listener) { 336 mMessageReceivedListener = listener; 337 } 338 339 /** 340 * Set the given listener to be notified when there was an error during receiving 341 * message from the client. If listener is {@code null}, clear. 342 */ setMessageReceivedErrorListener( @ullable MessageReceivedErrorListener listener)343 void setMessageReceivedErrorListener( 344 @Nullable MessageReceivedErrorListener listener) { 345 mMessageReceivedErrorListener = listener; 346 } 347 348 /** 349 * Listener to be invoked when a complete message is received from the client. 350 */ 351 interface MessageReceivedListener { 352 353 /** 354 * Called when a complete message is received from the client. 355 * 356 * @param deviceMessage The message received from the client. 357 * @param operationType The {@link OperationType} of the received message. 358 */ onMessageReceived(@onNull DeviceMessage deviceMessage, OperationType operationType)359 void onMessageReceived(@NonNull DeviceMessage deviceMessage, OperationType operationType); 360 } 361 362 /** 363 * Listener to be invoked when there was an error during receiving message from the client. 364 */ 365 interface MessageReceivedErrorListener { 366 /** 367 * Called when there was an error during receiving message from the client. 368 * 369 * @param exception The error. 370 */ onMessageReceivedError(@onNull Exception exception)371 void onMessageReceivedError(@NonNull Exception exception); 372 } 373 374 /** A generator of unique IDs for messages. */ 375 private static class MessageIdGenerator { 376 private final AtomicInteger mMessageId = new AtomicInteger(0); 377 next()378 int next() { 379 int current = mMessageId.getAndIncrement(); 380 mMessageId.compareAndSet(Integer.MAX_VALUE, 0); 381 return current; 382 } 383 } 384 } 385