aboutsummaryrefslogtreecommitdiff
path: root/logstash-core/src/main/java/org/logstash/ackedqueue
diff options
context:
space:
mode:
Diffstat (limited to 'logstash-core/src/main/java/org/logstash/ackedqueue')
-rw-r--r--logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java74
-rw-r--r--logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java43
2 files changed, 95 insertions, 22 deletions
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
new file mode 100644
index 00000000..e9517c45
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
@@ -0,0 +1,74 @@
+package org.logstash.ackedqueue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.jruby.Ruby;
+import org.jruby.RubyBasicObject;
+import org.jruby.RubyClass;
+import org.jruby.anno.JRubyClass;
+import org.jruby.anno.JRubyMethod;
+import org.jruby.runtime.ThreadContext;
+import org.jruby.runtime.builtin.IRubyObject;
+import org.logstash.RubyUtil;
+import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
+import org.logstash.execution.AbstractWrappedQueueExt;
+import org.logstash.ext.JrubyWrappedSynchronousQueueExt;
+
+@JRubyClass(name = "QueueFactory")
+public final class QueueFactoryExt extends RubyBasicObject {
+
+ public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
+ super(runtime, metaClass);
+ }
+
+ @JRubyMethod(meta = true)
+ public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv,
+ final IRubyObject settings) throws IOException {
+ final String type = getSetting(context, settings, "queue.type").asJavaString();
+ if ("persisted".equals(type)) {
+ final Path queuePath = Paths.get(
+ getSetting(context, settings, "path.queue").asJavaString(),
+ getSetting(context, settings, "pipeline.id").asJavaString()
+ );
+ Files.createDirectories(queuePath);
+ return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS)
+ .initialize(
+ context, new IRubyObject[]{
+ context.runtime.newString(queuePath.toString()),
+ getSetting(context, settings, "queue.page_capacity"),
+ getSetting(context, settings, "queue.max_events"),
+ getSetting(context, settings, "queue.checkpoint.writes"),
+ getSetting(context, settings, "queue.checkpoint.acks"),
+ getSetting(context, settings, "queue.checkpoint.interval"),
+ getSetting(context, settings, "queue.max_bytes")
+ }
+ );
+ } else if ("memory".equals(type)) {
+ return new JrubyWrappedSynchronousQueueExt(
+ context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
+ ).initialize(
+ context, context.runtime.newFixnum(
+ getSetting(context, settings, "pipeline.batch.size")
+ .convertToInteger().getIntValue()
+ * getSetting(context, settings, "pipeline.workers")
+ .convertToInteger().getIntValue()
+ )
+ );
+ } else {
+ throw context.runtime.newRaiseException(
+ RubyUtil.CONFIGURATION_ERROR_CLASS,
+ String.format(
+ "Invalid setting `%s` for `queue.type`, supported types are: 'memory' or 'persisted'",
+ type
+ )
+ );
+ }
+ }
+
+ private static IRubyObject getSetting(final ThreadContext context, final IRubyObject settings,
+ final String name) {
+ return settings.callMethod(context, "get_value", context.runtime.newString(name));
+ }
+}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
index 55c3a04c..f506da4d 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
@@ -6,25 +6,25 @@ import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
-import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Arity;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
+import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.ext.JrubyAckedReadClientExt;
import org.logstash.ext.JrubyAckedWriteClientExt;
import org.logstash.ext.JrubyEventExtLibrary;
@JRubyClass(name = "WrappedAckedQueue")
-public final class JRubyWrappedAckedQueueExt extends RubyObject {
+public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt {
private JRubyAckedQueueExt queue;
private final AtomicBoolean isClosed = new AtomicBoolean();
- @JRubyMethod(name = "initialize", optional = 7)
- public IRubyObject ruby_initialize(ThreadContext context, IRubyObject[] args) throws IOException {
+ @JRubyMethod(optional = 7)
+ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException {
args = Arity.scanArgs(context.runtime, args, 7, 0);
int capacity = RubyFixnum.num2int(args[1]);
int maxEvents = RubyFixnum.num2int(args[2]);
@@ -36,7 +36,7 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject {
checkpointMaxWrites, checkpointMaxAcks, queueMaxBytes);
this.queue.open();
- return context.nil;
+ return this;
}
public JRubyWrappedAckedQueueExt(final Ruby runtime, final RubyClass metaClass) {
@@ -53,16 +53,6 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject {
isClosed.set(true);
}
- @JRubyMethod(name = "close")
- public IRubyObject rubyClose(ThreadContext context) {
- try {
- close();
- } catch (IOException e) {
- throw RubyUtil.newRubyIOError(context.runtime, e);
- }
- return context.nil;
- }
-
@JRubyMethod(name = {"push", "<<"})
public void rubyPush(ThreadContext context, IRubyObject event) {
checkIfClosed("write");
@@ -75,20 +65,29 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject {
return queue.ruby_read_batch(context, size, wait);
}
+ @JRubyMethod(name = "is_empty?")
+ public IRubyObject rubyIsEmpty(ThreadContext context) {
+ return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty());
+ }
- @JRubyMethod(name = "write_client")
- public IRubyObject rubyWriteClient(final ThreadContext context) {
+ @Override
+ protected IRubyObject getWriteClient(final ThreadContext context) {
return JrubyAckedWriteClientExt.create(queue, isClosed);
}
- @JRubyMethod(name = "read_client")
- public IRubyObject rubyReadClient(final ThreadContext context) {
+ @Override
+ protected IRubyObject getReadClient() {
return JrubyAckedReadClientExt.create(queue);
}
- @JRubyMethod(name = "is_empty?")
- public IRubyObject rubyIsEmpty(ThreadContext context) {
- return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty());
+ @Override
+ protected IRubyObject doClose(final ThreadContext context) {
+ try {
+ close();
+ } catch (IOException e) {
+ throw RubyUtil.newRubyIOError(context.runtime, e);
+ }
+ return context.nil;
}
private void checkIfClosed(String action) {