realtime 'working'

This commit is contained in:
Vithor Jaeger 2022-09-23 17:41:27 -04:00
parent 85c818539a
commit 7882598922
4 changed files with 60 additions and 25 deletions

View File

@ -1,6 +1,6 @@
__title__ = "pocketbase"
__description__ = "PocketBase client SDK for python."
__version__ = "0.1.3"
__version__ = "0.2.0"
from .client import Client, ClientResponseError

View File

@ -2,7 +2,7 @@ from __future__ import annotations
from typing import Callable
import dataclasses
import asyncio
import threading
import httpx
@ -17,12 +17,8 @@ class Event:
retry: int | None = None
class SSEClient:
"""Implementation of a server side event client"""
class EventLoop(threading.Thread):
FIELD_SEPARATOR = ":"
_listeners: dict = {}
_loop_running: bool = False
def __init__(
self,
@ -31,21 +27,30 @@ class SSEClient:
headers: dict | None = None,
payload: dict | None = None,
encoding="utf-8",
) -> None:
listeners: dict[str, Callable] | None = None,
**kwargs,
):
threading.Thread.__init__(self, **kwargs)
self.kill = False
self.client = httpx.Client()
self.url = url
self.method = method
self.headers = headers
self.payload = payload
self.encoding = encoding
self.client = httpx.AsyncClient()
self.listeners = listeners or {}
async def _read(self):
def _read(self):
"""Read the incoming event source stream and yield event chunks"""
data = b""
async with self.client.stream(
self.method, self.url, headers=self.headers, data=self.payload, timeout=None
with self.client.stream(
self.method,
self.url,
headers=self.headers,
data=self.payload,
timeout=None,
) as r:
async for chunk in r.aiter_bytes():
for chunk in r.iter_bytes():
for line in chunk.splitlines(True):
data += line
if data.endswith((b"\r\r", b"\n\n", b"\r\n\r\n")):
@ -54,8 +59,8 @@ class SSEClient:
if data:
yield data
async def _events(self):
async for chunk in self._read():
def _events(self):
for chunk in self._read():
event = Event()
for line in chunk.splitlines():
line = line.decode(self.encoding)
@ -83,22 +88,52 @@ class SSEClient:
event.event = event.event or "message"
yield event
async def _loop(self):
self._loop_running = True
async for event in self._events():
if event.event in self._listeners:
self._listeners[event.event](event)
def run(self):
for event in self._events():
if self.kill:
break
if event.event in self.listeners:
self.listeners[event.event](event)
class SSEClient:
"""Implementation of a server side event client"""
_listeners: dict = {}
_loop_thread: threading.Thread | None = None
def __init__(
self,
url: str,
method: str = "GET",
headers: dict | None = None,
payload: dict | None = None,
encoding="utf-8",
) -> None:
self._listeners = {}
self._loop_thread = EventLoop(
url=url,
method=method,
headers=headers,
payload=payload,
encoding=encoding,
listeners=self._listeners,
name="loop",
)
self._loop_thread.daemon = True
self._loop_thread.start()
def add_event_listener(self, event: str, callback: Callable[[Event], None]) -> None:
self._listeners[event] = callback
if not self._loop_running:
asyncio.run(self._loop())
self._loop_thread.listeners = self._listeners
def remove_event_listener(
self, event: str, callback: Callable[[Event], None]
) -> None:
if event in self._listeners:
self._listeners.pop(event)
self._loop_thread.listeners = self._listeners
def close(self) -> None:
pass
# TODO: does not work like this
self._loop_thread.kill = True

View File

@ -28,7 +28,7 @@ dynamic = ["readme", "version"]
[tool.poetry]
name = "pocketbase"
version = "0.1.3"
version = "0.2.0"
description = "PocketBase SDK for python."
authors = ["Vithor Jaeger <vaphes@gmail.com>"]
readme = "README.md"

View File

@ -2,4 +2,4 @@ from pocketbase import __version__
def test_version():
assert __version__ == "0.1.3"
assert __version__ == "0.2.0"