2014/04/01

erlang - interfacing technology

想要把 erlang 跟 C 或 Python 程式連接起來,或是想要在 erlang 裡面執行 shell script,作法是在「獨立的作業系統process」中,執行外部程式,並利用「byte oriented」的communication channel 和該行程通訊。

erlang 利用 port 控制通訊,「負責建立一個 port」的行程,稱為該 port 的 connected process,所有送往外部的訊息都必須貼上connected process的 PID,而外部程式的所有訊息都會被送到 connected process。

port 的作用就像是一個 erlang process,可送訊息給它,可以註冊,如果外部程式 crash,離開訊號就會送到 connected process,如果 connected process 死亡,外部程式就會被 kill。

Port

建立 port

Port = open_port(PortName, PortSettings)

對 Port 發送訊息

Port ! {PidC, {command, Data}}

將 connected process 由 PidC 改為 Pid1

Port ! {PidC, {connect, Pid1}}

關閉 port

Port ! {PidC, close}

connected process 可以收到外部程式送來的訊息,類似這樣:

receive
    {Port, {data, Data}} ->
        ...

open_port

open_port可接受很多Opt設定值,以下列出常見的設定:

@spec open_port(PortName, [Opt]) -> Port

PortName 可以是下列其中之一

  1. {spawn, Command}
    啟動一個外部程式,Command 是外部程式的名稱,除非是 linked-in driver,否則它會是在 erlang 工作空間外部的地方執行
  2. {fd, In, Out}

Opt 可以是下列其中之一

  1. {packet, N}
  2. stream
  3. {line, Max}
  4. {cd, Dir}
  5. {env, Env}

連接外部的 C 程式

如果要撰寫 erlang 程式,呼叫以下的 C 函式

// example1.c
int twice(int x){
  return 2*x;
}

int sum(int x, int y){
  return x+y;
}

在 erlang 中,希望將 example1定義為模組,且用以下方式呼叫,實作的細節隱藏在 example1 中

X1 = example1:twice(23),
Y1 = example1:sum(45, 32),

定義 port 與 外部程式 之間的協定

我們要先定義一個簡單的協定,並分別以 erlang, C 實作。

  1. 所有封包一開始都是 2 bytes 的 Len,後面接著 Len 個 bytes 資料
  2. 呼叫 twice(N),在協定中轉換為 [1,N],1代表 twice,N為參數
  3. 呼叫 sum(N,M), 在協定中轉換為 [2,N,M]
  4. 回傳值的長度為 1 byte

範例

  1. port 送出 [0,3,2,45,32] 給外部程式,0,3 表示封包長度為 3,2 表示呼叫 sum,45 與 32是 sum 的參數
  2. 外部程式從 stdin 讀取這五個位元,呼叫 sum 函數,寫出位元組序列「0,1,77」到 stdout,0,1 表示封包長度為 1,資料內容為 77

C 語言外部程式,實作協定

  1. example1.c: 包含了 twice 與 sum 兩個函式
  2. example1_driver.c: 會終結 byte 串流協定,且呼叫 example1.c 內的函式
  3. erl_comm.c: 具有讀寫記憶體緩衝區的函式

example1_drive.c 執行一個無窮迴圈,持續從 stdin 讀取資料,並把結果寫入 stdout

// example1_drive.c
#include <stdio.h>
typedef unsigned char byte;

int read_cmd(byte *buff);
int write_cmd(byte *buff, int len);

int main() {
  int fn, arg1, arg2, result;
  byte buff[100];

  while (read_cmd(buff) > 0) {
    fn = buff[0];

    if (fn == 1) {
      arg1 = buff[1];
      result = twice(arg1);
    } else if (fn == 2) {
      arg1 = buff[1];
      arg2 = buff[2];
      /* debug -- you can print to stderr to debug
     fprintf(stderr,"calling sum %i %i\n",arg1,arg2); */
      result = sum(arg1, arg2);
    }

    buff[0] = result;
    write_cmd(buff, 1);
  }
}

erl_comm.c,負責在 stdin/stdout 讀寫 2 bytes 開頭的封包。

/* erl_comm.c */
#include <unistd.h>

typedef unsigned char byte;

int read_cmd(byte *buf);
int write_cmd(byte *buf, int len);
int read_exact(byte *buf, int len);
int write_exact(byte *buf, int len);

int read_cmd(byte *buf)
{
  int len;

  if (read_exact(buf, 2) != 2)
    return(-1);
  len = (buf[0] << 8) | buf[1];
  return read_exact(buf, len);
}

int write_cmd(byte *buf, int len)
{
  byte li;

  li = (len >> 8) & 0xff;
  write_exact(&li, 1);

  li = len & 0xff;
  write_exact(&li, 1);

  return write_exact(buf, len);
}

int read_exact(byte *buf, int len)
{
  int i, got=0;

  do {
    if ((i = read(0, buf+got, len-got)) <= 0)
      return(i);
    got += i;
  } while (got<len);

  return(len);
}

int write_exact(byte *buf, int len)
{
  int i, wrote = 0;

  do {
    if ((i = write(1, buf+wrote, len-wrote)) <= 0)
      return (i);
    wrote += i;
  } while (wrote<len);

  return (len);
}

erlang 程式

-module(example1).
-export([start/0, stop/0]).
-export([twice/1, sum/2]).

start() ->
    spawn(fun() ->
          register(example1, self()),
          process_flag(trap_exit, true),
          Port = open_port({spawn, "./example1"}, [{packet, 2}]),
          loop(Port)
      end).

stop() ->
    % 發送訊息,讓 example1 停止,關閉 port 與 外部程式
    example1 ! stop.

twice(X) -> call_port({twice, X}).
sum(X,Y) -> call_port({sum, X, Y}).

call_port(Msg) ->
    % 以訊息方式發送 API request 給 example1
    example1 ! {call, self(), Msg},
    receive
    % 等待接收結果
    {example1, Result} ->
        Result
    end.

loop(Port) ->
    receive
    {call, Caller, Msg} ->
        % 對 Port 發送訊息, 資料內容是將呼叫的參數,轉換為 list
        % self() 為 connected process 的 PID
        Port ! {self(), {command, encode(Msg)}}, 
        receive
        % 收到外部程式送來的訊息
        {Port, {data, Data}} ->
            % 將結果解碼後,發送給 Caller
            Caller ! {example1, decode(Data)}
        end,
        loop(Port);
    stop ->
        % 關閉 port
        Port ! {self(), close},
        receive
        % 收到外部程式 送來關閉的訊息
        {Port, closed} ->
            % 送出 exit signal
            exit(normal)
        end;
    % 收到 exit signal
    {'EXIT', Port, Reason} ->
        exit({port_terminated,Reason})
    end.

encode({twice, X}) -> [1, X];  
encode({sum, X, Y}) -> [2, X, Y]. 

decode([Int]) -> Int.

編譯與測試

編譯
gcc -o example1 example1.c erl_comm.c example1_driver.c
erlc -W *.erl

測試

1> example1:start().
<0.34.0>
2> example1:sum(45,32).
77
3> example1:twice(10).
20
4> example1:twice(14).
28

注意

  1. 此範例並沒有統一 erlang 與 c 對整數的定義。直接假設兩個都是用單一個byte來當作整數,並忽略精確度、正負號的問題。
  2. 必須要先啟動負責界面的driver程式,也就是要先執行 example1:start(),然後才能執行此程式。

附註

erlang 跟外部程式之間傳遞資料,其資料內容的結構必須由 programmer 自行處理,這跟 socket programming 一樣, socket 在兩個程式之間提供 byte streaming 的傳輸,至於建構在 socket 上面的 app 要如何使用,就要由 app 自行決定。

erlang 有幾個函式庫可簡化界面銜接的問題。

  1. http://www.erlang.org/doc/pdf/erl_interface.pdf
    ei 是一組 C 函式與巨集,可編解碼 erlang 外部格式。在 erlang 端,一個 erlang 程式使用 term_to_binary 將 erlang terms 序列化,在 C 語言端, ei 的函式可用來解碼此 binary 資料。相反地,ei 可用來建構二元資料,而 erlang 端就以 binary_to_term 將 binary 資料解碼。
  2. http://www.erlang.org/doc/pdf/ic.pdf
    erlang IDL 編譯器 ic,這是 erlang 對 OMG IDL 編譯器的實作。
  3. http://www.erlang.org/doc/pdf/jinterface.pdf
    Jinterface 是處理 java 跟 erlang 之間的介面,它可以將 erlang 型別完整地對應到 java 物件,為 erlang terms 編碼解碼,連結到 erlang process等等

參考

Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World

沒有留言:

張貼留言