1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MY_HANDLER_H_
18
19 #define MY_HANDLER_H_
20
21 //#define LOG_NDEBUG 0
22
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34 #include "NetworkUtils.h"
35
36 #include <ctype.h>
37 #include <cutils/properties.h>
38
39 #include <datasource/HTTPBase.h>
40 #include <media/stagefright/foundation/ABuffer.h>
41 #include <media/stagefright/foundation/ADebug.h>
42 #include <media/stagefright/foundation/ALooper.h>
43 #include <media/stagefright/foundation/AMessage.h>
44 #include <media/stagefright/MediaErrors.h>
45 #include <media/stagefright/Utils.h>
46 #include <media/stagefright/FoundationUtils.h>
47
48 #include <arpa/inet.h>
49 #include <sys/socket.h>
50 #include <netdb.h>
51
52
53 #if LOG_NDEBUG
54 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
55 #else
56 #define UNUSED_UNLESS_VERBOSE(x)
57 #endif
58
59 #ifndef FALLTHROUGH_INTENDED
60 #define FALLTHROUGH_INTENDED [[clang::fallthrough]] // NOLINT
61 #endif
62
63 // If no access units are received within 5 secs, assume that the rtp
64 // stream has ended and signal end of stream.
65 static int64_t kAccessUnitTimeoutUs = 10000000ll;
66
67 // If no access units arrive for the first 10 secs after starting the
68 // stream, assume none ever will and signal EOS or switch transports.
69 static int64_t kStartupTimeoutUs = 10000000ll;
70
71 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
72
73 static int64_t kPauseDelayUs = 3000000ll;
74
75 // The allowed maximum number of stale access units at the beginning of
76 // a new sequence.
77 static int32_t kMaxAllowedStaleAccessUnits = 20;
78
79 static int64_t kTearDownTimeoutUs = 3000000ll;
80
81 namespace android {
82
GetAttribute(const char * s,const char * key,AString * value)83 static bool GetAttribute(const char *s, const char *key, AString *value) {
84 value->clear();
85
86 size_t keyLen = strlen(key);
87
88 for (;;) {
89 while (isspace(*s)) {
90 ++s;
91 }
92
93 const char *colonPos = strchr(s, ';');
94
95 size_t len =
96 (colonPos == NULL) ? strlen(s) : colonPos - s;
97
98 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
99 value->setTo(&s[keyLen + 1], len - keyLen - 1);
100 return true;
101 }
102
103 if (colonPos == NULL) {
104 return false;
105 }
106
107 s = colonPos + 1;
108 }
109 }
110
111 struct MyHandler : public AHandler {
112 enum {
113 kWhatConnected = 'conn',
114 kWhatDisconnected = 'disc',
115 kWhatSeekPaused = 'spau',
116 kWhatSeekDone = 'sdon',
117
118 kWhatAccessUnit = 'accU',
119 kWhatEOS = 'eos!',
120 kWhatSeekDiscontinuity = 'seeD',
121 kWhatNormalPlayTimeMapping = 'nptM',
122 };
123
124 MyHandler(
125 const char *url,
126 const sp<AMessage> ¬ify,
127 bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler128 : mNotify(notify),
129 mUIDValid(uidValid),
130 mUID(uid),
131 mNetLooper(new ALooper),
132 mConn(new ARTSPConnection(mUIDValid, mUID)),
133 mRTPConn(new ARTPConnection),
134 mOriginalSessionURL(url),
135 mSessionURL(url),
136 mSetupTracksSuccessful(false),
137 mSeekPending(false),
138 mFirstAccessUnit(true),
139 mAllTracksHaveTime(false),
140 mNTPAnchorUs(-1),
141 mMediaAnchorUs(-1),
142 mLastMediaTimeUs(0),
143 mNumAccessUnitsReceived(0),
144 mCheckPending(false),
145 mCheckGeneration(0),
146 mCheckTimeoutGeneration(0),
147 mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
148 mTryFakeRTCP(false),
149 mReceivedFirstRTCPPacket(false),
150 mReceivedFirstRTPPacket(false),
151 mSeekable(true),
152 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
153 mKeepAliveGeneration(0),
154 mPausing(false),
155 mPauseGeneration(0),
156 mPlayResponseParsed(false) {
157 mNetLooper->setName("rtsp net");
158 mNetLooper->start(false /* runOnCallingThread */,
159 false /* canCallJava */,
160 PRIORITY_HIGHEST);
161
162 // Strip any authentication info from the session url, we don't
163 // want to transmit user/pass in cleartext.
164 AString host, path, user, pass;
165 unsigned port;
166 CHECK(ARTSPConnection::ParseURL(
167 mSessionURL.c_str(), &host, &port, &path, &user, &pass));
168
169 if (user.size() > 0) {
170 mSessionURL.clear();
171 mSessionURL.append("rtsp://");
172 mSessionURL.append(host);
173 mSessionURL.append(":");
174 mSessionURL.append(AStringPrintf("%u", port));
175 mSessionURL.append(path);
176
177 ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
178 }
179
180 mSessionHost = host;
181 }
182
connectMyHandler183 void connect() {
184 looper()->registerHandler(mConn);
185 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
186
187 sp<AMessage> notify = new AMessage('biny', this);
188 mConn->observeBinaryData(notify);
189
190 sp<AMessage> reply = new AMessage('conn', this);
191 mConn->connect(mOriginalSessionURL.c_str(), reply);
192 }
193
loadSDPMyHandler194 void loadSDP(const sp<ASessionDescription>& desc) {
195 looper()->registerHandler(mConn);
196 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
197
198 sp<AMessage> notify = new AMessage('biny', this);
199 mConn->observeBinaryData(notify);
200
201 sp<AMessage> reply = new AMessage('sdpl', this);
202 reply->setObject("description", desc);
203 mConn->connect(mOriginalSessionURL.c_str(), reply);
204 }
205
getControlURLMyHandler206 AString getControlURL() {
207 AString sessionLevelControlURL;
208 if (mSessionDesc->findAttribute(
209 0,
210 "a=control",
211 &sessionLevelControlURL)) {
212 if (sessionLevelControlURL.compare("*") == 0) {
213 return mBaseURL;
214 } else {
215 AString controlURL;
216 CHECK(MakeURL(
217 mBaseURL.c_str(),
218 sessionLevelControlURL.c_str(),
219 &controlURL));
220 return controlURL;
221 }
222 } else {
223 return mSessionURL;
224 }
225 }
226
disconnectMyHandler227 void disconnect() {
228 (new AMessage('abor', this))->post();
229 }
230
seekMyHandler231 void seek(int64_t timeUs) {
232 sp<AMessage> msg = new AMessage('seek', this);
233 msg->setInt64("time", timeUs);
234 mPauseGeneration++;
235 msg->post();
236 }
237
continueSeekAfterPauseMyHandler238 void continueSeekAfterPause(int64_t timeUs) {
239 sp<AMessage> msg = new AMessage('see1', this);
240 msg->setInt64("time", timeUs);
241 msg->post();
242 }
243
isSeekableMyHandler244 bool isSeekable() const {
245 return mSeekable;
246 }
247
pauseMyHandler248 void pause() {
249 sp<AMessage> msg = new AMessage('paus', this);
250 mPauseGeneration++;
251 msg->setInt32("pausecheck", mPauseGeneration);
252 msg->post();
253 }
254
resumeMyHandler255 void resume() {
256 sp<AMessage> msg = new AMessage('resu', this);
257 mPauseGeneration++;
258 msg->post();
259 }
260
addRRMyHandler261 static void addRR(const sp<ABuffer> &buf) {
262 uint8_t *ptr = buf->data() + buf->size();
263 ptr[0] = 0x80 | 0;
264 ptr[1] = 201; // RR
265 ptr[2] = 0;
266 ptr[3] = 1;
267 ptr[4] = 0xde; // SSRC
268 ptr[5] = 0xad;
269 ptr[6] = 0xbe;
270 ptr[7] = 0xef;
271
272 buf->setRange(0, buf->size() + 8);
273 }
274
addSDESMyHandler275 static void addSDES(int s, const sp<ABuffer> &buffer) {
276 struct sockaddr_in addr;
277 socklen_t addrSize = sizeof(addr);
278 if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
279 inet_aton("0.0.0.0", &(addr.sin_addr));
280 }
281
282 uint8_t *data = buffer->data() + buffer->size();
283 data[0] = 0x80 | 1;
284 data[1] = 202; // SDES
285 data[4] = 0xde; // SSRC
286 data[5] = 0xad;
287 data[6] = 0xbe;
288 data[7] = 0xef;
289
290 size_t offset = 8;
291
292 data[offset++] = 1; // CNAME
293
294 AString cname = "stagefright@";
295 cname.append(inet_ntoa(addr.sin_addr));
296 data[offset++] = cname.size();
297
298 memcpy(&data[offset], cname.c_str(), cname.size());
299 offset += cname.size();
300
301 data[offset++] = 6; // TOOL
302
303 AString tool = MakeUserAgent();
304
305 data[offset++] = tool.size();
306
307 memcpy(&data[offset], tool.c_str(), tool.size());
308 offset += tool.size();
309
310 data[offset++] = 0;
311
312 if ((offset % 4) > 0) {
313 size_t count = 4 - (offset % 4);
314 switch (count) {
315 case 3:
316 data[offset++] = 0;
317 FALLTHROUGH_INTENDED;
318 case 2:
319 data[offset++] = 0;
320 FALLTHROUGH_INTENDED;
321 case 1:
322 data[offset++] = 0;
323 }
324 }
325
326 size_t numWords = (offset / 4) - 1;
327 data[2] = numWords >> 8;
328 data[3] = numWords & 0xff;
329
330 buffer->setRange(buffer->offset(), buffer->size() + offset);
331 }
332
333 // In case we're behind NAT, fire off two UDP packets to the remote
334 // rtp/rtcp ports to poke a hole into the firewall for future incoming
335 // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler336 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
337 struct sockaddr_in addr;
338 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
339 addr.sin_family = AF_INET;
340
341 AString source;
342 AString server_port;
343 if (!GetAttribute(transport.c_str(),
344 "source",
345 &source)) {
346 ALOGW("Missing 'source' field in Transport response. Using "
347 "RTSP endpoint address.");
348
349 struct hostent *ent = gethostbyname(mSessionHost.c_str());
350 if (ent == NULL) {
351 ALOGE("Failed to look up address of session host");
352
353 return false;
354 }
355
356 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
357 } else {
358 addr.sin_addr.s_addr = inet_addr(source.c_str());
359 }
360
361 if (!GetAttribute(transport.c_str(),
362 "server_port",
363 &server_port)) {
364 ALOGI("Missing 'server_port' field in Transport response.");
365 return false;
366 }
367
368 int rtpPort, rtcpPort;
369 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
370 || rtpPort <= 0 || rtpPort > 65535
371 || rtcpPort <=0 || rtcpPort > 65535
372 || rtcpPort != rtpPort + 1) {
373 ALOGE("Server picked invalid RTP/RTCP port pair %s,"
374 " RTP port must be even, RTCP port must be one higher.",
375 server_port.c_str());
376
377 return false;
378 }
379
380 if (rtpPort & 1) {
381 ALOGW("Server picked an odd RTP port, it should've picked an "
382 "even one, we'll let it pass for now, but this may break "
383 "in the future.");
384 }
385
386 if (addr.sin_addr.s_addr == INADDR_NONE) {
387 return true;
388 }
389
390 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
391 // No firewalls to traverse on the loopback interface.
392 return true;
393 }
394
395 // Make up an RR/SDES RTCP packet.
396 sp<ABuffer> buf = new ABuffer(65536);
397 buf->setRange(0, 0);
398 addRR(buf);
399 addSDES(rtpSocket, buf);
400
401 addr.sin_port = htons(rtpPort);
402
403 ssize_t n = sendto(
404 rtpSocket, buf->data(), buf->size(), 0,
405 (const sockaddr *)&addr, sizeof(addr));
406
407 if (n < (ssize_t)buf->size()) {
408 ALOGE("failed to poke a hole for RTP packets");
409 return false;
410 }
411
412 addr.sin_port = htons(rtcpPort);
413
414 n = sendto(
415 rtcpSocket, buf->data(), buf->size(), 0,
416 (const sockaddr *)&addr, sizeof(addr));
417
418 if (n < (ssize_t)buf->size()) {
419 ALOGE("failed to poke a hole for RTCP packets");
420 return false;
421 }
422
423 ALOGV("successfully poked holes.");
424
425 return true;
426 }
427
isLiveStreamMyHandler428 static bool isLiveStream(const sp<ASessionDescription> &desc) {
429 AString attrLiveStream;
430 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
431 ssize_t semicolonPos = attrLiveStream.find(";", 2);
432
433 const char* liveStreamValue;
434 if (semicolonPos < 0) {
435 liveStreamValue = attrLiveStream.c_str();
436 } else {
437 AString valString;
438 valString.setTo(attrLiveStream,
439 semicolonPos + 1,
440 attrLiveStream.size() - semicolonPos - 1);
441 liveStreamValue = valString.c_str();
442 }
443
444 uint32_t value = strtoul(liveStreamValue, NULL, 10);
445 if (value == 1) {
446 ALOGV("found live stream");
447 return true;
448 }
449 } else {
450 // It is a live stream if no duration is returned
451 int64_t durationUs;
452 if (!desc->getDurationUs(&durationUs)) {
453 ALOGV("No duration found, assume live stream");
454 return true;
455 }
456 }
457
458 return false;
459 }
460
onMessageReceivedMyHandler461 virtual void onMessageReceived(const sp<AMessage> &msg) {
462 switch (msg->what()) {
463 case 'conn':
464 {
465 int32_t result;
466 CHECK(msg->findInt32("result", &result));
467
468 ALOGI("connection request completed with result %d (%s)",
469 result, strerror(-result));
470
471 if (result == OK) {
472 AString request;
473 request = "DESCRIBE ";
474 request.append(mSessionURL);
475 request.append(" RTSP/1.0\r\n");
476 request.append("Accept: application/sdp\r\n");
477 request.append("\r\n");
478
479 sp<AMessage> reply = new AMessage('desc', this);
480 mConn->sendRequest(request.c_str(), reply);
481 } else {
482 (new AMessage('disc', this))->post();
483 }
484 break;
485 }
486
487 case 'disc':
488 {
489 ++mKeepAliveGeneration;
490
491 int32_t reconnect;
492 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
493 sp<AMessage> reply = new AMessage('conn', this);
494 mConn->connect(mOriginalSessionURL.c_str(), reply);
495 } else {
496 (new AMessage('quit', this))->post();
497 }
498 break;
499 }
500
501 case 'desc':
502 {
503 int32_t result;
504 CHECK(msg->findInt32("result", &result));
505
506 ALOGI("DESCRIBE completed with result %d (%s)",
507 result, strerror(-result));
508
509 if (result == OK) {
510 sp<RefBase> obj;
511 CHECK(msg->findObject("response", &obj));
512 sp<ARTSPResponse> response =
513 static_cast<ARTSPResponse *>(obj.get());
514
515 if (response->mStatusCode == 301 || response->mStatusCode == 302) {
516 ssize_t i = response->mHeaders.indexOfKey("location");
517 CHECK_GE(i, 0);
518
519 mOriginalSessionURL = response->mHeaders.valueAt(i);
520 mSessionURL = mOriginalSessionURL;
521
522 // Strip any authentication info from the session url, we don't
523 // want to transmit user/pass in cleartext.
524 AString host, path, user, pass;
525 unsigned port;
526 if (ARTSPConnection::ParseURL(
527 mSessionURL.c_str(), &host, &port, &path, &user, &pass)
528 && user.size() > 0) {
529 mSessionURL.clear();
530 mSessionURL.append("rtsp://");
531 mSessionURL.append(host);
532 mSessionURL.append(":");
533 mSessionURL.append(AStringPrintf("%u", port));
534 mSessionURL.append(path);
535
536 ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
537 }
538
539 sp<AMessage> reply = new AMessage('conn', this);
540 mConn->connect(mOriginalSessionURL.c_str(), reply);
541 break;
542 }
543
544 if (response->mStatusCode != 200) {
545 result = UNKNOWN_ERROR;
546 } else if (response->mContent == NULL) {
547 result = ERROR_MALFORMED;
548 ALOGE("The response has no content.");
549 } else {
550 mSessionDesc = new ASessionDescription;
551
552 mSessionDesc->setTo(
553 response->mContent->data(),
554 response->mContent->size());
555
556 if (!mSessionDesc->isValid()) {
557 ALOGE("Failed to parse session description.");
558 result = ERROR_MALFORMED;
559 } else {
560 ssize_t i = response->mHeaders.indexOfKey("content-base");
561 if (i >= 0) {
562 mBaseURL = response->mHeaders.valueAt(i);
563 } else {
564 i = response->mHeaders.indexOfKey("content-location");
565 if (i >= 0) {
566 mBaseURL = response->mHeaders.valueAt(i);
567 } else {
568 mBaseURL = mSessionURL;
569 }
570 }
571
572 mSeekable = !isLiveStream(mSessionDesc);
573
574 if (!mBaseURL.startsWith("rtsp://")) {
575 // Some misbehaving servers specify a relative
576 // URL in one of the locations above, combine
577 // it with the absolute session URL to get
578 // something usable...
579
580 ALOGW("Server specified a non-absolute base URL"
581 ", combining it with the session URL to "
582 "get something usable...");
583
584 AString tmp;
585 CHECK(MakeURL(
586 mSessionURL.c_str(),
587 mBaseURL.c_str(),
588 &tmp));
589
590 mBaseURL = tmp;
591 }
592
593 mControlURL = getControlURL();
594
595 if (mSessionDesc->countTracks() < 2) {
596 // There's no actual tracks in this session.
597 // The first "track" is merely session meta
598 // data.
599
600 ALOGW("Session doesn't contain any playable "
601 "tracks. Aborting.");
602 result = ERROR_UNSUPPORTED;
603 } else {
604 setupTrack(1);
605 }
606 }
607 }
608 }
609
610 if (result != OK) {
611 sp<AMessage> reply = new AMessage('disc', this);
612 mConn->disconnect(reply);
613 }
614 break;
615 }
616
617 case 'sdpl':
618 {
619 int32_t result;
620 CHECK(msg->findInt32("result", &result));
621
622 ALOGI("SDP connection request completed with result %d (%s)",
623 result, strerror(-result));
624
625 if (result == OK) {
626 sp<RefBase> obj;
627 CHECK(msg->findObject("description", &obj));
628 mSessionDesc =
629 static_cast<ASessionDescription *>(obj.get());
630
631 if (!mSessionDesc->isValid()) {
632 ALOGE("Failed to parse session description.");
633 result = ERROR_MALFORMED;
634 } else {
635 mBaseURL = mSessionURL;
636
637 mSeekable = !isLiveStream(mSessionDesc);
638
639 mControlURL = getControlURL();
640
641 if (mSessionDesc->countTracks() < 2) {
642 // There's no actual tracks in this session.
643 // The first "track" is merely session meta
644 // data.
645
646 ALOGW("Session doesn't contain any playable "
647 "tracks. Aborting.");
648 result = ERROR_UNSUPPORTED;
649 } else {
650 setupTrack(1);
651 }
652 }
653 }
654
655 if (result != OK) {
656 sp<AMessage> reply = new AMessage('disc', this);
657 mConn->disconnect(reply);
658 }
659 break;
660 }
661
662 case 'setu':
663 {
664 size_t index;
665 CHECK(msg->findSize("index", &index));
666
667 TrackInfo *track = NULL;
668 size_t trackIndex;
669 if (msg->findSize("track-index", &trackIndex)) {
670 track = &mTracks.editItemAt(trackIndex);
671 }
672
673 int32_t result;
674 CHECK(msg->findInt32("result", &result));
675
676 ALOGI("SETUP(%zu) completed with result %d (%s)",
677 index, result, strerror(-result));
678
679 if (result == OK) {
680 CHECK(track != NULL);
681
682 sp<RefBase> obj;
683 CHECK(msg->findObject("response", &obj));
684 sp<ARTSPResponse> response =
685 static_cast<ARTSPResponse *>(obj.get());
686
687 if (response->mStatusCode != 200) {
688 result = UNKNOWN_ERROR;
689 } else {
690 ssize_t i = response->mHeaders.indexOfKey("session");
691 CHECK_GE(i, 0);
692
693 mSessionID = response->mHeaders.valueAt(i);
694
695 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
696 AString timeoutStr;
697 if (GetAttribute(
698 mSessionID.c_str(), "timeout", &timeoutStr)) {
699 char *end;
700 unsigned long timeoutSecs =
701 strtoul(timeoutStr.c_str(), &end, 10);
702
703 if (end == timeoutStr.c_str() || *end != '\0') {
704 ALOGW("server specified malformed timeout '%s'",
705 timeoutStr.c_str());
706
707 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
708 } else if (timeoutSecs < 15) {
709 ALOGW("server specified too short a timeout "
710 "(%lu secs), using default.",
711 timeoutSecs);
712
713 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
714 } else {
715 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
716
717 ALOGI("server specified timeout of %lu secs.",
718 timeoutSecs);
719 }
720 }
721
722 i = mSessionID.find(";");
723 if (i >= 0) {
724 // Remove options, i.e. ";timeout=90"
725 mSessionID.erase(i, mSessionID.size() - i);
726 }
727
728 sp<AMessage> notify = new AMessage('accu', this);
729 notify->setSize("track-index", trackIndex);
730
731 i = response->mHeaders.indexOfKey("transport");
732 CHECK_GE(i, 0);
733
734 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
735 if (!track->mUsingInterleavedTCP) {
736 AString transport = response->mHeaders.valueAt(i);
737
738 // We are going to continue even if we were
739 // unable to poke a hole into the firewall...
740 pokeAHole(
741 track->mRTPSocket,
742 track->mRTCPSocket,
743 transport);
744 }
745
746 mRTPConn->addStream(
747 track->mRTPSocket, track->mRTCPSocket,
748 mSessionDesc, index,
749 notify, track->mUsingInterleavedTCP);
750
751 mSetupTracksSuccessful = true;
752 } else {
753 result = BAD_VALUE;
754 }
755 }
756 }
757
758 if (result != OK) {
759 if (track) {
760 if (!track->mUsingInterleavedTCP) {
761 // Clear the tag
762 if (mUIDValid) {
763 NetworkUtils::UnRegisterSocketUserTag(track->mRTPSocket);
764 NetworkUtils::UnRegisterSocketUserMark(track->mRTPSocket);
765 NetworkUtils::UnRegisterSocketUserTag(track->mRTCPSocket);
766 NetworkUtils::UnRegisterSocketUserMark(track->mRTCPSocket);
767 }
768
769 close(track->mRTPSocket);
770 close(track->mRTCPSocket);
771 }
772
773 mTracks.removeItemsAt(trackIndex);
774 }
775 }
776
777 ++index;
778 if (result == OK && index < mSessionDesc->countTracks()) {
779 setupTrack(index);
780 } else if (mSetupTracksSuccessful) {
781 ++mKeepAliveGeneration;
782 postKeepAlive();
783
784 AString request = "PLAY ";
785 request.append(mControlURL);
786 request.append(" RTSP/1.0\r\n");
787
788 request.append("Session: ");
789 request.append(mSessionID);
790 request.append("\r\n");
791
792 request.append("\r\n");
793
794 sp<AMessage> reply = new AMessage('play', this);
795 mConn->sendRequest(request.c_str(), reply);
796 } else {
797 sp<AMessage> reply = new AMessage('disc', this);
798 mConn->disconnect(reply);
799 }
800 break;
801 }
802
803 case 'play':
804 {
805 int32_t result;
806 CHECK(msg->findInt32("result", &result));
807
808 ALOGI("PLAY completed with result %d (%s)",
809 result, strerror(-result));
810
811 if (result == OK) {
812 sp<RefBase> obj;
813 CHECK(msg->findObject("response", &obj));
814 sp<ARTSPResponse> response =
815 static_cast<ARTSPResponse *>(obj.get());
816
817 if (response->mStatusCode != 200) {
818 result = UNKNOWN_ERROR;
819 } else {
820 parsePlayResponse(response);
821 postTimeout();
822 }
823 }
824
825 if (result != OK) {
826 sp<AMessage> reply = new AMessage('disc', this);
827 mConn->disconnect(reply);
828 }
829
830 break;
831 }
832
833 case 'aliv':
834 {
835 int32_t generation;
836 CHECK(msg->findInt32("generation", &generation));
837
838 if (generation != mKeepAliveGeneration) {
839 // obsolete event.
840 break;
841 }
842
843 AString request;
844 request.append("OPTIONS ");
845 request.append(mSessionURL);
846 request.append(" RTSP/1.0\r\n");
847 request.append("Session: ");
848 request.append(mSessionID);
849 request.append("\r\n");
850 request.append("\r\n");
851
852 sp<AMessage> reply = new AMessage('opts', this);
853 reply->setInt32("generation", mKeepAliveGeneration);
854 mConn->sendRequest(request.c_str(), reply);
855 break;
856 }
857
858 case 'opts':
859 {
860 int32_t result;
861 CHECK(msg->findInt32("result", &result));
862
863 ALOGI("OPTIONS completed with result %d (%s)",
864 result, strerror(-result));
865
866 int32_t generation;
867 CHECK(msg->findInt32("generation", &generation));
868
869 if (generation != mKeepAliveGeneration) {
870 // obsolete event.
871 break;
872 }
873
874 postKeepAlive();
875 break;
876 }
877
878 case 'abor':
879 {
880 for (size_t i = 0; i < mTracks.size(); ++i) {
881 TrackInfo *info = &mTracks.editItemAt(i);
882
883 if (!mFirstAccessUnit) {
884 postQueueEOS(i, ERROR_END_OF_STREAM);
885 }
886
887 if (!info->mUsingInterleavedTCP) {
888 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
889
890 // Clear the tag
891 if (mUIDValid) {
892 NetworkUtils::UnRegisterSocketUserTag(info->mRTPSocket);
893 NetworkUtils::UnRegisterSocketUserMark(info->mRTPSocket);
894 NetworkUtils::UnRegisterSocketUserTag(info->mRTCPSocket);
895 NetworkUtils::UnRegisterSocketUserMark(info->mRTCPSocket);
896 }
897
898 close(info->mRTPSocket);
899 close(info->mRTCPSocket);
900 }
901 }
902 mTracks.clear();
903 mSetupTracksSuccessful = false;
904 mSeekPending = false;
905 mFirstAccessUnit = true;
906 mAllTracksHaveTime = false;
907 mNTPAnchorUs = -1;
908 mMediaAnchorUs = -1;
909 mNumAccessUnitsReceived = 0;
910 mReceivedFirstRTCPPacket = false;
911 mReceivedFirstRTPPacket = false;
912 mPausing = false;
913 mSeekable = true;
914
915 sp<AMessage> reply = new AMessage('tear', this);
916
917 int32_t reconnect;
918 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
919 reply->setInt32("reconnect", true);
920 }
921
922 AString request;
923 request = "TEARDOWN ";
924
925 // XXX should use aggregate url from SDP here...
926 request.append(mSessionURL);
927 request.append(" RTSP/1.0\r\n");
928
929 request.append("Session: ");
930 request.append(mSessionID);
931 request.append("\r\n");
932
933 request.append("\r\n");
934
935 mConn->sendRequest(request.c_str(), reply);
936
937 // If the response of teardown hasn't been received in 3 seconds,
938 // post 'tear' message to avoid ANR.
939 if (!msg->findInt32("reconnect", &reconnect) || !reconnect) {
940 sp<AMessage> teardown = reply->dup();
941 teardown->setInt32("result", -ECONNABORTED);
942 teardown->post(kTearDownTimeoutUs);
943 }
944 break;
945 }
946
947 case 'tear':
948 {
949 int32_t result;
950 CHECK(msg->findInt32("result", &result));
951
952 ALOGI("TEARDOWN completed with result %d (%s)",
953 result, strerror(-result));
954
955 sp<AMessage> reply = new AMessage('disc', this);
956
957 int32_t reconnect;
958 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
959 reply->setInt32("reconnect", true);
960 }
961
962 mConn->disconnect(reply);
963 break;
964 }
965
966 case 'quit':
967 {
968 sp<AMessage> msg = mNotify->dup();
969 msg->setInt32("what", kWhatDisconnected);
970 msg->setInt32("result", UNKNOWN_ERROR);
971 msg->post();
972 break;
973 }
974
975 case 'chek':
976 {
977 int32_t generation;
978 CHECK(msg->findInt32("generation", &generation));
979 if (generation != mCheckGeneration) {
980 // This is an outdated message. Ignore.
981 break;
982 }
983
984 if (mNumAccessUnitsReceived == 0) {
985 #if 1
986 ALOGI("stream ended? aborting.");
987 (new AMessage('abor', this))->post();
988 break;
989 #else
990 ALOGI("haven't seen an AU in a looong time.");
991 #endif
992 }
993
994 mNumAccessUnitsReceived = 0;
995 msg->post(kAccessUnitTimeoutUs);
996 break;
997 }
998
999 case 'accu':
1000 {
1001 if (mSeekPending) {
1002 ALOGV("Stale access unit.");
1003 break;
1004 }
1005
1006 int32_t timeUpdate;
1007 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
1008 size_t trackIndex;
1009 CHECK(msg->findSize("track-index", &trackIndex));
1010
1011 uint32_t rtpTime;
1012 uint64_t ntpTime;
1013 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
1014 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
1015
1016 onTimeUpdate(trackIndex, rtpTime, ntpTime);
1017 break;
1018 }
1019
1020 int32_t first;
1021 if (msg->findInt32("first-rtcp", &first)) {
1022 mReceivedFirstRTCPPacket = true;
1023 break;
1024 }
1025
1026 if (msg->findInt32("first-rtp", &first)) {
1027 mReceivedFirstRTPPacket = true;
1028 break;
1029 }
1030
1031 ++mNumAccessUnitsReceived;
1032 postAccessUnitTimeoutCheck();
1033
1034 size_t trackIndex;
1035 CHECK(msg->findSize("track-index", &trackIndex));
1036
1037 if (trackIndex >= mTracks.size()) {
1038 ALOGV("late packets ignored.");
1039 break;
1040 }
1041
1042 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1043
1044 int32_t eos;
1045 if (msg->findInt32("eos", &eos)) {
1046 ALOGI("received BYE on track index %zu", trackIndex);
1047 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1048 ALOGI("No time established => fake existing data");
1049
1050 track->mEOSReceived = true;
1051 mTryFakeRTCP = true;
1052 mReceivedFirstRTCPPacket = true;
1053 fakeTimestamps();
1054 } else {
1055 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1056 }
1057 return;
1058 }
1059
1060 if (mSeekPending) {
1061 ALOGV("we're seeking, dropping stale packet.");
1062 break;
1063 }
1064
1065 sp<ABuffer> accessUnit;
1066 CHECK(msg->findBuffer("access-unit", &accessUnit));
1067 onAccessUnitComplete(trackIndex, accessUnit);
1068 break;
1069 }
1070
1071 case 'paus':
1072 {
1073 int32_t generation;
1074 CHECK(msg->findInt32("pausecheck", &generation));
1075 if (generation != mPauseGeneration) {
1076 ALOGV("Ignoring outdated pause message.");
1077 break;
1078 }
1079
1080 if (!mSeekable) {
1081 ALOGW("This is a live stream, ignoring pause request.");
1082 break;
1083 }
1084
1085 if (mPausing) {
1086 ALOGV("This stream is already paused.");
1087 break;
1088 }
1089
1090 mCheckPending = true;
1091 ++mCheckGeneration;
1092 mPausing = true;
1093
1094 AString request = "PAUSE ";
1095 request.append(mControlURL);
1096 request.append(" RTSP/1.0\r\n");
1097
1098 request.append("Session: ");
1099 request.append(mSessionID);
1100 request.append("\r\n");
1101
1102 request.append("\r\n");
1103
1104 sp<AMessage> reply = new AMessage('pau2', this);
1105 mConn->sendRequest(request.c_str(), reply);
1106 break;
1107 }
1108
1109 case 'pau2':
1110 {
1111 int32_t result;
1112 CHECK(msg->findInt32("result", &result));
1113 mCheckTimeoutGeneration++;
1114
1115 ALOGI("PAUSE completed with result %d (%s)",
1116 result, strerror(-result));
1117 break;
1118 }
1119
1120 case 'resu':
1121 {
1122 if (mPausing && mSeekPending) {
1123 // If seeking, Play will be sent from see1 instead
1124 break;
1125 }
1126
1127 if (!mPausing) {
1128 // Dont send PLAY if we have not paused
1129 break;
1130 }
1131 AString request = "PLAY ";
1132 request.append(mControlURL);
1133 request.append(" RTSP/1.0\r\n");
1134
1135 request.append("Session: ");
1136 request.append(mSessionID);
1137 request.append("\r\n");
1138
1139 request.append("\r\n");
1140
1141 sp<AMessage> reply = new AMessage('res2', this);
1142 mConn->sendRequest(request.c_str(), reply);
1143 break;
1144 }
1145
1146 case 'res2':
1147 {
1148 int32_t result;
1149 CHECK(msg->findInt32("result", &result));
1150
1151 ALOGI("PLAY (for resume) completed with result %d (%s)",
1152 result, strerror(-result));
1153
1154 mCheckPending = false;
1155 ++mCheckGeneration;
1156 postAccessUnitTimeoutCheck();
1157
1158 if (result == OK) {
1159 sp<RefBase> obj;
1160 CHECK(msg->findObject("response", &obj));
1161 sp<ARTSPResponse> response =
1162 static_cast<ARTSPResponse *>(obj.get());
1163
1164 if (response->mStatusCode != 200) {
1165 result = UNKNOWN_ERROR;
1166 } else {
1167 parsePlayResponse(response);
1168
1169 // Post new timeout in order to make sure to use
1170 // fake timestamps if no new Sender Reports arrive
1171 postTimeout();
1172 }
1173 }
1174
1175 if (result != OK) {
1176 ALOGE("resume failed, aborting.");
1177 (new AMessage('abor', this))->post();
1178 }
1179
1180 mPausing = false;
1181 break;
1182 }
1183
1184 case 'seek':
1185 {
1186 if (!mSeekable) {
1187 ALOGW("This is a live stream, ignoring seek request.");
1188
1189 sp<AMessage> msg = mNotify->dup();
1190 msg->setInt32("what", kWhatSeekDone);
1191 msg->post();
1192 break;
1193 }
1194
1195 int64_t timeUs;
1196 CHECK(msg->findInt64("time", &timeUs));
1197
1198 mSeekPending = true;
1199
1200 // Disable the access unit timeout until we resumed
1201 // playback again.
1202 mCheckPending = true;
1203 ++mCheckGeneration;
1204
1205 sp<AMessage> reply = new AMessage('see0', this);
1206 reply->setInt64("time", timeUs);
1207
1208 if (mPausing) {
1209 // PAUSE already sent
1210 ALOGI("Pause already sent");
1211 reply->post();
1212 break;
1213 }
1214 AString request = "PAUSE ";
1215 request.append(mControlURL);
1216 request.append(" RTSP/1.0\r\n");
1217
1218 request.append("Session: ");
1219 request.append(mSessionID);
1220 request.append("\r\n");
1221
1222 request.append("\r\n");
1223
1224 mConn->sendRequest(request.c_str(), reply);
1225 break;
1226 }
1227
1228 case 'see0':
1229 {
1230 // Session is paused now.
1231 status_t err = OK;
1232 msg->findInt32("result", &err);
1233
1234 int64_t timeUs;
1235 CHECK(msg->findInt64("time", &timeUs));
1236
1237 sp<AMessage> notify = mNotify->dup();
1238 notify->setInt32("what", kWhatSeekPaused);
1239 notify->setInt32("err", err);
1240 notify->setInt64("time", timeUs);
1241 notify->post();
1242 break;
1243
1244 }
1245
1246 case 'see1':
1247 {
1248 for (size_t i = 0; i < mTracks.size(); ++i) {
1249 TrackInfo *info = &mTracks.editItemAt(i);
1250
1251 postQueueSeekDiscontinuity(i);
1252 info->mEOSReceived = false;
1253
1254 info->mRTPAnchor = 0;
1255 info->mNTPAnchorUs = -1;
1256 }
1257
1258 mAllTracksHaveTime = false;
1259 mNTPAnchorUs = -1;
1260
1261 // Start new timeoutgeneration to avoid getting timeout
1262 // before PLAY response arrive
1263 postTimeout();
1264
1265 int64_t timeUs;
1266 CHECK(msg->findInt64("time", &timeUs));
1267
1268 AString request = "PLAY ";
1269 request.append(mControlURL);
1270 request.append(" RTSP/1.0\r\n");
1271
1272 request.append("Session: ");
1273 request.append(mSessionID);
1274 request.append("\r\n");
1275
1276 request.append(
1277 AStringPrintf(
1278 "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1279
1280 request.append("\r\n");
1281
1282 sp<AMessage> reply = new AMessage('see2', this);
1283 mConn->sendRequest(request.c_str(), reply);
1284 break;
1285 }
1286
1287 case 'see2':
1288 {
1289 if (mTracks.size() == 0) {
1290 // We have already hit abor, break
1291 break;
1292 }
1293
1294 int32_t result;
1295 CHECK(msg->findInt32("result", &result));
1296
1297 ALOGI("PLAY (for seek) completed with result %d (%s)",
1298 result, strerror(-result));
1299
1300 mCheckPending = false;
1301 ++mCheckGeneration;
1302 postAccessUnitTimeoutCheck();
1303
1304 if (result == OK) {
1305 sp<RefBase> obj;
1306 CHECK(msg->findObject("response", &obj));
1307 sp<ARTSPResponse> response =
1308 static_cast<ARTSPResponse *>(obj.get());
1309
1310 if (response->mStatusCode != 200) {
1311 result = UNKNOWN_ERROR;
1312 } else {
1313 parsePlayResponse(response);
1314
1315 // Post new timeout in order to make sure to use
1316 // fake timestamps if no new Sender Reports arrive
1317 postTimeout();
1318
1319 ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1320 CHECK_GE(i, 0);
1321
1322 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1323
1324 ALOGI("seek completed.");
1325 }
1326 }
1327
1328 if (result != OK) {
1329 ALOGE("seek failed, aborting.");
1330 (new AMessage('abor', this))->post();
1331 }
1332
1333 mPausing = false;
1334 mSeekPending = false;
1335
1336 // Discard all stale access units.
1337 for (size_t i = 0; i < mTracks.size(); ++i) {
1338 TrackInfo *track = &mTracks.editItemAt(i);
1339 track->mPackets.clear();
1340 }
1341
1342 sp<AMessage> msg = mNotify->dup();
1343 msg->setInt32("what", kWhatSeekDone);
1344 msg->post();
1345 break;
1346 }
1347
1348 case 'biny':
1349 {
1350 sp<ABuffer> buffer;
1351 CHECK(msg->findBuffer("buffer", &buffer));
1352
1353 int32_t index;
1354 CHECK(buffer->meta()->findInt32("index", &index));
1355
1356 mRTPConn->injectPacket(index, buffer);
1357 break;
1358 }
1359
1360 case 'tiou':
1361 {
1362 int32_t timeoutGenerationCheck;
1363 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1364 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1365 // This is an outdated message. Ignore.
1366 // This typically happens if a lot of seeks are
1367 // performed, since new timeout messages now are
1368 // posted at seek as well.
1369 break;
1370 }
1371 if (!mReceivedFirstRTCPPacket) {
1372 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1373 ALOGW("We received RTP packets but no RTCP packets, "
1374 "using fake timestamps.");
1375
1376 mTryFakeRTCP = true;
1377
1378 mReceivedFirstRTCPPacket = true;
1379
1380 fakeTimestamps();
1381 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1382 ALOGW("Never received any data, switching transports.");
1383
1384 mTryTCPInterleaving = true;
1385
1386 sp<AMessage> msg = new AMessage('abor', this);
1387 msg->setInt32("reconnect", true);
1388 msg->post();
1389 } else {
1390 ALOGW("Never received any data, disconnecting.");
1391 (new AMessage('abor', this))->post();
1392 }
1393 } else {
1394 if (!mAllTracksHaveTime) {
1395 ALOGW("We received some RTCP packets, but time "
1396 "could not be established on all tracks, now "
1397 "using fake timestamps");
1398
1399 fakeTimestamps();
1400 }
1401 }
1402 break;
1403 }
1404
1405 default:
1406 TRESPASS();
1407 break;
1408 }
1409 }
1410
postKeepAliveMyHandler1411 void postKeepAlive() {
1412 sp<AMessage> msg = new AMessage('aliv', this);
1413 msg->setInt32("generation", mKeepAliveGeneration);
1414 msg->post((mKeepAliveTimeoutUs * 9) / 10);
1415 }
1416
cancelAccessUnitTimeoutCheckMyHandler1417 void cancelAccessUnitTimeoutCheck() {
1418 ALOGV("cancelAccessUnitTimeoutCheck");
1419 ++mCheckGeneration;
1420 }
1421
postAccessUnitTimeoutCheckMyHandler1422 void postAccessUnitTimeoutCheck() {
1423 if (mCheckPending) {
1424 return;
1425 }
1426
1427 mCheckPending = true;
1428 sp<AMessage> check = new AMessage('chek', this);
1429 check->setInt32("generation", mCheckGeneration);
1430 check->post(kAccessUnitTimeoutUs);
1431 }
1432
SplitStringMyHandler1433 static void SplitString(
1434 const AString &s, const char *separator, List<AString> *items) {
1435 items->clear();
1436 size_t start = 0;
1437 while (start < s.size()) {
1438 ssize_t offset = s.find(separator, start);
1439
1440 if (offset < 0) {
1441 items->push_back(AString(s, start, s.size() - start));
1442 break;
1443 }
1444
1445 items->push_back(AString(s, start, offset - start));
1446 start = offset + strlen(separator);
1447 }
1448 }
1449
parsePlayResponseMyHandler1450 void parsePlayResponse(const sp<ARTSPResponse> &response) {
1451 mPlayResponseParsed = true;
1452 if (mTracks.size() == 0) {
1453 ALOGV("parsePlayResponse: late packets ignored.");
1454 return;
1455 }
1456
1457 ssize_t i = response->mHeaders.indexOfKey("range");
1458 if (i < 0) {
1459 // Server doesn't even tell use what range it is going to
1460 // play, therefore we won't support seeking.
1461 return;
1462 }
1463
1464 AString range = response->mHeaders.valueAt(i);
1465 ALOGV("Range: %s", range.c_str());
1466
1467 AString val;
1468 CHECK(GetAttribute(range.c_str(), "npt", &val));
1469
1470 float npt1, npt2;
1471 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1472 // This is a live stream and therefore not seekable.
1473
1474 ALOGI("This is a live stream");
1475 return;
1476 }
1477
1478 i = response->mHeaders.indexOfKey("rtp-info");
1479 CHECK_GE(i, 0);
1480
1481 AString rtpInfo = response->mHeaders.valueAt(i);
1482 List<AString> streamInfos;
1483 SplitString(rtpInfo, ",", &streamInfos);
1484
1485 int n = 1;
1486 for (List<AString>::iterator it = streamInfos.begin();
1487 it != streamInfos.end(); ++it) {
1488 (*it).trim();
1489 ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1490
1491 CHECK(GetAttribute((*it).c_str(), "url", &val));
1492
1493 size_t trackIndex = 0;
1494 while (trackIndex < mTracks.size()
1495 && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1496 ++trackIndex;
1497 }
1498 CHECK_LT(trackIndex, mTracks.size());
1499
1500 CHECK(GetAttribute((*it).c_str(), "seq", &val));
1501
1502 char *end;
1503 unsigned long seq = strtoul(val.c_str(), &end, 10);
1504
1505 TrackInfo *info = &mTracks.editItemAt(trackIndex);
1506 info->mFirstSeqNumInSegment = seq;
1507 info->mNewSegment = true;
1508 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1509
1510 CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1511
1512 uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1513
1514 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1515
1516 info->mNormalPlayTimeRTP = rtpTime;
1517 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1518
1519 if (!mFirstAccessUnit) {
1520 postNormalPlayTimeMapping(
1521 trackIndex,
1522 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1523 }
1524
1525 ++n;
1526 }
1527 }
1528
getTrackFormatMyHandler1529 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1530 CHECK_GE(index, 0u);
1531 CHECK_LT(index, mTracks.size());
1532
1533 const TrackInfo &info = mTracks.itemAt(index);
1534
1535 *timeScale = info.mTimeScale;
1536
1537 return info.mPacketSource->getFormat();
1538 }
1539
countTracksMyHandler1540 size_t countTracks() const {
1541 return mTracks.size();
1542 }
1543
1544 private:
1545 struct TrackInfo {
1546 AString mURL;
1547 int mRTPSocket;
1548 int mRTCPSocket;
1549 bool mUsingInterleavedTCP;
1550 uint32_t mFirstSeqNumInSegment;
1551 bool mNewSegment;
1552 int32_t mAllowedStaleAccessUnits;
1553
1554 uint32_t mRTPAnchor;
1555 int64_t mNTPAnchorUs;
1556 int32_t mTimeScale;
1557 bool mEOSReceived;
1558
1559 uint32_t mNormalPlayTimeRTP;
1560 int64_t mNormalPlayTimeUs;
1561
1562 sp<APacketSource> mPacketSource;
1563
1564 // Stores packets temporarily while no notion of time
1565 // has been established yet.
1566 List<sp<ABuffer> > mPackets;
1567 };
1568
1569 sp<AMessage> mNotify;
1570 bool mUIDValid;
1571 uid_t mUID;
1572 sp<ALooper> mNetLooper;
1573 sp<ARTSPConnection> mConn;
1574 sp<ARTPConnection> mRTPConn;
1575 sp<ASessionDescription> mSessionDesc;
1576 AString mOriginalSessionURL; // This one still has user:pass@
1577 AString mSessionURL;
1578 AString mSessionHost;
1579 AString mBaseURL;
1580 AString mControlURL;
1581 AString mSessionID;
1582 bool mSetupTracksSuccessful;
1583 bool mSeekPending;
1584 bool mFirstAccessUnit;
1585
1586 bool mAllTracksHaveTime;
1587 int64_t mNTPAnchorUs;
1588 int64_t mMediaAnchorUs;
1589 int64_t mLastMediaTimeUs;
1590
1591 int64_t mNumAccessUnitsReceived;
1592 bool mCheckPending;
1593 int32_t mCheckGeneration;
1594 int32_t mCheckTimeoutGeneration;
1595 bool mTryTCPInterleaving;
1596 bool mTryFakeRTCP;
1597 bool mReceivedFirstRTCPPacket;
1598 bool mReceivedFirstRTPPacket;
1599 bool mSeekable;
1600 int64_t mKeepAliveTimeoutUs;
1601 int32_t mKeepAliveGeneration;
1602 bool mPausing;
1603 int32_t mPauseGeneration;
1604
1605 Vector<TrackInfo> mTracks;
1606
1607 bool mPlayResponseParsed;
1608
setupTrackMyHandler1609 void setupTrack(size_t index) {
1610 sp<APacketSource> source =
1611 new APacketSource(mSessionDesc, index);
1612
1613 if (source->initCheck() != OK) {
1614 ALOGW("Unsupported format. Ignoring track #%zu.", index);
1615
1616 sp<AMessage> reply = new AMessage('setu', this);
1617 reply->setSize("index", index);
1618 reply->setInt32("result", ERROR_UNSUPPORTED);
1619 reply->post();
1620 return;
1621 }
1622
1623 AString url;
1624 CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1625
1626 AString trackURL;
1627 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1628
1629 mTracks.push(TrackInfo());
1630 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1631 info->mURL = trackURL;
1632 info->mPacketSource = source;
1633 info->mUsingInterleavedTCP = false;
1634 info->mFirstSeqNumInSegment = 0;
1635 info->mNewSegment = true;
1636 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1637 info->mRTPSocket = -1;
1638 info->mRTCPSocket = -1;
1639 info->mRTPAnchor = 0;
1640 info->mNTPAnchorUs = -1;
1641 info->mNormalPlayTimeRTP = 0;
1642 info->mNormalPlayTimeUs = 0ll;
1643
1644 unsigned long PT;
1645 AString formatDesc;
1646 AString formatParams;
1647 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1648
1649 int32_t timescale;
1650 int32_t numChannels;
1651 ASessionDescription::ParseFormatDesc(
1652 formatDesc.c_str(), ×cale, &numChannels);
1653
1654 info->mTimeScale = timescale;
1655 info->mEOSReceived = false;
1656
1657 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1658
1659 AString request = "SETUP ";
1660 request.append(trackURL);
1661 request.append(" RTSP/1.0\r\n");
1662
1663 if (mTryTCPInterleaving) {
1664 size_t interleaveIndex = 2 * (mTracks.size() - 1);
1665 info->mUsingInterleavedTCP = true;
1666 info->mRTPSocket = interleaveIndex;
1667 info->mRTCPSocket = interleaveIndex + 1;
1668
1669 request.append("Transport: RTP/AVP/TCP;interleaved=");
1670 request.append(interleaveIndex);
1671 request.append("-");
1672 request.append(interleaveIndex + 1);
1673 } else {
1674 unsigned rtpPort;
1675 ARTPConnection::MakePortPair(
1676 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1677
1678 if (mUIDValid) {
1679 NetworkUtils::RegisterSocketUserTag(info->mRTPSocket, mUID,
1680 (uint32_t)*(uint32_t*) "RTP_");
1681 NetworkUtils::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1682 (uint32_t)*(uint32_t*) "RTP_");
1683 NetworkUtils::RegisterSocketUserMark(info->mRTPSocket, mUID);
1684 NetworkUtils::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1685 }
1686
1687 request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1688 request.append(rtpPort);
1689 request.append("-");
1690 request.append(rtpPort + 1);
1691 }
1692
1693 request.append("\r\n");
1694
1695 if (index > 1) {
1696 request.append("Session: ");
1697 request.append(mSessionID);
1698 request.append("\r\n");
1699 }
1700
1701 request.append("\r\n");
1702
1703 sp<AMessage> reply = new AMessage('setu', this);
1704 reply->setSize("index", index);
1705 reply->setSize("track-index", mTracks.size() - 1);
1706 mConn->sendRequest(request.c_str(), reply);
1707 }
1708
MakeURLMyHandler1709 static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1710 out->clear();
1711
1712 if (strncasecmp("rtsp://", baseURL, 7)) {
1713 // Base URL must be absolute
1714 return false;
1715 }
1716
1717 if (!strncasecmp("rtsp://", url, 7)) {
1718 // "url" is already an absolute URL, ignore base URL.
1719 out->setTo(url);
1720 return true;
1721 }
1722
1723 size_t n = strlen(baseURL);
1724 out->setTo(baseURL);
1725 if (baseURL[n - 1] != '/') {
1726 out->append("/");
1727 }
1728 out->append(url);
1729
1730 return true;
1731 }
1732
fakeTimestampsMyHandler1733 void fakeTimestamps() {
1734 mNTPAnchorUs = -1ll;
1735 for (size_t i = 0; i < mTracks.size(); ++i) {
1736 onTimeUpdate(i, 0, 0ll);
1737 }
1738 }
1739
dataReceivedOnAllChannelsMyHandler1740 bool dataReceivedOnAllChannels() {
1741 TrackInfo *track;
1742 for (size_t i = 0; i < mTracks.size(); ++i) {
1743 track = &mTracks.editItemAt(i);
1744 if (track->mPackets.empty()) {
1745 return false;
1746 }
1747 }
1748 return true;
1749 }
1750
handleFirstAccessUnitMyHandler1751 void handleFirstAccessUnit() {
1752 if (mFirstAccessUnit) {
1753 sp<AMessage> msg = mNotify->dup();
1754 msg->setInt32("what", kWhatConnected);
1755 msg->post();
1756
1757 if (mSeekable) {
1758 for (size_t i = 0; i < mTracks.size(); ++i) {
1759 TrackInfo *info = &mTracks.editItemAt(i);
1760
1761 postNormalPlayTimeMapping(
1762 i,
1763 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1764 }
1765 }
1766
1767 mFirstAccessUnit = false;
1768 }
1769 }
1770
onTimeUpdateMyHandler1771 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1772 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1773 trackIndex, rtpTime, (long long)ntpTime);
1774
1775 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1776
1777 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1778
1779 track->mRTPAnchor = rtpTime;
1780 track->mNTPAnchorUs = ntpTimeUs;
1781
1782 if (mNTPAnchorUs < 0) {
1783 mNTPAnchorUs = ntpTimeUs;
1784 mMediaAnchorUs = mLastMediaTimeUs;
1785 }
1786
1787 if (!mAllTracksHaveTime) {
1788 bool allTracksHaveTime = (mTracks.size() > 0);
1789 for (size_t i = 0; i < mTracks.size(); ++i) {
1790 TrackInfo *track = &mTracks.editItemAt(i);
1791 if (track->mNTPAnchorUs < 0) {
1792 allTracksHaveTime = false;
1793 break;
1794 }
1795 }
1796 if (allTracksHaveTime) {
1797 mAllTracksHaveTime = true;
1798 ALOGI("Time now established for all tracks.");
1799 }
1800 }
1801 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1802 handleFirstAccessUnit();
1803
1804 // Time is now established, lets start timestamping immediately
1805 for (size_t i = 0; i < mTracks.size(); ++i) {
1806 if (OK != processAccessUnitQueue(i)) {
1807 return;
1808 }
1809 }
1810 for (size_t i = 0; i < mTracks.size(); ++i) {
1811 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1812 if (trackInfo->mEOSReceived) {
1813 postQueueEOS(i, ERROR_END_OF_STREAM);
1814 trackInfo->mEOSReceived = false;
1815 }
1816 }
1817 }
1818 }
1819
processAccessUnitQueueMyHandler1820 status_t processAccessUnitQueue(int32_t trackIndex) {
1821 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1822 while (!track->mPackets.empty()) {
1823 sp<ABuffer> accessUnit = *track->mPackets.begin();
1824 track->mPackets.erase(track->mPackets.begin());
1825
1826 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1827 if (track->mNewSegment) {
1828 // The sequence number from RTP packet has only 16 bits and is extended
1829 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1830 // RTSP "PLAY" command should be used to detect the first RTP packet
1831 // after seeking.
1832 if (mSeekable) {
1833 if (track->mAllowedStaleAccessUnits > 0) {
1834 uint32_t seqNum16 = seqNum & 0xffff;
1835 uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1836 if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
1837 || seqNum16 < firstSeqNumInSegment16) {
1838 // Not the first rtp packet of the stream after seeking, discarding.
1839 track->mAllowedStaleAccessUnits--;
1840 ALOGV("discarding stale access unit (0x%x : 0x%x)",
1841 seqNum, track->mFirstSeqNumInSegment);
1842 continue;
1843 }
1844 ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1845 "Missing the first packet(%u), now take packet(%u) as first one",
1846 track->mFirstSeqNumInSegment, seqNum);
1847 } else { // track->mAllowedStaleAccessUnits <= 0
1848 mNumAccessUnitsReceived = 0;
1849 ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1850 "Still no first rtp packet after %d stale ones",
1851 kMaxAllowedStaleAccessUnits);
1852 track->mAllowedStaleAccessUnits = -1;
1853 return UNKNOWN_ERROR;
1854 }
1855 }
1856
1857 // Now found the first rtp packet of the stream after seeking.
1858 track->mFirstSeqNumInSegment = seqNum;
1859 track->mNewSegment = false;
1860 }
1861
1862 if (seqNum < track->mFirstSeqNumInSegment) {
1863 ALOGV("dropping stale access-unit (%d < %d)",
1864 seqNum, track->mFirstSeqNumInSegment);
1865 continue;
1866 }
1867
1868 if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1869 postQueueAccessUnit(trackIndex, accessUnit);
1870 }
1871 }
1872 return OK;
1873 }
1874
onAccessUnitCompleteMyHandler1875 void onAccessUnitComplete(
1876 int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1877 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1878 track->mPackets.push_back(accessUnit);
1879
1880 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1881 ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1882
1883 if(!mPlayResponseParsed){
1884 ALOGV("play response is not parsed");
1885 return;
1886 }
1887
1888 handleFirstAccessUnit();
1889
1890 if (!mAllTracksHaveTime) {
1891 ALOGV("storing accessUnit, no time established yet");
1892 return;
1893 }
1894
1895 if (OK != processAccessUnitQueue(trackIndex)) {
1896 return;
1897 }
1898
1899 if (track->mEOSReceived) {
1900 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1901 track->mEOSReceived = false;
1902 }
1903 }
1904
addMediaTimestampMyHandler1905 bool addMediaTimestamp(
1906 int32_t trackIndex, const TrackInfo *track,
1907 const sp<ABuffer> &accessUnit) {
1908 UNUSED_UNLESS_VERBOSE(trackIndex);
1909
1910 uint32_t rtpTime;
1911 CHECK(accessUnit->meta()->findInt32(
1912 "rtp-time", (int32_t *)&rtpTime));
1913
1914 int64_t relRtpTimeUs =
1915 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1916 / track->mTimeScale;
1917
1918 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1919
1920 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1921
1922 if (mediaTimeUs > mLastMediaTimeUs) {
1923 mLastMediaTimeUs = mediaTimeUs;
1924 }
1925
1926 if (mediaTimeUs < 0 && !mSeekable) {
1927 ALOGV("dropping early accessUnit.");
1928 return false;
1929 }
1930
1931 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1932 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1933
1934 accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1935
1936 return true;
1937 }
1938
postQueueAccessUnitMyHandler1939 void postQueueAccessUnit(
1940 size_t trackIndex, const sp<ABuffer> &accessUnit) {
1941 sp<AMessage> msg = mNotify->dup();
1942 msg->setInt32("what", kWhatAccessUnit);
1943 msg->setSize("trackIndex", trackIndex);
1944 msg->setBuffer("accessUnit", accessUnit);
1945 msg->post();
1946 }
1947
postQueueEOSMyHandler1948 void postQueueEOS(size_t trackIndex, status_t finalResult) {
1949 sp<AMessage> msg = mNotify->dup();
1950 msg->setInt32("what", kWhatEOS);
1951 msg->setSize("trackIndex", trackIndex);
1952 msg->setInt32("finalResult", finalResult);
1953 msg->post();
1954 }
1955
postQueueSeekDiscontinuityMyHandler1956 void postQueueSeekDiscontinuity(size_t trackIndex) {
1957 sp<AMessage> msg = mNotify->dup();
1958 msg->setInt32("what", kWhatSeekDiscontinuity);
1959 msg->setSize("trackIndex", trackIndex);
1960 msg->post();
1961 }
1962
postNormalPlayTimeMappingMyHandler1963 void postNormalPlayTimeMapping(
1964 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1965 sp<AMessage> msg = mNotify->dup();
1966 msg->setInt32("what", kWhatNormalPlayTimeMapping);
1967 msg->setSize("trackIndex", trackIndex);
1968 msg->setInt32("rtpTime", rtpTime);
1969 msg->setInt64("nptUs", nptUs);
1970 msg->post();
1971 }
1972
postTimeoutMyHandler1973 void postTimeout() {
1974 sp<AMessage> timeout = new AMessage('tiou', this);
1975 mCheckTimeoutGeneration++;
1976 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1977
1978 int64_t startupTimeoutUs;
1979 startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1980 timeout->post(startupTimeoutUs);
1981 }
1982
1983 DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1984 };
1985
1986 } // namespace android
1987
1988 #endif // MY_HANDLER_H_
1989