コアダンプの数だけ強くなれるよ

見習いエンジニアの備忘log

Python3サンプルコード集(その2)

Pythonのサンプルコードです。

ネットワーク系が中心となります。

書かなくなるとすぐ忘れてしまうので備忘録として残しておきます。

全てクライアント-サーバの通信のサンプルです。

TCP


tcp_client.py

import os
import sys
import socket

if __name__ == '__main__':
    # Minimal TCP client: connect to the sample server on localhost,
    # send a single greeting, then disconnect.

    address = ('127.0.0.1', 50000)
    msg = 'Hello TCP'

    # The context manager closes the socket even when connect() or
    # sendall() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(address)
        # sendall() keeps writing until the whole payload is sent;
        # plain send() is allowed to transmit only part of it.
        s.sendall(msg.encode())

tcp_server.py

import os
import sys
import socket

if __name__ == '__main__':
    # Minimal TCP server: accept one connection, read one chunk of at
    # most `datasize` bytes, print it, and exit.

    address = ('127.0.0.1', 50000)
    datasize = 4096

    # Both the listening socket and the accepted connection are closed
    # by their context managers, including on error paths.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(address)
        s.listen(1)

        client, addr = s.accept()
        with client:
            data = client.recv(datasize)

    sys.stdout.write("receive : {}\n".format(data.decode()))

UDP


udp_client.py

import os
import sys
import socket

if __name__ == '__main__':
    # One-shot UDP sender: fire a single greeting datagram at the
    # sample server on localhost.

    datasize = 4096
    server_address = ('127.0.0.1', 50001)

    # Encode the text up front so only bytes are handed to the socket.
    payload = 'Hello UDP'.encode()

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(payload, server_address)
    sock.close()

udp_server.py

import os
import sys
import socket

if __name__ == '__main__':
    # One-shot UDP receiver: bind to localhost, wait for a single
    # datagram, print its decoded payload, and exit.

    datasize = 4096
    bind_address = ('127.0.0.1', 50001)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(bind_address)

    # recvfrom() returns (payload, sender address); only the payload
    # is printed.
    payload, sender = sock.recvfrom(datasize)
    text = payload.decode()
    sys.stdout.write("receive : {}\n".format(text))

    sock.close()


UNIXドメイン(DATAGRAM型)


unix_dgram_client.py

import os
import sys
import socket

if __name__ == '__main__':
    # UNIX-domain datagram client: connect to the server's socket file
    # and send one greeting.

    socket_path = './unix_dgram'
    payload = 'Hello unix domain (dgram)'.encode()

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)

    # connect() on a datagram socket fixes the destination so that a
    # plain send() can be used afterwards.
    sock.connect(socket_path)
    sock.send(payload)

    sock.close()

unix_dgram_server.py

import os
import sys
import socket

if __name__ == '__main__':
    # UNIX-domain datagram server: bind a socket file, receive one
    # datagram, print it, then remove the socket file.

    socket_path = './unix_dgram'
    datasize = 4096

    s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        s.bind(socket_path)
        try:
            data, client = s.recvfrom(datasize)
            sys.stdout.write("received : {}\n".format(data.decode()))
        finally:
            # Remove the socket file even when the receive is interrupted
            # (e.g. Ctrl-C); otherwise the next run fails at bind().
            # Only reached after our own bind() succeeded, so a file
            # owned by another process is never deleted.
            os.unlink(socket_path)
    finally:
        s.close()


UNIXドメイン(STREAM型)


unix_stream_client.py

import os
import sys
import socket

if __name__ == '__main__':
    # UNIX-domain stream client: connect to the server's socket file,
    # send one greeting, then disconnect.

    socket_path = './unix_stream'
    msg = 'Hello unix domain (stream)'

    # The context manager closes the socket even when connect() or
    # sendall() raises.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect(socket_path)
        # sendall() keeps writing until the whole payload is sent;
        # plain send() on a stream socket may transmit only part of it.
        s.sendall(msg.encode())

unix_stream_server.py

import os
import sys
import socket

if __name__ == '__main__':
    # UNIX-domain stream server: bind a socket file, accept one
    # connection, print the first chunk received, then clean up.

    socket_path = './unix_stream'
    datasize = 4096

    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        s.bind(socket_path)
        try:
            s.listen(1)

            client, addr = s.accept()
            # Close the accepted connection as well as the listener.
            with client:
                data = client.recv(datasize)

            sys.stdout.write("received : {}\n".format(data.decode()))
        finally:
            # Remove the socket file even on error or Ctrl-C; otherwise
            # the next run fails at bind(). Only reached after our own
            # bind() succeeded.
            os.unlink(socket_path)
    finally:
        s.close()


TCP(selectで待つ)


tcp_select.py

import os
import sys
import socket
import select

if __name__ == '__main__':
    # select()-based TCP server: one listening socket plus any number of
    # accepted connections are multiplexed in a single loop. Runs until
    # interrupted; every open socket is closed on the way out.

    address = ('127.0.0.1', 50000)
    datasize = 4096
    maxconn = 10

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Everything select() should watch: the listener and, later, each
    # accepted connection.
    readfds = [listener]

    try:
        listener.bind(address)
        listener.listen(maxconn)

        while True:

            sys.stdout.write("waiting...\n")
            r, w, x = select.select(readfds, [], [])

            # A distinct loop variable keeps `listener` from being
            # rebound to a client connection.
            for sock in r:
                if sock is listener:
                    conn, addr = sock.accept()
                    readfds.append(conn)
                    sys.stdout.write("connected.\n")
                    continue

                try:
                    data = sock.recv(datasize)
                except ConnectionError:
                    # A peer reset is handled like an orderly close
                    # instead of taking the whole server down.
                    data = b''

                if not data:
                    sys.stdout.write("disconnected.\n")
                    readfds.remove(sock)
                    sock.close()
                    # Keep serving the other sockets that are ready.
                    continue

                sys.stdout.write("received {}\n".format(data.decode()))
    finally:
        # Close the listener and every connection that is still open.
        for sock in readfds:
            sock.close()


TCP-UDP-UNIXを全てselectで待つ


all_select.py

import os
import sys
import socket
import select
import signal

if __name__ == '__main__':
    # Single select() loop serving four endpoints at once: a TCP
    # listener, a UDP socket, a UNIX stream listener and a UNIX datagram
    # socket. Runs until Ctrl-C, then closes every socket and removes the
    # UNIX socket files it created.

    address1 = ('127.0.0.1', 50000)
    address2 = ('127.0.0.1', 50001)
    socket_path1 = "./unix_stream"
    socket_path2 = "./unix_dgram"
    datasize = 4096
    maxconn = 1

    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    unix_s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    unix_d = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)

    # Listening sockets mapped to the label used in log messages.
    listeners = {tcp: "tcp", unix_s: "unix"}
    # Connectionless sockets that are read directly with recvfrom().
    datagrams = (udp, unix_d)
    # Every accepted connection mapped to its label, so that all
    # clients are served, not only the most recently accepted one.
    connections = {}
    # Socket files this process created and must remove on exit.
    bound_paths = []
    # Everything select() should watch.
    readfds = [tcp, udp, unix_s, unix_d]

    try:
        tcp.bind(address1)
        tcp.listen(maxconn)

        udp.bind(address2)

        unix_s.bind(socket_path1)
        bound_paths.append(socket_path1)
        unix_s.listen(maxconn)

        unix_d.bind(socket_path2)
        bound_paths.append(socket_path2)

        while True:

            sys.stdout.write("waiting...\n")
            r, w, x = select.select(readfds, [], [])

            for s in r:
                if s in listeners:
                    label = listeners[s]
                    conn, addr = s.accept()
                    connections[conn] = label
                    readfds.append(conn)
                    sys.stdout.write("{} connected.\n".format(label))
                elif s in datagrams:
                    data, client = s.recvfrom(datasize)
                    sys.stdout.write("received {}\n".format(data.decode()))
                else:
                    try:
                        data = s.recv(datasize)
                    except ConnectionError:
                        # A peer reset is handled like an orderly close.
                        data = b''

                    if not data:
                        label = connections.pop(s)
                        sys.stdout.write("{} disconnected.\n".format(label))
                        readfds.remove(s)
                        s.close()
                        # Keep serving the other sockets that are ready.
                        continue

                    sys.stdout.write("received {}\n".format(data.decode()))
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this sample; exit quietly.
        pass
    finally:
        # Runs on Ctrl-C and on any error: close the four endpoints and
        # every accepted connection still open, then remove our files.
        for s in readfds:
            s.close()
        for path in bound_paths:
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass


エラー処理は上手く書けないのでごめんなさい。

www.segmentation-fault.xyz