Source code for pychron.git_archive.repo_manager

# ===============================================================================
# Copyright 2013 Jake Ross
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ===============================================================================

# ============= enthought library imports =======================
import hashlib
import os
import shutil
import subprocess
import sys
import time
from datetime import datetime

from git import Repo
from git.exc import GitCommandError
from traits.api import Any, Str, List, Event

from pychron.core.helpers.filetools import fileiter
from pychron.core.progress import open_progress
from pychron.envisage.view_util import open_view
from pychron.git_archive.diff_view import DiffView, DiffModel
from pychron.git_archive.git_objects import GitSha
from pychron.git_archive.merge_view import MergeModel, MergeView
from pychron.git_archive.utils import get_head_commit, ahead_behind, from_gitlog
from pychron.git_archive.views import NewBranchView
from pychron.loggable import Loggable
from pychron.pychron_constants import DATE_FORMAT


def get_repository_branch(path):
    r = Repo(path)
    b = r.active_branch
    return b.name


def grep(arg, name):
    process = subprocess.Popen(['grep', '-lr', arg, name], stdout=subprocess.PIPE)
    stdout, stderr = process.communicate()
    return stdout, stderr


def format_date(d):
    return time.strftime("%m/%d/%Y %H:%M", time.gmtime(d))


def isoformat_date(d):
    if isinstance(d, (float, int)):
        d = datetime.fromtimestamp(d)

    return d.strftime('%Y-%m-%d %H:%M:%S')
    # return time.mktime(time.gmtime(d))


class StashCTX(object):
    def __init__(self, repo):
        self._repo = repo

    def __enter__(self):
        self._repo.git.stash()

    def __exit__(self, *args, **kw):
        try:
            self._repo.git.stash('pop')
        except GitCommandError:
            pass


[docs]class GitRepoManager(Loggable): """ manage a local git repository """ _repo = Any # root=Directory path = Str selected = Any selected_branch = Str selected_path_commits = List selected_commits = List refresh_commits_table_needed = Event path_dirty = Event remote = Str def set_name(self, p): self.name = '{}<GitRepo>'.format(os.path.basename(p))
[docs] def open_repo(self, name, root=None): """ name: name of repo root: root directory to create new repo """ if root is None: p = name else: p = os.path.join(root, name) self.path = p self.set_name(p) if os.path.isdir(p): self.init_repo(p) return True else: os.mkdir(p) repo = Repo.init(p) self.debug('created new repo {}'.format(p)) self._repo = repo return False
[docs] def init_repo(self, path): """ path: absolute path to repo return True if git repo exists """ if os.path.isdir(path): g = os.path.join(path, '.git') if os.path.isdir(g): self._repo = Repo(path) self.set_name(path) return True else: self.debug('{} is not a valid repo. Initializing now'.format(path)) self._repo = Repo.init(path) self.set_name(path)
def delete_local_commits(self, remote='origin', branch=None): if branch is None: branch = self._repo.active_branch.name self._repo.git.reset('--hard', '{}/{}'.format(remote, branch)) def delete_commits(self, hexsha, remote='origin', branch=None): if branch is None: branch = self._repo.active_branch.name self._repo.git.reset('--hard', hexsha) self._repo.git.push(remote, branch, '--force') def add_paths(self, apaths): if not isinstance(apaths, (list, tuple)): apaths = (apaths,) changes = self.get_local_changes(change_type=('A', 'R', 'M')) changes = [os.path.join(self.path, c) for c in changes] if changes: self.debug('-------- local changes ---------') for c in changes: self.debug(c) deletes = self.get_local_changes(change_type=('D',)) if deletes: self.debug('-------- deletes ---------') for c in deletes: self.debug(c) untracked = self.untracked_files() if untracked: self.debug('-------- untracked paths --------') for t in untracked: self.debug(t) changes.extend(untracked) self.debug('add paths {}'.format(apaths)) ps = [p for p in apaths if p in changes] self.debug('changed paths {}'.format(ps)) changed = bool(ps) if ps: for p in ps: self.debug('adding to index: {}'.format(os.path.relpath(p, self.path))) self.index.add(ps) ps = [p for p in apaths if p in deletes] self.debug('delete paths {}'.format(ps)) delete_changed = bool(ps) if ps: for p in ps: self.debug('removing from index: {}'.format(os.path.relpath(p, self.path))) self.index.remove(ps, working_tree=True) return changed or delete_changed def add_ignore(self, *args): ignores = [] p = os.path.join(self.path, '.gitignore') if os.path.isfile(p): with open(p, 'r') as rfile: ignores = [line.strip() for line in rfile] args = [a for a in args if a not in ignores] if args: with open(p, 'a') as afile: for a in args: afile.write('{}\n'.format(a)) self.add(p, commit=False)
[docs] def get_modification_date(self, path): """ "Fri May 18 11:13:57 2018 -0600" :param path: :return: """ d = self.cmd('log', '-1', '--format="%ad"', '--date=format:{}'.format(DATE_FORMAT), '--', path) if d: d = d[1:-1] return d
def out_of_date(self, branchname=None): repo = self._repo if branchname is None: branchname = repo.active_branch.name pd = open_progress(2) origin = repo.remotes.origin pd.change_message('Fetching {} {}'.format(origin, branchname)) repo.git.fetch(origin, branchname) pd.change_message('Complete') # try: # oref = origin.refs[branchname] # remote_commit = oref.commit # except IndexError: # remote_commit = None # # branch = getattr(repo.heads, branchname) # local_commit = branch.commit local_commit, remote_commit = self._get_local_remote_commit(branchname) self.debug('out of date {} {}'.format(local_commit, remote_commit)) return local_commit != remote_commit def _get_local_remote_commit(self, branchname=None): repo = self._repo origin = repo.remotes.origin try: oref = origin.refs[branchname] remote_commit = oref.commit except IndexError: remote_commit = None if branchname is None: branch = repo.active_branch.name else: try: branch = repo.heads[branchname] except AttributeError: return None, None local_commit = branch.commit return local_commit, remote_commit @classmethod def clone_from(cls, url, path): repo = cls() repo.clone(url, path) return repo # # progress = open_progress(100) # # # # def func(op_code, cur_count, max_count=None, message=''): # # if max_count: # # progress.max = int(max_count) + 2 # # if message: # # message = 'Cloning repository {} -- {}'.format(url, message[2:]) # # progress.change_message(message, auto_increment=False) # # progress.update(int(cur_count)) # # # # if op_code == 66: # # progress.close() # # rprogress = CallableRemoteProgress(func) # rprogress = None # try: # Repo.clone_from(url, path, progress=rprogress) # except GitCommandError as e: # print(e) # shutil.rmtree(path) # # def foo(): # # try: # # Repo.clone_from(url, path, progress=rprogress) # # except GitCommandError: # # shutil.rmtree(path) # # # # evt.set() # # # t = Thread(target=foo) # # t.start() # # period = 0.1 # # while not evt.is_set(): # # st = time.time() # # # v = prog.get_value() # # # if v == n - 2: # # # prog.increase_max(50) # # # n += 50 # # # # # # prog.increment() # # time.sleep(max(0, period - time.time() + st)) # # prog.close() def clone(self, url, path): try: self._repo = Repo.clone_from(url, path) except GitCommandError as e: self.warning_dialog('Cloning error: {}, url={}, path={}'.format(e, url, path))
[docs] def unpack_blob(self, hexsha, p): """ p: str. should be absolute path """ repo = self._repo tree = repo.commit(hexsha).tree # blob = next((bi for ti in tree.trees # for bi in ti.blobs # if bi.abspath == p), None) blob = None for ts in ((tree,), tree.trees): for ti in ts: for bi in ti.blobs: # print bi.abspath, p if bi.abspath == p: blob = bi break else: print('failed unpacking', p) return blob.data_stream.read() if blob else ''
def shell(self, cmd, *args): repo = self._repo func = getattr(repo.git, cmd) return func(*args) def truncate_repo(self, date='1 month'): repo = self._repo name = os.path.basename(self.path) backup = '.{}'.format(name) repo.git.clone('--mirror', ''.format(name), './{}'.format(backup)) logs = repo.git.log('--pretty=%H', '-after "{}"'.format(date)) logs = reversed(logs.split('\n')) sha = next(logs) gpath = os.path.join(self.path, '.git', 'info', 'grafts') with open(gpath, 'w') as wfile: wfile.write(sha) repo.git.filter_branch('--tag-name-filter', 'cat', '--', '--all') repo.git.gc('--prune=now') def commits_iter(self, p, keys=None, limit='-'): repo = self._repo p = os.path.join(repo.working_tree_dir, p) p = p.replace(' ', '\ ') hx = repo.git.log('--pretty=%H', '--follow', '-{}'.format(limit), '--', p).split('\n') def func(hi): commit = repo.rev_parse(hi) r = [hi, ] if keys: r.extend([getattr(commit, ki) for ki in keys]) return r return (func(ci) for ci in hx) def diff(self, a, b): repo = self._repo return repo.git.diff(a, b, ) def status(self): return self._git_command(self._repo.git.status, 'status') def report_local_changes(self): self.debug('Local Changes to {}'.format(self.path)) for p in self.get_local_changes(): self.debug('\t{}'.format(p)) def commit_dialog(self): from pychron.git_archive.commit_dialog import CommitDialog ps = self.get_local_changes() cd = CommitDialog(ps) info = cd.edit_traits() if info.result: index = self.index index.add([mp.path for mp in cd.valid_paths()]) self.commit(cd.commit_message) return True def get_local_changes(self, change_type=('M',)): repo = self._repo diff = repo.index.diff(None) return [di.a_blob.abspath for change_type in change_type for di in diff.iter_change_type(change_type)] # diff_str = repo.git.diff('HEAD', '--full-index') # diff_str = StringIO(diff_str) # diff_str.seek(0) # # class ProcessWrapper: # stderr = None # stdout = None # # def __init__(self, f): # self._f = f # # def wait(self, *args, **kw): # pass # # def read(self): # return self._f.read() # # proc = ProcessWrapper(diff_str) # # diff = Diff._index_from_patch_format(repo, proc) # root = self.path # # # # for diff_added in hcommit.diff('HEAD~1').iter_change_type('A'): # print(diff_added) # diff = hcommit.diff() # diff = repo.index.diff(repo.head.commit) # return [os.path.relpath(di.a_blob.abspath, root) for di in diff.iter_change_type('M')] # patches = map(str.strip, diff_str.split('diff --git')) # patches = ['\n'.join(p.split('\n')[2:]) for p in patches[1:]] # # diff_str = StringIO(diff_str) # diff_str.seek(0) # index = Diff._index_from_patch_format(repo, diff_str) # # return index, patches # def get_head_object(self): return get_head_commit(self._repo) def get_head(self, commit=True, hexsha=True): head = self._repo if commit: head = head.commit() if hexsha: head = head.hexsha return head # return self._repo.head.commit.hexsha def cmd(self, cmd, *args): return getattr(self._repo.git, cmd)(*args) def is_dirty(self): return self._repo.is_dirty() def untracked_files(self): lines = self._repo.git.status(porcelain=True, untracked_files=True) # Untracked files preffix in porcelain mode prefix = "?? " untracked_files = list() iswindows = sys.platform == 'win32' for line in lines.split('\n'): if not line.startswith(prefix): continue filename = line[len(prefix):].rstrip('\n') # Special characters are escaped if filename[0] == filename[-1] == '"': filename = filename[1:-1].decode('string_escape') if iswindows: filename = filename.replace('/', '\\') untracked_files.append(os.path.join(self.path, filename)) # finalize_process(proc) return untracked_files def has_staged(self): return self._repo.git.diff('HEAD', '--name-only') # return self._repo.is_dirty() def has_unpushed_commits(self, remote='origin', branch='master'): # return self._repo.git.log('--not', '--remotes', '--oneline') return self._repo.git.log('{}/{}..HEAD'.format(remote, branch), '--oneline') def add_unstaged(self, root=None, add_all=False, extension=None, use_diff=False): if root is None: root = self.path index = self.index def func(ps, extension): if extension: if not isinstance(extension, tuple): extension = (extension,) ps = [pp for pp in ps if os.path.splitext(pp)[1] in extension] if ps: self.debug('adding to index {}'.format(ps)) index.add(ps) if use_diff: pass # try: # ps = [diff.a_blob.path for diff in index.diff(None)] # func(ps, extension) # except IOError,e: # print 'exception', e elif add_all: self._repo.git.add('.') else: for r, ds, fs in os.walk(root): ds[:] = [d for d in ds if d[0] != '.'] ps = [os.path.join(r, fi) for fi in fs] func(ps, extension) def update_gitignore(self, *args): p = os.path.join(self.path, '.gitignore') # mode = 'a' if os.path.isfile(p) else 'w' args = list(args) if os.path.isfile(p): with open(p, 'r') as rfile: for line in fileiter(rfile, strip=True): for i, ai in enumerate(args): if line == ai: args.pop(i) if args: with open(p, 'a') as wfile: for ai in args: wfile.write('{}\n'.format(ai)) self._add_to_repo(p, msg='updated .gitignore') def get_commit(self, hexsha): repo = self._repo return repo.commit(hexsha) def tag_branch(self, tagname): repo = self._repo repo.create_tag(tagname) def get_current_branch(self): repo = self._repo return repo.active_branch.name def checkout_branch(self, name, inform=True): repo = self._repo branch = getattr(repo.heads, name) try: branch.checkout() self.selected_branch = name self._load_branch_history() if inform: self.information_dialog('Repository now on branch "{}"'.format(name)) except BaseException as e: self.warning_dialog('There was an issue trying to checkout branch "{}"'.format(name)) raise e def delete_branch(self, name): self._repo.delete_head(name) def get_branch(self, name): return getattr(self._repo.heads, name) def create_branch(self, name=None, commit='HEAD', inform=True): repo = self._repo if name is None: nb = NewBranchView(branches=repo.branches) info = nb.edit_traits() if info.result: name = nb.name else: return if name not in repo.branches: branch = repo.create_head(name, commit=commit) branch.checkout() if inform: self.information_dialog('Repository now on branch "{}"'.format(name)) return name def create_remote(self, url, name='origin', force=False): repo = self._repo if repo: self.debug('setting remote {} {}'.format(name, url)) # only create remote if doesnt exist if not hasattr(repo.remotes, name): self.debug('create remote {}'.format(name, url)) repo.create_remote(name, url) elif force: repo.delete_remote(name) repo.create_remote(name, url) def delete_remote(self, name='origin'): repo = self._repo if repo: if hasattr(repo.remotes, name): repo.delete_remote(name) def get_branch_names(self): return [b.name for b in self._repo.branches]
[docs] def pull(self, branch='master', remote='origin', handled=True, use_progress=True): """ fetch and merge """ self.debug('pulling {} from {}'.format(branch, remote)) repo = self._repo try: remote = self._get_remote(remote) except AttributeError as e: print('repo man pull', e) return if remote: self.debug('pulling from url: {}'.format(remote.url)) if use_progress: prog = open_progress(3, show_percent=False, title='Pull Repository {}'.format(self.name), close_at_end=False) prog.change_message('Fetching branch:"{}" from "{}"'.format(branch, remote)) try: self.fetch(remote) except GitCommandError as e: self.debug(e) if not handled: raise e self.debug('fetch complete') # if use_progress: # for i in range(100): # prog.change_message('Merging {}'.format(i)) # time.sleep(1) try: repo.git.merge('FETCH_HEAD') except GitCommandError: self.smart_pull(branch=branch, remote=remote) # self._git_command(lambda: repo.git.merge('FETCH_HEAD'), 'merge') if use_progress: prog.close() self.debug('pull complete')
def has_remote(self, remote='origin'): return bool(self._get_remote(remote)) def push(self, branch='master', remote=None, inform=False): if remote is None: remote = 'origin' repo = self._repo rr = self._get_remote(remote) if rr: self._git_command(lambda: repo.git.push(remote, branch), tag='GitRepoManager.push') if inform: self.information_dialog('{} push complete'.format(self.name)) else: self.warning('No remote called "{}"'.format(remote)) def _git_command(self, func, tag): try: return func() except GitCommandError as e: self.warning('Git command failed. {}, {}'.format(e, tag)) def rebase(self, onto_branch='master'): if self._repo: repo = self._repo branch = self.get_current_branch() self.checkout_branch(onto_branch) self.pull() repo.git.rebase(onto_branch, branch) def smart_pull(self, branch='master', remote='origin', quiet=True, accept_our=False, accept_their=False): try: ahead, behind = self.ahead_behind(remote) except GitCommandError as e: self.debug('Smart pull error: {}'.format(e)) return self.debug('Smart pull ahead: {} behind: {}'.format(ahead, behind)) repo = self._repo if behind: if ahead: if not quiet: if not self.confirmation_dialog('You are {} behind and {} commits ahead. ' 'There are potential conflicts that you will have to resolve.' 'Would you like to Continue?'.format(behind, ahead)): return # potentially conflicts with StashCTX(repo): # do merge try: repo.git.rebase('--preserve-merges', '{}/{}'.format(remote, branch)) except GitCommandError: if self.confirmation_dialog('There appears to be a problem with {}.' '\n\nWould you like to accept the master copy'.format(self.name)): try: repo.git.rebase('--abort') except GitCommandError: pass repo.git.pull('-X', 'theirs', '--commit', '--no-edit') return True else: return # self._git_command(lambda: repo.git.rebase('--preserve-merges', # '{}/{}'.format(remote, branch)), # 'GitRepoManager.smart_pull/ahead') # try: # repo.git.merge('FETCH_HEAD') # except BaseException: # pass # get conflicted files out, err = grep('<<<<<<<', self.path) conflict_paths = [os.path.relpath(x, self.path) for x in out.splitlines()] self.debug('conflict_paths: {}'.format(conflict_paths)) if conflict_paths: mm = MergeModel(conflict_paths, branch=branch, remote=remote, repo=self) if accept_our: mm.accept_our() elif accept_their: mm.accept_their() else: mv = MergeView(model=mm) mv.edit_traits() else: if not quiet: self.information_dialog('There were no conflicts identified') else: self.debug('merging {} commits'.format(behind)) self._git_command(lambda: repo.git.merge('FETCH_HEAD'), 'GitRepoManager.smart_pull/!ahead') # repo.git.merge('FETCH_HEAD') else: self.debug('Up-to-date with {}'.format(remote)) if not quiet: self.information_dialog('Up-to-date with {}'.format(remote)) return True def fetch(self, remote='origin'): if self._repo: return self._git_command(lambda: self._repo.git.fetch(remote), 'GitRepoManager.fetch') # return self._repo.git.fetch(remote) def ahead_behind(self, remote='origin'): self.debug('ahead behind') repo = self._repo ahead, behind = ahead_behind(repo, remote) return ahead, behind def merge(self, src, dest): repo = self._repo dest = getattr(repo.branches, dest) dest.checkout() src = getattr(repo.branches, src) # repo.git.merge(src.commit) self._git_command(lambda: repo.git.merge(src.commit), 'GitRepoManager.merge') def commit(self, msg): self.debug('commit message={}'.format(msg)) index = self.index if index: index.commit(msg) def add(self, p, msg=None, msg_prefix=None, verbose=True, **kw): repo = self._repo # try: # n = len(repo.untracked_files) # except IOError: # n = 0 # try: # if not repo.is_dirty() and not n: # return # except OSError: # pass bp = os.path.basename(p) dest = os.path.join(repo.working_dir, p) dest_exists = os.path.isfile(dest) if msg_prefix is None: msg_prefix = 'modified' if dest_exists else 'added' if not dest_exists: self.debug('copying to destination.{}>>{}'.format(p, dest)) shutil.copyfile(p, dest) if msg is None: msg = '{}'.format(bp) msg = '{} - {}'.format(msg_prefix, msg) if verbose: self.debug('add to repo msg={}'.format(msg)) self._add_to_repo(dest, msg, **kw) def get_log(self, branch, *args): if branch is None: branch = self._repo.active_branch # repo = self._repo # l = repo.active_branch.log(*args) l = self.cmd('log', branch, '--oneline', *args) return l.split('\n') def get_commits_from_log(self, greps=None): repo = self._repo args = [repo.active_branch.name, '--remove-empty', '--simplify-merges'] if greps: greps = '\|'.join(greps) args.append('--grep=^{}'.format(greps)) args.append('--pretty=%H|%cn|%ce|%ct|%s') txt = self.cmd('log', *args) cs = [] if txt: cs = [from_gitlog(l.strip()) for l in txt.split('\n')] return cs def get_active_branch(self): return self._repo.active_branch.name def get_sha(self, path=None): sha = '' if path: l = self.cmd('ls-tree', 'HEAD', path) try: mode, kind, sha_name = l.split(' ') sha, name = sha_name.split('\t') except ValueError: pass return sha def add_tag(self, name, message, hexsha=None): args = ('-a', name, '-m', message) if hexsha: args = args + (hexsha,) self.cmd('tag', *args) # action handlers def diff_selected(self): if self._validate_diff(): if len(self.selected_commits) == 2: l, r = self.selected_commits dv = self._diff_view_factory(l, r) open_view(dv) def revert_to_selected(self): # check for uncommitted changes # warn user the uncommitted changes will be lost if revert now commit = self.selected_commits[-1] self.revert(commit.hexsha, self.selected) def revert(self, hexsha, path): self._repo.git.checkout(hexsha, path) self.path_dirty = path self._set_active_commit() def load_file_history(self, p): repo = self._repo try: hexshas = repo.git.log('--pretty=%H', '--follow', '--', p).split('\n') self.selected_path_commits = self._parse_commits(hexshas, p) self._set_active_commit() except GitCommandError: self.selected_path_commits = [] # private def _validate_diff(self): return True def _diff_view_factory(self, a, b): # d = self.diff(a.hexsha, b.hexsha) if not a.blob: a.blob = self.unpack_blob(a.hexsha, a.name) if not b.blob: b.blob = self.unpack_blob(b.hexsha, b.name) model = DiffModel(left_text=b.blob, right_text=a.blob) dv = DiffView(model=model) return dv def _add_to_repo(self, p, msg, commit=True): index = self.index if index: if not isinstance(p, list): p = [p] try: index.add(p) except IOError as e: self.warning('Failed to add file. Error:"{}"'.format(e)) # an IOError has been caused in the past by "'...index.lock' could not be obtained" os.remove(os.path.join(self.path, '.git', 'index.lock')) try: self.warning('Retry after "Failed to add file"'.format(e)) index.add(p) except IOError as e: self.warning('Retry failed. Error:"{}"'.format(e)) return if commit: index.commit(msg) def _get_remote(self, remote): repo = self._repo try: return getattr(repo.remotes, remote) except AttributeError: pass def _get_branch_history(self): repo = self._repo hexshas = repo.git.log('--pretty=%H').split('\n') return hexshas def _load_branch_history(self): hexshas = self._get_branch_history() self.commits = self._parse_commits(hexshas) def _parse_commits(self, hexshas, p=''): def factory(ci): repo = self._repo obj = repo.rev_parse(ci) cx = GitSha(message=obj.message, hexsha=obj.hexsha, name=p, date=obj.committed_datetime) # date=format_date(obj.committed_date)) return cx return [factory(ci) for ci in hexshas] def _set_active_commit(self): p = self.selected with open(p, 'r') as rfile: chexsha = hashlib.sha1(rfile.read()).hexdigest() for c in self.selected_path_commits: blob = self.unpack_blob(c.hexsha, p) c.active = chexsha == hashlib.sha1(blob).hexdigest() if blob else False self.refresh_commits_table_needed = True # handlers def _selected_fired(self, new): if new: self._selected_hook(new) self.load_file_history(new) def _selected_hook(self, new): pass def _remote_changed(self, new): if new: self.delete_remote() r = 'https://github.com/{}'.format(new) self.create_remote(r) @property def index(self): return self._repo.index @property def active_repo(self): return self._repo
if __name__ == '__main__': repo = GitRepoManager() repo.open_repo('/Users/ross/Sandbox/mergetest/blocal') repo.smart_pull() # rp = GitRepoManager() # rp.init_repo('/Users/ross/Pychrondata_dev/scripts') # rp.commit_dialog() # ============= EOF ============================================= # repo manager protocol # def get_local_changes(self, repo=None): # repo = self._get_repo(repo) # diff_str = repo.git.diff('--full-index') # patches = map(str.strip, diff_str.split('diff --git')) # patches = ['\n'.join(p.split('\n')[2:]) for p in patches[1:]] # # diff_str = StringIO(diff_str) # diff_str.seek(0) # index = Diff._index_from_patch_format(repo, diff_str) # # return index, patches # def is_dirty(self, repo=None): # repo = self._get_repo(repo) # return repo.is_dirty() # def get_untracked(self): # return self._repo.untracked_files # def _add_repo(self, root): # existed=True # if not os.path.isdir(root): # os.mkdir(root) # existed=False # # gitdir=os.path.join(root, '.git') # if not os.path.isdir(gitdir): # repo = Repo.init(root) # existed = False # else: # repo = Repo(root) # # return repo, existed # def add_repo(self, localpath): # """ # add a blank repo at ``localpath`` # """ # repo, existed=self._add_repo(localpath) # self._repo=repo # self.root=localpath # return existed