2018/11/19

如何使用 SQLAlchemy in python

SQLAlchemy是Python的一款開源軟體,提供 SQL 工具包及物件關係對映(ORM)的功能,以 MIT 授權。SQLAlchemy 的設計概念比較接近 Java 的 Hibernate。

另一個常用的 ORM 套件為 Django ORM,他的設計概念跟 SQLAlchemy 不同,是使用 Active Record 的方式。

安裝套件

因我們是以 MariaDB 測試,除了要安裝 SQLAlchemy,還要安裝 mysql 的 driver,我們是用 python 的 driver: PyMySQL 進行測試

使用 python 3 的版本,是用 pip3 的指令,如果是 python 2 是用 pip。

sudo pip3 install SQLAlchemy
sudo pip3 install PyMySQL

DBEngine.py

以 Table: status 為例,在 DBEngine 中,以 create_engine 產生 SQLAlchemy 最基本的 engine,再由 engine 產生 session_maker 備用,待會要在 DAO 裡面使用。

後面的部分,用 engine.dialect.has_table 判斷 table 是否存在,如果不存在,就建立該 table

另外注意 updatedate 是在每次更新 record 時,要更新時間,createdate 則是在記錄產生 record 的時間。

from sqlalchemy import *
from sqlalchemy.orm import sessionmaker

db_user = 'root'
db_password = 'password'
db_host = 'localhost'
db_port = 3306
db_name = 'testdb'

engine = create_engine('mysql+pymysql://'+db_user+':'+db_password+'@'+db_host+':'+str(db_port)+'/'+db_name, encoding="utf8", echo=False, pool_pre_ping=True, pool_recycle=3600)
# echo: 是否顯示SQL的指令與相關訊息

DBSession = sessionmaker( bind=engine )

if not engine.dialect.has_table(engine, "status"):
    metadata = MetaData(engine)

    radiostatus_table = Table('status', metadata,
                              Column('rseq', Integer, primary_key=True, autoincrement=True),
                              Column('statuscode', Integer, nullable=False),
                              Column('updatedate', TIMESTAMP, default=func.now(), onupdate=func.now()),
                              Column('createdate', TIMESTAMP, default=func.now())
                              )

    metadata.create_all(engine)

Data Value Object

Base = declarative_base() 建立 Status 物件,作為 ORM 的物件定義,在該物件中,分別定義剛剛產生的 table: status的欄位對應。

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Status(Base):
    __tablename__='status'

    rseq = Column(Integer, primary_key=True, autoincrement=True)
    statuscode = Column(Integer, nullable=False)

    updatedate = Column(TIMESTAMP, default=func.now(), onupdate=func.now)
    createdate = Column(TIMESTAMP, default=func.now())

    def __init__(self):
        pass

    def __init__(self, rseq, statuscode, updatedate, createdate):
        self.rseq = rseq
        self.statuscode = statuscode
        self.updatedate = updatedate
        self.createdate = createdate

    @classmethod
    def to_string(cls, vo):
        result = ''

        result += "(rseq, statuscode, updatedate, createdate)=({rseq}, {statuscode}, {updatedate}, {createdate})\n".format(rseq=str(vo.rseq), statuscode=str(vo.statuscode), updatedate=str(vo.updatedate), createdate=str(vo.createdate))

        return result

Data Access Object

最簡單的是 create data 的 function,首先透過 DBSession() 產生 database session,然後在該 function 中,將要處理的工作做完,完成時要呼叫 commit,發生 error 時要呼叫 rollback,在結束前,以 close 回收 db session。

import logging
from DBEngine import *
from sqlalchemy import *
from sqlalchemy.orm import *

class StatusDao():

    def __init__(self):
        pass
    
    def create(self, vo):
        session = DBSession()
        try:
            logging.debug("vo="+str(vo))
            session.add(vo)
            session.commit()
        except Exception as e:
            logging.error(str(e))
            session.rollback()
            raise
        finally:
            session.close()

這是 create_or_update 的 function,如果檢查該 rseq 的資料不存在,再 create record ,否則就以 update 方式更新該 record

    def create_or_update(self, vo):
        session = DBSession()
        try:
            oldvo = session.query(Status).filter_by(rseq=vo.rseq).first()

            if oldvo:
                oldvo.statuscode = vo.statuscode
                oldvo.updatedate = datetime.datetime.now()

                logging.debug("updating "+str(RadioStatus.to_string(oldvo)))
            else:
                logging.debug("creating "+str(RadioStatus.to_string(vo)) )
                session.add(vo)

            session.commit()

            newvo = session.query(Status).filter_by(rseq=vo.rseq).first()

            return newvo
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

這是將 status 裡面所有資料都刪除的 function

    def get_all(self):
        session = DBSession()
        try:
            radiostatus_list = session.query(Status).all()
            return radiostatus_list
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

取得某個 record 的資料

    def get_by_rseq(self, rseq):
        session = DBSession()
        try:
            vo = session.query(Status).filter_by(rseq=rseq).first()
            return vo
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

取得所有 record 的資料

    def get_all(self):
        session = DBSession()
        try:
            status_list = session.query(Status).all()
            return status_list
        except Exception as e:
            logging.error(str(e))
            raise
        finally:
            session.close()

使用 DAO

產生 dao 物件後,再直接呼叫 function 即可

dao = StatusDao()

status = Status(None, 0, None, None)
vo = dao.create_or_update(status)

dao.delete_all()

這裡的 DAO 使用的 database session 作用範圍,並沒有超過 function 的呼叫範圍,因此這邊的 DAO 比較像是 business method 的功能,可以將一次 database task 要處理的所有工作,放到同一個 DAO function 裡面,在該 function 結束時,再 close db session,這樣的做法,對於簡單的 project 來說,比較容易管理 DAO 及 db session,也比較不會發生忘記 close db session 的問題。

References

SQLAlchemy wiki

Flask零基础到项目实战(四)SQLAlchemy数据库(二)

Column Insert/Update Defaults

Python SQLAlchemy ORM - 1

Connection Pooling

給Django用戶的SQLAlchemy介紹

沒有留言:

張貼留言