1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.car.connecteddevice.ble;
18 
19 import static com.android.car.connecteddevice.BleStreamProtos.BleOperationProto.OperationType;
20 import static com.android.car.connecteddevice.BleStreamProtos.BlePacketProto.BlePacket;
21 import static com.android.car.connecteddevice.BleStreamProtos.VersionExchangeProto.BleVersionExchange;
22 import static com.android.car.connecteddevice.util.SafeLog.logd;
23 import static com.android.car.connecteddevice.util.SafeLog.loge;
24 import static com.android.car.connecteddevice.util.SafeLog.logw;
25 
26 import android.annotation.NonNull;
27 import android.annotation.Nullable;
28 import android.bluetooth.BluetoothDevice;
29 import android.bluetooth.BluetoothGattCharacteristic;
30 import android.os.Handler;
31 import android.os.Looper;
32 
33 import com.android.car.connecteddevice.BleStreamProtos.BleDeviceMessageProto.BleDeviceMessage;
34 import com.android.car.connecteddevice.util.ByteUtils;
35 import com.android.car.protobuf.ByteString;
36 import com.android.car.protobuf.InvalidProtocolBufferException;
37 import com.android.internal.annotations.VisibleForTesting;
38 
39 import java.io.ByteArrayOutputStream;
40 import java.io.IOException;
41 import java.util.ArrayDeque;
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.UUID;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicInteger;
47 import java.util.concurrent.atomic.AtomicLong;
48 
49 /** BLE message stream to a device. */
50 class BleDeviceMessageStream {
51 
52     private static final String TAG = "BleDeviceMessageStream";
53 
54     // Only version 2 of the messaging and version 1 of the security supported.
55     private static final int MESSAGING_VERSION = 2;
56     private static final int SECURITY_VERSION = 1;
57 
58     /*
59      * During bandwidth testing, it was discovered that allowing the stream to send as fast as it
60      * can blocked outgoing notifications from being received by the connected device. Adding a
61      * throttle to the outgoing messages alleviated this block and allowed both sides to
62      * send/receive in parallel successfully.
63      */
64     private static final long THROTTLE_DEFAULT_MS = 10L;
65     private static final long THROTTLE_WAIT_MS = 75L;
66 
67     private final ArrayDeque<BlePacket> mPacketQueue = new ArrayDeque<>();
68 
69     private final HashMap<Integer, ByteArrayOutputStream> mPendingData =
70             new HashMap<>();
71 
72     private final MessageIdGenerator mMessageIdGenerator = new MessageIdGenerator();
73 
74     private final Handler mHandler = new Handler(Looper.getMainLooper());
75 
76     private final AtomicBoolean mIsVersionExchanged = new AtomicBoolean(false);
77 
78     private final AtomicBoolean mIsSendingInProgress = new AtomicBoolean(false);
79 
80     private final AtomicLong mThrottleDelay = new AtomicLong(THROTTLE_DEFAULT_MS);
81 
82     private final BlePeripheralManager mBlePeripheralManager;
83 
84     private final BluetoothDevice mDevice;
85 
86     private final BluetoothGattCharacteristic mWriteCharacteristic;
87 
88     private final BluetoothGattCharacteristic mReadCharacteristic;
89 
90     private MessageReceivedListener mMessageReceivedListener;
91 
92     private MessageReceivedErrorListener mMessageReceivedErrorListener;
93 
94     /*
95      * This initial value is 20 because BLE has a default write of 23 bytes. However, 3 bytes are
96      * subtracted due to bytes being reserved for the command type and attribute ID.
97      */
98     private int mMaxWriteSize = 20;
99 
BleDeviceMessageStream(@onNull BlePeripheralManager blePeripheralManager, @NonNull BluetoothDevice device, @NonNull BluetoothGattCharacteristic writeCharacteristic, @NonNull BluetoothGattCharacteristic readCharacteristic)100     BleDeviceMessageStream(@NonNull BlePeripheralManager blePeripheralManager,
101             @NonNull BluetoothDevice device,
102             @NonNull BluetoothGattCharacteristic writeCharacteristic,
103             @NonNull BluetoothGattCharacteristic readCharacteristic) {
104         mBlePeripheralManager = blePeripheralManager;
105         mDevice = device;
106         mWriteCharacteristic = writeCharacteristic;
107         mReadCharacteristic = readCharacteristic;
108         mBlePeripheralManager.addOnCharacteristicWriteListener(this::onCharacteristicWrite);
109         mBlePeripheralManager.addOnCharacteristicReadListener(this::onCharacteristicRead);
110     }
111 
112     /**
113      * Writes the given message to the write characteristic of this stream with operation type
114      * {@code CLIENT_MESSAGE}.
115      *
116      * This method will handle the chunking of messages based on the max write size.
117      *
118      * @param deviceMessage The data object contains recipient, isPayloadEncrypted and message.
119      */
writeMessage(@onNull DeviceMessage deviceMessage)120     void writeMessage(@NonNull DeviceMessage deviceMessage) {
121         writeMessage(deviceMessage, OperationType.CLIENT_MESSAGE);
122     }
123 
124     /**
125      * Writes the given message to the write characteristic of this stream.
126      *
127      * This method will handle the chunking of messages based on the max write size. If it is
128      * a handshake message, the message recipient should be {@code null} and it cannot be
129      * encrypted.
130      *
131      * @param deviceMessage The data object contains recipient, isPayloadEncrypted and message.
132      * @param operationType The {@link OperationType} of this message.
133      */
writeMessage(@onNull DeviceMessage deviceMessage, OperationType operationType)134     void writeMessage(@NonNull DeviceMessage deviceMessage, OperationType operationType) {
135         logd(TAG, "Writing message to device: " + mDevice.getAddress() + ".");
136         BleDeviceMessage.Builder builder = BleDeviceMessage.newBuilder()
137                 .setOperation(operationType)
138                 .setIsPayloadEncrypted(deviceMessage.isMessageEncrypted())
139                 .setPayload(ByteString.copyFrom(deviceMessage.getMessage()));
140 
141         UUID recipient = deviceMessage.getRecipient();
142         if (recipient != null) {
143             builder.setRecipient(ByteString.copyFrom(ByteUtils.uuidToBytes(recipient)));
144         }
145 
146         BleDeviceMessage bleDeviceMessage = builder.build();
147         byte[] rawBytes = bleDeviceMessage.toByteArray();
148         List<BlePacket> blePackets;
149         try {
150             blePackets = BlePacketFactory.makeBlePackets(rawBytes, mMessageIdGenerator.next(),
151                     mMaxWriteSize);
152         } catch (BlePacketFactoryException e) {
153             loge(TAG, "Error while creating message packets.", e);
154             return;
155         }
156         mPacketQueue.addAll(blePackets);
157         writeNextMessageInQueue();
158     }
159 
writeNextMessageInQueue()160     private void writeNextMessageInQueue() {
161         mHandler.postDelayed(() -> {
162             if (mPacketQueue.isEmpty()) {
163                 logd(TAG, "No more packets to send.");
164                 return;
165             }
166             if (mIsSendingInProgress.get()) {
167                 logd(TAG, "Unable to send packet at this time.");
168                 return;
169             }
170 
171             mIsSendingInProgress.set(true);
172             BlePacket packet = mPacketQueue.remove();
173             logd(TAG, "Writing packet " + packet.getPacketNumber() + " of "
174                     + packet.getTotalPackets() + " for " + packet.getMessageId() + ".");
175             mWriteCharacteristic.setValue(packet.toByteArray());
176             mBlePeripheralManager.notifyCharacteristicChanged(mDevice, mWriteCharacteristic,
177                     /* confirm = */ false);
178         }, mThrottleDelay.get());
179     }
180 
onCharacteristicRead(@onNull BluetoothDevice device)181     private void onCharacteristicRead(@NonNull BluetoothDevice device) {
182         if (!mDevice.equals(device)) {
183             logw(TAG, "Received a read notification from a device (" + device.getAddress()
184                     + ") that is not the expected device (" + mDevice.getAddress() + ") registered "
185                     + "to this stream. Ignoring.");
186             return;
187         }
188 
189         logd(TAG, "Releasing lock on characteristic.");
190         mIsSendingInProgress.set(false);
191         writeNextMessageInQueue();
192     }
193 
onCharacteristicWrite(@onNull BluetoothDevice device, @NonNull BluetoothGattCharacteristic characteristic, @NonNull byte[] value)194     private void onCharacteristicWrite(@NonNull BluetoothDevice device,
195             @NonNull BluetoothGattCharacteristic characteristic, @NonNull byte[] value) {
196         logd(TAG, "Received a message from a device (" + device.getAddress() + ").");
197         if (!mDevice.equals(device)) {
198             logw(TAG, "Received a message from a device (" + device.getAddress() + ") that is not "
199                     + "the expected device (" + mDevice.getAddress() + ") registered to this "
200                     + "stream. Ignoring.");
201             return;
202         }
203 
204         if (!characteristic.getUuid().equals(mReadCharacteristic.getUuid())) {
205             logw(TAG, "Received a write to a characteristic (" + characteristic.getUuid() + ") that"
206                     + " is not the expected UUID (" + mReadCharacteristic.getUuid() + "). "
207                     + "Ignoring.");
208             return;
209         }
210 
211         if (!mIsVersionExchanged.get()) {
212             processVersionExchange(device, value);
213             return;
214         }
215 
216         BlePacket packet;
217         try {
218             packet = BlePacket.parseFrom(value);
219         } catch (InvalidProtocolBufferException e) {
220             loge(TAG, "Can not parse Ble packet from client.", e);
221             if (mMessageReceivedErrorListener != null) {
222                 mMessageReceivedErrorListener.onMessageReceivedError(e);
223             }
224             return;
225         }
226         processPacket(packet);
227     }
228 
processVersionExchange(@onNull BluetoothDevice device, @NonNull byte[] value)229     private void processVersionExchange(@NonNull BluetoothDevice device, @NonNull byte[] value) {
230         BleVersionExchange versionExchange;
231         try {
232             versionExchange = BleVersionExchange.parseFrom(value);
233         } catch (InvalidProtocolBufferException e) {
234             loge(TAG, "Could not parse version exchange message", e);
235             if (mMessageReceivedErrorListener != null) {
236                 mMessageReceivedErrorListener.onMessageReceivedError(e);
237             }
238             return;
239         }
240         int minMessagingVersion = versionExchange.getMinSupportedMessagingVersion();
241         int maxMessagingVersion = versionExchange.getMaxSupportedMessagingVersion();
242         int minSecurityVersion = versionExchange.getMinSupportedSecurityVersion();
243         int maxSecurityVersion = versionExchange.getMaxSupportedSecurityVersion();
244         if (minMessagingVersion > MESSAGING_VERSION || maxMessagingVersion < MESSAGING_VERSION
245                 || minSecurityVersion > SECURITY_VERSION || maxSecurityVersion < SECURITY_VERSION) {
246             loge(TAG, "Unsupported message version for min " + minMessagingVersion + " and max "
247                     + maxMessagingVersion + " or security version for " + minSecurityVersion
248                     + " and max " + maxSecurityVersion + ".");
249             if (mMessageReceivedErrorListener != null) {
250                 mMessageReceivedErrorListener.onMessageReceivedError(
251                         new IllegalStateException("Unsupported version."));
252             }
253             return;
254         }
255 
256         BleVersionExchange headunitVersion = BleVersionExchange.newBuilder()
257                 .setMinSupportedMessagingVersion(MESSAGING_VERSION)
258                 .setMaxSupportedMessagingVersion(MESSAGING_VERSION)
259                 .setMinSupportedSecurityVersion(SECURITY_VERSION)
260                 .setMaxSupportedSecurityVersion(SECURITY_VERSION)
261                 .build();
262         mWriteCharacteristic.setValue(headunitVersion.toByteArray());
263         mBlePeripheralManager.notifyCharacteristicChanged(device, mWriteCharacteristic,
264                 /* confirm = */ false);
265         mIsVersionExchanged.set(true);
266         logd(TAG, "Sent supported version to the phone.");
267     }
268 
269     @VisibleForTesting
processPacket(@onNull BlePacket packet)270     void processPacket(@NonNull BlePacket packet) {
271         // Messages are coming in. Need to throttle outgoing messages to allow outgoing
272         // notifications to make it to the device.
273         mThrottleDelay.set(THROTTLE_WAIT_MS);
274 
275         int messageId = packet.getMessageId();
276         ByteArrayOutputStream currentPayloadStream =
277                 mPendingData.getOrDefault(messageId, new ByteArrayOutputStream());
278         mPendingData.putIfAbsent(messageId, currentPayloadStream);
279 
280         byte[] payload = packet.getPayload().toByteArray();
281         try {
282             currentPayloadStream.write(payload);
283         } catch (IOException e) {
284             loge(TAG, "Error writing packet to stream.", e);
285             if (mMessageReceivedErrorListener != null) {
286                 mMessageReceivedErrorListener.onMessageReceivedError(e);
287             }
288             return;
289         }
290         logd(TAG, "Parsed packet " + packet.getPacketNumber() + " of "
291                 + packet.getTotalPackets() + " for message " + messageId + ". Writing "
292                 + payload.length + ".");
293 
294         if (packet.getPacketNumber() != packet.getTotalPackets()) {
295             return;
296         }
297 
298         byte[] messageBytes = currentPayloadStream.toByteArray();
299         mPendingData.remove(messageId);
300 
301         // All message packets received. Resetting throttle back to default until next message
302         // started.
303         mThrottleDelay.set(THROTTLE_DEFAULT_MS);
304 
305         logd(TAG, "Received complete device message " + messageId + " of " + messageBytes.length
306                 + " bytes.");
307         BleDeviceMessage message;
308         try {
309             message = BleDeviceMessage.parseFrom(messageBytes);
310         } catch (InvalidProtocolBufferException e) {
311             loge(TAG, "Cannot parse device message from client.", e);
312             if (mMessageReceivedErrorListener != null) {
313                 mMessageReceivedErrorListener.onMessageReceivedError(e);
314             }
315             return;
316         }
317 
318         DeviceMessage deviceMessage = new DeviceMessage(
319                 ByteUtils.bytesToUUID(message.getRecipient().toByteArray()),
320                 message.getIsPayloadEncrypted(), message.getPayload().toByteArray());
321         if (mMessageReceivedListener != null) {
322             mMessageReceivedListener.onMessageReceived(deviceMessage, message.getOperation());
323         }
324     }
325 
326     /** The maximum amount of bytes that can be written over BLE. */
setMaxWriteSize(int maxWriteSize)327     void setMaxWriteSize(int maxWriteSize) {
328         mMaxWriteSize = maxWriteSize;
329     }
330 
331     /**
332      * Set the given listener to be notified when a new message was received from the
333      * client. If listener is {@code null}, clear.
334      */
setMessageReceivedListener(@ullable MessageReceivedListener listener)335     void setMessageReceivedListener(@Nullable MessageReceivedListener listener) {
336         mMessageReceivedListener = listener;
337     }
338 
339     /**
340      * Set the given listener to be notified when there was an error during receiving
341      * message from the client. If listener is {@code null}, clear.
342      */
setMessageReceivedErrorListener( @ullable MessageReceivedErrorListener listener)343     void setMessageReceivedErrorListener(
344             @Nullable MessageReceivedErrorListener listener) {
345         mMessageReceivedErrorListener = listener;
346     }
347 
348     /**
349      * Listener to be invoked when a complete message is received from the client.
350      */
351     interface MessageReceivedListener {
352 
353         /**
354          * Called when a complete message is received from the client.
355          *
356          * @param deviceMessage The message received from the client.
357          * @param operationType The {@link OperationType} of the received message.
358          */
onMessageReceived(@onNull DeviceMessage deviceMessage, OperationType operationType)359         void onMessageReceived(@NonNull DeviceMessage deviceMessage, OperationType operationType);
360     }
361 
362     /**
363      * Listener to be invoked when there was an error during receiving message from the client.
364      */
365     interface MessageReceivedErrorListener {
366         /**
367          * Called when there was an error during receiving message from the client.
368          *
369          * @param exception The error.
370          */
onMessageReceivedError(@onNull Exception exception)371         void onMessageReceivedError(@NonNull Exception exception);
372     }
373 
374     /** A generator of unique IDs for messages. */
375     private static class MessageIdGenerator {
376         private final AtomicInteger mMessageId = new AtomicInteger(0);
377 
next()378         int next() {
379             int current = mMessageId.getAndIncrement();
380             mMessageId.compareAndSet(Integer.MAX_VALUE, 0);
381             return current;
382         }
383     }
384 }
385