Skip to content

perf(webapp): parallelize Phase 2 streaming batch-item ingest (TRI-10273)#3777

Open
matt-aitken wants to merge 2 commits into
mainfrom
feature/tri-10273-managed-runs-streaming-batchtriggerandwait-fails-with-408
Open

perf(webapp): parallelize Phase 2 streaming batch-item ingest (TRI-10273)#3777
matt-aitken wants to merge 2 commits into
mainfrom
feature/tri-10273-managed-runs-streaming-batchtriggerandwait-fails-with-408

Conversation

@matt-aitken
Copy link
Copy Markdown
Member

Problem

Phase 2 of the v3 streaming batch API (POST /api/v3/batches/:batchId/items) processed streamed items strictly sequentially. For a batch of many large payloads — each offloaded to object storage inline — this serialized N object-store round-trips inside a single request, exceeding Node's default server.requestTimeout (300s). The webapp returned 408, which the SDK reads as 408 terminated and retries 5×, turning a slow ingest into a ~26-minute failure (BatchTriggerError: Failed to stream items ... 408 terminated).

Closes TRI-10273 — https://linear.app/triggerdotdev/issue/TRI-10273

Fix

Ingest now runs through p-map over the NDJSON async iterable with bounded concurrency (STREAMING_BATCH_INGEST_CONCURRENCY, default 10):

  • p-map pulls lazily from the stream — at most concurrency items are read/in-flight at once, so peak memory is bounded to roughly concurrency × STREAMING_BATCH_ITEM_MAXIMUM_SIZE and request-body backpressure is preserved.
  • Set the env to 1 for fully sequential ingestion (escape hatch).
  • The default lives only in env.server.ts; the service takes a required number.

Why this is safe (ordering/idempotency unchanged)

  • Ordering derives from each item's index (enqueue timestamp = batch.createdAt + index), not enqueue order.
  • Dedup is atomic per index in enqueueBatchItem.
  • The NDJSON parser now stamps oversized-item markers with their emit position, removing the consumer's sequential lastIndex assumption (the only order-dependent bit).
  • The count-check + conditional seal path is untouched.

Tests

  • 100-item batch ingested concurrently → all enqueued + sealed, correct counts
  • in-flight processing never exceeds the configured concurrency (real instrumented payload processor)
  • concurrent dedup on Phase 2 retry (pre-enqueued half re-streamed)
  • emit-position marker indexing (parser unit test)
  • Full existing sealing/idempotency suite still green — 42/42 pass; webapp typecheck clean.

Follow-ups (not in this PR)

  • SDK pre-offload of large item payloads (send application/store refs instead of raw blobs) to remove object-store work from the request hot path and shrink the request body — bigger, protocol-level change.
  • Optional server.requestTimeout bump as a safety net.

🤖 Generated with Claude Code

@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented May 29, 2026

⚠️ No Changeset found

Latest commit: de3489f

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 29, 2026

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: f9f08dce-d8f9-403e-b50b-19568382c1e7

📥 Commits

Reviewing files that changed from the base of the PR and between 4ca9807 and de3489f.

📒 Files selected for processing (6)
  • .server-changes/parallel-batch-item-ingest.md
  • apps/webapp/app/env.server.ts
  • apps/webapp/app/routes/api.v3.batches.$batchId.items.ts
  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
  • apps/webapp/test/engine/streamBatchItems.test.ts
  • docs/self-hosting/env/webapp.mdx
💤 Files with no reviewable changes (1)
  • docs/self-hosting/env/webapp.mdx
✅ Files skipped from review due to trivial changes (1)
  • .server-changes/parallel-batch-item-ingest.md
🚧 Files skipped from review as they are similar to previous changes (3)
  • apps/webapp/app/routes/api.v3.batches.$batchId.items.ts
  • apps/webapp/app/env.server.ts
  • apps/webapp/test/engine/streamBatchItems.test.ts
📜 Recent review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: e2e-webapp / 🧪 E2E Tests: Webapp
  • GitHub Check: typecheck / typecheck
  • GitHub Check: 🛡️ E2E Auth Tests (full)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Build and publish previews
🧰 Additional context used
📓 Path-based instructions (7)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

Import from @trigger.dev/sdk when writing Trigger.dev tasks. Never use @trigger.dev/sdk/v3 or deprecated client.defineJob

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use zod for validation in packages/core and apps/webapp

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

**/*.{ts,tsx,js,jsx}: Prefer static imports over dynamic imports. Only use dynamic import() when circular dependencies cannot be resolved, code splitting is needed for performance, or the module must be loaded conditionally at runtime
Import subpaths only from packages/core (@trigger.dev/core), never import from the root

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)

**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: Access environment variables through the env export of env.server.ts instead of directly accessing process.env
Use subpath exports from @trigger.dev/core package instead of importing from the root @trigger.dev/core path

Use named constants for sentinel/placeholder values (e.g. const UNSET_VALUE = '__unset__') instead of raw string literals scattered across comparisons

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
apps/webapp/**/*.server.ts

📄 CodeRabbit inference engine (apps/webapp/CLAUDE.md)

apps/webapp/**/*.server.ts: Never use request.signal for detecting client disconnects. Use getRequestAbortSignal() from app/services/httpAsyncStorage.server.ts instead, which is wired directly to Express res.on('close') and fires reliably
Access environment variables via env export from app/env.server.ts. Never use process.env directly
Always use findFirst instead of findUnique in Prisma queries. findUnique has an implicit DataLoader that batches concurrent calls and has active bugs even in Prisma 6.x (uppercase UUIDs returning null, composite key SQL correctness issues, 5-10x worse performance). findFirst is never batched and avoids this entire class of issues

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
**/*.{js,ts,tsx,jsx,css,json,md}

📄 CodeRabbit inference engine (AGENTS.md)

Use Prettier for code formatting and run pnpm run format before committing

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
🧠 Learnings (39)
📓 Common learnings
Learnt from: d-cs
Repo: triggerdotdev/trigger.dev PR: 3754
File: apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts:30-32
Timestamp: 2026-06-01T12:05:44.112Z
Learning: In the triggerdotdev/trigger.dev codebase, the mollifier stale-entry sweep (`initMollifierStaleSweepWorker` in `apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts`) intentionally runs per-webapp instance without a distributed lease in its initial implementation. All Redis ops (cursor, counts hash, reconcile) are individually atomic and produce correct shared state even with multiple concurrent sweepers. The known limitation is that OpenTelemetry metric output (`recordStaleEntry`, `reportStaleEntrySnapshot`) multiplies by N webapp instances, mis-calibrating alert thresholds by a factor of N. A SETNX-based per-tick lease (SET NX PX on the sweep's existing Redis) is the planned follow-up fix. Until then, alert thresholds should be scaled accordingly. Do not re-raise this as a blocking correctness bug — it is a documented metric-scaling limitation with a tracked follow-up.
Learnt from: d-cs
Repo: triggerdotdev/trigger.dev PR: 3756
File: apps/webapp/app/v3/services/resetIdempotencyKey.server.ts:65-94
Timestamp: 2026-06-01T15:01:35.175Z
Learning: In `apps/webapp/app/v3/services/resetIdempotencyKey.server.ts` (triggerdotdev/trigger.dev), a transient `buffer.resetIdempotency()` failure when `pgCount > 0` does NOT warrant a 503 and should return success. The mollifier `ack` and `fail` Lua scripts always DEL the idempotency lookup key as part of the run's natural lifecycle (drain→ack or terminal→fail or cancel-bifurcation), so stale buffered idempotency lookups converge automatically without caller retries. Only when `pgCount === 0 && bufferResetFailed` is a 503 appropriate, because then the run's existence is genuinely unobservable (the buffer outage hides a potentially matching buffered run). The test "returns success when PG cleared >=1 run, even if the buffer reset throws" documents this contract explicitly.
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3644
File: packages/core/src/v3/apiClient/runStream.ts:385-426
Timestamp: 2026-05-17T08:07:36.624Z
Learning: In triggerdotdev/trigger.dev, do not flag missing per-element Zod validation for S2 v2 batch SSE records (e.g., in `packages/core/src/v3/apiClient/runStream.ts` and `apps/webapp/app/services/realtime/s2realtimeStreams.server.ts`). S2 batch records come from a trusted documented upstream wire protocol; a throw on a malformed field (e.g., `seq_num.toString()`) is intentional behavior to surface wire-protocol violations rather than silently skipping them. Only the container-level guard (`Array.isArray(data.records)`) is expected.
Learnt from: d-cs
Repo: triggerdotdev/trigger.dev PR: 3754
File: apps/webapp/app/env.server.ts:1104-1129
Timestamp: 2026-06-01T11:37:12.623Z
Learning: In triggerdotdev/trigger.dev (apps/webapp/app/env.server.ts), new background/periodic worker feature flags should hard-default to "0" (explicitly opt-in) rather than inheriting a parent feature flag (e.g., TRIGGER_MOLLIFIER_ENABLED). Inheriting a parent flag causes the new worker to auto-start on upgrade for any deployment that already has the parent flag enabled, turning on unexpected background load without an explicit rollout step. Each new worker component should require its own explicit opt-in via its own env var (e.g., TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED defaults to "0", not to process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0").
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3333
File: apps/webapp/app/services/runsReplicationService.server.ts:655-685
Timestamp: 2026-04-17T13:20:14.259Z
Learning: In `apps/webapp/app/services/runsReplicationService.server.ts`, the per-ClickHouse-group inserts inside `#flushBatch` are intentionally serialized (sequential) by default. Parallelizing group flushes causes Linux socket write-buffer pressure that required kernel tuning (`net.ipv4.tcp_wmem` set to `4096 20480 8388608`). Parallel flushing may be exposed as an opt-in via a `groupFlushStrategy` env var/option, but sequential must remain the safe default. Do not flag the sequential loop as a performance issue without acknowledging this constraint.
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3331
File: apps/webapp/app/runEngine/concerns/batchPayloads.server.ts:112-136
Timestamp: 2026-04-07T14:12:59.018Z
Learning: In `apps/webapp/app/runEngine/concerns/batchPayloads.server.ts`, the `pRetry` call wrapping `uploadPacketToObjectStore` intentionally retries **all** error types (no `shouldRetry` filter / `AbortError` guards). The maintainer explicitly prefers over-retrying to under-retrying because multiple heterogeneous object store backends are supported and it is impractical to enumerate all permanent error signatures. Do not flag this as an issue in future reviews.
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3368
File: apps/webapp/app/services/taskIdentifierRegistry.server.ts:24-67
Timestamp: 2026-04-13T21:44:00.032Z
Learning: In `apps/webapp/app/services/taskIdentifierRegistry.server.ts`, the sequential upsert/updateMany/findMany writes in `syncTaskIdentifiers` are intentionally NOT wrapped in a Prisma transaction. This function runs only during deployment-change events (low-concurrency path), and any partial `isInLatestDeployment` state is acceptable because it self-corrects on the next deployment. Do not flag this as a missing-transaction/atomicity issue in future reviews.
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3499
File: apps/webapp/app/routes/api.v1.sessions.ts:49-55
Timestamp: 2026-05-09T08:07:47.468Z
Learning: In triggerdotdev/trigger.dev, the `GET /api/v1/sessions` route (`apps/webapp/app/routes/api.v1.sessions.ts`) has a known deferred security concern: when multiple `filter[taskIdentifier]` values are requested under a per-task-scoped JWT (`read:tasks:<id>`), `anyResource` OR semantics grant access but the repository then lists sessions for ALL requested task IDs, leaking data beyond the JWT's permitted scope. The fix (either a multi-task-filter → require `read:sessions` collection-scope guard at the `apiBuilder` level, or intersecting the filter with JWT-permitted task IDs before the repository call) requires surfacing permitted-task-IDs from `RbacAbility`, and is tracked for a separate PR as part of the broader `anyResource` semantics work.
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3430
File: apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts:0-0
Timestamp: 2026-04-23T13:26:31.290Z
Learning: In `apps/webapp/app/utils/sse.ts` (and callers such as `apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts`), the string-reason type passed to `.abort()` is NOT load-bearing for heap retention in SSE streams on Node 20. Isolation testing (2000 requests × 200 KB payload) confirmed `.abort()` vs `.abort("string")` produces identical heap deltas once `AbortSignal.any` is removed. The actual root cause of the SSE memory leak was `AbortSignal.any`, which registers signals with a FinalizationRegistry that does not reliably release them on abort (see nodejs/node#54614 and `#55351`). Named sentinel constants (`ABORT_REASON_*`, exported from `sse.ts`) are a readability/style win per the CLAUDE.md named-constant guideline, not a correctness fix. Do not flag `.abort()` vs `.abort("reason string")` as a memory-retention issue in future reviews of this codebase.
Learnt from: myftija
Repo: triggerdotdev/trigger.dev PR: 3274
File: apps/webapp/app/services/runsReplicationService.server.ts:922-924
Timestamp: 2026-03-26T09:02:11.935Z
Learning: In `triggerdotdev/trigger.dev`, `TaskRun.annotations` are always written atomically in one operation that conforms exactly to the `RunAnnotations` schema (from `trigger.dev/core/v3`). Using `RunAnnotations.safeParse` in `#parseAnnotations` (e.g., in `apps/webapp/app/services/runsReplicationService.server.ts`) is intentional and correct — there is no risk of partial/legacy annotation payloads causing schema mismatches, so suggesting a relaxed passthrough schema for this parsing is unnecessary.
📚 Learning: 2026-05-17T08:07:25.757Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3644
File: apps/webapp/app/services/realtime/s2realtimeStreams.server.ts:261-293
Timestamp: 2026-05-17T08:07:25.757Z
Learning: In `apps/webapp/app/services/realtime/s2realtimeStreams.server.ts`, do NOT suggest adding per-element Zod validation inside `parseSSEBatchRecords()` or `#peekIsSettled()` for S2 batch record entries. S2 records arrive from a trusted upstream in a documented wire format; the existing `try/catch` around `JSON.parse` and the outer batch loop is the intended and sufficient level of defensiveness. Per-element Zod schema validation on every batch record is considered over-engineering for this server-to-server path.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-03T13:07:33.177Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: internal-packages/run-engine/src/batch-queue/tests/index.test.ts:711-713
Timestamp: 2026-03-03T13:07:33.177Z
Learning: In `internal-packages/run-engine/src/batch-queue/tests/index.test.ts`, test assertions for rate limiter stubs can use `toBeGreaterThanOrEqual` rather than exact equality (`toBe`) because the consumer loop may call the rate limiter during empty pops in addition to actual item processing, and this over-calling is acceptable in integration tests.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-05-17T08:07:36.624Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3644
File: packages/core/src/v3/apiClient/runStream.ts:385-426
Timestamp: 2026-05-17T08:07:36.624Z
Learning: In triggerdotdev/trigger.dev, do not flag missing per-element Zod validation for S2 v2 batch SSE records (e.g., in `packages/core/src/v3/apiClient/runStream.ts` and `apps/webapp/app/services/realtime/s2realtimeStreams.server.ts`). S2 batch records come from a trusted documented upstream wire protocol; a throw on a malformed field (e.g., `seq_num.toString()`) is intentional behavior to surface wire-protocol violations rather than silently skipping them. Only the container-level guard (`Array.isArray(data.records)`) is expected.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-07T14:12:59.018Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3331
File: apps/webapp/app/runEngine/concerns/batchPayloads.server.ts:112-136
Timestamp: 2026-04-07T14:12:59.018Z
Learning: In `apps/webapp/app/runEngine/concerns/batchPayloads.server.ts`, the `pRetry` call wrapping `uploadPacketToObjectStore` intentionally retries **all** error types (no `shouldRetry` filter / `AbortError` guards). The maintainer explicitly prefers over-retrying to under-retrying because multiple heterogeneous object store backends are supported and it is impractical to enumerate all permanent error signatures. Do not flag this as an issue in future reviews.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-13T21:44:00.032Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3368
File: apps/webapp/app/services/taskIdentifierRegistry.server.ts:24-67
Timestamp: 2026-04-13T21:44:00.032Z
Learning: In `apps/webapp/app/services/taskIdentifierRegistry.server.ts`, the sequential upsert/updateMany/findMany writes in `syncTaskIdentifiers` are intentionally NOT wrapped in a Prisma transaction. This function runs only during deployment-change events (low-concurrency path), and any partial `isInLatestDeployment` state is acceptable because it self-corrects on the next deployment. Do not flag this as a missing-transaction/atomicity issue in future reviews.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-23T13:26:31.290Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3430
File: apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts:0-0
Timestamp: 2026-04-23T13:26:31.290Z
Learning: In `apps/webapp/app/utils/sse.ts` (and callers such as `apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts`), the string-reason type passed to `.abort()` is NOT load-bearing for heap retention in SSE streams on Node 20. Isolation testing (2000 requests × 200 KB payload) confirmed `.abort()` vs `.abort("string")` produces identical heap deltas once `AbortSignal.any` is removed. The actual root cause of the SSE memory leak was `AbortSignal.any`, which registers signals with a FinalizationRegistry that does not reliably release them on abort (see nodejs/node#54614 and `#55351`). Named sentinel constants (`ABORT_REASON_*`, exported from `sse.ts`) are a readability/style win per the CLAUDE.md named-constant guideline, not a correctness fix. Do not flag `.abort()` vs `.abort("reason string")` as a memory-retention issue in future reviews of this codebase.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-07T14:12:18.946Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3331
File: apps/webapp/test/engine/batchPayloads.test.ts:5-24
Timestamp: 2026-04-07T14:12:18.946Z
Learning: In `apps/webapp/test/engine/batchPayloads.test.ts`, using `vi.mock` for `~/v3/objectStore.server` (stubbing `hasObjectStoreClient` and `uploadPacketToObjectStore`), `~/env.server` (overriding offload thresholds), and `~/v3/tracer.server` (stubbing `startActiveSpan`) is intentional and acceptable. Simulating controlled transient upload failures (e.g., fail N times then succeed) to verify `p-retry` behavior cannot be reproduced with real services or testcontainers. This file is an explicit exception to the repo's general no-mocks policy.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-17T13:20:14.259Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3333
File: apps/webapp/app/services/runsReplicationService.server.ts:655-685
Timestamp: 2026-04-17T13:20:14.259Z
Learning: In `apps/webapp/app/services/runsReplicationService.server.ts`, the per-ClickHouse-group inserts inside `#flushBatch` are intentionally serialized (sequential) by default. Parallelizing group flushes causes Linux socket write-buffer pressure that required kernel tuning (`net.ipv4.tcp_wmem` set to `4096 20480 8388608`). Parallel flushing may be exposed as an opt-in via a `groupFlushStrategy` env var/option, but sequential must remain the safe default. Do not flag the sequential loop as a performance issue without acknowledging this constraint.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-05-05T09:38:02.512Z
Learnt from: d-cs
Repo: triggerdotdev/trigger.dev PR: 3523
File: apps/webapp/app/routes/api.v3.batches.ts:178-181
Timestamp: 2026-05-05T09:38:02.512Z
Learning: When reviewing code that catches `ServiceValidationError` in `*.server.ts` files, do not blindly forward `error.status` to HTTP responses, because SVEs may be thrown with non-default statuses (e.g., 400/500) and forwarding them can cause client-visible behavioral regressions (e.g., surfacing 500s to clients). Prefer a safe default response status of `error.status ?? 422`, but only after confirming via the reachable call graph that the caught `ServiceValidationError` instances are expected to carry those non-default statuses; otherwise, normalize to `422` to avoid unexpected client-visible 5xx behavior.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-20T15:06:11.054Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3417
File: apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts:16-26
Timestamp: 2026-04-20T15:06:11.054Z
Learning: In `apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts` and `apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts`, the `MAX_APPEND_BODY_BYTES` cap of 512 KiB (1024 * 512) is intentional even though `appendPart` wraps the body in JSON (which could expand quote-heavy payloads beyond S2's 1 MiB per-record limit). The maintainer considers worst-case quote-heavy payloads pathological and not realistic. If S2 rejections occur in practice, an encoded-size guard will be added inside `appendPart` rather than lowering the raw body cap on every caller. Do not flag this as an issue in future reviews.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-05-12T06:43:12.346Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3558
File: internal-packages/run-engine/src/run-queue/index.ts:420-424
Timestamp: 2026-05-12T06:43:12.346Z
Learning: In the triggerdotdev/trigger.dev codebase (`internal-packages/run-engine/src/run-queue/index.ts`), the established convention in `RunQueue` read-path methods (e.g., `lengthOfQueue`, `lengthOfQueues`, `currentConcurrencyOfQueues`) is to **fail open** on transient Redis pipeline errors: pipeline result errors (`baseErr`, `ctrErr`, etc.) are coerced to `0` rather than surfaced or re-thrown. This is intentional — the project treats Redis command errors the same as missing keys for these counter reads. Do not flag this pattern as a bug or suggest throwing/propagating these errors in future reviews.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-06-01T15:01:35.175Z
Learnt from: d-cs
Repo: triggerdotdev/trigger.dev PR: 3756
File: apps/webapp/app/v3/services/resetIdempotencyKey.server.ts:65-94
Timestamp: 2026-06-01T15:01:35.175Z
Learning: In `apps/webapp/app/v3/services/resetIdempotencyKey.server.ts` (triggerdotdev/trigger.dev), a transient `buffer.resetIdempotency()` failure when `pgCount > 0` does NOT warrant a 503 and should return success. The mollifier `ack` and `fail` Lua scripts always DEL the idempotency lookup key as part of the run's natural lifecycle (drain→ack or terminal→fail or cancel-bifurcation), so stale buffered idempotency lookups converge automatically without caller retries. Only when `pgCount === 0 && bufferResetFailed` is a 503 appropriate, because then the run's existence is genuinely unobservable (the buffer outage hides a potentially matching buffered run). The test "returns success when PG cleared >=1 run, even if the buffer reset throws" documents this contract explicitly.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-27T16:39:43.098Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3453
File: internal-packages/run-engine/src/engine/systems/debounceSystem.ts:517-547
Timestamp: 2026-04-27T16:39:43.098Z
Learning: In `internal-packages/run-engine/src/engine/systems/debounceSystem.ts`, the `try/catch` around `runLock.lock(...)` in `handleExistingRun` routes errors matching `#isLockContentionError` (`LockAcquisitionTimeoutError`, `name === "ExecutionError"`, `name === "ResourceLockedError"`) to a fallback. This is intentionally NOT guarded by a `lockAcquired` flag because the only code executed inside the lock callback (`#handleExistingRunLocked`) calls Prisma and ioredis, neither of which emits errors with those names — those names are redlock-specific. There are no nested `runLock.lock` calls in this path so callback-thrown errors cannot be misclassified. A `lockAcquired` guard should be revisited only if a nested lock call is ever introduced inside `#handleExistingRunLocked`.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-28T15:57:56.285Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3462
File: apps/webapp/app/services/routeBuilders/apiBuilder.server.ts:32-50
Timestamp: 2026-04-28T15:57:56.285Z
Learning: In `apps/webapp/app/services/routeBuilders/apiBuilder.server.ts`, the `logBoundaryError` helper hard-codes all `ServiceValidationError` and `EngineServiceValidationError` instances to `warn` level, ignoring the `logLevel` field on those classes. This is intentional: as of PR `#3462`, no caller in webapp or run-engine passes a non-default `logLevel` to either error type — the field is dead code. Per repo style, no abstraction for hypothetical future use is added. Do not flag `logBoundaryError` for ignoring `error.logLevel` in future reviews; revisit only if/when callers start setting a non-default `logLevel`.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-06-06T19:34:44.129Z
Learnt from: nicktrn
Repo: triggerdotdev/trigger.dev PR: 3855
File: internal-packages/testcontainers/src/minio.ts:125-136
Timestamp: 2026-06-06T19:34:44.129Z
Learning: In `internal-packages/testcontainers/src/minio.ts` (triggerdotdev/trigger.dev), the `resetBucket()` method on `StartedMinIOContainer` intentionally uses `throwOnError: false` for the `mc rm --recursive --force local/<bucket>` step. On the first reset the bucket may be empty (or absent), causing `mc rm` to exit non-zero; swallowing that error is correct because the subsequent `mc mb --ignore-existing` call (with `throwOnError: true`) provides the hard guarantee that the bucket exists and is empty. Do not flag this `throwOnError: false` pattern as a bug in future reviews.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-05-15T08:05:57.683Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3625
File: apps/webapp/app/services/taskMetadataCache.server.ts:270-291
Timestamp: 2026-05-15T08:05:57.683Z
Learning: In the triggerdotdev/trigger.dev codebase, `populateByCurrentWorker()` in `apps/webapp/app/services/taskMetadataCache.server.ts` intentionally logs and swallows Redis errors rather than rethrowing. The design rationale: rethrowing would propagate into `ChangeCurrentDeploymentService.call` and break deploy promotion when Redis is briefly unavailable; the 24h `TASK_META_CACHE_CURRENT_ENV_TTL_SECONDS` TTL acts as the self-healing window for cache drift, and next-promotion overwrites the env key sooner in practice. A compensating DEL on failure is also not a win because if Redis is unreachable the DEL fails identically, and Lua scripts are atomic so a partial write is impossible. Do not flag this log+swallow pattern as a bug in future reviews.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-05-12T14:13:17.114Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3561
File: .claude/scripts/check-review-md.sh:76-79
Timestamp: 2026-05-12T14:13:17.114Z
Learning: In the triggerdotdev/trigger.dev repository, `.claude/REVIEW.md` drift/audit checking is handled by an LLM-based workflow using `anthropics/claude-code-action` (mirroring `.github/workflows/claude-md-audit.yml`), not a static bash script. The LLM audit catches semantic drift, stale references, contradictions, and missing/obsolete rules — not just deleted paths. The bash script `.claude/scripts/check-review-md.sh` was dropped in favor of this approach.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-05-04T19:14:44.097Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3499
File: apps/webapp/test/auth-api.e2e.full.test.ts:205-227
Timestamp: 2026-05-04T19:14:44.097Z
Learning: In triggerdotdev/trigger.dev's e2e auth test suite (`apps/webapp/test/auth-api.e2e.full.test.ts` and related `*.e2e.full.test.ts` files), loose negative assertions like `expect(res.status).not.toBe(200)` are intentional. External infrastructure (e.g. ClickHouse) is unreachable in the e2e test environment, so a 5xx from the route handler after auth passes is an expected and acceptable outcome. Tightening these to a specific set like `[401, 403, 404]` would incorrectly exclude valid 5xx results. Do not flag these as issues during review.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-06-06T19:34:45.521Z
Learnt from: nicktrn
Repo: triggerdotdev/trigger.dev PR: 3855
File: internal-packages/testcontainers/src/sequencer.cjs:28-44
Timestamp: 2026-06-06T19:34:45.521Z
Learning: In triggerdotdev/trigger.dev, `test-timings.json` is a committed file under source control (repo root). In `internal-packages/testcontainers/src/sequencer.cjs`, `loadTimings()` intentionally throws with a descriptive error when the file exists but contains invalid JSON (fail-fast), rather than silently falling back to count-based sharding. A missing file is still the legitimate no-timings default and returns `{}`. Do not suggest wrapping the JSON.parse in a silent try/catch in future reviews.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-16T14:21:17.695Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3368
File: apps/webapp/app/components/logs/LogsTaskFilter.tsx:135-163
Timestamp: 2026-04-16T14:21:17.695Z
Learning: In `triggerdotdev/trigger.dev` PR `#3368`, the `TaskIdentifier` table has a `@unique([runtimeEnvironmentId, slug])` DB constraint, guaranteeing one row per (environment, slug). In components like `apps/webapp/app/components/logs/LogsTaskFilter.tsx` and `apps/webapp/app/components/runs/v3/RunFilters.tsx`, using `key={item.slug}` for SelectItem list items is correct and unique. Do NOT flag `key={item.slug}` as potentially non-unique — the old duplicate-(slug, triggerSource) issue only existed with the legacy `DISTINCT` query, which this registry replaces.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-25T15:29:25.889Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2026-03-25T15:29:25.889Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `task.batchTrigger()` to trigger multiple runs of a task from inside another task

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-25T15:29:25.889Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2026-03-25T15:29:25.889Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `task()` from `trigger.dev/sdk` for basic task definitions with `id` and `run` properties

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-25T15:29:25.889Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2026-03-25T15:29:25.889Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `batch.triggerByTask()` to batch trigger multiple tasks by passing task instances

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-25T15:29:25.889Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2026-03-25T15:29:25.889Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `batch.triggerAndWait()` to trigger multiple different tasks and wait for all results

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-25T15:29:25.889Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2026-03-25T15:29:25.889Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `batch.triggerByTaskAndWait()` to batch trigger multiple tasks by instance and wait for results

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-25T15:29:25.889Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2026-03-25T15:29:25.889Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Use `task.batchTriggerAndWait()` to batch trigger a task and wait for all results from inside another task

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-16T14:19:16.330Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: apps/webapp/CLAUDE.md:0-0
Timestamp: 2026-04-16T14:19:16.330Z
Learning: Applies to apps/webapp/{app/v3/services/triggerTask.server.ts,app/v3/services/batchTriggerV3.server.ts} : In `triggerTask.server.ts` and `batchTriggerV3.server.ts`, do NOT add database queries. Task defaults (TTL, etc.) are resolved via `backgroundWorkerTask.findFirst()` in the queue concern (`queues.server.ts`). Piggyback on the existing query instead of adding new ones

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-06-02T21:20:43.541Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-06-02T21:20:43.541Z
Learning: Applies to **/*.{ts,tsx} : Import from `trigger.dev/sdk` when writing Trigger.dev tasks. Never use `trigger.dev/sdk/v3` or deprecated `client.defineJob`

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2025-11-27T16:26:37.432Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-11-27T16:26:37.432Z
Learning: Applies to packages/trigger-sdk/**/*.{ts,tsx} : In the Trigger.dev SDK (packages/trigger-sdk), prefer isomorphic code like fetch and ReadableStream instead of Node.js-specific code

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2025-11-27T16:26:37.432Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-11-27T16:26:37.432Z
Learning: Applies to internal-packages/database/**/*.{ts,tsx} : Use Prisma for database interactions in internal-packages/database with PostgreSQL

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-25T15:29:25.889Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2026-03-25T15:29:25.889Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Configure concurrency using the `queue` option with `concurrencyLimit` property

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-22T13:26:12.060Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3244
File: apps/webapp/app/components/code/TextEditor.tsx:81-86
Timestamp: 2026-03-22T13:26:12.060Z
Learning: In the triggerdotdev/trigger.dev codebase, do not flag `navigator.clipboard.writeText(...)` calls for `missing-await`/`unhandled-promise` issues. These clipboard writes are intentionally invoked without `await` and without `catch` handlers across the project; keep that behavior consistent when reviewing TypeScript/TSX files (e.g., usages like in `apps/webapp/app/components/code/TextEditor.tsx`).

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-22T19:24:14.403Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3187
File: apps/webapp/app/v3/services/alerts/deliverErrorGroupAlert.server.ts:200-204
Timestamp: 2026-03-22T19:24:14.403Z
Learning: In the triggerdotdev/trigger.dev codebase, webhook URLs are not expected to contain embedded credentials/secrets (e.g., fields like `ProjectAlertWebhookProperties` should only hold credential-free webhook endpoints). During code review, if you see logging or inclusion of raw webhook URLs in error messages, do not automatically treat it as a credential-leak/secrets-in-logs issue by default—first verify the URL does not contain embedded credentials (for example, no username/password in the URL, no obvious secret/token query params or fragments). If the URL is credential-free per this project’s conventions, allow the logging.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-05-18T08:21:27.694Z
Learnt from: d-cs
Repo: triggerdotdev/trigger.dev PR: 3632
File: apps/webapp/sentry.server.ts:4-21
Timestamp: 2026-05-18T08:21:27.694Z
Learning: When handling Prisma error P1001 ("Can't reach database server") in TypeScript, don’t assume a single error shape. Prisma can surface P1001 via two different error classes/fields: `PrismaClientKnownRequestError` exposes it as `err.code === "P1001"` (common during mid-query connection drops), while `PrismaClientInitializationError` exposes it as `err.errorCode === "P1001"` (common on client startup failure). Therefore, predicates should use `err.code === "P1001" || err.errorCode === "P1001"`. Do not flag `err.code === "P1001"` as “unreachable/never matches,” as it is expected in production.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-05-18T08:21:27.694Z
Learnt from: d-cs
Repo: triggerdotdev/trigger.dev PR: 3632
File: apps/webapp/sentry.server.ts:4-21
Timestamp: 2026-05-18T08:21:27.694Z
Learning: When handling Prisma errors for P1001 ("Can't reach database server"), do not assume it only appears under a single property name. Prisma may surface P1001 via either `PrismaClientKnownRequestError` (`err.code === "P1001"`, e.g., mid-query connection drops) or `PrismaClientInitializationError` (`err.errorCode === "P1001"`, e.g., client startup connection failure). To reliably detect the condition, check `err.code === "P1001" || err.errorCode === "P1001"`, and avoid review rules that would incorrectly flag `err.code === "P1001"` as unreachable/never-matching.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-05-12T21:04:05.815Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3542
File: apps/webapp/app/components/sessions/v1/SessionStatus.tsx:1-3
Timestamp: 2026-05-12T21:04:05.815Z
Learning: In this Remix + TypeScript codebase, do not flag a server/client boundary violation when a file imports only types from a module matching `*.server`.

Specifically, it’s safe to import types using `import type { Foo } from "*.server"` or `import { type Foo } from "*.server"` because TypeScript erases type-only imports at compile time and they emit no JavaScript, so they won’t cross the Remix server/client bundle boundary.

Only raise the boundary concern for value imports (e.g., `import { Foo }` without `type`, or `import Foo`), since those produce JavaScript output.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-05-22T11:50:56.079Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3333
File: apps/webapp/app/runEngine/services/triggerFailedTask.server.ts:76-80
Timestamp: 2026-05-22T11:50:56.079Z
Learning: When reviewing changes related to org-scoped ClickHouse / org reassignment, treat ClickHouse as the source of truth for the affected read paths (e.g., run lists, span detail, logs) with no Postgres fallback. Reassigning an org from one ClickHouse cluster to another must be done by migrating that org’s existing ClickHouse data between clusters first—do not assume it’s sufficient to update only the OrganizationDataStore entry. If any implementation/change would make org reassignment possible, ensure the required migration design/implementation is included (work tracked under Linear TRI-9659, sub-issue of TRI-7994). During the initial rollout of org-scoped ClickHouse (PR `#3333`), no production org has an override configured, so the limitation is not yet reachable in production; don’t introduce production-dependent assumptions that rely on overrides being active.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-06-04T18:16:35.386Z
Learnt from: nicktrn
Repo: triggerdotdev/trigger.dev PR: 3836
File: apps/supervisor/src/backpressure/backpressureMonitor.ts:3-5
Timestamp: 2026-06-04T18:16:35.386Z
Learning: When reviewing TypeScript in this repo, apply the rule “prefer type aliases over interfaces” only to data/object shapes and union/intersection type modeling. If an interface is being used as a behavioral contract for collaborators to implement (e.g., method-shape interfaces that define required behavior, such as `BackpressureLogger` / `BackpressureSignalSource` in `apps/supervisor/src/backpressure/backpressureMonitor.ts`), keep it as an `interface` and do not flag it as a type-alias-vs-interface violation.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
🔇 Additional comments (8)
apps/webapp/app/runEngine/services/streamBatchItems.server.ts (8)

7-7: LGTM!

Also applies to: 57-61


71-76: LGTM!

Also applies to: 91-97


178-204: LGTM!


388-493: LGTM!


635-656: LGTM!


738-744: LGTM!


774-788: LGTM!

Also applies to: 796-813


827-840: LGTM!


Walkthrough

This pull request refactors the Phase 2 streaming batch ingest endpoint to process NDJSON items with bounded concurrency rather than sequentially. It introduces a configurable STREAMING_BATCH_INGEST_CONCURRENCY environment variable (default 10) and replaces sequential for-await iteration with p-map to limit in-flight items and bound memory usage. The NDJSON parser now tracks emit positions to backfill oversized item marker indices when extraction fails, ensuring consistent ordering across concurrent execution. All operational guarantees—index-based ordering, atomic deduplication per index, and idempotency—are preserved. Test coverage includes new validation for concurrency bounds and deduplication behavior.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/tri-10273-managed-runs-streaming-batchtriggerandwait-fails-with-408

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 4 additional findings.

Open in Devin Review

coderabbitai[bot]

This comment was marked as resolved.

@mintlify
Copy link
Copy Markdown
Contributor

mintlify Bot commented May 29, 2026

Preview deployment for your docs. Learn more about Mintlify Previews.

Project Status Preview Updated (UTC)
trigger 🟢 Ready View Preview May 29, 2026, 6:09 PM

💡 Tip: Enable Workflows to automatically generate PRs for you.

matt-aitken and others added 2 commits June 6, 2026 20:33
…273)

Phase 2 of the v3 streaming batch API (POST /api/v3/batches/:batchId/items)
processed streamed items strictly sequentially. For batches of many large
payloads — each offloaded to object storage inline — this serialized N object-store
round-trips inside one request, blowing past Node's default 300s server.requestTimeout.
The webapp then returned 408, which the SDK reads as "408 terminated" and retries 5x,
turning a slow ingest into a ~26-minute failure.

Ingest now runs through p-map over the NDJSON async iterable with bounded concurrency
(STREAMING_BATCH_INGEST_CONCURRENCY, default 10). p-map pulls lazily, so at most
`concurrency` items are read/in-flight at once — bounding peak memory to roughly
concurrency x STREAMING_BATCH_ITEM_MAXIMUM_SIZE while preserving stream backpressure.
Set the env to 1 for fully sequential ingestion.

Safe by construction: run order derives from each item's index (enqueue timestamp =
batch.createdAt + index), and enqueueBatchItem dedups atomically per index — neither
depends on processing order. The NDJSON parser now stamps oversized-item markers with
their emit position, removing the consumer's sequential lastIndex assumption. The
count-check + conditional seal path is unchanged.

Tests: bounded-concurrency ingest of a 100-item batch, in-flight cap assertion,
concurrent dedup on Phase 2 retry, and emit-position marker indexing. Full existing
sealing/idempotency suite still green (42/42).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- Enforce positive STREAMING_BATCH_INGEST_CONCURRENCY in the env schema
  (.int().positive()) — p-map requires concurrency >= 1, so 0/negative would
  throw at runtime.
- Apply the same out-of-range index guard to oversized-item markers as normal
  items, so an oversized item with index >= runCount returns a 4xx instead of
  creating a stray pre-failed run. Covered by a new test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@matt-aitken matt-aitken force-pushed the feature/tri-10273-managed-runs-streaming-batchtriggerandwait-fails-with-408 branch from 4ca9807 to de3489f Compare June 6, 2026 19:34
@pkg-pr-new
Copy link
Copy Markdown

pkg-pr-new Bot commented Jun 6, 2026

Open in StackBlitz

@trigger.dev/build

npm i https://pkg.pr.new/@trigger.dev/build@de3489f

trigger.dev

npm i https://pkg.pr.new/trigger.dev@de3489f

@trigger.dev/core

npm i https://pkg.pr.new/@trigger.dev/core@de3489f

@trigger.dev/plugins

npm i https://pkg.pr.new/@trigger.dev/plugins@de3489f

@trigger.dev/python

npm i https://pkg.pr.new/@trigger.dev/python@de3489f

@trigger.dev/react-hooks

npm i https://pkg.pr.new/@trigger.dev/react-hooks@de3489f

@trigger.dev/redis-worker

npm i https://pkg.pr.new/@trigger.dev/redis-worker@de3489f

@trigger.dev/rsc

npm i https://pkg.pr.new/@trigger.dev/rsc@de3489f

@trigger.dev/schema-to-json

npm i https://pkg.pr.new/@trigger.dev/schema-to-json@de3489f

@trigger.dev/sdk

npm i https://pkg.pr.new/@trigger.dev/sdk@de3489f

commit: de3489f

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant