about summary refs log tree commit diff
path: root/src/petthecord/cache.py
blob: d111e9923b58303b9e79a8932e1a719ee16f7526 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from asyncio import sleep
from contextlib import suppress
from io import BytesIO
from json import load, dump
from logging import getLogger
from os import PathLike, makedirs, remove, listdir
from os.path import getmtime
from pathlib import Path
from time import time
from typing import NoReturn

import discord
from discord import Client
from petpetgif import petpet


class NotFound(Exception):
    """Raised when the requested user does not exist or has no avatar."""


class HTTPException(Exception):
    """Raised when the Discord API call fails for any non-404 reason."""


class CachedPet:
    """Builds pet-pet GIFs from Discord users' avatars.

    Optionally caches generated GIFs on disk under ``path``, tracked by a
    JSON index (``index.json``) mapping user id -> cached file path, and
    expires old entries via :meth:`gc_loop`.
    """

    def __init__(
        self,
        client: Client,
        caching: bool = True,
        path: str | PathLike = "/var/cache/petthecord",
        lifetime: int = 86400,
        gc_delay: int = 14400,
    ) -> None:
        """Set up the cache directory and load (or create) the JSON index.

        :param client: Discord client used to fetch users/avatars.
        :param caching: disable to generate every GIF in memory, uncached.
        :param path: cache directory; created if missing.
        :param lifetime: seconds a cached GIF may go unused before GC.
        :param gc_delay: seconds between GC iterations.
        :raises OSError: if the cache directory/index cannot be created.
        """
        self._client = client
        self._caching = caching
        self._path = Path(path).resolve()
        self._lifetime = lifetime
        self._gc_delay = gc_delay

        self._logger = getLogger(__name__)

        index_path = self._path / "index.json"
        try:
            if not index_path.exists():
                self._logger.warning("`index.json` doesn't exist, trying to create...")
                if not self._path.exists():
                    self._logger.warning("Cache folder doesnt exist, trying to create...")
                    makedirs(self._path, mode=0o755, exist_ok=True)
                # Seed an empty JSON object so the load below always succeeds.
                with open(index_path, "w") as f:
                    f.write("{}")
        except OSError:
            self._logger.error("Cannot create environment")
            raise

        with open(index_path, "r") as f:
            self._cache = load(f)

    async def petpet(self, uid: int) -> bytes:
        """Return the pet-pet GIF bytes for the user with id ``uid``.

        :raises NotFound: unknown user, or user has no avatar.
        :raises HTTPException: any other Discord API failure.
        """
        try:
            user = await self._client.fetch_user(uid)
        except discord.NotFound:
            raise NotFound
        except discord.HTTPException:
            raise HTTPException

        if user.avatar is None:
            raise NotFound

        if not self._caching:
            # Uncached mode: render straight into an in-memory buffer.
            with BytesIO() as f:
                image = await user.avatar.read()
                petpet.make(BytesIO(image), f)
                f.seek(0)
                return f.read()

        # JSON object keys are always strings, so the index must be keyed by
        # str(user.id) — an int key would never match entries reloaded from
        # disk, nor the str keys gc_loop derives from filenames.
        cache_key = str(user.id)
        # Avatar hash is part of the name, so an avatar change forces a regen.
        avatar_path = self._path / f"{user.id}_{user.avatar.key}.gif"
        if (stale := self._cache.get(cache_key)) != str(avatar_path):
            # BUGFIX: message previously lacked the f-prefix and logged the
            # literal "{user.id}".
            self._logger.debug(f"Generating new gif for {user.id}")
            if stale:
                # Best-effort: the stale file may already have been GC'd.
                with suppress(FileNotFoundError):
                    remove(stale)
            self._cache[cache_key] = str(avatar_path)
            with open(self._path / "index.json", "w") as f:
                dump(self._cache, f)

        if not avatar_path.exists():
            with open(avatar_path, "wb") as f:
                image = await user.avatar.read()
                petpet.make(BytesIO(image), f)

        # Refresh mtime so gc_loop sees this entry as recently used.
        avatar_path.touch()

        with open(avatar_path, "rb") as f:
            return f.read()

    async def gc_loop(self) -> NoReturn:
        """Forever: every ``gc_delay`` seconds, delete cached GIFs whose
        mtime is older than ``lifetime`` and drop them from the index.
        """
        while True:
            if self._caching:
                self._logger.info("Starting new cache's gc iteration")

                for filename in listdir(self._path):
                    path = (self._path / filename)
                    if path.is_file() and filename != "index.json":
                        if (time() - getmtime(path) > self._lifetime):
                            self._logger.debug(f"Removing {path}")
                            # Filenames are "<uid>_<avatar_key>.gif"; the uid
                            # prefix is the (string) index key.
                            to_delete = filename.split('_')[0]
                            try:
                                del self._cache[to_delete]
                            except KeyError:
                                self._logger.warning(f"{to_delete} has been already removed from the index")
                            remove(path)
                with open(self._path / "index.json", "w") as f:
                    dump(self._cache, f)

                self._logger.debug("Finished collecting old cache")

            await sleep(self._gc_delay)