Connect

The first phase of the Pluck pipeline. Match a URI to a connector, open a channel, and hand typed bytes to the rest of the pipeline.


The mental model

Every pluck() call starts with a URI. The connect phase answers one question:

Given this URI, which connector opens a channel to that source – and what shape of bytes does it give back?

There is no standalone connect() export. Connect is a phase of the pipeline, not a verb you call directly. You invoke the pipeline, and the connect phase runs first:

TypeScript
import { pluck } from "@sizls/pluck";

// The connect phase picks a connector, opens the channel, and hands
// the bytes to the navigate / extract / shape / act / sense phases that follow.
const result = await pluck("postgres://localhost/app?limit=10");

Connect is split out as its own phase because uniformity pays off everywhere downstream – once bytes are in hand with a known content type, extraction, shaping, and acting stop caring about where the bytes came from.


The registry

Pluck ships with 30 built-in connectors. A registry matches every URI against each connector's match() function in a deterministic order. The first match wins.

TypeScript
interface Connector {
  name: string;
  match: (uri: string) => boolean;
  connect: (uri: string, options: ConnectOptions) => Promise<ConnectResult>;
  destroy?: () => Promise<void>;
}

Order matters. Specific connectors register before generic ones so reddit://r/typescript routes to the Reddit connector and not the HTTP catch-all. File-storage connectors (drive.google.com/file/d/*, dropbox.com/s/*) register before generic HTTP for the same reason.
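
Conceptually, resolution is a first-match scan over that ordered list – a sketch only, since the real registry is internal to Pluck:

TypeScript
// Illustrative only: first-match resolution over an ordered list.
function resolve(connectors: Connector[], uri: string): Connector {
  for (const connector of connectors) {
    if (connector.match(uri)) return connector; // first match wins
  }
  throw new Error(`No connector matches ${uri}`);
}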

The full list lives at Reference → Connectors.


What you get back

Every connector returns the same shape – a ConnectResult:

TypeScript
interface ConnectResult {
  type: "text" | "html" | "json" | "binary" | "stream";
  content: string | Buffer | ReadableStream;
  contentType?: string;
  metadata: Partial<PluckMetadata>;
}

The type field tells the extract phase how to read the content. html goes through DOM parsing. json gets parsed. binary stays a Buffer. stream is an async iterable – used by SSH tail -F, Kafka consumers, WebSocket subscriptions, and anything else long-lived.

metadata carries everything the connector learned during the call – status codes, byte counts, content hashes, timestamps, host fingerprints for SSH, offsets for Kafka. Downstream phases read from it. You read from it on the returned PluckResult.
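
Reading it back is direct. A quick sketch for the HTTP case, assuming the status field the HTTP connector records:

TypeScript
const result = await pluck("https://api.example.com/data.json");
console.log(result.metadata.status); // e.g. 200 – recorded by the HTTP connector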


Streaming connectors

Some sources are inherently long-lived – you don't "fetch" an SSH tail or a Kafka topic, you iterate it. Those connectors implement StreamingConnector in addition to Connector:

TypeScript
import { createWebSocketConnector } from "@sizls/pluck";

const conn = createWebSocketConnector();
const stream = await conn.stream("wss://stream.binance.com:9443/ws/btcusdt@trade", {
  subprotocols: ["binance"],
  heartbeatIntervalMs: 30_000,
  reconnect: { maxAttempts: 5, minDelayMs: 1000, maxDelayMs: 60_000 },
});

for await (const chunk of stream) {
  if (chunk.data) process.stdout.write(chunk.data);
  if (chunk.error) console.error(chunk.error.message); // non-fatal, keep going
  if (chunk.end) break;
}

StreamChunk carries either data, a recoverable error, or a terminal end: true marker. The caller decides when to stop.
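
The shape that loop relies on, sketched here – the authoritative type ships with the package:

TypeScript
// Sketch of StreamChunk as consumed above – not the canonical definition.
interface StreamChunk {
  data?: string | Buffer; // payload, when present
  error?: Error;          // recoverable – iteration continues
  end?: boolean;          // terminal marker – no more chunks
}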

Today's streaming connectors: SSH (tail -F across fleet URIs with brace expansion), Kafka (consumer peek with backpressure), and WebSocket (with subprotocol negotiation, ping/pong heartbeat, and auto-reconnect). The database, social, and RSS connectors are one-shot connect() today – streaming variants are on the roadmap.


Custom connectors

The registry is open. Register your own when you want a URI scheme Pluck doesn't ship:

TypeScript
import { createPluck, defineConnector } from "@sizls/pluck";

const stripe = defineConnector({
  name: "stripe",
  match: (uri) => uri.startsWith("stripe://"),
  async connect(uri, options) {
    const path = uri.replace(/^stripe:\/\//, "/v1/");
    const response = await fetch(`https://api.stripe.com${path}`, {
      headers: { Authorization: `Bearer ${process.env.STRIPE_KEY}` },
      signal: options.signal,
    });
    const body = await response.text();
    return {
      type: "json",
      content: body,
      contentType: "application/json",
      metadata: { status: response.status },
    };
  },
});

const pluck = createPluck({ connectors: [stripe] });
const customers = await pluck("stripe://customers?limit=10");

defineConnector is a typed pass-through – it gives you parameter inference inside connect(uri, options) without the Connector type import. Custom connectors run ahead of the built-ins, whether you pass them through createPluck({ connectors: [...] }) or register them post-construction via instance.connectors.register(stripe). Both paths prepend, so your URI match wins over any built-in that happens to match the same pattern.
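
The post-construction path, for completeness – same prepend semantics:

TypeScript
const instance = createPluck();
instance.connectors.register(stripe); // prepended – checked before built-ins
const customers = await instance("stripe://customers?limit=10");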

Want to check what matches a URI without opening a socket?

TypeScript
pluck.connectors.find("stripe://customers"); // "stripe"
pluck.connectors.list();                     // ["stripe", "twitter", "vimeo", ...]

Safety guarantees

The connect phase enforces a handful of guarantees for every connector, built-in or custom:

  • SSRF prevention. Every outbound host is validated through validateHost before the socket opens. Private and link-local ranges are rejected by default – loopback, RFC1918 (10/8, 172.16/12, 192.168/16), link-local (169.254/16, fe80::/10), unique-local IPv6 (fc00::/7), carrier-grade NAT (100.64/10), and the null address (0.0.0.0/8). Redirects are re-validated after resolution – a 302 to 169.254.169.254/latest/meta-data/ will not bypass the gate.
  • Bounded reads. Bodies cap at MAX_BODY_BYTES (10 MB). Individual lines cap at MAX_LINE_BYTES (1 MiB) for streaming connectors. Session totals cap at MAX_SESSION_BYTES (100 MB). A connector that tries to pull the universe gets a BODY_TOO_LARGE error, not an OOM.
  • Fleet expansion caps. Brace expansion (ssh://web-{01..1000}) is capped at MAX_FLEET_EXPANSION (1,000 hosts) with a hard ceiling of 25,000 via --max-fleet to prevent accidental fork bombs.
  • AbortSignal propagation. Every connector honors options.signal. A controller.abort() tears the in-flight socket down and resolves or rejects the pluck() promise cleanly (see the sketch after this list).
  • Peer-dep isolation. The ssh2, kafkajs, ws, and basic-ftp connectors load their peer lazily through loadOptionalPeer – import @sizls/pluck without kafkajs installed and the Kafka connector returns a MISSING_PEER_DEP error only when someone actually tries to connect via kafka://. Database connectors (pg, mysql2, better-sqlite3, mongodb, ioredis) use direct dynamic imports with the same surface result: a missing peer surfaces only on first use, not at import time.
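
The abort sketch referenced above – assuming pluck() accepts a { signal } option and forwards it as options.signal:

TypeScript
// Assumption: the second pluck() argument carries the AbortSignal.
const controller = new AbortController();
const pending = pluck("https://api.example.com/slow-report", {
  signal: controller.signal,
});
setTimeout(() => controller.abort(), 5_000); // give up after 5 s

try {
  await pending;
} catch (err) {
  console.error("aborted:", err); // socket torn down, promise settled cleanly
}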

Connection pooling

The built-in connectors that open expensive sockets – Postgres, MySQL, SQLite, MongoDB, Redis, SSH – keep module-scoped pools keyed by the full connection identity. The pool key is strict: hostname + port + user + auth-fingerprint + (for SSH) the bastion chain. Two URIs that differ by a single character land on two different clients.

user@host:port[|bastion:user@host:port][|auth:<8-char-sha256-prefix>]

Back-to-back calls reuse the same client; instance.destroy() closes every pool cleanly.
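
A sketch of how such a key could be derived – names here are illustrative, the real helper is internal:

TypeScript
import { createHash } from "node:crypto";

// Illustrative derivation matching the key format above: the secret
// never becomes a map key, only its 8-char sha256 prefix does.
function poolKey(user: string, host: string, port: number, secret: string): string {
  const authFp = createHash("sha256").update(secret).digest("hex").slice(0, 8);
  return `${user}@${host}:${port}|auth:${authFp}`;
}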

Minimum end-to-end

TypeScript
import { createPluck } from "@sizls/pluck";

const pluck = createPluck();

// First call opens a real Postgres connection + SELECT cursor.
const first = await pluck(
  "postgres://reader:secret@db.internal:5432/analytics?table=events&limit=100",
);

// Second call reuses the SAME pooled pg.Pool. No TCP handshake, no
// TLS negotiation, no auth round-trip. Just a fresh query on the
// existing connection.
const second = await pluck(
  "postgres://reader:secret@db.internal:5432/analytics?table=events&limit=200",
);

// All pool clients (pg, mysql, mongo, redis, ssh) drain + close here.
await pluck.destroy();

The pool is invisible in normal use – the only signal that it's working is the latency difference between the first call and every subsequent one for the same host.
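
One way to observe it – the numbers are illustrative:

TypeScript
console.time("cold");
await pluck("postgres://reader:secret@db.internal:5432/analytics?table=events&limit=1");
console.timeEnd("cold"); // TCP + TLS + auth + query

console.time("warm");
await pluck("postgres://reader:secret@db.internal:5432/analytics?table=events&limit=1");
console.timeEnd("warm"); // pooled client – query latency only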

Why the pool key includes more than host:port

A credential rotation must land on a fresh client. Two different callers against the same host with different keys must not share a connection. The key incorporates a short sha256 fingerprint of the auth material so credential changes force a pool miss without ever storing the secret itself as a map key.

TypeScript
// The auth fingerprint is part of the key, so rotating from `old-secret`
// to `new-secret` produces two distinct pool entries – zero risk of a
// post-rotation call reaching the old client.
await pluck("postgres://reader:old-secret@db.internal/app?limit=10");
await pluck("postgres://reader:new-secret@db.internal/app?limit=10");
// → two pool entries, not one. The `old-secret` client is eligible for
//   LRU eviction once `SSH_POOL_IDLE_MS` (60s) elapses with no use.

Fleet expansion + SSH bastions

SSH is the most aggressive pooling case because handshakes are expensive (~300 ms each, multiplied by the fleet size). Pluck's SSH pool uses the standard ssh2 multiplex primitive: one TCP session can back dozens of concurrent SFTP reads without serialising them.
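
In raw ssh2 terms, the primitive looks roughly like this – a sketch of the underlying library, not Pluck's internals:

TypeScript
import { Client } from "ssh2";

// One TCP session; each SFTP read is multiplexed as its own channel.
const conn = new Client();
conn
  .on("ready", () => {
    conn.sftp((err, sftp) => {
      if (err) throw err;
      for (const path of ["/etc/hostname", "/etc/os-release"]) {
        sftp.readFile(path, (e, buf) => {
          if (!e) console.log(path, buf.toString().trim());
        });
      }
    });
  })
  .connect({ host: "web-01.prod.example.com", username: "deploy" });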

TypeScript
import { expandFleetUri } from "@sizls/pluck"; // assumed export

// 40-host fleet. The first time this runs, 40 handshakes fire in
// parallel. Every subsequent call against the same user / bastion /
// key material reuses the existing pool clients – the second run
// reads all 40 config files in ~50 ms instead of ~12 s.
for (const host of expandFleetUri("ssh://web-{01..40}.prod.example.com")) {
  await pluck(`${host}/etc/nginx/nginx.conf`);
}

When a bastion is in play, the bastion host becomes part of the pool key:

TypeScript
// Two distinct pool entries – bastion is part of the key.
await pluck("ssh://jumphost.prod/web-01/etc/nginx.conf");
await pluck("ssh://web-01/etc/nginx.conf");
//  ^ different routing, different pool entry, no accidental leak
//    of credentials or ports across pool boundaries.

Eviction

Pools are LRU-bounded. SSH defaults to 32 concurrent clients and drops idle entries after 60 seconds of inactivity. Postgres / MySQL / Mongo / Redis use their respective driver's pool (pg.Pool, mysql2/promise, MongoClient, ioredis), configured for sane concurrency limits and idle teardown.

A pool miss is always safe – the connector rebuilds the client on the next call. A pool entry whose underlying socket died mid-flight is flagged destroyed and skipped by the next acquirer.

When you want to NOT pool

The pool is per-PluckInstance. Want isolated clients (e.g. for tests that assert first-call vs nth-call behaviour)?

TypeScript
const isolated = createPluck();
try {
  await isolated("postgres://reader@db.internal/events");
  // every operation here runs through `isolated`'s own pool
} finally {
  await isolated.destroy();
}

See Recipe: Driftwatch Fleet for the full fleet-pooling pattern at scale.


Hardening the network policy

For production apps that should only reach a fixed set of external services, the network config is the right lever. Every field is opt-in and additive.

TypeScript
import { createPluck } from "@sizls/pluck";

const pluck = createPluck({
  network: {
    // Exact names, suffix wildcards, or CIDR ranges (IPv4 + IPv6).
    allowlist: [
      "api.example.com",
      "*.internal.example.com",
      "10.42.0.0/16",
    ],
    httpsOnly: true, // reject http:// outright
    blockMetadataHostnames: true, // default – blocks metadata.google.internal etc.
  },
});

The allowlist is evaluated AFTER the built-in SSRF / private-IP / cloud-metadata checks, so private ranges in the allowlist still reject unless you also set allowPrivateIps: true (a separate, intentionally-scary escape hatch).
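
So a private-range allowlist needs the explicit escape hatch as well – a sketch combining the two fields described above:

TypeScript
// allowPrivateIps is the separate opt-in described above.
const internalOnly = createPluck({
  network: {
    allowlist: ["10.42.0.0/16"],
    allowPrivateIps: true, // without this, the private CIDR still rejects
  },
});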

validateUrlWithDns + validateHost re-check the resolved IP against the allowlist too, closing the rebinding gap where a supposedly-allowlisted hostname resolves to an attacker-controlled IP outside every CIDR entry.


What's next

  • Navigate – the next phase. Prepare the bytes before extract sees them (Readability, Playwright, agent-driven).
  • Extract – once the channel is open and content is prepared, pull structured data out.
  • Reference: Connectors – every built-in connector, its URI scheme, and the options it honors.
  • MCP-First Pipeline – every connector is also an MCP tool, no glue code required.