diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a39a413..f835655 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -1,36 +1,84 @@ -# This is a basic workflow to help you get started with Actions - -name: CI - -# Controls when the action will run. Triggers the workflow on push or pull request -# events but only for the master branch -on: - push: - branches: [ master ] - tags: - - v* - pull_request: - branches: [ master ] - -# A workflow run is made up of one or more jobs that can run sequentially or in parallel -jobs: - # This workflow contains a single job called "build" - publish: - # The type of runner that the job will run on - runs-on: ubuntu-latest - - # Steps represent a sequence of tasks that will be executed as part of the job - steps: - # Runs a single command using the runners shell - - uses: actions/checkout@v2 - with: - fetch-depth: 0 - - name: Build And Push Influunt - uses: docker/build-push-action@v1.1.0 - with: - repository: eluki/influunt - username: ${{ secrets.DOCKER_USERNAME }} - password: ${{ secrets.DOCKER_PASSWORD }} - target: influunt - tag_with_ref: true - push: ${{ github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/tags/') }} +# This is a basic workflow to help you get started with Actions + +name: CI + +# Controls when the action will run. Triggers the workflow on push or pull request +# events but only for the master branch +on: + push: + branches: [ master ] + tags: + - v* + pull_request: + branches: [ master ] + +# A workflow run is made up of one or more jobs that can run sequentially or in parallel +jobs: + publish-webhost: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Docker meta + id: meta + uses: docker/metadata-action@v5 + with: + # list of Docker images to use as base name for tags + images: eluki/influunt + # generate Docker tags based on the following events/attributes + tags: | + type=semver,pattern={{version}} + type=raw,value=latest,enable={{is_default_branch}} + type=raw,value=main,enable={{is_default_branch}} + - name: Login to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Build and push Influunt Host + uses: docker/build-push-action@v6 + with: + context: . + target: influunt + push: ${{ github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/') }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + publish-worker: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Docker meta + id: meta + uses: docker/metadata-action@v5 + with: + # list of Docker images to use as base name for tags + images: eluki/influunt-worker + # generate Docker tags based on the following events/attributes + tags: | + type=semver,pattern={{version}} + type=raw,value=latest,enable={{is_default_branch}} + type=raw,value=main,enable={{is_default_branch}} + - name: Login to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Build and push Influunt Worker + uses: docker/build-push-action@v6 + with: + context: . + target: influunt + push: ${{ github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/') }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index d88866c..3ce1833 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -20,6 +20,20 @@ "sourceFileMap": { "/Views": "${workspaceFolder}/src/Influunt.Host/Views" } + }, + { + "name": "Influunt.Worker", + "type": "coreclr", + "request": "launch", + "preLaunchTask": "build-Influunt.Worker", + "program": "${workspaceFolder}/src/Influunt.Worker/bin/Debug/net8/Influunt.Worker.dll", + "args": [], + "env": { + "DOTNET_ENVIRONMENT": "Development" + }, + "cwd": "${workspaceFolder}/src/Influunt.Worker", + "stopAtEntry": false, + "console": "internalConsole" } ] } \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 3b1c986..f107345 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -1,17 +1,29 @@ -{ - "version": "2.0.0", - "tasks": [ - { - "label": "build-Influunt.Host", - "command": "dotnet", - "type": "process", - "args": [ - "build", - "${workspaceFolder}/src/Influunt.Host/Influunt.Host.csproj", - "/property:GenerateFullPaths=true", - "/consoleloggerparameters:NoSummary" - ], - "problemMatcher": "$msCompile" - } - ] +{ + "version": "2.0.0", + "tasks": [ + { + "label": "build-Influunt.Host", + "command": "dotnet", + "type": "process", + "args": [ + "build", + "${workspaceFolder}/src/Influunt.Host/Influunt.Host.csproj", + "/property:GenerateFullPaths=true", + "/consoleloggerparameters:NoSummary" + ], + "problemMatcher": "$msCompile" + }, + { + "label": "build-Influunt.Worker", + "command": "dotnet", + "type": "process", + "args": [ + "build", + "${workspaceFolder}/src/Influunt.Worker/Influunt.Worker.csproj", + "/property:GenerateFullPaths=true", + "/consoleloggerparameters:NoSummary" + ], + "problemMatcher": "$msCompile" + } + ] } \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..3181299 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,62 @@ +# Influunt Developer Guide + +## Project Overview +Influunt is a .NET 8 + Vue.js RSS aggregator with MongoDB storage and Redis caching. + +## Key Technologies +- Backend: .NET 8, ASP.NET Core +- Frontend: Vue.js 2.x with Vue CLI 3 +- Database: MongoDB +- Caching: Redis +- Authentication: Google OAuth2 and guest login + +## Project Structure +``` +src/ +├── Influunt.Host/ # Main .NET backend with Vue frontend +│ ├── ClientApp/ # Vue.js frontend +│ └── appsettings.json # Configuration +└── Influunt.Feed/ # Feed processing logic +└── Influunt.Storage/ # Data access layer +``` + +## Development Commands +### Backend (.NET) +- Build: `dotnet build` +- Run: `dotnet run` (from src/Influunt.Host/) +- Test: `dotnet test` (from tests/ directory) + +### Frontend (Vue.js) +- Install dependencies: `npm install` (in src/Influunt.Host/ClientApp/) +- Development server: `npm run serve` +- Production build: `npm run build` +- Lint: `npm run lint` + +## Important Notes +1. **Environment Setup**: + - Requires .NET 8 SDK + - Requires Node.js 16+ (due to legacy provider flag in package.json) + - Requires MongoDB and Redis instances + +2. **Configuration**: + - All settings in `appsettings.json` or environment variables + - Google Auth requires ClientId/ClientSecret + - Connection strings for MongoDB and Redis + +3. **Build Process**: + - .NET builds automatically restore npm packages for the frontend + - Vue build requires `NODE_OPTIONS=--openssl-legacy-provider` environment variable + +4. **Testing**: + - Tests are in the `tests/` directory + - Run with `dotnet test` from the test project directory + +5. **Docker Deployment**: + - Use provided docker-compose or docker run commands in README + - Requires MongoDB and Redis connection strings + - Google Auth credentials for authentication + +6. **Architecture**: + - Uses VueCliMiddleware for frontend integration + - Separated concerns: Host (web), Feed (processing), Storage (data) + - Background feed crawler enabled by default \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 515a154..a7235c9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,6 +17,7 @@ RUN sed -i -e "s/0-develop<\/Version>/$(cat version | cut -c2- dotnet restore -s https://api.nuget.org/v3/index.json &&\ dotnet build --no-restore -c Release &&\ dotnet publish ./src/Influunt.Host/Influunt.Host.csproj -c Release -o /app --no-build &&\ + dotnet publish ./src/Influunt.Worker/Influunt.Worker.csproj -c Release -o /worker --no-build &&\ dotnet nuget locals http-cache --clear &&\ dotnet nuget locals temp --clear @@ -28,3 +29,9 @@ WORKDIR /influunt EXPOSE 80 ENV ASPNETCORE_URLS=http://*:80 ENTRYPOINT ["dotnet", "Influunt.Host.dll"] + +######## Influunt worker +FROM mcr.microsoft.com/dotnet/aspnet:8.0-bookworm-slim as worker +COPY --from=build /worker /influunt +WORKDIR /influunt +ENTRYPOINT ["dotnet", "Influunt.Worker.dll"] diff --git a/Influunt.sln b/Influunt.sln index b1b0565..5f8f337 100644 --- a/Influunt.sln +++ b/Influunt.sln @@ -11,32 +11,138 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Influunt.Storage", "src\Inf EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Influunt.Feed.Rss", "src\Influunt.Feed.Rss\Influunt.Feed.Rss.csproj", "{C8997ACE-506F-470F-8BD5-EC955BE522E0}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{827E0CD3-B72D-47B6-A68D-7590B98EB39B}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Influunt.Scheduler", "src\Influunt.Scheduler\Influunt.Scheduler.csproj", "{44AC6B8E-C694-48E7-95B6-5481EB122559}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Influunt.Scheduler.Abstractions", "src\Influunt.Scheduler.Abstractions\Influunt.Scheduler.Abstractions.csproj", "{D065CA50-272A-4E29-8D75-3A373EBCB98B}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{0AB3BF05-4346-4AA6-1389-037BE0695223}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Influunt.Scheduler.Tests", "tests\Influunt.Scheduler.Tests\Influunt.Scheduler.Tests.csproj", "{E9EA202F-F5BF-446B-A7BB-886643AE20C6}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Influunt.Worker", "src\Influunt.Worker\Influunt.Worker.csproj", "{5DF597D9-BDF2-472A-917B-F82E0D42EC6C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {D5232164-9233-4B32-8022-75799161F113}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {D5232164-9233-4B32-8022-75799161F113}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D5232164-9233-4B32-8022-75799161F113}.Debug|x64.ActiveCfg = Debug|Any CPU + {D5232164-9233-4B32-8022-75799161F113}.Debug|x64.Build.0 = Debug|Any CPU + {D5232164-9233-4B32-8022-75799161F113}.Debug|x86.ActiveCfg = Debug|Any CPU + {D5232164-9233-4B32-8022-75799161F113}.Debug|x86.Build.0 = Debug|Any CPU {D5232164-9233-4B32-8022-75799161F113}.Release|Any CPU.ActiveCfg = Release|Any CPU {D5232164-9233-4B32-8022-75799161F113}.Release|Any CPU.Build.0 = Release|Any CPU + {D5232164-9233-4B32-8022-75799161F113}.Release|x64.ActiveCfg = Release|Any CPU + {D5232164-9233-4B32-8022-75799161F113}.Release|x64.Build.0 = Release|Any CPU + {D5232164-9233-4B32-8022-75799161F113}.Release|x86.ActiveCfg = Release|Any CPU + {D5232164-9233-4B32-8022-75799161F113}.Release|x86.Build.0 = Release|Any CPU {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Debug|Any CPU.Build.0 = Debug|Any CPU + {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Debug|x64.ActiveCfg = Debug|Any CPU + {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Debug|x64.Build.0 = Debug|Any CPU + {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Debug|x86.ActiveCfg = Debug|Any CPU + {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Debug|x86.Build.0 = Debug|Any CPU {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Release|Any CPU.ActiveCfg = Release|Any CPU {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Release|Any CPU.Build.0 = Release|Any CPU + {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Release|x64.ActiveCfg = Release|Any CPU + {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Release|x64.Build.0 = Release|Any CPU + {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Release|x86.ActiveCfg = Release|Any CPU + {82E2087D-00F6-47A4-90DE-47BFAF40DF39}.Release|x86.Build.0 = Release|Any CPU {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Debug|x64.ActiveCfg = Debug|Any CPU + {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Debug|x64.Build.0 = Debug|Any CPU + {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Debug|x86.ActiveCfg = Debug|Any CPU + {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Debug|x86.Build.0 = Debug|Any CPU {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Release|Any CPU.ActiveCfg = Release|Any CPU {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Release|Any CPU.Build.0 = Release|Any CPU + {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Release|x64.ActiveCfg = Release|Any CPU + {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Release|x64.Build.0 = Release|Any CPU + {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Release|x86.ActiveCfg = Release|Any CPU + {9F87FB47-0B6A-435A-BC52-5AE6AC25245C}.Release|x86.Build.0 = Release|Any CPU {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Debug|x64.ActiveCfg = Debug|Any CPU + {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Debug|x64.Build.0 = Debug|Any CPU + {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Debug|x86.ActiveCfg = Debug|Any CPU + {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Debug|x86.Build.0 = Debug|Any CPU {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Release|Any CPU.ActiveCfg = Release|Any CPU {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Release|Any CPU.Build.0 = Release|Any CPU + {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Release|x64.ActiveCfg = Release|Any CPU + {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Release|x64.Build.0 = Release|Any CPU + {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Release|x86.ActiveCfg = Release|Any CPU + {C8997ACE-506F-470F-8BD5-EC955BE522E0}.Release|x86.Build.0 = Release|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Debug|Any CPU.Build.0 = Debug|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Debug|x64.ActiveCfg = Debug|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Debug|x64.Build.0 = Debug|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Debug|x86.ActiveCfg = Debug|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Debug|x86.Build.0 = Debug|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Release|Any CPU.ActiveCfg = Release|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Release|Any CPU.Build.0 = Release|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Release|x64.ActiveCfg = Release|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Release|x64.Build.0 = Release|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Release|x86.ActiveCfg = Release|Any CPU + {44AC6B8E-C694-48E7-95B6-5481EB122559}.Release|x86.Build.0 = Release|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Debug|x64.ActiveCfg = Debug|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Debug|x64.Build.0 = Debug|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Debug|x86.ActiveCfg = Debug|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Debug|x86.Build.0 = Debug|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Release|Any CPU.Build.0 = Release|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Release|x64.ActiveCfg = Release|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Release|x64.Build.0 = Release|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Release|x86.ActiveCfg = Release|Any CPU + {D065CA50-272A-4E29-8D75-3A373EBCB98B}.Release|x86.Build.0 = Release|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Debug|x64.ActiveCfg = Debug|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Debug|x64.Build.0 = Debug|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Debug|x86.ActiveCfg = Debug|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Debug|x86.Build.0 = Debug|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Release|Any CPU.Build.0 = Release|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Release|x64.ActiveCfg = Release|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Release|x64.Build.0 = Release|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Release|x86.ActiveCfg = Release|Any CPU + {E9EA202F-F5BF-446B-A7BB-886643AE20C6}.Release|x86.Build.0 = Release|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Debug|x64.ActiveCfg = Debug|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Debug|x64.Build.0 = Debug|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Debug|x86.ActiveCfg = Debug|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Debug|x86.Build.0 = Debug|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Release|Any CPU.Build.0 = Release|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Release|x64.ActiveCfg = Release|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Release|x64.Build.0 = Release|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Release|x86.ActiveCfg = Release|Any CPU + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {D5232164-9233-4B32-8022-75799161F113} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} + {82E2087D-00F6-47A4-90DE-47BFAF40DF39} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} + {9F87FB47-0B6A-435A-BC52-5AE6AC25245C} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} + {C8997ACE-506F-470F-8BD5-EC955BE522E0} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} + {44AC6B8E-C694-48E7-95B6-5481EB122559} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} + {D065CA50-272A-4E29-8D75-3A373EBCB98B} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} + {E9EA202F-F5BF-446B-A7BB-886643AE20C6} = {0AB3BF05-4346-4AA6-1389-037BE0695223} + {5DF597D9-BDF2-472A-917B-F82E0D42EC6C} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B} + EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {FDF77C48-6495-4B15-9513-34DB5A584086} EndGlobalSection diff --git a/README.md b/README.md index 929287b..1307752 100644 --- a/README.md +++ b/README.md @@ -54,8 +54,7 @@ Influunt use MongoDB as persistent data storage and Redis for faster distributed ### Feed -* `FeedCrawler:Enabled` - enable background channels fetch (default: `true`). _In the future, can be disabled in api, and enabled in external worker_ -* `FeedCrawler:FetchInterval` - interval between fetching news from channels (default: `00:30:00` (30min)) +* `FeedCrawler:Enabled` - enable job host inside web host (default: `true`). _It may be disabled if you run Worker service_ * `FeedCrawler:LastActivityDaysAgo` - Minimal user last activity date, for fetch news. If user don't use service more then n days, don't fetch news from user channel (default 93 days (~3 month)) ## Screenshots @@ -66,7 +65,7 @@ Influunt use MongoDB as persistent data storage and Redis for faster distributed ## Using in docker -**Docker compose file for swarm mode:** +**Docker compose file for swarm mode (with worker):** ```Dockerfile version: '3.8' @@ -78,6 +77,7 @@ services: - "ConnectionStrings:Redis:ConnectionString=[Redis address]" # optional redis-based distributed cache connection - "Authentication:Google:ClientSecret=[GOOGLE CLIENT SECRET]" - "Authentication:Google:ClientId=[GOOGLE CLIENT ID]" + - "FeedCrawler:Enabled=false" ports: - target: 80 published: 30002 @@ -90,6 +90,17 @@ services: options: max-size: "3m" max-file: "3" + host: + image: eluki/influunt-worker + environment: + - "ConnectionStrings:Mongo:ConnectionString=[MONGO DB CONNECTION STRING]" + deploy: + replicas: 1 + logging: + driver: "json-file" + options: + max-size: "3m" + max-file: "3" ``` Run stack: ```bash diff --git a/src/Influunt.Feed.Rss/Influunt.Feed.Rss.csproj b/src/Influunt.Feed.Rss/Influunt.Feed.Rss.csproj index f4c5ec1..daf69b0 100644 --- a/src/Influunt.Feed.Rss/Influunt.Feed.Rss.csproj +++ b/src/Influunt.Feed.Rss/Influunt.Feed.Rss.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/Influunt.Feed/Crawler/CrawlerOptions.cs b/src/Influunt.Feed/Crawler/CrawlerOptions.cs index 3ca4f80..d2f5f57 100644 --- a/src/Influunt.Feed/Crawler/CrawlerOptions.cs +++ b/src/Influunt.Feed/Crawler/CrawlerOptions.cs @@ -1,9 +1,6 @@ -using System; - namespace Influunt.Feed.Crawler; public class CrawlerOptions { - public TimeSpan FetchInterval { get; set; } = TimeSpan.FromMinutes(30); public int LastActivityDaysAgo { get; set; } = 31*3; // 3 Month default } \ No newline at end of file diff --git a/src/Influunt.Feed/Crawler/FeedCrawlerBackgroundWorker.cs b/src/Influunt.Feed/Crawler/FeedCrawlerBackgroundWorker.cs deleted file mode 100644 index 68a4407..0000000 --- a/src/Influunt.Feed/Crawler/FeedCrawlerBackgroundWorker.cs +++ /dev/null @@ -1,95 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Influunt.Feed.Entity; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Skidbladnir.Utility.Common; - -namespace Influunt.Feed.Crawler; - -public class FeedCrawlerBackgroundWorker : BackgroundService -{ - private readonly IEnumerable _feedSourceProviders; - private readonly IFeedService _feedService; - private readonly IUserService _userService; - private readonly IChannelService _channelService; - private readonly CrawlerOptions _options; - private readonly ILogger _logger; - - public FeedCrawlerBackgroundWorker(IEnumerable feedSourceProviders, - IFeedService feedService, IUserService userService, - IOptions options, - IChannelService channelService, - ILogger logger) - { - _feedSourceProviders = feedSourceProviders; - _feedService = feedService; - _userService = userService; - _channelService = channelService; - _options = options.Value; - _logger = logger; - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - _logger.LogInformation("Feed Crawler Background Worker running."); - - - while (!stoppingToken.IsCancellationRequested) - { - try - { - await InnerExecute(stoppingToken); - } - catch (Exception e) - { - if (e is OperationCanceledException) - break; - _logger.LogError(e, "Error in Feed Crawler Background Worker when fetch feeds."); - } - await Task.Delay(_options.FetchInterval, stoppingToken); - } - _logger.LogInformation("Feed Crawler Background Worker is stopping."); - - } - - private async Task InnerExecute(CancellationToken token) - { - var minimumLastActivityDate = DateTime.UtcNow - TimeSpan.FromDays(_options.LastActivityDaysAgo); - var users = await _userService.GetUsers(); - token.ThrowIfCancellationRequested(); - foreach (var user in users.Where(x => x.LastActivity > minimumLastActivityDate)) - { - token.ThrowIfCancellationRequested(); - var channels = await Try.DoAsync(() => _channelService.GetUserChannels(user)); - if (channels is null) - continue; - try - { - var fetchTasks = channels.Select(x => FetchFeedFromChannel(x, user, token)); - await Task.WhenAll(fetchTasks); - } - catch (Exception e) - { - if (e is OperationCanceledException) - throw; - _logger.LogError(e, $"Can't fetch feed for user {user.Id}"); - } - } - } - - private async Task FetchFeedFromChannel(FeedChannel channel, User user, CancellationToken token) - { - token.ThrowIfCancellationRequested(); - var remoteFeedProvider = _feedSourceProviders.FirstOrDefault(x => x.CanProcessChannel(channel)); - if (remoteFeedProvider is null) - return; - var remoteFeed = await remoteFeedProvider.GetRemoteFeed(channel); - var count = await _feedService.TryAddToFeed(user, remoteFeed, channel); - _logger.LogInformation($"For user {user.Id} added {count} posts from channel {channel.Id}"); - } -} \ No newline at end of file diff --git a/src/Influunt.Feed/Crawler/FeedCrawlerJob.cs b/src/Influunt.Feed/Crawler/FeedCrawlerJob.cs new file mode 100644 index 0000000..4aa2a42 --- /dev/null +++ b/src/Influunt.Feed/Crawler/FeedCrawlerJob.cs @@ -0,0 +1,47 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Influunt.Scheduler.Abstractions.Models; +using Influunt.Scheduler.Abstractions.Services; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Influunt.Feed.Crawler; + +public class FeedCrawlerJob : IJob +{ + private readonly IUserService _userService; + private readonly IScheduler _scheduler; + private readonly ILogger _logger; + private readonly CrawlerOptions _options; + + public FeedCrawlerJob(IOptions options, + IUserService userService, + IScheduler scheduler, + ILogger logger) + { + _options = options.Value; + _userService = userService; + _scheduler = scheduler; + _logger = logger; + } + + public async Task ExecuteAsync(CancellationToken cancellationToken) + { + _logger.LogInformation("Feed Crawler job started"); + var minimumLastActivityDate = DateTime.UtcNow - TimeSpan.FromDays(_options.LastActivityDaysAgo); + var users = await _userService.GetUsers(); + + foreach (var user in users.Where(x => x.LastActivity > minimumLastActivityDate)) + { + var triggerId = await _scheduler.ScheduleOneTimeJobAsync(new UserFeedChannelUpdateJobSettings() + { + User = user + }); + _logger.LogInformation("Feed channel update job enqueued for user {email} with trigger id {triggerId}", user.Email, triggerId); + } + _logger.LogInformation("Feed Crawler job ended"); + } + +} diff --git a/src/Influunt.Feed/Crawler/UserFeedChannelUpdateJob.cs b/src/Influunt.Feed/Crawler/UserFeedChannelUpdateJob.cs new file mode 100644 index 0000000..b42b1b7 --- /dev/null +++ b/src/Influunt.Feed/Crawler/UserFeedChannelUpdateJob.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Influunt.Feed.Entity; +using Influunt.Scheduler.Abstractions.Models; +using Microsoft.Extensions.Logging; +using Skidbladnir.Utility.Common; + +namespace Influunt.Feed.Crawler; + +public class UserFeedChannelUpdateJob : IJob +{ + private readonly IEnumerable _feedSourceProviders; + private readonly IFeedService _feedService; + private readonly IChannelService _channelService; + private readonly ILogger _logger; + + public UserFeedChannelUpdateJob(IEnumerable feedSourceProviders, + IFeedService feedService, + IChannelService channelService, + ILogger logger) + { + _feedSourceProviders = feedSourceProviders; + _feedService = feedService; + _channelService = channelService; + _logger = logger; + } + + public async Task ExecuteAsync(UserFeedChannelUpdateJobSettings settings, CancellationToken cancellationToken) + { + if (settings?.User is null) + { + _logger.LogError("User is null"); + return; + } + _logger.BeginScope("Update feed channels job for user {email}", settings.User.Email); + var channels = await Try.DoAsync(() => _channelService.GetUserChannels(settings.User)); + if (channels is null) + return; + + try + { + var fetchTasks = channels.Select(x => FetchFeedFromChannel(x, settings.User)); + await Task.WhenAll(fetchTasks); + } + catch (Exception e) + { + _logger.LogError(e, $"Can't fetch feed"); + } + } + + + private async Task FetchFeedFromChannel(FeedChannel channel, User user) + { + var remoteFeedProvider = _feedSourceProviders.FirstOrDefault(x => x.CanProcessChannel(channel)); + if (remoteFeedProvider is null) + return; + var remoteFeed = await remoteFeedProvider.GetRemoteFeed(channel); + var count = await _feedService.TryAddToFeed(user, remoteFeed, channel); + _logger.LogInformation("Added {count} posts from channel {channelId} to {email} feed", count, channel.Id, user.Email); + } + +} diff --git a/src/Influunt.Feed/Crawler/UserFeedChannelUpdateJobSettings.cs b/src/Influunt.Feed/Crawler/UserFeedChannelUpdateJobSettings.cs new file mode 100644 index 0000000..c7e60ea --- /dev/null +++ b/src/Influunt.Feed/Crawler/UserFeedChannelUpdateJobSettings.cs @@ -0,0 +1,8 @@ +using Influunt.Feed.Entity; + +namespace Influunt.Feed.Crawler; + +public class UserFeedChannelUpdateJobSettings +{ + public User User { get; set; } +} diff --git a/src/Influunt.Feed/Extensions/ServicesExtensions.cs b/src/Influunt.Feed/Extensions/ServicesExtensions.cs new file mode 100644 index 0000000..ef9549c --- /dev/null +++ b/src/Influunt.Feed/Extensions/ServicesExtensions.cs @@ -0,0 +1,15 @@ +using System.Threading.Tasks; +using Influunt.Feed.Crawler; +using Microsoft.Extensions.DependencyInjection; + +namespace Influunt.Feed.Extensions; + +public static class ServicesExtensions +{ + public static IServiceCollection AddFeedJobs(this IServiceCollection services) + { + return services.AddScoped() + .AddScoped() + .AddHostedService(); + } +} diff --git a/src/Influunt.Feed/FeedJobsRegistrationService.cs b/src/Influunt.Feed/FeedJobsRegistrationService.cs new file mode 100644 index 0000000..359d855 --- /dev/null +++ b/src/Influunt.Feed/FeedJobsRegistrationService.cs @@ -0,0 +1,29 @@ +using System.Threading; +using System.Threading.Tasks; +using Influunt.Feed.Crawler; +using Influunt.Scheduler.Abstractions.Services; +using Microsoft.Extensions.Hosting; + +namespace Influunt.Feed; + +public class FeedJobsRegistrationService : BackgroundService +{ + private const string CrawlerJobTriggerId = "RegularFeedCrawlerJob"; + private readonly IScheduler _scheduler; + + public FeedJobsRegistrationService(IScheduler scheduler) + { + _scheduler = scheduler; + + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await _scheduler.RegisterJobAsync("Users feeds updating job", "The task starts updating RSS feeds for each active user"); + await _scheduler.RegisterJobAsync("User feed updating job", "The task receives new posts from the user's RSS feeds and adds them to the news feed"); + + await _scheduler.UnScheduleJobAsync(CrawlerJobTriggerId); + await _scheduler.ScheduleRecurringJobAsync("*/30 * * * *", CrawlerJobTriggerId); + } + +} diff --git a/src/Influunt.Feed/Influunt.Feed.csproj b/src/Influunt.Feed/Influunt.Feed.csproj index 9d4e64d..385e87d 100644 --- a/src/Influunt.Feed/Influunt.Feed.csproj +++ b/src/Influunt.Feed/Influunt.Feed.csproj @@ -1,19 +1,21 @@ - - - - net8 - Klabukov Erik - Influunt - Influunt - Simple Rss Agregator - Klabukov Erik - false - - - - - - - - - - + + + net8 + Klabukov Erik + Influunt + Influunt - Simple Rss Agregator + Klabukov Erik + false + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Influunt.Host/Controllers/ChannelController.cs b/src/Influunt.Host/Controllers/ChannelController.cs index 3a9cdfc..afbf504 100644 --- a/src/Influunt.Host/Controllers/ChannelController.cs +++ b/src/Influunt.Host/Controllers/ChannelController.cs @@ -6,6 +6,9 @@ using System.Net; using System.Threading.Tasks; using Influunt.Host.ViewModels; +using Influunt.Scheduler.Abstractions.Services; +using Influunt.Feed.Entity; +using Influunt.Feed.Crawler; namespace Influunt.Host.Controllers; @@ -20,15 +23,18 @@ public class ChannelController : ControllerBase private readonly IUserService _userService; private readonly IChannelService _channelService; private readonly IFeedService _feedService; + private readonly IScheduler _scheduler; /// public ChannelController(IUserService userService, IChannelService channelService, - IFeedService feedService) + IFeedService feedService, + IScheduler scheduler) { _userService = userService; _channelService = channelService; _feedService = feedService; + _scheduler = scheduler; } /// @@ -58,7 +64,7 @@ public async Task Get(string id) if (channel.UserId.Equals(user.Id, StringComparison.OrdinalIgnoreCase)) return channel.ToModel(); - Response.StatusCode = (int) HttpStatusCode.Forbidden; + Response.StatusCode = (int)HttpStatusCode.Forbidden; return null; } @@ -113,7 +119,7 @@ public async Task Delete(string id) { var user = await _userService.GetCurrentUser(); var channel = await _channelService.Get(id); - if (!channel.UserId.Equals(user.Id, StringComparison.OrdinalIgnoreCase)) + if (!channel.UserId.Equals(user.Id, StringComparison.OrdinalIgnoreCase)) return Forbid(); await _channelService.Remove(user, channel); #pragma warning disable 4014 @@ -121,4 +127,17 @@ public async Task Delete(string id) #pragma warning restore 4014 return Ok(); } + + private async Task ScheduleFeedUpdateJob(User user) + { + var triggerId = $"user_{user.Id}_manual_feed_update"; + await _scheduler.UnScheduleJobAsync(triggerId); + await _scheduler.ScheduleOneTimeJobAsync( + new UserFeedChannelUpdateJobSettings() + { + User = user + }, + DateTime.UtcNow + TimeSpan.FromMinutes(10), + triggerId); + } } \ No newline at end of file diff --git a/src/Influunt.Host/Influunt.Host.csproj b/src/Influunt.Host/Influunt.Host.csproj index c04b05d..9acdf12 100644 --- a/src/Influunt.Host/Influunt.Host.csproj +++ b/src/Influunt.Host/Influunt.Host.csproj @@ -24,8 +24,7 @@ - - + @@ -34,16 +33,17 @@ - - - - - + + + + + + diff --git a/src/Influunt.Host/Program.cs b/src/Influunt.Host/Program.cs index a5a52d7..5c1f85d 100644 --- a/src/Influunt.Host/Program.cs +++ b/src/Influunt.Host/Program.cs @@ -1,6 +1,8 @@ +using System; using Influunt.Feed.Crawler; using Influunt.Host; using Influunt.Host.Configurations; +using Influunt.Scheduler; using Influunt.Storage; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.HttpOverrides; @@ -13,10 +15,17 @@ var builder = WebApplication.CreateBuilder(args); builder.Services.AddOptions(); +Action schedulerConfigureAction = (c) => +{ + builder.Configuration.GetSection("Scheduler").Bind(c); + c.ConnectionString = builder.Configuration.GetSection("ConnectionStrings:Mongo").GetValue("ConnectionString"); +}; builder.Services.Configure(builder.Configuration.GetSection("FeedCrawler")); var crawlerEnabled = builder.Configuration.GetSection("FeedCrawler:Enabled").Get(); if (crawlerEnabled) - builder.Services.AddHostedService(); + builder.Services.AddJobSchedulerHost(schedulerConfigureAction); +else + builder.Services.AddJobScheduler(schedulerConfigureAction); builder.Services.AddSkidbladnirModules(configuration => { diff --git a/src/Influunt.Host/StartupModule.cs b/src/Influunt.Host/StartupModule.cs index 95d56c1..96d3931 100644 --- a/src/Influunt.Host/StartupModule.cs +++ b/src/Influunt.Host/StartupModule.cs @@ -1,4 +1,5 @@ using System; +using Influunt.Feed.Extensions; using Influunt.Feed.Rss; using Influunt.Host.Configurations; using Influunt.Host.Services; @@ -23,6 +24,7 @@ public override void Configure(IServiceCollection services) .PersistKeysToMongoDb(Configuration.AppConfiguration["ConnectionStrings:Mongo:ConnectionString"]); ConfigureDistributedCache(services); services.TryAddSingleton(); + services.AddFeedJobs(); } private void ConfigureDistributedCache(IServiceCollection services) diff --git a/src/Influunt.Host/WebModule.cs b/src/Influunt.Host/WebModule.cs index 1db7180..26edf98 100644 --- a/src/Influunt.Host/WebModule.cs +++ b/src/Influunt.Host/WebModule.cs @@ -7,7 +7,7 @@ using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.HttpOverrides; using Microsoft.Extensions.DependencyInjection; -using Microsoft.OpenApi.Models; +using Microsoft.OpenApi; using Skidbladnir.Modules; namespace Influunt.Host; diff --git a/src/Influunt.Host/appsettings.Development.json b/src/Influunt.Host/appsettings.Development.json index 3f90f31..827a2f8 100644 --- a/src/Influunt.Host/appsettings.Development.json +++ b/src/Influunt.Host/appsettings.Development.json @@ -1,10 +1,7 @@ -{ - "Logging": { - "LogLevel": { - "Default": "Debug" - } - }, - "FeedService": { - "FeedUpdateCron": "* * * * *" - } -} +{ + "Logging": { + "LogLevel": { + "Default": "Debug" + } + } +} diff --git a/src/Influunt.Host/appsettings.json b/src/Influunt.Host/appsettings.json index ea6dbdd..fe4bd46 100644 --- a/src/Influunt.Host/appsettings.json +++ b/src/Influunt.Host/appsettings.json @@ -23,5 +23,11 @@ }, "FeedCrawler": { "Enabled": true - } + }, + "Scheduler":{ + "ScanIntervalSeconds": 30, + "LockTimeoutSeconds": 600, + "HeartbeatIntervalSeconds": 60, + "MaxConcurrentJobs": 10 + } } diff --git a/src/Influunt.Scheduler.Abstractions/Influunt.Scheduler.Abstractions.csproj b/src/Influunt.Scheduler.Abstractions/Influunt.Scheduler.Abstractions.csproj new file mode 100644 index 0000000..a6b9113 --- /dev/null +++ b/src/Influunt.Scheduler.Abstractions/Influunt.Scheduler.Abstractions.csproj @@ -0,0 +1,7 @@ + + + net8.0 + enable + enable + + \ No newline at end of file diff --git a/src/Influunt.Scheduler.Abstractions/Models/ExecutionStatus.cs b/src/Influunt.Scheduler.Abstractions/Models/ExecutionStatus.cs new file mode 100644 index 0000000..d9ee44d --- /dev/null +++ b/src/Influunt.Scheduler.Abstractions/Models/ExecutionStatus.cs @@ -0,0 +1,9 @@ +namespace Influunt.Scheduler.Abstractions.Models; + +public enum ExecutionStatus +{ + Pending, // Awaiting execution (lock not acquired) + Running, // Running (lock acquired) + Completed, // Successfully completed + Failed // Failed +} \ No newline at end of file diff --git a/src/Influunt.Scheduler.Abstractions/Models/IJob.cs b/src/Influunt.Scheduler.Abstractions/Models/IJob.cs new file mode 100644 index 0000000..c22e21b --- /dev/null +++ b/src/Influunt.Scheduler.Abstractions/Models/IJob.cs @@ -0,0 +1,10 @@ +namespace Influunt.Scheduler.Abstractions.Models; +public interface IJob +{ + Task ExecuteAsync(CancellationToken cancellationToken); +} + +public interface IJob +{ + Task ExecuteAsync(TSettings settings, CancellationToken cancellationToken); +} \ No newline at end of file diff --git a/src/Influunt.Scheduler.Abstractions/Models/JobDefinition.cs b/src/Influunt.Scheduler.Abstractions/Models/JobDefinition.cs new file mode 100644 index 0000000..747af6b --- /dev/null +++ b/src/Influunt.Scheduler.Abstractions/Models/JobDefinition.cs @@ -0,0 +1,11 @@ +namespace Influunt.Scheduler.Abstractions.Models; + +public class JobDefinition +{ + public string Id { get; set; } + public string Name { get; set; } + public string? Description { get; set; } + public string JobType { get; set; } + public string SettingsType { get; set; } + public bool AllowConcurrentExecution { get; set; } +} \ No newline at end of file diff --git a/src/Influunt.Scheduler.Abstractions/Models/JobExecution.cs b/src/Influunt.Scheduler.Abstractions/Models/JobExecution.cs new file mode 100644 index 0000000..1e435b1 --- /dev/null +++ b/src/Influunt.Scheduler.Abstractions/Models/JobExecution.cs @@ -0,0 +1,15 @@ +namespace Influunt.Scheduler.Abstractions.Models; + +public class JobExecution +{ + public string Id { get; set; } + public string TriggerId { get; set; } + public string JobType { get; set; } + public ExecutionStatus Status { get; set; } + public DateTime StartTime { get; set; } + public DateTime? EndTime { get; set; } + public string Result { get; set; } + public string LockId { get; set; } + public DateTime LockExpiry { get; set; } + public DateTime LastHeartbeat { get; set; } +} \ No newline at end of file diff --git a/src/Influunt.Scheduler.Abstractions/Models/JobTrigger.cs b/src/Influunt.Scheduler.Abstractions/Models/JobTrigger.cs new file mode 100644 index 0000000..da4986d --- /dev/null +++ b/src/Influunt.Scheduler.Abstractions/Models/JobTrigger.cs @@ -0,0 +1,12 @@ +namespace Influunt.Scheduler; + +public class JobTrigger +{ + public string Id { get; set; } + public string TriggerId { get; set; } + public string JobType { get; set; } + public DateTime NextRunTime { get; set; } + public bool IsRecurring => !string.IsNullOrEmpty(CronExpression); + public string? CronExpression { get; set; } + public string? SettingsJson { get; set; } +} \ No newline at end of file diff --git a/src/Influunt.Scheduler.Abstractions/Services/IScheduler.cs b/src/Influunt.Scheduler.Abstractions/Services/IScheduler.cs new file mode 100644 index 0000000..61dcd2c --- /dev/null +++ b/src/Influunt.Scheduler.Abstractions/Services/IScheduler.cs @@ -0,0 +1,86 @@ +using Influunt.Scheduler.Abstractions.Models; + +namespace Influunt.Scheduler.Abstractions.Services; + +/// +/// Scheduler service that manages job registration and scheduling. +/// +public interface IScheduler +{ + /// + /// Registers a job with the specified name and optional description. + /// + /// The type of the job implementing . + /// The type of the job settings. + /// The name of the job. + /// An optional description of the job. + /// Whether the job allows concurrent execution. + Task RegisterJobAsync(string name, string? description = null, bool allowConcurrentExecution = false) where TJob : IJob; + + /// + /// Registers a job with the specified name and optional description. + /// + /// The type of the job implementing . + /// The name of the job. + /// An optional description of the job. + /// Whether the job allows concurrent execution. + Task RegisterJobAsync(string name, string? description = null, bool allowConcurrentExecution = false) where TJob : IJob; + + /// + /// Schedules a one-time job to run at the specified time. + /// + /// The type of the job implementing . + /// The time at which the job should run. If null, the job will run as soon as possible. + /// An optional identifier for the trigger. + /// The identifier of the scheduled trigger. + Task ScheduleOneTimeJobAsync(DateTime? runAt = null, string? triggerId = null) where TJob : IJob; + + /// + /// Schedules a recurring job based on the specified cron expression. + /// + /// The type of the job implementing . + /// The cron expression defining the schedule. + /// An optional identifier for the trigger. + /// The identifier of the scheduled trigger. + Task ScheduleRecurringJobAsync(string cronExpression, string? triggerId = null) where TJob : IJob; + + /// + /// Schedules a one-time job with settings to run at the specified time. + /// + /// The type of the job implementing . + /// The type of the job settings. + /// The settings for the job. + /// The time at which the job should run. If null, the job will run as soon as possible. + /// An optional identifier for the trigger. + /// The identifier of the scheduled trigger. + Task ScheduleOneTimeJobAsync(TSettings settings, DateTime? runAt = null, string? triggerId = null) where TJob : IJob; + + /// + /// Schedules a recurring job with settings based on the specified cron expression. + /// + /// The type of the job implementing . + /// The type of the job settings. + /// The cron expression defining the schedule. + /// The settings for the job. + /// An optional identifier for the trigger. + /// The identifier of the scheduled trigger. + Task ScheduleRecurringJobAsync(string cronExpression, TSettings settings, string? triggerId = null) where TJob : IJob; + + /// + /// Unschedules a job with the specified trigger identifier. + /// + /// The identifier of the trigger to unschedule. + Task UnScheduleJobAsync(string triggerId); + + /// + /// Retrieves all registered job definitions. + /// + /// A collection of registered job definitions. + Task> GetRegisteredJobsAsync(); + + /// + /// Retrieves all scheduled job triggers. + /// + /// A collection of scheduled job triggers. + Task> GetScheduledTriggersAsync(); +} diff --git a/src/Influunt.Scheduler/Constants.cs b/src/Influunt.Scheduler/Constants.cs new file mode 100644 index 0000000..fb57fb5 --- /dev/null +++ b/src/Influunt.Scheduler/Constants.cs @@ -0,0 +1,22 @@ +namespace Influunt.Scheduler; + +/// +/// Constants for the job scheduler module. +/// +public static class Constants +{ + /// + /// Job definitions collection name. + /// + public const string JobDefinitionsCollectionName = "job_definitions"; + + /// + /// Job triggers collection name. + /// + public const string JobTriggersCollectionName = "job_triggers"; + + /// + /// Job executions collection name. + /// + public const string JobExecutionsCollectionName = "job_executions"; +} \ No newline at end of file diff --git a/src/Influunt.Scheduler/CronHelper.cs b/src/Influunt.Scheduler/CronHelper.cs new file mode 100644 index 0000000..6c7ca2a --- /dev/null +++ b/src/Influunt.Scheduler/CronHelper.cs @@ -0,0 +1,12 @@ +using Cronos; + +namespace Influunt.Scheduler; + +public static class CronHelper +{ + public static DateTime? GetNextOccurrence(string cronExpression, DateTime currentTime) + { + CronExpression expression = CronExpression.Parse(cronExpression); + return expression.GetNextOccurrence(currentTime); + } +} diff --git a/src/Influunt.Scheduler/IMongoContext.cs b/src/Influunt.Scheduler/IMongoContext.cs new file mode 100644 index 0000000..d787a5e --- /dev/null +++ b/src/Influunt.Scheduler/IMongoContext.cs @@ -0,0 +1,27 @@ +using MongoDB.Driver; + +namespace Influunt.Scheduler; + +/// +/// Interface for working with MongoDB context +/// +public interface IMongoContext +{ + /// + /// Gets the MongoDB database + /// + IMongoDatabase Database { get; } + + /// + /// Gets the collection of the specified type + /// + /// Type of collection documents + /// Collection name (if null, the type name will be used) + /// MongoDB collection + IMongoCollection GetCollection(string collectionName = null); + + /// + /// Releases resources + /// + void Dispose(); +} \ No newline at end of file diff --git a/src/Influunt.Scheduler/IndexCreationBackgroundService.cs b/src/Influunt.Scheduler/IndexCreationBackgroundService.cs new file mode 100644 index 0000000..8a51b1c --- /dev/null +++ b/src/Influunt.Scheduler/IndexCreationBackgroundService.cs @@ -0,0 +1,158 @@ +using Influunt.Scheduler.Abstractions.Models; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using MongoDB.Driver; + +namespace Influunt.Scheduler; + +/// +/// Background service for creating MongoDB indexes for the scheduler module. +/// Runs once when the application starts. +/// +public class IndexCreationBackgroundService : BackgroundService +{ + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + + public IndexCreationBackgroundService( + IServiceProvider serviceProvider, + ILogger logger) + { + _serviceProvider = serviceProvider; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("Index Creation Background Service started."); + + try + { + await CreateIndexesAsync(stoppingToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error creating MongoDB indexes"); + throw; + } + finally + { + _logger.LogInformation("Index Creation Background Service finished."); + } + } + + /// + /// Creates indexes for scheduler collections. + /// + private async Task CreateIndexesAsync(CancellationToken cancellationToken) + { + _logger.LogInformation("Starting MongoDB index creation for scheduler module..."); + + // Using scope to get dependencies with correct lifetime + using var scope = _serviceProvider.CreateScope(); + var context = scope.ServiceProvider.GetRequiredService(); + + await CreateJobTriggerIndexesAsync(context, cancellationToken); + await CreateJobDefinitionIndexesAsync(context, cancellationToken); + await CreateJobExecutionIndexesAsync(context, cancellationToken); + + _logger.LogInformation("MongoDB index creation for scheduler module completed."); + } + + private async Task CreateJobDefinitionIndexesAsync(IMongoContext context, CancellationToken cancellationToken) + { + var collection = context.GetCollection(Constants.JobDefinitionsCollectionName); + + // Unique index by job name + await collection.Indexes.CreateOneAsync( + new CreateIndexModel( + Builders.IndexKeys.Ascending(x => x.Name), + new CreateIndexOptions { Unique = true, Background = true }), + cancellationToken: cancellationToken); + + _logger.LogDebug("Created unique index JobDefinition.Name"); + + // Unique index by job type + await collection.Indexes.CreateOneAsync( + new CreateIndexModel( + Builders.IndexKeys.Ascending(x => x.JobType), + new CreateIndexOptions { Unique = true, Background = true }), + cancellationToken: cancellationToken); + + _logger.LogDebug("Created unique index JobDefinition.JobType"); + } + + private async Task CreateJobTriggerIndexesAsync(IMongoContext context, CancellationToken cancellationToken) + { + var collection = context.GetCollection(Constants.JobTriggersCollectionName); + + // Index for searching triggers by next run time + await collection.Indexes.CreateOneAsync( + new CreateIndexModel( + Builders.IndexKeys.Ascending(x => x.NextRunTime), + new CreateIndexOptions { Background = true }), + cancellationToken: cancellationToken); + + _logger.LogDebug("Created index JobTrigger.NextRunTime"); + + // Unique index by trigger identifier + await collection.Indexes.CreateOneAsync( + new CreateIndexModel( + Builders.IndexKeys.Ascending(x => x.TriggerId), + new CreateIndexOptions { Unique = true, Background = true }), + cancellationToken: cancellationToken); + + _logger.LogDebug("Created unique index JobTrigger.TriggerId"); + } + + private async Task CreateJobExecutionIndexesAsync(IMongoContext context, CancellationToken cancellationToken) + { + var collection = context.GetCollection(Constants.JobExecutionsCollectionName); + + // Unique index by trigger identifier (to ensure only one execution per trigger) + await collection.Indexes.CreateOneAsync( + new CreateIndexModel( + Builders.IndexKeys.Ascending(x => x.TriggerId), + new CreateIndexOptions { Unique = true, Background = true }), + cancellationToken: cancellationToken); + + _logger.LogDebug("Created unique index JobExecution.TriggerId"); + + // Index for searching running jobs by lock + await collection.Indexes.CreateOneAsync( + new CreateIndexModel( + Builders.IndexKeys.Ascending(x => x.LockId).Ascending(x => x.Status), + new CreateIndexOptions { Background = true }), + cancellationToken: cancellationToken); + + _logger.LogDebug("Created compound index JobExecution.LockId+Status"); + + // Index by job type and status + await collection.Indexes.CreateOneAsync( + new CreateIndexModel( + Builders.IndexKeys.Ascending(x => x.JobType).Ascending(x => x.Status), + new CreateIndexOptions { Background = true }), + cancellationToken: cancellationToken); + + _logger.LogDebug("Created compound index JobExecution.JobType+Status"); + + // Index for searching executions by trigger and status (used in ScheduleJobAsync) + await collection.Indexes.CreateOneAsync( + new CreateIndexModel( + Builders.IndexKeys.Ascending(x => x.TriggerId).Ascending(x => x.Status), + new CreateIndexOptions { Background = true }), + cancellationToken: cancellationToken); + + _logger.LogDebug("Created compound index JobExecution.TriggerId+Status"); + + // Index for searching expired locks (used in CreateAndStartJobExecution and TryCaptureExistingExecutionLock) + await collection.Indexes.CreateOneAsync( + new CreateIndexModel( + Builders.IndexKeys.Ascending(x => x.LockExpiry), + new CreateIndexOptions { Background = true }), + cancellationToken: cancellationToken); + + _logger.LogDebug("Created index JobExecution.LockExpiry"); + } +} \ No newline at end of file diff --git a/src/Influunt.Scheduler/Influunt.Scheduler.csproj b/src/Influunt.Scheduler/Influunt.Scheduler.csproj new file mode 100644 index 0000000..f1d7e75 --- /dev/null +++ b/src/Influunt.Scheduler/Influunt.Scheduler.csproj @@ -0,0 +1,20 @@ + + + net8.0 + enable + enable + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Influunt.Scheduler/JobDefinitionClassMap.cs b/src/Influunt.Scheduler/JobDefinitionClassMap.cs new file mode 100644 index 0000000..d81a0d2 --- /dev/null +++ b/src/Influunt.Scheduler/JobDefinitionClassMap.cs @@ -0,0 +1,21 @@ +using Influunt.Scheduler.Abstractions.Models; +using MongoDB.Bson; +using MongoDB.Bson.Serialization; +using MongoDB.Bson.Serialization.IdGenerators; +using MongoDB.Bson.Serialization.Serializers; + +namespace Influunt.Scheduler; + +public class JobDefinitionClassMap : BsonClassMap +{ + public JobDefinitionClassMap() + { + AutoMap(); + MapIdMember(x => x.Id) + .SetIdGenerator(StringObjectIdGenerator.Instance) + .SetSerializer(new StringSerializer(BsonType.ObjectId)); + SetIgnoreExtraElements(true); + MapProperty(x => x.Id) + .SetIgnoreIfNull(true); + } +} diff --git a/src/Influunt.Scheduler/JobExecutionClassMap.cs b/src/Influunt.Scheduler/JobExecutionClassMap.cs new file mode 100644 index 0000000..208a046 --- /dev/null +++ b/src/Influunt.Scheduler/JobExecutionClassMap.cs @@ -0,0 +1,21 @@ +using Influunt.Scheduler.Abstractions.Models; +using MongoDB.Bson; +using MongoDB.Bson.Serialization; +using MongoDB.Bson.Serialization.IdGenerators; +using MongoDB.Bson.Serialization.Serializers; + +namespace Influunt.Scheduler; + +public class JobExecutionClassMap : BsonClassMap +{ + public JobExecutionClassMap() + { + AutoMap(); + MapIdMember(x => x.Id) + .SetIdGenerator(StringObjectIdGenerator.Instance) + .SetSerializer(new StringSerializer(BsonType.ObjectId)); + SetIgnoreExtraElements(true); + MapProperty(x => x.Id) + .SetIgnoreIfNull(true); + } +} diff --git a/src/Influunt.Scheduler/JobScheduler.cs b/src/Influunt.Scheduler/JobScheduler.cs new file mode 100644 index 0000000..4bf4b28 --- /dev/null +++ b/src/Influunt.Scheduler/JobScheduler.cs @@ -0,0 +1,516 @@ +using System.Collections.Concurrent; +using System.Text.Json; +using Influunt.Scheduler.Abstractions.Models; +using Influunt.Scheduler.Abstractions.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MongoDB.Driver; + +namespace Influunt.Scheduler; + + internal class JobScheduler : IScheduler, IHostedService, IDisposable +{ + private readonly IServiceProvider _serviceProvider; + private readonly ILogger _logger; + private readonly JobSchedulerOptions _options; + private readonly IMongoContext _context; + private readonly string _instanceId; // unique scheduler instance identifier + private readonly ConcurrentDictionary _runningJobs = new(); + private Timer _scanTimer; + private Timer _heartbeatTimer; + private bool _disposed; + + public JobScheduler(IServiceProvider serviceProvider, IOptions options, ILogger logger, IMongoContext context) + { + _serviceProvider = serviceProvider; + _logger = logger; + _options = options.Value; + _context = context; + + _instanceId = $"{Environment.MachineName}_{Guid.NewGuid():N}"; + } + + private IMongoCollection _jobDefinitions => _context.GetCollection(Constants.JobDefinitionsCollectionName); + private IMongoCollection _triggers => _context.GetCollection(Constants.JobTriggersCollectionName); + private IMongoCollection _executions => _context.GetCollection(Constants.JobExecutionsCollectionName); + + public Task StartAsync(CancellationToken cancellationToken) + { + _logger.LogInformation("JobScheduler starting. InstanceId: {InstanceId}", _instanceId); + + // Start timers + _scanTimer = new Timer(ScanTriggers, null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.ScanIntervalSeconds)); + _heartbeatTimer = new Timer(Heartbeat, null, TimeSpan.FromSeconds(_options.HeartbeatIntervalSeconds), TimeSpan.FromSeconds(_options.HeartbeatIntervalSeconds)); + + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + _logger.LogInformation("JobScheduler stopping..."); + + _scanTimer?.Change(Timeout.Infinite, 0); + _heartbeatTimer?.Change(Timeout.Infinite, 0); + + // Cancel all running jobs + foreach (var cts in _runningJobs.Values) + cts.Cancel(); + + // Wait for all jobs to complete (simplified) + // In a real project, more careful waiting for completion is needed + return Task.CompletedTask; + } + + public async Task RegisterJobAsync(string name, string? description = null, bool allowConcurrentExecution = false) + where TJob : IJob + { + var jobDef = new JobDefinition + { + Name = name, + Description = description, + JobType = typeof(TJob)!.AssemblyQualifiedName!, + SettingsType = typeof(TSettings)!.AssemblyQualifiedName!, + AllowConcurrentExecution = allowConcurrentExecution + }; + await _jobDefinitions.ReplaceOneAsync(d => d.Name == name, jobDef, new ReplaceOptions { IsUpsert = true }); + } + + public async Task RegisterJobAsync(string name, string? description = null, bool allowConcurrentExecution = false) + where TJob : IJob + { + var jobDef = new JobDefinition + { + Name = name, + Description = description, + JobType = typeof(TJob)!.AssemblyQualifiedName!, + AllowConcurrentExecution = allowConcurrentExecution + }; + await _jobDefinitions.ReplaceOneAsync(d => d.Name == name, jobDef, new ReplaceOptions { IsUpsert = true }); + } + + public async Task ScheduleOneTimeJobAsync(DateTime? runAt = null, string? triggerId = null) where TJob : IJob + { + var cursor = await _jobDefinitions.FindAsync(x => x.JobType == typeof(TJob).AssemblyQualifiedName); + var registeredJob = await cursor.FirstOrDefaultAsync(); + if (registeredJob is null) + throw new InvalidOperationException("Job not registered"); + + var trigger = new JobTrigger + { + TriggerId = string.IsNullOrEmpty(triggerId) ? Guid.NewGuid().ToString() : triggerId, + JobType = registeredJob.JobType, + NextRunTime = runAt.HasValue ? runAt.Value.ToUniversalTime() : DateTime.UtcNow + }; + await _triggers.InsertOneAsync(trigger); + return trigger.TriggerId; + } + + public async Task ScheduleRecurringJobAsync(string cronExpression, string? triggerId = null) where TJob : IJob + { + var cursor = await _jobDefinitions.FindAsync(x => x.JobType == typeof(TJob).AssemblyQualifiedName); + var registeredJob = await cursor.FirstOrDefaultAsync(); + if (registeredJob is null) + throw new InvalidOperationException("Job not registered"); + + var nextRun = CronHelper.GetNextOccurrence(cronExpression, DateTime.UtcNow); + if (!nextRun.HasValue) + throw new ArgumentException("Cron expression does not produce any future occurrence"); + + var trigger = new JobTrigger + { + TriggerId = string.IsNullOrEmpty(triggerId) ? Guid.NewGuid().ToString() : triggerId, + JobType = registeredJob.JobType, + NextRunTime = nextRun.Value, + CronExpression = cronExpression + }; + await _triggers.InsertOneAsync(trigger); + return trigger.TriggerId; + } + + public async Task ScheduleOneTimeJobAsync(TSettings settings, DateTime? runAt = null, string? triggerId = null) where TJob : IJob + { + var cursor = await _jobDefinitions.FindAsync(x => x.JobType == typeof(TJob).AssemblyQualifiedName); + var registeredJob = await cursor.FirstOrDefaultAsync(); + if (registeredJob is null) + throw new InvalidOperationException("Job not registered"); + if (registeredJob.SettingsType != typeof(TSettings).AssemblyQualifiedName) + throw new InvalidOperationException("The task settings type does not match the registered type."); + + var trigger = new JobTrigger + { + TriggerId = string.IsNullOrEmpty(triggerId) ? Guid.NewGuid().ToString() : triggerId, + JobType = registeredJob.JobType, + NextRunTime = runAt.HasValue ? runAt.Value.ToUniversalTime() : DateTime.UtcNow, + SettingsJson = JsonSerializer.Serialize(settings) + }; + await _triggers.InsertOneAsync(trigger); + return trigger.TriggerId; + } + + public async Task ScheduleRecurringJobAsync(string cronExpression, TSettings settings, string? triggerId = null) where TJob : IJob + { + var cursor = await _jobDefinitions.FindAsync(x => x.JobType == typeof(TJob).AssemblyQualifiedName); + var registeredJob = await cursor.FirstOrDefaultAsync(); + if (registeredJob is null) + throw new InvalidOperationException("Job not registered"); + if (registeredJob.SettingsType != typeof(TSettings).AssemblyQualifiedName) + throw new InvalidOperationException("The task settings type does not match the registered type."); + + var nextRun = CronHelper.GetNextOccurrence(cronExpression, DateTime.UtcNow); + if (!nextRun.HasValue) + throw new ArgumentException("Cron expression does not produce any future occurrence"); + + var trigger = new JobTrigger + { + TriggerId = string.IsNullOrEmpty(triggerId) ? Guid.NewGuid().ToString() : triggerId, + JobType = registeredJob.JobType, + NextRunTime = nextRun.Value, + CronExpression = cronExpression, + SettingsJson = JsonSerializer.Serialize(settings) + }; + await _triggers.InsertOneAsync(trigger); + return trigger.TriggerId; + } + + public async Task UnScheduleJobAsync(string triggerId) + { + await _triggers.DeleteOneAsync(x => x.TriggerId == triggerId); + } + + public async Task> GetRegisteredJobsAsync() + { + var cursor = await _jobDefinitions.FindAsync(x => true); + return await cursor.ToListAsync(); + } + + public async Task> GetScheduledTriggersAsync() + { + var cursor = await _triggers.FindAsync(x => true); + return await cursor.ToListAsync(); + } + + // ============================== Trigger Scanning ============================== + + private async void ScanTriggers(object? state) + { + try + { + var now = DateTime.UtcNow; + // Find triggers where NextRunTime <= now and are active + var filter = Builders.Filter.And( + Builders.Filter.Lte(t => t.NextRunTime, now)); + var triggers = await _triggers.Find(filter).ToListAsync(); + + // Check current number of running jobs + var runningCount = _runningJobs.Count; + + // If limit is already reached, skip all triggers + if (runningCount >= _options.MaxConcurrentJobs) + { + _logger.LogDebug("Concurrency limit reached ({Running}/{Max}). Skipping all triggers in this scan cycle.", + runningCount, _options.MaxConcurrentJobs); + return; + } + + // Calculate how many more jobs can be started + var availableSlots = _options.MaxConcurrentJobs - runningCount; + var processedTriggers = 0; + + foreach (var trigger in triggers) + { + // If maximum possible number of triggers have been processed, exit + if (processedTriggers >= availableSlots) + { + _logger.LogDebug("Available slots exhausted. Processed {Processed}/{Available} triggers in this cycle.", + processedTriggers, availableSlots); + break; + } + + try + { + // For each trigger, attempt to create execution + await ScheduleJobAsync(trigger); + processedTriggers++; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error scheduling job for trigger {TriggerId}", trigger.TriggerId); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error scanning triggers"); + } + } + + private async Task ScheduleJobAsync(JobTrigger trigger) + { + // 1. Get JobDefinition to determine if the same trigger can be executed concurrently for a job + var jobDef = await _jobDefinitions.Find(d => d.JobType == trigger.JobType).FirstOrDefaultAsync(); + if (jobDef == null) + { + _logger.LogWarning("Job definition not found for trigger {TriggerId}", trigger.Id); + return; + } + + // 2. Check if there is a lock for the current TRIGGER + var existingExecutionFilter = Builders.Filter.And( + Builders.Filter.Eq(e => e.TriggerId, trigger.TriggerId), + Builders.Filter.Eq(e => e.Status, ExecutionStatus.Running) + ); + var existingExecution = await _executions.Find(existingExecutionFilter).FirstOrDefaultAsync(); + + if (existingExecution == null) + { + // 3. Clean up any previous terminal executions for this trigger + // (Completed or Failed from a previous recurring run) + await _executions.DeleteManyAsync( + Builders.Filter.And( + Builders.Filter.Eq(e => e.TriggerId, trigger.TriggerId), + Builders.Filter.Or( + Builders.Filter.Eq(e => e.Status, ExecutionStatus.Completed), + Builders.Filter.Eq(e => e.Status, ExecutionStatus.Failed) + ) + ) + ); + + // 4. If JobExecution doesn't exist, create it, attempt to acquire lock, start the job + await CreateAndStartJobExecution(trigger, jobDef); + return; + } + + // Trigger is already running, check the time + var now = DateTime.UtcNow; + + if (trigger.IsRecurring) + { + // 2.2 If job is recurring, check if jobExecution belongs to the current run period + // nextRunTime is approximately equal to JobExecution startTime (epsilon time of 30 seconds) + var epsilon = TimeSpan.FromSeconds(30); + var timeDiff = Math.Abs((trigger.NextRunTime - existingExecution.StartTime).TotalSeconds); + + if (timeDiff <= epsilon.TotalSeconds) + { + _logger.LogDebug("Recurring job '{JobName}' trigger {TriggerId} is already running for current period. Skipping.", + jobDef.Name, trigger.TriggerId); + return; + } + } + else + { + // 2.1 If jobs are one-time, behavior should be like with AllowConcurrentExecution=false + // Don't allow to start, but only if this lock is not expired + // (another replica could have died and the lock is long expired) + if (existingExecution.LockExpiry > now) + { + _logger.LogDebug("One-time job '{JobName}' trigger {TriggerId} is already running and lock is not expired. Skipping.", + jobDef.Name, trigger.TriggerId); + return; + } + } + + // If lock is expired (for one-time) or for new period (for recurring), + // attempt to capture lock of existing JobExecution + await TryCaptureExistingExecutionLock(trigger, jobDef, existingExecution); + } + + private async Task CreateAndStartJobExecution(JobTrigger trigger, JobDefinition jobDef) + { + // Create new execution record + var execution = new JobExecution + { + TriggerId = trigger.TriggerId, + JobType = trigger.JobType, + Status = ExecutionStatus.Pending, + StartTime = DateTime.UtcNow, + LockId = null, + LockExpiry = DateTime.UtcNow.AddSeconds(_options.LockTimeoutSeconds), + LastHeartbeat = DateTime.UtcNow + }; + + try + { + await _executions.InsertOneAsync(execution); + } + catch (MongoWriteException ex) when (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey || ex.WriteError?.Code == 11000) + { + // Duplicate key - another replica already created JobExecution for this trigger + _logger.LogDebug("Duplicate JobExecution for trigger {TriggerId} detected. Another replica already created it.", trigger.TriggerId); + return; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error creating JobExecution for trigger {TriggerId}", trigger.TriggerId); + return; + } + + await TryCaptureExistingExecutionLock(trigger, jobDef, execution, ExecutionStatus.Pending); + } + + private async Task TryCaptureExistingExecutionLock(JobTrigger trigger, JobDefinition jobDef, JobExecution existingExecution, ExecutionStatus expectedExecutionStatus = ExecutionStatus.Running ) + { + var lockExpiry = DateTime.UtcNow.AddSeconds(_options.LockTimeoutSeconds); + + // Attempt to capture lock of existing JobExecution + var filter = Builders.Filter.And( + Builders.Filter.Eq(e => e.Id, existingExecution.Id), + Builders.Filter.Eq(e => e.Status, expectedExecutionStatus), + Builders.Filter.Or( + Builders.Filter.Eq(e => e.LockId, null), + Builders.Filter.Lt(e => e.LockExpiry, DateTime.UtcNow) + ) + ); + + var update = Builders.Update + .Set(e => e.LockId, _instanceId) + .Set(e => e.LockExpiry, lockExpiry) + .Set(e => e.LastHeartbeat, DateTime.UtcNow); + + var updated = await _executions.FindOneAndUpdateAsync(filter, update); + if (updated == null) + { + // If failed to acquire lock, skip the trigger + _logger.LogDebug("Failed to acquire lock for existing execution of trigger {TriggerId}", trigger.TriggerId); + return; + } + + // If lock acquisition succeeded, start the job + _logger.LogInformation("Starting job '{JobName}' with existing execution. TriggerId: {TriggerId}. ExecutionId: {ExecutionId}", + jobDef.Name, trigger.TriggerId, existingExecution.Id); + + var cts = new CancellationTokenSource(); + _runningJobs.TryAdd(existingExecution.Id, cts); + + // Start asynchronous execution + _ = Task.Run(() => ExecuteJobAsync(trigger, existingExecution, jobDef, cts.Token)); + } + + private async Task ExecuteJobAsync(JobTrigger trigger, JobExecution execution, JobDefinition jobDef, CancellationToken cancellationToken) + { + try + { + // Get job instance from DI (transient or scoped) + using var scope = _serviceProvider.CreateScope(); + var jobType = Type.GetType(jobDef.JobType) ?? throw new InvalidOperationException($"Cannot load job type {jobDef.JobType}"); + + // Check if there are settings + if (string.IsNullOrEmpty(jobDef.SettingsType)) + { + // Job without settings + var job = scope.ServiceProvider.GetRequiredService(jobType) as IJob; + if (job == null) + throw new InvalidOperationException($"Job {jobDef.JobType} does not implement IJob"); + await job.ExecuteAsync(cancellationToken); + } + else + { + // Job with settings + var settingsType = Type.GetType(jobDef.SettingsType); + if (settingsType == null) + throw new InvalidOperationException($"Cannot load settings type {jobDef.SettingsType}"); + + var settings = string.IsNullOrEmpty(trigger.SettingsJson) + ? Activator.CreateInstance(settingsType) + : JsonSerializer.Deserialize(trigger.SettingsJson, settingsType); + + var jobTypeGeneric = typeof(IJob<>).MakeGenericType(settingsType); + var job = scope.ServiceProvider.GetRequiredService(jobType); + var method = jobType.GetMethod("ExecuteAsync"); + if (method == null) + throw new InvalidOperationException($"ExecuteAsync method not found on {jobDef.JobType}"); + await (Task)method.Invoke(job, [settings, cancellationToken]); + } + + // Successful completion + await UpdateExecutionStatus(execution.Id, ExecutionStatus.Completed, result: "Success"); + // Update next run time for trigger + await UpdateTriggerNextRun(trigger); + _logger.LogInformation("Job {JobName} (ExecutionId: {ExecutionId}) completed successfully", jobDef.Name, execution.Id); + } + catch (OperationCanceledException) + { + await UpdateExecutionStatus(execution.Id, ExecutionStatus.Failed, result: "Cancelled"); + _logger.LogWarning("Job {JobName} (TriggerId: {TriggerId}; ExecutionId: {ExecutionId}) was cancelled", jobDef.Name, trigger.TriggerId, execution.Id); + } + catch (Exception ex) + { + await UpdateExecutionStatus(execution.Id, ExecutionStatus.Failed, result: ex.ToString()); + _logger.LogError(ex, "Job {JobName} (TriggerId: {TriggerId}; ExecutionId: {ExecutionId}) failed", jobDef.Name, trigger.TriggerId, execution.Id); + } + finally + { + _runningJobs.TryRemove(execution.Id, out _); + } + } + + private async Task UpdateExecutionStatus(string executionId, ExecutionStatus status, string result) + { + var update = Builders.Update + .Set(e => e.Status, status) + .Set(e => e.EndTime, DateTime.UtcNow) + .Set(e => e.Result, result) + .Unset(e => e.LockId) // remove lock + .Unset(e => e.LockExpiry); + await _executions.UpdateOneAsync(e => e.Id == executionId, update); + } + + private async Task UpdateTriggerNextRun(JobTrigger trigger) + { + if (!trigger.IsRecurring || string.IsNullOrEmpty(trigger.CronExpression)) + { + // One-time trigger — delete it + await _triggers.DeleteOneAsync(t => t.Id == trigger.Id); + return; + } + + // Calculate next run time based on Cron + var nextRun = CronHelper.GetNextOccurrence(trigger.CronExpression, DateTime.UtcNow); + if (!nextRun.HasValue) + { + // Cron will no longer fire (e.g., expired) — delete it + await _triggers.DeleteOneAsync(t => t.Id == trigger.Id); + return; + } + + await _triggers.UpdateOneAsync(t => t.Id == trigger.Id, + Builders.Update.Set(t => t.NextRunTime, nextRun.Value)); + } + + // ============================== Heartbeat ============================== + + private async void Heartbeat(object? state) + { + try + { + var now = DateTime.UtcNow; + var newExpiry = now.AddSeconds(_options.LockTimeoutSeconds); + + // Find all executions that belong to this instance and are in Running status + var filter = Builders.Filter.And( + Builders.Filter.Eq(e => e.LockId, _instanceId), + Builders.Filter.Eq(e => e.Status, ExecutionStatus.Running)); + + var update = Builders.Update + .Set(e => e.LockExpiry, newExpiry) + .Set(e => e.LastHeartbeat, now); + + var result = await _executions.UpdateManyAsync(filter, update); + _logger.LogDebug("Heartbeat: updated {Count} executions", result.ModifiedCount); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during heartbeat"); + } + } + + public void Dispose() + { + if (_disposed) return; + _scanTimer?.Dispose(); + _heartbeatTimer?.Dispose(); + _disposed = true; + } +} \ No newline at end of file diff --git a/src/Influunt.Scheduler/JobSchedulerOptions.cs b/src/Influunt.Scheduler/JobSchedulerOptions.cs new file mode 100644 index 0000000..b5a7a9e --- /dev/null +++ b/src/Influunt.Scheduler/JobSchedulerOptions.cs @@ -0,0 +1,19 @@ +namespace Influunt.Scheduler; + + public class JobSchedulerOptions + { + /// MongoDB connection string. + public string ConnectionString { get; set; } = ""; + + /// Trigger scan interval (seconds). + public int ScanIntervalSeconds { get; set; } = 60; + + /// Lock timeout duration (seconds). + public int LockTimeoutSeconds { get; set; } = 300; + + /// Heartbeat update interval (seconds). + public int HeartbeatIntervalSeconds { get; set; } = 60; + + /// Maximum number of concurrently running jobs. + public int MaxConcurrentJobs { get; set; } = 10; + } diff --git a/src/Influunt.Scheduler/JobTriggerClassMap.cs b/src/Influunt.Scheduler/JobTriggerClassMap.cs new file mode 100644 index 0000000..fc3ffb2 --- /dev/null +++ b/src/Influunt.Scheduler/JobTriggerClassMap.cs @@ -0,0 +1,19 @@ +using MongoDB.Bson; +using MongoDB.Bson.Serialization; +using MongoDB.Bson.Serialization.IdGenerators; +using MongoDB.Bson.Serialization.Serializers; + +namespace Influunt.Scheduler; + +public class JobTriggerClassMap : BsonClassMap +{ + public JobTriggerClassMap() + { + AutoMap(); + MapIdMember(x => x.Id) + .SetIdGenerator(StringObjectIdGenerator.Instance) + .SetSerializer(new StringSerializer(BsonType.ObjectId)); + UnmapMember(x => x.IsRecurring); + SetIgnoreExtraElements(true); + } +} diff --git a/src/Influunt.Scheduler/MongoContext.cs b/src/Influunt.Scheduler/MongoContext.cs new file mode 100644 index 0000000..5ed4e9e --- /dev/null +++ b/src/Influunt.Scheduler/MongoContext.cs @@ -0,0 +1,90 @@ +using Microsoft.Extensions.Options; +using MongoDB.Driver; + +namespace Influunt.Scheduler; + +/// +/// MongoDB context implementation +/// +public class MongoContext : IMongoContext, IDisposable +{ + private readonly MongoUrl _url; + private readonly Lazy _mongoClient; + private IMongoDatabase _database; + private bool _disposed; + + /// + /// Creates a new instance of MongoContext with the specified connection string + /// + /// Job scheduler options + public MongoContext(IOptions options) + { + if (options?.Value?.ConnectionString == null) + throw new ArgumentException("Connection string cannot be null or empty", nameof(options)); + + var connectionString = options.Value.ConnectionString; + + // Connection string parsing + _url = MongoUrl.Create(connectionString); + + // Lazy creation of MongoClient + _mongoClient = new Lazy(() => new MongoClient(_url)); + } + + /// + /// Gets the MongoDB database + /// + public IMongoDatabase Database => _database ??= _mongoClient.Value.GetDatabase(_url.DatabaseName); + + /// + /// Gets the collection of the specified type + /// + /// Type of collection documents + /// Collection name (if null, the type name will be used) + /// MongoDB collection + public IMongoCollection GetCollection(string? collectionName = null) + { + if (_disposed) + throw new ObjectDisposedException(nameof(MongoContext)); + + var name = collectionName ?? typeof(T).Name.ToLowerInvariant(); + return Database.GetCollection(name); + } + + /// + /// Releases resources + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Releases resources + /// + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + // Cleaning up managed resources + if (_mongoClient.IsValueCreated) + { + // MongoClient does not require explicit Dispose, but it can be added if needed + } + } + + _disposed = true; + } + } + + /// + /// Finalizer + /// + ~MongoContext() + { + Dispose(false); + } +} \ No newline at end of file diff --git a/src/Influunt.Scheduler/SchedulerModule.cs b/src/Influunt.Scheduler/SchedulerModule.cs new file mode 100644 index 0000000..e307e96 --- /dev/null +++ b/src/Influunt.Scheduler/SchedulerModule.cs @@ -0,0 +1,42 @@ +using Influunt.Scheduler.Abstractions.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using MongoDB.Bson.Serialization; + +namespace Influunt.Scheduler; + +public static class SchedulerModule +{ + public static IServiceCollection AddJobScheduler(this IServiceCollection services, Action configureOptions) + { + services.Configure(configureOptions); + + // Register IMongoContext + services.AddSingleton(); + + // Register JobScheduler + services.AddSingleton(); + + services.AddHostedService(); + + // Register BSON class maps + try{ + BsonClassMap.RegisterClassMap(new JobDefinitionClassMap()); + BsonClassMap.RegisterClassMap(new JobExecutionClassMap()); + BsonClassMap.RegisterClassMap(new JobTriggerClassMap()); + } + catch (ArgumentException) + { + // Do nothing. Needs for tests + } + + return services; + } + + public static IServiceCollection AddJobSchedulerHost(this IServiceCollection services, Action configureOptions) + { + services.AddJobScheduler(configureOptions); + services.AddSingleton(s => (JobScheduler)s.GetRequiredService()); + return services; + } +} diff --git a/src/Influunt.Storage/IMongoContext.cs b/src/Influunt.Storage/IMongoContext.cs new file mode 100644 index 0000000..fe738e4 --- /dev/null +++ b/src/Influunt.Storage/IMongoContext.cs @@ -0,0 +1,27 @@ +using MongoDB.Driver; + +namespace Influunt.Storage; + +/// +/// Интерфейс для работы с MongoDB контекстом +/// +public interface IMongoContext +{ + /// + /// Получает базу данных MongoDB + /// + IMongoDatabase Database { get; } + + /// + /// Получает коллекцию указанного типа + /// + /// Тип документов коллекции + /// Имя коллекции (если null, будет использовано имя типа) + /// Коллекция MongoDB + IMongoCollection GetCollection(string collectionName = null); + + /// + /// Освобождает ресурсы + /// + void Dispose(); +} \ No newline at end of file diff --git a/src/Influunt.Storage/MongoContext.cs b/src/Influunt.Storage/MongoContext.cs new file mode 100644 index 0000000..a6aaa38 --- /dev/null +++ b/src/Influunt.Storage/MongoContext.cs @@ -0,0 +1,88 @@ +using System; +using MongoDB.Driver; + +namespace Influunt.Storage; + +/// +/// Реализация контекста MongoDB +/// +public class MongoContext : IMongoContext, IDisposable +{ + private readonly MongoUrl _url; + private readonly Lazy _mongoClient; + private IMongoDatabase _database; + private bool _disposed; + + /// + /// Создает новый экземпляр MongoContext с указанной строкой подключения + /// + /// Строка подключения к MongoDB + public MongoContext(string connectionString) + { + if (string.IsNullOrWhiteSpace(connectionString)) + throw new ArgumentException("Connection string cannot be null or empty", nameof(connectionString)); + + // Парсинг строки подключения + _url = MongoUrl.Create(connectionString); + + // Lazy создание MongoClient + _mongoClient = new Lazy(() => new MongoClient(_url)); + } + + /// + /// Получает базу данных MongoDB + /// + public IMongoDatabase Database => _database ??= _mongoClient.Value.GetDatabase(_url.DatabaseName); + + /// + /// Получает коллекцию указанного типа + /// + /// Тип документов коллекции + /// Имя коллекции (если null, будет использовано имя типа) + /// Коллекция MongoDB + public IMongoCollection GetCollection(string collectionName = null) + { + if (_disposed) + throw new ObjectDisposedException(nameof(MongoContext)); + + var name = collectionName ?? typeof(T).Name.ToLowerInvariant(); + return Database.GetCollection(name); + } + + /// + /// Освобождает ресурсы + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Освобождает ресурсы + /// + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + // Очищаем управляемые ресурсы + if (_mongoClient.IsValueCreated) + { + // MongoClient не требует явного Dispose, но если нужно, можно добавить + } + } + + _disposed = true; + } + } + + /// + /// Финализатор + /// + ~MongoContext() + { + Dispose(false); + } +} \ No newline at end of file diff --git a/src/Influunt.Worker/Influunt.Worker.csproj b/src/Influunt.Worker/Influunt.Worker.csproj new file mode 100644 index 0000000..91fca51 --- /dev/null +++ b/src/Influunt.Worker/Influunt.Worker.csproj @@ -0,0 +1,20 @@ + + + net8 + enable + enable + f7b6a3ab-5dd3-487c-8763-0c114addb57b + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Influunt.Worker/NoOpHttpContextAccessor.cs b/src/Influunt.Worker/NoOpHttpContextAccessor.cs new file mode 100644 index 0000000..c0bea91 --- /dev/null +++ b/src/Influunt.Worker/NoOpHttpContextAccessor.cs @@ -0,0 +1,7 @@ +using Microsoft.AspNetCore.Http; + +public class NoOpHttpContextAccessor : IHttpContextAccessor +{ + public HttpContext? HttpContext { get => null; set => throw new NotImplementedException(); } + +} \ No newline at end of file diff --git a/src/Influunt.Worker/Program.cs b/src/Influunt.Worker/Program.cs new file mode 100644 index 0000000..d6c5ffe --- /dev/null +++ b/src/Influunt.Worker/Program.cs @@ -0,0 +1,22 @@ +using Influunt.Scheduler; +using Influunt.Storage; +using Influunt.Worker; +using Skidbladnir.Modules; + +var builder = Host.CreateApplicationBuilder(args); +if (builder.Environment.IsDevelopment()) + builder.Configuration.AddUserSecrets(); +builder.Services.AddOptions(); +builder.Services.AddJobSchedulerHost(c => +{ + builder.Configuration.GetSection("Scheduler").Bind(c); + c.ConnectionString = builder.Configuration.GetSection("ConnectionStrings:Mongo").GetValue("ConnectionString")!; +}); +builder.Services.AddSkidbladnirModules(configuration => +{ + var storageConfiguration = builder.Configuration.GetSection("ConnectionStrings:Mongo").Get(); + configuration.Add(storageConfiguration); +}, builder.Configuration); + +var host = builder.Build(); +host.Run(); diff --git a/src/Influunt.Worker/Properties/launchSettings.json b/src/Influunt.Worker/Properties/launchSettings.json new file mode 100644 index 0000000..c865ceb --- /dev/null +++ b/src/Influunt.Worker/Properties/launchSettings.json @@ -0,0 +1,12 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "Influunt.Worker": { + "commandName": "Project", + "dotnetRunMessages": true, + "environmentVariables": { + "DOTNET_ENVIRONMENT": "Development" + } + } + } +} diff --git a/src/Influunt.Worker/StartupModule.cs b/src/Influunt.Worker/StartupModule.cs new file mode 100644 index 0000000..965a0b8 --- /dev/null +++ b/src/Influunt.Worker/StartupModule.cs @@ -0,0 +1,33 @@ +using Influunt.Feed.Extensions; +using Influunt.Feed.Rss; +using Influunt.Storage; +using Microsoft.AspNetCore.Http; +using Skidbladnir.Caching.Distributed.MongoDB; +using Skidbladnir.Modules; + +namespace Influunt.Worker; + +public class StartupModule : Module +{ + public override Type[] DependsModules => [typeof(StorageModule), typeof(RssModule)]; + + public override void Configure(IServiceCollection services) + { + services.AddSingleton(); + ConfigureDistributedCache(services); + services.AddFeedJobs(); + } + + private void ConfigureDistributedCache(IServiceCollection services) + { + var redisConfiguration = Configuration.AppConfiguration["ConnectionStrings:Redis:ConnectionString"]; + if(string.IsNullOrWhiteSpace(redisConfiguration)) + { + services.AddMongoDistributedCache(Configuration.AppConfiguration["ConnectionStrings:Mongo:ConnectionString"]); + } + else + { + services.AddStackExchangeRedisCache(c => c.Configuration = redisConfiguration); + } + } +} \ No newline at end of file diff --git a/src/Influunt.Worker/appsettings.Development.json b/src/Influunt.Worker/appsettings.Development.json new file mode 100644 index 0000000..b2dcdb6 --- /dev/null +++ b/src/Influunt.Worker/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.Hosting.Lifetime": "Information" + } + } +} diff --git a/src/Influunt.Worker/appsettings.json b/src/Influunt.Worker/appsettings.json new file mode 100644 index 0000000..43cc794 --- /dev/null +++ b/src/Influunt.Worker/appsettings.json @@ -0,0 +1,22 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.Hosting.Lifetime": "Information" + } + }, + "ConnectionStrings": { + "Mongo": { + "ConnectionString": "" + }, + "Redis":{ + "ConnectionString": "" + } + }, + "Scheduler":{ + "ScanIntervalSeconds": 30, + "LockTimeoutSeconds": 600, + "HeartbeatIntervalSeconds": 60, + "MaxConcurrentJobs": 10 + } +} diff --git a/tests/Influunt.Scheduler.Tests/Influunt.Scheduler.Tests.csproj b/tests/Influunt.Scheduler.Tests/Influunt.Scheduler.Tests.csproj new file mode 100644 index 0000000..a7c2629 --- /dev/null +++ b/tests/Influunt.Scheduler.Tests/Influunt.Scheduler.Tests.csproj @@ -0,0 +1,32 @@ + + + net8.0 + enable + enable + false + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/tests/Influunt.Scheduler.Tests/JobSchedulerIntegrationTests.cs b/tests/Influunt.Scheduler.Tests/JobSchedulerIntegrationTests.cs new file mode 100644 index 0000000..35b7654 --- /dev/null +++ b/tests/Influunt.Scheduler.Tests/JobSchedulerIntegrationTests.cs @@ -0,0 +1,275 @@ +using AwesomeAssertions; +using Influunt.Scheduler; +using Influunt.Scheduler.Abstractions.Models; +using Influunt.Scheduler.Abstractions.Services; +using Microsoft.Extensions.DependencyInjection; +using MongoDB.Driver; + +namespace Influunt.Scheduler.Tests; + +public class JobSchedulerIntegrationTests : SchedulerIntegrationTestBase +{ + protected override void ConfigureServices(IServiceCollection services, string connectionString) + { + // Register test jobs + services.AddTransient(); + services.AddTransient(); + + // Add scheduler module with hosted service registration + services.AddJobSchedulerHost(options => + { + options.ConnectionString = connectionString; + options.ScanIntervalSeconds = 1; // Faster scanning for tests + options.LockTimeoutSeconds = 5; + options.HeartbeatIntervalSeconds = 2; + options.MaxConcurrentJobs = 10; + }); + } + + protected override async Task InitializeTestDataAsync() + { + // Reset static fields + TestJobWithoutSettings.WasExecuted = false; + TestJobWithoutSettings.ExecutionCount = 0; + TestJobWithSettings.WasExecuted = false; + TestJobWithSettings.ExecutionCount = 0; + TestJobWithSettings.LastSettings = null; + + await base.InitializeTestDataAsync(); + } + + [Fact] + public async Task Should_Register_Job_Without_Settings() + { + // Arrange + var scheduler = GetRequiredService(); + + // Act + await scheduler.RegisterJobAsync("TestJobWithoutSettings", "Test job without settings"); + + // Assert + var jobs = await scheduler.GetRegisteredJobsAsync(); + jobs.Should().ContainSingle(); + var job = jobs.First(); + job.Id.Should().NotBeNullOrEmpty(); + job.Name.Should().Be("TestJobWithoutSettings"); + job.Description.Should().Be("Test job without settings"); + job.JobType.Should().Be(typeof(TestJobWithoutSettings).AssemblyQualifiedName); + job.SettingsType.Should().BeNullOrEmpty(); + } + + [Fact] + public async Task Should_Register_Job_With_Settings() + { + // Arrange + var scheduler = GetRequiredService(); + + // Act + await scheduler.RegisterJobAsync("TestJobWithSettings", "Test job with settings"); + + // Assert + var jobs = await scheduler.GetRegisteredJobsAsync(); + jobs.Should().ContainSingle(); + var job = jobs.First(); + job.Id.Should().NotBeNullOrEmpty(); + job.Name.Should().Be("TestJobWithSettings"); + job.Description.Should().Be("Test job with settings"); + job.JobType.Should().Be(typeof(TestJobWithSettings).AssemblyQualifiedName); + job.SettingsType.Should().Be(typeof(TestJobSettings).AssemblyQualifiedName); + } + + [Fact] + public async Task Should_Schedule_OneTime_Job_Without_Settings() + { + // Arrange + var scheduler = GetRequiredService(); + await scheduler.RegisterJobAsync("TestJobWithoutSettings"); + + // Act + var triggerId = await scheduler.ScheduleOneTimeJobAsync(DateTime.UtcNow.AddSeconds(1)); + + // Assert + triggerId.Should().NotBeNullOrEmpty(); + + var triggers = await scheduler.GetScheduledTriggersAsync(); + triggers.Should().ContainSingle(); + var trigger = triggers.First(); + trigger.Id.Should().NotBeNullOrEmpty(); + trigger.TriggerId.Should().Be(triggerId); + trigger.JobType.Should().Be(typeof(TestJobWithoutSettings).AssemblyQualifiedName); + trigger.IsRecurring.Should().BeFalse(); + + // Wait for job to execute + await Task.Delay(3000); + + TestJobWithoutSettings.WasExecuted.Should().BeTrue(); + + // Trigger should be removed after execution + triggers = await scheduler.GetScheduledTriggersAsync(); + triggers.Should().BeEmpty(); + } + + [Fact] + public async Task Should_Schedule_Recurring_Job_Without_Settings() + { + // Arrange + var scheduler = GetRequiredService(); + await scheduler.RegisterJobAsync("TestJobWithoutSettings"); + + // Act + var triggerId = await scheduler.ScheduleRecurringJobAsync("* * * * *"); // Every minute + + // Assert + triggerId.Should().NotBeNullOrEmpty(); + + var triggers = await scheduler.GetScheduledTriggersAsync(); + triggers.Should().ContainSingle(); + var trigger = triggers.First(); + trigger.Id.Should().NotBeNullOrEmpty(); + trigger.TriggerId.Should().Be(triggerId); + trigger.JobType.Should().Be(typeof(TestJobWithoutSettings).AssemblyQualifiedName); + trigger.IsRecurring.Should().BeTrue(); + trigger.CronExpression.Should().Be("* * * * *"); + + await Task.Delay(TimeSpan.FromSeconds(70)); // Wait 70 seconds to ensure job runs (around next minute) + + TestJobWithoutSettings.WasExecuted.Should().BeTrue(); + + // Trigger should still exist + triggers = await scheduler.GetScheduledTriggersAsync(); + triggers.Should().ContainSingle(); + } + + [Fact] + public async Task Should_Execute_Recurring_Job_Multiple_Times() + { + // Arrange + var scheduler = GetRequiredService(); + await scheduler.RegisterJobAsync("TestJobWithoutSettings"); + + var triggerId = await scheduler.ScheduleRecurringJobAsync("* * * * *"); + + // Force the trigger to be due immediately + var context = GetRequiredService(); + var triggersCollection = context.GetCollection(Constants.JobTriggersCollectionName); + await ForceRunTrigger(triggerId, triggersCollection); + + // Wait for first execution to complete + await Task.Delay(TimeSpan.FromSeconds(3)); + + TestJobWithoutSettings.ExecutionCount.Should().BeGreaterThanOrEqualTo(1); + + // Force trigger to be due again for second execution + await ForceRunTrigger(triggerId, triggersCollection); + + // Wait for second scan cycle + await Task.Delay(TimeSpan.FromSeconds(3)); + + // Verify job ran at least twice + TestJobWithoutSettings.ExecutionCount.Should().BeGreaterThanOrEqualTo(2); + + // Trigger should still exist for recurring job + var remainingTriggers = await scheduler.GetScheduledTriggersAsync(); + remainingTriggers.Should().ContainSingle(); + } + + [Fact] + public async Task Should_Schedule_OneTime_Job_With_Settings() + { + // Arrange + var scheduler = GetRequiredService(); + await scheduler.RegisterJobAsync("TestJobWithSettings"); + + var settings = new TestJobSettings + { + Message = "Hello Test", + Count = 42 + }; + + // Act + var triggerId = await scheduler.ScheduleOneTimeJobAsync( + settings, DateTime.UtcNow.AddSeconds(1)); + + // Assert + triggerId.Should().NotBeNullOrEmpty(); + + // Wait for job to execute + await Task.Delay(TimeSpan.FromSeconds(3)); + + TestJobWithSettings.WasExecuted.Should().BeTrue(); + TestJobWithSettings.LastSettings.Should().NotBeNull(); + TestJobWithSettings.LastSettings!.Message.Should().Be("Hello Test"); + TestJobWithSettings.LastSettings!.Count.Should().Be(42); + + // Trigger should be removed after execution + var triggers = await scheduler.GetScheduledTriggersAsync(); + triggers.Should().BeEmpty(); + } + + [Fact] + public async Task Should_Unschedule_Job() + { + // Arrange + var scheduler = GetRequiredService(); + await scheduler.RegisterJobAsync("TestJobWithoutSettings"); + var triggerId = await scheduler.ScheduleOneTimeJobAsync(DateTime.UtcNow.AddMinutes(5)); + + // Act + await scheduler.UnScheduleJobAsync(triggerId); + + // Assert + var triggers = await scheduler.GetScheduledTriggersAsync(); + triggers.Should().BeEmpty(); + } + + [Fact] + public async Task Should_Not_Allow_Duplicate_Trigger_Execution() + { + // Arrange + var scheduler = GetRequiredService(); + await scheduler.RegisterJobAsync("TestJobWithoutSettings", allowConcurrentExecution: false); + + var runAt = DateTime.UtcNow.AddSeconds(1); + await scheduler.ScheduleOneTimeJobAsync(runAt, "test-trigger"); + var duplicateTriggerId = () => scheduler.ScheduleOneTimeJobAsync(runAt.AddSeconds(10), "test-trigger"); + + var exception = await Assert.ThrowsAsync(duplicateTriggerId); + exception.WriteError.Category.Should().Be(ServerErrorCategory.DuplicateKey); + exception.WriteError.Code.Should().Be(11000); + } + + [Fact] + public async Task Should_Handle_Multiple_Concurrent_Jobs() + { + // Arrange + var scheduler = GetRequiredService(); + await scheduler.RegisterJobAsync("TestJobWithoutSettings", allowConcurrentExecution: true); + + // Act + var triggerId1 = await scheduler.ScheduleOneTimeJobAsync(DateTime.UtcNow.AddSeconds(1), "trigger1"); + var triggerId2 = await scheduler.ScheduleOneTimeJobAsync(DateTime.UtcNow.AddSeconds(1), "trigger2"); + var triggerId3 = await scheduler.ScheduleOneTimeJobAsync(DateTime.UtcNow.AddSeconds(1), "trigger3"); + + // Assert + var triggers = await scheduler.GetScheduledTriggersAsync(); + triggers.Should().HaveCount(3); + + // Wait for execution + await Task.Delay(TimeSpan.FromSeconds(4)); + + TestJobWithoutSettings.WasExecuted.Should().BeTrue(); + TestJobWithoutSettings.ExecutionCount.Should().Be(3); + + // All triggers should be removed after execution + triggers = await scheduler.GetScheduledTriggersAsync(); + triggers.Should().BeEmpty(); + } + + + private static async Task ForceRunTrigger(string triggerId, IMongoCollection triggersCollection) + { + await triggersCollection.UpdateOneAsync( + t => t.TriggerId == triggerId, + Builders.Update.Set(t => t.NextRunTime, DateTime.UtcNow.AddSeconds(-5))); + } +} \ No newline at end of file diff --git a/tests/Influunt.Scheduler.Tests/SchedulerIntegrationTestBase.cs b/tests/Influunt.Scheduler.Tests/SchedulerIntegrationTestBase.cs new file mode 100644 index 0000000..d3c56dc --- /dev/null +++ b/tests/Influunt.Scheduler.Tests/SchedulerIntegrationTestBase.cs @@ -0,0 +1,77 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Testcontainers.MongoDb; + +namespace Influunt.Scheduler.Tests; + +public abstract class SchedulerIntegrationTestBase : IAsyncLifetime +{ + private readonly MongoDbContainer _mongoContainer; + private IHost _host; + private IServiceProvider _serviceProvider; + + protected SchedulerIntegrationTestBase() + { + _mongoContainer = new MongoDbBuilder() + .WithImage("mongo:latest") + .Build(); + } + + protected IServiceProvider ServiceProvider => _serviceProvider ?? throw new InvalidOperationException("Test not initialized"); + + protected T GetRequiredService() where T : notnull + { + return ServiceProvider.GetRequiredService(); + } + + public async Task InitializeAsync() + { + await _mongoContainer.StartAsync(); + + var connectionStringUriBuilder = new UriBuilder(_mongoContainer.GetConnectionString()); + connectionStringUriBuilder.Path = "test-db"; + connectionStringUriBuilder.Query += "&authMechanism=SCRAM-SHA-256&authSource=admin"; + + _host = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + ConfigureServices(services, connectionStringUriBuilder.ToString()); + }) + .ConfigureLogging(logging => + { + logging.ClearProviders(); + logging.AddConsole(); + }) + .Build(); + + _serviceProvider = _host.Services; + + await InitializeTestDataAsync(); + + _host.StartAsync(CancellationToken.None); + + await Task.Delay(TimeSpan.FromMinutes(1)); // Time for index creation + } + + protected virtual void ConfigureServices(IServiceCollection services, string connectionString) + { + // Override in derived tests to add additional services + } + + protected virtual Task InitializeTestDataAsync() + { + return Task.CompletedTask; + } + + public async Task DisposeAsync() + { + if (_host != null) + { + await _host.StopAsync(); + _host.Dispose(); + } + + await _mongoContainer.DisposeAsync(); + } +} \ No newline at end of file diff --git a/tests/Influunt.Scheduler.Tests/TestJobs.cs b/tests/Influunt.Scheduler.Tests/TestJobs.cs new file mode 100644 index 0000000..6320d4c --- /dev/null +++ b/tests/Influunt.Scheduler.Tests/TestJobs.cs @@ -0,0 +1,37 @@ +using Influunt.Scheduler.Abstractions.Models; + +namespace Influunt.Scheduler.Tests; + +public class TestJobWithoutSettings : IJob +{ + public static int ExecutionCount = 0; + public static bool WasExecuted { get; set; } + + public Task ExecuteAsync(CancellationToken cancellationToken) + { + WasExecuted = true; + Interlocked.Increment(ref ExecutionCount); + return Task.CompletedTask; + } +} + +public class TestJobWithSettings : IJob +{ + public static int ExecutionCount = 0; + public static TestJobSettings? LastSettings { get; set; } + public static bool WasExecuted { get; set; } + + public Task ExecuteAsync(TestJobSettings settings, CancellationToken cancellationToken) + { + LastSettings = settings; + WasExecuted = true; + Interlocked.Increment(ref ExecutionCount); + return Task.CompletedTask; + } +} + +public class TestJobSettings +{ + public string Message { get; set; } = string.Empty; + public int Count { get; set; } +} \ No newline at end of file