@effectionx/worker v0.5.1 (thefrontside/effectionx)

Web Worker

A library for integrating Web Workers with Effection programs. Use it to offload CPU-intensive computations or manage external processes.

This package provides two functions: useWorker, used in the main thread to start the worker and establish communication with it, and workerMain, used in the worker script to invoke a worker function and send data back to the main thread.

Features

  • Establishes two-way communication between the main and the worker threads
  • Gracefully shuts down the worker from the main thread
  • Propagates errors from the worker to the main thread
  • Type-safe message handling with TypeScript
  • Supports worker-initiated requests handled by the host

Usage: Get worker's return value

The return value of the worker is the return value of the function passed to workerMain.

Worker Thread

import { workerMain } from "@effectionx/worker";

await workerMain<number, number, number, number>(function* fibonacci({
  data: n, // data sent to the worker from the main thread
}) {
  if (n <= 1) return n;

  let a = 0,
    b = 1;
  for (let i = 2; i <= n; i++) {
    let temp = a + b;
    a = b;
    b = temp;
  }

  return b;
});

Main Thread

You can retrieve this value from the worker object returned by the useWorker function in the main thread.

import { run } from "effection";
import { useWorker } from "@effectionx/worker";

await run(function* () {
  const worker = yield* useWorker<number, number, number, number>(
    "./fibonacci.ts",
    {
      type: "module",
      data: 5, // data is passed to the operation function (can be any serializable value)
    },
  );

  const result = yield* worker; // wait for the worker to finish and receive its return value

  console.log(result); // Output: 5
});
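
Because the worker body in this example is plain arithmetic, the printed value can be checked without starting a worker. The sketch below restates the same loop as an ordinary function; fibonacciSync is a hypothetical name used only for illustration and is not part of @effectionx/worker.

```typescript
// Hypothetical helper: the same iterative loop as the worker body,
// written as a plain function so it can be called directly.
function fibonacciSync(n: number): number {
  if (n <= 1) return n;

  let a = 0;
  let b = 1;
  for (let i = 2; i <= n; i++) {
    const temp = a + b;
    a = b;
    b = temp;
  }
  return b;
}

console.log(fibonacciSync(5)); // 5, the value the main thread logs
```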

Error handling

Errors thrown in the function passed to workerMain can be caught in the main thread by wrapping yield* worker in a try/catch block:

try {
  const result = yield* worker;

  console.log(result);
} catch (e) {
  console.error(e); // error will be available here
}

Usage: Worker-initiated requests

Workers can initiate requests to the host using the send function provided to workerMain. The host handles these requests with worker.forEach, returning a response for each request.

Worker Thread

import { workerMain } from "@effectionx/worker";

await workerMain<never, never, string, void, string, string>(
  function* ({ send }) {
    const response = yield* send("hello");
    return `received: ${response}`;
  },
);

Main Thread

import { run } from "effection";
import { useWorker } from "@effectionx/worker";

await run(function* () {
  const worker = yield* useWorker<never, never, string, void>(
    "./worker.ts",
    { type: "module" },
  );

  const result = yield* worker.forEach<string, string>(function* (request) {
    return `echo: ${request}`;
  });

  console.log(result); // Output: received: echo: hello
});

Notes

  • Only one forEach can be active at a time; concurrent calls throw.
  • Requests are queued until forEach is called.
  • Errors are serialized and rethrown on the caller side.
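
The first two notes can be pictured with a small synchronous model. This is a conceptual sketch only, not the @effectionx/worker implementation: RequestInbox, enqueue, and the error message are hypothetical, and the real forEach is an Effection operation rather than a plain method.

```typescript
// Conceptual model only. All names here are hypothetical.
class RequestInbox<Req, Res> {
  private queue: Req[] = [];
  private active = false;

  // Requests that arrive before a handler exists are held in order.
  enqueue(request: Req): void {
    this.queue.push(request);
  }

  // Drain the queue through the handler. A second call made while the
  // first one is still running throws.
  forEach(handler: (request: Req) => Res): Res[] {
    if (this.active) {
      throw new Error("forEach is already in progress");
    }
    this.active = true;
    try {
      const responses: Res[] = [];
      while (this.queue.length > 0) {
        const request = this.queue.shift() as Req;
        responses.push(handler(request));
      }
      return responses;
    } finally {
      this.active = false;
    }
  }
}

const inbox = new RequestInbox<string, string>();
inbox.enqueue("hello"); // queued: no handler yet
inbox.enqueue("world");

const responses = inbox.forEach((request) => `echo: ${request}`);
console.log(responses.join(", ")); // echo: hello, echo: world
```

In this model, calling forEach again from inside the handler throws, which mirrors the "only one forEach at a time" rule.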

Usage: Progress streaming

The host can send progress updates back to the worker during request processing. This enables real-time feedback for long-running operations.

Worker Thread

Use send.stream<TProgress>() to receive a subscription that yields progress values before the final response:

import { workerMain } from "@effectionx/worker";

interface Progress {
  percent: number;
  message: string;
}

await workerMain<never, never, string, void, string, string>(
  function* ({ send }) {
    // Request with progress streaming
    const subscription = yield* send.stream<Progress>("process-data");

    let next = yield* subscription.next();
    while (!next.done) {
      const progress = next.value;
      console.log(`${progress.percent}%: ${progress.message}`);
      next = yield* subscription.next();
    }

    // Final response
    return `completed: ${next.value}`;
  },
);

Main Thread

The forEach handler receives a context object with a progress() method:

import { run } from "effection";
import { useWorker } from "@effectionx/worker";

interface Progress {
  percent: number;
  message: string;
}

await run(function* () {
  const worker = yield* useWorker<never, never, string, void>(
    "./worker.ts",
    { type: "module" },
  );

  const result = yield* worker.forEach<string, string, Progress>(
    function* (request, ctx) {
      yield* ctx.progress({ percent: 25, message: "Loading..." });
      yield* ctx.progress({ percent: 50, message: "Processing..." });
      yield* ctx.progress({ percent: 75, message: "Finalizing..." });
      return "done";
    },
  );

  console.log(result); // Output: completed: done
});

Backpressure

The progress() method implements true backpressure:

  • ctx.progress() blocks until the worker calls subscription.next()
  • The host cannot send progress faster than the worker can receive it
  • If the worker does async work between next() calls, the host remains blocked

This ensures the worker is never overwhelmed with progress updates.
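
The lockstep behaviour described above can be approximated with a plain synchronous generator. This is a model, not the library's transport: hostHandler and log are hypothetical names, and each yield stands in for yield* ctx.progress(...). A generator runs only when pulled, so the producer can never get ahead of the consumer.

```typescript
// Conceptual model using a plain generator. All names are hypothetical.
interface Progress {
  percent: number;
  message: string;
}

const log: string[] = [];

// Plays the role of the host's forEach handler: each `yield` pauses
// until the consumer asks for the next value.
function* hostHandler(): Generator<Progress, string, void> {
  log.push("host sends 25");
  yield { percent: 25, message: "Loading..." };
  log.push("host sends 50");
  yield { percent: 50, message: "Processing..." };
  log.push("host returns");
  return "done";
}

// Plays the role of the worker draining the subscription.
const subscription = hostHandler();
let next = subscription.next();
while (!next.done) {
  log.push(`worker got ${next.value.percent}`);
  next = subscription.next();
}
log.push(`worker final: ${next.value}`);

console.log(log.join("\n"));
```

The log alternates strictly between host and worker entries, and the last next() carries the final response with done set to true.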

Notes

  • send(request) still works for simple request/response (ignores any progress)
  • The progress type is the third type parameter on forEach<TRequest, TResponse, TProgress>
  • The subscription's final next() returns { done: true, value: TResponse }

Usage: Sending messages to the worker

The worker can respond to incoming messages using the forEach function provided by the messages object passed to the workerMain function.

Worker Thread

import { workerMain } from "@effectionx/worker";

await workerMain<number, number, void, number>(function* ({ messages, data }) {
  let counter = data;

  yield* messages.forEach(function* (message) {
    counter += message;
    return counter;
  });

  return counter;
});

Main Thread

The main thread can send messages to the worker using the send method on the object returned by useWorker. Effection will wait for the value to be returned from the worker before continuing.

import { run } from "effection";
import { useWorker } from "@effectionx/worker";

await run(function* () {
  const worker = yield* useWorker<number, number, number, number>(
    "./counter-worker.ts",
    {
      type: "module",
      data: 5, // initial value (can be any serializable value)
    },
  );

  console.log(yield* worker.send(5)); // Output: 10

  console.log(yield* worker.send(10)); // Output: 20

  console.log(yield* worker.send(-5)); // Output: 15
});
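
The outputs in the comments follow from the running total the worker keeps between messages. A minimal sketch of that accumulation as a plain closure; makeCounter is a hypothetical stand-in for the worker's message handler, not a library function.

```typescript
// Hypothetical stand-in for the worker's message handler: keeps a running
// total that starts from the initial `data` value.
function makeCounter(initial: number): (message: number) => number {
  let counter = initial;
  return (message) => {
    counter += message;
    return counter; // the value sent back for each message
  };
}

const send = makeCounter(5);
console.log(send(5)); // 10
console.log(send(10)); // 20
console.log(send(-5)); // 15
```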

Error Handling

You can catch an error thrown while computing the result for a message by wrapping yield* worker.send() in a try/catch block.

try {
  console.log(yield* worker.send(5)); // Output: 10
} catch (e) {
  console.error(e); // error will be available here
}

API Reference

interface WorkerResource<TSend, TRecv, TReturn> extends Operation<TReturn>

Resource returned by useWorker, providing APIs for worker communication.

Type Parameters

TSend

  • value main thread will send to the worker

TRecv

  • value main thread will receive from the worker

TReturn

  • worker operation return value

Methods

send
(data: TSend): Operation<TRecv>

Send a message to the worker and wait for a response.

forEach
<WRequest, WResponse, WProgress = never>(fn: (request: WRequest, ctx: ForEachContext<WProgress>) => Operation<WResponse>): Operation<TReturn>

Handle requests initiated by the worker. Only one forEach can be active at a time.

The handler receives a context object with a progress method for sending progress updates back to the worker.

function useWorker<TSend, TRecv, TReturn, TData>(url: string | URL, options?: WorkerOptions & {}): Operation<WorkerResource<TSend, TRecv, TReturn>>

Use on the main thread to create and execute a well-behaved web worker.

Examples

Example 1

Compute a single value

import { run } from "effection";
import { useWorker } from "@effectionx/worker"

await run(function*() {
   const worker = yield* useWorker("script.ts", { type: "module" });

   try {
     const result = yield* worker;
   } catch (e) {
     console.error(e);
   }
});

Example 2

Compute multiple values

import { run } from "effection";
import { useWorker } from "@effectionx/worker"

await run(function*() {
   const worker = yield* useWorker("script.ts", { type: "module" });

   try {
     const result1 = yield* worker.send("Tom");
     const result2 = yield* worker.send("Dick");
     const result3 = yield* worker.send("Harry");

     // get the last result
     const finalResult = yield* worker;
   } catch (e) {
     console.error(e);
   }
});

Type Parameters

TSend

  • value main thread will send to the worker

TRecv

  • value main thread will receive from the worker

TReturn

  • worker operation return value

TData

  • data passed from the main thread to the worker during initialization

Parameters

url: string | URL

Location of the worker script, given as a string or a URL object

options (optional): WorkerOptions & {}

WorkerOptions

Return Type

Operation<WorkerResource<TSend, TRecv, TReturn>>

async function workerMain<TSend, TRecv, TReturn, TData, WRequest = never, WResponse = never>(body: (options: WorkerMainOptions<TSend, TRecv, TData, WRequest, WResponse>) => Operation<TReturn>): Promise<void>

Entrypoint used in the worker that establishes communication with the main thread. It can be used to return a value, respond to messages, or both.

Examples

Example 1

Returning a value

import { workerMain } from "@effectionx/worker";

await workerMain(function* ({ data }) {
 return data;
});

Example 2

Responding to messages

import { workerMain } from "@effectionx/worker";

await workerMain(function* ({ messages }) {
 yield* messages.forEach(function* (message) {
   return message;
 });
});

Example 3

Responding to messages and returning a value

import { workerMain } from "@effectionx/worker";

await workerMain<number, number, number, number>(
  function* ({ messages, data: initial }) {
    let counter = initial;

    yield* messages.forEach(function* (message) {
      counter += message;
      return counter; // returns a value after each message
    });

    return counter; // returns the final value
  },
);

Example 4

Sending requests to the host

import { workerMain } from "@effectionx/worker";

await workerMain<never, never, string, void, string, string>(
  function* ({ send }) {
    const response = yield* send("hello");
    return `received: ${response}`;
  },
);

Type Parameters

TSend

  • value main thread will send to the worker

TRecv

  • value main thread will receive from the worker

TReturn

  • worker operation return value

TData

  • data passed from the main thread to the worker during initialization

WRequest = never

  • value worker sends to the host in requests

WResponse = never

  • value worker receives from the host (response to worker's send)

Parameters

body: (options: WorkerMainOptions<TSend, TRecv, TData, WRequest, WResponse>) => Operation<TReturn>

Return Type

Promise<void>

function createWorkerStatesSignal(): Operation<WorkerStateSignal>

Return Type

Operation<WorkerStateSignal>

type WorkerControl = {} | {} | {}

Messages sent from host to worker (control messages).

Type Parameters

TSend

TData

type WorkerToHost = {} | {} | {}

Messages sent from worker to host.

Type Parameters

WRequest

  • value worker sends to host in requests

TReturn

  • return value when worker completes

interface SerializedError

Serialized error format for cross-boundary communication. Error objects cannot be cloned via postMessage, so we serialize them.

Properties

name: string

No documentation available.

message: string

No documentation available.

stack (optional): string

No documentation available.

cause (optional): SerializedError

No documentation available.

type SerializedResult = {} | {}

A Result type for cross-boundary communication where errors are serialized. Unlike effection's Result<T> which uses Error, this uses SerializedError.

Used by channel primitives to send success/error responses over MessageChannel.

Type Parameters

T
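
The members of the union are not shown in the signature above. As an illustration only, a serialized result can be modeled as a discriminated union on an ok flag. The field names below are an assumption and are not confirmed by this reference; SerializedErrorLike is trimmed to two fields for brevity.

```typescript
// Assumed shapes for illustration; the real field names may differ.
interface SerializedErrorLike {
  name: string;
  message: string;
}

type SerializedResultSketch<T> =
  | { ok: true; value: T }
  | { ok: false; error: SerializedErrorLike };

// Narrowing on `ok` tells the receiver which branch it has.
function summarize<T>(result: SerializedResultSketch<T>): string {
  return result.ok
    ? `value: ${String(result.value)}`
    : `${result.error.name}: ${result.error.message}`;
}

console.log(summarize({ ok: true, value: 42 })); // value: 42
console.log(
  summarize({ ok: false, error: { name: "TypeError", message: "boom" } }),
); // TypeError: boom
```

Both branches are plain objects, which is what makes them safe to pass through postMessage.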

type ChannelMessage = {} | {}

Messages sent over a channel that supports progress streaming. Used by useChannelRequest to send progress updates and final response.

Type Parameters

TResponse

TProgress

type ChannelAck = {} | {}

Acknowledgement messages sent back over a channel. Used by useChannelResponse to acknowledge receipt of messages.

interface ForEachContext<TProgress>

Context passed to forEach handler for progress streaming. Allows the handler to send progress updates back to the requester.

Type Parameters

TProgress

  • The progress data type

Methods

progress
(data: TProgress): Operation<void>

Send a progress update to the requester. This operation blocks until the requester acknowledges receipt (backpressure).

function serializeError(error: Error): SerializedError

Serialize an Error for transmission via postMessage. Recursively serializes error.cause if present.

Parameters

error: Error

Return Type

SerializedError
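
A minimal sketch of the recursive serialization described above, assuming the SerializedError fields listed earlier in this reference. serializeErrorSketch and SerializedErrorSketch are hypothetical names; the library's own function may treat non-Error causes differently.

```typescript
// Sketch under assumptions: mirrors the documented SerializedError fields.
interface SerializedErrorSketch {
  name: string;
  message: string;
  stack?: string;
  cause?: SerializedErrorSketch;
}

function serializeErrorSketch(error: Error): SerializedErrorSketch {
  const serialized: SerializedErrorSketch = {
    name: error.name,
    message: error.message,
  };
  if (error.stack !== undefined) serialized.stack = error.stack;

  // Recurse only when the cause is itself an Error (an assumption; other
  // cause values are simply dropped in this sketch).
  const cause: unknown = (error as Error & { cause?: unknown }).cause;
  if (cause instanceof Error) serialized.cause = serializeErrorSketch(cause);

  return serialized;
}

const inner = new TypeError("bad input");
const outer = Object.assign(new Error("request failed"), { cause: inner });

const plain = serializeErrorSketch(outer);
console.log(plain.name, "<-", plain.cause?.name); // Error <- TypeError
```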

function errorFromSerialized(context: string, serialized: SerializedError): Error

Create an Error from a serialized error, with original data in cause.

Parameters

context: string

  • Description of where the error occurred (e.g., "Host handler failed")

serialized: SerializedError

  • The serialized error data

Return Type

Error
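
A sketch of the reverse direction, for illustration. Only "original data in cause" comes from the description above; the message format that joins context and message with a colon is invented here, and errorFromSerializedSketch is a hypothetical name.

```typescript
// Sketch under assumptions: the message format is invented for illustration.
interface SerializedErrorSketch {
  name: string;
  message: string;
  stack?: string;
  cause?: SerializedErrorSketch;
}

function errorFromSerializedSketch(
  context: string,
  serialized: SerializedErrorSketch,
): Error {
  const error = new Error(`${context}: ${serialized.message}`);
  // Attach the serialized data as `cause` so the original details survive.
  return Object.assign(error, { cause: serialized });
}

const rebuilt = errorFromSerializedSketch("Host handler failed", {
  name: "TypeError",
  message: "bad input",
});
console.log(rebuilt.message); // Host handler failed: bad input
```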

interface WorkerSend<WRequest, WResponse>

A send function that supports both simple request/response and progress streaming.

Type Parameters

WRequest

  • value worker sends to host

WResponse

  • value worker receives from host

Methods

stream
<WProgress>(value: WRequest): Operation<Subscription<WProgress, WResponse>>

Send a request to the host and receive a subscription that yields progress updates and returns the final response.

interface WorkerMainOptions<TSend, TRecv, TData, WRequest = never, WResponse = never>

Options passed to the worker's main function.

Type Parameters

TSend

  • value host sends to worker

TRecv

  • value host receives from worker (response to host's send)

TData

  • initial data passed to worker

WRequest = never

  • value worker sends to host in requests

WResponse = never

  • value worker receives from host (response to worker's send)

Properties

messages: WorkerMessages<TSend, TRecv>

Namespace that provides APIs for working with incoming messages from host.

data: TData

Initial data received by the worker from the main thread used for initialization.

send: WorkerSend<WRequest, WResponse>

Send a request to the host and wait for a response. Also supports progress streaming via send.stream().

interface WorkerMessages<TSend, TRecv>

Object that represents messages the main thread sends to the worker. It provides a function for handling messages.

Type Parameters

TSend

  • value main thread will send to the worker

TRecv

  • value main thread will receive from the worker

Methods

forEach
(fn: (message: TSend) => Operation<TRecv>): Operation<void>

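Handle each message sent from the main thread. The value returned by fn is sent back to the main thread as the response to that message.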