2016/11/28

redis for scala play

Scala Play uses ehcache as its default cache library, but during development the automatic code-reload feature often causes the cache to fail with the error message "The play Cache is not alive (STATUS_SHUTDOWN)". After some searching, this appears to be an ehcache bug, so we tried replacing ehcache with redis.

Install redis

From development to production we need redis on several operating systems, so this section records how to install it on each of them.

Windows

For installing redis on Windows, refer to these two articles:

[料理佳餚] 在 Windows 上安裝 Redis
Redis on Windows

Watch the Memory Limit setting, otherwise redis will eventually eat up all available memory.

You can use telnet or redis-cli to verify that the installation works.

telnet localhost 6379
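
For example, redis-cli can confirm that the server is responding; it should reply PONG:

redis-cli ping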
Mac
sudo port install redis
sudo port load redis

sudo port unload redis

# start redis manually
redis-server /opt/local/etc/redis.conf

You need to edit /opt/local/etc/redis.conf yourself:

# limited number of clients in development environment
maxclients 50

# limit the maximum amount of memory redis may use
maxmemory 500MB

# eviction policy to apply when the memory limit is reached
maxmemory-policy volatile-lru
Debian 8

Dotdeb is a third-party repository for Debian; pick the Taiwan mirror site from its mirror list.

vi /etc/apt/sources.list.d/dotdeb.list

deb http://ftp.yzu.edu.tw/Linux/dotdeb/ jessie all
deb-src http://ftp.yzu.edu.tw/Linux/dotdeb/ jessie all

Install the Dotdeb GPG key

wget https://www.dotdeb.org/dotdeb.gpg
sudo apt-key add dotdeb.gpg

Install redis server

sudo apt-get update
sudo apt-get install redis-server

Start/stop

sudo service redis-server start

sudo service redis-server stop

Use redis-benchmark to test the connection

redis-benchmark -q -n 1000 -c 10 -P 5

Tune system parameters

sudo sysctl vm.overcommit_memory=1

vi /etc/sysctl.conf
vm.overcommit_memory = 1
CentOS 7

Install EPEL

wget -r --no-parent -A 'epel-release-*.rpm' http://dl.fedoraproject.org/pub/epel/7/x86_64/e/

rpm -Uvh dl.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-*.rpm

Install redis

yum install redis

Start/stop redis

systemctl enable redis.service
systemctl start redis.service

systemctl stop redis.service
cluster

redis cluster is implemented with sharding: every node is responsible for a portion of the hash slots. A minimal cluster needs three redis nodes, and for failover you should run six, the additional three acting as slave nodes.

For a detailed walk-through you can refer to this article: Redis Cluster 3.0搭建與使用

or study the official documentation directly: Redis cluster tutorial
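
As a rough example (assuming Redis 3.x with the redis-trib.rb script shipped in the source distribution; the addresses below are placeholders), a six-node cluster with one replica per master can be created with:

redis-trib.rb create --replicas 1 \
  127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
  127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005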

redis for play

play-redis is a Play framework 2 cache plugin that acts as an adapter to redis-server. The advantage of this library is that it implements Play's CacheApi, so ehcache can be swapped for redis through configuration alone.

First, disable ehcache in build.sbt and add the play-redis library.

// enable Play cache API (based on your Play version) and optionally exclude EhCache implementation
libraryDependencies += play.sbt.PlayImport.cache exclude("net.sf.ehcache", "ehcache-core")
// include play-redis library
libraryDependencies += "com.github.karelcemus" %% "play-redis" % "1.3.0-M1"

Next, modify application.conf

# disable default Play framework cache plugin
play.modules.disabled += "play.api.cache.EhCacheModule"

# enable redis cache module
play.modules.enabled += "play.api.cache.redis.RedisCacheModule"

and configure the connection to the local redis instance in application.conf

play.cache {
  // redis for Play https://github.com/KarelCemus/play-redis
  redis {
    redis.host="localhost"
    redis.port=6379

    ##redis-server database, 1-15, default:1
    redis.database=1
    ##connection timeout, default:1s (Duration)
    redis.timeout=1s
    ## Akka actor
    redis.dispatcher="akka.actor.default-dispatcher"
    ## Defines which configuration source enable. Accepted values are static, env, custom
    redis.configuration="static"
    ## optional
    #redis.password="null"
    ## optional, Name of the environment variable with the connection string.
    #redis.connection-string-variable=""
    ## Defines behavior when command execution fails.
    #redis.recovery="log-and-default"
  }
}

With this configuration in place, ehcache is replaced by redis without modifying any Scala code.
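
For example, a controller like the following keeps working unchanged (a minimal sketch using the Play 2.5 CacheApi; RedisDemo is a hypothetical name), with the injected cache now backed by redis instead of ehcache:

package controllers

import javax.inject.Inject
import play.api.cache.CacheApi
import play.api.mvc.{Action, Controller}
import scala.concurrent.duration._

class RedisDemo @Inject()(cache: CacheApi) extends Controller {
  def demo(name: String) = Action {
    // exactly the same CacheApi calls as with ehcache; only the backing store changed
    val greeting = cache.getOrElse[String]("greeting." + name, 10.minutes) {
      s"hello $name"
    }
    Ok(greeting)
  }
}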

2016/11/21

Play cache API

Scala Play recommends EHCache as its cache solution, a very common library in the Java world. For now we use the official cache API, since it is already integrated into the Play framework; later we will look into switching to redis.

Configure the cache

Add cache to libraryDependencies in build.sbt

libraryDependencies ++= Seq(
  cache,
  ...
)

app/controllers/CacheApplication.scala

  • cache.set adds a cache item
  • cache.get retrieves a cache item
  • cache.remove removes a cache item
  • cache.getOrElse retrieves a cache item, creating it if it does not exist
package controllers

import java.util.concurrent.TimeoutException
import javax.inject.Inject

import akka.actor.ActorSystem
import akka.pattern.after
import models.{Project, ProjectRepo, TaskRepo}
import play.api.Logger
import play.api.cache.CacheApi
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.mvc.{Action, Controller}

import scala.concurrent.Future
import scala.concurrent.duration._

class CacheApplication @Inject()(cache: CacheApi)
                           extends Controller {
  def newCache(name: String) = Action {
    val result = s"Add new project ${name} to cache..\n"

    val project: Project = Project(999, name)
    cache.set("project."+name, project)
    Ok(result)
  }

  def newCache2(name: String) = Action {
    val result = s"Add new project ${name} to cache with 5 minutes..\n"

    val project: Project = Project(888, name)
    cache.set("project."+name, project, 5.minutes)

    // remove the cache item
    //cache.remove("project."+name)
    Ok(result)
  }

  def getCache(name:String) = Action {
    val project: Option[Project] = cache.get[Project]("project."+name)

    val result = project match {
      // use Some to check whether the item exists
      case Some(pj) => s"get project from cache: ${pj.name}"
      case None => s"can't get project from cache.."
    }

    Ok(result)
  }

  def getCache2(name:String) = Action {
    val project: Project = cache.getOrElse[Project]("project."+name) {
      // if project.name is not in the cache, create a new one and store it
      Project(777, name)
    }

    val result = s"getOrElse project from cache: ${project.name}"
    Ok(result)
  }

}

conf/routes

GET           /cache/:name               controllers.CacheApplication.newCache(name:String)
GET           /cache2/:name               controllers.CacheApplication.newCache2(name:String)
GET           /getcache/:name            controllers.CacheApplication.getCache(name:String)
GET           /getcache2/:name           controllers.CacheApplication.getCache2(name:String)

Test

$ curl 'http://localhost:9000/cache/cproject'
Add new project cproject to cache..

$ curl 'http://localhost:9000/getcache/cproject'
get project from cache: cproject

$ curl 'http://localhost:9000/cache2/cproject'
Add new project cproject to cache with 5 minutes..

$ curl 'http://localhost:9000/getcache2/cproject'
getOrElse project from cache: cproject

Default cache store

The default cache is named play, and additional cache stores can be configured via ehcache.xml.

Adjust application.conf

play.cache {
  # If you want to bind several caches, you can bind them individually
  bindCaches = ["db-cache", "user-cache", "session-cache"]
}

Use the new cache store in controllers/Application.scala

import play.api.cache._
import play.api.mvc._
import javax.inject.Inject

class Application @Inject()(
    @NamedCache("session-cache") sessionCache: CacheApi
) extends Controller {

}

Caching HTTP responses

HTTP responses can also be put into the cache.

First inject cached: Cached, which is used to cache actions

import play.api.cache.Cached
import javax.inject.Inject

class Application @Inject() (cached: Cached) extends Controller {

}

For example, this caches the home page result under the key homePage:

  def cacheAction = cached("homePage") {
    Action {
      Ok("Hello World")
    }
  }

It can also be combined with Authenticated to cache a different result for each user

def userProfile = Authenticated {
  user =>
    cached(req => "profile." + user) {
      Action {
        Ok(views.html.profile(User.find(user)))
      }
    }
}

You can choose to cache only 200 OK responses

def get(index: Int) = cached.status(_ => "/resource/"+ index, 200) {
  Action {
    if (index > 0) {
      Ok(Json.obj("id" -> index))
    } else {
      NotFound
    }
  }
}

or additionally cache 404 Not Found responses for a while (600 seconds here)

def get(index: Int) = {
  val caching = cached
    .status(_ => "/resource/"+ index, 200)
    .includeStatus(404, 600)

  caching {
    Action {
      if (index % 2 == 1) {
        Ok(Json.obj("id" -> index))
      } else {
        NotFound
      }
    }
  }
}

Custom Cache API

If you want to implement your own Cache API,

first disable EhCacheModule in application.conf

play.modules.disabled += "play.api.cache.EhCacheModule"

then provide the new cache API implementation and bind it, reusing NamedCache for the binding.
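
A minimal sketch of such an implementation and its binding, assuming a hypothetical InMemoryCacheApi backed by a simple concurrent map (expiration is ignored here for brevity):

package cache

import javax.inject.Singleton
import play.api.cache.CacheApi
import play.api.inject.{Binding, Module}
import play.api.{Configuration, Environment}
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

// hypothetical CacheApi implementation backed by an in-memory map
@Singleton
class InMemoryCacheApi extends CacheApi {
  private val store = TrieMap.empty[String, Any]
  def set(key: String, value: Any, expiration: Duration): Unit = store.put(key, value)
  def get[T: ClassTag](key: String): Option[T] = store.get(key).collect { case t: T => t }
  def getOrElse[A: ClassTag](key: String, expiration: Duration)(orElse: => A): A =
    get[A](key).getOrElse { val a = orElse; set(key, a, expiration); a }
  def remove(key: String): Unit = store.remove(key)
}

// Play module that binds the custom implementation to CacheApi
class InMemoryCacheModule extends Module {
  def bindings(environment: Environment, configuration: Configuration): Seq[Binding[_]] =
    Seq(bind[CacheApi].to[InMemoryCacheApi])
}

The module then has to be enabled in application.conf, e.g. play.modules.enabled += "cache.InMemoryCacheModule", in the same way play-redis is enabled above.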

References

Scala Cache

Play 緩存 API

2016/11/14

slick Database Persistence in Scala play 2.5

There are several ways to persist data to a database in the Scala Play 2.5 framework, all of them integrations of third-party libraries. We tested plain JDBC and slick; this entry covers slick.

slick is a functional relational mapping (FRM) library: it lets you access a relational database in a functional programming style, composing queries much like Scala collections.
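
For example (a sketch assuming the ProjectRepo shown later in this entry, where Projects is the TableQuery and the slick api._ import is in scope; findIdsByName is a hypothetical helper), a query is composed like a Scala collection and only touches the database when passed to db.run:

// inside ProjectRepo: build the query first, run it afterwards
def findIdsByName(name: String): Future[Seq[Long]] =
  db.run(Projects.filter(_.name === name).map(_.id).result)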

Prepare the slick project

First generate a new project with activator; we used the Play Framework 2.5 and Slick 3.1 template.

activator new tst6 play-slick3-example

The template originally uses the H2 in-memory database; we switched it to MySQL.

build.sbt

name := """play-slick-example"""

version := "1.0"

lazy val root = (project in file(".")).enablePlugins(PlayScala)

scalaVersion := "2.11.7"

routesGenerator := InjectedRoutesGenerator

resolvers += "scalaz-bintray" at "https://dl.bintray.com/scalaz/releases"

libraryDependencies ++= Seq(
    cache,
    ws,
    filters,
    "com.typesafe.play" %% "play-slick" % "2.0.0",
    "com.typesafe.play" %% "play-slick-evolutions" % "2.0.0",
    //"com.h2database" % "h2" % "1.4.187",
    "mysql" % "mysql-connector-java" % "5.1.36",
    "org.scalatestplus.play" %% "scalatestplus-play" % "1.5.0" % "test",
    specs2 % Test
)

resolvers += "Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"

fork in run := true

Modify logback.xml; the main change is adding the following line.

<logger name="slick.jdbc.JdbcBackend.statement"  level="DEBUG" />

The complete logback.xml looks like this:

<!-- https://www.playframework.com/documentation/latest/SettingsLogger -->
<configuration>

<conversionRule conversionWord="coloredLevel" converterClass="play.api.libs.logback.ColoredLevel" />

<!--
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <file>${application.home:-.}/logs/application.log</file>
    <encoder>
        <pattern>%date [%level] from %logger in %thread\n\t%message%n%xException</pattern>
    </encoder>
</appender>
-->

<appender name="FILE"
          class="ch.qos.logback.core.rolling.RollingFileAppender">
    <append>true</append>
    <rollingPolicy
            class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <param name="FileNamePattern"
               value="${application.home:-.}/logs/application.%d{yyyy-MM-dd}.log.zip">
        </param>
    </rollingPolicy>
    <encoder>
        <!-- <pattern>%d %-5p %c %L%n %m%n</pattern> -->
        <!-- <charset class="java.nio.charset.Charset">UTF-8</charset>  -->
        <pattern>%date [%level] from %logger in %thread\n\t%message%n%xException</pattern>
    </encoder>
</appender>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
        <pattern>%coloredLevel %logger{15} in %thread\n\t%message%n%xException{10}</pattern>
    </encoder>
</appender>

<appender name="ASYNCFILE" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="FILE" />
</appender>

<appender name="ASYNCSTDOUT" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="STDOUT" />
</appender>

<logger name="play" level="INFO" />
<logger name="application" level="DEBUG" />

<!-- Will log all statements -->
<logger name="slick.jdbc.JdbcBackend.statement"  level="DEBUG" />

<!-- Off these ones as they are annoying, and anyway we manage configuration ourselves -->
<logger name="com.avaje.ebean.config.PropertyMapLoader" level="OFF" />
<logger name="com.avaje.ebeaninternal.server.core.XmlConfigLoader" level="OFF" />
<logger name="com.avaje.ebeaninternal.server.lib.BackgroundThread" level="OFF" />
<logger name="com.gargoylesoftware.htmlunit.javascript" level="OFF" />

<root level="INFO">
    <appender-ref ref="ASYNCFILE" />
    <appender-ref ref="ASYNCSTDOUT" />
</root>

</configuration>

Project configuration

Modify application.conf. slick does not use the Play JDBC db connection settings; it is configured separately under slick.dbs.

#slick.dbs.default.driver="slick.driver.H2Driver$"
#slick.dbs.default.db.driver=org.h2.Driver
#slick.dbs.default.db.url="jdbc:h2:mem:play;DB_CLOSE_DELAY=-1"
//slick.dbs.default.db.user=user
//slick.dbs.default.db.password=""

slick.dbs.default.driver="slick.driver.MySQLDriver$"
slick.dbs.default.db.driver=com.mysql.jdbc.Driver
slick.dbs.default.db.url="jdbc:mysql://localhost:3306/playdb?useUnicode=true&characterEncoding=utf-8"
slick.dbs.default.db.user="root"
slick.dbs.default.db.password="max168kit"
# HikariCP connection pool: the min size is numThreads and the max size is numThreads * 5
slick.dbs.default.db.numThreads=5
slick.dbs.default.db.queueSize=30
slick.dbs.default.db.connectionTimeout=15s
slick.dbs.default.db.connectionTestQuery="select 1"

ref: connection pool

DB evolution

DB evolutions for slick work the same way as in the JDBC entry.

application.conf

play.evolutions {
  # You can disable evolutions for a specific datasource if necessary
  db.default.enabled = true
  autoApply = true
  autoApplyDowns = true
}

conf/evolutions/default/1.sql

# DC schema
 
# --- !Ups


CREATE TABLE PROJECT (
    ID integer NOT NULL AUTO_INCREMENT PRIMARY KEY,
    NAME varchar(255) NOT NULL
);


CREATE TABLE TASK (
    ID integer NOT NULL AUTO_INCREMENT PRIMARY KEY,
    COLOR varchar(255) NOT NULL,
    STATUS varchar(255) NOT NULL,
    PROJECT integer NOT NULL,
    FOREIGN KEY (PROJECT) REFERENCES PROJECT (ID)
);


 
# --- !Downs

DROP TABLE TASK;
DROP TABLE PROJECT;

Scala code

The code in this part comes from the play-slick3-example template with essentially no changes. The main thing to note is that the Application controller uses Action.async together with Future everywhere, so the processing is asynchronous.

The conf/routes file uses the less common HTTP methods PUT and PATCH; these can be tested with the Chrome Postman app.

GET           /                          controllers.Application.listProjects
PUT           /projects/:name            controllers.Application.createProject(name: String)
GET           /projects/list             controllers.Application.listProjects
GET           /projects/:id              controllers.Application.projects(id: Long)
PUT           /projects/:id/:name        controllers.Application.addTaskToProject(name: String, id: Long)
PATCH         /tasks/:id                 controllers.Application.modifyTask(id: Long, color:Option[String] ?= None)

DELETE        /projects/:name            controllers.Application.delete(name: String)
  • app/controllers/Application.scala
package controllers

import java.util.concurrent.{TimeoutException, TimeUnit}
import javax.inject.Inject

import akka.actor.ActorSystem
import models.{Project, ProjectRepo, TaskRepo}
import play.api.Logger
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.mvc.{Action, Controller}

import akka.pattern.after
import scala.concurrent.duration._
import scala.concurrent.Future

class Application @Inject()( projectRepo: ProjectRepo, taskRepo: TaskRepo, actorSystem: ActorSystem)
                           extends Controller {

  def addTaskToProject(color: String, projectId: Long) = Action.async { implicit rs =>
    projectRepo.addTask(color, projectId)
      .map{ _ =>  Redirect(routes.Application.projects(projectId)) }
  }

  def modifyTask(taskId: Long, color: Option[String]) = Action.async { implicit rs =>
    taskRepo.partialUpdate(taskId, color, None, None).map(i =>
    Ok(s"Rows affected : $i"))
  }
  def createProject(name: String)= Action.async { implicit rs =>
    projectRepo.create(name)
      .map(id => Ok(s"project $id created") )
  }

  def listProjects = Action.async { implicit rs =>
    projectRepo.all
      .map(projects => Ok(views.html.projects(projects)))
  }

  def projects(id: Long) = Action.async { implicit rs =>
    for {
      Some(project) <-  projectRepo.findById(id)
      tasks <- taskRepo.findByProjectId(id)
    } yield Ok(views.html.project(project, tasks))
  }

  def delete(name: String) = Action.async { implicit rs =>
    projectRepo.delete(name).map(num => Ok(s"$num projects deleted"))
  }
}
  • app/models/Project.scala

The ProjectsTable class defines the PROJECT table that backs the Projects query.

package models

import javax.inject.Inject
import play.api.Logger
import play.api.db.slick.DatabaseConfigProvider
import slick.dbio
import slick.dbio.Effect.Read
import slick.driver.JdbcProfile
import slick.jdbc.GetResult
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

case class Project(id: Long, name: String)


class ProjectRepo @Inject()(taskRepo: TaskRepo)(protected val dbConfigProvider: DatabaseConfigProvider) {

  val dbConfig = dbConfigProvider.get[JdbcProfile]
  val db = dbConfig.db
  import dbConfig.driver.api._
  private val Projects = TableQuery[ProjectsTable]

  private def _findById(id: Long): DBIO[Option[Project]] =
    Projects.filter(_.id === id).result.headOption

  private def _findByName(name: String): Query[ProjectsTable, Project, List] =
    Projects.filter(_.name === name).to[List]

  def findById(id: Long): Future[Option[Project]] =
    db.run(_findById(id))

  def findByName(name: String): Future[List[Project]] =
    db.run(_findByName(name).result)

  def all: Future[List[Project]] =
    db.run(Projects.to[List].result)

  def create(name: String): Future[Long] = {
    val project = Project(0, name)
    db.run(Projects returning Projects.map(_.id) += project)
  }

  def delete(name: String): Future[Int] = {
    val query = _findByName(name)

    val interaction = for {
      projects        <- query.result
      _               <- DBIO.sequence(projects.map(p => taskRepo._deleteAllInProject(p.id)))
      projectsDeleted <- query.delete
    } yield projectsDeleted

    db.run(interaction.transactionally)
  }

  def addTask(color: String, projectId: Long): Future[Long] = {
    val interaction = for {
      Some(project) <- _findById(projectId)
      id <- taskRepo.insert(Task(0, color, TaskStatus.ready, project.id))
    } yield id

    db.run(interaction.transactionally)
  }


  // defines the PROJECT table
  private class ProjectsTable(tag: Tag) extends Table[Project](tag, "PROJECT") {

    // ID is the primary key
    def id = column[Long]("ID", O.AutoInc, O.PrimaryKey)
    def name = column[String]("NAME")

    def * = (id, name) <> (Project.tupled, Project.unapply)
    def ? = (id.?, name.?).shaped.<>({ r => import r._; _1.map(_ => Project.tupled((_1.get, _2.get))) }, (_: Any) => throw new Exception("Inserting into ? projection not supported."))

  }
}
  • app/models/Task.scala

The interesting parts are TaskStatus and the taskStatusColumnType implicit that converts it automatically. The corresponding DB column holds one of the three values ready/set/go, so the column is modeled with an Enumeration.

package models

import javax.inject.Inject

import play.api.db.slick.DatabaseConfigProvider

import slick.driver.JdbcProfile

import scala.concurrent.Future

case class Task(id: Long, color: String, status: TaskStatus.Value, project: Long) {

  def patch(color: Option[String], status: Option[TaskStatus.Value], project: Option[Long]): Task =
    this.copy(color = color.getOrElse(this.color),
              status = status.getOrElse(this.status),
              project = project.getOrElse(this.project))

}

object TaskStatus extends Enumeration {
  val ready = Value("ready")
  val set = Value("set")
  val go = Value("go")
}

class TaskRepo @Inject()(protected val dbConfigProvider: DatabaseConfigProvider) {
  val dbConfig = dbConfigProvider.get[JdbcProfile]
  val db = dbConfig.db
  import dbConfig.driver.api._
  private val Tasks = TableQuery[TasksTable]


  def findById(id: Long): Future[Task] =
    db.run(Tasks.filter(_.id === id).result.head)

  def findByColor(color: String): DBIO[Option[Task]] =
    Tasks.filter(_.color === color).result.headOption

  def findByProjectId(projectId: Long): Future[List[Task]] =
    db.run(Tasks.filter(_.project === projectId).to[List].result)

  def findByReadyStatus: DBIO[List[Task]] =
    Tasks.filter(_.status === TaskStatus.ready).to[List].result

  def partialUpdate(id: Long, color: Option[String], status: Option[TaskStatus.Value], project: Option[Long]): Future[Int] = {
    import scala.concurrent.ExecutionContext.Implicits.global

    val query = Tasks.filter(_.id === id)

    val update = query.result.head.flatMap {task =>
      query.update(task.patch(color, status, project))
    }

    db.run(update)
  }

  def all(): DBIO[Seq[Task]] =
    Tasks.result

  def insert(Task: Task): DBIO[Long] =
    Tasks returning Tasks.map(_.id) += Task

  def _deleteAllInProject(projectId: Long): DBIO[Int] =
    Tasks.filter(_.project === projectId).delete

  private class TasksTable(tag: Tag) extends Table[Task](tag, "TASK") {

    def id = column[Long]("ID", O.AutoInc, O.PrimaryKey)
    def color = column[String]("COLOR")
    def status = column[TaskStatus.Value]("STATUS")
    def project = column[Long]("PROJECT")

    def * = (id, color, status, project) <> (Task.tupled, Task.unapply)
    def ? = (id.?, color.?, status.?, project.?).shaped.<>({ r => import r._; _1.map(_ => Task.tupled((_1.get, _2.get, _3.get, _4.get))) }, (_: Any) => throw new Exception("Inserting into ? projection not supported."))
  }

  implicit val taskStatusColumnType = MappedColumnType.base[TaskStatus.Value, String](
    _.toString, string => TaskStatus.withName(string))

}

Test

Postman is a Chrome app that can be used for HTTP testing.
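
curl can exercise the PUT/PATCH routes as well; for example (the project/task ids below are just placeholders):

curl -X PUT 'http://localhost:9000/projects/demo'
curl -X PUT 'http://localhost:9000/projects/1/blue'
curl -X PATCH 'http://localhost:9000/tasks/1?color=red'
curl -X DELETE 'http://localhost:9000/projects/demo'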

plain SQL in slick

If you need to write raw SQL with slick, it is done through a different, plain SQL API.

ref: slick plain sql, play-slick version mapping

There is a sample template in activator, but we only used it as a reference.

activator new test7 slick-plainsql-3.0
  • conf/routes

First add the URIs to routes

GET           /pj/:id                    controllers.Application.getproject(id:Long)
GET           /pj2/:id                   controllers.Application.getproject2(id:Long)
GET           /pj3/:id                   controllers.Application.getproject3(id:Long)

GET           /update/:id/:name          controllers.Application.updateproject(id:Long, name:String)

The Application constructor needs to @Inject() actorSystem: ActorSystem. getproject tests how to work with Future; the incorrect version is deliberately left in, because the callback code inside the Future block is executed on a different thread.

getproject3 adds an asynchronous timeout check: if the query has not completed after 2 seconds, an exception is raised.

package controllers

import java.util.concurrent.{TimeoutException, TimeUnit}
import javax.inject.Inject

import akka.actor.ActorSystem
import models.{Project, ProjectRepo, TaskRepo}
import play.api.Logger
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.mvc.{Action, Controller}

import akka.pattern.after
import scala.concurrent.duration._
import scala.concurrent.Future

class Application @Inject()( projectRepo: ProjectRepo, taskRepo: TaskRepo, actorSystem: ActorSystem)
                           extends Controller {

  def getproject(id:Long) = Action {
    Logger.info(s"getproject id=${id}")
    var result = "DB project:\n"

    val pjs: Future[Seq[String]] = projectRepo.findByIdCustom2(id)
    pjs.map{
          // asynchronous: this block runs on a different thread
      cs => {
        for(c<-cs) {
          Logger.info("c="+c.toString)
          // c=test
          result += c.toString
          Logger.info(s"result=$result")
        }
      }
    }
    // this is the wrong approach
    // the response only contains "DB project:" without the DB query result -- a wrong result
    Logger.info(s"result=$result")
    Ok(result)
  }

  def getproject2(id: Long) = Action.async {

    //val futureNumRowsDeleted = scala.concurrent.Future{ Transaction.delete(id) }
    val pjs: Future[Seq[String]] = projectRepo.findByIdCustom2(id)

    pjs.map {
      var result = "DB project:\n"
      cs => {
        for (c <- cs) {
          Logger.info("c=" + c.toString)
          // c=test
          result += c.toString+" "
          Logger.info(s"result=$result")
        }
      }
        Logger.info(s"result=$result")
        Ok(result)
    }
  }

  def getproject3(id: Long) = Action.async {

    //val futureNumRowsDeleted = scala.concurrent.Future{ Transaction.delete(id) }
    val pjs: Future[Seq[Project]] = projectRepo.findByIdCustom3(id)

    //val timeout = play.api.libs.concurrent.Promise.timeout("Past max time", 2, TimeUnit.SECONDS)

    //val timeoutFuture = after(2.second, actorSystem.scheduler)(Future.successful("Oops"))
    val timeoutFuture = after(2.second, actorSystem.scheduler)(Future.failed(new TimeoutException("Future timed out!")))

    Future.firstCompletedOf(Seq(pjs, timeoutFuture)).map {
      case cs: Seq[Project]  => {
        var result = "DB project:\n"
        for (c <- cs) {
          Logger.info("c=" + c.name)
          // c=test
          result += c.name + " "
          Logger.info(s"result=$result")
        }
        Ok(result)
      }
      case t: TimeoutException => InternalServerError(t.getMessage)
    }
  }
  
  def updateproject(id: Long, name:String) = Action.async {
    val pjs: Future[Int] = projectRepo.updateproject(id, name)

    val timeoutFuture = after(2.second, actorSystem.scheduler)(Future.failed(new TimeoutException("Future timed out!")))

    Future.firstCompletedOf(Seq(pjs, timeoutFuture)).map {
      case cs: Int  => {
        val result = s"DB update result:${cs}\n"
        Ok(result)
      }
      //case t: Any => InternalServerError()
    }
  }
}

app/models/Project.scala

findByIdCustom3 tests mapping the result rows directly into Project objects.

package models

import javax.inject.Inject
import play.api.Logger
import play.api.db.slick.DatabaseConfigProvider
import slick.dbio
import slick.dbio.Effect.Read
import slick.driver.JdbcProfile
import slick.jdbc.GetResult
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

case class Project(id: Long, name: String)


class ProjectRepo @Inject()(taskRepo: TaskRepo)(protected val dbConfigProvider: DatabaseConfigProvider) {

  val dbConfig = dbConfigProvider.get[JdbcProfile]
  val db = dbConfig.db
  import dbConfig.driver.api._
  private val Projects = TableQuery[ProjectsTable]

  /////////////

  def findByIdCustom2(id:Long): Future[Seq[String]] = {
    val query = sql"select NAME from PROJECT where ID=$id".as[(String)]
    //db.run(query)
    Logger.info("findByIdCustom2")
    //db.run(query)

    val f: Future[Seq[String]] = db.run(query)

    //f.onSuccess { case s => println(s"Result: $s") }

    f
  }

  implicit val getProjectResult = GetResult(r => Project(r.nextLong, r.nextString))

  def findByIdCustom3(id:Long): Future[Seq[Project]] = {
    // as[(Project)] picks up the implicit getProjectResult GetResult defined above and converts each row into a Project automatically
    val query = sql"select ID, NAME from PROJECT where ID=$id".as[(Project)]
    //db.run(query)
    Logger.info("findByIdCustom3")
    //db.run(query)

    val f: Future[Seq[Project]] = db.run(query)

    //f.onSuccess { case s => println(s"Result: $s") }

    f
  }
  
  def updateproject(id:Long, name:String): Future[Int] = {
    val update = sqlu"update PROJECT set name=$name where ID=$id"
    //db.run(query)
    Logger.info("updateproject")
    //db.run(query)

    val f: Future[Int] = db.run(update)

    //f.onSuccess { case s => println(s"Result: $s") }

    f
  }
}

Testing can be done directly with curl

curl -v 'http://localhost:9000/pj/1'
curl -v 'http://localhost:9000/pj2/1'
curl -v 'http://localhost:9000/pj3/1'

curl -v 'http://localhost:9000/update/1/test2'

2016/11/07

JDBC Database Persistence in Scala play 2.5

There are several ways to persist data to a database in the Scala Play 2.5 framework, all of them integrations of third-party libraries. We tested plain JDBC and slick; this entry covers JDBC.

Prepare the database

Although many examples use the H2 in-memory database, we test against MariaDB, so first prepare the database.

  • create mysql database: playdb
CREATE DATABASE playdb DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
  • create table: user
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`login_id` varchar(45) NOT NULL,
`password` varchar(50) NOT NULL,
`name` varchar(45) DEFAULT NULL,
`dob` bigint(20) DEFAULT NULL,
`is_active` tinyint(1) NOT NULL DEFAULT '1',
PRIMARY KEY (`id`),
UNIQUE KEY `login_id_UNIQUE` (`login_id`),
UNIQUE KEY `id_UNIQUE` (`id`)
)

BEGIN;
INSERT INTO `user` (login_id, password, name) VALUES ('test1', 'test1', 'test1'), ('test2', 'test2', 'test2');
COMMIT;

Prepare the Scala Play project

  • Generate a Scala Play project with activator
activator new test5 play-scala
  • Modify build.sbt

Add jdbc and "mysql" % "mysql-connector-java" % "5.1.36" to the dependencies.

name := """test5"""

version := "1.0-SNAPSHOT"

lazy val root = (project in file(".")).enablePlugins(PlayScala)

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  jdbc,
  cache,
  ws,
  filters,
  "mysql" % "mysql-connector-java" % "5.1.36",
  "org.scalatestplus.play" %% "scalatestplus-play" % "1.5.1" % Test
)

resolvers += "scalaz-bintray" at "http://dl.bintray.com/scalaz/releases"
  • Add the default db settings to application.conf

Scala Play uses HikariCP as its DB connection pool library; the DB and connection pool settings can be adjusted in application.conf.

play.db {
  config = "db"
  default = "default"

  prototype {
    # JDBC connection pool: at least 2 idle connections, at most 5 in total
    hikaricp.minimumIdle = 2
    hikaricp.maximumPoolSize = 5
  }
}

db {

  default.driver=com.mysql.jdbc.Driver
  default.url="jdbc:mysql://localhost:3306/playdb?useUnicode=true&characterEncoding=utf-8"
  default.username = "root"
  default.password = "password"

  default.logSql=true
}

To confirm that the connection pool is working, you can inspect the DB state from a MySQL client with the following commands.

# show database status
show status;

# number of connected threads

show status where `variable_name` = 'Threads_connected';

# list current sessions
show processlist;

# list all sessions
show full processlist;
# list the IPs with the most connections
mysql -uroot -p -e "show processlist"|awk '{print $3}' |awk -F: '{print $1}' |sort |uniq -c |sort -nr
  • Modify routes

Since this is just a test, each Action in Application is simply mapped to its own URI.

GET        /                     controllers.Application.index

GET        /dbUser               controllers.Application.fetchDBUser

GET        /dbUser2              controllers.Application.fetchDBUser2

GET        /addUser/:name        controllers.Application.addUser(name:String)
  • Modify controllers/Application.scala

Note the Application constructor: Play 2.5 obtains resources via dependency injection, so the DB we want to use has to be injected there: @NamedDatabase("default") db: Database.

Using db afterwards is much like plain JDBC: createStatement, then execute the query or update.

package controllers

import javax.inject.Inject

import play.api.mvc._
import play.api.db._

class Application @Inject()(@NamedDatabase("default") db: Database) extends Controller {

  def index = Action {
    Ok(views.html.main())
  }

  def fetchDBUser = Action {
    var result = "DB User:\n"
    val conn = db.getConnection()
    try {
      val rs = conn.createStatement().executeQuery("SELECT * from user")
      while (rs.next()) {
        result += rs.getString("login_id") + "\n"
      }
    } finally {
      conn.close()
    }
    Ok(result)
  }

  // without try blocks: the connection from db.withConnection is closed automatically when the block ends
  def fetchDBUser2 = Action {
    var result = "DB User:" + "\n"
    db.withConnection { conn =>
      val rs = conn.createStatement().executeQuery("SELECT * from user")
      while (rs.next()) {
        result += rs.getString("login_id") + "\n"
      }
    }
    Ok(result)
  }

  def addUser(name: String) = Action {
    db.withTransaction { conn =>
      val rs = conn.createStatement().executeUpdate(s"insert into user (login_id, password, name) values('$name', '$name', '$name')")
    }
    Ok
  }

}
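
One caveat: addUser builds its SQL with string interpolation, which is open to SQL injection. A safer sketch for the same controller (addUserSafe is a hypothetical name) uses a PreparedStatement:

  def addUserSafe(name: String) = Action {
    db.withTransaction { conn =>
      // parameterized statement: the JDBC driver escapes the values
      val stmt = conn.prepareStatement(
        "insert into user (login_id, password, name) values (?, ?, ?)")
      stmt.setString(1, name)
      stmt.setString(2, name)
      stmt.setString(3, name)
      stmt.executeUpdate()
      stmt.close()
    }
    Ok
  }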

Test

> curl 'http://localhost:9000/dbUser'
DB User:
test1
test2
> curl 'http://localhost:9000/dbUser2'
DB User:
test1
test2
> curl -v 'http://localhost:9000/addUser/test4'
*   Trying ::1...
* Connected to localhost (::1) port 9000 (#0)
> GET /addUser/test4 HTTP/1.1
> Host: localhost:9000
> User-Agent: curl/7.50.1
> Accept: */*
>
< HTTP/1.1 200 OK
< X-ExampleFilter: foo
< Content-Length: 0
< Date: Thu, 11 Aug 2016 07:58:02 GMT
<
* Connection #0 to host localhost left intact

logback settings for JDBC

If you want the log to record SQL statements, modify logback.xml and add the following three lines; this setting is not recommended for the production environment.

  <logger name="org.jdbcdslog.ConnectionLogger" level="OFF"  /> <!-- Won't log connections -->
  <logger name="org.jdbcdslog.StatementLogger"  level="INFO" /> <!-- Will log all statements -->
  <logger name="org.jdbcdslog.ResultSetLogger"  level="OFF"  /> <!-- Won't log result sets -->

The complete logback.xml looks like this:

<!-- https://www.playframework.com/documentation/latest/SettingsLogger -->
<configuration>

  <conversionRule conversionWord="coloredLevel" converterClass="play.api.libs.logback.ColoredLevel" />

<!--
  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <file>${application.home:-.}/logs/application.log</file>
    <encoder>
      <pattern>%date [%level] from %logger in %thread\n\t%message%n%xException</pattern>
    </encoder>
  </appender>
-->

<appender name="FILE"
          class="ch.qos.logback.core.rolling.RollingFileAppender">
    <append>true</append>
    <rollingPolicy
            class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <param name="FileNamePattern"
               value="${application.home:-.}/logs/application.%d{yyyy-MM-dd}.log.zip">
        </param>
    </rollingPolicy>
    <encoder>
        <!-- <pattern>%d %-5p %c %L%n %m%n</pattern> -->
        <!-- <charset class="java.nio.charset.Charset">UTF-8</charset>  -->
        <pattern>%date [%level] from %logger in %thread\n\t%message%n%xException</pattern>
    </encoder>
</appender>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%coloredLevel %logger{15} in %thread\n\t%message%n%xException{10}</pattern>
    </encoder>
  </appender>

  <appender name="ASYNCFILE" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="FILE" />
  </appender>

  <appender name="ASYNCSTDOUT" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="STDOUT" />
  </appender>

  <logger name="play" level="INFO" />
  <logger name="application" level="DEBUG" />

  <logger name="org.jdbcdslog.ConnectionLogger" level="OFF"  /> <!-- Won't log connections -->
  <logger name="org.jdbcdslog.StatementLogger"  level="INFO" /> <!-- Will log all statements -->
  <logger name="org.jdbcdslog.ResultSetLogger"  level="OFF"  /> <!-- Won't log result sets -->

  <!-- Off these ones as they are annoying, and anyway we manage configuration ourselves -->
  <logger name="com.avaje.ebean.config.PropertyMapLoader" level="OFF" />
  <logger name="com.avaje.ebeaninternal.server.core.XmlConfigLoader" level="OFF" />
  <logger name="com.avaje.ebeaninternal.server.lib.BackgroundThread" level="OFF" />
  <logger name="com.gargoylesoftware.htmlunit.javascript" level="OFF" />

  <root level="WARN">
    <appender-ref ref="ASYNCFILE" />
    <appender-ref ref="ASYNCSTDOUT" />
  </root>

</configuration>

With this in place the SQL statements show up in the log; again, this is only suitable for the dev environment, not production.

[info] o.j.StatementLogger in application-akka.actor.default-dispatcher-2
    java.sql.Statement.executeQuery: SELECT * from user;
[info] o.j.StatementLogger in application-akka.actor.default-dispatcher-3
    java.sql.Statement.executeUpdate: insert into user (login_id, password, name) values('test4', 'test4', 'test4');

Database Evolution

Scala Play has built-in support for upgrading and downgrading the DB schema (evolutions). To use it, start from a brand-new, empty database with no tables yet.

  • Add evolutions to libraryDependencies in build.sbt
name := """test5"""

version := "1.0-SNAPSHOT"

lazy val root = (project in file(".")).enablePlugins(PlayScala)

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  jdbc,
  evolutions,
  cache,
  ws,
  filters,
  "mysql" % "mysql-connector-java" % "5.1.36",
  "com.typesafe.play" %% "anorm" % "2.5.0",
  //"com.typesafe.play" %% "play-slick" % "2.0.0",
  //"com.typesafe.play" %% "play-slick-evolutions" % "2.0.0",
  "org.scalatestplus.play" %% "scalatestplus-play" % "1.5.1" % Test
)

resolvers += "scalaz-bintray" at "http://dl.bintray.com/scalaz/releases"

Add the play.evolutions section to application.conf

play.evolutions {
  # You can disable evolutions for a specific datasource if necessary
  db.default.enabled = true
  autoApply = true
  autoApplyDowns = true
}
  • Add two SQL files

DB evolution upgrades and downgrades the schema with SQL files; note that every SQL file must contain both an Ups and a Downs section.

/conf/evolutions/default/1.sql

# Users schema

# --- !Ups

CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`login_id` varchar(45) NOT NULL,
`password` varchar(50) NOT NULL,
`name` varchar(45) DEFAULT NULL,
`dob` bigint(20) DEFAULT NULL,
`is_active` tinyint(1) NOT NULL DEFAULT '1',
PRIMARY KEY (`id`),
UNIQUE KEY `login_id_UNIQUE` (`login_id`),
UNIQUE KEY `id_UNIQUE` (`id`)
);

INSERT INTO `user` (login_id, password, name) VALUES ('test1', 'test1', 'test1'), ('test2', 'test2', 'test2');


# --- !Downs

DROP TABLE user;

/conf/evolutions/default/2.sql

# Users schema

# --- !Ups

CREATE TABLE temp (
    id bigint(20) NOT NULL AUTO_INCREMENT,
    email varchar(255) NOT NULL,
    password varchar(255) NOT NULL,
    fullname varchar(255) NOT NULL,
    isAdmin boolean NOT NULL,
    PRIMARY KEY (id)
);

# --- !Downs

DROP TABLE temp;
  • Test evolutions

Start Play: on the first DB connection, 1.sql and 2.sql are applied and the two tables are created. To downgrade, rename 2.sql to 2.sql.bak; after the server code reloads, the next DB connection triggers the automatic downgrade.
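
For example, the downgrade can be triggered simply by renaming the file:

mv conf/evolutions/default/2.sql conf/evolutions/default/2.sql.bak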

MySQL automatically gets a table named play_evolutions, which records the DB script executed for every schema upgrade or downgrade.

DROP TABLE IF EXISTS `play_evolutions`;
CREATE TABLE `play_evolutions` (
  `id` int(11) NOT NULL,
  `hash` varchar(255) NOT NULL,
  `applied_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `apply_script` mediumtext,
  `revert_script` mediumtext,
  `state` varchar(255) DEFAULT NULL,
  `last_problem` mediumtext,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Play DB Evolutions