Skip to content

S2

The @livestore/sync-s2 package lets you sync LiveStore with the official S2 backend (s2.dev).

  • Package: pnpm add @livestore/sync-s2
  • Protocol: HTTP push/pull, live pull via SSE

The API proxy handles:

  • Business logic: Any kind of business logic that is specific to your application (e.g. rate limiting, auth, logging, etc.)
  • S2 Stream Management: Creates basins and streams as needed
  • S2 Request Translation: Converts LiveStore sync operations to authenticated S2 API calls

Basic usage in your worker/server code:

import {
const makeSyncBackend: ({ endpoint, ping: pingOptions, retry }: SyncS2Options) => SyncBackendConstructor<SyncMetadata>
makeSyncBackend
} from '@livestore/sync-s2'
const
const _backend: SyncBackendConstructor<{
readonly s2SeqNum: number & Brand<"S2SeqNum">;
}, JsonValue>
_backend
=
function makeSyncBackend({ endpoint, ping: pingOptions, retry }: SyncS2Options): SyncBackendConstructor<SyncMetadata>
makeSyncBackend
({
SyncS2Options.endpoint: string | {
push: string;
pull: string;
ping: string;
}
endpoint
: '/api/s2', // Your API proxy endpoint
// more options...
})

S2 requires authentication and stream management that can’t be handled directly from the browser. You’ll need to implement an API proxy on your server that:

  1. Handles authentication with S2 using your access token
  2. Manages basins and streams (creates them if they don’t exist)
  3. Proxies requests between LiveStore and S2

Your proxy needs three endpoints:

The @livestore/sync-s2 package provides helper functions to simplify the proxy implementation:

import {
import Schema
Schema
} from '@livestore/livestore'
import * as
import S2
S2
from '@livestore/sync-s2'
import * as
import S2Helpers
S2Helpers
from '@livestore/sync-s2/s2-proxy-helpers'
// Configure S2 connection
const
const s2Config: S2.S2Config
s2Config
:
import S2Helpers
S2Helpers
.
interface S2Config

Configuration for S2 connections

S2Config
= {
S2Config.basin: string
basin
:
var process: NodeJS.Process
process
.
NodeJS.Process.env: NodeJS.ProcessEnv

The process.env property returns an object containing the user environment. See environ(7).

An example of this object looks like:

{
TERM: 'xterm-256color',
SHELL: '/usr/local/bin/bash',
USER: 'maciej',
PATH: '~/.bin/:/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin',
PWD: '/Users/maciej',
EDITOR: 'vim',
SHLVL: '1',
HOME: '/Users/maciej',
LOGNAME: 'maciej',
_: '/usr/local/bin/node'
}

It is possible to modify this object, but such modifications will not be reflected outside the Node.js process, or (unless explicitly requested) to other Worker threads. In other words, the following example would not work:

node -e 'process.env.foo = "bar"' &#x26;&#x26; echo $foo

While the following will:

import { env } from 'node:process';
env.foo = 'bar';
console.log(env.foo);

Assigning a property on process.env will implicitly convert the value to a string. This behavior is deprecated. Future versions of Node.js may throw an error when the value is not a string, number, or boolean.

import { env } from 'node:process';
env.test = null;
console.log(env.test);
// => 'null'
env.test = undefined;
console.log(env.test);
// => 'undefined'

Use delete to delete a property from process.env.

import { env } from 'node:process';
env.TEST = 1;
delete env.TEST;
console.log(env.TEST);
// => undefined

On Windows operating systems, environment variables are case-insensitive.

import { env } from 'node:process';
env.TEST = 1;
console.log(env.test);
// => 1

Unless explicitly specified when creating a Worker instance, each Worker thread has its own copy of process.env, based on its parent thread's process.env, or whatever was specified as the env option to the Worker constructor. Changes to process.env will not be visible across Worker threads, and only the main thread can make changes that are visible to the operating system or to native add-ons. On Windows, a copy of process.env on a Worker instance operates in a case-sensitive manner unlike the main thread.

@sincev0.1.27

env
.
string | undefined
S2_BASIN
?? 'your-basin',
S2Config.token: string
token
:
var process: NodeJS.Process
process
.
NodeJS.Process.env: NodeJS.ProcessEnv

The process.env property returns an object containing the user environment. See environ(7).

An example of this object looks like:

{
TERM: 'xterm-256color',
SHELL: '/usr/local/bin/bash',
USER: 'maciej',
PATH: '~/.bin/:/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin',
PWD: '/Users/maciej',
EDITOR: 'vim',
SHLVL: '1',
HOME: '/Users/maciej',
LOGNAME: 'maciej',
_: '/usr/local/bin/node'
}

It is possible to modify this object, but such modifications will not be reflected outside the Node.js process, or (unless explicitly requested) to other Worker threads. In other words, the following example would not work:

node -e 'process.env.foo = "bar"' &#x26;&#x26; echo $foo

While the following will:

import { env } from 'node:process';
env.foo = 'bar';
console.log(env.foo);

Assigning a property on process.env will implicitly convert the value to a string. This behavior is deprecated. Future versions of Node.js may throw an error when the value is not a string, number, or boolean.

import { env } from 'node:process';
env.test = null;
console.log(env.test);
// => 'null'
env.test = undefined;
console.log(env.test);
// => 'undefined'

Use delete to delete a property from process.env.

import { env } from 'node:process';
env.TEST = 1;
delete env.TEST;
console.log(env.TEST);
// => undefined

On Windows operating systems, environment variables are case-insensitive.

import { env } from 'node:process';
env.TEST = 1;
console.log(env.test);
// => 1

Unless explicitly specified when creating a Worker instance, each Worker thread has its own copy of process.env, based on its parent thread's process.env, or whatever was specified as the env option to the Worker constructor. Changes to process.env will not be visible across Worker threads, and only the main thread can make changes that are visible to the operating system or to native add-ons. On Windows, a copy of process.env on a Worker instance operates in a case-sensitive manner unlike the main thread.

@sincev0.1.27

env
.
string | undefined
S2_ACCESS_TOKEN
!, // Your S2 access token
}
// HEAD /api/s2 - Health check/ping
export async function
function HEAD(): Promise<Response>
HEAD
() {
return new
var Response: new (body?: BodyInit | null, init?: ResponseInit) => Response

The Response interface of the Fetch API represents the response to a request.

MDN Reference

This Fetch API interface represents the response to a request.

MDN Reference

Response
(null, {
ResponseInit.status?: number
status
: 200 })
}
// GET /api/s2 - Pull events
export async function
function GET(request: Request): Promise<Response>
GET
(
request: Request<unknown, CfProperties<unknown>>
request
:
interface Request<CfHostMetadata = unknown, Cf = CfProperties<CfHostMetadata>>

The Request interface of the Fetch API represents a resource request.

MDN Reference

This Fetch API interface represents a resource request.

MDN Reference

Request
) {
const
const url: URL
url
= new
var URL: new (url: string | URL, base?: string | URL) => URL

The URL interface is used to parse, construct, normalize, and encode URL.

MDN Reference

URL
(
request: Request<unknown, CfProperties<unknown>>
request
.
Request<unknown, CfProperties<unknown>>.url: string

The url read-only property of the Request interface contains the URL of the request.

MDN Reference

Returns the URL of request as a string.

MDN Reference

url
)
const
const args: {
readonly storeId: string;
readonly s2SeqNum: (number & Brand<"S2SeqNum">) | "from-start";
readonly live: boolean;
readonly payload: Schema.JsonValue | undefined;
}
args
=
import S2
S2
.
const decodePullArgsFromSearchParams: (searchParams: URLSearchParams) => {
readonly storeId: string;
readonly s2SeqNum: (number & Brand<"S2SeqNum">) | "from-start";
readonly live: boolean;
readonly payload: Schema.JsonValue | undefined;
}

Decode args from URLSearchParams using Effect Schema, mirroring Electric's approach.

decodePullArgsFromSearchParams
(
const url: URL
url
.
URL.searchParams: URLSearchParams

The searchParams read-only property of the access to the [MISSING: httpmethod('GET')] decoded query arguments contained in the URL.

MDN Reference

searchParams
)
const
const streamName: string
streamName
=
import S2
S2
.
const makeS2StreamName: (storeId: string) => string
makeS2StreamName
(
const args: {
readonly storeId: string;
readonly s2SeqNum: (number & Brand<"S2SeqNum">) | "from-start";
readonly live: boolean;
readonly payload: Schema.JsonValue | undefined;
}
args
.
storeId: string
storeId
)
// Ensure basin and stream exist
await
import S2Helpers
S2Helpers
.
const ensureBasin: (config: S2.S2Config) => Promise<void>
ensureBasin
(
const s2Config: S2.S2Config
s2Config
)
await
import S2Helpers
S2Helpers
.
const ensureStream: (config: S2.S2Config, stream: string) => Promise<void>
ensureStream
(
const s2Config: S2.S2Config
s2Config
,
const streamName: string
streamName
)
// Build request with appropriate headers and URL
// Note: buildPullRequest handles cursor+1 conversion internally
const {
url: string
url
:
const pullUrl: string
pullUrl
,
const headers: Record<string, string>
headers
} =
import S2Helpers
S2Helpers
.
const buildPullRequest: ({ config, args, }: {
config: S2.S2Config;
args: S2.ApiSchema.PullArgs;
}) => {
url: string;
headers: Record<string, string>;
}
buildPullRequest
({
config: S2.S2Config
config
:
const s2Config: S2.S2Config
s2Config
,
args: {
readonly storeId: string;
readonly s2SeqNum: (number & Brand<"S2SeqNum">) | "from-start";
readonly live: boolean;
readonly payload: Schema.JsonValue | undefined;
}
args
})
const
const res: Response
res
= await
function fetch(input: string | URL | Request, init?: RequestInit): Promise<Response> (+3 overloads)
fetch
(
const pullUrl: string
pullUrl
, {
RequestInit<CfProperties<unknown>>.headers?: HeadersInit

A Headers object, an object literal, or an array of two-item arrays to set request's headers.

headers
})
// For live pulls (SSE), proxy the response
if (
const args: {
readonly storeId: string;
readonly s2SeqNum: (number & Brand<"S2SeqNum">) | "from-start";
readonly live: boolean;
readonly payload: Schema.JsonValue | undefined;
}
args
.
live: boolean
live
=== true) {
if (!
const res: Response
res
.
Response.ok: boolean

The ok read-only property of the Response interface contains a Boolean stating whether the response was successful (status in the range 200-299) or not.

MDN Reference

ok
) {
return
import S2Helpers
S2Helpers
.
const sseKeepAliveResponse: () => Response
sseKeepAliveResponse
()
}
return new
var Response: new (body?: BodyInit | null, init?: ResponseInit) => Response

The Response interface of the Fetch API represents the response to a request.

MDN Reference

This Fetch API interface represents the response to a request.

MDN Reference

Response
(
const res: Response
res
.
Body.body: ReadableStream<Uint8Array<ArrayBuffer>> | null
body
, {
ResponseInit.status?: number
status
: 200,
ResponseInit.headers?: HeadersInit
headers
: { 'content-type': 'text/event-stream' },
})
}
// For regular pulls
if (!
const res: Response
res
.
Response.ok: boolean

The ok read-only property of the Response interface contains a Boolean stating whether the response was successful (status in the range 200-299) or not.

MDN Reference

ok
) {
return
import S2Helpers
S2Helpers
.
const emptyBatchResponse: () => Response
emptyBatchResponse
()
}
const
const batch: string
batch
= await
const res: Response
res
.
Body.text(): Promise<string> (+1 overload)
text
()
return new
var Response: new (body?: BodyInit | null, init?: ResponseInit) => Response

The Response interface of the Fetch API represents the response to a request.

MDN Reference

This Fetch API interface represents the response to a request.

MDN Reference

Response
(
const batch: string
batch
, {
ResponseInit.headers?: HeadersInit
headers
: { 'content-type': 'application/json' },
})
}
// POST /api/s2 - Push events
export async function
function POST(request: Request): Promise<Response>
POST
(
request: Request<unknown, CfProperties<unknown>>
request
:
interface Request<CfHostMetadata = unknown, Cf = CfProperties<CfHostMetadata>>

The Request interface of the Fetch API represents a resource request.

MDN Reference

This Fetch API interface represents a resource request.

MDN Reference

Request
) {
const
const requestBody: unknown
requestBody
= await
request: Request<unknown, CfProperties<unknown>>
request
.
Body.json<unknown>(): Promise<unknown> (+1 overload)
json
()
const
const parsed: {
readonly storeId: string;
readonly batch: readonly {
readonly args: any;
readonly name: string;
readonly seqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly clientId: string;
readonly sessionId: string;
}[];
}
parsed
=
import Schema
Schema
.
decodeUnknownSync<{
readonly storeId: string;
readonly batch: readonly {
readonly args: any;
readonly name: string;
readonly seqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly clientId: string;
readonly sessionId: string;
}[];
}, {
readonly storeId: string;
readonly batch: readonly {
readonly args: any;
readonly name: string;
readonly seqNum: number;
readonly parentSeqNum: number;
readonly clientId: string;
readonly sessionId: string;
}[];
}>(schema: Schema.Schema<{
readonly storeId: string;
readonly batch: readonly {
readonly args: any;
readonly name: string;
readonly seqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly clientId: string;
readonly sessionId: string;
}[];
}, {
readonly storeId: string;
readonly batch: readonly {
readonly args: any;
readonly name: string;
readonly seqNum: number;
readonly parentSeqNum: number;
readonly clientId: string;
readonly sessionId: string;
}[];
}, never>, options?: ParseOptions): (u: unknown, overrideOptions?: ParseOptions) => {
readonly storeId: string;
readonly batch: readonly {
readonly args: any;
readonly name: string;
readonly seqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly clientId: string;
readonly sessionId: string;
}[];
}
export decodeUnknownSync

@throwsParseError

@since3.10.0

decodeUnknownSync
(
import S2
S2
.
import ApiSchema
ApiSchema
.
const PushPayload: Schema.Struct<{
storeId: typeof Schema.String;
batch: Schema.Array$<Schema.Struct<{
name: typeof Schema.String;
args: typeof Schema.Any;
seqNum: Schema.BrandSchema<number & Brand<"GlobalEventSequenceNumber">, number, never>;
parentSeqNum: Schema.BrandSchema<number & Brand<"GlobalEventSequenceNumber">, number, never>;
clientId: typeof Schema.String;
sessionId: typeof Schema.String;
}>>;
}>
PushPayload
)(
const requestBody: unknown
requestBody
)
const
const streamName: string
streamName
=
import S2
S2
.
const makeS2StreamName: (storeId: string) => string
makeS2StreamName
(
const parsed: {
readonly storeId: string;
readonly batch: readonly {
readonly args: any;
readonly name: string;
readonly seqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly clientId: string;
readonly sessionId: string;
}[];
}
parsed
.
storeId: string
storeId
)
// Ensure basin and stream exist
await
import S2Helpers
S2Helpers
.
const ensureBasin: (config: S2.S2Config) => Promise<void>
ensureBasin
(
const s2Config: S2.S2Config
s2Config
)
await
import S2Helpers
S2Helpers
.
const ensureStream: (config: S2.S2Config, stream: string) => Promise<void>
ensureStream
(
const s2Config: S2.S2Config
s2Config
,
const streamName: string
streamName
)
// Build push request with proper formatting
const
const pushRequests: readonly S2.S2PushRequest[]
pushRequests
=
import S2Helpers
S2Helpers
.
const buildPushRequests: ({ config, storeId, batch, }: {
config: S2.S2Config;
storeId: string;
batch: readonly AnyEncodedGlobal[];
}) => ReadonlyArray<S2.S2PushRequest>

Builds one or more append requests against S2. The helper applies the documented 1 MiB / 1000-record limits via chunkEventsForS2, so callers receive a request per compliant chunk instead of hitting 413 responses at runtime.

buildPushRequests
({
config: S2.S2Config
config
:
const s2Config: S2.S2Config
s2Config
,
storeId: string
storeId
:
const parsed: {
readonly storeId: string;
readonly batch: readonly {
readonly args: any;
readonly name: string;
readonly seqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly clientId: string;
readonly sessionId: string;
}[];
}
parsed
.
storeId: string
storeId
,
batch: readonly {
readonly args: any;
readonly name: string;
readonly seqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly clientId: string;
readonly sessionId: string;
}[]
batch
:
const parsed: {
readonly storeId: string;
readonly batch: readonly {
readonly args: any;
readonly name: string;
readonly seqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly clientId: string;
readonly sessionId: string;
}[];
}
parsed
.
batch: readonly {
readonly args: any;
readonly name: string;
readonly seqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly parentSeqNum: number & Brand<"GlobalEventSequenceNumber">;
readonly clientId: string;
readonly sessionId: string;
}[]
batch
,
})
for (const
const pushRequest: S2.S2PushRequest
pushRequest
of
const pushRequests: readonly S2.S2PushRequest[]
pushRequests
) {
const
const res: Response
res
= await
function fetch(input: string | URL | Request, init?: RequestInit): Promise<Response> (+3 overloads)
fetch
(
const pushRequest: S2.S2PushRequest
pushRequest
.
S2PushRequest.url: string
url
, {
RequestInit<CfProperties<unknown>>.method?: string

A string to set request's method.

method
: 'POST',
RequestInit<CfProperties<unknown>>.headers?: HeadersInit

A Headers object, an object literal, or an array of two-item arrays to set request's headers.

headers
:
const pushRequest: S2.S2PushRequest
pushRequest
.
S2PushRequest.headers: Record<string, string>
headers
,
RequestInit<CfProperties<unknown>>.body?: BodyInit | null

A BodyInit object or null to set request's body.

body
:
const pushRequest: S2.S2PushRequest
pushRequest
.
S2PushRequest.body: string
body
,
})
if (!
const res: Response
res
.
Response.ok: boolean

The ok read-only property of the Response interface contains a Boolean stating whether the response was successful (status in the range 200-299) or not.

MDN Reference

ok
) {
return
import S2Helpers
S2Helpers
.
const errorResponse: (message: string, status?: number) => Response
errorResponse
('Push failed', 500)
}
}
return
import S2Helpers
S2Helpers
.
const successResponse: () => Response
successResponse
()
}

The S2 sync provider uses a cursor that represents the last processed record:

  • The cursor points to the last S2 sequence number we’ve seen
  • S2’s seq_num parameter expects where to start reading from (inclusive)
  • The helper functions automatically handle the +1 conversion: seq_num = cursor + 1
  • When starting from the beginning, cursor is 'from-start' which maps to seq_num = 0
  • Stream provisioning: The helper functions provide ensureBasin() and ensureStream() to handle creation automatically.
  • Error handling: The helpers include fallback responses (emptyBatchResponse(), sseKeepAliveResponse()) to maintain stream continuity during errors.
  • Authentication: Store your S2 access token securely (e.g., environment variables).
  • Rate limiting: Consider implementing rate limiting to protect your S2 quota.
  • Response helpers: Use the provided response helpers (successResponse(), errorResponse()) for consistent API responses.

S2 provider supports live pulls over Server-Sent Events (SSE). When live: true is passed to pull, the client:

  • Immediately emits one page (possibly empty) with pageInfo: NoMore.
  • Parses SSE frames robustly (multi-line data: support) and reacts to typed events:
    • event: batch → parses data as S2 ReadBatch and emits items.
    • event: ping → ignored; keeps the stream alive.
    • event: error → mapped to InvalidPullError.

LiveStore leverages S2 streams for durable event storage. Understanding the mapping between LiveStore concepts and S2 primitives helps developers comprehend the persistence layer, though direct manipulation is discouraged.

Store to Stream: Each LiveStore storeId maps to exactly one S2 stream. The stream name is derived from the storeId after sanitization to meet S2 naming requirements.

Event Encoding: LiveStore events (AnyEncodedGlobal) are JSON-serialized and stored as the body field of S2 records. Each event contains:

  • name: Event type identifier
  • args: Event-specific payload data
  • seqNum: LiveStore’s global event sequence number
  • parentSeqNum: Previous event’s sequence number for ordering
  • clientId: Origin client identifier
  • sessionId: Session that created the event

Record Structure: When pushed to S2, each LiveStore event becomes one S2 record:

S2 Record Example
{
"body": "{\"name\":\"todo/create\",\"args\":{...},\"seqNum\":42,\"parentSeqNum\":41,...}"
}

LiveStore and S2 maintain completely independent sequence numbering systems:

  • LiveStore’s seqNum: Stored inside the JSON event payload (starts at 0). Used for logical event ordering and cursor management within LiveStore.
  • S2’s seq_num: Assigned by S2 to each record in the stream (also starts at 0). Used solely for stream positioning when reading records.

These are two separate numbering systems that happen to both start at 0. While they often align numerically (first event is LiveStore seqNum 0, stored in S2 record with seq_num 0), this is coincidental rather than a direct mapping. The sync provider:

  • Preserves LiveStore’s sequence numbers unchanged in the event payload
  • Uses S2’s seq_num only for querying records from the stream (e.g., “read from position X”)
  • Never relies on S2’s seq_num for LiveStore’s logical event ordering

Format: The provider uses s2-format: raw when communicating with S2, treating record bodies as UTF-8 JSON strings.

Headers: S2 record headers are not utilized; all LiveStore metadata is contained within the JSON body.

Batch Operations: Multiple events can be pushed in a single batch, with each event becoming a separate S2 record while maintaining order.

Direct stream manipulation is strongly discouraged. Always interact with S2 streams through LiveStore’s sync provider to ensure:

  • Proper event encoding/decoding
  • Sequence number integrity
  • Cursor management consistency
  • Compatibility with LiveStore’s sync protocol

Bypassing LiveStore to modify S2 streams directly may corrupt the event log and break synchronization.