2018/11/26

How to use Python logging

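The simplest way to print debug information in Python is print. If you need to distinguish log levels, or send different log messages to different files, you need the logging module. Its configuration is somewhat more involved, so the notes below record how to use it.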

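Initialize the logging module in util.py, the module that is imported and used most often across the project. For a simple setup, initialize logging directly in util.py; for a more complex setup, use a configuration file.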

Example: initializing logging directly in code

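Use TimedRotatingFileHandler with when='midnight' to switch to a new log file every day at midnight; rotated files get a date suffix, and backupCount controls how many old log files are kept. The formatter sets the pattern of each log line. Finally, add the file handler and the console handler to the root logger.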

import logging
import os
from logging.handlers import TimedRotatingFileHandler

rootLogger = logging.getLogger()
# Use the number of handlers on the root logger to tell whether logging has already been initialized
# print("len(rootLogger.handlers)=" + str(len(rootLogger.handlers)))

if len(rootLogger.handlers) == 0:
    rootLogger.setLevel(logging.DEBUG)

    # rotate at midnight, keep 30 old files, add a date suffix to rotated files
    logs_directory = 'logs'
    os.makedirs(logs_directory, exist_ok=True)
    log_file_path = os.path.join('.', logs_directory, 'project.log').replace('\\', '/')
    fh = TimedRotatingFileHandler(log_file_path, when='midnight', interval=1, backupCount=30)
    fh.suffix = "%Y-%m-%d"

    datefmt = '%Y-%m-%d %H:%M:%S'
    # datefmt = '%Y-%m-%d'
    # format_str = '%(asctime)s %(levelname)s %(message)s '
    format_str = '%(asctime)s %(levelname)s %(module)s.%(funcName)s %(lineno)d: %(message)s'
    formatter = logging.Formatter(format_str, datefmt)
    fh.setFormatter(formatter)
    rootLogger.addHandler(fh)

    # handler that writes to sys.stderr
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(formatter)

    rootLogger.addHandler(console)
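The handler-count guard matters because util.py may be imported from many modules; without it, every import would attach another set of handlers and each message would be printed several times. A minimal sketch of the same guard wrapped in a function, assuming a hypothetical init_logging() helper that writes to an in-memory stream instead of a file so the effect is easy to check (the demo uses a dedicated logger so handlers that other tools may have put on the root logger do not interfere):

```python
import io
import logging


def init_logging(stream, logger=None):
    """Attach a handler only if the logger has none yet (hypothetical helper).

    `logger` defaults to the root logger, as in util.py above.
    """
    if logger is None:
        logger = logging.getLogger()
    if len(logger.handlers) == 0:
        logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler(stream)
        handler.setFormatter(logging.Formatter('%(levelname)s %(message)s'))
        logger.addHandler(handler)
    return logger


buf = io.StringIO()
demo = logging.getLogger("init_demo")
demo.propagate = False          # keep the demo output out of the root logger
init_logging(buf, demo)
init_logging(buf, demo)         # second call is a no-op: a handler already exists
demo.info("test")
print(len(demo.handlers), repr(buf.getvalue()))
```

Calling the initializer twice leaves exactly one handler, so "test" is written once.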

To use it, just import logging; the module-level functions log through the root logger by default.

import logging

logging.info("test")

Logging configuration file

If you need more complex logging, for example writing a particular module's log to its own file so that logs from different modules are not all mixed into one log file, use a configuration file.

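The logging.cfg below defines three loggers. The root logger and the moduleLog logger each have two handlers: the console handler is shared, while each has its own file handler pointing to a different log file. The consoleLog logger writes to the console only. The log line pattern is similar to the earlier example, with the process id/name and file name added.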

[loggers]
keys=root,consoleLog,moduleLog

[formatters]
keys=consoleFormatter,fileFormatter

[handlers]
keys=consoleHandler,projectFileHandler,moduleFileHandler

[logger_root]
level=DEBUG
handlers=projectFileHandler,consoleHandler

[logger_moduleLog]
level=DEBUG
qualname=moduleLog
handlers=moduleFileHandler,consoleHandler
propagate=0

[logger_consoleLog]
level=NOTSET
handlers=consoleHandler
qualname=consoleLog
propagate=0

[formatter_consoleFormatter]
format=%(asctime)s %(levelname)s {%(process)d,%(processName)s} {%(filename)s} [%(funcName)s] %(lineno)d - %(message)s
datefmt=

[formatter_fileFormatter]
format=%(asctime)s %(levelname)s {%(process)d,%(processName)s} {%(filename)s} {%(module)s} [%(funcName)s] %(lineno)d - %(message)s
datefmt=


[handler_consoleHandler]
class=StreamHandler
level=NOTSET
formatter=consoleFormatter
args=(sys.stdout,)


[handler_projectFileHandler]
class=handlers.TimedRotatingFileHandler
level=NOTSET
formatter=fileFormatter
args=('%(projectlogpath)s','midnight',1,30)

[handler_moduleFileHandler]
class=handlers.TimedRotatingFileHandler
level=NOTSET
formatter=fileFormatter
args=('%(modulelogpath)s','midnight',1,30)

Initializing logging in util.py

# logging
import logging
import logging.config
import os

rootLogger = logging.getLogger()
# print("len(rootLogger.handlers)=" + str(len(rootLogger.handlers)))

if len(rootLogger.handlers) == 0:
    logging_cfg_file_path = os.path.join('.', 'conf/logging.cfg')
    # print("logging_cfg_file_path="+logging_cfg_file_path)

    # create logs directory
    logs_directory = "logs"
    if not os.path.exists(logs_directory):
        try:
            os.makedirs(logs_directory)
        except OSError:
            if not os.path.isdir(logs_directory):
                raise

    project_log_file = 'project.log'
    module_log_file = 'module.log'

    project_log_path = os.path.join('.', logs_directory+'/'+ project_log_file).replace('\\', '/')
    module_log_path = os.path.join('.', logs_directory+'/'+ module_log_file).replace('\\', '/')

    logging.config.fileConfig(logging_cfg_file_path,
                            defaults={'projectlogpath': project_log_path,
                                    'modulelogpath': module_log_path},
                            disable_existing_loggers=False)
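A self-contained sketch of the same idea, assuming a trimmed-down copy of logging.cfg (only the two file handlers, no console handler) passed to fileConfig as an in-memory file, with the log files in a temporary directory. It checks that messages to the root logger land in project.log while messages to moduleLog land only in module.log, because of propagate=0:

```python
import io
import logging
import logging.config
import os
import tempfile

CFG = """
[loggers]
keys=root,moduleLog

[formatters]
keys=fileFormatter

[handlers]
keys=projectFileHandler,moduleFileHandler

[logger_root]
level=DEBUG
handlers=projectFileHandler

[logger_moduleLog]
level=DEBUG
qualname=moduleLog
handlers=moduleFileHandler
propagate=0

[formatter_fileFormatter]
format=%(levelname)s %(name)s - %(message)s

[handler_projectFileHandler]
class=handlers.TimedRotatingFileHandler
level=NOTSET
formatter=fileFormatter
args=('%(projectlogpath)s','midnight',1,30)

[handler_moduleFileHandler]
class=handlers.TimedRotatingFileHandler
level=NOTSET
formatter=fileFormatter
args=('%(modulelogpath)s','midnight',1,30)
"""

log_dir = tempfile.mkdtemp()
# forward slashes so the paths survive being eval'ed inside args=(...)
project_log_path = os.path.join(log_dir, 'project.log').replace('\\', '/')
module_log_path = os.path.join(log_dir, 'module.log').replace('\\', '/')

# %(projectlogpath)s / %(modulelogpath)s in args are filled from `defaults`
logging.config.fileConfig(io.StringIO(CFG),
                          defaults={'projectlogpath': project_log_path,
                                    'modulelogpath': module_log_path},
                          disable_existing_loggers=False)

logging.info("from root")
logging.getLogger("moduleLog").info("from module")

with open(project_log_path) as f:
    project_text = f.read()
with open(module_log_path) as f:
    module_text = f.read()
print(project_text, module_text)
```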

When you call the logging module functions directly, they use the root logger.

import logging

logging.info("test")

But if you get the logger with logging.getLogger(name="moduleLog"), messages go to the moduleLog logger and are written to the module log file (and, because of propagate=0, not to project.log).

import logging

logger = logging.getLogger(name="moduleLog")
logger.info("test")

References

[Python] logging 教學

2018/11/19

How to use SQLAlchemy in Python

SQLAlchemy is an open-source Python library, released under the MIT license, that provides a SQL toolkit and object-relational mapping (ORM). Its design is closer to Java's Hibernate.

Another commonly used ORM is the Django ORM, which follows a different design: the Active Record pattern.

Installing the packages

Because we test against MariaDB, besides SQLAlchemy we also need a MySQL driver; we use the pure-Python driver PyMySQL.

For Python 3, use pip3; for Python 2, use pip.

sudo pip3 install SQLAlchemy
sudo pip3 install PyMySQL

DBEngine.py

Using the status table as an example: in DBEngine, create_engine creates SQLAlchemy's basic Engine, and a sessionmaker bound to that engine produces the DBSession factory that the DAO uses later.

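Next, check whether the table exists (engine.dialect.has_table in the original SQLAlchemy 1.x code, inspect(engine).has_table in 1.4 and later) and create it if it does not.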

Note that updatedate must be refreshed every time the record is updated, while createdate records when the record was created.

from sqlalchemy import (Column, Integer, MetaData, Table, TIMESTAMP,
                        create_engine, func, inspect)
from sqlalchemy.orm import sessionmaker

db_user = 'root'
db_password = 'password'
db_host = 'localhost'
db_port = 3306
db_name = 'testdb'

# echo: whether to print the SQL statements and related messages
# pool_pre_ping / pool_recycle: test pooled connections before use and replace those older than an hour
engine = create_engine('mysql+pymysql://' + db_user + ':' + db_password + '@' + db_host + ':' + str(db_port) + '/' + db_name + '?charset=utf8',
                       echo=False, pool_pre_ping=True, pool_recycle=3600)

DBSession = sessionmaker(bind=engine)

# create the status table if it does not exist yet
if not inspect(engine).has_table("status"):
    metadata = MetaData()

    status_table = Table('status', metadata,
                         Column('rseq', Integer, primary_key=True, autoincrement=True),
                         Column('statuscode', Integer, nullable=False),
                         Column('updatedate', TIMESTAMP, default=func.now(), onupdate=func.now()),
                         Column('createdate', TIMESTAMP, default=func.now())
                         )

    metadata.create_all(engine)

Data Value Object

Create a declarative base with Base = declarative_base(), then define the Status class as the ORM mapping; its attributes map to the columns of the status table created above.

from sqlalchemy import Column, Integer, TIMESTAMP, func
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Status(Base):
    __tablename__ = 'status'

    rseq = Column(Integer, primary_key=True, autoincrement=True)
    statuscode = Column(Integer, nullable=False)

    # default: set on INSERT; onupdate: refreshed on every UPDATE
    updatedate = Column(TIMESTAMP, default=func.now(), onupdate=func.now())
    createdate = Column(TIMESTAMP, default=func.now())

    # A class keeps only one __init__ (a second definition replaces the first),
    # so use default arguments instead of two constructors.
    def __init__(self, rseq=None, statuscode=None, updatedate=None, createdate=None):
        self.rseq = rseq
        self.statuscode = statuscode
        self.updatedate = updatedate
        self.createdate = createdate

    @classmethod
    def to_string(cls, vo):
        return "(rseq, statuscode, updatedate, createdate)=({rseq}, {statuscode}, {updatedate}, {createdate})\n".format(
            rseq=vo.rseq, statuscode=vo.statuscode, updatedate=vo.updatedate, createdate=vo.createdate)

Data Access Object

The simplest is the create function: get a database session from DBSession(), do all the work of this function, call commit when it finishes and rollback when an error occurs, and close the session before returning.

import datetime
import logging

from DBEngine import DBSession
# hypothetical module name: import Status from wherever the Data Value Object above is defined
from StatusVO import Status

class StatusDao():

    def __init__(self):
        pass
    
    def create(self, vo):
        session = DBSession()
        try:
            logging.debug("vo="+str(vo))
            session.add(vo)
            session.commit()
        except Exception as e:
            logging.error(str(e))
            session.rollback()
            raise
        finally:
            session.close()

This is create_or_update: if no record with that rseq exists, create one; otherwise update the existing record.

    def create_or_update(self, vo):
        session = DBSession()
        try:
            oldvo = session.query(Status).filter_by(rseq=vo.rseq).first()

            if oldvo:
                oldvo.statuscode = vo.statuscode
                oldvo.updatedate = datetime.datetime.now()

                logging.debug("updating " + Status.to_string(oldvo))
            else:
                logging.debug("creating " + Status.to_string(vo))
                session.add(vo)

            session.commit()

            newvo = session.query(Status).filter_by(rseq=vo.rseq).first()

            return newvo
        except Exception as e:
            logging.error(str(e))
            session.rollback()
            raise
        finally:
            session.close()
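The same "update if the key exists, otherwise insert" flow, sketched with the standard-library sqlite3 module standing in for SQLAlchemy and MariaDB so it runs without a database server. The table mirrors status above; create_or_update here is a hypothetical stand-alone function, not the DAO method itself:

```python
import sqlite3


def create_or_update(conn, rseq, statuscode):
    """Update the row with this rseq if it exists, otherwise insert one.

    Returns the stored (rseq, statuscode); commits on success, rolls back on error.
    """
    try:
        row = conn.execute("SELECT rseq FROM status WHERE rseq = ?", (rseq,)).fetchone()
        if row:
            conn.execute("UPDATE status SET statuscode = ?, updatedate = CURRENT_TIMESTAMP "
                         "WHERE rseq = ?", (statuscode, rseq))
        else:
            # a NULL rseq lets SQLite assign the next id, like autoincrement
            cur = conn.execute("INSERT INTO status (rseq, statuscode) VALUES (?, ?)",
                               (rseq, statuscode))
            rseq = cur.lastrowid
        conn.commit()
        return conn.execute("SELECT rseq, statuscode FROM status WHERE rseq = ?",
                            (rseq,)).fetchone()
    except Exception:
        conn.rollback()
        raise


conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE status (
    rseq INTEGER PRIMARY KEY AUTOINCREMENT,
    statuscode INTEGER NOT NULL,
    updatedate TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    createdate TIMESTAMP DEFAULT CURRENT_TIMESTAMP)""")

first = create_or_update(conn, None, 0)        # no rseq yet -> insert
second = create_or_update(conn, first[0], 7)   # same rseq -> update
print(first, second)                           # (1, 0) (1, 7)
```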

This function deletes all records in the status table:

    def delete_all(self):
        session = DBSession()
        try:
            session.query(Status).delete()
            session.commit()
        except Exception as e:
            logging.error(str(e))
            session.rollback()
            raise
        finally:
            session.close()

Get a single record by rseq

    def get_by_rseq(self, rseq):
        session = DBSession()
        try:
            vo = session.query(Status).filter_by(rseq=rseq).first()
            return vo
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

Get all records

    def get_all(self):
        session = DBSession()
        try:
            status_list = session.query(Status).all()
            return status_list
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

Using the DAO

Create a DAO object, then call its methods directly.

dao = StatusDao()

status = Status(None, 0, None, None)
vo = dao.create_or_update(status)

dao.delete_all()

The database session used by this DAO never outlives a single function call, so these DAO methods act more like business methods: put all the work of one database task into one DAO function, and close the session when that function ends. For a simple project this makes the DAO and its sessions easier to manage, and it is harder to forget to close a session.
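The "one session per function" rule can be factored into a context manager so each DAO method only contains its own work. A sketch, assuming a hypothetical session_scope() helper that accepts any factory returning an object with commit/rollback/close (the DBSession sessionmaker above fits); the demo uses sqlite3 connections on a temporary file as a stand-in:

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager


@contextmanager
def session_scope(session_factory):
    """One unit of work: commit on success, roll back on error, always close."""
    session = session_factory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


DB_PATH = os.path.join(tempfile.mkdtemp(), "demo.db")


def connect():
    return sqlite3.connect(DB_PATH)


with session_scope(connect) as s:
    s.execute("CREATE TABLE status (rseq INTEGER PRIMARY KEY, statuscode INTEGER NOT NULL)")
    s.execute("INSERT INTO status (statuscode) VALUES (0)")

try:
    with session_scope(connect) as s:
        s.execute("INSERT INTO status (statuscode) VALUES (1)")
        raise RuntimeError("simulated failure")   # forces a rollback
except RuntimeError:
    pass

with session_scope(connect) as s:
    count = s.execute("SELECT COUNT(*) FROM status").fetchone()[0]
print(count)   # 1: the failed insert was rolled back
```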

References

SQLAlchemy wiki

Flask零基础到项目实战(四)SQLAlchemy数据库(二)

Column Insert/Update Defaults

Python SQLAlchemy ORM - 1

Connection Pooling

給Django用戶的SQLAlchemy介紹

2018/11/12

Fragile Base Class

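This is a problem caused by class inheritance in object-oriented languages. When a class is subclassed, the subclass often overrides some of its methods; but if the subclass author does not understand the parent class's implementation details, the override can break the program.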

Here is a concrete example.

Super provides two methods, and inc1 calls inc2 internally. Sub extends Super without knowing how inc1 is implemented, and overrides inc2 to call inc1. Calling Sub's inc2 therefore recurses endlessly (inc2 -> inc1 -> inc2 -> ...) and ends in java.lang.StackOverflowError.

class Super {

  private int counter = 0;

  void inc1() {
    inc2();
  }

  void inc2() {
    counter++;
  }

}

class Sub extends Super {

  @Override
  void inc2() {
    inc1();
  }

  public static void main(String[] args) {
    Sub sub = new Sub();
    sub.inc2();
  }
}
$ java Sub
Exception in thread "main" java.lang.StackOverflowError
    at Super.inc1(Super.java:6)
    at Sub.inc2(Super.java:21)
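The problem is not specific to Java. The same two classes transcribed into Python, as an illustration only, fail the same way, with RecursionError in place of StackOverflowError:

```python
class Super:
    def __init__(self):
        self.counter = 0

    def inc1(self):
        self.inc2()          # implementation detail: inc1 delegates to inc2

    def inc2(self):
        self.counter += 1


class Sub(Super):
    # Overridden without knowing that Super.inc1 calls inc2.
    def inc2(self):
        self.inc1()


try:
    Sub().inc2()
    outcome = "ok"
except RecursionError:
    outcome = "RecursionError"
print(outcome)   # RecursionError
```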

This is similar to the example above: the calls recurse endlessly and end in java.lang.StackOverflowError.

class Base{
    protected int x;
    protected void m(){
        x++;
    }

    protected void n(){
        x++;      // <- defect
        m();
    }

}

class SubBase extends Base{
    protected void m(){
        n();
    }
    public static void main(String[] args) {
        SubBase sub = new SubBase();
        sub.m();
    }
}
$ java SubBase
Exception in thread "main" java.lang.StackOverflowError
    at Base.n(Base.java:9)
    at SubBase.m(Base.java:16)
    at Base.n(Base.java:9)

Java offers one remedy: declare methods that must not be overridden as final.

class Super {

  private int counter = 0;

  void inc1() {
    inc2();
  }

  final void inc2() {
    counter++;
  }
}

class Sub extends Super {

  @Override
  void inc2() {
    inc1();
  }
  public static void main(String[] args) {
    Sub sub = new Sub();
    sub.inc2();
  }
}

Compilation then fails because of final:

$ javac Super.java
Super.java:20: error: inc2() in Sub cannot override inc2() in Super
  void inc2() {
       ^
  overridden method is final
1 error
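Python has typing.final, but it is only checked by static type checkers. A minimal runtime sketch of the same protection, assuming a hypothetical final_method decorator and a base class that rejects overrides in __init_subclass__, so the error appears when the subclass is defined (the closest Python gets to Java's compile-time error):

```python
def final_method(func):
    """Mark a method as not overridable (hypothetical decorator)."""
    func._is_final = True
    return func


class FinalMethodsBase:
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Reject any attribute defined on cls that shadows a final method of an ancestor.
        for base in cls.__mro__[1:]:
            for name, attr in vars(base).items():
                if getattr(attr, "_is_final", False) and name in vars(cls):
                    raise TypeError(f"{cls.__name__}.{name} cannot override "
                                    f"final method of {base.__name__}")


class Super(FinalMethodsBase):
    def __init__(self):
        self.counter = 0

    def inc1(self):
        self.inc2()

    @final_method
    def inc2(self):
        self.counter += 1


try:
    class Sub(Super):
        def inc2(self):
            self.inc1()
    error = None
except TypeError as e:
    error = str(e)
print(error)   # Sub.inc2 cannot override final method of Super
```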

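Kotlin takes a more aggressive approach: a method without any modifier is final by default, and a method that subclasses may override must be marked with the open modifier (the class itself must be open as well).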

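This prevents methods from being overridden arbitrarily, but it places more responsibility on the programmer to decide when to add open. That can be a nuisance: when people are not sure whether to add open, they tend to leave it out.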

open class Super {
    open fun v() {}
    fun nv() {}
}
class Sub() : Super() {
    override fun v() {}
}

In the end there is no solution that satisfies both goals; it comes down to which concern the language designers consider more important.

References

Fragile base class wiki

What is the fragile base class problem?

2018/11/05

Phoenix_9_OTP

Managing State with Processes

Functional programs are stateless, so state is kept in a process instead.

Create /rumbl/lib/rumbl/counter.ex

defmodule Rumbl.Counter do

  def inc(pid), do: send(pid, :inc)

  def dec(pid), do: send(pid, :dec)

  def val(pid, timeout \\ 5000) do 
    ref = make_ref()
    send(pid, {:val, self(), ref})
    receive do
      {^ref, val} -> val
    after timeout -> exit(:timeout)
    end
  end

  def start_link(initial_val) do 
    {:ok, spawn_link(fn -> listen(initial_val) end)}
  end

  defp listen(val) do 
    receive do
      :inc -> listen(val + 1)
      :dec -> listen(val - 1)
      {:val, sender, ref} ->
        send sender, {ref, val}
        listen(val)
    end
  end
end

This is a standalone counter service. The Counter API has three messages:

  1. :inc
  2. :dec
  3. :val

:inc and :dec are asynchronous: they send a message and return immediately.

:val is different: after sending the message, it waits for the reply with receive.

make_ref() creates a globally unique reference, which stays unique even across the nodes of a cluster.

^ref (the pin operator) makes receive match only the reply tagged with the reference we just sent, so the answer is paired with this request.

OTP expects a start_link function; initial_val is the counter's initial state.

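No global variable holds the state. Instead, listen blocks in receive waiting for a message, and the current value is carried as listen's argument. The state is passed on by calling listen recursively with the new value; because that call is the last expression, it is tail recursive and does not grow the stack.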
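Python has no lightweight processes, but the idea can be sketched by analogy, assuming a thread that owns the value and a queue.Queue acting as its mailbox. This is not how the BEAM works; it only shows the same shape: inc/dec are fire-and-forget messages, val sends a request tagged with a unique token and blocks for the matching reply, and the count lives only inside the message loop:

```python
import queue
import threading


def _listen(mailbox, val):
    """Message loop; the current count exists only as this local variable."""
    while True:
        msg = mailbox.get()
        if msg == "inc":
            val += 1
        elif msg == "dec":
            val -= 1
        elif isinstance(msg, tuple) and msg[0] == "val":
            _, reply_to, ref = msg
            reply_to.put((ref, val))
        elif msg == "stop":
            return


def start_link(initial_val):
    """Start the 'process' and return its mailbox, which plays the role of a pid."""
    mailbox = queue.Queue()
    threading.Thread(target=_listen, args=(mailbox, initial_val), daemon=True).start()
    return mailbox


def inc(pid):
    pid.put("inc")   # asynchronous, like send(pid, :inc)


def dec(pid):
    pid.put("dec")


def val(pid, timeout=5.0):
    """Synchronous request: send a tagged request, then block for the reply."""
    ref = object()   # unique token standing in for make_ref()
    reply_to = queue.Queue()
    pid.put(("val", reply_to, ref))
    got_ref, value = reply_to.get(timeout=timeout)   # raises queue.Empty on timeout
    if got_ref is not ref:                           # like matching {^ref, val}
        raise RuntimeError("unexpected reply")
    return value


counter = start_link(0)
inc(counter)
inc(counter)
print(val(counter))   # 2
dec(counter)
print(val(counter))   # 1
```

Because one mailbox is consumed in FIFO order by a single thread, the val request is answered only after the earlier inc/dec messages are applied.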


Testing Counter

$ iex -S mix

iex(1)> alias Rumbl.Counter
Rumbl.Counter
iex(2)> {:ok, counter} = Counter.start_link(0)
{:ok, #PID<0.270.0>}
iex(3)> Counter.inc(counter)
:inc
iex(4)> Counter.inc(counter)
:inc

iex(5)> Counter.val(counter)
2
iex(6)> Counter.dec(counter)
:dec
iex(7)> Counter.val(counter)
1

Building GenServer for OTP

Update /lib/rumbl/counter.ex

defmodule Rumbl.Counter do
  use GenServer

  def inc(pid), do: GenServer.cast(pid, :inc)

  def dec(pid), do: GenServer.cast(pid, :dec)

  def val(pid) do
    GenServer.call(pid, :val)
  end

  def start_link(initial_val) do
    GenServer.start_link(__MODULE__, initial_val)
  end

  def init(initial_val) do
    {:ok, initial_val}
  end

  def handle_cast(:inc, val) do
    {:noreply, val + 1}
  end

  def handle_cast(:dec, val) do
    {:noreply, val - 1}
  end

  def handle_call(:val, _from, val) do
    {:reply, val, val}
  end
end

GenServer.cast is an asynchronous call, handled on the server by handle_cast; it returns {:noreply, val + 1} because the caller does not wait for a reply.

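GenServer.call is a synchronous call, handled on the server by handle_call; {:reply, val, val} sends val back to the caller and keeps val as the state.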

Testing

$ iex -S mix
iex(1)> alias Rumbl.Counter
Rumbl.Counter
iex(2)> {:ok, counter} = Counter.start_link(0)
{:ok, #PID<0.269.0>}
iex(3)> Counter.inc(counter)
:ok
iex(4)> Counter.val(counter)
1
iex(5)> Counter.dec(counter)
:ok
iex(6)> Counter.val(counter)
0

Adding Failover

Use a Supervisor to watch the counter.

Phoenix does not contain much code that handles failures and exceptions; instead it relies on error reporting plus automatically restarting the failed service.

Modify /lib/rumbl.ex

defmodule Rumbl do
  use Application

  # See http://elixir-lang.org/docs/stable/elixir/Application.html
  # for more information on OTP Applications
  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      supervisor(Rumbl.Endpoint, []),
      supervisor(Rumbl.Repo, []),
      worker(Rumbl.Counter, [5]), # new counter worker
    ]

    opts = [strategy: :one_for_one, name: Rumbl.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  def config_change(changed, _new, removed) do
    Rumbl.Endpoint.config_change(changed, removed)
    :ok
  end
end

The child specs define the children that the Elixir application starts; add the counter worker to the children list.

opts is the supervision policy; here it is :one_for_one.

:one_for_all kills and restarts all child processes.


In /lib/rumbl/counter.ex, add a :tick message and a handle_info that handles it, counting down once every 1000 ms.

  def init(initial_val) do
    Process.send_after(self(), :tick, 1000)
    {:ok, initial_val}
  end

  def handle_info(:tick, val) do
    IO.puts "tick #{val}"
    Process.send_after(self(), :tick, 1000)
    {:noreply, val - 1}
  end

Then add a check: once the countdown reaches 0, handle_info raises an exception and the OTP process crashes.

  def init(initial_val) do
    Process.send_after(self(), :tick, 1000)
    {:ok, initial_val}
  end

  def handle_info(:tick, val) when val <= 0, do: raise "boom!"
  def handle_info(:tick, val) do
    IO.puts "tick #{val}"
    Process.send_after(self(), :tick, 1000)
    {:noreply, val - 1}
  end

But the supervisor restarts it automatically:

$ iex -S mix
iex(1)> tick 5
tick 4
tick 3
tick 2
tick 1
[error] GenServer #PID<0.365.0> terminating
** (RuntimeError) boom!
    (rumbl) lib/rumbl/counter.ex:21: Rumbl.Counter.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: :tick
State: 0
tick 5
tick 4
tick 3
tick 2
tick 1

Restart Strategies

By default a child's restart strategy is :permanent; you can set it explicitly with the restart option:

worker(Rumbl.Counter, [5], restart: :permanent),
  1. :permanent

    child is always restarted (default)

  2. :temporary

    child is never restarted

  3. :transient

    restarted only when it terminates abnormally, i.e. with an exit reason other than :normal, :shutdown, or {:shutdown, term}

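There are also the max_restarts and max_seconds options: the supervisor allows at most max_restarts restarts within max_seconds seconds; beyond that, the supervisor itself gives up and terminates.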

The default is 3 restarts within 5 seconds.
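The max_restarts/max_seconds rule is a sliding-window rate limit. A sketch of that bookkeeping in Python, assuming a hypothetical RestartIntensity class and explicit timestamps so the logic is easy to test; it only models the counting, not process supervision:

```python
from collections import deque


class RestartIntensity:
    """Allow at most max_restarts restarts within a max_seconds window (sketch)."""

    def __init__(self, max_restarts=3, max_seconds=5):
        self.max_restarts = max_restarts
        self.max_seconds = max_seconds
        self.history = deque()   # timestamps of recent restarts

    def allow_restart(self, now):
        # Forget restarts that fell out of the window.
        while self.history and now - self.history[0] > self.max_seconds:
            self.history.popleft()
        if len(self.history) >= self.max_restarts:
            return False   # a real supervisor would now terminate itself
        self.history.append(now)
        return True


r = RestartIntensity()
decisions = [r.allow_restart(t) for t in (0, 1, 2, 3)]
print(decisions)   # [True, True, True, False]: a 4th crash within 5 seconds is too many
```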

Supervision Strategies

  1. :one_for_one

    a child terminates -> the supervisor restarts only that process

  2. :one_for_all

    a child terminates -> the supervisor terminates all children and restarts them

  3. :rest_for_one

    a child terminates -> the supervisor terminates all child processes defined after the one that died, then restarts all the terminated processes

  4. :simple_one_for_one

    like :one_for_one, but for a supervisor that must supervise processes dynamically, e.g. a web server supervising web requests, which typically means 10 to 100,000 concurrently running processes

Switch the strategy to :one_for_all to test it:

def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      supervisor(Rumbl.Endpoint, []),
      supervisor(Rumbl.Repo, []),
      worker(Rumbl.Counter, [5]), # new counter worker
    ]

    opts = [strategy: :one_for_all, name: Rumbl.Supervisor]
    Supervisor.start_link(children, opts)
  end

Start the Phoenix server and you will see that Cowboy restarts too:

tick 2
tick 1
[error] GenServer #PID<0.348.0> terminating
** (RuntimeError) boom!
    (rumbl) lib/rumbl/counter.ex:21: Rumbl.Counter.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: :tick
State: 0
[info] Running Rumbl.Endpoint with Cowboy using http://localhost:4000
tick 5

Now remove the worker and change the strategy back to :one_for_one:

children = [
    supervisor(Rumbl.Endpoint, []),
    supervisor(Rumbl.Repo, [])
]

opts = [strategy: :one_for_one, name: Rumbl.Supervisor]

Using Agents

An agent is similar to a GenServer, but has only five main functions.

start_link starts an agent, stop stops it, update updates the agent's state, and get reads it.

$ iex -S mix

iex(1)> import Agent
Agent
iex(2)> {:ok, agent} = start_link fn -> 5 end
{:ok, #PID<0.258.0>}
iex(3)> update agent, &(&1 + 1)
:ok
iex(4)> get agent, &(&1)
6
iex(5)> stop agent
:ok

Adding the :name option

iex(7)> {:ok, agent} = start_link fn -> 5 end, name: MyAgent
{:ok, #PID<0.265.0>}
iex(8)> update MyAgent, &(&1 + 1)
:ok
iex(9)> get MyAgent, &(&1)
6
iex(10)> stop MyAgent
:ok

Starting an agent with a name that is already registered fails:

iex(11)> {:ok, agent} = start_link fn -> 5 end, name: MyAgent
{:ok, #PID<0.271.0>}
iex(12)> {:ok, agent} = start_link fn -> 5 end, name: MyAgent
** (MatchError) no match of right hand side value: {:error, {:already_started, #PID<0.271.0>}}

Agent is itself built on GenServer, and Phoenix channels likewise run as GenServer processes (Phoenix.Channel.Server).

Design an Information System with OTP

Create /lib/rumbl/info_sys/supervisor.ex


defmodule Rumbl.InfoSys.Supervisor do
  use Supervisor

  def start_link() do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_opts) do
    children = [
      worker(Rumbl.InfoSys, [], restart: :temporary)
    ]

    supervise children, strategy: :simple_one_for_one
  end
end

Modify /lib/rumbl.ex

    children = [
      supervisor(Rumbl.Endpoint, []),
      supervisor(Rumbl.InfoSys.Supervisor, []), # new supervisor
      supervisor(Rumbl.Repo, []),
#      worker(Rumbl.Counter, [5]), # new counter worker
    ]

Building a start_link Proxy

How many backend services to start is decided at runtime, so we need a worker that can start multiple backends.

A proxy function is a lightweight function interface between the client and the server.

Create /lib/rumbl/info_sys.ex


defmodule Rumbl.InfoSys do
  @backends [Rumbl.InfoSys.Wolfram]

  defmodule Result do
    defstruct score: 0, text: nil, url: nil, backend: nil
  end

  def start_link(backend, query, query_ref, owner, limit) do 
    backend.start_link(query, query_ref, owner, limit)
  end

  def compute(query, opts \\ []) do 
    limit = opts[:limit] || 10
    backends = opts[:backends] || @backends

    backends
    |> Enum.map(&spawn_query(&1, query, limit))
  end

  defp spawn_query(backend, query, limit) do 
    query_ref = make_ref()
    opts = [backend, query, query_ref, self(), limit]
    {:ok, pid} = Supervisor.start_child(Rumbl.InfoSys.Supervisor, opts)
    {pid, query_ref}
  end
end

InfoSys differs a little from an ordinary GenServer: it keeps the list of all supported backends in a module attribute (@backends).

The Result struct stores each search result: a score for relevance, text describing the result, the url, and the backend that produced it.

start_link is the proxy: it calls the start_link of the chosen backend.

compute maps over all backends and calls spawn_query for each one.

Building Wolfram into the System

Modify /rumbl/mix.exs to add the dependency, then fetch it:

{:sweet_xml, "~> 0.5.0"},
mix deps.get

Sign up for a Wolfram Alpha account and create an APP ID:

APP NAME: wolfram
APPID: LP93J3-XXXXXXXXXX

Add the settings to /config/dev.secret.exs

use Mix.Config
config :rumbl, :wolfram, app_id: "LP93J3-XXXXXXXXXX"

Remember to add /config/dev.secret.exs to .gitignore.

Modify /config/dev.exs and add at the end:

import_config "dev.secret.exs"

Create /lib/rumbl/info_sys/wolfram.ex

defmodule Rumbl.InfoSys.Wolfram do
  import SweetXml
  alias Rumbl.InfoSys.Result

  def start_link(query, query_ref, owner, limit) do 
    Task.start_link(__MODULE__, :fetch, [query, query_ref, owner, limit])
  end

  def fetch(query_str, query_ref, owner, _limit) do 
    query_str
    |> fetch_xml()
    |> xpath(~x"/queryresult/pod[contains(@title, 'Result') or
                                 contains(@title, 'Definitions')]
                            /subpod/plaintext/text()")
    |> send_results(query_ref, owner)
  end

  defp send_results(nil, query_ref, owner) do 
    send(owner, {:results, query_ref, []})
  end
  defp send_results(answer, query_ref, owner) do
    results = [%Result{backend: "wolfram", score: 95, text: to_string(answer)}]
    send(owner, {:results, query_ref, results})
  end

  defp fetch_xml(query_str) do 
    {:ok, {_, _, body}} = :httpc.request(
      String.to_char_list("http://api.wolframalpha.com/v2/query" <>
        "?appid=#{app_id()}" <>
        "&input=#{URI.encode(query_str)}&format=plaintext"))
    body
  end

  defp app_id, do: Application.get_env(:rumbl, :wolfram)[:app_id]
end

This module has no GenServer callbacks because the process is a task. A GenServer is a generic server that can both compute and store state, but sometimes we only need to store state, or only need to run a function.

An Agent is a simplified GenServer that stores state; a Task is a simple process that runs a function.

SweetXml is used to parse the XML, and Result is the struct for the results.

Task.start_link is how the Task is started.

fetch_xml uses :httpc, an HTTP client from Erlang's standard library (the :inets application), to make the HTTP request.

send_results(answer, query_ref, owner) sends the results back to the requester.

There are two send_results clauses: one for when there is no answer (nil) and one for when there is.

First test it with iex -S mix:

iex(4)> Rumbl.InfoSys.compute("what is elixir?")
[{#PID<0.592.0>, #Reference<0.3775272462.1982070785.26789>}]
iex(5)> flush()
:ok
iex(6)> flush()
:ok
iex(7)> flush()
{:results, #Reference<0.3775272462.1982070785.26789>,
 [%Rumbl.InfoSys.Result{backend: "wolfram", score: 95,
   text: "1 | noun | a sweet flavored liquid (usually containing a small amount of alcohol) used in compounding medicines to be taken by mouth in order to mask an unpleasant taste\n2 | noun | hypothetical substance that the alchemists believed to be capable of changing base metals into gold\n3 | noun | a substance believed to cure all ills",
   url: nil}]}
:ok

To make the service more robust, we need to:

  1. detect backend crashes, so we do not keep waiting for their results

  2. sort the results from the backends by score

  3. add a timeout mechanism


Monitoring Processes

Use Process.monitor to detect backend crashes while waiting for results. Once a monitor is set up, we receive a message when the monitored process dies.

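Testing (the spawned process has already exited, so the :DOWN message arrives immediately with reason :noproc):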

iex(1)> pid = spawn(fn -> :ok end)
#PID<0.261.0>
iex(2)> Process.monitor(pid)
#Reference<0.777943872.2254438401.3282>
iex(3)> flush()
{:DOWN, #Reference<0.777943872.2254438401.3282>, :process, #PID<0.261.0>,
 :noproc}
:ok

Modify /lib/rumbl/info_sys.ex

defmodule Rumbl.InfoSys do
  @backends [Rumbl.InfoSys.Wolfram]

  defmodule Result do
    defstruct score: 0, text: nil, url: nil, backend: nil
  end

  def start_link(backend, query, query_ref, owner, limit) do
    backend.start_link(query, query_ref, owner, limit)
  end

  def compute(query, opts \\ []) do
    limit = opts[:limit] || 10
    backends = opts[:backends] || @backends

    backends
    |> Enum.map(&spawn_query(&1, query, limit))
    |> await_results(opts)
    |> Enum.sort(&(&1.score >= &2.score))
    |> Enum.take(limit)
  end

  defp spawn_query(backend, query, limit) do
    query_ref = make_ref()
    opts = [backend, query, query_ref, self(), limit]
    {:ok, pid} = Supervisor.start_child(Rumbl.InfoSys.Supervisor, opts)
    monitor_ref = Process.monitor(pid)
    {pid, monitor_ref, query_ref}
  end

  defp await_results(children, _opts) do
    await_result(children, [], :infinity)
  end

  defp await_result([head|tail], acc, timeout) do
    {pid, monitor_ref, query_ref} = head

    receive do
      {:results, ^query_ref, results} ->
        Process.demonitor(monitor_ref, [:flush])
        await_result(tail, results ++ acc, timeout)
      {:DOWN, ^monitor_ref, :process, ^pid, _reason} ->
        await_result(tail, acc, timeout)
    end
  end

  defp await_result([], acc, _) do
    acc
  end
end

compute now waits for the results itself; when they arrive, it sorts them by score and returns the top ones.

spawn_query now also calls Process.monitor(pid).

await_result is a recursive function: each call handles one child and, if that child returned results, adds them to the accumulated list.

A valid result matches {:results, ^query_ref, results}; a crashed backend matches {:DOWN, ^monitor_ref, :process, ^pid, _reason} and is skipped.

Process.demonitor(monitor_ref, [:flush]) removes the monitor and also flushes any :DOWN message for it that is already in the mailbox.

Now compute handles the results automatically:

iex(1)> Rumbl.InfoSys.compute("what is the meaning of life?")
[%Rumbl.InfoSys.Result{backend: "wolfram", score: 95,
  text: "42\n(according to the book The Hitchhiker's Guide to the Galaxy, by Douglas Adams)",
  url: nil}]

Timeout

receive supports an after clause as a timeout mechanism:

receive do
  :this_will_never_arrive -> :ok
after
  1_000 -> :timedout
end

Modify /lib/rumbl/info_sys.ex so it waits for the backends for at most 5000 ms by default (overridable with the :timeout option):


defmodule Rumbl.InfoSys do
  @backends [Rumbl.InfoSys.Wolfram]

  defmodule Result do
    defstruct score: 0, text: nil, url: nil, backend: nil
  end

  def start_link(backend, query, query_ref, owner, limit) do
    backend.start_link(query, query_ref, owner, limit)
  end

  def compute(query, opts \\ []) do
    limit = opts[:limit] || 10
    backends = opts[:backends] || @backends

    backends
    |> Enum.map(&spawn_query(&1, query, limit))
    |> await_results(opts)
    |> Enum.sort(&(&1.score >= &2.score))
    |> Enum.take(limit)
  end

  defp spawn_query(backend, query, limit) do
    query_ref = make_ref()
    opts = [backend, query, query_ref, self(), limit]
    {:ok, pid} = Supervisor.start_child(Rumbl.InfoSys.Supervisor, opts)
    monitor_ref = Process.monitor(pid)
    {pid, monitor_ref, query_ref}
  end

  defp await_results(children, opts) do
    timeout = opts[:timeout] || 5000
    timer = Process.send_after(self(), :timedout, timeout) 
    results = await_result(children, [], :infinity)
    cleanup(timer)
    results
  end

  defp await_result([head|tail], acc, timeout) do
    {pid, monitor_ref, query_ref} = head

    receive do
      {:results, ^query_ref, results} ->
        Process.demonitor(monitor_ref, [:flush])
        await_result(tail, results ++ acc, timeout)
      {:DOWN, ^monitor_ref, :process, ^pid, _reason} ->
        await_result(tail, acc, timeout)
      :timedout -> 
        kill(pid, monitor_ref)
        await_result(tail, acc, 0)
    after
      timeout ->
        kill(pid, monitor_ref)
        await_result(tail, acc, 0)
    end
  end

  defp await_result([], acc, _) do
    acc
  end

  defp kill(pid, ref) do 
    Process.demonitor(ref, [:flush])
    Process.exit(pid, :kill)
  end

  defp cleanup(timer) do  
    :erlang.cancel_timer(timer)
    receive do
      :timedout -> :ok
    after
      0 -> :ok
    end
  end
end
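For comparison, the same "ask every backend, skip crashed ones, stop waiting at a shared deadline, sort by score, take the top results" flow can be sketched in Python with concurrent.futures. This is an analogy under stated assumptions: backends are hypothetical callables returning (score, text) tuples, and unlike Process.exit(pid, :kill), a Python thread that misses the deadline cannot be killed, so its result is just ignored:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def compute(query, backends, limit=10, timeout=5.0):
    """Query all backends concurrently and return the best results that arrive in time."""
    pool = ThreadPoolExecutor(max_workers=max(1, len(backends)))
    futures = [pool.submit(backend, query) for backend in backends]
    done, not_done = wait(futures, timeout=timeout)   # one shared deadline
    results = []
    for fut in done:
        if fut.exception() is None:    # a backend that raised is skipped (like :DOWN)
            results.extend(fut.result())
    for fut in not_done:
        fut.cancel()                   # late backends are ignored (like :timedout)
    pool.shutdown(wait=False)
    results.sort(key=lambda r: r[0], reverse=True)
    return results[:limit]


def wolfram(q):
    return [(95, "42")]


def broken(q):
    raise RuntimeError("boom!")


def slow(q):
    time.sleep(1.5)
    return [(99, "too late")]


def other(q):
    return [(10, "low score"), (50, "mid score")]


res = compute("what is the meaning of life?", [wolfram, broken, slow, other],
              limit=2, timeout=0.5)
print(res)   # [(95, '42'), (50, 'mid score')]
```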

Integrating OTP Services with Channels

Put the service we just built into VideoChannel.

Modify /web/channels/video_channel.ex


defmodule Rumbl.VideoChannel do
  use Rumbl.Web, :channel
  alias Rumbl.AnnotationView

  def join("videos:" <> video_id, params, socket) do
    last_seen_id = params["last_seen_id"] || 0
    video_id = String.to_integer(video_id)
    video = Repo.get!(Rumbl.Video, video_id)

    annotations = Repo.all(
      from a in assoc(video, :annotations),
      where: a.id > ^last_seen_id,
      order_by: [asc: a.at, asc: a.id],
      limit: 200,
      preload: [:user]
    )
    resp = %{annotations: Phoenix.View.render_many(annotations, AnnotationView,
      "annotation.json")}

    {:ok, resp, assign(socket, :video_id, video.id)}
  end

  def handle_in(event, params, socket) do
    user = Repo.get(Rumbl.User, socket.assigns.user_id)
    handle_in(event, params, user, socket)
  end

  def handle_in("new_annotation", params, user, socket) do
    changeset =
      user
      |> build_assoc(:annotations, video_id: socket.assigns.video_id)
      |> Rumbl.Annotation.changeset(params)

    case Repo.insert(changeset) do
      {:ok, ann} ->
        broadcast_annotation(socket, ann)
        Task.start_link(fn -> compute_additional_info(ann, socket) end)
        {:reply, :ok, socket}

      {:error, changeset} ->
        {:reply, {:error, %{errors: changeset}}, socket}
    end
  end

  defp broadcast_annotation(socket, annotation) do
    annotation = Repo.preload(annotation, :user)
    rendered_ann = Phoenix.View.render(AnnotationView, "annotation.json", %{
      annotation: annotation
    })
    broadcast! socket, "new_annotation", rendered_ann
  end

  defp compute_additional_info(ann, socket) do
    for result <- Rumbl.InfoSys.compute(ann.body, limit: 1, timeout: 10_000) do
      attrs = %{url: result.url, body: result.text, at: ann.at}
      info_changeset =
        Repo.get_by!(Rumbl.User, username: result.backend)
        |> build_assoc(:annotations, video_id: ann.video_id)
        |> Rumbl.Annotation.changeset(attrs)

      case Repo.insert(info_changeset) do
        {:ok, info_ann} -> broadcast_annotation(socket, info_ann)
        {:error, _changeset} -> :ignore
      end
    end
  end
end

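Create /rumbl/priv/repo/backend_seeds.exs, which inserts the wolfram user that compute_additional_info looks up by backend name, and run it: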

alias Rumbl.Repo
alias Rumbl.User

Repo.insert!(%User{name: "Wolfram", username: "wolfram"})
$ mix run priv/repo/backend_seeds.exs
Compiling 19 files (.ex)
warning: String.to_char_list/1 is deprecated, use String.to_charlist/1
  lib/rumbl/info_sys/wolfram.ex:29

warning: function authenticate/2 is unused
  web/controllers/user_controller.ex:40

[debug] QUERY OK db=0.2ms queue=12.1ms
begin []
[debug] QUERY OK db=0.9ms
INSERT INTO `users` (`name`,`username`,`inserted_at`,`updated_at`) VALUES (?,?,?,?) ["Wolfram", "wolfram", {{2017, 9, 7}, {3, 55, 52, 165910}}, {{2017, 9, 7}, {3, 55, 52, 168676}}]
[debug] QUERY OK db=1.8ms
commit []

Inspecting with Observer

Erlang has a tool called Observer; start it from inside iex like this:

$ iex -S mix
iex(1)> :observer.start

# Starting iex this way also lets you inspect the Phoenix part
$ iex -S mix phoenix.server

The supervision tree is a good tool for seeing how an application could be split up. We now want to split the application into two parts, :rumbl and :info_sys.

We use an umbrella project for this.

Using Umbrellas

Each umbrella project directory contains:

  1. shared configuration of the project
  2. dependencies for the project
  3. an apps directory with child applications

Create a new umbrella project:

$ mix new rumbrella --umbrella
* creating .gitignore
* creating README.md
* creating mix.exs
* creating apps
* creating config
* creating config/config.exs

Your umbrella project was created successfully.
Inside your project, you will find an apps/ directory
where you can create and host many apps:

    cd rumbrella
    cd apps
    mix new my_app

Commands like "mix compile" and "mix test" when executed
in the umbrella project root will automatically run
for each application in the apps/ directory.

Move InfoSys under rumbrella:

$ cd rumbrella/apps

$ mix new info_sys --sup
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/info_sys.ex
* creating lib/info_sys/application.ex
* creating test
* creating test/test_helper.exs
* creating test/info_sys_test.exs

Your Mix project was created successfully.
You can use "mix" to compile it, test it, and more:

    cd info_sys
    mix test

Run "mix help" for more commands.

Move InfoSys from rumbl to info_sys

  1. In rumbl/lib/rumbl/info_sys.ex, rename Rumbl.InfoSys to InfoSys, copy it to info_sys/lib/info_sys.ex, and add at the top:

    use Application

    def start(_type, _args) do
      InfoSys.Supervisor.start_link()
    end

    @backends [InfoSys.Wolfram]
  2. In /rumbl/lib/rumbl/info_sys/supervisor.ex, rename Rumbl.InfoSys.Supervisor to InfoSys.Supervisor and copy it to info_sys/lib/info_sys/supervisor.ex

  3. In /rumbl/lib/rumbl/info_sys/wolfram.ex, rename the Rumbl.InfoSys.Wolfram module to InfoSys.Wolfram and copy it to info_sys/lib/info_sys/wolfram.ex

  4. Change every "Rumbl.InfoSys" to "InfoSys"

  5. In lib/info_sys/wolfram.ex, change :rumbl to :info_sys

    defp app_id, do: Application.get_env(:info_sys, :wolfram)[:app_id]
  6. Modify apps/info_sys/mix.exs

    def deps do
        [
        {:sweet_xml, "~> 0.5.0"}
        ]
    end
  7. Run $ mix deps.get in the rumbrella directory


Turn rumbl into an umbrella child

  1. Move the whole rumbl directory into apps

  2. Modify rumbrella/apps/rumbl/mix.exs and add

      build_path: "../../_build",
      config_path: "../../config/config.exs",
      deps_path: "../../deps",
      lockfile: "../../mix.lock",
  3. Modify application and add :info_sys

    def application do
        [mod: {Rumbl, []},
         applications: [:phoenix, :phoenix_pubsub, :phoenix_html, :cowboy, :logger, :gettext,
                        :phoenix_ecto, :mariaex, :comeonin, :info_sys]]
      end
  4. Update the dependencies: remove :sweet_xml and add :info_sys

    {:info_sys, in_umbrella: true},
  5. In rumbl/lib/rumbl.ex, remove the Rumbl.InfoSys supervisor from the children

        children = [
          supervisor(Rumbl.Endpoint, []),
          supervisor(Rumbl.Repo, []),
        ]
  6. In /web/channels/video_channel.ex, change Rumbl.InfoSys to InfoSys in compute_additional_info

  7. Modify config/dev.secret.exs to use :info_sys

    use Mix.Config
    config :info_sys, :wolfram, app_id: "LP93J3-XXXXXX"
  8. Modify rumbl/package.json

      "dependencies": {
        "phoenix": "file:../../deps/phoenix",
        "phoenix_html": "file:../../deps/phoenix_html"
      },

In the rumbrella directory:

$ mix deps.get
$ mix test

References

Programming Phoenix