Skip to content
Open
14 changes: 14 additions & 0 deletions examples/bin/query
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env ruby
require_relative '../init'

Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f }

workflow_class_name, workflow_id, run_id, query, args = ARGV
workflow_class = Object.const_get(workflow_class_name)

if ![workflow_class, workflow_id, run_id, query].all?
fail 'Wrong arguments, use `bin/query WORKFLOW WORKFLOW_ID RUN_ID QUERY [ARGS]`'
end

result = Cadence.query_workflow(workflow_class, query, workflow_id, run_id, args)
puts result.inspect
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ worker.register_workflow(SideEffectWorkflow)
worker.register_workflow(SimpleTimerWorkflow)
worker.register_workflow(TimeoutWorkflow)
worker.register_workflow(TripBookingWorkflow)
worker.register_workflow(QueryWorkflow)

worker.register_activity(AsyncActivity)
worker.register_activity(EchoActivity)
Expand Down
55 changes: 55 additions & 0 deletions examples/spec/integration/query_workflow_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
require 'workflows/query_workflow'
require 'cadence/errors'


describe QueryWorkflow, :integration do
subject { described_class }

it 'returns the correct result for the queries' do
workflow_id, run_id = run_workflow(described_class)

# Query with nil workflow class
expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'started'

# Query with arbitrary args
expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DETRATS'

# Query with no args
expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 0

# Query with unregistered handler
expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query')

Cadence.signal_workflow(described_class, 'make_progress', workflow_id, run_id)

# Query for updated signal_count with an unsatisfied reject condition
expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id, query_reject_condition: :not_open))
.to eq 1

Cadence.signal_workflow(described_class, 'finish', workflow_id, run_id)
wait_for_workflow_completion(workflow_id, run_id)

# Repeating original query scenarios above, expecting updated state and signal results
expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'finished'

expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DEHSINIF'

expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 2

expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query')

# Now that the workflow is completed, test a query with a reject condition satisfied
expect { Cadence.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) }
.to raise_error(Cadence::QueryFailed, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED')
end
end
36 changes: 36 additions & 0 deletions examples/workflows/query_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
class QueryWorkflow < Cadence::Workflow
attr_reader :state, :signal_count, :last_signal_received

def execute
@state = "started"
@signal_count = 0
@last_signal_received = nil

workflow.on_query("state") { |*args| apply_transforms(state, args) }
workflow.on_query("signal_count") { signal_count }

workflow.on_signal do |signal|
@signal_count += 1
@last_signal_received = signal
end

workflow.wait_for { last_signal_received == "finish" }
@state = "finished"

{
signal_count: signal_count,
last_signal_received: last_signal_received,
final_state: state
}
end

private

def apply_transforms(value, transforms)
return value if value.nil? || transforms.empty?
transforms.inject(value) do |memo, input|
next memo unless memo.respond_to?(input)
memo.public_send(input)
end
end
end
1 change: 1 addition & 0 deletions lib/cadence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Cadence
:schedule_workflow,
:register_domain,
:signal_workflow,
:query_workflow,
:reset_workflow,
:terminate_workflow,
:fetch_workflow_execution_info,
Expand Down
12 changes: 12 additions & 0 deletions lib/cadence/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
)
end

def query_workflow(workflow, query, workflow_id, run_id, *args, domain: nil, query_reject_condition: nil)
execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options)
connection.query_workflow(
domain: domain || execution_options.domain,
workflow_id: workflow_id,
run_id: run_id,
query: query,
args: args,
query_reject_condition: query_reject_condition
)
end

def reset_workflow(domain, workflow_id, run_id, strategy: nil, decision_task_id: nil, reason: 'manual reset')
# Pick default strategy for backwards-compatibility
strategy ||= :last_decision_task unless decision_task_id
Expand Down
75 changes: 62 additions & 13 deletions lib/cadence/connection/thrift.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@ class Thrift
reject: CadenceThrift::WorkflowIdReusePolicy::RejectDuplicate
}.freeze

QUERY_REJECT_CONDITION = {
# none: CadenceThrift::QueryRejectCondition::NONE,
not_open: CadenceThrift::QueryRejectCondition::NOT_OPEN,
not_completed_cleanly: CadenceThrift::QueryRejectCondition::NOT_COMPLETED_CLEANLY
}.freeze

DEFAULT_OPTIONS = {
polling_ttl: 60, # 1 minute
max_page_size: 100
}.freeze

HISTORY_EVENT_FILTER = {
all: CadenceThrift::HistoryEventFilterType::ALL_EVENT,
close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT,
}.freeze
all: CadenceThrift::HistoryEventFilterType::ALL_EVENT,
close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT,
}.freeze

def initialize(host, port, identity, options = {})
@url = "http://#{host}:#{port}"
Expand Down Expand Up @@ -142,11 +148,12 @@ def poll_for_decision_task(domain:, task_list:)
send_request('PollForDecisionTask', request)
end

def respond_decision_task_completed(task_token:, decisions:)
def respond_decision_task_completed(task_token:, decisions:, query_results: {})
request = CadenceThrift::RespondDecisionTaskCompletedRequest.new(
identity: identity,
taskToken: task_token,
decisions: Array(decisions)
decisions: Array(decisions),
queryResults: query_results.transform_values { |value| Serializer.serialize(value) }
)
send_request('RespondDecisionTaskCompleted', request)
end
Expand Down Expand Up @@ -337,16 +344,58 @@ def get_search_attributes
raise NotImplementedError
end

def respond_query_task_completed
raise NotImplementedError
def respond_query_task_completed(task_token:, query_result:)
query_result_thrift = Serializer.serialize(query_result)
request = CadenceThrift::RespondQueryTaskCompletedRequest.new(
taskToken: task_token,
completedType: query_result_thrift.result_type,
queryResult: query_result_thrift.answer,
errorMessage: query_result_thrift.error_message,
)

client.respond_query_task_completed(request)
end

def reset_sticky_task_list
raise NotImplementedError
end

def query_workflow
raise NotImplementedError
def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reject_condition: nil)
request = CadenceThrift::QueryWorkflowRequest.new(
domain: domain,
execution: CadenceThrift::WorkflowExecution.new(
workflowId: workflow_id,
runId: run_id
),
query: CadenceThrift::WorkflowQuery.new(
queryType: query,
queryArgs: JSON.serialize(args)
)
)
if query_reject_condition
condition = QUERY_REJECT_CONDITION[query_reject_condition]
raise Client::ArgumentError, 'Unknown query_reject_condition specified' unless condition

request.query_reject_condition = condition
end

begin
response = client.query_workflow(request)
puts(response)
# rescue InvalidArgument => e doesn't seem to work
#
rescue Error => e
raise Cadence::QueryFailed, e.details
end

if response.query_rejected
rejection_status = response.query_rejected.status || 'not specified by server'
raise Cadence::QueryFailed, "Query rejected: status #{rejection_status}"
elsif !response.query_result
raise Cadence::QueryFailed, 'Invalid response from server'
else
JSON.deserialize(response.query_result)
end
end

def describe_workflow_execution(domain:, workflow_id:, run_id:)
Expand Down Expand Up @@ -389,9 +438,9 @@ def transport

def connection
@connection ||= begin
protocol = ::Thrift::BinaryProtocol.new(transport)
CadenceThrift::WorkflowService::Client.new(protocol)
end
protocol = ::Thrift::BinaryProtocol.new(transport)
CadenceThrift::WorkflowService::Client.new(protocol)
end
end

def send_request(name, request)
Expand Down Expand Up @@ -435,4 +484,4 @@ def serialize_status_filter(value)
end
end
end
end
end
5 changes: 5 additions & 0 deletions lib/cadence/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ class TimeoutError < ClientError; end
# A superclass for activity exceptions raised explicitly
# with the itent to propagate to a workflow
class ActivityException < ClientError; end

class ApiError < Error; end

class QueryFailed < ApiError; end

end
9 changes: 7 additions & 2 deletions lib/cadence/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ class Workflow
class Context
attr_reader :metadata

def initialize(state_manager, dispatcher, metadata, config)
def initialize(state_manager, dispatcher, metadata, config, query_registry)
@state_manager = state_manager
@dispatcher = dispatcher
@query_registry = query_registry
@metadata = metadata
@config = config
end
Expand Down Expand Up @@ -227,6 +228,10 @@ def on_signal(&block)
end
end

def on_query(query, &block)
query_registry.register(query, &block)
end

def cancel_activity(activity_id)
decision = Decision::RequestActivityCancellation.new(activity_id: activity_id)

Expand All @@ -246,7 +251,7 @@ def cancel(target, cancelation_id)

private

attr_reader :state_manager, :dispatcher, :config
attr_reader :state_manager, :dispatcher, :config, :query_registry

def schedule_decision(decision)
state_manager.schedule(decision)
Expand Down
Loading