天天看点

network.c

<a href="https://github.com/felipecruz/rio/blob/master/src/network.c">https://github.com/felipecruz/rio/blob/master/src/network.c</a>

#include "network.h"

#include "buffer.h"

#include "websocket.h"

int eag

= 0;

static int

    on_message(http_parser

*parser)

{

    return 0;

}

    on_path(http_parser

*parser,

const char *at,

size_t len)

    rio_client

*client = parser-&gt;data;

    client-&gt;path

= malloc(sizeof(char)

* (len+1));

    if (client-&gt;path

== NULL)

        error_exit("Malloc");

    }

    strncpy(client-&gt;path,

at, len);

    client-&gt;path[len]

= '\0';

    client-&gt;method

= (unsigned

char) parser-&gt;method;

    debug_print("HTTP-REQ method: %d\n",

(int)

client-&gt;method);

    debug_print("HTTP-REQ Path: %s %d\n",

client-&gt;path,

len);

int on_header_field(http_parser

int on_header_value(http_parser

int on_headers_complete(http_parser

int on_body(http_parser

int on_message_complete(http_parser

http_parser_settings

parser_settings =

    on_message,

    on_path,

    on_header_field,

    on_header_value,

    on_headers_complete,

    on_body,

    on_message_complete

};

void

    handle_write(rio_worker

*worker,

rio_client *cli,

char* resp)

    struct epoll_event

event;

    int s,

ret;

    khiter_t k;

    cli-&gt;buffer

= new_rio_buffer_size(strlen(resp));

    rio_buffer_copy_data(cli-&gt;buffer,

resp, strlen(resp));

    free(resp);

    debug_print("Handle Write: %d : %s\n",

cli-&gt;fd,

                                rio_buffer_get_data(cli-&gt;buffer));

    event.events

= EPOLLOUT;

    event.data.fd

= cli-&gt;fd;

    if (epoll_ctl(worker-&gt;epoll_fd,

EPOLL_CTL_MOD, cli-&gt;fd,

&amp;event)

== -1)

        debug_print("Error on epoll_ctl_mod on %d\n",

                                                cli-&gt;fd,

worker-&gt;epoll_fd);

    k =

kh_put(clients,

h, cli-&gt;fd

, &amp;ret);

    kh_value(h,

k) =

*cli;

    do_write(rio_worker

struct epoll_event

*event)

    int sent;

    debug_print("Do Write to fd: %d : %s\n",

                                    rio_buffer_get_data(cli-&gt;buffer));

    do {

        sent

= send(cli-&gt;fd,

                    rio_buffer_get_data(cli-&gt;buffer),

                    cli-&gt;buffer-&gt;length,

                    MSG_DONTWAIT);

        if (sent

&lt; 0 &amp;&amp;

errno != EAGAIN)

            debug_print("Do Write: send error on fd: %d errno: %d\n",

                        cli-&gt;fd,

errno);

            break;

        } else

if (sent

&lt; 0 &amp;&amp; errno

== EAGAIN)

             debug_print("Do Write: EAGAIN on fd: %d\n",

cli-&gt;fd);

             break;

&gt; 0) {

            rio_buffer_adjust(cli-&gt;buffer,

sent);

        }

    } while

(sent &gt;

0 &amp;&amp; rio_buffer_get_data(cli-&gt;buffer)

!= NULL);

    debug_print("Do Write sent: %d strlen: %zu\n",

sent, cli-&gt;buffer-&gt;length);

    remove_and_close(cli,

worker, event);

int

    handle_read(rio_worker

*ev)

    size_t len

= 4096;

    ssize_t

received = 0;

total_received = 0;

    //allocate space for data

= new_rio_buffer_size(sizeof(char)

* 4096);

    debug_print("Handle Read from %d\n",

        received

= recv(ev-&gt;data.fd,

cli-&gt;buffer-&gt;content,

len, MSG_DONTWAIT);

        if (received

&lt; 0)

            if

(errno != EAGAIN

&amp;&amp; errno !=

EWOULDBLOCK) {

                if

(received ==

0) {

                    //if error, remove from epoll and close socket

                    debug_print("Client received error: disconnected"

                                "errno %d\n",

                }

else {

                    debug_print("Some other error %d\n",

                //handle_http will take care of this :)

            }

                //received += 1;

                debug_print("EAGAIN on recv from fd: %d\n",

                //if EAGAIN, insert on epoll again

                ev-&gt;events

= EPOLLIN |

EPOLLET;

                //add socket to epoll

(epoll_ctl(worker-&gt;epoll_fd,

                                EPOLL_CTL_MOD,

                                cli-&gt;fd,

                                ev)

                    error_exit("Could not add conn_sock to epoll");

                eag

+= 1;

                printf("EAGAIN %d\n",

eag);

if (received

== 0)

            //client disconnected

            return

0;

            total_received

+= received;

            debug_print("READ AGAIN on %d\n",

(received &gt;

    debug_print("Total received %zu\n",

total_received);

    cli-&gt;buffer-&gt;length

= total_received;

    return received;

    remove_and_close(rio_client

*client,

                     rio_worker

                     struct

epoll_event *event)

    int rc

= epoll_ctl(worker-&gt;epoll_fd,

EPOLL_CTL_DEL, client-&gt;fd,

event);

    if (rc

       debug_print("[WARNING] on epoll_ctl_del on %d\n",

                   client-&gt;fd,

    if (close(client-&gt;fd)

        debug_print("Error on close client %d\n",

                                            client-&gt;fd,

    if (client-&gt;buffer

!= NULL)

        rio_buffer_free(&amp;client-&gt;buffer);

    return rc;

    handle_http(rio_worker

struct epoll_event event,

rio_client *cli)

    int response;

    char buf[4096];

    char resp[1024];

    size_t n;

    if (event.events

&amp; EPOLLIN)

        //handle read

        int

received = handle_read(worker,

cli, &amp;event);

        //create http parser

        http_parser

*parser = malloc(sizeof(http_parser));

        if (!parser){

            error_exit("malloc error: http_parser");

        http_parser_init(parser,

HTTP_REQUEST);

        //set parser data

        parser-&gt;data

= (void*)cli;

        //execute http parsing only if data was read

&gt; 0)

            debug_print("Execute http parsing client: %d\n",

            n

= http_parser_execute(parser,

                                    &amp;parser_settings,

                                    rio_buffer_get_data(cli-&gt;buffer),

                                    received);

        if (parser-&gt;upgrade)

            //#TODO: what to do?

{ // client disconnected!

            debug_print("Client %d Disconnected!\n",

            //delete fd from epoll and close

            remove_and_close(cli,

worker, &amp;event);

            free(parser);

            return;

if (n

!= received)

            debug_print("Error parsing, closing socket n:%zu received:%d\n",

                        n,

received);

        rio_buffer_free(&amp;cli-&gt;buffer);

        response

= dispatch(cli,

cli-&gt;path);

        if (response

!= DISPATCH_FINISHED)

            //write response

            debug_print("Async Dispatch to fd: %d\n",

        debug_print("Freeing %s\n",

        free(cli-&gt;path);

        cli-&gt;path

= NULL;

        free(parser);

    } else

if (event.events

&amp; EPOLLOUT)

        //if socket is ready to write, do it!

        do_write(worker,

    init_clients()

   h =

kh_init(clients);

    free_clients()

    khiter_t

element;

    debug_print("Closing clients structures\n",

h);

    for (element

= kh_begin(h);

element != kh_end(h);

++element)

        if (kh_exist(h,

element)) {

            debug_print("%d\n",

((rio_client)

kh_val(h,

element)).fd);

            kh_del(clients,

h, element);

    kh_destroy(clients,

    socket_bind()

    int server_fd;

    int arg;

    struct sockaddr_in

sin;

    //bind

    memset(&amp;sin,

0, sizeof(struct

sockaddr_in));

    sin.sin_family

= AF_INET;

    sin.sin_port

= htons(80);

    sin.sin_addr.s_addr

= inet_addr("0.0.0.0");

    //create socket

    if ((server_fd

= socket(AF_INET,

SOCK_STREAM, 0))

        error_exit("Could not create socket.");

    //set socket non-blocking

    if (fcntl(server_fd,

F_SETFL, O_NONBLOCK)

        error_exit("Could not set socket non-blocking");

    //set socket options

    arg =

1;

    if (setsockopt

(server_fd,

SOL_SOCKET, SO_REUSEADDR,

&amp;arg,

sizeof(arg))

        error_exit("Socket options");

    //bind socket to local addr

    if (bind(server_fd,

(struct sockaddr

*)&amp;sin,

sizeof(sin))

        error_exit("bind");

    //listen on this socket

    if (listen(server_fd,

MAX_EVENTS) &lt;

        error_exit("listen");

    return server_fd;

    accept_incoming_connection(rio_runtime

*runtime,

rio_worker *worker)

    int new_connection_socket;

    int flags;

    int ret;

    unsigned

int client_len;

ev;

temp_client;

cli;

k;

    client_len

= sizeof(temp_client);

    //accept client connection

    new_connection_socket

= accept(runtime-&gt;server_fd,

                                  (struct

sockaddr *)

&amp;temp_client,

                                   &amp;client_len);

    if (new_connection_socket

        //#TODO what to do?

        error_exit("Could not accept socket");

    //check sockets flags and set non-blocking after

    if (-1

== (flags

= fcntl(new_connection_socket,

F_GETFL, 0)))

        flags

    if (fcntl(new_connection_socket,

F_SETFL, flags

| O_NONBLOCK)

        error_exit("Could not set client socket non-blocking");

    ev.events

    ev.data.fd

= new_connection_socket;

    //add socket to epoll

                  EPOLL_CTL_ADD,

                  new_connection_socket,

                  &amp;ev)

        error_exit("Could not add conn_sock to epoll");

    //store client information

    cli.fd

    cli.websocket

    cli.buffer

h, new_connection_socket

    debug_print("New Client: %d\n",

cli.fd);

    run_worker(int

id, rio_worker*

worker, rio_runtime

*runtime)

    int size_epoll_events;

    int rc;

ev, events[MAX_EVENTS];

    sprintf(worker-&gt;name,

"worker %d", id);

    debug_print("Identifying worker as %s pid %d\n",

worker-&gt;name,

                                                     getpid());

    init_clients();

    init_dispatcher();

    init_static_server();

    worker-&gt;zmq_context

= zmq_init(1);

    worker-&gt;master

= zmq_socket(worker-&gt;zmq_context,

ZMQ_SUB);

    zmq_setsockopt(worker-&gt;master,

ZMQ_SUBSCRIBE, "",

strlen(""));

    zmq_connect(worker-&gt;master,

"ipc:///tmp/rio_master.sock");

    //create epoll

    worker-&gt;epoll_fd

= epoll_create(MAX_EVENTS);

    if (worker-&gt;epoll_fd

        error_exit("epoll_create");

    //configure epoll events and file descriptor

EPOLLPRI;

= runtime-&gt;server_fd;

    //add listen socket to epoll

                  runtime-&gt;server_fd,

        error_exit("epoll_ctl: listen_sock");

    while (1)

        //poll events

        size_epoll_events

= epoll_wait(worker-&gt;epoll_fd,

                                       events,

                                       MAX_EVENTS,

                                       100);

        if (size_epoll_events

== -1

        for (int

n = 0;

n &lt; size_epoll_events;

++n)

            //if event fd == server fd -&gt; accept new connection

(events[n].data.fd

== runtime-&gt;server_fd)

                accept_incoming_connection(runtime,

worker);

else { //handle in out readyness :)

                //retrieve client info by fd and handle event

                k

= kh_get(clients,

h, events[n].data.fd);

                cli

= kh_val(h,

k);

                handle_http(worker,

events[n],

&amp;cli);

        //dispatch responses

        dispatch_responses(worker);

        //look for master messages

        zmq_msg_t

msg;

        zmq_msg_init(&amp;msg);

        rc =

zmq_recv(worker-&gt;master,

&amp;msg,

ZMQ_NOBLOCK);

        if (rc

            debug_print("Worker %d Received %s from master\n",

                                            id,

                                            (char

*) zmq_msg_data(&amp;msg));

        if (strcmp((char

*) zmq_msg_data(&amp;msg),

"terminate") ==

            zmq_msg_close(&amp;msg);

        zmq_msg_close(&amp;msg);

    debug_print("\nWorker terminating gracefully\n",

    rc =

zmq_close(worker-&gt;master);

    debug_print("Worker ZMQ Socket close return %d\n",

rc);

zmq_term(worker-&gt;zmq_context);

    debug_print("Worker ZMQ Context termination return :%d\n",

    free_clients();

    destroy_static_server();

    destroy_dispatcher();

    close(worker-&gt;epoll_fd);