diff options
Diffstat (limited to 'logstash-core/src/main/java/org/logstash/ackedqueue')
-rw-r--r-- | logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java | 74 | ||||
-rw-r--r-- | logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java | 43 |
2 files changed, 95 insertions, 22 deletions
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java new file mode 100644 index 00000000..e9517c45 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java @@ -0,0 +1,74 @@ +package org.logstash.ackedqueue; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.jruby.Ruby; +import org.jruby.RubyBasicObject; +import org.jruby.RubyClass; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; +import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt; +import org.logstash.execution.AbstractWrappedQueueExt; +import org.logstash.ext.JrubyWrappedSynchronousQueueExt; + +@JRubyClass(name = "QueueFactory") +public final class QueueFactoryExt extends RubyBasicObject { + + public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) { + super(runtime, metaClass); + } + + @JRubyMethod(meta = true) + public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv, + final IRubyObject settings) throws IOException { + final String type = getSetting(context, settings, "queue.type").asJavaString(); + if ("persisted".equals(type)) { + final Path queuePath = Paths.get( + getSetting(context, settings, "path.queue").asJavaString(), + getSetting(context, settings, "pipeline.id").asJavaString() + ); + Files.createDirectories(queuePath); + return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS) + .initialize( + context, new IRubyObject[]{ + context.runtime.newString(queuePath.toString()), + getSetting(context, settings, "queue.page_capacity"), + getSetting(context, settings, "queue.max_events"), + getSetting(context, settings, "queue.checkpoint.writes"), + getSetting(context, settings, "queue.checkpoint.acks"), + getSetting(context, settings, "queue.checkpoint.interval"), + getSetting(context, settings, "queue.max_bytes") + } + ); + } else if ("memory".equals(type)) { + return new JrubyWrappedSynchronousQueueExt( + context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS + ).initialize( + context, context.runtime.newFixnum( + getSetting(context, settings, "pipeline.batch.size") + .convertToInteger().getIntValue() + * getSetting(context, settings, "pipeline.workers") + .convertToInteger().getIntValue() + ) + ); + } else { + throw context.runtime.newRaiseException( + RubyUtil.CONFIGURATION_ERROR_CLASS, + String.format( + "Invalid setting `%s` for `queue.type`, supported types are: 'memory' or 'persisted'", + type + ) + ); + } + } + + private static IRubyObject getSetting(final ThreadContext context, final IRubyObject settings, + final String name) { + return settings.callMethod(context, "get_value", context.runtime.newString(name)); + } +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java index 55c3a04c..f506da4d 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java @@ -6,25 +6,25 @@ import org.jruby.Ruby; import org.jruby.RubyBoolean; import org.jruby.RubyClass; import org.jruby.RubyFixnum; -import org.jruby.RubyObject; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.Arity; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; +import org.logstash.execution.AbstractWrappedQueueExt; import org.logstash.ext.JrubyAckedReadClientExt; import org.logstash.ext.JrubyAckedWriteClientExt; import org.logstash.ext.JrubyEventExtLibrary; @JRubyClass(name = "WrappedAckedQueue") -public final class JRubyWrappedAckedQueueExt extends RubyObject { +public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt { private JRubyAckedQueueExt queue; private final AtomicBoolean isClosed = new AtomicBoolean(); - @JRubyMethod(name = "initialize", optional = 7) - public IRubyObject ruby_initialize(ThreadContext context, IRubyObject[] args) throws IOException { + @JRubyMethod(optional = 7) + public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException { args = Arity.scanArgs(context.runtime, args, 7, 0); int capacity = RubyFixnum.num2int(args[1]); int maxEvents = RubyFixnum.num2int(args[2]); @@ -36,7 +36,7 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject { checkpointMaxWrites, checkpointMaxAcks, queueMaxBytes); this.queue.open(); - return context.nil; + return this; } public JRubyWrappedAckedQueueExt(final Ruby runtime, final RubyClass metaClass) { @@ -53,16 +53,6 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject { isClosed.set(true); } - @JRubyMethod(name = "close") - public IRubyObject rubyClose(ThreadContext context) { - try { - close(); - } catch (IOException e) { - throw RubyUtil.newRubyIOError(context.runtime, e); - } - return context.nil; - } - @JRubyMethod(name = {"push", "<<"}) public void rubyPush(ThreadContext context, IRubyObject event) { checkIfClosed("write"); @@ -75,20 +65,29 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject { return queue.ruby_read_batch(context, size, wait); } + @JRubyMethod(name = "is_empty?") + public IRubyObject rubyIsEmpty(ThreadContext context) { + return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty()); + } - @JRubyMethod(name = "write_client") - public IRubyObject rubyWriteClient(final ThreadContext context) { + @Override + protected IRubyObject getWriteClient(final ThreadContext context) { return JrubyAckedWriteClientExt.create(queue, isClosed); } - @JRubyMethod(name = "read_client") - public IRubyObject rubyReadClient(final ThreadContext context) { + @Override + protected IRubyObject getReadClient() { return JrubyAckedReadClientExt.create(queue); } - @JRubyMethod(name = "is_empty?") - public IRubyObject rubyIsEmpty(ThreadContext context) { - return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty()); + @Override + protected IRubyObject doClose(final ThreadContext context) { + try { + close(); + } catch (IOException e) { + throw RubyUtil.newRubyIOError(context.runtime, e); + } + return context.nil; } private void checkIfClosed(String action) { |