git_deploy: improved moved origin handling

Query the detected url and make sure it redirects to the
expected url.  This closes a security vulnerability
where a remote could be changed to an arbitrary repo's url.

The `moved_origin` option is no longer necessary, however it
is currently used as an additional check.  In the future it will be
deprecated.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-07-29 07:22:35 -04:00
parent 4b25b04c4f
commit a776f1f6dc
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
1 changed files with 51 additions and 15 deletions

View File

@ -26,6 +26,7 @@ if TYPE_CHECKING:
from confighelper import ConfigHelper
from components import shell_command
from .update_manager import CommandHelper
from ..http_client import HttpClient
class GitDeploy(AppDeploy):
def __init__(self, config: ConfigHelper, cmd_helper: CommandHelper) -> None:
@ -331,21 +332,8 @@ class GitRepo:
# Fetch the upstream url. If the repo has been moved,
# set the new url
self.upstream_url = await self.remote(f"get-url {self.git_remote}")
if self.moved_origin_url is not None:
origin = self.upstream_url.lower().strip()
if not origin.endswith(".git"):
origin += ".git"
moved_origin = self.moved_origin_url.lower().strip()
if not moved_origin.endswith(".git"):
moved_origin += ".git"
if origin == moved_origin:
logging.info(
f"Git Repo {self.alias}: Moved Repo Detected, Moving "
f"from {self.upstream_url} to {self.origin_url}")
need_fetch = True
await self.remote(
f"set-url {self.git_remote} {self.origin_url}")
self.upstream_url = self.origin_url
if await self._check_moved_origin():
need_fetch = True
if need_fetch:
await self.fetch()
@ -409,6 +397,54 @@ class GitRepo:
self.init_evt.set()
self.init_evt = None
async def _check_moved_origin(self) -> bool:
detected_origin = self.upstream_url.lower().strip()
if not detected_origin.endswith(".git"):
detected_origin += ".git"
if (
self.cmd_helper.is_debug_enabled() or
not detected_origin.startswith("http") or
detected_origin == self.origin_url.lower()
):
# Skip the moved origin check if:
# Repo Debug is enabled
# The detected origin url is not http(s)
# The detected origin matches the expected origin url
return False
moved = False
client: HttpClient = self.server.lookup_component("http_client")
check_url = detected_origin[:-4]
logging.info(
f"Git repo {self.alias}: Performing moved origin check - {check_url}"
)
resp = await client.get(check_url, enable_cache=False)
if not resp.has_error():
final_url = resp.final_url.lower()
if not final_url.endswith(".git"):
final_url += ".git"
logging.debug(f"Git repo {self.alias}: Resolved url - {final_url}")
if final_url == self.origin_url.lower():
logging.info(
f"Git Repo {self.alias}: Moved Repo Detected, Moving "
f"from {self.upstream_url} to {self.origin_url}")
moved = True
await self.remote(
f"set-url {self.git_remote} {self.origin_url}")
self.upstream_url = self.origin_url
if self.moved_origin_url is not None:
moved_origin = self.moved_origin_url.lower().strip()
if not moved_origin.endswith(".git"):
moved_origin += ".git"
if moved_origin != detected_origin:
self.server.add_warning(
f"Git Repo {self.alias}: Origin URL does not "
"not match configured 'moved_origin'option. "
f"Expected: {detected_origin}"
)
else:
logging.debug(f"Move Request Failed: {resp.error}")
return moved
async def _get_dev_versions(self, current_version: str) -> None:
self.upstream_commit = await self.rev_parse(
f"{self.git_remote}/{self.git_branch}")