Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 156 additions & 57 deletions gsma_dataset_creation/argilla_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,8 @@ def delete_workspace(

@app.command(name="add-user")
def add_user(
username: str = typer.Option(..., "--username", "-u", help="Username for the new user"),
password: str = typer.Option(..., "--password", "-p", help="Password for the new user"),
username: str = typer.Option(..., "--username", "-u", help="Username for the user"),
password: str | None = typer.Option(None, "--password", "-p", help="Password for the user (optional for SSO users)"),
workspaces: list[str] = typer.Option(
..., "--workspace", "-w", help="Workspace name(s) to add user to (can be specified multiple times)"
),
Expand All @@ -515,31 +515,38 @@ def add_user(
logger_level: str = typer.Option("INFO", "--logger-level", help="Logging level"),
) -> None:
"""
Create a new user and add them to one or more workspaces.
Create a new user or update an existing user's role and workspaces.

This command:
1. Creates a new user with specified credentials and role
2. Adds the user to all specified workspaces
1. If user doesn't exist: Creates a new user with specified credentials and role
2. If user exists: Updates their role (if different) and adds them to specified workspaces
3. Adds the user to all specified workspaces

Available roles:
- annotator: Can annotate datasets (default)
- admin: Can manage workspaces and users
- owner: Full administrative access

Examples:
# Create user and add to single workspace
# Create user with password and add to single workspace
uv run gsma argilla add-user -u john.doe -p mypassword123 -w mantis

# Create SSO user (no password needed)
uv run gsma argilla add-user -u john.doe -w mantis

# Create user and add to multiple workspaces
uv run gsma argilla add-user -u john.doe -p mypassword123 -w tsg -w fasg -w ng

# Create admin user
uv run gsma argilla add-user -u admin.user -p securepass -w mantis -r admin

# Update existing user's role to owner and add to new workspace
uv run gsma argilla add-user -u existing.user -w new-workspace -r owner
"""
logger.remove()
logger.add(lambda msg: typer.echo(msg, err=False), level=logger_level)

logger.info("👤 Creating user...")
logger.info("👤 Processing user...")
logger.info(f" Username: {username}")
logger.info(f" Workspaces: {', '.join(workspaces)}")
logger.info(f" Role: {role}")
Expand Down Expand Up @@ -580,42 +587,80 @@ def add_user(
ws = client.workspaces(name=ws_name)
if not ws:
logger.error(f"❌ Workspace '{ws_name}' not found")
logger.error(" All workspaces must exist before creating user")
logger.error(" All workspaces must exist before adding user")
raise typer.Exit(code=1)
workspace_objs.append(ws)

logger.info(f"✅ All {len(workspaces)} workspace(s) found")

# Check if user already exists
existing_user = client.users(username=username)

if existing_user:
logger.error(f"❌ User '{username}' already exists")
logger.error(" Use 'add-to-workspace' command to add existing user to workspaces")
raise typer.Exit(code=1)
logger.info(f"ℹ️ User '{username}' already exists - updating role and workspaces")
user = existing_user

# Update role if different
current_role = str(user.role).replace("Role.", "")
if current_role != role:
logger.info(f"🔄 Updating role from '{current_role}' to '{role}'")
user.role = role
try:
user.update()
logger.info(f"✅ Updated user role to: {role}")
except AttributeError:
# Try save() if update() doesn't exist
try:
user.save()
logger.info(f"✅ Updated user role to: {role}")
except AttributeError:
logger.warning(f"⚠️ Could not update role - SDK method not available")
else:
logger.info(f"ℹ️ User already has role '{role}'")

# Create user
user = client.users.add(
rg.User(
username=username,
password=password,
first_name=first_name or username,
last_name=last_name or "",
role=role,
if password:
logger.warning(f"⚠️ Password parameter ignored for existing user '{username}'")
logger.warning(f" Passwords can only be set during user creation")
else:
# Create new user
logger.info(f"📝 Creating new user: {username}")

# Password is required for new users (unless SSO is configured)
if not password:
logger.warning(f"⚠️ No password provided for new user")
logger.warning(f" This will only work if SSO (e.g., Hugging Face) is enabled")
logger.warning(f" Otherwise, user creation will fail")

user = client.users.add(
rg.User(
username=username,
password=password or "", # Use empty string if no password
first_name=first_name or username,
last_name=last_name or "",
role=role,
)
)
)
logger.info(f"✅ Created user: {username}")
logger.info(f"✅ Created user: {username}")

# Add user to all workspaces
for ws in workspace_objs:
ws.add_user(user)
logger.info(f"✅ Added {username} to workspace {ws.name}")
# Check if user is already in workspace
existing_users = [u.username for u in ws.users]
if user.username in existing_users:
logger.info(f"ℹ️ {username} already in workspace {ws.name}")
else:
ws.add_user(user)
logger.info(f"✅ Added {username} to workspace {ws.name}")

logger.info(f"🎉 Successfully created user and added to {len(workspaces)} workspace(s)")
if existing_user:
logger.info(f"🎉 Successfully updated user and ensured membership in {len(workspaces)} workspace(s)")
else:
logger.info(f"🎉 Successfully created user and added to {len(workspaces)} workspace(s)")

except typer.Exit:
raise
except Exception as e:
logger.error(f"❌ User creation failed: {e}")
logger.error(f"❌ User operation failed: {e}")
raise typer.Exit(code=1) from e


Expand Down Expand Up @@ -860,8 +905,8 @@ def add_users(

@app.command(name="list-users")
def list_users(
workspace: str = typer.Option(
..., "--workspace", "-w", help="Workspace name to list users for"
workspace: str | None = typer.Option(
None, "--workspace", "-w", help="Workspace name to list users for (if not provided, lists all users)"
),
output_csv: Path = typer.Option(
None, "--output-csv", "-o", help="Path to save CSV file with user credentials"
Expand All @@ -875,12 +920,16 @@ def list_users(
logger_level: str = typer.Option("INFO", "--logger-level", help="Logging level"),
) -> None:
"""
List all users for a given workspace.
List users and their roles.

This command retrieves all users associated with a workspace and displays their usernames.
If --workspace is provided, lists users in that workspace with their roles.
If --workspace is not provided, lists ALL users in Argilla with their roles.
Note: Passwords cannot be retrieved from Argilla as they are hashed.

Examples:
# List all users with roles
uv run gsma argilla list-users

# List users for TSG workspace
uv run gsma argilla list-users -w tsg-wg -o data/tsg_users.csv

Expand All @@ -890,8 +939,12 @@ def list_users(
logger.remove()
logger.add(lambda msg: typer.echo(msg, err=False), level=logger_level)

logger.info("📋 Listing workspace users...")
logger.info(f" Workspace: {workspace}")
if workspace:
logger.info("📋 Listing workspace users...")
logger.info(f" Workspace: {workspace}")
else:
logger.info("📋 Listing all users...")

logger.debug(f"Logger level set to: {logger_level}")

# Import here to provide better error message if argilla not installed
Expand All @@ -916,57 +969,103 @@ def list_users(
# Create client
client = rg.Argilla(api_url=api_url, api_key=api_key)

# Get workspace
workspace_obj = client.workspaces(name=workspace)
if not workspace_obj:
logger.error(f"❌ Workspace '{workspace}' not found")
raise typer.Exit(code=1)
if workspace:
# Get workspace-specific users
workspace_obj = client.workspaces(name=workspace)
if not workspace_obj:
logger.error(f"❌ Workspace '{workspace}' not found")
raise typer.Exit(code=1)

logger.info(f"✅ Workspace '{workspace}' found")
logger.info(f"✅ Workspace '{workspace}' found")

# Get all users in the workspace
users_in_workspace = list(workspace_obj.users)
# Get all users in the workspace
users_list = list(workspace_obj.users)

if not users_in_workspace:
logger.warning(f"⚠️ No users found in workspace '{workspace}'")
logger.info("📄 No CSV file written (no users)")
raise typer.Exit(code=0)
if not users_list:
logger.warning(f"⚠️ No users found in workspace '{workspace}'")
logger.info("📄 No CSV file written (no users)")
raise typer.Exit(code=0)

logger.info(f"✅ Found {len(users_in_workspace)} users in workspace")
logger.info(f"✅ Found {len(users_list)} users in workspace")

# Collect usernames (passwords are hashed in Argilla and cannot be retrieved)
usernames = [user.username for user in users_in_workspace]
# Print to console in table format
print(f"\n👥 Users in workspace '{workspace}' ({len(users_list)} users)\n")

# Print to console in table format
print(f"\n👥 Users in workspace '{workspace}' ({len(usernames)} users)\n")
else:
# Get all users in Argilla
users_list = list(client.users)

if not users_list:
logger.warning("⚠️ No users found")
logger.info("📄 No CSV file written (no users)")
raise typer.Exit(code=0)

logger.info(f"✅ Found {len(users_list)} users")

# Print to console in table format
print(f"\n👥 All users ({len(users_list)} users)\n")

# Prepare data with role information
user_data = []
for user in users_list:
role_str = str(user.role).replace("Role.", "")
user_data.append({
"username": user.username,
"role": role_str,
})

# Sort by role (owner, admin, annotator) then by username
role_order = {"owner": 0, "admin": 1, "annotator": 2}
user_data_sorted = sorted(
user_data,
key=lambda u: (role_order.get(u["role"], 999), u["username"])
)

try:
from tabulate import tabulate

# Prepare data for table
table_data = [[i+1, username] for i, username in enumerate(usernames)]
print(tabulate(table_data, headers=["#", "Username"], tablefmt="simple_grid"))
table_data = [
[i+1, u["username"], u["role"]]
for i, u in enumerate(user_data_sorted)
]
print(tabulate(
table_data,
headers=["#", "Username", "Role"],
tablefmt="simple_grid"
))
print()

except ImportError:
# Fallback to simple format
for i, username in enumerate(usernames, 1):
print(f" {i}. {username}")
for i, u in enumerate(user_data_sorted, 1):
print(f" {i}. {u['username']:<30} {u['role']:<15}")
print()

# Print summary by role
role_counts = {}
for u in user_data:
role_counts[u["role"]] = role_counts.get(u["role"], 0) + 1

print("📊 Summary by role:")
for role in ["owner", "admin", "annotator"]:
if role in role_counts:
print(f" {role}: {role_counts[role]}")
print()

# Write CSV output if requested
if output_csv:
output_csv = Path(output_csv)
output_csv.parent.mkdir(parents=True, exist_ok=True)

with open(output_csv, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["username"])
for username in usernames:
writer.writerow([username])
writer.writerow(["username", "role"])
for u in user_data_sorted:
writer.writerow([u["username"], u["role"]])

logger.info(f"✅ Successfully exported {len(usernames)} usernames")
logger.info(f"📄 Usernames saved to: {output_csv}")
logger.info(f"✅ Successfully exported {len(user_data)} users")
logger.info(f"📄 Users saved to: {output_csv}")

logger.info(f"ℹ️ Note: Passwords are only available during user creation (add-users command)")
logger.info(f"ℹ️ Save the CSV when creating users to share passwords with annotators")
Expand Down