Am I using EventMachine in the right way?
I am using ruby-smpp and Redis to build a queue-based background worker that sends SMPP messages.
I am wondering whether I am using EventMachine in the right way. It works, but it doesn't feel right.
#!/usr/bin/env ruby
# Sample SMS gateway that can receive MOs (mobile originated messages) and
# DRs (delivery reports), and send MTs (mobile terminated messages).
# MTs are, in the name of simplicity, entered on the command line in the format
# <sender> <receiver> <message body>
# MOs and DRs will be dumped to standard out.
require 'smpp'
require 'redis/connection/hiredis'
require 'redis'
require 'yajl'
require 'time'
LOGFILE = File.dirname(__FILE__) + "/sms_gateway.log"
PIDFILE = File.dirname(__FILE__) + '/worker_test.pid'
Smpp::Base.logger = Logger.new(LOGFILE)
#Smpp::Base.logger.level = Logger::WARN
REDIS = Redis.new
class MbloxGateway
  # MT id counter.
  @@mt_id = 0
  # expose SMPP transceiver's send_mt method
  def self.send_mt(sender, receiver, body)
    if sender =~ /[a-z]+/i
      source_addr_ton = 5
    else
      source_addr_ton = 2
    end
    @@mt_id += 1
    @@tx.send_mt(('smpp' + @@mt_id.to_s), sender, receiver, body, {
      :source_addr_ton => source_addr_ton
      # :service_type => 1,
      # :source_addr_ton => 5,
      # :source_addr_npi => 0 ,
      # :dest_addr_ton => 2,
      # :dest_addr_npi => 1,
      # :esm_class => 3 ,
      # :protocol_id => 0,
      # :priority_flag => 0,
      # :schedule_delivery_time => nil,
      # :validity_period => nil,
      # :registered_delivery=> 1,
      # :replace_if_present_flag => 0,
      # :data_coding => 0,
      # :sm_default_msg_id => 0
      #
    })
  end
  def logger
    Smpp::Base.logger
  end
  def start(config)
    # Write this worker's pid to a file
    File.open(PIDFILE, 'w') { |f| f << Process.pid }
    # The transceiver sends MT messages to the SMSC. It needs a storage with Hash-like
    # semantics to map SMSC message IDs to your own message IDs.
    pdr_storage = {}
    # Run EventMachine in loop so we can reconnect when the SMSC drops our connection.
    loop do
      EventMachine::run do
        @@tx = EventMachine::connect(
          config[:host],
          config[:port],
          Smpp::Transceiver,
          config,
          self # delegate that will receive callbacks on MOs and DRs and other events
        )
        # Let the connection start before we check for messages
        EM.add_timer(3) do
          # Maybe there is some better way to do this. IDK, But it works!
          EM.defer do
            loop do
              # Pop a message
              message = REDIS.lpop 'messages:send:queue'
              if message # If there is a message. Process it and check the queue again
                message = Yajl::Parser.parse(message, :check_utf8 => false) # Parse the message from Json to Ruby hash
                if !message['send_after'] or (message['send_after'] and Time.parse(message['send_after']) < Time.now)
                  self.class.send_mt(message['sender'], message['receiver'], message['body']) # Send the message
                  REDIS.publish 'log:messages', "#{message['sender']} -> #{message['receiver']}: #{message['body']}" # Push the message to the redis queue so we can listen to the channel
                else
                  REDIS.lpush 'messages:queue', Yajl::Encoder.encode(message)
                end
              else # If there is no message. Sleep for a second
                sleep 1
              end
            end
          end
        end
      end
      sleep 2
    end
  end
  # ruby-smpp delegate methods
  def mo_received(transceiver, pdu)
    logger.info "Delegate: mo_received: from #{pdu.source_addr} to #{pdu.destination_addr}: #{pdu.short_message}"
  end
  def delivery_report_received(transceiver, pdu)
    logger.info "Delegate: delivery_report_received: ref #{pdu.msg_reference} stat #{pdu.stat}"
  end
  def message_accepted(transceiver, mt_message_id, pdu)
    logger.info "Delegate: message_accepted: id #{mt_message_id} smsc ref id: #{pdu.message_id}"
  end
  def message_rejected(transceiver, mt_message_id, pdu)
    logger.info "Delegate: message_rejected: id #{mt_message_id} smsc ref id: #{pdu.message_id}"
  end
  def bound(transceiver)
    logger.info "Delegate: transceiver bound"
  end
  def unbound(transceiver)
    logger.info "Delegate: transceiver unbound"
    EventMachine::stop_event_loop
  end
end
# Start the Gateway
begin
  puts "Starting SMS Gateway. Please check the log at #{LOGFILE}"
  # SMPP properties. These parameters work well with the Logica SMPP simulator.
  # Consult the SMPP spec or your mobile operator for the correct settings of
  # the other properties.
  config = {
    :host => 'server.com',
    :port => 3217,
    :system_id => 'user',
    :password => 'password',
    :system_type => 'type', # default given according to SMPP 3.4 Spec
    :interface_version => 52,
    :source_ton => 0,
    :source_npi => 1,
    :destination_ton => 1,
    :destination_npi => 1,
    :source_address_range => '',
    :destination_address_range => '',
    :enquire_link_delay_secs => 10
  }
  gw = MbloxGateway.new
  gw.start(config)
rescue Exception => ex
  puts "Exception in SMS Gateway: #{ex} at #{ex.backtrace.join("\n")}"
end
Some easy steps to make this code more EventMachine-ish:
- Get rid of the blocking Redis driver and use em-hiredis instead.
- Stop using defer. Pushing work out to threads with the Redis driver makes things worse, because the driver relies on locks around the socket it is using.
- Get rid of the add_timer(3).
- Get rid of the inner loop and replace it by rescheduling a block for the next event loop iteration with EM.next_tick. The outer loop is unnecessary too: rather than looping around EM.run, it's cleaner to handle a disconnect by reconnecting from your unbound method (calling @@tx.reconnect) instead of stopping and restarting the event loop.
- Don't sleep, just wait. EventMachine will tell you when new things come in on a network socket.
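To illustrate the rescheduling idea from the list above without pulling in EventMachine, here is a toy sketch. ToyReactor, its next_tick method, the inbox array and the heartbeat task are all made up for illustration; this is not EventMachine's implementation, it only mimics the shape of EM.next_tick. The point is that a task which handles one item and then reschedules itself lets other tasks take turns on the same thread, whereas a plain loop would hold the thread until it finished.

```ruby
# Toy single-threaded reactor (NOT EventMachine): a queue of blocks
# that are run one at a time, in the order they were scheduled.
class ToyReactor
  def initialize
    @queue = []
  end

  # Schedule a block to run on a later pass of the loop.
  def next_tick(&block)
    @queue << block
  end

  # Run scheduled blocks until none are left.
  def run
    @queue.shift.call until @queue.empty?
  end
end

reactor = ToyReactor.new
log = []

# Hypothetical in-memory stand-in for the Redis list.
inbox = %w[a b c]

# Instead of `loop do ... end`: handle one item, then reschedule.
drain = lambda do
  item = inbox.shift
  if item
    log << "sent #{item}"
    reactor.next_tick(&drain)
  end
end

# A second task sharing the loop; it gets a turn between items.
beats = 2
heartbeat = lambda do
  log << "heartbeat"
  beats -= 1
  reactor.next_tick(&heartbeat) if beats > 0
end

reactor.next_tick(&drain)
reactor.next_tick(&heartbeat)
reactor.run

puts log
```

In this toy run the two tasks interleave (a message, a heartbeat, a message, and so on) even though nothing runs in parallel.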
Here's what the core code around EventMachine would look like with some of these improvements:
# Needs the em-hiredis gem: require 'em-hiredis' at the top of the file.
def start(config)
  File.open(PIDFILE, 'w') { |f| f << Process.pid }
  pdr_storage = {}
  EventMachine::run do
    @@tx = EventMachine::connect(
      config[:host],
      config[:port],
      Smpp::Transceiver,
      config,
      self
    )
    # A local variable rather than REDIS: Ruby rejects assigning a constant inside a method.
    redis = EM::Hiredis.connect
    pop_message = lambda do
      redis.lpop 'messages:send:queue' do |message|
        if message # If there is a message. Process it and check the queue again
          message = Yajl::Parser.parse(message, :check_utf8 => false) # Parse the message from Json to Ruby hash
          if !message['send_after'] or (message['send_after'] and Time.parse(message['send_after']) < Time.now)
            self.class.send_mt(message['sender'], message['receiver'], message['body'])
            redis.publish 'log:messages', "#{message['sender']} -> #{message['receiver']}: #{message['body']}"
          else
            redis.lpush 'messages:queue', Yajl::Encoder.encode(message)
          end
        end
        # Reschedule instead of looping. Note: with an empty queue this
        # keeps asking Redis again as soon as each reply arrives.
        EM.next_tick(&pop_message)
      end
    end
    # Kick off the first pop; each callback schedules the next one.
    pop_message.call
  end
end
It's not perfect and could use some cleaning up too, but this is closer to how it should look in EventMachine: no sleeps, avoid defer where possible, don't use network drivers that can block, and implement a traditional loop by rescheduling work on the next reactor iteration. In terms of Redis the difference is not that big, but it's more EventMachine-y this way imho.
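As one possible cleanup, the send_after condition that appears in both versions of the worker could be pulled out into a small predicate. This is only a sketch: the helper name due? and the sample messages are assumptions for illustration, and the stdlib JSON parser stands in for Yajl so the snippet runs without extra gems.

```ruby
require 'json'
require 'time'

# Hypothetical helper: true when the message carries no 'send_after'
# value, or when that timestamp is already in the past.
def due?(message, now = Time.now)
  send_after = message['send_after']
  !send_after || Time.parse(send_after) < now
end

# Fixed reference time so the example does not depend on the clock.
now = Time.parse('2012-01-01 12:00:00 UTC')

immediate = JSON.parse('{"sender":"A","receiver":"1","body":"hi"}')
delayed   = JSON.parse('{"sender":"A","receiver":"1","body":"hi",' \
                       '"send_after":"2012-01-01 13:00:00 UTC"}')

puts due?(immediate, now)        # no send_after, so it is due
puts due?(delayed, now)          # scheduled one hour after `now`
puts due?(delayed, now + 7200)   # two hours later it has become due
```

Inside the worker, the branch would then read `if due?(message)`, with sending and re-queueing otherwise unchanged.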
Hope this helps. Happy to explain further if you still have questions.
You're doing blocking Redis calls in EM's reactor loop. It works, but isn't the way to go. You could take a look at em-hiredis to properly integrate Redis calls with EM.