first commit

This commit is contained in:
Vithor Jaeger 2022-09-19 10:53:16 -04:00
commit 28d24981f8
61 changed files with 1258 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
/.venv
/.vscode
/dist
/poetry.lock
/pyproject.toml
/tests

0
README.rst Normal file
View File

3
pocketbase/__init__.py Normal file
View File

@ -0,0 +1,3 @@
__version__ = "0.1.0"
from .client import Client, ClientResponseError

Binary file not shown.

Binary file not shown.

Binary file not shown.

144
pocketbase/client.py Normal file
View File

@ -0,0 +1,144 @@
from typing import Any
from urllib.parse import urlencode
import httpx
from pocketbase.services.admins import Admins
from pocketbase.services.collections import Collections
from pocketbase.services.logs import Logs
from pocketbase.services.realtime import Realtime
from pocketbase.services.records import Records
from pocketbase.services.users import Users
from pocketbase.services.settings import Settings
from pocketbase.stores.base_auth_store import BaseAuthStore
# from pocketbase.stores.local_auth_store import LocalAuthStore
class ClientResponseError(Exception):
url: str = ""
status: int = 0
data: dict = {}
is_abort: bool = False
original_error: Any = None
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args)
self.url = kwargs.get("url", "")
self.status = kwargs.get("status", 0)
self.data = kwargs.get("data", {})
self.is_abort = kwargs.get("is_abort", False)
self.original_error = kwargs.get("original_error", None)
class Client:
base_url: str
lang: str
auth_store: BaseAuthStore
settings: Settings
admins: Admins
users: Users
collections: Collections
records: Records
logs: Logs
realtime: Realtime
def __init__(
self,
base_url: str = "/",
lang: str = "en-US",
auth_store: BaseAuthStore = None,
) -> None:
self.base_url = base_url
self.lang = lang
self.auth_store = auth_store or BaseAuthStore() # LocalAuthStore()
# services
self.admins = Admins(self)
self.users = Users(self)
self.records = Records(self)
self.collections = Collections(self)
self.logs = Logs(self)
self.settings = Settings(self)
self.realtime = Realtime(self)
def cancel_request(self, cancel_key: str):
return self
def cancel_all_requests(self):
return self
def send(self, path: str, req_config: dict[str:Any]) -> Any:
"""Sends an api http request."""
config = {"method": "GET"}
config.update(req_config)
# check if Authorization header can be added
if self.auth_store.token and (
not "headers" in config or "Authorization" not in config["headers"]
):
auth_type = "Admin"
if hasattr(self.auth_store.model, "verified"):
auth_type = "User"
config["headers"] = config.get("headers", {})
config["headers"].update(
{"Authorization": f"{auth_type} {self.auth_store.token}"}
)
# build url + path
url = self.build_url(path)
# send the request
method = config.get("method", "GET")
params = config.get("params", None)
headers = config.get("headers", None)
body = config.get("body", None)
try:
response = httpx.request(
method=method,
url=url,
params=params,
headers=headers,
json=body,
timeout=120,
)
except Exception as e:
raise ClientResponseError(
f"General request error. Original error: {e}",
original_error=e,
)
try:
data = response.json()
except Exception:
data = None
if response.status_code >= 400:
raise ClientResponseError(
f"Response error. Status code:{response.status_code}",
url=response.url,
status=response.status_code,
data=data,
)
return data
def build_url(self, path: str) -> str:
url = self.base_url
if not self.base_url.endswith("/"):
url += "/"
if path.startswith("/"):
path = path[1:]
return url + path
if __name__ == "__main__":
from pocketbase.stores.local_auth_store import LocalAuthStore
pb = Client(base_url="http://ares.olimpo:8090/", auth_store=LocalAuthStore())
# pb.admins.auth_via_email("vaphes@gmail.com", "vaphes2007")
print(pb.auth_store.token)
books = pb.collections.get_one("books")
print("ok")
# sacd = "nwvgaw6iiibv4fp"
# book = {
# "author": sacd,
# "name": "A study in red",
# "rating": 4.5,
# "summary": "The worst Sherlock Homes book",
# }
# data = pb.records.create("books", book)
# print(data)

View File

@ -0,0 +1,6 @@
from .admin import Admin
from .collection import Collection
from .external_auth import ExternalAuth
from .log_request import LogRequest
from .record import Record
from .user import User

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,17 @@
from typing import Any, Union
import datetime
from pocketbase.utils import to_datetime
from pocketbase.models.utils.base_model import BaseModel
class Admin(BaseModel):
avatar: int
email: str
last_reset_sent_at: Union[str, datetime.datetime]
def load(self, data: dict[str:Any]) -> None:
super().load(data)
self.avatar = data.get("avatar", 0)
self.email = data.get("email", "")
self.last_reset_sent_at = to_datetime(data.get("lastResetSentAt", ""))

View File

@ -0,0 +1,29 @@
from typing import Any, Optional
from pocketbase.models.utils.base_model import BaseModel
from pocketbase.models.utils.schema_field import SchemaField
class Collection(BaseModel):
name: str
schema: list[SchemaField]
system: bool
list_rule: Optional[str]
view_rule: Optional[str]
create_rule: Optional[str]
update_rule: Optional[str]
delete_rule: Optional[str]
def load(self, data: dict[str:Any]) -> None:
super().load(data)
self.name = data.get("name", "")
self.system = data.get("system", False)
self.list_rule = data.get("listRule", None)
self.view_rule = data.get("viewRule", None)
self.create_rule = data.get("createRule", None)
self.update_rule = data.get("updateRule", None)
self.delete_rule = data.get("deleteRule", "")
schema = data.get("schema", [])
self.schema = []
for field in schema:
self.schema.append(SchemaField(**field))

View File

@ -0,0 +1,14 @@
from typing import Any
from pocketbase.models.utils.base_model import BaseModel
class ExternalAuth(BaseModel):
user_id: str
provider: str
provider_id: str
def load(self, data: dict[str:Any]) -> None:
super().load(data)
self.user_id = data.get("userId", "")
self.provider = data.get("provider", "")
self.provider_id = data.get("providerId", "")

View File

@ -0,0 +1,27 @@
from typing import Any
from pocketbase.models.utils.base_model import BaseModel
class LogRequest(BaseModel):
url: str
method: str
status: int
auth: str
remote_ip: str
user_ip: str
referer: str
user_agent: str
meta: dict[str:Any]
def load(self, data: dict[str:Any]) -> None:
super().load(data)
self.url = data.get("url", "")
self.method = data.get("method", "")
self.status = data.get("status", 200)
self.auth = data.get("auth", "guest")
self.remote_ip = data.get("remoteIp", data.get("ip", ""))
self.user_ip = data.get("userIp", "")
self.referer = data.get("referer", "")
self.user_agent = data.get("userAgent", "")
self.meta = data.get("meta", {})

View File

@ -0,0 +1,30 @@
from typing import Any
from pocketbase.models.utils.base_model import BaseModel
from pocketbase.utils import camel_to_snake
class Record(BaseModel):
collection_id: str
collection_name: str
expand: dict[str:Any]
def load(self, data: dict[str:Any]) -> None:
super().load(data)
for key, value in data.items():
key = camel_to_snake(key).replace("@", "")
setattr(self, key, value)
self.collection_id = data.get("@collectionId", "")
self.collection_name = data.get("@collectionName", "")
expand = data.get("@expand", {})
if expand:
self.expand = expand
self.load_expanded()
@classmethod
def parse_expanded(cls, data: dict[str:Any]):
return cls(data)
def load_expanded(self) -> None:
for key, value in self.expand.items():
self.expand[key] = self.parse_expanded(value)

27
pocketbase/models/user.py Normal file
View File

@ -0,0 +1,27 @@
from typing import Any, Optional, Union
import datetime
from pocketbase.utils import to_datetime
from pocketbase.models.record import Record
from pocketbase.models.utils.base_model import BaseModel
class User(BaseModel):
email: str
verified: bool
last_reset_sent_at: Union[str, datetime.datetime]
last_verification_sent_at: Union[str, datetime.datetime]
profile: Optional[Record]
def load(self, data: dict[str:Any]) -> None:
super().load(data)
self.email = data.get("email", "")
self.verified = data.get("verified", "")
self.last_reset_sent_at = to_datetime(data.get("lastResetSentAt", ""))
self.last_verification_sent_at = to_datetime(
data.get("lastVerificationSentAt", "")
)
profile = data.get("profile", None)
self.profile = None
if profile:
self.profile = Record(profile)

View File

@ -0,0 +1,3 @@
from .base_model import BaseModel
from .list_result import ListResult
from .schema_field import SchemaField

View File

@ -0,0 +1,26 @@
from abc import ABC
from typing import Any, Union
import datetime
from pocketbase.utils import to_datetime
class BaseModel(ABC):
id: str
created: Union[str, datetime.datetime]
updated: Union[str, datetime.datetime]
def __init__(self, data: dict[str:Any] = {}) -> None:
super().__init__()
self.load(data)
def load(self, data: dict[str:Any]) -> None:
"""Loads `data` into the current model."""
self.id = data.pop("id", "")
self.created = to_datetime(data.pop("created", ""))
self.updated = to_datetime(data.pop("updated", ""))
@property
def is_new(self) -> bool:
"""Returns whether the current loaded data represent a stored db record."""
return not self.id or self.id == "00000000-0000-0000-0000-000000000000"

View File

@ -0,0 +1,12 @@
from dataclasses import dataclass, field
from pocketbase.models.utils.base_model import BaseModel
@dataclass
class ListResult:
page: int = 1
per_page: int = 0
total_items: int = 0
total_pages: int = 0
items: list[BaseModel] = field(default_factory=list)

View File

@ -0,0 +1,13 @@
from dataclasses import dataclass, field
from typing import Any
@dataclass
class SchemaField:
id: str = ""
name: str = ""
type: str = "text"
system: bool = False
required: bool = False
unique: bool = False
options: dict[str:Any] = field(default_factory=dict)

View File

@ -0,0 +1,7 @@
from .admins import Admins, AdminAuthResponse
from .collections import Collections
from .logs import Logs, HourlyStats
from .realtime import Realtime
from .records import Records
from .settings import Settings
from .users import Users, UserAuthResponse, AuthMethodsList, AuthProviderInfo

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,110 @@
from typing import Any
from pocketbase.models.utils.base_model import BaseModel
from pocketbase.services.utils.crud_service import CrudService
from pocketbase.models.admin import Admin
class AdminAuthResponse:
token: str
admin: Admin
def __init__(self, token: str, admin: Admin, **kwargs) -> None:
self.token = token
self.admin = admin
for key, value in kwargs.items():
setattr(self, key, value)
class Admins(CrudService):
def decode(self, data: dict[str:Any]) -> BaseModel:
return Admin(data)
def base_crud_path(self) -> str:
return "/api/admins"
def auth_response(self, response_data: dict) -> AdminAuthResponse:
"""Prepare successful authorize response."""
admin = self.decode(response_data.pop("admin", {}))
token = response_data.pop("token", "")
if token and admin:
self.client.auth_store.save(token, admin)
return AdminAuthResponse(token=token, admin=admin, **response_data)
def auth_via_email(
self, email: str, password: str, body_params: dict = {}, query_params: dict = {}
) -> AdminAuthResponse:
"""
Authenticate an admin account by its email and password
and returns a new admin token and data.
On success this method automatically updates the client's AuthStore data.
"""
body_params.update({"email": email, "password": password})
response_data = self.client.send(
self.base_crud_path() + "/auth-via-email",
{
"method": "POST",
"params": query_params,
"body": body_params,
"headers": {"Authorization": ""},
},
)
return self.auth_response(response_data)
def refresh(
self, body_params: dict = {}, query_params: dict = {}
) -> AdminAuthResponse:
"""
Refreshes the current admin authenticated instance and
returns a new token and admin data.
On success this method automatically updates the client's AuthStore data.
"""
return self.auth_response(
self.client.send(
self.base_crud_path() + "/refresh",
{"method": "POST", "params": query_params, "body": body_params},
)
)
def requestPasswordReset(
self, email: str, body_params: dict = {}, query_params: dict = {}
) -> bool:
"""Sends admin password reset request."""
body_params.update({"email": email})
self.client.send(
self.base_crud_path() + "/request-password-reset",
{
"method": "POST",
"params": query_params,
"body": body_params,
},
)
return True
def confirmPasswordReset(
self,
password_reset_token: str,
password: str,
password_confirm: str,
body_params: dict = {},
query_params: dict = {},
) -> AdminAuthResponse:
"""Confirms admin password reset request."""
body_params.update(
{
"token": password_reset_token,
"password": password,
"passwordConfirm": password_confirm,
}
)
return self.auth_response(
self.client.send(
self.base_crud_path() + "/confirm-password-reset",
{
"method": "POST",
"params": query_params,
"body": body_params,
},
)
)

View File

@ -0,0 +1,36 @@
from typing import Any
from pocketbase.services.utils.crud_service import CrudService
from pocketbase.models.utils.base_model import BaseModel
from pocketbase.models.collection import Collection
class Collections(CrudService):
def decode(self, data: dict[str:Any]) -> BaseModel:
return Collection(data)
def base_crud_path(self) -> str:
return "/api/collections"
def import_collections(
self,
collections: list[Collection],
delete_missing: bool = False,
query_params: dict = {},
) -> bool:
"""
Imports the provided collections.
If `delete_missing` is `True`, all local collections and schema fields,
that are not present in the imported configuration, WILL BE DELETED
(including their related records data)!
"""
self.client.send(
self.base_crud_path() + "/import",
{
"method": "PUT",
"params": query_params,
"body": {"collections": collections, "deleteMissing": delete_missing},
},
)
return True

View File

@ -0,0 +1,58 @@
from dataclasses import dataclass
from typing import Union
from urllib.parse import quote
import datetime
from pocketbase.services.utils.base_service import BaseService
from pocketbase.models.utils.list_result import ListResult
from pocketbase.models.log_request import LogRequest
from pocketbase.utils import to_datetime
@dataclass
class HourlyStats:
total: int
date: Union[str, datetime.datetime]
class Logs(BaseService):
def get_request_list(
self, page: int = 1, per_page: int = 30, query_params: dict = {}
) -> ListResult:
"""Returns paginated logged requests list."""
query_params.update({"page": page, "perPage": per_page})
response_data = self.client.send(
"/api/logs/requests",
{"method": "GET", "params": query_params},
)
items: list[LogRequest] = []
if "items" in response_data:
response_data["items"] = response_data["items"] or []
for item in response_data["items"]:
items.append(LogRequest(item))
return ListResult(
response_data.get("page", 1),
response_data.get("perPage", 0),
response_data.get("totalItems", 0),
response_data.get("totalPages", 0),
items,
)
def get_request(self, id: str, query_params: dict = {}) -> LogRequest:
"""Returns a single logged request by its id."""
return LogRequest(
self.client.send(
"/api/logs/requests/" + quote(id),
{"method": "GET", "params": query_params},
)
)
def get_requests_stats(self, query_params: dict = {}) -> list[HourlyStats]:
"""Returns request logs statistics."""
return [
HourlyStats(total=stat["total"], date=to_datetime(stat["date"]))
for stat in self.client.send(
"/api/logs/requests/stats",
{"method": "GET", "params": query_params},
)
]

View File

@ -0,0 +1,49 @@
from typing import Callable, Optional
from pocketbase.services.utils.base_service import BaseService
from pocketbase.models.record import Record
class Realtime(BaseService):
client_id: str
subscriptions: dict
def subscribe(self, subscription: str, callback: Callable) -> None:
"""Inits the sse connection (if not already) and register the subscription."""
self.subscriptions[subscription] = callback
def unsubscribe(self, subscription: Optional[str] = None) -> None:
"""
Unsubscribe from a subscription.
If the `subscription` argument is not set,
then the client will unsubscribe from all registered subscriptions.
The related sse connection will be autoclosed if after the
unsubscribe operations there are no active subscriptions left.
"""
pass
def _submit_subscriptions(self) -> bool:
self.client.send(
"/api/realtime",
{
"method": "POST",
"body": {
"clientId": self.client_id,
"subscriptions": self.subscriptions.keys(),
},
},
)
return True
def _add_subscription_listeners(self) -> None:
pass
def _remove_subscription_listeners(self) -> None:
pass
def _connect(self) -> None:
pass
def _disconnect(self) -> None:
pass

View File

@ -0,0 +1,26 @@
from typing import Any
from urllib.parse import quote, urlencode
from pocketbase.services.utils.sub_crud_service import SubCrudService
from pocketbase.models.utils.base_model import BaseModel
from pocketbase.models.record import Record
class Records(SubCrudService):
def decode(self, data: dict[str:Any]) -> BaseModel:
return Record(data)
def base_crud_path(self, collection_id_or_name: str) -> str:
return "/api/collections/" + quote(collection_id_or_name) + "/records"
def get_file_url(
self, record: Record, filename: str, query_params: dict = {}
) -> str:
"""Builds and returns an absolute record file url."""
base_url = self.client.base_url
if base_url.endswith("/"):
base_url = base_url[:-1]
result = f"{base_url}/api/files/{record.collection_id}/{record.id}/{filename}"
if query_params:
results += "?" + urlencode(query_params)
return result

View File

@ -0,0 +1,50 @@
from pocketbase.services.utils.base_service import BaseService
class Settings(BaseService):
def get_all(self, query_params: dict = {}) -> dict:
"""Fetch all available app settings."""
return self.client.send(
"/api/settings",
{"method": "GET", "params": query_params},
)
def update(self, body_params: dict = {}, query_params: dict = {}) -> dict:
"""Bulk updates app settings."""
return self.client.send(
"/api/settings",
{
"method": "PATCH",
"params": query_params,
"body": body_params,
},
)
def test_s3(self, query_params: dict = {}) -> bool:
"""Performs a S3 storage connection test."""
self.client.send(
"/api/settings/test/s3",
{"method": "POST", "params": query_params},
)
return True
def test_email(
self, to_email: str, email_template: str, query_params: dict = {}
) -> bool:
"""
Sends a test email.
The possible `email_template` values are:
- verification
- password-reset
- email-change
"""
self.client.send(
"/api/settings/test/email",
{
"method": "POST",
"params": query_params,
"body": {"email": to_email, "template": email_template},
},
)
return True

View File

@ -0,0 +1,280 @@
from dataclasses import dataclass
from typing import Any
from urllib.parse import quote
from pocketbase.services.utils.crud_service import CrudService
from pocketbase.models.utils.base_model import BaseModel
from pocketbase.models.user import User
from pocketbase.models.external_auth import ExternalAuth
class UserAuthResponse:
token: str
user: User
def __init__(self, token: str, user: User, **kwargs) -> None:
self.token = token
self.user = user
for key, value in kwargs.items():
setattr(self, key, value)
@dataclass
class AuthProviderInfo:
name: str
state: str
code_verifier: str
code_challenge: str
code_challenge_method: str
auth_url: str
@dataclass
class AuthMethodsList:
email_password: bool
auth_providers: list[AuthProviderInfo]
class Users(CrudService):
def decode(self, data: dict[str:Any]) -> BaseModel:
return User(data)
def base_crud_path(self) -> str:
return "/api/users"
def auth_response(self, response_data: Any) -> UserAuthResponse:
"""Prepare successful authorization response."""
user = self.decode(response_data.pop("user", {}))
token = response_data.pop("token", "")
if token and user:
self.client.auth_store.save(token, user)
return UserAuthResponse(token=token, user=user, **response_data)
def list_auth_methods(self, query_params: dict = {}) -> AuthMethodsList:
"""Returns all available application auth methods."""
response_data = self.client.send(
self.base_crud_path() + "/auth-methods",
{"method": "GET", "params": query_params},
)
email_password = response_data.get("emailPassword", False)
auth_providers = [
AuthProviderInfo(auth_provider)
for auth_provider in response_data.get("authProviders", [])
]
return AuthMethodsList(email_password, auth_providers)
def auth_via_email(
self, email: str, password: str, body_params: dict = {}, query_params: dict = {}
) -> UserAuthResponse:
"""
Authenticate a user via its email and password.
On success, this method also automatically updates
the client's AuthStore data and returns:
- new user authentication token
- the authenticated user model record
"""
body_params.update({"email": email, "password": password})
response_data = self.client.send(
self.base_crud_path() + "/auth-via-email",
{
"method": "POST",
"params": query_params,
"body": body_params,
"headers": {"Authorization": ""},
},
)
return self.auth_response(response_data)
def auth_via_oauth2(
self,
provider: str,
code: str,
code_verifier: str,
redirect_url: str,
body_params: dict = {},
query_params: dict = {},
) -> UserAuthResponse:
"""
Authenticate a user via OAuth2 client provider.
On success, this method also automatically updates
the client's AuthStore data and returns:
- new user authentication token
- the authenticated user model record
- the OAuth2 user profile data (eg. name, email, avatar, etc.)
"""
body_params.update(
{
"provider": provider,
"code": code,
"codeVerifier": code_verifier,
"redirectUrl": redirect_url,
}
)
response_data = self.client.send(
self.base_crud_path() + "/auth-via-oauth2",
{
"method": "POST",
"params": query_params,
"body": body_params,
"headers": {"Authorization": ""},
},
)
return self.auth_response(response_data)
def refresh(
self, body_params: dict = {}, query_params: dict = {}
) -> UserAuthResponse:
"""
Refreshes the current user authenticated instance and
returns a new token and user data.
On success this method also automatically updates the client's AuthStore data.
"""
return self.auth_response(
self.client.send(
self.base_crud_path() + "/refresh",
{"method": "POST", "params": query_params, "body": body_params},
)
)
def request_password_reset(
self, email: str, body_params: dict = {}, query_params: dict = {}
) -> bool:
"""Sends user password reset request."""
body_params.update({"email": email})
self.client.send(
self.base_crud_path() + "/request-password-reset",
{
"method": "POST",
"params": query_params,
"body": body_params,
},
)
return True
def confirm_password_reset(
self,
password_reset_token: str,
password: str,
password_confirm: str,
body_params: dict = {},
query_params: dict = {},
) -> UserAuthResponse:
"""Confirms user password reset request."""
body_params.update(
{
"token": password_reset_token,
"password": password,
"passwordConfirm": password_confirm,
}
)
return self.auth_response(
self.client.send(
self.base_crud_path() + "/confirm-password-reset",
{
"method": "POST",
"params": query_params,
"body": body_params,
},
)
)
def request_verification(
self, email: str, body_params: dict = {}, query_params: dict = {}
) -> bool:
"""Sends user verification email request."""
body_params.update({"email": email})
self.client.send(
self.base_crud_path() + "/request-verification",
{
"method": "POST",
"params": query_params,
"body": body_params,
},
)
return True
def confirm_verification(
self, verification_token: str, body_params: dict = {}, query_params: dict = {}
) -> UserAuthResponse:
"""Confirms user email verification request."""
body_params.update(
{
"token": verification_token,
}
)
return self.auth_response(
self.client.send(
self.base_crud_path() + "/confirm-verification",
{
"method": "POST",
"params": query_params,
"body": body_params,
},
)
)
def request_email_change(
self, new_email: str, body_params: dict = {}, query_params: dict = {}
) -> bool:
"""Sends an email change request to the authenticated user."""
body_params.update({"newEmail": new_email})
self.client.send(
self.base_crud_path() + "/request-email-change",
{
"method": "POST",
"params": query_params,
"body": body_params,
},
)
return True
def confirm_email_change(
self,
email_change_token: str,
password: str,
body_params: dict = {},
query_params: dict = {},
) -> UserAuthResponse:
"""Confirms user new email address."""
body_params.update(
{
"token": email_change_token,
"password": password,
}
)
return self.auth_response(
self.client.send(
self.base_crud_path() + "/confirm-email-change",
{
"method": "POST",
"params": query_params,
"body": body_params,
},
)
)
def list_external_auths(
self, user_id: str, query_params: dict = {}
) -> list[ExternalAuth]:
"""Lists all linked external auth providers for the specified user."""
response_data = self.client.send(
self.base_crud_path() + "/" + quote(user_id) + "/external-auths",
{"method": "GET", "params": query_params},
)
return [ExternalAuth(item) for item in response_data]
def unlink_external_auth(
self, user_id: str, provider: str, query_params: dict = {}
) -> bool:
"""Unlink a single external auth provider from the specified user."""
self.client.send(
self.base_crud_path()
+ "/"
+ quote(user_id)
+ "/external-auths/"
+ quote(provider),
{"method": "DELETE", "params": query_params},
)
return True

View File

@ -0,0 +1,4 @@
from .base_crud_service import BaseCrudService
from .base_service import BaseService
from .crud_service import CrudService
from .sub_crud_service import SubCrudService

View File

@ -0,0 +1,83 @@
from abc import ABC
from urllib.parse import quote
from typing import Any
from pocketbase.models.utils.base_model import BaseModel
from pocketbase.models.utils.list_result import ListResult
from pocketbase.services.utils.base_service import BaseService
class BaseCrudService(BaseService, ABC):
def decode(self, data: dict[str:Any]) -> BaseModel:
"""Response data decoder"""
def _get_full_list(
self, base_path: str, batch_size: int = 100, query_params: dict = {}
) -> list[BaseModel]:
result: list[BaseModel] = []
def request(result: list[BaseModel], page: int) -> list[Any]:
list = self._get_list(base_path, page, batch_size, query_params)
items = list.items
total_items = list.total_items
result += items
if len(items) > 0 and total_items > len(result):
return request(result, page + 1)
return result
return request(result, 1)
def _get_list(
self, base_path: str, page: int = 1, per_page: int = 30, query_params: dict = {}
) -> ListResult:
query_params.update({"page": page, "perPage": per_page})
response_data = self.client.send(
base_path, {"method": "GET", "params": query_params}
)
items: list[BaseModel] = []
if "items" in response_data:
response_data["items"] = response_data["items"] or []
for item in response_data["items"]:
items.append(self.decode(item))
return ListResult(
response_data.get("page", 1),
response_data.get("perPage", 0),
response_data.get("totalItems", 0),
response_data.get("totalPages", 0),
items,
)
def _get_one(self, base_path: str, id: str, query_params: dict = {}) -> BaseModel:
return self.decode(
self.client.send(
f"{base_path}/{quote(id)}", {"method": "GET", "params": query_params}
)
)
def _create(
self, base_path: str, body_params: dict = {}, query_params: dict = {}
) -> BaseModel:
return self.decode(
self.client.send(
base_path,
{"method": "POST", "params": query_params, "body": body_params},
)
)
def _update(
self, base_path: str, id: str, body_params: dict = {}, query_params: dict = {}
) -> BaseModel:
return self.decode(
self.client.send(
f"{base_path}/{quote(id)}",
{"method": "PATCH", "params": query_params, "body": body_params},
)
)
def _delete(self, base_path: str, id: str, query_params: dict = {}) -> bool:
self.client.send(
f"{base_path}/{quote(id)}", {"method": "DELETE", "params": query_params}
)
return True

View File

@ -0,0 +1,7 @@
from abc import ABC
class BaseService(ABC):
def __init__(self, client) -> None:
super().__init__()
self.client = client

View File

@ -0,0 +1,34 @@
from abc import ABC
from pocketbase.models.utils.base_model import BaseModel
from pocketbase.models.utils.list_result import ListResult
from pocketbase.services.utils.base_crud_service import BaseCrudService
class CrudService(BaseCrudService, ABC):
def base_crud_path(self) -> str:
"""Base path for the crud actions (without trailing slash, eg. '/admins')."""
def get_full_list(
self, batch_size: int = 100, query_params: dict = {}
) -> list[BaseModel]:
return self._get_full_list(self.base_crud_path(), batch_size, query_params)
def get_list(
self, page: int = 1, per_page: int = 30, query_params: dict = {}
) -> ListResult:
return self._get_list(self.base_crud_path(), page, per_page, query_params)
def get_one(self, id: str, query_params: dict = {}) -> BaseModel:
return self._get_one(self.base_crud_path(), id, query_params)
def create(self, body_params: dict = {}, query_params: dict = {}) -> BaseModel:
return self._create(self.base_crud_path(), body_params, query_params)
def update(
self, id: str, body_params: dict = {}, query_params: dict = {}
) -> BaseModel:
return self._update(self.base_crud_path(), id, body_params, query_params)
def delete(self, id: str, query_params: dict = {}) -> bool:
return self._delete(self.base_crud_path(), id, query_params)

View File

@ -0,0 +1,36 @@
from abc import ABC
from pocketbase.models.utils.base_model import BaseModel
from pocketbase.models.utils.list_result import ListResult
from pocketbase.services.utils.base_crud_service import BaseCrudService
class SubCrudService(BaseCrudService, ABC):
def base_crud_path(self) -> str:
"""Base path for the crud actions (without trailing slash, eg. '/admins')."""
def get_full_list(
self, sub: str, batch_size: int = 100, query_params: dict = {}
) -> list[BaseModel]:
return self._get_full_list(self.base_crud_path(sub), batch_size, query_params)
def get_list(
self, sub: str, page: int = 1, per_page: int = 30, query_params: dict = {}
) -> ListResult:
return self._get_list(self.base_crud_path(sub), page, per_page, query_params)
def get_one(self, sub: str, id: str, query_params: dict = {}) -> BaseModel:
return self._get_one(self.base_crud_path(sub), id, query_params)
def create(
self, sub: str, body_params: dict = {}, query_params: dict = {}
) -> BaseModel:
return self._create(self.base_crud_path(sub), body_params, query_params)
def update(
self, sub: str, id: str, body_params: dict = {}, query_params: dict = {}
) -> BaseModel:
return self._update(self.base_crud_path(sub), id, body_params, query_params)
def delete(self, sub: str, id: str, query_params: dict = {}) -> bool:
return self._delete(self.base_crud_path(sub), id, query_params)

View File

@ -0,0 +1,2 @@
from .base_auth_store import BaseAuthStore
from .local_auth_store import LocalAuthStore

Binary file not shown.

View File

@ -0,0 +1,42 @@
from abc import ABC
from typing import Union, Optional
from pocketbase.models.admin import Admin
from pocketbase.models.user import User
class BaseAuthStore(ABC):
"""
Base AuthStore class that is intended to be extended by all other
PocketBase AuthStore implementations.
"""
base_token: str
base_model: Union[User, Admin, None]
def __init__(
self, base_token: str = "", base_model: Optional[Union[User, Admin]] = None
) -> None:
super().__init__()
self.base_token = base_token
self.base_model = base_model
@property
def token(self) -> Union[str, None]:
"""Retrieves the stored token (if any)."""
return self.base_token
@property
def model(self) -> Union[User, Admin, None]:
"""Retrieves the stored model data (if any)."""
return self.base_model
def save(self, token: str = "", model: Optional[Union[User, Admin]] = None) -> None:
"""Saves the provided new token and model data in the auth store."""
self.base_token = token
self.base_model = model
def clear(self) -> None:
"""Removes the stored token and model data form the auth store."""
self.base_token = None
self.base_model = None

View File

@ -0,0 +1,59 @@
from typing import Any, Optional, Union
import pickle
import os
from pocketbase.stores.base_auth_store import BaseAuthStore
from pocketbase.models.user import User
from pocketbase.models.admin import Admin
class LocalAuthStore(BaseAuthStore):
filename: str
filepath: str
def __init__(
self,
filename: str = "pocketbase_auth.data",
filepath: str = "",
base_token: str = "",
base_model: Optional[Union[User, Admin]] = None,
) -> None:
super().__init__(base_token, base_model)
self.filename = filename
self.filepath = filepath
self.complete_filepath = os.path.join(filepath, filename)
@property
def token(self) -> str:
data = self._storage_get(self.complete_filepath)
if not data or not "token" in data:
return None
return data["token"]
@property
def model(self) -> Union[User, Admin, None]:
data = self._storage_get(self.complete_filepath)
if not data or not "model" in data:
return None
return data["model"]
def save(self, token: str = "", model: Optional[Union[User, Admin]] = None) -> None:
self._storage_set(self.complete_filepath, {"token": token, "model": model})
super().save(token, model)
def clear(self) -> None:
self._storage_remove(self.complete_filepath)
super().clear()
def _storage_set(self, key: str, value: Any) -> None:
with open(key, "wb") as f:
pickle.dump(value, f)
def _storage_get(self, key: str) -> Any:
with open(key, "rb") as f:
value = pickle.load(f)
return value
def _storage_remove(self, key: str) -> None:
if os.path.exists(key):
os.remove(key)

18
pocketbase/utils.py Normal file
View File

@ -0,0 +1,18 @@
import re
import datetime
from typing import Union
def camel_to_snake(name: str) -> str:
name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower()
def to_datetime(
str_datetime: str, format: str = "%Y-%m-%d %H:%M:%S"
) -> Union[datetime.datetime, str]:
str_datetime = str_datetime.split(".")[0]
try:
return datetime.datetime.strptime(str_datetime, format)
except Exception:
return str_datetime