author     Steven Phillips <sphillips@maprtech.com>    2013-10-21 23:03:49 -0700
committer  Steven Phillips <sphillips@maprtech.com>    2013-10-30 15:31:37 -0700
commit     fe94aa8147beb8c67fca5a184748b151c2b4b7ba (patch)
tree       aafffaeee1b1adebe36a34591fbce6607129659a /exec/java-exec
parent     5ca503c141f76d8c01c89d0e3a58e1c117ef051f (diff)
DRILL-230: Addressing comments in code review, abstract out references to HazelCache and add comments
Diffstat (limited to 'exec/java-exec')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/Counter.java  25
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java  6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java  28
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java  26
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java  31
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java  63
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java  31
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java  91
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java  146
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java  108
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorWrap.java  106
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java  26
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java  26
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java  3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java  242
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SortContainerBuilder.java  101
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java  1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java  1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java  38
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java  5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java  21
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java  4
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java  30
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java  14
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java)  3
-rw-r--r--  exec/java-exec/src/test/resources/sender/ordered_exchange.json  4
29 files changed, 805 insertions, 381 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/Counter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/Counter.java
new file mode 100644
index 000000000..4568878f6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/Counter.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+public interface Counter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Counter.class);
+ public long get();
+ public long incrementAndGet();
+ public long decrementAndGet();
+}
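
[Editor's note: for illustration, a minimal sketch of how a Counter might be exercised against the LocalCache implementation added later in this patch; the counter name "demo" and the harness class are hypothetical.]

import org.apache.drill.exec.cache.Counter;
import org.apache.drill.exec.cache.LocalCache;

public class CounterSketch {
  public static void main(String[] args) throws Exception {
    LocalCache cache = new LocalCache();
    cache.run();                           // initializes the backing concurrent maps
    Counter c = cache.getCounter("demo");  // repeated calls with one name share a counter
    c.incrementAndGet();                   // 1
    c.incrementAndGet();                   // 2
    c.decrementAndGet();                   // 1
    System.out.println(c.get());           // prints 1
    cache.close();
  }
}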
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
index d1b0e89ad..aed6cc235 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -18,13 +18,10 @@
package org.apache.drill.exec.cache;
import java.io.Closeable;
-import java.util.List;
import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
-import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
public interface DistributedCache extends Closeable{
@@ -37,4 +34,7 @@ public interface DistributedCache extends Closeable{
public PlanFragment getFragment(FragmentHandle handle);
public void storeFragment(PlanFragment fragment);
+ public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz);
+ public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz);
+ public Counter getCounter(String name);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
new file mode 100644
index 000000000..b7595f91a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.util.concurrent.TimeUnit;
+
+public interface DistributedMap<V> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMap.class);
+ public DrillSerializable get(String key);
+ public void put(String key, DrillSerializable value);
+ public void putIfAbsent(String key, DrillSerializable value);
+ public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeUnit);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
new file mode 100644
index 000000000..886f122d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.util.Collection;
+
+public interface DistributedMultiMap<V> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMultiMap.class);
+ public Collection<DrillSerializable> get(String key);
+ public void put(String key, DrillSerializable value);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
new file mode 100644
index 000000000..534d78106
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Classes that can be put in the Distributed Cache must implement this interface.
+ */
+public interface DrillSerializable {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSerializable.class);
+ public void read(DataInput arg0) throws IOException;
+ public void write (DataOutput arg0) throws IOException;
+}
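
[Editor's note: a sketch of what implementing this interface looks like, and of how such a value flows through the DistributedMap interface above. The IntHolder class is hypothetical; the no-arg constructor matters because LocalCache rebuilds values reflectively on read.]

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.drill.exec.cache.DistributedMap;
import org.apache.drill.exec.cache.DrillSerializable;
import org.apache.drill.exec.cache.LocalCache;

public class IntHolderSketch {
  // Hypothetical value class; anything cached must implement DrillSerializable.
  public static class IntHolder implements DrillSerializable {
    int value;
    public IntHolder() {}                  // required for reflective deserialization
    public IntHolder(int value) { this.value = value; }
    @Override public void read(DataInput in) throws IOException { value = in.readInt(); }
    @Override public void write(DataOutput out) throws IOException { out.writeInt(value); }
  }

  public static void main(String[] args) throws Exception {
    LocalCache cache = new LocalCache();
    cache.run();
    DistributedMap<IntHolder> map = cache.getMap(IntHolder.class);
    map.putIfAbsent("k", new IntHolder(1));
    map.putIfAbsent("k", new IntHolder(2));  // no-op: "k" is already present
    IntHolder back = (IntHolder) map.get("k");
    System.out.println(back.value);          // prints 1
    cache.close();
  }
}

The first-write-wins putIfAbsent semantics shown here are what OrderedPartitionRecordBatch relies on below to pick a single partition table among racing fragments.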
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
new file mode 100644
index 000000000..3f2c41c5e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.hazelcast.nio.DataSerializable;
+
+import java.io.*;
+
+/**
+ * Wraps a DrillSerializable object. Objects of this class can be put in the Hazelcast implementation of the distributed cache.
+ */
+public abstract class HCDrillSerializableWrapper implements DataSerializable {
+
+ private DrillSerializable obj;
+
+ public HCDrillSerializableWrapper() {}
+
+ public HCDrillSerializableWrapper(DrillSerializable obj) {
+ this.obj = obj;
+ }
+
+ public void readData(DataInput in) throws IOException {
+ obj.read(in);
+ }
+
+ public void writeData(DataOutput out) throws IOException {
+ obj.write(out);
+ }
+
+ public DrillSerializable get() {
+ return obj;
+ }
+
+ /**
+ * Gets a class-specific implementation of HCDrillSerializableWrapper. Class-specific implementations
+ * are necessary because Hazelcast requires objects to have no-argument constructors.
+ * @param value
+ * @param clazz
+ * @return
+ */
+ public static HCDrillSerializableWrapper getWrapper(DrillSerializable value, Class clazz) {
+ if (clazz.equals(VectorContainerSerializable.class)) {
+ return new HCSerializableWrapperClasses.HCVectorListSerializable(value);
+ } else {
+ throw new UnsupportedOperationException("HCDrillSerializableWrapper not implemented for " + clazz);
+ }
+ }
+}
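
[Editor's note: a small illustrative sketch of the wrapper in use. Hazelcast serializes via writeData and, on the receiving node, rebuilds the object through the subclass's no-arg constructor followed by readData, which is why getWrapper must return a concrete subclass rather than this abstract class. Note that constructing a VectorContainerSerializable spins up a BootStrapContext.]

import org.apache.drill.exec.cache.DrillSerializable;
import org.apache.drill.exec.cache.HCDrillSerializableWrapper;
import org.apache.drill.exec.cache.VectorContainerSerializable;

public class WrapperSketch {
  public static void main(String[] args) {
    DrillSerializable value = new VectorContainerSerializable();
    // Currently returns an HCSerializableWrapperClasses.HCVectorListSerializable,
    // the only wrapping implemented so far.
    HCDrillSerializableWrapper wrapped =
        HCDrillSerializableWrapper.getWrapper(value, VectorContainerSerializable.class);
    System.out.println(wrapped.get() == value);  // prints true
  }
}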
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
new file mode 100644
index 000000000..d22723ab0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+public class HCSerializableWrapperClasses {
+ public static class HCVectorListSerializable extends HCDrillSerializableWrapper {
+
+ public HCVectorListSerializable() {
+ super(new VectorContainerSerializable());
+ }
+
+ public HCVectorListSerializable(DrillSerializable obj) {
+ super(obj);
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index 41eca9b26..577dfebb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -18,22 +18,22 @@
package org.apache.drill.exec.cache;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Lists;
import com.hazelcast.core.*;
-import com.hazelcast.nio.DataSerializable;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus;
import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.protobuf.InvalidProtocolBufferException;
import com.hazelcast.config.Config;
public class HazelCache implements DistributedCache {
@@ -108,17 +108,88 @@ public class HazelCache implements DistributedCache {
}
- public <K,V extends DataSerializable> MultiMap<K,V> getMultiMap(String name) {
- return this.instance.getMultiMap(name);
+ @Override
+ public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
+ return new HCDistributedMultiMapImpl(this.instance.getMultiMap(clazz.toString()), clazz);
}
- public <K,V extends DataSerializable> IMap<K,V> getMap(String name) {
- return this.instance.getMap(name);
+ @Override
+ public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
+ return new HCDistributedMapImpl(this.instance.getMap(clazz.toString()), clazz);
}
- public AtomicNumber getAtomicNumber(String name) {
- return this.instance.getAtomicNumber(name);
+ @Override
+ public Counter getCounter(String name) {
+ return new HCCounterImpl(this.instance.getAtomicNumber(name));
+ }
+
+ public static class HCDistributedMapImpl<V> implements DistributedMap<V> {
+ private IMap<String, HCDrillSerializableWrapper> m;
+ private Class<V> clazz;
+
+ public HCDistributedMapImpl(IMap m, Class<V> clazz) {
+ this.m = m;
+ this.clazz = clazz;
+ }
+
+ public DrillSerializable get(String key) {
+ return m.get(key).get();
+ }
+
+ public void put(String key, DrillSerializable value) {
+ m.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+ }
+
+ public void putIfAbsent(String key, DrillSerializable value) {
+ m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+ }
+
+ public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeunit) {
+ m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz), ttl, timeunit);
+ }
+ }
+
+ public static class HCDistributedMultiMapImpl<V> implements DistributedMultiMap<V> {
+ private com.hazelcast.core.MultiMap<String, HCDrillSerializableWrapper> mmap;
+ private Class<V> clazz;
+
+ public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap mmap, Class<V> clazz) {
+ this.mmap = mmap;
+ this.clazz = clazz;
+ }
+
+ public Collection<DrillSerializable> get(String key) {
+ List<DrillSerializable> list = Lists.newArrayList();
+ for (HCDrillSerializableWrapper v : mmap.get(key)) {
+ list.add(v.get());
+ }
+ return list;
+ }
+
+ @Override
+ public void put(String key, DrillSerializable value) {
+ mmap.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+ }
+ }
+
+ public static class HCCounterImpl implements Counter {
+ private AtomicNumber n;
+
+ public HCCounterImpl(AtomicNumber n) {
+ this.n = n;
+ }
+
+ public long get() {
+ return n.get();
+ }
+
+ public long incrementAndGet() {
+ return n.incrementAndGet();
+ }
+
+ public long decrementAndGet() {
+ return n.decrementAndGet();
+ }
}
-
}
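
[Editor's note: the net effect of this change is that operator code can be written purely against DistributedCache, with HazelCache and LocalCache interchangeable behind it. A sketch under that assumption:]

import org.apache.drill.exec.cache.*;

public class CacheFacadeSketch {
  public static void main(String[] args) throws Exception {
    // Swapping in "new HazelCache(DrillConfig.create())" here should behave the same.
    DistributedCache cache = new LocalCache();
    cache.run();
    DistributedMap<VectorContainerSerializable> table =
        cache.getMap(VectorContainerSerializable.class);
    DistributedMultiMap<VectorContainerSerializable> samples =
        cache.getMultiMap(VectorContainerSerializable.class);
    Counter done = cache.getCounter("fragments-done");
    done.incrementAndGet();
    cache.close();
  }
}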
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index 79675c3a6..7ad6ec687 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -17,9 +17,20 @@
*/
package org.apache.drill.exec.cache;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
@@ -30,6 +41,9 @@ public class LocalCache implements DistributedCache {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
private volatile Map<FragmentHandle, PlanFragment> handles;
+ private volatile ConcurrentMap<Class, DistributedMap> maps;
+ private volatile ConcurrentMap<Class, DistributedMultiMap> multiMaps;
+ private volatile ConcurrentMap<String, Counter> counters;
@Override
public void close() throws IOException {
@@ -39,6 +53,9 @@ public class LocalCache implements DistributedCache {
@Override
public void run() throws DrillbitStartupException {
handles = Maps.newConcurrentMap();
+ maps = Maps.newConcurrentMap();
+ multiMaps = Maps.newConcurrentMap();
+ counters = Maps.newConcurrentMap();
}
@Override
@@ -53,5 +70,132 @@ public class LocalCache implements DistributedCache {
handles.put(fragment.getHandle(), fragment);
}
-
+ @Override
+ public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
+ DistributedMultiMap mmap = multiMaps.get(clazz);
+ if (mmap == null) {
+ multiMaps.putIfAbsent(clazz, new LocalDistributedMultiMapImpl<V>(clazz));
+ return multiMaps.get(clazz);
+ } else {
+ return mmap;
+ }
+ }
+
+ @Override
+ public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
+ DistributedMap m = maps.get(clazz);
+ if (m == null) {
+ maps.putIfAbsent(clazz, new LocalDistributedMapImpl(clazz));
+ return maps.get(clazz);
+ } else {
+ return m;
+ }
+ }
+
+ @Override
+ public Counter getCounter(String name) {
+ Counter c = counters.get(name);
+ if (c == null) {
+ counters.putIfAbsent(name, new LocalCounterImpl());
+ return counters.get(name);
+ } else {
+ return c;
+ }
+ }
+
+ public static ByteArrayDataOutput serialize(DrillSerializable obj) {
+ ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ try {
+ obj.write(out);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return out;
+ }
+
+ public static DrillSerializable deserialize(byte[] bytes, Class clazz) {
+ ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
+ try {
+ DrillSerializable obj = (DrillSerializable)clazz.newInstance();
+ obj.read(in);
+ return obj;
+ } catch (InstantiationException | IllegalAccessException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class LocalDistributedMultiMapImpl<V> implements DistributedMultiMap<V> {
+ private ArrayListMultimap<String, ByteArrayDataOutput> mmap;
+ private Class<DrillSerializable> clazz;
+
+ public LocalDistributedMultiMapImpl(Class clazz) {
+ mmap = ArrayListMultimap.create();
+ this.clazz = clazz;
+ }
+
+ @Override
+ public Collection<DrillSerializable> get(String key) {
+ List<DrillSerializable> list = Lists.newArrayList();
+ for (ByteArrayDataOutput o : mmap.get(key)) {
+ list.add(deserialize(o.toByteArray(), this.clazz));
+ }
+ return list;
+ }
+
+ @Override
+ public void put(String key, DrillSerializable value) {
+ mmap.put(key, serialize(value));
+ }
+ }
+
+ public static class LocalDistributedMapImpl<V> implements DistributedMap<V> {
+ private ConcurrentMap<String, ByteArrayDataOutput> m;
+ private Class<DrillSerializable> clazz;
+
+ public LocalDistributedMapImpl(Class clazz) {
+ m = Maps.newConcurrentMap();
+ this.clazz = clazz;
+ }
+
+ @Override
+ public DrillSerializable get(String key) {
+ if (m.get(key) == null) return null;
+ return deserialize(m.get(key).toByteArray(), this.clazz);
+ }
+
+ @Override
+ public void put(String key, DrillSerializable value) {
+ m.put(key, serialize(value));
+ }
+
+ @Override
+ public void putIfAbsent(String key, DrillSerializable value) {
+ m.putIfAbsent(key, serialize(value));
+ }
+
+ @Override
+ public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeUnit) {
+ m.putIfAbsent(key, serialize(value));
+ logger.warn("Expiration not implemented in local map cache");
+ }
+ }
+
+ public static class LocalCounterImpl implements Counter {
+ private AtomicLong al = new AtomicLong();
+
+ @Override
+ public long get() {
+ return al.get();
+ }
+
+ @Override
+ public long incrementAndGet() {
+ return al.incrementAndGet();
+ }
+
+ @Override
+ public long decrementAndGet() {
+ return al.decrementAndGet();
+ }
+ }
}
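
[Editor's note: the static serialize/deserialize helpers above give a byte-array round trip for any DrillSerializable with a no-arg constructor. A sketch with a throwaway Tag class, same shape as the IntHolder sketch earlier:]

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.drill.exec.cache.DrillSerializable;
import org.apache.drill.exec.cache.LocalCache;

public class SerdeSketch {
  public static class Tag implements DrillSerializable {  // hypothetical value class
    String s = "";
    public Tag() {}                                       // needed by deserialize()
    public Tag(String s) { this.s = s; }
    @Override public void read(DataInput in) throws IOException { s = in.readUTF(); }
    @Override public void write(DataOutput out) throws IOException { out.writeUTF(s); }
  }

  public static void main(String[] args) {
    byte[] bytes = LocalCache.serialize(new Tag("partition-table")).toByteArray();
    Tag copy = (Tag) LocalCache.deserialize(bytes, Tag.class);
    System.out.println(copy.s);  // prints "partition-table"
  }
}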
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
new file mode 100644
index 000000000..1e6eeacec
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+
+import java.io.*;
+import java.util.List;
+
+public class VectorContainerSerializable implements DrillSerializable {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerSerializable.class);
+
+// List<ValueVector> vectorList;
+ private VectorContainer container;
+ private BootStrapContext context;
+ private int listSize = 0;
+ private int recordCount = -1;
+
+ /**
+ * @param container the VectorContainer whose vectors will be serialized
+ */
+ public VectorContainerSerializable(VectorContainer container){
+ this.container = container;
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ this.context = new BootStrapContext(DrillConfig.create());
+ this.listSize = container.getNumberOfColumns();
+ }
+
+ public VectorContainerSerializable() {
+ this.container = new VectorContainer();
+ this.context = new BootStrapContext(DrillConfig.create());
+ }
+
+ @Override
+ public void read(DataInput input) throws IOException {
+ List<ValueVector> vectorList = Lists.newArrayList();
+ int size = input.readInt();
+ InputStream stream = DataInputInputStream.constructInputStream(input);
+ for (int i = 0; i < size; i++) {
+ FieldMetadata metaData = FieldMetadata.parseDelimitedFrom(stream);
+ if (recordCount == -1) recordCount = metaData.getValueCount();
+ int dataLength = metaData.getBufferLength();
+ byte[] bytes = new byte[dataLength];
+ input.readFully(bytes);
+ MaterializedField field = MaterializedField.create(metaData.getDef());
+ ByteBuf buf = context.getAllocator().buffer(dataLength);
+ buf.setBytes(0, bytes);
+ ValueVector vector = TypeHelper.getNewVector(field, context.getAllocator());
+ vector.load(metaData, buf);
+ vectorList.add((BaseDataValueVector) vector);
+ }
+ container.addCollection(vectorList);
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ container.setRecordCount(recordCount);
+ this.listSize = vectorList.size();
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+// int size = vectorList.size();
+ output.writeInt(listSize);
+ for (VectorWrapper w : container) {
+ OutputStream stream = DataOutputOutputStream.constructOutputStream(output);
+ ValueVector vector = w.getValueVector();
+ if (recordCount == -1) container.setRecordCount(vector.getMetadata().getValueCount());
+ vector.getMetadata().writeDelimitedTo(stream);
+ ByteBuf[] data = vector.getBuffers();
+ for (ByteBuf bb : data) {
+ byte[] bytes = new byte[bb.readableBytes()];
+ bb.getBytes(0, bytes);
+ stream.write(bytes);
+ }
+ }
+ }
+
+ public VectorContainer get() {
+ return container;
+ }
+}
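
[Editor's note: write()/read() above define a simple framing — an int vector count, then per vector a varint-delimited FieldMetadata protobuf followed by that vector's raw buffer bytes. A sketch that round-trips an empty container through plain java.io streams, assuming an empty container survives the trip; each no-arg construction creates its own BootStrapContext.]

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.drill.exec.cache.VectorContainerSerializable;

public class ContainerSerdeSketch {
  public static void main(String[] args) throws IOException {
    VectorContainerSerializable original = new VectorContainerSerializable();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bos));  // writes vector count 0, nothing else

    VectorContainerSerializable copy = new VectorContainerSerializable();
    copy.read(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    System.out.println(copy.get().getNumberOfColumns());  // prints 0
  }
}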
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorWrap.java
deleted file mode 100644
index 4811ea57a..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorWrap.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.beust.jcommander.internal.Lists;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-import com.hazelcast.nio.DataSerializable;
-import io.netty.buffer.ByteBuf;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-public class VectorWrap implements DataSerializable{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorWrap.class);
-
- List<ValueVector> vectorList;
- BootStrapContext context;
- final Parser<FieldMetadata> parser = FieldMetadata.PARSER;
-
- public VectorWrap(List<ValueVector> vectorList){
- this.vectorList = vectorList;
- this.context = new BootStrapContext(DrillConfig.create());
- }
-
- public VectorWrap() {
- this.vectorList = Lists.newArrayList();
- this.context = new BootStrapContext(DrillConfig.create());
- }
-
- @Override
- public void readData(DataInput arg0) throws IOException {
- int size = arg0.readInt();
- for (int i = 0; i < size; i++) {
- int metaLength = arg0.readInt();
- byte[] meta = new byte[metaLength];
- arg0.readFully(meta);
- FieldMetadata metaData = parser.parseFrom(meta);
- int dataLength = metaData.getBufferLength();
- byte[] bytes = new byte[dataLength];
- arg0.readFully(bytes);
- MaterializedField field = MaterializedField.create(metaData.getDef());
- ByteBuf buf = context.getAllocator().buffer(dataLength);
- buf.setBytes(0, bytes);
- ValueVector vector = TypeHelper.getNewVector(field, context.getAllocator());
- vector.load(metaData, buf);
- vectorList.add((BaseDataValueVector) vector);
- }
- }
-
- @Override
- public void writeData(DataOutput arg0) throws IOException {
- int size = vectorList.size();
- arg0.writeInt(size);
- for (ValueVector vector : vectorList) {
- byte[] meta = vector.getMetadata().toByteArray();
- int length = vector.getBufferSize();
- byte[] bytes = new byte[length];
- ByteBuf[] data = vector.getBuffers();
- int pos = 0;
- for (ByteBuf bb : data) {
- bb.getBytes(0, bytes, pos, bb.readableBytes());
- pos += bb.readableBytes();
- }
- arg0.writeInt(meta.length);
- arg0.write(meta);
- arg0.write(bytes);
- }
- }
-
- public List<ValueVector> get() {
- return vectorList;
- }
-
- public void set(List<ValueVector> vectorList) {
- this.vectorList = vectorList;
- }
-
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
index 2ae0bb50d..f0aa85bdc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
@@ -38,16 +39,34 @@ public class OrderedPartitionExchange extends AbstractExchange {
private final List<OrderDef> orderings;
private final FieldReference ref;
+ private int recordsToSample = 10000; // How many records must be received before analyzing
+ private int samplingFactor = 10; // Will collect samplingFactor * number of partitions sample records to send to the distributed cache
+ private float completionFactor = .75f; // What fraction of fragments must be completed before attempting to build partition table
//ephemeral for setup tasks.
private List<DrillbitEndpoint> senderLocations;
private List<DrillbitEndpoint> receiverLocations;
@JsonCreator
- public OrderedPartitionExchange(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child) {
+ public OrderedPartitionExchange(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref,
+ @JsonProperty("child") PhysicalOperator child, @JsonProperty("recordsToSample") Integer recordsToSample,
+ @JsonProperty("samplingFactor") Integer samplingFactor, @JsonProperty("completionFactor") Float completionFactor) {
super(child);
this.orderings = orderings;
this.ref = ref;
+ if (recordsToSample != null) {
+ Preconditions.checkArgument(recordsToSample > 0, "recordsToSample must be greater than 0");
+ this.recordsToSample = recordsToSample;
+ }
+ if (samplingFactor != null) {
+ Preconditions.checkArgument(samplingFactor > 0, "samplingFactor must be greater than 0");
+ this.samplingFactor = samplingFactor;
+ }
+ if (completionFactor != null) {
+ Preconditions.checkArgument(completionFactor > 0, "completionFactor must be greater than 0");
+ Preconditions.checkArgument(completionFactor <= 1.0, "completionFactor cannot be greater than 1.0");
+ this.completionFactor = completionFactor;
+ }
}
@Override
@@ -68,7 +87,8 @@ public class OrderedPartitionExchange extends AbstractExchange {
@Override
public Sender getSender(int minorFragmentId, PhysicalOperator child) {
- return new OrderedPartitionSender(orderings, ref, child, receiverLocations, receiverMajorFragmentId, senderLocations.size());
+ return new OrderedPartitionSender(orderings, ref, child, receiverLocations, receiverMajorFragmentId, senderLocations.size(), recordsToSample,
+ samplingFactor, completionFactor);
}
@Override
@@ -78,7 +98,7 @@ public class OrderedPartitionExchange extends AbstractExchange {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new OrderedPartitionExchange(orderings, ref, child);
+ return new OrderedPartitionExchange(orderings, ref, child, recordsToSample, samplingFactor, completionFactor);
}
@Override
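
[Editor's note: to make the new knobs concrete, a worked example of the arithmetic they drive. The defaults come from the fields above; the 4-way partition count and 8-wide sender are hypothetical, and recordsSampled is assumed to land exactly on recordsToSample.]

public class SamplingMath {
  public static void main(String[] args) {
    int recordsToSample = 10000, samplingFactor = 10;  // defaults from above
    float completionFactor = 0.75f;
    int partitions = 4, sendingWidth = 8;              // hypothetical plan shape

    int samplesPerFragment = samplingFactor * partitions;  // 40 records cached per fragment
    int everyNth = recordsToSample / samplesPerFragment;   // sample every 250th record
    // The partition table is built once this many fragments have reported samples.
    long fragmentsNeeded = (long) Math.ceil(sendingWidth * completionFactor);  // 6 of 8

    System.out.printf("%d samples per fragment (every %dth record); need %d of %d fragments%n",
        samplesPerFragment, everyNth, fragmentsNeeded, sendingWidth);
  }
}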
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
index f2d7e5881..48412a0d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
@@ -39,10 +39,15 @@ public class OrderedPartitionSender extends AbstractSender {
private final List<DrillbitEndpoint> endpoints;
private final int sendingWidth;
+ private int recordsToSample;
+ private int samplingFactor;
+ private float completionFactor;
+
@JsonCreator
public OrderedPartitionSender(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child,
@JsonProperty("destinations") List<DrillbitEndpoint> endpoints, @JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId,
- @JsonProperty("sending-fragment-width") int sendingWidth) {
+ @JsonProperty("sending-fragment-width") int sendingWidth, @JsonProperty("recordsToSample") int recordsToSample,
+ @JsonProperty("samplingFactor") int samplingFactor, @JsonProperty("completionFactor") float completionFactor) {
super(oppositeMajorFragmentId, child);
if (orderings == null) {
this.orderings = Lists.newArrayList();
@@ -52,6 +57,9 @@ public class OrderedPartitionSender extends AbstractSender {
this.ref = ref;
this.endpoints = endpoints;
this.sendingWidth = sendingWidth;
+ this.recordsToSample = recordsToSample;
+ this.samplingFactor = samplingFactor;
+ this.completionFactor = completionFactor;
}
public int getSendingWidth() {
@@ -88,6 +96,20 @@ public class OrderedPartitionSender extends AbstractSender {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new OrderedPartitionSender(orderings, ref, child, endpoints, oppositeMajorFragmentId, sendingWidth);
+ return new OrderedPartitionSender(orderings, ref, child, endpoints, oppositeMajorFragmentId, sendingWidth, recordsToSample, samplingFactor,
+ completionFactor);
+ }
+
+ public int getRecordsToSample() {
+ return recordsToSample;
+ }
+
+ public int getSamplingFactor() {
+ return samplingFactor;
+ }
+
+ public float getCompletionFactor() {
+ return completionFactor;
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index 465bb3d75..b63a4d0f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -33,8 +33,6 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.util.BatchPrinter;
-import org.apache.drill.exec.vector.ValueVector;
public class WireRecordBatch implements RecordBatch{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WireRecordBatch.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
index e22450e0d..8563d1c94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
@@ -44,6 +44,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
private SelectionVectorMode svMode;
private RecordBatch outBatch;
private SchemaPath outputField;
+ private IntVector partitionValues;
public OrderedPartitionProjectorTemplate() throws SchemaChangeException{
}
@@ -60,7 +61,6 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
@Override
public final int projectRecords(final int recordCount, int firstOutputIndex) {
final int countN = recordCount;
- IntVector partitionValues = (IntVector) outBatch.getValueAccessorById(outBatch.getValueVectorId(outputField).getFieldId(), IntVector.class).getValueVector();
int counter = 0;
for (int i = 0; i < countN; i++, firstOutputIndex++) {
int partition = getPartition(i);
@@ -80,6 +80,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
this.svMode = incoming.getSchema().getSelectionVectorMode();
this.outBatch = outgoing;
this.outputField = outputField;
+ partitionValues = (IntVector) outBatch.getValueAccessorById(outBatch.getValueVectorId(outputField).getFieldId(), IntVector.class).getValueVector();
switch(svMode){
case FOUR_BYTE:
case TWO_BYTE:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index af0f1ffd2..7dc7d559d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -20,20 +20,14 @@ package org.apache.drill.exec.physical.impl.orderedpartitioner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import com.hazelcast.core.AtomicNumber;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.MultiMap;
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
-import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.expression.*;
-import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.cache.HazelCache;
-import org.apache.drill.exec.cache.VectorWrap;
+import org.apache.drill.exec.cache.*;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -46,7 +40,6 @@ import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
import org.apache.drill.exec.physical.impl.sort.Sorter;
import org.apache.drill.exec.record.*;
import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.util.BatchPrinter;
import org.apache.drill.exec.vector.*;
import java.io.IOException;
@@ -56,20 +49,26 @@ import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+/**
+ * The purpose of this operator is to generate an ordered partition, rather than a random hash partition. This could be used
+ * to do a total order sort, for example.
+ * This operator reads in a few incoming record batches, samples these batches, and stores them in the distributed cache. The samples
+ * from all the parallel-running fragments are merged, and a partition-table is built and stored in the distributed cache for use by all
+ * fragments. A new column is added to the outgoing batch, whose value is determined by where each record falls in the partition table.
+ * This column is used by PartitionSenderRootExec to determine which bucket to assign each record to.
+ */
public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
- public static final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
- public static final MappingSet INCOMING_MAPPING = new MappingSet("inIndex", null, "incoming", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
- public static final MappingSet PARTITION_MAPPING = new MappingSet("partitionIndex", null, "partitionVectors", null,
+ public final MappingSet mainMapping = new MappingSet( (String) null, null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
+ public final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
+ public final MappingSet partitionMapping = new MappingSet("partitionIndex", null, "partitionVectors", null,
CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
private static long MAX_SORT_BYTES = 8l * 1024 * 1024 * 1024;
- private static int RECORDS_TO_SAMPLE = 5000;
- private static int SAMPLING_FACTOR = 10;
- private static float COMPLETION_FACTOR = .75f;
- private static String SAMPLE_MAP_NAME = "sampleMap";
- private static String TABLE_MAP_NAME = "TableMap";
+ private final int recordsToSample; // How many records must be received before analyzing
+ private final int samplingFactor; // Will collect samplingFactor * number of partitions to send to distributed cache
+ private final float completionFactor; // What fraction of fragments must be completed before attempting to build partition table
protected final RecordBatch incoming;
private boolean first = true;
private OrderedPartitionProjector projector;
@@ -83,23 +82,40 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
private boolean startedUnsampledBatches = false;
private boolean upstreamNone = false;
private int recordCount;
+ private DistributedMap<VectorContainerSerializable> tableMap;
+ private DistributedMultiMap mmap;
+ private String mapKey;
public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incoming, FragmentContext context){
super(pop, context);
this.incoming = incoming;
this.partitions = pop.getDestinations().size();
this.sendingMajorFragmentWidth = pop.getSendingWidth();
+ this.recordsToSample = pop.getRecordsToSample();
+ this.samplingFactor = pop.getSamplingFactor();
+ this.completionFactor = pop.getCompletionFactor();
}
+ /**
+ * This method is called when the first batch comes in. Incoming batches are collected until a threshold is met. At that point,
+ * the records in the batches are sorted and sampled, and the sampled records are stored in the distributed cache. Once a sufficient
+ * fraction of the fragments have shared their samples, each fragment grabs all the samples, sorts all the records, builds a partition
+ * table, and attempts to push the partition table to the distributed cache. Whichever table gets pushed first becomes the table used by all
+ * fragments for partitioning.
+ * @return
+ */
private boolean getPartitionVectors() {
VectorContainer sampleContainer = new VectorContainer();
recordsSampled = 0;
IterOutcome upstream;
+
+ // Start collecting batches until recordsToSample records have been collected
+
builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES, sampleContainer);
builder.add(incoming);
recordsSampled += incoming.getRecordCount();
try {
- outer: while (recordsSampled < RECORDS_TO_SAMPLE) {
+ outer: while (recordsSampled < recordsToSample) {
upstream = incoming.next();
switch(upstream) {
case NONE:
@@ -111,93 +127,72 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
builder.add(incoming);
recordsSampled += incoming.getRecordCount();
if (upstream == IterOutcome.NONE) break;
- //TODO handle upstream cases
}
builder.build(context);
+
+ // Sort the records according the orderings given in the configuration
+
Sorter sorter = SortBatch.createNewSorter(context, popConfig.getOrderings(), sampleContainer);
SelectionVector4 sv4 = builder.getSv4();
sorter.setup(context, sv4, sampleContainer);
sorter.sort(sv4, sampleContainer);
+ // Project every Nth record to a new vector container, where N = recordsSampled/(samplingFactor * partitions). Uses the
+ // expressions from the OrderDefs to populate each column. There is one column for each OrderDef in popConfig.orderings.
+
VectorContainer containerToCache = new VectorContainer();
SampleCopier copier = getCopier(sv4, sampleContainer, containerToCache, popConfig.getOrderings());
- copier.copyRecords(recordsSampled/(SAMPLING_FACTOR * partitions), 0, SAMPLING_FACTOR * partitions);
+ copier.copyRecords(recordsSampled/(samplingFactor * partitions), 0, samplingFactor * partitions);
for (VectorWrapper vw : containerToCache) {
vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
}
-// BatchPrinter.printBatch(containerToCache);
-
- HazelCache cache = new HazelCache(DrillConfig.create());
- cache.run();
- String mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
- MultiMap<String, VectorWrap> mmap = cache.getMultiMap(SAMPLE_MAP_NAME);
+ // Get a distributed multimap handle from the distributed cache, wrap the new vector container
+ // in a serializable wrapper object, and add it to the multimap
+ DistributedCache cache = context.getDrillbitContext().getCache();
+ mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
+ mmap = cache.getMultiMap(VectorContainerSerializable.class);
List<ValueVector> vectorList = Lists.newArrayList();
for (VectorWrapper vw : containerToCache) {
vectorList.add(vw.getValueVector());
}
- VectorWrap wrap = new VectorWrap(vectorList);
+ VectorContainerSerializable wrap = new VectorContainerSerializable(containerToCache);
mmap.put(mapKey, wrap);
+ wrap = null;
- AtomicNumber minorFragmentSampleCount = cache.getAtomicNumber(mapKey);
+ Counter minorFragmentSampleCount = cache.getCounter(mapKey);
long val = minorFragmentSampleCount.incrementAndGet();
logger.debug("Incremented mfsc, got {}", val);
-
- for (int i = 0; i < 10 && minorFragmentSampleCount.get() < sendingMajorFragmentWidth * COMPLETION_FACTOR; i++) {
- Thread.sleep(10);
- }
-
- Collection<VectorWrap> allSamplesWrap = mmap.get(mapKey);
- VectorContainer allSamplesContainer = new VectorContainer();
- int orderSize = popConfig.getOrderings().size();
- SortContainerBuilder containerBuilder = new SortContainerBuilder(context.getAllocator(), MAX_SORT_BYTES, allSamplesContainer, orderSize);
- for (VectorWrap w : allSamplesWrap) {
- containerBuilder.add(w.get());
- }
- containerBuilder.build(context);
-
-// BatchPrinter.printHyperBatch(allSamplesContainer);
-
- List<OrderDef> orderDefs = Lists.newArrayList();
- int i = 0;
- for (OrderDef od : popConfig.getOrderings()) {
- SchemaPath sp = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
- orderDefs.add(new OrderDef(od.getDirection(), new FieldReference(sp)));
- }
-
- SelectionVector4 newSv4 = containerBuilder.getSv4();
- Sorter sorter2 = SortBatch.createNewSorter(context, orderDefs, allSamplesContainer);
- sorter2.setup(context, newSv4, allSamplesContainer);
- sorter2.sort(newSv4, allSamplesContainer);
-
- VectorContainer candidatePartitionTable = new VectorContainer();
- SampleCopier copier2 = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs);
- int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
- copier2.copyRecords(skipRecords, skipRecords, partitions - 1);
- assert copier2.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier2.getOutputRecords(), partitions);
- for (VectorWrapper vw : candidatePartitionTable) {
- vw.getValueVector().getMutator().setValueCount(copier2.getOutputRecords());
- }
-
-// BatchPrinter.printBatch(candidatePartitionTable);
-
- vectorList = Lists.newArrayList();
- for (VectorWrapper vw: candidatePartitionTable) {
- vectorList.add(vw.getValueVector());
+ tableMap = cache.getMap(VectorContainerSerializable.class);
+ Preconditions.checkNotNull(tableMap);
+
+ if (val == Math.ceil(sendingMajorFragmentWidth * completionFactor)) {
+ buildTable();
+ wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+ } else if (val < Math.ceil(sendingMajorFragmentWidth * completionFactor)) {
+ // Wait until a sufficient number of fragments have submitted samples; give up after about a second (100 polls at 10 ms) and build the table ourselves
+ for (int i = 0; i < 100 && wrap == null; i++) {
+ Thread.sleep(10);
+ wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+ if (i == 99) {
+ buildTable();
+ wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+ }
+ }
+ } else {
+ wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
}
- wrap = new VectorWrap(vectorList);
- IMap<String, VectorWrap> tableMap = cache.getMap(TABLE_MAP_NAME);
- tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
- wrap = tableMap.get(mapKey + "final");
Preconditions.checkState(wrap != null);
- for (ValueVector vv : wrap.get()) {
- partitionVectors.add(vv);
+
+ // Extract vectors from the wrapper, and add to partition vectors. These vectors will be used for partitioning in the rest of this operator
+ for (VectorWrapper w : wrap.get()) {
+ partitionVectors.add(w.getValueVector());
}
} catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) {
kill();
@@ -208,6 +203,56 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
return true;
}
+ private void buildTable() throws SchemaChangeException, ClassTransformationException, IOException {
+ // Get all samples from distributed map
+ Collection<DrillSerializable> allSamplesWrap = mmap.get(mapKey);
+ VectorContainer allSamplesContainer = new VectorContainer();
+
+ SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES, allSamplesContainer);
+ for (DrillSerializable w : allSamplesWrap) {
+ containerBuilder.add(((VectorContainerSerializable)w).get());
+ }
+ containerBuilder.build(context);
+
+ List<OrderDef> orderDefs = Lists.newArrayList();
+ int i = 0;
+ for (OrderDef od : popConfig.getOrderings()) {
+ SchemaPath sp = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
+ orderDefs.add(new OrderDef(od.getDirection(), new FieldReference(sp)));
+ }
+
+ SelectionVector4 newSv4 = containerBuilder.getSv4();
+ Sorter sorter2 = SortBatch.createNewSorter(context, orderDefs, allSamplesContainer);
+ sorter2.setup(context, newSv4, allSamplesContainer);
+ sorter2.sort(newSv4, allSamplesContainer);
+
+ // Copy every Nth record from the samples into a candidate partition table, where N = totalSampledRecords/partitions
+ // Attempt to push this to the distributed map. Only the first candidate to get pushed will be used.
+ VectorContainer candidatePartitionTable = new VectorContainer();
+ SampleCopier copier2 = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs);
+ int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
+ copier2.copyRecords(skipRecords, skipRecords, partitions - 1);
+ assert copier2.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier2.getOutputRecords(), partitions);
+ for (VectorWrapper vw : candidatePartitionTable) {
+ vw.getValueVector().getMutator().setValueCount(copier2.getOutputRecords());
+ }
+
+ VectorContainerSerializable wrap = new VectorContainerSerializable(candidatePartitionTable);
+
+ tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
+ }
+
+ /**
+ * Creates a copier that projects every Nth record from the incoming VectorContainer into the outgoing VectorContainer. Each OrderDef in orderings
+ * generates a column, and evaluation of the expression associated with each OrderDef determines the value of each column. These records will later be
+ * sorted based on the values in each column, in the same order as the orderings.
+ * @param sv4
+ * @param incoming
+ * @param outgoing
+ * @param orderings
+ * @return
+ * @throws SchemaChangeException
+ */
private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing, List<OrderDef> orderings) throws SchemaChangeException{
List<ValueVector> localAllocationVectors = Lists.newArrayList();
final ErrorCollector collector = new ErrorCollectorImpl();
@@ -232,9 +277,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
logger.debug("Added eval.");
}
for (ValueVector vv : localAllocationVectors) {
- AllocationHelper.allocate(vv, SAMPLING_FACTOR * partitions, 50);
+ AllocationHelper.allocate(vv, samplingFactor * partitions, 50);
}
-// outgoing.addCollection(allocationVectors);
outgoing.buildSchema(BatchSchema.SelectionVectorMode.NONE);
try {
SampleCopier sampleCopier = context.getImplementationClass(cg);
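
As an aside on the buildTable() hunk above: the candidate table is formed by taking every Nth sorted sample, where N = totalSampledRecords/partitions, and is then published with first-writer-wins semantics. A minimal, self-contained sketch of those two steps, using plain arrays and a ConcurrentMap in place of Drill's vectors and distributed cache (all names below are illustrative, not from the patch):

    import java.util.Arrays;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class PartitionTableSketch {
      // Mirrors copier2.copyRecords(skipRecords, skipRecords, partitions - 1):
      // pick partitions-1 boundary keys by taking every Nth sorted sample.
      static int[] pickBoundaries(int[] sortedSamples, int partitions) {
        int skip = sortedSamples.length / partitions;
        int[] boundaries = new int[partitions - 1];
        for (int i = 0; i < partitions - 1; i++) {
          boundaries[i] = sortedSamples[(i + 1) * skip];
        }
        return boundaries;
      }

      public static void main(String[] args) {
        int[] samples = {1, 3, 4, 7, 9, 12, 15, 20, 22};
        int[] candidate = pickBoundaries(samples, 3); // {7, 15}

        // First-writer-wins publication: every fragment proposes a candidate,
        // but only the first putIfAbsent sticks; all fragments then read back
        // the same agreed table.
        ConcurrentMap<String, int[]> tableMap = new ConcurrentHashMap<>();
        tableMap.putIfAbsent("mapKey-final", candidate);
        System.out.println(Arrays.toString(tableMap.get("mapKey-final")));
      }
    }
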
@@ -252,11 +296,17 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
@Override
public IterOutcome next() {
+
+ // If we got IterOutcome.NONE while getting the partition vectors and there are no batches on the queue, then we are done.
if (upstreamNone && (batchQueue == null || batchQueue.size() == 0)) return IterOutcome.NONE;
+
+ // if there are batches on the queue, process them first, rather than calling incoming.next()
if (batchQueue != null && batchQueue.size() > 0) {
VectorContainer vc = batchQueue.poll();
recordCount = vc.getRecordCount();
try{
+
+ // Must set up a new schema each time, because ValueVectors are not reused between containers in the queue.
setupNewSchema(vc);
}catch(SchemaChangeException ex){
kill();
@@ -267,15 +317,22 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
doWork(vc);
return IterOutcome.OK_NEW_SCHEMA;
}
+
+ // At this point, either this is the first iteration, or there are no batches left on the queue and more are incoming.
IterOutcome upstream = incoming.next();
recordCount = incoming.getRecordCount();
+
if(this.first && upstream == IterOutcome.OK) {
- upstream = IterOutcome.OK_NEW_SCHEMA;
+ throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA");
}
+
+ // If this is the first iteration, we need to generate the partition vectors before we can proceed
if(this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
if (!getPartitionVectors()) return IterOutcome.STOP;
batchQueue = new LinkedBlockingQueue<>(builder.getContainers());
first = false;
+
+ // Now that we have the partition vectors, we immediately process the first batch on the queue
VectorContainer vc = batchQueue.poll();
try{
setupNewSchema(vc);
@@ -288,6 +345,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
doWork(vc);
return IterOutcome.OK_NEW_SCHEMA;
}
+
+ // Now that all the batches on the queue have been processed, we begin processing the incoming batches. For the first
+ // one we need to generate a new schema, even if the outcome is IterOutcome.OK. After that we can reuse the schema.
if (this.startedUnsampledBatches == false) {
this.startedUnsampledBatches = true;
if (upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA;
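
The comments in the next() hunk above describe a drain-queue-then-upstream pattern. A simplified, hypothetical model of that control flow (none of these types are Drill's, and Drill's version additionally handles schema changes and the first-batch special case):

    import java.util.ArrayDeque;
    import java.util.Iterator;
    import java.util.Queue;

    class QueueThenUpstream<T> {
      private final Queue<T> queue = new ArrayDeque<>();
      private final Iterator<T> upstream;
      private boolean upstreamDone;

      QueueThenUpstream(Iterator<T> upstream) { this.upstream = upstream; }

      void enqueueSampled(T batch) { queue.add(batch); }

      // Returns the next batch, or null when both the queue and the upstream
      // are exhausted.
      T next() {
        if (upstreamDone && queue.isEmpty()) return null; // done everywhere
        if (!queue.isEmpty()) return queue.poll();        // sampled batches first
        if (upstream.hasNext()) return upstream.next();   // then live batches
        upstreamDone = true;
        return null;
      }
    }
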
@@ -333,16 +393,12 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
}
- /** hack to make ref and full work together... need to figure out if this is still necessary. **/
- private FieldReference getRef(NamedExpression e){
- FieldReference ref = e.getRef();
- PathSegment seg = ref.getRootSegment();
- if(seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())){
- return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition());
- }
- return ref;
- }
-
+ /**
+ * Sets up a projection that transfers all of the columns in the batch, and also populates the partition column based
+ * on which partition of the partition table each record falls into.
+ * @param batch the batch whose columns are transferred and whose records are assigned partitions
+ * @throws SchemaChangeException
+ */
protected void setupNewSchema(VectorAccessible batch) throws SchemaChangeException{
this.allocationVectors = Lists.newArrayList();
container.clear();
@@ -357,20 +413,18 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
container.add(tp.getTo());
}
- cg.setMappingSet(MAIN_MAPPING);
+ cg.setMappingSet(mainMapping);
int count = 0;
for(OrderDef od : popConfig.getOrderings()){
- // first, we rewrite the evaluation stack for each side of the comparison.
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector);
if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
- cg.setMappingSet(INCOMING_MAPPING);
+ cg.setMappingSet(incomingMapping);
CodeGenerator.HoldingContainer left = cg.addExpr(expr, false);
- cg.setMappingSet(PARTITION_MAPPING);
+ cg.setMappingSet(partitionMapping);
CodeGenerator.HoldingContainer right = cg.addExpr(new ValueVectorReadExpression(new TypedFieldId(expr.getMajorType(), count++)), false);
- cg.setMappingSet(MAIN_MAPPING);
+ cg.setMappingSet(mainMapping);
- // next we wrap the two comparison sides and add the expression block for the comparison.
FunctionCall f = new FunctionCall(ComparatorFunctions.COMPARE_TO, ImmutableList.of((LogicalExpression) new HoldingContainerExpression(left), new HoldingContainerExpression(right)), ExpressionPosition.UNKNOWN);
CodeGenerator.HoldingContainer out = cg.addExpr(f, false);
JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SortContainerBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SortContainerBuilder.java
deleted file mode 100644
index d479108fb..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SortContainerBuilder.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.orderedpartitioner;
-
-import com.beust.jcommander.internal.Lists;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class SortContainerBuilder {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortContainerBuilder.class);
-
- private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
- private final VectorContainer container;
-
- private int recordCount;
- private SelectionVector4 sv4;
- final PreAllocator svAllocator;
- private List<List<ValueVector>> hyperVectors;
-
- public SortContainerBuilder(BufferAllocator a, long maxBytes, VectorContainer container, int fields){
- this.svAllocator = a.getPreAllocator();
- this.container = container;
- this.hyperVectors = Lists.newArrayList();
- for (int i = 0; i < fields; i++) {
- List<ValueVector> list = Lists.newArrayList();
- hyperVectors.add(list);
- }
- }
-
- private long getSize(RecordBatch batch){
- long bytes = 0;
- for(VectorWrapper<?> v : batch){
- bytes += v.getValueVector().getBufferSize();
- }
- return bytes;
- }
-
- public boolean add(List<ValueVector> vectors){
- Preconditions.checkArgument(vectors.size() == hyperVectors.size());
- int i = 0;
- for (ValueVector vv : vectors) {
- if (i == 0) recordCount += vv.getMetadata().getValueCount();
- hyperVectors.get(i++).add(vv);
- }
- return true;
- }
-
- public void build(FragmentContext context) throws SchemaChangeException{
- container.clear();
- svAllocator.preAllocate(recordCount * 4);
- sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
-
- // now we're going to generate the sv4 pointers
- int index = 0;
- int recordBatchId = 0;
- for(ValueVector vv : hyperVectors.get(0)){
- for(int i = 0; i < vv.getMetadata().getValueCount(); i++, index++){
- sv4.set(index, recordBatchId, i);
- }
- recordBatchId++;
- }
-
- for (List<ValueVector> hyperVector : hyperVectors) {
- container.addHyperList(hyperVector);
- }
-
- container.buildSchema(SelectionVectorMode.FOUR_BYTE);
- }
-
- public SelectionVector4 getSv4() {
- return sv4;
- }
-
-}
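
For context on the SelectionVector4 logic removed above (its role is now served by SortRecordBatchBuilder): each sv4 entry addresses a record as a (batch id, in-batch index) pair, which is why both are capped at Character.MAX_VALUE. A sketch of the packing this implies; the exact bit layout is an assumption here, not taken from this patch:

    class Sv4Packing {
      // Pack a batch id and an in-batch record index into one 4-byte entry.
      static int entry(int recordBatchId, int recordIndex) {
        assert recordBatchId <= Character.MAX_VALUE;
        assert recordIndex <= Character.MAX_VALUE;
        return (recordBatchId << 16) | (recordIndex & 0xFFFF);
      }

      static int batchOf(int entry)  { return entry >>> 16; }
      static int recordOf(int entry) { return entry & 0xFFFF; }
    }
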
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 9f38c77da..2647ffc1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -39,7 +39,6 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.bit.BitTunnel;
-import org.apache.drill.exec.util.BatchPrinter;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
import org.apache.drill.exec.work.foreman.ErrorHelper;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index e26ea1b0b..19adee7b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -35,7 +35,6 @@ import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.util.BatchPrinter;
import org.apache.drill.exec.vector.ValueVector;
import com.sun.codemodel.JArray;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index 7300d9e9d..e01bf376e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -19,11 +19,8 @@ package org.apache.drill.exec.physical.impl.sort;
import java.util.List;
+import org.apache.drill.exec.record.*;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.ValueVector;
@@ -35,13 +32,17 @@ import com.google.common.collect.Lists;
public class RecordBatchData {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchData.class);
- final List<ValueVector> vectors = Lists.newArrayList();
final SelectionVector2 sv2;
final int recordCount;
- VectorContainer container;
+ VectorContainer container = new VectorContainer();
- public RecordBatchData(RecordBatch batch){
- this.sv2 = batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE ? batch.getSelectionVector2().clone() : null;
+ public RecordBatchData(VectorAccessible batch){
+ List<ValueVector> vectors = Lists.newArrayList();
+ if (batch instanceof RecordBatch && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) {
+ this.sv2 = ((RecordBatch)batch).getSelectionVector2().clone();
+ } else {
+ this.sv2 = null;
+ }
for(VectorWrapper<?> v : batch){
if(v.isHyper()) throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
@@ -49,14 +50,21 @@ public class RecordBatchData {
tp.transfer();
vectors.add(tp.getTo());
}
-
+
+ container.addCollection(vectors);
recordCount = batch.getRecordCount();
+ container.setRecordCount(recordCount);
+ container.buildSchema(batch.getSchema().getSelectionVectorMode());
}
public int getRecordCount(){
return recordCount;
}
public List<ValueVector> getVectors() {
+ List<ValueVector> vectors = Lists.newArrayList();
+ for (VectorWrapper w : container) {
+ vectors.add(w.getValueVector());
+ }
return vectors;
}
@@ -65,16 +73,6 @@ public class RecordBatchData {
}
public VectorContainer getContainer() {
- if (this.container == null) buildContainer();
- return this.container;
- }
-
- private void buildContainer() {
- assert container == null;
- container = new VectorContainer();
- for (ValueVector vv : vectors) {
- container.add(vv);
- }
- container.buildSchema(sv2 == null ? SelectionVectorMode.NONE : SelectionVectorMode.TWO_BYTE);
+ return container;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 4be506586..5478adfed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -158,7 +158,10 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
}
public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException {
- return createNewSorter(context, orderings, batch, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+ final MappingSet mainMapping = new MappingSet( (String) null, null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
+ final MappingSet leftMapping = new MappingSet("leftIndex", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
+ final MappingSet rightMapping = new MappingSet("rightIndex", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
+ return createNewSorter(context, orderings, batch, mainMapping, leftMapping, rightMapping);
}
public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 4024d5791..cb55a01f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -27,12 +27,8 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.*;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.ValueVector;
@@ -57,7 +53,7 @@ public class SortRecordBatchBuilder {
this.container = container;
}
- private long getSize(RecordBatch batch){
+ private long getSize(VectorAccessible batch){
long bytes = 0;
for(VectorWrapper<?> v : batch){
bytes += v.getValueVector().getBufferSize();
@@ -71,12 +67,15 @@ public class SortRecordBatchBuilder {
* @return True if the requested add completed successfully. Returns false in the case that this builder is full and cannot receive additional packages.
* @throws SchemaChangeException
*/
- public boolean add(RecordBatch batch){
+ public boolean add(VectorAccessible batch){
if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
- if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
+ if (batch.getRecordCount() == 0)
+ return true; // skip over empty record batches.
long batchBytes = getSize(batch);
- if (batchBytes == 0) {return true;}
+ if (batchBytes == 0) {
+ return true;
+ }
if(batchBytes + runningBytes > maxBytes) return false; // enough data memory.
if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch.
if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
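
With add() widened from RecordBatch to VectorAccessible, the builder can now collect plain containers as well as record batches. A hedged usage sketch; the loop shape is an assumption, while the individual calls mirror buildTable() earlier in this patch:

    import org.apache.drill.exec.ops.FragmentContext;
    import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
    import org.apache.drill.exec.record.VectorAccessible;
    import org.apache.drill.exec.record.VectorContainer;
    import org.apache.drill.exec.record.selection.SelectionVector4;

    void collect(FragmentContext context, Iterable<VectorAccessible> batches, long maxBytes) throws Exception {
      VectorContainer hyper = new VectorContainer();
      SortRecordBatchBuilder builder = new SortRecordBatchBuilder(context.getAllocator(), maxBytes, hyper);
      for (VectorAccessible b : batches) {
        if (!builder.add(b)) {
          break; // full: memory cap, 65535-batch cap, or sv preallocation failed
        }
      }
      builder.build(context);  // assembles the hyper-batch and its SV4
      SelectionVector4 sv4 = builder.getSv4();
    }
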
@@ -94,6 +93,7 @@ public class SortRecordBatchBuilder {
container.clear();
if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema.");
if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+ assert batches.keySet().size() > 0;
sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
BatchSchema schema = batches.keySet().iterator().next();
List<RecordBatchData> data = batches.get(schema);
@@ -131,7 +131,7 @@ public class SortRecordBatchBuilder {
// next, we'll create lists of each of the vector types.
ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
for(RecordBatchData rbd : batches.values()){
- for(ValueVector v : rbd.vectors){
+ for(ValueVector v : rbd.getVectors()){
vectors.put(v.getField(), v);
}
}
@@ -150,7 +150,6 @@ public class SortRecordBatchBuilder {
public List<VectorContainer> getContainers() {
ArrayList containerList = Lists.newArrayList();
- int recordCount = 0;
for (BatchSchema bs : batches.keySet()) {
for (RecordBatchData bd : batches.get(bs)) {
VectorContainer c = bd.getContainer();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
index cb5e20e1d..f9835f683 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -36,7 +36,7 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{
public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{
// we pass in the local hyperBatch since that is where we'll be reading data.
- Preconditions.checkArgument(vector4 != null);
+ Preconditions.checkNotNull(vector4);
this.vector4 = vector4;
doSetup(context, hyperBatch, null);
}
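
A small note on the Guava change above: checkArgument(vector4 != null) throws a bare IllegalArgumentException, while checkNotNull throws NullPointerException and returns its argument, which matches the intent of a null-check here. A tiny illustration of the returned-value idiom this enables (the helper name is hypothetical):

    import com.google.common.base.Preconditions;

    class NullChecks {
      static <T> T require(T value) {
        // Throws NullPointerException on null and returns the value, which
        // allows inline assignment such as: this.vector4 = require(vector4);
        return Preconditions.checkNotNull(value);
      }
    }
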
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 46c747c45..c13838cc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.vector.ValueVector;
* A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids
* provided utilizing getValueVectorId();
*/
-public interface RecordBatch extends VectorAccessible, Iterable<VectorWrapper<?>>{
+public interface RecordBatch extends VectorAccessible {
/**
* Describes the outcome of a RecordBatch being incremented forward.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 91aa70ba6..462c00a34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -189,4 +189,8 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
w.clear();
}
}
+
+ public int getNumberOfColumns() {
+ return this.wrappers.size();
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
index f08993232..39ec7207b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
@@ -18,26 +18,21 @@
package org.apache.drill.exec.cache;
import com.beust.jcommander.internal.Lists;
-import com.hazelcast.core.MultiMap;
-import com.hazelcast.nio.FastByteArrayInputStream;
-import com.hazelcast.nio.FastByteArrayOutputStream;
-import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.vector.*;
import org.junit.Test;
-import java.io.ByteArrayInputStream;
-import java.io.DataOutput;
import java.util.List;
public class TestVectorCache {
@@ -69,20 +64,17 @@ public class TestVectorCache {
intVector.getMutator().setValueCount(4);
binVector.getMutator().setValueCount(4);
- VectorWrap wrap = new VectorWrap(vectorList);
- /*
- FastByteArrayOutputStream out = new FastByteArrayOutputStream();
- wrap.writeData(out);
- FastByteArrayInputStream in = new FastByteArrayInputStream(out.getBytes());
- VectorWrap newWrap = new VectorWrap();
- newWrap.readData(in);
- */
- MultiMap<String, VectorWrap> mmap = cache.getMultiMap("testMap");
+ VectorContainer container = new VectorContainer();
+ container.addCollection(vectorList);
+ VectorContainerSerializable wrap = new VectorContainerSerializable(container);
+
+ DistributedMultiMap<VectorContainerSerializable> mmap = cache.getMultiMap(VectorContainerSerializable.class);
mmap.put("vectors", wrap);
- VectorWrap newWrap = mmap.get("vectors").iterator().next();
+ VectorContainerSerializable newWrap = (VectorContainerSerializable)mmap.get("vectors").iterator().next();
- List<ValueVector> vectors = newWrap.get();
- for (ValueVector vv : vectors) {
+ VectorContainer newContainer = newWrap.get();
+ for (VectorWrapper w : newContainer) {
+ ValueVector vv = w.getValueVector();
int values = vv.getAccessor().getValueCount();
for (int i = 0; i < values; i++) {
Object o = vv.getAccessor().getObject(i);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java
index 850a40abe..d35a09e98 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java
@@ -46,12 +46,22 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
-
+/**
+ * Tests the OrderedPartitionExchange operator.
+ */
public class TestOrderedPartitionExchange extends PopUnitTestBase {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOrderedPartitionExchange.class);
+ /**
+ * Starts two drillbits and runs a physical plan with a mock scan, project, OrderedPartitionExchange, UnionExchange,
+ * and sort. The final sort is done first on the partition column, and the test verifies that the partitions are
+ * correct, in that all rows in partition 0 come in the sort order before any row in partition 1, and so on. It also
+ * verifies that the standard deviation of the partition sizes is less than one tenth of the mean partition size,
+ * because we expect all the partitions to be roughly equal in size.
+ * @throws Exception
+ */
@Test
- public void twoBitTwoExchangeTwoEntryRun() throws Exception {
+ public void twoBitTwoExchangeRun() throws Exception {
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java
index 063c00abf..aa68752b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java
@@ -28,6 +28,9 @@ import org.apache.drill.exec.vector.ValueVector;
import java.util.List;
+/**
+ * A tool for printing the content of record batches to the screen, used for debugging.
+ */
public class BatchPrinter {
public static void printHyperBatch(VectorAccessible batch) {
List<String> columns = Lists.newArrayList();
diff --git a/exec/java-exec/src/test/resources/sender/ordered_exchange.json b/exec/java-exec/src/test/resources/sender/ordered_exchange.json
index 1b3d41e12..129040674 100644
--- a/exec/java-exec/src/test/resources/sender/ordered_exchange.json
+++ b/exec/java-exec/src/test/resources/sender/ordered_exchange.json
@@ -43,7 +43,9 @@
{expr: "col1", order: "ASC"},
{expr: "col2", order: "DESC"}
],
- ref: "partition"
+ ref: "partition",
+ recordsToSample: 15000,
+ completionFactor: 0.8
},
{
@id: 4,