以下介紹Pool的概念並以Apache Commons Pool來製作rabbitmq channel Pool。
Pool的概念
當系統在運行過程中需要建立到網路連線例如資料庫連線,像這類建立連線的動作是會比較消耗系統資源。以資料庫驗証的登入為例子,一個登入的動作需要建立資料庫連線,建立完後僅執行一行sql,接著就關閉資料庫連線。建立資料庫連線會比執行一行sql所花的時間還來的久,因此連線物件一經建立後應該要拿來多加使用才有效率。好在目前已經有許多成熟的Open Source資料庫Pool library像是DBCP、Proxool可以讓我們重覆的使用已經建立好的資料庫Connection物件。
Pool如何運作?
我們可以先建立物件後放到Pool裡,如果有thread要使用Pool裡的物件的話則可從Pool裡面取出(borrow),物件使用完後需歸還(return)進Pool供其它thread取用。我們可以限制Pool裡的物件最大與最小的數量。如果thread向Pool borrow物件卻發現Pool裡已經無可用物件時(因為被借光了),我們可以決定要讓這個thread做等待或者是直接丟出例外。
如何自訂Pool
Apache Commons Pool library提供了object-Pooling API讓我們很輕易的開發Pool的功能,以下以Apache Commons Pool來製作rabbitmq channel Pool。
下列為將訊息傳送至rabbitmq的queue裡的程式碼,一般來說程式碼會像這樣子
//建立rabbitmq的連線
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
//利用連線物件來建立channel
Channel channel = connection.createChannel();
//利用channel將訊息Hello World!送至名為[myqueue]的Queue裡
String message = "Hello World!";
channel.basicPublish("", "myeuque", null, message.getBytes());
建立connection, channel是會消耗resource的,rabbitmq允許在一個connection上建立多個channel,因此我們只需製作channel的Pool即可。
以下建立ChannelFactory,此Factory要負責Pool裡物件的建立與銷毀…等。
import org.apache.commons.Pool.BasePoolableObjectFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class ChannelFactory extends BasePoolableObjectFactory<Channel> {
private Connection connection = null;
public ChannelFactory(Connection connection) {
//connection在外部建立完後再傳進來
this.connection = connection;
}
@Override
public Channel makeObject() throws Exception {
//此method負責建立Pool裡的物件
try {
//利用connection來建立channel
return connection.createChannel();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@Override
public void activateObject(Channel obj) throws Exception {
super.activateObject(obj);
}
@Override
public void destroyObject(Channel obj) throws Exception {
//Pool裡的物件不再使用要銷毀時的處理
super.destroyObject(obj);
//關閉channel
obj.close();
}
@Override
public void passivateObject(Channel obj) throws Exception {
// when an object is returned to the Pool,
// we'll clear it out
super.passivateObject(obj);
}
@Override
public boolean validateObject(Channel obj) {
return super.validateObject(obj);
}
}
以下為ChannelPool, 此類別本身以singleton來實作並決定Pool裡最大建立多少物件,以及取物件時最長等多久,並提供borrowObject, returnObject method
import org.apache.commons.Pool.impl.GenericObjectPool;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ChannelPool {
private volatile static ChannelPool _instance;
private GenericObjectPool<Channel> Pool;
private Connection connection = null;
public static ChannelPool getInstance() {
if (_instance == null) {
synchronized (ChannelPool.class) {
if (_instance == null) {
_instance = new ChannelPool();
}
}
}
return _instance;
}
private ChannelPool() {
init();
}
private void init() {
try {
//建立connection物件
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
//connection物件給ChannelFactory拿來建立Pool裡的channel物件使用
ChannelFactory cf = new ChannelFactory(connection);
Pool = new GenericObjectPool<Channel>(cf);
//Pool裡的物件最多產生幾個
Pool.setMaxActive(5);
//當borrow被呼叫時且Pool裡已無可用物件時,最長可等多久再丟exception
Pool.setMaxWait(30000);
} catch (IOException e) {
logger.error("error:", e);
}
}
public Channel borrowObject() throws Exception {
return Pool.borrowObject();
}
public void returnObject(Channel channel) throws Exception {
Pool.returnObject(channel);
}
public void close() {
try {
//關閉 Pool
Pool.close();
//關閉rabbitmq connection
this.connection.close();
} catch (Exception e) {
logger.error("error:", e);
}
}
}
當Pool相關類別寫完後,我們可以使用下列的程式碼來取用、歸還channel
//取得channel物件
Channel channel = ChannelPool.getInstance().borrowObject();
//送訊息至queue
channel.basicPublish("", "myeuque", null, message.getBytes());
//歸還channel物件
ChannelPool.getInstance().returnObject(channel);