diff options
author | Steven Phillips <sphillips@maprtech.com> | 2013-10-21 23:03:49 -0700 |
---|---|---|
committer | Steven Phillips <sphillips@maprtech.com> | 2013-10-30 15:31:37 -0700 |
commit | fe94aa8147beb8c67fca5a184748b151c2b4b7ba (patch) | |
tree | aafffaeee1b1adebe36a34591fbce6607129659a /exec/java-exec | |
parent | 5ca503c141f76d8c01c89d0e3a58e1c117ef051f (diff) |
DRILL-230: Addressing comments in code review, abstract out references to HazelCache and add comments
Diffstat (limited to 'exec/java-exec')
29 files changed, 805 insertions, 381 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/Counter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/Counter.java new file mode 100644 index 000000000..4568878f6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/Counter.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.cache; + +public interface Counter { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Counter.class); + public long get(); + public long incrementAndGet(); + public long decrementAndGet(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java index d1b0e89ad..aed6cc235 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java @@ -18,13 +18,10 @@ package org.apache.drill.exec.cache; import java.io.Closeable; -import java.util.List; import org.apache.drill.exec.exception.DrillbitStartupException; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.ExecProtos.PlanFragment; -import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus; public interface DistributedCache extends Closeable{ @@ -37,4 +34,7 @@ public interface DistributedCache extends Closeable{ public PlanFragment getFragment(FragmentHandle handle); public void storeFragment(PlanFragment fragment); + public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz); + public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz); + public Counter getCounter(String name); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java new file mode 100644 index 000000000..b7595f91a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.cache; + +import java.util.concurrent.TimeUnit; + +public interface DistributedMap<V> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMap.class); + public DrillSerializable get(String key); + public void put(String key, DrillSerializable value); + public void putIfAbsent(String key, DrillSerializable value); + public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeUnit); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java new file mode 100644 index 000000000..886f122d7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.cache; + +import java.util.Collection; + +public interface DistributedMultiMap<V> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMultiMap.class); + public Collection<DrillSerializable> get(String key); + public void put(String key, DrillSerializable value); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java new file mode 100644 index 000000000..534d78106 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.cache; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Classes that can be put in the Distributed Cache must implement this interface. + */ +public interface DrillSerializable { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSerializable.class); + public void read(DataInput arg0) throws IOException; + public void write (DataOutput arg0) throws IOException; +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java new file mode 100644 index 000000000..3f2c41c5e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.cache; + +import com.hazelcast.nio.DataSerializable; + +import java.io.*; + +/** + * Wraps a DrillSerializable object. 
Objects of this class can be put in the HazelCast implementation of Distributed Cache + */ +public abstract class HCDrillSerializableWrapper implements DataSerializable { + + private DrillSerializable obj; + + public HCDrillSerializableWrapper() {} + + public HCDrillSerializableWrapper(DrillSerializable obj) { + this.obj = obj; + } + + public void readData(DataInput in) throws IOException { + obj.read(in); + } + + public void writeData(DataOutput out) throws IOException { + obj.write(out); + } + + public DrillSerializable get() { + return obj; + } + + /** + * This is a method that will get a Class specific implementation of HCDrillSerializableWrapper. Class specific implentations + * are necessary because Hazel Cast requires object that have constructors with no parameters. + * @param value + * @param clazz + * @return + */ + public static HCDrillSerializableWrapper getWrapper(DrillSerializable value, Class clazz) { + if (clazz.equals(VectorContainerSerializable.class)) { + return new HCSerializableWrapperClasses.HCVectorListSerializable(value); + } else { + throw new UnsupportedOperationException("HCDrillSerializableWrapper not implemented for " + clazz); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java new file mode 100644 index 000000000..d22723ab0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.cache; + +public class HCSerializableWrapperClasses { + public static class HCVectorListSerializable extends HCDrillSerializableWrapper { + + public HCVectorListSerializable() { + super(new VectorContainerSerializable()); + } + + public HCVectorListSerializable(DrillSerializable obj) { + super(obj); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java index 41eca9b26..577dfebb1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java @@ -18,22 +18,22 @@ package org.apache.drill.exec.cache; import java.io.IOException; +import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; +import com.google.common.collect.Lists; import com.hazelcast.core.*; -import com.hazelcast.nio.DataSerializable; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus; import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.ExecProtos.PlanFragment; import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import 
com.google.protobuf.InvalidProtocolBufferException; import com.hazelcast.config.Config; public class HazelCache implements DistributedCache { @@ -108,17 +108,88 @@ public class HazelCache implements DistributedCache { } - public <K,V extends DataSerializable> MultiMap<K,V> getMultiMap(String name) { - return this.instance.getMultiMap(name); + @Override + public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) { + return new HCDistributedMultiMapImpl(this.instance.getMultiMap(clazz.toString()), clazz); } - public <K,V extends DataSerializable> IMap<K,V> getMap(String name) { - return this.instance.getMap(name); + @Override + public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) { + return new HCDistributedMapImpl(this.instance.getMap(clazz.toString()), clazz); } - public AtomicNumber getAtomicNumber(String name) { - return this.instance.getAtomicNumber(name); + @Override + public Counter getCounter(String name) { + return new HCCounterImpl(this.instance.getAtomicNumber(name)); + } + + public static class HCDistributedMapImpl<V> implements DistributedMap<V> { + private IMap<String, HCDrillSerializableWrapper> m; + private Class<V> clazz; + + public HCDistributedMapImpl(IMap m, Class<V> clazz) { + this.m = m; + this.clazz = clazz; + } + + public DrillSerializable get(String key) { + return m.get(key).get(); + } + + public void put(String key, DrillSerializable value) { + m.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz)); + } + + public void putIfAbsent(String key, DrillSerializable value) { + m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz)); + } + + public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeunit) { + m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz), ttl, timeunit); + } + } + + public static class HCDistributedMultiMapImpl<V> implements DistributedMultiMap<V> { + private com.hazelcast.core.MultiMap<String, 
HCDrillSerializableWrapper> mmap; + private Class<V> clazz; + + public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap mmap, Class<V> clazz) { + this.mmap = mmap; + this.clazz = clazz; + } + + public Collection<DrillSerializable> get(String key) { + List<DrillSerializable> list = Lists.newArrayList(); + for (HCDrillSerializableWrapper v : mmap.get(key)) { + list.add(v.get()); + } + return list; + } + + @Override + public void put(String key, DrillSerializable value) { + mmap.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz)); + } + } + + public static class HCCounterImpl implements Counter { + private AtomicNumber n; + + public HCCounterImpl(AtomicNumber n) { + this.n = n; + } + + public long get() { + return n.get(); + } + + public long incrementAndGet() { + return n.incrementAndGet(); + } + + public long decrementAndGet() { + return n.decrementAndGet(); + } } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java index 79675c3a6..7ad6ec687 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java @@ -17,9 +17,20 @@ */ package org.apache.drill.exec.cache; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import 
org.apache.drill.exec.proto.ExecProtos.PlanFragment; @@ -30,6 +41,9 @@ public class LocalCache implements DistributedCache { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class); private volatile Map<FragmentHandle, PlanFragment> handles; + private volatile ConcurrentMap<Class, DistributedMap> maps; + private volatile ConcurrentMap<Class, DistributedMultiMap> multiMaps; + private volatile ConcurrentMap<String, Counter> counters; @Override public void close() throws IOException { @@ -39,6 +53,9 @@ public class LocalCache implements DistributedCache { @Override public void run() throws DrillbitStartupException { handles = Maps.newConcurrentMap(); + maps = Maps.newConcurrentMap(); + multiMaps = Maps.newConcurrentMap(); + counters = Maps.newConcurrentMap(); } @Override @@ -53,5 +70,132 @@ public class LocalCache implements DistributedCache { handles.put(fragment.getHandle(), fragment); } - + @Override + public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) { + DistributedMultiMap mmap = multiMaps.get(clazz); + if (mmap == null) { + multiMaps.putIfAbsent(clazz, new LocalDistributedMultiMapImpl<V>(clazz)); + return multiMaps.get(clazz); + } else { + return mmap; + } + } + + @Override + public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) { + DistributedMap m = maps.get(clazz); + if (m == null) { + maps.putIfAbsent(clazz, new LocalDistributedMapImpl(clazz)); + return maps.get(clazz); + } else { + return m; + } + } + + @Override + public Counter getCounter(String name) { + Counter c = counters.get(name); + if (c == null) { + counters.putIfAbsent(name, new LocalCounterImpl()); + return counters.get(name); + } else { + return c; + } + } + + public static ByteArrayDataOutput serialize(DrillSerializable obj) { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + try { + obj.write(out); + } catch (IOException e) { + throw new RuntimeException(e); + } + return out; + } + + 
public static DrillSerializable deserialize(byte[] bytes, Class clazz) { + ByteArrayDataInput in = ByteStreams.newDataInput(bytes); + try { + DrillSerializable obj = (DrillSerializable)clazz.newInstance(); + obj.read(in); + return obj; + } catch (InstantiationException | IllegalAccessException | IOException e) { + throw new RuntimeException(e); + } + } + + public static class LocalDistributedMultiMapImpl<V> implements DistributedMultiMap<V> { + private ArrayListMultimap<String, ByteArrayDataOutput> mmap; + private Class<DrillSerializable> clazz; + + public LocalDistributedMultiMapImpl(Class clazz) { + mmap = ArrayListMultimap.create(); + this.clazz = clazz; + } + + @Override + public Collection<DrillSerializable> get(String key) { + List<DrillSerializable> list = Lists.newArrayList(); + for (ByteArrayDataOutput o : mmap.get(key)) { + list.add(deserialize(o.toByteArray(), this.clazz)); + } + return list; + } + + @Override + public void put(String key, DrillSerializable value) { + mmap.put(key, serialize(value)); + } + } + + public static class LocalDistributedMapImpl<V> implements DistributedMap<V> { + private ConcurrentMap<String, ByteArrayDataOutput> m; + private Class<DrillSerializable> clazz; + + public LocalDistributedMapImpl(Class clazz) { + m = Maps.newConcurrentMap(); + this.clazz = clazz; + } + + @Override + public DrillSerializable get(String key) { + if (m.get(key) == null) return null; + return deserialize(m.get(key).toByteArray(), this.clazz); + } + + @Override + public void put(String key, DrillSerializable value) { + m.put(key, serialize(value)); + } + + @Override + public void putIfAbsent(String key, DrillSerializable value) { + m.putIfAbsent(key, serialize(value)); + } + + @Override + public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeUnit) { + m.putIfAbsent(key, serialize(value)); + logger.warn("Expiration not implemented in local map cache"); + } + } + + public static class LocalCounterImpl implements Counter { + 
private AtomicLong al = new AtomicLong(); + + @Override + public long get() { + return al.get(); + } + + @Override + public long incrementAndGet() { + return al.incrementAndGet(); + } + + @Override + public long decrementAndGet() { + return al.decrementAndGet(); + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java new file mode 100644 index 000000000..1e6eeacec --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.cache; + +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.util.DataInputInputStream; +import org.apache.drill.common.util.DataOutputOutputStream; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.vector.BaseDataValueVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.proto.UserBitShared.FieldMetadata; + +import java.io.*; +import java.util.List; + +public class VectorContainerSerializable implements DrillSerializable { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerSerializable.class); + +// List<ValueVector> vectorList; + private VectorContainer container; + private BootStrapContext context; + private int listSize = 0; + private int recordCount = -1; + + /** + * + * @param container + */ + public VectorContainerSerializable(VectorContainer container){ + this.container = container; + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + this.context = new BootStrapContext(DrillConfig.create()); + this.listSize = container.getNumberOfColumns(); + } + + public VectorContainerSerializable() { + this.container = new VectorContainer(); + this.context = new BootStrapContext(DrillConfig.create()); + } + + @Override + public void read(DataInput input) throws IOException { + List<ValueVector> vectorList = Lists.newArrayList(); + int size = input.readInt(); + InputStream stream = DataInputInputStream.constructInputStream(input); + for (int i = 0; i < size; i++) { + FieldMetadata metaData = FieldMetadata.parseDelimitedFrom(stream); + if (recordCount == -1) 
recordCount = metaData.getValueCount(); + int dataLength = metaData.getBufferLength(); + byte[] bytes = new byte[dataLength]; + input.readFully(bytes); + MaterializedField field = MaterializedField.create(metaData.getDef()); + ByteBuf buf = context.getAllocator().buffer(dataLength); + buf.setBytes(0, bytes); + ValueVector vector = TypeHelper.getNewVector(field, context.getAllocator()); + vector.load(metaData, buf); + vectorList.add((BaseDataValueVector) vector); + } + container.addCollection(vectorList); + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + container.setRecordCount(recordCount); + this.listSize = vectorList.size(); + } + + @Override + public void write(DataOutput output) throws IOException { +// int size = vectorList.size(); + output.writeInt(listSize); + for (VectorWrapper w : container) { + OutputStream stream = DataOutputOutputStream.constructOutputStream(output); + ValueVector vector = w.getValueVector(); + if (recordCount == -1) container.setRecordCount(vector.getMetadata().getValueCount()); + vector.getMetadata().writeDelimitedTo(stream); + ByteBuf[] data = vector.getBuffers(); + for (ByteBuf bb : data) { + byte[] bytes = new byte[bb.readableBytes()]; + bb.getBytes(0, bytes); + stream.write(bytes); + } + } + } + + public VectorContainer get() { + return container; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorWrap.java deleted file mode 100644 index 4811ea57a..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorWrap.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.cache; - -import com.beust.jcommander.internal.Lists; -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; -import com.hazelcast.nio.DataSerializable; -import io.netty.buffer.ByteBuf; -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.proto.UserBitShared; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.server.BootStrapContext; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.vector.BaseDataValueVector; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.proto.UserBitShared.FieldMetadata; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; - -public class VectorWrap implements DataSerializable{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorWrap.class); - - List<ValueVector> vectorList; - BootStrapContext context; - final Parser<FieldMetadata> parser = FieldMetadata.PARSER; - - public VectorWrap(List<ValueVector> vectorList){ - this.vectorList = vectorList; - this.context = new BootStrapContext(DrillConfig.create()); - } - - public VectorWrap() { - this.vectorList = Lists.newArrayList(); - this.context = new 
BootStrapContext(DrillConfig.create()); - } - - @Override - public void readData(DataInput arg0) throws IOException { - int size = arg0.readInt(); - for (int i = 0; i < size; i++) { - int metaLength = arg0.readInt(); - byte[] meta = new byte[metaLength]; - arg0.readFully(meta); - FieldMetadata metaData = parser.parseFrom(meta); - int dataLength = metaData.getBufferLength(); - byte[] bytes = new byte[dataLength]; - arg0.readFully(bytes); - MaterializedField field = MaterializedField.create(metaData.getDef()); - ByteBuf buf = context.getAllocator().buffer(dataLength); - buf.setBytes(0, bytes); - ValueVector vector = TypeHelper.getNewVector(field, context.getAllocator()); - vector.load(metaData, buf); - vectorList.add((BaseDataValueVector) vector); - } - } - - @Override - public void writeData(DataOutput arg0) throws IOException { - int size = vectorList.size(); - arg0.writeInt(size); - for (ValueVector vector : vectorList) { - byte[] meta = vector.getMetadata().toByteArray(); - int length = vector.getBufferSize(); - byte[] bytes = new byte[length]; - ByteBuf[] data = vector.getBuffers(); - int pos = 0; - for (ByteBuf bb : data) { - bb.getBytes(0, bytes, pos, bb.readableBytes()); - pos += bb.readableBytes(); - } - arg0.writeInt(meta.length); - arg0.write(meta); - arg0.write(bytes); - } - } - - public List<ValueVector> get() { - return vectorList; - } - - public void set(List<ValueVector> vectorList) { - this.vectorList = vectorList; - } - -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java index 2ae0bb50d..f0aa85bdc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.config; import 
com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; import org.apache.drill.common.defs.OrderDef; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; @@ -38,16 +39,34 @@ public class OrderedPartitionExchange extends AbstractExchange { private final List<OrderDef> orderings; private final FieldReference ref; + private int recordsToSample = 10000; // How many records must be received before analyzing + private int samplingFactor = 10; // Will collect SAMPLING_FACTOR * number of partitions to send to distributed cache + private float completionFactor = .75f; // What fraction of fragments must be completed before attempting to build partition table //ephemeral for setup tasks. private List<DrillbitEndpoint> senderLocations; private List<DrillbitEndpoint> receiverLocations; @JsonCreator - public OrderedPartitionExchange(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child) { + public OrderedPartitionExchange(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref, + @JsonProperty("child") PhysicalOperator child, @JsonProperty("recordsToSample") Integer recordsToSample, + @JsonProperty("samplingFactor") Integer samplingFactor, @JsonProperty("completionFactor") Float completionFactor) { super(child); this.orderings = orderings; this.ref = ref; + if (recordsToSample != null) { + Preconditions.checkArgument(recordsToSample > 0, "recordsToSample must be greater than 0"); + this.recordsToSample = recordsToSample; + } + if (samplingFactor != null) { + Preconditions.checkArgument(samplingFactor > 0, "samplingFactor must be greater than 0"); + this.samplingFactor = samplingFactor; + } + if (completionFactor != null) { + 
Preconditions.checkArgument(completionFactor > 0, "completionFactor must be greater than 0"); + Preconditions.checkArgument(completionFactor <= 1.0, "completionFactor cannot be greater than 1.0"); + this.completionFactor = completionFactor; + } } @Override @@ -68,7 +87,8 @@ public class OrderedPartitionExchange extends AbstractExchange { @Override public Sender getSender(int minorFragmentId, PhysicalOperator child) { - return new OrderedPartitionSender(orderings, ref, child, receiverLocations, receiverMajorFragmentId, senderLocations.size()); + return new OrderedPartitionSender(orderings, ref, child, receiverLocations, receiverMajorFragmentId, senderLocations.size(), recordsToSample, + samplingFactor, completionFactor); } @Override @@ -78,7 +98,7 @@ public class OrderedPartitionExchange extends AbstractExchange { @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new OrderedPartitionExchange(orderings, ref, child); + return new OrderedPartitionExchange(orderings, ref, child, recordsToSample, samplingFactor, completionFactor); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java index f2d7e5881..48412a0d0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java @@ -39,10 +39,15 @@ public class OrderedPartitionSender extends AbstractSender { private final List<DrillbitEndpoint> endpoints; private final int sendingWidth; + private int recordsToSample; + private int samplingFactor; + private float completionFactor; + @JsonCreator public OrderedPartitionSender(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child, 
@JsonProperty("destinations") List<DrillbitEndpoint> endpoints, @JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, - @JsonProperty("sending-fragment-width") int sendingWidth) { + @JsonProperty("sending-fragment-width") int sendingWidth, @JsonProperty("recordsToSample") int recordsToSample, + @JsonProperty("samplingFactor") int samplingFactor, @JsonProperty("completionFactor") float completionFactor) { super(oppositeMajorFragmentId, child); if (orderings == null) { this.orderings = Lists.newArrayList(); @@ -52,6 +57,9 @@ public class OrderedPartitionSender extends AbstractSender { this.ref = ref; this.endpoints = endpoints; this.sendingWidth = sendingWidth; + this.recordsToSample = recordsToSample; + this.samplingFactor = samplingFactor; + this.completionFactor = completionFactor; } public int getSendingWidth() { @@ -88,6 +96,20 @@ public class OrderedPartitionSender extends AbstractSender { @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new OrderedPartitionSender(orderings, ref, child, endpoints, oppositeMajorFragmentId, sendingWidth); + return new OrderedPartitionSender(orderings, ref, child, endpoints, oppositeMajorFragmentId, sendingWidth, recordsToSample, samplingFactor, + completionFactor); + } + + public int getRecordsToSample() { + return recordsToSample; + } + + public int getSamplingFactor() { + return samplingFactor; + } + + public float getCompletionFactor() { + return completionFactor; } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java index 465bb3d75..b63a4d0f2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java @@ -33,8 +33,6 @@ import org.apache.drill.exec.record.VectorWrapper; import 
org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.util.BatchPrinter; -import org.apache.drill.exec.vector.ValueVector; public class WireRecordBatch implements RecordBatch{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WireRecordBatch.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java index e22450e0d..8563d1c94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java @@ -44,6 +44,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit private SelectionVectorMode svMode; private RecordBatch outBatch; private SchemaPath outputField; + private IntVector partitionValues; public OrderedPartitionProjectorTemplate() throws SchemaChangeException{ } @@ -60,7 +61,6 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit @Override public final int projectRecords(final int recordCount, int firstOutputIndex) { final int countN = recordCount; - IntVector partitionValues = (IntVector) outBatch.getValueAccessorById(outBatch.getValueVectorId(outputField).getFieldId(), IntVector.class).getValueVector(); int counter = 0; for (int i = 0; i < countN; i++, firstOutputIndex++) { int partition = getPartition(i); @@ -80,6 +80,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit this.svMode = incoming.getSchema().getSelectionVectorMode(); this.outBatch = outgoing; this.outputField = outputField; + 
partitionValues = (IntVector) outBatch.getValueAccessorById(outBatch.getValueVectorId(outputField).getFieldId(), IntVector.class).getValueVector(); switch(svMode){ case FOUR_BYTE: case TWO_BYTE: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index af0f1ffd2..7dc7d559d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -20,20 +20,14 @@ package org.apache.drill.exec.physical.impl.orderedpartitioner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.hazelcast.core.AtomicNumber; -import com.hazelcast.core.IMap; -import com.hazelcast.core.MultiMap; import com.sun.codemodel.JConditional; import com.sun.codemodel.JExpr; -import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.defs.OrderDef; import org.apache.drill.common.expression.*; -import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.logical.data.Order; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; -import org.apache.drill.exec.cache.HazelCache; -import org.apache.drill.exec.cache.VectorWrap; +import org.apache.drill.exec.cache.*; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; @@ -46,7 +40,6 @@ import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder; import org.apache.drill.exec.physical.impl.sort.Sorter; import org.apache.drill.exec.record.*; import 
org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.util.BatchPrinter; import org.apache.drill.exec.vector.*; import java.io.IOException; @@ -56,20 +49,26 @@ import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +/** + * The purpose of this operator is to generate an ordered partition, rather than a random hash partition. This could be used + * to do a total order sort, for example. + * This operator reads in a few incoming record batches, samples these batches, and stores them in the distributed cache. The samples + * from all the parallel-running fragments are merged, and a partition-table is built and stored in the distributed cache for use by all + * fragments. A new column is added to the outgoing batch, whose value is determined by where each record falls in the partition table. + * This column is used by PartitionSenderRootExec to determine which bucket to assign each record to. + */ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionRecordBatch.class); - public static final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP); - public static final MappingSet INCOMING_MAPPING = new MappingSet("inIndex", null, "incoming", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP); - public static final MappingSet PARTITION_MAPPING = new MappingSet("partitionIndex", null, "partitionVectors", null, + public final MappingSet mainMapping = new MappingSet( (String) null, null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP); + public final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP); + public final MappingSet partitionMapping = new 
MappingSet("partitionIndex", null, "partitionVectors", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP); private static long MAX_SORT_BYTES = 8l * 1024 * 1024 * 1024; - private static int RECORDS_TO_SAMPLE = 5000; - private static int SAMPLING_FACTOR = 10; - private static float COMPLETION_FACTOR = .75f; - private static String SAMPLE_MAP_NAME = "sampleMap"; - private static String TABLE_MAP_NAME = "TableMap"; + private final int recordsToSample; // How many records must be received before analyzing + private final int samplingFactor; // Will collect samplingFactor * number of partitions to send to distributed cache + private final float completionFactor; // What fraction of fragments must be completed before attempting to build partition table protected final RecordBatch incoming; private boolean first = true; private OrderedPartitionProjector projector; @@ -83,23 +82,40 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart private boolean startedUnsampledBatches = false; private boolean upstreamNone = false; private int recordCount; + private DistributedMap<VectorContainerSerializable> tableMap; + private DistributedMultiMap mmap; + private String mapKey; public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incoming, FragmentContext context){ super(pop, context); this.incoming = incoming; this.partitions = pop.getDestinations().size(); this.sendingMajorFragmentWidth = pop.getSendingWidth(); + this.recordsToSample = pop.getRecordsToSample(); + this.samplingFactor = pop.getSamplingFactor(); + this.completionFactor = pop.getCompletionFactor(); } + /** + * This method is called when the first batch comes in. Incoming batches are collected until a threshold is met. At that point, + * the records in the batches are sorted and sampled, and the sampled records are stored in the distributed cache. 
Once a sufficient + * fraction of the fragments have shared their samples, each fragment grabs all the samples, sorts all the records, builds a partition + * table, and attempts to push the partition table to the distributed cache. Whichever table gets pushed first becomes the table used by all + * fragments for partitioning. + * @return + */ private boolean getPartitionVectors() { VectorContainer sampleContainer = new VectorContainer(); recordsSampled = 0; IterOutcome upstream; + + // Start collecting batches until recordsToSample records have been collected + builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES, sampleContainer); builder.add(incoming); recordsSampled += incoming.getRecordCount(); try { - outer: while (recordsSampled < RECORDS_TO_SAMPLE) { + outer: while (recordsSampled < recordsToSample) { upstream = incoming.next(); switch(upstream) { case NONE: @@ -111,93 +127,72 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart builder.add(incoming); recordsSampled += incoming.getRecordCount(); if (upstream == IterOutcome.NONE) break; - //TODO handle upstream cases } builder.build(context); + + // Sort the records according the orderings given in the configuration + Sorter sorter = SortBatch.createNewSorter(context, popConfig.getOrderings(), sampleContainer); SelectionVector4 sv4 = builder.getSv4(); sorter.setup(context, sv4, sampleContainer); sorter.sort(sv4, sampleContainer); + // Project every Nth record to a new vector container, where N = recordsSampled/(samplingFactor * partitions). Uses the + // the expressions from the OrderDefs to populate each column. There is one column for each OrderDef in popConfig.orderings. 
+ VectorContainer containerToCache = new VectorContainer(); SampleCopier copier = getCopier(sv4, sampleContainer, containerToCache, popConfig.getOrderings()); - copier.copyRecords(recordsSampled/(SAMPLING_FACTOR * partitions), 0, SAMPLING_FACTOR * partitions); + copier.copyRecords(recordsSampled/(samplingFactor * partitions), 0, samplingFactor * partitions); for (VectorWrapper vw : containerToCache) { vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords()); } -// BatchPrinter.printBatch(containerToCache); - - HazelCache cache = new HazelCache(DrillConfig.create()); - cache.run(); - String mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId()); - MultiMap<String, VectorWrap> mmap = cache.getMultiMap(SAMPLE_MAP_NAME); + // Get a distributed multimap handle from the distributed cache, and put the vectors from the new vector container + // into a serializable wrapper object, and then add to distributed map + DistributedCache cache = context.getDrillbitContext().getCache(); + mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId()); + mmap = cache.getMultiMap(VectorContainerSerializable.class); List<ValueVector> vectorList = Lists.newArrayList(); for (VectorWrapper vw : containerToCache) { vectorList.add(vw.getValueVector()); } - VectorWrap wrap = new VectorWrap(vectorList); + VectorContainerSerializable wrap = new VectorContainerSerializable(containerToCache); mmap.put(mapKey, wrap); + wrap = null; - AtomicNumber minorFragmentSampleCount = cache.getAtomicNumber(mapKey); + Counter minorFragmentSampleCount = cache.getCounter(mapKey); long val = minorFragmentSampleCount.incrementAndGet(); logger.debug("Incremented mfsc, got {}", val); - - for (int i = 0; i < 10 && minorFragmentSampleCount.get() < sendingMajorFragmentWidth * COMPLETION_FACTOR; i++) { - Thread.sleep(10); - } - - Collection<VectorWrap> allSamplesWrap = mmap.get(mapKey); - VectorContainer 
allSamplesContainer = new VectorContainer(); - int orderSize = popConfig.getOrderings().size(); - SortContainerBuilder containerBuilder = new SortContainerBuilder(context.getAllocator(), MAX_SORT_BYTES, allSamplesContainer, orderSize); - for (VectorWrap w : allSamplesWrap) { - containerBuilder.add(w.get()); - } - containerBuilder.build(context); - -// BatchPrinter.printHyperBatch(allSamplesContainer); - - List<OrderDef> orderDefs = Lists.newArrayList(); - int i = 0; - for (OrderDef od : popConfig.getOrderings()) { - SchemaPath sp = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN); - orderDefs.add(new OrderDef(od.getDirection(), new FieldReference(sp))); - } - - SelectionVector4 newSv4 = containerBuilder.getSv4(); - Sorter sorter2 = SortBatch.createNewSorter(context, orderDefs, allSamplesContainer); - sorter2.setup(context, newSv4, allSamplesContainer); - sorter2.sort(newSv4, allSamplesContainer); - - VectorContainer candidatePartitionTable = new VectorContainer(); - SampleCopier copier2 = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs); - int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions; - copier2.copyRecords(skipRecords, skipRecords, partitions - 1); - assert copier2.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier2.getOutputRecords(), partitions); - for (VectorWrapper vw : candidatePartitionTable) { - vw.getValueVector().getMutator().setValueCount(copier2.getOutputRecords()); - } - -// BatchPrinter.printBatch(candidatePartitionTable); - - vectorList = Lists.newArrayList(); - for (VectorWrapper vw: candidatePartitionTable) { - vectorList.add(vw.getValueVector()); + tableMap = cache.getMap(VectorContainerSerializable.class); + Preconditions.checkNotNull(tableMap); + + if (val == Math.ceil(sendingMajorFragmentWidth * completionFactor)) { + buildTable(); + wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final"); + } else if (val < 
Math.ceil(sendingMajorFragmentWidth * completionFactor)) { + // Wait until sufficient number of fragments have submitted samples, or proceed after 100 ms passed + for (int i = 0; i < 100 && wrap == null; i++) { + Thread.sleep(10); + wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final"); + if (i == 99) { + buildTable(); + wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final"); + } + } + } else { + wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final"); } - wrap = new VectorWrap(vectorList); - IMap<String, VectorWrap> tableMap = cache.getMap(TABLE_MAP_NAME); - tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES); - wrap = tableMap.get(mapKey + "final"); Preconditions.checkState(wrap != null); - for (ValueVector vv : wrap.get()) { - partitionVectors.add(vv); + + // Extract vectors from the wrapper, and add to partition vectors. These vectors will be used for partitioning in the rest of this operator + for (VectorWrapper w : wrap.get()) { + partitionVectors.add(w.getValueVector()); } } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) { kill(); @@ -208,6 +203,56 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart return true; } + private void buildTable() throws SchemaChangeException, ClassTransformationException, IOException { + // Get all samples from distributed map + Collection<DrillSerializable> allSamplesWrap = mmap.get(mapKey); + VectorContainer allSamplesContainer = new VectorContainer(); + + SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES, allSamplesContainer); + for (DrillSerializable w : allSamplesWrap) { + containerBuilder.add(((VectorContainerSerializable)w).get()); + } + containerBuilder.build(context); + + List<OrderDef> orderDefs = Lists.newArrayList(); + int i = 0; + for (OrderDef od : popConfig.getOrderings()) { + SchemaPath sp = new SchemaPath("f" + i++, 
ExpressionPosition.UNKNOWN); + orderDefs.add(new OrderDef(od.getDirection(), new FieldReference(sp))); + } + + SelectionVector4 newSv4 = containerBuilder.getSv4(); + Sorter sorter2 = SortBatch.createNewSorter(context, orderDefs, allSamplesContainer); + sorter2.setup(context, newSv4, allSamplesContainer); + sorter2.sort(newSv4, allSamplesContainer); + + // Copy every Nth record from the samples into a candidate partition table, where N = totalSampledRecords/partitions + // Attempt to push this to the distributed map. Only the first candidate to get pushed will be used. + VectorContainer candidatePartitionTable = new VectorContainer(); + SampleCopier copier2 = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs); + int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions; + copier2.copyRecords(skipRecords, skipRecords, partitions - 1); + assert copier2.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier2.getOutputRecords(), partitions); + for (VectorWrapper vw : candidatePartitionTable) { + vw.getValueVector().getMutator().setValueCount(copier2.getOutputRecords()); + } + + VectorContainerSerializable wrap = new VectorContainerSerializable(candidatePartitionTable); + + tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES); + } + + /** + * Creates a copier that does a project for every Nth record from a VectorContainer incoming into VectorContainer outgoing. Each OrderDef in orderings + * generates a column, and evaluation of the expression associated with each OrderDef determines the value of each column. These records will later be + * sorted based on the values in each column, in the same order as the orderings. 
+ * @param sv4 + * @param incoming + * @param outgoing + * @param orderings + * @return + * @throws SchemaChangeException + */ private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing, List<OrderDef> orderings) throws SchemaChangeException{ List<ValueVector> localAllocationVectors = Lists.newArrayList(); final ErrorCollector collector = new ErrorCollectorImpl(); @@ -232,9 +277,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart logger.debug("Added eval."); } for (ValueVector vv : localAllocationVectors) { - AllocationHelper.allocate(vv, SAMPLING_FACTOR * partitions, 50); + AllocationHelper.allocate(vv, samplingFactor * partitions, 50); } -// outgoing.addCollection(allocationVectors); outgoing.buildSchema(BatchSchema.SelectionVectorMode.NONE); try { SampleCopier sampleCopier = context.getImplementationClass(cg); @@ -252,11 +296,17 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart @Override public IterOutcome next() { + + //if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are done if (upstreamNone && (batchQueue == null || batchQueue.size() == 0)) return IterOutcome.NONE; + + // if there are batches on the queue, process them first, rather than calling incoming.next() if (batchQueue != null && batchQueue.size() > 0) { VectorContainer vc = batchQueue.poll(); recordCount = vc.getRecordCount(); try{ + + // Must set up a new schema each time, because ValueVectors are not reused between containers in queue setupNewSchema(vc); }catch(SchemaChangeException ex){ kill(); @@ -267,15 +317,22 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart doWork(vc); return IterOutcome.OK_NEW_SCHEMA; } + + // Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are more incoming IterOutcome upstream = incoming.next(); recordCount = 
incoming.getRecordCount(); + if(this.first && upstream == IterOutcome.OK) { - upstream = IterOutcome.OK_NEW_SCHEMA; + throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA"); } + + // If this is the first iteration, we need to generate the partition vectors before we can proceed if(this.first && upstream == IterOutcome.OK_NEW_SCHEMA) { if (!getPartitionVectors()) return IterOutcome.STOP; batchQueue = new LinkedBlockingQueue<>(builder.getContainers()); first = false; + + // Now that we have the partition vectors, we immediately process the first batch on the queue VectorContainer vc = batchQueue.poll(); try{ setupNewSchema(vc); @@ -288,6 +345,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart doWork(vc); return IterOutcome.OK_NEW_SCHEMA; } + + // if this now that all the batches on the queue are processed, we begin processing the incoming batches. For the first one + // we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema. if (this.startedUnsampledBatches == false) { this.startedUnsampledBatches = true; if (upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA; @@ -333,16 +393,12 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } } - /** hack to make ref and full work together... need to figure out if this is still necessary. 
**/ - private FieldReference getRef(NamedExpression e){ - FieldReference ref = e.getRef(); - PathSegment seg = ref.getRootSegment(); - if(seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())){ - return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition()); - } - return ref; - } - + /** + * Sets up projection that will transfer all of the columns in batch, and also populate the partition column based on which + * partition a record falls into in the partition table + * @param batch + * @throws SchemaChangeException + */ protected void setupNewSchema(VectorAccessible batch) throws SchemaChangeException{ this.allocationVectors = Lists.newArrayList(); container.clear(); @@ -357,20 +413,18 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart container.add(tp.getTo()); } - cg.setMappingSet(MAIN_MAPPING); + cg.setMappingSet(mainMapping); int count = 0; for(OrderDef od : popConfig.getOrderings()){ - // first, we rewrite the evaluation stack for each side of the comparison. final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector); if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); - cg.setMappingSet(INCOMING_MAPPING); + cg.setMappingSet(incomingMapping); CodeGenerator.HoldingContainer left = cg.addExpr(expr, false); - cg.setMappingSet(PARTITION_MAPPING); + cg.setMappingSet(partitionMapping); CodeGenerator.HoldingContainer right = cg.addExpr(new ValueVectorReadExpression(new TypedFieldId(expr.getMajorType(), count++)), false); - cg.setMappingSet(MAIN_MAPPING); + cg.setMappingSet(mainMapping); - // next we wrap the two comparison sides and add the expression block for the comparison. 
FunctionCall f = new FunctionCall(ComparatorFunctions.COMPARE_TO, ImmutableList.of((LogicalExpression) new HoldingContainerExpression(left), new HoldingContainerExpression(right)), ExpressionPosition.UNKNOWN); CodeGenerator.HoldingContainer out = cg.addExpr(f, false); JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SortContainerBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SortContainerBuilder.java deleted file mode 100644 index d479108fb..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SortContainerBuilder.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.physical.impl.orderedpartitioner; - -import com.beust.jcommander.internal.Lists; -import com.google.common.base.Preconditions; -import com.google.common.collect.ArrayListMultimap; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.memory.BufferAllocator.PreAllocator; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.impl.sort.RecordBatchData; -import org.apache.drill.exec.record.*; -import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.vector.ValueVector; - -import java.util.ArrayList; -import java.util.List; - -public class SortContainerBuilder { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortContainerBuilder.class); - - private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create(); - private final VectorContainer container; - - private int recordCount; - private SelectionVector4 sv4; - final PreAllocator svAllocator; - private List<List<ValueVector>> hyperVectors; - - public SortContainerBuilder(BufferAllocator a, long maxBytes, VectorContainer container, int fields){ - this.svAllocator = a.getPreAllocator(); - this.container = container; - this.hyperVectors = Lists.newArrayList(); - for (int i = 0; i < fields; i++) { - List<ValueVector> list = Lists.newArrayList(); - hyperVectors.add(list); - } - } - - private long getSize(RecordBatch batch){ - long bytes = 0; - for(VectorWrapper<?> v : batch){ - bytes += v.getValueVector().getBufferSize(); - } - return bytes; - } - - public boolean add(List<ValueVector> vectors){ - Preconditions.checkArgument(vectors.size() == hyperVectors.size()); - int i = 0; - for (ValueVector vv : vectors) { - if (i == 0) recordCount += vv.getMetadata().getValueCount(); - 
hyperVectors.get(i++).add(vv); - } - return true; - } - - public void build(FragmentContext context) throws SchemaChangeException{ - container.clear(); - svAllocator.preAllocate(recordCount * 4); - sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE); - - // now we're going to generate the sv4 pointers - int index = 0; - int recordBatchId = 0; - for(ValueVector vv : hyperVectors.get(0)){ - for(int i = 0; i < vv.getMetadata().getValueCount(); i++, index++){ - sv4.set(index, recordBatchId, i); - } - recordBatchId++; - } - - for (List<ValueVector> hyperVector : hyperVectors) { - container.addHyperList(hyperVector); - } - - container.buildSchema(SelectionVectorMode.FOUR_BYTE); - } - - public SelectionVector4 getSv4() { - return sv4; - } - -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java index 9f38c77da..2647ffc1f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java @@ -39,7 +39,6 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.bit.BitTunnel; -import org.apache.drill.exec.util.BatchPrinter; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.apache.drill.exec.work.foreman.ErrorHelper; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index e26ea1b0b..19adee7b0 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -35,7 +35,6 @@ import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.util.BatchPrinter; import org.apache.drill.exec.vector.ValueVector; import com.sun.codemodel.JArray; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java index 7300d9e9d..e01bf376e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java @@ -19,11 +19,8 @@ package org.apache.drill.exec.physical.impl.sort; import java.util.List; +import org.apache.drill.exec.record.*; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TransferPair; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.vector.ValueVector; @@ -35,13 +32,17 @@ import com.google.common.collect.Lists; public class RecordBatchData { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchData.class); - final List<ValueVector> vectors = Lists.newArrayList(); final SelectionVector2 sv2; final int recordCount; - VectorContainer container; + VectorContainer container = new VectorContainer(); - public RecordBatchData(RecordBatch batch){ - this.sv2 = 
batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE ? batch.getSelectionVector2().clone() : null; + public RecordBatchData(VectorAccessible batch){ + List<ValueVector> vectors = Lists.newArrayList(); + if (batch instanceof RecordBatch && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) { + this.sv2 = ((RecordBatch)batch).getSelectionVector2().clone(); + } else { + this.sv2 = null; + } for(VectorWrapper<?> v : batch){ if(v.isHyper()) throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); @@ -49,14 +50,21 @@ public class RecordBatchData { tp.transfer(); vectors.add(tp.getTo()); } - + + container.addCollection(vectors); recordCount = batch.getRecordCount(); + container.setRecordCount(recordCount); + container.buildSchema(batch.getSchema().getSelectionVectorMode()); } public int getRecordCount(){ return recordCount; } public List<ValueVector> getVectors() { + List<ValueVector> vectors = Lists.newArrayList(); + for (VectorWrapper w : container) { + vectors.add(w.getValueVector()); + } return vectors; } @@ -65,16 +73,6 @@ public class RecordBatchData { } public VectorContainer getContainer() { - if (this.container == null) buildContainer(); - return this.container; - } - - private void buildContainer() { - assert container == null; - container = new VectorContainer(); - for (ValueVector vv : vectors) { - container.add(vv); - } - container.buildSchema(sv2 == null ? 
SelectionVectorMode.NONE : SelectionVectorMode.TWO_BYTE); + return container; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index 4be506586..5478adfed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -158,7 +158,10 @@ public class SortBatch extends AbstractRecordBatch<Sort> { } public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException { - return createNewSorter(context, orderings, batch, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); + final MappingSet mainMapping = new MappingSet( (String) null, null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP); + final MappingSet leftMapping = new MappingSet("leftIndex", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP); + final MappingSet rightMapping = new MappingSet("rightIndex", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP); + return createNewSorter(context, orderings, batch, mainMapping, leftMapping, rightMapping); } public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index 4024d5791..cb55a01f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ 
-27,12 +27,8 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator.PreAllocator; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.*; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; @@ -57,7 +53,7 @@ public class SortRecordBatchBuilder { this.container = container; } - private long getSize(RecordBatch batch){ + private long getSize(VectorAccessible batch){ long bytes = 0; for(VectorWrapper<?> v : batch){ bytes += v.getValueVector().getBufferSize(); @@ -71,12 +67,15 @@ public class SortRecordBatchBuilder { * @return True if the requested add completed successfully. Returns false in the case that this builder is full and cannot receive additional packages. * @throws SchemaChangeException */ - public boolean add(RecordBatch batch){ + public boolean add(VectorAccessible batch){ if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch."); - if (batch.getRecordCount() == 0) return true; // skip over empty record batches. + if (batch.getRecordCount() == 0) + return true; // skip over empty record batches. long batchBytes = getSize(batch); - if (batchBytes == 0) {return true;} + if (batchBytes == 0) { + return true; + } if(batchBytes + runningBytes > maxBytes) return false; // enough data memory. if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch. 
if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available. @@ -94,6 +93,7 @@ public class SortRecordBatchBuilder { container.clear(); if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema."); if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE); + assert batches.keySet().size() > 0; sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE); BatchSchema schema = batches.keySet().iterator().next(); List<RecordBatchData> data = batches.get(schema); @@ -131,7 +131,7 @@ public class SortRecordBatchBuilder { // next, we'll create lists of each of the vector types. ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create(); for(RecordBatchData rbd : batches.values()){ - for(ValueVector v : rbd.vectors){ + for(ValueVector v : rbd.getVectors()){ vectors.put(v.getField(), v); } } @@ -150,7 +150,6 @@ public class SortRecordBatchBuilder { public List<VectorContainer> getContainers() { ArrayList containerList = Lists.newArrayList(); - int recordCount = 0; for (BatchSchema bs : batches.keySet()) { for (RecordBatchData bd : batches.get(bs)) { VectorContainer c = bd.getContainer(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java index cb5e20e1d..f9835f683 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java @@ -36,7 +36,7 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{ public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{ // we pass in the local 
hyperBatch since that is where we'll be reading data. - Preconditions.checkArgument(vector4 != null); + Preconditions.checkNotNull(vector4); this.vector4 = vector4; doSetup(context, hyperBatch, null); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 46c747c45..c13838cc6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -31,7 +31,7 @@ import org.apache.drill.exec.vector.ValueVector; * A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids * provided utilizing getValueVectorId(); */ -public interface RecordBatch extends VectorAccessible, Iterable<VectorWrapper<?>>{ +public interface RecordBatch extends VectorAccessible { /** * Describes the outcome of a RecordBatch being incremented forward. 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 91aa70ba6..462c00a34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -189,4 +189,8 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess w.clear(); } } + + public int getNumberOfColumns() { + return this.wrappers.size(); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java index f08993232..39ec7207b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java @@ -18,26 +18,21 @@ package org.apache.drill.exec.cache; import com.beust.jcommander.internal.Lists; -import com.hazelcast.core.MultiMap; -import com.hazelcast.nio.FastByteArrayInputStream; -import com.hazelcast.nio.FastByteArrayOutputStream; -import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.vector.*; import org.junit.Test; -import 
java.io.ByteArrayInputStream; -import java.io.DataOutput; import java.util.List; public class TestVectorCache { @@ -69,20 +64,17 @@ public class TestVectorCache { intVector.getMutator().setValueCount(4); binVector.getMutator().setValueCount(4); - VectorWrap wrap = new VectorWrap(vectorList); - /* - FastByteArrayOutputStream out = new FastByteArrayOutputStream(); - wrap.writeData(out); - FastByteArrayInputStream in = new FastByteArrayInputStream(out.getBytes()); - VectorWrap newWrap = new VectorWrap(); - newWrap.readData(in); - */ - MultiMap<String, VectorWrap> mmap = cache.getMultiMap("testMap"); + VectorContainer container = new VectorContainer(); + container.addCollection(vectorList); + VectorContainerSerializable wrap = new VectorContainerSerializable(container); + + DistributedMultiMap<VectorContainerSerializable> mmap = cache.getMultiMap(VectorContainerSerializable.class); mmap.put("vectors", wrap); - VectorWrap newWrap = mmap.get("vectors").iterator().next(); + VectorContainerSerializable newWrap = (VectorContainerSerializable)mmap.get("vectors").iterator().next(); - List<ValueVector> vectors = newWrap.get(); - for (ValueVector vv : vectors) { + VectorContainer newContainer = newWrap.get(); + for (VectorWrapper w : newContainer) { + ValueVector vv = w.getValueVector(); int values = vv.getAccessor().getValueCount(); for (int i = 0; i < values; i++) { Object o = vv.getAccessor().getObject(i); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java index 850a40abe..d35a09e98 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java @@ -46,12 +46,22 @@ import java.util.List; 
import static org.junit.Assert.assertEquals; - +/** + * Tests the OrderedPartitionExchange Operator + */ public class TestOrderedPartitionExchange extends PopUnitTestBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOrderedPartitionExchange.class); + /** + * Starts two drillbits and runs a physical plan with a Mock scan, project, OrderedPartitionExchange, Union Exchange, + * and sort. The final sort is done first on the partition column, and verifies that the partitions are correct, in that + * all rows in partition 0 should come in the sort order before any row in partition 1, etc. Also verifies that the standard + * deviation of the size of the partitions is less than one tenth the mean size of the partitions, because we expect all + * the partitions to be roughly equal in size. + * @throws Exception + */ @Test - public void twoBitTwoExchangeTwoEntryRun() throws Exception { + public void twoBitTwoExchangeRun() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java index 063c00abf..aa68752b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java @@ -28,6 +28,9 @@ import org.apache.drill.exec.vector.ValueVector; import java.util.List; +/** + * This is a tool for printing the content of record batches to screen. Used for debugging. 
+ */ public class BatchPrinter { public static void printHyperBatch(VectorAccessible batch) { List<String> columns = Lists.newArrayList(); diff --git a/exec/java-exec/src/test/resources/sender/ordered_exchange.json b/exec/java-exec/src/test/resources/sender/ordered_exchange.json index 1b3d41e12..129040674 100644 --- a/exec/java-exec/src/test/resources/sender/ordered_exchange.json +++ b/exec/java-exec/src/test/resources/sender/ordered_exchange.json @@ -43,7 +43,9 @@ {expr: "col1", order: "ASC"}, {expr: "col2", order: "DESC"} ], - ref: "partition" + ref: "partition", + recordsToSample: 15000, + completionFactor: 0.8 }, { @id: 4, |