Skip to content

Commit fdedc30

Browse files
authored
fix: Add sync_mode for Sidekiq/Resque compatibility (#112)
* feat: Add sync_mode for Sidekiq/Resque compatibility Send events synchronously on the calling thread instead of queuing them for a background worker. Follows the same pattern as the Python SDK: in sync_mode, events bypass the queue entirely and are sent inline via Transport#send. Retries are capped at 3 in sync mode (vs 10 in async) to avoid blocking the calling thread for extended periods during API outages. test_mode takes precedence over sync_mode to prevent accidental network calls in test environments. Usage: PostHog::Client.new(api_key: 'key', sync_mode: true) Fixes #10 * Bump version to 3.6.0 * Guard sync_mode transport with mutex for thread safety * Coordinate flush and shutdown with sync_lock
1 parent 3a3bbea commit fdedc30

6 files changed

Lines changed: 128 additions & 1 deletion

File tree

lib/posthog/client.rb

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
require 'posthog/utils'
99
require 'posthog/send_worker'
1010
require 'posthog/noop_worker'
11+
require 'posthog/message_batch'
12+
require 'posthog/transport'
1113
require 'posthog/feature_flags'
1214
require 'posthog/send_feature_flags_options'
1315
require 'posthog/exception_capture'
@@ -53,6 +55,9 @@ def _decrement_instance_count(api_key)
5355
# remain queued. Defaults to 10_000.
5456
# @option opts [Bool] :test_mode +true+ if messages should remain
5557
# queued for testing. Defaults to +false+.
58+
# @option opts [Bool] :sync_mode +true+ to send events synchronously
59+
# on the calling thread. Useful in forking environments like Sidekiq
60+
# and Resque. Defaults to +false+.
5661
# @option opts [Proc] :on_error Handles error calls from the API.
5762
# @option opts [String] :host Fully qualified hostname of the PostHog server. Defaults to `https://app.posthog.com`
5863
# @option opts [Integer] :feature_flags_polling_interval How often to poll for feature flag definition changes.
@@ -72,11 +77,23 @@ def initialize(opts = {})
7277
@api_key = opts[:api_key]
7378
@max_queue_size = opts[:max_queue_size] || Defaults::Queue::MAX_SIZE
7479
@worker_mutex = Mutex.new
80+
@sync_mode = opts[:sync_mode] == true && !opts[:test_mode]
81+
@on_error = opts[:on_error] || proc { |status, error| }
7582
@worker = if opts[:test_mode]
7683
NoopWorker.new(@queue)
84+
elsif @sync_mode
85+
nil
7786
else
7887
SendWorker.new(@queue, @api_key, opts)
7988
end
89+
if @sync_mode
90+
@transport = Transport.new(
91+
api_host: opts[:host],
92+
skip_ssl_verification: opts[:skip_ssl_verification],
93+
retries: 3
94+
)
95+
@sync_lock = Mutex.new
96+
end
8097
@worker_thread = nil
8198
@feature_flags_poller = nil
8299
@personal_api_key = opts[:personal_api_key]
@@ -118,6 +135,12 @@ def initialize(opts = {})
118135
# Use only for scripts which are not long-running, and will specifically
119136
# exit
120137
def flush
138+
if @sync_mode
139+
# Wait for any in-flight sync send to complete
140+
@sync_lock.synchronize {} # rubocop:disable Lint/EmptyBlock
141+
return
142+
end
143+
121144
while !@queue.empty? || @worker.is_requesting?
122145
ensure_worker_running
123146
sleep(0.1)
@@ -491,6 +514,11 @@ def shutdown
491514
self.class._decrement_instance_count(@api_key) if @api_key
492515
@feature_flags_poller.shutdown_poller
493516
flush
517+
if @sync_mode
518+
@sync_lock.synchronize { @transport&.shutdown }
519+
else
520+
@worker&.shutdown
521+
end
494522
end
495523

496524
private
@@ -528,6 +556,11 @@ def enqueue(action)
528556
# add our request id for tracing purposes
529557
action[:messageId] ||= uid
530558

559+
if @sync_mode
560+
send_sync(action)
561+
return true
562+
end
563+
531564
if @queue.length < @max_queue_size
532565
@queue << action
533566
ensure_worker_running
@@ -558,6 +591,22 @@ def ensure_worker_running
558591
end
559592
end
560593

594+
def send_sync(action)
595+
batch = MessageBatch.new(1)
596+
begin
597+
batch << action
598+
rescue MessageBatch::JSONGenerationError => e
599+
@on_error.call(-1, e.to_s)
600+
return
601+
end
602+
return if batch.empty?
603+
604+
@sync_lock.synchronize do
605+
res = @transport.send(@api_key, batch)
606+
@on_error.call(res.status, res.error) unless res.status == 200
607+
end
608+
end
609+
561610
def worker_running?
562611
@worker_thread&.alive?
563612
end

lib/posthog/noop_worker.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,9 @@ def run
1515
def is_requesting? # rubocop:disable Naming/PredicateName
1616
false
1717
end
18+
19+
def shutdown
20+
# Does nothing
21+
end
1822
end
1923
end

lib/posthog/send_worker.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ def run
5454
@transport.shutdown
5555
end
5656

57+
def shutdown
58+
@transport.shutdown
59+
end
60+
5761
# public: Check whether we have outstanding requests.
5862
#
5963
# TODO: Rename to `requesting?` in future version

lib/posthog/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# frozen_string_literal: true
22

33
module PostHog
4-
VERSION = '3.5.5'
4+
VERSION = '3.6.0'
55
end

posthog-rails/lib/posthog/rails/railtie.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ def test_mode=(value)
160160
@base_options[:test_mode] = value
161161
end
162162

163+
def sync_mode=(value)
164+
@base_options[:sync_mode] = value
165+
end
166+
163167
def on_error=(value)
164168
@base_options[:on_error] = value
165169
end

spec/posthog/client_spec.rb

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,72 @@ module PostHog
9393
end
9494
end
9595

96+
describe 'sync_mode' do
97+
around do |example|
98+
PostHog::Transport.stub = true
99+
example.call
100+
PostHog::Transport.stub = false
101+
end
102+
103+
it 'sends events inline without using a queue or worker' do
104+
sync_client = Client.new(api_key: API_KEY, sync_mode: true)
105+
sync_client.capture(Queued::CAPTURE)
106+
107+
worker = sync_client.instance_variable_get(:@worker)
108+
worker_thread = sync_client.instance_variable_get(:@worker_thread)
109+
expect(worker).to be_nil
110+
expect(worker_thread).to be_nil
111+
expect(sync_client.queued_messages).to eq(0)
112+
end
113+
114+
it 'calls on_error when the request fails' do
115+
error_status = nil
116+
on_error = proc { |status, _error| error_status = status }
117+
118+
allow_any_instance_of(PostHog::Transport).to(
119+
receive(:send).and_return(PostHog::Response.new(400, 'Bad request'))
120+
)
121+
122+
sync_client = Client.new(api_key: API_KEY, sync_mode: true, on_error: on_error)
123+
sync_client.capture(Queued::CAPTURE)
124+
125+
expect(error_status).to eq(400)
126+
end
127+
128+
it 'flush does not attempt to run a worker' do
129+
sync_client = Client.new(api_key: API_KEY, sync_mode: true)
130+
expect(sync_client).not_to receive(:ensure_worker_running)
131+
sync_client.flush
132+
end
133+
134+
it 'calls on_error with status -1 when message serialization fails' do
135+
error_status = nil
136+
error_message = nil
137+
on_error = proc { |status, error|
138+
error_status = status
139+
error_message = error
140+
}
141+
142+
allow_any_instance_of(PostHog::MessageBatch).to(
143+
receive(:<<).and_raise(PostHog::MessageBatch::JSONGenerationError, 'Serialization error')
144+
)
145+
146+
sync_client = Client.new(api_key: API_KEY, sync_mode: true, on_error: on_error)
147+
sync_client.capture(Queued::CAPTURE)
148+
149+
expect(error_status).to eq(-1)
150+
expect(error_message).to include('Serialization error')
151+
end
152+
153+
it 'prefers test_mode over sync_mode' do
154+
both_client = Client.new(api_key: API_KEY, test_mode: true, sync_mode: true)
155+
worker = both_client.instance_variable_get(:@worker)
156+
sync_mode = both_client.instance_variable_get(:@sync_mode)
157+
expect(worker).to be_a(PostHog::NoopWorker)
158+
expect(sync_mode).to eq(false)
159+
end
160+
end
161+
96162
describe '#capture' do
97163
it 'errors without an event' do
98164
expect { client.capture(distinct_id: 'user') }.to raise_error(

0 commit comments

Comments
 (0)