Publish subscribe on ruby on rails

angga kusumandaru
Aug 17 · 4 min read

Initiation and environment setup

default: &default  
adapter: mysql2 encoding: utf8 pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 5 } %> host: <%= ENV.fetch("RAILS_HOST_DB") %> username: <%= ENV.fetch("RAILS_USERNAME_DB") %> password: <%= ENV.fetch("RAILS_PASSWORD_DB") %> socket: /tmp/mysql.sock
RAILS_HOST_DB=127.0.0.1
RAILS_USERNAME_DB=root
RAILS_PASSWORD_DB=secret_password
gem ‘rspec-rails’gem ‘shoulda-matchers’

Set publisher

@connection ||= begin  instance = Bunny.new(    addresses: ENV['AMQP_ADDRESSES'].try(:split, ','),    username: ENV['AMQP_USER'],    password: ENV['AMQP_PASSWORD'],    vhost: ENV['AMQP_VHOST'],    automatically_recover: true,    connection_timeout: 2,    continuation_timeout: (ENV['CONTINUATION_TIMEOUT'] || 10_000).to_i,    logger: Rails.logger  )  instance.start  instanceend
def publish(options = {})  channel = ::Publisher::BunnyPublisher.connection.create_channel  exchange = channel.exchange(    ENV['AMQP_EXCHANGE'],    type: 'direct',    durable: true  )  headers = { 'x-delay' => options[:delay_time].to_i * 1_000 } if options[:delay_time].present?  exchange.publish(payload.to_json, routing_key: QUEUE_NAME, headers: headers)end

Set consumer

Sneakers.configure  connection: Connection.sneakers,  exchange: ENV['SNEAKER_AMQP_EXCHANGE'], # AMQP exchange  exchange_type: :direct,  runner_config_file: nil,  metric: nil,  daemonize: false,  workers: ENV['SNEAKERS_WORKER'].to_i,  log: STDOUT,  pid_path: 'sneakers.pid',  timeout_job_after: 5.minutes,  prefetch: ENV['SNEAKERS_PREFETCH'].to_i,  threads: ENV['SNEAKERS_THREADS'].to_i,  env: ENV['RAILS_ENV'],   durable: true,  ack: true,  heartbeat: 5,  handler: Sneakers::Handlers::Maxretry,  retry_max_times: 10,  retry_timeout: 3 * 60 * 1000 Sneakers.logger = Rails.loggerSneakers.logger.level = Logger::WARN
include Sneakers::WorkerQUEUE_NAME = ::UserPublisher::QUEUE_NAMEfrom_queue QUEUE_NAME, arguments: { 'x-dead-letter-exchange': "#{QUEUE_NAME}-retry" }def work(msg)  data = ActiveSupport::JSON.decode(msg)  data['users'].each do |user|    update_user(user.to_h)  end  ack!rescue StandardError => e  create_log(false, data, message: e.message)  reject!end
irb(main):002:0> user_params = [{id: 1, first_name: 'first'}]
=> [{:id=>1, :first_name=>"first"}]
irb(main):003:0> UserPublisher.new(user_params).publish
rake sneakers:run           

2019-08-17T05:36:41Z p-83921 t-owt7p5uik DEBUG: [worker-user.create:1:d5ckph][#<Thread:0x00007fd5b0885d78 run>][user.create][#<Sneakers::Configuration:0x00007fd5b6efd240>] New worker: subscribing.
2019-08-17T05:36:41Z p-83921 t-owt7p5uik DEBUG: [worker-user.create:1:d5ckph][#<Thread:0x00007fd5b0885d78 run>][user.create][#<Sneakers::Configuration:0x00007fd5b6efd240>] New worker: I'm alive.2019-08-17T05:41:03Z p-83921 t-owt7x84vw DEBUG: [worker-user.create:1:d5ckph][#<Thread:0x00007fd5b225af78@/Users/ndaru/.rbenv/versions/2.5.0/lib/ruby/gems/2.5.0/gems/bunny-2.14.2/lib/bunny/consumer_work_pool.rb:101 run>][user.create][#<Sneakers::Configuration:0x00007fd5b6efd240>] Working off: "{\"users\":[{\"id\":1,\"first_name\":\"first\"}]}"
Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade