Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ AMQP::Client.start("amqp://guest:guest@localhost") do |c|

name, message_count, consumer_count =
ch.queue_declare(name: "myqueue", passive: false, durable: true,
exclusive: false, auto_delete: false,
arguments: AMQP::Client::Arguments.new)
exclusive: false, auto_delete: false)
q = ch.queue # temporary queue that is deleted when the channel is closed
ch.queue_purge("myqueue")
ch.queue_bind("myqueue", "amq.topic", "routing-key")
Expand Down
4 changes: 2 additions & 2 deletions examples/streams.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ AMQP::Client.start do |c|
# prefetch required when consuming from stream queues
ch.prefetch(10)
# declare a stream queue using the x-queue-type argument
q = ch.queue("stream1", args: AMQP::Client::Arguments.new({"x-queue-type": "stream"}))
q = ch.queue("stream1", arguments: {"x-queue-type": "stream"})
puts "Waiting for messages. To exit press CTRL+C"
# Decide from where to subscribe using the x-stream-offset argument
q.subscribe(block: true, no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "first"})) do |msg|
q.subscribe(block: true, no_ack: false, arguments: {"x-stream-offset": "first"}) do |msg|
puts "Received: #{msg.body_io}"
msg.ack
end
Expand Down
5 changes: 3 additions & 2 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ authors:

dependencies:
amq-protocol:
github: cloudamqp/amq-protocol.cr
version: ~>1
#github: cloudamqp/amq-protocol.cr
#version: ~>1
path: ../amq-protocol.cr
development_dependencies:
ameba:
github: crystal-ameba/ameba
Expand Down
48 changes: 47 additions & 1 deletion src/amqp-client/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,12 @@ class AMQP::Client
ok.consumer_tag
end

def basic_consume(queue, tag = "", no_ack = true, exclusive = false,
block = false, arguments : NamedTuple = NamedTuple.new, work_pool = 1,
&blk : DeliverMessage -> Nil)
basic_consume(queue, tag, no_ack, exclusive, block, Arguments.new(arguments), work_pool, blk)
end

private def consume(consumer_tag, deliveries, done, i, log_errors, blk)
Log.context.set channel_id: @id.to_i, consumer: consumer_tag, worker: i
while msg = deliveries.receive?
Expand Down Expand Up @@ -498,11 +504,15 @@ class AMQP::Client
end

# Declares a queue with a name, by default durable and not auto-deleted
def queue(name : String, passive = false, durable = true, exclusive = false, auto_delete = false, args arguments = Arguments.new)
def queue(name : String, passive = false, durable = true, exclusive = false, auto_delete = false, arguments = Arguments.new)
q = queue_declare(name, passive, durable, exclusive, auto_delete, false, arguments)
Queue.new(self, q[:queue_name])
end

def queue(name : String, passive = false, durable = true, exclusive = false, auto_delete = false, arguments : NamedTuple = NamedTuple.new)
queue(name, passive, durable, exclusive, auto_delete, Arguments.new(arguments))
end

# Declare a queue with *name*
# *passive* will raise if the queue doesn't already exists, other arguments are ignored
# *durable* will make the queue durable on the server (note that messages have have the persistent flag set to make the messages persistent)
Expand Down Expand Up @@ -532,6 +542,15 @@ class AMQP::Client
end
end

def queue_declare(name : String, passive = false,
durable = name.empty? ? false : true,
exclusive = name.empty? ? true : false,
auto_delete = name.empty? ? true : false,
no_wait = false,
arguments : NamedTuple = NamedTuple.new)
queue_declare(name, passive, durable, exclusive, auto_delete, no_wait, Arguments.new(arguments))
end

# Delete a queue
def queue_delete(name : String, if_unused = false, if_empty = false, no_wait = false)
write Frame::Queue::Delete.new(@id, 0_u16, name, if_unused, if_empty, no_wait)
Expand Down Expand Up @@ -560,12 +579,20 @@ class AMQP::Client
expect Frame::Queue::BindOk unless no_wait
end

def queue_bind(queue : String, exchange : String, routing_key : String, no_wait = false, arguments : NamedTuple = NamedTuple.new) : Nil
queue_bind(queue, exchange, routing_key, no_wait, Arguments.new(arguments))
end

# Unbind a *queue* from an *exchange*, with a *routing_key* and optionally some *arguments*
def queue_unbind(queue : String, exchange : String, routing_key : String, args arguments = Arguments.new) : Nil
write Frame::Queue::Unbind.new(@id, 0_u16, queue, exchange, routing_key, arguments)
expect Frame::Queue::UnbindOk
end

def queue_unbind(queue : String, exchange : String, routing_key : String, arguments : NamedTuple = NamedTuple.new) : Nil
queue_unbind(queue, exchange, routing_key, Arguments.new(arguments))
end

def topic_exchange(name = "amq.topic", passive = true)
exchange(name, "topic", passive)
end
Expand Down Expand Up @@ -593,6 +620,11 @@ class AMQP::Client
Exchange.new(self, name)
end

def exchange(name, type, passive = false, durable = true,
internal = false, auto_delete = false, arguments : NamedTuple = NamedTuple.new)
exchange(name, type, passive, durable, internal, auto_delete, Arguments.new(arguments))
end

# Declares an exchange
def exchange_declare(name : String, type : String, passive = false,
durable = true, internal = false, auto_delete = false,
Expand All @@ -604,6 +636,12 @@ class AMQP::Client
expect Frame::Exchange::DeclareOk unless no_wait
end

def exchange_declare(name : String, type : String, passive = false,
durable = true, internal = false, auto_delete = false,
no_wait = false, arguments : NamedTuple = NamedTuple.new) : Nil
exchange_declare(name, type, passive, durable, internal, auto_delete, no_wait, Arguments.new(arguments))
end

# Deletes an exchange
def exchange_delete(name, if_unused = false, no_wait = false) : Nil
write Frame::Exchange::Delete.new(@id, 0_u16, name, if_unused, no_wait)
Expand All @@ -616,12 +654,20 @@ class AMQP::Client
expect Frame::Exchange::BindOk unless no_wait
end

def exchange_bind(source : String, destination : String, routing_key : String, no_wait = false, arguments : NamedTuple = NamedTuple.new) : Nil
exchange_bind(source, destination, routing_key, no_wait, Arguments.new(arguments))
end

# Unbind an exchange from another exchange
def exchange_unbind(source : String, destination : String, routing_key : String, no_wait = false, args arguments = Arguments.new) : Nil
write Frame::Exchange::Unbind.new(@id, 0_u16, source, destination, routing_key, no_wait, arguments)
expect Frame::Exchange::UnbindOk unless no_wait
end

def exchange_unbind(source : String, destination : String, routing_key : String, no_wait = false, arguments : NamedTuple = NamedTuple.new) : Nil
exchange_unbind(source, destination, routing_key, no_wait, Arguments.new(arguments))
end

# Sets the channel in publish confirm mode, each published message will be acked or nacked
def confirm_select(no_wait = false) : Nil
return if @confirm_mode
Expand Down
8 changes: 8 additions & 0 deletions src/amqp-client/exchange.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,20 @@ class AMQP::Client
self
end

def bind(exchange : String, routing_key : String, no_wait = false, arguments = NamedTuple.new)
bind(exchange, routing_key, no_wait, Arguments.new(arguments))
end

# Unbind the exchange from another exchange
def unbind(exchange : String, routing_key : String, no_wait = false, args arguments = Arguments.new)
@channel.exchange_unbind(@name, exchange, routing_key, no_wait, arguments)
self
end

def unbind(exchange : String, routing_key : String, no_wait = false, arguments = NamedTuple.new)
unbind(exchange, routing_key, no_wait, Arguments.new(arguments))
end

# Publish a message to the exchange
def publish(message, routing_key : String, mandatory = false, immediate = false, props properties = Properties.new)
@channel.basic_publish(message, @name, routing_key, mandatory, immediate, properties)
Expand Down
15 changes: 14 additions & 1 deletion src/amqp-client/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ class AMQP::Client
self
end

def bind(exchange : String, routing_key : String, no_wait = false, arguments = NamedTuple.new)
bind(exchange, routing_key, no_wait, Arguments.new(arguments))
end

# Unbind the queue from an exchange
def unbind(exchange : String, routing_key : String, args arguments = Arguments.new)
@channel.queue_unbind(@name, exchange, routing_key, arguments)
self
end

def unbind(exchange : String, routing_key : String, arguments = NamedTuple.new)
unbind(exchange, routing_key, Arguments.new(arguments))
end

# Publish a message directly to the queue
def publish(message, mandatory = false, immediate = false, props properties = Properties.new)
@channel.basic_publish(message, "", @name, mandatory, immediate, properties)
Expand Down Expand Up @@ -74,10 +82,15 @@ class AMQP::Client
#
# See `Channel#basic_consume`
def subscribe(tag = "", no_ack = true, exclusive = false, block = false,
args arguments = Arguments.new, work_pool = 1, &blk : DeliverMessage -> Nil)
arguments = Arguments.new, work_pool = 1, &blk : DeliverMessage -> Nil)
@channel.basic_consume(@name, tag, no_ack, exclusive, block, arguments, work_pool, &blk)
end

def subscribe(tag = "", no_ack = true, exclusive = false, block = false,
arguments : NamedTuple = NamedTuple.new, work_pool = 1, &blk : DeliverMessage -> Nil)
subscribe(tag, no_ack, exclusive, block, Arguments.new(arguments), work_pool, &blk)
end

# Unsubscribe from the queue
#
# See `Channel#basic_cancel` for more details
Expand Down
Loading