diff --git a/CHANGELOG.md b/CHANGELOG.md
index f0ab89e..af730a5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -182,3 +182,33 @@ Added functionality to send initial status messages when gathering peers and sta
- fixed `progress()` to update progress state from all methods
---
+
+## [3.2.0] - 2026-06-13
+
+### Added
+
+- Edit last broadcast message with `editLastBroadcastForAll()`.
+- Optional `broadcastId` targeting for editing or deleting the last message of a specific broadcast.
+- Metadata peer loading for targeted edit/delete calls when `allUsers` is empty and `broadcastId` is provided.
+- Pause/resume/cancel inline controls on live broadcast status messages.
+- Scheduled broadcasts with durable jobs and runner methods.
+- Self-destruct broadcasts with a `0` to `48` hour delay.
+- Persisted per-broadcast metadata in `data/broadcasts/{broadcastId}.json`.
+- Scheduled and self-destruct job runners.
+- Internal error logging to `data/broadcast-errors.log`.
+
+### Changed
+
+- Safer state handling using shared state references by broadcast id.
+- Safer cancel behavior: `cancel()` now marks cancellation without clearing in-flight requests.
+- `progress()` now includes edit, scheduled, self-destruct, total, elapsed, and TPS fields.
+- Message IDs are saved during broadcast after each successful peer instead of only at the end.
+- `editLastBroadcastForAll()` and `deleteLastBroadcastForAll()` can use persisted broadcast metadata instead of only legacy `lastBroadcast.txt`.
+- `deleteAllBroadcastsForAll()` now uses one progress loop instead of concurrent progress edits from workers.
+
+### Fixed
+
+- Pause/resume/cancel state reference issue.
+- Workers not stopping after `done`.
+- Unsafe watchdog behavior that could duplicate sends.
+- Concurrent progress edits in `deleteAllBroadcastsForAll()`.
diff --git a/README.md b/README.md
index a02bea4..49126c1 100644
--- a/README.md
+++ b/README.md
@@ -1,186 +1,233 @@
# BroadcastManager
**High-Performance Telegram Broadcast Manager** for [MadelineProto](https://docs.madelineproto.xyz/).
-Manage broadcasts efficiently: send messages, media albums, pin/unpin messages, control broadcasts in real-time, and track live progress with advanced features.
+Manage Telegram broadcasts efficiently: send text, media, albums, inline buttons, pin/unpin messages, delete broadcasts, edit broadcast, schedule broadcasts, run self-destruct deletion jobs, and track live progress.
[](LICENSE)
-[](https://github.com/WizardLoop/BroadcastManager)
-[](https://packagist.org/packages/wizardloop/broadcastmanager)
+[](https://github.com/WizardLoop/BroadcastManager)
+[](https://packagist.org/packages/wizardloop/broadcastmanager)
[](https://packagist.org/packages/wizardloop/broadcastmanager)

---
-## ๐ Features
+## Requirements
-* ๐ **High-Performance Broadcasts**
- Send messages concurrently to thousands of users, groups, or channels with configurable concurrency.
+* PHP `8.2` or newer
+* `danog/madelineproto` `^8.4`
+* `amphp/amp` `^3.0`
-* โธ **Pause / Resume / Cancel Broadcasts**
- Control ongoing broadcasts in real-time without restarting.
-
-* ๐ **Pin & Unpin Messages**
+---
- * Pin the last broadcasted message automatically.
- * Unpin all messages for all subscribers.
+## Installation
-* ๐งน **Delete Last Broadcast**
+```bash
+composer require wizardloop/broadcastmanager
+```
- * Remove previously sent messages from all users.
- * Retries failed deletions and handles Telegram API limits automatically.
+Include Composer autoload:
-* โป๏ธ **Delete All Broadcasts**
+```php
+require 'vendor/autoload.php';
+```
- * Remove previously all sent messages from all users.
- * Retries failed deletions and handles Telegram API limits automatically.
-
-* ๐ **Live Progress Tracking**
+Create the manager with your MadelineProto API instance:
- * Visual progress bars
- * Messages per second (TPS)
- * Sent, failed, and pending counts
- * Paused/cancelled indicators
+```php
+use BroadcastTool\BroadcastManager;
-* ๐ผ **Media Albums Support**
- * Send multiple images/documents in a single broadcast using `sendMultiMedia`.
- * Supports captions and message entities.
+$manager = new BroadcastManager($api);
+```
-* ๐พ **Saving & Reusing Albums**
- Save albums to a JSON file for reuse in future broadcasts.
- Use [album-bot](https://github.com/WizardLoop/album-bot/blob/main/poll-bot.php) as an example to store album files locally.
+Optional custom data directory:
-* ๐ก **FLOOD_WAIT Handling & Retries**
- Automatically respects Telegram rate limits and retries failed messages.
+```php
+BroadcastManager::setDataDir(__DIR__ . '/data'); // default: __DIR__ . '/../data'
+```
-* ๐ **Inline Buttons / Reply Markup**
+---
- * Include interactive buttons for links, commands, or actions.
+## Features
+
+* Concurrent broadcasts with safe concurrency clamping.
+* Live progress message updates.
+* Pause, resume, and cancel by broadcast id.
+* Text, media, albums, entities, and inline buttons.
+* Optional pinning of the last message sent to each peer.
+* Delete last broadcast or all saved broadcast messages.
+* Edit the last saved broadcast message for each peer.
+* Scheduled broadcasts persisted to disk.
+* Self-destruct broadcasts with automatic deletion after `0` to `48` hours.
+* Per-broadcast metadata in `data/broadcasts/{broadcastId}.json`.
+* FLOOD_WAIT retry handling and hard-fail handling.
+* Internal error log at `data/broadcast-errors.log`.
---
-## โก Installation
+## Basic Broadcast
-```bash
-composer require wizardloop/broadcastmanager
+```php
+$users = ['123456789', '987654321'];
+
+$messages = [
+ [
+ 'message' => 'Hello subscribers',
+ 'parse_mode' => 'HTML',
+ ],
+];
+
+$broadcastId = $manager->broadcastWithProgress(
+ allUsers: $users,
+ messages: $messages,
+ chatId: $adminChatId,
+ pin: false,
+ concurrency: 10
+);
```
-Include autoload:
+`broadcastWithProgress()` returns a string id. Use it with `progress()`, `pause()`, `resume()`, and `cancel()`.
```php
-require 'vendor/autoload.php';
+$progress = $manager->progress($broadcastId);
```
----
+Concurrency is clamped internally:
-## ๐ Usage Example
+* Minimum: `1`
+* Maximum: `50`
+* Default: `20`
+* Recommended examples: `10`
---
-## Send Broadcast
-### 1) live progress update in message to admin:
-```php
-use BroadcastTool\BroadcastManager;
+## Message Payloads
-$manager = new BroadcastManager($api);
+Broadcast messages are passed directly to MadelineProto send methods with `peer` added internally.
+
+### Text message
-$manager->broadcastWithProgress($users, $messages, $adminChatId, true, 20);
+```php
+$messages = [
+ [
+ 'message' => 'Hello',
+ 'parse_mode' => 'HTML',
+ ],
+];
```
-### 2) track on progress without message:
-This method returns an integer ID that can be used.
+### Inline buttons
```php
-use BroadcastTool\BroadcastManager;
+$messages = [
+ [
+ 'message' => 'Click a button below:',
+ 'buttons' => [
+ [['text' => 'Visit Website', 'url' => 'https://example.com']],
+ [['text' => 'Start', 'callback_data' => 'start_action']],
+ ],
+ ],
+];
+```
-$manager = new BroadcastManager($api);
+`buttons` is converted to `reply_markup` internally for sending.
-$broadcastId = $manager->broadcastWithProgress($users, $messages, null, true, 20);
+### Media message
-/**
- * Get progress (can be polled)
- */
-$progress = $manager->progress($broadcastId);
+```php
+$messages = [
+ [
+ 'message' => 'Photo caption',
+ 'media' => [
+ '_' => 'inputMediaUploadedPhoto',
+ 'file' => '/path/to/photo.jpg',
+ ],
+ 'parse_mode' => 'HTML',
+ ],
+];
+```
-if ($progress !== null) {
+When a message has `media`, BroadcastManager uses `messages->sendMedia()`. Otherwise it uses `messages->sendMessage()`.
- // ๐ Core stats
- $processed = $progress['processed'];
- $success = $progress['success'];
- $failed = $progress['failed'];
- $pending = $progress['pending'];
- $flood = $progress['flood'];
-
- // ๐ Progress %
- $progressPercent = $progress['progressPercent'];
-
- // ๐ฆ Breakdown
- $sent = $progress['breakdown']['sent'];
- $deleted = $progress['breakdown']['deleted'];
- $unpin = $progress['breakdown']['unpin'];
-
- // โ๏ธ State
- $done = $progress['done'];
- $paused = $progress['paused'];
- $cancel = $progress['cancel'];
-
- // โฑ Timing
- $startedAt = $progress['startedAt'];
-
- /**
- * Example usage
- */
- echo "Progress: {$progressPercent}%\n";
- echo "Sent: {$sent}\n";
- echo "Failed: {$failed}\n";
-
- if ($done) {
- echo "Broadcast finished!";
- }
+### Album message
- if ($paused) {
- echo "Broadcast paused...";
- }
-}
-```
+If a message contains `albumFile`, BroadcastManager reads that JSON file and sends it with `messages->sendMultiMedia()` in chunks of `10`.
```php
-* progress return array|null {
-* processed: int, // total processed items (sent + deleted + unpin + failed)
-* success: int, // successful operations (sent + deleted + unpin)
-* failed: int, // failed operations count
-* pending: int, // remaining items in queue
-* flood: int, // FLOOD_WAIT occurrences
-*
-* progressPercent: float, // completion percentage (processed / total)
-*
-* breakdown: array {
-* sent: int,
-* deleted: int,
-* unpin: int
-* },
-*
-* done: bool, // process finished
-* paused: bool, // process paused
-* cancel: bool, // process cancelled
-*
-* startedAt: float // microtime start timestamp
-* }
+$messages = [
+ [
+ 'albumFile' => __DIR__ . '/album.json',
+ ],
+];
+```
+
+Example `album.json` item:
+
+```json
+[
+ {
+ "media": {
+ "type": "photo",
+ "botApiFileId": "AgACAgQAAxkBA..."
+ },
+ "caption": "Album caption",
+ "entities": []
+ }
+]
```
+Supported album media types are mapped to `inputMediaPhoto` and `inputMediaDocument`.
+
---
-## Filer Peers
+## Progress
+
```php
-$filterSub = $manager->filterPeers($users, 'users');
-$targets = $filterSub['targets']; # array
-$failed = $filterSub['failed']; # int
-$total = $filterSub['total']; # int
+$progress = $manager->progress($broadcastId);
+
+if ($progress !== null) {
+ echo "Progress: {$progress['progressPercent']}%\n";
+ echo "Success: {$progress['success']}\n";
+ echo "Failed: {$progress['failed']}\n";
+ echo "Pending: {$progress['pending']}\n";
+}
+```
+
+`progress()` returns `array|null`:
+
+```php
+[
+ 'processed' => 0,
+ 'success' => 0,
+ 'failed' => 0,
+ 'pending' => 0,
+ 'flood' => 0,
+ 'progressPercent' => 0.0,
+ 'breakdown' => [
+ 'sent' => 0,
+ 'deleted' => 0,
+ 'unpin' => 0,
+ 'edited' => 0,
+ 'unchanged' => 0,
+ 'scheduled' => 0,
+ ],
+ 'edited' => 0,
+ 'unchanged' => 0,
+ 'scheduled' => 0,
+ 'selfDestruct' => null,
+ 'type' => 'send',
+ 'total' => 0,
+ 'elapsed' => 0.0,
+ 'tps' => 0.0,
+ 'done' => false,
+ 'paused' => false,
+ 'cancel' => false,
+ 'startedAt' => null,
+]
```
---
-## โธ Control Broadcasts
+## Control Running Operations
```php
$manager->pause($broadcastId);
@@ -188,112 +235,431 @@ $manager->resume($broadcastId);
$manager->cancel($broadcastId);
```
-### Check state:
+`cancel()` only marks the operation as cancelled. It does not clear in-flight Telegram requests.
+
+Check operation state:
+
+```php
+$manager->isActive($broadcastId);
+$manager->isPaused($broadcastId);
+$manager->isCancelled($broadcastId);
+```
+
+Pause, resume, cancel, and progress work with send/edit/delete/unpin operations that have a live state id.
+
+---
+
+## Pin And Unpin
+
+Pin the last message sent to each peer:
+
```php
-if ($manager->isActive($broadcastId));
-if ($manager->isPaused($broadcastId));
-if ($manager->isCancelled($broadcastId));
-if (!$manager->hasLastBroadcast($broadcastId));
-if (!$manager->hasAllBroadcast($broadcastId));
-print_r($manager->progress($broadcastId));
+$broadcastId = $manager->broadcastWithProgress(
+ allUsers: $users,
+ messages: $messages,
+ chatId: $adminChatId,
+ pin: true,
+ concurrency: 10
+);
```
-### Set data dir:
+Unpin all messages for all peers:
+
```php
-BroadcastManager::setDataDir(__DIR__ . '/data'); // default: __DIR__ . '/../data'
+$unpinId = $manager->unpinAllMessagesForAll($users, $adminChatId, 10);
```
---
-## ๐งน Delete Last Broadcast
+## Delete Broadcast Messages
+
+Delete the last saved broadcast message for each peer using `data/{peer}/lastBroadcast.txt`:
```php
-$broadcastId = $manager->deleteLastBroadcastForAll($users, $adminChatId, 20);
+$deleteId = $manager->deleteLastBroadcastForAll($users, $adminChatId, 10);
+```
+
+Delete the last message from a specific broadcast using `data/broadcasts/{broadcastId}.json`:
+
+```php
+$deleteId = $manager->deleteLastBroadcastForAll(
+ allUsers: $users,
+ chatId: $adminChatId,
+ concurrency: 10,
+ broadcastId: $broadcastId
+);
+```
+
+If you pass an empty `allUsers` array with `broadcastId`, peers are loaded from that broadcast metadata:
+
+```php
+$deleteId = $manager->deleteLastBroadcastForAll(
+ allUsers: [],
+ chatId: $adminChatId,
+ concurrency: 10,
+ broadcastId: $broadcastId
+);
+```
+
+When `broadcastId` is provided, the legacy `lastBroadcast.txt` file is not used and is not removed. This prevents a newer broadcast from being affected by an older delete request.
+
+Delete all saved broadcast messages for each peer using `data/{peer}/messages.txt`:
+
+```php
+$deleteAllId = $manager->deleteAllBroadcastsForAll($users, $adminChatId, 10);
+```
+
+Check whether legacy saved message files exist:
+
+```php
+$hasLast = $manager->hasLastBroadcast();
+$hasAll = $manager->hasAllBroadcast();
```
---
-## โป๏ธ Delete All Broadcast
+## Edit Last Broadcast
+
+`editLastBroadcastForAll()` reads each peer's `data/{peer}/lastBroadcast.txt` and edits that message id by default.
+
+```php
+$editId = $manager->editLastBroadcastForAll(
+ allUsers: $users,
+ newText: 'Updated text',
+ chatId: $adminChatId,
+ buttons: null,
+ media: null,
+ concurrency: 10,
+ parseMode: 'HTML'
+);
+```
+
+Edit the last message from a specific broadcast using `data/broadcasts/{broadcastId}.json`:
+
+```php
+$editId = $manager->editLastBroadcastForAll(
+ allUsers: $users,
+ newText: 'Updated text for a specific broadcast',
+ chatId: $adminChatId,
+ buttons: null,
+ media: null,
+ concurrency: 10,
+ parseMode: 'HTML',
+ broadcastId: $broadcastId
+);
+```
+
+If you pass an empty `allUsers` array with `broadcastId`, peers are loaded from that broadcast metadata:
+
+```php
+$editId = $manager->editLastBroadcastForAll(
+ allUsers: [],
+ newText: 'Updated text for the stored broadcast peers',
+ chatId: $adminChatId,
+ buttons: null,
+ media: null,
+ concurrency: 10,
+ parseMode: 'HTML',
+ broadcastId: $broadcastId
+);
+```
+
+With buttons:
```php
-$broadcastId = $manager->deleteAllBroadcastsForAll($users, $adminChatId, 20);
+$editId = $manager->editLastBroadcastForAll(
+ allUsers: $users,
+ newText: 'Updated with a button',
+ chatId: $adminChatId,
+ buttons: [
+ [['text' => 'Open', 'url' => 'https://example.com']],
+ ],
+ media: null,
+ concurrency: 10,
+ parseMode: 'HTML'
+);
```
+Edit counters include:
+
+* `edited`
+* `unchanged`
+* `failed`
+* `flood`
+
+`MESSAGE_NOT_MODIFIED` is counted as `unchanged`, not `failed`.
+
---
-## ๐ Get Last Broadcast Data
+## Scheduled Broadcasts
+
+Scheduled broadcasts are saved in `data/scheduled-broadcasts.json`.
+
+```php
+$scheduleId = $manager->scheduleBroadcastForAll(
+ allUsers: $users,
+ messages: [
+ ['message' => 'Scheduled hello'],
+ ],
+ scheduledAt: time() + 3600,
+ chatId: $adminChatId,
+ pin: false,
+ concurrency: 10,
+ selfDestructHours: null
+);
+```
+
+If `scheduledAt` is in the future, the broadcast is saved and not sent yet.
+If `scheduledAt <= time()`, it is marked `running`, executed immediately, and then marked `done`, `cancelled`, or `failed`.
+
+Run due scheduled broadcasts periodically:
+
+```php
+$results = $manager->runDueScheduledBroadcasts();
+```
+
+List scheduled broadcasts:
+
+```php
+$scheduled = $manager->listScheduledBroadcasts();
+```
+
+Cancel a scheduled broadcast that has not started:
```php
-$broadcastId = $manager->lastBroadcastData();
+$cancelled = $manager->cancelScheduledBroadcast($scheduleId);
```
+`cancelScheduledBroadcast()` returns `false` once the job is already `running`, `done`, `cancelled`, or `failed`.
+
---
-## ๐ Pin / Unpin Messages
+## Self-Destruct Broadcasts
+
+Pass `selfDestructHours` as the sixth argument to `broadcastWithProgress()`.
+
+```php
+$broadcastId = $manager->broadcastWithProgress(
+ allUsers: $users,
+ messages: [
+ ['message' => 'This message will be removed later.'],
+ ],
+ chatId: $adminChatId,
+ pin: false,
+ concurrency: 10,
+ selfDestructHours: 6
+);
+```
+
+Rules:
+
+* `null` means no automatic deletion.
+* `0` means delete immediately after the broadcast finishes.
+* `1` through `48` means delete after that many hours.
+
+Invalid values below `0` or above `48` throw `InvalidArgumentException`.
+
+Run due self-destruct jobs periodically:
+
+```php
+$results = $manager->runDueSelfDestructJobs();
+```
+
+List self-destruct jobs:
-## Pin last broadcast automatically:
+```php
+$jobs = $manager->listSelfDestructJobs();
+```
+
+Cancel a self-destruct job that has not started:
```php
-$broadcastId = $manager->broadcastWithProgress(..., pin: true);
+$cancelled = $manager->cancelSelfDestructJob($jobId);
```
-## Unpin all messages:
+Self-destruct deletes by `data/broadcasts/{broadcastId}.json`, not by `lastBroadcast.txt`. If a newer broadcast was sent later, the self-destruct job still deletes only the messages from its own broadcast id.
+
+If a broadcast is cancelled midway, the self-destruct job is created only for messages that were actually sent and saved in metadata.
+
+---
+
+## Periodic Runners
+
+Scheduled broadcasts and self-destruct jobs are durable, but they run only when you call the runners.
+Call them from your bot loop, event handler, or internal cron-style task:
```php
-$broadcastId = $manager->unpinAllMessagesForAll(...);
+$manager->runDueScheduledBroadcasts();
+$manager->runDueSelfDestructJobs();
```
---
-## ๐ Inline Buttons & Reply Markup
+## Filter Peers
```php
-$message = [
- 'message' => "Click a button below:",
- 'buttons' => [
- [['text' => "Visit Website", 'url' => "https://example.com"]],
- [['text' => "Start", 'callback_data' => "start_action"]]
- ]
-];
+$filterSub = $manager->filterPeers($users, 'users');
+
+$targets = $filterSub['targets']; // array
+$failed = $filterSub['failed']; // int
+$total = $filterSub['total']; // int
```
+Supported filter types:
+
+* `users`
+* `groups`
+* `channels`
+* `all`
+
---
-## โ๏ธ Advanced Options
+## Last Broadcast Data
+
+`lastBroadcastData()` returns the latest saved status text from `data/LastBrodDATA.txt`, or `false` if it does not exist.
+
+```php
+$lastData = $manager->lastBroadcastData();
+```
-* **Concurrency** - Number of parallel workers.
-* **Filter Types** - 'users', 'groups', 'channels', 'all'
-* **Album Handling** - JSON-based albums with multiple media files.
-* **Retries & Delays** - Automatic retries with backoff.
-* **Progress Tracking** - Real-time broadcast stats with `progress()`.
+---
+
+## Data Files
+
+Legacy files are still written for backward compatibility:
+
+* `data/{peer}/lastBroadcast.txt`
+* `data/{peer}/messages.txt`
+* `data/LastBrodDATA.txt`
+
+New files:
+
+* `data/broadcasts/{broadcastId}.json`
+* `data/scheduled-broadcasts.json`
+* `data/self-destruct-jobs.json`
+* `data/broadcast-errors.log`
+
+Example broadcast metadata:
+
+```json
+{
+ "id": "2b9a24c5ef2cbb10",
+ "type": "send",
+ "createdAt": 1710000000,
+ "status": "done",
+ "total": 100,
+ "sent": 90,
+ "failed": 10,
+ "peers": {
+ "12345": {
+ "lastMessageId": 111,
+ "messageIds": [111, 112],
+ "status": "sent"
+ }
+ },
+ "selfDestruct": {
+ "enabled": true,
+ "hours": 6,
+ "deleteAt": 1710021600,
+ "deleteJobId": "selfdestruct_..."
+ }
+}
+```
---
-## ๐ค Contributing
+## Public API Reference
-1. Fork repo
-2. Create branch: `git checkout -b feature/my-feature`
-3. Commit changes: `git commit -m "Add feature"`
-4. Push branch: `git push origin feature/my-feature`
-5. Open Pull Request
+```php
+public function __construct(API $api);
+
+public function broadcastWithProgress(
+ array $allUsers,
+ array $messages,
+ $chatId = null,
+ bool $pin = false,
+ int $concurrency = 20,
+ ?int $selfDestructHours = null
+): string;
+
+public function editLastBroadcastForAll(
+ array $allUsers,
+ string $newText,
+ $chatId = null,
+ ?array $buttons = null,
+ ?array $media = null,
+ int $concurrency = 20,
+ string $parseMode = 'HTML',
+ ?string $broadcastId = null
+): string;
+
+public function scheduleBroadcastForAll(
+ array $allUsers,
+ array $messages,
+ int $scheduledAt,
+ $chatId = null,
+ bool $pin = false,
+ int $concurrency = 20,
+ ?int $selfDestructHours = null
+): string;
+
+public function runDueScheduledBroadcasts(): array;
+public function cancelScheduledBroadcast(string $scheduleId): bool;
+public function listScheduledBroadcasts(): array;
+
+public function deleteLastBroadcastForAll(
+ array $allUsers,
+ $chatId = null,
+ int $concurrency = 20,
+ ?string $broadcastId = null
+): string;
+public function deleteAllBroadcastsForAll(array $allUsers, $chatId = null, int $concurrency = 20): string;
+public function unpinAllMessagesForAll(array $allUsers, $chatId = null, int $concurrency = 20): string;
+
+public function runDueSelfDestructJobs(): array;
+public function cancelSelfDestructJob(string $jobId): bool;
+public function listSelfDestructJobs(): array;
+
+public function pause(string $id): void;
+public function resume(string $id): void;
+public function cancel(string $id): void;
+
+public function isPaused(string $id): bool;
+public function isCancelled(string $id): bool;
+public function isActive(?string $id = null): bool;
+
+public function hasLastBroadcast(): bool;
+public function hasAllBroadcast(): bool;
+public function progress(?string $id = null): ?array;
+public function lastBroadcastData(): string|false;
+public function filterPeers(array $allUsers, string $filterType = 'users'): array;
+
+public static function setDataDir(string $path): void;
+```
---
-## ๐ License
+## Error Handling
-**GNU AGPL-3.0** - see [LICENSE](LICENSE).
+Hard-fail Telegram RPC errors are counted as failed and are not retried.
+FLOOD_WAIT errors increase the job attempt count, set a future retry time, and are retried up to three attempts.
+
+Internal logging is written to:
+
+```text
+data/broadcast-errors.log
+```
+
+Log write failures are ignored so they do not crash the bot.
---
-## ๐ Changelog
+## Changelog
-See [CHANGELOG.md] for updates.
+See [CHANGELOG.md](CHANGELOG.md) for updates.
---
-โ
**Pro Tips**
+## License
-* Use `pin: true` to pin important broadcasts.
-* Include `buttons` for interactive messages.
-* Adjust `concurrency` for optimal performance.
-* Use `pause/resume/cancel` for safe broadcast control.
+**GNU AGPL-3.0** - see [LICENSE](LICENSE).
diff --git a/src/BroadcastManager.php b/src/BroadcastManager.php
index 18b605f..00021a3 100644
--- a/src/BroadcastManager.php
+++ b/src/BroadcastManager.php
@@ -8,20 +8,55 @@
* @author - WizardLoop
* @copyright - WizardLoop
* @license - https://opensource.org/licenses/AGPL-3.0 AGPLv3
- *
*/
namespace BroadcastTool;
-use Amp\File;
use danog\MadelineProto\API;
use danog\MadelineProto\RPCErrorException;
+use InvalidArgumentException;
+use JsonException;
+use RuntimeException;
use SplQueue;
+use Throwable;
class BroadcastManager
{
+ private const MAX_ATTEMPTS = 3;
+ private const DEFAULT_CONCURRENCY = 20;
+ private const MAX_CONCURRENCY = 50;
+
+ private const SEND_HARD_FAIL_RPCS = [
+ 'INPUT_USER_DEACTIVATED',
+ 'USER_IS_BOT',
+ 'CHAT_WRITE_FORBIDDEN',
+ 'USER_IS_BLOCKED',
+ 'PEER_ID_INVALID',
+ ];
+
+ private const EDIT_HARD_FAIL_RPCS = [
+ 'INPUT_USER_DEACTIVATED',
+ 'USER_IS_BOT',
+ 'CHAT_WRITE_FORBIDDEN',
+ 'USER_IS_BLOCKED',
+ 'PEER_ID_INVALID',
+ 'MESSAGE_ID_INVALID',
+ 'MESSAGE_EDIT_TIME_EXPIRED',
+ 'MESSAGE_AUTHOR_REQUIRED',
+ ];
+
+ private const DELETE_HARD_FAIL_RPCS = [
+ 'INPUT_USER_DEACTIVATED',
+ 'USER_IS_BOT',
+ 'CHAT_WRITE_FORBIDDEN',
+ 'USER_IS_BLOCKED',
+ 'PEER_ID_INVALID',
+ 'MESSAGE_ID_INVALID',
+ ];
+
private API $api;
- private ?array $currentBroadcastState = null;
+ private array $currentBroadcastState = [];
+ private static array $sharedBroadcastState = [];
private static string $dataDir = '';
public function __construct(API $api)
@@ -30,1172 +65,1860 @@ public function __construct(API $api)
}
/**
- * Set data dir
- */
- public static function setDataDir(string $path): void {
- self::$dataDir = rtrim($path, '/');
+ * Set data dir.
+ */
+ public static function setDataDir(string $path): void
+ {
+ self::$dataDir = rtrim($path, '/\\');
}
/**
- * Get data dir
- */
- private static function getDataDir(): string {
+ * Get data dir.
+ */
+ private static function getDataDir(): string
+ {
if (!self::$dataDir) {
self::$dataDir = __DIR__ . '/../data';
}
+
return self::$dataDir;
}
/**
* Send broadcast.
*
- * @return integer ID that can be used
+ * @return string ID that can be used for progress/pause/resume/cancel.
*/
public function broadcastWithProgress(
array $allUsers,
array $messages,
$chatId = null,
bool $pin = false,
- int $concurrency = 20
+ int $concurrency = self::DEFAULT_CONCURRENCY,
+ ?int $selfDestructHours = null
): string {
- $api = $this->api;
-
- $statusId = null;
- if ($chatId) {
- try {
- /* ===== INIT STATUS ===== */
- $status = $api->messages->sendMessage([
- 'peer' => $chatId,
- 'message' => 'โ GATHERING PEERS...',
- 'parse_mode' => 'HTML'
- ]);
- $statusId = $api->extractMessageId($status);
- } catch (\Throwable) { }
- }
-
- $total = count($allUsers);
-
- $broadcastId = bin2hex(random_bytes(8));
-
- /* ===== STATE ===== */
- $state = [
- 'type' => 'send',
- 'sent' => 0,
- 'failed' => 0,
- 'queue' => new \SplQueue(),
- 'inFlight' => [],
- 'lastMessageIds' => [],
- 'paused' => false,
- 'cancel' => false,
- 'done' => false,
- 'startedAt' => microtime(true),
- ];
-
- $this->currentBroadcastState[$broadcastId] = $state;
-
- foreach ($allUsers as $peer) {
- $state['queue']->enqueue([
- 'peer' => $peer,
- 'attempts' => 0,
- 'startedAt' => null,
- 'availableAt' => 0
+ $this->validateSelfDestructHours($selfDestructHours);
+
+ $concurrency = $this->clampConcurrency($concurrency);
+ $total = count($allUsers);
+ $broadcastId = $this->createId();
+ $state = $this->createState($broadcastId, 'send', $total, [
+ 'selfDestruct' => [
+ 'enabled' => $selfDestructHours !== null,
+ 'hours' => $selfDestructHours,
+ 'deleteAt' => null,
+ 'deleteJobId' => null,
+ ],
]);
- }
-
- /* ===== PROGRESS LOOP ===== */
- \Amp\async(function () use ($api, $chatId, $statusId, &$state, $total) {
- $last = '';
- while (!$state['done']) {
- $processed = $state['sent'] + $state['failed'];
- $pending = max(0, $total - $processed);
- $elapsed = microtime(true) - $state['startedAt'];
- $tps = $elapsed > 0 ? round($state['sent'] / $elapsed, 2) : 0;
-
- $text =
- "๐ Broadcast Progress\n\n".
- "".$this->progressBar($processed, max(1,$total))."\n\n".
- "๐จ Sent: {$state['sent']} / $total\n".
- "โ Failed: {$state['failed']}\n".
- "โณ Pending: $pending\n".
- "โก TPS: {$tps} msg/s".
- ($state['paused'] ? "\nโธ Paused" : '').
- ($state['cancel'] ? "\n๐ Cancelled" : '');
-
- if ($chatId && $statusId && $text !== $last) {
- try {
- $api->messages->editMessage([
- 'peer' => $chatId,
- 'id' => $statusId,
- 'message' => $text,
- 'parse_mode' => 'HTML'
- ]);
- $last = $text;
- } catch (\Throwable) {}
- }
- $api->sleep(1);
- }
- });
-
- /* ===== WATCHDOG ===== */
- \Amp\async(function () use (&$state) {
- while (!$state['done']) {
- foreach ($state['inFlight'] as $peer => $job) {
- if ($job['startedAt'] && microtime(true) - $job['startedAt'] > 60) {
- unset($state['inFlight'][$peer]);
- $job['attempts']++;
- $job['startedAt'] = null;
- if ($job['attempts'] >= 3) $state['failed']++;
- else $state['queue']->enqueue($job);
- }
- }
- \danog\MadelineProto\Tools::sleep(5);
- }
- });
-
- /* ===== WORKERS ===== */
- for ($i=0; $i<$concurrency; $i++) {
- \Amp\async(function () use ($api, &$state, $messages, $pin) {
- while (!$state['cancel']) {
-
- if ($state['queue']->isEmpty()) {
- $api->sleep(1);
- continue;
- }
-
- $job = $state['queue']->dequeue();
- if ($job['availableAt'] > microtime(true)) {
- $state['queue']->enqueue($job);
- $api->sleep(0.5);
- continue;
- }
-
- while ($state['paused']) $api->sleep(1);
-
- $peer = $job['peer'];
- $job['startedAt'] = microtime(true);
- $state['inFlight'][$peer] = $job;
- try {
- $lastMessageId = null;
- $albumMessages = [];
+ $this->registerCurrentState($broadcastId, $state);
+ $this->enqueuePeers($state, $allUsers);
+ $this->initializeBroadcastMetadata($broadcastId, 'send', $total, $selfDestructHours);
- foreach ($messages as $m) {
- if (isset($m['albumFile']) && file_exists($m['albumFile'])) {
- $albumMessages = json_decode(\Amp\File\read($m['albumFile']), true) ?? [];
- }
- }
-
- if ($albumMessages) {
- foreach (array_chunk($albumMessages,10) as $chunk) {
- $multi = [];
- foreach ($chunk as $item) {
- $media = $item['media']['type']==='photo'
- ? ['_' => 'inputMediaPhoto','id'=>$item['media']['botApiFileId']]
- : ['_' => 'inputMediaDocument','id'=>$item['media']['botApiFileId']];
- $multi[] = [
- '_' => 'inputSingleMedia',
- 'media' => $media,
- 'message' => $item['caption'] ?? '',
- 'entities' => $item['entities'] ?? []
- ];
- }
- foreach ($api->messages->sendMultiMedia(['peer' => $peer, 'multi_media' => $multi]) as $u) {
-
- $lastMessageId = $api->extractMessageId($u);
- }
- }
- } else {
- foreach ($messages as $m) {
- $method = isset($m['media']) ? 'sendMedia' : 'sendMessage';
- $payload = $m + ['peer'=>$peer,'floodWaitLimit'=>172800];
- if (isset($m['buttons'])) $payload['reply_markup']=$m['buttons'];
- $res = $api->messages->$method($payload);
- $lastMessageId = $api->extractMessageId($res);
- }
- }
+ $statusId = $this->sendStatusMessage($chatId, 'Gathering peers...', $this->buildStatusControls($state));
+ $this->startProgressLoop($chatId, $statusId, $state, 'Broadcast Progress');
- if ($pin && $lastMessageId) {
- $api->messages->updatePinnedMessage([
- 'peer'=>$peer,'id'=>$lastMessageId,'unpin'=>false
- ]);
- }
+ $this->startQueueWorkers(
+ $state,
+ $concurrency,
+ function (array $job, array &$state) use ($messages, $pin, $broadcastId): void {
+ $peer = (string) $job['peer'];
+ $messageIds = $this->sendMessagesToPeer($peer, $messages);
+ $lastMessageId = $messageIds ? end($messageIds) : null;
- unset($state['inFlight'][$peer]);
- $state['sent']++;
- $state['lastMessageIds'][$peer]=(string)$lastMessageId;
-
- } catch (\danog\MadelineProto\RPCErrorException $e) {
- unset($state['inFlight'][$peer]);
-
- if ($e->rpc === 'INPUT_USER_DEACTIVATED' ||
- $e->rpc === 'USER_IS_BOT' ||
- $e->rpc === 'CHAT_WRITE_FORBIDDEN' ||
- $e->rpc === 'USER_IS_BLOCKED' ||
- $e->rpc === 'PEER_ID_INVALID') {
- $state['failed']++;
- continue;
+ if ($pin && $lastMessageId) {
+ $this->api->messages->updatePinnedMessage([
+ 'peer' => $peer,
+ 'id' => $lastMessageId,
+ 'unpin' => false,
+ ]);
}
- if (preg_match('/FLOOD_WAIT_(\d+)/',$e->getMessage(),$m)) {
- $job['attempts']++;
- $job['availableAt']=microtime(true)+(int)$m[1];
- $job['startedAt']=null;
- if ($job['attempts']>=3) $state['failed']++;
- else $state['queue']->enqueue($job);
- continue;
- }
- if (++$job['attempts']>=3) $state['failed']++;
- else { $job['startedAt']=null; $state['queue']->enqueue($job); }
+ $state['sent']++;
+ $state['lastMessageIds'][$peer] = $lastMessageId ? (string) $lastMessageId : '';
- } catch (\Throwable) {
- unset($state['inFlight'][$peer]);
- $state['failed']++;
+ if ($messageIds) {
+ $this->savePeerMessageIds($peer, $messageIds);
}
- $api->sleep(0.25);
- }
- });
- }
-
- /* ===== WAIT FOR REAL FINISH ===== */
- while (!$state['cancel']) {
- if ($state['queue']->isEmpty() && empty($state['inFlight'])) break;
- $api->sleep(1);
- }
+ $this->saveBroadcastPeerMessageIds(
+ $broadcastId,
+ $peer,
+ $messageIds,
+ (int) $state['sent'],
+ (int) $state['failed']
+ );
+ },
+ self::SEND_HARD_FAIL_RPCS
+ );
- $state['done'] = true;
+ $this->waitForCompletion($state);
+ $state['status'] = $state['cancel'] ? 'cancelled' : 'done';
- /* ===== FINAL UPDATE + SAVE ===== */
- $processed = $state['sent'] + $state['failed'];
- $elapsed = microtime(true) - $state['startedAt'];
- $tps = $elapsed>0 ? round($state['sent']/$elapsed,2):0;
+ $this->finalizeBroadcastMetadata(
+ $broadcastId,
+ (string) $state['status'],
+ (int) $state['sent'],
+ (int) $state['failed']
+ );
- $finalText =
- "๐ Broadcast Progress\n\n".
- "".$this->progressBar($processed,max(1,$total))."\n\n".
- "๐จ Sent: {$state['sent']} / $total\n".
- "โ Failed: {$state['failed']}\n".
- "โก TPS: {$tps} msg/s\n".
- ($state['cancel'] ? "๐ Cancelled" : "โ
Finished");
+ $selfDestructJobId = null;
+ if ($selfDestructHours !== null) {
+ $selfDestructJobId = $this->createSelfDestructJob($broadcastId, $selfDestructHours, $chatId, $concurrency);
+ if ($selfDestructJobId !== null) {
+ $metadata = $this->loadBroadcastMetadata($broadcastId);
+ $state['selfDestruct'] = $metadata['selfDestruct'] ?? $state['selfDestruct'];
+ }
- if ($chatId) {
- try { $api->messages->editMessage(['peer'=>$chatId,'id'=>$statusId,'message'=>$finalText,'parse_mode'=>'HTML']); } catch (\Throwable) {}
- }
+ if ($selfDestructJobId !== null && $selfDestructHours === 0) {
+ $this->runSelfDestructJob($selfDestructJobId);
+ }
+ }
- $dir1 = self::getDataDir();
- try { if(!is_dir($dir1))@mkdir($dir1,0777,true); } catch (\Throwable) {}
- try { \Amp\File\write("$dir1/LastBrodDATA.txt",$finalText); } catch (\Throwable) {}
+ $finalText = $this->buildProgressText($state, 'Broadcast Progress', true);
+ $this->editStatusMessage($chatId, $statusId, $finalText);
+ $this->writeLastBroadcastData($finalText);
- foreach ($state['lastMessageIds'] as $peer=>$id) {
- $dir = self::getDataDir() . "/$peer";
- try { if(!is_dir($dir))@mkdir($dir,0777,true); } catch (\Throwable) {}
- try {
- $fh = \Amp\File\openFile("$dir/messages.txt", "a");
- $fh->write((string)$id . "\n");
- $fh->close();
- } catch (\Throwable) {}
- try{\Amp\File\write("$dir/lastBroadcast.txt",$id);}catch(\Throwable){}
+ return $broadcastId;
}
- $this->currentBroadcastState[$broadcastId] = $state;
- return $broadcastId;
-}
-
/**
- * Deletes the last broadcast message for all users.
- *
- * @return integer ID that can be used
+ * Edit the last saved broadcast message for all users.
*/
- public function deleteLastBroadcastForAll(
+ public function editLastBroadcastForAll(
array $allUsers,
+ string $newText,
$chatId = null,
- int $concurrency = 20
+ ?array $buttons = null,
+ ?array $media = null,
+ int $concurrency = self::DEFAULT_CONCURRENCY,
+ string $parseMode = 'HTML',
+ ?string $broadcastId = null
): string {
- $api = $this->api;
- $total = count($allUsers);
-
- $broadcastId = bin2hex(random_bytes(8));
-
- /* ===== STATE ===== */
- $state = [
- 'type' => 'deletelast',
- 'deleted' => 0,
- 'failed' => 0,
- 'flood' => 0,
- 'queue' => new \SplQueue(),
- 'inFlight' => [],
- 'done' => false,
- 'cancel' => false,
- 'startedAt' => microtime(true),
- ];
-
- $this->currentBroadcastState[$broadcastId] = $state;
-
- foreach ($allUsers as $peer) {
- $state['queue']->enqueue([
- 'peer' => (string)$peer,
- 'attempts' => 0,
- 'startedAt' => null,
- 'availableAt' => 0
+ $concurrency = $this->clampConcurrency($concurrency);
+ $targetBroadcastId = $this->normalizeOptionalId($broadcastId);
+ $targets = $targetBroadcastId !== null && $allUsers === []
+ ? $this->broadcastMetadataPeers($targetBroadcastId)
+ : $allUsers;
+ $operationId = $this->createId();
+ $state = $this->createState($operationId, 'edit', count($targets), [
+ 'targetBroadcastId' => $targetBroadcastId,
]);
- }
- $statusId = null;
- if ($chatId) {
- try {
- /* ===== STATUS MESSAGE ===== */
- $status = $api->messages->sendMessage([
- 'peer' => $chatId,
- 'message' => "โ Deleting last broadcast...",
- 'parse_mode' => 'HTML'
- ]);
- $statusId = $api->extractMessageId($status);
- } catch (\Throwable) { }
- }
-
- /* ===== PROGRESS LOOP ===== */
- \Amp\async(function () use ($api, $chatId, $statusId, &$state, $total) {
- $last = '';
- while (!$state['done']) {
- $processed = $state['deleted'] + $state['failed'];
- $pending = max(0, $total - $processed);
- $elapsed = microtime(true) - $state['startedAt'];
- $tps = $elapsed > 0 ? round($state['deleted'] / $elapsed, 2) : 0;
-
- $text =
- "๐ Deleting Last Broadcast\n\n".
- "".$this->progressBar($processed, max(1,$total))."\n\n".
- "โ
Deleted: {$state['deleted']} / $total\n".
- "โ Failed: {$state['failed']}\n".
- "โณ Pending: $pending\n".
- "โก TPS: {$tps} msg/s".
- ($state['cancel'] ? "\n๐ Cancelled" : '');
-
- if ($chatId && $statusId && $text !== $last) {
- try {
- $api->messages->editMessage([
- 'peer' => $chatId,
- 'id' => $statusId,
- 'message' => $text,
- 'parse_mode' => 'HTML'
- ]);
- $last = $text;
- } catch (\Throwable) {}
- }
+ $this->registerCurrentState($operationId, $state);
+ $this->enqueuePeers($state, $targets);
- $api->sleep(1);
- }
- });
+ $statusId = $this->sendStatusMessage($chatId, 'Editing last broadcast...', $this->buildStatusControls($state));
+ $this->startProgressLoop($chatId, $statusId, $state, 'Editing Last Broadcast');
- /* ===== WATCHDOG ===== */
- \Amp\async(function () use (&$state) {
- while (!$state['done']) {
- foreach ($state['inFlight'] as $peer => $job) {
- if ($job['startedAt'] && microtime(true) - $job['startedAt'] > 60) {
- unset($state['inFlight'][$peer]);
- $job['attempts']++;
- $job['startedAt'] = null;
+ $this->startQueueWorkers(
+ $state,
+ $concurrency,
+ function (array $job, array &$state) use ($newText, $buttons, $media, $parseMode, $targetBroadcastId): void {
+ $peer = (string) $job['peer'];
+ $lastMessageId = $targetBroadcastId !== null
+ ? $this->readBroadcastLastMessageId($targetBroadcastId, $peer)
+ : $this->readLastBroadcastMessageId($peer);
- if ($job['attempts'] >= 3) {
- $state['failed']++;
- } else {
- $state['queue']->enqueue($job);
- }
+ if ($lastMessageId <= 0) {
+ $state['failed']++;
+ return;
}
- }
- \danog\MadelineProto\Tools::sleep(5);
- }
- });
- /* ===== WORKERS ===== */
- for ($i = 0; $i < $concurrency; $i++) {
- \Amp\async(function () use ($api, &$state) {
- while (!$state['cancel']) {
+ $payload = [
+ 'peer' => $peer,
+ 'id' => $lastMessageId,
+ 'message' => $newText,
+ 'parse_mode' => $parseMode,
+ 'floodWaitLimit' => 172800,
+ ];
- if ($state['queue']->isEmpty()) {
- $api->sleep(1);
- continue;
+ if ($buttons !== null) {
+ $payload['reply_markup'] = $buttons;
}
- $job = $state['queue']->dequeue();
-
- if ($job['availableAt'] > microtime(true)) {
- $state['queue']->enqueue($job);
- $api->sleep(0.5);
- continue;
+ if ($media !== null) {
+ $payload['media'] = $media;
}
- $peer = $job['peer'];
- $job['startedAt'] = microtime(true);
- $state['inFlight'][$peer] = $job;
+ $this->api->messages->editMessage($payload);
+ $state['edited']++;
+ },
+ self::EDIT_HARD_FAIL_RPCS,
+ function (RPCErrorException $e, array &$job, array &$state): bool {
+ unset($job);
- try {
- $file = self::getDataDir() ."/$peer/lastBroadcast.txt";
- if (!file_exists($file)) {
- $state['failed']++;
- unset($state['inFlight'][$peer]);
- continue;
- }
+ if (($e->rpc ?? '') === 'MESSAGE_NOT_MODIFIED' || str_contains($e->getMessage(), 'MESSAGE_NOT_MODIFIED')) {
+ $state['unchanged']++;
+ return true;
+ }
- $lastMessageId = (int)\Amp\File\read($file);
+ return false;
+ }
+ );
- $api->messages->deleteMessages([
- 'peer' => $peer,
- 'id' => [$lastMessageId],
- 'revoke' => true
- ]);
+ $this->waitForCompletion($state);
+ $state['status'] = $state['cancel'] ? 'cancelled' : 'done';
- @unlink($file);
+ $this->editStatusMessage($chatId, $statusId, $this->buildProgressText($state, 'Editing Last Broadcast', true));
- $state['deleted']++;
- unset($state['inFlight'][$peer]);
+ return $operationId;
+ }
- } catch (\danog\MadelineProto\RPCErrorException $e) {
- unset($state['inFlight'][$peer]);
+ /**
+ * Schedule a broadcast. Due broadcasts can be run with runDueScheduledBroadcasts().
+ */
+ public function scheduleBroadcastForAll(
+ array $allUsers,
+ array $messages,
+ int $scheduledAt,
+ $chatId = null,
+ bool $pin = false,
+ int $concurrency = self::DEFAULT_CONCURRENCY,
+ ?int $selfDestructHours = null
+ ): string {
+ $this->validateSelfDestructHours($selfDestructHours);
+ $this->assertJsonEncodable($allUsers, 'allUsers');
+ $this->assertJsonEncodable($messages, 'messages');
+
+ $concurrency = $this->clampConcurrency($concurrency);
+ $scheduleId = $this->createId('schedule');
+ $jobs = $this->loadScheduledBroadcasts();
+
+ $jobs[$scheduleId] = [
+ 'id' => $scheduleId,
+ 'status' => 'scheduled',
+ 'scheduledAt' => $scheduledAt,
+ 'createdAt' => time(),
+ 'allUsers' => array_values($allUsers),
+ 'messages' => array_values($messages),
+ 'chatId' => $chatId,
+ 'pin' => $pin,
+ 'concurrency' => $concurrency,
+ 'selfDestructHours' => $selfDestructHours,
+ 'broadcastId' => null,
+ 'error' => null,
+ ];
- if (preg_match('/FLOOD_WAIT_(\d+)/', $e->getMessage(), $m)) {
- $state['flood']++;
- $job['attempts']++;
- $job['availableAt'] = microtime(true) + (int)$m[1];
- $job['startedAt'] = null;
+ $this->saveScheduledBroadcasts($jobs);
- if ($job['attempts'] >= 3) {
- $state['failed']++;
- } else {
- $state['queue']->enqueue($job);
- }
- continue;
- }
+ if ($scheduledAt <= time()) {
+ $this->runScheduledBroadcast($scheduleId);
+ }
- if (++$job['attempts'] >= 3) {
- $state['failed']++;
- } else {
- $job['startedAt'] = null;
- $state['queue']->enqueue($job);
- }
+ return $scheduleId;
+ }
- } catch (\Throwable) {
- $state['failed']++;
- unset($state['inFlight'][$peer]);
- }
+ /**
+ * Run all scheduled broadcasts whose scheduledAt timestamp has passed.
+ */
+ public function runDueScheduledBroadcasts(): array
+ {
+ $jobs = $this->loadScheduledBroadcasts();
+ $results = [];
+ $now = time();
- $api->sleep(0.25);
+ foreach ($jobs as $scheduleId => $job) {
+ if (($job['status'] ?? null) !== 'scheduled') {
+ continue;
+ }
+
+ if ((int) ($job['scheduledAt'] ?? 0) > $now) {
+ continue;
}
- });
- }
- /* ===== WAIT FOR REAL FINISH ===== */
- while (!$state['cancel']) {
- if ($state['queue']->isEmpty() && empty($state['inFlight'])) {
- break;
+ $results[$scheduleId] = $this->runScheduledBroadcast((string) $scheduleId);
}
- $api->sleep(1);
+
+ return $results;
}
- $state['done'] = true;
+ public function cancelScheduledBroadcast(string $scheduleId): bool
+ {
+ $jobs = $this->loadScheduledBroadcasts();
- /* ===== FINAL UPDATE ===== */
- $processed = $state['deleted'] + $state['failed'];
- $elapsed = microtime(true) - $state['startedAt'];
- $tps = $elapsed > 0 ? round($state['deleted'] / $elapsed, 2) : 0;
+ if (!isset($jobs[$scheduleId]) || ($jobs[$scheduleId]['status'] ?? null) !== 'scheduled') {
+ return false;
+ }
- $finalText =
- "๐ Deleting Last Broadcast\n\n".
- "".$this->progressBar($processed, max(1,$total))."\n\n".
- "โ
Deleted: {$state['deleted']} / $total\n".
- "โ Failed: {$state['failed']}\n".
- ($state['cancel'] ? "๐ Cancelled" : "โ
Finished");
+ $jobs[$scheduleId]['status'] = 'cancelled';
+ $jobs[$scheduleId]['cancelledAt'] = time();
+ $this->saveScheduledBroadcasts($jobs);
- if ($chatId) {
- try {
- $api->messages->editMessage([
- 'peer' => $chatId,
- 'id' => $statusId,
- 'message' => $finalText,
- 'parse_mode' => 'HTML'
- ]);
- } catch (\Throwable) {}
+ return true;
}
- $this->currentBroadcastState[$broadcastId] = $state;
- return $broadcastId;
-}
+ public function listScheduledBroadcasts(): array
+ {
+ $items = [];
+
+ foreach ($this->loadScheduledBroadcasts() as $id => $job) {
+ $items[$id] = [
+ 'id' => (string) ($job['id'] ?? $id),
+ 'status' => (string) ($job['status'] ?? 'unknown'),
+ 'scheduledAt' => (int) ($job['scheduledAt'] ?? 0),
+ 'createdAt' => (int) ($job['createdAt'] ?? 0),
+ 'broadcastId' => $job['broadcastId'] ?? null,
+ 'pin' => (bool) ($job['pin'] ?? false),
+ 'concurrency' => (int) ($job['concurrency'] ?? self::DEFAULT_CONCURRENCY),
+ 'selfDestructHours' => $job['selfDestructHours'] ?? null,
+ 'totalUsers' => is_array($job['allUsers'] ?? null) ? count($job['allUsers']) : 0,
+ 'totalMessages' => is_array($job['messages'] ?? null) ? count($job['messages']) : 0,
+ 'error' => $job['error'] ?? null,
+ ];
+ }
+
+ return $items;
+ }
/**
- * Deletes all broadcast message for all users.
+ * Deletes the last broadcast message for all users.
*
- * @return integer ID that can be used
+ * @return string ID that can be used for progress/pause/resume/cancel.
*/
- public function deleteAllBroadcastsForAll(
+ public function deleteLastBroadcastForAll(
array $allUsers,
$chatId = null,
- int $concurrency = 20
+ int $concurrency = self::DEFAULT_CONCURRENCY,
+ ?string $broadcastId = null
): string {
- $api = $this->api;
- $total = count($allUsers);
-
- $broadcastId = bin2hex(random_bytes(8));
-
- $state = [
- 'type' => 'deleteall',
- 'deleted' => 0,
- 'failed' => 0,
- 'flood' => 0,
- 'queue' => new \SplQueue(),
- 'inFlight' => [],
- 'done' => false,
- 'cancel' => false,
- 'startedAt' => microtime(true),
- ];
-
- $this->currentBroadcastState[$broadcastId] = $state;
-
- foreach ($allUsers as $peer) {
- $state['queue']->enqueue([
- 'peer' => (string)$peer,
- 'attempts' => 0,
- 'startedAt' => null,
- 'availableAt' => 0
+ $concurrency = $this->clampConcurrency($concurrency);
+ $targetBroadcastId = $this->normalizeOptionalId($broadcastId);
+ $targets = $targetBroadcastId !== null && $allUsers === []
+ ? $this->broadcastMetadataPeers($targetBroadcastId)
+ : $allUsers;
+ $operationId = $this->createId();
+ $state = $this->createState($operationId, 'deletelast', count($targets), [
+ 'targetBroadcastId' => $targetBroadcastId,
]);
- }
- $statusId = null;
- if ($chatId) {
- try {
- $status = $api->messages->sendMessage([
- 'peer' => $chatId,
- 'message' => "โ Deleting all broadcasts...",
- 'parse_mode' => 'HTML'
- ]);
- $statusId = $api->extractMessageId($status);
- } catch (\Throwable) { }
- }
-
- \Amp\async(function () use (&$state) {
- while (!$state['done']) {
- foreach ($state['inFlight'] as $peer => $job) {
- if ($job['startedAt'] && microtime(true) - $job['startedAt'] > 60) {
- unset($state['inFlight'][$peer]);
- $job['attempts']++;
- $job['startedAt'] = null;
-
- if ($job['attempts'] >= 3) {
- $state['failed']++;
- } else {
- $state['queue']->enqueue($job);
- }
- }
- }
- \danog\MadelineProto\Tools::sleep(5);
- }
- });
+ $this->registerCurrentState($operationId, $state);
+ $this->enqueuePeers($state, $targets);
- for ($i = 0; $i < $concurrency; $i++) {
- \Amp\async(function () use ($api, &$state, $chatId, $statusId, $total) {
- while (!$state['cancel']) {
- if ($state['queue']->isEmpty()) {
- $api->sleep(1);
- continue;
- }
+ $statusId = $this->sendStatusMessage($chatId, 'Deleting last broadcast...', $this->buildStatusControls($state));
+ $this->startProgressLoop($chatId, $statusId, $state, 'Deleting Last Broadcast');
- $job = $state['queue']->dequeue();
+ $this->startQueueWorkers(
+ $state,
+ $concurrency,
+ function (array $job, array &$state) use ($targetBroadcastId): void {
+ $peer = (string) $job['peer'];
+ $lastMessageId = $targetBroadcastId !== null
+ ? $this->readBroadcastLastMessageId($targetBroadcastId, $peer)
+ : $this->readLastBroadcastMessageId($peer);
- if ($job['availableAt'] > microtime(true)) {
- $state['queue']->enqueue($job);
- $api->sleep(0.5);
- continue;
+ if ($lastMessageId <= 0) {
+ $state['failed']++;
+ return;
}
- $peer = $job['peer'];
- $job['startedAt'] = microtime(true);
- $state['inFlight'][$peer] = $job;
-
- $userDeleted = false;
+ $this->api->messages->deleteMessages([
+ 'peer' => $peer,
+ 'id' => [$lastMessageId],
+ 'revoke' => true,
+ ]);
- try {
- $file = self::getDataDir() ."/$peer/messages.txt";
- if (!file_exists($file)) {
- $state['failed']++;
- unset($state['inFlight'][$peer]);
- continue;
- }
+ if ($targetBroadcastId !== null) {
+ $this->markBroadcastPeerMessageDeleted($targetBroadcastId, $peer, $lastMessageId);
+ } else {
+ $this->deleteFile($this->peerDataPath($peer, 'lastBroadcast.txt'));
+ }
- $msgIds = array_filter(
- array_map('intval', explode("\n", trim(\Amp\File\read($file))))
- );
+ $state['deleted']++;
+ },
+ self::DELETE_HARD_FAIL_RPCS
+ );
- foreach ($msgIds as $mid) {
- if ($mid <= 0) continue;
+ $this->waitForCompletion($state);
+ $state['status'] = $state['cancel'] ? 'cancelled' : 'done';
- try {
- $api->messages->deleteMessages([
- 'peer' => $peer,
- 'id' => [$mid],
- 'revoke' => true
- ]);
- $userDeleted = true;
- } catch (\danog\MadelineProto\RPCErrorException $e) {
- $msg = $e->getMessage();
+ $this->editStatusMessage($chatId, $statusId, $this->buildProgressText($state, 'Deleting Last Broadcast', true));
- if (preg_match('/FLOOD_WAIT_(\d+)/', $msg, $m)) {
- $state['flood']++;
- unset($state['inFlight'][$peer]);
+ return $operationId;
+ }
- $job['attempts']++;
- $job['availableAt'] = microtime(true) + (int)$m[1];
- $job['startedAt'] = null;
+ /**
+ * Deletes all broadcast messages for all users.
+ *
+ * @return string ID that can be used for progress/pause/resume/cancel.
+ */
+ public function deleteAllBroadcastsForAll(
+ array $allUsers,
+ $chatId = null,
+ int $concurrency = self::DEFAULT_CONCURRENCY
+ ): string {
+ $concurrency = $this->clampConcurrency($concurrency);
+ $broadcastId = $this->createId();
+ $state = $this->createState($broadcastId, 'deleteall', count($allUsers));
- if ($job['attempts'] >= 3) $state['failed']++;
- else $state['queue']->enqueue($job);
- continue 2;
- }
+ $this->registerCurrentState($broadcastId, $state);
+ $this->enqueuePeers($state, $allUsers);
- if (str_contains($msg, 'USER_IS_BLOCKED') || str_contains($msg, 'PEER_ID_INVALID')) {
- continue;
- }
+ $statusId = $this->sendStatusMessage($chatId, 'Deleting all broadcasts...', $this->buildStatusControls($state));
+ $this->startProgressLoop($chatId, $statusId, $state, 'Deleting All Broadcasts');
- throw $e;
- }
- }
+ $this->startQueueWorkers(
+ $state,
+ $concurrency,
+ function (array $job, array &$state): void {
+ $peer = (string) $job['peer'];
+ $file = $this->peerDataPath($peer, 'messages.txt');
- if ($userDeleted) $state['deleted']++;
- else $state['failed']++;
+ if (!is_file($file)) {
+ $state['failed']++;
+ return;
+ }
- $file2 = self::getDataDir() ."/$peer/lastBroadcast.txt";
- try { @unlink($file2); } catch (\Throwable) { }
- try { @unlink($file); } catch (\Throwable) { }
- unset($state['inFlight'][$peer]);
+ $msgIds = array_values(array_filter(
+ array_map('intval', explode("\n", trim((string) file_get_contents($file)))),
+ static fn (int $id): bool => $id > 0
+ ));
- $processed = $state['deleted'] + $state['failed'];
- $pending = max(0, $total - $processed);
- $elapsed = microtime(true) - $state['startedAt'];
- $tps = $elapsed > 0 ? round($state['deleted'] / $elapsed, 2) : 0;
+ if (!$msgIds) {
+ $state['failed']++;
+ return;
+ }
- $text =
- "๐ Deleting All Broadcasts\n\n".
- "".$this->progressBar($processed, max(1,$total))."\n\n".
- "โ
Deleted: {$state['deleted']} / $total\n".
- "โ Failed: {$state['failed']}\n".
- "โณ Pending: $pending\n".
- "โก TPS: {$tps} msg/s";
+ $userDeleted = false;
- if ($chatId) {
+ foreach ($msgIds as $messageId) {
try {
- $api->messages->editMessage([
- 'peer' => $chatId,
- 'id' => $statusId,
- 'message' => $text,
- 'parse_mode' => 'HTML'
+ $this->api->messages->deleteMessages([
+ 'peer' => $peer,
+ 'id' => [$messageId],
+ 'revoke' => true,
]);
- } catch (\Throwable) {}
- }
+ $userDeleted = true;
+ } catch (RPCErrorException $e) {
+ $message = $e->getMessage();
- } catch (\Throwable) {
- unset($state['inFlight'][$peer]);
- if (++$job['attempts'] >= 3) $state['failed']++;
- else { $job['startedAt'] = null; $state['queue']->enqueue($job); }
- }
+ if ($this->parseFloodWait($e) !== null) {
+ throw $e;
+ }
- $api->sleep(0.25);
- }
- });
- }
+ if (str_contains($message, 'USER_IS_BLOCKED') || str_contains($message, 'PEER_ID_INVALID')) {
+ continue;
+ }
- while (!$state['cancel']) {
- if ($state['queue']->isEmpty() && empty($state['inFlight'])) break;
- $api->sleep(1);
- }
+ throw $e;
+ }
+ }
- $state['done'] = true;
+ if ($userDeleted) {
+ $state['deleted']++;
+ } else {
+ $state['failed']++;
+ }
- $processed = $state['deleted'] + $state['failed'];
- $elapsed = microtime(true) - $state['startedAt'];
- $tps = $elapsed > 0 ? round($state['deleted'] / $elapsed, 2) : 0;
+ $this->deleteFile($this->peerDataPath($peer, 'lastBroadcast.txt'));
+ $this->deleteFile($file);
+ },
+ self::DELETE_HARD_FAIL_RPCS,
+ null,
+ true
+ );
- $finalText =
- "๐ Deleting All Broadcasts\n\n".
- "".$this->progressBar($processed, max(1,$total))."\n\n".
- "โ
Deleted: {$state['deleted']} / $total\n".
- "โ Failed: {$state['failed']}\n".
- ($state['cancel'] ? "๐ Cancelled" : "โ
Finished");
+ $this->waitForCompletion($state);
+ $state['status'] = $state['cancel'] ? 'cancelled' : 'done';
- if ($chatId) {
- try {
- $api->messages->editMessage([
- 'peer' => $chatId,
- 'id' => $statusId,
- 'message' => $finalText,
- 'parse_mode' => 'HTML'
- ]);
- } catch (\Throwable) {}
- }
+ $this->editStatusMessage($chatId, $statusId, $this->buildProgressText($state, 'Deleting All Broadcasts', true));
- $this->currentBroadcastState[$broadcastId] = $state;
- return $broadcastId;
-}
+ return $broadcastId;
+ }
/**
- * Unpin all messages for all users
+ * Unpin all messages for all users.
*
- * @return integer ID that can be used
+ * @return string ID that can be used for progress/pause/resume/cancel.
*/
public function unpinAllMessagesForAll(
array $allUsers,
$chatId = null,
- int $concurrency = 20
+ int $concurrency = self::DEFAULT_CONCURRENCY
): string {
- $api = $this->api;
-
- $statusId = null;
- if ($chatId) {
- try {
- /* ===== STATUS MESSAGE ===== */
- $status = $api->messages->sendMessage([
- 'peer' => $chatId,
- 'message' => 'โ GATHERING PEERS...',
- 'parse_mode' => 'HTML'
- ]);
- $statusId = $api->extractMessageId($status);
- } catch (\Throwable) { }
- }
-
- $total = count($allUsers);
-
- $broadcastId = bin2hex(random_bytes(8));
-
- /* ===== STATE ===== */
- $state = [
- 'type' => 'unpin',
- 'unpin' => 0,
- 'failed' => 0,
- 'flood' => 0,
- 'queue' => new \SplQueue(),
- 'inFlight' => [],
- 'done' => false,
- 'cancel' => false,
- 'startedAt' => microtime(true),
- ];
-
- $this->currentBroadcastState[$broadcastId] = $state;
+ $concurrency = $this->clampConcurrency($concurrency);
+ $broadcastId = $this->createId();
+ $state = $this->createState($broadcastId, 'unpin', count($allUsers));
+
+ $this->registerCurrentState($broadcastId, $state);
+ $this->enqueuePeers($state, $allUsers);
+
+ $statusId = $this->sendStatusMessage($chatId, 'Starting unpin...', $this->buildStatusControls($state));
+ $this->startProgressLoop($chatId, $statusId, $state, 'Unpinning Messages');
+
+ $this->startQueueWorkers(
+ $state,
+ $concurrency,
+ function (array $job, array &$state): void {
+ $this->api->messages->unpinAllMessages([
+ 'peer' => (string) $job['peer'],
+ ]);
+
+ $state['unpin']++;
+ },
+ self::SEND_HARD_FAIL_RPCS
+ );
- foreach ($allUsers as $peer) {
- $state['queue']->enqueue([
- 'peer' => $peer,
- 'attempts' => 0,
- 'startedAt' => null,
- 'availableAt' => 0
- ]);
- }
+ $this->waitForCompletion($state);
+ $state['status'] = $state['cancel'] ? 'cancelled' : 'done';
- if ($chatId) {
- try {
- $api->messages->editMessage([
- 'peer' => $chatId,
- 'id' => $statusId,
- 'message' => "๐โ Starting unpin for all subscribers...",
- 'parse_mode' => 'HTML'
- ]);
- } catch (\Throwable) { }
- }
-
- /* ===== PROGRESS LOOP ===== */
- \Amp\async(function () use ($api, $chatId, $statusId, &$state, $total) {
- $last = '';
- while (!$state['done']) {
- $processed = $state['unpin'] + $state['failed'];
- $pending = max(0, $total - $processed);
- $elapsed = microtime(true) - $state['startedAt'];
- $tps = $elapsed > 0 ? round($state['unpin'] / $elapsed, 2) : 0;
-
- $text =
- "๐ Unpinning Messages\n\n".
- "".$this->progressBar($processed, max(1,$total))."\n\n".
- "๐ค Unpinned: {$state['unpin']} / $total\n".
- "โ Failed: {$state['failed']}\n".
- "โ FLOOD_WAIT: {$state['flood']}\n".
- "โณ Pending: $pending\n".
- "โก TPS: {$tps} msg/s".
- ($state['cancel'] ? "\n๐ Cancelled" : '');
-
- if ($chatId && $statusId && $text !== $last) {
- try {
- $api->messages->editMessage([
- 'peer' => $chatId,
- 'id' => $statusId,
- 'message' => $text,
- 'parse_mode' => 'HTML'
- ]);
- $last = $text;
- } catch (\Throwable) {}
- }
+ $this->editStatusMessage($chatId, $statusId, $this->buildProgressText($state, 'Unpinning Messages', true));
- $api->sleep(1);
- }
- });
+ return $broadcastId;
+ }
- /* ===== WATCHDOG ===== */
- \Amp\async(function () use (&$state) {
- while (!$state['done']) {
- foreach ($state['inFlight'] as $peer => $job) {
- if ($job['startedAt'] && microtime(true) - $job['startedAt'] > 60) {
- unset($state['inFlight'][$peer]);
- $job['attempts']++;
- $job['startedAt'] = null;
+ public function runDueSelfDestructJobs(): array
+ {
+ $jobs = $this->loadSelfDestructJobs();
+ $results = [];
+ $now = time();
- if ($job['attempts'] >= 3) $state['failed']++;
- else $state['queue']->enqueue($job);
- }
+ foreach ($jobs as $jobId => $job) {
+ if (($job['status'] ?? null) !== 'scheduled') {
+ continue;
}
- \danog\MadelineProto\Tools::sleep(5);
- }
- });
-
- /* ===== WORKERS ===== */
- for ($i = 0; $i < $concurrency; $i++) {
- \Amp\async(function () use ($api, &$state) {
- while (!$state['cancel']) {
- if ($state['queue']->isEmpty()) {
- $api->sleep(1);
- continue;
- }
-
- $job = $state['queue']->dequeue();
-
- if ($job['availableAt'] > microtime(true)) {
- $state['queue']->enqueue($job);
- $api->sleep(0.5);
- continue;
- }
-
- $peer = $job['peer'];
- $job['startedAt'] = microtime(true);
- $state['inFlight'][$peer] = $job;
-
- try {
- $api->messages->unpinAllMessages([
- 'peer' => $peer
- ]);
- unset($state['inFlight'][$peer]);
- $state['unpin']++;
-
- } catch (\danog\MadelineProto\RPCErrorException $e) {
- unset($state['inFlight'][$peer]);
- $msg = $e->getMessage();
-
- if (preg_match('/FLOOD_WAIT_(\d+)/', $msg, $m)) {
- $state['flood']++;
- $job['attempts']++;
- $job['availableAt'] = microtime(true) + (int)($m[1] ?? 5);
- $job['startedAt'] = null;
-
- if ($job['attempts'] >= 3) $state['failed']++;
- else $state['queue']->enqueue($job);
- continue;
- }
- if ($job['attempts']++ >= 3) $state['failed']++;
- else {
- $job['startedAt'] = null;
- $state['queue']->enqueue($job);
- $api->sleep(0.5);
- }
+ if ((int) ($job['deleteAt'] ?? 0) > $now) {
+ continue;
+ }
- } catch (\Throwable) {
- unset($state['inFlight'][$peer]);
- $state['failed']++;
- }
+ $results[$jobId] = $this->runSelfDestructJob((string) $jobId);
+ }
- $api->sleep(0.25);
- }
- });
+ return $results;
}
- /* ===== WAIT FOR FINISH ===== */
- while (!$state['cancel']) {
- if ($state['queue']->isEmpty() && empty($state['inFlight'])) break;
- $api->sleep(1);
- }
+ public function cancelSelfDestructJob(string $jobId): bool
+ {
+ $jobs = $this->loadSelfDestructJobs();
- $state['done'] = true;
+ if (!isset($jobs[$jobId]) || ($jobs[$jobId]['status'] ?? null) !== 'scheduled') {
+ return false;
+ }
- /* ===== FINAL UPDATE ===== */
- $processed = $state['unpin'] + $state['failed'];
- $finalText =
- "๐ Unpinning Messages\n\n".
- "".$this->progressBar($processed, max(1,$total))."\n\n".
- "๐ค Unpinned: {$state['unpin']} / $total\n".
- "โ Failed: {$state['failed']}\n".
- ($state['cancel'] ? "๐ Cancelled" : "โ
Finished");
+ $jobs[$jobId]['status'] = 'cancelled';
+ $jobs[$jobId]['cancelledAt'] = time();
+ $this->saveSelfDestructJobs($jobs);
- if ($chatId) {
- try {
- $api->messages->editMessage([
- 'peer' => $chatId,
- 'id' => $statusId,
- 'message' => $finalText,
- 'parse_mode' => 'HTML'
- ]);
- } catch (\Throwable) {}
+ return true;
}
- $this->currentBroadcastState[$broadcastId] = $state;
- return $broadcastId;
-}
+ public function listSelfDestructJobs(): array
+ {
+ $items = [];
+
+ foreach ($this->loadSelfDestructJobs() as $id => $job) {
+ $items[$id] = [
+ 'id' => (string) ($job['id'] ?? $id),
+ 'broadcastId' => (string) ($job['broadcastId'] ?? ''),
+ 'status' => (string) ($job['status'] ?? 'unknown'),
+ 'deleteAt' => (int) ($job['deleteAt'] ?? 0),
+ 'createdAt' => (int) ($job['createdAt'] ?? 0),
+ 'concurrency' => (int) ($job['concurrency'] ?? self::DEFAULT_CONCURRENCY),
+ 'chatId' => $job['chatId'] ?? null,
+ 'totalPeers' => (int) ($job['totalPeers'] ?? 0),
+ 'stats' => $job['stats'] ?? null,
+ 'error' => $job['error'] ?? null,
+ ];
+ }
- /**
- * Progress bar
- */
- private function progressBar(int $current, int $total): string {
- $len = 20;
- $filled = (int) round($current / max($total,1) * $len);
- return str_repeat('โ',$filled).str_repeat('โ',$len-$filled).' '.round(($current/max($total,1))*100).'%';
+ return $items;
}
/**
- * Pause running broadcast
+ * Pause running broadcast.
*/
- public function pause(string $id): void {
+ public function pause(string $id): void
+ {
+ if (isset(self::$sharedBroadcastState[$id])) {
+ self::$sharedBroadcastState[$id]['paused'] = true;
+ }
+
if (isset($this->currentBroadcastState[$id])) {
$this->currentBroadcastState[$id]['paused'] = true;
}
}
/**
- * Resume running broadcast
+ * Resume running broadcast.
*/
- public function resume(string $id): void {
+ public function resume(string $id): void
+ {
+ if (isset(self::$sharedBroadcastState[$id])) {
+ self::$sharedBroadcastState[$id]['paused'] = false;
+ }
+
if (isset($this->currentBroadcastState[$id])) {
$this->currentBroadcastState[$id]['paused'] = false;
}
}
/**
- * cancel running broadcast
+ * Cancel running broadcast.
*/
- public function cancel(string $id): void {
+ public function cancel(string $id): void
+ {
+ if (isset(self::$sharedBroadcastState[$id])) {
+ self::$sharedBroadcastState[$id]['cancel'] = true;
+ }
+
if (isset($this->currentBroadcastState[$id])) {
$this->currentBroadcastState[$id]['cancel'] = true;
- $this->currentBroadcastState[$id]['inFlight'] = [];
}
}
/**
- * Check if broadcast is paused
+ * Check if broadcast is paused.
*/
- public function isPaused(string $id): bool {
- return $this->currentBroadcastState[$id]['paused'] ?? false;
+ public function isPaused(string $id): bool
+ {
+ return self::$sharedBroadcastState[$id]['paused'] ?? $this->currentBroadcastState[$id]['paused'] ?? false;
}
/**
- * Check if broadcast is cancelled
+ * Check if broadcast is cancelled.
*/
- public function isCancelled(string $id): bool {
- return $this->currentBroadcastState[$id]['cancel'] ?? false;
+ public function isCancelled(string $id): bool
+ {
+ return self::$sharedBroadcastState[$id]['cancel'] ?? $this->currentBroadcastState[$id]['cancel'] ?? false;
}
/**
- * Check if broadcast is active
+ * Check if broadcast is active.
*/
- public function isActive(?string $id = null): bool {
- if (!$id || !isset($this->currentBroadcastState[$id])) {
+ public function isActive(?string $id = null): bool
+ {
+ if (!$id || (!isset(self::$sharedBroadcastState[$id]) && !isset($this->currentBroadcastState[$id]))) {
return false;
}
- $state = $this->currentBroadcastState[$id];
-
- if (!$state) return false;
+ $state = self::$sharedBroadcastState[$id] ?? $this->currentBroadcastState[$id];
return (
- empty($state['done']) &&
- empty($state['cancel']) &&
- empty($state['paused'])
+ empty($state['done'])
+ && empty($state['cancel'])
+ && empty($state['paused'])
);
}
/**
- * Check if there is a last broadcast message saved for deletion
+ * Check if there is a last broadcast message saved for deletion.
*/
- public function hasLastBroadcast(): bool {
- foreach (glob(self::getDataDir() ."/*/lastBroadcast.txt") as $file) {
- if (file_exists($file) && trim(\Amp\File\read($file))) {
+ public function hasLastBroadcast(): bool
+ {
+ foreach (glob(self::getDataDir() . '/*/lastBroadcast.txt') ?: [] as $file) {
+ if (is_file($file) && trim((string) file_get_contents($file)) !== '') {
return true;
}
}
+
return false;
}
/**
- * Check if there is a broadcast messages saved for deletion
+ * Check if there are broadcast messages saved for deletion.
*/
- public function hasAllBroadcast(): bool {
- foreach (glob(self::getDataDir() ."/*/messages.txt") as $file) {
- if (file_exists($file) && trim(\Amp\File\read($file))) {
+ public function hasAllBroadcast(): bool
+ {
+ foreach (glob(self::getDataDir() . '/*/messages.txt') ?: [] as $file) {
+ if (is_file($file) && trim((string) file_get_contents($file)) !== '') {
return true;
}
}
+
return false;
}
/**
- * normalize broadcast state
+ * Get current broadcast progress.
*/
- private function normalizeBroadcastState(array $state): array {
- return [
- 'sent' => $state['sent'] ?? 0,
- 'deleted' => $state['deleted'] ?? 0,
- 'unpin' => $state['unpin'] ?? 0,
- 'failed' => $state['failed'] ?? 0,
- 'flood' => $state['flood'] ?? 0,
+ public function progress(?string $id = null): ?array
+ {
+ if (!$id || (!isset(self::$sharedBroadcastState[$id]) && !isset($this->currentBroadcastState[$id]))) {
+ return null;
+ }
- 'queue' => $state['queue'] ?? null,
- 'inFlight' => $state['inFlight'] ?? [],
+ $state = $this->normalizeBroadcastState(self::$sharedBroadcastState[$id] ?? $this->currentBroadcastState[$id]);
- 'done' => $state['done'] ?? false,
- 'paused' => $state['paused'] ?? false,
- 'cancel' => $state['cancel'] ?? false,
+ $sent = (int) $state['sent'];
+ $deleted = (int) $state['deleted'];
+ $unpin = (int) $state['unpin'];
+ $edited = (int) $state['edited'];
+ $unchanged = (int) $state['unchanged'];
+ $scheduled = (int) $state['scheduled'];
+ $failed = (int) $state['failed'];
+ $flood = (int) $state['flood'];
- 'startedAt' => $state['startedAt'] ?? null,
- ];
- }
-
- /**
- * Get current broadcast progress
- *
- * @return array|null {
- * processed: int, // total processed items (sent + deleted + unpin + failed)
- * success: int, // successful operations (sent + deleted + unpin)
- * failed: int, // failed operations count
- * pending: int, // remaining items in queue
- * flood: int, // FLOOD_WAIT occurrences
- *
- * progressPercent: float, // completion percentage (processed / total)
- *
- * breakdown: array {
- * sent: int,
- * deleted: int,
- * unpin: int
- * },
- *
- * done: bool, // process finished
- * paused: bool, // process paused
- * cancel: bool, // process cancelled
- *
- * startedAt: float // microtime start timestamp
- * }
- */
- public function progress(?string $id = null): ?array {
- if (!$id || !isset($this->currentBroadcastState[$id])) {
- return null;
- }
-
- $state = $this->normalizeBroadcastState($this->currentBroadcastState[$id]);
-
- $sent = (int)($state['sent'] ?? 0);
- $deleted = (int)($state['deleted'] ?? 0);
- $unpin = (int)($state['unpin'] ?? 0);
- $failed = (int)($state['failed'] ?? 0);
- $flood = (int)($state['flood'] ?? 0);
-
- $processed = $sent + $deleted + $unpin + $failed;
- $success = $sent + $deleted + $unpin;
-
- $pending = ($state['queue'] instanceof \SplQueue)
- ? $state['queue']->count()
- : 0;
-
- $total = $processed + $pending;
+ $processed = $this->processedCount($state);
+ $success = $sent + $deleted + $unpin + $edited + $unchanged;
+ $pending = $this->pendingCount($state, $processed);
+ $total = (int) $state['total'];
+ $elapsed = $this->elapsedSeconds($state);
+ $tps = $elapsed > 0 ? round($success / $elapsed, 2) : 0.0;
$progressPercent = $total > 0
? round(($processed / $total) * 100, 2)
- : 0;
+ : 0.0;
return [
'processed' => $processed,
- 'success' => $success,
- 'failed' => $failed,
- 'pending' => $pending,
- 'flood' => $flood,
-
+ 'success' => $success,
+ 'failed' => $failed,
+ 'pending' => $pending,
+ 'flood' => $flood,
'progressPercent' => $progressPercent,
-
'breakdown' => [
- 'sent' => $sent,
+ 'sent' => $sent,
'deleted' => $deleted,
- 'unpin' => $unpin,
+ 'unpin' => $unpin,
+ 'edited' => $edited,
+ 'unchanged' => $unchanged,
+ 'scheduled' => $scheduled,
],
-
- 'done' => (bool)($state['done'] ?? false),
- 'paused' => (bool)($state['paused'] ?? false),
- 'cancel' => (bool)($state['cancel'] ?? false),
- 'startedAt' => $state['startedAt'] ?? null,
+ 'edited' => $edited,
+ 'unchanged' => $unchanged,
+ 'scheduled' => $scheduled,
+ 'selfDestruct' => $state['selfDestruct'],
+ 'type' => $state['type'],
+ 'total' => $total,
+ 'elapsed' => $elapsed,
+ 'tps' => $tps,
+ 'done' => (bool) $state['done'],
+ 'paused' => (bool) $state['paused'],
+ 'cancel' => (bool) $state['cancel'],
+ 'startedAt' => $state['startedAt'],
];
}
/**
- * Last broadcast data
+ * Last broadcast data.
*/
- public function lastBroadcastData(): string|false {
+ public function lastBroadcastData(): string|false
+ {
$dir = self::getDataDir();
if (!is_dir($dir)) {
mkdir($dir, 0777, true);
}
- $path = $dir . "/LastBrodDATA.txt";
+ $path = $dir . '/LastBrodDATA.txt';
- if (!file_exists($path)) {
+ if (!is_file($path)) {
return false;
}
- return \Amp\File\read($path);
+ return (string) file_get_contents($path);
}
/**
- * Filter peers
+ * Filter peers.
+ *
* allowedTypes: all / users / groups / channels
*
- * @return array {
- * targets: array, // filtered peers
- * failed: int, // count of failed
- * total: int, // count filtered peers
- * }
+ * @return array{targets: array, failed: int, total: int}
*/
public function filterPeers(
- array $allUsers,
+ array $allUsers,
string $filterType = 'users'
- ): array {
+ ): array {
+ $allowedTypes = [
+ 'all' => ['user', 'chat', 'supergroup', 'channel'],
+ 'users' => ['user'],
+ 'groups' => ['chat', 'supergroup'],
+ 'channels' => ['channel'],
+ ];
- $api = $this->api;
+ $targets = [];
+ $failedCount = 0;
- $allowedTypes = [
- 'all' => ['user','chat','supergroup','channel'],
- 'users' => ['user'],
- 'groups' => ['chat','supergroup'],
- 'channels' => ['channel'],
- ];
+ foreach ($allUsers as $peer) {
+ try {
+ $info = $this->api->getInfo($peer);
+ $type = $info['type'] ?? 'user';
+
+ if (in_array($type, $allowedTypes[$filterType] ?? ['user'], true)) {
+ $targets[] = (string) $peer;
+ }
+ } catch (Throwable $e) {
+ if ($filterType === 'all') {
+ $targets[] = (string) $peer;
+ } else {
+ $failedCount++;
+ $this->logError('Failed to inspect peer type.', $e, ['peer' => (string) $peer]);
+ }
+ }
+ }
+
+ return [
+ 'targets' => $targets,
+ 'failed' => $failedCount,
+ 'total' => count($targets),
+ ];
+ }
+
+ private function createState(string $id, string $type, int $total, array $extra = []): array
+ {
+ return $extra + [
+ 'id' => $id,
+ 'type' => $type,
+ 'status' => 'running',
+ 'total' => max(0, $total),
+ 'sent' => 0,
+ 'deleted' => 0,
+ 'unpin' => 0,
+ 'edited' => 0,
+ 'unchanged' => 0,
+ 'scheduled' => 0,
+ 'failed' => 0,
+ 'flood' => 0,
+ 'queue' => new SplQueue(),
+ 'inFlight' => [],
+ 'lastMessageIds' => [],
+ 'paused' => false,
+ 'cancel' => false,
+ 'done' => false,
+ 'startedAt' => microtime(true),
+ 'selfDestruct' => null,
+ ];
+ }
+
+ private function registerCurrentState(string $id, array &$state): void
+ {
+ $this->currentBroadcastState[$id] =& $state;
+ self::$sharedBroadcastState[$id] =& $state;
+ }
+
+ private function enqueuePeers(array &$state, array $peers): void
+ {
+ foreach ($peers as $peer) {
+ $state['queue']->enqueue([
+ 'peer' => (string) $peer,
+ 'attempts' => 0,
+ 'startedAt' => null,
+ 'availableAt' => 0.0,
+ ]);
+ }
+ }
+
+ private function clampConcurrency(int $concurrency): int
+ {
+ return max(1, min(self::MAX_CONCURRENCY, $concurrency));
+ }
+
+ private function startQueueWorkers(
+ array &$state,
+ int $concurrency,
+ callable $handler,
+ array $hardFailRpcs = [],
+ ?callable $rpcHandler = null,
+ bool $retryThrowable = false
+ ): void {
+ $concurrency = $this->clampConcurrency($concurrency);
+
+ for ($i = 0; $i < $concurrency; $i++) {
+ \Amp\async(function () use (&$state, $handler, $hardFailRpcs, $rpcHandler, $retryThrowable): void {
+ while (!$state['cancel'] && !$state['done']) {
+ if ($state['queue']->isEmpty()) {
+ $this->api->sleep(0.5);
+ continue;
+ }
+
+ if ($state['paused']) {
+ $this->api->sleep(1);
+ continue;
+ }
+
+ $job = $state['queue']->dequeue();
+
+ if (($job['availableAt'] ?? 0) > microtime(true)) {
+ $state['queue']->enqueue($job);
+ $this->api->sleep(0.5);
+ continue;
+ }
+
+ while ($state['paused'] && !$state['cancel'] && !$state['done']) {
+ $this->api->sleep(1);
+ }
- $targets = [];
- $failedCount = 0;
+ if ($state['cancel'] || $state['done']) {
+ continue;
+ }
+
+ $peer = (string) $job['peer'];
+ $job['startedAt'] = microtime(true);
+ $state['inFlight'][$peer] = $job;
+
+ try {
+ $handler($job, $state);
+ unset($state['inFlight'][$peer]);
+ } catch (RPCErrorException $e) {
+ unset($state['inFlight'][$peer]);
+
+ if ($rpcHandler !== null && $rpcHandler($e, $job, $state)) {
+ continue;
+ }
+
+ if ($this->handleRpcRetry($e, $job, $state, $hardFailRpcs)) {
+ continue;
+ }
+
+ $this->logError('RPC error during broadcast job.', $e, [
+ 'type' => (string) ($state['type'] ?? ''),
+ 'peer' => $peer,
+ ]);
+ $this->retryOrFail($job, $state);
+ } catch (Throwable $e) {
+ unset($state['inFlight'][$peer]);
+ $this->logError('Unexpected error during broadcast job.', $e, [
+ 'type' => (string) ($state['type'] ?? ''),
+ 'peer' => $peer,
+ ]);
+
+ if ($retryThrowable) {
+ $this->retryOrFail($job, $state);
+ } else {
+ $state['failed']++;
+ }
+ }
+
+ $this->api->sleep(0.25);
+ }
+ });
+ }
+ }
+
+ private function waitForCompletion(array &$state): void
+ {
+ while (true) {
+ if ($state['queue']->isEmpty() && empty($state['inFlight'])) {
+ break;
+ }
+
+ if ($state['cancel'] && empty($state['inFlight'])) {
+ break;
+ }
+
+ $this->api->sleep(1);
+ }
+
+ $state['done'] = true;
+ }
+
+ private function handleRpcRetry(RPCErrorException $e, array &$job, array &$state, array $hardFailRpcs): bool
+ {
+ if ($this->isHardFailRpc((string) ($e->rpc ?? ''), $hardFailRpcs)) {
+ $state['failed']++;
+ return true;
+ }
+
+ $floodWait = $this->parseFloodWait($e);
+ if ($floodWait !== null) {
+ $state['flood']++;
+ $job['attempts']++;
+ $job['availableAt'] = microtime(true) + $floodWait;
+ $job['startedAt'] = null;
+
+ if ($job['attempts'] >= self::MAX_ATTEMPTS) {
+ $state['failed']++;
+ } else {
+ $state['queue']->enqueue($job);
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ private function isHardFailRpc(string $rpc, array $hardFailRpcs = self::SEND_HARD_FAIL_RPCS): bool
+ {
+ return in_array($rpc, $hardFailRpcs, true);
+ }
+
+ private function parseFloodWait(RPCErrorException $e): ?int
+ {
+ if (preg_match('/FLOOD_WAIT_(\d+)/', $e->getMessage(), $m)) {
+ return (int) $m[1];
+ }
+
+ return null;
+ }
+
+ private function retryOrFail(array &$job, array &$state): void
+ {
+ $job['attempts']++;
+ $job['startedAt'] = null;
- foreach ($allUsers as $peer) {
+ if ($job['attempts'] >= self::MAX_ATTEMPTS) {
+ $state['failed']++;
+ return;
+ }
+
+ $state['queue']->enqueue($job);
+ }
+
+ private function sendMessagesToPeer(string $peer, array $messages): array
+ {
+ $messageIds = [];
+ $albumMessages = [];
+
+ foreach ($messages as $message) {
+ if (isset($message['albumFile']) && is_file((string) $message['albumFile'])) {
+ $decoded = json_decode((string) file_get_contents((string) $message['albumFile']), true);
+ $albumMessages = is_array($decoded) ? $decoded : [];
+ }
+ }
+
+ if ($albumMessages) {
+ foreach (array_chunk($albumMessages, 10) as $chunk) {
+ $multi = [];
+
+ foreach ($chunk as $item) {
+ $media = ($item['media']['type'] ?? null) === 'photo'
+ ? ['_' => 'inputMediaPhoto', 'id' => $item['media']['botApiFileId']]
+ : ['_' => 'inputMediaDocument', 'id' => $item['media']['botApiFileId']];
+
+ $multi[] = [
+ '_' => 'inputSingleMedia',
+ 'media' => $media,
+ 'message' => $item['caption'] ?? '',
+ 'entities' => $item['entities'] ?? [],
+ ];
+ }
+
+ foreach ($this->api->messages->sendMultiMedia(['peer' => $peer, 'multi_media' => $multi]) as $update) {
+ $messageId = (int) $this->api->extractMessageId($update);
+
+ if ($messageId > 0) {
+ $messageIds[] = $messageId;
+ }
+ }
+ }
+
+ return $messageIds;
+ }
+
+ foreach ($messages as $message) {
+ $method = isset($message['media']) ? 'sendMedia' : 'sendMessage';
+ $payload = $message + [
+ 'peer' => $peer,
+ 'floodWaitLimit' => 172800,
+ ];
+
+ if (isset($message['buttons'])) {
+ $payload['reply_markup'] = $message['buttons'];
+ }
+
+ $result = $this->api->messages->{$method}($payload);
+ $messageId = (int) $this->api->extractMessageId($result);
+
+ if ($messageId > 0) {
+ $messageIds[] = $messageId;
+ }
+ }
+
+ return $messageIds;
+ }
+
+ private function savePeerMessageIds(string $peer, array $messageIds): void
+ {
+ $messageIds = array_values(array_filter(array_map('intval', $messageIds), static fn (int $id): bool => $id > 0));
+
+ if (!$messageIds) {
+ return;
+ }
+
+ $dir = self::getDataDir() . '/' . $peer;
+ $this->ensureDirectory($dir);
+
+ try {
+ file_put_contents($dir . '/messages.txt', implode("\n", $messageIds) . "\n", FILE_APPEND | LOCK_EX);
+ file_put_contents($dir . '/lastBroadcast.txt', (string) end($messageIds), LOCK_EX);
+ } catch (Throwable $e) {
+ $this->logError('Failed to save peer message ids.', $e, ['peer' => $peer]);
+ }
+ }
+
+ private function initializeBroadcastMetadata(string $broadcastId, string $type, int $total, ?int $selfDestructHours): void
+ {
+ $metadata = [
+ 'id' => $broadcastId,
+ 'type' => $type,
+ 'createdAt' => time(),
+ 'status' => 'running',
+ 'total' => $total,
+ 'sent' => 0,
+ 'failed' => 0,
+ 'peers' => [],
+ 'selfDestruct' => [
+ 'enabled' => $selfDestructHours !== null,
+ 'hours' => $selfDestructHours,
+ 'deleteAt' => null,
+ 'deleteJobId' => null,
+ ],
+ ];
+
+ $this->saveBroadcastMetadata($broadcastId, $metadata);
+ }
+
+ private function saveBroadcastPeerMessageIds(
+ string $broadcastId,
+ string $peer,
+ array $messageIds,
+ int $sent,
+ int $failed
+ ): void {
+ $metadata = $this->loadBroadcastMetadata($broadcastId);
+ $messageIds = array_values(array_filter(array_map('intval', $messageIds), static fn (int $id): bool => $id > 0));
+
+ $metadata['sent'] = $sent;
+ $metadata['failed'] = $failed;
+ if (!isset($metadata['peers']) || !is_array($metadata['peers'])) {
+ $metadata['peers'] = [];
+ }
+
+ $metadata['peers'][$peer] = [
+ 'lastMessageId' => $messageIds ? end($messageIds) : null,
+ 'messageIds' => $messageIds,
+ 'status' => 'sent',
+ ];
+
+ $this->saveBroadcastMetadata($broadcastId, $metadata);
+ }
+
+ private function finalizeBroadcastMetadata(string $broadcastId, string $status, int $sent, int $failed): void
+ {
+ $metadata = $this->loadBroadcastMetadata($broadcastId);
+ $metadata['status'] = $status;
+ $metadata['sent'] = $sent;
+ $metadata['failed'] = $failed;
+ $metadata['finishedAt'] = time();
+
+ $this->saveBroadcastMetadata($broadcastId, $metadata);
+ }
+
+ private function saveBroadcastMetadata(string $broadcastId, array $metadata): void
+ {
try {
- $info = $api->getInfo($peer);
- $type = $info['type'] ?? 'user';
- if (in_array($type, $allowedTypes[$filterType] ?? ['user'], true)) {
- $targets[] = (string)$peer;
+ $this->writeJsonFileAtomic($this->broadcastMetadataPath($broadcastId), $metadata);
+ } catch (Throwable $e) {
+ $this->logError('Failed to save broadcast metadata.', $e, ['broadcastId' => $broadcastId]);
+ }
+ }
+
+ private function loadBroadcastMetadata(string $broadcastId): array
+ {
+ return $this->readJsonFile($this->broadcastMetadataPath($broadcastId), [
+ 'id' => $broadcastId,
+ 'type' => 'send',
+ 'createdAt' => time(),
+ 'status' => 'running',
+ 'total' => 0,
+ 'sent' => 0,
+ 'failed' => 0,
+ 'peers' => [],
+ 'selfDestruct' => [
+ 'enabled' => false,
+ 'hours' => null,
+ 'deleteAt' => null,
+ 'deleteJobId' => null,
+ ],
+ ]);
+ }
+
+ private function readJsonFile(string $path, array $default = []): array
+ {
+ if (!is_file($path)) {
+ return $default;
+ }
+
+ try {
+ $content = (string) file_get_contents($path);
+
+ if (trim($content) === '') {
+ return $default;
}
- } catch (\Throwable) {
- if ($filterType === 'all') $targets[] = (string)$peer;
- else $failedCount++;
+
+ $decoded = json_decode($content, true, 512, JSON_THROW_ON_ERROR);
+ return is_array($decoded) ? $decoded : $default;
+ } catch (Throwable $e) {
+ $this->logError('Failed to read JSON file.', $e, ['path' => $path]);
+ return $default;
}
}
- return [
- 'targets' => $targets,
- 'failed' => $failedCount,
- 'total' => count($targets)
- ];
+ private function writeJsonFileAtomic(string $path, array $data): void
+ {
+ $this->ensureDirectory(dirname($path));
+
+ $tmp = $path . '.tmp.' . bin2hex(random_bytes(4));
+
+ try {
+ $json = json_encode($data, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES | JSON_THROW_ON_ERROR);
+
+ if (file_put_contents($tmp, $json . "\n", LOCK_EX) === false) {
+ throw new RuntimeException('Unable to write temporary JSON file.');
+ }
+
+ if (!@rename($tmp, $path)) {
+ if (is_file($path)) {
+ @unlink($path);
+ }
+
+ if (!@rename($tmp, $path)) {
+ throw new RuntimeException('Unable to rename temporary JSON file.');
+ }
+ }
+ } catch (Throwable $e) {
+ if (is_file($tmp)) {
+ @unlink($tmp);
+ }
+
+ $this->logError('Failed to write JSON file atomically.', $e, ['path' => $path]);
+ throw $e;
+ }
+ }
+
+ private function logError(string $message, ?Throwable $e = null, array $context = []): void
+ {
+ try {
+ $this->ensureDirectory(self::getDataDir());
+ $entry = [
+ 'time' => date('c'),
+ 'message' => $message,
+ 'error' => $e ? $e->getMessage() : null,
+ 'rpc' => $e instanceof RPCErrorException ? ($e->rpc ?? null) : null,
+ 'context' => $context,
+ ];
+
+ file_put_contents(
+ self::getDataDir() . '/broadcast-errors.log',
+ json_encode($entry, JSON_UNESCAPED_SLASHES) . "\n",
+ FILE_APPEND | LOCK_EX
+ );
+ } catch (Throwable) {
+ // Logging must never break bot execution.
+ }
+ }
+
+ private function runScheduledBroadcast(string $scheduleId): array
+ {
+ $jobs = $this->loadScheduledBroadcasts();
+
+ if (!isset($jobs[$scheduleId])) {
+ return ['status' => 'missing'];
+ }
+
+ if (($jobs[$scheduleId]['status'] ?? null) !== 'scheduled') {
+ return [
+ 'status' => (string) ($jobs[$scheduleId]['status'] ?? 'unknown'),
+ 'broadcastId' => $jobs[$scheduleId]['broadcastId'] ?? null,
+ ];
+ }
+
+ $jobs[$scheduleId]['status'] = 'running';
+ $jobs[$scheduleId]['startedAt'] = time();
+ $this->saveScheduledBroadcasts($jobs);
+
+ try {
+ $job = $jobs[$scheduleId];
+ $broadcastId = $this->broadcastWithProgress(
+ $job['allUsers'] ?? [],
+ $job['messages'] ?? [],
+ $job['chatId'] ?? null,
+ (bool) ($job['pin'] ?? false),
+ (int) ($job['concurrency'] ?? self::DEFAULT_CONCURRENCY),
+ $job['selfDestructHours'] ?? null
+ );
+
+ $progress = $this->progress($broadcastId);
+ $jobs = $this->loadScheduledBroadcasts();
+ $jobs[$scheduleId]['status'] = ($progress['cancel'] ?? false) ? 'cancelled' : 'done';
+ $jobs[$scheduleId]['broadcastId'] = $broadcastId;
+ $jobs[$scheduleId]['finishedAt'] = time();
+ $jobs[$scheduleId]['error'] = null;
+ $this->saveScheduledBroadcasts($jobs);
+
+ return [
+ 'status' => $jobs[$scheduleId]['status'],
+ 'broadcastId' => $broadcastId,
+ ];
+ } catch (Throwable $e) {
+ $jobs = $this->loadScheduledBroadcasts();
+ $jobs[$scheduleId]['status'] = 'failed';
+ $jobs[$scheduleId]['error'] = substr($e->getMessage(), 0, 500);
+ $jobs[$scheduleId]['failedAt'] = time();
+ $this->saveScheduledBroadcasts($jobs);
+ $this->logError('Scheduled broadcast failed.', $e, ['scheduleId' => $scheduleId]);
+
+ return [
+ 'status' => 'failed',
+ 'error' => $jobs[$scheduleId]['error'],
+ ];
+ }
+ }
+
+ private function createSelfDestructJob(string $broadcastId, int $hours, $chatId, int $concurrency): ?string
+ {
+ $metadata = $this->loadBroadcastMetadata($broadcastId);
+ $peers = array_filter(
+ $metadata['peers'] ?? [],
+ static fn ($peerData): bool => is_array($peerData) && !empty($peerData['messageIds'])
+ );
+
+ if (!$peers) {
+ return null;
+ }
+
+ $jobId = $this->createId('selfdestruct');
+ $deleteAt = time() + ($hours * 3600);
+ $jobs = $this->loadSelfDestructJobs();
+
+ $jobs[$jobId] = [
+ 'id' => $jobId,
+ 'broadcastId' => $broadcastId,
+ 'status' => 'scheduled',
+ 'deleteAt' => $deleteAt,
+ 'createdAt' => time(),
+ 'concurrency' => $this->clampConcurrency($concurrency),
+ 'chatId' => $chatId,
+ 'totalPeers' => count($peers),
+ 'stats' => null,
+ 'error' => null,
+ ];
+
+ $this->saveSelfDestructJobs($jobs);
+
+ $metadata['selfDestruct'] = [
+ 'enabled' => true,
+ 'hours' => $hours,
+ 'deleteAt' => $deleteAt,
+ 'deleteJobId' => $jobId,
+ ];
+ $this->saveBroadcastMetadata($broadcastId, $metadata);
+
+ return $jobId;
+ }
+
+ private function runSelfDestructJob(string $jobId): array
+ {
+ $jobs = $this->loadSelfDestructJobs();
+
+ if (!isset($jobs[$jobId])) {
+ return ['status' => 'missing'];
+ }
+
+ if (($jobs[$jobId]['status'] ?? null) !== 'scheduled') {
+ return [
+ 'status' => (string) ($jobs[$jobId]['status'] ?? 'unknown'),
+ 'stats' => $jobs[$jobId]['stats'] ?? null,
+ ];
+ }
+
+ $jobs[$jobId]['status'] = 'running';
+ $jobs[$jobId]['startedAt'] = time();
+ $this->saveSelfDestructJobs($jobs);
+
+ try {
+ $job = $jobs[$jobId];
+ $metadata = $this->loadBroadcastMetadata((string) $job['broadcastId']);
+ $peerJobs = [];
+
+ foreach (($metadata['peers'] ?? []) as $peer => $peerData) {
+ if (!is_array($peerData)) {
+ continue;
+ }
+
+ $messageIds = array_values(array_filter(
+ array_map('intval', $peerData['messageIds'] ?? []),
+ static fn (int $id): bool => $id > 0
+ ));
+
+ if (!$messageIds) {
+ continue;
+ }
+
+ $peerJobs[] = [
+ 'peer' => (string) $peer,
+ 'messageIds' => $messageIds,
+ 'attempts' => 0,
+ 'startedAt' => null,
+ 'availableAt' => 0.0,
+ ];
+ }
+
+ $state = $this->createState($jobId, 'selfdestruct', count($peerJobs), [
+ 'selfDestruct' => [
+ 'jobId' => $jobId,
+ 'broadcastId' => (string) $job['broadcastId'],
+ 'deleteAt' => (int) $job['deleteAt'],
+ ],
+ ]);
+ $this->registerCurrentState($jobId, $state);
+
+ foreach ($peerJobs as $peerJob) {
+ $state['queue']->enqueue($peerJob);
+ }
+
+ $this->startQueueWorkers(
+ $state,
+ (int) ($job['concurrency'] ?? self::DEFAULT_CONCURRENCY),
+ function (array $peerJob, array &$state): void {
+ $messageIds = array_values(array_filter(
+ array_map('intval', $peerJob['messageIds'] ?? []),
+ static fn (int $id): bool => $id > 0
+ ));
+
+ if (!$messageIds) {
+ return;
+ }
+
+ $this->api->messages->deleteMessages([
+ 'peer' => (string) $peerJob['peer'],
+ 'id' => $messageIds,
+ 'revoke' => true,
+ ]);
+
+ $state['deleted']++;
+ },
+ self::DELETE_HARD_FAIL_RPCS
+ );
+
+ $this->waitForCompletion($state);
+ $state['status'] = $state['cancel'] ? 'cancelled' : 'done';
+
+ $stats = [
+ 'deleted' => (int) $state['deleted'],
+ 'failed' => (int) $state['failed'],
+ 'flood' => (int) $state['flood'],
+ 'total' => (int) $state['total'],
+ ];
+
+ $jobs = $this->loadSelfDestructJobs();
+ $jobs[$jobId]['status'] = $state['cancel'] ? 'cancelled' : 'done';
+ $jobs[$jobId]['finishedAt'] = time();
+ $jobs[$jobId]['stats'] = $stats;
+ $jobs[$jobId]['error'] = null;
+ $this->saveSelfDestructJobs($jobs);
+
+ return [
+ 'status' => $jobs[$jobId]['status'],
+ 'stats' => $stats,
+ ];
+ } catch (Throwable $e) {
+ $jobs = $this->loadSelfDestructJobs();
+ $jobs[$jobId]['status'] = 'failed';
+ $jobs[$jobId]['failedAt'] = time();
+ $jobs[$jobId]['error'] = substr($e->getMessage(), 0, 500);
+ $this->saveSelfDestructJobs($jobs);
+ $this->logError('Self-destruct job failed.', $e, ['jobId' => $jobId]);
+
+ return [
+ 'status' => 'failed',
+ 'error' => $jobs[$jobId]['error'],
+ ];
+ }
+ }
+ private function normalizeBroadcastState(array $state): array
+ {
+ return [
+ 'id' => $state['id'] ?? null,
+ 'type' => $state['type'] ?? null,
+ 'sent' => $state['sent'] ?? 0,
+ 'deleted' => $state['deleted'] ?? 0,
+ 'unpin' => $state['unpin'] ?? 0,
+ 'edited' => $state['edited'] ?? 0,
+ 'unchanged' => $state['unchanged'] ?? 0,
+ 'scheduled' => $state['scheduled'] ?? 0,
+ 'failed' => $state['failed'] ?? 0,
+ 'flood' => $state['flood'] ?? 0,
+ 'total' => $state['total'] ?? null,
+ 'queue' => $state['queue'] ?? null,
+ 'inFlight' => $state['inFlight'] ?? [],
+ 'done' => $state['done'] ?? false,
+ 'paused' => $state['paused'] ?? false,
+ 'cancel' => $state['cancel'] ?? false,
+ 'startedAt' => $state['startedAt'] ?? null,
+ 'selfDestruct' => $state['selfDestruct'] ?? null,
+ ];
+ }
+
+ private function processedCount(array $state): int
+ {
+ return (int) $state['sent']
+ + (int) $state['deleted']
+ + (int) $state['unpin']
+ + (int) $state['edited']
+ + (int) $state['unchanged']
+ + (int) $state['scheduled']
+ + (int) $state['failed'];
}
+ private function pendingCount(array $state, int $processed): int
+ {
+ if (isset($state['total']) && is_int($state['total'])) {
+ return max(0, $state['total'] - $processed);
+ }
+
+ return ($state['queue'] instanceof SplQueue) ? $state['queue']->count() : 0;
+ }
+
+ private function elapsedSeconds(array $state): float
+ {
+ if (!isset($state['startedAt']) || !$state['startedAt']) {
+ return 0.0;
+ }
+
+ return max(0.0, microtime(true) - (float) $state['startedAt']);
+ }
+
+ private function buildProgressText(array $state, string $title, bool $final = false): string
+ {
+ $normalized = $this->normalizeBroadcastState($state);
+ $processed = $this->processedCount($normalized);
+ $total = (int) ($normalized['total'] ?? $processed);
+ $pending = $this->pendingCount($normalized, $processed);
+ $success = (int) $normalized['sent']
+ + (int) $normalized['deleted']
+ + (int) $normalized['unpin']
+ + (int) $normalized['edited']
+ + (int) $normalized['unchanged'];
+ $elapsed = $this->elapsedSeconds($normalized);
+ $tps = $elapsed > 0 ? round($success / $elapsed, 2) : 0.0;
+
+ $lines = [
+ '' . $title . '',
+ '',
+ '' . $this->progressBar($processed, max(1, $total)) . '',
+ '',
+ 'Processed: ' . $processed . ' / ' . $total,
+ ];
+
+ foreach (['sent' => 'Sent', 'edited' => 'Edited', 'unchanged' => 'Unchanged', 'deleted' => 'Deleted', 'unpin' => 'Unpinned'] as $key => $label) {
+ if ((int) $normalized[$key] > 0 || $normalized['type'] === $key || ($key === 'edited' && $normalized['type'] === 'edit')) {
+ $lines[] = $label . ': ' . (int) $normalized[$key];
+ }
+ }
+
+ $lines[] = 'Failed: ' . (int) $normalized['failed'];
+ $lines[] = 'FLOOD_WAIT: ' . (int) $normalized['flood'];
+ $lines[] = 'Pending: ' . $pending;
+ $lines[] = 'TPS: ' . $tps . '/s';
+
+ if ($normalized['paused']) {
+ $lines[] = 'Paused';
+ }
+
+ if ($normalized['cancel']) {
+ $lines[] = 'Cancelled';
+ } elseif ($final) {
+ $lines[] = 'Finished';
+ }
+
+ return implode("\n", $lines);
+ }
+
+ /**
+ * Progress bar.
+ */
+ private function progressBar(int $current, int $total): string
+ {
+ $len = 20;
+ $filled = (int) round($current / max($total, 1) * $len);
+
+ return str_repeat('#', $filled)
+ . str_repeat('-', max(0, $len - $filled))
+ . ' '
+ . round(($current / max($total, 1)) * 100)
+ . '%';
+ }
+
+ private function buildStatusControls(array $state): ?array
+ {
+ if (($state['done'] ?? false) || empty($state['id'])) {
+ return null;
+ }
+
+ $id = (string) $state['id'];
+ $toggleAction = !empty($state['paused']) ? 'resume' : 'pause';
+ $toggleText = !empty($state['paused']) ? 'โถ๏ธ ืืืฉื' : 'โธ ืืฉืืืื';
+
+ return [
+ 'inline_keyboard' => [
+ [
+ ['text' => $toggleText, 'callback_data' => 'bm:' . $toggleAction . ':' . $id],
+ ['text' => '๐ ืืืืื', 'callback_data' => 'bm:cancel:' . $id],
+ ],
+ ],
+ ];
+ }
+
+ private function sendStatusMessage($chatId, string $message, ?array $replyMarkup = null): ?int
+ {
+ if (!$chatId) {
+ return null;
+ }
+
+ try {
+ $payload = [
+ 'peer' => $chatId,
+ 'message' => $message,
+ 'parse_mode' => 'HTML',
+ ];
+
+ if ($replyMarkup !== null) {
+ $payload['reply_markup'] = $replyMarkup;
+ }
+
+ $status = $this->api->messages->sendMessage($payload);
+
+ return (int) $this->api->extractMessageId($status);
+ } catch (Throwable $e) {
+ $this->logError('Failed to send status message.', $e);
+ return null;
+ }
+ }
+
+ private function startProgressLoop($chatId, ?int $statusId, array &$state, string $title): void
+ {
+ if (!$chatId || !$statusId) {
+ return;
+ }
+
+ \Amp\async(function () use ($chatId, $statusId, &$state, $title): void {
+ $last = '';
+ $loggedFailures = 0;
+
+ while (!$state['done']) {
+ $text = $this->buildProgressText($state, $title);
+ $replyMarkup = $this->buildStatusControls($state);
+ $fingerprint = $text . "\n" . json_encode($replyMarkup);
+
+ if ($fingerprint !== $last) {
+ try {
+ $payload = [
+ 'peer' => $chatId,
+ 'id' => $statusId,
+ 'message' => $text,
+ 'parse_mode' => 'HTML',
+ ];
+
+ if ($replyMarkup !== null) {
+ $payload['reply_markup'] = $replyMarkup;
+ }
+
+ $this->api->messages->editMessage($payload);
+ $last = $fingerprint;
+ } catch (Throwable $e) {
+ if ($loggedFailures < 3) {
+ $loggedFailures++;
+ $this->logError('Failed to update status message.', $e);
+ }
+ }
+ }
+
+ $this->api->sleep(1);
+ }
+ });
+ }
+
+ private function editStatusMessage($chatId, ?int $statusId, string $text, ?array $replyMarkup = null): void
+ {
+ if (!$chatId || !$statusId) {
+ return;
+ }
+
+ try {
+ $payload = [
+ 'peer' => $chatId,
+ 'id' => $statusId,
+ 'message' => $text,
+ 'parse_mode' => 'HTML',
+ ];
+
+ if ($replyMarkup !== null) {
+ $payload['reply_markup'] = $replyMarkup;
+ }
+
+ $this->api->messages->editMessage($payload);
+ } catch (Throwable $e) {
+ $this->logError('Failed to edit final status message.', $e);
+ }
+ }
+
+ private function writeLastBroadcastData(string $text): void
+ {
+ try {
+ $this->ensureDirectory(self::getDataDir());
+ file_put_contents(self::getDataDir() . '/LastBrodDATA.txt', $text, LOCK_EX);
+ } catch (Throwable $e) {
+ $this->logError('Failed to save last broadcast data.', $e);
+ }
+ }
+
+ private function readLastBroadcastMessageId(string $peer): int
+ {
+ $file = $this->peerDataPath($peer, 'lastBroadcast.txt');
+
+ if (!is_file($file)) {
+ return 0;
+ }
+
+ return (int) trim((string) file_get_contents($file));
+ }
+
+ private function readBroadcastLastMessageId(string $broadcastId, string $peer): int
+ {
+ $metadata = $this->loadBroadcastMetadata($broadcastId);
+ $peerData = $metadata['peers'][$peer] ?? null;
+
+ if (!is_array($peerData)) {
+ return 0;
+ }
+
+ $lastMessageId = (int) ($peerData['lastMessageId'] ?? 0);
+ if ($lastMessageId > 0) {
+ return $lastMessageId;
+ }
+
+ $messageIds = array_values(array_filter(
+ array_map('intval', $peerData['messageIds'] ?? []),
+ static fn (int $id): bool => $id > 0
+ ));
+
+ return $messageIds ? (int) end($messageIds) : 0;
+ }
+
+ private function broadcastMetadataPeers(string $broadcastId): array
+ {
+ $metadata = $this->loadBroadcastMetadata($broadcastId);
+
+ if (!isset($metadata['peers']) || !is_array($metadata['peers'])) {
+ return [];
+ }
+
+ return array_values(array_map('strval', array_keys($metadata['peers'])));
+ }
+
+ private function markBroadcastPeerMessageDeleted(string $broadcastId, string $peer, int $messageId): void
+ {
+ $metadata = $this->loadBroadcastMetadata($broadcastId);
+ $peerData = $metadata['peers'][$peer] ?? null;
+
+ if (!is_array($peerData)) {
+ return;
+ }
+
+ $messageIds = array_values(array_filter(
+ array_map('intval', $peerData['messageIds'] ?? []),
+ static fn (int $id): bool => $id > 0 && $id !== $messageId
+ ));
+
+ $metadata['peers'][$peer]['messageIds'] = $messageIds;
+ $metadata['peers'][$peer]['lastMessageId'] = $messageIds ? end($messageIds) : null;
+ $metadata['peers'][$peer]['status'] = $messageIds ? 'partial' : 'deleted';
+
+ $this->saveBroadcastMetadata($broadcastId, $metadata);
+ }
+
+ private function peerDataPath(string $peer, string $file): string
+ {
+ return self::getDataDir() . '/' . $peer . '/' . $file;
+ }
+
+ private function broadcastMetadataPath(string $broadcastId): string
+ {
+ return self::getDataDir() . '/broadcasts/' . $broadcastId . '.json';
+ }
+
+ private function scheduledBroadcastsPath(): string
+ {
+ return self::getDataDir() . '/scheduled-broadcasts.json';
+ }
+
+ private function selfDestructJobsPath(): string
+ {
+ return self::getDataDir() . '/self-destruct-jobs.json';
+ }
+
+ private function loadScheduledBroadcasts(): array
+ {
+ return $this->readJsonFile($this->scheduledBroadcastsPath(), []);
+ }
+
+ private function saveScheduledBroadcasts(array $jobs): void
+ {
+ $this->writeJsonFileAtomic($this->scheduledBroadcastsPath(), $jobs);
+ }
+
+ private function loadSelfDestructJobs(): array
+ {
+ return $this->readJsonFile($this->selfDestructJobsPath(), []);
+ }
+
+ private function saveSelfDestructJobs(array $jobs): void
+ {
+ $this->writeJsonFileAtomic($this->selfDestructJobsPath(), $jobs);
+ }
+
+ private function validateSelfDestructHours(?int $hours): void
+ {
+ if ($hours === null) {
+ return;
+ }
+
+ if ($hours < 0 || $hours > 48) {
+ throw new InvalidArgumentException('selfDestructHours must be null or an integer between 0 and 48.');
+ }
+ }
+
+ private function normalizeOptionalId(?string $id): ?string
+ {
+ if ($id === null) {
+ return null;
+ }
+
+ $id = trim($id);
+
+ return $id === '' ? null : $id;
+ }
+
+ private function assertJsonEncodable(mixed $value, string $name): void
+ {
+ try {
+ json_encode($value, JSON_THROW_ON_ERROR);
+ } catch (JsonException $e) {
+ throw new InvalidArgumentException($name . ' must be JSON encodable: ' . $e->getMessage(), 0, $e);
+ }
+ }
+
+ private function createId(string $prefix = ''): string
+ {
+ $id = bin2hex(random_bytes(8));
+
+ return $prefix === '' ? $id : $prefix . '_' . $id;
+ }
+
+ private function ensureDirectory(string $dir): void
+ {
+ if (is_dir($dir)) {
+ return;
+ }
+
+ if (!mkdir($dir, 0777, true) && !is_dir($dir)) {
+ throw new RuntimeException('Unable to create directory: ' . $dir);
+ }
+ }
+
+ private function deleteFile(string $file): void
+ {
+ try {
+ if (is_file($file)) {
+ @unlink($file);
+ }
+ } catch (Throwable $e) {
+ $this->logError('Failed to delete file.', $e, ['file' => $file]);
+ }
+ }
}