diff options
author | Vlad Storona <vstorona@cybervisiontech.com> | 2018-06-21 16:41:17 +0300 |
---|---|---|
committer | Arina Ielchiieva <arina.yelchiyeva@gmail.com> | 2018-08-21 20:08:20 +0300 |
commit | 2dfd0dab41864f22c7ed924e17c6a8cf9b2f54ad (patch) | |
tree | 5a127b2794023cd82412e56c93c858419cb996e1 /exec | |
parent | 0f9d19c4c90a148573cfed908dd03ad771306bc1 (diff) |
DRILL-6179: Added pcapng-format support
Diffstat (limited to 'exec')
18 files changed, 1351 insertions, 8 deletions
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index f175c654c..f4068952e 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -534,6 +534,11 @@ <artifactId>metadata-extractor</artifactId> <version>2.11.0</version> </dependency> + <dependency> + <groupId>fr.bmartel</groupId> + <artifactId>pcapngdecoder</artifactId> + <version>1.2</version> + </dependency> </dependencies> <profiles> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java index 9cc98de9c..a0a07a99d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java @@ -42,18 +42,18 @@ public class Packet { private long timestamp; private int originalLength; - private byte[] raw; + protected byte[] raw; // index into the raw data where the current ethernet packet starts private int etherOffset; // index into the raw data where the current IP packet starts. Should be just after etherOffset - private int ipOffset; + protected int ipOffset; private int packetLength; - private int etherProtocol; - private int protocol; + protected int etherProtocol; + protected int protocol; - private boolean isRoutingV6; + protected boolean isRoutingV6; @SuppressWarnings("WeakerAccess") public boolean readPcap(final InputStream in, final boolean byteOrder, final int maxLength) throws IOException { @@ -379,7 +379,7 @@ public class Packet { return (getByte(raw, ipOffset) & 0xf) * 4; } - private int ipVersion() { + protected int ipVersion() { return getByte(raw, ipOffset) >>> 4; } @@ -409,12 +409,12 @@ public class Packet { // everything is decoded lazily } - private int processIpV4Packet() { + protected int processIpV4Packet() { validateIpV4Packet(); return getByte(raw, ipOffset + 9); } - private int processIpV6Packet() { + protected int processIpV6Packet() { Preconditions.checkState(ipVersion() == 6, "Should have seen IP version 6, got %d", ipVersion()); int headerLength = 40; int nextHeader = raw[ipOffset + 6] & 0xff; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java new file mode 100644 index 000000000..7ff875acf --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.logical.FormatPluginConfig; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +@JsonTypeName("pcapng") +public class PcapngFormatConfig implements FormatPluginConfig { + + public List<String> extensions = Collections.singletonList("pcapng"); + + public List<String> getExtensions() { + return extensions; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PcapngFormatConfig that = (PcapngFormatConfig) o; + return Objects.equals(extensions, that.extensions); + } + + @Override + public int hashCode() { + return Objects.hash(extensions); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java new file mode 100644 index 000000000..832c0ec3b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.store.RecordWriter; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; +import org.apache.drill.exec.store.dfs.easy.EasyWriter; +import org.apache.drill.exec.store.dfs.easy.FileWork; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; + +public class PcapngFormatPlugin extends EasyFormatPlugin<PcapngFormatConfig> { + + public static final String DEFAULT_NAME = "pcapng"; + + public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf, + StoragePluginConfig storagePluginConfig) { + this(name, context, fsConf, storagePluginConfig, new PcapngFormatConfig()); + } + + public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, PcapngFormatConfig formatPluginConfig) { + super(name, context, fsConf, config, formatPluginConfig, true, + false, true, false, + formatPluginConfig.getExtensions(), DEFAULT_NAME); + } + + @Override + public boolean supportsPushDown() { + return true; + } + + @Override + public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, + FileWork fileWork, List<SchemaPath> columns, + String userName) { + return new PcapngRecordReader(fileWork.getPath(), dfs, columns); + } + + @Override + public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) { + throw new UnsupportedOperationException("unimplemented"); + } + + @Override + public int getReaderOperatorType() { + return UserBitShared.CoreOperatorType.PCAPNG_SUB_SCAN_VALUE; + } + + @Override + public int getWriterOperatorType() { + throw new UnsupportedOperationException("unimplemented"); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java new file mode 100644 index 000000000..b1c5f2427 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import fr.bmartel.pcapdecoder.PcapDecoder; +import fr.bmartel.pcapdecoder.structure.types.IPcapngType; +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.commons.io.IOUtils; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.pcapng.schema.Column; +import org.apache.drill.exec.store.pcapng.schema.DummyArrayImpl; +import org.apache.drill.exec.store.pcapng.schema.DummyImpl; +import org.apache.drill.exec.store.pcapng.schema.Schema; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.function.BiConsumer; + +public class PcapngRecordReader extends AbstractRecordReader { + private static final Logger logger = LoggerFactory.getLogger(PcapngRecordReader.class); + + // batch size should not exceed max allowed record count + private static final int BATCH_SIZE = 40_000; + + private final Path pathToFile; + private OutputMutator output; + private List<ProjectedColumnInfo> projectedCols; + private FileSystem fs; + private FSDataInputStream in; + private List<SchemaPath> columns; + + private Iterator<IPcapngType> it; + + public PcapngRecordReader(final String pathToFile, + final FileSystem fileSystem, + final List<SchemaPath> columns) { + this.fs = fileSystem; + this.pathToFile = fs.makeQualified(new Path(pathToFile)); + this.columns = columns; + setColumns(columns); + } + + @Override + public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException { + try { + + this.output = output; + this.in = fs.open(pathToFile); + PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in)); + decoder.decode(); + this.it = decoder.getSectionList().iterator(); + setupProjection(); + } catch (IOException io) { + throw UserException.dataReadError(io) + .addContext("File name:", pathToFile.toUri().getPath()) + .build(logger); + } + } + + @Override + public int next() { + if (isSkipQuery()) { + return iterateOverBlocks((block, counter) -> { + }); + } else { + return iterateOverBlocks((block, counter) -> putToTable((IEnhancedPacketBLock) block, counter)); + } + } + + private void putToTable(IEnhancedPacketBLock bLock, Integer counter) { + for (ProjectedColumnInfo pci : projectedCols) { + pci.getColumn().process(bLock, pci.getVv(), counter); + } + } + + @Override + public void close() throws Exception { + if (in != null) { + in.close(); + in = null; + } + } + + private void setupProjection() { + if (isSkipQuery()) { + projectedCols = projectNone(); + } else if (isStarQuery()) { + projectedCols = projectAllCols(Schema.getColumnsNames()); + } else { + projectedCols = projectCols(columns); + } + } + + private List<ProjectedColumnInfo> projectNone() { + List<ProjectedColumnInfo> pciBuilder = new ArrayList<>(); + pciBuilder.add(makeColumn("dummy", new DummyImpl())); + return Collections.unmodifiableList(pciBuilder); + } + + private List<ProjectedColumnInfo> projectAllCols(final Set<String> columns) { + List<ProjectedColumnInfo> pciBuilder = new ArrayList<>(); + for (String colName : columns) { + pciBuilder.add(makeColumn(colName, Schema.getColumns().get(colName))); + } + return Collections.unmodifiableList(pciBuilder); + } + + private List<ProjectedColumnInfo> projectCols(final List<SchemaPath> columns) { + List<ProjectedColumnInfo> pciBuilder = new ArrayList<>(); + for (SchemaPath schemaPath : columns) { + String projectedName = schemaPath.rootName(); + if (schemaPath.isArray()) { + pciBuilder.add(makeColumn(projectedName, new DummyArrayImpl())); + } else if (Schema.getColumns().containsKey(projectedName.toLowerCase())) { + pciBuilder.add(makeColumn(projectedName, + Schema.getColumns().get(projectedName.toLowerCase()))); + } else { + pciBuilder.add(makeColumn(projectedName, new DummyImpl())); + } + } + return Collections.unmodifiableList(pciBuilder); + } + + private ProjectedColumnInfo makeColumn(final String colName, final Column column) { + MaterializedField field = MaterializedField.create(colName, column.getMinorType()); + ValueVector vector = getValueVector(field, output); + return new ProjectedColumnInfo(vector, column, colName); + } + + private ValueVector getValueVector(final MaterializedField field, final OutputMutator output) { + try { + TypeProtos.MajorType majorType = field.getType(); + final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass( + majorType.getMinorType(), majorType.getMode()); + + return output.addField(field, clazz); + } catch (SchemaChangeException sce) { + throw UserException.internalError(sce) + .addContext("The addition of this field is incompatible with this OutputMutator's capabilities") + .build(logger); + } + } + + private Integer iterateOverBlocks(BiConsumer<IPcapngType, Integer> consumer) { + int counter = 0; + while (it.hasNext() && counter < BATCH_SIZE) { + IPcapngType block = it.next(); + if (block instanceof IEnhancedPacketBLock) { + consumer.accept(block, counter); + counter++; + } + } + return counter; + } + + private static class ProjectedColumnInfo { + + private ValueVector vv; + private Column colDef; + private String columnName; + + ProjectedColumnInfo(ValueVector vv, Column colDef, String columnName) { + this.vv = vv; + this.colDef = colDef; + this.columnName = columnName; + } + + public ValueVector getVv() { + return vv; + } + + Column getColumn() { + return colDef; + } + + public String getColumnName() { + return columnName; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java new file mode 100644 index 000000000..ea5d83104 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.decoder; + +import org.apache.drill.exec.store.pcap.decoder.Packet; +import org.apache.drill.exec.store.pcap.decoder.PacketConstants; + +import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getByte; +import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getShort; + +public class PacketDecoder extends Packet { + + @SuppressWarnings("WeakerAccess") + public boolean readPcapng(final byte[] raw) { + this.raw = raw; + return decodeEtherPacket(); + } + + private boolean decodeEtherPacket() { + etherProtocol = getShort(raw, PacketConstants.PACKET_PROTOCOL_OFFSET); + ipOffset = PacketConstants.IP_OFFSET; + if (isIpV4Packet()) { + protocol = processIpV4Packet(); + return true; + } else if (isIpV6Packet()) { + int tmp = processIpV6Packet(); + if (tmp != -1) { + protocol = tmp; + } + return true; + } else if (isPPPoV6Packet()) { + protocol = getByte(raw, 48); + return true; + } + return false; + } + + @Override + protected int processIpV6Packet() { + try { + return super.processIpV6Packet(); + } catch (IllegalStateException ise) { + return -1; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java new file mode 100644 index 000000000..dafeaa399 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * For comments on realization of this format plugin look at : + * + * @see <a href="https://issues.apache.org/jira/browse/DRILL-6179"> Jira</a> + */ +package org.apache.drill.exec.store.pcapng; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java new file mode 100644 index 000000000..109b7ddc9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.vector.ValueVector; + +public interface Column { + TypeProtos.MajorType getMinorType(); + + void process(IEnhancedPacketBLock block, ValueVector vv, int count); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java new file mode 100644 index 000000000..2023d195e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.ValueVector; + +public class DummyArrayImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.repeated(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java new file mode 100644 index 000000000..a8c26a06a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.ValueVector; + +public class DummyImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java new file mode 100644 index 000000000..a9738bdbe --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder; +import org.apache.drill.exec.vector.ValueVector; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII; +import static org.apache.drill.exec.store.pcapng.schema.Util.setNullableLongColumnValue; + +public class Schema { + + private final static Map<String, Column> columns = new HashMap<>(); + + static { + columns.put("timestamp", new TimestampImpl()); + columns.put("packet_length", new PacketLenImpl()); + columns.put("type", new TypeImpl()); + columns.put("src_ip", new SrcIpImpl()); + columns.put("dst_ip", new DstIpImpl()); + columns.put("src_port", new SrcPortImpl()); + columns.put("dst_port", new DstPortImpl()); + columns.put("src_mac_address", new SrcMacImpl()); + columns.put("dst_mac_address", new DstMacImpl()); + columns.put("tcp_session", new TcpSessionImpl()); + columns.put("tcp_ack", new TcpAckImpl()); + columns.put("tcp_flags", new TcpFlags()); + columns.put("tcp_flags_ns", new TcpFlagsNsImpl()); + columns.put("tcp_flags_cwr", new TcpFlagsCwrImpl()); + columns.put("tcp_flags_ece", new TcpFlagsEceImpl()); + columns.put("tcp_flags_ece_ecn_capable", new TcpFlagsEceEcnCapableImpl()); + columns.put("tcp_flags_ece_congestion_experienced", new TcpFlagsEceCongestionExperiencedImpl()); + columns.put("tcp_flags_urg", new TcpFlagsUrgIml()); + columns.put("tcp_flags_ack", new TcpFlagsAckImpl()); + columns.put("tcp_flags_psh", new TcpFlagsPshImpl()); + columns.put("tcp_flags_rst", new TcpFlagsRstImpl()); + columns.put("tcp_flags_syn", new TcpFlagsSynImpl()); + columns.put("tcp_flags_fin", new TcpFlagsFinImpl()); + columns.put("tcp_parsed_flags", new TcpParsedFlags()); + columns.put("packet_data", new PacketDataImpl()); + } + + public static Map<String, Column> getColumns() { + return columns; + } + + public static Set<String> getColumnsNames() { + return columns.keySet(); + } + + static class TimestampImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.required(TypeProtos.MinorType.TIMESTAMP); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + Util.setTimestampColumnValue(block.getTimeStamp(), vv, count); + } + } + + static class PacketLenImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.required(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + Util.setIntegerColumnValue(block.getPacketLength(), vv, count); + } + } + + static class TypeImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getPacketType(), vv, count); + } + } + } + + static class SrcIpImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getSrc_ip().getHostAddress(), vv, count); + } + } + } + + static class DstIpImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getDst_ip().getHostAddress(), vv, count); + } + } + } + + static class SrcPortImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getSrc_port(), vv, count); + } + } + } + + static class DstPortImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getDst_port(), vv, count); + } + } + } + + static class SrcMacImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getEthernetSource(), vv, count); + } + } + } + + static class DstMacImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getEthernetDestination(), vv, count); + } + } + } + + static class TcpSessionImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.BIGINT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + setNullableLongColumnValue(packet.getSessionHash(), vv, count); + } + } + } + + static class TcpAckImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getAckNumber(), vv, count); + } + } + } + + static class TcpFlags implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableIntegerColumnValue(packet.getFlags(), vv, count); + } + } + } + + static class TcpFlagsNsImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x100) != 0, vv, count); + } + } + } + + static class TcpFlagsCwrImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x80) != 0, vv, count); + } + } + } + + static class TcpFlagsEceImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x40) != 0, vv, count); + } + } + } + + static class TcpFlagsEceEcnCapableImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x42, vv, count); + } + } + } + + static class TcpFlagsEceCongestionExperiencedImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x40, vv, count); + } + } + } + + static class TcpFlagsUrgIml implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x20) != 0, vv, count); + } + } + } + + static class TcpFlagsAckImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x10) != 0, vv, count); + } + } + } + + static class TcpFlagsPshImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x8) != 0, vv, count); + } + } + } + + static class TcpFlagsRstImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x4) != 0, vv, count); + } + } + } + + static class TcpFlagsSynImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x2) != 0, vv, count); + } + } + } + + static class TcpFlagsFinImpl implements Column { + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.INT); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableBooleanColumnValue((packet.getFlags() & 0x1) != 0, vv, count); + } + } + } + + static class TcpParsedFlags implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(packet.getParsedFlags(), vv, count); + } + } + } + + static class PacketDataImpl implements Column { + @Override + public TypeProtos.MajorType getMinorType() { + return Types.optional(TypeProtos.MinorType.VARCHAR); + } + + @Override + public void process(IEnhancedPacketBLock block, ValueVector vv, int count) { + PacketDecoder packet = new PacketDecoder(); + if (packet.readPcapng(block.getPacketData())) { + Util.setNullableStringColumnValue(parseBytesToASCII(block.getPacketData()), vv, count); + } + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java new file mode 100644 index 000000000..06e8e6ac6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng.schema; + +import org.apache.drill.exec.vector.IntVector; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.TimeStampVector; +import org.apache.drill.exec.vector.ValueVector; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class Util { + static void setNullableIntegerColumnValue(final int data, final ValueVector vv, final int count) { + ((NullableIntVector.Mutator) vv.getMutator()) + .setSafe(count, data); + } + + static void setIntegerColumnValue(final int data, final ValueVector vv, final int count) { + ((IntVector.Mutator) vv.getMutator()) + .setSafe(count, data); + } + + static void setTimestampColumnValue(final long data, final ValueVector vv, final int count) { + ((TimeStampVector.Mutator) vv.getMutator()) + .setSafe(count, data / 1000); + } + + static void setNullableLongColumnValue(final long data, final ValueVector vv, final int count) { + ((NullableBigIntVector.Mutator) vv.getMutator()) + .setSafe(count, data); + } + + static void setNullableStringColumnValue(final String data, final ValueVector vv, final int count) { + ((NullableVarCharVector.Mutator) vv.getMutator()) + .setSafe(count, data.getBytes(UTF_8), 0, data.length()); + } + + static void setNullableBooleanColumnValue(final boolean data, final ValueVector vv, final int count) { + ((NullableIntVector.Mutator) vv.getMutator()) + .setSafe(count, data ? 1 : 0); + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json index 42cddd865..46f162052 100644 --- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json +++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json @@ -46,6 +46,9 @@ "pcap" : { type: "pcap" }, + "pcapng" : { + type: "pcapng" + }, "avro" : { type: "avro" }, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java index f51fe4c89..e53c394ec 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java @@ -59,6 +59,7 @@ public class TestFormatPluginOptionExtractor { case "json": case "sequencefile": case "pcap": + case "pcapng": case "avro": assertEquals(d.typeName, "(type: String)", d.presentParams()); break; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java new file mode 100644 index 000000000..5dcffa95c --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Paths; + +public class TestPcapngHeaders extends ClusterTest { + @BeforeClass + public static void setupTestFiles() throws Exception { + startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1)); + dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng")); + } + + @Test + public void testValidHeadersForStarQuery() throws IOException { + String query = "select * from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("tcp_flags_ece_ecn_capable", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_ece_congestion_experienced", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_psh", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("type", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_cwr", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("dst_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_fin", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_ece", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_ack", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("src_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_syn", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_rst", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("packet_data", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_parsed_flags", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("tcp_flags_ns", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("src_port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("packet_length", Types.required(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_flags_urg", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_ack", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("dst_port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("dst_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeadersForProjection() throws IOException { + String query = "select sRc_ip, dst_IP, dst_mAc_address, src_Port, tcp_session, `Timestamp` from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("sRc_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_IP", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_mAc_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_Port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("Timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeadersForMissColumns() throws IOException { + String query = "select `timestamp`, `name`, `color` from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + expectedSchema.add(MaterializedField.create("name", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("color", Types.optional(TypeProtos.MinorType.INT))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testMixColumns() throws IOException { + String query = "select src_ip, dst_ip, dst_mac_address, src_port, tcp_session, `timestamp` from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("sRc_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_IP", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_mAc_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_Port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("Timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + + String queryWithDiffOrder = "select `timestamp`, src_ip, dst_ip, src_port, tcp_session, dst_mac_address from dfs.`store/pcapng/sniff.pcapng`"; + actual = client.queryBuilder().sql(queryWithDiffOrder).rowSet(); + + expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP))); + expectedSchema.add(MaterializedField.create("src_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("dst_ip", Types.optional(TypeProtos.MinorType.VARCHAR))); + expectedSchema.add(MaterializedField.create("src_port", Types.optional(TypeProtos.MinorType.INT))); + expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT))); + expectedSchema.add(MaterializedField.create("dst_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR))); + + expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeaderForArrayColumns() throws IOException { + // query with non-existent field + String query = "select arr[3] as arr from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("arr", Types.optional(TypeProtos.MinorType.INT))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + + // query with an existent field which doesn't support arrays + query = "select type[45] as arr from dfs.`store/pcapng/sniff.pcapng`"; + + expectedSchema = new TupleSchema(); + actual = client.queryBuilder().sql(query).rowSet(); + + expectedSchema.add(MaterializedField.create("arr", Types.optional(TypeProtos.MinorType.INT))); + + expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } + + @Test + public void testValidHeaderForNestedColumns() throws IOException { + // query with non-existent field + String query = "select top['nested'] as nested from dfs.`store/pcapng/sniff.pcapng`"; + RowSet actual = client.queryBuilder().sql(query).rowSet(); + + TupleSchema expectedSchema = new TupleSchema(); + + expectedSchema.add(MaterializedField.create("nested", Types.optional(TypeProtos.MinorType.INT))); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + + // query with an existent field which doesn't support nesting + query = "select type['nested'] as nested from dfs.`store/pcapng/sniff.pcapng`"; + + expectedSchema = new TupleSchema(); + actual = client.queryBuilder().sql(query).rowSet(); + + expectedSchema.add(MaterializedField.create("nested", Types.optional(TypeProtos.MinorType.INT))); + + expected = new RowSetBuilder(client.allocator(), expectedSchema) + .build(); + new RowSetComparison(expected) + .verifyAndClearAll(actual); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java new file mode 100644 index 000000000..98d7b6738 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.pcapng; + +import org.apache.drill.PlanTestBase; +import org.apache.drill.common.exceptions.UserRemoteException; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.file.Paths; + +public class TestPcapngRecordReader extends PlanTestBase { + @BeforeClass + public static void setupTestFiles() { + dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng")); + } + + @Test + public void testStarQuery() throws Exception { + Assert.assertEquals(123, testSql("select * from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select * from dfs.`store/pcapng/example.pcapng`")); + } + + @Test + public void testProjectingByName() throws Exception { + Assert.assertEquals(123, testSql("select `timestamp`, packet_data, type from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select src_ip, dst_ip, `timestamp` from dfs.`store/pcapng/example.pcapng`")); + } + + @Test + public void testDiffCaseQuery() throws Exception { + Assert.assertEquals(123, testSql("select `timestamp`, paCket_dAta, TyPe from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select src_ip, dst_ip, `Timestamp` from dfs.`store/pcapng/example.pcapng`")); + } + + @Test + public void testProjectingMissColls() throws Exception { + Assert.assertEquals(123, testSql("select `timestamp`, `name`, `color` from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select src_ip, `time` from dfs.`store/pcapng/example.pcapng`")); + } + + + @Test + public void testCountQuery() throws Exception { + testBuilder() + .sqlQuery("select count(*) as ct from dfs.`store/pcapng/sniff.pcapng`") + .ordered() + .baselineColumns("ct") + .baselineValues(123L) + .build() + .run(); + + testBuilder() + .sqlQuery("select count(*) as ct from dfs.`store/pcapng/example.pcapng`") + .ordered() + .baselineColumns("ct") + .baselineValues(1L) + .build() + .run(); + } + + @Test + public void testGroupBy() throws Exception { + Assert.assertEquals(47, testSql("select src_ip, count(1), sum(packet_length) from dfs.`store/pcapng/sniff.pcapng` group by src_ip")); + } + + @Test + public void testDistinctQuery() throws Exception { + Assert.assertEquals(119, testSql("select distinct `timestamp`, src_ip from dfs.`store/pcapng/sniff.pcapng`")); + Assert.assertEquals(1, testSql("select distinct packet_data from dfs.`store/pcapng/example.pcapng`")); + } + + @Test(expected = UserRemoteException.class) + public void testBasicQueryWithIncorrectFileName() throws Exception { + testSql("select * from dfs.`store/pcapng/snaff.pcapng`"); + } + + @Test + public void testPhysicalPlanExecutionBasedOnQuery() throws Exception { + String query = "EXPLAIN PLAN for select * from dfs.`store/pcapng/sniff.pcapng`"; + String plan = getPlanInString(query, JSON_FORMAT); + Assert.assertEquals(123, testPhysical(plan)); + } +} diff --git a/exec/java-exec/src/test/resources/store/pcapng/example.pcapng b/exec/java-exec/src/test/resources/store/pcapng/example.pcapng Binary files differnew file mode 100644 index 000000000..002cb8d9b --- /dev/null +++ b/exec/java-exec/src/test/resources/store/pcapng/example.pcapng diff --git a/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng b/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng Binary files differnew file mode 100644 index 000000000..cd542bd4d --- /dev/null +++ b/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng |