diff options
author | Hanifi Gunes <hgunes@maprtech.com> | 2014-07-25 18:55:03 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-07-26 10:43:31 -0700 |
commit | 1fe9c21a83e7642e31827bc6c582bf6ea0e22bf3 (patch) | |
tree | 1871ac2785881d1bce6558dede453ca0b0ba1a28 /exec/java-exec/src/main | |
parent | 846e291d5f6966a9fdf2b590c597b79b3205bb68 (diff) |
DRILL-1153: skip brackets, braces & respect escape chars in literal strings while splitting JSON records
Diffstat (limited to 'exec/java-exec/src/main')
3 files changed, 178 insertions, 176 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java new file mode 100644 index 000000000..c73fef1cb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitterBase.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex.fn; + +import java.io.IOException; +import java.io.Reader; + +public abstract class JsonRecordSplitterBase implements JsonRecordSplitter { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class); + public final static int MAX_RECORD_SIZE = 128*1024; + + private static final int OPEN_CBRACKET = '{'; + private static final int OPEN_BRACKET = '['; + private static final int CLOSE_CBRACKET = '}'; + private static final int CLOSE_BRACKET = ']'; + private static final int ESCAPE = '\\'; + private static final int LITERAL = '"'; + + private static final int SPACE = ' '; + private static final int TAB = '\t'; + private static final int NEW_LINE = '\n'; + private static final int FORM_FEED = '\f'; + private static final int CR = '\r'; + + private long start = 0; + + + protected void preScan() throws IOException { } + + protected void postScan() throws IOException { } + + protected abstract int readNext() throws IOException; + + protected abstract Reader createReader(long maxBytes); + + @Override + public Reader getNextReader() throws IOException { + preScan(); + + boolean isEscaped = false; + boolean inLiteral = false; + boolean inCandidate = false; + boolean found = false; + + long endOffset = start; + int cur; + outside: while(true) { + cur = readNext(); + endOffset++; + + if(cur == -1) { + if(inCandidate){ + found = true; + } + break; + } + + switch(cur) { + case ESCAPE: + isEscaped = !isEscaped; + break; + case LITERAL: + if (!isEscaped) { + inLiteral = !inLiteral; + } + isEscaped = false; + break; + case CLOSE_BRACKET: + case CLOSE_CBRACKET: + inCandidate = !inLiteral; + break; + case OPEN_BRACKET: + case OPEN_CBRACKET: + if(inCandidate){ + found = true; + break outside; + } + break; + + case SPACE: + case TAB: + case NEW_LINE: + case CR: + case FORM_FEED: + break; + + default: + inCandidate = false; + isEscaped = false; + } + } + + if(!found) { + return null; + } + + postScan(); + long maxBytes = endOffset - 1 - start; + start = endOffset; + return createReader(maxBytes); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java index 0cdbf852b..aef8b7562 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java @@ -21,91 +21,37 @@ import java.io.IOException; import java.io.Reader; import java.io.StringReader; -import com.google.common.io.CharStreams; +public class ReaderJSONRecordSplitter extends JsonRecordSplitterBase { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class); -public class ReaderJSONRecordSplitter implements JsonRecordSplitter { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class); - - private static final int OPEN_CBRACKET = '{'; - private static final int OPEN_BRACKET = '['; - private static final int CLOSE_CBRACKET = '}'; - private static final int CLOSE_BRACKET = ']'; - - private static final int SPACE = ' '; - private static final int TAB = '\t'; - private static final int NEW_LINE = '\n'; - private static final int FORM_FEED = '\f'; - private static final int CR = '\r'; - - private long start = 0; private Reader reader; + public ReaderJSONRecordSplitter(String str){ + this(new StringReader(str)); + } + public ReaderJSONRecordSplitter(Reader reader){ this.reader = reader; } - public ReaderJSONRecordSplitter(String str){ - this.reader = new StringReader(str); + @Override + protected void preScan() throws IOException { + reader.mark(MAX_RECORD_SIZE); } @Override - public Reader getNextReader() throws IOException{ - - boolean inCandidate = false; - boolean found = false; - - reader.mark(128*1024); - long endOffset = start; - outside: while(true){ - int c = reader.read(); -// System.out.println(b); - endOffset++; - - if(c == -1){ - if(inCandidate){ - found = true; - } - break; - } - - switch(c){ - case CLOSE_BRACKET: - case CLOSE_CBRACKET: -// System.out.print("c"); - inCandidate = true; - break; - case OPEN_BRACKET: - case OPEN_CBRACKET: -// System.out.print("o"); - if(inCandidate){ - found = true; - break outside; - } - break; - - case SPACE: - case TAB: - case NEW_LINE: - case CR: - case FORM_FEED: -// System.out.print(' '); - break; - - default: -// System.out.print('-'); - inCandidate = false; - } - } + protected void postScan() throws IOException { + reader.reset(); + } - if(found){ - long maxBytes = endOffset - 1 - start; - start = endOffset; - reader.reset(); - return new LimitedReader(reader, (int) maxBytes); - }else{ - return null; - } + @Override + protected int readNext() throws IOException { + return reader.read(); + } + @Override + protected Reader createReader(long maxBytes) { + return new LimitedReader(reader, (int)maxBytes); } private class LimitedReader extends Reader { @@ -128,11 +74,8 @@ public class ReaderJSONRecordSplitter implements JsonRecordSplitter { bytes++; return incoming.read(); } - - } - @Override public void mark(int readAheadLimit) throws IOException { incoming.mark(readAheadLimit); @@ -158,21 +101,6 @@ public class ReaderJSONRecordSplitter implements JsonRecordSplitter { } @Override - public void close() throws IOException { - } - - } - - public static void main(String[] args) throws Exception{ - String str = " { \"b\": \"hello\", \"c\": \"goodbye\", r: []}\n { \"b\": \"yellow\", \"c\": \"red\"}\n "; - JsonRecordSplitter splitter = new ReaderJSONRecordSplitter(new StringReader(str)); - Reader obj = null; - System.out.println(); - - while( (obj = splitter.getNextReader()) != null){ - System.out.println(); - System.out.println(CharStreams.toString(obj)); - System.out.println("===end obj==="); - } + public void close() throws IOException { } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java index e46e1bdc2..ee9401527 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java @@ -19,94 +19,45 @@ package org.apache.drill.exec.vector.complex.fn; import java.io.BufferedInputStream; import java.io.BufferedReader; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; -import com.google.common.io.CharStreams; -public class UTF8JsonRecordSplitter implements JsonRecordSplitter { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UTF8JsonRecordSplitter.class); +public class UTF8JsonRecordSplitter extends JsonRecordSplitterBase { + private final static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UTF8JsonRecordSplitter.class); - private static final int OPEN_CBRACKET = '{'; - private static final int OPEN_BRACKET = '['; - private static final int CLOSE_CBRACKET = '}'; - private static final int CLOSE_BRACKET = ']'; - - private static final int SPACE = ' '; - private static final int TAB = '\t'; - private static final int NEW_LINE = '\n'; - private static final int FORM_FEED = '\f'; - private static final int CR = '\r'; - - private long start = 0; - private InputStream incoming; + private final InputStream incoming; public UTF8JsonRecordSplitter(InputStream incoming){ this.incoming = new BufferedInputStream(incoming); } @Override - public Reader getNextReader() throws IOException{ - - boolean inCandidate = false; - boolean found = false; - - incoming.mark(128*1024); - long endOffset = start; - outside: while(true){ - int b = incoming.read(); -// System.out.println(b); - endOffset++; - - if(b == -1){ - if(inCandidate){ - found = true; - } - break; - } - - switch(b){ - case CLOSE_BRACKET: - case CLOSE_CBRACKET: -// System.out.print("c"); - inCandidate = true; - break; - case OPEN_BRACKET: - case OPEN_CBRACKET: -// System.out.print("o"); - if(inCandidate){ - found = true; - break outside; - } - break; - - case SPACE: - case TAB: - case NEW_LINE: - case CR: - case FORM_FEED: -// System.out.print(' '); - break; + protected void preScan() throws IOException { + incoming.mark(MAX_RECORD_SIZE); + } - default: -// System.out.print('-'); - inCandidate = false; - } - } + @Override + protected void postScan() throws IOException { + incoming.reset(); + } - if(found){ - long maxBytes = endOffset - 1 - start; - start = endOffset; - incoming.reset(); - return new BufferedReader(new InputStreamReader(new DelInputStream(incoming, maxBytes), Charsets.UTF_8)); - }else{ - return null; - } + @Override + protected int readNext() throws IOException { + return incoming.read(); + } + @Override + protected Reader createReader(long maxBytes) { + return new BufferedReader(new InputStreamReader(new DelInputStream(incoming, maxBytes), Charsets.UTF_8)); } private class DelInputStream extends InputStream { @@ -128,23 +79,29 @@ public class UTF8JsonRecordSplitter implements JsonRecordSplitter { bytes++; return incoming.read(); } - - } - } - public static void main(String[] args) throws Exception{ - byte[] str = " { \"b\": \"hello\", \"c\": \"goodbye\", r: []}\n { \"b\": \"yellow\", \"c\": \"red\"}\n ".getBytes(Charsets.UTF_8); - InputStream s = new ByteArrayInputStream(str); + public static void main(String[] args) throws Exception { + String path = "/Users/hgunes/workspaces/mapr/incubator-drill/yelp_academic_dataset_review.json"; + InputStream s = new FileInputStream(new File(path)); JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(s); - Reader obj = null; - System.out.println(); - - while( (obj = splitter.getNextReader()) != null){ - System.out.println(); - System.out.println(CharStreams.toString(obj)); - System.out.println("===end obj==="); + int recordCount = 0; + Reader record = null; + ObjectMapper mapper = new ObjectMapper(); + try { + while ((record = splitter.getNextReader()) != null) { + recordCount++; + JsonNode node = mapper.readTree(record); + out(node.toString()); + } + } catch (Exception e) { + e.printStackTrace(); } + out("last record: " + recordCount); + } + + static void out(Object thing) { + System.out.println(thing); } } |