2
\$\begingroup\$

Listening to messages from the udp socket, I would like to somehow determine where packets come from and scatter in sessions to get a more detailed report on the received data, I just did it forehead, looking for the current session and recording in its channel, I would like to know if more elegant methods? full code

    func serve(ctx context.Context, addr string, port int) <-chan Message {
type session struct {
    id           int
    conn         *net.UDPAddr
    len          int64
    countMessage int64
    expiration   int64
    buffer       chan []byte
    run          func(wg *sync.WaitGroup, in chan []byte, ip string)
}

var (
    out         = make(chan Message, 64)
    done        = make(chan error, 1)
    wg          sync.WaitGroup
    localAddres = &net.UDPAddr{IP: net.ParseIP(addr), Port: port}
    bufPool     = sync.Pool{New: func() interface{} { return make([]byte, bufferSize) }}
    sessions    []*session

    // TODO может будет не линейный поиск, пока посмотрим так
    getSession = func(addr *net.UDPAddr) (*session, bool) {
        for _, s := range sessions {
            if reflect.DeepEqual(s.conn, addr) {
                return s, true
            }
        }
        return nil, false
    }
    addSession = func(s *session) {
        fmt.Printf("Add new session %s:%d id: %d \n", s.conn.IP.String(), s.conn.Port, s.id)
        sessions = append(sessions, s)
    }
    removeSession = func(sess *session) {
        for i, s := range sessions {
            if s.id == sess.id {
                fmt.Printf("Remove session %s:%d id: %d \n", s.conn.IP.String(), s.conn.Port, s.id)
                sessions = sessions[:i+copy(sessions[i:], sessions[i+1:])]
            }
        }
    }
    gc = func() {
        for {
            <-time.After(time.Duration(3 * time.Second))
            for _, s := range sessions {
                if time.Now().UnixNano() > s.expiration && s.expiration > 0 {
                    removeSession(s)
                }
            }
        }
    }
)
go gc()
go func() {
    pc, err := net.ListenUDP("udp", localAddres)
    if err != nil {
        done <- err
    }
    defer pc.Close()
    go func() {
        for {
            buff := bufPool.Get().([]byte)
            size, addr, err := pc.ReadFromUDP(buff[0:])
            if err != nil {
                done <- err
                return
            }
            switch s, ok := getSession(addr); ok {
            case true:
                s.buffer <- buff[0:size]
                bufPool.Put(buff)
                s.expiration = time.Now().Add(time.Duration(time.Second * 10)).UnixNano()
                atomic.AddInt64(&s.countMessage, 1)
                atomic.AddInt64(&s.len, int64(size))
            case false:
                s := &session{
                    id:         rand.Int(),
                    conn:       addr,
                    expiration: time.Now().UnixNano(),
                    buffer:     make(chan []byte, 64),
                    run: func(wg *sync.WaitGroup, in chan []byte, ip string) {
                        for b := range in {
                            var m Message
                            err := json.Unmarshal(b, &m)
                            if err != nil {
                                log.Fatal(err)
                                continue
                            }
                            m.Device_ip = ip
                            out <- m
                        }
                    },
                }
                wg.Add(1)
                s.buffer <- buff[0:size]
                bufPool.Put(buff)
                atomic.AddInt64(&s.countMessage, 1)
                atomic.AddInt64(&s.len, int64(size))
                go s.run(&wg, s.buffer, s.conn.IP.String())
                addSession(s)
            }
        }
    }()
    select {
    case <-ctx.Done():
        wg.Wait()
        log.Println("cancelled")
        err = ctx.Err()
    case err = <-done:
        panic(err)
    }
}()

return out
}
\$\endgroup\$
4
  • \$\begingroup\$ your code won't compile! your code is racy in gc(). given those errors are fixed i m fine with this code. elegance is not that important when the code is not functional. lets fix it at first. \$\endgroup\$ Commented Apr 7, 2020 at 12:28
  • \$\begingroup\$ @mh-cbon I edited the code and made a link to pastebin \$\endgroup\$ Commented Apr 8, 2020 at 5:31
  • \$\begingroup\$ Дмитрий, please include the code into the question itself. Links can rot, questions have to be self-supporting. \$\endgroup\$ Commented Apr 8, 2020 at 8:10
  • 1
    \$\begingroup\$ @Mast insert full code for function. \$\endgroup\$ Commented Apr 8, 2020 at 9:43

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.