2014/08/25

opensips 簡介 1/2

opensips 在SIP 環境的定位是提供 registrar, proxy, redirect, location services。最重要的是,可高速處理 sip headers 並提供封包的 routing 功能。其他的部份,例如 NAT traversal, IMS, load balancing 等等功能則是由 3rd party modules 提供。

registrar 可管理某個 domain 下的 SIP UA,讓這些 UA 註冊並受該 server 管理。

proxy server的用途是,接收 UA 的 request,當受話端不在同一個 domain 時,就把 request 轉送到另一個 SIP proxy server。

redirect server 接收 request 之後,直接以 302 Moved Temporarily 回應給 UA。

opensips history

在 2004 年,德國 FhG Fokus research institute 以 GPL 發布了 SER,其後 OpenSERs fork 該專案,開發 SER 的單位輾轉成立了 Voice System 公司,繼續維護 OpenSER。

OpenSERs 在 2008 年又經歷了一次分裂,成了兩個專案:Kamailio 與 OpenSIPs。opensips 核心非常小,但以上百個 modules 來延伸其功能,目前可以用來提供 SIP firewals, session border controllers(SBCs), load balancers 等等功能。

以下列出重要的 module 與功能

  1. dispatcher, path: 這兩個模組用來提供 load balancing 的功能
  2. mediaproxy, rtpproxy, nathelper: 提供 NAT traversal 的功能
  3. presence: 處理 presense server
  4. IMC, XMPP: instant messaging

安裝 opensip 的程序,可參閱這篇OpenSIPS and Control Panel Install Guide,雖然版本有點差異,安裝過程可套用在目前的版本 opensips 1.11.1, opensips-cp 5.0 上。

opensips.cfg

該設定檔可分成三大區塊: Global Configuration, Modules, Routing Logic。

# Global Configurations
debug=3
log_stderror=no
log_facility=LOG_LOCAL0

fork=yes
children=4

# Modules Section
mpath="/usr/lib64/opensips/modules/"

### Module Loading: 用 loadmodule 載入 modules
loadmodule "signaling.so"
loadmodule "sl.so"
loadmodule "tm.so"

### Module Specific Parameters
modparam("tm", "fr_timer", 5)
modparam("tm", "fr_inv_timer", 30)
modparam("tm", "restart_fr_on_each_reply", 0)
modparam("tm", "onreply_avp_mode", 1)

# Routing Logic
### Main routing block: SIP request 處理的起點,每個 request 都會經過這裡

### Secondary routing blocks: 可定義 route(),用起來類似 subroutines

### Reply routing blocks: 處理 reply messages, 包含 provisional, successfule final replies, negative final replies,通常是 200 OK

### Failure routes blocks: 處理錯誤狀況, 例如 busy or timeout

### Branch route blocks: 在把 request forward 出去之前,每一條 branch 都可執行一段 logic 

### Local routing blocks: 當 opensips 內部產生 request (通常只作為 UAS),就使用 Transaction Module(TM) 

### Error routing block: 當 sip request parsing error 發生時,就執行這個

Sessions, dialogs, and transactions

在處理 opensips processing 之前,我們要先知道這三個名詞的意思

  1. SIP transaction: A SIP request,包含了 resends 與 direct responses,也就是 REGISTER 與 200 OK

  2. SIP dialog: 兩個 SIP entities 在某個時間點,存在的相互關係,換句話說,就是在兩個 UACs 之間,從 INVITE 到 BYE 之間,建立的一個對話

  3. SIP Session: 在兩個 SIP entities 之間傳送的 media flow,可能是 audio/video/text

SIP proxy: stateless or stateful

stateless 的作法,proxy server 會在 forward 訊息之後,把所有 message 的 internal information 丟棄。單純地只會把資料轉送到 request 裡面提供的下一個節點。

如果需要提供 billing, call forwarding, voicemail 功能,就需要使用 stateful mode,每個 transaction 都需要存在記憶體中,並處理 failure, response, retransmissions 等等狀況。這是 Transaction Module(TM) module 提供的功能。

要注意,stateful 的狀態維護是針對 transaction 處理而不是 dialog,因此從 INVITE 到 200 OK response 的處理是 stateful,而不是從 INVITE 到 BYE request,這個是 dialog。

stateful operation 的處理過程

Script and Routing Basics

一般安裝好 opensips 之後,沒有調整什麼特別的設定的時候,設定檔會放在 /usr/etc/opensips 這個目錄中,我們可以自己做個 link,改用 /etc/opensips 這個目錄。

根據上面 opensips.cfg 那個段落的內容,這個設定檔裡面可分為 (1) Global Parameters (2) Load Modules (3) Module Parameters (4) Routing Script 這四個部份。

將 opensips 設定完成後,最基本可以達成以下任務。

  1. 建立一個 SIP Server
  2. 可讓內部網路的 UACs 連接
  3. 在不同 UACs 之間互相撥打電話
  4. 話機不需要驗證,不使用資料庫
  5. 不支援 PSTN,只能在話機之間通話

Global Parameters

Listen Interfaces

不設定 listen 時,系統將會自動綁定所有的網路界面。

port 參數只是針對 SIP Server 設定 UDP service port

listen=udp:192.168.152.148:5060
listen=tcp:192.168.152.148:5061
listen=tls:192.168.152.148:5062
port=5060
Logging

在正式環境,就設定 debug=3,在 debug 環境,就設定 debug=9。

可以用這個 MI command 調整 log level

opensipsctl fifo debug 1

在 opensips.cfg 裡面的 log 相關設定範例如下

debug=3
log_stderror=no
log_facility=LOG_LOCAL0

Number of processes

opensips process 可運作在 foreground 或background,可設定 fork=yes,指定運作在 background。如果設定運作在 foreground,opensips就無法 listen 多個網路界面,tcp與tls 也會自動 disabled。

children 是設定每一個網路界面有幾個 processes 提供服務。

fork=yes
children=4
tcp_children=6
disable_tcp=no
disable_tls=no
Daemon options
gid/group=sip # unix group
uid/user=sip # unix user
wdir="/" # working directory
chroot="/usr/local/opensips-1.6"
SIP identity

設定 SIP request/response 裡面的標準參數。

server_header="Server: My openSIpS 
#default is "openSIpS (<version> (<arch>/<os>))"
server_signature = yes
user_agent_header="User-Agent: My openSIpS
其他

alias 很重要,這是設定 SIP server 支援的 domain name。

alias="mydomain.sip" # to set alias hostnames for the server
auto_aliases=no # discover aliases via reversed DNS
disable_dns_failover = yes
sip_warning=yes #add a debugging header in replies
global parameters 區塊
####### Global parameters #########
debug=3      # set the debug leve to 3
log_stderror=no    # log to syslog
log_facility=LoG_LoCAL0  # Log to facility LoG_LoCAL0
fork=yes      # Run as a daemon
children=4     # open 4 child process for 
each UDpaddress

/* uncomment the next line to disable TCp(default on) */
#disable_tcp=yes

/* uncomment the next line to enable the auto temporary blacklisting of not available destinations (default disabled) */
#disable_dns_blacklist=no

/* uncomment the next line to enable Ipv6 lookup after Ipv4 dns lookup failures (default disabled) */
#dns_try_ipv6=yes

/* uncomment the next line to disable the auto discovery of local aliases based on revers DNS on Ips (default on) */
#auto_aliases=no

/* uncomment the following lines to enable TLS support (default off) */
#disable_tls = no
#listen = tls:your_Ip:5061
#tls_verify_server = 1
#tls_verify_client = 1
#tls_require_client_certificate = 0
#tls_method = TLSv1
#tls_certificate = "//etc/opensips/tls/user/user-cert.pem"
#tls_private_key = "//etc/opensips/tls/user/user-privkey.pem"
#tls_ca_list = "//etc/opensips/tls/user/user-calist.pem"
port=5060 # Run on port 5060

/* uncomment and configure the following line if you want opensips to bind on a specific interface/port/proto (default bind on all  available) */
#listen=udp:192.168.1.2:5060

Modules

指定 modules 的路徑

#set module path
mpath="/usr/lib64/opensips/modules/"

設定 FIFO file 的路徑,用來處理與暫存外部指令。

#### FIFO Management Interface
loadmodule "mi_fifo.so"
modparam("mi_fifo", "fifo_name", "/tmp/opensips_fifo")
modparam("mi_fifo", "fifo_mode", 0666)

Scripting Basics

opensips 使用類似 C 語言的 scripting language,這可以用來 routing SIP requests 與處理 SIP replies。

除了 core functions/values 之外,其他 functions 都是由 modules 提供。

Core functions
  1. forward();
    Route the request "stateless" (based on R-URI)
  2. drop();
    停止執行 script
  3. exit();
    立即終止 script processing

其他 core functions:
seturi()
setflag()
isflagset()
strip()
prefix()
rewritehostport()

Core values

script 裡面預先定義了一些 values 可以使用

  1. INET/INET6
    IPv4 or IPv6
  2. TCP/TLS/UDP
    protocol
  3. myself
    a reference to the list of local IP addresses, hostnames, and aliases
Core keywords

可用來識別 SIP message裡面的特殊欄位的值

  1. af
    Address family(INET/INET6)
  2. proto
    Protocol(TCP/TLS/UDP)
  3. dst_ip
    在 SIP message 中收到的 local interface 的 IP
  4. method
    SIP method of this message
  5. Status
    用在 onreply_route 時,這個變數會參考到 reply 的 status code
  6. retcode
    上一個 function 處理的結果
  7. uri
    request URI
  8. from_uri
    FROM header 裡面的 URI
  9. to_uri
    TO header 裡面的 URI

使用範例

if(af==INET6) {
log("Message received over Ipv6 link\n");
};
if(is_method("INVITE") && from_uri=~".*@opensips.org")
{
log("the caller is from opensips.org\n");
};
Pseudo-variables

就是 system variables,例如 SIP Messages 可取得 headers, R-URI, source IP。從 opensips 可取得 time, process ID。

Pseudo-variables 是以 $ 開頭,如果 script 裡面要使用該符號,就要加上 esacape $$,完整的 Pseudo-variables 可在這個網頁取得。

Script variables

可在 config script 裡面使用的變數,這些變數只存在於 script 的執行期,執行結束後,就會被刪除。該變數可儲存數字或字串。

$var(name)

範例

$var(b)=1;
$var(b)="1";
$var(b)="$fu"+"$tu";
$var(b)=1+2;

可使用以下的 operations

+
-
/
*
%: modulo division
|: Bitwise OR
&: Bitwise AND
^: Bitwise XOR
~: Bitwise NOT

也可使用下列的字串處理方法

{s.len}
{s.int}
{s.substr,offset,length}
{s.select,index,separator}
{uri.user}
{uri.host}
{uri.params}
{param.value,name} - returns the value of parameter "name"

範例

"a=1;b=2;c=3"{param.value,c} = "3"
Attribute-Value Pair (AVP)

這個變數是在 statefule mode,附屬在 SIP message 的變數,所以 AVP 是 transaction-persistent variables。AVP 會在 transaction 開始時被配置,而在 transaction 結束時被釋放。

AVP 的格式:
$avp(id[N])

id
(1) si:name
    AVP identifier name, s 與 i 是字串或數字
(2) name
    AVP alias name, 字串或數字

範例:
$avp(i:700)
$avp(s:blacklist)

AVPs 相關的函數

  1. avp_db_load: 由 DB 將 AVPs 載入至記憶體
  2. avp_db_store: 將 AVPs 存到 DB
  3. avp_db_delete
  4. avp_db_query: 進行 DB 查詢,並將結果存至 AVP
  5. avp_delete: 刪除記憶體中的 AVP
  6. avp_pushto: 將 AVP 放入 SIP message 中
  7. avp_check: 用 operator(equal, greater than, and a value) 取得值,例如 avp_check("i:500", "lt/i:501");
  8. avp_copy: 複製 avp
  9. avp_printf: 格式化 AVP
  10. avp_subst: 在 AVP 內尋找並取代某個值
  11. avp_op: 對 AVPs 做 math operations
  12. is_avp_set: check if this AVP's name is set
  13. avp_print: 列印記憶體中的 AVP

從 DB 中取得 user_preference table 變成 AVPs。

範例:在 call forward 時,可取得該 user 的 user_preference 資料。

user_preference table structure:

  1. id: auto-increment field
  2. uuid: unique user id
  3. username: username
  4. domain: domain
  5. attribute: AVP name
  6. type: 0–Avp str|Val Str,1–Avp str|Val Int,2–Avp int|Val Str,3-Avp int|Val int
  7. value: AVP value
  8. last modified: 上次修改的時間

Flags

用來 trigger some processes 例如 accouting, dialog control, NAT handling。有三種 flags: message, script, branch flags。

Type Persistence Function Purpose
Message flag transaction setflag(flag_idx) 在 transaction level 啟動 some functions
Branch flag Branch setbflag(flag_idx) 在 branch level 啟動 some functions
Script flag Top-level Routing setsflag(flag_idx) 儲存其他 flags

pseudo-variables: $mf (message flags), $bf (branch flags), $sf (script flags)

module GFLAGS

只能用在 external flags,這是用在 MI commands,可在 MI interface 的 external program 中,使用 set_gflag(), is_gflag(), reset_gflag() 這些 functions。

Statements

if-else
if ( t_check_trans() ) {
    t_relay();
    exit;
} else {
    exit;
}
Switch
switch($retcode)
{
    case -1:
        log("process INVITE requests here\n");
        break;
    case 1:
        log("process REGISTER requests here\n");
        break;
    case 2:
    case 3:
        log("process SUBSCRIBE and NoTIFY requests here\n");
        break;
    default:
        log("process other requests here\n");
}
Subroutes
route[1]{
    if(is_method("INVITE")) 
    {
        return(-1);
    };
    if(is_method("REGISTER"))
        return(1);
    }
    if(is_method("SUBSCRIBE"))
        return(2);
    }
    if(is_method("NoTIFY"))
        return(3);
    }
    return(-2);
}

2014/08/18

Using Scala in Eclipse Dynamic Web Project

既然 Scala 跟 Java 是遠房親戚,接下來我們想的是,如何在 Eclipse 開發網頁專案時,可以直接套用 Scala Library,並直接撰寫 Scala Code。

Step 1: Create a Dynamic Web Project

按照既有的步驟,建立一個 Dynamic Web Project。

Step 2: Add Scala Nature

在 project 上點右鍵,會出現一個選單,選擇 Configure,然後再點選 Add Scala Nature。

我們會看到 project 自動增加了 Scala Library [2.11.2],然後我們就可以寫 Scala 測試程式了

Step 3: ScalaFilter.scala

以 Scala 撰寫一個 filter: ScalaFilter.scala class。

package test

import javax.servlet.FilterConfig
import javax.servlet.ServletResponse
import javax.servlet.FilterChain
import javax.servlet.ServletRequest
import javax.servlet.ServletException
import javax.servlet.Filter
import java.util.Date

class ScalaFilter extends Object with Filter {

  @throws(classOf[ServletException])
  def init(filterConfig: FilterConfig): Unit = {
    println("ScalaFilter: init()");
  }

  @throws(classOf[_root_.java.io.IOException])
  @throws(classOf[ServletException])
  def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain): Unit = {
    println("ScalaFilter: doFilter()");

    response.getWriter().write("> The time now is " + new Date);
  }

  def destroy(): Unit = {
    println("ScalaFilter: destroy()");
  }

}

Step 4: web.xml

在 web.xml 裡面把 Filter 設定好

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
      version="3.0">
    <display-name>test</display-name>

    <filter>
        <filter-name>ScalaFilter</filter-name>
        <filter-class>test.ScalaFilter</filter-class>
    </filter>

    <filter-mapping>
        <filter-name>ScalaFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
</web-app>

Step 5: Testing

直接將 project Run on Server 測試看看,結果發生了ClassNotFound 的錯誤,很明顯就是 Deploy 的時候,沒有把 Scala Library 放到 lib 目錄裡面。

解決方式很簡單,到 scala SDK 的 lib 目錄,把以下四個 jar 複製到 project 的 WebContent/WEB-INF/lib 目錄中。

scala-actors-2.11.0.jar
scala-library.jar
scala-reflect.jar
scala-swing_2.11-1.0.1.jar

再一次將 project Run on Server 測試看看,就成功了。

2014/08/11

Java 8 Default Methods

Java 8為介面(Interface)引進了一個新的特性,名為Default Method,講白一點其實就是可以在介面內寫一些已經實作的方法。也就是說,在Java 8介面不再只是開規格給類別實作而已,他現在還可以提供一些已經實作的方法給人使用。

如何宣告與使用?

Default Method的宣告方法很簡單,只要加上default關鍵字即可,需要注意的是,在介面內的方法必須宣告為public的。可以參考以下程式:

public interface MyInterfaceI {
    default public void testDefaultMethod(int i) {
        System.out.println("default method test2()");
        if(i > 3) {
            System.out.println(">3");
        } else {
            System.out.println("<=3");
        }
    }
}

要使用該Default Method時,只要透過實現該介面的類別,呼叫該Default Method即可,如下:

class MyInterfaceImpl implements MyInterfaceI {
}

public class MainClass {
    public static void main(String[] args) {
        MyInterfaceImpl impl = new MyInterfaceImpl();
        impl.testDefaultMethod(2); // output: <=3
    }
}

需要注意?

  • 繼承介面時,若父介面有Default Method,則子介面會繼承他。如底下的範例:

      public class InterfaceImpl implements InterfaceSon{
          public static void main(String[] args) {
              InterfaceSon son = new InterfaceImpl();
              son.test(); // output => InterfaceParent
          }
      }
    
      interface InterfaceParent {
          public default void test() {
              System.out.println("InterfaceParent");
          }
      }
    
      interface InterfaceSon extends InterfaceParent{
      }
    
  • Default Method是可以被覆寫(override)的,因此繼承之後你可以將之覆寫掉。如底下的範例,將繼承的介面Default Method覆寫掉,則程式在執行時就會跑覆寫的那個版本:

      public class InterfaceImpl implements InterfaceSon{
          public static void main(String[] args) {
              InterfaceSon son = new InterfaceImpl();
              son.test(); // output => InterfaceSon
          }
      }
    
      interface InterfaceParent {
          public default void test() {
              System.out.println("InterfaceParent");
          }
      }
    
      interface InterfaceSon extends InterfaceParent{
          public default void test() {
              System.out.println("InterfaceSon");
          }
      }
    
  • 繼承之後,若不想要使用原本父類別的Default Method,則可宣告成抽象方法,讓實作類別實作該方法。可以參考底下範例:

      public class InterfaceImpl implements InterfaceSon{
          public static void main(String[] args) {
              InterfaceSon son = new InterfaceImpl();
              son.test(); // output => InterfaceImpl
          }
    
          @Override
          public void test() {
              System.out.println("InterfaceImpl");
          }
      }
    
      interface InterfaceParent {
          public default void test() {
              System.out.println("InterfaceParent");
          }
      }
    
      interface InterfaceSon extends InterfaceParent{
          public void test();
      }
    
  • 如果類別要實作的介面,剛好有兩個相同名稱,相同輸入參數的Default Method,則編譯器會出錯。可以參考底下程式:

      public class InterfaceImpl implements Interface1, Interface2 {
          // 編譯器會報以下錯誤
          // Duplicate default methods named test with the 
          // parameters () and () are inherited from the types 
          // Interface2 and Interface1
      }
    
      interface Interface1 {
          public default void test() {
              System.out.println("Interface1");
          }
      }
    
      interface Interface2 {
          public default void test() {
              System.out.println("Interface2");
          }
      }
    
  • 上面情況發生時,可以用覆寫來把Default Method蓋掉,在覆寫時還可以用super關鍵字來指定要用哪個實作的Default Method,可以參考底下程式:

      public class InterfaceImpl implements Interface1, Interface2{
          public void test() {
              Interface1.super.test(); // output => Interface1
          }
      }
    
      interface Interface1 {
          public default void test() {
              System.out.println("Interface1");
          }
      }
    
      interface Interface2 {
          public default void test() {
              System.out.println("Interface2");
          }
      }
    
  • Default Method在使用時是沒有狀態的,所謂的狀態指的是在一般類別與抽象(abstract)類別裡,能在類別的欄位(field)存放資料,如存放布林值、整數、字串等等的資料,來代表此類別的狀態,而Default Method是存在於介面內的,介面是不允許存放資料欄位的,因此它唯一能操作的資料,只有透過參數傳進來的資料而已。

Static Methods

此外除了Default Method,現在在Java 8內,你也可以寫Static Method了,他的宣告方法就如同在一般類別內宣告static method一樣,如下:

public interface MyInterfaceI {
    public static void invokeStaticMethod() {
        System.out.println("It is static method!");
    }
}

使用的話如下:

public class MainClass {
    public static void main(String[] args) {
        MyInterfaceI.invokeStaticMethod(); // output: It is static method!
    }
}

Interface Default Methods in Java 8

The Java Tutorials - Default Methods

Hello Scala

因應函數式語言的演化趨勢,Scala 選擇不再繼續用拙劣的 Java 語法加入 Functional Language 的特性,而是用另一個方式強化 Java 語言,以 Scala 撰寫的程式,可以直接在 JVM 裡面運作,這代表 Scala 可以直接使用既有廣大的 Java 函式庫,wiki 裡面第一句話,就明確地定位了 Scala: Scala is an object-functional programming and scripting language for general software applications. 對於 Java Programmer 來說,可以先閱讀這一篇文章,了解 Scala:Scala vs Java:兩者間的差異與相似處

安裝 scala 開發環境,首先是需要把 JDK 跟 scala SDK 裝好,第二步是安裝 scala IDE,第三步撰寫 Hello World 程式,就等於是把 scala 環境準備好了。

Step 1: SDK

到網站 http://www.oracle.com/technetwork/java/javase/downloads/index.html 下載並安裝 JDK,目前我們還是習慣用 JDK 7。

JDK 安裝完成後,記得要自己設定環境變數 JAVA_HOME 到安裝的目錄,並增加 %JAVA_HOME%\bin 到 PATH 環境變數中。

到網站 http://www.scala-lang.org/download/ 下載並安裝 Scala,目前的版本為 2.11.2,安裝完成後,程式會自動把 scala 的 bin 目錄設定到 PATH 裡面。

Step 2: IDE

因為我們已經裝了 Eclipse,根據網站 http://scala-ide.org/download/current.html 的說明,我們使用 plugin 的方式安裝IDE。

把 2.11.2 版的 IDE update site http://download.scala-ide.org/sdk/helium/e38/scala211/stable/site 增加到 Eclipse 軟體列表中就可以進行安裝,除了 Source 不安裝之外,我點選了其他四個項目。

Step 3: Hello World

點擊 File -> New -> New Scala Project 可新增一個 Hello World scala project。

查看 project library 的地方會發現,project 把 JDK 跟 scala SDK 的 libraries 都加進來了。

新增一個 HelloWorld scala object

package test

object HelloWorld {
  def main(args: Array[String]): Unit = {
    println("Hello World")
  }
}

在 scala object 上點右鍵 -> Run As Scala Application,就可以在 console 上看到列印出來的字串 Hello World。

Step 4: Command Line

如果不借助 IDE 的協助,我們可以直接在 Command Line 環境進行編譯與執行。

先打開 windows command prompt 切換到 project 的 src 目錄,用以下指令編譯 object

scalac test\HelloWorld.scala

會在 test 目錄中看到 HelloWorld.class 與 HelloWorld$.class 兩個 class。

再用以下指令執行

scala -classpath . test.HelloWorld

2014/08/10

MQTT with RabbitMQ

RabbitMQ的底層是實作AMQP,如果想透過它來跑MQTT,必須先安裝RabbitMQ MQTT Adapter,只要在安裝完RabbitMQ之後,執行以下指令即安裝完成:
rabbitmq-plugins enable rabbitmq_mqtt
RabbitMQ的MQTT Adapter是基於RabbitMQ本身的Exchange和Queue上來實現的。預設RabbitMQ會把收到的MQTT訊息送到預設的topic exchange,也就是"amp.topic"這個exchange。當Client執行訂閱(subscribe)時,RabbitMQ會先為此Client建立一條Queue,可以參考下圖:

其中名稱mqtt-subscription-XXXXqos1為RabbitMQ幫你建立的Queue,而XXXX為你傳入的clientId。
接著他會為此條Queue和amp.topic建立起routing key,routing key就是你Subscribe時傳過來的topic name,只是他會將 "/"換成".",比如說你Subscribe時是寫"rec/mayer",則他會把routing key換成"rec.mayer"。可以參考下圖:

Client發送對某個MQTT的Topic 發佈(publish)訊息時,首先RabbitMQ會把此訊息送到amq.topic內,接著amq.topic將此訊息要送到的topic name取出來,把它當成routing key送給相對應的queue。這樣一來,訊息就能正確的發送給訂閱者。

修改MQTT預設exchange

預設exchange可以透過config檔案修改,首先先到/etc/rabbitmq/目錄底下,建立rabbitmq.config檔案,然後貼上以下設定檔,其中rabbitmq_mqtt[]裡面的exchange設定,就是預設要使用哪個exchange,不過在使用時必須先把該exchange建立起來。
[{rabbit,        [{tcp_listeners,    [5672]}]},
 {rabbitmq_mqtt, [{default_user,     <<"guest">>},
                  {default_pass,     <<"guest">>},
                  {allow_anonymous,  true},
                  {vhost,            <<"/">>},
                  {exchange,         <<"foo.topic">>},
                  {subscription_ttl, 1800000},
                  {prefetch,         10},
                  {ssl_listeners,    []},
                  %% Default MQTT with TLS port is 8883
                  %% {ssl_listeners,    [8883]}
                  {tcp_listeners,    [1883]},
                  {tcp_listen_options, [binary,
                                        {packet,    raw},
                                        {reuseaddr, true},
                                        {backlog,   128},
                                        {nodelay,   true}]}]}
].

Rabbit MQTT Adapter遺珠

使用RabbitMQ來當MQTT broker,要注意他只實現MQTT 3.1的一些特性:
  • QoS0 and QoS1 publish & consume
  • Last Will and Testament (LWT)
  • SSL
  • Session stickiness
由於AMQP 0-9-1沒有支援QoS 2的特性,因此這邊沒有實作,所以要注意的是如果你想用MQTT的QoS 2來實作服務時,你就不能使用RabbitMQ。關於QoS 2的特性,可以參考MQTT(二)Message Type and Flows - Publish Flows這篇的說明。
另外他也沒有實現Retain Message,當遇到時,RabbitMQ會將他默默的忽略掉。關於Retain Message可以參考 MQTT(四)PUBLISH Message之前我貼的文章來了解其作用。

參考

RabbitMQ-MQTT
RabbitMQ Blog-MQTTAdapter

2014/08/04

Jetty - Part 2/2

接續上一次的說明,我們已經讓 Embedded Jetty Server 能夠支援 http 與 https 了,接下來就是要整合既有的 WebContent,使用 web.xml ,支援 JSP, Servlet 與 Filter 等等。最後,我們再處理要支援多個 WebAppContext 的問題。

WebAppContext

先前在提供網頁服務時,很單純地只有用 Handler 實作,但一個網站並不會這麼單純,一定包含了 servlet, jsp, event listener, filter, html, css, js, images 等等這些東西,換句話說,以往在 Eclipse 使用 Web Project 開發時,WebContent 裡面的資料都要能支援。

原本 setHandler 的地方,我們必須做個調整,改成使用 WebAppContext,然後把 context 指定給 server 的 handler。

// server.setHandler(new HelloHandler());

WebAppContext context = new WebAppContext();
context.setDescriptor("../WebContent/WEB-INF/web.xml");
context.setResourceBase("../WebContent");
context.setContextPath("/examples");
context.setParentLoaderPriority(true);

server.setHandler(context);

在這樣的方式調整下,我們就可以用 http://localhost:8080/examples/ 瀏覽網站。

更複雜的 web application

為了完整 web application 的功能,我們還必須做些調整。

  1. 支援 servlet 與 jsp 的 jar files
    在測試的過程中,我們發現只有 jetty-all 這個 jar 還是不夠的,必須額外增加一些 jar,我們可以在 jetty-distribution-9.2.2\lib 的目錄裡面找到。

     javax.el-3.0.0.jar
     javax.servlet.jsp.jstl-1.2.2.jar
     javax.servlet.jsp-2.3.2.jar
     javax.servlet.jsp-api-2.3.1.jar
     servlet-api-3.1.jar
  2. 讓 WebApplicationContext 支援 JSP
    我們必須在 web.xml 裡面增加以下這個 servlet 處理 JSP files

     <servlet id="jsp">
         <servlet-name>jsp</servlet-name>
         <servlet-class>org.apache.jasper.servlet.JspServlet</servlet-class>
         <init-param>
             <param-name>logVerbosityLevel</param-name>
             <param-value>DEBUG</param-value>
         </init-param>
         <init-param>
             <param-name>fork</param-name>
             <param-value>false</param-value>
         </init-param>
         <init-param>
             <param-name>keepgenerated</param-name>
             <param-value>true</param-value>
         </init-param>
         <load-on-startup>0</load-on-startup>
     </servlet>
    
     <servlet-mapping>
         <servlet-name>jsp</servlet-name>
         <url-pattern>*.jsp</url-pattern>
         <url-pattern>*.jspf</url-pattern>
         <url-pattern>*.jspx</url-pattern>
         <url-pattern>*.xsp</url-pattern>
         <url-pattern>*.JSP</url-pattern>
         <url-pattern>*.JSPF</url-pattern>
         <url-pattern>*.JSPX</url-pattern>
         <url-pattern>*.XSP</url-pattern>
     </servlet-mapping>
  3. jetty-web.xml
    在 WEB-INF 目錄中,增加一個 jetty-web.xml 檔案,讓 WebAppContext 支援 Http Session。

     <?xml version="1.0"  encoding="ISO-8859-1"?>
     <!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure.dtd">
    
     <Configure class="org.eclipse.jetty.webapp.WebAppContext">
         <Get name="sessionHandler">
             <New class="org.eclipse.jetty.server.session.SessionHandler">
                 <Arg>
                     <New class="org.eclipse.jetty.server.session.HashSessionManager">
                         <Set name="storeDirectory">session</Set>
                     </New>
                 </Arg>
             </New>
         </Get>
     </Configure>
  4. 調整 filter-mapping
    測試過程中發現,如果將 filter-mapping 的 url-pattern 設定為 .jsp,瀏覽 jsp 網頁會一直無法先進入 filter 進行前置處理,因此要把 url-pattern 改成 /

     <filter-mapping>
         <filter-name>CookieLoginFilter</filter-name>
         <url-pattern>/*</url-pattern>
     </filter-mapping>
  5. ServletContextListener
    我們都是在 ServletContextListener 裡面處理 webapp 啟動時,必須要一併啟動的一些服務,例如 DB logback, spring, connection pool (dbcp) 還有一些 scheduler,很幸運的在 Jetty 都可以直接支援,不需要再修改程式。

  6. Dynamic Web Project 的路徑
    如果一開始是用 Eclipse 的 Dynamic Web Project 初始化專案,java 程式編譯後將會放在 project 的 /build/classes 目錄中。

    但如果程式中有使用到 ServletContext 的getResourceAsStream 的功能,再加上我們把 Jetty 的 WebAppContext 的 setResourceBase 指定到 WebContent 目錄,這時就會發生找不到檔案的問題。

    我們必須調整 project 設定,在 Java Build Path 中將 Default output folder 由 project/build/classes 改為 project/WebContent/WEB-INF/classes 。

就這樣修改到這邊,我們的 Embedded Jetty 已經可以支援一個 web application 了。

Multiple Contexts

通常我們會希望除了能支援一個 web context 之外,server 的 root context 也要能使用,因此我們參考 ManyContexts.java 的作法。

利用 ContextHandlerCollection 將多個 context handler 集合起來,然後再設定給 server 的 handler。

WebAppContext context = new WebAppContext();
context.setDescriptor("../WebContent/WEB-INF/web.xml");
context.setResourceBase("../WebContent");
context.setContextPath("/examples");
context.setParentLoaderPriority(true);

ContextHandler rootcontext = new ContextHandler("/");
rootcontext.setContextPath("/");
rootcontext.setHandler(new HelloHandler());

ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(new Handler[] { rootcontext, context });

server.setHandler(contexts);

Jetty - Part 1/2

隨著用了越來越久的 Apache Tomcat,Tomcat 似乎也跟隨著 JDK 的腳步,越來越龐大,這時候,不妨考慮試試看,把原本用在 Tomcat 的 webapp,改成使用 Jetty 來運作。

Jetty目前已經是 9.2 版,除了能像 Tomcat 一樣 stand alone 運作,再將 application 放到 deployment 的 webapps 目錄中,最重要的是能像一般執行 Java Application 一樣,將 webapp server 以 Embedded 的方式啟動。

關於 Embedded Jetty 的文章並不多,有可能是使用的人還不多,另外有個問題,網路上找到的介紹文章,也會因為 Jetty 版本的更新,舊的寫法可能就沒有用了,這對 open source project 來說是個致命傷,使用 Tomcat 基本上就比較不會有這樣的問題,網路上隨手搜尋到的資料,通常都是正確且可以使用的。

因此學習的過程,就是參考官方網頁的 Embedding Jetty Tutorial,以及使用最新版的 sample code,沒有別的方法。

取得 Jetty

我們在 Jetty 官方網頁 下載 頁面中,只能下載到完整的 Jetty (ex: jetty-distribution-9.2.2.v20140723.zip),但是因為我們並不是要 stand alone 執行 Jetty,而是要用 Embedded 的方式,因此我們需要一個更方便使用的 Jetty jar file,把所有跟 Jetty 有關的程式都集中到一個 jar 檔裡面,我們可以到 Maven jetty-all 頁面中,下載這樣的 jar 檔 (ex: jetty-all-9.2.2.v20140723.jar)。

Simplest Server

最簡單的 Server 就是把網頁服務的 Port 啟動,其他什麼事都不做。以下的程式會啟動 TCP Port 8080 作為 HTTP 的 service port。

public class SimplestServer
{
    public static void main(String[] args) throws Exception
    {
        Server server = new Server(8080);
        server.start();
        server.join();
    }
}

接下來,我們要進一步撰寫提供網頁服務的處理器 Handler,以下的 HelloHandler 很單純地就是產生一個 HTML 網頁資料,並寫上 HelloWorld

import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;

public class HelloHandler extends AbstractHandler {
    public void handle(String target, Request baseRequest,
            HttpServletRequest request, HttpServletResponse response)
            throws IOException, ServletException {
        response.setContentType("text/html;charset=utf-8");
        response.setStatus(HttpServletResponse.SC_OK);
        baseRequest.setHandled(true);
        response.getWriter().println("<h1>Hello World</h1>");
    }
}

為了讓 HelloHandler 產生作用,我們必須調整剛剛的 Server 程式,以 setHandler 將 HelloHandler 指定給 server 使用。

public class SimplestServer {
    public static void main(String[] args) throws Exception {
        Server server = new Server(8080);
        server.setHandler(new HelloHandler());

        server.start();
        server.join();
    }
}

這時候如果啟動 SimplestServer,就可以用瀏覽器連結 http://localhost:8080/ 看到網頁。

ManyConnectors

一個基本的 web application server 必須要能支援 HTTP 與 HTTPS 兩種協定,因此我們接下來參考 ManyConnectors.java ,將 HTTP 與 HTTPS 環境建立起來。

首先,我們用 JDK 提供的 keytool 產生一個新的 ssl key。

keytool -genkey -dname "cn=maxkitcn, ou=maxkitou, o=maxkit, c=TW" -keyalg RSA -alias server -keypass yourkeypassword -keystore d:\temp\maxkitkeystore -storepass yourstorepassword -validity 36500

KeyStore javadoc 裡面提到,KeyStore Type 有兩種:JKS 或是 PKCS12,我們剛用 keytool 建立的是 PKCS12,這跟 Jetty 的預設 type 是不同的,因此 ManyConnector 裡面啟用 ssl 的方式,必須要做些微的調整。

另外為了增加程式的彈性,我們把 Port 跟 keystore 的一些設定值放到獨立的 properties 檔案中。

首先是設定檔 server.properties

jetty.port=8080
jetty.ssl.port=8443
jetty.ssl.keystore=/maxkitkeystore
jetty.ssl.keystore.storepass=yourkeypassword

另外也將 maxkitkeystore 檔案放在 src 目錄中。

ServerProperties.java 用來讀取 server.properties 檔案的設定。

public class ServerProperties extends Properties {
    private static final String PROPERTIES = "/server.properties";

    private Properties properties = new Properties();

    public Map<Object, Object> getProperties() {
        InputStream stream = this.getClass().getResourceAsStream(PROPERTIES);
        try {
            properties.load(stream);
        }
        catch (IOException ex) {
            ex.printStackTrace();
        }
        return Collections.unmodifiableMap(properties);
    }
}

ManyConnectors.java 設定 http 與 https,並沿用剛剛的 HelloHandler 作為提供網頁資料的處理器。這邊要注意,新舊版 ManyConnectors 範例程式的寫法是完全不一樣的,要注意看一下找到的資料是支援 Jetty 那一個版本。

import java.security.KeyStore;
import java.util.Map;

import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.webapp.WebAppContext;

public class ManyConnectors {
    private static final Map<?, ?> properties = new ServerProperties()
            .getProperties();

    private static final String DEFAULT_PORT = String.valueOf(properties
            .get("jetty.port"));
    private static final String DEFAULT_SSL_PORT = String.valueOf(properties
            .get("jetty.ssl.port"));
    private static final String SSL_KEYSTORE_FILE = String.valueOf(properties
            .get("jetty.ssl.keystore"));
    private static final String SSL_KEYSTORE_PASS = String.valueOf(properties
            .get("jetty.ssl.keystore.storepass"));

    private static Server server;

    public void startServer() throws Exception {
        startServer(DEFAULT_PORT, DEFAULT_SSL_PORT);
    }

    public void startServer(String port, String sslPort) throws Exception {
        server = new Server();
        logger.info("Starting Server...");
        // HttpConfiguration 告訴 server 要支援 https
        HttpConfiguration http_config = new HttpConfiguration();
        http_config.setSecureScheme("https");
        http_config.setSecurePort((sslPort.isEmpty() ? Integer
                .valueOf(DEFAULT_SSL_PORT) : Integer.valueOf(sslPort)));
        http_config.setOutputBufferSize(32768);

        // HTTP connector
        ServerConnector http = new ServerConnector(server,
                new HttpConnectionFactory(http_config));
        http.setPort((port.isEmpty() ? Integer.valueOf(DEFAULT_PORT) : Integer
                .valueOf(port)));
        http.setIdleTimeout(30000);

        // SSL Context Factory for HTTPS
        SslContextFactory sslContextFactory = new SslContextFactory();
        sslContextFactory.setKeyStorePassword(SSL_KEYSTORE_PASS);

        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        keyStore.load(this.getClass().getResourceAsStream(SSL_KEYSTORE_FILE),
                SSL_KEYSTORE_PASS.toCharArray());
        sslContextFactory.setKeyStore(keyStore);

        // HTTPS Configuration
        HttpConfiguration https_config = new HttpConfiguration(http_config);
        https_config.addCustomizer(new SecureRequestCustomizer());

        // HTTPS connector
        ServerConnector https = new ServerConnector(server,
                new SslConnectionFactory(sslContextFactory, "http/1.1"),
                new HttpConnectionFactory(https_config));
        https.setPort((sslPort.isEmpty() ? Integer.valueOf(DEFAULT_SSL_PORT)
                : Integer.valueOf(sslPort)));
        https.setIdleTimeout(500000);

        Connector[] connectors = new Connector[] { http, https };

        server.setConnectors(connectors);

        server.setHandler(new HelloHandler());

        server.start();
        server.join();

        logger.info("Started Server");
    }

    public void stopServer() throws Exception {
        logger.info("Stopping Server...");
        server.stop();
        logger.info("Server Stopped with stopServer() method.");
    }
}

最後寫一個啟動的 Main 程式 SimplestServer2.java

public class SimplestServer2 {
    public static void main(String[] args) throws Exception {
        ManyConnectors server = new ManyConnectors();
        server.startServer();
    }
}

啟動 server 後,就可以用 http://localhost:8080/ 或是 https://localhost:8443/ 看到 HelloWorld 網頁。