From a43839e2147c24700f8a331c6863566abed7a51e Mon Sep 17 00:00:00 2001
From: "Charles S. Givre"
Date: Sun, 17 Feb 2019 21:42:02 -0500
Subject: DRILL-6582: SYSLOG (RFC-5424) Format Plugin

closes #1530
---
 contrib/format-syslog/README.md                    |  41 +++
 contrib/format-syslog/pom.xml                      |  89 +++++
 .../exec/store/syslog/SyslogFormatConfig.java      |  87 +++++
 .../exec/store/syslog/SyslogFormatPlugin.java      |  82 +++++
 .../exec/store/syslog/SyslogRecordReader.java      | 401 +++++++++++++++++++++
 .../src/main/resources/drill-module.conf           |  22 ++
 .../drill/exec/store/syslog/TestSyslogFormat.java  | 302 ++++++++++++++++
 .../src/test/resources/syslog/logs.syslog          |   8 +
 .../src/test/resources/syslog/logs.syslog1         |   8 +
 .../src/test/resources/syslog/logs1.syslog         |   6 +
 .../src/test/resources/syslog/test.syslog          |   1 +
 .../src/test/resources/syslog/test.syslog1         |   2 +
 .../native/client/src/protobuf/UserBitShared.pb.cc |  13 +-
 .../native/client/src/protobuf/UserBitShared.pb.h  |   5 +-
 contrib/pom.xml                                    |   1 +
 distribution/pom.xml                               |   5 +
 distribution/src/assemble/component.xml            |  49 +--
 pom.xml                                            |   4 +
 .../org/apache/drill/exec/proto/UserBitShared.java |  21 +-
 .../drill/exec/proto/beans/CoreOperatorType.java   |   4 +-
 protocol/src/main/protobuf/UserBitShared.proto     |   1 +
 21 files changed, 1113 insertions(+), 39 deletions(-)
 create mode 100644 contrib/format-syslog/README.md
 create mode 100644 contrib/format-syslog/pom.xml
 create mode 100644 contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java
 create mode 100644 contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
 create mode 100644 contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
 create mode 100644 contrib/format-syslog/src/main/resources/drill-module.conf
 create mode 100644 contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
 create mode 100644 contrib/format-syslog/src/test/resources/syslog/logs.syslog
 create mode 100644 contrib/format-syslog/src/test/resources/syslog/logs.syslog1
 create mode 100644 contrib/format-syslog/src/test/resources/syslog/logs1.syslog
 create mode 100644 contrib/format-syslog/src/test/resources/syslog/test.syslog
 create mode 100644 contrib/format-syslog/src/test/resources/syslog/test.syslog1

diff --git a/contrib/format-syslog/README.md b/contrib/format-syslog/README.md
new file mode 100644
index 000000000..2d7cbd8b1
--- /dev/null
+++ b/contrib/format-syslog/README.md
@@ -0,0 +1,41 @@
+# Syslog Format Plugin
+This format plugin enables Drill to query syslog-formatted data as specified in RFC-5424, as shown below.
+
+```
+<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473 class="high"]
+```
+
+## Configuration Options
+This format plugin has the following configuration options:
+
+* **`maxErrors`**: Sets the maximum number of malformed lines that the format plugin will tolerate before throwing an error and halting execution.
+* **`flattenStructuredData`**: Syslog data optionally contains a series of key/value pairs known as structured data. By default, Drill will parse these into a `map`; when set to `true`, each key/value pair becomes its own top-level column, as shown in the query sketch after the configuration example below.
+
+```
+"syslog": {
+  "type": "syslog",
+  "extensions": [ "syslog" ],
+  "maxErrors": 10,
+  "flattenStructuredData": false
+}
+```
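+
+As a usage sketch (the `dfs` workspace name and file path below are illustrative, and assume the `syslog` format above has been added to that workspace with `flattenStructuredData` set to `true`), regular fields and flattened structured-data columns, which carry a `structured_data_` prefix, can be projected directly:
+
+```
+-- Illustrative workspace and path; the structured_data_* column assumes flattenStructuredData = true
+SELECT event_date, severity, app_name, structured_data_UserAgent
+FROM dfs.`logs/example.syslog`
+WHERE severity_code <= 3
+```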
+
+## Fields
+Since the structure of the data contained in a syslog message is well known, this plugin produces a fixed set of fields rather than inferring a schema.
+In terms of data types, the `event_date` field is a Drill TIMESTAMP, the `severity_code` and `facility_code` fields are integers, and all other fields are VARCHARs.
+
+**Note: All fields, with the exception of `event_date`, are optional, so not every field may be present in every record.**
+
+* `event_date`: The time of the event
+* `severity_code`: The severity code of the event
+* `facility_code`: The facility code of the event
+* `severity`: The severity of the event
+* `facility`: The facility of the event
+* `ip`: The IP address or hostname of the source machine
+* `app_name`: The name of the application that generated the event
+* `process_id`: The process ID of the process that generated the event
+* `message_id`: The identifier of the message
+* `message`: The actual message text of the event
+* `_raw`: The full text of the event
+
+### Structured Data
+Syslog data can contain a list of key/value pairs which Drill will extract into a field called `structured_data`. This field is a Drill Map.
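+
+When the default `map` representation is kept, individual parameters can be reached with Drill's map-access syntax. The sketch below mirrors the plugin's unit test, which reads the bundled `syslog/test.syslog` resource through the `cp` (classpath) storage plugin:
+
+```
+-- Mirrors TestSyslogFormat#testStructuredDataQuery; the UserAgent key comes from the test data
+SELECT syslog_data.`structured_data`.`UserAgent` AS UserAgent
+FROM (SELECT structured_data FROM cp.`syslog/test.syslog`) AS syslog_data
+```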
\ No newline at end of file
diff --git a/contrib/format-syslog/pom.xml b/contrib/format-syslog/pom.xml
new file mode 100644
index 000000000..9bc9159f3
--- /dev/null
+++ b/contrib/format-syslog/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.16.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-format-syslog</artifactId>
+  <name>contrib/format-syslog</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.realityforge.jsyslog-message</groupId>
+      <artifactId>jsyslog-message</artifactId>
+      <version>1.2</version>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-java-sources</id>
+            <phase>process-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/syslog</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>src/main/java/org/apache/drill/exec/store/syslog</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java
new file mode 100644
index 000000000..0f60eeb4b
--- /dev/null
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.syslog;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.drill.shaded.guava.com.google.common.base.Objects;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+
+@JsonTypeName("syslog")
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class SyslogFormatConfig implements FormatPluginConfig {
+
+  public List<String> extensions;
+  public int maxErrors = 10;
+  public boolean flattenStructuredData;
+
+  public boolean getFlattenStructuredData() {
+    return flattenStructuredData;
+  }
+
+  public int getMaxErrors() {
+    return maxErrors;
+  }
+
+  public List<String> getExtensions() {
+    return extensions;
+  }
+
+  public void setExtensions(List<String> ext) {
+    this.extensions = ext;
+  }
+
+  public void setExtension(String ext) {
+    if (this.extensions == null) {
+      this.extensions = new ArrayList<>();
+    }
+    this.extensions.add(ext);
+  }
+
+  public void setMaxErrors(int errors) {
+    this.maxErrors = errors;
+  }
+
+  public void setFlattenStructuredData(boolean flattenData) {
+    this.flattenStructuredData = flattenData;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    SyslogFormatConfig other = (SyslogFormatConfig) obj;
+    return Objects.equal(extensions, other.extensions) &&
+            Objects.equal(maxErrors, other.maxErrors) &&
+            Objects.equal(flattenStructuredData, other.flattenStructuredData);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(new Object[]{maxErrors, flattenStructuredData, extensions});
+  }
+}
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
new file mode 100644
index 000000000..bf4b4b479
--- /dev/null
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.syslog;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyWriter;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+
+public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
+
+  public static final String DEFAULT_NAME = "syslog";
+  private final SyslogFormatConfig formatConfig;
+
+  public SyslogFormatPlugin(String name, DrillbitContext context,
+                            Configuration fsConf, StoragePluginConfig storageConfig,
+                            SyslogFormatConfig formatConfig) {
+    super(name, context, fsConf, storageConfig, formatConfig,
+        true,  // readable
+        false, // writable
+        true,  // blockSplittable
+        true,  // compressible
+        Lists.newArrayList(formatConfig.getExtensions()),
+        DEFAULT_NAME);
+    this.formatConfig = formatConfig;
+  }
+
+  @Override
+  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+    return new SyslogRecordReader(context, dfs, fileWork, columns, userName, formatConfig);
+  }
+
+  @Override
+  public boolean supportsPushDown() {
+    return true;
+  }
+
+  @Override
+  public RecordWriter getRecordWriter(FragmentContext context,
+      EasyWriter writer) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("Drill does not support writing records to Syslog format.");
+  }
+
+  @Override
+  public int getReaderOperatorType() {
+    return CoreOperatorType.SYSLOG_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public int getWriterOperatorType() {
+    throw new UnsupportedOperationException("Drill does not support writing records to Syslog format.");
+  }
+}
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
new file mode 100644
index 000000000..a198e3444
--- /dev/null
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.syslog;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.fs.Path;
+import org.realityforge.jsyslog.message.StructuredDataParameter;
+import org.realityforge.jsyslog.message.SyslogMessage;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class SyslogRecordReader extends AbstractRecordReader {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SyslogRecordReader.class);
+  private static final int MAX_RECORDS_PER_BATCH = 4096;
+
+  private final DrillFileSystem fileSystem;
+  private final FileWork fileWork;
+  private final String userName;
+  private BufferedReader reader;
+  private DrillBuf buffer;
+  private VectorContainerWriter writer;
+  private SyslogFormatConfig config;
+  private int maxErrors;
+  private boolean flattenStructuredData;
+  private int errorCount;
+  private int lineCount;
+  private List<SchemaPath> projectedColumns;
+  private String line;
+
+  private SimpleDateFormat df;
+
+  public SyslogRecordReader(FragmentContext context,
+                            DrillFileSystem fileSystem,
+                            FileWork fileWork,
+                            List<SchemaPath> columns,
+                            String userName,
+                            SyslogFormatConfig config) throws OutOfMemoryException {
+
+    this.fileSystem = fileSystem;
+    this.fileWork = fileWork;
+    this.userName = userName;
+    this.config = config;
+    this.maxErrors = config.getMaxErrors();
+    this.df = getValidDateObject("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+    this.errorCount = 0;
+    this.buffer = context.getManagedBuffer().reallocIfNeeded(4096);
+    this.projectedColumns = columns;
+    this.flattenStructuredData = config.getFlattenStructuredData();
+
+    setColumns(columns);
+  }
+
+  @Override
+  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+    openFile();
+    this.writer = new VectorContainerWriter(output);
+  }
+
+  private void openFile() {
+    InputStream in;
+    try {
+      in = fileSystem.open(new Path(fileWork.getPath()));
+    } catch (Exception e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Failed to open input file: %s", fileWork.getPath())
+          .addContext("User name", this.userName)
+          .build(logger);
+    }
+    this.lineCount = 0;
+    reader = new BufferedReader(new InputStreamReader(in));
+  }
+
+  @Override
+  public int next() {
+    this.writer.allocate();
+    this.writer.reset();
+
+    int recordCount = 0;
+
+    try {
+      BaseWriter.MapWriter map = this.writer.rootAsMap();
+      String line = null;
+
+      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
+        lineCount++;
+
+        // Skip empty lines
+        line = line.trim();
+        if (line.length() == 0) {
+          continue;
+        }
+        // Save the raw line so the _raw pseudo-column can return it
+        this.line = line;
+
+        try {
+          SyslogMessage parsedMessage = SyslogMessage.parseStructuredSyslogMessage(line);
+
+          this.writer.setPosition(recordCount);
+          map.start();
+
+          if (isStarQuery()) {
+            writeAllColumns(map, parsedMessage);
+          } else {
+            writeProjectedColumns(map, parsedMessage);
+          }
+          map.end();
+          recordCount++;
+
+        } catch (Exception e) {
+          errorCount++;
+          if (errorCount > maxErrors) {
+            throw UserException
+                .dataReadError()
+                .message("Maximum Error Threshold Exceeded: ")
+                .addContext("Line: " + lineCount)
+                .addContext(e.getMessage())
+                .build(logger);
+          }
+        }
+      }
+
+      this.writer.setValueCount(recordCount);
+      return recordCount;
+
+    } catch (final Exception e) {
+      errorCount++;
+      if (errorCount > maxErrors) {
+        throw UserException.dataReadError()
+            .message("Error parsing file")
+            .addContext(e.getMessage())
+            .build(logger);
+      }
+    }
+
+    return recordCount;
+  }
+
+  private void writeAllColumns(BaseWriter.MapWriter map, SyslogMessage parsedMessage) {
+
+    long milliseconds = 0;
+    try {
+      milliseconds = parsedMessage.getTimestamp().getMillis();
+    } catch (final Exception e) {
+      errorCount++;
+      if (errorCount > maxErrors) {
+        throw UserException.dataReadError()
+            .message("Syslog Format Plugin: Error parsing date")
+            .addContext(e.getMessage())
+            .build(logger);
+      }
+    }
+    map.timeStamp("event_date").writeTimeStamp(milliseconds);
+    map.integer("severity_code").writeInt(parsedMessage.getLevel().ordinal());
+    map.integer("facility_code").writeInt(parsedMessage.getFacility().ordinal());
+
+    mapStringField("severity", parsedMessage.getLevel().name(), map);
+    mapStringField("facility", parsedMessage.getFacility().name(), map);
+    mapStringField("ip", parsedMessage.getHostname(), map);
+    mapStringField("app_name", parsedMessage.getAppName(), map);
+    mapStringField("process_id", parsedMessage.getProcId(), map);
+    mapStringField("message_id", parsedMessage.getMsgId(), map);
+
+    if (parsedMessage.getStructuredData() != null) {
+      mapStringField("structured_data_text", parsedMessage.getStructuredData().toString(), map);
+      Map<String, List<StructuredDataParameter>> structuredData = parsedMessage.getStructuredData();
+      if (flattenStructuredData) {
+        mapFlattenedStructuredData(structuredData, map);
+      } else {
+        mapComplexField("structured_data", structuredData, map);
+      }
+    }
+    mapStringField("message", parsedMessage.getMessage(), map);
+  }
+
+  private void writeProjectedColumns(BaseWriter.MapWriter map, SyslogMessage parsedMessage) throws UserException {
+    String columnName;
+
+    for (SchemaPath col : projectedColumns) {
+
+      // Case for nested fields, e.g. structured_data.UserAgent
+      if (col.getAsNamePart().hasChild()) {
+        String fieldName = col.getAsNamePart().getChild().getName();
+        mapStructuredDataField(fieldName, map, parsedMessage);
+      } else {
+        columnName = col.getAsNamePart().getName();
+
+        // Extracts fields from structured data IF the user selected to flatten these fields
+        if ((!columnName.equals("structured_data_text")) && columnName.startsWith("structured_data_")) {
+          String fieldName = columnName.replace("structured_data_", "");
+          String value = getFieldFromStructuredData(fieldName, parsedMessage);
+          mapStringField(columnName, value, map);
+        } else {
+          switch (columnName) {
+            case "event_date":
+              long milliseconds = parsedMessage.getTimestamp().getMillis(); // TODO put in try/catch
+              map.timeStamp("event_date").writeTimeStamp(milliseconds);
+              break;
+            case "severity_code":
+              map.integer("severity_code").writeInt(parsedMessage.getLevel().ordinal());
+              break;
+            case "facility_code":
+              map.integer("facility_code").writeInt(parsedMessage.getFacility().ordinal());
+              break;
+            case "severity":
+              mapStringField("severity", parsedMessage.getLevel().name(), map);
+              break;
+            case "facility":
+              mapStringField("facility", parsedMessage.getFacility().name(), map);
+              break;
+            case "ip":
+              mapStringField("ip", parsedMessage.getHostname(), map);
+              break;
+            case "app_name":
+              mapStringField("app_name", parsedMessage.getAppName(), map);
+              break;
+            case "process_id":
+              mapStringField("process_id", parsedMessage.getProcId(), map);
+              break;
+            case "msg_id":
+              mapStringField("message_id", parsedMessage.getMsgId(), map);
+              break;
+            case "structured_data":
+              if (parsedMessage.getStructuredData() != null) {
+                Map<String, List<StructuredDataParameter>> structuredData = parsedMessage.getStructuredData();
+                mapComplexField("structured_data", structuredData, map);
+              }
+              break;
+            case "structured_data_text":
+              if (parsedMessage.getStructuredData() != null) {
+                mapStringField("structured_data_text", parsedMessage.getStructuredData().toString(), map);
+              } else {
+                mapStringField("structured_data_text", "", map);
+              }
+              break;
+            case "message":
+              mapStringField("message", parsedMessage.getMessage(), map);
+              break;
+            case "_raw":
+              mapStringField("_raw", this.line, map);
+              break;
+            default:
+              mapStringField(columnName, "", map);
+          }
+        }
+      }
+    }
+  }
+
+  // Helper function to map strings
+  private void mapStringField(String name, String value, BaseWriter.MapWriter map) {
+    if (value == null) {
+      return;
+    }
+    try {
+      byte[] bytes = value.getBytes("UTF-8");
+      int stringLength = bytes.length;
+      this.buffer = buffer.reallocIfNeeded(stringLength);
+      this.buffer.setBytes(0, bytes, 0, stringLength);
+      map.varChar(name).writeVarChar(0, stringLength, buffer);
+    } catch (Exception e) {
+      throw UserException
+          .dataWriteError()
+          .addContext("Could not write string: ")
+          .addContext(e.getMessage())
+          .build(logger);
+    }
+  }
+
+  // Helper function to flatten structured data into structured_data_* columns
+  private void mapFlattenedStructuredData(Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
+    Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = data.entrySet().iterator();
+    while (entries.hasNext()) {
+      Map.Entry<String, List<StructuredDataParameter>> entry = entries.next();
+
+      List<StructuredDataParameter> dataParameters = entry.getValue();
+      String fieldName;
+      String fieldValue;
+
+      for (StructuredDataParameter parameter : dataParameters) {
+        fieldName = "structured_data_" + parameter.getName();
+        fieldValue = parameter.getValue();
+
+        mapStringField(fieldName, fieldValue, map);
+      }
+    }
+  }
+
+  // Gets a field from the structured data construct, or null if the key is absent
+  private String getFieldFromStructuredData(String fieldName, SyslogMessage parsedMessage) {
+    String result = null;
+    Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = parsedMessage.getStructuredData().entrySet().iterator();
+    while (entries.hasNext()) {
+      Map.Entry<String, List<StructuredDataParameter>> entry = entries.next();
+      List<StructuredDataParameter> dataParameters = entry.getValue();
+
+      for (StructuredDataParameter d : dataParameters) {
+        if (d.getName().equals(fieldName)) {
+          return d.getValue();
+        }
+      }
+    }
+    return result;
+  }
+
+  // Helper function to map the structured data parameters into a Drill map
+  private void mapComplexField(String mapName, Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
+    Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = data.entrySet().iterator();
+    while (entries.hasNext()) {
+      Map.Entry<String, List<StructuredDataParameter>> entry = entries.next();
+
+      List<StructuredDataParameter> dataParameters = entry.getValue();
+      String fieldName;
+      String fieldValue;
+
+      for (StructuredDataParameter parameter : dataParameters) {
+        fieldName = parameter.getName();
+        fieldValue = parameter.getValue();
+
+        VarCharHolder rowHolder = new VarCharHolder();
+
+        byte[] rowStringBytes = fieldValue.getBytes();
+        // reallocIfNeeded returns the (possibly new) buffer, so the result must be kept
+        this.buffer = this.buffer.reallocIfNeeded(rowStringBytes.length);
+        this.buffer.setBytes(0, rowStringBytes);
+        rowHolder.start = 0;
+        rowHolder.end = rowStringBytes.length;
+        rowHolder.buffer = this.buffer;
+
+        map.map(mapName).varChar(fieldName).write(rowHolder);
+      }
+    }
+  }
+
+  private void mapStructuredDataField(String fieldName, BaseWriter.MapWriter map, SyslogMessage parsedMessage) {
+    String fieldValue = getFieldFromStructuredData(fieldName, parsedMessage);
+    if (fieldValue == null) {
+      // The requested key is absent from the structured data; skip it instead of raising an NPE
+      return;
+    }
+    VarCharHolder rowHolder = new VarCharHolder();
+
+    byte[] rowStringBytes = fieldValue.getBytes();
+    this.buffer = this.buffer.reallocIfNeeded(rowStringBytes.length);
+    this.buffer.setBytes(0, rowStringBytes);
+    rowHolder.start = 0;
+    rowHolder.end = rowStringBytes.length;
+    rowHolder.buffer = this.buffer;
+
+    map.map("structured_data").varChar(fieldName).write(rowHolder);
+  }
+
+  public SimpleDateFormat getValidDateObject(String d) {
+    SimpleDateFormat tempDateFormat;
+    if (d != null && !d.isEmpty()) {
+      tempDateFormat = new SimpleDateFormat(d);
+    } else {
+      throw UserException
+          .parseError()
+          .message("Invalid date format")
+          .build(logger);
+    }
+    return tempDateFormat;
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.reader.close();
+  }
+}
diff --git a/contrib/format-syslog/src/main/resources/drill-module.conf b/contrib/format-syslog/src/main/resources/drill-module.conf
new file mode 100644
index 000000000..d56f649aa
--- /dev/null
+++ b/contrib/format-syslog/src/main/resources/drill-module.conf
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// This file tells Drill to consider this module when class path scanning.
+// This file can also include any supplementary configuration information.
+// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.classpath.scanning: {
+  packages += "org.apache.drill.exec.store.syslog"
+}
diff --git a/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java b/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
new file mode 100644
index 000000000..ce1af1033
--- /dev/null
+++ b/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.syslog; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.server.Drillbit; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.BaseDirTestWatcher; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.ClassRule; + +public class TestSyslogFormat extends ClusterTest { + + @ClassRule + public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher(); + + @BeforeClass + public static void setup() throws Exception { + ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1)); + defineSyslogPlugin(); + } + + private static void defineSyslogPlugin() throws ExecutionSetupException { + SyslogFormatConfig sampleConfig = new SyslogFormatConfig(); + sampleConfig.setExtension("syslog"); + + SyslogFormatConfig flattenedDataConfig = new SyslogFormatConfig(); + flattenedDataConfig.setExtension("syslog1"); + flattenedDataConfig.setFlattenStructuredData(true); + + // Define a temporary plugin for the "cp" storage plugin. 
+ Drillbit drillbit = cluster.drillbit(); + final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage(); + final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin("cp"); + final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig(); + pluginConfig.getFormats().put("sample", sampleConfig); + pluginConfig.getFormats().put("flat", flattenedDataConfig); + pluginRegistry.createOrUpdate("cp", pluginConfig, false); + } + + @Test + public void testNonComplexFields() throws RpcException { + String sql = "SELECT event_date," + + "severity_code," + + "severity," + + "facility_code," + + "facility," + + "ip," + + "process_id," + + "message_id," + + "structured_data_text " + + "FROM cp.`syslog/logs.syslog`"; + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("event_date", TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL) + .add("severity_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL) + .add("severity", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("facility_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL) + .add("facility", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("process_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("message_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_text", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "", "") + .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "", "") + .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "", "") + .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "", "") + .addRow(1061727255000L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "", "") + .addRow(1061727255000L, 5, "NOTICE", 20, "LOCAL4", "192.0.2.1", "8710", "", "") + .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4", "mymachine.example.com", null, "", "{examplePriority@32473=[class=high], exampleSDID@32473=[iut=3, eventSource=Application, eventID=1011]}") + .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4", "mymachine.example.com", null, "", "{examplePriority@32473=[class=high], exampleSDID@32473=[iut=3, eventSource=Application, eventID=1011]}") + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testStarQuery() throws RpcException { + String sql = "SELECT * FROM cp.`syslog/logs1.syslog`"; + + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("event_date", TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL) + .add("severity_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL) + .add("facility_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL) + .add("severity", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("facility", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("app_name", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("message_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + 
.add("message", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("process_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1065910455003L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on /dev/pts/8", null) + .addRow(482196050520L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on /dev/pts/8", null) + .addRow(482196050520L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on /dev/pts/8", null) + .addRow(1065910455003L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on /dev/pts/8", null) + .addRow(1061727255000L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on /dev/pts/8", null) + .addRow(1061727255000L, 5, 20, "NOTICE", "LOCAL4", "192.0.2.1", "myproc", null, "%% It's time to make the do-nuts.", "8710") + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testRawQuery() throws RpcException { + String sql = "SELECT _raw FROM cp.`syslog/logs.syslog`"; + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + TupleMetadata expectedSchema = new SchemaBuilder() + .add("_raw", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8") + .addRow("<34>1 1985-04-12T19:20:50.52-04:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8") + .addRow("<34>1 1985-04-12T23:20:50.52Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8") + .addRow("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8") + .addRow("<34>1 2003-08-24T05:14:15.000003-07:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8") + .addRow("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 - - %% It's time to make the do-nuts.") + .addRow("<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"][examplePriority@32473 class=\"high\"]") + .addRow("<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"][examplePriority@32473 class=\"high\"] - and thats a wrap!") + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testStructuredDataQuery() throws RpcException { + String sql = "SELECT syslog_data.`structured_data`.`UserAgent` AS UserAgent, " + + "syslog_data.`structured_data`.`UserHostAddress` AS UserHostAddress," + + "syslog_data.`structured_data`.`BrowserSession` AS BrowserSession," + + "syslog_data.`structured_data`.`Realm` AS Realm," + + "syslog_data.`structured_data`.`Appliance` AS Appliance," + + "syslog_data.`structured_data`.`Company` AS Company," + + "syslog_data.`structured_data`.`UserID` AS UserID," + + "syslog_data.`structured_data`.`PEN` AS PEN," + + "syslog_data.`structured_data`.`HostName` AS HostName," + + "syslog_data.`structured_data`.`Category` AS Category," + + "syslog_data.`structured_data`.`Priority` AS 
Priority " + + "FROM (" + + "SELECT structured_data " + + "FROM cp.`syslog/test.syslog`" + + ") AS syslog_data"; + + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + TupleMetadata expectedSchema = new SchemaBuilder() + .add("UserAgent", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("UserHostAddress", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("BrowserSession", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("Realm", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("Appliance", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("Company", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("UserID", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("PEN", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("HostName", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("Category", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("Priority", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", "192.168.2.132", "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0", "secureauthqa.gosecureauth.com", "SecureAuth Corporation", "Tester2", "27389", "192.168.2.132", "AUDIT", "4") + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + + + @Test + public void testStarFlattenedStructuredDataQuery() throws RpcException { + String sql = "SELECT * FROM cp.`syslog/test.syslog1`"; + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + TupleMetadata expectedSchema = new SchemaBuilder() + .add("event_date", TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL) + .add("severity_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL) + .add("facility_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL) + .add("severity", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("facility", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("app_name", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("process_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("message_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_text", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_UserAgent", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_UserHostAddress", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_BrowserSession", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_Realm", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_Appliance", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_Company", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_UserID", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_PEN", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_HostName", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_Category", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + 
.add("structured_data_Priority", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("message", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1438811939693L, 6, 10, "INFO", "AUTHPRIV", "192.168.2.132", "SecureAuth0", "23108", "ID52020", "{SecureAuth@27389=[UserAgent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko, UserHostAddress=192.168.2.132, BrowserSession=0gvhdi5udjuqtweprbgoxilc, Realm=SecureAuth0, Appliance=secureauthqa.gosecureauth.com, Company=SecureAuth Corporation, UserID=Tester2, PEN=27389, HostName=192.168.2.132, Category=AUDIT, Priority=4]}", "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", "192.168.2.132", "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0", "secureauthqa.gosecureauth.com", "SecureAuth Corporation", "Tester2", "27389", "192.168.2.132", "AUDIT", "4", "Found the user for retrieving user's profile") + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testExplicitFlattenedStructuredDataQuery() throws RpcException { + String sql = "SELECT event_date," + + "severity_code," + + "facility_code," + + "severity," + + "facility," + + "ip," + + "app_name," + + "process_id," + + "message_id," + + "structured_data_text," + + "structured_data_UserAgent," + + "structured_data_UserHostAddress," + + "structured_data_BrowserSession," + + "structured_data_Realm," + + "structured_data_Appliance," + + "structured_data_Company," + + "structured_data_UserID," + + "structured_data_PEN," + + "structured_data_HostName," + + "structured_data_Category," + + "structured_data_Priority," + + "message " + + "FROM cp.`syslog/test.syslog1`"; + + RowSet results = client.queryBuilder().sql(sql).rowSet(); + TupleMetadata expectedSchema = new SchemaBuilder() + .add("event_date", TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL) + .add("severity_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL) + .add("facility_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL) + .add("severity", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("facility", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("app_name", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("process_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("message_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_text", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_UserAgent", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_UserHostAddress", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_BrowserSession", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_Realm", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_Appliance", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_Company", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_UserID", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_PEN", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_HostName", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL) + .add("structured_data_Category", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("structured_data_Priority", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .add("message", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL) + .buildSchema(); + + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1438811939693L, 6, 10, "INFO", "AUTHPRIV", "192.168.2.132", "SecureAuth0", "23108", "", "{SecureAuth@27389=[UserAgent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko, UserHostAddress=192.168.2.132, BrowserSession=0gvhdi5udjuqtweprbgoxilc, Realm=SecureAuth0, Appliance=secureauthqa.gosecureauth.com, Company=SecureAuth Corporation, UserID=Tester2, PEN=27389, HostName=192.168.2.132, Category=AUDIT, Priority=4]}", "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", "192.168.2.132", "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0", "secureauthqa.gosecureauth.com", "SecureAuth Corporation", "Tester2", "27389", "192.168.2.132", "AUDIT", "4", "Found the user for retrieving user's profile") + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } +} diff --git a/contrib/format-syslog/src/test/resources/syslog/logs.syslog b/contrib/format-syslog/src/test/resources/syslog/logs.syslog new file mode 100644 index 000000000..e52141218 --- /dev/null +++ b/contrib/format-syslog/src/test/resources/syslog/logs.syslog @@ -0,0 +1,8 @@ +<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 1985-04-12T19:20:50.52-04:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 1985-04-12T23:20:50.52Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 2003-08-24T05:14:15.000003-07:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 - - %% It's time to make the do-nuts. +<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473 class="high"] +<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473 class="high"] - and thats a wrap! \ No newline at end of file diff --git a/contrib/format-syslog/src/test/resources/syslog/logs.syslog1 b/contrib/format-syslog/src/test/resources/syslog/logs.syslog1 new file mode 100644 index 000000000..e52141218 --- /dev/null +++ b/contrib/format-syslog/src/test/resources/syslog/logs.syslog1 @@ -0,0 +1,8 @@ +<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 1985-04-12T19:20:50.52-04:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 1985-04-12T23:20:50.52Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 2003-08-24T05:14:15.000003-07:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 - - %% It's time to make the do-nuts. 
+<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473 class="high"] +<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"][examplePriority@32473 class="high"] - and thats a wrap! \ No newline at end of file diff --git a/contrib/format-syslog/src/test/resources/syslog/logs1.syslog b/contrib/format-syslog/src/test/resources/syslog/logs1.syslog new file mode 100644 index 000000000..bbfb6ed05 --- /dev/null +++ b/contrib/format-syslog/src/test/resources/syslog/logs1.syslog @@ -0,0 +1,6 @@ +<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 1985-04-12T19:20:50.52-04:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 1985-04-12T23:20:50.52Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<34>1 2003-08-24T05:14:15.000003-07:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8 +<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 - - %% It's time to make the do-nuts. diff --git a/contrib/format-syslog/src/test/resources/syslog/test.syslog b/contrib/format-syslog/src/test/resources/syslog/test.syslog new file mode 100644 index 000000000..09435157f --- /dev/null +++ b/contrib/format-syslog/src/test/resources/syslog/test.syslog @@ -0,0 +1 @@ +<86>1 2015-08-05T21:58:59.693Z 192.168.2.132 SecureAuth0 23108 ID52020 [SecureAuth@27389 UserAgent="Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko" UserHostAddress="192.168.2.132" BrowserSession="0gvhdi5udjuqtweprbgoxilc" Realm="SecureAuth0" Appliance="secureauthqa.gosecureauth.com" Company="SecureAuth Corporation" UserID="Tester2" PEN="27389" HostName="192.168.2.132" Category="AUDIT" Priority="4"] Found the user for retrieving user's profile \ No newline at end of file diff --git a/contrib/format-syslog/src/test/resources/syslog/test.syslog1 b/contrib/format-syslog/src/test/resources/syslog/test.syslog1 new file mode 100644 index 000000000..d8e19d938 --- /dev/null +++ b/contrib/format-syslog/src/test/resources/syslog/test.syslog1 @@ -0,0 +1,2 @@ +<86>1 2015-08-05T21:58:59.693Z 192.168.2.132 SecureAuth0 23108 ID52020 [SecureAuth@27389 UserAgent="Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko" UserHostAddress="192.168.2.132" BrowserSession="0gvhdi5udjuqtweprbgoxilc" Realm="SecureAuth0" Appliance="secureauthqa.gosecureauth.com" Company="SecureAuth Corporation" UserID="Tester2" PEN="27389" HostName="192.168.2.132" Category="AUDIT" Priority="4"] Found the user for retrieving user's profile +<134>1 2016-04-01T16:44:58Z MacBook-Pro-3 - 94473 - - {"pid":94473,"hostname":"MacBook-Pro-3","level":30,"msg":"hello world","time":1459529098958,"v":1} \ No newline at end of file diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc index ee81ee206..1e0712091 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc @@ -754,7 +754,7 @@ void protobuf_AddDesc_UserBitShared_2eproto() { "entState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCA" 
"TION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCAN" "CELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_REQ" - "UESTED\020\006*\222\t\n\020CoreOperatorType\022\021\n\rSINGLE_" + "UESTED\020\006*\247\t\n\020CoreOperatorType\022\021\n\rSINGLE_" "SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FILTER" "\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n" "\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006" @@ -783,11 +783,11 @@ void protobuf_AddDesc_UserBitShared_2eproto() { "TPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_SUB_SCAN\0204\022" "\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PARTITION_LIMI" "T\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016RUNTIME_FILT" - "ER\0208\022\017\n\013ROWKEY_JOIN\0209*g\n\nSaslStatus\022\020\n\014S" - "ASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN" - "_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FA" - "ILED\020\004B.\n\033org.apache.drill.exec.protoB\rU" - "serBitSharedH\001", 5534); + "ER\0208\022\017\n\013ROWKEY_JOIN\0209\022\023\n\017SYSLOG_SUB_SCAN" + "\020:*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSA" + "SL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL" + "_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apach" + "e.drill.exec.protoB\rUserBitSharedH\001", 5555); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "UserBitShared.proto", &protobuf_RegisterTypes); UserCredentials::default_instance_ = new UserCredentials(); @@ -966,6 +966,7 @@ bool CoreOperatorType_IsValid(int value) { case 55: case 56: case 57: + case 58: return true; default: return false; diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h index 3fa19118c..b95b311c8 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.h +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h @@ -261,11 +261,12 @@ enum CoreOperatorType { PARTITION_LIMIT = 54, PCAPNG_SUB_SCAN = 55, RUNTIME_FILTER = 56, - ROWKEY_JOIN = 57 + ROWKEY_JOIN = 57, + SYSLOG_SUB_SCAN = 58 }; bool CoreOperatorType_IsValid(int value); const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER; -const CoreOperatorType CoreOperatorType_MAX = ROWKEY_JOIN; +const CoreOperatorType CoreOperatorType_MAX = SYSLOG_SUB_SCAN; const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1; const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor(); diff --git a/contrib/pom.xml b/contrib/pom.xml index bd879f6c6..0341edb6b 100644 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -41,6 +41,7 @@ storage-hbase format-maprdb + format-syslog storage-hive storage-mongo storage-jdbc diff --git a/distribution/pom.xml b/distribution/pom.xml index 31a64f74f..735eea8a5 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -285,6 +285,11 @@ drill-udfs ${project.version} + + org.apache.drill.contrib + drill-format-syslog + ${project.version} + diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml index a79a4fa35..7174ba223 100644 --- a/distribution/src/assemble/component.xml +++ b/distribution/src/assemble/component.xml @@ -24,28 +24,29 @@ true - - - org.apache.drill.exec:drill-jdbc:jar - org.apache.drill:drill-protocol:jar - org.apache.drill:drill-common:jar - org.apache.drill:drill-logical:jar - org.apache.drill.exec:vector:jar - org.apache.drill.memory:drill-memory-base:jar - 
org.apache.drill.exec:drill-rpc:jar - org.apache.drill.exec:drill-java-exec:jar - org.apache.drill.contrib.storage-hive:drill-storage-hive-core:jar - org.apache.drill.contrib.storage-hive:drill-hive-exec-shaded:jar - org.apache.drill.contrib.data:tpch-sample-data:jar - org.apache.drill.contrib:drill-mongo-storage:jar - org.apache.drill.contrib:drill-storage-hbase:jar - org.apache.drill.contrib:drill-format-mapr:jar - org.apache.drill.contrib:drill-jdbc-storage:jar - org.apache.drill.contrib:drill-kudu-storage:jar - org.apache.drill.contrib:drill-storage-kafka:jar - org.apache.drill.contrib:drill-opentsdb-storage:jar - org.apache.drill.contrib:drill-udfs:jar - + + + org.apache.drill.exec:drill-jdbc:jar + org.apache.drill:drill-protocol:jar + org.apache.drill:drill-common:jar + org.apache.drill:drill-logical:jar + org.apache.drill.exec:vector:jar + org.apache.drill.memory:drill-memory-base:jar + org.apache.drill.exec:drill-rpc:jar + org.apache.drill.exec:drill-java-exec:jar + org.apache.drill.contrib.storage-hive:drill-storage-hive-core:jar + org.apache.drill.contrib.storage-hive:drill-hive-exec-shaded:jar + org.apache.drill.contrib.data:tpch-sample-data:jar + org.apache.drill.contrib:drill-mongo-storage:jar + org.apache.drill.contrib:drill-storage-hbase:jar + org.apache.drill.contrib:drill-format-mapr:jar + org.apache.drill.contrib:drill-format-syslog:jar + org.apache.drill.contrib:drill-jdbc-storage:jar + org.apache.drill.contrib:drill-kudu-storage:jar + org.apache.drill.contrib:drill-storage-kafka:jar + org.apache.drill.contrib:drill-opentsdb-storage:jar + org.apache.drill.contrib:drill-udfs:jar + false @@ -369,11 +370,11 @@ src/resources/yarn-client-log.xml conf 0640 - + src/resources/drill-am-log.xml conf - 0640 + 0640 src/resources/drill-env.sh diff --git a/pom.xml b/pom.xml index b9f17dab6..678e27cd8 100644 --- a/pom.xml +++ b/pom.xml @@ -340,6 +340,8 @@ **/*.log2 **/*.sqllog **/*.sqllog2 + **/*.syslog + **/*.syslog1 **/*.log **/*.css **/*.js @@ -601,6 +603,8 @@ **/*.woff2 **/*.ks **/*.pcap + **/*.syslog + **/*.syslog1 **/*.props **/*.conf **/*.log diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 34fb47e5d..c540c8f2a 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -597,6 +597,10 @@ public final class UserBitShared { * ROWKEY_JOIN = 57; */ ROWKEY_JOIN(57, 57), + /** + * SYSLOG_SUB_SCAN = 58; + */ + SYSLOG_SUB_SCAN(58, 58), ; /** @@ -831,6 +835,10 @@ public final class UserBitShared { * ROWKEY_JOIN = 57; */ public static final int ROWKEY_JOIN_VALUE = 57; + /** + * SYSLOG_SUB_SCAN = 58; + */ + public static final int SYSLOG_SUB_SCAN_VALUE = 58; public final int getNumber() { return value; } @@ -895,6 +903,7 @@ public final class UserBitShared { case 55: return PCAPNG_SUB_SCAN; case 56: return RUNTIME_FILTER; case 57: return ROWKEY_JOIN; + case 58: return SYSLOG_SUB_SCAN; default: return null; } } @@ -24635,7 +24644,7 @@ public final class UserBitShared { "entState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCA" + "TION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCAN" + "CELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_REQ" + - "UESTED\020\006*\222\t\n\020CoreOperatorType\022\021\n\rSINGLE_" + + "UESTED\020\006*\247\t\n\020CoreOperatorType\022\021\n\rSINGLE_" + 
"SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FILTER" + "\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n" + "\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006" + @@ -24664,11 +24673,11 @@ public final class UserBitShared { "TPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_SUB_SCAN\0204\022" + "\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PARTITION_LIMI" + "T\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016RUNTIME_FILT" + - "ER\0208\022\017\n\013ROWKEY_JOIN\0209*g\n\nSaslStatus\022\020\n\014S" + - "ASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN" + - "_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FA" + - "ILED\020\004B.\n\033org.apache.drill.exec.protoB\rU" + - "serBitSharedH\001" + "ER\0208\022\017\n\013ROWKEY_JOIN\0209\022\023\n\017SYSLOG_SUB_SCAN" + + "\020:*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSA" + + "SL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL" + + "_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apach" + + "e.drill.exec.protoB\rUserBitSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java index 6138ad614..7d5041c5b 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java @@ -79,7 +79,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite