summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLorenzo Colitti <lorenzo@google.com>2022-12-21 13:28:22 +0900
committerLorenzo Colitti <lorenzo@google.com>2022-12-22 15:02:20 +0900
commit6b15190f992fae60f5cad4041192e668bcb885a0 (patch)
treec2cf61aaf876f451c987c486e97d1d61d55e7bed
parentbd91fbaaad4fccb7763021355196c1b87f5862fa (diff)
Add tests for UDP IPv6 encap.
Test: pass on Pixel kernel Change-Id: Ia132c52cdd8a8b0a6d8e0dd25b9bbc38a075e87a
-rw-r--r--net/test/xfrm_base.py23
-rwxr-xr-xnet/test/xfrm_test.py80
2 files changed, 92 insertions, 11 deletions
diff --git a/net/test/xfrm_base.py b/net/test/xfrm_base.py
index 06e8cf2..e5aadf3 100644
--- a/net/test/xfrm_base.py
+++ b/net/test/xfrm_base.py
@@ -143,6 +143,18 @@ def GetEspPacketLength(mode, version, udp_encap, payload,
return payload_len
+def GetEspTrailer(length, nexthdr):
+ # ESP padding per RFC 4303 section 2.4.
+ # For a null cipher with a block size of 1, padding is only necessary to
+ # ensure that the 1-byte Pad Length and Next Header fields are right aligned
+ # on a 4-byte boundary.
+ esplen = length + 2 # Packet length plus Pad Length and Next Header.
+ padlen = util.GetPadLength(4, esplen)
+ # The pad bytes are consecutive integers starting from 0x01.
+ padding = "".join((chr(i) for i in range(1, padlen + 1))).encode("utf-8")
+ return padding + struct.pack("BB", padlen, nexthdr)
+
+
def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
"""Apply null encryption to a packet.
@@ -189,16 +201,7 @@ def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
inner_layer = udp_layer
esp_nexthdr = IPPROTO_UDP
-
- # ESP padding per RFC 4303 section 2.4.
- # For a null cipher with a block size of 1, padding is only necessary to
- # ensure that the 1-byte Pad Length and Next Header fields are right aligned
- # on a 4-byte boundary.
- esplen = (len(inner_layer) + 2) # UDP length plus Pad Length and Next Header.
- padlen = util.GetPadLength(4, esplen)
- # The pad bytes are consecutive integers starting from 0x01.
- padding = "".join((chr(i) for i in range(1, padlen + 1))).encode("utf-8")
- trailer = padding + struct.pack("BB", padlen, esp_nexthdr)
+ trailer = GetEspTrailer(len(inner_layer), esp_nexthdr)
# Assemble the packet.
esp_packet.payload = scapy.Raw(inner_layer)
diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py
index 5607203..8672762 100755
--- a/net/test/xfrm_test.py
+++ b/net/test/xfrm_test.py
@@ -310,7 +310,7 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
start_integrity_failures = sainfo.stats.integrity_failed
# Now play back the valid packet and check that we receive it.
- ip = scapy.IPv6 if version == 6 else scapy.IP
+ ip = {4: scapy.IP, 6: scapy.IPv6}[version]
incoming = (ip(src=remoteaddr, dst=myaddr) /
scapy.UDP(sport=4500, dport=encap_port) / payload)
incoming = ip(bytes(incoming))
@@ -414,6 +414,84 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
encap_sock.close()
s.close()
+ def _CheckUDPEncapRecv(self, version, mode):
+ netid, myaddr, remoteaddr, encap_sock, encap_port, s = \
+ self._SetupUdpEncapSockets(version)
+
+ # Create inbound and outbound SAs that specify UDP encapsulation.
+ reqid = 123
+ encaptmpl = xfrm.XfrmEncapTmpl((xfrm.UDP_ENCAP_ESPINUDP, htons(encap_port),
+ htons(4500), 16 * b"\x00"))
+ self.xfrm.AddSaInfo(remoteaddr, myaddr, TEST_SPI, mode, reqid,
+ xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None,
+ encaptmpl, None, None)
+
+ sainfo = self.xfrm.FindSaInfo(TEST_SPI)
+ self.assertEqual(0, sainfo.curlft.packets)
+ self.assertEqual(0, sainfo.curlft.bytes)
+ self.assertEqual(0, sainfo.stats.integrity_failed)
+
+ IpType = {4: scapy.IP, 6: scapy.IPv6}[version]
+ if mode == xfrm.XFRM_MODE_TRANSPORT:
+ # Due to a bug in the IPv6 UDP encap code, there must be at least 32
+ # bytes after the ESP header or the packet will be dropped.
+ # 8 (UDP header) + 18 (payload) + 2 (ESP trailer) = 28, dropped
+ # 8 (UDP header) + 19 (payload) + 4 (ESP trailer) = 32, received
+ # There is a similar bug in IPv4 encap, but the minimum is only 12 bytes,
+ # which is much less likely to occur. This doesn't affect tunnel mode
+ # because IP headers are always at least 20 bytes long.
+ data = 19 * b"a"
+ datalen = len(data)
+ data += xfrm_base.GetEspTrailer(len(data), IPPROTO_UDP)
+ self.assertEqual(32, len(data) + 8)
+ input_pkt = (IpType(src=remoteaddr, dst=myaddr) /
+ scapy.UDP(sport=4500, dport=encap_port) /
+ scapy.ESP(spi=TEST_SPI, seq=1) /
+ scapy.UDP(sport=443, dport=32123) / data)
+ else:
+ # TODO: test IPv4 in IPv6 encap and vice versa.
+ data = b"" # Empty UDP payload
+ datalen = len(data) + {4: 20, 6: 40}[version]
+ data += xfrm_base.GetEspTrailer(len(data), IPPROTO_UDP)
+ input_pkt = (IpType(src=remoteaddr, dst=myaddr) /
+ scapy.UDP(sport=4500, dport=encap_port) /
+ scapy.ESP(spi=TEST_SPI, seq=1) /
+ IpType(src=remoteaddr, dst=myaddr) /
+ scapy.UDP(sport=443, dport=32123) / data)
+
+ # input_pkt.show2()
+ self.ReceivePacketOn(netid, input_pkt)
+
+ sainfo = self.xfrm.FindSaInfo(TEST_SPI)
+ self.assertEqual(1, sainfo.curlft.packets)
+ self.assertEqual(datalen + 8, sainfo.curlft.bytes)
+ self.assertEqual(0, sainfo.stats.integrity_failed)
+
+ # Uncomment for debugging.
+ # subprocess.call("ip -s xfrm state".split())
+
+ encap_sock.close()
+ s.close()
+
+ def testIPv4UDPEncapRecvTransport(self):
+ self._CheckUDPEncapRecv(4, xfrm.XFRM_MODE_TRANSPORT)
+
+ def testIPv4UDPEncapRecvTunnel(self):
+ self._CheckUDPEncapRecv(4, xfrm.XFRM_MODE_TUNNEL)
+
+ # IPv6 UDP encap is broken between:
+ # 4db4075f92af ("esp6: fix check on ipv6_skip_exthdr's return value") and
+ # 5f9c55c8066b ("ipv6: check return value of ipv6_skip_exthdr")
+ @unittest.skipUnless(net_test.KernelAtLeast([(5, 10, 108), (5, 15, 31)]),
+ reason="Unsupported or broken on current kernel")
+ def testIPv6UDPEncapRecvTransport(self):
+ self._CheckUDPEncapRecv(6, xfrm.XFRM_MODE_TRANSPORT)
+
+ @unittest.skipUnless(net_test.KernelAtLeast([(5, 10, 108), (5, 15, 31)]),
+ reason="Unsupported or broken on current kernel")
+ def testIPv6UDPEncapRecvTunnel(self):
+ self._CheckUDPEncapRecv(6, xfrm.XFRM_MODE_TUNNEL)
+
def testAllocSpecificSpi(self):
spi = 0xABCD
new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi)