顯示具有 Python 標籤的文章。 顯示所有文章
顯示具有 Python 標籤的文章。 顯示所有文章

2024/06/17

gradio

gradio 是一組 python library,可快速建立網頁介面的 API,目的是提供給 Machine Learning Model demo 的使用者操作介面。大部分的 AI model 是以 python 開發,且使用程式或文字介面操作,gradio 可快速將使用介面轉換為網頁應用程式,不需要撰寫網頁 css, javascript 等程式碼。

Quickstart 是 gradio 的快速入門說明

安裝 gradio

pip3 install gradio

在執行時,遇到這樣的錯誤訊息

ImportError: urllib3 v2 only supports OpenSSL 1.1.1+, currently the 'ssl' module is compiled with 'OpenSSL 1.0.2k-fips  26 Jan 2017'. See: https://github.com/urllib3/urllib3/issues/2168

根據 python - ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1+, currently the 'ssl' module is compiled with LibreSSL 2.8.3 - Stack Overflow 的說明,要 downgrade urllib3

pip3 uninstall urllib3
pip3 install 'urllib3<2.0'

test1.py,launch 時加上 server_name="0.0.0.0" 參數,可讓機器所有 network interfaces 在 TCP Port 7860 提供網頁服務

import gradio as gr

def greet(name, intensity):
    return "Hello " * intensity + name + "!"

demo = gr.Interface(
    fn=greet,
    inputs=["text", "slider"],
    outputs=["text"],
)

demo.launch(server_name="0.0.0.0")

在瀏覽器連接 http://localhost:7860

如果在 launch 加上參數 share=True

啟動時,會出現這樣的訊息

$ python3 test1.py
Running on local URL:  http://127.0.0.1:7860
Running on public URL: https://424222ce32aa52ca72.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)

這表示可透過一個 public redirect 的網址,連接到這個 service

References

gradio

Creating A Chatbot Fast

2019/01/13

decorator in python

python 的 decorator 功能,使用的語法看起來很像 Java 的 annotation,是在 function 前面加上 @decorator,雖然名稱是 decorator,但實際上比 Decorator Design Pattern 的功能還要多一點。

decorator class

function decorator 使用起來,就是直接在 function 定義前面,加上 @my_decorator。

@my_decorator
def aFunction():
    print("inside aFunction()")

實際上,當 compiler 編譯 aFunction 時,會將 aFunction 整個 function 傳入 myDecorator,有取代原本 aFunction 的感覺,也有點像是 my_decorator(aFunction) 這樣的意思。

要實作 my_decorator 必須要實作 __init__ 以及 __call__ 這兩個 function,__init__會在一開始就被呼叫,而 __call__ 是在結束時被呼叫。

class my_decorator(object):

    def __init__(self, f):
        print("inside my_decorator.__init__()")
        f()

    def __call__(self):
        print("inside my_decorator.__call__()")

@my_decorator
def aFunction():
    print("inside aFunction()")

print("Finished decorating aFunction()")

aFunction()

執行結果如下,在程式中,因為 decorator 初始化的關係,會先呼叫 my_decorator.__init__(),在這個函數裡面,會取得 aFunction 的函數定義,所以可以在裡面呼叫 aFunction,在 aFunction 結束後,才列印了 Finished 的部分。

而最後是因為呼叫 aFunction(),所以轉為呼叫 my_decorator.__call__()

inside my_decorator.__init__()
inside aFunction()
Finished decorating aFunction()
inside my_decorator.__call__()

也就是說,decorator 是在 init 的時候,就已經取得了 aFunction 的定義,而不是在呼叫 aFunction 時,才傳入 my_decorator。

decorator 其實就只是將 function 傳入另一個 function 的 syntax sugar。


在剛剛的例子中,我們在 __init__ 直接呼叫了 f(),但以下這個例子,才是比較重要的功能,他先將 f 儲存到變數中,然後等到 __call 才使用 f()

class entry_exit(object):

    def __init__(self, f):
        self.f = f

    def __call__(self):
        print("Entering", self.f.__name__)
        self.f()
        print("Exited", self.f.__name__)

@entry_exit
def func1():
    print("inside func1()")

@entry_exit
def func2():
    print("inside func2()")

func1()
func2()

所以可以在呼叫 func1() 的前面以及執行完成的後面,在 __call__裡面呼叫 self.f() 的前後加上自己需要增加的 code

Entering func1
inside func1()
Exited func1
Entering func2
inside func2()
Exited func2

function as decorator

剛剛是使用 class 作為 decorator,但可以直接定義一個 decorator function,以下的例子中,定義了 decorator function 以 f 為參數,在裡面定義了新的 new_f,取代了原本的 f

def entry_exit(f):
    def new_f():
        print("Entering", f.__name__)
        f()
        print("Exited", f.__name__)
    return new_f

@entry_exit
def func1():
    print("inside func1()")

@entry_exit
def func2():
    print("inside func2()")

func1()
func2()
print(func1.__name__)

執行結果為,因為 new_f 已經取代了 f,所以 func1.__name__ 的名稱結果是 new_f

Entering func1
inside func1()
Exited func1
Entering func2
inside func2()
Exited func2
new_f

如果覺得奇怪,還是可以刻意將 new_f 的名稱修改為跟 f 一樣

def entry_exit(f):
    def new_f():
        print("Entering", f.__name__)
        f()
        print("Exited", f.__name__)
    new_f.__name__ = f.__name__
    return new_f

Decorator with Arguments

因為 decorator 其實也是呼叫一個 function,所以可以直接在 decorator 上加上參數,以下是在 decorator class 裡面取得 decorator 參數的例子

class decorator_with_arguments(object):

    def __init__(self, arg1, arg2, arg3):
        print("Inside __init__()")
        self.arg1 = arg1
        self.arg2 = arg2
        self.arg3 = arg3

    def __call__(self, f):
        print("Inside __call__()")
        def wrapped_f(*args):
            print("Inside wrapped_f()")
            print("Decorator arguments:", self.arg1, self.arg2, self.arg3)
            f(*args)
            print("After f(*args)")
        return wrapped_f

@decorator_with_arguments("hello", "world", 42)
def sayHello(a1, a2, a3, a4):
    print('sayHello arguments:', a1, a2, a3, a4)

print("After decoration")

print("Preparing to call sayHello()")
sayHello("say", "hello", "argument", "list")
print("after first sayHello() call")
sayHello("a", "different", "set of", "arguments")
print("after second sayHello() call")

執行結果如下,在 __init__,可以取得 decorator 的參數,並在 __call__ 裡面使用這些參數

Inside __init__()
Inside __call__()
After decoration

Preparing to call sayHello()
Inside wrapped_f()
Decorator arguments: hello world 42
sayHello arguments: say hello argument list
After f(*args)
after first sayHello() call

Inside wrapped_f()
Decorator arguments: hello world 42
sayHello arguments: a different set of arguments
After f(*args)
after second sayHello() call

function decorator with arguments

同樣的 function decorator 也可以使用參數

def decorator_function_with_arguments(arg1, arg2, arg3):
    def wrap(f):
        print("Inside wrap()")
        def wrapped_f(*args):
            print("Inside wrapped_f()")
            print("Decorator arguments:", arg1, arg2, arg3)
            f(*args)
            print("After f(*args)")
        return wrapped_f
    return wrap

@decorator_function_with_arguments("hello", "world", 42)
def sayHello(a1, a2, a3, a4):
    print('sayHello arguments:', a1, a2, a3, a4)

print("After decoration")

print("Preparing to call sayHello()")
sayHello("say", "hello", "argument", "list")
print("after first sayHello() call")
sayHello("a", "different", "set of", "arguments")
print("after second sayHello() call")
Inside wrap()
After decoration

Preparing to call sayHello()
Inside wrapped_f()
Decorator arguments: hello world 42
sayHello arguments: say hello argument list
After f(*args)
after first sayHello() call

Inside wrapped_f()
Decorator arguments: hello world 42
sayHello arguments: a different set of arguments
After f(*args)
after second sayHello() call

因為被 decorator 修飾的原始 function,無法在實作 decorator 時就預先知道參數的個數,因此在定義 wrapper function 時,是定義為 def wrapped_f(*args):

其中 *args 代表不固定個數的參數,如果要支援有參數名稱的參數,則可以使用 **kwargs

多個 decorator

@a
@b
@c
def f ():
    pass

如果有好幾個 decorator,那麼實際上呼叫的順序,是以下這樣

f = a(b(c(f)))

處理時間的 decorator

以下這是計算函數處理時間的 decorator

from time import time

def timer(func):
    def wraper(*args, **kwargs):
        before = time()
        result = func(*args, **kwargs)
        after = time()
        print("elapsed: ", after - before)
        return result
    return wraper

@timer
def add(x, y = 10):
    return x + y

@timer
def sub(x, y = 10):
    return x - y

print("add(10)",       add(10))
print("add(20, 30)",   add(20, 30))

執行結果為

elapsed:  9.5367431640625e-07
add(10) 20
elapsed:  1.1920928955078125e-06
add(20, 30) 50

函數參數的型別檢查 pylimit

pylimit2

利用裝飾器給python的函式加上型別限制

pylimit

python 因為是動態資料型別語言,變數不需要定義資料型別,隨時會改變,但是在實作時,常常會遇到一個問題,就是實作的 function 只能處理某些資料型別的參數,程式如果寫錯,很容易就 crash。

上面的兩個 pylimit 套件,就是利用 decorator 的做法,在裡面加上 function 參數的資料型別定義,然後就可以在呼叫該函數時,進行資料型別檢查,這樣就不需要在實作時,寫上很多 type() 檢查的程式碼。

References

Decorators

Python Decorator(裝飾器)

[Python] Decorator 介紹

理解 Python 裝飾器看這一篇就夠了

萬惡的 Python Decorator 究竟是什麼?

Python Decorator 入門教學

2019/01/07

python 簡易 websocket server

Websocket Server 是一個簡單的 websocket server,沒有任何其他的關聯的套件,只需要 python 標準的 sdk。

由 github 下載原始檔 zip 後,將 server.py 以及 websocket_server 目錄複製到 project 裡面。

在 server.py 的 message_received 加上一行廣播訊息的程式碼。

server.py

from websocket_server import WebsocketServer

# Called for every client connecting (after handshake)
def new_client(client, server):
    print("New client connected and was given id %d" % client['id'])
    server.send_message_to_all("Hey all, a new client has joined us")


# Called for every client disconnecting
def client_left(client, server):
    print("Client(%d) disconnected" % client['id'])


# Called when a client sends a message
def message_received(client, server, message):
    if len(message) > 200:
        message = message[:200]+'..'
    print("Client(%d) said: %s" % (client['id'], message))
    server.send_message_to_all("Client(%d) said: %s" % (client['id'], message))


PORT=9001
server = WebsocketServer(PORT)
server.set_fn_new_client(new_client)
server.set_fn_client_left(client_left)
server.set_fn_message_received(message_received)
server.run_forever()

另外製作一個 client.html

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
        <title>html5 websocket特性</title>
        <style>
            body{
                overflow: hidden;
            }
            h2{
                margin-top: 30px;
                text-align: center;
                background-color: #393D49;
                color: #fff;
                ont-weight: normal;
            padding: 15px 0
            }
            #chat{
                text-align: center;
            }
            #win{
                margin-top: 20px;
                text-align: center;
            }
            #sse{
                margin-top: 10px;
                text-align: center;
            }
            #sse button{
                background-color: #009688;
                color: #fff;
                height: 40px;
                border: 0;
                border-radius: 3px 3px;
                padding-left: 10px;
                padding-right: 10px;
                cursor: pointer;
            }
        </style>
        <script src="http://code.jquery.com/jquery-1.12.4.min.js"></script>
    </head>
    <body>
        <h2>聊天室</h2>
            <div id="chat">
                <textarea id="history" cols="80" rows="20"></textarea>
            </div>

            <div id="win">
                <textarea id="messagewin" cols="80" rows="5"></textarea>
            </div>

            <div id="sse">
                <button onclick="sendMessage()">傳送對話</button>
            </div>

        <script type="text/javascript">
            var oHistory = $('#history');
            var oWin = $('#messagewin');

            if ("WebSocket" in window){
                console.log("您的瀏覽器支援 WebSocket!");
                var ws = new WebSocket("ws://127.0.0.1:9001");
                //var ws = new WebSocket("ws://localhost:9001");
                ws.onopen = function(){
                    console.log("websocket 已連線上");
                }

                ws.onmessage = function (evt) {
                    var dataReceive = evt.data;
                    console.log("資料已接收..."+dataReceive);
                    $('#history').val($('#history').val()+dataReceive+"\n");
                };

                ws.onclose = function() {
                    console.log("連線已關閉...");
                };

            }else{
                // 瀏覽器不支援 WebSocket
                console.log("您的瀏覽器不支援 WebSocket!");
            }

            function sendMessage(){
                var dataSend = oWin.val().trim();
                ws.send(dataSend);
                oWin.val('');
            }
        </script>
    </body>
</html>

將 client.html 放在任意一個 webserver 上,因為 client.html 會連接到 127.0.0.1 的 websocket server,以 python server.py 啟動 server 後,打開兩個 browser tab,打開 client.html

兩個視窗間,因為剛剛增加的那一行廣播訊息的 code,所以可以看到所有訊息。

References

python與html5 websocket開發聊天對話窗

2018/12/24

python 函數的可變參數 *args 和 **kwargs

一般函數的參數個數都是固定的,但如果遇到參數數量不固定的狀況,通常會將某些參數填上預設值,在 python function 可以支援兩種可變數量的參數 *args**kwargs

以下例子中的 fun 雖然定義了三個參數,但是後面兩個填上預設值,呼叫該函數時,就可以忽略 b 與 c,直接使用預設值。

def fun(a,b=2,c=3):
    print("a={}, b={}, c={}".format(a,b,c))

fun(1)

fun(1,22,33)

執行結果

a=1, b=2, c=3
a=1, b=22, c=33

*args是可變的positional arguments列表,**kwargs是可變的keyword arguments列表。兩個可以同時使用,但在使用時,*args必須在**kwargs的前面,因為positional arguments,有位置順序的對應,必須位於keyword arguments之前。

以下的例子,是定義 fun 有一個必要參數 a,以及可變的 *args

def fun(a, *args):
    print("a={}".format(a))
    for arg in args:
        print('Optional argument: {}'.format( arg ) )

fun(1,22,33)

執行結果為

a=1
Optional argument: 22
Optional argument: 33

如果同時加上 **kwargs

def fun(a, *args, **kwargs):
    print("a={}".format(a))
    for arg in args:
        print('Optional argument: {}'.format( arg ) )

    for k, v in kwargs.items():
        print('Optional kwargs argument key: {} value {}'.format(k, v))

fun(1,22,33, k1=44, k2=55)

執行結果為

a=1
Optional argument: 22
Optional argument: 33
Optional kwargs argument key: k1 value 44
Optional kwargs argument key: k2 value 55

除了在定義函數的部分,在呼叫函數時,也可以使用 *args**kwargs

print("")
args = [1,2,3,4]
fun(*args)

print("")
kwargs = {'k1':10, 'k2':11}
fun(1, **kwargs)

print("")
fun(1, *args, **kwargs)

執行結果為

a=1
Optional argument: 2
Optional argument: 3
Optional argument: 4

a=1
Optional kwargs argument key: k1 value 10
Optional kwargs argument key: k2 value 11

a=1
Optional argument: 1
Optional argument: 2
Optional argument: 3
Optional argument: 4
Optional kwargs argument key: k1 value 10
Optional kwargs argument key: k2 value 11

References

Python定義的函數(或調用)中參數args 和*kwargs的用法

[Python] 令新手驚呆的 **kwargs

PYTHON中如何使用ARGS和*KWARGS

理解 Python 中的 *args 和 **kwargs

2018/12/10

python 如何處理 json

json: JavaScript Object Notation 是 javascript 的子集合,是一種輕量級的資料交換格式,比 XML 簡單,也容易閱讀及編寫,處理過程分為序列化及反序列化兩個部分,分別是 Marshalling/Encoder 及 Unmarshalling/Decoder。

Marshalling/Encoder 就是將 python 的資料物件轉換為 json 字串,使用 json.dumps

Unmarshalling/Decoder 是將 json 字串轉換為 python 物件,使用 json.loads

使用 dumps, loads 的簡單範例

這是 python 的 dict 資料型別,因為 dict 沒有字串的表示方式,透過 repr 將 dict 轉換為可以列印的資料

json_obj = {
            'name' : 'Apple',
            'shares' : 100,
            'price' : 542.1
        }

logging.info( "json_obj type = "+str(type(json_obj))+ ", data =" + repr(json_obj) )

dict 可以透過 json.dumps 轉換為 json 字串

json_string = json.dumps(json_obj)

logging.info( "json_string type="+ str(type(json_string))+ ", data=" + json_string )

再透過 json.loads 將 json 字串轉換為 dict

json_object = json.loads(json_string)

        logging.info( "json_object type="+str(type(json_object))+", data="+repr(json_object) )

測試結果

json_obj type = <class 'dict'>, data ={'name': 'Apple', 'shares': 100, 'price': 542.1}

json_string type=<class 'str'>, data={"name": "Apple", "shares": 100, "price": 542.1}

json_object type=<class 'dict'>, data={'name': 'Apple', 'shares': 100, 'price': 542.1}

由 python 資料轉換為 json 的對照表為

python json
dict object
list, tuple array
str, unicode string
int, long, float number
True true
False false
None null

由 json 轉換為 python 資料的對照表為

json python
object dict
array list
string unicode
number(int) int,long
number(real) float
true True
false False
null None

pprint 及 dumps 的 indent, sort_keys 參數

如果 json 的字串比較長,列印到 console 時,可能會比較難查閱需要的資料,這時候有兩種方式可以處理

使用 pprint 可以直接用 pprint(json_obj)

from pprint imort pprint

json_obj = {
            'name' : 'Apple',
            'shares' : 100,
            'price' : 542.1
        }

logging.info( "json_obj type = "+str(type(json_obj))+ ", data =" + repr(json_obj) )

pprint(json_obj)

執行結果為

{'name': 'Apple', 'price': 542.1, 'shares': 100}

在 dumps 加上 indent 參數

json_string = json.dumps(json_obj, indent=4)

執行結果為

json_string type=<class 'str'>, data={
    "name": "Apple",
    "shares": 100,
    "price": 542.1
}

在 dumps 加上 sort_keys 可在轉換 json 時,同時進行 key 的排序

json.dumps({"c": 0, "b": 0, "a": 0}, sort_keys=True)

執行結果

{"a": 0, "b": 0, "c": 0}

loads 的 object_pairs_hookobject_hook 參數

通常 json 會轉換為 python 的 dict 及 list,可使用 object_pairs_hook,將 json 轉換為 OrderedDict

s = '{"name": "ACME", "shares": 50, "price": 490.1}'
        from collections import OrderedDict
data = json.loads(s, object_pairs_hook=OrderedDict)

logging.info( "json_object type="+str(type(data))+", data="+repr(data) )

執行結果

json_object type=<class 'collections.OrderedDict'>, data=OrderedDict([('name', 'ACME'), ('shares', 50), ('price', 490.1)])

也可以用 object_hook 將 json 轉換為 python 物件

class JSONObject:
    def __init__(self, d):
        self.__dict__ = d

data2 = json.loads(s, object_hook=JSONObject)
logging.info( "json_object type="+str(type(data2))+", name="+data2.name+", shares="+str(data2.shares)+", price="+str(data2.price) )

執行結果

json_object type=<class '__main__.JSONObject'>, name=ACME, shares=50, price=490.1

skipkeys

在 dumps 如果遇到 key 不是字串時,會發生 TypeError,可使用 skipkeys 略過無法處理的 key

data =  { 'b' : 789 , 'c' : 456 ,( 1 , 2 ): 123 }
print( json.dumps(data,skipkeys = True) )

執行結果

{"b": 789, "c": 456}

自訂 python 物件的 轉換函數

如果是自訂的 python 類別,通常是無法直接透過 dumps 序列化,會發生 error

class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y
p = Point(2, 3)

print(p)

json.dumps(p)

執行結果

TypeError: Object of type 'Point' is not JSON serializable

解決方式:

首先定義兩個獨立的 util function,object2dict 是將物件轉換為 dict,dict2object 則是將 dict 轉換為 object,在 dict2object 裡面會使用 dict 的 __module____class__ 這兩個資料,用來正確找到 class 的定義

def object2dict(obj):
    d = {
        '__class__': obj.__class__.__name__,
        '__module__': obj.__module__
    }
    # d.update( vars(obj) )
    d.update(obj.__dict__)
    return d


def dict2object(d):
    if '__class__' in d:
        module_name = d.pop( '__module__' )
        class_name = d.pop( '__class__' )
        logging.debug("module_name="+str(module_name)+", class_name="+class_name)

        # # from A import B
        import importlib
        objmodule = importlib.import_module(module_name)
        cls = getattr(objmodule, class_name)

        # objmodule = __import__(module_name)
        # cls = getattr(objmodule, class_name)

        # # use class directly
        # cls = classes[class_name]
        # # Make instance without calling __init__
        obj = cls.__new__(cls)
        for key, value in d.items():
            setattr(obj, key, value)
        return obj
    else:
        inst = d
    return inst

如果另外有一個類別定義在 test.point.py 裡面

class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __repr__( self ):
        return 'Point Object x : %d , y : %d' % ( self.x, self.y)

透過 object2dict 及 dict2object 就可以協助進行物件與 json 的轉換

from test import point
p = point.Point(2, 3)
logging.info(p)

json_str = json.dumps(p, default=object2dict)
logging.info(json_str)

o = json.loads(json_str, object_hook=dict2object)
logging.info( "json_object type="+str(type(o))+", data="+repr(o) )

執行結果

{"__class__": "Point", "__module__": "test.point", "x": 2, "y": 3}

module_name=test.point, class_name=Point
2018-09-03 16:01:00,274 INFO {95322,MainProcess} 

json_object type=<class 'test.point.Point'>, data=Point Object x : 2 , y : 3

有時會遇到 list of Point,這時需要修改 object2dict 讓他可以處理 obj 是 list 的狀況

def object2dict(obj):
    d = {}
    if isinstance(obj, list):
        return json.dumps(obj, default=object2dict)
    else:
        # d = {
        #   '__class__': obj.__class__.__name__,
        #   '__module__': obj.__module__
        # }
        d['__class__'] = obj.__class__.__name__
        d['__module__'] = obj.__module__
        # d.update( vars(obj) )
        d.update(obj.__dict__)
        return d

References

20.2. json — JSON encoder and decoder

Python處理JSON

6.2 讀寫JSON數據

Json概述以及python對json的相關操作

2018/11/26

如何使用 python logging

在 python 要列印除錯資訊,最簡單的方式是使用 print,但如果要區分 log 的等級,或是將不同的 log 資訊放到不同的檔案,這時就需要使用 logging module,但 loggin module 的設定會比較複雜一些,以下紀錄使用 logging module 的方式。

在 project 最常被呼叫及使用的 util.py 裡面,初始化 logging module,如果只需要簡單的 logging 設定,可直接在 util 裡面初始化 logging,複雜的設定可以用設定檔的方式處理。

直接在程式裡面初始化 logging 的範例

使用 TimedRotatingFileHandler,搭配 when='midnight',在每天午夜更換log 檔案,並將舊的 log 檔加上日期 postfix,backupCount 是控制保留幾個舊的 log file。formatter 是設定每一行 log 的 pattern。最後將 file handler 以及 console handler 新增到 rootLogger 裡面。

rootLogger = logging.getLogger()
# 用 rootLogger 的 handler 數量判斷是否已經有初始化 logging print( "len(logger.handlers)="+str(len(logger.handlers)) )

if len(rootLogger.handlers) == 0:
    from logging.handlers import TimedRotatingFileHandler
    logger.setLevel(logging.DEBUG)
    
    log_file_path = os.path.join('.', logs_directory+'/'+'project.log').replace('\\', '/')
    fh = TimedRotatingFileHandler(log_file_path,when='midnight',interval=1,backupCount=30)
    fh.suffix = "%Y-%m-%d"

    datefmt = '%Y-%m-%d %H:%M:%S'
    # datefmt = '%Y-%m-%d'
    # format_str = '%(asctime)s %(levelname)s %(message)s '
    format_str = '%(asctime)s %(levelname)s %(module)s.%(funcName)s %(lineno)d: %(message)s'
    # formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    formatter = logging.Formatter(format_str, datefmt)
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    # 定義 handler 輸出 sys.stderr
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(formatter)

    logger.addHandler(console)

使用時只需要 import logging 預設就會取得 root logger

import logging

logging.info("test")

logging 設定檔

如果需要複雜一些的 logging,例如某個 module 的 log 放到某些特別的檔案中,區分 module 讓不同 module 的 log 不會全部混在一個 log file 裡面。可利用設定檔的方式處理。

logging.cfg 設定檔,定義了四個 logger,root logger 有兩個 handler,module logger 有兩個 handler,其中 console handler 是一樣的,不同的是對應到不同 log 檔案的 file handler,至於 log 的 pattern 樣式跟上面的範例設定一樣。

[loggers]
keys=root,consoleLog,projectFileHandler,moduleLog

[formatters]
keys=consoleFormatter,fileFormatter

[handlers]
keys=consoleHandler,projectFileHandler,moduleFileHandler

[logger_root]
level=DEBUG
handlers=projectFileHandler,consoleHandler

[logger_moduleLog]
level=DEBUG
qualname=moduleLog
handlers=moduleFileHandler,consoleHandler
propagate=0

[logger_consoleLog]
level=NOTSET
handlers=consoleHandler
qualname=consoleLog
propagate=0

[formatter_consoleFormatter]
format=%(asctime)s %(levelname)s {%(process)d,%(processName)s} {%(filename)s} [%(funcName)s] %(lineno)d - %(message)s
datefmt=

[formatter_fileFormatter]
format=%(asctime)s %(levelname)s {%(process)d,%(processName)s} {%(filename)s} {%(module)s} [%(funcName)s] %(lineno)d - %(message)s
datefmt=


[handler_consoleHandler]
class=StreamHandler
level=NOTSET
formatter=consoleFormatter
args=(sys.stdout,)


[handler_projectFileHandler]
class=handlers.TimedRotatingFileHandler
level=NOTSET
formatter=fileFormatter
args=('%(projectlogpath)s','midnight',1,30)

[handler_moduleFileHandler]
class=handlers.TimedRotatingFileHandler
level=NOTSET
formatter=fileFormatter
args=('%(modulelogpath)s','midnight',1,30)

在 util.py 初始化 logging

# logging
from logging import config
import logging

rootLogger = logging.getLogger()
# print( "len(logger.handlers)="+str(len(logger.handlers)) )

if len(rootLogger.handlers) == 0:
    logging_cfg_file_path = os.path.join('.', 'conf/logging.cfg')
    # print("logging_cfg_file_path="+logging_cfg_file_path)

    # create logs directory
    logs_directory = "logs"
    if not os.path.exists(logs_directory):
        try:
            os.makedirs(logs_directory)
        except OSError:
            if not os.path.isdir(logs_directory):
                raise

    project_log_file = 'project.log'
    module_log_file = 'module.log'

    project_log_path = os.path.join('.', logs_directory+'/'+ project_log_file).replace('\\', '/')
    module_log_path = os.path.join('.', logs_directory+'/'+ module_log_file).replace('\\', '/')

    logging.config.fileConfig(logging_cfg_file_path,
                            defaults={'projectlogpath': project_log_path,
                                    'modulelogpath': module_log_path},
                            disable_existing_loggers=False)

在使用時,如果直接用 logging,會對應使用到 root logger

import logging

logging.info("test")

但如果是透過 logging.getLogger(name="moduleLog") 可取得 moduleLog 的 logger,這時候的 log 會寫入到 module 的 log file

import logging

logger = logging.getLogger(name="moduleLog")
logger.info("test")

References

[Python] logging 教學

[Python] logging 教學

2018/11/19

如何使用 SQLAlchemy in python

SQLAlchemy是Python的一款開源軟體,提供 SQL 工具包及物件關係對映(ORM)的功能,以 MIT 授權。SQLAlchemy 的設計概念比較接近 Java 的 Hibernate。

另一個常用的 ORM 套件為 Django ORM,他的設計概念跟 SQLAlchemy 不同,是使用 Active Record 的方式。

安裝套件

因我們是以 MariaDB 測試,除了要安裝 SQLAlchemy,還要安裝 mysql 的 driver,我們是用 python 的 driver: PyMySQL 進行測試

使用 python 3 的版本,是用 pip3 的指令,如果是 python 2 是用 pip。

sudo pip3 install SQLAlchemy
sudo pip3 install PyMySQL

DBEngine.py

以 Table: status 為例,在 DBEngine 中,以 create_engine 產生 SQLAlchemy 最基本的 engine,再由 engine 產生 session_maker 備用,待會要在 DAO 裡面使用。

後面的部分,用 engine.dialect.has_table 判斷 table 是否存在,如果不存在,就建立該 table

另外注意 updatedate 是在每次更新 record 時,要更新時間,createdate 則是在記錄產生 record 的時間。

from sqlalchemy import *
from sqlalchemy.orm import sessionmaker

db_user = 'root'
db_password = 'password'
db_host = 'localhost'
db_port = 3306
db_name = 'testdb'

engine = create_engine('mysql+pymysql://'+db_user+':'+db_password+'@'+db_host+':'+str(db_port)+'/'+db_name, encoding="utf8", echo=False, pool_pre_ping=True, pool_recycle=3600)
# echo: 是否顯示SQL的指令與相關訊息

DBSession = sessionmaker( bind=engine )

if not engine.dialect.has_table(engine, "status"):
    metadata = MetaData(engine)

    radiostatus_table = Table('status', metadata,
                              Column('rseq', Integer, primary_key=True, autoincrement=True),
                              Column('statuscode', Integer, nullable=False),
                              Column('updatedate', TIMESTAMP, default=func.now(), onupdate=func.now()),
                              Column('createdate', TIMESTAMP, default=func.now())
                              )

    metadata.create_all(engine)

Data Value Object

Base = declarative_base() 建立 Status 物件,作為 ORM 的物件定義,在該物件中,分別定義剛剛產生的 table: status的欄位對應。

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Status(Base):
    __tablename__='status'

    rseq = Column(Integer, primary_key=True, autoincrement=True)
    statuscode = Column(Integer, nullable=False)

    updatedate = Column(TIMESTAMP, default=func.now(), onupdate=func.now)
    createdate = Column(TIMESTAMP, default=func.now())

    def __init__(self):
        pass

    def __init__(self, rseq, statuscode, updatedate, createdate):
        self.rseq = rseq
        self.statuscode = statuscode
        self.updatedate = updatedate
        self.createdate = createdate

    @classmethod
    def to_string(cls, vo):
        result = ''

        result += "(rseq, statuscode, updatedate, createdate)=({rseq}, {statuscode}, {updatedate}, {createdate})\n".format(rseq=str(vo.rseq), statuscode=str(vo.statuscode), updatedate=str(vo.updatedate), createdate=str(vo.createdate))

        return result

Data Access Object

最簡單的是 create data 的 function,首先透過 DBSession() 產生 database session,然後在該 function 中,將要處理的工作做完,完成時要呼叫 commit,發生 error 時要呼叫 rollback,在結束前,以 close 回收 db session。

import logging
from DBEngine import *
from sqlalchemy import *
from sqlalchemy.orm import *

class StatusDao():

    def __init__(self):
        pass
    
    def create(self, vo):
        session = DBSession()
        try:
            logging.debug("vo="+str(vo))
            session.add(vo)
            session.commit()
        except Exception as e:
            logging.error(str(e))
            session.rollback()
            raise
        finally:
            session.close()

這是 create_or_update 的 function,如果檢查該 rseq 的資料不存在,再 create record ,否則就以 update 方式更新該 record

    def create_or_update(self, vo):
        session = DBSession()
        try:
            oldvo = session.query(Status).filter_by(rseq=vo.rseq).first()

            if oldvo:
                oldvo.statuscode = vo.statuscode
                oldvo.updatedate = datetime.datetime.now()

                logging.debug("updating "+str(RadioStatus.to_string(oldvo)))
            else:
                logging.debug("creating "+str(RadioStatus.to_string(vo)) )
                session.add(vo)

            session.commit()

            newvo = session.query(Status).filter_by(rseq=vo.rseq).first()

            return newvo
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

這是將 status 裡面所有資料都刪除的 function

    def get_all(self):
        session = DBSession()
        try:
            radiostatus_list = session.query(Status).all()
            return radiostatus_list
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

取得某個 record 的資料

    def get_by_rseq(self, rseq):
        session = DBSession()
        try:
            vo = session.query(Status).filter_by(rseq=rseq).first()
            return vo
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

取得所有 record 的資料

    def get_all(self):
        session = DBSession()
        try:
            status_list = session.query(Status).all()
            return status_list
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

使用 DAO

產生 dao 物件後,再直接呼叫 function 即可

dao = StatusDao()

status = Status(None, 0, None, None)
vo = dao.create_or_update(status)

dao.delete_all()

這裡的 DAO 使用的 database session 作用範圍,並沒有超過 function 的呼叫範圍,因此這邊的 DAO 比較像是 business method 的功能,可以將一次 database task 要處理的所有工作,放到同一個 DAO function 裡面,在該 function 結束時,再 close db session,這樣的做法,對於簡單的 project 來說,比較容易管理 DAO 及 db session,也比較不會發生忘記 close db session 的問題。

References

SQLAlchemy wiki

Flask零基础到项目实战(四)SQLAlchemy数据库(二)

Column Insert/Update Defaults

Python SQLAlchemy ORM - 1

Connection Pooling

給Django用戶的SQLAlchemy介紹

2015/01/19

Python - 使用 FoldPanelBar 實作可摺疊 ListView


之前為了一些需求,需要使用到類似 line 通訊錄的那種可摺疊的畫面。在 android 可用 BaseExpandableListAdapter 實作;ios 則可用 TableView 的 group 來實作,那 python 呢?
上網爬文研究了幾天,看到有人說 python 官方 demo 裡面有可以做到摺疊的畫面,於是自己加工了一下,做出類似 BaseExpandableListAdapter 的東西...
成品如下圖:
以下就簡單說明一下是如何實作,最後面有範例提供給大家取用

架構



實作步驟

  1. 建立一個 ScrollView
  2. 建立一個 FoldPanelBar
  3. 加入 FoldPanel ( group view )
  4. 加入 child view ( 可自訂任何畫面,主要還是以 panel 為最底層 )

#建立一個 FoldPanelBar
self.panel = fpb.FoldPanelBar(self.parentView, 2, wx.DefaultPosition, self.parentView.GetSize(),
                           agwStyle=fpb.FPB_VERTICAL)
#設定 group 展開/收合的圖片
Images = wx.ImageList(27,27)  #寬高一定要跟圖片一樣,不然不能載入
Images.Add(wx.Bitmap( u"image/news_open.png", wx.BITMAP_TYPE_ANY ))
Images.Add(wx.Bitmap( u"image/news_close.png", wx.BITMAP_TYPE_ANY ))

#設定 group 的顏色等 style
cs = foldPanel.CaptionBarStyle()
cs.SetCaptionStyle(foldPanel.CAPTIONBAR_FILLED_RECTANGLE)
cs.SetFirstColour(wx.Colour(85,147,177,0))
cs.SetCaptionColour(wx.WHITE)

titleDir = {
          0: "myProfile",
          1: "Favorites",
          2: "group",
          3: "application",
          4: "supplier",
          5: "company"
}

#list 是空時
if len(self.coll)==0:
    for i in range(0, MAX_TITLE_COUNT+1, +1):
 #增加一個 group
        item = self.panel.AddFoldPanel(titleDir[i], collapsed=False, foldIcons=Images, cbstyle=cs)
 #增加一條分隔線
        self.panel.AddFoldPanelSeparator(item, spacing=0)
 #綁定 action
        item.Bind(fpb.EVT_CAPTIONBAR, functools.partial(self.OnPressCaption, groupPosition=i))
        self.pitemList.append(item)
else:
    for i in range(0, len(self.coll), +1):
 #增加一個 group
        item = self.panel.AddFoldPanel(titleDir[self.coll[i][0]], collapsed=False, foldIcons=Images, cbstyle=cs)
        childItemList = []

        for j in range(0, len(self.coll[i][1]), +1):
            isGroupTitle = False
            bitmapPaht = u"image/name_icon1.png"
            name = "test"
            positionType = self.coll[i][0]
            vo = self.coll[i][1][j]
            if positionType == LOGIN_USER:
                print "LOGIN_USER"
                name = vo
            elif positionType == FAV_USER:
                print "FAV_USER"
                name = vo
            elif positionType == CHAT_GROUP:
                print "CHAT_GROUP"
                bitmapPaht = u"image/name_icon2.png"
                name = vo
            elif positionType == APP_USER:
                bitmapPaht = u"image/name_icon3.png"
                print "APP_USER"
                name = vo
            elif positionType == SUPPLIER:
                print "SUPPLIER"
                name = vo
            elif positionType == COMPANY:
                print "COMPANY"
                name = vo

            temp = wx.Bitmap( bitmapPaht, wx.BITMAP_TYPE_ANY )
            if isGroupTitle:
                isGroupTitle = False
  #==================================
  #增加一個 child view
                panel = wx.Panel(item, j)
                childItemList.append(panel)
                panel.BackgroundColour = wx.WHITE
                bSizer_gorupTitlePanel = wx.BoxSizer( wx.HORIZONTAL )

                #最上層group
                print "name:"+name
                print "fullname:"+vo.fullname
                print "groupname:"+vo.groupname
                st_groupPath = wx.StaticText( panel, wx.ID_ANY, name, (3, 12), wx.DefaultSize, 0 )
                st_groupPath.Wrap( -1 )

                panel.SetClientSize(wx.Size( -1, temp.GetHeight()))
  #==================================

            else:
  #==================================
  #增加一個 child view
                panel = wx.Panel(item, j)
                childItemList.append(panel)
                panel.BackgroundColour = wx.WHITE

                bSizer2 = wx.BoxSizer( wx.HORIZONTAL )
                size = temp.GetWidth(), temp.GetHeight()

                imgbtn_icon = wx.BitmapButton( panel, j, temp, (5, 5), size, 0 )
                bSizer2.Add( imgbtn_icon, 0, wx.ALL|wx.EXPAND, 5 )

                st_name = wx.StaticText( panel, j, name, (temp.GetWidth()+10, temp.GetHeight()/2), wx.DefaultSize, 0 )
                st_name.Wrap( -1 )
                bSizer2.Add( st_name, 1, wx.ALL|wx.EXPAND, 0 )

                panel.SetClientSize(wx.Size( -1, temp.GetHeight()+10))
  #==================================
  #綁定 action
                panel.Bind(wx.EVT_LEFT_DCLICK, functools.partial(self.doubleClick, groupPosition=i, vo=vo), id=j)
                panel.Bind(wx.EVT_LEFT_DOWN, functools.partial(self.OnItemDown, groupPosition=i, obj="test msg"), id=j)
                panel.Bind(wx.EVT_ENTER_WINDOW, functools.partial(self.onMouseOver, groupPosition=i), id=j)
                panel.Bind(wx.EVT_LEAVE_WINDOW, functools.partial(self.onMouseLeave, groupPosition=i), id=j)
                st_name.Bind(wx.EVT_LEFT_DCLICK, functools.partial(self.doubleClick, groupPosition=i, vo=vo), id=j)
                imgbtn_icon.Bind(wx.EVT_BUTTON, functools.partial(self.clickIcon, groupPosition=i, obj="test msg"), id=j)
  #==================================

     #將 child view add 到 FoldPanel 中
            self.panel.AddFoldPanelWindow(item, panel, fpb.FPB_ALIGN_WIDTH, 0)
            #==================================
            #增加一條分隔線
            self.panel.AddFoldPanelSeparator(item, spacing=0)
            #==================================
 #綁定 action
        item.Bind(fpb.EVT_CAPTIONBAR, functools.partial(self.OnPressCaption, groupPosition=i))

        self.itemList.append(childItemList)
        self.pitemList.append(item)

References