1from acts import asserts
2from acts import base_test
3from acts import signals
4from acts.controllers import android_device
5from acts.test_decorators import test_tracker_info
6
7from scapy.all import *
8from threading import Event
9from threading import Thread
10import random
11import time
12import warnings
13
14
# UDP ports: packets sent to CLIENT_PORT are bound to the BOOTP layer in
# setup_class, and responses are expected to arrive on that port.
CLIENT_PORT = 68
SERVER_PORT = 67
BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff'
INET4_ANY = '0.0.0.0'
# Prefix the tests expect addresses handed out by the device to start with.
NETADDR_PREFIX = '192.168.42.'
# Prefix of a different network, used to build out-of-subnet addresses.
OTHER_NETADDR_PREFIX = '192.168.43.'
NETADDR_BROADCAST = '255.255.255.255'
SUBNET_BROADCAST = NETADDR_PREFIX + '255'
# Commands run through `adb shell` on the device under test.
USB_CHARGE_MODE = 'svc usb setFunctions'
USB_TETHERING_MODE = 'svc usb setFunctions rndis'
DEVICE_IP_ADDRESS = 'ip address'


# Message type codes used when checking the type of a response
# (presumably the DHCP message-type option values - confirm against RFC 2132).
OFFER = 2
REQUEST = 3
ACK = 5
NAK = 6
32
33
34class DhcpServerTest(base_test.BaseTestClass):
35    def setup_class(self):
36        self.dut = self.android_devices[0]
37        self.USB_TETHERED = False
38        self.next_hwaddr_index = 0
39        self.stop_arp = Event()
40
41        conf.checkIPaddr = 0
42        conf.checkIPsrc = 0
43        # Allow using non-67 server ports as long as client uses 68
44        bind_layers(UDP, BOOTP, dport=CLIENT_PORT)
45
46        iflist_before = get_if_list()
47        self._start_usb_tethering(self.dut)
48        self.iface = self._wait_for_new_iface(iflist_before)
49        self.real_hwaddr = get_if_raw_hwaddr(self.iface)
50
51        # Start a thread to answer to all ARP "who-has"
52        thread = Thread(target=self._sniff_arp, args=(self.stop_arp,))
53        thread.start()
54
55        # Discover server IP and device hwaddr
56        hwaddr = self._next_hwaddr()
57        resp = self._get_response(self._make_discover(hwaddr))
58        asserts.assert_false(None == resp,
59            "Device did not reply to first DHCP discover")
60        self.server_addr = getopt(resp, 'server_id')
61        self.dut_hwaddr = resp.getlayer(Ether).src
62        asserts.assert_false(None == self.server_addr,
63            "DHCP server did not specify server identifier")
64        # Ensure that we don't depend on assigned route/gateway on the host
65        conf.route.add(host=self.server_addr, dev=self.iface, gw="0.0.0.0")
66
67    def setup_test(self):
68        # Some versions of scapy do not close the receive file properly
69        warnings.filterwarnings("ignore", category=ResourceWarning)
70
71        bind_layers(UDP, BOOTP, dport=68)
72        self.hwaddr = self._next_hwaddr()
73        self.other_hwaddr = self._next_hwaddr()
74        self.cleanup_releases = []
75
76    def teardown_test(self):
77        for packet in self.cleanup_releases:
78            self._send(packet)
79
    def teardown_class(self):
        """Signal the ARP responder to stop, then turn USB tethering off."""
        # Set the event first: _sniff_arp ignores sniff errors only when a
        # stop was requested.
        self.stop_arp.set()
        self._stop_usb_tethering(self.dut)
83
    def on_fail(self, test_name, begin_time):
        """Take a bug report from the device for the failed test.

        Args:
            test_name: name of the failed test, passed to the bug report.
            begin_time: start time of the test, passed to the bug report.
        """
        self.dut.take_bug_report(test_name, begin_time)
86
87    def _start_usb_tethering(self, dut):
88        """ Start USB tethering
89
90        Args:
91            1. dut - ad object
92        """
93        self.log.info("Starting USB Tethering")
94        dut.stop_services()
95        dut.adb.shell(USB_TETHERING_MODE, ignore_status=True)
96        dut.adb.wait_for_device()
97        dut.start_services()
98        if 'rndis' not in dut.adb.shell(DEVICE_IP_ADDRESS):
99            raise signals.TestFailure('Unable to enable USB tethering.')
100        self.USB_TETHERED = True
101
    def _stop_usb_tethering(self, dut):
        """Reset the device's USB function, turning USB tethering off.

        Args:
            dut: ad object to disable tethering on.
        """
        self.log.info("Stopping USB Tethering")
        # Services are stopped around the USB mode switch and restarted once
        # adb reports the device again.
        dut.stop_services()
        dut.adb.shell(USB_CHARGE_MODE)
        dut.adb.wait_for_device()
        dut.start_services()
        self.USB_TETHERED = False
114
115    def _wait_for_device(self, dut):
116        """ Wait for device to come back online
117
118        Args:
119            1. dut - ad object
120        """
121        while dut.serial not in android_device.list_adb_devices():
122            pass
123        dut.adb.wait_for_device()
124
125    def _wait_for_new_iface(self, old_ifaces):
126        old_set = set(old_ifaces)
127        # Try 10 times to find a new interface with a 1s sleep every time
128        # (equivalent to a 9s timeout)
129        for i in range(0, 10):
130            new_ifaces = set(get_if_list()) - old_set
131            asserts.assert_true(len(new_ifaces) < 2,
132                "Too many new interfaces after turning on tethering")
133            if len(new_ifaces) == 1:
134                return new_ifaces.pop()
135            time.sleep(1)
136        asserts.fail("Timeout waiting for tethering interface on host")
137
138    def _sniff_arp(self, stop_arp):
139        try:
140            sniff(iface=self.iface, filter='arp', prn=self._handle_arp, store=0,
141                stop_filter=lambda p: stop_arp.is_set())
142        except:
143            # sniff may raise when tethering is disconnected. Ignore
144            # exceptions if stop was requested.
145            if not stop_arp.is_set():
146                raise
147
148    def _handle_arp(self, packet):
149        # Reply to all arp "who-has": say we have everything
150        if packet[ARP].op == ARP.who_has:
151            reply = ARP(op=ARP.is_at, hwsrc=self.real_hwaddr, psrc=packet.pdst,
152                hwdst=BROADCAST_MAC, pdst=SUBNET_BROADCAST)
153            sendp(Ether(dst=BROADCAST_MAC, src=self.real_hwaddr) / reply,
154                iface=self.iface, verbose=False)
155
156    @test_tracker_info(uuid="a8712769-977a-4ee1-902f-90b3ba30b40c")
157    def test_config_assumptions(self):
158        resp = self._get_response(self._make_discover(self.hwaddr))
159        asserts.assert_false(None == resp, "Device did not reply to discover")
160        asserts.assert_true(get_yiaddr(resp).startswith(NETADDR_PREFIX),
161            "Server does not use expected prefix")
162
163    @test_tracker_info(uuid="e3761689-7d64-46b1-97ce-15f315eaf568")
164    def test_discover_broadcastbit(self):
165        resp = self._get_response(
166            self._make_discover(self.hwaddr, bcastbit=True))
167        self._assert_offer(resp)
168        self._assert_broadcast(resp)
169
170    @test_tracker_info(uuid="30a7ea7c-c20f-4c46-aaf2-96f19d8f8191")
171    def test_discover_bootpfields(self):
172        discover = self._make_discover(self.hwaddr)
173        resp = self._get_response(discover)
174        self._assert_offer(resp)
175        self._assert_unicast(resp)
176        bootp = assert_bootp_response(resp, discover)
177        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
178        asserts.assert_equal(self.server_addr, bootp.siaddr)
179        asserts.assert_equal(INET4_ANY, bootp.giaddr)
180        asserts.assert_equal(self.hwaddr, get_chaddr(bootp))
181
182    @test_tracker_info(uuid="593f4051-516d-44fa-8834-7d485362f182")
183    def test_discover_relayed_broadcastbit(self):
184        giaddr = NETADDR_PREFIX + '123'
185        resp = self._get_response(
186            self._make_discover(self.hwaddr, giaddr=giaddr, bcastbit=True))
187        self._assert_offer(resp)
188        self._assert_relayed(resp, giaddr)
189        self._assert_broadcastbit(resp)
190
191    def _run_discover_paramrequestlist(self, params, unwanted_params):
192        params_opt = make_paramrequestlist_opt(params)
193        resp = self._get_response(
194            self._make_discover(self.hwaddr, options=[params_opt]))
195
196        self._assert_offer(resp)
197        # List of requested params in response order
198        resp_opts = get_opt_labels(resp)
199        resp_requested_opts = [opt for opt in resp_opts if opt in params]
200        # All above params should be supported, order should be conserved
201        asserts.assert_equal(params, resp_requested_opts)
202        asserts.assert_equal(0, len(set(resp_opts) & set(unwanted_params)))
203        return resp
204
205    @test_tracker_info(uuid="00a8a3f6-f143-47ff-a79b-482c607fb5b8")
206    def test_discover_paramrequestlist(self):
207        resp = self._run_discover_paramrequestlist(
208            ['subnet_mask', 'broadcast_address', 'router', 'name_server'],
209            unwanted_params=[])
210        for opt in ['broadcast_address', 'router', 'name_server']:
211            asserts.assert_true(getopt(resp, opt).startswith(NETADDR_PREFIX),
212                opt + ' does not start with ' + NETADDR_PREFIX)
213
214        subnet_mask = getopt(resp, 'subnet_mask')
215        asserts.assert_true(subnet_mask.startswith('255.255.'),
216            'Unexpected subnet mask for /16+: ' + subnet_mask)
217
218    @test_tracker_info(uuid="d1aad4a3-9eab-4900-aa6a-5b82a4a64f46")
219    def test_discover_paramrequestlist_rev(self):
220        # RFC2132 #9.8: "The DHCP server is not required to return the options
221        # in the requested order, but MUST try to insert the requested options
222        # in the order requested"
223        asserts.skip('legacy behavior not compliant: fixed order used')
224        self._run_discover_paramrequestlist(
225            ['name_server', 'router', 'broadcast_address', 'subnet_mask'],
226            unwanted_params=[])
227
228    @test_tracker_info(uuid="e3ae6335-8cc7-4bf1-bb58-67646b727f2b")
229    def test_discover_paramrequestlist_unwanted(self):
230        asserts.skip('legacy behavior always sends all parameters')
231        self._run_discover_paramrequestlist(['router', 'name_server'],
232            unwanted_params=['broadcast_address', 'subnet_mask'])
233
234    def _assert_renews(self, request, addr, exp_time, resp_type=ACK):
235        # Sleep to test lease time renewal
236        time.sleep(3)
237        resp = self._get_response(request)
238        self._assert_type(resp, resp_type)
239        asserts.assert_equal(addr, get_yiaddr(resp))
240        remaining_lease = getopt(resp, 'lease_time')
241        # Lease renewed: waited for 3s, lease time not decreased by more than 2
242        asserts.assert_true(remaining_lease >= exp_time - 2,
243            'Lease not renewed')
244        # Lease times should be consistent across offers/renewals
245        asserts.assert_true(remaining_lease <= exp_time + 2,
246            'Lease time inconsistent')
247        return resp
248
249    @test_tracker_info(uuid="d6b598b7-f443-4b5a-ba80-4af5d211cade")
250    def test_discover_assigned_ownaddress(self):
251        addr, siaddr, resp = self._request_address(self.hwaddr)
252
253        lease_time = getopt(resp, 'lease_time')
254        server_id = getopt(resp, 'server_id')
255        asserts.assert_true(lease_time >= 60, "Lease time is too short")
256        asserts.assert_false(addr == INET4_ANY, "Assigned address is empty")
257        # Wait to test lease expiration time change
258        time.sleep(3)
259
260        # New discover, same address
261        resp = self._assert_renews(self._make_discover(self.hwaddr),
262            addr, lease_time, resp_type=OFFER)
263        self._assert_unicast(resp, get_yiaddr(resp))
264        self._assert_broadcastbit(resp, isset=False)
265
266    @test_tracker_info(uuid="cbb07d77-912b-4269-bbbc-adba99779587")
267    def test_discover_assigned_otherhost(self):
268        addr, siaddr, _ = self._request_address(self.hwaddr)
269
270        # New discover, same address, different client
271        resp = self._get_response(self._make_discover(self.other_hwaddr,
272            [('requested_addr', addr)]))
273
274        self._assert_offer(resp)
275        asserts.assert_false(get_yiaddr(resp) == addr,
276            "Already assigned address offered")
277        self._assert_unicast(resp, get_yiaddr(resp))
278        self._assert_broadcastbit(resp, isset=False)
279
280    @test_tracker_info(uuid="3d2b3d2f-eb5f-498f-b887-3b4638cebf14")
281    def test_discover_requestaddress(self):
282        addr = NETADDR_PREFIX + '200'
283        resp = self._get_response(self._make_discover(self.hwaddr,
284            [('requested_addr', addr)]))
285        self._assert_offer(resp)
286        asserts.assert_equal(get_yiaddr(resp), addr)
287
288        # Lease not committed: can request again
289        resp = self._get_response(self._make_discover(self.other_hwaddr,
290            [('requested_addr', addr)]))
291        self._assert_offer(resp)
292        asserts.assert_equal(get_yiaddr(resp), addr)
293
294    @test_tracker_info(uuid="5ffd9d25-304e-434b-bedb-56ccf27dcebd")
295    def test_discover_requestaddress_wrongsubnet(self):
296        addr = OTHER_NETADDR_PREFIX + '200'
297        resp = self._get_response(
298            self._make_discover(self.hwaddr, [('requested_addr', addr)]))
299        self._assert_offer(resp)
300        self._assert_unicast(resp)
301        asserts.assert_false(get_yiaddr(resp) == addr,
302            'Server offered invalid address')
303
304    @test_tracker_info(uuid="f7d6a92f-9386-4b65-b6c1-d0a3f11213bf")
305    def test_discover_giaddr_outside_subnet(self):
306        giaddr = OTHER_NETADDR_PREFIX + '201'
307        resp = self._get_response(
308            self._make_discover(self.hwaddr, giaddr=giaddr))
309        asserts.assert_equal(resp, None)
310
311    @test_tracker_info(uuid="1348c79a-9203-4bb8-b33b-af80bacd17b1")
312    def test_discover_srcaddr_outside_subnet(self):
313        srcaddr = OTHER_NETADDR_PREFIX + '200'
314        resp = self._get_response(
315            self._make_discover(self.hwaddr, ip_src=srcaddr))
316        self._assert_offer(resp)
317        asserts.assert_false(srcaddr == get_yiaddr(resp),
318            'Server offered invalid address')
319
320    @test_tracker_info(uuid="a03bb783-8665-4c66-9c0c-1bb02ddca07e")
321    def test_discover_requestaddress_giaddr_outside_subnet(self):
322        addr = NETADDR_PREFIX + '200'
323        giaddr = OTHER_NETADDR_PREFIX + '201'
324        req = self._make_discover(self.hwaddr, [('requested_addr', addr)],
325                ip_src=giaddr, giaddr=giaddr)
326        resp = self._get_response(req)
327        asserts.assert_equal(resp, None)
328
329    @test_tracker_info(uuid="725956af-71e2-45d8-b8b3-402d21bfc7db")
330    def test_discover_knownaddress_giaddr_outside_subnet(self):
331        addr, siaddr, _ = self._request_address(self.hwaddr)
332
333        # New discover, same client, through relay in invalid subnet
334        giaddr = OTHER_NETADDR_PREFIX + '200'
335        resp = self._get_response(
336            self._make_discover(self.hwaddr, giaddr=giaddr))
337        asserts.assert_equal(resp, None)
338
339    @test_tracker_info(uuid="2ee9d5b1-c15d-40c4-98e9-63202d1f1557")
340    def test_discover_knownaddress_giaddr_valid_subnet(self):
341        addr, siaddr, _ = self._request_address(self.hwaddr)
342
343        # New discover, same client, through relay in valid subnet
344        giaddr = NETADDR_PREFIX + '200'
345        resp = self._get_response(
346            self._make_discover(self.hwaddr, giaddr=giaddr))
347        self._assert_offer(resp)
348        self._assert_unicast(resp, giaddr)
349        self._assert_broadcastbit(resp, isset=False)
350
351    @test_tracker_info(uuid="f43105a5-633a-417a-8a07-39bc36c493e7")
352    def test_request_unicast(self):
353        addr, siaddr, resp = self._request_address(self.hwaddr, bcast=False)
354        self._assert_unicast(resp, addr)
355
356    @test_tracker_info(uuid="09f3c1c4-1202-4f85-a965-4d86aee069f3")
357    def test_request_bootpfields(self):
358        req_addr = NETADDR_PREFIX + '200'
359        req = self._make_request(self.hwaddr, req_addr, self.server_addr)
360        resp = self._get_response(req)
361        self._assert_ack(resp)
362        bootp = assert_bootp_response(resp, req)
363        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
364        asserts.assert_equal(self.server_addr, bootp.siaddr)
365        asserts.assert_equal(INET4_ANY, bootp.giaddr)
366        asserts.assert_equal(self.hwaddr, get_chaddr(bootp))
367
368    @test_tracker_info(uuid="ec00d268-80cb-4be5-9771-2292cc7d2e18")
369    def test_request_selecting_inuse(self):
370        addr, siaddr, _ = self._request_address(self.hwaddr)
371        new_req = self._make_request(self.other_hwaddr, addr, siaddr)
372        resp = self._get_response(new_req)
373        self._assert_nak(resp)
374        self._assert_broadcast(resp)
375        bootp = assert_bootp_response(resp, new_req)
376        asserts.assert_equal(INET4_ANY, bootp.ciaddr)
377        asserts.assert_equal(INET4_ANY, bootp.yiaddr)
378        asserts.assert_equal(INET4_ANY, bootp.siaddr)
379        asserts.assert_equal(INET4_ANY, bootp.giaddr)
380        asserts.assert_equal(self.other_hwaddr, get_chaddr(bootp))
381        asserts.assert_equal(
382            ['message-type', 'server_id', 56, 'end'], # 56 is "message" opt
383            get_opt_labels(bootp))
384        asserts.assert_equal(self.server_addr, getopt(bootp, 'server_id'))
385
386    @test_tracker_info(uuid="0643c179-3542-4297-9b06-8d86ff785e9c")
387    def test_request_selecting_wrongsiaddr(self):
388        addr = NETADDR_PREFIX + '200'
389        wrong_siaddr = NETADDR_PREFIX + '201'
390        asserts.assert_false(wrong_siaddr == self.server_addr,
391            'Test assumption not met: server addr is ' + wrong_siaddr)
392        resp = self._get_response(
393            self._make_request(self.hwaddr, addr, siaddr=wrong_siaddr))
394        asserts.assert_true(resp == None,
395            'Received response for request with incorrect siaddr')
396
397    @test_tracker_info(uuid="676beab2-4af8-4bf0-a4ad-c7626ae5987f")
398    def test_request_selecting_giaddr_outside_subnet(self):
399        addr = NETADDR_PREFIX + '200'
400        giaddr = OTHER_NETADDR_PREFIX + '201'
401        resp = self._get_response(
402            self._make_request(self.hwaddr, addr, siaddr=self.server_addr,
403                giaddr=giaddr))
404        asserts.assert_equal(resp, None)
405
406    @test_tracker_info(uuid="fe17df0c-2f41-416f-bb76-d75b74b63c0f")
407    def test_request_selecting_hostnameupdate(self):
408        addr = NETADDR_PREFIX + '123'
409        hostname1 = b'testhostname1'
410        hostname2 = b'testhostname2'
411        req = self._make_request(self.hwaddr, None, None,
412            options=[
413                ('requested_addr', addr),
414                ('server_id', self.server_addr),
415                ('hostname', hostname1)])
416        resp = self._get_response(req)
417        self._assert_ack(resp)
418        self._assert_unicast(resp, addr)
419        asserts.assert_equal(hostname1, getopt(req, 'hostname'))
420
421        # Re-request with different hostname
422        setopt(req, 'hostname', hostname2)
423        resp = self._get_response(req)
424        self._assert_ack(resp)
425        self._assert_unicast(resp, addr)
426        asserts.assert_equal(hostname2, getopt(req, 'hostname'))
427
428    def _run_initreboot(self, bcastbit):
429        addr, siaddr, resp = self._request_address(self.hwaddr)
430        exp = getopt(resp, 'lease_time')
431
432        # init-reboot: siaddr is None
433        return self._assert_renews(self._make_request(
434                self.hwaddr, addr, siaddr=None, bcastbit=bcastbit), addr, exp)
435
436    @test_tracker_info(uuid="263c91b9-cfe9-4f21-985d-b7046df80528")
437    def test_request_initreboot(self):
438        resp = self._run_initreboot(bcastbit=False)
439        self._assert_unicast(resp)
440        self._assert_broadcastbit(resp, isset=False)
441
442    @test_tracker_info(uuid="f05dd60f-03dd-4e2b-8e58-80f4d752ad51")
443    def test_request_initreboot_broadcastbit(self):
444        resp = self._run_initreboot(bcastbit=True)
445        self._assert_broadcast(resp)
446
447    @test_tracker_info(uuid="5563c616-2136-47f6-9151-4e28cbfe797c")
448    def test_request_initreboot_nolease(self):
449        # RFC2131 #4.3.2
450        asserts.skip("legacy behavior not compliant")
451        addr = NETADDR_PREFIX + '123'
452        resp = self._get_response(self._make_request(self.hwaddr, addr, None))
453        asserts.assert_equal(resp, None)
454
455    @test_tracker_info(uuid="da5c5537-cb38-4a2e-828f-44bc97976fe5")
456    def test_request_initreboot_incorrectlease(self):
457        otheraddr = NETADDR_PREFIX + '123'
458        addr, siaddr, _ = self._request_address(self.hwaddr)
459        asserts.assert_false(addr == otheraddr,
460            "Test assumption not met: server assigned " + otheraddr)
461
462        resp = self._get_response(
463            self._make_request(self.hwaddr, otheraddr, siaddr=None))
464        self._assert_nak(resp)
465        self._assert_broadcast(resp)
466
467    @test_tracker_info(uuid="ce42ba57-07be-427b-9cbd-5535c62b0120")
468    def test_request_initreboot_wrongnet(self):
469        resp = self._get_response(self._make_request(self.hwaddr,
470            OTHER_NETADDR_PREFIX + '1', siaddr=None))
471        self._assert_nak(resp)
472        self._assert_broadcast(resp)
473
474    def _run_rebinding(self, bcastbit, giaddr=INET4_ANY):
475        addr, siaddr, resp = self._request_address(self.hwaddr)
476        exp = getopt(resp, 'lease_time')
477
478        # Rebinding: no siaddr or reqaddr
479        resp = self._assert_renews(
480            self._make_request(self.hwaddr, reqaddr=None, siaddr=None,
481                ciaddr=addr, giaddr=giaddr, ip_src=addr,
482                ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit),
483            addr, exp)
484        return resp, addr
485
486    @test_tracker_info(uuid="68bfcb25-5873-41ad-ad0a-bf22781534ca")
487    def test_request_rebinding(self):
488        resp, addr = self._run_rebinding(bcastbit=False)
489        self._assert_unicast(resp, addr)
490        self._assert_broadcastbit(resp, isset=False)
491
492    @test_tracker_info(uuid="4c591536-8062-40ec-ae12-1ebe7dcad8e2")
493    def test_request_rebinding_relayed(self):
494        giaddr = NETADDR_PREFIX + '123'
495        resp, _ = self._run_rebinding(bcastbit=False, giaddr=giaddr)
496        self._assert_relayed(resp, giaddr)
497        self._assert_broadcastbit(resp, isset=False)
498
499    @test_tracker_info(uuid="cee2668b-bd79-47d7-b358-8f9387d715b1")
500    def test_request_rebinding_inuse(self):
501        addr, siaddr, _ = self._request_address(self.hwaddr)
502
503        resp = self._get_response(self._make_request(
504                self.other_hwaddr, reqaddr=None, siaddr=None, ciaddr=addr))
505        self._assert_nak(resp)
506        self._assert_broadcast(resp)
507
508    @test_tracker_info(uuid="d95d69b5-ab9a-42f5-8dd0-b9b6a6d960cc")
509    def test_request_rebinding_wrongaddr(self):
510        otheraddr = NETADDR_PREFIX + '123'
511        addr, siaddr, _ = self._request_address(self.hwaddr)
512        asserts.assert_false(addr == otheraddr,
513            "Test assumption not met: server assigned " + otheraddr)
514
515        resp = self._get_response(self._make_request(
516            self.hwaddr, reqaddr=None, siaddr=siaddr, ciaddr=otheraddr))
517        self._assert_nak(resp)
518        self._assert_broadcast(resp)
519
520    @test_tracker_info(uuid="421a86b3-8779-4910-8050-7806536efabb")
521    def test_request_rebinding_wrongaddr_relayed(self):
522        otheraddr = NETADDR_PREFIX + '123'
523        relayaddr = NETADDR_PREFIX + '124'
524        addr, siaddr, _ = self._request_address(self.hwaddr)
525        asserts.assert_false(addr == otheraddr,
526            "Test assumption not met: server assigned " + otheraddr)
527        asserts.assert_false(addr == relayaddr,
528            "Test assumption not met: server assigned " + relayaddr)
529
530        req = self._make_request(self.hwaddr, reqaddr=None, siaddr=None,
531            ciaddr=otheraddr, giaddr=relayaddr)
532
533        resp = self._get_response(req)
534        self._assert_nak(resp)
535        self._assert_relayed(resp, relayaddr)
536        self._assert_broadcastbit(resp)
537
538    @test_tracker_info(uuid="6ff1fab4-009a-4758-9153-0d9db63423da")
539    def test_release(self):
540        addr, siaddr, _ = self._request_address(self.hwaddr)
541        # Re-requesting fails
542        resp = self._get_response(
543            self._make_request(self.other_hwaddr, addr, siaddr))
544        self._assert_nak(resp)
545        self._assert_broadcast(resp)
546
547        # Succeeds after release
548        self._send(self._make_release(self.hwaddr, addr, siaddr))
549        resp = self._get_response(
550            self._make_request(self.other_hwaddr, addr, siaddr))
551        self._assert_ack(resp)
552
553    @test_tracker_info(uuid="abb1a53e-6b6c-468f-88b9-ace9ca4d6593")
554    def test_release_noserverid(self):
555        addr, siaddr, _ = self._request_address(self.hwaddr)
556
557        # Release without server_id opt is ignored
558        release = self._make_release(self.hwaddr, addr, siaddr)
559        removeopt(release, 'server_id')
560        self._send(release)
561
562        # Not released: request fails
563        resp = self._get_response(
564            self._make_request(self.other_hwaddr, addr, siaddr))
565        self._assert_nak(resp)
566        self._assert_broadcast(resp)
567
568    @test_tracker_info(uuid="8415b69e-ae61-4474-8495-d783ba6818d1")
569    def test_release_wrongserverid(self):
570        addr, siaddr, _ = self._request_address(self.hwaddr)
571
572        # Release with wrong server id
573        release = self._make_release(self.hwaddr, addr, siaddr)
574        setopt(release, 'server_id', addr)
575        self._send(release)
576
577        # Not released: request fails
578        resp = self._get_response(
579            self._make_request(self.other_hwaddr, addr, siaddr))
580        self._assert_nak(resp)
581        self._assert_broadcast(resp)
582
583    @test_tracker_info(uuid="0858f678-3db2-4c12-a21b-6e16c5d7e7ce")
584    def test_unicast_l2l3(self):
585        reqAddr = NETADDR_PREFIX + '124'
586        resp = self._get_response(self._make_request(
587            self.hwaddr, reqAddr, siaddr=None))
588        self._assert_unicast(resp)
589        str_hwaddr = format_hwaddr(self.hwaddr)
590        asserts.assert_equal(str_hwaddr, resp.getlayer(Ether).dst)
591        asserts.assert_equal(reqAddr, resp.getlayer(IP).dst)
592        asserts.assert_equal(CLIENT_PORT, resp.getlayer(UDP).dport)
593
594    @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798")
595    def test_macos_10_13_3_discover(self):
596        params_opt = make_paramrequestlist_opt([
597            'subnet_mask',
598            121, # Classless Static Route
599            'router',
600            'name_server',
601            'domain',
602            119, # Domain Search
603            252, # Private/Proxy autodiscovery
604            95, # LDAP
605            'NetBIOS_server',
606            46, # NetBIOS over TCP/IP Node Type
607            ])
608        req = self._make_discover(self.hwaddr,
609            options=[
610                params_opt,
611                ('max_dhcp_size', 1500),
612                # HW type Ethernet (0x01)
613                ('client_id', b'\x01' + self.hwaddr),
614                ('lease_time', 7776000),
615                ('hostname', b'test12-macbookpro'),
616            ], opts_padding=bytes(6))
617        req.getlayer(BOOTP).secs = 2
618        resp = self._get_response(req)
619        self._assert_standard_offer(resp)
620
621    def _make_macos_10_13_3_paramrequestlist(self):
622        return make_paramrequestlist_opt([
623            'subnet_mask',
624            121, # Classless Static Route
625            'router',
626            'name_server',
627            'domain',
628            119, # Domain Search
629            252, # Private/Proxy autodiscovery
630            95, # LDAP
631            44, # NetBIOS over TCP/IP Name Server
632            46, # NetBIOS over TCP/IP Node Type
633            ])
634
635    @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798")
636    def test_macos_10_13_3_discover(self):
637        req = self._make_discover(self.hwaddr,
638            options=[
639                self._make_macos_10_13_3_paramrequestlist(),
640                ('max_dhcp_size', 1500),
641                # HW type Ethernet (0x01)
642                ('client_id', b'\x01' + self.hwaddr),
643                ('lease_time', 7776000),
644                ('hostname', b'test12-macbookpro'),
645            ], opts_padding=bytes(6))
646        req.getlayer(BOOTP).secs = 2
647        resp = self._get_response(req)
648        self._assert_offer(resp)
649        self._assert_standard_offer_or_ack(resp)
650
651    @test_tracker_info(uuid="7acc796b-c4f1-46cc-8ffb-0a0efb05ae86")
652    def test_macos_10_13_3_request_selecting(self):
653        req = self._make_request(self.hwaddr, None, None,
654            options=[
655                self._make_macos_10_13_3_paramrequestlist(),
656                ('max_dhcp_size', 1500),
657                # HW type Ethernet (0x01)
658                ('client_id', b'\x01' + self.hwaddr),
659                ('requested_addr', NETADDR_PREFIX + '109'),
660                ('server_id', self.server_addr),
661                ('hostname', b'test12-macbookpro'),
662            ])
663        req.getlayer(BOOTP).secs = 5
664        resp = self._get_response(req)
665        self._assert_ack(resp)
666        self._assert_standard_offer_or_ack(resp)
667
668    # Note: macOS does not seem to do any rebinding (straight to discover)
669    @test_tracker_info(uuid="e8f0b60c-9ea3-4184-8426-151a395bff5b")
670    def test_macos_10_13_3_request_renewing(self):
671        req_ip = NETADDR_PREFIX + '109'
672        req = self._make_request(self.hwaddr, None, None,
673            ciaddr=req_ip, ip_src=req_ip, ip_dst=self.server_addr, options=[
674                self._make_macos_10_13_3_paramrequestlist(),
675                ('max_dhcp_size', 1500),
676                # HW type Ethernet (0x01)
677                ('client_id', b'\x01' + self.hwaddr),
678                ('lease_time', 7776000),
679                ('hostname', b'test12-macbookpro'),
680            ], opts_padding=bytes(6))
681        resp = self._get_response(req)
682        self._assert_ack(resp)
683        self._assert_standard_offer_or_ack(resp, renewing=True)
684
685    def _make_win10_paramrequestlist(self):
686        return make_paramrequestlist_opt([
687            'subnet_mask',
688            'router',
689            'name_server',
690            'domain',
691            31, # Perform Router Discover
692            33, # Static Route
693            'vendor_specific',
694            44, # NetBIOS over TCP/IP Name Server
695            46, # NetBIOS over TCP/IP Node Type
696            47, # NetBIOS over TCP/IP Scope
697            121, # Classless Static Route
698            249, # Private/Classless Static Route (MS)
699            252, # Private/Proxy autodiscovery
700            ])
701
702    @test_tracker_info(uuid="11b3db9c-4cd7-4088-99dc-881f25ce4e76")
703    def test_win10_discover(self):
704        req = self._make_discover(self.hwaddr, bcastbit=True,
705            options=[
706                # HW type Ethernet (0x01)
707                ('client_id', b'\x01' + self.hwaddr),
708                ('hostname', b'test120-w'),
709                ('vendor_class_id', b'MSFT 5.0'),
710                self._make_win10_paramrequestlist(),
711            ], opts_padding=bytes(11))
712        req.getlayer(BOOTP).secs = 2
713        resp = self._get_response(req)
714        self._assert_offer(resp)
715        self._assert_standard_offer_or_ack(resp, bcast=True)
716
717    @test_tracker_info(uuid="4fe04e7f-c643-4a19-b15c-cf417b2c9410")
718    def test_win10_request_selecting(self):
719        req = self._make_request(self.hwaddr, None, None, bcastbit=True,
720            options=[
721                ('max_dhcp_size', 1500),
722                # HW type Ethernet (0x01)
723                ('client_id', b'\x01' + self.hwaddr),
724                ('requested_addr', NETADDR_PREFIX + '109'),
725                ('server_id', self.server_addr),
726                ('hostname', b'test120-w'),
727                # Client Fully Qualified Domain Name
728                (81, b'\x00\x00\x00test120-w.ad.tst.example.com'),
729                ('vendor_class_id', b'MSFT 5.0'),
730                self._make_win10_paramrequestlist(),
731            ])
732        resp = self._get_response(req)
733        self._assert_ack(resp)
734        self._assert_standard_offer_or_ack(resp, bcast=True)
735
736    def _run_win10_request_renewing(self, bcast):
737        req_ip = NETADDR_PREFIX + '109'
738        req = self._make_request(self.hwaddr, None, None, bcastbit=bcast,
739            ciaddr=req_ip, ip_src=req_ip,
740            ip_dst=NETADDR_BROADCAST if bcast else self.server_addr,
741            options=[
742                ('max_dhcp_size', 1500),
743                # HW type Ethernet (0x01)
744                ('client_id', b'\x01' + self.hwaddr),
745                ('hostname', b'test120-w'),
746                # Client Fully Qualified Domain Name
747                (81, b'\x00\x00\x00test120-w.ad.tst.example.com'),
748                ('vendor_class_id', b'MSFT 5.0'),
749                self._make_win10_paramrequestlist(),
750            ])
751        resp = self._get_response(req)
752        self._assert_ack(resp)
753        self._assert_standard_offer_or_ack(resp, renewing=True, bcast=bcast)
754
755    @test_tracker_info(uuid="1b23c9c7-cc94-42d0-83a6-f1b2bc125fb9")
756    def test_win10_request_renewing(self):
757        self._run_win10_request_renewing(bcast=False)
758
759    @test_tracker_info(uuid="c846bd14-71fb-4492-a4d3-0aa5c2c35751")
760    def test_win10_request_rebinding(self):
761        self._run_win10_request_renewing(bcast=True)
762
763    def _make_debian_paramrequestlist(self):
764        return make_paramrequestlist_opt([
765            'subnet_mask',
766            'broadcast_address',
767            'router',
768            'name_server',
769            119, # Domain Search
770            'hostname',
771            101, # TCode
772            'domain', # NetBIOS over TCP/IP Name Server
773            'vendor_specific', # NetBIOS over TCP/IP Node Type
774            121, # Classless Static Route
775            249, # Private/Classless Static Route (MS)
776            33, # Static Route
777            252, # Private/Proxy autodiscovery
778            'NTP_server',
779            ])
780
781    @test_tracker_info(uuid="b0bb6ae7-07e6-4ecb-9a2f-db9c8146a3d5")
782    def test_debian_dhclient_4_3_5_discover(self):
783        req_ip = NETADDR_PREFIX + '109'
784        req = self._make_discover(self.hwaddr,
785            options=[
786                ('requested_addr', req_ip),
787                ('hostname', b'test12'),
788                self._make_debian_paramrequestlist(),
789            ], opts_padding=bytes(26))
790        resp = self._get_response(req)
791        self._assert_offer(resp)
792        # Don't test for hostname option: previous implementation would not
793        # set it in offer, which was not consistent with ack
794        self._assert_standard_offer_or_ack(resp, ignore_hostname=True)
795        asserts.assert_equal(req_ip, get_yiaddr(resp))
796
797    @test_tracker_info(uuid="d70bc043-84cb-4735-9123-c46c6d1ce5ac")
798    def test_debian_dhclient_4_3_5_request_selecting(self):
799        req = self._make_request(self.hwaddr, None, None,
800            options=[
801                ('server_id', self.server_addr),
802                ('requested_addr', NETADDR_PREFIX + '109'),
803                ('hostname', b'test12'),
804                self._make_debian_paramrequestlist(),
805            ], opts_padding=bytes(20))
806        resp = self._get_response(req)
807        self._assert_ack(resp)
808        self._assert_standard_offer_or_ack(resp, with_hostname=True)
809
810    def _run_debian_renewing(self, bcast):
811        req_ip = NETADDR_PREFIX + '109'
812        req = self._make_request(self.hwaddr, None, None,
813            ciaddr=req_ip, ip_src=req_ip,
814            ip_dst=NETADDR_BROADCAST if bcast else self.server_addr,
815            options=[
816                ('hostname', b'test12'),
817                self._make_debian_paramrequestlist(),
818            ],
819            opts_padding=bytes(32))
820        resp = self._get_response(req)
821        self._assert_ack(resp)
822        self._assert_standard_offer_or_ack(resp, renewing=True,
823            with_hostname=True)
824
825    @test_tracker_info(uuid="5e1e817d-9972-46ca-8d44-1e120bf1bafc")
826    def test_debian_dhclient_4_3_5_request_renewing(self):
827        self._run_debian_renewing(bcast=False)
828
829    @test_tracker_info(uuid="b179a36d-910e-4006-a79a-11cc561b69db")
830    def test_debian_dhclient_4_3_5_request_rebinding(self):
831        self._run_debian_renewing(bcast=True)
832
833    def _assert_standard_offer_or_ack(self, resp, renewing=False, bcast=False,
834            ignore_hostname=False, with_hostname=False):
835        # Responses to renew/rebind are always unicast to ciaddr even with
836        # broadcast flag set (RFC does not define this behavior, but this is
837        # more efficient and matches previous behavior)
838        if bcast and not renewing:
839            self._assert_broadcast(resp)
840            self._assert_broadcastbit(resp, isset=True)
841        else:
842            # Previous implementation would set the broadcast flag but send a
843            # unicast reply if (bcast and renewing). This was not consistent and
844            # new implementation consistently clears the flag. Not testing for
845            # broadcast flag value to maintain compatibility.
846            self._assert_unicast(resp)
847
848        bootp_resp = resp.getlayer(BOOTP)
849        asserts.assert_equal(0, bootp_resp.hops)
850        if renewing:
851            asserts.assert_true(bootp_resp.ciaddr.startswith(NETADDR_PREFIX),
852                'ciaddr does not start with expected prefix')
853        else:
854            asserts.assert_equal(INET4_ANY, bootp_resp.ciaddr)
855        asserts.assert_true(bootp_resp.yiaddr.startswith(NETADDR_PREFIX),
856            'yiaddr does not start with expected prefix')
857        asserts.assert_true(bootp_resp.siaddr.startswith(NETADDR_PREFIX),
858            'siaddr does not start with expected prefix')
859        asserts.assert_equal(INET4_ANY, bootp_resp.giaddr)
860
861        opt_labels = get_opt_labels(bootp_resp)
862        # FQDN option 81 is not supported in new behavior
863        opt_labels = [opt for opt in opt_labels if opt != 81]
864
865        # Expect exactly these options in this order
866        expected_opts = [
867            'message-type', 'server_id', 'lease_time', 'renewal_time',
868            'rebinding_time', 'subnet_mask', 'broadcast_address', 'router',
869            'name_server']
870        if ignore_hostname:
871            opt_labels = [opt for opt in opt_labels if opt != 'hostname']
872        elif with_hostname:
873            expected_opts.append('hostname')
874        expected_opts.extend(['vendor_specific', 'end'])
875        asserts.assert_equal(expected_opts, opt_labels)
876
877    def _request_address(self, hwaddr, bcast=True):
878        resp = self._get_response(self._make_discover(hwaddr))
879        self._assert_offer(resp)
880        addr = get_yiaddr(resp)
881        siaddr = getopt(resp, 'server_id')
882        resp = self._get_response(self._make_request(hwaddr, addr, siaddr,
883                ip_dst=(INET4_ANY if bcast else siaddr)))
884        self._assert_ack(resp)
885        return addr, siaddr, resp
886
887    def _get_response(self, packet):
888        resp = srp1(packet, iface=self.iface, timeout=10, verbose=False)
889        bootp_resp = (resp or None) and resp.getlayer(BOOTP)
890        if bootp_resp != None and get_mess_type(bootp_resp) == ACK:
891            # Note down corresponding release for this request
892            release = self._make_release(bootp_resp.chaddr, bootp_resp.yiaddr,
893                getopt(bootp_resp, 'server_id'))
894            self.cleanup_releases.append(release)
895        return resp
896
897    def _send(self, packet):
898        sendp(packet, iface=self.iface, verbose=False)
899
900    def _assert_type(self, packet, tp):
901        asserts.assert_false(None == packet, "No packet")
902        asserts.assert_equal(tp, get_mess_type(packet))
903
904    def _assert_ack(self, packet):
905        self._assert_type(packet, ACK)
906
907    def _assert_nak(self, packet):
908        self._assert_type(packet, NAK)
909
910    def _assert_offer(self, packet):
911        self._assert_type(packet, OFFER)
912
913    def _assert_broadcast(self, packet):
914        asserts.assert_false(None == packet, "No packet")
915        asserts.assert_equal(packet.getlayer(Ether).dst, BROADCAST_MAC)
916        asserts.assert_equal(packet.getlayer(IP).dst, NETADDR_BROADCAST)
917        self._assert_broadcastbit(packet)
918
919    def _assert_broadcastbit(self, packet, isset=True):
920        mask = 0x8000
921        flag = packet.getlayer(BOOTP).flags
922        asserts.assert_equal(flag & mask, mask if isset else 0)
923
924    def _assert_unicast(self, packet, ipAddr=None):
925        asserts.assert_false(None == packet, "No packet")
926        asserts.assert_false(packet.getlayer(Ether).dst == BROADCAST_MAC,
927            "Layer 2 packet destination address was broadcast")
928        if ipAddr:
929            asserts.assert_equal(packet.getlayer(IP).dst, ipAddr)
930
931    def _assert_relayed(self, packet, giaddr):
932        self._assert_unicast(packet, giaddr)
933        asserts.assert_equal(giaddr, packet.getlayer(BOOTP).giaddr,
934            'Relayed response has invalid giaddr field')
935
936    def _next_hwaddr(self):
937        addr = make_hwaddr(self.next_hwaddr_index)
938        self.next_hwaddr_index = self.next_hwaddr_index + 1
939        return addr
940
941    def _make_dhcp(self, src_hwaddr, options, ciaddr=INET4_ANY,
942            ip_src=INET4_ANY, ip_dst=NETADDR_BROADCAST, giaddr=INET4_ANY,
943            bcastbit=False):
944        broadcast = (ip_dst == NETADDR_BROADCAST)
945        ethernet = Ether(dst=(BROADCAST_MAC if broadcast else self.dut_hwaddr))
946        ip = IP(src=ip_src, dst=ip_dst)
947        udp = UDP(sport=68, dport=SERVER_PORT)
948        bootp = BOOTP(chaddr=src_hwaddr, ciaddr=ciaddr, giaddr=giaddr,
949            flags=(0x8000 if bcastbit else 0), xid=random.randrange(0, 2**32))
950        dhcp = DHCP(options=options)
951        return ethernet / ip / udp / bootp / dhcp
952
953    def _make_discover(self, src_hwaddr, options = [], giaddr=INET4_ANY,
954            bcastbit=False, opts_padding=None, ip_src=INET4_ANY):
955        opts = [('message-type','discover')]
956        opts.extend(options)
957        opts.append('end')
958        if (opts_padding):
959            opts.append(opts_padding)
960        return self._make_dhcp(src_hwaddr, options=opts, giaddr=giaddr,
961            ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit, ip_src=ip_src)
962
963    def _make_request(self, src_hwaddr, reqaddr, siaddr, ciaddr=INET4_ANY,
964            ip_dst=None, ip_src=None, giaddr=INET4_ANY, bcastbit=False,
965            options=[], opts_padding=None):
966        if not ip_dst:
967            ip_dst = siaddr or INET4_ANY
968
969        if not ip_src and ip_dst == INET4_ANY:
970            ip_src = INET4_ANY
971        elif not ip_src:
972            ip_src = (giaddr if not isempty(giaddr)
973                else ciaddr if not isempty(ciaddr)
974                else reqaddr)
975        # Kernel will not receive unicast UDP packets with empty ip_src
976        asserts.assert_false(ip_dst != INET4_ANY and isempty(ip_src),
977            "Unicast ip_src cannot be zero")
978        opts = [('message-type', 'request')]
979        if options:
980            opts.extend(options)
981        else:
982            if siaddr:
983                opts.append(('server_id', siaddr))
984            if reqaddr:
985                opts.append(('requested_addr', reqaddr))
986        opts.append('end')
987        if opts_padding:
988            opts.append(opts_padding)
989        return self._make_dhcp(src_hwaddr, options=opts, ciaddr=ciaddr,
990            ip_src=ip_src, ip_dst=ip_dst, giaddr=giaddr, bcastbit=bcastbit)
991
992    def _make_release(self, src_hwaddr, addr, server_id):
993        opts = [('message-type', 'release'), ('server_id', server_id), 'end']
994        return self._make_dhcp(src_hwaddr, opts, ciaddr=addr, ip_src=addr,
995            ip_dst=server_id)
996
def assert_bootp_response(resp, req):
    """Check the fixed BOOTP header fields of resp against request req.

    Asserts op == 2, htype == 1, hlen == 6, hops == 0 and that the
    transaction id equals the one in req.

    Returns:
        The BOOTP layer of resp.
    """
    bootp = resp.getlayer(BOOTP)
    fixed_fields = (
        ('op', 2, 'Invalid BOOTP op'),
        ('htype', 1, 'Invalid BOOTP htype'),
        ('hlen', 6, 'Invalid BOOTP hlen'),
        ('hops', 0, 'Invalid BOOTP hops'),
    )
    for field_name, expected, failure_msg in fixed_fields:
        asserts.assert_equal(expected, getattr(bootp, field_name), failure_msg)

    request_xid = req.getlayer(BOOTP).xid
    asserts.assert_equal(request_xid, bootp.xid, 'Invalid XID')
    return bootp
1005
1006
def make_paramrequestlist_opt(params):
    """Build a 'param_req_list' option tuple from labels and/or codes.

    Args:
        params: iterable of option labels (str, looked up in DHCPRevOptions)
            and option codes (int); any other value is passed through as is.

    Returns:
        A tuple starting with 'param_req_list', followed by one entry per
        parameter; integer codes are encoded as single bytes.
    """
    # First resolve every label to its option code
    codes = []
    for param in params:
        if isinstance(param, str):
            codes.append(DHCPRevOptions[param][0])
        else:
            codes.append(param)

    # Then encode integer codes as one byte each
    option = ['param_req_list']
    for code in codes:
        if isinstance(code, int):
            option.append(code.to_bytes(1, byteorder='big'))
        else:
            option.append(code)
    return tuple(option)
1013
1014
def isempty(addr):
    """Return whether addr is unset, i.e. falsy or equal to INET4_ANY."""
    if not addr:
        return True
    return addr == INET4_ANY
1017
1018
def setopt(packet, optname, val):
    """Set DHCP option optname to val in packet, in place.

    An existing option with that label is replaced where it stands.
    Otherwise the option is inserted right before the 'end' marker, so it is
    still part of the option list when padding follows 'end'. If there is no
    'end' marker, the option is appended.

    Args:
        packet: packet containing a DHCP layer.
        optname: label of the option to set.
        val: new option value.
    """
    dhcp = packet.getlayer(DHCP)
    if optname in get_opt_labels(dhcp):
        dhcp.options = [(optname, val) if opt[0] == optname else opt
            for opt in dhcp.options]
        return

    # 'end' is not necessarily the last entry: packets built with
    # opts_padding carry padding after it
    try:
        insert_at = dhcp.options.index('end')
    except ValueError:
        insert_at = len(dhcp.options)
    dhcp.options.insert(insert_at, (optname, val))
1027
1028
def getopt(packet, key):
    """Return the value of the first DHCP option labelled key, or None.

    None is returned both when the option is absent and when packet has no
    DHCP layer at all, so callers get a clean "not found" rather than an
    AttributeError on a response without DHCP payload.

    Args:
        packet: packet to inspect.
        key: option label (or numeric code for unnamed options).
    """
    dhcp = packet.getlayer(DHCP)
    if dhcp is None:
        return None
    for opt in dhcp.options:
        if opt[0] == key:
            return opt[1]
    return None
1032
1033
def removeopt(packet, key):
    """Remove every DHCP option labelled key from packet, in place."""
    dhcp = packet.getlayer(DHCP)
    kept = []
    for opt in dhcp.options:
        if opt[0] != key:
            kept.append(opt)
    dhcp.options = kept
1037
1038
def get_opt_labels(packet):
    """Return the labels of the DHCP options in packet, in order.

    'pad' entries are skipped. String entries (such as 'end') are returned
    as they are; for any other entry its first element is the label.
    """
    labels = []
    for opt in packet.getlayer(DHCP).options:
        if opt != 'pad':
            # end option is a single string, not a tuple.
            if isinstance(opt, str):
                labels.append(opt)
            else:
                labels.append(opt[0])
    return labels
1044
1045
def get_yiaddr(packet):
    """Return the yiaddr field of the BOOTP layer in packet."""
    bootp = packet.getlayer(BOOTP)
    return bootp.yiaddr
1048
1049
def get_chaddr(packet):
    """Return the first 6 bytes of the BOOTP chaddr field in packet."""
    bootp = packet.getlayer(BOOTP)
    # Ethernet addresses are in use here, so drop the address padding
    ethernet_addr_len = 6
    return bootp.chaddr[:ethernet_addr_len]
1053
1054
def get_mess_type(packet):
    """Return the value of the 'message-type' DHCP option of packet."""
    message_type = getopt(packet, 'message-type')
    return message_type
1057
1058
def make_hwaddr(index):
    """Return a 6-byte hardware address whose last two bytes encode index.

    The first four bytes are fixed (44:85:00:00); index is stored big-endian
    in the remaining two.

    Raises:
        ValueError: if index is greater than 0xffff.
    """
    if index > 0xffff:
        raise ValueError("Address index out of range")
    high_byte = index >> 8
    low_byte = index & 0xff
    prefix = b'\x44\x85\x00\x00'
    return prefix + bytes([high_byte, low_byte])
1063
1064
def format_hwaddr(addr):
    """Format an iterable of byte values as colon-separated lowercase hex."""
    octets = []
    for byte in addr:
        octets.append('%02x' % byte)
    return ':'.join(octets)
1067