aboutsummaryrefslogtreecommitdiff
path: root/exec
diff options
context:
space:
mode:
authorVlad Storona <vstorona@cybervisiontech.com>2018-06-21 16:41:17 +0300
committerArina Ielchiieva <arina.yelchiyeva@gmail.com>2018-08-21 20:08:20 +0300
commit2dfd0dab41864f22c7ed924e17c6a8cf9b2f54ad (patch)
tree5a127b2794023cd82412e56c93c858419cb996e1 /exec
parent0f9d19c4c90a148573cfed908dd03ad771306bc1 (diff)
DRILL-6179: Added pcapng-format support
Diffstat (limited to 'exec')
-rw-r--r--exec/java-exec/pom.xml5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java52
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java76
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java214
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java61
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java23
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java28
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java34
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java34
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java441
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java59
-rw-r--r--exec/java-exec/src/main/resources/bootstrap-storage-plugins.json3
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java1
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java212
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java100
-rw-r--r--exec/java-exec/src/test/resources/store/pcapng/example.pcapngbin0 -> 512 bytes
-rw-r--r--exec/java-exec/src/test/resources/store/pcapng/sniff.pcapngbin0 -> 33464 bytes
18 files changed, 1351 insertions, 8 deletions
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index f175c654c..f4068952e 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -534,6 +534,11 @@
<artifactId>metadata-extractor</artifactId>
<version>2.11.0</version>
</dependency>
+ <dependency>
+ <groupId>fr.bmartel</groupId>
+ <artifactId>pcapngdecoder</artifactId>
+ <version>1.2</version>
+ </dependency>
</dependencies>
<profiles>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
index 9cc98de9c..a0a07a99d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
@@ -42,18 +42,18 @@ public class Packet {
private long timestamp;
private int originalLength;
- private byte[] raw;
+ protected byte[] raw;
// index into the raw data where the current ethernet packet starts
private int etherOffset;
// index into the raw data where the current IP packet starts. Should be just after etherOffset
- private int ipOffset;
+ protected int ipOffset;
private int packetLength;
- private int etherProtocol;
- private int protocol;
+ protected int etherProtocol;
+ protected int protocol;
- private boolean isRoutingV6;
+ protected boolean isRoutingV6;
@SuppressWarnings("WeakerAccess")
public boolean readPcap(final InputStream in, final boolean byteOrder, final int maxLength) throws IOException {
@@ -379,7 +379,7 @@ public class Packet {
return (getByte(raw, ipOffset) & 0xf) * 4;
}
- private int ipVersion() {
+ protected int ipVersion() {
return getByte(raw, ipOffset) >>> 4;
}
@@ -409,12 +409,12 @@ public class Packet {
// everything is decoded lazily
}
- private int processIpV4Packet() {
+ protected int processIpV4Packet() {
validateIpV4Packet();
return getByte(raw, ipOffset + 9);
}
- private int processIpV6Packet() {
+ protected int processIpV6Packet() {
Preconditions.checkState(ipVersion() == 6, "Should have seen IP version 6, got %d", ipVersion());
int headerLength = 40;
int nextHeader = raw[ipOffset + 6] & 0xff;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
new file mode 100644
index 000000000..7ff875acf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+@JsonTypeName("pcapng")
+public class PcapngFormatConfig implements FormatPluginConfig {
+
+ public List<String> extensions = Collections.singletonList("pcapng");
+
+ public List<String> getExtensions() {
+ return extensions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PcapngFormatConfig that = (PcapngFormatConfig) o;
+ return Objects.equals(extensions, that.extensions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(extensions);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
new file mode 100644
index 000000000..832c0ec3b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+
+public class PcapngFormatPlugin extends EasyFormatPlugin<PcapngFormatConfig> {
+
+ public static final String DEFAULT_NAME = "pcapng";
+
+ public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+ StoragePluginConfig storagePluginConfig) {
+ this(name, context, fsConf, storagePluginConfig, new PcapngFormatConfig());
+ }
+
+ public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, PcapngFormatConfig formatPluginConfig) {
+ super(name, context, fsConf, config, formatPluginConfig, true,
+ false, true, false,
+ formatPluginConfig.getExtensions(), DEFAULT_NAME);
+ }
+
+ @Override
+ public boolean supportsPushDown() {
+ return true;
+ }
+
+ @Override
+ public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs,
+ FileWork fileWork, List<SchemaPath> columns,
+ String userName) {
+ return new PcapngRecordReader(fileWork.getPath(), dfs, columns);
+ }
+
+ @Override
+ public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
+ public int getReaderOperatorType() {
+ return UserBitShared.CoreOperatorType.PCAPNG_SUB_SCAN_VALUE;
+ }
+
+ @Override
+ public int getWriterOperatorType() {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
new file mode 100644
index 000000000..b1c5f2427
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import fr.bmartel.pcapdecoder.PcapDecoder;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.pcapng.schema.Column;
+import org.apache.drill.exec.store.pcapng.schema.DummyArrayImpl;
+import org.apache.drill.exec.store.pcapng.schema.DummyImpl;
+import org.apache.drill.exec.store.pcapng.schema.Schema;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+public class PcapngRecordReader extends AbstractRecordReader {
+ private static final Logger logger = LoggerFactory.getLogger(PcapngRecordReader.class);
+
+ // batch size should not exceed max allowed record count
+ private static final int BATCH_SIZE = 40_000;
+
+ private final Path pathToFile;
+ private OutputMutator output;
+ private List<ProjectedColumnInfo> projectedCols;
+ private FileSystem fs;
+ private FSDataInputStream in;
+ private List<SchemaPath> columns;
+
+ private Iterator<IPcapngType> it;
+
+ public PcapngRecordReader(final String pathToFile,
+ final FileSystem fileSystem,
+ final List<SchemaPath> columns) {
+ this.fs = fileSystem;
+ this.pathToFile = fs.makeQualified(new Path(pathToFile));
+ this.columns = columns;
+ setColumns(columns);
+ }
+
+ @Override
+ public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+ try {
+
+ this.output = output;
+ this.in = fs.open(pathToFile);
+ PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
+ decoder.decode();
+ this.it = decoder.getSectionList().iterator();
+ setupProjection();
+ } catch (IOException io) {
+ throw UserException.dataReadError(io)
+ .addContext("File name:", pathToFile.toUri().getPath())
+ .build(logger);
+ }
+ }
+
+ @Override
+ public int next() {
+ if (isSkipQuery()) {
+ return iterateOverBlocks((block, counter) -> {
+ });
+ } else {
+ return iterateOverBlocks((block, counter) -> putToTable((IEnhancedPacketBLock) block, counter));
+ }
+ }
+
+ private void putToTable(IEnhancedPacketBLock bLock, Integer counter) {
+ for (ProjectedColumnInfo pci : projectedCols) {
+ pci.getColumn().process(bLock, pci.getVv(), counter);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (in != null) {
+ in.close();
+ in = null;
+ }
+ }
+
+ private void setupProjection() {
+ if (isSkipQuery()) {
+ projectedCols = projectNone();
+ } else if (isStarQuery()) {
+ projectedCols = projectAllCols(Schema.getColumnsNames());
+ } else {
+ projectedCols = projectCols(columns);
+ }
+ }
+
+ private List<ProjectedColumnInfo> projectNone() {
+ List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
+ pciBuilder.add(makeColumn("dummy", new DummyImpl()));
+ return Collections.unmodifiableList(pciBuilder);
+ }
+
+ private List<ProjectedColumnInfo> projectAllCols(final Set<String> columns) {
+ List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
+ for (String colName : columns) {
+ pciBuilder.add(makeColumn(colName, Schema.getColumns().get(colName)));
+ }
+ return Collections.unmodifiableList(pciBuilder);
+ }
+
+ private List<ProjectedColumnInfo> projectCols(final List<SchemaPath> columns) {
+ List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
+ for (SchemaPath schemaPath : columns) {
+ String projectedName = schemaPath.rootName();
+ if (schemaPath.isArray()) {
+ pciBuilder.add(makeColumn(projectedName, new DummyArrayImpl()));
+ } else if (Schema.getColumns().containsKey(projectedName.toLowerCase())) {
+ pciBuilder.add(makeColumn(projectedName,
+ Schema.getColumns().get(projectedName.toLowerCase())));
+ } else {
+ pciBuilder.add(makeColumn(projectedName, new DummyImpl()));
+ }
+ }
+ return Collections.unmodifiableList(pciBuilder);
+ }
+
+ private ProjectedColumnInfo makeColumn(final String colName, final Column column) {
+ MaterializedField field = MaterializedField.create(colName, column.getMinorType());
+ ValueVector vector = getValueVector(field, output);
+ return new ProjectedColumnInfo(vector, column, colName);
+ }
+
+ private ValueVector getValueVector(final MaterializedField field, final OutputMutator output) {
+ try {
+ TypeProtos.MajorType majorType = field.getType();
+ final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
+ majorType.getMinorType(), majorType.getMode());
+
+ return output.addField(field, clazz);
+ } catch (SchemaChangeException sce) {
+ throw UserException.internalError(sce)
+ .addContext("The addition of this field is incompatible with this OutputMutator's capabilities")
+ .build(logger);
+ }
+ }
+
+ private Integer iterateOverBlocks(BiConsumer<IPcapngType, Integer> consumer) {
+ int counter = 0;
+ while (it.hasNext() && counter < BATCH_SIZE) {
+ IPcapngType block = it.next();
+ if (block instanceof IEnhancedPacketBLock) {
+ consumer.accept(block, counter);
+ counter++;
+ }
+ }
+ return counter;
+ }
+
+ private static class ProjectedColumnInfo {
+
+ private ValueVector vv;
+ private Column colDef;
+ private String columnName;
+
+ ProjectedColumnInfo(ValueVector vv, Column colDef, String columnName) {
+ this.vv = vv;
+ this.colDef = colDef;
+ this.columnName = columnName;
+ }
+
+ public ValueVector getVv() {
+ return vv;
+ }
+
+ Column getColumn() {
+ return colDef;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java
new file mode 100644
index 000000000..ea5d83104
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/decoder/PacketDecoder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.decoder;
+
+import org.apache.drill.exec.store.pcap.decoder.Packet;
+import org.apache.drill.exec.store.pcap.decoder.PacketConstants;
+
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getByte;
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getShort;
+
+public class PacketDecoder extends Packet {
+
+ @SuppressWarnings("WeakerAccess")
+ public boolean readPcapng(final byte[] raw) {
+ this.raw = raw;
+ return decodeEtherPacket();
+ }
+
+ private boolean decodeEtherPacket() {
+ etherProtocol = getShort(raw, PacketConstants.PACKET_PROTOCOL_OFFSET);
+ ipOffset = PacketConstants.IP_OFFSET;
+ if (isIpV4Packet()) {
+ protocol = processIpV4Packet();
+ return true;
+ } else if (isIpV6Packet()) {
+ int tmp = processIpV6Packet();
+ if (tmp != -1) {
+ protocol = tmp;
+ }
+ return true;
+ } else if (isPPPoV6Packet()) {
+ protocol = getByte(raw, 48);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected int processIpV6Packet() {
+ try {
+ return super.processIpV6Packet();
+ } catch (IllegalStateException ise) {
+ return -1;
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java
new file mode 100644
index 000000000..dafeaa399
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * For comments on realization of this format plugin look at :
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/DRILL-6179"> Jira</a>
+ */
+package org.apache.drill.exec.store.pcapng;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java
new file mode 100644
index 000000000..109b7ddc9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.schema;
+
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.vector.ValueVector;
+
+public interface Column {
+ TypeProtos.MajorType getMinorType();
+
+ void process(IEnhancedPacketBLock block, ValueVector vv, int count);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java
new file mode 100644
index 000000000..2023d195e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.schema;
+
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.ValueVector;
+
+public class DummyArrayImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.repeated(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java
new file mode 100644
index 000000000..a8c26a06a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.schema;
+
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.ValueVector;
+
+public class DummyImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java
new file mode 100644
index 000000000..a9738bdbe
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.schema;
+
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
+import static org.apache.drill.exec.store.pcapng.schema.Util.setNullableLongColumnValue;
+
+public class Schema {
+
+ private final static Map<String, Column> columns = new HashMap<>();
+
+ static {
+ columns.put("timestamp", new TimestampImpl());
+ columns.put("packet_length", new PacketLenImpl());
+ columns.put("type", new TypeImpl());
+ columns.put("src_ip", new SrcIpImpl());
+ columns.put("dst_ip", new DstIpImpl());
+ columns.put("src_port", new SrcPortImpl());
+ columns.put("dst_port", new DstPortImpl());
+ columns.put("src_mac_address", new SrcMacImpl());
+ columns.put("dst_mac_address", new DstMacImpl());
+ columns.put("tcp_session", new TcpSessionImpl());
+ columns.put("tcp_ack", new TcpAckImpl());
+ columns.put("tcp_flags", new TcpFlags());
+ columns.put("tcp_flags_ns", new TcpFlagsNsImpl());
+ columns.put("tcp_flags_cwr", new TcpFlagsCwrImpl());
+ columns.put("tcp_flags_ece", new TcpFlagsEceImpl());
+ columns.put("tcp_flags_ece_ecn_capable", new TcpFlagsEceEcnCapableImpl());
+ columns.put("tcp_flags_ece_congestion_experienced", new TcpFlagsEceCongestionExperiencedImpl());
+ columns.put("tcp_flags_urg", new TcpFlagsUrgIml());
+ columns.put("tcp_flags_ack", new TcpFlagsAckImpl());
+ columns.put("tcp_flags_psh", new TcpFlagsPshImpl());
+ columns.put("tcp_flags_rst", new TcpFlagsRstImpl());
+ columns.put("tcp_flags_syn", new TcpFlagsSynImpl());
+ columns.put("tcp_flags_fin", new TcpFlagsFinImpl());
+ columns.put("tcp_parsed_flags", new TcpParsedFlags());
+ columns.put("packet_data", new PacketDataImpl());
+ }
+
+ public static Map<String, Column> getColumns() {
+ return columns;
+ }
+
+ public static Set<String> getColumnsNames() {
+ return columns.keySet();
+ }
+
+ static class TimestampImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.required(TypeProtos.MinorType.TIMESTAMP);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ Util.setTimestampColumnValue(block.getTimeStamp(), vv, count);
+ }
+ }
+
+ static class PacketLenImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.required(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ Util.setIntegerColumnValue(block.getPacketLength(), vv, count);
+ }
+ }
+
+ static class TypeImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getPacketType(), vv, count);
+ }
+ }
+ }
+
+ static class SrcIpImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getSrc_ip().getHostAddress(), vv, count);
+ }
+ }
+ }
+
+ static class DstIpImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getDst_ip().getHostAddress(), vv, count);
+ }
+ }
+ }
+
+ static class SrcPortImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableIntegerColumnValue(packet.getSrc_port(), vv, count);
+ }
+ }
+ }
+
+ static class DstPortImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableIntegerColumnValue(packet.getDst_port(), vv, count);
+ }
+ }
+ }
+
+ static class SrcMacImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getEthernetSource(), vv, count);
+ }
+ }
+ }
+
+ static class DstMacImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getEthernetDestination(), vv, count);
+ }
+ }
+ }
+
+ static class TcpSessionImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.BIGINT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ setNullableLongColumnValue(packet.getSessionHash(), vv, count);
+ }
+ }
+ }
+
+ static class TcpAckImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableIntegerColumnValue(packet.getAckNumber(), vv, count);
+ }
+ }
+ }
+
+ static class TcpFlags implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableIntegerColumnValue(packet.getFlags(), vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsNsImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x100) != 0, vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsCwrImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x80) != 0, vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsEceImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x40) != 0, vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsEceEcnCapableImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x42, vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsEceCongestionExperiencedImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x40, vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsUrgIml implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x20) != 0, vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsAckImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x10) != 0, vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsPshImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x8) != 0, vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsRstImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x4) != 0, vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsSynImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x2) != 0, vv, count);
+ }
+ }
+ }
+
+ static class TcpFlagsFinImpl implements Column {
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableBooleanColumnValue((packet.getFlags() & 0x1) != 0, vv, count);
+ }
+ }
+ }
+
+ static class TcpParsedFlags implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(packet.getParsedFlags(), vv, count);
+ }
+ }
+ }
+
+ static class PacketDataImpl implements Column {
+ @Override
+ public TypeProtos.MajorType getMinorType() {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+
+ @Override
+ public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
+ PacketDecoder packet = new PacketDecoder();
+ if (packet.readPcapng(block.getPacketData())) {
+ Util.setNullableStringColumnValue(parseBytesToASCII(block.getPacketData()), vv, count);
+ }
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java
new file mode 100644
index 000000000..06e8e6ac6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng.schema;
+
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.TimeStampVector;
+import org.apache.drill.exec.vector.ValueVector;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class Util {
+ static void setNullableIntegerColumnValue(final int data, final ValueVector vv, final int count) {
+ ((NullableIntVector.Mutator) vv.getMutator())
+ .setSafe(count, data);
+ }
+
+ static void setIntegerColumnValue(final int data, final ValueVector vv, final int count) {
+ ((IntVector.Mutator) vv.getMutator())
+ .setSafe(count, data);
+ }
+
+ static void setTimestampColumnValue(final long data, final ValueVector vv, final int count) {
+ ((TimeStampVector.Mutator) vv.getMutator())
+ .setSafe(count, data / 1000);
+ }
+
+ static void setNullableLongColumnValue(final long data, final ValueVector vv, final int count) {
+ ((NullableBigIntVector.Mutator) vv.getMutator())
+ .setSafe(count, data);
+ }
+
+ static void setNullableStringColumnValue(final String data, final ValueVector vv, final int count) {
+ ((NullableVarCharVector.Mutator) vv.getMutator())
+ .setSafe(count, data.getBytes(UTF_8), 0, data.length());
+ }
+
+ static void setNullableBooleanColumnValue(final boolean data, final ValueVector vv, final int count) {
+ ((NullableIntVector.Mutator) vv.getMutator())
+ .setSafe(count, data ? 1 : 0);
+ }
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 42cddd865..46f162052 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -46,6 +46,9 @@
"pcap" : {
type: "pcap"
},
+ "pcapng" : {
+ type: "pcapng"
+ },
"avro" : {
type: "avro"
},
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index f51fe4c89..e53c394ec 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -59,6 +59,7 @@ public class TestFormatPluginOptionExtractor {
case "json":
case "sequencefile":
case "pcap":
+ case "pcapng":
case "avro":
assertEquals(d.typeName, "(type: String)", d.presentParams());
break;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java
new file mode 100644
index 000000000..5dcffa95c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+
+public class TestPcapngHeaders extends ClusterTest {
+ @BeforeClass
+ public static void setupTestFiles() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
+ dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng"));
+ }
+
+ @Test
+ public void testValidHeadersForStarQuery() throws IOException {
+ String query = "select * from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("tcp_flags_ece_ecn_capable", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_ece_congestion_experienced", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_psh", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("type", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_cwr", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("dst_ip", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("src_ip", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_fin", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_ece", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_ack", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("src_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_syn", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_rst", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP)));
+ expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT)));
+ expectedSchema.add(MaterializedField.create("packet_data", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("tcp_parsed_flags", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_ns", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("src_port", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("packet_length", Types.required(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_flags_urg", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_ack", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("dst_port", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("dst_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+
+ @Test
+ public void testValidHeadersForProjection() throws IOException {
+ String query = "select sRc_ip, dst_IP, dst_mAc_address, src_Port, tcp_session, `Timestamp` from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("sRc_ip", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("dst_IP", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("dst_mAc_address", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("src_Port", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT)));
+ expectedSchema.add(MaterializedField.create("Timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+
+ @Test
+ public void testValidHeadersForMissColumns() throws IOException {
+ String query = "select `timestamp`, `name`, `color` from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP)));
+ expectedSchema.add(MaterializedField.create("name", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("color", Types.optional(TypeProtos.MinorType.INT)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+
+ @Test
+ public void testMixColumns() throws IOException {
+ String query = "select src_ip, dst_ip, dst_mac_address, src_port, tcp_session, `timestamp` from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("sRc_ip", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("dst_IP", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("dst_mAc_address", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("src_Port", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT)));
+ expectedSchema.add(MaterializedField.create("Timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+
+ String queryWithDiffOrder = "select `timestamp`, src_ip, dst_ip, src_port, tcp_session, dst_mac_address from dfs.`store/pcapng/sniff.pcapng`";
+ actual = client.queryBuilder().sql(queryWithDiffOrder).rowSet();
+
+ expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("timestamp", Types.required(TypeProtos.MinorType.TIMESTAMP)));
+ expectedSchema.add(MaterializedField.create("src_ip", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("dst_ip", Types.optional(TypeProtos.MinorType.VARCHAR)));
+ expectedSchema.add(MaterializedField.create("src_port", Types.optional(TypeProtos.MinorType.INT)));
+ expectedSchema.add(MaterializedField.create("tcp_session", Types.optional(TypeProtos.MinorType.BIGINT)));
+ expectedSchema.add(MaterializedField.create("dst_mac_address", Types.optional(TypeProtos.MinorType.VARCHAR)));
+
+ expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+
+ @Test
+ public void testValidHeaderForArrayColumns() throws IOException {
+ // query with non-existent field
+ String query = "select arr[3] as arr from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("arr", Types.optional(TypeProtos.MinorType.INT)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+
+ // query with an existent field which doesn't support arrays
+ query = "select type[45] as arr from dfs.`store/pcapng/sniff.pcapng`";
+
+ expectedSchema = new TupleSchema();
+ actual = client.queryBuilder().sql(query).rowSet();
+
+ expectedSchema.add(MaterializedField.create("arr", Types.optional(TypeProtos.MinorType.INT)));
+
+ expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+
+ @Test
+ public void testValidHeaderForNestedColumns() throws IOException {
+ // query with non-existent field
+ String query = "select top['nested'] as nested from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet actual = client.queryBuilder().sql(query).rowSet();
+
+ TupleSchema expectedSchema = new TupleSchema();
+
+ expectedSchema.add(MaterializedField.create("nested", Types.optional(TypeProtos.MinorType.INT)));
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+
+ // query with an existent field which doesn't support nesting
+ query = "select type['nested'] as nested from dfs.`store/pcapng/sniff.pcapng`";
+
+ expectedSchema = new TupleSchema();
+ actual = client.queryBuilder().sql(query).rowSet();
+
+ expectedSchema.add(MaterializedField.create("nested", Types.optional(TypeProtos.MinorType.INT)));
+
+ expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(actual);
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
new file mode 100644
index 000000000..98d7b6738
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+
+public class TestPcapngRecordReader extends PlanTestBase {
+ @BeforeClass
+ public static void setupTestFiles() {
+ dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng"));
+ }
+
+ @Test
+ public void testStarQuery() throws Exception {
+ Assert.assertEquals(123, testSql("select * from dfs.`store/pcapng/sniff.pcapng`"));
+ Assert.assertEquals(1, testSql("select * from dfs.`store/pcapng/example.pcapng`"));
+ }
+
+ @Test
+ public void testProjectingByName() throws Exception {
+ Assert.assertEquals(123, testSql("select `timestamp`, packet_data, type from dfs.`store/pcapng/sniff.pcapng`"));
+ Assert.assertEquals(1, testSql("select src_ip, dst_ip, `timestamp` from dfs.`store/pcapng/example.pcapng`"));
+ }
+
+ @Test
+ public void testDiffCaseQuery() throws Exception {
+ Assert.assertEquals(123, testSql("select `timestamp`, paCket_dAta, TyPe from dfs.`store/pcapng/sniff.pcapng`"));
+ Assert.assertEquals(1, testSql("select src_ip, dst_ip, `Timestamp` from dfs.`store/pcapng/example.pcapng`"));
+ }
+
+ @Test
+ public void testProjectingMissColls() throws Exception {
+ Assert.assertEquals(123, testSql("select `timestamp`, `name`, `color` from dfs.`store/pcapng/sniff.pcapng`"));
+ Assert.assertEquals(1, testSql("select src_ip, `time` from dfs.`store/pcapng/example.pcapng`"));
+ }
+
+
+ @Test
+ public void testCountQuery() throws Exception {
+ testBuilder()
+ .sqlQuery("select count(*) as ct from dfs.`store/pcapng/sniff.pcapng`")
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues(123L)
+ .build()
+ .run();
+
+ testBuilder()
+ .sqlQuery("select count(*) as ct from dfs.`store/pcapng/example.pcapng`")
+ .ordered()
+ .baselineColumns("ct")
+ .baselineValues(1L)
+ .build()
+ .run();
+ }
+
+ @Test
+ public void testGroupBy() throws Exception {
+ Assert.assertEquals(47, testSql("select src_ip, count(1), sum(packet_length) from dfs.`store/pcapng/sniff.pcapng` group by src_ip"));
+ }
+
+ @Test
+ public void testDistinctQuery() throws Exception {
+ Assert.assertEquals(119, testSql("select distinct `timestamp`, src_ip from dfs.`store/pcapng/sniff.pcapng`"));
+ Assert.assertEquals(1, testSql("select distinct packet_data from dfs.`store/pcapng/example.pcapng`"));
+ }
+
+ @Test(expected = UserRemoteException.class)
+ public void testBasicQueryWithIncorrectFileName() throws Exception {
+ testSql("select * from dfs.`store/pcapng/snaff.pcapng`");
+ }
+
+ @Test
+ public void testPhysicalPlanExecutionBasedOnQuery() throws Exception {
+ String query = "EXPLAIN PLAN for select * from dfs.`store/pcapng/sniff.pcapng`";
+ String plan = getPlanInString(query, JSON_FORMAT);
+ Assert.assertEquals(123, testPhysical(plan));
+ }
+}
diff --git a/exec/java-exec/src/test/resources/store/pcapng/example.pcapng b/exec/java-exec/src/test/resources/store/pcapng/example.pcapng
new file mode 100644
index 000000000..002cb8d9b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/pcapng/example.pcapng
Binary files differ
diff --git a/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng b/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng
new file mode 100644
index 000000000..cd542bd4d
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/pcapng/sniff.pcapng
Binary files differ