// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

import type { TemplateResult } from 'lit';

/**
 * Numeric value of the WebSocket `OPEN` ready state. Kept as a local constant
 * so the check does not depend on a global `WebSocket` constructor being
 * present at runtime.
 */
const WEBSOCKET_OPEN = 1;

/** A single RPC message, used for both requests and responses. */
export interface RPCPayload {
  /** Pairs a response with the callback registered for the request. */
  id: number;
  /** Request type name, e.g. `'eval'` or `'autocomplete'`. */
  type: string;
  data: any;
  /** When truthy on a response, the callback stays registered for more data. */
  streaming?: boolean;
  /** Set to `true` on the request that closes a previously opened stream. */
  close?: boolean;
}
export type RPCStreamHandler = (data: any) => void;
export type RPCCallback = (data: any, streaming?: boolean) => void;
export type RPCUnsubscribeStream = () => void;

/**
 * Transport-agnostic RPC client. Subclasses provide `send()` and feed incoming
 * messages to `handleResponse()`; this class matches responses to pending
 * requests by `id`.
 */
export abstract class RPCClient {
  private readonly callbacks = new Map<number, RPCCallback>();
  private nextCallId = 1;

  /** Delivers `payload` to the remote end. */
  public abstract send(payload: RPCPayload): void;

  /**
   * Sends a one-shot request.
   *
   * @param requestType Value placed in the payload's `type` field.
   * @param data Value placed in the payload's `data` field.
   * @returns A promise resolved with the `data` of the matching response. It
   *     rejects only if `send()` throws; there is no timeout.
   */
  public call(requestType: string, data?: any): Promise<any> {
    const payload: RPCPayload = {
      id: this.nextCallId++,
      type: requestType,
      data,
    };
    return new Promise((resolve, reject) => {
      this.callbacks.set(payload.id, resolve);
      try {
        this.send(payload);
      } catch (e) {
        // Nothing was sent, so no response can arrive: drop the callback.
        this.callbacks.delete(payload.id);
        reject(e);
      }
    });
  }

  /**
   * Opens a streaming request. `streamHandler` is invoked with the `data` of
   * every response carrying this request's id.
   *
   * @param requestType Value placed in the payload's `type` field.
   * @param data Value placed in the payload's `data` field.
   * @param streamHandler Receives the `data` of each matching response.
   * @returns A promise resolved (right after the request is handed to
   *     `send()`) with a function that closes the stream. That function
   *     re-sends the payload with `close: true` and returns a promise resolved
   *     by the next response for this id.
   */
  public openStream(
    requestType: string,
    data: any,
    streamHandler: RPCStreamHandler,
  ): Promise<RPCUnsubscribeStream> {
    const payload: RPCPayload = {
      id: this.nextCallId++,
      type: requestType,
      data,
    };

    const unsub = () =>
      new Promise((resolve, reject) => {
        // From now on the next response for this id acknowledges the close
        // instead of being delivered to the stream handler.
        this.callbacks.set(payload.id, resolve);
        try {
          this.send({ ...payload, close: true });
        } catch (e) {
          this.callbacks.delete(payload.id);
          reject(e);
        }
      });

    return new Promise((resolve, reject) => {
      this.callbacks.set(payload.id, streamHandler);
      try {
        this.send(payload);
      } catch (e) {
        this.callbacks.delete(payload.id);
        reject(e);
        return;
      }
      resolve(unsub);
    });
  }

  /**
   * Checks whether `response` answers a pending RPC call and, if so, invokes
   * the registered callback.
   *
   * The callback is removed after a non-streaming response, even when the
   * callback itself throws, so a terminal response is never delivered twice.
   *
   * @returns `true` if a callback was found and completed without throwing,
   *     `false` otherwise. Errors are logged, never propagated.
   */
  handleResponse(response: RPCPayload): boolean {
    try {
      if (!response.id) {
        return false;
      }
      const callback = this.callbacks.get(response.id);
      if (!callback) {
        console.error('callback not found for', response);
        return false;
      }
      try {
        callback(response.data, response.streaming);
      } finally {
        if (!response.streaming) this.callbacks.delete(response.id);
      }
      return true;
    } catch (e) {
      // Also reached when `response` is null/undefined (e.g. a JSON `null`).
      console.error(e);
    }
    return false;
  }
}

/**
 * RPC client that exchanges JSON-encoded payloads over a WebSocket.
 *
 * The constructor assigns the socket's `onopen`, `onclose` and `onmessage`
 * handlers, replacing any that were set before. Payloads sent while the socket
 * is not open are queued and flushed when `onopen` fires.
 */
export class WebSocketRPCClient extends RPCClient {
  private pendingCalls: RPCPayload[] = [];
  private isConnected: boolean;

  constructor(private ws: WebSocket) {
    super();

    // A socket handed over after it finished connecting never fires `onopen`
    // again, so take the initial state from the socket itself.
    this.isConnected = this.ws.readyState === WEBSOCKET_OPEN;

    this.ws.onopen = () => {
      this.isConnected = true;
      // Detach the queue before flushing so payloads are sent exactly once
      // and are not retained afterwards.
      const queued = this.pendingCalls;
      this.pendingCalls = [];
      for (const payload of queued) {
        this.send(payload);
      }
    };

    this.ws.onclose = () => {
      this.isConnected = false;
    };

    this.ws.onmessage = (event) => {
      let response: RPCPayload;
      try {
        response = JSON.parse(event.data);
      } catch (e) {
        // A malformed frame must not escape the event handler.
        console.error('failed to parse RPC message', e);
        return;
      }
      this.handleResponse(response);
    };
  }

  /** Sends `payload` as JSON, or queues it while the socket is not open. */
  send(payload: RPCPayload): void {
    if (!this.isConnected) {
      this.pendingCalls.push(payload);
      return;
    }
    this.ws.send(JSON.stringify(payload));
  }
}

/** Output of evaluating a snippet; every field is optional. */
export interface EvalOutput {
  stdin?: string;
  stdout?: string;
  stderr?: string;
  result?: string | TemplateResult;
}

/** One completion candidate returned by `ReplKernel.autocomplete()`. */
export interface AutocompleteSuggestion {
  text: string;
  type: string;
}

export type LogHandler = (text: string) => void;
export type UnsubscribeLogStream = () => void;

/** Backend capable of evaluating code and suggesting completions. */
export abstract class ReplKernel {
  abstract eval(code: string): Promise<EvalOutput>;
  abstract autocomplete(
    code: string,
    cursorPos: number,
  ): Promise<AutocompleteSuggestion[]>;
}

/** `ReplKernel` that forwards every request through a `WebSocketRPCClient`. */
export class WebSocketRPCReplKernel extends ReplKernel {
  constructor(private rpcClient: WebSocketRPCClient) {
    super();
  }

  /** Issues an `'eval'` RPC carrying `{ code }`. */
  eval(code: string): Promise<EvalOutput> {
    return this.rpcClient.call('eval', { code });
  }

  /** Issues an `'autocomplete'` RPC carrying `{ code, cursor_pos }`. */
  autocomplete(
    code: string,
    cursorPos: number,
  ): Promise<AutocompleteSuggestion[]> {
    return this.rpcClient.call('autocomplete', { code, cursor_pos: cursorPos });
  }
}