2024/09/30

RTSP

RTSP是1996年由 RealNetworks, Netscape, 哥倫比亞大學開發,提交草案給 IETF,1998年發布為 RFC2326,2016年RTSP 2.0 發布於 RFC 7826。RTSP 是串流媒體伺服器的控制協定,可建立與控制終端設備跟伺服器之間的多媒體 session。

RTSP 協定本身看起來跟 HTTP 類似,但 HTTP 本身是 stateless,而RTSP 是 stateful,故需要追蹤 session。RTSP 是用 TCP 連線,而多媒體本身,是用 RTP 傳輸,可以用 TCP 或 UDP,常見狀況是為求傳輸速度快,使用 UDP。

節錄一個 RTSP client 取得 RTSP 的封包過程

  1. Options

    查詢 RTSP server 支援的 command

    Client -> Server

    Request: OPTIONS rtsp://192.168.1.11:8554/mystream RTSP/1.0\r\n
    CSeq: 2\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    \r\n

    Server -> Client

    Response: RTSP/1.0 200 OK\r\n
    CSeq: 2\r\n
    Public: DESCRIBE, ANNOUNCE, SETUP, PLAY, RECORD, PAUSE, GET_PARAMETER, TEARDOWN\r\n
    Server: gortsplib\r\n
    \r\n
  2. Describe

    查詢可處理的多媒體資料格式,server 以 SDP 方式回覆

    Client -> Server

    Request: DESCRIBE rtsp://192.168.1.11:8554/mystream RTSP/1.0\r\n
    CSeq: 3\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    Accept: application/sdp\r\n
    \r\n

    Server -> Client

    Response: RTSP/1.0 200 OK\r\n
    CSeq: 3\r\n
    Content-Base: rtsp://192.168.1.11:8554/mystream/\r\n
    Content-length: 560
    Content-type: application/sdp
    Server: gortsplib\r\n
    \r\n
    Session Description Protocol Version (v): 0
    Owner/Creator, Session Id (o): - 0 0 IN IP4 127.0.0.1
    Session Name (s): test
    Connection Information (c): IN IP4 0.0.0.0
    Time Description, active time (t): 0 0
    Media Description, name and address (m): video 0 RTP/AVP 96
    Media Attribute (a): control:rtsp://192.168.1.11:8554/mystream/trackID=0
    Media Attribute (a): rtpmap:96 H264/90000
    Media Attribute (a): fmtp:96 packetization-mode=1; profile-level-id=640032; sprop-parameter-sets=Z2QAMqzIUB4AiflwEQAAAwPpAAC7gA8YMZY=,aOk4XLIs
    Media Description, name and address (m): audio 0 RTP/AVP 97
    Media Attribute (a): control:rtsp://192.168.1.11:8554/mystream/trackID=1
    Media Attribute (a): rtpmap:97 mpeg4-generic/48000/2
    Media Attribute (a): fmtp:97 config=1190; indexdeltalength=3; indexlength=3; mode=AAC-hbr; profile-level-id=1; sizelength=13; streamtype=5
  3. Setup

    要求 server 設定傳送某一個 media stream,setup 要在 play 之前完成

    Client -> Server

    Request: SETUP rtsp://192.168.1.11:8554/mystream/trackID=0 RTSP/1.0\r\n
    CSeq: 4\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    Transport: RTP/AVP/TCP;unicast;interleaved=0-1
    \r\n

    Server -> Client

    Response: RTSP/1.0 200 OK\r\n
    CSeq: 4\r\n
    Server: gortsplib\r\n
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    Transport: RTP/AVP/TCP;unicast;interleaved=0-1;ssrc=03DAD41B
    \r\n
  4. Setup

    client 透過 setup 跟 server 確認 stream session

    Client -> Server

    Request: SETUP rtsp://192.168.1.11:8554/mystream/trackID=1 RTSP/1.0\r\n
    CSeq: 5\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    Transport: RTP/AVP/TCP;unicast;interleaved=2-3
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    \r\n

    Server -> Client

    Response: RTSP/1.0 200 OK\r\n
    CSeq: 5\r\n
    Server: gortsplib\r\n
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    Transport: RTP/AVP/TCP;unicast;interleaved=2-3;ssrc=294122AE
    \r\n
  5. Play

    在 setup 設定的 session 裡面,開始播放 media

    Client -> Server

    Request: PLAY rtsp://192.168.1.11:8554/mystream/ RTSP/1.0\r\n
    CSeq: 6\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    Range: npt=0.000-\r\n
    \r\n

    Server -> Client

    Response: RTSP/1.0 200 OK\r\n
    CSeq: 6\r\n
    RTP-Info: url=rtsp://192.168.1.11:8554/mystream/trackID=0;seq=10453;rtptime=1024864644,url=rtsp://192.168.1.11:8554/mystream/trackID=1;seq=4824;rtptime=3779844790\r\n
    Server: gortsplib\r\n
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    \r\n
  6. RTP

    用 RTP 的格式,傳送 media 內容

    Server -> Client

    10.. .... = Version: RFC 1889 Version (2)
    ..0. .... = Padding: False
    ...0 .... = Extension: False
    .... 0000 = Contributing source identifiers count: 0
    0... .... = Marker: False
    Payload type: DynamicRTP-Type-96 (96)
    Sequence number: 10453
    Timestamp: 1024871144
    Synchronization Source identifier: 0x03dad41b (64672795)
    Payload: 09f0
  7. Teardown

    client 通知 server 停止播放 media

    Client -> Server

    Request: TEARDOWN rtsp://192.168.1.11:8554/mystream/ RTSP/1.0\r\n
    CSeq: 7\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    \r\n

References

即時串流協定 - 維基百科,自由的百科全書

2024/09/23

Secure Reliable Transport (SRT)

Secure Reliable Transport (SRT) 是開放的 video transport protocol,由 Halvision 提出,目的是要提供一個加密、低延遲的視訊串流傳輸協定,過去比較常見的協定是 RTMP, RTSP, HLS, WebRTC。SRT 是在 2013 年由 Haivision 發表,後來 Haivision 在 2017 年將 protocol 開放,交給 SRT Alliance,然後慢慢有更多廠商支援這個協定。

在這麼長久的網路視訊串流發展歷史中,RTMP 常見於網路影片直播,尤其是行動網路的直播,RTSP 常見於網路攝影機。RTMP 是以 TCP 為基礎,因為發展當時的網路頻寬不大,必須要用 TCP 本身的連線穩定度,封包傳送機制,來確保網路直播的可用性。

SRT 則是完全使用 UDP,在 Haivision 的文件中,提出 SRT 的延遲比 RTMP 少 2.5~3.2 倍。SRT 跟 RTP 的差異是,SRT 借鑒了 RTMP 控制機制,傳輸中除了 video 資料封包,還有控制封包,控制封包可根據網路延遲及品質,動態調整發送端的 video 發送速度,也能有限制地決定要不要重傳遺失的封包。

SRT 可套用加密機制,使用最常見的 AES-128/256 加密方法。

SRT 只是一種影片切割與包裝的方法,因此能適用於任何一種影片 codec。

Server

GitHub - Edward-Wu/srt-live-server: srt live server for low latency 這是一個 SRT streaming server。

安裝前,必須要先安裝 GitHub - Haivision/srt: Secure, Reliable, Transport SRT library,我們在 CentOS 測試,根據這個文件說明,依照以下步驟安裝

sudo yum install tcl pkgconfig openssl-devel cmake gcc gcc-c++ make automake
./configure
make
make install

安裝 SRT library 後,可安裝 server,下載 srt-live-server-master.zip,解壓縮後,直接 make 即可

sudo make

執行

cd bin
./sls -c ../sls.conf

Client

測試 SRT 可使用 OBS Studio

發布的網址為

srt://192.168.1.11:8080?streamid=uplive.sls.com/live/test

接收部分,可用 VLC video player 測試,網址為

srt://192.168.1.11:8080?streamid=live.sls.com/live/test

實際上實測,OBS 發佈到 VLC 接收,大約有 4~5 秒的延遲

iOS app

可安裝 Haivision Play Pro - Haivision iOS APP,這個 app 也可以接收 SRT streaming

設定方式如下

References

什麼是SRT 安全可靠傳輸協議

Secure Reliable Transport - Wikipedia

【ProAV Lab】SRT,互聯網上的最佳視訊串流協定 | Lumens

RTMP vs. SRT: Comparing Latency and Maximum Bandwidth - Haivision

2024/09/09

InfiniBand IB

作為網路互連的技術方案,Ethernet 跟 InfiniBand (IB) 兩者的發展目標不同,導致現在應用的領域跟範圍都不同。

  • 使用場景

一般人比較常聽到 Ethernet,Ethernet 用在區域網路中,可將多個網路設備連接起來,以光纖連接 Internet,以無線網路連接手持設備。InfiniteBand 主要用在HPC 與 data center,這類對高頻寬低延遲的網路連接需求較高的應用場景

  • 頻寬

Ethernet 的應用發展,並不是以高速頻寬為主要發展的重點,他的重點在讓多種異質網路終端能夠互相連接起來,所以頻寬並不是發展的重點。通常 Ethernet 是在 1Gbps 到 100 Gbps,而InfiniBand 可到 100 Gbps 或 200 Gbps,像這份報導的說明,InfiniBand進入400Gb/s世代,Nvidia新款網路交換器揭開序幕 | iThome 已經到了 400Gbps 的時代。

  • 應用範圍

Ethneret 面向一般終端的消費者,用來做個人使用的資料傳輸並連接 Internet。InfiniBand 是用在大量的資料運算,在現在最熱門的 AI 時代,要建構一個 AI cluster,會使用 InfiniBand 作為 cluster 節點的連接技術。

InfiniBand 另一個應用是在超級電腦裡面,因為超級電腦叢集也需要這種高速低延遲的資料傳輸。

  • 成本

InfiniBnad 的效能高,相對成本就很高,沒有特殊的需求,一般是不會使用 InfiniBand

  • 網路模式

InfiniBand 比 Ethernet 容易管理,每一個 end node 會透過一個 layer 2 switch 配置網路節點 ID,再往上以 Router 統計計算網路資料轉發路徑。

Ethernet 是用硬體的 MAC 網路模式,往上使用 IP 搭配 ARP 建構網路,且網路本身沒有學習機制,容易產生環狀網路,這又要透過 STP 協定解決,增加了網路的複雜度

Socket Direct Protocol (SDP)

Trail: Sockets Direct Protocol (The Java™ Tutorials)

Java 7 Sockets Direct Protocol – Write Once, Run Everywhere …. and Run (Some Places) Blazingly - InfoQ

SDP

在 JDK 7 裡面,就提供了一種不同於 TCP/IP 傳統的 Socket 網路的 Socket Direct Protocol,SDP 能夠透過 InifiBand 的 Remote Direct Memory Access (RDMA) ,以低延遲的方法,不透過 OS,遠端存取其他電腦的記憶體。

透過這張圖的說明,可以了解為什麼 SDP 能夠提供高速的通訊,傳統的 TCP/IP 需要一層一層接過 Application Layer -> Transport Layer -> Network Layer -> Physical Layer,SDP 則是從 Application 一步直接穿到 RDMA enabled channel adapter card

參考這個連結的圖片: SDP

References

InfiniBand - 維基百科,自由的百科全書

InfiniBand與以太網:它們是什麼? | 飛速(FS)社區

InfiniBand之技術架構介紹

2024/09/02

Quartz

Quartz 是 java 的 job-scheduling framwork,可使用類似 linux 的 crontab 方式設定定時啟動某個工作。

pom.xml

        <!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.3.2</version>
        </dependency>

API

quartz API 的核心是 scheduler,啟動後,Scheduler 會產生多個 worker threads,也就是利用 ThreadPool,執行 Jobs。

主要的 API interface

  • Scheduler

  • Job

  • JobDetail

    • 定義 job instances
  • Trigger

    • 決定 scheduler 的定時機制

    • SimpleTrigger 跟 CronTrigger 兩種

  • JobBuilder

    • 用來產生 JobDetail instances
  • TriggerBuilder

    • 用來產生 Trigger instances

Example

SimpleJob.java

package quartz;

import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

// 要實作 Job interface
public class SimpleJob implements Job {
    // trigger 會透過 scheduler 的 worker 呼叫 execute 執行 job
    // JobExecutionContext 可提供 runtime environment 資訊
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // 在 產生 JobDetail 時,JobData 會透過 JobDataMap 傳進 Job
        // Job 利用 JobDataMap 取得 JobData
        JobDataMap dataMap = context.getJobDetail().getJobDataMap();
        String jobSays = dataMap.getString("jobSays");
        float myFloatValue = dataMap.getFloat("myFloatValue");

        System.out.println("Job says: " + jobSays + ", and val is: " + myFloatValue);
    }
}

JobA.java

@DisallowConcurrentExecution 可限制 Job 不會同時被執行兩次

package quartz;

import org.quartz.*;

@DisallowConcurrentExecution
public class JobA implements Job {

    public void execute(JobExecutionContext context) throws JobExecutionException {
        TriggerKey triggerKey= context.getTrigger().getKey();
        System.out.println("job A. triggerKey="+triggerKey.getName()+", group="+triggerKey.getGroup() + ", fireTime="+context.getFireTime());
    }

}

JobB.java

package quartz;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.TriggerKey;

public class JobB implements Job {

    public void execute(JobExecutionContext context) throws JobExecutionException {
        TriggerKey triggerKey= context.getTrigger().getKey();
        System.out.println("job B. triggerKey="+triggerKey.getName()+", group="+triggerKey.getGroup() + ", fireTime="+context.getFireTime());
    }

}

QuartzExample.java

package quartz;

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzExample {

    public static void main(String args[]) {
        try {
            // 透過 SchedulerFactory 產生 Scheduler
            SchedulerFactory schedFact = new StdSchedulerFactory();
            Scheduler sched = schedFact.getScheduler();
            // 使用 start 啟動,使用 shutdown 停掉這個 scheduler
            // 要先將 job, trigger 綁定 scheduler 後,再啟動 scheduler
            // sched.start();
            // sched.shutdown();

            // 產生 SimpleJob,利用 JobData 傳入資料到 Job
            JobDetail job = JobBuilder.newJob(SimpleJob.class)
                                      .withIdentity("myJob", "group1")
                                      .usingJobData("jobSays", "Hello World!")
                                      .usingJobData("myFloatValue", 3.141f)
                                      .build();

            Trigger trigger = TriggerBuilder.newTrigger()
                                            .withIdentity("myTrigger", "group1")
                                            .startNow()
                                            .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(40).repeatForever())
                                            .build();

            ///////
            JobDetail jobA = JobBuilder.newJob(JobA.class)
                                       .withIdentity("jobA", "group2")
                                       .build();
            JobDetail jobB = JobBuilder.newJob(JobB.class)
                                       .withIdentity("jobB", "group2")
                                       .build();

            // triggerA 比 triggerB 有較高的 priority,比較早先被執行
            // 每 40s 啟動一次
            Trigger triggerA = TriggerBuilder.newTrigger()
                                             .withIdentity("triggerA", "group2")
                                             .startNow()
                                             .withPriority(15)
                                             .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(40).repeatForever())
                                             .build();

            // 每 20s 啟動一次
            Trigger triggerB = TriggerBuilder.newTrigger()
                                             .withIdentity("triggerB", "group2")
                                             .startNow()
                                             .withPriority(10)
                                             .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(20).repeatForever())
                                             .build();

            sched.scheduleJob(job, trigger);
            sched.scheduleJob(jobA, triggerA);
            sched.scheduleJob(jobB, triggerB);
            sched.start();

        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

}

執行結果

Job says: Hello World!, and val is: 3.141
job A. triggerKey=triggerA, group=group2, fireTime=Wed Aug 28 16:16:37 CST 2024
job B. triggerKey=triggerB, group=group2, fireTime=Wed Aug 28 16:16:37 CST 2024
job B. triggerKey=triggerB, group=group2, fireTime=Wed Aug 28 16:16:57 CST 2024
Job says: Hello World!, and val is: 3.141
job A. triggerKey=triggerA, group=group2, fireTime=Wed Aug 28 16:17:17 CST 2024
job B. triggerKey=triggerB, group=group2, fireTime=Wed Aug 28 16:17:17 CST 2024

QuartzExample2.java

package quartz;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzExample2 {

    public static void main(String args[]) {
        try {
            SchedulerFactory schedFact = new StdSchedulerFactory();
            Scheduler sched = schedFact.getScheduler();

            // 產生 SimpleJob,利用 JobData 傳入資料到 Job
            JobDetail job = JobBuilder.newJob(SimpleJob.class)
                                      .withIdentity("myJob", "group1")
                                      .usingJobData("jobSays", "Hello World!")
                                      .usingJobData("myFloatValue", 3.141f)
                                      .build();

            Trigger trigger = TriggerBuilder.newTrigger()
                                            .withIdentity("myTrigger", "group1")
                                            .startNow()
                                            .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(40).repeatForever())
                                            .build();

            // 跟上面的 Scheduler 一樣
            // 如果 worker thread 不足,job 可能不會被執行
            // 當 quartz 發現有這種狀況時,會使用 .withMisfireHandlingInstructionFireNow() 這個規則
            // 在 misfire 時,馬上執行一次
            Trigger trigger2 = TriggerBuilder.newTrigger()
                                            .withIdentity("myTrigger", "group1")
                                            .startNow()
                                            .withSchedule(SimpleScheduleBuilder.simpleSchedule().withMisfireHandlingInstructionFireNow().withIntervalInSeconds(40).repeatForever())
                                            .build();

            sched.scheduleJob(job, trigger);
            sched.start();

        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

}

QuartzExample3.java

package quartz;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

import java.util.Date;

public class QuartzExample3 {

    public static void main(String args[]) {
        try {
            SchedulerFactory schedFact = new StdSchedulerFactory();
            Scheduler sched = schedFact.getScheduler();

            // 產生 SimpleJob,利用 JobData 傳入資料到 Job
            JobDetail jobA = JobBuilder.newJob(JobA.class)
                                       .withIdentity("jobA", "group2")
                                       .build();

            // SimpleTrigger 可設定在某個特定時間開始
            // 3 seconds 後啟動 trigger
            // 同樣可加上 SimpleScheduleBuilder 定時每 40s 啟動一次
            java.util.Calendar calendar = java.util.Calendar.getInstance();
            calendar.add(java.util.Calendar.SECOND, 3);
            Date myStartTime = calendar.getTime();
            SimpleTrigger trigger =
                    (SimpleTrigger) TriggerBuilder.newTrigger()
                                                  .withIdentity("trigger1", "group1")
                                                  .startAt(myStartTime)
                                                  .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(40).repeatForever())
                                                  .build();

            JobDetail jobB = JobBuilder.newJob(JobB.class)
                                       .withIdentity("jobB", "group1")
                                       .build();
            // 透過 CronScheduleBuilder 產生 scheduler
            // "0 0/1 * * * ?" 代表每分鐘執行一次
            // 然後產生 CronTrigger
            CronTrigger trigger2 = TriggerBuilder.newTrigger()
                                                 .withIdentity("trigger2", "group1")
                                                 .withSchedule(CronScheduleBuilder.cronSchedule("0 0/1 * * * ?"))
                                                 .build();

            sched.scheduleJob(jobA, trigger);
            sched.scheduleJob(jobB, trigger2);
            sched.start();

        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

}

執行結果

job B. triggerKey=trigger2, group=group1, fireTime=Wed Aug 28 16:11:00 CST 2024
job A. triggerKey=trigger1, group=group1, fireTime=Wed Aug 28 16:11:00 CST 2024
job A. triggerKey=trigger1, group=group1, fireTime=Wed Aug 28 16:11:40 CST 2024
job B. triggerKey=trigger2, group=group1, fireTime=Wed Aug 28 16:12:00 CST 2024
job A. triggerKey=trigger1, group=group1, fireTime=Wed Aug 28 16:12:20 CST 2024

References

Introduction to Quartz | Baeldung