Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 127 additions & 81 deletions scripts/bootstrap_repositories.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@
import yaml
from fastapi_sqla import open_async_session
from sqlalchemy import update
from syncer import sync

from alws import models
from alws.crud import platform as pl_crud
from alws.crud import repository as repo_crud
from alws.dependencies import get_async_db_key
from alws.schemas import platform_schema, remote_schema, repository_schema
from alws.utils import pulp_client as pulp_client_module
from alws.utils.fastapi_sqla_setup import setup_all
from alws.utils.pulp_client import PulpClient
from scripts.bootstrap_permissions import ensure_system_user_exists

REPO_CACHE = {}
BOOTSTRAP_CONCURRENCY = int(os.environ.get("BOOTSTRAP_CONCURRENCY", "10"))


def parse_args():
Expand Down Expand Up @@ -234,6 +235,101 @@ async def add_repositories_to_platform(
)


async def populate_repo_cache(
pulp_client: PulpClient, repositories_data: list
):
async def fetch(repo_info):
repo_name = f'{repo_info["name"]}-{repo_info["arch"]}'
repo = await pulp_client.get_rpm_repository(repo_name)
if not repo:
return
distro = await pulp_client.get_rpm_distro(repo_name)
if distro:
REPO_CACHE[repo_name] = (distro["base_url"], repo["pulp_href"])

sem = asyncio.Semaphore(BOOTSTRAP_CONCURRENCY)

async def bounded(repo_info):
async with sem:
await fetch(repo_info)

await asyncio.gather(*(bounded(r) for r in repositories_data))


async def process_repository(
pulp_client: PulpClient,
repo_info: dict,
no_remotes: bool,
no_sync: bool,
logger: logging.Logger,
):
"""Process a single repo: create repo + remote, return (id, sync_info)."""
logger.info("Creating repository from the following data: %s", str(repo_info))
repo_name = f'{repo_info["name"]}-{repo_info["arch"]}'
is_production = repo_info.get("production", False)
repo_sync_policy = repo_info.pop("repository_sync_policy", None)
remote_sync_policy = repo_info.pop("remote_sync_policy", None)

repository = await get_repository(
pulp_client, repo_info, repo_name, is_production, logger
)
logger.debug("Repository instance: %s", repository)

if no_remotes:
logger.warning("Not creating a remote for repository %s", repository)
return repository.id, None
if not is_production:
logger.info(
"Repository %s is not marked as production and "
"does not need remote setup",
repository,
)
return repository.id, None

remote = await get_remote(repo_info, remote_sync_policy)
pulp_remote = await pulp_client.get_rpm_remote(
f'{repo_info["name"]}-{repo_info["arch"]}-{remote_sync_policy}',
)
if pulp_remote['pulp_href'] != remote.pulp_href:
remote = await update_remote(
remote_id=remote.id,
remote_data={
'name': remote.name,
'pulp_href': pulp_remote['pulp_href'],
'arch': remote.arch,
'url': remote.url,
},
)
if no_sync:
logger.info("Synchronization from remote is disabled, skipping")
return repository.id, None

logger.info("Appending %s to sync from %s...", repository, remote)
return repository.id, {
'repo_href': repository.pulp_href,
'remote_href': remote.pulp_href,
'sync_policy': repo_sync_policy,
}


async def process_repositories(
pulp_client: PulpClient,
repositories_data: list,
no_remotes: bool,
no_sync: bool,
logger: logging.Logger,
):
sem = asyncio.Semaphore(BOOTSTRAP_CONCURRENCY)

async def bounded(repo_info):
async with sem:
return await process_repository(
pulp_client, repo_info, no_remotes, no_sync, logger
)

return await asyncio.gather(*(bounded(r) for r in repositories_data))


async def sync_repositories(repo_sync_list: list, pulp_client: PulpClient):
sync_tasks = []
publish_tasks = []
Expand Down Expand Up @@ -271,7 +367,7 @@ async def publish_repo(repo_):
await asyncio.gather(*publish_tasks)


def main():
async def main_async():
pulp_host = os.environ["PULP_HOST"]
pulp_user = os.environ["PULP_USER"]
pulp_password = os.environ["PULP_PASSWORD"]
Expand All @@ -286,19 +382,22 @@ def main():
loader = yaml.Loader(f)
platforms_data = loader.get_data()

# Rebind PULP_SEMAPHORE to the current event loop.
pulp_client_module.PULP_SEMAPHORE = asyncio.Semaphore(BOOTSTRAP_CONCURRENCY)

pulp_client = PulpClient(pulp_host, pulp_user, pulp_password)

sync(setup_all())
await setup_all()

for platform_data in platforms_data:
if args.only_update:
sync(update_platform(platform_data))
await update_platform(platform_data)
platform_name = platform_data.get("name")
logger.info(
"Updating %s platform data is completed",
platform_name,
)
db_repos = sync(get_repositories_for_update(platform_name))
db_repos = await get_repositories_for_update(platform_name)
repos_to_update = {}
for repo in platform_data.get("repositories", []):
for db_repo in db_repos:
Expand All @@ -316,7 +415,7 @@ def main():
platform_name,
)
for repo_id, repo_data in repos_to_update.items():
sync(update_repository(repo_id, repo_data))
await update_repository(repo_id, repo_data)
logger.info(
"Updating repository data for platform %s is completed",
platform_name,
Expand All @@ -328,85 +427,32 @@ def main():
repository_ids = []
repositories_data = platform_data.pop("repositories", [])

# populate repos cache
# populate repos cache (parallel, bounded by BOOTSTRAP_CONCURRENCY)
logger.info('Making repository data cache')
for repo_info in repositories_data:
repo_name = f'{repo_info["name"]}-{repo_info["arch"]}'
distro = None
repo = sync(pulp_client.get_rpm_repository(repo_name))
if repo:
distro = sync(pulp_client.get_rpm_distro(repo_name))
if repo and distro:
REPO_CACHE[repo_name] = (
distro["base_url"],
repo["pulp_href"],
)

await populate_repo_cache(pulp_client, repositories_data)

# process repos in parallel (bounded by BOOTSTRAP_CONCURRENCY)
results = await process_repositories(
pulp_client,
repositories_data,
args.no_remotes,
args.no_sync,
logger,
)
repos_to_sync = []
for repo_info in repositories_data:
logger.info(
"Creating repository from the following data: %s",
str(repo_info),
)
# If repository is not marked as production, do not remove `url` field
repo_name = f'{repo_info["name"]}-{repo_info["arch"]}'
is_production = repo_info.get("production", False)
repo_sync_policy = repo_info.pop("repository_sync_policy", None)
remote_sync_policy = repo_info.pop("remote_sync_policy", None)
repository = sync(
get_repository(
pulp_client, repo_info, repo_name, is_production, logger
)
)
repository_ids.append(repository.id)

logger.debug("Repository instance: %s", repository)
if args.no_remotes:
logger.warning(
"Not creating a remote for repository %s", repository
)
continue
if not is_production:
logger.info(
"Repository %s is not marked as production and "
"does not need remote setup",
repository,
)
continue

remote = sync(get_remote(repo_info, remote_sync_policy))
pulp_remote = sync(
pulp_client.get_rpm_remote(
f'{repo_info["name"]}-{repo_info["arch"]}-{remote_sync_policy}',
)
)
if pulp_remote['pulp_href'] != remote.pulp_href:
remote = sync(
update_remote(
remote_id=remote.id,
remote_data={
'name': remote.name,
'pulp_href': pulp_remote['pulp_href'],
'arch': remote.arch,
'url': remote.url,
},
)
)
if args.no_sync:
logger.info("Synchronization from remote is disabled, skipping")
continue
logger.info("Appending %s to sync from %s...", repository, remote)
repo_sync_info = {
'repo_href': repository.pulp_href,
'remote_href': remote.pulp_href,
'sync_policy': repo_sync_policy,
}
repos_to_sync.append(repo_sync_info)
for repo_id, sync_info in results:
repository_ids.append(repo_id)
if sync_info is not None:
repos_to_sync.append(sync_info)
if repos_to_sync and not args.no_sync:
sync(sync_repositories(repos_to_sync, pulp_client))
await sync_repositories(repos_to_sync, pulp_client)
return
sync(add_repositories_to_platform(platform_data, repository_ids))
sync(add_owner_id())
await add_repositories_to_platform(platform_data, repository_ids)
await add_owner_id()


def main():
asyncio.run(main_async())


if __name__ == "__main__":
Expand Down
Loading