AER-3864 Improved handling changes in number of workers available #104
Merged
Conversation
To improve the handling of changes in the number of running workers, the following improvements are made:

1. Changed the event handling of worker-count changes triggered by the RabbitMQ channel event queue watcher. Instead of directly updating the number of workers, the event now triggers a RabbitMQ API call, which returns the actual number of available workers. Because the event fires for every worker added or removed, a timer is used to delay the API call: when a new event arrives within 15 seconds, the timer is reset. The recurring API call is part of the same timer process, so it won't run while an event-triggered timer is pending, or the other way around. This mechanism ensures an update is done within 15 seconds after a change, which seems appropriate. (The alternative would be to simply call the API every 15 seconds and remove the event triggering entirely.)
2. Changes in the number of workers are also propagated to the priority scheduler, so it can check its internal state as well.
3. The workerFinishedHandlers are guarded against RuntimeExceptions; previously, an exception in one handler could prevent the remaining handlers from being called.
4. In the worker pool, the check that decides whether a free worker should be released when a task finishes no longer includes the number of free workers. If the number of workers hadn't changed, this test would always pass on equal size (configured == free + running), unless the free-worker count was slightly out of sync due to concurrency. The running-worker count is always in sync, because the check runs exactly when a task finishes, so if all workers were running, the == condition would hold.
5. To free workers when the number of workers is reduced, a waiting acquire is no longer used, since it could block if the bookkeeping isn't completely in sync. Removing the potential block avoids this. Even if this means fewer workers are marked as free than actually are, the state recovers as running tasks finish and don't release a free worker, eventually converging to the correct condition. It's unclear whether it ever actually blocked, but removing the block makes the code more robust.
6. The timeout values on unit tests have been fixed. The default unit is seconds, so every timeout without an explicit unit was effectively 1000 times too long. The annotations have been fixed and now state an explicit time unit.
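The reset-timer (debounce) mechanism from point 1 could be sketched roughly as follows. The class and method names here are illustrative, not the actual taskmanager code: every queue event (re)schedules the API call, so a burst of worker changes results in a single call once the burst settles.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: debounce the RabbitMQ API call that fetches
// the actual number of available workers.
class WorkerSizeDebouncer {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Runnable apiCall;
  private final long delay;
  private final TimeUnit unit;
  private ScheduledFuture<?> pending;

  WorkerSizeDebouncer(final Runnable apiCall, final long delay, final TimeUnit unit) {
    this.apiCall = apiCall;
    this.delay = delay;
    this.unit = unit;
  }

  // Called from the RabbitMQ channel event watcher on every worker added/removed.
  public synchronized void onWorkerChangeEvent() {
    if (pending != null) {
      pending.cancel(false); // a newer event resets the timer
    }
    pending = scheduler.schedule(apiCall, delay, unit);
  }

  // Graceful shutdown: a still-pending call is allowed to run first.
  public void shutdownAndWait() {
    scheduler.shutdown();
    try {
      scheduler.awaitTermination(30, TimeUnit.SECONDS);
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

With a 15-second delay as in the PR, three workers joining in quick succession would trigger only one API call, 15 seconds after the last event.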
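The handler guarding from point 3 amounts to wrapping each handler invocation in its own try/catch. Again an illustrative sketch, not the actual taskmanager API:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: a RuntimeException thrown by one handler no
// longer prevents the remaining handlers from being called.
class WorkerFinishedDispatcher {
  private final List<Runnable> workerFinishedHandlers = new CopyOnWriteArrayList<>();

  public void addHandler(final Runnable handler) {
    workerFinishedHandlers.add(handler);
  }

  public void onWorkerFinished() {
    for (final Runnable handler : workerFinishedHandlers) {
      try {
        handler.run();
      } catch (final RuntimeException e) {
        // Log and continue; one faulty handler must not starve the others.
        System.err.println("workerFinished handler failed: " + e);
      }
    }
  }
}
```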
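The non-blocking release from point 5 could look like the following, assuming the free workers are tracked with a `Semaphore` (an assumption for illustration; the actual WorkerPool bookkeeping may differ). `tryAcquire()` returns `false` instead of blocking when the count is out of sync, and the remaining surplus is reclaimed later as running tasks finish:

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch: reclaim surplus capacity when the configured
// number of workers shrinks, without ever blocking the caller.
class WorkerShrinker {
  public static int reclaimSurplus(final Semaphore freeWorkers, final int surplus) {
    int reclaimed = 0;
    for (int i = 0; i < surplus; i++) {
      if (freeWorkers.tryAcquire()) { // non-blocking; false when no permit is free
        reclaimed++;
      }
    }
    return reclaimed;
  }
}
```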
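The unit-test fix from point 6 concerns JUnit 5's `@Timeout`, whose default unit is `TimeUnit.SECONDS`. A value written for milliseconds but left without a unit is therefore interpreted as 1000 times the intended duration. An illustrative before/after (test name is hypothetical):

```java
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class ExampleTaskTest {
  // Before: @Timeout(5000) — interpreted as 5000 seconds, because
  // @Timeout defaults to TimeUnit.SECONDS.

  // After: the unit is stated explicitly.
  @Test
  @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
  void finishesWithinFiveSeconds() {
    // ...
  }
}
```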
BertScholten (Member) left a comment:
Looks good, just got some questions
source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java
...nager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java
BertScholten approved these changes on Jul 22, 2025
BertScholten (Member) left a comment:
LGTM