2021/10/25

asyncio in python3

asyncio is the library used in Python 3.5+ to write coroutines, together with the async/await syntactic sugar.

coroutine

A subroutine runs from one entry point to one exit point. A coroutine can be entered, exited, and resumed at multiple points. Coroutines are implemented with the async/await syntax.

To check whether a function is a coroutine function:

asyncio.iscoroutinefunction(func)

To check whether an object is a coroutine object:

asyncio.iscoroutine(obj)
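
The difference between a coroutine function and a coroutine object can be checked directly. This is a minimal sketch: `fetch_value` and `plain_function` are hypothetical names used only for illustration, and `inspect.iscoroutinefunction` is used as the standard-library counterpart of `asyncio.iscoroutinefunction`, on the assumption that both give the same answer for plain `async def` functions.

```python
import asyncio
import inspect


async def fetch_value():
    """A trivial coroutine function used only for the check."""
    return 42


def plain_function():
    return 42


# Calling an async def function does not run it; it returns a coroutine object.
coro = fetch_value()

is_coro_func = inspect.iscoroutinefunction(fetch_value)           # async def function
is_plain_coro_func = inspect.iscoroutinefunction(plain_function)  # ordinary function
is_coro_obj = asyncio.iscoroutine(coro)                           # coroutine object
is_func_coro_obj = asyncio.iscoroutine(fetch_value)               # function, not an object

# Run the coroutine object so it is not left un-awaited.
result = asyncio.run(coro)
print(is_coro_func, is_plain_coro_func, is_coro_obj, is_func_coro_obj, result)
```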

sample

Python 3.5+ can use the asyncio library together with await.

Python 3.7+ can also use asyncio.run().

import asyncio
import time

now = lambda: time.time()

async def dosomething(num):
    print('第 {} 任務,第一步'.format(num))
    await asyncio.sleep(2)
    print('第 {} 任務,第二步'.format(num))

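async def main():
    # asyncio.wait() rejects bare coroutines since Python 3.11, so wrap them in tasks first
    tasks = [asyncio.create_task(dosomething(i)) for i in range(5)]
    await asyncio.wait(tasks)

if __name__ == "__main__":
    start = now()
    asyncio.run(main())
    print('TIME: ', now() - start)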

Output (the order in which the tasks start may vary between runs and Python versions)

第 0 任務,第一步
第 1 任務,第一步
第 4 任務,第一步
第 3 任務,第一步
第 2 任務,第一步
第 0 任務,第二步
第 1 任務,第二步
第 4 任務,第二步
第 3 任務,第二步
第 2 任務,第二步
TIME:  2.0054779052734375

async await

async declares that a function can run asynchronously

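  • yield from cannot be used inside an async def function; since Python 3.6, yield is allowed and turns the function into an asynchronous generator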

await marks the points where a coroutine can be suspended and resumed

  • await must be followed by a coroutine object or another awaitable object
  • await hands control back to the event loop; under the hood this is implemented with yield

There are three kinds of awaitable objects

  1. coroutines

    A function defined with async def is a coroutine function; calling it returns a coroutine object

  2. tasks

    A task schedules a coroutine to run on the event loop; wrap a coroutine with asyncio.create_task() to create one

  3. futures

    A future represents the eventual result of an asynchronous operation
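
The three kinds of awaitables listed above can be awaited side by side. This is a minimal sketch: `compute` is a hypothetical coroutine, and the future is resolved by a timer purely for illustration.

```python
import asyncio


async def compute(x):
    await asyncio.sleep(0.01)
    return x * 2


async def main():
    loop = asyncio.get_running_loop()

    # 1. coroutine object: runs only when it is awaited
    from_coroutine = await compute(1)

    # 2. task: scheduled on the event loop as soon as it is created
    task = asyncio.create_task(compute(2))
    from_task = await task

    # 3. future: a placeholder that some other code resolves later
    future = loop.create_future()
    loop.call_later(0.01, future.set_result, 6)
    from_future = await future

    return [from_coroutine, from_task, from_future]


results = asyncio.run(main())
print(results)  # [2, 4, 6]
```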

event loop

In Python 3.5, obtain an event loop with asyncio.get_event_loop(), then pass the coroutine to run_until_complete(), which runs until the coroutine has finished.

import asyncio

async def hello_world(x):
    print('hello_world x' + str(x))
    await asyncio.sleep(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world(3))
loop.close()

  • loop.run_until_complete(coroutine)
    • runs the coroutine; once it completes, the event loop can be closed
  • loop.run_forever()
    • the event loop keeps running until loop.stop() is called
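
The two loop methods above behave differently once the work is done. This is a rough sketch, assuming a loop created explicitly with `asyncio.new_event_loop()` and a hypothetical `worker` coroutine; the timer that calls `loop.stop()` stands in for whatever condition would end a real program.

```python
import asyncio

events = []


async def worker():
    events.append('worker started')
    await asyncio.sleep(0.01)
    events.append('worker finished')


loop = asyncio.new_event_loop()
try:
    # run_until_complete() returns as soon as the given coroutine is done
    loop.run_until_complete(worker())

    # run_forever() keeps the loop running until something calls loop.stop()
    loop.call_later(0.05, loop.stop)
    loop.run_forever()
    events.append('loop stopped')
finally:
    loop.close()

print(events)
```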

Python 3.7+ has a more concise form: asyncio.run() wraps the loop handling, so running the coroutine is a single call.

import asyncio

async def hello_world(x):
    print('hello_world x' + str(x))
    await asyncio.sleep(x)

asyncio.run(hello_world(3))

task

There are two ways to create a task

  • asyncio.ensure_future(), available since Python 3.4.4
  • asyncio.create_task(), available in Python 3.7+

import asyncio
import time

async def dosomething(num):
    print('start{}'.format(num))
    await asyncio.sleep(num)
    print('sleep{}'.format(num))

async def main():
    task1 = asyncio.create_task(dosomething(1))
    task2 = asyncio.create_task(dosomething(2))
    task3 = asyncio.create_task(dosomething(3))
    await task1
    await task2
    await task3

if __name__ == '__main__':
    time_start = time.time()
    asyncio.run(main())
    print(time.time() - time_start)

Output

start1
start2
start3
sleep1
sleep2
sleep3
3.0059001445770264
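
The total of about 3 seconds above comes from the tasks overlapping. The following sketch, with a hypothetical `work` coroutine and shorter delays, contrasts awaiting bare coroutines one after another with wrapping them in tasks first; exact timings depend on the machine.

```python
import asyncio
import time


async def work(delay):
    await asyncio.sleep(delay)
    return delay


async def sequential():
    # Bare coroutines awaited one by one: each starts only after the previous ends
    start = time.perf_counter()
    await work(0.1)
    await work(0.1)
    return time.perf_counter() - start


async def concurrent():
    # create_task() schedules both immediately, so the two sleeps overlap
    start = time.perf_counter()
    t1 = asyncio.create_task(work(0.1))
    t2 = asyncio.create_task(work(0.1))
    await t1
    await t2
    return time.perf_counter() - start


sequential_time = asyncio.run(sequential())
concurrent_time = asyncio.run(concurrent())
print('sequential: {:.2f}s, concurrent: {:.2f}s'.format(sequential_time, concurrent_time))
```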

Running multiple tasks concurrently

asyncio.gather() schedules multiple coroutines or other awaitable objects on the event loop so that they run concurrently

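asyncio.gather(*aws, return_exceptions=False)

  1. *aws: one or more awaitable objects
  2. loop: older versions also accepted a loop parameter; it was deprecated in Python 3.8 and removed in Python 3.10
  3. return_exceptions: defaults to False, in which case the first exception raised is propagated immediately to the caller awaiting gather(), while the other awaitables keep running. When set to True, exceptions are returned alongside the successful results (in the example below, the final results list contains ValueError() entries)
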
import asyncio
import time

now = lambda: time.time()

async def dosomething(num):
    print('第 {} 任務,第一步'.format(num))
    await asyncio.sleep(2)
    print('第 {} 任務,第二步'.format(num))
    return '第 {} 任務完成'.format(num)

async def raise_error(num):
    raise ValueError
    print('這邊不會執行到')

async def main():
    tasks = [dosomething(i) for i in range(5)]
    tasks1 = [raise_error(i) for i in range(5)]

    results = await asyncio.gather(*tasks, *tasks1, return_exceptions=True)
    print(results)


if __name__ == "__main__":

    start = now()
    asyncio.run(main())
    print('TIME: ', now() - start)

Output

第 0 任務,第一步
第 1 任務,第一步
第 2 任務,第一步
第 3 任務,第一步
第 4 任務,第一步
第 0 任務,第二步
第 1 任務,第二步
第 2 任務,第二步
第 3 任務,第二步
第 4 任務,第二步
['第 0 任務完成', '第 1 任務完成', '第 2 任務完成', '第 3 任務完成', '第 4 任務完成', ValueError(), ValueError(), ValueError(), ValueError(), ValueError()]
TIME:  2.0041749477386475
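
For contrast with the return_exceptions=True run above, here is a small sketch of the default behaviour. `ok` and `fail` are hypothetical coroutines; the point is that gather() raises the first exception to its caller while the remaining awaitables are left running.

```python
import asyncio

finished = []


async def ok(num):
    await asyncio.sleep(0.05)
    finished.append(num)
    return num


async def fail():
    raise ValueError('boom')


async def main():
    try:
        # return_exceptions defaults to False: the first exception is raised here
        await asyncio.gather(ok(1), fail(), ok(2))
    except ValueError as exc:
        caught = repr(exc)
    # gather() has already raised, but the other awaitables were not cancelled;
    # give them time to finish.
    await asyncio.sleep(0.1)
    return caught, sorted(finished)


caught, done = asyncio.run(main())
print(caught, done)
```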

chaining coroutines

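A job can be split into three coroutines: when the first completes, its result is passed to the second, and then to the third. asyncio.gather finally collects the result of each chain.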

import asyncio
import time
import random

now = lambda: time.time()

# Step one
async def step_one(num, n):
    time_sleep = random.randint(0, 1)
    print('Task {},step one, sleep {}'.format(n, time_sleep))
    await asyncio.sleep(time_sleep)
    print('Task {},step one, wakeup'.format(n))
    num += 1
    return num

# Step two
async def step_two(num, n):
    time_sleep = random.randint(0, n)
    print('Task {},step two, sleep {}'.format(n, time_sleep))

    await asyncio.sleep(time_sleep)
    print('Task {},step two, wakeup'.format(n))
    num += 2
    return num

# Step three
async def step_three(num, n):
    time_sleep = random.randint(0, n)
    print('Task {},step three, sleep {}'.format(n, time_sleep))
    await asyncio.sleep(time_sleep)
    print('Task {},step three, wakeup'.format(n))
    num += 3
    return [n, num]

# Run the three steps in sequence
async def asyncio_chain(n):
    s1 = await step_one(n, n)
    s2 = await step_two(s1, n)
    s3 = await step_three(s2, n)
    return s3

# Collect the results
async def main():
    tasks = [asyncio_chain(n) for n in range(3)]
    result = await asyncio.gather(*tasks)
    print(result)


if __name__ == "__main__":

    start = now()
    asyncio.run(main())
    print('TIME: ', now() - start)

Output

Task 0,step one, sleep 0
Task 1,step one, sleep 1
Task 2,step one, sleep 1
Task 0,step one, wakeup
Task 0,step two, sleep 0
Task 0,step two, wakeup
Task 0,step three, sleep 0
Task 0,step three, wakeup
Task 1,step one, wakeup
Task 1,step two, sleep 1
Task 2,step one, wakeup
Task 2,step two, sleep 2
Task 1,step two, wakeup
Task 1,step three, sleep 1
Task 2,step two, wakeup
Task 2,step three, sleep 0
Task 2,step three, wakeup
Task 1,step three, wakeup
[[0, 6], [1, 7], [2, 8]]
TIME:  3.0117080211639404

coroutine with queue

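Tasks are split into two roles, producer and consumer. A producer does not connect to a consumer directly; data is passed through a queue.

Producers can put any number of items into the queue, and multiple consumers take them out of the queue and process them.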

The Python standard library offers four kinds of queues

  1. queue.Queue
  2. asyncio.Queue
  3. multiprocessing.Queue
  4. collections.deque

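Note that asyncio.Queue is not thread-safe

  • q = asyncio.Queue(maxsize=num_consumers): limits the size of the queue
  • q.put()
  • q.get()
  • q.task_done(): tells the queue that a previously fetched item has been fully processed
  • q.join(): blocks until every item put into the queue has been fetched and marked with task_done()
  • q.qsize(): returns the number of items in the queue
  • q.empty(): returns True if the queue is empty, otherwise False
  • q.full(): returns True if the queue is full, otherwise False
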
import asyncio

# consumer (loops forever; it only stops when its task is cancelled)
async def consumer(q, n):
    print('consumer {}: starting...'.format(n))
    while True:
        print('consumer {}: prepare to get'.format(n))
        item = await q.get()
        print('consumer {}: get {}'.format(n, item))
        await asyncio.sleep(0.15)
        # One item is done: notify the queue so that q.join() can track unfinished work
        q.task_done()
    print('consumer {}: ending'.format(n))

# producer A
async def producer_a(q, num_products):
    print('producer A: starting producer A...')
    for i in range(num_products):
        await q.put('A ' + str(i))
        print('producer A: create A {}'.format(i))
        await asyncio.sleep(0.01)

# producer B
async def producer_b(q, num_products):
    print('producer B: starting producer B...')
    for i in range(num_products):
        await q.put('B ' + str(i))
        print('producer B: create B {}'.format(i))
        await asyncio.sleep(0.02)

# Orchestration
async def main(num_consumers, num_products):
    # Use the number of consumers as the queue size limit; once the queue holds that many items, put() waits for a free slot
    q = asyncio.Queue(maxsize=num_consumers)

    # Create two producer tasks; each producer generates num_products products and puts them into q
    prod_a = [asyncio.create_task(producer_a(q, num_products))]
    prod_b = [asyncio.create_task(producer_b(q, num_products))]

    # Create the consumer tasks
    consumers = [
        asyncio.create_task(consumer(q, i)) for i in range(num_consumers)
    ]


    # Test cancelling the consumer tasks
    # await asyncio.sleep(0.2)
    # for c in consumers:
    #     c.cancel()

    await asyncio.gather(*prod_a, *prod_b)
    print('producers All: ending')

    # Wait until every item in the queue has been processed
    await q.join()
    print('consumers All: ending')

    # Cancel all consumers explicitly; otherwise they would wait on the queue forever
    for c in consumers:
        c.cancel()

# main(number of consumers, number of products)
asyncio.run(main(3, 4))

Output

producer A: starting producer A...
producer A: create A 0
producer B: starting producer B...
producer B: create B 0
consumer 0: starting...
consumer 0: prepare to get
consumer 0: get A 0
consumer 1: starting...
consumer 1: prepare to get
consumer 1: get B 0
consumer 2: starting...
consumer 2: prepare to get
producer A: create A 1
consumer 2: get A 1
producer B: create B 1
producer A: create A 2
producer A: create A 3
consumer 0: prepare to get
consumer 0: get B 1
consumer 1: prepare to get
consumer 1: get A 2
producer B: create B 2
consumer 2: prepare to get
consumer 2: get A 3
producer B: create B 3
producers All: ending
consumer 0: prepare to get
consumer 0: get B 2
consumer 1: prepare to get
consumer 1: get B 3
consumer 2: prepare to get
consumer 0: prepare to get
consumer 1: prepare to get
consumers All: ending
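
Because asyncio.Queue is not thread-safe, code running in another thread should not call its methods directly. One common approach, sketched here with a hypothetical `producer_thread` function, is to hand each put to the event loop through `loop.call_soon_threadsafe()`; the `None` sentinel marking the end of the stream is an assumption of this example.

```python
import asyncio
import threading


def producer_thread(loop, q, count):
    # Runs in a worker thread: never touch the asyncio.Queue directly here.
    for i in range(count):
        loop.call_soon_threadsafe(q.put_nowait, i)
    loop.call_soon_threadsafe(q.put_nowait, None)  # sentinel: no more items


async def main():
    loop = asyncio.get_running_loop()
    q = asyncio.Queue()  # unbounded, so put_nowait() cannot raise QueueFull
    thread = threading.Thread(target=producer_thread, args=(loop, q, 3))
    thread.start()

    received = []
    while True:
        item = await q.get()
        if item is None:
            break
        received.append(item)
    thread.join()
    return received


received = asyncio.run(main())
print(received)  # [0, 1, 2]
```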

futures

A future is a special low-level awaitable object that represents the eventual result of an asynchronous operation.

When a Future object is awaited, the coroutine waits until the Future is resolved.

Future objects let callback-based code be used with async/await.

Application-level code normally does not need to create Future objects itself.

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

This sample creates a Future object by hand

import asyncio

def mark_done(future, result):
    print('setting future result to {!r}'.format(result))
    future.set_result(result)

async def main():
    # get_running_loop() returns the loop that is executing this coroutine
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    print('scheduling mark_done')
    # Ask the event loop to call mark_done; when mark_done runs,
    # it delivers the result through the future's set_result()
    loop.call_soon(mark_done, future, 'the result')
    
    print('suspending the coroutine')
    result = await future
    print('awaited result: {!r}'.format(result))
    print('future result: {!r}'.format(future.result()))
    return result

if __name__ == '__main__':
    print('entering the event loop')
    result = asyncio.run(main())
    print('returned result: {!r}'.format(result))
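
A future can also carry a failure, and callbacks can be attached to it. The sketch below is only an illustration of `add_done_callback()` and `set_exception()`; the values and the `log` list are made up for the example.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    log = []

    # A future that succeeds, with a callback that runs once it is done
    ok_future = loop.create_future()
    ok_future.add_done_callback(lambda f: log.append(('done', f.result())))
    loop.call_soon(ok_future.set_result, 'the result')
    value = await ok_future

    # A future that fails: awaiting it re-raises the stored exception
    failed_future = loop.create_future()
    loop.call_soon(failed_future.set_exception, RuntimeError('failed'))
    try:
        await failed_future
    except RuntimeError as exc:
        error = str(exc)

    return value, error, log


value, error, log = asyncio.run(main())
print(value, error, log)
```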


2021/10/18

Piping-server

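piping-server transfers arbitrary data between devices over HTTP/HTTPS. It can send plain text or files, and it can also stream data without a size limit. On top of this mechanism it is possible to build text stream chat, screen sharing, drawing sharing, E2E-encrypted file transfer, Web SSH, and WebVNC. In an experiment, a transfer ran continuously for 64 days and moved about 1 PB of data.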

public servers

If you do not want to install your own server, public servers are available for testing; the examples below use https://ppng.io.

Testing

Sending text

# Send
echo 'hello, world' | curl -T - https://ppng.io/hello

# Get
curl https://ppng.io/hello > hello.txt

The path in the server URL (/hello here) can be chosen freely. Data is sent with PUT or POST and fetched with GET; either the sender or the receiver may connect first.
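
The same PUT/GET pairing can be expressed in Python. This sketch only builds the two requests with the standard library and does not send them; `build_send_request` and `build_receive_request` are hypothetical helper names, and the path `hello` mirrors the curl example.

```python
import urllib.request

BASE_URL = 'https://ppng.io'  # public server used in the curl examples


def build_send_request(path, data):
    """Build (but do not send) the PUT request that uploads data to a path."""
    return urllib.request.Request(
        '{}/{}'.format(BASE_URL, path), data=data, method='PUT')


def build_receive_request(path):
    """Build (but do not send) the GET request that downloads from the same path."""
    return urllib.request.Request('{}/{}'.format(BASE_URL, path), method='GET')


send_req = build_send_request('hello', b'hello, world\n')
recv_req = build_receive_request('hello')
print(send_req.get_method(), send_req.full_url)
print(recv_req.get_method(), recv_req.full_url)

# To actually transfer, each side would pass its request to
# urllib.request.urlopen(); whichever side starts first waits for the other.
```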

Sending a file

# send
curl -T file https://ppng.io/file
# send with linux pipe
cat file | curl -T - https://ppng.io/file

# receive
curl https://ppng.io/file > file

Sending a directory

# send with tar
tar czpf - ~/mydirectory | curl -T - https://ppng.io/dir

# receive (the stream is gzip-compressed, so save it as .tar.gz)
curl https://ppng.io/dir > dir.tar.gz

##########
# send with zip
zip -q -r - ~/mydirectory | curl -T - https://ppng.io/dir

# receive
curl https://ppng.io/dir > dir.zip

Sending an encrypted file

# send
cat file | openssl aes-256-cbc | curl -T - https://ppng.io/encfile

# receive
curl https://ppng.io/encfile | openssl aes-256-cbc -d > file2

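Specifying multiple receivers

Note that the sender waits until both receivers are connected before it starts transferring data; after only the first receiver has connected, the transfer is still waiting.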

# send
seq 10 | curl -T - 'https://ppng.io/seq?n=2'

# receive in two terminals (quote the URL so the shell does not treat ? as a glob)
curl 'https://ppng.io/seq?n=2'
curl 'https://ppng.io/seq?n=2'
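
Since the sender and every receiver use the same URL, including the n parameter, it can help to build that URL in one place. `piping_url` below is a hypothetical helper written for this note, not part of piping-server.

```python
from urllib.parse import urlencode


def piping_url(base, path, receivers=1):
    """Build a transfer URL; n is only added when more than one receiver is expected."""
    url = '{}/{}'.format(base.rstrip('/'), path.lstrip('/'))
    if receivers > 1:
        url += '?' + urlencode({'n': receivers})
    return url


single = piping_url('https://ppng.io', 'seq')
shared = piping_url('https://ppng.io', 'seq', receivers=2)
print(single)  # https://ppng.io/seq
print(shared)  # https://ppng.io/seq?n=2
```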

portable execution

Prebuilt binary executables for each platform can be downloaded from piping-server-pkg/releases and run directly.

The following starts piping-server-linux with a Let's Encrypt SSL certificate:

./piping-server-linux --http-port 9000 --enable-https --https-port 9001 --key-path /etc/letsencrypt/live/testserver.com.tw/privkey.pem --crt-path /etc/letsencrypt/live/testserver.com.tw/fullchain.pem

Query the server's version and help text online

curl https://ppng.io/help

curl https://ppng.io/version

applications

screen share

draw web


2021/10/04

RTCDataChannel Sample

Transmit text

A single web page creates both the local and the remote peer connection, then connects the two directly through a data channel.

    <div id="buttons">
        <button id="startButton">Start</button>
        <button id="sendButton" disabled>Send</button>
        <button id="closeButton" disabled>Stop</button>
    </div>

    <div id="sendReceive">
        <div id="send">
            <h2>Send</h2>
            <textarea id="dataChannelSend" disabled
                      placeholder="Press Start, enter some text, then press Send."></textarea>
        </div>
        <div id="receive">
            <h2>Receive</h2>
            <textarea id="dataChannelReceive" disabled></textarea>
        </div>
    </div>
/*
 *  Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree.
 */

'use strict';

let localConnection;
let remoteConnection;
let sendChannel;
let receiveChannel;
const dataChannelSend = document.querySelector('textarea#dataChannelSend');
const dataChannelReceive = document.querySelector('textarea#dataChannelReceive');
const startButton = document.querySelector('button#startButton');
const sendButton = document.querySelector('button#sendButton');
const closeButton = document.querySelector('button#closeButton');

startButton.onclick = createConnection;
sendButton.onclick = sendData;
closeButton.onclick = closeDataChannels;

function enableStartButton() {
  startButton.disabled = false;
}

function disableSendButton() {
  sendButton.disabled = true;
}

function createConnection() {
  dataChannelSend.placeholder = '';
  const servers = null;

  // Create the local peer connection
  window.localConnection = localConnection = new RTCPeerConnection(servers);
  console.log('Created local peer connection object localConnection');

  // Create a DataChannel from localConnection
  sendChannel = localConnection.createDataChannel('sendDataChannel');
  console.log('Created send data channel');

  // Two callbacks: onIceCandidate, onSendChannelStateChange
  localConnection.onicecandidate = e => {
    onIceCandidate(localConnection, e);
  };
  sendChannel.onopen = onSendChannelStateChange;
  sendChannel.onclose = onSendChannelStateChange;


  //////////////
  // remote peer connection
  window.remoteConnection = remoteConnection = new RTCPeerConnection(servers);
  console.log('Created remote peer connection object remoteConnection');

  // Two callbacks: onIceCandidate, receiveChannelCallback
  remoteConnection.onicecandidate = e => {
    onIceCandidate(remoteConnection, e);
  };
  remoteConnection.ondatachannel = receiveChannelCallback;

  ///////////
  // Create an offer from localConnection -> setLocalDescription
  localConnection.createOffer().then(
      gotDescription1,
      onCreateSessionDescriptionError
  );
  startButton.disabled = true;
  closeButton.disabled = false;
}

function onCreateSessionDescriptionError(error) {
  console.log('Failed to create session description: ' + error.toString());
}

function sendData() {
  // Send the message through the send data channel
  const data = dataChannelSend.value;
  sendChannel.send(data);
  console.log('Sent Data: ' + data);
}

function closeDataChannels() {
  console.log('Closing data channels');
  sendChannel.close();
  console.log('Closed data channel with label: ' + sendChannel.label);
  receiveChannel.close();
  console.log('Closed data channel with label: ' + receiveChannel.label);
  localConnection.close();
  remoteConnection.close();
  localConnection = null;
  remoteConnection = null;
  console.log('Closed peer connections');
  startButton.disabled = false;
  sendButton.disabled = true;
  closeButton.disabled = true;
  dataChannelSend.value = '';
  dataChannelReceive.value = '';
  dataChannelSend.disabled = true;
  disableSendButton();
  enableStartButton();
}

function gotDescription1(desc) {
  // Set the local description of the local connection
  localConnection.setLocalDescription(desc);
  console.log(`Offer from localConnection\n${desc.sdp}`);

  // Pass the local description straight to the remote connection as its remote description
  remoteConnection.setRemoteDescription(desc);
  // Create an answer from the remote connection, then call gotDescription2
  remoteConnection.createAnswer().then(
      gotDescription2,
      onCreateSessionDescriptionError
  );
}

function gotDescription2(desc) {
  // Set the local description of the remote connection
  remoteConnection.setLocalDescription(desc);
  console.log(`Answer from remoteConnection\n${desc.sdp}`);

  // Set the remote description of the local connection
  localConnection.setRemoteDescription(desc);
}


//////////////////
function getOtherPc(pc) {
  return (pc === localConnection) ? remoteConnection : localConnection;
}

function getName(pc) {
  return (pc === localConnection) ? 'localPeerConnection' : 'remotePeerConnection';
}

function onIceCandidate(pc, event) {
  // Pass each ICE candidate from the local connection to addIceCandidate on the remote connection,
  // and each ICE candidate from the remote connection to addIceCandidate on the local connection

  getOtherPc(pc)
      .addIceCandidate(event.candidate)
      .then(
          onAddIceCandidateSuccess,
          onAddIceCandidateError
      );
  console.log(`${getName(pc)} ICE candidate: ${event.candidate ? event.candidate.candidate : '(null)'}`);
}

function onAddIceCandidateSuccess() {
  console.log('AddIceCandidate success.');
}

function onAddIceCandidateError(error) {
  console.log(`Failed to add Ice Candidate: ${error.toString()}`);
}

function receiveChannelCallback(event) {
  // Obtain the receive data channel from the event
  console.log('Receive Channel Callback');
  receiveChannel = event.channel;
  receiveChannel.onmessage = onReceiveMessageCallback;
  receiveChannel.onopen = onReceiveChannelStateChange;
  receiveChannel.onclose = onReceiveChannelStateChange;
}

function onReceiveMessageCallback(event) {
  // Message received
  console.log('Received Message');
  dataChannelReceive.value = event.data;
}

function onSendChannelStateChange() {
  const readyState = sendChannel.readyState;
  console.log('Send channel state is: ' + readyState);
  if (readyState === 'open') {
    // When the local connection's data channel is open, update the state of the UI elements
    dataChannelSend.disabled = false;
    dataChannelSend.focus();
    sendButton.disabled = false;
    closeButton.disabled = false;
  } else {
    dataChannelSend.disabled = true;
    sendButton.disabled = true;
    closeButton.disabled = true;
  }
}

function onReceiveChannelStateChange() {
  //
  const readyState = receiveChannel.readyState;
  console.log(`Receive channel state is: ${readyState}`);
}

Transfer a file

    <section>
      <div >
        <form id="fileInfo">
          <input type="file" id="fileInput" name="files"/>
        </form>
        <button disabled id="sendFile">Send</button>
        <button disabled id="abortButton">Abort</button>
      </div>

      <div class="progress">
        <div class="label">Send progress: </div>
        <progress id="sendProgress" max="0" value="0"></progress>
      </div>

      <div class="progress">
        <div class="label">Receive progress: </div>
        <progress id="receiveProgress" max="0" value="0"></progress>
      </div>

      <div id="bitrate"></div>
      <a id="download"></a>
      <span id="status"></span>

    </section>
/* eslint no-unused-expressions: 0 */
/*
 *  Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree.
 */
'use strict';

let localConnection;
let remoteConnection;
let sendChannel;
let receiveChannel;
let fileReader;
const bitrateDiv = document.querySelector('div#bitrate');
const fileInput = document.querySelector('input#fileInput');
const abortButton = document.querySelector('button#abortButton');
const downloadAnchor = document.querySelector('a#download');
const sendProgress = document.querySelector('progress#sendProgress');
const receiveProgress = document.querySelector('progress#receiveProgress');
const statusMessage = document.querySelector('span#status');
const sendFileButton = document.querySelector('button#sendFile');

let receiveBuffer = [];
let receivedSize = 0;

let bytesPrev = 0;
let timestampPrev = 0;
let timestampStart;
let statsInterval = null;
let bitrateMax = 0;

sendFileButton.addEventListener('click', () => createConnection());
fileInput.addEventListener('change', handleFileInputChange, false);
abortButton.addEventListener('click', () => {
  // Abort the fileReader
  if (fileReader && fileReader.readyState === 1) {
    console.log('Abort read!');
    fileReader.abort();
  }
});

async function handleFileInputChange() {
  // A new file was selected
  const file = fileInput.files[0];
  if (!file) {
    console.log('No file chosen');
  } else {
    sendFileButton.disabled = false;
  }
}

async function createConnection() {
  // Send button clicked -> create the connections
  abortButton.disabled = false;
  sendFileButton.disabled = true;

  // local peer connection
  localConnection = new RTCPeerConnection();
  console.log('Created local peer connection object localConnection');

  // send data channel
  sendChannel = localConnection.createDataChannel('sendDataChannel');
  sendChannel.binaryType = 'arraybuffer';
  console.log('Created send data channel');

  // onSendChannelStateChange
  sendChannel.addEventListener('open', onSendChannelStateChange);
  sendChannel.addEventListener('close', onSendChannelStateChange);
  sendChannel.addEventListener('error', onError);

  // icecandidate
  localConnection.addEventListener('icecandidate', async event => {
    console.log('Local ICE candidate: ', event.candidate);
    // Add the local connection's ICE candidate directly to the remote connection
    await remoteConnection.addIceCandidate(event.candidate);
  });


  //////////////
  // remote peer connection
  remoteConnection = new RTCPeerConnection();
  console.log('Created remote peer connection object remoteConnection');

  // icecandidate
  remoteConnection.addEventListener('icecandidate', async event => {
    console.log('Remote ICE candidate: ', event.candidate);
    // Add the remote connection's ICE candidate directly to the local connection
    await localConnection.addIceCandidate(event.candidate);
  });
  remoteConnection.addEventListener('datachannel', receiveChannelCallback);


  // Create an offer from the local connection to obtain the SDP description
  try {
    const offer = await localConnection.createOffer();
    await gotLocalDescription(offer);
  } catch (e) {
    console.log('Failed to create session description: ', e);
  }

  fileInput.disabled = true;
}

function sendData() {
  const file = fileInput.files[0];
  console.log(`File is ${[file.name, file.size, file.type, file.lastModified].join(' ')}`);

  // Handle 0 size files.
  statusMessage.textContent = '';
  downloadAnchor.textContent = '';
  if (file.size === 0) {
    bitrateDiv.innerHTML = '';
    statusMessage.textContent = 'File is empty, please select a non-empty file';
    closeDataChannels();
    return;
  }
  sendProgress.max = file.size;
  receiveProgress.max = file.size;

  // chunkSize  16 kB = 16 * 1024
  const chunkSize = 16384;
  fileReader = new FileReader();
  let offset = 0;
  fileReader.addEventListener('error', error => console.error('Error reading file:', error));
  fileReader.addEventListener('abort', event => console.log('File reading aborted:', event));
  fileReader.addEventListener('load', e => {
    console.log('FileRead.onload ', e);

    // Send the array buffer read by fileReader through sendChannel
    sendChannel.send(e.target.result);
    offset += e.target.result.byteLength;
    sendProgress.value = offset;
    if (offset < file.size) {
      // While offset is less than file.size, keep calling readSlice
      readSlice(offset);
    }
  });
  const readSlice = o => {
    console.log('readSlice ', o);
    // Use file.slice to get one block of the file
    const slice = file.slice(offset, o + chunkSize);
    fileReader.readAsArrayBuffer(slice);
  };

  // Start reading at byte 0; each read covers chunkSize (16 kB) bytes
  readSlice(0);
}

function closeDataChannels() {
  console.log('Closing data channels');
  sendChannel.close();
  console.log(`Closed data channel with label: ${sendChannel.label}`);
  sendChannel = null;
  if (receiveChannel) {
    receiveChannel.close();
    console.log(`Closed data channel with label: ${receiveChannel.label}`);
    receiveChannel = null;
  }
  localConnection.close();
  remoteConnection.close();
  localConnection = null;
  remoteConnection = null;
  console.log('Closed peer connections');

  // re-enable the file select
  fileInput.disabled = false;
  abortButton.disabled = true;
  sendFileButton.disabled = false;
}

async function gotLocalDescription(desc) {
  // Set the local connection's description as its local description and as the remote connection's remote description
  await localConnection.setLocalDescription(desc);
  console.log(`Offer from localConnection\n ${desc.sdp}`);
  await remoteConnection.setRemoteDescription(desc);
  try {

    // Create the answer
    const answer = await remoteConnection.createAnswer();
    await gotRemoteDescription(answer);
  } catch (e) {
    console.log('Failed to create session description: ', e);
  }
}

async function gotRemoteDescription(desc) {
  // Set the remote connection's description as its local description and as the local connection's remote description
  await remoteConnection.setLocalDescription(desc);
  console.log(`Answer from remoteConnection\n ${desc.sdp}`);
  await localConnection.setRemoteDescription(desc);
}

function receiveChannelCallback(event) {
  console.log('Receive Channel Callback');
  receiveChannel = event.channel;
  receiveChannel.binaryType = 'arraybuffer';
  receiveChannel.onmessage = onReceiveMessageCallback;
  receiveChannel.onopen = onReceiveChannelStateChange;
  receiveChannel.onclose = onReceiveChannelStateChange;

  receivedSize = 0;
  bitrateMax = 0;
  downloadAnchor.textContent = '';
  downloadAnchor.removeAttribute('download');
  if (downloadAnchor.href) {
    URL.revokeObjectURL(downloadAnchor.href);
    downloadAnchor.removeAttribute('href');
  }
}

function onReceiveMessageCallback(event) {
  // The receive data channel received a chunk of up to 16 kB
  console.log(`Received Message ${event.data.byteLength}`);

  // Push the data onto receiveBuffer
  receiveBuffer.push(event.data);
  receivedSize += event.data.byteLength;
  receiveProgress.value = receivedSize;

  // we are assuming that our signaling protocol told
  // about the expected file size (and name, hash, etc).
  const file = fileInput.files[0];

  // When the received size equals the file size
  if (receivedSize === file.size) {
    // Build a Blob from receiveBuffer
    const received = new Blob(receiveBuffer);
    receiveBuffer = [];

    downloadAnchor.href = URL.createObjectURL(received);
    downloadAnchor.download = file.name;
    downloadAnchor.textContent =
      `Click to download '${file.name}' (${file.size} bytes)`;
    downloadAnchor.style.display = 'block';

    const bitrate = Math.round(receivedSize * 8 /
      ((new Date()).getTime() - timestampStart));
    bitrateDiv.innerHTML =
      `<strong>Average Bitrate:</strong> ${bitrate} kbits/sec (max: ${bitrateMax} kbits/sec)`;

    if (statsInterval) {
      // Clear statsInterval
      clearInterval(statsInterval);
      statsInterval = null;
    }

    // Close the data channels
    closeDataChannels();
  }
}

function onSendChannelStateChange() {
  if (sendChannel) {
    const {readyState} = sendChannel;
    console.log(`Send channel state is: ${readyState}`);
    if (readyState === 'open') {

      // When sendChannel changes to the open state, start sendData
      sendData();
    }
  }
}

function onError(error) {
  if (sendChannel) {
    console.error('Error in sendChannel:', error);
    return;
  }
  console.log('Error in sendChannel which is already closed:', error);
}

async function onReceiveChannelStateChange() {
  if (receiveChannel) {
    const readyState = receiveChannel.readyState;
    console.log(`Receive channel state is: ${readyState}`);
    if (readyState === 'open') {
      timestampStart = (new Date()).getTime();
      timestampPrev = timestampStart;
      statsInterval = setInterval(displayStats, 500);

      // While the receive data channel is open, displayStats is called every 500 ms
      await displayStats();
    }
  }
}

// display bitrate statistics.
async function displayStats() {
  if (remoteConnection && remoteConnection.iceConnectionState === 'connected') {
    // If remoteConnection is still connected, fetch its stats with getStats
    const stats = await remoteConnection.getStats();
    let activeCandidatePair;
    stats.forEach(report => {
      if (report.type === 'transport') {
        activeCandidatePair = stats.get(report.selectedCandidatePairId);
      }
    });
    if (activeCandidatePair) {
      if (timestampPrev === activeCandidatePair.timestamp) {
        return;
      }
      // calculate current bitrate
      const bytesNow = activeCandidatePair.bytesReceived;
      const bitrate = Math.round((bytesNow - bytesPrev) * 8 /
        (activeCandidatePair.timestamp - timestampPrev));
      bitrateDiv.innerHTML = `<strong>Current Bitrate:</strong> ${bitrate} kbits/sec`;
      timestampPrev = activeCandidatePair.timestamp;
      bytesPrev = bytesNow;
      if (bitrate > bitrateMax) {
        bitrateMax = bitrate;
      }
    }
  }
}
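
The chunking and reassembly logic of the file transfer above does not depend on WebRTC itself. The following Python sketch models it on an in-memory byte string; the function names are hypothetical, and the bitrate helper only restates the arithmetic used in the sample (bits divided by milliseconds gives kbit/s).

```python
CHUNK_SIZE = 16384  # 16 * 1024 bytes, the same chunk size as the sample above


def split_into_chunks(data, chunk_size=CHUNK_SIZE):
    """Yield consecutive slices of data, like repeated file.slice() calls."""
    for offset in range(0, len(data), chunk_size):
        yield data[offset:offset + chunk_size]


def reassemble(chunks, expected_size):
    """Collect chunks until expected_size bytes have arrived, then join them."""
    buffer = []
    received = 0
    for chunk in chunks:
        buffer.append(chunk)
        received += len(chunk)
        if received == expected_size:
            return b''.join(buffer)
    raise ValueError('stream ended after {} of {} bytes'.format(received, expected_size))


def bitrate_kbits(byte_count, elapsed_ms):
    """bits / milliseconds equals kilobits / second."""
    return round(byte_count * 8 / elapsed_ms)


payload = bytes(range(256)) * 200  # 51,200 bytes of sample data
chunks = list(split_into_chunks(payload))
restored = reassemble(chunks, len(payload))
chunk_lengths = [len(c) for c in chunks]
rate = bitrate_kbits(len(payload), 100)
print(chunk_lengths, restored == payload, rate)
```

As in the JavaScript sample, the receiver here relies on knowing the expected size in advance, which a real application would have to communicate through its signaling channel.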

Transfer data


    <section>
        <div id="button">
            <button id="sendTheData" type="button">Generate and send data</button>
        </div>
        <div class="input">
            <input type="number" id="megsToSend" min="1" name="megs" value="16"/>
            <label for="megsToSend">MB <b>(warning: very large values will potentially cause memory problems)</b></label>
            <div id="errorMsg"></div>
        </div>
        <div class="input">
            <input type="checkbox" id="ordered" checked>
            <label for="ordered">Ordered mode</label>
        </div>
        <div class="progress">
            <div class="label">Send progress:</div>
            <progress id="sendProgress" max="0" value="0"></progress>
        </div>

        <div class="progress">
            <div class="label">Receive progress:</div>
            <progress id="receiveProgress" max="0" value="0"></progress>
        </div>

        <div>
            <span id="transferStatus"></span>
        </div>
    </section>
/*
 *  Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree.
 */

'use strict';
// 256k = 256 * 1024
const MAX_CHUNK_SIZE = 262144;

let localConnection;
let remoteConnection;
let sendChannel;
let receiveChannel;
// chunkSize = Math.min(localConnection.sctp.maxMessageSize, MAX_CHUNK_SIZE);
let chunkSize;
// lowWaterMark = chunkSize; // A single chunk
let lowWaterMark;
let highWaterMark;
let dataString;
let timeoutHandle = null;
const megsToSend = document.querySelector('input#megsToSend');
const sendButton = document.querySelector('button#sendTheData');
const orderedCheckbox = document.querySelector('input#ordered');
const sendProgress = document.querySelector('progress#sendProgress');
const receiveProgress = document.querySelector('progress#receiveProgress');
const errorMessage = document.querySelector('div#errorMsg');
const transferStatus = document.querySelector('span#transferStatus');

let bytesToSend = 0;
let totalTimeUsedInSend = 0;
let numberOfSendCalls = 0;
let maxTimeUsedInSend = 0;
let sendStartTime = 0;
let currentThroughput = 0;

sendButton.addEventListener('click', createConnection);

// Prevent the amount of data to send from being set to 0 or to an invalid value.
megsToSend.addEventListener('change', function() {
  // this.value is a string; convert it first so Number.isNaN() can detect bad input.
  const number = Number(this.value);
  if (Number.isNaN(number)) {
    sendButton.disabled = true;
    errorMessage.textContent = `Invalid value for MB to send: ${this.value}`;
  } else if (number <= 0) {
    sendButton.disabled = true;
    errorMessage.innerHTML = '<p>Please enter a number greater than zero.</p>';
  } else if (number > 64) {
    // Limit the transfer to at most 64 MB
    sendButton.disabled = true;
    errorMessage.innerHTML = '<p>Please enter a number less than or equal to 64.</p>';
  } else {
    errorMessage.innerHTML = '';
    sendButton.disabled = false;
  }
});

async function createConnection() {
  sendButton.disabled = true;
  megsToSend.disabled = true;

  const servers = null;

  // Compute the number of bytes to send
  const number = Number.parseInt(megsToSend.value, 10);
  bytesToSend = number * 1024 * 1024;

  // Create the local peer connection
  localConnection = new RTCPeerConnection(servers);

  // Create the local send data channel and set its ordered option
  // Let's make a data channel!
  const dataChannelParams = {ordered: false};
  if (orderedCheckbox.checked) {
    dataChannelParams.ordered = true;
  }
  sendChannel = localConnection.createDataChannel('sendDataChannel', dataChannelParams);
  sendChannel.addEventListener('open', onSendChannelOpen);
  sendChannel.addEventListener('close', onSendChannelClosed);
  console.log('Created send data channel: ', sendChannel);

  console.log('Created local peer connection object localConnection: ', localConnection);

  // icecandidate callback
  localConnection.addEventListener('icecandidate', e => onIceCandidate(localConnection, e));

  /////////
  // remote peer connection
  remoteConnection = new RTCPeerConnection(servers);
  remoteConnection.addEventListener('icecandidate', e => onIceCandidate(remoteConnection, e));
  remoteConnection.addEventListener('datachannel', receiveChannelCallback);

  /////////
  try {
    // The local peer connection creates the offer
    const localOffer = await localConnection.createOffer();
    await handleLocalDescription(localOffer);
  } catch (e) {
    console.error('Failed to create session description: ', e);
  }

  transferStatus.innerHTML = 'Peer connection setup complete.';
}

function sendData() {
  // Stop scheduled timer if any (part of the workaround introduced below)
  if (timeoutHandle !== null) {
    clearTimeout(timeoutHandle);
    timeoutHandle = null;
  }

  let bufferedAmount = sendChannel.bufferedAmount;
  while (sendProgress.value < sendProgress.max) {
    transferStatus.innerText = 'Sending data...';
    const timeBefore = performance.now();

    sendChannel.send(dataString);

    const timeUsed = performance.now() - timeBefore;
    // Accumulate the time of every send() call so the reported average is meaningful.
    totalTimeUsedInSend += timeUsed;
    if (timeUsed > maxTimeUsedInSend) {
      maxTimeUsedInSend = timeUsed;
    }
    numberOfSendCalls += 1;
    bufferedAmount += chunkSize;
    sendProgress.value += chunkSize;

    // Pause sending if we reach the high water mark
    if (bufferedAmount >= highWaterMark) {
      // This is a workaround due to the bug that all browsers are incorrectly calculating the
      // amount of buffered data. Therefore, the 'bufferedamountlow' event would not fire.
      if (sendChannel.bufferedAmount < lowWaterMark) {
        timeoutHandle = setTimeout(() => sendData(), 0);
      }
      console.log(`Paused sending, buffered amount: ${bufferedAmount} (announced: ${sendChannel.bufferedAmount})`);
      break;
    }
  }

  if (sendProgress.value === sendProgress.max) {
    transferStatus.innerHTML = 'Data transfer completed successfully!';
  }
}

function startSendingData() {
  // Start sending data
  transferStatus.innerHTML = 'Start sending data.';
  sendProgress.max = bytesToSend;
  receiveProgress.max = sendProgress.max;
  sendProgress.value = 0;
  receiveProgress.value = 0;
  sendStartTime = performance.now();
  maxTimeUsedInSend = 0;
  totalTimeUsedInSend = 0;
  numberOfSendCalls = 0;
  sendData();
}

function maybeReset() {
  if (localConnection === null && remoteConnection === null) {
    sendButton.disabled = false;
    megsToSend.disabled = false;
  }
}

async function handleLocalDescription(desc) {
  // Apply the local peer connection's SDP offer to both peers
  await localConnection.setLocalDescription(desc);
  console.log('Offer from localConnection:\n', desc.sdp);
  await remoteConnection.setRemoteDescription(desc);
  try {
    // Create the answer
    const remoteAnswer = await remoteConnection.createAnswer();
    await handleRemoteAnswer(remoteAnswer);
  } catch (e) {
    console.error('Error when creating remote answer: ', e);
  }
}

async function handleRemoteAnswer(desc) {
  // Apply the remote peer connection's SDP answer to both peers
  await remoteConnection.setLocalDescription(desc);
  console.log('Answer from remoteConnection:\n', desc.sdp);
  await localConnection.setRemoteDescription(desc);
}

function getOtherPc(pc) {
  return (pc === localConnection) ? remoteConnection : localConnection;
}

async function onIceCandidate(pc, event) {
  const candidate = event.candidate;
  if (candidate === null) {
    return;
  } // Ignore null candidates
  try {
    await getOtherPc(pc).addIceCandidate(candidate);
    console.log('AddIceCandidate successful: ', candidate);
  } catch (e) {
    console.error('Failed to add Ice Candidate: ', e);
  }
}

function receiveChannelCallback(event) {
  console.log('Receive Channel Callback');
  receiveChannel = event.channel;
  receiveChannel.binaryType = 'arraybuffer';
  receiveChannel.addEventListener('close', onReceiveChannelClosed);
  receiveChannel.addEventListener('message', onReceiveMessageCallback);
}

function onReceiveMessageCallback(event) {
  // A message arrived on receiveChannel
  receiveProgress.value += event.data.length;
  // performance.now() is in milliseconds; convert the elapsed time to seconds.
  currentThroughput = receiveProgress.value / ((performance.now() - sendStartTime) / 1000);
  console.log('Current Throughput is:', currentThroughput, 'bytes/sec');

  // Workaround for a bug in Chrome which prevents the closing event from being raised by the
  // remote side. Also a workaround for Firefox which does not send all pending data when closing
  // the channel.
  if (receiveProgress.value === receiveProgress.max) {
    sendChannel.close();
    receiveChannel.close();
  }
}

function onSendChannelOpen() {
  // sendChannel opened successfully
  console.log('Send channel is open');

  chunkSize = Math.min(localConnection.sctp.maxMessageSize, MAX_CHUNK_SIZE);
  console.log('Determined chunk size: ', chunkSize);
  dataString = new Array(chunkSize).fill('X').join('');
  lowWaterMark = chunkSize; // A single chunk
  highWaterMark = Math.max(chunkSize * 8, 1048576); // 8 chunks or at least 1 MiB
  console.log('Send buffer low water threshold: ', lowWaterMark);
  console.log('Send buffer high water threshold: ', highWaterMark);
  sendChannel.bufferedAmountLowThreshold = lowWaterMark;

  sendChannel.addEventListener('bufferedamountlow', (e) => {
    console.log('BufferedAmountLow event:', e);
    sendData();
  });

  startSendingData();
}

function onSendChannelClosed() {
  console.log('Send channel is closed');
  localConnection.close();
  localConnection = null;
  console.log('Closed local peer connection');
  maybeReset();
  console.log('Average time spent in send() (ms): ' +
              totalTimeUsedInSend / numberOfSendCalls);
  console.log('Max time spent in send() (ms): ' + maxTimeUsedInSend);
  const spentTime = performance.now() - sendStartTime;
  console.log('Total time spent: ' + spentTime);
  console.log('MBytes/Sec: ' + (bytesToSend / 1000) / spentTime);
}

function onReceiveChannelClosed() {
  console.log('Receive channel is closed');
  remoteConnection.close();
  remoteConnection = null;
  console.log('Closed remote peer connection');
  maybeReset();
}
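
The send loop above pauses at a high water mark and resumes once the buffer has drained to the low water mark. Below is a minimal sketch of that pattern that runs outside the browser. `FakeChannel`, `computeSendParams`, and `pump` are hypothetical names invented for illustration; the fake channel only mimics `bufferedAmount` and `send()` and is not part of WebRTC.

```javascript
// Same limit as the sample: chunks of at most 256 KiB.
const MAX_CHUNK_SIZE = 262144;

// Derive chunk size and water marks the same way onSendChannelOpen() does.
function computeSendParams(maxMessageSize) {
  const chunkSize = Math.min(maxMessageSize, MAX_CHUNK_SIZE);
  return {
    chunkSize,
    lowWaterMark: chunkSize, // a single chunk
    highWaterMark: Math.max(chunkSize * 8, 1048576), // 8 chunks or at least 1 MiB
  };
}

// Hypothetical stand-in for RTCDataChannel: it only tracks buffered bytes.
class FakeChannel {
  constructor() {
    this.bufferedAmount = 0;
  }
  send(data) {
    this.bufferedAmount += data.length;
  }
  // Pretend the network drained the buffer down to `remaining` bytes.
  drainTo(remaining) {
    this.bufferedAmount = Math.min(this.bufferedAmount, remaining);
  }
}

// Send chunks until everything is sent or the high water mark is reached.
// Returns the updated number of bytes handed to the channel so far.
function pump(channel, params, chunk, sent, total) {
  while (sent < total) {
    channel.send(chunk);
    sent += params.chunkSize;
    if (channel.bufferedAmount >= params.highWaterMark) {
      break; // pause; the caller resumes once the buffer is low again
    }
  }
  return sent;
}

const params = computeSendParams(65536);
const chunk = 'X'.repeat(params.chunkSize);
const channel = new FakeChannel();
const total = 4 * 1024 * 1024;

let sent = 0;
let pauses = 0;
while (sent < total) {
  sent = pump(channel, params, chunk, sent, total);
  if (sent < total) {
    pauses += 1;
    // In the browser this is where 'bufferedamountlow' would fire.
    channel.drainTo(params.lowWaterMark - 1);
  }
}
console.log(`sent ${sent} bytes with ${pauses} pauses`);
```

Unlike this sketch, the sample keeps its own running estimate of the buffered amount and falls back to a timer when the browser's reported value disagrees with it.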

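The offer/answer exchange in handleLocalDescription() and handleRemoteAnswer() follows a fixed order. The sketch below shows that order with every step awaited. `negotiate` and `makeFakePeer` are hypothetical helpers; the fake peers only record calls so the snippet runs without a browser, whereas real code would pass two `RTCPeerConnection` objects.

```javascript
// Run the offer/answer exchange between two peers living in the same page.
// `local` and `remote` only need the four RTCPeerConnection methods used here.
async function negotiate(local, remote) {
  const offer = await local.createOffer();
  await local.setLocalDescription(offer);
  await remote.setRemoteDescription(offer);

  const answer = await remote.createAnswer();
  await remote.setLocalDescription(answer);
  await local.setRemoteDescription(answer);
}

// Hypothetical fake peer that logs each call instead of doing real signaling.
function makeFakePeer(name, log) {
  return {
    createOffer: async () => ({type: 'offer', sdp: `offer-from-${name}`}),
    createAnswer: async () => ({type: 'answer', sdp: `answer-from-${name}`}),
    setLocalDescription: async (desc) => {
      log.push(`${name}.setLocal(${desc.type})`);
    },
    setRemoteDescription: async (desc) => {
      log.push(`${name}.setRemote(${desc.type})`);
    },
  };
}

const callLog = [];
const done = negotiate(makeFakePeer('local', callLog), makeFakePeer('remote', callLog))
  .then(() => {
    console.log(callLog.join('\n'));
    return callLog;
  });
```

Awaiting each call means a failure in any step surfaces as a rejected promise at the caller, instead of being silently dropped.
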
Trickle ICE

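When testing a STUN server, the gathered candidates must include one of type srflx (server reflexive).

When testing a TURN server, the gathered candidates must include one of type relay.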
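
One way to check which candidate types were gathered is to read the `typ` field of each candidate string. The sketch below assumes candidate strings in the standard SDP `candidate:` attribute format. `candidateType` and `hasCandidateType` are hypothetical helper names, and the sample strings are illustrative only, not captured output.

```javascript
// Extract the candidate type (host, srflx, prflx, or relay) from a candidate string.
function candidateType(candidateLine) {
  const match = / typ (\w+)/.exec(candidateLine);
  return match ? match[1] : null;
}

// True if at least one gathered candidate has the given type.
function hasCandidateType(candidateLines, type) {
  return candidateLines.some((line) => candidateType(line) === type);
}

// Illustrative candidate strings (addresses come from documentation-only ranges).
const gathered = [
  'candidate:1 1 udp 2122260223 192.0.2.10 54299 typ host',
  'candidate:2 1 udp 1686052607 198.51.100.7 64347 typ srflx raddr 192.0.2.10 rport 54299',
];

console.log('STUN server works:', hasCandidateType(gathered, 'srflx'));
console.log('TURN server works:', hasCandidateType(gathered, 'relay'));
```

In a browser, the string to inspect is available as `event.candidate.candidate` inside an `icecandidate` handler.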

Time   Component  Type   Foundation  Protocol  Address          Port   Priority
0.004  rtp        host   3868393361  udp       192.168.1.157    54299  126 | 32542 | 255
0.008  rtp        srflx  1742403877  udp       220.132.127.162  64347  100 | 32542 | 255
0.064  rtp        relay  2774939105  udp       211.72.214.206   41556  2 | 32542 | 255
0.107  rtp        host   2819687265  tcp       192.168.1.157    9      90 | 32542 | 255
0.107  Done
0.109
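
The Priority column above shows each candidate priority as three numbers. Assuming the split follows the ICE priority layout (type preference in the top 8 bits, local preference in the next 16 bits, and a component-derived value in the low 8 bits), it can be sketched as follows. `splitPriority` and `joinPriority` are hypothetical helper names.

```javascript
// Split a 32-bit ICE candidate priority into its three fields.
function splitPriority(priority) {
  return {
    typePreference: priority >>> 24,
    localPreference: (priority >>> 8) & 0xffff,
    componentPart: priority & 0xff, // 256 minus the component ID
  };
}

// Rebuild the priority; multiplication avoids the signed 32-bit overflow of <<.
function joinPriority({typePreference, localPreference, componentPart}) {
  return typePreference * 2 ** 24 + localPreference * 2 ** 8 + componentPart;
}

// Values taken from the first row of the table: 126 | 32542 | 255.
const hostPriority = joinPriority({
  typePreference: 126,
  localPreference: 32542,
  componentPart: 255,
});
const parts = splitPriority(hostPriority);
console.log(
  hostPriority,
  `${parts.typePreference} | ${parts.localPreference} | ${parts.componentPart}`
);
```

Under this layout, a higher type preference sorts host candidates ahead of srflx and relay candidates, which matches the order of the first three rows.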

References

WebRTC samples