Skip to content

Phase 3: Mesh v2拡張機能ブロックの実装 #453

@takaokouji

Description

@takaokouji

目的

smalruby3-guiにMesh v2拡張機能を実装し、既存Meshブロックと同じUI/UXを提供しつつ 、バックエンドにMesh v2システム(GraphQL)を使用します。Domain概念に対応し、URL パラメーター ?mesh=domain でDomainを指定可能にします。

既存Mesh機能との比較

機能 既存Mesh (SkyWay) Mesh v2 (GraphQL)
Domain指定 ?mesh=domain (最大10文字) ?mesh=domain (最大256文字)
ホスト作成 peer.joinRoom(peerId) createGroup(name, hostId, domain) Mutation
グループ検索 peer.listAllPeers() listGroupsByDomain(domain) Query
ピア参加 peer.joinRoom(hostPeerId) joinGroup(groupId, domain, nodeId) Mutation
変数共有 room.send({type: 'variable'}) reportDataByNode(groupId, domain, nodeId, data) Mutation
イベント通知 room.send({type: 'broadcast'}) fireEventByNode(groupId, domain, nodeId, eventName) Mutation
データ受信 room.on('data') onDataUpdateInGroup(groupId, domain) Subscription
イベント受信 room.on('data') onEventInGroup(groupId, domain) Subscription
ホスト退出 disconnect() → ルーム解散 leaveGroup(groupId, domain, nodeId) Mutation → グループ解散

重要な制限事項

  • AWSの費用を抑えるため、以下の制限を設ける
    • データ受信
      • 制限なし
    • データ送信
      • 1秒間に最大 4 回までとし、1回の送信後に250msは間隔をあけること。
      • 連続でデータに変化があった場合はデータを送信せず、ローカルの状態を変化させて、データ送信のタイミングで最新のデータを送ること
      • さらに、前回送信から値に変化があったときのみ、データを送信する。
    • イベント通知
      • イベント通知前に、かならずデータを送信を完了させること。
      • 1秒間に最大2回までとし、1回の送信後に500msは間隔をあけること。
      • 連続でイベント通知をする場合、イベント通知をする命令そのものをブロックすること。つまり、イベント通知自体の実行に時間がかかっている状況になる。
        • 処理をブロックする。
        • 別のスレッドでのイベント通知も同様として、あるクライアントのすべてのイベント通知がシングルキューで待たされる。
        • 例: 1回目のイベント通知Aは即時実行。その直後に次のイベント通知B、別スレ ッドでイベント通知Cを行うと、500msまたされてから2回目のイベント通知Bを行い、1000ms(1秒)またされてから別スレッドでイベント通知Cが発火する。
    • イベント受信
      • 制限なし
    • 継続してMesh v2に接続できる時間は最大90分とする。
      • 最初に接続してから90分後には、一度、ホスト、ノードともにグループを退出する
      • その時点でのデータもすべて喪失することになる。サーバーでも制限するが、クライアント (Mesh v2拡張機能)でもそのように振る舞うこと。

この制限を実現するために、あらたに、Mesh v2専用のイベント送信のブロックを追加す る。

  • ブロックの日本語の名称は「[メッセージ1▼]を送る」。イベントカテゴリの同名のブロックと同じもの。
  • このブロックのみ、Mesh v2機能を使って、他のノードにイベント通知する。
  • 互換性を保つため、イベントカテゴリの同名のブロックと「[メッセージ1▼]を送って待つ」でも、Mesh v2を使ってイベントを他のノードにも通知するが、そちらを使った場合 は、以下の挙動にする。
    • イベント送信用のキューに追加する。500msごとにキューを確認して、同じイベント の重複を削除したうえで、キューの一番上のイベントをMesh v2で送信する。
    • 同じイベントだとイベント発火が欠落してしまうが、それは制限ということにする。
    • Mesh v2のイベント送信と、従来のイベント送信のキューのタイミングが重なった場 合、両者を実行するタイミングが早い方を優先する。
      • 2025/12/21 10:10:01.000 Mesh v2のイベント送信
      • 2025/12/21 10:10:01.050 従来のイベント送信 (キューイング)
      • 2025/12/21 10:10:01.100 Mesh v2のイベント送信 (ブロック)
      • 2025/12/21 10:10:01.200 従来のイベント送信 (キューイング)
      • 2025/12/21 10:10:01.500 次のイベント送信のタイミングがくる
        • Mesh v2のイベント送信を優先するため、ブロックしていたMesh v2のイベント送信
      • 2025/12/21 10:10:02.000 次のイベント送信のタイミングがくる
        • ブロックしているMesh v2のイベント送信がないため、従来のイベント送信のキ ューを処理する
        • 従来のイベント送信のキューから同じイベントの重複を削除して、キューの一番上のものを送信する

タスク

Phase 1: 環境構築とGraphQLクライアント基盤

  • 依存ライブラリの選定とインストール (aws-appsync, graphql-tag 等)
  • URLパラメータ ?mesh=domain の取得・検証ロジック実装
    • 最大256文字
    • 不正文字処理
  • GraphQLクライアントの初期化設定 (Endpoint, API Key)
  • gui/scratch-vm/src/extensions/scratch3_mesh_v2/ ディレクトリ作成と初期化

Phase 2: Mesh v2 Service (Core Logic) 実装

  • MeshV2Service クラスの設計と実装
  • ホスト機能: createGroup の実装
  • ピア機能: listGroupsByDomain, joinGroup の実装
  • 退出処理: leaveGroup, dissolveGroup の実装
  • GraphQL Subscription (onDataUpdate, onEvent, onGroupDissolve) の接続管理
  • 90分接続制限タイマーの実装

Phase 3: レート制限とデータ通信ロジック

  • RateLimiter クラスの実装
    • データ送信: 4回/秒, 250ms間隔
    • イベント送信: 2回/秒, 500ms間隔
  • データ変更検出 (Diff) ロジックの実装
  • 変数共有: reportDataByNode の実装 (レート制限適用)
  • イベント通知: fireEventByNode の実装 (レート制限・優先度制御・データ送信待機)

Phase 4: Scratch VM拡張機能の実装

  • scratch3_mesh_v2 エントリーポイントの実装
  • 既存Meshブロック定義の移植とMesh v2 Serviceへの接続
  • Mesh v2専用ブロック「[メッセージ]を送る」の実装
  • センサー値取得ブロックの実装
  • scratch-vm への拡張機能登録

Phase 5: GUI統合とテスト

  • 接続モーダル (MeshModal) の改修 (グループ一覧表示対応)
  • GUIからの拡張機能読み込み設定
  • 統合テスト
    • Domain指定あり/なしの動作確認
    • レート制限の動作確認
    • ホスト・ピア間の通信確認

成果物

  • Mesh v2拡張機能コード
    • gui/scratch-vm/src/extensions/scratch3_mesh_v2/
  • GraphQLクライアント統合コード
  • 統合テスト結果

Domain処理実装例

URLパラメーター読み取り

// mesh-v2-service.js
class MeshV2Service {
    constructor(blocks, meshId) {
        this.blocks = blocks;
        this.meshId = meshId;

        // URLパラメーターからdomain取得
        this.domain = this.getDomainFromURL();

        // レート制限管理
        this.dataRateLimiter = new RateLimiter(4, 250); // 4回/秒、250msインターバル
        this.eventRateLimiter = new RateLimiter(2, 500); // 2回/秒、500msインタ ーバル

        // 90分接続制限タイマー
        this.connectionStartTime = Date.now();
        this.maxConnectionDuration = 90 * 60 * 1000; // 90分
        this.connectionTimer = setInterval(() => this.checkConnectionDuration(), 60000); // 1分ごとにチェック
    }

    getDomainFromURL() {
        const urlParams = new URLSearchParams(window.location.search);
        const meshParam = urlParams.get('mesh');

        if (!meshParam) {
            // domain未指定: nullを返す(バックエンドでグローバルIPを使用)
            return null;
        }

        // Mesh v2: より緩い制限(最大256文字)
        if (meshParam.length > 256) {
            throw new Error('Domain must be 256 characters or less');
        }

        return meshParam;
    }

    checkConnectionDuration() {
        const elapsed = Date.now() - this.connectionStartTime;
        if (elapsed >= this.maxConnectionDuration) {
            // 90分経過: グループ退出
            this.leaveGroup();
            clearInterval(this.connectionTimer);
        }
    }
}

レート制限実装例

// rate-limiter.js
class RateLimiter {
    constructor(maxPerSecond, intervalMs) {
        this.maxPerSecond = maxPerSecond;
        this.intervalMs = intervalMs;
        this.queue = [];
        this.lastSendTime = 0;
        this.processing = false;
    }

    async send(data, sendFunction) {
        return new Promise((resolve, reject) => {
            this.queue.push({ data, resolve, reject, sendFunction });
            this.processQueue();
        });
    }

    async processQueue() {
        if (this.processing || this.queue.length === 0) return;

        this.processing = true;

        while (this.queue.length > 0) {
            const now = Date.now();
            const timeSinceLastSend = now - this.lastSendTime;

            if (timeSinceLastSend < this.intervalMs) {
                // インターバル待機
                await new Promise(resolve => setTimeout(resolve, this.intervalMs - timeSinceLastSend));
            }

            const item = this.queue.shift();
            try {
                const result = await item.sendFunction(item.data);
                this.lastSendTime = Date.now();
                item.resolve(result);
            } catch (error) {
                item.reject(error);
            }
        }

        this.processing = false;
    }
}

GraphQL Mutation例(Domain対応)

// ホスト作成
async createGroup(name, hostId) {
    const result = await this.client.mutate({
        mutation: gql`
            mutation CreateGroup($name: String!, $hostId: ID!, $domain: String) {
                createGroup(name: $name, hostId: $hostId, domain: $domain) {
                    id
                    domain
                    fullId
                    name
                    hostId
                    createdAt
                }
            }
        `,
        variables: {
            name,
            hostId,
            domain: this.domain  // null の場合はバックエンドでIPを使用
        }
    });
    return result.data.createGroup;
}

// グループ検索
async listGroups() {
    const result = await this.client.query({
        query: gql`
            query ListGroupsByDomain($domain: String) {
                listGroupsByDomain(domain: $domain) {
                    id
                    domain
                    fullId
                    name
                    hostId
                }
            }
        `,
        variables: {
            domain: this.domain  // null の場合はバックエンドでIPを使用
        }
    });
    return result.data.listGroupsByDomain;
}

// データ送信(レート制限付き)
async reportData(groupId, nodeId, data) {
    // 変更検出
    if (this.isDataUnchanged(data)) {
        return; // 変更なし: 送信スキップ
    }

    // レート制限キューに追加
    return this.dataRateLimiter.send({ groupId, nodeId, data }, async (payload) => {
        await this.client.mutate({
            mutation: gql`
                mutation ReportDataByNode($groupId: ID!, $domain: String!, $nodeId: ID!, $data: [SensorDataInput!]!) {
                    reportDataByNode(groupId: $groupId, domain: $domain, nodeId: $nodeId, data: $data) {
                        nodeId
                        groupId
                        domain
                        data {
                            key
                            value
                        }
                        timestamp
                    }
                }
            `,
            variables: {
                groupId: payload.groupId,
                domain: this.domain,
                nodeId: payload.nodeId,
                data: payload.data.map(d => ({ key: d.key, value: d.value }))
            }
        });

        // 送信後の値を記録
        this.lastSentData = payload.data;
    });
}

// イベント送信(レート制限付き、データ送信完了待機)
async fireEvent(groupId, nodeId, eventName, payload) {
    // データ送信完了待機
    await this.dataRateLimiter.waitForCompletion();

    // レート制限キューに追加(ブロッキング)
    return this.eventRateLimiter.send({ groupId, nodeId, eventName, payload }, async (data) => {
        await this.client.mutate({
            mutation: gql`
                mutation FireEventByNode($groupId: ID!, $domain: String!, $nodeId: ID!, $eventName: String!, $payload: String) {
                    fireEventByNode(groupId: $groupId, domain: $domain, nodeId: $nodeId, eventName: $eventName, payload: $payload) {
                        name
                        firedByNodeId
                        groupId
                        domain
                        payload
                        timestamp
                    }
                }
            `,
            variables: {
                groupId: data.groupId,
                domain: this.domain,
                nodeId: data.nodeId,
                eventName: data.eventName,
                payload: data.payload
            }
        });
    });
}

GraphQL Subscription例(Domain対応)

// データ更新の受信
subscribeToDataUpdates(groupId, callback) {
    this.dataUpdateSubscription = this.client.subscribe({
        query: gql`
            subscription OnDataUpdateInGroup($groupId: ID!, $domain: String!) {
                onDataUpdateInGroup(groupId: $groupId, domain: $domain) {
                    nodeId
                    groupId
                    domain
                    data {
                        key
                        value
                    }
                    timestamp
                }
            }
        `,
        variables: {
            groupId,
            domain: this.domain
        }
    }).subscribe({
        next: (data) => callback(data.data.onDataUpdateInGroup)
    });
}

関連

🤖 Generated with Claude Code

Co-Authored-By: Claude noreply@anthropic.com

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    Status

    Backlog

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions