diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala index 423a5b4a584..e05be601e39 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala @@ -25,13 +25,7 @@ import scala.collection.mutable.Map import scala.jdk.CollectionConverters._ import scala.sys.process._ import java.util.Comparator -import org.apache.texera.common.config.PythonUtils -import org.apache.texera.dao.SqlServer -import com.typesafe.scalalogging.LazyLogging -import org.apache.commons.lang3.SystemUtils -import org.apache.texera.dao.jooq.generated.tables.daos.VirtualEnvironmentsDao -import org.apache.texera.dao.jooq.generated.tables.pojos.VirtualEnvironments -import org.jooq.JSONB +import org.apache.texera.amber.config.PythonUtils /** * PveManager is responsible for managing Python Virtual Environments (PVEs) @@ -46,22 +40,17 @@ import org.jooq.JSONB * /tmp/texera-pve/venvs/{cuid}/{pveName}/ */ -object PveManager extends LazyLogging { +object PveManager { case class PvePackageResponse( pveName: String, userPackages: Seq[String] ) - case class StoredPve(veid: Int, name: String, packagesJson: String) - private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs") private val SafePveName = "^[A-Za-z0-9._-]+$".r - def isValidPveName(name: String): Boolean = - name != null && name.length <= 128 && SafePveName.pattern.matcher(name).matches() - private def cuidDir(cuid: Int, pveName: String): Path = { VenvRoot.resolve(cuid.toString).resolve(pveName) } @@ -69,16 +58,8 @@ object PveManager extends LazyLogging { private def pveDir(cuid: Int, pveName: String): Path = cuidDir(cuid, pveName).resolve("pve") - // Resolves the Python interpreter inside a venv. POSIX puts it at - // `/bin/python`; Windows puts it at `/Scripts/python.exe`. - private def venvPython(venvDir: Path): Path = - if (SystemUtils.IS_OS_WINDOWS) - venvDir.resolve("Scripts").resolve("python.exe") - else - venvDir.resolve("bin").resolve("python") - private def pythonBinPath(cuid: Int, pveName: String): Path = - venvPython(pveDir(cuid, pveName)) + pveDir(cuid, pveName).resolve("bin").resolve("python") /* * Validates the PVE name and returns the Python binary path if it exists, @@ -113,95 +94,25 @@ object PveManager extends LazyLogging { } } - private def locateRequirementsTxt(): Option[Path] = - Seq(Paths.get("/tmp", "requirements.txt"), Paths.get("amber", "requirements.txt")) - .find(Files.exists(_)) - - // Resolves the fully-pinned system package set by installing requirements.txt - // into a throwaway venv and running `pip freeze`. - private def resolveSystemPackages(): Seq[String] = { - val requirementsPath = locateRequirementsTxt() match { - case Some(p) => p - case None => - logger.error("requirements.txt not found; system package set will be empty") - return Seq.empty - } - - val tempVenv = Files.createTempDirectory("texera-system-venv-") - try { - val python = venvPython(tempVenv).toString - val createCode = - Process(Seq(PythonUtils.getPythonExecutable, "-m", "venv", tempVenv.toString)).! - if (createCode != 0) { - logger.error(s"failed to create temp venv for system-package resolution (exit=$createCode)") - return Seq.empty - } - - val installCode = Process( - Seq( - python, - "-u", - "-m", - "pip", - "install", - "--progress-bar", - "off", - "--no-input", - "-r", - requirementsPath.toString - ), - None, - pipEnv.toSeq: _* - ).! - if (installCode != 0) { - logger.error(s"failed to install requirements into temp venv (exit=$installCode)") - return Seq.empty - } - - val collected = scala.collection.mutable.ListBuffer[String]() - val freezeCode = Process(Seq(python, "-m", "pip", "freeze")).!( - ProcessLogger(line => collected += line, _ => ()) - ) - if (freezeCode != 0) { - logger.error(s"pip freeze failed (exit=$freezeCode)") - return Seq.empty - } - collected.toSeq.map(_.trim).filter(line => line.nonEmpty && !line.startsWith("#")) - } finally { - try { - val stream = Files.walk(tempVenv) - try stream - .sorted(Comparator.reverseOrder()) - .iterator() - .asScala - .foreach(Files.deleteIfExists) - finally stream.close() - } catch { - case _: Throwable => () - } - } + private def getSystemPath(isLocal: Boolean): Path = { + val deployPath = Paths.get("/tmp/system-requirements-lock.txt") + if (Files.exists(deployPath)) deployPath + else Paths.get("amber/system-requirements-lock.txt") } - // Cached for the JVM lifetime. The system Python + requirements.txt don't - // change without an app restart, so resolving once is sufficient. - private lazy val systemPackages: Seq[String] = resolveSystemPackages() - - // Normalised package names ("numpy", "pandas") — used to reject user - // attempts to install or delete system packages. - private lazy val systemPackageNames: Set[String] = - systemPackages.map(_.split("==")(0).trim.toLowerCase).toSet - - // Materialised once: a file containing the frozen system requirements, - // passed as `pip install --constraint` so user installs respect system pins. - private lazy val systemConstraintFile: Path = { - val f = Files.createTempFile("texera-system-constraint-", ".txt") - Files.write(f, systemPackages.asJava) - f.toFile.deleteOnExit() - f + def getSystemPackages(isLocal: Boolean): Seq[String] = { + if (!Files.exists(getSystemPath(isLocal))) { + Seq() + } else { + Files + .readAllLines(getSystemPath(isLocal)) + .asScala + .map(_.trim) + .filter(line => line.nonEmpty && !line.startsWith("#")) + .toSeq + } } - def getSystemPackages: Seq[String] = systemPackages - private def runPipInstall( python: String, args: Seq[String], @@ -241,15 +152,20 @@ object PveManager extends LazyLogging { def createNewPve( cuid: Int, queue: BlockingQueue[String], - pveName: String + pveName: String, + isLocal: Boolean ): Unit = { queue.put(s"[PVE] Creating new PVE for cuid: $cuid with name: $pveName") - val requirementsPath = locateRequirementsTxt() match { - case Some(p) => p - case None => - queue.put(s"[PVE][ERR] System requirements not found") - return + val requirementsPath = { + val deployPath = Paths.get("/tmp", "requirements.txt") + if (Files.exists(deployPath)) deployPath + else Paths.get("amber", "requirements.txt") + } + + if (!Files.exists(requirementsPath)) { + queue.put(s"[PVE][ERR] System requirements not found") + return } val venvDirPath = pveDir(cuid, pveName).toAbsolutePath @@ -296,72 +212,6 @@ object PveManager extends LazyLogging { queue.put(s"[PVE] Created new environment for cuid = $cuid") } - // Returns every PVE row belonging to the given user. - def listPvesForUser(uid: Int): List[StoredPve] = { - import org.apache.texera.dao.jooq.generated.Tables.VIRTUAL_ENVIRONMENTS - SqlServer - .getInstance() - .createDSLContext() - .selectFrom(VIRTUAL_ENVIRONMENTS) - .where(VIRTUAL_ENVIRONMENTS.UID.eq(uid)) - .fetchInto(classOf[VirtualEnvironments]) - .asScala - .map { row => - val pkgsJson = Option(row.getPackages).map(_.data).getOrElse("{}") - StoredPve(row.getVeid, row.getName, pkgsJson) - } - .toList - } - - // Deletes a PVE row owned by `uid`. Returns true if a row was deleted, false if no - // matching row was found (either the veid doesn't exist or it belongs to another user). - def deletePveFromDb(veid: Int, uid: Int): Boolean = { - import org.apache.texera.dao.jooq.generated.Tables.VIRTUAL_ENVIRONMENTS - val rows = SqlServer - .getInstance() - .createDSLContext() - .deleteFrom(VIRTUAL_ENVIRONMENTS) - .where( - VIRTUAL_ENVIRONMENTS.VEID - .eq(veid) - .and(VIRTUAL_ENVIRONMENTS.UID.eq(uid)) - ) - .execute() - rows > 0 - } - - // Updates an existing PVE row owned by `uid`. Returns true if a row was - // updated, false if no matching row was found. - def updatePve(veid: Int, uid: Int, name: String, packagesJson: String): Boolean = { - import org.apache.texera.dao.jooq.generated.Tables.VIRTUAL_ENVIRONMENTS - val rows = SqlServer - .getInstance() - .createDSLContext() - .update(VIRTUAL_ENVIRONMENTS) - .set(VIRTUAL_ENVIRONMENTS.NAME, name) - .set(VIRTUAL_ENVIRONMENTS.PACKAGES, JSONB.valueOf(packagesJson)) - .where( - VIRTUAL_ENVIRONMENTS.VEID - .eq(veid) - .and(VIRTUAL_ENVIRONMENTS.UID.eq(uid)) - ) - .execute() - rows > 0 - } - - // Persists a PVE spec (name + packages JSON) for the given user. Returns the new veid. - def savePve(uid: Int, name: String, packagesJson: String): Int = { - val row = new VirtualEnvironments() - row.setUid(uid) - row.setName(name) - row.setPackages(JSONB.valueOf(packagesJson)) - val dao = new VirtualEnvironmentsDao( - SqlServer.getInstance().createDSLContext().configuration - ) - dao.insert(row) - row.getVeid - } - // returns list of PVE names and corresponding user packages for a given CU def getEnvironments(cuid: Int): List[PvePackageResponse] = { @@ -428,7 +278,8 @@ object PveManager extends LazyLogging { packages: List[String], cuid: Int, queue: BlockingQueue[String], - pveName: String + pveName: String, + isLocal: Boolean ): Unit = { val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString @@ -442,6 +293,19 @@ object PveManager extends LazyLogging { var installedPackages = readPackageFile(metadataPath).toSet + val systemPackages = + if (Files.exists(getSystemPath(isLocal))) { + Files + .readAllLines(getSystemPath(isLocal)) + .asScala + .map(_.trim) + .filter(line => line.nonEmpty && !line.startsWith("#")) + .map(line => line.split("==")(0).trim.toLowerCase) + .toSet + } else { + Set[String]() + } + packages.foreach { pkg => val trimmedPkg = pkg.trim @@ -449,7 +313,7 @@ object PveManager extends LazyLogging { val userPackageName = trimmedPkg.split("==")(0).trim.toLowerCase - if (systemPackageNames.contains(userPackageName)) { + if (systemPackages.contains(userPackageName)) { queue.put( s"[PVE][ERR] $trimmedPkg is a system package and cannot be installed or modified by the user." ) @@ -461,8 +325,8 @@ object PveManager extends LazyLogging { val code = runPipInstall( python, Seq( - "--constraint", // pin to the runtime-resolved system set - systemConstraintFile.toString, + "--constraint", // check against system-requirements-lock + getSystemPath(isLocal).toString, trimmedPkg ), queue @@ -500,21 +364,35 @@ object PveManager extends LazyLogging { def deletePackages( cuid: Int, packageName: String, - pveName: String + pveName: String, + isLocal: Boolean ): List[String] = { val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString val metadataPath = cuidDir(cuid, pveName).resolve("user-packages.txt") if (!Files.exists(Paths.get(python))) { val msg = s"[PVE][ERR] Python executable not found for PVE: $python" - logger.error(msg) + println(msg) return List(msg) } val trimmedPackageName = packageName.trim val normalizedPackageName = trimmedPackageName.split("==")(0).trim.toLowerCase - if (systemPackageNames.contains(normalizedPackageName)) { + val systemPackages = + if (Files.exists(getSystemPath(isLocal))) { + Files + .readAllLines(getSystemPath(isLocal)) + .asScala + .map(_.trim) + .filter(line => line.nonEmpty && !line.startsWith("#")) + .map(line => line.split("==")(0).trim.toLowerCase) + .toSet + } else { + Set[String]() + } + + if (systemPackages.contains(normalizedPackageName)) { return List( s"[PVE][ERR] $trimmedPackageName is a system package and cannot be deleted." ) @@ -540,11 +418,11 @@ object PveManager extends LazyLogging { val exitCode = command.!( ProcessLogger( out => { - logger.info(s"[pip] $out") + println(s"[pip] $out") output += s"[pip] $out" }, err => { - logger.error(s"[pip][ERR] $err") + System.err.println(s"[pip][ERR] $err") output += s"[pip][ERR] $err" } ) diff --git a/bin/single-node/nginx.conf b/bin/single-node/nginx.conf index 515a7016da0..d55c96827e5 100644 --- a/bin/single-node/nginx.conf +++ b/bin/single-node/nginx.conf @@ -81,6 +81,12 @@ http { proxy_send_timeout 1d; } + location /pve/ { + proxy_pass http://workflow-runtime-coordinator-service:8085/api/pve/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + } + location /api/ { proxy_pass http://dashboard-service:8080; proxy_set_header Host $host;