2018/12/24

python 函數的可變參數 *args 和 **kwargs

一般函數的參數個數都是固定的,但如果遇到參數數量不固定的狀況,通常會將某些參數填上預設值,在 python function 可以支援兩種可變數量的參數 *args**kwargs

以下例子中的 fun 雖然定義了三個參數,但是後面兩個填上預設值,呼叫該函數時,就可以忽略 b 與 c,直接使用預設值。

def fun(a,b=2,c=3):
    print("a={}, b={}, c={}".format(a,b,c))

fun(1)

fun(1,22,33)

執行結果

a=1, b=2, c=3
a=1, b=22, c=33

*args是可變的positional arguments列表,**kwargs是可變的keyword arguments列表。兩個可以同時使用,但在使用時,*args必須在**kwargs的前面,因為positional arguments,有位置順序的對應,必須位於keyword arguments之前。

以下的例子,是定義 fun 有一個必要參數 a,以及可變的 *args

def fun(a, *args):
    print("a={}".format(a))
    for arg in args:
        print('Optional argument: {}'.format( arg ) )

fun(1,22,33)

執行結果為

a=1
Optional argument: 22
Optional argument: 33

如果同時加上 **kwargs

def fun(a, *args, **kwargs):
    print("a={}".format(a))
    for arg in args:
        print('Optional argument: {}'.format( arg ) )

    for k, v in kwargs.items():
        print('Optional kwargs argument key: {} value {}'.format(k, v))

fun(1,22,33, k1=44, k2=55)

執行結果為

a=1
Optional argument: 22
Optional argument: 33
Optional kwargs argument key: k1 value 44
Optional kwargs argument key: k2 value 55

除了在定義函數的部分,在呼叫函數時,也可以使用 *args**kwargs

print("")
args = [1,2,3,4]
fun(*args)

print("")
kwargs = {'k1':10, 'k2':11}
fun(1, **kwargs)

print("")
fun(1, *args, **kwargs)

執行結果為

a=1
Optional argument: 2
Optional argument: 3
Optional argument: 4

a=1
Optional kwargs argument key: k1 value 10
Optional kwargs argument key: k2 value 11

a=1
Optional argument: 1
Optional argument: 2
Optional argument: 3
Optional argument: 4
Optional kwargs argument key: k1 value 10
Optional kwargs argument key: k2 value 11

References

Python定義的函數(或調用)中參數args 和*kwargs的用法

[Python] 令新手驚呆的 **kwargs

PYTHON中如何使用ARGS和*KWARGS

理解 Python 中的 *args 和 **kwargs

2018/12/10

python 如何處理 json

json: JavaScript Object Notation 是 javascript 的子集合,是一種輕量級的資料交換格式,比 XML 簡單,也容易閱讀及編寫,處理過程分為序列化及反序列化兩個部分,分別是 Marshalling/Encoder 及 Unmarshalling/Decoder。

Marshalling/Encoder 就是將 python 的資料物件轉換為 json 字串,使用 json.dumps

Unmarshalling/Decoder 是將 json 字串轉換為 python 物件,使用 json.loads

使用 dumps, loads 的簡單範例

這是 python 的 dict 資料型別,因為 dict 沒有字串的表示方式,透過 repr 將 dict 轉換為可以列印的資料

json_obj = {
            'name' : 'Apple',
            'shares' : 100,
            'price' : 542.1
        }

logging.info( "json_obj type = "+str(type(json_obj))+ ", data =" + repr(json_obj) )

dict 可以透過 json.dumps 轉換為 json 字串

json_string = json.dumps(json_obj)

logging.info( "json_string type="+ str(type(json_string))+ ", data=" + json_string )

再透過 json.loads 將 json 字串轉換為 dict

json_object = json.loads(json_string)

        logging.info( "json_object type="+str(type(json_object))+", data="+repr(json_object) )

測試結果

json_obj type = <class 'dict'>, data ={'name': 'Apple', 'shares': 100, 'price': 542.1}

json_string type=<class 'str'>, data={"name": "Apple", "shares": 100, "price": 542.1}

json_object type=<class 'dict'>, data={'name': 'Apple', 'shares': 100, 'price': 542.1}

由 python 資料轉換為 json 的對照表為

python json
dict object
list, tuple array
str, unicode string
int, long, float number
True true
False false
None null

由 json 轉換為 python 資料的對照表為

json python
object dict
array list
string unicode
number(int) int,long
number(real) float
true True
false False
null None

pprint 及 dumps 的 indent, sort_keys 參數

如果 json 的字串比較長,列印到 console 時,可能會比較難查閱需要的資料,這時候有兩種方式可以處理

使用 pprint 可以直接用 pprint(json_obj)

from pprint imort pprint

json_obj = {
            'name' : 'Apple',
            'shares' : 100,
            'price' : 542.1
        }

logging.info( "json_obj type = "+str(type(json_obj))+ ", data =" + repr(json_obj) )

pprint(json_obj)

執行結果為

{'name': 'Apple', 'price': 542.1, 'shares': 100}

在 dumps 加上 indent 參數

json_string = json.dumps(json_obj, indent=4)

執行結果為

json_string type=<class 'str'>, data={
    "name": "Apple",
    "shares": 100,
    "price": 542.1
}

在 dumps 加上 sort_keys 可在轉換 json 時,同時進行 key 的排序

json.dumps({"c": 0, "b": 0, "a": 0}, sort_keys=True)

執行結果

{"a": 0, "b": 0, "c": 0}

loads 的 object_pairs_hookobject_hook 參數

通常 json 會轉換為 python 的 dict 及 list,可使用 object_pairs_hook,將 json 轉換為 OrderedDict

s = '{"name": "ACME", "shares": 50, "price": 490.1}'
        from collections import OrderedDict
data = json.loads(s, object_pairs_hook=OrderedDict)

logging.info( "json_object type="+str(type(data))+", data="+repr(data) )

執行結果

json_object type=<class 'collections.OrderedDict'>, data=OrderedDict([('name', 'ACME'), ('shares', 50), ('price', 490.1)])

也可以用 object_hook 將 json 轉換為 python 物件

class JSONObject:
    def __init__(self, d):
        self.__dict__ = d

data2 = json.loads(s, object_hook=JSONObject)
logging.info( "json_object type="+str(type(data2))+", name="+data2.name+", shares="+str(data2.shares)+", price="+str(data2.price) )

執行結果

json_object type=<class '__main__.JSONObject'>, name=ACME, shares=50, price=490.1

skipkeys

在 dumps 如果遇到 key 不是字串時,會發生 TypeError,可使用 skipkeys 略過無法處理的 key

data =  { 'b' : 789 , 'c' : 456 ,( 1 , 2 ): 123 }
print( json.dumps(data,skipkeys = True) )

執行結果

{"b": 789, "c": 456}

自訂 python 物件的 轉換函數

如果是自訂的 python 類別,通常是無法直接透過 dumps 序列化,會發生 error

class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y
p = Point(2, 3)

print(p)

json.dumps(p)

執行結果

TypeError: Object of type 'Point' is not JSON serializable

解決方式:

首先定義兩個獨立的 util function,object2dict 是將物件轉換為 dict,dict2object 則是將 dict 轉換為 object,在 dict2object 裡面會使用 dict 的 __module____class__ 這兩個資料,用來正確找到 class 的定義

def object2dict(obj):
    d = {
        '__class__': obj.__class__.__name__,
        '__module__': obj.__module__
    }
    # d.update( vars(obj) )
    d.update(obj.__dict__)
    return d


def dict2object(d):
    if '__class__' in d:
        module_name = d.pop( '__module__' )
        class_name = d.pop( '__class__' )
        logging.debug("module_name="+str(module_name)+", class_name="+class_name)

        # # from A import B
        import importlib
        objmodule = importlib.import_module(module_name)
        cls = getattr(objmodule, class_name)

        # objmodule = __import__(module_name)
        # cls = getattr(objmodule, class_name)

        # # use class directly
        # cls = classes[class_name]
        # # Make instance without calling __init__
        obj = cls.__new__(cls)
        for key, value in d.items():
            setattr(obj, key, value)
        return obj
    else:
        inst = d
    return inst

如果另外有一個類別定義在 test.point.py 裡面

class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __repr__( self ):
        return 'Point Object x : %d , y : %d' % ( self.x, self.y)

透過 object2dict 及 dict2object 就可以協助進行物件與 json 的轉換

from test import point
p = point.Point(2, 3)
logging.info(p)

json_str = json.dumps(p, default=object2dict)
logging.info(json_str)

o = json.loads(json_str, object_hook=dict2object)
logging.info( "json_object type="+str(type(o))+", data="+repr(o) )

執行結果

{"__class__": "Point", "__module__": "test.point", "x": 2, "y": 3}

module_name=test.point, class_name=Point
2018-09-03 16:01:00,274 INFO {95322,MainProcess} 

json_object type=<class 'test.point.Point'>, data=Point Object x : 2 , y : 3

有時會遇到 list of Point,這時需要修改 object2dict 讓他可以處理 obj 是 list 的狀況

def object2dict(obj):
    d = {}
    if isinstance(obj, list):
        return json.dumps(obj, default=object2dict)
    else:
        # d = {
        #   '__class__': obj.__class__.__name__,
        #   '__module__': obj.__module__
        # }
        d['__class__'] = obj.__class__.__name__
        d['__module__'] = obj.__module__
        # d.update( vars(obj) )
        d.update(obj.__dict__)
        return d

References

20.2. json — JSON encoder and decoder

Python處理JSON

6.2 讀寫JSON數據

Json概述以及python對json的相關操作

2018/12/03

一樣都是 PM,Project Manager 跟 Product Manager 有什麼不一樣

英文縮寫 PM 有兩種,一個是比較常聽到的 Project Manager,一個是 Product Manager,專業的 Product Manager 比較少見,大部分都是專案管理。另外還有人也將 Product Marketing 列入討論,不過這部分變成是 Product Manager 與 Marketing 的差異,管理跟行銷本質上就不一樣,這部分比較容易理解。

先看看不同的網路文章的意見:

一個 Project Manager 最大的任務就是將 Project 如期、如規格「正確」的執行,而這過程中,我想分為兩種層面的技能:技術與溝通。

技術層面上,PM 就是把業主/老闆心中想要的功能「翻譯」成工程師可以開發的規格、確認功能可行性、與業主/老闆確認規格、評估時間、排時程、確認優先順序、掌握開發進度、測試、驗收。溝通層面上,要去對專案所有關係人,進行不同協調,主要就是業主跟工程師之間的橋樑。

Product Manager 是以「使用者」出發、產品是否解決使用者的痛點、是否真正滿足使用者需求。你要討好的對象不是老闆、不是團隊成員、不是投資者、不是合作廠商,而是「使用者」。產品,就是一個隨時都有真實 user 在使用的東西。

所以做一個 Product Manager、或者待在一個做產品的團隊都可以深刻的感覺到,壞的情況是,當產品出問題或斷線,客服電話馬上進來、客服信箱馬上開始被罵;好的情況是,你也會收到使用者溫馨的回饋與感謝。

項目是局部優化工作。我們讓一組人為客戶創建(或改進已有的)功能,然後交給一個不知道為什麼變更、也不知道這個變更如何實現的團隊。變革的成本很高,需要花很長時間才能交到客戶手中。

圍繞產品(或服務)構建的組織有優勢,因為他們的組織結構可以監督整個工作過程,就是說,從想法到客戶,並基於客戶的反饋或反應想出新點子,形成閉環。我這樣說,是因為團隊為自己負責,對所提供的方案、交付質量、客戶體驗和滿意度有主人翁精神。進行看上去也許很小的變革,但是要允許想法快速地傳遞到客戶手中。

根據美國產品管理協會PDMA的定義,所謂的「產品管理(Product Management)」是指:在新產品開發的過程當中,通過不斷監控和調整市場組合的基本要素(其中包括產品及自身特色、溝通策略、配銷通路和價格),隨時確保產品或者服務能充分滿足客戶需求(Ensuring over time that a product or service profitably meets the needs of customers by continually monitoring and modifying the elements of the marketing mix, including: the product and its features, the communications strategy, distribution channels and price.)。而產品經理就是針對上述特定產品活動肩負所有責任的人。

Project Manager(專案經理):很重要,但通常是在食物鏈的底層 Product Marketing(產品行銷):很重要,但對於產品通常沒有實權 Product Manager(產品經理):通常在食物鏈頂層,但格局可大可小

行銷思維:盡可能讓對的受眾,精準接收到我想傳達的資訊,並做出行動。 專案管理思維:資源與風險的管理與控制,多方平衡。 產品思維:了解顧客的痛點,提出解決方法。 營運思維:流程觀與協作思維,讓各個部門順暢協作。

企業在新產品開發過程中,經常遭遇許多的瓶頸與風險,使得開發專案經常發生延遲、失敗的問題。歸咎其因,除了缺乏一套完整的新產品開發流程做好專案管理掌控之外,更缺少一位可以來整合各部門資源,掌控開發專案的品質、成本與時程,以增進產品競爭力的產品經理。

根據Wheelwright and Clark(1992)在《Creating Project Plans to Focus Product Development》的研究報告指出:企業的新產品開發專案分為以下四種類型:

1.衍生產品專案(Derivative Projects)

局部創新與改進的產品專案,包括對現有產品改良以提升功能、降低成本,或為滿足不同區隔市場客戶的需求而改變功能外型。開發過程的風險較低,專案時程較短,所需資源也比較有限。

2.突破性產品專案(Breakthrough Projects)

突破性產品專案與企業當前主流產品開發專案有很大差異,資源投入的需求量高,開發風險與不確定性很高,但短期間很難產生成果與利潤,因此開發過程中遭遇的阻力就比較大。管理的複雜度遠要高過於其他三種專案類型。

3.平台產品專案(Platform Projects)

平台產品的改變幅度遠高過於衍生產品,但卻不像突破性產品使用從未發展過的新技術與新材料。平台產品比較著重於系統上的創新,衍生產品或許只改變產品中的一項特質(如:成本、品質、產品表現等),而平台產品則針對全面的產品問題進行改善。

4.研究發展專案(R&D Projects)

研究發展專案不屬於商業化應用的範疇,但因為它是上述三種產品開發專案的先驅活動,而且也會佔用相當比例的資源。


Product 與 Project 的差異在於生命週期 以及 時程,任何 Product 產品/商品,其生命週期會比 Project 長,因為身為 Product,其概念就是企業內固定在銷售的一個商品項目,而 Product 開發本身,可以切割為多個 project。

在 美國專案管理學會 Project Management Institute PMI 的定義中,專案是指一項暫時性的任務、配置,以開創某獨特性的產品或服務。專案有起迄時間。專案團隊並不是一直存在的,而是為了某一特定目的或產品組成的。

『專案管理』是將管理知識、技術、工具、方法綜合運用到任何一個專案之上,使專案得以符合要求。 』專案經過啟動、規劃、執行、監控、結案這五大流程,而專案經理人則運用專案管理的九大知識對專案進行管理,讓專案可以如期、符合品質要求並在成本內完成。

簡單來說,Product 每一次版本的功能與規格制定,到開發最後測試完成,這樣一個週期,完成了一個版本,則本產品的本次版本的專案就已經結束。

換句話說,專案是一時的,產品則會存在很久。

因為生命週期不同,產品管理者需要承擔的責任會比較久,至於專案管理的部分,基本上只需要在專案結束時,能夠順利結案就可以了。專案管理者要承擔的責任,就是產品本身所有相關的事情,包含產品要解決的問題,解決的方法,開發的過程,版本功能的分割,除錯,到後續的問題處理,還有維護,甚至是產品的行銷,所有的面向都要承擔責任。

至於重要性,個人認為是都很重要,遇到的問題不同,也各有各的難處。更重要的是要找到正確的負責人,制度問題可以靠人去調整及解決,但是人如果錯了,就很難彌補問題。

2018/11/26

如何使用 python logging

在 python 要列印除錯資訊,最簡單的方式是使用 print,但如果要區分 log 的等級,或是將不同的 log 資訊放到不同的檔案,這時就需要使用 logging module,但 loggin module 的設定會比較複雜一些,以下紀錄使用 logging module 的方式。

在 project 最常被呼叫及使用的 util.py 裡面,初始化 logging module,如果只需要簡單的 logging 設定,可直接在 util 裡面初始化 logging,複雜的設定可以用設定檔的方式處理。

直接在程式裡面初始化 logging 的範例

使用 TimedRotatingFileHandler,搭配 when='midnight',在每天午夜更換log 檔案,並將舊的 log 檔加上日期 postfix,backupCount 是控制保留幾個舊的 log file。formatter 是設定每一行 log 的 pattern。最後將 file handler 以及 console handler 新增到 rootLogger 裡面。

rootLogger = logging.getLogger()
# 用 rootLogger 的 handler 數量判斷是否已經有初始化 logging print( "len(logger.handlers)="+str(len(logger.handlers)) )

if len(rootLogger.handlers) == 0:
    from logging.handlers import TimedRotatingFileHandler
    logger.setLevel(logging.DEBUG)
    
    log_file_path = os.path.join('.', logs_directory+'/'+'project.log').replace('\\', '/')
    fh = TimedRotatingFileHandler(log_file_path,when='midnight',interval=1,backupCount=30)
    fh.suffix = "%Y-%m-%d"

    datefmt = '%Y-%m-%d %H:%M:%S'
    # datefmt = '%Y-%m-%d'
    # format_str = '%(asctime)s %(levelname)s %(message)s '
    format_str = '%(asctime)s %(levelname)s %(module)s.%(funcName)s %(lineno)d: %(message)s'
    # formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    formatter = logging.Formatter(format_str, datefmt)
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    # 定義 handler 輸出 sys.stderr
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(formatter)

    logger.addHandler(console)

使用時只需要 import logging 預設就會取得 root logger

import logging

logging.info("test")

logging 設定檔

如果需要複雜一些的 logging,例如某個 module 的 log 放到某些特別的檔案中,區分 module 讓不同 module 的 log 不會全部混在一個 log file 裡面。可利用設定檔的方式處理。

logging.cfg 設定檔,定義了四個 logger,root logger 有兩個 handler,module logger 有兩個 handler,其中 console handler 是一樣的,不同的是對應到不同 log 檔案的 file handler,至於 log 的 pattern 樣式跟上面的範例設定一樣。

[loggers]
keys=root,consoleLog,projectFileHandler,moduleLog

[formatters]
keys=consoleFormatter,fileFormatter

[handlers]
keys=consoleHandler,projectFileHandler,moduleFileHandler

[logger_root]
level=DEBUG
handlers=projectFileHandler,consoleHandler

[logger_moduleLog]
level=DEBUG
qualname=moduleLog
handlers=moduleFileHandler,consoleHandler
propagate=0

[logger_consoleLog]
level=NOTSET
handlers=consoleHandler
qualname=consoleLog
propagate=0

[formatter_consoleFormatter]
format=%(asctime)s %(levelname)s {%(process)d,%(processName)s} {%(filename)s} [%(funcName)s] %(lineno)d - %(message)s
datefmt=

[formatter_fileFormatter]
format=%(asctime)s %(levelname)s {%(process)d,%(processName)s} {%(filename)s} {%(module)s} [%(funcName)s] %(lineno)d - %(message)s
datefmt=


[handler_consoleHandler]
class=StreamHandler
level=NOTSET
formatter=consoleFormatter
args=(sys.stdout,)


[handler_projectFileHandler]
class=handlers.TimedRotatingFileHandler
level=NOTSET
formatter=fileFormatter
args=('%(projectlogpath)s','midnight',1,30)

[handler_moduleFileHandler]
class=handlers.TimedRotatingFileHandler
level=NOTSET
formatter=fileFormatter
args=('%(modulelogpath)s','midnight',1,30)

在 util.py 初始化 logging

# logging
from logging import config
import logging

rootLogger = logging.getLogger()
# print( "len(logger.handlers)="+str(len(logger.handlers)) )

if len(rootLogger.handlers) == 0:
    logging_cfg_file_path = os.path.join('.', 'conf/logging.cfg')
    # print("logging_cfg_file_path="+logging_cfg_file_path)

    # create logs directory
    logs_directory = "logs"
    if not os.path.exists(logs_directory):
        try:
            os.makedirs(logs_directory)
        except OSError:
            if not os.path.isdir(logs_directory):
                raise

    project_log_file = 'project.log'
    module_log_file = 'module.log'

    project_log_path = os.path.join('.', logs_directory+'/'+ project_log_file).replace('\\', '/')
    module_log_path = os.path.join('.', logs_directory+'/'+ module_log_file).replace('\\', '/')

    logging.config.fileConfig(logging_cfg_file_path,
                            defaults={'projectlogpath': project_log_path,
                                    'modulelogpath': module_log_path},
                            disable_existing_loggers=False)

在使用時,如果直接用 logging,會對應使用到 root logger

import logging

logging.info("test")

但如果是透過 logging.getLogger(name="moduleLog") 可取得 moduleLog 的 logger,這時候的 log 會寫入到 module 的 log file

import logging

logger = logging.getLogger(name="moduleLog")
logger.info("test")

References

[Python] logging 教學

[Python] logging 教學

2018/11/19

如何使用 SQLAlchemy in python

SQLAlchemy是Python的一款開源軟體,提供 SQL 工具包及物件關係對映(ORM)的功能,以 MIT 授權。SQLAlchemy 的設計概念比較接近 Java 的 Hibernate。

另一個常用的 ORM 套件為 Django ORM,他的設計概念跟 SQLAlchemy 不同,是使用 Active Record 的方式。

安裝套件

因我們是以 MariaDB 測試,除了要安裝 SQLAlchemy,還要安裝 mysql 的 driver,我們是用 python 的 driver: PyMySQL 進行測試

使用 python 3 的版本,是用 pip3 的指令,如果是 python 2 是用 pip。

sudo pip3 install SQLAlchemy
sudo pip3 install PyMySQL

DBEngine.py

以 Table: status 為例,在 DBEngine 中,以 create_engine 產生 SQLAlchemy 最基本的 engine,再由 engine 產生 session_maker 備用,待會要在 DAO 裡面使用。

後面的部分,用 engine.dialect.has_table 判斷 table 是否存在,如果不存在,就建立該 table

另外注意 updatedate 是在每次更新 record 時,要更新時間,createdate 則是在記錄產生 record 的時間。

from sqlalchemy import *
from sqlalchemy.orm import sessionmaker

db_user = 'root'
db_password = 'password'
db_host = 'localhost'
db_port = 3306
db_name = 'testdb'

engine = create_engine('mysql+pymysql://'+db_user+':'+db_password+'@'+db_host+':'+str(db_port)+'/'+db_name, encoding="utf8", echo=False, pool_pre_ping=True, pool_recycle=3600)
# echo: 是否顯示SQL的指令與相關訊息

DBSession = sessionmaker( bind=engine )

if not engine.dialect.has_table(engine, "status"):
    metadata = MetaData(engine)

    radiostatus_table = Table('status', metadata,
                              Column('rseq', Integer, primary_key=True, autoincrement=True),
                              Column('statuscode', Integer, nullable=False),
                              Column('updatedate', TIMESTAMP, default=func.now(), onupdate=func.now()),
                              Column('createdate', TIMESTAMP, default=func.now())
                              )

    metadata.create_all(engine)

Data Value Object

Base = declarative_base() 建立 Status 物件,作為 ORM 的物件定義,在該物件中,分別定義剛剛產生的 table: status的欄位對應。

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Status(Base):
    __tablename__='status'

    rseq = Column(Integer, primary_key=True, autoincrement=True)
    statuscode = Column(Integer, nullable=False)

    updatedate = Column(TIMESTAMP, default=func.now(), onupdate=func.now)
    createdate = Column(TIMESTAMP, default=func.now())

    def __init__(self):
        pass

    def __init__(self, rseq, statuscode, updatedate, createdate):
        self.rseq = rseq
        self.statuscode = statuscode
        self.updatedate = updatedate
        self.createdate = createdate

    @classmethod
    def to_string(cls, vo):
        result = ''

        result += "(rseq, statuscode, updatedate, createdate)=({rseq}, {statuscode}, {updatedate}, {createdate})\n".format(rseq=str(vo.rseq), statuscode=str(vo.statuscode), updatedate=str(vo.updatedate), createdate=str(vo.createdate))

        return result

Data Access Object

最簡單的是 create data 的 function,首先透過 DBSession() 產生 database session,然後在該 function 中,將要處理的工作做完,完成時要呼叫 commit,發生 error 時要呼叫 rollback,在結束前,以 close 回收 db session。

import logging
from DBEngine import *
from sqlalchemy import *
from sqlalchemy.orm import *

class StatusDao():

    def __init__(self):
        pass
    
    def create(self, vo):
        session = DBSession()
        try:
            logging.debug("vo="+str(vo))
            session.add(vo)
            session.commit()
        except Exception as e:
            logging.error(str(e))
            session.rollback()
            raise
        finally:
            session.close()

這是 create_or_update 的 function,如果檢查該 rseq 的資料不存在,再 create record ,否則就以 update 方式更新該 record

    def create_or_update(self, vo):
        session = DBSession()
        try:
            oldvo = session.query(Status).filter_by(rseq=vo.rseq).first()

            if oldvo:
                oldvo.statuscode = vo.statuscode
                oldvo.updatedate = datetime.datetime.now()

                logging.debug("updating "+str(RadioStatus.to_string(oldvo)))
            else:
                logging.debug("creating "+str(RadioStatus.to_string(vo)) )
                session.add(vo)

            session.commit()

            newvo = session.query(Status).filter_by(rseq=vo.rseq).first()

            return newvo
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

這是將 status 裡面所有資料都刪除的 function

    def get_all(self):
        session = DBSession()
        try:
            radiostatus_list = session.query(Status).all()
            return radiostatus_list
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

取得某個 record 的資料

    def get_by_rseq(self, rseq):
        session = DBSession()
        try:
            vo = session.query(Status).filter_by(rseq=rseq).first()
            return vo
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

取得所有 record 的資料

    def get_all(self):
        session = DBSession()
        try:
            status_list = session.query(Status).all()
            return status_list
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

使用 DAO

產生 dao 物件後,再直接呼叫 function 即可

dao = StatusDao()

status = Status(None, 0, None, None)
vo = dao.create_or_update(status)

dao.delete_all()

這裡的 DAO 使用的 database session 作用範圍,並沒有超過 function 的呼叫範圍,因此這邊的 DAO 比較像是 business method 的功能,可以將一次 database task 要處理的所有工作,放到同一個 DAO function 裡面,在該 function 結束時,再 close db session,這樣的做法,對於簡單的 project 來說,比較容易管理 DAO 及 db session,也比較不會發生忘記 close db session 的問題。

References

SQLAlchemy wiki

Flask零基础到项目实战(四)SQLAlchemy数据库(二)

Column Insert/Update Defaults

Python SQLAlchemy ORM - 1

Connection Pooling

給Django用戶的SQLAlchemy介紹

2018/11/12

Fragile Base Class

這是物件導向程式語言中,類別繼承所造成的問題,當有類別被其他類別繼承時,常會遇到一些 method 被子類別覆寫 override 的狀況,但覆寫 method 後,卻可能會因為不了解父類別實作的細節,導致程式出錯。

以下是實際的例子

Super 提供兩個 method,但 inc1 會在method 裡面呼叫 inc2,Sub 繼承了 Super,但並不知道 inc1 的實作內容,在 Sub 裡面,覆寫了 inc2,將 inc2 改成呼叫 inc1,因此在使用 Sub 的 inc2 時,會一直重複呼叫函數,最後導致發生 java.lang.StackOverflowError

class Super {

  private int counter = 0;

  void inc1() {
    inc2();
  }

  void inc2() {
    counter++;
  }

}

class Sub extends Super {

  @Override
  void inc2() {
    inc1();
  }

  public static void main(String[] args) {
    Sub sub = new Sub();
    sub.inc2();
  }
}
$ java Sub
Exception in thread "main" java.lang.StackOverflowError
    at Super.inc1(Super.java:6)
    at Sub.inc2(Super.java:21)

這跟上面的例子類似,一樣是會發生重複呼叫函數的問題,最後導致發生 java.lang.StackOverflowError

class Base{
    protected int x;
    protected void m(){
        x++;
    }

    protected void n(){
        x++;      // <- defect
        m();
    }

}

class SubBase extends Base{
    protected void m(){
        n();
    }
    public static void main(String[] args) {
        SubBase sub = new SubBase();
        sub.m();
    }
}
$ java SubBase
Exception in thread "main" java.lang.StackOverflowError
    at Base.n(Base.java:9)
    at SubBase.m(Base.java:16)
    at Base.n(Base.java:9)

Java 提供一種解決方式,就是將無法被覆寫的 method 加上 final。

class Super {

  private int counter = 0;

  void inc1() {
    inc2();
  }

  final void inc2() {
    counter++;
  }
}

class Sub extends Super {

  @Override
  void inc2() {
    inc1();
  }
  public static void main(String[] args) {
    Sub sub = new Sub();
    sub.inc2();
  }
}

在編譯時,就會因為 final 的關係,造成編譯錯誤。

$ javac Super.java
Super.java:20: error: inc2() in Sub cannot override inc2() in Super
  void inc2() {
       ^
  overridden method is final
1 error

Kotlin 提供更積極的解決方案,所有 method 在不加上任何 modifier 的狀況下,預設都是有 final 特性的,如果可以讓子類別覆寫的 method,必須要加上 open 這個 modifier。

這樣的調整,可避免 method 任意被覆寫的問題,但相對的,programmer 要承擔更多責任,判斷什麼時候該加上 open,這有時候會造成一些困擾,就是不知道什麼時候要加上 open,就變成不寫 open。

open class Super {
    open fun v() {}
    fun nv() {}
}
class Sub() : Super() {
    override fun v() {}
}

最後只能說,沒有兩全其美的解決方案,就看程式語言的設計者,認定哪一種想法比較重要。

References

Fragile base class wiki

What is the fragile base class problem?

2018/11/05

Phoenix_9_OTP

Managing State with Processes

Functional programs are stateless,改用 Process 保存 state

新增 /rumbl/ib/rumbl/counter.ex

defmodule Rumbl.Counter do

  def inc(pid), do: send(pid, :inc)

  def dec(pid), do: send(pid, :dec)

  def val(pid, timeout \\ 5000) do 
    ref = make_ref()
    send(pid, {:val, self(), ref})
    receive do
      {^ref, val} -> val
    after timeout -> exit(:timeout)
    end
  end

  def start_link(initial_val) do 
    {:ok, spawn_link(fn -> listen(initial_val) end)}
  end

  defp listen(val) do 
    receive do
      :inc -> listen(val + 1)
      :dec -> listen(val - 1)
      {:val, sender, ref} ->
        send sender, {ref, val}
        listen(val)
    end
  end
end

這是一個獨立的 counter service,Counter API 有三個

  1. :inc
  2. :dec
  3. :val

:inc :dec 是非同步的呼叫

:val 不同,發送 message 後,會用 receive 等待回應。

make_ref() 是一個 global unique reference,可在 global (cluster) 環境中運作。

^ref 表示我們是以 pattern matching 的方式,判斷是不是回傳了正確的 process reference

OTP 需要一個 startlink function,並以 initialval 為 counter 初始的 state

程式中沒有看到任何 global variable 儲存 state,而是呼叫 listen,listen 會以 receive 去 block 並等待 message,而 val 就是放在這個 function 的參數上。process state 是用 recursive function 的方式不斷重複發送給下一個 listen,這是 tail recursive。


測試 Counter

$ iex -S mix

iex(1)> alias Rumbl.Counter
Rumbl.Counter
iex(2)> {:ok, counter} = Counter.start_link(0)
{:ok, #PID<0.270.0>}
iex(3)> Counter.inc(counter)
:inc
iex(4)> Counter.inc(counter)
:inc

iex(5)> Counter.val(counter)
2
iex(6)> Counter.dec(counter)
:dec
iex(7)> Counter.val(counter)
1

Building GenServer for OTP

更新 /lib/rumbl/counter.ex

defmodule Rumbl.Counter do
  use GenServer

  def inc(pid), do: GenServer.cast(pid, :inc)

  def dec(pid), do: GenServer.cast(pid, :dec)

  def val(pid) do
    GenServer.call(pid, :val)
  end

  def start_link(initial_val) do
    GenServer.start_link(__MODULE__, initial_val)
  end

  def init(initial_val) do
    {:ok, initial_val}
  end

  def handle_cast(:inc, val) do
    {:noreply, val + 1}
  end

  def handle_cast(:dec, val) do
    {:noreply, val - 1}
  end

  def handle_call(:val, _from, val) do
    {:reply, val, val}
  end
end

GenServer.cast 是非同步呼叫,server 以 handle_cast 處理,最後會 return {:noreply, val + 1} ,因為呼叫者不需要這個 reply message

GenServer.call 是同步呼叫,server 以 handle_call 處理

測試

$ iex -S mix
iex(1)> alias Rumbl.Counter
Rumbl.Counter
iex(2)> {:ok, counter} = Counter.start_link(0)
{:ok, #PID<0.269.0>}
iex(3)> Counter.inc(counter)
:ok
iex(4)> Counter.val(counter)
1
iex(5)> Counter.dec(counter)
:ok
iex(6)> Counter.val(counter)
0

Adding Failover

利用 Supervisor 監控 counter

Phoenix 並沒有很多處理 fail exception 的 code,而是以 error reporting 的方式處理,同時加上自動 restart service。

修改 /lib/rumbl.ex

defmodule Rumbl do
  use Application

  # See http://elixir-lang.org/docs/stable/elixir/Application.html
  # for more information on OTP Applications
  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      supervisor(Rumbl.Endpoint, []),
      supervisor(Rumbl.Repo, []),
      worker(Rumbl.Counter, [5]), # new counter worker
    ]

    opts = [strategy: :one_for_one, name: Rumbl.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  def config_change(changed, _new, removed) do
    Rumbl.Endpoint.config_change(changed, removed)
    :ok
  end
end

child spec 定義 Elixir application 要啟動的 children,將 counter 這個 worker 加入 children list。

opts 是 supervision policy,這裡是使用 :oneforone

:oneforall 是 kill and restart all child processes


在 /lib/rumbl/counter.ex 加上 :tick 及處理 :tick 的 handle_info,每 1000ms 就倒數一次

  def init(initial_val) do
    Process.send_after(self, :tick, 1000)
    {:ok, initial_val}
  end

  def handle_info(:tick, val) do
    IO.puts "tick #{val}"
    Process.send_after(self, :tick, 1000)
    {:noreply, val - 1}
  end

再加上一點檢查,只倒數到 0 ,就會 raise exception,OTP process 會 crash

def init(initial_val) do
    Process.send_after(self, :tick, 1000)
    {:ok, initial_val}
  end

  def handle_info(:tick, val) when val <= 0, do: raise "boom!"
  def handle_info(:tick, val) do
    IO.puts "tick #{val}"
    Process.send_after(self, :tick, 1000)
    {:noreply, val - 1}
  end

但可發現它會自動重新啟動

$ iex -S mix
iex(1)> tick 5
tick 4
tick 3
tick 2
tick 1
[error] GenServer #PID<0.365.0> terminating
** (RuntimeError) boom!
    (rumbl) lib/rumbl/counter.ex:21: Rumbl.Counter.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: :tick
State: 0
tick 5
tick 4
tick 3
tick 2
tick 1

Restart Strategies

預設 child processes 的 restart strategy 是 :permanent,也可以用 restart 指定

worker(Rumbl.Counter, [5], restart: :permanent),
  1. :permanent

    child is always restarted (default)

  2. :temporary

    child is never restarted

  3. :transient

    只在異常終止時 restart,也就是 :normal, :shutdown, {:shutdown, term} 以外的 exit reason

另外還有 maxrestarts 及 maxseconds 參數,在 maxsecodns 時間內可以 restart maxrestarts 次

預設是 3 restarts in 5 seconds

Supervision Strategies

  1. :oneforone

    a child terminates -> supervisor restars only that process

  2. :oneforall

    a child tetminates -> supervisor terminates all children and restarts them

  3. :restforone

    a child terminates -> supervisor terminates all child processes defiend after the one that dies,並 restart all terminated processes

  4. :simpleonefor_one

    類似 :oneforone,用在 supervisor 需要動態 supervise processes 的情況,例如 web server 需要 supervise web requests,通常有 10 ~ 100,000 個 concurrent running processes

把 strategy 換成 :oneforall 測試

def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      supervisor(Rumbl.Endpoint, []),
      supervisor(Rumbl.Repo, []),
      worker(Rumbl.Counter, [5]), # new counter worker
    ]

    opts = [strategy: :one_for_all, name: Rumbl.Supervisor]
    Supervisor.start_link(children, opts)
  end

啟動 Phoenix server 會發現 Cowboy 也 restart

tick 2
tick 1
[error] GenServer #PID<0.348.0> terminating
** (RuntimeError) boom!
    (rumbl) lib/rumbl/counter.ex:21: Rumbl.Counter.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: :tick
State: 0
[info] Running Rumbl.Endpoint with Cowboy using http://localhost:4000
tick 5

現在把 worker 拿掉,strategy 改回 :oneforone

children = [
    supervisor(Rumbl.Endpoint, []),
    supervisor(Rumbl.Repo, [])
]

opts = [strategy: :one_for_one, name: Rumbl.Supervisor]

Using Agents

agent 類似 GenServer,但只有 5 個 main functions

start_link 啟動 agent, stop 停止, update 更新 agent 狀態

$ iex -S mix

iex(1)> import Agent
Agent
iex(2)> {:ok, agent} = start_link fn -> 5 end
{:ok, #PID<0.258.0>}
iex(3)> update agent, &(&1 + 1)
:ok
iex(4)> get agent, &(&1)
6
iex(5)> stop agent
:ok

加上 :name option

iex(7)> {:ok, agent} = start_link fn -> 5 end, name: MyAgent
{:ok, #PID<0.265.0>}
iex(8)> update MyAgent, &(&1 + 1)
:ok
iex(9)> get MyAgent, &(&1)
6
iex(10)> stop MyAgent
:ok

重複名稱會發生 error

iex(11)> {:ok, agent} = start_link fn -> 5 end, name: MyAgent
{:ok, #PID<0.271.0>}
iex(12)> {:ok, agent} = start_link fn -> 5 end, name: MyAgent
** (MatchError) no match of right hand side value: {:error, {:already_started, #PID<0.271.0>}}

Phoenix.Channel 就是用 Agent 實作

Design an Information System with OTP

新增 /lib/rumbl/infosyssupervisor.ex


defmodule Rumbl.InfoSys.Supervisor do
  use Supervisor

  def start_link() do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_opts) do
    children = [
      worker(Rumbl.InfoSys, [], restart: :temporary)
    ]

    supervise children, strategy: :simple_one_for_one
  end
end

修改 /lib/rumbl.ex

    children = [
      supervisor(Rumbl.Endpoint, []),
      supervisor(Rumbl.InfoSys.Supervisor, []), # new supervisor
      supervisor(Rumbl.Repo, []),
#      worker(Rumbl.Counter, [5]), # new counter worker
    ]

Building a start_link Proxy

啟動時動態決定要幾個 service,要製作一個 worker,可啟動多個 backends

proxy function 是 client, server 之間的 lightweight function interface

新增 /lib/rumbl/info_sys.ex


defmodule Rumbl.InfoSys do
  @backends [Rumbl.InfoSys.Wolfram]

  defmodule Result do
    defstruct score: 0, text: nil, url: nil, backend: nil
  end

  def start_link(backend, query, query_ref, owner, limit) do 
    backend.start_link(query, query_ref, owner, limit)
  end

  def compute(query, opts \\ []) do 
    limit = opts[:limit] || 10
    backends = opts[:backends] || @backends

    backends
    |> Enum.map(&spawn_query(&1, query, limit))
  end

  defp spawn_query(backend, query, limit) do 
    query_ref = make_ref()
    opts = [backend, query, query_ref, self(), limit]
    {:ok, pid} = Supervisor.start_child(Rumbl.InfoSys.Supervisor, opts)
    {pid, query_ref}
  end
end

InfoSys 跟一般 GenServer 有一點不同,裡面存放到 results 到 module attribute -> 所有支援的 backends 為 list

Result struct 會儲存每個 search result 的結果,還有 score 及 relevnace, text to describe the result, url

startlink 就是 proxy,會再呼叫其他 backend 的 startlink

compute 會 maps over all backends,呼叫每個 backend 的 spawn_query

Building the Wolfram into System

修改 /rumbl/mix.exs

{:sweet_xml, "~> 0.5.0"},
mix deps.get

wolfram 申請帳號,及 APP ID

APP NAME: wolfram
APPID: LP93J3-XXXXXXXXXX

新增設定 /config/dev.secret.exs

use Mix.Config
config :rumbl, :wolfram, app_id: "LP93J3-XXXXXXXXXX"

記得將 /config/dev.secret.exs 放到 .gitignore

修改 /config/dev.exs,最後面加上

import_config "dev.secret.exs"

新增 /lib/rumbl/info_sys/wolfram.ex

defmodule Rumbl.InfoSys.Wolfram do
  import SweetXml
  alias Rumbl.InfoSys.Result

  def start_link(query, query_ref, owner, limit) do 
    Task.start_link(__MODULE__, :fetch, [query, query_ref, owner, limit])
  end

  def fetch(query_str, query_ref, owner, _limit) do 
    query_str
    |> fetch_xml()
    |> xpath(~x"/queryresult/pod[contains(@title, 'Result') or
                                 contains(@title, 'Definitions')]
                            /subpod/plaintext/text()")
    |> send_results(query_ref, owner)
  end

  defp send_results(nil, query_ref, owner) do 
    send(owner, {:results, query_ref, []})
  end
  defp send_results(answer, query_ref, owner) do
    results = [%Result{backend: "wolfram", score: 95, text: to_string(answer)}]
    send(owner, {:results, query_ref, results})
  end

  defp fetch_xml(query_str) do 
    {:ok, {_, _, body}} = :httpc.request(
      String.to_char_list("http://api.wolframalpha.com/v2/query" <>
        "?appid=#{app_id()}" <>
        "&input=#{URI.encode(query_str)}&format=plaintext"))
    body
  end

  defp app_id, do: Application.get_env(:rumbl, :wolfram)[:app_id]
end

這個 module 並沒有 GenServer 的 callbacks,因為這個 process 是一個 task,GenServer 是一個 generic server 可計算並儲存 state,但有時我們只需要 store state 或是 只需要執行某個 function。

Agent 是簡化的 GenServer 可儲存 state task 是個簡單的 process 可執行某個 function

SweetXml 用來 parse XML,Result 是 the struct for the results

Task.start_link 是啟動 Task 的方式

fetch_xml 裡面試用 :httpc,這是 erlang 的 standard library,可處理 HTTP request

sendresults(queryref, owner) 將結果回傳給 requester

有兩種 send_results,分為有 answer 或沒有

先用 iex -S mix 測試

iex(4)> Rumbl.InfoSys.compute("what is elixir?")
[{#PID<0.592.0>, #Reference<0.3775272462.1982070785.26789>}]
iex(5)> flush()
:ok
iex(6)> flush()
:ok
iex(7)> flush()
{:results, #Reference<0.3775272462.1982070785.26789>,
 [%Rumbl.InfoSys.Result{backend: "wolfram", score: 95,
   text: "1 | noun | a sweet flavored liquid (usually containing a small amount of alcohol) used in compounding medicines to be taken by mouth in order to mask an unpleasant taste\n2 | noun | hypothetical substance that the alchemists believed to be capable of changing base metals into gold\n3 | noun | a substance believed to cure all ills",
   url: nil}]}
:ok

要讓 service 更堅固,必須做以下工作

  1. 偵測 backend crash,這樣就不要等 results

  2. 由 backend 取得結果要根據 score 排序

  3. 需要 timeout 機制


Monitoring Processes

使用 Process.monitor 在 waiting results 時偵測 backend crashes,一但設定了 monitor,會在該 process dies 時,收到 message。

測試

iex(1)> pid = spawn(fn -> :ok end)
#PID<0.261.0>
iex(2)> Process.monitor(pid)
#Reference<0.777943872.2254438401.3282>
iex(3)> flush()
{:DOWN, #Reference<0.777943872.2254438401.3282>, :process, #PID<0.261.0>,
 :noproc}
:ok

修改 /lib/rumbl/info_sys.ex

defmodule Rumbl.InfoSys do
  @backends [Rumbl.InfoSys.Wolfram]

  defmodule Result do
    defstruct score: 0, text: nil, url: nil, backend: nil
  end

  def start_link(backend, query, query_ref, owner, limit) do
    backend.start_link(query, query_ref, owner, limit)
  end

  def compute(query, opts \\ []) do
    limit = opts[:limit] || 10
    backends = opts[:backends] || @backends

    backends
    |> Enum.map(&spawn_query(&1, query, limit))
    |> await_results(opts)
    |> Enum.sort(&(&1.score >= &2.score))
    |> Enum.take(limit)
  end

  defp spawn_query(backend, query, limit) do
    query_ref = make_ref()
    opts = [backend, query, query_ref, self(), limit]
    {:ok, pid} = Supervisor.start_child(Rumbl.InfoSys.Supervisor, opts)
    monitor_ref = Process.monitor(pid)
    {pid, monitor_ref, query_ref}
  end

  defp await_results(children, _opts) do
    await_result(children, [], :infinity)
  end

  defp await_result([head|tail], acc, timeout) do
    {pid, monitor_ref, query_ref} = head

    receive do
      {:results, ^query_ref, results} ->
        Process.demonitor(monitor_ref, [:flush])
        await_result(tail, results ++ acc, timeout)
      {:DOWN, ^monitor_ref, :process, ^pid, _reason} ->
        await_result(tail, acc, timeout)
    end
  end

  defp await_result([], acc, _) do
    acc
  end
end

compute 會自動等待 results,收到時,會 sorting by score,並回報 top ones

spawn_query 裡面增加了 Process.monitor(pid)

awaitresults 是 recursive function,在每次呼叫 awaitresults 就會新增一個 result 到 list

正確的 result 會 match {:results, ^query_ref, result}

Process.demonitor(monitor_ref, [:flush]) 是將 monitor process 移除

現在 compute 會自動處理結果

iex(1)> Rumbl.InfoSys.compute("what is the meaning of life?")
[%Rumbl.InfoSys.Result{backend: "wolfram", score: 95,
  text: "42\n(according to the book The Hitchhiker's Guide to the Galaxy, by Douglas Adams)",
  url: nil}]

Timeout

receive 可設定 after 這個 timeout 機制

receive do
  :this_will_never_arrive -> :ok
after
  1_000 -> :timedout
end

修改 /lib/rumbl/infosys.ex 等待 backend 5000 ms


defmodule Rumbl.InfoSys do
  @backends [Rumbl.InfoSys.Wolfram]

  defmodule Result do
    defstruct score: 0, text: nil, url: nil, backend: nil
  end

  def start_link(backend, query, query_ref, owner, limit) do
    backend.start_link(query, query_ref, owner, limit)
  end

  def compute(query, opts \\ []) do
    limit = opts[:limit] || 10
    backends = opts[:backends] || @backends

    backends
    |> Enum.map(&spawn_query(&1, query, limit))
    |> await_results(opts)
    |> Enum.sort(&(&1.score >= &2.score))
    |> Enum.take(limit)
  end

  defp spawn_query(backend, query, limit) do
    query_ref = make_ref()
    opts = [backend, query, query_ref, self(), limit]
    {:ok, pid} = Supervisor.start_child(Rumbl.InfoSys.Supervisor, opts)
    monitor_ref = Process.monitor(pid)
    {pid, monitor_ref, query_ref}
  end

  defp await_results(children, opts) do
    timeout = opts[:timeout] || 5000
    timer = Process.send_after(self(), :timedout, timeout) 
    results = await_result(children, [], :infinity)
    cleanup(timer)
    results
  end

  defp await_result([head|tail], acc, timeout) do
    {pid, monitor_ref, query_ref} = head

    receive do
      {:results, ^query_ref, results} ->
        Process.demonitor(monitor_ref, [:flush])
        await_result(tail, results ++ acc, timeout)
      {:DOWN, ^monitor_ref, :process, ^pid, _reason} ->
        await_result(tail, acc, timeout)
      :timedout -> 
        kill(pid, monitor_ref)
        await_result(tail, acc, 0)
    after
      timeout ->
        kill(pid, monitor_ref)
        await_result(tail, acc, 0)
    end
  end

  defp await_result([], acc, _) do
    acc
  end

  defp kill(pid, ref) do 
    Process.demonitor(ref, [:flush])
    Process.exit(pid, :kill)
  end

  defp cleanup(timer) do  
    :erlang.cancel_timer(timer)
    receive do
      :timedout -> :ok
    after
      0 -> :ok
    end
  end
end

Integrating OTP Services with Channels

將剛剛的服務放到 VideoChannel 中

修改 /web/channels/video_channel.ex


defmodule Rumbl.VideoChannel do
  use Rumbl.Web, :channel
  alias Rumbl.AnnotationView

  def join("videos:" <> video_id, params, socket) do
    last_seen_id = params["last_seen_id"] || 0
    video_id = String.to_integer(video_id)
    video = Repo.get!(Rumbl.Video, video_id)

    annotations = Repo.all(
      from a in assoc(video, :annotations),
      where: a.id > ^last_seen_id,
      order_by: [asc: a.at, asc: a.id],
      limit: 200,
      preload: [:user]
    )
    resp = %{annotations: Phoenix.View.render_many(annotations, AnnotationView,
      "annotation.json")}

    {:ok, resp, assign(socket, :video_id, video.id)}
  end

  def handle_in(event, params, socket) do
    user = Repo.get(Rumbl.User, socket.assigns.user_id)
    handle_in(event, params, user, socket)
  end

  def handle_in("new_annotation", params, user, socket) do
    changeset =
      user
      |> build_assoc(:annotations, video_id: socket.assigns.video_id)
      |> Rumbl.Annotation.changeset(params)

    case Repo.insert(changeset) do
      {:ok, ann} ->
        broadcast_annotation(socket, ann)
        Task.start_link(fn -> compute_additional_info(ann, socket) end)
        {:reply, :ok, socket}

      {:error, changeset} ->
        {:reply, {:error, %{errors: changeset}}, socket}
    end
  end

  defp broadcast_annotation(socket, annotation) do
    annotation = Repo.preload(annotation, :user)
    rendered_ann = Phoenix.View.render(AnnotationView, "annotation.json", %{
      annotation: annotation
    })
    broadcast! socket, "new_annotation", rendered_ann
  end

  defp compute_additional_info(ann, socket) do
    for result <- Rumbl.InfoSys.compute(ann.body, limit: 1, timeout: 10_000) do
      attrs = %{url: result.url, body: result.text, at: ann.at}
      info_changeset =
        Repo.get_by!(Rumbl.User, username: result.backend)
        |> build_assoc(:annotations, video_id: ann.video_id)
        |> Rumbl.Annotation.changeset(attrs)

      case Repo.insert(info_changeset) do
        {:ok, info_ann} -> broadcast_annotation(socket, info_ann)
        {:error, _changeset} -> :ignore
      end
    end
  end
end

新增 /rumbl/priv/repo/backend_seeds.exs

alias Rumbl.Repo
alias Rumbl.User

Repo.insert!(%User{name: "Wolfram", username: "wolfram"})
$ mix run priv/repo/backend_seeds.exs
Compiling 19 files (.ex)
warning: String.to_char_list/1 is deprecated, use String.to_charlist/1
  lib/rumbl/info_sys/wolfram.ex:29

warning: function authenticate/2 is unused
  web/controllers/user_controller.ex:40

[debug] QUERY OK db=0.2ms queue=12.1ms
begin []
[debug] QUERY OK db=0.9ms
INSERT INTO `users` (`name`,`username`,`inserted_at`,`updated_at`) VALUES (?,?,?,?) ["Wolfram", "wolfram", {{2017, 9, 7}, {3, 55, 52, 165910}}, {{2017, 9, 7}, {3, 55, 52, 168676}}]
[debug] QUERY OK db=1.8ms
commit []

Inpsecting with Observer

erlang 有個 Observer 工具,可用這個方式啟動

:observer.start
iex -S mix

# 用這個方式啟動,可查看 Phoenix 部分的狀況
iex -S mix phoenix.server

supervision tree 是一個好的工具,可以觀察要怎麼將 application 分開。現在我們要將 application 分成兩個部分 :rumbl 及 :info_sys

利用 umbrella project 來處理

Using Umbrellas

每個 umbrella project 目錄包含以下的部分

  1. shared configuration of the project
  2. dependencies for the project
  3. apps 目錄 with child applications

要重新產生一個 umbrella project

$ mix new rumbrella --umbrella
* creating .gitignore
* creating README.md
* creating mix.exs
* creating apps
* creating config
* creating config/config.exs

Your umbrella project was created successfully.
Inside your project, you will find an apps/ directory
where you can create and host many apps:

    cd rumbrella
    cd apps
    mix new my_app

Commands like "mix compile" and "mix test" when executed
in the umbrella project root will automatically run
for each application in the apps/ directory.

將 InfoSys 移動到 rumbrella 下面

$ cd rumbrella/apps

$ mix new info_sys --sup
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/info_sys.ex
* creating lib/info_sys/application.ex
* creating test
* creating test/test_helper.exs
* creating test/info_sys_test.exs

Your Mix project was created successfully.
You can use "mix" to compile it, test it, and more:

    cd info_sys
    mix test

Run "mix help" for more commands.

將 InfoSys 由 rumbl 移到 info_sys

  1. 將 rumbl/lib/rumbl/infosys.ex Rumbl.InfoSys 改為 InfoSys 並複製到 infosys/lib/info_sys.ex ,前面加上

    use Application
    
    def start(_type, _args) do
    InfoSys.Supervisor.start_link()
    end
    
    @backends [InfoSys.Wolfram]
  2. 將 /rumbl/lib/rumbl/infosys/supervisor.ex Rumbl.InfoSys.Supervisor 改為 InfoSys.Supervisor 並複製到 infosys/lib/info_sys/supervisor.ex

  3. 將 /rumbl/lib/rumbl/infosys/wolfram.ex 的 Rumbl.InfoSys.Wolfram module 改為 InfoSys.Wolfram,並複製到 infosys/lib/info_sys/wolfram.ex

  4. 把所有 "Rumbl.InfoSys" 都改為 "InfoSys"

  5. 修改 lib/infosys/wolfram.ex,由 :rumbl 改為 :infosys

    defp app_id, do: Application.get_env(:info_sys, :wolfram)[:app_id]
  6. 修改 apps/info_sys/mix.exs

    def deps do
        [
        {:sweet_xml, "~> 0.5.0"}
        ]
    end
  7. 在 rumbrella 目錄執行 $ mix deps.get


將 rumbl 改為 umbrella child

  1. 將整個 rumbl 移動到 apps 目錄

  2. 修改 rumbrella/apps/rumbl/mix.exs,增加

      build_path: "../../_build",
      config_path: "../../config/config.exs",
      deps_path: "../../deps",
      lockfile: "../../mix.lock",
  3. 修改 applicaton ,增加 :info_sys

    def application do
        [mod: {Rumbl, []},
         applications: [:phoenix, :phoenix_pubsub, :phoenix_html, :cowboy, :logger, :gettext,
                        :phoenix_ecto, :mariaex, :comeonin, :info_sys]]
      end
  4. 更新 dependencies,移除 :sweetxml ,增加 :infosys

    {:info_sys, in_umbrella: true},
  5. 修改 rumbl/lib/rumbl.ex 移除 Rumbl.InfoSys

        children = [
          supervisor(Rumbl.Endpoint, []),
          supervisor(Rumbl.Repo, []),
        ]
  6. 修改 /web/channlel/videochannel.ex 的 computeadditional_info 將 Rumbl.InfoSys 改為 InfoSys

  7. 修改 cofnig/dev.secrets.exs 改為 :info_sys

    use Mix.Config
    config :info_sys, :wolfram, app_id: "LP93J3-XXXXXX"
  8. 修改 rumbl/package.json

      "dependencies": {
        "phoenix": "file:../../deps/phoenix",
        "phoenix_html": "file:../../deps/phoenix_html"
      },

在 rumbrella 目錄中

$ mix deps.get
$ mix test

References

Programming Phoenix

2018/10/29

Phoenix_8_Channels

Phoenix channel 是一個 conversation,channel 會發送, 接收 messages,並保存 state,這些 messages 稱為 events,state 會存放在稱為 socket 的 struct 內。

conversation 就是 topic,就像是 chat room, local map , a game, or a video。超過一個人在同一時間對相同的主題有興趣,Channels 可用在多個 users 之間互通訊息。因為這是用 erlang isolated, dedicated process 實作的。

網頁的 request/response 是 stateless,但 conversation 是長時間運作的的 process,是 stateful 的。

Phoenix Clients with ES6

利用 ECMAScript 6 JavaScript 功能實作 client,ES6的code 可以 transpile 為 ES5,實作 client 對 video 新增 annotations,並發送給所有 users。

每一個 Phonenix conversation 是一個 Topic,因此要先確認,要以什麼為 Topic,目前是以 video 為 topic。

新增 /web/static/js/video.js

import Player from "./player"

let Video = {

  init(socket, element){ if(!element){ return }
    let playerId = element.getAttribute("data-player-id")
    let videoId  = element.getAttribute("data-id")
    socket.connect()
    Player.init(element.id, playerId, () => { 
      this.onReady(videoId, socket)
    })
  },

  onReady(videoId, socket){
    let msgContainer = document.getElementById("msg-container")
    let msgInput     = document.getElementById("msg-input")
    let postButton   = document.getElementById("msg-submit")
    let vidChannel   = socket.channel("videos:" + videoId)
    // TODO join the vidChannel
  }
}
export default Video

socket.connect() 可產生一個 websocket

注意 let vidChannel = socket.channel("videos:" + videoId) ,這是 ES6 Client 連接 Phoenix VideoChannel 的 channel。

Topic 需要一個 identifier,我們選用 "videos:" + videoId 這個格式,我們需要在 topic 內對其他相同 topic 的 users 發送 events。


修改 /web/static/js/app.js

import "phoenix_html"

import socket from "./socket"
import Video from "./video"

Video.init(socket, document.getElementById("video"))

如果瀏覽 http://localhost:4000/watch/2-elixir 時,js console 會出現這樣的錯誤訊息。

Unable to join – {reason: "unmatched topic"}

Preparing Our Server for the Channel

傳統 web request/response 每一次都會產生一個 connection,也就是 Plug.Conn,每個新的 request 都會有新的 conn,接下來用 pipeline 處理,最後 die。

channel 的流程跟上面的不同,client 會用 socket 建立 connection,在建立連線後,socket 會在整個 connection 的過程中持續被 transformed。socket 就是 client/server 之間持續運作的 conversation。

首先要決定是否能建立 connection,然後要產生 initial socket,包含所有 custom application setup。

修改 /web/static/js/socket.js

import {Socket} from "phoenix"

let socket = new Socket("/socket", {
  params: {token: window.userToken},
  logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) }
})

export default socket

let socket = new Socket("/socket".... 會讓 Phoenix 建立新的 socket。

查看 /lib/rumbl/endpoint.ex,有一個 /socket 的定義,UserSocket 就是處理 socket connection 的 module。

socket "/socket", Rumbl.UserSocket

先看一下 /web/channels/user_socket.ex 的內容

defmodule Rumbl.UserSocket do
  use Phoenix.Socket

  ## Transports
  transport :websocket, Phoenix.Transports.WebSocket
  # transport :longpoll, Phoenix.Transports.LongPoll

  def connect(_params, socket) do
    {:ok, socket}
  end

  def id(_socket), do: nil
end

UserSocket 使用 connection 處理所有 channel processes。 Phoenix 支援兩種 Transport protocols: websocket 或是 longpoll,也可以自訂一個。除了 transport 不同之外,其他的部分都是一樣的。

使用 shared socket abstraction,然後讓 Phoenix 處理其他的工作。

UserSocket 有兩個 functions: connect 及 id。id 是用來識別 socket,以便儲存不同的 state。目前 id 為 nil,connect 基本上就是接受所有連線。

接下來要利用 rumbl.Auth 增加 socket authentication。

如果再一次瀏覽網址 http://localhost:4000/watch/2-elixir,在 js console 就可看到此 debug message,表示已經連上 server。

transport: connected to ws://localhost:4000/socket/websocket?token=undefined&vsn=1.0.0

Creating the Channel

channel 就是 a conversation on a topic,topic 的 id 為 videos:video_id,我們希望 user 能取得某個 topic 的所有 events,也就是 video 的所有 annotations。

topic id 的一般形式就是 topic:subtopic,topic 為 resource name,subtopic 為 ID

因為 URL 就有參數,可以識別 conversation,也就是 :id


Joining a Channel

在 /web/channels/user_socket.ex 增加一行

channel "videos:*", Rumbl.VideoChannel

videos:* conversation 以 resource name 及 ID 作為 topic 的分類方式


Building the Channel Module

新增 /web/channels/video_channel.ex

defmodule Rumbl.VideoChannel do
  use Rumbl.Web, :channel

  def join("videos:" <> video_id, _params, socket) do
    {:ok, assign(socket, :video_id, String.to_integer(video_id))}
  end
end

channel 的第一個 callback 就是 join,clients 可 join topics on a channel,如果成功就回傳 {:ok, socket},拒絕連線就回傳 {:error, socket}

現在先讓所有 socket 可任意 join video topics,並新增 video ID (由 topic 取得) 到 socket.assigns。socket 會在 socket.assigns 儲存某個 conversation 的所有狀態 state。

socket 會被 transformed 為 loop,而不是一連串的 pipelines。當 events 進出 channel 時,可同時存取 socket state。

修改 /web/static/js/video.js

import Player from "./player"

let Video = {

    init(socket, element){ if(!element){ return }
        let playerId = element.getAttribute("data-player-id")
        let videoId  = element.getAttribute("data-id")
        socket.connect()
        Player.init(element.id, playerId, () => {
            this.onReady(videoId, socket)
    })
    },

    onReady(videoId, socket){
        let msgContainer = document.getElementById("msg-container")
        let msgInput     = document.getElementById("msg-input")
        let postButton   = document.getElementById("msg-submit")
        
        // 以 "videos:" videoId 產生新的 channel object
        let vidChannel   = socket.channel("videos:" + videoId)

        vidChannel.join()
            .receive("ok", resp => console.log("joined the video channel", resp) )
    .receive("error", reason => console.log("join failed", reason) )
    }
}
export default Video

如果再一次瀏覽網址 http://localhost:4000/watch/2-elixir,在 js console 就可看到

[Log] push: videos:2 phx_join (1) – {} (app.js, line 1586)
[Log] receive: ok videos:2 phx_reply (1) – {status: "ok", response: {}} (app.js, line 1586)
[Log] joined the video channel – {}

server 的 console log 為

[info] JOIN videos:2 to Rumbl.VideoChannel
  Transport:  Phoenix.Transports.WebSocket
  Parameters: %{}
[info] Replied videos:2 :ok

Sending and Receiving Event

在 channel 收到的訊息為 event name + payload + arbitrary data

channel 有三種接收訊息的方式:

  1. handle_in

    receives direct channel events

  2. handle_out

    intercepts broadcast events

  3. handle_info

    receives OTP messages


Taking Channels for a Trial run

目前是讓 join function 每 5 seconds 就發送一次 :ping message 到 channel

修改 /web/channels/video_channel.ex

defmodule Rumbl.VideoChannel do
  use Rumbl.Web, :channel

  def join("videos:" <> video_id, _params, socket) do
    :timer.send_interval(5_000, :ping)
    {:ok, socket}
  end

  # 當 elixir message 到達 channel 就會呼叫 handle_info
  # 目前每收到一次就將 :count +1
  def handle_info(:ping, socket) do
    count = socket.assigns[:count] || 1
    push socket, "ping", %{count: count}

     # :noreply 代表不發送 reply,並將 transformed 後的 socket 回傳回去
    {:noreply, assign(socket, :count, count + 1)}
  end
end

client 要對應修改 video.js,增加 vidChannel.on("ping", ({count}) => console.log("PING", count) )

    let vidChannel   = socket.channel("videos:" + videoId)

    vidChannel.on("ping", ({count}) => console.log("PING", count) )

    vidChannel.join()
      .receive("ok", resp => console.log("joined the video channel", resp) )
      .receive("error", reason => console.log("join failed", reason) )

js console 會持續看到

[Log] receive:  videos:2 ping  – {count: 1} (app.js, line 1586)
[Log] receive:  videos:2 ping  – {count: 2} (app.js, line 1586)
[Log] receive:  videos:2 ping  – {count: 3} (app.js, line 1586)
[Log] receive:  videos:2 ping  – {count: 4} (app.js, line 1586)

handle_info 就是 loop

client js 的部分是以 vidChannel.on(event, callback) 處理訊息

後面會看到怎麼用 handle_in 處理 synchronous messaging

controller 處理 request 而 channels hold a conversation


Annotating Videos

需要一個 Annotation model 儲存 user annotations

修改 /web/static/js/video.js

import Player from "./player"

let Video = {

  init(socket, element){ if(!element){ return }
    let playerId = element.getAttribute("data-player-id")
    let videoId  = element.getAttribute("data-id")
    socket.connect()
    Player.init(element.id, playerId, () => {
      this.onReady(videoId, socket)
    })
  },

  onReady(videoId, socket){
    let msgContainer = document.getElementById("msg-container")
    let msgInput     = document.getElementById("msg-input")
    let postButton   = document.getElementById("msg-submit")
    let vidChannel   = socket.channel("videos:" + videoId)

     // 處理 post 按鈕的 click event
     // 利用 vidChannel.push 發送 new_annotation
    postButton.addEventListener("click", e => {
      let payload = {body: msgInput.value, at: Player.getCurrentTime()}
      vidChannel.push("new_annotation", payload)        
                .receive("error", e => console.log(e) ) 
      msgInput.value = ""
    })

    // 接收 server 發送的 new_annotation,把 annotation 顯示在畫面 msgContainer 上
    vidChannel.on("new_annotation", (resp) => {         
      this.renderAnnotation(msgContainer, resp)
    })

    vidChannel.join()
      .receive("ok", resp => console.log("joined the video channel", resp) )
      .receive("error", reason => console.log("join failed", reason) )
  },

  // safely escape user input,可避免發生 XSS attack
  esc(str){ 
    let div = document.createElement("div")
    div.appendChild(document.createTextNode(str))
    return div.innerHTML
  },

  renderAnnotation(msgContainer, {user, body, at}){ 
    let template = document.createElement("div")

    template.innerHTML = `
    <a href="#" data-seek="${this.esc(at)}">
      <b>${this.esc(user.username)}</b>: ${this.esc(body)}
    </a>
    `
    msgContainer.appendChild(template)
    msgContainer.scrollTop = msgContainer.scrollHeight
  }
}
export default Video

Adding Annotation on the Server

修改 /web/channels/video_channel.ex

defmodule Rumbl.VideoChannel do
  use Rumbl.Web, :channel

  def join("videos:" <> video_id, _params, socket) do
    {:ok, socket}
  end

  # 處理 new_annotation,並 broadcast! 給目前 topic 的所有 users
  # broadcast! 有三個參數  socket, name of the event, payload (任意的 map)
  def handle_in("new_annotation", params, socket) do
    broadcast! socket, "new_annotation", %{
      user: %{username: "anon"},
      body: params["body"],
      at: params["at"]
    }

    # :reply 有兩種 :ok 或是 :error
    # 不然就是用 :noreply
    {:reply, :ok, socket}
  end
end

注意: 將原始的 message payload 直接轉送給其他人,而沒有 inspection,可能會有 security 問題

如果這樣寫,就有可能有資安問題

broadcast! socket, "new_annotation", Map.put(params, "user", %{
username: "anon"
})

現在打開兩個網頁,就可以互傳訊息

Socket Authentication

因為 channel 是 long-duration connection,利用 Phoenix.Token token authentication,可為每個 user 指定一個 unique token

不使用 session cookie 的原因是,可能會有 cross-domain attack。

因為已經有利用 Rumbl.Auth plug 增加的 current_user,現在要做的是利用 authenticated user 產生 token 並傳給 socket 前端。

首先修改 /web/templates/layout/app.html.eex,將 userToken 由 layout assigns 取出並放在 browser window 中

    </div> <!-- /container -->
    <script>window.userToken = "<%= assigns[:user_token] %>"</script>
    <script src="<%= static_path(@conn, "/js/app.js") %>"></script>

修改 /web/controllers/auth.ex

  def call(conn, repo) do
    user_id = get_session(conn, :user_id)

    cond do
      user = conn.assigns[:current_user] ->
        put_current_user(conn, user) 
      user = user_id && repo.get(Rumbl.User, user_id) ->
        put_current_user(conn, user)
      true ->
        assign(conn, :current_user, nil)
    end
  end

  def login(conn, user) do
    conn
    |> put_current_user(user) 
    |> put_session(:user_id, user.id)
    |> configure_session(renew: true)
  end

  # 將 current_user 及 user_token 放到 conn.assigns
  defp put_current_user(conn, user) do
    token = Phoenix.Token.sign(conn, "user socket", user.id)

    conn
    |> assign(:current_user, user)
    |> assign(:user_token, token)
  end

修改 /web/static/js/socket.js ,將 user token 傳入 socket.connect,並在 UserSocket.connect callback 中驗證 token。

import {Socket} from "phoenix"

let socket = new Socket("/socket", {
  // :params 會出現在 UserSocket.connect 的第一個參數
  params: {token: window.userToken},
  logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) }
})

export default socket

修改 /web/channles/user_socket.ex

defmodule Rumbl.UserSocket do
  use Phoenix.Socket

  ## Channels
  channel "videos:*", Rumbl.VideoChannel

  ## Transports
  transport :websocket, Phoenix.Transports.WebSocket
  # transport :longpoll, Phoenix.Transports.LongPoll

  # 2 weeks
  @max_age 2 * 7 * 24 * 60 * 60

  def connect(%{"token" => token}, socket) do
    case Phoenix.Token.verify(socket, "user socket", token, max_age: @max_age) do
      {:ok, user_id} ->
        {:ok, assign(socket, :user_id, user_id)}
      {:error, _reason} ->
        :error
    end
  end
  def connect(_params, _socket), do: :error

  def id(socket), do: "users_socket:#{socket.assigns.user_id}"
end

利用 Phoenix.Token.verify 檢查 token,可設定 max_age

如果 token 正確,就會收到 user_id 並存在 socket.assigns,回傳 {:ok, socket} 用以建立 connection。token 錯誤,就 return :error

refresh your page,application 還是能正常運作,但已經有了 user authentication

Persisting Annotations

建立 Annotation model,create annotations on videos,每個 annotation 會 belong to a user and a video

$ mix phoenix.gen.model Annotation annotations body:text at:integer user_id:references:users video_id:references:videos
* creating web/models/annotation.ex
* creating test/models/annotation_test.exs
* creating priv/repo/migrations/20170906163920_create_annotation.exs

Remember to update your repository by running migrations:

    $ mix ecto.migrate


$ mix ecto.migrate
Compiling 1 file (.ex)
Generated rumbl app
[info] == Running Rumbl.Repo.Migrations.CreateAnnotation.change/0 forward
[info] create table annotations
[info] create index annotations_user_id_index
[info] create index annotations_video_id_index
[info] == Migrated in 0.0s

還要處理 User, Video 的 relationships

修改 /web/models/user.ex 及 /web/models/video.ex,增加 has_many

has_many :annotations, Rumbl.Annotation

回到 /web/channels/video_channel.ex

defmodule Rumbl.VideoChannel do
  use Rumbl.Web, :channel

  def join("videos:" <> video_id, _params, socket) do
    {:ok, assign(socket, :video_id, String.to_integer(video_id))}
  end

  # 確保所有 events 都會有 current_user,然後再呼叫其他 handle_in
  def handle_in(event, params, socket) do 
    user = Repo.get(Rumbl.User, socket.assigns.user_id)
    handle_in(event, params, user, socket)
  end

  def handle_in("new_annotation", params, user, socket) do 
    # 以 new_annotation 產生 changeset 並透過 Repo 存到 DB
    changeset =
      user
      |> build_assoc(:annotations, video_id: socket.assigns.video_id)
      |> Rumbl.Annotation.changeset(params)

    case Repo.insert(changeset) do
      # insert 成功,才 broadcast 給所有 subscribers
      # 也可以用  {:noreply, socket} 不送 reply
      {:ok, annotation} ->
        broadcast! socket, "new_annotation", %{
          id: annotation.id,
          user: Rumbl.UserView.render("user.json", %{user: user}), 
          body: annotation.body,
          at: annotation.at
        }
        {:reply, :ok, socket}

      {:error, changeset} ->
        {:reply, {:error, %{errors: changeset}}, socket}
    end
  end
end

因為也需要 notify subscribers 該 user 的資訊,在 UserView 新增 user.json template

defmodule Rumbl.UserView do
  use Rumbl.Web, :view
  alias Rumbl.User

  def first_name(%User{name: name}) do
    name
    |> String.split(" ")
    |> Enum.at(0)
  end

  def render("user.json", %{user: user}) do
    %{id: user.id, username: user.username}
  end
end

現在新增 annotation, server log 就會出現

INSERT INTO `annotations` (`at`,`body`,`user_id`,`video_id`,`inserted_at`,`updated_at`) VALUES (?,?,?,?,?,?) [0, "test", 1, 2, {{2017, 9, 6}, {16, 49, 10, 444088}}, {{2017, 9, 6}, {16, 49, 10, 447017}}]
[debug] QUERY OK db=0.6ms

在 refresh page 後,annotations 就會消失,所以在 user join channel 時,要把 messages 送給 client

修改 /web/channels/video_channel.ex,改寫 join,取得 video's annotations

defmodule Rumbl.VideoChannel do
  use Rumbl.Web, :channel
  alias Rumbl.AnnotationView

  def join("videos:" <> video_id, _params, socket) do
    video_id = String.to_integer(video_id)
    video = Repo.get!(Rumbl.Video, video_id)

    # 要 preload user
    annotations = Repo.all(
      from a in assoc(video, :annotations),
        order_by: [asc: a.at, asc: a.id],
        limit: 200,
        preload: [:user]
    )

    resp = %{annotations: Phoenix.View.render_many(annotations, AnnotationView,
                                                   "annotation.json")}
    {:ok, resp, assign(socket, :video_id, video_id)}
  end

  def handle_in("new_annotation", params, socket) do
    user = Rumbl.Repo.get(Rumbl.User, socket.assigns.user_id)

    changeset =
      user
      |> build_assoc(:annotations, video_id: socket.assigns.video_id)
      |> Rumbl.Annotation.changeset(params)

    case Repo.insert(changeset) do
      {:ok, annotation} ->
        broadcast! socket, "new_annotation", %{
          id: annotation.id,
          user: Rumbl.UserView.render("user.json", %{user: user}),
          body: annotation.body,
          at: annotation.at
        }
        {:reply, :ok, socket}

      {:error, changeset} ->
        {:reply, {:error, %{errors: changeset}}, socket}
    end
  end
end

Phoenix.View.render_many 能夠 collects the render results for all elements in the enumerable passed to it

新增 /web/views/annotation_view.ex

defmodule Rumbl.AnnotationView do
  use Rumbl.Web, :view

  def render("annotation.json", %{annotation: ann}) do
    %{
      id: ann.id,
      body: ann.body,
      at: ann.at,
      user: render_one(ann.user, Rumbl.UserView, "user.json")
    }
  end
end

注意 annotaion's user 的 render_one,可處理 nil results

更新 vidChannle.join() 以便 render list of annotations on join

    vidChannel.join()
      .receive("ok", ({annotations}) => {
        annotations.forEach( ann => this.renderAnnotation(msgContainer, ann) )
      })
      .receive("error", reason => console.log("join failed", reason) )

現在 reload 頁面就能看到所有 annotations


現在我們需要 schedule the annotations to appear synced up with the video playback

更新 /web/static/js/video.js


import Player from "./player"

let Video = {

    init(socket, element){ if(!element){ return }
        let playerId = element.getAttribute("data-player-id")
        let videoId  = element.getAttribute("data-id")
        socket.connect()
        Player.init(element.id, playerId, () => {
            this.onReady(videoId, socket)
    })
    },

    onReady(videoId, socket){
        let msgContainer = document.getElementById("msg-container")
        let msgInput     = document.getElementById("msg-input")
        let postButton   = document.getElementById("msg-submit")
        let vidChannel   = socket.channel("videos:" + videoId)

        postButton.addEventListener("click", e => {
            let payload = {body: msgInput.value, at: Player.getCurrentTime()}
            vidChannel.push("new_annotation", payload)
            .receive("error", e => console.log(e) )
        msgInput.value = ""
    })

        msgContainer.addEventListener("click", e => {
            e.preventDefault()
        let seconds = e.target.getAttribute("data-seek") ||
            e.target.parentNode.getAttribute("data-seek")
        if(!seconds){ return }

        Player.seekTo(seconds)
    })

        vidChannel.on("new_annotation", (resp) => {
            this.renderAnnotation(msgContainer, resp)
    })

        vidChannel.join()
            .receive("ok", resp => {
            this.scheduleMessages(msgContainer, resp.annotations)
    })
    .receive("error", reason => console.log("join failed", reason) )
    },

    renderAnnotation(msgContainer, {user, body, at}){
        let template = document.createElement("div")
        template.innerHTML = `
    <a href="#" data-seek="${this.esc(at)}">
      [${this.formatTime(at)}]
      <b>${this.esc(user.username)}</b>: ${this.esc(body)}
    </a>
    `
        msgContainer.appendChild(template)
        msgContainer.scrollTop = msgContainer.scrollHeight
    },

    scheduleMessages(msgContainer, annotations){
        setTimeout(() => {
            let ctime = Player.getCurrentTime()
            let remaining = this.renderAtTime(annotations, ctime, msgContainer)
            this.scheduleMessages(msgContainer, remaining)
    }, 1000)
    },

    renderAtTime(annotations, seconds, msgContainer){
        return annotations.filter( ann => {
                if(ann.at > seconds){
            return true
        } else {
            this.renderAnnotation(msgContainer, ann)
            return false
        }
    })
    },

    formatTime(at){
        let date = new Date(null)
        date.setSeconds(at / 1000)
        return date.toISOString().substr(14, 5)
    },

    esc(str){
        let div = document.createElement("div")
        div.appendChild(document.createTextNode(str))
        return div.innerHTML
    }
}
export default Video

根據 current player time 去 render annotations

scheduleMessages 每秒都會執行一次,每次都呼叫 renderAtTime

renderAtTime 會 filter 要 render 的 messages

現在再次 reload 頁面,就可看到 annotation 的時間


增加讓 annotation 可以點擊的功能,就可以直接跳躍影片到該 annotaion 產生的時間

修改 /web/static/js/video.js

    msgContainer.addEventListener("click", e => {
      e.preventDefault()
      let seconds = e.target.getAttribute("data-seek") ||
                    e.target.parentNode.getAttribute("data-seek")
      if(!seconds){ return }

      Player.seekTo(seconds)
    })

Handling Disconnects

JS client 可斷線再 reconnect,Server 可能會 restart,或是網路可能發生問題,這些問題都會造成斷線。

如果發送一個 annotation,然後馬上把 server 關掉,client 會以 exponential back-off 的方式進行 reconnect。重新啟動 server,會發現 server 會認為是新的連線,然後發送所有的 annotations,client 會出現重複的 annotations。client 必須偵測 duplicate annotations 並忽略處理。

我們可以在 client 追蹤 lastseenid,並在每次收到新的 annotation 時更新這個值。

當 client 重連時,可將 lastseenid 發送給 server,server 就只需要發送未收到的訊息。

修改 /web/static/js/video.js,增加 vidChannel.params.lastseenid

        vidChannel.on("new_annotation", (resp) => {
            vidChannel.params.last_seen_id = resp.id
        this.renderAnnotation(msgContainer, resp)
    })

        vidChannel.join()
            .receive("ok", resp => {
            let ids = resp.annotations.map(ann => ann.id)
        if(ids.length > 0){ vidChannel.params.last_seen_id = Math.max(...ids) }
        this.scheduleMessages(msgContainer, resp.annotations)
    })
    .receive("error", reason => console.log("join failed", reason) )

client 的 channel 會儲存 params 物件,並在每次 join 時,發送給 server。在 join 也要更新這個參數。

修改 /web/channels/video_channel.ex 的 join

  def join("videos:" <> video_id, params, socket) do
    last_seen_id = params["last_seen_id"] || 0
    video_id = String.to_integer(video_id)
    video = Repo.get!(Rumbl.Video, video_id)

    annotations = Repo.all(
      from a in assoc(video, :annotations),
      where: a.id > ^last_seen_id,
      order_by: [asc: a.at, asc: a.id],
      limit: 200,
      preload: [:user]
    )

    resp = %{annotations: Phoenix.View.render_many(annotations, AnnotationView,
      "annotation.json")}
    {:ok, resp, assign(socket, :video_id, video_id)}
  end

References

Programming Phoenix

2018/10/22

Phoenix_7_JavaScript

Watching Video

  • 修改 views 改成可以查看 videos
  • 建立一個新的 controller for watching video
  • 修改 router for new routes
  • 增加 JavaScript 以使用 YouTube API

修改 /web/templates/layout/app.html.eex 的 header

        <div class="header">
        <ol class="breadcrumb text-right">
          <%= if @current_user do %>
            <li><%= @current_user.username %></li>
            <li><%= link "My Videos", to: video_path(@conn, :index) %></li>
            <li>
              <%= link "Log out", to: session_path(@conn, :delete, @current_user),
                                  method: "delete" %>
            </li>
          <% else %>
            <li><%= link "Register", to: user_path(@conn, :new) %></li>
            <li><%= link "Log in", to: session_path(@conn, :new) %></li>
          <% end %>
        </ol>
        <span class="logo"></span>
      </div>

登入後,點擊 header 上面的 "My Videos" 進入 http://localhost:4000/manage/videos


新增 /web/controllers/watch_controller.ex

defmodule Rumbl.WatchController do
  use Rumbl.Web, :controller
  alias Rumbl.Video

  def show(conn, %{"id" => id}) do
    video = Repo.get!(Video, id)
    render conn, "show.html", video: video
  end
end

/web/templates/watch/show.html.eex

<h2><%= @video.title %></h2>
<div class="row">
  <div class="col-sm-7">
    <%= content_tag :div, id: "video",
          data: [id: @video.id, player_id: player_id(@video)] do %>
    <% end %>
  </div>
  <div class="col-sm-5">
    <div class="panel panel-default">
      <div class="panel-heading">
        <h3 class="panel-title">Annotations</h3>
      </div>
      <div id="msg-container" class="panel-body annotations">

      </div>
      <div class="panel-footer">
        <textarea id="msg-input"
                  rows="3"
                  class="form-control"
                  placeholder="Comment..."></textarea>
        <button id="msg-submit" class="btn btn-primary form-control"
type="submit">
          Post
        </button>
      </div>
    </div>
  </div>
</div>

/web/views/watch_view.ex

defmodule Rumbl.WatchView do
  use Rumbl.Web, :view

  def player_id(video) do
    ~r{^.*(?:youtu\.be/|\w+/|v=)(?<id>[^#&?]*)}
    |> Regex.named_captures(video.url)
    |> get_in(["id"])
  end
end

修改 /web/router.ex

  scope "/", Rumbl do
    pipe_through :browser # Use the default browser stack

    get "/", PageController, :index
    resources "/users", UserController, only: [:index, :show, :new, :create]

    resources "/sessions", SessionController, only: [:new, :create, :delete]

    get "/watch/:id", WatchController, :show
  end

修改 /web/templates/video/index.html.eex

<h2>Listing videos</h2>

<table class="table">
  <thead>
    <tr>
      <th>User</th>
      <th>Url</th>
      <th>Title</th>
      <th>Description</th>

      <th></th>
    </tr>
  </thead>
  <tbody>
<%= for video <- @videos do %>
    <tr>
      <td><%= video.user_id %></td>
      <td><%= video.url %></td>
      <td><%= video.title %></td>
      <td><%= video.description %></td>

      <td class="text-right">
        <%= link "Watch", to: watch_path(@conn, :show, video),
                          class: "btn btn-default btn-xs" %>

        <%= link "Edit", to: video_path(@conn, :edit, video),
                         class: "btn btn-default btn-xs" %>

        <%= link "Delete", to: video_path(@conn, :delete, video),
                           method: :delete,
                           data: [confirm: "Are you sure?"],
                           class: "btn btn-danger btn-xs" %>
      </td>
    </tr>
<% end %>
  </tbody>
</table>

<%= link "New video", to: video_path(@conn, :new) %>

Adding JavaScript

Brunch 是用 Node.js 撰寫的 build tool,Phoenix 使用 Brunch 去 build, transform, minify JS code,也能處理 css 及 assets。

Brunch 資料夾結構

web/static
  - assets
  - css
  - js
  - vendor

放在 assets 目錄是不需要 Brunch 轉換的資源,只會被複製到 priv/static,這個目錄是由 Phoenix.Static 作為 endpoint。

vendor 目錄是放 3rd party tools 例如 jQuery,external dependencies 不需要 import。

Brunch 是使用 ECMAScript6 (ES6) version,有支援 import 功能。每個 file 是一個 function,除非 import 到 app.js,否則不會自動被 browser 執行。

/web/static/js/app.js 裡面有一行

import "phoenix_html"

這就是 /web/templates/layout/app.html.eex 最後面 include 的 js

<script src="<%= static_path(@conn, "/js/app.js") %>"></script>

可在 brunch-config.js 填寫 Brunch 的設定

brunch 有三個指令

  1. brunch build

    build 所有 static files,compiling & copy 結果到 /priv/static

  2. brunch build --production

    build & minifies

  3. brunch watch

    開發時使用,brunch 會自動 recompile files。通常不需要執行,因為 Phoenix 已經有啟動了。

    /config/dev.exs 裡面有一行 watchers 設定

    watchers: [node: ["node_modules/brunch/bin/brunch", "watch", "--stdin",
                    cd: Path.expand("../", __DIR__)]]

新增 /web/static/js/player.js

let Player = {
  player: null,

  init(domId, playerId, onReady){
    window.onYouTubeIframeAPIReady = () => {
      this.onIframeReady(domId, playerId, onReady)
    }
    let youtubeScriptTag = document.createElement("script")
    youtubeScriptTag.src = "//www.youtube.com/iframe_api"
    document.head.appendChild(youtubeScriptTag)
  },

  onIframeReady(domId, playerId, onReady){
    this.player = new YT.Player(domId, {
      height: "360",
      width: "420",
      videoId: playerId,
      events: {
        "onReady":  (event => onReady(event) ),
        "onStateChange": (event => this.onPlayerStateChange(event) )
      }
    })
  },

  onPlayerStateChange(event){ },
  getCurrentTime(){ return Math.floor(this.player.getCurrentTime() * 1000) },
  seekTo(millsec){ return this.player.seekTo(millsec / 1000) }
}
export default Player

修改 /web/static/js/app.js,這樣才會編譯 player.js

import "phoenix_html"

import Player from "./player"
let video = document.getElementById("video")

if(video) {
    Player.init(video.id, video.getAttribute("data-player-id"), () => {
        console.log("player ready!")
    })
}

新增 /web/static/css/video.css

#msg-container {
  min-height: 190px;
}

Creating Slugs

如希望 videos 有一個唯一的 URL-friendly identified,稱為 slug,就需要一個 table 欄位,記錄給 search engine 使用的 unique URL。ex: 1-elixir

add a slug column to table videos

mix ecto.gen.migration add_slug_to_video

修改 migration

defmodule Rumbl.Repo.Migrations.AddSlugToVideo do
  use Ecto.Migration

  def change do
    alter table(:videos) do
      add :slug, :string
    end
  end
end

升級 DB

$ mix ecto.migrate
[info] == Running Rumbl.Repo.Migrations.AddSlugToVideo.change/0 forward
[info] alter table videos
[info] == Migrated in 0.0s

修改 /web/models/video.ex,增加 slug 欄位,並在 changeset 加上 slugify_title()


defmodule Rumbl.Video do
  use Rumbl.Web, :model

  schema "videos" do
    field :url, :string
    field :title, :string
    field :description, :string
    field :slug, :string
    belongs_to :user, Rumbl.User
    belongs_to :category, Rumbl.Category

    timestamps
  end

  @required_fields ~w(url title description)
  @optional_fields ~w(category_id)

  def changeset(model, params \\ %{}) do
    model
    |> cast(params, @required_fields, @optional_fields)
    |> slugify_title()
    |> assoc_constraint(:category)
  end

  defp slugify_title(changeset) do
    if title = get_change(changeset, :title) do
      put_change(changeset, :slug, slugify(title))
    else
      changeset
    end
  end

  defp slugify(str) do
    str
    |> String.downcase()
    |> String.replace(~r/[^\w-]+/u, "-")
  end
end
  • 因 Ecto 區隔了 changeset 及 record 定義,可將 change policy 分開,也能在 create video 的 JSON API 加上 slug

  • changeset 會 filter and cast 新資料,確保一些敏感資料不會從系統外面進來

  • changeset 可以 validate 資料

  • changeset 讓程式碼更容易閱讀及實作


Extending Phoenix with Protocols

查看 /web/templates/video/index.html.eex 產生 link 的部分

<%= link "Watch", to: watch_path(@conn, :show, video),
class: "btn btn-default btn-xs" %>

為了改用 slug,就修改為

watch_path(@conn, :show, "#{video.id}-#{video.slug}")

Phoenix.Param 是 Elixir Protocol,可謂任意一個 data type 自訂此參數。

修改 /web/models/video.ex 增加 defimpl Phoenix.Param, for: Rumbl.Video

  defimpl Phoenix.Param, for: Rumbl.Video do
    def to_param(%{slug: slug, id: id}) do
      "#{id}-#{slug}"
    end
  end

IEx 測試

iex(1)> video = %Rumbl.Video{id: 1, slug: "hello"}
%Rumbl.Video{__meta__: #Ecto.Schema.Metadata<:built, "videos">,
 category: #Ecto.Association.NotLoaded<association :category is not loaded>,
 category_id: nil, description: nil, id: 1, inserted_at: nil, slug: "hello",
 title: nil, updated_at: nil, url: nil,
 user: #Ecto.Association.NotLoaded<association :user is not loaded>,
 user_id: nil}


iex(2)> Rumbl.Router.Helpers.watch_path(%URI{}, :show, video)
"/watch/1-hello"
iex(4)> url = URI.parse("http://example.com/prefix")
%URI{authority: "example.com", fragment: nil, host: "example.com",
 path: "/prefix", port: 80, query: nil, scheme: "http", userinfo: nil}
iex(5)> Rumbl.Router.Helpers.watch_path(url, :show, video)
"/prefix/watch/1-hello"
iex(6)> Rumbl.Router.Helpers.watch_url(url, :show, video)
"http://example.com/prefix/watch/1-hello"

可使用 Rumbl.Endpoint.struct_url

iex(8)> url = Rumbl.Endpoint.struct_url
%URI{authority: nil, fragment: nil, host: "localhost", path: nil, port: 4000,
 query: nil, scheme: "http", userinfo: nil}
iex(9)> Rumbl.Router.Helpers.watch_url(url, :show, video)
"http://localhost:4000/watch/1-hello"

Extending Schemas with Ecto Types

新增 /lib/rumbl/permalink.ex


defmodule Rumbl.Permalink do
  @behaviour Ecto.Type

  def type, do: :id

  def cast(binary) when is_binary(binary) do
    case Integer.parse(binary) do
      {int, _} when int > 0 -> {:ok, int}
      _ -> :error
    end
  end

  def cast(integer) when is_integer(integer) do
    {:ok, integer}
  end

  def cast(_) do
    :error
  end

  def dump(integer) when is_integer(integer) do
    {:ok, integer}
  end

  def load(integer) when is_integer(integer) do
    {:ok, integer}
  end
end

Rumbl.Permalink 是根據 Ecto.Type behavior 定義的 custom type,需要定義四個 functions

  1. type

    回傳 underlying Ecto type,目前是以 :id 來建構

  2. cast

    當 external data 傳入 Ecto 時會呼叫,在 values in queries 被 interpolated 或是在 changeset 的 cast 被呼叫

  3. dump

    當 data 發送給 database 時被呼叫

  4. load

    由 DB 載入資料時被呼叫

iex(1)> alias Rumbl.Permalink, as: P
Rumbl.Permalink
iex(2)> P.cast "1"
{:ok, 1}
iex(3)> P.cast 1
{:ok, 1}
iex(4)> P.cast "13-hello-world"
{:ok, 13}
iex(5)> P.cast "hello-world-13"
:error

web/models/video.ex 增加 @primary_key

  @primary_key {:id, Rumbl.Permalink, autogenerate: true}
  schema "videos" do

就可以使用 http://localhost:4000/watch/2-elixir 這樣的 URL

References

Programming Phoenix