Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/rush/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"@rushstack/rush-amazon-s3-build-cache-plugin": "workspace:*",
"@rushstack/rush-azure-storage-build-cache-plugin": "workspace:*",
"@rushstack/rush-http-build-cache-plugin": "workspace:*",
"@rushstack/rush-serve-plugin": "workspace:*",
"@types/heft-jest": "1.0.1",
"@types/semver": "7.5.0"
},
Expand Down
1 change: 1 addition & 0 deletions apps/rush/src/start-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function includePlugin(pluginName: string, pluginPackageName?: string): void {
includePlugin('rush-amazon-s3-build-cache-plugin');
includePlugin('rush-azure-storage-build-cache-plugin');
includePlugin('rush-http-build-cache-plugin');
includePlugin('rush-serve-plugin');
// Including this here so that developers can reuse it without installing the plugin a second time
includePlugin('rush-azure-interactive-auth-plugin', '@rushstack/rush-azure-storage-build-cache-plugin');

Expand Down
2 changes: 2 additions & 0 deletions apps/rushd/bin/rushd
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/usr/bin/env node
require('../lib-commonjs/start.js');
7 changes: 7 additions & 0 deletions apps/rushd/config/rig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
// The "rig.json" file directs tools to look for their config files in an external package.
// Documentation for this system: https://www.npmjs.com/package/@rushstack/rig-package
"$schema": "https://developer.microsoft.com/json-schemas/rig-package/rig.schema.json",

"rigPackageName": "local-node-rig"
}
56 changes: 56 additions & 0 deletions apps/rushd/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{
"name": "@rushstack/rushd",
"version": "0.1.0",
"description": "A background daemon for Rush that keeps state hot between builds",
"keywords": [
"rush",
"daemon",
"monorepo",
"build"
],
"main": "./lib-commonjs/index.js",
"module": "./lib-esm/index.js",
"types": "./dist/rushd.d.ts",
"exports": {
".": {
"types": "./dist/rushd.d.ts",
"node": "./lib-commonjs/index.js",
"import": "./lib-esm/index.js",
"require": "./lib-commonjs/index.js"
},
"./package.json": "./package.json"
},
"repository": {
"type": "git",
"url": "https://github.com/microsoft/rushstack.git",
"directory": "apps/rushd"
},
"engines": {
"node": ">=10.0.0"
},
"engineStrict": true,
"homepage": "https://rushstack.io",
"scripts": {
"build": "heft build --clean",
"_phase:build": "heft run --only build -- --clean"
},
"bin": {
"rushd": "./bin/rushd"
},
"license": "MIT",
"dependencies": {
"@rushstack/node-core-library": "workspace:*",
"@rushstack/terminal": "workspace:*"
},
"devDependencies": {
"@rushstack/heft": "workspace:*",
"eslint": "~9.37.0",
"local-node-rig": "workspace:*",
"typescript": "~5.8.2",
"@types/node": "20.17.19"
},
"sideEffects": [
"lib-commonjs/start.js",
"lib-esm/start.js"
]
}
185 changes: 185 additions & 0 deletions apps/rushd/src/RushdClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license.
// See LICENSE in the project root for license information.

import * as net from 'node:net';

import {
type DaemonMessage,
type IBaseMessage,
type IDaemonStatusResponse,
type IPongResponse,
parseMessages,
serializeMessage
} from './RushdProtocol';
import { getPipePath, isDaemonAlive, readPidFile } from './RushdLifecycle';

export interface IRushdClientOptions {
workspaceRoot: string;
timeoutMs?: number;
}

/**
* Client for communicating with a running rushd daemon.
*/
export class RushdClient {
private readonly _workspaceRoot: string;
private readonly _timeoutMs: number;
private readonly _pipePath: string;

private _socket: net.Socket | undefined;
private _buffer: string = '';
private _messageHandler: ((message: DaemonMessage) => void) | undefined;

public constructor(options: IRushdClientOptions) {
this._workspaceRoot = options.workspaceRoot;
this._timeoutMs = options.timeoutMs ?? 5000;
this._pipePath = getPipePath(this._workspaceRoot);
}

/**
* Check if a daemon is running and reachable for this workspace.
*/
public async isDaemonRunningAsync(): Promise<boolean> {
const pid: number | undefined = readPidFile(this._workspaceRoot);
if (pid === undefined) {
return false;
}
return isDaemonAlive(this._workspaceRoot);
}

/**
* Connect to the running daemon.
*/
public async connectAsync(): Promise<void> {
if (this._socket) {
throw new Error('Already connected');
}

await new Promise<void>((resolve, reject) => {
const socket: net.Socket = net.connect(this._pipePath, () => {
this._socket = socket;
resolve();
});

socket.on('data', (data: Buffer) => {
this._buffer += data.toString();
const { messages, remainder } = parseMessages(this._buffer);
this._buffer = remainder;

for (const message of messages) {
if (this._messageHandler) {
this._messageHandler(message as DaemonMessage);
}
}
});

socket.on('error', (err: Error) => {
if (!this._socket) {
reject(new Error(`Failed to connect to rushd: ${err.message}`));
}
});

socket.setTimeout(this._timeoutMs, () => {
socket.destroy();
reject(new Error('Connection to rushd timed out'));
});
});
}

/**
* Disconnect from the daemon.
*/
public disconnect(): void {
if (this._socket) {
this._socket.end();
this._socket = undefined;
}
this._messageHandler = undefined;
this._buffer = '';
}

/**
* Send a message and wait for a single response.
*/
public async sendAsync(message: IBaseMessage): Promise<DaemonMessage> {
if (!this._socket) {
throw new Error('Not connected to rushd');
}

return new Promise<DaemonMessage>((resolve, reject) => {
const previousHandler: typeof this._messageHandler = this._messageHandler;

this._messageHandler = (response: DaemonMessage) => {
this._messageHandler = previousHandler;
resolve(response);
};

this._socket!.write(serializeMessage(message), (err?: Error) => {
if (err) {
this._messageHandler = previousHandler;
reject(new Error(`Failed to send message: ${err.message}`));
}
});
});
}

/**
* Send a message and stream responses until a terminal message (result or error) is received.
*/
public async sendAndStreamAsync(
message: IBaseMessage,
onMessage: (message: DaemonMessage) => void
): Promise<DaemonMessage> {
if (!this._socket) {
throw new Error('Not connected to rushd');
}

return new Promise<DaemonMessage>((resolve, reject) => {
const previousHandler: typeof this._messageHandler = this._messageHandler;

this._messageHandler = (response: DaemonMessage) => {
onMessage(response);

// Terminal messages end the stream
if (response.type === 'result' || response.type === 'error') {
this._messageHandler = previousHandler;
resolve(response);
}
};

this._socket!.write(serializeMessage(message), (err?: Error) => {
if (err) {
this._messageHandler = previousHandler;
reject(new Error(`Failed to send message: ${err.message}`));
}
});

this._socket!.on('end', () => {
this._messageHandler = previousHandler;
reject(new Error('Daemon disconnected unexpectedly'));
});
});
}

/**
* Convenience: ping the daemon.
*/
public async pingAsync(): Promise<IPongResponse> {
return (await this.sendAsync({ type: 'ping' })) as IPongResponse;
}

/**
* Convenience: get daemon status.
*/
public async getStatusAsync(): Promise<IDaemonStatusResponse> {
return (await this.sendAsync({ type: 'status' })) as IDaemonStatusResponse;
}

/**
* Convenience: request daemon shutdown.
*/
public async shutdownAsync(): Promise<void> {
await this.sendAsync({ type: 'shutdown' });
this.disconnect();
}
}
Loading