
Fleet

pluck.fleet({...}) is a full orchestration runtime, not a parallel primitive: N identities × M targets, proxy-pool rotation, cross-member rate limits, an Ed25519-signed audit chain, and a reputation circuit breaker, all wired to a pluggable Substrate so the in-process default can later be swapped for a Kite-backed backend without any caller-side change.


The mental model

A single pluck() call is one agent doing one thing. A fleet is N agents doing coordinated things at scale, each with its own identity (user-agent, viewport, timezone, cookie jar, proxy) and its own failure accounting. The runtime binds those members to the three concerns every coordinated campaign eventually hits:

  1. Identity hygiene. Members don't share cookie jars. Requests look like independent clients.
  2. Politeness. Every target enforces a floor rate (default 2 req/s, never below the safety rail). The circuit breaker halts on > 10% fail rate so you can't DDoS yourself into an ISP blacklist.
  3. Auditability. Every member op emits an HLC-timestamped event through the Substrate event log. Receipts chain parent-to-child via parentSig so an entire campaign is a verifiable Merkle forest.

The single-factory call

TypeScript
import { createPluck } from "@sizls/pluck";

const pluck = createPluck({ signingKey: process.env.PLUCK_SIGNING_KEY });

const fleet = pluck.fleet({
  count: 1000,
  identities: loadedIdentities,           // from v0.5 encrypted store
  proxies: proxyPool,
  activeConcurrency: 50,
  rateLimit: { perTarget: { maxRequests: 2, windowMs: 1000 } },
  circuitBreaker: { failureRateThreshold: 0.1, windowMs: 60_000 },
  audit: {
    signingKey: () => process.env.PLUCK_SIGNING_KEY!,
    sink: "./fleet-audit.ndjson",
  },
});

The factory returns a FleetInstance with three workflow coordinators.


Workflow coordinators

fleet.broadcast(task)

Run the same task on every member in parallel. Useful for "check this same thing from 1000 different perspectives."

TypeScript
const results = await fleet.broadcast(async (pluck, member) =>
  pluck("https://api.example.com/public-data", { identity: member.identity }),
);

fleet.plan(planner, task)

Build a per-member plan first (member index → input), then run each input through the task. Useful for "give each member a different target."

TypeScript
const results = await fleet.plan(
  (i, member) => ({ target: platforms[i], identity: member.identity }),
  ({ target, identity }, pluck) =>
    pluck(target, { identity, actor: "browser-agent", instruction: "sign up" }),
);

fleet.pipeline(stages[])

Run a staged pipeline – stage N's per-member output is stage N+1's per-member input. Each stage runs across all members before the next starts. Failed members short-circuit: stage 2 does not run with a bogus undefined input for a member that failed stage 1; the failure propagates directly to the final result.

TypeScript
const results = await fleet.pipeline([
  async (_in, p, member) => await p.context(member.identity.home),
  async (ctx, p, member) => await p.act(ctx.nextUrl, { identity: member.identity, action: "post" }),
  async (actResult) => verifyChain([actResult.signedReceipt!], { ... }),
]);

Guardrails (all on by default)

  • Identity-scoped execution. Each member has its own cookie jar, proxy, UA, viewport, locale, timezone.
  • Proxy pool. Round-robin selection with per-proxy health (max consecutive failures + cooldown). Reputation-hit rotation on status === 429/403, captcha messages, or canonical HTTP_429 / HTTP_403 / CAPTCHA_DETECTED codes.
  • Per-target rate limiter. Token-bucket, default 2 req/s, floor cannot be disabled below 1 req/s/target.
  • Reputation circuit breaker. Halts on > 10% failure rate over 60 s with a 20-sample minimum. fleet.status() surfaces tripped targets. Trip is not self-healing – the breaker stays tripped until an operator calls reset(). That's deliberate: you don't want a runaway campaign quietly repairing itself while burning reputation.
  • Signed audit chain. Ed25519 receipts on every member op, chained by parentSig. Under any activeConcurrency, a chain-tail mutex serialises sign calls so the chain stays linear – verify it end-to-end with verifyChain(). NDJSON sink optional.
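The per-target limiter above is a token bucket. As a feel for the mechanics, here is a self-contained sketch mirroring the { maxRequests: 2, windowMs: 1000 } config shape from earlier – the class and its names are illustrative, not Pluck's internals:

```typescript
// Illustrative token bucket: `maxRequests` tokens refill per `windowMs`.
// Not Pluck's implementation – a sketch of the technique it names.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private maxRequests: number,
    private windowMs: number,
    now: number = Date.now(),
  ) {
    this.tokens = maxRequests;
    this.lastRefill = now;
  }

  // Returns true if a request may proceed at time `now`, consuming a token.
  tryAcquire(now: number = Date.now()): boolean {
    const elapsed = now - this.lastRefill;
    if (elapsed > 0) {
      // Refill proportionally to elapsed time, capped at the bucket size.
      this.tokens = Math.min(
        this.maxRequests,
        this.tokens + (elapsed / this.windowMs) * this.maxRequests,
      );
      this.lastRefill = now;
    }
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}
```

At 2 req/s, two immediate calls pass, the third is refused until the next refill window.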

The Substrate abstraction

Every piece of the runtime – event emission, policy evaluation, scheduling – flows through a Substrate interface:

TypeScript
interface Substrate {
  emit(event: SubstrateEvent): Promise<void>;
  schedule<T>(work: ScheduledWork<T>): Promise<RunId>;
  await<T>(runId: RunId): Promise<RunResult<T>>;
  eventLog: EventLogAdapter;
  policy: PolicyAdapter;
}

The event envelope is MCP-compatible and Kite-aligned so a future KiteSubstrate (in a separate @sizls/pluck-kite-substrate package, published when Kite v0.1 lands) can plug in without any change to fleet code:

TypeScript
// Ship today – in-process LocalSubstrate backs every fleet by default:
const fleet = pluck.fleet({ ...config });

// Future – when Kite v0.1 + @sizls/pluck-kite-substrate are ready:
// import { KiteSubstrate } from "@sizls/pluck-kite-substrate";
// const fleet = pluck.fleet({ ...config, substrate: new KiteSubstrate({ ... }) });

The same fleet API, a different execution backend. Pluck ships one production substrate today (LocalSubstrate); the Kite bridge is additive and opt-in.
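As a feel for what a custom backend involves, here is a toy in-memory substrate covering the emit/schedule/await core of the interface shape above. The type bodies (SubstrateEvent, ScheduledWork, RunResult) are simplified assumptions for this sketch, and the eventLog/policy adapters are omitted:

```typescript
// Simplified stand-ins for the published types – assumptions for this sketch.
type RunId = string;
interface SubstrateEvent { id: string; kind: string; payload: unknown }
interface ScheduledWork<T> { run(): Promise<T> }
interface RunResult<T> { ok: boolean; value?: T }

// Toy substrate: events land in an array, work runs inline, results are
// memoised by RunId so await() can be called after completion.
class ToySubstrate {
  events: SubstrateEvent[] = [];
  private runs = new Map<RunId, Promise<RunResult<unknown>>>();
  private nextId = 0;

  async emit(event: SubstrateEvent): Promise<void> {
    this.events.push(event);
  }

  async schedule<T>(work: ScheduledWork<T>): Promise<RunId> {
    const id: RunId = `run-${this.nextId++}`;
    this.runs.set(
      id,
      work.run().then(
        (value) => ({ ok: true, value }),
        () => ({ ok: false }),
      ),
    );
    return id;
  }

  // `await` is a legal method name – it is only reserved as an expression.
  async await<T>(runId: RunId): Promise<RunResult<T>> {
    const pending = this.runs.get(runId);
    if (!pending) throw new Error(`unknown run: ${runId}`);
    return (await pending) as RunResult<T>;
  }
}
```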

Durable event log (SQLite)

LocalSubstrate defaults to an in-memory ring buffer for the event log – fast, zero-config, but lost on process exit. For campaigns where the audit chain needs to outlive the host process (or where an operator wants to resume after a crash), swap in the SQLite-backed adapter:

TypeScript
import { createPluck, createLocalSubstrate, createSqliteEventLog } from "@sizls/pluck";

const eventLog = await createSqliteEventLog({
  path: "./fleet-audit.sqlite",
  // capacity: 100_000,   // optional: prune oldest events back to N
  // journalMode: "wal",   // default – concurrent reader-friendly
});

const pluck = createPluck();
const fleet = pluck.fleet({
  count: 1000,
  // ...
  substrate: createLocalSubstrate({ eventLog }),
});

try {
  await fleet.broadcast(/* ... */);
} finally {
  // fleet.destroy() calls eventLog.close() internally on v0.15+;
  // call eventLog.close() yourself for adapters created without a fleet.
  await fleet.destroy();
}

The SQLite adapter implements the same EventLogAdapter contract as the in-memory log (append, query, size) plus a close() method. The contract added an optional close?(): Promise<void> hook in v0.15; both fleet.destroy() and runtime.destroy() call it on the way out so durable adapters release their file handles automatically. Schema:

  • single events table keyed by event id (ON CONFLICT DO NOTHING so retries are idempotent)
  • indexes on (hlc_physical, hlc_counter), (origin, hlc_physical, hlc_counter), and (kind, hlc_physical, hlc_counter)
  • WAL journal mode by default – the audit tooling (e.g. pluck verify-chain) can read in parallel with the writer

better-sqlite3 is an optional peer dep. Pluck core stays installable without it; the adapter throws a typed MISSING_PEER_DEP error if you call createSqliteEventLog without first running pnpm add better-sqlite3.
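For a feel of that contract, here is a toy in-memory adapter honouring the same surface (append, query, size, close). The event and filter shapes are simplified assumptions – the HLC-ordering columns are omitted – but the id-keyed idempotency matches what the SQLite adapter gets from ON CONFLICT DO NOTHING:

```typescript
// Simplified event shape – an assumption for this sketch, not the published type.
interface LogEvent { id: string; kind: string; payload: unknown }

// Toy in-memory adapter: keyed by event id, so re-appending after a retry
// is idempotent, mirroring the SQLite adapter's ON CONFLICT DO NOTHING.
class MemoryEventLog {
  private events = new Map<string, LogEvent>();

  async append(event: LogEvent): Promise<void> {
    if (!this.events.has(event.id)) this.events.set(event.id, event);
  }

  async query(filter: { kind?: string; limit?: number }): Promise<LogEvent[]> {
    const out: LogEvent[] = [];
    for (const e of this.events.values()) {
      if (filter.kind && e.kind !== filter.kind) continue;
      out.push(e);
      if (filter.limit !== undefined && out.length >= filter.limit) break;
    }
    return out;
  }

  size(): number {
    return this.events.size;
  }

  async close(): Promise<void> {
    this.events.clear();
  }
}
```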

Resume-from-checkpoint pattern. After a crash, query the durable log for completion events, build a set of done targets, and re-broadcast only what's outstanding:

TypeScript
import { createPluck, createLocalSubstrate, createSqliteEventLog } from "@sizls/pluck";

const pluck = createPluck();

// Reopen the audit log from before the crash.
const eventLog = await createSqliteEventLog({ path: "./fleet-audit.sqlite" });

// Pull every fleet.member.complete event the previous run emitted.
const completed = await eventLog.query({
  kind: "fleet.member.complete",
  limit: 100_000,
});
const doneTargets = new Set<string>(
  completed.map((e) => (e.payload as { target: string }).target),
);

// Filter the input list down to just the work that hasn't landed.
const remaining = allTargets.filter((t) => !doneTargets.has(t));

// Resume – same fleet config, same audit log, only undone work.
const fleet = pluck.fleet({
  count: remaining.length,
  substrate: createLocalSubstrate({ eventLog }),
  /* ... */
});
await fleet.broadcast(/* ... over `remaining` ... */);
await fleet.destroy();   // closes the SQLite handle

The HLC sort guarantees query() returns events in causal order even if the wall clock skewed during the original run.

Caveats.

  • Payloads must be JSON.stringify-encodable. BigInt, circular references, and Symbol keys throw at append time. The agent runtime's safeStringifyToolOutput (v0.14) coerces these for tool messages, but raw substrate event payloads do not – wrap problematic values yourself before emitting.
  • path is not sandboxed. The adapter writes wherever the operator points it. Validate paths in your own config layer if untrusted input feeds them.
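A minimal pre-emit sanitiser along the lines the first caveat suggests – the function name and coercion rules here are illustrative, not part of Pluck:

```typescript
// Illustrative pre-emit coercion: BigInt → string, circular references → a
// marker string. Symbol-keyed properties are dropped automatically because
// JSON.stringify ignores them. Note the WeakSet also collapses repeated
// (non-circular) references to the same object – acceptable for audit payloads.
function sanitizePayload(payload: unknown): unknown {
  const seen = new WeakSet<object>();
  return JSON.parse(
    JSON.stringify(payload, (_key, value) => {
      if (typeof value === "bigint") return value.toString();
      if (typeof value === "object" && value !== null) {
        if (seen.has(value)) return "[circular]";
        seen.add(value);
      }
      return value;
    }),
  );
}
```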

HLC – Hybrid Logical Clock

Event timestamps use HLC pairs ({physical, counter}) so a later Merkle-chain merge across per-member event logs stays causally ordered even when wall-clock monotonicity doesn't hold. Every event fires through clock.tick(); merge() on a caller-supplied HLC clamps to wall + maxSkewMs (default 10 min) so a misbehaving peer can't jump your clock to Number.MAX_SAFE_INTEGER.
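A condensed sketch of the tick/merge behaviour described above – field names mirror the {physical, counter} pair, but the class itself is illustrative, not Pluck's clock:

```typescript
// Illustrative hybrid logical clock: tick() for local events, merge() for a
// remote timestamp, with the remote physical time clamped to wall + maxSkewMs
// so a misbehaving peer can't drag the clock into the far future.
interface Hlc { physical: number; counter: number }

class HlcClock {
  private last: Hlc = { physical: 0, counter: 0 };

  constructor(private maxSkewMs = 600_000, private now: () => number = Date.now) {}

  tick(): Hlc {
    const wall = this.now();
    if (wall > this.last.physical) {
      this.last = { physical: wall, counter: 0 };
    } else {
      // Wall clock stalled or went backwards: keep physical, bump counter.
      this.last = { physical: this.last.physical, counter: this.last.counter + 1 };
    }
    return { ...this.last };
  }

  merge(remote: Hlc): Hlc {
    const wall = this.now();
    const remotePhysical = Math.min(remote.physical, wall + this.maxSkewMs);
    const physical = Math.max(wall, this.last.physical, remotePhysical);
    let counter = 0;
    if (physical === this.last.physical && physical === remotePhysical) {
      counter = Math.max(this.last.counter, remote.counter) + 1;
    } else if (physical === this.last.physical) {
      counter = this.last.counter + 1;
    } else if (physical === remotePhysical) {
      counter = remote.counter + 1;
    }
    this.last = { physical, counter };
    return { ...this.last };
  }
}
```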

Policy adapter

Policy is a thin hook evaluated on every event before it hits the log:

TypeScript
const substrate = createLocalSubstrate({
  policy: {
    evaluate(event) {
      if (event.capability.includes("target:evil.com")) {
        return { allow: false, reason: "DENY_TARGET" };
      }
      return { allow: true };
    },
  },
});

Capabilities are structured tags: fleet:op, fleet:policy, target:<host>. Fleet events include the target host when known so policies can gate by destination without knowing anything about fleet internals.


fleet.status() – live observability

TypeScript
const status = fleet.status();
// {
//   memberCount: 1000,
//   activeCount: 48,
//   completedCount: 612,
//   failedCount: 19,
//   circuitTripped: false,
//   proxyHealth: [{ proxyId, healthy, consecutiveFailures, reputationDemerits, lastUsedAt }, ...],
// }

Poll from a dashboard / MCP tool to surface in-flight state. Operators running 1000-member campaigns lean on this to catch reputation drift before the breaker trips.
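A hedged sketch of a watchdog over that status shape – the FleetStatus type here is inferred from the example comment above, and the alert thresholds are arbitrary choices, not Pluck defaults:

```typescript
// Status shape inferred from the fleet.status() example – an assumption,
// not a published type.
interface ProxyHealth { proxyId: string; healthy: boolean; consecutiveFailures: number }
interface FleetStatus {
  memberCount: number;
  activeCount: number;
  completedCount: number;
  failedCount: number;
  circuitTripped: boolean;
  proxyHealth: ProxyHealth[];
}

// Pure check a dashboard / MCP tool could run on each poll.
function statusWarnings(status: FleetStatus): string[] {
  const warnings: string[] = [];
  if (status.circuitTripped) {
    warnings.push("circuit breaker tripped; operator reset() required");
  }
  const finished = status.completedCount + status.failedCount;
  if (finished > 0 && status.failedCount / finished > 0.05) {
    warnings.push("failure rate above 5%; approaching the 10% breaker threshold");
  }
  const sick = status.proxyHealth.filter((p) => !p.healthy).length;
  if (sick > status.proxyHealth.length / 2) {
    warnings.push("majority of proxy pool unhealthy");
  }
  return warnings;
}
```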


Lifecycle + graceful destroy

TypeScript
const fleet = pluck.fleet({ ... });

try {
  await fleet.broadcast(task);
} finally {
  await fleet.destroy();   // drains in-flight dispatches, flushes audit sink
}

destroy() refuses new dispatches (throwing "cannot dispatch after destroy()"), waits for every active member op to complete, then drains the chain-tail mutex and the audit sink. Nothing lands in the NDJSON sink after the final flush returns.


Out of scope for v0.11

Tracked for point-releases:

  • SQLite persistence / resumability. Since shipped – the durable SQLite event log and resume-from-checkpoint pattern above cover it; FleetConfig was shaped to accept it from the start.
  • fleet.agents(...) runtime. Lands in v0.12 – heterogeneous multi-agent execution where each agent drives its own LLM + MCP surface + tool calls through this Substrate. Fleet ships today with broadcast / plan / pipeline; agents is the next coordinator on the same runtime.
