Commit 1dcffc35 authored by Anna Vovchenko's avatar Anna Vovchenko 🇺🇦
Browse files

feat: Add support for aborting watch request

Adds abortStream method
implemented within the  AbortController's abort function.
parent 86660c81
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@ export const EVENT_TIMEOUT = "timeout";
export const EVENT_TERMINATE = "terminate";
const EVENT_STRINGS = Object.freeze([
    EVENT_DATA,
    EVENT_PLAIN_TEXT,
    EVENT_ERROR,
    EVENT_TIMEOUT,
    EVENT_TERMINATE,
@@ -23,6 +24,7 @@ export class WatchApi extends BaseAPI {
        super(...arguments);
        this.data = [];
        this.eventListeners = {};
        this.requestController = new AbortController();
    }
    subscribeToStream(path, query) {
        this.getStream(path, query);
@@ -30,6 +32,9 @@ export class WatchApi extends BaseAPI {
            resolve(this);
        });
    }
    abortStream() {
        this.requestController.abort();
    }
    on(eventName, callback) {
        if (!this.eventListeners[eventName]) {
            this.eventListeners[eventName] = [];
@@ -92,6 +97,8 @@ export class WatchApi extends BaseAPI {
            method: "GET",
            headers: {},
            query,
        }, {
            signal: this.requestController.signal,
        });
        return response
            .then((res) => {

src/watch.spec.ts

0 → 100644
+200 −0
Original line number Diff line number Diff line
import {
  WatchApi,
  EVENT_DATA,
  EVENT_TERMINATE,
  EVENT_ERROR,
  EVENT_PLAIN_TEXT,
} from "./watch";
import {
  BaseAPI,
  Configuration,
  RequestContext,
  ResponseContext,
} from "../client/src/runtime";

describe("WatchApi", () => {
  const basePath = "https://test-api.com/k8s-proxy";
  const resourcePath = "/api/v1/namespace/default/pods";
  const query = { container: "my-container", watch: "true" };
  const headers = { "Content-Type": "application/json" };

  const createTestStream = (data: string[]): ReadableStream =>
    new ReadableStream({
      start(controller) {
        data.forEach((line) =>
          controller.enqueue(new TextEncoder().encode(line))
        );
        controller.close();
      },
    });

  let watchApi: WatchApi;
  let mockRequest: jest.Mock;
  let configuration: Configuration;

  beforeEach(() => {
    jest.useFakeTimers();
    mockRequest = jest.fn();
    jest
      .spyOn(BaseAPI.prototype, "request" as keyof BaseAPI)
      .mockImplementation(mockRequest);

    configuration = new Configuration({
      basePath: basePath,
      headers: headers,
    });
    watchApi = new WatchApi(configuration);
  });

  const initializeWatch = () => watchApi.subscribeToStream(resourcePath, query);

  describe("Basic API Operations", () => {
    describe("subscribeToStream", () => {
      it("returns the WatchApi instance", async () => {
        mockRequest.mockResolvedValueOnce(new Response());
        const result = await watchApi.subscribeToStream("/test", {});
        expect(result).toBe(watchApi);
      });

      it("makes request with correct parameters", async () => {
        mockRequest.mockResolvedValueOnce(new Response());
        await initializeWatch();

        expect(mockRequest).toHaveBeenCalledWith(
          {
            method: "GET",
            headers: {},
            path: resourcePath,
            query: query,
          },
          expect.objectContaining({
            signal: expect.any(AbortSignal),
          })
        );
      });
    });

    describe("abortStream", () => {
      it("aborts the request", async () => {
        const abortSpy = jest.spyOn(AbortController.prototype, "abort");
        mockRequest.mockImplementationOnce(() => new Promise(() => {}));
        watchApi.abortStream();
        expect(abortSpy).toHaveBeenCalled();
      });
    });
  });

  describe("Stream Events", () => {
    describe("JSON Data Handling", () => {
      it("emits data events for JSON content including added, modified, and deleted scenarios", async () => {
        const testData = [
          '{"type":"ADDED","object":{"metadata":{"uid":1, "name": "pod-1"}}}\n',
          '{"type":"ADDED","object":{"metadata":{"uid":2, "name": "pod-2"}}}\n',
          '{"type":"MODIFIED","object":{"metadata":{"uid":1, "name": "pod-1-modified"}}}\n',
          '{"type":"DELETED","object":{"metadata":{"uid":2, "name": "pod-2"}}}\n',
        ];

        const mockResponse = new Response(createTestStream(testData), {
          headers: headers,
        });
        mockRequest.mockResolvedValueOnce(mockResponse);

        const dataHandler = jest.fn();
        watchApi.on(EVENT_DATA, dataHandler);

        await initializeWatch();

        const expected = [
          [{ metadata: { uid: 1, name: "pod-1" } }],
          [
            { metadata: { uid: 1, name: "pod-1" } },
            { metadata: { uid: 2, name: "pod-2" } },
          ],
          [
            { metadata: { uid: 1, name: "pod-1-modified" } },
            { metadata: { uid: 2, name: "pod-2" } },
          ],
          [{ metadata: { uid: 1, name: "pod-1-modified" } }],
        ];

        for (const [index, expectedData] of expected.entries()) {
          await jest.runOnlyPendingTimers();
          expect(dataHandler).toHaveBeenNthCalledWith(index + 1, expectedData);
        }

        expect(dataHandler).toHaveBeenCalledTimes(expected.length);
      });
    });

    describe("Plain Text Handling", () => {
      it("emits plain text events", async () => {
        const testLines = ["line1\n", "line2\n", "line3\n"];
        const mockResponse = new Response(createTestStream(testLines), {
          headers: { "Content-Type": "text/plain" },
        });
        mockRequest.mockResolvedValueOnce(mockResponse);

        const textHandler = jest.fn();
        watchApi.on(EVENT_PLAIN_TEXT, textHandler);

        await initializeWatch();
        await jest.runAllTimersAsync();

        expect(textHandler).toHaveBeenCalledTimes(3);
        expect(textHandler).toHaveBeenNthCalledWith(1, "line1");
        expect(textHandler).toHaveBeenNthCalledWith(2, "line2");
        expect(textHandler).toHaveBeenNthCalledWith(3, "line3");
      });
    });

    describe("Stream Termination", () => {
      it("emits terminated event when stream ends", async () => {
        const mockResponse = new Response(createTestStream([]), {
          headers: headers,
        });
        mockRequest.mockResolvedValueOnce(mockResponse);

        const terminatedHandler = jest.fn();
        watchApi.on(EVENT_TERMINATE, terminatedHandler);

        await initializeWatch();
        await jest.runAllTimersAsync();

        expect(terminatedHandler).toHaveBeenCalledTimes(1);
      });
    });
  });

  describe("Error Handling", () => {
    it("handles network errors", async () => {
      const networkError = new Error("Network error");
      mockRequest.mockRejectedValueOnce(networkError);

      const errorHandler = jest.fn();
      watchApi.on(EVENT_ERROR, errorHandler);

      await initializeWatch();
      await jest.runAllTimersAsync();

      expect(errorHandler).toHaveBeenCalledWith(networkError);
    });

    it("handles unknown event types", async () => {
      const mockResponse = new Response(
        createTestStream(['{"type":"UNKNOWN","object":{"uid":1}}\n']),
        { headers: headers }
      );
      mockRequest.mockResolvedValueOnce(mockResponse);

      const errorHandler = jest.fn();
      watchApi.on(EVENT_ERROR, errorHandler);

      await initializeWatch();
      await jest.runAllTimersAsync();

      expect(errorHandler).toHaveBeenCalledWith({
        message: "Unknown event type: UNKNOWN",
      });
    });
  });
});
+17 −6
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ export const EVENT_TERMINATE = "terminate";

const EVENT_STRINGS = Object.freeze([
  EVENT_DATA,
  EVENT_PLAIN_TEXT,
  EVENT_ERROR,
  EVENT_TIMEOUT,
  EVENT_TERMINATE,
@@ -32,6 +33,7 @@ type EventObject = { [key: string]: any };
export class WatchApi extends BaseAPI {
  data: Array<EventObject> = [];
  eventListeners: ListenersDictionary = {};
  requestController = new AbortController();

  subscribeToStream(path: string, query: HTTPQuery) {
    this.getStream(path, query);
@@ -40,6 +42,10 @@ export class WatchApi extends BaseAPI {
    });
  }

  abortStream() {
    this.requestController.abort();
  }

  on(eventName: Events, callback: ListenerCallback) {
    if (!this.eventListeners[eventName]) {
      this.eventListeners[eventName] = [];
@@ -105,12 +111,17 @@ export class WatchApi extends BaseAPI {
  }

  private getStream(path: string, query: HTTPQuery) {
    const response = this.request({
    const response = this.request(
      {
        path,
        method: "GET",
        headers: {},
        query,
    });
      },
      {
        signal: this.requestController.signal,
      }
    );

    return response
      .then((res) => {