/*
 * Copyright (c) 2021 IBM Corp and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    https://www.eclipse.org/legal/epl-2.0/
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Seth Hoenig
 *    Allan Stockdill-Mander
 *    Mike Robertson
 */

package mqtt

import (
	"container/list"
	"strings"
	"sync"

	"github.com/eclipse/paho.mqtt.golang/packets"
)

// route is a type which associates MQTT Topic strings with a
// callback to be executed upon the arrival of a message associated
// with a subscription to that topic.
type route struct {
	topic    string
	callback MessageHandler
}

// match takes a slice of strings which represent the route being tested having been split on '/'
// separators, and a slice of strings representing the topic string in the published message, similarly
// split.
// The function determines if the topic string matches the route according to the MQTT topic rules
// and returns a boolean of the outcome
func match(route []string, topic []string) bool {
	if len(route) == 0 {
		return len(topic) == 0
	}

	if len(topic) == 0 {
		return route[0] == "#"
	}

	if route[0] == "#" {
		return true
	}

	if (route[0] == "+") || (route[0] == topic[0]) {
		return match(route[1:], topic[1:])
	}
	return false
}

func routeIncludesTopic(route, topic string) bool {
	return match(routeSplit(route), strings.Split(topic, "/"))
}

// removes $share and sharename when splitting the route to allow
// shared subscription routes to correctly match the topic
func routeSplit(route string) []string {
	var result []string
	if strings.HasPrefix(route, "$share") {
		result = strings.Split(route, "/")[2:]
	} else {
		result = strings.Split(route, "/")
	}
	return result
}

// match takes the topic string of the published message and does a basic compare to the
// string of the current Route, if they match it returns true
func (r *route) match(topic string) bool {
	return r.topic == topic || routeIncludesTopic(r.topic, topic)
}

type router struct {
	sync.RWMutex
	routes         *list.List
	defaultHandler MessageHandler
	messages       chan *packets.PublishPacket
}

// newRouter returns a new instance of a Router and channel which can be used to tell the Router
// to stop
func newRouter() *router {
	router := &router{routes: list.New(), messages: make(chan *packets.PublishPacket)}
	return router
}

// addRoute takes a topic string and MessageHandler callback. It looks in the current list of
// routes to see if there is already a matching Route. If there is it replaces the current
// callback with the new one. If not it add a new entry to the list of Routes.
func (r *router) addRoute(topic string, callback MessageHandler) {
	r.Lock()
	defer r.Unlock()
	for e := r.routes.Front(); e != nil; e = e.Next() {
		if e.Value.(*route).topic == topic {
			r := e.Value.(*route)
			r.callback = callback
			return
		}
	}
	r.routes.PushBack(&route{topic: topic, callback: callback})
}

// deleteRoute takes a route string, looks for a matching Route in the list of Routes. If
// found it removes the Route from the list.
func (r *router) deleteRoute(topic string) {
	r.Lock()
	defer r.Unlock()
	for e := r.routes.Front(); e != nil; e = e.Next() {
		if e.Value.(*route).topic == topic {
			r.routes.Remove(e)
			return
		}
	}
}

// setDefaultHandler assigns a default callback that will be called if no matching Route
// is found for an incoming Publish.
func (r *router) setDefaultHandler(handler MessageHandler) {
	r.Lock()
	defer r.Unlock()
	r.defaultHandler = handler
}

// matchAndDispatch takes a channel of Message pointers as input and starts a go routine that
// takes messages off the channel, matches them against the internal route list and calls the
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
	var wg sync.WaitGroup
	ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
	var ackInChan chan *PacketAndToken       // ACKs generated by ackFunc get put onto this channel

	stopAckCopy := make(chan struct{})    // Closure requests stop of go routine copying ackInChan to ackOutChan
	ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
	goRoutinesDone := make(chan struct{}) // closed on wg.Done()
	if order {
		ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
	} else {
		// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
		ackInChan = make(chan *PacketAndToken)
		go func() { // go routine to copy from ackInChan to ackOutChan until stopped
			for {
				select {
				case a := <-ackInChan:
					ackOutChan <- a
				case <-stopAckCopy:
					close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
					for {
						select {
						case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
							DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
						case <-goRoutinesDone:
							close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
							DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
							return
						}
					}
				}
			}
		}()
	}

	go func() { // Main go routine handling inbound messages
		for message := range messages {
			// DEBUG.Println(ROU, "matchAndDispatch received message")
			sent := false
			r.RLock()
			m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
			var handlers []MessageHandler
			for e := r.routes.Front(); e != nil; e = e.Next() {
				if e.Value.(*route).match(message.TopicName) {
					if order {
						handlers = append(handlers, e.Value.(*route).callback)
					} else {
						hd := e.Value.(*route).callback
						wg.Add(1)
						go func() {
							hd(client, m)
							if !client.options.AutoAckDisabled {
								m.Ack()
							}
							wg.Done()
						}()
					}
					sent = true
				}
			}
			if !sent {
				if r.defaultHandler != nil {
					if order {
						handlers = append(handlers, r.defaultHandler)
					} else {
						wg.Add(1)
						go func() {
							r.defaultHandler(client, m)
							if !client.options.AutoAckDisabled {
								m.Ack()
							}
							wg.Done()
						}()
					}
				} else {
					DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
				}
			}
			r.RUnlock()
			for _, handler := range handlers {
				handler(client, m)
				if !client.options.AutoAckDisabled {
					m.Ack()
				}
			}
			// DEBUG.Println(ROU, "matchAndDispatch handled message")
		}
		if order {
			close(ackOutChan)
		} else { // Ensure that nothing further will be written to ackOutChan before closing it
			close(stopAckCopy)
			<-ackCopyStopped
			close(ackOutChan)
			go func() {
				wg.Wait() // Note: If this remains running then the user has handlers that are not returning
				close(goRoutinesDone)
			}()
		}
		DEBUG.Println(ROU, "matchAndDispatch exiting")
	}()
	return ackOutChan
}