2014/08/10

MQTT with RabbitMQ

RabbitMQ的底層是實作AMQP,如果想透過它來跑MQTT,必須先安裝RabbitMQ MQTT Adapter,只要在安裝完RabbitMQ之後,執行以下指令即安裝完成:
rabbitmq-plugins enable rabbitmq_mqtt
RabbitMQ的MQTT Adapter是基於RabbitMQ本身的Exchange和Queue上來實現的。預設RabbitMQ會把收到的MQTT訊息送到預設的topic exchange,也就是"amp.topic"這個exchange。當Client執行訂閱(subscribe)時,RabbitMQ會先為此Client建立一條Queue,可以參考下圖:

其中名稱mqtt-subscription-XXXXqos1為RabbitMQ幫你建立的Queue,而XXXX為你傳入的clientId。
接著他會為此條Queue和amp.topic建立起routing key,routing key就是你Subscribe時傳過來的topic name,只是他會將 "/"換成".",比如說你Subscribe時是寫"rec/mayer",則他會把routing key換成"rec.mayer"。可以參考下圖:

Client發送對某個MQTT的Topic 發佈(publish)訊息時,首先RabbitMQ會把此訊息送到amq.topic內,接著amq.topic將此訊息要送到的topic name取出來,把它當成routing key送給相對應的queue。這樣一來,訊息就能正確的發送給訂閱者。

修改MQTT預設exchange

預設exchange可以透過config檔案修改,首先先到/etc/rabbitmq/目錄底下,建立rabbitmq.config檔案,然後貼上以下設定檔,其中rabbitmq_mqtt[]裡面的exchange設定,就是預設要使用哪個exchange,不過在使用時必須先把該exchange建立起來。
[{rabbit,        [{tcp_listeners,    [5672]}]},
 {rabbitmq_mqtt, [{default_user,     <<"guest">>},
                  {default_pass,     <<"guest">>},
                  {allow_anonymous,  true},
                  {vhost,            <<"/">>},
                  {exchange,         <<"foo.topic">>},
                  {subscription_ttl, 1800000},
                  {prefetch,         10},
                  {ssl_listeners,    []},
                  %% Default MQTT with TLS port is 8883
                  %% {ssl_listeners,    [8883]}
                  {tcp_listeners,    [1883]},
                  {tcp_listen_options, [binary,
                                        {packet,    raw},
                                        {reuseaddr, true},
                                        {backlog,   128},
                                        {nodelay,   true}]}]}
].

Rabbit MQTT Adapter遺珠

使用RabbitMQ來當MQTT broker,要注意他只實現MQTT 3.1的一些特性:
  • QoS0 and QoS1 publish & consume
  • Last Will and Testament (LWT)
  • SSL
  • Session stickiness
由於AMQP 0-9-1沒有支援QoS 2的特性,因此這邊沒有實作,所以要注意的是如果你想用MQTT的QoS 2來實作服務時,你就不能使用RabbitMQ。關於QoS 2的特性,可以參考MQTT(二)Message Type and Flows - Publish Flows這篇的說明。
另外他也沒有實現Retain Message,當遇到時,RabbitMQ會將他默默的忽略掉。關於Retain Message可以參考 MQTT(四)PUBLISH Message之前我貼的文章來了解其作用。

參考

RabbitMQ-MQTT
RabbitMQ Blog-MQTTAdapter

2014/08/04

Jetty - Part 2/2

接續上一次的說明,我們已經讓 Embedded Jetty Server 能夠支援 http 與 https 了,接下來就是要整合既有的 WebContent,使用 web.xml ,支援 JSP, Servlet 與 Filter 等等。最後,我們再處理要支援多個 WebAppContext 的問題。

WebAppContext

先前在提供網頁服務時,很單純地只有用 Handler 實作,但一個網站並不會這麼單純,一定包含了 servlet, jsp, event listener, filter, html, css, js, images 等等這些東西,換句話說,以往在 Eclipse 使用 Web Project 開發時,WebContent 裡面的資料都要能支援。

原本 setHandler 的地方,我們必須做個調整,改成使用 WebAppContext,然後把 context 指定給 server 的 handler。

// server.setHandler(new HelloHandler());

WebAppContext context = new WebAppContext();
context.setDescriptor("../WebContent/WEB-INF/web.xml");
context.setResourceBase("../WebContent");
context.setContextPath("/examples");
context.setParentLoaderPriority(true);

server.setHandler(context);

在這樣的方式調整下,我們就可以用 http://localhost:8080/examples/ 瀏覽網站。

更複雜的 web application

為了完整 web application 的功能,我們還必須做些調整。

  1. 支援 servlet 與 jsp 的 jar files
    在測試的過程中,我們發現只有 jetty-all 這個 jar 還是不夠的,必須額外增加一些 jar,我們可以在 jetty-distribution-9.2.2\lib 的目錄裡面找到。

     javax.el-3.0.0.jar
     javax.servlet.jsp.jstl-1.2.2.jar
     javax.servlet.jsp-2.3.2.jar
     javax.servlet.jsp-api-2.3.1.jar
     servlet-api-3.1.jar
  2. 讓 WebApplicationContext 支援 JSP
    我們必須在 web.xml 裡面增加以下這個 servlet 處理 JSP files

     <servlet id="jsp">
         <servlet-name>jsp</servlet-name>
         <servlet-class>org.apache.jasper.servlet.JspServlet</servlet-class>
         <init-param>
             <param-name>logVerbosityLevel</param-name>
             <param-value>DEBUG</param-value>
         </init-param>
         <init-param>
             <param-name>fork</param-name>
             <param-value>false</param-value>
         </init-param>
         <init-param>
             <param-name>keepgenerated</param-name>
             <param-value>true</param-value>
         </init-param>
         <load-on-startup>0</load-on-startup>
     </servlet>
    
     <servlet-mapping>
         <servlet-name>jsp</servlet-name>
         <url-pattern>*.jsp</url-pattern>
         <url-pattern>*.jspf</url-pattern>
         <url-pattern>*.jspx</url-pattern>
         <url-pattern>*.xsp</url-pattern>
         <url-pattern>*.JSP</url-pattern>
         <url-pattern>*.JSPF</url-pattern>
         <url-pattern>*.JSPX</url-pattern>
         <url-pattern>*.XSP</url-pattern>
     </servlet-mapping>
  3. jetty-web.xml
    在 WEB-INF 目錄中,增加一個 jetty-web.xml 檔案,讓 WebAppContext 支援 Http Session。

     <?xml version="1.0"  encoding="ISO-8859-1"?>
     <!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure.dtd">
    
     <Configure class="org.eclipse.jetty.webapp.WebAppContext">
         <Get name="sessionHandler">
             <New class="org.eclipse.jetty.server.session.SessionHandler">
                 <Arg>
                     <New class="org.eclipse.jetty.server.session.HashSessionManager">
                         <Set name="storeDirectory">session</Set>
                     </New>
                 </Arg>
             </New>
         </Get>
     </Configure>
  4. 調整 filter-mapping
    測試過程中發現,如果將 filter-mapping 的 url-pattern 設定為 .jsp,瀏覽 jsp 網頁會一直無法先進入 filter 進行前置處理,因此要把 url-pattern 改成 /

     <filter-mapping>
         <filter-name>CookieLoginFilter</filter-name>
         <url-pattern>/*</url-pattern>
     </filter-mapping>
  5. ServletContextListener
    我們都是在 ServletContextListener 裡面處理 webapp 啟動時,必須要一併啟動的一些服務,例如 DB logback, spring, connection pool (dbcp) 還有一些 scheduler,很幸運的在 Jetty 都可以直接支援,不需要再修改程式。

  6. Dynamic Web Project 的路徑
    如果一開始是用 Eclipse 的 Dynamic Web Project 初始化專案,java 程式編譯後將會放在 project 的 /build/classes 目錄中。

    但如果程式中有使用到 ServletContext 的getResourceAsStream 的功能,再加上我們把 Jetty 的 WebAppContext 的 setResourceBase 指定到 WebContent 目錄,這時就會發生找不到檔案的問題。

    我們必須調整 project 設定,在 Java Build Path 中將 Default output folder 由 project/build/classes 改為 project/WebContent/WEB-INF/classes 。

就這樣修改到這邊,我們的 Embedded Jetty 已經可以支援一個 web application 了。

Multiple Contexts

通常我們會希望除了能支援一個 web context 之外,server 的 root context 也要能使用,因此我們參考 ManyContexts.java 的作法。

利用 ContextHandlerCollection 將多個 context handler 集合起來,然後再設定給 server 的 handler。

WebAppContext context = new WebAppContext();
context.setDescriptor("../WebContent/WEB-INF/web.xml");
context.setResourceBase("../WebContent");
context.setContextPath("/examples");
context.setParentLoaderPriority(true);

ContextHandler rootcontext = new ContextHandler("/");
rootcontext.setContextPath("/");
rootcontext.setHandler(new HelloHandler());

ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(new Handler[] { rootcontext, context });

server.setHandler(contexts);

Jetty - Part 1/2

隨著用了越來越久的 Apache Tomcat,Tomcat 似乎也跟隨著 JDK 的腳步,越來越龐大,這時候,不妨考慮試試看,把原本用在 Tomcat 的 webapp,改成使用 Jetty 來運作。

Jetty目前已經是 9.2 版,除了能像 Tomcat 一樣 stand alone 運作,再將 application 放到 deployment 的 webapps 目錄中,最重要的是能像一般執行 Java Application 一樣,將 webapp server 以 Embedded 的方式啟動。

關於 Embedded Jetty 的文章並不多,有可能是使用的人還不多,另外有個問題,網路上找到的介紹文章,也會因為 Jetty 版本的更新,舊的寫法可能就沒有用了,這對 open source project 來說是個致命傷,使用 Tomcat 基本上就比較不會有這樣的問題,網路上隨手搜尋到的資料,通常都是正確且可以使用的。

因此學習的過程,就是參考官方網頁的 Embedding Jetty Tutorial,以及使用最新版的 sample code,沒有別的方法。

取得 Jetty

我們在 Jetty 官方網頁 下載 頁面中,只能下載到完整的 Jetty (ex: jetty-distribution-9.2.2.v20140723.zip),但是因為我們並不是要 stand alone 執行 Jetty,而是要用 Embedded 的方式,因此我們需要一個更方便使用的 Jetty jar file,把所有跟 Jetty 有關的程式都集中到一個 jar 檔裡面,我們可以到 Maven jetty-all 頁面中,下載這樣的 jar 檔 (ex: jetty-all-9.2.2.v20140723.jar)。

Simplest Server

最簡單的 Server 就是把網頁服務的 Port 啟動,其他什麼事都不做。以下的程式會啟動 TCP Port 8080 作為 HTTP 的 service port。

public class SimplestServer
{
    public static void main(String[] args) throws Exception
    {
        Server server = new Server(8080);
        server.start();
        server.join();
    }
}

接下來,我們要進一步撰寫提供網頁服務的處理器 Handler,以下的 HelloHandler 很單純地就是產生一個 HTML 網頁資料,並寫上 HelloWorld

import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;

public class HelloHandler extends AbstractHandler {
    public void handle(String target, Request baseRequest,
            HttpServletRequest request, HttpServletResponse response)
            throws IOException, ServletException {
        response.setContentType("text/html;charset=utf-8");
        response.setStatus(HttpServletResponse.SC_OK);
        baseRequest.setHandled(true);
        response.getWriter().println("<h1>Hello World</h1>");
    }
}

為了讓 HelloHandler 產生作用,我們必須調整剛剛的 Server 程式,以 setHandler 將 HelloHandler 指定給 server 使用。

public class SimplestServer {
    public static void main(String[] args) throws Exception {
        Server server = new Server(8080);
        server.setHandler(new HelloHandler());

        server.start();
        server.join();
    }
}

這時候如果啟動 SimplestServer,就可以用瀏覽器連結 http://localhost:8080/ 看到網頁。

ManyConnectors

一個基本的 web application server 必須要能支援 HTTP 與 HTTPS 兩種協定,因此我們接下來參考 ManyConnectors.java ,將 HTTP 與 HTTPS 環境建立起來。

首先,我們用 JDK 提供的 keytool 產生一個新的 ssl key。

keytool -genkey -dname "cn=maxkitcn, ou=maxkitou, o=maxkit, c=TW" -keyalg RSA -alias server -keypass yourkeypassword -keystore d:\temp\maxkitkeystore -storepass yourstorepassword -validity 36500

KeyStore javadoc 裡面提到,KeyStore Type 有兩種:JKS 或是 PKCS12,我們剛用 keytool 建立的是 PKCS12,這跟 Jetty 的預設 type 是不同的,因此 ManyConnector 裡面啟用 ssl 的方式,必須要做些微的調整。

另外為了增加程式的彈性,我們把 Port 跟 keystore 的一些設定值放到獨立的 properties 檔案中。

首先是設定檔 server.properties

jetty.port=8080
jetty.ssl.port=8443
jetty.ssl.keystore=/maxkitkeystore
jetty.ssl.keystore.storepass=yourkeypassword

另外也將 maxkitkeystore 檔案放在 src 目錄中。

ServerProperties.java 用來讀取 server.properties 檔案的設定。

public class ServerProperties extends Properties {
    private static final String PROPERTIES = "/server.properties";

    private Properties properties = new Properties();

    public Map<Object, Object> getProperties() {
        InputStream stream = this.getClass().getResourceAsStream(PROPERTIES);
        try {
            properties.load(stream);
        }
        catch (IOException ex) {
            ex.printStackTrace();
        }
        return Collections.unmodifiableMap(properties);
    }
}

ManyConnectors.java 設定 http 與 https,並沿用剛剛的 HelloHandler 作為提供網頁資料的處理器。這邊要注意,新舊版 ManyConnectors 範例程式的寫法是完全不一樣的,要注意看一下找到的資料是支援 Jetty 那一個版本。

import java.security.KeyStore;
import java.util.Map;

import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.webapp.WebAppContext;

public class ManyConnectors {
    private static final Map<?, ?> properties = new ServerProperties()
            .getProperties();

    private static final String DEFAULT_PORT = String.valueOf(properties
            .get("jetty.port"));
    private static final String DEFAULT_SSL_PORT = String.valueOf(properties
            .get("jetty.ssl.port"));
    private static final String SSL_KEYSTORE_FILE = String.valueOf(properties
            .get("jetty.ssl.keystore"));
    private static final String SSL_KEYSTORE_PASS = String.valueOf(properties
            .get("jetty.ssl.keystore.storepass"));

    private static Server server;

    public void startServer() throws Exception {
        startServer(DEFAULT_PORT, DEFAULT_SSL_PORT);
    }

    public void startServer(String port, String sslPort) throws Exception {
        server = new Server();
        logger.info("Starting Server...");
        // HttpConfiguration 告訴 server 要支援 https
        HttpConfiguration http_config = new HttpConfiguration();
        http_config.setSecureScheme("https");
        http_config.setSecurePort((sslPort.isEmpty() ? Integer
                .valueOf(DEFAULT_SSL_PORT) : Integer.valueOf(sslPort)));
        http_config.setOutputBufferSize(32768);

        // HTTP connector
        ServerConnector http = new ServerConnector(server,
                new HttpConnectionFactory(http_config));
        http.setPort((port.isEmpty() ? Integer.valueOf(DEFAULT_PORT) : Integer
                .valueOf(port)));
        http.setIdleTimeout(30000);

        // SSL Context Factory for HTTPS
        SslContextFactory sslContextFactory = new SslContextFactory();
        sslContextFactory.setKeyStorePassword(SSL_KEYSTORE_PASS);

        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        keyStore.load(this.getClass().getResourceAsStream(SSL_KEYSTORE_FILE),
                SSL_KEYSTORE_PASS.toCharArray());
        sslContextFactory.setKeyStore(keyStore);

        // HTTPS Configuration
        HttpConfiguration https_config = new HttpConfiguration(http_config);
        https_config.addCustomizer(new SecureRequestCustomizer());

        // HTTPS connector
        ServerConnector https = new ServerConnector(server,
                new SslConnectionFactory(sslContextFactory, "http/1.1"),
                new HttpConnectionFactory(https_config));
        https.setPort((sslPort.isEmpty() ? Integer.valueOf(DEFAULT_SSL_PORT)
                : Integer.valueOf(sslPort)));
        https.setIdleTimeout(500000);

        Connector[] connectors = new Connector[] { http, https };

        server.setConnectors(connectors);

        server.setHandler(new HelloHandler());

        server.start();
        server.join();

        logger.info("Started Server");
    }

    public void stopServer() throws Exception {
        logger.info("Stopping Server...");
        server.stop();
        logger.info("Server Stopped with stopServer() method.");
    }
}

最後寫一個啟動的 Main 程式 SimplestServer2.java

public class SimplestServer2 {
    public static void main(String[] args) throws Exception {
        ManyConnectors server = new ManyConnectors();
        server.startServer();
    }
}

啟動 server 後,就可以用 http://localhost:8080/ 或是 https://localhost:8443/ 看到 HelloWorld 網頁。