fix(bluebubbles): debounce by messageId to preserve attachments in text+image messages (#4984)

* fix(bluebubbles): debounce by messageId to preserve attachments in text+image messages

BlueBubbles fires multiple webhook events for a single message - first
without attachment metadata, then ~350ms later with it. Previously,
messages with attachments bypassed debouncing and were processed
immediately, while the text-only version was also queued.

Now the debouncer uses messageId as the key when available, coalescing
all webhook events for the same message. The existing combineDebounceEntries
function merges attachments from all events.

Closes #4848

* fix: increase debounce and handle balloon messages

- Increase DEFAULT_INBOUND_DEBOUNCE_MS from 350ms to 500ms
- Update buildKey to use associatedMessageGuid for balloon messages
- Add regression test for debouncing behavior

Fixes issues identified in code review.

---------

Co-authored-by: Yurii Chukhlib <yurii.chukhlib@viber.com>
Co-authored-by: Tyler Yust <TYTYYUST@YAHOO.COM>
This commit is contained in:
Yuri Chukhlib
2026-01-31 00:53:14 +01:00
committed by GitHub
parent 57248a7ca1
commit 65dedef65b
2 changed files with 150 additions and 5 deletions

View File

@@ -1150,6 +1150,136 @@ describe("BlueBubbles webhook monitor", () => {
});
});
describe("inbound debouncing", () => {
it("coalesces text-only then attachment webhook events by messageId", async () => {
vi.useFakeTimers();
try {
const account = createMockAccount({ dmPolicy: "open" });
const config: OpenClawConfig = {};
const core = createMockRuntime();
// Use a timing-aware debouncer test double that respects debounceMs/buildKey/shouldDebounce.
core.channel.debounce.createInboundDebouncer = vi.fn((params: any) => {
type Item = any;
const buckets = new Map<string, { items: Item[]; timer: ReturnType<typeof setTimeout> | null }>();
const flush = async (key: string) => {
const bucket = buckets.get(key);
if (!bucket) return;
if (bucket.timer) {
clearTimeout(bucket.timer);
bucket.timer = null;
}
const items = bucket.items;
bucket.items = [];
if (items.length > 0) {
try {
await params.onFlush(items);
} catch (err) {
params.onError?.(err);
throw err;
}
}
};
return {
enqueue: async (item: Item) => {
if (params.shouldDebounce && !params.shouldDebounce(item)) {
await params.onFlush([item]);
return;
}
const key = params.buildKey(item);
const existing = buckets.get(key);
const bucket = existing ?? { items: [], timer: null };
bucket.items.push(item);
if (bucket.timer) clearTimeout(bucket.timer);
bucket.timer = setTimeout(async () => {
await flush(key);
}, params.debounceMs);
buckets.set(key, bucket);
},
flushKey: vi.fn(async (key: string) => {
await flush(key);
}),
};
}) as unknown as PluginRuntime["channel"]["debounce"]["createInboundDebouncer"];
setBlueBubblesRuntime(core);
unregister = registerBlueBubblesWebhookTarget({
account,
config,
runtime: { log: vi.fn(), error: vi.fn() },
core,
path: "/bluebubbles-webhook",
});
const messageId = "race-msg-1";
const chatGuid = "iMessage;-;+15551234567";
const payloadA = {
type: "new-message",
data: {
text: "hello",
handle: { address: "+15551234567" },
isGroup: false,
isFromMe: false,
guid: messageId,
chatGuid,
date: Date.now(),
},
};
const payloadB = {
type: "new-message",
data: {
text: "hello",
handle: { address: "+15551234567" },
isGroup: false,
isFromMe: false,
guid: messageId,
chatGuid,
attachments: [
{
guid: "att-1",
mimeType: "image/jpeg",
totalBytes: 1024,
},
],
date: Date.now(),
},
};
await handleBlueBubblesWebhookRequest(
createMockRequest("POST", "/bluebubbles-webhook", payloadA),
createMockResponse(),
);
// Simulate the real-world delay where the attachment-bearing webhook arrives shortly after.
await vi.advanceTimersByTimeAsync(300);
await handleBlueBubblesWebhookRequest(
createMockRequest("POST", "/bluebubbles-webhook", payloadB),
createMockResponse(),
);
// Not flushed yet; still within the debounce window.
expect(mockDispatchReplyWithBufferedBlockDispatcher).not.toHaveBeenCalled();
// After the debounce window, the combined message should be processed exactly once.
await vi.advanceTimersByTimeAsync(600);
expect(mockDispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledTimes(1);
const callArgs = mockDispatchReplyWithBufferedBlockDispatcher.mock.calls[0][0];
expect(callArgs.ctx.MediaPaths).toEqual(["/tmp/test-media.jpg"]);
expect(callArgs.ctx.Body).toContain("hello");
} finally {
vi.useRealTimers();
}
});
});
describe("reply metadata", () => {
it("surfaces reply fields in ctx when provided", async () => {
const account = createMockAccount({ dmPolicy: "open" });

View File

@@ -264,7 +264,7 @@ type BlueBubblesDebounceEntry = {
* This helps combine URL text + link preview balloon messages that BlueBubbles
* sends as separate webhook events when no explicit inbound debounce config exists.
*/
const DEFAULT_INBOUND_DEBOUNCE_MS = 350;
const DEFAULT_INBOUND_DEBOUNCE_MS = 500;
/**
* Combines multiple debounced messages into a single message for processing.
@@ -363,7 +363,23 @@ function getOrCreateDebouncer(target: WebhookTarget) {
debounceMs: resolveBlueBubblesDebounceMs(config, core),
buildKey: (entry) => {
const msg = entry.message;
// Build key from account + chat + sender to coalesce messages from same source
// Prefer stable, shared identifiers to coalesce rapid-fire webhook events for the
// same message (e.g., text-only then text+attachment).
//
// For balloons (URL previews, stickers, etc), BlueBubbles often uses a different
// messageId than the originating text. When present, key by associatedMessageGuid
// to keep text + balloon coalescing working.
const balloonBundleId = msg.balloonBundleId?.trim();
const associatedMessageGuid = msg.associatedMessageGuid?.trim();
if (balloonBundleId && associatedMessageGuid) {
return `bluebubbles:${account.accountId}:balloon:${associatedMessageGuid}`;
}
const messageId = msg.messageId?.trim();
if (messageId) {
return `bluebubbles:${account.accountId}:msg:${messageId}`;
}
const chatKey =
msg.chatGuid?.trim() ??
msg.chatIdentifier?.trim() ??
@@ -372,13 +388,12 @@ function getOrCreateDebouncer(target: WebhookTarget) {
},
shouldDebounce: (entry) => {
const msg = entry.message;
// Skip debouncing for messages with attachments - process immediately
if (msg.attachments && msg.attachments.length > 0) return false;
// Skip debouncing for from-me messages (they're just cached, not processed)
if (msg.fromMe) return false;
// Skip debouncing for control commands - process immediately
if (core.channel.text.hasControlCommand(msg.text, config)) return false;
// Debounce normal text messages and URL balloon messages
// Debounce all other messages to coalesce rapid-fire webhook events
// (e.g., text+image arriving as separate webhooks for the same messageId)
return true;
},
onFlush: async (entries) => {