S2
The @livestore/sync-s2 package lets you sync LiveStore with the official S2 backend (s2.dev).
- Package:
pnpm add @livestore/sync-s2 - Protocol: HTTP push/pull, live pull via SSE
Architecture
Section titled “Architecture”The API proxy handles:
- Business logic: Any kind of business logic that is specific to your application (e.g. rate limiting, auth, logging, etc.)
- S2 Stream Management: Creates basins and streams as needed
- S2 Request Translation: Converts LiveStore sync operations to authenticated S2 API calls
Client Setup
Section titled “Client Setup”Basic usage in your worker/server code:
import { const makeSyncBackend: ({ endpoint, ping: pingOptions, retry }: SyncS2Options) => SyncBackendConstructor<SyncMetadata>
makeSyncBackend } from '@livestore/sync-s2'
const const _backend: SyncBackendConstructor<{ readonly s2SeqNum: number & Brand<"S2SeqNum">;}, JsonValue>
_backend = function makeSyncBackend({ endpoint, ping: pingOptions, retry }: SyncS2Options): SyncBackendConstructor<SyncMetadata>
makeSyncBackend({ SyncS2Options.endpoint: string | { push: string; pull: string; ping: string;}
endpoint: '/api/s2', // Your API proxy endpoint // more options...})API Proxy Implementation
Section titled “API Proxy Implementation”S2 requires authentication and stream management that can’t be handled directly from the browser. You’ll need to implement an API proxy on your server that:
- Handles authentication with S2 using your access token
- Manages basins and streams (creates them if they don’t exist)
- Proxies requests between LiveStore and S2
Your proxy needs three endpoints:
Using Helper Functions
Section titled “Using Helper Functions”The @livestore/sync-s2 package provides helper functions to simplify the proxy implementation:
import { import Schema
Schema } from '@livestore/livestore'import * as import S2
S2 from '@livestore/sync-s2'import * as import S2Helpers
S2Helpers from '@livestore/sync-s2/s2-proxy-helpers'
// Configure S2 connectionconst const s2Config: S2.S2Config
s2Config: import S2Helpers
S2Helpers.interface S2Config
Configuration for S2 connections
S2Config = { S2Config.basin: string
basin: var process: NodeJS.Process
process.NodeJS.Process.env: NodeJS.ProcessEnv
The process.env property returns an object containing the user environment.
See environ(7).
An example of this object looks like:
{ TERM: 'xterm-256color', SHELL: '/usr/local/bin/bash', USER: 'maciej', PATH: '~/.bin/:/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin', PWD: '/Users/maciej', EDITOR: 'vim', SHLVL: '1', HOME: '/Users/maciej', LOGNAME: 'maciej', _: '/usr/local/bin/node'}
It is possible to modify this object, but such modifications will not be
reflected outside the Node.js process, or (unless explicitly requested)
to other Worker threads.
In other words, the following example would not work:
node -e 'process.env.foo = "bar"' && echo $foo
While the following will:
import { env } from 'node:process';
env.foo = 'bar';console.log(env.foo);
Assigning a property on process.env will implicitly convert the value
to a string. This behavior is deprecated. Future versions of Node.js may
throw an error when the value is not a string, number, or boolean.
import { env } from 'node:process';
env.test = null;console.log(env.test);// => 'null'env.test = undefined;console.log(env.test);// => 'undefined'
Use delete to delete a property from process.env.
import { env } from 'node:process';
env.TEST = 1;delete env.TEST;console.log(env.TEST);// => undefined
On Windows operating systems, environment variables are case-insensitive.
import { env } from 'node:process';
env.TEST = 1;console.log(env.test);// => 1
Unless explicitly specified when creating a Worker instance,
each Worker thread has its own copy of process.env, based on its
parent thread's process.env, or whatever was specified as the env option
to the Worker constructor. Changes to process.env will not be visible
across Worker threads, and only the main thread can make changes that
are visible to the operating system or to native add-ons. On Windows, a copy of process.env on a Worker instance operates in a case-sensitive manner
unlike the main thread.
env.string | undefined
S2_BASIN ?? 'your-basin', S2Config.token: string
token: var process: NodeJS.Process
process.NodeJS.Process.env: NodeJS.ProcessEnv
The process.env property returns an object containing the user environment.
See environ(7).
An example of this object looks like:
{ TERM: 'xterm-256color', SHELL: '/usr/local/bin/bash', USER: 'maciej', PATH: '~/.bin/:/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin', PWD: '/Users/maciej', EDITOR: 'vim', SHLVL: '1', HOME: '/Users/maciej', LOGNAME: 'maciej', _: '/usr/local/bin/node'}
It is possible to modify this object, but such modifications will not be
reflected outside the Node.js process, or (unless explicitly requested)
to other Worker threads.
In other words, the following example would not work:
node -e 'process.env.foo = "bar"' && echo $foo
While the following will:
import { env } from 'node:process';
env.foo = 'bar';console.log(env.foo);
Assigning a property on process.env will implicitly convert the value
to a string. This behavior is deprecated. Future versions of Node.js may
throw an error when the value is not a string, number, or boolean.
import { env } from 'node:process';
env.test = null;console.log(env.test);// => 'null'env.test = undefined;console.log(env.test);// => 'undefined'
Use delete to delete a property from process.env.
import { env } from 'node:process';
env.TEST = 1;delete env.TEST;console.log(env.TEST);// => undefined
On Windows operating systems, environment variables are case-insensitive.
import { env } from 'node:process';
env.TEST = 1;console.log(env.test);// => 1
Unless explicitly specified when creating a Worker instance,
each Worker thread has its own copy of process.env, based on its
parent thread's process.env, or whatever was specified as the env option
to the Worker constructor. Changes to process.env will not be visible
across Worker threads, and only the main thread can make changes that
are visible to the operating system or to native add-ons. On Windows, a copy of process.env on a Worker instance operates in a case-sensitive manner
unlike the main thread.
env.string | undefined
S2_ACCESS_TOKEN!, // Your S2 access token}
// HEAD /api/s2 - Health check/pingexport async function function HEAD(): Promise<Response>
HEAD() { return new var Response: new (body?: BodyInit | null, init?: ResponseInit) => Response
The Response interface of the Fetch API represents the response to a request.
This Fetch API interface represents the response to a request.
Response(null, { ResponseInit.status?: number
status: 200 })}
// GET /api/s2 - Pull eventsexport async function function GET(request: Request): Promise<Response>
GET(request: Request<unknown, CfProperties<unknown>>
request: interface Request<CfHostMetadata = unknown, Cf = CfProperties<CfHostMetadata>>
The Request interface of the Fetch API represents a resource request.
This Fetch API interface represents a resource request.
Request) { const const url: URL
url = new var URL: new (url: string | URL, base?: string | URL) => URL
The URL interface is used to parse, construct, normalize, and encode URL.
URL(request: Request<unknown, CfProperties<unknown>>
request.Request<unknown, CfProperties<unknown>>.url: string
The url read-only property of the Request interface contains the URL of the request.
Returns the URL of request as a string.
url) const const args: { readonly storeId: string; readonly s2SeqNum: (number & Brand<"S2SeqNum">) | "from-start"; readonly live: boolean; readonly payload: Schema.JsonValue | undefined;}
args = import S2
S2.const decodePullArgsFromSearchParams: (searchParams: URLSearchParams) => { readonly storeId: string; readonly s2SeqNum: (number & Brand<"S2SeqNum">) | "from-start"; readonly live: boolean; readonly payload: Schema.JsonValue | undefined;}
Decode args from URLSearchParams using Effect Schema, mirroring Electric's approach.
decodePullArgsFromSearchParams(const url: URL
url.URL.searchParams: URLSearchParams
The searchParams read-only property of the access to the [MISSING: httpmethod('GET')] decoded query arguments contained in the URL.
searchParams) const const streamName: string
streamName = import S2
S2.const makeS2StreamName: (storeId: string) => string
makeS2StreamName(const args: { readonly storeId: string; readonly s2SeqNum: (number & Brand<"S2SeqNum">) | "from-start"; readonly live: boolean; readonly payload: Schema.JsonValue | undefined;}
args.storeId: string
storeId)
// Ensure basin and stream exist await import S2Helpers
S2Helpers.const ensureBasin: (config: S2.S2Config) => Promise<void>
ensureBasin(const s2Config: S2.S2Config
s2Config) await import S2Helpers
S2Helpers.const ensureStream: (config: S2.S2Config, stream: string) => Promise<void>
ensureStream(const s2Config: S2.S2Config
s2Config, const streamName: string
streamName)
// Build request with appropriate headers and URL // Note: buildPullRequest handles cursor+1 conversion internally const { url: string
url: const pullUrl: string
pullUrl, const headers: Record<string, string>
headers } = import S2Helpers
S2Helpers.const buildPullRequest: ({ config, args, }: { config: S2.S2Config; args: S2.ApiSchema.PullArgs;}) => { url: string; headers: Record<string, string>;}
buildPullRequest({ config: S2.S2Config
config: const s2Config: S2.S2Config
s2Config, args: { readonly storeId: string; readonly s2SeqNum: (number & Brand<"S2SeqNum">) | "from-start"; readonly live: boolean; readonly payload: Schema.JsonValue | undefined;}
args })
const const res: Response
res = await function fetch(input: string | URL | Request, init?: RequestInit): Promise<Response> (+3 overloads)
fetch(const pullUrl: string
pullUrl, { RequestInit<CfProperties<unknown>>.headers?: HeadersInit
A Headers object, an object literal, or an array of two-item arrays to set request's headers.
headers })
// For live pulls (SSE), proxy the response if (const args: { readonly storeId: string; readonly s2SeqNum: (number & Brand<"S2SeqNum">) | "from-start"; readonly live: boolean; readonly payload: Schema.JsonValue | undefined;}
args.live: boolean
live === true) { if (!const res: Response
res.Response.ok: boolean
The ok read-only property of the Response interface contains a Boolean stating whether the response was successful (status in the range 200-299) or not.
ok) { return import S2Helpers
S2Helpers.const sseKeepAliveResponse: () => Response
sseKeepAliveResponse() } return new var Response: new (body?: BodyInit | null, init?: ResponseInit) => Response
The Response interface of the Fetch API represents the response to a request.
This Fetch API interface represents the response to a request.
Response(const res: Response
res.Body.body: ReadableStream<Uint8Array<ArrayBuffer>> | null
body, { ResponseInit.status?: number
status: 200, ResponseInit.headers?: HeadersInit
headers: { 'content-type': 'text/event-stream' }, }) }
// For regular pulls if (!const res: Response
res.Response.ok: boolean
The ok read-only property of the Response interface contains a Boolean stating whether the response was successful (status in the range 200-299) or not.
ok) { return import S2Helpers
S2Helpers.const emptyBatchResponse: () => Response
emptyBatchResponse() }
const const batch: string
batch = await const res: Response
res.Body.text(): Promise<string> (+1 overload)
text() return new var Response: new (body?: BodyInit | null, init?: ResponseInit) => Response
The Response interface of the Fetch API represents the response to a request.
This Fetch API interface represents the response to a request.
Response(const batch: string
batch, { ResponseInit.headers?: HeadersInit
headers: { 'content-type': 'application/json' }, })}
// POST /api/s2 - Push eventsexport async function function POST(request: Request): Promise<Response>
POST(request: Request<unknown, CfProperties<unknown>>
request: interface Request<CfHostMetadata = unknown, Cf = CfProperties<CfHostMetadata>>
The Request interface of the Fetch API represents a resource request.
This Fetch API interface represents a resource request.
Request) { const const requestBody: unknown
requestBody = await request: Request<unknown, CfProperties<unknown>>
request.Body.json<unknown>(): Promise<unknown> (+1 overload)
json() const const parsed: { readonly storeId: string; readonly batch: readonly { readonly args: any; readonly name: string; readonly seqNum: number & Brand<"GlobalEventSequenceNumber">; readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">; readonly clientId: string; readonly sessionId: string; }[];}
parsed = import Schema
Schema.decodeUnknownSync<{ readonly storeId: string; readonly batch: readonly { readonly args: any; readonly name: string; readonly seqNum: number & Brand<"GlobalEventSequenceNumber">; readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">; readonly clientId: string; readonly sessionId: string; }[];}, { readonly storeId: string; readonly batch: readonly { readonly args: any; readonly name: string; readonly seqNum: number; readonly parentSeqNum: number; readonly clientId: string; readonly sessionId: string; }[];}>(schema: Schema.Schema<{ readonly storeId: string; readonly batch: readonly { readonly args: any; readonly name: string; readonly seqNum: number & Brand<"GlobalEventSequenceNumber">; readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">; readonly clientId: string; readonly sessionId: string; }[];}, { readonly storeId: string; readonly batch: readonly { readonly args: any; readonly name: string; readonly seqNum: number; readonly parentSeqNum: number; readonly clientId: string; readonly sessionId: string; }[];}, never>, options?: ParseOptions): (u: unknown, overrideOptions?: ParseOptions) => { readonly storeId: string; readonly batch: readonly { readonly args: any; readonly name: string; readonly seqNum: number & Brand<"GlobalEventSequenceNumber">; readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">; readonly clientId: string; readonly sessionId: string; }[];}export decodeUnknownSync
decodeUnknownSync(import S2
S2.import ApiSchema
ApiSchema.const PushPayload: Schema.Struct<{ storeId: typeof Schema.String; batch: Schema.Array$<Schema.Struct<{ name: typeof Schema.String; args: typeof Schema.Any; seqNum: Schema.BrandSchema<number & Brand<"GlobalEventSequenceNumber">, number, never>; parentSeqNum: Schema.BrandSchema<number & Brand<"GlobalEventSequenceNumber">, number, never>; clientId: typeof Schema.String; sessionId: typeof Schema.String; }>>;}>
PushPayload)(const requestBody: unknown
requestBody) const const streamName: string
streamName = import S2
S2.const makeS2StreamName: (storeId: string) => string
makeS2StreamName(const parsed: { readonly storeId: string; readonly batch: readonly { readonly args: any; readonly name: string; readonly seqNum: number & Brand<"GlobalEventSequenceNumber">; readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">; readonly clientId: string; readonly sessionId: string; }[];}
parsed.storeId: string
storeId)
// Ensure basin and stream exist await import S2Helpers
S2Helpers.const ensureBasin: (config: S2.S2Config) => Promise<void>
ensureBasin(const s2Config: S2.S2Config
s2Config) await import S2Helpers
S2Helpers.const ensureStream: (config: S2.S2Config, stream: string) => Promise<void>
ensureStream(const s2Config: S2.S2Config
s2Config, const streamName: string
streamName)
// Build push request with proper formatting const const pushRequests: readonly S2.S2PushRequest[]
pushRequests = import S2Helpers
S2Helpers.const buildPushRequests: ({ config, storeId, batch, }: { config: S2.S2Config; storeId: string; batch: readonly AnyEncodedGlobal[];}) => ReadonlyArray<S2.S2PushRequest>
Builds one or more append requests against S2. The helper applies the
documented 1 MiB / 1000-record limits via chunkEventsForS2, so callers
receive a request per compliant chunk instead of hitting 413 responses at
runtime.
buildPushRequests({ config: S2.S2Config
config: const s2Config: S2.S2Config
s2Config, storeId: string
storeId: const parsed: { readonly storeId: string; readonly batch: readonly { readonly args: any; readonly name: string; readonly seqNum: number & Brand<"GlobalEventSequenceNumber">; readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">; readonly clientId: string; readonly sessionId: string; }[];}
parsed.storeId: string
storeId, batch: readonly { readonly args: any; readonly name: string; readonly seqNum: number & Brand<"GlobalEventSequenceNumber">; readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">; readonly clientId: string; readonly sessionId: string;}[]
batch: const parsed: { readonly storeId: string; readonly batch: readonly { readonly args: any; readonly name: string; readonly seqNum: number & Brand<"GlobalEventSequenceNumber">; readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">; readonly clientId: string; readonly sessionId: string; }[];}
parsed.batch: readonly { readonly args: any; readonly name: string; readonly seqNum: number & Brand<"GlobalEventSequenceNumber">; readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">; readonly clientId: string; readonly sessionId: string;}[]
batch, })
for (const const pushRequest: S2.S2PushRequest
pushRequest of const pushRequests: readonly S2.S2PushRequest[]
pushRequests) { const const res: Response
res = await function fetch(input: string | URL | Request, init?: RequestInit): Promise<Response> (+3 overloads)
fetch(const pushRequest: S2.S2PushRequest
pushRequest.S2PushRequest.url: string
url, { RequestInit<CfProperties<unknown>>.method?: string
A string to set request's method.
method: 'POST', RequestInit<CfProperties<unknown>>.headers?: HeadersInit
A Headers object, an object literal, or an array of two-item arrays to set request's headers.
headers: const pushRequest: S2.S2PushRequest
pushRequest.S2PushRequest.headers: Record<string, string>
headers, RequestInit<CfProperties<unknown>>.body?: BodyInit | null
A BodyInit object or null to set request's body.
body: const pushRequest: S2.S2PushRequest
pushRequest.S2PushRequest.body: string
body, })
if (!const res: Response
res.Response.ok: boolean
The ok read-only property of the Response interface contains a Boolean stating whether the response was successful (status in the range 200-299) or not.
ok) { return import S2Helpers
S2Helpers.const errorResponse: (message: string, status?: number) => Response
errorResponse('Push failed', 500) } }
return import S2Helpers
S2Helpers.const successResponse: () => Response
successResponse()}Cursor Semantics
Section titled “Cursor Semantics”The S2 sync provider uses a cursor that represents the last processed record:
- The cursor points to the last S2 sequence number we’ve seen
- S2’s
seq_numparameter expects where to start reading from (inclusive) - The helper functions automatically handle the
+1conversion:seq_num = cursor + 1 - When starting from the beginning, cursor is
'from-start'which maps toseq_num = 0
Important Considerations
Section titled “Important Considerations”- Stream provisioning: The helper functions provide
ensureBasin()andensureStream()to handle creation automatically. - Error handling: The helpers include fallback responses (
emptyBatchResponse(),sseKeepAliveResponse()) to maintain stream continuity during errors. - Authentication: Store your S2 access token securely (e.g., environment variables).
- Rate limiting: Consider implementing rate limiting to protect your S2 quota.
- Response helpers: Use the provided response helpers (
successResponse(),errorResponse()) for consistent API responses.
Live Pull (SSE)
Section titled “Live Pull (SSE)”S2 provider supports live pulls over Server-Sent Events (SSE). When live: true is passed to pull, the client:
- Immediately emits one page (possibly empty) with
pageInfo: NoMore. - Parses SSE frames robustly (multi-line
data:support) and reacts to typed events:event: batch→ parsesdataas S2ReadBatchand emits items.event: ping→ ignored; keeps the stream alive.event: error→ mapped toInvalidPullError.
Implementation Notes
Section titled “Implementation Notes”Data Storage & Encoding
Section titled “Data Storage & Encoding”LiveStore leverages S2 streams for durable event storage. Understanding the mapping between LiveStore concepts and S2 primitives helps developers comprehend the persistence layer, though direct manipulation is discouraged.
LiveStore → S2 Mapping
Section titled “LiveStore → S2 Mapping”Store to Stream: Each LiveStore storeId maps to exactly one S2 stream. The stream name is derived from the storeId after sanitization to meet S2 naming requirements.
Event Encoding: LiveStore events (AnyEncodedGlobal) are JSON-serialized and stored as the body field of S2 records. Each event contains:
name: Event type identifierargs: Event-specific payload dataseqNum: LiveStore’s global event sequence numberparentSeqNum: Previous event’s sequence number for orderingclientId: Origin client identifiersessionId: Session that created the event
Record Structure: When pushed to S2, each LiveStore event becomes one S2 record:
{ "body": "{\"name\":\"todo/create\",\"args\":{...},\"seqNum\":42,\"parentSeqNum\":41,...}"}Sequence Number Handling
Section titled “Sequence Number Handling”LiveStore and S2 maintain completely independent sequence numbering systems:
- LiveStore’s
seqNum: Stored inside the JSON event payload (starts at 0). Used for logical event ordering and cursor management within LiveStore. - S2’s
seq_num: Assigned by S2 to each record in the stream (also starts at 0). Used solely for stream positioning when reading records.
These are two separate numbering systems that happen to both start at 0. While they often align numerically (first event is LiveStore seqNum 0, stored in S2 record with seq_num 0), this is coincidental rather than a direct mapping. The sync provider:
- Preserves LiveStore’s sequence numbers unchanged in the event payload
- Uses S2’s seq_num only for querying records from the stream (e.g., “read from position X”)
- Never relies on S2’s seq_num for LiveStore’s logical event ordering
Technical Details
Section titled “Technical Details”Format: The provider uses s2-format: raw when communicating with S2, treating record bodies as UTF-8 JSON strings.
Headers: S2 record headers are not utilized; all LiveStore metadata is contained within the JSON body.
Batch Operations: Multiple events can be pushed in a single batch, with each event becoming a separate S2 record while maintaining order.
Important Note
Section titled “Important Note”Direct stream manipulation is strongly discouraged. Always interact with S2 streams through LiveStore’s sync provider to ensure:
- Proper event encoding/decoding
- Sequence number integrity
- Cursor management consistency
- Compatibility with LiveStore’s sync protocol
Bypassing LiveStore to modify S2 streams directly may corrupt the event log and break synchronization.