1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MY_HANDLER_H_
18 
19 #define MY_HANDLER_H_
20 
21 //#define LOG_NDEBUG 0
22 
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26 
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29 
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34 #include "NetworkUtils.h"
35 
36 #include <ctype.h>
37 #include <cutils/properties.h>
38 
39 #include <datasource/HTTPBase.h>
40 #include <media/stagefright/foundation/ABuffer.h>
41 #include <media/stagefright/foundation/ADebug.h>
42 #include <media/stagefright/foundation/ALooper.h>
43 #include <media/stagefright/foundation/AMessage.h>
44 #include <media/stagefright/MediaErrors.h>
45 #include <media/stagefright/Utils.h>
46 #include <media/stagefright/FoundationUtils.h>
47 
48 #include <arpa/inet.h>
49 #include <sys/socket.h>
50 #include <netdb.h>
51 
52 
53 #if LOG_NDEBUG
54 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
55 #else
56 #define UNUSED_UNLESS_VERBOSE(x)
57 #endif
58 
59 #ifndef FALLTHROUGH_INTENDED
60 #define FALLTHROUGH_INTENDED [[clang::fallthrough]]  // NOLINT
61 #endif
62 
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
static int64_t kAccessUnitTimeoutUs = 10 * 1000000LL;

// Grace period after the stream has been started: when not a single access
// unit shows up within these first 10 secs, assume none ever will and
// signal EOS or switch transports.
static int64_t kStartupTimeoutUs = 10 * 1000000LL;

// Keep-alive timeout (60 secs) that applies unless the server's Session
// header supplies a usable "timeout" value.
static int64_t kDefaultKeepAliveTimeoutUs = 60 * 1000000LL;

// 3 secs, in microseconds.
// NOTE(review): presumably the delay applied to a pause request; the code
// using it is not in this part of the file -- confirm.
static int64_t kPauseDelayUs = 3 * 1000000LL;

// Upper bound on the number of stale access units tolerated at the
// beginning of a new sequence.
static int32_t kMaxAllowedStaleAccessUnits = 20;

// 3 secs, in microseconds.
// NOTE(review): presumably how long a teardown may take before giving up;
// the code using it is not in this part of the file -- confirm.
static int64_t kTearDownTimeoutUs = 3 * 1000000LL;
80 
81 namespace android {
82 
GetAttribute(const char * s,const char * key,AString * value)83 static bool GetAttribute(const char *s, const char *key, AString *value) {
84     value->clear();
85 
86     size_t keyLen = strlen(key);
87 
88     for (;;) {
89         while (isspace(*s)) {
90             ++s;
91         }
92 
93         const char *colonPos = strchr(s, ';');
94 
95         size_t len =
96             (colonPos == NULL) ? strlen(s) : colonPos - s;
97 
98         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
99             value->setTo(&s[keyLen + 1], len - keyLen - 1);
100             return true;
101         }
102 
103         if (colonPos == NULL) {
104             return false;
105         }
106 
107         s = colonPos + 1;
108     }
109 }
110 
111 struct MyHandler : public AHandler {
112     enum {
113         kWhatConnected                  = 'conn',
114         kWhatDisconnected               = 'disc',
115         kWhatSeekPaused                 = 'spau',
116         kWhatSeekDone                   = 'sdon',
117 
118         kWhatAccessUnit                 = 'accU',
119         kWhatEOS                        = 'eos!',
120         kWhatSeekDiscontinuity          = 'seeD',
121         kWhatNormalPlayTimeMapping      = 'nptM',
122     };
123 
124     MyHandler(
125             const char *url,
126             const sp<AMessage> &notify,
127             bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler128         : mNotify(notify),
129           mUIDValid(uidValid),
130           mUID(uid),
131           mNetLooper(new ALooper),
132           mConn(new ARTSPConnection(mUIDValid, mUID)),
133           mRTPConn(new ARTPConnection),
134           mOriginalSessionURL(url),
135           mSessionURL(url),
136           mSetupTracksSuccessful(false),
137           mSeekPending(false),
138           mFirstAccessUnit(true),
139           mAllTracksHaveTime(false),
140           mNTPAnchorUs(-1),
141           mMediaAnchorUs(-1),
142           mLastMediaTimeUs(0),
143           mNumAccessUnitsReceived(0),
144           mCheckPending(false),
145           mCheckGeneration(0),
146           mCheckTimeoutGeneration(0),
147           mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
148           mTryFakeRTCP(false),
149           mReceivedFirstRTCPPacket(false),
150           mReceivedFirstRTPPacket(false),
151           mSeekable(true),
152           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
153           mKeepAliveGeneration(0),
154           mPausing(false),
155           mPauseGeneration(0),
156           mPlayResponseParsed(false) {
157         mNetLooper->setName("rtsp net");
158         mNetLooper->start(false /* runOnCallingThread */,
159                           false /* canCallJava */,
160                           PRIORITY_HIGHEST);
161 
162         // Strip any authentication info from the session url, we don't
163         // want to transmit user/pass in cleartext.
164         AString host, path, user, pass;
165         unsigned port;
166         CHECK(ARTSPConnection::ParseURL(
167                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
168 
169         if (user.size() > 0) {
170             mSessionURL.clear();
171             mSessionURL.append("rtsp://");
172             mSessionURL.append(host);
173             mSessionURL.append(":");
174             mSessionURL.append(AStringPrintf("%u", port));
175             mSessionURL.append(path);
176 
177             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
178         }
179 
180         mSessionHost = host;
181     }
182 
connectMyHandler183     void connect() {
184         looper()->registerHandler(mConn);
185         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
186 
187         sp<AMessage> notify = new AMessage('biny', this);
188         mConn->observeBinaryData(notify);
189 
190         sp<AMessage> reply = new AMessage('conn', this);
191         mConn->connect(mOriginalSessionURL.c_str(), reply);
192     }
193 
loadSDPMyHandler194     void loadSDP(const sp<ASessionDescription>& desc) {
195         looper()->registerHandler(mConn);
196         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
197 
198         sp<AMessage> notify = new AMessage('biny', this);
199         mConn->observeBinaryData(notify);
200 
201         sp<AMessage> reply = new AMessage('sdpl', this);
202         reply->setObject("description", desc);
203         mConn->connect(mOriginalSessionURL.c_str(), reply);
204     }
205 
getControlURLMyHandler206     AString getControlURL() {
207         AString sessionLevelControlURL;
208         if (mSessionDesc->findAttribute(
209                 0,
210                 "a=control",
211                 &sessionLevelControlURL)) {
212             if (sessionLevelControlURL.compare("*") == 0) {
213                 return mBaseURL;
214             } else {
215                 AString controlURL;
216                 CHECK(MakeURL(
217                         mBaseURL.c_str(),
218                         sessionLevelControlURL.c_str(),
219                         &controlURL));
220                 return controlURL;
221             }
222         } else {
223             return mSessionURL;
224         }
225     }
226 
disconnectMyHandler227     void disconnect() {
228         (new AMessage('abor', this))->post();
229     }
230 
seekMyHandler231     void seek(int64_t timeUs) {
232         sp<AMessage> msg = new AMessage('seek', this);
233         msg->setInt64("time", timeUs);
234         mPauseGeneration++;
235         msg->post();
236     }
237 
continueSeekAfterPauseMyHandler238     void continueSeekAfterPause(int64_t timeUs) {
239         sp<AMessage> msg = new AMessage('see1', this);
240         msg->setInt64("time", timeUs);
241         msg->post();
242     }
243 
    // Returns the current value of mSeekable. The flag starts out true and
    // is set to !isLiveStream(mSessionDesc) once a valid session
    // description has been processed.
    bool isSeekable() const {
        return mSeekable;
    }
247 
pauseMyHandler248     void pause() {
249         sp<AMessage> msg = new AMessage('paus', this);
250         mPauseGeneration++;
251         msg->setInt32("pausecheck", mPauseGeneration);
252         msg->post();
253     }
254 
resumeMyHandler255     void resume() {
256         sp<AMessage> msg = new AMessage('resu', this);
257         mPauseGeneration++;
258         msg->post();
259     }
260 
addRRMyHandler261     static void addRR(const sp<ABuffer> &buf) {
262         uint8_t *ptr = buf->data() + buf->size();
263         ptr[0] = 0x80 | 0;
264         ptr[1] = 201;  // RR
265         ptr[2] = 0;
266         ptr[3] = 1;
267         ptr[4] = 0xde;  // SSRC
268         ptr[5] = 0xad;
269         ptr[6] = 0xbe;
270         ptr[7] = 0xef;
271 
272         buf->setRange(0, buf->size() + 8);
273     }
274 
addSDESMyHandler275     static void addSDES(int s, const sp<ABuffer> &buffer) {
276         struct sockaddr_in addr;
277         socklen_t addrSize = sizeof(addr);
278         if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
279             inet_aton("0.0.0.0", &(addr.sin_addr));
280         }
281 
282         uint8_t *data = buffer->data() + buffer->size();
283         data[0] = 0x80 | 1;
284         data[1] = 202;  // SDES
285         data[4] = 0xde;  // SSRC
286         data[5] = 0xad;
287         data[6] = 0xbe;
288         data[7] = 0xef;
289 
290         size_t offset = 8;
291 
292         data[offset++] = 1;  // CNAME
293 
294         AString cname = "stagefright@";
295         cname.append(inet_ntoa(addr.sin_addr));
296         data[offset++] = cname.size();
297 
298         memcpy(&data[offset], cname.c_str(), cname.size());
299         offset += cname.size();
300 
301         data[offset++] = 6;  // TOOL
302 
303         AString tool = MakeUserAgent();
304 
305         data[offset++] = tool.size();
306 
307         memcpy(&data[offset], tool.c_str(), tool.size());
308         offset += tool.size();
309 
310         data[offset++] = 0;
311 
312         if ((offset % 4) > 0) {
313             size_t count = 4 - (offset % 4);
314             switch (count) {
315                 case 3:
316                     data[offset++] = 0;
317                     FALLTHROUGH_INTENDED;
318                 case 2:
319                     data[offset++] = 0;
320                     FALLTHROUGH_INTENDED;
321                 case 1:
322                     data[offset++] = 0;
323             }
324         }
325 
326         size_t numWords = (offset / 4) - 1;
327         data[2] = numWords >> 8;
328         data[3] = numWords & 0xff;
329 
330         buffer->setRange(buffer->offset(), buffer->size() + offset);
331     }
332 
333     // In case we're behind NAT, fire off two UDP packets to the remote
334     // rtp/rtcp ports to poke a hole into the firewall for future incoming
335     // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler336     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
337         struct sockaddr_in addr;
338         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
339         addr.sin_family = AF_INET;
340 
341         AString source;
342         AString server_port;
343         if (!GetAttribute(transport.c_str(),
344                           "source",
345                           &source)) {
346             ALOGW("Missing 'source' field in Transport response. Using "
347                  "RTSP endpoint address.");
348 
349             struct hostent *ent = gethostbyname(mSessionHost.c_str());
350             if (ent == NULL) {
351                 ALOGE("Failed to look up address of session host");
352 
353                 return false;
354             }
355 
356             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
357         } else {
358             addr.sin_addr.s_addr = inet_addr(source.c_str());
359         }
360 
361         if (!GetAttribute(transport.c_str(),
362                                  "server_port",
363                                  &server_port)) {
364             ALOGI("Missing 'server_port' field in Transport response.");
365             return false;
366         }
367 
368         int rtpPort, rtcpPort;
369         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
370                 || rtpPort <= 0 || rtpPort > 65535
371                 || rtcpPort <=0 || rtcpPort > 65535
372                 || rtcpPort != rtpPort + 1) {
373             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
374                  " RTP port must be even, RTCP port must be one higher.",
375                  server_port.c_str());
376 
377             return false;
378         }
379 
380         if (rtpPort & 1) {
381             ALOGW("Server picked an odd RTP port, it should've picked an "
382                  "even one, we'll let it pass for now, but this may break "
383                  "in the future.");
384         }
385 
386         if (addr.sin_addr.s_addr == INADDR_NONE) {
387             return true;
388         }
389 
390         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
391             // No firewalls to traverse on the loopback interface.
392             return true;
393         }
394 
395         // Make up an RR/SDES RTCP packet.
396         sp<ABuffer> buf = new ABuffer(65536);
397         buf->setRange(0, 0);
398         addRR(buf);
399         addSDES(rtpSocket, buf);
400 
401         addr.sin_port = htons(rtpPort);
402 
403         ssize_t n = sendto(
404                 rtpSocket, buf->data(), buf->size(), 0,
405                 (const sockaddr *)&addr, sizeof(addr));
406 
407         if (n < (ssize_t)buf->size()) {
408             ALOGE("failed to poke a hole for RTP packets");
409             return false;
410         }
411 
412         addr.sin_port = htons(rtcpPort);
413 
414         n = sendto(
415                 rtcpSocket, buf->data(), buf->size(), 0,
416                 (const sockaddr *)&addr, sizeof(addr));
417 
418         if (n < (ssize_t)buf->size()) {
419             ALOGE("failed to poke a hole for RTCP packets");
420             return false;
421         }
422 
423         ALOGV("successfully poked holes.");
424 
425         return true;
426     }
427 
isLiveStreamMyHandler428     static bool isLiveStream(const sp<ASessionDescription> &desc) {
429         AString attrLiveStream;
430         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
431             ssize_t semicolonPos = attrLiveStream.find(";", 2);
432 
433             const char* liveStreamValue;
434             if (semicolonPos < 0) {
435                 liveStreamValue = attrLiveStream.c_str();
436             } else {
437                 AString valString;
438                 valString.setTo(attrLiveStream,
439                         semicolonPos + 1,
440                         attrLiveStream.size() - semicolonPos - 1);
441                 liveStreamValue = valString.c_str();
442             }
443 
444             uint32_t value = strtoul(liveStreamValue, NULL, 10);
445             if (value == 1) {
446                 ALOGV("found live stream");
447                 return true;
448             }
449         } else {
450             // It is a live stream if no duration is returned
451             int64_t durationUs;
452             if (!desc->getDurationUs(&durationUs)) {
453                 ALOGV("No duration found, assume live stream");
454                 return true;
455             }
456         }
457 
458         return false;
459     }
460 
onMessageReceivedMyHandler461     virtual void onMessageReceived(const sp<AMessage> &msg) {
462         switch (msg->what()) {
463             case 'conn':
464             {
465                 int32_t result;
466                 CHECK(msg->findInt32("result", &result));
467 
468                 ALOGI("connection request completed with result %d (%s)",
469                      result, strerror(-result));
470 
471                 if (result == OK) {
472                     AString request;
473                     request = "DESCRIBE ";
474                     request.append(mSessionURL);
475                     request.append(" RTSP/1.0\r\n");
476                     request.append("Accept: application/sdp\r\n");
477                     request.append("\r\n");
478 
479                     sp<AMessage> reply = new AMessage('desc', this);
480                     mConn->sendRequest(request.c_str(), reply);
481                 } else {
482                     (new AMessage('disc', this))->post();
483                 }
484                 break;
485             }
486 
487             case 'disc':
488             {
489                 ++mKeepAliveGeneration;
490 
491                 int32_t reconnect;
492                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
493                     sp<AMessage> reply = new AMessage('conn', this);
494                     mConn->connect(mOriginalSessionURL.c_str(), reply);
495                 } else {
496                     (new AMessage('quit', this))->post();
497                 }
498                 break;
499             }
500 
501             case 'desc':
502             {
503                 int32_t result;
504                 CHECK(msg->findInt32("result", &result));
505 
506                 ALOGI("DESCRIBE completed with result %d (%s)",
507                      result, strerror(-result));
508 
509                 if (result == OK) {
510                     sp<RefBase> obj;
511                     CHECK(msg->findObject("response", &obj));
512                     sp<ARTSPResponse> response =
513                         static_cast<ARTSPResponse *>(obj.get());
514 
515                     if (response->mStatusCode == 301 || response->mStatusCode == 302) {
516                         ssize_t i = response->mHeaders.indexOfKey("location");
517                         CHECK_GE(i, 0);
518 
519                         mOriginalSessionURL = response->mHeaders.valueAt(i);
520                         mSessionURL = mOriginalSessionURL;
521 
522                         // Strip any authentication info from the session url, we don't
523                         // want to transmit user/pass in cleartext.
524                         AString host, path, user, pass;
525                         unsigned port;
526                         if (ARTSPConnection::ParseURL(
527                                     mSessionURL.c_str(), &host, &port, &path, &user, &pass)
528                                 && user.size() > 0) {
529                             mSessionURL.clear();
530                             mSessionURL.append("rtsp://");
531                             mSessionURL.append(host);
532                             mSessionURL.append(":");
533                             mSessionURL.append(AStringPrintf("%u", port));
534                             mSessionURL.append(path);
535 
536                             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
537                         }
538 
539                         sp<AMessage> reply = new AMessage('conn', this);
540                         mConn->connect(mOriginalSessionURL.c_str(), reply);
541                         break;
542                     }
543 
544                     if (response->mStatusCode != 200) {
545                         result = UNKNOWN_ERROR;
546                     } else if (response->mContent == NULL) {
547                         result = ERROR_MALFORMED;
548                         ALOGE("The response has no content.");
549                     } else {
550                         mSessionDesc = new ASessionDescription;
551 
552                         mSessionDesc->setTo(
553                                 response->mContent->data(),
554                                 response->mContent->size());
555 
556                         if (!mSessionDesc->isValid()) {
557                             ALOGE("Failed to parse session description.");
558                             result = ERROR_MALFORMED;
559                         } else {
560                             ssize_t i = response->mHeaders.indexOfKey("content-base");
561                             if (i >= 0) {
562                                 mBaseURL = response->mHeaders.valueAt(i);
563                             } else {
564                                 i = response->mHeaders.indexOfKey("content-location");
565                                 if (i >= 0) {
566                                     mBaseURL = response->mHeaders.valueAt(i);
567                                 } else {
568                                     mBaseURL = mSessionURL;
569                                 }
570                             }
571 
572                             mSeekable = !isLiveStream(mSessionDesc);
573 
574                             if (!mBaseURL.startsWith("rtsp://")) {
575                                 // Some misbehaving servers specify a relative
576                                 // URL in one of the locations above, combine
577                                 // it with the absolute session URL to get
578                                 // something usable...
579 
580                                 ALOGW("Server specified a non-absolute base URL"
581                                      ", combining it with the session URL to "
582                                      "get something usable...");
583 
584                                 AString tmp;
585                                 CHECK(MakeURL(
586                                             mSessionURL.c_str(),
587                                             mBaseURL.c_str(),
588                                             &tmp));
589 
590                                 mBaseURL = tmp;
591                             }
592 
593                             mControlURL = getControlURL();
594 
595                             if (mSessionDesc->countTracks() < 2) {
596                                 // There's no actual tracks in this session.
597                                 // The first "track" is merely session meta
598                                 // data.
599 
600                                 ALOGW("Session doesn't contain any playable "
601                                      "tracks. Aborting.");
602                                 result = ERROR_UNSUPPORTED;
603                             } else {
604                                 setupTrack(1);
605                             }
606                         }
607                     }
608                 }
609 
610                 if (result != OK) {
611                     sp<AMessage> reply = new AMessage('disc', this);
612                     mConn->disconnect(reply);
613                 }
614                 break;
615             }
616 
617             case 'sdpl':
618             {
619                 int32_t result;
620                 CHECK(msg->findInt32("result", &result));
621 
622                 ALOGI("SDP connection request completed with result %d (%s)",
623                      result, strerror(-result));
624 
625                 if (result == OK) {
626                     sp<RefBase> obj;
627                     CHECK(msg->findObject("description", &obj));
628                     mSessionDesc =
629                         static_cast<ASessionDescription *>(obj.get());
630 
631                     if (!mSessionDesc->isValid()) {
632                         ALOGE("Failed to parse session description.");
633                         result = ERROR_MALFORMED;
634                     } else {
635                         mBaseURL = mSessionURL;
636 
637                         mSeekable = !isLiveStream(mSessionDesc);
638 
639                         mControlURL = getControlURL();
640 
641                         if (mSessionDesc->countTracks() < 2) {
642                             // There's no actual tracks in this session.
643                             // The first "track" is merely session meta
644                             // data.
645 
646                             ALOGW("Session doesn't contain any playable "
647                                  "tracks. Aborting.");
648                             result = ERROR_UNSUPPORTED;
649                         } else {
650                             setupTrack(1);
651                         }
652                     }
653                 }
654 
655                 if (result != OK) {
656                     sp<AMessage> reply = new AMessage('disc', this);
657                     mConn->disconnect(reply);
658                 }
659                 break;
660             }
661 
662             case 'setu':
663             {
664                 size_t index;
665                 CHECK(msg->findSize("index", &index));
666 
667                 TrackInfo *track = NULL;
668                 size_t trackIndex;
669                 if (msg->findSize("track-index", &trackIndex)) {
670                     track = &mTracks.editItemAt(trackIndex);
671                 }
672 
673                 int32_t result;
674                 CHECK(msg->findInt32("result", &result));
675 
676                 ALOGI("SETUP(%zu) completed with result %d (%s)",
677                      index, result, strerror(-result));
678 
679                 if (result == OK) {
680                     CHECK(track != NULL);
681 
682                     sp<RefBase> obj;
683                     CHECK(msg->findObject("response", &obj));
684                     sp<ARTSPResponse> response =
685                         static_cast<ARTSPResponse *>(obj.get());
686 
687                     if (response->mStatusCode != 200) {
688                         result = UNKNOWN_ERROR;
689                     } else {
690                         ssize_t i = response->mHeaders.indexOfKey("session");
691                         CHECK_GE(i, 0);
692 
693                         mSessionID = response->mHeaders.valueAt(i);
694 
695                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
696                         AString timeoutStr;
697                         if (GetAttribute(
698                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
699                             char *end;
700                             unsigned long timeoutSecs =
701                                 strtoul(timeoutStr.c_str(), &end, 10);
702 
703                             if (end == timeoutStr.c_str() || *end != '\0') {
704                                 ALOGW("server specified malformed timeout '%s'",
705                                      timeoutStr.c_str());
706 
707                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
708                             } else if (timeoutSecs < 15) {
709                                 ALOGW("server specified too short a timeout "
710                                      "(%lu secs), using default.",
711                                      timeoutSecs);
712 
713                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
714                             } else {
715                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
716 
717                                 ALOGI("server specified timeout of %lu secs.",
718                                      timeoutSecs);
719                             }
720                         }
721 
722                         i = mSessionID.find(";");
723                         if (i >= 0) {
724                             // Remove options, i.e. ";timeout=90"
725                             mSessionID.erase(i, mSessionID.size() - i);
726                         }
727 
728                         sp<AMessage> notify = new AMessage('accu', this);
729                         notify->setSize("track-index", trackIndex);
730 
731                         i = response->mHeaders.indexOfKey("transport");
732                         CHECK_GE(i, 0);
733 
734                         if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
735                             if (!track->mUsingInterleavedTCP) {
736                                 AString transport = response->mHeaders.valueAt(i);
737 
738                                 // We are going to continue even if we were
739                                 // unable to poke a hole into the firewall...
740                                 pokeAHole(
741                                         track->mRTPSocket,
742                                         track->mRTCPSocket,
743                                         transport);
744                             }
745 
746                             mRTPConn->addStream(
747                                     track->mRTPSocket, track->mRTCPSocket,
748                                     mSessionDesc, index,
749                                     notify, track->mUsingInterleavedTCP);
750 
751                             mSetupTracksSuccessful = true;
752                         } else {
753                             result = BAD_VALUE;
754                         }
755                     }
756                 }
757 
758                 if (result != OK) {
759                     if (track) {
760                         if (!track->mUsingInterleavedTCP) {
761                             // Clear the tag
762                             if (mUIDValid) {
763                                 NetworkUtils::UnRegisterSocketUserTag(track->mRTPSocket);
764                                 NetworkUtils::UnRegisterSocketUserMark(track->mRTPSocket);
765                                 NetworkUtils::UnRegisterSocketUserTag(track->mRTCPSocket);
766                                 NetworkUtils::UnRegisterSocketUserMark(track->mRTCPSocket);
767                             }
768 
769                             close(track->mRTPSocket);
770                             close(track->mRTCPSocket);
771                         }
772 
773                         mTracks.removeItemsAt(trackIndex);
774                     }
775                 }
776 
777                 ++index;
778                 if (result == OK && index < mSessionDesc->countTracks()) {
779                     setupTrack(index);
780                 } else if (mSetupTracksSuccessful) {
781                     ++mKeepAliveGeneration;
782                     postKeepAlive();
783 
784                     AString request = "PLAY ";
785                     request.append(mControlURL);
786                     request.append(" RTSP/1.0\r\n");
787 
788                     request.append("Session: ");
789                     request.append(mSessionID);
790                     request.append("\r\n");
791 
792                     request.append("\r\n");
793 
794                     sp<AMessage> reply = new AMessage('play', this);
795                     mConn->sendRequest(request.c_str(), reply);
796                 } else {
797                     sp<AMessage> reply = new AMessage('disc', this);
798                     mConn->disconnect(reply);
799                 }
800                 break;
801             }
802 
803             case 'play':
804             {
805                 int32_t result;
806                 CHECK(msg->findInt32("result", &result));
807 
808                 ALOGI("PLAY completed with result %d (%s)",
809                      result, strerror(-result));
810 
811                 if (result == OK) {
812                     sp<RefBase> obj;
813                     CHECK(msg->findObject("response", &obj));
814                     sp<ARTSPResponse> response =
815                         static_cast<ARTSPResponse *>(obj.get());
816 
817                     if (response->mStatusCode != 200) {
818                         result = UNKNOWN_ERROR;
819                     } else {
820                         parsePlayResponse(response);
821                         postTimeout();
822                     }
823                 }
824 
825                 if (result != OK) {
826                     sp<AMessage> reply = new AMessage('disc', this);
827                     mConn->disconnect(reply);
828                 }
829 
830                 break;
831             }
832 
833             case 'aliv':
834             {
835                 int32_t generation;
836                 CHECK(msg->findInt32("generation", &generation));
837 
838                 if (generation != mKeepAliveGeneration) {
839                     // obsolete event.
840                     break;
841                 }
842 
843                 AString request;
844                 request.append("OPTIONS ");
845                 request.append(mSessionURL);
846                 request.append(" RTSP/1.0\r\n");
847                 request.append("Session: ");
848                 request.append(mSessionID);
849                 request.append("\r\n");
850                 request.append("\r\n");
851 
852                 sp<AMessage> reply = new AMessage('opts', this);
853                 reply->setInt32("generation", mKeepAliveGeneration);
854                 mConn->sendRequest(request.c_str(), reply);
855                 break;
856             }
857 
858             case 'opts':
859             {
860                 int32_t result;
861                 CHECK(msg->findInt32("result", &result));
862 
863                 ALOGI("OPTIONS completed with result %d (%s)",
864                      result, strerror(-result));
865 
866                 int32_t generation;
867                 CHECK(msg->findInt32("generation", &generation));
868 
869                 if (generation != mKeepAliveGeneration) {
870                     // obsolete event.
871                     break;
872                 }
873 
874                 postKeepAlive();
875                 break;
876             }
877 
878             case 'abor':
879             {
880                 for (size_t i = 0; i < mTracks.size(); ++i) {
881                     TrackInfo *info = &mTracks.editItemAt(i);
882 
883                     if (!mFirstAccessUnit) {
884                         postQueueEOS(i, ERROR_END_OF_STREAM);
885                     }
886 
887                     if (!info->mUsingInterleavedTCP) {
888                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
889 
890                         // Clear the tag
891                         if (mUIDValid) {
892                             NetworkUtils::UnRegisterSocketUserTag(info->mRTPSocket);
893                             NetworkUtils::UnRegisterSocketUserMark(info->mRTPSocket);
894                             NetworkUtils::UnRegisterSocketUserTag(info->mRTCPSocket);
895                             NetworkUtils::UnRegisterSocketUserMark(info->mRTCPSocket);
896                         }
897 
898                         close(info->mRTPSocket);
899                         close(info->mRTCPSocket);
900                     }
901                 }
902                 mTracks.clear();
903                 mSetupTracksSuccessful = false;
904                 mSeekPending = false;
905                 mFirstAccessUnit = true;
906                 mAllTracksHaveTime = false;
907                 mNTPAnchorUs = -1;
908                 mMediaAnchorUs = -1;
909                 mNumAccessUnitsReceived = 0;
910                 mReceivedFirstRTCPPacket = false;
911                 mReceivedFirstRTPPacket = false;
912                 mPausing = false;
913                 mSeekable = true;
914 
915                 sp<AMessage> reply = new AMessage('tear', this);
916 
917                 int32_t reconnect;
918                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
919                     reply->setInt32("reconnect", true);
920                 }
921 
922                 AString request;
923                 request = "TEARDOWN ";
924 
925                 // XXX should use aggregate url from SDP here...
926                 request.append(mSessionURL);
927                 request.append(" RTSP/1.0\r\n");
928 
929                 request.append("Session: ");
930                 request.append(mSessionID);
931                 request.append("\r\n");
932 
933                 request.append("\r\n");
934 
935                 mConn->sendRequest(request.c_str(), reply);
936 
937                 // If the response of teardown hasn't been received in 3 seconds,
938                 // post 'tear' message to avoid ANR.
939                 if (!msg->findInt32("reconnect", &reconnect) || !reconnect) {
940                     sp<AMessage> teardown = reply->dup();
941                     teardown->setInt32("result", -ECONNABORTED);
942                     teardown->post(kTearDownTimeoutUs);
943                 }
944                 break;
945             }
946 
947             case 'tear':
948             {
949                 int32_t result;
950                 CHECK(msg->findInt32("result", &result));
951 
952                 ALOGI("TEARDOWN completed with result %d (%s)",
953                      result, strerror(-result));
954 
955                 sp<AMessage> reply = new AMessage('disc', this);
956 
957                 int32_t reconnect;
958                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
959                     reply->setInt32("reconnect", true);
960                 }
961 
962                 mConn->disconnect(reply);
963                 break;
964             }
965 
966             case 'quit':
967             {
968                 sp<AMessage> msg = mNotify->dup();
969                 msg->setInt32("what", kWhatDisconnected);
970                 msg->setInt32("result", UNKNOWN_ERROR);
971                 msg->post();
972                 break;
973             }
974 
975             case 'chek':
976             {
977                 int32_t generation;
978                 CHECK(msg->findInt32("generation", &generation));
979                 if (generation != mCheckGeneration) {
980                     // This is an outdated message. Ignore.
981                     break;
982                 }
983 
984                 if (mNumAccessUnitsReceived == 0) {
985 #if 1
986                     ALOGI("stream ended? aborting.");
987                     (new AMessage('abor', this))->post();
988                     break;
989 #else
990                     ALOGI("haven't seen an AU in a looong time.");
991 #endif
992                 }
993 
994                 mNumAccessUnitsReceived = 0;
995                 msg->post(kAccessUnitTimeoutUs);
996                 break;
997             }
998 
999             case 'accu':
1000             {
1001                 if (mSeekPending) {
1002                     ALOGV("Stale access unit.");
1003                     break;
1004                 }
1005 
1006                 int32_t timeUpdate;
1007                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
1008                     size_t trackIndex;
1009                     CHECK(msg->findSize("track-index", &trackIndex));
1010 
1011                     uint32_t rtpTime;
1012                     uint64_t ntpTime;
1013                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
1014                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
1015 
1016                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
1017                     break;
1018                 }
1019 
1020                 int32_t first;
1021                 if (msg->findInt32("first-rtcp", &first)) {
1022                     mReceivedFirstRTCPPacket = true;
1023                     break;
1024                 }
1025 
1026                 if (msg->findInt32("first-rtp", &first)) {
1027                     mReceivedFirstRTPPacket = true;
1028                     break;
1029                 }
1030 
1031                 ++mNumAccessUnitsReceived;
1032                 postAccessUnitTimeoutCheck();
1033 
1034                 size_t trackIndex;
1035                 CHECK(msg->findSize("track-index", &trackIndex));
1036 
1037                 if (trackIndex >= mTracks.size()) {
1038                     ALOGV("late packets ignored.");
1039                     break;
1040                 }
1041 
1042                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1043 
1044                 int32_t eos;
1045                 if (msg->findInt32("eos", &eos)) {
1046                     ALOGI("received BYE on track index %zu", trackIndex);
1047                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1048                         ALOGI("No time established => fake existing data");
1049 
1050                         track->mEOSReceived = true;
1051                         mTryFakeRTCP = true;
1052                         mReceivedFirstRTCPPacket = true;
1053                         fakeTimestamps();
1054                     } else {
1055                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1056                     }
1057                     return;
1058                 }
1059 
1060                 if (mSeekPending) {
1061                     ALOGV("we're seeking, dropping stale packet.");
1062                     break;
1063                 }
1064 
1065                 sp<ABuffer> accessUnit;
1066                 CHECK(msg->findBuffer("access-unit", &accessUnit));
1067                 onAccessUnitComplete(trackIndex, accessUnit);
1068                 break;
1069             }
1070 
1071             case 'paus':
1072             {
1073                 int32_t generation;
1074                 CHECK(msg->findInt32("pausecheck", &generation));
1075                 if (generation != mPauseGeneration) {
1076                     ALOGV("Ignoring outdated pause message.");
1077                     break;
1078                 }
1079 
1080                 if (!mSeekable) {
1081                     ALOGW("This is a live stream, ignoring pause request.");
1082                     break;
1083                 }
1084 
1085                 if (mPausing) {
1086                     ALOGV("This stream is already paused.");
1087                     break;
1088                 }
1089 
1090                 mCheckPending = true;
1091                 ++mCheckGeneration;
1092                 mPausing = true;
1093 
1094                 AString request = "PAUSE ";
1095                 request.append(mControlURL);
1096                 request.append(" RTSP/1.0\r\n");
1097 
1098                 request.append("Session: ");
1099                 request.append(mSessionID);
1100                 request.append("\r\n");
1101 
1102                 request.append("\r\n");
1103 
1104                 sp<AMessage> reply = new AMessage('pau2', this);
1105                 mConn->sendRequest(request.c_str(), reply);
1106                 break;
1107             }
1108 
1109             case 'pau2':
1110             {
1111                 int32_t result;
1112                 CHECK(msg->findInt32("result", &result));
1113                 mCheckTimeoutGeneration++;
1114 
1115                 ALOGI("PAUSE completed with result %d (%s)",
1116                      result, strerror(-result));
1117                 break;
1118             }
1119 
1120             case 'resu':
1121             {
1122                 if (mPausing && mSeekPending) {
1123                     // If seeking, Play will be sent from see1 instead
1124                     break;
1125                 }
1126 
1127                 if (!mPausing) {
1128                     // Dont send PLAY if we have not paused
1129                     break;
1130                 }
1131                 AString request = "PLAY ";
1132                 request.append(mControlURL);
1133                 request.append(" RTSP/1.0\r\n");
1134 
1135                 request.append("Session: ");
1136                 request.append(mSessionID);
1137                 request.append("\r\n");
1138 
1139                 request.append("\r\n");
1140 
1141                 sp<AMessage> reply = new AMessage('res2', this);
1142                 mConn->sendRequest(request.c_str(), reply);
1143                 break;
1144             }
1145 
1146             case 'res2':
1147             {
1148                 int32_t result;
1149                 CHECK(msg->findInt32("result", &result));
1150 
1151                 ALOGI("PLAY (for resume) completed with result %d (%s)",
1152                      result, strerror(-result));
1153 
1154                 mCheckPending = false;
1155                 ++mCheckGeneration;
1156                 postAccessUnitTimeoutCheck();
1157 
1158                 if (result == OK) {
1159                     sp<RefBase> obj;
1160                     CHECK(msg->findObject("response", &obj));
1161                     sp<ARTSPResponse> response =
1162                         static_cast<ARTSPResponse *>(obj.get());
1163 
1164                     if (response->mStatusCode != 200) {
1165                         result = UNKNOWN_ERROR;
1166                     } else {
1167                         parsePlayResponse(response);
1168 
1169                         // Post new timeout in order to make sure to use
1170                         // fake timestamps if no new Sender Reports arrive
1171                         postTimeout();
1172                     }
1173                 }
1174 
1175                 if (result != OK) {
1176                     ALOGE("resume failed, aborting.");
1177                     (new AMessage('abor', this))->post();
1178                 }
1179 
1180                 mPausing = false;
1181                 break;
1182             }
1183 
1184             case 'seek':
1185             {
1186                 if (!mSeekable) {
1187                     ALOGW("This is a live stream, ignoring seek request.");
1188 
1189                     sp<AMessage> msg = mNotify->dup();
1190                     msg->setInt32("what", kWhatSeekDone);
1191                     msg->post();
1192                     break;
1193                 }
1194 
1195                 int64_t timeUs;
1196                 CHECK(msg->findInt64("time", &timeUs));
1197 
1198                 mSeekPending = true;
1199 
1200                 // Disable the access unit timeout until we resumed
1201                 // playback again.
1202                 mCheckPending = true;
1203                 ++mCheckGeneration;
1204 
1205                 sp<AMessage> reply = new AMessage('see0', this);
1206                 reply->setInt64("time", timeUs);
1207 
1208                 if (mPausing) {
1209                     // PAUSE already sent
1210                     ALOGI("Pause already sent");
1211                     reply->post();
1212                     break;
1213                 }
1214                 AString request = "PAUSE ";
1215                 request.append(mControlURL);
1216                 request.append(" RTSP/1.0\r\n");
1217 
1218                 request.append("Session: ");
1219                 request.append(mSessionID);
1220                 request.append("\r\n");
1221 
1222                 request.append("\r\n");
1223 
1224                 mConn->sendRequest(request.c_str(), reply);
1225                 break;
1226             }
1227 
1228             case 'see0':
1229             {
1230                 // Session is paused now.
1231                 status_t err = OK;
1232                 msg->findInt32("result", &err);
1233 
1234                 int64_t timeUs;
1235                 CHECK(msg->findInt64("time", &timeUs));
1236 
1237                 sp<AMessage> notify = mNotify->dup();
1238                 notify->setInt32("what", kWhatSeekPaused);
1239                 notify->setInt32("err", err);
1240                 notify->setInt64("time", timeUs);
1241                 notify->post();
1242                 break;
1243 
1244             }
1245 
1246             case 'see1':
1247             {
1248                 for (size_t i = 0; i < mTracks.size(); ++i) {
1249                     TrackInfo *info = &mTracks.editItemAt(i);
1250 
1251                     postQueueSeekDiscontinuity(i);
1252                     info->mEOSReceived = false;
1253 
1254                     info->mRTPAnchor = 0;
1255                     info->mNTPAnchorUs = -1;
1256                 }
1257 
1258                 mAllTracksHaveTime = false;
1259                 mNTPAnchorUs = -1;
1260 
1261                 // Start new timeoutgeneration to avoid getting timeout
1262                 // before PLAY response arrive
1263                 postTimeout();
1264 
1265                 int64_t timeUs;
1266                 CHECK(msg->findInt64("time", &timeUs));
1267 
1268                 AString request = "PLAY ";
1269                 request.append(mControlURL);
1270                 request.append(" RTSP/1.0\r\n");
1271 
1272                 request.append("Session: ");
1273                 request.append(mSessionID);
1274                 request.append("\r\n");
1275 
1276                 request.append(
1277                         AStringPrintf(
1278                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1279 
1280                 request.append("\r\n");
1281 
1282                 sp<AMessage> reply = new AMessage('see2', this);
1283                 mConn->sendRequest(request.c_str(), reply);
1284                 break;
1285             }
1286 
1287             case 'see2':
1288             {
1289                 if (mTracks.size() == 0) {
1290                     // We have already hit abor, break
1291                     break;
1292                 }
1293 
1294                 int32_t result;
1295                 CHECK(msg->findInt32("result", &result));
1296 
1297                 ALOGI("PLAY (for seek) completed with result %d (%s)",
1298                      result, strerror(-result));
1299 
1300                 mCheckPending = false;
1301                 ++mCheckGeneration;
1302                 postAccessUnitTimeoutCheck();
1303 
1304                 if (result == OK) {
1305                     sp<RefBase> obj;
1306                     CHECK(msg->findObject("response", &obj));
1307                     sp<ARTSPResponse> response =
1308                         static_cast<ARTSPResponse *>(obj.get());
1309 
1310                     if (response->mStatusCode != 200) {
1311                         result = UNKNOWN_ERROR;
1312                     } else {
1313                         parsePlayResponse(response);
1314 
1315                         // Post new timeout in order to make sure to use
1316                         // fake timestamps if no new Sender Reports arrive
1317                         postTimeout();
1318 
1319                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1320                         CHECK_GE(i, 0);
1321 
1322                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1323 
1324                         ALOGI("seek completed.");
1325                     }
1326                 }
1327 
1328                 if (result != OK) {
1329                     ALOGE("seek failed, aborting.");
1330                     (new AMessage('abor', this))->post();
1331                 }
1332 
1333                 mPausing = false;
1334                 mSeekPending = false;
1335 
1336                 // Discard all stale access units.
1337                 for (size_t i = 0; i < mTracks.size(); ++i) {
1338                     TrackInfo *track = &mTracks.editItemAt(i);
1339                     track->mPackets.clear();
1340                 }
1341 
1342                 sp<AMessage> msg = mNotify->dup();
1343                 msg->setInt32("what", kWhatSeekDone);
1344                 msg->post();
1345                 break;
1346             }
1347 
1348             case 'biny':
1349             {
1350                 sp<ABuffer> buffer;
1351                 CHECK(msg->findBuffer("buffer", &buffer));
1352 
1353                 int32_t index;
1354                 CHECK(buffer->meta()->findInt32("index", &index));
1355 
1356                 mRTPConn->injectPacket(index, buffer);
1357                 break;
1358             }
1359 
1360             case 'tiou':
1361             {
1362                 int32_t timeoutGenerationCheck;
1363                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1364                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1365                     // This is an outdated message. Ignore.
1366                     // This typically happens if a lot of seeks are
1367                     // performed, since new timeout messages now are
1368                     // posted at seek as well.
1369                     break;
1370                 }
1371                 if (!mReceivedFirstRTCPPacket) {
1372                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1373                         ALOGW("We received RTP packets but no RTCP packets, "
1374                              "using fake timestamps.");
1375 
1376                         mTryFakeRTCP = true;
1377 
1378                         mReceivedFirstRTCPPacket = true;
1379 
1380                         fakeTimestamps();
1381                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1382                         ALOGW("Never received any data, switching transports.");
1383 
1384                         mTryTCPInterleaving = true;
1385 
1386                         sp<AMessage> msg = new AMessage('abor', this);
1387                         msg->setInt32("reconnect", true);
1388                         msg->post();
1389                     } else {
1390                         ALOGW("Never received any data, disconnecting.");
1391                         (new AMessage('abor', this))->post();
1392                     }
1393                 } else {
1394                     if (!mAllTracksHaveTime) {
1395                         ALOGW("We received some RTCP packets, but time "
1396                               "could not be established on all tracks, now "
1397                               "using fake timestamps");
1398 
1399                         fakeTimestamps();
1400                     }
1401                 }
1402                 break;
1403             }
1404 
1405             default:
1406                 TRESPASS();
1407                 break;
1408         }
1409     }
1410 
postKeepAliveMyHandler1411     void postKeepAlive() {
1412         sp<AMessage> msg = new AMessage('aliv', this);
1413         msg->setInt32("generation", mKeepAliveGeneration);
1414         msg->post((mKeepAliveTimeoutUs * 9) / 10);
1415     }
1416 
cancelAccessUnitTimeoutCheckMyHandler1417     void cancelAccessUnitTimeoutCheck() {
1418         ALOGV("cancelAccessUnitTimeoutCheck");
1419         ++mCheckGeneration;
1420     }
1421 
postAccessUnitTimeoutCheckMyHandler1422     void postAccessUnitTimeoutCheck() {
1423         if (mCheckPending) {
1424             return;
1425         }
1426 
1427         mCheckPending = true;
1428         sp<AMessage> check = new AMessage('chek', this);
1429         check->setInt32("generation", mCheckGeneration);
1430         check->post(kAccessUnitTimeoutUs);
1431     }
1432 
SplitStringMyHandler1433     static void SplitString(
1434             const AString &s, const char *separator, List<AString> *items) {
1435         items->clear();
1436         size_t start = 0;
1437         while (start < s.size()) {
1438             ssize_t offset = s.find(separator, start);
1439 
1440             if (offset < 0) {
1441                 items->push_back(AString(s, start, s.size() - start));
1442                 break;
1443             }
1444 
1445             items->push_back(AString(s, start, offset - start));
1446             start = offset + strlen(separator);
1447         }
1448     }
1449 
parsePlayResponseMyHandler1450     void parsePlayResponse(const sp<ARTSPResponse> &response) {
1451         mPlayResponseParsed = true;
1452         if (mTracks.size() == 0) {
1453             ALOGV("parsePlayResponse: late packets ignored.");
1454             return;
1455         }
1456 
1457         ssize_t i = response->mHeaders.indexOfKey("range");
1458         if (i < 0) {
1459             // Server doesn't even tell use what range it is going to
1460             // play, therefore we won't support seeking.
1461             return;
1462         }
1463 
1464         AString range = response->mHeaders.valueAt(i);
1465         ALOGV("Range: %s", range.c_str());
1466 
1467         AString val;
1468         CHECK(GetAttribute(range.c_str(), "npt", &val));
1469 
1470         float npt1, npt2;
1471         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1472             // This is a live stream and therefore not seekable.
1473 
1474             ALOGI("This is a live stream");
1475             return;
1476         }
1477 
1478         i = response->mHeaders.indexOfKey("rtp-info");
1479         CHECK_GE(i, 0);
1480 
1481         AString rtpInfo = response->mHeaders.valueAt(i);
1482         List<AString> streamInfos;
1483         SplitString(rtpInfo, ",", &streamInfos);
1484 
1485         int n = 1;
1486         for (List<AString>::iterator it = streamInfos.begin();
1487              it != streamInfos.end(); ++it) {
1488             (*it).trim();
1489             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1490 
1491             CHECK(GetAttribute((*it).c_str(), "url", &val));
1492 
1493             size_t trackIndex = 0;
1494             while (trackIndex < mTracks.size()
1495                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1496                 ++trackIndex;
1497             }
1498             CHECK_LT(trackIndex, mTracks.size());
1499 
1500             CHECK(GetAttribute((*it).c_str(), "seq", &val));
1501 
1502             char *end;
1503             unsigned long seq = strtoul(val.c_str(), &end, 10);
1504 
1505             TrackInfo *info = &mTracks.editItemAt(trackIndex);
1506             info->mFirstSeqNumInSegment = seq;
1507             info->mNewSegment = true;
1508             info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1509 
1510             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1511 
1512             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1513 
1514             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1515 
1516             info->mNormalPlayTimeRTP = rtpTime;
1517             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1518 
1519             if (!mFirstAccessUnit) {
1520                 postNormalPlayTimeMapping(
1521                         trackIndex,
1522                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1523             }
1524 
1525             ++n;
1526         }
1527     }
1528 
getTrackFormatMyHandler1529     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1530         CHECK_GE(index, 0u);
1531         CHECK_LT(index, mTracks.size());
1532 
1533         const TrackInfo &info = mTracks.itemAt(index);
1534 
1535         *timeScale = info.mTimeScale;
1536 
1537         return info.mPacketSource->getFormat();
1538     }
1539 
countTracksMyHandler1540     size_t countTracks() const {
1541         return mTracks.size();
1542     }
1543 
1544 private:
    // Per-track state. Entries are appended to mTracks by setupTrack().
    struct TrackInfo {
        // Absolute track URL built by setupTrack() from the base URL and the
        // track's "a=control" attribute. parsePlayResponse() compares it
        // against the "url" attribute of RTP-Info entries.
        AString mURL;
        // For UDP transport: the socket pair filled in by
        // ARTPConnection::MakePortPair(). When mUsingInterleavedTCP is set
        // these hold the interleaved channel numbers instead.
        int mRTPSocket;
        int mRTCPSocket;
        bool mUsingInterleavedTCP;
        // Parsed from the "seq" attribute of the track's RTP-Info entry.
        uint32_t mFirstSeqNumInSegment;
        // Set to true by setupTrack() and by parsePlayResponse().
        bool mNewSegment;
        // Set to kMaxAllowedStaleAccessUnits wherever mNewSegment is set.
        int32_t mAllowedStaleAccessUnits;

        // Anchors start out as 0 / -1 and are reset to those values when a
        // seek is carried out ('see1').
        uint32_t mRTPAnchor;
        int64_t mNTPAnchorUs;
        // Timescale obtained from ASessionDescription::ParseFormatDesc().
        int32_t mTimeScale;
        bool mEOSReceived;

        // RTP timestamp (the "rtptime" attribute of RTP-Info) and the play
        // time it maps to, in microseconds (start of the npt range * 1E6).
        uint32_t mNormalPlayTimeRTP;
        int64_t mNormalPlayTimeUs;

        // Source created for this track from the session description; its
        // format is what getTrackFormat() returns.
        sp<APacketSource> mPacketSource;

        // Stores packets temporarily while no notion of time
        // has been established yet.
        List<sp<ABuffer> > mPackets;
    };
1568 
    // Template message; it is dup()'ed and posted to report events such as
    // kWhatDisconnected, kWhatSeekPaused and kWhatSeekDone.
    sp<AMessage> mNotify;
    // When mUIDValid is set, the RTP/RTCP sockets are tagged and marked
    // with mUID.
    bool mUIDValid;
    uid_t mUID;
    sp<ALooper> mNetLooper;
    // RTSP control connection used to send all requests.
    sp<ARTSPConnection> mConn;
    sp<ARTPConnection> mRTPConn;
    sp<ASessionDescription> mSessionDesc;
    AString mOriginalSessionURL;  // This one still has user:pass@
    // Request URL of the OPTIONS (keep-alive) and TEARDOWN requests.
    AString mSessionURL;
    AString mSessionHost;
    // Base against which the per-track "a=control" URLs are resolved.
    AString mBaseURL;
    // Request URL of the PLAY and PAUSE requests.
    AString mControlURL;
    // Value sent in the "Session:" header of requests.
    AString mSessionID;
    bool mSetupTracksSuccessful;
    // True from the moment a seek is requested until the PLAY response for
    // it has been handled; access units arriving meanwhile are dropped.
    bool mSeekPending;
    bool mFirstAccessUnit;

    bool mAllTracksHaveTime;
    int64_t mNTPAnchorUs;
    int64_t mMediaAnchorUs;
    int64_t mLastMediaTimeUs;

    // Access units counted since the last 'chek' message; a 'chek' that
    // finds this at zero aborts the session.
    int64_t mNumAccessUnitsReceived;
    // While set, postAccessUnitTimeoutCheck() does not post a new 'chek'.
    bool mCheckPending;
    // 'chek' messages carrying a different generation are ignored.
    int32_t mCheckGeneration;
    // 'tiou' messages carrying a different generation are ignored.
    int32_t mCheckTimeoutGeneration;
    // When set, setupTrack() requests RTP over interleaved TCP, not UDP.
    bool mTryTCPInterleaving;
    bool mTryFakeRTCP;
    bool mReceivedFirstRTCPPacket;
    bool mReceivedFirstRTPPacket;
    // Pause and seek requests are ignored while this is false.
    bool mSeekable;
    // Keep-alive requests are scheduled after 9/10 of this interval.
    int64_t mKeepAliveTimeoutUs;
    // 'aliv' / 'opts' messages carrying a different generation are ignored.
    int32_t mKeepAliveGeneration;
    // Set once a PAUSE request has been issued; cleared again after the
    // PLAY response of a resume or seek has been handled.
    bool mPausing;
    // 'paus' messages carrying a different generation are ignored.
    int32_t mPauseGeneration;

    Vector<TrackInfo> mTracks;

    // Set by parsePlayResponse() as soon as it has been called.
    bool mPlayResponseParsed;
1608 
setupTrackMyHandler1609     void setupTrack(size_t index) {
1610         sp<APacketSource> source =
1611             new APacketSource(mSessionDesc, index);
1612 
1613         if (source->initCheck() != OK) {
1614             ALOGW("Unsupported format. Ignoring track #%zu.", index);
1615 
1616             sp<AMessage> reply = new AMessage('setu', this);
1617             reply->setSize("index", index);
1618             reply->setInt32("result", ERROR_UNSUPPORTED);
1619             reply->post();
1620             return;
1621         }
1622 
1623         AString url;
1624         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1625 
1626         AString trackURL;
1627         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1628 
1629         mTracks.push(TrackInfo());
1630         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1631         info->mURL = trackURL;
1632         info->mPacketSource = source;
1633         info->mUsingInterleavedTCP = false;
1634         info->mFirstSeqNumInSegment = 0;
1635         info->mNewSegment = true;
1636         info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1637         info->mRTPSocket = -1;
1638         info->mRTCPSocket = -1;
1639         info->mRTPAnchor = 0;
1640         info->mNTPAnchorUs = -1;
1641         info->mNormalPlayTimeRTP = 0;
1642         info->mNormalPlayTimeUs = 0ll;
1643 
1644         unsigned long PT;
1645         AString formatDesc;
1646         AString formatParams;
1647         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1648 
1649         int32_t timescale;
1650         int32_t numChannels;
1651         ASessionDescription::ParseFormatDesc(
1652                 formatDesc.c_str(), &timescale, &numChannels);
1653 
1654         info->mTimeScale = timescale;
1655         info->mEOSReceived = false;
1656 
1657         ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1658 
1659         AString request = "SETUP ";
1660         request.append(trackURL);
1661         request.append(" RTSP/1.0\r\n");
1662 
1663         if (mTryTCPInterleaving) {
1664             size_t interleaveIndex = 2 * (mTracks.size() - 1);
1665             info->mUsingInterleavedTCP = true;
1666             info->mRTPSocket = interleaveIndex;
1667             info->mRTCPSocket = interleaveIndex + 1;
1668 
1669             request.append("Transport: RTP/AVP/TCP;interleaved=");
1670             request.append(interleaveIndex);
1671             request.append("-");
1672             request.append(interleaveIndex + 1);
1673         } else {
1674             unsigned rtpPort;
1675             ARTPConnection::MakePortPair(
1676                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1677 
1678             if (mUIDValid) {
1679                 NetworkUtils::RegisterSocketUserTag(info->mRTPSocket, mUID,
1680                         (uint32_t)*(uint32_t*) "RTP_");
1681                 NetworkUtils::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1682                         (uint32_t)*(uint32_t*) "RTP_");
1683                 NetworkUtils::RegisterSocketUserMark(info->mRTPSocket, mUID);
1684                 NetworkUtils::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1685             }
1686 
1687             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1688             request.append(rtpPort);
1689             request.append("-");
1690             request.append(rtpPort + 1);
1691         }
1692 
1693         request.append("\r\n");
1694 
1695         if (index > 1) {
1696             request.append("Session: ");
1697             request.append(mSessionID);
1698             request.append("\r\n");
1699         }
1700 
1701         request.append("\r\n");
1702 
1703         sp<AMessage> reply = new AMessage('setu', this);
1704         reply->setSize("index", index);
1705         reply->setSize("track-index", mTracks.size() - 1);
1706         mConn->sendRequest(request.c_str(), reply);
1707     }
1708 
MakeURLMyHandler1709     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1710         out->clear();
1711 
1712         if (strncasecmp("rtsp://", baseURL, 7)) {
1713             // Base URL must be absolute
1714             return false;
1715         }
1716 
1717         if (!strncasecmp("rtsp://", url, 7)) {
1718             // "url" is already an absolute URL, ignore base URL.
1719             out->setTo(url);
1720             return true;
1721         }
1722 
1723         size_t n = strlen(baseURL);
1724         out->setTo(baseURL);
1725         if (baseURL[n - 1] != '/') {
1726             out->append("/");
1727         }
1728         out->append(url);
1729 
1730         return true;
1731     }
1732 
fakeTimestampsMyHandler1733     void fakeTimestamps() {
1734         mNTPAnchorUs = -1ll;
1735         for (size_t i = 0; i < mTracks.size(); ++i) {
1736             onTimeUpdate(i, 0, 0ll);
1737         }
1738     }
1739 
dataReceivedOnAllChannelsMyHandler1740     bool dataReceivedOnAllChannels() {
1741         TrackInfo *track;
1742         for (size_t i = 0; i < mTracks.size(); ++i) {
1743             track = &mTracks.editItemAt(i);
1744             if (track->mPackets.empty()) {
1745                 return false;
1746             }
1747         }
1748         return true;
1749     }
1750 
handleFirstAccessUnitMyHandler1751     void handleFirstAccessUnit() {
1752         if (mFirstAccessUnit) {
1753             sp<AMessage> msg = mNotify->dup();
1754             msg->setInt32("what", kWhatConnected);
1755             msg->post();
1756 
1757             if (mSeekable) {
1758                 for (size_t i = 0; i < mTracks.size(); ++i) {
1759                     TrackInfo *info = &mTracks.editItemAt(i);
1760 
1761                     postNormalPlayTimeMapping(
1762                             i,
1763                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1764                 }
1765             }
1766 
1767             mFirstAccessUnit = false;
1768         }
1769     }
1770 
onTimeUpdateMyHandler1771     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1772         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1773              trackIndex, rtpTime, (long long)ntpTime);
1774 
1775         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1776 
1777         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1778 
1779         track->mRTPAnchor = rtpTime;
1780         track->mNTPAnchorUs = ntpTimeUs;
1781 
1782         if (mNTPAnchorUs < 0) {
1783             mNTPAnchorUs = ntpTimeUs;
1784             mMediaAnchorUs = mLastMediaTimeUs;
1785         }
1786 
1787         if (!mAllTracksHaveTime) {
1788             bool allTracksHaveTime = (mTracks.size() > 0);
1789             for (size_t i = 0; i < mTracks.size(); ++i) {
1790                 TrackInfo *track = &mTracks.editItemAt(i);
1791                 if (track->mNTPAnchorUs < 0) {
1792                     allTracksHaveTime = false;
1793                     break;
1794                 }
1795             }
1796             if (allTracksHaveTime) {
1797                 mAllTracksHaveTime = true;
1798                 ALOGI("Time now established for all tracks.");
1799             }
1800         }
1801         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1802             handleFirstAccessUnit();
1803 
1804             // Time is now established, lets start timestamping immediately
1805             for (size_t i = 0; i < mTracks.size(); ++i) {
1806                 if (OK != processAccessUnitQueue(i)) {
1807                     return;
1808                 }
1809             }
1810             for (size_t i = 0; i < mTracks.size(); ++i) {
1811                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1812                 if (trackInfo->mEOSReceived) {
1813                     postQueueEOS(i, ERROR_END_OF_STREAM);
1814                     trackInfo->mEOSReceived = false;
1815                 }
1816             }
1817         }
1818     }
1819 
processAccessUnitQueueMyHandler1820     status_t processAccessUnitQueue(int32_t trackIndex) {
1821         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1822         while (!track->mPackets.empty()) {
1823             sp<ABuffer> accessUnit = *track->mPackets.begin();
1824             track->mPackets.erase(track->mPackets.begin());
1825 
1826             uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1827             if (track->mNewSegment) {
1828                 // The sequence number from RTP packet has only 16 bits and is extended
1829                 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1830                 // RTSP "PLAY" command should be used to detect the first RTP packet
1831                 // after seeking.
1832                 if (mSeekable) {
1833                     if (track->mAllowedStaleAccessUnits > 0) {
1834                         uint32_t seqNum16 = seqNum & 0xffff;
1835                         uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1836                         if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
1837                                 || seqNum16 < firstSeqNumInSegment16) {
1838                             // Not the first rtp packet of the stream after seeking, discarding.
1839                             track->mAllowedStaleAccessUnits--;
1840                             ALOGV("discarding stale access unit (0x%x : 0x%x)",
1841                                  seqNum, track->mFirstSeqNumInSegment);
1842                             continue;
1843                         }
1844                         ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1845                                 "Missing the first packet(%u), now take packet(%u) as first one",
1846                                 track->mFirstSeqNumInSegment, seqNum);
1847                     } else { // track->mAllowedStaleAccessUnits <= 0
1848                         mNumAccessUnitsReceived = 0;
1849                         ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1850                              "Still no first rtp packet after %d stale ones",
1851                              kMaxAllowedStaleAccessUnits);
1852                         track->mAllowedStaleAccessUnits = -1;
1853                         return UNKNOWN_ERROR;
1854                     }
1855                 }
1856 
1857                 // Now found the first rtp packet of the stream after seeking.
1858                 track->mFirstSeqNumInSegment = seqNum;
1859                 track->mNewSegment = false;
1860             }
1861 
1862             if (seqNum < track->mFirstSeqNumInSegment) {
1863                 ALOGV("dropping stale access-unit (%d < %d)",
1864                      seqNum, track->mFirstSeqNumInSegment);
1865                 continue;
1866             }
1867 
1868             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1869                 postQueueAccessUnit(trackIndex, accessUnit);
1870             }
1871         }
1872         return OK;
1873     }
1874 
onAccessUnitCompleteMyHandler1875     void onAccessUnitComplete(
1876             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1877         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1878         track->mPackets.push_back(accessUnit);
1879 
1880         uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1881         ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1882 
1883         if(!mPlayResponseParsed){
1884             ALOGV("play response is not parsed");
1885             return;
1886         }
1887 
1888         handleFirstAccessUnit();
1889 
1890         if (!mAllTracksHaveTime) {
1891             ALOGV("storing accessUnit, no time established yet");
1892             return;
1893         }
1894 
1895         if (OK != processAccessUnitQueue(trackIndex)) {
1896             return;
1897         }
1898 
1899         if (track->mEOSReceived) {
1900             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1901             track->mEOSReceived = false;
1902         }
1903     }
1904 
addMediaTimestampMyHandler1905     bool addMediaTimestamp(
1906             int32_t trackIndex, const TrackInfo *track,
1907             const sp<ABuffer> &accessUnit) {
1908         UNUSED_UNLESS_VERBOSE(trackIndex);
1909 
1910         uint32_t rtpTime;
1911         CHECK(accessUnit->meta()->findInt32(
1912                     "rtp-time", (int32_t *)&rtpTime));
1913 
1914         int64_t relRtpTimeUs =
1915             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1916                 / track->mTimeScale;
1917 
1918         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1919 
1920         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1921 
1922         if (mediaTimeUs > mLastMediaTimeUs) {
1923             mLastMediaTimeUs = mediaTimeUs;
1924         }
1925 
1926         if (mediaTimeUs < 0 && !mSeekable) {
1927             ALOGV("dropping early accessUnit.");
1928             return false;
1929         }
1930 
1931         ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1932              trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1933 
1934         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1935 
1936         return true;
1937     }
1938 
postQueueAccessUnitMyHandler1939     void postQueueAccessUnit(
1940             size_t trackIndex, const sp<ABuffer> &accessUnit) {
1941         sp<AMessage> msg = mNotify->dup();
1942         msg->setInt32("what", kWhatAccessUnit);
1943         msg->setSize("trackIndex", trackIndex);
1944         msg->setBuffer("accessUnit", accessUnit);
1945         msg->post();
1946     }
1947 
postQueueEOSMyHandler1948     void postQueueEOS(size_t trackIndex, status_t finalResult) {
1949         sp<AMessage> msg = mNotify->dup();
1950         msg->setInt32("what", kWhatEOS);
1951         msg->setSize("trackIndex", trackIndex);
1952         msg->setInt32("finalResult", finalResult);
1953         msg->post();
1954     }
1955 
postQueueSeekDiscontinuityMyHandler1956     void postQueueSeekDiscontinuity(size_t trackIndex) {
1957         sp<AMessage> msg = mNotify->dup();
1958         msg->setInt32("what", kWhatSeekDiscontinuity);
1959         msg->setSize("trackIndex", trackIndex);
1960         msg->post();
1961     }
1962 
postNormalPlayTimeMappingMyHandler1963     void postNormalPlayTimeMapping(
1964             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1965         sp<AMessage> msg = mNotify->dup();
1966         msg->setInt32("what", kWhatNormalPlayTimeMapping);
1967         msg->setSize("trackIndex", trackIndex);
1968         msg->setInt32("rtpTime", rtpTime);
1969         msg->setInt64("nptUs", nptUs);
1970         msg->post();
1971     }
1972 
postTimeoutMyHandler1973     void postTimeout() {
1974         sp<AMessage> timeout = new AMessage('tiou', this);
1975         mCheckTimeoutGeneration++;
1976         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1977 
1978         int64_t startupTimeoutUs;
1979         startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1980         timeout->post(startupTimeoutUs);
1981     }
1982 
1983     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1984 };
1985 
1986 }  // namespace android
1987 
1988 #endif  // MY_HANDLER_H_
1989