diff options
8 files changed, 226 insertions, 78 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java index 207d96c32..8a9379e3c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java @@ -23,11 +23,12 @@ import io.netty.util.internal.PlatformDependent; import org.apache.drill.exec.expr.holders.NullableVarCharHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; -import org.apache.drill.exec.memory.BoundsChecking; import org.joda.time.chrono.ISOChronology; import com.google.common.base.Charsets; +import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck; + public class StringFunctionHelpers { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StringFunctionHelpers.class); @@ -205,9 +206,7 @@ public class StringFunctionHelpers { private static final ISOChronology CHRONOLOGY = org.joda.time.chrono.ISOChronology.getInstanceUTC(); public static long getDate(DrillBuf buf, int start, int end){ - if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { - buf.checkBytes(start, end); - } + rangeCheck(buf, start, end); int[] dateFields = memGetDate(buf.memoryAddress(), start, end); return CHRONOLOGY.getDateTimeMillis(dateFields[0], dateFields[1], dateFields[2], 0); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java index 72ab49227..b8aefde01 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java @@ -20,9 +20,7 @@ package org.apache.drill.exec.expr.fn.impl; import io.netty.buffer.DrillBuf; import io.netty.util.internal.PlatformDependent; -import org.apache.drill.exec.memory.BoundsChecking; - -import com.google.common.primitives.UnsignedLongs; +import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck; public final class XXHash extends DrillHash{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(XXHash.class); @@ -166,9 +164,7 @@ public final class XXHash extends DrillHash{ } public static long hash64(long start, long end, DrillBuf buffer, long seed){ - if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { - buffer.checkBytes((int)start, (int)end); - } + rangeCheck(buffer, (int)start, (int)end); long s = buffer.memoryAddress() + start; long e = buffer.memoryAddress() + end; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java index 971bb9b73..8c7aad0c5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import org.apache.drill.exec.memory.BoundsChecking; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Seekable; @@ -34,6 +33,8 @@ import org.apache.hadoop.io.compress.CompressionInputStream; import com.google.common.base.Preconditions; import com.univocity.parsers.common.Format; +import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck; + /** * Class that fronts an InputStream to provide a byte consumption interface. * Also manages only reading lines to and from each split. @@ -301,9 +302,7 @@ final class TextInput { throw StreamFinishedPseudoException.INSTANCE; } - if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { - buffer.checkBytes(bufferPtr - 1, bufferPtr); - } + rangeCheck(buffer, bufferPtr - 1, bufferPtr); byte byteChar = PlatformDependent.getByte(bStartMinus1 + bufferPtr); diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java index eda189ec0..115d31e91 100644 --- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java +++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java @@ -110,47 +110,8 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { return addr + index; } - private final void checkIndexD(int index, int fieldLength) { - ensureAccessible(); - if (fieldLength < 0) { - throw new IllegalArgumentException("length: " + fieldLength + " (expected: >= 0)"); - } - if (index < 0 || index > capacity() - fieldLength) { - if (BaseAllocator.DEBUG) { - historicalLog.logHistory(logger); - } - throw new IndexOutOfBoundsException(String.format( - "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity())); - } - } - - /** - * Allows a function to determine whether not reading a particular string of bytes is valid. - * - * Will throw an exception if the memory is not readable for some reason. Only doesn't something in the case that - * AssertionUtil.BOUNDS_CHECKING_ENABLED is true. - * - * @param start - * The starting position of the bytes to be read. - * @param end - * The exclusive endpoint of the bytes to be read. - */ - public void checkBytes(int start, int end) { - if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { - checkIndexD(start, end - start); - } - } - private void chk(int index, int width) { - if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { - checkIndexD(index, width); - } - } - - private void ensure(int width) { - if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { - ensureWritable(width); - } + BoundsChecking.lengthCheck(this, index, width); } /** @@ -581,7 +542,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { @Override public ByteBuf writeShort(int value) { - ensure(2); + BoundsChecking.ensureWritable(this, 2); PlatformDependent.putShort(addr(writerIndex), (short) value); writerIndex += 2; return this; @@ -589,7 +550,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { @Override public ByteBuf writeInt(int value) { - ensure(4); + BoundsChecking.ensureWritable(this, 4); PlatformDependent.putInt(addr(writerIndex), value); writerIndex += 4; return this; @@ -597,7 +558,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { @Override public ByteBuf writeLong(long value) { - ensure(8); + BoundsChecking.ensureWritable(this, 8); PlatformDependent.putLong(addr(writerIndex), value); writerIndex += 8; return this; @@ -605,7 +566,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { @Override public ByteBuf writeChar(int value) { - ensure(2); + BoundsChecking.ensureWritable(this, 2); PlatformDependent.putShort(addr(writerIndex), (short) value); writerIndex += 2; return this; @@ -613,7 +574,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { @Override public ByteBuf writeFloat(float value) { - ensure(4); + BoundsChecking.ensureWritable(this, 4); PlatformDependent.putInt(addr(writerIndex), Float.floatToRawIntBits(value)); writerIndex += 4; return this; @@ -621,7 +582,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable { @Override public ByteBuf writeDouble(double value) { - ensure(8); + BoundsChecking.ensureWritable(this, 8); PlatformDependent.putLong(addr(writerIndex), Double.doubleToRawLongBits(value)); writerIndex += 8; return this; diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BoundsChecking.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BoundsChecking.java index bc611826a..17817722c 100644 --- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BoundsChecking.java +++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BoundsChecking.java @@ -17,19 +17,92 @@ */ package org.apache.drill.exec.memory; +import java.lang.reflect.Field; +import java.util.Formatter; + +import com.google.common.base.Preconditions; + +import io.netty.buffer.AbstractByteBuf; +import io.netty.buffer.DrillBuf; +import io.netty.util.IllegalReferenceCountException; + +import static org.apache.drill.exec.util.SystemPropertyUtil.getBoolean; + public class BoundsChecking { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BoundsChecking.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BoundsChecking.class); - public static final boolean BOUNDS_CHECKING_ENABLED; + public static final String ENABLE_UNSAFE_BOUNDS_CHECK_PROPERTY = "drill.exec.memory.enable_unsafe_bounds_check"; + // for backward compatibility check "drill.enable_unsafe_memory_access" property and enable bounds checking when + // unsafe memory access is explicitly disabled + public static final String ENABLE_UNSAFE_MEMORY_ACCESS_PROPERTY = "drill.enable_unsafe_memory_access"; + public static final boolean BOUNDS_CHECKING_ENABLED = + getBoolean(ENABLE_UNSAFE_BOUNDS_CHECK_PROPERTY, !getBoolean(ENABLE_UNSAFE_MEMORY_ACCESS_PROPERTY, true)); + private static final boolean checkAccessible = getStaticBooleanField(AbstractByteBuf.class, "checkAccessible", false); static { - boolean isAssertEnabled = false; - assert isAssertEnabled = true; - BOUNDS_CHECKING_ENABLED = isAssertEnabled - || !"true".equals(System.getProperty("drill.enable_unsafe_memory_access")); + if (BOUNDS_CHECKING_ENABLED) { + logger.warn("Drill is running with direct memory bounds checking enabled. If this is a production system, disable it."); + } else if (logger.isDebugEnabled()) { + logger.debug("Direct memory bounds checking is disabled."); + } } private BoundsChecking() { } + private static boolean getStaticBooleanField(Class cls, String name, boolean def) { + try { + Field field = cls.getDeclaredField(name); + field.setAccessible(true); + return field.getBoolean(null); + } catch (ReflectiveOperationException e) { + return def; + } + } + + private static void checkIndex(DrillBuf buf, int index, int fieldLength) { + Preconditions.checkNotNull(buf); + if (checkAccessible && buf.refCnt() == 0) { + Formatter formatter = new Formatter().format("%s, refCnt: 0", buf); + if (BaseAllocator.DEBUG) { + formatter.format("%n%s", buf.toVerboseString()); + } + throw new IllegalReferenceCountException(formatter.toString()); + } + if (fieldLength < 0) { + throw new IllegalArgumentException(String.format("length: %d (expected: >= 0)", fieldLength)); + } + if (index < 0 || index > buf.capacity() - fieldLength) { + Formatter formatter = new Formatter().format("%s, index: %d, length: %d (expected: range(0, %d))", buf, index, fieldLength, buf.capacity()); + if (BaseAllocator.DEBUG) { + formatter.format("%n%s", buf.toVerboseString()); + } + throw new IndexOutOfBoundsException(formatter.toString()); + } + } + + public static void lengthCheck(DrillBuf buf, int start, int length) { + if (BOUNDS_CHECKING_ENABLED) { + checkIndex(buf, start, length); + } + } + + public static void rangeCheck(DrillBuf buf, int start, int end) { + if (BOUNDS_CHECKING_ENABLED) { + checkIndex(buf, start, end - start); + } + } + + public static void rangeCheck(DrillBuf buf1, int start1, int end1, DrillBuf buf2, int start2, int end2) { + if (BOUNDS_CHECKING_ENABLED) { + checkIndex(buf1, start1, end1 - start1); + checkIndex(buf2, start2, end2 - start2); + } + } + + public static void ensureWritable(DrillBuf buf, int minWritableBytes) { + if (BOUNDS_CHECKING_ENABLED) { + buf.ensureWritable(minWritableBytes); + } + } } diff --git a/exec/memory/base/src/test/java/org/apache/drill/exec/memory/BoundsCheckingTest.java b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/BoundsCheckingTest.java new file mode 100644 index 000000000..6213e7197 --- /dev/null +++ b/exec/memory/base/src/test/java/org/apache/drill/exec/memory/BoundsCheckingTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.drill.exec.memory; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.DrillBuf; +import io.netty.util.IllegalReferenceCountException; + +import static org.junit.Assert.fail; + +public class BoundsCheckingTest +{ + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BoundsCheckingTest.class); + + private static boolean old; + + private RootAllocator allocator; + + private static boolean setBoundsChecking(boolean enabled) throws Exception + { + Field field = BoundsChecking.class.getDeclaredField("BOUNDS_CHECKING_ENABLED"); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + boolean old = field.getBoolean(null); + field.set(null, enabled); + return old; + } + + @BeforeClass + public static void setBoundsCheckingEnabled() throws Exception + { + old = setBoundsChecking(true); + } + + @AfterClass + public static void restoreBoundsChecking() throws Exception + { + setBoundsChecking(old); + } + + @Before + public void setupAllocator() + { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void closeAllocator() + { + allocator.close(); + } + + @Test + public void testLengthCheck() + { + try { + BoundsChecking.lengthCheck(null, 0, 0); + fail("expecting NullPointerException"); + } catch (NullPointerException e) { + logger.debug("", e); + } + + try (DrillBuf buffer = allocator.buffer(1)) { + try { + BoundsChecking.lengthCheck(buffer, 0, -1); + fail("expecting IllegalArgumentException"); + } catch (IllegalArgumentException e) { + logger.debug("", e); + } + BoundsChecking.lengthCheck(buffer, 0, 0); + BoundsChecking.lengthCheck(buffer, 0, 1); + try { + BoundsChecking.lengthCheck(buffer, 0, 2); + fail("expecting IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException e) { + logger.debug("", e); + } + try { + BoundsChecking.lengthCheck(buffer, 2, 0); + fail("expecting IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException e) { + logger.debug("", e); + } + try { + BoundsChecking.lengthCheck(buffer, -1, 0); + fail("expecting IndexOutOfBoundsException"); + } catch (IndexOutOfBoundsException e) { + logger.debug("", e); + } + } + + DrillBuf buffer = allocator.buffer(1); + buffer.release(); + try { + BoundsChecking.lengthCheck(buffer, 0, 0); + fail("expecting IllegalReferenceCountException"); + } catch (IllegalReferenceCountException e) { + logger.debug("", e); + } + } +} diff --git a/exec/vector/src/main/java/org/apache/drill/exec/expr/fn/impl/ByteFunctionHelpers.java b/exec/vector/src/main/java/org/apache/drill/exec/expr/fn/impl/ByteFunctionHelpers.java index bd06ba5c7..0ecd9d19c 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/expr/fn/impl/ByteFunctionHelpers.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/expr/fn/impl/ByteFunctionHelpers.java @@ -21,11 +21,12 @@ package org.apache.drill.exec.expr.fn.impl; import io.netty.buffer.DrillBuf; import io.netty.util.internal.PlatformDependent; -import org.apache.drill.exec.memory.BoundsChecking; import org.apache.drill.exec.util.DecimalUtility; import com.google.common.primitives.UnsignedLongs; +import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck; + public class ByteFunctionHelpers { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ByteFunctionHelpers.class); @@ -41,10 +42,7 @@ public class ByteFunctionHelpers { * @return 1 if left input is greater, -1 if left input is smaller, 0 otherwise */ public static final int equal(final DrillBuf left, int lStart, int lEnd, final DrillBuf right, int rStart, int rEnd){ - if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { - left.checkBytes(lStart, lEnd); - right.checkBytes(rStart, rEnd); - } + rangeCheck(left, lStart, lEnd, right, rStart, rEnd); return memEqual(left.memoryAddress(), lStart, lEnd, right.memoryAddress(), rStart, rEnd); } @@ -95,10 +93,7 @@ public class ByteFunctionHelpers { * @return 1 if left input is greater, -1 if left input is smaller, 0 otherwise */ public static final int compare(final DrillBuf left, int lStart, int lEnd, final DrillBuf right, int rStart, int rEnd){ - if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { - left.checkBytes(lStart, lEnd); - right.checkBytes(rStart, rEnd); - } + rangeCheck(left, lStart, lEnd, right, rStart, rEnd); return memcmp(left.memoryAddress(), lStart, lEnd, right.memoryAddress(), rStart, rEnd); } @@ -150,9 +145,7 @@ public class ByteFunctionHelpers { * @return 1 if left input is greater, -1 if left input is smaller, 0 otherwise */ public static final int compare(final DrillBuf left, int lStart, int lEnd, final byte[] right, int rStart, final int rEnd) { - if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { - left.checkBytes(lStart, lEnd); - } + rangeCheck(left, lStart, lEnd); return memcmp(left.memoryAddress(), lStart, lEnd, right, rStart, rEnd); } @@ -462,6 +462,7 @@ </dependencies> <configuration> <argLine>-Xms512m -Xmx${memoryMb}m -Ddrill.exec.http.enabled=false + -Ddrill.exec.memory.enable_unsafe_bounds_check=true -Ddrill.exec.sys.store.provider.local.write=false -Dorg.apache.drill.exec.server.Drillbit.system_options="org.apache.drill.exec.compile.ClassTransformer.scalar_replacement=on" -Ddrill.test.query.printing.silent=true |