aboutsummaryrefslogtreecommitdiff
path: root/logstash-core/spec/logstash/queue_factory_spec.rb
blob: 8b0193f1de4f133aaac699c46e5752e1715de50c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# encoding: utf-8
require "fileutils"
require "logstash/settings"
require "stud/temporary"

# Specs for LogStash::QueueFactory.create: checks which queue class is returned
# for each `queue.type` value and that a persisted queue gets its own
# per-pipeline directory under `path.queue`.
describe LogStash::QueueFactory do
  let(:pipeline_id) { "my_pipeline" }

  # The settings registered for every example. `path.queue` defaults to a
  # fresh temporary pathname, so each example works in its own location.
  let(:settings_array) do
    [
      LogStash::Setting::WritableDirectory.new("path.queue", Stud::Temporary.pathname),
      LogStash::Setting::String.new("queue.type", "memory", true, ["persisted", "memory"]),
      LogStash::Setting::Bytes.new("queue.page_capacity", "64mb"),
      LogStash::Setting::Bytes.new("queue.max_bytes", "1024mb"),
      LogStash::Setting::Numeric.new("queue.max_events", 0),
      LogStash::Setting::Numeric.new("queue.checkpoint.acks", 1024),
      LogStash::Setting::Numeric.new("queue.checkpoint.writes", 1024),
      LogStash::Setting::Numeric.new("queue.checkpoint.interval", 1000),
      LogStash::Setting::String.new("pipeline.id", pipeline_id),
      LogStash::Setting::PositiveInteger.new("pipeline.batch.size", 125),
      LogStash::Setting::PositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
    ]
  end

  # A LogStash::Settings instance with every entry of `settings_array` registered.
  let(:settings) do
    LogStash::Settings.new.tap do |s|
      settings_array.each { |setting| s.register(setting) }
    end
  end

  subject { described_class }

  # Creates a queue from `settings`, yields it to the block, and always closes
  # it afterwards -- even when an expectation inside the block fails -- so a
  # failing example does not leave an open queue behind.
  def with_queue
    queue = subject.create(settings)
    yield queue
  ensure
    # `queue` is nil when `create` itself raised; nothing to close then.
    queue.close if queue
  end

  context "when `queue.type` is `persisted`" do
    before do
      settings.set("queue.type", "persisted")
    end

    # Remove the whole temporary `path.queue` root after each example.
    # `rm_rf` is used instead of `rmdir` because `rmdir` only removes empty
    # directories, and a persisted queue may leave files in its directory.
    after do
      FileUtils.rm_rf(settings.get("path.queue"))
    end

    it "returns a `WrappedAckedQueue`" do
      with_queue do |queue|
        expect(queue).to be_kind_of(LogStash::WrappedAckedQueue)
      end
    end

    describe "per pipeline id subdirectory creation" do
      # Expected location of the queue: <path.queue>/<pipeline id>.
      let(:queue_path) { ::File.join(settings.get("path.queue"), pipeline_id) }

      it "creates a queue directory based on the pipeline id" do
        expect(Dir.exist?(queue_path)).to be_falsey
        with_queue do
          expect(Dir.exist?(queue_path)).to be_truthy
        end
      end
    end
  end

  context "when `queue.type` is `memory`" do
    before do
      settings.set("queue.type", "memory")
      settings.set("pipeline.batch.size", 1024)
    end

    it "returns a `WrappedSynchronousQueue`" do
      with_queue do |queue|
        expect(queue).to be_kind_of(LogStash::WrappedSynchronousQueue)
      end
    end
  end
end