Installing

The SDK is available on PyPI, so you can install it using pip:
pip install sailhouse

Basic Usage

Importing

The main components are available directly from the sailhouse package.
from sailhouse import SailhouseClient

Initialising the Client

The package provides a SailhouseClient class which contains all methods for interacting with the Sailhouse platform. After generating a token, you can create a client instance:
import os
from sailhouse import SailhouseClient

client = SailhouseClient(
    token=os.getenv("SAILHOUSE_TOKEN")
)
The client can be customised with optional parameters:
  • timeout: Request timeout in seconds (default: 5.0)

Sending an event

Use the publish() method to send data to a topic. The method is async, so you’ll need to use it with await:
import asyncio
from datetime import datetime

async def main():
    data = {
        "text": "example contents",
        "object": {
            "key": "value"
        }
    }

    # Basic publish
    await client.publish("some-topic", data)

    # Publishing with optional parameters
    await client.publish(
        "some-topic",
        data,
        scheduled_time=datetime(2025, 12, 31),
        metadata={"source": "example"}
    )

asyncio.run(main())
The data argument must be a dictionary that can be serialized to JSON. The method also accepts optional parameters:
  • scheduled_time: A datetime object for scheduled message delivery
  • metadata: Additional metadata as a dictionary

Receiving events from a pull subscription

The pull() method allows you to fetch, and lock for procesing, a single event from a subscription:
from dataclasses import dataclass
import asyncio

@dataclass
class Message:
    text: str

async def main():
    event = await client.pull(
        "user-created",
        "send-welcome-email"
    )

    message = event.as_type(Message)
    print(message.text)

    await event.ack()

asyncio.run(main())
The get_events() method accepts several optional parameters:
  • limit: Maximum number of events to retrieve
  • offset: Number of events to skip
  • time_window: Time window for event retrieval (e.g., “1h”, “24h”)

Subscribing to events

The SDK provides a convenient way to continuously process events using the subscribe() method:
import asyncio
from sailhouse import Event

async def handle_event(event: Event):
    print(f"Received event: {event.data}")
    await event.ack()

async def handle_error(error: Exception):
    print(f"Error occurred: {error}")

async def main():
    await client.subscribe(
        "awesome-example",
        "awesome-subscription",
        handler=handle_event,
        polling_interval=5.0,
        on_error=handle_error,
        exit_on_error=False
    )

asyncio.run(main())
The subscribe() method accepts several parameters:
  • handler: Callback function to process each event
  • polling_interval: Time between polling attempts in seconds (default: 5.0)
  • on_error: Optional error handler function
  • exit_on_error: Whether to exit on error (default: False)

Admin Operations

The admin client provides methods for managing subscriptions programmatically.

Registering Push Subscriptions

import asyncio
from sailhouse import SailhouseClient, Filter, ComplexFilter, FilterCondition

async def main():
    client = SailhouseClient(token=os.getenv("SAILHOUSE_TOKEN"))
    
    # Basic push subscription
    await client.admin.register_push_subscription(
        topic="user-events",
        subscription="welcome-email", 
        endpoint="https://api.example.com/webhooks/welcome"
    )
    
    # With simple filter
    await client.admin.register_push_subscription(
        topic="user-events",
        subscription="premium-users",
        endpoint="https://api.example.com/webhooks/premium",
        filter=Filter(
            path="user.type",
            value="premium"
        )
    )
    
    # With advanced filter
    await client.admin.register_push_subscription(
        topic="user-events",
        subscription="active-premium-users",
        endpoint="https://api.example.com/webhooks/active-premium",
        filter=ComplexFilter(
            filters=[
                FilterCondition(
                    path="user.type",
                    condition="equals",
                    value="premium"
                ),
                FilterCondition(
                    path="user.status", 
                    condition="not_equals",
                    value="inactive"
                )
            ],
            operator="and"
        ),
        rate_limit="100/h",
        deduplication="5m"
    )

asyncio.run(main())

Rate Limiting and Deduplication

  • Rate Limit: Controls delivery frequency using formats like “100/h” (100 per hour), “10/m” (10 per minute)
  • Deduplication: Prevents duplicate deliveries within a time window using formats like “5m” (5 minutes), “1h” (1 hour)