PocketMine-MP  1.4 - API 1.10.0
 All Classes Namespaces Functions Variables Pages
ServerScheduler.php
1 <?php
2 
3 /*
4  *
5  * ____ _ _ __ __ _ __ __ ____
6  * | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
7  * | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
8  * | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
9  * |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
10  *
11  * This program is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License as published by
13  * the Free Software Foundation, either version 3 of the License, or
14  * (at your option) any later version.
15  *
16  * @author PocketMine Team
17 
18  *
19  *
20 */
21 
25 namespace pocketmine\scheduler;
26 
32 
/** @var int Worker count handed to the async \Pool whenever one is created. */
public static $WORKERS = 4;

/** @var ReversePriorityQueue Handlers waiting to run, inserted with their next-run tick as priority. */
protected $queue;

/** @var TaskHandler[] Registered handlers, indexed by task id. */
protected $tasks = [];

/** @var \Pool Pool that async tasks are submitted to. */
protected $asyncPool;

/** @var AsyncTask[] Submitted async tasks that have not been collected yet, indexed by task id. */
protected $asyncTaskStorage = [];

/** @var int Number of async tasks submitted and not yet collected. */
protected $asyncTasks = 0;

/** @var int Id that the next call to nextId() will hand out. */
private $ids = 1;

/** @var int Tick passed to the most recent mainThreadHeartbeat() call. */
protected $currentTick = 0;
58 
/**
 * Sets up an empty task queue and the pool used for async tasks.
 *
 * The pool is sized from the static $WORKERS value at construction time.
 */
public function __construct(){
    $workerCount = self::$WORKERS;

    $this->queue = new ReversePriorityQueue();
    $this->asyncPool = new \Pool($workerCount, AsyncWorker::class);
}
63 
/**
 * Registers a task with neither a delay nor a repeat period.
 *
 * @param Task $task
 *
 * @return TaskHandler the handler created for the task
 */
public function scheduleTask(Task $task){
    $handler = $this->addTask($task, -1, -1);

    return $handler;
}
72 
/**
 * Hands an async task to the worker pool and remembers it for later collection.
 *
 * The task receives a fresh id, is submitted to the pool, is stored under the
 * id it reports, and the pending-async counter is incremented. Nothing is returned.
 *
 * @param AsyncTask $task
 */
public function scheduleAsyncTask(AsyncTask $task){
    $task->setTaskId($this->nextId());

    $this->asyncPool->submit($task);

    $storageKey = $task->getTaskId();
    $this->asyncTaskStorage[$storageKey] = $task;
    $this->asyncTasks++;
}
87 
/**
 * Registers a task that does not repeat and starts after a delay.
 *
 * @param Task $task
 * @param int  $delay delay in ticks; cast to int before use
 *
 * @return TaskHandler the handler created for the task
 */
public function scheduleDelayedTask(Task $task, $delay){
    $delayTicks = (int) $delay;

    return $this->addTask($task, $delayTicks, -1);
}
97 
/**
 * Registers a repeating task with no initial delay.
 *
 * @param Task $task
 * @param int  $period period in ticks; cast to int before use
 *
 * @return TaskHandler the handler created for the task
 */
public function scheduleRepeatingTask(Task $task, $period){
    $periodTicks = (int) $period;

    return $this->addTask($task, -1, $periodTicks);
}
107 
/**
 * Registers a repeating task that starts after a delay.
 *
 * @param Task $task
 * @param int  $delay  delay in ticks; cast to int before use
 * @param int  $period period in ticks; cast to int before use
 *
 * @return TaskHandler the handler created for the task
 */
public function scheduleDelayedRepeatingTask(Task $task, $delay, $period){
    $delayTicks = (int) $delay;
    $periodTicks = (int) $period;

    return $this->addTask($task, $delayTicks, $periodTicks);
}
118 
/**
 * Cancels a registered task and drops it from the task table.
 *
 * A null id, or an id that is not registered, is ignored.
 *
 * @param int $taskId
 */
public function cancelTask($taskId){
    if($taskId === null or !isset($this->tasks[$taskId])){
        return;
    }

    $this->tasks[$taskId]->cancel();
    unset($this->tasks[$taskId]);
}
128 
/**
 * Cancels every registered PluginTask whose owner is the given plugin.
 *
 * Handlers wrapping other task types, or tasks owned by a different plugin,
 * are left untouched.
 *
 * @param Plugin $plugin
 */
public function cancelTasks(Plugin $plugin){
    foreach($this->tasks as $id => $handler){
        $wrapped = $handler->getTask();

        if(!($wrapped instanceof PluginTask) or $wrapped->getOwner() !== $plugin){
            continue;
        }

        $handler->cancel();
        unset($this->tasks[$id]);
    }
}
141 
/**
 * Cancels every registered task and resets the scheduler to an empty state.
 *
 * All handlers are cancelled, then the task table, the queue, the async
 * bookkeeping and the async pool are replaced with fresh, empty ones.
 */
public function cancelAllTasks(){
    foreach($this->tasks as $handler){
        $handler->cancel();
    }

    // Synchronous side: forget every handler and start over with an empty queue.
    $this->tasks = [];
    $this->queue = new ReversePriorityQueue();

    // Asynchronous side: drop the bookkeeping and swap in a new pool.
    // NOTE: the previous pool is not shut down here; that call is intentionally left disabled.
    //$this->asyncPool->shutdown();
    $this->asyncTaskStorage = [];
    $this->asyncTasks = 0;
    $this->asyncPool = new \Pool(self::$WORKERS, AsyncWorker::class);
}
153 
/**
 * Tells whether a handler is currently registered under the given task id.
 *
 * @param int $taskId
 *
 * @return bool
 */
public function isQueued($taskId){
    $registered = isset($this->tasks[$taskId]);

    return $registered;
}
162 
/**
 * Validates a task, normalises its timing values and registers it.
 *
 * Timing normalisation:
 *  - a delay of 0 or less becomes -1 (no delay);
 *  - a period of -1 or less becomes -1 (no repeat);
 *  - any other period below 1 is raised to 1.
 *
 * For a CallbackTask the handler name is derived from the wrapped callable:
 * "Callback#Class::method" for array callables, "Callback#Class" for closures
 * and invokable objects, and "Callback#name" for string callables. Any other
 * task is named after its class.
 *
 * @param Task $task
 * @param int  $delay  delay in ticks
 * @param int  $period period in ticks
 *
 * @return TaskHandler the handler created and registered for the task
 *
 * @throws PluginException if a PluginTask has no Plugin owner, or its owner is disabled
 */
private function addTask(Task $task, $delay, $period){
    if($task instanceof PluginTask){
        if(!($task->getOwner() instanceof Plugin)){
            throw new PluginException("Invalid owner of PluginTask " . get_class($task));
        }elseif(!$task->getOwner()->isEnabled()){
            throw new PluginException("Plugin '" . $task->getOwner()->getName() . "' attempted to register a task while disabled");
        }
    }

    if($delay <= 0){
        $delay = -1;
    }

    if($period <= -1){
        $period = -1;
    }elseif($period < 1){
        $period = 1;
    }

    if($task instanceof CallbackTask){
        $callable = $task->getCallable();
        if(is_array($callable)){
            // [object, "method"] or ["Class", "method"]
            $target = is_object($callable[0]) ? get_class($callable[0]) : $callable[0];
            $taskName = "Callback#" . $target . "::" . $callable[1];
        }elseif(is_object($callable)){
            // Closures and invokable objects cannot be converted to a string,
            // so name them after their class instead of concatenating them.
            $taskName = "Callback#" . get_class($callable);
        }else{
            $taskName = "Callback#" . $callable;
        }
    }else{
        $taskName = get_class($task);
    }

    return $this->handle(new TaskHandler($taskName, $task, $this->nextId(), $delay, $period));
}
208 
/**
 * Registers a handler and places it in the queue at its first run tick.
 *
 * The first run is the current tick, plus the handler's delay when the
 * handler reports itself as delayed.
 *
 * @param TaskHandler $handler
 *
 * @return TaskHandler the same handler, now registered
 */
private function handle(TaskHandler $handler){
    $firstRun = $this->currentTick;
    if($handler->isDelayed()){
        $firstRun += $handler->getDelay();
    }

    $handler->setNextRun($firstRun);
    $this->tasks[$handler->getTaskId()] = $handler;
    $this->queue->insert($handler, $firstRun);

    return $handler;
}
222 
/**
 * Runs every task that is due at the given tick, then collects async tasks.
 *
 * Cancelled handlers pulled from the queue are dropped without running.
 * An exception thrown by a task is logged and does not stop the loop.
 * Repeating handlers are re-queued one period after the current tick;
 * all others are removed once they have run.
 *
 * @param int $currentTick
 */
public function mainThreadHeartbeat($currentTick){
    $this->currentTick = $currentTick;

    while($this->isReady($this->currentTick)){
        $handler = $this->queue->extract();

        if($handler->isCancelled()){
            unset($this->tasks[$handler->getTaskId()]);
            continue;
        }

        $handler->timings->startTiming();
        try{
            $handler->run($this->currentTick);
        }catch(\Exception $e){
            Server::getInstance()->getLogger()->critical("Could not execute task " . $handler->getTaskName() . ": " . $e->getMessage());
            $logger = Server::getInstance()->getLogger();
            if($logger instanceof MainLogger){
                $logger->logException($e);
            }
        }
        $handler->timings->stopTiming();

        if(!$handler->isRepeating()){
            // One-shot task: it has run, so forget about it.
            $handler->remove();
            unset($this->tasks[$handler->getTaskId()]);
            continue;
        }

        // Repeating task: queue the next run one period from now.
        $handler->setNextRun($this->currentTick + $handler->getPeriod());
        $this->queue->insert($handler, $this->currentTick + $handler->getPeriod());
    }

    if($this->asyncTasks <= 0){
        return;
    }

    //Garbage collector
    $this->asyncPool->collect([$this, "collectAsyncTask"]);

    // Anything the pool pass left pending is checked directly from storage.
    if($this->asyncTasks > 0){
        foreach($this->asyncTaskStorage as $pending){
            $this->collectAsyncTask($pending);
        }
    }
}
265 
/**
 * Finalises an async task once it has finished, and reports its garbage state.
 *
 * A task that is finished and not yet garbage is counted as done, has its
 * onCompletion() hook called with the server instance, is marked as garbage
 * and is removed from the async storage. Tasks that are still running, or
 * already garbage, are left alone.
 *
 * The garbage flag and storage removal run in a finally block: if
 * onCompletion() throws, the exception still propagates to the caller, but
 * the task cannot be collected (and counted) a second time on a later pass.
 *
 * @param AsyncTask $task
 *
 * @return bool the value reported by the task's isGarbage()
 */
public function collectAsyncTask(AsyncTask $task){
    if($task->isFinished() and !$task->isGarbage()){
        --$this->asyncTasks;
        try{
            $task->onCompletion(Server::getInstance());
        }finally{
            // Keep counter, garbage flag and storage consistent even on failure.
            $task->setGarbage();
            unset($this->asyncTaskStorage[$task->getTaskId()]);
        }
    }

    return $task->isGarbage();
}
276 
/**
 * Tells whether the handler at the front of the queue is due.
 *
 * Always false while no tasks are registered; in that case the queue is not consulted.
 *
 * @param int $currentTicks
 *
 * @return bool
 */
private function isReady($currentTicks){
    if(count($this->tasks) <= 0){
        return false;
    }

    $front = $this->queue->current();

    return $front->getNextRun() <= $currentTicks;
}
280 
/**
 * Hands out the current id and advances the counter by one.
 *
 * @return int
 */
private function nextId(){
    $assigned = $this->ids;
    $this->ids++;

    return $assigned;
}
287 
288 }
scheduleRepeatingTask(Task $task, $period)
static getInstance()
Definition: Server.php:1444
scheduleDelayedRepeatingTask(Task $task, $delay, $period)