sse

package
v1.14.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 3, 2025 License: MIT Imports: 16 Imported by: 0

README

SSE

A high-performance Go language Server-Sent Events (SSE) server and client implementation, supporting uni-cast and broadcast events, automatic reconnection, message persistence, and other features.

Features

  • 🚀 High-performance Event Hub for managing client connections
  • 🔌 Supports automatic reconnection and event retransmission on disconnect
  • 📊 Built-in push statistics and performance monitoring
  • 🔒 Thread-safe client management
  • ⏱️ Supports timeout retries and asynchronous task processing
  • 💾 Optional persistent storage interface
  • ❤️ Built-in heartbeat detection mechanism

Example of Use

Server Example
package main

import (
    "net/http"
    "strconv"
    "time"
    "math/rand"
    "github.com/gin-gonic/gin"
    "github.com/go-dev-frame/sponge/pkg/sse"
)

func main() {
    // Initialize SSE Hub
    hub := sse.NewHub()
    defer hub.Close()

    // Create Gin router
    r := gin.Default()

    // SSE Event Stream Interface, requires authentication to set uid
    r.GET("/events", func(c *gin.Context) {
        var uid string
        u, isExists := c.Get("uid")
        if !isExists {
            uid = strconv.Itoa(rand.Intn(99) + 100) // mock uid
        } else {
            uid, _ = u.(string)
        }
        hub.Serve(c, uid)
    })

    // Register event push endpoint, supports pushing to specified users and broadcast pushing
    // Push to specified users
    // curl -X POST -H "Content-Type: application/json" -d '{"uids":["u001"],"events":[{"event":"message","data":"hello_world"}]}' http://localhost:8080/push
    // Broadcast push, not specifying users means pushing to all users
    // curl -X POST -H "Content-Type: application/json" -d '{"events":[{"event":"message","data":"hello_world"}]}' http://localhost:8080/push
    r.POST("/push", hub.PushEventHandler())

    // simulated event push
    go func() {
        i := 0
        for {
            time.Sleep(time.Second * 5)
            i++
            e := &sse.Event{Event: sse.DefaultEventType, Data: "hello_world_" + strconv.Itoa(i)}
            _ = hub.Push(nil, e) // broadcast push
            //_ = hub.Push([]string{uid}, e) // specified user push
        }
    }()

    // Start HTTP server
    if err := http.ListenAndServe(":8080", r); err != nil {
        panic(err)
    }
}

Client Example
package main

import (
    "fmt"
    "github.com/go-dev-frame/sponge/pkg/sse"
)

func main() {
    url := "http://localhost:8080/events"

    // Create SSE client
    client := sse.NewClient(url)

    client.OnEvent(sse.DefaultEventType, func(event *sse.Event) {
        fmt.Printf("Received: %#v\n", event)
    })

    err := client.Connect()
    if err != nil {
        fmt.Printf("Connection failed: %v\n", err)
        return
    }

    fmt.Println("SSE client started, press Ctrl+C to exit")
    <-client.Wait()
}

Advanced Configuration

Using Persistent Storage

You can implement map, redis, mysql and other storage to achieve persistent storage and query of events. Example code:

// Implement the Store interface
type MyStore struct{}

func (s *MyStore) Save(ctx context.Context, e *sse.Event) error {
    // Implement event storage logic
    return nil
}

func (s *MyStore) ListByLastID(ctx context.Context, eventType string, lastID string, pageSize int) ([]*sse.Event, string, error) {
    // Implement event query logic, paginate query, return event list, last event ID
    return nil, nil
}

// Create Hub with storage
hub := sse.NewHub(sse.WithStore(&MyStore{}))

Configure whether events need to be resent when the client disconnects and reconnects

To enable this feature, it needs to be used with event persistent storage. Example code:

hub := sse.NewHub(
    sse.WithStore(&MyStore{}),
    sse.WithEnableResendEvents(),
)

Customizing Push Failed Event Handling

Code example:

fn := func(uid string, event *sse.Event) {
    // Custom handling logic for push failures, such as logging or saving to database
    log.Printf("Push failed: User %s, Event ID %s", uid, event.ID)
}

// Create Hub with push failed handling
hub := sse.NewHub(sse.WithPushFailedHandleFn(fn))

API Reference

Hub Methods
  • NewHub(opts ...HubOption) *Hub: Creates a new event hub, supporting custom persistence, re-sending events, logging, push event buffer size, and concurrent push event goroutine options.
  • Push(uids []string, events ...*Event) error: Pushes events to specified users or all users
  • OnlineClientsNum() int: Gets the number of online clients
  • Close(): Closes the event hub
  • PrintPushStats(): Prints push statistics

Serve Method
  • Serve(c *gin.Context, hub *Hub, uid string, opts...ServeOption): Handles SSE client connection requests, supports setting custom request headers.

Client Methods
  • NewClient(url string) *SSEClient: Creates a new SSE client, supporting custom request headers, reconnection interval, and log options.
  • Connect() error: Connects to the server
  • Disconnect(): Disconnects
  • OnEvent(eventType string, callback EventCallback): Registers an event callback

Performance Tuning

  • WithChanBufferSize(size int): Sets the broadcast channel buffer size
  • WithWorkerNum(num int): Sets the number of asynchronous worker goroutines

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultEventType = "message"

DefaultEventType is the default event type if not provided in the event

Functions

func SetNoConnectionErrText

func SetNoConnectionErrText(errText string)

Types

type AsyncTaskPool

type AsyncTaskPool struct {
	// contains filtered or unexported fields
}

func NewAsyncTaskPool

func NewAsyncTaskPool(maxWorkers int) *AsyncTaskPool

NewAsyncTaskPool creates a task pool with a fixed capacity

func (*AsyncTaskPool) Stop

func (p *AsyncTaskPool) Stop()

func (*AsyncTaskPool) Submit

func (p *AsyncTaskPool) Submit(task func())

func (*AsyncTaskPool) Wait

func (p *AsyncTaskPool) Wait()

type ClientOption

type ClientOption func(*clientOptions)

func WithClientHeaders

func WithClientHeaders(headers map[string]string) ClientOption

WithClientHeaders set HTTP headers

func WithClientLogger

func WithClientLogger(logger *zap.Logger) ClientOption

WithClientLogger set logger

func WithClientReconnectTimeInterval

func WithClientReconnectTimeInterval(d time.Duration) ClientOption

WithClientReconnectTimeInterval set reconnect time interval

type Event

type Event struct {
	ID    string      `json:"id"`    // event id, unique, if not provided, server will generate one
	Event string      `json:"event"` // event type
	Data  interface{} `json:"data"`  // event data
}

Event defines a Server-Sent Event

func (*Event) CheckValid

func (e *Event) CheckValid() error

CheckValid checks if the event is valid

type EventCallback

type EventCallback func(event *Event)

EventCallback event callback

type Hub

type Hub struct {
	PushStats *PushStats
	// contains filtered or unexported fields
}

Hub event center, manage client connections, receive user events, and broadcast them to online users

func NewHub

func NewHub(opts ...HubOption) *Hub

NewHub create a new event center

func (*Hub) Close

func (h *Hub) Close()

Close event center and stop all worker

func (*Hub) OnlineClientsNum

func (h *Hub) OnlineClientsNum() int

OnlineClientsNum get online clients num

func (*Hub) PrintPushStats

func (h *Hub) PrintPushStats()

PrintPushStats print push stats

func (*Hub) Push

func (h *Hub) Push(uids []string, events ...*Event) error

Push to specified users or all online users

func (*Hub) PushEventHandler

func (h *Hub) PushEventHandler() func(c *gin.Context)

PushEventHandler gin handler for push event request

func (*Hub) PushHeartBeat

func (h *Hub) PushHeartBeat(uid string)

PushHeartBeat push heart beat event to specified user

func (*Hub) Serve

func (h *Hub) Serve(c *gin.Context, uid string, opts ...ServeOption)

Serve serves a client connection

func (*Hub) ServeHandler

func (h *Hub) ServeHandler(opts ...ServeOption) func(c *gin.Context)

ServeHandler gin handler for sse server

type HubOption

type HubOption func(*hubOptions)

HubOption hub option

func WithContext

func WithContext(ctx context.Context, cancel context.CancelFunc) HubOption

WithContext set context and cancel

func WithEnableResendEvents

func WithEnableResendEvents() HubOption

WithEnableResendEvents enable resend events

func WithLogger

func WithLogger(logger *zap.Logger) HubOption

WithLogger set logger

func WithPushBufferSize

func WithPushBufferSize(size int) HubOption

WithPushBufferSize set push events buffer size

func WithPushFailedHandleFn

func WithPushFailedHandleFn(fn func(uid string, event *Event)) HubOption

WithPushFailedHandleFn set push failed handle function

func WithStore

func WithStore(store Store) HubOption

WithStore set store

func WithWorkerNum

func WithWorkerNum(num int) HubOption

WithWorkerNum set worker num

type PushRequest

type PushRequest struct {
	UIDs   []string `json:"uids"`
	Events []*Event `json:"events"`
}

PushRequest push request

type PushStats

type PushStats struct {
	// contains filtered or unexported fields
}

PushStats push stats

func (*PushStats) IncFailed

func (s *PushStats) IncFailed()

IncFailed increment failed push count

func (*PushStats) IncSuccess

func (s *PushStats) IncSuccess()

IncSuccess increment success push count

func (*PushStats) IncTimeout

func (s *PushStats) IncTimeout()

IncTimeout increment timeout push count

func (*PushStats) IncTotal

func (s *PushStats) IncTotal()

IncTotal increment total push count

func (*PushStats) Snapshot

func (s *PushStats) Snapshot() (total, success, failed, timeout int64)

Snapshot get push stats snapshot

type SSEClient

type SSEClient struct {
	// contains filtered or unexported fields
}

SSEClient sse client

func NewClient

func NewClient(url string, opts ...ClientOption) *SSEClient

NewClient create a new sse client

func (*SSEClient) Connect

func (c *SSEClient) Connect() error

Connect to server

func (*SSEClient) Disconnect

func (c *SSEClient) Disconnect()

Disconnect to server

func (*SSEClient) GetConnectStatus

func (c *SSEClient) GetConnectStatus() bool

GetConnectStatus get connect status

func (*SSEClient) OnEvent

func (c *SSEClient) OnEvent(eventType string, callback EventCallback)

OnEvent register event callback

func (*SSEClient) Wait

func (c *SSEClient) Wait() <-chan struct{}

Wait returns a channel that will be closed when the client is disconnected.

type SafeMap

type SafeMap struct {
	// contains filtered or unexported fields
}

SafeMap goroutine security Map structure encapsulating sync.Map

func NewSafeMap

func NewSafeMap() *SafeMap

NewSafeMap creates a new SafeMap

func (*SafeMap) Clear

func (sm *SafeMap) Clear()

Clear all key-value pairs

func (*SafeMap) Delete

func (sm *SafeMap) Delete(uid string)

Delete key-value pair

func (*SafeMap) Get

func (sm *SafeMap) Get(uid string) (*UserClient, bool)

Get value by key

func (*SafeMap) Has

func (sm *SafeMap) Has(uid string) bool

Has checked if key exists

func (*SafeMap) Keys

func (sm *SafeMap) Keys() []string

Keys get all keys

func (*SafeMap) Len

func (sm *SafeMap) Len() int

Len Gets the number of elements in Map Note: Due to the nature of sync.Map, this operation is O(n) complex

func (*SafeMap) Range

func (sm *SafeMap) Range(f func(key, value interface{}) bool)

Range all key-value pairs

func (*SafeMap) Set

func (sm *SafeMap) Set(uid string, client *UserClient)

Set store key-value pairs

func (*SafeMap) Values

func (sm *SafeMap) Values() []*UserClient

Values get all values

type ServeOption

type ServeOption func(*serveOptions)

ServeOption is an option for Serve

func WithServeExtraHeaders

func WithServeExtraHeaders(headers map[string]string) ServeOption

WithServeExtraHeaders sets extra headers to be sent with the response.

type Store

type Store interface {
	Save(ctx context.Context, e *Event) error
	// ListByLastID list events by event type and last id, return events and last id, if no more events, return empty slice and last id ""
	ListByLastID(ctx context.Context, eventType string, lastID string, pageSize int) ([]*Event, string, error)
}

Store defines the interface for storing and retrieving events

type UserClient

type UserClient struct {
	UID  string // user id
	Send chan *Event
	// contains filtered or unexported fields
}

UserClient information

type UserEvent

type UserEvent struct {
	UID   string `json:"uid"`
	Event *Event `json:"event"`
}

UserEvent user event

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL