1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
34import threading
35import time
36import unittest
37
38from datetime import datetime
39
40import adb
41
def requires_root(func):
    """Decorate a test method so that it runs while adbd is root.

    The decorated method is skipped (unittest.SkipTest) when the device's
    ``ro.debuggable`` property is not ``'1'``. If ``id -un`` on the device does
    not already report ``root``, the device is switched to root before the
    test and switched back afterwards, even if the test raises.

    Args:
      func: The test method to wrap; it is called as ``func(self, ...)`` and
          ``self.device`` must be the device under test.

    Returns:
      The wrapping function, carrying the name and docstring of ``func``.
    """
    # Imported locally so this block does not depend on the file-level imports.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore the original privilege level only if we changed it.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a test method so that it runs while adbd is not root.

    If ``id -un`` on the device reports ``root``, the device is unrooted
    before the test and rooted again afterwards, even if the test raises.

    Args:
      func: The test method to wrap; it is called as ``func(self, ...)`` and
          ``self.device`` must be the device under test.

    Returns:
      The wrapping function, carrying the name and docstring of ``func``.
    """
    # Imported locally so this block does not depend on the file-level imports.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore the original privilege level only if we changed it.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class DeviceTest(unittest.TestCase):
    """Common base class for the device-facing test cases in this file."""
    # Handle shared by all subclasses. adb.get_device() is evaluated once,
    # when this class body is executed (i.e. when the module is loaded).
    device = adb.get_device()
81
82
class AbbTest(DeviceTest):
    """Compares the output of `adb abb` with that of `adb shell cmd`."""

    def test_smoke(self):
        """Run both commands without arguments and compare their results."""
        def run_adb(*argv):
            return subprocess.run(['adb', *argv], capture_output=True)

        abb_result = run_adb('abb')
        cmd_result = run_adb('shell', 'cmd')

        # abb squashes all failures to 1, so only success vs. failure is
        # compared rather than the exact exit codes.
        abb_ok = abb_result.returncode == 0
        cmd_ok = cmd_result.returncode == 0
        self.assertEqual(abb_ok, cmd_ok)
        self.assertEqual(abb_result.stdout, cmd_result.stdout)
        self.assertEqual(abb_result.stderr, cmd_result.stderr)
92
class ForwardReverseTest(DeviceTest):
    """Tests for the `adb forward` and `adb reverse` socket mappings.

    Tests that require an empty mapping list check that precondition first
    and, once past it, always remove the mappings they created, so that one
    failing assertion does not break the precondition of later tests.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                       direction_no_rebind, direction_remove_all):
        """Exercise --no-rebind behavior for one mapping direction.

        Args:
          description: Direction name used in the precondition message.
          direction_list: Callable returning the current mapping list as text.
          direction: Callable taking two socket specs that creates a mapping,
              replacing an existing one for the same first spec.
          direction_no_rebind: Callable taking two socket specs that creates a
              mapping and is expected to raise subprocess.CalledProcessError
              when the first spec is already bound.
          direction_remove_all: Callable that removes every mapping.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))

            direction_remove_all()
            msg = direction_list()
            self.assertEqual('', msg.strip())
        finally:
            # Clean up even when an assertion above failed.
            direction_remove_all()

    def test_forward_no_rebind(self):
        """Check --no-rebind handling for `adb forward`."""
        self._test_no_rebind('forward', self.device.forward_list,
                            self.device.forward, self.device.forward_no_rebind,
                            self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        """Check --no-rebind handling for `adb reverse`."""
        self._test_no_rebind('reverse', self.device.reverse_list,
                            self.device.reverse, self.device.reverse_no_rebind,
                            self.device.reverse_remove_all)

    def test_forward(self):
        """Add two forwards, remove one, then remove all, checking the list."""
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')
        try:
            self.device.forward('tcp:5566', 'tcp:6655')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.forward('tcp:7788', 'tcp:8877')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove('tcp:5566')
            msg = self.device.forward_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove_all()
            msg = self.device.forward_list()
            self.assertEqual('', msg.strip())
        finally:
            # Clean up even when an assertion above failed.
            self.device.forward_remove_all()

    def test_forward_old_protocol(self):
        """Set up a forward by writing a host-serial request to the server.

        The request is sent as a raw length-prefixed service string over a TCP
        connection to localhost:5037 instead of going through the adb client.
        """
        serialno = subprocess.check_output(
            self.device.adb_cmd + ['get-serialno']).strip()

        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            s = socket.create_connection(("localhost", 5037))
            # Keep the connection open for the whole check, but make sure it
            # is closed afterwards instead of being left to the collector.
            with contextlib.closing(s):
                service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
                # Four hex digits of payload length, then the payload.
                cmd = b"%04x%s" % (len(service), service)
                s.sendall(cmd)

                msg = self.device.forward_list()
                self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

                self.device.forward_remove_all()
                msg = self.device.forward_list()
                self.assertEqual('', msg.strip())
        finally:
            # Clean up even when an assertion above failed.
            self.device.forward_remove_all()

    def test_forward_tcp_port_0(self):
        """Forward tcp:0 and check that the reported port shows in the list."""
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        """Add two reverses, remove one, then remove all, checking the list."""
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip(),
                         'Reverse forwarding list must be empty to run this test.')
        try:
            self.device.reverse('tcp:5566', 'tcp:6655')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.reverse('tcp:7788', 'tcp:8877')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove('tcp:5566')
            msg = self.device.reverse_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove_all()
            msg = self.device.reverse_list()
            self.assertEqual('', msg.strip())
        finally:
            # Clean up even when an assertion above failed.
            self.device.reverse_remove_all()

    def test_reverse_tcp_port_0(self):
        """Reverse tcp:0 and check that the reported port shows in the list."""
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        # Track what was actually set up so cleanup only undoes those steps.
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile().read().encode("utf8"))
        finally:
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
272
273
class ShellTest(DeviceTest):
    """Tests for `adb shell`: output, exit codes, PTYs, stdin and signals."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: bytes input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
                self.device.adb_cmd + ['shell'] + shell_args,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + b'; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        """shell() must raise adb.ShellError when the command fails."""
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        """shell() must return stdout including the trailing line ending."""
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_command_length(self):
        """Echo a 16384-character argument and expect it back unchanged."""
        # Devices that have shell_v2 should be able to handle long commands.
        if self.device.has_shell_protocol():
            rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384])
            self.assertEqual(rc, 0)
            # assertEqual (rather than assertTrue on ==) reports the mismatch.
            self.assertEqual(out, 'x' * 16384 + '\n')

    def test_shell_nocheck_failure(self):
        """shell_nocheck() reports a non-zero code instead of raising."""
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck() must return stdout including the line ending."""
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """Output that looks like an exit code must stay separate from it."""
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh.
        """
        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None lets the child inherit this process's stdin.
            terminal = subprocess.Popen(
                    test_cmd, stdin=None,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                    test_cmd, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # These tests require a new device.
        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
            # No args: PTY only if stdin is a terminal and shell is interactive,
            # which is difficult to reliably test from a script.
            self.assertEqual((False, False), check_pty([]))

            # -t: PTY if stdin is a terminal.
            self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

        # -tt: always allocate PTY, POSIX style (http://b/32216152).
        self.assertEqual((True, True), check_pty(['-tt']))

        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
        # we follow the man page instead.
        self.assertEqual((True, True), check_pty(['-ttt']))

        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
        self.assertEqual((True, True), check_pty(['-ttx']))

        # -Ttt: -tt cancels out -T.
        self.assertEqual((True, True), check_pty(['-Ttt']))

        # -ttT: -T cancels out -tt.
        self.assertEqual((False, False), check_pty(['-ttT']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
                shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], b'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
                shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)
        # The remote shell prints its own pid first (`echo $$`).
        remote_pid = sleep_proc.stdout.readline().strip().decode("utf8")
        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

        # Verify that the process is running, send signal, verify it stopped.
        self.device.shell(proc_query)
        os.kill(sleep_proc.pid, signal.SIGINT)
        sleep_proc.communicate()

        # It can take some time for the process to receive the signal and die.
        end_time = time.time() + 3
        while self.device.shell_nocheck(proc_query)[0] != 1:
            self.assertFalse(time.time() > end_time,
                             'subprocess failed to terminate in time')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = b'foo'
        characters = [c.encode("utf8") for c in string.ascii_letters + string.digits]
        large_input = b'\n'.join(characters)

        # Named stdin_data so the builtin input() is not shadowed.
        for stdin_data in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(stdin_data)
            self.assertEqual(stdin_data.splitlines(), stdout.splitlines())
            self.assertEqual(b'', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            read
        """.format(path=log_path)

        # Collapse the script into a single ';'-separated command line.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        process = self.device.shell_popen([script], kill_atexit=False,
                                          stdin=subprocess.PIPE,
                                          stdout=subprocess.PIPE)

        self.assertEqual(b"Waiting\n", process.stdout.readline())
        process.send_signal(signal.SIGINT)
        process.wait()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(1)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")

    def test_exit_stress(self):
        """Hammer `adb shell exit 42` with multiple threads."""
        thread_count = 48
        result = dict()
        def hammer(thread_idx, thread_count, result):
            # Each worker checks the exit codes thread_idx, thread_idx + 48, ...
            # below 240 and records a single pass/fail flag.
            success = True
            for i in range(thread_idx, 240, thread_count):
                ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)])
                if ret != i % 256:
                    success = False
                    break
            result[thread_idx] = success

        threads = []
        for i in range(thread_count):
            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()
        # A worker that died with an exception never stores a result; count a
        # missing entry as a failure instead of silently ignoring it.
        for i in range(thread_count):
            self.assertTrue(result.get(i, False),
                            'thread {} did not succeed'.format(i))

    def disabled_test_parallel(self):
        """Spawn a bunch of `adb shell` instances in parallel.

        This was broken historically due to the use of select, which only works
        for fds that are numerically less than 1024.

        Bug: http://b/141955761"""

        n_procs = 2048
        procs = dict()
        for i in range(0, n_procs):
            procs[i] = subprocess.Popen(
                ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE
            )

        # The pipes are binary and buffered: write bytes and flush, otherwise
        # the data never reaches the child and readline() below blocks.
        for i in range(0, n_procs):
            procs[i].stdin.write(b"%d\n" % i)
            procs[i].stdin.flush()

        for i in range(0, n_procs):
            response = procs[i].stdout.readline()
            self.assertEqual(b"%d\n" % i, response)

        for i in range(0, n_procs):
            procs[i].stdin.write(b"%d\n" % (i % 256))
            procs[i].stdin.flush()

        for i in range(0, n_procs):
            self.assertEqual(i % 256, procs[i].wait())
578
579
class ArgumentEscapingTest(DeviceTest):
    """Tests how arguments are escaped for `adb shell` and `adb install`."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"):
            # A bytes suffix makes tf.name bytes as well.
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit codes.
                try:
                    output = self.device.install(tf.name.decode("utf8"))
                except subprocess.CalledProcessError as e:
                    output = e.output

                # file_suffix is bytes; if the output came back as text, encode
                # it so that assertIn compares like with like instead of
                # raising TypeError.
                if isinstance(output, str):
                    output = output.encode("utf8")
                self.assertIn(file_suffix, output)
            finally:
                # Remove the temporary file even when the assertion fails.
                os.remove(tf.name)
628
629
class RootUnrootTest(DeviceTest):
    """Checks `adb root` / `adb unroot` against the user reported by id(1)."""

    def _current_user(self):
        """Return the stripped output of `id -un` on the device."""
        stdout = self.device.shell(['id', '-un'])[0]
        return stdout.strip()

    def _test_root(self):
        """Switch to root and verify it, unless the build refuses to."""
        root_output = self.device.root()
        if 'adbd cannot run as root in production builds' not in root_output:
            self.device.wait()
            self.assertEqual('root', self._current_user())

    def _test_unroot(self):
        """Switch back to the shell user and verify it."""
        self.device.unroot()
        self.device.wait()
        self.assertEqual('shell', self._current_user())

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        debuggable = self.device.get_prop('ro.debuggable')
        if debuggable != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self._current_user()
        started_as_root = original_user == 'root'
        try:
            # Toggle away from the starting state first, then back again.
            if started_as_root:
                self._test_unroot()
                self._test_root()
            elif original_user == 'shell':
                self._test_root()
                self._test_unroot()
        finally:
            # Put the device back into the state it started in.
            if started_as_root:
                self.device.root()
            else:
                self.device.unroot()
            self.device.wait()
662
663
class TcpIpTest(DeviceTest):
    """Checks error reporting of `adb tcpip`."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        An empty or non-numeric port must raise CalledProcessError.

        Bug: http://b/22636927
        """
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
674
675
class SystemPropertiesTest(DeviceTest):
    """Checks reading and writing of system properties."""

    def test_get_prop(self):
        """init.svc.adbd must be reported as 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """Set foo.bar through set_prop() and read it back with getprop."""
        prop_name = 'foo.bar'
        # Reset the property before setting the value under test.
        reset_cmd = ['setprop', prop_name, '""']
        self.device.shell(reset_cmd)

        self.device.set_prop(prop_name, 'qux')
        getprop_stdout = self.device.shell(['getprop', prop_name])[0]
        self.assertEqual(getprop_stdout.strip(), 'qux')
688
689
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the bytes-like object `string`."""
    return hashlib.md5(string).hexdigest()
694
695
def get_md5_prog(device):
    """Older platforms (pre-L) had the name md5 rather than md5sum.

    Probes the device by running `md5sum /proc/uptime`; returns 'md5sum' when
    that succeeds and 'md5' when it raises adb.ShellError.
    """
    probe_cmd = ['md5sum', '/proc/uptime']
    try:
        device.shell(probe_cmd)
    except adb.ShellError:
        return 'md5'
    else:
        return 'md5sum'
703
704
class HostFile(object):
    """Record describing a file on the host together with its checksum."""

    def __init__(self, handle, checksum):
        """Store the handle and checksum; derive the paths from handle.name."""
        path = handle.name
        self.handle, self.checksum = handle, checksum
        self.full_path = path
        self.base_name = os.path.basename(path)
711
712
class DeviceFile(object):
    """Record describing a file on the device together with its checksum."""

    def __init__(self, checksum, full_path):
        """Store checksum and path; the base name uses POSIX path rules."""
        self.checksum, self.full_path = checksum, full_path
        self.base_name = posixpath.basename(full_path)
718
719
def make_random_host_files(in_dir, num_files):
    """Create `num_files` files of random bytes in the host directory `in_dir`.

    Each file gets between 1 KiB and 15 KiB of os.urandom() data, in 1 KiB
    steps. The files are not deleted when closed; removing them is left to
    the caller.

    Args:
      in_dir: Host directory in which the files are created.
      num_files: Number of files to create.

    Returns:
      A list of HostFile objects, each holding the (closed) file handle and
      the MD5 hex digest of the written data.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    for _ in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        # The with-block flushes and closes the handle even if write() fails.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            file_handle.write(rand_str)

        files.append(HostFile(file_handle, compute_md5(rand_str)))
    return files
737
738
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create `num_files` files of random data in `in_dir` on the device.

    Each file is written on the device with `dd if=/dev/urandom` using a
    block size between 1 KiB and 15 KiB (1 KiB steps) and a count of 1, and is
    then checksummed on the device.

    Args:
      device: Device object whose shell() runs the commands.
      in_dir: Device directory in which the files are created.
      num_files: Number of files to create.
      prefix: File name prefix; the file index is appended to it.

    Returns:
      A list of DeviceFile objects with the device-side MD5 and full path.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # The checksum tool name is the same for every file; probe the device at
    # most once, and only if there is at least one file to create.
    md5_prog = None

    files = []
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        if md5_prog is None:
            md5_prog = get_md5_prog(device)
        # The tool output is split into two fields; the first is the digest.
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
756
757
758class FileOperationsTest:
759    class Base(DeviceTest):
760        SCRATCH_DIR = '/data/local/tmp'
761        DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
762        DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
763
764        def setUp(self):
765            self.previous_env = os.environ.get("ADB_COMPRESSION")
766            os.environ["ADB_COMPRESSION"] = self.compression
767
768        def tearDown(self):
769            if self.previous_env is None:
770                del os.environ["ADB_COMPRESSION"]
771            else:
772                os.environ["ADB_COMPRESSION"] = self.previous_env
773
774        def _verify_remote(self, checksum, remote_path):
775            dev_md5, _ = self.device.shell([get_md5_prog(self.device),
776                                            remote_path])[0].split()
777            self.assertEqual(checksum, dev_md5)
778
779        def _verify_local(self, checksum, local_path):
780            with open(local_path, 'rb') as host_file:
781                host_md5 = compute_md5(host_file.read())
782                self.assertEqual(host_md5, checksum)
783
784        def test_push(self):
785            """Push a randomly generated file to specified device."""
786            kbytes = 512
787            tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
788            rand_str = os.urandom(1024 * kbytes)
789            tmp.write(rand_str)
790            tmp.close()
791
792            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
793            self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
794
795            self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
796            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
797
798            os.remove(tmp.name)
799
800        def test_push_dir(self):
801            """Push a randomly generated directory of files to the device."""
802            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
803            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
804
805            try:
806                host_dir = tempfile.mkdtemp()
807
808                # Make sure the temp directory isn't setuid, or else adb will complain.
809                os.chmod(host_dir, 0o700)
810
811                # Create 32 random files.
812                temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
813                self.device.push(host_dir, self.DEVICE_TEMP_DIR)
814
815                for temp_file in temp_files:
816                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
817                                                 os.path.basename(host_dir),
818                                                 temp_file.base_name)
819                    self._verify_remote(temp_file.checksum, remote_path)
820                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
821            finally:
822                if host_dir is not None:
823                    shutil.rmtree(host_dir)
824
825        def disabled_test_push_empty(self):
826            """Push an empty directory to the device."""
827            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
828            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
829
830            try:
831                host_dir = tempfile.mkdtemp()
832
833                # Make sure the temp directory isn't setuid, or else adb will complain.
834                os.chmod(host_dir, 0o700)
835
836                # Create an empty directory.
837                empty_dir_path = os.path.join(host_dir, 'empty')
838                os.mkdir(empty_dir_path);
839
840                self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)
841
842                remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty")
843                test_empty_cmd = ["[", "-d", remote_path, "]"]
844                rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
845
846                self.assertEqual(rc, 0)
847                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
848            finally:
849                if host_dir is not None:
850                    shutil.rmtree(host_dir)
851
852        @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
853        def test_push_symlink(self):
854            """Push a symlink.
855
856            Bug: http://b/31491920
857            """
858            try:
859                host_dir = tempfile.mkdtemp()
860
861                # Make sure the temp directory isn't setuid, or else adb will
862                # complain.
863                os.chmod(host_dir, 0o700)
864
865                with open(os.path.join(host_dir, 'foo'), 'w') as f:
866                    f.write('foo')
867
868                symlink_path = os.path.join(host_dir, 'symlink')
869                os.symlink('foo', symlink_path)
870
871                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
872                self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
873                self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
874                rc, out, _ = self.device.shell_nocheck(
875                    ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
876                self.assertEqual(0, rc)
877                self.assertEqual(out.strip(), 'foo')
878            finally:
879                if host_dir is not None:
880                    shutil.rmtree(host_dir)
881
882        def test_multiple_push(self):
883            """Push multiple files to the device in one adb push command.
884
885            Bug: http://b/25324823
886            """
887
888            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
889            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
890
891            try:
892                host_dir = tempfile.mkdtemp()
893
894                # Create some random files and a subdirectory containing more files.
895                temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
896
897                subdir = os.path.join(host_dir, 'subdir')
898                os.mkdir(subdir)
899                subdir_temp_files = make_random_host_files(in_dir=subdir,
900                                                           num_files=4)
901
902                paths = [x.full_path for x in temp_files]
903                paths.append(subdir)
904                self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
905
906                for temp_file in temp_files:
907                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
908                                                 temp_file.base_name)
909                    self._verify_remote(temp_file.checksum, remote_path)
910
911                for subdir_temp_file in subdir_temp_files:
912                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
913                                                 # BROKEN: http://b/25394682
914                                                 # 'subdir';
915                                                 temp_file.base_name)
916                    self._verify_remote(temp_file.checksum, remote_path)
917
918
919                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
920            finally:
921                if host_dir is not None:
922                    shutil.rmtree(host_dir)
923
924        @requires_non_root
925        def test_push_error_reporting(self):
926            """Make sure that errors that occur while pushing a file get reported
927
928            Bug: http://b/26816782
929            """
930            with tempfile.NamedTemporaryFile() as tmp_file:
931                tmp_file.write(b'\0' * 1024 * 1024)
932                tmp_file.flush()
933                try:
934                    self.device.push(local=tmp_file.name, remote='/system/')
935                    self.fail('push should not have succeeded')
936                except subprocess.CalledProcessError as e:
937                    output = e.output
938
939                self.assertTrue(b'Permission denied' in output or
940                                b'Read-only file system' in output)
941
942        @requires_non_root
943        def test_push_directory_creation(self):
944            """Regression test for directory creation.
945
946            Bug: http://b/110953234
947            """
948            with tempfile.NamedTemporaryFile() as tmp_file:
949                tmp_file.write(b'\0' * 1024 * 1024)
950                tmp_file.flush()
951                remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
952                self.device.shell(['rm', '-rf', remote_path])
953
954                remote_path += '/filename'
955                self.device.push(local=tmp_file.name, remote=remote_path)
956
957        def disabled_test_push_multiple_slash_root(self):
958            """Regression test for pushing to //data/local/tmp.
959
960            Bug: http://b/141311284
961
962            Disabled because this broken on the adbd side as well: b/141943968
963            """
964            with tempfile.NamedTemporaryFile() as tmp_file:
965                tmp_file.write('\0' * 1024 * 1024)
966                tmp_file.flush()
967                remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
968                self.device.shell(['rm', '-rf', remote_path])
969                self.device.push(local=tmp_file.name, remote=remote_path)
970
971        def _test_pull(self, remote_file, checksum):
972            tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
973            tmp_write.close()
974            self.device.pull(remote=remote_file, local=tmp_write.name)
975            with open(tmp_write.name, 'rb') as tmp_read:
976                host_contents = tmp_read.read()
977                host_md5 = compute_md5(host_contents)
978            self.assertEqual(checksum, host_md5)
979            os.remove(tmp_write.name)
980
981        @requires_non_root
982        def test_pull_error_reporting(self):
983            self.device.shell(['touch', self.DEVICE_TEMP_FILE])
984            self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
985
986            try:
987                output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
988            except subprocess.CalledProcessError as e:
989                output = e.output
990
991            self.assertIn(b'Permission denied', output)
992
993            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
994
995        def test_pull(self):
996            """Pull a randomly generated file from specified device."""
997            kbytes = 512
998            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
999            cmd = ['dd', 'if=/dev/urandom',
1000                   'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
1001                   'count={}'.format(kbytes)]
1002            self.device.shell(cmd)
1003            dev_md5, _ = self.device.shell(
1004                [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
1005            self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
1006            self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
1007
1008        def test_pull_dir(self):
1009            """Pull a randomly generated directory of files from the device."""
1010            try:
1011                host_dir = tempfile.mkdtemp()
1012
1013                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1014                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1015
1016                # Populate device directory with random files.
1017                temp_files = make_random_device_files(
1018                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1019
1020                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1021
1022                for temp_file in temp_files:
1023                    host_path = os.path.join(
1024                        host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1025                        temp_file.base_name)
1026                    self._verify_local(temp_file.checksum, host_path)
1027
1028                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1029            finally:
1030                if host_dir is not None:
1031                    shutil.rmtree(host_dir)
1032
1033        def test_pull_dir_symlink(self):
1034            """Pull a directory into a symlink to a directory.
1035
1036            Bug: http://b/27362811
1037            """
1038            if os.name != 'posix':
1039                raise unittest.SkipTest('requires POSIX')
1040
1041            try:
1042                host_dir = tempfile.mkdtemp()
1043                real_dir = os.path.join(host_dir, 'dir')
1044                symlink = os.path.join(host_dir, 'symlink')
1045                os.mkdir(real_dir)
1046                os.symlink(real_dir, symlink)
1047
1048                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1049                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1050
1051                # Populate device directory with random files.
1052                temp_files = make_random_device_files(
1053                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1054
1055                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
1056
1057                for temp_file in temp_files:
1058                    host_path = os.path.join(
1059                        real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1060                        temp_file.base_name)
1061                    self._verify_local(temp_file.checksum, host_path)
1062
1063                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1064            finally:
1065                if host_dir is not None:
1066                    shutil.rmtree(host_dir)
1067
1068        def test_pull_dir_symlink_collision(self):
1069            """Pull a directory into a colliding symlink to directory."""
1070            if os.name != 'posix':
1071                raise unittest.SkipTest('requires POSIX')
1072
1073            try:
1074                host_dir = tempfile.mkdtemp()
1075                real_dir = os.path.join(host_dir, 'real')
1076                tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
1077                symlink = os.path.join(host_dir, tmp_dirname)
1078                os.mkdir(real_dir)
1079                os.symlink(real_dir, symlink)
1080
1081                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1082                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1083
1084                # Populate device directory with random files.
1085                temp_files = make_random_device_files(
1086                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1087
1088                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1089
1090                for temp_file in temp_files:
1091                    host_path = os.path.join(real_dir, temp_file.base_name)
1092                    self._verify_local(temp_file.checksum, host_path)
1093
1094                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1095            finally:
1096                if host_dir is not None:
1097                    shutil.rmtree(host_dir)
1098
1099        def test_pull_dir_nonexistent(self):
1100            """Pull a directory of files from the device to a nonexistent path."""
1101            try:
1102                host_dir = tempfile.mkdtemp()
1103                dest_dir = os.path.join(host_dir, 'dest')
1104
1105                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1106                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1107
1108                # Populate device directory with random files.
1109                temp_files = make_random_device_files(
1110                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1111
1112                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1113
1114                for temp_file in temp_files:
1115                    host_path = os.path.join(dest_dir, temp_file.base_name)
1116                    self._verify_local(temp_file.checksum, host_path)
1117
1118                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1119            finally:
1120                if host_dir is not None:
1121                    shutil.rmtree(host_dir)
1122
1123        # selinux prevents adbd from accessing symlinks on /data/local/tmp.
1124        def disabled_test_pull_symlink_dir(self):
1125            """Pull a symlink to a directory of symlinks to files."""
1126            try:
1127                host_dir = tempfile.mkdtemp()
1128
1129                remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1130                remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1131                remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1132
1133                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1134                self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1135                self.device.shell(['ln', '-s', remote_links, remote_symlink])
1136
1137                # Populate device directory with random files.
1138                temp_files = make_random_device_files(
1139                    self.device, in_dir=remote_dir, num_files=32)
1140
1141                for temp_file in temp_files:
1142                    self.device.shell(
1143                        ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1144                         posixpath.join(remote_links, temp_file.base_name)])
1145
1146                self.device.pull(remote=remote_symlink, local=host_dir)
1147
1148                for temp_file in temp_files:
1149                    host_path = os.path.join(
1150                        host_dir, 'symlink', temp_file.base_name)
1151                    self._verify_local(temp_file.checksum, host_path)
1152
1153                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1154            finally:
1155                if host_dir is not None:
1156                    shutil.rmtree(host_dir)
1157
1158        def test_pull_empty(self):
1159            """Pull a directory containing an empty directory from the device."""
1160            try:
1161                host_dir = tempfile.mkdtemp()
1162
1163                remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1164                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1165                self.device.shell(['mkdir', '-p', remote_empty_path])
1166
1167                self.device.pull(remote=remote_empty_path, local=host_dir)
1168                self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1169            finally:
1170                if host_dir is not None:
1171                    shutil.rmtree(host_dir)
1172
1173        def test_multiple_pull(self):
1174            """Pull a randomly generated directory of files from the device."""
1175
1176            try:
1177                host_dir = tempfile.mkdtemp()
1178
1179                subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1180                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1181                self.device.shell(['mkdir', '-p', subdir])
1182
1183                # Create some random files and a subdirectory containing more files.
1184                temp_files = make_random_device_files(
1185                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1186
1187                subdir_temp_files = make_random_device_files(
1188                    self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1189
1190                paths = [x.full_path for x in temp_files]
1191                paths.append(subdir)
1192                self.device._simple_call(['pull'] + paths + [host_dir])
1193
1194                for temp_file in temp_files:
1195                    local_path = os.path.join(host_dir, temp_file.base_name)
1196                    self._verify_local(temp_file.checksum, local_path)
1197
1198                for subdir_temp_file in subdir_temp_files:
1199                    local_path = os.path.join(host_dir,
1200                                              'subdir',
1201                                              subdir_temp_file.base_name)
1202                    self._verify_local(subdir_temp_file.checksum, local_path)
1203
1204                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1205            finally:
1206                if host_dir is not None:
1207                    shutil.rmtree(host_dir)
1208
1209        def verify_sync(self, device, temp_files, device_dir):
1210            """Verifies that a list of temp files was synced to the device."""
1211            # Confirm that every file on the device mirrors that on the host.
1212            for temp_file in temp_files:
1213                device_full_path = posixpath.join(
1214                    device_dir, temp_file.base_name)
1215                dev_md5, _ = device.shell(
1216                    [get_md5_prog(self.device), device_full_path])[0].split()
1217                self.assertEqual(temp_file.checksum, dev_md5)
1218
1219        def test_sync(self):
1220            """Sync a host directory to the data partition."""
1221
1222            try:
1223                base_dir = tempfile.mkdtemp()
1224
1225                # Create mirror device directory hierarchy within base_dir.
1226                full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1227                os.makedirs(full_dir_path)
1228
1229                # Create 32 random files within the host mirror.
1230                temp_files = make_random_host_files(
1231                    in_dir=full_dir_path, num_files=32)
1232
1233                # Clean up any stale files on the device.
1234                device = adb.get_device()  # pylint: disable=no-member
1235                device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1236
1237                old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1238                os.environ['ANDROID_PRODUCT_OUT'] = base_dir
1239                device.sync('data')
1240                if old_product_out is None:
1241                    del os.environ['ANDROID_PRODUCT_OUT']
1242                else:
1243                    os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
1244
1245                self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
1246
1247                #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1248            finally:
1249                if base_dir is not None:
1250                    shutil.rmtree(base_dir)
1251
1252        def test_push_sync(self):
1253            """Sync a host directory to a specific path."""
1254
1255            try:
1256                temp_dir = tempfile.mkdtemp()
1257                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1258
1259                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1260
1261                # Clean up any stale files on the device.
1262                device = adb.get_device()  # pylint: disable=no-member
1263                device.shell(['rm', '-rf', device_dir])
1264
1265                device.push(temp_dir, device_dir, sync=True)
1266
1267                self.verify_sync(device, temp_files, device_dir)
1268
1269                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1270            finally:
1271                if temp_dir is not None:
1272                    shutil.rmtree(temp_dir)
1273
1274        def test_push_dry_run_nonexistent_file(self):
1275            """Push with dry run."""
1276
1277            for file_size in [8, 1024 * 1024]:
1278                try:
1279                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1280                    device_file = posixpath.join(device_dir, 'file')
1281
1282                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1283                    self.device.shell(['mkdir', '-p', device_dir])
1284
1285                    host_dir = tempfile.mkdtemp()
1286                    host_file = posixpath.join(host_dir, 'file')
1287
1288                    with open(host_file, "w") as f:
1289                        f.write('x' * file_size)
1290
1291                    self.device._simple_call(['push', '-n', host_file, device_file])
1292                    rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']'])
1293                    self.assertNotEqual(0, rc)
1294
1295                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1296                finally:
1297                    if host_dir is not None:
1298                        shutil.rmtree(host_dir)
1299
1300        def test_push_dry_run_existent_file(self):
1301            """Push with dry run."""
1302
1303            for file_size in [8, 1024 * 1024]:
1304                try:
1305                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1306                    device_file = posixpath.join(device_dir, 'file')
1307
1308                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1309                    self.device.shell(['mkdir', '-p', device_dir])
1310                    self.device.shell(['echo', 'foo', '>', device_file])
1311
1312                    host_dir = tempfile.mkdtemp()
1313                    host_file = posixpath.join(host_dir, 'file')
1314
1315                    with open(host_file, "w") as f:
1316                        f.write('x' * file_size)
1317
1318                    self.device._simple_call(['push', '-n', host_file, device_file])
1319                    stdout, stderr = self.device.shell(['cat', device_file])
1320                    self.assertEqual(stdout.strip(), "foo")
1321
1322                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1323                finally:
1324                    if host_dir is not None:
1325                        shutil.rmtree(host_dir)
1326
1327        def test_unicode_paths(self):
1328            """Ensure that we can support non-ASCII paths, even on Windows."""
1329            name = u'로보카 폴리'
1330
1331            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1332            remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1333
1334            ## push.
1335            tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1336            tf.close()
1337            self.device.push(tf.name, remote_path)
1338            os.remove(tf.name)
1339            self.assertFalse(os.path.exists(tf.name))
1340
1341            # Verify that the device ended up with the expected UTF-8 path
1342            output = self.device.shell(
1343                    ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1344            self.assertEqual(remote_path, output)
1345
1346            # pull.
1347            self.device.pull(remote_path, tf.name)
1348            self.assertTrue(os.path.exists(tf.name))
1349            os.remove(tf.name)
1350            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1351
1352
class FileOperationsTestUncompressed(FileOperationsTest.Base):
    # Value that the inherited setUp() exports as ADB_COMPRESSION.
    compression = "none"
1355
1356
class FileOperationsTestBrotli(FileOperationsTest.Base):
    # Value that the inherited setUp() exports as ADB_COMPRESSION.
    compression = "brotli"
1359
1360
class FileOperationsTestLZ4(FileOperationsTest.Base):
    # Value that the inherited setUp() exports as ADB_COMPRESSION.
    compression = "lz4"
1363
1364
class FileOperationsTestZstd(FileOperationsTest.Base):
    # Value that the inherited setUp() exports as ADB_COMPRESSION.
    compression = "zstd"
1367
1368
class DeviceOfflineTest(DeviceTest):
    """Tests for device state after server restarts and for packet-size edge cases."""

    def _get_device_state(self, serialno):
        """Return the state that `adb devices` lists for serialno.

        Args:
            serialno: device serial, as str or bytes.

        Returns:
            The second whitespace-separated column of the matching line
            (e.g. 'device'), or None when no line starts with serialno.
        """
        output = subprocess.check_output(self.device.adb_cmd + ['devices'])
        # check_output() returns bytes on Python 3; normalise both sides to
        # str so the split, the regex and the comparison operate on one type.
        if isinstance(output, bytes):
            output = output.decode('utf-8')
        if isinstance(serialno, bytes):
            serialno = serialno.decode('utf-8')

        for line in output.splitlines():
            m = re.match(r'(\S+)\s+(\S+)', line)
            if m and m.group(1) == serialno:
                return m.group(2)
        return None

    def disabled_test_killed_when_pushing_a_large_file(self):
        """
           While running adb push with a large file, kill adb server.
           Occasionally the device becomes offline. Because the device is still
           reading data without realizing that the adb server has been restarted.
           Test if we can bring the device online automatically now.
           http://b/32952319
        """
        serialno = subprocess.check_output(
            self.device.adb_cmd + ['get-serialno']).decode('utf-8').strip()
        # 1. Push a large file
        file_path = 'tmp_large_file'
        try:
            with open(file_path, 'w') as fh:
                fh.write('\0' * (100 * 1024 * 1024))
            subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
            time.sleep(0.1)
            # 2. Kill the adb server
            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            subproc.terminate()
            # Reap the terminated child so it does not linger as a zombie.
            subproc.wait()
        finally:
            # Best-effort cleanup: the file may not exist or may not be
            # removable, and neither should fail the test.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 3. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        # 4. The device should be online
        self.assertEqual(self._get_device_state(serialno), 'device')

    def disabled_test_killed_when_pulling_a_large_file(self):
        """
           While running adb pull with a large file, kill adb server.
           Occasionally the device can't be connected. Because the device is trying to
           send a message larger than what is expected by the adb server.
           Test if we can bring the device online automatically now.
        """
        serialno = subprocess.check_output(
            self.device.adb_cmd + ['get-serialno']).decode('utf-8').strip()
        file_path = 'tmp_large_file'
        try:
            # 1. Create a large file on device.
            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
                               'bs=1000000', 'count=100'])
            # 2. Pull the large file on host.
            subproc = subprocess.Popen(self.device.adb_cmd +
                                       ['pull', '/data/local/tmp/tmp_large_file', file_path])
            time.sleep(0.1)
            # 3. Kill the adb server
            subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            subproc.terminate()
            # Reap the terminated child so it does not linger as a zombie.
            subproc.wait()
        finally:
            # Best-effort cleanup: the file may not exist or may not be
            # removable, and neither should fail the test.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 4. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        self.assertEqual(self._get_device_state(serialno), 'device')

    def test_packet_size_regression(self):
        """Test for http://b/37783561

        Receiving packets of a length divisible by 512 but not 1024 resulted in
        the adb client waiting indefinitely for more input.
        """
        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
        # Probe some surrounding values as well, for the hell of it.
        for base in [512] + list(range(1024, 1024 * 16, 1024)):
            for offset in [-6, -5, -4]:
                length = base + offset
                # '2>/dev/null;' and 'echo' used to be joined by an accidental
                # implicit string concatenation; they are separate list items
                # now. NOTE(review): assumes the shell helper joins the items
                # with spaces, which leaves the device-side command
                # equivalent -- confirm.
                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1',
                       '2>/dev/null;', 'echo', 'foo']
                rc, stdout, _ = self.device.shell_nocheck(cmd)

                self.assertEqual(0, rc)

                # Output should be '\0' * length, followed by "foo\n"
                self.assertEqual(length, len(stdout) - 4)
                self.assertEqual(stdout, "\0" * length + "foo\n")

    def test_zero_packet(self):
        """Test for http://b/113070258

        Make sure that we don't blow up when sending USB transfers that line up
        exactly with the USB packet size.
        """

        def listener():
            # Serve a single connection on the device: answer with "foo" and
            # exit once the connection ends. The command output is not used.
            cmd = ["echo foo | nc -l -p 12345; echo done"]
            self.device.shell_nocheck(cmd)

        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
        try:
            for size in [512, 1024]:
                thread = threading.Thread(target=listener)
                thread.start()

                # Wait a bit to let the shell command start.
                time.sleep(0.25)

                sock = socket.create_connection(("localhost", local_port))
                with contextlib.closing(sock):
                    bytesWritten = sock.send(b"a" * size)
                    self.assertEqual(size, bytesWritten)
                    readBytes = sock.recv(4096)
                    self.assertEqual(b"foo\n", readBytes)

                thread.join()
        finally:
            self.device.forward_remove("tcp:{}".format(local_port))
1491
1492
class SocketTest(DeviceTest):
    """Tests that talk to the adb server socket directly."""

    def test_socket_flush(self):
        """Test that we handle socket closure properly.

        If we're done writing to a socket, closing before the other end has
        closed will send a TCP_RST if we have incoming data queued up, which
        may result in data that we've written being discarded.

        The test opens a raw connection to the adb server on localhost:5037,
        starts a shell command that emits 1 MiB of zeroes followed by "foo",
        floods the socket with garbage from a second thread, and checks that
        the full command output is still received.

        Bug: http://b/74616284
        """

        def adb_length_prefixed(payload):
            # Prefix the UTF-8 encoded payload with its length as four hex
            # digits.
            encoded = payload.encode("utf8")
            return b"%04x%s" % (len(encoded), encoded)

        if "ANDROID_SERIAL" in os.environ:
            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
        else:
            transport_string = "host:transport-any"

        s = socket.create_connection(("localhost", 5037))
        # Close the socket on every exit path, including failed assertions.
        with contextlib.closing(s):
            s.sendall(adb_length_prefixed(transport_string))
            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
            s.sendall(adb_length_prefixed(shell_string))

            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            # Spawn a thread that dumps garbage into the socket until failure.
            def spam():
                buf = b"\0" * 16384
                try:
                    while True:
                        s.sendall(buf)
                except Exception as ex:
                    print(ex)

            thread = threading.Thread(target=spam)
            thread.start()

            time.sleep(1)

            # Read until the peer closes the connection.
            received = b""
            while True:
                read = s.recv(512)
                if len(read) == 0:
                    break
                received += read

            self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
            thread.join()
1548
1549
1550if sys.platform == "win32":
1551    # From https://stackoverflow.com/a/38749458
1552    import os
1553    import contextlib
1554    import msvcrt
1555    import ctypes
1556    from ctypes import wintypes
1557
    # use_last_error=True makes ctypes keep a private copy of the Win32 last
    # error code, which the errcheck hooks read via ctypes.get_last_error().
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    # Access and share flags handed to CreateConsoleScreenBuffer.
    GENERIC_READ  = 0x80000000
    GENERIC_WRITE = 0x40000000
    FILE_SHARE_READ  = 1
    FILE_SHARE_WRITE = 2
    # dwFlags value handed to CreateConsoleScreenBuffer.
    CONSOLE_TEXTMODE_BUFFER = 1
    # Sentinel that _check_invalid compares HANDLE results against.
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    # Selectors for GetStdHandle.
    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
    STD_ERROR_HANDLE = wintypes.DWORD(-12)
1568
1569    def _check_zero(result, func, args):
1570        if not result:
1571            raise ctypes.WinError(ctypes.get_last_error())
1572        return args
1573
1574    def _check_invalid(result, func, args):
1575        if result == INVALID_HANDLE_VALUE:
1576            raise ctypes.WinError(ctypes.get_last_error())
1577        return args
1578
    # Some ctypes.wintypes versions do not provide these pointer aliases;
    # define them so the argtypes declarations can use the same names
    # everywhere.
    if not hasattr(wintypes, 'LPDWORD'): # Python 2
        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)
1582
1583    class COORD(ctypes.Structure):
1584        _fields_ = (('X', wintypes.SHORT),
1585                    ('Y', wintypes.SHORT))
1586
1587    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
1588        _fields_ = (('cbSize',               wintypes.ULONG),
1589                    ('dwSize',               COORD),
1590                    ('dwCursorPosition',     COORD),
1591                    ('wAttributes',          wintypes.WORD),
1592                    ('srWindow',             wintypes.SMALL_RECT),
1593                    ('dwMaximumWindowSize',  COORD),
1594                    ('wPopupAttributes',     wintypes.WORD),
1595                    ('bFullscreenSupported', wintypes.BOOL),
1596                    ('ColorTable',           wintypes.DWORD * 16))
1597        def __init__(self, *args, **kwds):
1598            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
1599                    *args, **kwds)
1600            self.cbSize = ctypes.sizeof(self)
1601
1602    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
1603                                        CONSOLE_SCREEN_BUFFER_INFOEX)
1604    LPSECURITY_ATTRIBUTES = wintypes.LPVOID
1605
1606    kernel32.GetStdHandle.errcheck = _check_invalid
1607    kernel32.GetStdHandle.restype = wintypes.HANDLE
1608    kernel32.GetStdHandle.argtypes = (
1609        wintypes.DWORD,) # _In_ nStdHandle
1610
1611    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
1612    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
1613    kernel32.CreateConsoleScreenBuffer.argtypes = (
1614        wintypes.DWORD,        # _In_       dwDesiredAccess
1615        wintypes.DWORD,        # _In_       dwShareMode
1616        LPSECURITY_ATTRIBUTES, # _In_opt_   lpSecurityAttributes
1617        wintypes.DWORD,        # _In_       dwFlags
1618        wintypes.LPVOID)       # _Reserved_ lpScreenBufferData
1619
1620    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
1621    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
1622        wintypes.HANDLE,               # _In_  hConsoleOutput
1623        PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo
1624
1625    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
1626    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
1627        wintypes.HANDLE,               # _In_  hConsoleOutput
1628        PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_  lpConsoleScreenBufferInfo
1629
1630    kernel32.SetConsoleWindowInfo.errcheck = _check_zero
1631    kernel32.SetConsoleWindowInfo.argtypes = (
1632        wintypes.HANDLE,      # _In_ hConsoleOutput
1633        wintypes.BOOL,        # _In_ bAbsolute
1634        wintypes.PSMALL_RECT) # _In_ lpConsoleWindow
1635
1636    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
1637    kernel32.FillConsoleOutputCharacterW.argtypes = (
1638        wintypes.HANDLE,  # _In_  hConsoleOutput
1639        wintypes.WCHAR,   # _In_  cCharacter
1640        wintypes.DWORD,   # _In_  nLength
1641        COORD,            # _In_  dwWriteCoord
1642        wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten
1643
1644    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
1645    kernel32.ReadConsoleOutputCharacterW.argtypes = (
1646        wintypes.HANDLE,  # _In_  hConsoleOutput
1647        wintypes.LPWSTR,  # _Out_ lpCharacter
1648        wintypes.DWORD,   # _In_  nLength
1649        COORD,            # _In_  dwReadCoord
1650        wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead
1651
1652    @contextlib.contextmanager
1653    def allocate_console():
1654        allocated = kernel32.AllocConsole()
1655        try:
1656            yield allocated
1657        finally:
1658            if allocated:
1659                kernel32.FreeConsole()
1660
    @contextlib.contextmanager
    def console_screen(ncols=None, nrows=None):
        """Context manager that temporarily switches to a fresh screen buffer.

        Creates a new console screen buffer of ncols x nrows characters, fills
        it with NUL characters, makes it the active buffer and yields a file
        descriptor wrapping it. On exit the saved settings of the standard
        output buffer are re-applied and that buffer is made active again.

        Args:
            ncols: buffer width; defaults to the width of the current stdout
                buffer.
            nrows: buffer height; defaults to the height of the current stdout
                buffer.

        Raises:
            ValueError: if nrows is given and exceeds 9999.
            The kernel32 calls that have an errcheck hook attached raise the
            corresponding WinError on failure.
        """
        info = CONSOLE_SCREEN_BUFFER_INFOEX()
        new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
        nwritten = (wintypes.DWORD * 1)()
        # Snapshot the current stdout buffer settings; they supply the default
        # dimensions and are restored on exit.
        hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
        kernel32.GetConsoleScreenBufferInfoEx(
               hStdOut, ctypes.byref(info))
        if ncols is None:
            ncols = info.dwSize.X
        if nrows is None:
            nrows = info.dwSize.Y
        elif nrows > 9999:
            raise ValueError('nrows must be 9999 or less')
        # Stays None until the handle has been wrapped in a file descriptor;
        # the outer finally uses this to decide how to release the buffer.
        fd_screen = None
        hScreen = kernel32.CreateConsoleScreenBuffer(
                    GENERIC_READ | GENERIC_WRITE,
                    FILE_SHARE_READ | FILE_SHARE_WRITE,
                    None, CONSOLE_TEXTMODE_BUFFER, None)
        try:
            fd_screen = msvcrt.open_osfhandle(
                            hScreen, os.O_RDWR | os.O_BINARY)
            # Start from the new buffer's own settings and override only the
            # buffer size and window rectangle.
            kernel32.GetConsoleScreenBufferInfoEx(
                   hScreen, ctypes.byref(new_info))
            new_info.dwSize = COORD(ncols, nrows)
            # Window: full requested width, same height as the current window.
            new_info.srWindow = wintypes.SMALL_RECT(
                    Left=0, Top=0, Right=(ncols - 1),
                    Bottom=(info.srWindow.Bottom - info.srWindow.Top))
            kernel32.SetConsoleScreenBufferInfoEx(
                    hScreen, ctypes.byref(new_info))
            kernel32.SetConsoleWindowInfo(hScreen, True,
                    ctypes.byref(new_info.srWindow))
            # Fill with NULs so read_screen() can strip the unwritten cells.
            kernel32.FillConsoleOutputCharacterW(
                    hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
            kernel32.SetConsoleActiveScreenBuffer(hScreen)
            try:
                yield fd_screen
            finally:
                # Put the original stdout buffer back the way it was found.
                kernel32.SetConsoleScreenBufferInfoEx(
                    hStdOut, ctypes.byref(info))
                kernel32.SetConsoleWindowInfo(hStdOut, True,
                        ctypes.byref(info.srWindow))
                kernel32.SetConsoleActiveScreenBuffer(hStdOut)
        finally:
            # Release the buffer through the descriptor if one was created
            # (closing it is expected to release the wrapped handle as well);
            # otherwise close the raw handle directly.
            if fd_screen is not None:
                os.close(fd_screen)
            else:
                kernel32.CloseHandle(hScreen)
1709
1710    def read_screen(fd):
1711        hScreen = msvcrt.get_osfhandle(fd)
1712        csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
1713        kernel32.GetConsoleScreenBufferInfoEx(
1714            hScreen, ctypes.byref(csbi))
1715        ncols = csbi.dwSize.X
1716        pos = csbi.dwCursorPosition
1717        length = ncols * pos.Y + pos.X + 1
1718        buf = (ctypes.c_wchar * length)()
1719        n = (wintypes.DWORD * 1)()
1720        kernel32.ReadConsoleOutputCharacterW(
1721            hScreen, buf, length, COORD(0,0), n)
1722        lines = [buf[i:i+ncols].rstrip(u'\0')
1723                    for i in range(0, n[0], ncols)]
1724        return u'\n'.join(lines)
1725
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class WindowsConsoleTest(DeviceTest):
    def test_unicode_output(self):
        """Test Unicode command line parameters and Unicode console window output.

        Bug: https://issuetracker.google.com/issues/111972753
        """
        unicode_string = u'로보카 폴리'
        echo_argument = '"' + unicode_string + '"'

        # allocate_console(): if we don't have a console window, allocate one.
        # This isn't necessary if we're already being run from a console
        # window, which is typical.
        #
        # console_screen(): create a temporary console buffer and switch to
        # it. We could also pass a parameter of ncols=len(unicode_string), but
        # it causes the window to flash as it is resized and is likely
        # unnecessary given the typical console window size.
        with allocate_console(), console_screen(nrows=1000) as screen:
            # Run adb and allow it to detect that stdout is a console, not a
            # pipe, by using device.shell_popen() which does not use a pipe,
            # unlike device.shell().
            process = self.device.shell_popen(['echo', echo_argument])
            process.wait()

            # Read what was written by adb to the temporary console buffer.
            console_output = read_screen(screen)
            self.assertEqual(unicode_string, console_output)
1748
1749
def main():
    """Run every test in this module against the attached device(s).

    The random seed is fixed so that tests using random data are
    reproducible.

    Returns:
        0 when all tests passed; 1 when any test failed or errored, or when
        no device is attached and nothing could be run.
    """
    random.seed(0)
    if not adb.get_devices():
        print('Test suite must be run with attached devices')
        return 1

    suite = unittest.TestLoader().loadTestsFromName(__name__)
    result = unittest.TextTestRunner(verbosity=3).run(suite)
    return 0 if result.wasSuccessful() else 1


if __name__ == '__main__':
    # Propagate the outcome so callers and CI can detect failures.
    sys.exit(main())
1761