2014年1月28日

Apache Commons Pool的使用

以下介紹Pool的概念並以Apache Commons Pool來製作rabbitmq channel Pool。

Pool的概念

當系統在運行過程中需要建立到網路連線例如資料庫連線,像這類建立連線的動作是會比較消耗系統資源。以資料庫驗証的登入為例子,一個登入的動作需要建立資料庫連線,建立完後僅執行一行sql,接著就關閉資料庫連線。建立資料庫連線會比執行一行sql所花的時間還來的久,因此連線物件一經建立後應該要拿來多加使用才有效率。好在目前已經有許多成熟的Open Source資料庫Pool library像是DBCPProxool可以讓我們重覆的使用已經建立好的資料庫Connection物件。

Pool如何運作?

我們可以先建立物件後放到Pool裡,如果有thread要使用Pool裡的物件的話則可從Pool裡面取出(borrow),物件使用完後需歸還(return)進Pool供其它thread取用。我們可以限制Pool裡的物件最大與最小的數量。如果thread向Pool borrow物件卻發現Pool裡已經無可用物件時(因為被借光了),我們可以決定要讓這個thread做等待或者是直接丟出例外。

如何自訂Pool

Apache Commons Pool library提供了object-Pooling API讓我們很輕易的開發Pool的功能,以下以Apache Commons Pool來製作rabbitmq channel Pool。

下列為將訊息傳送至rabbitmq的queue裡的程式碼,一般來說程式碼會像這樣子

//建立rabbitmq的連線
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();

//利用連線物件來建立channel
Channel channel = connection.createChannel();

//利用channel將訊息Hello World!送至名為[myqueue]的Queue裡
String message = "Hello World!";
channel.basicPublish("", "myeuque", null, message.getBytes());

建立connection, channel是會消耗resource的,rabbitmq允許在一個connection上建立多個channel,因此我們只需製作channel的Pool即可。

以下建立ChannelFactory,此Factory要負責Pool裡物件的建立與銷毀…等。

import org.apache.commons.Pool.BasePoolableObjectFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class ChannelFactory extends BasePoolableObjectFactory<Channel> {

    private Connection connection = null;

    public ChannelFactory(Connection connection) {
        //connection在外部建立完後再傳進來
        this.connection = connection;
    }

    @Override
    public Channel makeObject() throws Exception {
        //此method負責建立Pool裡的物件
        try {
            //利用connection來建立channel
            return connection.createChannel();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void activateObject(Channel obj) throws Exception {
        super.activateObject(obj);
    }

    @Override
    public void destroyObject(Channel obj) throws Exception {
        //Pool裡的物件不再使用要銷毀時的處理
        super.destroyObject(obj);

        //關閉channel
        obj.close();
    }

    @Override
    public void passivateObject(Channel obj) throws Exception {
        // when an object is returned to the Pool,
        // we'll clear it out
        super.passivateObject(obj);
    }

    @Override
    public boolean validateObject(Channel obj) {
        return super.validateObject(obj);
    }
}

以下為ChannelPool, 此類別本身以singleton來實作並決定Pool裡最大建立多少物件,以及取物件時最長等多久,並提供borrowObject, returnObject method

import org.apache.commons.Pool.impl.GenericObjectPool;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ChannelPool {

    private volatile static ChannelPool _instance;

    private GenericObjectPool<Channel> Pool;
    private Connection connection = null;

    public static ChannelPool getInstance() {
        if (_instance == null) {
            synchronized (ChannelPool.class) {
                if (_instance == null) {
                    _instance = new ChannelPool();
                }
            }
        }
        return _instance;
    }

    private ChannelPool() {
        init();
    }

    private void init() {
        try {
            //建立connection物件            
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();

            //connection物件給ChannelFactory拿來建立Pool裡的channel物件使用
            ChannelFactory cf = new ChannelFactory(connection);
            Pool = new GenericObjectPool<Channel>(cf);

            //Pool裡的物件最多產生幾個
            Pool.setMaxActive(5);

            //當borrow被呼叫時且Pool裡已無可用物件時,最長可等多久再丟exception
            Pool.setMaxWait(30000);

        } catch (IOException e) {
            logger.error("error:", e);
        }
    }

    public Channel borrowObject() throws Exception {
        return Pool.borrowObject();
    }

    public void returnObject(Channel channel) throws Exception {
        Pool.returnObject(channel);
    }

    public void close() {
        try {
            //關閉 Pool
            Pool.close();

            //關閉rabbitmq connection
            this.connection.close();

        } catch (Exception e) {
            logger.error("error:", e);
        }
    }
}

當Pool相關類別寫完後,我們可以使用下列的程式碼來取用、歸還channel

//取得channel物件
Channel channel = ChannelPool.getInstance().borrowObject();

//送訊息至queue
channel.basicPublish("", "myeuque", null, message.getBytes());

//歸還channel物件
ChannelPool.getInstance().returnObject(channel);