Source code for ggcq.ggcq

# -*- coding: utf-8 -*-

'''

   Copyright 2015 The pyggcq Developers

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

'''

import logging

import simpy

logger = logging.getLogger(__name__)


[docs]class GGCQServiceTimeStopIteration(RuntimeError): """ Exception thrown if service time generator is exhausted Should not base StopIteration as simpy uses that exception to signify the end of the process! """ pass
[docs]class GGCQServiceTimeTypeError(TypeError): """ Exception thrown if service time is of wrong type """ pass
[docs]class GGCQArrivalTimeTypeError(TypeError): """ Exception thrown if arrival time is of wrong type """ pass
[docs]class GGCQNegativeArrivalTimeError(ValueError): """ Exception thrown if arrival time is negative """ pass
[docs]class GGCQNegativeServiceTimeError(ValueError): """ Exception thrown if service time is negative """ pass
[docs]class GGCQ(object): def __init__( self, arrival_time_generator, service_time_generator, capacity=1 ): # initialize logging self._logger = logging.getLogger( '{}.{}'.format(__name__, self.__class__.__name__) ) self._logger.debug('init') self._arrival_time_generator = arrival_time_generator self._service_time_generator = service_time_generator self._capacity = capacity self._env = simpy.Environment() self._observer = RawDataObserver() self._queue = Queue( env=self._env, service_time_generator=service_time_generator, observer=self._observer, capacity=capacity, ) self._source = Source( env=self._env, queue=self._queue, arrival_time_generator=arrival_time_generator, observer=self._observer, ) # start arrival process self._logger.info('start arrival process') self._env.process(self._source.generate())
[docs] def run(self, until=None): self._env.run(until=until)
@property
[docs] def now(self): return self._env.now
[docs]class RawDataObserver(object): JOB_COLUMNS = ['arrival_epoch', 'service_epoch', 'departure_epoch'] def __init__(self): self._jobs = dict()
[docs] def notify_arrival(self, time, job_id): self._jobs[job_id] = [time] + 2 * [None]
[docs] def notify_service(self, time, job_id): self._jobs[job_id][1] = time
[docs] def notify_departure(self, time, job_id): self._jobs[job_id][2] = time
@property
[docs] def jobs(self): return self._jobs
[docs]class Queue(object): def __init__(self, env, service_time_generator, observer, capacity=1): # initialize logging self._logger = logging.getLogger( '{}.{}'.format(__name__, self.__class__.__name__) ) self._logger.debug('init') self._env = env self._capacity = capacity self._queue = simpy.Resource(env, capacity=self._capacity) self._service_time_generator = service_time_generator self._observer = observer
[docs] def process(self, job_id): """ Process a job by the queue """ self._logger.info( '{:.2f}: Process job {}'.format(self._env.now, job_id) ) # log time of commencement of service self._observer.notify_service(time=self._env.now, job_id=job_id) # draw a new service time try: service_time = next(self._service_time_generator) except StopIteration: # ERROR: no more service times error_msg = ('Service time generator exhausted') self._logger.error(error_msg) # raise a different exception, as simpy uses StopIteration to # signify end of process (generator) raise GGCQServiceTimeStopIteration(error_msg) # wait for the service time to pass try: self._logger.debug('Service time: {:.2f}'.format(service_time)) except: pass try: yield self._env.timeout(service_time) except TypeError: # error: service time of wrong type error_msg = ( "service time '{}' has wrong type '{}'".format( service_time, type(service_time).__name__ ) ) self._logger.error(error_msg) # trigger exception raise GGCQServiceTimeTypeError(error_msg) except ValueError as exc: if str(exc).startswith('Negative delay'): # error: negative service time error_msg = ( "negative service time {:.2f}".format( service_time ) ) self._logger.error(error_msg) # trigger exception raise GGCQNegativeServiceTimeError(error_msg) else: raise # job finished processing -> departing self._logger.info( '{:.2f}: Finished processing job {}'.format(self._env.now, job_id) ) # log departure epoch self._observer.notify_departure(time=self._env.now, job_id=job_id)
@property
[docs] def queue(self): return self._queue
[docs]class Source(object): def __init__(self, env, queue, arrival_time_generator, observer): # initialize logging self._logger = logging.getLogger( '{}.{}'.format(__name__, self.__class__.__name__) ) self._logger.debug('init') self._env = env self._queue = queue self._arrival_time_generator = arrival_time_generator self._job_id = 0 self._observer = observer
[docs] def generate(self): """ Source generates jobs according to the interarrival time distribution """ inter_arrival_time = 0.0 while True: # wait for next job to arrive try: yield self._env.timeout(inter_arrival_time) except TypeError: # error: arrival time of wrong type error_msg = ( "arrival time '{}' has wrong type '{}'".format( inter_arrival_time, type(inter_arrival_time).__name__ ) ) self._logger.error(error_msg) # trigger exception raise GGCQArrivalTimeTypeError(error_msg) except ValueError as exc: if str(exc).startswith('Negative delay'): # error: negative arrival time error_msg = ( "negative arrival time {:.2f}".format( inter_arrival_time ) ) self._logger.error(error_msg) # trigger exception raise GGCQNegativeArrivalTimeError(error_msg) else: raise # job has arrived job_id = self._job_id self._observer.notify_arrival(time=self._env.now, job_id=job_id) # get job process job = self._job_generator(job_id) # submit job to queue self._env.process(job) # time for the next job to arrive try: inter_arrival_time = next(self._arrival_time_generator) self._job_id += 1 except StopIteration: # no more jobs to arrive -- exit process self._env.exit()
def _job_generator(self, job_id): with self._queue.queue.request() as request: # wait for queue to become idle yield request # process job yield self._env.process(self._queue.process(job_id))