- Sponsor
-
Notifications
You must be signed in to change notification settings - Fork 699
Fix redis worker debounce #1966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… bug)
|
Caution Review failedThe pull request is closed. WalkthroughThe changes introduce a deduplication key mechanism to the Redis-based queue system. Queue items now include an optional Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Queue
participant Redis
participant Worker
participant JobHandler
Client->>Queue: enqueue(job)
Queue->>Redis: Store job with deduplicationKey
Worker->>Queue: dequeue()
Queue->>Redis: Fetch job with deduplicationKey
Queue->>Worker: Return job, deduplicationKey
Worker->>JobHandler: process(job, deduplicationKey)
JobHandler-->>Worker: Complete
Worker->>Queue: ack(jobId, deduplicationKey)
Queue->>Redis: ackItem(jobId, deduplicationKey)
Redis-->>Queue: Validate & remove if deduplicationKey matches
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (2)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🔭 Outside diff range comments (1)
packages/redis-worker/src/worker.ts (1)
349-377
: 🛠️ Refactor suggestion
ack()
wrapper now bypasses the deduplication checkInside
processItem()
you correctly callthis.queue.ack(id, deduplicationKey)
, but the publicWorker.ack()
helper (lines 281‑287, unchanged) still invokesthis.queue.ack(id)
without the key. Any external user ofworker.ack()
(or future internal use) will wipe out a newer job with the same ID.Either:
- Add an overload that requires the key, or
- Deprecate/remove the wrapper and expose
queue.ack
directly, or- Forward an optional parameter:
- ack(id: string) { + ack(id: string, deduplicationKey?: string) { … - this.queue.ack(id); + this.queue.ack(id, deduplicationKey ?? ""); }
🧹 Nitpick comments (1)
packages/redis-worker/src/worker.test.ts (1)
428-437
: Stale comment may confuse future readersThe inline note “These assertions should fail – demonstrating the bug” is no longer accurate after the deduplication fix; the assertions are expected to pass. Please update or remove the comment to reflect the current intent, otherwise newcomers may treat a CI‑green run as suspicious.
- // These assertions should fail - demonstrating the bug - // The second job should run … + // These assertions confirm that the second job is processed + // after the first one completes, validating the debounce fix.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
packages/redis-worker/src/queue.ts
(8 hunks)packages/redis-worker/src/worker.test.ts
(1 hunks)packages/redis-worker/src/worker.ts
(4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
packages/redis-worker/src/worker.ts (1)
packages/redis-worker/src/queue.ts (1)
AnyQueueItem
(33-41)
packages/redis-worker/src/worker.test.ts (1)
internal-packages/testcontainers/src/index.ts (1)
redisTest
(128-128)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
this.redis.defineCommand("ackItem", { | ||
numberOfKeys: 2, | ||
lua: ` | ||
local queue = KEYS[1] | ||
local items = KEYS[2] | ||
local queueKey = KEYS[1] | ||
local itemsKey = KEYS[2] | ||
local id = ARGV[1] | ||
local deduplicationKey = ARGV[2] | ||
|
||
redis.call('ZREM', queue, id) | ||
redis.call('HDEL', items, id) | ||
-- Get the item from the hash | ||
local item = redis.call('HGET', itemsKey, id) | ||
if not item then | ||
return -1 | ||
end | ||
|
||
-- Only check deduplicationKey if a non-empty one was passed in | ||
if deduplicationKey and deduplicationKey ~= "" then | ||
local success, parsed = pcall(cjson.decode, item) | ||
if success then | ||
if parsed.deduplicationKey and parsed.deduplicationKey ~= deduplicationKey then | ||
return 0 | ||
end | ||
end | ||
end | ||
|
||
-- Remove from sorted set and hash | ||
redis.call('ZREM', queueKey, id) | ||
redis.call('HDEL', itemsKey, id) | ||
return 1 | ||
`, | ||
`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Lua script: avoid false‑positives when no deduplicationKey is stored
If the stored item predates this feature it will have no deduplicationKey
.
When an old item is replaced by a new one with a key while another worker still holds the old key (""
), the current logic bypasses the check (deduplicationKey
argument is empty) and deletes the new item – re‑introducing the race.
Guard by returning 0
whenever the stored item has a key but none is supplied:
- if deduplicationKey and deduplicationKey ~= "" then
+ if deduplicationKey == nil or deduplicationKey == "" then
+ local parsed = cjson.decode(item)
+ if parsed.deduplicationKey then
+ return 0
+ end
+ else
-- existing path
This keeps backward‑compatibility (old items without a key are still ACKed) while protecting new ones.
Committable suggestion skipped: line range outside the PR's diff.
packages/redis-worker/src/queue.ts
Outdated
async ack(id: string, deduplicationKey?: string): Promise<void> { | ||
try { | ||
await this.redis.ackItem(`queue`, `items`, id); | ||
const result = await this.redis.ackItem(`queue`, `items`, id, deduplicationKey ?? ""); | ||
if (result === 0) { | ||
this.logger.error(`SimpleQueue ${this.name}.ack(): ack operation returned 0`, { | ||
queue: this.name, | ||
id, | ||
deduplicationKey, | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Escalate ACK mismatches instead of silently logging them
ack()
only logs when the Lua script returns 0
(deduplication‑key mismatch) or -1
(item missing) but still resolves void
.
Call‑sites (e.g. Worker.processItem
) will therefore treat the job as successfully acknowledged even though it remains in Redis, leading to:
- the job becoming “stuck” until TTL‑cleanup or a manual redrive
- misleading success metrics / traces
Consider propagating the return value (or throwing) so that the caller can retry, reschedule or surface an error.
- const result = await this.redis.ackItem(...);
- if (result === 0) {
- this.logger.error(/* … */);
- }
+ const result = await this.redis.ackItem(...);
+ if (result <= 0) {
+ throw new Error(
+ `ACK failed for id=${id} (result=${result}, dedupKey=${deduplicationKey ?? "∅"})`
+ );
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
async ack(id: string, deduplicationKey?: string): Promise<void> { | |
try { | |
await this.redis.ackItem(`queue`, `items`, id); | |
const result = await this.redis.ackItem(`queue`, `items`, id, deduplicationKey ?? ""); | |
if (result === 0) { | |
this.logger.error(`SimpleQueue ${this.name}.ack(): ack operation returned 0`, { | |
queue: this.name, | |
id, | |
deduplicationKey, | |
}); | |
} | |
async ack(id: string, deduplicationKey?: string): Promise<void> { | |
try { | |
const result = await this.redis.ackItem(`queue`, `items`, id, deduplicationKey ?? ""); | |
if (result <= 0) { | |
throw new Error( | |
`ACK failed for id=${id} (result=${result}, dedupKey=${deduplicationKey ?? "∅"})` | |
); | |
} | |
// …rest of the logic… |
When using the "debounce" pattern with our (internal) Redis worker it was possible that we acked a job by mistake.
Example sequence:
id: "job-a"
id: "job-a"
but withavailableAt
in the future.availableAt
ack()
the job id, before the new one starts.This means that second one will never execute. This is a race condition.
This fix means that we will only
ack
an item if the item in the queue has the samededuplicationKey
, i.e. is the exact same item. So if you've added another item with the same id we won't ack it.Summary by CodeRabbit