diff options
author | Lorenzo Colitti <lorenzo@google.com> | 2022-12-21 13:28:22 +0900 |
---|---|---|
committer | Lorenzo Colitti <lorenzo@google.com> | 2022-12-22 15:02:20 +0900 |
commit | 6b15190f992fae60f5cad4041192e668bcb885a0 (patch) | |
tree | c2cf61aaf876f451c987c486e97d1d61d55e7bed | |
parent | bd91fbaaad4fccb7763021355196c1b87f5862fa (diff) |
Add tests for UDP IPv6 encap.
Test: pass on Pixel kernel
Change-Id: Ia132c52cdd8a8b0a6d8e0dd25b9bbc38a075e87a
-rw-r--r-- | net/test/xfrm_base.py | 23 | ||||
-rwxr-xr-x | net/test/xfrm_test.py | 80 |
2 files changed, 92 insertions, 11 deletions
diff --git a/net/test/xfrm_base.py b/net/test/xfrm_base.py index 06e8cf2..e5aadf3 100644 --- a/net/test/xfrm_base.py +++ b/net/test/xfrm_base.py @@ -143,6 +143,18 @@ def GetEspPacketLength(mode, version, udp_encap, payload, return payload_len +def GetEspTrailer(length, nexthdr): + # ESP padding per RFC 4303 section 2.4. + # For a null cipher with a block size of 1, padding is only necessary to + # ensure that the 1-byte Pad Length and Next Header fields are right aligned + # on a 4-byte boundary. + esplen = length + 2 # Packet length plus Pad Length and Next Header. + padlen = util.GetPadLength(4, esplen) + # The pad bytes are consecutive integers starting from 0x01. + padding = "".join((chr(i) for i in range(1, padlen + 1))).encode("utf-8") + return padding + struct.pack("BB", padlen, nexthdr) + + def EncryptPacketWithNull(packet, spi, seq, tun_addrs): """Apply null encryption to a packet. @@ -189,16 +201,7 @@ def EncryptPacketWithNull(packet, spi, seq, tun_addrs): inner_layer = udp_layer esp_nexthdr = IPPROTO_UDP - - # ESP padding per RFC 4303 section 2.4. - # For a null cipher with a block size of 1, padding is only necessary to - # ensure that the 1-byte Pad Length and Next Header fields are right aligned - # on a 4-byte boundary. - esplen = (len(inner_layer) + 2) # UDP length plus Pad Length and Next Header. - padlen = util.GetPadLength(4, esplen) - # The pad bytes are consecutive integers starting from 0x01. - padding = "".join((chr(i) for i in range(1, padlen + 1))).encode("utf-8") - trailer = padding + struct.pack("BB", padlen, esp_nexthdr) + trailer = GetEspTrailer(len(inner_layer), esp_nexthdr) # Assemble the packet. esp_packet.payload = scapy.Raw(inner_layer) diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py index 5607203..8672762 100755 --- a/net/test/xfrm_test.py +++ b/net/test/xfrm_test.py @@ -310,7 +310,7 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): start_integrity_failures = sainfo.stats.integrity_failed # Now play back the valid packet and check that we receive it. - ip = scapy.IPv6 if version == 6 else scapy.IP + ip = {4: scapy.IP, 6: scapy.IPv6}[version] incoming = (ip(src=remoteaddr, dst=myaddr) / scapy.UDP(sport=4500, dport=encap_port) / payload) incoming = ip(bytes(incoming)) @@ -414,6 +414,84 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): encap_sock.close() s.close() + def _CheckUDPEncapRecv(self, version, mode): + netid, myaddr, remoteaddr, encap_sock, encap_port, s = \ + self._SetupUdpEncapSockets(version) + + # Create inbound and outbound SAs that specify UDP encapsulation. + reqid = 123 + encaptmpl = xfrm.XfrmEncapTmpl((xfrm.UDP_ENCAP_ESPINUDP, htons(encap_port), + htons(4500), 16 * b"\x00")) + self.xfrm.AddSaInfo(remoteaddr, myaddr, TEST_SPI, mode, reqid, + xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None, + encaptmpl, None, None) + + sainfo = self.xfrm.FindSaInfo(TEST_SPI) + self.assertEqual(0, sainfo.curlft.packets) + self.assertEqual(0, sainfo.curlft.bytes) + self.assertEqual(0, sainfo.stats.integrity_failed) + + IpType = {4: scapy.IP, 6: scapy.IPv6}[version] + if mode == xfrm.XFRM_MODE_TRANSPORT: + # Due to a bug in the IPv6 UDP encap code, there must be at least 32 + # bytes after the ESP header or the packet will be dropped. + # 8 (UDP header) + 18 (payload) + 2 (ESP trailer) = 28, dropped + # 8 (UDP header) + 19 (payload) + 4 (ESP trailer) = 32, received + # There is a similar bug in IPv4 encap, but the minimum is only 12 bytes, + # which is much less likely to occur. This doesn't affect tunnel mode + # because IP headers are always at least 20 bytes long. + data = 19 * b"a" + datalen = len(data) + data += xfrm_base.GetEspTrailer(len(data), IPPROTO_UDP) + self.assertEqual(32, len(data) + 8) + input_pkt = (IpType(src=remoteaddr, dst=myaddr) / + scapy.UDP(sport=4500, dport=encap_port) / + scapy.ESP(spi=TEST_SPI, seq=1) / + scapy.UDP(sport=443, dport=32123) / data) + else: + # TODO: test IPv4 in IPv6 encap and vice versa. + data = b"" # Empty UDP payload + datalen = len(data) + {4: 20, 6: 40}[version] + data += xfrm_base.GetEspTrailer(len(data), IPPROTO_UDP) + input_pkt = (IpType(src=remoteaddr, dst=myaddr) / + scapy.UDP(sport=4500, dport=encap_port) / + scapy.ESP(spi=TEST_SPI, seq=1) / + IpType(src=remoteaddr, dst=myaddr) / + scapy.UDP(sport=443, dport=32123) / data) + + # input_pkt.show2() + self.ReceivePacketOn(netid, input_pkt) + + sainfo = self.xfrm.FindSaInfo(TEST_SPI) + self.assertEqual(1, sainfo.curlft.packets) + self.assertEqual(datalen + 8, sainfo.curlft.bytes) + self.assertEqual(0, sainfo.stats.integrity_failed) + + # Uncomment for debugging. + # subprocess.call("ip -s xfrm state".split()) + + encap_sock.close() + s.close() + + def testIPv4UDPEncapRecvTransport(self): + self._CheckUDPEncapRecv(4, xfrm.XFRM_MODE_TRANSPORT) + + def testIPv4UDPEncapRecvTunnel(self): + self._CheckUDPEncapRecv(4, xfrm.XFRM_MODE_TUNNEL) + + # IPv6 UDP encap is broken between: + # 4db4075f92af ("esp6: fix check on ipv6_skip_exthdr's return value") and + # 5f9c55c8066b ("ipv6: check return value of ipv6_skip_exthdr") + @unittest.skipUnless(net_test.KernelAtLeast([(5, 10, 108), (5, 15, 31)]), + reason="Unsupported or broken on current kernel") + def testIPv6UDPEncapRecvTransport(self): + self._CheckUDPEncapRecv(6, xfrm.XFRM_MODE_TRANSPORT) + + @unittest.skipUnless(net_test.KernelAtLeast([(5, 10, 108), (5, 15, 31)]), + reason="Unsupported or broken on current kernel") + def testIPv6UDPEncapRecvTunnel(self): + self._CheckUDPEncapRecv(6, xfrm.XFRM_MODE_TUNNEL) + def testAllocSpecificSpi(self): spi = 0xABCD new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi) |