woc.objects

import difflib
import os
import re
import warnings
from datetime import datetime, timedelta, timezone
from functools import cached_property, lru_cache
from logging import getLogger
from typing import Dict, Generator, List, Optional, Set, Tuple, Union

from .base import WocMapsBase
from .local import fnvhash

_global_woc: Optional[WocMapsBase] = None
_logger = getLogger(__name__)
# Must be timezone-aware: it is compared against the aware datetimes
# returned by Commit.authored_at.
_DAY_Z = datetime.fromtimestamp(0, tz=timezone.utc)


def init_woc_objects(woc: WocMapsBase):
    """
    Stores a WocMaps object globally so you don't have to pass it around.

    :param woc: a WocMaps object.
    """
    global _global_woc
    _global_woc = woc

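# A minimal usage sketch (assuming WocMapsLocal from woc.local is the
# concrete WocMapsBase implementation available on this machine):
#
#     from woc.local import WocMapsLocal
#     from woc.objects import init_woc_objects, Commit
#
#     init_woc_objects(WocMapsLocal())
#     c = Commit("e4af89166a17785c1d741b8b1d5775f3223f510f")
#     print(c.author, c.authored_at)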

@lru_cache(maxsize=None)
def parse_timezone_offset(offset_str: str) -> timezone:
    """
    Parse a timezone offset string in the format '+HHMM' or '-HHMM' into a timezone object.

    >>> parse_timezone_offset('+0530')
    timezone(timedelta(seconds=19800))
    """
    match = re.match(r"([+-])(\d{2})(\d{2})", offset_str)
    if not match:
        raise ValueError(f"Invalid timezone offset format: {offset_str}")
    sign, hours, minutes = match.groups()
    hours, minutes = int(hours), int(minutes)
    offset = timedelta(hours=hours, minutes=minutes)

    if sign == "-":
        offset = -offset

    return timezone(offset)
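# A negative offset parses to a negative timedelta, e.g. (sketch):
#
#     >>> parse_timezone_offset('-0800')
#     timezone(timedelta(days=-1, seconds=57600))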


class _WocObject:
    _ident: str
    """Identifier of the object"""
    woc: WocMapsBase
    """WocMap instance"""
    key: str
    """Key of the object"""

    def __init__(
        self,
        *args,
        woc: Optional[WocMapsBase] = None,
        **kwargs,
    ):
        self.woc = woc or _global_woc
        assert (
            self.woc is not None
        ), "WocMaps not initialized: call init_woc_objects() or supply a woc keyword argument"

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.key})"

    def __str__(self) -> str:
        return self.key

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, self.__class__):
            return False
        return self.key == value.key

    @property
    def hash(self) -> str:
        return hex(hash(self))[2:]

    def _get_list_values(self, map_name: str):
        """A thin wrapper around WocMapsBase.get_values to handle KeyError"""
        try:
            return self.woc.get_values(map_name, self.key)
        except KeyError:
            return []


class _GitObject(_WocObject):
    """Base class for SHA1-indexed Git objects (commit, tree, blob)"""

    def __init__(
        self,
        key: str,
        *args,
        woc: Optional[WocMapsBase] = None,
        **kwargs,
    ):
        super().__init__(*args, woc=woc, **kwargs)
        assert len(key) == 40, "SHA1 hash must be 40 characters long"
        self.key = key

    @cached_property
    def data(self):
        obj = self.__class__.__name__.lower()
        return self.woc.show_content(obj, self.key)

    def __hash__(self):
        return int(self.key, 16)

    @property
    def hash(self) -> str:
        return self.key


class _NamedObject(_WocObject):
    """Base class for objects indexed by a string key"""

    def __init__(
        self,
        key: str,
        *args,
        woc: Optional[WocMapsBase] = None,
        **kwargs,
    ):
        super().__init__(*args, woc=woc, **kwargs)
        self.key = key

    def __hash__(self):
        return fnvhash(self.key.encode())
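# Hashing sketch: a _GitObject hashes to the integer value of its SHA1,
# so Blob/Commit/Tree.hash round-trips to the hex key, while a
# _NamedObject (Author, File, Project) uses the same FNV hash WoC uses
# for its key-value maps:
#
#     Commit("e4af89166a17785c1d741b8b1d5775f3223f510f").hash
#     # -> 'e4af89166a17785c1d741b8b1d5775f3223f510f'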


class Author(_NamedObject):
    _ident = "a"

    @cached_property
    def _username_email(self) -> Tuple[str, str]:
        _split = self.key.split(" <", 1)
        if len(_split) == 1:
            return _split[0], ""
        return _split[0], _split[1][:-1]

    @property
    def name(self) -> str:
        return self._username_email[0]

    @property
    def email(self) -> str:
        return self._username_email[1]

    @cached_property
    def blobs(self) -> "List[Blob]":
        return [Blob(b) for b in self._get_list_values(f"{self._ident}2b")]

    @cached_property
    def commits(self) -> "List[Commit]":
        return [Commit(c) for c in self._get_list_values(f"{self._ident}2c")]

    @cached_property
    def files(self) -> "List[File]":
        return [File(f) for f in self._get_list_values(f"{self._ident}2f")]

    @cached_property
    def projects(self) -> "List[Project]":
        return [Project(p) for p in self._get_list_values(f"{self._ident}2p")]

    @cached_property
    def unique_authors(self) -> List["UniqueAuthor"]:
        return [UniqueAuthor(a) for a in self._get_list_values(f"{self._ident}2A")]

    @property
    def authors(self):
        raise NotImplementedError("Author object does not have authors method")

    @property
    def aliases(self) -> List["Author"]:
        _unique_authors = self.unique_authors
        if len(_unique_authors) == 0:
            return []
        return _unique_authors[0].authors

    @cached_property
    def first_blobs(self) -> List["Blob"]:
        return [Blob(b) for b in self._get_list_values(f"{self._ident}2fb")]
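# Alias resolution sketch: a2A maps an author string to its de-aliased
# identity, and A2a maps it back to every alias, so Author.aliases
# returns all name/email pairs WoC attributes to the same person:
#
#     Author("Audris Mockus <audris@utk.edu>").aliases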


class UniqueAuthor(Author):
    _ident = "A"

    @property
    def unique_authors(self) -> "List[Author]":
        raise NotImplementedError(
            "UniqueAuthor object does not have unique_authors method"
        )

    @cached_property
    def authors(self) -> "List[Author]":
        return [Author(a) for a in self._get_list_values(f"{self._ident}2a")]


class Blob(_GitObject):
    _ident = "b"

    @cached_property
    def _pos(self) -> Tuple[int, int]:
        return self.woc._get_pos("blob", self.key)

    def __len__(self) -> int:
        return self._pos[1]

    def __str__(self) -> str:
        return self.data

    @cached_property
    def commits(self) -> "List[Commit]":
        return [Commit(sha) for sha in self._get_list_values("b2c")]

    @cached_property
    def first_author(self) -> "Tuple[datetime, Author, Commit]":
        """
        Returns the timestamp, author, and commit of the first author.

        >>> woc.get_values('b2fa', '05fe634ca4c8386349ac519f899145c75fff4169')
        (datetime.datetime(2014, 9, 7, 2, 59, 48), Author(Audris Mockus <audris@utk.edu>), Commit(e4af89166a17785c1d741b8b1d5775f3223f510f))
        """
        _out = self.woc.get_values("b2fa", self.key)
        _date = datetime.fromtimestamp(int(_out[0]))
        _author = Author(_out[1])
        _commit = Commit(_out[2])
        return _date, _author, _commit

    @cached_property
    def time_author_commits(self) -> "List[Tuple[datetime, Author, Commit]]":
        _out = self._get_list_values("b2tac")
        return [
            (datetime.fromtimestamp(int(d[0])), Author(d[1]), Commit(d[2])) for d in _out
        ]

    @cached_property
    def files(self) -> "List[File]":
        return [File(f) for f in self._get_list_values("b2f")]

    @cached_property
    def projects_unique(self) -> "List[RootProject]":
        return [RootProject(p) for p in self._get_list_values("b2P")]

    @cached_property
    def changed_from(self) -> "List[Tuple[Blob, Commit, File]]":
        return [
            (Blob(b), Commit(c), File(f)) for b, c, f in self._get_list_values("bb2cf")
        ]

    @cached_property
    def changed_to(self) -> "List[Tuple[Blob, Commit, File]]":
        return [
            (Blob(b), Commit(c), File(f)) for b, c, f in self._get_list_values("obb2cf")
        ]
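# Provenance sketch: b2fa answers "who introduced this blob first?",
# while bb2cf / obb2cf trace what a blob was changed from / to:
#
#     b = Blob("05fe634ca4c8386349ac519f899145c75fff4169")
#     when, who, where = b.first_author
#     for old_blob, commit, path in b.changed_from:
#         print(f"{path} came from {old_blob.hash} in {commit.hash}")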


class Commit(_GitObject):
    _ident = "c"

    @cached_property
    def data_obj(self):
        _ret = {}
        (
            _ret["tree"],
            _ret["parent"],
            (_ret["author"], _ret["author_timestamp"], _ret["author_timezone"]),
            (_ret["committer"], _ret["committer_timestamp"], _ret["committer_timezone"]),
            _ret["message"],
        ) = self.data
        return _ret

    @property
    def author(self) -> Author:
        return Author(self.data_obj["author"])

    @property
    def authored_at(self) -> datetime:
        tz = parse_timezone_offset(self.data_obj["author_timezone"])
        return datetime.fromtimestamp(int(self.data_obj["author_timestamp"]), tz)

    @property
    def committer(self) -> Author:
        return Author(self.data_obj["committer"])

    @property
    def committed_at(self) -> datetime:
        tz = parse_timezone_offset(self.data_obj["committer_timezone"])
        return datetime.fromtimestamp(int(self.data_obj["committer_timestamp"]), tz)

    @property
    def full_message(self) -> str:
        """Full message of the commit"""
        return self.data_obj["message"]

    @property
    def message(self) -> str:
        """Short message of the commit"""
        return self.data_obj["message"].split("\n", 1)[0]

    @cached_property
    def tree(self) -> "Tree":
        return Tree(self.data_obj["tree"])

    @property
    def _parent_shas(self) -> List[str]:
        return self.data_obj["parent"]

    @property
    def parents(self) -> List["Commit"]:
        """Parent commits of this commit"""
        return [Commit(p) for p in self.data_obj["parent"]]

    @cached_property
    def projects(self) -> List["Project"]:
        """Projects associated with this commit"""
        return [Project(p) for p in self._get_list_values("c2p")]

    @cached_property
    def root_projects(self) -> List["RootProject"]:
        """Root projects associated with this commit"""
        return [RootProject(p) for p in self._get_list_values("c2P")]

    @cached_property
    def children(self) -> List["Commit"]:
        """Children of this commit"""
        return [Commit(c) for c in self._get_list_values("c2cc")]

    @cached_property
    def _file_names(self) -> List[str]:
        return self._get_list_values("c2f")

    @cached_property
    def _file_set(self) -> Set[str]:
        return set(self._file_names)

    @cached_property
    def files(self) -> List["File"]:
        """Files changed in this commit"""
        return [File(f) for f in self._file_names]

    @cached_property
    def _blob_shas(self) -> List[str]:
        return self._get_list_values("c2b")

    @cached_property
    def _blob_set(self) -> Set[str]:
        return set(self._blob_shas)

    @cached_property
    def blobs(self) -> List["Blob"]:
        """
        Blobs changed in this commit.

        This relation is known to miss every first file in all trees.
        Consider using Commit.tree.blobs as a slower but more accurate
        alternative.
        """
        return [Blob(b) for b in self._get_list_values("c2b")]

    @cached_property
    def time_author(self) -> Tuple[datetime, Author]:
        """Timestamp and author of the commit"""
        res = self.woc.get_values("c2ta", self.key)
        return datetime.fromtimestamp(int(res[0])), Author(res[1])

    @cached_property
    def root(self) -> "Tuple[Commit, int]":
        """Root commit of the project and the distance to it"""
        sha, dis = self.woc.get_values("c2r", self.key)
        return Commit(sha), int(dis)

    @cached_property
    def changeset(self) -> "List[Tuple[File, Blob, Blob]]":
        """Returns changed files with their new and old blobs"""
        return [
            (File(f), Blob(new), Blob(old))
            for f, new, old in self._get_list_values("c2fbb")
        ]

    def compare(
        self, parent: Union["Commit", str], threshold=0.5
    ) -> Generator[
        Tuple[Optional["File"], Optional["File"], Optional["Blob"], Optional["Blob"]],
        None,
        None,
    ]:
        """
        Compare two commits.

        :param parent: another commit to compare to.
                Expected order is `diff = child_commit - parent_commit`

        :return: a generator of 4-tuples `(old_path, new_path, old_sha, new_sha)`

        Examples:
        - a new file 'setup.py' was created:
            `(None, 'setup.py', None, 'file_sha')`
        - an existing 'setup.py' was deleted:
            `('setup.py', None, 'old_file_sha', None)`
        - setup.py.old was renamed to setup.py, content unchanged:
            `('setup.py.old', 'setup.py', 'file_sha', 'file_sha')`
        - setup.py was edited:
            `('setup.py', 'setup.py', 'old_file_sha', 'new_file_sha')`
        - setup.py.old was edited and renamed to setup.py:
            `('setup.py.old', 'setup.py', 'old_file_sha', 'new_file_sha')`

        Detecting the last one is computationally expensive. You can adjust this
        behaviour by passing the `threshold` parameter, which is 0.5 by default.
        It means that if roughly 50% of the file content is the same,
        it is considered a match. `threshold=1` means that only exact
        matches are considered, effectively disabling this comparison.
        If threshold is set to 0, any pair of deleted and added files will be
        considered renamed and edited; this last case doesn't make much sense,
        so don't set it too low.
        """
        if isinstance(parent, str):
            parent = Commit(parent)
        if not isinstance(parent, Commit):
            raise TypeError("parent must be a Commit or a commit hash")

        # The c2f/c2b maps are incomplete, so we really need to traverse
        # both trees to get full filename -> blob mappings.
        new_files: Dict[File, Blob] = {}
        for f, b in self.tree.traverse():
            new_files[f] = b
        old_files: Dict[File, Blob] = {}
        for f, b in parent.tree.traverse():
            old_files[f] = b

        # files present in both trees: yield only if the blob sha changed
        for f in new_files.keys() & old_files.keys():
            if new_files[f] != old_files[f]:
                yield f, f, old_files[f], new_files[f]

        added_paths: Set[File] = new_files.keys() - old_files.keys()
        deleted_paths: Set[File] = old_files.keys() - new_files.keys()

        if threshold >= 1:  # i.e. only exact matches are considered
            for f in added_paths:
                yield None, f, None, new_files[f]
            for f in deleted_paths:
                yield f, None, old_files[f], None
            return

        if parent.hash not in self._parent_shas:
            warnings.warn(
                "Comparing non-adjacent commits might be "
                "computationally expensive. Proceed with caution."
            )

        # search for matches among added and deleted paths only
        sm = difflib.SequenceMatcher()
        # for each added blob, try to find a match among deleted blobs:
        #   if there is a match, signal a rename and remove it from deleted
        #   if there is no match, signal a new file
        # unmatched deleted blobs are indeed deleted
        for added_file in added_paths:
            added_blob = new_files[added_file]
            sm.set_seq1(added_blob.data)
            matched = False
            for deleted_file in deleted_paths:
                deleted_blob = old_files[deleted_file]
                sm.set_seq2(deleted_blob.data)
                # use quick checks first (lower bound by length diff)
                if (
                    sm.real_quick_ratio() > threshold
                    and sm.quick_ratio() > threshold
                    and sm.ratio() > threshold
                ):
                    yield deleted_file, added_file, deleted_blob, added_blob
                    deleted_paths.remove(deleted_file)
                    matched = True
                    break
            if not matched:  # this is a new file
                yield None, added_file, None, added_blob

        for deleted_file in deleted_paths:
            yield deleted_file, None, old_files[deleted_file], None

    def __sub__(self, parent: "Commit"):
        return self.compare(parent)
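# Diff sketch: `child - parent` returns the compare() generator; the
# yielded paths and blobs are File/Blob objects (or None for pure
# additions/deletions):
#
#     child = Commit("e4af89166a17785c1d741b8b1d5775f3223f510f")
#     for old_f, new_f, old_b, new_b in child - child.parents[0]:
#         print(old_f, "->", new_f)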


class File(_NamedObject):
    _ident = "f"

    @property
    def path(self) -> str:
        return self.key

    @property
    def name(self) -> str:
        return self.key.split("/")[-1]

    @cached_property
    def authors(self) -> List[Author]:
        return [Author(a) for a in self._get_list_values("f2a")]

    @cached_property
    def blobs(self) -> List[Blob]:
        return [Blob(b) for b in self._get_list_values("f2b")]

    @cached_property
    def commits(self) -> List[Commit]:
        return [Commit(c) for c in self._get_list_values("f2c")]
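# File keys are full paths shared across all of WoC, so f2c returns
# every commit anywhere that touched that path (sketch):
#
#     f = File("setup.py")
#     len(f.commits)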

class Tree(_GitObject):
    _ident = "t"

    @cached_property
    def data(self) -> "List[Tuple[str, str, str]]":
        return self.woc.show_content("tree", self.key)

    @property
    def _file_names(self) -> List[str]:
        return [l[1] for l in self.data]

    @cached_property
    def _file_set(self) -> Set[str]:
        return {l[1] for l in self.data}

    @property
    def files(self) -> List["File"]:
        return [File(f) for f in self._file_names]

    @property
    def _blob_shas(self) -> List[str]:
        return [l[2] for l in self.data]

    @cached_property
    def _blob_set(self) -> Set[str]:
        return {l[2] for l in self.data}

    @property
    def blobs(self) -> List["Blob"]:
        return [Blob(b) for b in self._blob_shas]

    @cached_property
    def _file_blob_map(self) -> Dict[str, str]:
        return {l[1]: l[2] for l in self.data}

    def _traverse(self) -> "Generator[Tuple[str, str], None, None]":
        for mode, fname, sha in self.data:
            # subtrees always have mode 40000:
            # https://stackoverflow.com/questions/1071241
            if mode != "40000":
                yield fname, sha
            else:
                _logger.debug(f"traverse: into {fname} ({sha})")
                for _fname, _sha in Tree(sha)._traverse():
                    yield fname + "/" + _fname, _sha

    def traverse(self) -> "Generator[Tuple[File, Blob], None, None]":
        for fname, sha in self._traverse():
            yield File(fname), Blob(sha)

    def __contains__(self, item: Union[str, File, Blob]) -> bool:
        if isinstance(item, str):
            return item in self._file_names or item in self._blob_shas
        if isinstance(item, File):
            return item.path in self._file_names
        if isinstance(item, Blob):
            return item.hash in self._blob_shas
        return False

    def __str__(self) -> str:
        return "\n".join([" ".join(l) for l in self.data])

    def __len__(self) -> int:
        return len(self.data)

    def __iter__(self) -> "Generator[Tuple[File, Blob], None, None]":
        for l in self.data:
            yield File(l[1]), Blob(l[2])
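# Traversal sketch: iterating a Tree yields only its direct entries,
# while traverse() recurses into subtrees and yields full paths:
#
#     tree = Commit("e4af89166a17785c1d741b8b1d5775f3223f510f").tree
#     for f, blob in tree.traverse():
#         print(f.path, blob.hash)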

class Project(_NamedObject):
    _ident = "p"

    @cached_property
    def _platform_repo(self) -> Tuple[str, str]:
        URL_PREFIXES = self.woc.config["sites"]
        prefix, body = self.key.split("_", 1)
        if prefix == "sourceforge.net":
            platform = URL_PREFIXES[prefix]
        elif prefix in URL_PREFIXES and "_" in body:
            platform = URL_PREFIXES[prefix]
            body = body.replace("_", "/", 1)
        elif "." in prefix:
            platform = prefix
            body = body.replace("_", "/", 1)
        else:
            platform = "github.com"
            body = self.key.replace("_", "/", 1)
        return platform, body

    @property
    def url(self) -> str:
        """
        Get the URL for a given project URI.

        >>> Project('CS340-19_lectures').url
        'https://github.com/CS340-19/lectures'
        """
        platform, body = self._platform_repo
        URL_PREFIXES = self.woc.config["sites"]
        if platform in URL_PREFIXES:
            return f"https://{URL_PREFIXES[platform]}/{body}"
        return f"https://{platform}/{body}"

    @cached_property
    def authors(self) -> "List[Author]":
        return [Author(a) for a in self._get_list_values(f"{self._ident}2a")]

    @cached_property
    def _commit_shas(self) -> "List[str]":
        return self._get_list_values(f"{self._ident}2c")

    @cached_property
    def _commit_set(self) -> "Set[str]":
        return self._commit_map.keys()

    @cached_property
    def _commit_map(self) -> "Dict[str, Commit]":
        return {c.hash: c for c in self.commits}

    @cached_property
    def commits(self) -> "List[Commit]":
        return [Commit(c) for c in self._commit_shas]

    @cached_property
    def root_projects(self) -> "List[RootProject]":
        return [RootProject(p) for p in self._get_list_values(f"{self._ident}2P")]

    def __contains__(self, item: Union[str, Commit]) -> bool:
        if isinstance(item, str):
            return item in self._commit_set
        elif isinstance(item, Commit):
            return item.hash in self._commit_set
        return False

    @cached_property
    def head(self) -> "Commit":
        """
        Get the HEAD commit of the repository.

        >>> Project('user2589_minicms').head
        Commit(f2a7fcdc51450ab03cb364415f14e634fa69b62c)
        >>> Project('RoseTHERESA_SimpleCMS').head
        Commit(a47afa002ccfd3e23920f323b172f78c5c970250)
        """
        # Sometimes (very rarely) commit dates are wrong, so the latest commit
        # is not actually the head. The magic below is to account for this.
        parents = set().union(*(c._parent_shas for c in self.commits))
        heads = [self._commit_map[c] for c in self._commit_set - parents]

        # It is possible that there is more than one head.
        # E.g. it happens when HEAD is moved manually (git reset)
        # and continued with a separate chain of commits.
        # In this case, let's just use the latest one.
        # (Storing refs would make this much simpler.)
        _heads_sorted = sorted(
            heads, key=lambda c: c.authored_at or _DAY_Z, reverse=True
        )
        if len(_heads_sorted) == 0:
            raise ValueError("No head commit found")
        return _heads_sorted[0]

    @cached_property
    def tail(self) -> "Commit":
        """
        Get the first commit SHA by following first parents.

        >>> Project('user2589_minicms').tail
        Commit(1e971a073f40d74a1e72e07c682e1cba0bae159b)
        """
        pts = {c._parent_shas[0] for c in self.commits if c._parent_shas}
        for c in self.commits:
            if c.hash in pts and not c._parent_shas:
                return c

    @cached_property
    def earliest_commit(self) -> "Commit":
        """Get the earliest commit of the repository"""
        return min(self.commits, key=lambda c: c.authored_at or _DAY_Z)

    @cached_property
    def latest_commit(self) -> "Commit":
        """Get the latest commit of the repository"""
        return max(self.commits, key=lambda c: c.authored_at or _DAY_Z)

    def commits_fp(self) -> Generator["Commit", None, None]:
        """
        Get a commit chain by following only the first parent.

        Mimics https://git-scm.com/docs/git-log#git-log---first-parent.
        Thus, you only get a small subset of the full commit tree.

        >>> p = Project('user2589_minicms')
        >>> set(c.hash for c in p.commits_fp()).issubset({c.hash for c in p.commits})
        True

        In scenarios where branches are not important, it can save a lot
        of computing.

        Yields:
            Commit: commits, following the first parent only,
                from the latest to the earliest.
        """
        # Simplified version of self.head():
        #   - slightly less precise,
        #   - 20% faster
        #
        # Out of 500 randomly sampled projects, 493 had the same head.
        # In the remaining 7:
        #     2 had the same commit chain length,
        #     3 had one more commit,
        #     1 had two more commits,
        #     1 had three more commits.
        # Execution time:
        #   simplified version (argmax): ~153 seconds
        #   self.head(): ~190 seconds

        # At this point we know all commits are in the dataset
        # (validated in __iter__).
        commit = self.latest_commit

        while commit:
            # no point in try-except: the truth value of a list is its length
            first_parent = commit._parent_shas and commit._parent_shas[0]
            yield commit
            if not first_parent:
                break
            commit = self._commit_map.get(first_parent, Commit(first_parent))

    def __iter__(self) -> "Generator[Commit, None, None]":
        for c in self.commits:
            try:
                # ignoredAuthors stores raw author strings, so compare keys
                if c.author.key in self.woc.config["ignoredAuthors"]:
                    continue
                yield c
            except KeyError:
                pass

    @property
    def projects(self) -> List["Project"]:
        raise NotImplementedError("Project object does not have projects method")

    def download_blob(self, blob_sha: str) -> bytes:
        """
        Download the blob content from the remote.
        """
        try:
            from urllib.parse import quote_plus

            import requests
        except ImportError:
            raise ImportError(
                "This function requires the requests module. Install it via `pip install requests`"
            )

        if self._platform_repo[0] == "github.com":
            project = self._platform_repo[1]
            _r = requests.get(
                f"https://api.github.com/repos/{project}/git/blobs/{blob_sha}",
                allow_redirects=True,
                headers={"Accept": "application/vnd.github.raw+json"},
            )
            _r.raise_for_status()
            return _r.content
        elif self._platform_repo[0] == "gitlab.com":
            if not hasattr(self, "gitlab_project_id"):
                project = quote_plus(self._platform_repo[1])
                r = requests.get(f"https://gitlab.com/api/v4/projects/{project}")
                r.raise_for_status()
                self.gitlab_project_id = r.json()["id"]
            _r = requests.get(
                f"https://gitlab.com/api/v4/projects/{self.gitlab_project_id}/repository/blobs/{blob_sha}/raw",
                allow_redirects=True,
            )
            _r.raise_for_status()
            return _r.content
        else:
            raise NotImplementedError(
                "The function is not implemented for " + self._platform_repo[0]
            )

    def save(self, path: str, commit: Optional[Commit] = None):
        """
        Save the project files to disk. Missing blobs are retrieved from the remote.

        :param path: The path to save the files to.
        :param commit: Save the files at this commit. If None, the head or latest commit is used.
        """
        if commit is None:
            try:
                commit = self.head
            except ValueError:
                _logger.warning(
                    f"No head commit found for {self.key}, using latest commit"
                )
                commit = self.latest_commit

        flist = list(commit.tree.traverse())
        for idx, (f, blob) in enumerate(flist):
            _logger.debug(f"{idx + 1}/{len(flist)}: {f.path}")
            _p = os.path.join(path, f.path)
            os.makedirs(os.path.dirname(_p), exist_ok=True)
            # use a distinct name for the file handle to avoid shadowing `f`
            with open(_p, "wb") as fobj:
                try:
                    fobj.write(blob.data.encode())
                except KeyError:
                    _logger.info(f"Missing blob {blob.key}")
                    try:
                        if self._platform_repo[0] in ("github.com", "gitlab.com"):
                            fobj.write(self.download_blob(blob.hash))
                    except Exception as e:
                        _logger.error(f"Failed to download blob {blob.hash}: {e}")
                except Exception as e:
                    _logger.error(f"Failed to write blob {blob.hash}: {e}")
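# End-to-end sketch (the project URI comes from the docstrings above):
# follow p2c to commits, walk the first-parent chain, and export a tree:
#
#     p = Project("user2589_minicms")
#     print(p.url, len(p.commits))
#     for c in p.commits_fp():
#         print(c.hash, c.message)
#     p.save("/tmp/minicms")  # writes the files at head to disk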


class RootProject(Project):
    _ident = "P"

    @cached_property
    def unique_authors(self) -> "List[UniqueAuthor]":
        return [UniqueAuthor(a) for a in self._get_list_values(f"{self._ident}2A")]

    @cached_property
    def commits(self) -> "List[Commit]":
        return [Commit(c) for c in self._get_list_values(f"{self._ident}2C")]

    @cached_property
    def projects(self) -> "List[Project]":
        return [Project(p) for p in self._get_list_values(f"{self._ident}2p")]

    @property
    def root_projects(self) -> List["RootProject"]:
        raise NotImplementedError(
            "RootProject object does not have root_projects method"
        )
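# Fork-group sketch: a RootProject groups forks of the same repository;
# P2p lists its member projects and Project.root_projects (p2P) goes
# the other way:
#
#     rp = RootProject("user2589_minicms")
#     [p.key for p in rp.projects]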
def init_woc_objects(woc: woc.base.WocMapsBase):
19def init_woc_objects(woc: WocMapsBase):
20    """
21    Stores wocMaps object globally so you don't have to pass it around.
22
23    :param woc: a wocMaps object.
24    """
25    global _global_woc
26    _global_woc = woc

Stores wocMaps object globally so you don't have to pass it around.

Parameters
  • woc: a wocMaps object.
@lru_cache(maxsize=None)
def parse_timezone_offset(offset_str: str) -> datetime.timezone:
29@lru_cache(maxsize=None)
30def parse_timezone_offset(offset_str: str) -> timezone:
31    """
32    Parse a timezone offset string in the format '+HHMM' or '-HHMM' into a timezone object.
33
34    >>> parse_timezone_offset('+0530')
35    timezone(timedelta(seconds=19800))
36    """
37    match = re.match(r"([+-])(\d{2})(\d{2})", offset_str)
38    if not match:
39        raise ValueError(f"Invalid timezone offset format: {offset_str}")
40    sign, hours, minutes = match.groups()
41    hours, minutes = int(hours), int(minutes)
42    offset = timedelta(hours=hours, minutes=minutes)
43
44    if sign == "-":
45        offset = -offset
46
47    return timezone(offset)

Parse a timezone offset string in the format '+HHMM' or '-HHMM' into a timezone object.

>>> parse_timezone_offset('+0530')
timezone(timedelta(seconds=19800))
class Author(_NamedObject):
136class Author(_NamedObject):
137    _ident = "a"
138
139    @cached_property
140    def _username_email(self) -> Tuple[str, str]:
141        _splited = self.key.split(" <", 1)
142        if len(_splited) == 1:
143            return _splited[0], ""
144        return _splited[0], _splited[1][:-1]
145
146    @property
147    def name(self) -> str:
148        return self._username_email[0]
149
150    @property
151    def email(self) -> str:
152        return self._username_email[1]
153
154    @cached_property
155    def blobs(self) -> "List[Blob]":
156        return [Blob(b) for b in self._get_list_values(f"{self._ident}2b")]
157
158    @cached_property
159    def commits(self) -> "List[Commit]":
160        return [Commit(c) for c in self._get_list_values(f"{self._ident}2c")]
161
162    @cached_property
163    def files(self) -> "List[File]":
164        return [File(f) for f in self._get_list_values(f"{self._ident}2f")]
165
166    @cached_property
167    def projects(self) -> "List[Project]":
168        return [Project(p) for p in self._get_list_values(f"{self._ident}2p")]
169
170    @cached_property
171    def unique_authors(self) -> List["UniqueAuthor"]:
172        return [UniqueAuthor(a) for a in self._get_list_values(f"{self._ident}2A")]
173
174    @property
175    def authors(self):
176        raise NotImplementedError("Author object does not have authors method")
177
178    @property
179    def aliases(self) -> List["Author"]:
180        _unique_authors = self.unique_authors
181        if len(_unique_authors) == 0:
182            return []
183        return _unique_authors[0].authors
184
185    @cached_property
186    def first_blobs(self) -> List["Blob"]:
187        return [Blob(b) for b in self._get_list_values(f"{self._ident}2fb")]

Base class for objects indexed by a string key

name: str
146    @property
147    def name(self) -> str:
148        return self._username_email[0]
email: str
150    @property
151    def email(self) -> str:
152        return self._username_email[1]
blobs: List[Blob]
154    @cached_property
155    def blobs(self) -> "List[Blob]":
156        return [Blob(b) for b in self._get_list_values(f"{self._ident}2b")]
commits: List[Commit]
158    @cached_property
159    def commits(self) -> "List[Commit]":
160        return [Commit(c) for c in self._get_list_values(f"{self._ident}2c")]
files: List[File]
162    @cached_property
163    def files(self) -> "List[File]":
164        return [File(f) for f in self._get_list_values(f"{self._ident}2f")]
projects: List[Project]
166    @cached_property
167    def projects(self) -> "List[Project]":
168        return [Project(p) for p in self._get_list_values(f"{self._ident}2p")]
unique_authors: list[UniqueAuthor]
170    @cached_property
171    def unique_authors(self) -> List["UniqueAuthor"]:
172        return [UniqueAuthor(a) for a in self._get_list_values(f"{self._ident}2A")]
authors
174    @property
175    def authors(self):
176        raise NotImplementedError("Author object does not have authors method")
aliases: list[Author]
178    @property
179    def aliases(self) -> List["Author"]:
180        _unique_authors = self.unique_authors
181        if len(_unique_authors) == 0:
182            return []
183        return _unique_authors[0].authors
first_blobs: list[Blob]
185    @cached_property
186    def first_blobs(self) -> List["Blob"]:
187        return [Blob(b) for b in self._get_list_values(f"{self._ident}2fb")]
class UniqueAuthor(Author):
190class UniqueAuthor(Author):
191    _ident = "A"
192
193    @property
194    def unique_authors(self) -> "List[Author]":
195        raise NotImplementedError(
196            "UniqueAuthor object does not have unique_authors method"
197        )
198
199    @cached_property
200    def authors(self) -> "List[Author]":
201        return [Author(a) for a in self._get_list_values(f"{self._ident}2a")]

Base class for objects indexed by a string key

unique_authors: List[Author]
193    @property
194    def unique_authors(self) -> "List[Author]":
195        raise NotImplementedError(
196            "UniqueAuthor object does not have unique_authors method"
197        )
authors: List[Author]
199    @cached_property
200    def authors(self) -> "List[Author]":
201        return [Author(a) for a in self._get_list_values(f"{self._ident}2a")]
class Blob(_GitObject):
204class Blob(_GitObject):
205    _ident = "b"
206
207    @cached_property
208    def _pos(self) -> Tuple[int, int]:
209        return self.woc._get_pos("blob", self.key)
210
211    def __len__(self) -> int:
212        return self._pos[1]
213
214    def __str__(self) -> str:
215        return self.data
216
217    @cached_property
218    def commits(self) -> "List[Commit]":
219        return [Commit(sha) for sha in self._get_list_values("b2c")]
220
221    @cached_property
222    def first_author(self) -> "Tuple[datetime, Author, Commit]":
223        """
224        Returns the timestamp, author, and commit of the first author.
225
226        >>> woc.get_values('b2fa', '05fe634ca4c8386349ac519f899145c75fff4169'))
227        (datetime.datetime(2014, 9, 7, 2, 59, 48), Author(Audris Mockus <audris@utk.edu>), Commit(e4af89166a17785c1d741b8b1d5775f3223f510f))
228        """
229        _out = self.woc.get_values("b2fa", self.key)
230        _date = datetime.fromtimestamp(int(_out[0]))
231        _author = Author(_out[1])
232        _commit = Commit(_out[2])
233        return _date, _author, _commit
234
235    @cached_property
236    def time_author_commits(self) -> "List[Tuple[datetime, Author, Commit]]":
237        _out = self._get_list_values("b2tac")
238        return [
239            (datetime.fromtimestamp(int(d[0])), Author(d[1]), Commit(d[2])) for d in _out
240        ]
241
242    @cached_property
243    def files(self) -> "List[File]":
244        return [File(f) for f in self._get_list_values("b2f")]
245
246    @cached_property
247    def projects_unique(self) -> "List[RootProject]":
248        return [RootProject(p) for p in self._get_list_values("b2P")]
249
250    @cached_property
251    def changed_from(self) -> "List[Tuple[Blob, Commit, File]]":
252        return [
253            (Blob(b), Commit(c), File(f)) for b, c, f in self._get_list_values("bb2cf")
254        ]
255
256    @cached_property
257    def changed_to(self) -> "List[Tuple[Blob, Commit, File]]":
258        return [
259            (Blob(b), Commit(c), File(f)) for b, c, f in self._get_list_values("obb2cf")
260        ]

Base class for SHA1-indexed Git objects (commit, tree, blob)

commits: List[Commit]
217    @cached_property
218    def commits(self) -> "List[Commit]":
219        return [Commit(sha) for sha in self._get_list_values("b2c")]
first_author: Tuple[datetime.datetime, Author, Commit]
221    @cached_property
222    def first_author(self) -> "Tuple[datetime, Author, Commit]":
223        """
224        Returns the timestamp, author, and commit of the first author.
225
226        >>> woc.get_values('b2fa', '05fe634ca4c8386349ac519f899145c75fff4169'))
227        (datetime.datetime(2014, 9, 7, 2, 59, 48), Author(Audris Mockus <audris@utk.edu>), Commit(e4af89166a17785c1d741b8b1d5775f3223f510f))
228        """
229        _out = self.woc.get_values("b2fa", self.key)
230        _date = datetime.fromtimestamp(int(_out[0]))
231        _author = Author(_out[1])
232        _commit = Commit(_out[2])
233        return _date, _author, _commit

Returns the timestamp, author, and commit of the first author.

>>> woc.get_values('b2fa', '05fe634ca4c8386349ac519f899145c75fff4169'))
(datetime.datetime(2014, 9, 7, 2, 59, 48), Author(Audris Mockus <audris@utk.edu>), Commit(e4af89166a17785c1d741b8b1d5775f3223f510f))
time_author_commits: List[Tuple[datetime.datetime, Author, Commit]]
235    @cached_property
236    def time_author_commits(self) -> "List[Tuple[datetime, Author, Commit]]":
237        _out = self._get_list_values("b2tac")
238        return [
239            (datetime.fromtimestamp(int(d[0])), Author(d[1]), Commit(d[2])) for d in _out
240        ]
files: List[File]
242    @cached_property
243    def files(self) -> "List[File]":
244        return [File(f) for f in self._get_list_values("b2f")]
projects_unique: List[RootProject]
246    @cached_property
247    def projects_unique(self) -> "List[RootProject]":
248        return [RootProject(p) for p in self._get_list_values("b2P")]
changed_from: List[Tuple[Blob, Commit, File]]
250    @cached_property
251    def changed_from(self) -> "List[Tuple[Blob, Commit, File]]":
252        return [
253            (Blob(b), Commit(c), File(f)) for b, c, f in self._get_list_values("bb2cf")
254        ]
changed_to: List[Tuple[Blob, Commit, File]]
256    @cached_property
257    def changed_to(self) -> "List[Tuple[Blob, Commit, File]]":
258        return [
259            (Blob(b), Commit(c), File(f)) for b, c, f in self._get_list_values("obb2cf")
260        ]
class Commit(_GitObject):
263class Commit(_GitObject):
264    _ident = "c"
265
266    @cached_property
267    def data_obj(self):
268        _ret = {}
269        (
270            _ret["tree"],
271            _ret["parent"],
272            (_ret["author"], _ret["author_timestamp"], _ret["author_timezone"]),
273            (_ret["committer"], _ret["committer_timestamp"], _ret["committer_timezone"]),
274            _ret["message"],
275        ) = self.data
276        return _ret
277
278    @property
279    def author(self) -> Author:
280        return Author(self.data_obj["author"])
281
282    @property
283    def authored_at(self) -> datetime:
284        tz = parse_timezone_offset(self.data_obj["author_timezone"])
285        return datetime.fromtimestamp(int(self.data_obj["author_timestamp"]), tz)
286
287    @property
288    def committer(self) -> Author:
289        return Author(self.data_obj["committer"])
290
291    @property
292    def committed_at(self) -> datetime:
293        tz = parse_timezone_offset(self.data_obj["committer_timezone"])
294        return datetime.fromtimestamp(int(self.data_obj["committer_timestamp"]), tz)
295
296    @property
297    def full_message(self) -> str:
298        """Full message of the commit"""
299        return self.data_obj["message"]
300
301    @property
302    def message(self) -> str:
303        """Short message of the commit"""
304        return self.data_obj["message"].split("\n", 1)[0]
305
306    @cached_property
307    def tree(self) -> "Tree":
308        return Tree(self.data_obj["tree"])
309
310    @property
311    def _parent_shas(self) -> List[str]:
312        return self.data_obj["parent"]
313
314    @property
315    def parents(self) -> List["Commit"]:
316        """Parent commits of this commit"""
317        return [Commit(p) for p in self.data_obj["parent"]]
318
319    @cached_property
320    def projects(self) -> List["Project"]:
321        """Projects associated with this commit"""
322        return [Project(p) for p in self._get_list_values("c2p")]
323
324    @cached_property
325    def root_projects(self) -> List["RootProject"]:
326        """Root projects associated with this commit"""
327        return [RootProject(p) for p in self._get_list_values("c2P")]
328
329    @cached_property
330    def children(self) -> List["Commit"]:
331        """Children of this commit"""
332        return [Commit(c) for c in self._get_list_values("c2cc")]
333
334    @cached_property
335    def _file_names(self) -> List[str]:
336        return self._get_list_values("c2f")
337
338    @cached_property
339    def _file_set(self) -> Set[str]:
340        return set(self._file_names)
341
342    @cached_property
343    def files(self) -> List["File"]:
344        """Files changed in this commit"""
345        return [File(f) for f in self._file_names]
346
347    @cached_property
348    def _blob_shas(self) -> List[str]:
349        return self._get_list_values("c2b")
350
351    @cached_property
352    def _blob_set(self) -> Set[str]:
353        return set(self._blob_shas)
354
355    @cached_property
356    def blobs(self) -> List["Blob"]:
357        """
358        Blobs changed in this commit.
359
360        This relation is known to miss every first file in all trees.
361        Consider using Commit.tree.blobs as a slower but more accurate
362        alternative.
363        """
364        return [Blob(b) for b in self._get_list_values("c2b")]
365
366    @cached_property
367    def time_author(self) -> Tuple[datetime, Author]:
368        """Timestamp and author of the commit"""
369        res = self.woc.get_values("c2ta", self.key)
370        return datetime.fromtimestamp(int(res[0])), Author(res[1])
371
372    @cached_property
373    def root(self) -> "Tuple[Commit, int]":
374        """Root commit of the project"""
375        sha, dis = self.woc.get_values("c2r", self.key)
376        return Commit(sha), int(dis)
377
378    @cached_property
379    def changeset(self) -> "List[Tuple[File, Blob, Blob]]":
380        """Returns changed files, their new and old blobs"""
381        return [
382            (File(f), Blob(new), Blob(old))
383            for f, new, old in self._get_list_values("c2fbb")
384        ]
385
386    def compare(
387        self, parent: Union["Commit", str], threshold=0.5
388    ) -> Generator[
389        Tuple[Optional["File"], Optional["File"], Optional["Blob"], Optional["Blob"]],
390        None,
391        None,
392    ]:
393        """
394        Compare two Commits.
395
396        :param parent: another commit to compare to.
397                Expected order is `diff = child_commit - parent_commit`
398
399        :return: a generator of 4-tuples `(old_path, new_path, old_sha, new_sha)`
400
401        Examples:
402        - a new file 'setup.py' was created:
403            `(None, 'setup.py', None, 'file_sha')`
404        - an existing 'setup.py' was deleted:
405            `('setup.py', None, 'old_file_sha', None)`
406        - setup.py.old was renamed to setup.py, content unchanged:
407            `('setup.py.old', 'setup.py', 'file_sha', 'file_sha')`
408        - setup.py was edited:
409            `('setup.py', 'setup.py', 'old_file_sha', 'new_file_sha')`
410        - setup.py.old was edited and renamed to setup.py:
411            `('setup.py.old', 'setup.py', 'old_file_sha', 'new_file_sha')`
412
413        Detecting the last one is computationally expensive. You can adjust this
414        behaviour by passing the `threshold` parameter, which is 0.5 by default.
415        It means that if roughly 50% of the file content is the same,
416        it is considered a match. `threshold=1` means that only exact
417        matches are considered, effectively disabling this comparison.
418        If threshold is set to 0, any pair of deleted and added file will be
419        considered renamed and edited; this last case doesn't make much sense so
420        don't set it too low.
421        """
422        if isinstance(parent, str):
423            parent = Commit(parent)
424        if not isinstance(parent, Commit):
425            raise TypeError("parent must be a Commit or a commit hash")
426
427        # # filename: (blob sha before, blob sha after)
428        # new_files = self.tree._file_blob_map
429        # new_paths = self.tree._file_set
430        # old_files = parent.tree._file_blob_map
431        # old_paths = parent.tree._file_set
432
433        # !!! We really need to traverse the trees ###
434        new_files: Dict[File, Blob] = {}
435        for f, b in self.tree.traverse():
436            new_files[f] = b
437        old_files: Dict[File, Blob] = {}
438        for f, b in parent.tree.traverse():
439            old_files[f] = b
440
441        # unchanged_paths
442        for f in new_files.keys() & old_files.keys():
443            if new_files[f] != old_files[f]:
444                # i.e. Blob sha Changed!
445                yield f, f, old_files[f], new_files[f]
446
447        added_paths: Set[File] = new_files.keys() - old_files.keys()
448        deleted_paths: Set[File] = old_files.keys() - new_files.keys()
449
450        if threshold >= 1:  # i.e. only exact matches are considered
451            for f in added_paths:  # add
452                yield None, f, None, new_files[f]
453            for f in deleted_paths:
454                yield f, None, old_files[f], None
455            return
456
457        if parent.hash not in self._parent_shas:
458            warnings.warn(
459                "Comparing non-adjacent commits might be "
460                "computationally expensive. Proceed with caution."
461            )
462
463        # search for matches
464        sm = difflib.SequenceMatcher()
465        # for each added blob, try to find a match in deleted blobs
466        #   if there is a match, signal a rename and remove from deleted
467        #   if there is no match, signal a new file
468        # unused deleted blobs are indeed deleted
469        for added_file, added_blob in new_files.items():
470            sm.set_seq1(added_blob.data)
471            matched = False
472            for deleted_file, deleted_blob in old_files.items():
473                sm.set_seq2(deleted_blob.data)
474                # use quick checks first (lower bound by length diff)
475                if (
476                    sm.real_quick_ratio() > threshold
477                    and sm.quick_ratio() > threshold
478                    and sm.ratio() > threshold
479                ):
480                    yield deleted_file, added_file, deleted_blob, added_blob
481                    del old_files[deleted_file]
482                    matched = True
483                    break
484            if not matched:  # this is a new file
485                yield None, added_file, None, added_blob
486
487        for deleted_file, deleted_blob in old_files.items():
488            yield deleted_file, None, deleted_blob, None
489
490    def __sub__(self, parent: "Commit"):
491        return self.compare(parent)

Base class for SHA1-indexed Git objects (commit, tree, blob)

data_obj
266    @cached_property
267    def data_obj(self):
268        _ret = {}
269        (
270            _ret["tree"],
271            _ret["parent"],
272            (_ret["author"], _ret["author_timestamp"], _ret["author_timezone"]),
273            (_ret["committer"], _ret["committer_timestamp"], _ret["committer_timezone"]),
274            _ret["message"],
275        ) = self.data
276        return _ret
author: Author
278    @property
279    def author(self) -> Author:
280        return Author(self.data_obj["author"])
authored_at: datetime.datetime
282    @property
283    def authored_at(self) -> datetime:
284        tz = parse_timezone_offset(self.data_obj["author_timezone"])
285        return datetime.fromtimestamp(int(self.data_obj["author_timestamp"]), tz)
committer: Author
287    @property
288    def committer(self) -> Author:
289        return Author(self.data_obj["committer"])
committed_at: datetime.datetime
291    @property
292    def committed_at(self) -> datetime:
293        tz = parse_timezone_offset(self.data_obj["committer_timezone"])
294        return datetime.fromtimestamp(int(self.data_obj["committer_timestamp"]), tz)
full_message: str
296    @property
297    def full_message(self) -> str:
298        """Full message of the commit"""
299        return self.data_obj["message"]

Full message of the commit

message: str
301    @property
302    def message(self) -> str:
303        """Short message of the commit"""
304        return self.data_obj["message"].split("\n", 1)[0]

Short message of the commit

tree: Tree
306    @cached_property
307    def tree(self) -> "Tree":
308        return Tree(self.data_obj["tree"])
parents: list[Commit]
314    @property
315    def parents(self) -> List["Commit"]:
316        """Parent commits of this commit"""
317        return [Commit(p) for p in self.data_obj["parent"]]

Parent commits of this commit

projects: list[Project]
319    @cached_property
320    def projects(self) -> List["Project"]:
321        """Projects associated with this commit"""
322        return [Project(p) for p in self._get_list_values("c2p")]

Projects associated with this commit

root_projects: list[RootProject]
324    @cached_property
325    def root_projects(self) -> List["RootProject"]:
326        """Root projects associated with this commit"""
327        return [RootProject(p) for p in self._get_list_values("c2P")]

Root projects associated with this commit

children: list[Commit]
329    @cached_property
330    def children(self) -> List["Commit"]:
331        """Children of this commit"""
332        return [Commit(c) for c in self._get_list_values("c2cc")]

Children of this commit

files: list[File]
342    @cached_property
343    def files(self) -> List["File"]:
344        """Files changed in this commit"""
345        return [File(f) for f in self._file_names]

Files changed in this commit

blobs: list[Blob]
355    @cached_property
356    def blobs(self) -> List["Blob"]:
357        """
358        Blobs changed in this commit.
359
360        This relation is known to miss every first file in all trees.
361        Consider using Commit.tree.blobs as a slower but more accurate
362        alternative.
363        """
364        return [Blob(b) for b in self._get_list_values("c2b")]

Blobs changed in this commit.

This relation is known to miss every first file in all trees. Consider using Commit.tree.blobs as a slower but more accurate alternative.

time_author: Tuple[datetime.datetime, Author]
366    @cached_property
367    def time_author(self) -> Tuple[datetime, Author]:
368        """Timestamp and author of the commit"""
369        res = self.woc.get_values("c2ta", self.key)
370        return datetime.fromtimestamp(int(res[0])), Author(res[1])

Timestamp and author of the commit

root: Tuple[Commit, int]
372    @cached_property
373    def root(self) -> "Tuple[Commit, int]":
374        """Root commit of the project"""
375        sha, dis = self.woc.get_values("c2r", self.key)
376        return Commit(sha), int(dis)

Root commit of the project

changeset: List[Tuple[File, Blob, Blob]]
378    @cached_property
379    def changeset(self) -> "List[Tuple[File, Blob, Blob]]":
380        """Returns changed files, their new and old blobs"""
381        return [
382            (File(f), Blob(new), Blob(old))
383            for f, new, old in self._get_list_values("c2fbb")
384        ]

Returns changed files, their new and old blobs

def compare( self, parent: Union[Commit, str], threshold=0.5) -> collections.abc.Generator[tuple[typing.Union[File, NoneType], typing.Union[File, NoneType], typing.Union[Blob, NoneType], typing.Union[Blob, NoneType]], NoneType, NoneType]:
386    def compare(
387        self, parent: Union["Commit", str], threshold=0.5
388    ) -> Generator[
389        Tuple[Optional["File"], Optional["File"], Optional["Blob"], Optional["Blob"]],
390        None,
391        None,
392    ]:
393        """
394        Compare two Commits.
395
396        :param parent: another commit to compare to.
397                Expected order is `diff = child_commit - parent_commit`
398
399        :return: a generator of 4-tuples `(old_path, new_path, old_sha, new_sha)`
400
401        Examples:
402        - a new file 'setup.py' was created:
403            `(None, 'setup.py', None, 'file_sha')`
404        - an existing 'setup.py' was deleted:
405            `('setup.py', None, 'old_file_sha', None)`
406        - setup.py.old was renamed to setup.py, content unchanged:
407            `('setup.py.old', 'setup.py', 'file_sha', 'file_sha')`
408        - setup.py was edited:
409            `('setup.py', 'setup.py', 'old_file_sha', 'new_file_sha')`
410        - setup.py.old was edited and renamed to setup.py:
411            `('setup.py.old', 'setup.py', 'old_file_sha', 'new_file_sha')`
412
413        Detecting the last one is computationally expensive. You can adjust this
414        behaviour by passing the `threshold` parameter, which is 0.5 by default.
415        It means that if roughly 50% of the file content is the same,
416        it is considered a match. `threshold=1` means that only exact
417        matches are considered, effectively disabling this comparison.
418        If threshold is set to 0, any pair of deleted and added file will be
419        considered renamed and edited; this last case doesn't make much sense so
420        don't set it too low.
421        """
422        if isinstance(parent, str):
423            parent = Commit(parent)
424        if not isinstance(parent, Commit):
425            raise TypeError("parent must be a Commit or a commit hash")
426
427        # # filename: (blob sha before, blob sha after)
428        # new_files = self.tree._file_blob_map
429        # new_paths = self.tree._file_set
430        # old_files = parent.tree._file_blob_map
431        # old_paths = parent.tree._file_set
432
433        # !!! We really need to traverse the trees ###
434        new_files: Dict[File, Blob] = {}
435        for f, b in self.tree.traverse():
436            new_files[f] = b
437        old_files: Dict[File, Blob] = {}
438        for f, b in parent.tree.traverse():
439            old_files[f] = b
440
441        # unchanged_paths
442        for f in new_files.keys() & old_files.keys():
443            if new_files[f] != old_files[f]:
444                # i.e. Blob sha Changed!
445                yield f, f, old_files[f], new_files[f]
446
447        added_paths: Set[File] = new_files.keys() - old_files.keys()
448        deleted_paths: Set[File] = old_files.keys() - new_files.keys()
449
450        if threshold >= 1:  # i.e. only exact matches are considered
451            for f in added_paths:  # add
452                yield None, f, None, new_files[f]
453            for f in deleted_paths:
454                yield f, None, old_files[f], None
455            return
456
457        if parent.hash not in self._parent_shas:
458            warnings.warn(
459                "Comparing non-adjacent commits might be "
460                "computationally expensive. Proceed with caution."
461            )
462
463        # search for matches
464        sm = difflib.SequenceMatcher()
465        # for each added blob, try to find a match in deleted blobs
466        #   if there is a match, signal a rename and remove from deleted
467        #   if there is no match, signal a new file
468        # unused deleted blobs are indeed deleted
469        for added_file in added_paths:
470            added_blob = new_files[added_file]
471            sm.set_seq1(added_blob.data)
472            matched = False
473            # iterate over a copy: the set shrinks when a match is found
474            for deleted_file in list(deleted_paths):
475                deleted_blob = old_files[deleted_file]
476                sm.set_seq2(deleted_blob.data)
477                # use quick checks first (lower bound by length diff)
478                if (
479                    sm.real_quick_ratio() > threshold
480                    and sm.quick_ratio() > threshold
481                    and sm.ratio() > threshold
482                ):
483                    yield deleted_file, added_file, deleted_blob, added_blob
484                    deleted_paths.remove(deleted_file)
485                    matched = True
486                    break
487            if not matched:  # this is a new file
488                yield None, added_file, None, added_blob
489
490        # whatever remains in deleted_paths was really deleted
491        for deleted_file in deleted_paths:
492            yield deleted_file, None, old_files[deleted_file], None

Compare two Commits.

Parameters
  • parent: another commit to compare to. Expected order is diff = child_commit - parent_commit
Returns

a generator of 4-tuples (old_file, new_file, old_blob, new_blob); the examples below show the objects' keys

Examples:

  • a new file 'setup.py' was created: (None, 'setup.py', None, 'file_sha')
  • an existing 'setup.py' was deleted: ('setup.py', None, 'old_file_sha', None)
  • setup.py.old was renamed to setup.py, content unchanged: ('setup.py.old', 'setup.py', 'file_sha', 'file_sha')
  • setup.py was edited: ('setup.py', 'setup.py', 'old_file_sha', 'new_file_sha')
  • setup.py.old was edited and renamed to setup.py: ('setup.py.old', 'setup.py', 'old_file_sha', 'new_file_sha')

Detecting the last one is computationally expensive. You can adjust this behaviour by passing the threshold parameter, which is 0.5 by default. It means that if roughly 50% of the file content is the same, the pair is considered a match. threshold=1 means that only exact matches are considered, effectively disabling rename detection. If threshold is set to 0, any pair of deleted and added files will be considered a rename-and-edit; this makes little sense, so don't set it too low.
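
A usage sketch, diffing a commit against its first parent (commit_sha is a placeholder; _parent_shas is the same internal list compare() consults):

    c = Commit(commit_sha)
    if c._parent_shas:  # root commits have nothing to compare against
        for old_f, new_f, old_b, new_b in c.compare(c._parent_shas[0]):
            if old_f is None:
                print("added:", new_f.path)
            elif new_f is None:
                print("deleted:", old_f.path)
            else:
                print("modified or renamed:", old_f.path, "->", new_f.path)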

class File(_NamedObject):
494class File(_NamedObject):
495    _ident = "f"
496
497    @property
498    def path(self) -> str:
499        return self.key
500
501    @property
502    def name(self) -> str:
503        return self.key.split("/")[-1]
504
505    @cached_property
506    def authors(self) -> List[Author]:
507        return [Author(a) for a in self._get_list_values("f2a")]
508
509    @cached_property
510    def blobs(self) -> List[Blob]:
511        return [Blob(b) for b in self._get_list_values("f2b")]
512
513    @cached_property
514    def commits(self) -> List[Commit]:
515        return [Commit(c) for c in self._get_list_values("f2c")]

Base class for objects indexed by a string key

path: str
497    @property
498    def path(self) -> str:
499        return self.key
name: str
501    @property
502    def name(self) -> str:
503        return self.key.split("/")[-1]
authors: List[Author]
505    @cached_property
506    def authors(self) -> List[Author]:
507        return [Author(a) for a in self._get_list_values("f2a")]
blobs: List[Blob]
509    @cached_property
510    def blobs(self) -> List[Blob]:
511        return [Blob(b) for b in self._get_list_values("f2b")]
commits: List[Commit]
513    @cached_property
514    def commits(self) -> List[Commit]:
515        return [Commit(c) for c in self._get_list_values("f2c")]
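
A sketch of the typical lookup direction, from a path to its history; keys are full paths, so a nested file is addressed as 'dir/file':

    f = File("setup.py")
    print(f.name)        # basename of the path
    for c in f.commits:  # backed by the f2c map; empty if the key is absent
        print(c.hash)
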
class Tree(_GitObject):
518class Tree(_GitObject):
519    _ident = "t"
520
521    @cached_property
522    def data(self) -> "List[Tuple[str, str, str]]":
523        return self.woc.show_content("tree", self.key)
524
525    @property
526    def _file_names(self) -> List[str]:
527        return [l[1] for l in self.data]
528
529    @cached_property
530    def _file_set(self) -> Set[str]:
531        return {l[1] for l in self.data}
532
533    @property
534    def files(self) -> List["File"]:
535        return [File(f) for f in self._file_names]
536
537    @property
538    def _blob_shas(self) -> List[str]:
539        return [l[2] for l in self.data]
540
541    @cached_property
542    def _blob_set(self) -> Set[str]:
543        return {l[2] for l in self.data}
544
545    @property
546    def blobs(self) -> List["Blob"]:
547        return [Blob(b) for b in self._blob_shas]
548
549    @cached_property
550    def _file_blob_map(self) -> Dict[str, str]:
551        return {l[1]: l[2] for l in self.data}
552
553    def _traverse(self) -> "Generator[Tuple[str, str], None, None]":
554        for mode, fname, sha in self.data:
555            # trees are always 40000:
556            # https://stackoverflow.com/questions/1071241
557            if mode != "40000":
558                yield fname, sha
559            else:
560                _logger.debug(f"traverse: into {fname} ({sha})")
561                for _fname, _sha in Tree(sha)._traverse():
562                    yield fname + "/" + _fname, _sha
563
564    def traverse(self) -> "Generator[Tuple[File, Blob], None, None]":
565        for fname, sha in self._traverse():
566            yield File(fname), Blob(sha)
567
568    def __contains__(self, item: Union[str, File, Blob]) -> bool:
569        if isinstance(item, str):
570            return item in self._file_names or item in self._blob_shas
571        if isinstance(item, File):
572            return item.path in self._file_names
573        if isinstance(item, Blob):
574            return item.hash in self._blob_shas
575        return False
576
577    def __str__(self) -> str:
578        return "\n".join([" ".join(l) for l in self.data])
579
580    def __len__(self) -> int:
581        return len(self.data)
582
583    def __iter__(self) -> "Generator[Tuple[File, Blob], None, None]":
584        for l in self.data:
585            yield File(l[1]), Blob(l[2])

Base class for SHA1-indexed Git objects (commit, tree, blob)

data: List[Tuple[str, str, str]]
521    @cached_property
522    def data(self) -> "List[Tuple[str, str, str]]":
523        return self.woc.show_content("tree", self.key)
files: List[File]
533    @property
534    def files(self) -> List["File"]:
535        return [File(f) for f in self._file_names]
blobs: List[Blob]
545    @property
546    def blobs(self) -> List["Blob"]:
547        return [Blob(b) for b in self._blob_shas]
def traverse(self) -> Generator[Tuple[File, Blob], None, None]:
564    def traverse(self) -> "Generator[Tuple[File, Blob], None, None]":
565        for fname, sha in self._traverse():
566            yield File(fname), Blob(sha)
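
A sketch of walking a commit's tree recursively (commit_sha is a placeholder); unlike iterating the Tree itself, traverse() descends into subtrees and yields full paths:

    for f, blob in Commit(commit_sha).tree.traverse():
        print(f.path, blob.hash)
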
class Project(_NamedObject):
588class Project(_NamedObject):
589    _ident = "p"
590
591    @cached_property
592    def _platform_repo(self) -> "Tuple[str, str]":
593        URL_PREFIXES = self.woc.config["sites"]
594        prefix, body = self.key.split("_", 1)
595        if prefix == "sourceforge.net":
596            platform = URL_PREFIXES[prefix]
597        elif prefix in URL_PREFIXES and "_" in body:
598            platform = URL_PREFIXES[prefix]
599            body = body.replace("_", "/", 1)
600        elif "." in prefix:
601            platform = prefix
602            body = body.replace("_", "/", 1)
603        else:
604            platform = "github.com"
605            body = self.key.replace("_", "/", 1)
606        return platform, body
607
608    @property
609    def url(self) -> str:
610        """
611        Get the URL for a given project URI.
612
613        >>> Project('CS340-19_lectures').url
614        'https://github.com/CS340-19/lectures'
615        """
616        platform, body = self._platform_repo
617        URL_PREFIXES = self.woc.config["sites"]
618        if platform in URL_PREFIXES:
619            return f"https://{URL_PREFIXES[platform]}/{body}"
620        return f"https://{platform}/{body}"
621
622    @cached_property
623    def authors(self) -> "List[Author]":
624        return [Author(a) for a in self._get_list_values(f"{self._ident}2a")]
625
626    @cached_property
627    def _commit_shas(self) -> "List[str]":
628        return self._get_list_values(f"{self._ident}2c")
629
630    @cached_property
631    def _commit_set(self) -> "Set[str]":
632        return self._commit_map.keys()
633
634    @cached_property
635    def _commit_map(self) -> "Dict[str, Commit]":
636        return {c.hash: c for c in self.commits}
637
638    @cached_property
639    def commits(self) -> "List[Commit]":
640        return [Commit(c) for c in self._commit_shas]
641
642    @cached_property
643    def root_projects(self) -> "List[RootProject]":
644        return [RootProject(p) for p in self._get_list_values(f"{self._ident}2P")]
645
646    def __contains__(self, item: Union[str, Commit]) -> bool:
647        if isinstance(item, str):
648            return item in self._commit_set
649        elif isinstance(item, Commit):
650            return item.hash in self._commit_set
651        return False
652
653    @cached_property
654    def head(self) -> "Commit":
655        """
656        Get the HEAD commit of the repository.
657
658        >>> Project('user2589_minicms').head
659        Commit(f2a7fcdc51450ab03cb364415f14e634fa69b62c)
660        >>> Project('RoseTHERESA_SimpleCMS').head
661        Commit(a47afa002ccfd3e23920f323b172f78c5c970250)
662        """
663        # Sometimes (very rarely) commit dates are wrong, so the latest commit
664        # is not actually the head. The magic below is to account for this
665        parents = set().union(*(c._parent_shas for c in self.commits))
666        heads = [self._commit_map[c] for c in self._commit_set - parents]
667
668        # it is possible that there is more than one head.
669        # E.g. it happens when HEAD is moved manually (git reset)
670        # and continued with a separate chain of commits.
671        # in this case, let's just use the latest one
672        # actually, storing refs would make it much simpler
673        _heads_sorted = sorted(heads, key=lambda c: c.authored_at or _DAY_Z, reverse=True)
674        if len(_heads_sorted) == 0:
675            raise ValueError("No head commit found")
676        return _heads_sorted[0]
677
678    @cached_property
679    def tail(self) -> "Commit":
680        """
681        Get the first commit by following first parents.
682
683        >>> Project('user2589_minicms').tail
684        Commit(1e971a073f40d74a1e72e07c682e1cba0bae159b)
685        """
686        pts = {c._parent_shas[0] for c in self.commits if c._parent_shas}
687        for c in self.commits:
688            if c.hash in pts and not c._parent_shas:
689                return c
690
691    @cached_property
692    def earliest_commit(self) -> "Commit":
693        """Get the earliest commit of the repository"""
694        return min(self.commits, key=lambda c: c.authored_at or _DAY_Z)
695
696    @cached_property
697    def latest_commit(self) -> "Commit":
698        """Get the latest commit of the repository"""
699        return max(self.commits, key=lambda c: c.authored_at or _DAY_Z)
700
701    def commits_fp(self) -> Generator["Commit", None, None]:
702        """
703        Get a commit chain by following only the first parent.
704
705        Mimics https://git-scm.com/docs/git-log#git-log---first-parent.
706        Thus, you only get a small subset of the full commit tree.
707
708        >>> p = Project('user2589_minicms')
709        >>> set(c.hash for c in p.commits_fp()).issubset(p._commit_shas)
710        True
711
712        In scenarios where branches are not important, it can save a lot
713        of computing.
714
715        Yields:
716            Commit: commit objects, following first parent only,
717                from the latest to the earliest.
718        """
719        # Simplified version of self.head():
720        #   - slightly less precise,
721        #   - 20% faster
722        #
723        # out of 500 randomly sampled projects, 493 had the same head.
724        # In the remaining 7:
725        #     2 had the same commit chain length,
726        #     3 had one more commit
727        #     1 had two more commits
728        #     1 had three more commits
729        # Execution time:
730        #   simplified version (argmax): ~153 seconds
731        #   self.head(): ~190 seconds
732
733        # at this point we know all commits are in the dataset
734        # (validated in __iter__)
735        commit = self.latest_commit
736
737        while commit:
738            # no try-except needed: an empty list is falsy
739            first_parent = commit._parent_shas and commit._parent_shas[0]
740            yield commit
741            if not first_parent:
742                break
743            commit = self._commit_map.get(first_parent, Commit(first_parent))
744
745    def __iter__(self) -> "Generator[Commit, None, None]":
746        for c in self.commits:
747            try:
748                if c.author in self.woc.config["ignoredAuthors"]:
749                    continue
750                yield c
751            except KeyError:
752                pass
753
754    @property
755    def projects(self) -> List["Project"]:
756        raise NotImplementedError("Project object does not have a projects method")
757
758    def download_blob(self, blob_sha: str) -> bytes:
759        """
760        Download the blob content from the remote.
761        """
762        try:
763            from urllib.parse import quote_plus
764
765            import requests
766        except ImportError:
767            raise ImportError(
768                "This function requires the requests module. Install it via `pip install requests`"
769            )
770
771        if self._platform_repo[0] == "github.com":
772            project = self._platform_repo[1]
773            _r = requests.get(
774                f"https://api.github.com/repos/{project}/git/blobs/{blob_sha}",
775                allow_redirects=True,
776                headers={"Accept": "application/vnd.github.raw+json"},
777            )
778            _r.raise_for_status()
779            return _r.content
780        elif self._platform_repo[0] == "gitlab.com":
781            if not hasattr(self, "gitlab_project_id"):
782                project = quote_plus(self._platform_repo[1])
783                r = requests.get(f"https://gitlab.com/api/v4/projects/{project}")
784                r.raise_for_status()
785                self.gitlab_project_id = r.json()["id"]
786            _r = requests.get(
787                f"https://gitlab.com/api/v4/projects/{self.gitlab_project_id}/repository/blobs/{blob_sha}/raw",
788                allow_redirects=True,
789            )
790            _r.raise_for_status()
791            return _r.content
792        else:
793            raise NotImplementedError(
794                "The function is not implemented for " + self._platform_repo[0]
795            )
796
797    def save(self, path: str, commit: Optional[Commit] = None):
798        """
799        Save the project files to the disk. Binary blobs are retrieved from the remote.
800
801        :param path: The path to save the files.
802        :param commit: Save the files at this commit. If None, the head or latest commit is used.
803        """
804        if commit is None:
805            try:
806                commit = self.head
807            except ValueError:
808                _logger.warning(
809                    f"No head commit found for {self.key}, using latest commit"
810                )
811                commit = self.latest_commit
812
813        flist = list(commit.tree.traverse())
814        for idx, (f, blob) in enumerate(flist):
815            _logger.debug(f"{idx + 1}/{len(flist)}: {f.path}")
816            _p = os.path.join(path, f.path)
817            os.makedirs(os.path.dirname(_p), exist_ok=True)
818            with open(_p, "wb") as fout:  # do not shadow the File object f
819                try:
820                    fout.write(blob.data.encode())
821                except KeyError:
822                    _logger.info(f"Missing blob {blob.key}")
823                    try:
824                        if self._platform_repo[0] in ("github.com", "gitlab.com"):
825                            fout.write(self.download_blob(blob.hash))
826                    except Exception as e:
827                        _logger.error(f"Failed to download blob {blob.hash}: {e}")
828                except Exception as e:
829                    _logger.error(f"Failed to write blob {blob.hash}: {e}")

Base class for objects indexed by a string key

url: str
608    @property
609    def url(self) -> str:
610        """
611        Get the URL for a given project URI.
612
613        >>> Project('CS340-19_lectures').url
614        'https://github.com/CS340-19/lectures'
615        """
616        platform, body = self._platform_repo
617        URL_PREFIXES = self.woc.config["sites"]
618        if platform in URL_PREFIXES:
619            return f"https://{URL_PREFIXES[platform]}/{body}"
620        return f"https://{platform}/{body}"

Get the URL for a given project URI.

>>> Project('CS340-19_lectures').url
'https://github.com/CS340-19/lectures'
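
A sketch of the key convention: the first '_' separates owner from repository, and a leading site prefix selects the platform (outputs assume the default sites config maps each platform to itself):

    Project('user2589_minicms').url               # 'https://github.com/user2589/minicms'
    Project('gitlab.com_inkscape_inkscape').url   # 'https://gitlab.com/inkscape/inkscape'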
authors: List[Author]
622    @cached_property
623    def authors(self) -> "List[Author]":
624        return [Author(a) for a in self._get_list_values(f"{self._ident}2a")]
commits: List[Commit]
638    @cached_property
639    def commits(self) -> "List[Commit]":
640        return [Commit(c) for c in self._commit_shas]
root_projects: List[RootProject]
642    @cached_property
643    def root_projects(self) -> "List[RootProject]":
644        return [RootProject(p) for p in self._get_list_values(f"{self._ident}2P")]
head: Commit
653    @cached_property
654    def head(self) -> "Commit":
655        """
656        Get the HEAD commit of the repository.
657
658        >>> Project('user2589_minicms').head
659        Commit(f2a7fcdc51450ab03cb364415f14e634fa69b62c)
660        >>> Project('RoseTHERESA_SimpleCMS').head
661        Commit(a47afa002ccfd3e23920f323b172f78c5c970250)
662        """
663        # Sometimes (very rarely) commit dates are wrong, so the latest commit
664        # is not actually the head. The magic below is to account for this
665        parents = set().union(*(c._parent_shas for c in self.commits))
666        heads = [self._commit_map[c] for c in self._commit_set - parents]
667
668        # it is possible that there is more than one head.
669        # E.g. it happens when HEAD is moved manually (git reset)
670        # and continued with a separate chain of commits.
671        # in this case, let's just use the latest one
672        # actually, storing refs would make it much simpler
673        _heads_sorted = sorted(heads, key=lambda c: c.authored_at or _DAY_Z, reverse=True)
674        if len(_heads_sorted) == 0:
675            raise ValueError("No head commit found")
676        return _heads_sorted[0]

Get the HEAD commit of the repository.

>>> Project('user2589_minicms').head
Commit(f2a7fcdc51450ab03cb364415f14e634fa69b62c)
>>> Project('RoseTHERESA_SimpleCMS').head
Commit(a47afa002ccfd3e23920f323b172f78c5c970250)
tail: Commit
678    @cached_property
679    def tail(self) -> "Commit":
680        """
681        Get the first commit by following first parents.
682
683        >>> Project('user2589_minicms').tail
684        Commit(1e971a073f40d74a1e72e07c682e1cba0bae159b)
685        """
686        pts = {c._parent_shas[0] for c in self.commits if c._parent_shas}
687        for c in self.commits:
688            if c.hash in pts and not c._parent_shas:
689                return c

Get the first commit by following first parents.

>>> Project('user2589_minicms').tail
Commit(1e971a073f40d74a1e72e07c682e1cba0bae159b)
earliest_commit: Commit
691    @cached_property
692    def earliest_commit(self) -> "Commit":
693        """Get the earliest commit of the repository"""
694        return min(self.commits, key=lambda c: c.authored_at or _DAY_Z)

Get the earliest commit of the repository

latest_commit: Commit
696    @cached_property
697    def latest_commit(self) -> "Commit":
698        """Get the latest commit of the repository"""
699        return max(self.commits, key=lambda c: c.authored_at or _DAY_Z)

Get the latest commit of the repository

def commits_fp(self) -> Generator[Commit, None, None]:
701    def commits_fp(self) -> Generator["Commit", None, None]:
702        """
703        Get a commit chain by following only the first parent.
704
705        Mimics https://git-scm.com/docs/git-log#git-log---first-parent.
706        Thus, you only get a small subset of the full commit tree.
707
708        >>> p = Project('user2589_minicms')
709        >>> set(c.hash for c in p.commits_fp()).issubset(p._commit_shas)
710        True
711
712        In scenarios where branches are not important, it can save a lot
713        of computing.
714
715        Yields:
716            Commit: commit objects, following first parent only,
717                from the latest to the earliest.
718        """
719        # Simplified version of self.head():
720        #   - slightly less precise,
721        #   - 20% faster
722        #
723        # out of 500 randomly sampled projects, 493 had the same head.
724        # In the remaining 7:
725        #     2 had the same commit chain length,
726        #     3 had one more commit
727        #     1 had two more commits
728        #     1 had three more commits
729        # Execution time:
730        #   simplified version (argmax): ~153 seconds
731        #   self.head(): ~190 seconds
732
733        # at this point we know all commits are in the dataset
734        # (validated in __iter__)
735        commit = self.latest_commit
736
737        while commit:
738            # no try-except needed: an empty list is falsy
739            first_parent = commit._parent_shas and commit._parent_shas[0]
740            yield commit
741            if not first_parent:
742                break
743            commit = self._commit_map.get(first_parent, Commit(first_parent))

Get a commit chain by following only the first parent.

Mimics https://git-scm.com/docs/git-log#git-log---first-parent. Thus, you only get a small subset of the full commit tree.

>>> p = Project('user2589_minicms')
>>> set(c.hash for c in p.commits_fp()).issubset(p._commit_shas)
True

In scenarios where branches are not important, it can save a lot of computing.

Yields: Commit: commit objects, following first parent only, from the latest to the earliest.

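A sketch of measuring the first-parent chain length; commits_fp() yields from the latest commit backwards:

    p = Project('user2589_minicms')
    chain = list(p.commits_fp())
    print(len(chain), chain[0].hash, chain[-1].hash)
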
projects: List[Project]
754    @property
755    def projects(self) -> List["Project"]:
756        raise NotImplementedError("Project object does not have a projects method")
def download_blob(self, blob_sha: str) -> bytes:
758    def download_blob(self, blob_sha: str) -> bytes:
759        """
760        Download the blob content from the remote.
761        """
762        try:
763            from urllib.parse import quote_plus
764
765            import requests
766        except ImportError:
767            raise ImportError(
768                "This function requires the requests module. Install it via `pip install requests`"
769            )
770
771        if self._platform_repo[0] == "github.com":
772            project = self._platform_repo[1]
773            _r = requests.get(
774                f"https://api.github.com/repos/{project}/git/blobs/{blob_sha}",
775                allow_redirects=True,
776                headers={"Accept": "application/vnd.github.raw+json"},
777            )
778            _r.raise_for_status()
779            return _r.content
780        elif self._platform_repo[0] == "gitlab.com":
781            if not hasattr(self, "gitlab_project_id"):
782                project = quote_plus(self._platform_repo[1])
783                r = requests.get(f"https://gitlab.com/api/v4/projects/{project}")
784                r.raise_for_status()
785                self.gitlab_project_id = r.json()["id"]
786            _r = requests.get(
787                f"https://gitlab.com/api/v4/projects/{self.gitlab_project_id}/repository/blobs/{blob_sha}/raw",
788                allow_redirects=True,
789            )
790            _r.raise_for_status()
791            return _r.content
792        else:
793            raise NotImplementedError(
794                "The function is not implemented for " + self._platform_repo[0]
795            )

Download the blob content from the remote.
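
A sketch (the blob sha is hypothetical; unauthenticated API calls are rate-limited by GitHub and GitLab):

    p = Project('user2589_minicms')
    raw = p.download_blob('7e2a34e2ec9bfdccfa01fff7762592d9458866eb')
    # raw holds the blob payload as bytes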

def save(self, path: str, commit: Optional[Commit] = None):
797    def save(self, path: str, commit: Optional[Commit] = None):
798        """
799        Save the project files to the disk. Binary blobs are retrieved from the remote.
800
801        :param path: The path to save the files.
802        :param commit: Save the files at this commit. If None, the head or latest commit is used.
803        """
804        if commit is None:
805            try:
806                commit = self.head
807            except ValueError:
808                _logger.warning(
809                    f"No head commit found for {self.key}, using latest commit"
810                )
811                commit = self.latest_commit
812
813        flist = list(commit.tree.traverse())
814        for idx, (f, blob) in enumerate(flist):
815            _logger.debug(f"{idx + 1}/{len(flist)}: {f.path}")
816            _p = os.path.join(path, f.path)
817            os.makedirs(os.path.dirname(_p), exist_ok=True)
818            with open(_p, "wb") as fout:  # do not shadow the File object f
819                try:
820                    fout.write(blob.data.encode())
821                except KeyError:
822                    _logger.info(f"Missing blob {blob.key}")
823                    try:
824                        if self._platform_repo[0] in ("github.com", "gitlab.com"):
825                            fout.write(self.download_blob(blob.hash))
826                    except Exception as e:
827                        _logger.error(f"Failed to download blob {blob.hash}: {e}")
828                except Exception as e:
829                    _logger.error(f"Failed to write blob {blob.hash}: {e}")

Save the project files to the disk. Binary blobs are retrieved from the remote.

Parameters
  • path: The path to save the files.
  • commit: Save the files at this commit. If None, the head or latest commit is used.
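
A sketch of materializing a snapshot on disk (the output directory is a placeholder):

    p = Project('user2589_minicms')
    p.save('/tmp/minicms')  # writes the tree of the head (or latest) commit
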
class RootProject(Project):
832class RootProject(Project):
833    _ident = "P"
834
835    @cached_property
836    def unique_authors(self) -> "List[Author]":
837        return [UniqueAuthor(a) for a in self._get_list_values(f"{self._ident}2A")]
838
839    @cached_property
840    def commits(self) -> "List[Commit]":
841        return [Commit(c) for c in self._get_list_values(f"{self._ident}2C")]
842
843    @cached_property
844    def projects(self) -> "List[Project]":
845        return [Project(p) for p in self._get_list_values(f"{self._ident}2p")]
846
847    @property
848    def root_projects(self) -> List["RootProject"]:
849        raise NotImplementedError("RootProject object does not have a root_projects method")

Base class for objects indexed by a string key

unique_authors: List[Author]
835    @cached_property
836    def unique_authors(self) -> "List[Author]":
837        return [UniqueAuthor(a) for a in self._get_list_values(f"{self._ident}2A")]
commits: List[Commit]
839    @cached_property
840    def commits(self) -> "List[Commit]":
841        return [Commit(c) for c in self._get_list_values(f"{self._ident}2C")]
projects: List[Project]
843    @cached_property
844    def projects(self) -> "List[Project]":
845        return [Project(p) for p in self._get_list_values(f"{self._ident}2p")]
root_projects: List[RootProject]
847    @property
848    def root_projects(self) -> List["RootProject"]:
849        raise NotImplementedError("RootProject object does not have a root_projects method")
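
A sketch of moving between the two levels of the hierarchy (the key is hypothetical; the P2p map links a root project to its member projects):

    rp = RootProject('user2589_minicms')
    for p in rp.projects:
        print(p.url)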