diff --git a/Cargo.lock b/Cargo.lock index 2efc7ce848..406d2d85b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2848,10 +2848,13 @@ dependencies = [ "anyhow", "cfg-if", "chrono", + "cloud_object_persistence", "cloud_objects", + "diesel", "handlebars", "lazy_static", "log", + "persistence", "regex", "schemars", "serde", @@ -2868,6 +2871,24 @@ dependencies = [ "warp_util", ] +[[package]] +name = "cloud_object_persistence" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "chrono", + "cloud_objects", + "diesel", + "lazy_static", + "log", + "persistence", + "serde", + "session-sharing-protocol", + "warp_core", + "warp_graphql", +] + [[package]] name = "cloud_objects" version = "0.1.0" @@ -14395,6 +14416,7 @@ dependencies = [ "clap", "cloud_object_client", "cloud_object_models", + "cloud_object_persistence", "cloud_objects", "cocoa 0.26.0", "comfy-table", @@ -15031,19 +15053,16 @@ name = "warp_server_client" version = "0.1.0" dependencies = [ "anyhow", - "bincode", "chrono", "cloud_object_client", "cloud_object_models", "cloud_objects", "cynic", "derivative", - "diesel", "itertools 0.14.0", "lasso", "log", "pathfinder_geometry", - "persistence", "schemars", "serde", "session-sharing-protocol", diff --git a/Cargo.toml b/Cargo.toml index bb0c56dceb..bb471a9e68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ command = { path = "crates/command" } command-signatures-v2 = { path = "crates/command-signatures-v2" } cloud_objects = { path = "crates/cloud_objects" } cloud_object_client = { path = "crates/cloud_object_client" } +cloud_object_persistence = { path = "crates/cloud_object_persistence" } cloud_object_models = { path = "crates/cloud_object_models" } computer_use = { path = "crates/computer_use" } field_mask = { path = "crates/field_mask" } diff --git a/app/Cargo.toml b/app/Cargo.toml index f2027f3481..f24cafa6aa 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -281,6 +281,7 @@ wgpu.workspace = true app-installation-detection.workspace = true async-io.workspace = true axum.workspace = true +cloud_object_persistence.workspace = true comfy-table = "7.1.4" inquire = "0.9.1" diesel = { workspace = true, features = ["sqlite", "chrono"] } diff --git a/app/src/persistence/mod.rs b/app/src/persistence/mod.rs index 6995f4e40c..ffd591d22a 100644 --- a/app/src/persistence/mod.rs +++ b/app/src/persistence/mod.rs @@ -4,7 +4,6 @@ cfg_if::cfg_if! { if #[cfg(feature = "local_fs")] { pub mod agent; mod block_list; - mod cloud_objects; mod sqlite; pub mod commands; } diff --git a/app/src/persistence/sqlite.rs b/app/src/persistence/sqlite.rs index c13e89e82b..59bf5ccce2 100644 --- a/app/src/persistence/sqlite.rs +++ b/app/src/persistence/sqlite.rs @@ -9,7 +9,23 @@ use std::{fs, thread}; use ai::project_context::model::ProjectRulePath; use anyhow::{anyhow, bail, Context, Result}; -use chrono::{DateTime, Utc}; +use chrono::Utc; +use cloud_object_models::folder::persistence::upsert_folders; +use cloud_object_models::notebook::persistence::upsert_notebooks; +use cloud_object_models::workflow::persistence::upsert_workflows; +use cloud_object_models::{ + folder::persistence as folder_persistence, + json_model::persistence::{self as generic_string_persistence, PersistedGenericStringObject}, + notebook::persistence as notebook_persistence, + workflow::persistence as workflow_persistence, +}; +use cloud_object_persistence::{ + delete_cloud_object, delete_generic_string_object, increment_retry_count, + load_cloud_object_read_context, mark_object_as_synced, read_time_of_next_force_object_refresh, + record_time_of_next_refresh, update_object_after_server_creation, update_object_metadata, + upsert_generic_string_objects as upsert_generic_string_object_rows, + GenericStringObjectPersistenceData, +}; use diesel::connection::{DefaultLoadingMode, SimpleConnection}; use diesel::result::Error; use diesel::sqlite::SqliteConnection; @@ -26,7 +42,6 @@ use pathfinder_geometry::rect::RectF; use pathfinder_geometry::vector::Vector2F; use persistence::model::AMBIENT_AGENT_PANE_KIND; use uuid::Uuid; -use warp_graphql::scalars::time::ServerTimestamp; use warpui::platform::FullscreenState; use warpui::windowing::{MIN_WINDOW_HEIGHT, MIN_WINDOW_WIDTH}; use warpui::{AppContext, SingletonEntity}; @@ -38,33 +53,21 @@ use super::block_list::{ }; use super::model::{ self, ActiveMCPServer, CurrentUserInformation, MCPEnvironmentVariables, NewActiveMCPServer, - NewApp, NewCommand, NewFolder, NewNotebook, NewServerExperiment, NewTab, NewTeam, NewWindow, - NewWorkspace, NewWorkspaceMetadata, NewWorkspaceTeam, ObjectMetadata, ObjectPermissions, - Project, Tab, Window, WorkspaceMetadata as WorkspaceMetadataModel, AI_DOCUMENT_PANE_KIND, - AI_FACT_PANE_KIND, CODE_PANE_KIND, ENV_VAR_COLLECTION_PANE_KIND, - EXECUTION_PROFILE_EDITOR_PANE_KIND, MCP_SERVER_PANE_KIND, NOTEBOOK_PANE_KIND, - SETTINGS_PANE_KIND, TERMINAL_PANE_KIND, WELCOME_PANE_KIND, WORKFLOW_PANE_KIND, + NewApp, NewCommand, NewServerExperiment, NewTab, NewTeam, NewWindow, NewWorkspace, + NewWorkspaceMetadata, NewWorkspaceTeam, Project, Tab, Window, + WorkspaceMetadata as WorkspaceMetadataModel, AI_DOCUMENT_PANE_KIND, AI_FACT_PANE_KIND, + CODE_PANE_KIND, ENV_VAR_COLLECTION_PANE_KIND, EXECUTION_PROFILE_EDITOR_PANE_KIND, + MCP_SERVER_PANE_KIND, NOTEBOOK_PANE_KIND, SETTINGS_PANE_KIND, TERMINAL_PANE_KIND, + WELCOME_PANE_KIND, WORKFLOW_PANE_KIND, }; use super::{ schema, BlockCompleted, FinishedCommandMetadata, ModelEvent, PersistedData, PersistenceScope, StartedCommandMetadata, WriterHandles, }; use crate::ai::agent::conversation::AIConversationId; -use crate::ai::ambient_agents::scheduled::{ - CloudScheduledAmbientAgent, CloudScheduledAmbientAgentModel, -}; use crate::ai::ambient_agents::AmbientAgentTaskId; -use crate::ai::cloud_environments::{ - CloudAmbientAgentEnvironment, CloudAmbientAgentEnvironmentModel, -}; -use crate::ai::document::ai_document_model::AIDocumentId; -use crate::ai::execution_profiles::{CloudAIExecutionProfile, CloudAIExecutionProfileModel}; -use crate::ai::facts::{CloudAIFact, CloudAIFactModel}; -use crate::ai::mcp::templatable::{CloudTemplatableMCPServer, CloudTemplatableMCPServerModel}; use crate::ai::mcp::templatable_installation::VariableValue; -use crate::ai::mcp::{ - CloudMCPServer, CloudMCPServerModel, TemplatableMCPServer, TemplatableMCPServerInstallation, -}; +use crate::ai::mcp::{TemplatableMCPServer, TemplatableMCPServerInstallation}; use crate::ai::persisted_workspace::EnablementState; use crate::app_state::{ AIFactPaneSnapshot, AmbientAgentPaneSnapshot, AppState, BranchSnapshot, CodePaneSnapShot, @@ -80,36 +83,26 @@ use crate::cloud_object::model::actions::{ object_action_from_persisted, ObjectAction, ObjectActionSubtype, }; use crate::cloud_object::model::generic_string_model::{CloudStringObject, GenericStringObjectId}; -use crate::cloud_object::{ - CloudObject, CloudObjectMetadata, CloudObjectPermissions, CloudObjectStatuses, - CloudObjectSyncStatus, JsonObjectType, NumInFlightRequests, ObjectIdType, ObjectType, Owner, - Revision, RevisionAndLastEditor, ServerCreationInfo, GENERIC_STRING_OBJECT_PREFIX, - JSON_OBJECT_PREFIX, -}; +use crate::cloud_object::{CloudObject, ObjectIdType}; use crate::code::editor_management::CodeSource; -use crate::drive::folders::{CloudFolder, CloudFolderModel, FolderId}; use crate::drive::OpenWarpDriveObjectSettings; -use crate::env_vars::{CloudEnvVarCollection, CloudEnvVarCollectionModel}; -use crate::features::FeatureFlag; -use crate::notebooks::{CloudNotebook, CloudNotebookModel, NotebookId}; +use crate::notebooks::NotebookId; use crate::persistence::agent::read_agent_conversations; use crate::persistence::block_list::{get_all_restored_blocks, read_ai_queries}; use crate::persistence::model::{ - NewCloudObjectsRefresh, NewGenericStringObject, NewPersistedObjectAction, NewTeamSettings, - ProjectRules, UserProfile, CODE_REVIEW_PANE_KIND, GET_STARTED_PANE_KIND, + NewPersistedObjectAction, NewTeamSettings, ProjectRules, UserProfile, CODE_REVIEW_PANE_KIND, + GET_STARTED_PANE_KIND, }; use crate::server::experiments::ServerExperiment; -use crate::server::ids::{ClientId, HashableId, ServerId, SyncId, ToServerId}; +use crate::server::ids::{ClientId, HashableId, ServerId, SyncId}; use crate::server::telemetry::TelemetryEvent; -use crate::settings::cloud_preferences::{CloudPreference, CloudPreferenceModel}; use crate::settings_view::SettingsSection; use crate::suggestions::ignored_suggestions_model::SuggestionType; use crate::tab::SelectedTabColor; use crate::terminal::history::PersistedCommand; use crate::terminal::ShellLaunchData; use crate::themes::theme::AnsiColorIdentifier; -use crate::workflows::workflow_enum::{CloudWorkflowEnum, CloudWorkflowEnumModel}; -use crate::workflows::{CloudWorkflow, CloudWorkflowModel, WorkflowId}; +use crate::workflows::WorkflowId; use crate::workspaces::team::Team as TeamMetadata; use crate::workspaces::user_profiles::{user_profile_from_persistence, UserProfileWithUID}; use crate::workspaces::workspace::{Workspace as WorkspaceMetadata, WorkspaceUid}; @@ -124,16 +117,8 @@ diesel::define_sql_function! { const CHANNEL_SIZE: usize = 1024; const COMMANDS_COUNT_LIMIT: i64 = 10000; -use warp_server_client::persistence::{upsert_cloud_object, CloudObjectId}; - const WARP_SQLITE_FILE_NAME: &str = "warp.sqlite"; -/// When delete a cloud object, this callback is used to delete the cloud -/// object. It takes the id of the cloud object to delete as a parameter. -/// The supplied conn has already started a transaction. -type DeleteCloudObjectFn = - Box Result<(), Error>>; - /// Runs any migrations and creates the Sqlite database if it doesn't exist. /// Reads from the sqlite database to get the app state for session restoration. /// Starts a writer thread that listens for ModelEvents and processes them. @@ -2066,342 +2051,21 @@ fn set_current_workspace(conn: &mut SqliteConnection, workspace_uid: WorkspaceUi Ok(()) } -/// Mark a shareable object as no longer having pending changes. -fn mark_object_as_synced( - conn: &mut SqliteConnection, - hashed_sqlite_id: String, - new_revision_and_editor: RevisionAndLastEditor, - new_metadata_ts: Option, -) -> Result<(), Error> { - use schema::object_metadata::dsl::*; - conn.transaction::<(), Error, _>(|conn| { - diesel::update(object_metadata.filter(server_id.eq(Some(hashed_sqlite_id.as_str())))) - .set(is_pending.eq(false)) - .execute(conn)?; - diesel::update(object_metadata.filter(server_id.eq(Some(hashed_sqlite_id.clone())))) - .set(( - revision_ts.eq(new_revision_and_editor.revision.timestamp_micros()), - last_editor_uid.eq(new_revision_and_editor.last_editor_uid), - )) - .execute(conn)?; - - if let Some(metadata_ts) = new_metadata_ts { - diesel::update(object_metadata.filter(server_id.eq(Some(hashed_sqlite_id)))) - .set((metadata_last_updated_ts.eq(metadata_ts.timestamp_micros()),)) - .execute(conn)?; - } - Ok(()) - }) -} - -fn increment_retry_count( - conn: &mut SqliteConnection, - server_id_string: String, -) -> Result<(), Error> { - use schema::object_metadata::dsl::*; - conn.transaction::<(), Error, _>(|conn| { - diesel::update(object_metadata.filter(server_id.eq(Some(server_id_string)))) - .set(retry_count.eq(retry_count + 1)) - .execute(conn)?; - Ok(()) - }) -} - -fn update_object_after_server_creation( - conn: &mut SqliteConnection, - client_id_string: String, - server_creation_info: ServerCreationInfo, -) -> Result<(), Error> { - use schema::commands::dsl::*; - use schema::object_metadata::dsl::*; - - conn.transaction::<(), Error, _>(|conn| { - diesel::update(object_metadata.filter(client_id.eq(Some(client_id_string.clone())))) - .set(( - server_id.eq(Some( - server_creation_info - .server_id_and_type - .sqlite_type_and_uid_hash(), - )), - creator_uid.eq(server_creation_info.creator_uid), - )) - .execute(conn)?; - - diesel::update(commands.filter(cloud_workflow_id.eq(Some(client_id_string)))) - .set( - cloud_workflow_id.eq(Some( - server_creation_info - .server_id_and_type - .sqlite_type_and_uid_hash(), - )), - ) - .execute(conn)?; - - Ok(()) - }) -} - -/// Helper function to delete a cloud object identified by `sync_id`. If a valid object metadata row -/// for the object is found, `delete_object_fn` is called to delete the actual object. -fn delete_cloud_object( - conn: &mut SqliteConnection, - sync_id: SyncId, - object_id_type: ObjectIdType, - delete_object_fn: DeleteCloudObjectFn, -) -> Result<(), Error> { - use schema::object_metadata::dsl::*; - - // Filter to find metadata row. - // The diesel types for `filter`s are dependent on the columns being filtered - // so while the `hashed_sync_id` will only match one of `client_id` and `server_id`, - // we filter on both here for ergonomics. - let hashed_sync_id = sync_id.sqlite_uid_hash(object_id_type); - let metadata_filter = object_metadata - .filter(client_id.eq(Some(hashed_sync_id.as_str()))) - .or_filter(server_id.eq(Some(hashed_sync_id.as_str()))); - - let metadata: ObjectMetadata = metadata_filter.first(conn)?; - let object_id = metadata.shareable_object_id; - diesel::delete(object_metadata.filter(id.eq(metadata.id))).execute(conn)?; - diesel::delete( - schema::object_permissions::dsl::object_permissions - .filter(schema::object_permissions::object_metadata_id.eq(metadata.id)), - ) - .execute(conn)?; - diesel::delete( - schema::object_actions::dsl::object_actions - .filter(schema::object_actions::hashed_object_id.eq(hashed_sync_id)), - ) - .execute(conn)?; - delete_object_fn(conn, object_id)?; - Ok(()) -} - -/// SQLite endpoint for the ObjectMetadataUpdated RTC message that updates the metadata ts and other -/// metadata like current team_id of the object. -fn update_object_metadata( - conn: &mut SqliteConnection, - hashed_id: String, - metadata: CloudObjectMetadata, -) -> Result<(), Error> { - use schema::object_metadata::dsl::*; - let metadata_last_updated_at = metadata - .metadata_last_updated_ts - .map(|ts| ts.timestamp_micros()); - - let trashed_timestamp = metadata.trashed_ts.map(|ts| ts.timestamp_micros()); - let folder_id_str = metadata - .folder_id - .map(|folder_sync_id| folder_sync_id.sqlite_uid_hash(ObjectIdType::Folder)); - - conn.transaction::<(), Error, _>(|conn| { - diesel::update(object_metadata.filter(server_id.eq(Some(hashed_id.as_str())))) - .set(( - metadata_last_updated_ts.eq(metadata_last_updated_at), - trashed_ts.eq(trashed_timestamp), - folder_id.eq(folder_id_str), - current_editor.eq(metadata.current_editor_uid), - )) - .execute(conn)?; - - Ok(()) - }) -} - fn upsert_generic_string_objects( conn: &mut SqliteConnection, cloud_generic_string_objects: Vec>, ) -> Result<(), Error> { - use schema::generic_string_objects::dsl::*; - conn.transaction::<(), Error, _>(|conn| { - for object in cloud_generic_string_objects { - let serialized_data = Arc::new(object.serialized().take()); - let serialized_data_clone = serialized_data.clone(); - upsert_cloud_object( - conn, - ObjectType::GenericStringObject(object.generic_string_object_format()), - object.id(), - object.metadata().clone(), - object.permissions().clone(), - Box::new(move |conn| { - let new_object = NewGenericStringObject { - data: serialized_data.as_ref(), - }; - diesel::insert_into( - schema::generic_string_objects::dsl::generic_string_objects, - ) - .values(new_object) - .execute(conn)?; - let object_id: i32 = - schema::generic_string_objects::dsl::generic_string_objects - .select(schema::generic_string_objects::columns::id) - .order(schema::generic_string_objects::columns::id.desc()) - .first(conn)?; - Ok(object_id) - }), - Box::new(move |conn, object_id| { - diesel::update( - generic_string_objects - .filter(schema::generic_string_objects::dsl::id.eq(object_id)), - ) - .set((data.eq(serialized_data_clone.as_ref()),)) - .execute(conn)?; - Ok(()) - }), - )? - } - Ok(()) - }) -} - -fn upsert_workflows( - conn: &mut SqliteConnection, - cloud_workflows: Vec, -) -> Result<(), Error> { - use schema::workflows::dsl::*; - conn.transaction::<(), Error, _>(|conn| { - // todo: wrap in an arc to avoid unnecessary cloning. - for cloud_workflow in cloud_workflows { - let workflow_id = cloud_workflow.id; - if let Ok(serialized_workflow) = serde_json::to_string(&cloud_workflow.model().data) { - let serialized_workflow_clone = serialized_workflow.clone(); - upsert_cloud_object( - conn, - ObjectType::Workflow, - workflow_id, - cloud_workflow.metadata, - cloud_workflow.permissions, - Box::new(move |conn| { - let workflow = model::NewWorkflow { - data: serialized_workflow.clone(), - }; - diesel::insert_into(schema::workflows::dsl::workflows) - .values(workflow) - .execute(conn)?; - let workflow_id: i32 = schema::workflows::dsl::workflows - .select(schema::workflows::columns::id) - .order(schema::workflows::columns::id.desc()) - .first(conn)?; - Ok(workflow_id) - }), - Box::new(move |conn, workflow_id| { - diesel::update( - workflows.filter(schema::workflows::dsl::id.eq(workflow_id)), - ) - .set((data.eq(serialized_workflow_clone),)) - .execute(conn)?; - Ok(()) - }), - )? - } - } - Ok(()) - }) -} - -fn upsert_notebooks( - conn: &mut SqliteConnection, - cloud_notebooks: Vec, -) -> Result<(), Error> { - use schema::notebooks::dsl::*; - conn.transaction::<(), Error, _>(|conn| { - for cloud_notebook in cloud_notebooks { - // todo: wrap in an arc to avoid unnecessary cloning. - let notebook_clone = cloud_notebook.clone(); - let title_clone = cloud_notebook.model().title.clone(); - let data_clone = cloud_notebook.model().data.clone(); - let ai_document_id_clone = cloud_notebook - .model() - .ai_document_id - .as_ref() - .map(|doc_id| doc_id.to_string()); - upsert_cloud_object( - conn, - ObjectType::Notebook, - cloud_notebook.id, - cloud_notebook.metadata, - cloud_notebook.permissions, - Box::new(move |conn| { - let new_notebook = NewNotebook { - title: Some(title_clone), - data: Some(data_clone), - ai_document_id: ai_document_id_clone, - }; - diesel::insert_into(schema::notebooks::dsl::notebooks) - .values(new_notebook) - .execute(conn)?; - let notebook_id: i32 = schema::notebooks::dsl::notebooks - .select(schema::notebooks::columns::id) - .order(schema::notebooks::columns::id.desc()) - .first(conn)?; - Ok(notebook_id) - }), - Box::new(move |conn, notebook_id| { - diesel::update(notebooks.filter(schema::notebooks::dsl::id.eq(notebook_id))) - .set(( - title.eq(notebook_clone.model().title.clone()), - data.eq(notebook_clone.model().data.clone()), - ai_document_id.eq(notebook_clone - .model() - .ai_document_id - .as_ref() - .map(|doc_id| doc_id.to_string())), - )) - .execute(conn)?; - Ok(()) - }), - )? - } - Ok(()) - }) -} - -fn upsert_folders( - conn: &mut SqliteConnection, - cloud_folders: Vec, -) -> Result<(), Error> { - use schema::folders::dsl::*; - conn.transaction::<(), Error, _>(|conn| { - for cloud_folder in cloud_folders { - let folder_clone = cloud_folder.clone(); - let folder_name = cloud_folder.model().name.clone(); - let folder_is_open = cloud_folder.model().is_open; - let folder_is_warp_pack = cloud_folder.model().is_warp_pack; - upsert_cloud_object( - conn, - ObjectType::Folder, - cloud_folder.id, - cloud_folder.metadata, - cloud_folder.permissions, - Box::new(move |conn| { - let new_folder = NewFolder { - name: folder_name, - is_open: folder_is_open, - is_warp_pack: folder_is_warp_pack, - }; - diesel::insert_into(schema::folders::dsl::folders) - .values(new_folder) - .execute(conn)?; - let folder_id: i32 = schema::folders::dsl::folders - .select(schema::folders::columns::id) - .order(schema::folders::columns::id.desc()) - .first(conn)?; - Ok(folder_id) - }), - Box::new(move |conn, folder_id| { - diesel::update(folders.filter(schema::folders::dsl::id.eq(folder_id))) - .set(( - name.eq(folder_clone.model().name.clone()), - is_open.eq(folder_clone.model().is_open), - is_warp_pack.eq(folder_clone.model().is_warp_pack), - )) - .execute(conn)?; - Ok(()) - }), - )? - } - Ok(()) - }) + let objects = cloud_generic_string_objects + .into_iter() + .map(|object| GenericStringObjectPersistenceData { + id: object.id(), + format: object.generic_string_object_format(), + metadata: object.metadata().clone(), + permissions: object.permissions().clone(), + data: object.serialized().take(), + }) + .collect(); + upsert_generic_string_object_rows(conn, objects) } /// Parse conversation IDs from JSON string. @@ -2697,6 +2361,22 @@ fn read_node(conn: &mut SqliteConnection, node: model::PaneNode) -> Result Box { + match object { + PersistedGenericStringObject::Preference(object) => Box::new(object), + PersistedGenericStringObject::EnvVarCollection(object) => Box::new(object), + PersistedGenericStringObject::WorkflowEnum(object) => Box::new(object), + PersistedGenericStringObject::AIFact(object) => Box::new(object), + PersistedGenericStringObject::MCPServer(object) => Box::new(object), + PersistedGenericStringObject::TemplatableMCPServer(object) => Box::new(object), + PersistedGenericStringObject::AIExecutionProfile(object) => Box::new(object), + PersistedGenericStringObject::CloudEnvironment(object) => Box::new(object), + PersistedGenericStringObject::ScheduledAmbientAgent(object) => Box::new(object), + } +} + /// This is not in a transaction. The interface for a transaction is a bit awkward, /// and makes it invalid to write the logic recursively. It's ok it's not in a /// transaction because we should be the only connection using the database. @@ -2865,287 +2545,27 @@ fn read_sqlite_data( }) .collect(); - let object_metadata = - schema::object_metadata::dsl::object_metadata.load::(conn)?; - let object_permissions = schema::object_permissions::dsl::object_permissions - .load::(conn)?; - - // Cache metadata and permissions by id so that we aren't doing an n^2 lookups for each object type. - let metadata_by_id = object_metadata - .into_iter() - .map(|metadata| { - let object_type = if metadata - .object_type - .starts_with(GENERIC_STRING_OBJECT_PREFIX) - { - GENERIC_STRING_OBJECT_PREFIX.to_owned() - } else { - metadata.object_type.to_owned() - }; - // Shareable object ids aren't unique across object types, so the object type needs to be - // part of the hashmap key. For generic objects, they are all in the same table, - // so it's safe to use the generic prefix as part of the key. - ((metadata.shareable_object_id, object_type), metadata) - }) - .collect::>(); - let permissions_by_id = object_permissions - .into_iter() - .map(|permissions| (permissions.object_metadata_id, permissions)) - .collect::>(); - + let read_context = load_cloud_object_read_context(conn, current_user_id)?; let mut cloud_objects: Vec> = Vec::new(); cloud_objects.extend( - schema::workflows::dsl::workflows - .load::(conn)? - .iter() - .filter_map(|workflow| { - metadata_by_id - .get(&( - workflow.id, - ObjectType::Workflow.sqlite_object_type_as_str().to_string(), - )) - .and_then(|metadata| { - let workflow_content = serde_json::from_str(workflow.data.as_str()).ok(); - let workflow_id = id_from_metadata::(metadata); - let permissions = permissions_by_id.get(&metadata.id)?; - let cloud_object_permissions = - to_cloud_object_permissions(permissions, current_user_id)?; - workflow_content - .zip(workflow_id) - .map(|(content, workflow_id)| { - let boxed: Box = Box::new(CloudWorkflow::new( - workflow_id, - CloudWorkflowModel::new(content), - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - }) - }) - .collect::>(), + workflow_persistence::read_workflows(conn, &read_context)? + .into_iter() + .map(|workflow| Box::new(workflow) as Box), ); - cloud_objects.extend( - schema::notebooks::dsl::notebooks - .load::(conn)? - .iter() - .filter_map(|notebook| { - metadata_by_id - .get(&( - notebook.id, - ObjectType::Notebook.sqlite_object_type_as_str().to_string(), - )) - .and_then(|metadata| { - let notebook_id = id_from_metadata::(metadata); - let permissions = permissions_by_id.get(&metadata.id)?; - let cloud_object_permissions = - to_cloud_object_permissions(permissions, current_user_id)?; - notebook_id.map(|server_id| { - let ai_document_id = - notebook.ai_document_id.as_ref().and_then(|doc_id_str| { - AIDocumentId::try_from(doc_id_str.as_str()).ok() - }); - let boxed: Box = Box::new(CloudNotebook::new( - server_id, - CloudNotebookModel { - title: notebook.title.clone().unwrap_or_default(), - data: notebook.data.clone().unwrap_or_default(), - ai_document_id, - conversation_id: None, - }, - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - }) - }) - .collect::>(), + notebook_persistence::read_notebooks(conn, &read_context)? + .into_iter() + .map(|notebook| Box::new(notebook) as Box), ); - cloud_objects.extend( - schema::folders::dsl::folders - .load::(conn)? - .iter() - .filter_map(|folder| { - metadata_by_id - .get(&( - folder.id, - ObjectType::Folder.sqlite_object_type_as_str().to_string(), - )) - .and_then(|metadata| { - let folder_id = id_from_metadata::(metadata); - let permissions = permissions_by_id.get(&metadata.id)?; - let cloud_object_permissions = - to_cloud_object_permissions(permissions, current_user_id)?; - folder_id.map(|server_id| { - let boxed: Box = Box::new(CloudFolder::new( - server_id, - CloudFolderModel { - name: folder.name.clone(), - is_open: folder.is_open, - is_warp_pack: folder.is_warp_pack, - }, - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - }) - }) - .collect::>(), + folder_persistence::read_folders(conn, &read_context)? + .into_iter() + .map(|folder| Box::new(folder) as Box), ); - cloud_objects.extend( - schema::generic_string_objects::dsl::generic_string_objects - .load::(conn)? - .iter() - .filter_map(|object| { - metadata_by_id - .get(&(object.id, GENERIC_STRING_OBJECT_PREFIX.to_owned())) - .and_then(|metadata| { - let object_id = id_from_metadata::(metadata); - let permissions = permissions_by_id.get(&metadata.id)?; - let cloud_object_permissions = - to_cloud_object_permissions(permissions, current_user_id)?; - let json_object_type: JsonObjectType = metadata - .object_type - .strip_prefix(&format!( - "{GENERIC_STRING_OBJECT_PREFIX}{JSON_OBJECT_PREFIX}" - ))? - .try_into() - .ok()?; - object_id.and_then(|server_id| match json_object_type { - JsonObjectType::Preference => { - let model = CloudPreferenceModel::deserialize_owned(&object.data); - model.ok().map(|model| { - let boxed: Box = - Box::new(CloudPreference::new( - server_id, - model, - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - } - JsonObjectType::EnvVarCollection => { - let model = - CloudEnvVarCollectionModel::deserialize_owned(&object.data); - model.ok().map(|model| { - let boxed: Box = - Box::new(CloudEnvVarCollection::new( - server_id, - model, - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - } - JsonObjectType::WorkflowEnum => { - let model = CloudWorkflowEnumModel::deserialize_owned(&object.data); - model.ok().map(|model| { - let boxed: Box = - Box::new(CloudWorkflowEnum::new( - server_id, - model, - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - } - JsonObjectType::AIFact => { - let model = CloudAIFactModel::deserialize_owned(&object.data); - model.ok().map(|model| { - let boxed: Box = Box::new(CloudAIFact::new( - server_id, - model, - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - } - JsonObjectType::MCPServer => { - let model = CloudMCPServerModel::deserialize_owned(&object.data); - model.ok().map(|model| { - let boxed: Box = - Box::new(CloudMCPServer::new( - server_id, - model, - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - } - JsonObjectType::TemplatableMCPServer => { - let model = - CloudTemplatableMCPServerModel::deserialize_owned(&object.data); - model.ok().map(|model| { - let boxed: Box = - Box::new(CloudTemplatableMCPServer::new( - server_id, - model, - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - } - JsonObjectType::AIExecutionProfile => { - let model = - CloudAIExecutionProfileModel::deserialize_owned(&object.data); - model.ok().map(|model| { - let boxed: Box = - Box::new(CloudAIExecutionProfile::new( - server_id, - model, - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - } - JsonObjectType::CloudEnvironment => { - let model = CloudAmbientAgentEnvironmentModel::deserialize_owned( - &object.data, - ); - model.ok().map(|model| { - let boxed: Box = - Box::new(CloudAmbientAgentEnvironment::new( - server_id, - model, - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - } - JsonObjectType::ScheduledAmbientAgent => { - let model = CloudScheduledAmbientAgentModel::deserialize_owned( - &object.data, - ); - model.ok().map(|model| { - let boxed: Box = - Box::new(CloudScheduledAmbientAgent::new( - server_id, - model, - to_cloud_object_metadata(metadata), - cloud_object_permissions, - )); - boxed - }) - } - // TODO: Implement CloudAgentConfig model when full sync support is added - JsonObjectType::CloudAgentConfig => None, - }) - }) - }) - .collect::>(), + generic_string_persistence::read_generic_string_objects(conn, &read_context)? + .into_iter() + .map(box_persisted_generic_string_object), ); let db_teams: Vec = schema::teams::dsl::teams.load(conn)?; @@ -3276,13 +2696,7 @@ fn read_sqlite_data( running_mcp_servers, }; - // Find the smallest refresh timestamp to pass into CloudModel. - let time_of_next_force_object_refresh: Option> = - schema::cloud_objects_refreshes::dsl::cloud_objects_refreshes - .load_iter::(conn)? - .filter_map(|refresh| refresh.ok()) - .map(|refresh| refresh.time_of_next_refresh.and_utc()) - .min(); + let time_of_next_force_object_refresh = read_time_of_next_force_object_refresh(conn)?; let ai_queries = read_ai_queries(conn)?; @@ -3317,123 +2731,6 @@ fn read_sqlite_data( }) } -fn id_from_metadata(metadata: &ObjectMetadata) -> Option { - match (&metadata.server_id, &metadata.client_id) { - (Some(server_id), _) => { - K::from_hash(server_id).map(|id| SyncId::ServerId(id.to_server_id())) - } - (None, Some(client_id)) => ClientId::from_hash(client_id).map(SyncId::ClientId), - _ => None, - } -} - -fn to_cloud_object_metadata(metadata: &ObjectMetadata) -> CloudObjectMetadata { - CloudObjectMetadata { - current_editor_uid: metadata.current_editor.clone(), - metadata_last_updated_ts: metadata - .metadata_last_updated_ts - .and_then(|epoch| ServerTimestamp::from_unix_timestamp_micros(epoch).ok()), - revision: metadata - .revision_ts - .and_then(|epoch| Revision::from_unix_timestamp_micros(epoch).ok()), - pending_changes_statuses: CloudObjectStatuses { - pending_delete: false, - content_sync_status: if metadata.is_pending { - CloudObjectSyncStatus::InFlight(NumInFlightRequests(1)) - } else { - CloudObjectSyncStatus::NoLocalChanges - }, - has_pending_metadata_change: false, - has_pending_permissions_change: false, - pending_untrash: false, - }, - trashed_ts: metadata - .trashed_ts - .and_then(|epoch| ServerTimestamp::from_unix_timestamp_micros(epoch).ok()), - folder_id: metadata.folder_id.as_ref().and_then(|folder_id_str| { - // First, attempt to convert the string into a server id. - let as_server_id = - FolderId::from_hash(folder_id_str).map(|id| SyncId::ServerId(id.into())); - - // If the string cannot be converted to server id, it may be a client id. - if as_server_id.is_none() { - ClientId::from_hash(folder_id_str).map(SyncId::ClientId) - } else { - as_server_id - } - }), - is_welcome_object: metadata.is_welcome_object, - creator_uid: metadata.creator_uid.clone(), - last_editor_uid: metadata.last_editor_uid.clone(), - last_task_run_ts: None, - } -} - -fn to_cloud_object_permissions( - permissions: &ObjectPermissions, - default_user_id: Option, -) -> Option { - let owner = owner_for_permissions(permissions, default_user_id)?; - let permissions_last_updated_ts = permissions - .permissions_last_updated_at - .and_then(|ts| ServerTimestamp::from_unix_timestamp_micros(ts).ok()); - - let guests = if FeatureFlag::SharedWithMe.is_enabled() { - permissions - .object_guests - .as_deref() - // If deserializing guests fails, default to None and wait for an eventual refresh. - .and_then(|guests| super::cloud_objects::decode_guests(guests).ok()) - .unwrap_or_default() - } else { - Default::default() - }; - - let anyone_with_link = if FeatureFlag::SharedWithMe.is_enabled() { - permissions - .anyone_with_link_access_level - .as_deref() - .and_then(|access_level| { - super::cloud_objects::decode_link_sharing( - access_level, - permissions.anyone_with_link_source.as_deref(), - ) - // If deserializing link sharing fails, default to None and wait for an - // eventual refresh. - .ok() - }) - } else { - None - }; - - Some(CloudObjectPermissions { - owner, - permissions_last_updated_ts, - guests, - anyone_with_link, - }) -} - -fn owner_for_permissions( - permissions: &ObjectPermissions, - default_user_id: Option, -) -> Option { - match permissions.subject_type.as_str() { - "USER" => { - let user_uid = permissions - .subject_id - .as_deref() - .map(UserUid::new) - .or(default_user_id)?; - Some(Owner::User { user_uid }) - } - "TEAM" => Some(Owner::Team { - team_uid: ServerId::from_string_lossy(&permissions.subject_uid), - }), - _ => None, - } -} - impl From for model::NewCommand { fn from(metadata: StartedCommandMetadata) -> Self { Self { @@ -3569,23 +2866,6 @@ fn clear_user_profiles(conn: &mut SqliteConnection) -> Result<(), Error> { }) } -fn record_time_of_next_refresh( - conn: &mut SqliteConnection, - timestamp: DateTime, -) -> Result<(), Error> { - use schema::cloud_objects_refreshes::dsl::*; - let refresh = NewCloudObjectsRefresh { - time_of_next_refresh: timestamp.naive_utc(), - }; - conn.transaction::<(), Error, _>(|conn| { - diesel::delete(cloud_objects_refreshes).execute(conn)?; - diesel::insert_into(cloud_objects_refreshes) - .values(refresh) - .execute(conn)?; - Ok(()) - }) -} - fn upsert_current_user_information( conn: &mut SqliteConnection, user_information: PersistedCurrentUserInformation, @@ -3727,9 +3007,7 @@ fn delete_objects( sync_id, object_id_type, Box::new(|conn, notebook_id| { - use schema::notebooks::dsl::*; - diesel::delete(notebooks.filter(id.eq(notebook_id))).execute(conn)?; - Ok(()) + notebook_persistence::delete_notebook(conn, notebook_id) }), )?, ObjectIdType::Workflow => delete_cloud_object( @@ -3737,31 +3015,20 @@ fn delete_objects( sync_id, object_id_type, Box::new(|conn, workflow_id| { - use schema::workflows::dsl::*; - diesel::delete(workflows.filter(id.eq(workflow_id))).execute(conn)?; - Ok(()) + workflow_persistence::delete_workflow(conn, workflow_id) }), )?, ObjectIdType::Folder => delete_cloud_object( conn, sync_id, object_id_type, - Box::new(|conn, folder_id| { - use schema::folders::dsl::*; - diesel::delete(folders.filter(id.eq(folder_id))).execute(conn)?; - Ok(()) - }), + Box::new(|conn, folder_id| folder_persistence::delete_folder(conn, folder_id)), )?, ObjectIdType::GenericStringObject => delete_cloud_object( conn, sync_id, object_id_type, - Box::new(|conn, gso_id| { - use schema::generic_string_objects::dsl::*; - diesel::delete(generic_string_objects.filter(id.eq(gso_id))) - .execute(conn)?; - Ok(()) - }), + Box::new(delete_generic_string_object), )?, } } diff --git a/app/src/persistence/sqlite_tests.rs b/app/src/persistence/sqlite_tests.rs index 594c1ae5e5..e313fa936c 100644 --- a/app/src/persistence/sqlite_tests.rs +++ b/app/src/persistence/sqlite_tests.rs @@ -544,7 +544,7 @@ fn test_deserialize_corrupted_guests() { }; // The overall permissions should successfully convert, minus the object guests. - let cloud_permissions = super::to_cloud_object_permissions(&db_permissions, None); + let cloud_permissions = to_cloud_object_permissions(&db_permissions, None); assert_eq!( cloud_permissions, Some(CloudObjectPermissions { diff --git a/crates/cloud_object_models/Cargo.toml b/crates/cloud_object_models/Cargo.toml index 5c334a8db1..c60205a7b6 100644 --- a/crates/cloud_object_models/Cargo.toml +++ b/crates/cloud_object_models/Cargo.toml @@ -34,5 +34,10 @@ warp_core.workspace = true warp_graphql.workspace = true warp_util.workspace = true +[target.'cfg(not(target_family = "wasm"))'.dependencies] +cloud_object_persistence.workspace = true +diesel = { workspace = true, features = ["sqlite", "chrono"] } +persistence.workspace = true + [dev-dependencies] cloud_objects = { workspace = true, features = ["test-util"] } diff --git a/crates/cloud_object_models/src/folder.rs b/crates/cloud_object_models/src/folder.rs index 6550ba5a4f..f3f6a409d9 100644 --- a/crates/cloud_object_models/src/folder.rs +++ b/crates/cloud_object_models/src/folder.rs @@ -2,6 +2,8 @@ use cloud_objects::{ cloud_object::{GenericCloudObject, GenericServerObject, ObjectType, ServerObjectModel}, ids::FolderId, }; +#[cfg(not(target_family = "wasm"))] +pub mod persistence; /// The model for a `CloudFolder`. #[derive(Clone, Debug, PartialEq)] diff --git a/crates/cloud_object_models/src/folder/persistence.rs b/crates/cloud_object_models/src/folder/persistence.rs new file mode 100644 index 0000000000..e3cbaf5e17 --- /dev/null +++ b/crates/cloud_object_models/src/folder/persistence.rs @@ -0,0 +1,92 @@ +use cloud_object_persistence::{ + CloudObjectReadContext, id_from_metadata, to_cloud_object_metadata, upsert_cloud_object, +}; +use cloud_objects::{cloud_object::ObjectType, ids::FolderId}; +use diesel::{ + Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SqliteConnection, result::Error, +}; +use persistence::{ + model::{Folder, NewFolder}, + schema, +}; + +use super::{CloudFolder, CloudFolderModel}; + +pub fn upsert_folders( + conn: &mut SqliteConnection, + cloud_folders: Vec, +) -> Result<(), Error> { + use schema::folders::dsl::*; + conn.transaction::<(), Error, _>(|conn| { + for cloud_folder in cloud_folders { + let folder_clone = cloud_folder.clone(); + let folder_name = cloud_folder.model().name.clone(); + let folder_is_open = cloud_folder.model().is_open; + let folder_is_warp_pack = cloud_folder.model().is_warp_pack; + upsert_cloud_object( + conn, + ObjectType::Folder, + cloud_folder.id, + cloud_folder.metadata, + cloud_folder.permissions, + Box::new(move |conn| { + let new_folder = NewFolder { + name: folder_name, + is_open: folder_is_open, + is_warp_pack: folder_is_warp_pack, + }; + diesel::insert_into(schema::folders::dsl::folders) + .values(new_folder) + .execute(conn)?; + let folder_id: i32 = schema::folders::dsl::folders + .select(schema::folders::columns::id) + .order(schema::folders::columns::id.desc()) + .first(conn)?; + Ok(folder_id) + }), + Box::new(move |conn, folder_id| { + diesel::update(folders.filter(schema::folders::dsl::id.eq(folder_id))) + .set(( + name.eq(folder_clone.model().name.clone()), + is_open.eq(folder_clone.model().is_open), + is_warp_pack.eq(folder_clone.model().is_warp_pack), + )) + .execute(conn)?; + Ok(()) + }), + )? + } + Ok(()) + }) +} + +pub fn read_folders( + conn: &mut SqliteConnection, + read_context: &CloudObjectReadContext, +) -> Result, Error> { + Ok(schema::folders::dsl::folders + .load::(conn)? + .into_iter() + .filter_map(|folder| { + let metadata = read_context.metadata_for_object(folder.id, ObjectType::Folder)?; + let folder_id = id_from_metadata::(metadata)?; + let cloud_object_permissions = read_context.permissions_for_metadata(metadata)?; + Some(CloudFolder::new( + folder_id, + CloudFolderModel { + name: folder.name, + is_open: folder.is_open, + is_warp_pack: folder.is_warp_pack, + }, + to_cloud_object_metadata(metadata), + cloud_object_permissions, + )) + }) + .collect()) +} + +pub fn delete_folder(conn: &mut SqliteConnection, folder_id: i32) -> Result<(), Error> { + use schema::folders::dsl::*; + diesel::delete(folders.filter(id.eq(folder_id))).execute(conn)?; + Ok(()) +} diff --git a/crates/cloud_object_models/src/json_model.rs b/crates/cloud_object_models/src/json_model.rs index 2c7ce54214..bf3c91e03d 100644 --- a/crates/cloud_object_models/src/json_model.rs +++ b/crates/cloud_object_models/src/json_model.rs @@ -6,6 +6,8 @@ use cloud_objects::cloud_object::{ }; use serde::{Serialize, de::DeserializeOwned}; +#[cfg(not(target_family = "wasm"))] +pub mod persistence; /// A JSON-backed cloud object payload. pub trait JsonModel: Clone + Debug + Send + Sync + Serialize + DeserializeOwned + 'static { /// Returns the JSON object type used by the generic string object API. diff --git a/crates/cloud_object_models/src/json_model/persistence.rs b/crates/cloud_object_models/src/json_model/persistence.rs new file mode 100644 index 0000000000..f67cf0cb87 --- /dev/null +++ b/crates/cloud_object_models/src/json_model/persistence.rs @@ -0,0 +1,170 @@ +use cloud_object_persistence::{ + CloudObjectReadContext, id_from_metadata, read_generic_string_object_rows, + to_cloud_object_metadata, +}; +use cloud_objects::{ + cloud_object::{ + GENERIC_STRING_OBJECT_PREFIX, GenericStringObjectFormat, JSON_OBJECT_PREFIX, + JsonObjectType, ObjectType, + }, + ids::GenericStringObjectId, +}; +use diesel::{SqliteConnection, result::Error}; + +use crate::{ + CloudAIExecutionProfile, CloudAIExecutionProfileModel, CloudAIFact, CloudAIFactModel, + CloudAmbientAgentEnvironment, CloudAmbientAgentEnvironmentModel, CloudEnvVarCollection, + CloudEnvVarCollectionModel, CloudMCPServer, CloudMCPServerModel, CloudPreference, + CloudPreferenceModel, CloudScheduledAmbientAgent, CloudScheduledAmbientAgentModel, + CloudTemplatableMCPServer, CloudTemplatableMCPServerModel, CloudWorkflowEnum, + CloudWorkflowEnumModel, +}; + +pub enum PersistedGenericStringObject { + Preference(CloudPreference), + EnvVarCollection(CloudEnvVarCollection), + WorkflowEnum(CloudWorkflowEnum), + AIFact(CloudAIFact), + MCPServer(CloudMCPServer), + TemplatableMCPServer(CloudTemplatableMCPServer), + AIExecutionProfile(CloudAIExecutionProfile), + CloudEnvironment(CloudAmbientAgentEnvironment), + ScheduledAmbientAgent(CloudScheduledAmbientAgent), +} + +pub fn read_generic_string_objects( + conn: &mut SqliteConnection, + read_context: &CloudObjectReadContext, +) -> Result, Error> { + Ok(read_generic_string_object_rows(conn)? + .into_iter() + .filter_map(|object| { + let metadata = read_context.metadata_for_object( + object.id, + ObjectType::GenericStringObject(GenericStringObjectFormat::Json( + JsonObjectType::Preference, + )), + )?; + let object_id = id_from_metadata::(metadata)?; + let cloud_object_permissions = read_context.permissions_for_metadata(metadata)?; + let json_object_type: JsonObjectType = metadata + .object_type + .strip_prefix(&format!( + "{GENERIC_STRING_OBJECT_PREFIX}{JSON_OBJECT_PREFIX}" + ))? + .try_into() + .ok()?; + match json_object_type { + JsonObjectType::Preference => { + let model = CloudPreferenceModel::deserialize_owned(&object.data); + model.ok().map(|model| { + PersistedGenericStringObject::Preference(CloudPreference::new( + object_id, + model, + to_cloud_object_metadata(metadata), + cloud_object_permissions, + )) + }) + } + JsonObjectType::EnvVarCollection => { + let model = CloudEnvVarCollectionModel::deserialize_owned(&object.data); + model.ok().map(|model| { + PersistedGenericStringObject::EnvVarCollection(CloudEnvVarCollection::new( + object_id, + model, + to_cloud_object_metadata(metadata), + cloud_object_permissions, + )) + }) + } + JsonObjectType::WorkflowEnum => { + let model = CloudWorkflowEnumModel::deserialize_owned(&object.data); + model.ok().map(|model| { + PersistedGenericStringObject::WorkflowEnum(CloudWorkflowEnum::new( + object_id, + model, + to_cloud_object_metadata(metadata), + cloud_object_permissions, + )) + }) + } + JsonObjectType::AIFact => { + let model = CloudAIFactModel::deserialize_owned(&object.data); + model.ok().map(|model| { + PersistedGenericStringObject::AIFact(CloudAIFact::new( + object_id, + model, + to_cloud_object_metadata(metadata), + cloud_object_permissions, + )) + }) + } + JsonObjectType::MCPServer => { + let model = CloudMCPServerModel::deserialize_owned(&object.data); + model.ok().map(|model| { + PersistedGenericStringObject::MCPServer(CloudMCPServer::new( + object_id, + model, + to_cloud_object_metadata(metadata), + cloud_object_permissions, + )) + }) + } + JsonObjectType::TemplatableMCPServer => { + let model = CloudTemplatableMCPServerModel::deserialize_owned(&object.data); + model.ok().map(|model| { + PersistedGenericStringObject::TemplatableMCPServer( + CloudTemplatableMCPServer::new( + object_id, + model, + to_cloud_object_metadata(metadata), + cloud_object_permissions, + ), + ) + }) + } + JsonObjectType::AIExecutionProfile => { + let model = CloudAIExecutionProfileModel::deserialize_owned(&object.data); + model.ok().map(|model| { + PersistedGenericStringObject::AIExecutionProfile( + CloudAIExecutionProfile::new( + object_id, + model, + to_cloud_object_metadata(metadata), + cloud_object_permissions, + ), + ) + }) + } + JsonObjectType::CloudEnvironment => { + let model = CloudAmbientAgentEnvironmentModel::deserialize_owned(&object.data); + model.ok().map(|model| { + PersistedGenericStringObject::CloudEnvironment( + CloudAmbientAgentEnvironment::new( + object_id, + model, + to_cloud_object_metadata(metadata), + cloud_object_permissions, + ), + ) + }) + } + JsonObjectType::ScheduledAmbientAgent => { + let model = CloudScheduledAmbientAgentModel::deserialize_owned(&object.data); + model.ok().map(|model| { + PersistedGenericStringObject::ScheduledAmbientAgent( + CloudScheduledAmbientAgent::new( + object_id, + model, + to_cloud_object_metadata(metadata), + cloud_object_permissions, + ), + ) + }) + } + // TODO: Implement CloudAgentConfig model when full sync support is added + JsonObjectType::CloudAgentConfig => None, + } + }) + .collect()) +} diff --git a/crates/cloud_object_models/src/lib.rs b/crates/cloud_object_models/src/lib.rs index 106b48c3fb..264ce0688f 100644 --- a/crates/cloud_object_models/src/lib.rs +++ b/crates/cloud_object_models/src/lib.rs @@ -1,3 +1,16 @@ +//! This crate defines the concrete Warp cloud object models and typed cloud object aliases built +//! on top of `cloud_objects`. +//! +//! Each model module should own the model payload for one cloud object family, plus any model-specific +//! adapters that should move with that model during future verticalization. +//! +//! Native SQLite adapters may live under model-local `persistence` modules, while shared persistence +//! infrastructure should stay in `cloud_object_persistence`. + +// Multiple modules contain `persistence` submodules; it is expected that +// code from the persistence modules is imported with fully-qualified paths. +#![allow(ambiguous_glob_reexports)] + pub mod ai_execution_profile; pub mod ai_fact; pub mod cloud_agent_config; diff --git a/crates/cloud_object_models/src/notebook.rs b/crates/cloud_object_models/src/notebook.rs index 1ed3c59d7f..1b2dc5136a 100644 --- a/crates/cloud_object_models/src/notebook.rs +++ b/crates/cloud_object_models/src/notebook.rs @@ -5,6 +5,8 @@ use cloud_objects::{ }; use serde::{Deserialize, Serialize}; +#[cfg(not(target_family = "wasm"))] +pub mod persistence; /// Serialized representation of a notebook for sync queue /// The AIDocumentID and ConversationID are stored here to avoid polluting the /// generic CreateObjectRequest type. diff --git a/crates/cloud_object_models/src/notebook/persistence.rs b/crates/cloud_object_models/src/notebook/persistence.rs new file mode 100644 index 0000000000..a1eda801ee --- /dev/null +++ b/crates/cloud_object_models/src/notebook/persistence.rs @@ -0,0 +1,107 @@ +use ai::document::AIDocumentId; +use cloud_object_persistence::{ + CloudObjectReadContext, id_from_metadata, to_cloud_object_metadata, upsert_cloud_object, +}; +use cloud_objects::cloud_object::ObjectType; +use diesel::{ + Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SqliteConnection, result::Error, +}; +use persistence::{ + model::{NewNotebook, Notebook}, + schema, +}; + +use super::{CloudNotebook, CloudNotebookModel, NotebookId}; + +pub fn upsert_notebooks( + conn: &mut SqliteConnection, + cloud_notebooks: Vec, +) -> Result<(), Error> { + use schema::notebooks::dsl::*; + conn.transaction::<(), Error, _>(|conn| { + for cloud_notebook in cloud_notebooks { + // todo: wrap in an arc to avoid unnecessary cloning. + let notebook_clone = cloud_notebook.clone(); + let title_clone = cloud_notebook.model().title.clone(); + let data_clone = cloud_notebook.model().data.clone(); + let ai_document_id_clone = cloud_notebook + .model() + .ai_document_id + .as_ref() + .map(|doc_id| doc_id.to_string()); + upsert_cloud_object( + conn, + ObjectType::Notebook, + cloud_notebook.id, + cloud_notebook.metadata, + cloud_notebook.permissions, + Box::new(move |conn| { + let new_notebook = NewNotebook { + title: Some(title_clone), + data: Some(data_clone), + ai_document_id: ai_document_id_clone, + }; + diesel::insert_into(schema::notebooks::dsl::notebooks) + .values(new_notebook) + .execute(conn)?; + let notebook_id: i32 = schema::notebooks::dsl::notebooks + .select(schema::notebooks::columns::id) + .order(schema::notebooks::columns::id.desc()) + .first(conn)?; + Ok(notebook_id) + }), + Box::new(move |conn, notebook_id| { + diesel::update(notebooks.filter(schema::notebooks::dsl::id.eq(notebook_id))) + .set(( + title.eq(notebook_clone.model().title.clone()), + data.eq(notebook_clone.model().data.clone()), + ai_document_id.eq(notebook_clone + .model() + .ai_document_id + .as_ref() + .map(|doc_id| doc_id.to_string())), + )) + .execute(conn)?; + Ok(()) + }), + )? + } + Ok(()) + }) +} + +pub fn read_notebooks( + conn: &mut SqliteConnection, + read_context: &CloudObjectReadContext, +) -> Result, Error> { + Ok(schema::notebooks::dsl::notebooks + .load::(conn)? + .into_iter() + .filter_map(|notebook| { + let metadata = read_context.metadata_for_object(notebook.id, ObjectType::Notebook)?; + let notebook_id = id_from_metadata::(metadata)?; + let cloud_object_permissions = read_context.permissions_for_metadata(metadata)?; + let ai_document_id = notebook + .ai_document_id + .as_ref() + .and_then(|doc_id_str| AIDocumentId::try_from(doc_id_str.as_str()).ok()); + Some(CloudNotebook::new( + notebook_id, + CloudNotebookModel { + title: notebook.title.unwrap_or_default(), + data: notebook.data.unwrap_or_default(), + ai_document_id, + conversation_id: None, + }, + to_cloud_object_metadata(metadata), + cloud_object_permissions, + )) + }) + .collect()) +} + +pub fn delete_notebook(conn: &mut SqliteConnection, notebook_id: i32) -> Result<(), Error> { + use schema::notebooks::dsl::*; + diesel::delete(notebooks.filter(id.eq(notebook_id))).execute(conn)?; + Ok(()) +} diff --git a/crates/cloud_object_models/src/workflow.rs b/crates/cloud_object_models/src/workflow.rs index fd5f72cf0b..fda7b38a39 100644 --- a/crates/cloud_object_models/src/workflow.rs +++ b/crates/cloud_object_models/src/workflow.rs @@ -4,6 +4,8 @@ use cloud_objects::{ }; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value; +#[cfg(not(target_family = "wasm"))] +pub mod persistence; /// Workflow model used by Warp and warp-internal. #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] diff --git a/crates/cloud_object_models/src/workflow/persistence.rs b/crates/cloud_object_models/src/workflow/persistence.rs new file mode 100644 index 0000000000..47c0abde71 --- /dev/null +++ b/crates/cloud_object_models/src/workflow/persistence.rs @@ -0,0 +1,86 @@ +use cloud_object_persistence::{ + CloudObjectReadContext, id_from_metadata, to_cloud_object_metadata, upsert_cloud_object, +}; +use cloud_objects::cloud_object::ObjectType; +use diesel::{ + Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SqliteConnection, result::Error, +}; +use persistence::{ + model::{NewWorkflow, Workflow as PersistedWorkflow}, + schema, +}; + +use super::{CloudWorkflow, CloudWorkflowModel, WorkflowId}; + +pub fn upsert_workflows( + conn: &mut SqliteConnection, + cloud_workflows: Vec, +) -> Result<(), Error> { + use schema::workflows::dsl::*; + conn.transaction::<(), Error, _>(|conn| { + for cloud_workflow in cloud_workflows { + let workflow_id = cloud_workflow.id; + if let Ok(serialized_workflow) = serde_json::to_string(&cloud_workflow.model().data) { + // todo: wrap in an arc to avoid unnecessary cloning. + let serialized_workflow_clone = serialized_workflow.clone(); + upsert_cloud_object( + conn, + ObjectType::Workflow, + workflow_id, + cloud_workflow.metadata, + cloud_workflow.permissions, + Box::new(move |conn| { + let workflow = NewWorkflow { + data: serialized_workflow.clone(), + }; + diesel::insert_into(schema::workflows::dsl::workflows) + .values(workflow) + .execute(conn)?; + let workflow_id: i32 = schema::workflows::dsl::workflows + .select(schema::workflows::columns::id) + .order(schema::workflows::columns::id.desc()) + .first(conn)?; + Ok(workflow_id) + }), + Box::new(move |conn, workflow_id| { + diesel::update( + workflows.filter(schema::workflows::dsl::id.eq(workflow_id)), + ) + .set((data.eq(serialized_workflow_clone),)) + .execute(conn)?; + Ok(()) + }), + )? + } + } + Ok(()) + }) +} + +pub fn read_workflows( + conn: &mut SqliteConnection, + read_context: &CloudObjectReadContext, +) -> Result, Error> { + Ok(schema::workflows::dsl::workflows + .load::(conn)? + .into_iter() + .filter_map(|workflow| { + let metadata = read_context.metadata_for_object(workflow.id, ObjectType::Workflow)?; + let workflow_content = serde_json::from_str(workflow.data.as_str()).ok()?; + let workflow_id = id_from_metadata::(metadata)?; + let cloud_object_permissions = read_context.permissions_for_metadata(metadata)?; + Some(CloudWorkflow::new( + workflow_id, + CloudWorkflowModel::new(workflow_content), + to_cloud_object_metadata(metadata), + cloud_object_permissions, + )) + }) + .collect()) +} + +pub fn delete_workflow(conn: &mut SqliteConnection, workflow_id: i32) -> Result<(), Error> { + use schema::workflows::dsl::*; + diesel::delete(workflows.filter(id.eq(workflow_id))).execute(conn)?; + Ok(()) +} diff --git a/crates/cloud_object_persistence/Cargo.toml b/crates/cloud_object_persistence/Cargo.toml new file mode 100644 index 0000000000..6729171a5e --- /dev/null +++ b/crates/cloud_object_persistence/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "cloud_object_persistence" +version = "0.1.0" +edition = "2024" +authors.workspace = true +publish.workspace = true +license.workspace = true + +[dependencies] +anyhow.workspace = true +bincode.workspace = true +chrono.workspace = true +cloud_objects.workspace = true +diesel = { workspace = true, features = ["sqlite", "chrono"] } +log.workspace = true +persistence.workspace = true +serde.workspace = true +warp_core.workspace = true +warp_graphql.workspace = true + +[dev-dependencies] +lazy_static.workspace = true +session-sharing-protocol.workspace = true diff --git a/crates/warp_server_client/src/persistence/cloud_objects.rs b/crates/cloud_object_persistence/src/encoded_permissions.rs similarity index 55% rename from crates/warp_server_client/src/persistence/cloud_objects.rs rename to crates/cloud_object_persistence/src/encoded_permissions.rs index afb6a430d1..d616092935 100644 --- a/crates/warp_server_client/src/persistence/cloud_objects.rs +++ b/crates/cloud_object_persistence/src/encoded_permissions.rs @@ -1,12 +1,12 @@ -//! Supporting types for persisting cloud objects to SQLite. +//! Supporting helpers for persisting cloud-object permissions to SQLite. use anyhow::anyhow; use serde::{Deserialize, Serialize}; -use crate::auth::UserUid; -use crate::cloud_object::{CloudLinkSharing, CloudObjectGuest, ServerObjectContainer}; -use crate::drive::sharing::{SharingAccessLevel, Subject, TeamKind, UserKind}; -use crate::ids::ServerId; +use cloud_objects::auth::UserUid; +use cloud_objects::cloud_object::{CloudLinkSharing, CloudObjectGuest, ServerObjectContainer}; +use cloud_objects::drive::sharing::{SharingAccessLevel, Subject, TeamKind, UserKind}; +use cloud_objects::ids::ServerId; /// Decode a link-sharing setting. pub fn decode_link_sharing( @@ -123,8 +123,88 @@ impl PersistedSubject { Err(anyhow!("Session-sharing teams not supported")) } }, - // Link sharing is persisted separately in the schema. - Subject::AnyoneWithLink(_) => Err(anyhow!("Anyone with the link not supported")), + Subject::AnyoneWithLink(_) => { + // Link sharing is persisted separately in the schema. + Err(anyhow!("Anyone with the link not supported")) + } } } } + +#[cfg(test)] +mod tests { + use cloud_objects::{ + cloud_object::{CloudObjectGuest, ServerObjectContainer}, + drive::sharing::{LinkSharingSubjectType, SharingAccessLevel, Subject, TeamKind, UserKind}, + ids::ServerId, + }; + use lazy_static::lazy_static; + use session_sharing_protocol::common::{InputReplicaId, ProfileData}; + + use super::{decode_guests, encode_guests}; + + #[test] + fn test_roundtrip_guests() { + let guests = vec![ + CloudObjectGuest { + subject: Subject::User(UserKind::Account(cloud_objects::UserUid::new( + "firebase_uid", + ))), + access_level: SharingAccessLevel::Edit, + source: None, + }, + CloudObjectGuest { + subject: Subject::PendingUser { + email: Some("pending@warp.dev".to_string()), + }, + access_level: SharingAccessLevel::View, + source: Some(ServerObjectContainer::Folder { + folder_uid: ServerId::from_string_lossy("1234567890123456789012"), + }), + }, + CloudObjectGuest { + subject: Subject::Team(TeamKind::Team { + team_uid: ServerId::from_string_lossy("abcdefghijklmnopqrstuv"), + }), + access_level: SharingAccessLevel::Edit, + source: None, + }, + ]; + + let encoded = encode_guests(&guests).expect("encode should succeed"); + let decoded = decode_guests(&encoded).expect("decode should succeed"); + + assert_eq!(guests, decoded); + } + + lazy_static! { + /// By construction, [`CloudObjectGuest`] only accepts `'static`-lifetime [`Subject`]s. + /// + /// In most cases, this would prevent persisting a shared session subject, but we work around + /// it here for completeness; + static ref PROFILE_DATA: ProfileData = ProfileData { + firebase_uid: "2YP93GScglXJMdEr2Id12dI7HCG3".to_string(), + display_name: "Some User".to_string(), + photo_url: Some("http://example.com/some-image".to_string()), + email: Some("user@warp.dev".to_string()), + input_replica_id: InputReplicaId::from("some-id".to_string()), + }; + } + + #[test] + fn test_fail_unsupported_subjects() { + let result = encode_guests(&[CloudObjectGuest { + subject: Subject::AnyoneWithLink(LinkSharingSubjectType::Anyone), + access_level: SharingAccessLevel::View, + source: None, + }]); + assert!(result.is_err()); + + let result = encode_guests(&[CloudObjectGuest { + subject: Subject::User(UserKind::SharedSessionParticipant(PROFILE_DATA.clone())), + access_level: SharingAccessLevel::View, + source: None, + }]); + assert!(result.is_err()); + } +} diff --git a/crates/cloud_object_persistence/src/lib.rs b/crates/cloud_object_persistence/src/lib.rs new file mode 100644 index 0000000000..a1e8953a85 --- /dev/null +++ b/crates/cloud_object_persistence/src/lib.rs @@ -0,0 +1,26 @@ +//! This crate defines shared SQLite persistence infrastructure for Warp cloud objects. +//! +//! It owns model-agnostic persistence helpers for object metadata, permissions, refresh +//! scheduling, guest and link-sharing encoding, callback-based object upsert and delete +//! operations, and generic string object table access. +//! +//! It should not depend on `cloud_object_models`; model-specific read and write adapters +//! should live with the corresponding model modules. + +mod encoded_permissions; +mod objects; +mod refresh; + +pub use encoded_permissions::{ + decode_guests, decode_link_sharing, encode_guests, encode_link_sharing, +}; +pub use objects::{ + CloudObjectId, CloudObjectReadContext, CreateCloudObjectFn, DeleteCloudObjectFn, + GenericStringObjectPersistenceData, GenericStringObjectRow, UpdateCloudObjectFn, + delete_cloud_object, delete_generic_string_object, id_from_metadata, increment_retry_count, + load_cloud_object_read_context, mark_object_as_synced, metadata_object_type_key, + read_generic_string_object_rows, to_cloud_object_metadata, to_cloud_object_permissions, + update_object_after_server_creation, update_object_metadata, upsert_cloud_object, + upsert_generic_string_objects, +}; +pub use refresh::{read_time_of_next_force_object_refresh, record_time_of_next_refresh}; diff --git a/crates/cloud_object_persistence/src/objects.rs b/crates/cloud_object_persistence/src/objects.rs new file mode 100644 index 0000000000..9ec4314d6e --- /dev/null +++ b/crates/cloud_object_persistence/src/objects.rs @@ -0,0 +1,666 @@ +use std::collections::HashMap; + +use cloud_objects::{ + UserUid, + cloud_object::{ + CloudObjectMetadata, CloudObjectPermissions, CloudObjectStatuses, CloudObjectSyncStatus, + GENERIC_STRING_OBJECT_PREFIX, GenericStringObjectFormat, NumInFlightRequests, ObjectIdType, + ObjectType, Owner, Revision, RevisionAndLastEditor, ServerCreationInfo, + }, + ids::{ClientId, FolderId, HashableId, SyncId, ToServerId}, +}; +use diesel::{ + Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SqliteConnection, result::Error, +}; +use persistence::{ + model::{ + GenericStringObject as PersistedGenericStringObject, NewGenericStringObject, + NewObjectMetadata, NewObjectPermissions, ObjectMetadata, ObjectPermissions, + }, + schema, +}; +use warp_core::features::FeatureFlag; +use warp_graphql::scalars::time::ServerTimestamp; + +use crate::{decode_guests, decode_link_sharing, encode_guests, encode_link_sharing}; + +/// The SQLite id of a cloud object. +pub type CloudObjectId = i32; + +/// When upserting a cloud object, this callback is used to create the cloud +/// object itself. It returns the id of the created cloud object. +/// Note: the supplied conn has already started a transaction. +pub type CreateCloudObjectFn = + Box Result>; + +/// When upserting a cloud object, this callback is used to update the cloud +/// object. It takes the id of the cloud object to update as a parameter. +/// The supplied conn has already started a transaction. +pub type UpdateCloudObjectFn = + Box Result<(), Error>>; + +/// When delete a cloud object, this callback is used to delete the cloud +/// object. It takes the id of the cloud object to delete as a parameter. +/// The supplied conn has already started a transaction. +pub type DeleteCloudObjectFn = + Box Result<(), Error>>; + +/// Generic string object data prepared for persistence. +pub struct GenericStringObjectPersistenceData { + pub id: SyncId, + pub format: GenericStringObjectFormat, + pub metadata: CloudObjectMetadata, + pub permissions: CloudObjectPermissions, + pub data: String, +} + +/// A generic string object row loaded from SQLite. +pub struct GenericStringObjectRow { + pub id: CloudObjectId, + pub data: String, +} + +/// Cloud-object metadata and permissions loaded from SQLite for reconstructing typed objects. +pub struct CloudObjectReadContext { + metadata_by_id: HashMap<(CloudObjectId, String), ObjectMetadata>, + permissions_by_id: HashMap, + current_user_id: Option, +} + +impl CloudObjectReadContext { + pub fn metadata_for_object( + &self, + shareable_object_id: CloudObjectId, + object_type: ObjectType, + ) -> Option<&ObjectMetadata> { + self.metadata_by_id + .get(&(shareable_object_id, metadata_object_type_key(object_type))) + } + + pub fn permissions_for_metadata( + &self, + metadata: &ObjectMetadata, + ) -> Option { + let permissions = self.permissions_by_id.get(&metadata.id)?; + to_cloud_object_permissions(permissions, self.current_user_id) + } +} + +pub fn load_cloud_object_read_context( + conn: &mut SqliteConnection, + current_user_id: Option, +) -> Result { + let object_metadata = + schema::object_metadata::dsl::object_metadata.load::(conn)?; + let object_permissions = + schema::object_permissions::dsl::object_permissions.load::(conn)?; + + // Cache metadata and permissions by id so that we aren't doing an n^2 lookups for each object type. + let metadata_by_id = object_metadata + .into_iter() + .map(|metadata| { + ( + (metadata.shareable_object_id, metadata_key(&metadata)), + metadata, + ) + }) + .collect::>(); + // Shareable object ids aren't unique across object types, so the object type needs to be + // part of the hashmap key. For generic objects, they are all in the same table, + // so it's safe to use the generic prefix as part of the key. + let permissions_by_id = object_permissions + .into_iter() + .map(|permissions| (permissions.object_metadata_id, permissions)) + .collect::>(); + + Ok(CloudObjectReadContext { + metadata_by_id, + permissions_by_id, + current_user_id, + }) +} + +pub fn metadata_object_type_key(object_type: ObjectType) -> String { + match object_type { + ObjectType::GenericStringObject(_) => GENERIC_STRING_OBJECT_PREFIX.to_owned(), + ObjectType::Notebook | ObjectType::Workflow | ObjectType::Folder => { + object_type.sqlite_object_type_as_str().to_string() + } + } +} + +fn metadata_key(metadata: &ObjectMetadata) -> String { + if metadata + .object_type + .starts_with(GENERIC_STRING_OBJECT_PREFIX) + { + GENERIC_STRING_OBJECT_PREFIX.to_owned() + } else { + metadata.object_type.to_owned() + } +} + +pub fn upsert_cloud_object( + conn: &mut SqliteConnection, + cloud_object_type: ObjectType, + sync_id: SyncId, + cloud_object_metadata: CloudObjectMetadata, + cloud_object_permissions: CloudObjectPermissions, + create_object_fn: CreateCloudObjectFn, + update_object_fn: UpdateCloudObjectFn, +) -> Result<(), Error> { + use schema::object_metadata::dsl::{ + client_id, current_editor, folder_id, is_pending, last_editor_uid, + metadata_last_updated_ts, object_metadata, revision_ts, server_id, trashed_ts, + }; + use schema::object_permissions::dsl::{ + anyone_with_link_access_level, anyone_with_link_source, object_guests, object_metadata_id, + object_permissions, permissions_last_updated_at, subject_id, subject_type, subject_uid, + }; + + let (subject_type_value, subject_id_value, subject_uid_value) = + match cloud_object_permissions.owner { + Owner::User { user_uid } => ("USER", Some(user_uid.to_string()), user_uid.to_string()), + Owner::Team { team_uid } => ("TEAM", None, team_uid.to_string()), + }; + let permissions_ts = cloud_object_permissions + .permissions_last_updated_ts + .map(|ts| ts.timestamp_micros()); + let guests = if FeatureFlag::SharedWithMe.is_enabled() { + match encode_guests(&cloud_object_permissions.guests) { + Ok(guests) => Some(guests), + Err(err) => { + log::warn!("Unable to encode guests: {err:#}"); + None + } + } + } else { + None + }; + let (anyone_with_link_access_level_value, anyone_with_link_source_value) = + if FeatureFlag::SharedWithMe.is_enabled() { + match cloud_object_permissions + .anyone_with_link + .as_ref() + .map(encode_link_sharing) + { + Some(Ok((access_level, source))) => (Some(access_level), source), + Some(Err(err)) => { + log::warn!("Unable to encode link-sharing setting: {err:#}"); + (None, None) + } + None => (None, None), + } + } else { + (None, None) + }; + + let revision = cloud_object_metadata + .revision + .as_ref() + .map(|r| r.timestamp_micros()); + let has_pending_content_changes = cloud_object_metadata.has_pending_content_changes(); + + let hashed_sync_id = sync_id.sqlite_uid_hash(cloud_object_type.into()); + // Filter to find metadata row. + // The diesel types for `filter`s are dependent on the columns being filtered + // so while the `hashed_sync_id` will only match one of `client_id` and `server_id`, + // we filter on both here for ergonomics. + let metadata_filter = object_metadata + .filter(client_id.eq(Some(hashed_sync_id.as_str()))) + .or_filter(server_id.eq(Some(hashed_sync_id.as_str()))); + let metadata: Option = metadata_filter.first(conn).ok(); + + match metadata { + Some(metadata) => { + // The object already exists in sqlite so update the object. + update_object_fn(conn, metadata.shareable_object_id)?; + + let metadata_last_updated_at = cloud_object_metadata + .metadata_last_updated_ts + .map(|ts| ts.timestamp_micros()); + let trashed_timestamp = cloud_object_metadata + .trashed_ts + .map(|ts| ts.timestamp_micros()); + let folder_id_str = cloud_object_metadata + .folder_id + .map(|folder_sync_id| folder_sync_id.sqlite_uid_hash(ObjectIdType::Folder)); + + // Update the metadata. Note: this is holistic write of all the metadata based on the current state of the in-memory object. + // TODO: we need to update author_id as well. + diesel::update(metadata_filter) + .set(( + revision_ts.eq(revision), + is_pending.eq(has_pending_content_changes), + last_editor_uid.eq(cloud_object_metadata.last_editor_uid), + )) + .execute(conn)?; + + if !cloud_object_metadata + .pending_changes_statuses + .has_pending_metadata_change + { + diesel::update(metadata_filter) + .set(( + metadata_last_updated_ts.eq(metadata_last_updated_at), + trashed_ts.eq(trashed_timestamp), + folder_id.eq(folder_id_str), + current_editor.eq(cloud_object_metadata.current_editor_uid), + )) + .execute(conn)?; + } + + if !cloud_object_metadata + .pending_changes_statuses + .has_pending_permissions_change + { + // Update the permissions. + let permissions_filter = + object_permissions.filter(object_metadata_id.eq(metadata.id)); + diesel::update(permissions_filter) + .set(( + subject_type.eq(subject_type_value), + subject_id.eq(subject_id_value), + subject_uid.eq(subject_uid_value), + permissions_last_updated_at.eq(permissions_ts), + object_guests.eq(guests), + anyone_with_link_access_level.eq(anyone_with_link_access_level_value), + anyone_with_link_source.eq(anyone_with_link_source_value), + )) + .execute(conn)?; + } + } + None => { + // The object doesn't exist in sqlite so create the object. + let object_id = create_object_fn(conn)?; + // Create the metadata. + let mut new_object_metadata = NewObjectMetadata { + object_type: cloud_object_type.sqlite_object_type_as_str().to_string(), + revision_ts: revision, + shareable_object_id: object_id, + is_pending: has_pending_content_changes, + retry_count: 0, + // TODO: we need to deserialize this from graphql. + author_id: None, + // One of these is set below. + client_id: None, + server_id: None, + metadata_last_updated_ts: cloud_object_metadata + .metadata_last_updated_ts + .map(|ts| ts.timestamp_micros()), + trashed_ts: cloud_object_metadata + .trashed_ts + .map(|ts| ts.timestamp_micros()), + folder_id: cloud_object_metadata + .folder_id + .map(|sync_id| sync_id.sqlite_uid_hash(ObjectIdType::Folder)), + // When we insert an object, mark whether it's a welcome object. This + // field won't ever be updated and this is the only pathway for it to be set. + is_welcome_object: cloud_object_metadata.is_welcome_object, + creator_uid: cloud_object_metadata.creator_uid, + last_editor_uid: cloud_object_metadata.last_editor_uid, + current_editor: cloud_object_metadata.current_editor_uid, + }; + + // There are two distinct cases: + // - If the client created this object, the clientId will be set. There is another model event to set the server id. + // - Otherwise, the server notified the client about this object so only the serverId will be set. + match sync_id { + SyncId::ClientId(_) => { + new_object_metadata.client_id = Some(hashed_sync_id); + } + SyncId::ServerId(_) => { + new_object_metadata.server_id = Some(hashed_sync_id); + } + } + diesel::insert_into(schema::object_metadata::dsl::object_metadata) + .values(new_object_metadata) + .execute(conn)?; + + // Retrieve the ID of the row that was just inserted. We need to + // do it this way because sqlite doesn't support RETURNING. + let metadata_id: i32 = schema::object_metadata::dsl::object_metadata + .select(schema::object_metadata::dsl::id) + .order(schema::object_metadata::dsl::id.desc()) + .first(conn)?; + + // Create the permissions. + let new_object_permissions = NewObjectPermissions { + object_metadata_id: metadata_id, + subject_type: subject_type_value.to_owned(), + subject_id: subject_id_value, + subject_uid: subject_uid_value, + permissions_last_updated_at: permissions_ts, + object_guests: guests, + anyone_with_link_access_level: anyone_with_link_access_level_value, + anyone_with_link_source: anyone_with_link_source_value, + }; + diesel::insert_into(schema::object_permissions::dsl::object_permissions) + .values(new_object_permissions) + .execute(conn)?; + } + } + + Ok(()) +} + +/// Helper function to delete a cloud object identified by `sync_id`. If a valid object metadata row +/// for the object is found, `delete_object_fn` is called to delete the actual object. +pub fn delete_cloud_object( + conn: &mut SqliteConnection, + sync_id: SyncId, + object_id_type: ObjectIdType, + delete_object_fn: DeleteCloudObjectFn, +) -> Result<(), Error> { + use schema::object_metadata::dsl::*; + + let hashed_sync_id = sync_id.sqlite_uid_hash(object_id_type); + // Filter to find metadata row. + // The diesel types for `filter`s are dependent on the columns being filtered + // so while the `hashed_sync_id` will only match one of `client_id` and `server_id`, + // we filter on both here for ergonomics. + let metadata_filter = object_metadata + .filter(client_id.eq(Some(hashed_sync_id.as_str()))) + .or_filter(server_id.eq(Some(hashed_sync_id.as_str()))); + + let metadata: ObjectMetadata = metadata_filter.first(conn)?; + let object_id = metadata.shareable_object_id; + diesel::delete(object_metadata.filter(id.eq(metadata.id))).execute(conn)?; + diesel::delete( + schema::object_permissions::dsl::object_permissions + .filter(schema::object_permissions::object_metadata_id.eq(metadata.id)), + ) + .execute(conn)?; + diesel::delete( + schema::object_actions::dsl::object_actions + .filter(schema::object_actions::hashed_object_id.eq(hashed_sync_id)), + ) + .execute(conn)?; + delete_object_fn(conn, object_id)?; + Ok(()) +} + +pub fn upsert_generic_string_objects( + conn: &mut SqliteConnection, + cloud_generic_string_objects: Vec, +) -> Result<(), Error> { + use schema::generic_string_objects::dsl::*; + conn.transaction::<(), Error, _>(|conn| { + for object in cloud_generic_string_objects { + let create_data = object.data.clone(); + let update_data = object.data; + upsert_cloud_object( + conn, + ObjectType::GenericStringObject(object.format), + object.id, + object.metadata, + object.permissions, + Box::new(move |conn| { + let new_object = NewGenericStringObject { data: &create_data }; + diesel::insert_into( + schema::generic_string_objects::dsl::generic_string_objects, + ) + .values(new_object) + .execute(conn)?; + let object_id: i32 = + schema::generic_string_objects::dsl::generic_string_objects + .select(schema::generic_string_objects::columns::id) + .order(schema::generic_string_objects::columns::id.desc()) + .first(conn)?; + Ok(object_id) + }), + Box::new(move |conn, object_id| { + diesel::update( + generic_string_objects + .filter(schema::generic_string_objects::dsl::id.eq(object_id)), + ) + .set((data.eq(update_data),)) + .execute(conn)?; + Ok(()) + }), + )? + } + Ok(()) + }) +} + +pub fn read_generic_string_object_rows( + conn: &mut SqliteConnection, +) -> Result, Error> { + Ok(schema::generic_string_objects::dsl::generic_string_objects + .load::(conn)? + .into_iter() + .map(|object| GenericStringObjectRow { + id: object.id, + data: object.data, + }) + .collect()) +} + +pub fn delete_generic_string_object( + conn: &mut SqliteConnection, + generic_string_object_id: CloudObjectId, +) -> Result<(), Error> { + use schema::generic_string_objects::dsl::*; + diesel::delete(generic_string_objects.filter(id.eq(generic_string_object_id))).execute(conn)?; + Ok(()) +} + +/// Mark a shareable object as no longer having pending changes. +pub fn mark_object_as_synced( + conn: &mut SqliteConnection, + hashed_sqlite_id: String, + new_revision_and_editor: RevisionAndLastEditor, + new_metadata_ts: Option, +) -> Result<(), Error> { + use schema::object_metadata::dsl::*; + conn.transaction::<(), Error, _>(|conn| { + diesel::update(object_metadata.filter(server_id.eq(Some(hashed_sqlite_id.as_str())))) + .set(is_pending.eq(false)) + .execute(conn)?; + diesel::update(object_metadata.filter(server_id.eq(Some(hashed_sqlite_id.clone())))) + .set(( + revision_ts.eq(new_revision_and_editor.revision.timestamp_micros()), + last_editor_uid.eq(new_revision_and_editor.last_editor_uid), + )) + .execute(conn)?; + + if let Some(metadata_ts) = new_metadata_ts { + diesel::update(object_metadata.filter(server_id.eq(Some(hashed_sqlite_id)))) + .set((metadata_last_updated_ts.eq(metadata_ts.timestamp_micros()),)) + .execute(conn)?; + } + Ok(()) + }) +} + +pub fn increment_retry_count( + conn: &mut SqliteConnection, + server_id_string: String, +) -> Result<(), Error> { + use schema::object_metadata::dsl::*; + conn.transaction::<(), Error, _>(|conn| { + diesel::update(object_metadata.filter(server_id.eq(Some(server_id_string)))) + .set(retry_count.eq(retry_count + 1)) + .execute(conn)?; + Ok(()) + }) +} + +pub fn update_object_after_server_creation( + conn: &mut SqliteConnection, + client_id_string: String, + server_creation_info: ServerCreationInfo, +) -> Result<(), Error> { + use schema::commands::dsl::*; + use schema::object_metadata::dsl::*; + + conn.transaction::<(), Error, _>(|conn| { + diesel::update(object_metadata.filter(client_id.eq(Some(client_id_string.clone())))) + .set(( + server_id.eq(Some( + server_creation_info + .server_id_and_type + .sqlite_type_and_uid_hash(), + )), + creator_uid.eq(server_creation_info.creator_uid), + )) + .execute(conn)?; + + diesel::update(commands.filter(cloud_workflow_id.eq(Some(client_id_string)))) + .set( + cloud_workflow_id.eq(Some( + server_creation_info + .server_id_and_type + .sqlite_type_and_uid_hash(), + )), + ) + .execute(conn)?; + + Ok(()) + }) +} + +/// SQLite endpoint for the ObjectMetadataUpdated RTC message that updates the metadata ts and other +/// metadata like current team_id of the object. +pub fn update_object_metadata( + conn: &mut SqliteConnection, + hashed_id: String, + metadata: CloudObjectMetadata, +) -> Result<(), Error> { + use schema::object_metadata::dsl::*; + let metadata_last_updated_at = metadata + .metadata_last_updated_ts + .map(|ts| ts.timestamp_micros()); + + let trashed_timestamp = metadata.trashed_ts.map(|ts| ts.timestamp_micros()); + let folder_id_str = metadata + .folder_id + .map(|folder_sync_id| folder_sync_id.sqlite_uid_hash(ObjectIdType::Folder)); + + conn.transaction::<(), Error, _>(|conn| { + diesel::update(object_metadata.filter(server_id.eq(Some(hashed_id.as_str())))) + .set(( + metadata_last_updated_ts.eq(metadata_last_updated_at), + trashed_ts.eq(trashed_timestamp), + folder_id.eq(folder_id_str), + current_editor.eq(metadata.current_editor_uid), + )) + .execute(conn)?; + + Ok(()) + }) +} + +pub fn id_from_metadata(metadata: &ObjectMetadata) -> Option { + match (&metadata.server_id, &metadata.client_id) { + (Some(server_id), _) => { + K::from_hash(server_id).map(|id| SyncId::ServerId(id.to_server_id())) + } + (None, Some(client_id)) => ClientId::from_hash(client_id).map(SyncId::ClientId), + _ => None, + } +} + +pub fn to_cloud_object_metadata(metadata: &ObjectMetadata) -> CloudObjectMetadata { + CloudObjectMetadata { + current_editor_uid: metadata.current_editor.clone(), + metadata_last_updated_ts: metadata + .metadata_last_updated_ts + .and_then(|epoch| ServerTimestamp::from_unix_timestamp_micros(epoch).ok()), + revision: metadata + .revision_ts + .and_then(|epoch| Revision::from_unix_timestamp_micros(epoch).ok()), + pending_changes_statuses: CloudObjectStatuses { + pending_delete: false, + content_sync_status: if metadata.is_pending { + CloudObjectSyncStatus::InFlight(NumInFlightRequests(1)) + } else { + CloudObjectSyncStatus::NoLocalChanges + }, + has_pending_metadata_change: false, + has_pending_permissions_change: false, + pending_untrash: false, + }, + trashed_ts: metadata + .trashed_ts + .and_then(|epoch| ServerTimestamp::from_unix_timestamp_micros(epoch).ok()), + folder_id: metadata.folder_id.as_ref().and_then(|folder_id_str| { + // First, attempt to convert the string into a server id. + let as_server_id = + FolderId::from_hash(folder_id_str).map(|id| SyncId::ServerId(id.into())); + if as_server_id.is_none() { + // If the string cannot be converted to server id, it may be a client id. + ClientId::from_hash(folder_id_str).map(SyncId::ClientId) + } else { + as_server_id + } + }), + is_welcome_object: metadata.is_welcome_object, + creator_uid: metadata.creator_uid.clone(), + last_editor_uid: metadata.last_editor_uid.clone(), + last_task_run_ts: None, + } +} + +pub fn to_cloud_object_permissions( + permissions: &ObjectPermissions, + default_user_id: Option, +) -> Option { + let owner = owner_for_permissions(permissions, default_user_id)?; + let permissions_last_updated_ts = permissions + .permissions_last_updated_at + .and_then(|ts| ServerTimestamp::from_unix_timestamp_micros(ts).ok()); + + // If deserializing guests fails, default to None and wait for an eventual refresh. + let guests = if FeatureFlag::SharedWithMe.is_enabled() { + permissions + .object_guests + .as_deref() + .and_then(|guests| decode_guests(guests).ok()) + .unwrap_or_default() + } else { + Default::default() + }; + + // If deserializing link sharing fails, default to None and wait for an + // eventual refresh. + let anyone_with_link = if FeatureFlag::SharedWithMe.is_enabled() { + permissions + .anyone_with_link_access_level + .as_deref() + .and_then(|access_level| { + decode_link_sharing(access_level, permissions.anyone_with_link_source.as_deref()) + .ok() + }) + } else { + None + }; + + Some(CloudObjectPermissions { + owner, + permissions_last_updated_ts, + guests, + anyone_with_link, + }) +} + +fn owner_for_permissions( + permissions: &ObjectPermissions, + default_user_id: Option, +) -> Option { + match permissions.subject_type.as_str() { + "USER" => { + let user_uid = permissions + .subject_id + .as_deref() + .map(UserUid::new) + .or(default_user_id)?; + Some(Owner::User { user_uid }) + } + "TEAM" => Some(Owner::Team { + team_uid: cloud_objects::ids::ServerId::from_string_lossy(&permissions.subject_uid), + }), + _ => None, + } +} diff --git a/crates/cloud_object_persistence/src/refresh.rs b/crates/cloud_object_persistence/src/refresh.rs new file mode 100644 index 0000000000..01208f8e12 --- /dev/null +++ b/crates/cloud_object_persistence/src/refresh.rs @@ -0,0 +1,33 @@ +use chrono::{DateTime, NaiveDateTime, Utc}; +use diesel::{Connection, QueryDsl, RunQueryDsl, SqliteConnection, result::Error}; +use persistence::{model::NewCloudObjectsRefresh, schema}; + +pub fn record_time_of_next_refresh( + conn: &mut SqliteConnection, + timestamp: DateTime, +) -> Result<(), Error> { + use schema::cloud_objects_refreshes::dsl::*; + let refresh = NewCloudObjectsRefresh { + time_of_next_refresh: timestamp.naive_utc(), + }; + conn.transaction::<(), Error, _>(|conn| { + diesel::delete(cloud_objects_refreshes).execute(conn)?; + diesel::insert_into(cloud_objects_refreshes) + .values(refresh) + .execute(conn)?; + Ok(()) + }) +} + +pub fn read_time_of_next_force_object_refresh( + conn: &mut SqliteConnection, +) -> Result>, Error> { + use schema::cloud_objects_refreshes::dsl::*; + // Find the smallest refresh timestamp to pass into CloudModel. + Ok(cloud_objects_refreshes + .select(time_of_next_refresh) + .load::(conn)? + .into_iter() + .map(|refresh| refresh.and_utc()) + .min()) +} diff --git a/crates/cloud_objects/src/lib.rs b/crates/cloud_objects/src/lib.rs index 9b855f63b8..0d425cdf4b 100644 --- a/crates/cloud_objects/src/lib.rs +++ b/crates/cloud_objects/src/lib.rs @@ -1,3 +1,11 @@ +//! This crate defines the low-level, model-agnostic cloud object substrate shared by Warp crates. +//! +//! It owns server-facing identifiers, user identifiers, object metadata, object type and format +//! definitions, and sharing or drive primitives that do not depend on concrete Warp object models. +//! +//! It should remain independent of model-specific payloads, SQLite persistence, app runtime state, +//! and UI rendering concerns. + pub mod auth; pub mod cloud_object; pub mod drive; diff --git a/crates/warp_server_client/Cargo.toml b/crates/warp_server_client/Cargo.toml index 5dc2932525..d1e43db0a9 100644 --- a/crates/warp_server_client/Cargo.toml +++ b/crates/warp_server_client/Cargo.toml @@ -11,7 +11,6 @@ test-util = ["cloud_object_client/test-util", "cloud_objects/test-util"] [dependencies] anyhow.workspace = true -bincode.workspace = true chrono.workspace = true cloud_object_client.workspace = true cloud_object_models.workspace = true @@ -22,7 +21,6 @@ itertools.workspace = true lasso.workspace = true log.workspace = true pathfinder_geometry.workspace = true -persistence.workspace = true schemars = { version = "1", features = ["uuid1"] } serde.workspace = true settings_value.workspace = true @@ -32,6 +30,3 @@ uuid.workspace = true warp_core.workspace = true warp_graphql.workspace = true warpui_core.workspace = true - -[target.'cfg(not(target_family = "wasm"))'.dependencies] -diesel = { workspace = true, features = ["sqlite", "chrono"] } diff --git a/crates/warp_server_client/src/lib.rs b/crates/warp_server_client/src/lib.rs index c01744ff9b..3d8c9d847a 100644 --- a/crates/warp_server_client/src/lib.rs +++ b/crates/warp_server_client/src/lib.rs @@ -2,8 +2,6 @@ pub mod auth; pub mod cloud_object; pub mod drive; pub mod ids; -#[cfg(not(target_family = "wasm"))] -pub mod persistence; pub use auth::UserUid; pub use cloud_objects::server_id_traits;