Skip to content

Commit

Permalink
Remove async and retry; replace by connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Sébastien GLON committed Sep 6, 2016
1 parent 002ce83 commit 435858f
Show file tree
Hide file tree
Showing 8 changed files with 461 additions and 354 deletions.
17 changes: 12 additions & 5 deletions examples/flume/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func main() {
//t.SkipNow()
transceiver,err := netty.NewTransceiver(netty.Config{AsyncConnect:true, NettyHost:"10.98.80.113"})
transceiver,err := netty.NewTransceiver(netty.Config{AsyncConnect:false, NettyHost:"192.168.11.152"})
if err != nil {
log.Fatal(err)
}
Expand All @@ -24,20 +24,27 @@ func main() {
headers := make(map[string]interface{})
headers["host_header"] = "127.0.0.1"
flumeRecord.Set("headers", headers)
flumeRecord.Set("body", []byte("2016-08-02 14:45:38|flume.composantTechnique_IS_UNDEFINED|flume.application_IS_UNDEFINED|flume.client_IS_UNDEFINED|flume.plateforme_IS_UNDEFINED|instance_IS_UNDEFINED|logname_IS_UNDEFINED|WARN |test.LogGenerator|test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"))
requestor := goavro.NewRequestor(protocol, transceiver)

requestor := goavro.NewRequestor(protocol, transceiver)

flumeRecord.Set("body", []byte("test 1"))
err = requestor.Request("append", flumeRecord)

if err != nil {
log.Fatal("Request: ", err)
log.Fatal("Request 1: ", err)
}

log.Printf("Test 1 OK")


time.Sleep(5 * time.Second)
flumeRecord.Set("body", []byte("test 2"))
err = requestor.Request("append", flumeRecord)

if err != nil {
log.Fatal("Request: ", err)
log.Fatal("Request 2: ", err)
}
log.Printf("Test 2 OK")

}

49 changes: 22 additions & 27 deletions requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@ func init() {
}

func NewRequestor(localProto Protocol, transceiver transceiver.Transceiver) *Requestor {
return &Requestor{

r := &Requestor{
local_protocol: localProto,
transceiver: transceiver,
// remote_protocol: nil,
// remote_hash: nil,
send_protocol: false,
send_handshake: true,
}
transceiver.InitHandshake(r.write_handshake_request, r.read_handshake_response)
return r
}


Expand All @@ -72,12 +75,7 @@ func (a *Requestor) Request(message_name string, request_datum interface{}) er
frame1 := new(bytes.Buffer)
frame2 := new(bytes.Buffer)

err := a.write_handshake_request(frame1)
if err!=nil {
return err
}

err = a.write_call_requestHeader(message_name, frame1)
err := a.write_call_requestHeader(message_name, frame1)
if err!=nil {
return err
}
Expand All @@ -91,33 +89,24 @@ func (a *Requestor) Request(message_name string, request_datum interface{}) er
responses, err := a.transceiver.Transceive(buffer_writers)

if err!=nil {
return err
return fmt.Errorf("Fail to transceive %v", err)
}
//buffer_decoder := bytes.NewBuffer(decoder)
// process the handshake and call response

if len(responses) >0 {
ok, err := a.read_handshake_response(responses[0])
a.read_call_responseCode(responses[1])
if err != nil {
return err
}
a.send_handshake = !ok

if ok {
a.read_call_responseCode(responses[1])
if err != nil {
return err
}
// a.Request(message_name, request_datum)
}
// a.Request(message_name, request_datum)
}
return nil
}

func (a *Requestor) write_handshake_request( buffer io.Writer ) (err error) {
if !a.send_handshake {
return nil
}
func (a *Requestor) write_handshake_request() (handshake []byte ,err error) {
buffer := new(bytes.Buffer)
defer buffer.Write(handshake)
local_hash :=a.local_protocol.MD5
remote_name := a.remote_protocol.Name
remote_hash := REMOTE_HASHES[remote_name]
Expand All @@ -128,30 +117,36 @@ func (a *Requestor) write_handshake_request( buffer io.Writer ) (err error) {

record, err := NewRecord(RecordSchema(handshakeRequestshema))
if err != nil {
return fmt.Errorf("Avro fail to init record handshakeRequest %v",err)
err = fmt.Errorf("Avro fail to init record handshakeRequest %v",err)
return
}

record.Set("clientHash", local_hash)
record.Set("serverHash", remote_hash)
record.Set("meta", make(map[string]interface{}))
codecHandshake, err := NewCodec(handshakeRequestshema)
if err != nil {
return fmt.Errorf("Avro fail to get codec handshakeRequest %v",err)
err = fmt.Errorf("Avro fail to get codec handshakeRequest %v",err)
return
}

if a.send_protocol {
json, err := a.local_protocol.Json()
if err!=nil {
return err
return nil ,err
}
record.Set("clientProtocol", json)
}



if err = codecHandshake.Encode(buffer, record); err !=nil {
return fmt.Errorf("Encode handshakeRequest ",err)
err = fmt.Errorf("Encode handshakeRequest ",err)
return
}

return nil

return
}

func (a *Requestor) write_call_request(message_name string, request_datum interface{}, frame io.Writer) (err error) {
Expand Down
Loading

0 comments on commit 435858f

Please sign in to comment.