Event Processing and Parsers
This document provides a comprehensive guide for developers working with eCapture's event processing system and parsers. It covers the architecture of event processing, the parser system, protocol detection mechanisms, and step-by-step instructions for adding new event types and implementing custom parsers.
For information about event structures and types themselves, see Event Structures and Types. For the broader event processing pipeline architecture, see Event Processing Pipeline. For protocol-specific parsing implementations, see Protocol Parsing.
Event Processing Architecture Overview
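The event processing system transforms raw eBPF events into formatted, protocol-aware output. The system consists of three primary layers:
- the EventProcessor, which receives events from modules and routes them by connection UUID;
- per-connection workers (IWorker), which accumulate payload fragments;
- parsers (IParser), which detect the protocol and format the accumulated payload.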
Sources: pkg/event_processor/processor.go:1-216, pkg/event_processor/iworker.go:1-366, pkg/event_processor/iparser.go:1-167
Event Structures (IEventStruct)
All events entering the processing system implement the IEventStruct interface, which provides a standard contract for event handling.
IEventStruct Interface
The interface is defined in user/event/ievent.go:41-52:
| Method | Return Type | Purpose |
|---|---|---|
| Decode(payload []byte) | error | Deserialize raw eBPF data into a structured event |
| Payload() | []byte | Return event payload data |
| PayloadLen() | int | Return payload length |
| String() | string | Return human-readable representation |
| StringHex() | string | Return hex dump representation |
| Clone() | IEventStruct | Create a deep copy of the event |
| EventType() | Type | Return event classification (Output, ModuleData, EventProcessor) |
| GetUUID() | string | Return unique connection identifier |
| Base() | Base | Return base metadata (PID, IP, port, etc.) |
| ToProtobufEvent() | *pb.Event | Convert to protobuf format for serialization |
Event Types
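Events are classified by their intended processing path (user/event/ievent.go:26-37). EventType() returns one of three values:
- TypeOutput: written directly to the output
- TypeModuleData: retained by the module as internal data rather than displayed
- TypeEventProcessor: routed to the EventProcessor for parsing and formatting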
Sources: user/event/ievent.go:15-71
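The sketch below makes the three paths concrete with a module-side router that switches on EventType(). The Type constant names follow this page. Router, Dispatch and demoEvent are hypothetical names used only for illustration, and eCapture's own module code may route events differently.

```go
package main

import "fmt"

// Type mirrors the event classification described above (names assumed).
type Type uint8

const (
	TypeOutput Type = iota
	TypeModuleData
	TypeEventProcessor
)

// Event is a pared-down stand-in for IEventStruct.
type Event interface {
	EventType() Type
	GetUUID() string
	String() string
}

// Router is a hypothetical module-side dispatcher.
type Router struct {
	printed   []string         // events written straight to output
	cache     map[string]Event // module-private data keyed by UUID
	processed []Event          // events handed to the EventProcessor
}

// Dispatch sends each event down the path selected by its EventType.
func (r *Router) Dispatch(e Event) {
	switch e.EventType() {
	case TypeOutput:
		r.printed = append(r.printed, e.String())
	case TypeModuleData:
		r.cache[e.GetUUID()] = e
	case TypeEventProcessor:
		// Real code would call processor.Write(e) here.
		r.processed = append(r.processed, e)
	}
}

type demoEvent struct {
	t    Type
	uuid string
}

func (d demoEvent) EventType() Type { return d.t }
func (d demoEvent) GetUUID() string { return d.uuid }
func (d demoEvent) String() string  { return fmt.Sprintf("event %s", d.uuid) }

func main() {
	r := &Router{cache: map[string]Event{}}
	r.Dispatch(demoEvent{TypeOutput, "a"})
	r.Dispatch(demoEvent{TypeModuleData, "b"})
	r.Dispatch(demoEvent{TypeEventProcessor, "c"})
	fmt.Println(len(r.printed), len(r.cache), len(r.processed))
}
```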
EventProcessor Component
The EventProcessor is the central coordinator that receives events from modules and routes them to appropriate workers. It maintains a pool of workers, each responsible for processing events from a specific connection.
EventProcessor Structure
Key fields from pkg/event_processor/processor.go:30-50:
| Field | Type | Purpose |
|---|---|---|
| incoming | chan event.IEventStruct | Receives new events from modules (buffer: 1024) |
| outComing | chan []byte | Sends formatted output to logger (buffer: 1024) |
| destroyConn | chan uint64 | Signals socket destruction for cleanup (buffer: 1024) |
| workerQueue | map[string]IWorker | Maps UUID to worker instances |
| logger | io.Writer | Output destination (console, file, WebSocket) |
| isHex | bool | Whether to output in hexadecimal format |
| truncateSize | uint64 | Maximum payload size before truncation |
| closeChan | chan bool | Shutdown signal |
| errChan | chan error | Error reporting channel (buffer: 16) |
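Event Processing Flow
Modules call Write() to place events on the incoming channel. Serve() takes each event off that channel, and dispatch() hands it to the worker registered under the event's UUID. Workers send formatted output back over outComing, and Serve() forwards it to the logger. The destroyConn channel carries socket-destruction notices that trigger worker cleanup.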
EventProcessor Methods
Core Methods:
- Write(e event.IEventStruct) (pkg/event_processor/processor.go:165-175): external entry point for adding events to the processing queue
- Serve() error (pkg/event_processor/processor.go:66-89): main event loop that processes incoming events, destruction signals, and output
- dispatch(e event.IEventStruct) error (pkg/event_processor/processor.go:91-109): routes events to workers based on UUID
- WriteDestroyConn(s uint64) (pkg/event_processor/processor.go:177-185): signals that a socket has been destroyed and its workers should be cleaned up
- Close() error (pkg/event_processor/processor.go:187-200): gracefully shuts down the processor
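The path from Write() through Serve() to dispatch() can be sketched in a few types. This is a simplified sketch, not eCapture's code. Event, worker and Processor are hypothetical stand-ins, and the per-worker goroutine is collapsed into an inline append. The pending WaitGroup exists only so the demo can wait for dispatch to finish.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a minimal stand-in for event.IEventStruct.
type Event struct {
	UUID    string
	Payload []byte
}

// worker accumulates the payload of a single connection.
type worker struct {
	uuid    string
	payload []byte
}

// Processor sketches the incoming -> Serve -> dispatch path.
type Processor struct {
	incoming  chan Event
	closeChan chan struct{}
	mu        sync.Mutex
	workers   map[string]*worker
	pending   sync.WaitGroup // demo only: lets callers wait for dispatch
}

func NewProcessor() *Processor {
	return &Processor{
		incoming:  make(chan Event, 1024), // buffer size from the field table
		closeChan: make(chan struct{}),
		workers:   make(map[string]*worker),
	}
}

// Write only enqueues; all routing happens on the Serve goroutine.
func (p *Processor) Write(e Event) {
	p.pending.Add(1)
	p.incoming <- e
}

// Serve loops until Close is called.
func (p *Processor) Serve() {
	for {
		select {
		case e := <-p.incoming:
			p.dispatch(e)
		case <-p.closeChan:
			return
		}
	}
}

// dispatch hands the event to the worker for its UUID, creating it on first use.
func (p *Processor) dispatch(e Event) {
	defer p.pending.Done()
	p.mu.Lock()
	defer p.mu.Unlock()
	w, ok := p.workers[e.UUID]
	if !ok {
		w = &worker{uuid: e.UUID}
		p.workers[e.UUID] = w
	}
	w.payload = append(w.payload, e.Payload...)
}

func (p *Processor) Close() { close(p.closeChan) }

func main() {
	p := NewProcessor()
	go p.Serve()
	p.Write(Event{UUID: "a", Payload: []byte("GET ")})
	p.Write(Event{UUID: "b", Payload: []byte("x")})
	p.Write(Event{UUID: "a", Payload: []byte("/")})
	p.pending.Wait()
	p.Close()
	fmt.Println(len(p.workers), string(p.workers["a"].payload))
}
```

Because a single Serve goroutine drains a FIFO channel, fragments for the same UUID are appended in the order they were written.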
Worker Management:
- getWorkerByUUID(uuid string) (pkg/event_processor/processor.go:130-141): thread-safe retrieval with reference counting
- addWorkerByUUID(worker IWorker) (pkg/event_processor/processor.go:143-148): thread-safe worker registration
- delWorkerByUUID(worker IWorker) (pkg/event_processor/processor.go:151-155): thread-safe worker removal
- destroyWorkers(destroyUUID uint64) (pkg/event_processor/processor.go:115-128): closes workers associated with a destroyed socket
Sources: pkg/event_processor/processor.go:1-216
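Socket-driven cleanup works by matching each worker's destroy UUID against the socket ID received on destroyConn. The sketch below assumes that matching rule. sockWorker and registry are hypothetical types. For simplicity the sketch removes map entries immediately. In eCapture the worker's own shutdown path may call delWorkerByUUID() instead.

```go
package main

import (
	"fmt"
	"sync"
)

// sockWorker stands in for a socket-bound IWorker.
type sockWorker struct {
	uuid        string
	destroyUUID uint64 // socket ID, as returned by GetDestroyUUID()
	closed      bool
}

func (w *sockWorker) CloseEventWorker() { w.closed = true }

// registry is a mutex-guarded UUID -> worker map.
type registry struct {
	mu      sync.Mutex
	workers map[string]*sockWorker
}

func (r *registry) add(w *sockWorker) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.workers[w.uuid] = w
}

// destroyWorkers closes every worker bound to the destroyed socket and drops
// it from the map. Deleting entries while ranging over a Go map is allowed.
func (r *registry) destroyWorkers(destroyUUID uint64) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	n := 0
	for uuid, w := range r.workers {
		if w.destroyUUID == destroyUUID {
			w.CloseEventWorker()
			delete(r.workers, uuid)
			n++
		}
	}
	return n
}

func main() {
	r := &registry{workers: map[string]*sockWorker{}}
	r.add(&sockWorker{uuid: "sock:1_1_curl_3_0_t_7", destroyUUID: 7})
	r.add(&sockWorker{uuid: "sock:1_1_curl_3_1_t_7", destroyUUID: 7})
	r.add(&sockWorker{uuid: "sock:2_2_wget_4_0_t_9", destroyUUID: 9})
	fmt.Println(r.destroyWorkers(7), len(r.workers))
}
```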
Worker System (IWorker)
Each eventWorker handles events for a single connection (identified by UUID), accumulating payload fragments and triggering parsing when appropriate.
IWorker Interface
Defined in pkg/event_processor/iworker.go:35-49:
```go
type IWorker interface {
	Write(event.IEventStruct) error // Add event to processing queue
	GetUUID() string                // Return worker's UUID
	GetDestroyUUID() uint64         // Return socket ID for lifecycle tracking
	IfUsed() bool                   // Check if worker is currently referenced
	Get()                           // Mark worker as in use (sets the used flag)
	Put()                           // Release worker (clears the used flag)
	CloseEventWorker()              // Signal worker to close
}
```
Worker Lifecycle Models
Workers support two distinct lifecycle models pkg/event_processor/iworker.go:57-63:
1. LifeCycleStateDefault (Self-Managed)
   - Created when the first event arrives
   - Self-destructs after MaxTickerCount (10 × 100ms = 1 second) of inactivity
   - closeChan is nil
   - Used for short-lived connections
2. LifeCycleStateSock (Socket-Bound)
   - Created for events whose UUID has the sock: prefix (user/event/ievent.go:39)
   - Persists until an explicit CloseEventWorker() call
   - closeChan is non-nil
   - The worker extracts the socket ID from the UUID, which has the form sock:Pid_Tid_Comm_Fd_DataType_Tuple_Sock
   - Used for long-lived socket connections
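Choosing the lifecycle from the UUID can be sketched as follows. The sketch assumes the Sock field is a decimal integer in the last "_"-separated position. It parses from the right because the Comm field (a process name) may itself contain underscores. lifecycleFor is a hypothetical helper, and the UUID values in the demo are illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

const sockPrefix = "sock:" // prefix named in the text

type LifeCycleState int

const (
	LifeCycleStateDefault LifeCycleState = iota
	LifeCycleStateSock
)

// lifecycleFor picks the lifecycle model from the UUID and, for socket-bound
// workers, returns the socket ID from the last "_"-separated field.
func lifecycleFor(uuid string) (LifeCycleState, uint64, error) {
	if !strings.HasPrefix(uuid, sockPrefix) {
		return LifeCycleStateDefault, 0, nil
	}
	i := strings.LastIndex(uuid, "_") // from the right: Comm may contain "_"
	if i < 0 {
		return LifeCycleStateDefault, 0, errors.New("malformed sock UUID: " + uuid)
	}
	id, err := strconv.ParseUint(uuid[i+1:], 10, 64) // assumes a decimal socket ID
	if err != nil {
		return LifeCycleStateDefault, 0, fmt.Errorf("bad socket ID in %q: %w", uuid, err)
	}
	return LifeCycleStateSock, id, nil
}

func main() {
	for _, u := range []string{
		"sock:100_101_my_app_5_0_10.0.0.1:443-10.0.0.2:5000_123456",
		"100_101_curl_5_0",
	} {
		state, id, err := lifecycleFor(u)
		fmt.Println(state, id, err)
	}
}
```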
Worker Event Processing Loop
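The Run() method (pkg/event_processor/iworker.go:262-306) implements the main event processing loop. It accumulates payload from incoming events and uses a ticker to detect inactivity (default lifecycle) or waits for an explicit close signal (socket-bound lifecycle).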
Reference Counting and Thread Safety
Workers use an atomic in-use flag (a minimal form of reference counting) to prevent race conditions during destruction (pkg/event_processor/iworker.go:346-360):
- Get(): atomically sets the used flag to true; called by getWorkerByUUID() before returning the worker
- Put(): atomically sets the used flag to false; called after the event is written
- IfUsed(): checks whether the worker is currently referenced
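The flag itself fits in a few lines. This sketch uses sync/atomic's Bool type, which needs Go 1.19 or later. refWorker is a hypothetical type showing only the flag.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// refWorker shows only the in-use flag of a worker.
type refWorker struct {
	used atomic.Bool
}

// Get marks the worker as referenced (called before handing it out).
func (w *refWorker) Get() { w.used.Store(true) }

// Put clears the flag once the caller has finished writing.
func (w *refWorker) Put() { w.used.Store(false) }

// IfUsed reports whether some goroutine currently holds the worker.
func (w *refWorker) IfUsed() bool { return w.used.Load() }

func main() {
	w := &refWorker{}
	w.Get()
	fmt.Println(w.IfUsed()) // true while held
	w.Put()
	fmt.Println(w.IfUsed()) // false after release
}
```

A flag rather than a counter assumes at most one holder at a time. If several goroutines could hold the same worker at once, a counter (for example an atomic.Int32 incremented by Get and decremented by Put) would be the safer variant.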
The drainAndClose() method (pkg/event_processor/iworker.go:308-337) ensures safe cleanup by:
- Draining remaining events from the incoming channel
- Waiting until IfUsed() returns false (no other goroutine holds a reference)
- Calling Close() to output final results
Sources: pkg/event_processor/iworker.go:1-366
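Here is a sketch of the drain-wait-close sequence. It adds one detail the list does not mention: a second drain after the wait, which catches an event the last holder wrote just before calling Put(). drainWorker is hypothetical. The sketch assumes the worker has already been removed from the lookup map, so no new holder can appear while it waits.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

type drainWorker struct {
	incoming chan []byte
	used     atomic.Bool // true while another goroutine holds the worker
	payload  []byte
}

// drain moves everything currently buffered in incoming into payload
// without blocking.
func (w *drainWorker) drain() {
	for {
		select {
		case b := <-w.incoming:
			w.payload = append(w.payload, b...)
		default:
			return
		}
	}
}

// drainAndClose drains, waits for the last holder, drains again, then "closes".
func (w *drainWorker) drainAndClose() string {
	w.drain()
	for w.used.Load() {
		runtime.Gosched() // yield until the holder calls Put()
	}
	w.drain()                // catch writes made just before the final Put()
	return string(w.payload) // stands in for Close() emitting the final output
}

func main() {
	w := &drainWorker{incoming: make(chan []byte, 4)}
	w.incoming <- []byte("a")
	w.used.Store(true) // pretend another goroutine holds the worker
	go func() {
		w.incoming <- []byte("b") // its last write...
		w.used.Store(false)       // ...then Put()
	}()
	fmt.Println(w.drainAndClose())
}
```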
Parser System (IParser)
Parsers transform accumulated payload bytes into human-readable, protocol-specific output. The system supports dynamic parser selection based on payload content.
IParser Interface
Defined in pkg/event_processor/iparser.go:49-60:
```go
type IParser interface {
	detect(b []byte) error       // Attempt to identify protocol from payload
	Write(b []byte) (int, error) // Accumulate payload data
	ParserType() ParserType      // Return parser type identifier
	PacketType() PacketType      // Return packet encoding (gzip, etc.)
	Name() string                // Return human-readable parser name
	IsDone() bool                // Check if parsing is complete
	Init()                       // Initialize parser state
	Display() []byte             // Generate formatted output
	Reset()                      // Reset parser for reuse
}
```
Parser Types
The system defines several parser types pkg/event_processor/iparser.go:40-47:
| ParserType | Value | Purpose |
|---|---|---|
| ParserTypeNull | 0 | Default/unknown protocol (hex dump) |
| ParserTypeHttpRequest | 1 | HTTP/1.x request parsing |
| ParserTypeHttp2Request | 2 | HTTP/2 request parsing |
| ParserTypeHttpResponse | 3 | HTTP/1.x response parsing |
| ParserTypeHttp2Response | 4 | HTTP/2 response parsing |
| ParserTypeWebSocket | 5 | WebSocket frame parsing |
Packet Types
Parsers can identify packet encoding pkg/event_processor/iparser.go:33-38:
| PacketType | Purpose |
|---|---|
| PacketTypeNull | Uncompressed data |
| PacketTypeGzip | Gzip-compressed data |
| PacketTypeWebSocket | WebSocket-framed data |
Sources: pkg/event_processor/iparser.go:1-167
Protocol Detection Mechanism
The parser selection process uses a try-each-parser approach with automatic fallback to hex dump.
Parser Registration
Parsers register themselves during package initialization using the Register() function pkg/event_processor/iparser.go:64-73:
```go
func init() {
	hr := &HTTPRequest{}
	hr.Init()
	Register(hr) // Adds to global parsers map
}
```
All registered parsers are stored in the global parsers map (pkg/event_processor/iparser.go:62).
Detection Flow
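The NewParser() function (pkg/event_processor/iparser.go:85-115) implements the detection logic. It calls detect() on each registered parser and uses the first one that accepts the payload. If none match, it falls back to the default hex-dump parser.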
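The try-each-parser approach with fallback can be sketched with a tiny registry. The Parser interface here is trimmed to what detection needs. httpRequestParser and defaultParser are hypothetical stand-ins for HTTPRequest and the hex-dump fallback, and the real NewParser() may differ in detail.

```go
package main

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"net/http"
)

// Parser is a trimmed-down IParser: only what detection needs.
type Parser interface {
	Name() string
	detect(b []byte) error
}

// httpRequestParser accepts payloads that parse as an HTTP/1.x request.
type httpRequestParser struct{}

func (httpRequestParser) Name() string { return "HTTPRequest" }

func (httpRequestParser) detect(b []byte) error {
	_, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(b)))
	return err
}

// defaultParser is the hex-dump fallback; it is never registered for detection.
type defaultParser struct{}

func (defaultParser) Name() string        { return "DefaultParser" }
func (defaultParser) detect([]byte) error { return errors.New("fallback only") }

var parsers = map[string]Parser{}

// Register mirrors the init-time registration pattern described on this page.
func Register(p Parser) { parsers[p.Name()] = p }

func init() { Register(httpRequestParser{}) }

// NewParser returns the first registered parser whose detect() accepts the
// payload, or the fallback. Go map iteration order is random, so detectors
// should be mutually exclusive.
func NewParser(payload []byte) Parser {
	for _, p := range parsers {
		if p.detect(payload) == nil {
			return p
		}
	}
	return defaultParser{}
}

func main() {
	fmt.Println(NewParser([]byte("GET / HTTP/1.1\r\nHost: x\r\n\r\n")).Name())
	fmt.Println(NewParser([]byte{0x16, 0x03, 0x01}).Name())
}
```

Real parsers carry per-connection state, so the matched parser should be handed out as a fresh, Init()-ed instance of its type rather than the shared registered value. This sketch can return the registered value only because its parsers are stateless.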
Example: HTTP Request Detection
The HTTPRequest parser pkg/event_processor/http_request.go:83-92 uses Go's http.ReadRequest() to detect valid HTTP requests:
```go
func (hr *HTTPRequest) detect(payload []byte) error {
	rd := bytes.NewReader(payload)
	buf := bufio.NewReader(rd)
	_, err := http.ReadRequest(buf)
	if err != nil {
		return err // Not a valid HTTP request
	}
	return nil // Valid HTTP request detected
}
```
A similar approach is used for HTTP responses (pkg/event_processor/http_response.go:94-102):
```go
func (hr *HTTPResponse) detect(payload []byte) error {
	rd := bytes.NewReader(payload)
	buf := bufio.NewReader(rd)
	_, err := http.ReadResponse(buf, nil)
	if err != nil {
		return err // Not a valid HTTP response
	}
	return nil // Valid HTTP response detected
}
```
Sources: pkg/event_processor/iparser.go:85-115, pkg/event_processor/http_request.go:83-92, pkg/event_processor/http_response.go:94-102
Adding New Event Types
To add a new event type that flows through the processing system:
Step 1: Define Event Structure
Create a new struct in the appropriate user/event/ subdirectory that implements IEventStruct:
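```go
// user/event/mymodule/my_event.go
package mymodule

import (
	"encoding/binary"
	"errors"
	"fmt"

	"github.com/gojue/ecapture/user/event"
	// ToProtobufEvent (not shown) will also need:
	// pb "github.com/gojue/ecapture/protobuf/gen/v1"
)

// Byte layout of the eBPF-side struct. baseSize is an assumed value:
// take it from the C definition your eBPF program actually emits.
const (
	baseSize  = 40                // Size of the base header (assumed)
	eventSize = baseSize + 4 + 64 // base + CustomField1 (u32) + CustomField2 (64 bytes)
)

type MyCustomEvent struct {
	// Named field rather than embedding: IEventStruct requires a Base()
	// method, and Go forbids a field and a method with the same name.
	base         event.Base
	CustomField1 uint32
	CustomField2 [64]byte
}

func (e *MyCustomEvent) Decode(payload []byte) error {
	// Deserialize from the eBPF C struct layout
	if len(payload) < eventSize {
		return errors.New("payload too short")
	}
	// Fill e.base (PID, TID, comm, ...) from payload[:baseSize] here,
	// following your C struct layout.
	// Decode the custom fields that follow the base header
	e.CustomField1 = binary.LittleEndian.Uint32(payload[baseSize:])
	copy(e.CustomField2[:], payload[baseSize+4:baseSize+68])
	return nil
}

func (e *MyCustomEvent) Base() event.Base {
	return e.base
}

func (e *MyCustomEvent) Clone() event.IEventStruct {
	newEvent := *e // Fixed-size arrays are copied by value
	return &newEvent
}

func (e *MyCustomEvent) EventType() event.Type {
	return event.TypeEventProcessor // Route to EventProcessor
}

func (e *MyCustomEvent) GetUUID() string {
	// Generate a unique identifier for connection grouping
	// (Base field names are assumed)
	return fmt.Sprintf("%d_%d_%s_%d",
		e.base.PID, e.base.TID, e.base.PName, e.CustomField1)
}

// Implement remaining IEventStruct methods (Payload, PayloadLen, String,
// StringHex, ToProtobufEvent)...
```
Step 2: Module Integration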
In your module's Decode() method, instantiate and populate your event:
```go
func (m *MyModule) Decode(em *ebpf.Map, b []byte) (event.IEventStruct, error) {
	e := &mymodule.MyCustomEvent{}
	err := e.Decode(b)
	if err != nil {
		return nil, err
	}
	return e, nil
}
```
Step 3: Send to EventProcessor
In your module's event reading loop, write events to the EventProcessor:
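```go
// In the module's Run() method; eventChannel stands for wherever the
// module receives decoded events
for e := range eventChannel { // Naming this "event" would shadow the event package
	m.processor.Write(e) // Routes to EventProcessor
}
```
Sources: user/event/ievent.go:41-52, pkg/event_processor/processor.go:165-175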
Implementing Custom Parsers
To add support for a new protocol or output format:
Step 1: Create Parser Structure
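```go
// pkg/event_processor/my_protocol.go
package event_processor
import (
	"bufio"
	"bytes"
	"compress/gzip"   // Step 4 compression example
	"encoding/binary" // detect()
	"errors"
	"fmt"
	"io"
)
// MyProtocolHeader is an illustrative fixed header for the example protocol.
type MyProtocolHeader struct {
	Version, MessageType uint8
	HeaderSize           int // Header length in bytes
	TotalLength          int // Header + body length in bytes
}
type MyProtocolParser struct {
	reader    *bytes.Buffer
	bufReader *bufio.Reader
	isDone    bool
	isInit    bool
	// Protocol-specific fields
	header       MyProtocolHeader
	isCompressed bool       // Used in Step 4 (compression)
	packerType   PacketType // Used in Step 4 (compression)
}
```
Step 2: Implement IParser Interface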
Initialize Parser
```go
func (p *MyProtocolParser) Init() {
	p.reader = bytes.NewBuffer(nil)
	p.bufReader = bufio.NewReader(p.reader)
	p.isDone = false
	p.isInit = false
}

func (p *MyProtocolParser) Name() string {
	return "MyProtocolParser"
}
```
Implement Detection Logic
```go
// myProtocolMagic is a placeholder; use your protocol's real magic number
const myProtocolMagic uint32 = 0x12345678

func (p *MyProtocolParser) detect(payload []byte) error {
	// Check for protocol magic bytes or structure
	if len(payload) < 4 {
		return errors.New("payload too short")
	}
	// Example: check for a big-endian magic number at offset 0
	magic := binary.BigEndian.Uint32(payload[0:4])
	if magic != myProtocolMagic {
		return errors.New("not my protocol")
	}
	return nil
}
```
Accumulate Data
```go
func (p *MyProtocolParser) Write(b []byte) (int, error) {
	// Always buffer the bytes first, so nothing is lost while the header
	// is still incomplete
	n, err := p.reader.Write(b)
	if err != nil {
		return n, err
	}
	if !p.isInit {
		// parseHeader is a hypothetical helper: it inspects p.reader.Bytes()
		// without consuming them, fills p.header, and reports ok=false while
		// the header is still incomplete
		ok, err := p.parseHeader()
		if err != nil {
			return n, err // Malformed header
		}
		if !ok {
			return n, nil // Wait for more data
		}
		p.isInit = true
	}
	// The message is complete once TotalLength bytes have accumulated
	if p.reader.Len() >= p.header.TotalLength {
		p.isDone = true
	}
	return n, nil
}
```
Format Output
```go
func (p *MyProtocolParser) Display() []byte {
	raw := p.reader.Bytes()
	// Without a parsed header (or with a short buffer), fall back to raw bytes
	if !p.isInit || len(raw) < p.header.HeaderSize {
		return raw
	}
	var output bytes.Buffer
	// Format header
	fmt.Fprintf(&output, "My Protocol v%d\n", p.header.Version)
	fmt.Fprintf(&output, "Type: %d\n", p.header.MessageType)
	fmt.Fprintf(&output, "Length: %d\n", p.header.TotalLength)
	fmt.Fprintf(&output, "\n")
	// Format body
	output.Write(raw[p.header.HeaderSize:])
	return output.Bytes()
}
```
Implement Remaining Methods
```go
func (p *MyProtocolParser) ParserType() ParserType {
	return ParserTypeNull // Or define a new ParserType constant
}

func (p *MyProtocolParser) PacketType() PacketType {
	return PacketTypeNull
}

func (p *MyProtocolParser) IsDone() bool {
	return p.isDone
}

func (p *MyProtocolParser) Reset() {
	p.isDone = false
	p.isInit = false
	p.header = MyProtocolHeader{} // Forget the previous message's header
	p.reader.Reset()
	p.bufReader.Reset(p.reader)
}
```
Step 3: Register Parser
Add an init() function to register your parser globally:
```go
func init() {
	p := &MyProtocolParser{}
	p.Init()
	Register(p) // Makes parser available for detection; Register panics if Name() is already taken
}
```
Step 4: Handle Special Cases
Compression Support
If your protocol supports compression (like HTTP gzip), decompress in Display():
```go
func (p *MyProtocolParser) Display() []byte {
	rawData := p.reader.Bytes()
	if p.isCompressed {
		reader, err := gzip.NewReader(bytes.NewReader(rawData))
		if err != nil {
			return rawData // Return raw data on error
		}
		defer reader.Close()
		decompressed, err := io.ReadAll(reader)
		if err != nil {
			return rawData
		}
		p.packerType = PacketTypeGzip
		return decompressed
	}
	return rawData
}
```
Chunked Transfer
If your protocol uses chunked transfer encoding, accumulate chunks:
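```go
func (p *MyProtocolParser) Write(b []byte) (int, error) {
	n, err := p.reader.Write(b)
	if err != nil {
		return n, err
	}
	// Heuristic check for the chunk terminator (zero-length last chunk plus
	// empty line). Trailer fields, or chunk data that happens to end with the
	// same bytes, can defeat it.
	buf := p.reader.Bytes()
	if bytes.Equal(buf, []byte("0\r\n\r\n")) || bytes.HasSuffix(buf, []byte("\r\n0\r\n\r\n")) {
		p.isDone = true
	}
	return n, nil
}
```
Sources: pkg/event_processor/iparser.go:49-167, pkg/event_processor/http_request.go:1-164, pkg/event_processor/http_response.go:1-182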
Example: HTTP Request Parser Implementation
The existing HTTP request parser provides a complete implementation example:
Structure
pkg/event_processor/http_request.go:28-35:
```go
type HTTPRequest struct {
	request    *http.Request // Parsed HTTP request
	packerType PacketType    // Compression type
	isDone     bool          // Parsing complete flag
	isInit     bool          // Header parsed flag
	reader     *bytes.Buffer // Raw data accumulator
	bufReader  *bufio.Reader // Buffered reader for parsing
}
```
Key Methods
Detection (pkg/event_processor/http_request.go:83-92):
- Uses Go's standard http.ReadRequest() to validate HTTP syntax
- Returns an error if the payload is not a valid HTTP request
Writing (pkg/event_processor/http_request.go:54-81):
- On first write: parses the HTTP request line and headers
- On subsequent writes: accumulates body data
- Is meant to set isDone once the body is complete, but in the current code isDone always stays false
Display (pkg/event_processor/http_request.go:105-157):
- Reads the request body using io.ReadAll()
- Detects and decompresses gzip-encoded bodies
- Uses httputil.DumpRequest() to format headers
- Appends the body content
HTTP Response Parser
Similar structure in pkg/event_processor/http_response.go:28-37, with key differences:
- Uses http.ReadResponse() for detection (pkg/event_processor/http_response.go:94-102)
- Handles chunked transfer encoding and Content-Length validation (pkg/event_processor/http_response.go:115-135)
- Supports gzip decompression of response bodies (pkg/event_processor/http_response.go:136-159)
Sources: pkg/event_processor/http_request.go:1-164, pkg/event_processor/http_response.go:1-182
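Chunked transfer encoding needs a reliable "is the body complete?" test. A stricter check than suffix matching walks the chunk headers as defined for HTTP/1.1 chunked coding (RFC 9112, section 7.1). It skips chunk extensions and treats the body as complete at the first empty line after the zero-size last chunk, including after trailer fields. chunkedComplete is a hypothetical helper, not eCapture's implementation.

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// chunkedComplete reports whether buf holds a complete chunked body.
func chunkedComplete(buf []byte) (bool, error) {
	crlf := []byte("\r\n")
	for {
		i := bytes.Index(buf, crlf)
		if i < 0 {
			return false, nil // chunk-size line not complete yet
		}
		sizeField := buf[:i]
		if j := bytes.IndexByte(sizeField, ';'); j >= 0 {
			sizeField = sizeField[:j] // drop chunk extensions
		}
		size, err := strconv.ParseUint(string(bytes.TrimSpace(sizeField)), 16, 63)
		if err != nil {
			return false, fmt.Errorf("bad chunk size %q: %w", sizeField, err)
		}
		buf = buf[i+2:]
		if size == 0 {
			// Last chunk: complete at the first empty line (with or without trailers).
			return bytes.HasPrefix(buf, crlf) || bytes.Contains(buf, []byte("\r\n\r\n")), nil
		}
		if uint64(len(buf)) < size+2 {
			return false, nil // chunk data or its trailing CRLF still missing
		}
		if !bytes.Equal(buf[size:size+2], crlf) {
			return false, fmt.Errorf("missing CRLF after chunk data")
		}
		buf = buf[size+2:]
	}
}

func main() {
	for _, s := range []string{"5\r\nhello\r\n0\r\n\r\n", "5\r\nhel", "7\r\nab0\r\n\r\n"} {
		ok, err := chunkedComplete([]byte(s))
		fmt.Printf("%q complete=%v err=%v\n", s, ok, err)
	}
}
```

The third input ends with the same bytes a terminator would, yet it is an unfinished 7-byte chunk. Suffix matching would declare it complete, while walking the headers does not.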
Parser Registration System
Global Parser Registry
The parser system maintains a global registry pkg/event_processor/iparser.go:62:
```go
var parsers = make(map[string]IParser)
```
Registration Function
pkg/event_processor/iparser.go:64-73:
```go
func Register(p IParser) {
	if p == nil {
		panic("Register Parser is nil")
	}
	name := p.Name()
	if _, dup := parsers[name]; dup {
		panic(fmt.Sprintf("Register called twice for Parser %s", name))
	}
	parsers[name] = p
}
```
Accessing Parsers
Two utility functions provide access to registered parsers:
- GetAllModules() map[string]IParser (pkg/event_processor/iparser.go:76-78): returns all registered parsers for the detection loop
- GetModuleByName(name string) IParser (pkg/event_processor/iparser.go:81-83): retrieves a specific parser by name
Init-Time Registration Pattern
Each parser file includes an init() function that registers the parser when the package loads:
```go
func init() {
	hr := &HTTPRequest{}
	hr.Init()
	Register(hr)
}
```
This pattern ensures all parsers are automatically registered before NewParser() is called during event processing.
Sources: pkg/event_processor/iparser.go:62-83, pkg/event_processor/http_request.go:159-163, pkg/event_processor/http_response.go:177-181
Complete Event Processing Example
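Here's how a complete event flows through the system:
1. The module decodes raw eBPF data into an IEventStruct and calls EventProcessor.Write().
2. Serve() receives the event from the incoming channel, and dispatch() looks up the worker for the event's UUID, creating one if none exists.
3. The worker appends the event's payload to its buffer.
4. When the worker is ready to output (for example, after the inactivity timeout or on close), NewParser() selects a parser by running detect() on the accumulated payload.
5. The parser's Write() consumes the payload, and Display() produces the formatted output, which travels over outComing to the logger.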
Sources: pkg/event_processor/processor.go:66-109, pkg/event_processor/iworker.go:91-137, pkg/event_processor/iworker.go:262-306, pkg/event_processor/iparser.go:85-115
Testing Custom Parsers
Unit Testing Pattern
Create a test file for your parser:
```go
// pkg/event_processor/my_protocol_test.go
package event_processor

import (
	"testing"
)

func TestMyProtocolParser_Detect(t *testing.T) {
	p := &MyProtocolParser{}
	p.Init()
	// Valid payload (starts with the magic number)
	validPayload := []byte{0x12, 0x34, 0x56, 0x78 /* , ... */}
	if err := p.detect(validPayload); err != nil {
		t.Errorf("detect() failed on valid payload: %v", err)
	}
	// Invalid payload
	invalidPayload := []byte{0xFF, 0xFF, 0xFF, 0xFF}
	if err := p.detect(invalidPayload); err == nil {
		t.Error("detect() should have failed on invalid payload")
	}
}

func TestMyProtocolParser_WriteAndDisplay(t *testing.T) {
	p := &MyProtocolParser{}
	p.Init()
	// Write payload chunks (a complete message split in two)
	chunk1 := []byte{ /* ... */ }
	chunk2 := []byte{ /* ... */ }
	if _, err := p.Write(chunk1); err != nil {
		t.Fatalf("Write() failed: %v", err)
	}
	if _, err := p.Write(chunk2); err != nil {
		t.Fatalf("Write() failed: %v", err)
	}
	// Check if done
	if !p.IsDone() {
		t.Error("Parser should be done after full payload")
	}
	// Get formatted output
	if output := p.Display(); len(output) == 0 {
		t.Error("Display() returned empty output")
	}
}
```
Integration Testing
Test your parser with the full EventProcessor:
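```go
import (
	"bytes"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/gojue/ecapture/user/event"
)

// safeBuffer serializes access because the processor writes from its own goroutine
type safeBuffer struct {
	mu  sync.Mutex
	buf bytes.Buffer
}

func (s *safeBuffer) Write(p []byte) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.buf.Write(p)
}

func (s *safeBuffer) String() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.buf.String()
}

func TestMyProtocolParser_Integration(t *testing.T) {
	// Create EventProcessor with a race-safe test logger
	logger := &safeBuffer{}
	ep := NewEventProcessor(logger, false, 0)
	// Start processor; Close() runs when the test returns
	go ep.Serve()
	defer ep.Close()
	// Create test event (MyTestEvent is your own IEventStruct implementation)
	e := &MyTestEvent{
		Base: event.Base{
			PID: 1234,
			// ... other fields
		},
		// ... custom fields
	}
	ep.Write(e)
	// Wait for processing: default-lifecycle workers flush after ~1s of inactivity
	time.Sleep(2 * time.Second)
	// Check output
	if output := logger.String(); !strings.Contains(output, "My Protocol") {
		t.Errorf("Output doesn't contain expected content: %s", output)
	}
}
```
Sources: pkg/event_processor/iparser.go:49-167, pkg/event_processor/processor.go:206-215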
Summary
The eCapture event processing and parser system provides:
- EventProcessor - Central coordinator that routes events to workers by UUID
- eventWorker - Per-connection payload accumulation with two lifecycle models (default and socket-bound)
- IParser - Protocol-aware parsing with automatic detection and fallback
- Parser Registry - Global registration system for extensibility
To extend the system:
- Define new event types implementing IEventStruct
- Create parsers implementing IParser with detect(), Write(), and Display() methods
- Register parsers in init() functions for automatic discovery
- Test parsers in isolation and with full EventProcessor integration
The architecture ensures thread-safe operation through reference counting, supports both short-lived and long-lived connections, and provides graceful shutdown with payload draining.