pocketbase/pocketbase/services/realtime_service.py
Vithor Jaeger d320125c58
0.8.0 rc2 (#8)
* fix services

* Port of PR #6 to branch 0.8.0 (#7)

* Switch from JSON to multipart file encoding for file upload

File upload is detected when body data contains a value of class FileUpload
Remaining JSON is converted to FormData
Enitre message is sent as multipart

* Switch from JSON to multipart file encoding for file upload (#6)

File upload is detected when body data contains a value of class FileUpload
Remaining JSON is converted to FormData
Enitre message is sent as multipart

* fix readme

* fix client

* Remove "@" chars (#11)

* Remove "@" chars that led to empty collectionId, collectionName and expand

* Make load method more generic

* fix license

---------

Co-authored-by: Paulo Coutinho <paulocoutinhox@gmail.com>
Co-authored-by: Martin <mahe@quantentunnel.de>
Co-authored-by: Eoin Fennessy <85010533+eoinfennessy@users.noreply.github.com>
2023-02-10 13:46:31 -04:00

152 lines
5.1 KiB
Python

from __future__ import annotations
from typing import Callable, List
import dataclasses
import json
from pocketbase.services.utils.base_service import BaseService
from pocketbase.services.utils.sse import Event, SSEClient
from pocketbase.models.record import Record
@dataclasses.dataclass
class MessageData:
action: str
record: Record
class RealtimeService(BaseService):
subscriptions: dict
client_id: str = ""
event_source: SSEClient | None = None
def __init__(self, client) -> None:
super().__init__(client)
self.subscriptions = {}
self.client_id = ""
self.event_source = None
def subscribe(
self, subscription: str, callback: Callable[[MessageData], None]
) -> None:
"""Inits the sse connection (if not already) and register the subscription."""
# unsubscribe existing
if subscription in self.subscriptions and self.event_source:
self.event_source.remove_event_listener(subscription, callback)
# register subscription
self.subscriptions[subscription] = self._make_subscription(callback)
if not self.event_source:
self._connect()
elif self.client_id:
self._submit_subscriptions()
def unsubscribe_by_prefix(self, subscription_prefix: str):
"""
Unsubscribe from all subscriptions starting with the provided prefix.
This method is no-op if there are no active subscriptions with the provided prefix.
The related sse connection will be autoclosed if after the
unsubscribe operation there are no active subscriptions left.
"""
to_unsubscribe = []
for sub in self.subscriptions:
if sub.startswith(subscription_prefix):
to_unsubscribe.append(sub)
if len(to_unsubscribe) == 0:
return
return self.unsubscribe(*to_unsubscribe)
def unsubscribe(self, subscriptions: List[str] | None = None) -> None:
"""
Unsubscribe from a subscription.
If the `subscriptions` argument is not set,
then the client will unsubscribe from all registered subscriptions.
The related sse connection will be autoclosed if after the
unsubscribe operations there are no active subscriptions left.
"""
if not subscriptions or len(subscriptions) == 0:
# remove all subscriptions
self._remove_subscription_listeners()
self.subscriptions = {}
else:
# remove each passed subscription
found = False
for sub in self.subscriptions:
found = True
self.event_source.remove_event_listener(sub, self.subscriptions[sub])
self.subscriptions.pop(sub)
if not found:
return
if self.client_id:
self._submit_subscriptions()
# no more subscriptions -> close the sse connection
if not self.subscriptions:
self._disconnect()
def _make_subscription(
self, callback: Callable[[MessageData], None]
) -> Callable[[Event], None]:
def listener(event: Event) -> None:
data = json.loads(event.data)
if "record" in data and "action" in data:
callback(
MessageData(
action=data["action"],
record=Record(
data=data["record"],
),
)
)
return listener
def _submit_subscriptions(self) -> bool:
self._add_subscription_listeners()
self.client.send(
"/api/realtime",
{
"method": "POST",
"body": {
"clientId": self.client_id,
"subscriptions": list(self.subscriptions.keys()),
},
},
)
return True
def _add_subscription_listeners(self) -> None:
if not self.event_source:
return
self._remove_subscription_listeners()
for subscription, callback in self.subscriptions.items():
self.event_source.add_event_listener(subscription, callback)
def _remove_subscription_listeners(self) -> None:
if not self.event_source:
return
for subscription, callback in self.subscriptions.items():
self.event_source.remove_event_listener(subscription, callback)
def _connect_handler(self, event: Event) -> None:
self.client_id = event.id
self._submit_subscriptions()
def _connect(self) -> None:
self._disconnect()
self.event_source = SSEClient(self.client.build_url("/api/realtime"))
self.event_source.add_event_listener("PB_CONNECT", self._connect_handler)
def _disconnect(self) -> None:
self._remove_subscription_listeners()
self.client_id = ""
if not self.event_source:
return
self.event_source.remove_event_listener("PB_CONNECT", self._connect_handler)
self.event_source.close()
self.event_source = None