1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import os 23import posixpath 24import random 25import re 26import shlex 27import shutil 28import signal 29import socket 30import string 31import subprocess 32import sys 33import tempfile 34import threading 35import time 36import unittest 37 38from datetime import datetime 39 40import adb 41 42def requires_root(func): 43 def wrapper(self, *args): 44 if self.device.get_prop('ro.debuggable') != '1': 45 raise unittest.SkipTest('requires rootable build') 46 47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 48 if not was_root: 49 self.device.root() 50 self.device.wait() 51 52 try: 53 func(self, *args) 54 finally: 55 if not was_root: 56 self.device.unroot() 57 self.device.wait() 58 59 return wrapper 60 61 62def requires_non_root(func): 63 def wrapper(self, *args): 64 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 65 if was_root: 66 self.device.unroot() 67 self.device.wait() 68 69 try: 70 func(self, *args) 71 finally: 72 if was_root: 73 self.device.root() 74 self.device.wait() 75 76 return wrapper 77 78 79class DeviceTest(unittest.TestCase): 80 device = adb.get_device() 81 82 83class AbbTest(DeviceTest): 84 def test_smoke(self): 85 abb = subprocess.run(['adb', 'abb'], capture_output=True) 86 cmd = subprocess.run(['adb', 'shell', 'cmd'], capture_output=True) 87 88 # abb squashes all failures to 1. 89 self.assertEqual(abb.returncode == 0, cmd.returncode == 0) 90 self.assertEqual(abb.stdout, cmd.stdout) 91 self.assertEqual(abb.stderr, cmd.stderr) 92 93class ForwardReverseTest(DeviceTest): 94 def _test_no_rebind(self, description, direction_list, direction, 95 direction_no_rebind, direction_remove_all): 96 msg = direction_list() 97 self.assertEqual('', msg.strip(), 98 description + ' list must be empty to run this test.') 99 100 # Use --no-rebind with no existing binding 101 direction_no_rebind('tcp:5566', 'tcp:6655') 102 msg = direction_list() 103 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 104 105 # Use --no-rebind with existing binding 106 with self.assertRaises(subprocess.CalledProcessError): 107 direction_no_rebind('tcp:5566', 'tcp:6677') 108 msg = direction_list() 109 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 110 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 111 112 # Use the absence of --no-rebind with existing binding 113 direction('tcp:5566', 'tcp:6677') 114 msg = direction_list() 115 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 116 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 117 118 direction_remove_all() 119 msg = direction_list() 120 self.assertEqual('', msg.strip()) 121 122 def test_forward_no_rebind(self): 123 self._test_no_rebind('forward', self.device.forward_list, 124 self.device.forward, self.device.forward_no_rebind, 125 self.device.forward_remove_all) 126 127 def test_reverse_no_rebind(self): 128 self._test_no_rebind('reverse', self.device.reverse_list, 129 self.device.reverse, self.device.reverse_no_rebind, 130 self.device.reverse_remove_all) 131 132 def test_forward(self): 133 msg = self.device.forward_list() 134 self.assertEqual('', msg.strip(), 135 'Forwarding list must be empty to run this test.') 136 self.device.forward('tcp:5566', 'tcp:6655') 137 msg = self.device.forward_list() 138 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 139 self.device.forward('tcp:7788', 'tcp:8877') 140 msg = self.device.forward_list() 141 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 142 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 143 self.device.forward_remove('tcp:5566') 144 msg = self.device.forward_list() 145 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 146 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 147 self.device.forward_remove_all() 148 msg = self.device.forward_list() 149 self.assertEqual('', msg.strip()) 150 151 def test_forward_old_protocol(self): 152 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 153 154 msg = self.device.forward_list() 155 self.assertEqual('', msg.strip(), 156 'Forwarding list must be empty to run this test.') 157 158 s = socket.create_connection(("localhost", 5037)) 159 service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno 160 cmd = b"%04x%s" % (len(service), service) 161 s.sendall(cmd) 162 163 msg = self.device.forward_list() 164 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 165 166 self.device.forward_remove_all() 167 msg = self.device.forward_list() 168 self.assertEqual('', msg.strip()) 169 170 def test_forward_tcp_port_0(self): 171 self.assertEqual('', self.device.forward_list().strip(), 172 'Forwarding list must be empty to run this test.') 173 174 try: 175 # If resolving TCP port 0 is supported, `adb forward` will print 176 # the actual port number. 177 port = self.device.forward('tcp:0', 'tcp:8888').strip() 178 if not port: 179 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 180 181 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 182 self.device.forward_list())) 183 finally: 184 self.device.forward_remove_all() 185 186 def test_reverse(self): 187 msg = self.device.reverse_list() 188 self.assertEqual('', msg.strip(), 189 'Reverse forwarding list must be empty to run this test.') 190 self.device.reverse('tcp:5566', 'tcp:6655') 191 msg = self.device.reverse_list() 192 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 193 self.device.reverse('tcp:7788', 'tcp:8877') 194 msg = self.device.reverse_list() 195 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 196 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 197 self.device.reverse_remove('tcp:5566') 198 msg = self.device.reverse_list() 199 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 200 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 201 self.device.reverse_remove_all() 202 msg = self.device.reverse_list() 203 self.assertEqual('', msg.strip()) 204 205 def test_reverse_tcp_port_0(self): 206 self.assertEqual('', self.device.reverse_list().strip(), 207 'Reverse list must be empty to run this test.') 208 209 try: 210 # If resolving TCP port 0 is supported, `adb reverse` will print 211 # the actual port number. 212 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 213 if not port: 214 raise unittest.SkipTest('Reversing tcp:0 is not available.') 215 216 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 217 self.device.reverse_list())) 218 finally: 219 self.device.reverse_remove_all() 220 221 def test_forward_reverse_echo(self): 222 """Send data through adb forward and read it back via adb reverse""" 223 forward_port = 12345 224 reverse_port = forward_port + 1 225 forward_spec = 'tcp:' + str(forward_port) 226 reverse_spec = 'tcp:' + str(reverse_port) 227 forward_setup = False 228 reverse_setup = False 229 230 try: 231 # listen on localhost:forward_port, connect to remote:forward_port 232 self.device.forward(forward_spec, forward_spec) 233 forward_setup = True 234 # listen on remote:forward_port, connect to localhost:reverse_port 235 self.device.reverse(forward_spec, reverse_spec) 236 reverse_setup = True 237 238 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 239 with contextlib.closing(listener): 240 # Use SO_REUSEADDR so that subsequent runs of the test can grab 241 # the port even if it is in TIME_WAIT. 242 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 243 244 # Listen on localhost:reverse_port before connecting to 245 # localhost:forward_port because that will cause adb to connect 246 # back to localhost:reverse_port. 247 listener.bind(('127.0.0.1', reverse_port)) 248 listener.listen(4) 249 250 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 251 with contextlib.closing(client): 252 # Connect to the listener. 253 client.connect(('127.0.0.1', forward_port)) 254 255 # Accept the client connection. 256 accepted_connection, addr = listener.accept() 257 with contextlib.closing(accepted_connection) as server: 258 data = b'hello' 259 260 # Send data into the port setup by adb forward. 261 client.sendall(data) 262 # Explicitly close() so that server gets EOF. 263 client.close() 264 265 # Verify that the data came back via adb reverse. 266 self.assertEqual(data, server.makefile().read().encode("utf8")) 267 finally: 268 if reverse_setup: 269 self.device.reverse_remove(forward_spec) 270 if forward_setup: 271 self.device.forward_remove(forward_spec) 272 273 274class ShellTest(DeviceTest): 275 def _interactive_shell(self, shell_args, input): 276 """Runs an interactive adb shell. 277 278 Args: 279 shell_args: List of string arguments to `adb shell`. 280 input: bytes input to send to the interactive shell. 281 282 Returns: 283 The remote exit code. 284 285 Raises: 286 unittest.SkipTest: The device doesn't support exit codes. 287 """ 288 if not self.device.has_shell_protocol(): 289 raise unittest.SkipTest('exit codes are unavailable on this device') 290 291 proc = subprocess.Popen( 292 self.device.adb_cmd + ['shell'] + shell_args, 293 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 294 stderr=subprocess.PIPE) 295 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 296 # to explicitly add an exit command to close the session from the device 297 # side, plus the necessary newline to complete the interactive command. 298 proc.communicate(input + b'; exit\n') 299 return proc.returncode 300 301 def test_cat(self): 302 """Check that we can at least cat a file.""" 303 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 304 elements = out.split() 305 self.assertEqual(len(elements), 2) 306 307 uptime, idle = elements 308 self.assertGreater(float(uptime), 0.0) 309 self.assertGreater(float(idle), 0.0) 310 311 def test_throws_on_failure(self): 312 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 313 314 def test_output_not_stripped(self): 315 out = self.device.shell(['echo', 'foo'])[0] 316 self.assertEqual(out, 'foo' + self.device.linesep) 317 318 def test_shell_command_length(self): 319 # Devices that have shell_v2 should be able to handle long commands. 320 if self.device.has_shell_protocol(): 321 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384]) 322 self.assertEqual(rc, 0) 323 self.assertTrue(out == ('x' * 16384 + '\n')) 324 325 def test_shell_nocheck_failure(self): 326 rc, out, _ = self.device.shell_nocheck(['false']) 327 self.assertNotEqual(rc, 0) 328 self.assertEqual(out, '') 329 330 def test_shell_nocheck_output_not_stripped(self): 331 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 332 self.assertEqual(rc, 0) 333 self.assertEqual(out, 'foo' + self.device.linesep) 334 335 def test_can_distinguish_tricky_results(self): 336 # If result checking on ADB shell is naively implemented as 337 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 338 # output from the result for a cmd of `echo -n 1`. 339 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 340 self.assertEqual(rc, 0) 341 self.assertEqual(out, '1') 342 343 def test_line_endings(self): 344 """Ensure that line ending translation is not happening in the pty. 345 346 Bug: http://b/19735063 347 """ 348 output = self.device.shell(['uname'])[0] 349 self.assertEqual(output, 'Linux' + self.device.linesep) 350 351 def test_pty_logic(self): 352 """Tests that a PTY is allocated when it should be. 353 354 PTY allocation behavior should match ssh. 355 """ 356 def check_pty(args): 357 """Checks adb shell PTY allocation. 358 359 Tests |args| for terminal and non-terminal stdin. 360 361 Args: 362 args: -Tt args in a list (e.g. ['-t', '-t']). 363 364 Returns: 365 A tuple (<terminal>, <non-terminal>). True indicates 366 the corresponding shell allocated a remote PTY. 367 """ 368 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 369 370 terminal = subprocess.Popen( 371 test_cmd, stdin=None, 372 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 373 terminal.communicate() 374 375 non_terminal = subprocess.Popen( 376 test_cmd, stdin=subprocess.PIPE, 377 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 378 non_terminal.communicate() 379 380 return (terminal.returncode == 0, non_terminal.returncode == 0) 381 382 # -T: never allocate PTY. 383 self.assertEqual((False, False), check_pty(['-T'])) 384 385 # These tests require a new device. 386 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 387 # No args: PTY only if stdin is a terminal and shell is interactive, 388 # which is difficult to reliably test from a script. 389 self.assertEqual((False, False), check_pty([])) 390 391 # -t: PTY if stdin is a terminal. 392 self.assertEqual((True, False), check_pty(['-t'])) 393 394 # -t -t: always allocate PTY. 395 self.assertEqual((True, True), check_pty(['-t', '-t'])) 396 397 # -tt: always allocate PTY, POSIX style (http://b/32216152). 398 self.assertEqual((True, True), check_pty(['-tt'])) 399 400 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 401 # we follow the man page instead. 402 self.assertEqual((True, True), check_pty(['-ttt'])) 403 404 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 405 self.assertEqual((True, True), check_pty(['-ttx'])) 406 407 # -Ttt: -tt cancels out -T. 408 self.assertEqual((True, True), check_pty(['-Ttt'])) 409 410 # -ttT: -T cancels out -tt. 411 self.assertEqual((False, False), check_pty(['-ttT'])) 412 413 def test_shell_protocol(self): 414 """Tests the shell protocol on the device. 415 416 If the device supports shell protocol, this gives us the ability 417 to separate stdout/stderr and return the exit code directly. 418 419 Bug: http://b/19734861 420 """ 421 if not self.device.has_shell_protocol(): 422 raise unittest.SkipTest('shell protocol unsupported on this device') 423 424 # Shell protocol should be used by default. 425 result = self.device.shell_nocheck( 426 shlex.split('echo foo; echo bar >&2; exit 17')) 427 self.assertEqual(17, result[0]) 428 self.assertEqual('foo' + self.device.linesep, result[1]) 429 self.assertEqual('bar' + self.device.linesep, result[2]) 430 431 self.assertEqual(17, self._interactive_shell([], b'exit 17')) 432 433 # -x flag should disable shell protocol. 434 result = self.device.shell_nocheck( 435 shlex.split('-x echo foo; echo bar >&2; exit 17')) 436 self.assertEqual(0, result[0]) 437 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 438 self.assertEqual('', result[2]) 439 440 self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17')) 441 442 def test_non_interactive_sigint(self): 443 """Tests that SIGINT in a non-interactive shell kills the process. 444 445 This requires the shell protocol in order to detect the broken 446 pipe; raw data transfer mode will only see the break once the 447 subprocess tries to read or write. 448 449 Bug: http://b/23825725 450 """ 451 if not self.device.has_shell_protocol(): 452 raise unittest.SkipTest('shell protocol unsupported on this device') 453 454 # Start a long-running process. 455 sleep_proc = subprocess.Popen( 456 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 457 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 458 stderr=subprocess.STDOUT) 459 remote_pid = sleep_proc.stdout.readline().strip().decode("utf8") 460 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 461 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 462 463 # Verify that the process is running, send signal, verify it stopped. 464 self.device.shell(proc_query) 465 os.kill(sleep_proc.pid, signal.SIGINT) 466 sleep_proc.communicate() 467 468 # It can take some time for the process to receive the signal and die. 469 end_time = time.time() + 3 470 while self.device.shell_nocheck(proc_query)[0] != 1: 471 self.assertFalse(time.time() > end_time, 472 'subprocess failed to terminate in time') 473 474 def test_non_interactive_stdin(self): 475 """Tests that non-interactive shells send stdin.""" 476 if not self.device.has_shell_protocol(): 477 raise unittest.SkipTest('non-interactive stdin unsupported ' 478 'on this device') 479 480 # Test both small and large inputs. 481 small_input = b'foo' 482 characters = [c.encode("utf8") for c in string.ascii_letters + string.digits] 483 large_input = b'\n'.join(characters) 484 485 486 for input in (small_input, large_input): 487 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 488 stdin=subprocess.PIPE, 489 stdout=subprocess.PIPE, 490 stderr=subprocess.PIPE) 491 stdout, stderr = proc.communicate(input) 492 self.assertEqual(input.splitlines(), stdout.splitlines()) 493 self.assertEqual(b'', stderr) 494 495 def test_sighup(self): 496 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 497 log_path = "/data/local/tmp/adb_signal_test.log" 498 499 # Clear the output file. 500 self.device.shell_nocheck(["echo", ">", log_path]) 501 502 script = """ 503 trap "echo SIGINT > {path}; exit 0" SIGINT 504 trap "echo SIGHUP > {path}; exit 0" SIGHUP 505 echo Waiting 506 read 507 """.format(path=log_path) 508 509 script = ";".join([x.strip() for x in script.strip().splitlines()]) 510 511 process = self.device.shell_popen([script], kill_atexit=False, 512 stdin=subprocess.PIPE, 513 stdout=subprocess.PIPE) 514 515 self.assertEqual(b"Waiting\n", process.stdout.readline()) 516 process.send_signal(signal.SIGINT) 517 process.wait() 518 519 # Waiting for the local adb to finish is insufficient, since it hangs 520 # up immediately. 521 time.sleep(1) 522 523 stdout, _ = self.device.shell(["cat", log_path]) 524 self.assertEqual(stdout.strip(), "SIGHUP") 525 526 def test_exit_stress(self): 527 """Hammer `adb shell exit 42` with multiple threads.""" 528 thread_count = 48 529 result = dict() 530 def hammer(thread_idx, thread_count, result): 531 success = True 532 for i in range(thread_idx, 240, thread_count): 533 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)]) 534 if ret != i % 256: 535 success = False 536 break 537 result[thread_idx] = success 538 539 threads = [] 540 for i in range(thread_count): 541 thread = threading.Thread(target=hammer, args=(i, thread_count, result)) 542 thread.start() 543 threads.append(thread) 544 for thread in threads: 545 thread.join() 546 for i, success in result.items(): 547 self.assertTrue(success) 548 549 def disabled_test_parallel(self): 550 """Spawn a bunch of `adb shell` instances in parallel. 551 552 This was broken historically due to the use of select, which only works 553 for fds that are numerically less than 1024. 554 555 Bug: http://b/141955761""" 556 557 n_procs = 2048 558 procs = dict() 559 for i in range(0, n_procs): 560 procs[i] = subprocess.Popen( 561 ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'], 562 stdin=subprocess.PIPE, 563 stdout=subprocess.PIPE 564 ) 565 566 for i in range(0, n_procs): 567 procs[i].stdin.write("%d\n" % i) 568 569 for i in range(0, n_procs): 570 response = procs[i].stdout.readline() 571 assert(response == "%d\n" % i) 572 573 for i in range(0, n_procs): 574 procs[i].stdin.write("%d\n" % (i % 256)) 575 576 for i in range(0, n_procs): 577 assert(procs[i].wait() == i % 256) 578 579 580class ArgumentEscapingTest(DeviceTest): 581 def test_shell_escaping(self): 582 """Make sure that argument escaping is somewhat sane.""" 583 584 # http://b/19734868 585 # Note that this actually matches ssh(1)'s behavior --- it's 586 # converted to `sh -c echo hello; echo world` which sh interprets 587 # as `sh -c echo` (with an argument to that shell of "hello"), 588 # and then `echo world` back in the first shell. 589 result = self.device.shell( 590 shlex.split("sh -c 'echo hello; echo world'"))[0] 591 result = result.splitlines() 592 self.assertEqual(['', 'world'], result) 593 # If you really wanted "hello" and "world", here's what you'd do: 594 result = self.device.shell( 595 shlex.split(r'echo hello\;echo world'))[0].splitlines() 596 self.assertEqual(['hello', 'world'], result) 597 598 # http://b/15479704 599 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 600 self.assertEqual('t', result) 601 result = self.device.shell( 602 shlex.split("sh -c 'true && echo t'"))[0].strip() 603 self.assertEqual('t', result) 604 605 # http://b/20564385 606 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 607 self.assertEqual('t', result) 608 result = self.device.shell( 609 shlex.split(r'echo -n 123\;uname'))[0].strip() 610 self.assertEqual('123Linux', result) 611 612 def test_install_argument_escaping(self): 613 """Make sure that install argument escaping works.""" 614 # http://b/20323053, http://b/3090932. 615 for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"): 616 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 617 delete=False) 618 tf.close() 619 620 # Installing bogus .apks fails if the device supports exit codes. 621 try: 622 output = self.device.install(tf.name.decode("utf8")) 623 except subprocess.CalledProcessError as e: 624 output = e.output 625 626 self.assertIn(file_suffix, output) 627 os.remove(tf.name) 628 629 630class RootUnrootTest(DeviceTest): 631 def _test_root(self): 632 message = self.device.root() 633 if 'adbd cannot run as root in production builds' in message: 634 return 635 self.device.wait() 636 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 637 638 def _test_unroot(self): 639 self.device.unroot() 640 self.device.wait() 641 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 642 643 def test_root_unroot(self): 644 """Make sure that adb root and adb unroot work, using id(1).""" 645 if self.device.get_prop('ro.debuggable') != '1': 646 raise unittest.SkipTest('requires rootable build') 647 648 original_user = self.device.shell(['id', '-un'])[0].strip() 649 try: 650 if original_user == 'root': 651 self._test_unroot() 652 self._test_root() 653 elif original_user == 'shell': 654 self._test_root() 655 self._test_unroot() 656 finally: 657 if original_user == 'root': 658 self.device.root() 659 else: 660 self.device.unroot() 661 self.device.wait() 662 663 664class TcpIpTest(DeviceTest): 665 def test_tcpip_failure_raises(self): 666 """adb tcpip requires a port. 667 668 Bug: http://b/22636927 669 """ 670 self.assertRaises( 671 subprocess.CalledProcessError, self.device.tcpip, '') 672 self.assertRaises( 673 subprocess.CalledProcessError, self.device.tcpip, 'foo') 674 675 676class SystemPropertiesTest(DeviceTest): 677 def test_get_prop(self): 678 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 679 680 @requires_root 681 def test_set_prop(self): 682 prop_name = 'foo.bar' 683 self.device.shell(['setprop', prop_name, '""']) 684 685 self.device.set_prop(prop_name, 'qux') 686 self.assertEqual( 687 self.device.shell(['getprop', prop_name])[0].strip(), 'qux') 688 689 690def compute_md5(string): 691 hsh = hashlib.md5() 692 hsh.update(string) 693 return hsh.hexdigest() 694 695 696def get_md5_prog(device): 697 """Older platforms (pre-L) had the name md5 rather than md5sum.""" 698 try: 699 device.shell(['md5sum', '/proc/uptime']) 700 return 'md5sum' 701 except adb.ShellError: 702 return 'md5' 703 704 705class HostFile(object): 706 def __init__(self, handle, checksum): 707 self.handle = handle 708 self.checksum = checksum 709 self.full_path = handle.name 710 self.base_name = os.path.basename(self.full_path) 711 712 713class DeviceFile(object): 714 def __init__(self, checksum, full_path): 715 self.checksum = checksum 716 self.full_path = full_path 717 self.base_name = posixpath.basename(self.full_path) 718 719 720def make_random_host_files(in_dir, num_files): 721 min_size = 1 * (1 << 10) 722 max_size = 16 * (1 << 10) 723 724 files = [] 725 for _ in range(num_files): 726 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 727 728 size = random.randrange(min_size, max_size, 1024) 729 rand_str = os.urandom(size) 730 file_handle.write(rand_str) 731 file_handle.flush() 732 file_handle.close() 733 734 md5 = compute_md5(rand_str) 735 files.append(HostFile(file_handle, md5)) 736 return files 737 738 739def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 740 min_size = 1 * (1 << 10) 741 max_size = 16 * (1 << 10) 742 743 files = [] 744 for file_num in range(num_files): 745 size = random.randrange(min_size, max_size, 1024) 746 747 base_name = prefix + str(file_num) 748 full_path = posixpath.join(in_dir, base_name) 749 750 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 751 'bs={}'.format(size), 'count=1']) 752 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() 753 754 files.append(DeviceFile(dev_md5, full_path)) 755 return files 756 757 758class FileOperationsTest: 759 class Base(DeviceTest): 760 SCRATCH_DIR = '/data/local/tmp' 761 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 762 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 763 764 def setUp(self): 765 self.previous_env = os.environ.get("ADB_COMPRESSION") 766 os.environ["ADB_COMPRESSION"] = self.compression 767 768 def tearDown(self): 769 if self.previous_env is None: 770 del os.environ["ADB_COMPRESSION"] 771 else: 772 os.environ["ADB_COMPRESSION"] = self.previous_env 773 774 def _verify_remote(self, checksum, remote_path): 775 dev_md5, _ = self.device.shell([get_md5_prog(self.device), 776 remote_path])[0].split() 777 self.assertEqual(checksum, dev_md5) 778 779 def _verify_local(self, checksum, local_path): 780 with open(local_path, 'rb') as host_file: 781 host_md5 = compute_md5(host_file.read()) 782 self.assertEqual(host_md5, checksum) 783 784 def test_push(self): 785 """Push a randomly generated file to specified device.""" 786 kbytes = 512 787 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 788 rand_str = os.urandom(1024 * kbytes) 789 tmp.write(rand_str) 790 tmp.close() 791 792 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 793 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 794 795 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 796 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 797 798 os.remove(tmp.name) 799 800 def test_push_dir(self): 801 """Push a randomly generated directory of files to the device.""" 802 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 803 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 804 805 try: 806 host_dir = tempfile.mkdtemp() 807 808 # Make sure the temp directory isn't setuid, or else adb will complain. 809 os.chmod(host_dir, 0o700) 810 811 # Create 32 random files. 812 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 813 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 814 815 for temp_file in temp_files: 816 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 817 os.path.basename(host_dir), 818 temp_file.base_name) 819 self._verify_remote(temp_file.checksum, remote_path) 820 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 821 finally: 822 if host_dir is not None: 823 shutil.rmtree(host_dir) 824 825 def disabled_test_push_empty(self): 826 """Push an empty directory to the device.""" 827 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 828 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 829 830 try: 831 host_dir = tempfile.mkdtemp() 832 833 # Make sure the temp directory isn't setuid, or else adb will complain. 834 os.chmod(host_dir, 0o700) 835 836 # Create an empty directory. 837 empty_dir_path = os.path.join(host_dir, 'empty') 838 os.mkdir(empty_dir_path); 839 840 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR) 841 842 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty") 843 test_empty_cmd = ["[", "-d", remote_path, "]"] 844 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 845 846 self.assertEqual(rc, 0) 847 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 848 finally: 849 if host_dir is not None: 850 shutil.rmtree(host_dir) 851 852 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 853 def test_push_symlink(self): 854 """Push a symlink. 855 856 Bug: http://b/31491920 857 """ 858 try: 859 host_dir = tempfile.mkdtemp() 860 861 # Make sure the temp directory isn't setuid, or else adb will 862 # complain. 863 os.chmod(host_dir, 0o700) 864 865 with open(os.path.join(host_dir, 'foo'), 'w') as f: 866 f.write('foo') 867 868 symlink_path = os.path.join(host_dir, 'symlink') 869 os.symlink('foo', symlink_path) 870 871 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 872 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 873 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 874 rc, out, _ = self.device.shell_nocheck( 875 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 876 self.assertEqual(0, rc) 877 self.assertEqual(out.strip(), 'foo') 878 finally: 879 if host_dir is not None: 880 shutil.rmtree(host_dir) 881 882 def test_multiple_push(self): 883 """Push multiple files to the device in one adb push command. 884 885 Bug: http://b/25324823 886 """ 887 888 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 889 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 890 891 try: 892 host_dir = tempfile.mkdtemp() 893 894 # Create some random files and a subdirectory containing more files. 895 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 896 897 subdir = os.path.join(host_dir, 'subdir') 898 os.mkdir(subdir) 899 subdir_temp_files = make_random_host_files(in_dir=subdir, 900 num_files=4) 901 902 paths = [x.full_path for x in temp_files] 903 paths.append(subdir) 904 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 905 906 for temp_file in temp_files: 907 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 908 temp_file.base_name) 909 self._verify_remote(temp_file.checksum, remote_path) 910 911 for subdir_temp_file in subdir_temp_files: 912 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 913 # BROKEN: http://b/25394682 914 # 'subdir'; 915 temp_file.base_name) 916 self._verify_remote(temp_file.checksum, remote_path) 917 918 919 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 920 finally: 921 if host_dir is not None: 922 shutil.rmtree(host_dir) 923 924 @requires_non_root 925 def test_push_error_reporting(self): 926 """Make sure that errors that occur while pushing a file get reported 927 928 Bug: http://b/26816782 929 """ 930 with tempfile.NamedTemporaryFile() as tmp_file: 931 tmp_file.write(b'\0' * 1024 * 1024) 932 tmp_file.flush() 933 try: 934 self.device.push(local=tmp_file.name, remote='/system/') 935 self.fail('push should not have succeeded') 936 except subprocess.CalledProcessError as e: 937 output = e.output 938 939 self.assertTrue(b'Permission denied' in output or 940 b'Read-only file system' in output) 941 942 @requires_non_root 943 def test_push_directory_creation(self): 944 """Regression test for directory creation. 945 946 Bug: http://b/110953234 947 """ 948 with tempfile.NamedTemporaryFile() as tmp_file: 949 tmp_file.write(b'\0' * 1024 * 1024) 950 tmp_file.flush() 951 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation' 952 self.device.shell(['rm', '-rf', remote_path]) 953 954 remote_path += '/filename' 955 self.device.push(local=tmp_file.name, remote=remote_path) 956 957 def disabled_test_push_multiple_slash_root(self): 958 """Regression test for pushing to //data/local/tmp. 959 960 Bug: http://b/141311284 961 962 Disabled because this broken on the adbd side as well: b/141943968 963 """ 964 with tempfile.NamedTemporaryFile() as tmp_file: 965 tmp_file.write('\0' * 1024 * 1024) 966 tmp_file.flush() 967 remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root' 968 self.device.shell(['rm', '-rf', remote_path]) 969 self.device.push(local=tmp_file.name, remote=remote_path) 970 971 def _test_pull(self, remote_file, checksum): 972 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 973 tmp_write.close() 974 self.device.pull(remote=remote_file, local=tmp_write.name) 975 with open(tmp_write.name, 'rb') as tmp_read: 976 host_contents = tmp_read.read() 977 host_md5 = compute_md5(host_contents) 978 self.assertEqual(checksum, host_md5) 979 os.remove(tmp_write.name) 980 981 @requires_non_root 982 def test_pull_error_reporting(self): 983 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 984 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 985 986 try: 987 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 988 except subprocess.CalledProcessError as e: 989 output = e.output 990 991 self.assertIn(b'Permission denied', output) 992 993 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 994 995 def test_pull(self): 996 """Pull a randomly generated file from specified device.""" 997 kbytes = 512 998 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 999 cmd = ['dd', 'if=/dev/urandom', 1000 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 1001 'count={}'.format(kbytes)] 1002 self.device.shell(cmd) 1003 dev_md5, _ = self.device.shell( 1004 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() 1005 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 1006 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 1007 1008 def test_pull_dir(self): 1009 """Pull a randomly generated directory of files from the device.""" 1010 try: 1011 host_dir = tempfile.mkdtemp() 1012 1013 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1014 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1015 1016 # Populate device directory with random files. 1017 temp_files = make_random_device_files( 1018 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1019 1020 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1021 1022 for temp_file in temp_files: 1023 host_path = os.path.join( 1024 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1025 temp_file.base_name) 1026 self._verify_local(temp_file.checksum, host_path) 1027 1028 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1029 finally: 1030 if host_dir is not None: 1031 shutil.rmtree(host_dir) 1032 1033 def test_pull_dir_symlink(self): 1034 """Pull a directory into a symlink to a directory. 1035 1036 Bug: http://b/27362811 1037 """ 1038 if os.name != 'posix': 1039 raise unittest.SkipTest('requires POSIX') 1040 1041 try: 1042 host_dir = tempfile.mkdtemp() 1043 real_dir = os.path.join(host_dir, 'dir') 1044 symlink = os.path.join(host_dir, 'symlink') 1045 os.mkdir(real_dir) 1046 os.symlink(real_dir, symlink) 1047 1048 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1049 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1050 1051 # Populate device directory with random files. 1052 temp_files = make_random_device_files( 1053 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1054 1055 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 1056 1057 for temp_file in temp_files: 1058 host_path = os.path.join( 1059 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1060 temp_file.base_name) 1061 self._verify_local(temp_file.checksum, host_path) 1062 1063 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1064 finally: 1065 if host_dir is not None: 1066 shutil.rmtree(host_dir) 1067 1068 def test_pull_dir_symlink_collision(self): 1069 """Pull a directory into a colliding symlink to directory.""" 1070 if os.name != 'posix': 1071 raise unittest.SkipTest('requires POSIX') 1072 1073 try: 1074 host_dir = tempfile.mkdtemp() 1075 real_dir = os.path.join(host_dir, 'real') 1076 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 1077 symlink = os.path.join(host_dir, tmp_dirname) 1078 os.mkdir(real_dir) 1079 os.symlink(real_dir, symlink) 1080 1081 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1082 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1083 1084 # Populate device directory with random files. 1085 temp_files = make_random_device_files( 1086 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1087 1088 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1089 1090 for temp_file in temp_files: 1091 host_path = os.path.join(real_dir, temp_file.base_name) 1092 self._verify_local(temp_file.checksum, host_path) 1093 1094 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1095 finally: 1096 if host_dir is not None: 1097 shutil.rmtree(host_dir) 1098 1099 def test_pull_dir_nonexistent(self): 1100 """Pull a directory of files from the device to a nonexistent path.""" 1101 try: 1102 host_dir = tempfile.mkdtemp() 1103 dest_dir = os.path.join(host_dir, 'dest') 1104 1105 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1106 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1107 1108 # Populate device directory with random files. 1109 temp_files = make_random_device_files( 1110 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1111 1112 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1113 1114 for temp_file in temp_files: 1115 host_path = os.path.join(dest_dir, temp_file.base_name) 1116 self._verify_local(temp_file.checksum, host_path) 1117 1118 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1119 finally: 1120 if host_dir is not None: 1121 shutil.rmtree(host_dir) 1122 1123 # selinux prevents adbd from accessing symlinks on /data/local/tmp. 1124 def disabled_test_pull_symlink_dir(self): 1125 """Pull a symlink to a directory of symlinks to files.""" 1126 try: 1127 host_dir = tempfile.mkdtemp() 1128 1129 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1130 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1131 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1132 1133 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1134 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1135 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1136 1137 # Populate device directory with random files. 1138 temp_files = make_random_device_files( 1139 self.device, in_dir=remote_dir, num_files=32) 1140 1141 for temp_file in temp_files: 1142 self.device.shell( 1143 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1144 posixpath.join(remote_links, temp_file.base_name)]) 1145 1146 self.device.pull(remote=remote_symlink, local=host_dir) 1147 1148 for temp_file in temp_files: 1149 host_path = os.path.join( 1150 host_dir, 'symlink', temp_file.base_name) 1151 self._verify_local(temp_file.checksum, host_path) 1152 1153 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1154 finally: 1155 if host_dir is not None: 1156 shutil.rmtree(host_dir) 1157 1158 def test_pull_empty(self): 1159 """Pull a directory containing an empty directory from the device.""" 1160 try: 1161 host_dir = tempfile.mkdtemp() 1162 1163 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1164 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1165 self.device.shell(['mkdir', '-p', remote_empty_path]) 1166 1167 self.device.pull(remote=remote_empty_path, local=host_dir) 1168 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1169 finally: 1170 if host_dir is not None: 1171 shutil.rmtree(host_dir) 1172 1173 def test_multiple_pull(self): 1174 """Pull a randomly generated directory of files from the device.""" 1175 1176 try: 1177 host_dir = tempfile.mkdtemp() 1178 1179 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1180 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1181 self.device.shell(['mkdir', '-p', subdir]) 1182 1183 # Create some random files and a subdirectory containing more files. 1184 temp_files = make_random_device_files( 1185 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1186 1187 subdir_temp_files = make_random_device_files( 1188 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1189 1190 paths = [x.full_path for x in temp_files] 1191 paths.append(subdir) 1192 self.device._simple_call(['pull'] + paths + [host_dir]) 1193 1194 for temp_file in temp_files: 1195 local_path = os.path.join(host_dir, temp_file.base_name) 1196 self._verify_local(temp_file.checksum, local_path) 1197 1198 for subdir_temp_file in subdir_temp_files: 1199 local_path = os.path.join(host_dir, 1200 'subdir', 1201 subdir_temp_file.base_name) 1202 self._verify_local(subdir_temp_file.checksum, local_path) 1203 1204 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1205 finally: 1206 if host_dir is not None: 1207 shutil.rmtree(host_dir) 1208 1209 def verify_sync(self, device, temp_files, device_dir): 1210 """Verifies that a list of temp files was synced to the device.""" 1211 # Confirm that every file on the device mirrors that on the host. 1212 for temp_file in temp_files: 1213 device_full_path = posixpath.join( 1214 device_dir, temp_file.base_name) 1215 dev_md5, _ = device.shell( 1216 [get_md5_prog(self.device), device_full_path])[0].split() 1217 self.assertEqual(temp_file.checksum, dev_md5) 1218 1219 def test_sync(self): 1220 """Sync a host directory to the data partition.""" 1221 1222 try: 1223 base_dir = tempfile.mkdtemp() 1224 1225 # Create mirror device directory hierarchy within base_dir. 1226 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1227 os.makedirs(full_dir_path) 1228 1229 # Create 32 random files within the host mirror. 1230 temp_files = make_random_host_files( 1231 in_dir=full_dir_path, num_files=32) 1232 1233 # Clean up any stale files on the device. 1234 device = adb.get_device() # pylint: disable=no-member 1235 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1236 1237 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT') 1238 os.environ['ANDROID_PRODUCT_OUT'] = base_dir 1239 device.sync('data') 1240 if old_product_out is None: 1241 del os.environ['ANDROID_PRODUCT_OUT'] 1242 else: 1243 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out 1244 1245 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR) 1246 1247 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1248 finally: 1249 if base_dir is not None: 1250 shutil.rmtree(base_dir) 1251 1252 def test_push_sync(self): 1253 """Sync a host directory to a specific path.""" 1254 1255 try: 1256 temp_dir = tempfile.mkdtemp() 1257 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1258 1259 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1260 1261 # Clean up any stale files on the device. 1262 device = adb.get_device() # pylint: disable=no-member 1263 device.shell(['rm', '-rf', device_dir]) 1264 1265 device.push(temp_dir, device_dir, sync=True) 1266 1267 self.verify_sync(device, temp_files, device_dir) 1268 1269 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1270 finally: 1271 if temp_dir is not None: 1272 shutil.rmtree(temp_dir) 1273 1274 def test_push_dry_run_nonexistent_file(self): 1275 """Push with dry run.""" 1276 1277 for file_size in [8, 1024 * 1024]: 1278 try: 1279 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1280 device_file = posixpath.join(device_dir, 'file') 1281 1282 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1283 self.device.shell(['mkdir', '-p', device_dir]) 1284 1285 host_dir = tempfile.mkdtemp() 1286 host_file = posixpath.join(host_dir, 'file') 1287 1288 with open(host_file, "w") as f: 1289 f.write('x' * file_size) 1290 1291 self.device._simple_call(['push', '-n', host_file, device_file]) 1292 rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']']) 1293 self.assertNotEqual(0, rc) 1294 1295 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1296 finally: 1297 if host_dir is not None: 1298 shutil.rmtree(host_dir) 1299 1300 def test_push_dry_run_existent_file(self): 1301 """Push with dry run.""" 1302 1303 for file_size in [8, 1024 * 1024]: 1304 try: 1305 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1306 device_file = posixpath.join(device_dir, 'file') 1307 1308 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1309 self.device.shell(['mkdir', '-p', device_dir]) 1310 self.device.shell(['echo', 'foo', '>', device_file]) 1311 1312 host_dir = tempfile.mkdtemp() 1313 host_file = posixpath.join(host_dir, 'file') 1314 1315 with open(host_file, "w") as f: 1316 f.write('x' * file_size) 1317 1318 self.device._simple_call(['push', '-n', host_file, device_file]) 1319 stdout, stderr = self.device.shell(['cat', device_file]) 1320 self.assertEqual(stdout.strip(), "foo") 1321 1322 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1323 finally: 1324 if host_dir is not None: 1325 shutil.rmtree(host_dir) 1326 1327 def test_unicode_paths(self): 1328 """Ensure that we can support non-ASCII paths, even on Windows.""" 1329 name = u'로보카 폴리' 1330 1331 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1332 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1333 1334 ## push. 1335 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1336 tf.close() 1337 self.device.push(tf.name, remote_path) 1338 os.remove(tf.name) 1339 self.assertFalse(os.path.exists(tf.name)) 1340 1341 # Verify that the device ended up with the expected UTF-8 path 1342 output = self.device.shell( 1343 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1344 self.assertEqual(remote_path, output) 1345 1346 # pull. 1347 self.device.pull(remote_path, tf.name) 1348 self.assertTrue(os.path.exists(tf.name)) 1349 os.remove(tf.name) 1350 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1351 1352 1353class FileOperationsTestUncompressed(FileOperationsTest.Base): 1354 compression = "none" 1355 1356 1357class FileOperationsTestBrotli(FileOperationsTest.Base): 1358 compression = "brotli" 1359 1360 1361class FileOperationsTestLZ4(FileOperationsTest.Base): 1362 compression = "lz4" 1363 1364 1365class FileOperationsTestZstd(FileOperationsTest.Base): 1366 compression = "zstd" 1367 1368 1369class DeviceOfflineTest(DeviceTest): 1370 def _get_device_state(self, serialno): 1371 output = subprocess.check_output(self.device.adb_cmd + ['devices']) 1372 for line in output.split('\n'): 1373 m = re.match('(\S+)\s+(\S+)', line) 1374 if m and m.group(1) == serialno: 1375 return m.group(2) 1376 return None 1377 1378 def disabled_test_killed_when_pushing_a_large_file(self): 1379 """ 1380 While running adb push with a large file, kill adb server. 1381 Occasionally the device becomes offline. Because the device is still 1382 reading data without realizing that the adb server has been restarted. 1383 Test if we can bring the device online automatically now. 1384 http://b/32952319 1385 """ 1386 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1387 # 1. Push a large file 1388 file_path = 'tmp_large_file' 1389 try: 1390 fh = open(file_path, 'w') 1391 fh.write('\0' * (100 * 1024 * 1024)) 1392 fh.close() 1393 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp']) 1394 time.sleep(0.1) 1395 # 2. Kill the adb server 1396 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1397 subproc.terminate() 1398 finally: 1399 try: 1400 os.unlink(file_path) 1401 except: 1402 pass 1403 # 3. See if the device still exist. 1404 # Sleep to wait for the adb server exit. 1405 time.sleep(0.5) 1406 # 4. The device should be online 1407 self.assertEqual(self._get_device_state(serialno), 'device') 1408 1409 def disabled_test_killed_when_pulling_a_large_file(self): 1410 """ 1411 While running adb pull with a large file, kill adb server. 1412 Occasionally the device can't be connected. Because the device is trying to 1413 send a message larger than what is expected by the adb server. 1414 Test if we can bring the device online automatically now. 1415 """ 1416 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1417 file_path = 'tmp_large_file' 1418 try: 1419 # 1. Create a large file on device. 1420 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file', 1421 'bs=1000000', 'count=100']) 1422 # 2. Pull the large file on host. 1423 subproc = subprocess.Popen(self.device.adb_cmd + 1424 ['pull','/data/local/tmp/tmp_large_file', file_path]) 1425 time.sleep(0.1) 1426 # 3. Kill the adb server 1427 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1428 subproc.terminate() 1429 finally: 1430 try: 1431 os.unlink(file_path) 1432 except: 1433 pass 1434 # 4. See if the device still exist. 1435 # Sleep to wait for the adb server exit. 1436 time.sleep(0.5) 1437 self.assertEqual(self._get_device_state(serialno), 'device') 1438 1439 1440 def test_packet_size_regression(self): 1441 """Test for http://b/37783561 1442 1443 Receiving packets of a length divisible by 512 but not 1024 resulted in 1444 the adb client waiting indefinitely for more input. 1445 """ 1446 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n 1447 # Probe some surrounding values as well, for the hell of it. 1448 for base in [512] + list(range(1024, 1024 * 16, 1024)): 1449 for offset in [-6, -5, -4]: 1450 length = base + offset 1451 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;' 1452 'echo', 'foo'] 1453 rc, stdout, _ = self.device.shell_nocheck(cmd) 1454 1455 self.assertEqual(0, rc) 1456 1457 # Output should be '\0' * length, followed by "foo\n" 1458 self.assertEqual(length, len(stdout) - 4) 1459 self.assertEqual(stdout, "\0" * length + "foo\n") 1460 1461 def test_zero_packet(self): 1462 """Test for http://b/113070258 1463 1464 Make sure that we don't blow up when sending USB transfers that line up 1465 exactly with the USB packet size. 1466 """ 1467 1468 local_port = int(self.device.forward("tcp:0", "tcp:12345")) 1469 try: 1470 for size in [512, 1024]: 1471 def listener(): 1472 cmd = ["echo foo | nc -l -p 12345; echo done"] 1473 rc, stdout, stderr = self.device.shell_nocheck(cmd) 1474 1475 thread = threading.Thread(target=listener) 1476 thread.start() 1477 1478 # Wait a bit to let the shell command start. 1479 time.sleep(0.25) 1480 1481 sock = socket.create_connection(("localhost", local_port)) 1482 with contextlib.closing(sock): 1483 bytesWritten = sock.send(b"a" * size) 1484 self.assertEqual(size, bytesWritten) 1485 readBytes = sock.recv(4096) 1486 self.assertEqual(b"foo\n", readBytes) 1487 1488 thread.join() 1489 finally: 1490 self.device.forward_remove("tcp:{}".format(local_port)) 1491 1492 1493class SocketTest(DeviceTest): 1494 def test_socket_flush(self): 1495 """Test that we handle socket closure properly. 1496 1497 If we're done writing to a socket, closing before the other end has 1498 closed will send a TCP_RST if we have incoming data queued up, which 1499 may result in data that we've written being discarded. 1500 1501 Bug: http://b/74616284 1502 """ 1503 s = socket.create_connection(("localhost", 5037)) 1504 1505 def adb_length_prefixed(string): 1506 encoded = string.encode("utf8") 1507 result = b"%04x%s" % (len(encoded), encoded) 1508 return result 1509 1510 if "ANDROID_SERIAL" in os.environ: 1511 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] 1512 else: 1513 transport_string = "host:transport-any" 1514 1515 s.sendall(adb_length_prefixed(transport_string)) 1516 response = s.recv(4) 1517 self.assertEqual(b"OKAY", response) 1518 1519 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" 1520 s.sendall(adb_length_prefixed(shell_string)) 1521 1522 response = s.recv(4) 1523 self.assertEqual(b"OKAY", response) 1524 1525 # Spawn a thread that dumps garbage into the socket until failure. 1526 def spam(): 1527 buf = b"\0" * 16384 1528 try: 1529 while True: 1530 s.sendall(buf) 1531 except Exception as ex: 1532 print(ex) 1533 1534 thread = threading.Thread(target=spam) 1535 thread.start() 1536 1537 time.sleep(1) 1538 1539 received = b"" 1540 while True: 1541 read = s.recv(512) 1542 if len(read) == 0: 1543 break 1544 received += read 1545 1546 self.assertEqual(1024 * 1024 + len("foo\n"), len(received)) 1547 thread.join() 1548 1549 1550if sys.platform == "win32": 1551 # From https://stackoverflow.com/a/38749458 1552 import os 1553 import contextlib 1554 import msvcrt 1555 import ctypes 1556 from ctypes import wintypes 1557 1558 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) 1559 1560 GENERIC_READ = 0x80000000 1561 GENERIC_WRITE = 0x40000000 1562 FILE_SHARE_READ = 1 1563 FILE_SHARE_WRITE = 2 1564 CONSOLE_TEXTMODE_BUFFER = 1 1565 INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value 1566 STD_OUTPUT_HANDLE = wintypes.DWORD(-11) 1567 STD_ERROR_HANDLE = wintypes.DWORD(-12) 1568 1569 def _check_zero(result, func, args): 1570 if not result: 1571 raise ctypes.WinError(ctypes.get_last_error()) 1572 return args 1573 1574 def _check_invalid(result, func, args): 1575 if result == INVALID_HANDLE_VALUE: 1576 raise ctypes.WinError(ctypes.get_last_error()) 1577 return args 1578 1579 if not hasattr(wintypes, 'LPDWORD'): # Python 2 1580 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) 1581 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT) 1582 1583 class COORD(ctypes.Structure): 1584 _fields_ = (('X', wintypes.SHORT), 1585 ('Y', wintypes.SHORT)) 1586 1587 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure): 1588 _fields_ = (('cbSize', wintypes.ULONG), 1589 ('dwSize', COORD), 1590 ('dwCursorPosition', COORD), 1591 ('wAttributes', wintypes.WORD), 1592 ('srWindow', wintypes.SMALL_RECT), 1593 ('dwMaximumWindowSize', COORD), 1594 ('wPopupAttributes', wintypes.WORD), 1595 ('bFullscreenSupported', wintypes.BOOL), 1596 ('ColorTable', wintypes.DWORD * 16)) 1597 def __init__(self, *args, **kwds): 1598 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__( 1599 *args, **kwds) 1600 self.cbSize = ctypes.sizeof(self) 1601 1602 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER( 1603 CONSOLE_SCREEN_BUFFER_INFOEX) 1604 LPSECURITY_ATTRIBUTES = wintypes.LPVOID 1605 1606 kernel32.GetStdHandle.errcheck = _check_invalid 1607 kernel32.GetStdHandle.restype = wintypes.HANDLE 1608 kernel32.GetStdHandle.argtypes = ( 1609 wintypes.DWORD,) # _In_ nStdHandle 1610 1611 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid 1612 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE 1613 kernel32.CreateConsoleScreenBuffer.argtypes = ( 1614 wintypes.DWORD, # _In_ dwDesiredAccess 1615 wintypes.DWORD, # _In_ dwShareMode 1616 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes 1617 wintypes.DWORD, # _In_ dwFlags 1618 wintypes.LPVOID) # _Reserved_ lpScreenBufferData 1619 1620 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero 1621 kernel32.GetConsoleScreenBufferInfoEx.argtypes = ( 1622 wintypes.HANDLE, # _In_ hConsoleOutput 1623 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo 1624 1625 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero 1626 kernel32.SetConsoleScreenBufferInfoEx.argtypes = ( 1627 wintypes.HANDLE, # _In_ hConsoleOutput 1628 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo 1629 1630 kernel32.SetConsoleWindowInfo.errcheck = _check_zero 1631 kernel32.SetConsoleWindowInfo.argtypes = ( 1632 wintypes.HANDLE, # _In_ hConsoleOutput 1633 wintypes.BOOL, # _In_ bAbsolute 1634 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow 1635 1636 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero 1637 kernel32.FillConsoleOutputCharacterW.argtypes = ( 1638 wintypes.HANDLE, # _In_ hConsoleOutput 1639 wintypes.WCHAR, # _In_ cCharacter 1640 wintypes.DWORD, # _In_ nLength 1641 COORD, # _In_ dwWriteCoord 1642 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten 1643 1644 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero 1645 kernel32.ReadConsoleOutputCharacterW.argtypes = ( 1646 wintypes.HANDLE, # _In_ hConsoleOutput 1647 wintypes.LPWSTR, # _Out_ lpCharacter 1648 wintypes.DWORD, # _In_ nLength 1649 COORD, # _In_ dwReadCoord 1650 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead 1651 1652 @contextlib.contextmanager 1653 def allocate_console(): 1654 allocated = kernel32.AllocConsole() 1655 try: 1656 yield allocated 1657 finally: 1658 if allocated: 1659 kernel32.FreeConsole() 1660 1661 @contextlib.contextmanager 1662 def console_screen(ncols=None, nrows=None): 1663 info = CONSOLE_SCREEN_BUFFER_INFOEX() 1664 new_info = CONSOLE_SCREEN_BUFFER_INFOEX() 1665 nwritten = (wintypes.DWORD * 1)() 1666 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE) 1667 kernel32.GetConsoleScreenBufferInfoEx( 1668 hStdOut, ctypes.byref(info)) 1669 if ncols is None: 1670 ncols = info.dwSize.X 1671 if nrows is None: 1672 nrows = info.dwSize.Y 1673 elif nrows > 9999: 1674 raise ValueError('nrows must be 9999 or less') 1675 fd_screen = None 1676 hScreen = kernel32.CreateConsoleScreenBuffer( 1677 GENERIC_READ | GENERIC_WRITE, 1678 FILE_SHARE_READ | FILE_SHARE_WRITE, 1679 None, CONSOLE_TEXTMODE_BUFFER, None) 1680 try: 1681 fd_screen = msvcrt.open_osfhandle( 1682 hScreen, os.O_RDWR | os.O_BINARY) 1683 kernel32.GetConsoleScreenBufferInfoEx( 1684 hScreen, ctypes.byref(new_info)) 1685 new_info.dwSize = COORD(ncols, nrows) 1686 new_info.srWindow = wintypes.SMALL_RECT( 1687 Left=0, Top=0, Right=(ncols - 1), 1688 Bottom=(info.srWindow.Bottom - info.srWindow.Top)) 1689 kernel32.SetConsoleScreenBufferInfoEx( 1690 hScreen, ctypes.byref(new_info)) 1691 kernel32.SetConsoleWindowInfo(hScreen, True, 1692 ctypes.byref(new_info.srWindow)) 1693 kernel32.FillConsoleOutputCharacterW( 1694 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten) 1695 kernel32.SetConsoleActiveScreenBuffer(hScreen) 1696 try: 1697 yield fd_screen 1698 finally: 1699 kernel32.SetConsoleScreenBufferInfoEx( 1700 hStdOut, ctypes.byref(info)) 1701 kernel32.SetConsoleWindowInfo(hStdOut, True, 1702 ctypes.byref(info.srWindow)) 1703 kernel32.SetConsoleActiveScreenBuffer(hStdOut) 1704 finally: 1705 if fd_screen is not None: 1706 os.close(fd_screen) 1707 else: 1708 kernel32.CloseHandle(hScreen) 1709 1710 def read_screen(fd): 1711 hScreen = msvcrt.get_osfhandle(fd) 1712 csbi = CONSOLE_SCREEN_BUFFER_INFOEX() 1713 kernel32.GetConsoleScreenBufferInfoEx( 1714 hScreen, ctypes.byref(csbi)) 1715 ncols = csbi.dwSize.X 1716 pos = csbi.dwCursorPosition 1717 length = ncols * pos.Y + pos.X + 1 1718 buf = (ctypes.c_wchar * length)() 1719 n = (wintypes.DWORD * 1)() 1720 kernel32.ReadConsoleOutputCharacterW( 1721 hScreen, buf, length, COORD(0,0), n) 1722 lines = [buf[i:i+ncols].rstrip(u'\0') 1723 for i in range(0, n[0], ncols)] 1724 return u'\n'.join(lines) 1725 1726@unittest.skipUnless(sys.platform == "win32", "requires Windows") 1727class WindowsConsoleTest(DeviceTest): 1728 def test_unicode_output(self): 1729 """Test Unicode command line parameters and Unicode console window output. 1730 1731 Bug: https://issuetracker.google.com/issues/111972753 1732 """ 1733 # If we don't have a console window, allocate one. This isn't necessary if we're already 1734 # being run from a console window, which is typical. 1735 with allocate_console() as allocated_console: 1736 # Create a temporary console buffer and switch to it. We could also pass a parameter of 1737 # ncols=len(unicode_string), but it causes the window to flash as it is resized and 1738 # likely unnecessary given the typical console window size. 1739 with console_screen(nrows=1000) as screen: 1740 unicode_string = u'로보카 폴리' 1741 # Run adb and allow it to detect that stdout is a console, not a pipe, by using 1742 # device.shell_popen() which does not use a pipe, unlike device.shell(). 1743 process = self.device.shell_popen(['echo', '"' + unicode_string + '"']) 1744 process.wait() 1745 # Read what was written by adb to the temporary console buffer. 1746 console_output = read_screen(screen) 1747 self.assertEqual(unicode_string, console_output) 1748 1749 1750def main(): 1751 random.seed(0) 1752 if len(adb.get_devices()) > 0: 1753 suite = unittest.TestLoader().loadTestsFromName(__name__) 1754 unittest.TextTestRunner(verbosity=3).run(suite) 1755 else: 1756 print('Test suite must be run with attached devices') 1757 1758 1759if __name__ == '__main__': 1760 main() 1761