7. First Steps with Sidita
First, we implement our task queue in the file TestTaskQueue.py:
# import sidita.Logging
# logger = sidita.Logging.setup_logging('sidita')
# import random
from sidita import TaskQueue
class TestTaskQueue(TaskQueue):
    """Example task queue that submits a fixed number of demo tasks.

    Only ``task_producer`` does real work; the ``on_*`` methods show the
    hooks a TaskQueue subclass can override to react to task events.
    """

    # _logger = logger.getChild('TestTaskQueue')

    # Number of tasks submitted by task_producer. Override it in a subclass
    # or on an instance to change the workload size.
    number_of_tasks = 10

    # def __init__(self, *args, **kwargs):
    #    super().__init__(*args, **kwargs)

    async def task_producer(self):
        """Custom coroutine method to submit tasks.

        Submits ``self.number_of_tasks`` tasks, then asks the workers to stop.
        """
        # this method is a coroutine, cf. async def and await
        # it runs within an asyncio event loop
        for i in range(self.number_of_tasks):
            # self._logger.info('Producing {}/{}'.format(i, self.number_of_tasks))
            # simulate workload
            # await asyncio.sleep(random.random())
            # We can submit any picklable object
            task = {
                'action': 'run',
                'payload': 'message {}'.format(i),
            }
            # submit awaits on the asyncio queue put, so the producer is
            # suspended while the queue is full (cf. max_queue_size)
            await self.submit(task)
        # stop workers
        await self.send_stop()  # in fact submits one None per worker

    # Custom signal handlers: each hook receives the metadata of one task.

    def on_task_submitted(self, task_metadata):
        """Hook for a submitted task; only delegates to TaskQueue."""
        super().on_task_submitted(task_metadata)

    def on_task_sent(self, task_metadata):
        """Hook for a task being sent (presumably to a worker); delegates to TaskQueue."""
        super().on_task_sent(task_metadata)

    def on_result(self, task_metadata):
        """Hook for a task result; only delegates to TaskQueue."""
        super().on_result(task_metadata)

    def on_timeout_error(self, task_metadata):
        """Hook for a task that timed out (presumably after task_timeout); ignored.

        NOTE(review): unlike the hooks above, this override does not call
        super(), so any default handling provided by TaskQueue is skipped.
        Confirm that this is intended.
        """

    def on_stream_error(self, task_metadata):
        """Hook for a worker stream error; ignored.

        A stream error most likely means the worker crashed.
        NOTE(review): super() is not called here either, so any default
        handling provided by TaskQueue is skipped.
        """
Then, we implement our worker in the file TestWorker.py:
# import sidita.Logging
# logger = sidita.Logging.setup_logging('sidita-worker')
# import numpy as np
# import logging
# import random
import sys
# import time
from sidita import Worker
class TestWorker(Worker):
    """Example worker that echoes back the payload of every task."""

    # _logger = logger.getChild('Worker')

    # def __init__(self, worker_id):
    #     super().__init__(worker_id)
    #     self._pool = []

    def on_task(self, task):
        """Handle one task and return its result.

        The task's ``'payload'`` entry is echoed back together with a
        ``'completed'`` status. The result can be any picklable object.
        """
        # The commented lines below are experiments; uncomment them (and the
        # matching imports at the top of the file) to exercise the worker
        # sanity checks configured on the task queue.
        # simulate workload
        # time.sleep(random.random()/1000)
        # time.sleep(random.random()*10)
        # simulate crash
        # if random.random() < .1:
        #     1/0
        # simulate memory load (requires self._pool from the __init__ above)
        # self._pool.append(np.ones(1024*100))
        echoed_payload = task['payload']
        result = dict(status='completed', payload=echoed_payload)
        return result
Finally, we write the code that runs our task queue:
from datetime import timedelta
from pathlib import Path
from sidita.Units import u_MB
from TestTaskQueue import TestTaskQueue
task_queue = TestTaskQueue(
    # Worker implementation: class <worker_cls> from module <worker_module>,
    # found on <python_path> (optional; here, the directory of this script).
    worker_cls='TestWorker',
    worker_module='TestWorker',
    python_path=Path(__file__).resolve().parent,
    # Bound the queue to limit memory pressure: the producer is blocked
    # while the queue is full.
    max_queue_size=100,
    # Worker sanity checks.
    task_timeout=timedelta(seconds=1),
    memory_check_interval=timedelta(seconds=5),
    max_memory=100@u_MB,  # limit in bytes (100 MB via the u_MB unit)
)
# Start the asyncio event loop and run it until all tasks are done.
task_queue.run()