Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,7 @@ import scala.collection.mutable.Map
import scala.jdk.CollectionConverters._
import scala.sys.process._
import java.util.Comparator
import org.apache.texera.common.config.PythonUtils
import org.apache.texera.dao.SqlServer
import com.typesafe.scalalogging.LazyLogging
import org.apache.commons.lang3.SystemUtils
import org.apache.texera.dao.jooq.generated.tables.daos.VirtualEnvironmentsDao
import org.apache.texera.dao.jooq.generated.tables.pojos.VirtualEnvironments
import org.jooq.JSONB
import org.apache.texera.amber.config.PythonUtils

/**
* PveManager is responsible for managing Python Virtual Environments (PVEs)
Expand All @@ -46,39 +40,26 @@ import org.jooq.JSONB
* /tmp/texera-pve/venvs/{cuid}/{pveName}/
*/

object PveManager extends LazyLogging {
object PveManager {

// Response payload pairing a PVE name with the list of user packages
// associated with that environment.
case class PvePackageResponse(pveName: String, userPackages: Seq[String])

// A persisted PVE: its row id (`veid`), its name, and its package spec kept
// as a raw JSON string.
case class StoredPve(
  veid: Int,
  name: String,
  packagesJson: String
)

// Root directory for all PVE directories; `cuidDir` resolves
// `{cuid}/{pveName}` beneath it.
private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs")

// Whitelist for PVE names: one or more ASCII letters, digits, '.', '_' or '-'.
private val SafePveName = "^[A-Za-z0-9._-]+$".r

/**
  * Returns true when `name` is safe to use as a single path component of a
  * PVE directory.
  *
  * A valid name is non-null, at most 128 characters long, and consists only
  * of characters allowed by `SafePveName`. The special names "." and ".."
  * are rejected explicitly: they pass the character whitelist, but resolving
  * them as a path component refers to the current or parent directory
  * instead of a new child, which would let a caller escape the per-cuid
  * directory.
  *
  * @param name candidate PVE name; may be null
  * @return true if the name can be used as a PVE directory name
  */
def isValidPveName(name: String): Boolean =
  name != null &&
    name.length <= 128 &&
    name != "." &&
    name != ".." &&
    SafePveName.pattern.matcher(name).matches()

// Directory that holds everything for one named PVE of one cuid:
// `VenvRoot/{cuid}/{pveName}`.
private def cuidDir(cuid: Int, pveName: String): Path = {
  val perCuidRoot = VenvRoot.resolve(cuid.toString)
  perCuidRoot.resolve(pveName)
}

// Location of the "pve" sub-directory inside the PVE's own directory.
private def pveDir(cuid: Int, pveName: String): Path = {
  val environmentRoot = cuidDir(cuid, pveName)
  environmentRoot.resolve("pve")
}

// Path of the Python interpreter inside the venv rooted at `venvDir`:
// `Scripts/python.exe` on Windows, `bin/python` on every other OS.
private def venvPython(venvDir: Path): Path = {
  val (binFolder, executable) =
    if (SystemUtils.IS_OS_WINDOWS) ("Scripts", "python.exe")
    else ("bin", "python")
  venvDir.resolve(binFolder).resolve(executable)
}

private def pythonBinPath(cuid: Int, pveName: String): Path =
venvPython(pveDir(cuid, pveName))
pveDir(cuid, pveName).resolve("bin").resolve("python")

/*
* Validates the PVE name and returns the Python binary path if it exists,
Expand Down Expand Up @@ -113,95 +94,25 @@ object PveManager extends LazyLogging {
}
}

// Looks for requirements.txt first under /tmp, then under the relative
// `amber` directory; yields the first candidate that exists, or None.
private def locateRequirementsTxt(): Option[Path] = {
  val candidates = List("/tmp", "amber").map(dir => Paths.get(dir, "requirements.txt"))
  candidates.find(candidate => Files.exists(candidate))
}

// Resolves the fully-pinned system package set by installing requirements.txt
// into a throwaway venv and running `pip freeze`.
private def resolveSystemPackages(): Seq[String] = {
val requirementsPath = locateRequirementsTxt() match {
case Some(p) => p
case None =>
logger.error("requirements.txt not found; system package set will be empty")
return Seq.empty
}

val tempVenv = Files.createTempDirectory("texera-system-venv-")
try {
val python = venvPython(tempVenv).toString
val createCode =
Process(Seq(PythonUtils.getPythonExecutable, "-m", "venv", tempVenv.toString)).!
if (createCode != 0) {
logger.error(s"failed to create temp venv for system-package resolution (exit=$createCode)")
return Seq.empty
}

val installCode = Process(
Seq(
python,
"-u",
"-m",
"pip",
"install",
"--progress-bar",
"off",
"--no-input",
"-r",
requirementsPath.toString
),
None,
pipEnv.toSeq: _*
).!
if (installCode != 0) {
logger.error(s"failed to install requirements into temp venv (exit=$installCode)")
return Seq.empty
}

val collected = scala.collection.mutable.ListBuffer[String]()
val freezeCode = Process(Seq(python, "-m", "pip", "freeze")).!(
ProcessLogger(line => collected += line, _ => ())
)
if (freezeCode != 0) {
logger.error(s"pip freeze failed (exit=$freezeCode)")
return Seq.empty
}
collected.toSeq.map(_.trim).filter(line => line.nonEmpty && !line.startsWith("#"))
} finally {
try {
val stream = Files.walk(tempVenv)
try stream
.sorted(Comparator.reverseOrder())
.iterator()
.asScala
.foreach(Files.deleteIfExists)
finally stream.close()
} catch {
case _: Throwable => ()
}
}
private def getSystemPath(isLocal: Boolean): Path = {
val deployPath = Paths.get("/tmp/system-requirements-lock.txt")
if (Files.exists(deployPath)) deployPath
else Paths.get("amber/system-requirements-lock.txt")
}

// Cached for the JVM lifetime. The system Python + requirements.txt don't
// change without an app restart, so resolving once is sufficient.
private lazy val systemPackages: Seq[String] = resolveSystemPackages()

// Lower-cased, trimmed package names (the text before "==" of every system
// package entry), e.g. "numpy", "pandas". Used to reject user attempts to
// install or delete system packages.
private lazy val systemPackageNames: Set[String] = {
  val names =
    for (requirement <- systemPackages)
      yield requirement.split("==")(0).trim.toLowerCase
  names.toSet
}

// Materialised once: a file containing the frozen system requirements,
// passed as `pip install --constraint` so user installs respect system pins.
private lazy val systemConstraintFile: Path = {
val f = Files.createTempFile("texera-system-constraint-", ".txt")
Files.write(f, systemPackages.asJava)
f.toFile.deleteOnExit()
f
def getSystemPackages(isLocal: Boolean): Seq[String] = {
if (!Files.exists(getSystemPath(isLocal))) {
Seq()
} else {
Files
.readAllLines(getSystemPath(isLocal))
.asScala
.map(_.trim)
.filter(line => line.nonEmpty && !line.startsWith("#"))
.toSeq
}
}

def getSystemPackages: Seq[String] = systemPackages

private def runPipInstall(
python: String,
args: Seq[String],
Expand Down Expand Up @@ -241,15 +152,20 @@ object PveManager extends LazyLogging {
def createNewPve(
cuid: Int,
queue: BlockingQueue[String],
pveName: String
pveName: String,
isLocal: Boolean
): Unit = {
queue.put(s"[PVE] Creating new PVE for cuid: $cuid with name: $pveName")

val requirementsPath = locateRequirementsTxt() match {
case Some(p) => p
case None =>
queue.put(s"[PVE][ERR] System requirements not found")
return
val requirementsPath = {
val deployPath = Paths.get("/tmp", "requirements.txt")
if (Files.exists(deployPath)) deployPath
else Paths.get("amber", "requirements.txt")
}

if (!Files.exists(requirementsPath)) {
queue.put(s"[PVE][ERR] System requirements not found")
return
}

val venvDirPath = pveDir(cuid, pveName).toAbsolutePath
Expand Down Expand Up @@ -296,72 +212,6 @@ object PveManager extends LazyLogging {
queue.put(s"[PVE] Created new environment for cuid = $cuid")
}

// Fetches all VIRTUAL_ENVIRONMENTS rows whose UID equals `uid` and converts
// each one into a StoredPve.
def listPvesForUser(uid: Int): List[StoredPve] = {
  import org.apache.texera.dao.jooq.generated.Tables.VIRTUAL_ENVIRONMENTS
  val dsl = SqlServer.getInstance().createDSLContext()
  val records = dsl
    .selectFrom(VIRTUAL_ENVIRONMENTS)
    .where(VIRTUAL_ENVIRONMENTS.UID.eq(uid))
    .fetchInto(classOf[VirtualEnvironments])
    .asScala
  val stored =
    for (record <- records) yield {
      // A null PACKAGES value is reported as an empty JSON object.
      val packagesJson = Option(record.getPackages).fold("{}")(_.data)
      StoredPve(record.getVeid, record.getName, packagesJson)
    }
  stored.toList
}

// Removes the PVE row matching both `veid` and `uid`. The result is true when
// at least one row was deleted and false when nothing matched (unknown veid,
// or a veid that belongs to a different user).
def deletePveFromDb(veid: Int, uid: Int): Boolean = {
  import org.apache.texera.dao.jooq.generated.Tables.VIRTUAL_ENVIRONMENTS
  val dsl = SqlServer.getInstance().createDSLContext()
  val ownedByUser =
    VIRTUAL_ENVIRONMENTS.VEID.eq(veid).and(VIRTUAL_ENVIRONMENTS.UID.eq(uid))
  val deletedCount = dsl.deleteFrom(VIRTUAL_ENVIRONMENTS).where(ownedByUser).execute()
  deletedCount > 0
}

// Overwrites NAME and PACKAGES of the PVE row matching both `veid` and `uid`.
// The result is true when at least one row was updated, false when no row
// matched.
def updatePve(veid: Int, uid: Int, name: String, packagesJson: String): Boolean = {
  import org.apache.texera.dao.jooq.generated.Tables.VIRTUAL_ENVIRONMENTS
  val dsl = SqlServer.getInstance().createDSLContext()
  val packagesValue = JSONB.valueOf(packagesJson)
  val ownedByUser =
    VIRTUAL_ENVIRONMENTS.VEID.eq(veid).and(VIRTUAL_ENVIRONMENTS.UID.eq(uid))
  val updatedCount = dsl
    .update(VIRTUAL_ENVIRONMENTS)
    .set(VIRTUAL_ENVIRONMENTS.NAME, name)
    .set(VIRTUAL_ENVIRONMENTS.PACKAGES, packagesValue)
    .where(ownedByUser)
    .execute()
  updatedCount > 0
}

// Inserts a new PVE row (owner uid, name, packages JSON) through the
// generated DAO and returns the veid read back from the inserted POJO.
def savePve(uid: Int, name: String, packagesJson: String): Int = {
  val record = new VirtualEnvironments()
  record.setUid(uid)
  record.setName(name)
  record.setPackages(JSONB.valueOf(packagesJson))

  val configuration = SqlServer.getInstance().createDSLContext().configuration
  new VirtualEnvironmentsDao(configuration).insert(record)
  record.getVeid
}

// returns list of PVE names and corresponding user packages for a given CU
def getEnvironments(cuid: Int): List[PvePackageResponse] = {

Expand Down Expand Up @@ -428,7 +278,8 @@ object PveManager extends LazyLogging {
packages: List[String],
cuid: Int,
queue: BlockingQueue[String],
pveName: String
pveName: String,
isLocal: Boolean
): Unit = {

val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString
Expand All @@ -442,14 +293,27 @@ object PveManager extends LazyLogging {

var installedPackages = readPackageFile(metadataPath).toSet

val systemPackages =
if (Files.exists(getSystemPath(isLocal))) {
Files
.readAllLines(getSystemPath(isLocal))
.asScala
.map(_.trim)
.filter(line => line.nonEmpty && !line.startsWith("#"))
.map(line => line.split("==")(0).trim.toLowerCase)
.toSet
} else {
Set[String]()
}

packages.foreach { pkg =>
val trimmedPkg = pkg.trim

if (trimmedPkg.nonEmpty) {

val userPackageName = trimmedPkg.split("==")(0).trim.toLowerCase

if (systemPackageNames.contains(userPackageName)) {
if (systemPackages.contains(userPackageName)) {
queue.put(
s"[PVE][ERR] $trimmedPkg is a system package and cannot be installed or modified by the user."
)
Expand All @@ -461,8 +325,8 @@ object PveManager extends LazyLogging {
val code = runPipInstall(
python,
Seq(
"--constraint", // pin to the runtime-resolved system set
systemConstraintFile.toString,
"--constraint", // check against system-requirements-lock
getSystemPath(isLocal).toString,
trimmedPkg
),
queue
Expand Down Expand Up @@ -500,21 +364,35 @@ object PveManager extends LazyLogging {
def deletePackages(
cuid: Int,
packageName: String,
pveName: String
pveName: String,
isLocal: Boolean
): List[String] = {
val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString
val metadataPath = cuidDir(cuid, pveName).resolve("user-packages.txt")

if (!Files.exists(Paths.get(python))) {
val msg = s"[PVE][ERR] Python executable not found for PVE: $python"
logger.error(msg)
println(msg)
return List(msg)
}

val trimmedPackageName = packageName.trim
val normalizedPackageName = trimmedPackageName.split("==")(0).trim.toLowerCase

if (systemPackageNames.contains(normalizedPackageName)) {
val systemPackages =
if (Files.exists(getSystemPath(isLocal))) {
Files
.readAllLines(getSystemPath(isLocal))
.asScala
.map(_.trim)
.filter(line => line.nonEmpty && !line.startsWith("#"))
.map(line => line.split("==")(0).trim.toLowerCase)
.toSet
} else {
Set[String]()
}

if (systemPackages.contains(normalizedPackageName)) {
return List(
s"[PVE][ERR] $trimmedPackageName is a system package and cannot be deleted."
)
Expand All @@ -540,11 +418,11 @@ object PveManager extends LazyLogging {
val exitCode = command.!(
ProcessLogger(
out => {
logger.info(s"[pip] $out")
println(s"[pip] $out")
output += s"[pip] $out"
},
err => {
logger.error(s"[pip][ERR] $err")
System.err.println(s"[pip][ERR] $err")
output += s"[pip][ERR] $err"
}
)
Expand Down
6 changes: 6 additions & 0 deletions bin/single-node/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ http {
proxy_send_timeout 1d;
}

# Proxies requests under /pve/ to the workflow-runtime-coordinator service on
# port 8085. Because proxy_pass is given a URI, the matched "/pve/" prefix is
# replaced with "/api/pve/" before the request is forwarded.
location /pve/ {
proxy_pass http://workflow-runtime-coordinator-service:8085/api/pve/;
# Pass the original Host header and the client address to the upstream.
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}

location /api/ {
proxy_pass http://dashboard-service:8080;
proxy_set_header Host $host;
Expand Down
Loading