Wait with timeout

This commit is contained in:
Joren 2024-06-16 15:27:46 +02:00
parent ff0bff0eb1
commit d7dab5e41d
Signed by: Joren
GPG Key ID: 280E33DFBC0F1B55
2 changed files with 53 additions and 29 deletions

View File

@ -4,7 +4,6 @@ import (
"sync" "sync"
) )
type RingBuffer struct { type RingBuffer struct {
buffer []string buffer []string
size int size int
@ -13,7 +12,6 @@ type RingBuffer struct {
mu sync.Mutex mu sync.Mutex
} }
func NewRingBuffer(size int) *RingBuffer { func NewRingBuffer(size int) *RingBuffer {
return &RingBuffer{ return &RingBuffer{
buffer: make([]string, size), buffer: make([]string, size),
@ -23,7 +21,6 @@ func NewRingBuffer(size int) *RingBuffer {
} }
} }
func (rb *RingBuffer) Add(item string) { func (rb *RingBuffer) Add(item string) {
rb.mu.Lock() rb.mu.Lock()
defer rb.mu.Unlock() defer rb.mu.Unlock()
@ -40,7 +37,6 @@ func (rb *RingBuffer) Add(item string) {
} }
} }
func (rb *RingBuffer) GetLast() (string, int) { func (rb *RingBuffer) GetLast() (string, int) {
rb.mu.Lock() rb.mu.Lock()
defer rb.mu.Unlock() defer rb.mu.Unlock()
@ -52,19 +48,28 @@ func (rb *RingBuffer) GetLast() (string, int) {
return rb.buffer[rb.tail], rb.tail return rb.buffer[rb.tail], rb.tail
} }
func (rb *RingBuffer) WaitForNewItem() string { func (rb *RingBuffer) WaitForNewItem() string {
var newItem string rb.mu.Lock()
defer rb.mu.Unlock()
for { if rb.tail == -1 {
rb.mu.Lock() return ""
if rb.head != -1 && rb.tail != -1 && rb.buffer[rb.tail] != newItem {
newItem = rb.buffer[rb.tail]
rb.mu.Unlock()
break
}
rb.mu.Unlock()
} }
return newItem
return rb.buffer[rb.tail]
}
func (rb *RingBuffer) IsEmpty() bool {
rb.mu.Lock()
defer rb.mu.Unlock()
return rb.head == -1
}
func (rb *RingBuffer) GetLastPosition() int {
rb.mu.Lock()
defer rb.mu.Unlock()
return rb.tail
} }

View File

@ -4,23 +4,21 @@ import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"sync" "sync"
"time"
"SmsHook/ringbuffer" "SmsHook/ringbuffer"
) )
type WebhookServer struct { type WebhookServer struct {
buffer *ringbuffer.RingBuffer buffer *ringbuffer.RingBuffer
cond *sync.Cond cond *sync.Cond
mux *http.ServeMux mux *http.ServeMux
} }
type WebhookPayload struct { type WebhookPayload struct {
Content string `json:"content"` Content string `json:"content"`
} }
func NewWebhookServer(bufferSize int) *WebhookServer { func NewWebhookServer(bufferSize int) *WebhookServer {
mutex := sync.Mutex{} mutex := sync.Mutex{}
server := &WebhookServer{ server := &WebhookServer{
@ -33,7 +31,6 @@ func NewWebhookServer(bufferSize int) *WebhookServer {
return server return server
} }
func (s *WebhookServer) webhookHandler(w http.ResponseWriter, r *http.Request) { func (s *WebhookServer) webhookHandler(w http.ResponseWriter, r *http.Request) {
var payload WebhookPayload var payload WebhookPayload
@ -47,12 +44,10 @@ func (s *WebhookServer) webhookHandler(w http.ResponseWriter, r *http.Request) {
s.cond.Broadcast() s.cond.Broadcast()
} }
func (s *WebhookServer) GetLastItem() (string, int) { func (s *WebhookServer) GetLastItem() (string, int) {
return s.buffer.GetLast() return s.buffer.GetLast()
} }
func (s *WebhookServer) WaitForNewItem() string { func (s *WebhookServer) WaitForNewItem() string {
s.cond.L.Lock() s.cond.L.Lock()
s.cond.Wait() s.cond.Wait()
@ -61,30 +56,54 @@ func (s *WebhookServer) WaitForNewItem() string {
return newItem return newItem
} }
func (s *WebhookServer) WaitForNewItemWithTimeout(timeout time.Duration) (string, bool) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
initialPos := s.buffer.GetLastPosition()
timedOut := false
timer := time.AfterFunc(timeout, func() {
s.cond.Broadcast()
timedOut = true
})
for s.buffer.GetLastPosition() == initialPos {
if timedOut {
return "", false
}
s.cond.Wait()
}
timer.Stop()
newItem := s.buffer.WaitForNewItem()
return newItem, true
}
func (s *WebhookServer) Start(address string) error { func (s *WebhookServer) Start(address string) error {
return http.ListenAndServe(address, s.mux) return http.ListenAndServe(address, s.mux)
} }
var server *WebhookServer var Server *WebhookServer
func Init(bufferSize int, address string) { func Init(bufferSize int, address string) {
server = NewWebhookServer(bufferSize) Server = NewWebhookServer(bufferSize)
go func() { go func() {
if err := server.Start(address); err != nil { if err := Server.Start(address); err != nil {
panic(err) panic(err)
} }
}() }()
} }
func GetLast() (string, int) { func GetLast() (string, int) {
return server.GetLastItem() return Server.GetLastItem()
} }
func WaitForNew() string { func WaitForNew() string {
return server.WaitForNewItem() return Server.WaitForNewItem()
}
func WaitForNewWithTimeout(timeout time.Duration) (string, bool) {
return Server.WaitForNewItemWithTimeout(timeout)
} }