101 lines
2.4 KiB
Go
101 lines
2.4 KiB
Go
package node
|
|
|
|
import (
|
|
"errors"
|
|
)
|
|
|
|
type Node struct {
|
|
id NodeId
|
|
transport Transport
|
|
sequenceNumber uint32
|
|
}
|
|
|
|
func New(id NodeId, transport Transport) *Node {
|
|
return &Node{
|
|
id: id,
|
|
transport: transport,
|
|
}
|
|
}
|
|
|
|
type NodeId uint8
|
|
|
|
func (id NodeId) IsBroadcast() bool {
|
|
return id == 0xFF
|
|
}
|
|
|
|
func (b *Node) Receive(timeout uint32) (NodeId, uint32, []byte, error) {
|
|
data, err := b.transport.Rx(timeout)
|
|
if err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
if len(data) < 6 {
|
|
return 0, 0, nil, errors.New("invalid message length")
|
|
}
|
|
sourceId := NodeId(data[0])
|
|
targetId := NodeId(data[1])
|
|
if targetId != b.id && !targetId.IsBroadcast() {
|
|
return 0, 0, nil, nil
|
|
}
|
|
seq := uint32(data[2])<<24 | uint32(data[3])<<16 | uint32(data[4])<<8 | uint32(data[5])
|
|
payload := data[6:]
|
|
return sourceId, seq, payload, nil
|
|
}
|
|
|
|
func (b *Node) ReceiveWithAck(timeout uint32) (NodeId, uint32, []byte, error) {
|
|
data, err := b.transport.Rx(timeout)
|
|
if err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
if len(data) < 6 {
|
|
return 0, 0, nil, errors.New("invalid message length")
|
|
}
|
|
sourceId := NodeId(data[0])
|
|
targetId := NodeId(data[1])
|
|
if targetId != b.id && !targetId.IsBroadcast() {
|
|
return 0, 0, nil, errors.New("message not intended for this node")
|
|
}
|
|
seq := uint32(data[2])<<24 | uint32(data[3])<<16 | uint32(data[4])<<8 | uint32(data[5])
|
|
payload := data[6:]
|
|
ackMsg := make([]byte, 6)
|
|
ackMsg[0] = byte(b.id)
|
|
ackMsg[1] = byte(sourceId)
|
|
ackMsg[2] = byte(seq >> 24)
|
|
ackMsg[3] = byte(seq >> 16)
|
|
ackMsg[4] = byte(seq >> 8)
|
|
ackMsg[5] = byte(seq)
|
|
err = b.transport.Tx(ackMsg, timeout)
|
|
if err != nil {
|
|
return 0, 0, nil, err
|
|
}
|
|
return sourceId, seq, payload, nil
|
|
}
|
|
|
|
func (b *Node) SendWithAck(target NodeId, payload []byte, timeout uint32) error {
|
|
if target.IsBroadcast() {
|
|
return errors.New("cannot send with ack to broadcast address")
|
|
}
|
|
msg := make([]byte, 6)
|
|
msg[0] = byte(b.id)
|
|
msg[1] = byte(target)
|
|
sentSeq := b.sequenceNumber
|
|
msg[2] = byte(sentSeq >> 24)
|
|
msg[3] = byte(sentSeq >> 16)
|
|
msg[4] = byte(sentSeq >> 8)
|
|
msg[5] = byte(sentSeq)
|
|
b.sequenceNumber++
|
|
msg = append(msg, payload...)
|
|
b.transport.Tx(msg, timeout)
|
|
|
|
sender, receivedSeq, _, err := b.Receive(timeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if sender != target {
|
|
return errors.New("received ACK from unexpected sender")
|
|
}
|
|
if receivedSeq != sentSeq {
|
|
return errors.New("received ACK with unexpected sequence number")
|
|
}
|
|
return nil
|
|
}
|