microblog.pub/core/db.py

39 lines
911 B
Python
Raw Normal View History

2019-08-01 15:24:18 -05:00
from enum import Enum
from enum import unique
from typing import Any
2019-08-01 15:25:58 -05:00
from typing import Dict
from typing import Iterable
2019-08-01 15:24:18 -05:00
from typing import Optional
from config import DB
# Alias for the dict arguments (queries/filters and update specs) taken by the helpers in this module.
_Q = Dict[str, Any]
# Alias for a single document, as used in the helpers' return annotations.
_D = Dict[str, Any]
# A document or None; the return annotation of find_one_activity.
_Doc = Optional[_D]
2019-08-01 15:24:18 -05:00
@unique
class CollectionName(Enum):
    """Names of the database collections used by the helpers in this module.

    Each member's ``value`` is the string key used to index ``DB``.
    ``@unique`` guarantees that no two members share the same value.
    """

    ACTIVITIES = "activities"
    REMOTE = "remote"
2019-08-01 15:24:18 -05:00
def find_one_activity(q: _Q) -> _Doc:
    """Look up a single document in the activities collection.

    ``q`` is forwarded unchanged to the collection's ``find_one`` and its
    result is returned as-is (annotated as a document or ``None``).
    """
    activities = DB[CollectionName.ACTIVITIES.value]
    return activities.find_one(q)
def find_activities(q: _Q) -> Iterable[_D]:
    """Query the activities collection for documents matching ``q``.

    ``q`` is forwarded unchanged to the collection's ``find``; whatever
    ``find`` returns is handed back to the caller for iteration.
    """
    activities = DB[CollectionName.ACTIVITIES.value]
    return activities.find(q)
def update_one_activity(q: _Q, update: _Q) -> None:
    """Apply ``update`` through ``update_one`` on the activities collection.

    Both arguments are forwarded unchanged; the result of the underlying
    call is discarded.
    """
    activities = DB[CollectionName.ACTIVITIES.value]
    activities.update_one(q, update)
def update_many_activities(q: _Q, update: _Q) -> None:
    """Apply ``update`` through ``update_many`` on the activities collection.

    Both arguments are forwarded unchanged; the result of the underlying
    call is discarded.
    """
    activities = DB[CollectionName.ACTIVITIES.value]
    activities.update_many(q, update)
2019-08-07 16:27:57 -05:00
def update_one_remote(filter_: _Q, update: _Q, upsert: bool = False) -> None:
    """Apply ``update`` through ``update_one`` on the remote collection.

    Args:
        filter_: Forwarded unchanged as the first argument of ``update_one``.
        update: Forwarded unchanged as the second argument of ``update_one``.
        upsert: Forwarded to ``update_one`` as the ``upsert`` option;
            defaults to ``False``.

    The result of the underlying call is discarded.
    """
    remote = DB[CollectionName.REMOTE.value]
    # Forward the flag by keyword so the call does not depend on the
    # positional order of the driver's optional parameters.
    # NOTE(review): assumes DB is a pymongo Database whose
    # Collection.update_one accepts an ``upsert`` keyword -- confirm.
    remote.update_one(filter_, update, upsert=upsert)