diff --git a/ee/app/graphql/mutations/audit_events/streaming/headers/create.rb b/ee/app/graphql/mutations/audit_events/streaming/headers/create.rb
index 17353dc0f651d8d43430200cc24ff343094ffdf2..47aed587b04f40fc9d891bce661410823de5ef1c 100644
--- a/ee/app/graphql/mutations/audit_events/streaming/headers/create.rb
+++ b/ee/app/graphql/mutations/audit_events/streaming/headers/create.rb
@@ -18,21 +18,29 @@ class Create < BaseMutation
 
           argument :destination_id, ::Types::GlobalIDType[::AuditEvents::ExternalAuditEventDestination],
                    required: true,
-                description: 'Destination to associate header with.'
+                   description: 'Destination to associate header with.'
 
           field :header, ::Types::AuditEvents::Streaming::HeaderType,
                 null: true,
                 description: 'Created header.'
 
           def resolve(destination_id:, key:, value:)
-            destination = authorized_find!(destination_id)
-            unless Feature.enabled?(:streaming_audit_event_headers, destination.group)
-              raise Gitlab::Graphql::Errors::ResourceNotAvailable, 'feature disabled'
+            params = {
+              destination: authorized_find!(destination_id),
+              key: key,
+              value: value
+            }
+
+            response = ::AuditEvents::Streaming::Headers::CreateService.new(
+              group: params[:destination]&.group,
+              params: params
+            ).execute
+
+            if response.success?
+              { header: response.payload[:header], errors: [] }
+            else
+              { header: nil, errors: response.errors }
             end
-
-            header = destination.headers.new(key: key, value: value)
-
-            { header: (header if header.save), errors: Array(header.errors) }
           end
 
           private
diff --git a/ee/app/graphql/mutations/audit_events/streaming/headers/destroy.rb b/ee/app/graphql/mutations/audit_events/streaming/headers/destroy.rb
index 17143740b332e3e886b444ab115d039c17b0a22b..b1c3b3335b80e0e72e64843c72be68183ac241c7 100644
--- a/ee/app/graphql/mutations/audit_events/streaming/headers/destroy.rb
+++ b/ee/app/graphql/mutations/audit_events/streaming/headers/destroy.rb
@@ -10,19 +10,24 @@ class Destroy < BaseMutation
 
           argument :header_id, ::Types::GlobalIDType[::AuditEvents::Streaming::Header],
                    required: true,
-                description: 'Header to delete.'
+                   description: 'Header to delete.'
 
           def resolve(header_id:)
             header = authorized_find!(id: header_id)
+            params = {
+              header: header,
+              destination: header.external_audit_event_destination
+            }
 
-            unless Feature.enabled?(:streaming_audit_event_headers, header.external_audit_event_destination.group)
-              raise Gitlab::Graphql::Errors::ResourceNotAvailable, 'feature disabled'
-            end
+            response = ::AuditEvents::Streaming::Headers::DestroyService.new(
+              group: params[:destination]&.group,
+              params: params
+            ).execute
 
-            if header.destroy
+            if response.success?
               { header: nil, errors: [] }
             else
-              { header: header, errors: Array(header.errors) }
+              { header: header, errors: response.errors }
             end
           end
 
diff --git a/ee/app/graphql/mutations/audit_events/streaming/headers/update.rb b/ee/app/graphql/mutations/audit_events/streaming/headers/update.rb
index db47650a5f63e24efd3c249057cb383d3a64a47e..0560818b00fd3f9f90da7fe28356523fcdf3e9f0 100644
--- a/ee/app/graphql/mutations/audit_events/streaming/headers/update.rb
+++ b/ee/app/graphql/mutations/audit_events/streaming/headers/update.rb
@@ -10,7 +10,7 @@ class Update < BaseMutation
 
           argument :header_id, ::Types::GlobalIDType[::AuditEvents::Streaming::Header],
                    required: true,
-                description: 'Header to update.'
+                   description: 'Header to update.'
 
           argument :key, GraphQL::Types::String,
                    required: true,
@@ -26,15 +26,22 @@ class Update < BaseMutation
 
           def resolve(header_id:, key:, value:)
             header = authorized_find!(id: header_id)
-
-            unless Feature.enabled?(:streaming_audit_event_headers, header.external_audit_event_destination.group)
-              raise Gitlab::Graphql::Errors::ResourceNotAvailable, 'feature disabled'
-            end
-
-            if header.update(key: key, value: value)
-              { header: header, errors: [] }
+            params = {
+              header: header,
+              destination: header.external_audit_event_destination,
+              key: key,
+              value: value
+            }
+
+            response = ::AuditEvents::Streaming::Headers::UpdateService.new(
+              group: params[:destination]&.group,
+              params: params
+            ).execute
+
+            if response.success?
+              { header: response.payload[:header], errors: [] }
             else
-              { header: header.reset, errors: Array(header.errors) }
+              { header: header.reset, errors: response.errors }
             end
           end
 
diff --git a/ee/app/services/audit_events/streaming/headers/base.rb b/ee/app/services/audit_events/streaming/headers/base.rb
new file mode 100644
index 0000000000000000000000000000000000000000..dc2aaf9df6d3a1316b27decc106c63ec9814d57c
--- /dev/null
+++ b/ee/app/services/audit_events/streaming/headers/base.rb
@@ -0,0 +1,34 @@
+# frozen_string_literal: true
+module AuditEvents
+  module Streaming
+    module Headers
+      class Base < ::BaseGroupService
+        attr_reader :destination
+
+        def execute
+          @destination = params[:destination]
+          return destination_error if @destination.blank?
+
+          unless feature_enabled?
+            raise Gitlab::Graphql::Errors::ResourceNotAvailable, 'feature disabled'
+          end
+        end
+
+        private
+
+        def destination_error
+          ServiceResponse.error(message: "missing destination param")
+        end
+
+        def feature_enabled?
+          Feature.enabled?(:streaming_audit_event_headers, group)
+        end
+
+        # authorization not handled by this service
+        def allowed?
+          true
+        end
+      end
+    end
+  end
+end
diff --git a/ee/app/services/audit_events/streaming/headers/create_service.rb b/ee/app/services/audit_events/streaming/headers/create_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..884e35c374f9f7ebb42322f49687d29a6e304818
--- /dev/null
+++ b/ee/app/services/audit_events/streaming/headers/create_service.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+module AuditEvents
+  module Streaming
+    module Headers
+      class CreateService < Base
+        def execute
+          super
+
+          key = params[:key]
+          value = params[:value]
+          header = destination.headers.new(key: key, value: value)
+
+          if header.save
+            ServiceResponse.success(payload: { header: header, errors: [] })
+          else
+            ServiceResponse.error(message: Array(header.errors))
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/ee/app/services/audit_events/streaming/headers/destroy_service.rb b/ee/app/services/audit_events/streaming/headers/destroy_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..42e4e7a249701b23e42a4e1fd8038e7dead8866b
--- /dev/null
+++ b/ee/app/services/audit_events/streaming/headers/destroy_service.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+module AuditEvents
+  module Streaming
+    module Headers
+      class DestroyService < Base
+        def execute
+          super
+
+          header = params[:header]
+          return header_error if header.blank?
+
+          if header.destroy
+            ServiceResponse.success
+          else
+            ServiceResponse.error(message: Array(header.errors))
+          end
+        end
+
+        private
+
+        def header_error
+          ServiceResponse.error(message: "missing header param")
+        end
+      end
+    end
+  end
+end
diff --git a/ee/app/services/audit_events/streaming/headers/update_service.rb b/ee/app/services/audit_events/streaming/headers/update_service.rb
new file mode 100644
index 0000000000000000000000000000000000000000..cd10bb3f4cdf5fb961188129210b20b0e4c4b36e
--- /dev/null
+++ b/ee/app/services/audit_events/streaming/headers/update_service.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+module AuditEvents
+  module Streaming
+    module Headers
+      class UpdateService < Base
+        def execute
+          super
+
+          header = params[:header]
+          return header_error if header.blank?
+
+          if header.update(key: params[:key], value: params[:value])
+            ServiceResponse.success(payload: { header: header, errors: [] })
+          else
+            ServiceResponse.error(message: Array(header.errors))
+          end
+        end
+
+        private
+
+        def header_error
+          ServiceResponse.error(message: "missing header param")
+        end
+      end
+    end
+  end
+end
diff --git a/ee/spec/services/audit_events/streaming/headers/base_spec.rb b/ee/spec/services/audit_events/streaming/headers/base_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..2f707242907e8c6dcc15c373337c9684234e8585
--- /dev/null
+++ b/ee/spec/services/audit_events/streaming/headers/base_spec.rb
@@ -0,0 +1,38 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe AuditEvents::Streaming::Headers::Base do
+  let(:header) { build_stubbed(:audit_events_streaming_header) }
+  let(:destination) { header.external_audit_event_destination }
+
+  subject(:service) do
+    described_class.new(
+      group: destination&.group,
+      params: { destination: destination }
+    )
+  end
+
+  describe '#execute' do
+    subject(:response) { service.execute }
+
+    context 'when destination is missing' do
+      let(:destination) { nil }
+
+      it 'returns an error' do
+        expect(response).to be_error
+        expect(response.errors).to match_array ['missing destination param']
+      end
+    end
+
+    context 'when streaming_audit_event_headers feature flag is disabled' do
+      before do
+        stub_feature_flags(streaming_audit_event_headers: false)
+      end
+
+      it 'raises an exception' do
+        expect { subject }.to raise_error(Gitlab::Graphql::Errors::ResourceNotAvailable)
+      end
+    end
+  end
+end
diff --git a/ee/spec/services/audit_events/streaming/headers/create_service_spec.rb b/ee/spec/services/audit_events/streaming/headers/create_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..0a494bea57387293151594c273e7af56a9cc1ea4
--- /dev/null
+++ b/ee/spec/services/audit_events/streaming/headers/create_service_spec.rb
@@ -0,0 +1,38 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe AuditEvents::Streaming::Headers::CreateService do
+  let(:destination) { create(:external_audit_event_destination) }
+  let(:params) { {  destination: destination } }
+
+  subject(:service) do
+    described_class.new(
+      group: destination&.group,
+      params: params
+    )
+  end
+
+  describe '#execute' do
+    subject(:response) { service.execute }
+
+    context 'when there are validation issues' do
+      let(:expected_errors) { ["Key can't be blank", "Value can't be blank"] }
+
+      it 'has an array of errors in the response' do
+        expect(response).to be_error
+        expect(response.errors).to match_array expected_errors
+      end
+    end
+
+    context 'when the header is created successfully' do
+      let(:params) { super().merge( key: 'a_key', value: 'a_value') }
+
+      it 'has the header in the response payload' do
+        expect(response).to be_success
+        expect(response.payload[:header].key).to eq 'a_key'
+        expect(response.payload[:header].value).to eq 'a_value'
+      end
+    end
+  end
+end
diff --git a/ee/spec/services/audit_events/streaming/headers/destroy_service_spec.rb b/ee/spec/services/audit_events/streaming/headers/destroy_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..40e731b53d311af2d281b3c740e9abc817752503
--- /dev/null
+++ b/ee/spec/services/audit_events/streaming/headers/destroy_service_spec.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe AuditEvents::Streaming::Headers::DestroyService do
+  let(:header) { create(:audit_events_streaming_header) }
+  let(:destination) { header.external_audit_event_destination }
+  let(:params) { {  destination: destination, header: header } }
+
+  subject(:service) do
+    described_class.new(
+      group: destination&.group,
+      params: params
+    )
+  end
+
+  describe '#execute' do
+    context 'when no header is provided' do
+      let(:params) { super().merge( header: nil) }
+
+      it 'does not destroy the header' do
+        expect { service.execute }.not_to change { destination.headers.count }
+      end
+
+      it 'has an error response' do
+        response = service.execute
+
+        expect(response).to be_error
+        expect(response.errors).to match_array ['missing header param']
+      end
+    end
+
+    context 'when the header is destroyed successfully' do
+      let(:response) { service.execute }
+
+      it 'destroys the header' do
+        expect { response }.to change { destination.headers.count }.by(-1)
+        expect(response).to be_success
+      end
+    end
+  end
+end
diff --git a/ee/spec/services/audit_events/streaming/headers/update_service_spec.rb b/ee/spec/services/audit_events/streaming/headers/update_service_spec.rb
new file mode 100644
index 0000000000000000000000000000000000000000..4187dc9936121f73455ef6c24608e307e0f18f1b
--- /dev/null
+++ b/ee/spec/services/audit_events/streaming/headers/update_service_spec.rb
@@ -0,0 +1,49 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe AuditEvents::Streaming::Headers::UpdateService do
+  let(:header) { create(:audit_events_streaming_header, key: 'old', value: 'old') }
+  let(:destination) { header.external_audit_event_destination }
+  let(:params) do
+    {
+      destination: destination,
+      header: header,
+      key: 'new',
+      value: 'new'
+    }
+  end
+
+  subject(:service) do
+    described_class.new(
+      group: destination&.group,
+      params: params
+    )
+  end
+
+  describe '#execute' do
+    subject(:response) { service.execute }
+
+    context 'when no header is provided' do
+      let(:params) { super().merge( header: nil) }
+
+      it 'does not update the header' do
+        expect(header.reload.key).to eq 'old'
+        expect(header.value).to eq 'old'
+      end
+
+      it 'has an error response' do
+        expect(response).to be_error
+        expect(response.errors).to match_array ['missing header param']
+      end
+    end
+
+    context 'when the header is updated successfully' do
+      it 'updates the header' do
+        expect(response).to be_success
+        expect(header.reload.key).to eq 'new'
+        expect(header.value).to eq 'new'
+      end
+    end
+  end
+end