import asyncio
import os

from sailhouse import SailhouseClient, Filter, ComplexFilter, FilterCondition
async def main() -> None:
    """Register three example push subscriptions on the ``user-events`` topic.

    The client token is read from the ``SAILHOUSE_TOKEN`` environment
    variable. The three registrations show, in order: no filter, a single
    ``Filter``, and a ``ComplexFilter`` combined with ``rate_limit`` and
    ``deduplication`` settings.
    """
    # os.getenv returns None when the variable is unset.
    # NOTE(review): how SailhouseClient reacts to token=None is not visible
    # here -- confirm, or fail fast before making any calls.
    client = SailhouseClient(token=os.getenv("SAILHOUSE_TOKEN"))

    # Basic push subscription
    await client.admin.register_push_subscription(
        topic="user-events",
        subscription="welcome-email",
        endpoint="https://api.example.com/webhooks/welcome",
    )

    # With simple filter: a single path/value pair.
    await client.admin.register_push_subscription(
        topic="user-events",
        subscription="premium-users",
        endpoint="https://api.example.com/webhooks/premium",
        filter=Filter(
            path="user.type",
            value="premium",
        ),
    )

    # With advanced filter: two conditions joined by the "and" operator.
    await client.admin.register_push_subscription(
        topic="user-events",
        subscription="active-premium-users",
        endpoint="https://api.example.com/webhooks/active-premium",
        filter=ComplexFilter(
            filters=[
                FilterCondition(
                    path="user.type",
                    condition="equals",
                    value="premium",
                ),
                FilterCondition(
                    path="user.status",
                    condition="not_equals",
                    value="inactive",
                ),
            ],
            operator="and",
        ),
        # Presumably "100 deliveries per hour" and a "5 minute" dedup
        # window -- TODO confirm the accepted formats in the Sailhouse docs.
        rate_limit="100/h",
        deduplication="5m",
    )
# Run the example only when executed as a script, so that importing this
# module does not register subscriptions as a side effect.
if __name__ == "__main__":
    asyncio.run(main())