Fleet
pluck.fleet({...}) is a full orchestration runtime, not a parallel primitive. N identities × M targets, proxy pool rotation, cross-member rate limits, Ed25519-signed audit chain, reputation circuit breaker, all wired to a pluggable Substrate so the in-process default can later be swapped for a Kite-backed backend without any caller-side change.
The mental model
A single pluck() is one agent doing one thing. A fleet is N agents doing coordinated things at scale, each with its own identity (user-agent, viewport, timezone, cookie jar, proxy) and its own failure-accounting. The runtime binds those members to the three concerns every coordinated campaign eventually hits:
- Identity hygiene. Members don't share cookie jars. Requests look like independent clients.
- Politeness. Every target enforces a floor rate (default 2 req/s, never below the safety rail). The circuit breaker halts on > 10% fail rate so you can't DDoS yourself into an ISP blacklist.
- Auditability. Every member op emits an HLC-timestamped event through the Substrate event log. Receipts chain parent-to-child via parentSig, so an entire campaign is a verifiable Merkle forest.
The single-factory call
import { createPluck } from "@sizls/pluck";
const pluck = createPluck({ signingKey: process.env.PLUCK_SIGNING_KEY });
const fleet = pluck.fleet({
  count: 1000,
  identities: loadedIdentities, // from v0.5 encrypted store
  proxies: proxyPool,
  activeConcurrency: 50,
  rateLimit: { perTarget: { maxRequests: 2, windowMs: 1000 } },
  circuitBreaker: { failureRateThreshold: 0.1, windowMs: 60_000 },
  audit: {
    signingKey: () => process.env.PLUCK_SIGNING_KEY!,
    sink: "./fleet-audit.ndjson",
  },
});
The factory returns a FleetInstance with three workflow coordinators.
Workflow coordinators
fleet.broadcast(task)
Run the same task on every member in parallel. Useful for "check this same thing from 1000 different perspectives."
const results = await fleet.broadcast(async (pluck, member) =>
  pluck("https://api.example.com/public-data", { identity: member.identity }),
);
fleet.plan(planner, task)
Build a per-member plan first (member index → input), then run each input through the task. Useful for "give each member a different target."
const results = await fleet.plan(
  (i, member) => ({ target: platforms[i], identity: member.identity }),
  ({ target, identity }, pluck) =>
    pluck(target, { identity, actor: "browser-agent", instruction: "sign up" }),
);
fleet.pipeline(stages[])
Run a staged pipeline – stage N's per-member output is stage N+1's per-member input. Each stage runs across all members before the next starts. Failed members short-circuit: stage 2 does not run with a bogus undefined input for a member that failed stage 1; the failure propagates directly to the final result.
const results = await fleet.pipeline([
  async (_in, p, member) => await p.context(member.identity.home),
  async (ctx, p, member) => await p.act(ctx.nextUrl, { identity: member.identity, action: "post" }),
  async (actResult) => verifyChain([actResult.signedReceipt!], { ... }),
]);
Guardrails (all on by default)
- Identity-scoped execution. Each member has its own cookie jar, proxy, UA, viewport, locale, timezone.
- Proxy pool. Round-robin selection with per-proxy health (max consecutive failures + cooldown). Reputation-hit rotation on status === 429/403, captcha messages, or canonical HTTP_429 / HTTP_403 / CAPTCHA_DETECTED codes.
- Per-target rate limiter. Token-bucket, default 2 req/s; the floor cannot be disabled below 1 req/s/target.
- Reputation circuit breaker. Halts on > 10% failure rate over 60 s with a 20-sample minimum. fleet.status() surfaces tripped targets. A trip is not self-healing – the breaker stays tripped until an operator calls reset() (see the sketch after this list). That's deliberate: you don't want a runaway campaign quietly repairing itself while burning reputation.
- Signed audit chain. Ed25519 receipts on every member op, chained by parentSig. Under any activeConcurrency, a chain-tail mutex serialises sign calls so the chain stays linear – verify it end-to-end with verifyChain(). NDJSON sink optional.
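Because the breaker never self-heals, a campaign loop has to check for the trip itself. A minimal sketch, assuming the FleetInstance type is exported and reusing the member-task signature from the broadcast example below; where reset() actually lives isn't shown in this doc:
import type { FleetInstance } from "@sizls/pluck"; // type export assumed

// Dispatch target-by-target, checking the breaker between batches.
async function runGuarded(fleet: FleetInstance, targets: string[]) {
  for (const target of targets) {
    if (fleet.status().circuitTripped) {
      // Nothing self-heals: stop here and page an operator, who decides
      // whether to call reset() (its exact location on the API is assumed).
      throw new Error(`circuit breaker tripped before ${target} – operator reset required`);
    }
    await fleet.broadcast(async (p, member) =>
      p(target, { identity: member.identity }),
    );
  }
}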
The Substrate abstraction
Every piece of the runtime – event emission, policy evaluation, scheduling – flows through a Substrate interface:
interface Substrate {
  emit(event: SubstrateEvent): Promise<void>;
  schedule<T>(work: ScheduledWork<T>): Promise<RunId>;
  await<T>(runId: RunId): Promise<RunResult<T>>;
  eventLog: EventLogAdapter;
  policy: PolicyAdapter;
}
The event envelope is MCP-compatible and Kite-aligned so a future KiteSubstrate (in a separate @sizls/pluck-kite-substrate package, published when Kite v0.1 lands) can plug in without any change to fleet code:
// Ship today – in-process LocalSubstrate backs every fleet by default:
const fleet = pluck.fleet({ ...config });
// Future – when Kite v0.1 + @sizls/pluck-kite-substrate are ready:
// import { KiteSubstrate } from "@sizls/pluck-kite-substrate";
// const fleet = pluck.fleet({ ...config, substrate: new KiteSubstrate({ ... }) });
The same fleet API, a different execution backend. Pluck ships one production substrate today (LocalSubstrate); the Kite bridge is additive and opt-in.
Durable event log (SQLite)
LocalSubstrate defaults to an in-memory ring buffer for the event log – fast, zero-config, but lost on process exit. For campaigns where the audit chain needs to outlive the host process (or where an operator wants to resume after a crash), swap in the SQLite-backed adapter:
import { createPluck, createLocalSubstrate, createSqliteEventLog } from "@sizls/pluck";
const eventLog = await createSqliteEventLog({
  path: "./fleet-audit.sqlite",
  // capacity: 100_000, // optional: prune oldest events back to N
  // journalMode: "wal", // default – concurrent reader-friendly
});
const pluck = createPluck();
const fleet = pluck.fleet({
  count: 1000,
  // ...
  substrate: createLocalSubstrate({ eventLog }),
});
try {
  await fleet.broadcast(/* ... */);
} finally {
  // fleet.destroy() calls eventLog.close() internally on v0.15+;
  // call eventLog.close() yourself only for adapters created without a fleet.
  await fleet.destroy();
}
The SQLite adapter implements the same EventLogAdapter contract as the in-memory log (append, query, size) plus a close() method. The contract added an optional close?(): Promise<void> hook in v0.15; both fleet.destroy() and runtime.destroy() call it on the way out so durable adapters release their file handles automatically. Schema:
- single events table keyed by event id (ON CONFLICT DO NOTHING so retries are idempotent)
- indexes on (hlc_physical, hlc_counter), (origin, hlc_physical, hlc_counter), and (kind, hlc_physical, hlc_counter)
- WAL journal mode by default – the audit tooling (e.g. pluck verify-chain) can read in parallel with the writer (see the read-only sketch after this list)
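Because the log is a plain WAL-mode SQLite file, any read-only client can inspect it while a fleet is still writing. A minimal sketch using better-sqlite3 directly; the events table and kind column come from the schema above, everything else is illustrative:
import Database from "better-sqlite3";

// Open the audit log read-only – WAL mode lets this coexist with a writing fleet.
const db = new Database("./fleet-audit.sqlite", { readonly: true });

// Count events per kind (table and column names taken from the schema above).
const byKind = db
  .prepare("SELECT kind, COUNT(*) AS n FROM events GROUP BY kind ORDER BY n DESC")
  .all();

console.table(byKind);
db.close();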
better-sqlite3 is an optional peer dep. Pluck core stays installable without it; the adapter throws a typed MISSING_PEER_DEP error if you call createSqliteEventLog without first running pnpm add better-sqlite3.
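If a deployment can't guarantee the peer dep, you can degrade to the default in-memory log instead of failing the run. A sketch of that fallback – the err.code property is an assumption; the doc names the MISSING_PEER_DEP code but not the error's shape:
import { createPluck, createLocalSubstrate, createSqliteEventLog } from "@sizls/pluck";

async function makeSubstrate() {
  try {
    const eventLog = await createSqliteEventLog({ path: "./fleet-audit.sqlite" });
    return createLocalSubstrate({ eventLog });
  } catch (err) {
    // Assumption: the typed error carries its code on `code`.
    if ((err as { code?: string }).code === "MISSING_PEER_DEP") {
      console.warn("better-sqlite3 not installed – falling back to the in-memory event log");
      return createLocalSubstrate({}); // no eventLog → default in-memory ring buffer
    }
    throw err;
  }
}

const pluck = createPluck();
const fleet = pluck.fleet({ count: 100, substrate: await makeSubstrate() /* ...identities, proxies, etc. */ });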
Resume-from-checkpoint pattern. After a crash, query the durable log for completion events, build a set of done targets, and re-broadcast only what's outstanding:
import { createPluck, createLocalSubstrate, createSqliteEventLog } from "@sizls/pluck";
// Reopen the audit log from before the crash.
const eventLog = await createSqliteEventLog({ path: "./fleet-audit.sqlite" });
// Pull every fleet.member.complete event the previous run emitted.
const completed = await eventLog.query({
  kind: "fleet.member.complete",
  limit: 100_000,
});
const doneTargets = new Set<string>(
  completed.map((e) => (e.payload as { target: string }).target),
);
// Filter the input list down to just the work that hasn't landed.
const remaining = allTargets.filter((t) => !doneTargets.has(t));
// Resume – same fleet config, same audit log, only undone work.
const pluck = createPluck();
const fleet = pluck.fleet({
  count: remaining.length,
  substrate: createLocalSubstrate({ eventLog }),
  /* ... */
});
await fleet.broadcast(/* ... over `remaining` ... */);
await fleet.destroy(); // closes the SQLite handle
The HLC sort guarantees query() returns events in causal order even if the wall clock skewed during the original run.
Caveats.
- Payloads must be JSON.stringify-encodable. BigInt, circular references, and Symbol keys throw at append time. The agent runtime's safeStringifyToolOutput (v0.14) coerces these for tool messages, but raw substrate event payloads do not – wrap problematic values yourself before emitting (see the sketch after this list).
- path is not sandboxed. The adapter writes wherever the operator points it. Validate paths in your own config layer if untrusted input feeds them.
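One way to keep payloads appendable is to pre-encode them before they reach the substrate. An illustrative helper – this is not safeStringifyToolOutput, just one possible coercion:
// Coerce values JSON.stringify would reject before putting them in an event payload.
function toAppendablePayload(value: unknown): unknown {
  return JSON.parse(
    JSON.stringify(value, (_key, v) => {
      if (typeof v === "bigint") return v.toString(); // BigInt → string
      return v; // Symbol-keyed entries are silently dropped by JSON.stringify
    }),
  );
}

// Circular references still throw – break the cycle (or catch) before emitting.
const payload = toAppendablePayload({ target: "https://example.com", bytes: 123n });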
HLC – Hybrid Logical Clock
Event timestamps use HLC pairs ({physical, counter}) so a later Merkle-chain merge across per-member event logs stays causally ordered even when wall-clock monotonicity doesn't hold. Every event fires through clock.tick(); merge() on a caller-supplied HLC clamps to wall + maxSkewMs (default 10 min) so a misbehaving peer can't jump your clock to Number.MAX_SAFE_INTEGER.
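For intuition, here is the standard HLC tick/merge formulation with the clamp described above – an illustrative sketch, not pluck's internal clock:
interface Hlc { physical: number; counter: number; }

const MAX_SKEW_MS = 10 * 60 * 1000; // the default clamp mentioned above

// Advance the clock for a locally originated event.
function tick(local: Hlc, now = Date.now()): Hlc {
  const physical = Math.max(now, local.physical);
  return { physical, counter: physical === local.physical ? local.counter + 1 : 0 };
}

// Merge a peer timestamp without letting it drag the clock past wall + maxSkewMs.
function merge(local: Hlc, remote: Hlc, now = Date.now(), maxSkewMs = MAX_SKEW_MS): Hlc {
  const remotePhysical = Math.min(remote.physical, now + maxSkewMs); // clamp rogue peers
  const physical = Math.max(now, local.physical, remotePhysical);
  let counter = 0;
  if (physical === local.physical && physical === remotePhysical) {
    counter = Math.max(local.counter, remote.counter) + 1;
  } else if (physical === local.physical) {
    counter = local.counter + 1;
  } else if (physical === remotePhysical) {
    counter = remote.counter + 1;
  }
  return { physical, counter };
}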
Policy adapter
Policy is a thin hook evaluated on every event before it hits the log:
const substrate = createLocalSubstrate({
  policy: {
    evaluate(event) {
      if (event.capability.includes("target:evil.com")) {
        return { allow: false, reason: "DENY_TARGET" };
      }
      return { allow: true };
    },
  },
});
Capabilities are structured tags: fleet:op, fleet:policy, target:<host>. Fleet events include the target host when known so policies can gate by destination without knowing anything about fleet internals.
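Building on those tags, a destination allowlist stays just as small. A sketch assuming event.capability is the array of tags described above:
import { createLocalSubstrate } from "@sizls/pluck";

const ALLOWED_HOSTS = new Set(["api.example.com", "status.example.com"]);

const substrate = createLocalSubstrate({
  policy: {
    evaluate(event) {
      // Assumption: event.capability is an array of tags like "target:<host>".
      const blocked = event.capability
        .filter((tag: string) => tag.startsWith("target:"))
        .find((tag: string) => !ALLOWED_HOSTS.has(tag.slice("target:".length)));
      if (blocked) {
        return { allow: false, reason: "DENY_TARGET" };
      }
      return { allow: true };
    },
  },
});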
fleet.status() – live observability
const status = fleet.status();
// {
// memberCount: 1000,
// activeCount: 48,
// completedCount: 612,
// failedCount: 19,
// circuitTripped: false,
// proxyHealth: [{ proxyId, healthy, consecutiveFailures, reputationDemerits, lastUsedAt }, ...],
// }
Poll from a dashboard / MCP tool to surface in-flight state. Operators running 1000-member campaigns lean on this to catch reputation drift before the breaker trips.
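A minimal polling loop over that shape, assuming a fleet instance in scope as in the snippet above; the interval and thresholds are illustrative:
// Warn on reputation drift before the breaker trips.
const poll = setInterval(() => {
  const s = fleet.status();
  const settled = s.completedCount + s.failedCount;
  const failureRate = settled === 0 ? 0 : s.failedCount / settled;
  const sickProxies = s.proxyHealth.filter((p) => !p.healthy || p.reputationDemerits > 0);

  if (failureRate > 0.05 || sickProxies.length > 0) {
    console.warn("reputation drift", { failureRate, sickProxies: sickProxies.length });
  }
  if (s.circuitTripped) {
    console.error("circuit breaker tripped – operator intervention required");
    clearInterval(poll);
  }
}, 5_000);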
Lifecycle + graceful destroy
const fleet = pluck.fleet({ ... });
try {
  await fleet.broadcast(task);
} finally {
  await fleet.destroy(); // drains in-flight dispatches, flushes audit sink
}
destroy() refuses new dispatches (throws cannot dispatch after destroy()), waits for every active member op to complete, then drains the chain-tail mutex and the audit sink. Nothing lands in the NDJSON after the final flush returns.
Out of scope for v0.11
Tracked for point-releases:
- SQLite persistence / resumability. FleetConfig schema is shaped to accept it; the wiring lands when the operator story needs it (crash-mid-campaign → resume from last checkpoint).
- fleet.agents(...) runtime. Lands in v0.12 – heterogeneous multi-agent execution where each agent drives its own LLM + MCP surface + tool calls through this Substrate. Fleet ships today with broadcast / plan / pipeline; agents is the next coordinator on the same runtime.
What's next
- Concepts: Act – the phase every fleet mutation flows through.
- Concepts: Connect – identity-scoped connector options members inherit.
- Recipe: DriftWatch Fleet – SSH-fleet drift with Merkle-chained audit log.