122 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			122 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 *
 * Copyright 2023 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
 | |
| 
 | |
| package grpcsync
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"sync"
 | |
| )
 | |
| 
 | |
// Subscriber is the receiving side of a PubSub. A type that wants to be
// notified about published messages implements this interface and registers
// itself with the PubSub, which then calls back into it.
type Subscriber interface {
	// OnMessage is called with each newly published message.
	// Implementations must return promptly and must not block.
	OnMessage(msg any)
}
 | |
| 
 | |
| // PubSub is a simple one-to-many publish-subscribe system that supports
 | |
| // messages of arbitrary type. It guarantees that messages are delivered in
 | |
| // the same order in which they were published.
 | |
| //
 | |
| // Publisher invokes the Publish() method to publish new messages, while
 | |
| // subscribers interested in receiving these messages register a callback
 | |
| // via the Subscribe() method.
 | |
| //
 | |
| // Once a PubSub is stopped, no more messages can be published, but any pending
 | |
| // published messages will be delivered to the subscribers.  Done may be used
 | |
| // to determine when all published messages have been delivered.
 | |
| type PubSub struct {
 | |
| 	cs *CallbackSerializer
 | |
| 
 | |
| 	// Access to the below fields are guarded by this mutex.
 | |
| 	mu          sync.Mutex
 | |
| 	msg         any
 | |
| 	subscribers map[Subscriber]bool
 | |
| }
 | |
| 
 | |
| // NewPubSub returns a new PubSub instance.  Users should cancel the
 | |
| // provided context to shutdown the PubSub.
 | |
| func NewPubSub(ctx context.Context) *PubSub {
 | |
| 	return &PubSub{
 | |
| 		cs:          NewCallbackSerializer(ctx),
 | |
| 		subscribers: map[Subscriber]bool{},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Subscribe registers the provided Subscriber to the PubSub.
 | |
| //
 | |
| // If the PubSub contains a previously published message, the Subscriber's
 | |
| // OnMessage() callback will be invoked asynchronously with the existing
 | |
| // message to begin with, and subsequently for every newly published message.
 | |
| //
 | |
| // The caller is responsible for invoking the returned cancel function to
 | |
| // unsubscribe itself from the PubSub.
 | |
| func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
 | |
| 	ps.mu.Lock()
 | |
| 	defer ps.mu.Unlock()
 | |
| 
 | |
| 	ps.subscribers[sub] = true
 | |
| 
 | |
| 	if ps.msg != nil {
 | |
| 		msg := ps.msg
 | |
| 		ps.cs.TrySchedule(func(context.Context) {
 | |
| 			ps.mu.Lock()
 | |
| 			defer ps.mu.Unlock()
 | |
| 			if !ps.subscribers[sub] {
 | |
| 				return
 | |
| 			}
 | |
| 			sub.OnMessage(msg)
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return func() {
 | |
| 		ps.mu.Lock()
 | |
| 		defer ps.mu.Unlock()
 | |
| 		delete(ps.subscribers, sub)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Publish publishes the provided message to the PubSub, and invokes
 | |
| // callbacks registered by subscribers asynchronously.
 | |
| func (ps *PubSub) Publish(msg any) {
 | |
| 	ps.mu.Lock()
 | |
| 	defer ps.mu.Unlock()
 | |
| 
 | |
| 	ps.msg = msg
 | |
| 	for sub := range ps.subscribers {
 | |
| 		s := sub
 | |
| 		ps.cs.TrySchedule(func(context.Context) {
 | |
| 			ps.mu.Lock()
 | |
| 			defer ps.mu.Unlock()
 | |
| 			if !ps.subscribers[s] {
 | |
| 				return
 | |
| 			}
 | |
| 			s.OnMessage(msg)
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Done returns a channel that is closed after the context passed to NewPubSub
 | |
| // is canceled and all updates have been sent to subscribers.
 | |
| func (ps *PubSub) Done() <-chan struct{} {
 | |
| 	return ps.cs.Done()
 | |
| }
 |