woc.remote

  1import asyncio
  2import logging
  3import os
  4import threading
  5from datetime import datetime, timedelta
  6from functools import cached_property
  7from json import JSONDecodeError
  8from typing import (
  9    AsyncGenerator,
 10    Awaitable,
 11    Dict,
 12    Generator,
 13    List,
 14    Optional,
 15    Tuple,
 16    Union,
 17)
 18from urllib.parse import quote_plus
 19
 20import httpx
 21from tqdm.asyncio import tqdm_asyncio
 22
 23from .base import WocMap, WocMapsBase, WocObject
 24
 25
 26class RateLimitHandler:
 27    def __init__(self):
 28        self.next_request_time = datetime.now()
 29        self.lock = asyncio.Lock()
 30
 31    async def wait_if_needed(self):
 32        async with self.lock:
 33            now = datetime.now()
 34            if now < self.next_request_time:
 35                wait_time = (self.next_request_time - now).total_seconds()
 36                await asyncio.sleep(wait_time)
 37
 38    def update_next_request_time(self, retry_after: int):
 39        self.next_request_time = datetime.now() + timedelta(seconds=retry_after)
 40
 41
 42async def fetch_with_rate_limit(
 43    url: str,
 44    client: httpx.AsyncClient,
 45    rate_limiter: RateLimitHandler,
 46    max_retries: int = 4,
 47    base_delay: int = 4,
 48) -> Optional[httpx.Response]:
 49    for attempt in range(max_retries):
 50        await rate_limiter.wait_if_needed()
 51
 52        response = await client.get(url)
 53
 54        if response.status_code == 429:
 55            retry_after = int(response.headers.get("Retry-After", 60))
 56            rate_limiter.update_next_request_time(retry_after)
 57
 58            if attempt < max_retries - 1:
 59                delay = base_delay * (2**attempt)  # exponential backoff
 60                logging.warning(
 61                    f"Rate limit hit for {url}. "
 62                    f"Waiting {retry_after} seconds. Attempt {attempt + 1}/{max_retries}"
 63                )
 64                await asyncio.sleep(delay)
 65                continue
 66
 67        if response.status_code in (404, 400):
 68            try:
 69                _msg = response.json()["detail"]
 70            except (KeyError, JSONDecodeError):
 71                _msg = response.text
 72            raise KeyError(_msg)
 73
 74        response.raise_for_status()
 75        return response.json()
 76
 77
 78class WocMapsRemoteAsync(WocMapsBase):
 79    def __init__(
 80        self,
 81        base_url: Optional[str] = None,
 82        api_key: Optional[str] = None,
 83        max_connections: Optional[int] = 10,
 84        version: Optional[Union[str, List[str]]] = None,
 85    ):
 86        self.base_url = (
 87            base_url or os.getenv("WOC_BASE_URL") or "https://worldofcode.org/api"
 88        )
 89        self.api_key = api_key or os.getenv("WOC_API_KEY")
 90        self.client = httpx.AsyncClient(
 91            headers={"Authorization": f"Bearer {self.api_key}"} if self.api_key else None,
 92            base_url=self.base_url,
 93            limits=httpx.Limits(
 94                max_connections=max_connections if max_connections else 10
 95            ),
 96        )
 97        self.rate_limiter = RateLimitHandler()
 98        if version is not None:
 99            raise NotImplementedError(
100                "Versioning is not implemented in WoC HTTP API. "
101                "If you feel it is necessary, please create a feature request at "
102                "https://github.com/ssc-oscar/python-woc/issues/new"
103            )
104
105    async def get_maps(self) -> List[WocMap]:
106        r = await fetch_with_rate_limit(
107            "/lookup/map",
108            self.client,
109            self.rate_limiter,
110        )
111        return [
112            WocMap(
113                name=m["name"],
114                version=m["version"],
115                sharding_bits=m["sharding_bits"],
116                shards=[],
117                larges={},
118                dtypes=m["dtypes"],
119            )
120            for m in r["data"]
121        ]
122
123    @property
124    def maps(self):
125        raise NotImplementedError("use await get_maps() instead")
126
127    async def get_objects(self) -> List[WocObject]:
128        r = await fetch_with_rate_limit(
129            "/lookup/object",
130            self.client,
131            self.rate_limiter,
132        )
133        return [
134            WocObject(
135                name=o["name"],
136                sharding_bits=o["sharding_bits"],
137                shards=[],
138            )
139            for o in r["data"]
140        ]
141
142    @property
143    def objects(self):
144        raise NotImplementedError("use await get_objects() instead")
145
146    async def get_values(
147        self, map_name: str, key: Union[bytes, str]
148    ) -> Union[List[str], Tuple[str], List[Tuple[str]]]:
149        r = await fetch_with_rate_limit(
150            f"/lookup/map/{map_name}/{key}",
151            self.client,
152            self.rate_limiter,
153        )
154        return r["data"]
155
156    async def _get_many(
157        self,
158        url_prefix: str,
159        keys: List[Union[bytes, str]],
160        progress: bool = False,
161    ):
162        # first we need to split the keys into chunks of 10
163        chunks = [keys[i : i + 10] for i in range(0, len(keys), 10)]
164        result = {}
165        errors = {}
166        promises = [
167            fetch_with_rate_limit(
168                f"{url_prefix}?{'&'.join([f'q={quote_plus(k)}' for k in chunk])}",
169                self.client,
170                self.rate_limiter,
171            )
172            for chunk in chunks
173        ]
174        # fire all requests in parallel
175        if progress:
176            responses = await tqdm_asyncio.gather(*promises)
177        else:
178            responses = await asyncio.gather(*promises)
179        for response in responses:
180            result.update(response["data"])
181            if "errors" in response:
182                errors.update(response["errors"])
183        return result, errors
184
185    async def get_values_many(
186        self, map_name: str, keys: List[Union[bytes, str]], progress: bool = False
187    ) -> Tuple[Dict[str, Union[List[str], Tuple[str], List[Tuple[str]]]], Dict[str, str]]:
188        """
189        Eqivalent to getValues in WoC Perl API but fetch multiple keys at once.
190
191        Similar to get_values, this function uses WoC batch API to fetch multiple keys at once.
192        It returns a tuple of two dictionaries: [results, errors].
193        """
194        return await self._get_many(f"/lookup/map/{map_name}", keys, progress)
195
196    async def iter_values(
197        self, map_name: str, key: Union[bytes, str]
198    ) -> AsyncGenerator[List[str], None]:
199        cursor = 0
200        while cursor is not None:
201            r = await fetch_with_rate_limit(
202                f"/lookup/map/{map_name}/{key}?cursor={cursor}",
203                self.client,
204                self.rate_limiter,
205            )
206            cursor = r["nextCursor"] if "nextCursor" in r else None
207            yield r["data"]
208
209    async def show_content(
210        self, obj_name: str, key: Union[bytes, str]
211    ) -> Union[
212        List[Tuple[str, str, str]],
213        str,
214        Tuple[str, Tuple[str, str, str], Tuple[str, str, str], str],
215    ]:
216        r = await fetch_with_rate_limit(
217            f"/lookup/object/{obj_name}/{key}",
218            self.client,
219            self.rate_limiter,
220        )
221        return r["data"]
222
223    async def show_content_many(
224        self, obj_name: str, keys: List[Union[bytes, str]], progress: bool = False
225    ) -> Tuple[
226        Dict[
227            str,
228            Union[
229                List[Tuple[str, str, str]],
230                str,
231                Tuple[str, Tuple[str, str, str], Tuple[str, str, str], str],
232            ],
233        ],
234        Dict[str, str],
235    ]:
236        """
237        Eqivalent to showCnt in WoC Perl API but fetch multiple keys at once.
238
239        Similar to show_content, this function uses WoC batch API to fetch multiple keys at once.
240        It returns a tuple of two dictionaries: [results, errors].
241        """
242        return await self._get_many(f"/lookup/object/{obj_name}", keys, progress)
243
244    async def count(self, map_name: str) -> int:
245        if "map_name" in ("blob", "commit", "tag", "tree"):
246            r = await fetch_with_rate_limit(
247                f"/lookup/object/{map_name}/count",
248                self.client,
249                self.rate_limiter,
250            )
251            return r["data"]
252        else:
253            r = await fetch_with_rate_limit(
254                f"/lookup/map/{map_name}/count",
255                self.client,
256                self.rate_limiter,
257            )
258            return r["data"]
259
260    async def all_keys(self, map_name: str) -> Generator[bytes, None, None]:
261        raise NotImplementedError(
262            "all_keys is not implemented in WoC HTTP API. "
263            "If you feel it is necessary, please create a feature request at "
264            "https://github.com/ssc-oscar/python-woc/issues/new"
265        )
266
267
268def _start_background_loop(loop):
269    asyncio.set_event_loop(loop)
270    loop.run_forever()
271
272
class WocMapsRemote(WocMapsRemoteAsync):
    """Blocking facade over :class:`WocMapsRemoteAsync`.

    Each method schedules the corresponding coroutine of the parent class on a
    private event loop running in a daemon thread and waits for its result.
    """

    def __init__(self, *args, **kwargs):
        """Accepts the same arguments as ``WocMapsRemoteAsync`` and starts the loop thread."""
        super().__init__(*args, **kwargs)
        # run the event loop on a background thread
        # so it works in jupyter notebook
        # ref: https://stackoverflow.com/questions/52232177/runtimeerror-timeout-context-manager-should-be-used-inside-a-task/69514930#69514930
        self._loop = asyncio.new_event_loop()
        _t = threading.Thread(
            target=_start_background_loop, args=(self._loop,), daemon=True
        )
        _t.start()

    def _asyncio_run(self, coro: Awaitable, timeout=30):
        """
        Runs the coroutine in an event loop running on a background thread, and blocks the current thread until it returns a result. This plays well with gevent, since it can yield on the Future result call.

        If waiting fails (for example because the timeout expires) while the
        coroutine is still running, the coroutine is cancelled so it does not
        keep running on the background loop.

        :param coro: A coroutine, typically an async method
        :param timeout: How many seconds we should wait for a result before raising an error
        """
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        try:
            return future.result(timeout=timeout)
        finally:
            # A finished future (result or exception) is left untouched.
            if not future.done():
                future.cancel()

    def _asyncio_gather(self, *futures, return_exceptions=False) -> list:
        """
        A version of asyncio.gather that runs on the internal event loop
        """

        async def gather():
            return await asyncio.gather(*futures, return_exceptions=return_exceptions)

        return asyncio.run_coroutine_threadsafe(gather(), loop=self._loop).result()

    def get_values(self, map_name: str, key: str) -> List[str]:
        """Blocking version of ``WocMapsRemoteAsync.get_values``."""
        return self._asyncio_run(super().get_values(map_name, key))

    def get_values_many(
        self, map_name: str, keys: List[str], progress: bool = False
    ) -> Tuple[Dict[str, List[str]], Dict[str, str]]:
        """Blocking version of ``WocMapsRemoteAsync.get_values_many``."""
        return self._asyncio_run(super().get_values_many(map_name, keys, progress))

    def iter_values(self, map_name: str, key: str) -> Generator[List[str], None, None]:
        """Blocking version of ``WocMapsRemoteAsync.iter_values``.

        Pages are fetched one at a time; the entries of each page are yielded
        individually rather than as a list.
        """
        async_gen = super().iter_values(map_name, key)
        while True:
            try:
                for i in self._asyncio_run(async_gen.__anext__()):
                    yield i
            except StopAsyncIteration:
                break

    def show_content(self, obj_name: str, key: str) -> List[Tuple[str, str, str]]:
        """Blocking version of ``WocMapsRemoteAsync.show_content``."""
        return self._asyncio_run(super().show_content(obj_name, key))

    def show_content_many(
        self, obj_name: str, keys: List[str], progress: bool = False
    ) -> Tuple[Dict[str, List[Tuple[str, str, str]]], Dict[str, str]]:
        """Blocking version of ``WocMapsRemoteAsync.show_content_many``."""
        return self._asyncio_run(super().show_content_many(obj_name, keys, progress))

    def count(self, map_name: str) -> int:
        """Blocking version of ``WocMapsRemoteAsync.count``."""
        return self._asyncio_run(super().count(map_name))

    def all_keys(self, map_name: str) -> Generator[bytes, None, None]:
        """Blocking version of ``WocMapsRemoteAsync.all_keys``, which raises NotImplementedError."""
        return self._asyncio_run(super().all_keys(map_name))

    @cached_property
    def maps(self) -> List[WocMap]:
        """Result of ``get_maps()``, fetched on first access and then cached."""
        return self._asyncio_run(super().get_maps())

    @cached_property
    def objects(self) -> List[WocObject]:
        """Result of ``get_objects()``, fetched on first access and then cached."""
        return self._asyncio_run(super().get_objects())
class RateLimitHandler:
27class RateLimitHandler:
28    def __init__(self):
29        self.next_request_time = datetime.now()
30        self.lock = asyncio.Lock()
31
32    async def wait_if_needed(self):
33        async with self.lock:
34            now = datetime.now()
35            if now < self.next_request_time:
36                wait_time = (self.next_request_time - now).total_seconds()
37                await asyncio.sleep(wait_time)
38
39    def update_next_request_time(self, retry_after: int):
40        self.next_request_time = datetime.now() + timedelta(seconds=retry_after)
next_request_time
lock
async def wait_if_needed(self):
32    async def wait_if_needed(self):
33        async with self.lock:
34            now = datetime.now()
35            if now < self.next_request_time:
36                wait_time = (self.next_request_time - now).total_seconds()
37                await asyncio.sleep(wait_time)
def update_next_request_time(self, retry_after: int):
39    def update_next_request_time(self, retry_after: int):
40        self.next_request_time = datetime.now() + timedelta(seconds=retry_after)
async def fetch_with_rate_limit( url: str, client: httpx.AsyncClient, rate_limiter: RateLimitHandler, max_retries: int = 4, base_delay: int = 4) -> Union[httpx.Response, NoneType]:
async def fetch_with_rate_limit(
    url: str,
    client: httpx.AsyncClient,
    rate_limiter: RateLimitHandler,
    max_retries: int = 4,
    base_delay: int = 4,
) -> Optional[httpx.Response]:
    """GET ``url`` and return the decoded JSON body, retrying on HTTP 429.

    NOTE(review): the return annotation names ``httpx.Response``, but the value
    actually returned is ``response.json()``.

    :param url: URL (or path relative to the client's base URL) to request
    :param client: HTTP client used to send the request
    :param rate_limiter: shared limiter that is awaited before every attempt
        and informed of every 429 response
    :param max_retries: total number of attempts; if it is less than 1 no
        request is sent and ``None`` is returned
    :param base_delay: seconds slept after the first 429; doubled after each
        further 429
    :raises KeyError: on HTTP 400 or 404, carrying the ``detail`` field of the
        JSON body when available and the raw response text otherwise
    """
    for attempt in range(max_retries):
        await rate_limiter.wait_if_needed()

        response = await client.get(url)

        if response.status_code == 429:
            try:
                retry_after = int(response.headers.get("Retry-After", 60))
            except (TypeError, ValueError):
                # Retry-After may legally be an HTTP-date instead of a number
                # of seconds; fall back to the default rather than crash.
                retry_after = 60
            rate_limiter.update_next_request_time(retry_after)

            if attempt < max_retries - 1:
                delay = base_delay * (2**attempt)  # exponential backoff
                logging.warning(
                    f"Rate limit hit for {url}. "
                    f"Waiting {retry_after} seconds. Attempt {attempt + 1}/{max_retries}"
                )
                await asyncio.sleep(delay)
                continue
            # Last attempt: fall through so raise_for_status() reports the 429.

        if response.status_code in (404, 400):
            try:
                _msg = response.json()["detail"]
            except (KeyError, TypeError, JSONDecodeError):
                # No usable "detail" field: report the raw text instead.
                _msg = response.text
            raise KeyError(_msg)

        response.raise_for_status()
        return response.json()
class WocMapsRemoteAsync(woc.base.WocMapsBase):
 79class WocMapsRemoteAsync(WocMapsBase):
 80    def __init__(
 81        self,
 82        base_url: Optional[str] = None,
 83        api_key: Optional[str] = None,
 84        max_connections: Optional[int] = 10,
 85        version: Optional[Union[str, List[str]]] = None,
 86    ):
 87        self.base_url = (
 88            base_url or os.getenv("WOC_BASE_URL") or "https://worldofcode.org/api"
 89        )
 90        self.api_key = api_key or os.getenv("WOC_API_KEY")
 91        self.client = httpx.AsyncClient(
 92            headers={"Authorization": f"Bearer {self.api_key}"} if self.api_key else None,
 93            base_url=self.base_url,
 94            limits=httpx.Limits(
 95                max_connections=max_connections if max_connections else 10
 96            ),
 97        )
 98        self.rate_limiter = RateLimitHandler()
 99        if version is not None:
100            raise NotImplementedError(
101                "Versioning is not implemented in WoC HTTP API. "
102                "If you feel it is necessary, please create a feature request at "
103                "https://github.com/ssc-oscar/python-woc/issues/new"
104            )
105
106    async def get_maps(self) -> List[WocMap]:
107        r = await fetch_with_rate_limit(
108            "/lookup/map",
109            self.client,
110            self.rate_limiter,
111        )
112        return [
113            WocMap(
114                name=m["name"],
115                version=m["version"],
116                sharding_bits=m["sharding_bits"],
117                shards=[],
118                larges={},
119                dtypes=m["dtypes"],
120            )
121            for m in r["data"]
122        ]
123
124    @property
125    def maps(self):
126        raise NotImplementedError("use await get_maps() instead")
127
128    async def get_objects(self) -> List[WocObject]:
129        r = await fetch_with_rate_limit(
130            "/lookup/object",
131            self.client,
132            self.rate_limiter,
133        )
134        return [
135            WocObject(
136                name=o["name"],
137                sharding_bits=o["sharding_bits"],
138                shards=[],
139            )
140            for o in r["data"]
141        ]
142
143    @property
144    def objects(self):
145        raise NotImplementedError("use await get_objects() instead")
146
147    async def get_values(
148        self, map_name: str, key: Union[bytes, str]
149    ) -> Union[List[str], Tuple[str], List[Tuple[str]]]:
150        r = await fetch_with_rate_limit(
151            f"/lookup/map/{map_name}/{key}",
152            self.client,
153            self.rate_limiter,
154        )
155        return r["data"]
156
157    async def _get_many(
158        self,
159        url_prefix: str,
160        keys: List[Union[bytes, str]],
161        progress: bool = False,
162    ):
163        # first we need to split the keys into chunks of 10
164        chunks = [keys[i : i + 10] for i in range(0, len(keys), 10)]
165        result = {}
166        errors = {}
167        promises = [
168            fetch_with_rate_limit(
169                f"{url_prefix}?{'&'.join([f'q={quote_plus(k)}' for k in chunk])}",
170                self.client,
171                self.rate_limiter,
172            )
173            for chunk in chunks
174        ]
175        # fire all requests in parallel
176        if progress:
177            responses = await tqdm_asyncio.gather(*promises)
178        else:
179            responses = await asyncio.gather(*promises)
180        for response in responses:
181            result.update(response["data"])
182            if "errors" in response:
183                errors.update(response["errors"])
184        return result, errors
185
186    async def get_values_many(
187        self, map_name: str, keys: List[Union[bytes, str]], progress: bool = False
188    ) -> Tuple[Dict[str, Union[List[str], Tuple[str], List[Tuple[str]]]], Dict[str, str]]:
189        """
190        Eqivalent to getValues in WoC Perl API but fetch multiple keys at once.
191
192        Similar to get_values, this function uses WoC batch API to fetch multiple keys at once.
193        It returns a tuple of two dictionaries: [results, errors].
194        """
195        return await self._get_many(f"/lookup/map/{map_name}", keys, progress)
196
197    async def iter_values(
198        self, map_name: str, key: Union[bytes, str]
199    ) -> AsyncGenerator[List[str], None]:
200        cursor = 0
201        while cursor is not None:
202            r = await fetch_with_rate_limit(
203                f"/lookup/map/{map_name}/{key}?cursor={cursor}",
204                self.client,
205                self.rate_limiter,
206            )
207            cursor = r["nextCursor"] if "nextCursor" in r else None
208            yield r["data"]
209
210    async def show_content(
211        self, obj_name: str, key: Union[bytes, str]
212    ) -> Union[
213        List[Tuple[str, str, str]],
214        str,
215        Tuple[str, Tuple[str, str, str], Tuple[str, str, str], str],
216    ]:
217        r = await fetch_with_rate_limit(
218            f"/lookup/object/{obj_name}/{key}",
219            self.client,
220            self.rate_limiter,
221        )
222        return r["data"]
223
224    async def show_content_many(
225        self, obj_name: str, keys: List[Union[bytes, str]], progress: bool = False
226    ) -> Tuple[
227        Dict[
228            str,
229            Union[
230                List[Tuple[str, str, str]],
231                str,
232                Tuple[str, Tuple[str, str, str], Tuple[str, str, str], str],
233            ],
234        ],
235        Dict[str, str],
236    ]:
237        """
238        Eqivalent to showCnt in WoC Perl API but fetch multiple keys at once.
239
240        Similar to show_content, this function uses WoC batch API to fetch multiple keys at once.
241        It returns a tuple of two dictionaries: [results, errors].
242        """
243        return await self._get_many(f"/lookup/object/{obj_name}", keys, progress)
244
245    async def count(self, map_name: str) -> int:
246        if "map_name" in ("blob", "commit", "tag", "tree"):
247            r = await fetch_with_rate_limit(
248                f"/lookup/object/{map_name}/count",
249                self.client,
250                self.rate_limiter,
251            )
252            return r["data"]
253        else:
254            r = await fetch_with_rate_limit(
255                f"/lookup/map/{map_name}/count",
256                self.client,
257                self.rate_limiter,
258            )
259            return r["data"]
260
261    async def all_keys(self, map_name: str) -> Generator[bytes, None, None]:
262        raise NotImplementedError(
263            "all_keys is not implemented in WoC HTTP API. "
264            "If you feel it is necessary, please create a feature request at "
265            "https://github.com/ssc-oscar/python-woc/issues/new"
266        )
WocMapsRemoteAsync( base_url: Union[str, NoneType] = None, api_key: Union[str, NoneType] = None, max_connections: Union[int, NoneType] = 10, version: Union[str, List[str], NoneType] = None)
 80    def __init__(
 81        self,
 82        base_url: Optional[str] = None,
 83        api_key: Optional[str] = None,
 84        max_connections: Optional[int] = 10,
 85        version: Optional[Union[str, List[str]]] = None,
 86    ):
 87        self.base_url = (
 88            base_url or os.getenv("WOC_BASE_URL") or "https://worldofcode.org/api"
 89        )
 90        self.api_key = api_key or os.getenv("WOC_API_KEY")
 91        self.client = httpx.AsyncClient(
 92            headers={"Authorization": f"Bearer {self.api_key}"} if self.api_key else None,
 93            base_url=self.base_url,
 94            limits=httpx.Limits(
 95                max_connections=max_connections if max_connections else 10
 96            ),
 97        )
 98        self.rate_limiter = RateLimitHandler()
 99        if version is not None:
100            raise NotImplementedError(
101                "Versioning is not implemented in WoC HTTP API. "
102                "If you feel it is necessary, please create a feature request at "
103                "https://github.com/ssc-oscar/python-woc/issues/new"
104            )
base_url
api_key
client
rate_limiter
async def get_maps(self) -> List[woc.base.WocMap]:
106    async def get_maps(self) -> List[WocMap]:
107        r = await fetch_with_rate_limit(
108            "/lookup/map",
109            self.client,
110            self.rate_limiter,
111        )
112        return [
113            WocMap(
114                name=m["name"],
115                version=m["version"],
116                sharding_bits=m["sharding_bits"],
117                shards=[],
118                larges={},
119                dtypes=m["dtypes"],
120            )
121            for m in r["data"]
122        ]
maps
124    @property
125    def maps(self):
126        raise NotImplementedError("use await get_maps() instead")

List of basemaps available in the WoC database.

async def get_objects(self) -> List[woc.base.WocObject]:
128    async def get_objects(self) -> List[WocObject]:
129        r = await fetch_with_rate_limit(
130            "/lookup/object",
131            self.client,
132            self.rate_limiter,
133        )
134        return [
135            WocObject(
136                name=o["name"],
137                sharding_bits=o["sharding_bits"],
138                shards=[],
139            )
140            for o in r["data"]
141        ]
objects
143    @property
144    def objects(self):
145        raise NotImplementedError("use await get_objects() instead")

List of objects available in the WoC database.

async def get_values( self, map_name: str, key: Union[bytes, str]) -> Union[List[str], Tuple[str], List[Tuple[str]]]:
147    async def get_values(
148        self, map_name: str, key: Union[bytes, str]
149    ) -> Union[List[str], Tuple[str], List[Tuple[str]]]:
150        r = await fetch_with_rate_limit(
151            f"/lookup/map/{map_name}/{key}",
152            self.client,
153            self.rate_limiter,
154        )
155        return r["data"]

Equivalent to getValues in WoC Perl API.

Parameters
  • map_name: The name of the map, e.g. 'c2p', 'c2r', 'P2c'
  • key: The key of the object. For git objects, it is the SHA-1 hash of the object (in bytes or hex string). For other objects like Author, it is the name of the object.
Returns

The value of the object. Can be a list of strings, a tuple of strings, or a list of tuples of strings. Please refer to the documentation for details.

>>> self.get_values('P2c', 'user2589_minicms')
['05cf84081b63cda822ee407e688269b494a642de', ...]
>>> self.get_values('c2r', 'e4af89166a17785c1d741b8b1d5775f3223f510f')
('9531fc286ef1f4753ca4be9a3bf76274b929cdeb', 27)
>>> self.get_values('b2fa', '05fe634ca4c8386349ac519f899145c75fff4169')
('1410029988',
 'Audris Mockus <audris@utk.edu>',
 'e4af89166a17785c1d741b8b1d5775f3223f510f')
async def get_values_many( self, map_name: str, keys: List[Union[bytes, str]], progress: bool = False) -> Tuple[Dict[str, Union[List[str], Tuple[str], List[Tuple[str]]]], Dict[str, str]]:
186    async def get_values_many(
187        self, map_name: str, keys: List[Union[bytes, str]], progress: bool = False
188    ) -> Tuple[Dict[str, Union[List[str], Tuple[str], List[Tuple[str]]]], Dict[str, str]]:
189        """
190        Eqivalent to getValues in WoC Perl API but fetch multiple keys at once.
191
192        Similar to get_values, this function uses WoC batch API to fetch multiple keys at once.
193        It returns a tuple of two dictionaries: [results, errors].
194        """
195        return await self._get_many(f"/lookup/map/{map_name}", keys, progress)

Equivalent to getValues in WoC Perl API but fetch multiple keys at once.

Similar to get_values, this function uses WoC batch API to fetch multiple keys at once. It returns a tuple of two dictionaries: [results, errors].

async def iter_values( self, map_name: str, key: Union[bytes, str]) -> AsyncGenerator[List[str], NoneType]:
197    async def iter_values(
198        self, map_name: str, key: Union[bytes, str]
199    ) -> AsyncGenerator[List[str], None]:
200        cursor = 0
201        while cursor is not None:
202            r = await fetch_with_rate_limit(
203                f"/lookup/map/{map_name}/{key}?cursor={cursor}",
204                self.client,
205                self.rate_limiter,
206            )
207            cursor = r["nextCursor"] if "nextCursor" in r else None
208            yield r["data"]

Similar to get_values, but returns a generator instead of a list. This is useful when querying large maps (on_large='all').

Parameters
  • map_name: The name of the map, e.g. 'c2p', 'c2r', 'P2c'
  • key: The key of the object. For git objects, it is the SHA-1 hash of the object (in bytes or hex string). For other objects like Author, it is the name of the object.
Returns

The value of the object. Can be a list of strings, a tuple of strings, or a list of tuples of strings. Please refer to the documentation for details.

>>> list(self.iter_values('P2c', 'user2589_minicms'))
['05cf84081b63cda822ee407e688269b494a642de', ...]
async def show_content( self, obj_name: str, key: Union[bytes, str]) -> Union[List[Tuple[str, str, str]], str, Tuple[str, Tuple[str, str, str], Tuple[str, str, str], str]]:
210    async def show_content(
211        self, obj_name: str, key: Union[bytes, str]
212    ) -> Union[
213        List[Tuple[str, str, str]],
214        str,
215        Tuple[str, Tuple[str, str, str], Tuple[str, str, str], str],
216    ]:
217        r = await fetch_with_rate_limit(
218            f"/lookup/object/{obj_name}/{key}",
219            self.client,
220            self.rate_limiter,
221        )
222        return r["data"]

Equivalent to showCnt in WoC Perl API.

Parameters
  • obj_name: The name of the object, e.g. 'blob', 'tree', 'commit'
  • key: The key of the object. It is the SHA-1 hash of the object (in bytes or hex string).
Returns

The content of the object. Can be a list of tuples of strings, a string, or a tuple of strings.

>>> self.show_content('blob', '05fe634ca4c8386349ac519f899145c75fff4169')
'This is the content of the blob'
Equivalent to showCnt in WoC Perl API
>>> self.show_content('tree', '7a374e58c5b9dec5f7508391246c48b73c40d200')
[('100644', '.gitignore', '8e9e1...'), ...]
>>> self.show_content('commit', 'e4af89166a17785c1d741b8b1d5775f3223f510f')
('f1b66dcca490b5c4455af319bc961a34f69c72c2',
 ('c19ff598808b181f1ab2383ff0214520cb3ec659',),
 ('Audris Mockus <audris@utk.edu> 1410029988', '1410029988', '-0400'),
 ('Audris Mockus <audris@utk.edu>', '1410029988', '-0400'),
'News for Sep 5, 2014\n')
async def show_content_many( self, obj_name: str, keys: List[Union[bytes, str]], progress: bool = False) -> Tuple[Dict[str, Union[List[Tuple[str, str, str]], str, Tuple[str, Tuple[str, str, str], Tuple[str, str, str], str]]], Dict[str, str]]:
224    async def show_content_many(
225        self, obj_name: str, keys: List[Union[bytes, str]], progress: bool = False
226    ) -> Tuple[
227        Dict[
228            str,
229            Union[
230                List[Tuple[str, str, str]],
231                str,
232                Tuple[str, Tuple[str, str, str], Tuple[str, str, str], str],
233            ],
234        ],
235        Dict[str, str],
236    ]:
237        """
238        Eqivalent to showCnt in WoC Perl API but fetch multiple keys at once.
239
240        Similar to show_content, this function uses WoC batch API to fetch multiple keys at once.
241        It returns a tuple of two dictionaries: [results, errors].
242        """
243        return await self._get_many(f"/lookup/object/{obj_name}", keys, progress)

Equivalent to showCnt in WoC Perl API but fetch multiple keys at once.

Similar to show_content, this function uses WoC batch API to fetch multiple keys at once. It returns a tuple of two dictionaries: [results, errors].

async def count(self, map_name: str) -> int:
245    async def count(self, map_name: str) -> int:
246        if "map_name" in ("blob", "commit", "tag", "tree"):
247            r = await fetch_with_rate_limit(
248                f"/lookup/object/{map_name}/count",
249                self.client,
250                self.rate_limiter,
251            )
252            return r["data"]
253        else:
254            r = await fetch_with_rate_limit(
255                f"/lookup/map/{map_name}/count",
256                self.client,
257                self.rate_limiter,
258            )
259            return r["data"]

Count the number of keys in a map.

Parameters
  • map_name: The name of the mapping / object, e.g. 'c2p', 'c2r', 'commit'.
Returns

The number of keys in the tch databases plus the number of large files.

>>> self.count('c2r')
12345
async def all_keys(self, map_name: str) -> Generator[bytes, NoneType, NoneType]:
261    async def all_keys(self, map_name: str) -> Generator[bytes, None, None]:
262        raise NotImplementedError(
263            "all_keys is not implemented in WoC HTTP API. "
264            "If you feel it is necessary, please create a feature request at "
265            "https://github.com/ssc-oscar/python-woc/issues/new"
266        )

Iterate over all keys in a map.

Parameters
  • map_name: The name of the mapping / object, e.g. 'c2p', 'c2r', 'commit'. When on_large is 'ignore', keys in large maps are excluded.
Returns

A generator of keys in the map.

>>> for key in self.all_keys('P2c'):
...     print(key)  # hash or encoded string
class WocMapsRemote(WocMapsRemoteAsync):
class WocMapsRemote(WocMapsRemoteAsync):
    """
    Blocking counterpart of WocMapsRemoteAsync.

    Each public method hands the matching coroutine of the parent class to a
    private event loop running on a daemon thread and waits for its result.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # run the event loop on a background thread
        # so it works in jupyter notebook
        # ref: https://stackoverflow.com/questions/52232177/runtimeerror-timeout-context-manager-should-be-used-inside-a-task/69514930#69514930
        self._loop = asyncio.new_event_loop()
        worker = threading.Thread(
            target=_start_background_loop,
            args=(self._loop,),
            daemon=True,
        )
        worker.start()

    def _asyncio_run(self, coro: Awaitable, timeout=30):
        """
        Submit a coroutine to the background event loop and block the calling
        thread until it produces a result. This plays well with gevent, since
        it can yield on the Future result call.

        :param coro: A coroutine, typically an async method
        :param timeout: How many seconds we should wait for a result before raising an error
        """
        pending = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return pending.result(timeout=timeout)

    def _asyncio_gather(self, *futures, return_exceptions=False) -> list:
        """
        A version of asyncio.gather that runs on the internal event loop.

        Unlike ``_asyncio_run``, the wait for the result has no timeout.
        """

        async def _collect():
            return await asyncio.gather(*futures, return_exceptions=return_exceptions)

        pending = asyncio.run_coroutine_threadsafe(_collect(), loop=self._loop)
        return pending.result()

    def get_values(self, map_name: str, key: str) -> List[str]:
        """Blocking form of the parent's ``get_values`` coroutine."""
        coro = super().get_values(map_name, key)
        return self._asyncio_run(coro)

    def get_values_many(
        self, map_name: str, keys: List[str], progress: bool = False
    ) -> Tuple[Dict[str, List[str]], Dict[str, str]]:
        """Blocking form of the parent's ``get_values_many`` coroutine."""
        coro = super().get_values_many(map_name, keys, progress)
        return self._asyncio_run(coro)

    def iter_values(self, map_name: str, key: str) -> Generator[List[str], None, None]:
        """
        Drain the parent's async ``iter_values`` generator from synchronous code.

        Each object produced by the async generator is itself iterated and its
        elements are yielded one by one.
        """
        pages = super().iter_values(map_name, key)
        exhausted = False
        while not exhausted:
            try:
                page = self._asyncio_run(pages.__anext__())
                for item in page:
                    yield item
            except StopAsyncIteration:
                # the async generator has no more pages
                exhausted = True

    def show_content(self, obj_name: str, key: str) -> List[Tuple[str, str, str]]:
        """Blocking form of the parent's ``show_content`` coroutine."""
        coro = super().show_content(obj_name, key)
        return self._asyncio_run(coro)

    def show_content_many(
        self, obj_name: str, keys: List[str], progress: bool = False
    ) -> Tuple[Dict[str, List[Tuple[str, str, str]]], Dict[str, str]]:
        """Blocking form of the parent's ``show_content_many`` coroutine."""
        coro = super().show_content_many(obj_name, keys, progress)
        return self._asyncio_run(coro)

    def count(self, map_name: str) -> int:
        """Blocking form of the parent's ``count`` coroutine."""
        coro = super().count(map_name)
        return self._asyncio_run(coro)

    def all_keys(self, map_name: str) -> Generator[bytes, None, None]:
        """Blocking form of the parent's ``all_keys`` coroutine."""
        coro = super().all_keys(map_name)
        return self._asyncio_run(coro)

    @cached_property
    def maps(self) -> List[WocMap]:
        """Result of the parent's ``get_maps`` coroutine, cached after first access."""
        coro = super().get_maps()
        return self._asyncio_run(coro)

    @cached_property
    def objects(self) -> List[WocObject]:
        """Result of the parent's ``get_objects`` coroutine, cached after first access."""
        coro = super().get_objects()
        return self._asyncio_run(coro)
WocMapsRemote(*args, **kwargs)
275    def __init__(self, *args, **kwargs):
276        super().__init__(*args, **kwargs)
277        # run the event loop on a background thread
278        # so it works in jupyter notebook
279        # ref: https://stackoverflow.com/questions/52232177/runtimeerror-timeout-context-manager-should-be-used-inside-a-task/69514930#69514930
280        self._loop = asyncio.new_event_loop()
281        _t = threading.Thread(
282            target=_start_background_loop, args=(self._loop,), daemon=True
283        )
284        _t.start()
def get_values(self, map_name: str, key: str) -> List[str]:
305    def get_values(self, map_name: str, key: str) -> List[str]:
306        return self._asyncio_run(super().get_values(map_name, key))

Equivalent to getValues in WoC Perl API.

Parameters
  • map_name: The name of the map, e.g. 'c2p', 'c2r', 'P2c'
  • key: The key of the object. For git objects, it is the SHA-1 hash of the object (in bytes or hex string). For other objects like Author, it is the name of the object.
Returns

The value of the object. Can be a list of strings, a tuple of strings, or a list of tuples of strings. Please refer to the documentation for details.

>>> self.get_values('P2c', 'user2589_minicms')
['05cf84081b63cda822ee407e688269b494a642de', ...]
>>> self.get_values('c2r', 'e4af89166a17785c1d741b8b1d5775f3223f510f')
('9531fc286ef1f4753ca4be9a3bf76274b929cdeb', 27)
>>> self.get_values('b2fa', '05fe634ca4c8386349ac519f899145c75fff4169')
('1410029988',
 'Audris Mockus <audris@utk.edu>',
 'e4af89166a17785c1d741b8b1d5775f3223f510f')
def get_values_many( self, map_name: str, keys: List[str], progress: bool = False) -> Tuple[Dict[str, List[str]], Dict[str, str]]:
308    def get_values_many(
309        self, map_name: str, keys: List[str], progress: bool = False
310    ) -> Tuple[Dict[str, List[str]], Dict[str, str]]:
311        return self._asyncio_run(super().get_values_many(map_name, keys, progress))

Equivalent to getValues in WoC Perl API, but fetches multiple keys at once.

Similar to get_values, this function uses WoC batch API to fetch multiple keys at once. It returns a tuple of two dictionaries: [results, errors].

def iter_values( self, map_name: str, key: str) -> Generator[List[str], NoneType, NoneType]:
313    def iter_values(self, map_name: str, key: str) -> Generator[List[str], None, None]:
314        async_gen = super().iter_values(map_name, key)
315        while True:
316            try:
317                for i in self._asyncio_run(async_gen.__anext__()):
318                    yield i
319            except StopAsyncIteration:
320                break

Similar to get_values, but returns a generator instead of a list. This is useful when querying large maps (on_large='all').

Parameters
  • map_name: The name of the map, e.g. 'c2p', 'c2r', 'P2c'
  • key: The key of the object. For git objects, it is the SHA-1 hash of the object (in bytes or hex string). For other objects like Author, it is the name of the object.
Returns

The value of the object. Can be a list of strings, a tuple of strings, or a list of tuples of strings. Please refer to the documentation for details.

>>> list(self.iter_values('P2c', 'user2589_minicms'))
['05cf84081b63cda822ee407e688269b494a642de', ...]
def show_content(self, obj_name: str, key: str) -> List[Tuple[str, str, str]]:
322    def show_content(self, obj_name: str, key: str) -> List[Tuple[str, str, str]]:
323        return self._asyncio_run(super().show_content(obj_name, key))

Equivalent to showCnt in WoC Perl API.

Parameters
  • obj_name: The name of the object, e.g. 'blob', 'tree', 'commit'
  • key: The key of the object. It is the SHA-1 hash of the object (in bytes or hex string).
Returns

The content of the object. Can be a list of tuples of strings, a string, or a tuple of strings.

>>> self.show_content('blob', '05fe634ca4c8386349ac519f899145c75fff4169')
'This is the content of the blob'
Equivalent to showCnt in WoC Perl API
>>> self.show_content('tree', '7a374e58c5b9dec5f7508391246c48b73c40d200')
[('100644', '.gitignore', '8e9e1...'), ...]
>>> self.show_content('commit', 'e4af89166a17785c1d741b8b1d5775f3223f510f')
('f1b66dcca490b5c4455af319bc961a34f69c72c2',
 ('c19ff598808b181f1ab2383ff0214520cb3ec659',),
 ('Audris Mockus <audris@utk.edu> 1410029988', '1410029988', '-0400'),
 ('Audris Mockus <audris@utk.edu>', '1410029988', '-0400'),
'News for Sep 5, 2014\n')
def show_content_many( self, obj_name: str, keys: List[str], progress: bool = False) -> Tuple[Dict[str, List[Tuple[str, str, str]]], Dict[str, str]]:
325    def show_content_many(
326        self, obj_name: str, keys: List[str], progress: bool = False
327    ) -> Tuple[Dict[str, List[Tuple[str, str, str]]], Dict[str, str]]:
328        return self._asyncio_run(super().show_content_many(obj_name, keys, progress))

Equivalent to showCnt in WoC Perl API, but fetches multiple keys at once.

Similar to show_content, this function uses WoC batch API to fetch multiple keys at once. It returns a tuple of two dictionaries: [results, errors].

def count(self, map_name: str) -> int:
330    def count(self, map_name: str) -> int:
331        return self._asyncio_run(super().count(map_name))

Count the number of keys in a map.

Parameters
  • map_name: The name of the mapping / object, e.g. 'c2p', 'c2r', 'commit'.
Returns

The number of keys in the tch databases plus the number of large files.

>>> self.count('c2r')
12345
def all_keys(self, map_name: str) -> Generator[bytes, NoneType, NoneType]:
333    def all_keys(self, map_name: str) -> Generator[bytes, None, None]:
334        return self._asyncio_run(super().all_keys(map_name))

Iterate over all keys in a map.

Parameters
  • map_name: The name of the mapping / object, e.g. 'c2p', 'c2r', 'commit'. When on_large is 'ignore', keys in large maps are excluded.
Returns

A generator of keys in the map.

>>> for key in self.all_keys('P2c'):
...     print(key)  # hash or encoded string
maps: List[woc.base.WocMap]
336    @cached_property
337    def maps(self) -> List[WocMap]:
338        return self._asyncio_run(super().get_maps())

List of basemaps available in the WoC database.

objects: List[woc.base.WocObject]
340    @cached_property
341    def objects(self) -> List[WocObject]:
342        return self._asyncio_run(super().get_objects())

List of objects available in the WoC database.